blob: a8b001c63893156522536273ec21afa3dea917c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.message.AlterPartitionRequestData.PartitionData;
import org.apache.kafka.common.message.AlterPartitionRequestData.TopicData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCollection;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatState;
import org.apache.kafka.controller.ReplicationControlManager.KRaftClusterDescriber;
import org.apache.kafka.metadata.AssignmentsHelper;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.FakeKafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.server.util.MockRandom;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG;
import static org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD;
import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INELIGIBLE_REPLICA;
import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICATION_FACTOR;
import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
import static org.apache.kafka.common.protocol.Errors.NOT_LEADER_OR_FOLLOWER;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.THROTTLING_QUOTA_EXCEEDED;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG;
import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.placement.PartitionAssignmentTest.partitionAssignment;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(40)
public class ReplicationControlManagerTest {
private static final Logger log = LoggerFactory.getLogger(ReplicationControlManagerTest.class);
// Broker session timeout used by ReplicationControlTestContext; fenceBrokers() advances the
// mock clock by this many milliseconds before looking for stale brokers.
// NOTE(review): the context passes this through
// TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS), which treats
// the value as nanoseconds and yields 0 ms, so the effective session timeout is 0 — confirm intent.
private static final int BROKER_SESSION_TIMEOUT_MS = 1000;
/**
 * Test fixture that builds a {@link ReplicationControlManager} together with the feature,
 * cluster, configuration and offset control managers it is wired to, all sharing a single
 * {@link SnapshotRegistry}. The helper methods drive the managers and replay the resulting
 * records back into cluster, configuration and replication control.
 */
private static class ReplicationControlTestContext {
    /**
     * Builder for {@link ReplicationControlTestContext}. Unless overridden, the context uses
     * no create-topic policy, {@link MetadataVersion#latestTesting()}, a fresh
     * {@link MockTime}, ELR disabled and an empty static config.
     */
    private static class Builder {
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
        private MetadataVersion metadataVersion = MetadataVersion.latestTesting();
        private MockTime mockTime = new MockTime();
        private boolean isElrEnabled = false;
        private final Map<String, Object> staticConfig = new HashMap<>();

        Builder setCreateTopicPolicy(CreateTopicPolicy createTopicPolicy) {
            this.createTopicPolicy = Optional.of(createTopicPolicy);
            return this;
        }

        Builder setMetadataVersion(MetadataVersion metadataVersion) {
            this.metadataVersion = metadataVersion;
            return this;
        }

        Builder setIsElrEnabled(boolean isElrEnabled) {
            this.isElrEnabled = isElrEnabled;
            return this;
        }

        /** Adds (or overwrites) one entry of the static config handed to ConfigurationControlManager. */
        Builder setStaticConfig(String key, Object value) {
            this.staticConfig.put(key, value);
            return this;
        }

        Builder setMockTime(MockTime mockTime) {
            this.mockTime = mockTime;
            return this;
        }

        ReplicationControlTestContext build() {
            return new ReplicationControlTestContext(metadataVersion,
                createTopicPolicy,
                mockTime,
                isElrEnabled,
                staticConfig);
        }
    }

    final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
    final LogContext logContext = new LogContext();
    final MockTime time;
    final MockRandom random = new MockRandom();
    final FeatureControlManager featureControl;
    final ClusterControlManager clusterControl;
    final ConfigurationControlManager configurationControl;
    final ReplicationControlManager replicationControl;
    final OffsetControlManager offsetControlManager;

    /** Replays records into cluster, configuration and replication control, in that order. */
    void replay(List<ApiMessageAndVersion> records) {
        RecordTestUtils.replayAll(clusterControl, records);
        RecordTestUtils.replayAll(configurationControl, records);
        RecordTestUtils.replayAll(replicationControl, records);
    }

    private ReplicationControlTestContext(
        MetadataVersion metadataVersion,
        Optional<CreateTopicPolicy> createTopicPolicy,
        MockTime time,
        boolean isElrEnabled,
        Map<String, Object> staticConfig
    ) {
        this.time = time;
        this.featureControl = new FeatureControlManager.Builder().
            setSnapshotRegistry(snapshotRegistry).
            setQuorumFeatures(new QuorumFeatures(0,
                QuorumFeatures.defaultSupportedFeatureMap(true),
                Collections.singletonList(0))).
            setMetadataVersion(metadataVersion).
            build();
        // Set the ELR feature level explicitly according to the builder option.
        featureControl.replay(new FeatureLevelRecord()
            .setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
            .setFeatureLevel(isElrEnabled ?
                EligibleLeaderReplicasVersion.ELRV_1.featureLevel() :
                EligibleLeaderReplicasVersion.ELRV_0.featureLevel())
        );
        // NOTE(review): MILLISECONDS.convert(x, NANOSECONDS) interprets x as nanoseconds and
        // converts it to milliseconds, so 1000 becomes 0 here (not 1s expressed in ns).
        // fenceBrokers() only sleeps BROKER_SESSION_TIMEOUT_MS and appears to rely on this
        // 0-ns timeout, so the conversion is intentionally left as-is — confirm before changing.
        this.clusterControl = new ClusterControlManager.Builder().
            setLogContext(logContext).
            setTime(time).
            setSnapshotRegistry(snapshotRegistry).
            setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS)).
            setReplicaPlacer(new StripedReplicaPlacer(random)).
            setFeatureControlManager(featureControl).
            setBrokerShutdownHandler(this::handleBrokerShutdown).
            build();
        this.configurationControl = new ConfigurationControlManager.Builder().
            setSnapshotRegistry(snapshotRegistry).
            setFeatureControl(featureControl).
            setStaticConfig(staticConfig).
            setKafkaConfigSchema(FakeKafkaConfigSchema.INSTANCE).
            build();
        this.offsetControlManager = new OffsetControlManager.Builder().
            setSnapshotRegistry(snapshotRegistry).
            build();
        this.replicationControl = new ReplicationControlManager.Builder().
            setSnapshotRegistry(snapshotRegistry).
            setLogContext(logContext).
            setMaxElectionsPerImbalance(Integer.MAX_VALUE).
            setConfigurationControl(configurationControl).
            setClusterControl(clusterControl).
            setCreateTopicPolicy(createTopicPolicy).
            setFeatureControl(featureControl).
            build();
        clusterControl.activate();
    }

    /** Broker shutdown callback registered with cluster control; delegates to replication control. */
    void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
        replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
    }

    /**
     * Creates {@code name} with the given partition count and replication factor and asserts
     * the topic result carries {@code expectedErrorCode}. Records are replayed only on success.
     */
    CreatableTopicResult createTestTopic(String name,
                                         int numPartitions,
                                         short replicationFactor,
                                         short expectedErrorCode) {
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        CreatableTopic topic = new CreatableTopic().setName(name);
        topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
        request.topics().add(topic);
        ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<CreateTopicsResponseData> result =
            replicationControl.createTopics(requestContext, request, Collections.singleton(name));
        CreatableTopicResult topicResult = result.response().topics().find(name);
        assertNotNull(topicResult);
        assertEquals(expectedErrorCode, topicResult.errorCode());
        if (expectedErrorCode == NONE.code()) {
            replay(result.records());
        }
        return topicResult;
    }

    /** Creates {@code name} with an explicit replica assignment and expects success. */
    CreatableTopicResult createTestTopic(String name, int[][] replicas) {
        return createTestTopic(name, replicas, Collections.emptyMap(), NONE.code());
    }

    /** Creates {@code name} with an explicit replica assignment and no topic configs. */
    CreatableTopicResult createTestTopic(String name, int[][] replicas,
                                         short expectedErrorCode) {
        return createTestTopic(name, replicas, Collections.emptyMap(), expectedErrorCode);
    }

    /**
     * Creates {@code name} with one partition per row of {@code replicas} (row i lists the
     * broker ids of partition i) plus the given topic configs, and asserts the topic result
     * carries {@code expectedErrorCode}. On success, also checks the reported partition count
     * and replication factor (taken from the first row) and replays the records.
     */
    CreatableTopicResult createTestTopic(String name, int[][] replicas,
                                         Map<String, String> configs,
                                         short expectedErrorCode) {
        assertNotEquals(0, replicas.length);
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        CreatableTopic topic = new CreatableTopic().setName(name);
        // -1 for both values: the explicit assignments below define the layout.
        topic.setNumPartitions(-1).setReplicationFactor((short) -1);
        for (int i = 0; i < replicas.length; i++) {
            topic.assignments().add(new CreatableReplicaAssignment().
                setPartitionIndex(i).setBrokerIds(Replicas.toList(replicas[i])));
        }
        configs.forEach((key, value) -> topic.configs().add(
            new CreateTopicsRequestData.CreatableTopicConfig()
                .setName(key)
                .setValue(value)
            )
        );
        request.topics().add(topic);
        ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<CreateTopicsResponseData> result =
            replicationControl.createTopics(requestContext, request, Collections.singleton(name));
        CreatableTopicResult topicResult = result.response().topics().find(name);
        assertNotNull(topicResult);
        assertEquals(expectedErrorCode, topicResult.errorCode());
        if (expectedErrorCode == NONE.code()) {
            assertEquals(replicas.length, topicResult.numPartitions());
            assertEquals(replicas[0].length, topicResult.replicationFactor());
            replay(result.records());
        }
        return topicResult;
    }

    /**
     * Deletes a single topic, asserting that it succeeds and produces exactly one
     * RemoveTopicRecord for {@code topicId}, then replays it.
     */
    void deleteTopic(ControllerRequestContext context, Uuid topicId) {
        ControllerResult<Map<Uuid, ApiError>> result = replicationControl.deleteTopics(context, Collections.singleton(topicId));
        assertEquals(Collections.singleton(topicId), result.response().keySet());
        assertEquals(NONE, result.response().get(topicId).error());
        assertEquals(1, result.records().size());
        ApiMessageAndVersion removeRecordAndVersion = result.records().get(0);
        assertInstanceOf(RemoveTopicRecord.class, removeRecordAndVersion.message());
        RemoveTopicRecord removeRecord = (RemoveTopicRecord) removeRecordAndVersion.message();
        assertEquals(topicId, removeRecord.topicId());
        replay(result.records());
    }

    /**
     * Grows topic {@code name} to {@code count} partitions using the given assignments
     * (one row per new partition), asserts the error code and replays any records.
     */
    void createPartitions(int count, String name, int[][] replicas, short expectedErrorCode) {
        assertNotEquals(0, replicas.length);
        CreatePartitionsTopic topic = new CreatePartitionsTopic().
            setName(name).
            setCount(count);
        for (int[] replica : replicas) {
            topic.assignments().add(new CreatePartitionsAssignment().
                setBrokerIds(Replicas.toList(replica)));
        }
        ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
        ControllerResult<List<CreatePartitionsTopicResult>> result =
            replicationControl.createPartitions(requestContext, Collections.singletonList(topic));
        assertEquals(1, result.response().size());
        CreatePartitionsTopicResult topicResult = result.response().get(0);
        assertEquals(name, topicResult.name());
        assertEquals(expectedErrorCode, topicResult.errorCode());
        replay(result.records());
    }

    /**
     * Registers each broker with a single deterministic log directory whose Uuid encodes the
     * broker id as five digits (broker 1 gets TESTBROKER00001DIRAAAA). The encoding is only
     * unique for ids below 100000.
     */
    void registerBrokers(Integer... brokerIds) {
        Object[] brokersAndDirs = new Object[brokerIds.length * 2];
        for (int i = 0; i < brokerIds.length; i++) {
            brokersAndDirs[i * 2] = brokerIds[i];
            brokersAndDirs[i * 2 + 1] = Collections.singletonList(
                Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerIds[i]).substring(1) + "DIRAAAA")
            );
        }
        registerBrokersWithDirs(brokersAndDirs);
    }

    /**
     * Registers brokers from alternating arguments: an {@code Integer} broker id followed by a
     * {@code List<Uuid>} of its log directories. Each broker gets a PLAINTEXT listener on
     * localhost at port 9092 + id and epoch {@code defaultBrokerEpoch(id)}.
     *
     * @throws IllegalArgumentException if the argument count is odd
     */
    @SuppressWarnings("unchecked")
    void registerBrokersWithDirs(Object... brokerIdsAndDirs) {
        if (brokerIdsAndDirs.length % 2 != 0) {
            throw new IllegalArgumentException("uneven number of arguments");
        }
        for (int i = 0; i < brokerIdsAndDirs.length / 2; i++) {
            int brokerId = (int) brokerIdsAndDirs[i * 2];
            List<Uuid> logDirs = (List<Uuid>) brokerIdsAndDirs[i * 2 + 1];
            RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
                setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId).
                setRack(null).setLogDirs(logDirs);
            brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
                setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
                setPort(9092 + brokerId).
                setName("PLAINTEXT").
                setHost("localhost"));
            replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short) 3)));
        }
    }

    /** Runs broker shutdown handling for every id, collecting all records into one replay. */
    void handleBrokersShutdown(boolean isCleanShutdown, Integer... brokerIds) {
        List<ApiMessageAndVersion> records = new ArrayList<>();
        for (int brokerId : brokerIds) {
            replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
        }
        replay(records);
    }

    /**
     * Sends an AlterPartition request from the current (unfenced) leader proposing
     * {@code isrWithEpoch} at the partition's current leader and partition epochs.
     * The response is not inspected; only the generated records are replayed.
     */
    void alterPartition(
        TopicIdPartition topicIdPartition,
        int leaderId,
        List<BrokerState> isrWithEpoch,
        LeaderRecoveryState leaderRecoveryState
    ) {
        BrokerRegistration registration = clusterControl.brokerRegistrations().get(leaderId);
        assertFalse(registration.fenced());
        PartitionRegistration partition = replicationControl.getPartition(
            topicIdPartition.topicId(),
            topicIdPartition.partitionId()
        );
        assertNotNull(partition);
        assertEquals(leaderId, partition.leader);
        PartitionData partitionData = new PartitionData()
            .setPartitionIndex(topicIdPartition.partitionId())
            .setPartitionEpoch(partition.partitionEpoch)
            .setLeaderEpoch(partition.leaderEpoch)
            .setLeaderRecoveryState(leaderRecoveryState.value())
            .setNewIsrWithEpochs(isrWithEpoch);
        String topicName = replicationControl.getTopic(topicIdPartition.topicId()).name();
        TopicData topicData = new TopicData()
            .setTopicName(topicName)
            .setTopicId(topicIdPartition.topicId())
            .setPartitions(singletonList(partitionData));
        ControllerRequestContext requestContext =
            anonymousContextFor(ApiKeys.ALTER_PARTITION);
        ControllerResult<AlterPartitionResponseData> alterPartition = replicationControl.alterPartition(
            requestContext,
            new AlterPartitionRequestData()
                .setBrokerId(leaderId)
                .setBrokerEpoch(registration.epoch())
                .setTopics(singletonList(topicData)));
        replay(alterPartition.records());
    }

    /**
     * Unfences each broker by tracking it and processing a heartbeat at its default epoch
     * (asking neither to fence nor to shut down), asserting the expected heartbeat reply.
     */
    void unfenceBrokers(Integer... brokerIds) {
        for (int brokerId : brokerIds) {
            clusterControl.trackBrokerHeartbeat(brokerId, defaultBrokerEpoch(brokerId));
            ControllerResult<BrokerHeartbeatReply> result = replicationControl.
                processBrokerHeartbeat(new BrokerHeartbeatRequestData().
                    setBrokerId(brokerId).setBrokerEpoch(defaultBrokerEpoch(brokerId)).
                    setCurrentMetadataOffset(1).
                    setWantFence(false).setWantShutDown(false), 0);
            assertEquals(new BrokerHeartbeatReply(true, false, false, false),
                result.response());
            replay(result.records());
        }
    }

    /** Marks each broker as in controlled shutdown by replaying a BrokerRegistrationChangeRecord. */
    void inControlledShutdownBrokers(Integer... brokerIds) {
        for (int brokerId : brokerIds) {
            BrokerRegistrationChangeRecord record = new BrokerRegistrationChangeRecord()
                .setBrokerId(brokerId)
                .setBrokerEpoch(defaultBrokerEpoch(brokerId))
                .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
            replay(singletonList(new ApiMessageAndVersion(record, (short) 1)));
        }
    }

    /** Sets a topic config directly by replaying a ConfigRecord (no validation path). */
    void alterTopicConfig(
        String topic,
        String configKey,
        String configValue
    ) {
        ConfigRecord configRecord = new ConfigRecord()
            .setResourceType(ConfigResource.Type.TOPIC.id())
            .setResourceName(topic)
            .setName(configKey)
            .setValue(configValue);
        replay(singletonList(new ApiMessageAndVersion(configRecord, (short) 0)));
    }

    void fenceBrokers(Integer... brokerIds) {
        fenceBrokers(Set.of(brokerIds));
    }

    /**
     * Advances the mock clock by BROKER_SESSION_TIMEOUT_MS, heartbeats every registered broker
     * not in {@code brokerIds}, then fences stale brokers one at a time until none remain.
     * Asserts that afterwards the fenced set is exactly {@code brokerIds}.
     */
    void fenceBrokers(Set<Integer> brokerIds) {
        time.sleep(BROKER_SESSION_TIMEOUT_MS);
        Set<Integer> unfencedBrokerIds = clusterControl.brokerRegistrations().keySet().stream()
            .filter(brokerId -> !brokerIds.contains(brokerId))
            .collect(Collectors.toSet());
        unfenceBrokers(unfencedBrokerIds.toArray(new Integer[0]));
        ControllerResult<Boolean> fenceResult;
        do {
            fenceResult = replicationControl.maybeFenceOneStaleBroker();
            replay(fenceResult.records());
        } while (fenceResult.response().booleanValue());
        assertEquals(brokerIds, fencedBrokerIds());
    }

    /** Returns the epoch of the broker's current registration, failing if it is not registered. */
    long currentBrokerEpoch(int brokerId) {
        Map<Integer, BrokerRegistration> registrations = clusterControl.brokerRegistrations();
        BrokerRegistration registration = registrations.get(brokerId);
        assertNotNull(registration, "No current registration for broker " + brokerId);
        return registration.epoch();
    }

    /**
     * Returns the partition's leader, or empty when the leader id is negative (no leader).
     * Fails the test with a descriptive message if the partition does not exist.
     */
    OptionalInt currentLeader(TopicIdPartition topicIdPartition) {
        PartitionRegistration partition = replicationControl.
            getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
        // Fail with a clear message instead of a NullPointerException on partition.leader.
        assertNotNull(partition, "No partition registered for " + topicIdPartition);
        return (partition.leader < 0) ? OptionalInt.empty() : OptionalInt.of(partition.leader);
    }

    /**
     * Sends an AssignReplicasToDirs request for {@code brokerId} at its default epoch, asserts a
     * top-level NONE error code, replays the records and returns the result.
     */
    ControllerResult<AssignReplicasToDirsResponseData> assignReplicasToDirs(int brokerId, Map<TopicIdPartition, Uuid> assignment) {
        ControllerResult<AssignReplicasToDirsResponseData> result = replicationControl.handleAssignReplicasToDirs(
            AssignmentsHelper.buildRequestData(brokerId, defaultBrokerEpoch(brokerId), assignment));
        assertNotNull(result.response());
        assertEquals(NONE.code(), result.response().errorCode());
        replay(result.records());
        return result;
    }

    /** Returns the ids of all registered brokers that are currently fenced. */
    Set<Integer> fencedBrokerIds() {
        return clusterControl.brokerRegistrations().values()
            .stream()
            .filter(BrokerRegistration::fenced)
            .map(BrokerRegistration::id)
            .collect(Collectors.toSet());
    }
}
/**
 * Empties the config list of every topic result in {@code data} so that responses can be
 * compared independently of their config entries. The argument is modified in place and
 * the same instance is returned to allow inline use inside assertions.
 */
