Systems Library / Operations & Admin / How to Build a Workflow Automation with Conditional Logic
Operations & Admin process workflow

How to Build a Workflow Automation with Conditional Logic

Create workflows that branch and adapt based on data and conditions.

Jay Banlasan

Jay Banlasan

The AI Systems Guy

Most workflow tools only handle straight-line processes. Real work branches. I built a workflow automation with conditional logic that evaluates data at each step, takes different paths based on conditions, and handles exceptions without human intervention.

If this, then that. But for real business processes with multiple branches.

What You Need Before Starting

Step 1: Define the Workflow Schema

WORKFLOW_DEFINITION = {
    "name": "new_client_onboarding",
    "steps": [
        {
            "id": "collect_info",
            "type": "action",
            "action": "send_intake_form",
            "next": "evaluate_size"
        },
        {
            "id": "evaluate_size",
            "type": "condition",
            "field": "annual_revenue",
            "branches": [
                {"condition": "greater_than", "value": 1000000, "next": "enterprise_track"},
                {"condition": "greater_than", "value": 100000, "next": "mid_market_track"},
                {"condition": "default", "next": "small_business_track"}
            ]
        },
        {
            "id": "enterprise_track",
            "type": "action",
            "action": "assign_senior_manager",
            "next": "schedule_kickoff"
        },
        {
            "id": "mid_market_track",
            "type": "action",
            "action": "assign_account_manager",
            "next": "schedule_kickoff"
        },
        {
            "id": "small_business_track",
            "type": "action",
            "action": "send_self_serve_guide",
            "next": "end"
        },
        {
            "id": "schedule_kickoff",
            "type": "action",
            "action": "create_kickoff_meeting",
            "next": "end"
        }
    ]
}

Step 2: Build the Workflow Engine

class WorkflowEngine:
    def __init__(self, definition):
        self.definition = definition
        self.steps = {s["id"]: s for s in definition["steps"]}
        self.actions = {}

    def register_action(self, name, func):
        self.actions[name] = func

    def evaluate_condition(self, step, context):
        field_value = context.get(step["field"])

        for branch in step["branches"]:
            if branch["condition"] == "default":
                return branch["next"]
            elif branch["condition"] == "greater_than" and field_value > branch["value"]:
                return branch["next"]
            elif branch["condition"] == "equals" and field_value == branch["value"]:
                return branch["next"]
            elif branch["condition"] == "contains" and branch["value"] in str(field_value):
                return branch["next"]

        return step["branches"][-1]["next"]

    def execute(self, context):
        current = self.definition["steps"][0]["id"]
        execution_log = []

        while current != "end":
            step = self.steps[current]
            execution_log.append({"step": current, "type": step["type"]})

            if step["type"] == "condition":
                current = self.evaluate_condition(step, context)
            elif step["type"] == "action":
                action_func = self.actions.get(step["action"])
                if action_func:
                    result = action_func(context)
                    context.update(result or {})
                current = step.get("next", "end")

        return execution_log

Step 3: Register Action Handlers

def send_intake_form(context):
    print(f"Sending intake form to {context.get('client_email')}")
    return {"form_sent": True}

def assign_senior_manager(context):
    print(f"Assigning senior manager for enterprise client {context.get('client_name')}")
    return {"assigned_to": "senior_manager"}

def assign_account_manager(context):
    print(f"Assigning account manager for {context.get('client_name')}")
    return {"assigned_to": "account_manager"}

def send_self_serve_guide(context):
    print(f"Sending self-serve guide to {context.get('client_email')}")
    return {"track": "self_serve"}

engine = WorkflowEngine(WORKFLOW_DEFINITION)
engine.register_action("send_intake_form", send_intake_form)
engine.register_action("assign_senior_manager", assign_senior_manager)
engine.register_action("assign_account_manager", assign_account_manager)
engine.register_action("send_self_serve_guide", send_self_serve_guide)

Step 4: Execute and Track

from datetime import datetime
import json

def run_workflow(engine, context):
    context["started_at"] = datetime.now().isoformat()
    log = engine.execute(context)
    context["completed_at"] = datetime.now().isoformat()
    context["execution_log"] = log

    print(f"Workflow complete. Steps executed: {len(log)}")
    for entry in log:
        print(f"  {entry['step']} ({entry['type']})")

    return context

# Example
result = run_workflow(engine, {
    "client_name": "Acme Corp",
    "client_email": "[email protected]",
    "annual_revenue": 500000
})

Step 5: Add Error Handling and Retries

class ResilientWorkflowEngine(WorkflowEngine):
    def execute_step(self, step, context, max_retries=3):
        for attempt in range(max_retries):
            try:
                action_func = self.actions.get(step["action"])
                if action_func:
                    return action_func(context)
                return {}
            except Exception as e:
                if attempt == max_retries - 1:
                    return {"error": str(e), "step": step["id"], "failed": True}
                print(f"Retry {attempt + 1} for {step['id']}: {e}")

What to Build Next

Add parallel branches where multiple paths execute simultaneously and converge at a join point. Real workflows often have steps that can happen at the same time. Parallel execution cuts total workflow time.

Related Reading

Want this system built for your business?

Get a free assessment. We will map every system your business needs and show you the ROI.

Get Your Free Assessment

Related Systems