Chuyển tới nội dung chính

Tổng Quan Dự Án

Giới Thiệu

Crawler Worker Management System là hệ thống thu thập dữ liệu tự động, hỗ trợ hai kiểu worker:

  • Schedule Workers — chạy theo lịch trình cron (ví dụ: cào dữ liệu quỹ cuối ngày, tin tức hàng giờ).
  • WebSocket Workers — duy trì kết nối realtime liên tục (ví dụ: nhận dữ liệu khớp lệnh chứng khoán).

Hệ thống cung cấp REST API để giám sát, điều khiển workers và quản lý cấu hình, phục vụ tích hợp vào admin dashboard.

Timezone: Asia/Ho_Chi_Minh

GitHub: simplize-vn/simplize_data_ingestion


Tech Stack

Thành phầnCông nghệ
Ngôn ngữPython 3.12
API ServerFastAPI + Uvicorn
Task QueueCelery 5.3+ (broker & backend: Redis)
Database ORMSQLAlchemy 2.0
Database DriversPyMySQL (MySQL), psycopg2 (PostgreSQL)
ValidationPydantic v2
HTTP Clientcurl_cffi (browser fingerprinting)
WebSocket Clientwebsocket-client
LoggingLoguru
ContainerizationDocker Compose

Mô Hình 3-Process

Hệ thống hoạt động với 3 process chạy đồng thời:

1. API Process (run_api.py)

  • Chạy FastAPI server trên port 8686
  • Cổng giao tiếp duy nhất cho Frontend/Admin
  • Startup: khởi tạo log writer → load config cache → seed workers → sync DB→Redis

2. Worker Process (run_worker.py)

  • Chạy Celery worker (solo pool) để thực thi schedule tasks
  • Quản lý các WebSocket listener threads
  • Startup: clear Redis → load config → seed workers → sync DB→Redis → start tất cả WebSocket workers

3. Beat Process (run_beat.py)

  • Chạy Celery Beat scheduler để trigger các cron jobs
  • Startup: clear Redis → load config → sync DB→Redis → import schedule workers → chạy scheduler loop

Yêu Cầu Hệ Thống

Trước khi chạy dịch vụ, cần cấu hình các thành phần sau trong file .env:

Redis

Dùng làm Celery broker, backend và lưu trạng thái vận hành của worker.

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0

MySQL (Core DB)

Lưu cấu hình worker, trạng thái bền vững và execution log.

CORE_DB_HOST=localhost
CORE_DB_PORT=3306
CORE_DB_NAME=simplize_core
CORE_DB_USER=root
CORE_DB_PASSWORD=

Cách Chạy

Option 1: Virtual Environment (Local Development)

python -m venv venv
source venv/bin/activate # Linux/macOS
# venv\Scripts\activate # Windows

pip install -r requirements.txt

Chạy 3 terminal đồng thời:

python run_api.py       # API server (port 8686, Swagger tại /docs)
python run_worker.py # Celery worker
python run_beat.py # Celery Beat scheduler

Test mode (không cần worker/beat riêng):

python main.py          # Eager mode — task chạy đồng bộ

Option 2: Docker Compose

docker-compose up -d

Tạo 3 container: data-ingestion-api, data-ingestion-celery-worker, data-ingestion-celery-beat.


Cấu Trúc Thư Mục

app/
├── config/ # Cấu hình tập trung (settings.py, đọc .env)
├── core/
│ ├── database/ # Core MySQL: models & repositories nội bộ
│ ├── decorators/ # @schedule, @websocket, @database decorators
│ ├── monitors/ # FastAPI app + routes
│ ├── orchestrators/ # Celery app + Database orchestrator
│ ├── services/ # config_service, sync_service, log_service
│ └── structures/ # Base classes (ScheduleBaseWorker, WebSocketBaseWorker, IRepositoryBase)
├── database/ # User databases (server/db/models/repositories)
├── modules/ # Domain logic (sources, processors, schemas, helpers)
├── plugins/ # Utilities tái sử dụng (logging, redis, http_client, websocket)
└── workers/
├── schedules/ # Schedule worker implementations
└── websockets/ # WebSocket worker implementations

Các Khái Niệm Cốt Lõi

Decorator Self-Registration Pattern

Workers và databases tự đăng ký qua decorators. Chỉ cần decorate class và import trong __init__.py:

  • @schedule → tạo Celery task, quản lý status trong Redis
  • @websocket → đăng ký vào registry, start listener thread khi worker boot
  • @database → inject classmethods (get_engine, create_session, execute_sql,...)

State Management

  • Redis = nguồn trạng thái vận hành (worker status, task_id, last_result)
  • MySQL (core) = nguồn trạng thái bền vững (config, worker_status, execution_log)
  • Khi khởi động, dữ liệu từ DB được sync lên Redis

Config Priority Chain

Khi đọc config, hệ thống tìm theo thứ tự ưu tiên:

  1. Config riêng worker (worker_id cụ thể) — cao nhất
  2. Config chung (worker_id = null)
  3. Biến môi trường (os.getenv)
  4. settings.py — thấp nhất

Non-Blocking Async Log Writer

Mọi log đều được ghi đồng thời vào loguru (console/file) và enqueue vào background thread để batch write xuống bảng execution_log.


Tài Liệu Chi Tiết

Tài liệuMô tả
ConfigCấu hình tập trung, biến môi trường, config priority chain
DecoratorsHệ thống 3 decorator: @schedule, @websocket, @database
DatabaseKiến trúc multi-database, UnitOfWork, IRepositoryBase
WorkersSchedule & WebSocket workers, hướng dẫn thêm mới
ModulesDomain logic: sources, processors, schemas
PluginsPlugins tiện ích: logging, redis, http_client, websocket
Services & OrchestratorsCore services & orchestrators
API Control GuideHướng dẫn tích hợp API cho admin dashboard