static CreateTopicsResponseData withoutConfigs(CreateTopicsResponseData data) {
    for (CreatableTopicResult topicResult : data.topics()) {
        topicResult.configs().clear();
    }
    return data;
}
/**
 * CreateTopicPolicy that accepts a fixed, ordered script of requests. The n-th call to
 * {@link #validate} must receive a request equal to the n-th expected one; any mismatch,
 * or any call beyond the end of the script, is rejected with a PolicyViolationException.
 */
private static class MockCreateTopicPolicy implements CreateTopicPolicy {
    private final List<RequestMetadata> expecteds;
    private final AtomicLong index = new AtomicLong(0);

    MockCreateTopicPolicy(List<RequestMetadata> scriptedRequests) {
        this.expecteds = scriptedRequests;
    }

    @Override
    public void validate(RequestMetadata actual) throws PolicyViolationException {
        final long position = index.getAndIncrement();
        if (position < expecteds.size()) {
            RequestMetadata wanted = expecteds.get((int) position);
            if (wanted.equals(actual)) {
                return;
            }
            throw new PolicyViolationException("Expected: " + wanted + ". Got: " + actual);
        }
        throw new PolicyViolationException("Unexpected topic creation: index out of range at " + position);
    }

    @Override
    public void close() {
        // No resources are held.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration is ignored; behavior is fixed by the constructor argument.
    }
}
@Test
public void testExcessiveNumberOfTopicsCannotBeCreated() {
    // The number of partitions is set explicitly, without assignments. Together the three
    // topics request 5000 + 5000 + 1 = 10001 partitions, which the controller must reject
    // as too many for a single request.
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    CreateTopicsRequestData request = new CreateTopicsRequestData();
    request.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(5000).setReplicationFactor((short) 1));
    request.topics().add(new CreatableTopic().setName("bar").
        setNumPartitions(5000).setReplicationFactor((short) 1));
    request.topics().add(new CreatableTopic().setName("baz").
        setNumPartitions(1).setReplicationFactor((short) 1));
    ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
    PolicyViolationException error = assertThrows(
        PolicyViolationException.class,
        () -> replicationControl.createTopics(requestContext, request, Set.of("foo", "bar", "baz")));
    // JUnit's assertEquals takes (expected, actual); keep that order so failure messages are accurate.
    assertEquals("Excessively large number of partitions per request.", error.getMessage());
}
@Test
public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() {
    // "foo" leaves its partition count unset (-1), so it is counted with the default
    // passed to validateTotalNumberOfPartitions below (presumably 9999 partitions), and "baz"
    // contributes two explicit assignments — together exceeding the per-request limit.
    CreateTopicsRequestData request = new CreateTopicsRequestData();
    request.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(-1).setReplicationFactor((short) 1));
    CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments =
        new CreateTopicsRequestData.CreatableReplicaAssignmentCollection();
    assignments.add(new CreatableReplicaAssignment().setPartitionIndex(1));
    assignments.add(new CreatableReplicaAssignment().setPartitionIndex(2));
    request.topics().add(new CreatableTopic()
        .setName("baz")
        .setAssignments(assignments));
    PolicyViolationException error = assertThrows(
        PolicyViolationException.class,
        () -> ReplicationControlManager.validateTotalNumberOfPartitions(request, 9999)
    );
    // JUnit's assertEquals takes (expected, actual); keep that order so failure messages are accurate.
    assertEquals("Excessively large number of partitions per request.", error.getMessage());
}
/**
 * Walks one topic-creation request through four cluster states: no usable brokers, only a
 * broker in controlled shutdown, three healthy brokers (success), and a duplicate create.
 */
@Test
public void testCreateTopics() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager manager = ctx.replicationControl;
    ControllerRequestContext context = anonymousContextFor(ApiKeys.CREATE_TOPICS);
    Set<String> describable = Collections.singleton("foo");

    // -1 partitions and -1 replication factor: the controller picks its defaults.
    CreateTopicsRequestData createFoo = new CreateTopicsRequestData();
    createFoo.topics().add(new CreatableTopic()
        .setName("foo")
        .setNumPartitions(-1)
        .setReplicationFactor((short) -1));

    // Step 1: before any broker is registered, replica placement fails.
    CreateTopicsResponseData noBrokers =
        manager.createTopics(context, createFoo, describable).response();
    CreateTopicsResponseData expectedNoBrokers = new CreateTopicsResponseData();
    expectedNoBrokers.topics().add(new CreatableTopicResult()
        .setName("foo")
        .setErrorCode(INVALID_REPLICATION_FACTOR.code())
        .setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced."));
    assertEquals(expectedNoBrokers, noBrokers);

    // Step 2: the only unfenced broker (0) is in controlled shutdown, so placement still fails.
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0);
    ctx.inControlledShutdownBrokers(0);
    CreateTopicsResponseData shuttingDown =
        manager.createTopics(context, createFoo, describable).response();
    CreateTopicsResponseData expectedShuttingDown = new CreateTopicsResponseData();
    expectedShuttingDown.topics().add(new CreatableTopicResult()
        .setName("foo")
        .setErrorCode(INVALID_REPLICATION_FACTOR.code())
        .setErrorMessage("Unable to replicate the partition 3 time(s): All brokers are currently fenced or in controlled shutdown."));
    assertEquals(expectedShuttingDown, shuttingDown);

    // Step 3: re-register and unfence all three brokers; creation now succeeds.
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    ControllerResult<CreateTopicsResponseData> created =
        manager.createTopics(context, createFoo, describable);
    Uuid fooId = created.response().topics().find("foo").topicId();
    CreateTopicsResponseData expectedCreated = new CreateTopicsResponseData();
    expectedCreated.topics().add(new CreatableTopicResult()
        .setName("foo")
        .setNumPartitions(1)
        .setReplicationFactor((short) 3)
        .setErrorMessage(null)
        .setErrorCode((short) 0)
        .setTopicId(fooId));
    assertEquals(expectedCreated, withoutConfigs(created.response()));

    ctx.replay(created.records());
    PartitionRegistration expectedPartition = new PartitionRegistration.Builder()
        .setReplicas(new int[] {1, 2, 0})
        .setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00000DIRAAAA")
        })
        .setIsr(new int[] {1, 2, 0})
        .setLeader(1)
        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
        .setLeaderEpoch(0)
        .setPartitionEpoch(0)
        .build();
    Uuid replayedTopicId = ((TopicRecord) created.records().get(0).message()).topicId();
    assertEquals(expectedPartition, manager.getPartition(replayedTopicId, 0));

    // Step 4: creating the same topic again is rejected.
    CreateTopicsResponseData duplicate =
        manager.createTopics(context, createFoo, describable).response();
    CreateTopicsResponseData expectedDuplicate = new CreateTopicsResponseData();
    expectedDuplicate.topics().add(new CreatableTopicResult()
        .setName("foo")
        .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
        .setErrorMessage("Topic 'foo' already exists."));
    assertEquals(expectedDuplicate, duplicate);
}
@Test
public void testCreateTopicsWithMutationQuotaExceeded() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    CreateTopicsRequestData request = new CreateTopicsRequestData();
    request.topics().add(new CreatableTopic()
        .setName("foo")
        .setNumPartitions(-1)
        .setReplicationFactor((short) -1));
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);

    // Three healthy brokers are available, but the request context is built with its
    // mutation quota already exceeded, so the topic must be rejected with a throttling error.
    ControllerResult<CreateTopicsResponseData> result = ctx.replicationControl.createTopics(
        anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS),
        request,
        Collections.singleton("foo"));

    CreatableTopicResult throttled = new CreatableTopicResult()
        .setName("foo")
        .setErrorCode(THROTTLING_QUOTA_EXCEEDED.code())
        .setErrorMessage(QUOTA_EXCEEDED_IN_TEST_MSG);
    CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
    expectedResponse.topics().add(throttled);
    assertEquals(expectedResponse, result.response());
}
@Test
public void testCreateTopicsISRInvariants() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    CreateTopicsRequestData request = new CreateTopicsRequestData();
    request.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(-1).setReplicationFactor((short) -1));
    // Broker 0 is unfenced and active, broker 1 is unfenced but in controlled shutdown,
    // and broker 2 remains fenced.
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1);
    ctx.inControlledShutdownBrokers(1);
    ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
    ControllerResult<CreateTopicsResponseData> result =
        replicationControl.createTopics(requestContext, request, Collections.singleton("foo"));
    CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
    expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
        setNumPartitions(1).setReplicationFactor((short) 3).
        setErrorMessage(null).setErrorCode((short) 0).
        setTopicId(result.response().topics().find("foo").topicId()));
    // Use the shared helper (as testCreateTopics does) to drop configs before comparing.
    assertEquals(expectedResponse, withoutConfigs(result.response()));
    ctx.replay(result.records());
    // Broker 2 cannot be in the ISR because it is fenced and broker 1
    // cannot be in the ISR because it is in controlled shutdown.
    assertEquals(
        new PartitionRegistration.Builder().setReplicas(new int[]{1, 0, 2}).
            setDirectories(new Uuid[] {
                Uuid.fromString("TESTBROKER00001DIRAAAA"),
                Uuid.fromString("TESTBROKER00000DIRAAAA"),
                Uuid.fromString("TESTBROKER00002DIRAAAA")
            }).
            setIsr(new int[]{0}).
            setLeader(0).
            setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
            setLeaderEpoch(0).
            setPartitionEpoch(0).build(),
        replicationControl.getPartition(
            ((TopicRecord) result.records().get(0).message()).topicId(), 0));
}
/**
 * Verifies how CreateTopics handles topic configs.
 * <ul>
 *   <li>A valid config yields a TopicRecord, a ConfigRecord and a PartitionRecord, and the
 *       config is readable after replay.</li>
 *   <li>A null config value is rejected with INVALID_CONFIG and emits no records.</li>
 *   <li>An invalid replication factor is rejected and emits no records.</li>
 *   <li>In a batched request, only the valid topic contributes records.</li>
 * </ul>
 */
@Test
public void testCreateTopicsWithConfigs() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    CreateTopicsRequestData.CreatableTopicConfigCollection validConfigs =
        new CreateTopicsRequestData.CreatableTopicConfigCollection();
    validConfigs.add(
        new CreateTopicsRequestData.CreatableTopicConfig()
            .setName("foo")
            .setValue("notNull")
    );

    // 1. A topic with a valid config is created together with its config record.
    CreateTopicsRequestData request1 = new CreateTopicsRequestData();
    request1.topics().add(new CreatableTopic().setName("foo")
        .setNumPartitions(-1).setReplicationFactor((short) -1)
        .setConfigs(validConfigs));
    ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
    ControllerResult<CreateTopicsResponseData> result1 =
        replicationControl.createTopics(requestContext, request1, Collections.singleton("foo"));
    assertEquals((short) 0, result1.response().topics().find("foo").errorCode());
    List<ApiMessageAndVersion> records1 = result1.records();
    assertEquals(3, records1.size());
    assertEquals(TopicRecord.class, records1.get(0).message().getClass());
    assertEquals(ConfigRecord.class, records1.get(1).message().getClass());
    assertEquals(PartitionRecord.class, records1.get(2).message().getClass());
    ctx.replay(records1);
    assertEquals(
        "notNull",
        ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).get("foo")
    );

    // 2. A null config value is rejected and must not emit any records.
    CreateTopicsRequestData.CreatableTopicConfigCollection invalidConfigs =
        new CreateTopicsRequestData.CreatableTopicConfigCollection();
    invalidConfigs.add(
        new CreateTopicsRequestData.CreatableTopicConfig()
            .setName("foo")
            .setValue(null)
    );
    CreateTopicsRequestData request2 = new CreateTopicsRequestData();
    request2.topics().add(new CreatableTopic().setName("bar")
        .setNumPartitions(-1).setReplicationFactor((short) -1)
        .setConfigs(invalidConfigs));
    ControllerResult<CreateTopicsResponseData> result2 =
        replicationControl.createTopics(requestContext, request2, Collections.singleton("bar"));
    assertEquals(Errors.INVALID_CONFIG.code(), result2.response().topics().find("bar").errorCode());
    assertEquals(
        "Null value not supported for topic configs: foo",
        result2.response().topics().find("bar").errorMessage()
    );
    assertEquals(Collections.emptyList(), result2.records());

    // 3. An invalid replication factor is rejected and must not emit any records.
    CreateTopicsRequestData request3 = new CreateTopicsRequestData();
    request3.topics().add(new CreatableTopic().setName("baz")
        .setNumPartitions(-1).setReplicationFactor((short) -2)
        .setConfigs(validConfigs));
    ControllerResult<CreateTopicsResponseData> result3 =
        replicationControl.createTopics(requestContext, request3, Collections.singleton("baz"));
    assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
    assertEquals(Collections.emptyList(), result3.records());

    // 4. A batched request with one valid and one invalid topic: only the valid one emits records.
    CreateTopicsRequestData request4 = new CreateTopicsRequestData();
    String batchedTopic1 = "batched-topic-1";
    request4.topics().add(new CreatableTopic().setName(batchedTopic1)
        .setNumPartitions(-1).setReplicationFactor((short) -1)
        .setConfigs(validConfigs));
    String batchedTopic2 = "batched-topic-2";
    request4.topics().add(new CreatableTopic().setName(batchedTopic2)
        .setNumPartitions(-1).setReplicationFactor((short) -2)
        .setConfigs(validConfigs));
    Set<String> request4Topics = new HashSet<>();
    request4Topics.add(batchedTopic1);
    request4Topics.add(batchedTopic2);
    ControllerResult<CreateTopicsResponseData> result4 =
        replicationControl.createTopics(requestContext, request4, request4Topics);
    assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode());
    assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode());
    assertEquals(3, result4.records().size());
    assertEquals(TopicRecord.class, result4.records().get(0).message().getClass());
    TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
    assertEquals(batchedTopic1, batchedTopic1Record.name());
    assertEquals(new ConfigRecord()
        .setResourceName(batchedTopic1)
        .setResourceType(ConfigResource.Type.TOPIC.id())
        .setName("foo")
        .setValue("notNull"),
        result4.records().get(1).message());
    assertEquals(PartitionRecord.class, result4.records().get(2).message().getClass());
    assertEquals(batchedTopic1Record.topicId(), ((PartitionRecord) result4.records().get(2).message()).topicId());
}
/**
 * Verifies that a validate-only CreateTopics request never emits records. The topic's error
 * code is THROTTLING_QUOTA_EXCEEDED when the mutation quota is exceeded and NONE otherwise.
 *
 * @param mutationQuotaExceeded whether the request context reports an exceeded mutation quota
 */
@ParameterizedTest(name = "testCreateTopicsWithValidateOnlyFlag with mutationQuotaExceeded: {0}")
@ValueSource(booleans = {true, false})
public void testCreateTopicsWithValidateOnlyFlag(boolean mutationQuotaExceeded) {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    CreatableTopic fooTopic = new CreatableTopic()
        .setName("foo")
        .setNumPartitions(1)
        .setReplicationFactor((short) 3);
    CreateTopicsRequestData validateOnlyRequest = new CreateTopicsRequestData().setValidateOnly(true);
    validateOnlyRequest.topics().add(fooTopic);

    ControllerRequestContext context;
    if (mutationQuotaExceeded) {
        context = anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS);
    } else {
        context = anonymousContextFor(ApiKeys.CREATE_TOPICS);
    }
    ControllerResult<CreateTopicsResponseData> validateResult =
        ctx.replicationControl.createTopics(context, validateOnlyRequest, Collections.singleton("foo"));

    assertEquals(0, validateResult.records().size());
    short expectedErrorCode = mutationQuotaExceeded ? THROTTLING_QUOTA_EXCEEDED.code() : NONE.code();
    assertEquals(expectedErrorCode, validateResult.response().topics().find("foo").errorCode());
}
/**
 * Verifies that a validate-only CreateTopics request still reports validation errors.
 * Asking for a replication factor of 4 with only 3 registered brokers fails with
 * INVALID_REPLICATION_FACTOR and emits no records.
 */
@Test
public void testInvalidCreateTopicsWithValidateOnlyFlag() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    CreatableTopic oversizedTopic = new CreatableTopic()
        .setName("foo")
        .setNumPartitions(1)
        .setReplicationFactor((short) 4);
    CreateTopicsRequestData validateOnlyRequest = new CreateTopicsRequestData().setValidateOnly(true);
    validateOnlyRequest.topics().add(oversizedTopic);
    ControllerResult<CreateTopicsResponseData> validateResult = ctx.replicationControl.createTopics(
        anonymousContextFor(ApiKeys.CREATE_TOPICS), validateOnlyRequest, Collections.singleton("foo"));

    assertEquals(0, validateResult.records().size());
    CreatableTopicResult expectedTopicResult = new CreatableTopicResult()
        .setName("foo")
        .setErrorCode(INVALID_REPLICATION_FACTOR.code())
        .setErrorMessage("Unable to replicate the partition 4 time(s): The target " +
            "replication factor of 4 cannot be reached because only 3 broker(s) " +
            "are registered.");
    CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
    expectedResponse.topics().add(expectedTopicResult);
    assertEquals(expectedResponse, validateResult.response());
}
/**
 * Verifies that topic creation is checked against the configured CreateTopicPolicy.
 * The mock policy is seeded with the request metadata it should see. Requests that match an
 * entry succeed; mismatching ones fail with POLICY_VIOLATION. Here that means the wrong
 * replication factor for "bar" and a different assignment order for "quux".
 */
@Test
public void testCreateTopicsWithPolicy() {
    CreateTopicPolicy.RequestMetadata fooMetadata = new CreateTopicPolicy.RequestMetadata(
        "foo", 2, (short) 2, null, Collections.emptyMap());
    CreateTopicPolicy.RequestMetadata barMetadata = new CreateTopicPolicy.RequestMetadata(
        "bar", 3, (short) 2, null, Collections.emptyMap());
    CreateTopicPolicy.RequestMetadata bazMetadata = new CreateTopicPolicy.RequestMetadata(
        "baz", null, null,
        Collections.singletonMap(0, asList(2, 1, 0)),
        Collections.singletonMap(SEGMENT_BYTES_CONFIG, "12300000"));
    CreateTopicPolicy.RequestMetadata quuxMetadata = new CreateTopicPolicy.RequestMetadata(
        "quux", null, null,
        Collections.singletonMap(0, asList(2, 1, 0)), Collections.emptyMap());
    MockCreateTopicPolicy policy =
        new MockCreateTopicPolicy(asList(fooMetadata, barMetadata, bazMetadata, quuxMetadata));

    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setCreateTopicPolicy(policy)
        .build();
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    // Matches the "foo" entry exactly.
    ctx.createTestTopic("foo", 2, (short) 2, NONE.code());
    // Replication factor 3 does not match the "bar" entry's 2.
    ctx.createTestTopic("bar", 3, (short) 3, POLICY_VIOLATION.code());
    // Matches the "baz" entry's explicit assignment and config.
    ctx.createTestTopic("baz", new int[][] {new int[] {2, 1, 0}},
        Collections.singletonMap(SEGMENT_BYTES_CONFIG, "12300000"), NONE.code());
    // Assignment {1, 2, 0} differs from the "quux" entry's {2, 1, 0}.
    ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}}, POLICY_VIOLATION.code());
}
/**
 * Verifies that a topic whose name contains '.' can be deleted and then recreated under the
 * same name. The recreated topic gets a new topic ID and the new partition count.
 */
@Test
public void testCreateTopicWithCollisionChars() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    ReplicationControlManager manager = ctx.replicationControl;

    CreatableTopicResult firstTopic = ctx.createTestTopic("foo.bar", 2, (short) 2, NONE.code());
    assertEquals(2, manager.getTopic(firstTopic.topicId()).numPartitions(Long.MAX_VALUE));
    ctx.deleteTopic(anonymousContextFor(ApiKeys.DELETE_TOPICS), firstTopic.topicId());

    CreatableTopicResult secondTopic = ctx.createTestTopic("foo.bar", 4, (short) 2, NONE.code());
    assertNotEquals(firstTopic.topicId(), secondTopic.topicId());
    assertEquals(4, manager.getTopic(secondTopic.topicId()).numPartitions(Long.MAX_VALUE));
}
/**
 * Verifies that validateNewTopicNames rejects the empty name and "." with
 * INVALID_TOPIC_EXCEPTION, and reports no error for the valid name "woo".
 */
@Test
public void testValidateNewTopicNames() {
    CreatableTopicCollection candidates = new CreatableTopicCollection();
    for (String name : new String[] {"", "woo", "."}) {
        candidates.add(new CreatableTopic().setName(name));
    }
    Map<String, ApiError> actualErrors = new HashMap<>();
    ReplicationControlManager.validateNewTopicNames(actualErrors, candidates, Collections.emptyMap());

    Map<String, ApiError> expectedErrors = new HashMap<>();
    expectedErrors.put("", new ApiError(INVALID_TOPIC_EXCEPTION,
        "Topic name is invalid: the empty string is not allowed"));
    expectedErrors.put(".", new ApiError(INVALID_TOPIC_EXCEPTION,
        "Topic name is invalid: '.' is not allowed"));
    assertEquals(expectedErrors, actualErrors);
}
/**
 * Verifies that validateNewTopicNames flags names that collide, once '.' and '_' are treated
 * alike, with topics listed in the collision map. For "woo.bar_foo" the expected message names
 * "woo.bar.foo", the first entry of its sorted collision set.
 */
@Test
public void testTopicNameCollision() {
    CreatableTopicCollection candidates = new CreatableTopicCollection();
    candidates.add(new CreatableTopic().setName("foo.bar"));
    candidates.add(new CreatableTopic().setName("woo.bar_foo"));
    Map<String, Set<String>> existingByNormalizedName = new HashMap<>();
    existingByNormalizedName.put("foo_bar", new TreeSet<>(singletonList("foo_bar")));
    existingByNormalizedName.put("woo_bar_foo",
        new TreeSet<>(Arrays.asList("woo.bar.foo", "woo_bar.foo")));

    Map<String, ApiError> actualErrors = new HashMap<>();
    ReplicationControlManager.validateNewTopicNames(actualErrors, candidates, existingByNormalizedName);

    Map<String, ApiError> expectedErrors = new HashMap<>();
    expectedErrors.put("foo.bar", new ApiError(INVALID_TOPIC_EXCEPTION,
        "Topic 'foo.bar' collides with existing topic: foo_bar"));
    expectedErrors.put("woo.bar_foo", new ApiError(INVALID_TOPIC_EXCEPTION,
        "Topic 'woo.bar_foo' collides with existing topic: woo.bar.foo"));
    assertEquals(expectedErrors, actualErrors);
}
/**
 * Verifies that fencing a broker removes it from the leader/ISR index.
 * Before fencing, iterator(0, true) is expected to yield partitions 0 and 3, the ones whose
 * first replica is broker 0. The flag presumably restricts the iteration to led partitions.
 * After handleBrokerFenced(0) is replayed, the iteration is expected to be empty.
 */
@Test
public void testRemoveLeaderships() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3);
    ctx.unfenceBrokers(0, 1, 2, 3);
    int[][] assignment = {
        {0, 1, 2},
        {1, 2, 3},
        {2, 3, 0},
        {0, 2, 1}
    };
    Uuid topicId = ctx.createTestTopic("foo", assignment).topicId();

    Set<TopicIdPartition> ledByBroker0 = new HashSet<>(Arrays.asList(
        new TopicIdPartition(topicId, 0),
        new TopicIdPartition(topicId, 3)));
    assertEquals(ledByBroker0,
        RecordTestUtils.iteratorToSet(manager.brokersToIsrs().iterator(0, true)));

    List<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
    manager.handleBrokerFenced(0, fenceRecords);
    ctx.replay(fenceRecords);
    assertEquals(Collections.emptySet(),
        RecordTestUtils.iteratorToSet(manager.brokersToIsrs().iterator(0, true)));
}
/**
 * Verifies that the leader can shrink the ISR to {0, 1} and then expand it back to {0, 1, 2}.
 * Each step must succeed, and its response must match the partition state stored by the
 * controller.
 */
@Test
public void testShrinkAndExpandIsr() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
    TopicIdPartition tp = new TopicIdPartition(topicId, 0);
    assertEquals(OptionalInt.of(0), ctx.currentLeader(tp));
    long leaderBrokerEpoch = ctx.currentBrokerEpoch(0);

    // Shrink first, then expand. Each request is built from the partition's state at that moment.
    List<List<BrokerState>> isrSteps = Arrays.asList(
        isrWithDefaultEpoch(0, 1),
        isrWithDefaultEpoch(0, 1, 2));
    for (List<BrokerState> targetIsr : isrSteps) {
        PartitionData request = newAlterPartition(manager, tp, targetIsr, LeaderRecoveryState.RECOVERED);
        ControllerResult<AlterPartitionResponseData> result =
            sendAlterPartition(manager, 0, leaderBrokerEpoch, topicId, request);
        AlterPartitionResponseData.PartitionData response = assertAlterPartitionResponse(result, tp, NONE);
        assertConsistentAlterPartitionResponse(manager, tp, response);
    }
}
/**
 * With ELR enabled and min.insync.replicas=2, shrinking the ISR to {0} is expected to move
 * replicas 1 and 2 into the ELR. Expanding the ISR back to {0, 1} is expected to leave the
 * ELR empty. The last-known ELR stays empty throughout.
 */
@Test
public void testEligibleLeaderReplicas_ShrinkAndExpandIsr() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
    TopicIdPartition tp = new TopicIdPartition(topicId, 0);
    assertEquals(OptionalInt.of(0), ctx.currentLeader(tp));
    long leaderBrokerEpoch = ctx.currentBrokerEpoch(0);
    ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");

    // Shrink the ISR to {0}.
    PartitionData shrinkRequest =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
    AlterPartitionResponseData.PartitionData shrinkResponse = assertAlterPartitionResponse(
        sendAlterPartition(manager, 0, leaderBrokerEpoch, topicId, shrinkRequest), tp, NONE);
    assertConsistentAlterPartitionResponse(manager, tp, shrinkResponse);
    PartitionRegistration state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{1, 2}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());

    // Expand the ISR to {0, 1}.
    PartitionData expandRequest =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
    AlterPartitionResponseData.PartitionData expandResponse = assertAlterPartitionResponse(
        sendAlterPartition(manager, 0, leaderBrokerEpoch, topicId, expandRequest), tp, NONE);
    assertConsistentAlterPartitionResponse(manager, tp, expandResponse);
    state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());
}
/**
 * With ELR enabled and min.insync.replicas=3, fencing brokers 1 and 2 shrinks the ISR to {0}
 * and puts 1 and 2 into the ELR. A clean shutdown of broker 0 then empties the ISR. It is
 * expected to leave ELR {0, 1, 2} with last-known ELR {0}.
 */
@Test
public void testEligibleLeaderReplicas_ShrinkToEmptyIsr() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
    TopicIdPartition tp = new TopicIdPartition(topicId, 0);
    assertEquals(OptionalInt.of(0), ctx.currentLeader(tp));
    ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");

    // Fencing brokers 1 and 2 shrinks the ISR to {0}.
    ctx.fenceBrokers(Set.of(1, 2));
    PartitionRegistration state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{1, 2}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());

    // Cleanly shut down broker 0, the last ISR member.
    ctx.handleBrokersShutdown(true, 0);
    state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{0, 1, 2}, state.elr, state.toString());
    assertArrayEquals(new int[]{0}, state.lastKnownElr, state.toString());
    assertEquals(0, state.isr.length);
}
/**
 * With ELR enabled and min.insync.replicas=3 on a 4-replica partition, checks how the ELR
 * changes as brokers are fenced.
 * <ul>
 *   <li>Fencing {2, 3} is expected to leave ELR {3}.</li>
 *   <li>Fencing {1, 2, 3} is expected to yield ELR {1, 3}.</li>
 *   <li>Unfencing every broker is expected to leave the ELR unchanged.</li>
 * </ul>
 */
@Test
public void testEligibleLeaderReplicas_BrokerFence() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3);
    ctx.unfenceBrokers(0, 1, 2, 3);
    Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2, 3}}).topicId();
    TopicIdPartition tp = new TopicIdPartition(topicId, 0);
    assertEquals(OptionalInt.of(0), ctx.currentLeader(tp));
    ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");

    ctx.fenceBrokers(Set.of(2, 3));
    PartitionRegistration state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{3}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());

    ctx.fenceBrokers(Set.of(1, 2, 3));
    state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{1, 3}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());

    ctx.unfenceBrokers(0, 1, 2, 3);
    state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{1, 3}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());
}
/**
 * Verifies that deleting a topic removes its partitions from the broker-to-ELR and
 * broker-to-ISR indexes. With min.insync.replicas=2, the ISR is first shrunk to {0} so that
 * brokers 1 and 2 land in the ELR.
 */
@Test
public void testEligibleLeaderReplicas_DeleteTopic() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    CreatableTopicResult created = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}});
    Uuid topicId = created.topicId();
    TopicIdPartition tp = new TopicIdPartition(topicId, 0);
    assertEquals(OptionalInt.of(0), ctx.currentLeader(tp));
    long leaderBrokerEpoch = ctx.currentBrokerEpoch(0);
    ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");

    // Shrink the ISR to {0}.
    PartitionData shrinkRequest =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERED);
    AlterPartitionResponseData.PartitionData shrinkResponse = assertAlterPartitionResponse(
        sendAlterPartition(manager, 0, leaderBrokerEpoch, topicId, shrinkRequest), tp, NONE);
    assertConsistentAlterPartitionResponse(manager, tp, shrinkResponse);
    PartitionRegistration state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{1, 2}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());
    assertTrue(manager.brokersToElrs().partitionsWithBrokerInElr(1).hasNext());

    ctx.deleteTopic(anonymousContextFor(ApiKeys.DELETE_TOPICS), topicId);
    assertFalse(manager.brokersToElrs().partitionsWithBrokerInElr(1).hasNext());
    assertFalse(manager.brokersToIsrs().partitionsWithBrokerInIsr(0).hasNext());
}
/**
 * Verifies that a min.insync.replicas value larger than the replication factor (5 on a
 * 3-replica topic) is reported as an effective min ISR of 3.
 */
@Test
public void testEligibleLeaderReplicas_EffectiveMinIsr() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
    assertEquals(OptionalInt.of(0), ctx.currentLeader(new TopicIdPartition(topicId, 0)));
    ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "5");
    assertEquals(3, manager.getTopicEffectiveMinIsr("foo"));
}
/**
 * With ELR enabled and min.insync.replicas=3, fencing brokers 1, 2 and 3 leaves ELR {2, 3}.
 * Broker 2 is then unfenced and brokers 0 and 1 are fenced. Broker 2 is expected to become
 * leader with ISR {2}, and the ELR becomes {0, 3}.
 */
@Test
public void testEligibleLeaderReplicas_CleanElection() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3);
    ctx.unfenceBrokers(0, 1, 2, 3);
    Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2, 3}}).topicId();
    TopicIdPartition tp = new TopicIdPartition(topicId, 0);
    assertEquals(OptionalInt.of(0), ctx.currentLeader(tp));
    ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");

    ctx.fenceBrokers(Set.of(1, 2, 3));
    PartitionRegistration state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{2, 3}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());

    ctx.unfenceBrokers(2);
    ctx.fenceBrokers(Set.of(0, 1));
    state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{0, 3}, state.elr, state.toString());
    assertArrayEquals(new int[]{2}, state.isr, state.toString());
    assertEquals(2, state.leader, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());
}
/**
 * With ELR enabled and min.insync.replicas=3, checks unclean shutdowns.
 * An unclean shutdown of an ELR member removes it from the ELR. An unclean shutdown of the
 * last ISR member records it in the last-known ELR.
 */
@Test
public void testEligibleLeaderReplicas_UncleanShutdown() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3);
    ctx.unfenceBrokers(0, 1, 2, 3);
    Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2, 3}}).topicId();
    TopicIdPartition tp = new TopicIdPartition(topicId, 0);
    assertEquals(OptionalInt.of(0), ctx.currentLeader(tp));
    ctx.alterTopicConfig("foo", TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3");

    ctx.fenceBrokers(Set.of(1, 2, 3));
    PartitionRegistration state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{2, 3}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());

    // An unclean shutdown ELR member should be kicked out of ELR.
    ctx.handleBrokersShutdown(false, 3);
    state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{2}, state.elr, state.toString());
    assertArrayEquals(new int[]{}, state.lastKnownElr, state.toString());

    // An unclean shutdown last ISR member should be recognized as the last known leader.
    ctx.handleBrokersShutdown(false, 0);
    state = manager.getPartition(topicId, tp.partitionId());
    assertArrayEquals(new int[]{2}, state.elr, state.toString());
    assertArrayEquals(new int[]{0}, state.lastKnownElr, state.toString());
}
/**
 * Verifies that AlterPartition for an unknown topic echoes the topic back with a per-partition
 * error. Versions above 1 identify the topic by ID and expect UNKNOWN_TOPIC_ID. Older versions
 * identify it by name and expect UNKNOWN_TOPIC_OR_PARTITION.
 *
 * @param version the ALTER_PARTITION API version under test
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionHandleUnknownTopicIdOrName(short version) {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);

    boolean identifiesByTopicId = version > 1;
    String topicName = "foo";
    Uuid topicId = Uuid.randomUuid();
    String nameOnWire = identifiesByTopicId ? "" : topicName;
    Uuid idOnWire = identifiesByTopicId ? topicId : Uuid.ZERO_UUID;

    AlterPartitionRequestData request = new AlterPartitionRequestData()
        .setBrokerId(0)
        .setBrokerEpoch(100)
        .setTopics(singletonList(new TopicData()
            .setTopicName(nameOnWire)
            .setTopicId(idOnWire)
            .setPartitions(singletonList(new PartitionData()
                .setPartitionIndex(0)))));
    ControllerResult<AlterPartitionResponseData> result =
        manager.alterPartition(anonymousContextFor(ApiKeys.ALTER_PARTITION, version), request);

    Errors expectedError = identifiesByTopicId ? UNKNOWN_TOPIC_ID : UNKNOWN_TOPIC_OR_PARTITION;
    AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData()
        .setTopics(singletonList(new AlterPartitionResponseData.TopicData()
            .setTopicName(nameOnWire)
            .setTopicId(idOnWire)
            .setPartitions(singletonList(new AlterPartitionResponseData.PartitionData()
                .setPartitionIndex(0)
                .setErrorCode(expectedError.code())))));
    assertEquals(expectedResponse, result.response());
}
/**
 * Verifies that AlterPartition rejects invalid requests. The cases covered are:
 * <ul>
 *   <li>a sender that is not the leader;</li>
 *   <li>a stale broker epoch;</li>
 *   <li>unexpected leader or partition epochs;</li>
 *   <li>ISRs that contain a non-replica or omit the leader;</li>
 *   <li>invalid leader recovery states.</li>
 * </ul>
 */
@Test
public void testInvalidAlterPartitionRequests() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager manager = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    Uuid topicId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();
    TopicIdPartition tp = new TopicIdPartition(topicId, 0);
    final int leaderId = 0;
    final int notLeaderId = 1;
    assertEquals(OptionalInt.of(leaderId), ctx.currentLeader(tp));
    long leaderBrokerEpoch = ctx.currentBrokerEpoch(leaderId);

    // Invalid leader: broker 1 is not the leader.
    PartitionData fromNonLeader =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
    assertAlterPartitionResponse(
        sendAlterPartition(manager, notLeaderId, ctx.currentBrokerEpoch(notLeaderId), topicId, fromNonLeader),
        tp, Errors.INVALID_REQUEST);

    // Stale broker epoch: the request is rejected with an exception.
    PartitionData withStaleBrokerEpoch =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
    assertThrows(StaleBrokerEpochException.class, () -> sendAlterPartition(
        manager, leaderId, leaderBrokerEpoch - 1, topicId, withStaleBrokerEpoch));

    // Invalid leader epoch (500).
    PartitionData withBadLeaderEpoch =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
    withBadLeaderEpoch.setLeaderEpoch(500);
    assertAlterPartitionResponse(
        sendAlterPartition(manager, leaderId, ctx.currentBrokerEpoch(leaderId), topicId, withBadLeaderEpoch),
        tp, NOT_CONTROLLER);

    // Invalid partition epoch (500).
    PartitionData withBadPartitionEpoch =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERED);
    withBadPartitionEpoch.setPartitionEpoch(500);
    assertAlterPartitionResponse(
        sendAlterPartition(manager, leaderId, ctx.currentBrokerEpoch(leaderId), topicId, withBadPartitionEpoch),
        tp, NOT_CONTROLLER);

    // Invalid ISR: 3 is not a replica of the partition.
    PartitionData isrWithNonReplica =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0, 1, 3), LeaderRecoveryState.RECOVERED);
    assertAlterPartitionResponse(
        sendAlterPartition(manager, leaderId, ctx.currentBrokerEpoch(leaderId), topicId, isrWithNonReplica),
        tp, Errors.INVALID_REQUEST);

    // Invalid ISR: it does not include the leader, 0.
    PartitionData isrWithoutLeader =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(1, 2), LeaderRecoveryState.RECOVERED);
    assertAlterPartitionResponse(
        sendAlterPartition(manager, leaderId, ctx.currentBrokerEpoch(leaderId), topicId, isrWithoutLeader),
        tp, Errors.INVALID_REQUEST);

    // Invalid combination of ISR length and recovery state: {0, 1} with RECOVERING.
    PartitionData recoveringWithWideIsr =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0, 1), LeaderRecoveryState.RECOVERING);
    assertAlterPartitionResponse(
        sendAlterPartition(manager, leaderId, ctx.currentBrokerEpoch(leaderId), topicId, recoveringWithWideIsr),
        tp, Errors.INVALID_REQUEST);

    // Invalid recovery state transition from RECOVERED to RECOVERING.
    PartitionData recoveredToRecovering =
        newAlterPartition(manager, tp, isrWithDefaultEpoch(0), LeaderRecoveryState.RECOVERING);
    assertAlterPartitionResponse(
        sendAlterPartition(manager, leaderId, ctx.currentBrokerEpoch(leaderId), topicId, recoveredToRecovering),
        tp, Errors.INVALID_REQUEST);
}
/**
 * Builds an AlterPartition request entry that proposes {@code newIsrWithEpoch} as the new ISR
 * of {@code topicIdPartition}. It uses the leader epoch and partition epoch currently stored
 * in {@code replicationControl}.
 *
 * @param replicationControl  the manager to read the current partition state from
 * @param topicIdPartition    the partition the request targets
 * @param newIsrWithEpoch     the proposed ISR, including broker epochs
 * @param leaderRecoveryState the leader recovery state to report
 * @return a request entry whose partition index and epochs refer to {@code topicIdPartition}
 */
private PartitionData newAlterPartition(
    ReplicationControlManager replicationControl,
    TopicIdPartition topicIdPartition,
    List<BrokerState> newIsrWithEpoch,
    LeaderRecoveryState leaderRecoveryState
) {
    PartitionRegistration partitionControl =
        replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
    return new AlterPartitionRequestData.PartitionData()
        // The index must match the partition whose epochs were read above; hard-coding 0
        // would silently target a different partition for any non-zero partition id.
        .setPartitionIndex(topicIdPartition.partitionId())
        .setLeaderEpoch(partitionControl.leaderEpoch)
        .setPartitionEpoch(partitionControl.partitionEpoch)
        .setNewIsrWithEpochs(newIsrWithEpoch)
        .setLeaderRecoveryState(leaderRecoveryState.value());
}
/**
 * Wraps {@code partitionData} in a single-topic AlterPartition request from
 * {@code brokerId}/{@code brokerEpoch} and submits it to {@code replicationControl}.
 * The resulting records are replayed, so later reads observe the change.
 *
 * @param replicationControl the manager handling the request
 * @param brokerId           the id of the sending broker
 * @param brokerEpoch        the broker epoch claimed by the sender
 * @param topicId            the id of the topic containing the partition
 * @param partitionData      the partition entry to send
 * @return the controller result, after its records have been replayed
 */
private ControllerResult<AlterPartitionResponseData> sendAlterPartition(
    ReplicationControlManager replicationControl,
    int brokerId,
    long brokerEpoch,
    Uuid topicId,
    AlterPartitionRequestData.PartitionData partitionData
) {
    AlterPartitionRequestData.TopicData topicEntry = new AlterPartitionRequestData.TopicData()
        .setTopicId(topicId);
    topicEntry.partitions().add(partitionData);

    AlterPartitionRequestData alterRequest = new AlterPartitionRequestData()
        .setBrokerId(brokerId)
        .setBrokerEpoch(brokerEpoch);
    alterRequest.topics().add(topicEntry);

    ControllerResult<AlterPartitionResponseData> outcome =
        replicationControl.alterPartition(anonymousContextFor(ApiKeys.ALTER_PARTITION), alterRequest);
    RecordTestUtils.replayAll(replicationControl, outcome.records());
    return outcome;
}
/**
 * Asserts that an AlterPartition response has exactly one topic and one partition matching
 * {@code topicIdPartition}, and that the partition reports {@code expectedError}.
 *
 * @param alterPartitionResult the controller result to inspect
 * @param topicIdPartition     the topic ID and partition index expected in the response
 * @param expectedError        the error expected on that partition
 * @return the single partition entry of the response
 */
private AlterPartitionResponseData.PartitionData assertAlterPartitionResponse(
    ControllerResult<AlterPartitionResponseData> alterPartitionResult,
    TopicIdPartition topicIdPartition,
    Errors expectedError
) {
    List<AlterPartitionResponseData.TopicData> topicResponses =
        alterPartitionResult.response().topics();
    assertEquals(1, topicResponses.size());
    AlterPartitionResponseData.TopicData onlyTopic = topicResponses.get(0);
    assertEquals(topicIdPartition.topicId(), onlyTopic.topicId());

    List<AlterPartitionResponseData.PartitionData> partitionResponses = onlyTopic.partitions();
    assertEquals(1, partitionResponses.size());
    AlterPartitionResponseData.PartitionData onlyPartition = partitionResponses.get(0);
    assertEquals(topicIdPartition.partitionId(), onlyPartition.partitionIndex());
    assertEquals(expectedError, Errors.forCode(onlyPartition.errorCode()));
    return onlyPartition;
}
/**
 * Asserts that the leader, leader epoch, partition epoch and ISR in an AlterPartition
 * response entry equal the controller's current registration of the same partition.
 *
 * @param replicationControl the manager holding the current partition registration
 * @param topicIdPartition   the partition to look up
 * @param partitionData      the response entry to compare against the registration
 */
private void assertConsistentAlterPartitionResponse(
    ReplicationControlManager replicationControl,
    TopicIdPartition topicIdPartition,
    AlterPartitionResponseData.PartitionData partitionData
) {
    PartitionRegistration registration =
        replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
    assertEquals(registration.leader, partitionData.leaderId());
    assertEquals(registration.leaderEpoch, partitionData.leaderEpoch());
    assertEquals(registration.partitionEpoch, partitionData.partitionEpoch());

    // The registration stores the ISR as int[]; the response uses List<Integer>.
    List<Integer> registeredIsr = new ArrayList<>(registration.isr.length);
    for (int replicaId : registration.isr) {
        registeredIsr.add(replicaId);
    }
    assertEquals(registeredIsr, partitionData.isr());
}
/**
 * Asserts that the stored configs of {@code topic} contain exactly as many entries as
 * {@code requestConfigs}, and that each requested config name maps to its requested value.
 *
 * @param ctx            the test context whose configuration control is queried
 * @param topic          the topic name
 * @param requestConfigs the configs supplied when the topic was created
 */
private void assertCreatedTopicConfigs(
    ReplicationControlTestContext ctx,
    String topic,
    CreateTopicsRequestData.CreatableTopicConfigCollection requestConfigs
) {
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    Map<String, String> storedConfigs = ctx.configurationControl.getConfigs(topicResource);
    assertEquals(requestConfigs.size(), storedConfigs.size());
    requestConfigs.forEach(requested ->
        assertEquals(requested.value(), storedConfigs.get(requested.name())));
}
/**
 * Asserts that no configs are stored for {@code topic}.
 *
 * @param ctx   the test context whose configuration control is queried
 * @param topic the topic name
 */
private void assertEmptyTopicConfigs(
    ReplicationControlTestContext ctx,
    String topic
) {
    assertEquals(Collections.emptyMap(),
        ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topic)));
}
/**
 * Verifies topic deletion end to end.
 *
 * <p>Creates "foo" with three partitions and two configs, and checks that nothing is visible
 * until the create records are replayed. Then checks the name/ID lookups and confirms that
 * deleting an unknown topic ID emits no records. Finally deletes "foo" and checks that its
 * partitions, lookups and configs are gone.
 */
@Test
public void testDeleteTopics() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
CreateTopicsRequestData.CreatableTopicConfigCollection requestConfigs =
new CreateTopicsRequestData.CreatableTopicConfigCollection();
requestConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig().
setName("cleanup.policy").setValue("compact"));
requestConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig().
setName("min.cleanable.dirty.ratio").setValue("0.1"));
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(3).setReplicationFactor((short) 2).
setConfigs(requestConfigs));
ctx.registerBrokers(0, 1);
ctx.unfenceBrokers(0, 1);
ControllerRequestContext createTopicsRequestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createResult =
replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
Uuid topicId = createResult.response().topics().find("foo").topicId();
expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
setNumPartitions(3).setReplicationFactor((short) 2).
setErrorMessage(null).setErrorCode((short) 0).
setTopicId(topicId));
assertEquals(expectedResponse, withoutConfigs(createResult.response()));
// Until the records are replayed, no changes are made
assertNull(replicationControl.getPartition(topicId, 0));
assertEmptyTopicConfigs(ctx, "foo");
ctx.replay(createResult.records());
// After replay, partitions 0-2 exist (and 3 does not), and the requested configs are stored.
assertNotNull(replicationControl.getPartition(topicId, 0));
assertNotNull(replicationControl.getPartition(topicId, 1));
assertNotNull(replicationControl.getPartition(topicId, 2));
assertNull(replicationControl.getPartition(topicId, 3));
assertCreatedTopicConfigs(ctx, "foo", requestConfigs);
assertEquals(singletonMap(topicId, new ResultOrError<>("foo")),
replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
assertEquals(singletonMap("foo", new ResultOrError<>(topicId)),
replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
// Derive an ID that differs from topicId in its most significant bits; the assertions
// below expect it to resolve to no topic.
Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1,
topicId.getLeastSignificantBits());
assertEquals(singletonMap(invalidId,
new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))),
replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
assertEquals(singletonMap("bar",
new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
ControllerRequestContext deleteTopicsRequestContext = anonymousContextFor(ApiKeys.DELETE_TOPICS);
// Deleting an unknown ID fails with UNKNOWN_TOPIC_ID and emits no records.
ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl.
deleteTopics(deleteTopicsRequestContext, Collections.singletonList(invalidId));
assertEquals(0, invalidDeleteResult.records().size());
assertEquals(singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
invalidDeleteResult.response());
// Deleting "foo" succeeds as an atomic result containing exactly one record.
ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.
deleteTopics(deleteTopicsRequestContext, Collections.singletonList(topicId));
assertTrue(deleteResult.isAtomic());
assertEquals(singletonMap(topicId, new ApiError(NONE, null)),
deleteResult.response());
assertEquals(1, deleteResult.records().size());
ctx.replay(deleteResult.records());
// Once the deletion is replayed, partitions, name/ID lookups and configs of "foo" are gone.
assertNull(replicationControl.getPartition(topicId, 0));
assertNull(replicationControl.getPartition(topicId, 1));
assertNull(replicationControl.getPartition(topicId, 2));
assertNull(replicationControl.getPartition(topicId, 3));
assertEquals(singletonMap(topicId, new ResultOrError<>(
new ApiError(UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(
Long.MAX_VALUE, Collections.singleton(topicId)));
assertEquals(singletonMap("foo", new ResultOrError<>(
new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(
Long.MAX_VALUE, Collections.singleton("foo")));
assertEmptyTopicConfigs(ctx, "foo");
}
/**
 * Verifies that deleteTopics rejects a deletion with THROTTLING_QUOTA_EXCEEDED, and emits
 * no records, when the request context reports an exceeded mutation quota.
 */
@Test
public void testDeleteTopicsWithMutationQuotaExceeded() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    CreateTopicsRequestData request = new CreateTopicsRequestData();
    request.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(3).setReplicationFactor((short) 2));
    ctx.registerBrokers(0, 1);
    ctx.unfenceBrokers(0, 1);
    ControllerRequestContext createTopicsRequestContext =
        anonymousContextFor(ApiKeys.CREATE_TOPICS);
    ControllerResult<CreateTopicsResponseData> createResult =
        replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
    CreatableTopicResult createdTopic = createResult.response().topics().find("foo");
    assertEquals(NONE.code(), createdTopic.errorCode());
    ctx.replay(createResult.records());

    // Attempt the deletion with a context whose mutation quota is already exceeded.
    ControllerRequestContext deleteTopicsRequestContext =
        anonymousContextWithMutationQuotaExceededFor(ApiKeys.DELETE_TOPICS);
    Uuid topicId = createdTopic.topicId();
    ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.
        deleteTopics(deleteTopicsRequestContext, Collections.singletonList(topicId));
    assertEquals(singletonMap(topicId, new ApiError(THROTTLING_QUOTA_EXCEEDED, QUOTA_EXCEEDED_IN_TEST_MSG)),
        deleteResult.response());
    // A throttled deletion must not produce any metadata change.
    assertEquals(0, deleteResult.records().size());
}
/**
 * Exercises createPartitions against topics foo (3 partitions), bar (4), quux (2) and
 * foo2 (2).
 *
 * <p>The first round uses controller-chosen assignments and includes a valid increase, a
 * non-increase and an unknown topic. The second round uses explicit assignments and
 * includes a valid one plus several invalid ones. Finally the test checks the directories
 * recorded for the newly created foo partition 5.
 */
@Test
public void testCreatePartitions() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(3).setReplicationFactor((short) 2));
request.topics().add(new CreatableTopic().setName("bar").
setNumPartitions(4).setReplicationFactor((short) 2));
request.topics().add(new CreatableTopic().setName("quux").
setNumPartitions(2).setReplicationFactor((short) 2));
request.topics().add(new CreatableTopic().setName("foo2").
setNumPartitions(2).setReplicationFactor((short) 2));
// Broker 0 registers no directories, broker 1 registers two and broker 3 registers one.
// Broker 2 is never registered.
ctx.registerBrokersWithDirs(
0, Collections.emptyList(),
1, asList(Uuid.fromString("QMzamNQVQ7GnJK9DwQHG7Q"), Uuid.fromString("loDxEBLETdedNnQGOKKENw")),
3, Collections.singletonList(Uuid.fromString("dxCDSgNjQvS4WuyqEKoCwA")));
ctx.unfenceBrokers(0, 1, 3);
ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createTopicResult = replicationControl.
createTopics(requestContext, request, new HashSet<>(Arrays.asList("foo", "bar", "quux", "foo2")));
ctx.replay(createTopicResult.records());
// Round 1 (controller-chosen replicas): only foo grows (3 -> 5). bar would shrink,
// baz does not exist, and quux would stay at 2 partitions.
List<CreatePartitionsTopic> topics = new ArrayList<>();
topics.add(new CreatePartitionsTopic().
setName("foo").setCount(5).setAssignments(null));
topics.add(new CreatePartitionsTopic().
setName("bar").setCount(3).setAssignments(null));
topics.add(new CreatePartitionsTopic().
setName("baz").setCount(3).setAssignments(null));
topics.add(new CreatePartitionsTopic().
setName("quux").setCount(2).setAssignments(null));
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
replicationControl.createPartitions(requestContext, topics);
assertEquals(asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
new CreatePartitionsTopicResult().
setName("bar").
setErrorCode(INVALID_PARTITIONS.code()).
setErrorMessage("The topic bar currently has 4 partition(s); 3 would not be an increase."),
new CreatePartitionsTopicResult().
setName("baz").
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage(null),
new CreatePartitionsTopicResult().
setName("quux").
setErrorCode(INVALID_PARTITIONS.code()).
setErrorMessage("Topic already has 2 partition(s).")),
createPartitionsResult.response());
ctx.replay(createPartitionsResult.records());
// Round 2 (explicit replicas): foo grows to 6 with partition 5 on [1, 3]. The other
// requests are rejected:
// - bar gives a 1-replica assignment to a topic whose partitions have 2 replicas;
// - quux adds 2 partitions but supplies only 1 assignment;
// - foo2 names broker 2, which was never registered.
List<CreatePartitionsTopic> topics2 = new ArrayList<>();
topics2.add(new CreatePartitionsTopic().
setName("foo").setCount(6).setAssignments(singletonList(
new CreatePartitionsAssignment().setBrokerIds(asList(1, 3)))));
topics2.add(new CreatePartitionsTopic().
setName("bar").setCount(5).setAssignments(singletonList(
new CreatePartitionsAssignment().setBrokerIds(singletonList(1)))));
topics2.add(new CreatePartitionsTopic().
setName("quux").setCount(4).setAssignments(singletonList(
new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
topics2.add(new CreatePartitionsTopic().
setName("foo2").setCount(3).setAssignments(singletonList(
new CreatePartitionsAssignment().setBrokerIds(asList(2, 0)))));
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult2 =
replicationControl.createPartitions(requestContext, topics2);
assertEquals(asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
new CreatePartitionsTopicResult().
setName("bar").
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("The manual partition assignment includes a partition " +
"with 1 replica(s), but this is not consistent with previous " +
"partitions, which have 2 replica(s)."),
new CreatePartitionsTopicResult().
setName("quux").
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("Attempted to add 2 additional partition(s), but only 1 assignment(s) were specified."),
new CreatePartitionsTopicResult().
setName("foo2").
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("The manual partition assignment includes broker 2, but " +
"no such broker is registered.")),
createPartitionsResult2.response());
ctx.replay(createPartitionsResult2.records());
// foo partition 5 lives on brokers [1, 3]. The expected directories are UNASSIGNED for
// broker 1 (which registered two directories) and broker 3's only registered directory.
assertArrayEquals(
new Uuid[] {DirectoryId.UNASSIGNED, Uuid.fromString("dxCDSgNjQvS4WuyqEKoCwA")},
replicationControl.getPartition(replicationControl.getTopicId("foo"), 5).directories);
}
/**
 * Verifies that createPartitions returns THROTTLING_QUOTA_EXCEEDED for "foo" when the
 * request context reports an exceeded mutation quota. Both cases are covered: the
 * controller choosing the new replicas, and the request supplying them explicitly.
 */
@Test
public void testCreatePartitionsWithMutationQuotaExceeded() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
    createRequest.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(3).setReplicationFactor((short) 2));
    ctx.registerBrokers(0, 1);
    ctx.unfenceBrokers(0, 1);

    ControllerResult<CreateTopicsResponseData> createResult = replicationControl.createTopics(
        anonymousContextFor(ApiKeys.CREATE_TOPICS), createRequest, Collections.singleton("foo"));
    assertEquals(NONE.code(), createResult.response().topics().find("foo").errorCode());
    ctx.replay(createResult.records());

    ControllerRequestContext throttledContext =
        anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_PARTITIONS);
    List<CreatePartitionsTopicResult> expectedThrottled = singletonList(new CreatePartitionsTopicResult().
        setName("foo").
        setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
        setErrorMessage(QUOTA_EXCEEDED_IN_TEST_MSG));

    // Controller-chosen replicas: ask to grow foo from 3 to 5 partitions.
    List<CreatePartitionsTopic> autoAssigned = singletonList(new CreatePartitionsTopic().
        setName("foo").setCount(5).setAssignments(null));
    assertEquals(expectedThrottled,
        replicationControl.createPartitions(throttledContext, autoAssigned).response());

    // Explicit replicas: ask to grow foo from 3 to 4 partitions on brokers [1, 0].
    List<CreatePartitionsTopic> manuallyAssigned = singletonList(new CreatePartitionsTopic().
        setName("foo").setCount(4).setAssignments(singletonList(
            new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
    assertEquals(expectedThrottled,
        replicationControl.createPartitions(throttledContext, manuallyAssigned).response());
}
/**
 * Verifies that createPartitions fails with INVALID_REPLICATION_FACTOR when no broker can
 * host a new replica.
 */
@Test
public void testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
    createRequest.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(1).setReplicationFactor((short) 2));
    ctx.registerBrokers(0, 1);
    ctx.unfenceBrokers(0, 1);

    ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
    ControllerResult<CreateTopicsResponseData> createTopicResult =
        replicationControl.createTopics(requestContext, createRequest, Collections.singleton("foo"));
    ctx.replay(createTopicResult.records());

    // Register both brokers again, unfence only broker 0, and put broker 0 into controlled
    // shutdown. The expected error below reports every broker as fenced or shutting down.
    ctx.registerBrokers(0, 1);
    ctx.unfenceBrokers(0);
    ctx.inControlledShutdownBrokers(0);

    ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
        replicationControl.createPartitions(requestContext, singletonList(new CreatePartitionsTopic().
            setName("foo").setCount(2).setAssignments(null)));
    CreatePartitionsTopicResult expectedFailure = new CreatePartitionsTopicResult().
        setName("foo").
        setErrorCode(INVALID_REPLICATION_FACTOR.code()).
        setErrorMessage("Unable to replicate the partition 2 time(s): All " +
            "brokers are currently fenced or in controlled shutdown.");
    assertEquals(singletonList(expectedFailure), createPartitionsResult.response());
}
/**
 * Verifies that a partition added by createPartitions keeps fenced and shutting-down
 * brokers in its replica set but excludes them from the ISR.
 */
@Test
public void testCreatePartitionsISRInvariants() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    CreateTopicsRequestData createRequest = new CreateTopicsRequestData();
    createRequest.topics().add(new CreatableTopic().setName("foo").
        setNumPartitions(1).setReplicationFactor((short) 3));

    // Broker 0 is unfenced, broker 1 is unfenced but in controlled shutdown, and broker 2
    // is never unfenced.
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1);
    ctx.inControlledShutdownBrokers(1);

    ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
    ControllerResult<CreateTopicsResponseData> createResult =
        replicationControl.createTopics(requestContext, createRequest, Collections.singleton("foo"));
    ctx.replay(createResult.records());
    Uuid fooId = ((TopicRecord) createResult.records().get(0).message()).topicId();

    ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
        replicationControl.createPartitions(requestContext, singletonList(new CreatePartitionsTopic().
            setName("foo").setCount(2).setAssignments(null)));
    ctx.replay(createPartitionsResult.records());

    // Broker 2 cannot be in the ISR because it is fenced and broker 1
    // cannot be in the ISR because it is in controlled shutdown.
    PartitionRegistration expectedPartition = new PartitionRegistration.Builder().
        setReplicas(new int[] {0, 1, 2}).
        setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00000DIRAAAA"),
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA")
        }).
        setIsr(new int[] {0}).
        setLeader(0).
        setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
        setLeaderEpoch(0).
        setPartitionEpoch(0).
        build();
    assertEquals(expectedPartition, replicationControl.getPartition(fooId, 1));
}
/**
 * Verifies that validateManualPartitionAssignment accepts assignments that name only
 * registered brokers, both with a matching expected replication factor and with none.
 */
