Plugins
Vị trí: app/plugins/ — các utility tiện ích dùng chung trong toàn bộ ứng dụng.
1. Logging Plugin
Vị trí: app/plugins/logging/
Sử dụng
from app.plugins.logging.logger import logger
logger.info("Processing data...")
logger.warning("Low memory")
logger.error(f"Failed: {error}")
logger.debug("Detail info")
logger.success("Task completed")
logger.critical("System down")
logger.trace("Verbose trace")
Cách Hoạt Động
Mỗi lần gọi logger.info() (hoặc level khác):
- Loguru — ghi ra console/file (tuỳ config
LOG_TO_CONSOLE,LOG_TO_FILE) - Enqueue DB — đưa vào queue để background thread ghi vào bảng
execution_log
Context Variables
Vị trí: app/plugins/logging/context.py
| ContextVar | Mô tả |
|---|---|
log_created_from | Nguồn log: SCHEDULE_WORKER, WEBSOCKET_LISTENER, SYSTEM |
log_created_by | Worker ID đã tạo log |
_is_db_logging | Cờ chống đệ quy — ngăn log writer tạo thêm log |
Context được set tự động bởi decorator khi task/worker bắt đầu:
from app.plugins.logging.context import set_log_context
set_log_context(created_from="SCHEDULE_WORKER", created_by="my_schedule")
Anti-Recursion
Log writer ghi log vào DB. Nếu bản thân việc ghi đó tạo log → vòng lặp vô hạn. _is_db_logging ContextVar ngăn chặn điều này: khi writer thread đang ghi DB, mọi log trong thread đó chỉ đi qua loguru, không enqueue lại.
2. Redis Plugin
Vị trí: app/plugins/redis/
Client
from app.plugins.redis.client import get_redis
r = get_redis() # redis.Redis instance với decode_responses=True
r.get("my_key")
Schedule Store (schedule_store.py)
Quản lý trạng thái schedule workers trong Redis (hash per worker).
| Hàm | Mô tả |
|---|---|
get_status(worker_id) | Đọc status |
get_pre_run_status(worker_id) | Đọc status trước khi run |
get_all_fields(worker_id) | Lấy toàn bộ hash |
set_status(worker_id, status) | Cập nhật status |
set_task_id(worker_id, task_id) | Lưu Celery task ID |
set_running(worker_id, pre_run_status, name, cron, task_id) | Set trạng thái running |
set_completed(worker_id, restore_status, result) | Hoàn thành + restore status |
set_errored(worker_id, restore_status, error) | Lỗi + restore status |
set_cancelled(worker_id, restore_status) | Huỷ + restore status |
acquire_lock(worker_id, ttl) | Distributed lock (SET NX) |
release_lock(worker_id) | Giải phóng lock |
WebSocket Store (websocket_store.py)
Quản lý trạng thái WebSocket workers trong Redis.
| Hàm | Mô tả |
|---|---|
get_status(ws_id) | Đọc status |
get_all_fields(ws_id) | Lấy toàn bộ hash |
set_status(ws_id, status) | Cập nhật status |
set_connected(ws_id, connected) | Cập nhật cờ connected |
set_last_action(ws_id, event, timestamp) | Ghi nhận send/recv |
set_full_status(ws_id, status, connected, name, url) | Set toàn bộ trạng thái |
Store (store.py)
from app.plugins.redis import store
store.clear_all_worker_keys() # Xoá tất cả worker keys (SCAN-based, non-blocking)
3. HTTP Client Plugin
Vị trí: app/plugins/request/http_client.py
Sử Dụng Cơ Bản
from app.plugins.request.http_client import HttpClient
client = HttpClient()
response = client.get("https://api.example.com/data")
data = response.json()
Tham Số Khởi Tạo
| Tham số | Kiểu | Mặc định | Mô tả |
|---|---|---|---|
timeout | int | settings.http_default_timeout (30) | Timeout mỗi request (giây) |
retries | int | settings.http_default_retries (3) | Số lần retry |
profile_name | str | None | Browser profile: chrome, firefox, edge, safari |
proxy_list | list | settings.proxy_list | Danh sách proxy URLs |
proxy_rotation | str | settings.proxy_rotation | round_robin hoặc random |
request_delay | tuple | (0, 0) | (min, max) delay giữa các request (giây) |
session_max_requests | int | 100 | Xoay session sau N requests |
session_max_age | int | 300 | Xoay session sau N giây |
HTTP Methods
| Method | Mô tả |
|---|---|
get(url, **kwargs) | GET request, trả về Response |
post(url, **kwargs) | POST request |
put(url, **kwargs) | PUT request |
delete(url, **kwargs) | DELETE request |
head(url, **kwargs) | HEAD request |
Convenience Helpers
| Method | Mô tả |
|---|---|
get_json(url, **kwargs) | GET + parse JSON |
post_json(url, **kwargs) | POST + parse JSON |
get_text(url, **kwargs) | GET + trả về text |
get_bytes(url, **kwargs) | GET + trả về bytes |
Tính Năng
- Retry: Exponential backoff (1s → 2s → 4s + jitter ngẫu nhiên)
- Proxy rotation: Round-robin hoặc random
- Session rotation: Tự động tạo session mới sau N requests hoặc N giây
- Browser fingerprinting: Giả lập fingerprint trình duyệt qua curl_cffi
- Rate limiting: Delay ngẫu nhiên giữa các request
Context Manager
with HttpClient(request_delay=(0.5, 1.5)) as client:
data = client.get_json("https://api.example.com/endpoint")
# Sessions tự động close khi exit
4. WebSocket Client Plugin
Vị trí: app/plugins/websocket/ws_client.py
Sử Dụng
from app.plugins.websocket.ws_client import WsClient
ws = WsClient(
url="wss://stream.example.com/ws",
on_message_callback=lambda data: print(data),
on_connect_callback=lambda client: client.send({"subscribe": "all"}),
auto_reconnect=True,
)
ws.run_forever() # Blocks cho đến khi stop()
Tham Số Khởi Tạo
| Tham số | Kiểu | Mặc định | Mô tả |
|---|---|---|---|
url | str | bắt buộc | WebSocket URL |
on_message_callback | Callable | bắt buộc | Callback nhận message |
on_connect_callback | Callable | None | Callback khi kết nối thành công |
on_error_callback | Callable | None | Callback khi lỗi |
on_close_callback | Callable | None | Callback khi đóng kết nối |
headers | dict | None | Custom headers (merge với stealth headers) |
origin | str | None | Override Origin header |
proxy | str | None | Proxy URL (http://, socks5://, socks4://) |
cookies | dict | None | Cookies |
ping_interval | int|tuple | (20, 40) | Ping interval (randomized nếu tuple) |
ping_timeout | int | 10 | Ping timeout |
reconnect_delay | int | 5 | Delay ban đầu giữa các lần reconnect (giây) |
max_reconnect_delay | int | 300 | Delay tối đa (giây) |
max_reconnect_attempts | int | 0 | Số lần reconnect tối đa (0 = vô hạn) |
auto_reconnect | bool | True | Tự động reconnect khi mất kết nối |
connect_jitter | tuple | (0.0, 3.0) | Delay ngẫu nhiên trước khi connect |
sslopt | dict | {"cert_reqs": ssl.CERT_NONE} | SSL options |
Methods
| Method | Mô tả |
|---|---|
run_forever() | Kết nối và lắng nghe (blocking). Auto-reconnect nếu enabled |
stop() | Dừng kết nối, ngăn reconnect |
send(data) | Gửi dữ liệu (dict auto-serialize thành JSON, str gửi trực tiếp) |
Properties
| Property | Mô tả |
|---|---|
is_connected | Trạng thái kết nối hiện tại |
is_stopped | True nếu stop() được gọi chủ đích |
last_send_at | ISO timestamp lần gửi gần nhất |
last_recv_at | ISO timestamp lần nhận gần nhất |
Tính Năng Anti-Detection
- Stealth headers: User-Agent, Origin, Accept-Language giống trình duyệt
- Proxy support: HTTP, SOCKS4, SOCKS5 (có auth)
- Connection jitter: Delay ngẫu nhiên trước connect
- Ping interval randomization: Tránh pattern cố định
- Custom SSL: Tuỳ chỉnh SSL options
Auto-Reconnect
Khi auto_reconnect=True, reconnect với exponential backoff:
5s → 10s → 20s → 40s → 80s → 160s → 300s (max)
max_reconnect_attempts=0 → reconnect vô hạn.