// blob: fb55e760daa602a5d27a39fa7b5d340af29b4a96 [file] [log] [blame]
package org.apache.helix.messaging.handling;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link MessageTask} that runs one {@link Message} through its {@link MessageHandler} and then
 * performs the follow-up bookkeeping for that message: status updates, participant monitor
 * reporting, relay message forwarding, removal of the message from ZK, reply sending and
 * re-scheduling on timeout.
 */
public class HelixTask implements MessageTask {
  private static final Logger logger = LoggerFactory.getLogger(HelixTask.class);

  private final Message _message;
  private final MessageHandler _handler;
  private final NotificationContext _notificationContext;
  private final HelixManager _manager;
  StatusUpdateUtil _statusUpdateUtil;
  HelixTaskExecutor _executor;
  // Set by onTimeout(); call() reads it to tell a timeout apart from a plain cancellation.
  volatile boolean _isTimeout = false;
  // Set by setStarted() when call() begins; once set, cancel() refuses to cancel.
  volatile boolean _isStarted = false;
  // Set by cancel() when it runs before setStarted(); makes setStarted() throw.
  volatile boolean _isCancelled = false;

  /**
   * Creates a task for the given message.
   *
   * @param message the message to handle
   * @param notificationContext context the message arrived with; its manager is used for all
   *          cluster access done by this task
   * @param handler the handler that performs the actual work
   * @param executor the executor that owns this task (timeout tasks, monitoring, re-scheduling)
   */
  public HelixTask(Message message, NotificationContext notificationContext,
      MessageHandler handler, HelixTaskExecutor executor) {
    this._notificationContext = notificationContext;
    this._message = message;
    this._handler = handler;
    this._manager = notificationContext.getManager();
    _statusUpdateUtil = new StatusUpdateUtil();
    _executor = executor;
  }

  /**
   * Executes the handler for the message and performs all post-processing.
   *
   * <p>The handler result is classified as success, interrupted (timeout or cancel), cancelled or
   * failed, and reported accordingly. A timed-out message with retries left is re-scheduled on
   * the executor and is not cleaned up. Otherwise relay messages are forwarded (on success only)
   * and {@link #finalCleanup(HelixTaskResult)} runs. In every case the handler's {@code onError}
   * is invoked from the {@code finally} block when an internal or framework error was recorded.
   *
   * @return the handler's result, or a failed result built by this method when the handler
   *         threw, was interrupted, or returned {@code null}
   */
  @Override
  public HelixTaskResult call() {
    HelixTaskResult taskResult = null;
    ErrorType type = null;
    ErrorCode code = null;
    Long handlerStart = null;
    Long handlerEnd = null;

    long start = System.currentTimeMillis();
    logger.info("handling task: " + getTaskId() + " begin, at: " + start);
    _statusUpdateUtil.logInfo(_message, HelixTask.class, "Message handling task begin execute",
        _manager);
    _message.setExecuteStartTimeStamp(new Date().getTime());

    // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
    // partitionName -> csUpdate
    if (_message.getBatchMessageMode()) {
      _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(),
          new ConcurrentHashMap<String, CurrentStateUpdate>());
    }

