Chuyển tới nội dung chính

Hệ Thống Decorator

Tổng Quan

Hệ thống sử dụng 3 decorator để tự đăng ký workers và database connections. Mỗi decorator tuân theo pattern:

@decorator → Registry (in-memory dict) → Controller (điều khiển runtime)

Quy tắc quan trọng: Class được decorate phải được import trong __init__.py của package tương ứng để decorator được trigger.


1. @schedule — Schedule Worker

Vị trí: app/core/decorators/schedule/

Tham Số

Tham sốKiểuMặc địnhMô tả
idstrbắt buộcID duy nhất của worker
namestrbắt buộcTên hiển thị
cronstr""Cron expression (minute hour dom month dow)
lockboolFalseSử dụng distributed lock (Redis)
lock_ttlint0Lock time-to-live (giây)

Cách Hoạt Động

  1. Decorator time: đăng ký vào registry, tạo Celery shared_task wrapping run()
  2. Sync time (startup): sync_db_to_redis() kiểm tra DB để add/remove khỏi beat_schedule
  3. Runtime: Beat trigger task → task function quản lý status → gọi instance.run()

Lưu ý: Beat registration KHÔNG xảy ra tại decorator time. sync_service quản lý việc đăng ký beat dựa trên trạng thái enabled/disabled trong DB.

Status Lifecycle

IDLE ──(cron/manual)──→ RUNNING ──→ COMPLETED ──→ restore pre_run_status (IDLE hoặc DISABLE)
└──→ ERROR ──→ restore pre_run_status
DISABLE ──(manual only)──→ RUNNING ──→ ... ──→ restore DISABLE
  • DISABLE chặn cron trigger nhưng cho phép chạy thủ công (run once)
  • Sau khi chạy xong, status được khôi phục về giá trị trước khi run (pre_run_status)

Lock Mechanism

Khi lock=Truelock_ttl > 0:

  • Trước khi run: SET NX key lock vào Redis với TTL
  • Nếu lock đang bị giữ → skip task ({"skipped": True, "reason": "lock_held"})
  • Sau khi run (finally): release lock

Code Mẫu

# app/workers/schedules/my_crawler.py
from app.core.decorators.schedule import schedule
from app.core.structures.schedule_base import ScheduleBaseWorker
from app.plugins.logging.logger import logger

@schedule(
id="crawl_daily_data",
name="Daily Data Crawler",
cron="0 0 * * *", # Chạy mỗi ngày lúc 00:00
lock=True,
lock_ttl=300, # Lock tối đa 5 phút
)
class DailyDataCrawler(ScheduleBaseWorker):
def run(self):
logger.info("Crawling data...")
# Logic xử lý chính
return {"records": 100}

def on_complete(self, result):
logger.info(f"Crawled {result['records']} records")

def on_error(self, error: Exception):
logger.error(f"Crawl failed: {error}")

Đăng ký trong app/workers/schedules/__init__.py:

from app.workers.schedules.my_crawler import DailyDataCrawler

2. @websocket — WebSocket Worker

Vị trí: app/core/decorators/websocket/

Tham Số

Tham sốKiểuMặc địnhMô tả
idstrbắt buộcID duy nhất
namestrbắt buộcTên hiển thị
urlstrbắt buộcWebSocket URL kết nối
restart_policystr"none""none" hoặc "exponential_backoff"
restart_max_attemptsint0Số lần reconnect tối đa (0 = không giới hạn)

Cách Hoạt Động

  1. Decorator time: đăng ký vào registry, patch __init__ để đảm bảo self.client = None
  2. Worker boot (worker_ready signal): start tất cả enabled WebSocket workers trong daemon threads
  3. Runtime: WsClient.run_forever() chạy trong thread, gọi callbacks khi có sự kiện

Restart Policies

PolicyMô tả
noneKhông reconnect khi disconnect
exponential_backoffReconnect với backoff: 5s → 10s → 20s → ... → 300s max

Status Lifecycle

STOP ──(start)──→ RUNNING ──(intentional stop)──→ STOP
└──(unexpected error)──→ ERROR

Code Mẫu

# app/workers/websockets/my_ws_listener.py
from app.core.decorators.websocket import websocket
from app.core.structures.websocket_base import WebSocketBaseWorker
from app.plugins.logging.logger import logger

@websocket(
id="ws_market_data",
name="Market Data WebSocket",
url="wss://stream.example.com/ws",
restart_policy="exponential_backoff",
restart_max_attempts=5,
)
class MarketDataWs(WebSocketBaseWorker):
def on_open(self):
logger.info("Connected! Subscribing...")
self.client.send({"action": "subscribe", "channel": "market"})

def on_message(self, message):
logger.debug(f"Received: {message}")
# Xử lý dữ liệu realtime

def on_error(self, error: Exception):
logger.error(f"WS error: {error}")

def on_close(self):
logger.info("Connection closed")

Đăng ký trong app/workers/websockets/__init__.py:

from app.workers.websockets.my_ws_listener import MarketDataWs

Sử Dụng self.client

self.client là instance WsClient, được inject tự động khi controller start worker:

def on_open(self):
# Gửi dữ liệu
self.client.send({"action": "subscribe"})
self.client.send("plain text message")

3. @database — Database Connection

Vị trí: app/core/decorators/database/

Tham Số

Tham sốKiểuMặc địnhMô tả
idstrbắt buộcID server (ví dụ: "simplize_mysql_dev")
namestrbắt buộcTên hiển thị
engineDatabaseEnginebắt buộcDatabaseEngine.MYSQL hoặc DatabaseEngine.POSTGRESQL
hoststrbắt buộcDatabase host
portintbắt buộcDatabase port
userstrbắt buộcDatabase user
passwordstrbắt buộcDatabase password
auto_syncboolFalseTự động tạo database và sync schema

Injected Classmethods

Decorator tự động inject 6 classmethods vào class được decorate:

MethodMô tả
get_engine(db_name)Tạo/cache SQLAlchemy Engine
create_session(db_name)Tạo Session mới
execute_sql(db_name, sql, params)Chạy raw SQL statement
test_connection(db_name)Test kết nối, trả về bool
create_tables(db_name)Tạo tables từ Base.metadata
dispose(db_name)Dispose engine (None = dispose tất cả)

auto_sync Mode

Khi auto_sync=True:

  • Tự động tạo database nếu chưa tồn tại
  • Tự động tạo tables/columns từ SQLAlchemy models khi get_engine() được gọi lần đầu

Code Mẫu

# app/database/my_server/connection.py
from sqlalchemy.orm import declarative_base

from app.config import settings
from app.core.decorators import database
from app.core.orchestrators.database.strategy import DatabaseEngine

@database(
id="my_mysql_server",
name="My MySQL Server",
engine=DatabaseEngine.MYSQL,
host=settings.my_mysql_host,
port=settings.my_mysql_port,
user=settings.my_mysql_user,
password=settings.my_mysql_password,
auto_sync=True,
)
class MyMysqlConnection:
Base = declarative_base()

Base = MyMysqlConnection.Base

Sử dụng trực tiếp:

from app.database.my_server.connection import MyMysqlConnection

# Test kết nối
ok = MyMysqlConnection.test_connection("my_database")

# Chạy raw SQL
result = MyMysqlConnection.execute_sql("my_database", "SELECT COUNT(*) FROM users")

# Tạo tables
MyMysqlConnection.create_tables("my_database")

Chi tiết về cách tổ chức database → xem Database.