Overview
Connect to a real-time event stream to receive updates as they happen. This uses Server-Sent Events (SSE) for push-based updates instead of polling.
SSE provides lower latency than polling and is more efficient for monitoring long-running tasks.
Request
Bearer token for authentication
Must be set to text/event-stream
Filter out system messages, internal prompts, and images
Comma-separated list of event types to subscribe toAvailable types:
step - Agent actions and navigation
thought - Agent reasoning
question - Questions requiring user input
done - Task completion
error - Errors
log - Debug logs
paused - Session paused
resumed - Session resumed
heartbeat - Keep-alive signals
Send historical messages immediately upon connection
SSE events follow this format:
id: 42
event: thought
data: {"text":"Searching for flights...","message_id":42}
id: 43
event: question
data: {"text":"Prefer morning or evening?","message_id":43}
event: heartbeat
data: {"timestamp":"2025-10-05T18:05:30Z"}
Event Fields
Event type (thought, question, done, error, etc.)
Event payload as JSON string
Example Requests
curl -N https://api.agi.tech/v1/sessions/<session_id>/events \
-H "Authorization: Bearer $AGI_API_KEY" \
-H "Accept: text/event-stream"
Event Types
thought
Agent’s reasoning or actions
{
"event": "thought",
"data": {
"text": "Navigating to Kayak to search for flights...",
"message_id": 42
}
}
question
Agent needs user input
{
"event": "question",
"data": {
"text": "Would you like direct flights only?",
"message_id": 43
}
}
done
Task completed successfully
{
"event": "done",
"data": {
"text": "Task completed",
"result": {...},
"message_id": 50
}
}
error
Error occurred
{
"event": "error",
"data": {
"error": "Connection timeout",
"status_code": 500
}
}
heartbeat
Keep-alive signal (sent every ~30 seconds)
{
"event": "heartbeat",
"data": {
"timestamp": "2025-10-05T18:05:30Z"
}
}
Use Cases
Real-time Monitoring Dashboard
def monitor_dashboard(session_id):
"""Display real-time updates in a dashboard"""
for event in stream_events(session_id):
if event['type'] == 'thought':
update_status_display(event['data']['text'])
elif event['type'] == 'question':
show_question_prompt(event['data']['text'])
elif event['type'] == 'done':
show_completion(event['data'])
break
Filtered Event Stream
# Only monitor critical events
url = f"{BASE_URL}/sessions/{session_id}/events"
params = {
"event_types": "question,done,error",
"sanitize": True
}
# Stream will only send questions, completion, and errors
Auto-reconnect on Disconnect
def stream_with_reconnect(session_id, max_retries=3):
retries = 0
while retries < max_retries:
try:
stream_events(session_id)
break # Completed successfully
except Exception as e:
retries += 1
print(f"Connection lost, retry {retries}/{max_retries}")
time.sleep(2)
SSE vs Polling
| Feature | SSE | Polling |
|---|
| Latency | Low (~instant) | Medium (1-3s) |
| Efficiency | High | Lower |
| Complexity | Medium | Simple |
| Use Case | Real-time monitoring | Simple status checks |
Use SSE when:
- You need immediate updates
- Monitoring long-running tasks
- Building interactive UIs
Use Polling when:
- Simple status checks
- Client doesn’t support SSE
- Stateless serverless functions
Connection Management
# Recommended pattern with timeout
import signal
def stream_with_timeout(session_id, timeout=300):
"""Stream with maximum duration"""
def timeout_handler(signum, frame):
raise TimeoutError("Stream timeout")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try:
stream_events(session_id)
except TimeoutError:
print("Stream exceeded timeout")
finally:
signal.alarm(0) # Cancel alarm
Best Practices
Use SSE for real-time monitoring and long-running tasks. It’s more efficient than polling for immediate updates.
The connection will automatically close when the task completes (done or error event).
SSE connections can drop due to network issues. Implement reconnection logic with exponential backoff.
Set include_history: false if you only want new events, not historical messages from before connection.