Handling Errors and Recovery¶
You want your Pollux-based pipeline to handle failures gracefully. Retry what's transient, skip what's broken, and log enough to diagnose issues later.
At the API level, LLM provider calls can fail for many reasons: invalid credentials, rate limits, malformed input, server errors, unsupported feature combinations. An orchestration layer needs a structured way to surface these failures so your code can make informed recovery decisions without parsing error strings.
Exception Hierarchy¶
Pollux uses a single exception hierarchy rooted at PolluxError:
```text
PolluxError
├── ConfigurationError       # Bad config, missing key, unsupported feature
├── SourceError              # File not found, invalid arXiv reference
├── PlanningError            # Execution plan could not be built
├── InternalError            # Bug or invariant violation inside Pollux
├── DeferredNotReadyError    # Deferred job is still active
└── APIError                 # Provider call failed
    ├── RateLimitError       # HTTP 429 (always retryable)
    └── CacheError           # Cache operation failed
```
Every error carries a `.hint` attribute with actionable guidance:

```python
from pollux import Config, ConfigurationError

try:
    config = Config(provider="gemini", model="gemini-2.5-flash-lite")
except ConfigurationError as e:
    print(e)       # "API key required for gemini"
    print(e.hint)  # "Set GEMINI_API_KEY environment variable or pass api_key=..."
```
This lets calling code display helpful messages without parsing exception strings.
Boundary

**Pollux owns:** retrying transient API failures (rate limits, server errors) within a single `run()` or `run_many()` call, respecting `Retry-After` headers, and raising typed exceptions with `.hint`.

**You own:** workflow-level retry decisions (should I retry this file?), error categorization for your logging/alerting, partial-failure policies (skip vs abort), and circuit-breaking across calls.
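The workflow-level side of that boundary can be sketched as a small retry helper. This is an illustration, not part of the Pollux API: `retry_workflow` and the `flaky` stub are hypothetical, and in real code you would catch Pollux's `APIError` rather than `Exception`.

```python
import asyncio

async def retry_workflow(make_call, attempts=3, delay_s=1.0):
    """Retry an entire workflow step a fixed number of times.

    Pollux already retries transient provider errors inside each call; this
    loop is only for workflow-level decisions such as "try this file again".
    """
    last_exc = None
    for attempt in range(attempts):
        try:
            return await make_call()
        except Exception as exc:  # in real code, catch pollux's APIError instead
            last_exc = exc
            if attempt < attempts - 1:
                await asyncio.sleep(delay_s * (attempt + 1))  # linear backoff
    raise last_exc

# Demo with a stub that fails once, then succeeds.
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient")
    return "ok"

print(asyncio.run(retry_workflow(flaky, delay_s=0.0)))  # -> ok
```

Keeping the retry policy outside `safe_analyze`-style wrappers means you can apply it selectively, e.g. only to files you cannot afford to skip.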
Failure Triage¶
Use this order when debugging. Most failures resolve by step 2.
1. **Auth and mode check.** Is `use_mock` what you expect? For real mode, ensure the matching key exists (`GEMINI_API_KEY`, `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, or `OPENROUTER_API_KEY`).
2. **Provider/model pairing.** Verify the model belongs to the selected provider. Re-run a minimal prompt after fixing any mismatch.
3. **Unsupported feature.** Compare your options against Provider Capabilities. Deferred work uses `defer()`/`defer_many()` rather than `run()`/`run_many()`. Conversation continuity and tool calling are provider-dependent.
4. **Source and payload.** Reduce to one source + one prompt and retry. For OpenAI remote URLs, only PDF and image URLs are supported.
Deferred Collection State¶
`collect_deferred()` is not a polling helper. If the job is still active, it raises `DeferredNotReadyError` and attaches the latest `DeferredSnapshot` on `exc.snapshot`.
```python
from pollux import DeferredNotReadyError, collect_deferred

try:
    result = await collect_deferred(handle)
except DeferredNotReadyError as exc:
    snapshot = exc.snapshot
    print(snapshot.status)       # queued, running, or cancelling
    print(snapshot.pending)      # Requests still in flight
    print(snapshot.is_terminal)  # False
```
Use `snapshot.status` and `snapshot.is_terminal` to decide what your application does next. Pollux normalizes lifecycle state; your code still owns polling cadence, backoff, scheduling, and any cross-job retry policy.
Deferred timelines are provider-driven. A valid job can stay queued or running for minutes or hours. Treat your polling timeout as an application decision, not as proof that deferred submission failed.
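A polling loop around `collect_deferred()` might look like the sketch below. `NotReady` and `fake_collect` are stand-ins for `DeferredNotReadyError` and a real `collect_deferred(handle)` call, and the backoff numbers are application choices, not Pollux defaults.

```python
import asyncio

class NotReady(Exception):
    """Stand-in for pollux's DeferredNotReadyError in this sketch."""

async def poll_until_done(fetch, timeout_s=60.0, base_delay_s=1.0, max_delay_s=8.0):
    """Poll a collect-style coroutine with capped exponential backoff.

    `fetch` mimics collect_deferred(handle): it returns a result once the
    job is terminal and raises NotReady while the job is still active.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s
    delay = base_delay_s
    while True:
        try:
            return await fetch()
        except NotReady:
            if loop.time() >= deadline:
                # Timing out is an application decision; the job may still finish.
                raise TimeoutError("polling timeout reached; job may still be running")
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay_s)

# Demo: a fake collector that becomes ready on the third poll.
state = {"polls": 0}

async def fake_collect():
    state["polls"] += 1
    if state["polls"] < 3:
        raise NotReady
    return "result"

print(asyncio.run(poll_until_done(fake_collect, base_delay_s=0.0)))  # -> result
```

Because timelines are provider-driven, treat `TimeoutError` here as "stop waiting", not as "the job failed".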
Complete Production Example¶
A production wrapper that processes files with category-specific error handling, structured logging, and a summary report:
```python
import asyncio
import logging
from dataclasses import dataclass, field
from pathlib import Path

from pollux import (
    APIError,
    Config,
    ConfigurationError,
    PolluxError,
    RateLimitError,
    Source,
    SourceError,
    run,
)

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger(__name__)

config = Config(provider="gemini", model="gemini-2.5-flash-lite")


@dataclass
class RunReport:
    succeeded: list[str] = field(default_factory=list)
    skipped: list[str] = field(default_factory=list)
    failed: list[str] = field(default_factory=list)

    def summary(self) -> str:
        total = len(self.succeeded) + len(self.skipped) + len(self.failed)
        return (
            f"{total} files: {len(self.succeeded)} ok, "
            f"{len(self.skipped)} skipped, {len(self.failed)} failed"
        )


async def safe_analyze(path: Path, prompt: str) -> str | None:
    """Analyze a file with category-specific error handling."""
    try:
        result = await run(
            prompt,
            source=Source.from_file(str(path)),
            config=config,
        )
        if result["status"] == "partial":
            log.warning("%s: partial result (some answers empty)", path.name)
        return result["answers"][0]
    except ConfigurationError as exc:
        # Bad config — nothing to retry, abort early
        log.error("Configuration error: %s (hint: %s)", exc, exc.hint)
        raise  # Let the caller abort the pipeline
    except SourceError as exc:
        # Bad input file — skip it, process the rest
        log.warning("Skipping %s: %s (hint: %s)", path.name, exc, exc.hint)
        return None
    except RateLimitError as exc:
        # Pollux already retried; we're still rate-limited
        log.warning(
            "Rate limit on %s after retries (hint: %s)", path.name, exc.hint
        )
        return None
    except APIError as exc:
        # Other provider errors — log details for diagnosis
        log.error(
            "API error on %s: %s [status=%s, retryable=%s] (hint: %s)",
            path.name, exc, exc.status_code, exc.retryable, exc.hint,
        )
        return None
    except PolluxError as exc:
        # Catch-all for unexpected Pollux errors
        log.error("Unexpected error on %s: %s (hint: %s)", path.name, exc, exc.hint)
        return None


async def process_collection(directory: str, prompt: str) -> RunReport:
    """Process all PDFs with error tracking."""
    report = RunReport()
    for path in sorted(Path(directory).glob("*.pdf")):
        answer = await safe_analyze(path, prompt)
        if answer is not None:
            report.succeeded.append(path.name)
        else:
            report.skipped.append(path.name)
    log.info(report.summary())
    return report


asyncio.run(process_collection("./papers", "Summarize the key findings."))
```
Step-by-Step Walkthrough¶
1. **Catch by category, not by message.** The exception hierarchy lets you handle `ConfigurationError`, `SourceError`, `RateLimitError`, and `APIError` differently without parsing error strings.
2. **Use `.hint` for logging.** Every Pollux exception has a `.hint` with actionable guidance. Log it alongside the error message for faster diagnosis.
3. **Abort on configuration errors.** `ConfigurationError` means the setup is wrong (missing API key, unsupported feature). Retrying won't help. Re-raise to abort the pipeline.
4. **Skip on source errors.** `SourceError` means a specific input is bad (file not found, unreadable format). Skip the file and continue.
5. **Log and continue on API errors.** `APIError` and `RateLimitError` mean Pollux already retried internally. At the workflow level, log the failure and move on. Consider a workflow-level retry for important files.
Common Symptoms and Fixes¶
| Symptom | Likely Cause | Fix |
|---|---|---|
| `ConfigurationError` at startup | Missing API key | `export GEMINI_API_KEY="your-key"` (or `OPENAI_API_KEY` / `ANTHROPIC_API_KEY` / `OPENROUTER_API_KEY`), or pass `api_key` in `Config(...)` |
| Outputs look like `echo: ...` | `use_mock=True` is set | Set `use_mock=False` (the default) and ensure the API key is present |
| `ConfigurationError` at request time | Provider/model mismatch | Verify the model belongs to the selected provider |
| `ConfigurationError` mentioning `delivery_mode` | Legacy `Options(delivery_mode="deferred")` was passed | On `run()`/`run_many()`, switch to `defer()`/`defer_many()`. On deferred entry points, remove `delivery_mode` |
| `status: "partial"` | Some prompts returned empty answers | Check individual entries in `answers` to identify which prompts failed |
| Remote source rejected | Unsupported MIME type on OpenAI | OpenAI remote URL support is limited to PDFs and images |
| Keys show as `***redacted***` | Intentional redaction | Your key is still being used; `Config` hides it from string representations |
| Import errors | Missing dependencies | Use Python `>=3.10,<3.15` with `uv sync --all-extras` |
Variations¶
Using .hint for observability¶
The `.hint` attribute is designed for human-readable context. Include it in structured logs, alerts, or error dashboards:
```python
except PolluxError as exc:
    log.error(
        "pollux_error",
        extra={
            "error_type": type(exc).__name__,
            "message": str(exc),
            "hint": exc.hint,
            "file": path.name,
        },
    )
```
For `APIError` subclasses, additional attributes provide structured metadata:

```python
except APIError as exc:
    log.error(
        "api_error",
        extra={
            "status_code": exc.status_code,
            "retryable": exc.retryable,
            "provider": exc.provider,
            "retry_after_s": exc.retry_after_s,
        },
    )
```
Circuit breaker¶
Stop processing when errors pile up. Consecutive failures usually mean a systemic issue, not isolated bad files:
```python
MAX_CONSECUTIVE_FAILURES = 3


async def process_with_circuit_breaker(
    directory: str, prompt: str,
) -> RunReport:
    report = RunReport()
    consecutive_failures = 0
    for path in sorted(Path(directory).glob("*.pdf")):
        answer = await safe_analyze(path, prompt)
        if answer is not None:
            report.succeeded.append(path.name)
            consecutive_failures = 0
        else:
            report.skipped.append(path.name)
            consecutive_failures += 1
            if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                log.error(
                    "Circuit breaker: %d consecutive failures, aborting",
                    consecutive_failures,
                )
                break
    return report
```
Distinguishing status: "partial" from exceptions¶
Not all problems are exceptions. A `status: "partial"` result means some prompts in a `run_many()` call returned empty answers. The call succeeded, but the output is incomplete:
```python
result = await run_many(prompts, sources=sources, config=config)

if result["status"] == "ok":
    # All answers populated — process normally
    pass
elif result["status"] == "partial":
    # Some answers are empty strings — decide per-answer
    for i, answer in enumerate(result["answers"]):
        if answer:
            process_answer(i, answer)
        else:
            log.warning("Empty answer for prompt %d", i)
elif result["status"] == "error":
    # All answers empty — treat as a failure
    log.error("All answers empty")
```
Durable Pipelines with Resume-on-Failure¶
For long-running jobs where partial failures are expected, persist a manifest that tracks per-item status. Retries then process only unfinished or failed items:
```json
{
  "items": {
    "input.txt": {"status": "ok", "output": "outputs/items/input.json"},
    "compare.txt": {"status": "error", "error": "RateLimitError: 429"},
    "notes.txt": {"status": "pending"}
  }
}
```
On retry, items with `status: "ok"` are skipped. The manifest updates after each item (not only at run end), so you never lose progress. See the resume-on-failure cookbook recipe for a runnable implementation:
```shell
python -m cookbook production/resume-on-failure \
    --limit 4 \
    --manifest outputs/manifest.json --output-dir outputs/items --mock
```
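The manifest logic itself is a few lines of bookkeeping. The helpers below are a sketch of the pattern, not the cookbook recipe's actual code; `load_manifest`, `pending_items`, and `record` are hypothetical names.

```python
import json
import tempfile
from pathlib import Path

def load_manifest(path: Path) -> dict:
    """Load the manifest, starting fresh if it does not exist yet."""
    if path.exists():
        return json.loads(path.read_text())
    return {"items": {}}

def pending_items(manifest: dict, names: list[str]) -> list[str]:
    """Items that still need work: never seen, pending, or previously failed."""
    items = manifest["items"]
    return [n for n in names if items.get(n, {}).get("status") != "ok"]

def record(manifest: dict, path: Path, name: str, status: str, **extra) -> None:
    """Update one item and persist immediately so progress is never lost."""
    manifest["items"][name] = {"status": status, **extra}
    path.write_text(json.dumps(manifest, indent=2))

# Demo round-trip in a temp directory:
mpath = Path(tempfile.mkdtemp()) / "manifest.json"
manifest = load_manifest(mpath)
record(manifest, mpath, "input.txt", "ok", output="outputs/items/input.json")
record(manifest, mpath, "compare.txt", "error", error="RateLimitError: 429")
print(pending_items(manifest, ["input.txt", "compare.txt", "notes.txt"]))
# -> ['compare.txt', 'notes.txt']
```

Writing the file after every item trades a little I/O for the guarantee that a crash mid-run loses at most the item in flight.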
What to Watch For¶
- **Pollux retries internally; you retry at the workflow level.** Don't wrap `run()` in a retry loop for transient errors. `RetryPolicy` already handles that. Your retries are for workflow-level decisions.
- **`ConfigurationError` is never transient.** Missing API keys, unsupported features, invalid config. These won't fix themselves. Abort and fix the config.
- **`RateLimitError` means retries were exhausted.** Pollux already waited and retried. If you still get `RateLimitError`, reduce concurrency or add a longer backoff at the workflow level.
- **Check `result["status"]` even on success.** A successful call can return `"partial"` status with some empty answers. Don't assume all answers are populated because no exception was raised.
- **Don't catch `Exception` when you mean `PolluxError`.** Catching too broadly hides bugs in your own code. Catch `PolluxError` for Pollux-specific failures; let everything else propagate.
For the full configuration reference (including `RetryPolicy` fields, mock mode, and API key resolution), see Configuring Pollux.
Still Stuck?¶
Include the following in your bug report:
- Provider + model
- Source type(s)
- Exact exception message
File a bug report with concrete reproduction steps.