# Agentic Workflow Patterns: Designing Processes for AI Agent Execution
Design patterns for building workflows that AI agents can execute reliably. Covers human-in-the-loop patterns, autonomous execution, hybrid approaches, and best practices for agentic process design.
Workflows designed for human execution don't automatically work for **AI agents**. Agents need explicit decision points, clear success criteria, and well-defined handoff mechanisms. This guide covers design patterns for creating **agentic workflows** that agents can execute reliably while maintaining appropriate human oversight.
## The Workflow Design Challenge

### Human vs. Agent Execution
Human workflows assume people can handle ambiguity, implicitly know when to escalate, make judgment calls based on experience, adapt based on context, and communicate naturally with stakeholders.
Agent workflows require the opposite — explicit decision criteria, defined escalation triggers, structured judgment frameworks, predictable adaptation rules, and formalized communication protocols.
### The Automation Spectrum

```
Full Human ←————————————————————————————————————————→ Full Automation

Manual      Human-      Agent-      Autonomous     Fully
Process     Assisted    Assisted    with Human     Autonomous
            Agent                   Oversight
```

Most workflows should land somewhere in the middle, not at either extreme.
## Pattern 1: Human-in-the-Loop (HITL)

### When to Use
- High-stakes decisions (financial, legal, safety)
- Ambiguous situations requiring judgment
- New processes not yet validated
- Sensitive customer interactions
- Compliance-required approvals
### Implementation

```python
class HITLWorkflow:
    def __init__(self, approval_required: list[str]):
        self.approval_required = approval_required
        self.pending_approvals = {}

    async def execute_step(self, step: dict, context: dict):
        if step["type"] in self.approval_required:
            # Agent prepares, human approves
            proposal = await self.agent.prepare(step, context)
            approval = await self.request_human_approval(proposal)
            if approval["approved"]:
                return await self.agent.execute(step, context)
            else:
                return await self.handle_rejection(approval, step)
        else:
            # Agent executes autonomously
            return await self.agent.execute(step, context)

    async def request_human_approval(self, proposal: dict):
        # Create approval request
        request = {
            "id": generate_id(),
            "proposal": proposal,
            "agent_confidence": proposal.get("confidence"),
            "risk_assessment": self.assess_risk(proposal),
            "deadline": datetime.now() + timedelta(hours=24)
        }
        # Send to approval queue
        await self.approval_queue.add(request)
        # Wait for human decision
        return await self.wait_for_approval(request["id"])
```
### HITL Patterns

**Pre-Execution Approval:**
Agent prepares → Human reviews → Human approves → Agent executes

**Post-Execution Review:**
Agent executes → Human reviews → Human confirms/reverts

**Parallel Review:**
Agent executes (provisionally) → Human reviews in parallel → Finalize or rollback
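The post-execution variant can be sketched as a small coordinator. This is an illustrative sketch only: `execute`, `revert`, and `get_human_verdict` are hypothetical callables standing in for the agent action, its undo, and the human review step.

```python
import asyncio

async def post_execution_review(execute, revert, get_human_verdict):
    """Agent executes first; a human reviewer then confirms or reverts."""
    result = await execute()                 # provisional execution
    verdict = await get_human_verdict(result)
    if verdict == "confirm":
        return {"status": "finalized", "result": result}
    await revert(result)                     # undo the provisional work
    return {"status": "reverted", "result": result}

async def demo():
    state = {"applied": False}

    async def execute():
        state["applied"] = True              # the provisional side effect
        return "draft-reply"

    async def revert(_result):
        state["applied"] = False

    async def approve(_result):              # reviewer confirms in this demo
        return "confirm"

    outcome = await post_execution_review(execute, revert, approve)
    return outcome, state

outcome, state = asyncio.run(demo())
print(outcome["status"], state["applied"])  # finalized True
```

The point of the pattern is that the revert path is designed up front: the agent's work is only provisional until the reviewer weighs in.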
### Approval Interface Design

```python
@dataclass
class ApprovalRequest:
    id: str
    summary: str                # Brief description for quick scan
    details: dict               # Full context for detailed review
    agent_recommendation: str   # What the agent suggests
    confidence: float           # Agent's confidence (0-1)
    risk_level: str             # low, medium, high, critical
    impact_description: str     # What happens if approved
    alternatives: list          # Other options considered
    deadline: datetime          # When decision is needed
    context_links: list         # Links to relevant information
```
**UI Considerations:**
- Show summary first, details on expand
- Highlight unusual or high-risk elements
- Provide one-click approve/reject
- Enable batch approvals for similar requests
- Support delegation and escalation
## Pattern 2: Autonomous with Guardrails

### When to Use
- Well-defined, repeatable processes
- Low-risk operations
- High-volume tasks
- Time-sensitive execution
- Cost-sensitive workflows
### Implementation

```python
class GuardedAutonomousWorkflow:
    def __init__(self):
        self.guardrails = [
            BudgetGuardrail(max_spend=1000),
            RateGuardrail(max_per_hour=100),
            ContentGuardrail(blocked_categories=["adult", "violence"]),
            ConfidenceGuardrail(min_confidence=0.85)
        ]

    async def execute(self, task: dict):
        # Pre-execution guardrail check
        for guardrail in self.guardrails:
            check = await guardrail.pre_check(task)
            if not check["pass"]:
                return await self.handle_guardrail_block(guardrail, check)

        # Execute task
        result = await self.agent.execute(task)

        # Post-execution guardrail check
        for guardrail in self.guardrails:
            check = await guardrail.post_check(result)
            if not check["pass"]:
                return await self.handle_guardrail_violation(guardrail, result)

        return result

class BudgetGuardrail:
    def __init__(self, max_spend: float):
        self.max_spend = max_spend
        self.current_spend = 0

    async def pre_check(self, task: dict) -> dict:
        estimated_cost = self.estimate_cost(task)
        if self.current_spend + estimated_cost > self.max_spend:
            return {
                "pass": False,
                "reason": "Budget exceeded",
                "details": {
                    "estimated": estimated_cost,
                    "remaining": self.max_spend - self.current_spend
                }
            }
        return {"pass": True}

    async def post_check(self, result: dict) -> dict:
        actual_cost = result.get("cost", 0)
        self.current_spend += actual_cost
        return {"pass": True}
```
### Guardrail Categories

**Resource Guardrails:**
- Budget limits (spending caps)
- Rate limits (operations per time period)
- Volume limits (items per batch)
- Time limits (max execution duration)

**Content Guardrails:**
- Toxicity filtering
- PII detection
- Brand safety
- Compliance checking

**Behavioral Guardrails:**
- Confidence thresholds
- Anomaly detection
- Pattern matching for risky operations
- Change magnitude limits

**Scope Guardrails:**
- Allowed actions whitelist
- Resource access restrictions
- External system limitations
- Geographic restrictions
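A behavioral guardrail from this list can follow the same `pre_check`/`post_check` protocol as the `BudgetGuardrail` shown earlier. This sketch assumes the task and result dicts carry a `confidence` field, which is an assumption rather than a fixed schema:

```python
import asyncio

class ConfidenceGuardrail:
    """Behavioral guardrail: block work when the agent's own confidence
    estimate falls below a threshold."""

    def __init__(self, min_confidence: float = 0.85):
        self.min_confidence = min_confidence

    async def pre_check(self, task: dict) -> dict:
        confidence = task.get("confidence", 0.0)
        if confidence < self.min_confidence:
            return {
                "pass": False,
                "reason": "Confidence below threshold",
                "details": {"confidence": confidence,
                            "required": self.min_confidence}
            }
        return {"pass": True}

    async def post_check(self, result: dict) -> dict:
        # Apply the same rule to the produced result
        if result.get("confidence", 0.0) < self.min_confidence:
            return {"pass": False, "reason": "Result confidence too low"}
        return {"pass": True}

guardrail = ConfidenceGuardrail(min_confidence=0.85)
blocked = asyncio.run(guardrail.pre_check({"confidence": 0.6}))
allowed = asyncio.run(guardrail.pre_check({"confidence": 0.9}))
print(blocked["pass"], allowed["pass"])  # False True
```

Keeping every guardrail behind the same two-method protocol is what lets `GuardedAutonomousWorkflow` iterate over them uniformly.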
## Pattern 3: Escalation Chains

### When to Use
- Tiered support systems
- Progressive complexity handling
- Expertise routing
- Failure recovery
### Implementation

```python
class EscalationWorkflow:
    def __init__(self):
        self.tiers = [
            Tier(name="Auto", handler=self.auto_agent, escalation_criteria={
                "confidence_below": 0.7,
                "keywords": ["legal", "urgent", "executive"],
                "error_types": ["validation_failed", "external_error"]
            }),
            Tier(name="L1 Agent", handler=self.l1_agent, escalation_criteria={
                "confidence_below": 0.5,
                "time_exceeded": timedelta(minutes=30),
                "customer_requested": True
            }),
            Tier(name="L2 Human", handler=self.l2_queue, escalation_criteria={
                "complexity_score_above": 0.8,
                "financial_impact_above": 10000
            }),
            Tier(name="Specialist", handler=self.specialist_queue, escalation_criteria={
                # Final tier - no automatic escalation
            })
        ]

    async def handle(self, request: dict):
        context = {"escalation_history": []}
        for tier in self.tiers:
            result = await self.try_tier(tier, request, context)
            if result["resolved"]:
                return result
            if result["escalate"]:
                context["escalation_history"].append({
                    "tier": tier.name,
                    "reason": result["escalation_reason"],
                    "partial_work": result.get("partial_work")
                })
                continue
            # Tier couldn't resolve and didn't escalate
            return result
        # Reached final tier
        return await self.final_tier_handling(request, context)

    async def try_tier(self, tier: Tier, request: dict, context: dict):
        try:
            result = await tier.handler.handle(request, context)
            # Check escalation criteria
            if self.should_escalate(result, tier.escalation_criteria):
                return {
                    "resolved": False,
                    "escalate": True,
                    "escalation_reason": self.get_escalation_reason(result, tier),
                    "partial_work": result.get("partial_work")
                }
            return {"resolved": True, "result": result}
        except Exception as e:
            return {
                "resolved": False,
                "escalate": True,
                "escalation_reason": f"Error in {tier.name}: {str(e)}"
            }
```
### Escalation Design Principles

**Clear Triggers:**

```python
ESCALATION_TRIGGERS = {
    "confidence_threshold": 0.7,          # Below this, escalate
    "error_count": 3,                     # After this many errors
    "time_limit": timedelta(minutes=30),  # Exceeded time
    "customer_sentiment": "negative",     # Detected frustration
    "keywords": ["manager", "supervisor", "complaint", "legal"]
}
```
**Context Preservation:**

```python
class EscalationContext:
    def __init__(self):
        self.original_request = None
        self.tier_attempts = []
        self.partial_results = {}
        self.customer_messages = []

    def add_tier_attempt(self, tier: str, result: dict):
        self.tier_attempts.append({
            "tier": tier,
            "timestamp": datetime.now(),
            "result_summary": self.summarize(result),
            "work_completed": result.get("partial_work"),
            "reason_for_escalation": result.get("escalation_reason")
        })

    def handoff_summary(self) -> str:
        """Generate summary for next tier"""
        return f"""
Original Request: {self.original_request}

Previous Attempts:
{self._format_attempts()}

Work Completed:
{self._format_partial_results()}

Customer Sentiment: {self._assess_sentiment()}
"""
```
## Pattern 4: Parallel Processing with Merge

### When to Use
- Independent subtasks
- Time-sensitive workflows
- Resource-intensive operations
- Diverse expertise requirements
### Implementation

```python
class ParallelWorkflow:
    def __init__(self):
        self.workers = AgentPool()

    async def execute(self, task: dict):
        # Decompose into parallel tracks
        tracks = await self.decompose(task)

        # Execute tracks in parallel
        track_results = await asyncio.gather(*[
            self.execute_track(track)
            for track in tracks
        ], return_exceptions=True)

        # Handle failures
        successful = []
        failed = []
        for track, result in zip(tracks, track_results):
            if isinstance(result, Exception):
                failed.append((track, result))
            else:
                successful.append((track, result))

        # Merge successful results
        if successful:
            merged = await self.merge_results(successful)
            # Handle partial failure
            if failed:
                merged["partial_failure"] = self.format_failures(failed)
            return merged
        else:
            raise AllTracksFailedError(failed)

    async def merge_results(self, track_results: list):
        """Merge results from parallel tracks"""
        # Use LLM to intelligently merge
        response = await self.llm.chat(
            messages=[{
                "role": "system",
                "content": """Merge these parallel workflow results into
a coherent final result. Resolve any conflicts by preferring
higher-confidence results."""
            }, {
                "role": "user",
                "content": json.dumps([
                    {"track": t["name"], "result": r}
                    for t, r in track_results
                ])
            }]
        )
        return json.loads(response.content)
```
### Fork-Join Pattern

```
            ┌─→ Track A ─→┐
Fork ───────┼─→ Track B ─→┼─────→ Join
            └─→ Track C ─→┘
```

```python
class ForkJoinWorkflow:
    async def execute(self, task: dict):
        # Fork
        subtasks = await self.fork(task)

        # Execute (with optional dependencies)
        results = {}
        pending = set(subtasks.keys())
        while pending:
            ready = [
                task_id for task_id in pending
                if self.dependencies_met(task_id, results)
            ]
            if not ready:
                raise DeadlockError("Circular dependencies")
            batch_results = await asyncio.gather(*[
                self.execute_subtask(subtasks[task_id])
                for task_id in ready
            ])
            for task_id, result in zip(ready, batch_results):
                results[task_id] = result
                pending.remove(task_id)

        # Join
        return await self.join(results)
```
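The `dependencies_met` check is left undefined in the workflow above. A minimal version, written as a free function for illustration and assuming each subtask dict carries a `depends_on` list (an assumed field name), might look like:

```python
def dependencies_met(subtasks: dict, task_id: str, results: dict) -> bool:
    """A subtask is ready once every dependency already has a result."""
    return all(dep in results for dep in subtasks[task_id].get("depends_on", []))

# A three-stage dependency graph: fetch → analyze → report
subtasks = {
    "fetch": {},
    "analyze": {"depends_on": ["fetch"]},
    "report": {"depends_on": ["fetch", "analyze"]},
}

print(dependencies_met(subtasks, "fetch", {}))                                 # True
print(dependencies_met(subtasks, "report", {"fetch": "ok"}))                   # False
print(dependencies_met(subtasks, "report", {"fetch": "ok", "analyze": "ok"}))  # True
```

Because the `while pending` loop only dispatches ready subtasks, a cycle in `depends_on` leaves `ready` empty and surfaces as the `DeadlockError` rather than hanging.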
## Pattern 5: State Machine Workflows

### When to Use
- Complex multi-step processes
- Workflows with multiple possible paths
- Processes requiring pause/resume
- Auditable, traceable execution
### Implementation

```python
from enum import Enum
from transitions import Machine

class OrderState(Enum):
    CREATED = "created"
    VALIDATED = "validated"
    PAYMENT_PENDING = "payment_pending"
    PAYMENT_FAILED = "payment_failed"
    PAID = "paid"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
    REFUNDED = "refunded"

class OrderWorkflow:
    states = [s.value for s in OrderState]

    transitions = [
        {"trigger": "validate", "source": "created", "dest": "validated",
         "conditions": ["is_valid_order"]},
        {"trigger": "request_payment", "source": "validated", "dest": "payment_pending"},
        {"trigger": "payment_success", "source": "payment_pending", "dest": "paid"},
        {"trigger": "payment_failure", "source": "payment_pending", "dest": "payment_failed"},
        {"trigger": "retry_payment", "source": "payment_failed", "dest": "payment_pending",
         "conditions": ["retry_allowed"]},
        {"trigger": "start_processing", "source": "paid", "dest": "processing"},
        {"trigger": "ship", "source": "processing", "dest": "shipped"},
        {"trigger": "deliver", "source": "shipped", "dest": "delivered"},
        {"trigger": "cancel", "source": ["created", "validated", "payment_pending"],
         "dest": "cancelled"},
        {"trigger": "refund", "source": ["paid", "processing"], "dest": "refunded"},
    ]

    def __init__(self, order_id: str):
        self.order_id = order_id
        self.history = []
        self.machine = Machine(
            model=self,
            states=self.states,
            transitions=self.transitions,
            initial="created",
            after_state_change="record_transition"  # wire up the history callback
        )

    def record_transition(self):
        """Called after every state transition"""
        self.history.append({
            "state": self.state,
            "timestamp": datetime.now(),
            "agent": current_agent_id()
        })

    def is_valid_order(self) -> bool:
        """Condition for the validate transition"""
        return self.agent.validate_order(self.order_id)

    def retry_allowed(self) -> bool:
        """Condition for the retry_payment transition"""
        failures = len([h for h in self.history
                        if h["state"] == "payment_failed"])
        return failures < 3
```
### Agent Integration with State Machines

```python
class StateMachineAgent:
    async def advance_workflow(self, workflow: OrderWorkflow):
        """Attempt to advance the workflow based on current state"""
        state_handlers = {
            "created": self.handle_created,
            "validated": self.handle_validated,
            "payment_pending": self.handle_payment_pending,
            "payment_failed": self.handle_payment_failed,
            "paid": self.handle_paid,
            "processing": self.handle_processing,
            "shipped": self.handle_shipped
        }
        handler = state_handlers.get(workflow.state)
        if handler:
            await handler(workflow)

    async def handle_created(self, workflow: OrderWorkflow):
        # Validate the order
        is_valid = await self.validate_order(workflow.order_id)
        if is_valid:
            workflow.validate()
        else:
            workflow.cancel()

    async def handle_validated(self, workflow: OrderWorkflow):
        # Request payment
        workflow.request_payment()
        await self.initiate_payment(workflow.order_id)
```
## Pattern 6: Event-Driven Workflows

### When to Use
- Reactive processes (respond to external events)
- Long-running workflows
- Integration with external systems
- Loosely coupled processes
### Implementation

```python
class EventDrivenWorkflow:
    def __init__(self):
        self.handlers = {}
        self.event_bus = EventBus()

    def on_event(self, event_type: str):
        """Decorator to register event handlers"""
        def decorator(func):
            self.handlers[event_type] = func
            return func
        return decorator

    async def start(self):
        await self.event_bus.subscribe("*", self.handle_event)

    async def handle_event(self, event: dict):
        handler = self.handlers.get(event["type"])
        if handler:
            await handler(event)

# Usage
workflow = EventDrivenWorkflow()

@workflow.on_event("order.created")
async def handle_order_created(event: dict):
    order = event["data"]
    # Validate inventory
    inventory_ok = await check_inventory(order["items"])
    if inventory_ok:
        await event_bus.publish({
            "type": "order.validated",
            "data": order
        })
    else:
        await event_bus.publish({
            "type": "order.validation_failed",
            "data": {"order": order, "reason": "insufficient_inventory"}
        })

@workflow.on_event("order.validated")
async def handle_order_validated(event: dict):
    order = event["data"]
    # Process payment
    payment_result = await process_payment(order)
    await event_bus.publish({
        "type": f"payment.{payment_result['status']}",
        "data": {"order": order, "payment": payment_result}
    })
```
## Designing for Reliability

### Idempotency
Ensure operations can be safely retried:
```python
class IdempotentWorkflowStep:
    async def execute(self, step_id: str, params: dict):
        # Check if already executed
        existing = await self.get_execution(step_id)
        if existing:
            return existing["result"]

        # Execute
        result = await self.do_execute(params)

        # Record execution
        await self.record_execution(step_id, params, result)
        return result
```
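A runnable miniature of this pattern, with an in-memory execution record standing in for durable storage (in production the record must survive restarts), shows that a retried step returns the cached result without re-running the side effect:

```python
import asyncio

class InMemoryIdempotentStep:
    """Toy version of the idempotency pattern above: the execution
    record lives in a dict instead of a database."""

    def __init__(self):
        self.executions = {}
        self.call_count = 0

    async def execute(self, step_id: str, params: dict):
        if step_id in self.executions:   # already ran: return cached result
            return self.executions[step_id]
        self.call_count += 1             # the side effect happens only once
        result = {"sum": params["a"] + params["b"]}
        self.executions[step_id] = result
        return result

step = InMemoryIdempotentStep()
first = asyncio.run(step.execute("step-1", {"a": 2, "b": 3}))
retry = asyncio.run(step.execute("step-1", {"a": 2, "b": 3}))
print(first, retry, step.call_count)  # {'sum': 5} {'sum': 5} 1
```

The `step_id` is the idempotency key; whoever retries must reuse the same key, or the deduplication does nothing.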
### Checkpointing
Save progress for long-running workflows:
```python
class CheckpointedWorkflow:
    async def execute(self, workflow_id: str, steps: list):
        # Resume from the last checkpoint, if one exists
        checkpoint = (await self.load_checkpoint(workflow_id)
                      or {"last_completed": -1, "results": {}})
        start_index = checkpoint["last_completed"] + 1

        for i, step in enumerate(steps[start_index:], start_index):
            result = await self.execute_step(step)
            # Save checkpoint after each completed step
            checkpoint = {
                "last_completed": i,
                "results": {**checkpoint["results"], step["id"]: result}
            }
            await self.save_checkpoint(workflow_id, checkpoint)

        return await self.load_checkpoint(workflow_id)
```
### Timeout and Retry

```python
class ResilientWorkflowStep:
    async def execute(self, params: dict):
        for attempt in range(self.max_retries):
            try:
                return await asyncio.wait_for(
                    self.do_execute(params),
                    timeout=self.timeout_seconds
                )
            except (asyncio.TimeoutError, RetryableError):
                # Out of retries: surface the error to the caller
                if attempt == self.max_retries - 1:
                    raise
                await asyncio.sleep(self.backoff(attempt))

    def backoff(self, attempt: int) -> float:
        # Exponential backoff with jitter, capped at 60 seconds
        return min(2 ** attempt + random.random(), 60)
```
## Monitoring and Observability

### Workflow Metrics
```python
WORKFLOW_METRICS = {
    # Completion metrics
    "workflows_started": Counter,
    "workflows_completed": Counter,
    "workflows_failed": Counter,

    # Duration metrics
    "workflow_duration_seconds": Histogram,
    "step_duration_seconds": Histogram,

    # State metrics
    "workflows_by_state": Gauge,
    "pending_human_approvals": Gauge,

    # Error metrics
    "step_errors": Counter,
    "escalations": Counter,
    "retries": Counter
}
```
### Tracing

```python
class TracedWorkflow:
    async def execute(self, task: dict):
        with tracer.start_span("workflow.execute") as span:
            span.set_attribute("workflow.type", self.workflow_type)
            span.set_attribute("task.id", task["id"])
            for step in self.steps:
                with tracer.start_span(f"step.{step.name}") as step_span:
                    step_span.set_attribute("step.index", step.index)
                    result = await step.execute(task)
                    step_span.set_attribute("step.result", result["status"])
            span.set_attribute("workflow.result", "success")
```
## Implementation Checklist

**Design Phase**
- [ ] Identify all decision points
- [ ] Define escalation criteria
- [ ] Specify human touchpoints
- [ ] Document state transitions
- [ ] Plan failure recovery

**Implementation Phase**
- [ ] Implement state machine or workflow engine
- [ ] Add idempotency to all steps
- [ ] Build checkpointing
- [ ] Create approval interfaces
- [ ] Add monitoring and tracing

**Operations Phase**
- [ ] Set up alerting for stuck workflows
- [ ] Create dashboards for workflow health
- [ ] Document escalation procedures
- [ ] Train human reviewers
- [ ] Establish SLAs for human steps
## Conclusion
Agentic workflows require explicit design for what humans take for granted. The key patterns are:
- HITL: Human approval at critical points
- Autonomous with Guardrails: Let agents run within boundaries
- Escalation Chains: Progressive complexity handling
- Parallel Processing: Independent work with merge
- State Machines: Explicit states and transitions
- Event-Driven: Reactive, loosely coupled processes
Start with more human oversight than you think you need, then reduce it as you build confidence. It's easier to remove guardrails than to recover from autonomous mistakes.
The goal is reliable execution: workflows that complete successfully, handle errors gracefully, and maintain appropriate human oversight for your risk tolerance.
Design for the failure cases. The happy path is easy; the edge cases determine whether your workflow is production-ready.