Hệ Thống Decorator
Tổng Quan
Hệ thống sử dụng 3 decorator để tự đăng ký workers và database connections. Mỗi decorator tuân theo pattern:
@decorator → Registry (in-memory dict) → Controller (điều khiển runtime)
Quy tắc quan trọng: Class được decorate phải được import trong __init__.py của package tương ứng để decorator được trigger.
1. @schedule — Schedule Worker
Vị trí: app/core/decorators/schedule/
Tham Số
| Tham số | Kiểu | Mặc định | Mô tả |
|---|---|---|---|
id | str | bắt buộc | ID duy nhất của worker |
name | str | bắt buộc | Tên hiển thị |
cron | str | "" | Cron expression (minute hour dom month dow) |
lock | bool | False | Sử dụng distributed lock (Redis) |
lock_ttl | int | 0 | Lock time-to-live (giây) |
Cách Hoạt Động
- Decorator time: đăng ký vào registry, tạo Celery
shared_taskwrappingrun() - Sync time (startup):
sync_db_to_redis()kiểm tra DB để add/remove khỏibeat_schedule - Runtime: Beat trigger task → task function quản lý status → gọi
instance.run()
Lưu ý: Beat registration KHÔNG xảy ra tại decorator time.
sync_servicequản lý việc đăng ký beat dựa trên trạng thái enabled/disabled trong DB.
Status Lifecycle
IDLE ──(cron/manual)──→ RUNNING ──→ COMPLETED ──→ restore pre_run_status (IDLE hoặc DISABLE)
└──→ ERROR ──→ restore pre_run_status
DISABLE ──(manual only)──→ RUNNING ──→ ... ──→ restore DISABLE
DISABLEchặn cron trigger nhưng cho phép chạy thủ công (run once)- Sau khi chạy xong, status được khôi phục về giá trị trước khi run (
pre_run_status)
Lock Mechanism
Khi lock=True và lock_ttl > 0:
- Trước khi run:
SET NXkey lock vào Redis với TTL - Nếu lock đang bị giữ → skip task (
{"skipped": True, "reason": "lock_held"}) - Sau khi run (finally): release lock
Code Mẫu
# app/workers/schedules/my_crawler.py
from app.core.decorators.schedule import schedule
from app.core.structures.schedule_base import ScheduleBaseWorker
from app.plugins.logging.logger import logger
@schedule(
id="crawl_daily_data",
name="Daily Data Crawler",
cron="0 0 * * *", # Chạy mỗi ngày lúc 00:00
lock=True,
lock_ttl=300, # Lock tối đa 5 phút
)
class DailyDataCrawler(ScheduleBaseWorker):
def run(self):
logger.info("Crawling data...")
# Logic xử lý chính
return {"records": 100}
def on_complete(self, result):
logger.info(f"Crawled {result['records']} records")
def on_error(self, error: Exception):
logger.error(f"Crawl failed: {error}")
Đăng ký trong app/workers/schedules/__init__.py:
from app.workers.schedules.my_crawler import DailyDataCrawler
2. @websocket — WebSocket Worker
Vị trí: app/core/decorators/websocket/
Tham Số
| Tham số | Kiểu | Mặc định | Mô tả |
|---|---|---|---|
id | str | bắt buộc | ID duy nhất |
name | str | bắt buộc | Tên hiển thị |
url | str | bắt buộc | WebSocket URL kết nối |
restart_policy | str | "none" | "none" hoặc "exponential_backoff" |
restart_max_attempts | int | 0 | Số lần reconnect tối đa (0 = không giới hạn) |
Cách Hoạt Động
- Decorator time: đăng ký vào registry, patch
__init__để đảm bảoself.client = None - Worker boot (
worker_readysignal): start tất cả enabled WebSocket workers trong daemon threads - Runtime:
WsClient.run_forever()chạy trong thread, gọi callbacks khi có sự kiện
Restart Policies
| Policy | Mô tả |
|---|---|
none | Không reconnect khi disconnect |
exponential_backoff | Reconnect với backoff: 5s → 10s → 20s → ... → 300s max |
Status Lifecycle
STOP ──(start)──→ RUNNING ──(intentional stop)──→ STOP
└──(unexpected error)──→ ERROR
Code Mẫu
# app/workers/websockets/my_ws_listener.py
from app.core.decorators.websocket import websocket
from app.core.structures.websocket_base import WebSocketBaseWorker
from app.plugins.logging.logger import logger
@websocket(
id="ws_market_data",
name="Market Data WebSocket",
url="wss://stream.example.com/ws",
restart_policy="exponential_backoff",
restart_max_attempts=5,
)
class MarketDataWs(WebSocketBaseWorker):
def on_open(self):
logger.info("Connected! Subscribing...")
self.client.send({"action": "subscribe", "channel": "market"})
def on_message(self, message):
logger.debug(f"Received: {message}")
# Xử lý dữ liệu realtime
def on_error(self, error: Exception):
logger.error(f"WS error: {error}")
def on_close(self):
logger.info("Connection closed")
Đăng ký trong app/workers/websockets/__init__.py:
from app.workers.websockets.my_ws_listener import MarketDataWs
Sử Dụng self.client
self.client là instance WsClient, được inject tự động khi controller start worker:
def on_open(self):
# Gửi dữ liệu
self.client.send({"action": "subscribe"})
self.client.send("plain text message")
3. @database — Database Connection
Vị trí: app/core/decorators/database/
Tham Số
| Tham số | Kiểu | Mặc định | Mô tả |
|---|---|---|---|
id | str | bắt buộc | ID server (ví dụ: "simplize_mysql_dev") |
name | str | bắt buộc | Tên hiển thị |
engine | DatabaseEngine | bắt buộc | DatabaseEngine.MYSQL hoặc DatabaseEngine.POSTGRESQL |
host | str | bắt buộc | Database host |
port | int | bắt buộc | Database port |
user | str | bắt buộc | Database user |
password | str | bắt buộc | Database password |
auto_sync | bool | False | Tự động tạo database và sync schema |
Injected Classmethods
Decorator tự động inject 6 classmethods vào class được decorate:
| Method | Mô tả |
|---|---|
get_engine(db_name) | Tạo/cache SQLAlchemy Engine |
create_session(db_name) | Tạo Session mới |
execute_sql(db_name, sql, params) | Chạy raw SQL statement |
test_connection(db_name) | Test kết nối, trả về bool |
create_tables(db_name) | Tạo tables từ Base.metadata |
dispose(db_name) | Dispose engine (None = dispose tất cả) |
auto_sync Mode
Khi auto_sync=True:
- Tự động tạo database nếu chưa tồn tại
- Tự động tạo tables/columns từ SQLAlchemy models khi
get_engine()được gọi lần đầu
Code Mẫu
# app/database/my_server/connection.py
from sqlalchemy.orm import declarative_base
from app.config import settings
from app.core.decorators import database
from app.core.orchestrators.database.strategy import DatabaseEngine
@database(
id="my_mysql_server",
name="My MySQL Server",
engine=DatabaseEngine.MYSQL,
host=settings.my_mysql_host,
port=settings.my_mysql_port,
user=settings.my_mysql_user,
password=settings.my_mysql_password,
auto_sync=True,
)
class MyMysqlConnection:
Base = declarative_base()
Base = MyMysqlConnection.Base
Sử dụng trực tiếp:
from app.database.my_server.connection import MyMysqlConnection
# Test kết nối
ok = MyMysqlConnection.test_connection("my_database")
# Chạy raw SQL
result = MyMysqlConnection.execute_sql("my_database", "SELECT COUNT(*) FROM users")
# Tạo tables
MyMysqlConnection.create_tables("my_database")
Chi tiết về cách tổ chức database → xem Database.