Fix bug in AssignmentInfo#encode and add additional logging (#7545)
Same as #7537
but targeted at 2.3 for cherry-pick
Reviewers: Bill Bejeck <bbejeck@gmail.com>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 3dd0836..fc7a13d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -417,9 +417,15 @@
minReceivedMetadataVersion = usedVersion;
}
- final int latestSupportedVersion = info.latestSupportedVersion();
- if (latestSupportedVersion < minSupportedMetadataVersion) {
- minSupportedMetadataVersion = latestSupportedVersion;
+ final int supportedVersion = info.latestSupportedVersion();
+
+ if (supportedVersion < minSupportedMetadataVersion) {
+ log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}",
+ minSupportedMetadataVersion, supportedVersion);
+ minSupportedMetadataVersion = supportedVersion;
+ } else {
+ log.debug("Current minimum supported version remains at {}, last seen supported version was {}",
+ minSupportedMetadataVersion, supportedVersion);
}
// create the new client metadata if necessary
@@ -450,10 +456,15 @@
}
if (minReceivedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
- log.info("Downgrading metadata to version {}. Latest supported version is {}.",
+ log.info("Downgrade metadata to version {}. Latest supported version is {}.",
minReceivedMetadataVersion,
SubscriptionInfo.LATEST_SUPPORTED_VERSION);
}
+ if (minSupportedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+ log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.",
+ minSupportedMetadataVersion,
+ SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+ }
log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
@@ -888,9 +899,10 @@
log.info(
"Sent a version {} subscription and got version {} assignment back (successful version probing). "
+
- "Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
+ "Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.",
usedSubscriptionMetadataVersion,
- receivedAssignmentMetadataVersion
+ receivedAssignmentMetadataVersion,
+ latestCommonlySupportedVersion
);
usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 5c7b037..3521def 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -143,7 +143,7 @@
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
- + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+ + "; latest commonly supported version: " + commonlySupportedVersion);
}
out.flush();
@@ -206,14 +206,14 @@
private void encodeVersionThree(final DataOutputStream out) throws IOException {
out.writeInt(3);
- out.writeInt(LATEST_SUPPORTED_VERSION);
+ out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHost(out);
}
private void encodeVersionFour(final DataOutputStream out) throws IOException {
out.writeInt(4);
- out.writeInt(LATEST_SUPPORTED_VERSION);
+ out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHost(out);
out.writeInt(errCode);
@@ -230,7 +230,7 @@
final AssignmentInfo assignmentInfo;
final int usedVersion = in.readInt();
- final int latestSupportedVersion;
+ final int commonlySupportedVersion;
switch (usedVersion) {
case 1:
assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
@@ -241,13 +241,13 @@
decodeVersionTwoData(assignmentInfo, in);
break;
case 3:
- latestSupportedVersion = in.readInt();
- assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+ commonlySupportedVersion = in.readInt();
+ assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeVersionThreeData(assignmentInfo, in);
break;
case 4:
- latestSupportedVersion = in.readInt();
- assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+ commonlySupportedVersion = in.readInt();
+ assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeVersionFourData(assignmentInfo, in);
break;
default:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 03b9e2d..96f3779 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -207,11 +207,10 @@
private ByteBuffer encodeVersionThree() {
final byte[] endPointBytes = prepareUserEndPoint();
-
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
+ buf.putInt(3);
+ buf.putInt(latestSupportedVersion);
- buf.putInt(3); // used version
- buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
@@ -226,7 +225,7 @@
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
buf.putInt(4); // used version
- buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+ buf.putInt(latestSupportedVersion); // supported version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
@@ -273,7 +272,7 @@
default:
latestSupportedVersion = data.getInt();
subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
- log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
+ log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, latestSupportedVersion);
}
return subscriptionInfo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 8b99065..3ed2f71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -94,4 +94,13 @@
final AssignmentInfo expectedInfo = new AssignmentInfo(4, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2);
assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
}
+
+ @Test
+ public void shouldEncodeAndDecodeSmallerCommonlySupportedVersion() {
+ final int usedVersion = AssignmentInfo.LATEST_SUPPORTED_VERSION - 1;
+ final int commonlySupportedVersion = AssignmentInfo.LATEST_SUPPORTED_VERSION - 1;
+ final AssignmentInfo info = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 0);
+ final AssignmentInfo expectedInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 0);
+ assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 2a75c57..6492cc0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -90,6 +90,16 @@
assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.latestSupportedVersion());
}
+ @Test
+ public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
+ final int usedVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION - 1;
+ final int latestSupportedVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION - 1;
+
+ final SubscriptionInfo info = new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
+ final SubscriptionInfo expectedInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
+ assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
+ }
+
private ByteBuffer encodeFutureVersion() {
final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
+ 4 /* supported version */);