How to Handle AI API Rate Limits Gracefully
Build retry logic and rate limit handling for production AI applications.
Jay Banlasan
The AI Systems Guy
Rate limit errors are the first thing that breaks a production AI system. The AI API rate-limit-handling best practices I follow have saved dozens of client automations from failing mid-run. The core technique is exponential backoff with jitter: wait, retry, wait longer, retry again. Done right, your code handles rate limits transparently without any manual intervention.
Every AI provider has different limits. OpenAI measures requests per minute (RPM) and tokens per minute (TPM). Anthropic measures requests per minute. Groq measures tokens per minute, aggressively. Knowing which limit you are hitting tells you whether to slow down requests or reduce token size.
What You Need Before Starting
- API key for at least one provider set up
- tenacity library for cleaner retry logic (optional but recommended)
- Understanding of your provider's rate limit tier
- Python 3.9+
Step 1: The Manual Retry Pattern
The simplest version: catch the rate limit error and sleep before retrying:
import os
import random
import time

from dotenv import load_dotenv
from openai import OpenAI, RateLimitError
load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def call_with_retry(messages: list, model: str = "gpt-4o", max_retries: int = 5) -> str:
    """
    Call the OpenAI chat completions API with exponential backoff on
    rate limit errors.

    Args:
        messages: Chat messages in OpenAI format.
        model: Model name to use.
        max_retries: Total attempts before giving up.

    Returns:
        The assistant message content from the first successful attempt.

    Raises:
        RateLimitError: If every attempt is rate-limited.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=1000
            )
            return response.choices[0].message.content
        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # Out of retries, surface the original error
            # Exponential backoff (~1s, 2s, 4s, 8s, ...) plus true random
            # jitter so parallel workers do not retry in lockstep.
            # (0.1 * attempt, as before, is deterministic and zero on the
            # first retry — not jitter at all.)
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Waiting {wait_time:.1f}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(wait_time)
# Usage: one-off call — rate-limit waits and retries happen inside call_with_retry
result = call_with_retry([{"role": "user", "content": "Summarize AI trends in 2024"}])
print(result)
Step 2: Use Tenacity for Cleaner Retry Logic
The tenacity library handles retry logic declaratively:
pip install tenacity
import random
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
wait_random,
retry_if_exception_type,
before_sleep_log
)
import logging
from openai import RateLimitError, APIConnectionError, APITimeoutError
import anthropic
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# OpenAI with tenacity: the decorator wraps the call in declarative retry logic.
@retry(
    reraise=True,  # after the final attempt, raise the original exception (not tenacity's RetryError)
    stop=stop_after_attempt(6),  # 1 initial call + up to 5 retries
    # Exponential backoff capped at 60s, plus 0-2s of random jitter so
    # parallel workers do not all retry at the same instant.
    wait=wait_exponential(multiplier=1, min=1, max=60) + wait_random(0, 2),
    # Only transient failures are retried; other errors surface immediately.
    retry=retry_if_exception_type((RateLimitError, APIConnectionError, APITimeoutError)),
    before_sleep=before_sleep_log(logger, logging.WARNING)  # log each backoff wait at WARNING
)
def openai_call_with_tenacity(messages: list, model: str = "gpt-4o") -> str:
    """OpenAI call with automatic retry on rate limits and connection errors."""
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=1000
    )
    return response.choices[0].message.content
# Anthropic with tenacity
claude_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
@retry(
    reraise=True,
    stop=stop_after_attempt(6),
    # Jitter added to match the OpenAI wrapper above, so parallel workers
    # do not retry in lockstep.
    wait=wait_exponential(multiplier=1, min=1, max=60) + wait_random(0, 2),
    # Also retry transient connection/timeout failures, consistent with the
    # OpenAI wrapper (previously only rate limits were retried).
    retry=retry_if_exception_type(
        (anthropic.RateLimitError, anthropic.APIConnectionError, anthropic.APITimeoutError)
    ),
    before_sleep=before_sleep_log(logger, logging.WARNING)  # log each backoff wait
)
def claude_call_with_tenacity(prompt: str, system: str = None) -> str:
    """
    Claude call with automatic retry on rate limits and transient errors.

    Args:
        prompt: User message content.
        system: Optional system prompt; omitted from the request when falsy.

    Returns:
        The text of the first content block in Claude's response.
    """
    kwargs = {
        "model": "claude-opus-4-5",
        "max_tokens": 1000,
        "messages": [{"role": "user", "content": prompt}]
    }
    if system:
        kwargs["system"] = system
    response = claude_client.messages.create(**kwargs)
    return response.content[0].text
Step 3: Rate Limit Your Own Requests
Proactive rate limiting prevents hitting the API's limits in the first place:
import threading
from collections import deque
from datetime import datetime
class RateLimiter:
    """
    Sliding-window rate limiter that throttles callers to at most
    `requests_per_minute` requests in any rolling 60-second window.

    Thread-safe: wait() serializes callers with a lock, so concurrent
    workers can share one instance.
    """

    def __init__(self, requests_per_minute: int):
        # requests_per_minute must be >= 1 (the division below would
        # otherwise raise ZeroDivisionError).
        self.requests_per_minute = requests_per_minute
        self.interval = 60.0 / requests_per_minute  # Seconds between requests (informational)
        self.lock = threading.Lock()
        self.last_request_time = 0  # Retained for backward compatibility; not used internally
        # Timestamps of recent requests; maxlen bounds memory to one window's worth.
        self.request_times = deque(maxlen=requests_per_minute)

    def wait(self) -> None:
        """Block until it is safe to make the next request, then record it."""
        with self.lock:
            now = time.time()
            # Prune timestamps that fell out of the 60-second window. The
            # deque is appended in time order, so expired entries are always
            # at the left end — no need to rebuild a filtered list or scan
            # with min() on every call, as the original did.
            window_start = now - 60
            while self.request_times and self.request_times[0] <= window_start:
                self.request_times.popleft()
            if len(self.request_times) >= self.requests_per_minute:
                # Window is full: sleep until the oldest recorded request
                # ages past one minute, then retire it.
                sleep_time = self.request_times[0] + 60 - now
                if sleep_time > 0:
                    time.sleep(sleep_time)
                self.request_times.popleft()
            self.request_times.append(time.time())
# Usage: throttle our own side to 50 requests per minute, below the API's tier limit
limiter = RateLimiter(requests_per_minute=50)
def rate_limited_call(prompt: str) -> str:
    """Wait for the local limiter, then call OpenAI (tenacity retries remain as backstop)."""
    limiter.wait()
    return openai_call_with_tenacity([{"role": "user", "content": prompt}])
Step 4: Process Large Batches Without Hitting Limits
For processing hundreds or thousands of items:
import concurrent.futures
from typing import Callable
def process_batch(
    items: list,
    processor_fn: Callable,
    max_workers: int = 5,
    requests_per_minute: int = 50,
    show_progress: bool = True
) -> list:
    """
    Process a large batch of items with rate limiting and parallel execution.

    Args:
        items: List of items to process.
        processor_fn: Function that takes one item and returns a result.
        max_workers: Number of parallel worker threads.
        requests_per_minute: Max API calls per minute, shared across all workers.
        show_progress: Print a progress line every 10 completed items.

    Returns:
        List of results in the same order as the inputs.

    Raises:
        Whatever processor_fn raises: worker exceptions are re-raised by
        future.result() in the collection loop.
    """
    limiter = RateLimiter(requests_per_minute=requests_per_minute)
    results = [None] * len(items)
    completed = 0

    def process_with_limit(index: int, item):
        # Throttle before each call so the pool collectively stays under the limit.
        limiter.wait()
        return index, processor_fn(item)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # The original kept a {future: index} dict whose values were never
        # read — the worker already returns the index, so a plain list of
        # futures is sufficient.
        futures = [
            executor.submit(process_with_limit, i, item)
            for i, item in enumerate(items)
        ]
        for future in concurrent.futures.as_completed(futures):
            index, result = future.result()
            results[index] = result
            completed += 1
            if show_progress and completed % 10 == 0:
                print(f"Processed {completed}/{len(items)} items")
    return results
# Example: Classify 200 emails through the throttled, retried pipeline
sample_emails = [f"Email content number {i}" for i in range(200)]
def classify_one(email: str) -> str:
    """Classify one email into billing/support/sales/other via the rate-limited caller."""
    return rate_limited_call(f"Classify this email in one word (billing/support/sales/other): {email}")
results = process_batch(sample_emails, classify_one, max_workers=5, requests_per_minute=50)
print(f"Processed {len(results)} emails")
Step 5: Handle Different Error Types Appropriately
Not all errors should be retried the same way:
from openai import (
RateLimitError,
APIConnectionError,
APITimeoutError,
APIStatusError,
AuthenticationError,
BadRequestError
)
def robust_api_call(messages: list, model: str = "gpt-4o") -> str:
    """
    Call the chat completions API, retrying transient failures and
    failing fast on client-side errors.

    Retried: rate limits and 5xx server errors (exponential backoff),
    timeouts and connection errors (linear backoff).
    Not retried: authentication errors, bad requests, other 4xx errors.

    Raises:
        The last transient error if all retries are exhausted, or the
        non-retryable error immediately.
    """
    max_retries = 5
    for attempt in range(max_retries):
        # On the last attempt there is no point sleeping: re-raise the
        # original exception instead of discarding it. (The original slept
        # after the final failure, then raised a bare Exception that lost
        # the real error and traceback.)
        last_attempt = attempt == max_retries - 1
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=1000,
                timeout=30
            )
            return response.choices[0].message.content
        except RateLimitError:
            if last_attempt:
                raise
            wait = 2 ** attempt
            print(f"Rate limited. Retry {attempt + 1} in {wait}s")
            time.sleep(wait)
        except APITimeoutError:
            if last_attempt:
                raise
            wait = 5 * (attempt + 1)
            print(f"Timeout. Retry {attempt + 1} in {wait}s")
            time.sleep(wait)
        except APIConnectionError:
            if last_attempt:
                raise
            wait = 3 * (attempt + 1)
            print(f"Connection error. Retry {attempt + 1} in {wait}s")
            time.sleep(wait)
        except AuthenticationError:
            raise  # Bad API key — retrying cannot help
        except BadRequestError:
            raise  # Bad input — retrying cannot help
        except APIStatusError as e:
            if e.status_code < 500:
                raise  # 4xx errors are client-side, don't retry
            if last_attempt:
                raise
            wait = 2 ** attempt
            print(f"Server error {e.status_code}. Retry {attempt + 1} in {wait}s")
            time.sleep(wait)
    # Unreachable safety net: every path above either returns or raises.
    raise Exception(f"API call failed after {max_retries} attempts")
What to Build Next
- Add a circuit breaker pattern that stops all requests if error rate exceeds a threshold
- Log all rate limit hits to a file so you can analyze patterns and adjust your concurrency
- Set up monitoring alerts when retry counts spike, which indicates hitting your tier limits
Related Reading
- How to Connect GPT-4 to Your Business via API - Basic API setup before adding retry logic
- How to Build a Multi-Turn Conversation with Claude - Long conversations consume tokens fast and can hit TPM limits
- How to Set Up Groq for Ultra-Fast AI Inference - Groq has strict TPM limits that make rate limiting critical
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment