How to Implement AI Request Prioritization
Build priority queues so critical AI tasks run before batch processing.
Jay Banlasan
The AI Systems Guy
When I first wired AI into production, every request hit the same queue in arrival order. A low-priority nightly batch job ended up blocking a live customer-facing response. That's when I built a proper AI request priority queue. It took less than a day and immediately stopped the bleeding.
The business case is simple: not all AI tasks carry the same urgency. A customer waiting on a support reply is worth more right now than a marketing email digest that runs at midnight. Priority queues let you encode that business logic in code so the right requests always move to the front.
What You Need Before Starting
- Python 3.10+
- redis and redis-py installed (pip install redis)
- An API key for your AI provider (OpenAI, Anthropic, etc.)
- Basic understanding of async Python (asyncio)
Step 1: Define Your Priority Levels
Start with three tiers. You can always add more, but three covers 90% of real use cases.
from enum import IntEnum
class Priority(IntEnum):
    CRITICAL = 1  # live user-facing, customer support, auth flows
    HIGH = 2      # internal tools, team dashboards, near-real-time
    LOW = 3       # batch jobs, nightly summaries, bulk enrichment
Use IntEnum so you can compare and sort priorities numerically. Lower number = runs first.
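Because Priority is an IntEnum, comparisons and sorting work out of the box. A quick sanity check (the enum is restated so the snippet runs standalone):

```python
from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 1
    HIGH = 2
    LOW = 3

backlog = [Priority.LOW, Priority.CRITICAL, Priority.HIGH]

# Lower value sorts first, so CRITICAL leads
print([p.name for p in sorted(backlog)])  # ['CRITICAL', 'HIGH', 'LOW']
print(Priority.CRITICAL < Priority.LOW)   # True
print(min(backlog).name)                  # CRITICAL
```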
Step 2: Build the Priority Queue with Redis Sorted Sets
Redis sorted sets are a natural fit. The score IS the priority. Lower scores dequeue first.
import redis
import json
import time
import uuid
r = redis.Redis(host='localhost', port=6379, db=0)
QUEUE_KEY = "ai:request:queue"
def enqueue(prompt: str, priority: Priority, metadata: dict | None = None) -> str:
    request_id = str(uuid.uuid4())
    payload = json.dumps({
        "id": request_id,
        "prompt": prompt,
        "priority": int(priority),
        "metadata": metadata or {},  # avoid a mutable default argument
        "enqueued_at": time.time()
    })
    # Score = priority * 1e10 + timestamp ensures FIFO within the same priority tier
    score = int(priority) * 1e10 + time.time()
    r.zadd(QUEUE_KEY, {payload: score})
    return request_id
def dequeue() -> dict | None:
    # ZPOPMIN atomically pops the lowest-score (highest-priority) item
    result = r.zpopmin(QUEUE_KEY, 1)
    if not result:
        return None
    payload, _ = result[0]
    return json.loads(payload)
The scoring trick is important. Multiplying priority by 1e10 gives you enough headroom so that timestamp tiebreakers never leak into the wrong priority tier.
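A quick check of the math, in pure Python with no Redis needed: a CRITICAL request enqueued a full hour after a LOW one still scores lower, so it dequeues first.

```python
import time

def score(priority: int, enqueued_at: float) -> float:
    # Same formula as the enqueue function above
    return priority * 1e10 + enqueued_at

now = time.time()
low_early = score(3, now)               # LOW, enqueued now
critical_late = score(1, now + 3600)    # CRITICAL, an hour later
print(critical_late < low_early)        # True: CRITICAL still wins

# FIFO within a tier: earlier timestamp -> lower score
print(score(1, now) < score(1, now + 1))  # True
```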
Step 3: Build the Worker Loop
The worker pulls requests from the queue and dispatches them to your AI provider.
import anthropic
client = anthropic.Anthropic()

def process_request(request: dict) -> str:
    message = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": request["prompt"]}]
    )
    return message.content[0].text
def run_worker(poll_interval: float = 0.1):
    print("Worker started. Polling queue...")
    while True:
        request = dequeue()
        if request:
            print(f"Processing [{Priority(request['priority']).name}] request {request['id']}")
            try:
                result = process_request(request)
                # Store the result with a 5-minute TTL so the caller can retrieve it
                r.setex(f"ai:result:{request['id']}", 300, result)
                print(f"Done: {request['id']}")
            except Exception as e:
                print(f"Failed: {request['id']} — {e}")
        else:
            time.sleep(poll_interval)
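If you want to test the ordering logic without a running Redis instance, the same behavior can be modeled in-process with heapq. A minimal sketch (this InMemoryQueue class is an illustration of mine, not part of the Redis setup above):

```python
import heapq
import itertools
import time

class InMemoryQueue:
    """Mirrors the Redis sorted-set ordering using a binary heap."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for identical scores

    def enqueue(self, prompt: str, priority: int) -> None:
        # Same scoring formula as the Redis version
        score = priority * 1e10 + time.time()
        heapq.heappush(self._heap, (score, next(self._counter), prompt))

    def dequeue(self):
        if not self._heap:
            return None
        _, _, prompt = heapq.heappop(self._heap)
        return prompt

q = InMemoryQueue()
q.enqueue("nightly batch summary", 3)  # LOW
q.enqueue("live support reply", 1)     # CRITICAL
print(q.dequeue())  # live support reply — CRITICAL jumps the line
```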
Step 4: Add a Result Retrieval Pattern
Callers need a way to get results back. Use a simple poll-and-wait pattern keyed by request ID.
def get_result(request_id: str, timeout: int = 30) -> str | None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        result = r.get(f"ai:result:{request_id}")
        if result:
            return result.decode()
        time.sleep(0.2)
    return None  # timed out
# Usage example
req_id = enqueue("Summarize this support ticket: ...", Priority.CRITICAL)
output = get_result(req_id, timeout=20)
print(output)
Step 5: Add Queue Depth Monitoring
You need visibility. If the LOW priority queue grows to 5,000 items while CRITICAL sits at zero, something is wrong with your worker pool.
def queue_stats() -> dict:
    all_items = r.zrange(QUEUE_KEY, 0, -1, withscores=True)
    stats = {p.name: 0 for p in Priority}
    for payload, _ in all_items:
        data = json.loads(payload)
        tier = Priority(data["priority"]).name
        stats[tier] += 1
    return stats
# Log this every 60 seconds in production
print(queue_stats())
# {'CRITICAL': 0, 'HIGH': 3, 'LOW': 847}
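The stats dict plugs straight into a threshold check you can log or alert on. A sketch (the per-tier limits here are illustrative placeholders, not recommendations):

```python
# Illustrative per-tier depth limits; tune these to your own traffic
DEPTH_LIMITS = {"CRITICAL": 10, "HIGH": 100, "LOW": 5000}

def check_queue_health(stats: dict) -> list[str]:
    """Return a human-readable alert for every tier over its limit."""
    alerts = []
    for tier, depth in stats.items():
        limit = DEPTH_LIMITS.get(tier)
        if limit is not None and depth > limit:
            alerts.append(f"{tier} queue depth {depth} exceeds limit {limit}")
    return alerts

print(check_queue_health({"CRITICAL": 0, "HIGH": 3, "LOW": 847}))   # []
print(check_queue_health({"CRITICAL": 25, "HIGH": 3, "LOW": 847}))  # one alert
```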
Step 6: Run Multiple Workers for High-Volume Tiers
For production, run dedicated worker processes per priority tier. This prevents a flood of LOW priority work from starving CRITICAL even if the queue logic is correct.
# worker_critical.py — runs 3 processes via supervisor or systemd
QUEUE_KEY_CRITICAL = "ai:request:queue:critical"
# Split your enqueue function to route to tier-specific queues
# and run separate worker pools per tier
A simple approach: use three Redis queue keys (queue:critical, queue:high, queue:low) and three separate worker pool sizes (e.g., 4 workers, 2 workers, 1 worker). Critical gets the most throughput, low gets the least.
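Routing the enqueue to per-tier keys is a one-line mapping. A sketch, with key names following the queue:critical / queue:high / queue:low convention above:

```python
from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 1
    HIGH = 2
    LOW = 3

TIER_QUEUE_KEYS = {
    Priority.CRITICAL: "ai:request:queue:critical",
    Priority.HIGH: "ai:request:queue:high",
    Priority.LOW: "ai:request:queue:low",
}

def queue_key_for(priority: Priority) -> str:
    # Each worker pool polls only its own key, so LOW can never starve CRITICAL
    return TIER_QUEUE_KEYS[priority]

print(queue_key_for(Priority.CRITICAL))  # ai:request:queue:critical
```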
What to Build Next
- Add dead-letter queue handling so failed requests don't silently disappear
- Wire queue depth stats into your existing monitoring dashboard (Grafana, Datadog, or a simple Slack alert)
- Add request deduplication using a Redis SET so identical batch prompts only fire once
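The deduplication idea can be sketched without Redis: hash the prompt and track which hashes you've seen. In production the set below would be a Redis SET checked with SADD (which returns 1 only for new members); this in-memory version just shows the shape:

```python
import hashlib

_seen: set[str] = set()

def should_enqueue(prompt: str) -> bool:
    """Return True only the first time a given prompt is seen."""
    key = hashlib.sha256(prompt.encode()).hexdigest()
    if key in _seen:
        return False
    _seen.add(key)
    return True

print(should_enqueue("Summarize ticket #42"))  # True — first time
print(should_enqueue("Summarize ticket #42"))  # False — duplicate skipped
```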
Related Reading
- How to Build a Multi-Model AI Router - route requests to the right model before they hit the queue
- How to Optimize Batch AI Processing for Cost - combine prioritization with batching for maximum savings
- How to Build AI Request Throttling Systems - pair throttling with priority queues to stay within rate limits
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment