Events and streaming

Blackgeorge emits events for run lifecycle, workers, tools, workflows, and adapter calls.

EventBus API

Creating an event bus

from blackgeorge.event_bus import EventBus

bus = EventBus()

Error collection

EventBus collects errors from async handlers for inspection:

bus = EventBus()

# After running handlers...
errors = bus.get_errors()
for error in errors:
    print(f"Handler failed: {error.event_type} - {error.handler_error}")

bus.clear_errors()

To await all pending handlers and optionally raise on first error:

# Wait for all pending async handlers
errors = await bus.await_pending()

# Wait and raise on first error
try:
    errors = await bus.await_pending(raise_on_error=True)
except EventHandlerError as e:
    print(f"Handler failed: {e.event_type} - {e.handler_error}")

Subscribing to events

def handle_event(event) -> None:
    print(f"{event.type}: {event.payload}")

bus.subscribe("run.started", handle_event)
bus.subscribe("run.completed", handle_event)
bus.subscribe("run.failed", handle_event)

Event handlers receive an Event with these fields:

  • event_id: unique event id
  • type: event name, for example run.started
  • timestamp: UTC timestamp
  • run_id: run identifier
  • source: emitter name (for example worker/tool/workforce name)
  • payload: event-specific data

EventBus.subscribe supports exact event types and * for all event types.
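
Because handlers only read the fields listed above, they can be exercised without a desk or a real bus. The sketch below uses a hypothetical `FakeEvent` dataclass that mirrors those field names. It is a test stand-in, not Blackgeorge's `Event` class, and the field types are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from uuid import uuid4


@dataclass
class FakeEvent:
    """Hypothetical stand-in mirroring the documented Event field names."""

    type: str
    run_id: str
    source: str
    payload: dict[str, Any] = field(default_factory=dict)
    event_id: str = field(default_factory=lambda: uuid4().hex)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def describe(event) -> str:
    # Reads only documented fields, so the same handler logic applies to a real Event.
    return f"[{event.run_id}] {event.type} from {event.source}: {event.payload}"


sample = FakeEvent(
    type="run.started",
    run_id="run-123",
    source="desk",
    payload={"job_id": "job-1"},
)
line = describe(sample)
print(line)
```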

Emitting events

from blackgeorge.core.event import Event
from blackgeorge.utils import new_id, utc_now

event = Event(
    event_id=new_id(),
    type="custom.event",
    timestamp=utc_now(),
    run_id="run-123",
    source="my_component",
    payload={"data": "value"},
)
bus.emit(event)

emit runs handlers in-process. If a handler is async (or returns an awaitable), the awaitable is scheduled on the current event loop when one is available; from a sync context, it is run on a temporary loop.

await bus.aemit(event)

aemit awaits async handlers and awaitable returns from sync handlers.
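
The awaiting behaviour described for aemit can be illustrated with a toy dispatcher. This is a simplified sketch, not Blackgeorge's implementation; `dispatch_and_await` is a hypothetical helper, and plain strings stand in for events.

```python
import asyncio
import inspect


async def dispatch_and_await(handlers, event):
    """Toy awaiting emit: call every handler and await any awaitable it returns."""
    for handler in handlers:
        result = handler(event)
        if inspect.isawaitable(result):
            await result


calls = []


def sync_handler(event):
    calls.append(("sync", event))


async def async_handler(event):
    await asyncio.sleep(0)  # yield to the loop once, as real async I/O would
    calls.append(("async", event))


asyncio.run(dispatch_and_await([sync_handler, async_handler], "custom.event"))
print(calls)
```

By the time `dispatch_and_await` returns, both handlers have finished, which is the property that makes an awaiting emit convenient in tests.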

Unsubscribing

Use unsubscribe to remove a previously registered handler.

bus.unsubscribe("run.started", handle_event)
bus.unsubscribe("*", handle_event)

Event types

EventType Enum

Use typed event types for IDE autocomplete and type safety:

from blackgeorge import EventType

# Access event types as enum values
bus.subscribe(EventType.RUN_STARTED, handler)
bus.subscribe(EventType.WORKER_COMPLETED, handler)
bus.subscribe(EventType.TOOL_FAILED, handler)

# Compare with strings
assert EventType.WORKER_STARTED == "worker.started"
assert EventType.WORKER_STARTED.value == "worker.started"
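
The string comparison above works when enum members are also `str` instances. A minimal sketch of that pattern follows, assuming (this page does not confirm it) that EventType is defined along similar lines. `DemoEventType` is a hypothetical stand-in with only two members.

```python
from enum import Enum


class DemoEventType(str, Enum):
    """Hypothetical stand-in: each member is a real str instance."""

    RUN_STARTED = "run.started"
    WORKER_STARTED = "worker.started"


# A member compares equal to its plain string value.
is_equal = DemoEventType.WORKER_STARTED == "worker.started"

# Looking up by value recovers the member from a plain string,
# which is handy when an event type arrives as text.
member = DemoEventType("run.started")

print(is_equal, member.name, member.value)
```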

Available event types:

| EventType | String Value |
| --- | --- |
| RUN_STARTED | run.started |
| RUN_COMPLETED | run.completed |
| RUN_FAILED | run.failed |
| RUN_PAUSED | run.paused |
| RUN_RESUMED | run.resumed |
| STEP_STARTED | step.started |
| STEP_COMPLETED | step.completed |
| STEP_PAUSED | step.paused |
| WORKER_STARTED | worker.started |
| WORKER_COMPLETED | worker.completed |
| WORKER_FAILED | worker.failed |
| WORKER_PAUSED | worker.paused |
| WORKER_CONTEXT_SUMMARIZED | worker.context_summarized |
| WORKFORCE_STARTED | workforce.started |
| WORKFORCE_COMPLETED | workforce.completed |
| TOOL_STARTED | tool.started |
| TOOL_COMPLETED | tool.completed |
| TOOL_FAILED | tool.failed |
| TOOL_CONFIRMATION_REQUESTED | tool.confirmation_requested |
| TOOL_USER_INPUT_REQUESTED | tool.user_input_requested |
| STREAM_TOKEN | stream.token |
| ASSISTANT_MESSAGE | assistant.message |
| LLM_STARTED | llm.started |
| LLM_COMPLETED | llm.completed |
| LLM_FAILED | llm.failed |

Run events

| Event Type | Description | Payload Fields |
| --- | --- | --- |
| run.started | Run started | job_id |
| run.paused | Run paused | none |
| run.resumed | Run resumed | none |
| run.completed | Run completed | none |
| run.failed | Run failed | errors when emitted by Desk; may be empty for flow-level failures |

Worker events

| Event Type | Description | Payload Fields |
| --- | --- | --- |
| worker.started | Worker iteration started/resumed | none |
| worker.paused | Worker paused for pending action | pending_action_type (confirmation, user_input, or handoff) |
| worker.completed | Worker completed successfully | none |
| worker.failed | Worker failed | error |
| worker.context_summarized | Context summary applied | model, summarized_messages, kept_messages, optional unregistered_model, optional registration_hint |

Workforce events

| Event Type | Description | Payload Fields |
| --- | --- | --- |
| workforce.started | Workforce run started | none |
| workforce.completed | Workforce run finished | none |

Workflow step events

| Event Type | Description | Payload Fields |
| --- | --- | --- |
| step.started | Step execution started | none |
| step.completed | Step execution finished | status |
| step.paused | Step paused | status |

Tool events

| Event Type | Description | Payload Fields |
| --- | --- | --- |
| tool.started | Tool execution started | tool_call_id |
| tool.completed | Tool execution completed | tool_call_id, optional result_preview, optional result_truncated, optional timed_out, optional cancelled |
| tool.failed | Tool execution failed | tool_call_id, error |
| tool.confirmation_requested | Tool needs confirmation | tool_call_id |
| tool.user_input_requested | Tool needs user input | tool_call_id |

Handoff pending actions do not emit a dedicated tool request event; they are represented by worker.paused with pending_action_type="handoff".

Tool/workforce/worker names are exposed via event.source.
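
Since all three pause reasons arrive through worker.paused, a single handler can branch on pending_action_type. The sketch below is illustrative only: `describe_pause` and its message strings are hypothetical, and a `SimpleNamespace` stands in for a real event.

```python
from types import SimpleNamespace

# Hypothetical display strings for the three documented pending action types.
PAUSE_REASONS = {
    "confirmation": "waiting for tool confirmation",
    "user_input": "waiting for user input",
    "handoff": "handoff pending",
}


def describe_pause(event) -> str:
    """Turn a worker.paused event into a short status message."""
    action = event.payload.get("pending_action_type")
    reason = PAUSE_REASONS.get(action, "paused for an unrecognised reason")
    return f"{event.source}: {reason}"


paused = SimpleNamespace(
    type="worker.paused",
    source="analyst",
    payload={"pending_action_type": "handoff"},
)
message = describe_pause(paused)
print(message)
```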

LLM adapter events

| Event Type | Description | Payload Fields |
| --- | --- | --- |
| llm.started | LLM call started | model, messages_count, tools_count |
| llm.completed | LLM call completed or stream closed | model, latency_ms, optional prompt_tokens, optional completion_tokens, optional total_tokens, optional cost |
| llm.failed | LLM call failed | model, latency_ms, error_type, error_message |
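
The optional usage fields on llm.completed lend themselves to simple accounting. The following is a minimal sketch assuming total_tokens and cost are numeric when present; `UsageTracker` is a hypothetical helper, and `SimpleNamespace` objects stand in for real events.

```python
from types import SimpleNamespace


class UsageTracker:
    """Accumulates token counts and cost from llm.completed payloads."""

    def __init__(self):
        self.calls = 0
        self.total_tokens = 0
        self.cost = 0.0

    def on_llm_completed(self, event):
        payload = event.payload
        self.calls += 1
        # Usage fields are optional, so fall back to zero when absent or None.
        self.total_tokens += payload.get("total_tokens") or 0
        self.cost += payload.get("cost") or 0.0


usage = UsageTracker()
usage.on_llm_completed(
    SimpleNamespace(payload={"model": "m", "latency_ms": 120, "total_tokens": 42, "cost": 0.25})
)
usage.on_llm_completed(SimpleNamespace(payload={"model": "m", "latency_ms": 80}))
print(usage.calls, usage.total_tokens, usage.cost)
```

With a desk, the handler would be registered via desk.event_bus.subscribe("llm.completed", usage.on_llm_completed).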

Streaming/message events

Event Type Description Payload Fields
stream.token Stream token delta token
assistant.message Assistant message appended content, optional tool_calls

Subscribing from a desk

from blackgeorge import Desk

def on_tool_completed(event) -> None:
    preview = event.payload.get("result_preview")
    print(f"{event.source} completed: {preview}")

desk = Desk(model="openai/gpt-5-nano")
desk.event_bus.subscribe("tool.completed", on_tool_completed)

Filtering patterns

Filter by prefixes

def handle_tool_events(event) -> None:
    print(f"{event.type} from {event.source}")

for event_type in ("tool.started", "tool.completed", "tool.failed"):
    bus.subscribe(event_type, handle_tool_events)
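
An alternative to listing every event type is to subscribe once and filter on the prefix inside a wrapper. In this sketch `with_prefix` is a hypothetical helper and `SimpleNamespace` objects stand in for real events.

```python
from types import SimpleNamespace


def with_prefix(handler, prefix):
    """Wrap handler so it only sees events whose type starts with prefix."""

    def filtered(event):
        if event.type.startswith(prefix):
            handler(event)

    return filtered


seen = []
tool_only = with_prefix(lambda event: seen.append(event.type), "tool.")

for event_type in ("run.started", "tool.started", "tool.failed", "worker.failed"):
    tool_only(SimpleNamespace(type=event_type))
print(seen)
```

Registered with bus.subscribe("*", tool_only), the wrapper would receive every event and forward only the tool.* ones. The trade-off is that it runs on every emit, so the explicit list above is cheaper on busy buses.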

Filter by source

def handle_analyst_events(event) -> None:
    if event.source == "analyst":
        print(event.type, event.payload)

bus.subscribe("worker.started", handle_analyst_events)
bus.subscribe("worker.failed", handle_analyst_events)

Filter with wrappers

def create_filter(handler, event_types):
    def filtered(event):
        if event.type in event_types:
            handler(event)
    return filtered

# Subscribe once with "*" and let the wrapper decide which events reach the handler
filtered = create_filter(print, {"run.started", "run.completed"})
bus.subscribe("*", filtered)

Async handlers

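async def async_handler(event):
    await some_async_operation(event)  # placeholder for your own coroutine

bus.subscribe("custom.event", async_handler)
await bus.aemit(event)  # awaits async_handler before returning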

Streaming events

stream.token is emitted only when both of the following are true:

  • streaming enabled on desk/run
  • no response schema for that turn, or structured_stream_mode="preview"

On tool turns, stream.token payloads carry streamed tool argument deltas.

def on_token(event):
    print(event.payload.get("token", ""), end="", flush=True)

desk.event_bus.subscribe("stream.token", on_token)
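
When the streamed text is needed after the run rather than printed live, tokens can be buffered per run. This is a sketch under the assumption that each payload carries a string token; `TokenCollector` is a hypothetical helper, and `SimpleNamespace` objects stand in for real events.

```python
from collections import defaultdict
from types import SimpleNamespace


class TokenCollector:
    """Buffers stream.token deltas per run so the full text can be read later."""

    def __init__(self):
        self._chunks = defaultdict(list)

    def on_token(self, event):
        self._chunks[event.run_id].append(event.payload.get("token", ""))

    def text(self, run_id) -> str:
        return "".join(self._chunks[run_id])


collector = TokenCollector()
for piece in ("Hel", "lo", "!"):
    collector.on_token(SimpleNamespace(run_id="run-123", payload={"token": piece}))
print(collector.text("run-123"))
```

Note that on tool turns the buffered text would contain tool argument deltas, so a real collector may want to separate those from assistant text.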

Tool result previews

tool.completed may include result_preview and result_truncated for lightweight logging.

def on_tool_completed(event):
    print(event.source, event.payload.get("result_preview"))
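
Because both fields are optional, a logging handler should tolerate their absence. The sketch below assumes result_truncated is a truthy flag when the preview was shortened; `format_tool_result` is a hypothetical helper, and `SimpleNamespace` objects stand in for real events.

```python
from types import SimpleNamespace


def format_tool_result(event) -> str:
    """Build a one-line log entry from a tool.completed event."""
    payload = event.payload
    preview = payload.get("result_preview")
    if preview is None:
        return f"{event.source}: (no preview)"
    suffix = " [truncated]" if payload.get("result_truncated") else ""
    return f"{event.source}: {preview}{suffix}"


full = SimpleNamespace(source="search", payload={"tool_call_id": "c1", "result_preview": "3 hits"})
cut = SimpleNamespace(
    source="search",
    payload={"tool_call_id": "c2", "result_preview": "lorem ip", "result_truncated": True},
)
bare = SimpleNamespace(source="search", payload={"tool_call_id": "c3"})

lines = [format_tool_result(e) for e in (full, cut, bare)]
print("\n".join(lines))
```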

Context summary events

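def on_context_summarized(event):
    payload = event.payload
    print(payload["summarized_messages"], payload["kept_messages"])
    # Present only when the model is not registered
    if payload.get("unregistered_model"):
        print(payload.get("registration_hint"))

desk.event_bus.subscribe("worker.context_summarized", on_context_summarized)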

Event storage

Events emitted through a desk are persisted in the run store.

events = desk.run_store.get_events(run_id)
for event in events:
    print(event.type, event.payload)
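
Stored events can be summarised after a run, for example by counting them per type. The sketch below reads only the `type` field; `summarize` is a hypothetical helper, and the `stored` list stands in for whatever get_events returns.

```python
from collections import Counter
from types import SimpleNamespace


def summarize(events):
    """Count events by type, most frequent first."""
    return Counter(event.type for event in events).most_common()


stored = [
    SimpleNamespace(type=t)
    for t in ("run.started", "tool.started", "tool.completed", "tool.started", "run.completed")
]
summary = summarize(stored)
for event_type, count in summary:
    print(f"{event_type}: {count}")
```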

Custom events

from blackgeorge import Desk, Job, Worker
from blackgeorge.core.event import Event
from blackgeorge.utils import new_id, utc_now

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="assistant")
report = desk.run(worker, Job(input="hello"))

custom_event = Event(
    event_id=new_id(),
    type="custom.progress",
    timestamp=utc_now(),
    run_id=report.run_id,
    source="my_tool",
    payload={"percent": 50},
)
desk.event_bus.emit(custom_event)

Performance notes

  • Handlers run on the emitting path, so keep them fast.
  • Offload heavy work to queues/threads/processes.
  • Register subscriptions before starting concurrent run execution.

import queue

event_queue = queue.Queue()

def queue_handler(event):
    event_queue.put(event)

bus.subscribe("run.completed", queue_handler)
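
The queue handler only moves work off the emitting path; something still has to drain the queue. One possible consumer is a background thread, sketched below with a hypothetical `_STOP` sentinel for shutdown. Plain strings stand in for events.

```python
import queue
import threading

event_queue = queue.Queue()
processed = []
_STOP = object()  # sentinel telling the worker thread to exit


def queue_handler(event):
    # Cheap: only enqueues, so the emitting path is not blocked.
    event_queue.put(event)


def worker():
    while True:
        item = event_queue.get()
        if item is _STOP:
            break
        processed.append(item)  # replace with the slow work (I/O, logging, ...)


thread = threading.Thread(target=worker, daemon=True)
thread.start()

for name in ("run.started", "run.completed"):
    queue_handler(name)
event_queue.put(_STOP)
thread.join(timeout=5)
print(processed)
```

A single consumer thread keeps events in emit order. Several consumers would raise throughput at the cost of that ordering.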

Event-driven patterns

Progress tracking

class ProgressTracker:
    def __init__(self, total_steps):
        self.total_steps = total_steps
        self.completed_steps = 0

    def on_step_complete(self, _event):
        self.completed_steps += 1
        percent = (self.completed_steps / self.total_steps) * 100
        print(f"{percent:.1f}%")

tracker = ProgressTracker(total_steps=5)
desk.event_bus.subscribe("step.completed", tracker.on_step_complete)

Error aggregation

class ErrorLogger:
    def __init__(self):
        self.errors = []

    def on_error(self, event):
        error = event.payload.get("error")
        if error is None:
            errors = event.payload.get("errors")
            if isinstance(errors, list) and errors:
                error = "; ".join(errors)
        self.errors.append((event.type, event.source, error))

logger = ErrorLogger()
desk.event_bus.subscribe("run.failed", logger.on_error)
desk.event_bus.subscribe("worker.failed", logger.on_error)
desk.event_bus.subscribe("tool.failed", logger.on_error)