Apache Kafka is a distributed streaming platform that provides five main capabilities. You can find all of them in the guide to the Kafka Protocol.

Components

Broker

Management tools:

yahoo/CMAK

Important configuration parameters:

Essential configs

Values in square brackets are the defaults; a value in front of the brackets is the recommended setting.

Name | Value | Note
broker.id | related to the hostname | Should be a unique value for every broker
zookeeper.connect | host:2181[,host2:2181…]/chroot/path | A comma-separated list of ZooKeeper addresses
log.dirs | path to directory | The main Kafka directory. Could also be a comma-separated list
listeners | listener://host:port [PLAINTEXT://:9092] | Also valid: CLIENT://0.0.0.0:9092, REPLICATION://localhost:9093
background.threads | [10] | The number of threads to use for various background processing tasks
num.io.threads | [8] | The number of threads that the server uses for processing requests (including I/O)
num.network.threads | [3] | The number of threads that the server uses for receiving requests and sending responses
queued.max.requests | [500] | The number of queued requests allowed for the data plane before blocking the network threads

Resiliency

Name | Value | Note
broker.rack | meaningful string | Specify it to perform availability-zone-aware operations
default.replication.factor | 2 or 3 [1] | The number of copies of the data. 1 means no additional copies
min.insync.replicas | 1 or 2 [1] | Depends on the replication factor. With acks=all, the producer gets an exception if fewer than this many in-sync replicas can acknowledge a write
message.max.bytes | [1 MB] | Depends on the cluster size. Should not be higher than the fetch size on the consumer (fetch.message.max.bytes), otherwise consumers cannot read the largest messages
auto.create.topics.enable | false [true] | Should be disabled. Otherwise, even a metadata request (such as checking if the topic exists) will create a new topic
max.partition.fetch.bytes | [1 MB] | The maximum amount of data per partition the server will return. message.max.bytes (broker config) and max.message.bytes (topic config) define other limits
delete.topic.enable | [true] | If disabled, prevents accidental deletions
num.replica.fetchers | [1] | Increasing this value can increase I/O parallelism in the broker
num.recovery.threads.per.data.dir | ~10 [1] | The number of threads per log directory that the broker uses for log recovery at startup (total threads = this value * number of log dirs)
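
The relationship between the replication factor and min.insync.replicas comes down to simple arithmetic. The sketch below assumes producers use acks=all; the function name is hypothetical and not part of any Kafka API.

```python
def tolerated_replica_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """How many replicas of a partition can be lost while acks=all writes still succeed."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and the replication factor")
    # Writes are accepted as long as at least min.insync.replicas replicas stay in sync.
    return replication_factor - min_insync_replicas


# Replication factor 3 with min.insync.replicas=2 survives the loss of one replica.
print(tolerated_replica_failures(3, 2))
```

With a replication factor of 2 and min.insync.replicas=2, the result is 0: losing any single replica blocks acks=all writes to that partition.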

Rebalancing

Name | Value | Note
unclean.leader.election.enable | [false] | Can lead to data loss if it is enabled and the storage is not reliable
auto.leader.rebalance.enable | false [true] | Keep it disabled to avoid pauses for leader election
replica.lag.time.max.ms | [30000] | Determines when a slow follower is removed from the ISR
leader.imbalance.check.interval.seconds | [300] |
leader.imbalance.per.broker.percentage | [10] | The controller triggers a leader rebalance if the imbalance goes above this value per broker

These settings are NOT mutually exclusive:

Rotation

Name | Value | Note
log.segment.bytes | 1 GB | May need to increase the OS inode setting when segments are small
log.roll.ms | null [168 hours] | Closes the segment after a given number of ms, even if it’s too small

Retention

Name | Value | Note
log.retention.bytes | [-1] | Size-based retention is unlimited by default
log.retention.(hours/minutes/ms) | [168 hours] | The smaller time unit takes precedence. Default is 1 week
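
The precedence rule for the three time-based retention settings can be sketched as follows. This is only an illustration of the rule from the table. The function and parameter names are hypothetical, and special values such as -1 are not modelled.

```python
from typing import Optional


def effective_retention_ms(ms: Optional[int] = None,
                           minutes: Optional[int] = None,
                           hours: int = 168) -> int:
    """Resolve log.retention.ms / .minutes / .hours: the smaller unit wins when it is set."""
    if ms is not None:
        return ms
    if minutes is not None:
        return minutes * 60 * 1000
    return hours * 60 * 60 * 1000


# Nothing overridden: the default of 168 hours (one week) applies.
print(effective_retention_ms())
```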

Partitioning

num.partitions should be a multiple of the number of brokers. The partition count of a topic cannot be reduced. With key-based partitioning, increasing it is also a no-go, because existing keys would map to different partitions.

producer        broker             consumer
100 MB/sec  =>  10 partitions  =>  10 MB/sec

A rule of thumb is to choose the number of partitions so that each of them grows by approximately 6 GB per day.
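
The sizing rules above can be turned into a small calculation. This is a rough sketch that assumes throughput and daily growth are known in advance; all function names are hypothetical.

```python
import math


def partitions_for_throughput(producer_mb_per_sec: float, consumer_mb_per_sec: float) -> int:
    """Partitions needed so that one consumer per partition keeps up with the producers."""
    return max(1, math.ceil(producer_mb_per_sec / consumer_mb_per_sec))


def partitions_for_daily_growth(topic_gb_per_day: float, gb_per_partition_per_day: float = 6.0) -> int:
    """Partitions needed so that each one grows by about gb_per_partition_per_day."""
    return max(1, math.ceil(topic_gb_per_day / gb_per_partition_per_day))


def round_up_to_multiple(partitions: int, brokers: int) -> int:
    """Round the partition count up to a multiple of the number of brokers."""
    return math.ceil(partitions / brokers) * brokers


# 100 MB/sec produced, 10 MB/sec per consumer, spread over 3 brokers.
needed = partitions_for_throughput(100, 10)
print(needed, round_up_to_multiple(needed, 3))
```

Because the partition count cannot be reduced later, it is safer to treat the result as a starting point and to check it against expected growth.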

How to choose the number of topics/partitions in a Kafka cluster?

Compaction

compression.type = uncompressed(snappy|lz4|gzip|zstd) - bigger batches show better compression results

Check official documentation at https://kafka.apache.org/documentation/#compaction

Zookeeper

There should be 3 or 5 ZooKeeper nodes running, so that the ensemble can keep a quorum when a node fails.

Group coordinator

The coordinator assigns partitions to consumers and detects consumer availability via heartbeats. You can access data from the group coordinator through bin/kafka-consumer-groups.sh:

bin/kafka-consumer-groups.sh --describe --group my-group [--members] [--verbose] [--state] --bootstrap-server localhost:9092
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

Producers

acks

  • acks=0 - fire and forget. With this setting retries are disabled, and the offset returned for each record is always -1
  • acks=1 (the default before Kafka 3.0) - confirmation only from the leader
  • acks=(-1|all) - confirmation from all in-sync replicas. Still, it requires min.insync.replicas to be higher than the default 1 to guarantee more than one copy

Name | Value | Note
delivery.timeout.ms | [120000] | This value must be at least request.timeout.ms + linger.ms
retries | [2147483647] | If enabled (> 0), can reorder batches of messages. It’s better to control delivery with delivery.timeout.ms and leave this one at its default
request.timeout.ms | [30000] | After this time the producer retries the request
retry.backoff.ms | [100] | The time to wait before retrying a failed request
buffer.memory | [32 MB] | The buffer size to store pending messages
max.block.ms | [60000] | How long send() blocks when the buffer is full before it fails with an exception
batch.size | [16384] | Size in bytes. The producer sends messages bigger than the batch size immediately. This value should be lower than max.request.size. Bigger batches show better compression results
linger.ms | 30 [0] | The time allowed to collect a batch of messages for publishing. Increasing this number adds latency, but reduces load
max.in.flight.requests.per.connection | 1 [5] | Setting it to 1 keeps messages in order even when retries happen
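
The constraint between the three producer timeouts is easy to check before deploying a configuration. This is a minimal sketch with a hypothetical helper name, and the values in the example are chosen only for illustration.

```python
def delivery_timeout_is_valid(delivery_timeout_ms: int, request_timeout_ms: int, linger_ms: int) -> bool:
    """delivery.timeout.ms has to cover one full request attempt plus the linger time."""
    return delivery_timeout_ms >= request_timeout_ms + linger_ms


# A delivery timeout shorter than a single request attempt is rejected.
print(delivery_timeout_is_valid(10_000, 30_000, 30))
print(delivery_timeout_is_valid(120_000, 30_000, 30))
```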

Consumers

Basic considerations:

  • Consumers that share the work of reading the same topic should have the same group.id.
  • The maximum number of active consumers in a group equals the number of partitions in the topic. Any extra consumers stay idle.
  • Consumers can subscribe to topics by regex - a powerful but dangerous feature.
  • The main method is poll(TIMEOUT) - responsible for both consuming and sending heartbeats.
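
To see why consumers beyond the partition count stay idle, here is a deliberately simplified round-robin assignment. It is a toy model, not Kafka’s actual assignor implementation, and the function name is hypothetical.

```python
from typing import Dict, List, Sequence


def assign_round_robin(partitions: Sequence[int], consumers: Sequence[str]) -> Dict[str, List[int]]:
    """Spread partitions over consumers one by one; a partition has exactly one owner."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    assignment: Dict[str, List[int]] = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Two partitions, three consumers: the third consumer gets nothing to read.
print(assign_round_robin([0, 1], ["a", "b", "c"]))
```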

Configuration parameters:

Fetching

Name | Value | Note
fetch.min.bytes | [1] | The minimum number of bytes the broker returns for a fetch request. Increasing it could be useful to reduce the load
fetch.max.wait.ms | [500] | The longest the broker waits for fetch.min.bytes to accumulate. Keep it set when fetch.min.bytes is raised, so that even small chunks are fetched eventually
fetch.max.bytes | [52428800] | This is not an absolute maximum. Also, the consumer performs multiple fetches in parallel
partition.assignment.strategy | RoundRobin, Fair [Range] |

Timeouts

Name | Value | Note
session.timeout.ms | [10 sec] | 3 * heartbeat.interval.ms + some threshold
heartbeat.interval.ms | [3 sec] | The time between consecutive heartbeats to the consumer coordinator
max.poll.interval.ms | [5 min] | The maximum delay between invocations of poll()
default.api.timeout.ms | [1 min] | Specifies the timeout (in milliseconds) for client APIs
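
The rule of thumb for session.timeout.ms can be expressed as a margin over three heartbeat intervals. This is a small sketch with a hypothetical helper name; a negative or zero margin means the session can expire before three heartbeats were even attempted.

```python
def session_timeout_margin_ms(session_timeout_ms: int, heartbeat_interval_ms: int) -> int:
    """Time left in the session timeout after three heartbeat intervals."""
    return session_timeout_ms - 3 * heartbeat_interval_ms


# 10 sec session timeout and 3 sec heartbeats leave a 1 sec threshold.
print(session_timeout_margin_ms(10_000, 3_000))
```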

Committing

Name | Value | Note
enable.auto.commit | false [true] | Better to turn it off, to prevent data loss
auto.commit.interval.ms | [5 s] | Ignored if enable.auto.commit = false

A consumer should commit offsets explicitly for each partition. Before closing the connection, it should commit offsets synchronously.

  • consumer.commitSync(offsets) - commits only the offsets you pass in, such as the last successfully processed ones
  • consumer.commitSync() - commits the offsets returned by the last poll() for all assigned partitions, including records whose processing failed
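
When committing explicitly, the committed offset is the position of the next record to read, that is, the last processed offset plus one. The sketch below builds such a per-partition map from processed records. It uses plain tuples instead of real client classes, and the function name is hypothetical.

```python
from typing import Dict, Iterable, Tuple

TopicPartition = Tuple[str, int]


def offsets_to_commit(processed: Iterable[Tuple[str, int, int]]) -> Dict[TopicPartition, int]:
    """Map each (topic, partition) to the highest successfully processed offset + 1."""
    result: Dict[TopicPartition, int] = {}
    for topic, partition, offset in processed:
        key = (topic, partition)
        # Commit the position of the next record, not the record that was just handled.
        result[key] = max(result.get(key, 0), offset + 1)
    return result


print(offsets_to_commit([("orders", 0, 5), ("orders", 0, 6), ("orders", 1, 2)]))
```

Records whose processing failed are simply left out of the input, so their offsets are never committed and they are read again after a restart.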

Offsets

Name | Value | Note
auto.offset.reset | [latest] or earliest |

When there is no known offset on the consumer side, the order of actions goes like this. First, you search for an offset utilizing functions like:

consumer.position()
seekToBeginning(partition)
seekToEnd(partition)

Then you set the offset:

seek(partition, position)

And, only after that, you start the polling loop:

poll()
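
The decision about where reading starts can be modelled without a broker. The sketch below is a toy model with hypothetical names: it uses the committed offset when one exists and otherwise falls back to the beginning or the end of the partition, which is the choice that seekToBeginning and seekToEnd make explicit. Out-of-range committed offsets are not modelled.

```python
from typing import Optional


def resolve_start_offset(committed: Optional[int], log_start: int, log_end: int,
                         reset_policy: str = "latest") -> int:
    """Pick the offset to seek to before the polling loop starts."""
    if committed is not None:
        return committed
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    raise ValueError(f"unsupported reset policy: {reset_policy}")


# No committed offset: 'earliest' replays the partition, 'latest' skips to new data.
print(resolve_start_offset(None, 100, 250, "earliest"))
print(resolve_start_offset(None, 100, 250, "latest"))
```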

Registry

Schema Registry is not part of the Kafka distribution, but it seamlessly integrates with Brokers and provides schema validation for messages in Apache Avro format.

Monitoring

Server metrics

Start with the must-have metrics for JVM applications, such as GC cycles, and OS metrics for resource consumption:

  • CPU
  • Memory
  • Disk space
  • Disk IO
  • Network

Above all, pay attention to the number of open file descriptors and compare it to the maximum number of available file descriptors.

Under-replicated Partitions

# JMX MBean 
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions

This is the most important metric to show cluster health. Depending on the symptom, you need to choose the right mitigation strategy.

Symptom: replication factor > UnderReplicatedPartitions > 0

The first thing to check is whether all partition leaders are the preferred replicas. If not, you can run kafka-preferred-replica-election to fix the problem when auto.leader.rebalance.enable is turned off, as recommended.

A tool that helps to automate rebalancing and some other operational tasks is called Cruise Control, and it has a nice UI.


Symptom: UnderReplicatedPartitions > 0 constantly from many brokers

If the total number of Under-replicated Partitions reported by all brokers equals the number of partitions assigned to one of the brokers - it’s highly likely there is an offline broker. Restart it and bring it back to life.
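
This heuristic is easy to automate once the metrics are collected. The sketch below assumes two hypothetical inputs gathered elsewhere: the UnderReplicatedPartitions value reported by each live broker, and the number of partitions assigned to every broker.

```python
from typing import Dict, List


def brokers_matching_urp_total(urp_reported: Dict[int, int], assigned: Dict[int, int]) -> List[int]:
    """Return brokers whose assigned partition count equals the cluster-wide URP total."""
    total = sum(urp_reported.values())
    if total == 0:
        return []
    return sorted(broker for broker, count in assigned.items() if count == total)


# Brokers 1 and 2 report 50 under-replicated partitions in total; broker 3 hosts exactly 50.
print(brokers_matching_urp_total({1: 20, 2: 30}, {1: 60, 2: 55, 3: 50}))
```

A match is only a hint: confirm that the suspected broker is really offline before restarting it.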


Symptom: The ISR number is flapping. Fluctuating UnderReplicatedPartitions number and no offline brokers.

Usually, this symptom signals a performance issue.

Performance issues

Observation Conclusion
All under-replicated partitions are located on the same instance | Other brokers have trouble fetching data over the network from this instance
A single instance reports UnderReplicatedPartitions > 0 | This is the opposite case, when one instance cannot communicate with other brokers. Run ./kafka-topics ... --describe --under-replicated to check this hypothesis
Slow disk IO | One bad disk can affect the whole cluster because producers wait for an ack from more than one broker. Storage performance is critical to cluster health
CPU starvation | Can be caused by other activity on the server or a mismatching configuration on brokers
Resource starvation | There should be thresholds and alerts for CPU, disk and network utilisation
Uneven load distribution | The kafka-reassign-partitions tool helps to redistribute the load

The following attributes should have similar values for all brokers in the cluster to achieve optimal performance:

  • Partition count
  • Leader partition count
  • Topics In/Out Bytes/s
  • Messages/s

Offline Partitions

# JMX MBean 
kafka.controller:type=KafkaController,name=OfflinePartitionsCount

This is the number of partitions without a leader. They don’t accept any new messages until brokers elect a new leader.

Active Controller Count

# JMX MBean 
kafka.controller:type=KafkaController,name=ActiveControllerCount

The sum across all brokers should always be 1.

If > 1 - one broker got stuck and will cause problems for admin tasks. You can fix this issue by restarting the stuck broker, so that a single controller is re-elected.

If = 0 - the cluster is stuck in the last controlled state. This could be caused by a broken connection with ZooKeeper. After you mitigate the root cause, perform a rolling restart of the cluster.
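
An alert on this metric has to look at the sum over all brokers, not at a single broker. The sketch below assumes the per-broker values have already been collected into a dictionary; the function name and the returned labels are hypothetical.

```python
from typing import Dict


def controller_state(active_controller_counts: Dict[int, int]) -> str:
    """Classify the cluster by the sum of ActiveControllerCount over all brokers."""
    total = sum(active_controller_counts.values())
    if total == 1:
        return "healthy"
    if total == 0:
        return "no active controller"
    return "multiple active controllers"


print(controller_state({1: 0, 2: 1, 3: 0}))
```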

Topic Metrics

# JMX MBeans
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec

These metrics indicate the traffic growth rate.

Partition Count

# JMX MBeans
kafka.server:type=ReplicaManager,name=PartitionCount
kafka.server:type=ReplicaManager,name=LeaderCount

A metric to track good partition ratio: LeaderCount/PartitionCount = 1/ReplicationFactor
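
The ratio check can be computed per broker from the two metrics. This is a minimal sketch with a hypothetical function name; it returns how far a broker is from the expected share of leaders, where a positive value means it leads more partitions than expected.

```python
def leader_ratio_deviation(leader_count: int, partition_count: int, replication_factor: int) -> float:
    """Difference between LeaderCount/PartitionCount and the expected 1/ReplicationFactor."""
    if partition_count <= 0 or replication_factor <= 0:
        raise ValueError("partition_count and replication_factor must be positive")
    return leader_count / partition_count - 1 / replication_factor


# A broker hosting 300 replicas with replication factor 3 should lead about 100 of them.
print(leader_ratio_deviation(150, 300, 3))
```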

Request metrics

# JMX MBean
kafka.network:type=RequestMetrics,name=<METRIC_TYPE>,request=<REQUEST_NAME>

For example, a spike in produce request 99th percentile is a sign of performance issues.

Producer metrics

# JMX MBeans
kafka.producer:type=producer-metrics,client-id=<CLIENT_ID>
kafka.producer:type=producer-node-metrics,client-id=<CLIENT_ID>,node-id=node-<BROKER_ID>
kafka.producer:type=producer-topic-metrics,client-id=<CLIENT_ID>,topic=<TOPIC_NAME>

Important attributes:

  • record-error-rate - How often produced records are erroring out. It counts only if the message is actually dropped
  • request-latency-avg - The baseline for latency alerts
  • record-queue-time-avg - The time spent in the send buffer. A good indicator to tweak configs, such as batch.size and linger.ms
  • produce-throttle-time-avg - Indicates throttling, if Quotas are enabled

Metrics that show how much data we produce:

  • outgoing-byte-rate - The average number of outgoing bytes sent per second
  • record-send-rate - The average number of records sent per second
  • request-rate - The average number of requests sent per second

Consumer metrics

# JMX MBeans
kafka.consumer:type=consumer-metrics,client-id=<CLIENT_ID>
kafka.consumer:type=consumer-node-metrics,client-id=<CLIENT_ID>,node-id=node-<BROKER_ID>
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<CLIENT_ID>[,topic=<TOPIC_NAME>]

Latency

  • fetch-latency-avg - The north star metric to tweak fetch.min.bytes and fetch.max.wait.ms
  • fetch-throttle-time-avg - Indicates throttling, if Quotas are enabled

Incoming data

  • bytes-consumed-rate - The average number of bytes consumed per second
  • records-consumed-rate - The average number of records consumed per second

Consumer Lag

Consumer lag is the difference between the offset of the last message written by a producer and the offset of the last message consumed by a consumer. Use Burrow to aggregate metrics across consumers into one status and track consumer lag.
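
Per partition, lag is a subtraction of two offsets. The sketch below assumes both offset maps were fetched elsewhere, for example from the output of kafka-consumer-groups.sh; the function name is hypothetical.

```python
from typing import Dict


def consumer_lag(log_end_offsets: Dict[int, int], committed_offsets: Dict[int, int]) -> Dict[int, int]:
    """Lag per partition: log end offset minus committed offset, never below zero."""
    lag: Dict[int, int] = {}
    for partition, end_offset in log_end_offsets.items():
        if partition not in committed_offsets:
            # No committed offset yet: lag is unknown, so the partition is skipped.
            continue
        lag[partition] = max(0, end_offset - committed_offsets[partition])
    return lag


lag = consumer_lag({0: 100, 1: 50}, {0: 90, 1: 50})
print(lag, sum(lag.values()))
```

A lag that keeps growing over time matters more than a single high value, which is why a tool that tracks the trend is useful.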

Consumer group coordinator

# JMX MBean
kafka.consumer:type=consumer-coordinator-metrics,client-id=<CLIENT_ID>

Metric | Unit | Description
sync-time-avg | Time in milliseconds | If it is too high, decrease the number of partitions to let them sync quicker
sync-rate | The number of group syncs per second | Should be 0 for a stable consumer group
commit-latency-avg | How fast offsets are committed | Should be comparable to the producer’s request-latency-avg
assigned-partitions | The number of partitions assigned to a consumer | Shows the imbalance of load across the consumer group

Logging

Recommended log levels:

Component | Log Level | Description
kafka.controller | INFO | It contains information about topic creation, broker number changes and partition movement
kafka.server.ClientQuotaManager | INFO | Logs events on throttling producers and consumers
kafka.request.logger | DEBUG/TRACE | Only for troubleshooting. It shows every request with latency and response

Thanks to Justin Pihony for his course on Pluralsight.