Authentication
OAuth2 and JWT-based authentication for ARC Protocol.
OAuth 2.0
ARC Protocol recommends OAuth 2.0 Bearer tokens for authentication.
Token Format
Authorization: Bearer <access_token>
Token Validation
Server validates tokens on every request:
from arc import Server
from arc.auth import MultiProviderJWTValidator
# Configure JWT validator
validator = MultiProviderJWTValidator(
providers={
"auth0": {
"jwks_uri": "https://your-domain.auth0.com/.well-known/jwks.json",
"audience": "https://your-api.com",
"issuer": "https://your-domain.auth0.com/"
}
}
)
# Enable authentication
server = Server(server_id="secure-server", enable_auth=True)
server.set_jwt_validator(validator)
JWT Structure
Claims
Required JWT claims:
{
"iss": "https://your-domain.auth0.com/",
"sub": "user-id-123",
"aud": "https://your-api.com",
"exp": 1704542400,
"iat": 1704456000,
"scope": "arc.task.controller arc.chat.controller"
}
Validation
Server validates:
- Signature - Cryptographic signature using JWKS
- Issuer - Token from trusted issuer
- Audience - Token intended for this API
- Expiration - Token not expired
- Scopes - Required scopes present
Identity Providers
Auth0
validator = MultiProviderJWTValidator(
providers={
"auth0": {
"jwks_uri": "https://your-domain.auth0.com/.well-known/jwks.json",
"audience": "https://your-api.com",
"issuer": "https://your-domain.auth0.com/"
}
}
)
Okta
validator = MultiProviderJWTValidator(
providers={
"okta": {
"jwks_uri": "https://your-domain.okta.com/oauth2/default/v1/keys",
"audience": "api://default",
"issuer": "https://your-domain.okta.com/oauth2/default"
}
}
)
Multiple Providers
validator = MultiProviderJWTValidator(
providers={
"auth0": {
"jwks_uri": "https://domain1.auth0.com/.well-known/jwks.json",
"audience": "https://api.com",
"issuer": "https://domain1.auth0.com/"
},
"okta": {
"jwks_uri": "https://domain2.okta.com/oauth2/default/v1/keys",
"audience": "api://default",
"issuer": "https://domain2.okta.com/oauth2/default"
}
}
)
Client Authentication
With Token
from arc import Client
client = Client(
endpoint="https://api.example.com/arc",
token="your-oauth2-access-token"
)
response = await client.task.create(
target_agent="secure-agent",
initial_message={...}
)
Token Refresh
Handle token expiration:
import time
class TokenManager:
def __init__(self):
self.token = None
self.expires_at = 0
async def get_token(self):
if time.time() >= self.expires_at:
# Refresh token
self.token = await refresh_oauth_token()
self.expires_at = time.time() + 3600
return self.token
# Usage
token_manager = TokenManager()
client = Client(
endpoint="https://api.example.com/arc",
token=await token_manager.get_token()
)
Error Handling
Authentication Failures
from arc.exceptions import AuthenticationError, TokenExpiredError
try:
response = await client.task.create(...)
except AuthenticationError as e:
# Invalid or missing token
print(f"Authentication failed: {e.message}")
except TokenExpiredError as e:
# Token expired, refresh and retry
new_token = await refresh_token()
client.token = new_token
response = await client.task.create(...)
Error Codes
-44001Authentication Failed - Invalid or missing token-44004Token Expired - Bearer token expired
Security Best Practices
- Use HTTPS - Never send tokens over HTTP
- Short-Lived Tokens - Access tokens expire in 1 hour or less
- Secure Storage - Store tokens securely (encrypted, not in code)
- Token Refresh - Implement automatic token refresh
- Revocation - Support token revocation
- Audit Logging - Log authentication events
Machine-to-Machine
For agent-to-agent authentication:
# Client credentials flow
import httpx
async def get_m2m_token():
response = await httpx.post(
"https://your-domain.auth0.com/oauth/token",
json={
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"audience": "https://your-api.com",
"grant_type": "client_credentials"
}
)
return response.json()["access_token"]
# Use M2M token
token = await get_m2m_token()
client = Client(endpoint="...", token=token)
Custom Authentication
Implement custom JWT validation:
from arc.auth import JWTValidator
class CustomJWTValidator(JWTValidator):
async def validate_token(self, token: str) -> dict:
# Custom validation logic
decoded = self.decode_token(token)
# Additional custom checks
if not self.is_valid_custom_claim(decoded):
raise AuthenticationError("Invalid custom claim")
return decoded
def is_valid_custom_claim(self, decoded: dict) -> bool:
# Custom validation
return "custom_claim" in decoded
# Use custom validator
validator = CustomJWTValidator(...)
server.set_jwt_validator(validator)