On producer side, messages are load balanced across partition based on hashed key. On the consumer side, to ensure that each message is processed by only one consumer within the group, allowing for load balancing, fault tolerance, and efficient utilization of resources in a Kafka consumer application, there is the concept of consumer group.
Creation of Consumer Group
There is no specific command to create a consumer group. You simply need to add --group <groupname>
to the consumer command. For example:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events3p3r --from-beginning --property print.key=true --group consumergroup1
Kafka ensures that, even if you run this command with same group several times, each message will be consumed only once.
By the way, the topic definition is as follow (same as previous blog, 3 partitions and 3 replicas):
Topic: events3p3r TopicId: jppdD6GfS1WjOSsNCJCtcQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: events3p3r Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: events3p3r Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: events3p3r Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Consumer Group Details
With help of kafka-consumer-groups.sh
, it is possible to get insights of what is happening. To run it, I trigger following command:
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumergroup1
And output looks like that:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumergroup1 events3p3r 0 0 0 0 console-consumer-bd4cfd50-1b79-490c-88b9-386421cad7e4 /10.0.0.157 console-consumer
consumergroup1 events3p3r 1 0 0 0 console-consumer-bd4cfd50-1b79-490c-88b9-386421cad7e4 /10.0.0.157 console-consumer
consumergroup1 events3p3r 2 0 0 0 console-consumer-bd4cfd50-1b79-490c-88b9-386421cad7e4 /10.0.0.157 console-consumer
We can see the 3 partitions, currently without any data, being all read by same host (kafka-03 host IP).
Let’s produce some messages and see what happens. I create a simple text file with this content:
1:message1
2:message2
3:message3
4:message4
5:message5
6:message6
And send them into the topic:
$ cat messages | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --property "key.separator=:" --property "parse.key=true" --topic events3p3r
On the consumer side, we can see the messages:
[root@kafka-03 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events3p3r --property print.key=true --group consumergroup1
4 message4
6 message6
2 message2
3 message3
1 message1
5 message5
Keep in mind that message order is kept only per partition and consumer currently polls from the 3 existing partitions, nothing prevent the miss order.
Add more Consumers
Let’s run 2 more consumers for a total of 3: One on first node (kafka-01) and another on second node (kafka-02) and check how consumer group changed:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumergroup1 events3p3r 0 2 2 0 console-consumer-571e4252-2aca-4a5d-918d-159a648dd10f /10.0.0.133 console-consumer
consumergroup1 events3p3r 1 2 2 0 console-consumer-8201f7e5-80db-4d64-8886-75469cd8fcb9 /10.0.0.194 console-consumer
consumergroup1 events3p3r 2 2 2 0 console-consumer-bd4cfd50-1b79-490c-88b9-386421cad7e4 /10.0.0.157 console-consumer
We can observe that each consumer has an assigned partition.
And consumes the corresponding data:
It is interesting to note that even if consumer is started with option --from-beginning
, it will not get all messages as old messages were already consumed by another consumer of the group.
Adding more consumer to the group than the amount of partitions in the topic will not increase consume performances. A consumer can read from multiple partitions, but a partition cannot be read from multiple consumers in the same consumer group. Partition can be read by multiple consumers if they are not part of the same consumer group.
Remove Consumers
What happens to the consumer group when a consumer stops? Kafka will do a rebalance of partitions between remaining consumers.
Before a rebalance, with 3 consumers, we have:
$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumergroup1 --members --verbose
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumergroup1 console-consumer-afca6731-9cf9-431e-b7bd-da43c84fcf31 /10.0.0.194 console-consumer 1 events3p3r(1)
consumergroup1 console-consumer-9e26fae1-e04d-4541-8fa8-139d97856a70 /10.0.0.157 console-consumer 1 events3p3r(0)
consumergroup1 console-consumer-b8257a20-72f2-4d69-b755-47d2cda2fc0c /10.0.0.133 console-consumer 1 events3p3r(2)
After stopping the consumer on the first node:
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumergroup1 console-consumer-9e26fae1-e04d-4541-8fa8-139d97856a70 /10.0.0.157 console-consumer 2 events3p3r(0,1)
consumergroup1 console-consumer-b8257a20-72f2-4d69-b755-47d2cda2fc0c /10.0.0.133 console-consumer 1 events3p3r(2)
As you can see, one consumer is polling 2 partitions from events3p3r topic (partition 0 and 1).
We can also see traces of this rebalance in server.log:
[2023-08-29 09:34:41,637] INFO [GroupCoordinator 1]: Preparing to rebalance group consumergroup1 in state PreparingRebalance with old generation 27 (__consumer_offsets-24) (reason: Removing member console-consumer-a161b008-067f-42b7-a613-4f4449ee04f4 on LeaveGroup; client reason: the consumer is being closed) (kafka.coordinator.group.GroupCoordinator)
[2023-08-29 09:34:41,637] INFO [GroupCoordinator 1]: Member MemberMetadata(memberId=console-consumer-a161b008-067f-42b7-a613-4f4449ee04f4, groupInstanceId=None, clientId=console-consumer, clientHost=/10.0.0.194, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(range, cooperative-sticky)) has left group consumergroup1 through explicit `LeaveGroup`; client reason: the consumer is being closed (kafka.coordinator.group.GroupCoordinator)
[2023-08-29 09:34:44,522] INFO [GroupCoordinator 1]: Stabilized group consumergroup1 generation 28 (__consumer_offsets-24) with 2 members (kafka.coordinator.group.GroupCoordinator)
[2023-08-29 09:34:44,524] INFO [GroupCoordinator 1]: Assignment received from leader console-consumer-9e26fae1-e04d-4541-8fa8-139d97856a70 for group consumergroup1 for generation 28. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
Group Coordinator and Group Leader Roles
A broker endorses the group coordinator role. It oversees consumer liveness through heartbeat mechanism. If a consumer leaves group or timeout, group coordinator will trigger a rebalance. A rebalance will also occur when a new consumer subscribes to a topic with the same consumer group.
The first consumer connecting to the consumer group assigns the group leader role. The leader receives the list of alive consumers from the group coordinator and then assigns partition(s) to each consumer. The group leader (ie. a consumer) assigns partitions to consumers because it is the consumer that knows best any business constraints on message (ordering, duplicates, losses).
There are several types of assignors available, but I will not cover them here. Maybe in another blog đ