KAFKA-16624: Don't generate useless PartitionChangeRecord on older MV (#15810)

Fix a case where we could generate useless PartitionChangeRecords on metadata versions older than
3.6-IV0. This could happen in the case where we had an ISR with only one broker in it, and we were
trying to go down to a fully empty ISR. In this case, PartitionChangeBuilder would block the record
to going down to a fully empty ISR (since that is not valid in these pre-KIP-966 metadata
versions), but it would still emit the record, even though it had no effect.

Reviewers: Igor Soarez <soarez@apple.com>
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 7f1b2cb..0d2c1bd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -214,6 +214,10 @@
         }
     }
 
+    public List<Integer> targetIsr() {
+        return targetIsr;
+    }
+
     // VisibleForTesting
     /**
      * Perform leader election based on the partition state and leader election type.
@@ -365,44 +369,61 @@
     }
 
     /**
-     * Trigger a leader epoch bump if one is needed.
+     * Trigger a leader epoch bump if one is needed because of replica reassignment.
      *
-     * We need to bump the leader epoch if:
-     * 1. The leader changed, or
-     * 2. The new replica list does not contain all the nodes that the old replica list did.
-     *
-     * Changes that do NOT fall in any of these categories will increase the partition epoch, but
-     * not the leader epoch. Note that if the leader epoch increases, the partition epoch will
-     * always increase as well; there is no case where the partition epoch increases more slowly
-     * than the leader epoch.
-     *
-     * If the PartitionChangeRecord sets the leader field to something other than
-     * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
-     * case 1. In this function, we check for cases 2 and 3, and handle them by manually
-     * setting record.leader to the current leader.
-     *
-     * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
-     * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader
-     * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if
-     * the ISR expanded.
-     *
-     * In MV 3.6 and beyond, if the controller is in ZK migration mode, the leader epoch must
-     * be bumped during ISR shrink for compatability with ZK brokers.
+     * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no
+     * case where the partition epoch increases more slowly than the leader epoch.
      */
-    void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
-        if (record.leader() == NO_LEADER_CHANGE) {
-            boolean bumpLeaderEpochOnIsrShrink = metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;
-
-            if (!Replicas.contains(targetReplicas, partition.replicas)) {
-                // Reassignment
-                record.setLeader(partition.leader);
-            } else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) {
-                // ISR shrink
-                record.setLeader(partition.leader);
-            }
+    void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) {
+        if (record.leader() != NO_LEADER_CHANGE) {
+            // The leader is already changing, so there will already be a leader epoch bump.
+            return;
+        }
+        if (!Replicas.contains(targetReplicas, partition.replicas)) {
+            // If the new replica list does not contain all the brokers that the old one did,
+            // ensure that there will be a leader epoch bump by setting the leader field.
+            record.setLeader(partition.leader);
         }
     }
 
+    /**
+     * Trigger a leader epoch bump if one is needed because of an ISR shrink.
+     *
+     * Note that it's important to call this function only after we have set the ISR field in
+     * the PartitionChangeRecord.
+     */
+    void triggerLeaderEpochBumpForIsrShrinkIfNeeded(PartitionChangeRecord record) {
+        if (!(metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled)) {
+            // We only need to bump the leader epoch on an ISR shrink in two cases:
+            //
+            // 1. In older metadata versions before 3.6, there was a bug (KAFKA-15021) in the
+            //    broker replica manager that required that the leader epoch be bumped whenever
+            //    the ISR shrank. (This was never necessary for EXPANSIONS, only SHRINKS.)
+            //
+            // 2. During ZK migration, we bump the leader epoch during all ISR shrinks, in order
+            // to maintain compatibility with migrating brokers that are still in ZK mode.
+            //
+            // If we're not in either case, we can exit here.
+            return;
+        }
+        if (record.leader() != NO_LEADER_CHANGE) {
+            // The leader is already changing, so there will already be a leader epoch bump.
+            return;
+        }
+        if (record.isr() == null) {
+            // The ISR is not changing.
+            return;
+        }
+        if (!Replicas.contains(record.isr(), partition.isr)) {
+            // If the new ISR list does not contain all the brokers that the old one did,
+            // ensure that there will be a leader epoch bump by setting the leader field.
+            record.setLeader(partition.leader);
+        }
+    }
+
+    /**
+     * @return true if the reassignment was completed; false otherwise.
+     */
     private void completeReassignmentIfNeeded() {
         PartitionReassignmentReplicas reassignmentReplicas =
             new PartitionReassignmentReplicas(
@@ -435,7 +456,7 @@
 
         tryElection(record);
 
-        triggerLeaderEpochBumpIfNeeded(record);
+        triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(record);
 
         maybeUpdateRecordElr(record);
 
@@ -449,6 +470,8 @@
             record.setIsr(targetIsr);
         }
 
+        triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
+
         maybeUpdateLastKnownLeader(record);
 
         setAssignmentChanges(record);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index db3d1e1..044402d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -137,19 +137,13 @@
                 0,
                 r -> r != 3,
                 metadataVersion,
-                2)
-                .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
+                2).
+                setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
+                setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
     }
 
-    private static PartitionChangeBuilder createFooBuilder(short version) {
-        return new PartitionChangeBuilder(FOO,
-                FOO_ID,
-                0,
-                r -> r != 3,
-                metadataVersionForPartitionChangeRecordVersion(version), 
-                2).
-                setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
-                setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
+    private static PartitionChangeBuilder createFooBuilder(short partitionChangeRecordVersion) {
+        return createFooBuilder(metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion));
     }
 
     private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
@@ -295,102 +289,145 @@
         assertElectLeaderEquals(createBazBuilder(version).setElection(Election.UNCLEAN), 3, false);
     }
 
-    private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
-                                                                 PartitionChangeRecord record,
-                                                                 int expectedLeader) {
-        builder.triggerLeaderEpochBumpIfNeeded(record);
+    private static void testTriggerLeaderEpochBumpIfNeeded(
+        PartitionChangeBuilder builder,
+        PartitionChangeRecord record,
+        int expectedLeader
+    ) {
+        builder.triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(record);
+        record.setIsr(builder.targetIsr());
+        builder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
         assertEquals(expectedLeader, record.leader());
     }
 
     @ParameterizedTest
     @MethodSource("partitionChangeRecordVersions")
-    public void testTriggerLeaderEpochBumpIfNeeded(short version) {
-        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version),
-            new PartitionChangeRecord(), NO_LEADER_CHANGE);
-        // Shrinking the ISR doesn't increase the leader epoch
-        testTriggerLeaderEpochBumpIfNeededLeader(
-            createFooBuilder(version).setTargetIsrWithBrokerStates(
-                AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
-            ),
+    public void testNoLeaderEpochBumpIfNothingChanged(short version) {
+        testTriggerLeaderEpochBumpIfNeeded(createFooBuilder(version),
             new PartitionChangeRecord(),
-            NO_LEADER_CHANGE
-        );
-        // Expanding the ISR doesn't increase the leader epoch
-        testTriggerLeaderEpochBumpIfNeededLeader(
-            createFooBuilder(version).setTargetIsrWithBrokerStates(
-                AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))
-            ),
-            new PartitionChangeRecord(),
-            NO_LEADER_CHANGE
-        );
-        // Expanding the ISR during migration doesn't increase leader epoch
-        testTriggerLeaderEpochBumpIfNeededLeader(
-            createFooBuilder(version)
-                .setTargetIsrWithBrokerStates(
-                    AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4)))
-                .setZkMigrationEnabled(true),
-            new PartitionChangeRecord(),
-            NO_LEADER_CHANGE
-        );
-        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
-            setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
             NO_LEADER_CHANGE);
-        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
-            setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
-            new PartitionChangeRecord().setLeader(2), 2);
-
-        // Check that the leader epoch is bump if the ISR shrinks and isSkipLeaderEpochBumpSupported is not supported.
-        // See KAFKA-15021 for details.
-        testTriggerLeaderEpochBumpIfNeededLeader(
-            new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.IBP_3_5_IV2, 2)
-                .setTargetIsrWithBrokerStates(
-                    AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
-                ),
-            new PartitionChangeRecord(),
-            1
-        );
     }
 
+    /**
+     * Test that shrinking the ISR doesn't increase the leader epoch in later MVs.
+     */
     @ParameterizedTest
