
How to Build a Pipedrive Workflow Automation System

Automate Pipedrive deal management with custom workflow triggers.

Jay Banlasan

The AI Systems Guy

I build Pipedrive workflow automation systems that handle deal management with custom triggers. This is one of the highest-impact automations for a sales team because it removes the manual data entry that slows down revenue.

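What You Need

Python 3 with the requests, anthropic, and python-dotenv packages
A Pipedrive API token, stored in a .env file as PIPEDRIVE_TOKEN
An Anthropic API key, stored as ANTHROPIC_API_KEY (the anthropic client reads it from the environment by default)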

Step 1: Connect to the Pipedrive API

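Install the dependencies from a shell first:

pip install requests anthropic python-dotenv

Then set up the client:

import os

import requests
from dotenv import load_dotenv

load_dotenv()

API_TOKEN = os.getenv("PIPEDRIVE_TOKEN")
# Pipedrive's REST endpoints live under a versioned path; this default targets API v2.
BASE_URL = os.getenv("PIPEDRIVE_API_URL", "https://api.pipedrive.com/api/v2")

# A personal API token goes in the x-api-token header (or the api_token query
# parameter). "Authorization: Bearer" is only for OAuth access tokens.
headers = {
    "x-api-token": API_TOKEN,
    "Content-Type": "application/json"
}

def fetch_records(endpoint, limit=100):
    """Fetch one page of records, e.g. endpoint="deals" or "persons"."""
    response = requests.get(
        f"{BASE_URL}/{endpoint}",
        headers=headers,
        params={"limit": limit},
        timeout=30
    )
    response.raise_for_status()
    # Pipedrive wraps results in "data", which is null when nothing matches.
    return response.json().get("data") or []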
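fetch_records returns a single page, so a pipeline with more records than the limit will miss some. Here is a minimal sketch of following pages until they run out. It assumes the API v2 cursor shape, where each response carries additional_data.next_cursor; the iter_all_records helper and its get_page callable are hypothetical names of mine, not part of any Pipedrive client.

```python
from typing import Callable, Iterator, Optional


def iter_all_records(get_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every record by following cursors until none is returned.

    get_page(cursor) must return one decoded JSON response. Assumption:
    the response looks like {"data": [...], "additional_data": {"next_cursor": ...}}.
    """
    cursor = None
    while True:
        payload = get_page(cursor)
        # "data" can be null on an empty page, so fall back to an empty list.
        for record in payload.get("data") or []:
            yield record
        cursor = (payload.get("additional_data") or {}).get("next_cursor")
        if not cursor:
            break
```

In practice get_page would wrap the same requests.get call as fetch_records and add the cursor to params when it is not None. Keeping the HTTP call outside the loop makes the paging logic easy to test with canned responses.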

Step 2: Build the Processing Logic

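import anthropic
import json

# Reads ANTHROPIC_API_KEY from the environment.
claude = anthropic.Anthropic()

# Name the "changes" key explicitly, because the update step reads it.
SYSTEM_PROMPT = (
    "Analyze this CRM record. Return only JSON with these keys: "
    "action (update/skip/flag), reason, and changes "
    "(an object of recommended field changes, empty if none)."
)

def process_records(records):
    # One model call per record.
    return [analyze_record(record) for record in records]

def analyze_record(record):
    message = claude.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=500,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": json.dumps(record)}]
    )
    try:
        analysis = json.loads(message.content[0].text)
    except json.JSONDecodeError:
        # Skip the record rather than crash the run on a non-JSON reply.
        analysis = {"action": "skip", "reason": "unparseable model output", "changes": {}}
    return {
        "id": record.get("id", ""),
        "analysis": analysis
    }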
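analyze_record parses the model's reply with json.loads, which fails when the reply arrives wrapped in a Markdown code fence or is not JSON at all. Here is a small sketch of a more forgiving parser. The parse_model_json name and the "skip" fallback are my own choices for illustration.

```python
import json
import re

# Built from single backticks so this snippet can sit inside a fenced block.
FENCE = "`" * 3
FENCED_JSON = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)

SKIP = {"action": "skip", "reason": "unparseable model output", "changes": {}}


def parse_model_json(text: str) -> dict:
    """Parse a model reply as a JSON object, tolerating a Markdown fence."""
    cleaned = text.strip()
    fenced = FENCED_JSON.search(cleaned)
    if fenced:
        cleaned = fenced.group(1).strip()
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        return dict(SKIP)
    if not isinstance(parsed, dict):
        return dict(SKIP)
    # Downstream code reads "changes", so make sure the key exists.
    parsed.setdefault("changes", {})
    return parsed
```

Calling parse_model_json(message.content[0].text) in place of the bare json.loads keeps one malformed reply from stopping the whole batch.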

Step 3: Apply Updates

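def update_record(record_id, fields, endpoint="deals"):
    # Pipedrive takes the changed fields as a flat JSON body,
    # not nested under a "properties" key.
    response = requests.patch(
        f"{BASE_URL}/{endpoint}/{record_id}",
        headers=headers,
        json=fields,
        timeout=30
    )
    if response.ok:
        print(f"Updated {record_id}")
    else:
        print(f"Failed to update {record_id}: HTTP {response.status_code}")
    return response.ok

def batch_update(processed_records, endpoint="deals"):
    updated = 0
    for record in processed_records:
        analysis = record["analysis"]
        changes = analysis.get("changes") or {}
        # Only write when the model asked for an update and named fields.
        if analysis.get("action") == "update" and changes:
            # Count a record only if the API accepted the update.
            if update_record(record["id"], changes, endpoint):
                updated += 1
    print(f"Updated {updated} of {len(processed_records)} records")
    return updated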
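batch_update writes whatever fields the model recommends, so it can be worth screening those changes before they reach the CRM. Here is a minimal sketch of an allowlist filter. The filter_changes helper and the specific field names in ALLOWED_FIELDS are illustrative assumptions; swap in the fields you are comfortable automating.

```python
# Hypothetical allowlist: the only fields the automation may modify.
ALLOWED_FIELDS = frozenset({"title", "value", "status", "expected_close_date"})


def filter_changes(changes, allowed=ALLOWED_FIELDS):
    """Split recommended changes into (kept, dropped_field_names)."""
    kept = {field: value for field, value in changes.items() if field in allowed}
    dropped = sorted(set(changes) - set(kept))
    return kept, dropped
```

A caller would pass kept to update_record and log dropped, so that unexpected recommendations are visible instead of silently applied.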

Step 4: Log and Schedule

import sqlite3
from contextlib import closing
from datetime import datetime

def log_run(action, count):
    # closing() releases the connection; the inner "with conn" commits.
    with closing(sqlite3.connect("crm_automation.db")) as conn:
        with conn:
            conn.execute("""CREATE TABLE IF NOT EXISTS runs (
                action TEXT, count INTEGER, ran_at TEXT
            )""")
            conn.execute("INSERT INTO runs VALUES (?, ?, ?)",
                (action, count, datetime.now().isoformat()))

def main():
    # This guide is about deal management, so work on the deals endpoint.
    records = fetch_records("deals")
    processed = process_records(records)
    batch_update(processed, "deals")
    log_run("Pipedrive workflow automation", len(processed))
    print(f"Completed at {datetime.now().isoformat()}")

# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
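The script above runs once per invocation, so the scheduling half of this step still needs something to trigger it. A cron entry or a hosted scheduler is the usual answer. If you would rather keep everything in Python, here is a minimal sketch of a fixed-interval loop. The run_every helper is a hypothetical name, and max_runs and sleep are parameters I added so the loop can be tested without waiting.

```python
import time


def run_every(interval_seconds, job, max_runs=None, sleep=time.sleep):
    """Call job() repeatedly, pausing interval_seconds between runs.

    A failing run is reported and does not stop the loop.
    Returns the number of runs attempted (only reachable when max_runs is set).
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            job()
        except Exception as exc:  # keep the schedule alive after one bad run
            print(f"Run failed: {exc}")
        runs += 1
        # Skip the final pause when the run budget is used up.
        if max_runs is None or runs < max_runs:
            sleep(interval_seconds)
    return runs
```

For example, run_every(900, main) would process records every 15 minutes until the process is stopped.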

What to Build Next

Add a webhook listener so this runs in real time when records change, not just on a schedule. That turns batch processing into instant automation that your sales team feels immediately.
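As a starting point, here is a minimal sketch of such a listener using only the standard library. The payload shape is an assumption: it expects a "meta" object carrying "action" and a record id under "entity_id" or "id", so check the webhook payload your Pipedrive account actually sends before relying on it. The extract_event, WebhookHandler, and serve names are hypothetical.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def extract_event(payload):
    """Pull the action and record id out of a webhook payload.

    Assumption: the payload has a "meta" object with "action" and
    either "entity_id" or "id". Missing values come back as None.
    """
    meta = payload.get("meta") if isinstance(payload, dict) else None
    meta = meta or {}
    return {
        "action": meta.get("action"),
        "record_id": meta.get("entity_id", meta.get("id")),
    }


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            payload = json.loads(self.rfile.read(length) or b"{}")
        except json.JSONDecodeError:
            self.send_response(400)
            self.end_headers()
            return
        event = extract_event(payload)
        print(f"Received {event['action']} for record {event['record_id']}")
        # Hand the record to analyze_record / update_record here.
        self.send_response(200)
        self.end_headers()


def serve(port=8000):
    # Blocks forever; put it behind HTTPS and authentication in production.
    HTTPServer(("", port), WebhookHandler).serve_forever()
```

Responding quickly and doing the analysis afterwards, for example from a queue, keeps the sender from timing out while the model call runs.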
