blob: 1a5185a052cbe12a85c5b9c8899be0f61c09639f [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Partition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MaintenanceRecoveryStage extends AbstractAsyncBaseStage {
private static Logger LOG = LoggerFactory.getLogger(MaintenanceRecoveryStage.class);
@Override
public AsyncWorkerType getAsyncWorkerType() {
return AsyncWorkerType.MaintenanceRecoveryWorker;
}
@Override
public void execute(final ClusterEvent event) throws Exception {
// Check the cache is there
ResourceControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
if (cache == null) {
return;
}
// Check if the maintenance signal has been changed during this pipeline run
// If true, skip this stage because the Controller already changed the signal
// The flag will be flipped in the next ReadClusterDataStage()
if (cache.hasMaintenanceSignalChanged()) {
return;
}
// Check for the maintenance signal
// If it was entered manually or the signal is null (which shouldn't happen), skip this stage
MaintenanceSignal maintenanceSignal = cache.getMaintenanceSignal();
if (maintenanceSignal == null || maintenanceSignal
.getTriggeringEntity() != MaintenanceSignal.TriggeringEntity.CONTROLLER) {
return;
}
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
if (manager == null || !manager.isConnected()) {
LogUtil.logInfo(LOG, _eventId,
"MaintenanceRecoveryStage failed due to HelixManager being null or not connected!");
return;
}
// At this point, the cluster entered maintenance mode automatically. Retrieve the
// auto-triggering reason
MaintenanceSignal.AutoTriggerReason internalReason = maintenanceSignal.getAutoTriggerReason();
boolean shouldExitMaintenance;
String reason;
switch (internalReason) {
case MAX_OFFLINE_INSTANCES_EXCEEDED:
case MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS:
// Check on the number of offline/disabled instances
int numOfflineInstancesForAutoExit =
cache.getClusterConfig().getNumOfflineInstancesForAutoExit();
if (numOfflineInstancesForAutoExit < 0) {
return; // Config is not set, no auto-exit
}
// Get the count of all instances that are either offline or disabled
int offlineDisabledCount =
cache.getAssignableInstances().size() - cache.getEnabledLiveInstances().size();
shouldExitMaintenance = offlineDisabledCount <= numOfflineInstancesForAutoExit;
reason = String.format(
"Auto-exiting maintenance mode for cluster %s; Num. of offline/disabled instances is %d, less than or equal to the exit threshold %d",
event.getClusterName(), offlineDisabledCount, numOfflineInstancesForAutoExit);
break;
case MAX_PARTITION_PER_INSTANCE_EXCEEDED:
IntermediateStateOutput intermediateStateOutput =
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
if (intermediateStateOutput == null) {
return;
}
shouldExitMaintenance = !violatesMaxPartitionsPerInstance(cache, intermediateStateOutput);
reason = String.format(
"Auto-exiting maintenance mode for cluster %s; All instances have fewer or equal number of partitions than maxPartitionsPerInstance threshold.",
event.getClusterName());
break;
default:
shouldExitMaintenance = false;
reason = "";
}
if (shouldExitMaintenance) {
// The cluster has recovered sufficiently, so proceed to exit the maintenance mode by removing
// MaintenanceSignal. AutoTriggerReason won't be recorded
manager.getClusterManagmentTool().autoEnableMaintenanceMode(manager.getClusterName(), false,
reason, internalReason);
cache.setMaintenanceSignalChanged(); // Set the flag so we do not double enable/disable
LogUtil.logInfo(LOG, _eventId, reason);
}
}
/**
* Check that the intermediateStateOutput assignment does not violate maxPartitionPerInstance
* threshold.
* @param cache
* @param intermediateStateOutput
* @return true if violation is found, false otherwise.
*/
private boolean violatesMaxPartitionsPerInstance(ResourceControllerDataProvider cache,
IntermediateStateOutput intermediateStateOutput) {
int maxPartitionPerInstance = cache.getClusterConfig().getMaxPartitionsPerInstance();
if (maxPartitionPerInstance <= 0) {
// Config is not set; return
return false;
}
Map<String, PartitionStateMap> resourceStatesMap =
intermediateStateOutput.getResourceStatesMap();
Map<String, Integer> instancePartitionCounts = new HashMap<>();
for (String resource : resourceStatesMap.keySet()) {
IdealState idealState = cache.getIdealState(resource);
if (idealState != null
&& idealState.getStateModelDefRef().equals(BuiltInStateModelDefinitions.Task.name())) {
// Ignore task here. Task has its own throttling logic
continue;
}
PartitionStateMap partitionStateMap = resourceStatesMap.get(resource);
Map<Partition, Map<String, String>> stateMaps = partitionStateMap.getStateMap();
for (Partition p : stateMaps.keySet()) {
Map<String, String> stateMap = stateMaps.get(p);
for (String instance : stateMap.keySet()) {
// If this replica is in DROPPED state, do not count it in the partition count since it is
// to be dropped
String state = stateMap.get(instance);
if (state.equals(HelixDefinedState.DROPPED.name())) {
continue;
}
if (!instancePartitionCounts.containsKey(instance)) {
instancePartitionCounts.put(instance, 0);
}
// Number of replicas (from different partitions) held in this instance
int partitionCount = instancePartitionCounts.get(instance);
partitionCount++;
if (partitionCount > maxPartitionPerInstance) {
// There exists an instance whose intermediate state assignment violates the maximum
// partitions per instance threshold, return!
return true;
}
instancePartitionCounts.put(instance, partitionCount);
}
}
}
// No violation found
return false;
}
}