Fix bug in AssignmentInfo#encode and add additional logging (#7545)

Same as #7537
but targeted at 2.3 for cherry-pick
Reviewers: Bill Bejeck <bbejeck@gmail.com>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 3dd0836..fc7a13d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -417,9 +417,15 @@
                 minReceivedMetadataVersion = usedVersion;
             }
 
-            final int latestSupportedVersion = info.latestSupportedVersion();
-            if (latestSupportedVersion < minSupportedMetadataVersion) {
-                minSupportedMetadataVersion = latestSupportedVersion;
+            final int supportedVersion = info.latestSupportedVersion();
+
+            if (supportedVersion < minSupportedMetadataVersion) {
+                log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}",
+                    minSupportedMetadataVersion, supportedVersion);
+                minSupportedMetadataVersion = supportedVersion;
+            } else {
+                log.debug("Current minimum supported version remains at {}, last seen supported version was {}",
+                    minSupportedMetadataVersion, supportedVersion);
             }
 
             // create the new client metadata if necessary
@@ -450,10 +456,15 @@
         }
 
         if (minReceivedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
-            log.info("Downgrading metadata to version {}. Latest supported version is {}.",
+            log.info("Downgrade metadata to version {}. Latest supported version is {}.",
                 minReceivedMetadataVersion,
                 SubscriptionInfo.LATEST_SUPPORTED_VERSION);
         }
+        if (minSupportedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+            log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.",
+                minSupportedMetadataVersion,
+                SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+        }
 
         log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
 
@@ -888,9 +899,10 @@
                 log.info(
                     "Sent a version {} subscription and got version {} assignment back (successful version probing). "
                         +
-                        "Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
+                        "Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.",
                     usedSubscriptionMetadataVersion,
-                    receivedAssignmentMetadataVersion
+                    receivedAssignmentMetadataVersion,
+                    latestCommonlySupportedVersion
                 );
                 usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
                 return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 5c7b037..3521def 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -143,7 +143,7 @@
                     break;
                 default:
                     throw new IllegalStateException("Unknown metadata version: " + usedVersion
-                        + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+                        + "; latest commonly supported version: " + commonlySupportedVersion);
             }
 
             out.flush();
@@ -206,14 +206,14 @@
 
     private void encodeVersionThree(final DataOutputStream out) throws IOException {
         out.writeInt(3);
-        out.writeInt(LATEST_SUPPORTED_VERSION);
+        out.writeInt(commonlySupportedVersion);
         encodeActiveAndStandbyTaskAssignment(out);
         encodePartitionsByHost(out);
     }
 
     private void encodeVersionFour(final DataOutputStream out) throws IOException {
         out.writeInt(4);
-        out.writeInt(LATEST_SUPPORTED_VERSION);
+        out.writeInt(commonlySupportedVersion);
         encodeActiveAndStandbyTaskAssignment(out);
         encodePartitionsByHost(out);
         out.writeInt(errCode);
@@ -230,7 +230,7 @@
             final AssignmentInfo assignmentInfo;
 
             final int usedVersion = in.readInt();
-            final int latestSupportedVersion;
+            final int commonlySupportedVersion;
             switch (usedVersion) {
                 case 1:
                     assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
@@ -241,13 +241,13 @@
                     decodeVersionTwoData(assignmentInfo, in);
                     break;
                 case 3:
-                    latestSupportedVersion = in.readInt();
-                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeVersionThreeData(assignmentInfo, in);
                     break;
                 case 4:
-                    latestSupportedVersion = in.readInt();
-                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    commonlySupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeVersionFourData(assignmentInfo, in);
                     break;
                 default:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 03b9e2d..96f3779 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -207,11 +207,10 @@
 
     private ByteBuffer encodeVersionThree() {
         final byte[] endPointBytes = prepareUserEndPoint();
-
         final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
+        buf.putInt(3);
+        buf.putInt(latestSupportedVersion);
 
-        buf.putInt(3); // used version
-        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
         encodeClientUUID(buf);
         encodeTasks(buf, prevTasks);
         encodeTasks(buf, standbyTasks);
@@ -226,7 +225,7 @@
         final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
 
         buf.putInt(4); // used version
-        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+        buf.putInt(latestSupportedVersion); // supported version
         encodeClientUUID(buf);
         encodeTasks(buf, prevTasks);
         encodeTasks(buf, standbyTasks);
@@ -273,7 +272,7 @@
             default:
                 latestSupportedVersion = data.getInt();
                 subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
-                log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
+                log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, latestSupportedVersion);
         }
 
         return subscriptionInfo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 8b99065..3ed2f71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -94,4 +94,13 @@
         final AssignmentInfo expectedInfo = new AssignmentInfo(4, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
+
+    @Test
+    public void shouldEncodeAndDecodeSmallerCommonlySupportedVersion() {
+        final int usedVersion = AssignmentInfo.LATEST_SUPPORTED_VERSION - 1;
+        final int commonlySupportedVersion = AssignmentInfo.LATEST_SUPPORTED_VERSION - 1;
+        final AssignmentInfo info = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 0);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion, activeTasks, standbyTasks, globalAssignment, 0);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 2a75c57..6492cc0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -90,6 +90,16 @@
         assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.latestSupportedVersion());
     }
 
+    @Test
+    public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
+        final int usedVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION - 1;
+        final int latestSupportedVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION - 1;
+
+        final SubscriptionInfo info = new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion, processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
+    }
+
     private ByteBuffer encodeFutureVersion() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
                                                    + 4 /* supported version */);