blob: ae5952225ec40c6b1ffce1176fb7877a6831d2e2 [file] [log] [blame]
package org.apache.helix.model;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.builder.StateTransitionTableBuilder;
import org.apache.helix.model.util.StateModelDefinitionValidator;
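/**
 * Describes a state model: its initial state, the priority order of its states and
 * transitions, the number of replicas allowed in each state, and the table of next-hop
 * states used to move a replica from one state to another.
 */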
public class StateModelDefinition extends HelixProperty {
public enum StateModelDefinitionProperty {
  /** Key of the simple field that holds the model's initial state. */
  INITIAL_STATE,

  /** Key of the list field that holds the transitions in priority order. */
  STATE_TRANSITION_PRIORITYLIST,

  /** Key of the list field that holds the state names in priority order. */
  STATE_PRIORITY_LIST;
}
/**
 * The state model's initial state, read from the INITIAL_STATE simple field. The
 * constructor rejects a record without it, so this is never null.
 */
private final String _initialState;
/**
 * State names in priority order, read from the STATE_PRIORITY_LIST list field. Indicates
 * the order in which states are fulfilled. May be null: the constructor null-checks it
 * and does not substitute a default.
 */
private final List<String> _statesPriorityList;
/**
 * Specifies the number of instances for a given state, keyed by state name. Values are
 * taken from the "count" entry of each state's "&lt;state&gt;.meta" map field; Helix-defined
 * states missing from the priority list are stored as "-1". <br>
 * -1 don't care, don't try to keep any resource in this state on any instance <br>
 * &gt;0 any integer number greater than 0 specifies the number of instances
 * needed to be in this state <br>
 * R all instances in the preference list can be in this state <br>
 * N all instances in the cluster will be put in this state. The preference list
 * must be denoted as '*'
 */
private final Map<String, String> _statesCountMap;
/**
 * Transitions in priority order, read from the STATE_TRANSITION_PRIORITYLIST list field.
 */
private final List<String> _stateTransitionPriorityList;
/**
 * State name to priority rank. States from the priority list get their 1-based position;
 * Helix-defined states that are not listed get Integer.MAX_VALUE (lowest priority).
 */
private Map<String, Integer> _statesPriorityMap = new HashMap<>();
/**
 * State transition table, keyed by start state and then by final state, used to find the
 * next state given a start state and a final state. A start state taken from the priority
 * list is stored with whatever its "&lt;state&gt;.next" map field returned, which readers of
 * this table treat as possibly null.
 */
private final Map<String, Map<String, String>> _stateTransitionTable;
/**
* Instantiate from a pre-populated record
* @param record ZNRecord representing a state model definition
*/
public StateModelDefinition(ZNRecord record) {
super(record);
_initialState = record.getSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString());
if (_initialState == null) {
throw new IllegalArgumentException("initial-state for " + record.getId() + " is null");
}
_statesPriorityList =
record.getListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString());
_stateTransitionPriorityList =
record.getListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString());
_stateTransitionTable = new HashMap<>();
_statesCountMap = new HashMap<>();
if (_statesPriorityList != null) {
int priority = 1;
for (String state : _statesPriorityList) {
Map<String, String> metaData = record.getMapField(state + ".meta");
if (metaData != null) {
if (metaData.get("count") != null) {
_statesCountMap.put(state, metaData.get("count"));
}
}
Map<String, String> nextData = record.getMapField(state + ".next");
_stateTransitionTable.put(state, nextData);
_statesPriorityMap.put(state, priority++);
}
}
// add HelixDefinedStates to statesPriorityMap in case it hasn't been added already
for (HelixDefinedState state : HelixDefinedState.values()) {
if (!_statesPriorityMap.containsKey(state.name())) {
// Make it the lowest priority
_statesPriorityMap.put(state.name(), Integer.MAX_VALUE);
}
}
// add transitions for helix-defined states
for (HelixDefinedState state : HelixDefinedState.values()) {
if (_statesPriorityList == null || !_statesPriorityList.contains(state.toString())) {
_statesCountMap.put(state.toString(), "-1");
}
}
addDefaultTransition(HelixDefinedState.ERROR.toString(), HelixDefinedState.DROPPED.toString(),
HelixDefinedState.DROPPED.toString());
addDefaultTransition(HelixDefinedState.ERROR.toString(), _initialState, _initialState);
addDefaultTransition(_initialState, HelixDefinedState.DROPPED.toString(),
HelixDefinedState.DROPPED.toString());
}
/**
 * Add a transition involving Helix-defined states, unless one is already present.
 * These transitions need not be specified in the state model definition.
 * <p>
 * A source state can be registered in the transition table with a null map (the constructor
 * stores whatever the record returned for "&lt;state&gt;.next"). Such an entry is treated the
 * same as a missing one and replaced with a fresh map, instead of failing with a
 * NullPointerException.
 * @param from source state
 * @param to destination state
 * @param next intermediate state to reach the destination
 */
void addDefaultTransition(String from, String to, String next) {
  Map<String, String> transitionsFromSource = _stateTransitionTable.get(from);
  if (transitionsFromSource == null) {
    transitionsFromSource = new TreeMap<String, String>();
    _stateTransitionTable.put(from, transitionsFromSource);
  }
  // Never override a transition that the definition supplied explicitly.
  if (!transitionsFromSource.containsKey(to)) {
    transitionsFromSource.put(to, next);
  }
}
/**
 * Get an ordered priority list of transitions.
 * <p>
 * The list is the one the constructor read from the record's STATE_TRANSITION_PRIORITYLIST
 * list field and is returned directly, without copying.
 * NOTE(review): presumably null when the record has no such field, since the constructor
 * does not substitute a default; confirm against ZNRecord.getListField.
 * @return transitions in the form SRC-DEST, the first of which is highest priority
 */
public List<String> getStateTransitionPriorityList() {
return _stateTransitionPriorityList;
}
/**
 * Get the priority rank of every known state.
 * <p>
 * States from the priority list map to their 1-based position in that list. Helix-defined
 * states that are not in the list map to Integer.MAX_VALUE, i.e. the lowest priority. The
 * internal map is returned directly, without copying.
 * @return state name to priority rank, where a smaller rank means higher priority
 */
public Map<String, Integer> getStatePriorityMap() {
return _statesPriorityMap;
}
/**
 * Get an ordered priority list of states.
 * <p>
 * The list is the one the constructor read from the record's STATE_PRIORITY_LIST list field
 * and is returned directly, without copying. It may be null; the constructor null-checks it
 * but does not replace it with an empty list.
 * @return state names, the first of which is highest priority
 */
public List<String> getStatesPriorityList() {
return _statesPriorityList;
}
/**
 * Look up the next hop on the way from one state to another.
 * @param fromState the source
 * @param toState the destination
 * @return the intermediate state, or null when the transition table has no row for
 *         {@code fromState} or that row has no entry for {@code toState}
 */
public String getNextStateForTransition(String fromState, String toState) {
  Map<String, String> nextByDestination = _stateTransitionTable.get(fromState);
  return nextByDestination == null ? null : nextByDestination.get(toState);
}
/**
 * Get the starting state in the model.
 * @return name of the initial state; never null, because the constructor rejects a record
 *         without one
 */
public String getInitialState() {
return _initialState;
}
/**
 * Number of instances that can be in the given state, as the raw string stored for it.
 * @param state the state name
 * @return the instance count for the state: a number such as "1" or "-1", or the dynamic
 *         bounds "N" or "R"; null when no count is recorded for the state
 */
public String getNumInstancesPerState(String state) {
return _statesCountMap.get(state);
}
/**
 * Get the top state of this state model, i.e. the first entry of the state priority list.
 * <p>
 * The priority list comes from the record and may be missing or empty; in that case there is
 * no top state and null is returned rather than throwing.
 * @return the highest-priority state name, or null if the model defines no state priorities
 */
public String getTopState() {
  if (_statesPriorityList == null || _statesPriorityList.isEmpty()) {
    return null;
  }
  return _statesPriorityList.get(0);
}
/**
 * Whether this state model allows at most a single replica in the top state.
 *
 * @return true only if the instance count recorded for the top state parses to exactly 1
 */
public boolean isSingleTopStateModel() {
  String topStateBound = _statesCountMap.get(getTopState());
  try {
    return Integer.parseInt(topStateBound) == 1;
  } catch (NumberFormatException ignored) {
    // A missing bound or a dynamic bound such as "N" or "R" is not a single-replica limit.
    return false;
  }
}
/**
 * Get the second top states, i.e. the states from which the top state is reachable in a
 * single transition (the next hop towards the top state is the top state itself).
 * @return a set of second top states; empty if the model has no state priority list
 */
public Set<String> getSecondTopStates() {
  Set<String> oneStepFromTop = new HashSet<String>();
  if (_statesPriorityList != null && !_statesPriorityList.isEmpty()) {
    String topState = _statesPriorityList.get(0);
    for (Map.Entry<String, Map<String, String>> row : _stateTransitionTable.entrySet()) {
      Map<String, String> nextByDestination = row.getValue();
      if (nextByDestination == null || !nextByDestination.containsKey(topState)) {
        continue;
      }
      if (nextByDestination.get(topState).equals(topState)) {
        oneStepFromTop.add(row.getKey());
      }
    }
  }
  return oneStepFromTop;
}
/**
 * Check this definition for validity by delegating to
 * {@link StateModelDefinitionValidator#isStateModelDefinitionValid(StateModelDefinition)}.
 * @return the validator's verdict for this definition
 */
@Override
public boolean isValid() {
return StateModelDefinitionValidator.isStateModelDefinitionValid(this);
}
// TODO move this to model.builder package, refactor StateModelConfigGenerator to use this
/**
 * Fluent builder for a {@link StateModelDefinition}.
 * <p>
 * Collect the states, transitions and per-state bounds, then call {@link #build()} to turn
 * them into a record-backed definition.
 */
public static class Builder {
  private final String _statemodelName;
  private String initialState;
  Map<String, Integer> statesMap = new HashMap<>();
  Map<Transition, Integer> transitionMap = new HashMap<>();
  Map<String, String> stateConstraintMap = new HashMap<>();

  /**
   * Start building a state model with a name
   * @param name state model name
   */
  public Builder(String name) {
    _statemodelName = name;
  }

  /**
   * Set the state a replica is in when it starts; the most commonly used initial state is
   * OFFLINE.
   * @param initialState name of the initial state
   * @return Builder
   */
  public Builder initialState(String initialState) {
    this.initialState = initialState;
    return this;
  }

  /**
   * Declare a valid state together with its priority. The priority decides the order in
   * which state constraints are satisfied: if STATE1 has a constraint of 1 and STATE2 has a
   * constraint of 3 but only one node is up, the priority tells Helix which state's
   * constraint gets preference. States are ordered by ascending priority value, so the
   * state with the smallest value comes first in the resulting priority list.
   * @param state state name
   * @param priority priority value; a lower value places the state earlier in the list
   * @return Builder
   */
  public Builder addState(String state, int priority) {
    statesMap.put(state, priority);
    return this;
  }

  /**
   * Declare a valid state with priority value Integer.MAX_VALUE, which sorts it last.
   * @param state state name
   * @return Builder
   */
  public Builder addState(String state) {
    return addState(state, Integer.MAX_VALUE);
  }

  /**
   * Declare a legal transition between two states together with its priority. Priority is
   * used to order the transitions. Helix tries to maximize the number of transitions that
   * can be fired in parallel without violating the constraints: the transitions are first
   * sorted by priority and then selected greedily for as long as the constraints are not
   * violated. Transitions are ordered by ascending priority value.
   * @param fromState source
   * @param toState destination
   * @param priority priority value; a lower value places the transition earlier in the list
   * @return Builder
   */
  public Builder addTransition(String fromState, String toState, int priority) {
    transitionMap.put(new Transition(fromState, toState), priority);
    return this;
  }

  /**
   * Declare a legal transition with priority value Integer.MAX_VALUE, which sorts it last.
   * @see #addTransition(String, String, int)
   * @param fromState source
   * @param toState destination
   * @return Builder
   */
  public Builder addTransition(String fromState, String toState) {
    return addTransition(fromState, toState, Integer.MAX_VALUE);
  }

  /**
   * Set a maximum for replicas in this state
   * @param state state name
   * @param upperBound maximum
   * @return Builder
   */
  public Builder upperBound(String state, int upperBound) {
    stateConstraintMap.put(state, String.valueOf(upperBound));
    return this;
  }

  /**
   * Set a bound that changes dynamically based on other parameters. <br/>
   * Two values are currently supported: <br/>
   * R refers to the number of replicas specified during resource creation. This allows a
   * different replication factor for each resource without having to create a different
   * state machine. <br/>
   * N refers to all nodes in the cluster. Useful for resources that need to exist on all
   * nodes; nodes can then be added or removed without having to change the bounds.
   * @param state state name
   * @param bound the dynamic bound, stored as given
   * @return Builder
   */
  public Builder dynamicUpperBound(String state, String bound) {
    stateConstraintMap.put(state, bound);
    return this;
  }

  /**
   * Create a StateModelDefinition from this Builder
   * @return StateModelDefinition
   */
  public StateModelDefinition build() {
    ZNRecord record = new ZNRecord(_statemodelName);

    // States and transitions, each ordered by their configured priority value.
    ArrayList<String> orderedStates = sortedByPriority(statesMap);
    ArrayList<Transition> orderedTransitions = sortedByPriority(transitionMap);
    List<String> orderedTransitionNames = new ArrayList<>(orderedTransitions.size());
    for (Transition transition : orderedTransitions) {
      orderedTransitionNames.add(transition.toString());
    }

    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), initialState);
    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
        orderedStates);
    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
        orderedTransitionNames);

    // Compute full paths for next states.
    Map<String, Map<String, String>> transitionTable =
        new StateTransitionTableBuilder().buildTransitionTable(orderedStates,
            new ArrayList<>(transitionMap.keySet()));
    for (Map.Entry<String, Map<String, String>> row : transitionTable.entrySet()) {
      record.setMapField(row.getKey() + ".next", row.getValue());
    }

    // State counts; a state without an explicit bound is recorded as "-1".
    for (String state : orderedStates) {
      String bound = stateConstraintMap.get(state);
      HashMap<String, String> metadata = new HashMap<String, String>();
      metadata.put("count", bound == null ? "-1" : bound);
      record.setMapField(state + ".meta", metadata);
    }

    return new StateModelDefinition(record);
  }

  /**
   * Copy the keys of a priority map into a list sorted by ascending priority value.
   */
  private static <K> ArrayList<K> sortedByPriority(final Map<K, Integer> priorities) {
    ArrayList<K> keys = new ArrayList<K>(priorities.keySet());
    Collections.sort(keys, new Comparator<K>() {
      @Override
      public int compare(K left, K right) {
        return priorities.get(left).compareTo(priorities.get(right));
      }
    });
    return keys;
  }
}
/**
 * Two definitions are equal when their initial state, state counts, state priority list,
 * transition priority list and transition table are all equal.
 * <p>
 * Both priority lists come straight from the record and may be null, so they are compared
 * null-safely.
 * @param o the object to compare with
 * @return true if {@code o} is a StateModelDefinition with the same content
 */
@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  // instanceof is false for null, so no separate null check is needed.
  if (!(o instanceof StateModelDefinition)) {
    return false;
  }
  StateModelDefinition other = (StateModelDefinition) o;
  return _initialState.equals(other._initialState)
      && _statesCountMap.equals(other._statesCountMap)
      && Objects.equals(_statesPriorityList, other._statesPriorityList)
      && Objects.equals(_stateTransitionPriorityList, other._stateTransitionPriorityList)
      && _stateTransitionTable.equals(other._stateTransitionTable);
}

/**
 * Hash over exactly the fields compared by {@link #equals(Object)}, so that equal
 * definitions always produce equal hash codes.
 * @return hash code consistent with equals
 */
@Override
public int hashCode() {
  return Objects.hash(_initialState, _statesCountMap, _statesPriorityList,
      _stateTransitionPriorityList, _stateTransitionTable);
}
/**
 * Work out how many replicas should be in each state, in state priority order.
 * <p>
 * States are visited from highest to lowest priority while candidate nodes remain. A state
 * with a positive numeric count takes that many nodes, capped by the nodes still available.
 * A state with count "N" takes every node still available and ends the pass. A state with
 * count "R" is skipped in this pass; afterwards the first "R" state receives whatever
 * replicas remain, capped by the nodes still available. States whose count is non-positive
 * or not a number get no entry.
 *
 * @param candidateNodeNum number of nodes that can host a replica
 * @param totalReplicas total number of replicas to distribute
 * @return state count map: state->count
 */
public LinkedHashMap<String, Integer> getStateCountMap(int candidateNodeNum, int totalReplicas) {
  LinkedHashMap<String, Integer> countsByState = new LinkedHashMap<>();
  List<String> orderedStates = getStatesPriorityList();
  int nodesLeft = candidateNodeNum;
  int replicasLeft = totalReplicas;

  for (String state : orderedStates) {
    String bound = getNumInstancesPerState(state);
    if (nodesLeft <= 0) {
      break;
    }
    if ("N".equals(bound)) {
      countsByState.put(state, nodesLeft);
      replicasLeft -= nodesLeft;
      break;
    }
    if ("R".equals(bound)) {
      // Resolved below, once the counts for all other states are known.
      continue;
    }

    int requested;
    try {
      requested = Integer.parseInt(bound);
    } catch (NumberFormatException ignored) {
      // A missing or non-numeric bound is treated like "-1": no replicas for this state.
      requested = -1;
    }
    if (requested > 0) {
      int granted = Math.min(requested, nodesLeft);
      nodesLeft -= granted;
      countsByState.put(state, granted);
      replicasLeft -= granted;
    }
  }

  // At most one state is expected to use "R", so only the first one is considered.
  for (String state : orderedStates) {
    if (!"R".equals(getNumInstancesPerState(state))) {
      continue;
    }
    if (nodesLeft > 0 && replicasLeft > 0) {
      countsByState.put(state, Math.min(replicasLeft, nodesLeft));
    }
    break;
  }

  return countsByState;
}
/**
 * Tally how many instances are in each state.
 *
 * @param stateMap instance->state map
 *
 * @return state->count map for the given state map.
 */
public static Map<String, Integer> getStateCounts(Map<String, String> stateMap) {
  Map<String, Integer> tally = new HashMap<>();
  for (String state : stateMap.values()) {
    Integer seenSoFar = tally.get(state);
    tally.put(state, seenSoFar == null ? 1 : seenSoFar + 1);
  }
  return tally;
}
}