How to Build AI Request Throttling Systems
Control AI API request rates to stay within budgets and rate limits.
Jay Banlasan
The AI Systems Guy
A workflow I built for a client hit the Anthropic rate limit on day one of a product launch. Tier 1 API access allows 1,000 requests per minute, and their traffic peaked at 1,200 per minute. Every 429 error was a failed response to a real customer. Building AI API request throttling and rate control into the system before the next launch meant zero rate limit errors, automatic back-pressure when traffic spiked, and predictable costs at every usage tier.
Throttling is not just for protecting against provider limits. It's also a budget enforcement tool. If you allow unlimited concurrent AI calls, a rogue workflow can burn a month's budget in an afternoon. Rate limits at the application layer prevent that.
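The budget argument is easy to make concrete. A quick back-of-the-envelope calculation (illustrative per-request price, not a current provider rate):

```python
# A runaway loop at 300 requests/minute, ~5 cents per request (illustrative price)
requests_per_minute = 300
cost_cents_per_request = 5
afternoon_hours = 4

total_requests = requests_per_minute * 60 * afternoon_hours
burn_dollars = total_requests * cost_cents_per_request / 100
print(f"{total_requests:,} requests, ${burn_dollars:,.0f} burned in one afternoon")
```

Even at modest per-request prices, an unthrottled loop adds up to thousands of dollars before anyone notices.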
What You Need Before Starting
- Python 3.10+
- Redis (pip install redis) for distributed rate limiting
- asyncio for async throttling
- Your provider's documented rate limits (check your account tier)
Step 1: Understand What You're Limiting
There are three things to throttle independently: requests per minute, tokens per minute, and tokens per day.
# Anthropic Tier 1 example limits (check your actual tier at console.anthropic.com)
PROVIDER_LIMITS = {
    "anthropic": {
        "requests_per_minute": 1000,
        "tokens_per_minute": 100_000,
        "tokens_per_day": 2_500_000,
    },
    "openai": {
        "requests_per_minute": 500,
        "tokens_per_minute": 200_000,
        "tokens_per_day": None,  # varies by tier
    },
}

# Application-level limits (what YOU want to enforce regardless of provider limits)
APP_LIMITS = {
    "global": {"requests_per_minute": 800},  # stay under provider limit
    "team:marketing": {"requests_per_minute": 100},
    "team:engineering": {"requests_per_minute": 300},
    "workflow:nightly-batch": {"requests_per_minute": 50},
}
Set your application limits at 80% of the provider limit. That gap absorbs spikes and prevents you from hitting the hard ceiling.
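That rule of thumb can be derived mechanically from the provider table (a small sketch; the 0.8 safety factor is a judgment call, not a provider recommendation, and the table is a trimmed copy for a self-contained example):

```python
# Trimmed copy of the provider table for a self-contained example
PROVIDER_LIMITS = {
    "anthropic": {"requests_per_minute": 1000, "tokens_per_minute": 100_000},
}

def derive_app_limit(provider: str, metric: str, safety: float = 0.8) -> int:
    """Return the application-level ceiling for one provider metric."""
    return int(PROVIDER_LIMITS[provider][metric] * safety)

print(derive_app_limit("anthropic", "requests_per_minute"))  # 800
print(derive_app_limit("anthropic", "tokens_per_minute"))    # 80000
```

Deriving the numbers instead of hard-coding them means a tier upgrade only requires updating the provider table.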
Step 2: Build a Token Bucket Throttler
Token bucket is the right algorithm here. It allows bursts up to the bucket capacity but maintains an average rate.
import time
import redis

r = redis.Redis(host='localhost', port=6379, db=2)

class TokenBucketThrottler:
    def __init__(self, key: str, rate: float, capacity: int):
        """
        key: Redis key for this throttler
        rate: tokens added per second (e.g., 10 req/min = 10/60 = 0.167)
        capacity: max bucket size (controls burst size)
        """
        self.key = f"throttle:{key}"
        self.rate = rate
        self.capacity = capacity

    def acquire(self, tokens: int = 1, timeout: float = 30.0) -> bool:
        """
        Try to acquire 'tokens' from the bucket.
        Returns True if acquired (proceed), False if timed out.
        Blocks until tokens are available or timeout is reached.
        """
        # Lua script for atomic check-and-decrement
        lua_script = """
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local tokens = tonumber(ARGV[4])
        local last = tonumber(redis.call('HGET', key, 'last') or now)
        local bucket = tonumber(redis.call('HGET', key, 'tokens') or capacity)
        -- Refill bucket based on elapsed time
        local elapsed = math.max(0, now - last)
        bucket = math.min(capacity, bucket + elapsed * rate)
        if bucket >= tokens then
            bucket = bucket - tokens
            redis.call('HMSET', key, 'tokens', bucket, 'last', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        else
            redis.call('HMSET', key, 'tokens', bucket, 'last', now)
            redis.call('EXPIRE', key, 3600)
            return 0
        end
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            now = time.time()
            result = r.eval(lua_script, 1, self.key,
                            self.rate, self.capacity, now, tokens)
            if result == 1:
                return True
            # Wait proportional to deficit
            wait = tokens / self.rate
            time.sleep(min(wait, 0.5))
        return False  # timed out

# Create throttlers
_throttlers: dict[str, TokenBucketThrottler] = {}

def get_throttler(scope: str) -> TokenBucketThrottler:
    if scope not in _throttlers:
        limits = APP_LIMITS.get(scope, APP_LIMITS["global"])
        rpm = limits["requests_per_minute"]
        _throttlers[scope] = TokenBucketThrottler(
            key=scope,
            rate=rpm / 60.0,
            capacity=min(rpm, 50)  # burst up to 50 requests
        )
    return _throttlers[scope]
Step 3: Wrap AI Calls with Throttling
Every AI call acquires a token before proceeding.
import anthropic
import logging

logger = logging.getLogger("throttler")
_ai = anthropic.Anthropic()

class ThrottleTimeoutError(Exception):
    pass

def throttled_ai_call(
    prompt: str,
    scope: str = "global",
    model: str = "claude-3-5-haiku-latest",
    timeout: float = 30.0
) -> str:
    throttler = get_throttler(scope)
    acquired = throttler.acquire(tokens=1, timeout=timeout)
    if not acquired:
        raise ThrottleTimeoutError(
            f"Rate limit wait exceeded {timeout}s for scope '{scope}'"
        )
    logger.debug(f"Throttle acquired for scope={scope}")
    response = _ai.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text
Step 4: Add Token-Based Throttling
Requests per minute is only half the picture. Providers also cap tokens per minute, and a single large request can blow that.
class TokenUsageThrottler:
    """Limits tokens per minute, not just requests per minute."""

    def __init__(self, key: str, tokens_per_minute: int):
        self.key = f"tokthrottle:{key}"
        self.tpm = tokens_per_minute

    def reserve(self, estimated_tokens: int, timeout: float = 30.0) -> bool:
        """Reserve estimated_tokens against the per-minute budget.

        Note: the check-then-increment below is not atomic, so concurrent
        callers can briefly overshoot. That's acceptable for a soft budget
        already sitting 20% under the hard limit.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            # Recompute each iteration so waiting rolls into a fresh window
            minute_key = f"{self.key}:{int(time.time() // 60)}"
            current = int(r.get(minute_key) or 0)
            if current + estimated_tokens <= self.tpm:
                r.incrby(minute_key, estimated_tokens)
                r.expire(minute_key, 120)
                return True
            wait = 60 - (time.time() % 60)  # wait until the next minute window
            time.sleep(min(wait, 2.0))
        return False

_token_throttler = TokenUsageThrottler("global", tokens_per_minute=80_000)  # 80% of limit

def estimate_prompt_tokens(prompt: str) -> int:
    return len(prompt) // 3 + 200  # rough estimate including expected output
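The fixed-window accounting behind reserve() can be sketched without Redis (a single-process version with an injectable clock, a hypothetical helper for illustration and unit tests):

```python
class LocalTokenWindow:
    """Fixed one-minute windows, same accounting as the Redis version."""

    def __init__(self, tokens_per_minute: int):
        self.tpm = tokens_per_minute
        self.windows: dict[int, int] = {}  # minute index -> tokens reserved

    def reserve(self, estimated_tokens: int, now: float) -> bool:
        minute = int(now // 60)
        used = self.windows.get(minute, 0)
        if used + estimated_tokens > self.tpm:
            return False  # caller should wait for the next window
        self.windows[minute] = used + estimated_tokens
        return True

win = LocalTokenWindow(tokens_per_minute=80_000)
print(win.reserve(50_000, now=0))   # True  (50k of 80k used)
print(win.reserve(40_000, now=30))  # False (would exceed this minute's budget)
print(win.reserve(40_000, now=61))  # True  (fresh window)
```

Reservations happen against token estimates, not actuals, so the window is a soft ceiling; the 20% gap below the provider limit absorbs estimation error.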
Step 5: Handle 429 Errors with Exponential Backoff
Even with application-level throttling, provider 429s can still happen. Handle them gracefully.
import random
import time

def ai_call_with_backoff(prompt: str, model: str = "claude-3-5-haiku-latest",
                         max_retries: int = 5) -> str:
    base_delay = 1.0
    for attempt in range(max_retries):
        try:
            response = _ai.messages.create(
                model=model, max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
        except anthropic.RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt) + random.random()
            logger.warning(f"Rate limited. Retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)
        except anthropic.APIStatusError as e:
            if e.status_code == 529:  # Anthropic overloaded
                delay = base_delay * (2 ** attempt)
                logger.warning(f"API overloaded. Retrying in {delay:.1f}s")
                time.sleep(delay)
            else:
                raise
    raise RuntimeError(f"Failed after {max_retries} retries")
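The delay schedule grows as base_delay * 2^attempt. Pulling it into a pure function makes the schedule easy to verify in isolation (a hypothetical helper, not part of the code above; the 60s cap is an added assumption to keep worst-case waits bounded):

```python
import random

def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Exponential backoff with up to 1s of random jitter, capped at max_delay."""
    return min(base_delay * (2 ** attempt) + random.random(), max_delay)

# Five attempts produce roughly 1s, 2s, 4s, 8s, 16s (plus jitter)
schedule = [backoff_delay(a) for a in range(5)]
print([round(d) for d in schedule])
```

The jitter matters: without it, every worker that got rate-limited at the same moment retries at the same moment, recreating the spike that caused the 429s.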
Step 6: Monitor Throttle Pressure
Know when your throttles are under pressure so you can adjust limits or scale capacity.
def throttle_stats(scope: str = "global") -> dict:
    """Returns current bucket state for a throttler."""
    throttler = get_throttler(scope)
    data = r.hgetall(throttler.key)
    if not data:
        return {"scope": scope, "bucket_tokens": throttler.capacity, "pressure": "idle"}
    bucket = float(data.get(b"tokens", throttler.capacity))
    pct_full = bucket / throttler.capacity
    pressure = "low"
    if pct_full < 0.3:
        pressure = "high"
    elif pct_full < 0.6:
        pressure = "medium"
    return {
        "scope": scope,
        "bucket_tokens": round(bucket, 2),
        "bucket_capacity": throttler.capacity,
        "pct_available": round(pct_full * 100, 1),
        "pressure": pressure,
    }

# Log this every minute in production
print(throttle_stats("global"))
print(throttle_stats("team:marketing"))
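The threshold logic deserves its own unit test, since alerting depends on it. Extracted as a pure function (a hypothetical refactor, not part of the code above):

```python
def classify_pressure(pct_full: float) -> str:
    """Same thresholds as throttle_stats: <30% full -> high, <60% -> medium."""
    if pct_full < 0.3:
        return "high"
    if pct_full < 0.6:
        return "medium"
    return "low"

print(classify_pressure(0.9))  # low
print(classify_pressure(0.5))  # medium
print(classify_pressure(0.1))  # high
```

Sustained "high" pressure on a scope is the signal to raise that scope's limit, shed load, or upgrade your provider tier.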
What to Build Next
- Add a priority pass-through so CRITICAL priority requests bypass throttling when the bucket is empty
- Build a throttle pressure dashboard that shows real-time bucket levels per scope in your ops interface
- Integrate throttle events into your usage analytics to see which scopes hit limits most often
Related Reading
- How to Build a Multi-Model AI Router - throttle at the router level so each model has its own rate limit
- How to Build Automatic Model Failover Systems - a throttled-out primary is one valid trigger for failover to a backup provider
- How to Implement Semantic Caching for AI Queries - caching reduces the total request volume that throttling needs to manage
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment