Asynchronous Communication
Synchronous communication works for simple request-response scenarios, but microservices often need asynchronous patterns to achieve loose coupling, resilience, and scalability.

Learning Objectives:
- Understand when to use async over sync communication
- Implement message queues with RabbitMQ
- Build event-driven systems with Apache Kafka
- Handle message ordering, deduplication, and dead letters
- Design robust event schemas
Why Asynchronous Communication?
Sync vs Async Comparison
SYNCHRONOUS COMMUNICATION
─────────────────────────────────────────────────────────────────────────────
Service A Service B Service C
│ │ │
│──────── Request ──────────▶│ │
│ │──────── Request ──────────▶│
│ │ │
│ │◀─────── Response ──────────│
│◀─────── Response ──────────│ │
│ │ │
⚠️ Problems:
• A must wait for B and C (high latency)
• If B fails, A fails
• Tight coupling between services
• Scaling issues (chain bottleneck)
─────────────────────────────────────────────────────────────────────────────
ASYNCHRONOUS COMMUNICATION
─────────────────────────────────────────────────────────────────────────────
Service A Message Broker Service B Service C
│ │ │ │
│─── Publish Event ─────▶│ │ │
│◀── Acknowledgment ─────│ │ │
│ │ │ │
│ │─────── Event ───────────▶│ │
│ │─────── Event ─────────────────────────▶│
│ │ │ │
│ │◀────── Ack ──────────────│ │
│ │◀────── Ack ────────────────────────────│
✅ Benefits:
• A doesn't wait (immediate response)
• B and C process independently
• B and C can fail and retry
• Loose coupling, better scaling
When to Use Async
Use Async When:
- Fire and forget: Notifications, logging, analytics
- Long-running tasks: Report generation, data processing
- Decoupling needed: Services shouldn’t know about each other
- Spike handling: Buffer requests during high load
- Event broadcasting: One event, many consumers
- Retry needed: Reliable delivery despite failures
- Order processing: Multi-step workflows
Stay Sync When:
- Immediate response needed: User login, validation
- Simple request-response: Get user by ID
- Strong consistency: Payment authorization
- Low latency critical: Real-time gaming
- Debugging simplicity: Development phase
Message Queue Patterns
Point-to-Point (Queue)
One message, one consumer. Work distribution pattern.
┌──────────────┐ ┌─────────────────────────┐ ┌──────────────┐
│ Producer │───▶│ QUEUE │───▶│ Consumer 1 │
└──────────────┘ │ [msg1][msg2][msg3] │ └──────────────┘
│ │ ┌──────────────┐
│ │───▶│ Consumer 2 │
└─────────────────────────┘ └──────────────┘
Each message is processed by exactly ONE consumer.
Used for: Work distribution, task queues
Publish-Subscribe (Topic)
One message, multiple consumers. Event broadcasting.
┌─────────────────────────┐ ┌──────────────┐
│ TOPIC │───▶│ Consumer 1 │
┌──────────────┐ │ order.created │ └──────────────┘
│ Producer │───▶│ │───▶┌──────────────┐
└──────────────┘ │ │ │ Consumer 2 │
│ │ └──────────────┘
└─────────────────────────┘───▶┌──────────────┐
│ Consumer 3 │
└──────────────┘
Each message is delivered to ALL subscribers.
Used for: Event broadcasting, notifications
RabbitMQ Implementation
Setup and Connection
// config/rabbitmq.js
const amqp = require('amqplib');
class RabbitMQConnection {
constructor() {
this.connection = null;
this.channel = null;
}
async connect() {
try {
this.connection = await amqp.connect({
hostname: process.env.RABBITMQ_HOST || 'localhost',
port: process.env.RABBITMQ_PORT || 5672,
username: process.env.RABBITMQ_USER || 'guest',
password: process.env.RABBITMQ_PASS || 'guest',
vhost: process.env.RABBITMQ_VHOST || '/',
heartbeat: 60
});
this.channel = await this.connection.createChannel();
// Handle connection events
this.connection.on('error', (err) => {
console.error('RabbitMQ connection error:', err);
this.reconnect();
});
this.connection.on('close', () => {
console.log('RabbitMQ connection closed');
this.reconnect();
});
console.log('Connected to RabbitMQ');
return this.channel;
} catch (error) {
console.error('Failed to connect to RabbitMQ:', error);
throw error;
}
}
async reconnect() {
if (this.reconnecting) return; // 'error' and 'close' can both fire; avoid parallel reconnects
this.reconnecting = true;
console.log('Attempting to reconnect to RabbitMQ...');
await new Promise(resolve => setTimeout(resolve, 5000));
try {
await this.connect();
} finally {
this.reconnecting = false;
}
}
async setupExchanges() {
// Direct exchange for point-to-point
await this.channel.assertExchange('direct_exchange', 'direct', { durable: true });
// Topic exchange for routing patterns
await this.channel.assertExchange('topic_exchange', 'topic', { durable: true });
// Fanout exchange for broadcasting
await this.channel.assertExchange('fanout_exchange', 'fanout', { durable: true });
// Dead letter exchange
await this.channel.assertExchange('dlx_exchange', 'direct', { durable: true });
}
async close() {
await this.channel?.close();
await this.connection?.close();
}
}
module.exports = new RabbitMQConnection();
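A minimal startup sketch showing how this module might be wired in; the `app.js` entry point and `bootstrap` function are assumptions for illustration:

// app.js (hypothetical entry point)
const rabbitmq = require('./config/rabbitmq');

async function bootstrap() {
  // Connect once at startup, declare exchanges, then hand the channel to producers/consumers
  const channel = await rabbitmq.connect();
  await rabbitmq.setupExchanges();
  return channel;
}

bootstrap().catch((err) => {
  console.error('Startup failed:', err);
  process.exit(1);
});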
Producer Implementation
// messaging/producer.js
class MessageProducer {
constructor(channel) {
this.channel = channel;
}
// Send to specific queue (Point-to-Point)
async sendToQueue(queue, message, options = {}) {
await this.channel.assertQueue(queue, {
durable: true,
deadLetterExchange: 'dlx_exchange',
deadLetterRoutingKey: `${queue}.dlq`
});
const content = Buffer.from(JSON.stringify(message));
return this.channel.sendToQueue(queue, content, {
persistent: true,
messageId: message.id || this.generateId(),
timestamp: Date.now(),
contentType: 'application/json',
headers: {
'x-retry-count': 0,
'x-source-service': process.env.SERVICE_NAME,
...options.headers
}
});
}
// Publish to exchange (Pub/Sub)
async publish(exchange, routingKey, event, options = {}) {
const content = Buffer.from(JSON.stringify(event));
return this.channel.publish(exchange, routingKey, content, {
persistent: true,
messageId: event.eventId || this.generateId(),
timestamp: Date.now(),
contentType: 'application/json',
type: event.eventType,
headers: {
'x-event-version': event.version || '1.0',
'x-correlation-id': event.correlationId,
'x-source-service': process.env.SERVICE_NAME,
...options.headers
}
});
}
// Publish domain event with standard structure
async publishDomainEvent(event) {
const routingKey = event.eventType; // e.g., 'order.created'
await this.publish('topic_exchange', routingKey, {
eventId: event.eventId,
eventType: event.eventType,
occurredAt: event.occurredAt || new Date().toISOString(),
version: event.version || '1.0',
correlationId: event.correlationId,
data: event.data,
metadata: {
service: process.env.SERVICE_NAME,
environment: process.env.NODE_ENV
}
});
}
generateId() {
// Timestamp plus random suffix; a UUID library is the safer production choice
return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}
}
module.exports = MessageProducer;
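A short usage sketch; the queue name and payload are illustrative:

// Hypothetical wiring inside an async startup function
const rabbitmq = require('./config/rabbitmq');
const MessageProducer = require('./messaging/producer');

async function example() {
  const channel = await rabbitmq.connect();
  const producer = new MessageProducer(channel);

  // Point-to-point: exactly one worker processes this task
  await producer.sendToQueue('email-tasks', { to: 'user@example.com', template: 'welcome' });

  // Pub/sub: every service bound to 'order.*' receives this event
  await producer.publishDomainEvent({
    eventType: 'order.created',
    correlationId: 'corr-123',
    data: { orderId: 'ord-1' }
  });
}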
Consumer Implementation
// messaging/consumer.js
class MessageConsumer {
constructor(channel) {
this.channel = channel;
this.handlers = new Map();
}
// Subscribe to queue
async consumeQueue(queue, handler, options = {}) {
const { prefetch = 10, noAck = false } = options;
await this.channel.assertQueue(queue, {
durable: true,
deadLetterExchange: 'dlx_exchange',
deadLetterRoutingKey: `${queue}.dlq`
});
// Set prefetch for fair dispatch
await this.channel.prefetch(prefetch);
console.log(`Consuming from queue: ${queue}`);
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
const startTime = Date.now();
const messageId = msg.properties.messageId;
let content;
try {
console.log(`Processing message ${messageId} from ${queue}`);
// Parse inside try so malformed JSON goes through the failure path below
content = JSON.parse(msg.content.toString());
await handler(content, msg.properties, msg.fields);
if (!noAck) {
this.channel.ack(msg);
}
console.log(`Message ${messageId} processed in ${Date.now() - startTime}ms`);
} catch (error) {
console.error(`Error processing message ${messageId}:`, error);
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
const maxRetries = options.maxRetries || 3;
if (content !== undefined && retryCount <= maxRetries) {
// Retry with delay
await this.retryMessage(queue, content, msg.properties, retryCount);
this.channel.ack(msg);
} else {
// Unparseable message or retries exhausted: dead-letter it
this.channel.nack(msg, false, false);
console.log(`Message ${messageId} sent to DLQ`);
}
}
}, { noAck });
}
// Subscribe to topic exchange
async subscribeToTopic(patterns, handler, options = {}) {
const queueName = options.queueName ||
`${process.env.SERVICE_NAME}.${patterns.join('_').replace(/[.#*]/g, '_')}`;
const queue = await this.channel.assertQueue(queueName, {
durable: true,
deadLetterExchange: 'dlx_exchange',
deadLetterRoutingKey: `${queueName}.dlq`
});
// Bind queue to each pattern
for (const pattern of patterns) {
await this.channel.bindQueue(queue.queue, 'topic_exchange', pattern);
console.log(`Bound ${queue.queue} to pattern: ${pattern}`);
}
await this.consumeQueue(queue.queue, handler, options);
}
async retryMessage(queue, content, properties, retryCount) {
const delay = Math.pow(2, retryCount) * 1000; // Exponential backoff
// Use delayed message plugin or simple setTimeout
await new Promise(resolve => setTimeout(resolve, delay));
await this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), {
...properties,
headers: {
...properties.headers,
'x-retry-count': retryCount,
'x-original-timestamp': properties.timestamp,
'x-retry-timestamp': Date.now()
}
});
}
}
module.exports = MessageConsumer;
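And a matching consumer-side sketch; the service and queue names are assumed:

// Hypothetical worker startup
const rabbitmq = require('./config/rabbitmq');
const MessageConsumer = require('./messaging/consumer');

async function startWorker() {
  const channel = await rabbitmq.connect();
  const consumer = new MessageConsumer(channel);
  // Receive all order events on a service-owned queue, with bounded retries
  await consumer.subscribeToTopic(['order.*'], async (event) => {
    console.log('Received', event.eventType);
  }, { queueName: 'reporting-service.orders', maxRetries: 3 });
}

startWorker().catch(console.error);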
Order Service Example
// order-service/events.js
const MessageProducer = require('../messaging/producer');
class OrderEventPublisher {
constructor(channel) {
this.producer = new MessageProducer(channel);
}
async orderCreated(order) {
await this.producer.publishDomainEvent({
eventType: 'order.created',
correlationId: order.correlationId,
data: {
orderId: order.id,
customerId: order.customerId,
items: order.items.map(item => ({
productId: item.productId,
quantity: item.quantity,
price: item.price
})),
totalAmount: order.totalAmount,
shippingAddress: order.shippingAddress,
createdAt: order.createdAt
}
});
}
async orderPaid(orderId, paymentId, amount) {
await this.producer.publishDomainEvent({
eventType: 'order.paid',
data: { orderId, paymentId, amount }
});
}
async orderShipped(orderId, trackingNumber, carrier) {
await this.producer.publishDomainEvent({
eventType: 'order.shipped',
data: { orderId, trackingNumber, carrier }
});
}
async orderCancelled(orderId, reason) {
await this.producer.publishDomainEvent({
eventType: 'order.cancelled',
data: { orderId, reason }
});
}
}
// Inventory service listening
// inventory-service/eventHandlers.js
class InventoryEventHandler {
constructor(consumer, inventoryService) {
this.consumer = consumer;
this.inventoryService = inventoryService;
}
async start() {
await this.consumer.subscribeToTopic(
['order.created', 'order.cancelled'],
this.handleOrderEvent.bind(this),
{ queueName: 'inventory-service.orders' }
);
}
async handleOrderEvent(event, properties) {
switch (event.eventType) {
case 'order.created':
await this.reserveInventory(event.data);
break;
case 'order.cancelled':
await this.releaseInventory(event.data);
break;
}
}
async reserveInventory(orderData) {
for (const item of orderData.items) {
await this.inventoryService.reserve(
item.productId,
item.quantity,
orderData.orderId
);
}
console.log(`Inventory reserved for order ${orderData.orderId}`);
}
async releaseInventory(orderData) {
await this.inventoryService.releaseByOrder(orderData.orderId);
console.log(`Inventory released for order ${orderData.orderId}`);
}
}
Apache Kafka Implementation
Kafka excels at high-throughput event streaming, with ordering guaranteed per partition.

Kafka Concepts
┌─────────────────────────────────────────────────────────────────────────────┐
│ KAFKA ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ TOPICS & PARTITIONS │
│ ───────────────────── │
│ │
│ Topic: orders │
│ ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐ │
│ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │
│ │ [0][1][2][3][4] │ │ [0][1][2][3] │ │ [0][1][2] │ │
│ │ ───────▶ │ │ ───────▶ │ │ ───────▶ │ │
│ │ (ordered msgs) │ │ (ordered msgs) │ │ (ordered msgs) │ │
│ └───────────────────┘ └───────────────────┘ └───────────────────┘ │
│ │
│ CONSUMER GROUPS │
│ ─────────────────── │
│ │
│ Consumer Group: inventory-service │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Consumer 1 │ │ Consumer 2 │ │ Consumer 3 │ │
│ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Consumer Group: notification-service │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Consumer 1 │ │ Consumer 2 │ │
│ │ Part 0 & 1 │ │ Partition 2 │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ Each group gets ALL messages (like pub/sub) │
│ Within group, messages are distributed (like queue) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
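The three-partition layout above can be created with the kafkajs admin client; a sketch, assuming a local broker:

// Hypothetical one-off setup script
const { Kafka } = require('kafkajs');

async function createOrdersTopic() {
  const kafka = new Kafka({ clientId: 'topic-setup', brokers: ['localhost:9092'] });
  const admin = kafka.admin();
  await admin.connect();
  // Three partitions allow up to three consumers per group to work in parallel
  await admin.createTopics({
    topics: [{ topic: 'orders', numPartitions: 3, replicationFactor: 1 }]
  });
  await admin.disconnect();
}

createOrdersTopic().catch(console.error);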
Kafka Producer
// kafka/producer.js
const { Kafka, Partitioners } = require('kafkajs');
class KafkaProducer {
constructor() {
this.kafka = new Kafka({
clientId: process.env.SERVICE_NAME,
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
retry: {
initialRetryTime: 100,
retries: 8
}
});
this.producer = this.kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
allowAutoTopicCreation: true,
transactionTimeout: 30000
});
}
async connect() {
await this.producer.connect();
console.log('Kafka producer connected');
}
async disconnect() {
await this.producer.disconnect();
}
// Send single message
async send(topic, message, key = null) {
const result = await this.producer.send({
topic,
messages: [{
key: key ? String(key) : null,
value: JSON.stringify(message),
headers: {
'event-type': message.eventType,
'event-id': message.eventId,
'correlation-id': message.correlationId || '',
'source-service': process.env.SERVICE_NAME,
'timestamp': String(Date.now())
}
}]
});
return result;
}
// Send batch of messages
async sendBatch(topic, messages) {
const result = await this.producer.send({
topic,
messages: messages.map(msg => ({
key: msg.key ? String(msg.key) : null,
value: JSON.stringify(msg.value),
headers: {
'event-type': msg.value.eventType,
'event-id': msg.value.eventId,
'source-service': process.env.SERVICE_NAME
}
}))
});
return result;
}
// Transactional send (exactly-once semantics)
// Note: this requires the producer to be created with idempotent: true and a
// transactionalId (see "Exactly-Once with Kafka" below)
async sendInTransaction(messages) {
const transaction = await this.producer.transaction();
try {
for (const { topic, key, value } of messages) {
await transaction.send({
topic,
messages: [{
key: key ? String(key) : null,
value: JSON.stringify(value)
}]
});
}
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
}
}
module.exports = KafkaProducer;
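Usage sketch; the topic and key are illustrative:

// Hypothetical startup
const KafkaProducer = require('./kafka/producer');

async function main() {
  const producer = new KafkaProducer();
  await producer.connect();
  // Keying by orderId keeps all of one order's events in the same partition
  await producer.send('orders', {
    eventId: 'evt-1',
    eventType: 'order.created',
    data: { orderId: 'ord-1' }
  }, 'ord-1');
}

main().catch(console.error);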
Kafka Consumer
// kafka/consumer.js
const { Kafka } = require('kafkajs');
class KafkaConsumer {
constructor(groupId) {
this.groupId = groupId; // stored for the connect() log below
this.kafka = new Kafka({
clientId: process.env.SERVICE_NAME,
brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(',')
});
this.consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxWaitTimeInMs: 5000,
retry: {
initialRetryTime: 100,
retries: 8
}
});
this.handlers = new Map();
}
async connect() {
await this.consumer.connect();
console.log(`Kafka consumer connected for group: ${this.groupId}`);
}
async disconnect() {
await this.consumer.disconnect();
}
// Subscribe to topics
async subscribe(topics, handler, options = {}) {
const { fromBeginning = false, autoCommit = true } = options;
for (const topic of topics) {
await this.consumer.subscribe({ topic, fromBeginning });
console.log(`Subscribed to topic: ${topic}`);
}
await this.consumer.run({
autoCommit,
autoCommitInterval: 5000,
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
const metadata = {
topic,
partition,
offset: message.offset,
key: message.key?.toString(),
timestamp: message.timestamp,
headers: this.parseHeaders(message.headers)
};
try {
await handler(event, metadata);
if (!autoCommit) {
await this.consumer.commitOffsets([{
topic,
partition,
offset: (BigInt(message.offset) + 1n).toString()
}]);
}
} catch (error) {
console.error(`Error processing message from ${topic}:`, error);
// Could implement retry logic or send to DLQ
throw error;
}
}
});
}
// Subscribe with batch processing
async subscribeBatch(topics, handler, options = {}) {
const { fromBeginning = false, batchSize = 100 } = options;
for (const topic of topics) {
await this.consumer.subscribe({ topic, fromBeginning });
}
await this.consumer.run({
autoCommit: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
const messages = batch.messages.map(message => ({
event: JSON.parse(message.value.toString()),
offset: message.offset,
key: message.key?.toString()
}));
// Process in chunks
for (let i = 0; i < messages.length; i += batchSize) {
if (!isRunning() || isStale()) break;
const chunk = messages.slice(i, i + batchSize);
await handler(chunk, batch.topic, batch.partition);
resolveOffset(chunk[chunk.length - 1].offset);
await heartbeat();
}
}
});
}
parseHeaders(headers) {
const parsed = {};
for (const [key, value] of Object.entries(headers || {})) {
parsed[key] = value?.toString();
}
return parsed;
}
}
module.exports = KafkaConsumer;
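Usage sketch; the group and topic names are assumed:

// Hypothetical worker
const KafkaConsumer = require('./kafka/consumer');

async function main() {
  const consumer = new KafkaConsumer('analytics-service');
  await consumer.connect();
  await consumer.subscribe(['orders'], async (event, metadata) => {
    console.log(`${event.eventType} from partition ${metadata.partition} @ offset ${metadata.offset}`);
  });
}

main().catch(console.error);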
Order Service with Kafka
// order-service/kafkaEvents.js
const KafkaProducer = require('../kafka/producer');
class OrderKafkaPublisher {
constructor() {
this.producer = new KafkaProducer();
}
async initialize() {
await this.producer.connect();
}
async orderCreated(order) {
// Use orderId as key for ordering
await this.producer.send('orders', {
eventId: `evt-${Date.now()}`,
eventType: 'order.created',
occurredAt: new Date().toISOString(),
data: {
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount
}
}, order.id); // Key ensures same order events go to same partition
}
async orderStatusChanged(orderId, oldStatus, newStatus) {
await this.producer.send('orders', {
eventId: `evt-${Date.now()}`,
eventType: 'order.status_changed',
data: { orderId, oldStatus, newStatus }
}, orderId);
}
}
// inventory-service/kafkaHandler.js
const KafkaConsumer = require('../kafka/consumer');
class InventoryKafkaHandler {
constructor(inventoryService) {
this.consumer = new KafkaConsumer('inventory-service');
this.inventoryService = inventoryService;
this.processedEvents = new Set(); // For idempotency (in-memory demo; use Redis or a DB in production)
}
async start() {
await this.consumer.connect();
await this.consumer.subscribe(['orders'], async (event, metadata) => {
// Idempotency check
if (this.processedEvents.has(event.eventId)) {
console.log(`Skipping duplicate event: ${event.eventId}`);
return;
}
switch (event.eventType) {
case 'order.created':
await this.handleOrderCreated(event.data);
break;
case 'order.cancelled':
await this.handleOrderCancelled(event.data);
break;
}
this.processedEvents.add(event.eventId);
// Naive cleanup: clearing forgets all processed IDs; a TTL-based store is safer
if (this.processedEvents.size > 10000) {
this.processedEvents.clear();
}
});
}
async handleOrderCreated(data) {
for (const item of data.items) {
await this.inventoryService.reserve(
item.productId,
item.quantity,
data.orderId
);
}
}
async handleOrderCancelled(data) {
await this.inventoryService.releaseByOrder(data.orderId);
}
}
Event Design Best Practices
Event Schema Design
// Good Event Schema
const orderCreatedEvent = {
// Metadata
eventId: "evt-abc123", // Unique event identifier
eventType: "order.created", // Dot-notation type
eventVersion: "1.2", // Schema version
occurredAt: "2024-01-15T10:30:00Z", // When it happened
correlationId: "corr-xyz789", // For tracing
causationId: "cmd-def456", // What caused this event
// Source info
source: {
service: "order-service",
instance: "order-service-pod-abc",
environment: "production"
},
// The actual event data
data: {
orderId: "ord-123",
customerId: "cust-456",
items: [
{ productId: "prod-789", quantity: 2, unitPrice: 29.99 }
],
totalAmount: 59.98,
currency: "USD",
status: "CREATED",
createdAt: "2024-01-15T10:30:00Z"
}
};
// Event Type Naming Convention
const eventTypes = {
// Entity.Action format
"order.created": "When an order is placed",
"order.paid": "When payment is confirmed",
"order.shipped": "When order leaves warehouse",
"order.delivered": "When customer receives order",
"order.cancelled": "When order is cancelled",
"order.refunded": "When refund is processed",
// For status changes
"order.status_changed": "Generic status change",
// For sub-entities
"order.item_added": "Item added to existing order",
"order.item_removed": "Item removed from order"
};
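Validating incoming events against this structure at the consumer boundary catches schema drift early. A minimal hand-rolled check as a sketch (a JSON Schema validator such as Ajv is the more common production choice):

// Minimal structural validation for incoming events (illustrative)
function validateEvent(event) {
  const required = ['eventId', 'eventType', 'eventVersion', 'occurredAt', 'data'];
  const missing = required.filter((field) => event[field] === undefined);
  if (missing.length > 0) {
    throw new Error(`Invalid event: missing ${missing.join(', ')}`);
  }
  // Enforce the entity.action naming convention
  if (!/^[a-z]+\.[a-z_]+$/.test(event.eventType)) {
    throw new Error(`Invalid eventType format: ${event.eventType}`);
  }
  return event;
}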
Schema Evolution
// Version 1.0
const eventV1 = {
eventType: "order.created",
eventVersion: "1.0",
data: {
orderId: "ord-123",
customerId: "cust-456",
total: 59.98 // Just a number
}
};
// Version 1.1 - Added field (backward compatible)
const eventV1_1 = {
eventType: "order.created",
eventVersion: "1.1",
data: {
orderId: "ord-123",
customerId: "cust-456",
total: 59.98,
currency: "USD" // New optional field
}
};
// Version 2.0 - Breaking change
const eventV2 = {
eventType: "order.created",
eventVersion: "2.0",
data: {
orderId: "ord-123",
customerId: "cust-456",
amount: { // Changed structure
value: 59.98,
currency: "USD"
}
}
};
// Consumer handling multiple versions
class EventHandler {
handleOrderCreated(event) {
const version = event.eventVersion;
if (version.startsWith('1.')) {
return this.handleV1(event.data);
} else if (version.startsWith('2.')) {
return this.handleV2(event.data);
} else {
throw new Error(`Unsupported event version: ${version}`);
}
}
handleV1(data) {
// Handle v1.x events
const amount = {
value: data.total,
currency: data.currency || 'USD'
};
// ... process
}
handleV2(data) {
// Handle v2.x events
const amount = data.amount;
// ... process
}
}
Message Guarantees
At-Least-Once Delivery
// Consumer with at-least-once guarantee
// Messages may be processed multiple times
class AtLeastOnceConsumer {
constructor(channel) {
this.channel = channel;
}
async consume(queue, handler) {
await this.channel.prefetch(1);
await this.channel.consume(queue, async (msg) => {
try {
await handler(JSON.parse(msg.content.toString()));
// Only ack after successful processing
this.channel.ack(msg);
} catch (error) {
// Requeue for retry; pair this with max-retry/DLQ logic or a
// poison message will be redelivered forever
this.channel.nack(msg, false, true);
}
});
}
}
Idempotent Consumer
// Idempotent consumer - safe for at-least-once delivery
class IdempotentConsumer {
constructor(channel, idempotencyStore) {
this.channel = channel;
this.store = idempotencyStore; // Redis or database
}
async consume(queue, handler) {
await this.channel.consume(queue, async (msg) => {
const eventId = msg.properties.messageId;
// Check if already processed
const isProcessed = await this.store.isProcessed(eventId);
if (isProcessed) {
console.log(`Skipping duplicate: ${eventId}`);
this.channel.ack(msg);
return;
}
try {
const content = JSON.parse(msg.content.toString());
// Process with idempotency key
await this.store.startProcessing(eventId);
await handler(content);
await this.store.markProcessed(eventId);
this.channel.ack(msg);
} catch (error) {
await this.store.clearProcessing(eventId);
this.channel.nack(msg, false, true);
}
});
}
}
// Redis-based idempotency store
class RedisIdempotencyStore {
constructor(redis) {
this.redis = redis;
this.ttl = 7 * 24 * 60 * 60; // 7 days
}
async isProcessed(eventId) {
const status = await this.redis.get(`event:${eventId}`);
return status === 'processed';
}
async startProcessing(eventId) {
// Use SETNX to prevent concurrent processing
const set = await this.redis.set(
`event:${eventId}`,
'processing',
'NX',
'EX',
300 // 5 minute lock
);
if (!set) {
throw new Error('Event already being processed');
}
}
async markProcessed(eventId) {
await this.redis.set(`event:${eventId}`, 'processed', 'EX', this.ttl);
}
async clearProcessing(eventId) {
await this.redis.del(`event:${eventId}`);
}
}
Exactly-Once with Kafka
// Kafka exactly-once semantics
const producer = kafka.producer({
idempotent: true, // Enable idempotent producer
transactionalId: 'order-processor' // For transactions
});
// Transactional producer: all sends happen on the transaction object
const transaction = await producer.transaction();
try {
await transaction.send({
topic: 'orders-processed',
messages: [{ value: JSON.stringify(result) }]
});
// Commit consumer offsets in the same transaction
await transaction.sendOffsets({
consumerGroupId: 'order-processor',
topics: [{
topic: 'orders',
partitions: [{ partition: 0, offset: '100' }]
}]
});
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
Dead Letter Queues
// Dead Letter Queue Handler
class DeadLetterHandler {
constructor(channel, db) {
this.channel = channel;
this.db = db; // database client used to park messages for investigation
}
async setupDLQ(originalQueue) {
const dlqName = `${originalQueue}.dlq`;
// Create DLQ
await this.channel.assertQueue(dlqName, { durable: true });
// Create original queue with DLX
await this.channel.assertQueue(originalQueue, {
durable: true,
deadLetterExchange: '',
deadLetterRoutingKey: dlqName
});
}
async processDLQ(dlqName, retryHandler) {
await this.channel.consume(dlqName, async (msg) => {
const content = JSON.parse(msg.content.toString());
const headers = msg.properties.headers || {};
const deathInfo = headers['x-death']?.[0];
console.log(`DLQ message:`, {
originalQueue: deathInfo?.queue,
reason: deathInfo?.reason,
count: deathInfo?.count,
firstDeathTime: deathInfo?.time
});
try {
// Attempt manual retry or alerting
const shouldRetry = await retryHandler(content, deathInfo);
if (shouldRetry) {
// Send back to original queue
await this.channel.sendToQueue(
deathInfo.queue,
Buffer.from(JSON.stringify(content)),
{
headers: {
...headers,
'x-manual-retry': true,
'x-retry-timestamp': Date.now()
}
}
);
} else {
// Store for investigation
await this.storeForInvestigation(content, deathInfo);
}
this.channel.ack(msg);
} catch (error) {
console.error('DLQ processing error:', error);
// Keep in DLQ for manual intervention
this.channel.nack(msg, false, false);
}
});
}
async storeForInvestigation(message, deathInfo) {
// Store in database for manual investigation
await this.db.collection('failed_messages').insertOne({
message,
deathInfo,
storedAt: new Date(),
status: 'pending_investigation'
});
}
}
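A usage sketch; the queue name and retry policy are assumptions:

// Hypothetical DLQ monitor wiring
async function monitorDLQ(channel, db) {
  const handler = new DeadLetterHandler(channel, db);
  await handler.setupDLQ('email-tasks');
  await handler.processDLQ('email-tasks.dlq', async (content, deathInfo) => {
    // Retry rejected messages once; park everything else for investigation
    return deathInfo?.reason === 'rejected' && deathInfo?.count < 2;
  });
}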
Interview Questions
Q1: How do you handle message ordering?
Answer:
RabbitMQ:
- Single consumer per queue guarantees order
- Multiple consumers lose ordering
- Use consistent hashing to route related messages to the same consumer
Kafka:
- Ordering guaranteed within a partition
- Use a partition key (e.g., orderId) to ensure related events go to the same partition
- Different partition keys may be processed out of order
General strategies:
- Design for eventual consistency when possible
- Use sequence numbers to detect out-of-order delivery (see the sketch below)
- Consider whether ordering truly matters for your use case
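A minimal sketch of sequence-number gap detection on the consumer side; the per-aggregate sequence field is an assumption about the event schema:

// Tracks the last sequence seen per aggregate and flags gaps (illustrative)
class SequenceTracker {
  constructor() {
    this.lastSeen = new Map(); // aggregateId -> last sequence number
  }
  check(aggregateId, sequence) {
    const last = this.lastSeen.get(aggregateId) ?? 0;
    if (sequence <= last) return 'duplicate-or-stale'; // safe to skip
    if (sequence > last + 1) return 'gap'; // buffer the event or request a replay
    this.lastSeen.set(aggregateId, sequence);
    return 'in-order';
  }
}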
Q2: How do you ensure exactly-once processing?
Answer:
Strategies:
- Idempotent consumers: Store processed event IDs, check before processing
- Transactional outbox: Write the message and the DB update in the same transaction (see the sketch below)
- Kafka transactions: Use transactional producer with consumer offset commits
- Store event ID in database/Redis before processing
- Use unique constraints to prevent duplicates
- Make operations naturally idempotent (SET vs INCREMENT)
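Since the transactional outbox is not shown elsewhere in this chapter, here is a minimal sketch using the MongoDB Node driver; the collection names and the relay process are assumptions. A separate background relay reads unpublished outbox rows and publishes them to the broker:

// Write the state change and the outgoing event atomically (illustrative)
async function createOrderWithOutbox(client, order) {
  const db = client.db('shop');
  const session = client.startSession();
  try {
    await session.withTransaction(async () => {
      await db.collection('orders').insertOne(order, { session });
      await db.collection('outbox').insertOne({
        eventType: 'order.created',
        payload: { orderId: order._id },
        createdAt: new Date(),
        published: false // the relay flips this after a successful publish
      }, { session });
    });
  } finally {
    await session.endSession();
  }
}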
Q3: When would you choose Kafka over RabbitMQ?
Answer:
Choose Kafka when:
- High throughput needed (millions of messages/second)
- Need message replay (event sourcing)
- Per-partition ordering is important
- Long-term storage of events
- Stream processing (Kafka Streams)
Choose RabbitMQ when:
- Complex routing needed
- Flexible messaging patterns
- Lower latency matters
- Simpler operations
- Traditional message queue semantics
Q4: How do you handle poison messages?
Answer:
Poison message: A message that consistently fails processing.
Handling strategies:
- Retry with backoff: Exponential backoff before requeue
- Max retry count: After N failures, move to DLQ
- Dead letter queue: Store failed messages for investigation
- Alerting: Notify on DLQ threshold
- Manual tools: UI to inspect/retry/delete DLQ messages
if (retryCount >= maxRetries) {
channel.nack(msg, false, false); // Send to DLQ
} else {
// Requeue with incremented retry count
}
Summary
Key Takeaways
- Async communication enables loose coupling
- RabbitMQ for flexible routing, Kafka for high-throughput streaming
- Design idempotent consumers for at-least-once delivery
- Use DLQs to handle failed messages
- Event schemas should be versioned
Next Steps
In the next chapter, we’ll explore API Gateway Patterns for managing external access to your microservices.