
How to Build AI Request Throttling Systems

Control AI API request rates to stay within budgets and rate limits.

Jay Banlasan

The AI Systems Guy

A workflow I built for a client hit the Anthropic rate limit on day one of a product launch. Tier 1 API access, 1,000 requests per minute, and their traffic peaked at 1,200 per minute. Every 429 error meant a failed response for a real customer. Building request throttling and rate control into the system before the next launch meant zero rate limit errors, automatic back-pressure when traffic spiked, and predictable costs at every usage tier.

Throttling is not just for protecting against provider limits. It's also a budget enforcement tool. If you allow unlimited concurrent AI calls, a rogue workflow can burn a month's budget in an afternoon. Rate limits at the application layer prevent that.
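A minimal sketch of that application-layer budget gate (the class name and cap are hypothetical, and this in-memory version would need Redis-backed state in production, like the throttlers below):

```python
from datetime import date

class DailyBudgetGuard:
    """Reject AI calls once today's estimated spend crosses a hard cap."""

    def __init__(self, daily_cap_usd: float):
        self.cap = daily_cap_usd
        self.day = date.today()
        self.spent = 0.0

    def charge(self, cost_usd: float) -> bool:
        """Record a call's cost; return False if the cap would be exceeded."""
        if date.today() != self.day:  # new day: reset the counter
            self.day, self.spent = date.today(), 0.0
        if self.spent + cost_usd > self.cap:
            return False
        self.spent += cost_usd
        return True

guard = DailyBudgetGuard(daily_cap_usd=50.0)
print(guard.charge(10.0))  # → True
```

A rogue workflow then fails fast with a clear error instead of silently burning the month's budget.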

Step 1: Understand What You're Limiting

There are three provider-side quantities to throttle independently — requests per minute, tokens per minute, and tokens per day — plus your own application-level limits on top:

# Anthropic Tier 1 example limits (check your actual tier at console.anthropic.com)
PROVIDER_LIMITS = {
    "anthropic": {
        "requests_per_minute": 1000,
        "tokens_per_minute":   100_000,
        "tokens_per_day":      2_500_000,
    },
    "openai": {
        "requests_per_minute": 500,
        "tokens_per_minute":   200_000,
        "tokens_per_day":      None,  # varies by tier
    }
}

# Application-level limits (what YOU want to enforce regardless of provider limits)
APP_LIMITS = {
    "global": {"requests_per_minute": 800},  # stay under provider limit
    "team:marketing": {"requests_per_minute": 100},
    "team:engineering": {"requests_per_minute": 300},
    "workflow:nightly-batch": {"requests_per_minute": 50},
}

Set your application limits at 80% of the provider limit. That gap absorbs spikes and prevents you from hitting the hard ceiling.
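The 80% rule can be encoded directly rather than hand-maintained. A sketch that derives application ceilings from the provider table above (the function name and `SAFETY_FACTOR` are my own):

```python
SAFETY_FACTOR = 0.8  # stay at 80% of the provider's hard limit

# Restating the provider table from above
PROVIDER_LIMITS = {
    "anthropic": {"requests_per_minute": 1000, "tokens_per_minute": 100_000,
                  "tokens_per_day": 2_500_000},
    "openai": {"requests_per_minute": 500, "tokens_per_minute": 200_000,
               "tokens_per_day": None},  # varies by tier
}

def derive_app_limits(provider: str) -> dict:
    """Return throttle ceilings at SAFETY_FACTOR of the provider's limits,
    skipping metrics the provider does not publish."""
    limits = PROVIDER_LIMITS[provider]
    return {
        metric: int(value * SAFETY_FACTOR)
        for metric, value in limits.items()
        if value is not None
    }

print(derive_app_limits("anthropic"))
# → {'requests_per_minute': 800, 'tokens_per_minute': 80000, 'tokens_per_day': 2000000}
```

Deriving the ceilings means a provider tier upgrade is a one-line change instead of a hunt through hardcoded numbers.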

Step 2: Build a Token Bucket Throttler

Token bucket is the right algorithm here. It allows bursts up to the bucket capacity but maintains an average rate.

import time
import redis

r = redis.Redis(host='localhost', port=6379, db=2)

class TokenBucketThrottler:
    # Atomic refill-and-decrement. Registered once, not rebuilt per attempt.
    # Uses HSET with multiple field-value pairs (HMSET is deprecated).
    LUA_SCRIPT = """
        local key      = KEYS[1]
        local rate     = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now      = tonumber(ARGV[3])
        local tokens   = tonumber(ARGV[4])

        local last   = tonumber(redis.call('HGET', key, 'last') or now)
        local bucket = tonumber(redis.call('HGET', key, 'tokens') or capacity)

        -- Refill bucket based on elapsed time
        local elapsed = math.max(0, now - last)
        bucket = math.min(capacity, bucket + elapsed * rate)

        local granted = 0
        if bucket >= tokens then
            bucket  = bucket - tokens
            granted = 1
        end
        redis.call('HSET', key, 'tokens', bucket, 'last', now)
        redis.call('EXPIRE', key, 3600)
        return granted
    """

    def __init__(self, key: str, rate: float, capacity: int):
        """
        key:      Redis key for this throttler
        rate:     tokens added per second (e.g., 10 req/min = 10/60 ≈ 0.167)
        capacity: max bucket size (controls burst size)
        """
        self.key      = f"throttle:{key}"
        self.rate     = rate
        self.capacity = capacity
        self._script  = r.register_script(self.LUA_SCRIPT)

    def acquire(self, tokens: int = 1, timeout: float = 30.0) -> bool:
        """
        Try to acquire 'tokens' from the bucket.
        Blocks until tokens are available or timeout is reached.
        Returns True if acquired (proceed), False if timed out.
        """
        deadline = time.time() + timeout

        while time.time() < deadline:
            result = self._script(keys=[self.key],
                                  args=[self.rate, self.capacity,
                                        time.time(), tokens])
            if result == 1:
                return True

            # Wait roughly as long as one refill of the requested tokens takes
            time.sleep(min(tokens / self.rate, 0.5))

        return False  # timed out

# Create throttlers
_throttlers: dict[str, TokenBucketThrottler] = {}

def get_throttler(scope: str) -> TokenBucketThrottler:
    if scope not in _throttlers:
        limits = APP_LIMITS.get(scope, APP_LIMITS["global"])
        rpm = limits["requests_per_minute"]
        _throttlers[scope] = TokenBucketThrottler(
            key=scope,
            rate=rpm / 60.0,
            capacity=min(rpm, 50)  # burst up to 50 requests
        )
    return _throttlers[scope]
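For local development and unit tests without Redis, the same refill math can be mirrored in-memory. A sketch (the class name is my own; it follows the Lua script's logic, minus atomicity across processes):

```python
import time

class LocalTokenBucket:
    """In-memory mirror of the Redis token bucket, for tests and dev."""

    def __init__(self, rate: float, capacity: float):
        self.rate, self.capacity = rate, capacity
        self.tokens, self.last = capacity, time.monotonic()

    def try_acquire(self, tokens: int = 1) -> bool:
        now = time.monotonic()
        # Refill based on elapsed time, capped at bucket capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

bucket = LocalTokenBucket(rate=10 / 60, capacity=5)  # 10 req/min, burst of 5
print([bucket.try_acquire() for _ in range(6)])
# → [True, True, True, True, True, False]
```

The burst of 5 drains immediately; the sixth call fails because refill at 0.167 tokens/second has not replaced a full token yet.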

Step 3: Wrap AI Calls with Throttling

Every AI call acquires a token before proceeding.

import anthropic
import logging

logger = logging.getLogger("throttler")
_ai = anthropic.Anthropic()

class ThrottleTimeoutError(Exception):
    pass

def throttled_ai_call(
    prompt: str,
    scope: str = "global",
    model: str = "claude-3-haiku-20240307",
    timeout: float = 30.0
) -> str:
    throttler = get_throttler(scope)
    
    acquired = throttler.acquire(tokens=1, timeout=timeout)
    if not acquired:
        raise ThrottleTimeoutError(
            f"Rate limit wait exceeded {timeout}s for scope '{scope}'"
        )
    
    logger.debug(f"Throttle acquired for scope={scope}")
    
    response = _ai.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text

Step 4: Add Token-Based Throttling

Requests per minute is only half the picture. Providers also cap tokens per minute, and a single large request can blow that.

class TokenUsageThrottler:
    """Limits tokens per minute, not just requests per minute."""
    
    def __init__(self, key: str, tokens_per_minute: int):
        self.key = f"tokthrottle:{key}"
        self.tpm = tokens_per_minute
    
    def reserve(self, estimated_tokens: int, timeout: float = 30.0) -> bool:
        """Reserve estimated_tokens against the per-minute budget."""
        if estimated_tokens > self.tpm:
            return False  # can never fit in a single minute's budget
        
        deadline = time.time() + timeout
        
        while time.time() < deadline:
            # Recompute the key each pass so a sleep into the next minute
            # checks the fresh window, not the stale one
            minute_key = f"{self.key}:{int(time.time() // 60)}"
            # INCRBY-then-check is atomic; roll back if we overshot the budget
            new_total = r.incrby(minute_key, estimated_tokens)
            r.expire(minute_key, 120)
            if new_total <= self.tpm:
                return True
            r.decrby(minute_key, estimated_tokens)
            wait = 60 - (time.time() % 60)  # wait until the next minute window
            time.sleep(min(wait, 2.0))
        
        return False

_token_throttler = TokenUsageThrottler("global", tokens_per_minute=80_000)  # 80% of limit

def estimate_prompt_tokens(prompt: str) -> int:
    return len(prompt) // 3 + 200  # rough estimate including expected output
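The estimator is coarse but cheap, and it is worth flagging outsized prompts before they ever reach the throttler: a single request larger than the whole per-minute budget can never be admitted. A self-contained check, assuming the ~3 chars/token heuristic above (`fits_in_minute_budget` is my own name):

```python
def estimate_prompt_tokens(prompt: str) -> int:
    # ~3 characters per token, plus ~200 tokens of headroom for the response
    return len(prompt) // 3 + 200

TPM_BUDGET = 80_000  # matches the global token throttler above

def fits_in_minute_budget(prompt: str, tpm: int = TPM_BUDGET) -> bool:
    """Reject requests bigger than the entire per-minute budget up front,
    instead of spinning until the throttle timeout."""
    return estimate_prompt_tokens(prompt) <= tpm

print(fits_in_minute_budget("x" * 3_000))    # ~1,200 tokens → True
print(fits_in_minute_budget("x" * 300_000))  # ~100,200 tokens → False
```

Oversized prompts should fail fast with a clear error telling the caller to chunk the input, not time out after 30 seconds of waiting.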

Step 5: Handle 429 Errors with Exponential Backoff

Even with application-level throttling, provider 429s can still happen. Handle them gracefully.

import random

def ai_call_with_backoff(prompt: str, model: str = "claude-3-haiku-20240307",
                         max_retries: int = 5) -> str:
    base_delay = 1.0
    
    for attempt in range(max_retries):
        try:
            response = _ai.messages.create(
                model=model, max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
        
        except anthropic.RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with random jitter to avoid thundering herd
            delay = base_delay * (2 ** attempt) + random.random()
            logger.warning(f"Rate limited. Retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)
        
        except anthropic.APIStatusError as e:
            if e.status_code == 529:  # Anthropic overloaded
                if attempt == max_retries - 1:
                    raise
                delay = base_delay * (2 ** attempt)
                logger.warning(f"API overloaded. Retrying in {delay:.1f}s")
                time.sleep(delay)
            else:
                raise
    
    raise RuntimeError(f"Failed after {max_retries} retries")
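The retry logic is independent of the AI client, so it can be tested without making real API calls. A generic sketch (`with_backoff` and the simulated failure are my own; in production you would pass `anthropic.RateLimitError` as the retryable exception):

```python
import random
import time

def with_backoff(fn, max_retries: int = 5, base_delay: float = 0.01,
                 retryable=(ConnectionError,)):
    """Retry fn on retryable exceptions with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except retryable:
            if attempt == max_retries - 1:
                raise
            # Full jitter: sleep a random fraction of the exponential delay
            time.sleep(base_delay * (2 ** attempt) * random.random())

# Simulate a call that fails twice (like two 429s), then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated 429")
    return "ok"

print(with_backoff(flaky))  # → ok
```

Separating the retry policy from the call site also means one tested implementation serves every provider, instead of copy-pasted retry loops drifting apart.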

Step 6: Monitor Throttle Pressure

Know when your throttles are under pressure so you can adjust limits or scale capacity.

def throttle_stats(scope: str = "global") -> dict:
    """Returns current bucket state for a throttler."""
    throttler = get_throttler(scope)
    key = throttler.key
    
    data = r.hgetall(key)
    if not data:
        return {"scope": scope, "bucket_tokens": throttler.capacity, "pressure": "idle"}
    
    bucket = float(data.get(b"tokens", throttler.capacity))
    pct_available = bucket / throttler.capacity
    
    pressure = "low"
    if pct_available < 0.3:
        pressure = "high"
    elif pct_available < 0.6:
        pressure = "medium"
    
    return {
        "scope": scope,
        "bucket_tokens": round(bucket, 2),
        "bucket_capacity": throttler.capacity,
        "pct_available": round(pct_available * 100, 1),
        "pressure": pressure
    }

# Log this every minute in production
print(throttle_stats("global"))
print(throttle_stats("team:marketing"))
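The pressure thresholds themselves can be unit-tested in isolation, without Redis. A pure-function sketch of the same classification used in throttle_stats (the function name is my own):

```python
def classify_pressure(bucket_tokens: float, capacity: float) -> str:
    """Map the remaining bucket fraction to a pressure level, using the
    same thresholds as throttle_stats."""
    pct_available = bucket_tokens / capacity
    if pct_available < 0.3:
        return "high"
    if pct_available < 0.6:
        return "medium"
    return "low"

print(classify_pressure(10, 50))  # 20% available → high
```

Sustained "high" pressure on a scope is the signal to raise its limit, shift traffic to another window, or request a provider tier upgrade.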
