On the producer side, messages are load-balanced across partitions based on the hashed key. On the consumer side, Kafka provides the concept of a consumer group to ensure that each message is processed by only one consumer within the group, enabling load balancing, fault tolerance, and efficient use of resources in a consumer application.
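The key-to-partition mapping can be sketched as follows. This is a toy model, not Kafka's actual code: the Java client's default partitioner uses murmur2, and md5 stands in here only to show the property that matters, namely that the same key always lands on the same partition.

```python
import hashlib

NUM_PARTITIONS = 3  # matches the events3p3r topic used below

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Kafka's Java client uses murmur2 here; md5 is a stand-in that keeps
    # the guarantee we care about: same key -> same partition.
    digest = int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big")
    return digest % num_partitions

# Same key always maps to the same partition, which preserves per-key ordering.
assert partition_for("1") == partition_for("1")
assert 0 <= partition_for("2") < NUM_PARTITIONS
```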

Creation of Consumer Group

There is no specific command to create a consumer group. You simply add --group <groupname> to the consumer command. For example:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events3p3r --from-beginning --property print.key=true --group consumergroup1

Kafka ensures that, even if you run this command with the same group several times, each message will be consumed only once within the group.
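The mechanism behind this is the committed offset, stored per (group, partition). The following toy model (plain Python, no real broker) illustrates why two consumers in the same group never re-read each other's messages, while a new group starts from the beginning:

```python
from collections import defaultdict

log = ["message1", "message2", "message3"]   # one partition's log
committed = defaultdict(int)                 # group -> next offset to read

def poll(group: str, max_records: int = 10):
    """Return unread records for this group and commit the new offset."""
    start = committed[group]
    records = log[start:start + max_records]
    committed[group] = start + len(records)  # commit after processing
    return records

assert poll("consumergroup1") == ["message1", "message2", "message3"]
assert poll("consumergroup1") == []          # same group: already consumed
assert poll("consumergroup2") == log         # new group: reads from the beginning
```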

By the way, the topic definition is as follows (same as in the previous blog: 3 partitions and 3 replicas):

Topic: events3p3r       TopicId: jppdD6GfS1WjOSsNCJCtcQ PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: events3p3r       Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: events3p3r       Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: events3p3r       Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

Consumer Group Details

With the help of kafka-consumer-groups.sh, it is possible to get insights into what is happening. I run the following command:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumergroup1

And the output looks like this:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
consumergroup1  events3p3r      0          0               0               0               console-consumer-bd4cfd50-1b79-490c-88b9-386421cad7e4 /10.0.0.157     console-consumer
consumergroup1  events3p3r      1          0               0               0               console-consumer-bd4cfd50-1b79-490c-88b9-386421cad7e4 /10.0.0.157     console-consumer
consumergroup1  events3p3r      2          0               0               0               console-consumer-bd4cfd50-1b79-490c-88b9-386421cad7e4 /10.0.0.157     console-consumer

We can see the 3 partitions, currently without any data, all being read by the same host (the kafka-03 host IP).

Let’s produce some messages and see what happens. I create a simple text file with this content:

1:message1
2:message2
3:message3
4:message4
5:message5
6:message6

And send them into the topic:

$ cat messages | ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092  --property "key.separator=:" --property "parse.key=true" --topic events3p3r

On the consumer side, we can see the messages:

[root@kafka-03 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events3p3r --property print.key=true --group consumergroup1
4       message4
6       message6
2       message2
3       message3
1       message1
5       message5

Keep in mind that message order is preserved only within a partition. Since this single consumer currently polls from all 3 partitions, nothing prevents messages from arriving out of order across partitions.
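This can be checked against the output above. A small sketch (partition contents are taken from the console output; the interleaving shown is one possible order): the global sequence may be shuffled, yet each partition's own sequence survives as a subsequence of what the consumer prints.

```python
# Per-partition logs, as distributed by the key-based partitioner.
partitions = {
    0: ["4:message4", "6:message6"],
    1: ["2:message2", "3:message3"],
    2: ["1:message1", "5:message5"],
}

# A single consumer polling all three partitions may see any interleaving...
consumed = partitions[1] + partitions[2] + partitions[0]

def is_subsequence(seq, items):
    """True if items appear in seq in the same relative order."""
    it = iter(seq)
    return all(x in it for x in items)

# ...but within each partition, order is preserved:
assert all(is_subsequence(consumed, msgs) for msgs in partitions.values())
```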

Add more Consumers

Let’s run 2 more consumers for a total of 3, one on the first node (kafka-01) and another on the second node (kafka-02), and check how the consumer group changed:

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
consumergroup1  events3p3r      0          2               2               0               console-consumer-571e4252-2aca-4a5d-918d-159a648dd10f /10.0.0.133     console-consumer
consumergroup1  events3p3r      1          2               2               0               console-consumer-8201f7e5-80db-4d64-8886-75469cd8fcb9 /10.0.0.194     console-consumer
consumergroup1  events3p3r      2          2               2               0               console-consumer-bd4cfd50-1b79-490c-88b9-386421cad7e4 /10.0.0.157     console-consumer

We can observe that each consumer now has one assigned partition.

And each consumer reads the data from its own partition.

It is interesting to note that even if a consumer is started with the --from-beginning option, it will not get all messages, as older messages were already consumed by other consumers of the group (committed offsets are tracked per group, not per consumer).

Adding more consumers to the group than the number of partitions in the topic will not increase consumption performance. A consumer can read from multiple partitions, but a partition cannot be read by multiple consumers within the same consumer group. A partition can be read by multiple consumers only if they belong to different consumer groups.
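A minimal sketch of this rule, assuming a simple round-robin distribution (a simplification of Kafka's real assignors): with 4 consumers on a 3-partition topic, one consumer necessarily ends up idle.

```python
def assign(partitions, consumers):
    """Distribute partitions round-robin; surplus consumers get nothing."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

a = assign([0, 1, 2], ["c1", "c2", "c3", "c4"])
assert a["c4"] == []                           # fourth consumer sits idle
assert sum(len(v) for v in a.values()) == 3    # each partition assigned exactly once
```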

Remove Consumers

What happens to the consumer group when a consumer stops? Kafka rebalances the partitions among the remaining consumers.

Before a rebalance, with 3 consumers, we have:

$ ./bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --describe --group consumergroup1 --members --verbose

GROUP           CONSUMER-ID                                           HOST            CLIENT-ID        #PARTITIONS     ASSIGNMENT
consumergroup1  console-consumer-afca6731-9cf9-431e-b7bd-da43c84fcf31 /10.0.0.194     console-consumer 1               events3p3r(1)
consumergroup1  console-consumer-9e26fae1-e04d-4541-8fa8-139d97856a70 /10.0.0.157     console-consumer 1               events3p3r(0)
consumergroup1  console-consumer-b8257a20-72f2-4d69-b755-47d2cda2fc0c /10.0.0.133     console-consumer 1               events3p3r(2)

After stopping the consumer on the first node:

GROUP           CONSUMER-ID                                           HOST            CLIENT-ID        #PARTITIONS     ASSIGNMENT
consumergroup1  console-consumer-9e26fae1-e04d-4541-8fa8-139d97856a70 /10.0.0.157     console-consumer 2               events3p3r(0,1)
consumergroup1  console-consumer-b8257a20-72f2-4d69-b755-47d2cda2fc0c /10.0.0.133     console-consumer 1               events3p3r(2)

As you can see, one consumer is now polling 2 partitions of the events3p3r topic (partitions 0 and 1).
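The (0,1) / (2) split above is what a range-style assignment produces. The following is a simplified sketch of that logic, not Kafka's actual RangeAssignor code: partitions are sorted and split into contiguous ranges, with earlier consumers absorbing the remainder.

```python
def range_assign(partitions, consumers):
    """Split sorted partitions into contiguous ranges, one per consumer."""
    parts, cons = sorted(partitions), sorted(consumers)
    per, extra = divmod(len(parts), len(cons))
    assignment, start = {}, 0
    for i, c in enumerate(cons):
        n = per + (1 if i < extra else 0)   # first `extra` consumers get one more
        assignment[c] = parts[start:start + n]
        start += n
    return assignment

# With 3 consumers, each gets exactly one partition.
before = range_assign([0, 1, 2], ["c-133", "c-157", "c-194"])
assert all(len(v) == 1 for v in before.values())

# After one consumer stops, a rebalance gives (0,1) to one survivor and (2) to the other.
after = range_assign([0, 1, 2], ["c-133", "c-157"])
assert sorted(after.values()) == [[0, 1], [2]]
```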

We can also see traces of this rebalance in server.log:

[2023-08-29 09:34:41,637] INFO [GroupCoordinator 1]: Preparing to rebalance group consumergroup1 in state PreparingRebalance with old generation 27 (__consumer_offsets-24) (reason: Removing member console-consumer-a161b008-067f-42b7-a613-4f4449ee04f4 on LeaveGroup; client reason: the consumer is being closed) (kafka.coordinator.group.GroupCoordinator)
[2023-08-29 09:34:41,637] INFO [GroupCoordinator 1]: Member MemberMetadata(memberId=console-consumer-a161b008-067f-42b7-a613-4f4449ee04f4, groupInstanceId=None, clientId=console-consumer, clientHost=/10.0.0.194, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(range, cooperative-sticky)) has left group consumergroup1 through explicit `LeaveGroup`; client reason: the consumer is being closed (kafka.coordinator.group.GroupCoordinator)
[2023-08-29 09:34:44,522] INFO [GroupCoordinator 1]: Stabilized group consumergroup1 generation 28 (__consumer_offsets-24) with 2 members (kafka.coordinator.group.GroupCoordinator)
[2023-08-29 09:34:44,524] INFO [GroupCoordinator 1]: Assignment received from leader console-consumer-9e26fae1-e04d-4541-8fa8-139d97856a70 for group consumergroup1 for generation 28. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

Group Coordinator and Group Leader Roles

One of the brokers takes on the group coordinator role. It oversees consumer liveness through a heartbeat mechanism. If a consumer leaves the group or times out, the group coordinator triggers a rebalance. A rebalance also occurs when a new consumer subscribes to a topic with the same consumer group.

The first consumer connecting to the consumer group takes the group leader role. The leader receives the list of live consumers from the group coordinator and then assigns partition(s) to each consumer. The group leader (i.e. a consumer) performs the assignment because the consumer knows best any business constraints on messages (ordering, duplicates, losses).

There are several types of assignors available, but I will not cover them here. Maybe in another blog 😉