KAFKA-16624: Don't generate useless PartitionChangeRecord on older MV (#15810)
Fix a case where we could generate useless PartitionChangeRecords on metadata versions older than
3.6-IV0. This could happen in the case where we had an ISR with only one broker in it, and we were
trying to go down to a fully empty ISR. In this case, PartitionChangeBuilder would block the record
to going down to a fully empty ISR (since that is not valid in these pre-KIP-966 metadata
versions), but it would still emit the record, even though it had no effect.
Reviewers: Igor Soarez <soarez@apple.com>
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 7f1b2cb..0d2c1bd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -214,6 +214,10 @@
}
}
+ public List<Integer> targetIsr() {
+ return targetIsr;
+ }
+
// VisibleForTesting
/**
* Perform leader election based on the partition state and leader election type.
@@ -365,44 +369,61 @@
}
/**
- * Trigger a leader epoch bump if one is needed.
+ * Trigger a leader epoch bump if one is needed because of replica reassignment.
*
- * We need to bump the leader epoch if:
- * 1. The leader changed, or
- * 2. The new replica list does not contain all the nodes that the old replica list did.
- *
- * Changes that do NOT fall in any of these categories will increase the partition epoch, but
- * not the leader epoch. Note that if the leader epoch increases, the partition epoch will
- * always increase as well; there is no case where the partition epoch increases more slowly
- * than the leader epoch.
- *
- * If the PartitionChangeRecord sets the leader field to something other than
- * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
- * case 1. In this function, we check for cases 2 and 3, and handle them by manually
- * setting record.leader to the current leader.
- *
- * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
- * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader
- * bump is not required when the ISR shrinks. Note, that the leader epoch is never increased if
- * the ISR expanded.
- *
- * In MV 3.6 and beyond, if the controller is in ZK migration mode, the leader epoch must
- * be bumped during ISR shrink for compatability with ZK brokers.
+ * Note that if the leader epoch increases, the partition epoch will always increase as well; there is no
+ * case where the partition epoch increases more slowly than the leader epoch.
*/
- void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {
- if (record.leader() == NO_LEADER_CHANGE) {
- boolean bumpLeaderEpochOnIsrShrink = metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled;
-
- if (!Replicas.contains(targetReplicas, partition.replicas)) {
- // Reassignment
- record.setLeader(partition.leader);
- } else if (bumpLeaderEpochOnIsrShrink && !Replicas.contains(targetIsr, partition.isr)) {
- // ISR shrink
- record.setLeader(partition.leader);
- }
+ void triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(PartitionChangeRecord record) {
+ if (record.leader() != NO_LEADER_CHANGE) {
+ // The leader is already changing, so there will already be a leader epoch bump.
+ return;
+ }
+ if (!Replicas.contains(targetReplicas, partition.replicas)) {
+ // If the new replica list does not contain all the brokers that the old one did,
+ // ensure that there will be a leader epoch bump by setting the leader field.
+ record.setLeader(partition.leader);
}
}
+ /**
+ * Trigger a leader epoch bump if one is needed because of an ISR shrink.
+ *
+ * Note that it's important to call this function only after we have set the ISR field in
+ * the PartitionChangeRecord.
+ */
+ void triggerLeaderEpochBumpForIsrShrinkIfNeeded(PartitionChangeRecord record) {
+ if (!(metadataVersion.isLeaderEpochBumpRequiredOnIsrShrink() || zkMigrationEnabled)) {
+ // We only need to bump the leader epoch on an ISR shrink in two cases:
+ //
+ // 1. In older metadata versions before 3.6, there was a bug (KAFKA-15021) in the
+ // broker replica manager that required that the leader epoch be bumped whenever
+ // the ISR shrank. (This was never necessary for EXPANSIONS, only SHRINKS.)
+ //
+ // 2. During ZK migration, we bump the leader epoch during all ISR shrinks, in order
+ // to maintain compatibility with migrating brokers that are still in ZK mode.
+ //
+ // If we're not in either case, we can exit here.
+ return;
+ }
+ if (record.leader() != NO_LEADER_CHANGE) {
+ // The leader is already changing, so there will already be a leader epoch bump.
+ return;
+ }
+ if (record.isr() == null) {
+ // The ISR is not changing.
+ return;
+ }
+ if (!Replicas.contains(record.isr(), partition.isr)) {
+ // If the new ISR list does not contain all the brokers that the old one did,
+ // ensure that there will be a leader epoch bump by setting the leader field.
+ record.setLeader(partition.leader);
+ }
+ }
+
+ /**
+ * @return true if the reassignment was completed; false otherwise.
+ */
private void completeReassignmentIfNeeded() {
PartitionReassignmentReplicas reassignmentReplicas =
new PartitionReassignmentReplicas(
@@ -435,7 +456,7 @@
tryElection(record);
- triggerLeaderEpochBumpIfNeeded(record);
+ triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(record);
maybeUpdateRecordElr(record);
@@ -449,6 +470,8 @@
record.setIsr(targetIsr);
}
+ triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
+
maybeUpdateLastKnownLeader(record);
setAssignmentChanges(record);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index db3d1e1..044402d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -137,19 +137,13 @@
0,
r -> r != 3,
metadataVersion,
- 2)
- .setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
+ 2).
+ setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
- private static PartitionChangeBuilder createFooBuilder(short version) {
- return new PartitionChangeBuilder(FOO,
- FOO_ID,
- 0,
- r -> r != 3,
- metadataVersionForPartitionChangeRecordVersion(version),
- 2).
- setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
- setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
+ private static PartitionChangeBuilder createFooBuilder(short partitionChangeRecordVersion) {
+ return createFooBuilder(metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion));
}
private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
@@ -295,102 +289,145 @@
assertElectLeaderEquals(createBazBuilder(version).setElection(Election.UNCLEAN), 3, false);
}
- private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
- PartitionChangeRecord record,
- int expectedLeader) {
- builder.triggerLeaderEpochBumpIfNeeded(record);
+ private static void testTriggerLeaderEpochBumpIfNeeded(
+ PartitionChangeBuilder builder,
+ PartitionChangeRecord record,
+ int expectedLeader
+ ) {
+ builder.triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(record);
+ record.setIsr(builder.targetIsr());
+ builder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
assertEquals(expectedLeader, record.leader());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
- public void testTriggerLeaderEpochBumpIfNeeded(short version) {
- testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version),
- new PartitionChangeRecord(), NO_LEADER_CHANGE);
- // Shrinking the ISR doesn't increase the leader epoch
- testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder(version).setTargetIsrWithBrokerStates(
- AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
- ),
+ public void testNoLeaderEpochBumpIfNothingChanged(short version) {
+ testTriggerLeaderEpochBumpIfNeeded(createFooBuilder(version),
new PartitionChangeRecord(),
- NO_LEADER_CHANGE
- );
- // Expanding the ISR doesn't increase the leader epoch
- testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder(version).setTargetIsrWithBrokerStates(
- AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))
- ),
- new PartitionChangeRecord(),
- NO_LEADER_CHANGE
- );
- // Expanding the ISR during migration doesn't increase leader epoch
- testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder(version)
- .setTargetIsrWithBrokerStates(
- AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4)))
- .setZkMigrationEnabled(true),
- new PartitionChangeRecord(),
- NO_LEADER_CHANGE
- );
- testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
- setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(),
NO_LEADER_CHANGE);
- testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(version).
- setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
- new PartitionChangeRecord().setLeader(2), 2);
-
- // Check that the leader epoch is bump if the ISR shrinks and isSkipLeaderEpochBumpSupported is not supported.
- // See KAFKA-15021 for details.
- testTriggerLeaderEpochBumpIfNeededLeader(
- new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, MetadataVersion.IBP_3_5_IV2, 2)
- .setTargetIsrWithBrokerStates(
- AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
- ),
- new PartitionChangeRecord(),
- 1
- );
}
+ /**
+ * Test that shrinking the ISR doesn't increase the leader epoch in later MVs.
+ */
@ParameterizedTest
- @MethodSource("partitionChangeRecordVersions")
- public void testLeaderEpochBumpZkMigration(short version) {
- // KAFKA-15109: Shrinking the ISR while in ZK migration mode requires a leader epoch bump
- testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder(version)
- .setTargetIsrWithBrokerStates(
- AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
- .setZkMigrationEnabled(true),
+ @ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
+ public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
+ MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+ testTriggerLeaderEpochBumpIfNeeded(
+ createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
+ AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
- 1
- );
+ NO_LEADER_CHANGE);
+ }
- testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder(version)
- .setTargetIsrWithBrokerStates(
- AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
- .setZkMigrationEnabled(false),
+ /**
+ * Test that shrinking the ISR does increase the leader epoch in earlier MVs.
+ * See KAFKA-15021 for details.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"3.4-IV0", "3.5-IV2"})
+ public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
+ MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+ testTriggerLeaderEpochBumpIfNeeded(
+ createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
+ AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
- NO_LEADER_CHANGE
- );
+ 1);
+ }
- // For older MV, always expect the epoch to increase regardless of ZK migration
- testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder(MetadataVersion.IBP_3_5_IV2)
- .setTargetIsrWithBrokerStates(
- AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
- .setZkMigrationEnabled(true),
+ /**
+ * Test that shrinking the ISR does increase the leader epoch in later MVs when ZK migration is on.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
+ public void testLeaderEpochBumpOnIsrShrinkWithZkMigration(String metadataVersionString) {
+ MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+ testTriggerLeaderEpochBumpIfNeeded(
+ createFooBuilder(metadataVersion).
+ setZkMigrationEnabled(true).
+ setTargetIsrWithBrokerStates(
+ AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
- 1
- );
+ 1);
+ }
- testTriggerLeaderEpochBumpIfNeededLeader(
- createFooBuilder(MetadataVersion.IBP_3_5_IV2)
- .setTargetIsrWithBrokerStates(
- AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1)))
- .setZkMigrationEnabled(false),
+ /**
+ * Test that expanding the ISR doesn't increase the leader epoch.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+ public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) {
+ MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+ testTriggerLeaderEpochBumpIfNeeded(
+ createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
+ AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
new PartitionChangeRecord(),
- 1
- );
+ NO_LEADER_CHANGE);
+ }
+
+ /**
+ * Test that expanding the ISR doesn't increase the leader epoch during ZK migration.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+ public void testNoLeaderEpochBumpOnIsrExpansionDuringMigration(String metadataVersionString) {
+ MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+ testTriggerLeaderEpochBumpIfNeeded(
+ createFooBuilder(metadataVersion).
+ setZkMigrationEnabled(true).
+ setTargetIsrWithBrokerStates(
+ AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
+ new PartitionChangeRecord(),
+ NO_LEADER_CHANGE);
+ }
+
+ /**
+ * Test that changing the replica set such that not all the old replicas remain
+ * always results in a leader epoch increase.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+ public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionString) {
+ MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+ testTriggerLeaderEpochBumpIfNeeded(
+ createFooBuilder(metadataVersion).setTargetReplicas(Arrays.asList(2, 1, 4)),
+ new PartitionChangeRecord(),
+ 1);
+ }
+
+ /**
+ * Regression test for KAFKA-16624. Tests that when targetIsr is the empty list, but we
+ * cannot actually change the ISR, triggerLeaderEpochBumpForIsrShrinkIfNeeded does not engage.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
+ public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) {
+ MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
+ PartitionRegistration partition = new PartitionRegistration.Builder().
+ setReplicas(new int[] {2}).
+ setDirectories(new Uuid[]{
+ Uuid.fromString("dpdvA5AZSWySmnPFTnu5Kw")
+ }).
+ setIsr(new int[] {2}).
+ setLeader(2).
+ setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
+ setLeaderEpoch(100).
+ setPartitionEpoch(200).
+ build();
+ PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
+ FOO_ID,
+ 0,
+ r -> true,
+ metadataVersion,
+ 2).
+ setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
+ setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
+ setTargetReplicas(Arrays.asList());
+ PartitionChangeRecord record = new PartitionChangeRecord();
+ builder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
+ assertEquals(NO_LEADER_CHANGE, record.leader());
}
@ParameterizedTest