/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NoReassignmentInProgressException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfigCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.ClusterDescriber;
import org.apache.kafka.metadata.placement.PartitionAssignment;
import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.TopicAssignment;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.IntPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INELIGIBLE_REPLICA;
import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.PartitionReassignmentReplicas.isReassignmentInProgress;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
/**
* The ReplicationControlManager is the part of the controller which deals with topics
* and partitions. It is responsible for managing the in-sync replica set and leader
* of each partition, as well as administrative tasks like creating or deleting topics.
*/
public class ReplicationControlManager {
static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
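    /**
     * A builder for ReplicationControlManager. The configuration, cluster, and feature
     * control managers are required; all other fields fall back to a default if unset.
     *
     * A minimal construction sketch, assuming the three required manager instances
     * already exist in the caller's scope:
     * <pre>{@code
     * ReplicationControlManager replicationControl = new ReplicationControlManager.Builder().
     *     setConfigurationControl(configurationControl).
     *     setClusterControl(clusterControl).
     *     setFeatureControl(featureControl).
     *     build();
     * }</pre>
     */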
static class Builder {
private SnapshotRegistry snapshotRegistry = null;
private LogContext logContext = null;
private short defaultReplicationFactor = (short) 3;
private int defaultNumPartitions = 1;
private int defaultMinIsr = 1;
private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
private ConfigurationControlManager configurationControl = null;
private ClusterControlManager clusterControl = null;
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private FeatureControlManager featureControl = null;
private boolean eligibleLeaderReplicasEnabled = false;
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
this.defaultReplicationFactor = defaultReplicationFactor;
return this;
}
Builder setDefaultNumPartitions(int defaultNumPartitions) {
this.defaultNumPartitions = defaultNumPartitions;
return this;
}
Builder setDefaultMinIsr(int defaultMinIsr) {
this.defaultMinIsr = defaultMinIsr;
return this;
}
Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}
Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
return this;
}
Builder setConfigurationControl(ConfigurationControlManager configurationControl) {
this.configurationControl = configurationControl;
return this;
}
Builder setClusterControl(ClusterControlManager clusterControl) {
this.clusterControl = clusterControl;
return this;
}
Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
this.createTopicPolicy = createTopicPolicy;
return this;
}
public Builder setFeatureControl(FeatureControlManager featureControl) {
this.featureControl = featureControl;
return this;
}
ReplicationControlManager build() {
if (configurationControl == null) {
throw new IllegalStateException("Configuration control must be set before building");
} else if (clusterControl == null) {
throw new IllegalStateException("Cluster control must be set before building");
}
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
if (featureControl == null) {
throw new IllegalStateException("FeatureControlManager must not be null");
}
return new ReplicationControlManager(snapshotRegistry,
logContext,
defaultReplicationFactor,
defaultNumPartitions,
defaultMinIsr,
maxElectionsPerImbalance,
eligibleLeaderReplicasEnabled,
configurationControl,
clusterControl,
createTopicPolicy,
featureControl);
}
}
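    /**
     * A ClusterDescriber backed by the cluster and feature control managers. It exposes
     * the set of usable brokers and, when the metadata version supports directory
     * assignment, each broker's default log directory.
     */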
class KRaftClusterDescriber implements ClusterDescriber {
@Override
public Iterator<UsableBroker> usableBrokers() {
return clusterControl.usableBrokers();
}
@Override
public Uuid defaultDir(int brokerId) {
if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
return clusterControl.defaultDir(brokerId);
} else {
return DirectoryId.MIGRATING;
}
}
}
static class TopicControlInfo {
private final String name;
private final Uuid id;
private final TimelineHashMap<Integer, PartitionRegistration> parts;
TopicControlInfo(String name, SnapshotRegistry snapshotRegistry, Uuid id) {
this.name = name;
this.id = id;
this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
}
public String name() {
return name;
}
public Uuid topicId() {
return id;
}
public int numPartitions(long epoch) {
return parts.size(epoch);
}
}
/**
 * Translate a CreateableTopicConfigCollection to an unmodifiable map from config
 * name to config value.
*/
static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollection collection) {
HashMap<String, String> result = new HashMap<>();
collection.forEach(config -> result.put(config.name(), config.value()));
return Collections.unmodifiableMap(result);
}
private final SnapshotRegistry snapshotRegistry;
private final Logger log;
/**
* The KIP-464 default replication factor that is used if a CreateTopics request does
* not specify one.
*/
private final short defaultReplicationFactor;
/**
* The KIP-464 default number of partitions that is used if a CreateTopics request does
* not specify a number of partitions.
*/
private final int defaultNumPartitions;
/**
* The default min ISR that is used if a CreateTopics request does not specify one.
*/
private final int defaultMinIsr;
/**
 * True if the eligible leader replicas (ELR) feature is enabled.
*/
private final boolean eligibleLeaderReplicasEnabled;
/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
private final int maxElectionsPerImbalance;
/**
* A reference to the controller's configuration control manager.
*/
private final ConfigurationControlManager configurationControl;
/**
* A reference to the controller's cluster control manager.
*/
private final ClusterControlManager clusterControl;
/**
* The policy to use to validate that topic assignments are valid, if one is present.
*/
private final Optional<CreateTopicPolicy> createTopicPolicy;
/**
* The feature control manager.
*/
private final FeatureControlManager featureControl;
/**
* Maps topic names to topic UUIDs.
*/
private final TimelineHashMap<String, Uuid> topicsByName;
/**
* We try to prevent topics from being created if their names would collide with
* existing topics when periods in the topic name are replaced with underscores.
* The reason for this is that some per-topic metrics do replace periods with
* underscores, and would therefore be ambiguous otherwise.
*
* This map is from normalized topic name to a set of topic names. So if we had two
* topics named foo.bar and foo_bar this map would contain
* a mapping from foo_bar to a set containing foo.bar and foo_bar.
*
* Since we reject topic creations that would collide, under normal conditions the
* sets in this map should only have a size of 1. However, if the cluster was
* upgraded from a version prior to KAFKA-13743, it may be possible to have more
* values here, since colliding topic names will be "grandfathered in."
*/
private final TimelineHashMap<String, TimelineHashSet<String>> topicsWithCollisionChars;
/**
* Maps topic UUIDs to structures containing topic information, including partitions.
*/
private final TimelineHashMap<Uuid, TopicControlInfo> topics;
/**
* A map of broker IDs to the partitions that the broker is in the ISR for.
*/
private final BrokersToIsrs brokersToIsrs;
/**
* A map from topic IDs to the partitions in the topic which are reassigning.
*/
private final TimelineHashMap<Uuid, int[]> reassigningTopics;
/**
* The set of topic partitions for which the leader is not the preferred leader.
*/
private final TimelineHashSet<TopicIdPartition> imbalancedPartitions;
/**
* A map from registered directory IDs to the partitions that are stored in that directory.
*/
private final TimelineHashMap<Uuid, TimelineHashSet<TopicIdPartition>> directoriesToPartitions;
/**
* A ClusterDescriber which supplies cluster information to our ReplicaPlacer.
*/
final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
private ReplicationControlManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
short defaultReplicationFactor,
int defaultNumPartitions,
int defaultMinIsr,
int maxElectionsPerImbalance,
boolean eligibleLeaderReplicasEnabled,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
Optional<CreateTopicPolicy> createTopicPolicy,
FeatureControlManager featureControl
) {
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(ReplicationControlManager.class);
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.defaultMinIsr = defaultMinIsr;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.configurationControl = configurationControl;
this.createTopicPolicy = createTopicPolicy;
this.featureControl = featureControl;
this.clusterControl = clusterControl;
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
this.topicsWithCollisionChars = new TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.imbalancedPartitions = new TimelineHashSet<>(snapshotRegistry, 0);
this.directoriesToPartitions = new TimelineHashMap<>(snapshotRegistry, 0);
}
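    /**
     * Replay a TopicRecord by registering the new topic in topicsByName, topics, and
     * (if the name contains collision characters) topicsWithCollisionChars. Throws a
     * RuntimeException if a topic with the same name is already registered.
     *
     * @param record The TopicRecord to replay.
     */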
public void replay(TopicRecord record) {
Uuid existingUuid = topicsByName.put(record.name(), record.topicId());
if (existingUuid != null) {
// We don't currently support sending a second TopicRecord for the same topic name...
// unless, of course, there is a RemoveTopicRecord in between.
if (existingUuid.equals(record.topicId())) {
throw new RuntimeException("Found duplicate TopicRecord for " + record.name() +
" with topic ID " + record.topicId());
} else {
throw new RuntimeException("Found duplicate TopicRecord for " + record.name() +
" with a different ID than before. Previous ID was " + existingUuid +
" and new ID is " + record.topicId());
}
}
if (Topic.hasCollisionChars(record.name())) {
String normalizedName = Topic.unifyCollisionChars(record.name());
TimelineHashSet<String> topicNames = topicsWithCollisionChars.get(normalizedName);
if (topicNames == null) {
topicNames = new TimelineHashSet<>(snapshotRegistry, 1);
topicsWithCollisionChars.put(normalizedName, topicNames);
}
topicNames.add(record.name());
}
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
log.info("Replayed TopicRecord for topic {} with topic ID {}.", record.name(), record.topicId());
}
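    /**
     * Replay a PartitionRecord by creating or replacing the partition's registration
     * and updating the brokersToIsrs, directory, reassignment, and imbalanced-partition
     * indexes to match the new state.
     *
     * @param record The PartitionRecord to replay.
     */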
public void replay(PartitionRecord record) {
TopicControlInfo topicInfo = topics.get(record.topicId());
if (topicInfo == null) {
throw new RuntimeException("Tried to create partition " + record.topicId() +
":" + record.partitionId() + ", but no topic with that ID was found.");
}
PartitionRegistration newPartInfo = new PartitionRegistration(record);
PartitionRegistration prevPartInfo = topicInfo.parts.get(record.partitionId());
String description = topicInfo.name + "-" + record.partitionId() +
" with topic ID " + record.topicId();
if (prevPartInfo == null) {
log.info("Replayed PartitionRecord for new partition {} and {}.", description,
newPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
updatePartitionDirectories(record.topicId(), record.partitionId(), null, newPartInfo.directories);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
false, isReassignmentInProgress(newPartInfo));
} else if (!newPartInfo.equals(prevPartInfo)) {
log.info("Replayed PartitionRecord for existing partition {} and {}.", description,
newPartInfo);
newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo);
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
updatePartitionDirectories(record.topicId(), record.partitionId(), prevPartInfo.directories, newPartInfo.directories);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
isReassignmentInProgress(prevPartInfo), isReassignmentInProgress(newPartInfo));
}
if (newPartInfo.hasPreferredLeader()) {
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
} else {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
}
}
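    /**
     * Keep the reassigningTopics index in sync when a partition starts or stops
     * reassigning: add the partition ID to the topic's entry when a reassignment
     * begins, and remove it (dropping the entry once empty) when one completes.
     */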
private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId,
boolean wasReassigning, boolean isReassigning) {
if (!wasReassigning) {
if (isReassigning) {
int[] prevReassigningParts = reassigningTopics.getOrDefault(topicId, Replicas.NONE);
reassigningTopics.put(topicId, Replicas.copyWith(prevReassigningParts, partitionId));
}
} else if (!isReassigning) {
int[] prevReassigningParts = reassigningTopics.getOrDefault(topicId, Replicas.NONE);
int[] newReassigningParts = Replicas.copyWithout(prevReassigningParts, partitionId);
if (newReassigningParts.length == 0) {
reassigningTopics.remove(topicId);
} else {
reassigningTopics.put(topicId, newReassigningParts);
}
}
}
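    /**
     * Replay a PartitionChangeRecord by merging the delta into the existing partition
     * registration and updating the same indexes maintained by replay(PartitionRecord).
     *
     * @param record The PartitionChangeRecord to replay.
     */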
public void replay(PartitionChangeRecord record) {
TopicControlInfo topicInfo = topics.get(record.topicId());
if (topicInfo == null) {
throw new RuntimeException("Tried to create partition " + record.topicId() +
":" + record.partitionId() + ", but no topic with that ID was found.");
}
PartitionRegistration prevPartitionInfo = topicInfo.parts.get(record.partitionId());
if (prevPartitionInfo == null) {
throw new RuntimeException("Tried to create partition " + record.topicId() +
":" + record.partitionId() + ", but no partition with that id was found.");
}
PartitionRegistration newPartitionInfo = prevPartitionInfo.merge(record);
updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
isReassignmentInProgress(prevPartitionInfo), isReassignmentInProgress(newPartitionInfo));
topicInfo.parts.put(record.partitionId(), newPartitionInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(),
prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader,
newPartitionInfo.leader);
updatePartitionDirectories(record.topicId(), record.partitionId(), prevPartitionInfo.directories, newPartitionInfo.directories);
String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " +
record.topicId();
newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);
if (newPartitionInfo.hasPreferredLeader()) {
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), record.partitionId()));
} else {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
}
if (record.removingReplicas() != null || record.addingReplicas() != null) {
log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
} else if (log.isDebugEnabled()) {
log.debug("Replayed partition change {} for topic {}", record, topicInfo.name);
}
}
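    /**
     * Replay a RemoveTopicRecord by deleting the topic from all in-memory maps, removing
     * its configurations, and clearing its partitions from the brokersToIsrs, directory,
     * reassignment, and imbalanced-partition indexes.
     *
     * @param record The RemoveTopicRecord to replay.
     */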
public void replay(RemoveTopicRecord record) {
// Remove this topic from the topics map and the topicsByName map.
TopicControlInfo topic = topics.remove(record.topicId());
if (topic == null) {
throw new UnknownTopicIdException("Can't find topic with ID " + record.topicId() +
" to remove.");
}
topicsByName.remove(topic.name);
if (Topic.hasCollisionChars(topic.name)) {
String normalizedName = Topic.unifyCollisionChars(topic.name);
TimelineHashSet<String> colliding = topicsWithCollisionChars.get(normalizedName);
if (colliding != null) {
colliding.remove(topic.name);
if (colliding.isEmpty()) {
topicsWithCollisionChars.remove(normalizedName);
}
}
}
reassigningTopics.remove(record.topicId());
// Delete the configurations associated with this topic.
configurationControl.deleteTopicConfigs(topic.name);
for (Map.Entry<Integer, PartitionRegistration> entry : topic.parts.entrySet()) {
int partitionId = entry.getKey();
PartitionRegistration partition = entry.getValue();
// Remove the entries for this topic in brokersToIsrs.
for (int i = 0; i < partition.isr.length; i++) {
brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
updatePartitionDirectories(topic.id, partitionId, partition.directories, null);
}
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), partitionId));
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
log.info("Replayed RemoveTopicRecord for topic {} with ID {}.", topic.name, record.topicId());
}
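    /**
     * Create the topics given in a CreateTopics request, validating topic names,
     * configurations, and replica assignments, and enforcing the controller mutation
     * quota. Per-topic failures are reported in the response rather than thrown.
     *
     * @param context     The controller request context.
     * @param request     The CreateTopics request.
     * @param describable The names of the topics for which the caller is authorized
     *                    to see the resulting configurations.
     */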
ControllerResult<CreateTopicsResponseData> createTopics(
ControllerRequestContext context,
CreateTopicsRequestData request,
Set<String> describable
) {
Map<String, ApiError> topicErrors = new HashMap<>();
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
// Check the topic names.
validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars);
        // Identify topics that already exist and mark them with the appropriate error.
request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name()))
.forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS,
"Topic '" + t.name() + "' already exists.")));
// Verify that the configurations for the new topics are OK, and figure out what
// configurations should be created.
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
computeConfigChanges(topicErrors, request.topics());
// Try to create whatever topics are needed.
Map<String, CreatableTopicResult> successes = new HashMap<>();
for (CreatableTopic topic : request.topics()) {
if (topicErrors.containsKey(topic.name())) continue;
// Figure out what ConfigRecords should be created, if any.
ConfigResource configResource = new ConfigResource(TOPIC, topic.name());
Map<String, Entry<OpType, String>> keyToOps = configChanges.get(configResource);
List<ApiMessageAndVersion> configRecords;
if (keyToOps != null) {
ControllerResult<ApiError> configResult =
configurationControl.incrementalAlterConfig(configResource, keyToOps, true);
if (configResult.response().isFailure()) {
topicErrors.put(topic.name(), configResult.response());
continue;
} else {
configRecords = configResult.records();
}
} else {
configRecords = Collections.emptyList();
}
ApiError error;
try {
error = createTopic(context, topic, records, successes, configRecords, describable.contains(topic.name()));
} catch (ApiException e) {
error = ApiError.fromThrowable(e);
}
if (error.isFailure()) {
topicErrors.put(topic.name(), error);
}
}
// Create responses for all topics.
CreateTopicsResponseData data = new CreateTopicsResponseData();
StringBuilder resultsBuilder = new StringBuilder();
String resultsPrefix = "";
for (CreatableTopic topic : request.topics()) {
ApiError error = topicErrors.get(topic.name());
if (error != null) {
data.topics().add(new CreatableTopicResult().
setName(topic.name()).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
resultsBuilder.append(resultsPrefix).append(topic).append(": ").
append(error.error()).append(" (").append(error.message()).append(")");
resultsPrefix = ", ";
continue;
}
CreatableTopicResult result = successes.get(topic.name());
data.topics().add(result);
resultsBuilder.append(resultsPrefix).append(topic).append(": ").
append("SUCCESS");
resultsPrefix = ", ";
}
if (request.validateOnly()) {
log.info("Validate-only CreateTopics result(s): {}", resultsBuilder);
return ControllerResult.atomicOf(Collections.emptyList(), data);
} else {
log.info("CreateTopics result(s): {}", resultsBuilder);
return ControllerResult.atomicOf(records, data);
}
}
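    /**
     * Attempt to create a single topic, either from a manual partition assignment or
     * from one computed by the replica placer, checking the create topic policy and the
     * controller mutation quota before appending the TopicRecord, config records, and
     * PartitionRecords to the given record list.
     *
     * @return ApiError.NONE on success; otherwise the error that prevented creation.
     */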
private ApiError createTopic(ControllerRequestContext context,
CreatableTopic topic,
List<ApiMessageAndVersion> records,
Map<String, CreatableTopicResult> successes,
List<ApiMessageAndVersion> configRecords,
boolean authorizedToReturnConfigs) {
Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
Map<Integer, PartitionRegistration> newParts = new HashMap<>();
if (!topic.assignments().isEmpty()) {
if (topic.replicationFactor() != -1) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment was specified, but replication " +
"factor was not set to -1.");
}
if (topic.numPartitions() != -1) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment was specified, but numPartitions " +
"was not set to -1.");
}
OptionalInt replicationFactor = OptionalInt.empty();
for (CreatableReplicaAssignment assignment : topic.assignments()) {
if (newParts.containsKey(assignment.partitionIndex())) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"Found multiple manual partition assignments for partition " +
assignment.partitionIndex());
}
PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), clusterDescriber);
validateManualPartitionAssignment(partitionAssignment, replicationFactor);
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
List<Integer> isr = assignment.brokerIds().stream().
filter(clusterControl::isActive).collect(Collectors.toList());
if (isr.isEmpty()) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"All brokers specified in the manual partition assignment for " +
"partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown.");
}
newParts.put(
assignment.partitionIndex(),
buildPartitionRegistration(partitionAssignment, isr)
);
}
for (int i = 0; i < newParts.size(); i++) {
if (!newParts.containsKey(i)) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"partitions should be a consecutive 0-based integer sequence");
}
}
ApiError error = maybeCheckCreateTopicPolicy(() -> {
Map<Integer, List<Integer>> assignments = new HashMap<>();
newParts.entrySet().forEach(e -> assignments.put(e.getKey(),
Replicas.toList(e.getValue().replicas)));
return new CreateTopicPolicy.RequestMetadata(
topic.name(), null, null, assignments, creationConfigs);
});
if (error.isFailure()) return error;
} else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Replication factor must be larger than 0, or -1 to use the default value.");
} else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
return new ApiError(Errors.INVALID_PARTITIONS,
"Number of partitions was set to an invalid non-positive value.");
} else {
int numPartitions = topic.numPartitions() == -1 ?
defaultNumPartitions : topic.numPartitions();
short replicationFactor = topic.replicationFactor() == -1 ?
defaultReplicationFactor : topic.replicationFactor();
try {
TopicAssignment topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
0,
numPartitions,
replicationFactor
), clusterDescriber);
for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); partitionId++) {
PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);
List<Integer> isr = partitionAssignment.replicas().stream().
filter(clusterControl::isActive).collect(Collectors.toList());
// If the ISR is empty, it means that all brokers are fenced or
// in controlled shutdown. To be consistent with the replica placer,
// we reject the create topic request with INVALID_REPLICATION_FACTOR.
if (isr.isEmpty()) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Unable to replicate the partition " + replicationFactor +
" time(s): All brokers are currently fenced or in controlled shutdown.");
}
newParts.put(
partitionId,
buildPartitionRegistration(partitionAssignment, isr)
);
}
} catch (InvalidReplicationFactorException e) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Unable to replicate the partition " + replicationFactor +
" time(s): " + e.getMessage());
}
ApiError error = maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(
topic.name(), numPartitions, replicationFactor, null, creationConfigs));
if (error.isFailure()) return error;
}
int numPartitions = newParts.size();
try {
context.applyPartitionChangeQuota(numPartitions); // check controller mutation quota
} catch (ThrottlingQuotaExceededException e) {
log.debug("Topic creation of {} partitions not allowed because quota is violated. Delay time: {}",
numPartitions, e.throttleTimeMs());
return ApiError.fromThrowable(e);
}
Uuid topicId = Uuid.randomUuid();
CreatableTopicResult result = new CreatableTopicResult().
setName(topic.name()).
setTopicId(topicId).
setErrorCode(NONE.code()).
setErrorMessage(null);
if (authorizedToReturnConfigs) {
Map<String, ConfigEntry> effectiveConfig = configurationControl.
computeEffectiveTopicConfigs(creationConfigs);
List<String> configNames = new ArrayList<>(effectiveConfig.keySet());
configNames.sort(String::compareTo);
for (String configName : configNames) {
ConfigEntry entry = effectiveConfig.get(configName);
result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs().
setName(entry.name()).
setValue(entry.isSensitive() ? null : entry.value()).
setReadOnly(entry.isReadOnly()).
setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()).
setIsSensitive(entry.isSensitive()));
}
result.setNumPartitions(numPartitions);
result.setReplicationFactor((short) newParts.values().iterator().next().replicas.length);
result.setTopicConfigErrorCode(NONE.code());
} else {
result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code());
}
successes.put(topic.name(), result);
records.add(new ApiMessageAndVersion(new TopicRecord().
setName(topic.name()).
setTopicId(topicId), (short) 0));
// ConfigRecords go after TopicRecord but before PartitionRecord(s).
records.addAll(configRecords);
for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
int partitionIndex = partEntry.getKey();
PartitionRegistration info = partEntry.getValue();
records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
build()));
}
return ApiError.NONE;
}
private static PartitionRegistration buildPartitionRegistration(
PartitionAssignment partitionAssignment,
List<Integer> isr
) {
return new PartitionRegistration.Builder().
setReplicas(Replicas.toArray(partitionAssignment.replicas())).
setDirectories(Uuid.toArray(partitionAssignment.directories())).
setIsr(Replicas.toArray(isr)).
setLeader(isr.get(0)).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
setLeaderEpoch(0).
setPartitionEpoch(0).
build();
}
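    /**
     * Run the CreateTopicPolicy against the supplied request metadata, if a policy is
     * configured. The Supplier ensures the metadata is only built when a policy exists.
     */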
private ApiError maybeCheckCreateTopicPolicy(Supplier<CreateTopicPolicy.RequestMetadata> supplier) {
if (createTopicPolicy.isPresent()) {
try {
createTopicPolicy.get().validate(supplier.get());
} catch (PolicyViolationException e) {
return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
}
}
return ApiError.NONE;
}
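    /**
     * Validate the names of the topics in a CreateTopics request, recording an error
     * for any name that is syntactically invalid or that collides with an existing
     * topic once periods are replaced with underscores.
     */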
static void validateNewTopicNames(Map<String, ApiError> topicErrors,
CreatableTopicCollection topics,
Map<String, ? extends Set<String>> topicsWithCollisionChars) {
for (CreatableTopic topic : topics) {
if (topicErrors.containsKey(topic.name())) continue;
try {
Topic.validate(topic.name());
} catch (InvalidTopicException e) {
topicErrors.put(topic.name(),
new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage()));
}
if (Topic.hasCollisionChars(topic.name())) {
String normalizedName = Topic.unifyCollisionChars(topic.name());
Set<String> colliding = topicsWithCollisionChars.get(normalizedName);
if (colliding != null) {
topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION,
"Topic '" + topic.name() + "' collides with existing topic: " +
colliding.iterator().next()));
}
}
}
}
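    /**
     * Compute the per-topic configuration changes implied by a CreateTopics request,
     * recording an error for any topic that supplies a null config value.
     */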
static Map<ConfigResource, Map<String, Entry<OpType, String>>>
computeConfigChanges(Map<String, ApiError> topicErrors,
CreatableTopicCollection topics) {
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges = new HashMap<>();
for (CreatableTopic topic : topics) {
if (topicErrors.containsKey(topic.name())) continue;
Map<String, Entry<OpType, String>> topicConfigs = new HashMap<>();
List<String> nullConfigs = new ArrayList<>();
for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
if (config.value() == null) {
nullConfigs.add(config.name());
} else {
topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value()));
}
}
if (!nullConfigs.isEmpty()) {
topicErrors.put(topic.name(), new ApiError(Errors.INVALID_CONFIG,
"Null value not supported for topic configs: " + String.join(",", nullConfigs)));
} else if (!topicConfigs.isEmpty()) {
configChanges.put(new ConfigResource(TOPIC, topic.name()), topicConfigs);
}
}
return configChanges;
}
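    /**
     * Find the IDs of the given topics as of the given metadata offset.
     *
     * @param offset The offset at which to read the topicsByName map.
     * @param names  The topic names to look up; null names yield INVALID_REQUEST and
     *               unknown names yield UNKNOWN_TOPIC_OR_PARTITION.
     */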
Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size());
for (String name : names) {
if (name == null) {
results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name."));
} else {
Uuid id = topicsByName.get(name, offset);
if (id == null) {
results.put(name, new ResultOrError<>(
new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
} else {
results.put(name, new ResultOrError<>(id));
}
}
}
return results;
}
Map<String, Uuid> findAllTopicIds(long offset) {
HashMap<String, Uuid> result = new HashMap<>(topicsByName.size(offset));
for (Entry<String, Uuid> entry : topicsByName.entrySet(offset)) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}
Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size());
for (Uuid id : ids) {
if (id == null || id.equals(Uuid.ZERO_UUID)) {
results.put(id, new ResultOrError<>(new ApiError(INVALID_REQUEST,
"Attempt to find topic with invalid topicId " + id)));
} else {
TopicControlInfo topic = topics.get(id, offset);
if (topic == null) {
results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID)));
} else {
results.put(id, new ResultOrError<>(topic.name));
}
}
}
return results;
}
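    /**
     * Delete the topics with the given IDs, generating one RemoveTopicRecord per
     * deleted topic and mapping each ID to the result of its deletion.
     */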
ControllerResult<Map<Uuid, ApiError>> deleteTopics(ControllerRequestContext context, Collection<Uuid> ids) {
Map<Uuid, ApiError> results = new HashMap<>(ids.size());
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP, ids.size());
for (Uuid id : ids) {
try {
deleteTopic(context, id, records);
results.put(id, ApiError.NONE);
} catch (ApiException e) {
results.put(id, ApiError.fromThrowable(e));
} catch (Exception e) {
log.error("Unexpected deleteTopics error for {}", id, e);
results.put(id, ApiError.fromThrowable(e));
}
}
return ControllerResult.atomicOf(records, results);
}
void deleteTopic(ControllerRequestContext context, Uuid id, List<ApiMessageAndVersion> records) {
TopicControlInfo topic = topics.get(id);
if (topic == null) {
throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
}
int numPartitions = topic.parts.size();
try {
context.applyPartitionChangeQuota(numPartitions); // check controller mutation quota
} catch (ThrottlingQuotaExceededException e) {
// log a message and rethrow the exception
log.debug("Topic deletion of {} partitions not allowed because quota is violated. Delay time: {}",
numPartitions, e.throttleTimeMs());
throw e;
}
records.add(new ApiMessageAndVersion(new RemoveTopicRecord().
setTopicId(id), (short) 0));
}
// VisibleForTesting
PartitionRegistration getPartition(Uuid topicId, int partitionId) {
TopicControlInfo topic = topics.get(topicId);
if (topic == null) {
return null;
}
return topic.parts.get(partitionId);
}
// VisibleForTesting
TopicControlInfo getTopic(Uuid topicId) {
return topics.get(topicId);
}
Uuid getTopicId(String name) {
return topicsByName.get(name);
}
// VisibleForTesting
BrokersToIsrs brokersToIsrs() {
return brokersToIsrs;
}
// VisibleForTesting
TimelineHashSet<TopicIdPartition> imbalancedPartitions() {
return imbalancedPartitions;
}
boolean isElrEnabled() {
return eligibleLeaderReplicasEnabled && featureControl.metadataVersion().isElrSupported();
}
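    /**
     * Handle an AlterPartition request from a partition leader, validating each
     * partition's epochs and proposed ISR and generating PartitionChangeRecords for
     * the changes that are accepted.
     */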
ControllerResult<AlterPartitionResponseData> alterPartition(
ControllerRequestContext context,
AlterPartitionRequestData request
) {
short requestVersion = context.requestHeader().requestApiVersion();
clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
AlterPartitionResponseData response = new AlterPartitionResponseData();
List<ApiMessageAndVersion> records = new ArrayList<>();
for (AlterPartitionRequestData.TopicData topicData : request.topics()) {
AlterPartitionResponseData.TopicData responseTopicData =
new AlterPartitionResponseData.TopicData().
setTopicName(topicData.topicName()).
setTopicId(topicData.topicId());
response.topics().add(responseTopicData);
Uuid topicId = requestVersion > 1 ? topicData.topicId() : topicsByName.get(topicData.topicName());
if (topicId == null || topicId.equals(Uuid.ZERO_UUID) || !topics.containsKey(topicId)) {
Errors error = requestVersion > 1 ? UNKNOWN_TOPIC_ID : UNKNOWN_TOPIC_OR_PARTITION;
for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
setPartitionIndex(partitionData.partitionIndex()).
setErrorCode(error.code()));
}
log.info("Rejecting AlterPartition request for unknown topic ID {} or name {}.",
topicData.topicId(), topicData.topicName());
continue;
}
TopicControlInfo topic = topics.get(topicId);
for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) {
if (requestVersion < 3) {
partitionData.setNewIsrWithEpochs(
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(partitionData.newIsr())
);
}
int partitionId = partitionData.partitionIndex();
PartitionRegistration partition = topic.parts.get(partitionId);
Errors validationError = validateAlterPartitionData(
request.brokerId(),
topic,
partitionId,
partition,
context.requestHeader().requestApiVersion(),
partitionData);
if (validationError != Errors.NONE) {
responseTopicData.partitions().add(
new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(partitionId)
.setErrorCode(validationError.code())
);
continue;
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(
partition,
topic.id,
partitionId,
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
)
.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
Optional<ApiMessageAndVersion> record = builder
.setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs())
.setTargetLeaderRecoveryState(LeaderRecoveryState.of(partitionData.leaderRecoveryState()))
.setDefaultDirProvider(clusterDescriber)
.build();
if (record.isPresent()) {
records.add(record.get());
PartitionChangeRecord change = (PartitionChangeRecord) record.get().message();
partition = partition.merge(change);
if (log.isDebugEnabled()) {
log.debug("Node {} has altered ISR for {}-{} to {}.",
request.brokerId(), topic.name, partitionId, change.isr());
}
if (change.leader() != request.brokerId() &&
change.leader() != NO_LEADER_CHANGE) {
// Normally, an AlterPartition request, which is made by the partition
// leader itself, is not allowed to modify the partition leader.
// However, if there is an ongoing partition reassignment and the
// ISR change completes it, then the leader may change as part of
// the changes made during reassignment cleanup.
//
// In this case, we report back NEW_LEADER_ELECTED to the leader
// which made the AlterPartition request. This lets it know that it must
// fetch new metadata before trying again. This return code is
// unusual because we both return an error and generate a new
// metadata record. We usually only do one or the other.
                        // FENCED_LEADER_EPOCH is returned for request versions 1 and below.
Errors error = requestVersion > 1 ? NEW_LEADER_ELECTED : FENCED_LEADER_EPOCH;
log.info("AlterPartition request from node {} for {}-{} completed " +
"the ongoing partition reassignment and triggered a " +
"leadership change. Returning {}.",
request.brokerId(), topic.name, partitionId, error);
responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
setPartitionIndex(partitionId).
setErrorCode(error.code()));
continue;
} else if (isReassignmentInProgress(partition)) {
log.info("AlterPartition request from node {} for {}-{} completed " +
"the ongoing partition reassignment.", request.brokerId(),
topic.name, partitionId);
}
}
/* Setting the LeaderRecoveryState field is always safe because it will always be the
* same as the value set in the request. For version 0, that is always the default
* RECOVERED which is ignored when serializing to version 0. For any other version, the
* LeaderRecoveryState field is supported.
*/
responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData().
setPartitionIndex(partitionId).
setErrorCode(Errors.NONE.code()).
setLeaderId(partition.leader).
setIsr(Replicas.toList(partition.isr)).
setLeaderRecoveryState(partition.leaderRecoveryState.value()).
setLeaderEpoch(partition.leaderEpoch).
setPartitionEpoch(partition.partitionEpoch));
}
}
return ControllerResult.of(records, response);
}
    /**
     * Validate the partition information included in the alter partition request.
     *
     * @param brokerId id of the broker requesting the alter partition
     * @param topic current topic information stored by the replication manager
     * @param partitionId partition id being altered
     * @param partition current partition registration for the partition being altered
     * @param requestApiVersion the API version of the alter partition request
     * @param partitionData partition data from the alter partition request
     *
     * @return Errors.NONE for valid alter partition data; otherwise the validation error
     */
private Errors validateAlterPartitionData(
int brokerId,
TopicControlInfo topic,
int partitionId,
PartitionRegistration partition,
short requestApiVersion,
AlterPartitionRequestData.PartitionData partitionData
) {
if (partition == null) {
log.info("Rejecting AlterPartition request for unknown partition {}-{}.",
topic.name, partitionId);
return UNKNOWN_TOPIC_OR_PARTITION;
}
// If the partition leader has a higher leader/partition epoch, then it is likely
// that this node is no longer the active controller. We return NOT_CONTROLLER in
// this case to give the leader an opportunity to find the new controller.
if (partitionData.leaderEpoch() > partition.leaderEpoch) {
log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
"the current leader epoch is {}, which is greater than the local value {}.",
brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
return NOT_CONTROLLER;
}
if (partitionData.partitionEpoch() > partition.partitionEpoch) {
log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
"the current partition epoch is {}, which is greater than the local value {}.",
brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch());
return NOT_CONTROLLER;
}
if (partitionData.leaderEpoch() < partition.leaderEpoch) {
log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
"the current leader epoch is {}, not {}.", brokerId, topic.name,
partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
return FENCED_LEADER_EPOCH;
}
if (brokerId != partition.leader) {
log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
"the current leader is {}.", brokerId, topic.name,
partitionId, partition.leader);
return INVALID_REQUEST;
}
if (partitionData.partitionEpoch() < partition.partitionEpoch) {
log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
"the current partition epoch is {}, not {}.", brokerId,
topic.name, partitionId, partition.partitionEpoch,
partitionData.partitionEpoch());
return INVALID_UPDATE_VERSION;
}
int[] newIsr = partitionData.newIsrWithEpochs().stream()
.mapToInt(brokerState -> brokerState.brokerId()).toArray();
if (!Replicas.validateIsr(partition.replicas, newIsr)) {
log.error("Rejecting AlterPartition request from node {} for {}-{} because " +
"it specified an invalid ISR {}.", brokerId,
topic.name, partitionId, partitionData.newIsrWithEpochs());
return INVALID_REQUEST;
}
if (!Replicas.contains(newIsr, partition.leader)) {
// The ISR must always include the current leader.
log.error("Rejecting AlterPartition request from node {} for {}-{} because " +
"it specified an invalid ISR {} that doesn't include itself.",
brokerId, topic.name, partitionId, partitionData.newIsrWithEpochs());
return INVALID_REQUEST;
}
LeaderRecoveryState leaderRecoveryState = LeaderRecoveryState.of(partitionData.leaderRecoveryState());
if (leaderRecoveryState == LeaderRecoveryState.RECOVERING && newIsr.length > 1) {
log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
"the ISR {} had more than one replica while the leader was still " +
"recovering from an unclean leader election {}.",
brokerId, topic.name, partitionId, partitionData.newIsrWithEpochs(),
leaderRecoveryState);
return INVALID_REQUEST;
}
if (partition.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
"the leader recovery state cannot change from RECOVERED to RECOVERING.",
brokerId, topic.name, partitionId);
return INVALID_REQUEST;
}
List<IneligibleReplica> ineligibleReplicas = ineligibleReplicasForIsr(partitionData.newIsrWithEpochs());
if (!ineligibleReplicas.isEmpty()) {
log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
"it specified ineligible replicas {} in the new ISR {}.",
brokerId, topic.name, partitionId, ineligibleReplicas, partitionData.newIsrWithEpochs());
if (requestApiVersion > 1) {
return INELIGIBLE_REPLICA;
} else {
return OPERATION_NOT_ATTEMPTED;
}
}
return Errors.NONE;
}
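    /**
     * Find the replicas in the proposed ISR that are ineligible to join it: brokers
     * that are unregistered, fenced, in controlled shutdown, or registered with a
     * different epoch than the one given in the request.
     */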
private List<IneligibleReplica> ineligibleReplicasForIsr(List<BrokerState> brokerStates) {
List<IneligibleReplica> ineligibleReplicas = new ArrayList<>(0);
for (BrokerState brokerState : brokerStates) {
int brokerId = brokerState.brokerId();
BrokerRegistration registration = clusterControl.registration(brokerId);
if (registration == null) {
ineligibleReplicas.add(new IneligibleReplica(brokerId, "not registered"));
} else if (registration.inControlledShutdown()) {
ineligibleReplicas.add(new IneligibleReplica(brokerId, "shutting down"));
} else if (registration.fenced()) {
ineligibleReplicas.add(new IneligibleReplica(brokerId, "fenced"));
} else if (brokerState.brokerEpoch() != -1 && registration.epoch() != brokerState.brokerEpoch()) {
                // The given broker epoch should match the broker epoch in the broker
                // registration, unless the given broker epoch is -1, which skips the
                // broker epoch verification.
ineligibleReplicas.add(new IneligibleReplica(brokerId,
"broker epoch mismatch: requested=" + brokerState.brokerEpoch()
+ " VS expected=" + registration.epoch()));
}
}
return ineligibleReplicas;
}
/**
* Generate the appropriate records to handle a broker being fenced.
*
* First, we remove this broker from any ISR. Then we generate a
* FenceBrokerRecord.
*
* @param brokerId The broker id.
* @param records The record list to append to.
*/
void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
if (brokerRegistration == null) {
throw new RuntimeException("Can't find broker registration for broker " + brokerId);
}
generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).
setFenced(BrokerRegistrationFencingChange.FENCE.value()),
(short) 0));
} else {
records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
setId(brokerId).setEpoch(brokerRegistration.epoch()),
(short) 0));
}
}
/**
* Generate the appropriate records to handle a broker being unregistered.
*
* First, we remove this broker from any ISR. Then we generate an
* UnregisterBrokerRecord.
*
* @param brokerId The broker id.
* @param brokerEpoch The broker epoch.
* @param records The record list to append to.
*/
void handleBrokerUnregistered(int brokerId, long brokerEpoch,
List<ApiMessageAndVersion> records) {
generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch),
(short) 0));
}
/**
* Generate the appropriate records to handle a broker becoming unfenced.
*
* First, we create an UnfenceBrokerRecord. Then, we check if there are any
* partitions that don't currently have a leader that should be led by the newly
* unfenced broker.
*
* @param brokerId The broker id.
* @param brokerEpoch The broker epoch.
* @param records The record list to append to.
*/
void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
setFenced(BrokerRegistrationFencingChange.UNFENCE.value()),
(short) 0));
} else {
records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).
setEpoch(brokerEpoch), (short) 0));
}
generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, brokerId, records,
brokersToIsrs.partitionsWithNoLeader());
}
/**
* Generate the appropriate records to handle a broker starting a controlled shutdown.
*
 * First, we create a BrokerRegistrationChangeRecord. Then, we remove this broker
* from any ISR and elect new leaders for partitions led by this
* broker.
*
* @param brokerId The broker id.
* @param brokerEpoch The broker epoch.
* @param records The record list to append to.
*/
void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isInControlledShutdownStateSupported()
&& !clusterControl.inControlledShutdown(brokerId)) {
records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
(short) 1));
}
generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]",
brokerId, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
}
/**
* Generates the appropriate records to handle a list of directories being reported offline.
*
* If the reported directories include directories that were previously online, this includes
 * a BrokerRegistrationChangeRecord and any number of PartitionChangeRecords to update
* leadership and ISR for partitions in those directories that were previously online.
*
* @param brokerId The broker id.
* @param brokerEpoch The broker epoch.
* @param offlineDirs The list of directories that are offline.
* @param records The record list to append to.
*/
void handleDirectoriesOffline(
int brokerId,
long brokerEpoch,
List<Uuid> offlineDirs,
List<ApiMessageAndVersion> records
) {
BrokerRegistration registration = clusterControl.registration(brokerId);
List<Uuid> newOfflineDirs = registration.directoryIntersection(offlineDirs);
if (!newOfflineDirs.isEmpty()) {
for (Uuid newOfflineDir : newOfflineDirs) {
TimelineHashSet<TopicIdPartition> parts = directoriesToPartitions.get(newOfflineDir);
Iterator<TopicIdPartition> iterator = (parts == null) ?
Collections.emptyIterator() : parts.iterator();
generateLeaderAndIsrUpdates(
"handleDirectoriesOffline[" + brokerId + ":" + newOfflineDir + "]",
brokerId, NO_LEADER, records, iterator);
}
List<Uuid> newOnlineDirs = registration.directoryDifference(offlineDirs);
records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
setLogDirs(newOnlineDirs),
(short) 2));
log.warn("Directories {} in broker {} marked offline, remaining directories: {}",
newOfflineDirs, brokerId, newOnlineDirs);
}
}
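    /**
     * Handle an ElectLeaders request, attempting a preferred or unclean leader election
     * for each named partition, or for every partition if topicPartitions is null.
     */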
ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
ElectionType electionType = electionType(request.electionType());
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
ElectLeadersResponseData response = new ElectLeadersResponseData();
if (request.topicPartitions() == null) {
// If topicPartitions is null, we try to elect a new leader for every partition. There
// are some obvious issues with this wire protocol. For example, what if we have too
// many partitions to fit the results in a single RPC? This behavior should probably be
// removed from the protocol. For now, however, we have to implement this for
// compatibility with the old controller.
for (Entry<String, Uuid> topicEntry : topicsByName.entrySet()) {
String topicName = topicEntry.getKey();
ReplicaElectionResult topicResults =
new ReplicaElectionResult().setTopic(topicName);
response.replicaElectionResults().add(topicResults);
TopicControlInfo topic = topics.get(topicEntry.getValue());
if (topic != null) {
for (int partitionId : topic.parts.keySet()) {
ApiError error = electLeader(topicName, partitionId, electionType, records);
// When electing leaders for all partitions, we do not return
// partitions which already have the desired leader.
if (error.error() != Errors.ELECTION_NOT_NEEDED) {
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
}
}
}
}
} else {
for (TopicPartitions topic : request.topicPartitions()) {
ReplicaElectionResult topicResults =
new ReplicaElectionResult().setTopic(topic.topic());
response.replicaElectionResults().add(topicResults);
for (int partitionId : topic.partitions()) {
ApiError error = electLeader(topic.topic(), partitionId, electionType, records);
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
}
}
}
return ControllerResult.of(records, response);
}
private static ElectionType electionType(byte electionType) {
try {
return ElectionType.valueOf(electionType);
} catch (IllegalArgumentException e) {
throw new InvalidRequestException("Unknown election type " + (int) electionType);
}
}
ApiError electLeader(String topic, int partitionId, ElectionType electionType,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic);
if (topicId == null) {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such topic as " + topic);
}
TopicControlInfo topicInfo = topics.get(topicId);
if (topicInfo == null) {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such topic id as " + topicId);
}
PartitionRegistration partition = topicInfo.parts.get(partitionId);
if (partition == null) {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
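// A preferred election is unnecessary if the preferred replica is already the
// leader, and an unclean election is unnecessary if the partition already has
// a leader.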
if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
|| (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
return new ApiError(Errors.ELECTION_NOT_NEEDED);
}
PartitionChangeBuilder.Election election = PartitionChangeBuilder.Election.PREFERRED;
if (electionType == ElectionType.UNCLEAN) {
election = PartitionChangeBuilder.Election.UNCLEAN;
}
Optional<ApiMessageAndVersion> record = new PartitionChangeBuilder(
partition,
topicId,
partitionId,
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic)
)
.setElection(election)
.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
.setEligibleLeaderReplicasEnabled(isElrEnabled())
.setDefaultDirProvider(clusterDescriber)
.build();
if (!record.isPresent()) {
if (electionType == ElectionType.PREFERRED) {
return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
} else {
return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
}
}
records.add(record.get());
return ApiError.NONE;
}
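/**
* Process a broker heartbeat: check the broker epoch, compute the broker's next
* state, generate any records needed to apply the transition (fencing, unfencing,
* or entering controlled shutdown), and reply with the broker's new status.
*/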
ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
BrokerHeartbeatRequestData request,
long registerBrokerRecordOffset
) {
int brokerId = request.brokerId();
long brokerEpoch = request.brokerEpoch();
clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId,
request, registerBrokerRecordOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
List<ApiMessageAndVersion> records = new ArrayList<>();
if (states.current() != states.next()) {
switch (states.next()) {
case FENCED:
handleBrokerFenced(brokerId, records);
break;
case UNFENCED:
handleBrokerUnfenced(brokerId, brokerEpoch, records);
break;
case CONTROLLED_SHUTDOWN:
handleBrokerInControlledShutdown(brokerId, brokerEpoch, records);
break;
case SHUTDOWN_NOW:
handleBrokerFenced(brokerId, records);
break;
}
}
heartbeatManager.touch(brokerId,
states.next().fenced(),
request.currentMetadataOffset());
if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
handleDirectoriesOffline(brokerId, brokerEpoch, request.offlineLogDirs(), records);
}
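// The broker has caught up once it has replayed metadata up to (at least) the
// offset of its own broker registration record.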
boolean isCaughtUp = request.currentMetadataOffset() >= registerBrokerRecordOffset;
BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
states.next().fenced(),
states.next().inControlledShutdown(),
states.next().shouldShutDown());
return ControllerResult.of(records, reply);
}
/**
* Process a broker heartbeat which has been sitting on the queue for too long and
* has expired. With default settings, this happens after 1 second. We process
* expired heartbeats by updating the lastSeenNs of the broker, so that the broker
* won't get fenced incorrectly. However, we don't perform any of the state changes
* that we normally would, such as unfencing a fenced broker.
*/
void processExpiredBrokerHeartbeat(BrokerHeartbeatRequestData request) {
int brokerId = request.brokerId();
clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
clusterControl.heartbeatManager().touch(brokerId,
clusterControl.brokerRegistrations().get(brokerId).fenced(),
request.currentMetadataOffset());
log.error("processExpiredBrokerHeartbeat: controller event queue overloaded. Timed out " +
"heartbeat from broker {}.", brokerId);
}
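/**
* Generate the records needed to unregister a broker. Throws
* BrokerIdNotRegisteredException if the broker is not currently registered.
*/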
public ControllerResult<Void> unregisterBroker(int brokerId) {
BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId);
if (registration == null) {
throw new BrokerIdNotRegisteredException("Broker ID " + brokerId +
" is not currently registered");
}
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
handleBrokerUnregistered(brokerId, registration.epoch(), records);
return ControllerResult.of(records, null);
}
ControllerResult<Void> maybeFenceOneStaleBroker() {
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
// Even though multiple brokers can go stale at a time, we fence them one at a
// time, so that the effect of fencing each broker is visible to the system
// before we process the next one.
log.info("Fencing broker {} because its session has timed out.", brokerId);
handleBrokerFenced(brokerId, records);
heartbeatManager.fence(brokerId);
});
return ControllerResult.of(records, null);
}
boolean arePartitionLeadersImbalanced() {
return !imbalancedPartitions.isEmpty();
}
/**
* Attempt to elect a preferred leader for all topic partitions which have a leader that is not the preferred replica.
*
* The response() method of the returned object is true if this method returned
* without electing all possible preferred replicas; in that case, the quorum
* controller should reschedule this operation immediately.
*
* @return All of the election records, plus a flag indicating whether there may be
* more preferred replicas available to elect as leader.
*/
ControllerResult<Boolean> maybeBalancePartitionLeaders() {
List<ApiMessageAndVersion> records = new ArrayList<>();
boolean rescheduleImmediately = false;
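// Handle at most maxElectionsPerImbalance partitions per call. If more imbalanced
// partitions remain, the result tells the caller to reschedule immediately.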
for (TopicIdPartition topicPartition : imbalancedPartitions) {
if (records.size() >= maxElectionsPerImbalance) {
rescheduleImmediately = true;
break;
}
TopicControlInfo topic = topics.get(topicPartition.topicId());
if (topic == null) {
log.error("Skipping unknown imbalanced topic {}", topicPartition);
continue;
}
PartitionRegistration partition = topic.parts.get(topicPartition.partitionId());
if (partition == null) {
log.error("Skipping unknown imbalanced partition {}", topicPartition);
continue;
}
// Attempt to perform a preferred leader election
new PartitionChangeBuilder(
partition,
topicPartition.topicId(),
topicPartition.partitionId(),
new LeaderAcceptor(clusterControl, partition),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
)
.setElection(PartitionChangeBuilder.Election.PREFERRED)
.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
.setEligibleLeaderReplicasEnabled(isElrEnabled())
.setDefaultDirProvider(clusterDescriber)
.build().ifPresent(records::add);
}
return ControllerResult.of(records, rescheduleImmediately);
}
ControllerResult<List<CreatePartitionsTopicResult>> createPartitions(
ControllerRequestContext context,
List<CreatePartitionsTopic> topics
) {
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
List<CreatePartitionsTopicResult> results = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
for (CreatePartitionsTopic topic : topics) {
ApiError apiError = ApiError.NONE;
try {
createPartitions(context, topic, records);
} catch (ApiException e) {
apiError = ApiError.fromThrowable(e);
} catch (Exception e) {
log.error("Unexpected createPartitions error for {}", topic, e);
apiError = ApiError.fromThrowable(e);
}
results.add(new CreatePartitionsTopicResult().
setName(topic.name()).
setErrorCode(apiError.error().code()).
setErrorMessage(apiError.message()));
}
return ControllerResult.atomicOf(records, results);
}
void createPartitions(ControllerRequestContext context,
CreatePartitionsTopic topic,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic.name());
if (topicId == null) {
throw new UnknownTopicOrPartitionException();
}
TopicControlInfo topicInfo = topics.get(topicId);
if (topicInfo == null) {
throw new UnknownTopicOrPartitionException();
}
if (topic.count() == topicInfo.parts.size()) {
throw new InvalidPartitionsException("Topic already has " +
topicInfo.parts.size() + " partition(s).");
} else if (topic.count() < topicInfo.parts.size()) {
throw new InvalidPartitionsException("The topic " + topic.name() + " currently " +
"has " + topicInfo.parts.size() + " partition(s); " + topic.count() +
" would not be an increase.");
}
int additional = topic.count() - topicInfo.parts.size();
if (topic.assignments() != null) {
if (topic.assignments().size() != additional) {
throw new InvalidReplicaAssignmentException("Attempted to add " + additional +
" additional partition(s), but only " + topic.assignments().size() +
" assignment(s) were specified.");
}
}
try {
context.applyPartitionChangeQuota(additional); // check controller mutation quota
} catch (ThrottlingQuotaExceededException e) {
// Log a message and rethrow the exception.
log.debug("Creation of {} additional partition(s) was not allowed because the quota was violated. Throttle time: {} ms.",
additional, e.throttleTimeMs());
throw e;
}
Iterator<PartitionRegistration> iterator = topicInfo.parts.values().iterator();
if (!iterator.hasNext()) {
throw new UnknownServerException("Invalid state: topic " + topic.name() +
" appears to have no partitions.");
}
PartitionRegistration partitionInfo = iterator.next();
if (partitionInfo.replicas.length > Short.MAX_VALUE) {
throw new UnknownServerException("Invalid replication factor " +
partitionInfo.replicas.length + ": expected a number equal to less than " +
Short.MAX_VALUE);
}
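// New partitions inherit the replication factor of the topic's existing
// partitions, taken here from the first partition.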
short replicationFactor = (short) partitionInfo.replicas.length;
int startPartitionId = topicInfo.parts.size();
List<PartitionAssignment> partitionAssignments;
List<List<Integer>> isrs;
if (topic.assignments() != null) {
partitionAssignments = new ArrayList<>();
isrs = new ArrayList<>();
for (int i = 0; i < topic.assignments().size(); i++) {
List<Integer> replicas = topic.assignments().get(i).brokerIds();
PartitionAssignment partitionAssignment = new PartitionAssignment(replicas, clusterDescriber);
validateManualPartitionAssignment(partitionAssignment, OptionalInt.of(replicationFactor));
partitionAssignments.add(partitionAssignment);
List<Integer> isr = partitionAssignment.replicas().stream().
filter(clusterControl::isActive).collect(Collectors.toList());
if (isr.isEmpty()) {
throw new InvalidReplicaAssignmentException(
"All brokers specified in the manual partition assignment for " +
"partition " + (startPartitionId + i) + " are fenced or in controlled shutdown.");
}
isrs.add(isr);
}
} else {
partitionAssignments = clusterControl.replicaPlacer().place(
new PlacementSpec(startPartitionId, additional, replicationFactor),
clusterDescriber
).assignments();
isrs = partitionAssignments.stream().map(PartitionAssignment::replicas).collect(Collectors.toList());
}
int partitionId = startPartitionId;
for (int i = 0; i < partitionAssignments.size(); i++) {
PartitionAssignment partitionAssignment = partitionAssignments.get(i);
List<Integer> isr = isrs.get(i).stream().
filter(clusterControl::isActive).collect(Collectors.toList());
// If the ISR is empty, it means that all brokers are fenced or
// in controlled shutdown. To be consistent with the replica placer,
// we reject the create topic request with INVALID_REPLICATION_FACTOR.
if (isr.isEmpty()) {
throw new InvalidReplicationFactorException(
"Unable to replicate the partition " + replicationFactor +
" time(s): All brokers are currently fenced or in controlled shutdown.");
}
records.add(buildPartitionRegistration(partitionAssignment, isr)
.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
build()));
partitionId++;
}
}
void validateManualPartitionAssignment(
PartitionAssignment assignment,
OptionalInt replicationFactor
) {
if (assignment.replicas().isEmpty()) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes an empty replica list.");
}
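// Sort the broker IDs so that any duplicates become adjacent and can be
// detected in a single pass.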
List<Integer> sortedBrokerIds = new ArrayList<>(assignment.replicas());
sortedBrokerIds.sort(Integer::compare);
Integer prevBrokerId = null;
for (Integer brokerId : sortedBrokerIds) {
if (!clusterControl.brokerRegistrations().containsKey(brokerId)) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes broker " + brokerId + ", but no such broker is " +
"registered.");
}
if (brokerId.equals(prevBrokerId)) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes the broker " + prevBrokerId + " more than " +
"once.");
}
prevBrokerId = brokerId;
}
if (replicationFactor.isPresent() &&
sortedBrokerIds.size() != replicationFactor.getAsInt()) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes a partition with " + sortedBrokerIds.size() +
" replica(s), but this is not consistent with previous " +
"partitions, which have " + replicationFactor.getAsInt() + " replica(s).");
}
}
/**
* Iterate over a sequence of partitions and generate ISR changes and/or leader
* changes if necessary.
*
* @param context A human-readable context string used in log4j logging.
* @param brokerToRemove NO_LEADER if no broker is being removed; the ID of the
* broker to remove from the ISR and leadership, otherwise.
* @param brokerToAdd NO_LEADER if no broker is being added; the ID of the
* broker which is now eligible to be a leader, otherwise.
* @param records A list of records which we will append to.
* @param iterator The iterator containing the partitions to examine.
*/
void generateLeaderAndIsrUpdates(String context,
int brokerToRemove,
int brokerToAdd,
List<ApiMessageAndVersion> records,
Iterator<TopicIdPartition> iterator) {
int oldSize = records.size();
// If the caller passed a valid broker ID for brokerToAdd, rather than passing
// NO_LEADER, that node will be considered an acceptable leader even if it is
// currently fenced. This is useful when handling unfencing. The reason is that
// while we're generating the records to handle unfencing, the ClusterControlManager
// still shows the node as fenced.
//
// Similarly, if the caller passed a valid broker ID for brokerToRemove, rather
// than passing NO_LEADER, that node will never be considered an acceptable leader.
// This is useful when handling a newly fenced node. We also exclude brokerToRemove
// from the target ISR, but we need to exclude it here too, to handle the case
// where there is an unclean leader election which chooses a leader from outside
// the ISR.
IntPredicate isAcceptableLeader =
r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
while (iterator.hasNext()) {
TopicIdPartition topicIdPart = iterator.next();
TopicControlInfo topic = topics.get(topicIdPart.topicId());
if (topic == null) {
throw new RuntimeException("Topic ID " + topicIdPart.topicId() +
" existed in isrMembers, but not in the topics map.");
}
PartitionRegistration partition = topic.parts.get(topicIdPart.partitionId());
if (partition == null) {
throw new RuntimeException("Partition " + topicIdPart +
" existed in isrMembers, but not in the partitions map.");
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(
partition,
topicIdPart.topicId(),
topicIdPart.partitionId(),
new LeaderAcceptor(clusterControl, partition, isAcceptableLeader),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topic.name)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
// Note: if brokerToRemove was passed as NO_LEADER, this is a no-op (the new
// target ISR will be the same as the old one).
builder.setTargetIsr(Replicas.toList(
Replicas.copyWithout(partition.isr, brokerToRemove)));
builder.setDefaultDirProvider(clusterDescriber)
.build().ifPresent(records::add);
}
if (records.size() != oldSize) {
if (log.isDebugEnabled()) {
StringBuilder bld = new StringBuilder();
String prefix = "";
for (ListIterator<ApiMessageAndVersion> iter = records.listIterator(oldSize);
iter.hasNext(); ) {
ApiMessageAndVersion apiMessageAndVersion = iter.next();
PartitionChangeRecord record = (PartitionChangeRecord) apiMessageAndVersion.message();
bld.append(prefix).append(topics.get(record.topicId()).name).append("-").
append(record.partitionId());
prefix = ", ";
}
log.debug("{}: changing partition(s): {}", context, bld);
} else if (log.isInfoEnabled()) {
log.info("{}: changing {} partition(s)", context, records.size() - oldSize);
}
}
}
ControllerResult<AlterPartitionReassignmentsResponseData>
alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
AlterPartitionReassignmentsResponseData result =
new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
int successfulAlterations = 0, totalAlterations = 0;
for (ReassignableTopic topic : request.topics()) {
ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().
setName(topic.name());
for (ReassignablePartition partition : topic.partitions()) {
ApiError error = ApiError.NONE;
try {
alterPartitionReassignment(topic.name(), partition, records);
successfulAlterations++;
} catch (Throwable e) {
log.info("Unable to alter partition reassignment for " +
topic.name() + ":" + partition.partitionIndex() + " because " +
"of an " + e.getClass().getSimpleName() + " error: " + e.getMessage());
error = ApiError.fromThrowable(e);
}
totalAlterations++;
topicResponse.partitions().add(new ReassignablePartitionResponse().
setPartitionIndex(partition.partitionIndex()).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
}
result.responses().add(topicResponse);
}
log.info("Successfully altered {} out of {} partition reassignment(s).",
successfulAlterations, totalAlterations);
return ControllerResult.atomicOf(records, result);
}
void alterPartitionReassignment(String topicName,
ReassignablePartition target,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topicName);
if (topicId == null) {
throw new UnknownTopicOrPartitionException("Unable to find a topic " +
"named " + topicName + ".");
}
TopicControlInfo topicInfo = topics.get(topicId);
if (topicInfo == null) {
throw new UnknownTopicOrPartitionException("Unable to find a topic " +
"with ID " + topicId + ".");
}
TopicIdPartition tp = new TopicIdPartition(topicId, target.partitionIndex());
PartitionRegistration part = topicInfo.parts.get(target.partitionIndex());
if (part == null) {
throw new UnknownTopicOrPartitionException("Unable to find partition " +
topicName + ":" + target.partitionIndex() + ".");
}
Optional<ApiMessageAndVersion> record;
if (target.replicas() == null) {
record = cancelPartitionReassignment(topicName, tp, part);
} else {
record = changePartitionReassignment(tp, part, target);
}
record.ifPresent(records::add);
}
Optional<ApiMessageAndVersion> cancelPartitionReassignment(String topicName,
TopicIdPartition tp,
PartitionRegistration part) {
if (!isReassignmentInProgress(part)) {
throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
}
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(part);
if (revert.unclean()) {
if (!configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
throw new InvalidReplicaAssignmentException("Unable to revert partition " +
"assignment for " + topicName + ":" + tp.partitionId() + " because " +
"it would require an unclean leader election.");
}
}
PartitionChangeBuilder builder = new PartitionChangeBuilder(
part,
tp.topicId(),
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topicName)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
return builder
.setTargetIsr(revert.isr())
.setTargetReplicas(revert.replicas())
.setTargetRemoving(Collections.emptyList())
.setTargetAdding(Collections.emptyList())
.setDefaultDirProvider(clusterDescriber)
.build();
}
/**
* Apply a given partition reassignment. In general a partition reassignment goes
* through several stages:
*
* 1. Issue a PartitionChangeRecord adding all the new replicas to the partition's
* main replica list, and setting removingReplicas and addingReplicas.
*
* 2. Wait for the partition to have an ISR that contains all the new replicas. Or
* if there are no new replicas, wait until we have an ISR that contains at least one
* replica that we are not removing.
*
* 3. Issue a second PartitionChangeRecord removing all removingReplicas from the
* partition's main replica list, and clearing removingReplicas and addingReplicas.
*
* After stage 3, the reassignment is done.
*
* Under some conditions, stages #1 and #2 can be skipped entirely, since the ISR
* is already suitable to progress to stage #3. For example, a partition
* reassignment that merely rearranges the existing replicas in the list can bypass
* stages #1 and #2 and complete immediately.
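*
* For instance, expanding a partition from replicas [1, 2] to [1, 2, 3] must wait
* for broker 3 to join the ISR before stage #3 can run, while reordering [1, 2, 3]
* to [3, 1, 2] changes no replica set membership and completes with a single record.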
*
* @param tp The topic id and partition id.
* @param part The existing partition info.
* @param target The target partition info.
*
* @return The PartitionChangeRecord for the new partition assignment,
* or empty if no change is needed.
*/
Optional<ApiMessageAndVersion> changePartitionReassignment(TopicIdPartition tp,
PartitionRegistration part,
ReassignablePartition target) {
// Check that the requested partition assignment is valid.
PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas), part::directory);
PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas(), clusterDescriber);
validateManualPartitionAssignment(targetAssignment, OptionalInt.empty());
List<Integer> currentReplicas = Replicas.toList(part.replicas);
PartitionReassignmentReplicas reassignment =
new PartitionReassignmentReplicas(currentAssignment, targetAssignment);
PartitionChangeBuilder builder = new PartitionChangeBuilder(
part,
tp.topicId(),
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
);
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
if (!reassignment.replicas().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.replicas());
}
if (!reassignment.removing().isEmpty()) {
builder.setTargetRemoving(reassignment.removing());
}
if (!reassignment.adding().isEmpty()) {
builder.setTargetAdding(reassignment.adding());
}
return builder.setDefaultDirProvider(clusterDescriber).build();
}
ListPartitionReassignmentsResponseData listPartitionReassignments(
List<ListPartitionReassignmentsTopics> topicList,
long epoch
) {
ListPartitionReassignmentsResponseData response =
new ListPartitionReassignmentsResponseData().setErrorMessage(null);
if (topicList == null) {
// List all reassigning topics.
for (Entry<Uuid, int[]> entry : reassigningTopics.entrySet(epoch)) {
listReassigningTopic(response, entry.getKey(), Replicas.toList(entry.getValue()));
}
} else {
// List the given topics.
for (ListPartitionReassignmentsTopics topic : topicList) {
Uuid topicId = topicsByName.get(topic.name(), epoch);
if (topicId != null) {
listReassigningTopic(response, topicId, topic.partitionIndexes());
}
}
}
return response;
}
ControllerResult<AssignReplicasToDirsResponseData> handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
if (!featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
throw new UnsupportedVersionException("Directory assignment is not supported by the current metadata version.");
}
int brokerId = request.brokerId();
clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
if (brokerRegistration == null) {
throw new BrokerIdNotRegisteredException("Broker ID " + brokerId + " is not currently registered");
}
List<ApiMessageAndVersion> records = new ArrayList<>();
AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
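// Partitions that were assigned into an offline directory also need leader and
// ISR updates, which are generated after all assignment records.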
Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>();
for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
Uuid dirId = reqDir.id();
boolean directoryIsOffline = !brokerRegistration.hasOnlineDir(dirId);
AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
Uuid topicId = reqTopic.topicId();
Errors topicError = Errors.NONE;
TopicControlInfo topicInfo = this.topics.get(topicId);
if (topicInfo == null) {
log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId);
topicError = Errors.UNKNOWN_TOPIC_ID;
}
AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) {
int partitionIndex = reqPartition.partitionIndex();
Errors partitionError = topicError;
if (topicError == Errors.NONE) {
String topicName = topicInfo.name;
PartitionRegistration partitionRegistration = topicInfo.parts.get(partitionIndex);
if (partitionRegistration == null) {
log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
} else if (!Replicas.contains(partitionRegistration.replicas, brokerId)) {
log.warn("AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", brokerId, topicName, partitionIndex);
partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
} else {
Optional<ApiMessageAndVersion> partitionChangeRecord = new PartitionChangeBuilder(
partitionRegistration,
topicId,
partitionIndex,
new LeaderAcceptor(clusterControl, partitionRegistration),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topicName)
)
.setDirectory(brokerId, dirId)
.setDefaultDirProvider(clusterDescriber)
.build();
partitionChangeRecord.ifPresent(records::add);
if (directoryIsOffline) {
leaderAndIsrUpdates.add(new TopicIdPartition(topicId, partitionIndex));
}
if (log.isDebugEnabled()) {
log.debug("Broker {} assigned partition {}:{} to {} dir {}",
brokerId, topics.get(topicId).name(), partitionIndex,
directoryIsOffline ? "OFFLINE" : "ONLINE", dirId);
}
}
}
resTopic.partitions().add(new AssignReplicasToDirsResponseData.PartitionData().
setPartitionIndex(partitionIndex).
setErrorCode(partitionError.code()));
}
resDir.topics().add(resTopic);
}
response.directories().add(resDir);
}
if (!leaderAndIsrUpdates.isEmpty()) {
generateLeaderAndIsrUpdates("offline-dir-assignment", brokerId, NO_LEADER, records, leaderAndIsrUpdates.iterator());
}
return ControllerResult.of(records, response);
}
private void listReassigningTopic(ListPartitionReassignmentsResponseData response,
Uuid topicId,
List<Integer> partitionIds) {
TopicControlInfo topicInfo = topics.get(topicId);
if (topicInfo == null) return;
OngoingTopicReassignment ongoingTopic = new OngoingTopicReassignment().
setName(topicInfo.name);
for (int partitionId : partitionIds) {
Optional<OngoingPartitionReassignment> ongoing =
getOngoingPartitionReassignment(topicInfo, partitionId);
if (ongoing.isPresent()) {
ongoingTopic.partitions().add(ongoing.get());
}
}
if (!ongoingTopic.partitions().isEmpty()) {
response.topics().add(ongoingTopic);
}
}
private Optional<OngoingPartitionReassignment>
getOngoingPartitionReassignment(TopicControlInfo topicInfo, int partitionId) {
PartitionRegistration partition = topicInfo.parts.get(partitionId);
if (partition == null || !isReassignmentInProgress(partition)) {
return Optional.empty();
}
return Optional.of(new OngoingPartitionReassignment().
setAddingReplicas(Replicas.toList(partition.addingReplicas)).
setRemovingReplicas(Replicas.toList(partition.removingReplicas)).
setPartitionIndex(partitionId).
setReplicas(Replicas.toList(partition.replicas)));
}
// Visible to test.
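/**
* Returns the effective min.insync.replicas for a topic: the configured (or
* default) value, capped by the topic's replication factor. For example, with
* min.insync.replicas=3 and a replication factor of 2, the effective value is 2.
*/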
int getTopicEffectiveMinIsr(String topicName) {
int currentMinIsr = defaultMinIsr;
String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
if (minIsrConfig != null) {
currentMinIsr = Integer.parseInt(minIsrConfig);
} else {
log.warn("Can't find the min isr config for topic: " + topicName + " using default value " + defaultMinIsr);
}
int replicationFactor = defaultReplicationFactor;
try {
Uuid topicId = topicsByName.get(topicName);
replicationFactor = topics.get(topicId).parts.get(0).replicas.length;
} catch (Exception e) {
log.warn("Can't find the replication factor for topic: " + topicName + " using default value " + replicationFactor + ". Error=" + e);
}
return Math.min(currentMinIsr, replicationFactor);
}
/**
* Updates the directory to partition mapping for a single partition.
* Assignments to reserved directory IDs are ignored: since they cannot refer to
* real directories, there is no use in maintaining a set of partitions assigned
* to them.
*/
private void updatePartitionDirectories(
Uuid topicId,
int partitionId,
Uuid[] previousDirectoryIds,
Uuid[] newDirectoryIds
) {
Objects.requireNonNull(topicId, "topicId cannot be null");
TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partitionId);
if (previousDirectoryIds != null) {
for (Uuid dir : previousDirectoryIds) {
if (!DirectoryId.reserved(dir)) {
TimelineHashSet<TopicIdPartition> partitions = directoriesToPartitions.get(dir);
if (partitions != null) {
partitions.remove(topicIdPartition);
if (partitions.isEmpty()) {
directoriesToPartitions.remove(dir);
}
}
}
}
}
if (newDirectoryIds != null) {
for (Uuid dir : newDirectoryIds) {
if (!DirectoryId.reserved(dir)) {
Set<TopicIdPartition> partitions = directoriesToPartitions.computeIfAbsent(dir,
__ -> new TimelineHashSet<>(snapshotRegistry, 0));
partitions.add(topicIdPartition);
}
}
}
}
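/**
* A replica that is ineligible for leadership, paired with the reason, for use
* in log messages.
*/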
private static final class IneligibleReplica {
private final int replicaId;
private final String reason;
private IneligibleReplica(int replicaId, String reason) {
this.replicaId = replicaId;
this.reason = reason;
}
@Override
public String toString() {
return replicaId + " (" + reason + ")";
}
}
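/**
* Accepts a broker as a potential partition leader only if it passes the supplied
* predicate (by default, being an active broker) and hosts its replica of the
* partition in an online directory.
*/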
private static final class LeaderAcceptor implements IntPredicate {
private final ClusterControlManager clusterControl;
private final PartitionRegistration partition;
private final IntPredicate isAcceptableLeader;
private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistration partition) {
this(clusterControl, partition, clusterControl::isActive);
}
private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistration partition, IntPredicate isAcceptableLeader) {
this.clusterControl = clusterControl;
this.partition = partition;
this.isAcceptableLeader = isAcceptableLeader;
}
@Override
public boolean test(int brokerId) {
if (!isAcceptableLeader.test(brokerId)) {
return false;
}
Uuid replicaDirectory = partition.directory(brokerId);
return clusterControl.hasOnlineDir(brokerId, replicaDirectory);
}
}
}