Events and streaming¶
Blackgeorge emits events for run lifecycle, workers, tools, workflows, and adapter calls.
EventBus API¶
Creating an event bus¶
Error collection¶
EventBus collects errors from async handlers for inspection:
```python
bus = EventBus()

# After running handlers...
errors = bus.get_errors()
for error in errors:
    print(f"Handler failed: {error.event_type} - {error.handler_error}")

bus.clear_errors()
```
To await all pending handlers and optionally raise on first error:
```python
# Wait for all pending async handlers
errors = await bus.await_pending()

# Wait and raise on first error
try:
    errors = await bus.await_pending(raise_on_error=True)
except EventHandlerError as e:
    print(f"Handler failed: {e.event_type} - {e.handler_error}")
```
Subscribing to events¶
```python
def handle_event(event) -> None:
    print(f"{event.type}: {event.payload}")

bus.subscribe("run.started", handle_event)
bus.subscribe("run.completed", handle_event)
bus.subscribe("run.failed", handle_event)
```
Event handlers receive an Event with these fields:
- `event_id`: unique event id
- `type`: event name, for example `run.started`
- `timestamp`: UTC timestamp
- `run_id`: run identifier
- `source`: emitter name (for example worker/tool/workforce name)
- `payload`: event-specific data
`EventBus.subscribe` accepts exact event types, and the wildcard `*` to match all event types.
Emitting events¶
```python
from blackgeorge.core.event import Event
from blackgeorge.utils import new_id, utc_now

event = Event(
    event_id=new_id(),
    type="custom.event",
    timestamp=utc_now(),
    run_id="run-123",
    source="my_component",
    payload={"data": "value"},
)
bus.emit(event)
```
`emit` runs handlers in-process. If a handler is async (or returns an awaitable), it is scheduled on the current event loop when one is available, or run on a temporary loop from sync contexts. `aemit` awaits async handlers, as well as awaitables returned by sync handlers.
Unsubscribing¶
Use `unsubscribe` to remove a previously registered handler.
Event types¶
EventType Enum¶
Use typed event types for IDE autocomplete and type safety:
```python
from blackgeorge import EventType

# Access event types as enum values
bus.subscribe(EventType.RUN_STARTED, handler)
bus.subscribe(EventType.WORKER_COMPLETED, handler)
bus.subscribe(EventType.TOOL_FAILED, handler)

# Compare with strings
assert EventType.WORKER_STARTED == "worker.started"
assert EventType.WORKER_STARTED.value == "worker.started"
```
Available event types:
| EventType | String Value |
|---|---|
| `RUN_STARTED` | `run.started` |
| `RUN_COMPLETED` | `run.completed` |
| `RUN_FAILED` | `run.failed` |
| `RUN_PAUSED` | `run.paused` |
| `RUN_RESUMED` | `run.resumed` |
| `STEP_STARTED` | `step.started` |
| `STEP_COMPLETED` | `step.completed` |
| `STEP_PAUSED` | `step.paused` |
| `WORKER_STARTED` | `worker.started` |
| `WORKER_COMPLETED` | `worker.completed` |
| `WORKER_FAILED` | `worker.failed` |
| `WORKER_PAUSED` | `worker.paused` |
| `WORKER_CONTEXT_SUMMARIZED` | `worker.context_summarized` |
| `WORKFORCE_STARTED` | `workforce.started` |
| `WORKFORCE_COMPLETED` | `workforce.completed` |
| `TOOL_STARTED` | `tool.started` |
| `TOOL_COMPLETED` | `tool.completed` |
| `TOOL_FAILED` | `tool.failed` |
| `TOOL_CONFIRMATION_REQUESTED` | `tool.confirmation_requested` |
| `TOOL_USER_INPUT_REQUESTED` | `tool.user_input_requested` |
| `STREAM_TOKEN` | `stream.token` |
| `ASSISTANT_MESSAGE` | `assistant.message` |
| `LLM_STARTED` | `llm.started` |
| `LLM_COMPLETED` | `llm.completed` |
| `LLM_FAILED` | `llm.failed` |
Run events¶
| Event Type | Description | Payload Fields |
|---|---|---|
| `run.started` | Run started | `job_id` |
| `run.paused` | Run paused | none |
| `run.resumed` | Run resumed | none |
| `run.completed` | Run completed | none |
| `run.failed` | Run failed | `errors` when emitted by Desk; may be empty for flow-level failures |
Worker events¶
| Event Type | Description | Payload Fields |
|---|---|---|
| `worker.started` | Worker iteration started/resumed | none |
| `worker.paused` | Worker paused for pending action | `pending_action_type` (`confirmation`, `user_input`, or `handoff`) |
| `worker.completed` | Worker completed successfully | none |
| `worker.failed` | Worker failed | `error` |
| `worker.context_summarized` | Context summary applied | `model`, `summarized_messages`, `kept_messages`, optional `unregistered_model`, optional `registration_hint` |
Workforce events¶
| Event Type | Description | Payload Fields |
|---|---|---|
| `workforce.started` | Workforce run started | none |
| `workforce.completed` | Workforce run finished | none |
Workflow step events¶
| Event Type | Description | Payload Fields |
|---|---|---|
| `step.started` | Step execution started | none |
| `step.completed` | Step execution finished | `status` |
| `step.paused` | Step paused | `status` |
Tool events¶
| Event Type | Description | Payload Fields |
|---|---|---|
| `tool.started` | Tool execution started | `tool_call_id` |
| `tool.completed` | Tool execution completed | `tool_call_id`, optional `result_preview`, optional `result_truncated`, optional `timed_out`, optional `cancelled` |
| `tool.failed` | Tool execution failed | `tool_call_id`, `error` |
| `tool.confirmation_requested` | Tool needs confirmation | `tool_call_id` |
| `tool.user_input_requested` | Tool needs user input | `tool_call_id` |
`handoff` pending actions do not emit a dedicated tool request event; they are represented by `worker.paused` with `pending_action_type="handoff"`.

Tool/workforce/worker names are exposed via `event.source`.
LLM adapter events¶
| Event Type | Description | Payload Fields |
|---|---|---|
| `llm.started` | LLM call started | `model`, `messages_count`, `tools_count` |
| `llm.completed` | LLM call completed or stream closed | `model`, `latency_ms`, optional `prompt_tokens`, optional `completion_tokens`, optional `total_tokens`, optional `cost` |
| `llm.failed` | LLM call failed | `model`, `latency_ms`, `error_type`, `error_message` |
Streaming/message events¶
| Event Type | Description | Payload Fields |
|---|---|---|
| `stream.token` | Stream token delta | `token` |
| `assistant.message` | Assistant message appended | `content`, optional `tool_calls` |
Subscribing from a desk¶
```python
from blackgeorge import Desk

def on_tool_completed(event) -> None:
    preview = event.payload.get("result_preview")
    print(f"{event.source} completed: {preview}")

desk = Desk(model="openai/gpt-5-nano")
desk.event_bus.subscribe("tool.completed", on_tool_completed)
```
Filtering patterns¶
Filter by prefixes¶
```python
def handle_tool_events(event) -> None:
    print(f"{event.type} from {event.source}")

for event_type in ("tool.started", "tool.completed", "tool.failed"):
    bus.subscribe(event_type, handle_tool_events)
```
Filter by source¶
```python
def handle_analyst_events(event) -> None:
    if event.source == "analyst":
        print(event.type, event.payload)

bus.subscribe("worker.started", handle_analyst_events)
bus.subscribe("worker.failed", handle_analyst_events)
```
Filter with wrappers¶
```python
def create_filter(handler, event_types):
    def filtered(event):
        if event.type in event_types:
            handler(event)
    return filtered

filtered = create_filter(print, {"run.started", "run.completed"})
bus.subscribe("run.started", filtered)
bus.subscribe("run.completed", filtered)
```
Async handlers¶
Streaming events¶
`stream.token` is emitted only when all of the following hold:

- streaming is enabled on the desk/run
- either no response schema is set for that turn, or `structured_stream_mode="preview"` is used

On tool turns, `stream.token` payloads carry streamed tool-argument deltas.
```python
def on_token(event):
    print(event.payload.get("token", ""), end="", flush=True)

desk.event_bus.subscribe("stream.token", on_token)
```
Tool result previews¶
`tool.completed` may include `result_preview` and `result_truncated` for lightweight logging.
Context summary events¶
Subscribe to `worker.context_summarized` to observe context summarization:

```python
def on_context_summarized(event):
    payload = event.payload
    print(payload["summarized_messages"], payload["kept_messages"])
    if payload.get("unregistered_model"):
        print(payload.get("registration_hint"))

desk.event_bus.subscribe("worker.context_summarized", on_context_summarized)
```
Event storage¶
Events emitted through a desk are persisted in the run store.
Custom events¶
```python
from blackgeorge import Desk, Job, Worker
from blackgeorge.core.event import Event
from blackgeorge.utils import new_id, utc_now

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="assistant")
report = desk.run(worker, Job(input="hello"))

custom_event = Event(
    event_id=new_id(),
    type="custom.progress",
    timestamp=utc_now(),
    run_id=report.run_id,
    source="my_tool",
    payload={"percent": 50},
)
desk.event_bus.emit(custom_event)
```
Performance notes¶
- Handlers run on the emitting path, so keep them fast.
- Offload heavy work to queues/threads/processes.
- Register subscriptions before starting concurrent run execution.
For example, hand events off to a queue that a worker thread drains:

```python
import queue

event_queue = queue.Queue()

def queue_handler(event):
    event_queue.put(event)

bus.subscribe("run.completed", queue_handler)
```
Event-driven patterns¶
Progress tracking¶
```python
class ProgressTracker:
    def __init__(self, total_steps):
        self.total_steps = total_steps
        self.completed_steps = 0

    def on_step_complete(self, _event):
        self.completed_steps += 1
        percent = (self.completed_steps / self.total_steps) * 100
        print(f"{percent:.1f}%")

tracker = ProgressTracker(total_steps=5)
desk.event_bus.subscribe("step.completed", tracker.on_step_complete)
```
Error aggregation¶
```python
class ErrorLogger:
    def __init__(self):
        self.errors = []

    def on_error(self, event):
        error = event.payload.get("error")
        if error is None:
            errors = event.payload.get("errors")
            if isinstance(errors, list) and errors:
                error = "; ".join(errors)
        self.errors.append((event.type, event.source, error))

logger = ErrorLogger()
desk.event_bus.subscribe("run.failed", logger.on_error)
desk.event_bus.subscribe("worker.failed", logger.on_error)
desk.event_bus.subscribe("tool.failed", logger.on_error)
```