blob: 044402d50872e24f466376fd788426f16ae23d38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.protocol.types.TaggedFields;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.controller.PartitionChangeBuilder.ElectionResult;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.DefaultDirProvider;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.controller.PartitionChangeBuilder.Election;
import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
import static org.apache.kafka.metadata.placement.PartitionAssignmentTest.partitionAssignment;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@Timeout(value = 40)
public class PartitionChangeBuilderTest {
private static final DefaultDirProvider DEFAULT_DIR_PROVIDER = brokerId -> DirectoryId.UNASSIGNED;
private static Stream<Arguments> partitionChangeRecordVersions() {
return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version));
}
@Test
public void testChangeRecordIsNoOp() {
/* If the next few checks fail please update them based on the latest schema and make sure
* to update changeRecordIsNoOp to take into account the new schema or tagged fields.
*/
// Check that the supported versions haven't changed
assertEquals(2, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION);
assertEquals(0, PartitionChangeRecord.LOWEST_SUPPORTED_VERSION);
// For the latest version check that the number of tagged fields hasn't changed
TaggedFields taggedFields = (TaggedFields) PartitionChangeRecord.SCHEMA_0.get(2).def.type;
assertEquals(6, taggedFields.numFields());
assertTrue(changeRecordIsNoOp(new PartitionChangeRecord()));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().setLeader(1)));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setIsr(Arrays.asList(1, 2, 3))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setRemovingReplicas(Arrays.asList(1))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setAddingReplicas(Arrays.asList(4))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setEligibleLeaderReplicas(Arrays.asList(5))));
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
setLastKnownElr(Arrays.asList(6))));
assertFalse(
changeRecordIsNoOp(
new PartitionChangeRecord()
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
)
);
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().setDirectories(Arrays.asList(
Uuid.fromString("5JwD0VNXRV2Wr9CCON38Tw"),
Uuid.fromString("zpL1bRzTQXmmgdxlLHOWuw"),
Uuid.fromString("6iGUpAkHQXC6bY0FTcPRDw")
))));
}
private static final PartitionRegistration FOO = new PartitionRegistration.Builder().
setReplicas(new int[] {2, 1, 3}).
setDirectories(new Uuid[]{
Uuid.fromString("dpdvA5AZSWySmnPFTnu5Kw"),
Uuid.fromString("V60B3cglScq3Xk8BX1NxAQ"),
DirectoryId.UNASSIGNED,
}).
setIsr(new int[] {2, 1, 3}).
setLeader(1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
private static MetadataVersion metadataVersionForPartitionChangeRecordVersion(short version) {
switch (version) {
case (short) 0:
return MetadataVersion.IBP_3_7_IV0;
case (short) 1:
return MetadataVersion.IBP_3_7_IV2;
case (short) 2:
return MetadataVersion.IBP_3_8_IV0;
default:
throw new RuntimeException("Unknown PartitionChangeRecord version " + version);
}
}
private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataVersion) {
return new PartitionChangeBuilder(FOO,
FOO_ID,
0,
r -> r != 3,
metadataVersion,
2).
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
private static PartitionChangeBuilder createFooBuilder(short partitionChangeRecordVersion) {
return createFooBuilder(metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion));
}
private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
setReplicas(new int[] {1, 2, 3, 4}).
setDirectories(new Uuid[] {
DirectoryId.UNASSIGNED,
Uuid.fromString("X5FnAcIgTheWgTMzeO5WHw"),
Uuid.fromString("GtrcdoSOTm2vFMGFeZq0eg"),
Uuid.fromString("YcOqPw5ARmeKr1y9W3AkFw"),
}).
setIsr(new int[] {1, 2, 3}).
setRemovingReplicas(new int[] {1}).
setAddingReplicas(new int[] {4}).
setLeader(1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
private static boolean isElrEnabled(short partitionChangeRecordVersion) {
return partitionChangeRecordVersion >= 2;
}
private static PartitionChangeBuilder createBarBuilder(short version) {
return new PartitionChangeBuilder(BAR,
BAR_ID,
0,
r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version),
2).
setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
private static final PartitionRegistration BAZ = new PartitionRegistration.Builder().
setReplicas(new int[] {2, 1, 3}).
setDirectories(new Uuid[] {
Uuid.fromString("ywnfFpTBTbOsFdZ6uAdOmw"),
Uuid.fromString("Th0x70ecRbWvZNNV33jyRA"),
Uuid.fromString("j216tuSoQsC9JFd1Z5ZP6w"),
}).
setIsr(new int[] {1, 3}).
setLeader(3).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
private final static Uuid BAZ_ID = Uuid.fromString("wQzt5gkSTwuQNXZF5gIw7A");
private static PartitionChangeBuilder createBazBuilder(short version) {
return new PartitionChangeBuilder(BAZ,
BAZ_ID,
0,
__ -> true,
metadataVersionForPartitionChangeRecordVersion(version),
2).
setEligibleLeaderReplicasEnabled(isElrEnabled(version)).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
private static final PartitionRegistration OFFLINE_WITHOUT_ELR = new PartitionRegistration.Builder().
setReplicas(new int[] {2, 1, 3}).
setDirectories(new Uuid[]{
Uuid.fromString("iYGgiDV5Sb2EtH6hbgYnCA"),
Uuid.fromString("XI2t4qAUSkGlLZSKeEVf8g"),
Uuid.fromString("eqRW24kIRlitzQFzmovE0Q")
}).
setIsr(new int[] {3}).
setLeader(-1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
private static final PartitionRegistration OFFLINE_WITH_ELR = new PartitionRegistration.Builder().
setReplicas(new int[] {2, 1, 3}).
setDirectories(new Uuid[]{
Uuid.fromString("CQEqt7trRrmqyNxUT1CY0g"),
Uuid.fromString("59Mb9smoSsC0bGUP2FYV8A"),
Uuid.fromString("LBTmsCVJREqJuIEtwqxRDg")
}).
setElr(new int[] {3}).
setIsr(new int[] {}).
setLastKnownElr(new int[] {2}).
setLeader(-1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
private final static Uuid OFFLINE_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
private static PartitionChangeBuilder createOfflineBuilder(short partitionChangeRecordVersion) {
MetadataVersion metadataVersion =
metadataVersionForPartitionChangeRecordVersion(partitionChangeRecordVersion);
if (metadataVersion.isElrSupported()) {
return new PartitionChangeBuilder(OFFLINE_WITH_ELR, OFFLINE_ID, 0, r -> r == 1,
metadataVersion, 2).
setEligibleLeaderReplicasEnabled(true).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
} else {
return new PartitionChangeBuilder(OFFLINE_WITHOUT_ELR, OFFLINE_ID, 0, r -> r == 1,
metadataVersion, 2).
setEligibleLeaderReplicasEnabled(false).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
}
}
private static void assertElectLeaderEquals(PartitionChangeBuilder builder,
int expectedNode,
boolean expectedUnclean) {
ElectionResult electionResult = builder.electLeader();
assertEquals(expectedNode, electionResult.node);
assertEquals(expectedUnclean, electionResult.unclean);
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testElectLeader(short version) {
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.PREFERRED), 2, false);
assertElectLeaderEquals(createFooBuilder(version), 1, false);
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN), 1, false);
assertElectLeaderEquals(createFooBuilder(version)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 3))), 1, false);
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 3))), 1, false);
assertElectLeaderEquals(createFooBuilder(version)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))), NO_LEADER, false);
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN).
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))), 2, true);
assertElectLeaderEquals(
createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(4))).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
4,
false
);
assertElectLeaderEquals(createBazBuilder(version).setElection(Election.PREFERRED), 3, false);
assertElectLeaderEquals(createBazBuilder(version), 3, false);
assertElectLeaderEquals(createBazBuilder(version).setElection(Election.UNCLEAN), 3, false);
}
private static void testTriggerLeaderEpochBumpIfNeeded(
PartitionChangeBuilder builder,
PartitionChangeRecord record,
int expectedLeader
) {
builder.triggerLeaderEpochBumpForReplicaReassignmentIfNeeded(record);
record.setIsr(builder.targetIsr());
builder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
assertEquals(expectedLeader, record.leader());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testNoLeaderEpochBumpIfNothingChanged(short version) {
testTriggerLeaderEpochBumpIfNeeded(createFooBuilder(version),
new PartitionChangeRecord(),
NO_LEADER_CHANGE);
}
/**
* Test that shrinking the ISR doesn't increase the leader epoch in later MVs.
*/
@ParameterizedTest
@ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
public void testNoLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
NO_LEADER_CHANGE);
}
/**
* Test that shrinking the ISR does increase the leader epoch in earlier MVs.
* See KAFKA-15021 for details.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2"})
public void testLeaderEpochBumpOnIsrShrink(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
1);
}
/**
* Test that shrinking the ISR does increase the leader epoch in later MVs when ZK migration is on.
*/
@ParameterizedTest
@ValueSource(strings = {"3.6-IV0", "3.7-IV4"})
public void testLeaderEpochBumpOnIsrShrinkWithZkMigration(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).
setZkMigrationEnabled(true).
setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))),
new PartitionChangeRecord(),
1);
}
/**
* Test that expanding the ISR doesn't increase the leader epoch.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
public void testNoLeaderEpochBumpOnIsrExpansion(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
new PartitionChangeRecord(),
NO_LEADER_CHANGE);
}
/**
* Test that expanding the ISR doesn't increase the leader epoch during ZK migration.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
public void testNoLeaderEpochBumpOnIsrExpansionDuringMigration(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).
setZkMigrationEnabled(true).
setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1, 3, 4))),
new PartitionChangeRecord(),
NO_LEADER_CHANGE);
}
/**
* Test that changing the replica set such that not all the old replicas remain
* always results in a leader epoch increase.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
public void testLeaderEpochBumpOnNewReplicaSetDisjoint(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
testTriggerLeaderEpochBumpIfNeeded(
createFooBuilder(metadataVersion).setTargetReplicas(Arrays.asList(2, 1, 4)),
new PartitionChangeRecord(),
1);
}
/**
* Regression test for KAFKA-16624. Tests that when targetIsr is the empty list, but we
* cannot actually change the ISR, triggerLeaderEpochBumpForIsrShrinkIfNeeded does not engage.
*/
@ParameterizedTest
@ValueSource(strings = {"3.4-IV0", "3.5-IV2", "3.6-IV0", "3.7-IV4"})
public void testNoLeaderEpochBumpOnEmptyTargetIsr(String metadataVersionString) {
MetadataVersion metadataVersion = MetadataVersion.fromVersionString(metadataVersionString);
PartitionRegistration partition = new PartitionRegistration.Builder().
setReplicas(new int[] {2}).
setDirectories(new Uuid[]{
Uuid.fromString("dpdvA5AZSWySmnPFTnu5Kw")
}).
setIsr(new int[] {2}).
setLeader(2).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
FOO_ID,
0,
r -> true,
metadataVersion,
2).
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
setTargetReplicas(Arrays.asList());
PartitionChangeRecord record = new PartitionChangeRecord();
builder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
assertEquals(NO_LEADER_CHANGE, record.leader());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testNoChange(short version) {
assertEquals(Optional.empty(), createFooBuilder(version).build());
assertEquals(Optional.empty(), createFooBuilder(version).setElection(Election.UNCLEAN).build());
assertEquals(Optional.empty(), createBarBuilder(version).build());
assertEquals(Optional.empty(), createBarBuilder(version).setElection(Election.UNCLEAN).build());
assertEquals(Optional.empty(), createBazBuilder(version).setElection(Election.PREFERRED).build());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testIsrChangeDoesntBumpLeaderEpoch(short version) {
// Changing the ISR should not cause the leader epoch to increase
assertEquals(
// Expected
Optional.of(
new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(FOO_ID)
.setPartitionId(0)
.setIsr(Arrays.asList(2, 1)),
version
)
),
// Actual
createFooBuilder(version)
.setTargetIsrWithBrokerStates(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 1))
)
.build()
);
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testIsrChangeAndLeaderChange(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setIsr(Arrays.asList(2, 3)).
setLeader(2), version)),
createFooBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.
newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2, 3))).build());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testReassignmentRearrangesReplicas(short version) {
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(3, 2, 1));
if (version >= 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(3), dirs.get(2), dirs.get(1)));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createFooBuilder(version).setTargetReplicas(Arrays.asList(3, 2, 1)).build());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testIsrEnlargementCompletesReassignment(short version) {
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(BAR_ID).
setPartitionId(0).
setReplicas(Arrays.asList(2, 3, 4)).
setIsr(Arrays.asList(2, 3, 4)).
setLeader(2).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList());
if (version >= 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(BAR.replicas, BAR.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(2), dirs.get(3), dirs.get(4)));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createBarBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.
newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3, 4))).build());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testRevertReassignment(short version) {
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR);
assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
assertEquals(Arrays.asList(1, 2, 3), revert.isr());
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(BAR_ID).
setPartitionId(0).
setReplicas(Arrays.asList(1, 2, 3)).
setLeader(1).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList());
if (version >= 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(BAR.replicas, BAR.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2), dirs.get(3)));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createBarBuilder(version).
setTargetReplicas(revert.replicas()).
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(revert.isr())).
setTargetRemoving(Collections.emptyList()).
setTargetAdding(Collections.emptyList()).
build());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testRemovingReplicaReassignment(short version) {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
partitionAssignment(Replicas.toList(FOO.replicas)), partitionAssignment(Arrays.asList(1, 2)));
assertEquals(Collections.singletonList(3), replicas.removing());
assertEquals(Collections.emptyList(), replicas.adding());
assertEquals(Arrays.asList(1, 2, 3), replicas.replicas());
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(1, 2)).
setIsr(Arrays.asList(2, 1)).
setLeader(1);
if (version >= 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2)));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createFooBuilder(version).
setTargetReplicas(replicas.replicas()).
setTargetRemoving(replicas.removing()).
build());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testAddingReplicaReassignment(short version) {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
partitionAssignment(Replicas.toList(FOO.replicas)), partitionAssignment(Arrays.asList(1, 2, 3, 4)));
assertEquals(Collections.emptyList(), replicas.removing());
assertEquals(Collections.singletonList(4), replicas.adding());
assertEquals(Arrays.asList(1, 2, 3, 4), replicas.replicas());
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(1, 2, 3, 4)).
setAddingReplicas(Collections.singletonList(4));
if (version >= 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2), dirs.get(3), DirectoryId.UNASSIGNED));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createFooBuilder(version).
setTargetReplicas(replicas.replicas()).
setTargetAdding(replicas.adding()).
build());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testUncleanLeaderElection(short version) {
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(FOO_ID)
.setPartitionId(0)
.setIsr(Arrays.asList(2))
.setLeader(2)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()),
version
);
assertEquals(
Optional.of(expectedRecord),
createFooBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))).build()
);
PartitionChangeRecord record = new PartitionChangeRecord()
.setTopicId(OFFLINE_ID)
.setPartitionId(0)
.setIsr(Arrays.asList(1))
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value());
if (version >= 2) {
// The test partition has ELR, so unclean election will clear these fiedls.
record.setEligibleLeaderReplicas(Collections.emptyList())
.setLastKnownElr(Collections.emptyList());
}
expectedRecord = new ApiMessageAndVersion(record, version);
assertEquals(
Optional.of(expectedRecord),
createOfflineBuilder(version).setElection(Election.UNCLEAN).build()
);
assertEquals(
Optional.of(expectedRecord),
createOfflineBuilder(version).setElection(Election.UNCLEAN)
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2))).build()
);
}
private static Stream<Arguments> leaderRecoveryAndZkMigrationParams() {
return Stream.of(
arguments(true, true),
arguments(true, false),
arguments(false, true),
arguments(false, false)
);
}
@ParameterizedTest
@MethodSource("leaderRecoveryAndZkMigrationParams")
public void testChangeInLeadershipDoesNotChangeRecoveryState(boolean isLeaderRecoverySupported, boolean zkMigrationsEnabled) {
final byte noChange = (byte) -1;
int leaderId = 1;
LeaderRecoveryState recoveryState = LeaderRecoveryState.RECOVERING;
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {leaderId, leaderId + 1, leaderId + 2}).
setDirectories(new Uuid[] {
Uuid.fromString("1sF6XXLkSN2LtDums7CJ8Q"),
Uuid.fromString("iaBBVsoHQR6NDKXwliKMqw"),
Uuid.fromString("sHaBwjdrR2S3bL4E1RKC8Q")
}).
setIsr(new int[] {leaderId}).
setLeader(leaderId).
setLeaderRecoveryState(recoveryState).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
MetadataVersion metadataVersion = leaderRecoveryMetadataVersion(isLeaderRecoverySupported);
// Change the partition so that there is no leader
PartitionChangeBuilder offlineBuilder = new PartitionChangeBuilder(
registration,
FOO_ID,
0,
brokerId -> false,
metadataVersion,
2
);
offlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// Set the target ISR to empty to indicate that the last leader is offline
offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
// The partition should stay as recovering
PartitionChangeRecord changeRecord = (PartitionChangeRecord) offlineBuilder
.build()
.get()
.message();
assertEquals(noChange, changeRecord.leaderRecoveryState());
assertEquals(NO_LEADER, changeRecord.leader());
registration = registration.merge(changeRecord);
assertEquals(NO_LEADER, registration.leader);
assertEquals(leaderId, registration.isr[0]);
assertEquals(recoveryState, registration.leaderRecoveryState);
// Bring the leader back online
PartitionChangeBuilder onlineBuilder = new PartitionChangeBuilder(
registration,
FOO_ID,
0,
brokerId -> true,
metadataVersion,
2
);
onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// The only broker in the ISR is elected leader and stays in the recovering
changeRecord = (PartitionChangeRecord) onlineBuilder.build().get().message();
assertEquals(noChange, changeRecord.leaderRecoveryState());
registration = registration.merge(changeRecord);
assertEquals(leaderId, registration.leader);
assertEquals(leaderId, registration.isr[0]);
assertEquals(recoveryState, registration.leaderRecoveryState);
}
@ParameterizedTest
@MethodSource("leaderRecoveryAndZkMigrationParams")
void testUncleanSetsLeaderRecoveringState(boolean isLeaderRecoverySupported, boolean zkMigrationsEnabled) {
final byte noChange = (byte) -1;
int leaderId = 1;
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {leaderId, leaderId + 1, leaderId + 2}).
setDirectories(new Uuid[] {
Uuid.fromString("uYpxts0pS4K4bk5XOoXB4g"),
Uuid.fromString("kS6fHEqwRYucduWkmvsevw"),
Uuid.fromString("De9RqRThQRGjKg3i3yzUxA")
}).
setIsr(new int[] {leaderId + 1, leaderId + 2}).
setLeader(NO_LEADER).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
MetadataVersion metadataVersion = leaderRecoveryMetadataVersion(isLeaderRecoverySupported);
// Change the partition using unclean leader election
PartitionChangeBuilder onlineBuilder = new PartitionChangeBuilder(
registration,
FOO_ID,
0,
brokerId -> brokerId == leaderId,
metadataVersion,
2
).setElection(Election.UNCLEAN);
onlineBuilder.setZkMigrationEnabled(zkMigrationsEnabled);
// The partition should stay as recovering
PartitionChangeRecord changeRecord = (PartitionChangeRecord) onlineBuilder
.build()
.get()
.message();
byte expectedRecoveryChange = noChange;
if (isLeaderRecoverySupported) {
expectedRecoveryChange = LeaderRecoveryState.RECOVERING.value();
}
assertEquals(expectedRecoveryChange, changeRecord.leaderRecoveryState());
assertEquals(leaderId, changeRecord.leader());
assertEquals(1, changeRecord.isr().size());
assertEquals(leaderId, changeRecord.isr().get(0));
registration = registration.merge(changeRecord);
LeaderRecoveryState expectedRecovery = LeaderRecoveryState.RECOVERED;
if (isLeaderRecoverySupported) {
expectedRecovery = LeaderRecoveryState.RECOVERING;
}
assertEquals(leaderId, registration.leader);
assertEquals(leaderId, registration.isr[0]);
assertEquals(expectedRecovery, registration.leaderRecoveryState);
}
@Test
public void testStoppedLeaderIsDemotedAfterReassignmentCompletesEvenIfNoNewEligibleLeaders() {
// Set up PartitionRegistration as if there's an ongoing reassignment from [0, 1] to [2, 3]
int[] replicas = new int[] {2, 3, 0, 1};
Uuid[] directories = {
Uuid.fromString("XCBQClkBSZyphD87QUXzDA"),
Uuid.fromString("Or2Rp9tTQOSVuy12hsfmTA"),
Uuid.fromString("pThsodMNSwGvljTfc1RNVQ"),
Uuid.fromString("d8CGoNJmS5mJdF20tc8P7g")
};
// The ISR starts off with the old replicas
int[] isr = new int[] {0, 1};
// We're removing [0, 1]
int[] removingReplicas = new int[] {0, 1};
// And adding [2, 3]
int[] addingReplicas = new int[] {2, 3};
// The leader is 0, one of the replicas we're removing
int leader = 0;
LeaderRecoveryState leaderRecoveryState = LeaderRecoveryState.RECOVERED;
int leaderEpoch = 0;
int partitionEpoch = 0;
PartitionRegistration part = new PartitionRegistration.Builder().
setReplicas(replicas).
setDirectories(directories).
setIsr(isr).
setRemovingReplicas(removingReplicas).
setAddingReplicas(addingReplicas).
setLeader(leader).
setLeaderRecoveryState(leaderRecoveryState).
setLeaderEpoch(leaderEpoch).
setPartitionEpoch(partitionEpoch).
build();
Uuid topicId = Uuid.randomUuid();
// Always return false for valid leader. This is so none of the new replicas are valid leaders. This is so we
// test what happens when the previous leader is a replica being "stopped" ie removed from the replicas list
// and none of the adding replicas can be a leader. We want to make sure we do not leave the previous replica
// being stopped as leader.
IntPredicate isValidLeader = l -> false;
PartitionChangeBuilder partitionChangeBuilder = new PartitionChangeBuilder(
part,
topicId,
0,
isValidLeader,
leaderRecoveryMetadataVersion(false),
2
);
// Before we build the new PartitionChangeBuilder, confirm the current leader is 0.
assertEquals(0, part.leader);
// The important part is that the new leader is NO_LEADER.
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
setTopicId(topicId).
setPartitionId(0).
setReplicas(Arrays.asList(2, 3)).
setIsr(Arrays.asList(2, 3)).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(NO_LEADER),
(short) 0)),
partitionChangeBuilder.setTargetIsr(Arrays.asList(0, 1, 2, 3)).
build());
}
private MetadataVersion leaderRecoveryMetadataVersion(boolean isSupported) {
if (isSupported) {
return MetadataVersion.IBP_3_2_IV0;
} else {
return MetadataVersion.IBP_3_1_IV0;
}
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[] {
Uuid.fromString("NeQeLdHhSXi4tQGaFcszKA"),
Uuid.fromString("LsVrQZ73RSSuEWA8hhqQhg"),
Uuid.fromString("0IaY4zXKRR6sROgE8yHfnw"),
Uuid.fromString("1WxphfLCSZqMHKK4JMppuw")
})
.setIsr(new int[] {1, 2, 3, 4})
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
// Update ISR to {1, 2}
builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2)));
PartitionChangeRecord record = new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(Arrays.asList(1, 2))
.setLeader(-2)
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
if (version >= 2) {
record.setEligibleLeaderReplicas(Arrays.asList(3, 4));
}
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
if (version >= 2) {
assertArrayEquals(new int[]{3, 4}, partition.elr, partition.toString());
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
} else {
assertEquals(0, partition.elr.length);
assertEquals(0, partition.lastKnownElr.length);
}
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[]{
Uuid.fromString("CWgRKBKkToGn1HKzNb2qqQ"),
Uuid.fromString("SCnk7zfSQSmlKqvV702d3A"),
Uuid.fromString("9tO0QHlJRhimjKfH8m9d8A"),
Uuid.fromString("JaaqVOxNT2OGVNCCIFA2JQ")
})
.setIsr(new int[] {1, 2})
.setElr(new int[] {3})
.setLastKnownElr(new int[] {4})
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3)));
PartitionChangeRecord record = new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(Arrays.asList(1, 2, 3))
.setLeader(-2)
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
// Both versions will set the elr and lastKnownElr as empty list.
record.setEligibleLeaderReplicas(Collections.emptyList())
.setLastKnownElr(Collections.emptyList());
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
assertEquals(0, partition.elr.length);
assertEquals(0, partition.lastKnownElr.length);
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_IsrAddNewMemberNotInELR(short version) {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[]{
Uuid.fromString("gPcIwlldQXikdUB3F4GB6w"),
Uuid.fromString("gFs7V8mKR66z8e5qwtjIMA"),
Uuid.fromString("zKHU2fwrRkuypqTgITl46g"),
Uuid.fromString("zEgmBBh8QJGqbBIvzvH7JA")
})
.setIsr(new int[] {1})
.setElr(new int[] {3})
.setLastKnownElr(new int[] {2})
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
builder.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 4)));
PartitionChangeRecord record = new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(Arrays.asList(1, 4))
.setLeader(-2)
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
if (version < 2) {
record.setEligibleLeaderReplicas(Collections.emptyList());
record.setLastKnownElr(Collections.emptyList());
}
// No change is expected to ELR/LastKnownElr.
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
if (version >= 2) {
assertArrayEquals(new int[]{3}, partition.elr, partition.toString());
assertArrayEquals(new int[]{2}, partition.lastKnownElr, partition.toString());
} else {
assertArrayEquals(new int[]{}, partition.elr, partition.toString());
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
}
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testEligibleLeaderReplicas_RemoveUncleanShutdownReplicasFromElr(short version) {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[] {
Uuid.fromString("keB9ssIPRlibyVJT5FcBVA"),
Uuid.fromString("FhezfoReTSmHoKxi8wOIOg"),
Uuid.fromString("QHtFxu8LShm6RiyAP6PxYg"),
Uuid.fromString("tUJOMtvMQkGga30ydluvbQ")
})
.setIsr(new int[] {1})
.setElr(new int[] {2, 3})
.setLastKnownElr(new int[] {})
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Min ISR is 3.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(false);
builder.setUncleanShutdownReplicas(Arrays.asList(3));
PartitionChangeRecord record = new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setLeader(-2)
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
if (version >= 2) {
record.setEligibleLeaderReplicas(Arrays.asList(2))
.setLastKnownElr(Arrays.asList(3));
} else {
record.setEligibleLeaderReplicas(Collections.emptyList());
}
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
if (version >= 2) {
assertArrayEquals(new int[]{2}, partition.elr, partition.toString());
assertArrayEquals(new int[]{3}, partition.lastKnownElr, partition.toString());
} else {
assertArrayEquals(new int[]{}, partition.elr, partition.toString());
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
}
}
@Test
public void testKeepsDirectoriesAfterReassignment() {
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[]{2, 1, 3}).
setDirectories(new Uuid[]{
Uuid.fromString("v1PVrX6uS5m8CByXlLfmWg"),
Uuid.fromString("iU2znv45Q9yQkOpkTSy3jA"),
Uuid.fromString("fM5NKyWTQHqEihjIkUl99Q")
}).
setIsr(new int[]{2, 1, 3}).
setLeader(1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
0, r -> true, MetadataVersion.IBP_3_7_IV2, 2).
setTargetReplicas(Arrays.asList(3, 1, 5, 4)).
setDirectory(5, Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg")).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
build();
Optional<ApiMessageAndVersion> expected = Optional.of(new ApiMessageAndVersion(
new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setLeader(1).
setReplicas(Arrays.asList(3, 1, 5, 4)).
setDirectories(Arrays.asList(
Uuid.fromString("fM5NKyWTQHqEihjIkUl99Q"),
Uuid.fromString("iU2znv45Q9yQkOpkTSy3jA"),
Uuid.fromString("RNJ5oFjjSSWMMFRwqdCfJg"),
DirectoryId.UNASSIGNED
)),
(short) 1
));
assertEquals(expected, built);
}
@Test
public void testUpdateDirectories() {
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[]{2, 1, 3}).
setDirectories(new Uuid[]{
Uuid.fromString("S1zMYZczRjWmucidLqGA5g"),
Uuid.fromString("9eRNXTvFTsWUJObvW51V5A"),
Uuid.fromString("UpePYVBgRAi3c4ujQrf3Kg")
}).
setIsr(new int[]{2, 1, 3}).
setLeader(2).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(100).
setPartitionEpoch(200).
build();
Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
0, r -> true, MetadataVersion.latestTesting(), 2).
setDirectory(3, Uuid.fromString("pN1VKs9zRzK4APflpegAVg")).
setDirectory(1, DirectoryId.LOST).
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
build();
Optional<ApiMessageAndVersion> expected = Optional.of(new ApiMessageAndVersion(
new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setDirectories(Arrays.asList(
Uuid.fromString("S1zMYZczRjWmucidLqGA5g"),
DirectoryId.LOST,
Uuid.fromString("pN1VKs9zRzK4APflpegAVg")
)),
(short) 2
));
assertEquals(expected, built);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEligibleLeaderReplicas_ElrCanBeElected(boolean lastKnownLeaderEnabled) {
short version = 2;
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(DirectoryId.migratingArray(4))
.setIsr(new int[] {1})
.setElr(new int[] {3})
.setLastKnownElr(lastKnownLeaderEnabled ? new int[] {} : new int[] {2})
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Make replica 1 offline.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 1,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER);
builder.setTargetIsr(Collections.emptyList());
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(Arrays.asList(3))
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLeader(3)
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE),
version
);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
assertArrayEquals(new int[]{1}, partition.elr, partition.toString());
assertArrayEquals(lastKnownLeaderEnabled ? new int[]{} : new int[]{2}, partition.lastKnownElr, partition.toString());
assertArrayEquals(new int[]{3}, partition.isr, partition.toString());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEligibleLeaderReplicas_IsrCanShrinkToZero(boolean lastKnownLeaderEnabled) {
short version = 2;
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[]{
Uuid.fromString("MrTKKPEpRv66ZpWv4V7EBQ"),
Uuid.fromString("CkvgdEcWTVmdhfNuJXL0xA"),
Uuid.fromString("4a2coMsPRkSCsiTVWSksSw"),
Uuid.fromString("tmPdVjzASZ2ZqiS0cVJvtQ")
})
.setIsr(new int[] {1, 2, 3, 4})
.setElr(new int[] {})
.setLastKnownElr(new int[] {})
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// Mark all the replicas offline.
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
builder.setTargetIsr(Collections.emptyList());
PartitionChangeRecord record = new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(Collections.emptyList())
.setLeader(-1)
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE)
.setEligibleLeaderReplicas(Arrays.asList(1, 2, 3, 4));
if (lastKnownLeaderEnabled) {
record.setLastKnownElr(Arrays.asList(1));
}
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
assertArrayEquals(new int[]{1, 2, 3, 4}, partition.elr, partition.toString());
if (lastKnownLeaderEnabled) {
assertArrayEquals(new int[]{1}, partition.lastKnownElr, partition.toString());
builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
.setUncleanShutdownReplicas(Arrays.asList(2))
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
PartitionChangeRecord changeRecord = (PartitionChangeRecord) builder.build().get().message();
assertNull(changeRecord.lastKnownElr(), changeRecord.toString());
} else {
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
}
assertArrayEquals(new int[]{}, partition.isr, partition.toString());
}
@Test
public void testEligibleLeaderReplicas_ElectLastKnownLeader() {
short version = 2;
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(DirectoryId.migratingArray(4))
.setIsr(new int[] {})
.setElr(new int[] {})
.setLastKnownElr(new int[] {1})
.setLeader(-1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> true,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setUseLastKnownLeaderInBalancedRecovery(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setEligibleLeaderReplicasEnabled(true);
builder.setTargetIsr(Collections.emptyList());
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setIsr(Arrays.asList(1))
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())
.setLastKnownElr(Collections.emptyList()),
version
);
assertEquals(Optional.of(expectedRecord), builder.build());
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
assertArrayEquals(new int[]{}, partition.elr, partition.toString());
assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
}
@Test
public void testEligibleLeaderReplicas_ElectLastKnownLeaderShouldFail() {
short version = 2;
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[]{
Uuid.fromString("zANDdMukTEqefOvHpmniMg"),
Uuid.fromString("Ui2Eq8rbRiuW7m7uiPTRyg"),
Uuid.fromString("MhgJOZrrTsKNcGM0XKK4aA"),
Uuid.fromString("Y25PaCAmRfyGIKxAThhBAw")
})
.setIsr(new int[] {})
.setElr(new int[] {3})
.setLastKnownElr(new int[] {1})
.setLeader(-1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
.setLeaderEpoch(100)
.setPartitionEpoch(200)
.build();
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> r != 3,
metadataVersionForPartitionChangeRecordVersion(version), 3)
.setElection(Election.PREFERRED)
.setEligibleLeaderReplicasEnabled(true)
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
.setUseLastKnownLeaderInBalancedRecovery(true);
builder.setTargetIsr(Collections.emptyList());
// No change to the partition.
assertEquals(Optional.empty(), builder.build());
}
}