
Sending Content to Models

You want to send content (text, files, videos, or structured data) to an LLM and get answers back. Three things to know: how to wrap your input as a Source, which entry point to call, and how to read the result.

Boundary

Pollux owns: uploading and caching source content, attaching it to provider API calls, running prompts concurrently, and normalizing responses into a stable ResultEnvelope.

You own: choosing what to analyze, writing prompts, and processing the returned answers.

Source Constructors

| Constructor | Input | Notes |
|---|---|---|
| Source.from_text(text) | Plain string | Identifier defaults to the first 50 characters |
| Source.from_file(path) | Local file path | Supports PDF, images, video, audio, and text |
| Source.from_youtube(url) | YouTube URL | URL reference (no download); Gemini-native, limited on OpenAI and Anthropic |
| Source.from_arxiv(ref) | arXiv ID or URL | Normalizes to the canonical PDF URL (no download at construction time) |
| Source.from_uri(uri, mime_type=...) | Remote URI | Generic remote reference; provider support varies by MIME type |
| Source.from_json(data) | Dict or Pydantic model instance | Serializes via json.dumps(); calls model_dump() on Pydantic objects first |
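Two of the notes above describe small, deterministic transformations: the default identifier for text sources and arXiv normalization. The sketch below imitates both with the standard library so the behavior is easy to picture. It is not Pollux's code: the helper names are hypothetical, it only handles new-style IDs such as 2301.00001, and the exact URL form Pollux produces (for example, whether it appends .pdf) is an assumption.

```python
import re

# Hypothetical helpers imitating two documented Source behaviors.
# NOT Pollux's implementation; the canonical URL form is an assumption.

# New-style arXiv IDs, e.g. 2301.00001, with an optional version suffix like v2.
_ARXIV_ID = re.compile(r"(\d{4}\.\d{4,5})(v\d+)?")


def canonical_arxiv_pdf_url(ref: str) -> str:
    """Map a bare arXiv ID or an abs/pdf URL to an arxiv.org PDF URL."""
    match = _ARXIV_ID.search(ref)
    if match is None:
        raise ValueError(f"not a recognized new-style arXiv reference: {ref!r}")
    arxiv_id = match.group(1) + (match.group(2) or "")
    return f"https://arxiv.org/pdf/{arxiv_id}"


def default_text_identifier(text: str, limit: int = 50) -> str:
    """Identifier for a text source when none is given: the first `limit` characters."""
    return text[:limit]


print(canonical_arxiv_pdf_url("2301.00001"))                          # https://arxiv.org/pdf/2301.00001
print(canonical_arxiv_pdf_url("https://arxiv.org/abs/2301.00001v2"))  # https://arxiv.org/pdf/2301.00001v2
print(default_text_identifier("x" * 60) == "x" * 50)                  # True
```

Because normalization happens at construction time without a download, an invalid reference can fail fast before any provider call is made.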

Examples:

from pydantic import BaseModel
from pollux import Source

text    = Source.from_text("Caching reduces repeated token cost.")
paper   = Source.from_file("paper.pdf")
video   = Source.from_youtube("https://youtube.com/watch?v=dQw4w9WgXcQ")
arxiv   = Source.from_arxiv("2301.00001")
remote  = Source.from_uri("https://example.com/data.csv", mime_type="text/csv")

# Pass application data as context
metrics = {"revenue_q1": 4_200_000, "growth_pct": 12.5, "region": "EMEA"}
context = Source.from_json(metrics)

# Pydantic models work directly; model_dump() is called automatically
class UserProfile(BaseModel):
    name: str
    preferences: list[str]

profile = Source.from_json(UserProfile(name="Alice", preferences=["concise", "formal"]))

from_json is useful when you want to pass structured application data (API responses, database records, or configuration objects) as context alongside a prompt, without manually serializing to a string.
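The serialization rule in the table (call model_dump() on Pydantic objects, then json.dumps()) can be sketched in a few lines. This is a hypothetical stand-in, not Pollux's function: FakeModel replaces a real Pydantic model so the example runs without pydantic installed, the duck-typed model_dump check is an assumption, and any extra json.dumps() arguments Pollux may pass are not shown.

```python
import json
from dataclasses import dataclass
from typing import Any


def serialize_for_context(data: Any) -> str:
    """Hypothetical stand-in for what Source.from_json() is documented to do."""
    if hasattr(data, "model_dump"):  # duck-typed Pydantic check (assumption)
        data = data.model_dump()
    return json.dumps(data)


@dataclass
class FakeModel:
    """Minimal stand-in for a Pydantic model so the sketch runs without pydantic."""

    name: str
    preferences: list[str]

    def model_dump(self) -> dict[str, Any]:
        return {"name": self.name, "preferences": self.preferences}


metrics = {"revenue_q1": 4_200_000, "growth_pct": 12.5, "region": "EMEA"}
print(serialize_for_context(metrics))
print(serialize_for_context(FakeModel("Alice", ["concise", "formal"])))
```

The point is that both inputs end up as the same kind of thing: a JSON string the model can read as context.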

Pollux accepts PDFs, images, video, audio, and text files through the same interface. The source type is detected from the file extension or MIME type; you do not need to specify format-specific options. For media sources (images, video, audio), keep prompts concrete: ask for objects, attributes, timestamps, or quoted text rather than open-ended descriptions.
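Extension-based detection of this kind can be illustrated with the standard-library mimetypes module. The classifier below is hypothetical and only shows the idea (an explicit MIME type wins, otherwise guess from the extension); Pollux's own detection logic and fallback type may differ.

```python
import mimetypes
from typing import Optional

# Hypothetical classifier, not Pollux's detection code.


def classify_source(path: str, mime_type: Optional[str] = None) -> tuple[str, str]:
    """Return (mime_type, kind), preferring an explicit MIME type if given."""
    if mime_type is None:
        guessed, _encoding = mimetypes.guess_type(path)
        mime_type = guessed or "application/octet-stream"  # assumed fallback
    top_level = mime_type.split("/", 1)[0]
    if mime_type == "application/pdf":
        kind = "pdf"
    elif top_level in {"image", "video", "audio", "text"}:
        kind = top_level
    else:
        kind = "unknown"
    return mime_type, kind


for name in ["paper.pdf", "chart.png", "lecture.mp4", "notes.txt"]:
    print(name, classify_source(name))
```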

Gemini Video Controls

Gemini supports clip windows and custom frame sampling for video inputs. Pollux exposes these as an explicit Gemini-only source helper instead of a generic provider passthrough, so the public API stays stable even if Google's wire format changes.

Example: Gemini video clipping

import asyncio
from pollux import Config, Source, run

async def main() -> None:
    config = Config(provider="gemini", model="gemini-2.5-flash-lite")
    source = Source.from_file(
        "lecture.mp4", mime_type="video/mp4"
    ).with_gemini_video_settings(
        start_offset="33m0s",
        end_offset="34m10s",
        fps=1.0,
    )

    result = await run(
        "What claim does the lecturer make in this segment?",
        source=source,
        config=config,
    )
    print(result["answers"][0])

asyncio.run(main())

Pollux validates these settings up front and maps them onto Gemini's current video-processing request shape internally. See the Gemini video guide for the current provider behavior.

Use this helper only when you have consciously chosen Gemini as the provider. On non-Gemini providers, the source still works as a normal video source, but the Gemini-specific controls are ignored rather than translated to another provider-specific feature.
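To make "validated up front" and "ignored on other providers" concrete, here is a hypothetical sketch. It parses offsets in the "33m0s" style used in the example above, rejects an empty or inverted window and a non-positive fps, and returns nothing for non-Gemini providers. The function names, the hour support, and the returned dict keys are assumptions; Pollux's real validation rules and Gemini's request shape are not reproduced here.

```python
import re
from typing import Optional

# Hypothetical validation sketch; not Pollux's code or Gemini's wire format.
_OFFSET = re.compile(r"^(?:(\d+)h)?(?:(\d+)m)?(?:(\d+(?:\.\d+)?)s)?$")


def parse_offset(text: str) -> float:
    """Convert an offset such as "33m0s" or "1h2m3.5s" into seconds."""
    match = _OFFSET.match(text)
    if match is None or not any(match.groups()):
        raise ValueError(f"invalid offset: {text!r}")
    hours, minutes, seconds = match.groups()
    return int(hours or 0) * 3600 + int(minutes or 0) * 60 + float(seconds or 0)


def gemini_video_settings(
    provider: str, start_offset: str, end_offset: str, fps: float
) -> Optional[dict]:
    """Validate eagerly, then drop the settings for non-Gemini providers."""
    start, end = parse_offset(start_offset), parse_offset(end_offset)
    if end <= start:
        raise ValueError("end_offset must be after start_offset")
    if fps <= 0:
        raise ValueError("fps must be positive")
    if provider != "gemini":
        return None  # ignored, not translated to another provider's feature
    return {"start_s": start, "end_s": end, "fps": fps}


print(gemini_video_settings("gemini", "33m0s", "34m10s", 1.0))  # {'start_s': 1980.0, 'end_s': 2050.0, 'fps': 1.0}
print(gemini_video_settings("openai", "33m0s", "34m10s", 1.0))  # None
```

Validating before the provider check means a malformed offset fails the same way regardless of which provider is configured.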

Single Prompt: run()

Start with the simplest case: one prompt, one source, one answer. run() takes one prompt and at most one source. Use it to tune prompt quality before scaling up.

import asyncio
from pollux import Config, Source, run

async def main() -> None:
    config = Config(provider="gemini", model="gemini-2.5-flash-lite")
    result = await run(
        "What are the main conclusions?",
        source=Source.from_file("paper.pdf"),
        config=config,
    )
    print(result["status"])   # "ok"
    print(result["answers"][0])

asyncio.run(main())

Output:

ok
The paper concludes that context caching reduces repeated token cost by up to
90% for fan-out workloads, with diminishing returns below 3 prompts per source.

Step-by-Step Walkthrough

  1. Create a Config. Specify the provider and model. Pollux resolves the API key from the environment automatically.

  2. Wrap your input as a Source. Source.from_file() handles upload, MIME detection, and content hashing. You do not need to read the file or specify its type.

  3. Call run(). Pass the prompt, source, and config. Pollux normalizes the request, plans the API call, executes it, and extracts the answer.

  4. Read result["answers"]. The first (and only) element contains the model's response. Check result["status"] to confirm the call succeeded.
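Step 3 names four stages: normalize, plan, execute, extract. The sketch below strings together hypothetical functions with those names so the flow is visible end to end. None of them are Pollux APIs: fake_provider replaces a real network call, the raw response shape is invented for illustration, and using a SHA-256 content hash as the source key is an assumption consistent with the "content hashing" mentioned in step 2.

```python
import asyncio
import hashlib
from dataclasses import dataclass
from typing import Optional

# Hypothetical four-stage pipeline; all names here are illustrative.


@dataclass(frozen=True)
class PlannedCall:
    prompt: str
    source_key: Optional[str]  # content hash identifying the attached source


def normalize(prompt: str, source_bytes: Optional[bytes]) -> tuple[str, Optional[str]]:
    """Trim the prompt and hash the source content, if any."""
    key = hashlib.sha256(source_bytes).hexdigest() if source_bytes is not None else None
    return prompt.strip(), key


def plan(prompt: str, source_key: Optional[str]) -> PlannedCall:
    """For run(), one prompt maps to one provider call."""
    return PlannedCall(prompt, source_key)


async def fake_provider(call: PlannedCall) -> dict:
    """Stand-in for the provider API; returns an invented raw response shape."""
    await asyncio.sleep(0)
    return {"candidates": [{"text": f"Answer to: {call.prompt}"}]}


def extract(raw: dict) -> str:
    """Pull the answer text out of the raw response."""
    return raw["candidates"][0]["text"]


async def run_sketch(prompt: str, source_bytes: Optional[bytes] = None) -> str:
    call = plan(*normalize(prompt, source_bytes))
    raw = await fake_provider(call)
    return extract(raw)


print(asyncio.run(run_sketch("  What are the main conclusions?  ", b"%PDF-1.7 ...")))
```

Keeping extraction separate from execution is what lets the result shape stay stable while the raw provider responses differ.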

Multiple Prompts: run_many()

Now scale up. run_many() handles multiple prompts and sources, with upload reuse, concurrency, and cache identity managed automatically. This is where source patterns (fan-out, fan-in, broadcast) come into play. See Analyzing Collections with Source Patterns for collection-level workflows.

import asyncio
from pollux import Config, Source, run_many

async def main() -> None:
    config = Config(provider="gemini", model="gemini-2.5-flash-lite")
    sources = [
        Source.from_file("paper1.pdf"),
        Source.from_file("paper2.pdf"),
    ]
    prompts = [
        "Summarize the main argument.",
        "List key findings.",
    ]

    envelope = await run_many(prompts, sources=sources, config=config)
    print(envelope["status"])
    for i, answer in enumerate(envelope["answers"], 1):
        print(f"Q{i}: {answer[:80]}...")

asyncio.run(main())

Output:

ok
Q1: Paper 1 argues that multimodal orchestration layers reduce boilerplate by...
Q2: Key findings: (1) fan-out caching saves 85-92% of input tokens; (2) broad...
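Two of the behaviors run_many() manages, upload reuse and concurrency, can be illustrated with plain asyncio. This is a hypothetical sketch, not Pollux internals: uploads are deduplicated by content hash (concurrent callers share one in-flight upload task), and every prompt's call runs at the same time via asyncio.gather. It assumes each prompt sees all sources, which matches the "one string per prompt" shape of answers.

```python
import asyncio
import hashlib

# Hypothetical sketch of upload reuse plus concurrent prompts; not Pollux code.


class UploadCache:
    """Start at most one upload per distinct content hash; later callers share it."""

    def __init__(self) -> None:
        self._tasks: dict = {}
        self.uploads = 0

    async def _upload(self, content: bytes) -> str:
        self.uploads += 1
        await asyncio.sleep(0.01)  # simulated network latency
        return "file-" + hashlib.sha256(content).hexdigest()[:8]

    def get(self, content: bytes) -> "asyncio.Future[str]":
        key = hashlib.sha256(content).hexdigest()
        if key not in self._tasks:
            self._tasks[key] = asyncio.ensure_future(self._upload(content))
        return self._tasks[key]


async def ask(cache: UploadCache, prompt: str, sources: list[bytes]) -> str:
    file_ids = await asyncio.gather(*(cache.get(s) for s in sources))
    await asyncio.sleep(0.01)  # simulated generation call
    return f"{prompt} -> {', '.join(file_ids)}"


async def run_many_sketch(prompts: list[str], sources: list[bytes]) -> tuple[list[str], int]:
    cache = UploadCache()
    answers = await asyncio.gather(*(ask(cache, p, sources) for p in prompts))
    return list(answers), cache.uploads


answers, uploads = asyncio.run(
    run_many_sketch(["Summarize the main argument.", "List key findings."], [b"paper1", b"paper2"])
)
print(len(answers), uploads)  # 2 2: two answers, each source uploaded once
```

Caching the task rather than the finished result is the design choice that matters: a second prompt that arrives while an upload is still in flight waits on that upload instead of starting its own.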

Choosing the Entry Point

| Situation | Use | Why |
|---|---|---|
| One question, optional source | run() | Smallest surface area |
| Multiple questions on shared source(s) | run_many() | Fan-out efficiency |
| Same question across many sources | run_many() | Fan-in analysis |
| Many questions across many sources | run_many() | Broadcast pattern |
| Non-urgent work you will collect later | defer() / defer_many() | Background provider execution with a serializable handle |
| Returning tool results to the model | continue_tool() | Feeds tool outputs back into the conversation |

Rule of thumb: if prompts or sources are plural and you want answers now, reach for run_many(). If the workload can wait, reach for defer_many().
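The rule of thumb can be written down as a tiny decision function. It is illustrative only: choose_entry_point is a hypothetical helper, mapping single-item deferred work to defer() and plural work to defer_many() is an inference from the table, and continue_tool() is left out because it belongs to tool-calling loops rather than this choice.

```python
# Hypothetical encoding of the rule of thumb; not part of Pollux.


def choose_entry_point(n_prompts: int, n_sources: int, need_answers_now: bool = True) -> str:
    """run() accepts one prompt and at most one source; anything plural needs the *_many form."""
    plural = n_prompts > 1 or n_sources > 1
    if not need_answers_now:
        return "defer_many()" if plural else "defer()"
    return "run_many()" if plural else "run()"


print(choose_entry_point(1, 1))                          # run()
print(choose_entry_point(3, 2))                          # run_many()
print(choose_entry_point(50, 10, need_answers_now=False))  # defer_many()
```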

continue_tool() is a specialized entry point for agent loops. It takes a previous ResultEnvelope containing tool calls and your tool results, and returns the model's next response. See Feeding Tool Results Back with continue_tool() for details.
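An agent loop around continue_tool() usually has the same skeleton: read the tool calls from the latest envelope, run them locally, feed the results back, and stop when the model asks for nothing more. The sketch below shows that skeleton with fakes. Everything in it is an assumption for illustration: the tool-call keys ("name", "arguments"), the TOOLS registry, and fake_continue, which stands in for continue_tool() and does not reproduce its real signature (see the linked page for that).

```python
from typing import Any, Callable

# Hypothetical agent-loop skeleton; tool-call keys and fakes are assumptions.

TOOLS: dict[str, Callable[..., Any]] = {
    "get_weather": lambda city: f"Sunny in {city}",
}


def fake_first_turn() -> dict[str, Any]:
    """A first envelope in which the model requests one tool call."""
    return {
        "answers": [""],
        "tool_calls": [[{"name": "get_weather", "arguments": {"city": "Oslo"}}]],
    }


def fake_continue(previous: dict[str, Any], results: list[Any]) -> dict[str, Any]:
    """Stand-in for continue_tool(): pretend the model answered from the tool output."""
    return {"answers": [f"Forecast: {results[0]}"], "tool_calls": [[]]}


def agent_loop(envelope: dict[str, Any], continue_fn: Callable, max_steps: int = 5) -> str:
    for _ in range(max_steps):
        calls = (envelope.get("tool_calls") or [[]])[0]  # first prompt's tool calls
        if not calls:
            return envelope["answers"][0]  # no more tool requests: done
        results = [TOOLS[c["name"]](**c["arguments"]) for c in calls]
        envelope = continue_fn(envelope, results)  # feed results back to the model
    raise RuntimeError("agent loop exceeded max_steps")


print(agent_loop(fake_first_turn(), fake_continue))  # Forecast: Sunny in Oslo
```

The max_steps cap is a common safeguard so a model that keeps requesting tools cannot loop forever.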

run() is a convenience wrapper that delegates to run_many() with a single prompt. For multi-prompt workloads, one run_many() call is typically faster than calling run() once per prompt, because it shares uploads across prompts and runs them concurrently.
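The delegation, and why a loop of run() calls is slower, can be sketched with stubs. run_many_stub and run_stub are hypothetical stand-ins for run_many() and run(), and each simulated provider call simply sleeps 0.05 s; real latencies and speedups depend on the provider and workload.

```python
import asyncio
import time
from typing import Any, Optional

# Hypothetical stubs; the 0.05 s sleep stands in for provider latency.


async def run_many_stub(prompts: list[str], sources: list[str]) -> dict[str, Any]:
    async def answer(prompt: str) -> str:
        await asyncio.sleep(0.05)  # simulated provider call
        return f"{prompt} ({len(sources)} source(s))"

    answers = await asyncio.gather(*(answer(p) for p in prompts))
    return {"status": "ok", "answers": list(answers)}


async def run_stub(prompt: str, source: Optional[str] = None) -> dict[str, Any]:
    """One prompt in, delegated to the multi-prompt path."""
    sources = [source] if source is not None else []
    return await run_many_stub([prompt], sources)


async def compare(prompts: list[str]) -> tuple[list[str], list[str]]:
    start = time.perf_counter()
    looped = [(await run_stub(p, "paper.pdf"))["answers"][0] for p in prompts]  # sequential
    t_loop = time.perf_counter() - start

    start = time.perf_counter()
    batched = (await run_many_stub(prompts, ["paper.pdf"]))["answers"]  # concurrent
    t_batch = time.perf_counter() - start

    print(f"loop of run(): {t_loop:.2f}s, one run_many(): {t_batch:.2f}s")
    return looped, batched


looped, batched = asyncio.run(compare(["Q1", "Q2", "Q3"]))
```

Both paths return the same answers; the loop just waits for each call before starting the next.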

ResultEnvelope Reference

Every run(), run_many(), and collect_deferred() call returns a ResultEnvelope: a dict with a stable shape that works the same regardless of provider.

| Field | Type | Always present | Description |
|---|---|---|---|
| status | "ok" \| "partial" \| "error" | Yes | ok = all answers populated; partial = some empty; error = all empty |
| answers | list[str] | Yes | One string per prompt |
| structured | list[Any] | Only with response_schema | Parsed objects matching your schema |
| reasoning | list[str \| None] | No | Provider reasoning traces (when available) |
| tool_calls | list[list[dict]] | Only with tool calling | Per-prompt list of tool-call requests. See Conversations. |
| confidence | float | Yes | Heuristic: 0.9 for ok, 0.5 otherwise |
| extraction_method | str | Yes | Always "text" |
| usage | dict[str, int] | Yes | Token counts (input_tokens, output_tokens, total_tokens, and optional reasoning_tokens) |
| metrics | dict[str, Any] | Yes | duration_s, n_calls, cache_used (explicit caching only), finish_reasons (per-prompt, e.g. "stop", "max_tokens"). Deferred results also add metrics["deferred"] = True. |
| diagnostics | dict[str, Any] | Yes | Low-level diagnostics. All calls include raw_responses. Deferred results also add diagnostics["deferred"] with job_id, timing, and per-request lifecycle items. |
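The status and confidence rows describe simple rules, which can be restated as code. This is a sketch of the documented semantics, not Pollux's implementation: the function names are hypothetical, "empty" is taken to mean an empty string, and treating a zero-answer list as "error" is an assumption.

```python
from typing import Literal

# Hypothetical restatement of the documented status/confidence rules.
Status = Literal["ok", "partial", "error"]


def envelope_status(answers: list[str]) -> Status:
    """ok = all answers populated; partial = some empty; error = all empty."""
    populated = [bool(a) for a in answers]
    if populated and all(populated):
        return "ok"
    if any(populated):
        return "partial"
    return "error"  # includes the (assumed) zero-answer case


def heuristic_confidence(status: Status) -> float:
    """0.9 for ok, 0.5 otherwise."""
    return 0.9 if status == "ok" else 0.5


print(envelope_status(["Summary...", "Findings..."]))  # ok
print(envelope_status(["Summary...", ""]))             # partial
print(heuristic_confidence("partial"))                 # 0.5
```

Because confidence is derived only from status, it says nothing about answer quality; treat it as a completeness signal.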

Example of a complete envelope:

{
    "status": "ok",
    "answers": ["The paper concludes that..."],
    "confidence": 0.9,
    "extraction_method": "text",
    "usage": {"input_tokens": 1250, "output_tokens": 89, "total_tokens": 1339},
    "metrics": {"duration_s": 1.42, "n_calls": 1, "cache_used": False, "finish_reasons": ["stop"]},
    "diagnostics": {"raw_responses": [...]},
}
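A typical consumer checks each answer against its finish reason and logs token usage. The sketch reuses the example envelope (with raw_responses emptied so it runs) and a hypothetical report_envelope helper. It relies only on fields listed in the reference table and uses "max_tokens" as the truncation signal because that is the example finish reason the table gives.

```python
from typing import Any

# The example envelope from above; raw_responses emptied so the literal runs as-is.
envelope: dict[str, Any] = {
    "status": "ok",
    "answers": ["The paper concludes that..."],
    "confidence": 0.9,
    "extraction_method": "text",
    "usage": {"input_tokens": 1250, "output_tokens": 89, "total_tokens": 1339},
    "metrics": {"duration_s": 1.42, "n_calls": 1, "cache_used": False, "finish_reasons": ["stop"]},
    "diagnostics": {"raw_responses": []},
}


def report_envelope(envelope: dict[str, Any]) -> list[str]:
    """Hypothetical helper: flag empty or truncated answers, then report token usage."""
    reasons = envelope.get("metrics", {}).get("finish_reasons", [])
    lines = []
    for i, answer in enumerate(envelope["answers"]):
        reason = reasons[i] if i < len(reasons) else None  # per-prompt finish reason
        if not answer:
            lines.append(f"prompt {i}: empty answer")
        elif reason == "max_tokens":
            lines.append(f"prompt {i}: truncated (max_tokens)")
        else:
            lines.append(f"prompt {i}: ok")
    usage = envelope["usage"]
    lines.append(f"tokens: {usage['input_tokens']} in / {usage['output_tokens']} out")
    return lines


for line in report_envelope(envelope):
    print(line)
```

A truncated answer still counts as populated, so status can be "ok" even when finish_reasons shows the model hit its token limit; checking both fields catches that case.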

Notes


Once you're comfortable with single calls, see Analyzing Collections with Source Patterns for fan-out, fan-in, and broadcast workflows, Submitting Work for Later Collection when the job can run in the background, or Extracting Structured Data to get typed objects instead of free-form text.