December 2025 Update: Now includes patterns from OpenAI's Swarm, Anthropic's multi-agent research, and production examples from leading AI companies.
What is Agentic Architecture?
Agentic Architecture refers to design patterns for building AI systems where multiple specialized agents collaborate to solve complex problems. Instead of one monolithic agent, you have:
Specialized agents with focused capabilities
Orchestration to coordinate agents
Communication protocols between agents
Shared memory/state for collaboration
Key Insight: Complex tasks are often better handled by multiple specialized agents than by one generalist agent trying to do everything. Anthropic, for example, has described its research feature as a multi-agent system built on exactly this idea.
Why Multi-Agent?
| Single Agent | Multi-Agent |
|---|---|
| Context overload | Focused contexts |
| One failure = total failure | Graceful degradation |
| Hard to debug | Modular testing |
| Prompt grows infinitely | Distributed prompts |
| Jack of all trades | Experts collaborate |
Real-World Multi-Agent Examples
| System | Agents | Pattern |
|---|---|---|
| Devin (Cognition) | Planner, Coder, Debugger, Browser | Hierarchical |
| AutoGPT | Task Decomposer, Executor, Critic | Reflection |
| ChatGPT Plugins | Router + Specialist tools | Supervisor |
| Perplexity | Search, Synthesize, Cite | Pipeline |
Architecture Patterns
1. Supervisor Pattern
One agent orchestrates specialized worker agents.
from typing import TypedDict, Literal

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

class SupervisorState(TypedDict):
    task: str
    next_agent: str
    results: dict[str, str]
    final_answer: str

llm = ChatOpenAI(model="gpt-4o")

# Supervisor decides which agent to call next
def supervisor(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a supervisor managing specialized agents.
    Task: {state['task']}

    Available agents:
    - researcher: Find information and data
    - analyst: Analyze data and draw conclusions
    - writer: Write final reports

    Results so far: {state['results']}

    Which agent should work next? Or is the task complete?
    Respond with just the agent name or 'FINISH'.
    """)
    next_agent = response.content.strip().lower()
    return {"next_agent": next_agent}

def researcher(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a research specialist.
    Task: {state['task']}
    Find relevant information and data.
    """)
    results = state["results"].copy()
    results["research"] = response.content
    return {"results": results}

def analyst(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a data analyst.
    Task: {state['task']}
    Research: {state['results'].get('research', '')}
    Analyze the data and provide insights.
    """)
    results = state["results"].copy()
    results["analysis"] = response.content
    return {"results": results}

def writer(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a technical writer.
    Task: {state['task']}
    Research: {state['results'].get('research', '')}
    Analysis: {state['results'].get('analysis', '')}
    Write a comprehensive final report.
    """)
    return {"final_answer": response.content}

def route(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "end"]:
    next_agent = state["next_agent"]
    # 'FINISH' (lowercased above) or any unexpected reply ends the run
    if next_agent not in ("researcher", "analyst", "writer"):
        return "end"
    return next_agent

# Build graph
workflow = StateGraph(SupervisorState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("researcher", researcher)
workflow.add_node("analyst", analyst)
workflow.add_node("writer", writer)
workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route, {
    "researcher": "researcher",
    "analyst": "analyst",
    "writer": "writer",
    "end": END,
})
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("analyst", "supervisor")
# The writer produces the final answer, so the graph ends there
workflow.add_edge("writer", END)
app = workflow.compile()
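A minimal invocation sketch (the task string is illustrative; the nodes read every key of SupervisorState, so supply them all in the initial state):

result = app.invoke({
    "task": "Report on current multi-agent framework adoption",  # illustrative task
    "next_agent": "",
    "results": {},
    "final_answer": "",
})
print(result["final_answer"])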
2. Debate Pattern
Agents argue different perspectives to reach better conclusions.
class DebateState(TypedDict):
    topic: str
    round: int
    pro_arguments: list[str]
    con_arguments: list[str]
    judge_verdict: str

def pro_agent(state: DebateState) -> dict:
    """Argues in favor"""
    context = "\n".join(state["con_arguments"][-2:]) if state["con_arguments"] else ""
    response = llm.invoke(f"""
    Topic: {state['topic']}
    Round: {state['round']}
    You argue IN FAVOR. Counter these points if any:
    {context}
    Make your best argument.
    """)
    return {"pro_arguments": state["pro_arguments"] + [response.content]}

def con_agent(state: DebateState) -> dict:
    """Argues against"""
    context = "\n".join(state["pro_arguments"][-2:]) if state["pro_arguments"] else ""
    response = llm.invoke(f"""
    Topic: {state['topic']}
    Round: {state['round']}
    You argue AGAINST. Counter these points if any:
    {context}
    Make your best argument.
    """)
    return {"con_arguments": state["con_arguments"] + [response.content]}

def judge(state: DebateState) -> dict:
    """Evaluates and synthesizes"""
    pro = "\n".join(state["pro_arguments"])
    con = "\n".join(state["con_arguments"])
    response = llm.invoke(f"""
    Topic: {state['topic']}
    Pro arguments:
    {pro}
    Con arguments:
    {con}
    Provide a balanced verdict considering both sides.
    """)
    return {"judge_verdict": response.content}

def should_continue(state: DebateState) -> Literal["continue", "judge"]:
    if state["round"] >= 3:
        return "judge"
    return "continue"

def increment_round(state: DebateState) -> dict:
    return {"round": state["round"] + 1}
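To close the loop, the nodes can be wired so pro and con alternate until should_continue hands off to the judge. A minimal wiring sketch, assuming the debate starts at round 1 (the node names are my own):

debate = StateGraph(DebateState)
debate.add_node("pro", pro_agent)
debate.add_node("con", con_agent)
debate.add_node("next_round", increment_round)
debate.add_node("judge", judge)
debate.set_entry_point("pro")
debate.add_edge("pro", "con")
debate.add_conditional_edges("con", should_continue, {
    "continue": "next_round",  # another round: back to the pro side
    "judge": "judge",          # round limit reached: hand off to the judge
})
debate.add_edge("next_round", "pro")
debate.add_edge("judge", END)
debate_app = debate.compile()

result = debate_app.invoke({
    "topic": "Should we adopt a multi-agent design?",  # illustrative topic
    "round": 1,
    "pro_arguments": [],
    "con_arguments": [],
    "judge_verdict": "",
})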
3. Pipeline Pattern
Sequential processing through specialized agents.
import json

class PipelineState(TypedDict):
    input_data: str
    extracted: dict
    validated: dict
    enriched: dict
    output: str

def extractor(state: PipelineState) -> dict:
    """Extract structured data from input"""
    response = llm.invoke(f"""
    Extract key entities from this text as JSON:
    {state['input_data']}
    """)
    # Assumes the model returns bare JSON with no surrounding prose
    return {"extracted": json.loads(response.content)}

def validator(state: PipelineState) -> dict:
    """Validate extracted data"""
    response = llm.invoke(f"""
    Validate this data. Fix any errors:
    {state['extracted']}
    Return corrected JSON.
    """)
    return {"validated": json.loads(response.content)}

def enricher(state: PipelineState) -> dict:
    """Add additional information"""
    response = llm.invoke(f"""
    Enrich this data with additional context:
    {state['validated']}
    Return enriched JSON.
    """)
    return {"enriched": json.loads(response.content)}

def formatter(state: PipelineState) -> dict:
    """Format final output"""
    response = llm.invoke(f"""
    Format this data as a readable report:
    {state['enriched']}
    """)
    return {"output": response.content}

# Linear pipeline
workflow = StateGraph(PipelineState)
workflow.add_node("extractor", extractor)
workflow.add_node("validator", validator)
workflow.add_node("enricher", enricher)
workflow.add_node("formatter", formatter)
workflow.set_entry_point("extractor")
workflow.add_edge("extractor", "validator")
workflow.add_edge("validator", "enricher")
workflow.add_edge("enricher", "formatter")
workflow.add_edge("formatter", END)
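Compiling and running it, with an illustrative input. The json.loads steps are the fragile part of this sketch; in production, constraining the model's output (for example with ChatOpenAI's with_structured_output) is more reliable:

pipeline = workflow.compile()
result = pipeline.invoke({
    "input_data": "Acme Corp raised $12M from Example Ventures in March.",  # illustrative
    "extracted": {},
    "validated": {},
    "enriched": {},
    "output": "",
})
print(result["output"])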
4. Swarm Pattern
Dynamic team of agents that can spawn/dismiss members.
class SwarmState(TypedDict):
    task: str
    active_agents: list[str]
    agent_outputs: dict[str, str]
    coordination_notes: str
    final_output: str

def coordinator(state: SwarmState) -> dict:
    """Manage the swarm of agents"""
    response = llm.invoke(f"""
    Task: {state['task']}
    Current agents: {state['active_agents']}
    Their outputs: {state['agent_outputs']}

    Decide:
    1. Which new agents to spawn (if any)
    2. Which agents to dismiss (if any)
    3. Coordination instructions

    Available agent types: researcher, coder, reviewer, writer
    Return JSON: {{"spawn": [], "dismiss": [], "instructions": ""}}
    """)
    decisions = json.loads(response.content)
    active = set(state["active_agents"])
    active.update(decisions["spawn"])
    active -= set(decisions["dismiss"])
    return {
        "active_agents": list(active),
        "coordination_notes": decisions["instructions"],
    }
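The coordinator only decides team composition; something still has to run the team each cycle. A minimal fan-out sketch, assuming one generic prompt per agent type (the run_agents node and its prompt are illustrative, not part of any library):

def run_agents(state: SwarmState) -> dict:
    """Hypothetical worker step: each active agent takes a turn on the task"""
    outputs = dict(state["agent_outputs"])
    for agent in state["active_agents"]:
        response = llm.invoke(f"""
        You are a {agent}.
        Task: {state['task']}
        Coordinator instructions: {state['coordination_notes']}
        Produce your contribution.
        """)
        outputs[agent] = response.content
    return {"agent_outputs": outputs}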
Memory Patterns
Shared Memory
from typing import TypedDict

class SharedMemory(TypedDict):
    facts: list[str]         # Confirmed facts
    hypotheses: list[str]    # Unconfirmed ideas
    decisions: list[str]     # Made decisions
    context: dict[str, str]  # Key-value context

class AgentState(TypedDict):
    task: str
    memory: SharedMemory
    current_agent: str
    output: str

def update_memory(state: AgentState, new_facts: list[str] | None = None,
                  new_hypotheses: list[str] | None = None) -> SharedMemory:
    """Helper to update shared memory without mutating the previous state"""
    memory = state["memory"].copy()
    # dict.copy() is shallow, so rebuild the lists rather than extending
    # them in place; otherwise the old state's lists get mutated too
    if new_facts:
        memory["facts"] = memory["facts"] + new_facts
    if new_hypotheses:
        memory["hypotheses"] = memory["hypotheses"] + new_hypotheses
    return memory
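A sketch of an agent node using the helper (the prompt and the decision to store the whole reply as one fact are illustrative):

def research_agent(state: AgentState) -> dict:
    response = llm.invoke(f"Research this task: {state['task']}")
    # Illustrative: record the whole reply as a single new fact
    return {
        "memory": update_memory(state, new_facts=[response.content]),
        "output": response.content,
    }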
Long-Term Memory with Vector DB
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

class AgentMemory:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(
            collection_name=f"agent_{agent_id}",
            embedding_function=self.embeddings,
        )

    def remember(self, content: str, metadata: dict | None = None):
        """Store in long-term memory"""
        self.vectorstore.add_texts(
            texts=[content],
            metadatas=[{"agent_id": self.agent_id, **(metadata or {})}],
        )

    def recall(self, query: str, k: int = 5) -> list[str]:
        """Retrieve relevant memories"""
        results = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in results]
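Usage is two calls (the strings are illustrative; note that a Chroma collection created like this is ephemeral unless you pass a persist directory):

memory = AgentMemory("researcher")
memory.remember("Client prefers weekly status reports",
                metadata={"source": "kickoff call"})  # illustrative fact
relevant = memory.recall("How often should we send reports?", k=3)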
Communication Patterns
Message Bus
from collections import defaultdict
from typing import Callable

class MessageBus:
    def __init__(self):
        self.subscribers: dict[str, list[Callable]] = defaultdict(list)
        self.messages: list[dict] = []

    def subscribe(self, topic: str, handler: Callable):
        self.subscribers[topic].append(handler)

    def publish(self, topic: str, message: dict, sender: str):
        msg = {"topic": topic, "message": message, "sender": sender}
        self.messages.append(msg)
        for handler in self.subscribers[topic]:
            handler(msg)

    def get_history(self, topic: str | None = None) -> list[dict]:
        if topic:
            return [m for m in self.messages if m["topic"] == topic]
        return self.messages

# Usage
bus = MessageBus()

def researcher_handler(msg):
    print(f"Researcher received: {msg}")

bus.subscribe("research_request", researcher_handler)
bus.publish("research_request", {"query": "AI trends"}, sender="supervisor")
Error Handling & Reliability
from tenacity import retry, stop_after_attempt, wait_exponential

class ReliableAgent:
    def __init__(self, name: str, llm):
        self.name = name
        self.llm = llm
        self.max_retries = 3  # informational; the decorator below hardcodes 3 attempts

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def execute(self, task: str, context: dict) -> str:
        """Execute with automatic retry"""
        try:
            response = await self.llm.ainvoke(self._build_prompt(task, context))
            return self._validate_output(response.content)
        except Exception as e:
            self._log_error(e)
            raise

    def _build_prompt(self, task: str, context: dict) -> str:
        """Assemble the prompt from the task and its context"""
        return f"You are {self.name}.\nTask: {task}\nContext: {context}"

    def _validate_output(self, output: str) -> str:
        """Validate agent output"""
        if not output or len(output) < 10:
            raise ValueError("Output too short")
        return output

    def _log_error(self, error: Exception):
        print(f"Agent {self.name} error: {error}")
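Since execute is async, a quick way to exercise it (the task and context are illustrative):

import asyncio

agent = ReliableAgent("analyst", ChatOpenAI(model="gpt-4o"))
answer = asyncio.run(agent.execute("Summarize this quarter's sales data",
                                   {"region": "EMEA"}))  # illustrative inputs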
Observability
import logging
from datetime import datetime

class AgentTracer:
    def __init__(self):
        self.traces = []
        self.logger = logging.getLogger("agents")

    def trace(self, agent: str, action: str, input_data: dict, output: str,
              duration_ms: float):
        trace = {
            "timestamp": datetime.now().isoformat(),
            "agent": agent,
            "action": action,
            "input": input_data,
            "output": output[:200],  # Truncate
            "duration_ms": duration_ms,
        }
        self.traces.append(trace)
        self.logger.info(f"{agent}.{action}: {duration_ms}ms")

    def get_agent_metrics(self, agent: str) -> dict:
        agent_traces = [t for t in self.traces if t["agent"] == agent]
        if not agent_traces:
            # Avoid dividing by zero for agents with no recorded calls
            return {"total_calls": 0, "avg_duration_ms": 0.0, "actions": []}
        return {
            "total_calls": len(agent_traces),
            "avg_duration_ms": sum(t["duration_ms"] for t in agent_traces) / len(agent_traces),
            "actions": list(set(t["action"] for t in agent_traces)),
        }
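A usage sketch: wrap each agent step with a timer and record the trace (the step here is a stand-in for a real agent call):

import time

tracer = AgentTracer()
start = time.perf_counter()
output = llm.invoke("Research AI trends").content  # stand-in for a real agent step
tracer.trace("researcher", "search", {"query": "AI trends"},
             output, (time.perf_counter() - start) * 1000)
print(tracer.get_agent_metrics("researcher"))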
Best Practices
Each agent should have a single, clear responsibility. Avoid “god agents” that try to do everything.
Specify exactly what each agent expects as input and produces as output.
Always set timeouts for agent operations to prevent hanging; a minimal timeout sketch follows this list.
Log agent decisions, inputs, outputs, and errors for debugging.
Test each agent independently before integrating.
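For the timeout rule above, a sketch using asyncio.wait_for, assuming an async execute like ReliableAgent's (the 30-second budget and the sentinel return value are arbitrary choices):

import asyncio

async def call_with_timeout(agent, task: str, context: dict,
                            seconds: float = 30.0) -> str:
    """Cancel the agent call if it exceeds its time budget"""
    try:
        return await asyncio.wait_for(agent.execute(task, context), timeout=seconds)
    except asyncio.TimeoutError:
        return "AGENT_TIMED_OUT"  # sentinel; the orchestrator decides how to recover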
When to Use Multi-Agent
| Use Multi-Agent | Use Single Agent |
|---|---|
| Complex, multi-step tasks | Simple Q&A |
| Need different expertise | Homogeneous skills |
| Parallel processing | Sequential steps |
| Error isolation needed | Simple error handling |
| Team simulation | Individual assistant |
Start Simple: Begin with a single agent. Add more agents only when you hit complexity limits.