Skip to content

API Reference

Quick reference for the current public API.

For provider-level feature differences, see Provider Capabilities.

Entry Points

The primary execution functions are exported from pollux:

Run a single prompt, optionally with a source for context.

Parameters:

Name Type Description Default
prompt str

The prompt to run.

required
source Source | None

Optional source for context (file, text, URL).

None
config Config

Configuration specifying provider and model.

required
options Options | None

Optional additive features (schema, reasoning, delivery mode).

None

Returns:

Type Description
ResultEnvelope

ResultEnvelope with answers and metrics.

Example

config = Config(provider="gemini", model="gemini-2.0-flash") result = await run("Summarize this document", source=Source.from_file("doc.pdf"), config=config) first_answer = next(iter(result["answers"]), "") print(first_answer)

Source code in src/pollux/__init__.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def run(
    prompt: str,
    *,
    source: Source | None = None,
    config: Config,
    options: Options | None = None,
) -> ResultEnvelope:
    """Run a single prompt, optionally with a source for context.

    Args:
        prompt: The prompt to run.
        source: Optional source for context (file, text, URL).
        config: Configuration specifying provider and model.
        options: Optional additive features (schema, reasoning, delivery mode).

    Returns:
        ResultEnvelope with answers and metrics.

    Example:
        config = Config(provider="gemini", model="gemini-2.0-flash")
        result = await run("Summarize this document", source=Source.from_file("doc.pdf"), config=config)
        first_answer = next(iter(result["answers"]), "")
        print(first_answer)
    """
    sources = (source,) if source else ()
    return await run_many(prompt, sources=sources, config=config, options=options)

Run multiple prompts with shared sources for source-pattern execution.

Parameters:

Name Type Description Default
prompts str | list[str] | tuple[str, ...]

One or more prompts to run.

required
sources tuple[Source, ...] | list[Source]

Optional sources for shared context.

()
config Config

Configuration specifying provider and model.

required
options Options | None

Optional additive features (schema, reasoning, delivery mode).

None

Returns:

Type Description
ResultEnvelope

ResultEnvelope with answers (one per prompt) and metrics.

Example

config = Config(provider="gemini", model="gemini-2.0-flash") result = await run_many( ["Question 1?", "Question 2?"], sources=[Source.from_text("Context...")], config=config, ) for answer in result["answers"]: print(answer)

Source code in src/pollux/__init__.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
async def run_many(
    prompts: str | list[str] | tuple[str, ...],
    *,
    sources: tuple[Source, ...] | list[Source] = (),
    config: Config,
    options: Options | None = None,
) -> ResultEnvelope:
    """Run multiple prompts with shared sources for source-pattern execution.

    Args:
        prompts: One or more prompts to run.
        sources: Optional sources for shared context.
        config: Configuration specifying provider and model.
        options: Optional additive features (schema, reasoning, delivery mode).

    Returns:
        ResultEnvelope with answers (one per prompt) and metrics.

    Example:
        config = Config(provider="gemini", model="gemini-2.0-flash")
        result = await run_many(
            ["Question 1?", "Question 2?"],
            sources=[Source.from_text("Context...")],
            config=config,
        )
        for answer in result["answers"]:
            print(answer)
    """
    request = normalize_request(prompts, sources, config, options=options)
    plan = build_plan(request)
    provider = _get_provider(request.config)

    try:
        trace = await execute_plan(plan, provider, _registry)
    finally:
        aclose = getattr(provider, "aclose", None)
        if callable(aclose):
            try:
                await aclose()
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                # Cleanup should never mask the primary failure.
                logger.warning("Provider cleanup failed: %s", exc)

    return build_result(plan, trace)

Core Types

A structured representation of a single input source.

Source code in src/pollux/source.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@dataclass(frozen=True, slots=True)
class Source:
    """A structured representation of a single input source."""

    source_type: SourceType
    identifier: str
    mime_type: str
    size_bytes: int
    content_loader: Callable[[], bytes]

    @classmethod
    def from_text(cls, text: str, *, identifier: str | None = None) -> Source:
        """Create a Source from text content.

        Args:
            text: The text content.
            identifier: Display label. Defaults to the first 50 characters of *text*.
        """
        content = text.encode("utf-8")
        ident = identifier or text[:50]
        return cls(
            source_type="text",
            identifier=ident,
            mime_type="text/plain",
            size_bytes=len(content),
            content_loader=lambda: content,
        )

    @classmethod
    def from_file(cls, path: str | Path, *, mime_type: str | None = None) -> Source:
        """Create a Source from a local file.

        Args:
            path: Path to the file. Must exist or ``SourceError`` is raised.
            mime_type: MIME type override. Auto-detected from extension when *None*.
        """
        p = Path(path)
        if not p.exists():
            raise SourceError(f"File not found: {p}")

        mt = mime_type or mimetypes.guess_type(str(p))[0] or "application/octet-stream"
        size = p.stat().st_size

        def loader() -> bytes:
            return p.read_bytes()

        return cls(
            source_type="file",
            identifier=str(p),
            mime_type=mt,
            size_bytes=size,
            content_loader=loader,
        )

    @classmethod
    def from_youtube(cls, url: str) -> Source:
        """Create a Source from a YouTube URL reference (no download)."""
        encoded = f"youtube:{url}".encode()
        return cls(
            source_type="youtube",
            identifier=url,
            mime_type="video/mp4",
            size_bytes=0,
            content_loader=lambda: encoded,
        )

    @classmethod
    def from_uri(
        cls, uri: str, *, mime_type: str = "application/octet-stream"
    ) -> Source:
        """Create a Source from a URI.

        Args:
            uri: Remote URI (e.g. ``gs://`` or ``https://``).
            mime_type: MIME type. Defaults to ``application/octet-stream``.
        """
        encoded = f"uri:{mime_type}:{uri}".encode()
        return cls(
            source_type="uri",
            identifier=uri,
            mime_type=mime_type,
            size_bytes=0,
            content_loader=lambda: encoded,
        )

    @classmethod
    def from_arxiv(cls, ref: str) -> Source:
        """Create an arXiv PDF Source from an arXiv ID or URL.

        Args:
            ref: An arXiv ID (e.g. ``"2301.07041"``) or full arXiv URL.
        """
        if not isinstance(ref, str):
            raise TypeError("ref must be a str")

        normalized_url = cls._normalize_arxiv_to_pdf_url(ref.strip())
        encoded = normalized_url.encode("utf-8")
        return cls(
            source_type="arxiv",
            identifier=normalized_url,
            mime_type="application/pdf",
            size_bytes=0,
            content_loader=lambda: encoded,
        )

    @staticmethod
    def _normalize_arxiv_to_pdf_url(ref: str) -> str:
        """Normalize arXiv id or URL to canonical PDF URL."""
        if not ref:
            raise SourceError("arXiv reference cannot be empty")

        arxiv_id = ref
        if ref.startswith(("http://", "https://")):
            parsed = urlparse(ref)
            host = parsed.netloc.lower()
            if host not in _ARXIV_HOSTS:
                raise SourceError(f"Expected arxiv.org URL, got: {parsed.netloc}")

            path = parsed.path.strip("/")
            if path.startswith("abs/"):
                arxiv_id = path[len("abs/") :]
            elif path.startswith("pdf/"):
                arxiv_id = path[len("pdf/") :]
            else:
                raise SourceError(f"Unsupported arXiv URL path: {parsed.path}")

        if arxiv_id.endswith(".pdf"):
            arxiv_id = arxiv_id[:-4]

        arxiv_id = arxiv_id.strip("/")
        if not _ARXIV_ID_RE.match(arxiv_id):
            raise SourceError(f"Invalid arXiv id: {arxiv_id}")

        return f"https://arxiv.org/pdf/{arxiv_id}.pdf"

    def content_hash(self) -> str:
        """Compute SHA256 hash of content for cache identity."""
        content = self.content_loader()
        return hashlib.sha256(content).hexdigest()

Functions

from_text classmethod

from_text(text, *, identifier=None)

Create a Source from text content.

Parameters:

Name Type Description Default
text str

The text content.

required
identifier str | None

Display label. Defaults to the first 50 characters of text.

None
Source code in src/pollux/source.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
def from_text(cls, text: str, *, identifier: str | None = None) -> Source:
    """Create a Source from text content.

    Args:
        text: The text content.
        identifier: Display label. Defaults to the first 50 characters of *text*.
    """
    content = text.encode("utf-8")
    ident = identifier or text[:50]
    return cls(
        source_type="text",
        identifier=ident,
        mime_type="text/plain",
        size_bytes=len(content),
        content_loader=lambda: content,
    )

from_file classmethod

from_file(path, *, mime_type=None)

Create a Source from a local file.

Parameters:

Name Type Description Default
path str | Path

Path to the file. Must exist or SourceError is raised.

required
mime_type str | None

MIME type override. Auto-detected from extension when None.

