// Code-browser header (not part of the Java source): blob 7229bc7beacec54fd4518e9a98d6dca31b49d7a3 [file] [log] [blame]
package org.apache.helix.messaging.handling;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
import org.apache.helix.zookeeper.datamodel.ZNRecordDelta;
import org.apache.helix.zookeeper.datamodel.ZNRecordDelta.MergeOperation;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.util.StatusUpdateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Message handler for a single state transition message.
 *
 * <p>The handler checks the message against the state model's current state
 * ({@code preHandleMessage}), looks up the matching transition method on the state model via
 * reflection and invokes it ({@code handleMessage} / {@code invoke}), and finally records the
 * outcome in a {@code CurrentState} delta that is written through the
 * {@code HelixDataAccessor} ({@code postHandleMessage}).
 *
 * <p>NOTE(review): this copy of the file appears to have lost lines during extraction (closing
 * braces and several call prefixes are absent); compare against the upstream source before
 * compiling.
 */
public class HelixStateTransitionHandler extends MessageHandler {
/**
 * Raised by {@code preHandleMessage} when the state model's current state is neither the
 * message's toState nor its fromState.
 */
public static class HelixStateMismatchException extends Exception {
// NOTE(review): the constructor body and the closing braces of this class are not present in
// this copy; presumably the body forwards {@code info} to the superclass — confirm upstream.
/**
 * @param info human-readable description of the mismatch
 */
public HelixStateMismatchException(String info) {
/**
 * If the current state equals the toState in the message, the message is considered a
 * duplicated state transition.
 */
public static class HelixDuplicatedStateTransitionException extends Exception {
// Created by preHandleMessage when the partition is already in the message's toState.
// NOTE(review): the constructor body and the closing braces are not present in this copy;
// presumably the body forwards {@code info} to the superclass — confirm upstream.
/**
 * @param info human-readable description of the duplicated transition
 */
public HelixDuplicatedStateTransitionException(String info) {
private static final Logger logger = LoggerFactory.getLogger(HelixStateTransitionHandler.class);
// State model instance whose transition method is invoked; also used as the lock object in
// handleMessage().
private final StateModel _stateModel;
// Helper used to record status updates and errors for the message being handled.
StatusUpdateUtil _statusUpdateUtil;
// Resolves the state model method that corresponds to a (fromState, toState) pair.
private final StateModelParser _transitionMethodFinder;
// Accumulates the current-state changes for this message; persisted in postHandleMessage().
private final CurrentState _currentStateDelta;
// Manager obtained from the notification context in the constructor.
private final HelixManager _manager;
// Factory used to remove the state model when a partition transitions to DROPPED.
private final StateModelFactory<? extends StateModel> _stateModelFactory;
// Set by onTimeout(); read in postHandleMessage() to tell a timeout-caused interrupt from other
// interrupts. Declared volatile, so writes are visible across threads.
volatile boolean _isTimeout = false;
/**
 * Creates a handler for one state transition message.
 *
 * @param stateModelFactory factory that owns the state model; used later to remove the state
 *        model of a dropped partition
 * @param stateModel state model on which the transition method is invoked
 * @param message the state transition message to handle
 * @param context notification context passed to the superclass
 * @param currentStateDelta current-state record that this handler fills in and persists
 */
public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory,
StateModel stateModel, Message message, NotificationContext context,
CurrentState currentStateDelta) {
super(message, context);
_stateModel = stateModel;
_statusUpdateUtil = new StatusUpdateUtil();
_transitionMethodFinder = new StateModelParser();
_currentStateDelta = currentStateDelta;
// _notificationContext is not assigned in this class; presumably the superclass constructor
// stores `context` there — confirm against MessageHandler.
_manager = _notificationContext.getManager();
_stateModelFactory = stateModelFactory;
/**
 * Validates the message and the partition's current state before the transition is invoked,
 * then clears a field from the partition's current-state record in ZK (per the inline comment,
 * the REQUESTED_STATE property).
 *
 * <p>NOTE(review): several lines of this method are missing or truncated in this copy (see the
 * inline notes); restore them from the upstream source before relying on this code.
 *
 * @throws Exception a {@code HelixException} when the message is invalid, a
 *         {@code HelixDuplicatedStateTransitionException} when the partition is already in the
 *         toState, or a {@code HelixStateMismatchException} when the current state matches
 *         neither toState nor fromState
 */
void preHandleMessage() throws Exception {
// Reject messages that fail their own validity check; the error text lists every attribute
// of Message.Attributes.
if (!_message.isValid()) {
String errorMessage = "Invalid Message, ensure that message: " + _message
+ " has all the required fields: " + Arrays.toString(Message.Attributes.values());
// NOTE(review): the trailing argument(s) and closing ");" of this call are missing here.
_statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
throw new HelixException(errorMessage);
// NOTE(review): the string expression below has lost the call that consumed it (presumably a
// logging call) — confirm upstream.
}"handling message: " + _message.getMsgId() + " transit "
+ _message.getResourceName() + "." + _message.getPartitionName() + "|"
+ _message.getPartitionNames() + " from:" + _message.getFromState() + " to:"
+ _message.getToState() + ", relayedFrom: " + _message.getRelaySrcHost());
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
String partitionName = _message.getPartitionName();
String fromState = _message.getFromState();
String toState = _message.getToState();
// Verify the fromState and current state of the stateModel.
// The state model is consulted first because it holds the most up-to-date current state.
// When it reports null, the state recorded in the current-state delta for this partition is
// used instead.
String state = _stateModel.getCurrentState() != null ? _stateModel.getCurrentState()
: _currentStateDelta.getState(partitionName);
// Set start time right before invoking client logic
_currentStateDelta.setStartTime(_message.getPartitionName(), System.currentTimeMillis());
Exception err = null;
if (toState.equalsIgnoreCase(state)) {
// To state equals current state, we can just ignore the message
err = new HelixDuplicatedStateTransitionException(
String.format("Partition %s current state is same as toState (%s->%s) from message.",
partitionName, fromState, toState));
// A fromState of "*" (or null) skips the mismatch check; comparison is case-insensitive.
} else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
// If current state is neither toState nor fromState in message, there is a problem
err = new HelixStateMismatchException(String.format(
"Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName()));
// Log the validation failure and propagate it to the caller.
if (err != null) {
// NOTE(review): the trailing argument(s) and closing ");" of this call are missing here.
_statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, err.getMessage(),
throw err;
// Reset the REQUESTED_STATE property if it exists.
try {
String instance = _manager.getInstanceName();
String sessionId = _message.getTgtSessionId();
String resource = _message.getResourceName();
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize());
// NOTE(review): the remaining argument(s) of currentState(...) are missing on the next line.
PropertyKey key = accessor.keyBuilder().currentState(instance, sessionId, resource,
// Build a record holding the partition's map field to subtract from the stored state.
ZNRecord rec = new ZNRecord(resource);
Map<String, String> map = new TreeMap<String, String>();
// NOTE(review): the key argument of put(...) is missing; per the comment above it is
// presumably the REQUESTED_STATE property name — confirm upstream.
map.put(, null);
rec.getMapFields().put(partitionName, map);
ZNRecordDelta delta = new ZNRecordDelta(rec, ZNRecordDelta.MergeOperation.SUBTRACT);
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
CurrentState currStateUpdate = new CurrentState(resource);
// NOTE(review): nothing visible here adds `delta` to `deltaList` or attaches the list to
// `currStateUpdate`; those statements are presumably missing from this copy.
// Update the ZK current state of the node
if (!accessor.updateProperty(key, currStateUpdate)) {
logger.error("Fails to persist current state back to ZK for resource " + resource
+ " partition: " + partitionName);
} catch (Exception e) {
// NOTE(review): an operand between the two '+' operators is missing in this log message.
logger.error("Error when removing " +
+ " from current state.", e);
// Report the failure to the state model as a framework-level error.
StateTransitionError error =
new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
_stateModel.rollbackOnError(_message, _notificationContext, error);
// NOTE(review): the call these arguments belong to, an operand in the concatenation, and the
// trailing argument(s) are missing below.
_message, HelixStateTransitionHandler.class, e, "Error when removing "
+ + " from current state.",
/**
 * Records the outcome of the transition in the current-state delta and persists it.
 *
 * <p>The task result is read from the notification context under
 * {@code MapKey.HELIX_TASK_RESULT}. Depending on whether the result is successful, cancelled or
 * failed, the delta's state is set to the message's toState, left untouched, set to the state
 * model's current state (for a state mismatch), or set to ERROR. The delta is then either written
 * through the data accessor (normal message) or queued in a map for batch sub-messages.
 *
 * <p>NOTE(review): several lines of this method are missing or truncated in this copy (see the
 * inline notes).
 */
void postHandleMessage() {
HelixTaskResult taskResult =
(HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
Exception exception = taskResult.getException();
String partitionKey = _message.getPartitionName();
String resource = _message.getResourceName();
String sessionId = _message.getTgtSessionId();
String instanceName = _manager.getInstanceName();
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
int bucketSize = _message.getBucketSize();
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
// No need to sync on the manager: the executor is cancelled on session expiry before the
// executor for the new session is started.
// sessionId might change when we update the state model state.
// For the ZK current state this is OK, as there is a per-session current state node.
if (!_message.getTgtSessionId().equals(_manager.getSessionId())) {
// NOTE(review): the condition compares the target session id but the log prints the execution
// session id — confirm this is intended. The statement that skips the rest of the method
// (presumably a return) is not present in this copy.
logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
+ _message.getExecutionSessionId() + " , new session : " + _manager.getSessionId());
// Set the INFO property and mark the end time, previous state of the state transition
_currentStateDelta.setInfo(partitionKey, taskResult.getInfo());
_currentStateDelta.setEndTime(partitionKey, taskResult.getCompleteTime());
_currentStateDelta.setPreviousState(partitionKey, _message.getFromState());
// Add the host name this state transition was triggered by: the relay source host in one
// branch, the message source otherwise.
// NOTE(review): the condition of this `if` is missing in this copy.
if ( {
_currentStateDelta.setTriggerHost(partitionKey, _message.getRelaySrcHost());
} else {
_currentStateDelta.setTriggerHost(partitionKey, _message.getMsgSrc());
if (taskResult.isSuccess()) {
// String fromState = message.getFromState();
String toState = _message.getToState();
_currentStateDelta.setState(partitionKey, toState);
if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
// For an "OnOfflineToDROPPED" message, the resource key record must be removed
// from the current state of the instance because the resource key is dropped.
// In the state model it will stay as "OFFLINE", which is OK.
ZNRecord rec = new ZNRecord(_currentStateDelta.getId());
rec.getMapFields().put(partitionKey, null);
ZNRecordDelta delta = new ZNRecordDelta(rec, MergeOperation.SUBTRACT);
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
// NOTE(review): nothing visible here adds `delta` to `deltaList` or attaches the list to the
// current-state delta; those statements are presumably missing from this copy.
_stateModelFactory.removeStateModel(resource, partitionKey);
} else {
// if the partition is not to be dropped, update _stateModel to the TO_STATE
// NOTE(review): the statement that performs this update is not present in this copy.
} else if (taskResult.isCancelled()) {
// Cancelled message does not need current state update
} else {
if (exception instanceof HelixStateMismatchException) {
// if fromState mismatch, set current state on zk to stateModel's current state
logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
+ partitionKey + ", currentState: " + _stateModel.getCurrentState() + ", message: "
+ _message);
_currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
} else {
// Any other failure defaults to an INTERNAL/ERROR transition error.
StateTransitionError error =
new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
if (exception instanceof InterruptedException) {
// An interrupt is classified as TIMEOUT only when onTimeout() has set the flag.
if (_isTimeout) {
error = new StateTransitionError(ErrorType.INTERNAL, ErrorCode.TIMEOUT, exception);
} else {
// State transition interrupted but not caused by timeout. Keep the current
// state in this case
// NOTE(review): the call that consumed the string below (presumably a logging call) and
// whatever exits this branch are missing in this copy.
"State transition interrupted but not timeout. Not updating state. Partition : "
+ _message.getPartitionName() + " MsgId : " + _message.getMsgId());
// Let the state model react to the failure, then mark the partition as ERROR in the delta.
_stateModel.rollbackOnError(_message, _notificationContext, error);
_currentStateDelta.setState(partitionKey, HelixDefinedState.ERROR.toString());
// if we have errors transit from ERROR state, disable the partition
// NOTE(review): the body of this `if` (presumably a disablePartition() call) is missing.
if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
try {
// Update the ZK current state of the node
// NOTE(review): the remaining argument(s) of currentState(...) are missing on the next line.
PropertyKey key = keyBuilder.currentState(instanceName, sessionId, resource,
// A message without a PARENT_MSG_ID attribute is handled as a standalone message.
if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
// normal message
if (!accessor.updateProperty(key, _currentStateDelta)) {
throw new HelixException("Fails to persist current state back to ZK for resource "
+ resource + " partition: " + _message.getPartitionName());
} else {
// sub-message of a batch message: queue the update in a shared map instead of writing it
// here.
// NOTE(review): the expression that fetches the map from the notification context is
// truncated on the cast line below.
ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap =
(ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext
csUpdateMap.put(partitionKey, new CurrentStateUpdate(key, _currentStateDelta));
} catch (Exception e) {
// Persisting failed: log, notify the state model with a FRAMEWORK error, and record the
// error through the status update utility.
logger.error("Error when updating current-state ", e);
StateTransitionError error =
new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
_stateModel.rollbackOnError(_message, _notificationContext, error);
_statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, e,
"Error when update current-state ", _manager);
/**
 * Disables the message's partition on this instance through the cluster management tool
 * ({@code enablePartition} is called with {@code false}).
 */
void disablePartition() {
String instanceName = _manager.getInstanceName();
String resourceName = _message.getResourceName();
String partitionName = _message.getPartitionName();
String clusterName = _manager.getClusterName();
HelixAdmin admin = _manager.getClusterManagmentTool();
admin.enablePartition(false, clusterName, instanceName, resourceName,
// NOTE(review): the string expression after ");" on the next line has lost the call that
// consumed it (presumably a logging call) — confirm upstream.
Arrays.asList(partitionName));"error in transit from ERROR to " + _message.getToState() + " for partition: "
+ partitionName + ". disable it on " + instanceName);
/**
 * Executes the state transition while holding the monitor of the state model and returns the
 * task result.
 *
 * <p>The result is also stored in the notification context under
 * {@code MapKey.HELIX_TASK_RESULT}, where {@code postHandleMessage} reads it.
 *
 * <p>NOTE(review): the catch bodies below are partly missing in this copy, so how the result's
 * success/exception fields are populated cannot be read from here.
 *
 * @return the task result for this message
 */
public HelixTaskResult handleMessage() {
NotificationContext context = _notificationContext;
Message message = _message;
// Serialize transitions on the same state model instance.
synchronized (_stateModel) {
HelixTaskResult taskResult = new HelixTaskResult();
HelixManager manager = context.getManager();
_statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
"Message handling task begin execute", manager);
try {
invoke(manager, context, taskResult, message);
// NOTE(review): invoke() as declared in this copy does not list the two exception types caught
// next; the statements that raise them (and these catch bodies) appear to be missing.
} catch (HelixDuplicatedStateTransitionException e) {
// Duplicated state transition problem is fine
} catch (HelixStateMismatchException e) {
// Simply log error and return from here if State mismatch.
// The current state of the state model is intact.
} catch (Exception e) {
String errorMessage =
"Exception while executing a state transition task " + message.getPartitionName();
logger.error(errorMessage, e);
// Unwrap an InterruptedException that arrived as the cause of another exception.
if (e.getCause() != null && e.getCause() instanceof InterruptedException) {
e = (InterruptedException) e.getCause();
// A rollback (directly or as the cause) is treated separately from other failures.
if (e instanceof HelixRollbackException
|| (e.getCause() != null && e.getCause() instanceof HelixRollbackException)) {
// NOTE(review): on the next line a string literal has been fused into the TODO comment; the
// call that consumed it (presumably a logging call) is missing in this copy.
// TODO : Support cancel to any state"Rollback happened of state transition on resource \""
+ _message.getResourceName() + "\" partition \"" + _message.getPartitionName()
+ "\" from \"" + _message.getFromState() + "\" to \"" + _message.getToState() + "\"");
} else {
// NOTE(review): the trailing argument(s) and closing ");" of this call are missing here.
_statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, e, errorMessage,
// Flag the result as interrupted when the (possibly unwrapped) exception is an interrupt.
taskResult.setInterrupted(e instanceof InterruptedException);
// add task result to context for postHandling
context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult);
return taskResult;
/**
 * Looks up the state model method for the message's (fromState, toState) pair and invokes it
 * reflectively with the message and the notification context.
 *
 * <p>NOTE(review): parts of this method are missing in this copy (see inline notes); in
 * particular, nothing visible here writes to {@code taskResult}.
 *
 * @param manager manager passed to the status update utility
 * @param context notification context handed to the transition method
 * @param taskResult result object for this message
 * @param message the state transition message
 * @throws IllegalAccessException declared because of the reflective {@code Method.invoke} call
 * @throws InvocationTargetException declared because of the reflective {@code Method.invoke} call
 * @throws InterruptedException declared by the signature; no visible statement throws it here
 * @throws HelixRollbackException when the handler has been cancelled before the invocation
 */
private void invoke(HelixManager manager, NotificationContext context, HelixTaskResult taskResult,
Message message) throws IllegalAccessException, InvocationTargetException,
InterruptedException, HelixRollbackException {
_statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
"Message handling invoking", manager);
// by default, we invoke state transition function in state model
Method methodToInvoke = null;
String fromState = message.getFromState();
String toState = message.getToState();
// The transition method is looked up with the parameter types (Message, NotificationContext).
// NOTE(review): the closing "});" of this call is missing in this copy.
methodToInvoke = _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
fromState, toState, new Class[] {
Message.class, NotificationContext.class
if (methodToInvoke != null) {
// NOTE(review): the format string below has lost the call that consumed it (presumably a
// logging call wrapping String.format) — confirm upstream.
"Instance %s, partition %s received state transition from %s to %s on session %s, message id: %s",
message.getTgtName(), message.getPartitionName(), message.getFromState(),
message.getToState(), message.getTgtSessionId(), message.getMsgId()));
// _cancelled is not declared in this class; presumably inherited from MessageHandler.
if (_cancelled) {
throw new HelixRollbackException(String.format(
"Instance %s, partition %s state transition from %s to %s on session %s has been cancelled, message id: %s",
message.getTgtName(), message.getPartitionName(), message.getFromState(),
message.getToState(), message.getTgtSessionId(), message.getMsgId()));
// NOTE(review): the closing "});" of this call is missing in this copy.
Object result = methodToInvoke.invoke(_stateModel, new Object[] {
message, context
// Convert the transition method's return value to a string; null (e.g. a void method) maps to
// the empty string.
// NOTE(review): the statement that uses resultStr is not present in this copy.
String resultStr;
if (result == null || result instanceof Void) {
resultStr = "";
} else {
resultStr = result.toString();
} else {
// No matching transition method exists on the state model class.
String errorMessage = "Unable to find method for transition from " + fromState + " to "
+ toState + " in " + _stateModel.getClass();
_statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, errorMessage, manager);
/**
 * Error callback: for non-internal errors with code {@code ErrorCode.ERROR}, writes an ERROR
 * current state for the partition through the data accessor, and notifies the state model via
 * {@code rollbackOnError} from the {@code finally} block.
 *
 * @param e the exception that caused the error
 * @param code error code; only {@code ErrorCode.ERROR} leads to an ERROR current-state write
 * @param type error type; {@code ErrorType.INTERNAL} is logged as skipped
 */
public void onError(Exception e, ErrorCode code, ErrorType type) {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
String instanceName = _manager.getInstanceName();
String resourceName = _message.getResourceName();
String partition = _message.getPartitionName();
// All internal error has been processed already, so we can skip them
// NOTE(review): the statement that actually skips (presumably a return) is not present in this
// copy.
if (type == ErrorType.INTERNAL) {
logger.error("Skip internal error. errCode: " + code + ", errMsg: " + e.getMessage());
try {
// set current state to ERROR for the partition
// if the transition is not canceled, it should go into error state
if (code == ErrorCode.ERROR) {
CurrentState currentStateDelta = new CurrentState(resourceName);
currentStateDelta.setState(partition, HelixDefinedState.ERROR.toString());
// if transit from ERROR state, disable the partition
// NOTE(review): the body of this `if` (presumably a disablePartition() call) is missing.
if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
// Persist the ERROR state under the message's target session; a failed write is only logged.
if (!accessor.updateProperty(
keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName),
currentStateDelta)) {
logger.error("Fails to persist ERROR current state to ZK for resource " + resourceName
+ " partition: " + partition);
} finally {
// Runs whether or not the try block above completed normally.
StateTransitionError error = new StateTransitionError(type, code, e);
_stateModel.rollbackOnError(_message, _notificationContext, error);
/**
 * Timeout callback: sets the timeout flag so that postHandleMessage() classifies a subsequent
 * InterruptedException as a TIMEOUT error.
 */
public void onTimeout() {
_isTimeout = true;