December 2025 Update: Now includes patterns from OpenAI’s Swarm, Anthropic’s multi-agent research, and production examples from leading AI companies.

What is Agentic Architecture?

Agentic Architecture refers to design patterns for building AI systems where multiple specialized agents collaborate to solve complex problems. Instead of one monolithic agent, you have:
  • Specialized agents with focused capabilities
  • Orchestration to coordinate agents
  • Communication protocols between agents
  • Shared memory/state for collaboration
Key Insight: Complex tasks are often better handled by several specialized agents than by one generalist agent trying to do everything. Anthropic, for example, has publicly described the orchestrator-worker multi-agent design behind Claude’s Research feature.

Why Multi-Agent?

Single Agent                | Multi-Agent
Context overload            | Focused contexts
One failure = total failure | Graceful degradation
Hard to debug               | Modular testing
Prompt grows infinitely     | Distributed prompts
Jack of all trades          | Experts collaborate

Real-World Multi-Agent Examples

System            | Agents                            | Pattern
Devin (Cognition) | Planner, Coder, Debugger, Browser | Hierarchical
AutoGPT           | Task Decomposer, Executor, Critic | Reflection
ChatGPT Plugins   | Router + Specialist tools         | Supervisor
Perplexity        | Search, Synthesize, Cite          | Pipeline

Architecture Patterns

1. Supervisor Pattern

One agent orchestrates specialized worker agents.
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class SupervisorState(TypedDict):
    task: str
    next_agent: str
    results: dict[str, str]
    final_answer: str

llm = ChatOpenAI(model="gpt-4o")

# Supervisor decides which agent to call
def supervisor(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a supervisor managing specialized agents.
    
    Task: {state['task']}
    
    Available agents:
    - researcher: Find information and data
    - analyst: Analyze data and draw conclusions
    - writer: Write final reports
    
    Results so far: {state['results']}
    Final report written: {'yes' if state.get('final_answer') else 'no'}
    
    Which agent should work next? Or is the task complete?
    Respond with just the agent name or 'FINISH'.
    """)
    
    next_agent = response.content.strip().lower()
    return {"next_agent": next_agent}

def researcher(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a research specialist.
    Task: {state['task']}
    
    Find relevant information and data.
    """)
    
    results = state["results"].copy()
    results["research"] = response.content
    return {"results": results}

def analyst(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a data analyst.
    Task: {state['task']}
    Research: {state['results'].get('research', '')}
    
    Analyze the data and provide insights.
    """)
    
    results = state["results"].copy()
    results["analysis"] = response.content
    return {"results": results}

def writer(state: SupervisorState) -> dict:
    response = llm.invoke(f"""
    You are a technical writer.
    Task: {state['task']}
    Research: {state['results'].get('research', '')}
    Analysis: {state['results'].get('analysis', '')}
    
    Write a comprehensive final report.
    """)
    
    return {"final_answer": response.content}

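def route(state: SupervisorState) -> Literal["researcher", "analyst", "writer", "end"]:
    # The supervisor replies in free text, so only known worker names are routed.
    # 'finish' and any unrecognized reply end the run instead of hitting a missing edge.
    next_agent = state["next_agent"]
    if next_agent in ("researcher", "analyst", "writer"):
        return next_agent
    return "end"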

# Build graph
workflow = StateGraph(SupervisorState)

workflow.add_node("supervisor", supervisor)
workflow.add_node("researcher", researcher)
workflow.add_node("analyst", analyst)
workflow.add_node("writer", writer)

workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route, {
    "researcher": "researcher",
    "analyst": "analyst",
    "writer": "writer",
    "end": END
})
workflow.add_edge("researcher", "supervisor")
workflow.add_edge("analyst", "supervisor")
workflow.add_edge("writer", "supervisor")

app = workflow.compile()
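
Because every worker hands control back to the supervisor, the graph above keeps looping until the supervisor answers FINISH. As a rough, framework-free sketch of that control loop with a hard step cap as a safety net, consider the following. The `run_supervisor` helper, the `max_steps` parameter, and the stub agents are hypothetical names used only for illustration; they are not part of LangGraph.

```python
from typing import Callable

Agent = Callable[[dict], dict]  # takes the shared state, returns a partial result


def run_supervisor(
    task: str,
    decide: Callable[[dict], str],
    workers: dict[str, Agent],
    max_steps: int = 10,
) -> dict:
    """Loop supervisor -> worker -> supervisor until FINISH or the step cap."""
    state = {"task": task, "results": {}, "steps": 0, "stopped_by_cap": False}
    while state["steps"] < max_steps:
        choice = decide(state).strip().lower()
        if choice not in workers:  # 'finish' or any unrecognized reply ends the run
            return state
        state["results"].update(workers[choice](state))
        state["steps"] += 1
    state["stopped_by_cap"] = True  # the supervisor never said FINISH
    return state


def stub_decide(state: dict) -> str:
    """Stub supervisor: pick the first worker that has not produced a result."""
    for name in ("researcher", "analyst", "writer"):
        if name not in state["results"]:
            return name
    return "FINISH"


# Stub workers standing in for LLM-backed agents.
stub_workers = {
    name: (lambda state, name=name: {name: f"{name} output for {state['task']}"})
    for name in ("researcher", "analyst", "writer")
}

final_state = run_supervisor("Summarize AI trends", stub_decide, stub_workers)
```

The cap matters because a supervisor that keeps naming the same worker would otherwise loop indefinitely. With LangGraph itself, a comparable limit can usually be set through the `recursion_limit` key of the config passed to `invoke`.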

2. Debate Pattern

Agents argue different perspectives to reach better conclusions.
class DebateState(TypedDict):
    topic: str
    round: int
    pro_arguments: list[str]
    con_arguments: list[str]
    judge_verdict: str

def pro_agent(state: DebateState) -> dict:
    """Argues in favor"""
    context = "\n".join(state["con_arguments"][-2:]) if state["con_arguments"] else ""
    
    response = llm.invoke(f"""
    Topic: {state['topic']}
    Round: {state['round']}
    
    You argue IN FAVOR. Counter these points if any:
    {context}
    
    Make your best argument.
    """)
    
    return {"pro_arguments": state["pro_arguments"] + [response.content]}

def con_agent(state: DebateState) -> dict:
    """Argues against"""
    context = "\n".join(state["pro_arguments"][-2:]) if state["pro_arguments"] else ""
    
    response = llm.invoke(f"""
    Topic: {state['topic']}
    Round: {state['round']}
    
    You argue AGAINST. Counter these points if any:
    {context}
    
    Make your best argument.
    """)
    
    return {"con_arguments": state["con_arguments"] + [response.content]}

def judge(state: DebateState) -> dict:
    """Evaluates and synthesizes"""
    response = llm.invoke(f"""
    Topic: {state['topic']}
    
    Pro arguments:
    {chr(10).join(state['pro_arguments'])}
    
    Con arguments:
    {chr(10).join(state['con_arguments'])}
    
    Provide a balanced verdict considering both sides.
    """)
    
    return {"judge_verdict": response.content}

def should_continue(state: DebateState) -> Literal["continue", "judge"]:
    if state["round"] >= 3:
        return "judge"
    return "continue"

def increment_round(state: DebateState) -> dict:
    return {"round": state["round"] + 1}
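
The functions above define the debate nodes but do not show how the rounds are driven. One possible sketch of the round loop in plain Python is shown below: pro and con alternate for a fixed number of rounds, then the judge runs once. The `run_debate` helper, its `max_rounds` parameter, and the stub agents are assumptions made for illustration, with the stubs standing in for LLM calls.

```python
from typing import Callable


def run_debate(
    topic: str,
    pro: Callable[[dict], str],
    con: Callable[[dict], str],
    judge: Callable[[dict], str],
    max_rounds: int = 3,
) -> dict:
    """Alternate pro and con turns for max_rounds, then ask the judge once."""
    state = {
        "topic": topic,
        "round": 1,
        "pro_arguments": [],
        "con_arguments": [],
        "judge_verdict": "",
    }
    while state["round"] <= max_rounds:
        # Pro speaks first; con then sees the argument pro just made.
        state["pro_arguments"].append(pro(state))
        state["con_arguments"].append(con(state))
        state["round"] += 1
    state["judge_verdict"] = judge(state)
    return state


# Stub agents standing in for LLM calls.
def stub_pro(state: dict) -> str:
    return f"pro point {state['round']}"


def stub_con(state: dict) -> str:
    return f"con rebuttal to '{state['pro_arguments'][-1]}'"


def stub_judge(state: dict) -> str:
    return f"{len(state['pro_arguments'])} pro vs {len(state['con_arguments'])} con"


debate_result = run_debate("Adopt microservices?", stub_pro, stub_con, stub_judge)
```

Keeping the loop in the orchestrator, rather than letting agents call each other, makes the round limit easy to enforce in one place.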

3. Pipeline Pattern

Sequential processing through specialized agents.
import json

class PipelineState(TypedDict):
    input_data: str
    extracted: dict
    validated: dict
    enriched: dict
    output: str

def extractor(state: PipelineState) -> dict:
    """Extract structured data from input"""
    response = llm.invoke(f"""
    Extract key entities from this text as JSON:
    {state['input_data']}
    """)
    return {"extracted": json.loads(response.content)}

def validator(state: PipelineState) -> dict:
    """Validate extracted data"""
    response = llm.invoke(f"""
    Validate this data. Fix any errors:
    {state['extracted']}
    
    Return corrected JSON.
    """)
    return {"validated": json.loads(response.content)}

def enricher(state: PipelineState) -> dict:
    """Add additional information"""
    response = llm.invoke(f"""
    Enrich this data with additional context:
    {state['validated']}
    
    Return enriched JSON.
    """)
    return {"enriched": json.loads(response.content)}

def formatter(state: PipelineState) -> dict:
    """Format final output"""
    response = llm.invoke(f"""
    Format this data as a readable report:
    {state['enriched']}
    """)
    return {"output": response.content}

# Linear pipeline
workflow = StateGraph(PipelineState)
workflow.add_node("extractor", extractor)
workflow.add_node("validator", validator)
workflow.add_node("enricher", enricher)
workflow.add_node("formatter", formatter)

workflow.set_entry_point("extractor")
workflow.add_edge("extractor", "validator")
workflow.add_edge("validator", "enricher")
workflow.add_edge("enricher", "formatter")
workflow.add_edge("formatter", END)
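
Each stage above passes `response.content` straight to `json.loads`, which raises as soon as the model wraps its JSON in a Markdown code fence or adds a sentence around it. A minimal sketch of a more tolerant parser follows; `parse_json_reply` is a hypothetical helper, not part of LangChain, and it only handles replies that contain a single JSON object.

```python
import json
import re

FENCE = "`" * 3  # Markdown code-fence marker, built programmatically
FENCE_RE = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_json_reply(text: str) -> dict:
    """Parse one JSON object from an LLM reply, tolerating fences and prose."""
    match = FENCE_RE.search(text)
    candidate = match.group(1) if match else text
    # Keep only the outermost {...} span in case prose surrounds the object.
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object found in model reply")
    # json.JSONDecodeError is a subclass of ValueError, so callers can
    # catch ValueError for both "missing" and "malformed" replies.
    return json.loads(candidate[start:end + 1])


plain = parse_json_reply('{"a": 1}')
wrapped = parse_json_reply('Sure! {"x": [1, 2]} Hope that helps.')
```

In the pipeline, a stage could call this helper instead of `json.loads` and route a `ValueError` to a retry or a fallback rather than crashing the whole run.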

4. Swarm Pattern

Dynamic team of agents that can spawn/dismiss members.
import json

class SwarmState(TypedDict):
    task: str
    active_agents: list[str]
    agent_outputs: dict[str, str]
    coordination_notes: str
    final_output: str

def coordinator(state: SwarmState) -> dict:
    """Manage the swarm of agents"""
    response = llm.invoke(f"""
    Task: {state['task']}
    
    Current agents: {state['active_agents']}
    Their outputs: {state['agent_outputs']}
    
    Decide:
    1. Which new agents to spawn (if any)
    2. Which agents to dismiss (if any)
    3. Coordination instructions
    
    Available agent types: researcher, coder, reviewer, writer
    
    Return JSON: {{"spawn": [], "dismiss": [], "instructions": ""}}
    """)
    
    decisions = json.loads(response.content)
    
    active = set(state["active_agents"])
    active.update(decisions["spawn"])
    active -= set(decisions["dismiss"])
    
    return {
        "active_agents": list(active),
        "coordination_notes": decisions["instructions"]
    }
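
The coordinator above trusts whatever the model returns under `spawn`, so a malformed reply could add unknown agent types or grow the swarm without limit. The following sketch applies the same spawn/dismiss decisions with an allow-list and a size cap. `apply_swarm_decisions`, `ALLOWED_AGENT_TYPES`, and `MAX_SWARM_SIZE` are hypothetical names; the allowed types are taken from the prompt above, and the cap of 3 is an arbitrary example value.

```python
ALLOWED_AGENT_TYPES = {"researcher", "coder", "reviewer", "writer"}
MAX_SWARM_SIZE = 3  # assumed budget guardrail, tune per workload


def apply_swarm_decisions(
    active: list[str],
    decisions: dict,
    max_size: int = MAX_SWARM_SIZE,
) -> list[str]:
    """Return the new active-agent list after validated dismiss/spawn steps."""
    dismissed = set(decisions.get("dismiss", []))
    current = [agent for agent in active if agent not in dismissed]
    for agent in decisions.get("spawn", []):
        if agent not in ALLOWED_AGENT_TYPES or agent in current:
            continue  # ignore unknown types and duplicates
        if len(current) >= max_size:
            break  # cap reached: stop spawning
        current.append(agent)
    return current


new_team = apply_swarm_decisions(
    ["researcher"],
    {"spawn": ["coder", "hacker", "coder"], "dismiss": []},
)
```

Using a list rather than a set keeps the agent order stable between runs, which makes logs and tests easier to compare.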

Memory Patterns

Shared Memory

from typing import TypedDict

class SharedMemory(TypedDict):
    facts: list[str]           # Confirmed facts
    hypotheses: list[str]      # Unconfirmed ideas
    decisions: list[str]       # Made decisions
    context: dict[str, str]    # Key-value context

class AgentState(TypedDict):
    task: str
    memory: SharedMemory
    current_agent: str
    output: str

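def update_memory(state: AgentState, new_facts: list[str] | None = None,
                  new_hypotheses: list[str] | None = None) -> SharedMemory:
    """Return an updated copy of shared memory without mutating the input state"""
    memory = state["memory"]
    # dict.copy() is shallow, so extending its lists would also change the
    # original state. Build new lists instead.
    return {
        **memory,
        "facts": memory["facts"] + (new_facts or []),
        "hypotheses": memory["hypotheses"] + (new_hypotheses or []),
    }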

Long-Term Memory with Vector DB

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

class AgentMemory:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(
            collection_name=f"agent_{agent_id}",
            embedding_function=self.embeddings
        )
    
    def remember(self, content: str, metadata: dict | None = None):
        """Store in long-term memory"""
        self.vectorstore.add_texts(
            texts=[content],
            metadatas=[{"agent_id": self.agent_id, **(metadata or {})}]
        )
    
    def recall(self, query: str, k: int = 5) -> list[str]:
        """Retrieve relevant memories"""
        results = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

Communication Patterns

Message Bus

from collections import defaultdict
from typing import Callable

class MessageBus:
    def __init__(self):
        self.subscribers: dict[str, list[Callable]] = defaultdict(list)
        self.messages: list[dict] = []
    
    def subscribe(self, topic: str, handler: Callable):
        self.subscribers[topic].append(handler)
    
    def publish(self, topic: str, message: dict, sender: str):
        msg = {"topic": topic, "message": message, "sender": sender}
        self.messages.append(msg)
        
        for handler in self.subscribers[topic]:
            handler(msg)
    
    def get_history(self, topic: str = None) -> list[dict]:
        if topic:
            return [m for m in self.messages if m["topic"] == topic]
        return self.messages

# Usage
bus = MessageBus()

def researcher_handler(msg):
    print(f"Researcher received: {msg}")

bus.subscribe("research_request", researcher_handler)
bus.publish("research_request", {"query": "AI trends"}, sender="supervisor")

Error Handling & Reliability

from tenacity import retry, stop_after_attempt, wait_exponential

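class ReliableAgent:
    def __init__(self, name: str, llm):
        self.name = name
        self.llm = llm

    # The retry budget lives in the decorator: up to 3 attempts with exponential backoff.
    # reraise=True surfaces the original exception instead of tenacity's RetryError.
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10), reraise=True)
    async def execute(self, task: str, context: dict) -> str:
        """Execute with automatic retry"""
        try:
            response = await self.llm.ainvoke(self._build_prompt(task, context))
            return self._validate_output(response.content)
        except Exception as e:
            self._log_error(e)
            raise

    def _build_prompt(self, task: str, context: dict) -> str:
        """Combine the task and its context into a single prompt string"""
        context_lines = "\n".join(f"{key}: {value}" for key, value in context.items())
        return f"You are the {self.name} agent.\nTask: {task}\nContext:\n{context_lines}"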
    
    def _validate_output(self, output: str) -> str:
        """Validate agent output"""
        if not output or len(output) < 10:
            raise ValueError("Output too short")
        return output
    
    def _log_error(self, error: Exception):
        print(f"Agent {self.name} error: {error}")
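
Retries handle calls that fail, but a call that hangs never fails and so never triggers a retry. A small sketch of a per-attempt timeout using only the standard library is shown below. `call_with_timeout` and `slow_agent` are hypothetical names, and `slow_agent` simply sleeps to stand in for an LLM call.

```python
import asyncio


async def call_with_timeout(coro_factory, timeout_s: float, retries: int = 2) -> str:
    """Await coro_factory() with a per-attempt timeout, retrying on timeout."""
    last_error = None
    for _attempt in range(retries + 1):
        try:
            # wait_for cancels the inner coroutine once the timeout expires.
            return await asyncio.wait_for(coro_factory(), timeout=timeout_s)
        except asyncio.TimeoutError as exc:
            last_error = exc
    raise TimeoutError(
        f"agent call timed out after {retries + 1} attempts"
    ) from last_error


async def slow_agent(delay_s: float) -> str:
    """Stand-in for an LLM call that takes delay_s seconds."""
    await asyncio.sleep(delay_s)
    return "done"


fast_result = asyncio.run(call_with_timeout(lambda: slow_agent(0.01), timeout_s=1.0))
```

A factory is passed instead of a coroutine object because a coroutine can only be awaited once, so each retry needs a fresh one.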

Observability

import logging
from datetime import datetime

class AgentTracer:
    def __init__(self):
        self.traces = []
        self.logger = logging.getLogger("agents")
    
    def trace(self, agent: str, action: str, input_data: dict, output: str, 
              duration_ms: float):
        trace = {
            "timestamp": datetime.now().isoformat(),
            "agent": agent,
            "action": action,
            "input": input_data,
            "output": output[:200],  # Truncate
            "duration_ms": duration_ms
        }
        
        self.traces.append(trace)
        self.logger.info(f"{agent}.{action}: {duration_ms}ms")
    
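    def get_agent_metrics(self, agent: str) -> dict:
        agent_traces = [t for t in self.traces if t["agent"] == agent]
        # Avoid a ZeroDivisionError when the agent has no recorded traces.
        if not agent_traces:
            return {"total_calls": 0, "avg_duration_ms": 0.0, "actions": []}
        
        return {
            "total_calls": len(agent_traces),
            "avg_duration_ms": sum(t["duration_ms"] for t in agent_traces) / len(agent_traces),
            "actions": sorted({t["action"] for t in agent_traces})
        }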
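
`AgentTracer.trace` expects the caller to measure `duration_ms` by hand. One way to avoid repeating that timing code is a context manager that measures the wrapped block and records it under a shared request ID, sketched below. `traced_call` and `trace_log` are hypothetical names, and the plain list is a stand-in for `AgentTracer.traces`.

```python
import time
import uuid
from contextlib import contextmanager

trace_log: list[dict] = []  # stand-in for AgentTracer.traces


@contextmanager
def traced_call(agent: str, action: str, trace_id: str):
    """Time the wrapped block and append one record, even if it raises."""
    record = {"trace_id": trace_id, "agent": agent, "action": action, "ok": True}
    start = time.perf_counter()
    try:
        yield record  # the caller may attach extra fields such as "output"
    except Exception as exc:
        record["ok"] = False
        record["error"] = repr(exc)
        raise
    finally:
        record["duration_ms"] = (time.perf_counter() - start) * 1000
        trace_log.append(record)


request_id = uuid.uuid4().hex  # one ID per user request, shared by all agents
with traced_call("researcher", "search", request_id) as rec:
    rec["output"] = "3 sources found"
with traced_call("writer", "draft", request_id):
    pass
```

Filtering `trace_log` by `trace_id` then reconstructs the path one request took across agents.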

Best Practices

  • Each agent should have a single, clear responsibility. Avoid “god agents” that try to do everything.
  • Specify exactly what each agent expects as input and produces as output.
  • Always set timeouts for agent operations to prevent hanging.
  • Log agent decisions, inputs, outputs, and errors for debugging.
  • Test each agent independently before integrating.

When to Use Multi-Agent

Use Multi-Agent           | Use Single Agent
Complex, multi-step tasks | Simple Q&A
Need different expertise  | Homogeneous skills
Parallel processing       | Sequential steps
Error isolation needed    | Simple error handling
Team simulation           | Individual assistant
Start Simple: Begin with a single agent. Add more agents only when you hit complexity limits.

Interview Deep-Dive

Strong Answer:
  • The Supervisor pattern is the right default for most enterprise workflows because it provides a single point of control, making it easier to reason about execution order, debug failures, and enforce business rules. The supervisor acts as an explicit router: it receives the task, decides which agent to delegate to, collects results, and decides what to do next. This maps well to workflows with well-defined stages like “research, then analyze, then write.”
  • The Swarm pattern is appropriate when the task is highly dynamic and you cannot predict at design time how many agents you need or what types. For example, an incident response system where you might need to spawn a log-analysis agent, a metrics-analysis agent, and a customer-communication agent depending on the nature of the incident. Swarm excels when parallelism and adaptability matter more than predictable execution.
  • The key trade-off is control versus flexibility. Supervisors give you deterministic routing and clear audit trails but can become bottlenecks if the supervisor LLM makes poor routing decisions. Swarms give you emergent problem-solving behavior but are harder to debug, harder to test, and harder to set cost guardrails on because agents can spawn other agents unpredictably.
  • In practice, most production systems I have seen use a hybrid: a supervisor pattern for the top-level orchestration with swarm-like behavior within specific subtasks. For example, the supervisor routes a research task to a research sub-system, and that sub-system uses a small swarm of specialized search agents internally. This gives you control at the macro level and flexibility at the micro level.
  • One often-overlooked factor: the supervisor pattern degrades more gracefully. If one worker agent fails, the supervisor can retry, route to an alternative, or return a partial result. In a swarm, a failing agent can create cascading confusion because other agents may depend on its output without clear fallback paths.
Follow-up: How do you prevent the supervisor agent from becoming a bottleneck or single point of failure?
Two strategies. First, make the supervisor decision lightweight by using a smaller, faster model (gpt-4o-mini) for routing decisions, while worker agents use the full gpt-4o for actual work. The routing decision is usually a classification task that does not need the strongest model. Second, implement supervisor-level caching: if the same type of task comes in repeatedly, cache the routing decision so you skip the supervisor LLM call entirely. For high availability, you can run the supervisor as a stateless function with the execution state stored externally (in Redis or a database), so if the supervisor process dies, another instance can pick up the state and continue.
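
The routing cache described in this answer could be sketched as follows. `RoutingCache`, `first_word`, and `fake_supervisor` are hypothetical names; the rule-based router stands in for the supervisor LLM call, and using the first word as the task-type key is only a placeholder for a real task classifier.

```python
from typing import Callable


class RoutingCache:
    """Memoize routing decisions so repeated task types skip the supervisor call."""

    def __init__(self, route_fn: Callable[[str], str], key_fn: Callable[[str], str]):
        self.route_fn = route_fn  # expensive call, e.g. the supervisor LLM
        self.key_fn = key_fn      # maps a task to a coarse "task type" key
        self.cache: dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def route(self, task: str) -> str:
        key = self.key_fn(task)
        if key in self.cache:
            self.hits += 1
            return self.cache[key]
        self.misses += 1
        self.cache[key] = self.route_fn(task)
        return self.cache[key]


def first_word(task: str) -> str:
    """Placeholder task-type key: the lowercased first word."""
    return task.split()[0].lower()


def fake_supervisor(task: str) -> str:
    """Rule-based stand-in for the supervisor LLM."""
    return "researcher" if task.lower().startswith("find") else "writer"


router = RoutingCache(fake_supervisor, first_word)
routes = [router.route(t) for t in ("Find GPU prices", "find LLM papers", "Draft a memo")]
```

The key function decides how safe the cache is: a key that is too coarse will reuse a routing decision for tasks that actually need a different agent.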
Strong Answer:
  • Shared state in multi-agent systems is one of the hardest design problems because agents may run concurrently and update the same state. The pattern I follow is to treat shared memory as an append-only log rather than a mutable document. Each agent writes new facts, hypotheses, or decisions to the log, and reads the full log before acting. This eliminates write conflicts because you never modify existing entries, only add new ones.
  • For the implementation, I use a structure similar to the SharedMemory TypedDict shown in this chapter: separate lists for confirmed facts, unconfirmed hypotheses, and decisions. The critical rule is that only one agent at a time can promote a hypothesis to a fact, and this promotion should go through the supervisor or a dedicated “validator” agent to prevent conflicting facts.
  • When agents run in parallel (like the debate pattern), I use a turn-based protocol rather than true concurrent writes. Each agent gets the current state snapshot, produces its output, and then the orchestrator merges all outputs into the state before the next round. This is essentially an optimistic concurrency model where conflicts are resolved by the orchestrator.
  • For long-term memory backed by a vector database, each agent gets its own collection or namespace to prevent pollution. A shared collection is useful for cross-agent knowledge, but writes to it should go through a gatekeeper that checks for contradictions with existing entries. In LangGraph, this is handled through the state management system where each node returns state updates that are merged deterministically.
  • The practical gotcha is context window bloat. If every agent reads the entire shared memory, and the memory grows with each step, you quickly exhaust the context window. I implement a relevance filter: before passing shared memory to an agent, query the memory with the agent’s current task and only include the top-k most relevant entries.
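
The append-only log and the relevance filter from the points above can be sketched together in a few lines. `MemoryEntry` and `MemoryLog` are hypothetical names, and the word-overlap score is a deliberately simple stand-in for the embedding-based similarity search a production system would more likely use.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class MemoryEntry:
    kind: str    # "fact", "hypothesis", or "decision"
    text: str
    author: str  # agent that wrote the entry


@dataclass
class MemoryLog:
    """Append-only log: entries are added, never edited or removed."""
    entries: list[MemoryEntry] = field(default_factory=list)

    def append(self, kind: str, text: str, author: str) -> None:
        self.entries.append(MemoryEntry(kind, text, author))

    def relevant(self, query: str, k: int = 3) -> list[MemoryEntry]:
        """Return up to k entries sharing the most words with the query."""
        query_words = set(query.lower().split())
        scored = [
            (len(query_words & set(entry.text.lower().split())), index, entry)
            for index, entry in enumerate(self.entries)
        ]
        # Highest overlap first; ties go to the most recent entry.
        scored.sort(key=lambda item: (-item[0], -item[1]))
        return [entry for score, _, entry in scored[:k] if score > 0]


log = MemoryLog()
log.append("fact", "GPU prices fell in Q3", "researcher")
log.append("hypothesis", "Demand for GPU rentals will rise", "analyst")
log.append("decision", "Report covers cloud vendors only", "supervisor")
top = log.relevant("GPU prices outlook", k=2)
```

Only the entries in `top` would be placed in the next agent's prompt, which keeps the context size bounded by `k` rather than by the length of the log.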
Follow-up: How would you design a memory system that persists across sessions for a multi-agent system?
I use a two-tier architecture. Short-term memory (the current task’s facts, hypotheses, decisions) lives in an in-memory data structure or Redis and gets cleared after the task completes. Long-term memory lives in a vector database with metadata tags for the agent that wrote it, the task context, and a timestamp. Before starting a new task, each agent queries long-term memory for relevant past experiences. The key is a decay mechanism: memories that have not been recalled in a configurable time window get downranked and eventually archived, preventing the memory from growing indefinitely and keeping retrieval fast. I also tag memories with a confidence score that degrades over time, so recent memories are preferred over stale ones.
Strong Answer:
  • The Debate pattern genuinely improves output quality in specific scenarios: when the task requires considering trade-offs, when there are legitimate multiple perspectives (policy decisions, architecture choices, risk assessments), or when you need to stress-test a conclusion. The mechanism works because each agent is forced to find weaknesses in the other’s argument, which surfaces edge cases and counterexamples that a single agent would miss.
  • It wastes tokens when the answer is factual and unambiguous. Having agents debate whether Python was created in 1991 is pure waste. It also underperforms when the LLM does not have strong enough knowledge to construct genuine counterarguments, in which case the “con” agent generates superficial objections that dilute rather than sharpen the analysis.
  • The biggest value I have seen from the Debate pattern is in code review and architecture decisions. Having a “pro” agent argue for a particular design and a “con” agent argue against it produces surprisingly thorough analysis. The judge agent then synthesizes a recommendation that accounts for trade-offs neither side would have surfaced alone.
  • For production, I limit debates to 2-3 rounds. In my experience, most of the information gain happens in the first 2 rounds; beyond that, agents start repeating themselves with slight variations. Each additional round costs 2 LLM calls (pro + con), and if every call re-reads all previous arguments, total prompt tokens grow roughly quadratically with the number of rounds.
  • An underappreciated optimization: instead of running the debate at generation time (making the user wait), run it offline as a batch process for common query types. Pre-debate the top 100 most-asked questions in your domain and cache the judge’s synthesis. Then at serving time, retrieve the cached debate outcome and only run a live debate for novel queries.
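
To make the cost growth concrete, here is a small back-of-the-envelope calculation. It assumes every call re-reads all earlier arguments and that each argument is about 300 tokens; both are illustrative assumptions, and `debate_input_tokens` is a hypothetical helper.

```python
def debate_input_tokens(rounds: int, tokens_per_argument: int = 300) -> int:
    """Total prompt tokens when every call re-reads all earlier arguments."""
    total = 0
    arguments_so_far = 0
    for _ in range(rounds):
        for _speaker in ("pro", "con"):
            total += arguments_so_far * tokens_per_argument  # history in the prompt
            arguments_so_far += 1                            # this call adds one argument
    return total


costs = {rounds: debate_input_tokens(rounds) for rounds in (1, 2, 3, 6)}
# costs == {1: 300, 2: 1800, 3: 4500, 6: 19800}
```

Under these assumptions, doubling the debate from 3 to 6 rounds raises prompt tokens from 4,500 to 19,800, about 4.4 times as many. Truncating the history, as the `[-2:]` slices in the debate agents above do, keeps each prompt bounded and the total closer to linear.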
Follow-up: How would you adapt the Debate pattern for a customer-facing product where latency matters?
I would restructure it as an async pipeline. The user gets an initial fast response from a single agent (1-2 seconds). In the background, the debate runs asynchronously. If the debate reaches a different conclusion than the initial response, the system notifies the user with a refined answer (like a “we have a more thorough analysis” update). For products where this UX does not work, I use a lightweight version: instead of separate pro/con agents, I use a single prompt that instructs the model to “consider the strongest argument for and against before concluding.” As a rough estimate from my own use, this captures about 60% of the debate benefit without any additional LLM calls.
Strong Answer:
  • Observability in multi-agent systems is harder than single-model applications because you need to trace a request across multiple agents, each with their own LLM calls, tool invocations, and state mutations. The foundation is distributed tracing: assign a trace ID to each user request and propagate it through every agent invocation, so you can reconstruct the full execution path.
  • The metrics I track fall into three categories. First, per-agent metrics: latency per agent call, success/failure rate, token consumption, and the distribution of tool calls each agent makes. Second, orchestration metrics: how many agents were invoked per request, how many rounds of debate or supervisor loops, and which routing paths are most common. Third, quality metrics: user satisfaction signals (thumbs up/down), answer accuracy on a held-out evaluation set, and the percentage of requests that hit the max-iterations safety cap.
  • The most important single metric is “steps to completion,” which tells you how efficiently the agent system is solving tasks. A rising trend means either the tasks are getting harder or the agents are degrading. Combined with per-agent failure rates, this helps you pinpoint which agent is struggling.
  • For implementation, I use structured logging where each log entry includes the trace ID, agent name, step number, input summary, output summary, latency, and token count. I pipe these into an observability platform (Langfuse, LangSmith, or a custom Grafana dashboard) that lets me search by trace ID to reconstruct any request. The AgentTracer class in this chapter is a good starting point, but in production you want this integrated with your existing observability stack rather than standalone.
  • A practical tip: log the full LLM prompts and responses for a sample of requests (say, 5%) rather than all requests. Full logging of every request generates enormous data volumes and privacy concerns. But having no prompt logs makes debugging hallucinations or routing errors nearly impossible.
Follow-up: How do you set up alerts that catch multi-agent system degradation before users complain?
I set up three tiers of alerts. Tier 1 (immediate page): any agent hitting a 100% failure rate over a 5-minute window, or the overall max-iterations hit rate exceeding 10%. Tier 2 (Slack notification within the hour): average steps-to-completion increasing by more than 30% over a rolling 4-hour window, or per-agent latency p95 exceeding 2x the historical baseline. Tier 3 (daily report): drift in routing patterns (if the supervisor suddenly starts routing 80% of tasks to one agent, something changed), cost per request trending upward, and the distribution of user satisfaction scores. The Tier 3 alerts catch slow degradation that Tier 1 and Tier 2 miss.
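
The Tier 1 and Tier 2 rules from this answer translate fairly directly into a threshold check. The sketch below assumes the windowed aggregates (5-minute failure rates, 4-hour step averages) have already been computed elsewhere. `classify_alert` and the metric key names are hypothetical; the thresholds are the ones stated in the answer.

```python
def classify_alert(metrics: dict, baseline: dict) -> str:
    """Map a metrics snapshot to 'page', 'notify', or 'ok'."""
    # Tier 1: immediate page.
    if max(metrics["agent_failure_rates"].values(), default=0.0) >= 1.0:
        return "page"
    if metrics["max_iterations_hit_rate"] > 0.10:
        return "page"
    # Tier 2: notification within the hour.
    if metrics["avg_steps"] > 1.30 * baseline["avg_steps"]:
        return "notify"
    if metrics["latency_p95_ms"] > 2 * baseline["latency_p95_ms"]:
        return "notify"
    return "ok"


baseline = {"avg_steps": 4.0, "latency_p95_ms": 2000}
healthy = {
    "agent_failure_rates": {"researcher": 0.02, "writer": 0.0},
    "max_iterations_hit_rate": 0.01,
    "avg_steps": 4.2,
    "latency_p95_ms": 2100,
}
status = classify_alert(healthy, baseline)
degraded_status = classify_alert({**healthy, "avg_steps": 5.5}, baseline)
```

Tier 3 signals such as routing drift and cost trends are left out because they compare distributions over days, which fits a scheduled report better than a per-snapshot check.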