None
Source code in src/pollux/source.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@classmethod
def from_file(cls, path: str | Path, *, mime_type: str | None = None) -> Source:
    """Create a Source from a local file.

    Args:
        path: Path to the file. Must exist or ``SourceError`` is raised.
        mime_type: MIME type override. Auto-detected from extension when *None*.
    """
    p = Path(path)
    if not p.exists():
        raise SourceError(f"File not found: {p}")

    mt = mime_type or mimetypes.guess_type(str(p))[0] or "application/octet-stream"
    size = p.stat().st_size

    def loader() -> bytes:
        return p.read_bytes()

    return cls(
        source_type="file",
        identifier=str(p),
        mime_type=mt,
        size_bytes=size,
        content_loader=loader,
    )

from_youtube classmethod

from_youtube(url)

Create a Source from a YouTube URL reference (no download).

Source code in src/pollux/source.py
78
79
80
81
82
83
84
85
86
87
88
@classmethod
def from_youtube(cls, url: str) -> Source:
    """Create a Source from a YouTube URL reference (no download)."""
    encoded = f"youtube:{url}".encode()
    return cls(
        source_type="youtube",
        identifier=url,
        mime_type="video/mp4",
        size_bytes=0,
        content_loader=lambda: encoded,
    )

from_uri classmethod

from_uri(uri, *, mime_type='application/octet-stream')

Create a Source from a URI.

Parameters:

Name Type Description Default
uri str

Remote URI (e.g. gs:// or https://).

required
mime_type str

MIME type. Defaults to application/octet-stream.

'application/octet-stream'
Source code in src/pollux/source.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@classmethod
def from_uri(
    cls, uri: str, *, mime_type: str = "application/octet-stream"
) -> Source:
    """Create a Source from a URI.

    Args:
        uri: Remote URI (e.g. ``gs://`` or ``https://``).
        mime_type: MIME type. Defaults to ``application/octet-stream``.
    """
    encoded = f"uri:{mime_type}:{uri}".encode()
    return cls(
        source_type="uri",
        identifier=uri,
        mime_type=mime_type,
        size_bytes=0,
        content_loader=lambda: encoded,
    )

from_arxiv classmethod

from_arxiv(ref)

Create an arXiv PDF Source from an arXiv ID or URL.

Parameters:

Name Type Description Default
ref str

An arXiv ID (e.g. "2301.07041") or full arXiv URL.

required
Source code in src/pollux/source.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@classmethod
def from_arxiv(cls, ref: str) -> Source:
    """Create an arXiv PDF Source from an arXiv ID or URL.

    Args:
        ref: An arXiv ID (e.g. ``"2301.07041"``) or full arXiv URL.
    """
    if not isinstance(ref, str):
        raise TypeError("ref must be a str")

    normalized_url = cls._normalize_arxiv_to_pdf_url(ref.strip())
    encoded = normalized_url.encode("utf-8")
    return cls(
        source_type="arxiv",
        identifier=normalized_url,
        mime_type="application/pdf",
        size_bytes=0,
        content_loader=lambda: encoded,
    )

content_hash

content_hash()

Compute SHA256 hash of content for cache identity.

Source code in src/pollux/source.py
159
160
161
162
def content_hash(self) -> str:
    """Compute SHA256 hash of content for cache identity."""
    content = self.content_loader()
    return hashlib.sha256(content).hexdigest()

Immutable configuration for Pollux execution.

Provider and model are required—Pollux does not guess what you want. API keys are auto-resolved from standard environment variables.

Example

config = Config(provider="gemini", model="gemini-2.0-flash")

API key is automatically resolved from GEMINI_API_KEY

Source code in src/pollux/config.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@dataclass(frozen=True)
class Config:
    """Immutable configuration for Pollux execution.

    Provider and model are required—Pollux does not guess what you want.
    API keys are auto-resolved from standard environment variables.

    Example:
        config = Config(provider="gemini", model="gemini-2.0-flash")
        # API key is automatically resolved from GEMINI_API_KEY
    """

    provider: ProviderName
    model: str
    #: Auto-resolved from ``GEMINI_API_KEY`` or ``OPENAI_API_KEY`` when *None*.
    api_key: str | None = None
    use_mock: bool = False
    #: Gemini-only in v1.0; silently ignored for other providers.
    enable_caching: bool = False
    ttl_seconds: int = 3600
    request_concurrency: int = 6
    retry: RetryPolicy = field(default_factory=RetryPolicy)

    def __post_init__(self) -> None:
        """Auto-resolve API key and validate configuration."""
        # Validate provider
        if self.provider not in ("gemini", "openai"):
            raise ConfigurationError(
                f"Unknown provider: {self.provider!r}",
                hint="Supported providers: 'gemini', 'openai'",
            )

        # Auto-resolve API key from environment if not provided
        if self.api_key is None and not self.use_mock:
            env_var = _API_KEY_ENV_VARS[self.provider]
            resolved_key = os.environ.get(env_var)
            object.__setattr__(self, "api_key", resolved_key)

        # Validate: real API calls need a key
        if not self.use_mock and not self.api_key:
            env_var = _API_KEY_ENV_VARS[self.provider]
            raise ConfigurationError(
                f"API key required for {self.provider}",
                hint=f"Set {env_var} environment variable or pass api_key=...",
            )

    def __str__(self) -> str:
        """Return a redacted, developer-friendly representation."""
        return (
            f"Config(provider={self.provider!r}, model={self.model!r}, "
            f"api_key={'[REDACTED]' if self.api_key else None}, use_mock={self.use_mock})"
        )

    __repr__ = __str__

Optional execution features for run() and run_many().

Source code in src/pollux/options.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@dataclass(frozen=True)
class Options:
    """Optional execution features for `run()` and `run_many()`."""

    #: Pydantic ``BaseModel`` subclass or JSON Schema dict for structured output.
    response_schema: ResponseSchemaInput | None = None
    #: Reserved — not yet wired in v1.0.
    reasoning_effort: ReasoningEffort | None = None
    #: ``"deferred"`` is reserved for future provider batch APIs.
    delivery_mode: DeliveryMode = "realtime"
    #: Mutually exclusive with *continue_from*.
    history: list[dict[str, str]] | None = None
    #: Mutually exclusive with *history*.
    continue_from: ResultEnvelope | None = None

    def __post_init__(self) -> None:
        """Validate option shapes early for clear errors."""
        if self.response_schema is not None and not (
            isinstance(self.response_schema, dict)
            or (
                isinstance(self.response_schema, type)
                and issubclass(self.response_schema, BaseModel)
            )
        ):
            raise ConfigurationError(
                "response_schema must be a Pydantic model class or JSON schema dict",
                hint="Pass a BaseModel subclass or a dict following JSON Schema.",
            )

        if self.history is not None and self.continue_from is not None:
            raise ConfigurationError(
                "history and continue_from are mutually exclusive",
                hint="Use exactly one conversation input source per call.",
            )
        if self.history is not None:
            if not isinstance(self.history, list):
                raise ConfigurationError(
                    "history must be a list of role/content messages",
                    hint="Pass history=[{'role': 'user', 'content': '...'}].",
                )
            for item in self.history:
                if (
                    not isinstance(item, dict)
                    or not isinstance(item.get("role"), str)
                    or not isinstance(item.get("content"), str)
                ):
                    raise ConfigurationError(
                        "history items must include string role and content fields",
                        hint=(
                            "Each item should look like "
                            "{'role': 'user', 'content': '...'}"
                        ),
                    )
        if self.continue_from is not None and not isinstance(self.continue_from, dict):
            raise ConfigurationError(
                "continue_from must be a prior Pollux result envelope",
                hint="Pass the dict returned by run() or run_many().",
            )

    def response_schema_json(self) -> dict[str, Any] | None:
        """Return JSON Schema for provider APIs."""
        schema = self.response_schema
        if schema is None:
            return None
        if isinstance(schema, dict):
            return schema
        return schema.model_json_schema()

    def response_schema_model(self) -> type[BaseModel] | None:
        """Return Pydantic schema class when one was provided."""
        schema = self.response_schema
        if isinstance(schema, type) and issubclass(schema, BaseModel):
            return schema
        return None

Functions

response_schema_json

response_schema_json()

Return JSON Schema for provider APIs.

Source code in src/pollux/options.py
79
80
81
82
83
84
85
86
def response_schema_json(self) -> dict[str, Any] | None:
    """Return JSON Schema for provider APIs."""
    schema = self.response_schema
    if schema is None:
        return None
    if isinstance(schema, dict):
        return schema
    return schema.model_json_schema()

response_schema_model

response_schema_model()

Return Pydantic schema class when one was provided.

Source code in src/pollux/options.py
88
89
90
91
92
93
def response_schema_model(self) -> type[BaseModel] | None:
    """Return Pydantic schema class when one was provided."""
    schema = self.response_schema
    if isinstance(schema, type) and issubclass(schema, BaseModel):
        return schema
    return None

Bounded retry policy with exponential backoff and optional jitter.

Source code in src/pollux/retry.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@dataclass(frozen=True)
class RetryPolicy:
    """Bounded retry policy with exponential backoff and optional jitter."""

    # Defaults are intentionally conservative: retries should help without
    # surprising tail-latency.
    #: Total attempts including the initial call (``2`` = one retry).
    max_attempts: int = 2
    initial_delay_s: float = 0.5
    backoff_multiplier: float = 2.0
    max_delay_s: float = 5.0
    jitter: bool = True  # "full jitter" when enabled
    #: Wall-clock deadline across all attempts; *None* disables the deadline.
    max_elapsed_s: float | None = 15.0

    def __post_init__(self) -> None:
        """Validate invariants to keep retry behavior predictable."""
        if self.max_attempts < 1:
            raise ValueError("RetryPolicy.max_attempts must be >= 1")
        if self.initial_delay_s < 0:
            raise ValueError("RetryPolicy.initial_delay_s must be >= 0")
        if self.backoff_multiplier <= 0:
            raise ValueError("RetryPolicy.backoff_multiplier must be > 0")
        if self.max_delay_s < 0:
            raise ValueError("RetryPolicy.max_delay_s must be >= 0")
        if self.max_elapsed_s is not None and self.max_elapsed_s < 0:
            raise ValueError("RetryPolicy.max_elapsed_s must be >= 0 or None")

Bases: TypedDict

Standard result envelope returned by Pollux.

status is "ok" when all answers are non-empty, "partial" when some are empty, or "error" when all are empty.

Source code in src/pollux/result.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class ResultEnvelope(TypedDict, total=False):
    """Standard result envelope returned by Pollux.

    ``status`` is ``"ok"`` when all answers are non-empty, ``"partial"`` when
    some are empty, or ``"error"`` when all are empty.
    """

    status: Literal["ok", "partial", "error"]
    answers: list[str]  # Stable core contract.
    #: Present only when ``response_schema`` was set in Options.
    structured: list[Any]
    reasoning: list[str | None]
    #: Heuristic: ``0.9`` for ``"ok"`` status, ``0.5`` otherwise.
    confidence: float
    #: Always ``"text"`` in v1.0.
    extraction_method: str
    #: Keys: ``input_tokens``, ``output_tokens``, ``total_tokens``.
    usage: dict[str, int]
    #: Keys: ``duration_s``, ``n_calls``, ``cache_used``.
    metrics: dict[str, Any]
    diagnostics: dict[str, Any]
    _conversation_state: dict[str, Any]

Error Types

Bases: Exception

Base exception for all Pollux errors.

Source code in src/pollux/errors.py
11
12
13
14
15
16
class PolluxError(Exception):
    """Base exception for all Pollux errors."""

    def __init__(self, message: str, *, hint: str | None = None) -> None:
        super().__init__(message)
        self.hint = hint

Bases: PolluxError

Configuration validation or resolution failed.

Source code in src/pollux/errors.py
19
20
class ConfigurationError(PolluxError):
    """Configuration validation or resolution failed."""

Bases: PolluxError

Source validation or loading failed.

Source code in src/pollux/errors.py
23
24
class SourceError(PolluxError):
    """Source validation or loading failed."""

Bases: PolluxError

Execution planning failed.

Source code in src/pollux/errors.py
27
28
class PlanningError(PolluxError):
    """Execution planning failed."""

Bases: PolluxError

A Pollux internal error (bug) or invariant violation.

Source code in src/pollux/errors.py
31
32
class InternalError(PolluxError):
    """A Pollux internal error (bug) or invariant violation."""

Bases: PolluxError

API call failed.

Providers attach retry metadata so core execution can perform bounded retries without brittle substring matching.

Source code in src/pollux/errors.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class APIError(PolluxError):
    """API call failed.

    Providers attach retry metadata so core execution can perform bounded
    retries without brittle substring matching.
    """

    def __init__(
        self,
        message: str,
        *,
        hint: str | None = None,
        retryable: bool | None = None,
        status_code: int | None = None,
        retry_after_s: float | None = None,
        provider: str | None = None,
        phase: str | None = None,
        call_idx: int | None = None,
    ) -> None:
        super().__init__(message, hint=hint)
        self.retryable = retryable
        self.status_code = status_code
        self.retry_after_s = retry_after_s
        self.provider = provider
        self.phase = phase
        self.call_idx = call_idx

Bases: APIError

Rate limit exceeded (HTTP 429).

Source code in src/pollux/errors.py
67
68
class RateLimitError(APIError):
    """Rate limit exceeded (HTTP 429)."""

Bases: APIError

Cache operation failed.

Source code in src/pollux/errors.py
63
64
class CacheError(APIError):
    """Cache operation failed."""