
How to Build Latency-Optimized AI Pipelines

Cut AI response times by 50% with parallel processing and smart caching.

Jay Banlasan


The AI Systems Guy

Slow AI responses kill user experience. A 4-second wait feels long. An 8-second wait loses users. The default approach (send one request, wait for it to finish, then send the next) is the slowest possible path through a multi-step AI pipeline. I have consistently cut pipeline latency by 50-70% by applying three techniques: parallel execution, streaming responses, and smart pre-fetching.

The gains are measurable and immediate. A 3-step sequential pipeline taking 9 seconds drops to 3-4 seconds when the independent steps run in parallel. Adding streaming makes the perceived latency even lower because users see output starting in under a second, even when the full response takes longer.
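The arithmetic behind that claim is simple: a sequential pipeline pays the sum of its step latencies, while fully parallel independent steps pay only the slowest one. The step timings below are illustrative placeholders, not measurements:

```python
# Hypothetical per-step latencies in ms (illustrative, not measured)
step_latencies_ms = {"classify": 2000, "summarize": 4000, "keywords": 3000}

sequential_ms = sum(step_latencies_ms.values())  # pay every step in turn
parallel_ms = max(step_latencies_ms.values())    # pay only the slowest step

print(f"sequential: {sequential_ms}ms, parallel: {parallel_ms}ms")
```

With these numbers, 9,000ms sequential drops to 4,000ms parallel, which is where the 9-second to 3-4-second figure comes from.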

What You Need Before Starting

Python 3.10 or later (the code uses `str | None` type hints), the `openai` Python package (the v1.x client: `pip install openai`), and an OpenAI API key to substitute for "YOUR_API_KEY" in the examples.

Step 1: Profile Your Current Pipeline

Before optimizing, measure. You cannot improve what you do not track.

import time
import functools
from typing import Callable

def timeit(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = (time.perf_counter() - start) * 1000
        print(f"{func.__name__}: {elapsed:.0f}ms")
        return result
    return wrapper

# Example pipeline to profile
import openai
client = openai.OpenAI(api_key="YOUR_API_KEY")

@timeit
def step_classify(content: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Classify this as positive/negative/neutral: {content}"}],
        max_tokens=10, temperature=0
    )
    return response.choices[0].message.content

@timeit
def step_summarize(content: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Summarize in one sentence: {content}"}],
        max_tokens=100, temperature=0.2
    )
    return response.choices[0].message.content

@timeit
def step_extract_keywords(content: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Extract 5 keywords from: {content}"}],
        max_tokens=50, temperature=0
    )
    return response.choices[0].message.content

# Sequential baseline
def pipeline_sequential(content: str) -> dict:
    sentiment = step_classify(content)
    summary = step_summarize(content)
    keywords = step_extract_keywords(content)
    return {"sentiment": sentiment, "summary": summary, "keywords": keywords}
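Before wiring in real API calls, the decorator itself can be sanity-checked locally with a stubbed step. `fake_step` and its sleep duration are stand-ins of my own, not part of the pipeline above:

```python
import functools
import time

def timeit(func):
    # Same decorator as above
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = (time.perf_counter() - start) * 1000
        print(f"{func.__name__}: {elapsed:.0f}ms")
        return result
    return wrapper

@timeit
def fake_step(content: str) -> str:
    time.sleep(0.1)  # stand-in for a ~100ms API call
    return content.upper()

result = fake_step("hello")  # prints "fake_step: <elapsed>ms"
```

Because of `functools.wraps`, the printed name is the real function name, which keeps the profile output readable as the pipeline grows.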

Step 2: Run Independent Steps in Parallel with asyncio

If your steps do not depend on each other's outputs, run them simultaneously.

import asyncio
import time
import openai

async_client = openai.AsyncOpenAI(api_key="YOUR_API_KEY")

async def async_complete(messages: list, model: str = "gpt-4o-mini", max_tokens: int = 200) -> str:
    response = await async_client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=max_tokens,
        temperature=0.3
    )
    return response.choices[0].message.content

async def pipeline_parallel(content: str) -> dict:
    # Run all three steps simultaneously
    sentiment_task = async_complete(
        [{"role": "user", "content": f"Classify as positive/negative/neutral: {content}"}],
        max_tokens=10
    )
    summary_task = async_complete(
        [{"role": "user", "content": f"Summarize in one sentence: {content}"}],
        max_tokens=100
    )
    keywords_task = async_complete(
        [{"role": "user", "content": f"Extract 5 keywords: {content}"}],
        max_tokens=50
    )

    # Execute all at once, collect results
    sentiment, summary, keywords = await asyncio.gather(
        sentiment_task, summary_task, keywords_task
    )

    return {"sentiment": sentiment, "summary": summary, "keywords": keywords}

# Run it
async def run():
    start = time.perf_counter()
    result = await pipeline_parallel("Your content here...")
    elapsed = (time.perf_counter() - start) * 1000
    print(f"Parallel pipeline: {elapsed:.0f}ms")
    print(result)

asyncio.run(run())

Typical result: sequential 2,700ms drops to 900ms with parallel execution.
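The speedup can be reproduced locally without any API calls by stubbing each step as an `asyncio.sleep`; the step names and durations below are placeholders:

```python
import asyncio
import time

async def fake_step(name: str, seconds: float) -> str:
    await asyncio.sleep(seconds)  # stand-in for an API round-trip
    return name

async def pipeline() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Three independent "steps" run concurrently under gather
    results = await asyncio.gather(
        fake_step("classify", 0.3),
        fake_step("summarize", 0.5),
        fake_step("keywords", 0.4),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(pipeline())
# Wall time tracks the slowest step (~0.5s), not the 1.2s sum
print(results, f"{elapsed:.2f}s")
```

`asyncio.gather` returns results in argument order regardless of which step finishes first, so the unpacking in `pipeline_parallel` above is safe.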

Step 3: Implement Response Streaming

Streaming returns tokens as they are generated, so the interface can start displaying output immediately.

def stream_response(messages: list, model: str = "gpt-4o-mini") -> str:
    full_content = ""

    stream = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=500,
        stream=True
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            print(token, end="", flush=True)  # Display as it arrives
            full_content += token

    print()  # New line after streaming
    return full_content

# For web apps, yield tokens to the frontend
def stream_to_frontend(messages: list):
    for chunk in client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True,
        max_tokens=500
    ):
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# Flask/FastAPI example
# @app.route("/stream")
# def stream_endpoint():
#     return Response(stream_to_frontend(messages), mimetype="text/event-stream")
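If the frontend consumes the stream over Server-Sent Events (as the `text/event-stream` mimetype above implies), each token needs to be framed as an SSE `data:` event. A minimal framing helper; the function name is my own, not from any framework:

```python
def sse_format(token: str) -> str:
    """Wrap a token as a Server-Sent Events data frame.

    Multi-line tokens need one `data:` line per line of content;
    a blank line terminates the event.
    """
    lines = token.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"

# A generator like stream_to_frontend would yield sse_format(token)
print(repr(sse_format("Hello")))       # 'data: Hello\n\n'
print(repr(sse_format("two\nlines")))  # 'data: two\ndata: lines\n\n'
```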

Step 4: Pre-Fetch Likely Next Requests

If you can predict what the user will ask next, start generating before they ask.

import threading
import time

class PrefetchCache:
    def __init__(self):
        self._cache = {}
        self._lock = threading.Lock()

    def prefetch(self, key: str, messages: list, model: str = "gpt-4o-mini"):
        """Start a background request and cache the result."""
        def fetch():
            response = client.chat.completions.create(
                model=model, messages=messages, max_tokens=500, temperature=0
            )
            with self._lock:
                self._cache[key] = response.choices[0].message.content

        thread = threading.Thread(target=fetch, daemon=True)
        thread.start()

    def get(self, key: str, timeout_seconds: float = 5.0) -> str | None:
        """Get prefetched result. Wait up to timeout_seconds if not ready."""
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            with self._lock:
                if key in self._cache:
                    return self._cache.pop(key)
            time.sleep(0.05)
        return None

prefetch = PrefetchCache()

def handle_user_action(user_query: str, context: str) -> str:
    # While processing the current query, prefetch likely follow-ups
    if "summarize" in user_query.lower():
        prefetch.prefetch(
            "likely_followup_keywords",
            [{"role": "user", "content": f"Extract 5 keywords from: {context}"}]
        )

    # Process current query
    result = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": user_query}]
    )
    return result.choices[0].message.content

def handle_followup_query(query: str) -> str:
    # Check if we prefetched this
    if "keyword" in query.lower():
        cached = prefetch.get("likely_followup_keywords", timeout_seconds=3.0)
        if cached:
            print("[PREFETCH HIT]")
            return cached

    # Fall through to normal API call
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query}]
    )
    return response.choices[0].message.content

Step 5: Add Request Deduplication

Identical simultaneous requests should share a single API call.

import hashlib
import json
import threading
from threading import Event

class RequestDeduplicator:
    def __init__(self):
        self._in_flight = {}
        self._lock = threading.Lock()

    def _make_key(self, messages: list, model: str) -> str:
        payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def complete(self, messages: list, model: str = "gpt-4o-mini") -> str:
        key = self._make_key(messages, model)

        with self._lock:
            if key in self._in_flight:
                event, result_holder = self._in_flight[key]
                is_first = False
            else:
                event = Event()
                result_holder = [None, None]  # [result, error]
                self._in_flight[key] = (event, result_holder)
                is_first = True

        if is_first:
            try:
                response = client.chat.completions.create(
                    model=model, messages=messages, temperature=0
                )
                result_holder[0] = response.choices[0].message.content
            except Exception as e:
                result_holder[1] = e
            finally:
                event.set()
                with self._lock:
                    self._in_flight.pop(key, None)
        else:
            if not event.wait(timeout=30):
                raise TimeoutError("Deduplicated request did not finish within 30s")

        if result_holder[1]:
            raise result_holder[1]
        return result_holder[0]

deduplicator = RequestDeduplicator()
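The single-flight behavior is easy to verify locally by swapping the API call for a counting stub. The block below is a self-contained re-sketch of the same pattern (module-level state instead of a class), not the `RequestDeduplicator` above:

```python
import threading
import time
from threading import Event

call_count = 0                                   # how often the "API" actually ran
_in_flight: dict[str, tuple[Event, list]] = {}
_lock = threading.Lock()

def slow_call(key: str) -> str:
    """Stand-in for the real API call."""
    global call_count
    call_count += 1
    time.sleep(0.2)
    return f"result-for-{key}"

def complete_once(key: str) -> str:
    # Same single-flight logic as RequestDeduplicator.complete
    with _lock:
        if key in _in_flight:
            event, holder = _in_flight[key]
            is_first = False
        else:
            event, holder = Event(), [None]
            _in_flight[key] = (event, holder)
            is_first = True

    if is_first:
        holder[0] = slow_call(key)
        event.set()
        with _lock:
            _in_flight.pop(key, None)
    else:
        event.wait(timeout=5)
    return holder[0]

results = []
threads = [
    threading.Thread(target=lambda: results.append(complete_once("same-prompt")))
    for _ in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(call_count, results)  # one underlying call, five identical results
```

Five concurrent callers with the same key produce exactly one `slow_call` invocation; the other four block on the event and read the shared result holder.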

Step 6: Choose Faster Models for Latency-Critical Paths

Not all steps need the smartest model. Route by latency sensitivity.

# Illustrative average latencies; in this example the four tiers collapse to two models
MODEL_LATENCY_PROFILES = {
    "ultra_fast": "gpt-4o-mini",          # ~500ms average
    "fast":       "gpt-4o-mini",          # ~700ms average
    "balanced":   "gpt-4o",               # ~1200ms average
    "quality":    "gpt-4o",               # ~1500ms average
}

def latency_aware_complete(
    messages: list,
    latency_requirement: str = "balanced",
    quality_minimum: float = 7.0
) -> str:
    model = MODEL_LATENCY_PROFILES.get(latency_requirement, "gpt-4o-mini")

    # If quality requirement is high, override latency-optimized model
    if quality_minimum >= 9.0 and latency_requirement in {"ultra_fast", "fast"}:
        model = "gpt-4o"

    response = client.chat.completions.create(
        model=model, messages=messages, max_tokens=500
    )
    return response.choices[0].message.content
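A related trick on latency-critical paths is a hard latency budget: try the higher-quality model first, and fall back to the fast one if the budget expires. A sketch using `asyncio.wait_for`, with both model calls stubbed as sleeps (the function names and timings are mine, for illustration):

```python
import asyncio

async def quality_model(prompt: str) -> str:
    await asyncio.sleep(1.5)  # stand-in for a slow, high-quality model call
    return f"quality: {prompt}"

async def fast_model(prompt: str) -> str:
    await asyncio.sleep(0.2)  # stand-in for a fast model call
    return f"fast: {prompt}"

async def complete_within_budget(prompt: str, budget_seconds: float) -> str:
    try:
        # Give the quality model a hard deadline...
        return await asyncio.wait_for(quality_model(prompt), timeout=budget_seconds)
    except asyncio.TimeoutError:
        # ...and fall back to the fast model when it blows the budget
        return await fast_model(prompt)

answer = asyncio.run(complete_within_budget("hello", budget_seconds=0.5))
print(answer)  # the 1.5s quality call misses the 0.5s budget, so: "fast: hello"
```

The trade-off is paying for the abandoned quality call; in practice the budget should be set near the quality model's p90 latency so the fallback fires rarely.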


Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment
