Kafka configuration and monitoring
Apache Kafka is a distributed streaming platform that provides five main capabilities, all of which are described in the guide to the Kafka Protocol.
Components
Broker
Management tools:
Important configuration parameters:
Essential configs
Name | Value | Note |
---|---|---|
broker.id | related to the hostname | Should be a unique value for every broker |
zookeeper.connect | host:2181[,host2:2181…]/chroot/path | A comma-separated list of Zookeeper addresses |
log.dirs | path to directory | The main Kafka directory. Could also be a comma-separated list |
listeners | listener://host:port [PLAINTEXT://:9092] | Also valid: CLIENT://0.0.0.0:9092, REPLICATION://localhost:9093 |
background.threads | [10] | The number of threads to use for various background processing tasks |
num.io.threads | [8] | The number of threads that the server uses for processing requests (including I/O) |
num.network.threads | [3] | The number of threads that the server uses for receiving requests and sending responses |
queued.max.requests | [500] | The number of queued requests allowed for the data plane before the network threads are blocked |
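A quick way to check what a running broker actually uses for these settings is to query it with the Java AdminClient. A minimal sketch, assuming the kafka-clients library is on the classpath; the broker address and the broker id "0" are placeholders:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerConfigCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        try (Admin admin = Admin.create(props)) {
            // "0" is a hypothetical broker.id; use one from your cluster
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config config = admin.describeConfigs(List.of(broker)).all().get().get(broker);
            // Print the thread/queue settings discussed above, with their effective values
            for (String name : List.of("background.threads", "num.io.threads",
                    "num.network.threads", "queued.max.requests")) {
                System.out.println(name + " = " + config.get(name).value());
            }
        }
    }
}
```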
Resiliency
Name | Value | Note |
---|---|---|
broker.rack | meaningful string | Specify it to perform availability-zone-aware operations |
default.replication.factor | 2 or 3 [1] | The number of copies of the data. 1 means NO copies |
min.insync.replicas | 1 or 2 [1] | Depends on the replication.factor. With acks=all, the producer raises an exception if fewer than this number of replicas acknowledge a new write |
message.max.bytes | [1Mb] | Depends on the cluster size. Should be higher than fetch.message.max.bytes on the consumer |
auto.create.topics.enable | false [true] | Should be disabled. Otherwise, even a metadata request (such as checking if the topic exists) will create a new topic |
max.partition.fetch.bytes | [1Mb] | The maximum amount of data per partition the server will return. message.max.bytes (broker config) and max.message.bytes (topic config) define other limits |
delete.topic.enable | [true] | Prevents accidental deletions, if disabled |
num.replica.fetchers | [1] | Increasing this value can increase I/O parallelism in the broker |
num.recovery.threads.per.data.dir | ~10 [1] | The number of threads per log directory the broker uses for log recovery at startup (total = this value × number of log dirs) |
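To see how the resiliency settings fit together, here is a hedged sketch that creates a topic with an explicit replication factor and a per-topic min.insync.replicas override; the topic name, partition count and broker address are made up for illustration:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateResilientTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Hypothetical topic: 6 partitions, 3 copies of every partition
            NewTopic topic = new NewTopic("payments-events", 6, (short) 3)
                    // With acks=all, writes fail if fewer than 2 replicas are in sync
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```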
Rebalancing
Name | Value | Note |
---|---|---|
unclean.leader.election.enable | [false] | Leads to data loss if it is enabled and the storage is not reliable |
auto.leader.rebalance.enable | false [true] | Keep it disabled to avoid pauses for leader election |
replica.lag.time.max.ms | [30000] | Determines when a slow follower will be removed from the ISR |
leader.imbalance.check.interval.seconds | [300] | |
leader.imbalance.per.broker.percentage | [10] | The controller triggers a leader rebalance if the imbalance goes above this value for a broker |
These settings are NOT mutually exclusive.
Rotation
Name | Value | Note |
---|---|---|
log.segment.bytes | 1GB | May need to increase the OS inode setting when segments are small |
log.roll.ms | null [168 hours] | Closes the segment after a given number of ms, even if it is too small |
Retention
Name | Value | Note |
---|---|---|
log.retention.bytes | [-1] | Size-based retention is unlimited by default |
log.retention.(hours/minutes/ms) | [168 hours] | The smaller time unit takes precedence. Default is 1 week |
Partitioning
num.partitions - should be a multiple of the number of brokers. This value cannot be reduced. With key-based partitioning, increasing the number is also a no-go, because it would change the key-to-partition mapping.
producer | broker | consumer |
---|---|---|
100 Mb/sec => | 10 partitions | => 10Mb/sec |
An empirical rule is to choose the number of partitions so that each partition grows by approximately 6 GB per day.
How to choose the number of topics/partitions in a Kafka cluster?
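The rules above are easy to turn into a back-of-the-envelope calculation. A sketch with made-up throughput and volume numbers; this is only the sizing arithmetic from this section, not an official formula:

```java
public class PartitionSizing {
    public static void main(String[] args) {
        // Assumed inputs; replace with your own measurements
        double producerMbPerSec = 100;  // total produce rate into the topic
        double consumerMbPerSec = 10;   // what a single consumer can keep up with
        double dailyVolumeGb = 120;     // expected data volume per day
        int brokers = 6;

        // Throughput rule: enough partitions so each consumer gets a sustainable share
        int byThroughput = (int) Math.ceil(producerMbPerSec / consumerMbPerSec);
        // Growth rule: aim for roughly 6 GB/day per partition
        int byGrowth = (int) Math.ceil(dailyVolumeGb / 6.0);

        int partitions = Math.max(byThroughput, byGrowth);
        // Round up to a multiple of the broker count, as recommended above
        int rounded = ((partitions + brokers - 1) / brokers) * brokers;
        System.out.println("Suggested partitions: " + rounded);
    }
}
```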
Compaction
compression.type = uncompressed(snappy|lz4|gzip|zstd)
- bigger batches show better compression results
Check official documentation at https://kafka.apache.org/documentation/#compaction
Zookeeper
There should be 3 or 5 Zookeeper nodes running to reach consensus.
Group coordinator
The coordinator assigns partitions to consumers and detects consumer availability via heartbeats.
You can access data from the group coordinator through bin/kafka-consumer-groups.sh:
bin/kafka-consumer-groups.sh --describe --group my-group [--members] [--verbose] [--state] --bootstrap-server localhost:9092
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Producers
acks

- acks=0 - fire and forget. With this setting, retries are disabled, and the current offset is always -1
- acks=1 (default) - confirmation only from the leader
- acks=(-1|all) - confirmation from all in-sync replicas. Still, it requires min.insync.replicas to be higher than the default of 1
Name | Value | Note |
---|---|---|
delivery.timeout.ms | [120000] | This value should be higher than request.timeout.ms + linger.ms |
retries | [2147483647] | If enabled (> 0), retries can reorder batches of messages. It is better to control delivery with delivery.timeout.ms and leave this one at its default |
request.timeout.ms | [30000] | After this time the producer retries the request |
retry.backoff.ms | [100] | The time to wait before retrying publishing |
buffer.memory | [33mb] | The buffer size to store pending messages |
max.block.ms | [60000] | The maximum time send() blocks waiting for buffer space before failing with an exception |
batch.size | [16384] | Size in bytes. Messages bigger than the batch size are sent immediately. This value should be lower than max.request.size. Bigger batches show better compression results |
linger.ms | 30 [0] | The time allowed to collect a batch of messages for publishing. Increasing this number adds latency, but reduces load |
max.in.flight.requests.per.connection | 1 [5] | Makes the producer ordered if the value is set to 1 |
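Putting the acks discussion and the table together, a minimal producer configuration sketch; the topic name and broker address are placeholders, and the chosen values simply follow the notes above:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReliableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // acks=all: wait for all in-sync replicas (needs min.insync.replicas > 1 to be meaningful)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Control delivery with the overall timeout instead of tuning retries
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        // Trade a little latency for bigger, better-compressed batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 30);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Keep ordering per partition
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("payments-events", "key", "value")); // hypothetical topic
            producer.flush();
        }
    }
}
```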
Consumers
Basic considerations:
- If multiple consumers read the same topic, they should have the same group.id.
- The maximum number of consumers in a group is equal to the total number of available partitions in the topic.
- Consumers can subscribe to topics by regex - a powerful but dangerous feature.
- The main method is poll(TIMEOUT) - it is responsible for both consuming and sending heartbeats (see the sketch below).
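A minimal poll loop reflecting these considerations; the group id, topic name and broker address are placeholders:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BasicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // same for all members
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("payments-events")); // hypothetical topic
            while (true) {
                // poll() both fetches records and keeps the consumer alive in the group
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```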
Configuration parameters:
Fetching
Name | Value | Note |
---|---|---|
fetch.min.bytes | [1] | The minimum number of bytes to fetch. Increasing it can be useful to reduce the load |
fetch.max.wait.ms | [500] | Better to set it if fetch.min.bytes is configured, so that even small chunks are fetched eventually |
fetch.max.bytes | [52428800] | This is not an absolute maximum. Also, the consumer performs multiple fetches in parallel |
partition.assignment.strategy | (RoundRobin \| Fair [Range]) | |
Timeouts
Name | Value | Note |
---|---|---|
session.timeout.ms | [10sec] | Should be about 3 * heartbeat.interval.ms plus some threshold |
heartbeat.interval.ms | [3sec] | The time between consecutive heartbeats to the consumer coordinator |
max.poll.interval.ms | [5min] | The maximum delay between invocations of poll() |
default.api.timeout.ms | [1min] | Specifies the timeout (in milliseconds) for client APIs |
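The fetching and timeout settings map directly onto consumer properties. A small sketch with illustrative values; the numbers are examples, not recommendations:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// A sketch of fetch and timeout tuning on top of an existing base configuration
public class ConsumerTuning {
    static Properties tunedProps(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Wait for at least 64 KB per fetch, but never longer than 500 ms
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 64 * 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // Keep session.timeout.ms at roughly 3x the heartbeat interval plus a margin
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // Give slow record processing up to 5 minutes between poll() calls
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        return props;
    }
}
```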
Committing
Name | Value | Note |
---|---|---|
enable.auto.commit | false [true] | Better to turn it off, to prevent data loss |
auto.commit.interval.ms | [5s] | Ignored if enable.auto.commit = false |
The consumer should commit offsets explicitly for each partition, and before closing the connection it should commit offsets synchronously (see the sketch below).
- consumer.commitSync(offsets) - sends only the last successfully processed offsets
- consumer.commitSync() - sends all offsets from the last poll, including failed ones
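A hedged sketch of explicit per-partition commits with a final synchronous commit on shutdown; the processing step and variable names are invented for illustration, and it assumes enable.auto.commit=false:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitLoop {
    static void consume(KafkaConsumer<String, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // process(record) would go here; only then is the offset marked as done
                    processed.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)); // commit the NEXT offset to read
                }
                // Commit only what was successfully processed, per partition
                consumer.commitSync(processed);
            }
        } finally {
            // Final synchronous commit before closing the connection
            consumer.commitSync(processed);
            consumer.close();
        }
    }
}
```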
Offsets
Name | Value | Note |
---|---|---|
auto.offset.reset | ([latest] \| earliest) | |
When there is no known offset on the consumer side, the order of actions is as follows (see the sketch below). First, search for an offset using functions like:
- consumer.position()
- seekToBeginning(partition)
- seekToEnd(partition)
Then set the offset with seek(partition, position), and only after that start the polling loop with poll().
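A sketch of that order of actions in the Java consumer API; the topic and partition are placeholders, and it assumes manual assignment so the partition is known up front:

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetSeekExample {
    static void readFromChosenOffset(KafkaConsumer<String, String> consumer) {
        // Hypothetical partition; with manual assignment the consumer knows it up front
        TopicPartition partition = new TopicPartition("payments-events", 0);
        consumer.assign(List.of(partition));

        // 1. Search for an offset: jump to the start (or end) and inspect the position
        consumer.seekToBeginning(List.of(partition));
        long start = consumer.position(partition);

        // 2. Set the offset explicitly, e.g. skip the first 100 records
        consumer.seek(partition, start + 100);

        // 3. Only then start the polling loop
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
    }
}
```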
Registry
Schema Registry is not part of the Kafka distribution, but it seamlessly integrates with Brokers and provides schema validation for messages in Apache Avro format.
Monitoring
- Monitoring Kafka in Production
- Monitoring Kafka with Kafka exporter + Prometheus + Grafana
- Server metrics
- Producer metrics
- Consumer metrics
- Logging
Server metrics
Apart from the must-have metrics for JVM applications, such as GC cycles, collect OS metrics on resource consumption:
- CPU
- Memory
- Disk space
- Disk IO
- Network
In particular, pay attention to the number of open file descriptors compared to the maximum number of available file descriptors.
Under-replicated Partitions
# JMX MBean
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
This is the most important metric to show cluster health. Depending on the symptom, you need to choose the right mitigation strategy.
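If no metrics agent is wired up yet, the MBean can be read directly over JMX. A minimal sketch; the JMX host and port are assumptions (the broker must be started with a JMX port exposed), and the gauge attribute is assumed to be named Value, as for other Kafka gauges:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class UnderReplicatedCheck {
    public static void main(String[] args) throws Exception {
        // Assumes the broker exposes JMX, e.g. started with JMX_PORT=9999
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName mbean = new ObjectName(
                    "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
            // Kafka gauges expose their current value via the "Value" attribute (assumption)
            Object value = connection.getAttribute(mbean, "Value");
            System.out.println("UnderReplicatedPartitions = " + value);
        } finally {
            connector.close();
        }
    }
}
```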
Symptom: replication factor > UnderReplicatedPartitions > 0
The first thing to check is whether all replica leaders are the preferred ones. If not, you can run kafka-preferred-replica-election to fix the problem, in case auto.leader.rebalance.enable is turned off, as recommended.
A tool that helps to automate rebalancing and some other operational tasks is called Cruise Control, and it has a nice UI.
Symptom: UnderReplicatedPartitions > 0 constantly from many brokers
If the total number of Under-replicated Partitions reported by all brokers equals the number of partitions assigned to one of the brokers - it’s highly likely there is an offline broker. Restart it and bring it back to life.
Symptom: The ISR number is flapping. Fluctuating UnderReplicatedPartitions number and no offline brokers.
Usually, this symptom signals a performance issue.
Performance issues
Observation | Conclusion |
---|---|
All UnderReplicatedPartitions are located on the same instance | Other brokers have trouble fetching data over the network from this instance |
A single instance reports UnderReplicatedPartitions > 0 | This is the opposite case, when one instance cannot communicate with other brokers. Run ./kafka-topics ... --describe --under-replicated-partitions to check this hypothesis |
Slow Disk IO | One bad disk can affect the whole cluster, because producers wait for an ack from more than one broker. Storage performance is critical to cluster health |
CPU starvation | Can be caused by other activity on the server or a mismatching configuration on brokers |
Resource starvation | There should be thresholds and alerts for CPU, disk and network utilisation |
Uneven load distribution | The kafka-reassign-partitions tool helps to redistribute the load |
The following attributes should have similar values for all brokers in the cluster to achieve optimal performance:
- Partition count
- Leaders partition count
- Topics In/Out Bytes/s
- Messages/s
Offline Partitions
# JMX MBean
kafka.controller:type=KafkaController,name=OfflinePartitionsCount
This is the number of partitions without a leader. They don’t accept any new messages until brokers elect a new leader.
Active Controller Count
# JMX MBean
kafka.controller:type=KafkaController,name=ActiveControllerCount
It should always be 1.
If > 1 - one broker got stuck and will cause problems for admin tasks. You can fix this issue by restarting the stuck broker to re-elect the controller.
If = 0 - the cluster gets stuck in its last controlled state. This could be caused by a broken connection with Zookeeper.
After you mitigate the root cause, perform a rolling restart of the cluster.
Topic Metrics
# JMX MBeans
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
These metrics indicate the traffic growth rate.
Partition Count
# JMX MBeans
kafka.server:type=ReplicaManager,name=PartitionCount
kafka.server:type=ReplicaManager,name=LeaderCount
A ratio to track for a healthy partition spread: LeaderCount/PartitionCount = 1/ReplicationFactor (e.g. with replication factor 3, roughly a third of a broker's partitions should be leaders).
Request metrics
# JMX MBean
kafka.network:type=RequestMetrics,name=<METRIC_TYPE>,request=<REQUEST_NAME>
For example, a spike in the produce request 99th percentile latency is a sign of performance issues.
Producer metrics
# JMX MBeans
kafka.producer:type=producer-metrics,client-id=<CLIENT_ID>
kafka.producer:type=producer-node-metrics,client-id=<CLIENT_ID>,node-id=node-<BROKER_ID>
kafka.producer:type=producer-topic-metrics,client-id=<CLIENT_ID>,topic=<TOPIC_NAME>
Important attributes:
- record-error-rate - How often produced records are erroring out. It counts only if the message is actually dropped
- request-latency-avg - The baseline for latency alerts
- record-queue-time-avg - The time spent in the send buffer. A good indicator for tweaking configs such as batch.size and linger.ms
- produce-throttle-time-avg - Indicates throttling, if Quotas are enabled

Metrics that show how much data we produce:
- outgoing-byte-rate - The average number of outgoing bytes sent per second
- record-send-rate - The average number of records sent per second
- request-rate - The average number of requests sent per second
Consumer metrics
# JMX MBeans
kafka.consumer:type=consumer-metrics,client-id=<CLIENT_ID>
kafka.consumer:type=consumer-node-metrics,client-id=<CLIENT_ID>,node-id=node-<BROKER_ID>
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<CLIENT_ID>[,topic=<TOPIC_NAME>]
Latency
- fetch-latency-avg - The north star metric for tweaking fetch.min.bytes and fetch.max.wait.ms
- fetch-throttle-time-avg - Indicates throttling, if Quotas are enabled
Incoming data
- bytes-consumed-rate - The average number of bytes consumed per second
- records-consumed-rate - The average number of records consumed per second
Consumer Lag
The difference between the offset of the last message produced and the offset of the last message consumed. Use Burrow to aggregate metrics across consumers into one status and track consumer lag.
Consumer group coordinator
# JMX MBean
kafka.consumer:type=consumer-coordinator-metrics,client-id=<CLIENT_ID>
Metric | Unit | Description |
---|---|---|
sync-time-avg | Time in milliseconds | If it is too high, decrease the number of partitions to let them sync quicker |
sync-rate | The number of group syncs per second | Should be 0 for a stable consumer group |
commit-latency-average | How fast offsets are committed | Should be comparable to the producer's request-latency-avg |
assigned-partitions | The number of partitions assigned to a consumer | Shows the imbalance of load across the consumer group |
Logging
Recommended log levels:
Component | Log Level | Description |
---|---|---|
kafka.controller | INFO | It contains information about topic creation, broker number changes and partition movement |
kafka.server.ClientQuotaManager | INFO | Logs events on throttling producers and consumers |
kafka.request.logger | DEBUG/TRACE | Only for troubleshooting. It shows every request with latency and response |
Thanks to Justin Pihony for his course on Pluralsight.