Domain Modules
Vị trí: app/modules/ — chứa logic nghiệp vụ xử lý dữ liệu, chia theo domain.
Cấu Trúc Module
Mỗi module được tổ chức theo cấu trúc:
app/modules/
└── <tên_module>/
├── constants/ # Biến, hằng số dùng chung
├── helpers/ # Hàm tiện ích
├── schemas/ # Pydantic models (DTO / validation)
├── processors/ # Logic xử lý & điều phối
└── sources/ # Logic lấy dữ liệu từ nguồn bên ngoài
| Thư mục | Nhiệm vụ | Ví dụ |
|---|---|---|
sources/ | Gọi API bên ngoài, lấy dữ liệu thô | FundFmarketApi — gọi API FMarket |
schemas/ | Định nghĩa cấu trúc dữ liệu (Pydantic v2) | FmarketFund, FmarketFundDetail |
processors/ | Điều phối: fetch → transform → persist | FmarketEodProcessor |
helpers/ | Hàm tiện ích cho module | convert_days(), to_slug() |
constants/ | Hằng số domain-specific | — |
Mối Quan Hệ Worker → Processor → Source/Repository
Pattern: Worker chỉ là "trigger" — logic nghiệp vụ nằm trong Processor.
Ví Dụ Thực Tế: Module funds
Source: FundFmarketApi
Vị trí: app/modules/funds/sources/fund_fmarket_api.py
Gọi API FMarket để lấy dữ liệu quỹ đầu tư:
class FundFmarketApi:
def __init__(self):
self.base_url = "https://api.fmarket.vn"
self.http_client = HttpClient()
async def get_fund_list(self):
"""Lấy danh sách quỹ (STOCK, BOND, BALANCED, MMF)."""
url = f"{self.base_url}/res/products/filter"
response = self.http_client.post(url, json={...})
return [FmarketFund.model_validate(fund) for fund in rows]
async def get_fund_detail(self, fund_code: str):
"""Lấy thông tin chi tiết quỹ."""
...
async def get_nav_history(self, productId: str):
"""Lấy lịch sử NAV."""
...
Đặc điểm:
- Sử dụng
HttpClient(có retry, proxy, fingerprint) - Validate response bằng Pydantic schema (
FmarketFund.model_validate())
Schema: FmarketFund, FmarketFundDetail
Vị trí: app/modules/funds/schemas/
Pydantic v2 models với camelCase alias mapping:
class FmarketFund(BaseModel):
model_config = ConfigDict(populate_by_name=True)
id: Optional[int] = Field(None, alias="id")
code: Optional[str] = Field(None, alias="code")
name: Optional[str] = Field(None, alias="name")
nav_to_12_months: Optional[float] = Field(None, alias="navTo12Months")
...
Processor: FmarketEodProcessor
Vị trí: app/modules/funds/processors/fmarket_eod_processor.py
Điều phối toàn bộ quy trình xử lý dữ liệu EOD:
class FmarketEodProcessor:
def __init__(self):
# Khởi tạo UnitOfWork cho nhiều databases
self.prices_uow = db.get_unit_of_work("simplize_postgres_dev", "simplize_historical_prices")
self.company_uow = db.get_unit_of_work("simplize_mysql_dev", "simplize_company_dev")
self.fund_fmarket_api = FundFmarketApi()
# Lấy repositories
self.fund_profile_repository = self.company_uow.get_repository(FundProfileRepository)
self.fund_nav_repository = self.company_uow.get_repository(FundNavRepository)
...
async def process_eod(self):
# 1. Fetch danh sách quỹ
fund_list = await self.fund_fmarket_api.get_fund_list()
# 2. Với mỗi quỹ: fetch detail + NAV history
for item in fund_list:
raw_data = await self.fund_fmarket_api.get_fund_detail(item.code)
...
# 3. Upsert vào DB
self.fund_profile_repository.upsert_many(overview_results, ["fund_code"])
self.fund_holdings_repository.upsert_many(holding_data, [...])
# 4. Commit cả hai UnitOfWork
self.company_uow.commit()
self.prices_uow.commit()
Đặc điểm:
- Sử dụng nhiều databases đồng thời (MySQL + PostgreSQL)
- Batch upsert với
upsert_many()cho performance - Commit ở cuối cùng (transaction boundary ở processor level)
Helpers
Vị trí: app/modules/funds/helpers/
datetime_utils.py—convert_days([1,2,3])→["MONDAY", "TUESDAY", "WEDNESDAY"]slugtify.py—to_slug(text)— chuyển text thành slug
Thêm Module Mới
Bước 1: Tạo cấu trúc thư mục
app/modules/my_module/
├── constants/
├── helpers/
├── schemas/
├── processors/
└── sources/
Bước 2: Tạo Source (lấy dữ liệu)
# app/modules/my_module/sources/my_api.py
from app.plugins.request.http_client import HttpClient
class MyExternalApi:
def __init__(self):
self.http_client = HttpClient(request_delay=(0.5, 1.0))
def get_data(self):
return self.http_client.get_json("https://api.example.com/data")
Bước 3: Tạo Schema (validation)
# app/modules/my_module/schemas/my_data.py
from pydantic import BaseModel, Field
from typing import Optional
class MyData(BaseModel):
id: int
name: str
value: Optional[float] = None
Bước 4: Tạo Processor (xử lý)
# app/modules/my_module/processors/my_processor.py
from app.core.orchestrators.database import db
from app.modules.my_module.sources.my_api import MyExternalApi
from app.database.my_server.my_db.repositories.my_repository import MyRepository
from app.plugins.logging.logger import logger
class MyProcessor:
def __init__(self):
self.api = MyExternalApi()
self.uow = db.get_unit_of_work("my_server", "my_database")
self.repo = self.uow.get_repository(MyRepository)
def run(self):
logger.info("Fetching data...")
raw_data = self.api.get_data()
logger.info("Processing...")
processed = self._transform(raw_data)
logger.info("Persisting...")
self.repo.upsert_many(processed, ["id"])
self.uow.commit()
return {"records": len(processed)}
def _transform(self, raw_data):
# Logic biến đổi dữ liệu
return [...]
Bước 5: Kết nối với Worker
# app/workers/schedules/my_schedule.py
@schedule(id="my_schedule", name="My Schedule", cron="0 9 * * *")
class MySchedule(ScheduleBaseWorker):
def run(self):
from app.modules.my_module.processors.my_processor import MyProcessor
return MyProcessor().run()