@Test
public void testValidateGoodManualPartitionAssignments() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ctx.registerBrokers(1, 2, 3);
    ReplicationControlManager replicationControl = ctx.replicationControl;

    // One replica: replication factor 1, then unspecified.
    replicationControl.validateManualPartitionAssignment(
        partitionAssignment(singletonList(1)), OptionalInt.of(1));
    replicationControl.validateManualPartitionAssignment(
        partitionAssignment(singletonList(1)), OptionalInt.empty());

    // Three replicas: replication factor 3, then unspecified.
    replicationControl.validateManualPartitionAssignment(
        partitionAssignment(asList(1, 2, 3)), OptionalInt.of(3));
    replicationControl.validateManualPartitionAssignment(
        partitionAssignment(asList(1, 2, 3)), OptionalInt.empty());
}
/**
 * Verifies that validateManualPartitionAssignment throws InvalidReplicaAssignmentException,
 * with a specific message, for each kind of malformed assignment.
 */
@Test
public void testValidateBadManualPartitionAssignments() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ctx.registerBrokers(1, 2);
    ReplicationControlManager replicationControl = ctx.replicationControl;

    // An assignment must name at least one replica.
    InvalidReplicaAssignmentException emptyReplicas = assertThrows(InvalidReplicaAssignmentException.class,
        () -> replicationControl.validateManualPartitionAssignment(
            partitionAssignment(Collections.emptyList()), OptionalInt.empty()));
    assertEquals("The manual partition assignment includes an empty replica list.",
        emptyReplicas.getMessage());

    // Broker 3 is not registered.
    InvalidReplicaAssignmentException unknownBroker = assertThrows(InvalidReplicaAssignmentException.class,
        () -> replicationControl.validateManualPartitionAssignment(
            partitionAssignment(asList(1, 2, 3)), OptionalInt.empty()));
    assertEquals("The manual partition assignment includes broker 3, but no such " +
        "broker is registered.", unknownBroker.getMessage());

    // A broker may appear at most once.
    InvalidReplicaAssignmentException duplicateBroker = assertThrows(InvalidReplicaAssignmentException.class,
        () -> replicationControl.validateManualPartitionAssignment(
            partitionAssignment(asList(1, 2, 2)), OptionalInt.empty()));
    assertEquals("The manual partition assignment includes the broker 2 more than " +
        "once.", duplicateBroker.getMessage());

    // The replica count must match the expected replication factor.
    InvalidReplicaAssignmentException wrongReplicaCount = assertThrows(InvalidReplicaAssignmentException.class,
        () -> replicationControl.validateManualPartitionAssignment(
            partitionAssignment(asList(1, 2)), OptionalInt.of(3)));
    assertEquals("The manual partition assignment includes a partition with 2 " +
        "replica(s), but this is not consistent with previous partitions, which have " +
        "3 replica(s).", wrongReplicaCount.getMessage());
}
/** Expected listPartitionReassignments response when no reassignment is in progress. */
private static final ListPartitionReassignmentsResponseData NONE_REASSIGNING =
    new ListPartitionReassignmentsResponseData()
        .setErrorMessage(null);
/**
 * Exercises the partition reassignment lifecycle on topics "foo" (two partitions) and "bar".
 *
 * <p>The test:
 * <ol>
 *   <li>starts reassignments and checks the resulting ongoing reassignments;</li>
 *   <li>builds a cancellation and compares it without replaying it;</li>
 *   <li>sends an AlterPartition from broker 3 for foo partition 1 whose ISR includes the
 *       adding replica 0.</li>
 * </ol>
 * The expected leader-change error and the empty final listing indicate that this last
 * AlterPartition completes the reassignment.
 *
 * @param version the AlterPartition API version used for the final request
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testReassignPartitions(short version) {
    MetadataVersion metadataVersion = MetadataVersion.latestTesting();
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(metadataVersion)
        .build();
    ReplicationControlManager replication = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3);
    ctx.unfenceBrokers(0, 1, 2, 3);
    Uuid fooId = ctx.createTestTopic("foo", new int[][] {
        new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId();
    // Only the existence of "bar" matters; its topic ID is never used.
    ctx.createTestTopic("bar", new int[][] {
        new int[] {1, 2, 3}});
    assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));

    // foo-0 only reorders its existing replicas, foo-1 swaps broker 3 for broker 0, and
    // foo-2 does not exist. "bar" names no partitions.
    ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
        replication.alterPartitionReassignments(
            new AlterPartitionReassignmentsRequestData().setTopics(asList(
                new ReassignableTopic().setName("foo").setPartitions(asList(
                    new ReassignablePartition().setPartitionIndex(0).
                        setReplicas(asList(3, 2, 1)),
                    new ReassignablePartition().setPartitionIndex(1).
                        setReplicas(asList(0, 2, 1)),
                    new ReassignablePartition().setPartitionIndex(2).
                        setReplicas(asList(0, 2, 1)))),
                new ReassignableTopic().setName("bar"))));
    assertEquals(new AlterPartitionReassignmentsResponseData().
        setErrorMessage(null).setResponses(asList(
            new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                new ReassignablePartitionResponse().setPartitionIndex(0).
                    setErrorMessage(null),
                new ReassignablePartitionResponse().setPartitionIndex(1).
                    setErrorMessage(null),
                new ReassignablePartitionResponse().setPartitionIndex(2).
                    setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
                    setErrorMessage("Unable to find partition foo:2."))),
            new ReassignableTopicResponse().
                setName("bar"))),
        alterResult.response());
    ctx.replay(alterResult.records());

    // Only foo-1 remains in progress: it is adding broker 0 and removing broker 3.
    ListPartitionReassignmentsResponseData currentReassigning =
        new ListPartitionReassignmentsResponseData().setErrorMessage(null).
            setTopics(singletonList(new OngoingTopicReassignment().
                setName("foo").setPartitions(singletonList(
                    new OngoingPartitionReassignment().setPartitionIndex(1).
                        setRemovingReplicas(singletonList(3)).
                        setAddingReplicas(singletonList(0)).
                        setReplicas(asList(0, 2, 1, 3))))));
    assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
    assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(singletonList(
        new ListPartitionReassignmentsTopics().setName("bar").
            setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
    assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList(
        new ListPartitionReassignmentsTopics().setName("foo").
            setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));

    // Cancel every listed reassignment; only foo-1 has one to cancel. This result is
    // compared but never replayed, so foo-1 is still reassigning afterwards.
    ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
        replication.alterPartitionReassignments(
            new AlterPartitionReassignmentsRequestData().setTopics(asList(
                new ReassignableTopic().setName("foo").setPartitions(asList(
                    new ReassignablePartition().setPartitionIndex(0).
                        setReplicas(null),
                    new ReassignablePartition().setPartitionIndex(1).
                        setReplicas(null),
                    new ReassignablePartition().setPartitionIndex(2).
                        setReplicas(null))),
                new ReassignableTopic().setName("bar").setPartitions(singletonList(
                    new ReassignablePartition().setPartitionIndex(0).
                        setReplicas(null))))));
    assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
        new PartitionChangeRecord().setTopicId(fooId).
            setPartitionId(1).
            setReplicas(asList(2, 1, 3)).
            setDirectories(asList(
                Uuid.fromString("TESTBROKER00002DIRAAAA"),
                Uuid.fromString("TESTBROKER00001DIRAAAA"),
                Uuid.fromString("TESTBROKER00003DIRAAAA")
            )).
            setLeader(3).
            setRemovingReplicas(Collections.emptyList()).
            setAddingReplicas(Collections.emptyList()),
        // Use the metadata version this context was built with.
        metadataVersion.partitionChangeRecordVersion())),
        new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
            new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                new ReassignablePartitionResponse().setPartitionIndex(0).
                    setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
                new ReassignablePartitionResponse().setPartitionIndex(1).
                    setErrorCode(NONE.code()).setErrorMessage(null),
                new ReassignablePartitionResponse().setPartitionIndex(2).
                    setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
                    setErrorMessage("Unable to find partition foo:2."))),
            new ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
                new ReassignablePartitionResponse().setPartitionIndex(0).
                    setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
                    setErrorMessage(null)))))),
        cancelResult);

    log.info("running final alterPartition...");
    // Broker 3 reports an ISR for foo-1 that now includes the adding replica 0.
    ControllerRequestContext requestContext =
        anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
    AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().
        setBrokerId(3).
        setBrokerEpoch(103).
        setTopics(singletonList(new TopicData().
            setTopicName(version <= 1 ? "foo" : "").
            setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
            setPartitions(singletonList(new PartitionData().
                setPartitionIndex(1).
                setPartitionEpoch(1).
                setLeaderEpoch(0).
                setNewIsrWithEpochs(isrWithDefaultEpoch(3, 0, 2, 1))))));
    ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
        requestContext,
        new AlterPartitionRequest.Builder(alterPartitionRequestData).build(version).data());
    Errors expectedError = version > 1 ? NEW_LEADER_ELECTED : FENCED_LEADER_EPOCH;
    assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
        new AlterPartitionResponseData.TopicData().
            setTopicName(version <= 1 ? "foo" : "").
            setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
            setPartitions(singletonList(
                new AlterPartitionResponseData.PartitionData().
                    setPartitionIndex(1).
                    setErrorCode(expectedError.code()))))),
        alterPartitionResult.response());
    ctx.replay(alterPartitionResult.records());
    assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
/**
 * Verifies that an AlterPartition request adding a fenced broker back to the ISR is
 * rejected, and that the same request succeeds once the broker is unfenced.
 *
 * <p>The rejection error is OPERATION_NOT_ATTEMPTED for versions <= 1 and
 * INELIGIBLE_REPLICA otherwise.
 *
 * @param version the AlterPartition API version used for the requests
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectFencedBrokers(short version) {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
Uuid fooId = ctx.createTestTopic(
"foo",
new int[][] {new int[] {1, 2, 3, 4}}
).topicId();
// Fence broker 3. After replay it is dropped from the ISR and the partition epoch becomes 1.
List<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
replication.handleBrokerFenced(3, fenceRecords);
ctx.replay(fenceRecords);
assertEquals(
new PartitionRegistration.Builder().
setReplicas(new int[] {1, 2, 3, 4}).
setDirectories(new Uuid[] {
Uuid.fromString("TESTBROKER00001DIRAAAA"),
Uuid.fromString("TESTBROKER00002DIRAAAA"),
Uuid.fromString("TESTBROKER00003DIRAAAA"),
Uuid.fromString("TESTBROKER00004DIRAAAA")
}).
setIsr(new int[] {1, 2, 4}).
setLeader(1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(0).
setPartitionEpoch(1).
build(),
replication.getPartition(fooId, 0));
// Leader 1 asks to add broker 3 back to the ISR while broker 3 is still fenced.
AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
.setBrokerId(1)
.setBrokerEpoch(101)
.setTopics(singletonList(new TopicData()
.setTopicName(version <= 1 ? "foo" : "")
.setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
.setPartitions(singletonList(new PartitionData()
.setPartitionIndex(0)
.setPartitionEpoch(1)
.setLeaderEpoch(0)
.setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
ControllerResult<AlterPartitionResponseData> alterPartitionResult =
replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterIsrRequest).build(version).data());
Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED : INELIGIBLE_REPLICA;
assertEquals(
new AlterPartitionResponseData()
.setTopics(singletonList(new AlterPartitionResponseData.TopicData()
.setTopicName(version <= 1 ? "foo" : "")
.setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
.setPartitions(singletonList(new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(0)
.setErrorCode(expectedError.code()))))),
alterPartitionResult.response());
// Unfence broker 3 and retry. The ISR expansion is now accepted and the partition epoch
// becomes 2.
fenceRecords = new ArrayList<>();
replication.handleBrokerUnfenced(3, 103, fenceRecords);
ctx.replay(fenceRecords);
// NOTE(review): unlike the first call, this passes alterIsrRequest directly instead of
// going through AlterPartitionRequest.Builder(...).build(version). Confirm that this is
// intended for every parameterized version.
alterPartitionResult = replication.alterPartition(requestContext, alterIsrRequest);
assertEquals(
new AlterPartitionResponseData()
.setTopics(singletonList(new AlterPartitionResponseData.TopicData()
.setTopicName(version <= 1 ? "foo" : "")
.setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
.setPartitions(singletonList(new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(0)
.setLeaderId(1)
.setLeaderEpoch(0)
.setIsr(asList(1, 2, 3, 4))
.setPartitionEpoch(2)
.setErrorCode(NONE.code()))))),
alterPartitionResult.response());
}
/**
 * Verifies how an AlterPartition request that adds broker 4 to the ISR is handled when
 * broker 4 re-registers with a new epoch after the request was built.
 *
 * <p>With version >= 3 the request is expected to fail with INELIGIBLE_REPLICA. Older
 * versions presumably cannot convey broker epochs, so for them the test only checks that
 * the response carries no top-level error.
 *
 * @param version the AlterPartition API version used for the request
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectBrokersWithStaleEpoch(short version) {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replication = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3, 4);
    ctx.unfenceBrokers(0, 1, 2, 3, 4);
    Uuid fooId = ctx.createTestTopic(
        "foo",
        new int[][] {new int[] {1, 2, 3, 4}}
    ).topicId();
    // Shrink the ISR to {1, 2, 3} so the request below tries to add broker 4 back.
    ctx.alterPartition(new TopicIdPartition(fooId, 0), 1, isrWithDefaultEpoch(1, 2, 3), LeaderRecoveryState.RECOVERED);

    // First, the leader builds an AlterPartition request that adds broker 4 to the ISR.
    AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData().
        setBrokerId(1).
        setBrokerEpoch(101).
        setTopics(singletonList(new TopicData().
            setTopicName(version <= 1 ? "foo" : "").
            setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
            setPartitions(singletonList(new PartitionData().
                setPartitionIndex(0).
                setPartitionEpoch(1).
                setLeaderEpoch(0).
                setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));

    // Broker 4 then fails silently and registers again with a new epoch.
    long newEpoch = defaultBrokerEpoch(4) + 1000;
    RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
        setBrokerEpoch(newEpoch).setBrokerId(4).setRack(null);
    brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
        setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
        setPort(9092 + 4).
        setName("PLAINTEXT").
        setHost("localhost"));
    ctx.replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short) 0)));

    // Unfence broker 4 under its new epoch.
    ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
        processBrokerHeartbeat(new BrokerHeartbeatRequestData().
            setBrokerId(4).setBrokerEpoch(newEpoch).
            setCurrentMetadataOffset(1).
            setWantFence(false).setWantShutDown(false), 0);
    assertEquals(new BrokerHeartbeatReply(true, false, false, false),
        result.response());
    ctx.replay(result.records());

    ControllerRequestContext requestContext =
        anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
    ControllerResult<AlterPartitionResponseData> alterPartitionResult =
        replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterIsrRequest).build(version).data());

    // The late-arriving AlterPartition request should be rejected when version >= 3.
    if (version >= 3) {
        assertEquals(
            new AlterPartitionResponseData().
                setTopics(singletonList(new AlterPartitionResponseData.TopicData().
                    setTopicName("").
                    setTopicId(fooId).
                    setPartitions(singletonList(new AlterPartitionResponseData.PartitionData().
                        setPartitionIndex(0).
                        setErrorCode(INELIGIBLE_REPLICA.code()))))),
            alterPartitionResult.response());
    } else {
        // Only the top-level error code is checked for older versions.
        assertEquals(NONE.code(), alterPartitionResult.response().errorCode());
    }
}
/**
 * Verifies that an AlterPartition request is rejected when its proposed ISR contains a
 * broker that is in controlled shutdown (broker 3 here), for every AlterPartition version.
 * Versions up to 1 address the topic by name and expect OPERATION_NOT_ATTEMPTED; later
 * versions address it by topic id and expect INELIGIBLE_REPLICA.
 */
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) {
    // AlterPartition v0-v1 identify the topic by name; later versions use the topic id.
    boolean usesTopicName = version <= 1;

    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replication = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3, 4);
    ctx.unfenceBrokers(0, 1, 2, 3, 4);
    Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3, 4}}).topicId();

    PartitionRegistration initialRegistration = new PartitionRegistration.Builder()
        .setReplicas(new int[] {1, 2, 3, 4})
        .setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00003DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA")
        })
        .setIsr(new int[] {1, 2, 3, 4})
        .setLeader(1)
        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
        .setLeaderEpoch(0)
        .setPartitionEpoch(0)
        .build();
    assertEquals(initialRegistration, replication.getPartition(fooId, 0));

    ctx.inControlledShutdownBrokers(3);

    // The leader (broker 1) asks to keep all four replicas, including broker 3, in the ISR.
    String topicName = usesTopicName ? "foo" : "";
    Uuid topicId = usesTopicName ? Uuid.ZERO_UUID : fooId;
    AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
        .setBrokerId(1)
        .setBrokerEpoch(101)
        .setTopics(singletonList(new TopicData()
            .setTopicName(topicName)
            .setTopicId(topicId)
            .setPartitions(singletonList(new PartitionData()
                .setPartitionIndex(0)
                .setPartitionEpoch(0)
                .setLeaderEpoch(0)
                .setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
    ControllerRequestContext requestContext =
        anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
    AlterPartitionRequestData versionedRequest =
        new AlterPartitionRequest.Builder(alterIsrRequest).build(version).data();
    ControllerResult<AlterPartitionResponseData> alterPartitionResult =
        replication.alterPartition(requestContext, versionedRequest);

    Errors expectedError = usesTopicName ? OPERATION_NOT_ATTEMPTED : INELIGIBLE_REPLICA;
    AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData()
        .setTopics(singletonList(new AlterPartitionResponseData.TopicData()
            .setTopicName(topicName)
            .setTopicId(topicId)
            .setPartitions(singletonList(new AlterPartitionResponseData.PartitionData()
                .setPartitionIndex(0)
                .setErrorCode(expectedError.code())))));
    assertEquals(expectedResponse, alterPartitionResult.response());
}
/**
 * Verifies starting and then cancelling partition reassignments.
 *
 * <p>The first AlterPartitionReassignments call targets partitions 0-3 of "foo" and
 * partition 0 of "bar". foo-0, foo-1 and bar-0 are accepted. foo-2 (unregistered broker 5)
 * and foo-3 (empty replica list) are rejected with INVALID_REPLICA_ASSIGNMENT. Afterwards
 * only bar-0 is still reassigning, with brokers 0 and 1 being added. The second call
 * cancels (null replicas) foo-0, which has no reassignment in progress, and bar-0, which
 * drops its adding replicas.
 */
@Test
public void testCancelReassignPartitions() {
    MetadataVersion metadataVersion = MetadataVersion.latestTesting();
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(metadataVersion)
        .build();
    ReplicationControlManager replication = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3, 4);
    ctx.unfenceBrokers(0, 1, 2, 3, 4);
    Uuid fooId = ctx.createTestTopic("foo", new int[][] {
        new int[] {1, 2, 3, 4}, new int[] {0, 1, 2, 3}, new int[] {4, 3, 1, 0},
        new int[] {2, 3, 4, 1}}).topicId();
    Uuid barId = ctx.createTestTopic("bar", new int[][] {
        new int[] {4, 3, 2}}).topicId();
    assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));

    // Fence broker 3: it drops out of foo-0's ISR and foo-0's partition epoch becomes 1.
    List<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
    replication.handleBrokerFenced(3, fenceRecords);
    ctx.replay(fenceRecords);
    assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4}).setIsr(new int[] {1, 2, 4}).
        setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00003DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA")
        }).
        setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build(), replication.getPartition(fooId, 0));

    // Start the reassignments. foo-2 names unregistered broker 5 and foo-3 has an empty
    // replica list, so both are rejected; the remaining three are accepted.
    ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
        replication.alterPartitionReassignments(
            new AlterPartitionReassignmentsRequestData().setTopics(asList(
                new ReassignableTopic().setName("foo").setPartitions(asList(
                    new ReassignablePartition().setPartitionIndex(0).
                        setReplicas(asList(1, 2, 4)),
                    new ReassignablePartition().setPartitionIndex(1).
                        setReplicas(asList(1, 2, 3, 0)),
                    new ReassignablePartition().setPartitionIndex(2).
                        setReplicas(asList(5, 6, 7)),
                    new ReassignablePartition().setPartitionIndex(3).
                        setReplicas(Collections.emptyList()))),
                new ReassignableTopic().setName("bar").setPartitions(singletonList(
                    new ReassignablePartition().setPartitionIndex(0).
                        setReplicas(asList(1, 2, 3, 4, 0)))))));
    assertEquals(new AlterPartitionReassignmentsResponseData().
        setErrorMessage(null).
        setResponses(asList(
            new ReassignableTopicResponse().setName("foo").setPartitions(asList(
                new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null),
                new ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null),
                new ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
                    setErrorMessage("The manual partition assignment includes broker 5, but no such broker is registered."),
                new ReassignablePartitionResponse().setPartitionIndex(3).setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
                    setErrorMessage("The manual partition assignment includes an empty replica list."))),
            new ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
                new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
        alterResult.response());
    ctx.replay(alterResult.records());

    // foo-0 and foo-1 end with no adding/removing replicas; bar-0 is still adding 0 and 1.
    assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 4}).setIsr(new int[] {1, 2, 4}).
        setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA")
        }).
        setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(fooId, 0));
    assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 0}).setIsr(new int[] {0, 1, 2}).
        setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00003DIRAAAA"),
            Uuid.fromString("TESTBROKER00000DIRAAAA")
        }).
        setLeader(0).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(fooId, 1));
    assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 0}).setIsr(new int[] {4, 2}).
        setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00001DIRAAAA"),
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00003DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA"),
            Uuid.fromString("TESTBROKER00000DIRAAAA")
        }).
        setAddingReplicas(new int[] {0, 1}).setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(barId, 0));

    // Only bar-0 is listed as reassigning, whether listing everything or filtering by topic.
    ListPartitionReassignmentsResponseData currentReassigning =
        new ListPartitionReassignmentsResponseData().setErrorMessage(null).
            setTopics(singletonList(new OngoingTopicReassignment().
                setName("bar").setPartitions(singletonList(
                    new OngoingPartitionReassignment().setPartitionIndex(0).
                        setRemovingReplicas(Collections.emptyList()).
                        setAddingReplicas(asList(0, 1)).
                        setReplicas(asList(1, 2, 3, 4, 0))))));
    assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
    assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(singletonList(
        new ListPartitionReassignmentsTopics().setName("foo").
            setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
    assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList(
        new ListPartitionReassignmentsTopics().setName("bar").
            setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));

    // bar-0's leader asks to add the new replicas 0 and 1 to the ISR. Only the response is
    // checked: these records are NOT replayed, so the controller's ISR for bar-0 stays
    // {4, 2}, as the final assertion shows.
    // NOTE(review): confirm skipping the replay is intentional.
    ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
        anonymousContextFor(ApiKeys.ALTER_PARTITION),
        new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104).
            setTopics(singletonList(new TopicData().setTopicId(barId).setPartitions(singletonList(
                new PartitionData().setPartitionIndex(0).setPartitionEpoch(2).
                    setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(4, 1, 2, 0)))))));
    assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
        new AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(singletonList(
            new AlterPartitionResponseData.PartitionData().
                setPartitionIndex(0).
                setLeaderId(4).
                setLeaderEpoch(0).
                setIsr(asList(4, 1, 2, 0)).
                setPartitionEpoch(3).
                setErrorCode(NONE.code()))))),
        alterPartitionResult.response());

    // Cancel (null replicas) foo-0, which has nothing in progress, and bar-0, which drops
    // its adding replicas 0 and 1 and returns to replicas {2, 3, 4}.
    ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
        replication.alterPartitionReassignments(
            new AlterPartitionReassignmentsRequestData().setTopics(asList(
                new ReassignableTopic().setName("foo").setPartitions(singletonList(
                    new ReassignablePartition().setPartitionIndex(0).
                        setReplicas(null))),
                new ReassignableTopic().setName("bar").setPartitions(singletonList(
                    new ReassignablePartition().setPartitionIndex(0).
                        setReplicas(null))))));
    // The expected record version comes from the same MetadataVersion the context was built with.
    assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
        new PartitionChangeRecord().setTopicId(barId).
            setPartitionId(0).
            setLeader(4).
            setReplicas(asList(2, 3, 4)).
            setDirectories(asList(
                Uuid.fromString("TESTBROKER00002DIRAAAA"),
                Uuid.fromString("TESTBROKER00003DIRAAAA"),
                Uuid.fromString("TESTBROKER00004DIRAAAA")
            )).
            setRemovingReplicas(null).
            setAddingReplicas(Collections.emptyList()), metadataVersion.partitionChangeRecordVersion())),
        new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
            new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
                new ReassignablePartitionResponse().setPartitionIndex(0).
                    setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
            new ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
                new ReassignablePartitionResponse().setPartitionIndex(0).
                    setErrorMessage(null)))))),
        cancelResult);
    ctx.replay(cancelResult.records());
    assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
    assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {2, 3, 4}).setIsr(new int[] {4, 2}).
        setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00003DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA")
        }).
        setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(3).build(), replication.getPartition(barId, 0));
}
/**
 * Creating a topic whose manual assignment uses only brokers that were registered but never
 * unfenced must fail with INVALID_REPLICA_ASSIGNMENT.
 */
