Asynchronous Communication

Synchronous communication works for simple request-response scenarios, but microservices often need asynchronous patterns to achieve loose coupling, resilience, and scalability.
Learning Objectives:
  • Understand when to use async over sync communication
  • Implement message queues with RabbitMQ
  • Build event-driven systems with Apache Kafka
  • Handle message ordering, deduplication, and dead letters
  • Design robust event schemas

Why Asynchronous Communication?

Sync vs Async Comparison

SYNCHRONOUS COMMUNICATION
─────────────────────────────────────────────────────────────────────────────

  Service A                    Service B                    Service C
     │                            │                            │
     │──────── Request ──────────▶│                            │
     │                            │──────── Request ──────────▶│
     │                            │                            │
     │                            │◀─────── Response ──────────│
     │◀─────── Response ──────────│                            │
     │                            │                            │
     
  ⚠️ Problems:
  • A must wait for B and C (high latency)
  • If B fails, A fails
  • Tight coupling between services
  • Scaling issues (chain bottleneck)

─────────────────────────────────────────────────────────────────────────────

ASYNCHRONOUS COMMUNICATION
─────────────────────────────────────────────────────────────────────────────

  Service A              Message Broker              Service B    Service C
     │                        │                          │            │
     │─── Publish Event ─────▶│                          │            │
     │◀── Acknowledgment ─────│                          │            │
     │                        │                          │            │
     │                        │─────── Event ───────────▶│            │
     │                        │─────── Event ─────────────────────────▶│
     │                        │                          │            │
     │                        │◀────── Ack ──────────────│            │
     │                        │◀────── Ack ────────────────────────────│
     
  ✅ Benefits:
  • A doesn't wait (immediate response)
  • B and C process independently
  • B and C can fail and retry
  • Loose coupling, better scaling
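
The flow in the diagram can be sketched with a tiny in-process stand-in for a broker. This is only an illustration, not a real broker: `InMemoryBroker`, `publish`, and `deliverAll` are hypothetical names, and delivery is triggered manually to make the timing visible.

```javascript
// Minimal in-process stand-in for a message broker (illustration only)
class InMemoryBroker {
  constructor() {
    this.queue = [];        // events accepted but not yet delivered
    this.subscribers = [];  // consumer callbacks
  }

  subscribe(handler) {
    this.subscribers.push(handler);
  }

  // Accepts the event and returns right away; no consumer runs here
  publish(event) {
    this.queue.push(event);
    return { accepted: true, pending: this.queue.length };
  }

  // Delivers buffered events later; one failing consumer does not affect the others
  deliverAll() {
    const failures = [];
    while (this.queue.length > 0) {
      const event = this.queue.shift();
      for (const handler of this.subscribers) {
        try {
          handler(event);
        } catch (error) {
          failures.push({ event, error: error.message });
        }
      }
    }
    return failures;
  }
}

const broker = new InMemoryBroker();
const received = [];
broker.subscribe((event) => received.push(`B:${event.type}`)); // Service B
broker.subscribe(() => { throw new Error('C is down'); });     // Service C fails

const ack = broker.publish({ type: 'order.created' });
const receivedBeforeDelivery = received.length; // publisher was acknowledged first
const failures = broker.deliverAll();
console.log(ack, receivedBeforeDelivery, received, failures.length);
```

Service A gets its acknowledgment before either consumer runs, and the failure in Service C is recorded without blocking Service B. A real broker would also persist the event and redeliver it to C.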

When to Use Async

  • Fire and forget: Notifications, logging, analytics
  • Long-running tasks: Report generation, data processing
  • Decoupling needed: Services shouldn’t know about each other
  • Spike handling: Buffer requests during high load
  • Event broadcasting: One event, many consumers
  • Retry needed: Reliable delivery despite failures
  • Order processing: Multi-step workflows

Message Queue Patterns

Point-to-Point (Queue)

One message, one consumer. Work distribution pattern.
┌──────────────┐    ┌─────────────────────────┐    ┌──────────────┐
│   Producer   │───▶│         QUEUE           │───▶│  Consumer 1  │
└──────────────┘    │  [msg1][msg2][msg3]     │    └──────────────┘
                    │                         │    ┌──────────────┐
                    │                         │───▶│  Consumer 2  │
                    └─────────────────────────┘    └──────────────┘
                    
Each message is processed by exactly ONE consumer.
Used for: Work distribution, task queues
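
As a rough sketch of the work distribution idea, the function below hands each message to exactly one consumer in round-robin order. The `distribute` helper is hypothetical; a real broker also takes prefetch limits and consumer availability into account.

```javascript
// Round-robin work distribution: each message goes to exactly one consumer
function distribute(messages, consumerNames) {
  const assignments = new Map(consumerNames.map((name) => [name, []]));
  messages.forEach((message, index) => {
    const consumer = consumerNames[index % consumerNames.length];
    assignments.get(consumer).push(message);
  });
  return assignments;
}

const assignments = distribute(['msg1', 'msg2', 'msg3'], ['consumer-1', 'consumer-2']);
console.log(Object.fromEntries(assignments));
// { 'consumer-1': [ 'msg1', 'msg3' ], 'consumer-2': [ 'msg2' ] }
```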

Publish-Subscribe (Topic)

One message, multiple consumers. Event broadcasting.
                    ┌─────────────────────────┐    ┌──────────────┐
                    │         TOPIC           │───▶│  Consumer 1  │
┌──────────────┐    │     order.created       │    └──────────────┘
│   Producer   │───▶│                         │    ┌──────────────┐
└──────────────┘    │                         │───▶│  Consumer 2  │
                    │                         │    └──────────────┘
                    │                         │    ┌──────────────┐
                    │                         │───▶│  Consumer 3  │
                    └─────────────────────────┘    └──────────────┘
                    
Each message is delivered to ALL subscribers.
Used for: Event broadcasting, notifications
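
To make the broadcast behaviour concrete, here is a small sketch of AMQP-style topic matching, where `*` stands for exactly one word and `#` for zero or more words. The `matchesTopic` and `route` helpers and the queue names are hypothetical; in practice the broker performs this matching for you.

```javascript
// Returns true when an AMQP-style binding pattern matches a routing key.
// '*' matches exactly one word, '#' matches zero or more words.
function matchesTopic(pattern, routingKey) {
  const match = (p, k) => {
    if (p.length === 0) return k.length === 0;
    const [head, ...rest] = p;
    if (head === '#') {
      // '#' may consume zero words, or one word and then try again
      return match(rest, k) || (k.length > 0 && match(p, k.slice(1)));
    }
    if (k.length === 0) return false;
    return (head === '*' || head === k[0]) && match(rest, k.slice(1));
  };
  return match(pattern.split('.'), routingKey.split('.'));
}

// Every queue whose binding matches receives its own copy of the message
function route(bindings, routingKey) {
  return bindings
    .filter((binding) => matchesTopic(binding.pattern, routingKey))
    .map((binding) => binding.queue);
}

const bindings = [
  { queue: 'inventory', pattern: 'order.created' },
  { queue: 'notifications', pattern: 'order.*' },
  { queue: 'audit', pattern: '#' },
  { queue: 'payments', pattern: 'payment.#' }
];

const delivered = route(bindings, 'order.created');
console.log(delivered); // [ 'inventory', 'notifications', 'audit' ]
```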

RabbitMQ Implementation

Setup and Connection

// config/rabbitmq.js
const amqp = require('amqplib');

class RabbitMQConnection {
  constructor() {
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    try {
      this.connection = await amqp.connect({
        hostname: process.env.RABBITMQ_HOST || 'localhost',
        port: process.env.RABBITMQ_PORT || 5672,
        username: process.env.RABBITMQ_USER || 'guest',
        password: process.env.RABBITMQ_PASS || 'guest',
        vhost: process.env.RABBITMQ_VHOST || '/',
        heartbeat: 60
      });

      this.channel = await this.connection.createChannel();
      
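      // Handle connection events. amqplib emits 'close' after a connection
      // 'error', so reconnect only from 'close' to avoid duplicate attempts.
      this.connection.on('error', (err) => {
        console.error('RabbitMQ connection error:', err);
      });
      
      this.connection.on('close', () => {
        console.log('RabbitMQ connection closed');
        this.reconnect().catch((err) => {
          console.error('RabbitMQ reconnect failed:', err);
        });
      });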
      
      console.log('Connected to RabbitMQ');
      return this.channel;
    } catch (error) {
      console.error('Failed to connect to RabbitMQ:', error);
      throw error;
    }
  }

  async reconnect() {
    console.log('Attempting to reconnect to RabbitMQ...');
    await new Promise(resolve => setTimeout(resolve, 5000));
    await this.connect();
  }

  async setupExchanges() {
    // Direct exchange for point-to-point
    await this.channel.assertExchange('direct_exchange', 'direct', { durable: true });
    
    // Topic exchange for routing patterns
    await this.channel.assertExchange('topic_exchange', 'topic', { durable: true });
    
    // Fanout exchange for broadcasting
    await this.channel.assertExchange('fanout_exchange', 'fanout', { durable: true });
    
    // Dead letter exchange
    await this.channel.assertExchange('dlx_exchange', 'direct', { durable: true });
  }

  async close() {
    await this.channel?.close();
    await this.connection?.close();
  }
}

module.exports = new RabbitMQConnection();

Producer Implementation

// messaging/producer.js
class MessageProducer {
  constructor(channel) {
    this.channel = channel;
  }

  // Send to specific queue (Point-to-Point)
  async sendToQueue(queue, message, options = {}) {
    await this.channel.assertQueue(queue, {
      durable: true,
      deadLetterExchange: 'dlx_exchange',
      deadLetterRoutingKey: `${queue}.dlq`
    });

    const content = Buffer.from(JSON.stringify(message));
    
    return this.channel.sendToQueue(queue, content, {
      persistent: true,
      messageId: message.id || this.generateId(),
      timestamp: Date.now(),
      contentType: 'application/json',
      headers: {
        'x-retry-count': 0,
        'x-source-service': process.env.SERVICE_NAME,
        ...options.headers
      }
    });
  }

  // Publish to exchange (Pub/Sub)
  async publish(exchange, routingKey, event, options = {}) {
    const content = Buffer.from(JSON.stringify(event));
    
    return this.channel.publish(exchange, routingKey, content, {
      persistent: true,
      messageId: event.eventId || this.generateId(),
      timestamp: Date.now(),
      contentType: 'application/json',
      type: event.eventType,
      headers: {
        'x-event-version': event.version || '1.0',
        'x-correlation-id': event.correlationId,
        'x-source-service': process.env.SERVICE_NAME,
        ...options.headers
      }
    });
  }

  // Publish domain event with standard structure
  async publishDomainEvent(event) {
    const routingKey = event.eventType; // e.g., 'order.created'
    
    await this.publish('topic_exchange', routingKey, {
      eventId: event.eventId || this.generateId(), // Callers may omit eventId
      eventType: event.eventType,
      occurredAt: event.occurredAt || new Date().toISOString(),
      version: event.version || '1.0',
      correlationId: event.correlationId,
      data: event.data,
      metadata: {
        service: process.env.SERVICE_NAME,
        environment: process.env.NODE_ENV
      }
    });
  }

  generateId() {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }
}

module.exports = MessageProducer;

Consumer Implementation

// messaging/consumer.js
class MessageConsumer {
  constructor(channel) {
    this.channel = channel;
    this.handlers = new Map();
  }

  // Subscribe to queue
  async consumeQueue(queue, handler, options = {}) {
    const { prefetch = 10, noAck = false } = options;
    
    await this.channel.assertQueue(queue, {
      durable: true,
      deadLetterExchange: 'dlx_exchange',
      deadLetterRoutingKey: `${queue}.dlq`
    });

    // Set prefetch for fair dispatch
    await this.channel.prefetch(prefetch);

    console.log(`Consuming from queue: ${queue}`);

    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

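      const startTime = Date.now();
      const messageId = msg.properties.messageId;

      // A malformed payload can never succeed, so dead-letter it instead of retrying
      let content;
      try {
        content = JSON.parse(msg.content.toString());
      } catch (parseError) {
        console.error(`Invalid JSON in message ${messageId}:`, parseError);
        if (!noAck) {
          this.channel.nack(msg, false, false);
        }
        return;
      }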

      try {
        console.log(`Processing message ${messageId} from ${queue}`);
        
        await handler(content, msg.properties, msg.fields);
        
        if (!noAck) {
          this.channel.ack(msg);
        }
        
        console.log(`Message ${messageId} processed in ${Date.now() - startTime}ms`);
      } catch (error) {
        console.error(`Error processing message ${messageId}:`, error);
        
        const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
        const maxRetries = options.maxRetries || 3;
        
        if (retryCount <= maxRetries) {
          // Republish a copy with an incremented retry count, then ack the original
          await this.retryMessage(queue, content, msg.properties, retryCount);
          if (!noAck) {
            this.channel.ack(msg);
          }
        } else if (!noAck) {
          // Reject without requeue so the broker routes the message to the DLX
          this.channel.nack(msg, false, false);
          console.log(`Message ${messageId} sent to DLQ after ${maxRetries} retries`);
        }
      }
    }, { noAck });
  }

  // Subscribe to topic exchange
  async subscribeToTopic(patterns, handler, options = {}) {
    const queueName = options.queueName || 
      `${process.env.SERVICE_NAME}.${patterns.join('_').replace(/[.#*]/g, '_')}`;

    const queue = await this.channel.assertQueue(queueName, {
      durable: true,
      deadLetterExchange: 'dlx_exchange',
      deadLetterRoutingKey: `${queueName}.dlq`
    });

    // Bind queue to each pattern
    for (const pattern of patterns) {
      await this.channel.bindQueue(queue.queue, 'topic_exchange', pattern);
      console.log(`Bound ${queue.queue} to pattern: ${pattern}`);
    }

    await this.consumeQueue(queue.queue, handler, options);
  }

  async retryMessage(queue, content, properties, retryCount) {
    const delay = Math.pow(2, retryCount) * 1000; // Exponential backoff
    
    // Simple in-process delay. The original message stays unacked (and holds a
    // prefetch slot) while waiting; a delayed message plugin avoids this.
    await new Promise(resolve => setTimeout(resolve, delay));
    
    await this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(content)), {
      ...properties,
      headers: {
        ...properties.headers,
        'x-retry-count': retryCount,
        'x-original-timestamp': properties.timestamp,
        'x-retry-timestamp': Date.now()
      }
    });
  }
}

module.exports = MessageConsumer;
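
For reference, the delays produced by the `2^retryCount * 1000` formula in `retryMessage` can be tabulated with a small helper. The upper bound (`maxMs`) is an assumption added here for illustration; the consumer above does not cap its delay.

```javascript
// Delay before retry attempt `retryCount` (1-based), mirroring 2^n * 1000 ms.
// maxMs is a hypothetical cap so delays cannot grow without limit.
function backoffDelay(retryCount, { baseMs = 1000, maxMs = 30000 } = {}) {
  return Math.min(Math.pow(2, retryCount) * baseMs, maxMs);
}

const schedule = [1, 2, 3, 4, 5, 6].map((n) => backoffDelay(n));
console.log(schedule); // [ 2000, 4000, 8000, 16000, 30000, 30000 ]
```

With the default of three retries, a failing message waits 2, 4, and then 8 seconds before it is dead-lettered.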

Order Service Example

// order-service/events.js
const MessageProducer = require('../messaging/producer');

class OrderEventPublisher {
  constructor(channel) {
    this.producer = new MessageProducer(channel);
  }

  async orderCreated(order) {
    await this.producer.publishDomainEvent({
      eventType: 'order.created',
      correlationId: order.correlationId,
      data: {
        orderId: order.id,
        customerId: order.customerId,
        items: order.items.map(item => ({
          productId: item.productId,
          quantity: item.quantity,
          price: item.price
        })),
        totalAmount: order.totalAmount,
        shippingAddress: order.shippingAddress,
        createdAt: order.createdAt
      }
    });
  }

  async orderPaid(orderId, paymentId, amount) {
    await this.producer.publishDomainEvent({
      eventType: 'order.paid',
      data: { orderId, paymentId, amount }
    });
  }

  async orderShipped(orderId, trackingNumber, carrier) {
    await this.producer.publishDomainEvent({
      eventType: 'order.shipped',
      data: { orderId, trackingNumber, carrier }
    });
  }

  async orderCancelled(orderId, reason) {
    await this.producer.publishDomainEvent({
      eventType: 'order.cancelled',
      data: { orderId, reason }
    });
  }
}

// Inventory service listening
// inventory-service/eventHandlers.js
class InventoryEventHandler {
  constructor(consumer, inventoryService) {
    this.consumer = consumer;
    this.inventoryService = inventoryService;
  }

  async start() {
    await this.consumer.subscribeToTopic(
      ['order.created', 'order.cancelled'],
      this.handleOrderEvent.bind(this),
      { queueName: 'inventory-service.orders' }
    );
  }

  async handleOrderEvent(event, properties) {
    switch (event.eventType) {
      case 'order.created':
        await this.reserveInventory(event.data);
        break;
      case 'order.cancelled':
        await this.releaseInventory(event.data);
        break;
    }
  }

  async reserveInventory(orderData) {
    for (const item of orderData.items) {
      await this.inventoryService.reserve(
        item.productId,
        item.quantity,
        orderData.orderId
      );
    }
    console.log(`Inventory reserved for order ${orderData.orderId}`);
  }

  async releaseInventory(orderData) {
    await this.inventoryService.releaseByOrder(orderData.orderId);
    console.log(`Inventory released for order ${orderData.orderId}`);
  }
}

Apache Kafka Implementation

Kafka excels at high-throughput event streaming, with ordering guaranteed within each partition.

Kafka Concepts

┌─────────────────────────────────────────────────────────────────────────────┐
│                           KAFKA ARCHITECTURE                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  TOPICS & PARTITIONS                                                         │
│  ─────────────────────                                                       │
│                                                                              │
│  Topic: orders                                                               │
│  ┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐          │
│  │   Partition 0     │ │   Partition 1     │ │   Partition 2     │          │
│  │ [0][1][2][3][4]   │ │ [0][1][2][3]      │ │ [0][1][2]         │          │
│  │     ───────▶      │ │     ───────▶      │ │     ───────▶      │          │
│  │   (ordered msgs)  │ │   (ordered msgs)  │ │   (ordered msgs)  │          │
│  └───────────────────┘ └───────────────────┘ └───────────────────┘          │
│                                                                              │
│  CONSUMER GROUPS                                                             │
│  ───────────────────                                                         │
│                                                                              │
│  Consumer Group: inventory-service                                           │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐                    │
│  │ Consumer 1  │     │ Consumer 2  │     │ Consumer 3  │                    │
│  │ Partition 0 │     │ Partition 1 │     │ Partition 2 │                    │
│  └─────────────┘     └─────────────┘     └─────────────┘                    │
│                                                                              │
│  Consumer Group: notification-service                                        │
│  ┌─────────────┐     ┌─────────────┐                                        │
│  │ Consumer 1  │     │ Consumer 2  │                                        │
│  │ Part 0 & 1  │     │ Partition 2 │                                        │
│  └─────────────┘     └─────────────┘                                        │
│                                                                              │
│  Each group gets ALL messages (like pub/sub)                                 │
│  Within group, messages are distributed (like queue)                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
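
The two ideas in the diagram, keyed partitioning and partition assignment within a consumer group, can be sketched as below. This is a simplified model: `hashKey` is a hypothetical string hash (Kafka clients use murmur2 by default), and `assignPartitions` imitates a range-style assignment rather than calling any Kafka API.

```javascript
// Hypothetical string hash for illustration; not the hash Kafka clients use
function hashKey(key) {
  let hash = 0;
  for (const char of String(key)) {
    hash = (hash * 31 + char.codePointAt(0)) >>> 0; // keep as unsigned 32-bit
  }
  return hash;
}

// The same key always maps to the same partition, which preserves per-key order
function partitionFor(key, partitionCount) {
  return hashKey(key) % partitionCount;
}

// Split partitions into contiguous ranges across the members of one group
function assignPartitions(partitionCount, consumers) {
  const base = Math.floor(partitionCount / consumers.length);
  const extra = partitionCount % consumers.length;
  const assignment = {};
  let next = 0;
  consumers.forEach((consumer, index) => {
    const count = base + (index < extra ? 1 : 0);
    assignment[consumer] = Array.from({ length: count }, (_, i) => next + i);
    next += count;
  });
  return assignment;
}

const inventoryGroup = assignPartitions(3, ['consumer-1', 'consumer-2', 'consumer-3']);
const notificationGroup = assignPartitions(3, ['consumer-1', 'consumer-2']);
console.log(inventoryGroup);    // one partition per consumer
console.log(notificationGroup); // { 'consumer-1': [ 0, 1 ], 'consumer-2': [ 2 ] }
console.log(partitionFor('ord-123', 3) === partitionFor('ord-123', 3)); // true
```

Both groups cover all three partitions, so each group sees every message, while inside a group each partition has exactly one owner.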

Kafka Producer

// kafka/producer.js
const { Kafka, Partitioners } = require('kafkajs');

class KafkaProducer {
  constructor() {
    this.kafka = new Kafka({
      clientId: process.env.SERVICE_NAME,
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
      retry: {
        initialRetryTime: 100,
        retries: 8
      }
    });
    
    this.producer = this.kafka.producer({
      createPartitioner: Partitioners.DefaultPartitioner,
      allowAutoTopicCreation: true,
      transactionTimeout: 30000
    });
  }

  async connect() {
    await this.producer.connect();
    console.log('Kafka producer connected');
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  // Send single message
  async send(topic, message, key = null) {
    const result = await this.producer.send({
      topic,
      messages: [{
        key: key ? String(key) : null,
        value: JSON.stringify(message),
        headers: {
          'event-type': message.eventType,
          'event-id': message.eventId,
          'correlation-id': message.correlationId || '',
          'source-service': process.env.SERVICE_NAME,
          'timestamp': String(Date.now())
        }
      }]
    });
    
    return result;
  }

  // Send batch of messages
  async sendBatch(topic, messages) {
    const result = await this.producer.send({
      topic,
      messages: messages.map(msg => ({
        key: msg.key ? String(msg.key) : null,
        value: JSON.stringify(msg.value),
        headers: {
          'event-type': msg.value.eventType,
          'event-id': msg.value.eventId,
          'source-service': process.env.SERVICE_NAME
        }
      }))
    });
    
    return result;
  }

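  // Transactional send: all messages are committed together or not at all.
  // Note: KafkaJS throws from transaction() unless the producer was created
  // with a transactionalId (typically with idempotent: true and maxInFlightRequests: 1).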
  async sendInTransaction(messages) {
    const transaction = await this.producer.transaction();
    
    try {
      for (const { topic, key, value } of messages) {
        await transaction.send({
          topic,
          messages: [{
            key: key ? String(key) : null,
            value: JSON.stringify(value)
          }]
        });
      }
      
      await transaction.commit();
    } catch (error) {
      await transaction.abort();
      throw error;
    }
  }
}

module.exports = KafkaProducer;

Kafka Consumer

// kafka/consumer.js
const { Kafka } = require('kafkajs');

class KafkaConsumer {
  constructor(groupId) {
    this.kafka = new Kafka({
      clientId: process.env.SERVICE_NAME,
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(',')
    });
    
    this.consumer = this.kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
      maxWaitTimeInMs: 5000,
      retry: {
        initialRetryTime: 100,
        retries: 8
      }
    });
    
    this.groupId = groupId; // Kept so connect() can log it
    this.handlers = new Map();
  }

  async connect() {
    await this.consumer.connect();
    console.log(`Kafka consumer connected for group: ${this.groupId}`);
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  // Subscribe to topics
  async subscribe(topics, handler, options = {}) {
    const { fromBeginning = false, autoCommit = true } = options;
    
    for (const topic of topics) {
      await this.consumer.subscribe({ topic, fromBeginning });
      console.log(`Subscribed to topic: ${topic}`);
    }

    await this.consumer.run({
      autoCommit,
      autoCommitInterval: 5000,
      eachMessage: async ({ topic, partition, message }) => {
        const event = JSON.parse(message.value.toString());
        const metadata = {
          topic,
          partition,
          offset: message.offset,
          key: message.key?.toString(),
          timestamp: message.timestamp,
          headers: this.parseHeaders(message.headers)
        };

        try {
          await handler(event, metadata);
          
          if (!autoCommit) {
            await this.consumer.commitOffsets([{
              topic,
              partition,
              offset: (BigInt(message.offset) + 1n).toString()
            }]);
          }
        } catch (error) {
          console.error(`Error processing message from ${topic}:`, error);
          // Could implement retry logic or send to DLQ
          throw error;
        }
      }
    });
  }

  // Subscribe with batch processing
  async subscribeBatch(topics, handler, options = {}) {
    const { fromBeginning = false, batchSize = 100 } = options;
    
    for (const topic of topics) {
      await this.consumer.subscribe({ topic, fromBeginning });
    }

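    await this.consumer.run({
      // Leave autoCommit enabled so resolved offsets get committed, and disable
      // auto-resolve so an early exit does not mark unprocessed messages as done
      eachBatchAutoResolve: false,
      eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {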
        const messages = batch.messages.map(message => ({
          event: JSON.parse(message.value.toString()),
          offset: message.offset,
          key: message.key?.toString()
        }));

        // Process in chunks
        for (let i = 0; i < messages.length; i += batchSize) {
          if (!isRunning() || isStale()) break;
          
          const chunk = messages.slice(i, i + batchSize);
          await handler(chunk, batch.topic, batch.partition);
          
          resolveOffset(chunk[chunk.length - 1].offset);
          await heartbeat();
        }
      }
    });
  }

  parseHeaders(headers) {
    const parsed = {};
    for (const [key, value] of Object.entries(headers || {})) {
      parsed[key] = value?.toString();
    }
    return parsed;
  }
}

module.exports = KafkaConsumer;

Order Service with Kafka

// order-service/kafkaEvents.js
const { randomUUID } = require('crypto');
const KafkaProducer = require('../kafka/producer');

class OrderKafkaPublisher {
  constructor() {
    this.producer = new KafkaProducer();
  }

  async initialize() {
    await this.producer.connect();
  }

  async orderCreated(order) {
    // Use orderId as key for ordering
    await this.producer.send('orders', {
      // A UUID keeps event IDs unique; Date.now() alone can collide within a millisecond
      eventId: `evt-${randomUUID()}`,
      eventType: 'order.created',
      occurredAt: new Date().toISOString(),
      data: {
        orderId: order.id,
        customerId: order.customerId,
        items: order.items,
        totalAmount: order.totalAmount
      }
    }, order.id); // Key ensures same order events go to same partition
  }

  async orderStatusChanged(orderId, oldStatus, newStatus) {
    await this.producer.send('orders', {
      eventId: `evt-${randomUUID()}`,
      eventType: 'order.status_changed',
      occurredAt: new Date().toISOString(),
      data: { orderId, oldStatus, newStatus }
    }, orderId);
  }
}

// inventory-service/kafkaHandler.js
const KafkaConsumer = require('../kafka/consumer');

class InventoryKafkaHandler {
  constructor(inventoryService) {
    this.consumer = new KafkaConsumer('inventory-service');
    this.inventoryService = inventoryService;
    this.processedEvents = new Set(); // For idempotency
  }

  async start() {
    await this.consumer.connect();
    
    await this.consumer.subscribe(['orders'], async (event, metadata) => {
      // Idempotency check
      if (this.processedEvents.has(event.eventId)) {
        console.log(`Skipping duplicate event: ${event.eventId}`);
        return;
      }

      switch (event.eventType) {
        case 'order.created':
          await this.handleOrderCreated(event.data);
          break;
        case 'order.cancelled':
          await this.handleOrderCancelled(event.data);
          break;
      }

      this.processedEvents.add(event.eventId);
      
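      // Bound memory use by evicting the oldest ID (a Set iterates in insertion order).
      // This in-memory cache is lost on restart; use a shared store in production.
      if (this.processedEvents.size > 10000) {
        const oldest = this.processedEvents.values().next().value;
        this.processedEvents.delete(oldest);
      }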
    });
  }

  async handleOrderCreated(data) {
    for (const item of data.items) {
      await this.inventoryService.reserve(
        item.productId,
        item.quantity,
        data.orderId
      );
    }
  }

  async handleOrderCancelled(data) {
    await this.inventoryService.releaseByOrder(data.orderId);
  }
}

Event Design Best Practices

Event Schema Design

// Good Event Schema
const orderCreatedEvent = {
  // Metadata
  eventId: "evt-abc123",           // Unique event identifier
  eventType: "order.created",       // Dot-notation type
  eventVersion: "1.2",              // Schema version
  occurredAt: "2024-01-15T10:30:00Z", // When it happened
  correlationId: "corr-xyz789",     // For tracing
  causationId: "cmd-def456",        // What caused this event
  
  // Source info
  source: {
    service: "order-service",
    instance: "order-service-pod-abc",
    environment: "production"
  },
  
  // The actual event data
  data: {
    orderId: "ord-123",
    customerId: "cust-456",
    items: [
      { productId: "prod-789", quantity: 2, unitPrice: 29.99 }
    ],
    totalAmount: 59.98,
    currency: "USD",
    status: "CREATED",
    createdAt: "2024-01-15T10:30:00Z"
  }
};

// Event Type Naming Convention
const eventTypes = {
  // Entity.Action format
  "order.created": "When an order is placed",
  "order.paid": "When payment is confirmed",
  "order.shipped": "When order leaves warehouse",
  "order.delivered": "When customer receives order",
  "order.cancelled": "When order is cancelled",
  "order.refunded": "When refund is processed",
  
  // For status changes
  "order.status_changed": "Generic status change",
  
  // For sub-entities
  "order.item_added": "Item added to existing order",
  "order.item_removed": "Item removed from order"
};
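
A consumer or producer could check incoming events against this envelope before handling them. The sketch below is one possible shape, not a standard: `validateEnvelope`, the list of required fields, and the naming regex are assumptions derived from the example schema above.

```javascript
// Fields taken from the example schema; treating them as required is an assumption
const REQUIRED_FIELDS = ['eventId', 'eventType', 'eventVersion', 'occurredAt', 'data'];

// Returns a list of problems; an empty list means the envelope looks valid
function validateEnvelope(event) {
  const errors = [];
  for (const field of REQUIRED_FIELDS) {
    if (event[field] === undefined || event[field] === null) {
      errors.push(`missing ${field}`);
    }
  }
  // Lowercase dot notation such as order.created or order.status_changed
  if (typeof event.eventType === 'string' && !/^[a-z]+(\.[a-z_]+)+$/.test(event.eventType)) {
    errors.push('eventType must look like entity.action');
  }
  if (typeof event.occurredAt === 'string' && Number.isNaN(Date.parse(event.occurredAt))) {
    errors.push('occurredAt must be a parseable timestamp');
  }
  return errors;
}

const validErrors = validateEnvelope({
  eventId: 'evt-abc123',
  eventType: 'order.created',
  eventVersion: '1.2',
  occurredAt: '2024-01-15T10:30:00Z',
  data: { orderId: 'ord-123' }
});
const invalidErrors = validateEnvelope({ eventType: 'OrderCreated' });
console.log(validErrors, invalidErrors);
```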

Schema Evolution

// Version 1.0
const eventV1 = {
  eventType: "order.created",
  eventVersion: "1.0",
  data: {
    orderId: "ord-123",
    customerId: "cust-456",
    total: 59.98  // Just a number
  }
};

// Version 1.1 - Added field (backward compatible)
const eventV1_1 = {
  eventType: "order.created",
  eventVersion: "1.1",
  data: {
    orderId: "ord-123",
    customerId: "cust-456",
    total: 59.98,
    currency: "USD"  // New optional field
  }
};

// Version 2.0 - Breaking change
const eventV2 = {
  eventType: "order.created",
  eventVersion: "2.0",
  data: {
    orderId: "ord-123",
    customerId: "cust-456",
    amount: {          // Changed structure
      value: 59.98,
      currency: "USD"
    }
  }
};

// Consumer handling multiple versions
class EventHandler {
  handleOrderCreated(event) {
    const version = event.eventVersion;
    
    if (version.startsWith('1.')) {
      return this.handleV1(event.data);
    } else if (version.startsWith('2.')) {
      return this.handleV2(event.data);
    } else {
      throw new Error(`Unsupported event version: ${version}`);
    }
  }
  
  handleV1(data) {
    // Handle v1.x events
    const amount = {
      value: data.total,
      currency: data.currency || 'USD'
    };
    // ... process
  }
  
  handleV2(data) {
    // Handle v2.x events
    const amount = data.amount;
    // ... process
  }
}
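
An alternative to branching per version inside each handler is to convert older events to the newest shape once, at the edge of the consumer (sometimes called upcasting). The sketch below assumes the v1 and v2 shapes shown above; `upcastOrderCreated` is a hypothetical helper.

```javascript
// Converts a v1.x order.created event into the v2.0 shape; v2 events pass through
function upcastOrderCreated(event) {
  const major = Number(String(event.eventVersion).split('.')[0]);
  if (major === 2) return event;
  if (major === 1) {
    const { total, currency, ...rest } = event.data;
    return {
      ...event,
      eventVersion: '2.0',
      // v1.0 had no currency field, so fall back to the same default as handleV1
      data: { ...rest, amount: { value: total, currency: currency || 'USD' } }
    };
  }
  throw new Error(`Unsupported event version: ${event.eventVersion}`);
}

const upcasted = upcastOrderCreated({
  eventType: 'order.created',
  eventVersion: '1.0',
  data: { orderId: 'ord-123', customerId: 'cust-456', total: 59.98 }
});
console.log(upcasted.eventVersion, upcasted.data);
```

Downstream code then only needs to understand the v2 structure.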

Message Guarantees

At-Least-Once Delivery

// Consumer with at-least-once guarantee
// Messages may be processed multiple times
class AtLeastOnceConsumer {
  constructor(channel) {
    this.channel = channel;
  }

  async consume(queue, handler) {
    await this.channel.prefetch(1);
    
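    await this.channel.consume(queue, async (msg) => {
      if (!msg) return; // null means the broker cancelled this consumer
      try {
        await handler(JSON.parse(msg.content.toString()));
        // Only ack after successful processing
        this.channel.ack(msg);
      } catch (error) {
        // Requeue for retry. Without a retry limit, a message that always
        // fails is redelivered indefinitely (see Dead Letter Queues).
        this.channel.nack(msg, false, true);
      }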
    });
  }
}
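
Why duplicates happen under at-least-once delivery is easiest to see in a small model. The sketch below is an in-memory simulation, not RabbitMQ code: `AckQueue` and `requeueUnacked` are hypothetical names standing in for the broker returning unacknowledged messages to the queue when a consumer disconnects.

```javascript
// In-memory model of a queue with manual acknowledgements (illustration only)
class AckQueue {
  constructor(messages) {
    this.ready = [...messages];   // waiting for delivery
    this.unacked = new Map();     // delivered but not yet acknowledged
  }

  deliver() {
    const message = this.ready.shift();
    if (message === undefined) return null;
    this.unacked.set(message.id, message);
    return message;
  }

  ack(id) {
    this.unacked.delete(id);
  }

  // Models a lost consumer connection: unacked messages return to the queue
  requeueUnacked() {
    this.ready.unshift(...this.unacked.values());
    this.unacked.clear();
  }
}

const queue = new AckQueue([{ id: 'm1' }]);
const timesProcessed = new Map();
const processMessage = (message) =>
  timesProcessed.set(message.id, (timesProcessed.get(message.id) || 0) + 1);

// First attempt: the handler runs, but the consumer crashes before ack
processMessage(queue.deliver());
queue.requeueUnacked();

// Second attempt: the same message is delivered again and acked this time
const redelivered = queue.deliver();
processMessage(redelivered);
queue.ack(redelivered.id);

console.log(timesProcessed.get('m1')); // 2
```

The message is never lost, but the handler ran twice, which is why the consumer needs to be idempotent.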

Idempotent Consumer

// Idempotent consumer - safe for at-least-once delivery
class IdempotentConsumer {
  constructor(channel, idempotencyStore) {
    this.channel = channel;
    this.store = idempotencyStore; // Redis or database
  }

  async consume(queue, handler) {
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;
      const eventId = msg.properties.messageId;
      
      // Check if already processed
      const isProcessed = await this.store.isProcessed(eventId);
      if (isProcessed) {
        console.log(`Skipping duplicate: ${eventId}`);
        this.channel.ack(msg);
        return;
      }

      let lockAcquired = false;
      try {
        const content = JSON.parse(msg.content.toString());
        
        // Process with idempotency key
        await this.store.startProcessing(eventId);
        lockAcquired = true;
        await handler(content);
        await this.store.markProcessed(eventId);
        
        this.channel.ack(msg);
      } catch (error) {
        // Release the lock only if this consumer acquired it; otherwise we
        // would delete a lock held by another consumer processing the same event
        if (lockAcquired) {
          await this.store.clearProcessing(eventId);
        }
        this.channel.nack(msg, false, true);
      }
    });
  }
}

// Redis-based idempotency store
class RedisIdempotencyStore {
  constructor(redis) {
    this.redis = redis;
    this.ttl = 7 * 24 * 60 * 60; // 7 days
  }

  async isProcessed(eventId) {
    const status = await this.redis.get(`event:${eventId}`);
    return status === 'processed';
  }

  async startProcessing(eventId) {
    // Use SET with NX (set only if absent) so only one consumer acquires the lock
    const set = await this.redis.set(
      `event:${eventId}`,
      'processing',
      'NX',
      'EX',
      300 // 5 minute lock
    );
    if (!set) {
      throw new Error('Event already being processed');
    }
  }

  async markProcessed(eventId) {
    await this.redis.set(`event:${eventId}`, 'processed', 'EX', this.ttl);
  }

  async clearProcessing(eventId) {
    await this.redis.del(`event:${eventId}`);
  }
}

Exactly-Once with Kafka

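// Kafka exactly-once semantics (consume-transform-produce)
// Assumes `kafka` is a configured KafkaJS client and that the consumer runs
// with autoCommit disabled, so offsets are committed only by the transaction
const producer = kafka.producer({
  idempotent: true,                   // Enable idempotent producer
  maxInFlightRequests: 1,             // Used with transactional producers in KafkaJS
  transactionalId: 'order-processor'  // For transactions
});

// Publish the result and commit the consumed offset atomically.
// Call after `await producer.connect()`.
async function publishResult(result) {
  // transaction() returns a transaction object; send, commit and abort are called on it
  const transaction = await producer.transaction();

  try {
    await transaction.send({
      topic: 'orders-processed',
      messages: [{ value: JSON.stringify(result) }]
    });
    
    // Commit consumer offsets in same transaction
    // (the offset is the next one to consume, i.e. last processed + 1)
    await transaction.sendOffsets({
      consumerGroupId: 'order-processor',
      topics: [{
        topic: 'orders',
        partitions: [{ partition: 0, offset: '100' }]
      }]
    });
    
    await transaction.commit();
  } catch (error) {
    await transaction.abort();
    throw error;
  }
}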

Dead Letter Queues

// Dead Letter Queue Handler
class DeadLetterHandler {
  constructor(channel) {
    this.channel = channel;
  }

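  async setupDLQ(originalQueue) {
    const dlqName = `${originalQueue}.dlq`;
    
    // Create the DLQ and bind it to the dead letter exchange, using the same
    // exchange and routing key that the producer and consumer declare
    await this.channel.assertExchange('dlx_exchange', 'direct', { durable: true });
    await this.channel.assertQueue(dlqName, { durable: true });
    await this.channel.bindQueue(dlqName, 'dlx_exchange', dlqName);
    
    // Create original queue with DLX. The arguments must match every other
    // declaration of this queue, or the broker rejects the assertQueue call
    await this.channel.assertQueue(originalQueue, {
      durable: true,
      deadLetterExchange: 'dlx_exchange',
      deadLetterRoutingKey: dlqName
    });
  }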

  async processDLQ(dlqName, retryHandler) {
    await this.channel.consume(dlqName, async (msg) => {
      const content = JSON.parse(msg.content.toString());
      const headers = msg.properties.headers || {};
      const deathInfo = headers['x-death']?.[0];
      
      console.log(`DLQ message:`, {
        originalQueue: deathInfo?.queue,
        reason: deathInfo?.reason,
        count: deathInfo?.count,
        firstDeathTime: deathInfo?.time
      });

      try {
        // Attempt manual retry or alerting
        const shouldRetry = await retryHandler(content, deathInfo);
        
        if (shouldRetry) {
          // Send back to original queue
          await this.channel.sendToQueue(
            deathInfo.queue,
            Buffer.from(JSON.stringify(content)),
            {
              headers: {
                ...headers,
                'x-manual-retry': true,
                'x-retry-timestamp': Date.now()
              }
            }
          );
        } else {
          // Store for investigation
          await this.storeForInvestigation(content, deathInfo);
        }
        
        this.channel.ack(msg);
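      } catch (error) {
        console.error('DLQ processing error:', error);
        // Requeue so the message stays in the DLQ for manual intervention. With
        // requeue=false it would be discarded, because the DLQ has no DLX of its own.
        this.channel.nack(msg, false, true);
      }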
    });
  }

  async storeForInvestigation(message, deathInfo) {
    // Store in database for manual investigation
    // (assumes `db` is a connected MongoDB database handle defined elsewhere)
    await db.collection('failed_messages').insertOne({
      message,
      deathInfo,
      storedAt: new Date(),
      status: 'pending_investigation'
    });
  }
}

Interview Questions

Answer:
RabbitMQ:
  • Single consumer per queue guarantees order
  • Multiple consumers lose ordering
  • Use the consistent hash exchange plugin to route related messages to the same queue, and therefore the same consumer
Kafka:
  • Ordering guaranteed within a partition
  • Use a partition key (e.g., orderId) to ensure related events go to same partition
  • Different partition keys may be processed out of order
Best Practices:
  • Design for eventual consistency when possible
  • Use sequence numbers to detect out-of-order
  • Consider whether ordering truly matters for your use case
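
As a sketch of the sequence number idea: if the producer attaches a per-entity counter to each event, the consumer can classify every arrival. The `sequence` field and the `SequenceTracker` class are assumptions for illustration; they are not part of the event schema shown earlier.

```javascript
// Tracks the last sequence number applied per entity (e.g. per orderId)
class SequenceTracker {
  constructor() {
    this.lastSeen = new Map();
  }

  // Classifies an incoming event; only in-order events advance the counter
  check(entityId, sequence) {
    const last = this.lastSeen.get(entityId) ?? 0;
    if (sequence <= last) return 'duplicate-or-stale';
    if (sequence > last + 1) return 'gap';
    this.lastSeen.set(entityId, sequence);
    return 'in-order';
  }
}

const tracker = new SequenceTracker();
const results = [
  tracker.check('ord-1', 1), // first event for this order
  tracker.check('ord-1', 3), // arrived before sequence 2
  tracker.check('ord-1', 2),
  tracker.check('ord-1', 2), // redelivery
  tracker.check('ord-2', 1)  // other entities are tracked independently
];
console.log(results);
// [ 'in-order', 'gap', 'in-order', 'duplicate-or-stale', 'in-order' ]
```

What to do on a gap (buffer, retry later, or reload state) depends on the use case.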

Answer:
Strategies:
  1. Idempotent consumers: Store processed event IDs, check before processing
  2. Transactional outbox: Write message and DB update in same transaction
  3. Kafka transactions: Use transactional producer with consumer offset commits
Idempotency implementation:
  • Store event ID in database/Redis before processing
  • Use unique constraints to prevent duplicates
  • Make operations naturally idempotent (SET vs INCREMENT)
Reality: Exactly-once is hard. Design for at-least-once with idempotent consumers.
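
The difference between a naturally idempotent operation and one that needs a processed-ID check can be shown with plain functions. This is a simplified sketch with hypothetical names (`decrementStock`, `setStock`, `withDeduplication`); the in-memory Set stands in for Redis or a database table with a unique constraint.

```javascript
// Non-idempotent: every delivery changes the result
const decrementStock = (state, event) => ({ ...state, stock: state.stock - event.quantity });

// Naturally idempotent: applying it twice gives the same state
const setStock = (state, event) => ({ ...state, stock: event.newStock });

// Wraps any handler with a processed-ID check (in-memory for illustration)
function withDeduplication(apply) {
  const processedIds = new Set();
  return (state, event) => {
    if (processedIds.has(event.eventId)) return state;
    processedIds.add(event.eventId);
    return apply(state, event);
  };
}

// Simulate at-least-once delivery by applying the same event twice
const event = { eventId: 'evt-1', quantity: 2, newStock: 8 };
const deliverTwice = (apply) => [event, event].reduce(apply, { stock: 10 });

const naive = deliverTwice(decrementStock);
const natural = deliverTwice(setStock);
const deduplicated = deliverTwice(withDeduplication(decrementStock));
console.log(naive.stock, natural.stock, deduplicated.stock); // 6 8 8
```

Only the naive decrement ends up with the wrong stock level after the duplicate delivery.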

Answer:
Choose Kafka when:
  • High throughput needed (millions of messages/second)
  • Need message replay (event sourcing)
  • Ordering is important
  • Long-term storage of events
  • Stream processing (Kafka Streams)
Choose RabbitMQ when:
  • Complex routing needed
  • Flexible messaging patterns
  • Lower latency matters
  • Simpler operations
  • Traditional message queue semantics
Both work for: General pub/sub, task queues, event-driven architecture

Answer:
Poison message: A message that consistently fails processing.
Handling strategies:
  1. Retry with backoff: Exponential backoff before requeue
  2. Max retry count: After N failures, move to DLQ
  3. Dead letter queue: Store failed messages for investigation
  4. Alerting: Notify on DLQ threshold
  5. Manual tools: UI to inspect/retry/delete DLQ messages
Implementation:
if (retryCount >= maxRetries) {
  channel.nack(msg, false, false); // Send to DLQ
} else {
  // Requeue with incremented retry count
}

Summary

Key Takeaways

  • Async communication enables loose coupling
  • RabbitMQ for flexible routing, Kafka for high-throughput streaming
  • Design idempotent consumers for at-least-once delivery
  • Use DLQs to handle failed messages
  • Event schemas should be versioned

Next Steps

In the next chapter, we’ll explore API Gateway Patterns for managing external access to your microservices.