How to Build Parallel AI Processing Pipelines
Process multiple AI requests simultaneously to cut total processing time.
Jay Banlasan
The AI Systems Guy
A weekly report job I built was processing 200 ad accounts sequentially. Each account required three AI calls: performance summary, anomaly detection, and next-week recommendation. 600 API calls at 1.5 seconds each meant 15 minutes of wall-clock time every Monday morning. After converting the job to parallel, concurrent execution, the same 600 calls finish in 90 seconds. Same cost, same quality, ten times faster.
Sequential AI processing is the default because it's simple to write. Parallel processing requires a bit more setup, but the payoff is immediate and scales roughly linearly with your concurrency level, up to your provider's rate limit. If you have any workflow making more than 10 AI calls in a loop, this tutorial directly applies.
What You Need Before Starting
- Python 3.10+
- anthropic SDK (pip install anthropic)
- asyncio (built into Python)
- aiohttp for async HTTP if not using the SDK (pip install aiohttp)
- A clear understanding of your provider's rate limits (you'll fill those limits)
Step 1: Understand Async vs Threading for AI Calls
AI API calls are I/O bound, not CPU bound, so asyncio is the right tool. Threading also works for I/O-bound work (the GIL is released while waiting on the network), but asyncio handles hundreds of concurrent calls with far less per-call overhead. Multiprocessing is overkill.
import asyncio
import anthropic
import time
# The async client — this is what makes parallel calls possible
_async_client = anthropic.AsyncAnthropic()
async def single_async_call(prompt: str, model: str = "claude-3-5-haiku-latest") -> str:
response = await _async_client.messages.create(
model=model, max_tokens=512,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
# Verify the async client works
async def test():
result = await single_async_call("Say 'ok' and nothing else")
print(result)
asyncio.run(test())
Step 2: Process a List of Prompts in Parallel
asyncio.gather() schedules all the coroutines concurrently and waits for every one of them to complete.
async def process_batch_parallel(prompts: list[str],
                                 model: str = "claude-3-5-haiku-latest") -> list[str]:
tasks = [single_async_call(p, model) for p in prompts]
results = await asyncio.gather(*tasks, return_exceptions=True)
outputs = []
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Request {i} failed: {result}")
outputs.append(None)
else:
outputs.append(result)
return outputs
# Benchmark
async def benchmark():
prompts = [f"What is {i} * {i}?" for i in range(20)]
# Sequential
start = time.time()
seq_results = [await single_async_call(p) for p in prompts]
sequential_time = time.time() - start
# Parallel
start = time.time()
par_results = await process_batch_parallel(prompts)
parallel_time = time.time() - start
print(f"Sequential: {sequential_time:.1f}s")
print(f"Parallel: {parallel_time:.1f}s")
print(f"Speedup: {sequential_time/parallel_time:.1f}x")
asyncio.run(benchmark())
Step 3: Add Concurrency Limits (Critical)
Firing 200 requests simultaneously will almost certainly hit your rate limit. Use a semaphore to cap concurrent requests.
async def process_batch_limited(
prompts: list[str],
concurrency: int = 10,
    model: str = "claude-3-5-haiku-latest"
) -> list[str]:
semaphore = asyncio.Semaphore(concurrency)
async def bounded_call(prompt: str) -> str:
async with semaphore:
return await single_async_call(prompt, model)
tasks = [bounded_call(p) for p in prompts]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [None if isinstance(r, Exception) else r for r in results]
# Rule of thumb for concurrency settings:
# RPM limit 1000 -> concurrency = 15-20 (leaves buffer)
# RPM limit 500 -> concurrency = 8-10
# RPM limit 100 -> concurrency = 2-3
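The rule of thumb above falls out of simple throughput math: at concurrency c with an average call latency of t seconds, you generate roughly 60·c/t requests per minute. A small helper can invert that — this is an illustrative sketch, and `safe_concurrency` plus its safety factor are my own names, not part of any SDK:

```python
def safe_concurrency(rpm_limit: int, avg_latency_s: float = 1.5,
                     safety: float = 0.7) -> int:
    # At concurrency c, throughput is roughly 60 * c / avg_latency_s
    # requests per minute. Solve for c at the RPM limit, then shave
    # off a buffer so bursts don't trip the limiter.
    c = rpm_limit * avg_latency_s / 60 * safety
    return max(1, int(c))

print(safe_concurrency(1000))  # 17 -- within the 15-20 band above
print(safe_concurrency(500))   # 8
print(safe_concurrency(100))   # 1 -- round up toward 2-3 if your calls run longer
```

Tune `avg_latency_s` to your own measured latency; slower calls support higher concurrency at the same RPM.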
The semaphore is a token pool. When all 10 slots are taken, new tasks wait until a slot frees. This naturally rate-limits your request throughput without complex timing logic.
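You can watch the token-pool behavior without spending a single API call. This sketch swaps the network call for asyncio.sleep and records the peak number of workers inside the semaphore:

```python
import asyncio

async def demo(n_workers: int = 10, limit: int = 3) -> int:
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker() -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.05)  # stand-in for an API call
            active -= 1

    await asyncio.gather(*[worker() for _ in range(n_workers)])
    return peak

peak = asyncio.run(demo())
print(f"Peak concurrency: {peak}")  # never exceeds the semaphore limit of 3
```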
Step 4: Build a Parallel Pipeline with Multiple Stages
Real workflows have multiple AI steps per item. Run independent steps in parallel; run dependent steps in sequence.
from dataclasses import dataclass

@dataclass
class AdAccountReport:
account_id: str
summary: str | None = None
anomalies: str | None = None
recommendations: str | None = None
async def process_single_account(account_id: str, data: dict,
sem: asyncio.Semaphore) -> AdAccountReport:
report = AdAccountReport(account_id=account_id)
async with sem:
# Steps 1 and 2 are independent — run them in parallel
summary_task = _async_client.messages.create(
            model="claude-3-5-haiku-latest", max_tokens=300,
messages=[{"role": "user", "content":
f"Summarize this ad account's 7-day performance: {data}"}]
)
anomaly_task = _async_client.messages.create(
            model="claude-3-5-haiku-latest", max_tokens=200,
messages=[{"role": "user", "content":
f"Identify any performance anomalies: {data}"}]
)
summary_resp, anomaly_resp = await asyncio.gather(summary_task, anomaly_task)
report.summary = summary_resp.content[0].text
report.anomalies = anomaly_resp.content[0].text
# Step 3 depends on steps 1 and 2 — run it after
async with sem:
rec_resp = await _async_client.messages.create(
            model="claude-3-5-haiku-latest", max_tokens=300,
messages=[{"role": "user", "content":
f"Based on this summary: {report.summary}\n"
f"And these anomalies: {report.anomalies}\n"
f"What should we do next week?"}]
)
report.recommendations = rec_resp.content[0].text
return report
async def process_all_accounts(accounts: list[dict],
concurrency: int = 5) -> list[AdAccountReport]:
sem = asyncio.Semaphore(concurrency)
tasks = [process_single_account(a["id"], a, sem) for a in accounts]
results = await asyncio.gather(*tasks, return_exceptions=True)
reports = []
for i, r in enumerate(results):
if isinstance(r, Exception):
print(f"Account {accounts[i]['id']} failed: {r}")
else:
reports.append(r)
return reports
Step 5: Add Progress Tracking for Long Batches
When processing hundreds of items, you need visibility into progress.
async def process_with_progress(items: list[dict],
processor_fn,
concurrency: int = 10) -> list:
semaphore = asyncio.Semaphore(concurrency)
completed = 0
total = len(items)
    async def tracked_call(item: dict):
nonlocal completed
async with semaphore:
result = await processor_fn(item)
completed += 1
if completed % 10 == 0 or completed == total:
print(f"Progress: {completed}/{total} ({completed/total*100:.0f}%)")
return result
tasks = [tracked_call(item) for item in items]
return await asyncio.gather(*tasks, return_exceptions=True)
Step 6: Handle Partial Failures Gracefully
In a batch of 200 requests, some will fail. Don't let failures block the whole batch.
async def resilient_batch(
items: list[dict],
prompt_fn, # function(item) -> str
concurrency: int = 10,
    model: str = "claude-3-5-haiku-latest",
    max_retries: int = 2
) -> dict[str, str | dict]:
    sem = asyncio.Semaphore(concurrency)

    async def process_item(item: dict) -> tuple[str, str | dict]:
item_id = item.get("id", str(id(item)))
for attempt in range(max_retries + 1):
try:
async with sem:
prompt = prompt_fn(item)
response = await _async_client.messages.create(
model=model, max_tokens=512,
messages=[{"role": "user", "content": prompt}]
)
return (item_id, response.content[0].text)
except anthropic.RateLimitError:
wait = 2 ** attempt
await asyncio.sleep(wait)
except Exception as e:
if attempt == max_retries:
return (item_id, {"error": str(e)})
await asyncio.sleep(1)
return (item_id, {"error": "max retries exceeded"})
pairs = await asyncio.gather(*[process_item(item) for item in items])
return dict(pairs)
# Example
async def main():
leads = [{"id": f"lead_{i}", "company": f"Company {i}"} for i in range(50)]
def make_prompt(lead: dict) -> str:
return f"Summarize {lead['company']} in one sentence for a sales rep."
start = time.time()
results = await resilient_batch(leads, make_prompt, concurrency=10)
elapsed = time.time() - start
successful = sum(1 for v in results.values() if not isinstance(v, dict))
print(f"Processed {len(leads)} leads in {elapsed:.1f}s ({successful} successful)")
asyncio.run(main())
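One refinement worth considering: the retry loop above sleeps exactly 2 ** attempt seconds, so ten workers that hit the rate limit together will all retry together. Adding "full jitter" spreads them out. `backoff_delay` is a hypothetical helper, shown as a sketch:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    # Full jitter: a uniform delay between 0 and the (capped) exponential
    # ceiling, so parallel workers don't retry in lockstep
    return random.uniform(0.0, min(cap, base * 2 ** attempt))
```

Swap `wait = 2 ** attempt` in the retry loop for `wait = backoff_delay(attempt)` to apply it.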
What to Build Next
- Add a result persistence layer that saves completed items to your database as they finish, not after the whole batch completes
- Build an adaptive concurrency controller that increases parallelism when under rate limit and backs off automatically when 429s appear
- Combine parallel processing with the batch API for overnight jobs: parallel for real-time pipelines, batch API for nightly workloads
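A starting point for the persistence idea: asyncio.as_completed yields each task the moment it finishes, so you can save results immediately instead of waiting for the whole batch. In this sketch, `fake_process` and the `saved` list are stand-ins for your real AI call and database write:

```python
import asyncio

async def fake_process(item: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for an AI call
    return item.upper()

async def run_and_persist(items: list[str]) -> list[str]:
    saved = []
    tasks = [asyncio.create_task(fake_process(i)) for i in items]
    # as_completed yields tasks in completion order, not submission order,
    # so each result can be written out as soon as it exists
    for finished in asyncio.as_completed(tasks):
        result = await finished
        saved.append(result)  # swap for db.save(result)
    return saved

results = asyncio.run(run_and_persist(["a", "b", "c"]))
print(sorted(results))  # ['A', 'B', 'C']
```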
Related Reading
- How to Build a Multi-Model AI Router - parallel processing benefits from smart routing to balance load across models
- How to Build Automatic Model Failover Systems - in parallel batches, individual failures should trigger item-level failover, not batch-level
- How to Implement Semantic Caching for AI Queries - caching is even more valuable in parallel pipelines where duplicate prompts are common
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment