Skip to content

Commit 0e4f27d

Browse files
KAFKA-19950: Mark session stale when removed prior disconnection event (#21062) (#21071)
The PR marks the old session stale in connection listener map to avoid triggering the member leave event. This is more often see when client sends the share fetch request again with `initial epoch`, then broker refreshes the connection. Reviewers: Andrew Schofield <aschofield@confluent.io>, Abhinav Dixit <adixit@confluent.io>
1 parent 63a777a commit 0e4f27d

File tree

5 files changed

+233
-94
lines changed

5 files changed

+233
-94
lines changed

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3370,7 +3370,7 @@ class KafkaApis(val requestChannel: RequestChannel,
33703370
error(s"Releasing share session close with correlation from client ${request.header.clientId} " +
33713371
s"failed with error ${throwable.getMessage}")
33723372
} else {
3373-
info(s"Releasing share session close $releaseAcquiredRecordsData succeeded")
3373+
info(s"Releasing share session for client id ${request.header.clientId} succeeded, response: $releaseAcquiredRecordsData")
33743374
}
33753375
)
33763376
}
@@ -3594,7 +3594,7 @@ class KafkaApis(val requestChannel: RequestChannel,
35943594
debug(s"Releasing share session close with correlation from client ${request.header.clientId} " +
35953595
s"failed with error ${throwable.getMessage}")
35963596
} else {
3597-
info(s"Releasing share session close $releaseAcquiredRecordsData succeeded")
3597+
info(s"Releasing share session for client id ${request.header.clientId} succeeded, response: $releaseAcquiredRecordsData")
35983598
}
35993599
}
36003600
}

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 70 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4777,12 +4777,6 @@ class KafkaApisTest extends Logging {
47774777
cachedSharePartitions.mustAdd(new CachedSharePartition(
47784778
new TopicIdPartition(topicId, partitionIndex, topicName), false))
47794779

4780-
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenThrow(
4781-
Errors.INVALID_REQUEST.exception()
4782-
).thenReturn(new ShareSessionContext(1, new ShareSession(
4783-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2
4784-
)))
4785-
47864780
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
47874781
any[Session](), anyString, anyDouble, anyLong)).thenReturn(0)
47884782

@@ -4806,6 +4800,13 @@ class KafkaApisTest extends Logging {
48064800

48074801
var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
48084802
var request = buildRequest(shareFetchRequest)
4803+
4804+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenThrow(
4805+
Errors.INVALID_REQUEST.exception()
4806+
).thenReturn(new ShareSessionContext(1, new ShareSession(
4807+
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2, request.context.connectionId)
4808+
))
4809+
48094810
kafkaApis = createKafkaApis()
48104811
kafkaApis.handleShareFetchRequest(request)
48114812
var response = verifyNoThrottling[ShareFetchResponse](request)
@@ -5024,11 +5025,6 @@ class KafkaApisTest extends Logging {
50245025
cachedSharePartitions.mustAdd(new CachedSharePartition(
50255026
new TopicIdPartition(topicId, partitionIndex, topicName), false))
50265027

5027-
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any()))
5028-
.thenReturn(new ShareSessionContext(1, new ShareSession(
5029-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
5030-
)
5031-
50325028
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
50335029
any[Session](), anyString, anyDouble, anyLong)).thenReturn(0)
50345030

@@ -5052,6 +5048,12 @@ class KafkaApisTest extends Logging {
50525048

50535049
val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
50545050
val request = buildRequest(shareFetchRequest)
5051+
5052+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any()))
5053+
.thenReturn(new ShareSessionContext(1, new ShareSession(
5054+
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2, request.context.connectionId))
5055+
)
5056+
50555057
kafkaApis = createKafkaApis()
50565058
kafkaApis.handleShareFetchRequest(request)
50575059
val response = verifyNoThrottling[ShareFetchResponse](request)
@@ -5083,11 +5085,6 @@ class KafkaApisTest extends Logging {
50835085
cachedSharePartitions.mustAdd(new CachedSharePartition(
50845086
new TopicIdPartition(topicId, partitionIndex, topicName), false))
50855087

5086-
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any()))
5087-
.thenReturn(new ShareSessionContext(1, new ShareSession(
5088-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
5089-
)
5090-
50915088
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
50925089
any[Session](), anyString, anyDouble, anyLong)).thenReturn(0)
50935090

@@ -5111,6 +5108,12 @@ class KafkaApisTest extends Logging {
51115108

51125109
val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
51135110
val request = buildRequest(shareFetchRequest)
5111+
5112+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any()))
5113+
.thenReturn(new ShareSessionContext(1, new ShareSession(
5114+
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2, request.context.connectionId))
5115+
)
5116+
51145117
kafkaApis = createKafkaApis()
51155118
kafkaApis.handleShareFetchRequest(request)
51165119
val response = verifyNoThrottling[ShareFetchResponse](request)
@@ -5460,16 +5463,6 @@ class KafkaApisTest extends Logging {
54605463
new TopicIdPartition(topicId, partitionIndex, topicName), false)
54615464
)
54625465