@Test
public void testManualPartitionAssignmentOnAllFencedBrokers() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    // Register brokers 0-3 but never unfence any of them.
    ctx.registerBrokers(0, 1, 2, 3);
    int[][] assignment = new int[][] {new int[] {0, 1, 2}};
    ctx.createTestTopic("foo", assignment, INVALID_REPLICA_ASSIGNMENT.code());
}
/**
 * Adding a partition of "foo" with a manual assignment made only of fenced brokers fails
 * with INVALID_REPLICA_ASSIGNMENT. An assignment with at least one unfenced broker
 * (broker 2) succeeds, and that broker becomes the sole ISR member and leader.
 */
@Test
public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ctx.registerBrokers(0, 1, 2, 3, 4, 5);
    // Brokers 3, 4 and 5 stay fenced.
    ctx.unfenceBrokers(0, 1, 2);
    Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}}).topicId();

    // Every proposed replica is fenced: rejected.
    ctx.createPartitions(2, "foo", new int[][] {new int[] {3, 4, 5}},
        INVALID_REPLICA_ASSIGNMENT.code());
    // Broker 2 is unfenced: accepted.
    ctx.createPartitions(2, "foo", new int[][] {new int[] {2, 4, 5}}, NONE.code());

    PartitionRegistration expected = new PartitionRegistration.Builder()
        .setReplicas(new int[] {2, 4, 5})
        .setDirectories(new Uuid[] {
            Uuid.fromString("TESTBROKER00002DIRAAAA"),
            Uuid.fromString("TESTBROKER00004DIRAAAA"),
            Uuid.fromString("TESTBROKER00005DIRAAAA")
        })
        .setIsr(new int[] {2})
        .setLeader(2)
        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
        .setLeaderEpoch(0)
        .setPartitionEpoch(0)
        .build();
    assertEquals(expected, ctx.replicationControl.getPartition(fooId, 1));
}
/**
 * Asserts that a partition's current registration has the expected ISR and leader.
 * The ISR is compared first, then the leader.
 *
 * @param replication      the manager to read the partition registration from
 * @param topicIdPartition the partition to check
 * @param leaderId         the expected leader broker id
 * @param isr              the expected ISR, compared element by element in order
 */
private void assertLeaderAndIsr(
    ReplicationControlManager replication,
    TopicIdPartition topicIdPartition,
    int leaderId,
    int[] isr
) {
    Uuid topic = topicIdPartition.topicId();
    int partitionIndex = topicIdPartition.partitionId();
    PartitionRegistration current = replication.getPartition(topic, partitionIndex);
    assertArrayEquals(isr, current.isr);
    assertEquals(leaderId, current.leader);
}
/**
 * Verifies UNCLEAN leader election on metadata version IBP_3_6_IV1, which this test selects
 * explicitly. Runs once with an explicit partition list (0, 1, 2) and once with a null
 * list (electAllPartitions), which the request helper passes through as null topic
 * partitions.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testElectUncleanLeaders_WithoutElr(boolean electAllPartitions) {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
setMetadataVersion(MetadataVersion.IBP_3_6_IV1).
build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
ctx.unfenceBrokers(0, 1, 2, 3, 4);
Uuid fooId = ctx.createTestTopic("foo", new int[][]{
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
// Fence 2 and 3 first, then 1 as well. foo-0 (replicas 1, 2, 3) is left with ISR {1} and
// no leader; foo-1 and foo-2 keep live leaders 4 and 0 (asserted below).
ctx.fenceBrokers(Set.of(2, 3));
ctx.fenceBrokers(Set.of(1, 2, 3));
assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1});
assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
ElectLeadersRequestData request = buildElectLeadersRequest(
ElectionType.UNCLEAN,
electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2))
);
// No election can be done yet because no replicas are available for partition 0
ControllerResult<ElectLeadersResponseData> result1 = replication.electLeaders(request);
assertEquals(Collections.emptyList(), result1.records());
// When electAllPartitions is true, buildElectLeadersResponse omits the ELECTION_NOT_NEEDED
// entries from the expected response.
ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap(
Utils.mkEntry(
new TopicPartition("foo", 0),
new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE)
),
Utils.mkEntry(
new TopicPartition("foo", 1),
new ApiError(ELECTION_NOT_NEEDED)
),
Utils.mkEntry(
new TopicPartition("foo", 2),
new ApiError(ELECTION_NOT_NEEDED)
)
));
assertElectLeadersResponse(expectedResponse1, result1.response());
// Now we bring 2 back online which should allow the unclean election of partition 0
ctx.unfenceBrokers(2);
// Bring 2 back into the ISR for partition 1. This allows us to verify that
// preferred election does not occur as a result of the unclean election request.
ctx.alterPartition(partition1, 4, isrWithDefaultEpoch(2, 4), LeaderRecoveryState.RECOVERED);
ControllerResult<ElectLeadersResponseData> result = replication.electLeaders(request);
// Exactly one change is produced: foo-0 gets leader 2 and ISR {2}.
assertEquals(1, result.records().size());
ApiMessageAndVersion record = result.records().get(0);
assertInstanceOf(PartitionChangeRecord.class, record.message());
PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord) record.message();
assertEquals(0, partitionChangeRecord.partitionId());
assertEquals(2, partitionChangeRecord.leader());
assertEquals(singletonList(2), partitionChangeRecord.isr());
ctx.replay(result.records());
// foo-1 keeps leader 4 even though its preferred replica 2 is back in its ISR.
assertLeaderAndIsr(replication, partition0, 2, new int[]{2});
assertLeaderAndIsr(replication, partition1, 4, new int[]{2, 4});
assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap(
Utils.mkEntry(
new TopicPartition("foo", 0),
ApiError.NONE
),
Utils.mkEntry(
new TopicPartition("foo", 1),
new ApiError(ELECTION_NOT_NEEDED)
),
Utils.mkEntry(
new TopicPartition("foo", 2),
new ApiError(ELECTION_NOT_NEEDED)
)
));
assertElectLeadersResponse(expectedResponse, result.response());
}
/**
 * A PREFERRED election must not fall back to an unclean election. Even with unclean leader
 * election enabled on the topic, a leaderless partition whose preferred replica (broker 1)
 * is fenced is answered with PREFERRED_LEADER_NOT_AVAILABLE, and no records are produced.
 */
@Test
public void testPreferredElectionDoesNotTriggerUncleanElection() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replication = ctx.replicationControl;
    ctx.registerBrokers(1, 2, 3, 4);
    ctx.unfenceBrokers(1, 2, 3, 4);
    Uuid fooId = ctx.createTestTopic("foo", new int[][]{new int[]{1, 2, 3}}).topicId();
    TopicIdPartition partition = new TopicIdPartition(fooId, 0);
    // Fence every replica (ISR shrinks to {1}, no leader), then bring broker 2 back so an
    // unclean election would have an eligible candidate.
    ctx.fenceBrokers(Set.of(2, 3));
    ctx.fenceBrokers(Set.of(1, 2, 3));
    ctx.unfenceBrokers(2);
    assertLeaderAndIsr(replication, partition, NO_LEADER, new int[]{1});
    // Use the shared constant rather than a hand-typed key, matching the rest of the file.
    ctx.alterTopicConfig("foo", TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true");
    ElectLeadersRequestData request = buildElectLeadersRequest(
        ElectionType.PREFERRED,
        singletonMap("foo", singletonList(0))
    );
    // No election should be done even though unclean election is available
    ControllerResult<ElectLeadersResponseData> result = replication.electLeaders(request);
    assertEquals(Collections.emptyList(), result.records());
    ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, false, singletonMap(
        new TopicPartition("foo", 0), new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
    ));
    assertEquals(expectedResponse, result.response());
}
/**
 * Builds an ElectLeaders request for the given election type.
 *
 * @param electionType the election type to request
 * @param partitions   topic name to partition ids; {@code null} leaves the request's
 *                     topic partitions set to {@code null} (the tests use this to target
 *                     every partition)
 * @return the populated request
 */
private ElectLeadersRequestData buildElectLeadersRequest(
    ElectionType electionType,
    Map<String, List<Integer>> partitions
) {
    ElectLeadersRequestData request = new ElectLeadersRequestData()
        .setElectionType(electionType.value);
    if (partitions != null) {
        for (Map.Entry<String, List<Integer>> topicEntry : partitions.entrySet()) {
            TopicPartitions topicPartitions = new TopicPartitions()
                .setTopic(topicEntry.getKey())
                .setPartitions(topicEntry.getValue());
            request.topicPartitions().add(topicPartitions);
        }
        return request;
    }
    request.setTopicPartitions(null);
    return request;
}
/**
 * Fencing brokers 2 and 3 together removes them from every ISR. Leadership moves only for
 * partitions they led: foo-1 (replicas 2, 3, 4) moves to broker 4, while foo-0 and foo-2
 * keep their unfenced first replicas (1 and 0) as leaders.
 */
@Test
public void testFenceMultipleBrokers() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replication = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3, 4);
    ctx.unfenceBrokers(0, 1, 2, 3, 4);
    Uuid fooId = ctx.createTestTopic("foo", new int[][]{
        new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
    assertTrue(ctx.fencedBrokerIds().isEmpty());
    ctx.fenceBrokers(Set.of(2, 3));
    PartitionRegistration partition0 = replication.getPartition(fooId, 0);
    PartitionRegistration partition1 = replication.getPartition(fooId, 1);
    PartitionRegistration partition2 = replication.getPartition(fooId, 2);
    assertArrayEquals(new int[]{1, 2, 3}, partition0.replicas);
    assertArrayEquals(new int[]{1}, partition0.isr);
    assertEquals(1, partition0.leader);
    assertArrayEquals(new int[]{2, 3, 4}, partition1.replicas);
    assertArrayEquals(new int[]{4}, partition1.isr);
    assertEquals(4, partition1.leader);
    assertArrayEquals(new int[]{0, 2, 1}, partition2.replicas);
    assertArrayEquals(new int[]{0, 1}, partition2.isr);
    // Broker 0 is foo-2's first replica and was never fenced, so it must still lead.
    // Asserting the exact leader also rules out a leaderless (-1) result, which a plain
    // "not broker 2" check would accept.
    assertEquals(0, partition2.leader);
}
/**
 * Verifies PREFERRED leader election in two rounds.
 *
 * <p>First round: broker 0 is fenced and broker 1 is in controlled shutdown, so the
 * preferred leaders of foo-0 (broker 1) and foo-2 (broker 0) are unavailable, and no
 * records are produced.
 *
 * <p>Second round: both brokers are unfenced and re-added to the ISRs, and the election
 * moves foo-0 to leader 1 and foo-2 to leader 0.
 *
 * <p>In both rounds the nonexistent topic "bar" is reported as UNKNOWN_TOPIC_OR_PARTITION.
 */
@Test
public void testElectPreferredLeaders() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
// Broker 0 stays fenced for now; broker 1 is unfenced and then put in controlled shutdown.
ctx.unfenceBrokers(1, 2, 3, 4);
ctx.inControlledShutdownBrokers(1);
Uuid fooId = ctx.createTestTopic("foo", new int[][]{
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
ElectLeadersRequestData request1 = new ElectLeadersRequestData().
setElectionType(ElectionType.PREFERRED.value).
setTopicPartitions(new TopicPartitionsCollection(asList(
new TopicPartitions().setTopic("foo").
setPartitions(asList(0, 1, 2)),
new TopicPartitions().setTopic("bar").
setPartitions(asList(0, 1))).iterator()));
ControllerResult<ElectLeadersResponseData> election1Result =
replication.electLeaders(request1);
ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
Utils.mkEntry(
new TopicPartition("foo", 0),
new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
),
Utils.mkEntry(
new TopicPartition("foo", 1),
new ApiError(ELECTION_NOT_NEEDED)
),
Utils.mkEntry(
new TopicPartition("foo", 2),
new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
),
Utils.mkEntry(
new TopicPartition("bar", 0),
new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
),
Utils.mkEntry(
new TopicPartition("bar", 1),
new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
)
));
assertElectLeadersResponse(expectedResponse1, election1Result.response());
assertEquals(Collections.emptyList(), election1Result.records());
// Broker 1 must be registered to get out from the controlled shutdown state.
ctx.registerBrokers(1);
ctx.unfenceBrokers(0, 1);
// Broker 2, the current leader of foo-0 and foo-2 (see the response below), adds the
// preferred replicas back into both ISRs.
ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
anonymousContextFor(ApiKeys.ALTER_PARTITION),
new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
setTopics(singletonList(new TopicData().setTopicId(fooId).
setPartitions(asList(
new PartitionData().
setPartitionIndex(0).setPartitionEpoch(0).
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3)),
new PartitionData().
setPartitionIndex(2).setPartitionEpoch(0).
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(0, 2, 1)))))));
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
new AlterPartitionResponseData.PartitionData().
setPartitionIndex(0).
setLeaderId(2).
setLeaderEpoch(0).
setIsr(asList(1, 2, 3)).
setPartitionEpoch(1).
setErrorCode(NONE.code()),
new AlterPartitionResponseData.PartitionData().
setPartitionIndex(2).
setLeaderId(2).
setLeaderEpoch(0).
setIsr(asList(0, 2, 1)).
setPartitionEpoch(1).
setErrorCode(NONE.code()))))),
alterPartitionResult.response());
ElectLeadersResponseData expectedResponse2 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
Utils.mkEntry(
new TopicPartition("foo", 0),
ApiError.NONE
),
Utils.mkEntry(
new TopicPartition("foo", 1),
new ApiError(ELECTION_NOT_NEEDED)
),
Utils.mkEntry(
new TopicPartition("foo", 2),
ApiError.NONE
),
Utils.mkEntry(
new TopicPartition("bar", 0),
new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
),
Utils.mkEntry(
new TopicPartition("bar", 1),
new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
)
));
ctx.replay(alterPartitionResult.records());
// Re-run the same request now that the preferred replicas are unfenced and in the ISRs.
ControllerResult<ElectLeadersResponseData> election2Result =
replication.electLeaders(request1);
assertElectLeadersResponse(expectedResponse2, election2Result.response());
// Leadership moves to each partition's first replica: foo-0 -> 1, foo-2 -> 0.
assertEquals(
asList(
new ApiMessageAndVersion(
new PartitionChangeRecord().
setPartitionId(0).
setTopicId(fooId).
setLeader(1),
MetadataVersion.latestTesting().partitionChangeRecordVersion()),
new ApiMessageAndVersion(
new PartitionChangeRecord().
setPartitionId(2).
setTopicId(fooId).
setLeader(0),
MetadataVersion.latestTesting().partitionChangeRecordVersion())),
election2Result.records());
}
/**
 * Verifies that maybeBalancePartitionLeaders moves leadership back to a partition's first
 * (preferred) replica once that replica is unfenced and back in the ISR.
 * arePartitionLeadersImbalanced reports true until both foo-0 and foo-2 are rebalanced.
 */
