Systems Library / AI Capabilities / How to Build a Multi-Source RAG System
AI Capabilities rag knowledge

How to Build a Multi-Source RAG System

Combine data from databases, documents, and APIs in one RAG system.

Jay Banlasan

Jay Banlasan

The AI Systems Guy

A multi-source RAG system pulling from databases, documents, and APIs gives you answers that span your entire business. I build these when a single document store is not enough. Your sales data lives in a CRM, your policies live in Google Docs, and your product specs live in a wiki. Multi-source RAG connects all of them.

One question can pull context from three different systems and synthesize a complete answer.

What You Need Before Starting

Step 1: Define Source Connectors

import sqlite3
import requests
import json

class DatabaseSource:
    """SQL-backed source: runs a fixed query and normalizes each row into a RAG item.

    Each row becomes {"type": "database", "content": <JSON-encoded dict>} so the
    unified retriever can embed and score it like any other text chunk.
    """

    def __init__(self, db_path, query):
        self.db_path = db_path  # path to the SQLite database file
        self.query = query      # SELECT statement executed on every fetch()

    def fetch(self):
        """Execute the configured query and return a list of item dicts.

        Fixes two defects in the original: the query was executed twice
        (once for rows, once more just to read column names), and the
        connection was never closed.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(self.query)
            # cursor.description is available right after execute(); reuse
            # the same cursor instead of re-running the query.
            columns = [d[0] for d in cursor.description]
            rows = cursor.fetchall()
        finally:
            conn.close()
        return [
            {"type": "database", "content": json.dumps(dict(zip(columns, row)))}
            for row in rows
        ]

class APISource:
    """HTTP-backed source: GETs a JSON endpoint and normalizes its items.

    Expects the response body to be a JSON object with a "data" array; each
    element becomes {"type": "api", "content": <JSON-encoded item>}.
    """

    def __init__(self, url, headers=None, timeout=10):
        self.url = url
        self.headers = headers or {}
        # Request timeout in seconds. Without one, a hung endpoint would
        # block retrieval indefinitely (requests has no default timeout).
        self.timeout = timeout

    def fetch(self):
        """GET the endpoint and return its "data" items as RAG item dicts.

        Raises requests.HTTPError on non-2xx responses instead of silently
        parsing an error body as if it were the expected payload.
        """
        response = requests.get(self.url, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        items = response.json().get("data", [])
        return [{"type": "api", "content": json.dumps(item)} for item in items]

class DocumentSource:
    """Vector-store-backed source: similarity search is delegated to the collection."""

    def __init__(self, collection):
        # Any object exposing .query(query_embeddings=..., n_results=...),
        # e.g. a chromadb collection.
        self.collection = collection

    def search(self, query_embedding, top_k=5):
        """Return the collection's raw query result for one embedding."""
        hits = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
        )
        return hits

Step 2: Build the Unified Retriever

from sentence_transformers import SentenceTransformer

# Shared embedding model used for both query and item encoding.
# all-MiniLM-L6-v2 is a small, fast sentence encoder.
model = SentenceTransformer("all-MiniLM-L6-v2")

def retrieve_multi_source(question, sources, top_k_per_source=3):
    """Retrieve scored context items from every configured source.

    Dispatches on duck type: sources with a .search method are treated as
    vector stores; sources with a .fetch method are embedded and scored
    here with cosine similarity. Results from all sources are merged into
    one ranking.

    Args:
        question: natural-language query string.
        sources: mapping of source name -> source object.
        top_k_per_source: per-source result budget; the merged list is
            capped at top_k_per_source * len(sources).

    Returns:
        List of {"source", "content", "score"} dicts, best score first.
    """
    import numpy as np  # hoisted out of the per-item loop in the original

    query_embedding = model.encode(question).tolist()
    query_vec = np.asarray(query_embedding)
    query_norm = np.linalg.norm(query_vec)  # loop-invariant; compute once
    all_results = []

    for source_name, source in sources.items():
        if hasattr(source, "search"):
            # Vector-store source: similarity search happens in the store.
            results = source.search(query_embedding, top_k=top_k_per_source)
            for i in range(len(results["ids"][0])):
                all_results.append({
                    "source": source_name,
                    "content": results["documents"][0][i],
                    # NOTE(review): assumes the store returns cosine distance,
                    # making 1 - distance a similarity — confirm collection config.
                    "score": 1 - results["distances"][0][i],
                })
        elif hasattr(source, "fetch"):
            # Fetch-style source: embed each item here, score by cosine similarity.
            for item in source.fetch():
                item_vec = np.asarray(model.encode(item["content"]).tolist())
                score = np.dot(query_vec, item_vec) / (query_norm * np.linalg.norm(item_vec))
                all_results.append({
                    "source": source_name,
                    "content": item["content"],
                    "score": float(score),
                })

    all_results.sort(key=lambda x: x["score"], reverse=True)
    return all_results[:top_k_per_source * len(sources)]

Step 3: Generate Cross-Source Answers

import anthropic

# Shared Anthropic client used by answer_from_sources.
# NOTE(review): constructed with no explicit credentials — presumably picks
# them up from the environment; confirm deployment configuration.
client = anthropic.Anthropic()

def answer_from_sources(question, retrieved_results):
    """Generate a grounded answer from items retrieved across sources.

    Each retrieved item is labeled with its source name so the model can
    cite where facts came from and flag conflicts between sources.

    Args:
        question: the user's question.
        retrieved_results: list of {"source", "content", ...} dicts.

    Returns:
        The model's answer text.
    """
    labeled = [
        f"[Source: {r['source']}]\n{r['content']}"
        for r in retrieved_results
    ]
    context = "\n\n---\n\n".join(labeled)

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=500,
        system="Answer using the provided context from multiple sources. Cite which source each fact comes from. If sources conflict, note the discrepancy.",
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}],
    )
    return response.content[0].text

Step 4: Configure Your Sources

import chromadb

# Persistent vector store on local disk backing the document sources.
chroma = chromadb.PersistentClient(path="./multi_rag")

# Source registry keyed by name. retrieve_multi_source dispatches on whether
# each entry exposes .search (vector store) or .fetch (database/API).
sources = {
    "policies": DocumentSource(chroma.get_collection("policies")),
    "crm": DatabaseSource("crm.db", "SELECT company, contact, deal_value, status FROM deals WHERE status = 'active'"),
    "products": DocumentSource(chroma.get_collection("products")),
}

def query(question):
    """Run the full pipeline: multi-source retrieval, then grounded generation.

    Returns a dict with the generated answer and the distinct source names
    that contributed retrieved context.
    """
    retrieved = retrieve_multi_source(question, sources)
    return {
        "answer": answer_from_sources(question, retrieved),
        "sources_used": list({r["source"] for r in retrieved}),
    }

Step 5: Add Source Priority

Some sources should be weighted higher for certain question types:

# Per-question-type score multipliers applied in weighted_retrieve:
# values > 1 boost a source, values < 1 demote it; missing sources default to 1.0.
SOURCE_PRIORITIES = {
    "pricing": {"crm": 1.5, "products": 1.2, "policies": 0.8},
    "policy": {"policies": 1.5, "crm": 0.5, "products": 0.5},
    "technical": {"products": 1.5, "policies": 0.8, "crm": 0.5},
}

def classify_question(question):
    """Bucket a question into "pricing", "policy", or "technical" by keyword.

    "technical" is the fallback when no keyword matches. Matching is
    case-insensitive substring search, checked in priority order
    (pricing keywords win over policy keywords).

    Fix: the original re-evaluated question.lower() for every keyword
    inside each any(...) generator; lowercase once up front.
    """
    lowered = question.lower()
    if any(word in lowered for word in ("price", "cost", "deal", "discount")):
        return "pricing"
    if any(word in lowered for word in ("policy", "rule", "guideline", "procedure")):
        return "policy"
    return "technical"

def weighted_retrieve(question, sources):
    """Retrieve across all sources, then re-rank by question-type priority.

    Classifies the question, looks up that type's per-source multipliers,
    scales each retrieved item's score in place, and re-sorts best-first.

    NOTE(review): weighting is applied after retrieve_multi_source has
    already truncated its ranking, so items it dropped cannot be
    recovered here regardless of their source weight.
    """
    weights = SOURCE_PRIORITIES.get(classify_question(question), {})
    hits = retrieve_multi_source(question, sources)
    for hit in hits:
        hit["score"] = hit["score"] * weights.get(hit["source"], 1.0)
    hits.sort(key=lambda h: h["score"], reverse=True)
    return hits

What to Build Next

Add source freshness scoring. If your CRM data is from today but your policy document is 6 months old, weight the CRM higher for time-sensitive questions. Freshness metadata on every chunk prevents stale answers.

Related Reading

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment

Related Systems