Systems Library / Industry Applications / How to Build a Real Estate CRM Automation System
Industry Applications real estate

How to Build a Real Estate CRM Automation System

Automate CRM workflows specific to real estate sales cycles.

Jay Banlasan

Jay Banlasan

The AI Systems Guy

A real estate CRM automation system handles the repetitive pipeline management that eats an agent's day. I built this to move leads through stages automatically, trigger the right follow-ups at the right time, and flag deals that need attention. The agent focuses on conversations and showings while the system manages everything else.

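What You Need Before Starting

Python 3 is enough for the database code: it uses only the standard-library sqlite3 and datetime modules. You also need an email-sending helper, because Step 3 calls send_templated_email(contact, template_name), which this guide does not define.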

Step 1: Define the Sales Pipeline

import sqlite3
from datetime import datetime

# Stage identifiers stored in contacts.stage. "new_lead" is also the column's
# default in the schema created by init_crm_db(). generate_pipeline_report()
# iterates this list, so the report prints stages in the order given here.
PIPELINE_STAGES = [
    "new_lead",
    "contacted",
    "qualified",
    "showing_scheduled",
    "showing_complete",
    "offer_submitted",
    "under_contract",
    "closed"
]

def init_crm_db(db_path="real_estate_crm.db"):
    """Create the CRM tables in the SQLite database at ``db_path``.

    Creates the ``contacts``, ``activities`` and ``tasks`` tables with
    ``CREATE TABLE IF NOT EXISTS``, so calling this more than once is safe and
    leaves existing tables untouched.

    Args:
        db_path: Path of the SQLite database file to open.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS contacts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                email TEXT,
                phone TEXT,
                stage TEXT DEFAULT 'new_lead',
                lead_source TEXT,
                budget_min INTEGER,
                budget_max INTEGER,
                property_type TEXT,
                preferred_areas TEXT,
                assigned_agent TEXT,
                notes TEXT,
                created_at TEXT,
                updated_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS activities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                contact_id INTEGER,
                activity_type TEXT,
                description TEXT,
                created_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                contact_id INTEGER,
                task TEXT,
                due_date TEXT,
                completed INTEGER DEFAULT 0,
                created_at TEXT
            )
        """)
        conn.commit()
    finally:
        # Close the connection even when a statement fails, so the database
        # file handle is not leaked.
        conn.close()

Step 2: Build Stage Transition Rules

# Per-stage automation rules, keyed by stage identifier.
#   auto_actions: action names read by run_stage_actions() when a contact
#                 enters the stage. Only "send_welcome_email",
#                 "create_call_task", "send_property_matches" and
#                 "create_showing_task" have a handler there; the other names
#                 listed below are currently skipped without any effect.
#   max_days:     age threshold in days used by check_stale_deals().
#   stale_action: name copied into the "action" field of each stale deal
#                 returned by check_stale_deals().
# Stages with no entry here ("offer_submitted", "under_contract", "closed")
# get no automatic actions and are never examined by check_stale_deals().
STAGE_RULES = {
    "new_lead": {
        "auto_actions": ["send_welcome_email", "create_call_task"],
        "max_days": 2,
        "stale_action": "send_reminder_to_agent"
    },
    "contacted": {
        "auto_actions": ["schedule_qualification_call"],
        "max_days": 5,
        "stale_action": "send_reengagement_email"
    },
    "qualified": {
        "auto_actions": ["send_property_matches", "create_showing_task"],
        "max_days": 7,
        "stale_action": "alert_agent_stale_qualified"
    },
    "showing_scheduled": {
        "auto_actions": ["send_showing_confirmation", "send_showing_reminder"],
        "max_days": 3,
        "stale_action": "check_showing_status"
    },
    "showing_complete": {
        "auto_actions": ["send_feedback_request", "create_followup_task"],
        "max_days": 2,
        "stale_action": "prompt_agent_feedback"
    }
}

def advance_stage(contact_id, new_stage, db_path="real_estate_crm.db"):
    """Move a contact to ``new_stage``, log the change, then run stage actions.

    Updates the contact's ``stage`` and ``updated_at``, inserts a
    ``stage_change`` row into ``activities``, commits, and finally calls
    ``run_stage_actions`` for the new stage.

    Args:
        contact_id: ``contacts.id`` of the contact to move.
        new_stage: Stage identifier to store. It is not validated here.
        db_path: Path of the SQLite database file to use.
    """
    # Read the clock once so the contact row and its activity entry carry the
    # same timestamp.
    now = datetime.utcnow().isoformat()

    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "UPDATE contacts SET stage = ?, updated_at = ? WHERE id = ?",
            (new_stage, now, contact_id)
        )
        conn.execute(
            "INSERT INTO activities (contact_id, activity_type, description, created_at) VALUES (?,?,?,?)",
            (contact_id, "stage_change", f"Moved to {new_stage}", now)
        )
        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()

    # Forward db_path: without it the actions would run against the default
    # database file rather than the one that was just updated.
    run_stage_actions(contact_id, new_stage, db_path=db_path)

Step 3: Execute Automated Actions

def run_stage_actions(contact_id, stage, db_path="real_estate_crm.db"):
    """Run the automatic actions configured for ``stage`` on one contact.

    Looks up ``STAGE_RULES[stage]["auto_actions"]``, loads the contact, and
    dispatches each action name. Returns without doing anything when the
    contact does not exist. Action names without a branch below are skipped
    silently.

    Args:
        contact_id: ``contacts.id`` of the contact to act on.
        stage: Stage identifier whose ``auto_actions`` should run.
        db_path: Path of the SQLite database file to use.
    """
    rules = STAGE_RULES.get(stage, {})
    actions = rules.get("auto_actions", [])

    conn = sqlite3.connect(db_path)
    try:
        # Name the columns explicitly so the mapping below does not depend on
        # the physical column order of the table.
        contact = conn.execute(
            "SELECT id, name, email, phone FROM contacts WHERE id = ?",
            (contact_id,)
        ).fetchone()
    finally:
        conn.close()

    if not contact:
        return

    contact_dict = dict(zip(("id", "name", "email", "phone"), contact))

    for action in actions:
        if action == "send_welcome_email":
            send_templated_email(contact_dict, "welcome")
        elif action == "create_call_task":
            # Pass db_path through so the task is stored in the same database
            # the contact was read from.
            create_task(
                contact_id,
                f"Call {contact_dict['name']} to qualify",
                hours_from_now=4,
                db_path=db_path,
            )
        elif action == "send_property_matches":
            send_templated_email(contact_dict, "property_matches")
        elif action == "create_showing_task":
            create_task(
                contact_id,
                f"Schedule showing for {contact_dict['name']}",
                hours_from_now=24,
                db_path=db_path,
            )

def create_task(contact_id, task_text, hours_from_now=24, db_path="real_estate_crm.db"):
    """Insert a follow-up task for a contact into the ``tasks`` table.

    Args:
        contact_id: ``contacts.id`` the task belongs to.
        task_text: Text stored in the ``task`` column.
        hours_from_now: Offset in hours from the creation time to the due time.
        db_path: Path of the SQLite database file to use.
    """
    from datetime import timedelta

    # Read the clock once so due_date is exactly hours_from_now after
    # created_at.
    created = datetime.utcnow()
    due = (created + timedelta(hours=hours_from_now)).isoformat()

    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "INSERT INTO tasks (contact_id, task, due_date, created_at) VALUES (?,?,?,?)",
            (contact_id, task_text, due, created.isoformat())
        )
        conn.commit()
    finally:
        # Close the connection even when the insert fails.
        conn.close()

Step 4: Build the Stale Deal Detector

def check_stale_deals(db_path="real_estate_crm.db"):
    """Return contacts that have stayed in a stage longer than its ``max_days``.

    Only stages present in ``STAGE_RULES`` are examined. A stage without a
    ``max_days`` entry uses 7; a stage without ``stale_action`` uses
    ``"notify"``.

    Args:
        db_path: Path of the SQLite database file to use.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``stage``,
        ``last_updated`` and ``action``.
    """
    stale_deals = []

    conn = sqlite3.connect(db_path)
    try:
        for stage, rules in STAGE_RULES.items():
            max_days = rules.get("max_days", 7)
            action = rules.get("stale_action", "notify")
            # updated_at is written as ISO text with a "T" separator, while
            # datetime('now', ...) yields "YYYY-MM-DD HH:MM:SS". Compared as
            # raw strings, "T" sorts after " ", so a row on the cutoff date
            # would never count as older. datetime(updated_at) normalises the
            # stored value to the same format first.
            rows = conn.execute("""
                SELECT id, name, stage, updated_at FROM contacts
                WHERE stage = ? AND datetime(updated_at) < datetime('now', ?)
            """, (stage, f"-{max_days} days")).fetchall()

            for row in rows:
                stale_deals.append({
                    "id": row[0], "name": row[1], "stage": row[2],
                    "last_updated": row[3], "action": action
                })
    finally:
        # Close the connection even when a query fails.
        conn.close()

    return stale_deals

Step 5: Daily Pipeline Report

def generate_pipeline_report(db_path="real_estate_crm.db"):
    """Build a plain-text summary of the pipeline and of overdue tasks.

    Counts contacts for every stage in ``PIPELINE_STAGES`` and collects
    uncompleted tasks whose due time has passed. The overdue heading shows the
    total count, but at most five tasks are listed, oldest due time first.

    Args:
        db_path: Path of the SQLite database file to use.

    Returns:
        The report as one newline-joined string.
    """
    report = {}

    conn = sqlite3.connect(db_path)
    try:
        for stage in PIPELINE_STAGES:
            count = conn.execute(
                "SELECT COUNT(*) FROM contacts WHERE stage = ?", (stage,)
            ).fetchone()[0]
            report[stage] = count

        # due_date is stored as ISO text with a "T" separator; datetime('now')
        # uses a space. Normalise with datetime(t.due_date) so tasks that fell
        # due earlier today compare correctly instead of by raw string order.
        overdue_tasks = conn.execute("""
            SELECT t.task, c.name FROM tasks t
            JOIN contacts c ON t.contact_id = c.id
            WHERE t.completed = 0 AND datetime(t.due_date) < datetime('now')
            ORDER BY datetime(t.due_date)
        """).fetchall()
    finally:
        # Close the connection even when a query fails.
        conn.close()

    lines = ["Pipeline Summary:"]
    for stage, count in report.items():
        lines.append(f"  {stage.replace('_', ' ').title()}: {count}")

    if overdue_tasks:
        lines.append(f"\nOverdue Tasks ({len(overdue_tasks)}):")
        for task, name in overdue_tasks[:5]:
            lines.append(f"  {name}: {task}")

    return "\n".join(lines)

What to Build Next

Add property matching. When a new listing hits the market that fits a qualified lead's criteria, automatically send them the listing and alert the agent.

Related Reading

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment

Related Systems