How to Build a Cron Job Monitoring System

Monitor cron jobs and get alerts when scheduled tasks fail.

Jay Banlasan

The AI Systems Guy

A cron job monitoring system with failure alerts catches the silent killers in your infrastructure. Cron jobs fail quietly. No crash page, no error modal. The data just stops flowing and nobody notices until a client asks why their report is empty.

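I track every scheduled task across my systems with a dead-simple heartbeat pattern: every job writes a row when it finishes, and a separate monitor alerts when a job fails or goes quiet for too long.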

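What You Need Before Starting

The code in this guide uses Python 3 with the built-in sqlite3 module, the requests package, a Slack incoming webhook URL, and access to crontab on the machine that runs your jobs.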

Step 1: Create the Heartbeat Database

import sqlite3
from datetime import datetime

def init_cron_db(db_path="cron_monitor.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS cron_heartbeats (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_name TEXT,
            status TEXT,
            duration_seconds REAL,
            output TEXT,
            timestamp TEXT
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS cron_registry (
            job_name TEXT PRIMARY KEY,
            schedule TEXT,
            max_interval_minutes INTEGER,
            description TEXT
        )
    """)
    conn.commit()
    conn.close()
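
The monitor in later steps keeps asking for the latest heartbeat of one job, so the heartbeat table may benefit from an index once it grows. The sketch below is an optional addition, not part of the original schema: the index name idx_heartbeats_job_time is hypothetical, and the in-memory database is only there so the snippet runs on its own.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory database, only for this demo
conn.execute("""
    CREATE TABLE IF NOT EXISTS cron_heartbeats (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_name TEXT, status TEXT, duration_seconds REAL,
        output TEXT, timestamp TEXT
    )
""")

# Hypothetical index name; targets "latest heartbeat for this job" lookups
conn.execute("""
    CREATE INDEX IF NOT EXISTS idx_heartbeats_job_time
    ON cron_heartbeats (job_name, timestamp)
""")

# List the indexes SQLite now knows about for the heartbeat table
index_names = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'index' AND tbl_name = 'cron_heartbeats'"
)]
print(index_names)
conn.close()
```

If you adopt it, the CREATE INDEX statement would sit inside init_cron_db next to the two CREATE TABLE statements.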

Step 2: Build the Heartbeat Wrapper

Every cron job calls this wrapper to report success or failure:

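import sqlite3
import time
from datetime import datetime

def heartbeat(job_name, func, db_path="cron_monitor.db"):
    """Run func, record one heartbeat row, and re-raise any failure."""
    start = time.time()
    try:
        result = func()
        duration = time.time() - start
        # Store a truncated copy of the return value for later debugging
        log_heartbeat(job_name, "success", duration, str(result)[:500], db_path)
        return result
    except Exception as e:
        duration = time.time() - start
        log_heartbeat(job_name, "failed", duration, str(e)[:500], db_path)
        # Re-raise so the cron process still exits with an error
        raise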

def log_heartbeat(job_name, status, duration, output, db_path="cron_monitor.db"):
    conn = sqlite3.connect(db_path)
    conn.execute(
        "INSERT INTO cron_heartbeats (job_name, status, duration_seconds, output, timestamp) VALUES (?,?,?,?,?)",
        (job_name, status, round(duration, 2), output, datetime.utcnow().isoformat())
    )
    conn.commit()
    conn.close()
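
To see what the wrapper records, here is a small self-contained run with one job that succeeds and one that fails. The two job functions are hypothetical, the database lives in a temporary directory, and init_cron_db and heartbeat are condensed stand-ins for the versions above so the snippet runs on its own.

```python
import sqlite3
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path

def init_cron_db(db_path):
    # Condensed stand-in for Step 1: only the heartbeat table
    conn = sqlite3.connect(db_path)
    conn.execute("""CREATE TABLE IF NOT EXISTS cron_heartbeats (
        id INTEGER PRIMARY KEY AUTOINCREMENT, job_name TEXT, status TEXT,
        duration_seconds REAL, output TEXT, timestamp TEXT)""")
    conn.commit()
    conn.close()

def heartbeat(job_name, func, db_path):
    # Condensed stand-in for Step 2: run the job, log one row, re-raise failures
    start = time.time()
    status, output = "success", ""
    try:
        result = func()
        output = str(result)[:500]
        return result
    except Exception as e:
        status, output = "failed", str(e)[:500]
        raise
    finally:
        # Naive UTC timestamp in ISO 8601, the same shape the article stores
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        conn = sqlite3.connect(db_path)
        conn.execute(
            "INSERT INTO cron_heartbeats (job_name, status, duration_seconds, output, timestamp) "
            "VALUES (?,?,?,?,?)",
            (job_name, status, round(time.time() - start, 2), output, stamp),
        )
        conn.commit()
        conn.close()

def pull_ads_data():      # hypothetical job that succeeds
    return {"rows": 42}

def broken_report():      # hypothetical job that always fails
    raise RuntimeError("upstream API returned 500")

with tempfile.TemporaryDirectory() as tmp:
    db_path = str(Path(tmp) / "cron_monitor_demo.db")
    init_cron_db(db_path)

    heartbeat("meta_daily_pull", pull_ads_data, db_path)
    try:
        heartbeat("daily_report", broken_report, db_path)
    except RuntimeError:
        pass  # swallowed only for the demo; a real cron script would let it propagate

    conn = sqlite3.connect(db_path)
    rows = conn.execute(
        "SELECT job_name, status, output FROM cron_heartbeats ORDER BY id"
    ).fetchall()
    conn.close()

for row in rows:
    print(row)
```

In a real cron script, the last line is usually just a call like heartbeat("meta_daily_pull", pull_ads_data), leaving any exception to fail the process.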

Step 3: Register Your Jobs

def register_job(job_name, schedule, max_interval_minutes, description="", db_path="cron_monitor.db"):
    conn = sqlite3.connect(db_path)
    conn.execute(
        "INSERT OR REPLACE INTO cron_registry (job_name, schedule, max_interval_minutes, description) VALUES (?,?,?,?)",
        (job_name, schedule, max_interval_minutes, description)
    )
    conn.commit()
    conn.close()

# Register your jobs once
register_job("meta_daily_pull", "0 6 * * *", 1500, "Pull Meta Ads data daily")
register_job("daily_report", "0 8 * * *", 1500, "Generate daily client report")
register_job("health_check", "*/5 * * * *", 10, "Check endpoint health")
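
The max_interval_minutes values above read as one schedule period plus some slack: 1500 minutes is 24 hours plus an hour of grace, and 10 minutes is one five-minute period plus another five. That interpretation is an assumption drawn from the numbers, and the helper name below is hypothetical.

```python
def max_interval_with_grace(period_minutes, grace_minutes):
    """Longest acceptable gap between heartbeats: one period plus slack."""
    return period_minutes + grace_minutes

daily = max_interval_with_grace(24 * 60, 60)  # daily job with one hour of grace
five_minute = max_interval_with_grace(5, 5)   # five-minute job with one spare period
print(daily, five_minute)  # 1500 10
```

Too little grace makes a job that merely ran slowly look missing. Too much delays the alert.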

Step 4: Build the Missing Job Detector

def check_for_missing_jobs(db_path="cron_monitor.db"):
    conn = sqlite3.connect(db_path)
    registry = conn.execute("SELECT job_name, max_interval_minutes FROM cron_registry").fetchall()
    
    missing = []
    for job_name, max_minutes in registry:
        last = conn.execute(
            "SELECT timestamp FROM cron_heartbeats WHERE job_name = ? ORDER BY timestamp DESC LIMIT 1",
            (job_name,)
        ).fetchone()
        
        if not last:
            missing.append(f"{job_name}: never ran")
            continue
        
        last_time = datetime.fromisoformat(last[0])
        minutes_ago = (datetime.utcnow() - last_time).total_seconds() / 60
        
        if minutes_ago > max_minutes:
            missing.append(f"{job_name}: last ran {int(minutes_ago)} minutes ago (max: {max_minutes})")
    
    conn.close()
    return missing
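
A quick way to check the detector is to feed it known data with a fixed clock. The sketch below is a variation on check_for_missing_jobs, not the function itself: find_missing is a hypothetical name, it takes an open connection and a now argument so the result is repeatable, and the rows are made up for the demo.

```python
import sqlite3
from datetime import datetime, timedelta

def find_missing(conn, now):
    """Same staleness rule as check_for_missing_jobs, with an injectable clock."""
    missing = []
    registry = conn.execute(
        "SELECT job_name, max_interval_minutes FROM cron_registry ORDER BY job_name"
    ).fetchall()
    for job_name, max_minutes in registry:
        last = conn.execute(
            "SELECT MAX(timestamp) FROM cron_heartbeats WHERE job_name = ?",
            (job_name,),
        ).fetchone()[0]
        if last is None:
            missing.append(f"{job_name}: never ran")
            continue
        minutes_ago = (now - datetime.fromisoformat(last)).total_seconds() / 60
        if minutes_ago > max_minutes:
            missing.append(
                f"{job_name}: last ran {int(minutes_ago)} minutes ago (max: {max_minutes})"
            )
    return missing

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cron_registry (job_name TEXT PRIMARY KEY, schedule TEXT, "
             "max_interval_minutes INTEGER, description TEXT)")
conn.execute("CREATE TABLE cron_heartbeats (id INTEGER PRIMARY KEY AUTOINCREMENT, "
             "job_name TEXT, status TEXT, duration_seconds REAL, output TEXT, timestamp TEXT)")

now = datetime(2024, 1, 2, 12, 0, 0)  # fixed clock for the demo
conn.executemany("INSERT INTO cron_registry VALUES (?,?,?,?)", [
    ("daily_report", "0 8 * * *", 1500, ""),
    ("health_check", "*/5 * * * *", 10, ""),
    ("meta_daily_pull", "0 6 * * *", 1500, ""),
])
conn.executemany(
    "INSERT INTO cron_heartbeats (job_name, status, duration_seconds, output, timestamp) "
    "VALUES (?,?,?,?,?)",
    [
        ("daily_report", "success", 1.0, "", (now - timedelta(hours=4)).isoformat()),
        ("health_check", "success", 0.1, "", (now - timedelta(minutes=25)).isoformat()),
    ],
)

missing = find_missing(conn, now)
print(missing)
conn.close()
```

Here daily_report is within its 1500-minute window, health_check is 25 minutes stale against a limit of 10, and meta_daily_pull has no heartbeat at all.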

Step 5: Alert on Missing or Failed Jobs

import requests

def run_cron_monitor():
    missing = check_for_missing_jobs()
    
    conn = sqlite3.connect("cron_monitor.db")
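    # Heartbeats are stored as ISO 8601 text with a "T" separator, so the cutoff
    # must use the same format; datetime('now', ...) uses a space and would
    # compare incorrectly as a string.
    recent_failures = conn.execute("""
        SELECT job_name, output, timestamp FROM cron_heartbeats
        WHERE status = 'failed'
          AND timestamp > strftime('%Y-%m-%dT%H:%M:%S', 'now', '-1 hour')
    """).fetchall()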
    conn.close()
    
    alerts = []
    if missing:
        alerts.append("Missing jobs:\n" + "\n".join(f"  {m}" for m in missing))
    if recent_failures:
        alerts.append("Recent failures:\n" + "\n".join(
            f"  {f[0]}: {f[1][:100]}" for f in recent_failures
        ))
    
    if alerts:
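        # Replace YOUR_SLACK_WEBHOOK with your Slack incoming webhook URL.
        # The timeout keeps the monitor from hanging if Slack is unreachable.
        requests.post("YOUR_SLACK_WEBHOOK", json={
            "text": "Cron Monitor Alert:\n" + "\n".join(alerts)
        }, timeout=10)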

if __name__ == "__main__":
    run_cron_monitor()
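
If you want to check the alert wording without posting to Slack, the message assembly can be pulled out into a pure function. This is one possible refactor, not the article's code: build_alert_text is a hypothetical name and the sample inputs are made up.

```python
def build_alert_text(missing, recent_failures):
    """Return the Slack message body, or None when there is nothing to report."""
    sections = []
    if missing:
        sections.append("Missing jobs:\n" + "\n".join(f"  {m}" for m in missing))
    if recent_failures:
        # Each failure row is (job_name, output, timestamp), as selected in Step 5
        sections.append("Recent failures:\n" + "\n".join(
            f"  {name}: {output[:100]}" for name, output, _timestamp in recent_failures
        ))
    if not sections:
        return None
    return "Cron Monitor Alert:\n" + "\n".join(sections)

text = build_alert_text(
    ["health_check: last ran 25 minutes ago (max: 10)"],
    [("daily_report", "upstream API returned 500", "2024-01-02T08:00:03")],
)
print(text)
```

run_cron_monitor would then post only when the function returns something other than None.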

Schedule the monitor itself:

*/10 * * * * python3 /root/monitoring/cron_monitor_check.py
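
One thing to watch: the monitor runs every ten minutes while the failure query looks back an hour, so a single failure can show up in as many as six consecutive alerts. A possible adjustment is to match the lookback window to the monitor interval. The sketch below assumes a hypothetical failures_since helper that builds the cutoff in Python, in the same ISO 8601 format the heartbeats use.

```python
import sqlite3
from datetime import datetime, timedelta

def failures_since(conn, now, window_minutes):
    """Failed heartbeats newer than now minus window_minutes."""
    cutoff = (now - timedelta(minutes=window_minutes)).isoformat()
    return conn.execute(
        "SELECT job_name, output, timestamp FROM cron_heartbeats "
        "WHERE status = 'failed' AND timestamp > ? ORDER BY timestamp",
        (cutoff,),
    ).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cron_heartbeats (id INTEGER PRIMARY KEY AUTOINCREMENT, "
             "job_name TEXT, status TEXT, duration_seconds REAL, output TEXT, timestamp TEXT)")

now = datetime(2024, 1, 2, 12, 0, 0)  # fixed clock for the demo
conn.executemany(
    "INSERT INTO cron_heartbeats (job_name, status, duration_seconds, output, timestamp) "
    "VALUES (?,?,?,?,?)",
    [
        ("daily_report", "failed", 2.0, "old failure", (now - timedelta(minutes=45)).isoformat()),
        ("health_check", "failed", 0.3, "new failure", (now - timedelta(minutes=5)).isoformat()),
    ],
)

last_ten = [row[0] for row in failures_since(conn, now, 10)]
last_hour = [row[0] for row in failures_since(conn, now, 60)]
print(last_ten)
print(last_hour)
conn.close()
```

A ten-minute window reports only the newer failure, while the one-hour window reports both. The trade-off is that a shorter window can miss a failure if one monitor run is skipped.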

What to Build Next

Add a dashboard that shows all registered jobs with their last run time, status, and average duration. Color-code green for healthy, yellow for slow, red for failed or missing.
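
As a starting point for that dashboard, the data behind it could come from a summary like the one below. It is a rough sketch under stated assumptions: job_summaries and classify are hypothetical names, "slow" is arbitrarily defined as a last run taking more than twice the job's average duration, and the sample rows are made up.

```python
import sqlite3
from datetime import datetime, timedelta

def classify(last, avg, max_minutes, now, slow_factor=2.0):
    """Red for never ran, failed, or stale; yellow for slow; green otherwise."""
    if last is None:
        return "red"
    timestamp, status, duration = last
    minutes_ago = (now - datetime.fromisoformat(timestamp)).total_seconds() / 60
    if status == "failed" or minutes_ago > max_minutes:
        return "red"
    if avg and duration > slow_factor * avg:
        return "yellow"
    return "green"

def job_summaries(conn, now):
    """One dict per registered job: last run, last status, average duration, color."""
    summaries = []
    registry = conn.execute(
        "SELECT job_name, max_interval_minutes FROM cron_registry ORDER BY job_name"
    ).fetchall()
    for job_name, max_minutes in registry:
        last = conn.execute(
            "SELECT timestamp, status, duration_seconds FROM cron_heartbeats "
            "WHERE job_name = ? ORDER BY timestamp DESC LIMIT 1", (job_name,)
        ).fetchone()
        avg = conn.execute(
            "SELECT AVG(duration_seconds) FROM cron_heartbeats WHERE job_name = ?",
            (job_name,),
        ).fetchone()[0]
        summaries.append({
            "job": job_name,
            "last_run": last[0] if last else None,
            "status": last[1] if last else None,
            "avg_duration": avg,
            "color": classify(last, avg, max_minutes, now),
        })
    return summaries

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cron_registry (job_name TEXT PRIMARY KEY, schedule TEXT, "
             "max_interval_minutes INTEGER, description TEXT)")
conn.execute("CREATE TABLE cron_heartbeats (id INTEGER PRIMARY KEY AUTOINCREMENT, "
             "job_name TEXT, status TEXT, duration_seconds REAL, output TEXT, timestamp TEXT)")

now = datetime(2024, 1, 2, 12, 0, 0)  # fixed clock for the demo
conn.executemany("INSERT INTO cron_registry VALUES (?,?,?,?)", [
    ("daily_report", "0 8 * * *", 1500, ""),
    ("health_check", "*/5 * * * *", 10, ""),
    ("meta_daily_pull", "0 6 * * *", 1500, ""),
])
insert = ("INSERT INTO cron_heartbeats (job_name, status, duration_seconds, output, timestamp) "
          "VALUES (?,?,?,?,?)")
conn.executemany(insert, [
    ("daily_report", "success", 10.0, "", (now - timedelta(days=3, hours=4)).isoformat()),
    ("daily_report", "success", 10.0, "", (now - timedelta(days=2, hours=4)).isoformat()),
    ("daily_report", "success", 10.0, "", (now - timedelta(days=1, hours=4)).isoformat()),
    ("daily_report", "success", 70.0, "", (now - timedelta(hours=4)).isoformat()),
    ("health_check", "success", 0.1, "", (now - timedelta(minutes=2)).isoformat()),
    ("meta_daily_pull", "failed", 3.0, "boom", (now - timedelta(hours=1)).isoformat()),
])

colors = {s["job"]: s["color"] for s in job_summaries(conn, now)}
print(colors)
conn.close()
```

With this data, daily_report comes out yellow (its last run took 70 seconds against a 25-second average), health_check is green, and meta_daily_pull is red because its last run failed.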