-    @MethodSource("partitionChangeRecordVersions")
-    public void testLeaderEpochBumpZkMigration(short version) {
-        // KAFKA-15109: Shrinking the ISR while in ZK migration mode requires a leader epoch bump
-        testTriggerLeaderEpochBumpIfNeededLeader(
-            createFooBuilder(version)
-                .setTargetIsrWithBrokerStates(
-                    AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
-                .setZkMigrationEnabled(true),
+    @ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
+    public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
+        MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+        testTriggerLeaderEpochBumpIfNeeded(
+            createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
+                AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
             new PartitionChangeRecord(),
-            1
-        );
+            NO_LEADER_CHANGE);
+    }
 
-        testTriggerLeaderEpochBumpIfNeededLeader(
-            createFooBuilder(version)
-                .setTargetIsrWithBrokerStates(
-                    AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
-                .setZkMigrationEnabled(false),
+    /**
+     * Test that shrinking the ISR does increase the leader epoch in earlier MVs.
+     * See KAFKA-15021 for details.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2"})
+    public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
+        MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+        testTriggerLeaderEpochBumpIfNeeded(
+            createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
+                AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
             new PartitionChangeRecord(),
-            NO_LEADER_CHANGE
-        );
+            1);
+    }
 
-        // For older MV, always expect the epoch to increase regardless of ZK migration
-        testTriggerLeaderEpochBumpIfNeededLeader(
-            createFooBuilder(MetadataVersion.IBP_3_5_IV2)
-                .setTargetIsrWithBrokerStates(
-                    AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
-                .setZkMigrationEnabled(true),
+    /**
+     * Test that shrinking the ISR does increase the leader epoch in later MVs when ZK migration is on.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
+    public void testLeaderEpochBumpOnIsrShrinkWithZkMigration(String metadataVersionString) {
+        MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+        testTriggerLeaderEpochBumpIfNeeded(
+            createFooBuilder(metadataVersion).
+                setZkMigrationEnabled(true).
+                setTargetIsrWithBrokerStates(
+                    AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
             new PartitionChangeRecord(),
-            1
-        );
+            1);
+    }
 
-        testTriggerLeaderEpochBumpIfNeededLeader(
-            createFooBuilder(MetadataVersion.IBP_3_5_IV2)
-                .setTargetIsrWithBrokerStates(
-                    AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
-                .setZkMigrationEnabled(false),
+    /**
+     * Test that expanding the ISR doesn't increase the leader epoch.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+    public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) {
+        MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+        testTriggerLeaderEpochBumpIfNeeded(
+            createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
+                AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
             new PartitionChangeRecord(),
-            1
-        );
+            NO_LEADER_CHANGE);
+    }
+
+    /**
+     * Test that expanding the ISR doesn't increase the leader epoch during ZK migration.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+    public void testNoLeaderEpochBumpOnIsrExpansionDuringMigration(String metadataVersionString) {
+        MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+        testTriggerLeaderEpochBumpIfNeeded(
+            createFooBuilder(metadataVersion).
+                setZkMigrationEnabled(true).
+                setTargetIsrWithBrokerStates(
+                    AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
+            new PartitionChangeRecord(),
+            NO_LEADER_CHANGE);
+    }
+
+    /**
+     * Test that changing the replica set such that not all the old replicas remain
+     * always results in a leader epoch increase.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+    public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionString) {
+        MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+        testTriggerLeaderEpochBumpIfNeeded(
+            createFooBuilder(metadataVersion).setTargetReplicas(Arrays.asList(2, 1, 4)),
+            new PartitionChangeRecord(),
+            1);
+    }
+
+    /**
+     * Regression test for KAFKA-16624. Tests that when targetIsr is the empty list, but we
+     * cannot actually change the ISR, triggerLeaderEpochBumpForIsrShrinkIfNeeded does not engage.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+    public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) {
+        MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+        PartitionRegistration partition = new PartitionRegistration.Builder().
+            setReplicas(new int[] {2}).
+            setDirectories(new Uuid[]{
+                Uuid.fromString("dpdvA5AZSWySmnPFTnu5Kw")
+            }).
+            setIsr(new int[] {2}).
+            setLeader(2).
+            setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
+            setLeaderEpoch(100).
+            setPartitionEpoch(200).
+            build();
+        PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
+            FOO_ID,
+            0,
+            r -> true,
+            metadataVersion,
+            2).
+            setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
+            setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
+            setTargetReplicas(Arrays.asList());
+        PartitionChangeRecord record = new PartitionChangeRecord();
+        builder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
+        assertEquals(NO_LEADER_CHANGE, record.leader());
     }
 
     @ParameterizedTest