Services & Orchestrators
Lớp "plumbing" kết nối decorators, database, Redis, và Celery. Các services được gọi tự động trong startup sequence và bởi controllers.
1. Config Service
Vị trí: app/core/services/config_service.py
Nhiệm Vụ
Quản lý config key-value với in-memory cache và priority chain (worker DB → global DB → env → settings → default).
Cache
| Cache | Mô tả |
|---|---|
_global_cache | Dict[str, str] — config với worker_id=NULL |
_worker_cache | Dict[str, Dict[str, str]] — config theo worker_id |
_cache_loaded | bool — đã load từ DB chưa |
Cache được load tại startup qua load_config_cache() và cập nhật realtime khi set_config() / delete_config().
Các Hàm
| Hàm | Mô tả |
|---|---|
load_config_cache() | Load tất cả config từ DB vào cache. Gọi lúc startup |
get_config(key, default, worker_id) | Lấy config theo priority chain |
get_db_config(key, worker_id) | Lấy config chỉ từ DB cache (không fallback env/settings) |
get_all_db_configs_for_worker(worker_id) | Lấy tất cả config DB của một worker |
set_config(key, value, description, worker_id) | Tạo/cập nhật config trong DB + cache |
delete_config(key, worker_id) | Xoá config khỏi DB + cache |
get_all_configs(worker_id) | Lấy tất cả configs, tuỳ chọn filter theo worker_id |
2. Sync Service
Vị trí: app/core/services/sync_service.py
Nhiệm Vụ
Đồng bộ trạng thái worker giữa DB (nguồn bền vững) và Redis (nguồn vận hành).
Các Hàm
| Hàm | Mô tả |
|---|---|
clear_worker_redis_keys() | Xoá tất cả worker keys trong Redis (SCAN-based, non-blocking) |
seed_workers_from_registries() | Quét workers đăng ký qua decorator → tạo record mới trong DB nếu chưa có |
sync_db_to_redis() | Đọc worker_status từ DB → push lên Redis. Schedules: status = IDLE/DISABLE. WebSockets: reset về STOP |
on_worker_state_changed(worker_id, worker_type, **changes) | Cập nhật DB khi worker enable/disable, sync lên Redis |
_update_celery_beat(schedule_id, cron_expr) | Thêm/cập nhật schedule trong Celery beat_schedule |
_remove_celery_beat(schedule_id) | Xoá schedule khỏi Celery beat_schedule |
Sync Points
| Thời điểm | Hành động |
|---|---|
| API startup | seed → sync |
| Worker ready | clear Redis → seed → sync → start all WebSockets |
| Beat startup | clear Redis → sync |
| Manual API | POST /sync/db-to-redis, POST /sync/seed-workers, POST /sync/full |
| Worker enable/disable | on_worker_state_changed() |
Quy Tắc Sync
- Cron và URL luôn lấy từ code (decorator) — DB không override
- Chỉ status (enabled/disabled) được lấy từ DB
- WebSocket status luôn reset về
STOPkhi sync (trạng thái runtime do controller quản lý)
3. Log Service
Vị trí: app/core/services/log_service.py
Nhiệm Vụ
Ghi execution log vào DB qua background thread non-blocking.
Cơ Chế
logger.info() → _WrappedLogger._log()
├── loguru (console/file)
└── _enqueue(entry) → _log_queue
Background thread (_writer_loop):
drain queue → batch ≤50 items hoặc 2s timeout → _flush_batch() → DB
Thông Số
| Thông số | Giá trị |
|---|---|
| Queue size tối đa | 10,000 entries |
| Batch size tối đa | 50 entries |
| Flush interval | 2 giây |
| Hành vi khi queue đầy | Drop (non-blocking) |
| Thread | Daemon thread tên log-writer |
Các Hàm
| Hàm | Mô tả |
|---|---|
start_log_writer() | Start background thread. Gọi lúc startup |
stop_log_writer() | Dừng thread, chờ tối đa 5s |
_enqueue(entry) | Đưa log entry vào queue (gọi bởi _WrappedLogger) |
_flush_batch(batch) | Ghi batch vào DB qua ExecutionLogRepository.create_many() |
4. Celery Orchestrator
Vị trí: app/core/orchestrators/celery/
Celery App (app.py)
celery_app = Celery("crawler", broker=redis_url, backend=redis_url)
| Config | Giá trị |
|---|---|
task_serializer | json |
timezone | Asia/Ho_Chi_Minh |
enable_utc | True |
task_track_started | True |
worker_hijack_root_logger | False |
beat_schedule | {} (populated by sync_service) |
Autodiscovery: celery_app.autodiscover_tasks(["app.workers.schedules"])
Include: app.core.decorators.websocket.controller (cho ws_start_task / ws_stop_task)
Worker Signals (signals.py)
@worker_ready — Boot Sequence
- Clear stale Redis keys
- Import core database (trigger
@databaseregistration) - Start log writer
- Load config cache
- Import WebSocket workers (trigger
@websocketregistration) - Seed new workers vào DB
- Sync DB → Redis
- Start tất cả enabled WebSocket workers (spawn daemon threads)
@worker_shutdown
- Stop tất cả WebSocket workers
- Stop log writer
5. Database Orchestrator
Vị trí: app/core/orchestrators/database/
Thành Phần
| File | Class | Mô tả |
|---|---|---|
orchestrator.py | DatabaseOrchestrator | Facade cấp cao — get_unit_of_work(server, db) |
unit_of_work.py | UnitOfWork | Quản lý session + repository access |
factory.py | SessionFactory | Tạo và cache DatabaseAdapter per (server, db) |
adapter.py | DatabaseAdapter | Wraps SQLAlchemy Engine + SessionMaker, hỗ trợ auto_sync |
strategy.py | DatabaseStrategy | Build URL + engine options theo engine type |
registry.py | — | Global SessionFactory instance, auto-populated từ @database registry |
Luồng Hoạt Động
DatabaseOrchestrator.get_unit_of_work("server", "db")
→ SessionFactory.create_session("server", "db")
→ DatabaseAdapter (cached per server+db)
→ SQLAlchemy Engine (lazy init, auto_sync nếu bật)
→ Session
→ UnitOfWork(session)
→ get_repository(MyRepo) → cached repo instance
→ commit() / rollback() / close()
Chi tiết cách sử dụng → xem Database.