blob: b5cbc3866581162d1ac2477770f424bc28e00989 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.controller;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
public class FeatureControlManager {
public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private QuorumFeatures quorumFeatures = null;
private MetadataVersion metadataVersion = MetadataVersion.latestProduction();
private MetadataVersion minimumBootstrapVersion = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;
private ClusterFeatureSupportDescriber clusterSupportDescriber = new ClusterFeatureSupportDescriber() {
@Override
public Iterator<Entry<Integer, Map<String, VersionRange>>> brokerSupported() {
return Collections.emptyIterator();
}
@Override
public Iterator<Entry<Integer, Map<String, VersionRange>>> controllerSupported() {
return Collections.emptyIterator();
}
};
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
return this;
}
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
}
Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
this.quorumFeatures = quorumFeatures;
return this;
}
Builder setMetadataVersion(MetadataVersion metadataVersion) {
this.metadataVersion = metadataVersion;
return this;
}
Builder setMinimumBootstrapVersion(MetadataVersion minimumBootstrapVersion) {
this.minimumBootstrapVersion = minimumBootstrapVersion;
return this;
}
Builder setClusterFeatureSupportDescriber(ClusterFeatureSupportDescriber clusterSupportDescriber) {
this.clusterSupportDescriber = clusterSupportDescriber;
return this;
}
public FeatureControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (quorumFeatures == null) {
Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
MetadataVersion.latestProduction().featureLevel()));
quorumFeatures = new QuorumFeatures(0, localSupportedFeatures, Collections.singletonList(0));
}
return new FeatureControlManager(
logContext,
quorumFeatures,
snapshotRegistry,
metadataVersion,
minimumBootstrapVersion,
clusterSupportDescriber
);
}
}
private final Logger log;
/**
* An immutable map containing the features supported by this controller's software.
*/
private final QuorumFeatures quorumFeatures;
/**
* Maps feature names to finalized version ranges.
*/
private final TimelineHashMap<String, Short> finalizedVersions;
/**
* The current metadata version
*/
private final TimelineObject<MetadataVersion> metadataVersion;
/**
* The minimum bootstrap version that we can't downgrade before.
*/
private final MetadataVersion minimumBootstrapVersion;
/**
* Gives information about the supported versions in the cluster.
*/
private final ClusterFeatureSupportDescriber clusterSupportDescriber;
private FeatureControlManager(
LogContext logContext,
QuorumFeatures quorumFeatures,
SnapshotRegistry snapshotRegistry,
MetadataVersion metadataVersion,
MetadataVersion minimumBootstrapVersion,
ClusterFeatureSupportDescriber clusterSupportDescriber
) {
this.log = logContext.logger(FeatureControlManager.class);
this.quorumFeatures = quorumFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
this.minimumBootstrapVersion = minimumBootstrapVersion;
this.clusterSupportDescriber = clusterSupportDescriber;
}
ControllerResult<ApiError> updateFeatures(
Map<String, Short> updates,
Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
boolean validateOnly
) {
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
Map<String, Short> proposedUpdatedVersions = new HashMap<>(finalizedVersions);
proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
proposedUpdatedVersions.putAll(updates);
for (Entry<String, Short> entry : updates.entrySet()) {
ApiError error = updateFeature(entry.getKey(), entry.getValue(),
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions);
if (!error.error().equals(Errors.NONE)) {
return ControllerResult.of(Collections.emptyList(), error);
}
}
if (validateOnly) {
return ControllerResult.of(Collections.emptyList(), ApiError.NONE);
} else {
return ControllerResult.atomicOf(records, ApiError.NONE);
}
}
MetadataVersion metadataVersion() {
return metadataVersion.get();
}
private ApiError updateFeature(
String featureName,
short newVersion,
FeatureUpdate.UpgradeType upgradeType,
List<ApiMessageAndVersion> records,
Map<String, Short> proposedUpdatedVersions
) {
if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
return invalidUpdateVersion(featureName, newVersion,
"The controller does not support the given upgrade type.");
}
final short currentVersion;
if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
currentVersion = metadataVersion.get().featureLevel();
} else {
currentVersion = finalizedVersions.getOrDefault(featureName, (short) 0);
}
if (newVersion < 0) {
return invalidUpdateVersion(featureName, newVersion,
"A feature version cannot be less than 0.");
}
Optional<String> reasonNotSupported = reasonNotSupported(featureName, newVersion);
if (reasonNotSupported.isPresent()) {
return invalidUpdateVersion(featureName, newVersion, reasonNotSupported.get());
}
if (newVersion < currentVersion) {
if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
return invalidUpdateVersion(featureName, newVersion,
"Can't downgrade the version of this feature without setting the " +
"upgrade type to either safe or unsafe downgrade.");
}
} else if (newVersion > currentVersion) {
if (!upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
return invalidUpdateVersion(featureName, newVersion, "Can't downgrade to a newer version.");
}
}
if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
// Perform additional checks if we're updating metadata.version
return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
} else {
// Validate dependencies for features that are not metadata.version
try {
Feature.validateVersion(
// Allow unstable feature versions is true because the version range is already checked above.
Feature.featureFromName(featureName).fromFeatureLevel(newVersion, true),
proposedUpdatedVersions);
} catch (IllegalArgumentException e) {
return invalidUpdateVersion(featureName, newVersion, e.getMessage());
}
records.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(featureName).
setFeatureLevel(newVersion), (short) 0));
return ApiError.NONE;
}
}
private Optional<String> reasonNotSupported(
String featureName,
short newVersion
) {
int numBrokersChecked = 0;
int numControllersChecked = 0;
Optional<String> reason = quorumFeatures.reasonNotLocallySupported(featureName, newVersion);
if (reason.isPresent()) return reason;
numControllersChecked++;
for (Iterator<Entry<Integer, Map<String, VersionRange>>> iter =
clusterSupportDescriber.brokerSupported();
iter.hasNext(); ) {
Entry<Integer, Map<String, VersionRange>> entry = iter.next();
reason = QuorumFeatures.reasonNotSupported(newVersion,
"Broker " + entry.getKey(),
entry.getValue().getOrDefault(featureName, QuorumFeatures.DISABLED));
if (reason.isPresent()) return reason;
numBrokersChecked++;
}
String registrationSuffix = "";
HashSet<Integer> foundControllers = new HashSet<>();
foundControllers.add(quorumFeatures.nodeId());
if (metadataVersion.get().isControllerRegistrationSupported()) {
for (Iterator<Entry<Integer, Map<String, VersionRange>>> iter =
clusterSupportDescriber.controllerSupported();
iter.hasNext(); ) {
Entry<Integer, Map<String, VersionRange>> entry = iter.next();
if (entry.getKey() == quorumFeatures.nodeId()) {
// No need to re-check the features supported by this controller, since we
// already checked that above.
continue;
}
reason = QuorumFeatures.reasonNotSupported(newVersion,
"Controller " + entry.getKey(),
entry.getValue().getOrDefault(featureName, QuorumFeatures.DISABLED));
if (reason.isPresent()) return reason;
foundControllers.add(entry.getKey());
numControllersChecked++;
}
for (int id : quorumFeatures.quorumNodeIds()) {
if (!foundControllers.contains(id)) {
return Optional.of("controller " + id + " has not registered, and may not " +
"support this feature");
}
}
} else {
registrationSuffix = " Note: unable to verify controller support in the current " +
"MetadataVersion.";
}
log.info("Verified that {} broker(s) and {} controller(s) supported changing {} to " +
"feature level {}.{}", numBrokersChecked, numControllersChecked, featureName,
newVersion, registrationSuffix);
return Optional.empty();
}
private ApiError invalidUpdateVersion(String feature, short version, String message) {
String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message);
log.warn(errorMessage);
return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
}
/**
* Perform some additional validation for metadata.version updates.
*/
private ApiError updateMetadataVersion(
short newVersionLevel,
boolean allowUnsafeDowngrade,
Consumer<ApiMessageAndVersion> recordConsumer
) {
MetadataVersion currentVersion = metadataVersion();
final MetadataVersion newVersion;
try {
newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
} catch (IllegalArgumentException e) {
return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
}
// We cannot set a version earlier than IBP_3_3_IV0, since that was the first version that contained
// FeatureLevelRecord itself.
if (newVersion.isLessThan(minimumBootstrapVersion)) {
return invalidMetadataVersion(newVersionLevel, "Unable to set a metadata.version less than " +
minimumBootstrapVersion);
}
if (newVersion.isLessThan(currentVersion)) {
// This is a downgrade
boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
if (!metadataChanged) {
log.warn("Downgrading metadata.version from {} to {}.", currentVersion, newVersion);
} else if (allowUnsafeDowngrade) {
return invalidMetadataVersion(newVersionLevel, "Unsafe metadata downgrade is not supported " +
"in this version.");
} else {
// The phrase "Retry using UNSAFE_DOWNGRADE if you want to force the downgrade to proceed." has been removed
// because unsafe metadata downgrades are not yet supported. We can add it back when implemented (KAFKA-13896).
return invalidMetadataVersion(newVersionLevel, "Refusing to perform the requested " +
"downgrade because it might delete metadata information.");
}
} else {
log.warn("Upgrading metadata.version from {} to {}.", currentVersion, newVersion);
}
recordConsumer.accept(new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName(MetadataVersion.FEATURE_NAME)
.setFeatureLevel(newVersionLevel), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
return ApiError.NONE;
}
private ApiError invalidMetadataVersion(short version, String message) {
String errorMessage = String.format("Invalid metadata.version %d. %s", version, message);
log.warn(errorMessage);
return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
}
FinalizedControllerFeatures finalizedFeatures(long epoch) {
Map<String, Short> features = new HashMap<>();
features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
features.put(entry.getKey(), entry.getValue());
}
return new FinalizedControllerFeatures(features, epoch);
}
FinalizedControllerFeatures latestFinalizedFeatures() {
Map<String, Short> features = new HashMap<>();
features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
features.put(entry.getKey(), entry.getValue());
}
return new FinalizedControllerFeatures(features, -1);
}
public void replay(FeatureLevelRecord record) {
VersionRange range = quorumFeatures.localSupportedFeature(record.name());
if (!range.contains(record.featureLevel())) {
throw new RuntimeException("Tried to apply FeatureLevelRecord " + record + ", but this controller only " +
"supports versions " + range);
}
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
MetadataVersion mv = MetadataVersion.fromFeatureLevel(record.featureLevel());
metadataVersion.set(mv);
log.info("Replayed a FeatureLevelRecord setting metadata.version to {}", mv);
} else {
if (record.featureLevel() == 0) {
finalizedVersions.remove(record.name());
log.info("Replayed a FeatureLevelRecord removing feature {}", record.name());
} else {
finalizedVersions.put(record.name(), record.featureLevel());
log.info("Replayed a FeatureLevelRecord setting feature {} to {}",
record.name(), record.featureLevel());
}
}
}
boolean isControllerId(int nodeId) {
return quorumFeatures.isControllerId(nodeId);
}
boolean isElrFeatureEnabled() {
return latestFinalizedFeatures().versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >=
EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
}
}