5463-
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
5464-
new ShareSessionContext(0, util.List.of(
5465-
new TopicIdPartition(topicId, partitionIndex, topicName)
5466-
))
5467-
).thenReturn(new ShareSessionContext(1, new ShareSession(
5468-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
5469-
).thenReturn(new ShareSessionContext(2, new ShareSession(
5470-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 3))
5471-
)
5472-
54735466
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
54745467
any[Session](), anyString, anyDouble, anyLong)).thenReturn(0)
54755468

@@ -5486,6 +5479,16 @@ class KafkaApisTest extends Logging {
54865479
var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
54875480
var request = buildRequest(shareFetchRequest)
54885481

5482+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
5483+
new ShareSessionContext(0, util.List.of(
5484+
new TopicIdPartition(topicId, partitionIndex, topicName)
5485+
))
5486+
).thenReturn(new ShareSessionContext(1, new ShareSession(
5487+
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2, request.context.connectionId))
5488+
).thenReturn(new ShareSessionContext(2, new ShareSession(
5489+
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 3, request.context.connectionId))
5490+
)
5491+
54895492
// First share fetch request is to establish the share session with the broker.
54905493
kafkaApis = createKafkaApis()
54915494
kafkaApis.handleShareFetchRequest(request)
@@ -5725,19 +5728,6 @@ class KafkaApisTest extends Logging {
57255728
new TopicIdPartition(topicId4, 0, topicName4), false
57265729
))
57275730

5728-
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
5729-
new ShareSessionContext(0, util.List.of(
5730-
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 0)),
5731-
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 1)),
5732-
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0)),
5733-
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
5734-
))
5735-
).thenReturn(new ShareSessionContext(1, new ShareSession(
5736-
new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 2))
5737-
).thenReturn(new ShareSessionContext(2, new ShareSession(
5738-
new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 3))
5739-
).thenReturn(new FinalContext())
5740-
57415731
when(sharePartitionManager.releaseSession(any(), any())).thenReturn(
57425732
CompletableFuture.completedFuture(util.Map.of[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData](
57435733
new TopicIdPartition(topicId3, new TopicPartition(topicName3, 0)),
@@ -5808,6 +5798,20 @@ class KafkaApisTest extends Logging {
58085798

58095799
var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
58105800
var request = buildRequest(shareFetchRequest)
5801+
5802+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
5803+
new ShareSessionContext(0, util.List.of(
5804+
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 0)),
5805+
new TopicIdPartition(topicId1, new TopicPartition(topicName1, 1)),
5806+
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 0)),
5807+
new TopicIdPartition(topicId2, new TopicPartition(topicName2, 1))
5808+
))
5809+
).thenReturn(new ShareSessionContext(1, new ShareSession(
5810+
new ShareSessionKey(groupId, memberId), cachedSharePartitions1, 2, request.context.connectionId))
5811+
).thenReturn(new ShareSessionContext(2, new ShareSession(
5812+
new ShareSessionKey(groupId, memberId), cachedSharePartitions2, 3, request.context.connectionId))
5813+
).thenReturn(new FinalContext())
5814+
58115815
// First share fetch request is to establish the share session with the broker.
58125816
kafkaApis = createKafkaApis()
58135817
kafkaApis.handleShareFetchRequest(request)
@@ -6688,14 +6692,6 @@ class KafkaApisTest extends Logging {
66886692
new TopicIdPartition(topicId, 0, topicName), false
66896693
))
66906694

6691-
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
6692-
new ShareSessionContext(0, util.List.of(
6693-
new TopicIdPartition(topicId, partitionIndex, topicName)
6694-
))
6695-
).thenReturn(new ShareSessionContext(1, new ShareSession(
6696-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
6697-
)
6698-
66996695
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
67006696
any[Session](), anyString, anyDouble, anyLong)).thenReturn(0)
67016697

@@ -6720,6 +6716,15 @@ class KafkaApisTest extends Logging {
67206716

67216717
var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
67226718
var request = buildRequest(shareFetchRequest)
6719+
6720+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
6721+
new ShareSessionContext(0, util.List.of(
6722+
new TopicIdPartition(topicId, partitionIndex, topicName)
6723+
))
6724+
).thenReturn(new ShareSessionContext(1, new ShareSession(
6725+
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2, request.context.connectionId))
6726+
)
6727+
67236728
kafkaApis = createKafkaApis()
67246729
kafkaApis.handleShareFetchRequest(request)
67256730
var response = verifyNoThrottling[ShareFetchResponse](request)
@@ -6807,14 +6812,6 @@ class KafkaApisTest extends Logging {
68076812
new TopicIdPartition(topicId, 0, topicName), false
68086813
))
68096814

