API Reference

Quick reference for the current public API.

For provider-level feature differences, see Provider Capabilities.

Which Entry Point?

If you want to...                                     | Use                                                          | Learn the workflow in...
Ask one prompt about one source (or no source)        | run()                                                        | Sending Content to Models
Ask many prompts against shared sources               | run_many()                                                   | Analyzing Collections with Source Patterns
Submit non-urgent work and collect it later           | defer() / defer_many()                                       | Building With Deferred Delivery
Check deferred job status or collect terminal results | inspect_deferred() / collect_deferred() / cancel_deferred()  | Submitting Work for Later Collection
Feed tool results back into a conversation turn       | continue_tool()                                              | Building an Agent Loop
Reuse Gemini context across later calls               | create_cache()                                               | Reducing Costs with Context Caching

Entry Points

The primary execution functions are exported from pollux:

run async

Run a single prompt, optionally with a source for context.

Parameters:

    prompt (str | None, default None): The prompt to run.
    source (Source | None, default None): Optional source for context (file, text, URL).
    config (Config, required): Configuration specifying provider and model.
    options (Options | None, default None): Optional additive features such as schema or reasoning controls.

Returns:

    ResultEnvelope: ResultEnvelope with answers and metrics.

Example

config = Config(provider="gemini", model="gemini-2.0-flash")
result = await run("Summarize this document", source=Source.from_file("doc.pdf"), config=config)
first_answer = next(iter(result["answers"]), "")
print(first_answer)

Source code in src/pollux/__init__.py
async def run(
    prompt: str | None = None,
    *,
    source: Source | None = None,
    config: Config,
    options: Options | None = None,
) -> ResultEnvelope:
    """Run a single prompt, optionally with a source for context.

    Args:
        prompt: The prompt to run.
        source: Optional source for context (file, text, URL).
        config: Configuration specifying provider and model.
        options: Optional additive features such as schema or reasoning controls.

    Returns:
        ResultEnvelope with answers and metrics.

    Example:
        config = Config(provider="gemini", model="gemini-2.0-flash")
        result = await run("Summarize this document", source=Source.from_file("doc.pdf"), config=config)
        first_answer = next(iter(result["answers"]), "")
        print(first_answer)
    """
    sources = (source,) if source else ()
    return await run_many(prompt, sources=sources, config=config, options=options)

run_many async

Run multiple prompts with shared sources for source-pattern execution.

Parameters:

    prompts (str | Sequence[str | None] | None, default None): One or more prompts to run.
    sources (Sequence[Source], default ()): Optional sources for shared context.
    config (Config, required): Configuration specifying provider and model.
    options (Options | None, default None): Optional additive features such as schema or reasoning controls.

Returns:

    ResultEnvelope: ResultEnvelope with answers (one per prompt) and metrics.

Example

config = Config(provider="gemini", model="gemini-2.0-flash")
result = await run_many(
    ["Question 1?", "Question 2?"],
    sources=[Source.from_text("Context...")],
    config=config,
)
for answer in result["answers"]:
    print(answer)

Source code in src/pollux/__init__.py
async def run_many(
    prompts: str | Sequence[str | None] | None = None,
    *,
    sources: Sequence[Source] = (),
    config: Config,
    options: Options | None = None,
) -> ResultEnvelope:
    """Run multiple prompts with shared sources for source-pattern execution.

    Args:
        prompts: One or more prompts to run.
        sources: Optional sources for shared context.
        config: Configuration specifying provider and model.
        options: Optional additive features such as schema or reasoning controls.

    Returns:
        ResultEnvelope with answers (one per prompt) and metrics.

    Example:
        config = Config(provider="gemini", model="gemini-2.0-flash")
        result = await run_many(
            ["Question 1?", "Question 2?"],
            sources=[Source.from_text("Context...")],
            config=config,
        )
        for answer in result["answers"]:
            print(answer)
    """
    request = normalize_request(prompts, sources, config, options=options)
    plan = build_plan(request)
    provider = _get_provider(request.config)

    try:
        trace = await execute_plan(plan, provider)
    finally:
        await _close_provider(provider)

    return build_result(plan, trace)
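A ResultEnvelope is dict-like, and the examples above read its "answers" sequence defensively with next(iter(...), ""). A minimal standalone illustration (the envelope contents here are made up, not produced by pollux):

```python
# A ResultEnvelope behaves like a dict with an "answers" sequence.
# This envelope is illustrative only.
envelope = {"answers": ["First answer", "Second answer"], "metrics": {}}

# Safe first-answer read: falls back to "" when answers is empty.
first = next(iter(envelope["answers"]), "")
empty = next(iter([]), "")
```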

defer async

Submit a single deferred request and return a serializable handle.

Source code in src/pollux/__init__.py
async def defer(
    prompt: str | None = None,
    *,
    source: Source | None = None,
    config: Config,
    options: Options | None = None,
) -> DeferredHandle:
    """Submit a single deferred request and return a serializable handle."""
    sources = (source,) if source else ()
    return await defer_many(prompt, sources=sources, config=config, options=options)

defer_many async

Submit deferred work and return a handle for later inspection/collection.

Source code in src/pollux/__init__.py
async def defer_many(
    prompts: str | Sequence[str | None] | None = None,
    *,
    sources: Sequence[Source] = (),
    config: Config,
    options: Options | None = None,
) -> DeferredHandle:
    """Submit deferred work and return a handle for later inspection/collection."""
    request = normalize_request(prompts, sources, config, options=options)
    if not request.prompts:
        raise ConfigurationError(
            "defer_many() requires at least one prompt",
            hint="Pass one or more prompts, or use run_many(prompts=[]) for a realtime no-op.",
        )
    plan = build_plan(request)
    provider = _get_provider(request.config)

    try:
        return await submit_deferred(plan, provider)
    finally:
        await _close_provider(provider)

inspect_deferred async

Inspect the current state of a deferred job.

Source code in src/pollux/__init__.py
async def inspect_deferred(
    handle: DeferredHandle,
) -> DeferredSnapshot:
    """Inspect the current state of a deferred job."""
    provider = _resolve_deferred_provider(handle)
    try:
        return await inspect_deferred_handle(handle, provider)
    finally:
        await _close_provider(provider)

collect_deferred async

Collect a terminal deferred job into the standard ResultEnvelope.

Parameters:

    handle (DeferredHandle, required): The deferred handle returned by defer() / defer_many().
    response_schema (type[Any] | dict[str, Any] | None, default None): Optional Pydantic model or JSON Schema for structured output rehydration. Must match the schema used at submission time.
Source code in src/pollux/__init__.py
async def collect_deferred(
    handle: DeferredHandle,
    *,
    response_schema: type[Any] | dict[str, Any] | None = None,
) -> ResultEnvelope:
    """Collect a terminal deferred job into the standard ResultEnvelope.

    Args:
        handle: The deferred handle returned by ``defer()`` / ``defer_many()``.
        response_schema: Optional Pydantic model or JSON Schema for structured
            output rehydration. Must match the schema used at submission time.
    """
    provider = _resolve_deferred_provider(handle)
    try:
        return await collect_deferred_handle(
            handle,
            provider,
            response_schema=response_schema,
        )
    finally:
        await _close_provider(provider)

cancel_deferred async

Request provider-side cancellation for a deferred job.

Source code in src/pollux/__init__.py
async def cancel_deferred(
    handle: DeferredHandle,
) -> None:
    """Request provider-side cancellation for a deferred job."""
    provider = _resolve_deferred_provider(handle)
    try:
        await cancel_deferred_handle(handle, provider)
    finally:
        await _close_provider(provider)
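Handles from defer() / defer_many() are serializable, so they can outlive the submitting process. A minimal persistence sketch (the field names are illustrative, not the actual DeferredHandle schema):

```python
import json

# Hypothetical handle contents; the real DeferredHandle fields may differ.
handle = {"provider": "gemini", "job_id": "job-123"}

saved = json.dumps(handle)    # persist after defer(), e.g. write to disk
restored = json.loads(saved)  # later process: feed back into collect_deferred()
```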

continue_tool async

Continue a conversation with the results of tool calls.

Parameters:

    continue_from (ResultEnvelope, required): The previous ResultEnvelope containing tool calls.
    tool_results (list[dict[str, Any]], required): List of tool results as dicts (each must provide 'role': 'tool', 'tool_call_id', and 'content').
    config (Config, required): Configuration specifying provider and model.
    options (Options | None, default None): Optional additive features.

Returns:

    ResultEnvelope: ResultEnvelope with the model's next response.

Source code in src/pollux/__init__.py
async def continue_tool(
    continue_from: ResultEnvelope,
    tool_results: list[dict[str, Any]],
    *,
    config: Config,
    options: Options | None = None,
) -> ResultEnvelope:
    """Continue a conversation with the results of tool calls.

    Args:
        continue_from: The previous ResultEnvelope containing tool calls.
        tool_results: List of tool results as dicts (must provide 'role': 'tool',
            'tool_call_id', and 'content').
        config: Configuration specifying provider and model.
        options: Optional additive features.

    Returns:
        ResultEnvelope with the model's next response.
    """
    import copy

    new_state = copy.deepcopy(continue_from.get("_conversation_state", {}))
    new_state["history"] = new_state.get("history", []) + tool_results

    # Build a synthetic envelope to carry the updated state and response_id
    synthetic_envelope: ResultEnvelope = {"_conversation_state": new_state}

    # Merge options, favoring the synthetic continue_from
    # Do not mutate the caller's options object
    if options is None:
        merged_options = Options(continue_from=synthetic_envelope)
    else:
        # copy dict and strip conflicting fields
        kwargs = dict(options.__dict__)
        kwargs.pop("history", None)
        kwargs.pop("continue_from", None)
        merged_options = Options(continue_from=synthetic_envelope, **kwargs)

    return await run(prompt=None, config=config, options=merged_options)
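Each tool_results entry must carry 'role': 'tool', a 'tool_call_id' matching the model's call, and the tool's 'content'. A hypothetical helper (make_tool_result is not part of pollux) showing the expected shape:

```python
from typing import Any

def make_tool_result(tool_call_id: str, content: str) -> dict[str, Any]:
    """Build one tool-result entry in the shape continue_tool() expects."""
    return {"role": "tool", "tool_call_id": tool_call_id, "content": content}

# continue_tool() appends entries like these to a copy of the history.
results = [make_tool_result("call_1", '{"temp_c": 21}')]
```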

create_cache async

Create a persistent context cache for use with run() / run_many().

Parameters:

    sources (Sequence[Source], required): Content to cache (files, text, URLs).
    config (Config, required): Configuration specifying provider and model.
    system_instruction (str | None, default None): Optional system-level instruction baked into the cache.
    tools (list[dict[str, Any]] | list[Any] | None, default None): Optional tools to bake into the cache.
    ttl_seconds (int, default 3600): Time-to-live in seconds (must be ≥ 1).

Returns:

    CacheHandle: A CacheHandle that can be passed via Options(cache=handle).

Raises:

    ConfigurationError: If the provider does not support persistent caching or ttl_seconds is invalid.

Example

cfg = Config(provider="gemini", model="gemini-2.5-flash")
handle = await create_cache(
    [Source.from_file("book.pdf")],
    config=cfg,
    ttl_seconds=3600,
)
result = await run("Summarize.", config=cfg, options=Options(cache=handle))

Source code in src/pollux/__init__.py
async def create_cache(
    sources: Sequence[Source],
    *,
    config: Config,
    system_instruction: str | None = None,
    tools: list[dict[str, Any]] | list[Any] | None = None,
    ttl_seconds: int = 3600,
) -> CacheHandle:
    """Create a persistent context cache for use with ``run()`` / ``run_many()``.

    Args:
        sources: Content to cache (files, text, URLs).
        config: Configuration specifying provider and model.
        system_instruction: Optional system-level instruction baked into the cache.
        tools: Optional tools to bake into the cache.
        ttl_seconds: Time-to-live in seconds (must be ≥ 1).

    Returns:
        A ``CacheHandle`` that can be passed via ``Options(cache=handle)``.

    Raises:
        ConfigurationError: If the provider does not support persistent caching
            or *ttl_seconds* is invalid.

    Example:
        cfg = Config(provider="gemini", model="gemini-2.5-flash")
        handle = await create_cache(
            [Source.from_file("book.pdf")],
            config=cfg,
            ttl_seconds=3600,
        )
        result = await run("Summarize.", config=cfg, options=Options(cache=handle))
    """
    from pollux.cache import create_cache_impl

    provider = _get_provider(config)
    try:
        return await create_cache_impl(
            sources,
            provider=provider,
            config=config,
            system_instruction=system_instruction,
            tools=tools,
            ttl_seconds=ttl_seconds,
        )
    finally:
        await _close_provider(provider)

Core Types

Source includes both the generic source constructors and narrow provider-specific helpers such as Source.with_gemini_video_settings(...).

Source dataclass

A structured representation of a single input source.

Source code in src/pollux/source.py
@dataclass(frozen=True, slots=True)
class Source:
    """A structured representation of a single input source."""

    source_type: SourceType
    identifier: str
    mime_type: str
    size_bytes: int
    content_loader: Callable[[], bytes]
    provider_hints: tuple[ProviderHint, ...] = ()

    @classmethod
    def from_text(cls, text: str, *, identifier: str | None = None) -> Source:
        """Create a Source from text content.

        Args:
            text: The text content.
            identifier: Display label. Defaults to the first 50 characters of *text*.
        """
        content = text.encode("utf-8")
        ident = identifier or text[:50]
        return cls(
            source_type="text",
            identifier=ident,
            mime_type="text/plain",
            size_bytes=len(content),
            content_loader=lambda: content,
        )

    @classmethod
    def from_json(
        cls, data: dict[str, Any] | Any, *, identifier: str | None = None
    ) -> Source:
        """Create a Source from a JSON-serializable object.

        Args:
            data: A dictionary or object to serialize into a JSON string. If the object
                has a `model_dump()` method (like Pydantic models), it will be used.
            identifier: Display label. Defaults to "json-data".
        """
        if hasattr(data, "model_dump") and callable(data.model_dump):
            data = data.model_dump()

        try:
            content = json.dumps(data).encode("utf-8")
        except TypeError as exc:
            raise SourceError(f"Data is not JSON serializable: {exc}") from exc

        ident = identifier or "json-data"
        return cls(
            source_type="json",
            identifier=ident,
            mime_type="application/json",
            size_bytes=len(content),
            content_loader=lambda: content,
        )

    @classmethod
    def from_file(cls, path: str | Path, *, mime_type: str | None = None) -> Source:
        """Create a Source from a local file.

        Args:
            path: Path to the file. Must exist or ``SourceError`` is raised.
            mime_type: MIME type override. Auto-detected from extension when *None*.
        """
        p = Path(path)
        if not p.exists():
            raise SourceError(f"File not found: {p}")

        mt = mime_type or mimetypes.guess_type(str(p))[0] or "application/octet-stream"
        size = p.stat().st_size

        def loader() -> bytes:
            return p.read_bytes()

        return cls(
            source_type="file",
            identifier=str(p),
            mime_type=mt,
            size_bytes=size,
            content_loader=loader,
        )

    @classmethod
    def from_youtube(cls, url: str) -> Source:
        """Create a Source from a YouTube URL reference (no download)."""
        encoded = f"youtube:{url}".encode()
        return cls(
            source_type="youtube",
            identifier=url,
            mime_type="video/mp4",
            size_bytes=0,
            content_loader=lambda: encoded,
        )

    @classmethod
    def from_uri(
        cls, uri: str, *, mime_type: str = "application/octet-stream"
    ) -> Source:
        """Create a Source from a URI.

        Args:
            uri: Remote URI (e.g. ``gs://`` or ``https://``).
            mime_type: MIME type. Defaults to ``application/octet-stream``.
        """
        encoded = f"uri:{mime_type}:{uri}".encode()
        return cls(
            source_type="uri",
            identifier=uri,
            mime_type=mime_type,
            size_bytes=0,
            content_loader=lambda: encoded,
        )

    @classmethod
    def from_arxiv(cls, ref: str) -> Source:
        """Create an arXiv PDF Source from an arXiv ID or URL.

        Args:
            ref: An arXiv ID (e.g. ``"2301.07041"``) or full arXiv URL.
        """
        if not isinstance(ref, str):
            raise TypeError("ref must be a str")

        normalized_url = cls._normalize_arxiv_to_pdf_url(ref.strip())
        encoded = normalized_url.encode("utf-8")
        return cls(
            source_type="arxiv",
            identifier=normalized_url,
            mime_type="application/pdf",
            size_bytes=0,
            content_loader=lambda: encoded,
        )

    @staticmethod
    def _normalize_arxiv_to_pdf_url(ref: str) -> str:
        """Normalize arXiv id or URL to canonical PDF URL."""
        if not ref:
            raise SourceError("arXiv reference cannot be empty")

        arxiv_id = ref
        if ref.startswith(("http://", "https://")):
            parsed = urlparse(ref)
            host = parsed.netloc.lower()
            if host not in _ARXIV_HOSTS:
                raise SourceError(f"Expected arxiv.org URL, got: {parsed.netloc}")

            path = parsed.path.strip("/")
            if path.startswith("abs/"):
                arxiv_id = path[len("abs/") :]
            elif path.startswith("pdf/"):
                arxiv_id = path[len("pdf/") :]
            else:
                raise SourceError(f"Unsupported arXiv URL path: {parsed.path}")

        if arxiv_id.endswith(".pdf"):
            arxiv_id = arxiv_id[:-4]

        arxiv_id = arxiv_id.strip("/")
        if not _ARXIV_ID_RE.match(arxiv_id):
            raise SourceError(f"Invalid arXiv id: {arxiv_id}")

        return f"https://arxiv.org/pdf/{arxiv_id}.pdf"

    def _content_hash(self) -> str:
        """Compute SHA256 hash of raw content bytes."""
        content = self.content_loader()
        return hashlib.sha256(content).hexdigest()

    def gemini_video_settings_for(
        self, provider: str | None
    ) -> dict[str, str | float] | None:
        """Return Gemini video settings when the active provider can use them."""
        provider_hints = self.provider_hints_for(provider)
        if provider_hints is None:
            return None
        return provider_hints.get("video_metadata")

    def provider_hints_for(
        self, provider: str | None
    ) -> dict[str, dict[str, str | float]] | None:
        """Return immutable provider hints as plain dictionaries for transport."""
        if provider is None:
            return None

        hints = {
            hint.name: hint.payload_dict()
            for hint in self.provider_hints
            if hint.provider == provider
        }
        return hints or None

    def cache_identity_hash(self, *, provider: str | None = None) -> str:
        """Compute SHA256 hash for cache identity.

        Includes provider-visible source semantics such as Gemini video settings.
        Falls back to raw content hash when no provider-specific settings apply,
        preserving backward-compatible cache keys.
        """
        provider_hints = self.provider_hints_for(provider)
        if provider_hints is None:
            return self._content_hash()
        combined = (
            self._content_hash()
            + "|"
            + json.dumps(
                provider_hints,
                sort_keys=True,
                separators=(",", ":"),
            )
        )
        return hashlib.sha256(combined.encode("utf-8")).hexdigest()

    def with_gemini_video_settings(
        self,
        *,
        start_offset: str | None = None,
        end_offset: str | None = None,
        fps: float | None = None,
    ) -> Source:
        """Return a copy with validated Gemini video controls attached.

        Pollux keeps this API provider-specific and stable even if Google's
        underlying wire fields evolve. The Gemini adapter maps these settings
        to the current SDK request shape. These settings only affect Gemini
        requests and Gemini explicit-cache identity.
        """
        if not self._is_video_source():
            raise SourceError(
                "Gemini video settings require a video source "
                "(local video file, video URI, or YouTube URL)"
            )

        validated_start_offset: str | None = None
        validated_end_offset: str | None = None
        validated_fps: float | None = None

        if start_offset is not None:
            if not isinstance(start_offset, str) or not start_offset.strip():
                raise SourceError("start_offset must be a non-empty string")
            validated_start_offset = start_offset

        if end_offset is not None:
            if not isinstance(end_offset, str) or not end_offset.strip():
                raise SourceError("end_offset must be a non-empty string")
            validated_end_offset = end_offset

        if fps is not None:
            if isinstance(fps, bool) or not isinstance(fps, (int, float)):
                raise SourceError("fps must be a number")
            fps_value = float(fps)
            if fps_value <= 0 or fps_value > 24:
                raise SourceError("fps must be > 0 and <= 24")
            validated_fps = fps_value

        if (
            validated_start_offset is None
            and validated_end_offset is None
            and validated_fps is None
        ):
            raise SourceError(
                "Provide at least one Gemini video setting: "
                "start_offset, end_offset, or fps"
            )

        return replace(
            self,
            provider_hints=self._with_provider_hint(
                provider="gemini",
                name="video_metadata",
                payload={
                    k: v
                    for k, v in (
                        ("start_offset", validated_start_offset),
                        ("end_offset", validated_end_offset),
                        ("fps", validated_fps),
                    )
                    if v is not None
                },
            ),
        )

    def _is_video_source(self) -> bool:
        """Return True when Gemini video controls can apply to this source."""
        return self.source_type == "youtube" or self.mime_type.startswith("video/")

    def _with_provider_hint(
        self,
        *,
        provider: str,
        name: str,
        payload: dict[str, str | float],
    ) -> tuple[ProviderHint, ...]:
        """Return provider hints with one named hint replaced or added."""
        hint = ProviderHint(
            provider=provider,
            name=name,
            payload=tuple(sorted(payload.items())),
        )
        existing = tuple(
            item
            for item in self.provider_hints
            if not (item.provider == provider and item.name == name)
        )
        return (*existing, hint)

Functions

from_text classmethod

from_text(text, *, identifier=None)

Create a Source from text content.

Parameters:

    text (str, required): The text content.
    identifier (str | None, default None): Display label. Defaults to the first 50 characters of text.
Source code in src/pollux/source.py
@classmethod
def from_text(cls, text: str, *, identifier: str | None = None) -> Source:
    """Create a Source from text content.

    Args:
        text: The text content.
        identifier: Display label. Defaults to the first 50 characters of *text*.
    """
    content = text.encode("utf-8")
    ident = identifier or text[:50]
    return cls(
        source_type="text",
        identifier=ident,
        mime_type="text/plain",
        size_bytes=len(content),
        content_loader=lambda: content,
    )

from_json classmethod

from_json(data, *, identifier=None)

Create a Source from a JSON-serializable object.

Parameters:

    data (dict[str, Any] | Any, required): A dictionary or object to serialize into a JSON string. If the object has a model_dump() method (like Pydantic models), it will be used.
    identifier (str | None, default None): Display label. Defaults to "json-data".
Source code in src/pollux/source.py
@classmethod
def from_json(
    cls, data: dict[str, Any] | Any, *, identifier: str | None = None
) -> Source:
    """Create a Source from a JSON-serializable object.

    Args:
        data: A dictionary or object to serialize into a JSON string. If the object
            has a `model_dump()` method (like Pydantic models), it will be used.
        identifier: Display label. Defaults to "json-data".
    """
    if hasattr(data, "model_dump") and callable(data.model_dump):
        data = data.model_dump()

    try:
        content = json.dumps(data).encode("utf-8")
    except TypeError as exc:
        raise SourceError(f"Data is not JSON serializable: {exc}") from exc

    ident = identifier or "json-data"
    return cls(
        source_type="json",
        identifier=ident,
        mime_type="application/json",
        size_bytes=len(content),
        content_loader=lambda: content,
    )

from_file classmethod

from_file(path, *, mime_type=None)

Create a Source from a local file.

Parameters:

    path (str | Path, required): Path to the file. Must exist or SourceError is raised.
    mime_type (str | None, default None): MIME type override. Auto-detected from extension when None.
Source code in src/pollux/source.py
@classmethod
def from_file(cls, path: str | Path, *, mime_type: str | None = None) -> Source:
    """Create a Source from a local file.

    Args:
        path: Path to the file. Must exist or ``SourceError`` is raised.
        mime_type: MIME type override. Auto-detected from extension when *None*.
    """
    p = Path(path)
    if not p.exists():
        raise SourceError(f"File not found: {p}")

    mt = mime_type or mimetypes.guess_type(str(p))[0] or "application/octet-stream"
    size = p.stat().st_size

    def loader() -> bytes:
        return p.read_bytes()

    return cls(
        source_type="file",
        identifier=str(p),
        mime_type=mt,
        size_bytes=size,
        content_loader=loader,
    )
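The auto-detection fallback used by from_file can be checked directly: mimetypes guesses from the file extension, with application/octet-stream as the last resort.

```python
import mimetypes

# Known extension: resolved from the standard type map.
mt_pdf = mimetypes.guess_type("report.pdf")[0] or "application/octet-stream"

# Unknown extension: falls back to the generic binary type.
mt_unknown = mimetypes.guess_type("data.xyzq")[0] or "application/octet-stream"
```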

from_youtube classmethod

from_youtube(url)

Create a Source from a YouTube URL reference (no download).

Source code in src/pollux/source.py
@classmethod
def from_youtube(cls, url: str) -> Source:
    """Create a Source from a YouTube URL reference (no download)."""
    encoded = f"youtube:{url}".encode()
    return cls(
        source_type="youtube",
        identifier=url,
        mime_type="video/mp4",
        size_bytes=0,
        content_loader=lambda: encoded,
    )

from_uri classmethod

from_uri(uri, *, mime_type='application/octet-stream')

Create a Source from a URI.

Parameters:

    uri (str, required): Remote URI (e.g. gs:// or https://).
    mime_type (str, default 'application/octet-stream'): MIME type.
Source code in src/pollux/source.py
@classmethod
def from_uri(
    cls, uri: str, *, mime_type: str = "application/octet-stream"
) -> Source:
    """Create a Source from a URI.

    Args:
        uri: Remote URI (e.g. ``gs://`` or ``https://``).
        mime_type: MIME type. Defaults to ``application/octet-stream``.
    """
    encoded = f"uri:{mime_type}:{uri}".encode()
    return cls(
        source_type="uri",
        identifier=uri,
        mime_type=mime_type,
        size_bytes=0,
        content_loader=lambda: encoded,
    )

from_arxiv classmethod

from_arxiv(ref)

Create an arXiv PDF Source from an arXiv ID or URL.

Parameters:

    ref (str, required): An arXiv ID (e.g. "2301.07041") or full arXiv URL.
Source code in src/pollux/source.py
@classmethod
def from_arxiv(cls, ref: str) -> Source:
    """Create an arXiv PDF Source from an arXiv ID or URL.

    Args:
        ref: An arXiv ID (e.g. ``"2301.07041"``) or full arXiv URL.
    """
    if not isinstance(ref, str):
        raise TypeError("ref must be a str")

    normalized_url = cls._normalize_arxiv_to_pdf_url(ref.strip())
    encoded = normalized_url.encode("utf-8")
    return cls(
        source_type="arxiv",
        identifier=normalized_url,
        mime_type="application/pdf",
        size_bytes=0,
        content_loader=lambda: encoded,
    )
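The normalization behind from_arxiv can be sketched standalone. The regex below is an assumption standing in for pollux's _ARXIV_ID_RE, and this sketch skips the host and path validation the real helper performs:

```python
import re
from urllib.parse import urlparse

# Assumed pattern for modern arXiv IDs; the real _ARXIV_ID_RE may differ.
ARXIV_ID_RE = re.compile(r"^\d{4}\.\d{4,5}(v\d+)?$")

def arxiv_pdf_url(ref: str) -> str:
    arxiv_id = ref.strip()
    if arxiv_id.startswith(("http://", "https://")):
        # Drop the leading "abs/" or "pdf/" path segment.
        arxiv_id = urlparse(arxiv_id).path.strip("/").split("/", 1)[1]
    if arxiv_id.endswith(".pdf"):
        arxiv_id = arxiv_id[:-4]
    arxiv_id = arxiv_id.strip("/")
    if not ARXIV_ID_RE.match(arxiv_id):
        raise ValueError(f"Invalid arXiv id: {arxiv_id}")
    return f"https://arxiv.org/pdf/{arxiv_id}.pdf"
```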

gemini_video_settings_for

gemini_video_settings_for(provider)

Return Gemini video settings when the active provider can use them.

Source code in src/pollux/source.py
def gemini_video_settings_for(
    self, provider: str | None
) -> dict[str, str | float] | None:
    """Return Gemini video settings when the active provider can use them."""
    provider_hints = self.provider_hints_for(provider)
    if provider_hints is None:
        return None
    return provider_hints.get("video_metadata")

provider_hints_for

provider_hints_for(provider)

Return immutable provider hints as plain dictionaries for transport.

Source code in src/pollux/source.py
def provider_hints_for(
    self, provider: str | None
) -> dict[str, dict[str, str | float]] | None:
    """Return immutable provider hints as plain dictionaries for transport."""
    if provider is None:
        return None

    hints = {
        hint.name: hint.payload_dict()
        for hint in self.provider_hints
        if hint.provider == provider
    }
    return hints or None

cache_identity_hash

cache_identity_hash(*, provider=None)

Compute SHA256 hash for cache identity.

Includes provider-visible source semantics such as Gemini video settings. Falls back to raw content hash when no provider-specific settings apply, preserving backward-compatible cache keys.

Source code in src/pollux/source.py
def cache_identity_hash(self, *, provider: str | None = None) -> str:
    """Compute SHA256 hash for cache identity.

    Includes provider-visible source semantics such as Gemini video settings.
    Falls back to raw content hash when no provider-specific settings apply,
    preserving backward-compatible cache keys.
    """
    provider_hints = self.provider_hints_for(provider)
    if provider_hints is None:
        return self._content_hash()
    combined = (
        self._content_hash()
        + "|"
        + json.dumps(
            provider_hints,
            sort_keys=True,
            separators=(",", ":"),
        )
    )
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()

with_gemini_video_settings

with_gemini_video_settings(
    *, start_offset=None, end_offset=None, fps=None
)

Return a copy with validated Gemini video controls attached.

Pollux keeps this API provider-specific and stable even if Google's underlying wire fields evolve. The Gemini adapter maps these settings to the current SDK request shape. These settings only affect Gemini requests and Gemini explicit-cache identity.

Source code in src/pollux/source.py
def with_gemini_video_settings(
    self,
    *,
    start_offset: str | None = None,
    end_offset: str | None = None,
    fps: float | None = None,
) -> Source:
    """Return a copy with validated Gemini video controls attached.

    Pollux keeps this API provider-specific and stable even if Google's
    underlying wire fields evolve. The Gemini adapter maps these settings
    to the current SDK request shape. These settings only affect Gemini
    requests and Gemini explicit-cache identity.
    """
    if not self._is_video_source():
        raise SourceError(
            "Gemini video settings require a video source "
            "(local video file, video URI, or YouTube URL)"
        )

    validated_start_offset: str | None = None
    validated_end_offset: str | None = None
    validated_fps: float | None = None

    if start_offset is not None:
        if not isinstance(start_offset, str) or not start_offset.strip():
            raise SourceError("start_offset must be a non-empty string")
        validated_start_offset = start_offset

    if end_offset is not None:
        if not isinstance(end_offset, str) or not end_offset.strip():
            raise SourceError("end_offset must be a non-empty string")
        validated_end_offset = end_offset

    if fps is not None:
        if isinstance(fps, bool) or not isinstance(fps, (int, float)):
            raise SourceError("fps must be a number")
        fps_value = float(fps)
        if fps_value <= 0 or fps_value > 24:
            raise SourceError("fps must be > 0 and <= 24")
        validated_fps = fps_value

    if (
        validated_start_offset is None
        and validated_end_offset is None
        and validated_fps is None
    ):
        raise SourceError(
            "Provide at least one Gemini video setting: "
            "start_offset, end_offset, or fps"
        )

    return replace(
        self,
        provider_hints=self._with_provider_hint(
            provider="gemini",
            name="video_metadata",
            payload={
                k: v
                for k, v in (
                    ("start_offset", validated_start_offset),
                    ("end_offset", validated_end_offset),
                    ("fps", validated_fps),
                )
                if v is not None
            },
        ),
    )
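The `fps` checks deserve a closer look: because `bool` is a subclass of `int` in Python, `True` would otherwise pass the numeric check, so it is rejected explicitly. A standalone sketch of just that validation:

```python
def validate_fps(fps: object) -> float:
    """Mirror of the fps checks in with_gemini_video_settings (standalone sketch)."""
    # bool is a subclass of int, so reject it before the numeric check.
    if isinstance(fps, bool) or not isinstance(fps, (int, float)):
        raise ValueError("fps must be a number")
    value = float(fps)
    if value <= 0 or value > 24:
        raise ValueError("fps must be > 0 and <= 24")
    return value


print(validate_fps(5))      # accepted, normalized to float
print(validate_fps(0.5))    # sub-1 sampling rates are allowed
```

The real method raises `SourceError` rather than `ValueError`; this sketch substitutes the builtin to stay self-contained.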

Opaque handle returned by create_cache().

Pass instances via Options(cache=handle) to reuse a persistent context cache across run() / run_many() calls.

Source code in src/pollux/cache.py
@dataclass(frozen=True)
class CacheHandle:
    """Opaque handle returned by ``create_cache()``.

    Pass instances via ``Options(cache=handle)`` to reuse a persistent
    context cache across ``run()`` / ``run_many()`` calls.
    """

    name: str
    model: str
    provider: str
    expires_at: float

Immutable configuration for Pollux execution.

Provider and model are required—Pollux does not guess what you want. API keys are auto-resolved from standard environment variables.

Example

config = Config(provider="gemini", model="gemini-2.0-flash")

API key is automatically resolved from GEMINI_API_KEY

Source code in src/pollux/config.py
@dataclass(frozen=True)
class Config:
    """Immutable configuration for Pollux execution.

    Provider and model are required—Pollux does not guess what you want.
    API keys are auto-resolved from standard environment variables.

    Example:
        config = Config(provider="gemini", model="gemini-2.0-flash")
        # API key is automatically resolved from GEMINI_API_KEY
    """

    provider: ProviderName
    model: str
    #: Auto-resolved from the provider-specific API key env var when *None*.
    api_key: str | None = None
    use_mock: bool = False
    request_concurrency: int = 6
    retry: RetryPolicy = field(default_factory=RetryPolicy)

    def __post_init__(self) -> None:
        """Auto-resolve API key and validate configuration."""
        # Validate provider
        if self.provider not in ("gemini", "openai", "anthropic", "openrouter"):
            raise ConfigurationError(
                f"Unknown provider: {self.provider!r}",
                hint=(
                    "Supported providers: 'gemini', 'openai', 'anthropic', 'openrouter'"
                ),
            )

        # Validate numeric fields
        if not isinstance(self.request_concurrency, int):
            raise ConfigurationError(
                f"request_concurrency must be an integer, got {type(self.request_concurrency).__name__}",
                hint="Pass a whole number ≥ 1 for request_concurrency.",
            )
        if self.request_concurrency < 1:
            raise ConfigurationError(
                f"request_concurrency must be ≥ 1, got {self.request_concurrency}",
                hint="This controls how many API calls run in parallel.",
            )

        # Auto-resolve API key from environment if not provided
        if self.api_key is None and not self.use_mock:
            object.__setattr__(self, "api_key", resolve_api_key(self.provider))

        # Validate: real API calls need a key
        if not self.use_mock and not self.api_key:
            env_var = _API_KEY_ENV_VARS[self.provider]
            raise ConfigurationError(
                f"API key required for {self.provider}",
                hint=f"Set {env_var} environment variable or pass api_key=...",
            )

    def __str__(self) -> str:
        """Return a redacted, developer-friendly representation."""
        return (
            f"Config(provider={self.provider!r}, model={self.model!r}, "
            f"api_key={'[REDACTED]' if self.api_key else None}, use_mock={self.use_mock})"
        )

    __repr__ = __str__

Serializable Pollux handle for a deferred job.

Source code in src/pollux/deferred.py
@dataclass(frozen=True)
class DeferredHandle:
    """Serializable Pollux handle for a deferred job."""

    job_id: str
    provider: str
    model: str
    request_count: int
    submitted_at: float
    schema_hash: str | None = None
    provider_state: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the handle for persistence."""
        return dict(asdict(self))

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> DeferredHandle:
        """Rebuild a handle from serialized data."""
        return cls(
            job_id=str(data["job_id"]),
            provider=str(data["provider"]),
            model=str(data["model"]),
            request_count=int(data["request_count"]),
            submitted_at=float(data["submitted_at"]),
            schema_hash=(
                None
                if data.get("schema_hash") is None
                else str(data.get("schema_hash"))
            ),
            provider_state=(
                dict(data["provider_state"])
                if isinstance(data.get("provider_state"), dict)
                else None
            ),
        )

Functions

to_dict

to_dict()

Serialize the handle for persistence.

Source code in src/pollux/deferred.py
def to_dict(self) -> dict[str, Any]:
    """Serialize the handle for persistence."""
    return dict(asdict(self))

from_dict classmethod

from_dict(data)

Rebuild a handle from serialized data.

Source code in src/pollux/deferred.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> DeferredHandle:
    """Rebuild a handle from serialized data."""
    return cls(
        job_id=str(data["job_id"]),
        provider=str(data["provider"]),
        model=str(data["model"]),
        request_count=int(data["request_count"]),
        submitted_at=float(data["submitted_at"]),
        schema_hash=(
            None
            if data.get("schema_hash") is None
            else str(data.get("schema_hash"))
        ),
        provider_state=(
            dict(data["provider_state"])
            if isinstance(data.get("provider_state"), dict)
            else None
        ),
    )
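Because the handle is plain data, it survives a JSON round-trip, which is the point of `to_dict` / `from_dict`. This self-contained sketch reuses the class definition shown above:

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class DeferredHandle:
    """Copy of the dataclass shown above, for a standalone demo."""

    job_id: str
    provider: str
    model: str
    request_count: int
    submitted_at: float
    schema_hash: str | None = None
    provider_state: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        return dict(asdict(self))

    @classmethod
    def from_dict(cls, data: Mapping[str, Any]) -> DeferredHandle:
        return cls(
            job_id=str(data["job_id"]),
            provider=str(data["provider"]),
            model=str(data["model"]),
            request_count=int(data["request_count"]),
            submitted_at=float(data["submitted_at"]),
            schema_hash=(
                None if data.get("schema_hash") is None else str(data["schema_hash"])
            ),
            provider_state=(
                dict(data["provider_state"])
                if isinstance(data.get("provider_state"), dict)
                else None
            ),
        )


handle = DeferredHandle("job-1", "openai", "gpt-4o", 3, 1700000000.0)
# Persist to JSON (e.g. a file or database), then rebuild later.
restored = DeferredHandle.from_dict(json.loads(json.dumps(handle.to_dict())))
assert restored == handle
```

Note the defensive coercions in `from_dict`: fields read back from loosely typed storage (strings for counts, ints for timestamps) are normalized rather than trusted.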

Normalized snapshot of a deferred job lifecycle.

Source code in src/pollux/deferred.py
@dataclass(frozen=True)
class DeferredSnapshot:
    """Normalized snapshot of a deferred job lifecycle."""

    job_id: str
    provider: str
    model: str
    status: DeferredStatus
    provider_status: str
    request_count: int
    succeeded: int
    failed: int
    pending: int
    submitted_at: float
    completed_at: float | None = None
    expires_at: float | None = None

    @property
    def is_terminal(self) -> bool:
        """Return True when the job is ready to collect or permanently done."""
        return self.status in _TERMINAL_STATUSES

Attributes

is_terminal property

is_terminal

Return True when the job is ready to collect or permanently done.

Optional execution features for run() and run_many().

Source code in src/pollux/options.py
@dataclass(frozen=True)
class Options:
    """Optional execution features for `run()` and `run_many()`."""

    #: Optional system-level instruction for model behavior.
    system_instruction: str | None = None
    #: Pydantic ``BaseModel`` subclass or JSON Schema dict for structured output.
    response_schema: ResponseSchemaInput | None = None

    #: Core tool calling parameters
    tools: list[dict[str, Any]] | None = None
    tool_choice: Literal["auto", "required", "none"] | dict[str, Any] | None = None

    #: Generation tuning parameters
    temperature: float | None = None
    top_p: float | None = None

    #: Controls model thinking depth; passed through to the provider.
    reasoning_effort: ReasoningEffort | None = None
    #: Legacy compatibility shim. Realtime remains the only supported value.
    delivery_mode: DeliveryMode = "realtime"
    #: Mutually exclusive with *continue_from*.
    history: list[dict[str, Any]] | None = None
    #: Mutually exclusive with *history*.
    continue_from: ResultEnvelope | None = None
    #: Hard limit on the model's total output tokens. Provider-specific semantics.
    max_tokens: int | None = None
    #: Persistent context cache obtained from ``create_cache()``.
    cache: CacheHandle | None = None
    #: Controls implicit model-level caching (e.g., Anthropic prefix caching).
    #: Defaults to True for a single provider call, False for multi-call fan-out.
    implicit_caching: bool | None = None

    def __post_init__(self) -> None:
        """Validate option shapes early for clear errors."""
        if self.system_instruction is not None and not isinstance(
            self.system_instruction, str
        ):
            raise ConfigurationError(
                "system_instruction must be a string",
                hint="Pass system_instruction='You are a concise assistant.'",
            )

        if self.response_schema is not None and not (
            isinstance(self.response_schema, dict)
            or (
                isinstance(self.response_schema, type)
                and issubclass(self.response_schema, BaseModel)
            )
        ):
            raise ConfigurationError(
                "response_schema must be a Pydantic model class or JSON schema dict",
                hint="Pass a BaseModel subclass or a dict following JSON Schema.",
            )

        if self.max_tokens is not None and (
            not isinstance(self.max_tokens, int) or self.max_tokens <= 0
        ):
            raise ConfigurationError(
                "max_tokens must be a positive integer",
                hint="Pass max_tokens=16384 or greater for thinking models.",
            )

        # Keep this runtime guard even though ``delivery_mode`` is typed as
        # ``str`` on purpose; the loose annotation is a UX choice for editor
        # autocomplete, not a widening of the supported values.
        if self.delivery_mode not in {"realtime", "deferred"}:
            raise ConfigurationError(
                "delivery_mode must be 'realtime' or 'deferred'",
                hint=(
                    "Remove delivery_mode, keep the default realtime mode, "
                    "or use delivery_mode='deferred' only as a migration shim."
                ),
            )

        if self.history is not None and self.continue_from is not None:
            raise ConfigurationError(
                "history and continue_from are mutually exclusive",
                hint="Use exactly one conversation input source per call.",
            )
        if self.history is not None:
            if not isinstance(self.history, list):
                raise ConfigurationError(
                    "history must be a list of role/content messages",
                    hint="Pass history=[{'role': 'user', 'content': '...'}].",
                )
            for item in self.history:
                if not isinstance(item, dict) or not isinstance(item.get("role"), str):
                    raise ConfigurationError(
                        "history items must be dicts with a string 'role' field",
                        hint=(
                            "Each item needs at least {'role': 'user', ...}. "
                            "Tool messages may omit 'content' or include extra "
                            "keys like 'tool_call_id'."
                        ),
                    )
        if self.continue_from is not None and not isinstance(self.continue_from, dict):
            raise ConfigurationError(
                "continue_from must be a prior Pollux result envelope",
                hint="Pass the dict returned by run() or run_many().",
            )
        if self.cache is not None:
            from pollux.cache import CacheHandle

            if not isinstance(self.cache, CacheHandle):
                raise ConfigurationError(
                    "cache must be a CacheHandle from create_cache()",
                    hint="Call create_cache() first, then pass Options(cache=handle).",
                )

    def response_schema_json(self) -> dict[str, Any] | None:
        """Return JSON Schema for provider APIs."""
        return response_schema_json(self.response_schema)

    def response_schema_model(self) -> type[BaseModel] | None:
        """Return Pydantic schema class when one was provided."""
        return response_schema_model(self.response_schema)

    def response_schema_hash(self) -> str | None:
        """Return a stable hash of the JSON Schema."""
        return response_schema_hash(self.response_schema)

Functions

response_schema_json

response_schema_json()

Return JSON Schema for provider APIs.

Source code in src/pollux/options.py
def response_schema_json(self) -> dict[str, Any] | None:
    """Return JSON Schema for provider APIs."""
    return response_schema_json(self.response_schema)

response_schema_model

response_schema_model()

Return Pydantic schema class when one was provided.

Source code in src/pollux/options.py
def response_schema_model(self) -> type[BaseModel] | None:
    """Return Pydantic schema class when one was provided."""
    return response_schema_model(self.response_schema)

response_schema_hash

response_schema_hash()

Return a stable hash of the JSON Schema.

Source code in src/pollux/options.py
def response_schema_hash(self) -> str | None:
    """Return a stable hash of the JSON Schema."""
    return response_schema_hash(self.response_schema)
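The `history` validation in `Options.__post_init__` is deliberately permissive about extra keys so tool messages pass through. A standalone sketch of just those checks (raising the builtin `ValueError` in place of `ConfigurationError` to stay self-contained):

```python
from typing import Any


def validate_history(history: Any) -> None:
    """Standalone sketch of the history checks in Options.__post_init__."""
    if not isinstance(history, list):
        raise ValueError("history must be a list of role/content messages")
    for item in history:
        # Only a string 'role' is mandatory; tool messages may omit 'content'
        # or carry extra keys like 'tool_call_id'.
        if not isinstance(item, dict) or not isinstance(item.get("role"), str):
            raise ValueError("history items must be dicts with a string 'role' field")


validate_history([{"role": "user", "content": "hi"}])       # passes
validate_history([{"role": "tool", "tool_call_id": "t1"}])  # passes: content optional
```

Remember that `history` and `continue_from` are mutually exclusive; pass exactly one per call.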

Bounded retry policy with exponential backoff and optional jitter.

Source code in src/pollux/retry.py
@dataclass(frozen=True)
class RetryPolicy:
    """Bounded retry policy with exponential backoff and optional jitter."""

    # Defaults are intentionally conservative: retries should help without
    # surprising tail-latency.
    #: Total attempts including the initial call (``2`` = one retry).
    max_attempts: int = 2
    initial_delay_s: float = 0.5
    backoff_multiplier: float = 2.0
    max_delay_s: float = 5.0
    jitter: bool = True  # "full jitter" when enabled
    #: Wall-clock deadline across all attempts; *None* disables the deadline.
    max_elapsed_s: float | None = 15.0

    def __post_init__(self) -> None:
        """Validate invariants to keep retry behavior predictable."""
        if self.max_attempts < 1:
            raise ValueError("RetryPolicy.max_attempts must be >= 1")
        if self.initial_delay_s < 0:
            raise ValueError("RetryPolicy.initial_delay_s must be >= 0")
        if self.backoff_multiplier <= 0:
            raise ValueError("RetryPolicy.backoff_multiplier must be > 0")
        if self.max_delay_s < 0:
            raise ValueError("RetryPolicy.max_delay_s must be >= 0")
        if self.max_elapsed_s is not None and self.max_elapsed_s < 0:
            raise ValueError("RetryPolicy.max_elapsed_s must be >= 0 or None")
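The delay computation itself lives in the retry executor, not in the policy. Under the usual exponential-backoff reading of these fields (a sketch based on the field names and defaults, not the library's code), the k-th retry waits:

```python
import random


def backoff_delay(
    attempt: int,
    *,
    initial: float = 0.5,
    multiplier: float = 2.0,
    max_delay: float = 5.0,
    jitter: bool = False,
) -> float:
    """Delay before retry number `attempt` (1-based).

    With jitter enabled, "full jitter" picks uniformly in [0, delay].
    Defaults here mirror RetryPolicy, except jitter is off for a
    deterministic demo (the policy defaults it to True).
    """
    delay = min(initial * multiplier ** (attempt - 1), max_delay)
    return random.uniform(0.0, delay) if jitter else delay


print([backoff_delay(k) for k in (1, 2, 3, 4, 5)])  # [0.5, 1.0, 2.0, 4.0, 5.0]
```

On top of this per-attempt schedule, `max_elapsed_s` bounds the total wall-clock time across all attempts, so a slow sequence of retries cannot run indefinitely.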

Bases: TypedDict

Standard result envelope returned by Pollux.

status is "ok" when all answers are non-empty, "partial" when some are empty, or "error" when all are empty.

Source code in src/pollux/result.py
class ResultEnvelope(TypedDict, total=False):
    """Standard result envelope returned by Pollux.

    ``status`` is ``"ok"`` when all answers are non-empty, ``"partial"`` when
    some are empty, or ``"error"`` when all are empty.
    """

    status: Literal["ok", "partial", "error"]
    answers: list[str]  # Stable core contract.
    #: Present only when ``response_schema`` was set in Options.
    structured: list[Any]
    reasoning: list[str | None]
    #: Heuristic: ``0.9`` for ``"ok"`` status, ``0.5`` otherwise.
    confidence: float
    #: Always ``"text"`` in v1.0.
    extraction_method: str
    #: Keys: ``input_tokens``, ``output_tokens``, ``total_tokens``,
    #: and optionally ``reasoning_tokens``.
    usage: dict[str, int]
    #: Keys: ``duration_s``, ``n_calls``, ``cache_used``, ``finish_reasons``.
    metrics: dict[str, Any]
    diagnostics: dict[str, Any]
    _conversation_state: dict[str, Any]
    tool_calls: list[list[dict[str, Any]]]
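The status rule from the docstring can be written out directly. This is a sketch of the documented contract, not the library's internal code; the behavior for an empty answers list is an assumption (treated as "error" here):

```python
from typing import Literal


def derive_status(answers: list[str]) -> Literal["ok", "partial", "error"]:
    """Docstring rule: ok = all non-empty, error = all empty, else partial."""
    non_empty = sum(1 for a in answers if a)
    if answers and non_empty == len(answers):
        return "ok"
    if non_empty == 0:
        return "error"
    return "partial"


print(derive_status(["summary A", "summary B"]))  # ok
print(derive_status(["summary A", ""]))           # partial
```

`partial` is worth handling explicitly in batch workflows: some prompts succeeded, so discarding the whole envelope on a non-"ok" status wastes usable answers.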

Error Types

Bases: Exception

Base exception for all Pollux errors.

Source code in src/pollux/errors.py
class PolluxError(Exception):
    """Base exception for all Pollux errors."""

    def __init__(self, message: str, *, hint: str | None = None) -> None:
        super().__init__(message)
        self.hint = hint

Bases: PolluxError

Configuration validation or resolution failed.

Source code in src/pollux/errors.py
class ConfigurationError(PolluxError):
    """Configuration validation or resolution failed."""

Bases: PolluxError

Source validation or loading failed.

Source code in src/pollux/errors.py
class SourceError(PolluxError):
    """Source validation or loading failed."""

Bases: PolluxError

Execution planning failed.

Source code in src/pollux/errors.py
class PlanningError(PolluxError):
    """Execution planning failed."""

Bases: PolluxError

A Pollux internal error (bug) or invariant violation.

Source code in src/pollux/errors.py
class InternalError(PolluxError):
    """A Pollux internal error (bug) or invariant violation."""

Bases: PolluxError

API call failed.

Providers attach retry metadata so core execution can perform bounded retries without brittle substring matching.

Source code in src/pollux/errors.py
class APIError(PolluxError):
    """API call failed.

    Providers attach retry metadata so core execution can perform bounded
    retries without brittle substring matching.
    """

    def __init__(
        self,
        message: str,
        *,
        hint: str | None = None,
        retryable: bool | None = None,
        status_code: int | None = None,
        retry_after_s: float | None = None,
        provider: str | None = None,
        phase: str | None = None,
        call_idx: int | None = None,
    ) -> None:
        super().__init__(message, hint=hint)
        self.retryable = retryable
        self.status_code = status_code
        self.retry_after_s = retry_after_s
        self.provider = provider
        self.phase = phase
        self.call_idx = call_idx

Bases: APIError

Rate limit exceeded (HTTP 429).

Source code in src/pollux/errors.py
class RateLimitError(APIError):
    """Rate limit exceeded (HTTP 429)."""

Bases: APIError

Cache operation failed.

Source code in src/pollux/errors.py
class CacheError(APIError):
    """Cache operation failed."""

Bases: PolluxError

Deferred job is not yet in a terminal state.

Source code in src/pollux/errors.py
class DeferredNotReadyError(PolluxError):
    """Deferred job is not yet in a terminal state."""

    def __init__(self, snapshot: Any) -> None:
        super().__init__(
            "Deferred job is not ready to collect",
            hint="Inspect the attached snapshot and retry after the job reaches a terminal state.",
        )
        self.snapshot = snapshot