
How to Implement AI Request Prioritization

Build priority queues so critical AI tasks run before batch processing.

Jay Banlasan

The AI Systems Guy

When I first wired AI into production, every request hit the same queue in order of arrival. One night, a low-priority batch job blocked a live customer-facing response. That's when I built a proper AI request priority queue. It took less than a day and immediately stopped the bleeding.

The business case is simple: not all AI tasks carry the same urgency. A customer waiting on a support reply is worth more right now than a marketing email digest that runs at midnight. Priority queues let you encode that business logic in code so the right requests always move to the front.

What You Need Before Starting

Python 3.10+ (the examples use the X | None union syntax), a running Redis instance to hold the queue, and an API key for your AI provider (the worker below uses the Anthropic Python SDK).

Step 1: Define Your Priority Levels

Start with three tiers. You can always add more, but three covers 90% of real use cases.

from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 1   # live user-facing, customer support, auth flows
    HIGH     = 2   # internal tools, team dashboards, near-real-time
    LOW      = 3   # batch jobs, nightly summaries, bulk enrichment

Use IntEnum so you can compare and sort priorities numerically. Lower number = runs first.
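A quick sanity check, using the same Priority enum, shows why IntEnum is the right base class: members compare and sort like plain ints with no extra code.

```python
from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 1   # live user-facing, customer support, auth flows
    HIGH     = 2   # internal tools, team dashboards, near-real-time
    LOW      = 3   # batch jobs, nightly summaries, bulk enrichment

# IntEnum members behave like ints, so comparison and sorting just work.
assert Priority.CRITICAL < Priority.HIGH < Priority.LOW
assert sorted([Priority.LOW, Priority.CRITICAL, Priority.HIGH])[0] is Priority.CRITICAL
```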

Step 2: Build the Priority Queue with Redis Sorted Sets

Redis sorted sets are a natural fit. The score IS the priority. Lower scores dequeue first.

import redis
import json
import time
import uuid

r = redis.Redis(host='localhost', port=6379, db=0)

QUEUE_KEY = "ai:request:queue"

def enqueue(prompt: str, priority: Priority, metadata: dict | None = None) -> str:
    # Avoid a mutable default argument; fall back to an empty dict here.
    request_id = str(uuid.uuid4())
    payload = json.dumps({
        "id": request_id,
        "prompt": prompt,
        "priority": int(priority),
        "metadata": metadata or {},
        "enqueued_at": time.time()
    })
    # Score = priority * 1e10 + timestamp ensures FIFO within same priority tier
    score = int(priority) * 1e10 + time.time()
    r.zadd(QUEUE_KEY, {payload: score})
    return request_id

def dequeue() -> dict | None:
    # ZPOPMIN atomically pops the lowest-score (highest priority) item
    result = r.zpopmin(QUEUE_KEY, 1)
    if not result:
        return None
    payload, _ = result[0]
    return json.loads(payload)

The scoring trick is important. Multiplying priority by 1e10 gives enough headroom that timestamp tiebreakers never leak into the wrong priority tier: Unix timestamps sit around 1.7e9 today and won't cross 1e10 until the year 2286.
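A minimal standalone check of the formula (the same math enqueue uses) makes the guarantee concrete:

```python
import time

def score(priority: int, ts: float) -> float:
    # Same formula as in enqueue(): the tier term dominates,
    # and the timestamp only breaks ties within a tier.
    return priority * 1e10 + ts

now = time.time()
# A CRITICAL request enqueued an hour from now still beats a HIGH
# request enqueued right now, because 1e10 dwarfs any timestamp gap.
assert score(1, now + 3600) < score(2, now)
# Within the same tier, the earlier request dequeues first (FIFO).
assert score(2, now) < score(2, now + 1)
```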

Step 3: Build the Worker Loop

The worker pulls requests from the queue and dispatches them to your AI provider.

import anthropic

client = anthropic.Anthropic()

def process_request(request: dict) -> str:
    message = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": request["prompt"]}]
    )
    return message.content[0].text

def run_worker(poll_interval: float = 0.1):
    print("Worker started. Polling queue...")
    while True:
        request = dequeue()
        if request:
            print(f"Processing [{Priority(request['priority']).name}] request {request['id']}")
            try:
                result = process_request(request)
                # Store result so the caller can retrieve it
                r.setex(f"ai:result:{request['id']}", 300, result)
                print(f"Done: {request['id']}")
            except Exception as e:
                print(f"Failed: {request['id']} — {e}")
        else:
            time.sleep(poll_interval)

Step 4: Add a Result Retrieval Pattern

Callers need a way to get results back. Use a simple poll-and-wait pattern keyed by request ID.

def get_result(request_id: str, timeout: int = 30) -> str | None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        result = r.get(f"ai:result:{request_id}")
        if result:
            return result.decode()
        time.sleep(0.2)
    return None  # timed out

# Usage example
req_id = enqueue("Summarize this support ticket: ...", Priority.CRITICAL)
output = get_result(req_id, timeout=20)
print(output)

Step 5: Add Queue Depth Monitoring

You need visibility. If the LOW tier grows to 5,000 items while CRITICAL sits at zero, something is wrong with your worker pool.

def queue_stats() -> dict:
    all_items = r.zrange(QUEUE_KEY, 0, -1, withscores=True)
    stats = {p.name: 0 for p in Priority}
    for payload, _ in all_items:
        data = json.loads(payload)
        tier = Priority(data["priority"]).name
        stats[tier] += 1
    return stats

# Log this every 60 seconds in production
print(queue_stats())
# {'CRITICAL': 0, 'HIGH': 3, 'LOW': 847}
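To turn those stats into action, one option is a simple threshold check. This is a sketch: THRESHOLDS and alert() are placeholders for your own limits and notification hook (PagerDuty, Slack, etc.).

```python
# Hypothetical per-tier backlog limits; tune these for your traffic.
THRESHOLDS = {"CRITICAL": 10, "HIGH": 100, "LOW": 5000}

def alert(message: str) -> None:
    # Placeholder: swap in your real notification hook.
    print(f"ALERT: {message}")

def check_queue_health(stats: dict) -> list[str]:
    # Returns the tiers whose backlog exceeds its threshold.
    breaches = []
    for tier, depth in stats.items():
        if depth > THRESHOLDS.get(tier, float("inf")):
            breaches.append(tier)
            alert(f"{tier} backlog at {depth} items")
    return breaches

# With the stats shown above, nothing fires.
assert check_queue_health({"CRITICAL": 0, "HIGH": 3, "LOW": 847}) == []
```

Run this on the same 60-second cadence as the stats logging.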

Step 6: Run Multiple Workers for High-Volume Tiers

For production, run dedicated worker processes per priority tier. This prevents a flood of LOW priority work from starving CRITICAL even if the queue logic is correct.

# worker_critical.py — runs 3 processes via supervisor or systemd
QUEUE_KEY_CRITICAL = "ai:request:queue:critical"

# Split your enqueue function to route to tier-specific queues
# and run separate worker pools per tier

A simple approach: use three Redis queue keys (queue:critical, queue:high, queue:low) and three separate worker pool sizes (e.g., 4 workers, 2 workers, 1 worker). Critical gets the most throughput, low gets the least.
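Here's a minimal sketch of that split, with illustrative key names and the pool sizes suggested above; the real version would route enqueue() through queue_key_for() and start one worker pool per key.

```python
# Illustrative tier-specific queue keys (1=CRITICAL, 2=HIGH, 3=LOW,
# matching the Priority enum values from Step 1).
TIER_QUEUES = {
    1: "ai:request:queue:critical",
    2: "ai:request:queue:high",
    3: "ai:request:queue:low",
}

# Suggested worker counts per tier: CRITICAL gets the most throughput.
POOL_SIZES = {
    "ai:request:queue:critical": 4,
    "ai:request:queue:high": 2,
    "ai:request:queue:low": 1,
}

def queue_key_for(priority: int) -> str:
    # Workers for one tier only ever poll their own key, so a LOW
    # flood can never delay a CRITICAL dequeue.
    return TIER_QUEUES[priority]

assert queue_key_for(1) == "ai:request:queue:critical"
assert POOL_SIZES[queue_key_for(3)] == 1
```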

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment
