# Blackgeorge

> Code-first agentic framework built around Desk, Worker, and Workforce.

Code-first agentic framework built around Desk, Worker, and Workforce primitives for building predictable AI agents with tool calling, structured output, and multi-agent coordination.

# Core Documentation

# Quickstart

This guide gets you from zero to a running worker quickly.

## Install

```text
uv add blackgeorge
```

For development setup, see `development.md`.

## Provider setup

Blackgeorge uses LiteLLM under the hood. Set the API key for the provider you plan to use before running.

```text
export OPENAI_API_KEY="your-key"
```

If you use another provider, set its env vars instead:

```text
export ANTHROPIC_API_KEY="your-key"
export DEEPSEEK_API_KEY="your-key"
export AZURE_API_KEY="your-key"
export AZURE_API_BASE="https://your-resource.openai.azure.com"
export AZURE_API_VERSION="2023-07-01-preview"
```

Model names follow LiteLLM provider prefixes, for example `openai/gpt-5-nano`.

## Your first run

```python
from blackgeorge import Desk, Job, Worker

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="Researcher")
job = Job(input="Summarize this topic", expected_output="A short summary")

report = desk.run(worker, job)
print(report.content)
```

## Add a tool

```python
from blackgeorge import Desk, Job, Worker
from blackgeorge.tools import tool

@tool(description="Echo text back")
def echo(text: str) -> str:
    return text

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="Agent", tools=[echo])
job = Job(input="Call echo with text=hello")

report = desk.run(worker, job)
print(report.content)
```

## Pause and resume

When a tool requires confirmation or user input, the run pauses and returns a pending action.

```python
from blackgeorge import Desk, Job, Worker
from blackgeorge.tools import tool

@tool(requires_confirmation=True)
def risky(action: str) -> str:
    return f"ok:{action}"

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="Ops", tools=[risky])
job = Job(input="Run the risky action")

report = desk.run(worker, job)
if report.status == "paused" and report.pending_action is not None:
    report = desk.resume(report, True)

print(report.status)
print(report.content)
```

## Structured output

Set a response schema to get validated output. Blackgeorge first attempts LiteLLM structured output with JSON schema, then falls back to Instructor with Pydantic models.

```python
from pydantic import BaseModel
from blackgeorge import Desk, Job, Worker

class Summary(BaseModel):
    title: str
    bullets: list[str]

worker = Worker(name="Writer", model="openai/gpt-5-nano")
desk = Desk(model="openai/gpt-5-nano")
job = Job(input="Summarize the report", response_schema=Summary)

report = desk.run(worker, job)
print(report.data)
```

## Next steps

- Read Concepts to understand the mental model.
- Review Worker and Tools for tool safety and pause/resume flows.
- Use Workforce for multi-agent coordination.
- Use Workflow for multi-step pipelines.

# Tutorials

Tested examples demonstrating blackgeorge capabilities.

## 1. Basic Worker

```python
from blackgeorge import Desk, Job, Worker

desk = Desk(model="openai/gpt-5-mini")
worker = Worker(name="Assistant")
job = Job(input="Say hello")

report = desk.run(worker, job)
print(report.content)
```

## 2. Tools

```python
from blackgeorge import Desk, Job, Worker
from blackgeorge.tools import tool

@tool()
def search(query: str) -> str:
    """Search for information"""
    return f"Results for: {query}"

worker = Worker(name="Agent", tools=[search])
job = Job(input="Search for Python tutorials")
report = desk.run(worker, job)
```

## 3. Structured Output

```python
from pydantic import BaseModel
from blackgeorge import Desk, Job, Worker

class Analysis(BaseModel):
    sentiment: str
    confidence: float

worker = Worker(name="Analyst")
job = Job(
    input="Analyze: 'I love this product!'",
    response_schema=Analysis
)

report = desk.run(worker, job)
print(report.data)  # Analysis(sentiment='positive', confidence=0.95)
```

## 4. Pause/Resume

```python
from blackgeorge import Desk, Job, Worker
from blackgeorge.tools import tool

@tool(requires_confirmation=True)
def delete_file(path: str) -> str:
    return f"Deleted: {path}"

worker = Worker(name="Agent", tools=[delete_file])
job = Job(input="Delete config.yaml")

report = desk.run(worker, job)
if report.status == "paused":
    print(report.pending_action.prompt)  # "Confirm delete_file (config.yaml)"
    report = desk.resume(report, True)  # Confirm
```

## 5. Events

```python
from blackgeorge import Desk, EventType, Job, Worker

def on_event(event):
    if event.type == EventType.TOOL_COMPLETED:
        print(f"Tool finished: {event.source}")

desk = Desk(model="openai/gpt-5-mini")
desk.event_bus.subscribe("*", on_event)
report = desk.run(worker, job)
```

## 6. Multi-Agent (Workforce)

```python
from blackgeorge import Desk, Job, Worker, Workforce

researcher = Worker(name="Researcher", tools=[search])
writer = Worker(name="Writer")

workforce = Workforce([researcher, writer], mode="managed")
job = Job(input="Research and write about AI")
report = desk.run(workforce, job)
```

## 7. Swarm Mode

```python
from blackgeorge import Workforce
from blackgeorge.tools import transfer_to_agent_tool

handoff = transfer_to_agent_tool(["Writer", "Editor"])

researcher = Worker(name="Researcher", tools=[search, handoff])
writer = Worker(name="Writer", tools=[handoff])
editor = Worker(name="Editor")

workforce = Workforce([researcher, writer, editor], mode="swarm")
```

## Patterns

### Tool Safety

```python
@tool(requires_confirmation=True)  # Ask before executing
def delete(path: str) -> str: ...

@tool(requires_user_input=True)  # Collect user input
def ask(question: str) -> str: ...

@tool(timeout=30.0, retries=3)  # Resilient execution
def fetch(url: str) -> str: ...
```

### RunConfig

```python
from blackgeorge import RunConfig

config = RunConfig(
    model="openai/gpt-5-mini",
    max_iterations=20,
    max_context_messages=15,
)
report = worker.run(config, job)
```

### Async

```python
report = await desk.arun(worker, job)
report = await desk.aresume(report, decision)
```

# Concepts

This page describes the mental model behind Blackgeorge.

## Primitives

Blackgeorge has three core primitives.

- Desk: the runtime that owns configuration, storage, and events.
- Worker: a single agent that talks to a model and can call tools.
- Workforce: a group of workers coordinated by a simple strategy.

There is also a workflow layer for multi-step pipelines.

## The run lifecycle

A run starts when you call `desk.run(...)`. The desk creates a run record, emits a `run.started` event, and hands control to a worker or workforce.

A run ends in one of three states.

- completed: the model finished the task
- failed: the model or tool flow failed
- paused: the run waits for confirmation or user input

## Pause and resume

When a tool requires confirmation or user input, the worker pauses the run and returns a `PendingAction`. You call `desk.resume(report, decision_or_input)` to continue the run using the stored state.

## Structured output

If you set `Job.response_schema`, Blackgeorge first attempts structured output via LiteLLM `response_format` JSON schema. If that fails, it falls back to Instructor and Pydantic validation. The validated object is returned as `Report.data`.

## Tools

Tools are normal Python callables with type hints. The `@tool` decorator builds a schema and validation model from those hints. Tools can be safe by default and still require explicit confirmation.

## Events

All activity is emitted as events through the `EventBus`.
You can subscribe to events to stream tokens, show progress, or persist audit logs.

## Storage

The run store persists run status, state, and events. By default, Blackgeorge uses a SQLite-backed run store in `.blackgeorge/blackgeorge.db`.

# Desk

`Desk` is the orchestration layer. It owns the model adapter, event bus, run store, and memory store. You run workers, workforces, or flows through the desk.

## Creating a desk

```python
from blackgeorge import Desk

desk = Desk(
    model="openai/gpt-5-nano",
    temperature=0.2,
    max_tokens=800,
    stream=False,
    structured_stream_mode="off",
    structured_output_retries=3,
    max_iterations=10,
    max_tool_calls=20,
    num_retries=0,
    respect_context_window=True,
    max_context_messages=10,
)
```

### Key parameters

- model: default model name for workers without their own model
- temperature: model temperature
- max_tokens: max tokens for completion requests
- stream: enables streaming when the worker is eligible
- structured_stream_mode: "off" (strict structured output) or "preview" (stream preview tokens for schema jobs)
- structured_output_retries: retries for structured output validation
- max_iterations: max model turns per worker run
- max_tool_calls: max tool calls per worker run
- num_retries: LiteLLM-level retry count for failed calls (0 disables retries)
- respect_context_window: when True, auto-summarize and retry on context length errors (Reactive)
- max_context_messages: auto-summarize when message count exceeds this limit (Proactive)
- event_bus: custom event bus implementation
- run_store: custom run store implementation
- memory_store: custom memory store implementation
- adapter: custom model adapter
- storage_dir: directory for the default SQLite run store

## Memory integration

The desk applies simple read/write memory behavior for workers:

- Before a worker run, it reads `context` from the store using `worker.memory_scope` and inserts it as a system message after any existing leading system messages.
- After a completed run, it writes `last_output` (structured data or content) using the same scope.

This is intentionally minimal so you can build your own memory workflows on top.

## Context window handling

When `respect_context_window` is enabled, workers summarize conversation history on context length errors and retry the call with the summary plus the most recent messages (Reactive).

When `max_context_messages` is configured, workers summarize conversation proactively when the number of messages exceeds the limit to maintain a healthy context window (Proactive).

If you disable `respect_context_window`, reactive retries on context-limit errors are disabled. If you also do not provide `max_context_messages`, runs fail directly on context-limit errors.

For custom or unmapped models, register model context limits in LiteLLM to avoid repeated overflows.

## Running a worker

```python
from blackgeorge import Desk, Job, Worker

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="Researcher")
job = Job(input="Summarize this topic")

report = desk.run(worker, job)
print(report.status)
```

### Async running

Use `arun()` for async execution:

```python
report = await desk.arun(worker, job)
print(report.status)
```

If you already have a running event loop, call `arun()`/`aresume()`. The sync `run()`/`resume()` raise in that case.

Both `run()` and `arun()` accept a `stream` parameter:

```python
report = desk.run(worker, job, stream=True)
report = await desk.arun(worker, job, stream=True)
```

When `stream=True`, `report.events` contains `stream.token` events with incremental deltas for eligible turns. On tool turns, `stream.token` contains streamed tool argument deltas. Structured schema turns remain strict by default, unless `structured_stream_mode="preview"` is enabled.
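
As a rough illustration of consuming the streamed deltas described above, the sketch below joins the text of `stream.token` events from a finished run. It assumes each event exposes a `type` string and a `payload` dict with a `"token"` key, as in the session streaming examples elsewhere in this documentation. `FakeEvent` and `join_stream_tokens` are hypothetical stand-ins, not library names, so the snippet runs without a model call.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a Blackgeorge event (not the library class)."""

    type: str
    payload: dict[str, Any] = field(default_factory=dict)


def join_stream_tokens(events: Iterable[Any]) -> str:
    """Concatenate the token deltas of all stream.token events, in order."""
    parts = []
    for event in events:
        if event.type == "stream.token":
            # Assumption: the delta is stored under payload["token"].
            parts.append(str(event.payload.get("token", "")))
    return "".join(parts)


# Simulated contents of report.events after a streamed run.
events = [
    FakeEvent("run.started"),
    FakeEvent("stream.token", {"token": "Hel"}),
    FakeEvent("stream.token", {"token": "lo"}),
    FakeEvent("worker.completed"),
]
streamed_text = join_stream_tokens(events)
print(streamed_text)
```

With a real run, you would pass `report.events` from `desk.run(worker, job, stream=True)` in place of the simulated list.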

## Running a workforce

```python
from blackgeorge import Desk, Job, Worker, Workforce

w1 = Worker(name="Researcher")
w2 = Worker(name="Writer")
workforce = Workforce([w1, w2], mode="managed")

desk = Desk(model="openai/gpt-5-nano")
job = Job(input="Create a market report")
report = desk.run(workforce, job)
```

## Running a flow

```python
from blackgeorge import Desk, Job, Worker
from blackgeorge.workflow import Step

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="Analyst")

flow = desk.flow([Step(worker)])
report = flow.run(Job(input="Analyze feedback"))
```

## Resume a paused run

```python
report = desk.run(worker, job)
if report.status == "paused" and report.pending_action is not None:
    report = desk.resume(report, True)
```

For async applications, use `arun()` and `aresume()`.

## Creating sessions

Sessions manage multi-turn conversations with automatic persistence:

```python
session = desk.session(worker)
report = session.run("Hello")
report = session.run("What's my name?")
```

Resume an existing session:

```python
session = desk.session(worker, session_id="user-123")
if session:
    report = session.run("Continue")
```

See [session.md](https://jolovicdev.github.io/blackgeorge/session/index.md) for full documentation.

## Cleanup registries

If you create many temporary workers or workforces, unregister them when you are done.

```python
desk.unregister_worker(worker)
desk.unregister_workforce(workforce)
```

## Events

The desk emits run and component events. Subscribe to `desk.event_bus` to observe them.

```python
from blackgeorge import Desk

def handle_event(event) -> None:
    print(event.type, event.source)

desk = Desk(model="openai/gpt-5-nano")
desk.event_bus.subscribe("run.started", handle_event)
```

# Worker

A `Worker` is a single agent loop that talks to the model, optionally calls tools, and returns a `Report`.

## Create a worker

```python
from blackgeorge import Worker

worker = Worker(
    name="Analyst",
    model="openai/gpt-5-nano",
    instructions="You are a concise analyst.",
)
```

### Parameters

- name: worker name used in events and selection
- tools: list of tools this worker can use
- model: model name override for this worker
- instructions: optional system instructions
- memory_scope: a string namespace for external memory usage

## Job input and system message

The worker builds messages from the job.

- Job.input becomes the user message. Non-string values are JSON serialized.
- Job.expected_output and Job.constraints are appended to the system message.

## Tools and tool safety

If the model returns tool calls, the worker executes tools or pauses for a pending action.

- requires_confirmation: the run pauses and waits for an explicit approval decision
- requires_user_input: the run pauses and waits for a string input
- requires_handoff: the run pauses with `pending_action.type="handoff"` (typically intercepted by `Workforce` in swarm mode)

When a tool is paused, the worker returns a `Report` with `status="paused"` and a `PendingAction`. Paused turns emit `worker.paused` events instead of `worker.completed`.

When a model response includes multiple tool calls in the same turn, the worker executes them in parallel and records tool results in the original call order. If a tool requires confirmation or user input, the worker executes prior tool calls and then pauses before that tool. Any remaining tool calls after the pending one receive an error result so the conversation history stays valid for strict providers.

## Structured output

Set `Job.response_schema` to a Pydantic model or TypeAdapter to enforce a structured response. Blackgeorge first attempts LiteLLM structured output using a JSON schema response format, then falls back to Instructor and Pydantic validation. The validated object is returned in `Report.data`.
TypeAdapter outputs, including lists of models, are serialized into JSON arrays in `Report.content`.

```python
from pydantic import BaseModel
from blackgeorge import Job, Worker

class Result(BaseModel):
    title: str
    score: float

worker = Worker(name="Judge")
job = Job(input="Score this", response_schema=Result)
```

Structured output is used when:

- a response schema is set
- tools are not required for the current step

If tools are present, the worker may call tools first and then request structured output once the model stops emitting tool calls.

Structured output uses the adapter's `structured_complete`/`astructured_complete` hooks when implemented, and falls back to the default LiteLLM JSON schema and Instructor pipeline otherwise.

## Streaming

Streaming only happens when both of the following are true:

- `Desk.stream` (or `desk.run(..., stream=True)`) is enabled
- either no response schema is set, or a response schema is set and `structured_stream_mode="preview"`

When streaming is enabled, the worker emits `stream.token` events. On tool turns, these tokens are streamed tool argument deltas.

With `structured_stream_mode="preview"`, streamed tokens are preview output. The final `Report.data` is still validated against `response_schema`. If preview JSON is invalid, Blackgeorge falls back to strict structured completion before returning.

## Async usage

When you already have an event loop, run a single worker through a flow and await it.

```python
from blackgeorge import Desk, Job, Worker
from blackgeorge.workflow import Step

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="Analyst")

flow = desk.flow([Step(worker)])
report = await flow.arun(Job(input="Analyze feedback"))
```

Use `flow.aresume` to continue paused runs in async applications.

## Pause and resume

When a worker pauses, resume the run using the report it returned.

```python
report = desk.run(worker, job)
if report.status == "paused":
    report = desk.resume(report, "your input")
```

Confirmation actions are approved by `True` or strings such as `yes`, `approve`, or `confirm`. They are declined by `False`, `None`, blank strings, or strings such as `no`, `decline`, `deny`, or `cancel`. Other non-empty strings still approve the tool for compatibility with earlier truthy resume values.

Declined confirmations produce a tool result error with the message "Tool execution declined" and emit a `tool.failed` event with that error.

In async applications, use `desk.arun()` and `desk.aresume()`.

User input actions insert the provided value into the tool call arguments under `user_input` unless the tool sets a different `input_key`.

## Limits and failure behavior

The worker stops and fails when:

- max_iterations is exceeded
- max_tool_calls is exceeded
- the model fails to satisfy a structured response after retries
- a context limit is hit and reactive context handling is disabled

## Context window handling

`Desk` supports two context compaction paths:

- Reactive: when `respect_context_window=True`, context-limit errors trigger summarization and retry.
- Proactive: when `max_context_messages` is set, the worker summarizes before model calls when message count exceeds the limit.

Proactive compaction does not consume the reactive retry budget used for context-limit recovery. Compaction keeps recent tool-call/result groups together so providers that require strict tool message ordering still receive valid history.

If you are using a custom model, register its context window in LiteLLM for more reliable behavior.

Failures are returned as `Report` objects with status `failed` and error messages in `Report.errors`.

## Tool override

`Job.tools_override` replaces the worker tool list for a single run.

- `Tool` instances are used directly.
- String entries are resolved by name from the worker toolbelt.
- Unknown or unsupported entries are ignored.
- If multiple override entries resolve to the same tool name, the last one is effective.

# Workforce

A `Workforce` coordinates multiple workers. It supports three modes: managed, collaborate, and swarm.

## Create a workforce

```python
from blackgeorge import Worker, Workforce

w1 = Worker(name="Researcher")
w2 = Worker(name="Writer")

workforce = Workforce([w1, w2], mode="managed", name="team")
```

## Managed mode

In managed mode, a manager chooses which worker should handle the job.

- If you pass `manager`, that worker is used.
- If you do not pass `manager`, the first worker in the list is used.
- The manager receives a job with a response schema that contains a single field, `worker`.
- Manager selection runs with tools disabled, even if the manager has tools, to force a structured selection response.

The selection rules are:

- If the manager returns a structured response with a `worker` field, that worker is used.
- Otherwise, the system scans the manager output for a worker name.
- If nothing matches, the first worker is used.

If the manager or selected worker pauses, the workforce returns a paused report with enough state to resume later.

## Swarm mode

In swarm mode, workers can dynamically hand off execution and context to another agent in the workforce at any time using the `transfer_to_agent_tool`. This enables complex, dynamic multi-agent orchestrations without a hardcoded manager.

```python
from blackgeorge.tools import transfer_to_agent_tool

handoff_tool = transfer_to_agent_tool(["coder", "reviewer"])

coder = Worker(name="coder", tools=[handoff_tool])
reviewer = Worker(name="reviewer")

swarm = Workforce([coder, reviewer], mode="swarm")
```

When a worker calls `transfer_to_agent`, the orchestrator intercepts the pending tool action and switches the active worker inside the same run.

Swarm handoff safety rules:

- The target must exist in the current workforce.
- If the handoff tool has an `agent_name` allowlist (from `transfer_to_agent_tool([...])`), the target must be in that allowlist.
- Allowlist enforcement works for both worker-level tools and `Job.tools_override` tools.
- System-role messages from the prior worker are not carried into the next worker handoff context, preventing instruction leakage across roles.
- Handoff transitions are bounded per run. Exceeding the cap returns a failed report.

## Collaborate mode

In collaborate mode, reports are combined across all workers.

- If all workers have no tools, workers run concurrently for lower latency.
- If any worker has tools, workers run sequentially to preserve deterministic pause/resume behavior.
- If a worker pauses, the workforce returns a paused report and stores state.
- When resumed, the workforce continues from the paused worker and then completes the remaining workers.

You can pass a reducer to combine worker reports. If you do not, the default reducer concatenates content and collects data per worker.

## Worker collaboration

Workforces include built-in collaboration primitives for worker-to-worker communication.

### Channel

A channel enables direct messaging between workers. Broadcasts are delivered once per recipient by default; use `broadcast_mode="all"` to treat broadcasts as a log.

```python
from blackgeorge.collaboration import Channel

channel = Channel()
channel.send("worker_a", "worker_b", {"task": "analyze data"})
channel.broadcast("manager", {"status": "starting"})

messages = channel.receive("worker_b")
for msg in messages:
    print(f"From {msg.sender}: {msg.content}")

all_messages = channel.receive("worker_b", broadcast_mode="all")
```

### Blackboard

A blackboard provides shared state across workers.

```python
from blackgeorge.collaboration import Blackboard

bb = Blackboard()
bb.write("analysis_result", {"score": 95}, author="analyst")
result = bb.read("analysis_result")

bb.subscribe("analysis_result", lambda k, v, a: print(f"Updated by {a}"))
```

### Using with Workforce

Pass channel and blackboard when creating a workforce:

```python
from blackgeorge import Worker, Workforce
from blackgeorge.collaboration import Channel, Blackboard

channel = Channel()
blackboard = Blackboard()

workforce = Workforce(
    [w1, w2],
    mode="collaborate",
    channel=channel,
    blackboard=blackboard,
)
```

Workers can access `workforce.channel` and `workforce.blackboard` for communication.

## Resume behavior

Resuming a workforce uses the stored stage in run state:

- swarm: resume the exact paused worker identity, including handoff continuation
- manager: resume the manager step
- worker: resume the selected worker
- collaborate: resume the paused worker in sequence

The desk handles these transitions when you call `desk.resume(report, decision_or_input)`.

If the stored pending worker index no longer matches the current worker list, resume fails with a `failed` report rather than raising. For swarm runs, resume also fails explicitly if the paused worker identity is missing or no longer registered.

# WorkerSession

A `WorkerSession` manages multi-turn conversations with automatic message persistence and context compaction integration. Use sessions for chatbots, conversational agents, or any scenario where you need to maintain conversation state across multiple user interactions.

## What a WorkerSession does

- Accumulates conversation history across `run()` calls
- Persists messages to SQLite for recovery across program restarts
- Integrates with context compaction (handles long conversations automatically)
- Provides both sync (`run()`) and async (`arun()`) APIs
- Supports streaming responses with `stream_run()` and `astream_run()`
- Supports thinking models (DeepSeek Reasoner, Claude 3.7 Sonnet, etc.) via Job parameters

## Creating a session

Sessions are created through the `Desk`:

```python
from blackgeorge import Desk, Worker

desk = Desk(model="openai/gpt-5-nano")
worker = Worker(name="Assistant", instructions="You are helpful")

session = desk.session(worker)
```

The session ID is auto-generated, or you can provide your own. If the ID does not exist yet, a new session is created with that ID:

```python
session = desk.session(
    worker=worker,
    session_id="user-123",
    metadata={"user_id": "123", "topic": "support"},
)
```

## Sending messages

Use `run()` for sync or `arun()` for async:

```python
report1 = session.run("Hello, I'm Alice")
print(report1.content)

report2 = session.run("What's my name?")
print(report2.content)
```

The session automatically:

1. Loads previous messages from storage
2. Sends them with the new user message
3. Saves the updated conversation after each response

## Loading an existing session

```python
restored = desk.session(worker, session_id="user-123")
if restored:
    report = restored.run("Continue our conversation")
```

Returns `None` if the session doesn't exist or belongs to a different worker.

## Session history

Get the full conversation history:

```python
messages = session.history()
for message in messages:
    print(f"{message.role}: {message.content}")
```

## Closing a session

```python
session.close()
```

This removes the session and all associated messages from storage.

## Context compaction

Sessions integrate with the existing context compaction mechanism. When a conversation grows too large:

1. The worker detects the context limit
2. Old messages are summarized automatically
3. The compacted messages are saved back to the session
4. Next turn loads the already-compacted messages

You don't need to handle this manually; it just works.

## Session storage

Sessions use the `SessionStore` interface:

- `SQLiteSessionStore`: Default, persists to `.blackgeorge/blackgeorge.db`
- `InMemorySessionStore`: For testing or ephemeral sessions

Storage schema:

```sql
CREATE TABLE sessions (
    id TEXT PRIMARY KEY,
    worker_name TEXT NOT NULL,
    metadata TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

CREATE TABLE session_messages (
    id TEXT PRIMARY KEY,
    session_id TEXT NOT NULL,
    message_json TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
);
```

## Use cases

**Chatbot**: Maintain conversation state for each user

```python
session = desk.session(chatbot_worker, session_id=f"user:{user_id}")
while user_input := get_user_input():
    response = session.run(user_input)
    send_response(response.content)
```

**Support agent**: Resume conversations across sessions

```python
session = desk.session(support_worker, session_id=f"ticket:{ticket_id}")
if session is None:
    raise ValueError("Ticket session belongs to a different worker")
```

**Multi-step task**: Track progress across multiple interactions

```python
session = desk.session(
    worker=planner_worker,
    session_id=f"plan:{plan_id}",
    metadata={"step": 1, "total_steps": 5},
)
```

## Sync vs async

```python
report = session.run("Hello")  # sync
report = await session.arun("Hello")  # async
```

Both use the same underlying storage and produce identical results. If you already have a running event loop, use `arun()` instead of `run()`.
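
If it is unclear whether your code is already running inside an event loop, a small check like the one below can help you decide between the sync and async APIs. This is a generic `asyncio` sketch, not part of Blackgeorge; `has_running_loop` and `probe` are hypothetical helper names.

```python
import asyncio


def has_running_loop() -> bool:
    """Return True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # asyncio raises RuntimeError when no loop is running in this thread.
        return False
    return True


async def probe() -> bool:
    # Inside a coroutine the loop is running, so the async API applies.
    return has_running_loop()


outside = has_running_loop()   # plain script context: sync run() is fine
inside = asyncio.run(probe())  # coroutine context: prefer arun()
print(outside, inside)
```

When the check returns `True`, await `session.arun(...)`; otherwise `session.run(...)` is the appropriate call.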

## Streaming responses

The `stream_run()` and `astream_run()` methods run with streaming enabled and yield events in real time as they are emitted for that run:

```python
# Sync streaming
for event in session.stream_run("Tell me a story"):
    if event.type == "stream.token":
        print(event.payload["token"], end="", flush=True)
    elif event.type == "worker.completed":
        print(f"\nDone! Metrics: {event.payload}")

# Async streaming
async for event in session.astream_run("Count to 10"):
    if event.type == "stream.token":
        print(event.payload["token"], end="", flush=True)
```

Streaming also works with the `stream` parameter on `run()`/`arun()`:

```python
report = session.run("Hello", stream=True)
for event in report.events:
    if event.type == "stream.token":
        print(event.payload["token"], end="")
```

`report.events` includes all events emitted during the run. `stream.token` events are present only when streaming is enabled and the run is stream-eligible. If the provider emits `reasoning_content` or `thinking_blocks` deltas during streaming, they are collected into `report.reasoning_content` and the assistant message history.

## Thinking models

For models that output separate reasoning content (DeepSeek Reasoner, Claude 3.7 Sonnet, o1, etc.), use Job parameters:

```python
from blackgeorge import Job

# DeepSeek Reasoner (no budget_tokens support)
report = session.run(
    "Which is larger: 9.11 or 9.8?",
    thinking={"type": "enabled"},
)

# Anthropic Claude 3.7+ (supports budget_tokens)
report = session.run(
    "Which is larger: 9.11 or 9.8?",
    thinking={"type": "enabled", "budget_tokens": 1024},
)

print(f"Reasoning: {report.reasoning_content}")
print(f"Answer: {report.content}")
```

### drop_params

Some thinking models reject standard parameters like `temperature` or `top_p`.
Use `drop_params=True` to automatically omit unsupported parameters:

```python
report = session.run(
    "Explain quantum computing",
    thinking={"type": "enabled"},
    drop_params=True,  # Auto-drops unsupported params
)
```

### extra_body

Pass provider-specific parameters via `extra_body`:

```python
report = session.run(
    "Analyze this data",
    extra_body={
        "guardrails": ["pii-detection"],
        "custom_setting": 42,
    },
)
```

### Multi-turn behavior

Sessions automatically handle `reasoning_content` correctly across turns:

- **Within a turn**: Reasoning content and thinking blocks are preserved during tool-call loops
- **Between turns**: Reasoning content and thinking blocks are cleared for assistant messages without tool calls. Assistant tool-call messages keep reasoning content and thinking blocks when required by provider rules.

```python
# First turn generates reasoning_content
report1 = session.run("What is 9.11 vs 9.8?", thinking={"type": "enabled"})
print(report1.reasoning_content)  # Has reasoning

# Second turn - reasoning_content is cleared from history
report2 = session.run("How many Rs in strawberry?")
print(report2.reasoning_content)  # New reasoning generated

# History clears reasoning_content/thinking_blocks for assistant messages without tool calls
for msg in session.history():
    if msg.role == "assistant" and not msg.tool_calls:
        assert msg.reasoning_content is None
        assert msg.thinking_blocks is None
```

## Worker binding

Sessions are bound to a specific worker by name.
Loading a session with a different worker returns `None`:

```python
session = desk.session(worker1)
loaded = desk.session(worker2, session_id=session.session_id)
assert loaded is None  # worker1 != worker2
```

## Integration with Job

Sessions use the `initial_messages` field on `Job` to inject conversation history:

```python
from blackgeorge import Job

messages = session.history()
job = Job(input="new message", initial_messages=messages)
```

This is handled automatically by `session.run()`, but you can use it directly if needed.

# Workflow

The workflow layer lets you compose multiple steps into a flow. Use it when you need a multi-step pipeline or parallel branches.

## Flow

Create a flow from steps using the desk.

```python
from blackgeorge import Desk, Job, Worker
from blackgeorge.workflow import Step

desk = Desk(model="openai/gpt-5-nano")
analyst = Worker(name="Analyst")
writer = Worker(name="Writer")

flow = desk.flow([Step(analyst), Step(writer)])
report = flow.run(Job(input="Analyze feedback"))
```

A flow produces a report. If there are multiple steps, the content is combined with step headers.

## Steps

- Step: runs a worker or workforce once
- Parallel: runs steps concurrently and returns all results
- Condition: chooses a branch based on a predicate
- Router: selects a route by string key
- Loop: repeats steps until a stop predicate or max iterations
- AsyncCondition: chooses a branch based on an async predicate
- AsyncLoop: repeats steps until an async stop predicate or max iterations

### Step

`Step` wraps a runner and optionally provides a `job_builder` to create a job per step.

```python from blackgeorge import Job, Worker from blackgeorge.workflow import Step worker = Worker(name="Analyst") def build_job(context): return Job(input={"task": "Analyze", "seed": context.job.input}) step = Step(worker, job_builder=build_job) ``` ### Parallel ```python from blackgeorge.workflow import Parallel, Step parallel = Parallel(Step(worker_a), Step(worker_b)) ``` ### Condition ```python from blackgeorge.workflow import Condition, Step condition = Condition( predicate=lambda ctx: bool(ctx.outputs), if_true=[Step(worker_a)], if_false=[Step(worker_b)], ) ``` ### Router ```python from blackgeorge.workflow import Router, Step router = Router( selector=lambda ctx: "fast", routes={"fast": [Step(worker_a)], "deep": [Step(worker_b)]}, ) ``` ### Loop ```python from blackgeorge.workflow import Loop, Step loop = Loop( steps=[Step(worker_a)], stop=lambda ctx: len(ctx.outputs) >= 3, max_iterations=5, name="analysis_loop", # Optional: for tracking iterations ) ``` Loops track iterations in the `WorkflowContext`: ```python def stop_when_done(ctx): iteration = ctx.loop_iteration("analysis_loop") return iteration >= 3 or len(ctx.outputs) > 0 ``` ### AsyncCondition Use async predicates when the condition requires async operations: ```python from blackgeorge.workflow import AsyncCondition, Step async def check_external_service(ctx): result = await fetch_status() return result.ready condition = AsyncCondition( predicate=check_external_service, if_true=[Step(worker_a)], if_false=[Step(worker_b)], ) ``` ### AsyncLoop Use async predicates for loop termination: ```python from blackgeorge.workflow import AsyncLoop, Step async def should_stop(ctx): result = await check_completion() return result.done loop = AsyncLoop( steps=[Step(worker_a)], stop=should_stop, max_iterations=10, name="polling_loop", ) ``` ## Pause and resume If a step pauses, the flow stores the current step state and outputs. Resume with the same report and a decision or input. 
Composite nodes (`Condition`, `Router`, `Loop`) short-circuit on paused or failed results to prevent subsequent steps from running. ```python report = flow.run(job) if report.status == "paused": report = flow.resume(report, True) ``` ## Async usage Use the async APIs when you already have an event loop. ```python report = await flow.arun(job) if report.status == "paused": report = await flow.aresume(report, True) ``` Async flows use non-blocking model calls, and `Parallel` runs steps concurrently. The sync wrappers `flow.run` and `flow.resume` raise if called from a running event loop. # Tools Tools are regular Python functions with type hints. Blackgeorge turns them into validated, schema-backed actions the model can call. ## Tool metadata A tool includes: - name - description - schema - callable - input_model - requires_confirmation - requires_user_input - external_execution - pre and post hooks - confirmation_prompt - user_input_prompt - input_key - timeout - retries - retry_delay - output_type (optional) `external_execution` is available for your own conventions. The core worker does not change behavior based on this flag. ## Define a tool ```python from blackgeorge.tools import tool @tool(description="Add two numbers") def add(a: int, b: int) -> int: return a + b ``` The decorator builds: - a Pydantic input model for validation - a JSON schema sent to the model ## Tool safety Tools can require confirmation or user input. ```python from blackgeorge.tools import tool @tool(requires_confirmation=True) def delete_record(record_id: str) -> str: return f"deleted:{record_id}" ``` When the model requests this tool, the worker pauses and returns a pending action. Resume the run with `True` or an approval string such as `yes`, `approve`, or `confirm`. Use `False`, `None`, or a decline string such as `no`, `decline`, `deny`, or `cancel` to skip execution. Other non-empty strings continue to approve the tool for compatibility with earlier truthy resume values. 
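The resume rules above can be sketched as a small decision function. This is an illustration of the documented behavior only, not blackgeorge's implementation: `is_approved` is a hypothetical helper, and the case-insensitive matching and the handling of an empty string are assumptions, since the text does not specify them.

```python
from typing import Union

APPROVE_WORDS = {"yes", "approve", "confirm"}
DECLINE_WORDS = {"no", "decline", "deny", "cancel"}


def is_approved(decision: Union[bool, str, None]) -> bool:
    """Hypothetical helper mirroring the documented resume rules."""
    if decision is None:
        return False
    if isinstance(decision, bool):
        return decision
    # Assumption: surrounding whitespace and letter case are ignored.
    word = decision.strip().lower()
    if word in DECLINE_WORDS:
        return False
    if word in APPROVE_WORDS:
        return True
    # Other non-empty strings approve; treating "" as a decline is an assumption.
    return bool(word)


print(is_approved("confirm"), is_approved("cancel"), is_approved("go ahead"))
```

Because unrecognized strings approve the tool, it is safer to pass `True` or `False` explicitly from application code than to forward free-form user text.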
## User input tools

```python
from blackgeorge.tools import tool

@tool(requires_user_input=True)
def ask(question: str, user_input: str) -> str:
    return user_input
```

When resuming, the provided input is inserted into the tool arguments under `user_input` by default. If you want a different argument name, set `input_key` on the tool.

```python
from blackgeorge.tools import tool

@tool(requires_user_input=True, input_key="answer")
def ask(question: str, answer: str) -> str:
    return answer
```

`input_key` is optional. If you omit it, `user_input` is used.

## Timeouts and retries

Tools can specify timeout and retry behavior for resilient execution. Retries use exponential backoff based on `retry_delay`.

```python
from blackgeorge.tools import tool

@tool(timeout=5.0, retries=3, retry_delay=1.0)
async def fetch_data(url: str) -> str:
    ...
```

The `ToolResult` includes `timed_out` and `cancelled` flags to detect failure modes.

## Cancellation

Async tool execution supports cancellation via an event:

```python
import asyncio
from blackgeorge.tools.execution import aexecute_tool

cancel_event = asyncio.Event()

# Run the tool in a task so the event can be set while it is still executing.
# `tool` and `call` are the Tool and ToolCall you want to execute.
task = asyncio.create_task(aexecute_tool(tool, call, cancel_event=cancel_event))

# Later, from any other coroutine or callback on the same loop:
cancel_event.set()

result = await task
```

## ToolResult

Tools can return a `ToolResult` to control content, data, and error fields directly.
```python from blackgeorge.tools import ToolResult, tool @tool() def fetch_status(code: int) -> ToolResult: if code == 200: return ToolResult(content="ok", data={"code": code}) return ToolResult(error="not ok") ``` `ToolResult` fields: - `content`: String content returned to the model - `data`: Structured data (preserved when using `output_type`) - `error`: Error message if execution failed - `timed_out`: Whether the tool timed out - `cancelled`: Whether the tool was cancelled - `original_exception`: The original exception if an error occurred ## Output Type Validation Tools can declare an `output_type` for automatic validation of return values: ```python from pydantic import BaseModel from blackgeorge.tools import tool class AnalysisResult(BaseModel): findings: list[str] confidence: float @tool(output_type=AnalysisResult) def analyze(text: str) -> dict: return { "findings": ["insight 1", "insight 2"], "confidence": 0.85, } ``` When `output_type` is set: - Return values are validated against the Pydantic model - Invalid outputs produce a `ToolResult` with `error` set - Valid outputs are available as structured `data` This enables type-safe tool outputs and automatic schema validation. ## Hooks Each tool can define pre and post hooks. Pre hooks receive the `ToolCall`. Post hooks receive the `ToolCall` and the `ToolResult`. Hook exceptions are captured as tool execution errors. ## Toolbelt `Toolbelt` manages tool registration and lookup. `Toolkit` is an alias for `Toolbelt`. ```python from blackgeorge.tools import Toolbelt belt = Toolbelt() ``` ## Swarm Handoff Tool For `Workforce` running in `swarm` mode, workers can dynamically transfer control and pass context using the `transfer_to_agent_tool`. 
```python from blackgeorge.tools import transfer_to_agent_tool handoff_tool = transfer_to_agent_tool(available_agents=["researcher", "coder"]) worker = Worker(name="router", tools=[handoff_tool]) ``` When this tool is executed, it signals a `handoff` pending action that the orchestrator uses to transparently switch the active worker mid-run. `available_agents` is encoded into the tool schema and enforced by swarm routing. A handoff target must be both: - in the handoff tool allowlist - present in the workforce worker list The same enforcement applies when the handoff tool is provided through `Job.tools_override`. ## Subworker tools Use `create_subworker_tool` to let a worker delegate to bounded child workers with explicit guardrails and run-level persistence. ```python from blackgeorge.tools import create_subworker_tool spawn_subworker = create_subworker_tool( desk=desk, max_subworkers=5, max_tools_per_subworker=8, ) ``` This tool returns `ToolResult` with a structured payload that includes child `run_id` and `status`. Child runs are executed through `Desk`, so they use the same adapter, run store, event bus, and memory integration as top-level runs. For full API and behavior details, see `subworkers.md`. ## MCP Tool Integration Connect to MCP (Model Context Protocol) servers and use their tools. MCP tools are automatically converted to the blackgeorge `Tool` format and can be passed to workers. ### stdio transport For local MCP servers that run as subprocesses. ```python from blackgeorge.tools import MCPToolProvider async with MCPToolProvider() as provider: await provider.connect_stdio("uv", ["run", "my-mcp-server"]) tools = provider.list_tools() result = await provider.acall_tool("fetch", {"url": "https://example.com"}) ``` ### Streamable HTTP transport For remote MCP servers over HTTP. 
```python from blackgeorge.tools import MCPToolProvider async with MCPToolProvider() as provider: await provider.connect_streamable_http("https://api.example.com/mcp") tools = provider.list_tools() result = await provider.acall_tool("search", {"query": "python"}) ``` For servers requiring authentication, pass a custom `httpx.AsyncClient`: ```python import httpx from blackgeorge.tools import MCPToolProvider async with MCPToolProvider() as provider: client = httpx.AsyncClient(headers={"Authorization": "Bearer token"}) await provider.connect_streamable_http("https://api.example.com/mcp", http_client=client) tools = provider.list_tools() ``` ### SSE transport For MCP servers that use Server-Sent Events. ```python from blackgeorge.tools import MCPToolProvider async with MCPToolProvider() as provider: await provider.connect_sse("https://api.example.com/mcp/sse") tools = provider.list_tools() ``` ## Execution path The worker executes tools using `execute_tool`: - run pre hooks - validate input with the tool input model - call the function - convert output to `ToolResult` - run post hooks If hooks, validation, or execution fail, the error is captured in the tool result and the run continues. When multiple tool calls are returned in the same model response, the worker executes them in parallel and then appends tool results in call order. # Subworkers Subworkers are delegated workers spawned by another worker through a tool call. ## Naming Use `create_subworker_tool` as the primary API. `create_swarm_tool` is a compatibility alias that returns the same behavior but names the tool `spawn_agent` instead of `spawn_subworker`. 
## API ```python from blackgeorge.tools import create_subworker_tool spawn_subworker = create_subworker_tool( desk=desk, available_tools=[...], default_model="deepseek/deepseek-chat", allowed_models={"deepseek/deepseek-chat"}, max_subworkers=5, max_tools_per_subworker=8, max_iterations=10, max_tool_calls=10, structured_output_retries=1, ) ``` Arguments: - `desk`: Runtime context for child runs. Child runs share adapter, event bus, run store, and memory. - `available_tools`: Tool allowlist for spawned subworkers. - `default_model`: Fallback model when a spawn call does not pass `model`. - `allowed_models`: Optional model policy. If set, spawn is rejected for models outside this set. - `max_subworkers`: Maximum successful spawn attempts for this tool instance. - `max_tools_per_subworker`: Maximum number of tools that can be assigned in one spawn call. - `max_iterations`: Worker iteration budget per subworker run. - `max_tool_calls`: Tool-call budget per subworker run. - `structured_output_retries`: Structured response retry budget per subworker run. ## Tool Contract Input schema for `spawn_subworker`: - `name: str` - `instructions: str` - `task: str` - `tools: list[str]` (optional) - `model: str | None` (optional) Output uses `ToolResult`: - On success: - `content`: subworker textual output - `data`: includes `run_id`, `status`, `worker` - On failure: - `error`: policy/runtime error message - `data`: includes `run_id` and `status` when a child run exists ## Runtime Behavior Each spawn creates a normal child run through `Desk.arun`. This means: - Child runs are persisted in run storage. - Child events are emitted to the same event bus. - Memory policies from `Desk` are preserved. Status handling: - `completed`: returns success result. - `failed`: returned as `ToolResult(error=...)`. - `paused`: returned as `ToolResult(error=...)` with `pending_action_type` in `data`. 
Paused children are treated as tool failures in the parent run so the parent does not silently continue with incomplete delegation. ## Guardrails Built-in guardrails: - Spawn budget (`max_subworkers`) - Model policy (`allowed_models`) - Per-spawn tool budget (`max_tools_per_subworker`) - Child run budgets (`max_iterations`, `max_tool_calls`) - Tool allowlist (`available_tools`) ## Minimal Example ```python from blackgeorge.core.job import Job from blackgeorge.desk import Desk from blackgeorge.tools import create_subworker_tool from blackgeorge.worker import Worker desk = Desk(model="deepseek/deepseek-chat") spawn_subworker = create_subworker_tool(desk=desk, max_subworkers=3) lead = Worker( name="Lead", model="deepseek/deepseek-chat", tools=[spawn_subworker], instructions="Delegate work with spawn_subworker and synthesize results.", ) report = desk.run(lead, Job(input="Research top agent frameworks.")) ``` ## Compatibility Alias `create_swarm_tool(...)` is equivalent to: - same implementation and guardrails as `create_subworker_tool(...)` - only tool name changes from `spawn_subworker` to `spawn_agent` Use `create_subworker_tool` for new code. # Tool Hooks Tool hooks allow you to execute custom logic before and after tool execution. Use hooks for logging, validation, policy checks, and monitoring. ## Hook types - **Pre-hooks**: Execute before the tool callable runs. Receive the `ToolCall`. - **Post-hooks**: Execute after the tool callable completes. Receive both the `ToolCall` and `ToolResult`. 
## Pre-hooks

Pre-hooks receive the tool call before execution:

```python
from blackgeorge.tools import ToolPreHook, tool
from blackgeorge.core.tool_call import ToolCall

def log_call(call: ToolCall) -> None:
    print(f"Calling {call.name} with {call.arguments}")

@tool(pre=(log_call,))
def calculate(x: int, y: int) -> int:
    return x + y
```

### Pre-hook signature

```python
from typing import Any, Callable

from blackgeorge.core.tool_call import ToolCall

ToolPreHook = Callable[[ToolCall], Any]
```

A pre-hook can:

- Inspect the tool call
- Log metadata
- Validate arguments

### Pre-hook examples

#### Argument validation

```python
from blackgeorge.tools import tool

def validate_positive(call):
    for key, value in call.arguments.items():
        if isinstance(value, (int, float)) and value < 0:
            raise ValueError(f"{key} must be positive, got {value}")

@tool(pre=(validate_positive,))
def square_root(x: float) -> float:
    return x ** 0.5
```

#### Timing

```python
import time
from blackgeorge.tools import tool

timing_data = {}

def start_timer(call):
    timing_data[call.id] = time.time()

@tool(pre=(start_timer,))
def expensive_operation(n: int) -> int:
    return sum(range(n))
```

#### Access control

```python
import os

from blackgeorge.tools import tool

ALLOWED_USERS = {"admin", "supervisor"}

def check_permission(call):
    user = call.arguments.get("requested_by", "anonymous")
    if user not in ALLOWED_USERS:
        raise PermissionError(f"User {user} not allowed to call {call.name}")

@tool(pre=(check_permission,), requires_confirmation=True)
def delete_file(file_path: str, requested_by: str) -> str:
    os.remove(file_path)
    return f"Deleted {file_path}"
```

## Post-hooks

Post-hooks receive the tool call and result after execution:

```python
from blackgeorge.tools import ToolPostHook, tool
from blackgeorge.core.tool_call import ToolCall
from blackgeorge.tools.base import ToolResult

def log_result(call: ToolCall, result: ToolResult) -> None:
    status = "success" if not result.error else "failed"
    print(f"{call.name} {status}: {result.content}")

@tool(post=(log_result,))
def calculate(x: int, y: int) -> int:
    return x + y
```

### Post-hook signature

```python
from typing import Any, Callable

from blackgeorge.core.tool_call import ToolCall
from blackgeorge.tools.base import ToolResult

ToolPostHook = Callable[[ToolCall, ToolResult], Any]
```

A post-hook can:

- Log results
- Add metrics
- Trigger side effects
- Inspect error conditions

### Post-hook examples

#### Result logging

```python
import requests

from blackgeorge.tools import tool

def log_completion(call, result):
    if result.timed_out:
        print(f"WARNING: {call.name} timed out")
    elif result.cancelled:
        print(f"WARNING: {call.name} was cancelled")
    elif result.error:
        print(f"ERROR in {call.name}: {result.error}")
    else:
        print(f"SUCCESS: {call.name} returned {result.content}")

@tool(
    timeout=5.0,
    post=(log_completion,),
)
def fetch_data(url: str) -> str:
    return requests.get(url).text
```

#### Result enrichment (side effects)

```python
from datetime import datetime
from blackgeorge.tools import tool

result_audit = {}

def record_timestamp(call, result):
    if result.content:
        result_audit[call.id] = {
            "recorded_at": datetime.now().isoformat(),
            "content_preview": result.content[:80],
        }

@tool(post=(record_timestamp,))
def get_status() -> str:
    return "OK"
```

#### Metrics collection

```python
from collections import defaultdict
from blackgeorge.tools import tool

tool_metrics = defaultdict(lambda: {"calls": 0, "errors": 0, "total_time": 0})

def track_metrics(call, result):
    metrics = tool_metrics[call.name]
    metrics["calls"] += 1
    if result.error:
        metrics["errors"] += 1

@tool(post=(track_metrics,))
def process_data(data: str) -> str:
    return data.upper()
```

## Combining pre and post hooks

You can use both pre and post hooks on the same tool:

```python
from blackgeorge.tools import tool

def before(call):
    print(f"Starting {call.name}")

def after(call, result):
    print(f"Finished {call.name}: {result.content}")

@tool(pre=(before,), post=(after,))
def multiply(a: int, b: int) -> int:
    return a * b
```

## Async hooks
Async hooks are supported in async tool execution paths: ```python import asyncio from blackgeorge.tools import tool async def async_pre_hook(call): await asyncio.sleep(0.1) print(f"Async pre-hook for {call.name}") async def async_post_hook(call, result): await asyncio.sleep(0.1) print(f"Async post-hook for {call.name}") @tool(pre=(async_pre_hook,), post=(async_post_hook,)) async def async_tool(value: str) -> str: await asyncio.sleep(0.1) return value.upper() ``` ## Multiple hooks You can chain multiple hooks: ```python from blackgeorge.tools import tool def log_start(call): print(f"Start: {call.name}") def validate_args(call): if "x" not in call.arguments: raise ValueError("Missing 'x' argument") def log_end(call, result): print(f"End: {call.name} -> {result.content}") def check_error(call, result): if result.error: print(f"Error detected: {result.error}") @tool( pre=(log_start, validate_args), post=(log_end, check_error), ) def compute(x: int, y: int) -> int: return x + y ``` Hooks are executed in the order they are defined. 
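To make the ordering concrete, here is a library-free sketch of a runner that calls pre-hooks, then the function, then post-hooks, each in definition order. `run_with_hooks` is a hypothetical helper written for this illustration, not blackgeorge's `execute_tool`, and its hooks receive a plain argument dict rather than a `ToolCall`.

```python
from typing import Any, Callable


def run_with_hooks(
    func: Callable[..., Any],
    arguments: dict,
    pre: tuple = (),
    post: tuple = (),
) -> Any:
    """Hypothetical runner: pre-hooks, then the function, then post-hooks."""
    for hook in pre:
        hook(arguments)  # pre-hooks run in tuple order before the call
    result = func(**arguments)
    for hook in post:
        hook(arguments, result)  # post-hooks run in tuple order after the call
    return result


order = []
result = run_with_hooks(
    lambda x, y: x + y,
    {"x": 2, "y": 3},
    pre=(lambda args: order.append("pre-1"), lambda args: order.append("pre-2")),
    post=(
        lambda args, res: order.append("post-1"),
        lambda args, res: order.append("post-2"),
    ),
)
print(order)  # ['pre-1', 'pre-2', 'post-1', 'post-2']
```

In this sketch a raising pre-hook stops everything after it, which matches the documented rule that pre-hook exceptions prevent tool execution; the real executor additionally captures the error in a `ToolResult`.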
## Hook context and state Hooks can share state through closures or class attributes: ```python class ToolMonitor: def __init__(self): self.calls = [] self.errors = [] def pre_hook(self, call): self.calls.append(call) def post_hook(self, call, result): if result.error: self.errors.append((call, result.error)) monitor = ToolMonitor() @tool(pre=(monitor.pre_hook,), post=(monitor.post_hook,)) def risky_operation(value: int) -> int: if value < 0: raise ValueError("Negative values not allowed") return value * 2 ``` ## Advanced patterns #### Conditional hooks ```python from blackgeorge.tools import tool def conditional_pre(call): actor = call.arguments.get("requested_by", "unknown") if call.name.startswith("admin_"): print(f"Admin tool {call.name} called by {actor}") @tool(pre=(conditional_pre,)) def admin_delete_user(user_id: str) -> str: return f"Deleted {user_id}" ``` #### Retry-aware hooks ```python from blackgeorge.tools import tool attempt_count = {} def count_attempts(call): attempt_count[call.id] = attempt_count.get(call.id, 0) + 1 print(f"Attempt {attempt_count[call.id]} for {call.name}") @tool(retries=3, pre=(count_attempts,)) def flaky_operation(value: str) -> str: import random if random.random() < 0.5: raise Exception("Random failure") return value ``` #### Cache telemetry with hooks ```python from blackgeorge.tools import tool cache_stats = {"hits": 0, "misses": 0} cache = set() def cache_pre(call): key = f"{call.name}:{frozenset(call.arguments.items())}" if key in cache: cache_stats["hits"] += 1 else: cache_stats["misses"] += 1 def cache_post(call, result): if result.error is None: key = f"{call.name}:{frozenset(call.arguments.items())}" cache.add(key) @tool(pre=(cache_pre,), post=(cache_post,)) def expensive_computation(n: int) -> int: return sum(range(n)) ``` ## Tool result inspection Post-hooks can inspect detailed result information: ```python from blackgeorge.tools import tool, ToolResult def inspect_result(call, result): print(f"Tool: {call.name}") 
print(f"Content: {result.content}") print(f"Data: {result.data}") print(f"Error: {result.error}") print(f"Timed out: {result.timed_out}") print(f"Cancelled: {result.cancelled}") @tool(timeout=10.0, post=(inspect_result,)) async def long_running_task(seconds: int) -> str: import asyncio await asyncio.sleep(seconds) return f"Slept for {seconds} seconds" ``` ## Error handling in hooks Exceptions in pre-hooks prevent tool execution: ```python from blackgeorge.tools import tool def strict_validation(call): if call.arguments.get("value", 0) < 0: raise ValueError("Value must be non-negative") @tool(pre=(strict_validation,)) def process(value: int) -> int: return value * 2 # This will fail with ValueError if value < 0 ``` Exceptions in post-hooks propagate and fail tool execution: ```python def failing_post_hook(call, result): raise Exception("Post-hook failure") @tool(post=(failing_post_hook,)) def my_tool(value: str) -> str: return value ``` ## Best practices 1. **Keep hooks fast**: Hooks run synchronously, so avoid blocking operations 1. **Use hooks for cross-cutting concerns**: Logging, metrics, validation 1. **Don't rely on hook return values**: Pre-hook returns are ignored 1. **Handle exceptions in post-hooks**: Unhandled post-hook errors fail tool execution 1. **Use closures for state**: Maintain hook-specific state with function closures or classes 1. **Chain hooks carefully**: Order matters when using multiple hooks 1. **Consider async for I/O**: Use async hooks for network or database operations # Collaboration Blackgeorge provides built-in collaboration primitives for multi-agent coordination. Workers can communicate through direct messaging and share state through a common data store. ## Blackboard A `Blackboard` provides shared state across workers. Multiple workers can read from and write to the same blackboard, enabling coordination through shared memory. 
### Creating a blackboard

```python
from blackgeorge.collaboration import Blackboard

blackboard = Blackboard()
```

### Writing to the blackboard

```python
blackboard.write("analysis_result", {"score": 95}, author="analyst")
```

The `write` method takes:

- `key`: A string identifier for the data
- `value`: Any JSON-serializable value
- `author`: The name of the worker writing the data

### Reading from the blackboard

```python
result = blackboard.read("analysis_result")
print(result)  # {"score": 95}

# Check if a key exists
if blackboard.exists("analysis_result"):
    value = blackboard.read("analysis_result")
```

### Reading entries with metadata

```python
entry = blackboard.read_entry("analysis_result")
print(entry.key)         # "analysis_result"
print(entry.value)       # {"score": 95}
print(entry.author)      # "analyst"
print(entry.created_at)  # datetime of first write
print(entry.updated_at)  # datetime of last update
```

### Listing all entries

```python
entries = blackboard.all_entries()
for key, entry in entries.items():
    print(f"{key} by {entry.author}: {entry.value}")
```

### Subscribing to changes

```python
from typing import Any

def on_write(key: str, value: Any, author: str) -> None:
    print(f"{author} wrote {key}: {value}")

# Subscribe to a specific key
blackboard.subscribe("analysis_result", on_write)

# Subscribe to all changes
blackboard.subscribe_all(on_write)
```

### Other operations

```python
# Delete a key
blackboard.delete("analysis_result")

# List all keys
keys = blackboard.keys()

# Clear all entries
blackboard.clear()
```

## Channel

A `Channel` enables direct messaging between workers. Messages can be sent to specific recipients or broadcast to all workers.
### Creating a channel ```python from blackgeorge.collaboration import Channel channel = Channel() ``` ### Sending direct messages ```python message = channel.send( sender="worker_a", recipient="worker_b", content={"task": "analyze data"}, metadata={"priority": "high"} ) print(message.id) # Unique message ID ``` ### Broadcasting messages ```python message = channel.broadcast( sender="manager", content={"status": "starting"}, metadata={"phase": "1"} ) ``` ### Receiving messages ```python # Receive all messages for this worker (clears after reading) messages = channel.receive("worker_b") for msg in messages: print(f"From {msg.sender}: {msg.content}") # Peek at messages without clearing messages = channel.peek("worker_b") # Receive with broadcast mode options messages = channel.receive( "worker_b", clear=False, # Don't clear after reading broadcast_mode="one_shot" # Each broadcast seen once (default) ) # Receive all broadcasts (including previously seen) messages = channel.receive( "worker_b", broadcast_mode="all" ) ``` ### Broadcast modes - `one_shot`: Each broadcast is delivered only once per recipient. The channel tracks which broadcasts each recipient has seen. - `all`: All broadcasts are returned every time. Useful for treating broadcasts as a log. 
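The difference between the two modes can be sketched with a toy class that tracks, per recipient, how many broadcasts have already been delivered. `ToyBroadcasts` is hypothetical and is not how `Channel` is implemented; in particular, leaving the seen marker untouched in `all` mode is an assumption made for this sketch.

```python
from collections import defaultdict


class ToyBroadcasts:
    """Hypothetical model of one_shot vs. all broadcast delivery."""

    def __init__(self) -> None:
        self._broadcasts = []
        # recipient name -> number of broadcasts already delivered
        self._seen = defaultdict(int)

    def broadcast(self, content: str) -> None:
        self._broadcasts.append(content)

    def receive(self, recipient: str, broadcast_mode: str = "one_shot") -> list:
        if broadcast_mode == "all":
            # Log-style read: return everything, do not move the seen marker.
            return list(self._broadcasts)
        if broadcast_mode != "one_shot":
            raise ValueError(f"unknown broadcast_mode: {broadcast_mode}")
        start = self._seen[recipient]
        self._seen[recipient] = len(self._broadcasts)
        return self._broadcasts[start:]


toy = ToyBroadcasts()
toy.broadcast("starting")
first = toy.receive("worker_b")                        # ['starting']
second = toy.receive("worker_b")                       # []
log = toy.receive("worker_b", broadcast_mode="all")    # ['starting']
other = toy.receive("worker_c")                        # ['starting']
```

Tracking is per recipient, so `worker_c` still sees the broadcast after `worker_b` has consumed it.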
### Message structure Each message has: ```python @dataclass class ChannelMessage: id: str # Unique message ID sender: str # Sender's name recipient: str | None # None for broadcasts content: Any # Message content timestamp: datetime # When sent metadata: dict[str, Any] # Additional data ``` ### Channel utilities ```python # Get all messages across all recipients all_messages = channel.all_messages() # Clear messages for a specific recipient channel.clear("worker_b") # Clear all messages channel.clear() ``` ## Using with Workforce Pass channel and blackboard when creating a workforce: ```python from blackgeorge import Worker, Workforce from blackgeorge.collaboration import Channel, Blackboard channel = Channel() blackboard = Blackboard() worker_a = Worker(name="worker_a") worker_b = Worker(name="worker_b") workforce = Workforce( [worker_a, worker_b], mode="collaborate", channel=channel, blackboard=blackboard, ) ``` Workers can access the collaboration primitives: ```python # In worker tool implementations workforce.channel.send("worker_a", "worker_b", {"data": "value"}) workforce.blackboard.write("result", {"status": "done"}, author="worker_a") ``` ## Collaboration Tools Blackgeorge provides tool wrappers around collaboration primitives so workers can use them through tool calls. 
### Channel tools ```python from blackgeorge.collaboration import channel_send_tool, channel_receive_tool, channel_broadcast_tool send_tool = channel_send_tool(channel, sender="worker_a") receive_tool = channel_receive_tool(channel, recipient="worker_b") broadcast_tool = channel_broadcast_tool(channel, sender="manager") worker = Worker(name="worker_a", tools=[send_tool, receive_tool, broadcast_tool]) ``` ### Blackboard tools ```python from blackgeorge.collaboration import blackboard_read_tool, blackboard_write_tool read_tool = blackboard_read_tool(blackboard) write_tool = blackboard_write_tool(blackboard, author="worker_a") worker = Worker(name="worker_a", tools=[read_tool, write_tool]) ``` ### Tool parameters **channel_send:** - `recipient` (str): The recipient worker name - `content` (JsonValue): Message content - `metadata` (dict, optional): Additional message metadata - Returns: Message ID **channel_broadcast:** - `content` (JsonValue): Broadcast content - `metadata` (dict, optional): Additional metadata - Returns: Message ID **channel_receive:** - `broadcast_mode` ("all" or "one_shot"): How to handle broadcasts - `clear` (bool): Whether to clear messages after reading - Returns: List of message dictionaries **blackboard_write:** - `key` (str): The key to write - `value` (JsonValue): The value to store - Returns: The key that was written **blackboard_read:** - `key` (str): The key to read - Returns: The stored value or None ## Example: Multi-agent analysis ```python from blackgeorge import Desk, Job, Worker, Workforce from blackgeorge.collaboration import Channel, Blackboard from blackgeorge.collaboration.tools import ( channel_send_tool, channel_receive_tool, blackboard_read_tool, blackboard_write_tool, ) # Create collaboration primitives channel = Channel() blackboard = Blackboard() # Create workers with collaboration tools analyst = Worker( name="analyst", tools=[ blackboard_write_tool(blackboard, "analyst"), channel_send_tool(channel, "analyst"), ], ) 
reviewer = Worker(
    name="reviewer",
    tools=[
        blackboard_read_tool(blackboard),
        blackboard_write_tool(blackboard, "reviewer"),
        channel_send_tool(channel, "reviewer"),
        channel_receive_tool(channel, "reviewer"),
    ],
)

# Create workforce
workforce = Workforce(
    [analyst, reviewer],
    mode="collaborate",
    channel=channel,
    blackboard=blackboard,
)

# Run
desk = Desk(model="openai/gpt-5-nano")
job = Job(input="Analyze the data and have it reviewed")
report = desk.run(workforce, job)
```

## Thread safety

Both `Blackboard` and `Channel` are thread-safe. They use internal locks to ensure safe concurrent access from multiple workers.

## Async methods

For use in async contexts (like `Workforce` with `mode="collaborate"`), both `Blackboard` and `Channel` provide async method variants:

### Async Blackboard

```python
# Async variants of all methods
await blackboard.awrite("key", value, "author")
value = await blackboard.aread("key")
entry = await blackboard.aread_entry("key")
exists = await blackboard.aexists("key")
deleted = await blackboard.adelete("key")
keys = await blackboard.akeys()
entries = await blackboard.aall_entries()
await blackboard.asubscribe("key", callback)
await blackboard.asubscribe_all(callback)
await blackboard.aunsubscribe("key", callback)
await blackboard.aclear()
```

### Async Channel

```python
# Async variants of all methods
message = await channel.asend("sender", "recipient", content, metadata)
message = await channel.abroadcast("sender", content, metadata)
messages = await channel.areceive("recipient", clear=True, broadcast_mode="one_shot")
messages = await channel.apeek("recipient")
await channel.aclear("recipient")
messages = await channel.aall_messages()
```

### Why async methods?

When using `Workforce` with `mode="collaborate"`, workers run in parallel using `asyncio.gather`. The async variants use `asyncio.to_thread()` to delegate to sync methods, ensuring thread-safe access through a single `threading.Lock`.
This prevents blocking the event loop while maintaining correct synchronization. # Events and streaming Blackgeorge emits events for run lifecycle, workers, tools, workflows, and adapter calls. ## EventBus API ### Creating an event bus ```python from blackgeorge.event_bus import EventBus bus = EventBus() ``` ### Error collection EventBus collects errors from async handlers for inspection: ```python bus = EventBus() # After running handlers... errors = bus.get_errors() for error in errors: print(f"Handler failed: {error.event_type} - {error.handler_error}") bus.clear_errors() ``` To await all pending handlers and optionally raise on first error: ```python # Wait for all pending async handlers errors = await bus.await_pending() # Wait and raise on first error try: errors = await bus.await_pending(raise_on_error=True) except EventHandlerError as e: print(f"Handler failed: {e.event_type} - {e.handler_error}") ``` ### Subscribing to events ```python def handle_event(event) -> None: print(f"{event.type}: {event.payload}") bus.subscribe("run.started", handle_event) bus.subscribe("run.completed", handle_event) bus.subscribe("run.failed", handle_event) ``` Event handlers receive an `Event` with these fields: - `event_id`: unique event id - `type`: event name, for example `run.started` - `timestamp`: UTC timestamp - `run_id`: run identifier - `source`: emitter name (for example worker/tool/workforce name) - `payload`: event-specific data `EventBus.subscribe` supports exact event types and `*` for all event types. ### Emitting events ```python from blackgeorge.core.event import Event from blackgeorge.utils import new_id, utc_now event = Event( event_id=new_id(), type="custom.event", timestamp=utc_now(), run_id="run-123", source="my_component", payload={"data": "value"}, ) bus.emit(event) ``` `emit` runs handlers in-process. If a handler is async (or returns an awaitable), it is scheduled on the current loop when available, or run with a temporary loop from sync contexts. 
```python await bus.aemit(event) ``` `aemit` awaits async handlers and awaitable returns from sync handlers. ### Unsubscribing Use `unsubscribe` to remove a previously registered handler. ```python bus.unsubscribe("run.started", handle_event) bus.unsubscribe("*", handle_event) ``` ## Event types ### EventType Enum Use typed event types for IDE autocomplete and type safety: ```python from blackgeorge import EventType # Access event types as enum values bus.subscribe(EventType.RUN_STARTED, handler) bus.subscribe(EventType.WORKER_COMPLETED, handler) bus.subscribe(EventType.TOOL_FAILED, handler) # Compare with strings assert EventType.WORKER_STARTED == "worker.started" assert EventType.WORKER_STARTED.value == "worker.started" ``` Available event types: | EventType | String Value | | ----------------------------- | ----------------------------- | | `RUN_STARTED` | `run.started` | | `RUN_COMPLETED` | `run.completed` | | `RUN_FAILED` | `run.failed` | | `RUN_PAUSED` | `run.paused` | | `RUN_RESUMED` | `run.resumed` | | `STEP_STARTED` | `step.started` | | `STEP_COMPLETED` | `step.completed` | | `STEP_PAUSED` | `step.paused` | | `WORKER_STARTED` | `worker.started` | | `WORKER_COMPLETED` | `worker.completed` | | `WORKER_FAILED` | `worker.failed` | | `WORKER_PAUSED` | `worker.paused` | | `WORKER_CONTEXT_SUMMARIZED` | `worker.context_summarized` | | `WORKFORCE_STARTED` | `workforce.started` | | `WORKFORCE_COMPLETED` | `workforce.completed` | | `TOOL_STARTED` | `tool.started` | | `TOOL_COMPLETED` | `tool.completed` | | `TOOL_FAILED` | `tool.failed` | | `TOOL_CONFIRMATION_REQUESTED` | `tool.confirmation_requested` | | `TOOL_USER_INPUT_REQUESTED` | `tool.user_input_requested` | | `STREAM_TOKEN` | `stream.token` | | `ASSISTANT_MESSAGE` | `assistant.message` | | `LLM_STARTED` | `llm.started` | | `LLM_COMPLETED` | `llm.completed` | | `LLM_FAILED` | `llm.failed` | ### Run events | Event Type | Description | Payload Fields | | --------------- | ------------- | 
--------------------------------------------------------------------- | | `run.started` | Run started | `job_id` | | `run.paused` | Run paused | none | | `run.resumed` | Run resumed | none | | `run.completed` | Run completed | none | | `run.failed` | Run failed | `errors` when emitted by `Desk`; may be empty for flow-level failures | ### Worker events | Event Type | Description | Payload Fields | | --------------------------- | -------------------------------- | ------------------------------------------------------------------------------------------------------------ | | `worker.started` | Worker iteration started/resumed | none | | `worker.paused` | Worker paused for pending action | `pending_action_type` (`confirmation`, `user_input`, or `handoff`) | | `worker.completed` | Worker completed successfully | none | | `worker.failed` | Worker failed | `error` | | `worker.context_summarized` | Context summary applied | `model`, `summarized_messages`, `kept_messages`, optional `unregistered_model`, optional `registration_hint` | ### Workforce events | Event Type | Description | Payload Fields | | --------------------- | ---------------------- | -------------- | | `workforce.started` | Workforce run started | none | | `workforce.completed` | Workforce run finished | none | ### Workflow step events | Event Type | Description | Payload Fields | | ---------------- | ----------------------- | -------------- | | `step.started` | Step execution started | none | | `step.completed` | Step execution finished | `status` | | `step.paused` | Step paused | `status` | ### Tool events | Event Type | Description | Payload Fields | | ----------------------------- | ------------------------ | ------------------------------------------------------------------------------------------------------------------ | | `tool.started` | Tool execution started | `tool_call_id` | | `tool.completed` | Tool execution completed | `tool_call_id`, optional `result_preview`, optional `result_truncated`, 
optional `timed_out`, optional `cancelled` | | `tool.failed` | Tool execution failed | `tool_call_id`, `error` | | `tool.confirmation_requested` | Tool needs confirmation | `tool_call_id` | | `tool.user_input_requested` | Tool needs user input | `tool_call_id` | `handoff` pending actions do not emit a dedicated tool request event. They are represented by `worker.paused` with `pending_action_type="handoff"`. Tool/workforce/worker names are exposed via `event.source`. ### LLM adapter events | Event Type | Description | Payload Fields | | --------------- | ----------------------------------- | ----------------------------------------------------------------------------------------------------------------------- | | `llm.started` | LLM call started | `model`, `messages_count`, `tools_count` | | `llm.completed` | LLM call completed or stream closed | `model`, `latency_ms`, optional `prompt_tokens`, optional `completion_tokens`, optional `total_tokens`, optional `cost` | | `llm.failed` | LLM call failed | `model`, `latency_ms`, `error_type`, `error_message` | ### Streaming/message events | Event Type | Description | Payload Fields | | ------------------- | -------------------------- | -------------------------------------------------- | | `stream.token` | Stream token delta | `token`, `type` (`"content"` or `"tool_argument"`) | | `assistant.message` | Assistant message appended | `content`, optional `tool_calls` | ## Subscribing from a desk ```python from blackgeorge import Desk def on_tool_completed(event) -> None: preview = event.payload.get("result_preview") print(f"{event.source} completed: {preview}") desk = Desk(model="openai/gpt-5-nano") desk.event_bus.subscribe("tool.completed", on_tool_completed) ``` ## Filtering patterns ### Filter by prefixes ```python def handle_tool_events(event) -> None: print(f"{event.type} from {event.source}") for event_type in ("tool.started", "tool.completed", "tool.failed"): bus.subscribe(event_type, handle_tool_events) ``` ### Filter 
by source

```python
def handle_analyst_events(event) -> None:
    if event.source == "analyst":
        print(event.type, event.payload)


bus.subscribe("worker.started", handle_analyst_events)
bus.subscribe("worker.failed", handle_analyst_events)
```

### Filter with wrappers

```python
def create_filter(handler, event_types):
    def filtered(event):
        if event.type in event_types:
            handler(event)

    return filtered


filtered = create_filter(print, {"run.started", "run.completed"})
bus.subscribe("run.started", filtered)
bus.subscribe("run.completed", filtered)
```

## Async handlers

```python
async def async_handler(event):
    # some_async_operation stands in for your own coroutine
    await some_async_operation(event)


# Register the handler, then emit from an async context so it is awaited
bus.subscribe(event.type, async_handler)
await bus.aemit(event)
```

## Streaming events

`stream.token` is emitted only when all of the following are true:

- streaming is enabled on the desk or run
- there is no response schema for that turn, or `structured_stream_mode="preview"`

On tool turns, `stream.token` payloads carry streamed tool argument deltas. The `type` field distinguishes content tokens from tool argument deltas:

```python
def on_token(event):
    token_type = event.payload.get("type")
    if token_type == "tool_argument":
        return  # skip rendering raw JSON deltas
    print(event.payload.get("token", ""), end="", flush=True)


desk.event_bus.subscribe("stream.token", on_token)
```

## Tool result previews

`tool.completed` may include `result_preview` and `result_truncated` for lightweight logging.

```python
def on_tool_completed(event):
    print(event.source, event.payload.get("result_preview"))
```

## Context summary events

```python
def on_context_summarized(event):
    payload = event.payload
    print(payload["summarized_messages"], payload["kept_messages"])
    if payload.get("unregistered_model"):
        print(payload.get("registration_hint"))
```

## Event storage

Events emitted through a desk are persisted in the run store.
```python events = desk.run_store.get_events(run_id) for event in events: print(event.type, event.payload) ``` ## Custom events ```python from blackgeorge import Desk, Job, Worker from blackgeorge.core.event import Event from blackgeorge.utils import new_id, utc_now desk = Desk(model="openai/gpt-5-nano") worker = Worker(name="assistant") report = desk.run(worker, Job(input="hello")) custom_event = Event( event_id=new_id(), type="custom.progress", timestamp=utc_now(), run_id=report.run_id, source="my_tool", payload={"percent": 50}, ) desk.event_bus.emit(custom_event) ``` ## Performance notes - Handlers run on the emitting path, so keep them fast. - Offload heavy work to queues/threads/processes. - Register subscriptions before starting concurrent run execution. ```python import queue event_queue = queue.Queue() def queue_handler(event): event_queue.put(event) bus.subscribe("run.completed", queue_handler) ``` ## Event-driven patterns ### Progress tracking ```python class ProgressTracker: def __init__(self, total_steps): self.total_steps = total_steps self.completed_steps = 0 def on_step_complete(self, _event): self.completed_steps += 1 percent = (self.completed_steps / self.total_steps) * 100 print(f"{percent:.1f}%") tracker = ProgressTracker(total_steps=5) desk.event_bus.subscribe("step.completed", tracker.on_step_complete) ``` ### Error aggregation ```python class ErrorLogger: def __init__(self): self.errors = [] def on_error(self, event): error = event.payload.get("error") if error is None: errors = event.payload.get("errors") if isinstance(errors, list) and errors: error = "; ".join(errors) self.errors.append((event.type, event.source, error)) logger = ErrorLogger() desk.event_bus.subscribe("run.failed", logger.on_error) desk.event_bus.subscribe("worker.failed", logger.on_error) desk.event_bus.subscribe("tool.failed", logger.on_error) ``` # Event Payloads Blackgeorge provides typed dataclasses for event payloads, replacing untyped `dict[str, Any]` with structured 
data. ## Available Payload Types ### Run Events ```python from blackgeorge import RunStartedPayload, RunFailedPayload # run.started payload = RunStartedPayload(job_id="job-123") # run.failed payload = RunFailedPayload(errors=["Error 1", "Error 2"]) ``` ### Worker Events ```python from blackgeorge import ( WorkerPausedPayload, WorkerFailedPayload, WorkerContextSummarizedPayload, ) # worker.paused payload = WorkerPausedPayload(pending_action_type="confirmation") # worker.failed payload = WorkerFailedPayload(error="Something went wrong") # worker.context_summarized payload = WorkerContextSummarizedPayload( model="gpt-4", summarized_messages=10, kept_messages=4, unregistered_model=False, registration_hint=None, ) ``` ### Tool Events ```python from blackgeorge import ToolStartedPayload, ToolCompletedPayload, ToolFailedPayload # tool.started payload = ToolStartedPayload(tool_call_id="call-123") # tool.completed payload = ToolCompletedPayload( tool_call_id="call-123", result_preview="Success...", result_truncated=False, timed_out=False, cancelled=False, ) # tool.failed payload = ToolFailedPayload( tool_call_id="call-123", error="Execution failed", ) ``` ### Streaming Events ```python from blackgeorge import StreamTokenPayload, AssistantMessagePayload # stream.token payload = StreamTokenPayload(token="Hello", type="content") # tool argument delta payload = StreamTokenPayload(token='{"path":', type="tool_argument") # assistant.message payload = AssistantMessagePayload( content="Response text", tool_calls=[{"id": "call-1", "name": "tool", "arguments": "{}"}], ) ``` ### LLM Events ```python from blackgeorge import LLMCompletedPayload, LLMFailedPayload # llm.completed payload = LLMCompletedPayload( model="gpt-4", latency_ms=150, prompt_tokens=100, completion_tokens=50, total_tokens=150, cost=0.001, ) # llm.failed payload = LLMFailedPayload( model="gpt-4", latency_ms=100, error_type="RateLimitError", error_message="Rate limit exceeded", ) ``` ### Step Events ```python from 
blackgeorge import StepCompletedPayload, StepPausedPayload # step.completed payload = StepCompletedPayload(status="completed") # step.paused payload = StepPausedPayload(status="paused") ``` ### Workforce Events ```python from blackgeorge import WorkforcePausedPayload # workforce paused state payload = WorkforcePausedPayload( root_job={"input": "task"}, completed_reports=[], pending_worker_index=0, ) ``` ## Usage These payload types are for documentation and type hints. Event handlers receive `Event` objects with `payload: dict[str, Any]`: ```python from blackgeorge import Desk, EventType def handle_tool_completed(event) -> None: # Access payload fields (matches ToolCompletedPayload structure) tool_call_id = event.payload.get("tool_call_id") result_preview = event.payload.get("result_preview") print(f"Tool {tool_call_id}: {result_preview}") desk = Desk(model="openai/gpt-5-nano") desk.event_bus.subscribe(EventType.TOOL_COMPLETED, handle_tool_completed) ``` ## All Payload Types | Payload Class | Event Type | Key Fields | | -------------------------------- | --------------------------- | ------------------------------------------------------------------------------ | | `RunStartedPayload` | `run.started` | `job_id` | | `RunFailedPayload` | `run.failed` | `errors` | | `WorkerPausedPayload` | `worker.paused` | `pending_action_type` | | `WorkerFailedPayload` | `worker.failed` | `error` | | `WorkerContextSummarizedPayload` | `worker.context_summarized` | `model`, `summarized_messages`, `kept_messages` | | `ToolStartedPayload` | `tool.started` | `tool_call_id` | | `ToolCompletedPayload` | `tool.completed` | `tool_call_id`, `result_preview`, `result_truncated`, `timed_out`, `cancelled` | | `ToolFailedPayload` | `tool.failed` | `tool_call_id`, `error` | | `StreamTokenPayload` | `stream.token` | `token`, `type` (`"content"` or `"tool_argument"`) | | `AssistantMessagePayload` | `assistant.message` | `content`, `tool_calls` | | `LLMCompletedPayload` | `llm.completed` | `model`, 
`latency_ms`, `total_tokens`, `cost` | | `LLMFailedPayload` | `llm.failed` | `model`, `latency_ms`, `error_type`, `error_message` | | `StepCompletedPayload` | `step.completed` | `status` | | `StepPausedPayload` | `step.paused` | `status` | | `WorkforcePausedPayload` | workforce state | `root_job`, `completed_reports`, `pending_worker_index` | # Exceptions Blackgeorge provides a custom exception hierarchy for structured error handling. ## Exception Hierarchy ```text BlackgeorgeError (base) ├── ContextLimitError ├── ToolExecutionError ├── ToolValidationError ├── ToolTimeoutError ├── EventHandlerError ├── RunnerNotRegisteredError └── StreamingUnsupportedError ``` ## Base Exception ::: blackgeorge.exceptions.BlackgeorgeError ## Context Errors ::: blackgeorge.exceptions.ContextLimitError Raised when the LLM context window is exceeded. Provides context about whether the model was registered and whether summarization was attempted. ## Tool Errors ::: blackgeorge.exceptions.ToolExecutionError Raised when a tool fails during execution. Includes the original exception for debugging. ::: blackgeorge.exceptions.ToolValidationError Raised when tool arguments fail validation. ::: blackgeorge.exceptions.ToolTimeoutError Raised when a tool execution exceeds its timeout. ## Event Errors ::: blackgeorge.exceptions.EventHandlerError Raised when an event handler fails. Wraps the original exception. 
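To make the "wraps the original exception" idea concrete, here is a minimal sketch that uses stand-in classes instead of the real ones. `StandInError`, `StandInHandlerError`, and `call_handler` are hypothetical names; the `event_type` and `handler_error` attributes mirror how `EventHandlerError` is used elsewhere in these docs, but the constructor shown is an assumption.

```python
class StandInError(Exception):
    """Hypothetical base class, playing the role of a framework-wide error."""


class StandInHandlerError(StandInError):
    """Hypothetical wrapper that keeps the failing handler's exception."""

    def __init__(self, event_type, handler_error):
        super().__init__(f"handler for {event_type!r} failed: {handler_error}")
        self.event_type = event_type
        self.handler_error = handler_error


def call_handler(handler, event_type, payload):
    try:
        return handler(payload)
    except Exception as exc:
        # "from exc" keeps the original traceback reachable via __cause__
        raise StandInHandlerError(event_type, exc) from exc


def broken_handler(payload):
    raise ValueError("bad payload")


try:
    call_handler(broken_handler, "run.started", {})
except StandInError as err:
    # Catching the base class also catches the wrapper subclass
    caught = err
    print(type(err.handler_error).__name__, err.event_type)
```

Catching the base class is enough to see every framework error, while the wrapped exception stays available for logging or re-raising.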
## Usage Example

```python
from blackgeorge import BlackgeorgeError, ContextLimitError, ToolExecutionError

# Run inside an async function; in synchronous code use desk.run(worker, job)
try:
    report = await desk.arun(worker, job)
except ContextLimitError as e:
    print(f"Context limit: {e}")
    print(f"Model registered: {e.model_registered}")
    print(f"Summary attempted: {e.summary_attempted}")
except ToolExecutionError as e:
    print(f"Tool {e.tool_name} failed: {e}")
    if e.original_exception:
        raise e.original_exception
except BlackgeorgeError as e:
    print(f"Framework error: {e}")
```

## ToolResult Exception Access

Tool execution errors are captured in `ToolResult` with the exception type preserved:

```python
from blackgeorge import Worker, Desk, Job
from blackgeorge.tools import tool


@tool()
def risky_operation(data: str) -> str:
    raise ValueError("Something went wrong")


worker = Worker(name="worker", tools=[risky_operation])
desk = Desk(model="openai/gpt-5-nano")
report = desk.run(worker, Job(input="Process this"))

# Check tool results in report
for tool_call in report.tool_calls:
    if tool_call.result and tool_call.result.error:
        print(f"Error: {tool_call.result.error}")
        if tool_call.result.exception_type:
            print(f"Exception type: {tool_call.result.exception_type}")
```

## Checking Context Limit Errors

The `is_context_limit_error` helper checks both string patterns and typed exceptions:

```python
from blackgeorge.worker_context import is_context_limit_error

try:
    response = await adapter.acomplete(...)
except Exception as e:
    if is_context_limit_error(e):
        # Handle context limit - retry with smaller context
        ...
    else:
        # Not a context-limit error; let it propagate
        raise
```

# Core types

This page documents the primary data types exposed by Blackgeorge.

## Job

A `Job` is the input to a worker or workforce.

Fields:

- id: unique job id
- input: payload passed to the worker as the user message
- expected_output: appended to the system message
- tools_override: optional per-run tool override list. Supports `Tool` instances and tool names.
  In worker runs, string names are resolved from the worker toolbelt; unknown entries are ignored. If duplicate entries resolve to the same tool name, later entries win.
- response_schema: Pydantic model or TypeAdapter for structured output
- constraints: extra constraints appended to the system message
- metadata: arbitrary metadata for your application
- initial_messages: optional list of messages to pre-populate conversation history
- thinking: enables thinking mode for reasoning models (format varies by provider), for example:
  - DeepSeek: `{"type": "enabled"}`
  - Anthropic: `{"type": "enabled", "budget_tokens": 1024}`
- structured_stream_mode: optional `"off"` or `"preview"` override for schema-job streaming behavior
- drop_params: drops unsupported parameters instead of erroring
- extra_body: provider-specific parameters passed to the model API

`Job` is immutable.

## Report

A `Report` is the result of a run.

Fields:

- run_id
- status: completed, paused, failed, or running
- pending_action: present when the run is paused
- content: assistant output
- reasoning_content: separate reasoning output from thinking models (DeepSeek Reasoner, Claude 3.7, o1, etc.)
- data: structured output if a response schema is set
- messages: full conversation history
- tool_calls: tool calls made during the run
- metrics: usage and other metrics
- events: list of events for the run
- errors: list of error messages

## Message

A `Message` represents a single chat message.

Fields:

- role: system, user, assistant, tool
- content
- reasoning_content: separate reasoning content from thinking models
- thinking_blocks: optional structured reasoning blocks from providers like Anthropic
- tool_calls: tool calls emitted by the assistant
- tool_call_id: set for tool results
- metadata

## ToolCall

A `ToolCall` represents a single tool invocation request.

Fields:

- id
- name
- arguments
- result
- error

`result` is serialized for BaseModel and dataclass values.
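As an illustration of what serializing a `result` for BaseModel and dataclass values can look like, here is a sketch under stated assumptions. `serialize_result` and `Weather` are hypothetical and not part of Blackgeorge; the library's actual serialization may differ in detail.

```python
import dataclasses
import json
from dataclasses import dataclass


@dataclass
class Weather:
    city: str
    temp_c: float


def serialize_result(value):
    """Turn a tool return value into JSON-friendly data (illustrative only)."""
    # Pydantic v2 models expose model_dump(); detected by duck typing so
    # this sketch runs without Pydantic installed.
    if hasattr(value, "model_dump"):
        return value.model_dump()
    # Dataclass instances (not the class itself) become plain dicts.
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return dataclasses.asdict(value)
    # Everything else is passed through unchanged.
    return value


serialized = serialize_result(Weather(city="Oslo", temp_c=4.5))
print(json.dumps(serialized))
```

Converting to plain dicts before storage keeps tool results JSON-serializable, which matters once they are persisted alongside run state.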
## PendingAction A `PendingAction` is returned when a run pauses. Fields: - action_id - type: confirmation, user_input, or handoff - tool_call - prompt - options - metadata ## Event An `Event` describes a runtime signal. Fields: - event_id - type - timestamp - run_id - source - payload ## RunState and RunRecord - `RunState` contains the full resume state for a paused run. - `RunRecord` is the stored run metadata in the run store. ## Enums and aliases - MessageRole: system, user, assistant, tool - PendingActionType: confirmation, user_input, handoff - RunStatus: completed, paused, failed, running - WorkforceMode: managed, collaborate, swarm - Brief: alias for Job - RunOutput: alias for Report ## Utilities - new_id(): creates a random hex identifier - utc_now(): returns the current UTC time # Multimodal Messages Blackgeorge supports multimodal messages allowing you to send images, videos, audio, and documents to vision-capable models. ## Overview Multimodal messages use the litellm/OpenAI format where `content` can be either a string or a list of content objects. 
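Because `content` can be either shape, code that inspects messages usually normalizes it first. The helpers below are a hypothetical sketch (neither `as_content_parts` nor `text_of` is part of Blackgeorge) showing one way to treat a plain string and a list of content objects uniformly.

```python
def as_content_parts(content):
    """Normalize message content to a list of content objects."""
    if isinstance(content, str):
        # A bare string is equivalent to a single text part
        return [{"type": "text", "text": content}]
    return list(content)


def text_of(content):
    """Join the text parts, ignoring images, files, and other media."""
    parts = as_content_parts(content)
    return " ".join(part["text"] for part in parts if part.get("type") == "text")


mixed = [
    {"type": "text", "text": "What's in this image?"},
    {"type": "image_url", "image_url": {"url": "https://example.com/photo.jpg"}},
]

print(text_of("Say hello"))
print(text_of(mixed))
```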
## Sending Images ### Using URLs ```python from blackgeorge import Desk, Job, Worker desk = Desk(model="openrouter/google/gemini-3-flash-preview") worker = Worker(name="VisionAnalyst") job = Job(input=[ {"type": "text", "text": "What's in this image?"}, {"type": "image_url", "image_url": {"url": "https://example.com/photo.jpg"}} ]) report = desk.run(worker, job) print(report.content) ``` ### Using Local Files ```python from blackgeorge import Desk, Job, Worker, encode_file image_data = encode_file("./my_photo.jpg") job = Job(input=[ {"type": "text", "text": "Describe this image"}, {"type": "image_url", "image_url": {"url": image_data}} ]) report = desk.run(worker, job) ``` ### Multiple Images ```python job = Job(input=[ {"type": "text", "text": "Compare these two images"}, {"type": "image_url", "image_url": {"url": "https://example.com/before.jpg"}}, {"type": "image_url", "image_url": {"url": "https://example.com/after.jpg"}} ]) ``` ## Sending Videos ```python job = Job(input=[ {"type": "text", "text": "Summarize this video"}, {"type": "video_url", "video_url": {"url": "https://youtube.com/watch?v=..."}} ]) # For local video files video_data = encode_file("./demo.mp4") job = Job(input=[ {"type": "text", "text": "What happens in this video?"}, {"type": "video_url", "video_url": {"url": video_data}} ]) ``` ## Sending Audio ```python audio_data = encode_file("./recording.mp3") job = Job(input=[ {"type": "text", "text": "Transcribe this audio"}, {"type": "file", "file": {"file_data": audio_data}} ]) ``` ## Document Understanding (PDF, DOCX, etc.) 
```python pdf_data = encode_file("./contract.pdf") job = Job(input=[ {"type": "text", "text": "Extract key terms from this contract"}, {"type": "file", "file": { "file_data": pdf_data, "filename": "contract.pdf" }} ]) ``` Supported document types: - PDF (`.pdf`) - Word (`.doc`, `.docx`) - Excel (`.xls`, `.xlsx`) - CSV (`.csv`) - HTML (`.html`) - Markdown (`.md`) - Text (`.txt`) ## Generating Images Blackgeorge includes tools for generating images with OpenRouter, Gemini, and OpenAI-compatible providers: ```python from blackgeorge import Desk, Job, Worker, generate_image desk = Desk(model="openrouter/google/gemini-3-flash-preview") worker = Worker(name="Assistant", tools=[generate_image]) job = Job(input="Please generate an image of a sunset over mountains") report = desk.run(worker, job) ``` The `generate_image` tool accepts: - `prompt` (required): Text description of the image - `model` (optional): Model to use (default: `openrouter/google/gemini-3-pro-image-preview`) - `size` (optional): Image size (default: `1024x1024`) - `quality` (optional): Image quality (default: `standard`) Returns a dict with: - `url`: URL of the generated image - `b64_json`: Base64-encoded image data when returned by the provider - `revised_prompt`: The model's revised prompt when supported For models that generate images through chat completions, Blackgeorge automatically falls back to `completion(..., modalities=["image", "text"])` if the image endpoint returns no image data. ## The `encode_file()` Utility `encode_file(file_path, mime_type=None)` converts local files to base64 data URLs: ```python from blackgeorge import encode_file # Auto-detect MIME type from extension image_data = encode_file("photo.jpg") # data:image/jpeg;base64,... pdf_data = encode_file("doc.pdf") # data:application/pdf;base64,... 
# Explicit MIME type data = encode_file("file.bin", mime_type="application/octet-stream") ``` ## Supported Models Multimodal input support varies by model: | Feature | Models | | ---------------- | ------------------------------------------------------------------------------------ | | Images | `openrouter/google/gemini-3-flash-preview`, `openrouter/google/gemini-3-pro-preview` | | Videos | `openrouter/google/gemini-3-flash-preview`, `openrouter/google/gemini-3-pro-preview` | | Audio | `openrouter/google/gemini-3-flash-preview`, `openrouter/google/gemini-3-pro-preview` | | Documents | `openrouter/google/gemini-3-flash-preview`, `openrouter/google/gemini-3-pro-preview` | | Image Generation | `openrouter/google/gemini-3-pro-image-preview` | ## Examples ### Vision Analysis with Gemini 3 Flash Preview ```python from blackgeorge import Desk, Job, Worker, encode_file desk = Desk(model="openrouter/google/gemini-3-flash-preview") worker = Worker(name="VisionBot", instructions="You analyze images in detail") photo = encode_file("./product.jpg") job = Job(input=[ {"type": "text", "text": "Describe this product in detail for a catalog"}, {"type": "image_url", "image_url": {"url": photo}} ]) report = desk.run(worker, job) print(report.content) ``` ### Document Q&A with Gemini 3 Pro Preview ```python from blackgeorge import Desk, Job, Worker, encode_file desk = Desk(model="openrouter/google/gemini-3-pro-preview") worker = Worker(name="DocumentAnalyst") pdf = encode_file("./research_paper.pdf") job = Job(input=[ {"type": "text", "text": "What are the main findings of this research?"}, {"type": "file", "file": {"file_data": pdf, "filename": "research_paper.pdf"}} ]) report = desk.run(worker, job) ``` ## Best Practices 1. **File Size**: Large files (>10MB) may cause timeouts or errors 1. **Model Limits**: Check model documentation for maximum image/video duration limits 1. **Cost**: Multimodal requests typically cost more than text-only 1. 
**URLs vs Base64**: URLs are more efficient for remote files; use base64 for local files 1. **Error Handling**: Always validate that your chosen model supports the media type # Advanced Features # Storage Run storage tracks the state of each run and stores events. Blackgeorge exposes this via the `RunStore` interface. ## RunStore interface A run store supports: - create_run(run_id, input_payload) - update_run(run_id, status, output, output_json, state) - get_run(run_id) - add_event(event) - get_events(run_id) ## RunRecord A `RunRecord` includes: - run_id - status - input - output - output_json - created_at - updated_at - state ## RunState Run state stores enough data to resume a paused run. It includes: - run_id, status, runner_type, runner_name - job, messages, tool_calls - pending_action - metrics - iteration - payload The payload field is used by workflows and workforces to store extra resume data. ## SQLiteRunStore The default run store uses SQLite and writes to `.blackgeorge/blackgeorge.db` unless you override `storage_dir` or `run_store`. - Inputs, outputs, and state are serialized as JSON. - Events are stored in a separate table and returned in timestamp order. ## InMemoryRunStore The in-memory store is useful for tests or ephemeral runs. It stores records and events in dictionaries and does not persist data. ## Custom stores To build a custom run store, implement the `RunStore` interface and pass it to `Desk(run_store=...)`. # Memory Blackgeorge defines a simple memory interface that can be used by your own tools or custom components. ## Desk integration `Desk` always has a memory store. If you do not pass one, it uses `InMemoryMemoryStore`. For worker runs, the desk uses two conventional keys: - `context`: read before a worker run and inserted as a system message - `last_output`: written after a completed run Both use the worker's `memory_scope` as the namespace. You can ignore these conventions or build on them for more advanced memory behavior. 
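The `context` and `last_output` conventions can be pictured with a small dictionary-backed store. This is a sketch under stated assumptions: `DictMemoryStore` is a hypothetical stand-in that follows the `write`/`read`/`search`/`reset` shape of the memory interface, and the message-building steps only imitate what the desk is described as doing.

```python
class DictMemoryStore:
    """Hypothetical in-memory store keyed by scope, then by key."""

    def __init__(self):
        self._data = {}

    def write(self, key, value, scope):
        self._data.setdefault(scope, {})[key] = value

    def read(self, key, scope):
        return self._data.get(scope, {}).get(key)

    def search(self, query, scope):
        # Naive substring match over keys and values
        items = self._data.get(scope, {}).items()
        return [(k, v) for k, v in items if query in k or query in str(v)]

    def reset(self, scope):
        self._data.pop(scope, None)


store = DictMemoryStore()
scope = "worker:Analyst"  # the worker's memory scope acts as the namespace

# Seed context that should be visible to the next run
store.write("context", "Prefer concise answers.", scope)

# Before the run: read `context` and insert it as a system message
messages = []
context = store.read("context", scope)
if context is not None:
    messages.append({"role": "system", "content": str(context)})

# After a completed run: record the output under `last_output`
store.write("last_output", "Quarterly revenue grew.", scope)
```

Keeping both keys under the same scope means one worker's context never leaks into another worker's runs.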
## MemoryStore interface A memory store supports: - write(key, value, scope) - read(key, scope) - search(query, scope) - reset(scope) `scope` is a string namespace such as `worker:Analyst` or `desk`. ## InMemoryMemoryStore The in-memory store keeps data in a dictionary. It is fast and does not persist data. ## SQLiteMemoryStore The SQLite store persists memory to a file. Values are serialized as JSON strings and can be searched by key or value substring. ## VectorMemoryStore The vector store uses ChromaDB for semantic search with embeddings. It persists locally and supports similarity-based retrieval. By default, Blackgeorge uses a deterministic local embedding function so memory operations work in offline or restricted environments. You can pass your own Chroma embedding function for provider-quality embeddings. ```python from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction from blackgeorge.memory import VectorMemoryStore store = VectorMemoryStore( "/path/to/db", chunk_size=8000, chunk_overlap=200, embedding_function=OpenAIEmbeddingFunction(model_name="text-embedding-3-small"), ) store.write("doc1", "AI is transforming healthcare", "global") store.write("doc2", "Machine learning predicts outcomes", "global") results = store.search("artificial intelligence medicine", "global", top_k=5) for key, value in results: print(f"{key}: {value}") doc = store.read("doc1", "global") ``` Features: - Configurable chunking for long documents - Default chunking: `chunk_size=8000`, `chunk_overlap=200` - Cosine similarity for semantic matching - Scope-based isolation between workers/runs - JSON serialization for complex values - Deterministic default embeddings for reproducible local behavior ## ExternalMemoryStore `ExternalMemoryStore` is a stub you can implement if you want to integrate with another storage system. # Adapters Adapters define how Blackgeorge talks to a model provider. ## BaseModelAdapter `BaseModelAdapter` defines the interface for model calls. 
- complete(...): synchronous completion
- acomplete(...): async completion
- structured_complete(...): structured output completion
- astructured_complete(...): async structured output completion

These methods accept OpenAI-style message payloads and optional tool schemas. Blackgeorge uses the async adapter methods internally for both sync and async runs, so custom adapters should implement `acomplete` and `astructured_complete`.

## LiteLLMAdapter

`LiteLLMAdapter` is the default adapter. It uses LiteLLM to call models with OpenAI-compatible inputs.

Key behaviors:

- calls `litellm.completion(...)` for synchronous requests
- calls `litellm.acompletion(...)` for async requests
- passes messages and optional model parameters (`temperature`, `max_tokens`, `thinking`, `extra_body`)
- only sends `tools` and `tool_choice` when tools are present
- supports streaming when requested
- enables `parallel_tool_calls` when model metadata indicates `supports_parallel_function_calling`
- for streaming calls, emits `llm.completed` on stream exhaustion/close and `llm.failed` if stream iteration raises

### Runtime lifecycle hardening

Blackgeorge configures the LiteLLM runtime lifecycle once, when `LiteLLMAdapter` is initialized:

- it applies deterministic shutdown cleanup for async LiteLLM clients
- it patches LiteLLM logging-worker enqueue behavior to close dropped coroutines safely
- it avoids registering LiteLLM lazy async cleanup in a way that can emit shutdown warnings

This hardening was added to prevent process-exit warnings observed in real integrations, including DeepSeek (`deepseek/deepseek-chat`):

- `RuntimeWarning: coroutine 'close_litellm_async_clients' was never awaited`
- `RuntimeWarning: coroutine 'Logging.async_success_handler' was never awaited`

Tool calls are parsed from the response and mapped into `ToolCall` objects. Malformed tool payloads are preserved with `ToolCall.error` metadata instead of crashing adapter parsing.
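The "preserve instead of crash" behavior for malformed tool payloads can be sketched as follows. This is an illustration, not the adapter's code: `ParsedToolCall` and `parse_tool_call` are hypothetical, and the input is assumed to follow the OpenAI-style tool call shape with a JSON string under `function.arguments`.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ParsedToolCall:
    """Hypothetical stand-in with the id/name/arguments/error fields."""

    id: str
    name: str
    arguments: Any
    error: Optional[str] = None


def parse_tool_call(raw: dict) -> ParsedToolCall:
    function = raw.get("function") or {}
    raw_args = function.get("arguments", "")
    try:
        arguments = json.loads(raw_args) if raw_args else {}
        error = None
    except (json.JSONDecodeError, TypeError) as exc:
        # Keep the raw payload and record why it could not be parsed
        arguments = raw_args
        error = f"invalid arguments: {exc}"
    return ParsedToolCall(
        id=raw.get("id", ""),
        name=function.get("name", ""),
        arguments=arguments,
        error=error,
    )


good = parse_tool_call(
    {"id": "call-1", "type": "function",
     "function": {"name": "echo", "arguments": '{"text": "hello"}'}}
)
bad = parse_tool_call(
    {"id": "call-2", "type": "function",
     "function": {"name": "echo", "arguments": '{"text": '}}
)
```

Recording the failure on the call object lets the run continue and report the problem, for example by returning the error to the model as a tool result.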
## Structured output pipeline Structured output uses LiteLLM JSON schema response formats when possible and falls back to Instructor with LiteLLM. Blackgeorge initializes Instructor clients with: - `instructor.from_provider("litellm/")` - `instructor.from_provider("litellm/", async_client=True)` If the LiteLLM structured response fails or is unavailable, the worker calls `chat.completions.create(..., response_model=YourModel)` and returns the validated Pydantic object as `Report.data`. Structured output retries are clamped to a minimum of 3 attempts for resilience (`retries=0` still performs 3 retries after the first failed attempt). ## Adapter hooks for structured output If your adapter implements `structured_complete`/`astructured_complete`, the worker will call those hooks for response-schema jobs. This lets you route structured output through non-LiteLLM providers or custom pipelines. If the hooks are not implemented, the worker falls back to the LiteLLM + Instructor path. ## Cost tracking The LiteLLM adapter provides cost tracking through callback events. 
When using `LiteLLMAdapter`, the following events are automatically emitted: ### LLM events Subscribe to these events to track costs and usage: ```python from blackgeorge import Desk, Worker, Job desk = Desk(model="openai/gpt-5-nano") # Track LLM costs def on_llm_completed(event): payload = event.payload print(f"Model: {payload['model']}") print(f"Cost: ${payload.get('cost', 0):.6f}") print(f"Tokens: {payload['total_tokens']}") print(f"Latency: {payload['latency_ms']}ms") desk.event_bus.subscribe("llm.completed", on_llm_completed) worker = Worker(name="assistant") report = desk.run(worker, Job(input="Hello")) ``` ### Event payloads **llm.started:** - `model`: Model name being called - `messages_count`: Number of messages in the request - `tools_count`: Number of tools available **llm.completed:** - `model`: Model name - `latency_ms`: Request latency in milliseconds - `prompt_tokens`: Number of prompt tokens - `completion_tokens`: Number of completion tokens - `total_tokens`: Total token count - `cost`: Estimated cost in USD (if available from LiteLLM) **llm.failed:** - `model`: Model name - `latency_ms`: Request latency before failure - `error_type`: Exception class name - `error_message`: Exception message ### Cost utilities The `blackgeorge.adapters.cost` module provides utilities for cost calculation: ```python from blackgeorge.adapters.cost import calculate_cost, get_model_pricing # Calculate cost from a response cost = calculate_cost(response) # Get pricing info for a model pricing = get_model_pricing("openai/gpt-5-nano") ``` ## Custom adapters To use another provider, implement `BaseModelAdapter` and pass it to `Desk(adapter=...)`. # Reference # Configuration Blackgeorge provides extensive configuration options through environment variables, Desk parameters, and module-level constants. 
## Environment variables ### Provider API keys Set the API key for your model provider: ```bash # OpenAI export OPENAI_API_KEY="your-key" # Anthropic export ANTHROPIC_API_KEY="your-key" # DeepSeek export DEEPSEEK_API_KEY="your-key" # Azure export AZURE_API_KEY="your-key" export AZURE_API_BASE="https://your-resource.openai.azure.com" export AZURE_API_VERSION="2023-07-01-preview" ``` ### Example changes preservation For the coding agent example, control whether file changes are preserved: ```bash # Preserve example changes (default: changes are restored) export PRESERVE_EXAMPLE_CHANGES="1" ``` ## Desk configuration The `Desk` class accepts comprehensive configuration options: ```python from blackgeorge import Desk desk = Desk( # Model configuration model="openai/gpt-5-nano", temperature=0.2, max_tokens=800, # Streaming stream=False, # Structured output structured_stream_mode="off", structured_output_retries=3, # Limits max_iterations=10, max_tool_calls=20, num_retries=0, # Context window handling respect_context_window=True, max_context_messages=10, # Custom components event_bus=custom_event_bus, run_store=custom_run_store, memory_store=custom_memory_store, adapter=custom_adapter, # Storage storage_dir=".blackgeorge", ) ``` ## RunConfig For programmatic control, use `RunConfig` to bundle run parameters: ```python from blackgeorge import Job, Worker, RunConfig from blackgeorge.adapters import LiteLLMAdapter adapter = LiteLLMAdapter() worker = Worker(name="analyst") job = Job(input="Summarize the latest incident report") config = RunConfig( adapter=adapter, emit=lambda type, source, payload: print(f"{type}: {source}"), run_id="run-123", temperature=0.7, max_tokens=1000, max_iterations=5, max_tool_calls=10, default_model="openai/gpt-5-nano", ) # Use config-based execution report, state = worker.run(config, job) report, state = await worker.arun(config, job) ``` ### RunConfig fields | Field | Type | Default | Description | | --------------------------- | ---------------- 
| -------- | ------------------------------------------- | | `adapter` | BaseModelAdapter | Required | LLM adapter for model calls | | `emit` | EventEmitter | Required | Event emission callback | | `run_id` | str | Required | Unique run identifier | | `events` | list[Event] | [] | Event list for the run | | `temperature` | float | None | Model temperature | | `max_tokens` | int | None | Maximum completion tokens | | `stream` | bool | False | Enable streaming | | `stream_options` | dict | None | Streaming options | | `structured_output_retries` | int | 3 | Structured output retry count | | `max_iterations` | int | 10 | Maximum model turns | | `max_tool_calls` | int | 20 | Maximum tool calls | | `num_retries` | int | 0 | LiteLLM-level retry count for failed calls | | `respect_context_window` | bool | True | Auto-summarize on context errors (Reactive) | | `max_context_messages` | int | None | Summarize once the message count exceeds this limit (Proactive) | | `default_model` | str | None | Default model name | ### RunConfig methods ```python # Create a copy with overrides new_config = config.with_overrides(temperature=0.5, max_tokens=500) # Get effective model name (worker model or default) model = config.model_name(worker.model) ``` ### Model configuration | Parameter | Type | Default | Description | | ------------- | ----- | -------- | ------------------------------------------------------ | | `model` | str | Required | Default model name for workers without their own model | | `temperature` | float | None | Model temperature (0.0-1.0) | | `max_tokens` | int | None | Maximum tokens for completion requests | Model names follow LiteLLM provider prefixes: - `openai/gpt-5-nano` - `anthropic/claude-3-opus` - `deepseek/deepseek-chat` ### Streaming | Parameter | Type | Default | Description | | --------- | ---- | ------- | ------------------------------------- | | `stream` | bool | False | Enable streaming for eligible workers | Streaming occurs when: - Desk or desk.run() has `stream=True` - No response schema is set - Or
`structured_stream_mode="preview"` with a response schema On tool turns, streamed `stream.token` events contain tool argument deltas. ### Structured output | Parameter | Type | Default | Description | | --------------------------- | ---- | ------- | ------------------------------------------------------------------------------------------------------ | | `structured_stream_mode` | str | "off" | Structured output mode: "off" (strict non-stream) or "preview" (stream tokens, then validate/fallback) | | `structured_output_retries` | int | 3 | Retries for structured output validation failures | ### Execution limits | Parameter | Type | Default | Description | | ---------------- | ---- | ------- | ----------------------------------------------------- | | `max_iterations` | int | 10 | Maximum model turns per worker run | | `max_tool_calls` | int | 20 | Maximum tool calls per worker run | | `num_retries` | int | 0 | LiteLLM retry count for failed LLM calls (0 disables) | When these limits are exceeded, the run fails with an error in `Report.errors`. ### Context window handling | Parameter | Type | Default | Description | | ------------------------ | ---- | ------- | -------------------------------------------------- | | `respect_context_window` | bool | True | Auto-summarize on context length errors (Reactive) | | `max_context_messages` | int | None | Summarize once the message count exceeds this limit (Proactive) | When `respect_context_window` is enabled, workers summarize conversation history and retry on context limit errors (Reactive). When `max_context_messages` is set, workers summarize the conversation proactively once the number of messages exceeds the limit, before a context error can occur (Proactive).
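To make the proactive path concrete, here is a minimal sketch of message-count compaction. It is not Blackgeorge's implementation: `compact_messages`, the `summarize` callback, and the `tail_messages` parameter are hypothetical names, and the default of 4 only loosely mirrors the `SUMMARY_TAIL_MESSAGES` constant documented under Context window configuration.

```python
from typing import Callable, Optional

Message = dict[str, str]


def compact_messages(
    messages: list[Message],
    max_context_messages: Optional[int],
    summarize: Callable[[list[Message]], str],
    tail_messages: int = 4,
) -> list[Message]:
    """Replace older messages with one summary once the history exceeds the limit."""
    if max_context_messages is None or len(messages) <= max_context_messages:
        return messages
    # Keep a leading system message (if any) and the most recent turns verbatim.
    head = messages[:1] if messages and messages[0]["role"] == "system" else []
    body = messages[len(head):]
    keep = body[-tail_messages:] if tail_messages > 0 else []
    older = body[: len(body) - len(keep)]
    if not older:
        return messages
    summary = {
        "role": "system",
        "content": f"Summary of earlier conversation: {summarize(older)}",
    }
    return head + [summary] + keep
```

In a real worker the `summarize` callback would be a model call; the reactive path would apply the same kind of compaction only after a context length error, then retry.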
### Custom components | Parameter | Type | Default | Description | | -------------- | ---------------- | ------- | ---------------------------------- | | `event_bus` | EventBus | None | Custom event bus implementation | | `run_store` | RunStore | None | Custom run store implementation | | `memory_store` | MemoryStore | None | Custom memory store implementation | | `adapter` | BaseModelAdapter | None | Custom model adapter | ### Storage | Parameter | Type | Default | Description | | ------------- | ---- | -------------- | ------------------------------ | | `storage_dir` | str | ".blackgeorge" | Directory for SQLite run store | ## Worker configuration Workers can be configured with several options: ```python from blackgeorge import Worker worker = Worker( # Identity name="Analyst", # Model model="openai/gpt-5-nano", instructions="You are a data analyst.", # Tools tools=[tool1, tool2], # Memory memory_scope="analyst:session-1", ) ``` ### Worker parameters | Parameter | Type | Required | Description | | -------------- | ---------- | -------- | ----------------------------------------------- | | `name` | str | Yes | Worker name used in events and selection | | `model` | str | No | Model name override (uses desk default if None) | | `instructions` | str | No | System instructions for the worker | | `tools` | list[Tool] | No | Tools this worker can use | | `memory_scope` | str | No | Namespace for external memory usage | ## Workforce configuration Workforces support multiple coordination modes: ```python from blackgeorge import Worker, Workforce workers = [Worker(name="a"), Worker(name="b")] workforce = Workforce( workers=workers, mode="managed", # "managed", "collaborate", or "swarm" name="team", manager=manager_worker, # Only for managed mode reducer=custom_reducer, # Only for collaborate mode channel=channel, # Optional blackboard=blackboard, # Optional ) ``` ### Workforce parameters | Parameter | Type | Required | Description | | ------------ | ------------ | -------- 
| --------------------------------------------------------- | | `workers` | list[Worker] | Yes | Workers in the workforce | | `mode` | str | No | "managed", "collaborate", or "swarm" (default: "managed") | | `name` | str | No | Workforce name for events | | `manager` | Worker | No | Manager for worker selection (managed mode) | | `reducer` | Callable | No | Function to combine worker reports (collaborate mode) | | `channel` | Channel | No | Communication channel between workers | | `blackboard` | Blackboard | No | Shared state for workers | ### Managed mode In managed mode, a manager worker selects one worker, and the selected worker's report is returned. The `reducer` is not used in managed mode. ### Collaborate mode In collaborate mode, workers without tools run concurrently, while workers with tools run sequentially to preserve deterministic pause/resume semantics. Provide a custom reducer to control how reports are combined: ```python from blackgeorge import Report, Workforce def custom_reducer(reports: list[Report]) -> Report: # Join every worker's content and carry all errors into the combined report combined_content = "\n\n".join(r.content or "" for r in reports) merged_errors = [error for report in reports for error in report.errors] return reports[0].model_copy( update={ "status": "failed" if merged_errors else "completed", "content": combined_content, "errors": merged_errors, "pending_action": None, } ) # w1 and w2 are Worker instances defined elsewhere workforce = Workforce( workers=[w1, w2], mode="collaborate", reducer=custom_reducer, ) ``` ### Swarm mode In swarm mode, workers can hand off execution to another worker at runtime with `transfer_to_agent_tool`. The workforce keeps execution inside a single run while switching the active worker and passing the handoff context.
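The swarm handoff described above can be pictured as a single loop that swaps the active worker. The sketch below is purely conceptual and uses no Blackgeorge APIs: `Turn`, `WorkerFn`, `run_swarm`, and the `max_handoffs` limit are all hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Turn:
    """Result of one worker turn: final content, or a handoff request."""

    content: str
    transfer_to: Optional[str] = None


# Hypothetical worker shape: takes the current context and returns a Turn.
WorkerFn = Callable[[str], Turn]


def run_swarm(
    workers: dict[str, WorkerFn],
    start: str,
    context: str,
    max_handoffs: int = 5,
) -> tuple[str, list[str]]:
    """Run workers inside one loop, switching the active worker on each handoff."""
    active = start
    path = [active]
    for _ in range(max_handoffs + 1):
        turn = workers[active](context)
        if turn.transfer_to is None:
            return turn.content, path
        if turn.transfer_to not in workers:
            raise KeyError(f"unknown worker: {turn.transfer_to}")
        # Pass the handoff context along and switch the active worker.
        context = turn.content
        active = turn.transfer_to
        path.append(active)
    raise RuntimeError("handoff limit exceeded")
```

The explicit handoff limit is a design choice of this sketch: it keeps two workers from transferring to each other indefinitely.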
## Context window configuration Module-level constants control context summarization behavior in `src/blackgeorge/worker_context.py`: | Constant | Default | Description | | --------------------------- | ------- | ---------------------------------------------- | | `SUMMARY_CHUNK_TOKENS` | 2000 | Target token count for summarization chunks | | `SUMMARY_TAIL_MESSAGES` | 4 | Number of recent messages to keep unsummarized | | `SUMMARY_ATTEMPT_LIMIT` | 2 | Maximum summarization attempts before failing | | `SUMMARY_MAX_OUTPUT_TOKENS` | 800 | Maximum tokens in summary output | These constants affect worker context summarization for both reactive retries (`respect_context_window=True`) and proactive compaction (`max_context_messages`). ### Model registration For accurate context window handling, register your models in LiteLLM: ```python import litellm litellm.model_cost = { **litellm.model_cost, "my-model": { "input_cost_per_token": 0.00001, "output_cost_per_token": 0.00002, "max_tokens": 128000, }, } ``` You can also register models programmatically: ```python import litellm litellm.register_model( { "my-provider/my-model": { "litellm_provider": "my-provider", "input_cost_per_token": 0.00001, "output_cost_per_token": 0.00002, "max_tokens": 128000, "mode": "chat", }, } ) ``` Model registration only adds pricing/context metadata. The provider prefix still has to be a LiteLLM-supported provider (or routed through LiteLLM proxy or a custom adapter) to make calls. For provider-prefixed model names such as `openai/gpt-5`, Blackgeorge checks both the full name and the unprefixed suffix when deciding whether context metadata is registered. 
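The two-step lookup for provider-prefixed names can be sketched like this. It is an illustration of the rule stated above rather than Blackgeorge's code: `find_model_metadata` is a hypothetical helper, and splitting on the first `/` is an assumption about how the suffix is derived.

```python
from typing import Any, Mapping, Optional


def find_model_metadata(
    model: str,
    registry: Mapping[str, Mapping[str, Any]],
) -> Optional[Mapping[str, Any]]:
    """Look up metadata by full name first, then by the suffix after the provider prefix."""
    if model in registry:
        return registry[model]
    # Assumption: the provider prefix is everything before the first "/".
    _, separator, suffix = model.partition("/")
    if separator and suffix in registry:
        return registry[suffix]
    return None
```

With a registry shaped like `litellm.model_cost`, a model registered as `my-model` would be found when a worker is configured with `my-provider/my-model`.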
## Tool configuration Tools accept several configuration options: ```python from blackgeorge.tools import tool def pre_hook(call): print(f"calling {call.name}") def post_hook(call, result): print(f"{call.name} done: {result.error is None}") @tool( name="my_tool", description="Does something useful", requires_confirmation=False, requires_user_input=False, external_execution=False, pre=(pre_hook,), post=(post_hook,), confirmation_prompt="Proceed?", user_input_prompt="Enter value:", timeout=30.0, retries=3, retry_delay=1.0, ) def my_function(param: str) -> str: return param ``` ### Tool parameters | Parameter | Type | Default | Description | | ----------------------- | ------------------------ | -------------- | ------------------------------------------------------------------- | | `name` | str | Function name | Tool name | | `description` | str | Auto-generated | Tool description for the model | | `requires_confirmation` | bool | False | Requires user confirmation before execution | | `requires_user_input` | bool | False | Requires user input before execution | | `external_execution` | bool | False | Mark as external tool | | `pre` | tuple[ToolPreHook, ...] | () | Pre-execution hooks | | `post` | tuple[ToolPostHook, ...] 
| () | Post-execution hooks | | `confirmation_prompt` | str | Auto-generated | Custom confirmation prompt | | `user_input_prompt` | str | Auto-generated | Custom input prompt | | `input_key` | str | None | Field name to receive resume user input (default key: `user_input`) | | `timeout` | float | None | Execution timeout in seconds | | `retries` | int | 0 | Number of retry attempts | | `retry_delay` | float | 1.0 | Delay between retries (exponential backoff) | ## Memory store configuration ### VectorMemoryStore ```python from blackgeorge.memory import VectorMemoryStore store = VectorMemoryStore( path="/path/to/db", chunk_size=8000, chunk_overlap=200, ) ``` | Parameter | Type | Default | Description | | --------------- | ---- | -------- | ----------------------------- | | `path` | str | Required | Storage path for ChromaDB | | `chunk_size` | int | 8000 | Chunk size for long documents | | `chunk_overlap` | int | 200 | Overlap between chunks | ## Run store configuration ### Custom run store Implement the `RunStore` interface: ```python from typing import Any from blackgeorge import Desk from blackgeorge.core.event import Event from blackgeorge.core.types import RunStatus from blackgeorge.store import RunRecord, RunState, RunStore class CustomRunStore(RunStore): def create_run(self, run_id: str, input_payload: Any) -> None: # Implementation pass def update_run( self, run_id: str, status: RunStatus, output: str | None, output_json: Any | None, state: RunState | None, ) -> None: # Implementation pass def get_run(self, run_id: str) -> RunRecord | None: # Implementation pass def add_event(self, event: Event) -> None: # Implementation pass def get_events(self, run_id: str) -> list[Event]: # Implementation pass desk = Desk(run_store=CustomRunStore()) ``` ## Event bus configuration ### Custom event bus ```python from blackgeorge.event_bus import EventBus from blackgeorge import Desk bus = EventBus() # Subscribe to events before passing to desk def log_events(event): print(f"{event.type}: 
{event.payload}") for event_type in ("run.started", "run.completed", "run.failed"): bus.subscribe(event_type, log_events) desk = Desk(event_bus=bus) ``` ## Adapter configuration ### Custom adapter Implement the `BaseModelAdapter` interface: ```python from typing import Any from blackgeorge import Desk from blackgeorge.adapters.base import BaseModelAdapter, ModelResponse class CustomAdapter(BaseModelAdapter): def complete( self, *, model: str, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None, tool_choice: str | dict[str, Any] | None, temperature: float | None, max_tokens: int | None, stream: bool, stream_options: dict[str, Any] | None, thinking: dict[str, Any] | None = None, drop_params: bool | None = None, extra_body: dict[str, Any] | None = None, num_retries: int | None = None, ) -> ModelResponse: # Implementation pass async def acomplete( self, *, model: str, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None, tool_choice: str | dict[str, Any] | None, temperature: float | None, max_tokens: int | None, stream: bool, stream_options: dict[str, Any] | None, thinking: dict[str, Any] | None = None, drop_params: bool | None = None, extra_body: dict[str, Any] | None = None, num_retries: int | None = None, ) -> ModelResponse: # Implementation pass desk = Desk(adapter=CustomAdapter()) ``` ## Logging configuration Blackgeorge uses the `StructuredLogger` for JSON-formatted logs: ```python import logging from blackgeorge.logging import get_logger logger = get_logger("my_app", level=logging.INFO) logger.info("Application started", user_id="123") # {"timestamp": "2024-01-19T...", "level": "INFO", "message": "Application started", "user_id": "123"} ``` See [Logging](https://jolovicdev.github.io/blackgeorge/logging/index.md) for more details.
## Environment variable reference | Variable | Description | | -------------------------- | ------------------------------------------------ | | `OPENAI_API_KEY` | OpenAI API key | | `ANTHROPIC_API_KEY` | Anthropic API key | | `DEEPSEEK_API_KEY` | DeepSeek API key | | `AZURE_API_KEY` | Azure OpenAI API key | | `AZURE_API_BASE` | Azure OpenAI endpoint | | `AZURE_API_VERSION` | Azure API version | | `PRESERVE_EXAMPLE_CHANGES` | Preserve coding-agent example file changes ("1") | ## Configuration best practices 1. **Use environment variables for secrets**: Never hardcode API keys 1. **Set appropriate limits**: Adjust `max_iterations` and `max_tool_calls` based on your use case 1. **Enable context window handling**: Keep `respect_context_window=True` for production 1. **Configure storage_dir**: Use a dedicated directory for run storage 1. **Use memory_scope**: Isolate worker memory when using multiple workers 1. **Register models**: Register custom models in LiteLLM for accurate context handling # Logging Blackgeorge provides a structured logger that outputs JSON-formatted logs with context support. This is useful for debugging, monitoring, and log aggregation systems. 
## Getting a logger Use the `get_logger` function to create a logger: ```python from blackgeorge.logging import get_logger logger = get_logger("my_app") logger.info("Application started") ``` ## Log levels The logger supports standard Python logging levels: ```python import logging from blackgeorge.logging import get_logger logger = get_logger("my_app", level=logging.DEBUG) logger.debug("Detailed debug information") logger.info("General information") logger.warning("Warning message") logger.error("Error occurred") logger.critical("Critical failure") ``` ## Structured output All log messages are formatted as JSON: ```python logger.info("User logged in", user_id="123", ip_address="192.168.1.1") ``` Output: ```json {"timestamp": "2024-01-19T10:30:45.123456", "level": "INFO", "message": "User logged in", "user_id": "123", "ip_address": "192.168.1.1"} ``` ## Context support Create loggers with additional context that persists across all log calls: ```python from blackgeorge.logging import get_logger # Base logger with context logger = get_logger("my_app").with_context( service="payment-processor", version="1.0.0", environment="production" ) # Context is automatically included logger.info("Processing payment", payment_id="abc123") ``` Output: ```json { "timestamp": "2024-01-19T10:30:45.123456", "level": "INFO", "message": "Processing payment", "service": "payment-processor", "version": "1.0.0", "environment": "production", "payment_id": "abc123" } ``` ### Extending context Create new loggers with additional context from an existing logger: ```python base_logger = get_logger("my_app").with_context(service="api") # Add request-specific context request_logger = base_logger.with_context( request_id="req-456", user_id="user-789" ) request_logger.info("Handling request") ``` ## Using with Blackgeorge components ### Logging in tools ```python from blackgeorge.tools import tool from blackgeorge.logging import get_logger logger = 
get_logger("tools").with_context(component="file_operations") @tool() def read_file(file_path: str) -> str: logger.info("Reading file", file_path=file_path) try: with open(file_path) as f: content = f.read() logger.info("File read successfully", file_path=file_path, size=len(content)) return content except Exception as e: logger.error("Failed to read file", file_path=file_path, error=str(e)) raise ``` ### Logging in event handlers ```python from blackgeorge import Desk from blackgeorge.logging import get_logger logger = get_logger("events").with_context(component="event_monitor") def on_tool_completed(event): logger.info("Tool completed", tool=event.source, run_id=event.run_id) desk = Desk(model="openai/gpt-5-nano") desk.event_bus.subscribe("tool.completed", on_tool_completed) ``` ### Logging in custom stores ```python from typing import Any from blackgeorge.store import RunStore from blackgeorge.logging import get_logger logger = get_logger("stores").with_context(component="custom_store") class CustomRunStore(RunStore): def create_run(self, run_id: str, input_payload: Any) -> None: logger.debug("Creating run", run_id=run_id) # Implementation logger.info("Run created", run_id=run_id) ``` ## Log levels and filtering Control log verbosity by setting the log level: ```python import logging from blackgeorge.logging import get_logger # Only WARNING and above will be logged logger = get_logger("my_app", level=logging.WARNING) logger.debug("This won't be logged") logger.info("Neither will this") logger.warning("This will be logged") logger.error("So will this") ``` ## Configuring handlers The `StructuredLogger` uses Python's standard logging module. 
Configure handlers for different output destinations: ```python import logging from blackgeorge.logging import get_logger # Get the underlying logger logger = get_logger("my_app") underlying = logger.logger # Add a file handler file_handler = logging.FileHandler("app.log") file_handler.setLevel(logging.DEBUG) underlying.addHandler(file_handler) # Add a stream handler with different level console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) underlying.addHandler(console_handler) ``` ## Timestamps All log entries include UTC timestamps in ISO 8601 format: ```python logger.info("Event occurred") # {"timestamp": "2024-01-19T10:30:45.123456+00:00", "level": "INFO", "message": "Event occurred"} ``` ## Data types The logger handles various data types in context: ```python logger.info( "Complex data", integer=42, floating=3.14, boolean=True, none_value=None, list=[1, 2, 3], dict={"key": "value"}, custom_object=str(CustomClass()) ) ``` ## Error logging Log exceptions with context: ```python try: risky_operation() except Exception as e: logger.error("Operation failed", error=str(e), error_type=type(e).__name__) raise ``` ## Performance considerations 1. **JSON serialization**: The logger uses `json.dumps` with `default=str` for non-serializable objects 1. **Handler selection**: Use appropriate log levels to avoid excessive logging 1. 
**Async operations**: For high-throughput scenarios, consider offloading log processing ## Integration with observability platforms The JSON output format is compatible with most log aggregation systems: ### ELK Stack (Elasticsearch, Logstash, Kibana) ```python logger.info("API request", method="GET", path="/api/users", status=200) ``` ### Splunk ```python logger.info("Transaction completed", transaction_id="abc123", amount=99.99) ``` ### Datadog ```python logger.info("Metric recorded", metric_name="request.duration", value=123.45, tags=["api:v1"]) ``` ## StructuredLogger API ### Creating a logger ```python import logging from blackgeorge.logging import StructuredLogger logger = StructuredLogger("my_app", level=logging.INFO) ``` ### Methods | Method | Description | | ----------------------------- | ----------------------------------------- | | `debug(message, **kwargs)` | Log debug message | | `info(message, **kwargs)` | Log info message | | `warning(message, **kwargs)` | Log warning message | | `error(message, **kwargs)` | Log error message | | `critical(message, **kwargs)` | Log critical message | | `with_context(**kwargs)` | Return new logger with additional context | ### Logger properties | Property | Description | | --------- | ------------------------------ | | `logger` | The underlying Python logger | | `context` | The current context dictionary | ## Examples ### Request logging ```python from blackgeorge.logging import get_logger logger = get_logger("api").with_context(service="user-service") def handle_request(request): request_logger = logger.with_context( request_id=request.id, user_id=request.user_id ) request_logger.info("Request started", path=request.path) try: response = process_request(request) request_logger.info("Request completed", status=response.status_code) return response except Exception as e: request_logger.error("Request failed", error=str(e)) raise ``` ### Tool execution tracking ```python from blackgeorge.tools import tool from blackgeorge.logging 
import get_logger logger = get_logger("tools").with_context(component="database") @tool() def query_database(sql: str) -> list[dict]: logger.info("Executing query", sql=sql) try: results = execute_sql(sql) logger.info("Query completed", row_count=len(results)) return results except Exception as e: logger.error("Query failed", sql=sql, error=str(e)) raise ``` ### Multi-component logging ```python from blackgeorge.logging import get_logger # Create base logger for the application base_logger = get_logger("myapp").with_context( app="myapp", version="1.0.0" ) # Create component-specific loggers db_logger = base_logger.with_context(component="database") api_logger = base_logger.with_context(component="api") worker_logger = base_logger.with_context(component="worker") # All logs include app and version context db_logger.info("Connected to database") api_logger.info("Received request") worker_logger.info("Processing job") ``` ## Best practices 1. **Use descriptive log levels**: Reserve ERROR for failures, WARNING for potential issues 1. **Include relevant context**: Add structured data that helps with debugging 1. **Use consistent field names**: Use `user_id` not `userId` or `userIdentifier` 1. **Avoid sensitive data**: Don't log passwords, tokens, or personal information 1. **Log at appropriate granularity**: Debug logs should be disableable in production 1. **Use context wisely**: Share context across related log calls 1. **Handle serialization**: Use `str()` for complex objects that can't be JSON-serialized # Examples # Examples This repository includes a coding agent example under `examples/coding_agent`. 
## What it demonstrates - Desk, Worker, Workforce usage - Tool calls and pause/resume - Workflow steps with parallel execution - Event streaming and logging - Tool timeouts and retries - Vector memory for semantic search and recall - Channel and blackboard collaboration tools - Tool result previews in event payloads ## Run the example Set your API key for the selected model provider and run the script. ```text export DEEPSEEK_API_KEY="..." python examples/coding_agent/run.py ``` The example stores run data at: ```text examples/coding_agent/.blackgeorge/blackgeorge.db ``` ## Flow overview The coding agent example is a managed workforce with a manager, coder, reviewer, and narrator. The manager selects the coder, the coder performs file operations with confirmation, and the reviewer summarizes changes. After the main run, a flow executes reviewer and narrator in parallel for a structured report and a human summary. The flow job includes a `changed_files` list so the reviewer can ground its report in the actual modified files. Tool completion events include a short result preview so tool outputs are visible in logs without dumping full payloads. ## API surface used by the example This example uses only public APIs and does not remove or replace any existing ones. The changes in this repository are additive and keep the original API surface intact. Core runtime APIs: - `Desk`, `Worker`, `Workforce`, `Job`, and `Report` drive the run lifecycle. - `Workflow` nodes (`Step`, `Parallel`) orchestrate post-run summaries. Tooling APIs: - `tool` decorator registers callable tools with input validation. - `ToolResult` carries tool output, errors, timeouts, and cancellation flags. - `execute_tool` and `aexecute_tool` run tools with timeouts and retries. Collaboration APIs: - `Channel` and `Blackboard` provide messaging and shared state. - `channel_send`, `channel_receive`, and `blackboard_write` are tool wrappers around those primitives. 
Memory APIs: - `VectorMemoryStore` stores file content and notes for semantic search. - `search_docs`, `remember`, and `recall` are tools built on top of the memory store. Event APIs: - The event bus emits `tool.completed` with a short `result_preview` so tool outputs are visible in logs. ## Collaboration tools Workers can communicate through channel tools and store shared state via blackboard tools. These tools are exposed as normal `Tool` instances so workers can call them like any other tool. ## Vector memory File reads and notes are stored in the vector memory store to enable semantic search and recall. This allows the agent to look up recently read files and remembered decisions. ## Notes - The example edits files inside `examples/coding_agent/project`. - By default, edits are restored after the run completes. - Set `PRESERVE_EXAMPLE_CHANGES=1` to keep changes. - Set `BLACKGEORGE_STREAM=0` to disable streaming. # Development # Development This project uses pytest for tests, ruff for linting, and mypy for type checking. ## Development workflow Typical local workflow: 1. Install dev dependencies. 1. Make code and docs changes. 1. Run ruff to catch lint and style issues early. 1. Run mypy to ensure type correctness under the strict config. 1. Run pytest to validate behavior. 1. Run MkDocs serve/build to verify documentation changes. This order keeps fast feedback loops first (ruff and mypy), then tests, then docs. MkDocs uses `docs/index.md` as the site home. The `docs/README.md` file is excluded to avoid a name conflict with the home page. ## Install dev dependencies ```text uv pip install -e .[dev] ``` ## Run tests ```text uv run pytest ``` ## Lint ```text uv run ruff check . ``` ```text uv run ruff format . ``` ## Docs site ```text uv run mkdocs serve ``` ```text uv run mkdocs build ``` ## Type check ```text uv run mypy src ```
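The workflow order described above (ruff, then mypy, then pytest, then the docs build) could be scripted roughly as follows. This is a hypothetical helper, not a script shipped with the repository: `CHECKS` and `run_checks` are illustrative names, and the commands are the ones listed in the sections above.

```python
import subprocess
from typing import Callable, Sequence

# Commands from the sections above, in the order the workflow recommends.
CHECKS: list[list[str]] = [
    ["uv", "run", "ruff", "check", "."],
    ["uv", "run", "mypy", "src"],
    ["uv", "run", "pytest"],
    ["uv", "run", "mkdocs", "build"],
]


def run_checks(run: Callable[[Sequence[str]], int] = subprocess.call) -> list[str]:
    """Run each check in order, stopping at the first failure; return the commands that ran."""
    ran: list[str] = []
    for command in CHECKS:
        ran.append(" ".join(command))
        # A non-zero exit code means the check failed, so skip the slower steps.
        if run(command) != 0:
            break
    return ran
```

Stopping at the first failure keeps the fast checks as the first line of feedback; the `run` parameter exists only so the helper can be exercised without invoking the real tools.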