    // Handle the message
    try {
      setStarted();
      handlerStart = System.currentTimeMillis();
      taskResult = _handler.handleMessage();
      handlerEnd = System.currentTimeMillis();

      // cancel timeout task
      _executor.cancelTimeoutTask(this);
    } catch (InterruptedException e) {
      // NOTE(review): the thread's interrupt flag is intentionally left as-is here; the
      // post-processing below still talks to ZK on this thread - confirm before re-interrupting.
      taskResult = new HelixTaskResult();
      taskResult.setException(e);
      taskResult.setInterrupted(true);

      _statusUpdateUtil.logError(_message, HelixTask.class, e,
          "State transition interrupted, timeout:" + _isTimeout, _manager);
      logger.info("Message " + _message.getMsgId() + " is interrupted");
    } catch (Exception e) {
      taskResult = new HelixTaskResult();
      taskResult.setException(e);
      taskResult.setMessage(e.getMessage());
      String errorMessage =
          "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
              + " type: " + _message.getMsgType();
      logger.error(errorMessage, e);
      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager);
    }

    if (taskResult == null) {
      // The handler returned without a result. Treat it as a failure instead of letting a
      // NullPointerException escape from the post-processing and skip the cleanup. The result
      // is left in its default state, which the catch blocks above also rely on to mean
      // "not successful"; the error branch below reports it.
      String errorMessage =
          "Message handler returned a null result. msgId: " + _message.getMsgId() + " type: "
              + _message.getMsgType();
      logger.error(errorMessage);
      taskResult = new HelixTaskResult();
      taskResult.setException(new IllegalStateException(errorMessage));
      taskResult.setMessage(errorMessage);
    }

    Exception exception = null;
    try {
      if (taskResult.isSuccess()) {
        _statusUpdateUtil
            .logInfo(_message, _handler.getClass(), "Message handling task completed successfully",
                _manager);
        logger.info("Message " + _message.getMsgId() + " completed.");
        _executor.getParticipantMonitor().reportProcessedMessage(_message,
            ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
      } else {
        type = ErrorType.INTERNAL;

        if (taskResult.isInterrupted()) {
          logger.info("Message " + _message.getMsgId() + " is interrupted");
          code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
          if (_isTimeout) {
            int retryCount = _message.getRetryCount();
            logger.info("Message timeout, retry count: " + retryCount + " msgId:"
                + _message.getMsgId());
            _statusUpdateUtil.logInfo(_message, _handler.getClass(),
                "Message handling task timeout, retryCount:" + retryCount, _manager);
            // On timeout (timed out and also interrupted) retry the message by re-scheduling it
            // on the executor while retries are left. The finally block below still notifies
            // the handler through onError with the TIMEOUT code before this method returns.
            if (retryCount > 0) {
              _message.setRetryCount(retryCount - 1);
              HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
              _executor.scheduleTask(task);
              return taskResult;
            }
          }
          _executor.getParticipantMonitor().reportProcessedMessage(
              _message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
        } else if (taskResult.isCancelled()) {
          // A completed cancellation is not an error: clear the type so onError is not invoked.
          type = null;
          _statusUpdateUtil
              .logInfo(_message, _handler.getClass(), "Cancellation completed successfully",
                  _manager);
          _executor.getParticipantMonitor().reportProcessedMessage(
              _message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
        } else {// logging for errors
          code = ErrorCode.ERROR;
          String errorMsg =
              "Message execution failed. msgId: " + getTaskId() + ", errorMsg: "
                  + taskResult.getMessage();
          logger.error(errorMsg);
          _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, _manager);
          _executor.getParticipantMonitor().reportProcessedMessage(
              _message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
        }
      }

      HelixDataAccessor accessor = _manager.getHelixDataAccessor();

      // forward relay messages attached to this message to other participants
      if (taskResult.isSuccess()) {
        try {
          forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
        } catch (Exception e) {
          // Fail to send relay message should not result in a task execution failure
          // Currently we don't log error to ZK to reduce writes as when accessor throws
          // exception, ZK might not be in good condition.
          logger.warn("Failed to send relay messages.", e);
        }
      }

      finalCleanup(taskResult);
    } catch (Exception e) {
      finalCleanup(taskResult);
      exception = e;
      type = ErrorType.FRAMEWORK;
      code = ErrorCode.ERROR;

      String errorMessage =
          "Exception after executing a message, msgId: " + _message.getMsgId() + ", exception: "
              + e;
      logger.error(errorMessage, e);
      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, _manager);
    } finally {
      long end = System.currentTimeMillis();
      long totalDuration = end - start;
      // -1 means the handler did not run to completion, so its duration is unknown.
      long handlerDuration =
          handlerStart != null && handlerEnd != null ? handlerEnd - handlerStart : -1;
      // Only subtract the handler time when it is known; otherwise all time is framework time.
      long frameworkDuration =
          handlerDuration >= 0 ? totalDuration - handlerDuration : totalDuration;
      logger.info(
          "Message: {} (parent: {}) handling task for {}:{} completed at: {}, results: {}. FrameworkTime: {} ms; HandlerTime: {} ms.",
          _message.getMsgId(), _message.getAttribute(Attributes.PARENT_MSG_ID),
          _message.getResourceName(), _message.getPartitionName(), end, taskResult.isSuccess(),
          frameworkDuration, handlerDuration);

      // Notify the handler about any error happened in the handling procedure, so that
      // the handler have chance to finally cleanup
      if (type == ErrorType.INTERNAL) {
        _handler.onError(taskResult.getException(), code, type);
      } else if (type == ErrorType.FRAMEWORK) {
        _handler.onError(exception, code, type);
      }
    }
    return taskResult;
  }

  /**
   * Removes the message from ZK through {@link HelixUtil} and logs whether the removal succeeded.
   *
   * @param accessor accessor used for the removal
   * @param message the message to remove
   */
  private void removeMessageFromZk(HelixDataAccessor accessor, Message message) {
    if (!HelixUtil.removeMessageFromZK(accessor, message, _manager.getInstanceName())) {
      logger.warn("Failed to delete message " + message.getId() + " from zk!");
    } else {
      logger.info("Delete message " + message.getId() + " from zk!");
    }
  }

  /**
   * Creates the relay messages attached to {@code message} under their target instances.
   *
   * <p>Nothing is sent when the manager's session id differs from the message's target session
   * id. Attached messages whose sub type is not {@code RELAYED_MESSAGE} and relay messages that
   * are already expired are skipped.
   *
   * @param accessor accessor used to build message keys and to create the message nodes
   * @param message the handled message that may carry relay messages
   * @param taskCompletionTime completion time of the handled task; set as the relay time of each
   *          relay message before its expiry is checked
   */
  private void forwardRelayMessages(HelixDataAccessor accessor, Message message,
      long taskCompletionTime) {
    if (!message.hasRelayMessages()) {
      return;
    }

    // Ignore all relay messages if participant's session has changed.
    if (!_manager.getSessionId().equals(message.getTgtSessionId())) {
      logger.info(
          "Session id has been changed, ignore all relay messages attached with " + message
              .getId());
      return;
    }

    Map<String, Message> relayMessages = message.getRelayMessages();
    Builder keyBuilder = accessor.keyBuilder();
    String relayedSubType = MessageType.RELAYED_MESSAGE.name();

    for (Map.Entry<String, Message> entry : relayMessages.entrySet()) {
      String instance = entry.getKey();
      Message msg = entry.getValue();

      // Constant-first comparison: a relay message without a sub type is skipped instead of
      // throwing and aborting the forwarding of the remaining relay messages.
      if (!relayedSubType.equals(msg.getMsgSubType())) {
        continue;
      }

      msg.setRelayTime(taskCompletionTime);
      if (msg.isExpired()) {
        logger.info(
            "Relay message expired, ignore " + msg.getId() + " to instance " + instance);
        continue;
      }

      PropertyKey msgKey = keyBuilder.message(instance, msg.getId());
      boolean success = accessor.getBaseDataAccessor()
          .create(msgKey.getPath(), msg.getRecord(), AccessOption.PERSISTENT);
      if (!success) {
        logger.warn("Failed to send relay message " + msg.getId() + " to " + instance);
      } else {
        logger.info("Send relay message " + msg.getId() + " to " + instance);
      }
    }
  }

  /**
   * Returns the data accessor to use for replying to {@code message}.
   *
   * @param message the message being replied to
   * @return the manager's own accessor, or a new accessor for the message's source cluster when
   *         that cluster is set and differs from the manager's cluster
   */
  private HelixDataAccessor getSrcClusterDataAccessor(final Message message) {
    HelixDataAccessor helixDataAccessor = _manager.getHelixDataAccessor();
    String clusterName = message.getSrcClusterName();
    if (clusterName != null && !clusterName.equals(_manager.getClusterName())) {
      // for cross cluster message, create different HelixDataAccessor for replying message.
      /*
        TODO On frequent cross cluster messaging requests, constructing a data accessor every
        time may cause performance issues. We should consider adding a cache in this class or in
        HelixManager. --JJ
       */
      helixDataAccessor =
          new ZKHelixDataAccessor(clusterName, helixDataAccessor.getBaseDataAccessor());
    }
    return helixDataAccessor;
  }

  /**
   * Sends a reply for {@code message} when it carries a correlation id and is not itself a
   * {@code TASK_REPLY}.
   *
   * <p>The task result map is extended with {@code SUCCESS}, {@code INTERRUPTED} and, on failure,
   * {@code ERRORINFO} before the reply message is created from it.
   *
   * @param replyDataAccessor accessor used to write the reply
   * @param message the handled message
   * @param taskResult the result whose map is sent back
   */
  private void sendReply(HelixDataAccessor replyDataAccessor, Message message,
      HelixTaskResult taskResult) {
    boolean needsReply = message.getCorrelationId() != null
        && !message.getMsgType().equals(MessageType.TASK_REPLY.name());
    if (!needsReply) {
      return;
    }

    logger.info("Sending reply for message " + message.getCorrelationId());
    _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", _manager);

    taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSuccess());
    taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
    if (!taskResult.isSuccess()) {
      taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
    }

    Message replyMessage = Message
        .createReplyMessage(message, _manager.getInstanceName(), taskResult.getTaskResultMap());
    replyMessage.setSrcInstanceType(_manager.getInstanceType());

    Builder keyBuilder = replyDataAccessor.keyBuilder();
    InstanceType srcInstanceType = message.getSrcInstanceType();
    if (srcInstanceType == InstanceType.PARTICIPANT) {
      replyDataAccessor
          .setProperty(keyBuilder.message(message.getMsgSrc(), replyMessage.getMsgId()),
              replyMessage);
    } else if (srcInstanceType == InstanceType.CONTROLLER) {
      replyDataAccessor
          .setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), replyMessage);
    }
    // NOTE(review): for any other source instance type no reply is written, yet the status
    // update below still reports one reply - confirm whether that is intended.

    String replyCluster = message.getSrcClusterName() == null
        ? _manager.getClusterName()
        : message.getSrcClusterName();
    _statusUpdateUtil.logInfo(message, HelixTask.class, String
        .format("1 msg replied to %s in cluster %s.", replyMessage.getTgtName(), replyCluster),
        _manager);
  }

  /**
   * Reports transition statistics for {@code STATE_TRANSITION} messages; other message types are
   * ignored.
   *
   * <p>A data point is reported only when both the read and the execution-start timestamps were
   * recorded on the message and both resulting delays are positive.
   *
   * @param manager supplies the cluster and instance names for the transition context
   * @param message the handled message
   * @param taskResult supplies the success flag of the data point
   */
  private void reportMessageStat(HelixManager manager, Message message,
      HelixTaskResult taskResult) {
    // report stat
    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
      return;
    }

    long now = new Date().getTime();
    long msgReadTime = message.getReadTimeStamp();
    long msgExecutionStartTime = message.getExecuteStartTimeStamp();
    if (msgReadTime == 0 || msgExecutionStartTime == 0) {
      logger.warn("message read time and start execution time not recorded.");
      return;
    }

    long totalDelay = now - msgReadTime;
    long executionDelay = now - msgExecutionStartTime;
    if (totalDelay > 0 && executionDelay > 0) {
      String transition = message.getFromState() + "--" + message.getToState();
      StateTransitionContext cxt =
          new StateTransitionContext(manager.getClusterName(), manager.getInstanceName(),
              message.getResourceName(), transition);
      StateTransitionDataPoint data =
          new StateTransitionDataPoint(totalDelay, executionDelay, taskResult.isSuccess());
      _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
    }
  }

  /** @return the id of the message handled by this task */
  @Override
  public String getTaskId() {
    return _message.getId();
  }

  /** @return the message handled by this task */
  @Override
  public Message getMessage() {
    return _message;
  }

  /** @return the notification context this task was created with */
  @Override
  public NotificationContext getNotificationContext() {
    return _notificationContext;
  }

  /** Marks the task as timed out and forwards the timeout notification to the handler. */
  @Override
  public void onTimeout() {
    _isTimeout = true;
    _handler.onTimeout();
  }

  /**
   * Cancels the task if it has not started yet, and forwards the cancellation to the handler.
   *
   * @return {@code true} if the task was cancelled, {@code false} if it had already started
   */
  @Override
  public synchronized boolean cancel() {
    if (!_isStarted) {
      _isCancelled = true;
      _handler.cancel();
      return true;
    }
    return false;
  }

  /**
   * Marks the task as started. Synchronized together with {@link #cancel()} so that exactly one
   * of the two takes effect.
   *
   * @throws HelixRollbackException if the task was cancelled before it started
   */
  private synchronized void setStarted() {
    if (_isCancelled) {
      throw new HelixRollbackException("Task has already been cancelled");
    }
    _isStarted = true;
  }

  /**
   * Best-effort cleanup after the message has been processed. For messages without a parent
   * message id it removes the message from ZK, reports statistics, sends the reply and tells the
   * executor the task is finished. Any failure is logged and not propagated.
   *
   * @param taskResult the result of the handled message
   */
  private void finalCleanup(HelixTaskResult taskResult) {
    try {
      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
        removeMessageFromZk(_manager.getHelixDataAccessor(), _message);
        reportMessageStat(_manager, _message, taskResult);
        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
        _executor.finishTask(this);
      }
    } catch (Exception e) {
      // Keep the cause in the log; without it a failed cleanup cannot be diagnosed.
      logger.error(String.format("Error to final clean up for message : %s", _message.getId()),
          e);
    }
  }
}