
    Agentic Workflow Patterns: Designing Processes for AI Agent Execution

    Design patterns for building workflows that AI agents can execute reliably. Covers human-in-the-loop patterns, autonomous execution, hybrid approaches, and best practices for agentic process design.

    Julia Maehler · 3 min read

    Workflows designed for human execution don't automatically work for **AI agents**. Agents need explicit decision points, clear success criteria, and well-defined handoff mechanisms. This guide covers design patterns for creating **agentic workflows** that agents can execute reliably while maintaining appropriate human oversight.

    The Workflow Design Challenge

    Human vs. Agent Execution

    Human workflows assume people can handle ambiguity, implicitly know when to escalate, make judgment calls based on experience, adapt based on context, and communicate naturally with stakeholders.

    Agent workflows require the opposite — explicit decision criteria, defined escalation triggers, structured judgment frameworks, predictable adaptation rules, and formalized communication protocols.

    The Automation Spectrum

    Full Human ←————————————————————→ Full Automation
    
    Manual    Human-    Agent-    Autonomous   Fully
    Process   Assisted  Assisted  with         Autonomous
              Agent     Human     Oversight
    

    Most workflows should land somewhere in the middle, not at either extreme.

    Pattern 1: Human-in-the-Loop (HITL)

    When to Use

    • High-stakes decisions (financial, legal, safety)
    • Ambiguous situations requiring judgment
    • New processes not yet validated
    • Sensitive customer interactions
    • Compliance-required approvals

    Implementation

    class HITLWorkflow:
        def __init__(self, approval_required: list[str]):
            self.approval_required = approval_required
            self.pending_approvals = {}
    
        async def execute_step(self, step: dict, context: dict):
            if step["type"] in self.approval_required:
                # Agent prepares, human approves
                proposal = await self.agent.prepare(step, context)
                approval = await self.request_human_approval(proposal)
    
                if approval["approved"]:
                    return await self.agent.execute(step, context)
                else:
                    return await self.handle_rejection(approval, step)
            else:
                # Agent executes autonomously
                return await self.agent.execute(step, context)
    
        async def request_human_approval(self, proposal: dict):
            # Create approval request
            request = {
                "id": generate_id(),
                "proposal": proposal,
                "agent_confidence": proposal.get("confidence"),
                "risk_assessment": self.assess_risk(proposal),
                "deadline": datetime.now() + timedelta(hours=24)
            }
    
            # Send to approval queue
            await self.approval_queue.add(request)
    
            # Wait for human decision
            return await self.wait_for_approval(request["id"])
    

    HITL Patterns

    Pre-Execution Approval:

    Agent prepares → Human reviews → Human approves → Agent executes
    

    Post-Execution Review:

    Agent executes → Human reviews → Human confirms/reverts
    

    Parallel Review:

    Agent executes (provisionally) → Human reviews in parallel → Finalize or rollback
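The post-execution review variant can be sketched as follows. The review queue, the `revert` method, and the four-hour review window are illustrative assumptions, not part of any specific framework:

```python
import asyncio
from datetime import datetime, timedelta

class PostExecutionReview:
    """Agent acts first; a human confirms or reverts within a review window."""

    def __init__(self, review_queue, revert_window: timedelta = timedelta(hours=4)):
        self.review_queue = review_queue
        self.revert_window = revert_window

    async def execute(self, agent, step: dict, context: dict) -> dict:
        # Execute immediately; human review happens after the fact
        result = await agent.execute(step, context)

        # Queue the completed action for review; execution is not blocked
        await self.review_queue.add({
            "step": step,
            "result": result,
            "revert_deadline": datetime.now() + self.revert_window,
        })
        return result

    async def on_review(self, review: dict, agent) -> dict:
        # Called when the human decision arrives
        if review["confirmed"]:
            return review["result"]
        # Human rejected: undo the provisional action
        return await agent.revert(review["step"], review["result"])
```

The trade-off versus pre-execution approval is latency for risk: actions complete immediately, so this only fits operations that are cheaply reversible.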
    

    Approval Interface Design

    @dataclass
    class ApprovalRequest:
        id: str
        summary: str              # Brief description for quick scan
        details: dict             # Full context for detailed review
        agent_recommendation: str # What the agent suggests
        confidence: float         # Agent's confidence (0-1)
        risk_level: str          # low, medium, high, critical
        impact_description: str   # What happens if approved
        alternatives: list        # Other options considered
        deadline: datetime        # When decision is needed
        context_links: list       # Links to relevant information
    

    UI Considerations:

    • Show summary first, details on expand
    • Highlight unusual or high-risk elements
    • Provide one-click approve/reject
    • Enable batch approvals for similar requests
    • Support delegation and escalation
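The batch-approval idea amounts to grouping pending requests by a similarity key. A minimal sketch, assuming risk level plus summary is a good-enough grouping key (a simplification; `PendingRequest` is a hypothetical trimmed-down version of the dataclass above):

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class PendingRequest:  # hypothetical, trimmed-down approval request
    id: str
    summary: str
    risk_level: str

def batch_groups(requests: list[PendingRequest]) -> dict[tuple, list[PendingRequest]]:
    """Group pending approvals so a reviewer can act on similar,
    same-risk requests with a single click."""
    groups = defaultdict(list)
    for req in requests:
        groups[(req.risk_level, req.summary)].append(req)
    return dict(groups)
```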

    Pattern 2: Autonomous with Guardrails

    When to Use

    • Well-defined, repeatable processes
    • Low-risk operations
    • High-volume tasks
    • Time-sensitive execution
    • Cost-sensitive workflows

    Implementation

    class GuardedAutonomousWorkflow:
        def __init__(self):
            self.guardrails = [
                BudgetGuardrail(max_spend=1000),
                RateGuardrail(max_per_hour=100),
                ContentGuardrail(blocked_categories=["adult", "violence"]),
                ConfidenceGuardrail(min_confidence=0.85)
            ]
    
        async def execute(self, task: dict):
            # Pre-execution guardrail check
            for guardrail in self.guardrails:
                check = await guardrail.pre_check(task)
                if not check["pass"]:
                    return await self.handle_guardrail_block(guardrail, check)
    
            # Execute task
            result = await self.agent.execute(task)
    
            # Post-execution guardrail check
            for guardrail in self.guardrails:
                check = await guardrail.post_check(result)
                if not check["pass"]:
                    return await self.handle_guardrail_violation(guardrail, result)
    
            return result
    
    
    class BudgetGuardrail:
        def __init__(self, max_spend: float):
            self.max_spend = max_spend
            self.current_spend = 0
    
        async def pre_check(self, task: dict) -> dict:
            estimated_cost = self.estimate_cost(task)
            if self.current_spend + estimated_cost > self.max_spend:
                return {
                    "pass": False,
                    "reason": "Budget exceeded",
                    "details": {
                        "estimated": estimated_cost,
                        "remaining": self.max_spend - self.current_spend
                    }
                }
            return {"pass": True}
    
        async def post_check(self, result: dict) -> dict:
            actual_cost = result.get("cost", 0)
            self.current_spend += actual_cost
            return {"pass": True}
    

    Guardrail Categories

    Resource Guardrails:

    • Budget limits (spending caps)
    • Rate limits (operations per time period)
    • Volume limits (items per batch)
    • Time limits (max execution duration)

    Content Guardrails:

    • Toxicity filtering
    • PII detection
    • Brand safety
    • Compliance checking

    Behavioral Guardrails:

    • Confidence thresholds
    • Anomaly detection
    • Pattern matching for risky operations
    • Change magnitude limits

    Scope Guardrails:

    • Allowed actions whitelist
    • Resource access restrictions
    • External system limitations
    • Geographic restrictions

    Pattern 3: Escalation Chains

    When to Use

    • Tiered support systems
    • Progressive complexity handling
    • Expertise routing
    • Failure recovery

    Implementation

    class EscalationWorkflow:
        def __init__(self):
            self.tiers = [
                Tier(name="Auto", handler=self.auto_agent, escalation_criteria={
                    "confidence_below": 0.7,
                    "keywords": ["legal", "urgent", "executive"],
                    "error_types": ["validation_failed", "external_error"]
                }),
                Tier(name="L1 Agent", handler=self.l1_agent, escalation_criteria={
                    "confidence_below": 0.5,
                    "time_exceeded": timedelta(minutes=30),
                    "customer_requested": True
                }),
                Tier(name="L2 Human", handler=self.l2_queue, escalation_criteria={
                    "complexity_score_above": 0.8,
                    "financial_impact_above": 10000
                }),
                Tier(name="Specialist", handler=self.specialist_queue, escalation_criteria={
                    # Final tier - no automatic escalation
                })
            ]
    
        async def handle(self, request: dict):
            context = {"escalation_history": []}
    
            for tier in self.tiers:
                result = await self.try_tier(tier, request, context)
    
                if result["resolved"]:
                    return result
    
                if result["escalate"]:
                    context["escalation_history"].append({
                        "tier": tier.name,
                        "reason": result["escalation_reason"],
                        "partial_work": result.get("partial_work")
                    })
                    continue
    
                # Tier couldn't resolve and didn't escalate
                return result
    
            # Reached final tier
            return await self.final_tier_handling(request, context)
    
        async def try_tier(self, tier: Tier, request: dict, context: dict):
            try:
                result = await tier.handler.handle(request, context)
    
                # Check escalation criteria
                if self.should_escalate(result, tier.escalation_criteria):
                    return {
                        "resolved": False,
                        "escalate": True,
                        "escalation_reason": self.get_escalation_reason(result, tier),
                        "partial_work": result.get("partial_work")
                    }
    
                return {"resolved": True, "result": result}
    
            except Exception as e:
                return {
                    "resolved": False,
                    "escalate": True,
                    "escalation_reason": f"Error in {tier.name}: {str(e)}"
                }
    

    Escalation Design Principles

    Clear Triggers:

    ESCALATION_TRIGGERS = {
        "confidence_threshold": 0.7,    # Below this, escalate
        "error_count": 3,               # After this many errors
        "time_limit": timedelta(minutes=30),  # Exceeded time
        "customer_sentiment": "negative",     # Detected frustration
        "keywords": ["manager", "supervisor", "complaint", "legal"]
    }
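A `should_escalate` check over these triggers might look like the following sketch (the trigger table is repeated for self-containment). The result-dict keys — `confidence`, `errors`, `elapsed`, `sentiment`, `text` — are assumptions about what a tier handler reports:

```python
from datetime import timedelta

ESCALATION_TRIGGERS = {
    "confidence_threshold": 0.7,
    "error_count": 3,
    "time_limit": timedelta(minutes=30),
    "customer_sentiment": "negative",
    "keywords": ["manager", "supervisor", "complaint", "legal"],
}

def should_escalate(result: dict, triggers: dict = ESCALATION_TRIGGERS) -> bool:
    """Return True if any configured trigger fires for this tier's result."""
    if result.get("confidence", 1.0) < triggers["confidence_threshold"]:
        return True
    if result.get("errors", 0) >= triggers["error_count"]:
        return True
    if result.get("elapsed", timedelta(0)) > triggers["time_limit"]:
        return True
    if result.get("sentiment") == triggers["customer_sentiment"]:
        return True
    # Keyword scan over the latest customer message, case-insensitive
    text = result.get("text", "").lower()
    return any(kw in text for kw in triggers["keywords"])
```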
    

    Context Preservation:

    class EscalationContext:
        def __init__(self):
            self.original_request = None
            self.tier_attempts = []
            self.partial_results = {}
            self.customer_messages = []
    
        def add_tier_attempt(self, tier: str, result: dict):
            self.tier_attempts.append({
                "tier": tier,
                "timestamp": datetime.now(),
                "result_summary": self.summarize(result),
                "work_completed": result.get("partial_work"),
                "reason_for_escalation": result.get("escalation_reason")
            })
    
        def handoff_summary(self) -> str:
            """Generate summary for next tier"""
            return f"""
            Original Request: {self.original_request}
    
            Previous Attempts:
            {self._format_attempts()}
    
            Work Completed:
            {self._format_partial_results()}
    
            Customer Sentiment: {self._assess_sentiment()}
            """
    

    Pattern 4: Parallel Processing with Merge

    When to Use

    • Independent subtasks
    • Time-sensitive workflows
    • Resource-intensive operations
    • Diverse expertise requirements

    Implementation

    class ParallelWorkflow:
        def __init__(self):
            self.workers = AgentPool()
    
        async def execute(self, task: dict):
            # Decompose into parallel tracks
            tracks = await self.decompose(task)
    
            # Execute tracks in parallel
            track_results = await asyncio.gather(*[
                self.execute_track(track)
                for track in tracks
            ], return_exceptions=True)
    
            # Handle failures
            successful = []
            failed = []
            for track, result in zip(tracks, track_results):
                if isinstance(result, Exception):
                    failed.append((track, result))
                else:
                    successful.append((track, result))
    
            # Merge successful results
            if successful:
                merged = await self.merge_results(successful)
    
                # Handle partial failure
                if failed:
                    merged["partial_failure"] = self.format_failures(failed)
    
                return merged
            else:
                raise AllTracksFailedError(failed)
    
        async def merge_results(self, track_results: list):
            """Merge results from parallel tracks"""
            # Use LLM to intelligently merge
            response = await self.llm.chat(
                messages=[{
                    "role": "system",
                    "content": """Merge these parallel workflow results into
                    a coherent final result. Resolve any conflicts by preferring
                    higher-confidence results."""
                }, {
                    "role": "user",
                    "content": json.dumps([
                        {"track": t["name"], "result": r}
                        for t, r in track_results
                    ])
                }]
            )
            return json.loads(response.content)
    

    Fork-Join Pattern

                        ┌─→ Track A ─→┐
            Fork ───────┼─→ Track B ─→┼─────→ Join
                        └─→ Track C ─→┘
    
    class ForkJoinWorkflow:
        async def execute(self, task: dict):
            # Fork
            subtasks = await self.fork(task)
    
            # Execute (with optional dependencies)
            results = {}
            pending = set(subtasks.keys())
    
            while pending:
                ready = [
                    task_id for task_id in pending
                    if self.dependencies_met(task_id, results)
                ]
    
                if not ready:
                    raise DeadlockError("Circular dependencies")
    
                batch_results = await asyncio.gather(*[
                    self.execute_subtask(subtasks[task_id])
                    for task_id in ready
                ])
    
                for task_id, result in zip(ready, batch_results):
                    results[task_id] = result
                    pending.remove(task_id)
    
            # Join
            return await self.join(results)
    

    Pattern 5: State Machine Workflows

    When to Use

    • Complex multi-step processes
    • Workflows with multiple possible paths
    • Processes requiring pause/resume
    • Auditable, traceable execution

    Implementation

    from enum import Enum
    from transitions import Machine
    
    class OrderState(Enum):
        CREATED = "created"
        VALIDATED = "validated"
        PAYMENT_PENDING = "payment_pending"
        PAYMENT_FAILED = "payment_failed"
        PAID = "paid"
        PROCESSING = "processing"
        SHIPPED = "shipped"
        DELIVERED = "delivered"
        CANCELLED = "cancelled"
        REFUNDED = "refunded"
    
    class OrderWorkflow:
        states = [s.value for s in OrderState]
    
        transitions = [
            {"trigger": "validate", "source": "created", "dest": "validated",
             "conditions": ["is_valid_order"]},
            {"trigger": "request_payment", "source": "validated", "dest": "payment_pending"},
            {"trigger": "payment_success", "source": "payment_pending", "dest": "paid"},
            {"trigger": "payment_failure", "source": "payment_pending", "dest": "payment_failed"},
            {"trigger": "retry_payment", "source": "payment_failed", "dest": "payment_pending",
             "conditions": ["retry_allowed"]},
            {"trigger": "start_processing", "source": "paid", "dest": "processing"},
            {"trigger": "ship", "source": "processing", "dest": "shipped"},
            {"trigger": "deliver", "source": "shipped", "dest": "delivered"},
            {"trigger": "cancel", "source": ["created", "validated", "payment_pending"],
             "dest": "cancelled"},
            {"trigger": "refund", "source": ["paid", "processing"], "dest": "refunded"},
        ]
    
        def __init__(self, order_id: str):
            self.order_id = order_id
            self.machine = Machine(
                model=self,
                states=self.states,
                transitions=self.transitions,
                initial="created",
                # pytransitions only auto-invokes per-state hooks named
                # on_enter_<state>; a global hook must be registered explicitly
                after_state_change="record_transition"
            )
            self.history = []

        def record_transition(self):
            """Called on every state transition"""
            self.history.append({
                "state": self.state,
                "timestamp": datetime.now(),
                "agent": current_agent_id()
            })
    
        def is_valid_order(self) -> bool:
            """Condition for validate transition"""
            return self.agent.validate_order(self.order_id)
    
        def retry_allowed(self) -> bool:
            """Condition for retry_payment transition"""
            failures = len([h for h in self.history
                           if h["state"] == "payment_failed"])
            return failures < 3
    

    Agent Integration with State Machines

    class StateMachineAgent:
        async def advance_workflow(self, workflow: OrderWorkflow):
            """Attempt to advance the workflow based on current state"""
    
            state_handlers = {
                "created": self.handle_created,
                "validated": self.handle_validated,
                "payment_pending": self.handle_payment_pending,
                "payment_failed": self.handle_payment_failed,
                "paid": self.handle_paid,
                "processing": self.handle_processing,
                "shipped": self.handle_shipped
            }
    
            handler = state_handlers.get(workflow.state)
            if handler:
                await handler(workflow)
    
        async def handle_created(self, workflow: OrderWorkflow):
            # Validate the order
            is_valid = await self.validate_order(workflow.order_id)
            if is_valid:
                workflow.validate()
            else:
                workflow.cancel()
    
        async def handle_validated(self, workflow: OrderWorkflow):
            # Request payment
            workflow.request_payment()
            await self.initiate_payment(workflow.order_id)
    

    Pattern 6: Event-Driven Workflows

    When to Use

    • Reactive processes (respond to external events)
    • Long-running workflows
    • Integration with external systems
    • Loosely coupled processes

    Implementation

    class EventDrivenWorkflow:
        def __init__(self):
            self.handlers = {}
            self.event_bus = EventBus()
    
        def on_event(self, event_type: str):
            """Decorator to register event handlers"""
            def decorator(func):
                self.handlers[event_type] = func
                return func
            return decorator
    
        async def start(self):
            await self.event_bus.subscribe("*", self.handle_event)
    
        async def handle_event(self, event: dict):
            handler = self.handlers.get(event["type"])
            if handler:
                await handler(event)
    
    
    # Usage
    workflow = EventDrivenWorkflow()
    
    @workflow.on_event("order.created")
    async def handle_order_created(event: dict):
        order = event["data"]
        # Validate inventory
        inventory_ok = await check_inventory(order["items"])
        if inventory_ok:
            await event_bus.publish({
                "type": "order.validated",
                "data": order
            })
        else:
            await event_bus.publish({
                "type": "order.validation_failed",
                "data": {"order": order, "reason": "insufficient_inventory"}
            })
    
    @workflow.on_event("order.validated")
    async def handle_order_validated(event: dict):
        order = event["data"]
        # Process payment
        payment_result = await process_payment(order)
        await event_bus.publish({
            "type": f"payment.{payment_result['status']}",
            "data": {"order": order, "payment": payment_result}
        })
    

    Designing for Reliability

    Idempotency

    Ensure operations can be safely retried:

    class IdempotentWorkflowStep:
        async def execute(self, step_id: str, params: dict):
            # Check if already executed
            existing = await self.get_execution(step_id)
            if existing:
                return existing["result"]
    
            # Execute
            result = await self.do_execute(params)
    
            # Record execution
            await self.record_execution(step_id, params, result)
    
            return result
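One common way to derive a stable `step_id` (an assumption here, not prescribed by the pattern) is to hash the workflow id, step name, and canonicalized parameters, so retries of the same step with the same inputs map to the same execution record:

```python
import hashlib
import json

def idempotency_key(workflow_id: str, step_name: str, params: dict) -> str:
    """Derive a deterministic key for an idempotent workflow step."""
    # sort_keys ensures the same params dict always serializes identically,
    # regardless of insertion order
    payload = json.dumps(
        {"wf": workflow_id, "step": step_name, "params": params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()
```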
    

    Checkpointing

    Save progress for long-running workflows:

    class CheckpointedWorkflow:
        async def execute(self, workflow_id: str, steps: list):
            checkpoint = await self.load_checkpoint(workflow_id) or {}
            start_index = checkpoint.get("last_completed", -1) + 1
            results = checkpoint.get("results", {})

            for i, step in enumerate(steps[start_index:], start_index):
                results[step["id"]] = await self.execute_step(step)

                # Persist after every step so a crash resumes from here,
                # carrying forward results from this run and earlier ones
                await self.save_checkpoint(workflow_id, {
                    "last_completed": i,
                    "results": results
                })

            return results
    

    Timeout and Retry

    class ResilientWorkflowStep:
        async def execute(self, params: dict):
            for attempt in range(self.max_retries):
                try:
                    return await asyncio.wait_for(
                        self.do_execute(params),
                        timeout=self.timeout_seconds
                    )
            except (asyncio.TimeoutError, RetryableError):
                if attempt == self.max_retries - 1:
                    raise
                # Exponential backoff with jitter before the next attempt
                await asyncio.sleep(self.backoff(attempt))
    
        def backoff(self, attempt: int) -> float:
            return min(2 ** attempt + random.random(), 60)
    

    Monitoring and Observability

    Workflow Metrics

    WORKFLOW_METRICS = {
        # Completion metrics
        "workflows_started": Counter,
        "workflows_completed": Counter,
        "workflows_failed": Counter,
    
        # Duration metrics
        "workflow_duration_seconds": Histogram,
        "step_duration_seconds": Histogram,
    
        # State metrics
        "workflows_by_state": Gauge,
        "pending_human_approvals": Gauge,
    
        # Error metrics
        "step_errors": Counter,
        "escalations": Counter,
        "retries": Counter
    }
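These metrics can be wired up with any metrics library. As an illustration, here is a stdlib-only recorder that emits the counters and duration samples above around a workflow call; a real deployment would back this with Prometheus or StatsD:

```python
import time
from collections import Counter as TallyCounter

class WorkflowMetrics:
    """Minimal in-process recorder mirroring the metric names above."""

    def __init__(self):
        self.counters = TallyCounter()
        self.durations = []  # (metric_name, seconds) samples for histograms

    def record_workflow(self, fn):
        """Wrap a synchronous workflow call with started/completed/failed
        counts and a duration sample."""
        def wrapper(*args, **kwargs):
            self.counters["workflows_started"] += 1
            start = time.monotonic()
            try:
                result = fn(*args, **kwargs)
                self.counters["workflows_completed"] += 1
                return result
            except Exception:
                self.counters["workflows_failed"] += 1
                raise
            finally:
                # Record duration whether the workflow succeeded or failed
                self.durations.append(
                    ("workflow_duration_seconds", time.monotonic() - start))
        return wrapper
```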
    

    Tracing

    class TracedWorkflow:
        async def execute(self, task: dict):
            with tracer.start_span("workflow.execute") as span:
                span.set_attribute("workflow.type", self.workflow_type)
                span.set_attribute("task.id", task["id"])
    
                for step in self.steps:
                    with tracer.start_span(f"step.{step.name}") as step_span:
                        step_span.set_attribute("step.index", step.index)
                        result = await step.execute(task)
                        step_span.set_attribute("step.result", result["status"])
    
                span.set_attribute("workflow.result", "success")
    

    Implementation Checklist

    Design Phase
    - [ ] Identify all decision points
    - [ ] Define escalation criteria
    - [ ] Specify human touchpoints
    - [ ] Document state transitions
    - [ ] Plan failure recovery

    Implementation Phase
    - [ ] Implement state machine or workflow engine
    - [ ] Add idempotency to all steps
    - [ ] Build checkpointing
    - [ ] Create approval interfaces
    - [ ] Add monitoring and tracing

    Operations Phase
    - [ ] Set up alerting for stuck workflows
    - [ ] Create dashboards for workflow health
    - [ ] Document escalation procedures
    - [ ] Train human reviewers
    - [ ] Establish SLAs for human steps

    Conclusion

    Agentic workflows require explicit design for what humans take for granted. The key patterns are:

    1. HITL: Human approval at critical points
    2. Autonomous with Guardrails: Let agents run within boundaries
    3. Escalation Chains: Progressive complexity handling
    4. Parallel Processing: Independent work with merge
    5. State Machines: Explicit states and transitions
    6. Event-Driven: Reactive, loosely coupled processes

    Start with more human oversight than you think you need, then reduce it as you build confidence. It's easier to remove guardrails than to recover from autonomous mistakes.

    The goal is reliable execution: workflows that complete successfully, handle errors gracefully, and maintain appropriate human oversight for your risk tolerance.

    Design for the failure cases. The happy path is easy; the edge cases determine whether your workflow is production-ready.