Skip to main content
GET
/
v1
/
sessions
/
{session_id}
/
events
Stream Events (SSE)
curl --request GET \
  --url https://api.example.com/v1/sessions/{session_id}/events \
  --header 'Accept: <accept>' \
  --header 'Authorization: <authorization>'
{
  "id": "<string>",
  "event": "<string>",
  "data": "<any>"
}

Overview

Connect to a real-time event stream to receive updates as they happen. This uses Server-Sent Events (SSE) for push-based updates instead of polling.
SSE provides lower latency than polling and is more efficient for monitoring long-running tasks.

Request

session_id
string
required
The UUID of the session
Authorization
string
required
Bearer token for authentication
Accept
string
required
Must be set to text/event-stream
sanitize
boolean
default:"true"
Filter out system messages, internal prompts, and images
event_types
string
Comma-separated list of event types to subscribe toAvailable types:
  • step - Agent actions and navigation
  • thought - Agent reasoning
  • question - Questions requiring user input
  • done - Task completion
  • error - Errors
  • log - Debug logs
  • paused - Session paused
  • resumed - Session resumed
  • heartbeat - Keep-alive signals
include_history
boolean
default:"true"
Send historical messages immediately upon connection

Response Format

SSE events follow this format:
id: 42
event: thought
data: {"text":"Searching for flights...","message_id":42}

id: 43
event: question
data: {"text":"Prefer morning or evening?","message_id":43}

event: heartbeat
data: {"timestamp":"2025-10-05T18:05:30Z"}

Event Fields

id
string
Message ID for tracking
event
string
Event type (thought, question, done, error, etc.)
data
json
Event payload as JSON string

Example Requests

curl -N https://api.agi.tech/v1/sessions/<session_id>/events \
  -H "Authorization: Bearer $AGI_API_KEY" \
  -H "Accept: text/event-stream"

Event Types

thought

Agent’s reasoning or actions
{
  "event": "thought",
  "data": {
    "text": "Navigating to Kayak to search for flights...",
    "message_id": 42
  }
}

question

Agent needs user input
{
  "event": "question",
  "data": {
    "text": "Would you like direct flights only?",
    "message_id": 43
  }
}

done

Task completed successfully
{
  "event": "done",
  "data": {
    "text": "Task completed",
    "result": {...},
    "message_id": 50
  }
}

error

Error occurred
{
  "event": "error",
  "data": {
    "error": "Connection timeout",
    "status_code": 500
  }
}

heartbeat

Keep-alive signal (sent every ~30 seconds)
{
  "event": "heartbeat",
  "data": {
    "timestamp": "2025-10-05T18:05:30Z"
  }
}

Use Cases

Real-time Monitoring Dashboard

def monitor_dashboard(session_id):
    """Display real-time updates in a dashboard"""
    for event in stream_events(session_id):
        if event['type'] == 'thought':
            update_status_display(event['data']['text'])
        elif event['type'] == 'question':
            show_question_prompt(event['data']['text'])
        elif event['type'] == 'done':
            show_completion(event['data'])
            break

Filtered Event Stream

# Only monitor critical events
url = f"{BASE_URL}/sessions/{session_id}/events"
params = {
    "event_types": "question,done,error",
    "sanitize": True
}

# Stream will only send questions, completion, and errors

Auto-reconnect on Disconnect

def stream_with_reconnect(session_id, max_retries=3):
    retries = 0

    while retries < max_retries:
        try:
            stream_events(session_id)
            break  # Completed successfully
        except Exception as e:
            retries += 1
            print(f"Connection lost, retry {retries}/{max_retries}")
            time.sleep(2)

SSE vs Polling

FeatureSSEPolling
LatencyLow (~instant)Medium (1-3s)
EfficiencyHighLower
ComplexityMediumSimple
Use CaseReal-time monitoringSimple status checks
Use SSE when:
  • You need immediate updates
  • Monitoring long-running tasks
  • Building interactive UIs
Use Polling when:
  • Simple status checks
  • Client doesn’t support SSE
  • Stateless serverless functions

Connection Management

# Recommended pattern with timeout
import signal

def stream_with_timeout(session_id, timeout=300):
    """Stream with maximum duration"""

    def timeout_handler(signum, frame):
        raise TimeoutError("Stream timeout")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)

    try:
        stream_events(session_id)
    except TimeoutError:
        print("Stream exceeded timeout")
    finally:
        signal.alarm(0)  # Cancel alarm

Best Practices

Use SSE for real-time monitoring and long-running tasks. It’s more efficient than polling for immediate updates.
The connection will automatically close when the task completes (done or error event).
SSE connections can drop due to network issues. Implement reconnection logic with exponential backoff.
Set include_history: false if you only want new events, not historical messages from before connection.