How to Build RAG with Permission-Based Access Control

Implement access controls so users only see documents they are authorized to view.

Jay Banlasan

The AI Systems Guy

Permission-based access control makes a RAG system retrieve only from documents the requesting user is authorized to see. I build these for organizations where different teams have different clearance levels: sales should not see HR documents, and interns should not see financial projections. The filter is applied at retrieval time, so unauthorized documents never reach the model's context.

Without this, RAG becomes a security hole.

What You Need Before Starting

Step 1: Define Permission Model

PERMISSION_LEVELS = {
    "public": 0,
    "internal": 1,
    "confidential": 2,
    "restricted": 3,
}

ROLE_PERMISSIONS = {
    "admin": {"level": 3, "departments": ["all"]},
    "manager": {"level": 2, "departments": ["own"]},
    "employee": {"level": 1, "departments": ["own"]},
    "intern": {"level": 0, "departments": ["own"]},
    "external": {"level": 0, "departments": []},
}
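
To see how the two tables combine, the sketch below resolves a role name to the permission labels that role may read. The helper name `allowed_levels_for` is illustrative and not part of the original code. Unknown roles fall back to `external`, the same fallback Step 3 uses.

```python
PERMISSION_LEVELS = {"public": 0, "internal": 1, "confidential": 2, "restricted": 3}

ROLE_PERMISSIONS = {
    "admin": {"level": 3, "departments": ["all"]},
    "manager": {"level": 2, "departments": ["own"]},
    "employee": {"level": 1, "departments": ["own"]},
    "intern": {"level": 0, "departments": ["own"]},
    "external": {"level": 0, "departments": []},
}


def allowed_levels_for(role_name):
    """Return every permission label at or below the role's clearance level."""
    # Hypothetical helper: unknown roles are treated as "external".
    role = ROLE_PERMISSIONS.get(role_name, ROLE_PERMISSIONS["external"])
    return [label for label, value in PERMISSION_LEVELS.items() if value <= role["level"]]


print(allowed_levels_for("employee"))  # ['public', 'internal']
```

Because the levels are ordered integers, adding a new label only means choosing where it sits in the ordering.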

Step 2: Index Documents with Permissions

def index_with_permissions(document, collection, model=None):
    """Embed one document and store it with the metadata used for access filtering."""
    if model is None:
        # Loading the model is slow; pass a preloaded one when indexing in bulk.
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer("all-MiniLM-L6-v2")

    embedding = model.encode(document["content"]).tolist()
    collection.add(
        ids=[document["id"]],
        embeddings=[embedding],
        documents=[document["content"]],
        metadatas=[{
            "source": document["source"],
            # Documents without an explicit label default to "internal".
            "permission_level": document.get("permission_level", "internal"),
            "department": document.get("department", "general"),
            "owner": document.get("owner", ""),
        }]
    )
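
One risk at indexing time is a mistyped label. A document stored with a `permission_level` such as "Confidential" or "secret" never matches the level filter, so it silently disappears from every user's results. The sketch below validates the metadata before it is stored. `build_metadata` and `VALID_LEVELS` are hypothetical names introduced here, and the field names are copied from the indexing function above.

```python
# Assumed to mirror the keys of PERMISSION_LEVELS from Step 1.
VALID_LEVELS = {"public", "internal", "confidential", "restricted"}


def build_metadata(document):
    """Build the metadata dict for one document, rejecting unknown labels."""
    level = document.get("permission_level", "internal")
    if level not in VALID_LEVELS:
        raise ValueError(f"Unknown permission_level: {level!r}")
    return {
        "source": document["source"],
        "permission_level": level,
        "department": document.get("department", "general"),
        "owner": document.get("owner", ""),
    }


meta = build_metadata({"id": "doc-1", "content": "Q3 plan", "source": "wiki"})
print(meta["permission_level"])  # internal
```

Failing loudly at index time is usually cheaper than discovering months later that a document was never retrievable.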

Step 3: Filter Retrieval by User Permissions

def secure_query(question, user, collection, top_k=5, model=None):
    """Retrieve the top_k chunks the user is cleared to read."""
    if model is None:
        # Loading the model is slow; pass a preloaded one in a long-running service.
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer("all-MiniLM-L6-v2")

    # Unknown or missing roles fall back to the least privileged role.
    role = ROLE_PERMISSIONS.get(user.get("role"), ROLE_PERMISSIONS["external"])
    max_level = role["level"]

    # Every label at or below the user's clearance is readable.
    allowed_levels = [level for level, value in PERMISSION_LEVELS.items() if value <= max_level]
    level_filter = {"permission_level": {"$in": allowed_levels}}

    if "all" in role["departments"]:
        where_filter = level_filter
    else:
        # Other roles see their own department plus the shared "general" one.
        user_dept = user.get("department", "general")
        where_filter = {
            "$and": [
                level_filter,
                {"$or": [
                    {"department": user_dept},
                    {"department": "general"}
                ]}
            ]
        }

    query_embedding = model.encode(question).tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        where=where_filter
    )
    return results
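
It helps to inspect the filter before trusting it. The sketch below pulls the filter-building logic out of `secure_query` into a standalone function so it can be printed and unit-tested without a vector store or an embedding model. `build_where_filter` is a hypothetical name, and the operator syntax (`$in`, `$and`, `$or`) is copied from the step above rather than checked against any particular database.

```python
import json

PERMISSION_LEVELS = {"public": 0, "internal": 1, "confidential": 2, "restricted": 3}
ROLE_PERMISSIONS = {
    "admin": {"level": 3, "departments": ["all"]},
    "manager": {"level": 2, "departments": ["own"]},
    "employee": {"level": 1, "departments": ["own"]},
    "intern": {"level": 0, "departments": ["own"]},
    "external": {"level": 0, "departments": []},
}


def build_where_filter(user):
    """Return the metadata filter that secure_query would apply for this user."""
    role = ROLE_PERMISSIONS.get(user.get("role"), ROLE_PERMISSIONS["external"])
    allowed = [lvl for lvl, val in PERMISSION_LEVELS.items() if val <= role["level"]]
    level_filter = {"permission_level": {"$in": allowed}}
    if "all" in role["departments"]:
        return level_filter
    dept = user.get("department", "general")
    return {
        "$and": [
            level_filter,
            {"$or": [{"department": dept}, {"department": "general"}]},
        ]
    }


# A sales employee: public and internal documents, from sales or general only.
print(json.dumps(build_where_filter({"role": "employee", "department": "sales"}), indent=2))
```

Keeping the filter in a pure function also makes it easy to write regression tests for each role whenever the permission model changes.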

Step 4: Generate Permission-Aware Answers

import anthropic

client = anthropic.Anthropic()

def ask_with_permissions(question, user, collection):
    results = secure_query(question, user, collection)

    if not results["documents"][0]:
        return {"answer": "I could not find relevant information you have access to.", "sources": []}

    context = "\n\n".join(results["documents"][0])

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=500,
        system="Answer from the provided context only. Never hint that additional restricted documents exist.",
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}]
    )

    return {
        "answer": response.content[0].text,
        "sources": [m["source"] for m in results["metadatas"][0]]
    }
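
As an optional defense-in-depth measure, the retrieved chunks can be re-checked in application code before they are joined into the prompt, in case the database filter is misconfigured. This is a sketch, not part of the original pipeline. `filter_authorized` is a hypothetical helper, and it assumes the result shape used above (`results["documents"][0]` and `results["metadatas"][0]` as parallel lists).

```python
def filter_authorized(results, allowed_levels, allowed_departments=None):
    """Drop any retrieved chunk whose metadata fails the permission checks.

    allowed_departments=None means no department restriction (e.g. admins).
    """
    docs = results["documents"][0]
    metas = results["metadatas"][0]
    kept = [
        (doc, meta)
        for doc, meta in zip(docs, metas)
        if meta.get("permission_level") in allowed_levels
        and (allowed_departments is None or meta.get("department") in allowed_departments)
    ]
    # Return the same nested-list shape so downstream code is unchanged.
    return {
        "documents": [[doc for doc, _ in kept]],
        "metadatas": [[meta for _, meta in kept]],
    }
```

The check is cheap because it only touches the handful of chunks already retrieved, and it turns a silent filter mistake into an empty context instead of a leak.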

Step 5: Audit Access Logs

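import json
import sqlite3
from contextlib import closing

DB_PATH = "access_audit.db"

def _connect():
    # Create the audit table on first use so inserts never hit a missing table.
    # Column types are an assumption; the original schema is not shown.
    conn = sqlite3.connect(DB_PATH)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS access_log (
            user_id TEXT, user_role TEXT, question TEXT,
            docs_accessed TEXT, accessed_at TEXT
        )
    """)
    return conn

def log_access(user_id, user_role, question, documents_accessed):
    # Store the accessed document list as JSON text.
    with closing(_connect()) as conn:
        conn.execute("""
            INSERT INTO access_log (user_id, user_role, question, docs_accessed, accessed_at)
            VALUES (?, ?, ?, ?, datetime('now'))
        """, (user_id, user_role, question, json.dumps(documents_accessed)))
        conn.commit()

def audit_report(days=30):
    # Returns (role, query count, distinct users) rows for the last `days` days.
    with closing(_connect()) as conn:
        return conn.execute("""
            SELECT user_role, COUNT(*), COUNT(DISTINCT user_id)
            FROM access_log WHERE accessed_at > datetime('now', ?)
            GROUP BY user_role
        """, (f"-{days} days",)).fetchall()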

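The logging function only helps if every query passes through it. One way to wire it in, sketched below, is a thin wrapper that calls the answer function and then records what was returned. `ask_and_log` is a hypothetical name, the `user["id"]` key is an assumption (the original `user` dict only shows `role` and `department`), and the two functions are passed in as arguments so the wrapper can be tested with stand-ins.

```python
def ask_and_log(question, user, collection, ask_fn, log_fn):
    """Answer a question, then record who asked and which sources were used."""
    result = ask_fn(question, user, collection)
    # Log even when no sources came back: repeated empty results can be a
    # sign that someone is probing for documents they cannot access.
    log_fn(user["id"], user["role"], question, result["sources"])
    return result
```

In the pipeline above, the call would look like `ask_and_log(question, user, collection, ask_with_permissions, log_access)`.
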
What to Build Next

Add dynamic permission inheritance. When a document references another document, check if the user has access to both before including the reference in the answer. This prevents information leakage through cross-references.
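
A minimal sketch of that cross-reference check follows. It assumes a `references` mapping from a document id to the ids it cites and a `metadata` mapping from id to the fields stored in Step 2. Both structures and the function names are hypothetical, and `user_dept=None` stands in for roles with access to all departments.

```python
PERMISSION_LEVELS = {"public": 0, "internal": 1, "confidential": 2, "restricted": 3}


def can_access(user_level, user_dept, meta):
    """Apply the same level and department rules as the retrieval filter."""
    level_ok = PERMISSION_LEVELS[meta["permission_level"]] <= user_level
    dept_ok = user_dept is None or meta["department"] in (user_dept, "general")
    return level_ok and dept_ok


def visible_references(doc_id, references, metadata, user_level, user_dept):
    """Return the cited document ids the user may see, or [] if the
    citing document itself is off limits."""
    if doc_id not in metadata or not can_access(user_level, user_dept, metadata[doc_id]):
        return []
    return [
        ref
        for ref in references.get(doc_id, [])
        if ref in metadata and can_access(user_level, user_dept, metadata[ref])
    ]


metadata = {
    "plan": {"permission_level": "internal", "department": "sales"},
    "salaries": {"permission_level": "restricted", "department": "hr"},
    "handbook": {"permission_level": "public", "department": "general"},
}
references = {"plan": ["salaries", "handbook"]}

# A sales employee (level 1) sees the handbook link but not the salaries link.
print(visible_references("plan", references, metadata, 1, "sales"))  # ['handbook']
```

References to unknown document ids are dropped as well, which keeps stale links from revealing that a document once existed.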
