Skip to content

Collaboration

Blackgeorge provides built-in collaboration primitives for multi-agent coordination. Workers can communicate through direct messaging and share state through a common data store.

Blackboard

A Blackboard provides shared state across workers. Multiple workers can read from and write to the same blackboard, enabling coordination through shared memory.

Creating a blackboard

from blackgeorge.collaboration import Blackboard

blackboard = Blackboard()

Writing to the blackboard

blackboard.write("analysis_result", {"score": 95}, author="analyst")

The write method takes: - key: A string identifier for the data - value: Any JSON-serializable value - author: The name of the worker writing the data

Reading from the blackboard

result = blackboard.read("analysis_result")
print(result)  # {"score": 95}

# Check if a key exists
if blackboard.exists("analysis_result"):
    value = blackboard.read("analysis_result")

Reading entries with metadata

entry = blackboard.read_entry("analysis_result")
print(entry.key)         # "analysis_result"
print(entry.value)       # {"score": 95}
print(entry.author)      # "analyst"
print(entry.created_at)  # datetime of first write
print(entry.updated_at)  # datetime of last update

Listing all entries

entries = blackboard.all_entries()
for key, entry in entries.items():
    print(f"{key} by {entry.author}: {entry.value}")

Subscribing to changes

def on_write(key: str, value: Any, author: str) -> None:
    print(f"{author} wrote {key}: {value}")

# Subscribe to a specific key
blackboard.subscribe("analysis_result", on_write)

# Subscribe to all changes
blackboard.subscribe_all(on_write)

Other operations

# Delete a key
blackboard.delete("analysis_result")

# List all keys
keys = blackboard.keys()

# Clear all entries
blackboard.clear()

Channel

A Channel enables direct messaging between workers. Messages can be sent to specific recipients or broadcast to all workers.

Creating a channel

from blackgeorge.collaboration import Channel

channel = Channel()

Sending direct messages

message = channel.send(
    sender="worker_a",
    recipient="worker_b",
    content={"task": "analyze data"},
    metadata={"priority": "high"}
)
print(message.id)  # Unique message ID

Broadcasting messages

message = channel.broadcast(
    sender="manager",
    content={"status": "starting"},
    metadata={"phase": "1"}
)

Receiving messages

# Receive all messages for this worker (clears after reading)
messages = channel.receive("worker_b")
for msg in messages:
    print(f"From {msg.sender}: {msg.content}")

# Peek at messages without clearing
messages = channel.peek("worker_b")

# Receive with broadcast mode options
messages = channel.receive(
    "worker_b",
    clear=False,                # Don't clear after reading
    broadcast_mode="one_shot"   # Each broadcast seen once (default)
)

# Receive all broadcasts (including previously seen)
messages = channel.receive(
    "worker_b",
    broadcast_mode="all"
)

Broadcast modes

  • one_shot: Each broadcast is delivered only once per recipient. The channel tracks which broadcasts each recipient has seen.
  • all: All broadcasts are returned every time. Useful for treating broadcasts as a log.

Message structure

Each message has:

@dataclass
class ChannelMessage:
    id: str                        # Unique message ID
    sender: str                    # Sender's name
    recipient: str | None          # None for broadcasts
    content: Any                   # Message content
    timestamp: datetime            # When sent
    metadata: dict[str, Any]       # Additional data

Channel utilities

# Get all messages across all recipients
all_messages = channel.all_messages()

# Clear messages for a specific recipient
channel.clear("worker_b")

# Clear all messages
channel.clear()

Using with Workforce

Pass channel and blackboard when creating a workforce:

from blackgeorge import Worker, Workforce
from blackgeorge.collaboration import Channel, Blackboard

channel = Channel()
blackboard = Blackboard()

worker_a = Worker(name="worker_a")
worker_b = Worker(name="worker_b")

workforce = Workforce(
    [worker_a, worker_b],
    mode="collaborate",
    channel=channel,
    blackboard=blackboard,
)

Workers can access the collaboration primitives:

# In worker tool implementations
workforce.channel.send("worker_a", "worker_b", {"data": "value"})
workforce.blackboard.write("result", {"status": "done"}, author="worker_a")

Collaboration Tools

Blackgeorge provides tool wrappers around collaboration primitives so workers can use them through tool calls.

Channel tools

from blackgeorge.collaboration import channel_send_tool, channel_receive_tool, channel_broadcast_tool

send_tool = channel_send_tool(channel, sender="worker_a")
receive_tool = channel_receive_tool(channel, recipient="worker_b")
broadcast_tool = channel_broadcast_tool(channel, sender="manager")

worker = Worker(name="worker_a", tools=[send_tool, receive_tool, broadcast_tool])

Blackboard tools

from blackgeorge.collaboration import blackboard_read_tool, blackboard_write_tool

read_tool = blackboard_read_tool(blackboard)
write_tool = blackboard_write_tool(blackboard, author="worker_a")

worker = Worker(name="worker_a", tools=[read_tool, write_tool])

Tool parameters

channel_send: - recipient (str): The recipient worker name - content (JsonValue): Message content - metadata (dict, optional): Additional message metadata - Returns: Message ID

channel_broadcast: - content (JsonValue): Broadcast content - metadata (dict, optional): Additional metadata - Returns: Message ID

channel_receive: - broadcast_mode ("all" or "one_shot"): How to handle broadcasts - clear (bool): Whether to clear messages after reading - Returns: List of message dictionaries

blackboard_write: - key (str): The key to write - value (JsonValue): The value to store - Returns: The key that was written

blackboard_read: - key (str): The key to read - Returns: The stored value or None

Example: Multi-agent analysis

from blackgeorge import Desk, Job, Worker, Workforce
from blackgeorge.collaboration import Channel, Blackboard
from blackgeorge.collaboration.tools import (
    channel_send_tool,
    channel_receive_tool,
    blackboard_read_tool,
    blackboard_write_tool,
)

# Create collaboration primitives
channel = Channel()
blackboard = Blackboard()

# Create workers with collaboration tools
analyst = Worker(
    name="analyst",
    tools=[
        blackboard_write_tool(blackboard, "analyst"),
        channel_send_tool(channel, "analyst"),
    ],
)

reviewer = Worker(
    name="reviewer",
    tools=[
        blackboard_read_tool(blackboard),
        blackboard_write_tool(blackboard, "reviewer"),
        channel_send_tool(channel, "reviewer"),
        channel_receive_tool(channel, "reviewer"),
    ],
)

# Create workforce
workforce = Workforce(
    [analyst, reviewer],
    mode="collaborate",
    channel=channel,
    blackboard=blackboard,
)

# Run
desk = Desk(model="openai/gpt-5-nano")
job = Job(input="Analyze the data and have it reviewed")
report = desk.run(workforce, job)

Thread safety

Both Blackboard and Channel are thread-safe. They use internal locks to ensure safe concurrent access from multiple workers.

Async methods

For use in async contexts (like Workforce with mode="collaborate"), both Blackboard and Channel provide async method variants:

Async Blackboard

# Async variants of all methods
await blackboard.awrite("key", value, "author")
value = await blackboard.aread("key")
entry = await blackboard.aread_entry("key")
exists = await blackboard.aexists("key")
deleted = await blackboard.adelete("key")
keys = await blackboard.akeys()
entries = await blackboard.aall_entries()
await blackboard.asubscribe("key", callback)
await blackgeorge.asubscribe_all(callback)
await blackboard.aunsubscribe("key", callback)
await blackboard.aclear()

Async Channel

# Async variants of all methods
message = await channel.asend("sender", "recipient", content, metadata)
message = await channel.abroadcast("sender", content, metadata)
messages = await channel.areceive("recipient", clear=True, broadcast_mode="one_shot")
messages = await channel.apeek("recipient")
await channel.aclear("recipient")
messages = await channel.aall_messages()

Why async methods?

When using Workforce with mode="collaborate", workers run in parallel using asyncio.gather. The async variants use asyncio.to_thread() to delegate to sync methods, ensuring thread-safe access through a single threading.Lock. This prevents blocking the event loop while maintaining correct synchronization.