How to Build Latency-Optimized AI Pipelines
Cut AI response times by 50% with parallel processing and smart caching.
Jay Banlasan
The AI Systems Guy
Slow AI responses kill user experience. A 4-second wait feels long. An 8-second wait loses users. The default approach of sending one request, waiting, sending the next, waiting, is the slowest possible path through a multi-step AI pipeline. I have cut pipeline latency by 50-70% consistently by applying three techniques: parallel execution, streaming responses, and smart pre-fetching.
The gains are measurable and immediate. A 3-step sequential pipeline taking 9 seconds drops to 3-4 seconds when the independent steps run in parallel. Adding streaming makes the perceived latency even lower because users see output starting in under a second, even when the full response takes longer.
What You Need Before Starting
- Python 3.9+
- OpenAI API key
- asyncio (built into Python)
- A multi-step pipeline you want to speed up
Step 1: Profile Your Current Pipeline
Before optimizing, measure. You cannot improve what you do not track.
import time
import functools
from typing import Callable
def timeit(func):
    """Decorator that prints a function's wall-clock duration in milliseconds."""
    @functools.wraps(func)
    def timed(*args, **kwargs):
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = (time.perf_counter() - began) * 1000
        print(f"{func.__name__}: {elapsed:.0f}ms")
        return outcome
    return timed
# Example pipeline to profile
import openai

# Synchronous client used by the profiling examples below.
# NOTE(review): avoid hardcoding keys in source — prefer the OPENAI_API_KEY env var.
client = openai.OpenAI(api_key="YOUR_API_KEY")
@timeit
def step_classify(content: str) -> str:
    """Label *content* as positive/negative/neutral with one short LLM call."""
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": f"Classify this as positive/negative/neutral: {content}"}
        ],
        max_tokens=10,
        temperature=0,
    )
    return completion.choices[0].message.content
@timeit
def step_summarize(content: str) -> str:
    """Produce a one-sentence summary of *content*."""
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": f"Summarize in one sentence: {content}"}
        ],
        max_tokens=100,
        temperature=0.2,
    )
    return completion.choices[0].message.content
@timeit
def step_extract_keywords(content: str) -> str:
    """Ask the model for five keywords describing *content*."""
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": f"Extract 5 keywords from: {content}"}
        ],
        max_tokens=50,
        temperature=0,
    )
    return completion.choices[0].message.content
# Sequential baseline
def pipeline_sequential(content: str) -> dict:
    """Run the three analysis steps back to back (the baseline to beat).

    Dict values evaluate left-to-right, so the call order is identical to
    running the steps as separate statements.
    """
    return {
        "sentiment": step_classify(content),
        "summary": step_summarize(content),
        "keywords": step_extract_keywords(content),
    }
Step 2: Run Independent Steps in Parallel with asyncio
If your steps do not depend on each other's outputs, run them simultaneously.
import asyncio
import openai

# Async client used for the concurrent (parallel) pipeline below.
# NOTE(review): avoid hardcoding keys in source — prefer the OPENAI_API_KEY env var.
async_client = openai.AsyncOpenAI(api_key="YOUR_API_KEY")
async def async_complete(messages: list, model: str = "gpt-4o-mini", max_tokens: int = 200) -> str:
    """Issue one chat completion on the async client and return its text."""
    reply = await async_client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=max_tokens,
        temperature=0.3,
    )
    return reply.choices[0].message.content
async def pipeline_parallel(content: str) -> dict:
    """Fan the three independent analysis steps out concurrently.

    Each step is a separate coroutine; asyncio.gather awaits them all at
    once, so total latency is roughly the slowest single call rather than
    the sum of all three.
    """
    # (prompt text, token budget) per result field, in output order.
    specs = {
        "sentiment": (f"Classify as positive/negative/neutral: {content}", 10),
        "summary": (f"Summarize in one sentence: {content}", 100),
        "keywords": (f"Extract 5 keywords: {content}", 50),
    }
    coros = [
        async_complete([{"role": "user", "content": prompt}], max_tokens=budget)
        for prompt, budget in specs.values()
    ]
    sentiment, summary, keywords = await asyncio.gather(*coros)
    return {"sentiment": sentiment, "summary": summary, "keywords": keywords}
