/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.safemode;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Class defining the safe mode exit criteria for pipelines.
 *
 * This rule defines the percentage of healthy pipelines that need to be
 * reported before SCM can exit safe mode. Once safe mode exit happens, this
 * rule helps ensure that enough healthy pipelines exist for writes to go
 * through in the cluster.
 */
public class HealthyPipelineSafeModeRule extends SafeModeExitRule<Pipeline> {
public static final Logger LOG =
LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class);
private int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
private final double healthyPipelinesPercent;
private final Set<PipelineID> processedPipelineIDs = new HashSet<>();
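/**
 * Creates the rule and derives the healthy pipeline threshold from
 * configuration: the configured percentage of currently allocated
 * RATIS THREE pipelines, with a floor based on the minimum datanode count.
 *
 * @param ruleName        name used to register this rule
 * @param eventQueue      event queue this rule subscribes to
 * @param pipelineManager source of the current pipeline list
 * @param manager         safe mode manager owning this rule
 * @param configuration   SCM configuration
 */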
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
PipelineManager pipelineManager,
SCMSafeModeManager manager, ConfigurationSource configuration) {
super(manager, ruleName, eventQueue);
healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
int minDatanodes = configuration.getInt(
HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE,
HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE_DEFAULT);
// We only care about replication factor THREE pipelines, so the minimum
// healthy pipeline count is derived from the minimum datanode count.
int minHealthyPipelines = minDatanodes /
HddsProtos.ReplicationFactor.THREE_VALUE;
Preconditions.checkArgument(
(healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
+ " value should be >= 0.0 and <= 1.0");
// We want to wait for RATIS replication factor THREE write pipelines.
int pipelineCount = pipelineManager.getPipelines(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.ALLOCATED).size();
// The percentage-based part of the threshold is zero when the pipeline
// count is 0. On a freshly installed cluster there are no pipelines in the
// SCM pipeline DB, so minHealthyPipelines acts as the floor.
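// For example (hypothetical values): with healthyPipelinesPercent = 0.10
// and 20 allocated RATIS THREE pipelines, ceil(0.10 * 20) = 2; the final
// threshold is the larger of that and minHealthyPipelines.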
healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
(int) Math.ceil(healthyPipelinesPercent * pipelineCount));
LOG.info("Total pipeline count is {}, healthy pipeline " +
"threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
getSafeModeMetrics().setNumHealthyPipelinesThreshold(
healthyPipelineThresholdCount);
}
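/**
 * Test-only hook that recomputes the healthy pipeline threshold as
 * ceil(healthyPipelinesPercent * actualPipelineCount), without the
 * minimum datanode based floor applied in the constructor.
 */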
@VisibleForTesting
public void setHealthyPipelineThresholdCount(int actualPipelineCount) {
healthyPipelineThresholdCount =
(int) Math.ceil(healthyPipelinesPercent * actualPipelineCount);
}
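/**
 * This rule is driven by OPEN_PIPELINE events.
 */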
@Override
protected TypedEvent<Pipeline> getEventType() {
return SCMEvents.OPEN_PIPELINE;
}
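/**
 * The rule is satisfied once the number of reported healthy pipelines
 * reaches the threshold computed in the constructor.
 */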
@Override
protected boolean validate() {
return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
}
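/**
 * Counts each healthy RATIS replication factor THREE pipeline at most once
 * and logs progress while SCM is still in safe mode.
 */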
@Override
protected void process(Pipeline pipeline) {
// When SCM stays in safe mode for a long time, an already registered
// datanode can send its pipeline report again, or SCMPipelineManager may
// create new pipelines, so the same pipeline can be seen more than once;
// processedPipelineIDs ensures each pipeline is counted only once.
Preconditions.checkNotNull(pipeline);
if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
pipeline.isHealthy() &&
!processedPipelineIDs.contains(pipeline.getId())) {
getSafeModeMetrics().incCurrentHealthyPipelinesCount();
currentHealthyPipelineCount++;
processedPipelineIDs.add(pipeline.getId());
}
if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
"SCM in safe mode. Healthy pipelines reported count is {}, " +
"required healthy pipeline reported count is {}",
currentHealthyPipelineCount, healthyPipelineThresholdCount);
}
}
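/**
 * Clears the set of processed pipeline IDs tracked by this rule.
 */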
@Override
protected void cleanup() {
processedPipelineIDs.clear();
}
@VisibleForTesting
public int getCurrentHealthyPipelineCount() {
return currentHealthyPipelineCount;
}
@VisibleForTesting
public int getHealthyPipelineThresholdCount() {
return healthyPipelineThresholdCount;
}
}