How to Create Automated Knowledge Ingestion Pipelines
Ingest new documents into your RAG system automatically as they are created.
Jay Banlasan
The AI Systems Guy
When you automate knowledge ingestion into your RAG pipeline, new documents become searchable without anyone pressing a button. I build these for teams that produce documents daily: meeting notes, contracts, reports, wiki updates. The pipeline watches for new files, processes them, and indexes them into the vector store automatically.
Your RAG system stays current without maintenance.
What You Need Before Starting
- A working RAG system (see system 409)
- A document source (shared drive, S3 bucket, or local folder)
- Python 3.8+ with watchdog, langchain, sentence-transformers, and chromadb
- A processing queue for reliability
Step 1: Watch for New Documents
```python
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os

class DocumentHandler(FileSystemEventHandler):
    def __init__(self, process_fn):
        self.process_fn = process_fn

    def on_created(self, event):
        if event.is_directory:
            return
        ext = os.path.splitext(event.src_path)[1].lower()
        if ext in [".pdf", ".txt", ".md", ".docx"]:
            print(f"New document: {event.src_path}")
            self.process_fn(event.src_path)

def start_watcher(docs_path, process_fn):
    observer = Observer()
    observer.schedule(DocumentHandler(process_fn), docs_path, recursive=True)
    observer.start()
    return observer
```
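`start_watcher` returns immediately because the observer runs on a background thread, so the main thread has to stay alive or the process exits and the watcher dies with it. A minimal keep-alive sketch of that pattern, using a stand-in observer so it runs anywhere (with the real watchdog `Observer`, you would typically loop until `KeyboardInterrupt`, then call `observer.stop()` and `observer.join()`):

```python
import threading
import time

def run_until_interrupted(observer, stop_event=None, poll_interval=1.0):
    """Keep the main thread alive while the watcher runs in the background."""
    stop_event = stop_event or threading.Event()
    try:
        while not stop_event.is_set():
            time.sleep(poll_interval)
    except KeyboardInterrupt:
        pass
    observer.stop()

# Stand-in for the watchdog Observer so this sketch is self-contained.
class StubObserver:
    def __init__(self):
        self.stopped = False
    def stop(self):
        self.stopped = True

stop = threading.Event()
observer = StubObserver()
# Signal shutdown from another thread after 0.2 s, as Ctrl-C would.
threading.Timer(0.2, stop.set).start()
run_until_interrupted(observer, stop_event=stop, poll_interval=0.05)
print(observer.stopped)  # True
```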
Step 2: Build the Processing Pipeline
```python
from langchain.document_loaders import PyPDFLoader, TextLoader, UnstructuredWordDocumentLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

LOADERS = {
    ".pdf": PyPDFLoader,
    ".txt": TextLoader,
    ".md": TextLoader,
    ".docx": UnstructuredWordDocumentLoader,
}

def process_document(file_path):
    ext = os.path.splitext(file_path)[1].lower()
    loader_cls = LOADERS.get(ext)
    if not loader_cls:
        print(f"Unsupported format: {ext}")
        return
    loader = loader_cls(file_path)
    documents = loader.load()
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    chunks = splitter.split_documents(documents)
    index_chunks(chunks, source=file_path)
    log_ingestion(file_path, len(chunks))
    print(f"Indexed {len(chunks)} chunks from {file_path}")
```
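To see what `chunk_size=1000, chunk_overlap=200` actually does, here is a naive fixed-stride sketch of the same size/overlap idea. This is not the library's algorithm: `RecursiveCharacterTextSplitter` prefers to break on paragraph, line, and word boundaries before falling back to hard character cuts, but the stride arithmetic is the same.

```python
def sliding_chunks(text, chunk_size=1000, chunk_overlap=200):
    """Naive fixed-stride chunker illustrating the size/overlap idea."""
    step = chunk_size - chunk_overlap          # each chunk starts 800 chars later
    stop = max(len(text) - chunk_overlap, 1)   # don't emit a pure-overlap tail
    return [text[i:i + chunk_size] for i in range(0, stop, step)]

doc = "x" * 2500
chunks = sliding_chunks(doc)
print([len(c) for c in chunks])  # [1000, 1000, 900]
```

The 200-character overlap means a sentence falling on a chunk boundary still appears whole in at least one chunk, which keeps retrieval from missing boundary-straddling content.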
Step 3: Index Into the Vector Store
```python
from datetime import datetime
from sentence_transformers import SentenceTransformer
import chromadb

model = SentenceTransformer("all-MiniLM-L6-v2")
chroma = chromadb.PersistentClient(path="./rag_store")
collection = chroma.get_or_create_collection("documents")

def index_chunks(chunks, source):
    for i, chunk in enumerate(chunks):
        chunk_id = f"{os.path.basename(source)}_{i}"
        embedding = model.encode(chunk.page_content).tolist()
        collection.upsert(
            ids=[chunk_id],
            embeddings=[embedding],
            documents=[chunk.page_content],
            metadatas=[{
                "source": source,
                "page": chunk.metadata.get("page", 0),
                "indexed_at": datetime.now().isoformat(),
            }],
        )
```
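One caveat with basename-derived IDs: two files with the same name in different folders (say `reports/notes.md` and `wiki/notes.md`) would silently upsert over each other's chunks. A hedged sketch of one fix, hashing the full path into the ID (`chunk_id` here is a hypothetical helper, not part of the code above):

```python
import hashlib
import os

def chunk_id(source_path, i):
    """Stable, collision-resistant chunk ID derived from the full path.
    Hashing the path keeps same-named files in different folders distinct."""
    digest = hashlib.sha256(source_path.encode("utf-8")).hexdigest()[:16]
    return f"{os.path.basename(source_path)}_{digest}_{i}"

a = chunk_id("reports/notes.md", 0)
b = chunk_id("wiki/notes.md", 0)
print(a != b)  # True: same basename, distinct IDs
```

The IDs stay deterministic, so re-processing the same file still upserts in place rather than duplicating chunks.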
Step 4: Add a Processing Queue
Handle bursts of new documents without overwhelming the system:
```python
import queue
import threading

doc_queue = queue.Queue()

def queue_processor():
    while True:
        file_path = doc_queue.get()
        try:
            process_document(file_path)
        except Exception as e:
            log_error(file_path, str(e))
        doc_queue.task_done()

worker = threading.Thread(target=queue_processor, daemon=True)
worker.start()

def enqueue_document(file_path):
    doc_queue.put(file_path)
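Because the worker calls `task_done()` after each item, `doc_queue.join()` gives callers a way to block until the backlog is drained, which is handy in tests or during shutdown. A self-contained sketch of a burst being absorbed, with a list append standing in for `process_document`:

```python
import queue
import threading

doc_queue = queue.Queue()
processed = []

def queue_processor():
    while True:
        file_path = doc_queue.get()
        try:
            processed.append(file_path)  # stand-in for process_document
        finally:
            doc_queue.task_done()

threading.Thread(target=queue_processor, daemon=True).start()

# A burst of files arrives at once; the single worker drains them in order.
for path in ["a.pdf", "b.md", "c.txt"]:
    doc_queue.put(path)

doc_queue.join()  # block until every queued document has been processed
print(processed)  # ['a.pdf', 'b.md', 'c.txt']
```

A single worker also guarantees documents are indexed one at a time, so a burst of uploads never runs many embedding jobs concurrently.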
Step 5: Handle Updates and Deletions
When a document changes, re-index it; when it is deleted, remove its chunks. Extend the handler from Step 1:
```python
SUPPORTED_EXTS = {".pdf", ".txt", ".md", ".docx"}

def is_supported(path):
    return os.path.splitext(path)[1].lower() in SUPPORTED_EXTS

class DocumentHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory and is_supported(event.src_path):
            remove_document_chunks(event.src_path)
            enqueue_document(event.src_path)

    def on_deleted(self, event):
        if not event.is_directory:
            remove_document_chunks(event.src_path)

def remove_document_chunks(file_path):
    existing = collection.get(where={"source": file_path})
    if existing["ids"]:
        collection.delete(ids=existing["ids"])
```
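One wrinkle: most editors and operating systems fire several `on_modified` events for a single save, so the handler above would re-index the same file repeatedly. A debouncer that coalesces rapid repeats per path fixes that; this is a stdlib sketch (the `Debouncer` class is an addition, not part of the pipeline above), which you could wrap around `enqueue_document`:

```python
import threading
import time

class Debouncer:
    """Coalesce rapid repeat events per path: only the last call within
    `delay` seconds actually fires."""
    def __init__(self, fn, delay=1.0):
        self.fn = fn
        self.delay = delay
        self._timers = {}
        self._lock = threading.Lock()

    def __call__(self, path):
        with self._lock:
            if path in self._timers:
                self._timers[path].cancel()  # restart the countdown
            t = threading.Timer(self.delay, self.fn, args=(path,))
            self._timers[path] = t
            t.start()

calls = []
debounced = Debouncer(calls.append, delay=0.1)
for _ in range(5):          # five rapid events for one save
    debounced("notes.md")
time.sleep(0.3)
print(calls)  # ['notes.md'] -- re-indexed once, not five times
```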
What to Build Next
Add ingestion from cloud sources. Connect to Google Drive, Notion, or Confluence APIs and pull new content on a schedule. Most business knowledge lives in cloud tools, not local folders.
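The shape of a scheduled cloud pull is the same regardless of the source: fetch document listings on an interval, skip IDs you have already seen, and hand new ones to the same queue. A minimal sketch under assumptions, where `fetch_documents` is a hypothetical connector (a real one would call the Drive/Notion/Confluence API) and the stub below stands in for it:

```python
import time

def poll_source(fetch_documents, seen, interval=None, max_cycles=1):
    """Generic pull-on-a-schedule loop. `fetch_documents` yields
    (doc_id, payload) pairs; only unseen IDs get ingested."""
    new_docs = []
    for _ in range(max_cycles):
        for doc_id, payload in fetch_documents():
            if doc_id not in seen:
                seen.add(doc_id)
                new_docs.append(payload)  # in production: enqueue_document(...)
        if interval:
            time.sleep(interval)
    return new_docs

# Stub connector standing in for a real cloud API.
def fetch_documents():
    return [("doc-1", "Q3 report"), ("doc-2", "meeting notes")]

seen = set()
first = poll_source(fetch_documents, seen)
second = poll_source(fetch_documents, seen)  # nothing new next cycle
print(first, second)  # ['Q3 report', 'meeting notes'] []
```

Persisting the `seen` set (e.g. alongside the Chroma store) keeps restarts from re-ingesting the entire source.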
Related Reading
- The Pipeline Architecture - ingestion as a document pipeline
- Setting Up a Data Pipeline for Your Business - data pipeline fundamentals applied to documents
- The Centralized Brain Concept - automated ingestion keeps the brain current
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment