Chuyển tới nội dung chính

Plugins

Vị trí: app/plugins/ — các utility tiện ích dùng chung trong toàn bộ ứng dụng.


1. Logging Plugin

Vị trí: app/plugins/logging/

Sử dụng

from app.plugins.logging.logger import logger

logger.info("Processing data...")
logger.warning("Low memory")
logger.error(f"Failed: {error}")
logger.debug("Detail info")
logger.success("Task completed")
logger.critical("System down")
logger.trace("Verbose trace")

Cách Hoạt Động

Mỗi lần gọi logger.info() (hoặc level khác):

  1. Loguru — ghi ra console/file (tuỳ config LOG_TO_CONSOLE, LOG_TO_FILE)
  2. Enqueue DB — đưa vào queue để background thread ghi vào bảng execution_log

Context Variables

Vị trí: app/plugins/logging/context.py

ContextVarMô tả
log_created_fromNguồn log: SCHEDULE_WORKER, WEBSOCKET_LISTENER, SYSTEM
log_created_byWorker ID đã tạo log
_is_db_loggingCờ chống đệ quy — ngăn log writer tạo thêm log

Context được set tự động bởi decorator khi task/worker bắt đầu:

from app.plugins.logging.context import set_log_context
set_log_context(created_from="SCHEDULE_WORKER", created_by="my_schedule")

Anti-Recursion

Log writer ghi log vào DB. Nếu bản thân việc ghi đó tạo log → vòng lặp vô hạn. _is_db_logging ContextVar ngăn chặn điều này: khi writer thread đang ghi DB, mọi log trong thread đó chỉ đi qua loguru, không enqueue lại.


2. Redis Plugin

Vị trí: app/plugins/redis/

Client

from app.plugins.redis.client import get_redis

r = get_redis() # redis.Redis instance với decode_responses=True
r.get("my_key")

Schedule Store (schedule_store.py)

Quản lý trạng thái schedule workers trong Redis (hash per worker).

HàmMô tả
get_status(worker_id)Đọc status
get_pre_run_status(worker_id)Đọc status trước khi run
get_all_fields(worker_id)Lấy toàn bộ hash
set_status(worker_id, status)Cập nhật status
set_task_id(worker_id, task_id)Lưu Celery task ID
set_running(worker_id, pre_run_status, name, cron, task_id)Set trạng thái running
set_completed(worker_id, restore_status, result)Hoàn thành + restore status
set_errored(worker_id, restore_status, error)Lỗi + restore status
set_cancelled(worker_id, restore_status)Huỷ + restore status
acquire_lock(worker_id, ttl)Distributed lock (SET NX)
release_lock(worker_id)Giải phóng lock

WebSocket Store (websocket_store.py)

Quản lý trạng thái WebSocket workers trong Redis.

HàmMô tả
get_status(ws_id)Đọc status
get_all_fields(ws_id)Lấy toàn bộ hash
set_status(ws_id, status)Cập nhật status
set_connected(ws_id, connected)Cập nhật cờ connected
set_last_action(ws_id, event, timestamp)Ghi nhận send/recv
set_full_status(ws_id, status, connected, name, url)Set toàn bộ trạng thái

Store (store.py)

from app.plugins.redis import store
store.clear_all_worker_keys() # Xoá tất cả worker keys (SCAN-based, non-blocking)

3. HTTP Client Plugin

Vị trí: app/plugins/request/http_client.py

Sử Dụng Cơ Bản

from app.plugins.request.http_client import HttpClient

client = HttpClient()
response = client.get("https://api.example.com/data")
data = response.json()

Tham Số Khởi Tạo

Tham sốKiểuMặc địnhMô tả
timeoutintsettings.http_default_timeout (30)Timeout mỗi request (giây)
retriesintsettings.http_default_retries (3)Số lần retry
profile_namestrNoneBrowser profile: chrome, firefox, edge, safari
proxy_listlistsettings.proxy_listDanh sách proxy URLs
proxy_rotationstrsettings.proxy_rotationround_robin hoặc random
request_delaytuple(0, 0)(min, max) delay giữa các request (giây)
session_max_requestsint100Xoay session sau N requests
session_max_ageint300Xoay session sau N giây

HTTP Methods

MethodMô tả
get(url, **kwargs)GET request, trả về Response
post(url, **kwargs)POST request
put(url, **kwargs)PUT request
delete(url, **kwargs)DELETE request
head(url, **kwargs)HEAD request

Convenience Helpers

MethodMô tả
get_json(url, **kwargs)GET + parse JSON
post_json(url, **kwargs)POST + parse JSON
get_text(url, **kwargs)GET + trả về text
get_bytes(url, **kwargs)GET + trả về bytes

Tính Năng

  • Retry: Exponential backoff (1s → 2s → 4s + jitter ngẫu nhiên)
  • Proxy rotation: Round-robin hoặc random
  • Session rotation: Tự động tạo session mới sau N requests hoặc N giây
  • Browser fingerprinting: Giả lập fingerprint trình duyệt qua curl_cffi
  • Rate limiting: Delay ngẫu nhiên giữa các request

Context Manager

with HttpClient(request_delay=(0.5, 1.5)) as client:
data = client.get_json("https://api.example.com/endpoint")
# Sessions tự động close khi exit

4. WebSocket Client Plugin

Vị trí: app/plugins/websocket/ws_client.py

Sử Dụng

from app.plugins.websocket.ws_client import WsClient

ws = WsClient(
url="wss://stream.example.com/ws",
on_message_callback=lambda data: print(data),
on_connect_callback=lambda client: client.send({"subscribe": "all"}),
auto_reconnect=True,
)
ws.run_forever() # Blocks cho đến khi stop()

Tham Số Khởi Tạo

Tham sốKiểuMặc địnhMô tả
urlstrbắt buộcWebSocket URL
on_message_callbackCallablebắt buộcCallback nhận message
on_connect_callbackCallableNoneCallback khi kết nối thành công
on_error_callbackCallableNoneCallback khi lỗi
on_close_callbackCallableNoneCallback khi đóng kết nối
headersdictNoneCustom headers (merge với stealth headers)
originstrNoneOverride Origin header
proxystrNoneProxy URL (http://, socks5://, socks4://)
cookiesdictNoneCookies
ping_intervalint|tuple(20, 40)Ping interval (randomized nếu tuple)
ping_timeoutint10Ping timeout
reconnect_delayint5Delay ban đầu giữa các lần reconnect (giây)
max_reconnect_delayint300Delay tối đa (giây)
max_reconnect_attemptsint0Số lần reconnect tối đa (0 = vô hạn)
auto_reconnectboolTrueTự động reconnect khi mất kết nối
connect_jittertuple(0.0, 3.0)Delay ngẫu nhiên trước khi connect
ssloptdict{"cert_reqs": ssl.CERT_NONE}SSL options

Methods

MethodMô tả
run_forever()Kết nối và lắng nghe (blocking). Auto-reconnect nếu enabled
stop()Dừng kết nối, ngăn reconnect
send(data)Gửi dữ liệu (dict auto-serialize thành JSON, str gửi trực tiếp)

Properties

PropertyMô tả
is_connectedTrạng thái kết nối hiện tại
is_stoppedTrue nếu stop() được gọi chủ đích
last_send_atISO timestamp lần gửi gần nhất
last_recv_atISO timestamp lần nhận gần nhất

Tính Năng Anti-Detection

  • Stealth headers: User-Agent, Origin, Accept-Language giống trình duyệt
  • Proxy support: HTTP, SOCKS4, SOCKS5 (có auth)
  • Connection jitter: Delay ngẫu nhiên trước connect
  • Ping interval randomization: Tránh pattern cố định
  • Custom SSL: Tuỳ chỉnh SSL options

Auto-Reconnect

Khi auto_reconnect=True, reconnect với exponential backoff:

5s → 10s → 20s → 40s → 80s → 160s → 300s (max)

max_reconnect_attempts=0 → reconnect vô hạn.