Client Library Guide¶
The ARCP Python client library provides a comprehensive interface for interacting with ARCP servers. This guide covers all client features with practical examples.
๐ Installation¶
# Install from PyPI
pip install arcp-py
# Or install from source
git clone https://github.com/0x00K1/ARCP.git
cd ARCP
pip install -e .
๐ง Basic Usage¶
Simple Client Setup¶
import asyncio
from arcp import ARCPClient
async def main():
# Create client instance
client = ARCPClient("http://localhost:8001")
# Use async context manager for automatic cleanup
async with client:
# Check server health
health = await client.health_check()
print(f"Server status: {health['status']}")
# Discover agents
agents = await client.discover_agents()
print(f"Found {len(agents)} agents")
asyncio.run(main())
Client Configuration¶
from arcp import ARCPClient
# Configure client with custom settings
client = ARCPClient(
base_url="http://localhost:8001",
timeout=30.0, # Request timeout
retry_attempts=3, # Number of retries
retry_delay=1.0, # Initial retry delay
max_retry_delay=60.0, # Maximum retry delay
user_agent="MyApp/1.0" # Custom user agent
)
๐ Authentication¶
Admin Authentication¶
async def admin_example():
client = ARCPClient("http://localhost:8001")
try:
# Login as admin (credentials may vary by deployment)
await client.login_admin("ARCP", "ARCP")
# Now you can access admin endpoints
stats = await client.get_public_stats()
print(f"System stats: {stats}")
except Exception as e:
print(f"Authentication failed: {e}")
print("Check your admin credentials and server configuration")
asyncio.run(admin_example())
Agent Authentication¶
async def agent_example():
client = ARCPClient("http://localhost:8001")
# Request temporary token for agent registration
temp_token = await client.request_temp_token(
agent_id="my-agent",
agent_type="testing",
agent_key="test-agent-001"
)
print(f"Temporary token: {temp_token}")
# Use the token for agent operations
# (Token is automatically used in subsequent requests)
asyncio.run(agent_example())
Token Validation¶
async def validate_token_example():
client = ARCPClient("http://localhost:8001")
# Validate a token
result = await client.validate_token("your-jwt-token")
print(f"Token valid: {result}")
# Check if client is authenticated
if client.is_authenticated():
print("Client is authenticated")
else:
print("Client is not authenticated")
asyncio.run(validate_token_example())
๐ค Agent Management¶
Register an Agent¶
from arcp import AgentRequirements
async def register_agent_example():
client = ARCPClient("http://localhost:8001")
# Register a new agent
agent = await client.register_agent(
agent_id="my-testing-agent",
name="My Testing Agent",
agent_type="testing",
endpoint="https://my-agent.example.com",
capabilities=["data-processing", "file-conversion", "text-analysis"],
context_brief="A testing agent for data processing and file conversion",
version="1.0.0",
owner="My Organization",
public_key="my-agent-public-key-that-is-long-enough-32-chars",
communication_mode="remote",
metadata={
"framework": "fastapi",
"language": "python",
"deployment": "docker",
"created_at": "2025-01-XX"
},
features=["http-api", "json-responses", "async-processing"],
max_tokens=2000,
language_support=["en", "es", "fr"],
rate_limit=100,
requirements=AgentRequirements(
system_requirements=["Python 3.11+", "FastAPI"],
permissions=["http-server", "file-access"],
dependencies=["fastapi", "pydantic", "uvicorn"],
minimum_memory_mb=512,
requires_internet=True,
network_ports=["8080"]
),
policy_tags=["testing", "data-processing"],
agent_key="test-agent-001"
)
print(f"Agent registered: {agent.name}")
print(f"Status: {agent.status}")
print(f"Registered at: {agent.registered_at}")
asyncio.run(register_agent_example())
Get Agent Information¶
async def get_agent_example():
client = ARCPClient("http://localhost:8001")
# Get specific agent
agent = await client.get_agent("my-testing-agent", include_metrics=True)
print(f"Agent: {agent.name}")
print(f"Status: {agent.status}")
print(f"Capabilities: {agent.capabilities}")
print(f"Metrics: {agent.metrics}")
# List all agents
agents = await client.list_agents(
agent_type="testing",
status="alive",
include_metrics=True
)
print(f"Found {len(agents)} testing agents:")
for agent in agents:
print(f" โข {agent.name} - {agent.status}")
asyncio.run(get_agent_example())
Update Agent Heartbeat¶
async def heartbeat_example():
client = ARCPClient("http://localhost:8001")
# Update heartbeat
response = await client.update_heartbeat("my-testing-agent")
print(f"Heartbeat updated: {response}")
# Start automatic heartbeat task
heartbeat_task = await client.start_heartbeat_task(
"my-testing-agent",
interval=30.0 # Update every 30 seconds
)
# Keep running for 2 minutes
await asyncio.sleep(120)
# Stop heartbeat task
heartbeat_task.cancel()
print("Heartbeat task stopped")
asyncio.run(heartbeat_example())
Update Agent Metrics¶
async def metrics_example():
client = ARCPClient("http://localhost:8001")
# Update metrics
metrics = await client.update_metrics("my-testing-agent", {
"total_requests": 150,
"success_rate": 0.98,
"avg_response_time": 0.25,
"last_active": "2025-01-XX...",
"custom_metric": "value"
})
print(f"Metrics updated: {metrics}")
# Get current metrics
current_metrics = await client.get_metrics("my-testing-agent")
print(f"Current metrics: {current_metrics}")
asyncio.run(metrics_example())
Unregister Agent¶
async def unregister_example():
client = ARCPClient("http://localhost:8001")
# Unregister agent
response = await client.unregister_agent("my-testing-agent")
print(f"Unregistration response: {response}")
asyncio.run(unregister_example())
๐ Agent Discovery and Search¶
Basic Discovery¶
async def discovery_example():
client = ARCPClient("http://localhost:8001")
# Discover all agents
agents = await client.discover_agents(limit=50)
print(f"Found {len(agents)} agents")
# Filter by agent type
testing_agents = await client.discover_agents(
agent_type="testing",
limit=10
)
print(f"Found {len(testing_agents)} testing agents")
# Filter by capabilities
data_agents = await client.discover_agents(
capabilities=["data-processing"],
limit=10
)
print(f"Found {len(data_agents)} data processing agents")
asyncio.run(discovery_example())
Semantic Search¶
async def search_example():
client = ARCPClient("http://localhost:8001")
# Basic search
results = await client.search_agents(
query="find agents that can process text files",
top_k=5,
min_similarity=0.5
)
print(f"Search results: {len(results)}")
for result in results:
print(f" โข {result.name} (similarity: {result.similarity:.3f})")
print(f" Capabilities: {', '.join(result.capabilities)}")
# Advanced search with filters
results = await client.search_agents(
query="data analysis and visualization",
top_k=10,
min_similarity=0.3,
capabilities=["data-analysis", "visualization"],
agent_type="testing",
weighted=True, # Use reputation scores
public_api=True # Use public API (no authentication required)
)
print(f"Advanced search results: {len(results)}")
for result in results:
print(f" โข {result.name} (similarity: {result.similarity:.3f})")
if result.metrics:
print(f" Reputation: {result.metrics.reputation_score:.2f}")
asyncio.run(search_example())
Get Public Agent Information¶
async def public_agent_example():
client = ARCPClient("http://localhost:8001")
# Get public agent info (no authentication required)
agent = await client.get_public_agent("my-testing-agent")
print(f"Public agent info: {agent.name}")
print(f"Endpoint: {agent.endpoint}")
print(f"Capabilities: {agent.capabilities}")
asyncio.run(public_agent_example())
๐ Agent Communication¶
Request Agent Connection¶
async def connection_example():
client = ARCPClient("http://localhost:8001")
# Request connection to an agent
response = await client.request_agent_connection(
agent_id="my-testing-agent",
user_id="user-123",
user_endpoint="https://my-app.example.com/callback",
display_name="My Application",
additional_info={
"purpose": "data processing",
"priority": "high",
"expected_duration": "30 minutes"
}
)
print(f"Connection response: {response}")
print(f"Status: {response.get('status')}")
print(f"Next steps: {response.get('next_steps')}")
asyncio.run(connection_example())
๐ System Information¶
Health Check¶
async def health_example():
client = ARCPClient("http://localhost:8001")
# Check server health
health = await client.health_check()
print(f"Server health: {health}")
# Get system information
info = await client.get_system_info()
print(f"System info: {info}")
# Get public statistics
stats = await client.get_public_stats()
print(f"Public stats: {stats}")
# Get allowed agent types
agent_types = await client.get_allowed_agent_types()
print(f"Allowed agent types: {agent_types}")
asyncio.run(health_example())
๐ WebSocket Communication¶
Public WebSocket¶
async def public_websocket_example():
client = ARCPClient("http://localhost:8001")
print("Connecting to public WebSocket...")
message_count = 0
async for message in client.websocket_public():
message_count += 1
print(f"Message {message_count}: {message.get('type', 'unknown')}")
# Process different message types
if message.get('type') == 'stats_update':
data = message.get('data', {})
print(f" System stats: {data.get('total_agents', 0)} agents")
elif message.get('type') == 'agents_update':
data = message.get('data', {})
print(f" Agents update: {data.get('total_count', 0)} agents")
# Stop after 10 messages
if message_count >= 10:
break
print(f"Received {message_count} messages")
asyncio.run(public_websocket_example())
Agent WebSocket¶
async def agent_websocket_example():
client = ARCPClient("http://localhost:8001")
# First authenticate
await client.login_admin("ARCP", "ARCP")
print("Connecting to agent WebSocket...")
message_count = 0
async for message in client.websocket_agent():
message_count += 1
print(f"Agent message {message_count}: {message}")
# Stop after 5 messages
if message_count >= 5:
break
print(f"Received {message_count} agent messages")
asyncio.run(agent_websocket_example())
๐ ๏ธ Advanced Usage Patterns¶
Agent Pool Management¶
class AgentPool:
def __init__(self, client: ARCPClient, capability: str):
self.client = client
self.capability = capability
self.agents = []
self.current_index = 0
async def refresh_agents(self):
"""Refresh the list of available agents"""
results = await self.client.search_agents(
query=f"agents with {self.capability} capability",
capabilities=[self.capability],
weighted=True,
top_k=10
)
self.agents = results
print(f"Refreshed agent pool: {len(self.agents)} agents")
async def get_next_agent(self):
"""Get the next available agent (round-robin)"""
if not self.agents:
await self.refresh_agents()
if not self.agents:
raise Exception(f"No agents available for capability: {self.capability}")
# Round-robin selection
agent = self.agents[self.current_index]
self.current_index = (self.current_index + 1) % len(self.agents)
return agent
async def execute_with_failover(self, request: dict):
"""Execute request with automatic failover"""
max_retries = len(self.agents)
for attempt in range(max_retries):
try:
agent = await self.get_next_agent()
result = await self.execute_on_agent(agent, request)
return result
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
# Remove failed agent from pool
if agent in self.agents:
self.agents.remove(agent)
if attempt == max_retries - 1:
raise Exception(f"All agents failed for capability: {self.capability}")
async def execute_on_agent(self, agent, request: dict):
"""Execute request on specific agent"""
import httpx
async with httpx.AsyncClient() as http_client:
response = await http_client.post(
f"{agent.url}/api/process",
json=request,
timeout=30.0
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Agent request failed: {response.status_code}")
# Usage
async def agent_pool_example():
client = ARCPClient("http://localhost:8001")
# Create agent pool for data processing
pool = AgentPool(client, "data-processing")
# Execute request with failover
result = await pool.execute_with_failover({
"data": "sample data",
"operation": "process"
})
print(f"Result: {result}")
asyncio.run(agent_pool_example())
Agent Monitoring¶
class AgentMonitor:
def __init__(self, client: ARCPClient):
self.client = client
self.agents = {}
self.monitoring = False
async def start_monitoring(self):
"""Start monitoring agent status"""
self.monitoring = True
# Start WebSocket monitoring
async for message in self.client.websocket_public():
if not self.monitoring:
break
if message.get('type') == 'agents_update':
await self.update_agent_status(message.get('data', {}))
async def update_agent_status(self, data: dict):
"""Update agent status from WebSocket message"""
# This would parse the WebSocket message and update internal state
print(f"Agent status update: {data}")
async def get_agent_health(self, agent_id: str):
"""Get detailed health information for an agent"""
try:
agent = await self.client.get_agent(agent_id, include_metrics=True)
health_info = {
"agent_id": agent.agent_id,
"status": agent.status,
"last_seen": agent.last_seen,
"metrics": agent.metrics,
"capabilities": agent.capabilities,
"endpoint": agent.endpoint
}
return health_info
except Exception as e:
return {"error": str(e)}
def stop_monitoring(self):
"""Stop monitoring"""
self.monitoring = False
# Usage
async def monitoring_example():
client = ARCPClient("http://localhost:8001")
monitor = AgentMonitor(client)
# Start monitoring in background
monitoring_task = asyncio.create_task(monitor.start_monitoring())
# Get health for specific agent
health = await monitor.get_agent_health("my-testing-agent")
print(f"Agent health: {health}")
# Stop monitoring after 30 seconds
await asyncio.sleep(30)
monitor.stop_monitoring()
monitoring_task.cancel()
asyncio.run(monitoring_example())
Batch Operations¶
async def batch_operations_example():
client = ARCPClient("http://localhost:8001")
# Batch register multiple agents
agents_to_register = [
{
"agent_id": "agent-1",
"name": "Agent 1",
"agent_type": "testing",
"endpoint": "http://agent1.example.com",
"capabilities": ["processing"],
"context_brief": "Agent 1 for processing",
"agent_key": "test-agent-001"
},
{
"agent_id": "agent-2",
"name": "Agent 2",
"agent_type": "testing",
"endpoint": "http://agent2.example.com",
"capabilities": ["analysis"],
"context_brief": "Agent 2 for analysis",
"agent_key": "test-agent-002"
}
]
# Register agents concurrently
tasks = []
for agent_data in agents_to_register:
task = client.register_agent(**agent_data)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Agent {i+1} registration failed: {result}")
else:
print(f"Agent {i+1} registered: {result.name}")
# Batch update heartbeats
agent_ids = ["agent-1", "agent-2"]
heartbeat_tasks = [
client.update_heartbeat(agent_id)
for agent_id in agent_ids
]
heartbeat_results = await asyncio.gather(*heartbeat_tasks, return_exceptions=True)
for i, result in enumerate(heartbeat_results):
if isinstance(result, Exception):
print(f"Heartbeat {i+1} failed: {result}")
else:
print(f"Heartbeat {i+1} updated: {result}")
asyncio.run(batch_operations_example())
๐งช Testing¶
Unit Tests¶
import pytest
from unittest.mock import AsyncMock, patch
from arcp import ARCPClient
@pytest.mark.asyncio
async def test_client_creation():
"""Test client creation"""
client = ARCPClient("http://localhost:8001")
assert client.base_url == "http://localhost:8001"
assert client.timeout == 30.0
@pytest.mark.asyncio
async def test_health_check():
"""Test health check"""
client = ARCPClient("http://localhost:8001")
with patch('httpx.AsyncClient') as mock_client:
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json.return_value = {"status": "healthy"}
mock_client.return_value.__aenter__.return_value.request.return_value = mock_response
health = await client.health_check()
assert health["status"] == "healthy"
@pytest.mark.asyncio
async def test_agent_discovery():
"""Test agent discovery"""
client = ARCPClient("http://localhost:8001")
with patch('httpx.AsyncClient') as mock_client:
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json.return_value = [
{
"agent_id": "test-agent",
"name": "Test Agent",
"status": "alive"
}
]
mock_client.return_value.__aenter__.return_value.request.return_value = mock_response
agents = await client.discover_agents()
assert len(agents) == 1
assert agents[0].agent_id == "test-agent"
Integration Tests¶
@pytest.mark.asyncio
async def test_full_agent_lifecycle():
"""Test complete agent lifecycle"""
client = ARCPClient("http://localhost:8001")
try:
# Register agent
agent = await client.register_agent(
agent_id="test-lifecycle-agent",
name="Test Lifecycle Agent",
agent_type="testing",
endpoint="http://localhost:8080",
capabilities=["test"],
context_brief="Test agent for lifecycle testing",
agent_key="test-agent-001"
)
assert agent.agent_id == "test-lifecycle-agent"
assert agent.status == "alive"
# Update heartbeat
heartbeat = await client.update_heartbeat("test-lifecycle-agent")
assert heartbeat["status"] == "success"
# Update metrics
metrics = await client.update_metrics("test-lifecycle-agent", {
"total_requests": 10,
"success_rate": 1.0
})
assert metrics["status"] == "success"
# Get agent
retrieved_agent = await client.get_agent("test-lifecycle-agent")
assert retrieved_agent.agent_id == "test-lifecycle-agent"
# Search for agent
search_results = await client.search_agents("test lifecycle")
assert len(search_results) > 0
assert any(r.id == "test-lifecycle-agent" for r in search_results)
finally:
# Cleanup
await client.unregister_agent("test-lifecycle-agent")
๐ Performance Optimization¶
Connection Pooling¶
import httpx
from arcp import ARCPClient
class OptimizedARCPClient(ARCPClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._connection_pool = None
async def _ensure_client(self):
"""Ensure HTTP client with connection pooling"""
if self._client is None:
# Create client with connection pooling
limits = httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30.0
)
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(self.timeout),
limits=limits,
headers={
"User-Agent": self.user_agent,
"X-Client-Fingerprint": self._client_fingerprint,
},
)
Caching¶
import asyncio
import time
from typing import Dict, Any, Optional
from arcp import ARCPClient
class CachedARCPClient(ARCPClient):
def __init__(self, *args, cache_ttl: int = 300, **kwargs):
super().__init__(*args, **kwargs)
self.cache_ttl = cache_ttl
self._cache: Dict[str, tuple] = {} # key -> (value, timestamp)
def _get_cache_key(self, method: str, **kwargs) -> str:
"""Generate cache key"""
return f"{method}:{hash(str(sorted(kwargs.items())))}"
def _is_cache_valid(self, timestamp: float) -> bool:
"""Check if cache entry is valid"""
return time.time() - timestamp < self.cache_ttl
async def discover_agents(self, *args, **kwargs):
"""Cached agent discovery"""
cache_key = self._get_cache_key("discover_agents", **kwargs)
if cache_key in self._cache:
value, timestamp = self._cache[cache_key]
if self._is_cache_valid(timestamp):
return value
# Fetch from server
result = await super().discover_agents(*args, **kwargs)
# Cache result
self._cache[cache_key] = (result, time.time())
return result
async def search_agents(self, *args, **kwargs):
"""Cached agent search"""
cache_key = self._get_cache_key("search_agents", **kwargs)
if cache_key in self._cache:
value, timestamp = self._cache[cache_key]
if self._is_cache_valid(timestamp):
return value
# Fetch from server
result = await super().search_agents(*args, **kwargs)
# Cache result
self._cache[cache_key] = (result, time.time())
return result
๐ Troubleshooting¶
Common Issues¶
Rate Limiting¶
If you encounter rate limiting errors:
import asyncio
from arcp import ARCPClient
async def robust_auth_example():
client = ARCPClient("http://localhost:8001")
max_retries = 3
for attempt in range(max_retries):
try:
await client.login_admin("ARCP", "ARCP")
break
except Exception as e:
if "rate limit" in str(e).lower() and attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
print(f"Rate limited, waiting {wait_time} seconds...")
await asyncio.sleep(wait_time)
else:
raise e
Connection Errors¶
# Handle connection errors gracefully
async def robust_client_example():
client = ARCPClient("http://localhost:8001")
try:
health = await client.health_check()
print(f"Server health: {health}")
except Exception as e:
print(f"Connection failed: {e}")
print("Check if ARCP server is running on http://localhost:8001")
Authentication Errors¶
# Handle authentication errors
async def auth_error_example():
client = ARCPClient("http://localhost:8001")
try:
await client.login_admin("wrong-username", "wrong-password")
except Exception as e:
print(f"Authentication failed: {e}")
print("Check admin credentials in ARCP configuration")
Timeout Errors¶
# Handle timeout errors
async def timeout_example():
client = ARCPClient(
"http://localhost:8001",
timeout=5.0, # Short timeout for testing
retry_attempts=2
)
try:
agents = await client.discover_agents()
print(f"Found {len(agents)} agents")
except Exception as e:
print(f"Request failed: {e}")
print("Consider increasing timeout or checking network connectivity")
Debug Mode¶
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
# Create client with debug info
client = ARCPClient("http://localhost:8001")
# All requests will now show detailed debug information
๐ Next Steps¶
Now that you understand the client library:
- API Reference - Complete API documentation
- Agent Development Guide - Build agents that use the client
- WebSocket API Reference - Real-time communication
๐ก Tips and Best Practices¶
- Use async context managers - Automatic cleanup and resource management
- Handle errors gracefully - Always catch and handle exceptions
- Implement retry logic - Use the built-in retry mechanisms with exponential backoff
- Monitor performance - Track response times and success rates
- Use connection pooling - For high-throughput applications
- Implement caching - Reduce server load for frequently accessed data
- Test thoroughly - Unit and integration tests are essential
- Log appropriately - Use proper logging levels and formats
- Monitor resource usage - Watch memory and connection usage
- Keep clients updated - Use the latest version for bug fixes and features
- Handle rate limiting - Implement proper delays and backoff strategies
- Validate agent types - Use only supported agent types you wrote in the environment, DEFAULT (security, monitoring, automation, networking, testing)
- Use proper public keys - Ensure public keys are at least 32 characters long
Ready to explore real-time communication? Check out the WebSocket API Reference!