December 2025 Update: Production patterns for multi-agent orchestration including ReAct, hierarchical decomposition, and event-driven architectures.
Why Multi-Agent Systems?
Single agents have limitations:Copy
Single Agent Multi-Agent System
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
One context window Distributed context
Jack of all trades Specialized experts
Sequential processing Parallel execution
One perspective Multiple viewpoints
Limited tool access Tool specialization
Pattern 1: ReAct (Reason + Act Loop)
The foundational pattern for autonomous agents:Copy
┌─────────────────────────────────────────────────────┐
│ ReAct Loop │
├─────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Thought │───▶│ Action │───▶│ Observe │──┐ │
│ └─────────┘ └─────────┘ └─────────┘ │ │
│ ▲ │ │
│ └─────────────────────────────────────┘ │
│ │
│ Repeat until task complete or max iterations │
└─────────────────────────────────────────────────────┘
Implementation
Copy
from openai import OpenAI
from typing import Callable, Any
import json
client = OpenAI()
class ReActAgent:
"""Agent using Reason + Act pattern"""
def __init__(
self,
tools: dict[str, Callable],
model: str = "gpt-4o",
max_iterations: int = 10
):
self.tools = tools
self.model = model
self.max_iterations = max_iterations
def _build_system_prompt(self) -> str:
tool_descriptions = "\n".join([
f"- {name}: {func.__doc__}"
for name, func in self.tools.items()
])
return f"""You are a ReAct agent. For each step:
1. THOUGHT: Reason about what to do next
2. ACTION: Choose a tool and inputs
3. OBSERVATION: I'll provide the tool result
Available tools:
{tool_descriptions}
Format your response as:
THOUGHT: <your reasoning>
ACTION: <tool_name>
INPUT: <json input for tool>
When you have the final answer, respond:
THOUGHT: I have the answer
FINAL ANSWER: <your answer>"""
def run(self, query: str) -> dict:
"""Run the ReAct loop"""
messages = [
{"role": "system", "content": self._build_system_prompt()},
{"role": "user", "content": query}
]
trajectory = []
for i in range(self.max_iterations):
response = client.chat.completions.create(
model=self.model,
messages=messages
)
content = response.choices[0].message.content
messages.append({"role": "assistant", "content": content})
# Parse the response
step = self._parse_step(content)
trajectory.append(step)
# Check for final answer
if step.get("final_answer"):
return {
"answer": step["final_answer"],
"trajectory": trajectory,
"iterations": i + 1
}
# Execute action
if step.get("action"):
observation = self._execute_action(
step["action"],
step.get("input", {})
)
step["observation"] = observation
messages.append({
"role": "user",
"content": f"OBSERVATION: {observation}"
})
return {
"answer": "Max iterations reached",
"trajectory": trajectory,
"iterations": self.max_iterations
}
def _parse_step(self, content: str) -> dict:
"""Parse agent response into structured step"""
step = {"raw": content}
lines = content.strip().split("\n")
for line in lines:
if line.startswith("THOUGHT:"):
step["thought"] = line[8:].strip()
elif line.startswith("ACTION:"):
step["action"] = line[7:].strip()
elif line.startswith("INPUT:"):
try:
step["input"] = json.loads(line[6:].strip())
except:
step["input"] = line[6:].strip()
elif line.startswith("FINAL ANSWER:"):
step["final_answer"] = line[13:].strip()
return step
def _execute_action(self, action: str, input_data: Any) -> str:
"""Execute a tool action"""
if action not in self.tools:
return f"Error: Unknown tool '{action}'"
try:
if isinstance(input_data, dict):
result = self.tools[action](**input_data)
else:
result = self.tools[action](input_data)
return str(result)
except Exception as e:
return f"Error executing {action}: {str(e)}"
# Example tools
def search_web(query: str) -> str:
"""Search the web for information"""
# Simulated search
return f"Search results for '{query}': Found 5 relevant articles..."
def calculate(expression: str) -> str:
"""Evaluate a mathematical expression"""
return str(eval(expression))
def get_weather(city: str) -> str:
"""Get current weather for a city"""
return f"Weather in {city}: 72°F, Sunny"
# Usage
agent = ReActAgent(
tools={
"search_web": search_web,
"calculate": calculate,
"get_weather": get_weather
}
)
result = agent.run("What's 15% of $250, and is it good weather for shopping in NYC?")
print(result["answer"])
Pattern 2: Hierarchical Task Decomposition
Break complex tasks into subtasks with specialized agents:Copy
┌─────────────────────────────────────────────────────────────┐
│ Orchestrator Agent │
│ (Plans, delegates, synthesizes results) │
└───────────────┬─────────────────┬─────────────────┬─────────┘
│ │ │
┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐
│ Research │ │ Analysis │ │ Writing │
│ Agent │ │ Agent │ │ Agent │
└───────────────┘ └───────────────┘ └───────────────┘
Implementation
Copy
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
id: str
description: str
agent_type: str
status: TaskStatus = TaskStatus.PENDING
result: Optional[str] = None
dependencies: list[str] = None
def __post_init__(self):
if self.dependencies is None:
self.dependencies = []
class SpecializedAgent:
"""Base class for specialized agents"""
def __init__(self, name: str, expertise: str, model: str = "gpt-4o"):
self.name = name
self.expertise = expertise
self.model = model
def execute(self, task: Task, context: dict) -> str:
"""Execute a task"""
response = client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": f"You are a {self.expertise} specialist. {self._get_instructions()}"
},
{
"role": "user",
"content": f"Task: {task.description}\n\nContext: {json.dumps(context)}"
}
]
)
return response.choices[0].message.content
def _get_instructions(self) -> str:
return "Complete the task thoroughly and accurately."
class ResearchAgent(SpecializedAgent):
def __init__(self):
super().__init__("Researcher", "research and information gathering")
def _get_instructions(self) -> str:
return "Find accurate, relevant information. Cite sources when possible."
class AnalysisAgent(SpecializedAgent):
def __init__(self):
super().__init__("Analyst", "data analysis and insights")
def _get_instructions(self) -> str:
return "Analyze data thoroughly. Provide clear insights and recommendations."
class WritingAgent(SpecializedAgent):
def __init__(self):
super().__init__("Writer", "content writing and communication")
def _get_instructions(self) -> str:
return "Write clear, engaging content. Match the tone to the audience."
class OrchestratorAgent:
"""Coordinates multiple specialized agents"""
def __init__(self):
self.agents = {
"research": ResearchAgent(),
"analysis": AnalysisAgent(),
"writing": WritingAgent()
}
self.tasks: dict[str, Task] = {}
def decompose_task(self, objective: str) -> list[Task]:
"""Break down objective into subtasks"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": """You are a task planning expert. Break down the objective into subtasks.
Available agent types:
- research: For gathering information
- analysis: For analyzing data and finding insights
- writing: For creating written content
Return JSON array:
[
{"id": "1", "description": "...", "agent_type": "research", "dependencies": []},
{"id": "2", "description": "...", "agent_type": "analysis", "dependencies": ["1"]}
]"""
},
{"role": "user", "content": objective}
],
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
tasks = []
for t in result.get("tasks", result.get("subtasks", [])):
task = Task(
id=t["id"],
description=t["description"],
agent_type=t["agent_type"],
dependencies=t.get("dependencies", [])
)
tasks.append(task)
self.tasks[task.id] = task
return tasks
def execute_plan(self, tasks: list[Task]) -> dict:
"""Execute tasks respecting dependencies"""
results = {}
while any(t.status != TaskStatus.COMPLETED for t in tasks):
for task in tasks:
if task.status != TaskStatus.PENDING:
continue
# Check dependencies
deps_complete = all(
self.tasks[dep].status == TaskStatus.COMPLETED
for dep in task.dependencies
)
if not deps_complete:
continue
# Execute task
task.status = TaskStatus.IN_PROGRESS
context = {
dep: self.tasks[dep].result
for dep in task.dependencies
}
agent = self.agents[task.agent_type]
task.result = agent.execute(task, context)
task.status = TaskStatus.COMPLETED
results[task.id] = task.result
return results
def synthesize_results(self, objective: str, results: dict) -> str:
"""Combine all results into final output"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "Synthesize the results from multiple agents into a coherent final response."
},
{
"role": "user",
"content": f"Objective: {objective}\n\nResults:\n{json.dumps(results, indent=2)}"
}
]
)
return response.choices[0].message.content
def run(self, objective: str) -> str:
"""Complete end-to-end execution"""
# Decompose
tasks = self.decompose_task(objective)
print(f"Created {len(tasks)} subtasks")
# Execute
results = self.execute_plan(tasks)
# Synthesize
return self.synthesize_results(objective, results)
# Usage
orchestrator = OrchestratorAgent()
result = orchestrator.run(
"Create a market analysis report for electric vehicles in 2024"
)
print(result)
Pattern 3: Event-Driven Agents
Agents that respond to events and can run for extended periods:Copy
┌─────────────────────────────────────────────────────────────┐
│ Event Bus │
└──────────┬────────────────┬────────────────┬───────────────┘
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Monitor │ │ Handler │ │ Notifier │
│ Agent │ │ Agent │ │ Agent │
│ (emit) │ │ (consume) │ │ (consume) │
└─────────────┘ └─────────────┘ └─────────────┘
Implementation
Copy
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Any
from collections import defaultdict
@dataclass
class Event:
type: str
data: dict
timestamp: datetime = None
source: str = "system"
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
class EventBus:
"""Central event bus for agent communication"""
def __init__(self):
self.subscribers: dict[str, list[Callable]] = defaultdict(list)
self.event_history: list[Event] = []
def subscribe(self, event_type: str, handler: Callable):
"""Subscribe to an event type"""
self.subscribers[event_type].append(handler)
async def publish(self, event: Event):
"""Publish an event to all subscribers"""
self.event_history.append(event)
handlers = self.subscribers.get(event.type, [])
handlers += self.subscribers.get("*", []) # Wildcard subscribers
tasks = [handler(event) for handler in handlers]
await asyncio.gather(*tasks)
class EventDrivenAgent:
"""Base class for event-driven agents"""
def __init__(self, name: str, event_bus: EventBus):
self.name = name
self.event_bus = event_bus
self.running = False
def subscribe(self, event_type: str, handler: Callable):
"""Subscribe to events"""
self.event_bus.subscribe(event_type, handler)
async def emit(self, event_type: str, data: dict):
"""Emit an event"""
event = Event(type=event_type, data=data, source=self.name)
await self.event_bus.publish(event)
async def start(self):
"""Start the agent"""
self.running = True
await self.on_start()
async def stop(self):
"""Stop the agent"""
self.running = False
await self.on_stop()
async def on_start(self):
"""Override in subclass"""
pass
async def on_stop(self):
"""Override in subclass"""
pass
class MonitorAgent(EventDrivenAgent):
"""Monitors for conditions and emits events"""
def __init__(self, event_bus: EventBus, check_interval: float = 5.0):
super().__init__("Monitor", event_bus)
self.check_interval = check_interval
self.conditions: list[dict] = []
def add_condition(
self,
name: str,
check: Callable[[], bool],
event_type: str
):
"""Add a condition to monitor"""
self.conditions.append({
"name": name,
"check": check,
"event_type": event_type
})
async def on_start(self):
"""Start monitoring loop"""
while self.running:
for condition in self.conditions:
try:
if condition["check"]():
await self.emit(
condition["event_type"],
{"condition": condition["name"]}
)
except Exception as e:
await self.emit("error", {
"agent": self.name,
"error": str(e)
})
await asyncio.sleep(self.check_interval)
class HandlerAgent(EventDrivenAgent):
"""Handles events with LLM-powered responses"""
def __init__(self, event_bus: EventBus, event_types: list[str]):
super().__init__("Handler", event_bus)
for event_type in event_types:
self.subscribe(event_type, self.handle_event)
async def handle_event(self, event: Event):
"""Handle an incoming event"""
# Use LLM to decide action
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": """You are an event handler agent.
Analyze the event and decide what action to take.
Return JSON: {"action": "...", "reasoning": "..."}"""
},
{
"role": "user",
"content": f"Event: {event.type}\nData: {json.dumps(event.data)}"
}
],
response_format={"type": "json_object"}
)
decision = json.loads(response.choices[0].message.content)
# Emit result event
await self.emit("action_taken", {
"original_event": event.type,
"action": decision["action"],
"reasoning": decision["reasoning"]
})
class NotifierAgent(EventDrivenAgent):
"""Sends notifications based on events"""
def __init__(self, event_bus: EventBus):
super().__init__("Notifier", event_bus)
self.subscribe("action_taken", self.notify)
async def notify(self, event: Event):
"""Send notification"""
print(f"🔔 Notification: {event.data['action']}")
# In production: send email, Slack, etc.
# Usage
async def main():
event_bus = EventBus()
# Create agents
monitor = MonitorAgent(event_bus, check_interval=1.0)
handler = HandlerAgent(event_bus, ["alert", "warning"])
notifier = NotifierAgent(event_bus)
# Add monitoring conditions
monitor.add_condition(
name="High CPU",
check=lambda: get_cpu_usage() > 80,
event_type="alert"
)
# Start agents
await asyncio.gather(
monitor.start(),
handler.start(),
notifier.start()
)
asyncio.run(main())
Pattern 4: Debate and Consensus
Multiple agents debate to reach better conclusions:Copy
class DebateAgent:
"""Agent that participates in debates"""
def __init__(self, name: str, perspective: str, model: str = "gpt-4o"):
self.name = name
self.perspective = perspective
self.model = model
def argue(self, topic: str, previous_arguments: list[dict]) -> str:
"""Make an argument considering previous points"""
history = "\n".join([
f"{a['agent']}: {a['argument']}"
for a in previous_arguments
])
response = client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": f"""You are {self.name}, arguing from the perspective: {self.perspective}
Make a clear, logical argument. Address previous points if relevant.
Be constructive and aim for the best solution."""
},
{
"role": "user",
"content": f"Topic: {topic}\n\nPrevious arguments:\n{history or 'None yet'}"
}
]
)
return response.choices[0].message.content
class JudgeAgent:
"""Synthesizes debate into consensus"""
def __init__(self, model: str = "gpt-4o"):
self.model = model
def synthesize(self, topic: str, arguments: list[dict]) -> str:
"""Create consensus from all arguments"""
all_arguments = "\n\n".join([
f"**{a['agent']}** ({a['perspective']}):\n{a['argument']}"
for a in arguments
])
response = client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": """You are a neutral judge synthesizing a debate.
Identify the strongest points from each perspective.
Create a balanced conclusion that incorporates the best ideas.
Note any unresolved disagreements."""
},
{
"role": "user",
"content": f"Topic: {topic}\n\nArguments:\n{all_arguments}"
}
]
)
return response.choices[0].message.content
class DebateOrchestrator:
"""Orchestrates a multi-agent debate"""
def __init__(self, rounds: int = 3):
self.rounds = rounds
self.agents: list[DebateAgent] = []
self.judge = JudgeAgent()
def add_agent(self, name: str, perspective: str):
self.agents.append(DebateAgent(name, perspective))
def run_debate(self, topic: str) -> dict:
"""Run the full debate"""
all_arguments = []
for round_num in range(self.rounds):
print(f"\n=== Round {round_num + 1} ===")
for agent in self.agents:
argument = agent.argue(topic, all_arguments)
all_arguments.append({
"agent": agent.name,
"perspective": agent.perspective,
"argument": argument,
"round": round_num + 1
})
print(f"\n{agent.name}: {argument[:200]}...")
# Judge synthesizes
conclusion = self.judge.synthesize(topic, all_arguments)
return {
"topic": topic,
"rounds": self.rounds,
"arguments": all_arguments,
"conclusion": conclusion
}
# Usage
debate = DebateOrchestrator(rounds=2)
debate.add_agent("Optimist", "Focus on opportunities and potential benefits")
debate.add_agent("Skeptic", "Identify risks and potential problems")
debate.add_agent("Pragmatist", "Focus on practical implementation")
result = debate.run_debate("Should we adopt AI agents for customer support?")
print(f"\nConclusion:\n{result['conclusion']}")
Pattern 5: Supervisor Pattern
A supervisor agent manages and monitors worker agents:Copy
class WorkerAgent:
"""Agent that performs specific tasks under supervision"""
def __init__(self, name: str, specialty: str):
self.name = name
self.specialty = specialty
self.status = "idle"
self.current_task = None
async def work(self, task: str) -> dict:
"""Perform a task"""
self.status = "working"
self.current_task = task
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": f"You are a {self.specialty} specialist. Complete the task accurately."
},
{"role": "user", "content": task}
]
)
result = response.choices[0].message.content
self.status = "idle"
self.current_task = None
return {"agent": self.name, "result": result}
class SupervisorAgent:
"""Supervises and coordinates worker agents"""
def __init__(self):
self.workers: dict[str, WorkerAgent] = {}
self.task_queue: list[dict] = []
self.completed_tasks: list[dict] = []
def add_worker(self, worker: WorkerAgent):
self.workers[worker.name] = worker
def assign_task(self, task: str) -> str:
"""Use LLM to assign task to best worker"""
worker_info = "\n".join([
f"- {w.name}: {w.specialty} (status: {w.status})"
for w in self.workers.values()
])
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": f"""You are a supervisor. Assign the task to the best available worker.
Workers:
{worker_info}
Return just the worker name."""
},
{"role": "user", "content": f"Task: {task}"}
]
)
return response.choices[0].message.content.strip()
async def supervise(self, objective: str) -> dict:
"""Supervise the completion of an objective"""
# Break down objective
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "Break this objective into specific tasks. Return JSON: {\"tasks\": [\"task1\", \"task2\"]}"
},
{"role": "user", "content": objective}
],
response_format={"type": "json_object"}
)
tasks = json.loads(response.choices[0].message.content)["tasks"]
# Assign and execute tasks
results = []
for task in tasks:
worker_name = self.assign_task(task)
worker = self.workers.get(worker_name)
if worker and worker.status == "idle":
result = await worker.work(task)
results.append(result)
self.completed_tasks.append({
"task": task,
"worker": worker_name,
"result": result
})
# Synthesize results
synthesis = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "Synthesize these results into a final response."
},
{
"role": "user",
"content": f"Objective: {objective}\n\nResults:\n{json.dumps(results, indent=2)}"
}
]
)
return {
"objective": objective,
"tasks_completed": len(results),
"synthesis": synthesis.choices[0].message.content
}
# Usage
supervisor = SupervisorAgent()
supervisor.add_worker(WorkerAgent("DataBot", "data analysis"))
supervisor.add_worker(WorkerAgent("WriteBot", "content writing"))
supervisor.add_worker(WorkerAgent("CodeBot", "programming"))
result = await supervisor.supervise(
"Analyze our sales data and create a summary report with code examples"
)
Key Takeaways
ReAct for Autonomy
Use Reason+Act loops for agents that need to work independently
Hierarchy for Complexity
Break complex tasks into specialized subtasks
Events for Scale
Event-driven patterns for long-running, distributed systems
Debate for Quality
Multiple perspectives improve decision quality
What’s Next
Multimodal AI
Learn to build AI systems that work with vision, audio, and real-time voice