> ## Documentation Index
> Fetch the complete documentation index at: https://resources.devweekends.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Patterns

> RabbitMQ patterns

# RabbitMQ Messaging Patterns

Messaging patterns are the architectural recipes that solve specific distributed systems problems. Just as design patterns in software engineering give you proven solutions for common code-level problems, messaging patterns give you proven solutions for how services communicate. Choosing the wrong pattern is like using a hammer to drive a screw -- it sort of works, but the result is fragile and awkward.

***

## 1. Work Queues (Task Distribution)

The simplest and most common pattern: distribute tasks among multiple workers. One producer pushes tasks into a queue, and multiple consumers compete to pull them out. Each message is processed by exactly one worker.

**Real-world analogy**: A restaurant kitchen with one order queue and multiple chefs. Each order goes to one chef, not all of them.

**Use cases**: Image resizing, email sending, PDF generation, payment processing -- any CPU-intensive or I/O-heavy task you want to offload from the request-response cycle.

```python theme={null}
# producer.py -- submit tasks to the work queue
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a durable queue so tasks survive broker restarts
channel.queue_declare(queue='tasks', durable=True)

task = {"type": "resize_image", "image_id": "abc123", "width": 800}

channel.basic_publish(
    exchange='',                    # Default exchange -- routes directly to the named queue
    routing_key='tasks',            # Queue name to publish to
    body=json.dumps(task),
    properties=pika.BasicProperties(
        delivery_mode=2,            # Persistent -- written to disk, survives broker restart
        content_type='application/json',
    )
)
print(f"Submitted task: {task}")
connection.close()
```

```python theme={null}
# worker.py -- consume and process tasks
import pika
import json
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks', durable=True)

# Prefetch 1: only give this worker one task at a time
# Without this, RabbitMQ distributes tasks round-robin regardless of worker speed
# A slow worker could have 100 messages queued while a fast worker sits idle
channel.basic_qos(prefetch_count=1)

def process_task(ch, method, properties, body):
    task = json.loads(body)
    print(f"Processing: {task}")
    time.sleep(2)  # Simulate work
    print(f"Completed: {task['type']}")

    # Acknowledge AFTER processing is complete
    # If the worker crashes before acking, the message goes back to the queue
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='tasks', on_message_callback=process_task)
print("Worker waiting for tasks...")
channel.start_consuming()
```

<Tip>
  **Scaling is trivial**: To handle more load, start more workers. They all pull from the same queue, and prefetch ensures fair distribution. No code changes required -- just run another instance of worker.py.
</Tip>

***

## 2. Publish/Subscribe (Fanout)

Broadcast a message to all interested consumers. Every subscriber gets a copy of every message. Unlike work queues where one message goes to one consumer, pub/sub delivers each message to all consumers.

**Real-world analogy**: A radio station broadcast -- every tuned-in radio receives the same signal.

**Use cases**: Cache invalidation across multiple services, real-time notifications, log aggregation (every log consumer gets all logs), event broadcasting in event-driven architectures.

```python theme={null}
# publisher.py -- broadcast events to all subscribers
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Fanout exchange -- broadcasts to ALL bound queues, ignores routing keys
channel.exchange_declare(exchange='events', exchange_type='fanout')

event = {"type": "user.created", "user_id": "u-456", "email": "alice@example.com"}

channel.basic_publish(
    exchange='events',
    routing_key='',         # Ignored by fanout exchanges, but required by the API
    body=json.dumps(event)
)
print(f"Published: {event}")
connection.close()
```

```python theme={null}
# subscriber.py -- each subscriber gets its own queue
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='events', exchange_type='fanout')

# Create an exclusive, auto-delete queue for this subscriber
# Each subscriber gets its own queue, so each gets a copy of every message
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind this subscriber's queue to the fanout exchange
channel.queue_bind(exchange='events', queue=queue_name)

def on_event(ch, method, properties, body):
    event = json.loads(body)
    print(f"Received event: {event}")

channel.basic_consume(queue=queue_name, on_message_callback=on_event, auto_ack=True)
print("Listening for events...")
channel.start_consuming()
```

