
How to Handle AI API Rate Limits Gracefully

Build retry logic and rate limit handling for production AI applications.

Jay Banlasan

The AI Systems Guy

Rate limit errors are usually the first thing that breaks a production AI system. The rate limit handling practices I follow have saved dozens of client automations from failing mid-run. The core technique is exponential backoff with jitter: wait, retry, wait longer, retry again. Done right, your code handles rate limits transparently, with no manual intervention.

Every AI provider enforces different limits. OpenAI measures requests per minute (RPM) and tokens per minute (TPM). Anthropic tracks both requests and tokens per minute. Groq enforces tokens per minute aggressively. Knowing which limit you are hitting tells you whether to slow down requests or shrink your prompts and outputs.
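OpenAI reports these budgets back on every response through headers such as x-ratelimit-remaining-requests and x-ratelimit-remaining-tokens (other providers use their own header names). Here is a minimal sketch of a diagnostic helper, assuming you already have the response headers as a dict; the function name is mine:

```python
def diagnose_rate_limits(headers: dict) -> str:
    """Given OpenAI-style rate limit headers, report which budget
    (requests or tokens) is closer to exhaustion."""
    remaining_requests = int(headers.get("x-ratelimit-remaining-requests", 0))
    limit_requests = int(headers.get("x-ratelimit-limit-requests", 1))
    remaining_tokens = int(headers.get("x-ratelimit-remaining-tokens", 0))
    limit_tokens = int(headers.get("x-ratelimit-limit-tokens", 1))

    # Compare the fraction of each budget that is still available
    request_headroom = remaining_requests / limit_requests
    token_headroom = remaining_tokens / limit_tokens

    if token_headroom < request_headroom:
        return "tokens"  # shrink prompts/outputs or send fewer tokens
    return "requests"  # slow down request frequency


headers = {
    "x-ratelimit-limit-requests": "500",
    "x-ratelimit-remaining-requests": "499",
    "x-ratelimit-limit-tokens": "30000",
    "x-ratelimit-remaining-tokens": "600",
}
print(diagnose_rate_limits(headers))  # tokens
```

In the openai Python SDK, the raw headers are reachable via `client.chat.completions.with_raw_response.create(...)`, which returns a response object exposing `.headers`.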

What You Need Before Starting

Python 3.9 or newer, the openai, anthropic, tenacity, and python-dotenv packages, and API keys for the providers you use stored in a .env file (OPENAI_API_KEY, ANTHROPIC_API_KEY).

Step 1: The Manual Retry Pattern

The simplest version: catch the rate limit error and sleep before retrying:

import os
import random
import time
from openai import OpenAI, RateLimitError
from dotenv import load_dotenv

load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


def call_with_retry(messages: list, model: str = "gpt-4o", max_retries: int = 5) -> str:
    """
    Call OpenAI API with exponential backoff on rate limit errors.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=1000
            )
            return response.choices[0].message.content

        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # Out of retries, raise the error

            # Exponential backoff: 1s, 2s, 4s, 8s, 16s, plus random jitter
            # so concurrent workers do not all retry at the same instant
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Waiting {wait_time:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(wait_time)


# Usage
result = call_with_retry([{"role": "user", "content": "Summarize AI trends in 2024"}])
print(result)
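The pattern above layers jitter on top of an exponential wait. AWS's backoff guidance describes a "full jitter" variant that randomizes the whole wait instead, which spreads retries from many concurrent workers even further. A sketch, with a function name of my own choosing:

```python
import random


def full_jitter_wait(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Return a randomized wait time for the given retry attempt.

    The wait is drawn uniformly from [0, min(cap, base * 2**attempt)],
    so concurrent clients retry at different moments instead of in
    synchronized waves.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0, ceiling)
```

To use it, swap the wait calculation in the retry loop for `time.sleep(full_jitter_wait(attempt))`.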

Step 2: Use Tenacity for Cleaner Retry Logic

The tenacity library handles retry logic declaratively:

pip install tenacity

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    wait_random,
    retry_if_exception_type,
    before_sleep_log
)
import logging
from openai import RateLimitError, APIConnectionError, APITimeoutError
import anthropic

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# OpenAI with tenacity
@retry(
    reraise=True,
    stop=stop_after_attempt(6),
    wait=wait_exponential(multiplier=1, min=1, max=60) + wait_random(0, 2),
    retry=retry_if_exception_type((RateLimitError, APIConnectionError, APITimeoutError)),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
def openai_call_with_tenacity(messages: list, model: str = "gpt-4o") -> str:
    """OpenAI call with automatic retry on rate limits and connection errors."""
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=1000
    )
    return response.choices[0].message.content


# Anthropic with tenacity
claude_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))


@retry(
    reraise=True,
    stop=stop_after_attempt(6),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type(anthropic.RateLimitError)
)
def claude_call_with_tenacity(prompt: str, system: str = None) -> str:
    """Claude call with automatic retry on rate limits."""
    kwargs = {
        "model": "claude-opus-4-5",
        "max_tokens": 1000,
        "messages": [{"role": "user", "content": prompt}]
    }
    if system:
        kwargs["system"] = system
    
    response = claude_client.messages.create(**kwargs)
    return response.content[0].text

Step 3: Rate Limit Your Own Requests

Proactive rate limiting prevents hitting the API's limits in the first place:

import threading
import time
from collections import deque


class RateLimiter:
    """
    Sliding window rate limiter for controlling API request frequency.
    """

    def __init__(self, requests_per_minute: int):
        self.requests_per_minute = requests_per_minute
        self.lock = threading.Lock()
        # Timestamps of recent requests; maxlen caps memory at the window size
        self.request_times = deque(maxlen=requests_per_minute)
    
    def wait(self) -> None:
        """Block until it is safe to make the next request."""
        with self.lock:
            now = time.time()
            
            if self.request_times:
                # Check if we have made too many requests in the last minute
                one_minute_ago = now - 60
                recent_requests = [t for t in self.request_times if t > one_minute_ago]
                
                if len(recent_requests) >= self.requests_per_minute:
                    # Need to wait until oldest request is more than 1 minute old
                    oldest = min(recent_requests)
                    wait_until = oldest + 60
                    sleep_time = wait_until - now
                    if sleep_time > 0:
                        time.sleep(sleep_time)
            
            self.request_times.append(time.time())


# Usage: throttle to 50 requests per minute
limiter = RateLimiter(requests_per_minute=50)


def rate_limited_call(prompt: str) -> str:
    limiter.wait()
    return openai_call_with_tenacity([{"role": "user", "content": prompt}])

Step 4: Process Large Batches Without Hitting Limits

For processing hundreds or thousands of items:

import concurrent.futures
from typing import Callable


def process_batch(
    items: list,
    processor_fn: Callable,
    max_workers: int = 5,
    requests_per_minute: int = 50,
    show_progress: bool = True
) -> list:
    """
    Process a large batch of items with rate limiting and parallel execution.
    
    Args:
        items: List of items to process
        processor_fn: Function that takes one item and returns a result
        max_workers: Number of parallel workers
        requests_per_minute: Max API calls per minute
        show_progress: Print progress updates
    
    Returns:
        List of results in the same order as inputs
    """
    limiter = RateLimiter(requests_per_minute=requests_per_minute)
    results = [None] * len(items)
    completed = 0
    
    def process_with_limit(index_and_item):
        index, item = index_and_item
        limiter.wait()
        result = processor_fn(item)
        return index, result
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_with_limit, (i, item)): i 
            for i, item in enumerate(items)
        }
        
        for future in concurrent.futures.as_completed(futures):
            index, result = future.result()
            results[index] = result
            completed += 1
            
            if show_progress and completed % 10 == 0:
                print(f"Processed {completed}/{len(items)} items")
    
    return results


# Example: Classify 200 emails
sample_emails = [f"Email content number {i}" for i in range(200)]

def classify_one(email: str) -> str:
    return rate_limited_call(f"Classify this email in one word (billing/support/sales/other): {email}")

results = process_batch(sample_emails, classify_one, max_workers=5, requests_per_minute=50)
print(f"Processed {len(results)} emails")
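With hundreds of items, one malformed input should not take down the whole batch. A simple safeguard, sketched here with hypothetical names, is to catch per-item exceptions and collect the failures for a later retry pass:

```python
from typing import Callable


def process_batch_tolerant(items: list, processor_fn: Callable) -> tuple[list, list]:
    """Process items one at a time, collecting failures instead of raising.

    Returns (results, failures) where results[i] is None for failed items
    and failures holds (index, exception) pairs for later retry or logging.
    """
    results = [None] * len(items)
    failures = []
    for i, item in enumerate(items):
        try:
            results[i] = processor_fn(item)
        except Exception as exc:  # batch jobs should log and move on
            failures.append((i, exc))
    return results, failures
```

Once the batch completes, you can run only the failed items back through the same function, or log them for manual review.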

Step 5: Handle Different Error Types Appropriately

Not all errors should be retried the same way:

from openai import (
    RateLimitError,
    APIConnectionError,
    APITimeoutError,
    APIStatusError,
    AuthenticationError,
    BadRequestError
)


def robust_api_call(messages: list, model: str = "gpt-4o") -> str:
    """
    Handle different API errors with appropriate responses.
    """
    max_retries = 5
    
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=1000,
                timeout=30
            )
            return response.choices[0].message.content
        
        except RateLimitError:
            wait = 2 ** attempt
            print(f"Rate limited. Retry {attempt + 1} in {wait}s")
            time.sleep(wait)
        
        except APITimeoutError:
            wait = 5 * (attempt + 1)
            print(f"Timeout. Retry {attempt + 1} in {wait}s")
            time.sleep(wait)
        
        except APIConnectionError:
            wait = 3 * (attempt + 1)
            print(f"Connection error. Retry {attempt + 1} in {wait}s")
            time.sleep(wait)
        
        except AuthenticationError:
            raise  # Bad API key, don't retry
        
        except BadRequestError:
            raise  # Bad input (invalid params, oversized prompt), don't retry
        
        except APIStatusError as e:
            if e.status_code >= 500:
                wait = 2 ** attempt
                print(f"Server error {e.status_code}. Retry {attempt + 1} in {wait}s")
                time.sleep(wait)
            else:
                raise  # 4xx errors are client-side, don't retry
    
    raise Exception(f"API call failed after {max_retries} attempts")
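One refinement: many providers include a Retry-After header on 429 responses that tells you exactly how long to wait, and honoring it beats guessing. Here is a sketch of a parser for the common numeric form (the function name is mine; the rarer HTTP-date form falls through to the fallback):

```python
def retry_after_seconds(headers: dict, fallback: float) -> float:
    """Parse a Retry-After header (delay in seconds) from response headers.

    Falls back to the supplied backoff value when the header is missing
    or is not a plain number.
    """
    value = headers.get("retry-after") or headers.get("Retry-After")
    try:
        return max(0.0, float(value))
    except (TypeError, ValueError):
        return fallback
```

In the RateLimitError branch above, catch the exception as e and compute the wait with `retry_after_seconds(dict(e.response.headers), fallback=2 ** attempt)`; in the openai SDK the raised error carries the underlying httpx response, so the headers should be reachable that way.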

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.
