Skip to content

Commit 7e45383

Browse files
pranavrthPratRanj07
authored andcommitted
[KIP-848] Added Regex support for the new consumer group protocol. (#4968)
[KIP-848] Added Regex support for the new consumer group protocol.
1 parent d83efd5 commit 7e45383

21 files changed

+644
-227
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ cov-int
3131
gdbrun*.gdb
3232
TAGS
3333
vcpkg_installed
34+
*tmp-KafkaCluster*

examples/consumer.c

+17-7
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ int main(int argc, char **argv) {
7777
char errstr[512]; /* librdkafka API error reporting buffer */
7878
const char *brokers; /* Argument: broker list */
7979
const char *groupid; /* Argument: Consumer group id */
80-
char **topics; /* Argument: list of topics to subscribe to */
81-
int topic_cnt; /* Number of topics to subscribe to */
80+
const char *group_protocol;
81+
char **topics; /* Argument: list of topics to subscribe to */
82+
int topic_cnt; /* Number of topics to subscribe to */
8283
rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
8384
int i;
8485

@@ -88,15 +89,17 @@ int main(int argc, char **argv) {
8889
if (argc < 4) {
8990
fprintf(stderr,
9091
"%% Usage: "
91-
"%s <broker> <group.id> <topic1> <topic2>..\n",
92+
"%s <broker> <group.id> <group.protocol> <topic1> "
93+
"<topic2>..\n",
9294
argv[0]);
9395
return 1;
9496
}
9597

96-
brokers = argv[1];
97-
groupid = argv[2];
98-
topics = &argv[3];
99-
topic_cnt = argc - 3;
98+
brokers = argv[1];
99+
groupid = argv[2];
100+
group_protocol = argv[3];
101+
topics = &argv[4];
102+
topic_cnt = argc - 4;
100103

101104

102105
/*
@@ -127,6 +130,13 @@ int main(int argc, char **argv) {
127130
return 1;
128131
}
129132

133+
if (rd_kafka_conf_set(conf, "group.protocol", group_protocol, errstr,
134+
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
135+
fprintf(stderr, "%s\n", errstr);
136+
rd_kafka_conf_destroy(conf);
137+
return 1;
138+
}
139+
130140
/* If there is no previously committed offset for a partition
131141
* the auto.offset.reset strategy will be used to decide where
132142
* in the partition to start fetching messages.

src/rdkafka_cgrp.c

+91-55
Original file line numberDiff line numberDiff line change
@@ -263,11 +263,28 @@ typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
263263

264264
/**
265265
* @returns true if consumer has joined the group and thus requires a leave.
266+
*
267+
* `rkcg_member_id` is sufficient to know this with "classic" group protocol.
266268
*/
267-
#define RD_KAFKA_CGRP_HAS_JOINED(rkcg) \
268-
(rkcg->rkcg_member_id != NULL && \
269+
#define RD_KAFKA_CGRP_HAS_JOINED_CLASSIC(rkcg) \
270+
(rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CLASSIC && \
271+
rkcg->rkcg_member_id != NULL && \
269272
RD_KAFKAP_STR_LEN((rkcg)->rkcg_member_id) > 0)
270273

274+
/**
275+
* @returns true if consumer has joined the group and thus requires a leave.
276+
*
277+
* With "consumer" group protocol we cannot rely on the `rkcg_member_id`
278+
* as it's client generated.
279+
*/
280+
#define RD_KAFKA_CGRP_HAS_JOINED_CONSUMER(rkcg) \
281+
(rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER && \
282+
rkcg->rkcg_generation_id > 0)
283+
284+
#define RD_KAFKA_CGRP_HAS_JOINED(rkcg) \
285+
(RD_KAFKA_CGRP_HAS_JOINED_CLASSIC(rkcg) || \
286+
RD_KAFKA_CGRP_HAS_JOINED_CONSUMER(rkcg))
287+
271288

272289
/**
273290
* @returns true if cgrp is waiting for a rebalance_cb to be handled by
@@ -384,6 +401,8 @@ void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) {
384401
void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) {
385402
rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
386403
rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
404+
rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription_topics);
405+
rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription_regex);
387406
rd_kafka_cgrp_set_member_id(rkcg, NULL);
388407
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_current_assignment);
389408
RD_IF_FREE(rkcg->rkcg_target_assignment,
@@ -466,7 +485,15 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
466485
rkcg->rkcg_next_subscription = NULL;
467486
TAILQ_INIT(&rkcg->rkcg_topics);
468487
rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
469-
rd_kafka_cgrp_set_member_id(rkcg, "");
488+
489+
if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
490+
rd_kafka_Uuid_t uuid = rd_kafka_Uuid_random();
491+
rd_kafka_cgrp_set_member_id(rkcg,
492+
rd_kafka_Uuid_base64str(&uuid));
493+
} else {
494+
rd_kafka_cgrp_set_member_id(rkcg, "");
495+
}
496+
470497
rkcg->rkcg_subscribed_topics =
471498
rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
472499
rd_interval_init(&rkcg->rkcg_coord_query_intvl);
@@ -1010,7 +1037,9 @@ static void rd_kafka_cgrp_consumer_leave(rd_kafka_cgrp_t *rkcg) {
10101037
rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id,
10111038
member_epoch, rkcg->rkcg_group_instance_id,
10121039
NULL /* no rack */, -1 /* no rebalance_timeout_ms */,
1013-
NULL /* no subscription */, NULL /* no remote assignor */,
1040+
NULL /* no subscription topics */,
1041+
NULL /* no regex subscription */,
1042+
NULL /* no remote assignor */,
10141043
NULL /* no current assignment */,
10151044
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
10161045
rd_kafka_cgrp_handle_ConsumerGroupHeartbeat_leave, rkcg);
@@ -1339,7 +1368,6 @@ static void rd_kafka_rebalance_op(rd_kafka_cgrp_t *rkcg,
13391368
rd_kafka_cgrp_group_assignment_set(rkcg, NULL);
13401369
}
13411370

1342-
13431371
/**
13441372
* @brief Rejoin the group.
13451373
*
@@ -2942,14 +2970,15 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
29422970
}
29432971

29442972
rd_kafka_buf_read_str(rkbuf, &member_id);
2973+
if (!RD_KAFKAP_STR_IS_NULL(&member_id)) {
2974+
rd_kafka_cgrp_set_member_id(rkcg, member_id.str);
2975+
}
2976+
29452977
rd_kafka_buf_read_i32(rkbuf, &member_epoch);
29462978
rd_kafka_buf_read_i32(rkbuf, &heartbeat_interval_ms);
29472979

29482980
int8_t are_assignments_present;
29492981
rd_kafka_buf_read_i8(rkbuf, &are_assignments_present);
2950-
if (!RD_KAFKAP_STR_IS_NULL(&member_id)) {
2951-
rd_kafka_cgrp_set_member_id(rkcg, member_id.str);
2952-
}
29532982
rkcg->rkcg_generation_id = member_epoch;
29542983
if (heartbeat_interval_ms > 0) {
29552984
rkcg->rkcg_heartbeat_intvl_ms = heartbeat_interval_ms;
@@ -5171,12 +5200,46 @@ rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
51715200
static int32_t
51725201
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
51735202
rd_kafka_topic_partition_list_t *rktparlist) {
5174-
int32_t new_subscription_version =
5175-
rd_atomic32_add(&rkcg->rkcg_subscription_version, 1);
5203+
5204+
rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
5205+
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);
5206+
5207+
RD_IF_FREE(rkcg->rkcg_subscription,
5208+
rd_kafka_topic_partition_list_destroy);
5209+
RD_IF_FREE(rkcg->rkcg_subscription_topics,
5210+
rd_kafka_topic_partition_list_destroy);
5211+
RD_IF_FREE(rkcg->rkcg_subscription_regex, rd_kafkap_str_destroy);
5212+
51765213
rkcg->rkcg_subscription = rktparlist;
5177-
return new_subscription_version;
5214+
5215+
if (rkcg->rkcg_subscription) {
5216+
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;
5217+
if (rd_kafka_topic_partition_list_regex_cnt(
5218+
rkcg->rkcg_subscription) > 0)
5219+
rkcg->rkcg_flags |=
5220+
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
5221+
5222+
if (rkcg->rkcg_group_protocol ==
5223+
RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
5224+
rkcg->rkcg_subscription_regex =
5225+
rd_kafka_topic_partition_list_combine_regexes(
5226+
rkcg->rkcg_subscription);
5227+
rkcg->rkcg_subscription_topics =
5228+
rd_kafka_topic_partition_list_remove_regexes(
5229+
rkcg->rkcg_subscription);
5230+
rkcg->rkcg_consumer_flags |=
5231+
RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE |
5232+
RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION;
5233+
}
5234+
} else {
5235+
rkcg->rkcg_subscription_regex = NULL;
5236+
rkcg->rkcg_subscription_topics = NULL;
5237+
}
5238+
5239+
return rd_atomic32_add(&rkcg->rkcg_subscription_version, 1);
51785240
}
51795241

5242+
51805243
/**
51815244
* @brief Handle a new subscription that is modifying an existing subscription
51825245
* in the COOPERATIVE case.
@@ -5194,11 +5257,6 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
51945257
int old_cnt = rkcg->rkcg_subscription->cnt;
51955258
int32_t cgrp_subscription_version;
51965259

5197-
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
5198-
5199-
if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
5200-
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
5201-
52025260
/* Topics in rkcg_subscribed_topics that don't match any pattern in
52035261
the new subscription. */
52045262
unsubscribing_topics =
@@ -5208,7 +5266,6 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
52085266
revoking = rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
52095267
rkcg, unsubscribing_topics);
52105268

5211-
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
52125269
cgrp_subscription_version =
52135270
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
52145271

@@ -5318,10 +5375,7 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,
53185375
rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
53195376
&rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/);
53205377

5321-
if (rkcg->rkcg_subscription) {
5322-
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
5323-
rd_kafka_cgrp_subscription_set(rkcg, NULL);
5324-
}
5378+
rd_kafka_cgrp_subscription_set(rkcg, NULL);
53255379

53265380
if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CLASSIC)
53275381
rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL);
@@ -5340,9 +5394,6 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,
53405394
rd_true /*initiating*/,
53415395
"unsubscribe");
53425396

5343-
rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
5344-
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);
5345-
53465397
return RD_KAFKA_RESP_ERR_NO_ERROR;
53475398
}
53485399

@@ -5413,11 +5464,6 @@ rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg,
54135464
if (!rktparlist)
54145465
return RD_KAFKA_RESP_ERR_NO_ERROR;
54155466

5416-
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;
5417-
5418-
if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
5419-
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
5420-
54215467
subscription_version = rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
54225468

54235469
rd_kafka_cgrp_join(rkcg, subscription_version);
@@ -5804,16 +5850,18 @@ static void rd_kafka_cgrp_join_state_serve(rd_kafka_cgrp_t *rkcg) {
58045850
}
58055851
}
58065852

5853+
58075854
void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
58085855
rd_bool_t full_request,
58095856
rd_bool_t send_ack) {
58105857

5811-
rd_kafkap_str_t *rkcg_group_instance_id = NULL;
5812-
rd_kafkap_str_t *rkcg_client_rack = NULL;
5813-
int max_poll_interval_ms = -1;
5814-
rd_kafka_topic_partition_list_t *rkcg_subscription = NULL;
5815-
rd_kafkap_str_t *rkcg_group_remote_assignor = NULL;
5816-
rd_kafka_topic_partition_list_t *rkcg_group_assignment = NULL;
5858+
rd_kafkap_str_t *rkcg_group_instance_id = NULL;
5859+
rd_kafkap_str_t *rkcg_client_rack = NULL;
5860+
int max_poll_interval_ms = -1;
5861+
rd_kafka_topic_partition_list_t *rkcg_subscription_topics = NULL;
5862+
rd_kafkap_str_t *rkcg_subscription_regex = NULL;
5863+
rd_kafkap_str_t *rkcg_group_remote_assignor = NULL;
5864+
rd_kafka_topic_partition_list_t *rkcg_group_assignment = NULL;
58175865
int32_t member_epoch = rkcg->rkcg_generation_id;
58185866
if (member_epoch < 0)
58195867
member_epoch = 0;
@@ -5827,7 +5875,8 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
58275875
rkcg_client_rack = rkcg->rkcg_client_rack;
58285876
max_poll_interval_ms =
58295877
rkcg->rkcg_rk->rk_conf.max_poll_interval_ms;
5830-
rkcg_subscription = rkcg->rkcg_subscription;
5878+
rkcg_subscription_topics = rkcg->rkcg_subscription_topics;
5879+
rkcg_subscription_regex = rkcg->rkcg_subscription_regex;
58315880
rkcg_group_remote_assignor = rkcg->rkcg_group_remote_assignor;
58325881
}
58335882

@@ -5863,14 +5912,15 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
58635912
(rkcg->rkcg_consumer_flags &
58645913
~RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION) |
58655914
RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION;
5866-
rkcg_subscription = rkcg->rkcg_subscription;
5915+
rkcg_subscription_topics = rkcg->rkcg_subscription_topics;
5916+
rkcg_subscription_regex = rkcg->rkcg_subscription_regex;
58675917

58685918
if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) {
58695919
char rkcg_new_subscription_str[512] = "NULL";
58705920

5871-
if (rkcg_subscription) {
5921+
if (rkcg->rkcg_subscription) {
58725922
rd_kafka_topic_partition_list_str(
5873-
rkcg_subscription,
5923+
rkcg->rkcg_subscription,
58745924
rkcg_new_subscription_str,
58755925
sizeof(rkcg_new_subscription_str), 0);
58765926
}
@@ -5885,7 +5935,8 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
58855935
rd_kafka_ConsumerGroupHeartbeatRequest(
58865936
rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id,
58875937
member_epoch, rkcg_group_instance_id, rkcg_client_rack,
5888-
max_poll_interval_ms, rkcg_subscription, rkcg_group_remote_assignor,
5938+
max_poll_interval_ms, rkcg_subscription_topics,
5939+
rkcg_subscription_regex, rkcg_group_remote_assignor,
58895940
rkcg_group_assignment, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
58905941
rd_kafka_cgrp_handle_ConsumerGroupHeartbeat, NULL);
58915942
}
@@ -6014,22 +6065,7 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg,
60146065
return RD_KAFKA_RESP_ERR__FATAL;
60156066
}
60166067

