Workers
Vị trí: app/workers/ — nơi đăng ký và implement các worker chạy tự động.
Có 2 loại worker:
- Schedule Workers (
app/workers/schedules/) — chạy theo lịch trình cron - WebSocket Workers (
app/workers/websockets/) — duy trì kết nối realtime liên tục
1. Schedule Workers
Base Class
Vị trí: app/core/structures/schedule_base.py
class ScheduleBaseWorker:
def run(self):
"""Logic xử lý chính. Giá trị trả về lưu vào last_result."""
pass
def on_complete(self, result):
"""Gọi sau khi run() thành công. result = giá trị trả về của run()."""
pass
def on_error(self, error: Exception):
"""Gọi khi run() raise exception."""
pass
Lifecycle
1. Cron trigger (Beat) hoặc Manual run (API)
2. Kiểm tra status: nếu DISABLE + cron trigger → skip
3. Acquire distributed lock (nếu lock=True)
4. Set status → RUNNING, lưu pre_run_status
5. Gọi instance.run()
6. Thành công → on_complete(result), status → pre_run_status, lưu last_result
7. Lỗi → on_error(error), status → pre_run_status, lưu error
8. Finally: release lock, sync status về DB
Danh Sách Workers Hiện Có
| ID | Tên | Cron | Lock | Mô tả |
|---|---|---|---|---|
test_schedule | Test Schedule Worker | * * * * * | No | Dùng để test hệ thống |
crawl_funds_fmarket_eod | Funds FMarket EOD Crawl | 0 0 * * * | Yes (300s) | Cào dữ liệu quỹ FMarket cuối ngày |
crawl_fcafe_news_daily | FCAFE News Daily Crawl | 0 0 1 * * | Yes (300s) | Cào tin tức CafeF |
Thêm Schedule Worker Mới
Bước 1: Tạo file worker
# app/workers/schedules/my_new_schedule.py
from app.core.decorators.schedule import schedule
from app.core.structures.schedule_base import ScheduleBaseWorker
from app.plugins.logging.logger import logger
@schedule(
id="my_new_schedule",
name="My New Schedule",
cron="0 9 * * *", # Chạy mỗi ngày lúc 9:00
lock=True, # Dùng distributed lock
lock_ttl=300, # Lock tối đa 5 phút
)
class MyNewSchedule(ScheduleBaseWorker):
def run(self):
logger.info("Starting my schedule...")
# Gọi processor hoặc xử lý trực tiếp
result = self._process_data()
return result
def on_complete(self, result):
logger.success(f"Completed: {result}")
def on_error(self, error: Exception):
logger.error(f"Failed: {error}")
def _process_data(self):
# Logic nghiệp vụ
return {"status": "success", "records": 50}
Bước 2: Đăng ký import
Thêm vào app/workers/schedules/__init__.py:
from app.workers.schedules.my_new_schedule import MyNewSchedule
Bước 3: Restart processes
Worker mới sẽ tự động được seed vào DB và sync lên Redis khi API/Worker/Beat khởi động lại.
Ví Dụ Thực Tế: funds_fmarket_schedule.py
@schedule(
id="crawl_funds_fmarket_eod",
name="Funds FMarket EOD Crawl",
cron="0 0 * * *",
lock=True,
lock_ttl=300,
)
class FundsFmarketEodSchedule(ScheduleBaseWorker):
def __init__(self):
super().__init__()
self.processor = FmarketEodProcessor()
def run(self):
self.processor.run()
Pattern: Worker khởi tạo Processor, gọi processor.run() trong run(). Logic nghiệp vụ nằm trong module processor, không trong worker.
2. WebSocket Workers
Base Class
Vị trí: app/core/structures/websocket_base.py
class WebSocketBaseWorker:
client: WsClient | None # Inject bởi controller khi start
def __init__(self):
self.client = None
def on_open(self):
"""Gọi khi kết nối WebSocket thành công."""
pass
def on_message(self, message: str):
"""Gọi khi nhận message. message đã được parse JSON nếu có thể."""
pass
def on_error(self, error: Exception):
"""Gọi khi có lỗi kết nối."""
pass
def on_close(self):
"""Gọi khi kết nối bị đóng."""
pass
Lifecycle
1. Worker process boot → worker_ready signal
2. start_all_websockets() → cho mỗi worker: spawn daemon thread
3. Thread chạy WsClient.run_forever()
4. on_open() → on_message() loop → on_close() hoặc on_error()
5. Nếu auto_reconnect: exponential backoff → reconnect
6. Status updates → Redis hashes
7. Worker shutdown → stop_all_websockets()
Sử Dụng self.client
self.client là instance WsClient, được inject tự động. Dùng để gửi dữ liệu:
def on_open(self):
# Gửi dict → auto-serialize JSON
self.client.send({"action": "subscribe", "symbols": ["VNM", "VIC"]})
# Gửi text
self.client.send("ping")
Lưu ý: self.client chỉ có giá trị sau khi worker được start bởi controller. Trong test hay ngoài runtime, self.client = None.
Danh Sách Workers Hiện Có
| ID | Tên | URL | Restart Policy | Max Attempts |
|---|---|---|---|---|
test_ws | Test WebSocket Worker | ws://host.docker.internal:9100 | none | — |
ws_vcbs_quote_tick | VCBs Quote Tick | wss://wsc.vcsc.com.vn/wss/vcbs/quote/tick | exponential_backoff | 3 |
Thêm WebSocket Worker Mới
Bước 1: Tạo file worker
# app/workers/websockets/my_ws_worker.py
from app.core.decorators.websocket import websocket
from app.core.structures.websocket_base import WebSocketBaseWorker
from app.plugins.logging.logger import logger
@websocket(
id="ws_my_data_stream",
name="My Data Stream WebSocket",
url="wss://stream.example.com/ws",
restart_policy="exponential_backoff",
restart_max_attempts=5,
)
class MyDataStreamWs(WebSocketBaseWorker):
def on_open(self):
logger.info("Connected to data stream")
self.client.send({"subscribe": ["channel_1", "channel_2"]})
def on_message(self, message):
# message đã được parse JSON nếu có thể
if isinstance(message, dict):
self._process_data(message)
else:
logger.debug(f"Raw message: {message}")
def on_error(self, error: Exception):
logger.error(f"Stream error: {error}")
def on_close(self):
logger.info("Stream disconnected")
def _process_data(self, data):
# Logic xử lý dữ liệu realtime
pass
Bước 2: Đăng ký import
Thêm vào app/workers/websockets/__init__.py:
from app.workers.websockets.my_ws_worker import MyDataStreamWs
Bước 3: Restart Worker process
Worker mới sẽ tự động start khi Worker process khởi động lại.
3. Quy Tắc Đăng Ký
- Worker class phải được import trong
__init__.pytương ứng:- Schedule:
app/workers/schedules/__init__.py - WebSocket:
app/workers/websockets/__init__.py
- Schedule:
- Import trigger decorator → đăng ký vào registry → hệ thống nhận biết worker
- Nếu không import → worker không được đăng ký → không xuất hiện trong API/Beat
Điều Khiển Workers
Workers được điều khiển qua API — xem API Control Guide.