Chuyển tới nội dung chính

Domain Modules

Vị trí: app/modules/ — chứa logic nghiệp vụ xử lý dữ liệu, chia theo domain.


Cấu Trúc Module

Mỗi module được tổ chức theo cấu trúc:

app/modules/
└── <tên_module>/
├── constants/ # Biến, hằng số dùng chung
├── helpers/ # Hàm tiện ích
├── schemas/ # Pydantic models (DTO / validation)
├── processors/ # Logic xử lý & điều phối
└── sources/ # Logic lấy dữ liệu từ nguồn bên ngoài
Thư mụcNhiệm vụVí dụ
sources/Gọi API bên ngoài, lấy dữ liệu thôFundFmarketApi — gọi API FMarket
schemas/Định nghĩa cấu trúc dữ liệu (Pydantic v2)FmarketFund, FmarketFundDetail
processors/Điều phối: fetch → transform → persistFmarketEodProcessor
helpers/Hàm tiện ích cho moduleconvert_days(), to_slug()
constants/Hằng số domain-specific

Mối Quan Hệ Worker → Processor → Source/Repository

Pattern: Worker chỉ là "trigger" — logic nghiệp vụ nằm trong Processor.


Ví Dụ Thực Tế: Module funds

Source: FundFmarketApi

Vị trí: app/modules/funds/sources/fund_fmarket_api.py

Gọi API FMarket để lấy dữ liệu quỹ đầu tư:

class FundFmarketApi:
def __init__(self):
self.base_url = "https://api.fmarket.vn"
self.http_client = HttpClient()

async def get_fund_list(self):
"""Lấy danh sách quỹ (STOCK, BOND, BALANCED, MMF)."""
url = f"{self.base_url}/res/products/filter"
response = self.http_client.post(url, json={...})
return [FmarketFund.model_validate(fund) for fund in rows]

async def get_fund_detail(self, fund_code: str):
"""Lấy thông tin chi tiết quỹ."""
...

async def get_nav_history(self, productId: str):
"""Lấy lịch sử NAV."""
...

Đặc điểm:

  • Sử dụng HttpClient (có retry, proxy, fingerprint)
  • Validate response bằng Pydantic schema (FmarketFund.model_validate())

Schema: FmarketFund, FmarketFundDetail

Vị trí: app/modules/funds/schemas/

Pydantic v2 models với camelCase alias mapping:

class FmarketFund(BaseModel):
model_config = ConfigDict(populate_by_name=True)

id: Optional[int] = Field(None, alias="id")
code: Optional[str] = Field(None, alias="code")
name: Optional[str] = Field(None, alias="name")
nav_to_12_months: Optional[float] = Field(None, alias="navTo12Months")
...

Processor: FmarketEodProcessor

Vị trí: app/modules/funds/processors/fmarket_eod_processor.py

Điều phối toàn bộ quy trình xử lý dữ liệu EOD:

class FmarketEodProcessor:
def __init__(self):
# Khởi tạo UnitOfWork cho nhiều databases
self.prices_uow = db.get_unit_of_work("simplize_postgres_dev", "simplize_historical_prices")
self.company_uow = db.get_unit_of_work("simplize_mysql_dev", "simplize_company_dev")
self.fund_fmarket_api = FundFmarketApi()

# Lấy repositories
self.fund_profile_repository = self.company_uow.get_repository(FundProfileRepository)
self.fund_nav_repository = self.company_uow.get_repository(FundNavRepository)
...

async def process_eod(self):
# 1. Fetch danh sách quỹ
fund_list = await self.fund_fmarket_api.get_fund_list()

# 2. Với mỗi quỹ: fetch detail + NAV history
for item in fund_list:
raw_data = await self.fund_fmarket_api.get_fund_detail(item.code)
...

# 3. Upsert vào DB
self.fund_profile_repository.upsert_many(overview_results, ["fund_code"])
self.fund_holdings_repository.upsert_many(holding_data, [...])

# 4. Commit cả hai UnitOfWork
self.company_uow.commit()
self.prices_uow.commit()

Đặc điểm:

  • Sử dụng nhiều databases đồng thời (MySQL + PostgreSQL)
  • Batch upsert với upsert_many() cho performance
  • Commit ở cuối cùng (transaction boundary ở processor level)

Helpers

Vị trí: app/modules/funds/helpers/

  • datetime_utils.pyconvert_days([1,2,3])["MONDAY", "TUESDAY", "WEDNESDAY"]
  • slugtify.pyto_slug(text) — chuyển text thành slug

Thêm Module Mới

Bước 1: Tạo cấu trúc thư mục

app/modules/my_module/
├── constants/
├── helpers/
├── schemas/
├── processors/
└── sources/

Bước 2: Tạo Source (lấy dữ liệu)

# app/modules/my_module/sources/my_api.py
from app.plugins.request.http_client import HttpClient

class MyExternalApi:
def __init__(self):
self.http_client = HttpClient(request_delay=(0.5, 1.0))

def get_data(self):
return self.http_client.get_json("https://api.example.com/data")

Bước 3: Tạo Schema (validation)

# app/modules/my_module/schemas/my_data.py
from pydantic import BaseModel, Field
from typing import Optional

class MyData(BaseModel):
id: int
name: str
value: Optional[float] = None

Bước 4: Tạo Processor (xử lý)

# app/modules/my_module/processors/my_processor.py
from app.core.orchestrators.database import db
from app.modules.my_module.sources.my_api import MyExternalApi
from app.database.my_server.my_db.repositories.my_repository import MyRepository
from app.plugins.logging.logger import logger

class MyProcessor:
def __init__(self):
self.api = MyExternalApi()
self.uow = db.get_unit_of_work("my_server", "my_database")
self.repo = self.uow.get_repository(MyRepository)

def run(self):
logger.info("Fetching data...")
raw_data = self.api.get_data()

logger.info("Processing...")
processed = self._transform(raw_data)

logger.info("Persisting...")
self.repo.upsert_many(processed, ["id"])
self.uow.commit()

return {"records": len(processed)}

def _transform(self, raw_data):
# Logic biến đổi dữ liệu
return [...]

Bước 5: Kết nối với Worker

# app/workers/schedules/my_schedule.py
@schedule(id="my_schedule", name="My Schedule", cron="0 9 * * *")
class MySchedule(ScheduleBaseWorker):
def run(self):
from app.modules.my_module.processors.my_processor import MyProcessor
return MyProcessor().run()