6017-
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
60186068
if (rktparlist) {
6019-
if (rkcg->rkcg_subscription)
6020-
rd_kafka_topic_partition_list_destroy(
6021-
rkcg->rkcg_subscription);
6022-
6023-
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;
6024-
6025-
if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
6026-
rkcg->rkcg_flags |=
6027-
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
6028-
6029-
rkcg->rkcg_consumer_flags |=
6030-
RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE |
6031-
RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION;
6032-
60336069
rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
60346070
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
60356071
rkcg, "subscription changed");

src/rdkafka_cgrp.h

+26-2
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,33 @@ typedef struct rd_kafka_cgrp_s {
230230
rd_kafka_topic_partition_list_t *rkcg_errored_topics;
231231
/** If a SUBSCRIBE op is received during a COOPERATIVE rebalance,
232232
* actioning this will be postponed until after the rebalance
233-
* completes. The waiting subscription is stored here.
234-
* Mutually exclusive with rkcg_next_subscription. */
233+
* completes. The waiting subscription is stored here. */
235234
rd_kafka_topic_partition_list_t *rkcg_next_subscription;
235+
236+
/**
237+
* Subscription regex pattern. All the provided regex patterns are
238+
* stored as a single string with each pattern separated by '|'.
239+
*
240+
* Only applicable for the consumer protocol introduced in KIP-848.
241+
*
242+
* rkcg_subscription = rkcg_subscription_topics +
243+
* rkcg_subscription_regex
244+
*/
245+
rd_kafkap_str_t *rkcg_subscription_regex;
246+
247+
/**
248+
* Full topic names extracted out from the rkcg_subscription.
249+
*
250+
* Only applicable for the consumer protocol introduced in KIP-848.
251+
*
252+
* For the consumer protocol, this field doesn't include regex
253+
* subscriptions. For that please refer `rkcg_subscription_regex`
254+
*
255+
* rkcg_subscription = rkcg_subscription_topics +
256+
* rkcg_subscription_regex
257+
*/
258+
rd_kafka_topic_partition_list_t *rkcg_subscription_topics;
259+
236260
/** If a (un)SUBSCRIBE op is received during a COOPERATIVE rebalance,
237261
* actioning this will be posponed until after the rebalance
238262
* completes. This flag is used to signal a waiting unsubscribe

src/rdkafka_metadata.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -1580,7 +1580,8 @@ rd_kafka_metadata_refresh_consumer_topics(rd_kafka_t *rk,
15801580
rkcg = rk->rk_cgrp;
15811581
rd_assert(rkcg != NULL);
15821582

1583-
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
1583+
if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CLASSIC &&
1584+
rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
15841585
/* If there is a wildcard subscription we need to request
15851586
* all topics in the cluster so that we can perform
15861587
* regexp matching. */

0 commit comments

Comments
 (0)