Skip to main content
December 2025 Update: Production patterns for multi-agent orchestration including ReAct, hierarchical decomposition, and event-driven architectures.

Why Multi-Agent Systems?

Single agents have limitations:
Single Agent                    Multi-Agent System
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
One context window              Distributed context
Jack of all trades              Specialized experts
Sequential processing           Parallel execution
One perspective                 Multiple viewpoints
Limited tool access             Tool specialization

Pattern 1: ReAct (Reason + Act Loop)

The foundational pattern for autonomous agents:
┌─────────────────────────────────────────────────────┐
│                    ReAct Loop                       │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐         │
│  │ Thought │───▶│ Action  │───▶│ Observe │──┐      │
│  └─────────┘    └─────────┘    └─────────┘  │      │
│       ▲                                     │      │
│       └─────────────────────────────────────┘      │
│                                                     │
│  Repeat until task complete or max iterations      │
└─────────────────────────────────────────────────────┘

Implementation

from openai import OpenAI
from typing import Callable, Any
import json

client = OpenAI()

class ReActAgent:
    """Agent using Reason + Act pattern"""
    
    def __init__(
        self,
        tools: dict[str, Callable],
        model: str = "gpt-4o",
        max_iterations: int = 10
    ):
        self.tools = tools
        self.model = model
        self.max_iterations = max_iterations
    
    def _build_system_prompt(self) -> str:
        tool_descriptions = "\n".join([
            f"- {name}: {func.__doc__}"
            for name, func in self.tools.items()
        ])
        
        return f"""You are a ReAct agent. For each step:
1. THOUGHT: Reason about what to do next
2. ACTION: Choose a tool and inputs
3. OBSERVATION: I'll provide the tool result

Available tools:
{tool_descriptions}

Format your response as:
THOUGHT: <your reasoning>
ACTION: <tool_name>
INPUT: <json input for tool>

When you have the final answer, respond:
THOUGHT: I have the answer
FINAL ANSWER: <your answer>"""
    
    def run(self, query: str) -> dict:
        """Run the ReAct loop"""
        messages = [
            {"role": "system", "content": self._build_system_prompt()},
            {"role": "user", "content": query}
        ]
        
        trajectory = []
        
        for i in range(self.max_iterations):
            response = client.chat.completions.create(
                model=self.model,
                messages=messages
            )
            
            content = response.choices[0].message.content
            messages.append({"role": "assistant", "content": content})
            
            # Parse the response
            step = self._parse_step(content)
            trajectory.append(step)
            
            # Check for final answer
            if step.get("final_answer"):
                return {
                    "answer": step["final_answer"],
                    "trajectory": trajectory,
                    "iterations": i + 1
                }
            
            # Execute action
            if step.get("action"):
                observation = self._execute_action(
                    step["action"],
                    step.get("input", {})
                )
                
                step["observation"] = observation
                messages.append({
                    "role": "user",
                    "content": f"OBSERVATION: {observation}"
                })
        
        return {
            "answer": "Max iterations reached",
            "trajectory": trajectory,
            "iterations": self.max_iterations
        }
    
    def _parse_step(self, content: str) -> dict:
        """Parse agent response into structured step"""
        step = {"raw": content}
        
        lines = content.strip().split("\n")
        for line in lines:
            if line.startswith("THOUGHT:"):
                step["thought"] = line[8:].strip()
            elif line.startswith("ACTION:"):
                step["action"] = line[7:].strip()
            elif line.startswith("INPUT:"):
                try:
                    step["input"] = json.loads(line[6:].strip())
                except:
                    step["input"] = line[6:].strip()
            elif line.startswith("FINAL ANSWER:"):
                step["final_answer"] = line[13:].strip()
        
        return step
    
    def _execute_action(self, action: str, input_data: Any) -> str:
        """Execute a tool action"""
        if action not in self.tools:
            return f"Error: Unknown tool '{action}'"
        
        try:
            if isinstance(input_data, dict):
                result = self.tools[action](**input_data)
            else:
                result = self.tools[action](input_data)
            return str(result)
        except Exception as e:
            return f"Error executing {action}: {str(e)}"

# Example tools
def search_web(query: str) -> str:
    """Search the web for information"""
    # Simulated search
    return f"Search results for '{query}': Found 5 relevant articles..."