# Run it
async def run():
    """Time one parallel-pipeline invocation and print the timing + result."""
    began = time.perf_counter()
    result = await pipeline_parallel("Your content here...")
    elapsed = (time.perf_counter() - began) * 1000
    print(f"Parallel pipeline: {elapsed:.0f}ms")
    print(result)

asyncio.run(run())
Typical result: sequential 2,700ms drops to 900ms with parallel execution.
Step 3: Implement Response Streaming
Streaming returns tokens as they are generated, so the interface can start displaying output immediately.
def stream_response(messages: list, model: str = "gpt-4o-mini") -> str:
    """Stream a completion to stdout token-by-token and return the full text.

    BUG FIX: the original used the SDK's `.stream()` context-manager helper
    but iterated it as if it yielded raw chunks with
    `chunk.choices[0].delta.content`. That helper yields typed *events*, not
    raw chunks, so the original would fail at runtime. Raw chunks of this
    shape come from `create(..., stream=True)`, which is what we use here —
    consistent with stream_to_frontend below.
    """
    full_content = ""
    stream = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=500,
        stream=True,
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            print(token, end="", flush=True)  # Display as it arrives
            full_content += token
    print()  # New line after streaming
    return full_content
# For web apps, yield tokens to the frontend
def stream_to_frontend(messages: list):
    """Generator that yields completion tokens one at a time (e.g. for SSE)."""
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True,
        max_tokens=500,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content if chunk.choices else None
        if delta:
            yield delta
# Flask/FastAPI example
# @app.route("/stream")
# def stream_endpoint():
# return Response(stream_to_frontend(messages), mimetype="text/event-stream")
Step 4: Pre-Fetch Likely Next Requests
If you can predict what the user will ask next, start generating before they ask.
from functools import lru_cache
import threading
class PrefetchCache:
def __init__(self):
self._cache = {}
self._lock = threading.Lock()
def prefetch(self, key: str, messages: list, model: str = "gpt-4o-mini"):
"""Start a background request and cache the result."""
def fetch():
response = client.chat.completions.create(
model=model, messages=messages, max_tokens=500, temperature=0
)
with self._lock:
self._cache[key] = response.choices[0].message.content
thread = threading.Thread(target=fetch, daemon=True)
thread.start()
def get(self, key: str, timeout_seconds: float = 5.0) -> str | None:
"""Get prefetched result. Wait up to timeout_seconds if not ready."""
deadline = time.time() + timeout_seconds
while time.time() < deadline:
with self._lock:
if key in self._cache:
return self._cache.pop(key)
time.sleep(0.05)
return None
# Module-level singleton shared by the request handlers below.
prefetch = PrefetchCache()
def handle_user_action(user_query: str, context: str) -> str:
    """Answer the current query, speculatively prefetching a likely follow-up.

    A "summarize" request often leads to a keyword request next, so we start
    that completion in the background before answering.
    """
    if "summarize" in user_query.lower():
        prefetch.prefetch(
            "likely_followup_keywords",
            [{"role": "user", "content": f"Extract 5 keywords from: {context}"}],
        )
    answer = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": user_query}],
    )
    return answer.choices[0].message.content
def handle_followup_query(query: str) -> str:
    """Serve a follow-up question, preferring a prefetched answer when available."""
    if "keyword" in query.lower():
        cached = prefetch.get("likely_followup_keywords", timeout_seconds=3.0)
        if cached:
            print("[PREFETCH HIT]")
            return cached
    # No prefetch hit — fall through to a normal API call.
    reply = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query}],
    )
    return reply.choices[0].message.content
Step 5: Add Request Deduplication
Identical simultaneous requests should share a single API call.
import hashlib
import json
from threading import Event
class RequestDeduplicator:
    """Collapse identical concurrent completions into a single upstream call.

    The first thread to request a given (model, messages) payload performs
    the API call; any thread requesting the same payload while it is in
    flight blocks on an Event and shares the result (or the raised error).
    """

    def __init__(self):
        self._in_flight = {}             # key -> (Event, [result, error])
        self._lock = threading.Lock()    # guards _in_flight

    def _make_key(self, messages: list, model: str) -> str:
        """Stable digest of the payload; sort_keys makes it key-order independent."""
        payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def complete(self, messages: list, model: str = "gpt-4o-mini") -> str:
        """Return the completion text, deduplicating concurrent identical requests.

        Raises:
            TimeoutError: if an in-flight request takes longer than 30 seconds.
            Exception: whatever the upstream call raised, re-raised in every
                waiting thread as well as the caller that issued it.
        """
        key = self._make_key(messages, model)
        with self._lock:
            if key in self._in_flight:
                event, result_holder = self._in_flight[key]
                is_first = False
            else:
                event = Event()
                result_holder = [None, None]  # [result, error]
                self._in_flight[key] = (event, result_holder)
                is_first = True
        if is_first:
            try:
                response = client.chat.completions.create(
                    model=model, messages=messages, temperature=0
                )
                result_holder[0] = response.choices[0].message.content
            except Exception as e:
                result_holder[1] = e
            finally:
                # Wake waiters first, then drop the entry so later identical
                # requests trigger a fresh API call.
                event.set()
                with self._lock:
                    self._in_flight.pop(key, None)
        else:
            # BUG FIX: the original ignored wait()'s return value, so a timed-out
            # wait silently fell through and returned None despite the -> str
            # contract. Fail loudly instead.
            if not event.wait(timeout=30):
                raise TimeoutError("timed out waiting for deduplicated request")
        if result_holder[1] is not None:
            raise result_holder[1]
        return result_holder[0]
# Module-level singleton; share one instance so concurrent callers dedupe.
deduplicator = RequestDeduplicator()
Step 6: Choose Faster Models for Latency-Critical Paths
Not all steps need the smartest model. Route by latency sensitivity.
# Latency tiers mapped to model names. The ms figures are the author's
# observed averages, not guarantees — re-benchmark for your own workload.
MODEL_LATENCY_PROFILES = {
    "ultra_fast": "gpt-4o-mini",  # ~500ms average
    "fast": "gpt-4o-mini",  # ~700ms average
    "balanced": "gpt-4o",  # ~1200ms average
    "quality": "gpt-4o",  # ~1500ms average
}
def latency_aware_complete(
    messages: list,
    latency_requirement: str = "balanced",
    quality_minimum: float = 7.0
) -> str:
    """Complete with a model chosen by latency tier, escalating for quality.

    Unknown latency tiers fall back to gpt-4o-mini; a quality floor of 9.0+
    overrides the fast tiers and uses gpt-4o instead.
    """
    chosen = MODEL_LATENCY_PROFILES.get(latency_requirement, "gpt-4o-mini")
    wants_fast_tier = latency_requirement in {"ultra_fast", "fast"}
    if quality_minimum >= 9.0 and wants_fast_tier:
        chosen = "gpt-4o"
    response = client.chat.completions.create(
        model=chosen, messages=messages, max_tokens=500
    )
    return response.choices[0].message.content
What to Build Next
- Benchmark your specific pipeline steps and build a dependency graph so you automatically identify which steps can run in parallel versus which require prior outputs
- Add p50/p95/p99 latency tracking per pipeline step to your monitoring so you catch latency regressions before users do
- Implement adaptive batching that groups low-priority requests into small batches when the system is under load, trading latency for throughput on non-real-time tasks
Related Reading
- How to Build a Multi-Model AI Router - routing cheap models for latency-sensitive steps is a core optimization
- How to Implement AI Response Caching - caching is the single highest-impact latency optimization available
- How to Set Up LiteLLM as Your AI Gateway - LiteLLM supports async completions and streaming out of the box
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment