Systems Library / AI Model Setup / How to Build Error Recovery for AI Workflows
AI Model Setup advanced

How to Build Error Recovery for AI Workflows

Implement automatic error detection and recovery in AI processing pipelines.

Jay Banlasan

Jay Banlasan

The AI Systems Guy

AI APIs fail. Rate limits hit, tokens expire, context windows overflow, and sometimes the model just returns garbage. A production workflow without error recovery is a time bomb. I have seen automation pipelines lose hours of work because one API hiccup crashed the whole job. The fix is a solid retry and recovery layer that handles errors gracefully without human intervention.

This matters most for batch jobs, overnight runs, and any workflow where a failure means reprocessing everything from scratch. Good error recovery means your pipeline finishes the job or fails cleanly with a log showing exactly where and why.

What You Need Before Starting: a working Python 3.9+ installation, an OpenAI API key, and the ability to install packages with pip.

Step 1: Install Dependencies

pip install tenacity openai

tenacity is the best retry library for Python. It handles exponential backoff, jitter, and custom retry conditions with clean decorator syntax.

Step 2: Build the Retry Decorator

Wrap all API calls with exponential backoff and smart retry conditions.

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    wait_random_exponential,
    retry_if_exception_type,
    before_sleep_log,
    RetryError
)
import openai
import logging

# Module-level logger; tenacity's before_sleep_log hooks into it so every
# retry wait is visible in the job output.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Errors worth retrying
# Transient failures -- rate limits, timeouts, dropped connections, 5xx --
# that usually succeed on a later attempt with backoff.
RETRYABLE_ERRORS = (
    openai.RateLimitError,
    openai.APITimeoutError,
    openai.APIConnectionError,
    openai.InternalServerError,
)

# Errors not worth retrying (fail fast)
# NOTE(review): this tuple is not referenced by the retry decorator below;
# tenacity already fails fast on anything outside RETRYABLE_ERRORS. Kept as
# documentation / for explicit handling by callers -- confirm intent.
NON_RETRYABLE_ERRORS = (
    openai.AuthenticationError,
    openai.BadRequestError,
    openai.NotFoundError,
)

@retry(
    retry=retry_if_exception_type(RETRYABLE_ERRORS),
    wait=wait_random_exponential(min=1, max=60),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
def call_with_retry(model: str, messages: list, api_key: str | None = None, **kwargs) -> str:
    """Call the chat completions API with jittered exponential backoff.

    Retries only the transient errors in RETRYABLE_ERRORS (up to 5 attempts,
    1-60s random exponential waits); everything else propagates immediately.

    Args:
        model: Model name, e.g. "gpt-4o".
        messages: Chat messages in OpenAI format.
        api_key: Optional explicit key. Never hard-code secrets: with the
            default None, the client reads the OPENAI_API_KEY env var.
        **kwargs: Passed straight through to chat.completions.create.

    Returns:
        The first choice's message content.

    Raises:
        tenacity.RetryError: when all retry attempts are exhausted.
    """
    client = openai.OpenAI(api_key=api_key)
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        **kwargs
    )
    return response.choices[0].message.content

Step 3: Add Context Window Overflow Recovery

When your input is too long, truncate intelligently and retry.

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Return how many tokens *text* occupies for *model*.

    Model names tiktoken does not recognize fall back to the
    cl100k_base encoding.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")
    tokens = encoding.encode(text)
    return len(tokens)

# Context-window sizes (in tokens) per model, consulted by
# truncate_messages_to_fit; unknown models get a conservative default there.
MODEL_LIMITS = {
    "gpt-4o": 128000,
    "gpt-4o-mini": 128000,
    "gpt-3.5-turbo": 16385,
}

def truncate_messages_to_fit(messages: list, model: str, reserve_output: int = 2000) -> list:
    """Return a message list trimmed to fit the model's context window.

    Strategy: keep all system messages, drop the oldest non-system messages
    first (always keeping the last two turns), and as a last resort
    word-truncate the earliest remaining user message. Never mutates the
    caller's list or its dicts.

    Args:
        messages: Chat messages in OpenAI format ({"role", "content"} dicts).
        model: Model name used to look up the token limit (unknown models
            fall back to 16000).
        reserve_output: Tokens held back for the model's response.

    Returns:
        The original list unchanged if it already fits, otherwise a new list.
    """
    limit = MODEL_LIMITS.get(model, 16000) - reserve_output
    total_tokens = sum(count_tokens(m["content"], model) for m in messages)

    if total_tokens <= limit:
        return messages

    # Keep system message(s), truncate from oldest user messages.
    system_messages = [m for m in messages if m["role"] == "system"]
    other_messages = [m for m in messages if m["role"] != "system"]

    while total_tokens > limit and len(other_messages) > 2:
        removed = other_messages.pop(0)
        total_tokens -= count_tokens(removed["content"], model)

    # If still over, word-truncate the earliest remaining user message.
    # excess // 4 words is a rough ~4-chars-per-token heuristic.
    if total_tokens > limit and other_messages:
        for i, msg in enumerate(other_messages):
            if msg["role"] == "user":
                excess = total_tokens - limit
                words = msg["content"].split()
                trim_words = max(1, len(words) - excess // 4)
                # Replace with a copy so the caller's dict is not mutated.
                other_messages[i] = {
                    **msg,
                    "content": " ".join(words[:trim_words]) + " [truncated]",
                }
                break

    return system_messages + other_messages

def call_with_overflow_recovery(model: str, messages: list, **kwargs) -> dict:
    """Run call_with_retry, recovering once from a context-length overflow.

    Returns a dict with keys "success", "result", and "truncated". Any
    BadRequestError that is not a context-length problem is re-raised.
    """
    try:
        answer = call_with_retry(model, messages, **kwargs)
        return {"success": True, "result": answer, "truncated": False}
    except openai.BadRequestError as e:
        detail = str(e).lower()
        is_overflow = "maximum context length" in detail or "too long" in detail
        if not is_overflow:
            raise
        logger.warning("Context overflow. Truncating and retrying.")
        trimmed = truncate_messages_to_fit(messages, model)
        answer = call_with_retry(model, trimmed, **kwargs)
        return {"success": True, "result": answer, "truncated": True}

Step 4: Build a Checkpoint System for Long Batch Jobs

Save progress so a crashed job can resume from where it stopped.

import json
from pathlib import Path
from datetime import datetime

class BatchCheckpoint:
    """Persist batch-job progress to disk so a crashed run can resume.

    State lives in one JSON file per job: completed item ids, their results,
    and any errors. Every mutation is flushed to disk immediately and
    atomically, so a crash loses at most the item in flight.
    """

    def __init__(self, job_name: str, checkpoint_dir: str = "./checkpoints"):
        self.job_name = job_name
        self.checkpoint_path = Path(checkpoint_dir) / f"{job_name}.json"
        # parents=True so nested checkpoint directories work too.
        Path(checkpoint_dir).mkdir(parents=True, exist_ok=True)
        self.state = self._load()

    def _load(self) -> dict:
        """Load an existing checkpoint, or build fresh state for a new job."""
        if self.checkpoint_path.exists():
            data = json.loads(self.checkpoint_path.read_text())
            print(f"Resuming job '{self.job_name}' from item {data['completed_count']}/{data['total_count']}")
            return data
        return {
            "job_name": self.job_name,
            "started_at": datetime.now().isoformat(),
            "completed_count": 0,
            "total_count": 0,
            "completed_ids": [],
            "results": {},
            "errors": {}
        }

    def save(self):
        """Atomically write state to disk.

        Write to a temp file then rename: a crash mid-write must never leave
        a half-written checkpoint, which would defeat the recovery scheme.
        """
        tmp_path = self.checkpoint_path.with_suffix(".json.tmp")
        tmp_path.write_text(json.dumps(self.state, indent=2))
        tmp_path.replace(self.checkpoint_path)

    def is_done(self, item_id: str) -> bool:
        """True if this item was already completed (possibly in a prior run)."""
        return item_id in self.state["completed_ids"]

    def mark_done(self, item_id: str, result):
        """Record a completed item and persist immediately."""
        if item_id in self.state["completed_ids"]:
            # Already recorded: refresh the result, don't double-count.
            self.state["results"][item_id] = result
        else:
            self.state["completed_ids"].append(item_id)
            self.state["results"][item_id] = result
            self.state["completed_count"] += 1
        self.save()

    def mark_error(self, item_id: str, error: str):
        """Record a failed item's error message and persist immediately."""
        self.state["errors"][item_id] = error
        self.save()

    def set_total(self, total: int):
        """Record the total item count (for progress reporting)."""
        self.state["total_count"] = total
        self.save()

    def clear(self):
        """Delete the checkpoint file (called after a fully clean run)."""
        if self.checkpoint_path.exists():
            self.checkpoint_path.unlink()

Step 5: Build a Resilient Batch Processor

Combine retry logic and checkpointing into a batch processor that survives failures.

def process_batch_with_recovery(
    items: list[dict],
    process_fn,
    job_name: str,
    id_field: str = "id",
    max_errors: int = 10
) -> dict:
    """Process *items* one by one with checkpointing and an error budget.

    Already-completed items (from a previous crashed run) are skipped; each
    completion or failure is checkpointed immediately. The job aborts after
    *max_errors* CONSECUTIVE failures.

    Args:
        items: Dicts to process; each should carry a stable id in *id_field*.
        process_fn: Callable(item) -> result; may raise.
        job_name: Checkpoint namespace; reuse it to resume a crashed run.
        id_field: Key holding each item's unique id.
        max_errors: Consecutive-failure limit before aborting.

    Returns:
        Summary dict: total, processed, skipped, error count, success rate.
    """
    import hashlib  # local: only needed for items lacking an explicit id

    checkpoint = BatchCheckpoint(job_name)
    checkpoint.set_total(len(items))

    consecutive_errors = 0
    processed = 0
    skipped = 0

    for item in items:
        # Builtin hash() of a str is randomized per process (PYTHONHASHSEED),
        # which would break resume across runs -- fall back to a stable
        # content digest for items without an id field.
        raw_id = item.get(id_field)
        if raw_id is not None:
            item_id = str(raw_id)
        else:
            item_id = hashlib.sha1(str(item).encode("utf-8")).hexdigest()

        if checkpoint.is_done(item_id):
            skipped += 1
            continue

        try:
            result = process_fn(item)
            checkpoint.mark_done(item_id, result)
            processed += 1
            consecutive_errors = 0

            if processed % 10 == 0:
                print(f"Progress: {processed + skipped}/{len(items)} (errors: {len(checkpoint.state['errors'])})")

        except RetryError as e:
            # Transient errors exhausted all backoff attempts.
            logger.error(f"Max retries exceeded for {item_id}: {e}")
            checkpoint.mark_error(item_id, str(e))
            consecutive_errors += 1

        except NonRetryableError as e:
            # Explicit permanent failure signaled by process_fn.
            logger.error(f"Non-retryable error for {item_id}: {e}")
            checkpoint.mark_error(item_id, str(e))
            consecutive_errors += 1

        except Exception as e:
            # Anything unexpected: log, record, keep going.
            logger.error(f"Unexpected error for {item_id}: {e}")
            checkpoint.mark_error(item_id, str(e))
            consecutive_errors += 1

        if consecutive_errors >= max_errors:
            logger.critical(f"Too many consecutive errors ({max_errors}). Stopping job.")
            break

    summary = {
        "total": len(items),
        "processed": processed,
        "skipped_already_done": skipped,
        "errors": len(checkpoint.state["errors"]),
        "success_rate": round(processed / max(1, processed + len(checkpoint.state["errors"])) * 100, 1)
    }

    # Only a fully clean run discards the checkpoint; errors keep it around
    # so export_failed_items / a rerun can pick them up.
    if len(checkpoint.state["errors"]) == 0:
        checkpoint.clear()

    return summary

class NonRetryableError(Exception):
    """Raised by a process_fn to signal a permanent, non-retryable failure."""

Step 6: Add Dead Letter Queue for Failed Items

Capture permanently failed items for manual review instead of silently dropping them.

import csv
from datetime import datetime

def export_failed_items(job_name: str, original_items: list[dict], output_path: str = None):
    """Export a job's permanently failed items to CSV for manual review.

    Matches checkpointed error ids against *original_items* (by their "id"
    key) and writes them, plus an error_reason column, to *output_path*.
    Never mutates the caller's item dicts.

    Args:
        job_name: The checkpoint job to read errors from.
        original_items: The full item list the batch job ran over.
        output_path: Target CSV; defaults to failed_<job>_<YYYYMMDD>.csv.

    Returns:
        The CSV path, or None when the job recorded no errors.
    """
    checkpoint = BatchCheckpoint(job_name)
    errors = checkpoint.state.get("errors", {})

    if not errors:
        print("No failed items.")
        return

    failed_items = [
        item for item in original_items
        if str(item.get("id", "")) in errors
    ]

    output_path = output_path or f"failed_{job_name}_{datetime.now().strftime('%Y%m%d')}.csv"

    if failed_items:
        keys = list(failed_items[0].keys()) + ["error_reason"]
        with open(output_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=keys)
            writer.writeheader()
            for item in failed_items:
                item_id = str(item.get("id", ""))
                # Write a copy so the caller's dict is not polluted with
                # an error_reason key.
                row = {**item, "error_reason": errors.get(item_id, "unknown")}
                writer.writerow(row)

    print(f"Exported {len(failed_items)} failed items to {output_path}")
    return output_path

What to Build Next

Related Reading

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment

Related Systems