Systems Library / Infrastructure / How to Create a Security Monitoring and Alert System
Infrastructure monitoring

How to Create a Security Monitoring and Alert System

Monitor for security threats and get instant alerts on suspicious activity.

Jay Banlasan

Jay Banlasan

The AI Systems Guy

A security monitoring alert system for threat detection is something you build before you need it. I set this up on every server I manage after watching a brute force attack hit 10,000 login attempts in an hour on a client's VPS. The attack failed, but nobody knew it happened until I checked logs two days later.

Now I get alerts in real time.

What You Need Before Starting

Step 1: Build the Log Parser

import re
from datetime import datetime, timedelta

def parse_auth_log(log_path="/var/log/auth.log", hours=1):
    failed_logins = {}
    successful_logins = []
    cutoff = datetime.now() - timedelta(hours=hours)
    
    with open(log_path) as f:
        for line in f:
            if "Failed password" in line:
                ip_match = re.search(r"from (\d+\.\d+\.\d+\.\d+)", line)
                if ip_match:
                    ip = ip_match.group(1)
                    failed_logins[ip] = failed_logins.get(ip, 0) + 1
            
            elif "Accepted" in line:
                ip_match = re.search(r"from (\d+\.\d+\.\d+\.\d+)", line)
                user_match = re.search(r"for (\w+)", line)
                if ip_match and user_match:
                    successful_logins.append({
                        "user": user_match.group(1),
                        "ip": ip_match.group(1),
                        "line": line.strip()
                    })
    
    return {"failed": failed_logins, "successful": successful_logins}

Step 2: Define Threat Rules

THREAT_RULES = {
    "brute_force_threshold": 20,
    "known_bad_ips": [],
    "allowed_ssh_ips": ["YOUR_HOME_IP", "YOUR_OFFICE_IP"],
    "watch_users": ["root"]
}

def detect_threats(log_data):
    threats = []
    
    for ip, count in log_data["failed"].items():
        if count >= THREAT_RULES["brute_force_threshold"]:
            threats.append({
                "type": "brute_force",
                "severity": "high",
                "detail": f"IP {ip} had {count} failed login attempts"
            })
    
    for login in log_data["successful"]:
        if login["ip"] not in THREAT_RULES["allowed_ssh_ips"]:
            threats.append({
                "type": "unknown_ip_login",
                "severity": "critical",
                "detail": f"User {login['user']} logged in from unknown IP {login['ip']}"
            })
        
        if login["user"] in THREAT_RULES["watch_users"]:
            threats.append({
                "type": "root_login",
                "severity": "warning",
                "detail": f"Root login from {login['ip']}"
            })
    
    return threats

Step 3: Add Auto-Ban for Brute Force

import subprocess

def ban_ip(ip):
    subprocess.run(["iptables", "-A", "INPUT", "-s", ip, "-j", "DROP"], check=True)
    return f"Banned {ip}"

def auto_ban_attackers(log_data, threshold=50):
    banned = []
    for ip, count in log_data["failed"].items():
        if count >= threshold and ip not in THREAT_RULES["allowed_ssh_ips"]:
            ban_ip(ip)
            banned.append(f"{ip} ({count} attempts)")
    return banned

Step 4: Log Threats to Database

import sqlite3

def init_security_db(db_path="security.db"):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS threats (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            threat_type TEXT,
            severity TEXT,
            detail TEXT,
            action_taken TEXT,
            detected_at TEXT
        )
    """)
    conn.commit()
    conn.close()

def log_threat(threat, action="notified", db_path="security.db"):
    conn = sqlite3.connect(db_path)
    conn.execute(
        "INSERT INTO threats (threat_type, severity, detail, action_taken, detected_at) VALUES (?,?,?,?,?)",
        (threat["type"], threat["severity"], threat["detail"], action, datetime.utcnow().isoformat())
    )
    conn.commit()
    conn.close()

Step 5: Wire Up Alerts and Schedule

import requests

def run_security_scan():
    init_security_db()
    log_data = parse_auth_log(hours=1)
    threats = detect_threats(log_data)
    banned = auto_ban_attackers(log_data)
    
    for threat in threats:
        action = "auto-banned" if threat["type"] == "brute_force" else "notified"
        log_threat(threat, action)
    
    if threats:
        message = "Security Alert:\n" + "\n".join(
            f"  [{t['severity'].upper()}] {t['detail']}" for t in threats
        )
        if banned:
            message += f"\n\nAuto-banned: {', '.join(banned)}"
        requests.post("YOUR_SLACK_WEBHOOK", json={"text": message})

if __name__ == "__main__":
    run_security_scan()
*/5 * * * * python3 /root/monitoring/security_scan.py

What to Build Next

Add file integrity monitoring. Hash critical config files and alert when anything changes unexpectedly. That catches compromises that slip past login monitoring.

Related Reading

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment

Related Systems