Chuyển tới nội dung chính

Reconnect & Timeout

Thiết Kế Cốt Lõi

Agent chạy hoàn toàn ở phía server (background task). Client chỉ là người xem stream. Điều này có nghĩa:

  • Client có thể ngắt kết nối bất kỳ lúc nào mà không ảnh hưởng đến agent
  • Mọi event được lưu vào Redis Stream với TTL 1 giờ
  • Client có thể reconnect và replay mọi event còn thiếu qua last_event_id

Luồng Reconnect Chuẩn

1. Client lưu event_id cuối cùng nhận được (vào memory/storage)

2. Kết nối bị mất (mạng, tab background, server restart...)

3. Client reconnect WebSocket:
- connected → auth → auth_ok

4. Client tự động subscribe lại:
subscribe { session_id, last_event_id: "01HXYZ..." }

5. Server xử lý:
- status = "running" → replay từ last_event_id + tiếp tục stream live
- status = "completed" → replay từ last_event_id rồi dừng

6. Client nhận đủ events còn thiếu, UI sync lại

Các Tình Huống Mất Kết Nối

Tình Huống 1: Mất Mạng Tạm Thời

Agent vẫn chạy bình thường ở server. Khi reconnect:

{
"type": "subscribe",
"session_id": "abc-123",
"last_event_id": "01HXYZ100"
}

Server replay từ event 01HXYZ101 trở đi, client nhận đủ events còn thiếu.

Tình Huống 2: Server Restart

Khi server khởi động lại, tất cả background tasks bị kill. Redis vẫn còn status: "running".

Client subscribe lại, server phát hiện:

  • Redis = "running" nhưng không có in-process task tương ứng
  • Server thử resume task từ LangGraph checkpoint (điểm dừng cuối cùng)
  • Nếu resume được: subscribed { status: "running" } → agent tiếp tục
  • Nếu không: subscribed { status: "completed" } → hiển thị lịch sử

Tình Huống 3: Tab Ẩn / App Suspend

Kết nối vẫn tồn tại nhưng heartbeat có thể timeout. Sau khi tab active lại:

  • Nếu WS vẫn kết nối → không cần làm gì
  • Nếu WS bị đóng (code 4008) → reconnect + subscribe lại

Tình Huống 4: F5 Reload

1. Load lại app
2. Gọi GET /v2/sessions/{id}/history để lấy history + last_event_id
3. Render history tĩnh
4. Nếu status = "running" → connect WS + subscribe với last_event_id
5. Server replay events còn thiếu → merge vào UI

Chiến Lược Reconnect Cho Client

Exponential Backoff

Lần 1: đợi 1 giây
Lần 2: đợi 2 giây
Lần 3: đợi 4 giây
Lần 4: đợi 8 giây
...
Tối đa: 30 giây

Không Reconnect Khi

  • Code 4001 — auth timeout → cần lấy token mới
  • Code 4003 — token không hợp lệ → cần refresh token, sau đó reconnect

Bảng Tất Cả Timeout

TimeoutGiá TrịMô Tả
Auth timeout10 giâyPhải gửi auth sau connected
Heartbeat interval25 giâyServer gửi ping mỗi 25 giây
Pong timeout20 giâyPhải gửi pong sau khi nhận ping
Heartbeat miss limit2 lầnSau 2 lần miss, server đóng với code 4008
Approval timeout300 giâyTool call thông thường tự reject sau 5 phút
Question timeout600 giâyask_user_question tự reject sau 10 phút
Event log TTL1 giờEvents trong Redis Stream hết hạn sau 1 giờ

Ví Dụ: React.js — Quản Lý Reconnect

class AgentWebSocketManager {
constructor(token, onEvent) {
this.token = token
this.onEvent = onEvent
this.ws = null
this.currentSessionId = null
this.lastEventIds = {} // { session_id: last_event_id }
this.reconnectAttempt = 0
this.maxReconnectDelay = 30000
this.shouldReconnect = true
}

connect() {
this.ws = new WebSocket('wss://api.simplize.vn/v2/ws')

this.ws.onmessage = (e) => {
const msg = JSON.parse(e.data)

// Lưu event_id để dùng khi reconnect
if (msg.event_id && msg.session_id) {
this.lastEventIds[msg.session_id] = msg.event_id
}

switch (msg.type) {
case 'connected':
this.ws.send(JSON.stringify({ type: 'auth', token: this.token }))
break
case 'auth_ok':
this.reconnectAttempt = 0
// Tự động subscribe lại session đang active sau reconnect
if (this.currentSessionId) {
this.subscribe(this.currentSessionId)
}
break
case 'ping':
this.ws.send(JSON.stringify({ type: 'pong' }))
break
default:
this.onEvent(msg)
}
}

this.ws.onclose = (e) => {
// Auth error: không reconnect
if (e.code === 4001 || e.code === 4003) {
this.onEvent({ type: 'auth_error', code: e.code })
return
}
if (this.shouldReconnect) {
this._scheduleReconnect()
}
}
}

_scheduleReconnect() {
const delay = Math.min(
1000 * Math.pow(2, this.reconnectAttempt),
this.maxReconnectDelay
)
this.reconnectAttempt++
setTimeout(() => this.connect(), delay)
}

subscribe(sessionId, lastEventId) {
this.currentSessionId = sessionId
this.ws?.send(JSON.stringify({
type: 'subscribe',
session_id: sessionId,
// Dùng last_event_id đã lưu nếu không truyền vào
last_event_id: lastEventId ?? this.lastEventIds[sessionId] ?? null,
}))
}

chat(message, sessionId) {
this.currentSessionId = sessionId
this.ws?.send(JSON.stringify({
type: 'chat',
message,
session_id: sessionId,
}))
}

// Gọi khi user chủ động thoát/logout
disconnect() {
this.shouldReconnect = false
this.currentSessionId = null
this.ws?.close()
}
}

