Tổng Quan Dự Án
Giới Thiệu
Crawler Worker Management System là hệ thống thu thập dữ liệu tự động, hỗ trợ hai kiểu worker:
- Schedule Workers — chạy theo lịch trình cron (ví dụ: cào dữ liệu quỹ cuối ngày, tin tức hàng giờ).
- WebSocket Workers — duy trì kết nối realtime liên tục (ví dụ: nhận dữ liệu khớp lệnh chứng khoán).
Hệ thống cung cấp REST API để giám sát, điều khiển workers và quản lý cấu hình, phục vụ tích hợp vào admin dashboard.
Timezone: Asia/Ho_Chi_Minh
GitHub: simplize-vn/simplize_data_ingestion
Tech Stack
| Thành phần | Công nghệ |
|---|---|
| Ngôn ngữ | Python 3.12 |
| API Server | FastAPI + Uvicorn |
| Task Queue | Celery 5.3+ (broker & backend: Redis) |
| Database ORM | SQLAlchemy 2.0 |
| Database Drivers | PyMySQL (MySQL), psycopg2 (PostgreSQL) |
| Validation | Pydantic v2 |
| HTTP Client | curl_cffi (browser fingerprinting) |
| WebSocket Client | websocket-client |
| Logging | Loguru |
| Containerization | Docker Compose |
Mô Hình 3-Process
Hệ thống hoạt động với 3 process chạy đồng thời:
1. API Process (run_api.py)
- Chạy FastAPI server trên port 8686
- Cổng giao tiếp duy nhất cho Frontend/Admin
- Startup: khởi tạo log writer → load config cache → seed workers → sync DB→Redis
2. Worker Process (run_worker.py)
- Chạy Celery worker (solo pool) để thực thi schedule tasks
- Quản lý các WebSocket listener threads
- Startup: clear Redis → load config → seed workers → sync DB→Redis → start tất cả WebSocket workers
3. Beat Process (run_beat.py)
- Chạy Celery Beat scheduler để trigger các cron jobs
- Startup: clear Redis → load config → sync DB→Redis → import schedule workers → chạy scheduler loop
Yêu Cầu Hệ Thống
Trước khi chạy dịch vụ, cần cấu hình các thành phần sau trong file .env:
Redis
Dùng làm Celery broker, backend và lưu trạng thái vận hành của worker.
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
MySQL (Core DB)
Lưu cấu hình worker, trạng thái bền vững và execution log.
CORE_DB_HOST=localhost
CORE_DB_PORT=3306
CORE_DB_NAME=simplize_core
CORE_DB_USER=root
CORE_DB_PASSWORD=
Cách Chạy
Option 1: Virtual Environment (Local Development)
python -m venv venv
source venv/bin/activate # Linux/macOS
# venv\Scripts\activate # Windows
pip install -r requirements.txt
Chạy 3 terminal đồng thời:
python run_api.py # API server (port 8686, Swagger tại /docs)
python run_worker.py # Celery worker
python run_beat.py # Celery Beat scheduler
Test mode (không cần worker/beat riêng):
python main.py # Eager mode — task chạy đồng bộ
Option 2: Docker Compose
docker-compose up -d
Tạo 3 container: data-ingestion-api, data-ingestion-celery-worker, data-ingestion-celery-beat.
Cấu Trúc Thư Mục
app/
├── config/ # Cấu hình tập trung (settings.py, đọc .env)
├── core/
│ ├── database/ # Core MySQL: models & repositories nội bộ
│ ├── decorators/ # @schedule, @websocket, @database decorators
│ ├── monitors/ # FastAPI app + routes
│ ├── orchestrators/ # Celery app + Database orchestrator
│ ├── services/ # config_service, sync_service, log_service
│ └── structures/ # Base classes (ScheduleBaseWorker, WebSocketBaseWorker, IRepositoryBase)
├── database/ # User databases (server/db/models/repositories)
├── modules/ # Domain logic (sources, processors, schemas, helpers)
├── plugins/ # Utilities tái sử dụng (logging, redis, http_client, websocket)
└── workers/
├── schedules/ # Schedule worker implementations
└── websockets/ # WebSocket worker implementations
Các Khái Niệm Cốt Lõi
Decorator Self-Registration Pattern
Workers và databases tự đăng ký qua decorators. Chỉ cần decorate class và import trong __init__.py:
@schedule→ tạo Celery task, quản lý status trong Redis@websocket→ đăng ký vào registry, start listener thread khi worker boot@database→ inject classmethods (get_engine,create_session,execute_sql,...)
State Management
- Redis = nguồn trạng thái vận hành (worker status, task_id, last_result)
- MySQL (core) = nguồn trạng thái bền vững (config, worker_status, execution_log)
- Khi khởi động, dữ liệu từ DB được sync lên Redis
Config Priority Chain
Khi đọc config, hệ thống tìm theo thứ tự ưu tiên:
- Config riêng worker (worker_id cụ thể) — cao nhất
- Config chung (worker_id = null)
- Biến môi trường (
os.getenv) settings.py— thấp nhất
Non-Blocking Async Log Writer
Mọi log đều được ghi đồng thời vào loguru (console/file) và enqueue vào background thread để batch write xuống bảng execution_log.
Tài Liệu Chi Tiết
| Tài liệu | Mô tả |
|---|---|
| Config | Cấu hình tập trung, biến môi trường, config priority chain |
| Decorators | Hệ thống 3 decorator: @schedule, @websocket, @database |
| Database | Kiến trúc multi-database, UnitOfWork, IRepositoryBase |
| Workers | Schedule & WebSocket workers, hướng dẫn thêm mới |
| Modules | Domain logic: sources, processors, schemas |
| Plugins | Plugins tiện ích: logging, redis, http_client, websocket |
| Services & Orchestrators | Core services & orchestrators |
| API Control Guide | Hướng dẫn tích hợp API cho admin dashboard |