@Test
public void testBalancePartitionLeaders() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
// Brokers 0 and 1 start fenced, so foo-0 and foo-2 cannot be led by their first replica.
ctx.unfenceBrokers(2, 3, 4);
Uuid fooId = ctx.createTestTopic("foo", new int[][]{
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
assertTrue(replication.arePartitionLeadersImbalanced());
// Step 1: unfence broker 1 and have leader 2 add it back into foo-0's ISR.
ctx.unfenceBrokers(1);
ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
anonymousContextFor(ApiKeys.ALTER_PARTITION),
new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
setTopics(singletonList(new TopicData().setTopicId(fooId).
setPartitions(singletonList(new PartitionData().
setPartitionIndex(0).setPartitionEpoch(0).
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3)))))));
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(singletonList(
new AlterPartitionResponseData.PartitionData().
setPartitionIndex(0).
setLeaderId(2).
setLeaderEpoch(0).
setIsr(asList(1, 2, 3)).
setPartitionEpoch(1).
setErrorCode(NONE.code()))))),
alterPartitionResult.response());
ctx.replay(alterPartitionResult.records());
ControllerResult<Boolean> balanceResult = replication.maybeBalancePartitionLeaders();
ctx.replay(balanceResult.records());
PartitionChangeRecord expectedChangeRecord = new PartitionChangeRecord()
.setPartitionId(0)
.setTopicId(fooId)
.setLeader(1);
assertEquals(singletonList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records());
// foo-2 is still led by broker 2 rather than its first replica 0, so imbalance remains.
assertTrue(replication.arePartitionLeadersImbalanced());
assertFalse(balanceResult.response());
// Step 2: unfence broker 0 and have leader 2 add it back into foo-2's ISR.
ctx.unfenceBrokers(0);
alterPartitionResult = replication.alterPartition(
anonymousContextFor(ApiKeys.ALTER_PARTITION),
new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
setTopics(singletonList(new TopicData().setTopicId(fooId).
setPartitions(singletonList(new PartitionData().
setPartitionIndex(2).setPartitionEpoch(0).
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(0, 2, 1)))))));
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(singletonList(
new AlterPartitionResponseData.PartitionData().
setPartitionIndex(2).
setLeaderId(2).
setLeaderEpoch(0).
setIsr(asList(0, 2, 1)).
setPartitionEpoch(1).
setErrorCode(NONE.code()))))),
alterPartitionResult.response());
ctx.replay(alterPartitionResult.records());
balanceResult = replication.maybeBalancePartitionLeaders();
ctx.replay(balanceResult.records());
expectedChangeRecord = new PartitionChangeRecord()
.setPartitionId(2)
.setTopicId(fooId)
.setLeader(0);
assertEquals(singletonList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records());
// Every partition is now led by its first replica.
assertFalse(replication.arePartitionLeadersImbalanced());
assertFalse(balanceResult.response());
}
/**
 * Verifies that leaderless partitions get an unclean leader election only when unclean
 * leader election is enabled, for each way of enabling it.
 *
 * @param uncleanConfig one of:
 *                      "none" (never enabled);
 *                      "static" (enabled in the static config before the context is built);
 *                      "dynamic_cluster", "dynamic_node" or "dynamic_topic" (enabled later
 *                      via incrementalAlterConfigs on the BROKER resource "", the BROKER
 *                      resource "0", or the TOPIC resource "foo", respectively)
 */
@ParameterizedTest
@ValueSource(strings = {"none", "static", "dynamic_cluster", "dynamic_node", "dynamic_topic"})
public void testMaybeTriggerUncleanLeaderElectionForLeaderlessPartitions(String uncleanConfig) {
    ReplicationControlTestContext.Builder ctxBuilder = new ReplicationControlTestContext.Builder();
    if (uncleanConfig.equals("static")) {
        ctxBuilder.setStaticConfig(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true");
    }
    ReplicationControlTestContext ctx = ctxBuilder.build();
    ReplicationControlManager replication = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2, 3, 4);
    ctx.unfenceBrokers(0, 1, 2, 3, 4);
    Uuid fooId = ctx.createTestTopic("foo", new int[][]{
        new int[]{1, 2, 4}, new int[]{1, 3, 4}, new int[]{0, 2, 4}}).topicId();
    assertFalse(replication.areSomePartitionsLeaderless());
    // Fencing every broker leaves each partition leaderless with ISR {4}.
    ctx.fenceBrokers(0, 1, 2, 3, 4);
    assertTrue(replication.areSomePartitionsLeaderless());
    for (int partitionId : Arrays.asList(0, 1, 2)) {
        assertArrayEquals(new int[] {4}, ctx.replicationControl.getPartition(fooId, partitionId).isr);
        assertEquals(-1, ctx.replicationControl.getPartition(fooId, partitionId).leader);
    }
    // Unfence broker 2. It is now available to be the leader for partitions 0 and 2, after
    // an unclean election. Partition 1 (replicas 1, 3, 4) has no unfenced replica.
    ctx.unfenceBrokers(2);
    if (uncleanConfig.equals("static")) {
        // If we statically configured unclean leader election, the election already happened.
        assertArrayEquals(new int[] {2}, ctx.replicationControl.getPartition(fooId, 0).isr);
        assertEquals(2, ctx.replicationControl.getPartition(fooId, 0).leader);
        assertArrayEquals(new int[] {4}, ctx.replicationControl.getPartition(fooId, 1).isr);
        assertEquals(-1, ctx.replicationControl.getPartition(fooId, 1).leader);
        assertArrayEquals(new int[] {2}, ctx.replicationControl.getPartition(fooId, 2).isr);
        assertEquals(2, ctx.replicationControl.getPartition(fooId, 2).leader);
    } else {
        // Otherwise, check that the election did NOT happen.
        for (int partitionId : Arrays.asList(0, 1, 2)) {
            assertArrayEquals(new int[] {4}, ctx.replicationControl.getPartition(fooId, partitionId).isr);
            assertEquals(-1, ctx.replicationControl.getPartition(fooId, partitionId).leader);
        }
    }
    // If we're setting unclean leader election dynamically, do that here. The three dynamic
    // variants differ only in the resource the config is set on.
    ConfigResource dynamicResource = null;
    if (uncleanConfig.equals("dynamic_cluster")) {
        dynamicResource = new ConfigResource(ConfigResource.Type.BROKER, "");
    } else if (uncleanConfig.equals("dynamic_node")) {
        dynamicResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
    } else if (uncleanConfig.equals("dynamic_topic")) {
        dynamicResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
    }
    if (dynamicResource != null) {
        ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
            Collections.singletonMap(dynamicResource,
                Collections.singletonMap(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
                    new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))),
            true).records());
    }
    ControllerResult<Boolean> balanceResult = replication.maybeElectUncleanLeaders();
    assertFalse(balanceResult.response());
    if (uncleanConfig.equals("none") || uncleanConfig.equals("static")) {
        // "none": never enabled. "static": the election already happened above.
        assertEquals(0, balanceResult.records().size(), "Expected no records, but " +
            balanceResult.records().size() + " were found.");
    } else {
        assertNotEquals(0, balanceResult.records().size(), "Expected some records, but " +
            "none were found.");
        ctx.replay(balanceResult.records());
        assertArrayEquals(new int[] {2}, ctx.replicationControl.getPartition(fooId, 0).isr);
        assertEquals(2, ctx.replicationControl.getPartition(fooId, 0).leader);
        assertArrayEquals(new int[] {4}, ctx.replicationControl.getPartition(fooId, 1).isr);
        assertEquals(-1, ctx.replicationControl.getPartition(fooId, 1).leader);
        assertArrayEquals(new int[] {2}, ctx.replicationControl.getPartition(fooId, 2).isr);
        assertEquals(2, ctx.replicationControl.getPartition(fooId, 2).leader);
    }
}
/**
 * Compares two ElectLeaders responses by their top-level error and by their per-partition
 * results. Results are keyed by topic-partition, so topic and partition order are ignored.
 *
 * @param expected the expected response
 * @param actual   the response under test
 */
private void assertElectLeadersResponse(
    ElectLeadersResponseData expected,
    ElectLeadersResponseData actual
) {
    Errors expectedTopLevel = Errors.forCode(expected.errorCode());
    Errors actualTopLevel = Errors.forCode(actual.errorCode());
    assertEquals(expectedTopLevel, actualTopLevel);

    Map<TopicPartition, PartitionResult> expectedByPartition = collectElectLeadersErrors(expected);
    Map<TopicPartition, PartitionResult> actualByPartition = collectElectLeadersErrors(actual);
    assertEquals(expectedByPartition, actualByPartition);
}
/**
 * Flattens an ElectLeaders response into a map from topic-partition to that partition's
 * result. If a topic-partition appears more than once, the last occurrence wins.
 *
 * @param response the response to flatten
 * @return a new mutable map of per-partition results
 */
private Map<TopicPartition, PartitionResult> collectElectLeadersErrors(ElectLeadersResponseData response) {
    Map<TopicPartition, PartitionResult> resultsByPartition = new HashMap<>();
    for (ReplicaElectionResult topicResult : response.replicaElectionResults()) {
        String topicName = topicResult.topic();
        for (PartitionResult partitionResult : topicResult.partitionResult()) {
            TopicPartition key = new TopicPartition(topicName, partitionResult.partitionId());
            resultsByPartition.put(key, partitionResult);
        }
    }
    return resultsByPartition;
}
/**
 * Builds the expected ElectLeaders response from a map of per-partition errors.
 *
 * <p>Entries are grouped into one ReplicaElectionResult per topic. When
 * {@code electAllPartitions} is true, partitions whose error is ELECTION_NOT_NEEDED are
 * left out, mirroring the all-partitions case in the tests that use this helper.
 *
 * @param topLevelError      the response's top-level error
 * @param electAllPartitions whether the request targeted all partitions (null list)
 * @param errors             the error (and optional message) per topic-partition
 * @return the assembled response
 */
private ElectLeadersResponseData buildElectLeadersResponse(
    Errors topLevelError,
    boolean electAllPartitions,
    Map<TopicPartition, ApiError> errors
) {
    // Group by topic name, preserving each topic's encounter order of partitions.
    Map<String, List<Map.Entry<TopicPartition, ApiError>>> entriesByTopic = new HashMap<>();
    for (Map.Entry<TopicPartition, ApiError> entry : errors.entrySet()) {
        entriesByTopic.computeIfAbsent(entry.getKey().topic(), topic -> new ArrayList<>()).add(entry);
    }

    ElectLeadersResponseData response = new ElectLeadersResponseData()
        .setErrorCode(topLevelError.code());
    for (Map.Entry<String, List<Map.Entry<TopicPartition, ApiError>>> topicEntry : entriesByTopic.entrySet()) {
        List<PartitionResult> partitionResults = new ArrayList<>();
        for (Map.Entry<TopicPartition, ApiError> partitionEntry : topicEntry.getValue()) {
            ApiError apiError = partitionEntry.getValue();
            if (electAllPartitions && apiError.error() == ELECTION_NOT_NEEDED) {
                continue;
            }
            partitionResults.add(new PartitionResult()
                .setPartitionId(partitionEntry.getKey().partition())
                .setErrorCode(apiError.error().code())
                .setErrorMessage(apiError.message()));
        }
        ReplicaElectionResult electionResult = new ReplicaElectionResult()
            .setTopic(topicEntry.getKey())
            .setPartitionResult(partitionResults);
        response.replicaElectionResults().add(electionResult);
    }
    return response;
}
/**
 * Checks the controller's KRaftClusterDescriber.
 *
 * <p>Usable brokers: the boolean flag is true exactly for brokers 0 and 1, the ones never
 * unfenced.
 *
 * <p>Default log directory: a broker with no directories reports MIGRATING, a broker with
 * one directory reports that directory, and a broker with several reports UNASSIGNED.
 */
@Test
public void testKRaftClusterDescriber() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replication = ctx.replicationControl;
    Uuid broker2Dir = Uuid.fromString("ozwqsVMFSNiYQUPSJA3j0w");
    ctx.registerBrokersWithDirs(
        0, Collections.emptyList(),
        1, Collections.emptyList(),
        2, asList(broker2Dir),
        3, asList(Uuid.fromString("SSDgCZ4BTyec5QojGT65qg"), Uuid.fromString("K8KwMrviRcOUvgI8FPOJWg")),
        4, Collections.emptyList()
    );
    // Brokers 0 and 1 are left fenced.
    ctx.unfenceBrokers(2, 3, 4);
    ctx.createTestTopic("foo", new int[][]{
        new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}});
    ctx.createTestTopic("bar", new int[][]{
        new int[]{2, 3, 4}, new int[]{3, 4, 2}});

    KRaftClusterDescriber describer = replication.clusterDescriber;
    Set<UsableBroker> usable = new HashSet<>();
    describer.usableBrokers().forEachRemaining(usable::add);
    Set<UsableBroker> expectedUsable = new HashSet<>(Arrays.asList(
        new UsableBroker(0, Optional.empty(), true),
        new UsableBroker(1, Optional.empty(), true),
        new UsableBroker(2, Optional.empty(), false),
        new UsableBroker(3, Optional.empty(), false),
        new UsableBroker(4, Optional.empty(), false)));
    assertEquals(expectedUsable, usable);

    assertEquals(DirectoryId.MIGRATING, describer.defaultDir(1));
    assertEquals(broker2Dir, describer.defaultDir(2));
    assertEquals(DirectoryId.UNASSIGNED, describer.defaultDir(3));
}
/**
 * A heartbeat from broker 0 that asks to shut down should move leadership of foo-0 to broker 1
 * and, on metadata versions that support it, also record the controlled-shutdown state.
 */
@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
public void testProcessBrokerHeartbeatInControlledShutdown(MetadataVersion metadataVersion) {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(metadataVersion)
        .build();
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    Uuid topicId = ctx.createTestTopic("foo", new int[][]{new int[]{0, 1, 2}}).topicId();

    // Broker 0 heartbeats asking to shut down.
    ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.processBrokerHeartbeat(
        new BrokerHeartbeatRequestData()
            .setBrokerId(0)
            .setBrokerEpoch(100)
            .setCurrentMetadataOffset(0)
            .setWantShutDown(true),
        0);

    // Broker 0 is dropped from the ISR of foo-0 and broker 1 becomes its leader.
    ApiMessageAndVersion partitionChange = new ApiMessageAndVersion(
        new PartitionChangeRecord()
            .setPartitionId(0)
            .setTopicId(topicId)
            .setIsr(asList(1, 2))
            .setLeader(1),
        (short) 0);

    List<ApiMessageAndVersion> expectedRecords = new ArrayList<>();
    if (metadataVersion.isInControlledShutdownStateSupported()) {
        // Only metadata versions that support it emit a record for the shutdown state itself.
        expectedRecords.add(new ApiMessageAndVersion(
            new BrokerRegistrationChangeRecord()
                .setBrokerEpoch(100)
                .setBrokerId(0)
                .setInControlledShutdown(
                    BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
            (short) 1));
    }
    expectedRecords.add(partitionChange);
    assertEquals(expectedRecords, result.records());
}
/**
 * Processing an expired heartbeat for broker 0 should still leave the broker's heartbeat state
 * reporting the metadata offset carried by that heartbeat.
 */
@Test
public void testProcessExpiredBrokerHeartbeat() {
    MockTime mockTime = new MockTime(0, 0, 0);
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setMockTime(mockTime)
        .build();
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    BrokerHeartbeatRequestData expiredHeartbeat = new BrokerHeartbeatRequestData()
        .setBrokerId(0)
        .setBrokerEpoch(100)
        .setCurrentMetadataOffset(123)
        .setWantShutDown(false);

    mockTime.sleep(100);
    ctx.replicationControl.processExpiredBrokerHeartbeat(expiredHeartbeat);

    Optional<BrokerHeartbeatState> brokerZero = ctx.clusterControl.heartbeatManager()
        .brokers()
        .stream()
        .filter(candidate -> candidate.id() == 0)
        .findFirst();
    assertTrue(brokerZero.isPresent());
    BrokerHeartbeatState heartbeatState = brokerZero.get();
    assertEquals(0, heartbeatState.id());
    assertEquals(123, heartbeatState.metadataOffset());
}
/**
 * Starts a reassignment of topic-1/0 from [0, 1] to [2, 3] and makes 2 the leader. It then
 * replaces that reassignment with a second one to [4, 5] before the first completes.
 * Verifies that:
 * - replicas 2 and 3, which were still being added, move to the removing set;
 * - the current leader 2 stays in the replica list;
 * - completing the second reassignment makes 4 the leader and leaves no reassignment in progress.
 */
@Test
public void testReassignPartitionsHandlesNewReassignmentThatRemovesPreviouslyAddingReplicas() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4, 5);
ctx.unfenceBrokers(0, 1, 2, 3, 4, 5);
String topic = "topic-1";
// Create topic with assignment [0, 1]
Uuid topicId = ctx.createTestTopic(topic, new int[][] {new int[] {0, 1}}).topicId();
log.debug("Created topic with ID {}", topicId);
// Confirm we start off with no reassignments.
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
// Reassign to [2, 3]
ControllerResult<AlterPartitionReassignmentsResponseData> alterResultOne =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
new ReassignableTopic().setName(topic).setPartitions(singletonList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(asList(2, 3)))))));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setResponses(singletonList(
new ReassignableTopicResponse().setName(topic).setPartitions(singletonList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null))))), alterResultOne.response());
ctx.replay(alterResultOne.records());
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(singletonList(new OngoingTopicReassignment().
setName(topic).setPartitions(singletonList(
new OngoingPartitionReassignment().setPartitionIndex(0).
setRemovingReplicas(asList(0, 1)).
setAddingReplicas(asList(2, 3)).
setReplicas(asList(2, 3, 0, 1))))));
// Make sure the reassignment metadata is as expected.
assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
PartitionRegistration partition = replication.getPartition(topicId, 0);
// Add replica 2 to the ISR.
AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().
setBrokerId(partition.leader).
setBrokerEpoch(ctx.currentBrokerEpoch(partition.leader)).
setTopics(singletonList(new TopicData().
setTopicId(topicId).
setPartitions(singletonList(new PartitionData().
setPartitionIndex(0).
setPartitionEpoch(partition.partitionEpoch).
setLeaderEpoch(partition.leaderEpoch).
setNewIsrWithEpochs(isrWithDefaultEpoch(0, 1, 2))))));
ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
anonymousContextFor(ApiKeys.ALTER_PARTITION),
new AlterPartitionRequest.Builder(alterPartitionRequestData).build().data());
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
new AlterPartitionResponseData.TopicData().
setTopicId(topicId).
setPartitions(singletonList(
new AlterPartitionResponseData.PartitionData().
setPartitionIndex(0).
setIsr(asList(0, 1, 2)).
setPartitionEpoch(partition.partitionEpoch + 1).
setErrorCode(NONE.code()))))),
alterPartitionResult.response());
ctx.replay(alterPartitionResult.records());
// Elect replica 2 as leader via preferred leader election. 2 is at the front of the replicas list.
ElectLeadersRequestData request = buildElectLeadersRequest(
ElectionType.PREFERRED,
singletonMap(topic, singletonList(0))
);
ControllerResult<ElectLeadersResponseData> electLeaderTwoResult = replication.electLeaders(request);
ReplicaElectionResult replicaElectionResult = new ReplicaElectionResult().setTopic(topic);
replicaElectionResult.setPartitionResult(singletonList(new PartitionResult().setPartitionId(0).setErrorCode(NONE.code()).setErrorMessage(null)));
assertEquals(
new ElectLeadersResponseData().setErrorCode(NONE.code()).setReplicaElectionResults(singletonList(replicaElectionResult)),
electLeaderTwoResult.response()
);
ctx.replay(electLeaderTwoResult.records());
// Make sure 2 is the leader
partition = replication.getPartition(topicId, 0);
assertEquals(2, partition.leader);
// Reassign to [4, 5]
ControllerResult<AlterPartitionReassignmentsResponseData> alterResultTwo =
replication.alterPartitionReassignments(
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
new ReassignableTopic().setName(topic).setPartitions(singletonList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(asList(4, 5)))))));
assertEquals(new AlterPartitionReassignmentsResponseData().
setErrorMessage(null).setResponses(singletonList(
new ReassignableTopicResponse().setName(topic).setPartitions(singletonList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null))))), alterResultTwo.response());
ctx.replay(alterResultTwo.records());
// Make sure the replicas list contains all the previous replicas 0, 1, 2, 3 as well as the new replicas 4, 5.
// Replicas 2 and 3 were still being added by the first reassignment; they are now in the removing set.
currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
setTopics(singletonList(new OngoingTopicReassignment().
setName(topic).setPartitions(singletonList(
new OngoingPartitionReassignment().setPartitionIndex(0).
setRemovingReplicas(asList(0, 1, 2, 3)).
setAddingReplicas(asList(4, 5)).
setReplicas(asList(4, 5, 0, 1, 2, 3))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
// Make sure the leader is in the replicas still
partition = replication.getPartition(topicId, 0);
assertEquals(2, partition.leader);
assertTrue(Replicas.toSet(partition.replicas).contains(partition.leader));
// Add 3, 4 and 5 to the ISR (current ISR is [0, 1, 2]) to complete the reassignment
AlterPartitionRequestData alterPartitionRequestDataTwo = new AlterPartitionRequestData().
setBrokerId(partition.leader).
setBrokerEpoch(ctx.currentBrokerEpoch(partition.leader)).
setTopics(singletonList(new TopicData().
setTopicId(topicId).
setPartitions(singletonList(new PartitionData().
setPartitionIndex(0).
setPartitionEpoch(partition.partitionEpoch).
setLeaderEpoch(partition.leaderEpoch).
setNewIsrWithEpochs(isrWithDefaultEpoch(0, 1, 2, 3, 4, 5))))));
ControllerResult<AlterPartitionResponseData> alterPartitionResultTwo = replication.alterPartition(
anonymousContextFor(ApiKeys.ALTER_PARTITION),
new AlterPartitionRequest.Builder(alterPartitionRequestDataTwo).build().data());
// The response reports NEW_LEADER_ELECTED; leadership moves from 2 to 4, as asserted below.
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
new AlterPartitionResponseData.TopicData().
setTopicId(topicId).
setPartitions(singletonList(
new AlterPartitionResponseData.PartitionData().
setPartitionIndex(0).
setErrorCode(NEW_LEADER_ELECTED.code()))))),
alterPartitionResultTwo.response());
ctx.replay(alterPartitionResultTwo.records());
// After reassignment is finally complete, make sure 4 is the leader now.
partition = replication.getPartition(topicId, 0);
assertEquals(4, partition.leader);
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
/**
 * Builds one {@link BrokerState} entry (broker ID plus broker epoch), as used in the
 * {@code newIsrWithEpochs} list of an AlterPartition request.
 */
private static BrokerState brokerState(int brokerId, Long brokerEpoch) {
    BrokerState state = new BrokerState();
    state.setBrokerId(brokerId);
    state.setBrokerEpoch(brokerEpoch);
    return state;
}
/**
 * Returns the broker epoch these tests pair with {@code brokerId}: the ID offset by 100.
 */
private static Long defaultBrokerEpoch(int brokerId) {
    final long epochOffset = 100L;
    return epochOffset + brokerId;
}
/**
 * Builds an ISR list, in the given order, in which every broker carries the epoch returned by
 * {@link #defaultBrokerEpoch(int)}.
 */
private static List<BrokerState> isrWithDefaultEpoch(Integer... isr) {
    List<BrokerState> states = new ArrayList<>(isr.length);
    for (Integer id : isr) {
        states.add(brokerState(id, defaultBrokerEpoch(id)));
    }
    return states;
}
/**
 * Replaying a second TopicRecord for an existing topic name must fail. The error message
 * distinguishes "same ID again" from "different ID".
 */
@Test
public void testDuplicateTopicIdReplay() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replicationControl = ctx.replicationControl;
    Uuid originalId = Uuid.fromString("Ktv3YkMQRe-MId4VkkrMyw");
    Uuid otherId = Uuid.fromString("8auUWq8zQqe_99H_m2LAmw");
    replicationControl.replay(new TopicRecord().setName("foo").setTopicId(originalId));

    // The very same record a second time.
    RuntimeException sameIdFailure = assertThrows(RuntimeException.class,
        () -> replicationControl.replay(new TopicRecord().setName("foo").setTopicId(originalId)));
    assertEquals("Found duplicate TopicRecord for foo with topic ID Ktv3YkMQRe-MId4VkkrMyw",
        sameIdFailure.getMessage());

    // The same name with a different topic ID.
    RuntimeException otherIdFailure = assertThrows(RuntimeException.class,
        () -> replicationControl.replay(new TopicRecord().setName("foo").setTopicId(otherId)));
    assertEquals("Found duplicate TopicRecord for foo with a different ID than before. " +
        "Previous ID was Ktv3YkMQRe-MId4VkkrMyw and new ID is 8auUWq8zQqe_99H_m2LAmw",
        otherIdFailure.getMessage());
}
/**
 * On metadata version IBP_3_7_IV1, handling an AssignReplicasToDirs request is expected to be
 * rejected with {@link UnsupportedVersionException}.
 */