***

## 3. Topic Routing (Selective Subscription)

Subscribe to a subset of messages based on pattern matching on the routing key. This is the flexible middle ground between direct (exact match) and fanout (everything).

**Real-world analogy**: A newspaper subscription where you choose which sections you want (sports, business, technology) rather than getting everything or nothing.

**Use cases**: Log routing (service X only cares about error-level logs), multi-tenant event routing, geographic routing (US events go to US processors).

```python theme={null}
# publisher.py -- publish with structured routing keys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Topic exchange -- routes based on pattern matching against the routing key
channel.exchange_declare(exchange='logs', exchange_type='topic')

# Routing key convention: <facility>.<severity>
# Examples: "auth.error", "payment.info", "order.warning"
messages = [
    ("auth.error", "Failed login attempt from 192.168.1.100"),
    ("auth.info", "User alice logged in"),
    ("payment.error", "Stripe API timeout on charge ch_abc123"),
    ("order.info", "Order #789 placed successfully"),
]

for routing_key, message in messages:
    channel.basic_publish(exchange='logs', routing_key=routing_key, body=message)
    print(f"Published [{routing_key}]: {message}")

connection.close()
```

```python theme={null}
# subscriber.py -- subscribe to patterns
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Pattern matching syntax:
# * (star) matches exactly one word:  "auth.*" matches "auth.error" but NOT "auth.critical.alert"
# # (hash) matches zero or more words: "auth.#" matches "auth.error" AND "auth.critical.alert"
binding_key = sys.argv[1] if len(sys.argv) > 1 else "#"  # Default: all messages

channel.queue_bind(exchange='logs', queue=queue_name, routing_key=binding_key)

def on_log(ch, method, properties, body):
    print(f"[{method.routing_key}] {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=on_log, auto_ack=True)
print(f"Listening for: {binding_key}")
channel.start_consuming()

# Usage examples:
# python subscriber.py "#"           -- receive ALL logs
# python subscriber.py "*.error"     -- receive all error logs from any facility
# python subscriber.py "auth.*"      -- receive all auth logs at any severity
# python subscriber.py "payment.#"   -- receive all payment logs
```

***

## 4. Request/Reply (RPC)

Implement synchronous-style request/response over asynchronous messaging. The client sends a request message with a reply-to queue and a correlation ID, and the server sends the response back to that queue.

**Real-world analogy**: Sending a letter with a return address and a reference number. The recipient responds to the return address and includes the reference number so you can match the response to the original request.

**Use cases**: Remote procedure calls between microservices, requesting calculations or data transformations, when you need an answer but want the benefits of message queue resilience and load balancing.

```python theme={null}
# rpc_server.py -- process requests and return responses
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)  # Process one request at a time

def calculate_tax(order_total, state):
    """Simulate a tax calculation service."""
    tax_rates = {"CA": 0.0725, "NY": 0.08, "TX": 0.0625}
    rate = tax_rates.get(state, 0.05)
    return round(order_total * rate, 2)

def on_request(ch, method, properties, body):
    request = json.loads(body)
    print(f"Calculating tax for: {request}")

    tax = calculate_tax(request['total'], request['state'])
    response = {"tax": tax, "total_with_tax": request['total'] + tax}

    # Send the response to the reply queue specified by the client
    ch.basic_publish(
        exchange='',
        routing_key=properties.reply_to,       # Client's private reply queue
        properties=pika.BasicProperties(
            correlation_id=properties.correlation_id,  # Echo back so client can match request to response
        ),
        body=json.dumps(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print("RPC server ready...")
channel.start_consuming()
```

```python theme={null}
# rpc_client.py -- send request and wait for response
import pika
import json
import uuid

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Create an exclusive reply queue for this client
result = channel.queue_declare(queue='', exclusive=True)
reply_queue = result.method.queue

# Generate a unique correlation ID for this request
correlation_id = str(uuid.uuid4())
response = None

def on_response(ch, method, properties, body):
    global response
    # Only process responses that match our correlation ID
    if properties.correlation_id == correlation_id:
        response = json.loads(body)

channel.basic_consume(queue=reply_queue, on_message_callback=on_response, auto_ack=True)

# Send the RPC request
request = {"total": 99.99, "state": "CA"}
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=reply_queue,           # Tell the server where to send the response
        correlation_id=correlation_id,  # Tag for matching request to response
    ),
    body=json.dumps(request)
)

# Wait for the response (in production, add a timeout)
print(f"Sent request: {request}")
while response is None:
    connection.process_data_events(time_limit=5)

print(f"Response: {response}")
connection.close()
```

<Warning>
  **RPC over messaging adds latency compared to direct HTTP calls.** Use this pattern when you need the resilience benefits (request survives broker restarts, load balancing across workers) or when the server might be temporarily unavailable. For low-latency synchronous calls where the server is always available, direct HTTP or gRPC is simpler.
</Warning>

***

## 5. Dead Letter Exchanges (Error Handling)

When a message cannot be processed (rejected, expired, or queue is full), route it to a dead letter exchange instead of losing it. This is your safety net for failed messages -- and one of the most important patterns for production systems.

**Real-world analogy**: Undeliverable mail goes to the dead letter office rather than being thrown away. Someone can investigate why it was undeliverable and decide what to do with it. Without a dead letter office, failed mail just vanishes and you never know it existed.

A message ends up in the dead letter exchange for three reasons:

1. **Rejected** by a consumer with `requeue=False` (explicit failure)
2. **Expired** because the message TTL or queue TTL was exceeded (timed out)
3. **Dropped** because the queue hit its `x-max-length` limit (overflow)

```python theme={null}
# Set up a queue with dead letter routing
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters', durable=True)
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='failed')

# Main queue with dead letter configuration
channel.queue_declare(
    queue='orders',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',           # Where rejected messages go
        'x-dead-letter-routing-key': 'failed',      # Routing key for the DLX
        'x-message-ttl': 60000,                      # Messages expire after 60 seconds if not consumed
    }
)

# In the consumer, reject a message to send it to the dead letter queue
def on_message(ch, method, properties, body):
    try:
        process_order(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Failed to process: {e}")
        # requeue=False sends the message to the dead letter exchange
        # requeue=True puts it back in the original queue (risk of infinite loop)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
```

***

## Choosing the Right Pattern

| Pattern              | Messages go to              | Use when                                         |
| -------------------- | --------------------------- | ------------------------------------------------ |
| **Work Queue**       | One consumer                | Distributing tasks across workers                |
| **Pub/Sub (Fanout)** | All consumers               | Broadcasting events to all interested services   |
| **Topic Routing**    | Matching consumers          | Selective subscription based on message category |
| **RPC**              | One server, reply to client | Need request/response semantics over messaging   |
| **Dead Letter**      | Error queue                 | Handling messages that fail processing           |

<Tip>
  **Start with the simplest pattern that works.** Work queues solve most problems. Add topic routing when you need selective delivery. Add RPC only when you genuinely need synchronous responses over the message bus. Over-engineering your messaging topology is a common mistake -- every exchange and binding adds operational complexity.
</Tip>

***

## Key Takeaways

* Work queues distribute tasks across competing consumers -- scale by adding workers
* Pub/Sub broadcasts every message to every subscriber via fanout exchanges
* Topic routing gives selective subscription through pattern matching on routing keys
* RPC implements request/response over messaging using reply-to queues and correlation IDs
* Dead letter exchanges catch failed messages instead of losing them
* Always use manual acknowledgments and prefetch limits in production
* Choose the simplest pattern that meets your requirements

***

Next: [RabbitMQ Reliability →](/courses/devops-tools/rabbitmq-reliability)
