Chuyển tới nội dung chính

Workers

Vị trí: app/workers/ — nơi đăng ký và implement các worker chạy tự động.

Có 2 loại worker:

  • Schedule Workers (app/workers/schedules/) — chạy theo lịch trình cron
  • WebSocket Workers (app/workers/websockets/) — duy trì kết nối realtime liên tục

1. Schedule Workers

Base Class

Vị trí: app/core/structures/schedule_base.py

class ScheduleBaseWorker:
def run(self):
"""Logic xử lý chính. Giá trị trả về lưu vào last_result."""
pass

def on_complete(self, result):
"""Gọi sau khi run() thành công. result = giá trị trả về của run()."""
pass

def on_error(self, error: Exception):
"""Gọi khi run() raise exception."""
pass

Lifecycle

1. Cron trigger (Beat) hoặc Manual run (API)
2. Kiểm tra status: nếu DISABLE + cron trigger → skip
3. Acquire distributed lock (nếu lock=True)
4. Set status → RUNNING, lưu pre_run_status
5. Gọi instance.run()
6. Thành công → on_complete(result), status → pre_run_status, lưu last_result
7. Lỗi → on_error(error), status → pre_run_status, lưu error
8. Finally: release lock, sync status về DB

Danh Sách Workers Hiện Có

IDTênCronLockMô tả
test_scheduleTest Schedule Worker* * * * *NoDùng để test hệ thống
crawl_funds_fmarket_eodFunds FMarket EOD Crawl0 0 * * *Yes (300s)Cào dữ liệu quỹ FMarket cuối ngày
crawl_fcafe_news_dailyFCAFE News Daily Crawl0 0 1 * *Yes (300s)Cào tin tức CafeF

Thêm Schedule Worker Mới

Bước 1: Tạo file worker

# app/workers/schedules/my_new_schedule.py
from app.core.decorators.schedule import schedule
from app.core.structures.schedule_base import ScheduleBaseWorker
from app.plugins.logging.logger import logger

@schedule(
id="my_new_schedule",
name="My New Schedule",
cron="0 9 * * *", # Chạy mỗi ngày lúc 9:00
lock=True, # Dùng distributed lock
lock_ttl=300, # Lock tối đa 5 phút
)
class MyNewSchedule(ScheduleBaseWorker):
def run(self):
logger.info("Starting my schedule...")
# Gọi processor hoặc xử lý trực tiếp
result = self._process_data()
return result

def on_complete(self, result):
logger.success(f"Completed: {result}")

def on_error(self, error: Exception):
logger.error(f"Failed: {error}")

def _process_data(self):
# Logic nghiệp vụ
return {"status": "success", "records": 50}

Bước 2: Đăng ký import

Thêm vào app/workers/schedules/__init__.py:

from app.workers.schedules.my_new_schedule import MyNewSchedule

Bước 3: Restart processes

Worker mới sẽ tự động được seed vào DB và sync lên Redis khi API/Worker/Beat khởi động lại.

Ví Dụ Thực Tế: funds_fmarket_schedule.py

@schedule(
id="crawl_funds_fmarket_eod",
name="Funds FMarket EOD Crawl",
cron="0 0 * * *",
lock=True,
lock_ttl=300,
)
class FundsFmarketEodSchedule(ScheduleBaseWorker):
def __init__(self):
super().__init__()
self.processor = FmarketEodProcessor()

def run(self):
self.processor.run()

Pattern: Worker khởi tạo Processor, gọi processor.run() trong run(). Logic nghiệp vụ nằm trong module processor, không trong worker.


2. WebSocket Workers

Base Class

Vị trí: app/core/structures/websocket_base.py

class WebSocketBaseWorker:
client: WsClient | None # Inject bởi controller khi start

def __init__(self):
self.client = None

def on_open(self):
"""Gọi khi kết nối WebSocket thành công."""
pass

def on_message(self, message: str):
"""Gọi khi nhận message. message đã được parse JSON nếu có thể."""
pass

def on_error(self, error: Exception):
"""Gọi khi có lỗi kết nối."""
pass

def on_close(self):
"""Gọi khi kết nối bị đóng."""
pass

Lifecycle

1. Worker process boot → worker_ready signal
2. start_all_websockets() → cho mỗi worker: spawn daemon thread
3. Thread chạy WsClient.run_forever()
4. on_open() → on_message() loop → on_close() hoặc on_error()
5. Nếu auto_reconnect: exponential backoff → reconnect
6. Status updates → Redis hashes
7. Worker shutdown → stop_all_websockets()

Sử Dụng self.client

self.client là instance WsClient, được inject tự động. Dùng để gửi dữ liệu:

def on_open(self):
# Gửi dict → auto-serialize JSON
self.client.send({"action": "subscribe", "symbols": ["VNM", "VIC"]})

# Gửi text
self.client.send("ping")

Lưu ý: self.client chỉ có giá trị sau khi worker được start bởi controller. Trong test hay ngoài runtime, self.client = None.

Danh Sách Workers Hiện Có

IDTênURLRestart PolicyMax Attempts
test_wsTest WebSocket Workerws://host.docker.internal:9100none
ws_vcbs_quote_tickVCBs Quote Tickwss://wsc.vcsc.com.vn/wss/vcbs/quote/tickexponential_backoff3

Thêm WebSocket Worker Mới

Bước 1: Tạo file worker

# app/workers/websockets/my_ws_worker.py
from app.core.decorators.websocket import websocket
from app.core.structures.websocket_base import WebSocketBaseWorker
from app.plugins.logging.logger import logger

@websocket(
id="ws_my_data_stream",
name="My Data Stream WebSocket",
url="wss://stream.example.com/ws",
restart_policy="exponential_backoff",
restart_max_attempts=5,
)
class MyDataStreamWs(WebSocketBaseWorker):
def on_open(self):
logger.info("Connected to data stream")
self.client.send({"subscribe": ["channel_1", "channel_2"]})

def on_message(self, message):
# message đã được parse JSON nếu có thể
if isinstance(message, dict):
self._process_data(message)
else:
logger.debug(f"Raw message: {message}")

def on_error(self, error: Exception):
logger.error(f"Stream error: {error}")

def on_close(self):
logger.info("Stream disconnected")

def _process_data(self, data):
# Logic xử lý dữ liệu realtime
pass

Bước 2: Đăng ký import

Thêm vào app/workers/websockets/__init__.py:

from app.workers.websockets.my_ws_worker import MyDataStreamWs

Bước 3: Restart Worker process

Worker mới sẽ tự động start khi Worker process khởi động lại.


3. Quy Tắc Đăng Ký

  • Worker class phải được import trong __init__.py tương ứng:
    • Schedule: app/workers/schedules/__init__.py
    • WebSocket: app/workers/websockets/__init__.py
  • Import trigger decorator → đăng ký vào registry → hệ thống nhận biết worker
  • Nếu không import → worker không được đăng ký → không xuất hiện trong API/Beat

Điều Khiển Workers

Workers được điều khiển qua API — xem API Control Guide.