December 2025 Update: Now includes patterns from OpenAI’s Swarm, Anthropic’s multi-agent research, and production examples from leading AI companies.

What is Agentic Architecture?

Agentic Architecture refers to design patterns for building AI systems where multiple specialized agents collaborate to solve complex problems. Instead of one monolithic agent, you have:
  • Specialized agents with focused capabilities
  • Orchestration to coordinate agents
  • Communication protocols between agents
  • Shared memory/state for collaboration
Key Insight: Complex tasks are often handled better by multiple specialized agents than by one generalist agent trying to do everything. Anthropic's multi-agent research system, for example, uses a lead agent that delegates subtasks to specialized subagents.

Why Multi-Agent?

Single Agent                 | Multi-Agent
Context overload             | Focused contexts
One failure = total failure  | Graceful degradation
Hard to debug                | Modular testing
Prompt grows infinitely      | Distributed prompts
Jack of all trades           | Experts collaborate

Real-World Multi-Agent Examples

System             | Agents                             | Pattern
Devin (Cognition)  | Planner, Coder, Debugger, Browser  | Hierarchical
AutoGPT            | Task Decomposer, Executor, Critic  | Reflection
ChatGPT Plugins    | Router + Specialist tools          | Supervisor
Perplexity         | Search, Synthesize, Cite           | Pipeline

Architecture Patterns

1. Supervisor Pattern

One agent orchestrates specialized worker agents.
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class SupervisorState(TypedDict):
    task: str
    next_agent: str
    results: dict[str, str]
    final_answer: str

llm = ChatOpenAI(model="gpt-4o")

# Supervisor decides which agent to call
def supervisor(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a supervisor managing specialized agents.
    
    Task: {state['task']}
    
    Available agents:
    - researcher: Find information and data
    - analyst: Analyze data and draw conclusions
    - writer: Write final reports
    
    Results so far: {state['results']}
    
    Which agent should work next? Or is the task complete?
    Respond with just the agent name or 'FINISH'.
    """)
    
    next_agent = response.content.strip().lower()
    return {"next_agent": next_agent}

def researcher(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a research specialist.
    Task: {state['task']}
    
    Find relevant information and data.
    """)
    
    results = state["results"].copy()
    results["research"] = response.content
    return {"results": results}

def analyst(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a data analyst.
    Task: {state['task']}
    Research: {state['results'].get('research', '')}
    
    Analyze the data and provide insights.
    """)
    
    results = state["results"].copy()
    results["analysis"] = response.content
    return {"results": results}

def writer(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a technical writer.
    Task: {state['task']}
    Research: {state['results'].get('research', '')}
    Analysis: {state['results'].get('analysis', '')}
    
    Write a comprehensive final report.
    """)
    
    return {"final_answer": response.content}

def route(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "end"]:
    next_agent = state["next_agent"]
    if next_agent in ("researcher", "analyst", "writer"):
        return next_agent
    # 'finish' (or anything unexpected) ends the run
    return "end"

# Build graph
workflow = StateGraph(SupervisorState)

workflow.add_node("supervisor", supervisor)
workflow.add_node("researcher", researcher)
workflow.add_node("analyst", analyst)
workflow.add_node("writer", writer)

workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route, {
    "researcher": "researcher",
    "analyst": "analyst",
    "writer": "writer",
    "end": END
})
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("analyst", "supervisor")
workflow.add_edge("writer", "supervisor")

app = workflow.compile()
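
With the graph compiled, a single call runs the whole supervisor loop. A minimal invocation sketch (the task string is illustrative; recursion_limit guards against a supervisor that never answers FINISH):
result = app.invoke(
    {
        "task": "Write a brief report on EV adoption trends",
        "next_agent": "",
        "results": {},
        "final_answer": ""
    },
    config={"recursion_limit": 20}  # cap supervisor round-trips
)
print(result["final_answer"])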

2. Debate Pattern

Agents argue different perspectives to reach better conclusions.
class DebateState(TypedDict):
    topic: str
    round: int
    pro_arguments: list[str]
    con_arguments: list[str]
    judge_verdict: str

def pro_agent(state: DebateState) -> dict:
    """Argues in favor"""
    context = "\n".join(state["con_arguments"][-2:]) if state["con_arguments"] else ""
    
    response = llm.invoke(f"""
    Topic: {state['topic']}
    Round: {state['round']}
    
    You argue IN FAVOR. Counter these points if any:
    {context}
    
    Make your best argument.
    """)
    
    return {"pro_arguments": state["pro_arguments"] + [response.content]}

def con_agent(state: DebateState) -> dict:
    """Argues against"""
    context = "\n".join(state["pro_arguments"][-2:]) if state["pro_arguments"] else ""
    
    response = llm.invoke(f"""
    Topic: {state['topic']}
    Round: {state['round']}
    
    You argue AGAINST. Counter these points if any:
    {context}
    
    Make your best argument.
    """)
    
    return {"con_arguments": state["con_arguments"] + [response.content]}

def judge(state: DebateState) -> dict:
    """Evaluates and synthesizes"""
    response = llm.invoke(f"""
    Topic: {state['topic']}
    
    Pro arguments:
    {chr(10).join(state['pro_arguments'])}
    
    Con arguments:
    {chr(10).join(state['con_arguments'])}
    
    Provide a balanced verdict considering both sides.
    """)
    
    return {"judge_verdict": response.content}

def should_continue(state: DebateState) -> Literal["continue", "judge"]:
    if state["round"] >= 3:
        return "judge"
    return "continue"

def increment_round(state: DebateState) -> dict:
    return {"round": state["round"] + 1}
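
The nodes above still need wiring into a loop. One way to do it, using the same StateGraph API as the supervisor example (the node names are my own):
workflow = StateGraph(DebateState)
workflow.add_node("pro", pro_agent)
workflow.add_node("con", con_agent)
workflow.add_node("increment", increment_round)
workflow.add_node("judge", judge)

workflow.set_entry_point("pro")
workflow.add_edge("pro", "con")
workflow.add_edge("con", "increment")
workflow.add_conditional_edges("increment", should_continue, {
    "continue": "pro",   # another round of debate
    "judge": "judge"     # enough rounds; hand off to the judge
})
workflow.add_edge("judge", END)

debate_app = workflow.compile()

# Start at round 0 with empty argument lists (topic is illustrative)
verdict = debate_app.invoke({
    "topic": "Should startups adopt multi-agent systems early?",
    "round": 0,
    "pro_arguments": [],
    "con_arguments": [],
    "judge_verdict": ""
})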

3. Pipeline Pattern

Sequential processing through specialized agents.
import json

class PipelineState(TypedDict):
    input_data: str
    extracted: dict
    validated: dict
    enriched: dict
    output: str

def extractor(state: PipelineState) -> dict:
    """Extract structured data from input"""
    response = llm.invoke(f"""
    Extract key entities from this text as JSON. Return only the JSON.
    {state['input_data']}
    """)
    # Assumes the model returns bare JSON; prefer structured-output features in production
    return {"extracted": json.loads(response.content)}

def validator(state: PipelineState) -> dict:
    """Validate extracted data"""
    response = llm.invoke(f"""
    Validate this data. Fix any errors:
    {state['extracted']}
    
    Return corrected JSON.
    """)
    return {"validated": json.loads(response.content)}

def enricher(state: PipelineState) -> dict:
    """Add additional information"""
    response = llm.invoke(f"""
    Enrich this data with additional context:
    {state['validated']}
    
    Return enriched JSON.
    """)
    return {"enriched": json.loads(response.content)}

def formatter(state: PipelineState) -> dict:
    """Format final output"""
    response = llm.invoke(f"""
    Format this data as a readable report:
    {state['enriched']}
    """)
    return {"output": response.content}

# Linear pipeline
workflow = StateGraph(PipelineState)
workflow.add_node("extractor", extractor)
workflow.add_node("validator", validator)
workflow.add_node("enricher", enricher)
workflow.add_node("formatter", formatter)

workflow.set_entry_point("extractor")
workflow.add_edge("extractor", "validator")
workflow.add_edge("validator", "enricher")
workflow.add_edge("enricher", "formatter")
workflow.add_edge("formatter", END)

4. Swarm Pattern

Dynamic team of agents that can spawn/dismiss members.
class SwarmState(TypedDict):
    task: str
    active_agents: list[str]
    agent_outputs: dict[str, str]
    coordination_notes: str
    final_output: str

def coordinator(state: SwarmState) -> dict:
    """Manage the swarm of agents"""
    response = llm.invoke(f"""
    Task: {state['task']}
    
    Current agents: {state['active_agents']}
    Their outputs: {state['agent_outputs']}
    
    Decide:
    1. Which new agents to spawn (if any)
    2. Which agents to dismiss (if any)
    3. Coordination instructions
    
    Available agent types: researcher, coder, reviewer, writer
    
    Return JSON: {{"spawn": [], "dismiss": [], "instructions": ""}}
    """)
    
    decisions = json.loads(response.content)
    
    active = set(state["active_agents"])
    active.update(decisions["spawn"])
    active -= set(decisions["dismiss"])
    
    return {
        "active_agents": list(active),
        "coordination_notes": decisions["instructions"]
    }
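
The coordinator only maintains the roster; something still has to run the active agents each cycle. A sketch of one dispatch step (the per-agent prompts are my own illustration):
AGENT_PROMPTS = {
    "researcher": "You are a researcher. Gather information for: {task}",
    "coder": "You are a coder. Write code for: {task}",
    "reviewer": "You are a reviewer. Critique the work so far on: {task}",
    "writer": "You are a writer. Draft output for: {task}",
}

def run_active_agents(state: SwarmState) -> dict:
    """Invoke every currently active agent once and collect their outputs."""
    outputs = dict(state["agent_outputs"])
    for name in state["active_agents"]:
        prompt = AGENT_PROMPTS[name].format(task=state["task"])
        response = llm.invoke(
            f"{prompt}\n\nCoordinator instructions: {state['coordination_notes']}"
        )
        outputs[name] = response.content
    return {"agent_outputs": outputs}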

Memory Patterns

Shared Memory

from typing import TypedDict

class SharedMemory(TypedDict):
    facts: list[str]           # Confirmed facts
    hypotheses: list[str]      # Unconfirmed ideas
    decisions: list[str]       # Made decisions
    context: dict[str, str]    # Key-value context

class AgentState(TypedDict):
    task: str
    memory: SharedMemory
    current_agent: str
    output: str

def update_memory(state: AgentState, new_facts: list[str] = None, 
                  new_hypotheses: list[str] = None) -> SharedMemory:
    """Helper to update shared memory without mutating the original state"""
    memory = state["memory"].copy()
    # .copy() is shallow, so rebuild the lists rather than extending them in place
    if new_facts:
        memory["facts"] = memory["facts"] + new_facts
    if new_hypotheses:
        memory["hypotheses"] = memory["hypotheses"] + new_hypotheses
    return memory
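
A node then records what it learned through the helper (the prompt is illustrative):
def research_node(state: AgentState) -> dict:
    response = llm.invoke(f"Research this task and list the key facts: {state['task']}")
    return {
        "memory": update_memory(state, new_facts=[response.content]),
        "current_agent": "researcher",
        "output": response.content
    }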

Long-Term Memory with Vector DB

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

class AgentMemory:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(
            collection_name=f"agent_{agent_id}",
            embedding_function=self.embeddings
        )
    
    def remember(self, content: str, metadata: dict = None):
        """Store in long-term memory"""
        self.vectorstore.add_texts(
            texts=[content],
            metadatas=[{"agent_id": self.agent_id, **(metadata or {})}]
        )
    
    def recall(self, query: str, k: int = 5) -> list[str]:
        """Retrieve relevant memories"""
        results = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in results]
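
A quick usage sketch (the stored text and query are illustrative):
memory = AgentMemory("researcher")
memory.remember("Client prefers weekly status reports", metadata={"topic": "preferences"})
relevant = memory.recall("How often should we send reports?")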

Communication Patterns

Message Bus

from collections import defaultdict
from typing import Callable

class MessageBus:
    def __init__(self):
        self.subscribers: dict[str, list[Callable]] = defaultdict(list)
        self.messages: list[dict] = []
    
    def subscribe(self, topic: str, handler: Callable):
        self.subscribers[topic].append(handler)
    
    def publish(self, topic: str, message: dict, sender: str):
        msg = {"topic": topic, "message": message, "sender": sender}
        self.messages.append(msg)
        
        for handler in self.subscribers[topic]:
            handler(msg)
    
    def get_history(self, topic: str = None) -> list[dict]:
        if topic:
            return [m for m in self.messages if m["topic"] == topic]
        return self.messages

# Usage
bus = MessageBus()

def researcher_handler(msg):
    print(f"Researcher received: {msg}")

bus.subscribe("research_request", researcher_handler)
bus.publish("research_request", {"query": "AI trends"}, sender="supervisor")

Error Handling & Reliability

from tenacity import retry, stop_after_attempt, wait_exponential

class ReliableAgent:
    def __init__(self, name: str, llm):
        self.name = name
        self.llm = llm
        self.max_retries = 3  # informational; the retry decorator below fixes attempts at 3
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def execute(self, task: str, context: dict) -> str:
        """Execute with automatic retry"""
        try:
            response = await self.llm.ainvoke(self._build_prompt(task, context))
            return self._validate_output(response.content)
        except Exception as e:
            self._log_error(e)
            raise
    
    def _build_prompt(self, task: str, context: dict) -> str:
        """Assemble the agent's prompt (a minimal format; adapt to your agents)"""
        return f"You are {self.name}.\nTask: {task}\nContext: {context}"
    
    def _validate_output(self, output: str) -> str:
        """Validate agent output"""
        if not output or len(output) < 10:
            raise ValueError("Output too short")
        return output
    
    def _log_error(self, error: Exception):
        print(f"Agent {self.name} error: {error}")

Observability

import logging
from datetime import datetime

class AgentTracer:
    def __init__(self):
        self.traces = []
        self.logger = logging.getLogger("agents")
    
    def trace(self, agent: str, action: str, input_data: dict, output: str, 
              duration_ms: float):
        trace = {
            "timestamp": datetime.now().isoformat(),
            "agent": agent,
            "action": action,
            "input": input_data,
            "output": output[:200],  # Truncate
            "duration_ms": duration_ms
        }
        
        self.traces.append(trace)
        self.logger.info(f"{agent}.{action}: {duration_ms}ms")
    
    def get_agent_metrics(self, agent: str) -> dict:
        agent_traces = [t for t in self.traces if t["agent"] == agent]
        if not agent_traces:
            return {"total_calls": 0, "avg_duration_ms": 0.0, "actions": []}
        
        return {
            "total_calls": len(agent_traces),
            "avg_duration_ms": sum(t["duration_ms"] for t in agent_traces) / len(agent_traces),
            "actions": list(set(t["action"] for t in agent_traces))
        }
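
Recording a call and reading back per-agent metrics (the values are illustrative):
tracer = AgentTracer()
tracer.trace("researcher", "search", {"query": "AI trends"}, "Found 12 sources...", 842.5)
print(tracer.get_agent_metrics("researcher"))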

Best Practices

  • Single responsibility: Each agent should have a single, clear responsibility. Avoid “god agents” that try to do everything.
  • Explicit contracts: Specify exactly what each agent expects as input and produces as output.
  • Timeouts: Always set timeouts for agent operations to prevent hanging (see the sketch after this list).
  • Observability: Log agent decisions, inputs, outputs, and errors for debugging.
  • Independent testing: Test each agent independently before integrating.
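
A minimal timeout wrapper for the async execute method from the ReliableAgent example above (the 30-second default is illustrative):
import asyncio

async def run_with_timeout(agent: ReliableAgent, task: str, timeout_s: float = 30.0) -> str:
    """Cancel the agent call if it exceeds the deadline."""
    try:
        return await asyncio.wait_for(agent.execute(task, context={}), timeout=timeout_s)
    except asyncio.TimeoutError:
        return f"Agent {agent.name} timed out after {timeout_s}s"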

When to Use Multi-Agent

Use Multi-Agent            | Use Single Agent
Complex, multi-step tasks  | Simple Q&A
Need different expertise   | Homogeneous skills
Parallel processing        | Sequential steps
Error isolation needed     | Simple error handling
Team simulation            | Individual assistant
Start Simple: Begin with a single agent. Add more agents only when you hit complexity limits.