
How to Build Parallel AI Processing Pipelines

Process multiple AI requests simultaneously to cut total processing time.

Jay Banlasan

The AI Systems Guy

A weekly report job I built was processing 200 ad accounts sequentially. Each account required three AI calls: performance summary, anomaly detection, and next-week recommendation. 600 API calls at 1.5 seconds each meant 15 minutes of wall-clock time every Monday morning. After converting it to concurrent execution with parallel API calls, the same 600 calls finish in 90 seconds. Same cost, same quality, ten times faster.

Sequential AI processing is the default because it's simple to write. Parallel processing takes a bit more setup, but the payoff is immediate and scales roughly linearly with your concurrency level until you hit your rate limit. If any of your workflows makes more than 10 AI calls in a loop, this tutorial directly applies.

What You Need Before Starting

Python 3.10 or newer (the examples use built-in generics like list[str] and str | None), the anthropic package (pip install anthropic), and an ANTHROPIC_API_KEY environment variable set. The async client reads the key automatically, so none of the snippets pass it explicitly.

Step 1: Understand Async vs Threading for AI Calls

AI API calls are I/O bound, not CPU bound: almost all the time is spent waiting on the network. That makes asyncio the right tool, since a single thread can keep hundreds of requests in flight. Threading also works for I/O (threads release the GIL while blocked on the network), but each thread adds memory and scheduling overhead. Multiprocessing is overkill; there is no CPU work to spread across cores.

import asyncio
import anthropic
import time

# The async client — this is what makes parallel calls possible
_async_client = anthropic.AsyncAnthropic()

async def single_async_call(prompt: str, model: str = "claude-3-5-haiku-latest") -> str:
    response = await _async_client.messages.create(
        model=model, max_tokens=512,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text

# Verify the async client works
async def test():
    result = await single_async_call("Say 'ok' and nothing else")
    print(result)

asyncio.run(test())

Step 2: Process a List of Prompts in Parallel

asyncio.gather() fires all tasks at once and waits for all to complete.

async def process_batch_parallel(prompts: list[str],
                                 model: str = "claude-3-5-haiku-latest") -> list[str]:
    tasks = [single_async_call(p, model) for p in prompts]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    outputs = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Request {i} failed: {result}")
            outputs.append(None)
        else:
            outputs.append(result)
    
    return outputs

# Benchmark
async def benchmark():
    prompts = [f"What is {i} * {i}?" for i in range(20)]
    
    # Sequential
    start = time.time()
    seq_results = [await single_async_call(p) for p in prompts]
    sequential_time = time.time() - start
    
    # Parallel
    start = time.time()
    par_results = await process_batch_parallel(prompts)
    parallel_time = time.time() - start
    
    print(f"Sequential: {sequential_time:.1f}s")
    print(f"Parallel:   {parallel_time:.1f}s")
    print(f"Speedup:    {sequential_time/parallel_time:.1f}x")

asyncio.run(benchmark())
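If you want to see the gather() effect without spending tokens, the sketch below (my illustration, not part of the original benchmark) swaps the API call for asyncio.sleep to simulate a 100 ms request. Twenty simulated calls finish in roughly one latency round instead of twenty.

```python
import asyncio
import time

# Simulated API call: asyncio.sleep stands in for network latency
async def fake_call(prompt: str, latency: float = 0.1) -> str:
    await asyncio.sleep(latency)
    return f"answer to: {prompt}"

async def compare():
    prompts = [f"What is {i} * {i}?" for i in range(20)]

    start = time.perf_counter()
    for p in prompts:                  # sequential: one call at a time
        await fake_call(p)
    seq = time.perf_counter() - start

    start = time.perf_counter()
    await asyncio.gather(*(fake_call(p) for p in prompts))  # all at once
    par = time.perf_counter() - start
    return seq, par

seq, par = asyncio.run(compare())
print(f"Sequential {seq:.2f}s, parallel {par:.2f}s")
```

Sequential time grows with the number of calls; parallel time stays close to a single call's latency, which is exactly the shape the real benchmark shows.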

Step 3: Add Concurrency Limits (Critical)

Firing 200 requests simultaneously will almost certainly hit your rate limit. Use a semaphore to cap concurrent requests.

async def process_batch_limited(
    prompts: list[str],
    concurrency: int = 10,
    model: str = "claude-3-5-haiku-latest"
) -> list[str]:
    semaphore = asyncio.Semaphore(concurrency)
    
    async def bounded_call(prompt: str) -> str:
        async with semaphore:
            return await single_async_call(prompt, model)
    
    tasks = [bounded_call(p) for p in prompts]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    return [None if isinstance(r, Exception) else r for r in results]

# Rule of thumb for concurrency settings:
# RPM limit 1000 -> concurrency = 15-20 (leaves buffer)
# RPM limit 500  -> concurrency = 8-10
# RPM limit 100  -> concurrency = 2-3

The semaphore is a token pool. When all 10 slots are taken, new tasks wait until a slot frees. This naturally rate-limits your request throughput without complex timing logic.
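The rule-of-thumb numbers above follow from Little's law: at steady state, in-flight requests ≈ request rate × average latency. Here's a small helper (my own sketch; the 0.8 safety factor is an assumption, tune it to taste) that reproduces the table for 1.5-second calls:

```python
def suggested_concurrency(rpm_limit: int, avg_latency_s: float,
                          safety: float = 0.8) -> int:
    """Little's law: concurrent requests = requests/sec * seconds/request.

    rpm_limit: provider requests-per-minute cap
    avg_latency_s: typical duration of one call
    safety: fraction of the cap to actually use, leaving buffer
    """
    rps = rpm_limit / 60                      # requests per second at the cap
    return max(1, int(rps * avg_latency_s * safety))

# With 1.5 s calls these land near the rule-of-thumb table above
print(suggested_concurrency(1000, 1.5))
print(suggested_concurrency(500, 1.5))
print(suggested_concurrency(100, 1.5))
```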

Step 4: Build a Parallel Pipeline with Multiple Stages

Real workflows have multiple AI steps per item. Run independent steps in parallel; run dependent steps in sequence.

from dataclasses import dataclass

@dataclass
class AdAccountReport:
    account_id: str
    summary: str | None = None
    anomalies: str | None = None
    recommendations: str | None = None

async def process_single_account(account_id: str, data: dict,
                                   sem: asyncio.Semaphore) -> AdAccountReport:
    report = AdAccountReport(account_id=account_id)
    
    async with sem:
        # Steps 1 and 2 are independent — run them in parallel
        summary_task = _async_client.messages.create(
            model="claude-3-5-haiku-latest", max_tokens=300,
            messages=[{"role": "user", "content":
                f"Summarize this ad account's 7-day performance: {data}"}]
        )
        anomaly_task = _async_client.messages.create(
            model="claude-3-5-haiku-latest", max_tokens=200,
            messages=[{"role": "user", "content":
                f"Identify any performance anomalies: {data}"}]
        )
        
        summary_resp, anomaly_resp = await asyncio.gather(summary_task, anomaly_task)
        report.summary   = summary_resp.content[0].text
        report.anomalies = anomaly_resp.content[0].text
    
    # Step 3 depends on steps 1 and 2 — run it after
    async with sem:
        rec_resp = await _async_client.messages.create(
            model="claude-3-5-haiku-latest", max_tokens=300,
            messages=[{"role": "user", "content":
                f"Based on this summary: {report.summary}\n"
                f"And these anomalies: {report.anomalies}\n"
                f"What should we do next week?"}]
        )
        report.recommendations = rec_resp.content[0].text
    
    return report

async def process_all_accounts(accounts: list[dict],
                                 concurrency: int = 5) -> list[AdAccountReport]:
    sem = asyncio.Semaphore(concurrency)
    tasks = [process_single_account(a["id"], a, sem) for a in accounts]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    reports = []
    for i, r in enumerate(results):
        if isinstance(r, Exception):
            print(f"Account {accounts[i]['id']} failed: {r}")
        else:
            reports.append(r)
    
    return reports
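To see why splitting independent and dependent steps matters, here's a token-free simulation (my sketch; asyncio.sleep stands in for a ~100 ms API call). Three steps complete in roughly two latency rounds instead of three, because summary and anomaly detection overlap:

```python
import asyncio
import time

# Stand-in for one AI step; asyncio.sleep models the API latency
async def fake_step(name: str) -> str:
    await asyncio.sleep(0.1)
    return name

async def one_account() -> float:
    start = time.perf_counter()
    # Summary + anomalies are independent: run together (~0.1 s, not 0.2 s)
    summary, anomalies = await asyncio.gather(
        fake_step("summary"), fake_step("anomalies"))
    # Recommendations needs both results, so it runs afterwards (~0.1 s more)
    await fake_step(f"recs from {summary} + {anomalies}")
    return time.perf_counter() - start

elapsed = asyncio.run(one_account())
print(f"{elapsed:.2f}s")
```

Per account, that's a third of the latency shaved off before any cross-account parallelism even kicks in.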

Step 5: Add Progress Tracking for Long Batches

When processing hundreds of items, you need visibility into progress.


async def process_with_progress(items: list[dict],
                                  processor_fn,
                                  concurrency: int = 10) -> list:
    semaphore = asyncio.Semaphore(concurrency)
    completed = 0
    total = len(items)
    
    async def tracked_call(item: dict):
        nonlocal completed
        async with semaphore:
            result = await processor_fn(item)
            completed += 1
            if completed % 10 == 0 or completed == total:
                print(f"Progress: {completed}/{total} ({completed/total*100:.0f}%)")
            return result
    
    tasks = [tracked_call(item) for item in items]
    return await asyncio.gather(*tasks, return_exceptions=True)

Step 6: Handle Partial Failures Gracefully

In a batch of 200 requests, some will fail. Don't let failures block the whole batch.

async def resilient_batch(
    items: list[dict],
    prompt_fn,            # function(item) -> str
    concurrency: int = 10,
    model: str = "claude-3-5-haiku-latest",
    max_retries: int = 2
) -> dict[str, str | dict]:
    sem = asyncio.Semaphore(concurrency)
    results = {}
    
    async def process_item(item: dict) -> tuple[str, str | dict]:
        item_id = item.get("id", str(id(item)))
        
        for attempt in range(max_retries + 1):
            try:
                async with sem:
                    prompt = prompt_fn(item)
                    response = await _async_client.messages.create(
                        model=model, max_tokens=512,
                        messages=[{"role": "user", "content": prompt}]
                    )
                    return (item_id, response.content[0].text)
            
            except anthropic.RateLimitError:
                wait = 2 ** attempt
                await asyncio.sleep(wait)
            
            except Exception as e:
                if attempt == max_retries:
                    return (item_id, {"error": str(e)})
                await asyncio.sleep(1)
        
        return (item_id, {"error": "max retries exceeded"})
    
    pairs = await asyncio.gather(*[process_item(item) for item in items])
    return dict(pairs)

# Example
async def main():
    leads = [{"id": f"lead_{i}", "company": f"Company {i}"} for i in range(50)]
    
    def make_prompt(lead: dict) -> str:
        return f"Summarize {lead['company']} in one sentence for a sales rep."
    
    start = time.time()
    results = await resilient_batch(leads, make_prompt, concurrency=10)
    elapsed = time.time() - start
    
    successful = sum(1 for v in results.values() if not isinstance(v, dict))
    print(f"Processed {len(leads)} leads in {elapsed:.1f}s ({successful} successful)")

asyncio.run(main())
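One refinement worth considering: with 10 tasks in flight, the fixed 2 ** attempt backoff makes every rate-limited task retry at the same moment and hit the limit again together. Adding random jitter (a common pattern, not in the original code above) spreads the retries out:

```python
import random

def backoff_with_jitter(attempt: int, base: float = 1.0,
                        cap: float = 30.0) -> float:
    """'Full jitter' backoff: sleep a random amount up to the exponential
    ceiling, so concurrent retries don't synchronize."""
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0, ceiling)

# Drop-in replacement for the fixed wait in the RateLimitError handler:
#     await asyncio.sleep(backoff_with_jitter(attempt))
```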
