How to Build a Workflow Automation with Conditional Logic
Create workflows that branch and adapt based on data and conditions.
Jay Banlasan
The AI Systems Guy
Most workflow tools only handle straight-line processes. Real work branches. I built a workflow automation with conditional logic that evaluates data at each step, takes different paths based on conditions, and handles exceptions without human intervention.
If this, then that. But for real business processes with multiple branches.
What You Need Before Starting
- Python 3.8+
- A workflow definition format (defined below)
- SQLite for execution tracking
- Notification system for human-in-the-loop steps
Step 1: Define the Workflow Schema
WORKFLOW_DEFINITION = {
"name": "new_client_onboarding",
"steps": [
{
"id": "collect_info",
"type": "action",
"action": "send_intake_form",
"next": "evaluate_size"
},
{
"id": "evaluate_size",
"type": "condition",
"field": "annual_revenue",
"branches": [
{"condition": "greater_than", "value": 1000000, "next": "enterprise_track"},
{"condition": "greater_than", "value": 100000, "next": "mid_market_track"},
{"condition": "default", "next": "small_business_track"}
]
},
{
"id": "enterprise_track",
"type": "action",
"action": "assign_senior_manager",
"next": "schedule_kickoff"
},
{
"id": "mid_market_track",
"type": "action",
"action": "assign_account_manager",
"next": "schedule_kickoff"
},
{
"id": "small_business_track",
"type": "action",
"action": "send_self_serve_guide",
"next": "end"
},
{
"id": "schedule_kickoff",
"type": "action",
"action": "create_kickoff_meeting",
"next": "end"
}
]
}
Step 2: Build the Workflow Engine
class WorkflowEngine:
def __init__(self, definition):
self.definition = definition
self.steps = {s["id"]: s for s in definition["steps"]}
self.actions = {}
def register_action(self, name, func):
self.actions[name] = func
def evaluate_condition(self, step, context):
field_value = context.get(step["field"])
for branch in step["branches"]:
if branch["condition"] == "default":
return branch["next"]
elif branch["condition"] == "greater_than" and field_value > branch["value"]:
return branch["next"]
elif branch["condition"] == "equals" and field_value == branch["value"]:
return branch["next"]
elif branch["condition"] == "contains" and branch["value"] in str(field_value):
return branch["next"]
return step["branches"][-1]["next"]
def execute(self, context):
current = self.definition["steps"][0]["id"]
execution_log = []
while current != "end":
step = self.steps[current]
execution_log.append({"step": current, "type": step["type"]})
if step["type"] == "condition":
current = self.evaluate_condition(step, context)
elif step["type"] == "action":
action_func = self.actions.get(step["action"])
if action_func:
result = action_func(context)
context.update(result or {})
current = step.get("next", "end")
return execution_log
Step 3: Register Action Handlers
def send_intake_form(context):
print(f"Sending intake form to {context.get('client_email')}")
return {"form_sent": True}
def assign_senior_manager(context):
print(f"Assigning senior manager for enterprise client {context.get('client_name')}")
return {"assigned_to": "senior_manager"}
def assign_account_manager(context):
print(f"Assigning account manager for {context.get('client_name')}")
return {"assigned_to": "account_manager"}
def send_self_serve_guide(context):
print(f"Sending self-serve guide to {context.get('client_email')}")
return {"track": "self_serve"}
engine = WorkflowEngine(WORKFLOW_DEFINITION)
engine.register_action("send_intake_form", send_intake_form)
engine.register_action("assign_senior_manager", assign_senior_manager)
engine.register_action("assign_account_manager", assign_account_manager)
engine.register_action("send_self_serve_guide", send_self_serve_guide)
Step 4: Execute and Track
from datetime import datetime
import json
def run_workflow(engine, context):
context["started_at"] = datetime.now().isoformat()
log = engine.execute(context)
context["completed_at"] = datetime.now().isoformat()
context["execution_log"] = log
print(f"Workflow complete. Steps executed: {len(log)}")
for entry in log:
print(f" {entry['step']} ({entry['type']})")
return context
# Example
result = run_workflow(engine, {
"client_name": "Acme Corp",
"client_email": "[email protected]",
"annual_revenue": 500000
})
Step 5: Add Error Handling and Retries
class ResilientWorkflowEngine(WorkflowEngine):
def execute_step(self, step, context, max_retries=3):
for attempt in range(max_retries):
try:
action_func = self.actions.get(step["action"])
if action_func:
return action_func(context)
return {}
except Exception as e:
if attempt == max_retries - 1:
return {"error": str(e), "step": step["id"], "failed": True}
print(f"Retry {attempt + 1} for {step['id']}: {e}")
What to Build Next
Add parallel branches where multiple paths execute simultaneously and converge at a join point. Real workflows often have steps that can happen at the same time. Parallel execution cuts total workflow time.
Related Reading
- Input, Process, Output: The Universal AI Framework - the pattern behind workflow design
- Building Your First Automation: A Complete Guide - automation fundamentals
- The Automation Decision Tree - deciding what workflows to automate
Want this system built for your business?
Get a free assessment. We will map every system your business needs and show you the ROI.
Get Your Free Assessment