December 2025 Update: Production patterns for multi-agent orchestration including ReAct, hierarchical decomposition, and event-driven architectures.

Why Multi-Agent Systems?

Single agents have limitations:
Single Agent                    Multi-Agent System
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
One context window              Distributed context
Jack of all trades              Specialized experts
Sequential processing           Parallel execution
One perspective                 Multiple viewpoints
Limited tool access             Tool specialization

Pattern 1: ReAct (Reason + Act Loop)

The foundational pattern for autonomous agents:
┌─────────────────────────────────────────────────────┐
│                    ReAct Loop                       │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐         │
│  │ Thought │───▶│ Action  │───▶│ Observe │──┐      │
│  └─────────┘    └─────────┘    └─────────┘  │      │
│       ▲                                     │      │
│       └─────────────────────────────────────┘      │
│                                                     │
│  Repeat until task complete or max iterations      │
└─────────────────────────────────────────────────────┘

Implementation

from openai import OpenAI
from typing import Callable, Any
import json

client = OpenAI()

class ReActAgent:
    """Agent using Reason + Act pattern"""
    
    def __init__(
        self,
        tools: dict[str, Callable],
        model: str = "gpt-4o",
        max_iterations: int = 10
    ):
        self.tools = tools
        self.model = model
        self.max_iterations = max_iterations
    
    def _build_system_prompt(self) -> str:
        tool_descriptions = "\n".join([
            f"- {name}: {func.__doc__}"
            for name, func in self.tools.items()
        ])
        
        return f"""You are a ReAct agent. For each step:
1. THOUGHT: Reason about what to do next
2. ACTION: Choose a tool and inputs
3. OBSERVATION: I'll provide the tool result

Available tools:
{tool_descriptions}

Format your response as:
THOUGHT: <your reasoning>
ACTION: <tool_name>
INPUT: <json input for tool>

When you have the final answer, respond:
THOUGHT: I have the answer
FINAL ANSWER: <your answer>"""
    
    def run(self, query: str) -> dict:
        """Run the ReAct loop"""
        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": query}
        ]
        
        trajectory = []
        
        for i in range(self.max_iterations):
            response = client.chat.completions.create(
                model=self.model,
                messages=messages
            )
            
            content = response.choices[0].message.content
            messages.append({"role": "assistant", "content": content})
            
            # Parse the response
            step = self._parse_step(content)
            trajectory.append(step)
            
            # Check for final answer
            if step.get("final_answer"):
                return {
                    "answer": step["final_answer"],
                    "trajectory": trajectory,
                    "iterations": i + 1
                }
            
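            # Execute action
            if step.get("action"):
                observation = self._execute_action(
                    step["action"],
                    step.get("input", {})
                )
                
                step["observation"] = observation
                messages.append({
                    "role": "user",
                    "content": f"OBSERVATION: {observation}"
                })
            else:
                # Neither ACTION nor FINAL ANSWER was parsed: remind the model
                # of the expected format instead of re-sending the same messages
                messages.append({
                    "role": "user",
                    "content": "Respond with ACTION and INPUT, or with FINAL ANSWER."
                })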
        
        return {
            "answer": "Max iterations reached",
            "trajectory": trajectory,
            "iterations": self.max_iterations
        }
    
    def _parse_step(self, content: str) -> dict:
        """Parse agent response into structured step"""
        step = {"raw": content}
        
        lines = content.strip().split("\n")
        for line in lines:
            if line.startswith("THOUGHT:"):
                step["thought"] = line[8:].strip()
            elif line.startswith("ACTION:"):
                step["action"] = line[7:].strip()
            elif line.startswith("INPUT:"):
                try:
                    step["input"] = json.loads(line[6:].strip())
                except json.JSONDecodeError:
                    # Not valid JSON: pass the raw string through to the tool
                    step["input"] = line[6:].strip()
            elif line.startswith("FINAL ANSWER:"):
                step["final_answer"] = line[13:].strip()
        
        return step
    
    def _execute_action(self, action: str, input_data: Any) -> str:
        """Execute a tool action"""
        if action not in self.tools:
            return f"Error: Unknown tool '{action}'"
        
        try:
            if isinstance(input_data, dict):
                result = self.tools[action](**input_data)
            else:
                result = self.tools[action](input_data)
            return str(result)
        except Exception as e:
            return f"Error executing {action}: {str(e)}"

# Example tools
def search_web(query: str) -> str:
    """Search the web for information"""
    # Simulated search
    return f"Search results for '{query}': Found 5 relevant articles..."

def calculate(expression: str) -> str:
    """Evaluate a mathematical expression"""
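    # WARNING: eval() executes arbitrary code. Acceptable in a demo only;
    # never pass model-generated text to eval() in production.
    return str(eval(expression))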

def get_weather(city: str) -> str:
    """Get current weather for a city"""
    return f"Weather in {city}: 72°F, Sunny"

# Usage
agent = ReActAgent(
    tools={
        "search_web": search_web,
        "calculate": calculate,
        "get_weather": get_weather
    }
)

result = agent.run("What's 15% of $250, and is it good weather for shopping in NYC?")
print(result["answer"])
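
The `calculate` tool above hands model-generated text to `eval`, which will run arbitrary Python. A minimal sketch of a safer alternative, assuming the tool only needs basic arithmetic, parses the expression and evaluates a whitelisted subset of the AST. The name `safe_calculate` and the operator whitelist are illustrative choices, not part of the original example.

```python
import ast
import operator

# Whitelisted operators (assumption: basic arithmetic is all the tool needs)
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.USub: operator.neg, ast.UAdd: operator.pos}

def _eval_node(node: ast.AST):
    """Recursively evaluate a node, rejecting anything outside the whitelist"""
    if isinstance(node, ast.Constant):
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
    elif isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
        left = _eval_node(node.left)
        right = _eval_node(node.right)
        return _BINARY_OPS[type(node.op)](left, right)
    elif isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_eval_node(node.operand))
    raise ValueError(f"Unsupported expression element: {type(node).__name__}")

def safe_calculate(expression: str) -> str:
    """Evaluate a mathematical expression without calling eval()"""
    tree = ast.parse(expression, mode="eval")
    return str(_eval_node(tree.body))

print(safe_calculate("2 + 3 * 4"))  # prints 14
```

Function calls, attribute access, and names all raise `ValueError`, which the agent's `_execute_action` would report back to the model as an error observation.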

Pattern 2: Hierarchical Task Decomposition

Break complex tasks into subtasks with specialized agents:
┌─────────────────────────────────────────────────────────────┐
│                    Orchestrator Agent                        │
│         (Plans, delegates, synthesizes results)              │
└───────────────┬─────────────────┬─────────────────┬─────────┘
                │                 │                 │
        ┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐
        │   Research    │ │   Analysis    │ │   Writing     │
        │    Agent      │ │    Agent      │ │    Agent      │
        └───────────────┘ └───────────────┘ └───────────────┘

Implementation

from dataclasses import dataclass
from typing import Optional
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    description: str
    agent_type: str
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[str] = None
    dependencies: Optional[list[str]] = None
    
    def __post_init__(self):
        if self.dependencies is None:
            self.dependencies = []

class SpecializedAgent:
    """Base class for specialized agents"""
    
    def __init__(self, name: str, expertise: str, model: str = "gpt-4o"):
        self.name = name
        self.expertise = expertise
        self.model = model
    
    def execute(self, task: Task, context: dict) -> str:
        """Execute a task"""
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": f"You are a {self.expertise} specialist. {self._get_instructions()}"
                },
                {
                    "role": "user",
                    "content": f"Task: {task.description}\n\nContext: {json.dumps(context)}"
                }
            ]
        )
        return response.choices[0].message.content
    
    def _get_instructions(self) -> str:
        return "Complete the task thoroughly and accurately."

class ResearchAgent(SpecializedAgent):
    def __init__(self):
        super().__init__("Researcher", "research and information gathering")
    
    def _get_instructions(self) -> str:
        return "Find accurate, relevant information. Cite sources when possible."

class AnalysisAgent(SpecializedAgent):
    def __init__(self):
        super().__init__("Analyst", "data analysis and insights")
    
    def _get_instructions(self) -> str:
        return "Analyze data thoroughly. Provide clear insights and recommendations."

class WritingAgent(SpecializedAgent):
    def __init__(self):
        super().__init__("Writer", "content writing and communication")
    
    def _get_instructions(self) -> str:
        return "Write clear, engaging content. Match the tone to the audience."

class OrchestratorAgent:
    """Coordinates multiple specialized agents"""
    
    def __init__(self):
        self.agents = {
            "research": ResearchAgent(),
            "analysis": AnalysisAgent(),
            "writing": WritingAgent()
        }
        self.tasks: dict[str, Task] = {}
    
    def decompose_task(self, objective: str) -> list[Task]:
        """Break down objective into subtasks"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": """You are a task planning expert. Break down the objective into subtasks.
                    
Available agent types:
- research: For gathering information
- analysis: For analyzing data and finding insights
- writing: For creating written content

Return a JSON object with a "tasks" array:
{"tasks": [
    {"id": "1", "description": "...", "agent_type": "research", "dependencies": []},
    {"id": "2", "description": "...", "agent_type": "analysis", "dependencies": ["1"]}
]}"""
                },
                {"role": "user", "content": objective}
            ],
            response_format={"type": "json_object"}
        )
        
        result = json.loads(response.choices[0].message.content)
        tasks = []
        
        for t in result.get("tasks", result.get("subtasks", [])):
            task = Task(
                id=t["id"],
                description=t["description"],
                agent_type=t["agent_type"],
                dependencies=t.get("dependencies", [])
            )
            tasks.append(task)
            self.tasks[task.id] = task
        
        return tasks
    
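    def execute_plan(self, tasks: list[Task]) -> dict:
        """Execute tasks respecting dependencies"""
        results = {}
        
        while any(t.status != TaskStatus.COMPLETED for t in tasks):
            progressed = False
            
            for task in tasks:
                if task.status != TaskStatus.PENDING:
                    continue
                
                # Check dependencies (an unknown dependency ID never completes)
                deps_complete = all(
                    dep in self.tasks
                    and self.tasks[dep].status == TaskStatus.COMPLETED
                    for dep in task.dependencies
                )
                
                if not deps_complete:
                    continue
                
                # Execute task
                task.status = TaskStatus.IN_PROGRESS
                
                context = {
                    dep: self.tasks[dep].result
                    for dep in task.dependencies
                }
                
                agent = self.agents[task.agent_type]
                task.result = agent.execute(task, context)
                task.status = TaskStatus.COMPLETED
                results[task.id] = task.result
                progressed = True
            
            # A full pass that ran nothing means the remaining tasks are
            # blocked by circular or missing dependencies; stop instead of
            # spinning forever
            if not progressed:
                raise RuntimeError(
                    "No runnable tasks left; check for circular or missing dependencies"
                )
        
        return results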
    
    def synthesize_results(self, objective: str, results: dict) -> str:
        """Combine all results into final output"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Synthesize the results from multiple agents into a coherent final response."
                },
                {
                    "role": "user",
                    "content": f"Objective: {objective}\n\nResults:\n{json.dumps(results, indent=2)}"
                }
            ]
        )
        return response.choices[0].message.content
    
    def run(self, objective: str) -> str:
        """Complete end-to-end execution"""
        # Decompose
        tasks = self.decompose_task(objective)
        print(f"Created {len(tasks)} subtasks")
        
        # Execute
        results = self.execute_plan(tasks)
        
        # Synthesize
        return self.synthesize_results(objective, results)

# Usage
orchestrator = OrchestratorAgent()
result = orchestrator.run(
    "Create a market analysis report for electric vehicles in 2024"
)
print(result)
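
Because the plan comes from an LLM call, it can name agent types or task IDs that do not exist, or contain cycles that no execution order can satisfy. A minimal sketch of a validation pass that could run before `execute_plan`, assuming tasks arrive as plain dicts in the shape shown in the planner prompt. The names `validate_plan` and `KNOWN_AGENT_TYPES` are hypothetical helpers, not part of the original code.

```python
KNOWN_AGENT_TYPES = {"research", "analysis", "writing"}

def validate_plan(tasks: list[dict]) -> list[str]:
    """Return task IDs in a valid execution order, or raise ValueError"""
    by_id: dict[str, dict] = {}
    for t in tasks:
        for key in ("id", "description", "agent_type"):
            if key not in t:
                raise ValueError(f"Task is missing required field '{key}': {t}")
        if t["agent_type"] not in KNOWN_AGENT_TYPES:
            raise ValueError(
                f"Task {t['id']} has unknown agent type '{t['agent_type']}'"
            )
        if t["id"] in by_id:
            raise ValueError(f"Duplicate task id '{t['id']}'")
        by_id[t["id"]] = t

    # Every dependency must reference a task that exists in the plan
    for t in tasks:
        for dep in t.get("dependencies", []):
            if dep not in by_id:
                raise ValueError(f"Task {t['id']} depends on unknown task '{dep}'")

    # Kahn-style topological sort: repeatedly schedule tasks with no
    # unmet dependencies; if none are ready, the rest form a cycle
    remaining = {tid: set(t.get("dependencies", [])) for tid, t in by_id.items()}
    order: list[str] = []
    while remaining:
        ready = [tid for tid, deps in remaining.items() if not deps]
        if not ready:
            raise ValueError(f"Circular dependency among tasks: {sorted(remaining)}")
        for tid in ready:
            order.append(tid)
            del remaining[tid]
        for deps in remaining.values():
            deps.difference_update(ready)
    return order

plan = [
    {"id": "1", "description": "Gather data", "agent_type": "research", "dependencies": []},
    {"id": "2", "description": "Find trends", "agent_type": "analysis", "dependencies": ["1"]},
    {"id": "3", "description": "Write report", "agent_type": "writing", "dependencies": ["1", "2"]},
]
print(validate_plan(plan))  # prints ['1', '2', '3']
```

Rejecting a bad plan up front lets the orchestrator ask the planner for a new decomposition before any worker call is made.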

Pattern 3: Event-Driven Agents

Agents that respond to events and can run for extended periods:
┌─────────────────────────────────────────────────────────────┐
│                     Event Bus                               │
└──────────┬────────────────┬────────────────┬───────────────┘
           │                │                │
    ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐
    │   Monitor   │  │   Handler   │  │   Notifier  │
    │    Agent    │  │    Agent    │  │    Agent    │
    │ (emit)      │  │ (consume)   │  │ (consume)   │
    └─────────────┘  └─────────────┘  └─────────────┘

Implementation

import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Any
from collections import defaultdict

@dataclass
class Event:
    type: str
    data: dict
    # default_factory stamps each event at creation time
    timestamp: datetime = field(default_factory=datetime.now)
    source: str = "system"

class EventBus:
    """Central event bus for agent communication"""
    
    def __init__(self):
        self.subscribers: dict[str, list[Callable]] = defaultdict(list)
        self.event_history: list[Event] = []
    
    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to an event type"""
        self.subscribers[event_type].append(handler)
    
    async def publish(self, event: Event):
        """Publish an event to all subscribers"""
        self.event_history.append(event)
        
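        # Copy the list first: += on the stored list would append the wildcard
        # handlers to this event type's subscribers on every publish
        handlers = list(self.subscribers.get(event.type, []))
        handlers += self.subscribers.get("*", [])  # Wildcard subscribers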
        
        tasks = [handler(event) for handler in handlers]
        await asyncio.gather(*tasks)

class EventDrivenAgent:
    """Base class for event-driven agents"""
    
    def __init__(self, name: str, event_bus: EventBus):
        self.name = name
        self.event_bus = event_bus
        self.running = False
    
    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to events"""
        self.event_bus.subscribe(event_type, handler)
    
    async def emit(self, event_type: str, data: dict):
        """Emit an event"""
        event = Event(type=event_type, data=data, source=self.name)
        await self.event_bus.publish(event)
    
    async def start(self):
        """Start the agent"""
        self.running = True
        await self.on_start()
    
    async def stop(self):
        """Stop the agent"""
        self.running = False
        await self.on_stop()
    
    async def on_start(self):
        """Override in subclass"""
        pass
    
    async def on_stop(self):
        """Override in subclass"""
        pass

class MonitorAgent(EventDrivenAgent):
    """Monitors for conditions and emits events"""
    
    def __init__(self, event_bus: EventBus, check_interval: float = 5.0):
        super().__init__("Monitor", event_bus)
        self.check_interval = check_interval
        self.conditions: list[dict] = []
    
    def add_condition(
        self,
        name: str,
        check: Callable[[], bool],
        event_type: str
    ):
        """Add a condition to monitor"""
        self.conditions.append({
            "name": name,
            "check": check,
            "event_type": event_type
        })
    
    async def on_start(self):
        """Start monitoring loop"""
        while self.running:
            for condition in self.conditions:
                try:
                    if condition["check"]():
                        await self.emit(
                            condition["event_type"],
                            {"condition": condition["name"]}
                        )
                except Exception as e:
                    await self.emit("error", {
                        "agent": self.name,
                        "error": str(e)
                    })
            
            await asyncio.sleep(self.check_interval)

class HandlerAgent(EventDrivenAgent):
    """Handles events with LLM-powered responses"""
    
    def __init__(self, event_bus: EventBus, event_types: list[str]):
        super().__init__("Handler", event_bus)
        
        for event_type in event_types:
            self.subscribe(event_type, self.handle_event)
    
    async def handle_event(self, event: Event):
        """Handle an incoming event"""
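        # Use LLM to decide action. The SDK call is blocking, so run it in a
        # worker thread to keep the event loop free for other agents
        response = await asyncio.to_thread(
            client.chat.completions.create,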
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": """You are an event handler agent. 
                    Analyze the event and decide what action to take.
                    Return JSON: {"action": "...", "reasoning": "..."}"""
                },
                {
                    "role": "user",
                    "content": f"Event: {event.type}\nData: {json.dumps(event.data)}"
                }
            ],
            response_format={"type": "json_object"}
        )
        
        decision = json.loads(response.choices[0].message.content)
        
        # Emit result event
        await self.emit("action_taken", {
            "original_event": event.type,
            "action": decision["action"],
            "reasoning": decision["reasoning"]
        })

class NotifierAgent(EventDrivenAgent):
    """Sends notifications based on events"""
    
    def __init__(self, event_bus: EventBus):
        super().__init__("Notifier", event_bus)
        self.subscribe("action_taken", self.notify)
    
    async def notify(self, event: Event):
        """Send notification"""
        print(f"🔔 Notification: {event.data['action']}")
        # In production: send email, Slack, etc.

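# Usage
def get_cpu_usage() -> float:
    """Placeholder metric source so the example runs; swap in a real reading"""
    return 85.0

async def main():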
    event_bus = EventBus()
    
    # Create agents
    monitor = MonitorAgent(event_bus, check_interval=1.0)
    handler = HandlerAgent(event_bus, ["alert", "warning"])
    notifier = NotifierAgent(event_bus)
    
    # Add monitoring conditions
    monitor.add_condition(
        name="High CPU",
        check=lambda: get_cpu_usage() > 80,
        event_type="alert"
    )
    
    # Start agents
    await asyncio.gather(
        monitor.start(),
        handler.start(),
        notifier.start()
    )

asyncio.run(main())

Pattern 4: Debate and Consensus

Multiple agents debate to reach better conclusions:
class DebateAgent:
    """Agent that participates in debates"""
    
    def __init__(self, name: str, perspective: str, model: str = "gpt-4o"):
        self.name = name
        self.perspective = perspective
        self.model = model
    
    def argue(self, topic: str, previous_arguments: list[dict]) -> str:
        """Make an argument considering previous points"""
        history = "\n".join([
            f"{a['agent']}: {a['argument']}"
            for a in previous_arguments
        ])
        
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": f"""You are {self.name}, arguing from the perspective: {self.perspective}
                    
Make a clear, logical argument. Address previous points if relevant.
Be constructive and aim for the best solution."""
                },
                {
                    "role": "user",
                    "content": f"Topic: {topic}\n\nPrevious arguments:\n{history or 'None yet'}"
                }
            ]
        )
        return response.choices[0].message.content

class JudgeAgent:
    """Synthesizes debate into consensus"""
    
    def __init__(self, model: str = "gpt-4o"):
        self.model = model
    
    def synthesize(self, topic: str, arguments: list[dict]) -> str:
        """Create consensus from all arguments"""
        all_arguments = "\n\n".join([
            f"**{a['agent']}** ({a['perspective']}):\n{a['argument']}"
            for a in arguments
        ])
        
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": """You are a neutral judge synthesizing a debate.
                    
Identify the strongest points from each perspective.
Create a balanced conclusion that incorporates the best ideas.
Note any unresolved disagreements."""
                },
                {
                    "role": "user",
                    "content": f"Topic: {topic}\n\nArguments:\n{all_arguments}"
                }
            ]
        )
        return response.choices[0].message.content

class DebateOrchestrator:
    """Orchestrates a multi-agent debate"""
    
    def __init__(self, rounds: int = 3):
        self.rounds = rounds
        self.agents: list[DebateAgent] = []
        self.judge = JudgeAgent()
    
    def add_agent(self, name: str, perspective: str):
        self.agents.append(DebateAgent(name, perspective))
    
    def run_debate(self, topic: str) -> dict:
        """Run the full debate"""
        all_arguments = []
        
        for round_num in range(self.rounds):
            print(f"\n=== Round {round_num + 1} ===")
            
            for agent in self.agents:
                argument = agent.argue(topic, all_arguments)
                
                all_arguments.append({
                    "agent": agent.name,
                    "perspective": agent.perspective,
                    "argument": argument,
                    "round": round_num + 1
                })
                
                print(f"\n{agent.name}: {argument[:200]}...")
        
        # Judge synthesizes
        conclusion = self.judge.synthesize(topic, all_arguments)
        
        return {
            "topic": topic,
            "rounds": self.rounds,
            "arguments": all_arguments,
            "conclusion": conclusion
        }

# Usage
debate = DebateOrchestrator(rounds=2)
debate.add_agent("Optimist", "Focus on opportunities and potential benefits")
debate.add_agent("Skeptic", "Identify risks and potential problems")
debate.add_agent("Pragmatist", "Focus on practical implementation")

result = debate.run_debate("Should we adopt AI agents for customer support?")
print(f"\nConclusion:\n{result['conclusion']}")

Pattern 5: Supervisor Pattern

A supervisor agent manages and monitors worker agents:
class WorkerAgent:
    """Agent that performs specific tasks under supervision"""
    
    def __init__(self, name: str, specialty: str):
        self.name = name
        self.specialty = specialty
        self.status = "idle"
        self.current_task = None
    
    async def work(self, task: str) -> dict:
        """Perform a task"""
        self.status = "working"
        self.current_task = task
        
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"You are a {self.specialty} specialist. Complete the task accurately."
                },
                {"role": "user", "content": task}
            ]
        )
        
        result = response.choices[0].message.content
        
        self.status = "idle"
        self.current_task = None
        
        return {"agent": self.name, "result": result}

class SupervisorAgent:
    """Supervises and coordinates worker agents"""
    
    def __init__(self):
        self.workers: dict[str, WorkerAgent] = {}
        self.task_queue: list[dict] = []
        self.completed_tasks: list[dict] = []
    
    def add_worker(self, worker: WorkerAgent):
        self.workers[worker.name] = worker
    
    def assign_task(self, task: str) -> str:
        """Use LLM to assign task to best worker"""
        worker_info = "\n".join([
            f"- {w.name}: {w.specialty} (status: {w.status})"
            for w in self.workers.values()
        ])
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": f"""You are a supervisor. Assign the task to the best available worker.
                    
Workers:
{worker_info}

Return just the worker name."""
                },
                {"role": "user", "content": f"Task: {task}"}
            ]
        )
        
        return response.choices[0].message.content.strip()
    
    async def supervise(self, objective: str) -> dict:
        """Supervise the completion of an objective"""
        # Break down objective
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Break this objective into specific tasks. Return JSON: {\"tasks\": [\"task1\", \"task2\"]}"
                },
                {"role": "user", "content": objective}
            ],
            response_format={"type": "json_object"}
        )
        
        tasks = json.loads(response.choices[0].message.content)["tasks"]
        
        # Assign and execute tasks
        results = []
        for task in tasks:
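            worker_name = self.assign_task(task)
            worker = self.workers.get(worker_name)
            
            # The LLM may return a name that matches no worker; fall back to
            # any idle worker rather than silently dropping the task
            if worker is None:
                worker = next(
                    (w for w in self.workers.values() if w.status == "idle"),
                    None
                )
                if worker:
                    worker_name = worker.name
            
            if worker and worker.status == "idle":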
                result = await worker.work(task)
                results.append(result)
                self.completed_tasks.append({
                    "task": task,
                    "worker": worker_name,
                    "result": result
                })
        
        # Synthesize results
        synthesis = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Synthesize these results into a final response."
                },
                {
                    "role": "user",
                    "content": f"Objective: {objective}\n\nResults:\n{json.dumps(results, indent=2)}"
                }
            ]
        )
        
        return {
            "objective": objective,
            "tasks_completed": len(results),
            "synthesis": synthesis.choices[0].message.content
        }

# Usage
supervisor = SupervisorAgent()
supervisor.add_worker(WorkerAgent("DataBot", "data analysis"))
supervisor.add_worker(WorkerAgent("WriteBot", "content writing"))
supervisor.add_worker(WorkerAgent("CodeBot", "programming"))

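# `await` is only valid inside a coroutine, so wrap the call and run it
import asyncio

async def main():
    result = await supervisor.supervise(
        "Analyze our sales data and create a summary report with code examples"
    )
    print(result["synthesis"])

asyncio.run(main())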
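
The loop in `supervise` awaits each worker in turn, so tasks run one at a time. Where tasks are independent, they could be fanned out with `asyncio.gather` and bounded by a semaphore. The sketch below uses a stand-in coroutine, `fake_worker`, in place of `WorkerAgent.work`; both it and `run_concurrently` are hypothetical names introduced for illustration.

```python
import asyncio

async def fake_worker(task: str) -> dict:
    """Stand-in for WorkerAgent.work(); sleeps instead of calling an LLM"""
    await asyncio.sleep(0.01)
    return {"task": task, "result": f"done: {task}"}

async def run_concurrently(tasks: list[str], max_parallel: int = 3) -> list[dict]:
    """Run tasks concurrently, with at most max_parallel in flight"""
    semaphore = asyncio.Semaphore(max_parallel)

    async def bounded(task: str) -> dict:
        async with semaphore:
            return await fake_worker(task)

    # gather() returns results in the same order as the input tasks
    return await asyncio.gather(*(bounded(t) for t in tasks))

results = asyncio.run(
    run_concurrently(["clean data", "draft report", "write code"])
)
for r in results:
    print(r["task"], "->", r["result"])
```

This only yields real parallelism if the worker's LLM call does not block the event loop, for example by using the SDK's `AsyncOpenAI` client or wrapping the synchronous call in `asyncio.to_thread`.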

Key Takeaways

ReAct for Autonomy

Use Reason+Act loops for agents that need to work independently

Hierarchy for Complexity

Break complex tasks into specialized subtasks

Events for Scale

Event-driven patterns for long-running, distributed systems

Debate for Quality

Multiple perspectives improve decision quality

What’s Next

Multimodal AI

Learn to build AI systems that work with vision, audio, and real-time voice

Interview Deep-Dive

Strong Answer:
  • ReAct (Reason + Act) is a single-agent loop: the agent thinks about what to do, takes an action (calls a tool), observes the result, and repeats until the task is done. It is ideal for tasks that are inherently sequential and exploratory — where you do not know upfront what steps are needed. A research agent that searches the web, reads results, decides what to search next based on what it found, and iterates until it has a complete answer is a perfect ReAct use case. The agent discovers the plan as it executes.
  • Hierarchical task decomposition is a multi-agent pattern where an orchestrator breaks a complex objective into subtasks, assigns each to a specialized agent, manages dependencies between subtasks, and synthesizes the results. It is ideal for tasks that can be planned upfront — where you know the general structure of the work even if you do not know the specifics. “Write a market analysis report” naturally decomposes into research, data analysis, and writing — three distinct phases with clear handoffs.
  • The practical distinction is about predictability. If you can describe the workflow as a DAG (directed acyclic graph) of subtasks before execution starts, use hierarchical decomposition. If the workflow is reactive and depends on intermediate results, use ReAct. Many real systems use both: the orchestrator decomposes the high-level task into subtasks, and each subtask is executed by a ReAct agent that has its own tool set.
  • The failure modes differ too. ReAct agents can get stuck in loops — repeatedly trying the same failing approach because the reasoning step is not sophisticated enough to learn from failure. I always set a maximum iteration count (typically 10-15) and build in explicit loop detection: if the last 3 actions were identical, force a different approach or escalate to a human. Hierarchical decomposition fails when the task decomposition is wrong — the orchestrator breaks the task into the wrong subtasks, or the dependencies are modeled incorrectly. The decomposition step itself is an LLM call, so it can hallucinate subtasks that do not make sense. I validate decompositions against a schema that enforces: each subtask must specify an agent type, each dependency must reference an existing subtask ID, and there must be no circular dependencies.
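
The loop-detection rule mentioned above (three identical actions in a row) can be sketched as a small helper. This is one possible shape, assuming a step is identified by its tool name plus its input; `LoopDetector` is a hypothetical class, not part of the `ReActAgent` shown earlier.

```python
import json
from collections import deque

class LoopDetector:
    """Flags when the same (action, input) pair repeats `window` times in a row"""

    def __init__(self, window: int = 3):
        self.window = window
        self.recent = deque(maxlen=window)  # keeps only the last `window` steps

    def record(self, action: str, input_data) -> bool:
        """Record a step; return True if the last `window` steps were identical"""
        # Serialize with sorted keys so equal dicts compare equal
        # regardless of key order
        signature = (action, json.dumps(input_data, sort_keys=True, default=str))
        self.recent.append(signature)
        return len(self.recent) == self.window and len(set(self.recent)) == 1

detector = LoopDetector(window=3)
flags = [
    detector.record("search_web", {"query": "EV sales 2024"})
    for _ in range(3)
]
print(flags)  # prints [False, False, True]
```

In a ReAct loop the check would sit just before the tool call; on `True`, the agent could append a message asking the model for a different approach, or stop and escalate to a human.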
Follow-up: How do you handle the situation where a worker agent in a hierarchical system fails on its subtask? What is your error recovery strategy?
There are three levels of error recovery. First, local retry: the worker agent retries its subtask with a modified approach (rephrased query, different tool, adjusted parameters). I give each worker 2-3 retry attempts before escalating. Second, substitution: the orchestrator reassigns the failed subtask to a different worker agent or a different model. If the research agent failed because the web search returned no results, maybe the analysis agent can derive the needed information from existing context. Third, replanning: the orchestrator re-decomposes the original objective, explicitly noting what failed and why. This produces a new plan that routes around the failure. The critical implementation detail is that each worker must return structured error information: not just “failed” but “failed because: search API returned 0 results for query ‘Q3 2024 EV sales data Europe.’” This context lets the orchestrator make intelligent recovery decisions rather than blindly retrying.
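
A rough sketch of the first two recovery levels, with the third left to the caller, might look like the following. It assumes workers are plain callables that return a structured result; `WorkerResult` and `run_with_recovery` are hypothetical names, and a real worker would vary its approach between attempts rather than repeat the same call.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class WorkerResult:
    ok: bool
    output: Optional[str] = None
    error: Optional[str] = None  # structured reason, not just "failed"

Worker = Callable[[str], WorkerResult]

def run_with_recovery(
    task: str,
    primary: Worker,
    fallbacks: list[Worker],
    max_retries: int = 2,
) -> tuple[WorkerResult, list[str]]:
    """Try the primary worker with retries, then each fallback once.

    Returns the final result plus a log of failure reasons.
    """
    failures: list[str] = []

    # Level 1: local retry on the assigned worker
    for attempt in range(1, max_retries + 1):
        result = primary(task)
        if result.ok:
            return result, failures
        failures.append(f"primary attempt {attempt}: {result.error}")

    # Level 2: substitution with a different worker
    for index, worker in enumerate(fallbacks):
        result = worker(task)
        if result.ok:
            return result, failures
        failures.append(f"fallback {index}: {result.error}")

    # Level 3 is the caller's job: replan, passing the failure log to the planner
    return WorkerResult(ok=False, error="; ".join(failures)), failures

def failing_search(task: str) -> WorkerResult:
    return WorkerResult(ok=False, error="search API returned 0 results")

def context_worker(task: str) -> WorkerResult:
    return WorkerResult(ok=True, output=f"derived from context: {task}")

result, failures = run_with_recovery("Q3 EV sales", failing_search, [context_worker])
print(result.output)
print(failures)
```

Returning the failure log alongside the result gives the orchestrator the reasons it needs when it re-decomposes the objective.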
Strong Answer:
  • There are three fundamental communication patterns for multi-agent systems, and the choice depends on your coordination requirements. The first is direct message passing: Agent A sends a message directly to Agent B. This is simple but creates tight coupling — A must know about B. I use this for fixed pipelines where the agent topology is known at design time, like a three-stage pipeline of retrieval, analysis, and writing.
  • The second is a shared blackboard (or shared state): all agents read from and write to a common context object. The orchestrator updates the blackboard after each agent completes its task, and the next agent reads the relevant portions. This decouples agents from each other — they only need to know the blackboard schema, not the other agents. The downside is that the blackboard can grow unbounded. After 10 agent executions, the accumulated context might exceed the next agent’s context window. I manage this by structuring the blackboard as a typed dictionary with size limits per field: {"research_results": "...(max 2000 tokens)...", "analysis": "...(max 1500 tokens)...", "decisions": [...]}.
  • The third is event-driven communication via an event bus. Agents subscribe to event types and react when relevant events are published. This is the most flexible and scalable pattern — you can add new agents without modifying existing ones, agents can run concurrently, and the system naturally supports long-running workflows. The trade-off is complexity: debugging event-driven systems is harder because the execution flow is non-linear, and you need careful design of event types and payloads.
  • For context sharing specifically, the key challenge is that each agent has a limited context window and does not need all the information from all other agents. I build a context builder function per agent that selects and summarizes the relevant portions of shared state. The research agent’s output might be 5,000 tokens, but the writing agent only needs a 500-token summary of the key findings. This selective context injection keeps each agent’s prompt focused and within token limits.
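A size-limited blackboard and a per-agent context builder might look like the sketch below. Two assumptions are worth flagging: token counts are approximated by whitespace-separated words rather than a real tokenizer, and truncation stands in for the summarization step, which in practice would likely be an LLM call. `Blackboard`, `truncate_tokens`, and `build_context` are illustrative names.

```python
# Per-field limits taken from the example in the text above.
FIELD_LIMITS = {"research_results": 2000, "analysis": 1500}


def truncate_tokens(text: str, max_tokens: int) -> str:
    """Crude token proxy: keep the first `max_tokens` whitespace-separated words."""
    words = text.split()
    if len(words) <= max_tokens:
        return text
    return " ".join(words[:max_tokens])


class Blackboard:
    """Shared state whose fields are capped when written."""

    def __init__(self, limits: dict[str, int]):
        self.limits = limits
        self.fields: dict[str, str] = {}

    def write(self, name: str, value: str) -> None:
        limit = self.limits.get(name)
        self.fields[name] = truncate_tokens(value, limit) if limit else value


def build_context(board: Blackboard, wanted: dict[str, int]) -> str:
    """Select only the fields an agent needs, each within its own budget."""
    parts = []
    for name, budget in wanted.items():
        if name in board.fields:
            parts.append(f"{name}:\n{truncate_tokens(board.fields[name], budget)}")
    return "\n\n".join(parts)
```

A writing agent would then call something like `build_context(board, {"research_results": 500})`, so it receives a bounded slice of the research output instead of the full field.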
Follow-up: In the event-driven pattern, how do you handle ordering guarantees and prevent race conditions when multiple agents process events concurrently?
The honest answer is that you often do not need strict ordering guarantees in multi-agent systems, because the LLM’s reasoning step handles out-of-order information gracefully. It is not like a database transaction where ordering is critical for correctness. But when you do need ordering (for example, the analysis agent must not start until the research agent has finished), I use two mechanisms. First, explicit dependency barriers: the event bus tracks which events have been emitted and only delivers an event to a subscriber if all prerequisite events have been processed. This is essentially a lightweight workflow engine embedded in the event bus. Second, for true concurrency safety on shared state, I use optimistic concurrency control: each agent reads the blackboard with a version number, does its work, and writes back with a compare-and-swap. If the version has changed (another agent wrote in between), the write fails and the agent re-reads and retries. In practice, conflicts are rare because agents typically write to different fields of the blackboard, but the mechanism prevents corruption when they do overlap.
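The optimistic concurrency scheme can be illustrated with an in-process blackboard. This is a sketch assuming a single Python process with threads; `VersionedBlackboard` and `update_with_retry` are hypothetical names, and a distributed deployment would rely on the conditional-write feature of its datastore instead.

```python
import threading
from typing import Callable


class VersionedBlackboard:
    """Shared state with a version counter for compare-and-swap writes."""

    def __init__(self) -> None:
        # The lock only guards the brief read and swap, not the agent's work.
        self._lock = threading.Lock()
        self._version = 0
        self._state: dict = {}

    def read(self) -> tuple[int, dict]:
        with self._lock:
            return self._version, dict(self._state)

    def compare_and_swap(self, expected_version: int, updates: dict) -> bool:
        with self._lock:
            if self._version != expected_version:
                return False  # another agent wrote in between
            self._state.update(updates)
            self._version += 1
            return True


def update_with_retry(
    board: VersionedBlackboard,
    compute: Callable[[dict], dict],
    max_attempts: int = 5,
) -> bool:
    """Read, compute, and write back; re-read and retry on a version conflict."""
    for _ in range(max_attempts):
        version, snapshot = board.read()
        updates = compute(snapshot)  # the agent's (possibly slow) work
        if board.compare_and_swap(version, updates):
            return True
    return False
```

Because `compute` runs outside the lock, agents never block each other during LLM calls; a conflict only costs the losing agent a re-read and a recompute.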
Strong Answer:
  • The first unique failure mode is cascading failures. In a single-agent system, if the agent fails, the task fails. In a multi-agent system, if the research agent produces bad output (hallucinated facts), the analysis agent builds analysis on those hallucinated facts, and the writing agent produces a confident, well-written report full of nonsense. Each agent did its individual job well, but the system produced garbage because errors amplified through the pipeline. The mitigation is inter-agent validation: the analysis agent should not blindly trust the research agent’s output. I add a “quality gate” step between agents where a lightweight model checks for internal consistency, unsupported claims, and obvious errors before passing results forward.
  • The second failure mode is coordination deadlock. Agent A is waiting for Agent B’s output, but Agent B is waiting for Agent A’s output due to a circular dependency. This is rare in well-designed systems but happens when the task decomposition step produces an invalid dependency graph. I enforce DAG validation on every decomposition and add a timeout on all inter-agent waits. If an agent has been waiting for more than 30 seconds, the orchestrator intervenes.
  • The third failure mode is context drift. In a long-running multi-agent workflow, the accumulated context gradually diverges from the original objective. Each agent adds its own interpretation, and by agent 5, the system is solving a slightly different problem than what was asked. I mitigate this by including the original objective in every agent’s prompt, not just the orchestrator’s. Every agent sees “Original objective: X. Your specific task: Y.” This anchors each agent to the user’s intent.
  • The fourth failure mode is cost explosion. A ReAct agent running inside a hierarchical system can enter a loop, consuming 50+ LLM calls before the iteration limit kicks in. Multiply that by 5 worker agents and a decomposition that produces 8 subtasks, and a single user request costs $5 instead of $0.50. I enforce per-request cost budgets: each worker gets a maximum token budget, and the orchestrator tracks cumulative spending. If the budget is 80% consumed, the remaining agents are forced to use cheaper models or shorter responses.
  • The fifth and most subtle failure mode is inconsistency between agents. The research agent finds that Product X launched in 2023, but the writing agent (using its own knowledge) states it launched in 2024. Multi-agent systems can produce internally contradictory outputs because each agent has its own context and model instance. A final synthesis step that explicitly checks for contradictions across agent outputs catches most of these.
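The per-request cost budget from the fourth failure mode could be tracked with something like the following. It is a minimal sketch: `CostBudget` and `BudgetExceeded` are hypothetical names, the model names are placeholders, and the dollar cost of each call is assumed to be computed elsewhere from token usage.

```python
class BudgetExceeded(RuntimeError):
    """Raised when a request has spent its entire budget."""


class CostBudget:
    """Tracks cumulative spend for one user request."""

    def __init__(self, limit_usd: float, downgrade_at: float = 0.8):
        self.limit_usd = limit_usd
        self.downgrade_at = downgrade_at  # fraction of budget that triggers cheaper models
        self.spent_usd = 0.0

    def record(self, cost_usd: float) -> None:
        self.spent_usd += cost_usd

    def pick_model(self, preferred: str, cheaper: str) -> str:
        if self.spent_usd >= self.limit_usd:
            raise BudgetExceeded(
                f"spent ${self.spent_usd:.2f} of ${self.limit_usd:.2f}"
            )
        if self.spent_usd >= self.downgrade_at * self.limit_usd:
            return cheaper
        return preferred
```

The orchestrator would call `pick_model` before each worker invocation and `record` after it, so the downgrade applies to whichever agents run after the 80% mark.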
Follow-up: How do you debug a multi-agent system when the final output is wrong but you do not know which agent introduced the error?
This is where observability design pays off. Every agent call must produce a trace that includes: the agent name, its input context (what it received from the blackboard or previous agents), its full prompt, the raw LLM response, and its output (what it wrote to the blackboard). I store these traces in a structured format that lets me reconstruct the full execution graph. When the final output is wrong, I work backward: I check the writing agent’s input to see whether the analysis was correct, then the analysis agent’s input to see whether the research was accurate. At some point I find the agent where the input was correct but the output was wrong, and that is where the bug is. It is essentially a bisection search through the agent graph. The tooling I build for this is a “replay” capability: I can take any agent’s input trace and re-run it in isolation to see if the error is deterministic or stochastic. If it is deterministic, the bug is in the prompt or the tool. If it is stochastic, I need to add validation or retry logic at that stage.
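The backward walk through traces can be sketched as follows, assuming a linear pipeline and some way to judge a piece of text as correct. `AgentTrace`, `find_faulty_agent`, and the `is_correct` callback are hypothetical; in practice the check might be a human review or an automated fact check.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class AgentTrace:
    agent: str
    input_context: str  # what the agent received
    prompt: str         # full prompt sent to the model
    raw_response: str   # unprocessed LLM response
    output: str         # what the agent wrote back


def find_faulty_agent(
    traces: list[AgentTrace],
    is_correct: Callable[[str], bool],
) -> Optional[str]:
    """Walk backward from the final agent and return the one whose
    input was correct but whose output was wrong."""
    for trace in reversed(traces):
        if not is_correct(trace.output) and is_correct(trace.input_context):
            return trace.agent
    return None
```

Once the faulty agent is identified, its stored `input_context` and `prompt` are what a replay tool would feed back in to test whether the error reproduces.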