How to Build a Task Dependency Management System
Manage task dependencies and trigger next steps automatically.
Jay Banlasan
The AI Systems Guy
Tasks that depend on other tasks get stuck when nobody monitors the chain. I built a task dependency management system that tracks what blocks what, automatically triggers downstream tasks when upstream ones complete, and alerts people when the critical path is at risk.
The system prevents the "I was waiting for you" problem.
What You Need Before Starting
- Python 3.8+
- SQLite for task and dependency storage
- A notification system (Slack or email)
Step 1: Create the Task and Dependency Database
import sqlite3
from datetime import datetime
def init_task_db(db_path="tasks.db"):
    """Open the task database and make sure its schema exists.

    Creates the ``tasks`` and ``dependencies`` tables if they are not
    already present. Each ``dependencies`` row records that ``task_id``
    cannot start until ``depends_on`` is complete.

    Args:
        db_path: Path of the SQLite file. ``":memory:"`` gives a
            throwaway in-memory database.

    Returns:
        The open ``sqlite3.Connection``; the caller is responsible for
        closing it.
    """
    conn = sqlite3.connect(db_path)
    # SQLite parses FOREIGN KEY clauses but does not enforce them unless
    # enforcement is switched on for each connection. Without this the
    # constraints declared on `dependencies` are never checked.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
assignee TEXT,
status TEXT DEFAULT 'blocked',
due_date TEXT,
completed_at TEXT,
project TEXT
)
""")
    conn.execute("""
