Systems Library / Operations & Admin / How to Build a Task Dependency Management System
Operations & Admin process workflow

How to Build a Task Dependency Management System

Manage task dependencies and trigger next steps automatically.

Jay Banlasan

Jay Banlasan

The AI Systems Guy

Tasks that depend on other tasks get stuck when nobody monitors the chain. I built an automated task dependency management system that tracks what blocks what, triggers downstream tasks automatically when upstream ones complete, and alerts people when the critical path is at risk.

The system prevents the "I was waiting for you" problem.

What You Need Before Starting

Step 1: Create the Task and Dependency Database

import sqlite3
from datetime import datetime

def init_task_db(db_path="tasks.db"):
    """Open (or create) the task database and ensure its schema exists.

    Two tables are created when missing:

    * ``tasks`` - one row per task; ``status`` defaults to ``'blocked'``.
    * ``dependencies`` - one row per edge: ``task_id`` depends on
      ``depends_on``; both columns reference ``tasks(id)``.

    Args:
        db_path: Path of the SQLite database file (``":memory:"`` also works).

    Returns:
        An open ``sqlite3.Connection`` with foreign-key enforcement enabled.
        The caller is responsible for closing it.

    Raises:
        sqlite3.Error: If the schema cannot be created; the connection is
            closed before the error propagates.
    """
    conn = sqlite3.connect(db_path)
    try:
        # SQLite parses FOREIGN KEY clauses but does not enforce them unless
        # this pragma is switched on for every connection. Without it the
        # constraints declared below are purely decorative.
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("""
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            assignee TEXT,
            status TEXT DEFAULT 'blocked',
            due_date TEXT,
            completed_at TEXT,
            project TEXT
        )
    """)
        conn.execute("""
        CREATE TABLE IF NOT EXISTS dependencies (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id INTEGER,
            depends_on INTEGER,
            FOREIGN KEY (task_id) REFERENCES tasks(id),
            FOREIGN KEY (depends_on) REFERENCES tasks(id)
        )
    """)
        conn.commit()
    except Exception:
        # Do not leak the handle when setup fails part-way through.
        conn.close()
        raise
    return conn

Step 2: Add Tasks with Dependencies

def add_task(conn, title, assignee, due_date, project, depends_on=None):
    """Insert a task, record its dependencies, and set its initial status.

    Args:
        conn: Open SQLite connection to the task database.
        title: Task title.
        assignee: Person responsible for the task.
        due_date: Due date, stored as given in the ``due_date`` text column.
        project: Project the task belongs to.
        depends_on: Optional iterable of ids of tasks that must be complete
            before this one can start. Repeated ids are stored once.

    Returns:
        The id of the newly inserted task.

    Raises:
        ValueError: If ``depends_on`` names a task id that does not exist.
            Nothing is written in that case.

    The insert, the dependency edges and the status update are committed
    together at the end.
    """
    # Keep the caller's order but drop repeats, so each edge is stored once.
    dep_ids = list(dict.fromkeys(depends_on)) if depends_on else []

    # Validate before writing anything: otherwise a bad id would only surface
    # later as an opaque error, after the task row was already inserted into
    # the open transaction.
    for dep_id in dep_ids:
        exists = conn.execute("SELECT 1 FROM tasks WHERE id=?", (dep_id,)).fetchone()
        if exists is None:
            raise ValueError(f"depends_on references unknown task id {dep_id!r}")

    cursor = conn.execute(
        "INSERT INTO tasks (title, assignee, due_date, project) VALUES (?,?,?,?)",
        (title, assignee, due_date, project)
    )
    task_id = cursor.lastrowid

    conn.executemany(
        "INSERT INTO dependencies (task_id, depends_on) VALUES (?,?)",
        [(task_id, dep_id) for dep_id in dep_ids]
    )

    # New rows start as 'blocked' (column default); promote to 'ready' when
    # nothing is actually holding the task back.
    update_task_status(conn, task_id)
    conn.commit()
    return task_id

def update_task_status(conn, task_id):
    """Recompute whether a task is ``'ready'`` or ``'blocked'``.

    * With no dependency rows, a ``'blocked'`` task becomes ``'ready'``;
      any other status is left alone.
    * With dependencies, the task becomes ``'ready'`` when every dependency
      is ``'complete'`` and ``'blocked'`` otherwise. A task that is already
      ``'complete'`` is never changed.

    A dependency row that points at a task id with no matching ``tasks`` row
    counts as not complete, so the task stays blocked instead of raising.

    Args:
        conn: Open SQLite connection to the task database.
        task_id: Id of the task to re-evaluate.

    The change is not committed here; the caller commits.
    """
    # One aggregate query instead of one lookup per dependency. The LEFT JOIN
    # yields a NULL status for dangling edges, which the CASE treats as open.
    dep_count, open_count = conn.execute(
        """
        SELECT COUNT(*),
               COALESCE(SUM(CASE WHEN t.status = 'complete' THEN 0 ELSE 1 END), 0)
        FROM dependencies AS d
        LEFT JOIN tasks AS t ON t.id = d.depends_on
        WHERE d.task_id = ?
        """,
        (task_id,),
    ).fetchone()

    if dep_count == 0:
        conn.execute("UPDATE tasks SET status='ready' WHERE id=? AND status='blocked'", (task_id,))
        return

    new_status = "ready" if open_count == 0 else "blocked"
    conn.execute("UPDATE tasks SET status=? WHERE id=? AND status != 'complete'", (new_status, task_id))

Step 3: Complete Tasks and Trigger Downstream

def complete_task(conn, task_id):
    """Mark a task complete and report the downstream tasks it unblocked.

    Args:
        conn: Open SQLite connection to the task database.
        task_id: Id of the task that has been finished.

    Returns:
        A list of ``{"id", "title", "assignee"}`` dicts, one for each direct
        dependent whose status changed to ``'ready'`` as a result of this
        call. Dependents that were already ``'ready'`` are not reported
        again, so calling this twice for the same task does not produce
        duplicate notifications.

    All changes are committed before returning.
    """
    # Only stamp the completion time on the first completion; re-completing
    # a finished task must not rewrite when it was actually done.
    conn.execute(
        "UPDATE tasks SET status='complete', completed_at=? "
        "WHERE id=? AND status IS NOT 'complete'",
        (datetime.now().isoformat(), task_id)
    )

    downstream = conn.execute(
        "SELECT task_id FROM dependencies WHERE depends_on=?", (task_id,)
    ).fetchall()
    # The same edge may be stored more than once; visit each child one time
    # while keeping the original row order.
    child_ids = list(dict.fromkeys(row[0] for row in downstream))

    unblocked = []
    for child_id in child_ids:
        before = conn.execute("SELECT status FROM tasks WHERE id=?", (child_id,)).fetchone()
        if before is None:
            # Edge points at a task row that does not exist; nothing to unblock.
            continue

        update_task_status(conn, child_id)

        title, assignee, after = conn.execute(
            "SELECT title, assignee, status FROM tasks WHERE id=?", (child_id,)
        ).fetchone()
        # Report a genuine transition into 'ready', not a task that was ready anyway.
        if after == "ready" and before[0] != "ready":
            unblocked.append({"id": child_id, "title": title, "assignee": assignee})

    conn.commit()
    return unblocked

Step 4: Find the Critical Path

def get_critical_path(conn, project):
    """Rank a project's unfinished tasks by the size of their open dependency chain.

    Args:
        conn: Open SQLite connection to the task database.
        project: Project name to filter tasks by.

    Returns:
        A list of ``{"task", "chain_length", "chain"}`` dicts, one per task
        whose status is not ``'complete'``, ordered from longest chain to
        shortest. ``chain`` is whatever ``trace_dependency_chain`` returns
        for that task and ``task`` is the task's title.
    """
    open_tasks = conn.execute(
        "SELECT id, title, status, due_date FROM tasks WHERE project=? AND status != 'complete'",
        (project,)
    ).fetchall()

    def describe(row):
        # row[0] is the task id, row[1] its title.
        links = trace_dependency_chain(conn, row[0])
        return {"task": row[1], "chain_length": len(links), "chain": links}

    # A stable descending sort keeps query order among equal chain lengths.
    return sorted(
        [describe(row) for row in open_tasks],
        key=lambda entry: entry["chain_length"],
        reverse=True,
    )

def trace_dependency_chain(conn, task_id, visited=None):
    """Collect a task and every not-yet-complete task it transitively waits on.

    The walk is depth-first and lists each task before its dependencies.
    It is done with an explicit stack rather than recursion, so the length
    of a chain is not limited by Python's recursion depth.

    Args:
        conn: Open SQLite connection to the task database.
        task_id: Id of the task to start from. It is always included in the
            result (unless already in ``visited``); its own status is not
            checked.
        visited: Optional set of task ids to treat as already seen. It is
            updated in place with every id this call visits, and is also
            what stops cycles from looping forever.

    Returns:
        A list of task ids starting with ``task_id``. Dependencies whose
        status is ``'complete'`` are skipped together with everything behind
        them. A dependency id with no matching ``tasks`` row is treated as
        not complete and is included.
    """
    if visited is None:
        visited = set()

    chain = []
    pending = [task_id]
    while pending:
        current = pending.pop()
        if current in visited:
            continue
        visited.add(current)
        chain.append(current)

        # LEFT JOIN fetches each dependency's status in the same query; a
        # dangling edge yields a NULL status instead of a missing row.
        rows = conn.execute(
            "SELECT d.depends_on, t.status FROM dependencies AS d "
            "LEFT JOIN tasks AS t ON t.id = d.depends_on "
            "WHERE d.task_id=? ORDER BY d.id",
            (current,)
        ).fetchall()
        open_deps = [dep_id for dep_id, status in rows if status != "complete"]

        # Push in reverse so the first dependency is popped (and fully
        # explored) first, matching a recursive depth-first order.
        pending.extend(reversed(open_deps))

    return chain

Step 5: Generate a Project Status View

def project_status(conn, project):
    """Render a Markdown summary of a project's tasks grouped by status.

    Args:
        conn: Open SQLite connection to the task database.
        project: Project name to report on.

    Returns:
        A string that starts with a ``# Project: <name>`` heading, followed
        by one ``## <Status> (<count>)`` section for each of blocked, ready
        and complete (in that order) that has at least one task. Each task
        is a bullet showing title, assignee and due date. Tasks with any
        other status are not listed.
    """
    rows = conn.execute(
        "SELECT title, assignee, status, due_date FROM tasks WHERE project=?",
        (project,)
    ).fetchall()

    # Bucket the rows once instead of re-filtering the list per status.
    by_status = {}
    for title, assignee, status, due_date in rows:
        by_status.setdefault(status, []).append((title, assignee, due_date))

    pieces = [f"# Project: {project}\n\n"]
    for status_type in ("blocked", "ready", "complete"):
        members = by_status.get(status_type)
        if not members:
            continue
        pieces.append(f"## {status_type.title()} ({len(members)})\n\n")
        for title, assignee, due_date in members:
            pieces.append(f"- {title} ({assignee}) - due {due_date}\n")
        pieces.append("\n")

    return "".join(pieces)

What to Build Next

Add time estimates to each task so you can calculate the projected project completion date. When a task runs late, the system recalculates the entire downstream timeline and alerts everyone affected.

Related Reading

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment

Related Systems