Ví Dụ: Flutter (Dart) — Quản Lý Reconnect

class AgentWebSocketManager {
final String token;
final void Function(Map<String, dynamic>) onEvent;

WebSocketChannel? _channel;
String? _currentSessionId;
final _lastEventIds = <String, String>{};
int _reconnectAttempt = 0;
bool _shouldReconnect = true;
Timer? _reconnectTimer;

AgentWebSocketManager({required this.token, required this.onEvent});

void connect() {
_channel = WebSocketChannel.connect(
Uri.parse('wss://api.simplize.vn/v2/ws'),
);

_channel!.stream.listen(
(data) => _handleMessage(jsonDecode(data as String) as Map<String, dynamic>),
onDone: _onDisconnected,
onError: (_) => _onDisconnected(),
cancelOnError: false,
);
}

void _handleMessage(Map<String, dynamic> msg) {
// Lưu event_id để dùng khi reconnect
final eventId = msg['event_id'] as String?;
final sessionId = msg['session_id'] as String?;
if (eventId != null && sessionId != null) {
_lastEventIds[sessionId] = eventId;
}

switch (msg['type'] as String) {
case 'connected':
_send({'type': 'auth', 'token': token});
case 'auth_ok':
_reconnectAttempt = 0;
// Auto re-subscribe sau reconnect
if (_currentSessionId != null) {
subscribe(_currentSessionId!);
}
case 'ping':
_send({'type': 'pong'});
default:
onEvent(msg);
}
}

void _onDisconnected() {
if (!_shouldReconnect) return;
final delay = Duration(
milliseconds: min(
1000 * pow(2, _reconnectAttempt).toInt(),
30000,
),
);
_reconnectAttempt++;
_reconnectTimer = Timer(delay, connect);
}

void subscribe(String sessionId, {String? lastEventId}) {
_currentSessionId = sessionId;
_send({
'type': 'subscribe',
'session_id': sessionId,
'last_event_id': lastEventId ?? _lastEventIds[sessionId],
});
}

void chat(String message, {String? sessionId}) {
if (sessionId != null) _currentSessionId = sessionId;
_send({
'type': 'chat',
'message': message,
if (sessionId != null) 'session_id': sessionId,
});
}

void stop(String sessionId) {
_send({'type': 'stop', 'session_id': sessionId});
}

// Gọi khi user logout hoặc thoát app
void disconnect() {
_shouldReconnect = false;
_reconnectTimer?.cancel();
_currentSessionId = null;
_channel?.sink.close();
}

void _send(Map<String, dynamic> data) {
_channel?.sink.add(jsonEncode(data));
}
}

Lưu Ý Quan Trọng

last_event_id Phải Được Lưu Liên Tục

Client nên lưu last_event_id vào localStorage (web) hoặc SharedPreferences (mobile) để dùng được cả sau khi app restart:

// Web
ws.onmessage = (e) => {
const msg = JSON.parse(e.data)
if (msg.event_id) {
localStorage.setItem(`lastEventId_${msg.session_id}`, msg.event_id)
}
}

// Khi subscribe
const lastEventId = localStorage.getItem(`lastEventId_${sessionId}`)
ws.subscribe(sessionId, lastEventId)
// Flutter
Future<void> _saveLastEventId(String sessionId, String eventId) async {
final prefs = await SharedPreferences.getInstance();
await prefs.setString('lastEventId_$sessionId', eventId);
}

Future<String?> _getLastEventId(String sessionId) async {
final prefs = await SharedPreferences.getInstance();
return prefs.getString('lastEventId_$sessionId');
}

_currentSessionId Không Được Xóa Khi Mất Kết Nối

Chỉ xóa currentSessionId khi user chủ động thoát hoặc switch session. Không xóa trong onclose/onDone — đây là cơ chế để auto-resubscribe sau reconnect hoạt động đúng.

Event Log TTL 1 Giờ

Nếu client offline hơn 1 giờ, events cũ có thể đã bị xóa. Trong trường hợp này:

  • Subscribe mà không có last_event_id
  • Hoặc load lại từ history API