
How to Build an AI Webhook Listener

Create a webhook endpoint that triggers AI processing on incoming events.

Jay Banlasan

The AI Systems Guy

An AI webhook listener is the glue between external systems and your AI processing pipeline. A form submission fires a webhook, your listener catches it, passes the data to Claude, and writes the result somewhere useful. No polling. No manual triggers. I use this pattern constantly: CRM events that trigger AI-written follow-up drafts, support tickets that get auto-triaged before a human sees them, and lead form submissions that generate personalized outreach in seconds.

The pattern is simple but the implementation details matter. You need fast response times, proper validation, async processing for anything over a second, and error handling that does not silently drop events.

What You Need Before Starting

- A recent Python 3 with Flask and the anthropic SDK installed
- An Anthropic API key exported as ANTHROPIC_API_KEY (the SDK reads it by default)
- A WEBHOOK_SECRET environment variable if your sender signs requests
- Somewhere to route output: a Slack incoming webhook URL, a database, or a local file

Step 1: Build the Basic Listener

Start with a Flask endpoint that receives POST requests, validates them, and returns a 200 fast.

from flask import Flask, request, jsonify
import hashlib
import hmac
import os

app = Flask(__name__)

WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "your-secret-here")

def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Verify webhook signature to prevent fake requests."""
    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)

@app.route("/webhook/inbound", methods=["POST"])
def inbound_webhook():
    # Get raw payload for signature verification
    payload = request.get_data()

    # Verify signature if the sender supports it
    signature = request.headers.get("X-Webhook-Signature", "")
    if signature and not verify_signature(payload, signature, WEBHOOK_SECRET):
        return jsonify({"error": "Invalid signature"}), 401

    # Parse the event
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"error": "Invalid JSON"}), 400

    # Return 200 immediately, process async
    # (processing inline will timeout for slow AI calls)
    process_event_async(data)

    return jsonify({"received": True, "status": "queued"}), 200

def process_event_async(data: dict):
    """Process the event without blocking the HTTP response."""
    # Daemon threads die with the process, so in-flight events are lost
    # on restart. For production, use a real task queue (Celery, RQ).
    import threading
    thread = threading.Thread(target=process_event, args=(data,))
    thread.daemon = True
    thread.start()

Return 200 before any AI processing. Webhook senders retry if they do not get a fast response. A 30-second Claude call will cause duplicate processing.
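Even with a fast 200, retries can still slip through (a dropped connection looks like a timeout to the sender), so the same event may arrive twice. If your provider includes a unique event ID on each payload (many do), a dedup check before queuing helps. This in-memory version is a sketch only; it resets on restart, so production would back it with Redis or a database:

```python
import threading

# In-memory set of processed event IDs. Assumes the sender includes a
# unique "id" field on each event; this sketch resets on restart, so
# production would back it with Redis or a database.
_seen_ids = set()
_seen_lock = threading.Lock()

def is_duplicate(event_id: str) -> bool:
    """Return True if this event ID was already seen."""
    if not event_id:
        return False  # nothing to dedupe on; let it through
    with _seen_lock:
        if event_id in _seen_ids:
            return True
        _seen_ids.add(event_id)
        return False
```

In inbound_webhook, call is_duplicate(data.get("id", "")) before process_event_async and return an early 200 when it fires, so the sender stops retrying without triggering a second AI call.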

Step 2: Build the Event Processor

The processor handles the actual AI logic. It reads the incoming data, builds a prompt, calls Claude, and routes the output.

import anthropic
import json
from datetime import datetime

client = anthropic.Anthropic()

def process_event(data: dict):
    event_type = data.get("type", "unknown")
    print(f"[{datetime.utcnow().isoformat()}] Processing event: {event_type}")

    try:
        if event_type == "form_submission":
            handle_form_submission(data)
        elif event_type == "support_ticket":
            handle_support_ticket(data)
        elif event_type == "lead_created":
            handle_new_lead(data)
        else:
            print(f"Unknown event type: {event_type}. Logging and skipping.")
            log_unhandled_event(data)
    except Exception as e:
        print(f"Error processing event {event_type}: {e}")
        log_error(event_type, data, str(e))

def handle_form_submission(data: dict):
    name = data.get("name", "Unknown")
    email = data.get("email", "")
    message = data.get("message", "")
    company = data.get("company", "")

    prompt = f"""A new contact form submission came in. Draft a personalized reply.

Contact details:
- Name: {name}
- Company: {company}
- Message: {message}

Write a reply that:
1. Acknowledges their specific message
2. Mentions a relevant next step
3. Is under 100 words
4. Sounds like it came from a real person, not a template

Reply only with the email body, no subject line."""

    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=300,
        messages=[{"role": "user", "content": prompt}]
    )

    reply_draft = response.content[0].text

    # Save to your CRM, database, or send to Slack
    save_reply_draft(email, reply_draft)
    notify_team(name, email, reply_draft)

Step 3: Build Output Routing

AI output needs to go somewhere useful. Build destination handlers.

import requests

SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")

def save_reply_draft(email: str, draft: str):
    """Save to a local file or database."""
    timestamp = datetime.utcnow().isoformat()
    entry = {
        "timestamp": timestamp,
        "email": email,
        "draft": draft
    }

    with open("reply_drafts.jsonl", "a") as f:
        f.write(json.dumps(entry) + "\n")

    print(f"Saved draft for {email}")

def notify_team(name: str, email: str, draft: str):
    """Post to Slack so a human can review and send."""
    if not SLACK_WEBHOOK_URL:
        return

    message = {
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*New form submission from {name}* ({email})\n\n*AI Draft Reply:*\n{draft}"
                }
            },
            {
                "type": "actions",
                "elements": [
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "Send This"},
                        "style": "primary",
                        "value": email
                    },
                    {
                        "type": "button",
                        "text": {"type": "plain_text", "text": "Edit First"},
                        "value": email
                    }
                ]
            }
        ]
    }

    requests.post(SLACK_WEBHOOK_URL, json=message, timeout=5)

def log_unhandled_event(data: dict):
    with open("unhandled_events.jsonl", "a") as f:
        f.write(json.dumps({"timestamp": datetime.utcnow().isoformat(), "data": data}) + "\n")

def log_error(event_type: str, data: dict, error: str):
    with open("errors.jsonl", "a") as f:
        f.write(json.dumps({
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "error": error,
            "data": data
        }) + "\n")

Step 4: Handle Support Ticket Triage

A common real-world use case: classify incoming support tickets before a human reads them.

def handle_support_ticket(data: dict):
    ticket_id = data.get("id")
    subject = data.get("subject", "")
    body = data.get("body", "")
    customer_email = data.get("from_email", "")

    triage_prompt = f"""Classify this support ticket and extract key info.

Subject: {subject}
Body: {body}

Respond in JSON only:
{{
  "category": "billing|technical|account|feature_request|other",
  "priority": "urgent|high|normal|low",
  "sentiment": "positive|neutral|frustrated|angry",
  "summary": "one sentence",
  "suggested_team": "billing|engineering|success|sales"
}}"""

    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=200,
        messages=[{"role": "user", "content": triage_prompt}]
    )

    try:
        triage = json.loads(response.content[0].text)
        print(f"Ticket {ticket_id} classified: {triage['category']} / {triage['priority']}")

        # Route to the right team queue
        route_ticket(ticket_id, triage)
    except json.JSONDecodeError:
        print(f"Failed to parse triage response for ticket {ticket_id}")

def route_ticket(ticket_id: str, triage: dict):
    # Connect to your helpdesk API here
    print(f"Routing ticket {ticket_id} to {triage['suggested_team']} team")
    # e.g., update Zendesk tag, HubSpot ticket owner, etc.

Step 5: Add a Dead Letter Queue

Events that fail processing should not disappear. Log them for retry.

import json
from pathlib import Path

DEAD_LETTER_FILE = "dead_letter_queue.jsonl"

def add_to_dead_letter(data: dict, error: str, attempt: int = 1):
    entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "attempt": attempt,
        "error": error,
        "data": data
    }
    with open(DEAD_LETTER_FILE, "a") as f:
        f.write(json.dumps(entry) + "\n")

def retry_dead_letters():
    """Run this manually or on a cron to retry failed events."""
    if not Path(DEAD_LETTER_FILE).exists():
        return

    with open(DEAD_LETTER_FILE, "r") as f:
        entries = [json.loads(line) for line in f if line.strip()]

    processed = []
    still_failed = []

    for entry in entries:
        try:
            process_event(entry["data"])
            processed.append(entry)
            print(f"Retry succeeded for event from {entry['timestamp']}")
        except Exception as e:
            entry["attempt"] += 1
            entry["last_error"] = str(e)
            if entry["attempt"] < 3:
                still_failed.append(entry)
            else:
                print(f"Dropping event after 3 attempts: {entry['data']}")

    # Rewrite the file with only still-failing entries
    with open(DEAD_LETTER_FILE, "w") as f:
        for entry in still_failed:
            f.write(json.dumps(entry) + "\n")

    print(f"Retry complete: {len(processed)} succeeded, {len(still_failed)} still pending")

Step 6: Run and Test Locally

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)

Test with curl:

curl -X POST http://localhost:5000/webhook/inbound \
  -H "Content-Type: application/json" \
  -d '{"type": "form_submission", "name": "Sarah Chen", "email": "[email protected]", "company": "TechCorp", "message": "Looking for help automating our support workflow"}'

Expose publicly with ngrok for integration testing: ngrok http 5000. Paste the ngrok URL into whatever service you're connecting.
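To exercise the signature check in your curl tests, compute the expected header with the same HMAC-SHA256 scheme verify_signature uses. The secret and payload below are example values; the secret must match whatever WEBHOOK_SECRET the listener loaded:

```python
import hashlib
import hmac
import json

# Example values: the secret must match the listener's WEBHOOK_SECRET,
# and the payload bytes must be byte-identical to what curl sends.
secret = "your-secret-here"
payload = json.dumps({"type": "form_submission", "name": "Sarah Chen"})

digest = hmac.new(secret.encode(), payload.encode(), hashlib.sha256).hexdigest()
print(f"X-Webhook-Signature: sha256={digest}")
```

Pass the printed value with -H "X-Webhook-Signature: sha256=..." on the curl call, and make sure the -d body matches the payload string exactly, since any whitespace difference changes the digest.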

What to Build Next

- Swap the daemon threads for a real task queue (Celery, RQ, or a cloud queue) so queued events survive restarts
- Add idempotency checks so sender retries do not trigger duplicate AI calls
- Make signature verification mandatory once your sender supports it
- Wire route_ticket and save_reply_draft to your actual helpdesk and CRM APIs