CREATE TABLE IF NOT EXISTS dependencies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER,
depends_on INTEGER,
FOREIGN KEY (task_id) REFERENCES tasks(id),
FOREIGN KEY (depends_on) REFERENCES tasks(id)
)
""")
    conn.commit()
    return conn
Step 2: Add Tasks with Dependencies
def add_task(conn, title, assignee, due_date, project, depends_on=None):
    """Insert a task, record its dependencies, and set its initial status.

    Args:
        conn: Open SQLite connection to the task database.
        title: Task title.
        assignee: Person responsible for the task.
        due_date: Due date, stored as given.
        project: Name of the project the task belongs to.
        depends_on: Optional iterable of ids of tasks that must be
            complete before this one can start. Repeated ids are
            recorded once.

    Returns:
        The id of the newly inserted task.

    Raises:
        ValueError: If ``depends_on`` names a task id that does not exist.
            Nothing is written in that case.
    """
    # dict.fromkeys drops repeats while keeping the caller's order, so the
    # same edge is never stored twice.
    dep_ids = list(dict.fromkeys(depends_on)) if depends_on else []

    # Reject unknown prerequisites before writing anything; otherwise the
    # status computation below would fail on the missing row after the
    # task had already been inserted.
    missing = [
        dep_id
        for dep_id in dep_ids
        if conn.execute("SELECT 1 FROM tasks WHERE id=?", (dep_id,)).fetchone() is None
    ]
    if missing:
        raise ValueError(f"depends_on refers to unknown task id(s): {missing}")

    # The connection context manager commits on success and rolls back on
    # any exception, so a failure cannot leave a half-created task behind.
    with conn:
        cursor = conn.execute(
            "INSERT INTO tasks (title, assignee, due_date, project) VALUES (?,?,?,?)",
            (title, assignee, due_date, project)
        )
        task_id = cursor.lastrowid
        conn.executemany(
            "INSERT INTO dependencies (task_id, depends_on) VALUES (?,?)",
            [(task_id, dep_id) for dep_id in dep_ids]
        )
        update_task_status(conn, task_id)
    return task_id
def update_task_status(conn, task_id):
    """Recompute whether a task is ``'ready'`` or ``'blocked'``.

    A task with no dependency rows is moved from ``'blocked'`` to
    ``'ready'``. A task with dependencies becomes ``'ready'`` only when
    every prerequisite has status ``'complete'``, otherwise ``'blocked'``.
    A task that is already ``'complete'`` is never changed. A dependency
    row pointing at a task that does not exist counts as incomplete.

    The change is not committed here; the caller commits.

    Args:
        conn: Open SQLite connection to the task database.
        task_id: Id of the task whose status should be recomputed.
    """
    # One aggregate query replaces a lookup per dependency. The LEFT JOIN
    # keeps edges whose prerequisite row is missing: their status is NULL,
    # which falls into the ELSE branch and is counted as incomplete
    # instead of raising on a missing row.
    total, incomplete = conn.execute(
        """
        SELECT COUNT(*),
               SUM(CASE WHEN t.status = 'complete' THEN 0 ELSE 1 END)
        FROM dependencies AS d
        LEFT JOIN tasks AS t ON t.id = d.depends_on
        WHERE d.task_id = ?
        """,
        (task_id,),
    ).fetchone()

    if total == 0:
        # Nothing to wait for: only lift the default 'blocked' state.
        conn.execute("UPDATE tasks SET status='ready' WHERE id=? AND status='blocked'", (task_id,))
        return

    new_status = "blocked" if incomplete else "ready"
    conn.execute("UPDATE tasks SET status=? WHERE id=? AND status != 'complete'", (new_status, task_id))
Step 3: Complete Tasks and Trigger Downstream
def complete_task(conn, task_id):
    """Mark a task complete and report which downstream tasks became ready.

    Sets the task's status to ``'complete'`` with the current local time
    as ``completed_at``, recomputes the status of every task that depends
    on it, and commits.

    Args:
        conn: Open SQLite connection to the task database.
        task_id: Id of the task to complete.

    Returns:
        A list of ``{"id", "title", "assignee"}`` dicts, one per dependent
        task whose status is ``'ready'`` after the update. The list is
        empty when the task does not exist or was already complete.
    """
    unblocked = []
    # Commits on success and rolls back if anything below raises.
    with conn:
        cursor = conn.execute(
            # IS NOT (rather than !=) also matches rows whose status is NULL.
            "UPDATE tasks SET status='complete', completed_at=? "
            "WHERE id=? AND status IS NOT 'complete'",
            (datetime.now().isoformat(), task_id)
        )
        if cursor.rowcount == 0:
            # Unknown or already-complete task: leave the original
            # completion timestamp alone and do not re-announce
            # downstream tasks.
            return unblocked

        rows = conn.execute(
            "SELECT task_id FROM dependencies WHERE depends_on=?", (task_id,)
        ).fetchall()
        # De-duplicate in query order so a repeated edge cannot report the
        # same task twice.
        for child_id in dict.fromkeys(row[0] for row in rows):
            update_task_status(conn, child_id)
            child = conn.execute(
                "SELECT status, title, assignee FROM tasks WHERE id=?", (child_id,)
            ).fetchone()
            # `child` is None when an edge refers to a task row that does
            # not exist; skip it rather than fail.
            if child is not None and child[0] == "ready":
                unblocked.append({"id": child_id, "title": child[1], "assignee": child[2]})
    return unblocked
Step 4: Find the Critical Path
def get_critical_path(conn, project):
    """List a project's unfinished tasks, longest dependency chain first.

    Args:
        conn: Open SQLite connection to the task database.
        project: Project name to filter tasks by.

    Returns:
        A list of dicts with keys ``"task"`` (the task title), ``"chain"``
        (the list returned by ``trace_dependency_chain`` for that task) and
        ``"chain_length"`` (its length), sorted by ``chain_length``
        descending. Entries of equal length keep their query order.
    """
    open_rows = conn.execute(
        "SELECT id, title, status, due_date FROM tasks WHERE project=? AND status != 'complete'",
        (project,)
    ).fetchall()

    def describe(row):
        # row[0] is the task id and row[1] its title.
        ids = trace_dependency_chain(conn, row[0])
        return {"task": row[1], "chain_length": len(ids), "chain": ids}

    # A stable sort with reverse=True keeps ties in their original order.
    return sorted(
        (describe(row) for row in open_rows),
        key=lambda entry: entry["chain_length"],
        reverse=True,
    )
def trace_dependency_chain(conn, task_id, visited=None):
    """Collect a task and every unfinished task it transitively waits on.

    Walks the dependency edges depth-first from ``task_id``, following
    only prerequisites whose status is not ``'complete'``. A prerequisite
    id with no matching task row is treated as unfinished and included.

    Args:
        conn: Open SQLite connection to the task database.
        task_id: Id of the task to start from. It is always the first
            element of the result, whatever its own status.
        visited: Optional set of task ids to skip. It is updated in place
            with every id added to the result.

    Returns:
        Task ids in depth-first preorder, each at most once. Empty if
        ``task_id`` is already in ``visited``.
    """
    if visited is None:
        visited = set()

    chain = []
    # An explicit stack instead of recursion, so a long chain cannot hit
    # Python's recursion limit.
    pending = [task_id]
    while pending:
        current = pending.pop()
        if current in visited:
            # Reached through another branch already, or part of a cycle.
            continue
        visited.add(current)
        chain.append(current)

        # The LEFT JOIN with IS NOT keeps edges whose prerequisite row is
        # missing (NULL status) instead of failing on them.
        unfinished = conn.execute(
            """
            SELECT d.depends_on
            FROM dependencies AS d
            LEFT JOIN tasks AS t ON t.id = d.depends_on
            WHERE d.task_id = ? AND t.status IS NOT 'complete'
            ORDER BY d.id
            """,
            (current,),
        ).fetchall()
        # Push in reverse so the first dependency is popped and explored
        # first, matching a recursive depth-first walk.
        pending.extend(dep_id for (dep_id,) in reversed(unfinished))
    return chain
Step 5: Generate a Project Status View
def project_status(conn, project):
    """Render a Markdown status report for one project.

    Tasks are grouped under ``Blocked``, ``Ready`` and ``Complete``
    headings, in that order; a heading is omitted when no task has that
    status. Tasks with any other status value are not listed.

    Args:
        conn: Open SQLite connection to the task database.
        project: Project name to report on.

    Returns:
        The report as a single string. It always starts with a
        ``# Project: <name>`` header, even when the project has no tasks.
    """
    rows = conn.execute(
        "SELECT title, assignee, status, due_date FROM tasks WHERE project=?",
        (project,)
    ).fetchall()

    # Collect fragments and join once instead of growing a string with +=.
    parts = [f"# Project: {project}\n\n"]
    for status_type in ("blocked", "ready", "complete"):
        group = [row for row in rows if row[2] == status_type]
        if not group:
            continue
        parts.append(f"## {status_type.title()} ({len(group)})\n\n")
        for title, assignee, _status, due_date in group:
            # Both columns are nullable; show a readable placeholder
            # rather than the literal text "None".
            who = assignee if assignee is not None else "unassigned"
            due = f"due {due_date}" if due_date is not None else "no due date"
            parts.append(f"- {title} ({who}) - {due}\n")
        parts.append("\n")
    return "".join(parts)
What to Build Next
Add time estimates to each task so you can calculate the projected project completion date. Then, when a task runs late, the system can recalculate the entire downstream timeline and alert everyone affected.
Related Reading
- Identifying Your Biggest Bottleneck - finding what blocks your projects
- The Feedback Loop That Powers Everything - feedback in project workflows
- Building Your First Automation: A Complete Guide - building automation fundamentals
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment