Chuyển tới nội dung chính

Kiến Trúc Database

Tổng Quan 3 Tầng

TầngVị tríMô tả
Core MySQLapp/core/database/Tables nội bộ: config, worker_status, execution_log
User Databasesapp/database/<server>/<db>/Dữ liệu ứng dụng (models + repositories)
Connection Registry@database decoratorĐăng ký server, inject helper methods

1. Core Database

Vị trí: app/core/database/

Core MySQL lưu trữ trạng thái nội bộ của hệ thống. Database tên crawler_core (cấu hình qua CORE_DB_NAME).

Bảng worker_status

Lưu trạng thái enable/disable và thông tin của workers.

CộtKiểuMô tả
idBigInteger (PK)Auto-increment
worker_idString(255)ID worker, unique, indexed
nameString(500)Tên hiển thị
typeEnumSCHEDULE_WORKER hoặc WEBSOCKET_LISTENER
enabledBooleanTrue = hoạt động, False = bị tắt
statusString(50)IDLE, RUNNING, COMPLETED, ERROR, ACTIVE, DISABLE, STOP
last_errorString(2000)Lỗi gần nhất
created_atDateTimeThời gian tạo
updated_atDateTimeThời gian cập nhật gần nhất

Bảng config

Config key-value, hỗ trợ config chung (global) và config riêng worker.

CộtKiểuMô tả
idBigInteger (PK)Auto-increment
worker_idString(255)NULL = global config, có giá trị = config riêng worker
keyString(255)Tên config key
valueTextGiá trị
descriptionTextMô tả
created_atDateTimeThời gian tạo
updated_atDateTimeThời gian cập nhật

Unique constraint: (worker_id, key)

Bảng execution_log

Lịch sử thực thi của các workers.

CộtKiểuMô tả
idBigInteger (PK)Auto-increment
typeEnumTRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL
valueTextNội dung log
created_fromEnumSCHEDULE_WORKER, WEBSOCKET_LISTENER, SYSTEM
created_byString(255)Worker ID đã tạo log
created_atDateTimeThời gian tạo, indexed

2. User Database — Cấu Trúc Thư Mục

Mỗi server database là một thư mục con trong app/database/:

app/database/
└── <server_id>/ # Tên server (trùng id trong @database)
├── connection.py # @database decorator + Base
└── <database_name>/ # Tên database trên server
├── models/
│ └── my_model.py # SQLAlchemy model
└── repositories/
└── my_repository.py # Repository kế thừa IRepositoryBase

Các server hiện có:

Server IDEngineDatabases
core_mysqlMySQLcrawler_core
localhost_mysqlMySQLcraw_result
simplize_mysql_devMySQLsimplize_company_dev
localhost_postgresPostgreSQLsimplize_historical_prices
simplize_postgres_devPostgreSQLsimplize_historical_prices, simplize_company_dev

3. DatabaseOrchestrator & UnitOfWork

DatabaseOrchestrator

Vị trí: app/core/orchestrators/database/orchestrator.py

Facade cấp cao để truy cập database. Import singleton:

from app.core.orchestrators.database import db
# hoặc
from app.core.orchestrators.database.orchestrator import DatabaseOrchestrator
db = DatabaseOrchestrator()

UnitOfWork Pattern

Vị trí: app/core/orchestrators/database/unit_of_work.py

Quản lý session lifecycle và cung cấp repository access.

Cách sử dụng cơ bản:

from app.core.orchestrators.database import db
from app.database.simplize_mysql_dev.simplize_company_dev.repositories.company_repository import CompanyRepository

# 1. Khởi tạo Unit of Work
# db.get_unit_of_work("<tên_server>", "<tên_database>")
# Lưu ý: tên server phải trùng với key đã đăng ký trong Registry
uow = db.get_unit_of_work("simplize_mysql_dev", "simplize_company_dev")

# 2. Lấy repository từ Unit of Work
company_repo = uow.get_repository(CompanyRepository)

# 3. Thao tác dữ liệu
company = company_repo.get_by_ticker("ABC")
all_companies = company_repo.get_all()

if company:
company.name_vi = "Công ty ABC Updated"
company_repo.update(company)

if company:
company_repo.delete(company)

# 4. Commit thay đổi xuống DB
uow.commit()

Context manager (auto-rollback khi exception, auto-close khi exit):

with db.get_unit_of_work("simplize_mysql_dev", "simplize_company_dev") as uow:
repo = uow.get_repository(CompanyRepository)
companies = repo.get_all()
uow.commit()

Đặc điểm:

  • Session lazy-init (chỉ tạo khi cần, không cần gọi open() thủ công)
  • Repository cached trong UnitOfWork (gọi get_repository() nhiều lần trả về cùng instance)
  • Context manager auto-rollback khi có exception
  • Có thể dùng trực tiếp (không context manager) — session tự mở khi truy cập repository

4. IRepositoryBase

Vị trí: app/core/structures/i_repository_base.py

Base repository generic cung cấp CRUD operations. Tất cả repository kế thừa từ class này.

Methods

MethodMô tả
get_all()Lấy tất cả records
get_by_id(id)Lấy record theo primary key
create(entity)Tạo mới (flush)
create_many(entities)Tạo nhiều records (flush)
update(entity)Cập nhật (merge + flush)
delete(entity)Xoá record (flush)
upsert(data, unique_keys)Insert hoặc update theo unique keys
upsert_many(data, unique_keys)Batch upsert với deduplication
commit()Commit transaction
rollback()Rollback transaction

Pattern quan trọng: Repository chỉ gọi flush() (không commit). Caller quyết định khi nào commit thông qua uow.commit().


5. DatabaseStrategy

Vị trí: app/core/orchestrators/database/strategy.py

Xây dựng connection URL và engine options theo loại database:

EngineDriverURL Format
MYSQLmysql+pymysqlmysql+pymysql://user:pass@host:port/db
POSTGRESQLpostgresql+psycopg2postgresql+psycopg2://user:pass@host:port/db

Engine Options (chung cho cả MySQL và PostgreSQL):

OptionGiá trị
pool_size5
max_overflow10
pool_recycle3600 (1 giờ)
pool_pre_pingTrue
echoFalse

6. Thêm Database Server Mới

Bước 1: Thêm biến môi trường

Thêm vào app/config/settings.py:

class Settings:
# --- Database: my_new_server ---
my_new_server_host: str = os.getenv("MY_NEW_SERVER_HOST", "localhost")
my_new_server_port: int = int(os.getenv("MY_NEW_SERVER_PORT", "3306"))
my_new_server_user: str = os.getenv("MY_NEW_SERVER_USER", "admin")
my_new_server_password: str = os.getenv("MY_NEW_SERVER_PASSWORD", "admin")

Bước 2: Tạo connection

Tạo app/database/my_new_server/connection.py:

from sqlalchemy.orm import declarative_base
from app.config import settings
from app.core.decorators import database
from app.core.orchestrators.database.strategy import DatabaseEngine

@database(
id="my_new_server",
name="My New Server",
engine=DatabaseEngine.MYSQL,
host=settings.my_new_server_host,
port=settings.my_new_server_port,
user=settings.my_new_server_user,
password=settings.my_new_server_password,
auto_sync=True,
)
class MyNewServerConnection:
Base = declarative_base()

Base = MyNewServerConnection.Base

Bước 3: Tạo model

Tạo app/database/my_new_server/my_database/models/user.py:

from sqlalchemy import Column, BigInteger, String
from app.database.my_new_server.connection import Base

class User(Base):
__tablename__ = "users"

id = Column(BigInteger, primary_key=True, autoincrement=True)
name = Column(String(255))
email = Column(String(255))

Bước 4: Tạo repository

Tạo app/database/my_new_server/my_database/repositories/user_repository.py:

from sqlalchemy.orm import Session
from app.core.structures.i_repository_base import IRepositoryBase
from app.database.my_new_server.my_database.models.user import User

class UserRepository(IRepositoryBase[User]):
def __init__(self, db: Session):
super().__init__(db, User)

# Custom queries
def get_by_email(self, email: str):
return self.db.query(User).filter(User.email == email).first()

Bước 5: Sử dụng

from app.core.orchestrators.database import db
from app.database.my_new_server.my_database.repositories.user_repository import UserRepository
from app.database.my_new_server.my_database.models.user import User

uow = db.get_unit_of_work("my_new_server", "my_database")
user_repo = uow.get_repository(UserRepository)

new_user = User(name="John", email="john@example.com")
user_repo.create(new_user)

user = user_repo.get_by_email("john@example.com")
all_users = user_repo.get_all()

if user:
user.name = "John Updated"
user_repo.update(user)

if user:
user_repo.delete(user)

uow.commit()