How to Build a Multi-Source RAG System
Combine data from databases, documents, and APIs in one RAG system.
Jay Banlasan
The AI Systems Guy
A multi-source RAG system pulling from databases, documents, and APIs gives you answers that span your entire business. I build these when a single document store is not enough. Your sales data lives in a CRM, your policies live in Google Docs, and your product specs live in a wiki. Multi-source RAG connects all of them.
One question can pull context from three different systems and synthesize a complete answer.
What You Need Before Starting
- Multiple data sources (database, documents, APIs)
- Python 3.8+ with chromadb, anthropic, and source-specific connectors
- A unified schema for cross-source metadata
- An embedding model
Step 1: Define Source Connectors
import sqlite3
import requests
import json
class DatabaseSource:
    """Fetch-style source that runs a fixed SQL query against a SQLite file.

    Each result row is serialized to a JSON object keyed by column name so
    it can be embedded and scored like any other text chunk.
    """

    def __init__(self, db_path, query):
        self.db_path = db_path  # path to the SQLite database file
        self.query = query      # SQL executed verbatim on every fetch()

    def fetch(self):
        """Execute the query once and return rows as {"type", "content"} records.

        The query runs exactly once (column names come from the same cursor
        as the rows) and the connection is always closed, even on error.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(self.query)
            # cursor.description holds one 7-tuple per column; [0] is the name.
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
        finally:
            conn.close()
        return [
            {"type": "database", "content": json.dumps(dict(zip(columns, row)))}
            for row in rows
        ]
class APISource:
    """Fetch-style source that pulls items from a JSON HTTP endpoint.

    The endpoint is expected to return a JSON object with a top-level
    "data" list; each list item becomes one retrievable chunk.
    """

    def __init__(self, url, headers=None):
        self.url = url
        # `or {}` gives each instance its own dict when no headers are passed.
        self.headers = headers or {}

    def fetch(self):
        """Return every item from the API as a {"type", "content"} record.

        Bounds the request with a timeout so a hung endpoint cannot stall
        retrieval indefinitely, and fails loudly on 4xx/5xx responses
        instead of silently parsing an error body.
        """
        response = requests.get(self.url, headers=self.headers, timeout=30)
        response.raise_for_status()  # raises requests.HTTPError on non-2xx
        items = response.json().get("data", [])
        return [{"type": "api", "content": json.dumps(item)} for item in items]
class DocumentSource:
    """Vector-store-backed source that delegates semantic search to a collection.

    The wrapped object only needs to expose chroma's
    `query(query_embeddings=..., n_results=...)` interface.
    """

    def __init__(self, collection):
        self.collection = collection

    def search(self, query_embedding, top_k=5):
        """Return the raw collection query result for the top_k nearest documents."""
        return self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
        )
Step 2: Build the Unified Retriever
from sentence_transformers import SentenceTransformer
# Shared sentence-embedding model: used to embed both the incoming question
# and (for fetch-style sources) each fetched item in retrieve_multi_source.
model = SentenceTransformer("all-MiniLM-L6-v2")
def retrieve_multi_source(question, sources, top_k_per_source=3):
    """Retrieve the most relevant items for `question` across heterogeneous sources.

    Sources with a `search` method (vector stores) are queried directly;
    sources with a `fetch` method are pulled in full and scored on the fly
    by cosine similarity against the question embedding.

    Args:
        question: natural-language query string.
        sources: mapping of source name -> source object (search- or fetch-style).
        top_k_per_source: per-source result budget; the merged list is capped
            at top_k_per_source * len(sources).

    Returns:
        List of {"source", "content", "score"} dicts, sorted by score descending.

    NOTE(review): search-style scores are (1 - distance) while fetch-style
    scores are cosine similarity; these are only comparable if the vector
    store uses cosine distance — confirm the collection's distance metric.
    """
    import numpy as np  # hoisted out of the inner loop (was re-imported per item)

    query_embedding = np.asarray(model.encode(question), dtype=float)
    # Question norm is loop-invariant: compute it once, not per fetched item.
    query_norm = np.linalg.norm(query_embedding)
    all_results = []
    for source_name, source in sources.items():
        if hasattr(source, "search"):
            # Vector stores do their own nearest-neighbor search.
            results = source.search(query_embedding.tolist(), top_k=top_k_per_source)
            for doc, dist in zip(results["documents"][0], results["distances"][0]):
                all_results.append({
                    "source": source_name,
                    "content": doc,
                    "score": 1 - dist,
                })
        elif hasattr(source, "fetch"):
            # Fetch-style sources have no index: embed every item and score it.
            for item in source.fetch():
                item_embedding = np.asarray(model.encode(item["content"]), dtype=float)
                denom = query_norm * np.linalg.norm(item_embedding)
                # Guard against a zero-norm embedding (e.g. empty content).
                score = float(query_embedding @ item_embedding / denom) if denom else 0.0
                all_results.append({
                    "source": source_name,
                    "content": item["content"],
                    "score": score,
                })
    all_results.sort(key=lambda r: r["score"], reverse=True)
    return all_results[: top_k_per_source * len(sources)]
Step 3: Generate Cross-Source Answers
import anthropic
# Shared Anthropic API client used by answer_from_sources
# (credentials are resolved by the SDK's defaults — typically the environment).
client = anthropic.Anthropic()
def answer_from_sources(question, retrieved_results):
    """Ask the model to synthesize an answer from the retrieved multi-source context.

    Each retrieved chunk is tagged with its origin so the model can cite
    which source each fact came from.
    """
    context = "\n\n---\n\n".join(
        f"[Source: {r['source']}]\n{r['content']}" for r in retrieved_results
    )
    prompt = f"Context:\n{context}\n\nQuestion: {question}"
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=500,
        system="Answer using the provided context from multiple sources. Cite which source each fact comes from. If sources conflict, note the discrepancy.",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text
Step 4: Configure Your Sources
import chromadb
# Persistent local vector store holding the pre-indexed document collections.
chroma = chromadb.PersistentClient(path="./multi_rag")
# Registry of every retrievable source, keyed by the name reported to callers.
# Document sources are searched by embedding; the database source is fetched
# in full and scored on the fly (see retrieve_multi_source).
sources = {
    "policies": DocumentSource(chroma.get_collection("policies")),
    "crm": DatabaseSource("crm.db", "SELECT company, contact, deal_value, status FROM deals WHERE status = 'active'"),
    "products": DocumentSource(chroma.get_collection("products")),
}
def query(question):
    """End-to-end entry point: retrieve across all sources, then generate an answer.

    Returns a dict with the synthesized "answer" and the deduplicated list of
    "sources_used" that contributed context.
    """
    retrieved = retrieve_multi_source(question, sources)
    used = {r["source"] for r in retrieved}
    return {
        "answer": answer_from_sources(question, retrieved),
        "sources_used": list(used),
    }
Step 5: Add Source Priority
Some sources should be weighted higher for certain question types:
# Per-question-type score multipliers applied by weighted_retrieve:
# values > 1.0 boost a source, values < 1.0 demote it; sources missing from
# a row default to a weight of 1.0.
SOURCE_PRIORITIES = {
    "pricing": {"crm": 1.5, "products": 1.2, "policies": 0.8},
    "policy": {"policies": 1.5, "crm": 0.5, "products": 0.5},
    "technical": {"products": 1.5, "policies": 0.8, "crm": 0.5},
}
def classify_question(question):
    """Classify a question as "pricing", "policy", or "technical" (the default).

    Matching is a case-insensitive substring check evaluated in priority
    order: pricing keywords win over policy keywords, and anything with no
    keyword hit falls through to "technical".
    """
    q = question.lower()  # lowercase once instead of once per keyword list
    if any(word in q for word in ("price", "cost", "deal", "discount")):
        return "pricing"
    if any(word in q for word in ("policy", "rule", "guideline", "procedure")):
        return "policy"
    return "technical"
def weighted_retrieve(question, sources):
    """Retrieve across all sources, then re-rank by question-type priority weights.

    The question is first classified, the matching row of SOURCE_PRIORITIES
    is applied as a per-source multiplier (defaulting to 1.0 for unlisted
    sources), and the results are re-sorted by the weighted score.
    """
    weights = SOURCE_PRIORITIES.get(classify_question(question), {})
    ranked = retrieve_multi_source(question, sources)
    for entry in ranked:
        entry["score"] = entry["score"] * weights.get(entry["source"], 1.0)
    ranked.sort(key=lambda e: e["score"], reverse=True)
    return ranked
What to Build Next
Add source freshness scoring. If your CRM data is from today but your policy document is 6 months old, weight the CRM higher for time-sensitive questions. Freshness metadata on every chunk prevents stale answers.
Related Reading
- The Integration Layer Explained - multi-source RAG as an integration challenge
- Cross-Functional AI: When Marketing Talks to Operations - cross-department data in one system
- The Centralized Brain Concept - unifying business knowledge into one interface
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment