How to Build Error Recovery for AI Workflows
Implement automatic error detection and recovery in AI processing pipelines.
Jay Banlasan
The AI Systems Guy
AI APIs fail. Rate limits hit, tokens expire, context windows overflow, and sometimes the model just returns garbage. A production workflow without error recovery is a time bomb. I have seen automation pipelines lose hours of work because one API hiccup crashed the whole job. The fix is a solid retry and recovery layer that handles errors gracefully without human intervention.
This matters most for batch jobs, overnight runs, and any workflow where a failure means reprocessing everything from scratch. Good error recovery means your pipeline finishes the job or fails cleanly with a log showing exactly where and why.
What You Need Before Starting
- Python 3.9+
- OpenAI or Anthropic API key
- tenacity library for retry logic
- A workflow that currently runs without error handling (this will improve it)
Step 1: Install Dependencies
pip install tenacity openai
tenacity is the best retry library for Python. It handles exponential backoff, jitter, and custom retry conditions with clean decorator syntax.
Step 2: Build the Retry Decorator
Wrap all API calls with exponential backoff and smart retry conditions.
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
wait_random_exponential,
retry_if_exception_type,
before_sleep_log,
RetryError
)
import openai
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Transient errors worth retrying: rate limits, timeouts, dropped
# connections, and provider-side 5xx responses usually clear on their own.
RETRYABLE_ERRORS = (
openai.RateLimitError,
openai.APITimeoutError,
openai.APIConnectionError,
openai.InternalServerError,
)
# Permanent errors not worth retrying (fail fast): bad credentials or a
# malformed request will fail identically on every attempt.
# NOTE(review): this tuple is informational here -- the retry decorator below
# whitelists only RETRYABLE_ERRORS, so everything else already fails fast.
NON_RETRYABLE_ERRORS = (
openai.AuthenticationError,
openai.BadRequestError,
openai.NotFoundError,
)
@retry(
    retry=retry_if_exception_type(RETRYABLE_ERRORS),
    wait=wait_random_exponential(min=1, max=60),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
def call_with_retry(model: str, messages: list, **kwargs) -> str:
    """Call the chat completions API with automatic retry on transient errors.

    Retries only the errors in RETRYABLE_ERRORS, with randomized exponential
    backoff (1-60s) and up to 5 attempts; each sleep is logged at WARNING.

    Args:
        model: Model name, e.g. "gpt-4o".
        messages: Chat messages in OpenAI format.
        **kwargs: Extra arguments forwarded to chat.completions.create.

    Returns:
        The content of the first choice's message.

    Raises:
        tenacity.RetryError: If all 5 attempts fail with a retryable error.
        Non-retryable OpenAI errors (auth, bad request, not found) propagate
        immediately.
    """
    # Do not hard-code a secret in source: the SDK reads OPENAI_API_KEY from
    # the environment when no api_key argument is given.
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        **kwargs
    )
    return response.choices[0].message.content
Step 3: Add Context Window Overflow Recovery
When your input is too long, truncate intelligently and retry.
import tiktoken
def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Return the number of tokens in *text* under *model*'s tiktoken encoding."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Model name unknown to tiktoken: fall back to the GPT-4-family encoding.
        encoding = tiktoken.get_encoding("cl100k_base")
    tokens = encoding.encode(text)
    return len(tokens)
# Context window sizes (total tokens per request) by model, used by
# truncate_messages_to_fit to decide when input must be trimmed.
MODEL_LIMITS = {
"gpt-4o": 128000,
"gpt-4o-mini": 128000,
"gpt-3.5-turbo": 16385,
}
def truncate_messages_to_fit(messages: list, model: str, reserve_output: int = 2000) -> list:
    """Trim a message list so it fits within the model's context window.

    Strategy: keep all system messages, drop the oldest non-system messages
    first (always keeping at least the last two), and as a last resort shorten
    the content of the earliest remaining user message.

    Args:
        messages: Chat messages ({"role": ..., "content": ...} dicts).
        model: Model name used to look up the context limit (unknown models
            default to 16000 tokens).
        reserve_output: Tokens reserved for the model's response.

    Returns:
        A new list that fits the budget. The caller's message dicts are never
        mutated; a truncated message is replaced by a modified copy.
    """
    limit = MODEL_LIMITS.get(model, 16000) - reserve_output
    total_tokens = sum(count_tokens(m["content"], model) for m in messages)
    if total_tokens <= limit:
        return messages
    # Keep system messages, truncate from oldest user messages
    system_messages = [m for m in messages if m["role"] == "system"]
    other_messages = [m for m in messages if m["role"] != "system"]
    while total_tokens > limit and len(other_messages) > 2:
        removed = other_messages.pop(0)
        total_tokens -= count_tokens(removed["content"], model)
    # If still over, truncate the longest user message
    if total_tokens > limit and other_messages:
        for i, msg in enumerate(other_messages):
            if msg["role"] == "user":
                excess = total_tokens - limit
                words = msg["content"].split()
                # Rough heuristic: assumes ~4 excess tokens removed per word cut.
                trim_words = max(1, len(words) - excess // 4)
                # Replace with a copy instead of mutating the caller's dict,
                # which the original did (msg["content"] = ...).
                other_messages[i] = {
                    **msg,
                    "content": " ".join(words[:trim_words]) + " [truncated]",
                }
                break
    return system_messages + other_messages
def call_with_overflow_recovery(model: str, messages: list, **kwargs) -> dict:
    """Call the API and, on a context-overflow BadRequestError, truncate and retry once.

    Returns a dict with keys "success", "result", and "truncated". Any
    BadRequestError that is not a context overflow is re-raised unchanged.
    """
    try:
        return {
            "success": True,
            "result": call_with_retry(model, messages, **kwargs),
            "truncated": False,
        }
    except openai.BadRequestError as e:
        detail = str(e).lower()
        is_overflow = ("maximum context length" in detail) or ("too long" in detail)
        if not is_overflow:
            raise
        logger.warning("Context overflow. Truncating and retrying.")
        trimmed = truncate_messages_to_fit(messages, model)
        return {
            "success": True,
            "result": call_with_retry(model, trimmed, **kwargs),
            "truncated": True,
        }
Step 4: Build a Checkpoint System for Long Batch Jobs
Save progress so a crashed job can resume from where it stopped.
import json
from pathlib import Path
from datetime import datetime
class BatchCheckpoint:
    """Persist batch-job progress to a JSON file so a crashed job can resume.

    State tracks completed item ids, their results, per-item errors, and
    counters. Every mutating method saves immediately, so progress survives
    a crash at any point.
    """

    def __init__(self, job_name: str, checkpoint_dir: str = "./checkpoints"):
        self.job_name = job_name
        self.checkpoint_path = Path(checkpoint_dir) / f"{job_name}.json"
        # parents=True so nested checkpoint directories (e.g. "out/ckpts")
        # are created too; the original mkdir(exist_ok=True) failed on those.
        Path(checkpoint_dir).mkdir(parents=True, exist_ok=True)
        self.state = self._load()

    def _load(self) -> dict:
        """Load an existing checkpoint file, or build fresh initial state."""
        if self.checkpoint_path.exists():
            data = json.loads(self.checkpoint_path.read_text(encoding="utf-8"))
            print(f"Resuming job '{self.job_name}' from item {data['completed_count']}/{data['total_count']}")
            return data
        return {
            "job_name": self.job_name,
            "started_at": datetime.now().isoformat(),
            "completed_count": 0,
            "total_count": 0,
            "completed_ids": [],
            "results": {},
            "errors": {}
        }

    def save(self):
        """Write the full state to disk as pretty-printed JSON (UTF-8)."""
        self.checkpoint_path.write_text(json.dumps(self.state, indent=2), encoding="utf-8")

    def is_done(self, item_id: str) -> bool:
        """True if this item already completed in this or a previous run."""
        return item_id in self.state["completed_ids"]

    def mark_done(self, item_id: str, result):
        """Record a successful item and persist immediately."""
        # Drop any stale error from a previous run: without this, an item
        # that failed once and later succeeded still counted as an error
        # and prevented the checkpoint from being cleared.
        self.state["errors"].pop(item_id, None)
        self.state["completed_ids"].append(item_id)
        self.state["results"][item_id] = result
        self.state["completed_count"] += 1
        self.save()

    def mark_error(self, item_id: str, error: str):
        """Record a failure reason for an item and persist immediately."""
        self.state["errors"][item_id] = error
        self.save()

    def set_total(self, total: int):
        """Record the total number of items in the job."""
        self.state["total_count"] = total
        self.save()

    def clear(self):
        """Delete the checkpoint file (called when a job finishes cleanly)."""
        if self.checkpoint_path.exists():
            self.checkpoint_path.unlink()
Step 5: Build a Resilient Batch Processor
Combine retry logic and checkpointing into a batch processor that survives failures.
def process_batch_with_recovery(
    items: list[dict],
    process_fn,
    job_name: str,
    id_field: str = "id",
    max_errors: int = 10
) -> dict:
    """Process a batch through process_fn with checkpointing and error capture.

    Skips items already completed in a previous run, records per-item errors
    without stopping the job, and aborts after max_errors consecutive
    failures (a sign the provider or pipeline is down, not the data).

    Args:
        items: Input records to process.
        process_fn: Callable invoked once per item; its return value is
            stored in the checkpoint.
        job_name: Checkpoint name; reuse the same name to resume.
        id_field: Dict key that uniquely identifies an item.
        max_errors: Consecutive-failure threshold that stops the job.

    Returns:
        Summary dict with total, processed, skipped_already_done, errors,
        and success_rate (percent, this run only).
    """
    import hashlib  # local import: only needed for the fallback item id

    checkpoint = BatchCheckpoint(job_name)
    checkpoint.set_total(len(items))
    consecutive_errors = 0
    processed = 0
    skipped = 0
    for item in items:
        if id_field in item:
            item_id = str(item[id_field])
        else:
            # str hashes are salted per process (PYTHONHASHSEED), so the
            # original hash(str(item)) fallback produced different ids every
            # run and silently broke resume. Use a stable digest instead.
            item_id = hashlib.sha1(str(item).encode("utf-8")).hexdigest()
        if checkpoint.is_done(item_id):
            skipped += 1
            continue
        try:
            result = process_fn(item)
            checkpoint.mark_done(item_id, result)
            processed += 1
            consecutive_errors = 0
            if processed % 10 == 0:
                print(f"Progress: {processed + skipped}/{len(items)} (errors: {len(checkpoint.state['errors'])})")
        except RetryError as e:
            logger.error(f"Max retries exceeded for {item_id}: {e}")
            checkpoint.mark_error(item_id, str(e))
            consecutive_errors += 1
        except NonRetryableError as e:
            logger.error(f"Non-retryable error for {item_id}: {e}")
            checkpoint.mark_error(item_id, str(e))
            consecutive_errors += 1
        except Exception as e:
            # Catch-all boundary: one bad item must not kill the batch.
            logger.error(f"Unexpected error for {item_id}: {e}")
            checkpoint.mark_error(item_id, str(e))
            consecutive_errors += 1
        if consecutive_errors >= max_errors:
            logger.critical(f"Too many consecutive errors ({max_errors}). Stopping job.")
            break
    summary = {
        "total": len(items),
        "processed": processed,
        "skipped_already_done": skipped,
        "errors": len(checkpoint.state["errors"]),
        "success_rate": round(processed / max(1, processed + len(checkpoint.state["errors"])) * 100, 1)
    }
    # Only delete the checkpoint on a fully clean run; otherwise keep it so
    # failed items can be retried or exported later.
    if len(checkpoint.state["errors"]) == 0:
        checkpoint.clear()
    return summary
class NonRetryableError(Exception):
    """Raised by a process_fn to mark an item as permanently failed.

    process_batch_with_recovery records the item in the checkpoint's errors
    and moves on instead of retrying.
    """
    pass
Step 6: Add Dead Letter Queue for Failed Items
Capture permanently failed items for manual review instead of silently dropping them.
import csv
from datetime import datetime
def export_failed_items(job_name: str, original_items: list[dict], output_path: str = None):
    """Export permanently failed batch items to a CSV dead letter file.

    Matches failed item ids from the job's checkpoint against original_items
    and writes each failed item plus an "error_reason" column for manual
    review.

    Args:
        job_name: Name of the checkpointed job to read errors from.
        original_items: The full input list the job was run on.
        output_path: CSV destination; defaults to failed_<job>_<YYYYMMDD>.csv.

    Returns:
        The output path, or None when there were no errors.

    NOTE(review): matching uses the literal "id" key, so items identified by
    a custom id_field or the hash fallback in process_batch_with_recovery
    will not be found here -- confirm against how the batch was keyed.
    """
    checkpoint = BatchCheckpoint(job_name)
    errors = checkpoint.state.get("errors", {})
    if not errors:
        print("No failed items.")
        return
    failed_items = [
        item for item in original_items
        if str(item.get("id", "")) in errors
    ]
    output_path = output_path or f"failed_{job_name}_{datetime.now().strftime('%Y%m%d')}.csv"
    if failed_items:
        keys = list(failed_items[0].keys()) + ["error_reason"]
        with open(output_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=keys)
            writer.writeheader()
            for item in failed_items:
                item_id = str(item.get("id", ""))
                # Write a copy so the caller's dicts are not mutated; the
                # original injected "error_reason" into the input items.
                row = {**item, "error_reason": errors.get(item_id, "unknown")}
                writer.writerow(row)
        print(f"Exported {len(failed_items)} failed items to {output_path}")
    return output_path
What to Build Next
- Add Slack or email alerts that fire when a batch job error rate exceeds 5% so you know about failures before your morning standup
- Build a partial result merger that combines results from multiple interrupted runs of the same job into one clean output file
- Implement a circuit breaker that pauses the entire pipeline for 5 minutes when a provider returns a 529 or sustained rate limits, rather than hammering a degraded service
Related Reading
- How to Write System Prompts That Control AI Behavior - stable prompts reduce the non-retryable errors that kill batches
- How to Build AI Guardrails for Safe Outputs - guardrails catch bad outputs that would otherwise pollute your downstream data
- How to Use AI for Automated Data Extraction - document extraction pipelines are the most common place error recovery saves the day
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment