
Kafka Producers & Consumers

Learn to build robust applications that publish and subscribe to Kafka topics.

Producer API

Producers publish data to the topics of their choice.

Key Responsibilities

  • Partitioning: Deciding which partition to send the message to.
  • Serialization: Converting key/value objects to bytes.
  • Compression: Reducing network bandwidth (Snappy, Gzip, LZ4, Zstd).
  • Batching: Grouping messages for efficiency.

Java Example

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // Reliability Configs
        props.put("acks", "all"); // Wait for all replicas
        props.put("retries", 3);  // Retry on failure
        props.put("linger.ms", 1); // Wait 1ms to batch messages

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "Message " + i),
                (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("Sent to partition %d @ offset %d%n", 
                            metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
        }
        producer.close();
    }
}

Python Example (kafka-python)

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

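# send() is asynchronous and returns a future immediately
future = producer.send('my-topic', value={'key': 'value'})
# get() blocks until the broker acknowledges the write (or raises on failure)
result = future.get(timeout=60)
producer.close()  # Flush buffered messages and release resources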

Consumer API

Consumers read data from topics. They subscribe to one or more topics and pull data.

Consumer Groups & Rebalancing

  • Consumer Group: A pool of consumers that share the work.
  • Rebalancing: When a consumer joins/leaves, partitions are reassigned.

Java Example

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false"); // Manual commit for safety
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", 
                    record.offset(), record.key(), record.value());
                
                // Process record...
            }
            consumer.commitSync(); // Commit offsets after processing
        }
    }
}

Delivery Semantics

At Most Once

Messages may be lost, but never duplicated. Commit offset before processing.

At Least Once

Messages are never lost, but may be duplicated. Commit offset after processing. This is the usual choice for most applications.

Exactly Once

Each message is processed exactly once. Requires an idempotent producer and the Transactional API.
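
The first two modes differ only in where the commit sits relative to processing. The sketch below is a Kafka-free simulation under simple assumptions: `DeliverySimulation` and `run` are hypothetical names, the "log" is just the offsets 0..n-1, and a single crash is injected at one record. It shows a record being skipped when the commit comes first and replayed when the commit comes last.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Kafka-free simulation of commit ordering; not a Kafka API.
class DeliverySimulation {

    /**
     * Processes offsets 0..logSize-1, crashing once at crashAt.
     * commitFirst=true commits before processing (at-most-once);
     * commitFirst=false commits after processing (at-least-once).
     * Returns the offsets that were processed, in order.
     */
    static List<Integer> run(int logSize, int crashAt, boolean commitFirst) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;          // next offset to read after a restart
        boolean crashed = false;    // the simulated crash happens only once
        int offset = committed;
        while (offset < logSize) {
            if (commitFirst) {
                committed = offset + 1;             // commit, then process
                if (offset == crashAt && !crashed) {
                    crashed = true;                 // crash before processing
                    offset = committed;             // restart: the record is skipped
                    continue;
                }
                processed.add(offset);
            } else {
                processed.add(offset);              // process, then commit
                if (offset == crashAt && !crashed) {
                    crashed = true;                 // crash before the commit
                    offset = committed;             // restart: the record is replayed
                    continue;
                }
                committed = offset + 1;
            }
            offset++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("at-most-once:  " + run(5, 2, true));   // [0, 1, 3, 4]
        System.out.println("at-least-once: " + run(5, 2, false));  // [0, 1, 2, 2, 3, 4]
    }
}
```

Offset 2 is lost in the first run and processed twice in the second, which is why at-least-once consumers need duplicate-tolerant processing.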

Important Configurations

Producer Configs

| Config | Description | Recommended |
| --- | --- | --- |
| acks | How many replicas must acknowledge | all (for durability) |
| retries | Retry count on transient errors | Integer.MAX_VALUE |
| enable.idempotence | Prevent duplicates | true |
| compression.type | Compression algorithm | snappy or lz4 |

Consumer Configs

| Config | Description | Recommended |
| --- | --- | --- |
| group.id | Unique ID for the consumer group | Required |
| auto.offset.reset | What to do if no offset exists | earliest (start from beginning) |
| enable.auto.commit | Auto-commit offsets | false (manual control) |
| max.poll.records | Max records per poll | Tuned to processing speed |

Best Practices

  • Graceful shutdown: Handle WakeupException and close consumers gracefully so the group rebalances immediately rather than waiting for a timeout.
  • Idempotent processing: Since “At Least Once” is common, ensure your processing logic handles duplicates (e.g., using a database unique constraint).
  • Monitor consumer lag: Consumer lag is the difference between the latest offset in the partition and the consumer’s current offset. Sustained or growing lag means consumers are not keeping up with producers.
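
As a small worked example of the lag definition above, here is a sketch that computes lag per partition and in total. `LagCalculator` is a hypothetical helper; in practice the offsets would come from Kafka tooling or metrics, and treating a partition with no committed offset as starting from 0 is a simplifying assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of the Kafka client library.
class LagCalculator {

    /** Lag for one partition: log end offset minus the consumer's committed offset. */
    static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    /** Total lag across partitions; a missing committed offset is treated as 0 (assumption). */
    static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            total += partitionLag(e.getValue(), committed);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new HashMap<>();
        end.put(0, 1_000L);
        end.put(1, 1_200L);
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 950L);
        committed.put(1, 700L);
        System.out.println("total lag = " + totalLag(end, committed)); // prints "total lag = 550"
    }
}
```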

Exactly-Once Semantics (EOS)

Exactly-once is the holy grail of message delivery. Kafka supports it through Idempotent Producers and Transactions.

Idempotent Producer

Prevents duplicates caused by producer retries.
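props.put("enable.idempotence", "true");  // Enables the idempotent producer (the default since Kafka 3.0)
// Requires the following settings, which are also the defaults:
// acks = all
// retries > 0 (default: Integer.MAX_VALUE)
// max.in.flight.requests.per.connection <= 5 (default: 5)

How it works:
  • The broker assigns each producer a Producer ID (PID), and the producer attaches a per-partition sequence number to each batch it sends
  • The broker deduplicates based on PID + sequence number
  • If a retry resends a batch that was already written, the broker recognizes it and discards the duplicate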
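
The deduplication idea can be sketched in a few lines. This is a deliberately simplified, hypothetical model (`DedupLog` is not a Kafka class): it tracks one sequence counter per producer ID, whereas a real broker tracks sequences per producer and partition and works on batches.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of broker-side deduplication.
class DedupLog {
    private final Map<Long, Integer> lastSequence = new HashMap<>(); // PID -> last accepted sequence
    private int appended = 0;

    /** Returns true if the message was appended, false if it was a duplicate retry. */
    boolean append(long producerId, int sequence) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false;               // already written: discard the retry
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("Out-of-order sequence " + sequence);
        }
        lastSequence.put(producerId, sequence);
        appended++;
        return true;
    }

    int size() {
        return appended;
    }

    public static void main(String[] args) {
        DedupLog log = new DedupLog();
        System.out.println(log.append(42L, 0)); // true
        System.out.println(log.append(42L, 1)); // true
        System.out.println(log.append(42L, 1)); // false: retry of a write that already succeeded
        System.out.println(log.size());         // 2
    }
}
```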

Transactional Producer

For atomic writes across multiple partitions/topics.
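props.put("transactional.id", "my-transactional-producer");  // Stable and unique per producer instance

producer.initTransactions();

try {
    producer.beginTransaction();
    
    producer.send(new ProducerRecord<>("topic1", "key", "value1"));
    producer.send(new ProducerRecord<>("topic2", "key", "value2"));
    
    // Commit consumed offsets as part of the transaction (consume-transform-produce pattern).
    // offsets is a Map<TopicPartition, OffsetAndMetadata> built from the records just processed.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();  // Fatal: this producer instance cannot continue
} catch (KafkaException e) {
    producer.abortTransaction();  // Recoverable: abort, then retry the transaction
}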

Transactional Consumer

Read only committed messages:
props.put("isolation.level", "read_committed");

| Isolation Level | Behavior |
| --- | --- |
| read_uncommitted | See all messages (including aborted) |
| read_committed | Only see committed transactions |
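
To make the two isolation levels concrete, here is a hypothetical in-memory model (the `IsolationDemo` class and its types are illustrative, not the Kafka client API). It assumes each record is tagged with the outcome of its transaction. In this sketch, read_committed skips aborted records and also stops at the first record of a still-open transaction, which mirrors how a real consumer reads only up to the last stable offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of isolation levels; not the Kafka client API.
class IsolationDemo {
    enum TxState { COMMITTED, ABORTED, OPEN }

    static final class Entry {
        final String value;
        final TxState state;

        Entry(String value, TxState state) {
            this.value = value;
            this.state = state;
        }
    }

    /** read_uncommitted: every record is visible, whatever its transaction's outcome. */
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            out.add(e.value);
        }
        return out;
    }

    /** read_committed: skip aborted records and stop at the first still-open transaction. */
    static List<String> readCommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.state == TxState.OPEN) {
                break;                  // nothing past this point is stable yet
            }
            if (e.state == TxState.COMMITTED) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            new Entry("a", TxState.COMMITTED),
            new Entry("b", TxState.ABORTED),
            new Entry("c", TxState.COMMITTED),
            new Entry("d", TxState.OPEN),
            new Entry("e", TxState.COMMITTED));
        System.out.println(readUncommitted(log)); // [a, b, c, d, e]
        System.out.println(readCommitted(log));   // [a, c]
    }
}
```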

Consumer Group Rebalancing Deep Dive

Rebalancing is one of the most misunderstood Kafka concepts.

What Triggers Rebalancing?

  1. Consumer joins the group
  2. Consumer leaves the group (graceful or crash)
  3. Consumer fails to send heartbeat within session.timeout.ms
  4. Topic partition count changes
  5. Consumer subscription changes

Rebalancing Strategies

| Strategy | Behavior | Use Case |
| --- | --- | --- |
| Range | Consecutive partitions to each consumer | Co-partitioned topics |
| RoundRobin | Evenly distributed | General purpose |
| Sticky | Minimizes partition movement | Reduce reprocessing |
| Cooperative Sticky | Incremental rebalance (no stop-the-world) | Production recommended |
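
The difference between the first two strategies is easiest to see on a small example. The following is a hypothetical re-implementation for illustration only (`AssignmentDemo` is not a Kafka class), assuming every consumer subscribes to every topic and all topics have the same partition count.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical re-implementation of the two simplest strategies, for illustration only.
class AssignmentDemo {

    /** Range: per topic, give each consumer a consecutive block of partitions. */
    static Map<String, List<String>> range(List<String> consumers, List<String> topics, int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int base = partitionsPerTopic / consumers.size();
        int extra = partitionsPerTopic % consumers.size(); // the first `extra` consumers get one more
        for (String topic : topics) {
            int next = 0;
            for (int c = 0; c < consumers.size(); c++) {
                int count = base + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(consumers.get(c)).add(topic + "-" + next++);
                }
            }
        }
        return result;
    }

    /** RoundRobin: lay out all partitions of all topics and deal them out one by one. */
    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> topics, int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int turn = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(consumers.get(turn++ % consumers.size())).add(topic + "-" + p);
            }
        }
        return result;
    }

    private static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c0", "c1");
        List<String> topics = List.of("orders", "payments");
        System.out.println(range(consumers, topics, 3));
        // {c0=[orders-0, orders-1, payments-0, payments-1], c1=[orders-2, payments-2]}
        System.out.println(roundRobin(consumers, topics, 3));
        // {c0=[orders-0, orders-2, payments-1], c1=[orders-1, payments-0, payments-2]}
    }
}
```

Range keeps partition N of both topics on the same consumer, which is what co-partitioned joins need, but it leaves c0 with four partitions and c1 with two. RoundRobin balances the load at three each.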

Configuring Cooperative Rebalancing

props.put("partition.assignment.strategy", 
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Preventing Unnecessary Rebalances

// Raise timeouts only as far as needed to avoid false positives
props.put("session.timeout.ms", "45000");     // Default: 45000 since Kafka 3.0 (10000 before)
props.put("heartbeat.interval.ms", "15000");  // Default: 3000; keep at or below 1/3 of session.timeout.ms
props.put("max.poll.interval.ms", "300000");  // Default: 300000; raise if one batch can take longer to process

Interview Tip: Know the difference between session.timeout.ms and max.poll.interval.ms:
  • session.timeout.ms: Time without heartbeats before consumer is considered dead
  • max.poll.interval.ms: Time between poll() calls before consumer is kicked out
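
The two timeouts guard against different failures, which a small model can make explicit. `LivenessCheck` below is a hypothetical sketch, not client code. It relies on the fact that the Java consumer sends heartbeats from a background thread, so a consumer stuck in slow processing keeps heartbeating and is only caught by the poll interval check.

```java
// Hypothetical model of the two liveness checks; all times are in milliseconds.
class LivenessCheck {
    static final long SESSION_TIMEOUT_MS = 45_000;
    static final long MAX_POLL_INTERVAL_MS = 300_000;

    /** Returns "session-timeout", "poll-timeout", or "alive" for the given timestamps. */
    static String status(long now, long lastHeartbeat, long lastPoll) {
        if (now - lastHeartbeat > SESSION_TIMEOUT_MS) {
            return "session-timeout";   // process crashed or lost its network connection
        }
        if (now - lastPoll > MAX_POLL_INTERVAL_MS) {
            return "poll-timeout";      // process is up, but processing is too slow or stuck
        }
        return "alive";
    }

    public static void main(String[] args) {
        System.out.println(status(50_000, 1_000, 49_000));    // session-timeout
        System.out.println(status(400_000, 399_000, 50_000)); // poll-timeout
        System.out.println(status(10_000, 9_000, 9_500));     // alive
    }
}
```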

Producer Partitioning Strategies

Default Partitioner

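// With key: murmur2(keyBytes) % numPartitions, so the same key always maps to the same partition
// Without key: sticky partitioning since Kafka 2.4 (fill a batch for one partition, then switch); round-robin before that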
producer.send(new ProducerRecord<>("topic", "user123", "data"));
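
The key-to-partition mapping can be illustrated without a broker. In the sketch below, `KeyPartitioning` is a hypothetical class, and `Arrays.hashCode` is a stand-in hash chosen to keep the example dependency-free. Kafka's default partitioner uses murmur2, so the actual partition numbers will differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: Arrays.hashCode stands in for Kafka's murmur2 hash.
class KeyPartitioning {

    /** Maps a key to a partition in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit so the result is non-negative
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition while the partition count is fixed.
        System.out.println(partitionFor("user123", 6) == partitionFor("user123", 6)); // true
        // Changing the partition count can move a key, which breaks per-key ordering across the change.
        System.out.println(partitionFor("user123", 6) + " vs " + partitionFor("user123", 8));
    }
}
```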

Custom Partitioner

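public class GeoPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String region = extractRegion(key);  // Your own helper, not shown here
        if ("us".equals(region)) return 0;
        if ("eu".equals(region)) return 1;
        return 2;  // All other regions; the topic needs at least 3 partitions
    }

    @Override
    public void configure(Map<String, ?> configs) {}  // Required by Partitioner; no settings used

    @Override
    public void close() {}  // Required by Partitioner; nothing to release
}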

props.put("partitioner.class", "com.example.GeoPartitioner");

Offset Management

Manual vs Auto Commit

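| Mode | Config | Behavior | Risk |
| --- | --- | --- | --- |
| Auto | enable.auto.commit=true | Commits during poll(), every 5s by default | Duplicates after a crash; data loss if records are processed asynchronously |
| Manual Sync | consumer.commitSync() | Blocks until committed | Slower |
| Manual Async | consumer.commitAsync() | Non-blocking | May fail silently |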

Best Practice: Commit After Processing

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);  // Process first
    }
    
    consumer.commitSync();  // Then commit
}

Seek to Specific Offset

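// seek*() only works on assigned partitions; with subscribe(), call poll() first
// Seek to beginning
consumer.seekToBeginning(consumer.assignment());

// Seek to end
consumer.seekToEnd(consumer.assignment());

// Seek to specific offset
TopicPartition partition = new TopicPartition("topic", 0);
consumer.seek(partition, 100L);

// Seek to timestamp (epoch milliseconds)
long yesterdayTimestamp = System.currentTimeMillis() - Duration.ofDays(1).toMillis();
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(partition, yesterdayTimestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
OffsetAndTimestamp found = offsets.get(partition);
if (found != null) {  // null when no message has a timestamp at or after the target
    consumer.seek(partition, found.offset());
}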
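
For reference, offsetsForTimes() maps a timestamp to the earliest offset whose timestamp is greater than or equal to it, and reports no result when every message is older. A minimal model of that lookup for a single partition (`TimestampLookup` is hypothetical, and the array index stands in for the offset):

```java
// Hypothetical model of a timestamp lookup over one partition; the index is the offset.
class TimestampLookup {

    /** Returns the earliest offset whose timestamp is >= target, or -1 if there is none. */
    static long offsetForTime(long[] timestamps, long target) {
        // Linear scan: producer-supplied timestamps are not guaranteed to be sorted.
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return offset;
            }
        }
        return -1; // the real API reports this case as a null map value
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000, 2_000, 2_000, 5_000};
        System.out.println(offsetForTime(timestamps, 2_000)); // 1
        System.out.println(offsetForTime(timestamps, 3_000)); // 3
        System.out.println(offsetForTime(timestamps, 9_000)); // -1
    }
}
```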

Interview Questions & Answers

Delivery semantics:

| Delivery | Description | How to Achieve |
| --- | --- | --- |
| At-Most-Once | May lose messages | Commit before processing |
| At-Least-Once | May have duplicates | Commit after processing |
| Exactly-Once | No loss, no duplicates | Idempotent + Transactions |

Exactly-once requires:
  • enable.idempotence=true
  • Transactional producer
  • isolation.level=read_committed for consumers

Diagnosing consumer lag:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Solutions:
  1. Add consumers: More consumers = more parallelism (up to partition count)
  2. Increase partitions: Allows more consumers
  3. Optimize processing: Batch database writes, use async I/O
  4. Increase batch size: max.poll.records
  5. Skip old data: Reset offset to latest (only if dropping the backlog is acceptable)
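
A rough back-of-the-envelope estimate shows why adding consumers helps only up to the partition count. The helper below is a hypothetical sketch that assumes load is spread evenly across partitions and that each consumer processes at a fixed rate; all rates are in messages per second.

```java
// Hypothetical estimate; assumes evenly loaded partitions and a fixed per-consumer rate.
class LagDrainEstimate {

    /** Consumers beyond the partition count sit idle, so parallelism is capped. */
    static int activeConsumers(int consumers, int partitions) {
        return Math.min(consumers, partitions);
    }

    /** Seconds needed to clear the backlog, or -1 if the group never catches up. */
    static long secondsToDrain(long lag, int consumers, int partitions,
                               long perConsumerRate, long produceRate) {
        long consumeRate = (long) activeConsumers(consumers, partitions) * perConsumerRate;
        long net = consumeRate - produceRate;   // how fast the backlog actually shrinks
        if (net <= 0) {
            return -1;
        }
        return (lag + net - 1) / net;           // ceiling division
    }

    public static void main(String[] args) {
        // 600,000 messages behind, 6 partitions, 1,000 msg/s per consumer, 3,000 msg/s produced
        System.out.println(secondsToDrain(600_000, 2, 6, 1_000, 3_000)); // -1 (falls further behind)
        System.out.println(secondsToDrain(600_000, 4, 6, 1_000, 3_000)); // 600
        System.out.println(secondsToDrain(600_000, 8, 6, 1_000, 3_000)); // 200 (only 6 are active)
    }
}
```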
If time between poll() calls exceeds max.poll.interval.ms:
  1. Consumer is considered dead
  2. Rebalance is triggered
  3. Partitions are reassigned
  4. Consumer may process same messages again (duplicates)
Fix: Increase max.poll.interval.ms or reduce max.poll.records
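
The trade-off between the two settings is simple arithmetic: a full batch must be processed before the poll interval runs out. The sketch below uses a hypothetical `PollBudget` helper and assumes a known worst-case processing time per record.

```java
// Hypothetical sizing helper; assumes a known worst-case processing time per record.
class PollBudget {

    /** Worst-case time, in milliseconds, to process one full batch. */
    static long batchMillis(int maxPollRecords, long perRecordMillis) {
        return maxPollRecords * perRecordMillis;
    }

    /** Largest max.poll.records that uses at most safetyPercent of the poll interval. */
    static long safeMaxPollRecords(long maxPollIntervalMs, long perRecordMillis, int safetyPercent) {
        return maxPollIntervalMs * safetyPercent / 100 / perRecordMillis;
    }

    public static void main(String[] args) {
        // Defaults: max.poll.records=500, max.poll.interval.ms=300000; assume 1s per record.
        System.out.println(batchMillis(500, 1_000));                 // 500000, which exceeds 300000
        System.out.println(safeMaxPollRecords(300_000, 1_000, 50));  // 150
    }
}
```

With these numbers, a full default batch takes longer than the interval allows, so the consumer would be removed from the group; capping max.poll.records at 150 leaves half the interval as headroom.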

Message ordering:
  • Within a partition: Guaranteed by Kafka
  • For a specific key: Use a key when producing
// All orders for user123 go to same partition → ordered
producer.send(new ProducerRecord<>("orders", "user123", orderJson));
Gotcha with retries: Set max.in.flight.requests.per.connection=1 or use idempotent producer to prevent reordering on retry.
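
Why retries can reorder messages is easier to see in a toy model. The simulation below is hypothetical and heavily simplified (`RetryReordering` is not Kafka client code): batches are sent in windows of `maxInFlight`, one batch fails on its first attempt, and the retry lands after the batches that were already in flight. It models a producer without idempotence.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a non-idempotent producer; not Kafka client code.
class RetryReordering {

    /**
     * Sends batches in order with up to maxInFlight unacknowledged at once.
     * The batch named failOnce fails on its first attempt and is retried.
     * Returns the order in which batches land in the partition log.
     */
    static List<String> send(List<String> batches, int maxInFlight, String failOnce) {
        List<String> log = new ArrayList<>();
        boolean failed = false;
        int i = 0;
        while (i < batches.size()) {
            int end = Math.min(i + maxInFlight, batches.size());
            List<String> retry = new ArrayList<>();
            for (String batch : batches.subList(i, end)) {
                if (batch.equals(failOnce) && !failed) {
                    failed = true;
                    retry.add(batch);   // first attempt fails; retry later
                } else {
                    log.add(batch);     // other in-flight batches succeed right away
                }
            }
            log.addAll(retry);          // the retry lands behind batches that were already in flight
            i = end;
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = List.of("A", "B", "C");
        System.out.println(send(batches, 2, "A")); // [B, A, C]: A and B swapped
        System.out.println(send(batches, 1, "A")); // [A, B, C]: order preserved
    }
}
```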

commitSync() vs commitAsync():

| Method | Behavior | Use Case |
| --- | --- | --- |
| commitSync() | Blocks until committed | Safety critical |
| commitAsync() | Non-blocking, callback | High throughput |

Best practice: Use commitAsync() in the loop and commitSync() on shutdown:
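try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitAsync();  // Non-blocking; a failed commit is covered by the next one
    }
} finally {
    try {
        consumer.commitSync();  // Final sync commit
    } finally {
        consumer.close();  // Always close, even if the final commit fails
    }
}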

Common Pitfalls

1. Auto-Commit with Asynchronous Processing: Auto-commit runs inside poll() (every 5s by default) and commits the offsets returned by the previous poll. If records are handed to another thread and are still in progress when the next poll() commits them, a crash loses them.
2. Not Handling Rebalancing: During a rebalance, partitions are revoked. Commit offsets in onPartitionsRevoked() before they’re revoked, or the new owner will reprocess records.
3. Single-Threaded Processing: If one message is slow, all processing blocks. Consider async processing.
4. max.poll.records Too High: Fetching 10,000 records when processing is slow can exceed max.poll.interval.ms, and the consumer is removed from the group.
5. Not Monitoring Consumer Lag: Lag indicates consumers can’t keep up. Set up alerts!
