blob: 55a320a32744c79730f87170228ba3a09eebf32c [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterStatus;
import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.Resource;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Checks the cluster status whether the cluster is in management mode.
*/
public class ManagementModeStage extends AbstractBaseStage {
private static final Logger LOG = LoggerFactory.getLogger(ManagementModeStage.class);
@Override
public void process(ClusterEvent event) throws Exception {
// TODO: implement the stage
_eventId = event.getEventId();
String clusterName = event.getClusterName();
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
if (manager == null) {
throw new StageException("HelixManager attribute value is null");
}
HelixDataAccessor accessor = manager.getHelixDataAccessor();
ManagementControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
CurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.name());
final Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
final BestPossibleStateOutput bestPossibleStateOutput =
RebalanceUtil.buildBestPossibleState(resourceMap.keySet(), currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
ClusterManagementMode managementMode =
checkClusterFreezeStatus(cache.getEnabledLiveInstances(), cache.getLiveInstances(),
cache.getAllInstancesMessages(), cache.getPauseSignal());
recordClusterStatus(managementMode, accessor);
recordManagementModeHistory(managementMode, cache.getPauseSignal(), manager.getInstanceName(),
accessor);
event.addAttribute(AttributeName.CLUSTER_STATUS.name(), managementMode);
}
// Checks cluster freeze, controller pause mode and status.
private ClusterManagementMode checkClusterFreezeStatus(
Set<String> enabledLiveInstances,
Map<String, LiveInstance> liveInstanceMap,
Map<String, Collection<Message>> allInstanceMessages,
PauseSignal pauseSignal) {
ClusterManagementMode.Type type;
ClusterManagementMode.Status status = ClusterManagementMode.Status.COMPLETED;
if (pauseSignal == null) {
// TODO: Should check maintenance mode after it's moved to management pipeline.
type = ClusterManagementMode.Type.NORMAL;
if (HelixUtil.inManagementMode(pauseSignal, liveInstanceMap, enabledLiveInstances,
allInstanceMessages)) {
status = ClusterManagementMode.Status.IN_PROGRESS;
}
} else if (pauseSignal.isClusterPause()) {
type = ClusterManagementMode.Type.CLUSTER_FREEZE;
if (!instancesFullyFrozen(enabledLiveInstances, liveInstanceMap, allInstanceMessages)) {
status = ClusterManagementMode.Status.IN_PROGRESS;
}
} else {
type = ClusterManagementMode.Type.CONTROLLER_PAUSE;
}
return new ClusterManagementMode(type, status);
}
private boolean instancesFullyFrozen(Set<String> enabledLiveInstances,
Map<String, LiveInstance> liveInstanceMap,
Map<String, Collection<Message>> allInstanceMessages) {
// 1. All live instances are frozen
// 2. No pending participant status change message.
return enabledLiveInstances.stream().noneMatch(
instance -> !LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
|| hasPendingMessage(allInstanceMessages.get(instance),
MessageType.PARTICIPANT_STATUS_CHANGE));
}
private boolean hasPendingMessage(Collection<Message> messages, MessageType type) {
return messages != null && messages.stream()
.anyMatch(message -> type.name().equals(message.getMsgType()));
}
private void recordClusterStatus(ClusterManagementMode mode, HelixDataAccessor accessor) {
// update cluster status
PropertyKey statusPropertyKey = accessor.keyBuilder().clusterStatus();
ClusterStatus clusterStatus = accessor.getProperty(statusPropertyKey);
if (clusterStatus == null) {
clusterStatus = new ClusterStatus();
}
ClusterManagementMode.Type recordedType = clusterStatus.getManagementMode();
ClusterManagementMode.Status recordedStatus = clusterStatus.getManagementModeStatus();
// If there is any pending message sent by users, status could be computed as in progress.
// Skip recording status change to avoid confusion after cluster is already fully frozen.
if (ClusterManagementMode.Type.CLUSTER_FREEZE.equals(recordedType)
&& ClusterManagementMode.Status.COMPLETED.equals(recordedStatus)
&& ClusterManagementMode.Type.CLUSTER_FREEZE.equals(mode.getMode())
&& ClusterManagementMode.Status.IN_PROGRESS.equals(mode.getStatus())) {
LOG.info("Skip recording status mode={}, status={}, because cluster is fully frozen",
mode.getMode(), mode.getStatus());
return;
}
if (!mode.getMode().equals(recordedType) || !mode.getStatus().equals(recordedStatus)) {
// Only update status when it's different with metadata store
clusterStatus.setManagementMode(mode.getMode());
clusterStatus.setManagementModeStatus(mode.getStatus());
if (!accessor.updateProperty(statusPropertyKey, clusterStatus)) {
LOG.error("Failed to update cluster status {}", clusterStatus);
}
}
}
private void recordManagementModeHistory(ClusterManagementMode mode, PauseSignal pauseSignal,
String controllerName, HelixDataAccessor accessor) {
// Only record completed status
if (!ClusterManagementMode.Status.COMPLETED.equals(mode.getStatus())) {
return;
}
// Record a management mode history in controller history
String path = accessor.keyBuilder().controllerLeaderHistory().getPath();
long timestamp = Instant.now().toEpochMilli();
String fromHost = (pauseSignal == null ? null : pauseSignal.getFromHost());
String reason = (pauseSignal == null ? null : pauseSignal.getReason());
// Need the updater to avoid race condition with controller/maintenance history updates.
if (!accessor.getBaseDataAccessor().update(path, oldRecord -> {
if (oldRecord == null) {
oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
}
return new ControllerHistory(oldRecord)
.updateManagementModeHistory(controllerName, mode, fromHost, timestamp, reason);
}, AccessOption.PERSISTENT)) {
LOG.error("Failed to write management mode history to ZK!");
}
}
}