Task Methods
Asynchronous task operations for long-running processes.
task.create()
Create new asynchronous task.
response = await client.task.create(
target_agent="document-analyzer",
initial_message={
"role": "user",
"parts": [{"type": "text", "content": "Analyze this report"}]
},
priority="HIGH",
metadata={"user_id": "user-123", "deadline": "2025-01-10"}
)
task_id = response.result["task"]["taskId"]
status = response.result["task"]["status"] # SUBMITTED
Parameters:
target_agent(str): Target agent IDinitial_message(dict): Initial message objectpriority(Optional[str]): Task priority (LOW, NORMAL, HIGH, URGENT)metadata(Optional[dict]): Custom task metadatatrace_id(Optional[str]): Workflow trace ID
Returns: ARCResponse with task object
Task Object:
{
"taskId": "task-abc123",
"status": "SUBMITTED", # or WORKING, INPUT_REQUIRED, COMPLETED, FAILED, CANCELED
"createdAt": "2025-01-06T00:00:00Z",
"updatedAt": "2025-01-06T00:00:00Z",
"messages": [...],
"artifacts": []
}
task.send()
Send additional message to existing task.
Use when task status is INPUT_REQUIRED - agent needs more information.
response = await client.task.send(
target_agent="document-analyzer",
task_id="task-abc123",
message={
"role": "user",
"parts": [{"type": "text", "content": "Additional context"}]
}
)
Parameters:
target_agent(str): Target agent IDtask_id(str): Task identifiermessage(dict): Message objecttrace_id(Optional[str]): Workflow trace ID
Returns: ARCResponse with success status
task.info()
Get task status and history.
response = await client.task.info(
target_agent="document-analyzer",
task_id="task-abc123",
include_messages=True,
include_artifacts=True
)
status = response.result["task"]["status"]
messages = response.result["task"]["messages"]
artifacts = response.result["task"]["artifacts"]
Parameters:
target_agent(str): Target agent IDtask_id(str): Task identifierinclude_messages(bool): Include message history (default: True)include_artifacts(bool): Include generated artifacts (default: True)trace_id(Optional[str]): Workflow trace ID
Returns: ARCResponse with task object including messages and artifacts
task.cancel()
Cancel running task.
response = await client.task.cancel(
target_agent="document-analyzer",
task_id="task-abc123",
reason="User cancelled request"
)
status = response.result["task"]["status"] # CANCELED
canceled_at = response.result["task"]["canceledAt"]
Parameters:
target_agent(str): Target agent IDtask_id(str): Task identifierreason(Optional[str]): Cancellation reasontrace_id(Optional[str]): Workflow trace ID
Returns: ARCResponse with updated task object
task.subscribe()
Subscribe to task notifications via webhook.
response = await client.task.subscribe(
target_agent="document-analyzer",
task_id="task-abc123",
callback_url="https://myapp.com/webhooks/arc",
events=["TASK_COMPLETED", "TASK_FAILED", "NEW_ARTIFACT"]
)
subscription_id = response.result["subscription"]["subscriptionId"]
Parameters:
target_agent(str): Target agent IDtask_id(str): Task identifiercallback_url(str): Webhook URL for notificationsevents(Optional[List[str]]): Event types to subscribe totrace_id(Optional[str]): Workflow trace ID
Available Events:
TASK_CREATEDTASK_STARTEDTASK_PAUSEDTASK_RESUMEDTASK_COMPLETEDTASK_FAILEDTASK_CANCELEDNEW_MESSAGENEW_ARTIFACTSTATUS_CHANGE
Returns: ARCResponse with subscription object
Subscription Object:
{
"subscriptionId": "sub-xyz789",
"taskId": "task-abc123",
"callbackUrl": "https://myapp.com/webhooks/arc",
"events": ["TASK_COMPLETED", "TASK_FAILED"],
"createdAt": "2025-01-06T00:00:00Z",
"active": true
}
Task Workflow Example
Complete task lifecycle:
from arc import Client
client = Client(endpoint="...", token="...")
# 1. Create task
create_response = await client.task.create(
target_agent="document-analyzer",
initial_message={
"role": "user",
"parts": [{"type": "text", "content": "Analyze quarterly report"}]
},
priority="HIGH"
)
task_id = create_response.result["task"]["taskId"]
print(f"Task created: {task_id}")
# 2. Subscribe to notifications
subscribe_response = await client.task.subscribe(
target_agent="document-analyzer",
task_id=task_id,
callback_url="https://myapp.com/webhooks/arc",
events=["TASK_COMPLETED", "TASK_FAILED"]
)
print(f"Subscribed: {subscribe_response.result['subscription']['subscriptionId']}")
# 3. Poll for status (or wait for webhook)
import asyncio
while True:
info_response = await client.task.info(
target_agent="document-analyzer",
task_id=task_id
)
status = info_response.result["task"]["status"]
print(f"Status: {status}")
if status == "INPUT_REQUIRED":
# Agent needs more info
await client.task.send(
target_agent="document-analyzer",
task_id=task_id,
message={
"role": "user",
"parts": [{"type": "text", "content": "Additional context"}]
}
)
elif status in ["COMPLETED", "FAILED", "CANCELED"]:
# Task finished
break
await asyncio.sleep(2) # Poll every 2 seconds
# 4. Get final results
final_response = await client.task.info(
target_agent="document-analyzer",
task_id=task_id,
include_artifacts=True
)
artifacts = final_response.result["task"]["artifacts"]
print(f"Artifacts: {len(artifacts)}")
Error Handling
from arc.exceptions import (
TaskNotFoundError,
TaskAlreadyCompletedError,
TaskAlreadyCanceledError,
InvalidParamsError
)
try:
response = await client.task.info(
target_agent="analyzer",
task_id="task-123"
)
except TaskNotFoundError as e:
print(f"Task not found: {e.code}")
except InvalidParamsError as e:
print(f"Invalid parameters: {e.message}")
Task States
SUBMITTED → WORKING → COMPLETED
↓
INPUT_REQUIRED
↓
WORKING
Any state → CANCELED
Any state → FAILED