blob: 590064d4eeb5e8cd1da764eeb5fffadaa811cbc2 [file] [log] [blame]
package org.apache.helix.messaging.handling;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelParser;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.task.TaskStateModel;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
import org.apache.helix.zookeeper.datamodel.ZNRecordDelta;
import org.apache.helix.zookeeper.datamodel.ZNRecordDelta.MergeOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelixStateTransitionHandler extends MessageHandler {
public static class HelixStateMismatchException extends Exception {
public HelixStateMismatchException(String info) {
super(info);
}
}
/**
* If current state == toState in message, this is considered as Duplicated state transition
*/
public static class HelixDuplicatedStateTransitionException extends Exception {
public HelixDuplicatedStateTransitionException(String info) {
super(info);
}
}
public static class StaleMessageValidateResult {
public boolean isValid;
public Exception exception;
StaleMessageValidateResult(Exception exp) {
exception = exp;
isValid = exception == null;
}
}
private static final Logger logger = LoggerFactory.getLogger(HelixStateTransitionHandler.class);
private final StateModel _stateModel;
StatusUpdateUtil _statusUpdateUtil;
private final StateModelParser _transitionMethodFinder;
private final CurrentState _currentStateDelta;
private final HelixManager _manager;
private final StateModelFactory<? extends StateModel> _stateModelFactory;
private final boolean _isTaskMessage;
private final boolean _isTaskCurrentStatePathDisabled;
volatile boolean _isTimeout = false;
public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory,
StateModel stateModel, Message message, NotificationContext context,
CurrentState currentStateDelta) {
super(message, context);
_stateModel = stateModel;
_statusUpdateUtil = new StatusUpdateUtil();
_transitionMethodFinder = new StateModelParser();
_currentStateDelta = currentStateDelta;
_manager = _notificationContext.getManager();
_stateModelFactory = stateModelFactory;
_isTaskMessage = stateModel instanceof TaskStateModel;
_isTaskCurrentStatePathDisabled =
Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED);
}
void preHandleMessage() throws Exception {
if (!_message.isValid()) {
String errorMessage = "Invalid Message, ensure that message: " + _message
+ " has all the required fields: " + Arrays.toString(Message.Attributes.values());
_statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
_manager);
logger.error(errorMessage);
throw new HelixException(errorMessage);
}
logger.info("handling message: " + _message.getMsgId() + " transit "
+ _message.getResourceName() + "." + _message.getPartitionName() + "|"
+ _message.getPartitionNames() + " from:" + _message.getFromState() + " to:"
+ _message.getToState() + ", relayedFrom: " + _message.getRelaySrcHost());
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
String partitionName = _message.getPartitionName();
// Set start time right before invoke client logic
_currentStateDelta.setStartTime(_message.getPartitionName(), System.currentTimeMillis());
StaleMessageValidateResult err = staleMessageValidator();
if (!err.isValid) {
_statusUpdateUtil
.logError(_message, HelixStateTransitionHandler.class, err.exception.getMessage(),
_manager);
logger.error(err.exception.getMessage());
throw err.exception;
}
// Reset the REQUESTED_STATE property if it exists.
try {
String instance = _manager.getInstanceName();
String sessionId = _message.getTgtSessionId();
String resource = _message.getResourceName();
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(_message.getBucketSize());
PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? accessor.keyBuilder()
.taskCurrentState(instance, sessionId, resource, bucketizer.getBucketName(partitionName))
: accessor.keyBuilder()
.currentState(instance, sessionId, resource, bucketizer.getBucketName(partitionName));
ZNRecord rec = new ZNRecord(resource);
Map<String, String> map = new TreeMap<String, String>();
map.put(CurrentState.CurrentStateProperty.REQUESTED_STATE.name(), null);
rec.getMapFields().put(partitionName, map);
ZNRecordDelta delta = new ZNRecordDelta(rec, ZNRecordDelta.MergeOperation.SUBTRACT);
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
deltaList.add(delta);
CurrentState currStateUpdate = new CurrentState(resource);
currStateUpdate.setDeltaList(deltaList);
// Update the ZK current state of the node
if (!accessor.updateProperty(key, currStateUpdate)) {
logger.error("Fails to persist current state back to ZK for resource " + resource
+ " partition: " + partitionName);
}
} catch (Exception e) {
logger.error("Error when removing " + CurrentState.CurrentStateProperty.REQUESTED_STATE.name()
+ " from current state.", e);
StateTransitionError error =
new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
_stateModel.rollbackOnError(_message, _notificationContext, error);
_statusUpdateUtil.logError(
_message, HelixStateTransitionHandler.class, e, "Error when removing "
+ CurrentState.CurrentStateProperty.REQUESTED_STATE.name() + " from current state.",
_manager);
}
}
void postHandleMessage() {
HelixTaskResult taskResult =
(HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
Exception exception = taskResult.getException();
String partitionKey = _message.getPartitionName();
String resource = _message.getResourceName();
String sessionId = _message.getTgtSessionId();
String instanceName = _manager.getInstanceName();
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
int bucketSize = _message.getBucketSize();
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
// No need to sync on manager, we are cancel executor in expiry session before start executor in
// new session
// sessionId might change when we update the state model state.
// for zk current state it is OK as we have the per-session current state node
if (!_message.getTgtSessionId().equals(_manager.getSessionId())) {
logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
+ _message.getExecutionSessionId() + " , new session : " + _manager.getSessionId());
return;
}
// Set the INFO property and mark the end time, previous state of the state transition
_currentStateDelta.setInfo(partitionKey, taskResult.getInfo());
_currentStateDelta.setEndTime(partitionKey, taskResult.getCompleteTime());
_currentStateDelta.setPreviousState(partitionKey, _message.getFromState());
// add host name this state transition is triggered by.
if (Message.MessageType.RELAYED_MESSAGE.name().equals(_message.getMsgSubType())) {
_currentStateDelta.setTriggerHost(partitionKey, _message.getRelaySrcHost());
} else {
_currentStateDelta.setTriggerHost(partitionKey, _message.getMsgSrc());
}
if (taskResult.isSuccess()) {
// String fromState = message.getFromState();
String toState = _message.getToState();
_currentStateDelta.setState(partitionKey, toState);
if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
// for "OnOfflineToDROPPED" message, we need to remove the resource key record
// from the current state of the instance because the resource key is dropped.
// In the state model it will be stayed as "OFFLINE", which is OK.
ZNRecord rec = new ZNRecord(_currentStateDelta.getId());
rec.getMapFields().put(partitionKey, null);
ZNRecordDelta delta = new ZNRecordDelta(rec, MergeOperation.SUBTRACT);
List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
deltaList.add(delta);
_currentStateDelta.setDeltaList(deltaList);
_stateModelFactory.removeStateModel(resource, partitionKey);
} else {
// if the partition is not to be dropped, update _stateModel to the TO_STATE
_stateModel.updateState(toState);
}
} else if (taskResult.isCancelled()) {
// Cancelled message does not need current state update
return;
} else {
if (exception instanceof HelixStateMismatchException) {
// if fromState mismatch, set current state on zk to stateModel's current state
logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
+ partitionKey + ", currentState: " + _stateModel.getCurrentState() + ", message: "
+ _message);
_currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
} else {
StateTransitionError error =
new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
if (exception instanceof InterruptedException) {
if (_isTimeout) {
error = new StateTransitionError(ErrorType.INTERNAL, ErrorCode.TIMEOUT, exception);
} else {
// State transition interrupted but not caused by timeout. Keep the current
// state in this case
logger.error(
"State transition interrupted but not timeout. Not updating state. Partition : "
+ _message.getPartitionName() + " MsgId : " + _message.getMsgId());
return;
}
}
_stateModel.rollbackOnError(_message, _notificationContext, error);
_currentStateDelta.setState(partitionKey, HelixDefinedState.ERROR.toString());
_stateModel.updateState(HelixDefinedState.ERROR.toString());
}
}
try {
// Update the ZK current state of the node
PropertyKey key = _isTaskMessage && !_isTaskCurrentStatePathDisabled ? accessor.keyBuilder()
.taskCurrentState(instanceName, sessionId, resource,
bucketizer.getBucketName(partitionKey)) : accessor.keyBuilder()
.currentState(instanceName, sessionId, resource, bucketizer.getBucketName(partitionKey));
if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
// normal message
if (!accessor.updateProperty(key, _currentStateDelta)) {
throw new HelixException("Fails to persist current state back to ZK for resource "
+ resource + " partition: " + _message.getPartitionName());
}
} else {
// sub-message of a batch message
ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap =
(ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext
.get(MapKey.CURRENT_STATE_UPDATE.toString());
csUpdateMap.put(partitionKey, new CurrentStateUpdate(key, _currentStateDelta));
}
} catch (Exception e) {
logger.error("Error when updating current-state ", e);
StateTransitionError error =
new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
_stateModel.rollbackOnError(_message, _notificationContext, error);
_statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, e,
"Error when update current-state ", _manager);
}
}
@Override
public HelixTaskResult handleMessage() {
NotificationContext context = _notificationContext;
Message message = _message;
HelixTaskResult taskResult = new HelixTaskResult();
HelixManager manager = context.getManager();
_statusUpdateUtil
.logInfo(message, HelixStateTransitionHandler.class, "Message handling task begin execute",
manager);
message.setExecuteStartTimeStamp(System.currentTimeMillis());
try {
preHandleMessage();
invoke(manager, context, taskResult, message);
} catch (HelixDuplicatedStateTransitionException e) {
// Duplicated state transition problem is fine
taskResult.setSuccess(true);
taskResult.setMessage(e.toString());
taskResult.setInfo(e.getMessage());
} catch (HelixStateMismatchException e) {
// Simply log error and return from here if State mismatch.
// The current state of the state model is intact.
taskResult.setSuccess(false);
taskResult.setMessage(e.toString());
taskResult.setException(e);
} catch (Exception e) {
String errorMessage =
"Exception while executing a state transition task " + message.getPartitionName();
logger.error(errorMessage, e);
if (e.getCause() != null && e.getCause() instanceof InterruptedException) {
e = (InterruptedException) e.getCause();
}
if (e instanceof HelixRollbackException || (e.getCause() != null && e
.getCause() instanceof HelixRollbackException)) {
// TODO : Support cancel to any state
logger.info(
"Rollback happened of state transition on resource \"" + _message.getResourceName()
+ "\" partition \"" + _message.getPartitionName() + "\" from \"" + _message
.getFromState() + "\" to \"" + _message.getToState() + "\"");
taskResult.setCancelled(true);
} else {
_statusUpdateUtil
.logError(message, HelixStateTransitionHandler.class, e, errorMessage, manager);
taskResult.setSuccess(false);
taskResult.setMessage(e.toString());
taskResult.setException(e);
taskResult.setInterrupted(e instanceof InterruptedException);
}
}
taskResult.setCompleteTime(System.currentTimeMillis());
// add task result to context for postHandling
context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult);
postHandleMessage();
return taskResult;
}
private void invoke(HelixManager manager, NotificationContext context, HelixTaskResult taskResult,
Message message) throws IllegalAccessException, InvocationTargetException,
InterruptedException, HelixRollbackException {
_statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
"Message handling invoking", manager);
// by default, we invoke state transition function in state model
Method methodToInvoke = null;
String fromState = message.getFromState();
String toState = message.getToState();
methodToInvoke = _transitionMethodFinder.getMethodForTransition(_stateModel.getClass(),
fromState, toState, new Class[] {
Message.class, NotificationContext.class
});
if (methodToInvoke != null) {
logger.info(String.format(
"Instance %s, partition %s received state transition from %s to %s on session %s, message id: %s",
message.getTgtName(), message.getPartitionName(), message.getFromState(),
message.getToState(), message.getTgtSessionId(), message.getMsgId()));
if (_cancelled) {
throw new HelixRollbackException(String.format(
"Instance %s, partition %s state transition from %s to %s on session %s has been cancelled, message id: %s",
message.getTgtName(), message.getPartitionName(), message.getFromState(),
message.getToState(), message.getTgtSessionId(), message.getMsgId()));
}
Object result = methodToInvoke.invoke(_stateModel, new Object[] {
message, context
});
taskResult.setSuccess(true);
String resultStr;
if (result == null || result instanceof Void) {
resultStr = "";
} else {
resultStr = result.toString();
}
taskResult.setInfo(resultStr);
} else {
String errorMessage = "Unable to find method for transition from " + fromState + " to "
+ toState + " in " + _stateModel.getClass();
logger.error(errorMessage);
taskResult.setSuccess(false);
taskResult.setInfo(errorMessage);
_statusUpdateUtil.logError(message, HelixStateTransitionHandler.class, errorMessage, manager);
}
}
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
String instanceName = _manager.getInstanceName();
String resourceName = _message.getResourceName();
String partition = _message.getPartitionName();
// All internal error has been processed already, so we can skip them
if (type == ErrorType.INTERNAL) {
logger.error("Skip internal error. errCode: " + code + ", errMsg: " + e.getMessage());
return;
}
try {
// set current state to ERROR for the partition
// if the transition is not canceled, it should go into error state
if (code == ErrorCode.ERROR) {
CurrentState currentStateDelta = new CurrentState(resourceName);
currentStateDelta.setState(partition, HelixDefinedState.ERROR.toString());
_stateModel.updateState(HelixDefinedState.ERROR.toString());
PropertyKey currentStateKey =
_isTaskMessage && !_isTaskCurrentStatePathDisabled ? keyBuilder
.taskCurrentState(instanceName, _message.getTgtSessionId(), resourceName)
: keyBuilder.currentState(instanceName, _message.getTgtSessionId(), resourceName);
if (!accessor.updateProperty(currentStateKey, currentStateDelta)) {
logger.error("Fails to persist ERROR current state to ZK for resource " + resourceName
+ " partition: " + partition);
}
}
} finally {
StateTransitionError error = new StateTransitionError(type, code, e);
_stateModel.rollbackOnError(_message, _notificationContext, error);
}
}
// Verify the fromState and current state of the stateModel.
public StaleMessageValidateResult staleMessageValidator() {
String fromState = _message.getFromState();
String toState = _message.getToState();
String partitionName = _message.getPartitionName();
// state in _currentStateDelta uses current state from state model. It has the
// most up-to-date. current state. In case currentState in stateModel is null,
// partition is in initial state and we using it as current state.
// Defined in HelixStateMachineEngine.
String state = _currentStateDelta.getState(partitionName);
Exception err = null;
if (toState.equalsIgnoreCase(state)) {
// To state equals current state, we can just ignore the message
err = new HelixDuplicatedStateTransitionException(String
.format("Partition %s current state is same as toState (%s->%s) from message.",
partitionName, fromState, toState));
} else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) {
// If current state is neither toState nor fromState in message, there is a problem
err = new HelixStateMismatchException(String.format(
"Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s",
state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName()));
}
return new StaleMessageValidateResult(err);
}
@Override
public void onTimeout() {
_isTimeout = true;
}
}