Skip to content

Commit db62c7c

Browse files
KAFKA-19157: added group.share.max.share.sessions config (#19503)
This PR adds the config group.share.max.share.sessions to ShareGroupConfig Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent c73d97d commit db62c7c

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

+1
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,7 @@ class KafkaConfigTest {
10301030
case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10311031
case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10321032
case ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
1033+
case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
10331034
case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG => //ignore string
10341035

10351036
/** Streams groups configs */

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.stream.Collectors;
2929

3030
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
31+
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
3132
import static org.apache.kafka.common.config.ConfigDef.Range.between;
3233
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
3334
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
@@ -70,6 +71,10 @@ public class ShareGroupConfig {
7071
public static final int SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT = 1000;
7172
public static final String SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC = "The purge interval (in number of requests) of the share fetch request purgatory";
7273

74+
public static final String SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG = "group.share.max.share.sessions";
75+
public static final int SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT = 2000;
76+
public static final String SHARE_GROUP_MAX_SHARE_SESSIONS_DOC = "The maximum number of share sessions per broker.";
77+
7378
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG = "group.share.persister.class.name";
7479
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT = "org.apache.kafka.server.share.persister.DefaultStatePersister";
7580
public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The class name of share persister for share group. The class should implement " +
@@ -84,6 +89,7 @@ public class ShareGroupConfig {
8489
.define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT, SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM, SHARE_GROUP_MAX_GROUPS_DOC)
8590
.define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM, SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
8691
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
92+
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
8793
.defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
8894

8995
private final boolean isShareGroupEnabled;
@@ -94,11 +100,13 @@ public class ShareGroupConfig {
94100
private final int shareGroupMaxRecordLockDurationMs;
95101
private final int shareGroupMinRecordLockDurationMs;
96102
private final int shareFetchPurgatoryPurgeIntervalRequests;
103+
private final int shareGroupMaxShareSessions;
97104
private final String shareGroupPersisterClassName;
98105

99106
public ShareGroupConfig(AbstractConfig config) {
100-
// Share groups are enabled in two cases: 1) The internal configuration to enable it is
101-
// explicitly set; or 2) the share rebalance protocol is enabled.
107+
// Share groups are enabled in two cases:
108+
// 1. The internal configuration to enable it is explicitly set
109+
// 2. the share rebalance protocol is enabled.
102110
Set<String> protocols = config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
103111
.stream().map(String::toUpperCase).collect(Collectors.toSet());
104112
isShareGroupEnabled = config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG) ||
@@ -110,6 +118,7 @@ public ShareGroupConfig(AbstractConfig config) {
110118
shareGroupMaxRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
111119
shareGroupMinRecordLockDurationMs = config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
112120
shareFetchPurgatoryPurgeIntervalRequests = config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
121+
shareGroupMaxShareSessions = config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG);
113122
shareGroupPersisterClassName = config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
114123
validate();
115124
}
@@ -147,6 +156,10 @@ public int shareFetchPurgatoryPurgeIntervalRequests() {
147156
return shareFetchPurgatoryPurgeIntervalRequests;
148157
}
149158

159+
public int shareGroupMaxShareSessions() {
160+
return shareGroupMaxShareSessions;
161+
}
162+
150163
public String shareGroupPersisterClassName() {
151164
return shareGroupPersisterClassName;
152165
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public void testConfigs() {
4444
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, 15000);
4545
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, 60000);
4646
configs.put(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, 1000);
47+
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, 1000);
4748

4849
ShareGroupConfig config = createConfig(configs);
4950

@@ -55,6 +56,7 @@ public void testConfigs() {
5556
assertEquals(15000, config.shareGroupMinRecordLockDurationMs());
5657
assertEquals(60000, config.shareGroupMaxRecordLockDurationMs());
5758
assertEquals(1000, config.shareFetchPurgatoryPurgeIntervalRequests());
59+
assertEquals(1000, config.shareGroupMaxShareSessions());
5860
}
5961

6062
@Test

0 commit comments

Comments
 (0)