# Kafka Producers & Consumers

> Publishing and consuming messages with Kafka APIs

Learn to build robust applications that publish and subscribe to Kafka topics. The Producer and Consumer APIs are where Kafka theory meets production reality -- this is the code you will write, debug, and tune every day.

***

## Producer API

Producers publish data to the topics of their choice.

### Key Responsibilities

* **Partitioning**: Deciding which partition to send the message to.
* **Serialization**: Converting key/value objects to bytes.
* **Compression**: Reducing network bandwidth (Snappy, Gzip, LZ4, Zstd).
* **Batching**: Grouping messages for efficiency.

### Java Example

```java theme={null}
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap servers: initial brokers for cluster discovery.
        // In production, list at least 3 brokers for redundancy.
        // The client discovers the full cluster from any one of them.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // Reliability configs -- acks and retries together guard against data loss
        props.put("acks", "all");   // Wait for ALL in-sync replicas to acknowledge
        props.put("retries", 3);    // Retry on transient failures (network blips, leader election)
        // Throughput config -- linger.ms affects batching, not durability
        props.put("linger.ms", 1);  // Wait up to 1ms to batch messages (reduces network round trips)
        // In production, also set enable.idempotence=true to prevent duplicates on retry

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            // send() is asynchronous -- it returns immediately and enqueues the record.
            // The callback fires when the broker acknowledges (or rejects) the write.
            // NEVER ignore the exception in the callback -- that is how you silently lose data.
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "Message " + i),
                (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Sent to partition %d @ offset %d%n", 
                            metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
        }
        // close() flushes any remaining buffered records and waits for acks.
        // Always call close() -- without it, buffered messages are silently dropped.
        producer.close();
    }
}
```

### Python Example (kafka-python)

```python theme={null}
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Custom serializer: converts Python dicts to JSON bytes.
    # The serializer runs for every message, so keep it efficient.
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

# Asynchronous send -- returns a Future, not a result.
# Calling .get(timeout=60) blocks until the broker acknowledges.
# In production, prefer callbacks or batched sends for throughput.
future = producer.send('my-topic', value={'key': 'value'})
result = future.get(timeout=60)  # Raises KafkaError if send failed
```

***

## Consumer API

Consumers read data from topics. They subscribe to one or more topics and pull data.

### Consumer Groups & Rebalancing

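* **Consumer Group**: A pool of consumers that share the work. Each partition is assigned to exactly one consumer in the group, so the partition count caps the group's parallelism.
* **Rebalancing**: When a consumer joins or leaves the group, partitions are reassigned among the current members.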

### Java Example

```java theme={null}
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false"); // Manual commit for safety
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", 
                    record.offset(), record.key(), record.value());
                
                // Process record...
            }
            consumer.commitSync(); // Commit offsets after processing
        }
    }
}
```

***

## Delivery Semantics

<CardGroup cols={3}>
  <Card title="At Most Once" icon="xmark">
    Messages may be lost, but never duplicated.
    *Commit offset **before** processing.*
  </Card>

  <Card title="At Least Once" icon="check">
    Messages are never lost, but may be duplicated.
    *Commit offset **after** processing.* (Default/Preferred)
  </Card>

  <Card title="Exactly Once" icon="check-double">
    Each message is delivered exactly once.
    *Requires Transactional API.*
  </Card>
</CardGroup>
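
The difference between the first two cards comes down to the order of "commit" and "process". The following is a minimal simulation of that ordering, not real Kafka client code: `DeliverySemanticsDemo`, `run`, and the single simulated crash are hypothetical names and assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class DeliverySemanticsDemo {
    /**
     * Replays offsets 0..total-1 through a consumer that crashes once while
     * handling offset crashAt, then restarts from the last committed offset.
     * Returns every offset that was actually processed, in order.
     */
    static List<Integer> run(int total, int crashAt, boolean commitBeforeProcessing) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;       // offset the consumer resumes from after a restart
        boolean crashed = false; // the simulated crash happens only once
        int offset = 0;
        while (offset < total) {
            if (commitBeforeProcessing) {
                committed = offset + 1;  // at-most-once: commit first
            } else {
                processed.add(offset);   // at-least-once: process first
            }
            if (!crashed && offset == crashAt) {
                crashed = true;          // crash between the two steps
                offset = committed;      // restart from the last committed offset
                continue;
            }
            if (commitBeforeProcessing) {
                processed.add(offset);
            } else {
                committed = offset + 1;
            }
            offset++;
        }
        return processed;
    }
}
```

With five records and a crash at offset 2, committing first skips offset 2 entirely, while committing last processes offset 2 twice. That duplicate is why at-least-once consumers need idempotent processing.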

***

## Important Configurations

### Producer Configs

| Config               | Description                        | Recommended            |
| -------------------- | ---------------------------------- | ---------------------- |
| `acks`               | How many replicas must acknowledge | `all` (for durability) |
| `retries`            | Retry count on transient errors    | `Integer.MAX_VALUE`    |
| `enable.idempotence` | Prevent duplicates                 | `true`                 |
| `compression.type`   | Compression algorithm              | `snappy` or `lz4`      |

### Consumer Configs

| Config               | Description                      | Recommended                       |
| -------------------- | -------------------------------- | --------------------------------- |
| `group.id`           | Unique ID for the consumer group | Required                          |
| `auto.offset.reset`  | What to do if no offset exists   | `earliest` (start from beginning) |
| `enable.auto.commit` | Auto-commit offsets              | `false` (manual control)          |
| `max.poll.records`   | Max records per poll             | Tuned to processing speed         |

***

## Best Practices

<AccordionGroup>
  <Accordion title="Handle Rebalancing" icon="scale-balanced">
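    Call `consumer.wakeup()` from a shutdown hook, catch the resulting `WakeupException` in the poll loop, and close the consumer gracefully. Closing makes the consumer leave the group, which triggers a rebalance immediately rather than waiting for a timeout.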
  </Accordion>

  <Accordion title="Idempotent Processing" icon="rotate">
    Since "At Least Once" is common, ensure your processing logic handles duplicates (e.g., using a database unique constraint).
  </Accordion>

  <Accordion title="Monitor Lag" icon="chart-line">
    **Consumer Lag** is the difference between the latest offset in the partition and the consumer's current offset. Sustained or growing lag means consumers cannot keep up with producers.
  </Accordion>
</AccordionGroup>
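
The lag definition above is simple arithmetic per partition. Here is a small sketch of it, assuming the end offsets and committed offsets have already been fetched into plain maps. `LagCalculator` and its methods are hypothetical helpers, and treating a partition with no committed offset as offset 0 is an assumption of this sketch (a real consumer would follow `auto.offset.reset`).

```java
import java.util.HashMap;
import java.util.Map;

final class LagCalculator {
    /** Lag per partition = log end offset - committed offset, floored at zero. */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> logEndOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> end : logEndOffsets.entrySet()) {
            // Assumption: a partition without a committed offset starts at 0.
            long committed = committedOffsets.getOrDefault(end.getKey(), 0L);
            lag.put(end.getKey(), Math.max(0L, end.getValue() - committed));
        }
        return lag;
    }

    /** Total lag of the group across all partitions. */
    static long totalLag(Map<Integer, Long> lagPerPartition) {
        long total = 0;
        for (long value : lagPerPartition.values()) {
            total += value;
        }
        return total;
    }
}
```

Alerting on the trend of the total, rather than on a single snapshot, helps separate a short burst from a consumer group that is falling behind.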

***

## Exactly-Once Semantics (EOS)

Exactly-once is the holy grail of message delivery. Kafka supports it through **Idempotent Producers** and **Transactions**.

### Idempotent Producer

Prevents duplicates caused by producer retries.

```java theme={null}
props.put("enable.idempotence", "true");  // Enables idempotent producer (the default since Kafka 3.0)
// Requires compatible settings, which are also the defaults:
// acks = all
// retries > 0 (default: Integer.MAX_VALUE)
// max.in.flight.requests.per.connection <= 5 (default: 5)
```

**How it works:**

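* The broker assigns each producer a **Producer ID (PID)**, and the producer attaches a per-partition **sequence number** to every batch it sends
* Broker deduplicates based on PID + sequence number
* If a retry resends a batch that was already written, the broker recognizes the duplicate and discards it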
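
The deduplication steps above can be sketched as a toy partition log. This is a simplified model under stated assumptions, not broker code: `DedupingPartitionLog` is a hypothetical class, it tracks one sequence number per record rather than per batch, and it remembers only the last sequence per producer.

```java
import java.util.HashMap;
import java.util.Map;

final class DedupingPartitionLog {
    // Last sequence number accepted from each producer ID for this partition.
    private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();
    private int appended = 0;

    /** Returns true if the record was appended, false if it was a duplicate retry. */
    boolean append(long producerId, int sequence) {
        int last = lastSequenceByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: acknowledge without appending again
        }
        if (sequence != last + 1) {
            // A gap means an earlier record is missing, so the write is rejected.
            throw new IllegalStateException("Out-of-order sequence " + sequence);
        }
        lastSequenceByProducer.put(producerId, sequence);
        appended++;
        return true;
    }

    int size() {
        return appended;
    }
}
```

Because sequence numbers are tracked per producer ID, a restarted producer that receives a new PID is not deduplicated against its previous incarnation. Covering that case is what `transactional.id` is for.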

### Transactional Producer

For atomic writes across multiple partitions/topics. The classic use case is the "consume-transform-produce" pattern: read from topic A, process, write to topic B, and commit the consumer offset -- all atomically. Either everything succeeds or nothing does.

```java theme={null}
// transactional.id must be unique per producer instance and stable across restarts.
// Kafka uses it to fence zombie producers (old instances that did not shut down cleanly).
props.put("transactional.id", "my-transactional-producer");

producer.initTransactions();  // Must be called exactly once before any transactional operations

try {
    producer.beginTransaction();
    
    // Records sent inside the transaction are written to the log right away, but
    // consumers with isolation.level=read_committed only see them after commitTransaction().
    producer.send(new ProducerRecord<>("topic1", "key", "value1"));
    producer.send(new ProducerRecord<>("topic2", "key", "value2"));
    
    // Commit consumer offsets as part of the same transaction.
    // This is what makes exactly-once consume-transform-produce possible:
    // the offset commit and the output writes are atomic.
    // Pass consumer.groupMetadata(); the String group-id overload is deprecated since Kafka 2.5.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    
    producer.commitTransaction();
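} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: this producer instance cannot recover, so close it instead of aborting.
    producer.close();
} catch (KafkaException e) {
    // Abort rolls back ALL sends in this transaction. Consumers with
    // read_committed will never see the aborted messages.
    producer.abortTransaction();
}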
```

### Transactional Consumer

Read only committed messages:

```java theme={null}
props.put("isolation.level", "read_committed");
```

| Isolation Level    | Behavior                                                        |
| ------------------ | --------------------------------------------------------------- |
| `read_uncommitted` | See all messages, including aborted ones (the consumer default) |
| `read_committed`   | Only see messages from committed transactions                   |
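
To make the two isolation levels concrete, here is a toy model of a partition log that holds data records plus commit and abort markers. It is a sketch under simplifying assumptions: `IsolationSketch` and its `Entry` type are hypothetical, every record is transactional, and each producer has at most one open transaction at a time. A `read_committed` reader stops at the first record of the earliest still-open transaction (Kafka calls this point the last stable offset).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IsolationSketch {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final long producerId;
        final String value;

        Entry(Kind kind, long producerId, String value) {
            this.kind = kind;
            this.producerId = producerId;
            this.value = value;
        }

        static Entry data(long producerId, String value) { return new Entry(Kind.DATA, producerId, value); }
        static Entry commit(long producerId) { return new Entry(Kind.COMMIT, producerId, null); }
        static Entry abort(long producerId) { return new Entry(Kind.ABORT, producerId, null); }
    }

    static List<String> read(List<Entry> log, boolean readCommitted) {
        List<String> visible = new ArrayList<>();
        if (!readCommitted) {
            for (Entry e : log) {
                if (e.kind == Kind.DATA) visible.add(e.value); // everything, even aborted
            }
            return visible;
        }
        Kind[] outcome = new Kind[log.size()];             // null = transaction still open
        Map<Long, List<Integer>> open = new HashMap<>();   // producerId -> indexes of open records
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (e.kind == Kind.DATA) {
                open.computeIfAbsent(e.producerId, k -> new ArrayList<>()).add(i);
            } else {
                List<Integer> resolved = open.remove(e.producerId);
                if (resolved != null) {
                    for (int index : resolved) outcome[index] = e.kind;
                }
            }
        }
        int lastStable = log.size();                       // first index of an open transaction
        for (List<Integer> indexes : open.values()) {
            lastStable = Math.min(lastStable, indexes.get(0));
        }
        for (int i = 0; i < lastStable; i++) {
            if (log.get(i).kind == Kind.DATA && outcome[i] == Kind.COMMIT) {
                visible.add(log.get(i).value);
            }
        }
        return visible;
    }
}
```

One consequence visible in the model: a long-running open transaction holds back `read_committed` readers for every record behind it, including records from other producers.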

***

## Consumer Group Rebalancing Deep Dive

Rebalancing is one of the most misunderstood Kafka concepts.

### What Triggers Rebalancing?

1. Consumer joins the group
2. Consumer leaves the group (graceful or crash)
3. Consumer fails to send heartbeat within `session.timeout.ms`
4. Topic partition count changes
5. Consumer subscription changes

### Rebalancing Strategies

| Strategy               | Behavior                                  | Use Case               |
| ---------------------- | ----------------------------------------- | ---------------------- |
| **Range**              | Consecutive partitions to each consumer   | Co-partitioned topics  |
| **RoundRobin**         | Evenly distributed                        | General purpose        |
| **Sticky**             | Minimizes partition movement              | Reduce reprocessing    |
| **Cooperative Sticky** | Incremental rebalance (no stop-the-world) | Production recommended |
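
The first two strategies in the table are easy to compare on a single topic. The sketch below is an illustration, not the client's assignor code: `AssignorSketch` is a hypothetical class that handles one topic only and identifies consumers by plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssignorSketch {
    /** Range: each consumer gets a consecutive block; the first ones absorb the remainder. */
    static Map<String, List<Integer>> range(List<String> consumers, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int base = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(sorted.get(i), owned);
        }
        return assignment;
    }

    /** RoundRobin: partitions are dealt out one at a time across the sorted consumers. */
    static Map<String, List<Integer>> roundRobin(List<String> consumers, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : sorted) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            assignment.get(sorted.get(partition % sorted.size())).add(partition);
        }
        return assignment;
    }
}
```

Range computes the same blocks for every topic with the same partition count, which is why it suits co-partitioned topics: partition 0 of both topics lands on the same consumer.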

### Configuring Cooperative Rebalancing

```java theme={null}
props.put("partition.assignment.strategy", 
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
```

### Preventing Unnecessary Rebalances

```java theme={null}
// Tune timeouts to avoid false positives
props.put("session.timeout.ms", "45000");     // Default: 45000 since Kafka 3.0 (10000 before)
props.put("heartbeat.interval.ms", "15000");  // Default: 3000; keep it at most 1/3 of session.timeout.ms
props.put("max.poll.interval.ms", "300000");  // Default: 300000; raise it if a batch takes longer to process
```

<Warning>
  **Interview Tip**: Know the difference between `session.timeout.ms` and `max.poll.interval.ms`:

  * `session.timeout.ms`: Time without heartbeats before consumer is considered dead
  * `max.poll.interval.ms`: Time between poll() calls before consumer is kicked out
</Warning>
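
The two timeouts guard against different failures, which a small classification function can make explicit. This is a sketch of the logic only: `LivenessCheck` is a hypothetical class, and in a real deployment the heartbeat check runs on the group coordinator while the poll-interval check runs inside the consumer client.

```java
final class LivenessCheck {
    enum Status { ALIVE, SESSION_EXPIRED, POLL_TIMEOUT }

    static Status check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                        long sessionTimeoutMs, long maxPollIntervalMs) {
        // No heartbeat for too long: the process or its network is presumed dead.
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Status.SESSION_EXPIRED;
        }
        // Heartbeats still arrive but poll() is not called: processing is stuck or too slow.
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Status.POLL_TIMEOUT;
        }
        return Status.ALIVE;
    }
}
```

The second branch matters because heartbeats are sent from a background thread: a consumer stuck in slow processing keeps heartbeating, so only `max.poll.interval.ms` can detect it.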

***

## Producer Partitioning Strategies

### Default Partitioner

```java theme={null}
// With key: murmur2(keyBytes) % numPartitions
// Without key: sticky partitioning (Kafka 2.4+): fill a batch for one partition, then switch to another
producer.send(new ProducerRecord<>("topic", "user123", "data"));
```
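
The keyed case boils down to "hash the key bytes, force the result positive, take the remainder". A minimal sketch of that shape follows. Note the assumption: Kafka hashes with murmur2, while this example substitutes `Arrays.hashCode` as a stand-in, so the partition numbers it produces will not match a real cluster. `KeyPartitioning` is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitioning {
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);        // stand-in for Kafka's murmur2
        return (hash & 0x7fffffff) % numPartitions;  // clear the sign bit, then take the remainder
    }
}
```

The same key always maps to the same partition as long as `numPartitions` stays fixed. Adding partitions changes the remainder, so existing keys can move and per-key ordering across the change is no longer guaranteed.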

### Custom Partitioner

```java theme={null}
// Custom partitioner for geographic routing: events from the same region
// always land in the same partition, enabling region-specific consumers.
// WARNING: Custom partitioners can cause hot partitions if the distribution
// is skewed (e.g., 80% of traffic is from "us"). Monitor partition sizes.
public class GeoPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String region = extractRegion(key);
        if ("us".equals(region)) return 0;
        if ("eu".equals(region)) return 1;
        return 2;  // All other regions go to partition 2 (the topic needs at least 3 partitions)
    }
    // Also implement close() and configure() -- omitted for brevity
}

props.put("partitioner.class", "com.example.GeoPartitioner");
```

***

## Offset Management

### Manual vs Auto Commit

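| Mode             | Config                    | Behavior                                     | Risk                                                    |
| ---------------- | ------------------------- | -------------------------------------------- | ------------------------------------------------------- |
| **Auto**         | `enable.auto.commit=true` | Commits during `poll()`, every 5s by default | Loss or duplicates on crash, depending on timing        |
| **Manual Sync**  | `consumer.commitSync()`   | Blocks until committed, retries on failure   | Slower                                                  |
| **Manual Async** | `consumer.commitAsync()`  | Non-blocking, no automatic retry             | Failures go unnoticed unless a callback checks for them |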

### Best Practice: Commit After Processing

```java theme={null}
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);  // Process first
    }
    
    consumer.commitSync();  // Then commit
}
```

### Seek to Specific Offset

```java theme={null}
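// seek*() only works on partitions currently assigned to this consumer
// (after assign(), or after poll() has completed the group join).

// Seek to beginning
consumer.seekToBeginning(consumer.assignment());

// Seek to end
consumer.seekToEnd(consumer.assignment());

// Seek to specific offset
TopicPartition partition = new TopicPartition("topic", 0);
consumer.seek(partition, 100L);

// Seek to timestamp (here: 24 hours ago)
long yesterdayTimestamp = System.currentTimeMillis() - Duration.ofDays(1).toMillis();
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(partition, yesterdayTimestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
OffsetAndTimestamp found = offsets.get(partition);
if (found != null) {  // null when no record has a timestamp >= the requested one
    consumer.seek(partition, found.offset());
}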
```
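
`offsetsForTimes` returns, per partition, the earliest offset whose timestamp is greater than or equal to the requested one. The lookup can be sketched as a lower-bound binary search. This is an illustration under an assumption that real topics do not guarantee: timestamps here are non-decreasing with offset. `TimestampLookup` is a hypothetical class, and `-1` stands in for the `null` result of the real API.

```java
final class TimestampLookup {
    /**
     * timestamps[i] is the timestamp of offset i, assumed non-decreasing.
     * Returns the earliest offset with timestamp >= target, or -1 if there is none.
     */
    static long offsetForTime(long[] timestamps, long target) {
        int low = 0;
        int high = timestamps.length;
        while (low < high) {
            int mid = (low + high) >>> 1;
            if (timestamps[mid] < target) {
                low = mid + 1;   // everything up to mid is too old
            } else {
                high = mid;      // mid qualifies; look for an earlier match
            }
        }
        return low == timestamps.length ? -1 : low;
    }
}
```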

***

## Interview Questions & Answers

<AccordionGroup>
  <Accordion title="What is the difference between At-Least-Once and Exactly-Once?" icon="circle-question">
    | Delivery          | Description            | How to Achieve            |
    | ----------------- | ---------------------- | ------------------------- |
    | **At-Most-Once**  | May lose messages      | Commit before processing  |
    | **At-Least-Once** | May have duplicates    | Commit after processing   |
    | **Exactly-Once**  | No loss, no duplicates | Idempotent + Transactions |

    Exactly-once requires:

    * `enable.idempotence=true`
    * Transactional producer
    * `isolation.level=read_committed` for consumers
  </Accordion>

  <Accordion title="How do you handle consumer lag?" icon="circle-question">
    **Diagnosis:**

    ```bash theme={null}
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
    ```

    **Solutions:**

    1. **Add consumers**: More consumers = more parallelism (up to partition count)
    2. **Increase partitions**: Allows more consumers
    3. **Optimize processing**: Batch database writes, use async I/O
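    4. **Tune batch size**: Raise `max.poll.records` when per-record overhead dominates, as long as a batch still finishes within `max.poll.interval.ms`
    5. **Skip old data**: Reset the offset to latest, but only if the skipped messages can safely be discarded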
  </Accordion>

  <Accordion title="What happens if a consumer takes too long to process?" icon="circle-question">
    If time between `poll()` calls exceeds `max.poll.interval.ms`:

    1. Consumer is considered dead
    2. Rebalance is triggered
    3. Partitions are reassigned
    4. Consumer may process same messages again (duplicates)

    **Fix**: Increase `max.poll.interval.ms` or reduce `max.poll.records`
  </Accordion>

  <Accordion title="How do you ensure message ordering?" icon="circle-question">
    **Within a partition**: Guaranteed by Kafka

    **For a specific key**: Use a key when producing

    ```java theme={null}
    // All orders for user123 go to same partition → ordered
    producer.send(new ProducerRecord<>("orders", "user123", orderJson));
    ```

    **Gotcha with retries**: Set `max.in.flight.requests.per.connection=1` or use idempotent producer to prevent reordering on retry.
  </Accordion>

  <Accordion title="What is the difference between commitSync and commitAsync?" icon="circle-question">
    | Method          | Behavior               | Use Case        |
    | --------------- | ---------------------- | --------------- |
    | `commitSync()`  | Blocks until committed | Safety critical |
    | `commitAsync()` | Non-blocking, callback | High throughput |

    **Best practice**: Use `commitAsync()` in loop, `commitSync()` on shutdown:

    ```java theme={null}
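    try {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record);
            }
            consumer.commitAsync();  // Non-blocking; a later commit supersedes a failed one
        }
    } finally {
        try {
            consumer.commitSync();  // Final sync commit
        } finally {
            consumer.close();
        }
    }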
    ```
  </Accordion>
</AccordionGroup>
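
The consumer-lag answer above has a capacity-planning side: extra consumers only help up to the partition count, and lag only shrinks when the group consumes faster than producers write. A back-of-the-envelope sketch, assuming evenly loaded partitions and linear scaling per consumer (`LagPlanner` and its parameters are hypothetical):

```java
final class LagPlanner {
    /** Consumers beyond the partition count sit idle. */
    static int activeConsumers(int consumers, int partitions) {
        return Math.min(consumers, partitions);
    }

    /** Seconds until lag reaches zero, or -1 if the group never catches up. */
    static double secondsToDrain(long lag, double produceRatePerSec,
                                 double perConsumerRatePerSec, int consumers, int partitions) {
        double consumeRate = activeConsumers(consumers, partitions) * perConsumerRatePerSec;
        double netRate = consumeRate - produceRatePerSec;
        return netRate <= 0 ? -1 : lag / netRate;
    }
}
```

For example, with 60,000 messages of lag, producers writing 1,000 messages per second, and each consumer handling 500 per second, two consumers only break even, while four consumers on six partitions clear the backlog in about a minute.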

***

## Common Pitfalls

<Warning>
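  **1. Auto-Commit with Asynchronous Processing**: Auto-commit runs inside `poll()` and commits the offsets returned by earlier polls. If you hand records to another thread and call `poll()` again before they finish, offsets are committed before processing completes → data loss on crash.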

  **2. Not Handling Rebalancing**: During rebalance, partitions are revoked. Commit offsets in `ConsumerRebalanceListener.onPartitionsRevoked()` before they're revoked or you'll reprocess.

  **3. Single-Threaded Processing**: If one message is slow, all processing blocks. Consider async processing.

  **4. max.poll.records Too High**: Fetching 10,000 records but processing is slow → rebalance kicks you out.

  **5. Not Monitoring Consumer Lag**: Lag indicates consumers can't keep up. Set up alerts!
</Warning>
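
Pitfall 4 can be checked with simple arithmetic before it shows up in production. The sketch below derives an upper bound for `max.poll.records` from a measured worst-case processing time. `PollBudget`, its parameters, and the safety factor are hypothetical choices for illustration, not a Kafka API.

```java
final class PollBudget {
    /**
     * Largest max.poll.records that keeps one batch within a fraction
     * (safetyFactor, e.g. 0.5) of max.poll.interval.ms.
     */
    static int maxSafePollRecords(long maxPollIntervalMs, long worstCaseMsPerRecord,
                                  double safetyFactor) {
        if (worstCaseMsPerRecord <= 0) {
            throw new IllegalArgumentException("worstCaseMsPerRecord must be positive");
        }
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        return (int) Math.max(1, budgetMs / worstCaseMsPerRecord);
    }
}
```

At 50 ms per record and the default 300,000 ms interval, a 50% safety margin allows 3,000 records per poll. A batch of 10,000 records would need 500 seconds and get the consumer kicked out of the group.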

***

Next: [Kafka Streams →](/courses/devops-tools/kafka-streams)
