Skip to main content

Agent Communication

Enable agents to communicate with each other while preserving workflow tracing.

Overview

Agents call other agents using ARC Protocol client. The traceId propagates automatically.

Architecture

User → Agent A → Agent B → Agent C
└─────────traceId────────┘

Basic Agent-to-Agent Call

Agent A Calls Agent B

# agent_a_server.py
from arc import Server, Client

server = Server(server_id="agent-a")
agent_b_client = Client("http://localhost:8002/arc")

@server.agent_handler("agent-a", "chat.start")
async def handle_start(params, context):
user_message = params["initialMessage"]["parts"][0]["content"]

# Call Agent B
agent_b_response = await agent_b_client.chat.start(
target_agent="agent-b",
initial_message={
"role": "user",
"parts": [{"type": "text", "content": f"Process: {user_message}"}]
},
trace_id=context.trace_id # Propagate traceId
)

# Use Agent B's response
b_result = agent_b_response.result['chat']['message']['parts'][0]['content']

return {
"type": "chat",
"chat": {
"chatId": params.get("chatId") or generate_id(),
"status": "ACTIVE",
"message": {
"role": "agent",
"parts": [{"type": "text", "content": f"Agent A processed via Agent B: {b_result}"}]
}
}
}

def generate_id():
import uuid
return f"chat-{uuid.uuid4().hex[:8]}"

if __name__ == "__main__":
server.run(host="0.0.0.0", port=8001)

Agent B Implementation

# agent_b_server.py
from arc import Server

server = Server(server_id="agent-b")

@server.agent_handler("agent-b", "chat.start")
async def handle_start(params, context):
message = params["initialMessage"]["parts"][0]["content"]

# Process request
result = f"Agent B processed: {message}"

return {
"type": "chat",
"chat": {
"chatId": params.get("chatId") or generate_id(),
"status": "ACTIVE",
"message": {
"role": "agent",
"parts": [{"type": "text", "content": result}]
}
}
}

def generate_id():
import uuid
return f"chat-{uuid.uuid4().hex[:8]}"

if __name__ == "__main__":
server.run(host="0.0.0.0", port=8002)

Test Agent Communication

# test_communication.py
from arc import Client
import asyncio

async def test():
client = Client("http://localhost:8001/arc")

# Call Agent A, which calls Agent B internally
response = await client.chat.start(
target_agent="agent-a",
initial_message={
"role": "user",
"parts": [{"type": "text", "content": "Hello"}]
}
)

print(response.result['chat']['message']['parts'][0]['content'])
# Output: "Agent A processed via Agent B: Agent B processed: Process: Hello"

asyncio.run(test())

Workflow Tracing

TraceId Propagation

# Automatic traceId propagation
@server.agent_handler("agent-a", "chat.start")
async def handle_start(params, context):
# context.trace_id is automatically available

# Pass to Agent B
response = await agent_b_client.chat.start(
target_agent="agent-b",
initial_message={...},
trace_id=context.trace_id # Propagate
)

return {...}

Observability Integration

import logging

logger = logging.getLogger(__name__)

@server.agent_handler("agent-a", "chat.start")
async def handle_start(params, context):
logger.info(f"[{context.trace_id}] Agent A started")

# Call Agent B
response = await agent_b_client.chat.start(
target_agent="agent-b",
initial_message={...},
trace_id=context.trace_id
)

logger.info(f"[{context.trace_id}] Agent B completed")

return {...}

Multi-Agent Chain

Agent A → Agent B → Agent C

# Agent A calls Agent B
@server_a.agent_handler("agent-a", "chat.start")
async def a_handle(params, context):
response_b = await client_b.chat.start(
target_agent="agent-b",
initial_message={...},
trace_id=context.trace_id
)

return {...}

# Agent B calls Agent C
@server_b.agent_handler("agent-b", "chat.start")
async def b_handle(params, context):
response_c = await client_c.chat.start(
target_agent="agent-c",
initial_message={...},
trace_id=context.trace_id
)

return {...}

# Agent C processes
@server_c.agent_handler("agent-c", "chat.start")
async def c_handle(params, context):
# Final processing
return {...}