6810-
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
6811-
new ShareSessionContext(0, util.List.of(
6812-
new TopicIdPartition(topicId, partitionIndex, topicName)
6813-
))
6814-
).thenReturn(new ShareSessionContext(1, new ShareSession(
6815-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
6816-
)
6817-
68186815
when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
68196816
CompletableFuture.completedFuture(util.Map.of[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData](
68206817
new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
@@ -6837,6 +6834,15 @@ class KafkaApisTest extends Logging {
68376834

68386835
var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
68396836
var request = buildRequest(shareFetchRequest)
6837+
6838+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
6839+
new ShareSessionContext(0, util.List.of(
6840+
new TopicIdPartition(topicId, partitionIndex, topicName)
6841+
))
6842+
).thenReturn(new ShareSessionContext(1, new ShareSession(
6843+
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2, request.context.connectionId))
6844+
)
6845+
68406846
kafkaApis = createKafkaApis()
68416847
kafkaApis.handleShareFetchRequest(request)
68426848
var response = verifyNoThrottling[ShareFetchResponse](request)
@@ -14299,14 +14305,6 @@ class KafkaApisTest extends Logging {
1429914305
new TopicIdPartition(topicId, 0, topicName), false
1430014306
))
1430114307

14302-
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
14303-
new ShareSessionContext(0, util.List.of(
14304-
new TopicIdPartition(topicId, partitionIndex, topicName)
14305-
))
14306-
).thenReturn(new ShareSessionContext(1, new ShareSession(
14307-
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2))
14308-
)
14309-
1431014308
when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
1431114309
any[Session](), anyString, anyDouble, anyLong)).thenReturn(0)
1431214310

@@ -14331,6 +14329,15 @@ class KafkaApisTest extends Logging {
1433114329

1433214330
var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
1433314331
var request = buildRequest(shareFetchRequest)
14332+
14333+
when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any(), any())).thenReturn(
14334+
new ShareSessionContext(0, util.List.of(
14335+
new TopicIdPartition(topicId, partitionIndex, topicName)
14336+
))
14337+
).thenReturn(new ShareSessionContext(1, new ShareSession(
14338+
new ShareSessionKey(groupId, memberId), cachedSharePartitions, 2, request.context.connectionId))
14339+
)
14340+
1433414341
kafkaApis = createKafkaApis()
1433514342
kafkaApis.handleShareFetchRequest(request)
1433614343
var response = verifyNoThrottling[ShareFetchResponse](request)

server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public enum ModifiedTopicIdPartitionType {
3838

3939
private final ShareSessionKey key;
4040
private final ImplicitLinkedHashCollection<CachedSharePartition> partitionMap;
41+
private final String connectionId;
4142

4243
// visible for testing
4344
public int epoch;
@@ -48,16 +49,23 @@ public enum ModifiedTopicIdPartitionType {
4849
/**
4950
* The share session.
5051
* Each share session is protected by its own lock, which must be taken before mutable
51-
* fields are read or modified. This includes modification of the share session partition map.
52+
* fields are read or modified. This includes modification of the share session partition map.
5253
*
5354
* @param key The share session key to identify the share session uniquely.
5455
* @param partitionMap The CachedPartitionMap.
5556
* @param epoch The share session sequence number.
57+
* @param connectionId The connection id associated with this share session.
5658
*/
57-
public ShareSession(ShareSessionKey key, ImplicitLinkedHashCollection<CachedSharePartition> partitionMap, int epoch) {
59+
public ShareSession(
60+
ShareSessionKey key,
61+
ImplicitLinkedHashCollection<CachedSharePartition> partitionMap,
62+
int epoch,
63+
String connectionId
64+
) {
5865
this.key = key;
5966
this.partitionMap = partitionMap;
6067
this.epoch = epoch;
68+
this.connectionId = connectionId;
6169
}
6270

6371
public ShareSessionKey key() {
@@ -85,6 +93,10 @@ public synchronized Boolean isEmpty() {
8593
return partitionMap.isEmpty();
8694
}
8795

96+
public String connectionId() {
97+
return connectionId;
98+
}
99+
88100
// Update the cached partition data based on the request.
89101
public synchronized Map<ModifiedTopicIdPartitionType, List<TopicIdPartition>> update(
90102
List<TopicIdPartition> shareFetchData,
@@ -138,6 +150,7 @@ public String toString() {
138150
", partitionMap=" + partitionMap +
139151
", epoch=" + epoch +
140152
", cachedSize=" + cachedSize +
153+
", connectionId=" + connectionId +
141154
")";
142155
}
143156
}

0 commit comments

Comments
 (0)