Kiến Trúc Database
Tổng Quan 3 Tầng
| Tầng | Vị trí | Mô tả |
|---|---|---|
| Core MySQL | app/core/database/ | Tables nội bộ: config, worker_status, execution_log |
| User Databases | app/database/<server>/<db>/ | Dữ liệu ứng dụng (models + repositories) |
| Connection Registry | @database decorator | Đăng ký server, inject helper methods |
1. Core Database
Vị trí: app/core/database/
Core MySQL lưu trữ trạng thái nội bộ của hệ thống. Database tên crawler_core (cấu hình qua CORE_DB_NAME).
Bảng worker_status
Lưu trạng thái enable/disable và thông tin của workers.
| Cột | Kiểu | Mô tả |
|---|---|---|
id | BigInteger (PK) | Auto-increment |
worker_id | String(255) | ID worker, unique, indexed |
name | String(500) | Tên hiển thị |
type | Enum | SCHEDULE_WORKER hoặc WEBSOCKET_LISTENER |
enabled | Boolean | True = hoạt động, False = bị tắt |
status | String(50) | IDLE, RUNNING, COMPLETED, ERROR, ACTIVE, DISABLE, STOP |
last_error | String(2000) | Lỗi gần nhất |
created_at | DateTime | Thời gian tạo |
updated_at | DateTime | Thời gian cập nhật gần nhất |
Bảng config
Config key-value, hỗ trợ config chung (global) và config riêng worker.
| Cột | Kiểu | Mô tả |
|---|---|---|
id | BigInteger (PK) | Auto-increment |
worker_id | String(255) | NULL = global config, có giá trị = config riêng worker |
key | String(255) | Tên config key |
value | Text | Giá trị |
description | Text | Mô tả |
created_at | DateTime | Thời gian tạo |
updated_at | DateTime | Thời gian cập nhật |
Unique constraint: (worker_id, key)
Bảng execution_log
Lịch sử thực thi của các workers.
| Cột | Kiểu | Mô tả |
|---|---|---|
id | BigInteger (PK) | Auto-increment |
type | Enum | TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL |
value | Text | Nội dung log |
created_from | Enum | SCHEDULE_WORKER, WEBSOCKET_LISTENER, SYSTEM |
created_by | String(255) | Worker ID đã tạo log |
created_at | DateTime | Thời gian tạo, indexed |
2. User Database — Cấu Trúc Thư Mục
Mỗi server database là một thư mục con trong app/database/:
app/database/
└── <server_id>/ # Tên server (trùng id trong @database)
├── connection.py # @database decorator + Base
└── <database_name>/ # Tên database trên server
├── models/
│ └── my_model.py # SQLAlchemy model
└── repositories/
└── my_repository.py # Repository kế thừa IRepositoryBase
Các server hiện có:
| Server ID | Engine | Databases |
|---|---|---|
core_mysql | MySQL | crawler_core |
localhost_mysql | MySQL | craw_result |
simplize_mysql_dev | MySQL | simplize_company_dev |
localhost_postgres | PostgreSQL | simplize_historical_prices |
simplize_postgres_dev | PostgreSQL | simplize_historical_prices, simplize_company_dev |
3. DatabaseOrchestrator & UnitOfWork
DatabaseOrchestrator
Vị trí: app/core/orchestrators/database/orchestrator.py
Facade cấp cao để truy cập database. Import singleton:
from app.core.orchestrators.database import db
# hoặc
from app.core.orchestrators.database.orchestrator import DatabaseOrchestrator
db = DatabaseOrchestrator()
UnitOfWork Pattern
Vị trí: app/core/orchestrators/database/unit_of_work.py
Quản lý session lifecycle và cung cấp repository access.
Cách sử dụng cơ bản:
from app.core.orchestrators.database import db
from app.database.simplize_mysql_dev.simplize_company_dev.repositories.company_repository import CompanyRepository
# 1. Khởi tạo Unit of Work
# db.get_unit_of_work("<tên_server>", "<tên_database>")
# Lưu ý: tên server phải trùng với key đã đăng ký trong Registry
uow = db.get_unit_of_work("simplize_mysql_dev", "simplize_company_dev")
# 2. Lấy repository từ Unit of Work
company_repo = uow.get_repository(CompanyRepository)
# 3. Thao tác dữ liệu
company = company_repo.get_by_ticker("ABC")
all_companies = company_repo.get_all()
if company:
company.name_vi = "Công ty ABC Updated"
company_repo.update(company)
if company:
company_repo.delete(company)
# 4. Commit thay đổi xuống DB
uow.commit()
Context manager (auto-rollback khi exception, auto-close khi exit):
with db.get_unit_of_work("simplize_mysql_dev", "simplize_company_dev") as uow:
repo = uow.get_repository(CompanyRepository)
companies = repo.get_all()
uow.commit()
Đặc điểm:
- Session lazy-init (chỉ tạo khi cần, không cần gọi
open()thủ công) - Repository cached trong UnitOfWork (gọi
get_repository()nhiều lần trả về cùng instance) - Context manager auto-rollback khi có exception
- Có thể dùng trực tiếp (không context manager) — session tự mở khi truy cập repository
4. IRepositoryBase
Vị trí: app/core/structures/i_repository_base.py
Base repository generic cung cấp CRUD operations. Tất cả repository kế thừa từ class này.
Methods
| Method | Mô tả |
|---|---|
get_all() | Lấy tất cả records |
get_by_id(id) | Lấy record theo primary key |
create(entity) | Tạo mới (flush) |
create_many(entities) | Tạo nhiều records (flush) |
update(entity) | Cập nhật (merge + flush) |
delete(entity) | Xoá record (flush) |
upsert(data, unique_keys) | Insert hoặc update theo unique keys |
upsert_many(data, unique_keys) | Batch upsert với deduplication |
commit() | Commit transaction |
rollback() | Rollback transaction |
Pattern quan trọng: Repository chỉ gọi flush() (không commit). Caller quyết định khi nào commit thông qua uow.commit().
5. DatabaseStrategy
Vị trí: app/core/orchestrators/database/strategy.py
Xây dựng connection URL và engine options theo loại database:
| Engine | Driver | URL Format |
|---|---|---|
MYSQL | mysql+pymysql | mysql+pymysql://user:pass@host:port/db |
POSTGRESQL | postgresql+psycopg2 | postgresql+psycopg2://user:pass@host:port/db |
Engine Options (chung cho cả MySQL và PostgreSQL):
| Option | Giá trị |
|---|---|
pool_size | 5 |
max_overflow | 10 |
pool_recycle | 3600 (1 giờ) |
pool_pre_ping | True |
echo | False |
6. Thêm Database Server Mới
Bước 1: Thêm biến môi trường
Thêm vào app/config/settings.py:
class Settings:
# --- Database: my_new_server ---
my_new_server_host: str = os.getenv("MY_NEW_SERVER_HOST", "localhost")
my_new_server_port: int = int(os.getenv("MY_NEW_SERVER_PORT", "3306"))
my_new_server_user: str = os.getenv("MY_NEW_SERVER_USER", "admin")
my_new_server_password: str = os.getenv("MY_NEW_SERVER_PASSWORD", "admin")
Bước 2: Tạo connection
Tạo app/database/my_new_server/connection.py:
from sqlalchemy.orm import declarative_base
from app.config import settings
from app.core.decorators import database
from app.core.orchestrators.database.strategy import DatabaseEngine
@database(
id="my_new_server",
name="My New Server",
engine=DatabaseEngine.MYSQL,
host=settings.my_new_server_host,
port=settings.my_new_server_port,
user=settings.my_new_server_user,
password=settings.my_new_server_password,
auto_sync=True,
)
class MyNewServerConnection:
Base = declarative_base()
Base = MyNewServerConnection.Base
Bước 3: Tạo model
Tạo app/database/my_new_server/my_database/models/user.py:
from sqlalchemy import Column, BigInteger, String
from app.database.my_new_server.connection import Base
class User(Base):
__tablename__ = "users"
id = Column(BigInteger, primary_key=True, autoincrement=True)
name = Column(String(255))
email = Column(String(255))
Bước 4: Tạo repository
Tạo app/database/my_new_server/my_database/repositories/user_repository.py:
from sqlalchemy.orm import Session
from app.core.structures.i_repository_base import IRepositoryBase
from app.database.my_new_server.my_database.models.user import User
class UserRepository(IRepositoryBase[User]):
def __init__(self, db: Session):
super().__init__(db, User)
# Custom queries
def get_by_email(self, email: str):
return self.db.query(User).filter(User.email == email).first()
Bước 5: Sử dụng
from app.core.orchestrators.database import db
from app.database.my_new_server.my_database.repositories.user_repository import UserRepository
from app.database.my_new_server.my_database.models.user import User
uow = db.get_unit_of_work("my_new_server", "my_database")
user_repo = uow.get_repository(UserRepository)
new_user = User(name="John", email="john@example.com")
user_repo.create(new_user)
user = user_repo.get_by_email("john@example.com")
all_users = user_repo.get_all()
if user:
user.name = "John Updated"
user_repo.update(user)
if user:
user_repo.delete(user)
uow.commit()