Streaming

CRP supports streaming token output via dispatch_stream(), letting you display tokens as they arrive while CRP runs extraction in the background.

Basic Streaming

import crp

session = crp.init(provider="ollama", model="qwen3-4b")

events = session.dispatch_stream(
    task="Explain the CAP theorem"
)

for event in events:
    if event.type == "token":
        print(event.data, end="", flush=True)
    elif event.type == "extraction":
        pass  # Background extraction happening
    elif event.type == "continuation":
        print("\n--- New window ---")
    elif event.type == "done":
        report = event.data
        print(f"\n\nQuality: {report.quality_tier}")
        print(f"Facts: {report.facts_extracted}")

Stream Event Types

| Event Type | Data | When |
|---|---|---|
| token | String (one or more characters) | Each token arrives |
| extraction | Extraction progress | Background extraction runs |
| continuation | Window metadata | New continuation window starts |
| window_complete | Window stats | A window finishes |
| done | QualityReport | All generation complete |
| error | Error details | Something went wrong |
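
The examples on this page handle only some of the event types listed above. The sketch below shows one way a consumer loop might cover all six, including window_complete and error. It runs against a simulated stream rather than a real session: StreamEvent, consume(), and the payload values are hypothetical stand-ins, and the only thing assumed about real events is that they expose .type and .data as in the examples on this page. Stopping on error is also an assumption, not documented CRP behavior.

```python
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass
class StreamEvent:
    """Hypothetical stand-in for a CRP stream event: a type tag plus a payload."""
    type: str
    data: Any = None


def consume(events: Iterable[StreamEvent]) -> dict:
    """Collect streamed text and record the non-token events that were seen."""
    result = {"text": "", "windows_completed": 0, "report": None, "error": None}
    parts = []
    for event in events:
        if event.type == "token":
            parts.append(event.data)
        elif event.type == "window_complete":
            result["windows_completed"] += 1
        elif event.type == "done":
            result["report"] = event.data
        elif event.type == "error":
            result["error"] = event.data
            break  # assumption: nothing useful follows an error event
        # "extraction" and "continuation" are informational here and are ignored
    result["text"] = "".join(parts)
    return result


# Simulated stream with placeholder payloads; with CRP the iterable would
# come from session.dispatch_stream(task=...).
simulated_events = [
    StreamEvent("token", "Hello"),
    StreamEvent("extraction"),
    StreamEvent("token", ", world"),
    StreamEvent("window_complete", {"placeholder": "window stats"}),
    StreamEvent("done", {"placeholder": "quality report"}),
]

summary = consume(simulated_events)
print(summary["text"])
print(f"Windows completed: {summary['windows_completed']}")
```

Keeping the loop in a function that returns a plain dict makes it easy to exercise with simulated events before pointing it at a live stream.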

Stream-Augmented Dispatch

dispatch_stream_augmented() combines streaming with real-time extraction — facts are extracted and fed back into the prompt as tokens arrive:

events = session.dispatch_stream_augmented(
    task="Write a security audit report for a web application"
)

for event in events:
    if event.type == "token":
        print(event.data, end="", flush=True)
    elif event.type == "extraction":
        # Real-time fact extraction happening
        print(f"\n  [Extracted: {event.data.fact_count} facts]")
    elif event.type == "done":
        print(f"\nFinal quality: {event.data.quality_tier}")
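
Printing extraction notices inline, as above, interleaves status lines with the generated text. If that is undesirable, one option is to buffer the notices and report them separately. The following is a minimal sketch using simulated events: render() and the sample payloads are hypothetical, and the only field assumed on an extraction payload is fact_count, as used in the example above. Whether fact_count is cumulative or per update is not stated here, so the sketch just records each value as it arrives.

```python
from types import SimpleNamespace


def render(events):
    """Return (text, fact_counts) without mixing status lines into the text."""
    text_parts = []
    fact_counts = []
    for event in events:
        if event.type == "token":
            text_parts.append(event.data)
        elif event.type == "extraction":
            # Record each reported value as-is; no assumption about its meaning
            fact_counts.append(event.data.fact_count)
    return "".join(text_parts), fact_counts


# Simulated stream; with CRP the iterable would come from
# session.dispatch_stream_augmented(task=...).
simulated_events = [
    SimpleNamespace(type="token", data="Finding 1: "),
    SimpleNamespace(type="extraction", data=SimpleNamespace(fact_count=2)),
    SimpleNamespace(type="token", data="weak TLS config."),
    SimpleNamespace(type="done", data=None),
]

text, fact_counts = render(simulated_events)
print(text)
print(f"Extraction updates: {fact_counts}")
```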

Streaming with Continuation

Streaming works with continuation — when the model hits its output wall, CRP starts a new window and continues streaming:

events = session.dispatch_stream(
    task="Write a complete Python web framework tutorial",
    max_continuations=5
)

window = 1
for event in events:
    if event.type == "token":
        print(event.data, end="", flush=True)
    elif event.type == "continuation":
        window += 1
        print(f"\n[Window {window}]")
    elif event.type == "done":
        print(f"\nTotal windows: {window}")
        print(f"Quality: {event.data.quality_tier}")
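
If the output needs to be stored per window rather than printed, the same loop can group tokens at each continuation event. This is a sketch against simulated events, not CRP itself: split_by_window() and make_event() are hypothetical helpers, and it assumes a continuation event always arrives before the first token of the new window.

```python
from types import SimpleNamespace


def make_event(event_type, data=None):
    """Build a stand-in event with the .type and .data attributes used above."""
    return SimpleNamespace(type=event_type, data=data)


def split_by_window(events):
    """Group streamed tokens into one string per continuation window."""
    windows = [[]]  # the first window starts implicitly
    for event in events:
        if event.type == "token":
            windows[-1].append(event.data)
        elif event.type == "continuation":
            windows.append([])  # later tokens belong to a new window
    return ["".join(tokens) for tokens in windows]


# Simulated stream; with CRP the iterable would come from
# session.dispatch_stream(task=..., max_continuations=...).
simulated_events = [
    make_event("token", "Part one."),
    make_event("continuation"),
    make_event("token", "Part "),
    make_event("token", "two."),
    make_event("done"),
]

sections = split_by_window(simulated_events)
for number, section in enumerate(sections, start=1):
    print(f"[Window {number}] {section}")
```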

When to Use Streaming

| Use Case | Recommended |
|---|---|
| Interactive CLI/chat | dispatch_stream() |
| Real-time web UI | dispatch_stream() |
| Background processing | dispatch() (non-streaming) |
| Extraction-heavy tasks | dispatch_stream_augmented() |
| Batch processing | dispatch() (non-streaming) |