Build A Pipeline#
This guide shows a production-oriented extraction workflow from mixed sources to review output.
1. Define Entity Schemas#
from pydantic import BaseModel
class PersonAttrs(BaseModel):
role: str | None = None
class CompanyAttrs(BaseModel):
industry: str | None = None
2. Build Task and Examples#
from sourcery.contracts import EntitySchemaSet, EntitySpec, ExtractionExample, ExtractionTask, ExampleExtraction
task = ExtractionTask(
instructions="Extract people and companies mentioned in the text.",
schema=EntitySchemaSet(
entities=[
EntitySpec(name="person", attributes_model=PersonAttrs),
EntitySpec(name="company", attributes_model=CompanyAttrs),
]
),
examples=[
ExtractionExample(
text="Ada is CEO at ByteWorks.",
extractions=[
ExampleExtraction(entity="person", text="Ada", attributes={"role": "CEO"}),
ExampleExtraction(entity="company", text="ByteWorks", attributes={"industry": "software"}),
],
)
],
strict_example_alignment=True,
)
3. Run from Mixed Sources#
import sourcery
from sourcery.contracts import ExtractOptions, ExtractRequest, RuntimeConfig
from sourcery.ingest import load_source_documents
sources = [
"docs/input/report.pdf",
"https://example.com/news/article",
"Inline note: Helen joined Orbit Labs as COO.",
]
runtime = RuntimeConfig(model="deepseek/deepseek-chat", temperature=0.0)
options = ExtractOptions(
max_chunk_chars=1200,
max_passes=2,
stop_when_no_new_extractions=True,
allow_unresolved=False,
)
request = ExtractRequest(
documents=load_source_documents(sources),
task=task,
runtime=runtime,
options=options,
)
result = sourcery.extract(request)
4. Persist and Review#
from pathlib import Path
from sourcery.io import save_extract_result_jsonl, write_document_html, write_reviewer_html
output = Path("output")
output.mkdir(parents=True, exist_ok=True)
save_extract_result_jsonl(result, output / "result.jsonl")
for index, document in enumerate(result.documents):
write_document_html(document, output / f"doc-{index}.viewer.html")
write_reviewer_html(document, output / f"doc-{index}.reviewer.html")
5. Inspect Metrics and Warnings#
print(result.metrics.model_dump(mode="json"))
for warning in result.warnings:
print("warning:", warning)
6. Replay Raw Runtime Runs (Optional)#
from sourcery.runtime import SourceryEngine
engine = SourceryEngine()
raw_run_id = None
if result.documents and result.documents[0].extractions:
raw_run_id = result.documents[0].extractions[0].provenance.raw_run_id
if raw_run_id is not None:
replay_payload, replay_events = engine.replay_run(request, raw_run_id)
Use replay for provider debugging, audits, or incident postmortems.