Infrastructure
monitoring
How to Create a Security Monitoring and Alert System
Monitor for security threats and get instant alerts on suspicious activity.
Jay Banlasan
The AI Systems Guy
A security monitoring alert system for threat detection is something you build before you need it. I set this up on every server I manage after watching a brute force attack hit 10,000 login attempts in an hour on a client's VPS. The attack failed, but nobody knew it happened until I checked logs two days later.
Now I get alerts in real time.
What You Need Before Starting
- A Linux VPS with SSH access
- Python 3.8+
- Access to system logs (/var/log/auth.log, /var/log/syslog)
- Slack webhook for alerts
Step 1: Build the Log Parser
import re
from datetime import datetime, timedelta
def parse_auth_log(log_path="/var/log/auth.log", hours=1):
failed_logins = {}
successful_logins = []
cutoff = datetime.now() - timedelta(hours=hours)
with open(log_path) as f:
for line in f:
if "Failed password" in line:
ip_match = re.search(r"from (\d+\.\d+\.\d+\.\d+)", line)
if ip_match:
ip = ip_match.group(1)
failed_logins[ip] = failed_logins.get(ip, 0) + 1
elif "Accepted" in line:
ip_match = re.search(r"from (\d+\.\d+\.\d+\.\d+)", line)
user_match = re.search(r"for (\w+)", line)
if ip_match and user_match:
successful_logins.append({
"user": user_match.group(1),
"ip": ip_match.group(1),
"line": line.strip()
})
return {"failed": failed_logins, "successful": successful_logins}
Step 2: Define Threat Rules
THREAT_RULES = {
"brute_force_threshold": 20,
"known_bad_ips": [],
"allowed_ssh_ips": ["YOUR_HOME_IP", "YOUR_OFFICE_IP"],
"watch_users": ["root"]
}
def detect_threats(log_data):
threats = []
for ip, count in log_data["failed"].items():
if count >= THREAT_RULES["brute_force_threshold"]:
threats.append({
"type": "brute_force",
"severity": "high",
"detail": f"IP {ip} had {count} failed login attempts"
})
for login in log_data["successful"]:
if login["ip"] not in THREAT_RULES["allowed_ssh_ips"]:
threats.append({
"type": "unknown_ip_login",
"severity": "critical",
"detail": f"User {login['user']} logged in from unknown IP {login['ip']}"
})
if login["user"] in THREAT_RULES["watch_users"]:
threats.append({
"type": "root_login",
"severity": "warning",
"detail": f"Root login from {login['ip']}"
})
return threats
Step 3: Add Auto-Ban for Brute Force
import subprocess
def ban_ip(ip):
subprocess.run(["iptables", "-A", "INPUT", "-s", ip, "-j", "DROP"], check=True)
return f"Banned {ip}"
def auto_ban_attackers(log_data, threshold=50):
banned = []
for ip, count in log_data["failed"].items():
if count >= threshold and ip not in THREAT_RULES["allowed_ssh_ips"]:
ban_ip(ip)
banned.append(f"{ip} ({count} attempts)")
return banned
Step 4: Log Threats to Database
import sqlite3
def init_security_db(db_path="security.db"):
conn = sqlite3.connect(db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS threats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
threat_type TEXT,
severity TEXT,
detail TEXT,
action_taken TEXT,
detected_at TEXT
)
""")
conn.commit()
conn.close()
def log_threat(threat, action="notified", db_path="security.db"):
conn = sqlite3.connect(db_path)
conn.execute(
"INSERT INTO threats (threat_type, severity, detail, action_taken, detected_at) VALUES (?,?,?,?,?)",
(threat["type"], threat["severity"], threat["detail"], action, datetime.utcnow().isoformat())
)
conn.commit()
conn.close()
Step 5: Wire Up Alerts and Schedule
import requests
def run_security_scan():
init_security_db()
log_data = parse_auth_log(hours=1)
threats = detect_threats(log_data)
banned = auto_ban_attackers(log_data)
for threat in threats:
action = "auto-banned" if threat["type"] == "brute_force" else "notified"
log_threat(threat, action)
if threats:
message = "Security Alert:\n" + "\n".join(
f" [{t['severity'].upper()}] {t['detail']}" for t in threats
)
if banned:
message += f"\n\nAuto-banned: {', '.join(banned)}"
requests.post("YOUR_SLACK_WEBHOOK", json={"text": message})
if __name__ == "__main__":
run_security_scan()
*/5 * * * * python3 /root/monitoring/security_scan.py
What to Build Next
Add file integrity monitoring. Hash critical config files and alert when anything changes unexpectedly. That catches compromises that slip past login monitoring.
Related Reading
- Designing for Failure - building resilient systems from the start
- The Monitoring Stack - how security monitoring fits the bigger picture
- How to Build Automated Alerts That Actually Help - alert design patterns
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment