Workflow¶
The workflow layer lets you compose multiple steps into a flow. Use it when you need a multi-step pipeline or parallel branches.
Flow¶
Create a flow from steps using the desk.
from blackgeorge import Desk, Job, Worker
from blackgeorge.workflow import Step
desk = Desk(model="openai/gpt-5-nano")
analyst = Worker(name="Analyst")
writer = Worker(name="Writer")
flow = desk.flow([Step(analyst), Step(writer)])
report = flow.run(Job(input="Analyze feedback"))
A flow produces a report. If there are multiple steps, the content is combined with step headers.
Steps¶
- Step: runs a worker or workforce once
- Parallel: runs steps concurrently and returns all results
- Condition: chooses a branch based on a predicate
- Router: selects a route by string key
- Loop: repeats steps until a stop predicate or max iterations
- AsyncCondition: chooses a branch based on an async predicate
- AsyncLoop: repeats steps until an async stop predicate or max iterations
Step¶
Step wraps a runner and optionally provides a job_builder to create a job per step.
from blackgeorge import Job, Worker
from blackgeorge.workflow import Step
worker = Worker(name="Analyst")
def build_job(context):
return Job(input={"task": "Analyze", "seed": context.job.input})
step = Step(worker, job_builder=build_job)
Parallel¶
Condition¶
from blackgeorge.workflow import Condition, Step
condition = Condition(
predicate=lambda ctx: bool(ctx.outputs),
if_true=[Step(worker_a)],
if_false=[Step(worker_b)],
)
Router¶
from blackgeorge.workflow import Router, Step
router = Router(
selector=lambda ctx: "fast",
routes={"fast": [Step(worker_a)], "deep": [Step(worker_b)]},
)
Loop¶
from blackgeorge.workflow import Loop, Step
loop = Loop(
steps=[Step(worker_a)],
stop=lambda ctx: len(ctx.outputs) >= 3,
max_iterations=5,
name="analysis_loop", # Optional: for tracking iterations
)
Loops track iterations in the WorkflowContext:
def stop_when_done(ctx):
iteration = ctx.loop_iteration("analysis_loop")
return iteration >= 3 or len(ctx.outputs) > 0
AsyncCondition¶
Use async predicates when the condition requires async operations:
from blackgeorge.workflow import AsyncCondition, Step
async def check_external_service(ctx):
result = await fetch_status()
return result.ready
condition = AsyncCondition(
predicate=check_external_service,
if_true=[Step(worker_a)],
if_false=[Step(worker_b)],
)
AsyncLoop¶
Use async predicates for loop termination:
from blackgeorge.workflow import AsyncLoop, Step
async def should_stop(ctx):
result = await check_completion()
return result.done
loop = AsyncLoop(
steps=[Step(worker_a)],
stop=should_stop,
max_iterations=10,
name="polling_loop",
)
Pause and resume¶
If a step pauses, the flow stores the current step state and outputs. Resume with the same report and a decision or input.
Composite nodes (Condition, Router, Loop) short-circuit on paused or failed results to prevent subsequent steps from running.
Async usage¶
Use the async APIs when you already have an event loop.
report = await flow.arun(job)
if report.status == "paused":
report = await flow.aresume(report, True)
Async flows use non-blocking model calls, and Parallel runs steps concurrently. The sync wrappers flow.run and flow.resume raise if called from a running event loop.