Chuyển tới nội dung chính

Services & Orchestrators

Lớp "plumbing" kết nối decorators, database, Redis, và Celery. Các services được gọi tự động trong startup sequence và bởi controllers.


1. Config Service

Vị trí: app/core/services/config_service.py

Nhiệm Vụ

Quản lý config key-value với in-memory cache và priority chain (worker DB → global DB → env → settings → default).

Cache

CacheMô tả
_global_cacheDict[str, str] — config với worker_id=NULL
_worker_cacheDict[str, Dict[str, str]] — config theo worker_id
_cache_loadedbool — đã load từ DB chưa

Cache được load tại startup qua load_config_cache() và cập nhật realtime khi set_config() / delete_config().

Các Hàm

HàmMô tả
load_config_cache()Load tất cả config từ DB vào cache. Gọi lúc startup
get_config(key, default, worker_id)Lấy config theo priority chain
get_db_config(key, worker_id)Lấy config chỉ từ DB cache (không fallback env/settings)
get_all_db_configs_for_worker(worker_id)Lấy tất cả config DB của một worker
set_config(key, value, description, worker_id)Tạo/cập nhật config trong DB + cache
delete_config(key, worker_id)Xoá config khỏi DB + cache
get_all_configs(worker_id)Lấy tất cả configs, tuỳ chọn filter theo worker_id

2. Sync Service

Vị trí: app/core/services/sync_service.py

Nhiệm Vụ

Đồng bộ trạng thái worker giữa DB (nguồn bền vững) và Redis (nguồn vận hành).

Các Hàm

HàmMô tả
clear_worker_redis_keys()Xoá tất cả worker keys trong Redis (SCAN-based, non-blocking)
seed_workers_from_registries()Quét workers đăng ký qua decorator → tạo record mới trong DB nếu chưa có
sync_db_to_redis()Đọc worker_status từ DB → push lên Redis. Schedules: status = IDLE/DISABLE. WebSockets: reset về STOP
on_worker_state_changed(worker_id, worker_type, **changes)Cập nhật DB khi worker enable/disable, sync lên Redis
_update_celery_beat(schedule_id, cron_expr)Thêm/cập nhật schedule trong Celery beat_schedule
_remove_celery_beat(schedule_id)Xoá schedule khỏi Celery beat_schedule

Sync Points

Thời điểmHành động
API startupseed → sync
Worker readyclear Redis → seed → sync → start all WebSockets
Beat startupclear Redis → sync
Manual APIPOST /sync/db-to-redis, POST /sync/seed-workers, POST /sync/full
Worker enable/disableon_worker_state_changed()

Quy Tắc Sync

  • CronURL luôn lấy từ code (decorator) — DB không override
  • Chỉ status (enabled/disabled) được lấy từ DB
  • WebSocket status luôn reset về STOP khi sync (trạng thái runtime do controller quản lý)

3. Log Service

Vị trí: app/core/services/log_service.py

Nhiệm Vụ

Ghi execution log vào DB qua background thread non-blocking.

Cơ Chế

logger.info() → _WrappedLogger._log()
├── loguru (console/file)
└── _enqueue(entry) → _log_queue

Background thread (_writer_loop):
drain queue → batch ≤50 items hoặc 2s timeout → _flush_batch() → DB

Thông Số

Thông sốGiá trị
Queue size tối đa10,000 entries
Batch size tối đa50 entries
Flush interval2 giây
Hành vi khi queue đầyDrop (non-blocking)
ThreadDaemon thread tên log-writer

Các Hàm

HàmMô tả
start_log_writer()Start background thread. Gọi lúc startup
stop_log_writer()Dừng thread, chờ tối đa 5s
_enqueue(entry)Đưa log entry vào queue (gọi bởi _WrappedLogger)
_flush_batch(batch)Ghi batch vào DB qua ExecutionLogRepository.create_many()

4. Celery Orchestrator

Vị trí: app/core/orchestrators/celery/

Celery App (app.py)

celery_app = Celery("crawler", broker=redis_url, backend=redis_url)
ConfigGiá trị
task_serializerjson
timezoneAsia/Ho_Chi_Minh
enable_utcTrue
task_track_startedTrue
worker_hijack_root_loggerFalse
beat_schedule{} (populated by sync_service)

Autodiscovery: celery_app.autodiscover_tasks(["app.workers.schedules"])

Include: app.core.decorators.websocket.controller (cho ws_start_task / ws_stop_task)

Worker Signals (signals.py)

@worker_ready — Boot Sequence

  1. Clear stale Redis keys
  2. Import core database (trigger @database registration)
  3. Start log writer
  4. Load config cache
  5. Import WebSocket workers (trigger @websocket registration)
  6. Seed new workers vào DB
  7. Sync DB → Redis
  8. Start tất cả enabled WebSocket workers (spawn daemon threads)

@worker_shutdown

  1. Stop tất cả WebSocket workers
  2. Stop log writer

5. Database Orchestrator

Vị trí: app/core/orchestrators/database/

Thành Phần

FileClassMô tả
orchestrator.pyDatabaseOrchestratorFacade cấp cao — get_unit_of_work(server, db)
unit_of_work.pyUnitOfWorkQuản lý session + repository access
factory.pySessionFactoryTạo và cache DatabaseAdapter per (server, db)
adapter.pyDatabaseAdapterWraps SQLAlchemy Engine + SessionMaker, hỗ trợ auto_sync
strategy.pyDatabaseStrategyBuild URL + engine options theo engine type
registry.pyGlobal SessionFactory instance, auto-populated từ @database registry

Luồng Hoạt Động

DatabaseOrchestrator.get_unit_of_work("server", "db")
→ SessionFactory.create_session("server", "db")
→ DatabaseAdapter (cached per server+db)
→ SQLAlchemy Engine (lazy init, auto_sync nếu bật)
→ Session
→ UnitOfWork(session)
→ get_repository(MyRepo) → cached repo instance
→ commit() / rollback() / close()

Chi tiết cách sử dụng → xem Database.