How to Create Automated Escalation Notification Chains
Escalate issues automatically through the right people when unresolved.
Jay Banlasan
The AI Systems Guy
Issues sit unresolved when there is no escalation path. I built a system to automate escalation notification chains that progressively notifies higher levels of management when issues remain open past their SLA. First the assignee. Then the team lead. Then the director.
Automatic escalation means nothing stays stuck because one person dropped the ball.
What You Need Before Starting
- Python 3.8+
- SQLite for issue tracking
- Slack or email for notifications
- Escalation chain definitions
Step 1: Define Escalation Chains
ESCALATION_CHAINS = {
"customer_issue": {
"levels": [
{"level": 1, "role": "support_agent", "wait_hours": 4},
{"level": 2, "role": "support_lead", "wait_hours": 8},
{"level": 3, "role": "support_manager", "wait_hours": 24},
{"level": 4, "role": "vp_customer_success", "wait_hours": 48}
]
},
"system_outage": {
"levels": [
{"level": 1, "role": "on_call_engineer", "wait_hours": 0.25},
{"level": 2, "role": "engineering_manager", "wait_hours": 1},
{"level": 3, "role": "cto", "wait_hours": 2}
]
}
}
ROLE_CONTACTS = {
"support_agent": "support-team@slack",
"support_lead": "[email protected]",
"support_manager": "[email protected]",
"vp_customer_success": "[email protected]",
"on_call_engineer": "oncall@slack",
"engineering_manager": "[email protected]",
"cto": "[email protected]"
}
Step 2: Create the Issue Tracker
import sqlite3
from datetime import datetime
def init_escalation_db(db_path="escalation.db"):
conn = sqlite3.connect(db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS issues (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
description TEXT,
chain_type TEXT,
current_level INTEGER DEFAULT 1,
status TEXT DEFAULT 'open',
created_at TEXT,
resolved_at TEXT,
last_escalated_at TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS escalation_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
issue_id INTEGER,
level INTEGER,
notified_role TEXT,
notified_contact TEXT,
escalated_at TEXT
)
""")
conn.commit()
return conn
Step 3: Create and Escalate Issues
from datetime import timedelta
def create_issue(conn, title, description, chain_type):
conn.execute(
"INSERT INTO issues (title, description, chain_type, created_at) VALUES (?,?,?,?)",
(title, description, chain_type, datetime.now().isoformat())
)
conn.commit()
issue_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
chain = ESCALATION_CHAINS[chain_type]
first_level = chain["levels"][0]
notify_and_log(conn, issue_id, first_level, title)
return issue_id
def check_for_escalations(conn):
open_issues = conn.execute(
"SELECT id, title, chain_type, current_level, last_escalated_at FROM issues WHERE status='open'"
).fetchall()
for issue_id, title, chain_type, level, last_esc in open_issues:
chain = ESCALATION_CHAINS[chain_type]
if level >= len(chain["levels"]):
continue
current_config = chain["levels"][level - 1]
last_time = datetime.fromisoformat(last_esc) if last_esc else datetime.fromisoformat(
conn.execute("SELECT created_at FROM issues WHERE id=?", (issue_id,)).fetchone()[0]
)
hours_since = (datetime.now() - last_time).total_seconds() / 3600
if hours_since >= current_config["wait_hours"]:
next_level = chain["levels"][level] if level < len(chain["levels"]) else None
if next_level:
conn.execute(
"UPDATE issues SET current_level=?, last_escalated_at=? WHERE id=?",
(level + 1, datetime.now().isoformat(), issue_id)
)
notify_and_log(conn, issue_id, next_level, title)
conn.commit()
Step 4: Notify and Log
def notify_and_log(conn, issue_id, level_config, title):
role = level_config["role"]
contact = ROLE_CONTACTS.get(role, role)
conn.execute(
"INSERT INTO escalation_log (issue_id, level, notified_role, notified_contact, escalated_at) VALUES (?,?,?,?,?)",
(issue_id, level_config["level"], role, contact, datetime.now().isoformat())
)
conn.commit()
level_label = f"Level {level_config['level']}"
print(f"[ESCALATION {level_label}] Issue #{issue_id}: '{title}' -> {role} ({contact})")
Step 5: Resolve and Report
def resolve_issue(conn, issue_id):
conn.execute(
"UPDATE issues SET status='resolved', resolved_at=? WHERE id=?",
(datetime.now().isoformat(), issue_id)
)
conn.commit()
def escalation_report(conn, days=7):
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
rows = conn.execute("""
SELECT i.title, i.chain_type, i.current_level, i.status,
i.created_at, i.resolved_at
FROM issues i WHERE i.created_at > ?
ORDER BY i.current_level DESC
""", (cutoff,)).fetchall()
report = f"# Escalation Report ({days} days)\n\n"
report += f"Total issues: {len(rows)}\n\n"
for r in rows:
status_icon = "Resolved" if r[3] == "resolved" else f"Level {r[2]}"
report += f"- {r[0]} ({r[1]}) - {status_icon}\n"
return report
# Schedule: run every 30 minutes
# */30 * * * * python3 /path/to/escalation_checker.py
What to Build Next
Add SLA tracking that measures time-to-resolution at each escalation level. Identify which issue types escalate most often and why. The escalation chain is the safety net. SLA analysis tells you where the process breaks down.
Related Reading
- Building Your First Automation: A Complete Guide - automation fundamentals
- Identifying Your Biggest Bottleneck - escalation patterns reveal bottlenecks
- The Feedback Loop That Powers Everything - escalation as a feedback mechanism
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment