
How to Create Automated Knowledge Ingestion Pipelines

Ingest new documents into your RAG system automatically as they are created.

Jay Banlasan

The AI Systems Guy

When you automate knowledge ingestion into your RAG pipeline, new documents become searchable without anyone pressing a button. I build these for teams that produce documents daily: meeting notes, contracts, reports, wiki updates. The pipeline watches for new files, processes them, and indexes them into the vector store automatically.

Your RAG system stays current without maintenance.

What You Need Before Starting

You need Python 3 with watchdog, langchain, sentence-transformers, and chromadb installed; the loaders in Step 2 typically also pull in pypdf for PDFs and unstructured for Word files. You also need a folder where new documents reliably land (a shared drive, a sync target, or an export directory) and write access to wherever the vector store lives.

Step 1: Watch for New Documents

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os

SUPPORTED_EXTENSIONS = {".pdf", ".txt", ".md", ".docx"}

def is_supported(file_path):
    return os.path.splitext(file_path)[1].lower() in SUPPORTED_EXTENSIONS

class DocumentHandler(FileSystemEventHandler):
    def __init__(self, process_fn):
        self.process_fn = process_fn

    def on_created(self, event):
        if event.is_directory:
            return
        if is_supported(event.src_path):
            print(f"New document: {event.src_path}")
            self.process_fn(event.src_path)

def start_watcher(docs_path, process_fn):
    observer = Observer()
    observer.schedule(DocumentHandler(process_fn), docs_path, recursive=True)
    observer.start()  # runs in a background thread; keep the main thread alive
    return observer
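Before wiring in the full pipeline, it helps to smoke-test the watcher in isolation. The sketch below uses a stripped-down copy of the handler above, drops a file into a temporary folder, and confirms the callback fires (the one-second polling loop is just for the demo):

```python
import os
import tempfile
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class SmokeTestHandler(FileSystemEventHandler):
    """Stripped-down copy of DocumentHandler for a quick end-to-end check."""
    def __init__(self, process_fn):
        self.process_fn = process_fn

    def on_created(self, event):
        if not event.is_directory and event.src_path.lower().endswith(".txt"):
            self.process_fn(event.src_path)

seen = []
with tempfile.TemporaryDirectory() as docs_dir:
    observer = Observer()
    observer.schedule(SmokeTestHandler(seen.append), docs_dir, recursive=True)
    observer.start()

    # Simulate a new document arriving
    with open(os.path.join(docs_dir, "note.txt"), "w") as f:
        f.write("hello")

    # Events fire on a background thread, so wait briefly for the callback
    deadline = time.time() + 5
    while not seen and time.time() < deadline:
        time.sleep(0.1)

    observer.stop()
    observer.join()

print(seen)
```

If nothing prints, the usual culprits are watching the wrong path or a filesystem (some network mounts) that doesn't emit change events.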

Step 2: Build the Processing Pipeline

from langchain.document_loaders import PyPDFLoader, TextLoader, UnstructuredWordDocumentLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

LOADERS = {
    ".pdf": PyPDFLoader,
    ".txt": TextLoader,
    ".md": TextLoader,
    ".docx": UnstructuredWordDocumentLoader,
}

def process_document(file_path):
    ext = os.path.splitext(file_path)[1].lower()
    loader_cls = LOADERS.get(ext)
    if not loader_cls:
        print(f"Unsupported format: {ext}")
        return

    loader = loader_cls(file_path)
    documents = loader.load()

    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    chunks = splitter.split_documents(documents)

    index_chunks(chunks, source=file_path)   # defined in Step 3
    log_ingestion(file_path, len(chunks))    # your audit log of choice
    print(f"Indexed {len(chunks)} chunks from {file_path}")

Step 3: Index Into the Vector Store

from sentence_transformers import SentenceTransformer
from datetime import datetime
import chromadb

model = SentenceTransformer("all-MiniLM-L6-v2")
chroma = chromadb.PersistentClient(path="./rag_store")
collection = chroma.get_or_create_collection("documents")

def index_chunks(chunks, source):
    ids, embeddings, documents, metadatas = [], [], [], []
    for i, chunk in enumerate(chunks):
        ids.append(f"{os.path.basename(source)}_{i}")
        embeddings.append(model.encode(chunk.page_content).tolist())
        documents.append(chunk.page_content)
        metadatas.append({
            "source": source,
            "page": chunk.metadata.get("page", 0),
            "indexed_at": datetime.now().isoformat(),
        })
    if not ids:
        return
    # One batched upsert instead of one round trip per chunk
    collection.upsert(ids=ids, embeddings=embeddings, documents=documents, metadatas=metadatas)

Step 4: Add a Processing Queue

Handle bursts of new documents without overwhelming the system:

import queue
import threading

doc_queue = queue.Queue()

def queue_processor():
    while True:
        file_path = doc_queue.get()
        try:
            process_document(file_path)
        except Exception as e:
            log_error(file_path, str(e))
        doc_queue.task_done()

worker = threading.Thread(target=queue_processor, daemon=True)
worker.start()

def enqueue_document(file_path):
    doc_queue.put(file_path)
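To wire everything together, pass enqueue_document as the process_fn when you call start_watcher. A quick way to convince yourself the queue absorbs bursts: push a batch of paths and use doc_queue.join() to block until the worker drains them (a list append stands in for process_document here):

```python
import queue
import threading

doc_queue = queue.Queue()
processed = []

def queue_processor():
    while True:
        file_path = doc_queue.get()
        try:
            processed.append(file_path)  # stand-in for process_document
        finally:
            doc_queue.task_done()

threading.Thread(target=queue_processor, daemon=True).start()

# Simulate a burst of 50 documents arriving at once
for i in range(50):
    doc_queue.put(f"report_{i}.pdf")

doc_queue.join()  # blocks until task_done() has run for every queued item
print(len(processed))  # 50
```

The daemon worker processes one document at a time no matter how fast files arrive, so embedding and indexing never run concurrently and never overwhelm the machine.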

Step 5: Handle Updates and Deletions

When a document changes, remove its old chunks and re-index it; when it is deleted, remove its chunks entirely. Extend the DocumentHandler from Step 1 with two more event methods:

class DocumentHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory and is_supported(event.src_path):
            remove_document_chunks(event.src_path)
            enqueue_document(event.src_path)

    def on_deleted(self, event):
        if not event.is_directory:
            remove_document_chunks(event.src_path)

def remove_document_chunks(file_path):
    # Step 3 stores the full path in "source" metadata, so filter on that
    existing = collection.get(where={"source": file_path})
    if existing["ids"]:
        collection.delete(ids=existing["ids"])

What to Build Next

Add ingestion from cloud sources. Connect to Google Drive, Notion, or Confluence APIs and pull new content on a schedule. Most business knowledge lives in cloud tools, not local folders.
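The same queue makes cloud ingestion straightforward: poll on a schedule, enqueue anything new, and let the existing worker do the rest. A hedged sketch of the polling pattern follows; fetch_new_files is a hypothetical callable standing in for whatever connector you write (Drive, Notion, Confluence), not a real API:

```python
import time
from datetime import datetime, timezone

def poll_once(fetch_new_files, since, enqueue):
    # One pass: ask the source for anything newer than `since`,
    # hand each item to the ingestion queue, return the new watermark.
    checked_at = datetime.now(timezone.utc)  # taken BEFORE fetching, so nothing slips through
    for file_path in fetch_new_files(since=since):
        enqueue(file_path)
    return checked_at

def run_poller(fetch_new_files, enqueue, interval_seconds=300):
    # Schedule loop: check every five minutes by default
    since = datetime.now(timezone.utc)
    while True:
        since = poll_once(fetch_new_files, since, enqueue)
        time.sleep(interval_seconds)
```

In production you would pass enqueue_document from Step 4 as the enqueue argument and persist the watermark between restarts so a crash doesn't re-ingest everything.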
