Documentation Index
Fetch the complete documentation index at: https://resources.devweekends.com/llms.txt
Use this file to discover all available pages before exploring further.
RabbitMQ Messaging Patterns
Messaging patterns are the architectural recipes that solve specific distributed systems problems. Just as design patterns in software engineering give you proven solutions for common code-level problems, messaging patterns give you proven solutions for how services communicate. Choosing the wrong pattern is like using a hammer to drive a screw — it sort of works, but the result is fragile and awkward.
1. Work Queues (Task Distribution)
The simplest and most common pattern: distribute tasks among multiple workers. One producer pushes tasks into a queue, and multiple consumers compete to pull them out. Each message is processed by exactly one worker.
Real-world analogy: A restaurant kitchen with one order queue and multiple chefs. Each order goes to one chef, not all of them.
Use cases: Image resizing, email sending, PDF generation, payment processing — any CPU-intensive or I/O-heavy task you want to offload from the request-response cycle.
# producer.py -- submit tasks to the work queue
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare a durable queue so tasks survive broker restarts
channel.queue_declare(queue='tasks', durable=True)
task = {"type": "resize_image", "image_id": "abc123", "width": 800}
channel.basic_publish(
exchange='', # Default exchange -- routes directly to the named queue
routing_key='tasks', # Queue name to publish to
body=json.dumps(task),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent -- written to disk, survives broker restart
content_type='application/json',
)
)
print(f"Submitted task: {task}")
connection.close()
# worker.py -- consume and process tasks
import pika
import json
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
# Prefetch 1: only give this worker one task at a time
# Without this, RabbitMQ distributes tasks round-robin regardless of worker speed
# A slow worker could have 100 messages queued while a fast worker sits idle
channel.basic_qos(prefetch_count=1)
def process_task(ch, method, properties, body):
task = json.loads(body)
print(f"Processing: {task}")
time.sleep(2) # Simulate work
print(f"Completed: {task['type']}")
# Acknowledge AFTER processing is complete
# If the worker crashes before acking, the message goes back to the queue
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='tasks', on_message_callback=process_task)
print("Worker waiting for tasks...")
channel.start_consuming()
Scaling is trivial: To handle more load, start more workers. They all pull from the same queue, and prefetch ensures fair distribution. No code changes required — just run another instance of worker.py.
2. Publish/Subscribe (Fanout)
Broadcast a message to all interested consumers. Every subscriber gets a copy of every message. Unlike work queues where one message goes to one consumer, pub/sub delivers each message to all consumers.
Real-world analogy: A radio station broadcast — every tuned-in radio receives the same signal.
Use cases: Cache invalidation across multiple services, real-time notifications, log aggregation (every log consumer gets all logs), event broadcasting in event-driven architectures.
# publisher.py -- broadcast events to all subscribers
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Fanout exchange -- broadcasts to ALL bound queues, ignores routing keys
channel.exchange_declare(exchange='events', exchange_type='fanout')
event = {"type": "user.created", "user_id": "u-456", "email": "alice@example.com"}
channel.basic_publish(
exchange='events',
routing_key='', # Ignored by fanout exchanges, but required by the API
body=json.dumps(event)
)
print(f"Published: {event}")
connection.close()
# subscriber.py -- each subscriber gets its own queue
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='events', exchange_type='fanout')
# Create an exclusive, auto-delete queue for this subscriber
# Each subscriber gets its own queue, so each gets a copy of every message
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind this subscriber's queue to the fanout exchange
channel.queue_bind(exchange='events', queue=queue_name)
def on_event(ch, method, properties, body):
event = json.loads(body)
print(f"Received event: {event}")
channel.basic_consume(queue=queue_name, on_message_callback=on_event, auto_ack=True)
print("Listening for events...")
channel.start_consuming()
3. Topic Routing (Selective Subscription)
Subscribe to a subset of messages based on pattern matching on the routing key. This is the flexible middle ground between direct (exact match) and fanout (everything).
Real-world analogy: A newspaper subscription where you choose which sections you want (sports, business, technology) rather than getting everything or nothing.
Use cases: Log routing (service X only cares about error-level logs), multi-tenant event routing, geographic routing (US events go to US processors).
# publisher.py -- publish with structured routing keys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Topic exchange -- routes based on pattern matching against the routing key
channel.exchange_declare(exchange='logs', exchange_type='topic')
# Routing key convention: <facility>.<severity>
# Examples: "auth.error", "payment.info", "order.warning"
messages = [
("auth.error", "Failed login attempt from 192.168.1.100"),
("auth.info", "User alice logged in"),
("payment.error", "Stripe API timeout on charge ch_abc123"),
("order.info", "Order #789 placed successfully"),
]
for routing_key, message in messages:
channel.basic_publish(exchange='logs', routing_key=routing_key, body=message)
print(f"Published [{routing_key}]: {message}")
connection.close()
# subscriber.py -- subscribe to patterns
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Pattern matching syntax:
# * (star) matches exactly one word: "auth.*" matches "auth.error" but NOT "auth.critical.alert"
# # (hash) matches zero or more words: "auth.#" matches "auth.error" AND "auth.critical.alert"
binding_key = sys.argv[1] if len(sys.argv) > 1 else "#" # Default: all messages
channel.queue_bind(exchange='logs', queue=queue_name, routing_key=binding_key)
def on_log(ch, method, properties, body):
print(f"[{method.routing_key}] {body.decode()}")
channel.basic_consume(queue=queue_name, on_message_callback=on_log, auto_ack=True)
print(f"Listening for: {binding_key}")
channel.start_consuming()
# Usage examples:
# python subscriber.py "#" -- receive ALL logs
# python subscriber.py "*.error" -- receive all error logs from any facility
# python subscriber.py "auth.*" -- receive all auth logs at any severity
# python subscriber.py "payment.#" -- receive all payment logs
4. Request/Reply (RPC)
Implement synchronous-style request/response over asynchronous messaging. The client sends a request message with a reply-to queue and a correlation ID, and the server sends the response back to that queue.
Real-world analogy: Sending a letter with a return address and a reference number. The recipient responds to the return address and includes the reference number so you can match the response to the original request.
Use cases: Remote procedure calls between microservices, requesting calculations or data transformations, when you need an answer but want the benefits of message queue resilience and load balancing.
# rpc_server.py -- process requests and return responses
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1) # Process one request at a time
def calculate_tax(order_total, state):
"""Simulate a tax calculation service."""
tax_rates = {"CA": 0.0725, "NY": 0.08, "TX": 0.0625}
rate = tax_rates.get(state, 0.05)
return round(order_total * rate, 2)
def on_request(ch, method, properties, body):
request = json.loads(body)
print(f"Calculating tax for: {request}")
tax = calculate_tax(request['total'], request['state'])
response = {"tax": tax, "total_with_tax": request['total'] + tax}
# Send the response to the reply queue specified by the client
ch.basic_publish(
exchange='',
routing_key=properties.reply_to, # Client's private reply queue
properties=pika.BasicProperties(
correlation_id=properties.correlation_id, # Echo back so client can match request to response
),
body=json.dumps(response)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print("RPC server ready...")
channel.start_consuming()
# rpc_client.py -- send request and wait for response
import pika
import json
import uuid
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Create an exclusive reply queue for this client
result = channel.queue_declare(queue='', exclusive=True)
reply_queue = result.method.queue
# Generate a unique correlation ID for this request
correlation_id = str(uuid.uuid4())
response = None
def on_response(ch, method, properties, body):
global response
# Only process responses that match our correlation ID
if properties.correlation_id == correlation_id:
response = json.loads(body)
channel.basic_consume(queue=reply_queue, on_message_callback=on_response, auto_ack=True)
# Send the RPC request
request = {"total": 99.99, "state": "CA"}
channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=reply_queue, # Tell the server where to send the response
correlation_id=correlation_id, # Tag for matching request to response
),
body=json.dumps(request)
)
# Wait for the response (in production, add a timeout)
print(f"Sent request: {request}")
while response is None:
connection.process_data_events(time_limit=5)
print(f"Response: {response}")
connection.close()
RPC over messaging adds latency compared to direct HTTP calls. Use this pattern when you need the resilience benefits (request survives broker restarts, load balancing across workers) or when the server might be temporarily unavailable. For low-latency synchronous calls where the server is always available, direct HTTP or gRPC is simpler.
5. Dead Letter Exchanges (Error Handling)
When a message cannot be processed (rejected, expired, or queue is full), route it to a dead letter exchange instead of losing it. This is your safety net for failed messages — and one of the most important patterns for production systems.
Real-world analogy: Undeliverable mail goes to the dead letter office rather than being thrown away. Someone can investigate why it was undeliverable and decide what to do with it. Without a dead letter office, failed mail just vanishes and you never know it existed.
A message ends up in the dead letter exchange for three reasons:
- Rejected by a consumer with
requeue=False (explicit failure)
- Expired because the message TTL or queue TTL was exceeded (timed out)
- Dropped because the queue hit its
x-max-length limit (overflow)
# Set up a queue with dead letter routing
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters', durable=True)
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='failed')
# Main queue with dead letter configuration
channel.queue_declare(
queue='orders',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx', # Where rejected messages go
'x-dead-letter-routing-key': 'failed', # Routing key for the DLX
'x-message-ttl': 60000, # Messages expire after 60 seconds if not consumed
}
)
# In the consumer, reject a message to send it to the dead letter queue
def on_message(ch, method, properties, body):
try:
process_order(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Failed to process: {e}")
# requeue=False sends the message to the dead letter exchange
# requeue=True puts it back in the original queue (risk of infinite loop)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
Choosing the Right Pattern
| Pattern | Messages go to | Use when |
|---|
| Work Queue | One consumer | Distributing tasks across workers |
| Pub/Sub (Fanout) | All consumers | Broadcasting events to all interested services |
| Topic Routing | Matching consumers | Selective subscription based on message category |
| RPC | One server, reply to client | Need request/response semantics over messaging |
| Dead Letter | Error queue | Handling messages that fail processing |
Start with the simplest pattern that works. Work queues solve most problems. Add topic routing when you need selective delivery. Add RPC only when you genuinely need synchronous responses over the message bus. Over-engineering your messaging topology is a common mistake — every exchange and binding adds operational complexity.
Key Takeaways
- Work queues distribute tasks across competing consumers — scale by adding workers
- Pub/Sub broadcasts every message to every subscriber via fanout exchanges
- Topic routing gives selective subscription through pattern matching on routing keys
- RPC implements request/response over messaging using reply-to queues and correlation IDs
- Dead letter exchanges catch failed messages instead of losing them
- Always use manual acknowledgments and prefetch limits in production
- Choose the simplest pattern that meets your requirements
Next: RabbitMQ Reliability →