Skip to content

[KIP-848] Fix static group membership #4994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -3036,7 +3036,8 @@ static int ut_cgrp_consumer_member_next_assignment0(
fixtures[i].comment);

if (fixtures[i].session_timed_out) {
rd_kafka_mock_cgrp_consumer_member_leave(mcgrp, member);
rd_kafka_mock_cgrp_consumer_member_leave(mcgrp, member,
rd_false);
member = rd_kafka_mock_cgrp_consumer_member_add(
mcgrp, conn, &MemberId, &InstanceId,
&SubscribedTopic, 1);
Expand Down
50 changes: 34 additions & 16 deletions src/rdkafka_mock_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1462,18 +1462,26 @@ rd_kafka_mock_cgrp_consumer_member_add(rd_kafka_mock_cgrp_consumer_t *mcgrp,
/* Find member */
member = rd_kafka_mock_cgrp_consumer_member_find(mcgrp, MemberId);
if (!member) {
if (RD_KAFKAP_STR_LEN(MemberId) == 0)
/* KIP 1082: MemberId is generated by the client */
return NULL;

member = rd_kafka_mock_cgrp_consumer_member_find_by_instance_id(
mcgrp, InstanceId);
if (member && RD_KAFKAP_STR_LEN(MemberId) > 0 &&

if (member &&
rd_kafkap_str_cmp_str(MemberId, member->id) != 0) {
/* Either member is a new instance and is rejoining
* with same InstanceId, so MemberId is NULL,
* or it's rejoining after unsubscribing,
* then it must have the same MemberId as before,
* as it lasts for member lifetime.
* It both don't hold, we cannot add the member
* to the group. */
return NULL;
/* Member is a new instance and is rejoining
* with a new MemberId. */

if (!member->left_static_membership)
/* Old member still active,
* fence this one */
return NULL;

RD_IF_FREE(member->id, rd_free);
member->id = RD_KAFKAP_STR_DUP(MemberId);
member->left_static_membership = rd_false;
}
}

Expand Down Expand Up @@ -1555,29 +1563,39 @@ static void rd_kafka_mock_cgrp_consumer_member_destroy(
rd_free(member);
}

static void rd_kafka_mock_cgrp_consumer_member_leave_static(
rd_kafka_mock_cgrp_consumer_member_t *member) {
member->left_static_membership = rd_true;
rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(member,
NULL);
}


/**
* @brief Called when a member must leave a consumer group.
*
* @param mcgrp Consumer group to leave.
* @param member Member that leaves.
* @param is_static If true, the member is leaving with static group membership.
* @param leave_static If true, the member is leaving with static group
* membership.
*
* @locks mcluster->lock MUST be held.
*/
void rd_kafka_mock_cgrp_consumer_member_leave(
rd_kafka_mock_cgrp_consumer_t *mcgrp,
rd_kafka_mock_cgrp_consumer_member_t *member) {
rd_kafka_mock_cgrp_consumer_member_t *member,
rd_bool_t leave_static) {
rd_bool_t is_static = member->instance_id != NULL;

rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
"Member %s is leaving group %s, is static: %s", member->id,
mcgrp->id, RD_STR_ToF(is_static));
if (!is_static)
"Member %s is leaving group %s, is static: %s, "
"static leave: %s",
member->id, mcgrp->id, RD_STR_ToF(is_static),
RD_STR_ToF(leave_static));
if (!is_static || !leave_static)
rd_kafka_mock_cgrp_consumer_member_destroy(mcgrp, member);
else
rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(
member, NULL);
rd_kafka_mock_cgrp_consumer_member_leave_static(member);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -2884,7 +2884,7 @@ rd_kafka_mock_handle_ConsumerGroupHeartbeat(rd_kafka_mock_connection_t *mconn,
}
} else {
rd_kafka_mock_cgrp_consumer_member_leave(
mcgrp, member);
mcgrp, member, MemberEpoch == -2);
member = NULL;
}
} else {
Expand Down
6 changes: 5 additions & 1 deletion src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ typedef struct rd_kafka_mock_cgrp_consumer_member_s {

rd_list_t *subscribed_topics; /**< Subscribed topics */

rd_bool_t left_static_membership; /**< Member has left the group
* with static membership. */

struct rd_kafka_mock_connection_s *conn; /**< Connection, may be NULL
* if there is no ongoing
* request. */
Expand Down Expand Up @@ -671,7 +674,8 @@ rd_kafka_mock_cgrp_consumer_get(rd_kafka_mock_cluster_t *mcluster,

void rd_kafka_mock_cgrp_consumer_member_leave(
rd_kafka_mock_cgrp_consumer_t *mcgrp,
rd_kafka_mock_cgrp_consumer_member_t *member);
rd_kafka_mock_cgrp_consumer_member_t *member,
rd_bool_t static_leave);

void rd_kafka_mock_cgrp_consumer_member_fenced(
rd_kafka_mock_cgrp_consumer_t *mcgrp,
Expand Down
Loading