ThreadManager
Client-side thread management for WebSocket sessions.
Overview
ThreadManager automatically manages chat threads for WebSocket connections, handling chat.start vs chat.message routing and cleanup.
Features:
- Per-session in-memory storage
- Automatic chat.start vs chat.message selection
- Automatic cleanup on disconnect
- Thread reuse across multiple messages
Initialization
from arc import Client
from arc.client import ThreadManager
# Create per-WebSocket connection
arc_client = Client(endpoint="...", token="...")
thread_manager = ThreadManager(arc_client)
Methods
send_to_agent()
Send message to agent (automatically routes to chat.start or chat.message).
# First message - calls chat.start
response = await thread_manager.send_to_agent(
agent_id="support-agent",
message={
"role": "user",
"parts": [{"type": "text", "content": "Help needed"}]
},
trace_id="trace-123"
)
# Second message - calls chat.message (reuses chat_id)
response = await thread_manager.send_to_agent(
agent_id="support-agent",
message={
"role": "user",
"parts": [{"type": "text", "content": "More info"}]
}
)
Parameters:
agent_id(str): Target agent IDmessage(dict): Message objecttrace_id(Optional[str]): Workflow trace IDstream(bool): Enable SSE streaming (default: False)
Returns: ARCResponse or SSE stream
get_thread_id()
Get stored chat_id for agent.
chat_id = thread_manager.get_thread_id("support-agent")
if chat_id:
print(f"Active chat: {chat_id}")
has_thread()
Check if thread exists for agent.
if thread_manager.has_thread("support-agent"):
print("Thread exists")
end_chat()
End specific chat.
await thread_manager.end_chat(
agent_id="support-agent",
reason="Issue resolved"
)
end_all_chats()
End all active chats (call on disconnect).
await thread_manager.end_all_chats()
cleanup_all()
Alias for end_all_chats().
await thread_manager.cleanup_all()
get_all_agents()
Get list of agents with active threads.
agents = thread_manager.get_all_agents()
print(f"Active agents: {agents}")
WebSocket Integration
FastAPI WebSocket
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from arc import Client
from arc.client import ThreadManager
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Create per-connection instances
arc_client = Client(endpoint="https://api.example.com/arc", token="token")
thread_manager = ThreadManager(arc_client)
try:
while True:
# Receive message
data = await websocket.receive_json()
# Route to agent (automatic chat.start vs chat.message)
response = await thread_manager.send_to_agent(
agent_id=data["agent_id"],
message=data["message"],
trace_id=data.get("trace_id")
)
# Send response
await websocket.send_json(response.to_dict())
except WebSocketDisconnect:
# Cleanup on disconnect
await thread_manager.end_all_chats()
finally:
await arc_client.close()
Starlette WebSocket
from starlette.applications import Starlette
from starlette.websockets import WebSocket, WebSocketDisconnect
from starlette.routing import WebSocketRoute
from arc import Client
from arc.client import ThreadManager
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
arc_client = Client(endpoint="https://api.example.com/arc", token="token")
thread_manager = ThreadManager(arc_client)
try:
while True:
data = await websocket.receive_json()
response = await thread_manager.send_to_agent(
agent_id=data["agent_id"],
message=data["message"]
)
await websocket.send_json(response.to_dict())
except WebSocketDisconnect:
await thread_manager.end_all_chats()
finally:
await arc_client.close()
app = Starlette(routes=[
WebSocketRoute("/ws", websocket_endpoint)
])
Streaming Integration
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
arc_client = Client(endpoint="...", token="...")
thread_manager = ThreadManager(arc_client)
try:
while True:
data = await websocket.receive_json()
# Stream response
async for event in await thread_manager.send_to_agent(
agent_id=data["agent_id"],
message=data["message"],
stream=True
):
if event.event == "content":
await websocket.send_json({
"type": "content",
"data": event.data
})
elif event.event == "done":
await websocket.send_json({
"type": "done",
"data": event.data
})
break
elif event.event == "error":
await websocket.send_json({
"type": "error",
"data": event.data
})
break
except WebSocketDisconnect:
await thread_manager.end_all_chats()
Multi-Agent Example
Handle multiple agents in single WebSocket:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
arc_client = Client(endpoint="...", token="...")
thread_manager = ThreadManager(arc_client)
try:
while True:
data = await websocket.receive_json()
# Route to different agents
agent_id = data["agent_id"] # "support", "finance", "hr", etc.
response = await thread_manager.send_to_agent(
agent_id=agent_id,
message=data["message"]
)
await websocket.send_json({
"agent_id": agent_id,
"response": response.to_dict()
})
except WebSocketDisconnect:
# End all chats with all agents
await thread_manager.end_all_chats()
Best Practices
DO
-
Create per WebSocket connection
@app.websocket("/ws")
async def handler(websocket):
thread_manager = ThreadManager(arc_client) # Per connection -
Always cleanup on disconnect
except WebSocketDisconnect:
await thread_manager.end_all_chats() -
Use trace IDs for debugging
await thread_manager.send_to_agent(
agent_id="...",
message=...,
trace_id=workflow_id
)
DON'T
-
Don't share across users
# Bad: Global ThreadManager
global_thread_manager = ThreadManager(...) # ❌ -
Don't forget cleanup
# Bad: No cleanup
except WebSocketDisconnect:
pass # ❌ Leaves open chats
Architecture
User → WebSocket → ThreadManager → ARCClient → Agent Server
↓
{agent_id: chat_id}
ThreadManager maintains in-memory mapping of agent_id to chat_id for the duration of the WebSocket connection.