def calculate(expression: str) -> str:
    """Evaluate a mathematical expression"""
    return str(eval(expression))

def get_weather(city: str) -> str:
    """Get current weather for a city"""
    return f"Weather in {city}: 72°F, Sunny"

# Usage
agent = ReActAgent(
    tools={
        "search_web": search_web,
        "calculate": calculate,
        "get_weather": get_weather
    }
)

result = agent.run("What's 15% of $250, and is it good weather for shopping in NYC?")
print(result["answer"])

Pattern 2: Hierarchical Task Decomposition

Break complex tasks into subtasks with specialized agents:
┌─────────────────────────────────────────────────────────────┐
│                    Orchestrator Agent                        │
│         (Plans, delegates, synthesizes results)              │
└───────────────┬─────────────────┬─────────────────┬─────────┘
                │                 │                 │
        ┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐
        │   Research    │ │   Analysis    │ │   Writing     │
        │    Agent      │ │    Agent      │ │    Agent      │
        └───────────────┘ └───────────────┘ └───────────────┘

Implementation

from dataclasses import dataclass
from typing import Optional
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    description: str
    agent_type: str
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[str] = None
    dependencies: list[str] = None
    
    def __post_init__(self):
        if self.dependencies is None:
            self.dependencies = []

class SpecializedAgent:
    """Base class for specialized agents"""
    
    def __init__(self, name: str, expertise: str, model: str = "gpt-4o"):
        self.name = name
        self.expertise = expertise
        self.model = model
    
    def execute(self, task: Task, context: dict) -> str:
        """Execute a task"""
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": f"You are a {self.expertise} specialist. {self._get_instructions()}"
                },
                {
                    "role": "user",
                    "content": f"Task: {task.description}\n\nContext: {json.dumps(context)}"
                }
            ]
        )
        return response.choices[0].message.content
    
    def _get_instructions(self) -> str:
        return "Complete the task thoroughly and accurately."

class ResearchAgent(SpecializedAgent):
    def __init__(self):
        super().__init__("Researcher", "research and information gathering")
    
    def _get_instructions(self) -> str:
        return "Find accurate, relevant information. Cite sources when possible."

class AnalysisAgent(SpecializedAgent):
    def __init__(self):
        super().__init__("Analyst", "data analysis and insights")
    
    def _get_instructions(self) -> str:
        return "Analyze data thoroughly. Provide clear insights and recommendations."

class WritingAgent(SpecializedAgent):
    def __init__(self):
        super().__init__("Writer", "content writing and communication")
    
    def _get_instructions(self) -> str:
        return "Write clear, engaging content. Match the tone to the audience."

class OrchestratorAgent:
    """Coordinates multiple specialized agents"""
    
    def __init__(self):
        self.agents = {
            "research": ResearchAgent(),
            "analysis": AnalysisAgent(),
            "writing": WritingAgent()
        }
        self.tasks: dict[str, Task] = {}
    
    def decompose_task(self, objective: str) -> list[Task]:
        """Break down objective into subtasks"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": """You are a task planning expert. Break down the objective into subtasks.
                    
Available agent types:
- research: For gathering information
- analysis: For analyzing data and finding insights
- writing: For creating written content

Return JSON array:
[
    {"id": "1", "description": "...", "agent_type": "research", "dependencies": []},
    {"id": "2", "description": "...", "agent_type": "analysis", "dependencies": ["1"]}
]"""
                },
                {"role": "user", "content": objective}
            ],
            response_format={"type": "json_object"}
        )
        
        result = json.loads(response.choices[0].message.content)
        tasks = []
        
        for t in result.get("tasks", result.get("subtasks", [])):
            task = Task(
                id=t["id"],
                description=t["description"],
                agent_type=t["agent_type"],
                dependencies=t.get("dependencies", [])
            )
            tasks.append(task)
            self.tasks[task.id] = task
        
        return tasks
    
    def execute_plan(self, tasks: list[Task]) -> dict:
        """Execute tasks respecting dependencies"""
        results = {}
        
        while any(t.status != TaskStatus.COMPLETED for t in tasks):
            for task in tasks:
                if task.status != TaskStatus.PENDING:
                    continue
                
                # Check dependencies
                deps_complete = all(
                    self.tasks[dep].status == TaskStatus.COMPLETED
                    for dep in task.dependencies
                )
                
                if not deps_complete:
                    continue
                
                # Execute task
                task.status = TaskStatus.IN_PROGRESS
                
                context = {
                    dep: self.tasks[dep].result
                    for dep in task.dependencies
                }
                
                agent = self.agents[task.agent_type]
                task.result = agent.execute(task, context)
                task.status = TaskStatus.COMPLETED
                results[task.id] = task.result
        
        return results
    
    def synthesize_results(self, objective: str, results: dict) -> str:
        """Combine all results into final output"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Synthesize the results from multiple agents into a coherent final response."
                },
                {
                    "role": "user",
                    "content": f"Objective: {objective}\n\nResults:\n{json.dumps(results, indent=2)}"
                }
            ]
        )
        return response.choices[0].message.content
    
    def run(self, objective: str) -> str:
        """Complete end-to-end execution"""
        # Decompose
        tasks = self.decompose_task(objective)
        print(f"Created {len(tasks)} subtasks")
        
        # Execute
        results = self.execute_plan(tasks)
        
        # Synthesize
        return self.synthesize_results(objective, results)

# Usage
orchestrator = OrchestratorAgent()
result = orchestrator.run(
    "Create a market analysis report for electric vehicles in 2024"
)
print(result)

Pattern 3: Event-Driven Agents

Agents that respond to events and can run for extended periods:
┌─────────────────────────────────────────────────────────────┐
│                     Event Bus                               │
└──────────┬────────────────┬────────────────┬───────────────┘
           │                │                │
    ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐
    │   Monitor   │  │   Handler   │  │   Notifier  │
    │    Agent    │  │    Agent    │  │    Agent    │
    │ (emit)      │  │ (consume)   │  │ (consume)   │
    └─────────────┘  └─────────────┘  └─────────────┘

Implementation

import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Any
from collections import defaultdict

@dataclass
class Event:
    type: str
    data: dict
    timestamp: datetime = None
    source: str = "system"
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()

class EventBus:
    """Central event bus for agent communication"""
    
    def __init__(self):
        self.subscribers: dict[str, list[Callable]] = defaultdict(list)
        self.event_history: list[Event] = []
    
    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to an event type"""
        self.subscribers[event_type].append(handler)
    
    async def publish(self, event: Event):
        """Publish an event to all subscribers"""
        self.event_history.append(event)
        
        handlers = self.subscribers.get(event.type, [])
        handlers += self.subscribers.get("*", [])  # Wildcard subscribers
        
        tasks = [handler(event) for handler in handlers]
        await asyncio.gather(*tasks)

class EventDrivenAgent:
    """Base class for event-driven agents"""
    
    def __init__(self, name: str, event_bus: EventBus):
        self.name = name
        self.event_bus = event_bus
        self.running = False
    
    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to events"""
        self.event_bus.subscribe(event_type, handler)
    
    async def emit(self, event_type: str, data: dict):
        """Emit an event"""
        event = Event(type=event_type, data=data, source=self.name)
        await self.event_bus.publish(event)
    
    async def start(self):
        """Start the agent"""
        self.running = True
        await self.on_start()
    
    async def stop(self):
        """Stop the agent"""
        self.running = False
        await self.on_stop()
    
    async def on_start(self):
        """Override in subclass"""
        pass
    
    async def on_stop(self):
        """Override in subclass"""
        pass

class MonitorAgent(EventDrivenAgent):
    """Monitors for conditions and emits events"""
    
    def __init__(self, event_bus: EventBus, check_interval: float = 5.0):
        super().__init__("Monitor", event_bus)
        self.check_interval = check_interval
        self.conditions: list[dict] = []
    
    def add_condition(
        self,
        name: str,
        check: Callable[[], bool],
        event_type: str
    ):
        """Add a condition to monitor"""
        self.conditions.append({
            "name": name,
            "check": check,
            "event_type": event_type
        })
    
    async def on_start(self):
        """Start monitoring loop"""
        while self.running:
            for condition in self.conditions:
                try:
                    if condition["check"]():
                        await self.emit(
                            condition["event_type"],
                            {"condition": condition["name"]}
                        )
                except Exception as e:
                    await self.emit("error", {
                        "agent": self.name,
                        "error": str(e)
                    })
            
            await asyncio.sleep(self.check_interval)

class HandlerAgent(EventDrivenAgent):
    """Handles events with LLM-powered responses"""
    
    def __init__(self, event_bus: EventBus, event_types: list[str]):
        super().__init__("Handler", event_bus)
        
        for event_type in event_types:
            self.subscribe(event_type, self.handle_event)
    
    async def handle_event(self, event: Event):
        """Handle an incoming event"""
        # Use LLM to decide action
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": """You are an event handler agent. 
                    Analyze the event and decide what action to take.
                    Return JSON: {"action": "...", "reasoning": "..."}"""
                },
                {
                    "role": "user",
                    "content": f"Event: {event.type}\nData: {json.dumps(event.data)}"
                }
            ],
            response_format={"type": "json_object"}
        )
        
        decision = json.loads(response.choices[0].message.content)
        
        # Emit result event
        await self.emit("action_taken", {
            "original_event": event.type,
            "action": decision["action"],
            "reasoning": decision["reasoning"]
        })

class NotifierAgent(EventDrivenAgent):
    """Sends notifications based on events"""
    
    def __init__(self, event_bus: EventBus):
        super().__init__("Notifier", event_bus)
        self.subscribe("action_taken", self.notify)
    
    async def notify(self, event: Event):
        """Send notification"""
        print(f"🔔 Notification: {event.data['action']}")
        # In production: send email, Slack, etc.

# Usage
async def main():
    event_bus = EventBus()
    
    # Create agents
    monitor = MonitorAgent(event_bus, check_interval=1.0)
    handler = HandlerAgent(event_bus, ["alert", "warning"])
    notifier = NotifierAgent(event_bus)
    
    # Add monitoring conditions
    monitor.add_condition(
        name="High CPU",
        check=lambda: get_cpu_usage() > 80,
        event_type="alert"
    )
    
    # Start agents
    await asyncio.gather(
        monitor.start(),
        handler.start(),
        notifier.start()
    )

asyncio.run(main())

Pattern 4: Debate and Consensus

Multiple agents debate to reach better conclusions:
class DebateAgent:
    """Agent that participates in debates"""
    
    def __init__(self, name: str, perspective: str, model: str = "gpt-4o"):
        self.name = name
        self.perspective = perspective
        self.model = model
    
    def argue(self, topic: str, previous_arguments: list[dict]) -> str:
        """Make an argument considering previous points"""
        history = "\n".join([
            f"{a['agent']}: {a['argument']}"
            for a in previous_arguments
        ])
        
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": f"""You are {self.name}, arguing from the perspective: {self.perspective}
                    
Make a clear, logical argument. Address previous points if relevant.
Be constructive and aim for the best solution."""
                },
                {
                    "role": "user",
                    "content": f"Topic: {topic}\n\nPrevious arguments:\n{history or 'None yet'}"
                }
            ]
        )
        return response.choices[0].message.content

class JudgeAgent:
    """Synthesizes debate into consensus"""
    
    def __init__(self, model: str = "gpt-4o"):
        self.model = model
    
    def synthesize(self, topic: str, arguments: list[dict]) -> str:
        """Create consensus from all arguments"""
        all_arguments = "\n\n".join([
            f"**{a['agent']}** ({a['perspective']}):\n{a['argument']}"
            for a in arguments
        ])
        
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": """You are a neutral judge synthesizing a debate.
                    
Identify the strongest points from each perspective.
Create a balanced conclusion that incorporates the best ideas.
Note any unresolved disagreements."""
                },
                {
                    "role": "user",
                    "content": f"Topic: {topic}\n\nArguments:\n{all_arguments}"
                }
            ]
        )
        return response.choices[0].message.content

class DebateOrchestrator:
    """Orchestrates a multi-agent debate"""
    
    def __init__(self, rounds: int = 3):
        self.rounds = rounds
        self.agents: list[DebateAgent] = []
        self.judge = JudgeAgent()
    
    def add_agent(self, name: str, perspective: str):
        self.agents.append(DebateAgent(name, perspective))
    
    def run_debate(self, topic: str) -> dict:
        """Run the full debate"""
        all_arguments = []
        
        for round_num in range(self.rounds):
            print(f"\n=== Round {round_num + 1} ===")
            
            for agent in self.agents:
                argument = agent.argue(topic, all_arguments)
                
                all_arguments.append({
                    "agent": agent.name,
                    "perspective": agent.perspective,
                    "argument": argument,
                    "round": round_num + 1
                })
                
                print(f"\n{agent.name}: {argument[:200]}...")
        
        # Judge synthesizes
        conclusion = self.judge.synthesize(topic, all_arguments)
        
        return {
            "topic": topic,
            "rounds": self.rounds,
            "arguments": all_arguments,
            "conclusion": conclusion
        }

# Usage
debate = DebateOrchestrator(rounds=2)
debate.add_agent("Optimist", "Focus on opportunities and potential benefits")
debate.add_agent("Skeptic", "Identify risks and potential problems")
debate.add_agent("Pragmatist", "Focus on practical implementation")

result = debate.run_debate("Should we adopt AI agents for customer support?")
print(f"\nConclusion:\n{result['conclusion']}")

Pattern 5: Supervisor Pattern

A supervisor agent manages and monitors worker agents:
class WorkerAgent:
    """Agent that performs specific tasks under supervision"""
    
    def __init__(self, name: str, specialty: str):
        self.name = name
        self.specialty = specialty
        self.status = "idle"
        self.current_task = None
    
    async def work(self, task: str) -> dict:
        """Perform a task"""
        self.status = "working"
        self.current_task = task
        
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"You are a {self.specialty} specialist. Complete the task accurately."
                },
                {"role": "user", "content": task}
            ]
        )
        
        result = response.choices[0].message.content
        
        self.status = "idle"
        self.current_task = None
        
        return {"agent": self.name, "result": result}

class SupervisorAgent:
    """Supervises and coordinates worker agents"""
    
    def __init__(self):
        self.workers: dict[str, WorkerAgent] = {}
        self.task_queue: list[dict] = []
        self.completed_tasks: list[dict] = []
    
    def add_worker(self, worker: WorkerAgent):
        self.workers[worker.name] = worker
    
    def assign_task(self, task: str) -> str:
        """Use LLM to assign task to best worker"""
        worker_info = "\n".join([
            f"- {w.name}: {w.specialty} (status: {w.status})"
            for w in self.workers.values()
        ])
        
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": f"""You are a supervisor. Assign the task to the best available worker.
                    
Workers:
{worker_info}

Return just the worker name."""
                },
                {"role": "user", "content": f"Task: {task}"}
            ]
        )
        
        return response.choices[0].message.content.strip()
    
    async def supervise(self, objective: str) -> dict:
        """Supervise the completion of an objective"""
        # Break down objective
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Break this objective into specific tasks. Return JSON: {\"tasks\": [\"task1\", \"task2\"]}"
                },
                {"role": "user", "content": objective}
            ],
            response_format={"type": "json_object"}
        )
        
        tasks = json.loads(response.choices[0].message.content)["tasks"]
        
        # Assign and execute tasks
        results = []
        for task in tasks:
            worker_name = self.assign_task(task)
            worker = self.workers.get(worker_name)
            
            if worker and worker.status == "idle":
                result = await worker.work(task)
                results.append(result)
                self.completed_tasks.append({
                    "task": task,
                    "worker": worker_name,
                    "result": result
                })
        
        # Synthesize results
        synthesis = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": "Synthesize these results into a final response."
                },
                {
                    "role": "user",
                    "content": f"Objective: {objective}\n\nResults:\n{json.dumps(results, indent=2)}"
                }
            ]
        )
        
        return {
            "objective": objective,
            "tasks_completed": len(results),
            "synthesis": synthesis.choices[0].message.content
        }

# Usage
supervisor = SupervisorAgent()
supervisor.add_worker(WorkerAgent("DataBot", "data analysis"))
supervisor.add_worker(WorkerAgent("WriteBot", "content writing"))
supervisor.add_worker(WorkerAgent("CodeBot", "programming"))

result = await supervisor.supervise(
    "Analyze our sales data and create a summary report with code examples"
)

Key Takeaways

ReAct for Autonomy

Use Reason+Act loops for agents that need to work independently

Hierarchy for Complexity

Break complex tasks into specialized subtasks

Events for Scale

Event-driven patterns for long-running, distributed systems

Debate for Quality

Multiple perspectives improve decision quality

What’s Next

Multimodal AI

Learn to build AI systems that work with vision, audio, and real-time voice