# Sourcery Documentation

> Sourcery is a Python LLM extraction framework for converting text, PDFs, URLs, HTML, and VLM OCR image sources into typed, source-grounded Pydantic data with JSONL and HTML review workflows.

Sourcery is a Python LLM extraction framework for converting unstructured text, PDFs, URLs, HTML, and VLM OCR image sources into typed, source-grounded Pydantic data with async extraction, streaming chunk events, traceability, and reviewable output.

# Getting Started

# Sourcery: Schema-First LLM Document Extraction for Python

Sourcery converts unstructured text, PDFs, HTML, URLs, and image OCR output into typed, source-grounded Pydantic data.

If you can define your target entities as Pydantic models, you can run reproducible LLM extraction pipelines with deterministic chunking, source-span alignment, retry policy, async execution, streaming chunk events, and reviewable outputs.

## What You Build With It

- Typed entity extraction with strict schema validation.
- Character-grounded spans (`char_start`, `char_end`) for each extraction.
- Deterministic chunking, alignment, and merge behavior.
- Optional document-level reconciliation into canonical claims.
- Native async extraction and chunk-level streaming events.
- JSONL + HTML outputs for downstream systems and human review.

## Core Boundaries

1. Contracts: `sourcery/contracts` defines all request/result primitives.
1. Pipeline: `sourcery/pipeline` handles chunking, prompt envelopes, alignment, merge.
1. Runtime: `sourcery/runtime` executes model calls and orchestration.
1. IO + Review: `sourcery/io` persists and visualizes extraction results.

## Start Here

1. Install and configure credentials: [Getting Started / Installation](https://jolovicdev.github.io/sourcery/getting-started/installation/index.md)
1. Run first extraction in \<5 minutes: [Getting Started / Quickstart](https://jolovicdev.github.io/sourcery/getting-started/quickstart/index.md)
1. Build from mixed sources (PDF/URL/text): [Guides / Build A Pipeline](https://jolovicdev.github.io/sourcery/guides/build-a-pipeline/index.md)
1. Tune reliability and throughput: [Guides / Runtime And Tuning](https://jolovicdev.github.io/sourcery/guides/runtime-and-tuning/index.md)

## Minimal End-to-End Path

```
uv sync --extra ingest
export DEEPSEEK_API_KEY="..."
uv run python - <<'PY'
from pydantic import BaseModel

import sourcery
from sourcery.contracts import EntitySchemaSet, EntitySpec, ExtractRequest, ExtractionExample, ExtractionTask, ExampleExtraction, RuntimeConfig

class PersonAttrs(BaseModel):
    role: str | None = None

request = ExtractRequest(
    documents="Alice is CTO at Acme.",
    task=ExtractionTask(
        instructions="Extract people and role if present.",
        schema=EntitySchemaSet(entities=[EntitySpec(name="person", attributes_model=PersonAttrs)]),
        examples=[
            ExtractionExample(
                text="Bob is CEO.",
                extractions=[ExampleExtraction(entity="person", text="Bob", attributes={"role": "CEO"})],
            )
        ],
    ),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)

result = sourcery.extract(request)
print(result.metrics.model_dump(mode="json"))
PY
```

If you do not use DeepSeek, set the provider key required by your selected model route.

## Documentation Build

Serve locally:

```
uv run --extra docs mkdocs serve
```

Build static docs with strict validation:

```
uv run --extra docs mkdocs build --strict
```

# Install Sourcery for Python LLM Extraction

## Requirements

- Python `>=3.12`
- `uv` package manager
- API key for your model provider (for example `DEEPSEEK_API_KEY`)

Base runtime dependency is `blackgeorge` (installed automatically with `sourceryforge`).

PyPI distribution name is `sourceryforge`, while Python import path remains `sourcery`.
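Because the distribution name (`sourceryforge`) and the import name (`sourcery`) differ, a quick pre-flight check can save some confusion. The following is a minimal sketch using only the standard library; the helper name `check_environment` is hypothetical and not part of Sourcery.

```python
import sys
from importlib import metadata

DIST_NAME = "sourceryforge"  # PyPI distribution name, as stated in the requirements
MIN_PYTHON = (3, 12)         # minimum Python version, as stated in the requirements


def check_environment() -> dict[str, object]:
    """Report whether the interpreter is new enough and whether the distribution is installed."""
    python_ok = sys.version_info >= MIN_PYTHON
    try:
        # Look up by distribution name, not by import name.
        dist_version = metadata.version(DIST_NAME)
    except metadata.PackageNotFoundError:
        dist_version = None
    return {"python_ok": python_ok, "dist_version": dist_version}


report = check_environment()
print(report)
```

A `dist_version` of `None` means the distribution is not installed in the active environment, in which case `import sourcery` would be expected to fail as well.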
## Install Dependencies

Core package only:

```
uv sync
```

Install from PyPI:

```
pip install sourceryforge
```

With ingestion adapters (PDF/URL workflows):

```
uv sync --extra ingest
```

With dev tooling (tests, lint, type-check):

```
uv sync --extra dev --extra ingest
```

With docs tooling:

```
uv sync --extra docs
```

## Provider Credentials

Sourcery runtime calls are delegated to the configured runtime/provider (`RuntimeConfig.model`).

This repository itself explicitly reads these keys in helper scripts:

```
export DEEPSEEK_API_KEY="..."
```

```
export OPENROUTER_API_KEY="..."
```

Set the credentials required by your selected model route before calling extraction. For example, `RuntimeConfig(model="deepseek/deepseek-chat")` typically requires `DEEPSEEK_API_KEY`.

Benchmark scripts use `DEEPSEEK_API_KEY` or `OPENROUTER_API_KEY` depending on `--sourcery-model`.

## Smoke Test

```
uv run python - <<'PY'
from pydantic import BaseModel

import sourcery
from sourcery.contracts import EntitySchemaSet, EntitySpec, ExtractRequest, ExtractionExample, ExtractionTask, ExampleExtraction, RuntimeConfig

class PersonAttrs(BaseModel):
    role: str | None = None

request = ExtractRequest(
    documents="Alice is CTO at Acme.",
    task=ExtractionTask(
        instructions="Extract people and role if present.",
        schema=EntitySchemaSet(entities=[EntitySpec(name="person", attributes_model=PersonAttrs)]),
        examples=[
            ExtractionExample(
                text="Bob is CEO.",
                extractions=[ExampleExtraction(entity="person", text="Bob", attributes={"role": "CEO"})],
            )
        ],
    ),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)

result = sourcery.extract(request)
print(result.metrics.model_dump(mode="json"))
PY
```

## Validate Full Dev Environment

```
uv run --extra dev pytest -q
uv run --extra dev ruff check sourcery tests
uv run --extra dev mypy sourcery
```

# Sourcery Quickstart: Extract Typed Data from Text with an LLM

This quickstart creates a Pydantic-backed extraction task, runs LLM extraction, and writes source-grounded JSONL plus HTML review outputs.

## 1. Create `quickstart.py`

```
from pathlib import Path

from pydantic import BaseModel

import sourcery
from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractRequest,
    ExtractOptions,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    RuntimeConfig,
)
from sourcery.io import save_extract_result_jsonl, write_document_html, write_reviewer_html


class PersonAttributes(BaseModel):
    role: str | None = None


class OrganizationAttributes(BaseModel):
    sector: str | None = None


request = ExtractRequest(
    documents=(
        "Alice Johnson is the CEO of Acme Robotics. "
        "Acme Robotics builds warehouse automation systems."
    ),
    task=ExtractionTask(
        instructions="Extract person and organization entities with useful attributes.",
        schema=EntitySchemaSet(
            entities=[
                EntitySpec(name="person", attributes_model=PersonAttributes),
                EntitySpec(name="organization", attributes_model=OrganizationAttributes),
            ]
        ),
        examples=[
            ExtractionExample(
                text="Bob is CTO at Nova Labs.",
                extractions=[
                    ExampleExtraction(entity="person", text="Bob", attributes={"role": "CTO"}),
                    ExampleExtraction(entity="organization", text="Nova Labs", attributes={"sector": "software"}),
                ],
            )
        ],
    ),
    options=ExtractOptions(max_passes=2, stop_when_no_new_extractions=True),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat", temperature=0.0),
)

result = sourcery.extract(request)

output_dir = Path("output")
output_dir.mkdir(parents=True, exist_ok=True)

save_extract_result_jsonl(result, output_dir / "result.jsonl")
write_document_html(result.documents[0], output_dir / "document.viewer.html")
write_reviewer_html(result.documents[0], output_dir / "document.reviewer.html")

print("Run ID:", result.run_trace.run_id)
print("Documents:", result.metrics.documents_total)
print("Extractions:", result.metrics.extracted_total)
print("Warnings:", len(result.warnings))
```

## 2. Run It

```
uv run python quickstart.py
```

## 3. Inspect Outputs

- `output/result.jsonl`: machine-friendly output for downstream processing.
- `output/document.viewer.html`: span-highlighted read-only viewer.
- `output/document.reviewer.html`: interactive approve/reject review UI.

## Async Variant

```
result = await sourcery.aextract(request)
```

## What To Do Next

- Move to [Build a Pipeline](https://jolovicdev.github.io/sourcery/guides/build-a-pipeline/index.md) for mixed-source ingestion.
- Move to [Runtime and Tuning](https://jolovicdev.github.io/sourcery/guides/runtime-and-tuning/index.md) for reliability/throughput tuning.

# Guides

# Build a Document Extraction Pipeline with Sourcery

This guide shows a production-oriented LLM extraction workflow from mixed sources to source-grounded review output.

## 1. Define Entity Schemas

```
from pydantic import BaseModel


class PersonAttrs(BaseModel):
    role: str | None = None


class CompanyAttrs(BaseModel):
    industry: str | None = None
```

## 2. Build Task and Examples

```
from sourcery.contracts import EntitySchemaSet, EntitySpec, ExtractionExample, ExtractionTask, ExampleExtraction

task = ExtractionTask(
    instructions="Extract people and companies mentioned in the text.",
    schema=EntitySchemaSet(
        entities=[
            EntitySpec(name="person", attributes_model=PersonAttrs),
            EntitySpec(name="company", attributes_model=CompanyAttrs),
        ]
    ),
    examples=[
        ExtractionExample(
            text="Ada is CEO at ByteWorks.",
            extractions=[
                ExampleExtraction(entity="person", text="Ada", attributes={"role": "CEO"}),
                ExampleExtraction(entity="company", text="ByteWorks", attributes={"industry": "software"}),
            ],
        )
    ],
    strict_example_alignment=True,
)
```

## 3. Run from Mixed Sources

```
import sourcery
from sourcery.contracts import ExtractOptions, ExtractRequest, RuntimeConfig
from sourcery.ingest import load_source_documents

sources = [
    "docs/input/report.pdf",
    "https://example.com/news/article",
    "Inline note: Helen joined Orbit Labs as COO.",
]

runtime = RuntimeConfig(model="deepseek/deepseek-chat", temperature=0.0)
options = ExtractOptions(
    max_chunk_chars=1200,
    max_passes=2,
    stop_when_no_new_extractions=True,
    allow_unresolved=False,
)

request = ExtractRequest(
    documents=load_source_documents(sources),
    task=task,
    runtime=runtime,
    options=options,
)

result = sourcery.extract(request)
```

## 4. Persist and Review

```
from pathlib import Path

from sourcery.io import save_extract_result_jsonl, write_document_html, write_reviewer_html

output = Path("output")
output.mkdir(parents=True, exist_ok=True)

save_extract_result_jsonl(result, output / "result.jsonl")
for index, document in enumerate(result.documents):
    write_document_html(document, output / f"doc-{index}.viewer.html")
    write_reviewer_html(document, output / f"doc-{index}.reviewer.html")
```

## 5. Inspect Metrics and Warnings

```
print(result.metrics.model_dump(mode="json"))
for warning in result.warnings:
    print("warning:", warning)
```

## 6. Replay Raw Runtime Runs (Optional)

```
from sourcery.runtime import SourceryEngine

engine = SourceryEngine()
raw_run_id = None
if result.documents and result.documents[0].extractions:
    raw_run_id = result.documents[0].extractions[0].provenance.raw_run_id

if raw_run_id is not None:
    replay_payload, replay_events = engine.replay_run(request, raw_run_id)
```

Use replay for provider debugging, audits, or incident postmortems.

# Sourcery Code Examples for Python LLM Extraction

Practical examples for the current Sourcery API.
## 1) Minimal typed extraction

```
from pydantic import BaseModel

import sourcery
from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractRequest,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    RuntimeConfig,
)


class PersonAttrs(BaseModel):
    role: str | None = None


request = ExtractRequest(
    documents="Alice is the CEO of Acme.",
    task=ExtractionTask(
        instructions="Extract people and their role.",
        schema=EntitySchemaSet(
            entities=[EntitySpec(name="person", attributes_model=PersonAttrs)]
        ),
        examples=[
            ExtractionExample(
                text="Bob is the CTO.",
                extractions=[
                    ExampleExtraction(
                        entity="person",
                        text="Bob",
                        attributes={"role": "CTO"},
                    )
                ],
            )
        ],
    ),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)

result = sourcery.extract(request)
print(result.metrics.model_dump(mode="json"))
print(result.documents[0].extractions)
```

## 2) Multi-entity extraction with options

```
from pydantic import BaseModel

import sourcery
from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractOptions,
    ExtractRequest,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    RuntimeConfig,
    SourceDocument,
)


class PersonAttrs(BaseModel):
    role: str | None = None


class OrgAttrs(BaseModel):
    industry: str | None = None


request = ExtractRequest(
    documents=[
        SourceDocument(document_id="doc-1", text="Alice joined Acme as CEO."),
        SourceDocument(document_id="doc-2", text="Bob became CTO at Globex."),
    ],
    task=ExtractionTask(
        instructions="Extract people and organizations.",
        schema=EntitySchemaSet(
            entities=[
                EntitySpec(name="person", attributes_model=PersonAttrs),
                EntitySpec(name="organization", attributes_model=OrgAttrs),
            ]
        ),
        examples=[
            ExtractionExample(
                text="Carol works at Initech.",
                extractions=[
                    ExampleExtraction(entity="person", text="Carol", attributes={"role": None}),
                    ExampleExtraction(
                        entity="organization",
                        text="Initech",
                        attributes={"industry": None},
                    ),
                ],
            )
        ],
    ),
    options=ExtractOptions(
        max_chunk_chars=900,
        max_passes=2,
        batch_concurrency=8,
        fuzzy_alignment_threshold=0.82,
        stop_when_no_new_extractions=True,
    ),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)

result = sourcery.extract(request)
for doc in result.documents:
    print(doc.document_id, len(doc.extractions), len(doc.canonical_claims))
```

## 3) Extract directly from sources (text/file/PDF/HTML/URL)

```
from pathlib import Path

from pydantic import BaseModel

import sourcery
from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    RuntimeConfig,
)


class ClaimAttrs(BaseModel):
    category: str | None = None


task = ExtractionTask(
    instructions="Extract factual claims.",
    schema=EntitySchemaSet(
        entities=[EntitySpec(name="claim", attributes_model=ClaimAttrs)]
    ),
    examples=[
        ExtractionExample(
            text="Revenue increased in 2025.",
            extractions=[
                ExampleExtraction(
                    entity="claim",
                    text="Revenue increased in 2025",
                    attributes={"category": "finance"},
                )
            ],
        )
    ],
)

result = sourcery.extract_from_sources(
    [
        "Inline text source",
        Path("./docs/input.txt"),
        Path("./docs/report.pdf"),
        Path("./docs/page.html"),
        "https://example.com/report",
    ],
    task=task,
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)

print(result.metrics.documents_total)
```

Notes:

- PDF ingestion requires `pypdf`.
- Image ingestion uses VLM OCR via any vision model (see section 10).
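Since PDF ingestion depends on the optional `pypdf` package, it can help to check for it before starting a long run. The sketch below uses only the standard library; `has_module` and `missing_optional_deps` are hypothetical helper names, not part of Sourcery.

```python
from importlib.util import find_spec


def has_module(name: str) -> bool:
    """Return True if a top-level module is importable in this environment."""
    return find_spec(name) is not None


def missing_optional_deps(required: list[str]) -> list[str]:
    """Return the subset of module names that cannot be imported."""
    return [name for name in required if not has_module(name)]


# "pypdf" is the optional dependency named in the notes above.
missing = missing_optional_deps(["pypdf"])
if missing:
    print("Install the ingest extra first; missing:", missing)
```

Failing fast here is cheaper than discovering the missing dependency partway through a batch of mixed sources.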
## 4) Async extraction

```
import asyncio

from pydantic import BaseModel

import sourcery
from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractRequest,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    RuntimeConfig,
)


class PersonAttrs(BaseModel):
    role: str | None = None


async def main() -> None:
    request = ExtractRequest(
        documents="Dana is VP Engineering.",
        task=ExtractionTask(
            instructions="Extract people.",
            schema=EntitySchemaSet(
                entities=[EntitySpec(name="person", attributes_model=PersonAttrs)]
            ),
            examples=[
                ExtractionExample(
                    text="Eve is CFO.",
                    extractions=[
                        ExampleExtraction(entity="person", text="Eve", attributes={"role": "CFO"})
                    ],
                )
            ],
        ),
        runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
    )
    result = await sourcery.aextract(request)
    print(result.metrics.extracted_total)


asyncio.run(main())
```

## 5) Reliability controls: retry, session refinement, reconciliation

```
from pydantic import BaseModel

import sourcery
from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractRequest,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    ReconciliationConfig,
    RetryPolicy,
    RuntimeConfig,
    SessionRefinementConfig,
)


class EventAttrs(BaseModel):
    severity: str | None = None


request = ExtractRequest(
    documents="Outage started at 09:10 UTC. Service recovered at 09:42 UTC.",
    task=ExtractionTask(
        instructions="Extract operational events.",
        schema=EntitySchemaSet(
            entities=[EntitySpec(name="event", attributes_model=EventAttrs)]
        ),
        examples=[
            ExtractionExample(
                text="Incident started at 10:00.",
                extractions=[
                    ExampleExtraction(
                        entity="event",
                        text="Incident started at 10:00",
                        attributes={"severity": "high"},
                    )
                ],
            )
        ],
    ),
    runtime=RuntimeConfig(
        model="deepseek/deepseek-chat",
        retry=RetryPolicy(
            max_attempts=4,
            initial_backoff_seconds=0.8,
            max_backoff_seconds=10.0,
            backoff_multiplier=2.0,
            retry_on_rate_limit=True,
            retry_on_transient_errors=True,
            auto_resume_paused_runs=True,
            max_pause_resumes=5,
        ),
        session_refinement=SessionRefinementConfig(
            enabled=True,
            max_turns=2,
            context_chars=400,
        ),
        reconciliation=ReconciliationConfig(
            enabled=True,
            use_workforce=True,
            min_mentions_for_claim=1,
            max_claims=100,
        ),
    ),
)

result = sourcery.extract(request)
print(result.warnings)
```

## 6) Save/load JSONL + reviewer HTML

```
from pathlib import Path

from pydantic import BaseModel

import sourcery
from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractRequest,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    RuntimeConfig,
)
from sourcery.io import load_document_results_jsonl, save_extract_result_jsonl, write_reviewer_html


class PersonAttrs(BaseModel):
    role: str | None = None


request = ExtractRequest(
    documents="Alice is CEO. Bob is CTO.",
    task=ExtractionTask(
        instructions="Extract people.",
        schema=EntitySchemaSet(
            entities=[EntitySpec(name="person", attributes_model=PersonAttrs)]
        ),
        examples=[
            ExtractionExample(
                text="Carol is CFO.",
                extractions=[
                    ExampleExtraction(entity="person", text="Carol", attributes={"role": "CFO"})
                ],
            )
        ],
    ),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)

result = sourcery.extract(request)

out_dir = Path("./output")
out_dir.mkdir(parents=True, exist_ok=True)
jsonl_path = out_dir / "result.jsonl"
html_path = out_dir / "reviewer.html"

save_extract_result_jsonl(result, jsonl_path)
loaded_docs = load_document_results_jsonl(jsonl_path)
write_reviewer_html(loaded_docs[0], html_path, title="Extraction Review")

print(jsonl_path, html_path)
```

## 7) Notebook/HTML visualization

```
from sourcery.io import visualize

# From JSONL path (returns HTML object in notebook, raw HTML string otherwise)
content = visualize("./output/result.jsonl", animation_speed=0.8, show_legend=True)
print(type(content))
```

## 8) Replay a BlackGeorge run from provenance

```
from pydantic import BaseModel

from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractRequest,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    RuntimeConfig,
)
from sourcery.runtime import SourceryEngine


class PersonAttrs(BaseModel):
    role: str | None = None


engine = SourceryEngine()
request = ExtractRequest(
    documents="Alice is CEO.",
    task=ExtractionTask(
        instructions="Extract people.",
        schema=EntitySchemaSet(
            entities=[EntitySpec(name="person", attributes_model=PersonAttrs)]
        ),
        examples=[
            ExtractionExample(
                text="Bob is CTO.",
                extractions=[
                    ExampleExtraction(entity="person", text="Bob", attributes={"role": "CTO"})
                ],
            )
        ],
    ),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)

result = engine.extract(request)

# Find the first extraction that carries a raw run id.
raw_run_id = None
for doc in result.documents:
    for extraction in doc.extractions:
        if extraction.provenance.raw_run_id:
            raw_run_id = extraction.provenance.raw_run_id
            break
    if raw_run_id:
        break

if raw_run_id:
    replay_payload, replay_events = engine.replay_run(request, raw_run_id)
    print(replay_payload)
    print(len(replay_events))
```

## 9) Error handling with typed exceptions

```
from sourcery.exceptions import (
    ExampleValidationError,
    SourceryProviderError,
    SourceryRateLimitError,
    SourceryRetryExhaustedError,
)

try:
    # call sourcery.extract(...)
    pass
except ExampleValidationError as exc:
    print("Example alignment failed:", exc)
except SourceryRateLimitError as exc:
    print("Provider rate-limited:", exc)
except SourceryRetryExhaustedError as exc:
    print("Retries exhausted after", exc.attempts, "attempts")
except SourceryProviderError as exc:
    print("Provider/runtime error:", exc)
```

## 10) VLM image OCR

```
from sourcery.contracts import RuntimeConfig
from sourcery.ingest import BlackGeorgeVLMOCRBackend, load_vlm_ocr_document

backend = BlackGeorgeVLMOCRBackend(
    RuntimeConfig(model="provider/vision-model", temperature=0.0),
)
doc = load_vlm_ocr_document("invoice.png", backend=backend)
# doc is a SourceDocument; feed it into extract() like any other document
```

Custom backend for any VLM:

```
from sourcery.ingest import VLMOCRBackend, load_vlm_ocr_document


class MyBackend:
    def extract_text(self, *, image_path, prompt=None):
        # call_your_vision_model is a placeholder for your own model call
        return call_your_vision_model(image_path, prompt)


assert isinstance(MyBackend(), VLMOCRBackend)
doc = load_vlm_ocr_document("scan.png", backend=MyBackend())
```

## 11) Streaming extraction

This example reuses a `request` built as in the earlier sections.

```
from sourcery.contracts import StreamExtractionAdded, StreamChunkDone, StreamPassDone
from sourcery.runtime import SourceryEngine

engine = SourceryEngine()
for event in engine.extract_stream(request):
    if isinstance(event, StreamExtractionAdded):
        print(f"New: {event.extraction.entity}='{event.extraction.text}'")
    elif isinstance(event, StreamChunkDone):
        print(f"Chunk {event.chunk_id} done ({event.candidates_found} candidates)")
    elif isinstance(event, StreamPassDone):
        print(f"Pass {event.pass_id} done (+{event.additions_this_pass})")

# The generator returns ExtractResult through StopIteration.value if manually consumed.
```

## 12) Benchmark command

```
uv run sourcery-benchmark \
  --text-types english,japanese,french,spanish \
  --max-chars 4500 \
  --max-passes 2 \
  --batch-concurrency 4 \
  --sourcery-model deepseek/deepseek-chat
```

Compatibility wrapper:

```
uv run benchmark_compare.py --text-types english
```

# Sourcery Usage Guide for Python LLM Extraction

## What Sourcery Is

Sourcery is both:

1. A **Python library** you import (`import sourcery`) to run schema-first LLM extraction.
1. A **reference project** with ingestion adapters, HTML reviewer UI, and runnable integration scripts.

Use it as a library inside your app, and use this repository as a production template for typed, source-grounded document extraction.

## When To Use Sourcery

Use Sourcery when you need:

- typed extraction contracts (Pydantic models),
- grounded spans (`char_start`, `char_end`) for every extraction,
- deterministic chunking/alignment/merge behavior,
- optional document-level reconciliation into canonical claims,
- human review/export workflows.

## Install

Python requirement: `>=3.12`

Minimal runtime:

```
uv sync
```

With ingestion adapters (PDF/URL/HTML):

```
uv sync --extra ingest
```

With dev tooling:

```
uv sync --extra dev --extra ingest
```

Set provider credentials for the model route you use in `RuntimeConfig.model` (for example `DEEPSEEK_API_KEY`, `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, etc.).

## Core Public API

Import-level API (`sourcery/__init__.py`):

1. `extract(request: ExtractRequest, engine: SourceryEngine | None = None) -> ExtractResult`
1. `aextract(request: ExtractRequest, engine: SourceryEngine | None = None) -> ExtractResult`
1. `extract_from_sources(sources, *, task, runtime, options=None, engine=None) -> ExtractResult`
1. `aextract_from_sources(...) -> ExtractResult`
1. `SourceryEngine` with `.extract(...)`, `.aextract(...)`, `.extract_stream(...)`, `.replay_run(...)`

Streaming event contracts live in `sourcery.contracts`:

- `StreamExtractionAdded`
- `StreamChunkDone`
- `StreamPassDone`

## Data Contracts You Define

### 1) `EntitySpec`

- `name: str`
- `attributes_model: type[BaseModel]`

### 2) `EntitySchemaSet`

- `entities: list[EntitySpec]`

### 3) `ExtractionTask`

- `instructions: str`
- `schema: EntitySchemaSet`
- `examples: list[ExtractionExample]`
- `strict_example_alignment: bool = True`

### 4) `ExtractRequest`

- `documents: list[SourceDocument] | str`
- `task: ExtractionTask`
- `options: ExtractOptions = ExtractOptions()`
- `runtime: RuntimeConfig`

### 5) `ExtractResult`

- `documents: list[DocumentResult]`
- `run_trace: ExtractionRunTrace`
- `metrics: RunMetrics`
- `warnings: list[str]`

`DocumentResult` includes:

- `extractions: list[AlignedExtraction]`
- `canonical_claims: list[CanonicalClaim]`

## Runtime Config (`RuntimeConfig`)

Required:

- `model: str`

Core options:

- `temperature: float = 0.0`
- `max_tokens: int | None = None`
- `stream: bool = False`
- `storage_dir: str = ".sourcery"`
- `respect_context_window: bool = True`

Reliability:

- `retry: RetryPolicy`
  - `max_attempts=3`
  - `initial_backoff_seconds=0.75`
  - `max_backoff_seconds=8.0`
  - `backoff_multiplier=2.0`
  - `retry_on_rate_limit=True`
  - `retry_on_transient_errors=True`
  - `auto_resume_paused_runs=True`
  - `max_pause_resumes=5`

Session refinement (optional):

- `session_refinement: SessionRefinementConfig`
  - `enabled=False`
  - `max_turns=1`
  - `context_chars=320`

Document-level reconciliation (optional):

- `reconciliation: ReconciliationConfig`
  - `enabled=False`
  - `use_workforce=True`
  - `min_mentions_for_claim=1`
  - `max_claims=200`

`RuntimeConfig.stream` is passed to the underlying BlackGeorge runtime/provider. For Sourcery-level chunk progress, use `SourceryEngine.extract_stream(...)`.
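To get a feel for what the `RetryPolicy` defaults listed above imply for worst-case wait time, the sketch below computes a delay schedule. It assumes plain capped exponential backoff with no jitter, which is an assumption about how the four fields compose and not a statement of how the runtime actually schedules retries; `backoff_schedule` is a hypothetical helper.

```python
def backoff_schedule(
    max_attempts: int = 3,
    initial_backoff_seconds: float = 0.75,
    max_backoff_seconds: float = 8.0,
    backoff_multiplier: float = 2.0,
) -> list[float]:
    """Delays between attempts under capped exponential backoff (assumed model)."""
    delays: list[float] = []
    delay = initial_backoff_seconds
    # N attempts leave N - 1 gaps in which a wait can happen.
    for _ in range(max(0, max_attempts - 1)):
        delays.append(min(delay, max_backoff_seconds))
        delay *= backoff_multiplier
    return delays


default_delays = backoff_schedule()
print(default_delays, "total wait:", sum(default_delays))
```

Under this assumed model, raising `max_attempts` adds waits that quickly saturate at `max_backoff_seconds`, so the cap, not the multiplier, dominates total latency for larger attempt counts.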
## Extraction Options (`ExtractOptions`)

- `max_chunk_chars=1200`
- `context_window_chars=200`
- `max_passes=2`
- `batch_concurrency=16`
- `enable_fuzzy_alignment=True`
- `fuzzy_alignment_threshold=0.82`
- `accept_partial_exact=False`
- `stop_when_no_new_extractions=True`
- `allow_unresolved=False`

## Minimal Example (Inline Text)

```
from pydantic import BaseModel

import sourcery
from sourcery.contracts import (
    EntitySchemaSet,
    EntitySpec,
    ExtractRequest,
    ExtractionExample,
    ExtractionTask,
    ExampleExtraction,
    RuntimeConfig,
)


class PersonAttrs(BaseModel):
    role: str | None = None


request = ExtractRequest(
    documents="Alice is the CEO of Acme.",
    task=ExtractionTask(
        instructions="Extract person entities.",
        schema=EntitySchemaSet(
            entities=[EntitySpec(name="person", attributes_model=PersonAttrs)]
        ),
        examples=[
            ExtractionExample(
                text="Bob is the CTO.",
                extractions=[
                    ExampleExtraction(entity="person", text="Bob", attributes={"role": "CTO"})
                ],
            )
        ],
    ),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)

result = sourcery.extract(request)
print(result.metrics.model_dump(mode="json"))
for ext in result.documents[0].extractions:
    print(ext.entity, ext.text, ext.char_start, ext.char_end, ext.alignment_status)
```

Notebook equivalent: `examples/notebooks/sourcery_quickstart.ipynb`

## Extract From Files / PDFs / URLs / Images

Use the source-based helper:

```
result = sourcery.extract_from_sources(
    ["1706.03762v7.pdf", "https://example.com/article.html"],
    task=task,
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)
```

Supported ingestion via `load_source_document(s)`:

1. Inline text
1. Text files
1. PDF files (`pypdf`)
1. HTML files / raw HTML
1. URLs
1. Image files via VLM OCR (`sourcery.ingest.load_vlm_ocr_document`)

Notes:

- PDF loader is text-extraction first (`pypdf`).
- Image ingestion uses any vision-language model through blackgeorge multimodal support.
Notebook equivalent: `examples/notebooks/sourcery_pdf_workflow.ipynb`

## Async Usage

```
result = await sourcery.aextract(request)
```

## Streaming Extraction

`extract_stream(...)` yields chunk-level Sourcery events as extraction work completes. It is result/progress streaming, not token streaming.

```
from sourcery.contracts import StreamChunkDone, StreamExtractionAdded, StreamPassDone
from sourcery.runtime import SourceryEngine

engine = SourceryEngine()
for event in engine.extract_stream(request):
    if isinstance(event, StreamExtractionAdded):
        print(event.document_id, event.extraction.entity, event.extraction.text)
    elif isinstance(event, StreamChunkDone):
        print("chunk done:", event.chunk_id, event.candidates_found)
    elif isinstance(event, StreamPassDone):
        print("pass done:", event.pass_id, event.additions_this_pass)
```

## Advanced Engine Usage

```
from sourcery.runtime import SourceryEngine

engine = SourceryEngine()
result = engine.extract(request)

# Guard against runs that produced no extractions before indexing.
raw_run_id = None
if result.documents and result.documents[0].extractions:
    raw_run_id = result.documents[0].extractions[0].provenance.raw_run_id

if raw_run_id:
    replay, events = engine.replay_run(request, raw_run_id)
    print(replay["status"] if replay else None, len(events))
```

## Enabling Reconciliation + Session Refinement

```
runtime = RuntimeConfig(
    model="deepseek/deepseek-chat",
    session_refinement={"enabled": True, "max_turns": 1, "context_chars": 320},
    reconciliation={"enabled": True, "use_workforce": True, "max_claims": 100},
)
```

What this does:

1. Session refinement adds multi-turn continuity hints per chunk.
1. Reconciliation runs a document-level resolver workflow and returns `canonical_claims`.
## Outputs and Review

### Save JSONL

```
from sourcery.io import save_extract_result_jsonl

save_extract_result_jsonl(result, "output/result.jsonl")
```

### Generate HTML viewer

```
from sourcery.io import write_document_html

write_document_html(result.documents[0], "output/document.viewer.html")
```

### Generate reviewer UI

```
from sourcery.io import write_reviewer_html

write_reviewer_html(result.documents[0], "output/document.reviewer.html")
```

Reviewer supports:

- search,
- entity/status filters,
- approve/reject/reset,
- export approved JSONL/CSV.

## Scripted End-to-End Runs

### Benchmark comparison wrapper

```
uv run benchmark_compare.py --text-types english
```

## Error Model

Important exception classes (`sourcery/exceptions.py`):

- `SourceryError`
- `SourceryRuntimeError`
- `SourceryProviderError`
- `SourceryRateLimitError`
- `SourceryRetryExhaustedError`
- `SourceryPausedRunError`
- `SourceryPipelineError`
- `SourceryIngestionError`
- `SourceryDependencyError`

## Validation Commands

```
uv run --extra dev pytest -q
uv run --extra dev ruff check sourcery tests
uv run --extra dev mypy sourcery
```

## Production Notes

1. Treat schemas as API contracts and version them.
1. Start with strict examples and deterministic options.
1. Enable reconciliation for long documents where alias/coreference matters.
1. Keep reviewer approval in-the-loop for high-stakes workflows.
1. Persist JSONL + run trace for audit and replay.

# Tune Sourcery Runtime: Async, Streaming, Retries, and Reconciliation

Use `RuntimeConfig` for provider/runtime behavior and `ExtractOptions` for deterministic pipeline controls.

## RuntimeConfig

Required:

- `model: str`

Common:

- `temperature`
- `max_tokens`
- `stream`
- `storage_dir`
- `respect_context_window`

`RuntimeConfig.stream` is passed through to the BlackGeorge runtime/provider. For application-level progress events, use `SourceryEngine.extract_stream(...)`.
Retry policy (`retry`):

- `max_attempts`
- `initial_backoff_seconds`
- `max_backoff_seconds`
- `backoff_multiplier`
- `retry_on_rate_limit`
- `retry_on_transient_errors`
- `auto_resume_paused_runs`
- `max_pause_resumes`

Optional workflows:

- `session_refinement`: per-document session context across chunks
- `reconciliation`: canonical claims per document

## ExtractOptions

- `max_chunk_chars`
- `context_window_chars`
- `max_passes`
- `batch_concurrency`
- `enable_fuzzy_alignment`
- `fuzzy_alignment_threshold`
- `accept_partial_exact`
- `stop_when_no_new_extractions`
- `allow_unresolved`

## Behavior That Affects Output

- If `allow_unresolved=False` (default), unresolved candidates are counted in metrics but not returned in `documents[*].extractions`.
- If `strict_example_alignment=True` (default in `ExtractionTask`) and examples are unresolved, extraction raises `ExampleValidationError` before runtime execution.
- `SourceryEngine.aextract(...)` uses native async runtime calls.
- `SourceryEngine.extract_stream(...)` emits chunk/pass result events and still returns the same final `ExtractResult` when exhausted.
- If reconciliation workforce fails at runtime, the engine falls back to deterministic canonical-claim construction and records warnings.
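The note that `extract_stream(...)` still returns a final result when exhausted relies on standard Python generator semantics: a plain `for` loop discards the generator's return value, so it has to be read from `StopIteration.value`. The sketch below demonstrates the mechanism with a stand-in generator; `fake_extract_stream` and `drain` are hypothetical names, and the yielded strings and returned dict only stand in for Sourcery's event and result types.

```python
from typing import Any, Generator


def fake_extract_stream() -> Generator[str, None, dict[str, int]]:
    """Stand-in for a streaming call: yields progress events, then returns a final result."""
    yield "chunk-0 done"
    yield "chunk-1 done"
    return {"extracted_total": 2}


def drain(gen: Generator[Any, None, Any]) -> tuple[list[Any], Any]:
    """Collect every yielded event and capture the generator's return value."""
    events: list[Any] = []
    while True:
        try:
            events.append(next(gen))
        except StopIteration as stop:
            # The value passed to `return` inside the generator lands here.
            return events, stop.value


events, final_result = drain(fake_extract_stream())
print(events, final_result)
```

If only the progress events matter, a plain `for` loop is enough; the manual `next(...)` loop is needed only when the final return value should be kept as well.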
## Practical Baseline

```
from sourcery.contracts import ExtractOptions, RuntimeConfig

runtime = RuntimeConfig(
    model="deepseek/deepseek-chat",
    temperature=0.0,
)

options = ExtractOptions(
    max_chunk_chars=1200,
    context_window_chars=200,
    max_passes=2,
    batch_concurrency=16,
    stop_when_no_new_extractions=True,
    allow_unresolved=False,
)
```

## Throughput-Oriented Profile

```
options = ExtractOptions(
    max_chunk_chars=1800,
    context_window_chars=120,
    max_passes=1,
    batch_concurrency=32,
    stop_when_no_new_extractions=True,
)
```

## Quality-Oriented Profile

```
options = ExtractOptions(
    max_chunk_chars=900,
    context_window_chars=280,
    max_passes=3,
    enable_fuzzy_alignment=True,
    fuzzy_alignment_threshold=0.82,
    allow_unresolved=False,
)
```

## Tuning Sequence

1. Freeze schema and examples first.
1. Set `temperature=0.0`.
1. Measure baseline metrics and warnings.
1. Increase `max_passes` only when extraction recall improves materially.
1. Increase concurrency only if provider limits and system resources allow it.

# Load Text, PDFs, URLs, HTML, and Images into Sourcery

Ingestion normalizes heterogeneous inputs into `SourceDocument`.

Implementation lives in `sourcery/ingest/loaders.py`.
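One consequence of this normalization is worth guarding against: a string that does not resolve to an existing path is treated as inline text by `load_source_document` and does not raise. When a source is meant to be a file, a small pre-flight check makes a typo in the path fail loudly. This is a minimal standard-library sketch; `require_file` is a hypothetical helper, not part of Sourcery.

```python
from pathlib import Path


def require_file(source: str) -> Path:
    """Return the path if it points at an existing file, otherwise raise."""
    path = Path(source)
    if not path.is_file():
        # Without this check, a mistyped path would silently become inline text.
        raise FileNotFoundError(f"expected an existing file, got: {source!r}")
    return path


try:
    require_file("missing/path.txt")
except FileNotFoundError as exc:
    print("refusing to ingest:", exc)
```

The returned `Path` can then be handed to the loader, so only sources that are intended as inline text reach the inline-text fallback.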
## Supported Source Types

- Inline text
- Local text-like files: `.txt`, `.md`, `.rst`, `.csv`, `.json`, `.jsonl`, `.yaml`, `.yml`
- PDF files (`.pdf`, requires `pypdf`)
- HTML files and raw HTML
- HTTP/HTTPS URLs

## Primary APIs

- `load_source_document(source, ...)`
- `load_source_documents(sources, ...)`

Extended APIs:

- `load_pdf_document(path, ...)`
- `load_html_document(source, raw_html=False, ...)`
- `load_url_document(url, ...)`

## Examples

Load one source automatically:

```
from sourcery.ingest import load_source_document

doc = load_source_document("reports/q4.pdf")
```

Load multiple mixed sources:

```
from sourcery.ingest import load_source_documents

documents = load_source_documents([
    "notes.txt",
    "https://example.com/post",
    "Raw inline text to extract from",
])
```

Load raw HTML string explicitly:

```
from sourcery.ingest.loaders import load_html_document

doc = load_html_document("<p>Hello</p>", raw_html=True)
```

## Failure Modes

- Missing optional dependency -> `SourceryDependencyError`
- Empty parsed content -> `SourceryIngestionError`
- Invalid URL passed to `load_url_document(...)` -> `SourceryIngestionError`
- Missing PDF/HTML path in dedicated loaders -> `SourceryIngestionError`

## VLM OCR

Image-based text extraction via any vision-language model. Sourcery provides the interface; the model does the work. No OCR-specific dependencies required.

### Interface

`VLMOCRBackend` is a protocol: any object that implements `extract_text(*, image_path, prompt) -> str` satisfies it. Sourcery ships `BlackGeorgeVLMOCRBackend`, which delegates to blackgeorge's multimodal worker. Swap the `runtime.model` to use any VLM your provider supports.

### Examples

```
from sourcery.contracts import RuntimeConfig
from sourcery.ingest import BlackGeorgeVLMOCRBackend, load_vlm_ocr_document

backend = BlackGeorgeVLMOCRBackend(
    RuntimeConfig(model="gemini/gemini-2.5-flash", temperature=0.0),
)
doc = load_vlm_ocr_document("scan.png", backend=backend)
```

With a custom prompt for structured extraction:

```
doc = load_vlm_ocr_document(
    "invoice.jpg",
    backend=backend,
    prompt="Extract invoice number, date, total amount, and line items as a table.",
)
```

Batch processing:

```
from sourcery.ingest import load_vlm_ocr_documents

docs = load_vlm_ocr_documents(
    ["page1.png", "page2.png", "page3.png"],
    backend=backend,
)
```

Custom backend for any VLM:

```
from sourcery.ingest import VLMOCRBackend, load_vlm_ocr_document


class MyVLMBackend:
    def extract_text(self, *, image_path, prompt=None):
        # Call your model here and return the recognized text.
        extracted_text = "..."  # placeholder for the model output
        return extracted_text


assert isinstance(MyVLMBackend(), VLMOCRBackend)  # True
doc = load_vlm_ocr_document("scan.png", backend=MyVLMBackend())
```

## Operational Notes

- URL ingestion auto-detects PDF vs HTML vs plain text by content type and payload.
- `load_source_document("missing/path.txt")` does not raise by default; because the path does not exist, it is treated as inline text.
Use dedicated loaders when you require strict path existence checks.
- VLM OCR loaders raise `SourceryIngestionError` on missing files or empty model output.

# Sourcery Outputs: JSONL, HTML Viewer, and Reviewer UI

Sourcery supports machine-friendly persistence and human review workflows.

## JSONL Persistence

Write extraction output:

```
from sourcery.io import save_extract_result_jsonl

save_extract_result_jsonl(result, "output/result.jsonl")
```

Load documents back:

```
from sourcery.io import load_document_results_jsonl

documents = load_document_results_jsonl("output/result.jsonl")
```

Iterate rows directly:

```
from sourcery.io import iter_document_rows

for row in iter_document_rows("output/result.jsonl"):
    print(row["document_id"], len(row["extractions"]))
```

## Viewer HTML

Generate read-only visualization with highlighted grounded spans:

```
from sourcery.io import write_document_html

write_document_html(result.documents[0], "output/document.viewer.html")
```

Notebook/inline rendering helper:

```
from sourcery.io import visualize

html_or_display = visualize(result.documents[0])
```

`visualize("path/to/result.jsonl")` is also supported and uses the first document in the file.

## Reviewer HTML

Generate interactive review UI from a `DocumentResult`:

```
from sourcery.io import write_reviewer_html

write_reviewer_html(result.documents[0], "output/document.reviewer.html")
```

Generate reviewer directly from JSONL:

```
from sourcery.io import write_reviewer_html

write_reviewer_html("output/result.jsonl", "output/document.reviewer.html")
```

When a JSONL path is provided, the reviewer uses the first document row.

Reviewer capabilities:

- search by text/attributes,
- filter by entity and review status,
- approve/reject/reset per extraction or filtered set,
- export approved rows to JSONL and CSV,
- persist review state in browser local storage.

# Benchmark Sourcery Extraction Runs

Sourcery provides a benchmark CLI: `sourcery-benchmark`.
The current runner records Sourcery and LangExtract extraction metrics in the same run.

## Prerequisites

```
uv sync --extra benchmark
```

Also set provider credentials.

Default model route (`deepseek/...`):

```
export DEEPSEEK_API_KEY="..."
```

OpenRouter route (`openrouter/...`):

```
export OPENROUTER_API_KEY="..."
```

## Run Benchmark

```
uv run sourcery-benchmark \
  --text-types english,japanese \
  --max-chars 4500 \
  --max-passes 2 \
  --sourcery-model deepseek/deepseek-chat
```

## Important Flags

- `--text-types`: `english,japanese,french,spanish`
- `--max-chars`
- `--max-chunk-chars`
- `--context-window-chars`
- `--max-passes`
- `--batch-concurrency`
- `--temperature`
- `--max-tokens`
- `--retries`
- `--retry-delay-seconds`
- `--sourcery-model`
- `--langextract-model`
- `--deepseek-base-url`
- `--openrouter-base-url`
- `--output-dir`

## Output

A timestamped JSON report in `benchmark_results/` containing:

- benchmark settings,
- tokenization rows,
- framework summaries (`sourcery`, `langextract`),
- per-language records,
- error details for failed runs.

## Notes

- `langextract[openai]` is required for the LangExtract side of the benchmark.
- Model/provider connection settings are normalized internally from the selected `--sourcery-model` route.

# Test and Validate Sourcery

Run the full validation suite before merging changes.
## Full Suite

```
uv run --extra dev pytest -q
uv run --extra dev ruff check sourcery tests
uv run --extra dev mypy sourcery
```

## Focused Test Runs

```
uv run --extra dev pytest -q tests/test_engine.py
uv run --extra dev pytest -q tests/test_ingest.py
uv run --extra dev pytest -q tests/test_blackgeorge_runtime.py
```

## What Current Tests Cover

- contract validation and boundary checks,
- deterministic chunking/alignment/merge behavior,
- runtime error classification and retry behavior,
- BlackGeorge runtime refinement/reconciliation ordering,
- API helpers and ingestion behavior,
- JSONL + HTML output correctness,
- benchmark utility behavior.

## Recommended Regression Pattern

When fixing a bug:

1. Add a failing test that reproduces the issue.
1. Fix the implementation.
1. Run focused tests, then the full suite.

# Architecture

# Sourcery System Overview

Sourcery is structured as replaceable runtime and pipeline components around stable typed contracts.

## Primary Primitives

Defined in `sourcery/contracts/models.py`:

- `SourceDocument`
- `TextChunk`
- `ExtractionCandidate`
- `AlignedExtraction`
- `DocumentResult`
- `CanonicalClaim`
- `ExtractRequest`
- `ExtractResult`

These primitives are the stable interface. Internal implementation can change without breaking user code if these contracts remain consistent.

## Module Boundaries

- `sourcery/contracts`: request/result/runtime contracts.
- `sourcery/pipeline`: deterministic chunking, prompt compilation, alignment, merge.
- `sourcery/runtime`: model invocation orchestration and retries.
- `sourcery/ingest`: source normalization into `SourceDocument`.
- `sourcery/io`: JSONL persistence and HTML review surfaces.
- `sourcery/observability`: run trace and event collection.
- `sourcery/benchmarks`: benchmark CLI and comparative tooling.

## Execution Flow

1. Validate `ExtractRequest` and task examples.
1. Normalize input documents.
1. Plan chunks per extraction pass.
1. Execute runtime batch for chunks.
1. Align candidates to source spans.
1. Merge non-overlapping resolved extractions.
1. Optionally reconcile canonical claims.
1. Emit `ExtractResult` with metrics, warnings, and run trace.

Sync, async, and streaming entry points share this lifecycle. Streaming emits chunk/pass progress events while preserving the same final result shape.

## Determinism Notes

Determinism is strongest in pipeline logic (`chunking`, `aligner`, `merger`). Runtime behavior may vary with provider/model behavior, but deterministic options plus strict examples reduce drift.

# Sourcery Pipeline Internals: Chunking, Alignment, and Merge

Pipeline modules are deterministic and side-effect free.

## `chunking.py`

Responsibilities:

- Segment document text into stable chunk windows.
- Preserve global offsets (`char_start`, `char_end`) in each `TextChunk`.
- Attach optional previous context for continuity.

Key operational knobs from `ExtractOptions`:

- `max_chunk_chars`
- `context_window_chars`

`plan_chunks(...)` receives `pass_id` for chunk identity; pass iteration (`max_passes`) is controlled by `SourceryEngine`.

## `prompt_compiler.py`

Responsibilities:

- Build system/user prompt envelopes for runtime workers.
- Serialize entity schema and examples into a stable payload format.
- Inject pass/chunk context.

Output primitive:

- `PromptEnvelope(system, user, schema_payload)`

## `aligner.py`

Responsibilities:

- Map runtime `ExtractionCandidate` values back onto source text.
- Produce `AlignedExtraction` with offsets and alignment status.

Resolution strategy:

1. Exact span match
1. Fuzzy match (optional)
1. Partial exact fallback (optional)
1. Unresolved when match fails

## `merger.py`

Responsibilities:

- Merge aligned extractions per document.
- Resolve overlaps deterministically.

Winner precedence favors stronger grounding and stable ordering to maintain reproducibility across runs.

## `example_validator.py`

Responsibilities:

- Validate that each example extraction is alignable in its example text.
- Emit issues with statuses (`fuzzy`, `unresolved`) when exact span matching fails.
- Raise `ExampleValidationError` when strict mode requires it.

# Sourcery Runtime Internals: Async, Streaming, and Replay

## Engine

`SourceryEngine` (`sourcery/runtime/engine.py`) orchestrates the extraction lifecycle:

- runtime construction,
- pass scheduling,
- chunk runtime execution,
- alignment and merge,
- optional reconciliation,
- metrics and trace finalization.

Public entry points:

- `SourceryEngine.extract(request)`
- `SourceryEngine.aextract(request)`
- `SourceryEngine.extract_stream(request)`
- `SourceryEngine.replay_run(request, raw_run_id)`

`extract_stream(...)` is Sourcery-level result streaming: chunks still run through the normal runtime path, but the engine emits `StreamExtractionAdded`, `StreamChunkDone`, and `StreamPassDone` events as merged results become available.

## Runtime Boundary

Protocols in `sourcery/runtime/base.py` define black-box contracts:

- `ChunkRuntime`
- `DocumentReconciliationRuntime`
- `AsyncDocumentReconciliationRuntime`

Any runtime implementation that satisfies these interfaces can be swapped in.

`EngineDependencies` wires the runtime factory, prompt compiler, example validator, chunk planner, aligner, merger, and trace collector. This keeps the engine testable, and each hook is a real injection point that the engine uses rather than a decorative one.

## BlackGeorge Runtime Composition

`BlackGeorgeRuntime` combines focused mixins:

- `blackgeorge_retry_mixin.py`: retry/backoff and paused-run resume.
- `blackgeorge_refinement_mixin.py`: per-document session refinement contexts.
- `blackgeorge_flow_mixin.py`: chunk flow execution and report normalization.
- `blackgeorge_reconciliation_mixin.py`: document-level reconciliation workforce.

`model_gateway.py` builds per-entity response schema variants and parses structured candidate output.

## Observability and Replay

- Runtime subscribes to desk event bus (`run.*`, `worker.*`, `step.*`, `llm.*`, `tool.*`).
- Events are normalized to `EventRecord` and attached to `ExtractionRunTrace`.
- `replay_run` reads raw run data/events from the run store for audits and debugging.

## Reconciliation Fallback Behavior

When reconciliation is enabled:

1. Deterministic fallback canonical claims are prepared first.
1. Workforce reconciliation is attempted if `use_workforce=True`.
1. If the workforce fails with `SourceryRuntimeError`, the engine returns the fallback claims and warning text.
1. Unexpected non-Sourcery exceptions are propagated.

# Reference

# Sourcery Public API Reference

Top-level exports are defined in `sourcery/__init__.py`.

## Core Functions

```
extract(request, engine=None) -> ExtractResult
aextract(request, engine=None) -> ExtractResult
extract_from_sources(sources, *, task, runtime, options=None, engine=None) -> ExtractResult
aextract_from_sources(sources, *, task, runtime, options=None, engine=None) -> ExtractResult
```

## Runtime Entry Points

```
SourceryEngine.extract(request) -> ExtractResult
SourceryEngine.aextract(request) -> ExtractResult
SourceryEngine.extract_stream(request) -> Generator[StreamEvent, None, ExtractResult]
SourceryEngine.replay_run(request, raw_run_id) -> tuple[dict[str, object] | None, list[EventRecord]]
```

## Top-Level Contract Exports (`import sourcery`)

- `EntitySpec`, `EntitySchemaSet`
- `ExtractionTask`, `ExtractionExample`, `ExampleExtraction`
- `ExtractRequest`, `ExtractOptions`, `ExtractResult`
- `RuntimeConfig`, `RetryPolicy`, `SessionRefinementConfig`, `ReconciliationConfig`
- `AlignedExtraction`, `CanonicalClaim`, `DocumentResult`, `DocumentReconciliationReport`
- `ExtractionRunTrace`, `RunMetrics`, `SourceDocument`

Additional contracts from `sourcery.contracts`:

- `StreamExtractionAdded`, `StreamChunkDone`, `StreamPassDone`
- `EngineDependencies`

## Ingestion Exports

From `sourcery.ingest`:

- `load_source_document(...)`
- `load_source_documents(...)`
- `load_pdf_document(...)`
- `load_html_document(...)`
- `load_url_document(...)`
- `load_vlm_ocr_document(...)`
- `load_vlm_ocr_documents(...)`
- `VLMOCRBackend`: protocol for custom VLM OCR backends
- `BlackGeorgeVLMOCRBackend(...)`: blackgeorge multimodal implementation

Top-level shortcut (`import sourcery`) includes only:

- `load_source_document(...)`
- `load_source_documents(...)`

## IO Exports

From `sourcery.io`:

- `save_extract_result_jsonl(...)`
- `iter_document_rows(...)`
- `load_document_results_jsonl(...)`
- `render_document_html(...)`
- `write_document_html(...)`
- `render_reviewer_html(...)`
- `write_reviewer_html(...)`
- `visualize(...)`

Top-level shortcut (`import sourcery`) includes:

- `write_reviewer_html(...)`

## Convenience Example

```
import sourcery
from sourcery.contracts import ExtractionTask, RuntimeConfig

result = sourcery.extract_from_sources(
    ["sample.pdf", "https://example.com/article"],
    task=ExtractionTask(...),
    runtime=RuntimeConfig(model="deepseek/deepseek-chat"),
)
```

# Sourcery Data Contracts and Pydantic Schemas

All contracts are defined in `sourcery/contracts/models.py`.

## Request Contracts

- `EntitySpec`: entity name + Pydantic attributes model.
- `EntitySchemaSet`: unique list of entity specs.
- `ExtractionExample`: few-shot text and expected extractions.
- `ExtractionTask`: instructions + schema + examples + `strict_example_alignment`.
- `ExtractOptions`: deterministic pipeline controls.
- `RuntimeConfig`: model/runtime/retry/refinement/reconciliation settings.
- `ExtractRequest`: full extraction input.

`ExtractRequest.documents` accepts:

- `str` (single inline document), or
- `list[SourceDocument]`.
## Runtime/Pipeline Contracts

- `TextChunk`
- `ExtractionCandidate`
- `ChunkRuntimeInput`
- `ChunkExtractionReport`
- `PromptEnvelope`

## Result Contracts

- `AlignedExtraction`
- `CanonicalClaim`
- `DocumentResult`
- `DocumentReconciliationReport`
- `RunMetrics`
- `ExtractionRunTrace`
- `ExtractResult`

## Event Contracts

- `EventRecord`
- `ExtractionProvenance`
- `StreamExtractionAdded`
- `StreamChunkDone`
- `StreamPassDone`

Streaming events are emitted by `SourceryEngine.extract_stream(...)` as chunk results land. They report added extractions, completed chunks, and completed passes; the final `ExtractResult` is returned when the generator is exhausted.

## Validation Guarantees

Contracts enforce:

- non-empty text and entity names,
- valid char/token offset ranges,
- unique schema entity names,
- non-empty `ExtractionTask.examples`,
- threshold bounds (`fuzzy_alignment_threshold`, retry/reconciliation limits),
- model route non-empty (`runtime.model`).

## Minimal Contract Example

```
from pydantic import BaseModel

from sourcery.contracts import EntitySchemaSet, EntitySpec


class CompanyAttrs(BaseModel):
    sector: str | None = None


schema = EntitySchemaSet(
    entities=[EntitySpec(name="company", attributes_model=CompanyAttrs)]
)
```

# Sourcery Error Model

Exceptions are defined in `sourcery/exceptions.py`.

## Base Class

- `SourceryError`

Catch this only at process boundaries where generic Sourcery failure handling is acceptable.

## Runtime and Provider

- `SourceryRuntimeError`
- `SourceryProviderError`
- `SourceryRateLimitError`
- `SourceryPausedRunError`
- `SourceryRetryExhaustedError`
- `RuntimeIntegrationError`

Runtime/provider exceptions may include `exc.context` (`run_id`, `pass_id`, `chunk_id`, `model`, `provider`). `SourceryRetryExhaustedError` includes `.attempts`.

## Pipeline

- `SourceryPipelineError`
- `ExampleValidationError`

Use these to distinguish deterministic task/schema/pipeline problems from runtime/provider failures.
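
The distinction matters most when deciding whether another attempt can help: a deterministic problem fails the same way every time, while a provider failure may clear on its own. The sketch below illustrates that policy with stand-in exception classes. `DeterministicFailure`, `TransientFailure`, and `call_with_policy` are hypothetical names, and nothing here reflects how Sourcery's own retry policy is implemented.

```python
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


class DeterministicFailure(Exception):
    """Stand-in for pipeline-style errors (bad examples, bad schema)."""


class TransientFailure(Exception):
    """Stand-in for runtime/provider-style errors (timeouts, rate limits)."""


def call_with_policy(call: Callable[[], T], max_attempts: int = 3) -> T:
    """Retry transient failures; surface deterministic ones immediately."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except DeterministicFailure:
            raise  # retrying cannot change the outcome
        except TransientFailure:
            if attempt == max_attempts:
                raise  # budget exhausted
    raise AssertionError("unreachable")


attempts = {"count": 0}


def flaky() -> str:
    # Fails twice with a transient error, then succeeds.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TransientFailure("503")
    return "ok"


outcome = call_with_policy(flaky)
print(outcome, attempts["count"])
```

In real code the two `except` branches would name the Sourcery exception classes listed above, with the pipeline branch pointing the user at the task definition rather than at the provider.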
## Ingestion

- `SourceryIngestionError`
- `SourceryDependencyError`

Dependency errors indicate missing optional packages (`pypdf`).

## Runtime Classification Behavior

`runtime/errors.py` classifies provider error text into:

- rate-limit markers (`429`, `rate limit`, `too many requests`, `quota`),
- transient markers (`timeout`, `503`, `502`, `connection reset`, etc.).

This classification drives retry decisions in BlackGeorge runtime mixins.

## Recommended Handling Pattern

```
import sourcery
from sourcery.exceptions import (
    ExampleValidationError,
    SourceryDependencyError,
    SourceryRateLimitError,
    SourceryRetryExhaustedError,
    SourceryRuntimeError,
)

try:
    result = sourcery.extract(request)
except ExampleValidationError as exc:
    print("Fix task examples:", exc)
except SourceryDependencyError as exc:
    print("Install missing optional dependency:", exc)
except SourceryRateLimitError as exc:
    print("Provider rate limited request:", exc)
except SourceryRetryExhaustedError as exc:
    print("Retry policy exhausted after", exc.attempts, "attempts")
except SourceryRuntimeError as exc:
    print("Runtime failure:", exc, "context:", exc.context)
```

# Sourcery File-by-File Map

This map links each tracked module to its responsibility.

## Root Package

- `sourcery/__init__.py`: public export surface.
- `sourcery/api.py`: top-level extraction convenience functions.
- `sourcery/exceptions.py`: typed exception taxonomy.

## Contracts

- `sourcery/contracts/models.py`: all request/runtime/result primitives.
- `sourcery/contracts/__init__.py`: contracts re-export surface.

## Pipeline

- `sourcery/pipeline/chunking.py`: chunk planning and offsets.
- `sourcery/pipeline/prompt_compiler.py`: prompt envelope generation.
- `sourcery/pipeline/aligner.py`: candidate-to-text grounding.
- `sourcery/pipeline/merger.py`: overlap resolution.
- `sourcery/pipeline/example_validator.py`: few-shot validation.
- `sourcery/pipeline/__init__.py`: pipeline re-exports.
## Runtime

- `sourcery/runtime/base.py`: runtime protocol contracts.
- `sourcery/runtime/interfaces.py`: protocol re-export.
- `sourcery/runtime/engine.py`: extraction orchestration.
- `sourcery/runtime/errors.py`: provider error classification.
- `sourcery/runtime/model_gateway.py`: schema + parser bridge.
- `sourcery/runtime/blackgeorge_models.py`: runtime payload adapters.
- `sourcery/runtime/blackgeorge_protocols.py`: internal runtime typing.
- `sourcery/runtime/blackgeorge_retry_mixin.py`: retry/backoff/pause logic.
- `sourcery/runtime/blackgeorge_refinement_mixin.py`: refinement context handling.
- `sourcery/runtime/blackgeorge_flow_mixin.py`: chunk extraction flow.
- `sourcery/runtime/blackgeorge_reconciliation_mixin.py`: canonical-claim workflow.
- `sourcery/runtime/blackgeorge_runtime.py`: composed runtime implementation.
- `sourcery/runtime/__init__.py`: runtime public exports.

## Ingestion

- `sourcery/ingest/loaders.py`: all source loaders.
- `sourcery/ingest/__init__.py`: ingestion public exports.

## IO

- `sourcery/io/jsonl.py`: JSONL persistence helpers.
- `sourcery/io/visualization.py`: read-only visual viewer rendering.
- `sourcery/io/reviewer.py`: interactive reviewer rendering and export.
- `sourcery/io/__init__.py`: IO export surface.

## Observability

- `sourcery/observability/trace.py`: run event collection and trace finalization.
- `sourcery/observability/__init__.py`: observability exports.

## Benchmarks

- `sourcery/benchmarks/config.py`: benchmark config constants.
- `sourcery/benchmarks/gutenberg.py`: text sampling helpers.
- `sourcery/benchmarks/run.py`: benchmark CLI implementation.
- `sourcery/benchmarks/__init__.py`: benchmark exports.

## Test and Utility Scripts

- `benchmark_compare.py`: thin CLI entry wrapper for benchmark run.

## Tests

- `tests/conftest.py`: shared fixtures and fake runtimes.
- `tests/test_contracts.py`: contract validation.
- `tests/test_chunking.py`: chunk planning behavior.
- `tests/test_aligner.py`: alignment behavior.
- `tests/test_merger.py`: merge precedence behavior.
- `tests/test_example_validator.py`: example alignment validation.
- `tests/test_engine.py`: engine orchestration behavior.
- `tests/test_api.py`: public API function behavior.
- `tests/test_ingest.py`: ingestion loader behavior.
- `tests/test_io.py`: JSONL/viewer/reviewer behavior.
- `tests/test_runtime_errors.py`: runtime error classifier behavior.
- `tests/test_model_gateway.py`: response parsing behavior.
- `tests/test_blackgeorge_runtime.py`: runtime mixin regressions.
- `tests/test_benchmarks.py`: benchmark utility behavior.