@Test
void testHandleAssignReplicasToDirsFailsOnOlderMv() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(MetadataVersion.IBP_3_7_IV1)
        .build();
    AssignReplicasToDirsRequestData emptyRequest = new AssignReplicasToDirsRequestData();
    assertThrows(UnsupportedVersionException.class,
        () -> ctx.replicationControl.handleAssignReplicasToDirs(emptyRequest));
}
/**
 * Broker 1 reports directory assignments for a mix of valid and invalid replicas. Verifies:
 * - the per-directory error codes in the response;
 * - the resulting PartitionChangeRecords, including the leader elections triggered by assigning
 *   replicas to a non-online directory;
 * - the ISR membership of both brokers once the records are replayed.
 */
@Test
void testHandleAssignReplicasToDirs() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    Uuid dir1b1 = Uuid.fromString("hO2YI5bgRUmByNPHiHxjNQ");
    Uuid dir2b1 = Uuid.fromString("R3Gb1HLoTzuKMgAkH5Vtpw");
    Uuid dir1b2 = Uuid.fromString("TBGa8UayQi6KguqF5nC0sw");
    Uuid offlineDir = Uuid.fromString("zvAf9BKZRyyrEWz4FX2nLA");
    Uuid unknownTopicId = Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ");
    ctx.registerBrokersWithDirs(1, asList(dir1b1, dir2b1), 2, singletonList(dir1b2));
    ctx.unfenceBrokers(1, 2);
    Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{1, 2}, new int[]{1, 2}, new int[]{1, 2}}).topicId();
    Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
    Uuid topicC = ctx.createTestTopic("c", new int[][]{new int[]{2}}).topicId();

    // Plain collections are used instead of double-brace initialization, which would define an
    // anonymous HashMap/HashSet subclass per collection that captures the enclosing test instance.
    Map<TopicIdPartition, Uuid> assignments = new HashMap<>();
    assignments.put(new TopicIdPartition(topicA, 0), dir1b1);
    assignments.put(new TopicIdPartition(topicA, 1), dir2b1);
    assignments.put(new TopicIdPartition(topicA, 2), offlineDir); // unknown/offline dir
    assignments.put(new TopicIdPartition(topicB, 0), dir1b1);
    assignments.put(new TopicIdPartition(topicB, 1), DirectoryId.LOST);
    assignments.put(new TopicIdPartition(unknownTopicId, 1), dir2b1); // expect UNKNOWN_TOPIC_ID
    assignments.put(new TopicIdPartition(topicA, 137), dir1b1); // expect UNKNOWN_TOPIC_OR_PARTITION
    assignments.put(new TopicIdPartition(topicC, 0), dir1b1); // expect NOT_LEADER_OR_FOLLOWER
    ControllerResult<AssignReplicasToDirsResponseData> controllerResult =
        ctx.assignReplicasToDirs(1, assignments);

    // Expected response errors, grouped by the directory each replica was assigned to.
    Map<TopicIdPartition, Errors> dir1b1Errors = new HashMap<>();
    dir1b1Errors.put(new TopicIdPartition(topicA, 0), NONE);
    dir1b1Errors.put(new TopicIdPartition(topicA, 137), UNKNOWN_TOPIC_OR_PARTITION);
    dir1b1Errors.put(new TopicIdPartition(topicB, 0), NONE);
    dir1b1Errors.put(new TopicIdPartition(topicC, 0), NOT_LEADER_OR_FOLLOWER);
    Map<TopicIdPartition, Errors> dir2b1Errors = new HashMap<>();
    dir2b1Errors.put(new TopicIdPartition(topicA, 1), NONE);
    dir2b1Errors.put(new TopicIdPartition(unknownTopicId, 1), UNKNOWN_TOPIC_ID);
    Map<Uuid, Map<TopicIdPartition, Errors>> expectedErrors = new HashMap<>();
    expectedErrors.put(dir1b1, dir1b1Errors);
    expectedErrors.put(dir2b1, dir2b1Errors);
    expectedErrors.put(offlineDir, singletonMap(new TopicIdPartition(topicA, 2), NONE));
    expectedErrors.put(DirectoryId.LOST, singletonMap(new TopicIdPartition(topicB, 1), NONE));
    assertEquals(
        AssignmentsHelper.normalize(AssignmentsHelper.buildResponseData((short) 0, 0, expectedErrors)),
        AssignmentsHelper.normalize(controllerResult.response()));

    short recordVersion = ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
    assertEquals(sortPartitionChangeRecords(asList(
        new ApiMessageAndVersion(
            new PartitionChangeRecord().setTopicId(topicA).setPartitionId(0)
                .setDirectories(asList(dir1b1, dir1b2)), recordVersion),
        new ApiMessageAndVersion(
            new PartitionChangeRecord().setTopicId(topicA).setPartitionId(1)
                .setDirectories(asList(dir2b1, dir1b2)), recordVersion),
        new ApiMessageAndVersion(
            new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2)
                .setDirectories(asList(offlineDir, dir1b2)), recordVersion),
        new ApiMessageAndVersion(
            new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0)
                .setDirectories(asList(dir1b1, dir1b2)), recordVersion),
        new ApiMessageAndVersion(
            new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1)
                .setDirectories(asList(DirectoryId.LOST, dir1b2)), recordVersion),
        // In addition to the directory assignment changes we expect two additional records,
        // which elect new leaders for:
        // - a-2 which has been assigned to a directory which is not an online directory (unknown/offline)
        // - b-1 which has been assigned to an offline directory.
        new ApiMessageAndVersion(
            new PartitionChangeRecord().setTopicId(topicA).setPartitionId(2)
                .setIsr(singletonList(2)).setLeader(2), recordVersion),
        new ApiMessageAndVersion(
            new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1)
                .setIsr(singletonList(2)).setLeader(2), recordVersion)
    )), sortPartitionChangeRecords(controllerResult.records()));

    ctx.replay(controllerResult.records());
    // Broker 1 stays in the ISR only for partitions assigned to one of its online directories.
    assertEquals(new HashSet<>(asList(
            new TopicIdPartition(topicA, 0),
            new TopicIdPartition(topicA, 1),
            new TopicIdPartition(topicB, 0))),
        RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(1, true)));
    assertEquals(new HashSet<>(asList(
            new TopicIdPartition(topicA, 2),
            new TopicIdPartition(topicB, 1),
            new TopicIdPartition(topicC, 0))),
        RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(2, true)));
}
/**
 * Takes directory dir1b1 of broker b1 offline and checks three outcomes:
 * - b1's registration keeps only dir2b1;
 * - partitions hosted on dir1b1 (a-0 and b-0) get b2 as leader with an ISR of just b2;
 * - dir1b2, which belongs to b2, does not affect b1's registration.
 */
@Test
void testHandleDirectoriesOffline() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    int b1 = 101;
    int b2 = 102;
    Uuid dir1b1 = Uuid.fromString("suitdzfTTdqoWcy8VqmkUg");
    Uuid dir2b1 = Uuid.fromString("yh3acnzGSeurSTj8aIhOjw");
    Uuid dir1b2 = Uuid.fromString("OmpmJ8RjQliQlEFht56DwQ");
    Uuid dir2b2 = Uuid.fromString("w05baLpsT5Oz0LvKTKXoDw");
    ctx.registerBrokersWithDirs(b1, asList(dir1b1, dir2b1), b2, asList(dir1b2, dir2b2));
    ctx.unfenceBrokers(b1, b2);
    Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{b1, b2}, new int[]{b1, b2}}).topicId();
    Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{b1, b2}, new int[]{b1, b2}}).topicId();

    // Plain maps are used instead of double-brace initialization, which would define an
    // anonymous HashMap subclass per map that captures the enclosing test instance.
    Map<TopicIdPartition, Uuid> b1Assignments = new HashMap<>();
    b1Assignments.put(new TopicIdPartition(topicA, 0), dir1b1);
    b1Assignments.put(new TopicIdPartition(topicA, 1), dir2b1);
    b1Assignments.put(new TopicIdPartition(topicB, 0), dir1b1);
    b1Assignments.put(new TopicIdPartition(topicB, 1), dir2b1);
    ctx.assignReplicasToDirs(b1, b1Assignments);
    Map<TopicIdPartition, Uuid> b2Assignments = new HashMap<>();
    b2Assignments.put(new TopicIdPartition(topicA, 0), dir1b2);
    b2Assignments.put(new TopicIdPartition(topicA, 1), dir2b2);
    b2Assignments.put(new TopicIdPartition(topicB, 0), dir1b2);
    b2Assignments.put(new TopicIdPartition(topicB, 1), dir2b2);
    ctx.assignReplicasToDirs(b2, b2Assignments);

    List<ApiMessageAndVersion> records = new ArrayList<>();
    ctx.replicationControl.handleDirectoriesOffline(b1, defaultBrokerEpoch(b1), asList(
        dir1b1,
        dir1b2 // should not cause update to dir1b2 as it's not registered to b1
    ), records);
    assertEquals(
        singletonList(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord()
            .setBrokerId(b1).setBrokerEpoch(defaultBrokerEpoch(b1))
            .setLogDirs(singletonList(dir2b1)), (short) 2)),
        filter(records, BrokerRegistrationChangeRecord.class)
    );
    short partitionChangeRecordVersion = ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
    assertEquals(
        sortPartitionChangeRecords(asList(
            new ApiMessageAndVersion(new PartitionChangeRecord().setTopicId(topicA).setPartitionId(0)
                .setLeader(b2).setIsr(singletonList(b2)), partitionChangeRecordVersion),
            new ApiMessageAndVersion(new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0)
                .setLeader(b2).setIsr(singletonList(b2)), partitionChangeRecordVersion)
        )),
        sortPartitionChangeRecords(filter(records, PartitionChangeRecord.class))
    );
    // One registration change plus the two partition changes, and nothing else.
    assertEquals(3, records.size());
    ctx.replay(records);
    assertEquals(singletonList(dir2b1), ctx.clusterControl.registration(b1).directories());
}
/**
 * Returns a copy of {@code records} sorted by topic ID (its string form) and then numerically by
 * partition ID. This makes the order deterministic so that record lists can be compared.
 * The sort is stable: several records for the same partition keep their relative order.
 * Every element must wrap a {@link PartitionChangeRecord}; any other message type fails with a
 * {@link ClassCastException}. The input list is not modified.
 */
private static List<ApiMessageAndVersion> sortPartitionChangeRecords(List<ApiMessageAndVersion> records) {
    List<ApiMessageAndVersion> sorted = new ArrayList<>(records);
    // Compare partition IDs as ints rather than inside a concatenated string, so that partition 10
    // sorts after partition 2 and no strings are built for every comparison.
    Comparator<ApiMessageAndVersion> byTopicThenPartition = Comparator
        .comparing((ApiMessageAndVersion record) ->
            String.valueOf(((PartitionChangeRecord) record.message()).topicId()))
        .thenComparingInt(record -> ((PartitionChangeRecord) record.message()).partitionId());
    // List.sort is guaranteed stable, which callers rely on for duplicate partition keys.
    sorted.sort(byTopicThenPartition);
    return sorted;
}
/**
 * Returns, in their original order, the records whose message class is exactly {@code clazz}.
 * Subclasses of {@code clazz} do not match.
 */
private static List<ApiMessageAndVersion> filter(List<ApiMessageAndVersion> records, Class<? extends ApiMessage> clazz) {
    List<ApiMessageAndVersion> matching = new ArrayList<>();
    for (ApiMessageAndVersion record : records) {
        if (clazz.equals(record.message().getClass())) {
            matching.add(record);
        }
    }
    return matching;
}
/**
 * Lowering min.insync.replicas to 1 should clear the ELR. It is lowered either cluster-wide or for
 * topic foo only, through the legacy or the incremental AlterConfigs path. Topic bar is only
 * affected by the cluster-wide change.
 */
@ParameterizedTest
@CsvSource({"false, false", "false, true", "true, false", "true, true"})
void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean useLegacyAlterConfigs) {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
        .build();
    ctx.registerBrokers(1, 2, 3, 4);
    ctx.unfenceBrokers(1, 2, 3, 4);
    Uuid fooId = ctx.createTestTopic("foo", new int[][]{
        new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
    Uuid barId = ctx.createTestTopic("bar", new int[][]{
        new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();

    // Fence 4 and then 1; broker 1 ends up in the ELR of partition 0 of both topics.
    ctx.fenceBrokers(4);
    ctx.fenceBrokers(1);
    assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(fooId, 0).elr);
    assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(barId, 0).elr);

    ConfigResource configResource = clusterLevel
        ? new ConfigResource(ConfigResource.Type.BROKER, "")
        : new ConfigResource(ConfigResource.Type.TOPIC, "foo");
    List<ApiMessageAndVersion> configRecords;
    if (useLegacyAlterConfigs) {
        configRecords = ctx.configurationControl.legacyAlterConfigs(
            Collections.singletonMap(configResource,
                Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")),
            false).records();
    } else {
        configRecords = ctx.configurationControl.incrementalAlterConfigs(
            Collections.singletonMap(configResource,
                Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
                    new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "1"))),
            false).records();
    }
    ctx.replay(configRecords);

    // foo is always covered by the change; bar only when it was applied cluster-wide.
    int[] expectedBarElr = clusterLevel ? new int[]{} : new int[]{1};
    assertArrayEquals(new int[]{}, ctx.replicationControl.getPartition(fooId, 0).elr);
    assertArrayEquals(expectedBarElr, ctx.replicationControl.getPartition(barId, 0).elr);
}
/**
 * Replaying a ClearElrRecord should leave the partition epoch of foo-0 unchanged when no broker
 * has been fenced beforehand.
 */
@Test
void testElrsRemovedShouldNotBumpPartitionEpochIfNoChange() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
        .setIsElrEnabled(true)
        .setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
        .build();
    ctx.registerBrokers(1, 2, 3, 4);
    ctx.unfenceBrokers(1, 2, 3, 4);
    Uuid fooId = ctx.createTestTopic("foo", new int[][]{
        new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
    int epochBefore = ctx.replicationControl.getPartition(fooId, 0).partitionEpoch;

    // Presumably the ELR is already empty here (nothing was fenced), so clearing it is a no-op.
    ApiMessageAndVersion clearElr =
        new ApiMessageAndVersion(new ClearElrRecord(), CLEAR_ELR_RECORD.highestSupportedVersion());
    ctx.replay(Collections.singletonList(clearElr));

    assertEquals(epochBefore, ctx.replicationControl.getPartition(fooId, 0).partitionEpoch);
}
}