blob: 09894263f0926f78fe303c8e45f0b8bb10736b49 [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.HelixException;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Controller pipeline stage that filters the messages found under
 * {@code MESSAGES_ALL} down to the subset that can be sent without exceeding
 * the per-state upper bounds of each resource's state model, and publishes the
 * result under {@code MESSAGES_SELECTED}.
 */
public class MessageSelectionStage extends AbstractBaseStage {
  private static final Logger LOG = LoggerFactory.getLogger(MessageSelectionStage.class);

  /**
   * Lower/upper limit on the number of replicas allowed in one state. Only the
   * upper bound is consulted by this stage; a count equal to the upper bound is
   * still allowed.
   */
  protected static class Bounds {
    private final int upper;
    private final int lower;

    public Bounds(int lower, int upper) {
      this.lower = lower;
      this.upper = upper;
    }

    public int getUpperBound() {
      return upper;
    }

    public int getLowerBound() {
      return lower;
    }
  }

  /**
   * Selects, for every partition of every resource in the event, the messages
   * that may be sent, and stores them in the event as {@code MESSAGES_SELECTED}.
   *
   * A {@link HelixException} raised while handling one resource is logged and
   * that resource is skipped; the remaining resources are still processed.
   *
   * @param event cluster event carrying the data provider, resources, current
   *          state and generated messages
   * @throws StageException if any of the required event attributes is missing
   */
  @Override
  public void process(ClusterEvent event) throws Exception {
    _eventId = event.getEventId();
    ResourceControllerDataProvider cache =
        event.getAttribute(AttributeName.ControllerDataProvider.name());
    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
    CurrentStateOutput currentStateOutput =
        event.getAttribute(AttributeName.CURRENT_STATE.name());
    MessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name());
    if (cache == null || resourceMap == null || currentStateOutput == null
        || messageGenOutput == null) {
      throw new StageException("Missing attributes in event:" + event
          + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
    }

    MessageOutput output = new MessageOutput();
    for (Map.Entry<String, Resource> entry : resourceMap.entrySet()) {
      String resourceName = entry.getKey();
      Resource resource = entry.getValue();
      try {
        StateModelDefinition stateModelDef =
            cache.getStateModelDef(resource.getStateModelDefRef());
        if (stateModelDef == null) {
          // Without this guard the dereference below would raise a NullPointerException,
          // which is not a HelixException and would therefore abort the stage for every
          // remaining resource. Fail only this resource through the existing error path.
          throw new HelixException("State model definition " + resource.getStateModelDefRef()
              + " not found for resource " + resourceName);
        }
        Map<String, Integer> stateTransitionPriorities =
            getStateTransitionPriorityMap(stateModelDef);
        IdealState idealState = cache.getIdealState(resourceName);
        Map<String, Bounds> stateConstraints =
            computeStateConstraints(stateModelDef, idealState, cache);
        for (Partition partition : resource.getPartitions()) {
          List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
          List<Message> selectedMessages = selectMessages(cache.getLiveInstances(),
              currentStateOutput.getCurrentStateMap(resourceName, partition),
              currentStateOutput.getPendingMessageMap(resourceName, partition), messages,
              currentStateOutput.getPendingRelayMessageMap(resourceName, partition).values(),
              stateConstraints, stateTransitionPriorities, stateModelDef,
              resource.isP2PMessageEnabled());
          output.addMessages(resourceName, partition, selectedMessages);
        }
      } catch (HelixException ex) {
        LogUtil.logError(LOG, _eventId,
            "Failed to finish message selection for resource " + resourceName, ex);
      }
    }
    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), output);
  }

  /**
   * Adds one to the counter of {@code state}, but only when that state has a
   * constraint; unconstrained states are not tracked.
   */
  private void increaseStateCnt(Map<String, Bounds> stateConstraints, String state,
      Map<String, Integer> stateCnts) {
    if (!stateConstraints.containsKey(state)) {
      // skip state that doesn't have constraint
      return;
    }
    Integer current = stateCnts.get(state);
    stateCnts.put(state, current == null ? 1 : current + 1);
  }

  // TODO: This method deserves its own class. The class should not understand helix but
  // just be able to solve the problem using the algo. I think the method is following that
  // but if we don't move it to another class it's quite easy to break that contract
  /**
   * Greedy message selection algorithm:
   * 1) count replicas per constrained state from the current states and the pending messages;
   * 2) group the candidate messages by state transition priority;
   * 3) from highest priority to lowest, add the messages of each group one by one, skipping
   * any message that would push its target state over the upper bound, and updating the
   * counts whenever a message is selected.
   *
   * State transition cancellation messages are always selected and bypass the bounds.
   *
   * @param liveInstances live instances keyed by instance name; every key is counted
   * @param currentStates instance name to current state; an instance without an entry is
   *          counted as being in the state model's initial state
   * @param pendingMessages instance name to pending message; both the from-state and the
   *          to-state of each pending message are counted
   * @param messages candidate messages to choose from; may be null
   * @param pendingRelayMessages relay messages that are still pending for the partition
   * @param stateConstraints state to bounds; states without an entry are unconstrained
   * @param stateTransitionPriorities "FROM-TO" transition to priority, lower value first;
   *          transitions without an entry get the lowest priority
   * @param stateModelDef state model supplying the initial state and the top state
   * @param p2pMessageEnabled whether an over-the-bound top-state message may be attached as a
   *          relay message to a message that moves a replica off the top state
   * @return the selected messages; an empty list when {@code messages} is null or empty
   */
  List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
      Map<String, String> currentStates, Map<String, Message> pendingMessages,
      List<Message> messages, Collection<Message> pendingRelayMessages,
      Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
      StateModelDefinition stateModelDef, boolean p2pMessageEnabled) {
    if (messages == null || messages.isEmpty()) {
      return Collections.emptyList();
    }

    List<Message> selectedMessages = new ArrayList<>();
    Map<String, Integer> stateCnts = new HashMap<>();
    String initialState = stateModelDef.getInitialState();

    // count currentState, if no currentState, count as in initialState
    for (String instance : liveInstances.keySet()) {
      String state =
          currentStates.containsKey(instance) ? currentStates.get(instance) : initialState;
      increaseStateCnt(stateConstraints, state, stateCnts);
    }

    // count pendingStates
    for (Message pendingMessage : pendingMessages.values()) {
      increaseStateCnt(stateConstraints, pendingMessage.getToState(), stateCnts);
      increaseStateCnt(stateConstraints, pendingMessage.getFromState(), stateCnts);
    }

    // group messages based on state transition priority; TreeMap iterates lowest value first
    Map<Integer, List<Message>> messagesGroupByStateTransitPriority = new TreeMap<>();

    /* record all state transition messages that transition a replica from top-state */
    List<Message> fromTopStateMessages = new LinkedList<>();
    for (Message message : messages) {
      if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION_CANCELLATION.name())) {
        selectedMessages.add(message);
        continue;
      }

      String fromState = message.getFromState();
      String toState = message.getToState();
      String transition = fromState + "-" + toState;
      int priority = stateTransitionPriorities.containsKey(transition)
          ? stateTransitionPriorities.get(transition)
          : Integer.MAX_VALUE;

      List<Message> samePriority = messagesGroupByStateTransitPriority.get(priority);
      if (samePriority == null) {
        samePriority = new ArrayList<>();
        messagesGroupByStateTransitPriority.put(priority, samePriority);
      }
      samePriority.add(message);

      if (fromState.equals(stateModelDef.getTopState())) {
        fromTopStateMessages.add(message);
      }
    }

    // select messages
    for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
      for (Message message : messageList) {
        String toState = message.getToState();

        if (toState.equals(stateModelDef.getTopState())
            && isBlockedByPendingRelay(message, pendingRelayMessages)) {
          continue;
        }

        if (stateConstraints.containsKey(toState)) {
          int upperBound = stateConstraints.get(toState).getUpperBound();
          int newCnt = stateCnts.containsKey(toState) ? stateCnts.get(toState) + 1 : 1;
          if (newCnt > upperBound) {
            if (p2pMessageEnabled && toState.equals(stateModelDef.getTopState())
                && stateModelDef.isSingleTopStateModel()) {
              // attach this message as a relay message to the message to transition off
              // current top-state replica
              if (!fromTopStateMessages.isEmpty()) {
                Message fromTopStateMsg = fromTopStateMessages.remove(0);
                fromTopStateMsg.attachRelayMessage(message.getTgtName(), message);
              }
            } else {
              // reach upper-bound of message for the topState, will not send the message
              LogUtil.logInfo(LOG, _eventId,
                  "Reach upper_bound: " + upperBound + ", not send message: " + message);
            }
            continue;
          }
        }

        increaseStateCnt(stateConstraints, toState, stateCnts);
        selectedMessages.add(message);
      }
    }
    return selectedMessages;
  }

  /**
   * Tells whether a pending relay message for the same transition targets a different host
   * than {@code message}. If the pending relay message targets the same host it is fine to
   * keep sending the message; if it targets a different host the message should not be sent
   * now (it should be sent after the relay message gets expired).
   *
   * @return true when {@code message} must be held back
   */
  private boolean isBlockedByPendingRelay(Message message,
      Collection<Message> pendingRelayMessages) {
    String toState = message.getToState();
    String fromState = message.getFromState();
    for (Message relayMsg : pendingRelayMessages) {
      if (relayMsg.getToState().equals(toState) && relayMsg.getFromState().equals(fromState)) {
        LOG.info(
            "There is pending relay message, pending relay message: {}, relay time starts {}, expiry timeout {}.",
            relayMsg.getMsgId(), relayMsg.getRelayTime(), relayMsg.getExpiryPeriod());
        if (!relayMsg.getTgtName().equals(message.getTgtName())) {
          LOG.info(
              "The pending relay message was sent to a different host, not send message: {}, pending relay message: {}",
              message.getMsgId(), relayMsg.getId());
          return true;
        }
      }
    }
    return false;
  }

  /**
   * Builds the upper bound of every constrained state of the given state model.
   *
   * "N" resolves to the number of live instances, a numeric value is used as is, and "R" or
   * any value that is not a non-negative integer yields no constraint for that state.
   *
   * TODO: This code is duplicate in multiple places. Can we do it in to one place in the
   * beginning and compute the stateConstraint instance once and re use at other places.
   * Each IdealState must have a constraint object associated with it
   *
   * @param stateModelDefinition state model whose states are inspected
   * @param idealState currently unused; kept for the disabled "R" evaluation below
   * @param cache data provider used to look up the live instances
   * @return state to bounds, containing only the states that have a constraint
   */
  private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
      IdealState idealState, ResourceControllerDataProvider cache) {
    Map<String, Bounds> stateConstraints = new HashMap<>();
    List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
    for (String state : statePriorityList) {
      String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
      int max = -1;
      if ("N".equals(numInstancesPerState)) {
        max = cache.getLiveInstances().size();
      } else if ("R".equals(numInstancesPerState)) {
        // idealState is null when resource has been dropped,
        // R can't be evaluated and ignore state constraints
        //if (idealState != null) {
        // max = cache.getReplicas(idealState.getResourceName());
        //}
      } else {
        try {
          max = Integer.parseInt(numInstancesPerState);
        } catch (NumberFormatException ignored) {
          // null or non-numeric value: keep -1, i.e. the state has no constraint
        }
      }

      if (max > -1) {
        // if state has no constraint, will not put in map
        stateConstraints.put(state, new Bounds(0, max));
      }
    }
    return stateConstraints;
  }

  // TODO: if state transition priority is not provided then use lexicographical sorting
  // so that behavior is consistent
  /**
   * Maps every entry of the state model's transition priority list to its position in that
   * list, so that a smaller value means a higher priority.
   */
  private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef) {
    Map<String, Integer> stateTransitionPriorities = new HashMap<>();
    List<String> stateTransitionPriorityList = stateModelDef.getStateTransitionPriorityList();
    for (int i = 0; i < stateTransitionPriorityList.size(); i++) {
      stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
    }
    return stateTransitionPriorities;
  }
}