All three agents share the same traceId.

Parallel Agent Calls

Call Multiple Agents Simultaneously

import asyncio

@server.agent_handler("orchestrator", "chat.start")
async def handle_start(params, context):
# Call three agents in parallel
results = await asyncio.gather(
agent_a_client.chat.start(
target_agent="agent-a",
initial_message={...},
trace_id=context.trace_id
),
agent_b_client.chat.start(
target_agent="agent-b",
initial_message={...},
trace_id=context.trace_id
),
agent_c_client.chat.start(
target_agent="agent-c",
initial_message={...},
trace_id=context.trace_id
)
)

# Aggregate results
combined = " | ".join([
r.result['chat']['message']['parts'][0]['content']
for r in results
])

return {
"type": "chat",
"chat": {
"chatId": params.get("chatId") or generate_id(),
"status": "ACTIVE",
"message": {
"role": "agent",
"parts": [{"type": "text", "content": combined}]
}
}
}

Error Handling

Handle Agent Failures

from arc.exceptions import ARCException

@server.agent_handler("agent-a", "chat.start")
async def handle_start(params, context):
try:
response = await agent_b_client.chat.start(
target_agent="agent-b",
initial_message={...},
trace_id=context.trace_id
)

return {...}

except ARCException as e:
# Agent B failed
logger.error(f"Agent B error: {e.message}")

# Fallback response
return {
"type": "chat",
"chat": {
"chatId": params.get("chatId") or generate_id(),
"status": "ACTIVE",
"message": {
"role": "agent",
"parts": [{"type": "text", "content": "Service temporarily unavailable"}]
}
}
}

Real-World Example

Finance Agent Calls Payment Agent

# finance_server.py
from arc import Server, Client

server = Server(server_id="finance-server")
payment_client = Client("http://localhost:8100/arc")

@server.agent_handler("finance-agent", "chat.start")
async def handle_start(params, context):
message = params["initialMessage"]["parts"][0]["content"]

if "pay invoice" in message.lower():
# Extract invoice number
invoice_num = message.split()[-1]

# Call payment agent
payment_response = await payment_client.task.send(
target_agent="payment-agent",
method="process_payment",
parameters={"invoice": invoice_num},
trace_id=context.trace_id
)

result = payment_response.result
response_text = f"Payment processed: {result['status']}"
else:
response_text = "Finance agent ready"

return {
"type": "chat",
"chat": {
"chatId": params.get("chatId") or generate_id(),
"status": "ACTIVE",
"message": {
"role": "agent",
"parts": [{"type": "text", "content": response_text}]
}
}
}
# payment_server.py
from arc import Server

server = Server(server_id="payment-server")

@server.task_handler("payment-agent", "process_payment")
async def process_payment(params, context):
invoice = params["invoice"]

# Process payment
return {
"status": "completed",
"invoice": invoice,
"amount": "$1000.00"
}

if __name__ == "__main__":
server.run(host="0.0.0.0", port=8100)

Best Practices

1. Always Propagate TraceId

# Correct
response = await other_agent.chat.start(
target_agent="other",
initial_message={...},
trace_id=context.trace_id # ✓
)

# Wrong
response = await other_agent.chat.start(
target_agent="other",
initial_message={...}
# ✗ Missing trace_id
)

2. Handle Timeouts

import asyncio

try:
response = await asyncio.wait_for(
agent_b_client.chat.start(...),
timeout=5.0 # 5 second timeout
)
except asyncio.TimeoutError:
# Handle timeout
return fallback_response

3. Implement Circuit Breaker

from datetime import datetime, timedelta

class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure_time = None
self.is_open = False

async def call(self, func, *args, **kwargs):
if self.is_open:
if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
self.is_open = False
self.failure_count = 0
else:
raise Exception("Circuit breaker is open")

try:
result = await func(*args, **kwargs)
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = datetime.now()

if self.failure_count >= self.failure_threshold:
self.is_open = True

raise e

# Usage
breaker = CircuitBreaker()

response = await breaker.call(
agent_b_client.chat.start,
target_agent="agent-b",
initial_message={...}
)

Next Steps