blob: ff970c29e8b632fe6b1ad118d31c6f9f53877b78 [file] [log] [blame]
package org.apache.helix.messaging.handling;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.Criteria;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.manager.zk.ParticipantManager;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor.ProcessedMessageState;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelixTaskExecutor implements MessageListener, TaskExecutor {
/**
* Put together all registration information about a message handler factory
*/
class MsgHandlerFactoryRegistryItem {
private final MessageHandlerFactory _factory;
private final int _threadPoolSize;
public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize) {
if (factory == null) {
throw new NullPointerException("Message handler factory is null");
}
if (threadPoolSize <= 0) {
throw new IllegalArgumentException("Illegal thread pool size: " + threadPoolSize);
}
_factory = factory;
_threadPoolSize = threadPoolSize;
}
int threadPoolSize() {
return _threadPoolSize;
}
MessageHandlerFactory factory() {
return _factory;
}
}
// Class-wide logger.
private static Logger LOG = LoggerFactory.getLogger(HelixTaskExecutor.class);
// Counter appended to the names of message-handling threads created in
// registerMessageHandlerFactory so each such thread gets a distinct name.
private static AtomicLong thread_uid = new AtomicLong(0);
// TODO: we need to further design how to throttle this.
// From storage point of view, only bootstrap case is expensive
// and we need to throttle, which is mostly IO / network bounded.
// Pool size used by registerMessageHandlerFactory(String, MessageHandlerFactory).
public static final int DEFAULT_PARALLEL_TASKS = 40;
// TODO: create per-task type threadpool with customizable pool size
// Task id -> bookkeeping for a scheduled task (the task, its future and optional timeout task).
protected final Map<String, MessageTaskInfo> _taskMap;
// Monitor held by scheduleTask/cancelTask/cancelTimeoutTask/finishTask so that submitting a task
// and recording it in _taskMap happen atomically with respect to each other.
private final Object _lock;
private final StatusUpdateUtil _statusUpdateUtil;
private final ParticipantStatusMonitor _monitor;
// Resource config key holding the size of a resource's dedicated state-transition thread pool.
public static final String MAX_THREADS = "maxThreads";
// Optional; may be null (callers null-check it before use).
private MessageQueueMonitor _messageQueueMonitor;
// Set via setController.
private GenericHelixController _controller;
// Wall-clock millis of the last session sync sent to the controller; null means "not synced yet".
private Long _lastSessionSyncTime;
// NOTE(review): not referenced in the visible part of this file; presumably used by
// freeze/live-instance-status handling further down — confirm.
private String _freezeSessionId;
private LiveInstanceStatus _liveInstanceStatus;
// Minimum interval between two session syncs to the controller.
private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
// Id of the controller message used to sync the participant session.
private static final String SESSION_SYNC = "SESSION-SYNC";
/**
 * Map of MsgType->MsgHandlerFactoryRegistryItem
 */
final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry;
// Pool key -> executor. Keys are message types, per-resource state-transition pool names, or
// per-(resource, fromState, toState) transition keys.
final ConcurrentHashMap<String, ExecutorService> _executorMap;
// Executor used for STATE_TRANSITION messages in batch mode.
final ExecutorService _batchMessageExecutorService;
// Message target (resource + partition) -> id of the task handling it.
final ConcurrentHashMap<String, String> _messageTaskMap;
// Ids of messages already handled; these are skipped when re-reading messages from ZK.
final Set<String> _knownMessageIds;
/* Resources whose configuration for a dedicated thread pool has already been checked. */
final Set<String> _resourcesThreadpoolChecked;
// State-transition keys whose client-provided dedicated thread pool has already been checked.
final Set<String> _transitionTypeThreadpoolChecked;
// Daemon timer used to schedule message execution-timeout tasks.
final Timer _timer;
// When true, onMessage ignores incoming messages; cleared by registerMessageHandlerFactory/init.
private boolean _isShuttingDown;
/**
 * Creates an executor backed by a new {@code ParticipantStatusMonitor(false, null)} and without a
 * message queue monitor.
 */
public HelixTaskExecutor() {
  this(new ParticipantStatusMonitor(false, null), null);
}

/**
 * Creates an executor reporting to the given participant monitor, without a message queue
 * monitor.
 *
 * @param participantStatusMonitor monitor that executor pools are registered with
 */
public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor) {
  this(participantStatusMonitor, null);
}

/**
 * Creates an executor.
 *
 * @param participantStatusMonitor monitor that executor pools are registered with
 * @param messageQueueMonitor optional message queue monitor; may be null
 */
public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor,
    MessageQueueMonitor messageQueueMonitor) {
  _monitor = participantStatusMonitor;
  _messageQueueMonitor = messageQueueMonitor;
  _lock = new Object();
  _statusUpdateUtil = new StatusUpdateUtil();

  // Task / registry bookkeeping, all backed by concurrent collections.
  _taskMap = new ConcurrentHashMap<>();
  _messageTaskMap = new ConcurrentHashMap<>();
  _hdlrFtyRegistry = new ConcurrentHashMap<>();
  _executorMap = new ConcurrentHashMap<>();
  _knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
  _resourcesThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());
  _transitionTypeThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());

  // Shared pool for batch-mode state transitions, registered with the participant monitor.
  _batchMessageExecutorService = Executors.newCachedThreadPool();
  _monitor.createExecutorMonitor("BatchMessageExecutor", _batchMessageExecutorService);

  // Daemon timer thread that fires message execution timeouts.
  _timer = new Timer("HelixTaskExecutor_Timer", true);
  _isShuttingDown = false;
  startMonitorThread();
}
/**
 * Registers {@code factory} for message type {@code type} with a pool of
 * {@link #DEFAULT_PARALLEL_TASKS} threads.
 */
@Override
public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) {
  final int defaultPoolSize = DEFAULT_PARALLEL_TASKS;
  registerMessageHandlerFactory(type, factory, defaultPoolSize);
}
/**
 * Registers a message handler factory for {@code type} and creates a fixed-size thread pool for
 * that message type. Only the first registration of a type takes effect; later registrations of
 * the same type are logged and ignored.
 *
 * @param type message type handled by the factory
 * @param factory factory creating handlers; its declared message type(s) must include
 *     {@code type}
 * @param threadpoolSize size of the fixed thread pool for this type; must be positive
 * @throws NullPointerException if {@code factory} is null
 * @throws HelixException if the factory does not declare {@code type}
 * @throws IllegalArgumentException if {@code threadpoolSize} is not positive
 */
@Override
public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
    int threadpoolSize) {
  // Fail fast with a descriptive message instead of an NPE from factory.getMessageType().
  if (factory == null) {
    throw new NullPointerException("Message handler factory is null");
  }
  if (factory instanceof MultiTypeMessageHandlerFactory) {
    if (!((MultiTypeMessageHandlerFactory) factory).getMessageTypes().contains(type)) {
      throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
          + ((MultiTypeMessageHandlerFactory) factory).getMessageTypes());
    }
  } else if (!factory.getMessageType().equals(type)) {
    throw new HelixException(
        "Message factory type mismatch. Type: " + type + ", factory: " + factory
            .getMessageType());
  }
  _isShuttingDown = false;
  MsgHandlerFactoryRegistryItem newItem =
      new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
  MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem);
  if (prevItem != null) {
    LOG.info("Skip register message handler factory for type: {}, poolSize: {}, factory: {}"
            + ", already existing factory: {}", type, threadpoolSize, factory,
        prevItem.factory());
    return;
  }
  // Reuse an existing pool for this type if one is still present; otherwise create and monitor one.
  ExecutorService pool = _executorMap.computeIfAbsent(type, msgType -> {
    ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize, r -> new Thread(r,
        "HelixTaskExecutor-message_handle_thread_" + thread_uid.getAndIncrement()));
    _monitor.createExecutorMonitor(msgType, newPool);
    return newPool;
  });
  LOG.info("Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}",
      type, threadpoolSize, factory, pool);
}
/**
 * Stores the controller reference on this executor.
 *
 * @param controller the controller; stored as-is, may be null
 */
public void setController(GenericHelixController controller) {
  this._controller = controller;
}
/**
 * @return the participant status monitor passed at construction time
 */
public ParticipantStatusMonitor getParticipantMonitor() {
  return this._monitor;
}
/**
 * Hook for starting a thread that monitors task completion. Currently a no-op.
 */
private void startMonitorThread() {
  // Intentionally empty: no monitor thread is started at present.
}
/**
 * Registers a dedicated thread pool in {@code _executorMap} for a STATE_TRANSITION message, if
 * one is configured or provided by the client. Other message types are ignored.
 *
 * <p>Lookup order:
 * <ol>
 *   <li>a per-(resource, fromState, toState) pool provided by the state model factory; each
 *       transition key is checked only once;</li>
 *   <li>otherwise, once per resource: a fixed pool sized by the resource config
 *       {@link #MAX_THREADS}, or, when no positive size is configured, a per-resource pool
 *       provided by the state model factory.</li>
 * </ol>
 *
 * @param message the message about to be scheduled
 * @param manager manager used to reach the state machine engine and config accessor
 */
private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
  if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
    return;
  }
  String resourceName = message.getResourceName();
  String factoryName = message.getStateModelFactoryName();
  String stateModelName = message.getStateModelDef();
  if (factoryName == null) {
    factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
  }
  StateModelFactory<? extends StateModel> stateModelFactory =
      manager.getStateMachineEngine().getStateModelFactory(stateModelName, factoryName);
  final String perResourcePoolName = getPerResourceStateTransitionPoolName(resourceName);
  String perStateTransitionTypeKey =
      getStateTransitionType(perResourcePoolName, message.getFromState(), message.getToState());

  // 1) Client-provided pool for this specific transition type.
  if (perStateTransitionTypeKey != null && stateModelFactory != null
      && !_transitionTypeThreadpoolChecked.contains(perStateTransitionTypeKey)) {
    ExecutorService perStateTransitionTypeExecutor = stateModelFactory
        .getExecutorService(resourceName, message.getFromState(), message.getToState());
    _transitionTypeThreadpoolChecked.add(perStateTransitionTypeKey);
    if (perStateTransitionTypeExecutor != null) {
      _executorMap.put(perStateTransitionTypeKey, perStateTransitionTypeExecutor);
      LOG.info("Added client specified dedicate threadpool for resource {} from {} to {}",
          perResourcePoolName, message.getFromState(), message.getToState());
      return;
    }
  }

  // 2) Per-resource pool, evaluated only once per resource.
  if (_resourcesThreadpoolChecked.contains(resourceName)) {
    return;
  }
  int threadpoolSize = -1;
  ConfigAccessor configAccessor = manager.getConfigAccessor();
  // Each resource is checked only once, so changes to this configuration take effect only after
  // the participant restarts.
  if (configAccessor != null) {
    HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
        .forCluster(manager.getClusterName()).forResource(resourceName).build();
    String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
    if (threadpoolSizeStr != null) {
      try {
        threadpoolSize = Integer.parseInt(threadpoolSizeStr);
      } catch (NumberFormatException e) {
        LOG.error("Failed to parse ThreadPoolSize from resourceConfig for resource "
            + resourceName, e);
      }
    }
  }
  if (threadpoolSize > 0) {
    final String threadName = "GenericHelixController-message_handle_" + perResourcePoolName;
    _executorMap.put(perResourcePoolName,
        Executors.newFixedThreadPool(threadpoolSize, r -> new Thread(r, threadName)));
    LOG.info("Added dedicate threadpool for resource: {} with size: {}", resourceName,
        threadpoolSize);
  } else if (stateModelFactory != null) {
    // No pool size configured: check whether the client supplies a customized pool.
    ExecutorService executor = stateModelFactory.getExecutorService(resourceName);
    if (executor != null) {
      _executorMap.put(perResourcePoolName, executor);
      LOG.info("Added client specified dedicate threadpool for resource: {}",
          perResourcePoolName);
    }
  } else {
    LOG.error("Fail to get dedicate threadpool defined in stateModelFactory {}: using factoryName:"
            + " {} for resource {}. No stateModelFactory was found!", stateModelName, factoryName,
        resourceName);
  }
  _resourcesThreadpoolChecked.add(resourceName);
}
/**
 * Find the executor service for the message. A message can have a per-statemodelfactory
 * executor service, or per-message type executor service.
 *
 * <p>For STATE_TRANSITION messages the lookup order is:
 * <ol>
 *   <li>batch-mode messages use the shared batch executor;</li>
 *   <li>a per-(resource, fromState, toState) pool;</li>
 *   <li>a per-resource pool;</li>
 *   <li>the per-message-type pool.</li>
 * </ol>
 * Every other message type uses its per-message-type pool.
 *
 * @param message the message to route
 * @return the executor to use, or null if no pool is registered for the message type
 */
ExecutorService findExecutorServiceForMsg(Message message) {
  ExecutorService executorService = _executorMap.get(message.getMsgType());
  if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
    return executorService;
  }
  if (message.getBatchMessageMode()) {
    return _batchMessageExecutorService;
  }
  String resourceName = message.getResourceName();
  if (resourceName == null) {
    return executorService;
  }
  String key = getPerResourceStateTransitionPoolName(resourceName);
  String perStateTransitionTypeKey =
      getStateTransitionType(key, message.getFromState(), message.getToState());
  // A single get() rather than containsKey()+get(): the entry could be removed in between,
  // which would otherwise make this method return null.
  ExecutorService perTransitionPool =
      perStateTransitionTypeKey == null ? null : _executorMap.get(perStateTransitionTypeKey);
  if (perTransitionPool != null) {
    LOG.info(String
        .format("Find per state transition type thread pool for resource %s from %s to %s",
            message.getResourceName(), message.getFromState(), message.getToState()));
    return perTransitionPool;
  }
  ExecutorService perResourcePool = _executorMap.get(key);
  if (perResourcePool != null) {
    LOG.info("Find per-resource thread pool with key: " + key);
    return perResourcePool;
  }
  return executorService;
}
/**
 * Runs all tasks on their common executor and blocks until they complete or the timeout expires.
 * ExecutorService implementations in the JDK are thread-safe, so no extra locking is done here.
 *
 * @param tasks tasks to run; all must map to the same executor
 * @param timeout maximum time to wait
 * @param unit unit of {@code timeout}
 * @return the futures of the tasks, or null if {@code tasks} is null/empty, the tasks map to
 *     different executors, or no executor is registered for them
 * @throws InterruptedException if interrupted while waiting
 */
@Override
public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> tasks, long timeout,
    TimeUnit unit) throws InterruptedException {
  if (tasks == null || tasks.isEmpty()) {
    return null;
  }
  // check all tasks use the same executor-service
  ExecutorService exeSvc = findExecutorServiceForMsg(tasks.get(0).getMessage());
  if (exeSvc == null) {
    // Previously this surfaced as an NPE from invokeAll(); report it like the mismatch case.
    LOG.error("Fail to invoke all tasks because no executor-service is found for message type {}",
        tasks.get(0).getMessage().getMsgType());
    return null;
  }
  for (int i = 1; i < tasks.size(); i++) {
    ExecutorService curExeSvc = findExecutorServiceForMsg(tasks.get(i).getMessage());
    if (curExeSvc != exeSvc) {
      LOG.error("Fail to invoke all tasks because they are not using the same executor-service");
      return null;
    }
  }
  // TODO: check if any of the task has already been scheduled
  // this is a blocking call
  return exeSvc.invokeAll(tasks, timeout, unit);
}
/**
 * Handles a task whose execution timeout fired: drops its message bookkeeping via
 * removeMessageFromTaskAndFutureMap and cancels its timer task. This method does not itself
 * remove the entry from {@code _taskMap}.
 *
 * @param task the timed-out task
 * @return true if the task was found in the task map, false otherwise
 */
@Override
public boolean cancelTimeoutTask(MessageTask task) {
  synchronized (_lock) {
    MessageTaskInfo taskInfo = _taskMap.get(task.getTaskId());
    if (taskInfo == null) {
      return false;
    }
    removeMessageFromTaskAndFutureMap(task.getMessage());
    if (taskInfo._timerTask != null) {
      taskInfo._timerTask.cancel();
    }
    return true;
  }
}
/**
 * Submits a message task to the executor selected by {@link #findExecutorServiceForMsg} and
 * records it in the task map. If the message has a positive execution timeout, a timeout task is
 * armed on the executor's timer.
 *
 * @param task the task to schedule
 * @return true if the task was newly scheduled; false if it was already scheduled, no executor
 *     is available for its message, or an exception occurred (which is logged)
 */
@Override
public boolean scheduleTask(MessageTask task) {
  String taskId = task.getTaskId();
  Message message = task.getMessage();
  NotificationContext notificationContext = task.getNotificationContext();
  HelixManager manager = notificationContext.getManager();
  try {
    // Check to see if dedicate thread pool for handling state transition messages is configured or provided.
    updateStateTransitionMessageThreadPool(message, manager);
    LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(),
        message.getPartitionName(), message.getFromState(), message.getToState());
    _statusUpdateUtil
        .logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", manager);
    // Submitting to the executor and recording the taskInfo in _taskMap happen under the same lock.
    synchronized (_lock) {
      if (_taskMap.containsKey(taskId)) {
        _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
            "Message handling task already scheduled for " + taskId, manager);
        return false;
      }
      ExecutorService exeSvc = findExecutorServiceForMsg(message);
      if (exeSvc == null) {
        LOG.warn("Threadpool is null for type {} of message {}", message.getMsgType(),
            message.getMsgId());
        return false;
      }
      LOG.info("Submit task: {} to pool: {}", taskId, exeSvc);
      Future<HelixTaskResult> future = exeSvc.submit(task);
      _messageTaskMap
          .putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()),
              taskId);
      TimerTask timerTask = null;
      if (message.getExecutionTimeout() > 0) {
        timerTask = new MessageTimeoutTask(this, task);
        _timer.schedule(timerTask, message.getExecutionTimeout());
        LOG.info("Message starts with timeout {} MsgId: {}", message.getExecutionTimeout(),
            taskId);
      } else {
        LOG.debug("Message does not have timeout. MsgId: {}", taskId);
      }
      _taskMap.put(taskId, new MessageTaskInfo(task, future, timerTask));
      LOG.info("Message: {} handling task scheduled", taskId);
      return true;
    }
  } catch (Exception e) {
    LOG.error("Error while executing task. " + message, e);
    _statusUpdateUtil
        .logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e,
            manager);
  }
  return false;
}
/**
 * Cancels a scheduled task. It cancels the timeout timer, drops the message bookkeeping and
 * calls {@code cancel(true)} on the task's future.
 *
 * <p>A running task is interrupted by {@code cancel(true)}, so state transition callbacks should
 * return promptly when interrupted.
 *
 * @param task the task to cancel
 * @return true if the future was cancelled (the task is then removed from the task map)
 */
@Override
public boolean cancelTask(MessageTask task) {
  Message message = task.getMessage();
  NotificationContext notificationContext = task.getNotificationContext();
  String taskId = task.getTaskId();
  synchronized (_lock) {
    MessageTaskInfo taskInfo = _taskMap.get(taskId);
    if (taskInfo == null) {
      _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
          "fail to cancel task: " + taskId + ", future not found",
          notificationContext.getManager());
      return false;
    }
    // Stop the timeout timer first.
    if (taskInfo._timerTask != null) {
      taskInfo._timerTask.cancel();
    }
    Future<HelixTaskResult> taskFuture = taskInfo.getFuture();
    removeMessageFromTaskAndFutureMap(message);
    _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling task: " + taskId,
        notificationContext.getManager());
    boolean cancelled = taskFuture.cancel(true);
    if (!cancelled) {
      _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
          "fail to cancel task: " + taskId, notificationContext.getManager());
      return false;
    }
    _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " + taskId,
        notificationContext.getManager());
    _taskMap.remove(taskId);
    return true;
  }
}
/**
 * Records completion of a task. It logs the elapsed time since the message's execute-start
 * timestamp, removes the task from the task map, drops its message bookkeeping and cancels its
 * timeout timer. Unknown tasks are only logged.
 *
 * @param task the finished task
 */
@Override
public void finishTask(MessageTask task) {
  Message message = task.getMessage();
  String taskId = task.getTaskId();
  long elapsedMillis = System.currentTimeMillis() - message.getExecuteStartTimeStamp();
  LOG.info("message finished: " + taskId + ", took " + elapsedMillis);
  synchronized (_lock) {
    MessageTaskInfo finishedInfo = _taskMap.remove(taskId);
    if (finishedInfo == null) {
      LOG.warn("message " + taskId + " not found in task map");
      return;
    }
    removeMessageFromTaskAndFutureMap(message);
    // Cancelling a TimerTask more than once is harmless.
    if (finishedInfo._timerTask != null) {
      finishedInfo._timerTask.cancel();
    }
  }
}
/**
 * Writes the given messages back to ZK in one batch, and only where the message znode still
 * exists. Ids of messages not in NEW state are then cached as known. If any message is in NEW
 * state, a no-op message is sent so that another message callback re-processes it.
 *
 * @param msgsToBeUpdated messages to persist; nothing happens if empty
 * @param accessor data accessor used for the batch update
 * @param instanceName instance owning the messages
 */
private void updateMessageState(Collection<Message> msgsToBeUpdated, HelixDataAccessor accessor,
    String instanceName) {
  if (msgsToBeUpdated.isEmpty()) {
    return;
  }
  Builder keyBuilder = accessor.keyBuilder();
  List<Message> orderedMsgs = new ArrayList<>(msgsToBeUpdated);
  List<String> znodePaths = new ArrayList<>(orderedMsgs.size());
  List<DataUpdater<ZNRecord>> recordUpdaters = new ArrayList<>(orderedMsgs.size());
  for (Message pending : orderedMsgs) {
    znodePaths.add(pending.getKey(keyBuilder, instanceName).getPath());
    // Use an updater to avoid racing with removal of the message once its state transition is
    // done: if the znode is already gone, return null so that nothing is written.
    recordUpdaters.add(currentData -> {
      if (currentData == null) {
        LOG.warn(
            "Message {} targets at {} has already been removed before it is set as READ on instance {}",
            pending.getId(), pending.getTgtName(), instanceName);
        return null;
      }
      return pending.getRecord();
    });
  }
  boolean[] writeSucceeded =
      accessor.updateChildren(znodePaths, recordUpdaters, AccessOption.PERSISTENT);

  // Only cache known message ids after the ZK update, to avoid an inconsistent cache.
  boolean needsReprocessing = false;
  for (int idx = 0; idx < orderedMsgs.size(); idx++) {
    Message pending = orderedMsgs.get(idx);
    if (pending.getMsgState().equals(MessageState.NEW)) {
      // NEW messages may need processing again soon, so they are not treated as known.
      needsReprocessing = true;
      continue;
    }
    _knownMessageIds.add(pending.getId());
    if (!writeSucceeded[idx]) {
      // TODO: If the message update fails, maybe we shall not treat the message as a known
      // TODO: message. We shall apply more strict check and retry the update.
      LOG.error("Failed to update the message {}.", pending.getMsgId());
    }
  }
  if (needsReprocessing) {
    // A NO-OP message triggers another message callback to re-process the NEW messages.
    sendNopMessage(accessor, instanceName);
  }
}
/**
 * Shuts down {@code pool} in two phases, each waiting 200ms:
 * <ol>
 *   <li>{@code shutdown()}, so no new tasks are accepted, then a wait for running tasks;</li>
 *   <li>{@code shutdownNow()}, which interrupts running tasks, then another wait.</li>
 * </ol>
 * If the calling thread is interrupted while waiting, the pool is forced down and the thread's
 * interrupt status is restored.
 *
 * @param pool the executor to shut down
 */
private void shutdownAndAwaitTermination(ExecutorService pool) {
  final long terminationWaitMs = 200L;
  LOG.info("Shutting down pool: {}", pool);
  pool.shutdown(); // Disable new tasks from being submitted
  try {
    // Wait a while for existing tasks to terminate
    if (!pool.awaitTermination(terminationWaitMs, TimeUnit.MILLISECONDS)) {
      List<Runnable> waitingTasks = pool.shutdownNow(); // Cancel currently executing tasks
      LOG.info("Tasks that never commenced execution: {}", waitingTasks);
      // Wait a while for tasks to respond to being cancelled
      if (!pool.awaitTermination(terminationWaitMs, TimeUnit.MILLISECONDS)) {
        LOG.error("Pool did not fully terminate in {}ms. pool: {}", terminationWaitMs, pool);
      }
    }
  } catch (InterruptedException ie) {
    // (Re-)Cancel if current thread also interrupted
    LOG.error("Interrupted when waiting for shutdown pool: " + pool, ie);
    pool.shutdownNow();
    // Preserve interrupt status
    Thread.currentThread().interrupt();
  }
}
/**
 * Removes the message handler factory registered for {@code type}. It also removes the type's
 * executor and executor monitor, shuts the executor down and resets the factory. Calling this for
 * a type with no registered factory only cleans up the executor/monitor (if any) and logs.
 *
 * @param type message type whose factory should be unregistered
 */
void unregisterMessageHandlerFactory(String type) {
  MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.remove(type);
  ExecutorService pool = _executorMap.remove(type);
  _monitor.removeExecutorMonitor(type);
  // item is null when nothing was registered for this type; the original code dereferenced it
  // unconditionally in the log statement and threw NullPointerException.
  MessageHandlerFactory factory = item == null ? null : item.factory();
  LOG.info("Unregistering message handler factory for type: {}, factory: {}, pool: {}", type,
      factory, pool);
  if (pool != null) {
    shutdownAndAwaitTermination(pool);
  }
  // reset state-model
  if (factory != null) {
    factory.reset();
  }
  LOG.info("Unregistered message handler factory for type: {}, factory: {}, pool: {}", type,
      factory, pool);
}
/**
 * Calls {@code sync()} on every registered message handler factory. Failures are logged per
 * factory and not propagated. The registry monitor is held so this cannot interleave with
 * {@code reset()}.
 */
private void syncFactoryState() {
  LOG.info("Start to sync factory state");
  synchronized (_hdlrFtyRegistry) {
    _hdlrFtyRegistry.forEach((msgType, registryItem) -> {
      MessageHandlerFactory handlerFactory = registryItem.factory();
      if (handlerFactory == null) {
        return;
      }
      try {
        handlerFactory.sync();
      } catch (Exception ex) {
        LOG.error("Failed to syncState the factory {} of message type {}.", handlerFactory,
            msgType, ex);
      }
    });
  }
}
/**
 * Resets this executor, e.g. on a FINALIZE notification. It:
 * <ul>
 *   <li>resets the message queue monitor, if any;</li>
 *   <li>shuts down every per-message-type executor and removes its monitor (factories stay
 *       registered);</li>
 *   <li>resets every registered factory;</li>
 *   <li>logs tasks still in the task map;</li>
 *   <li>clears all task bookkeeping and the last session sync time.</li>
 * </ul>
 *
 * <p>Per-resource / per-transition STATE_TRANSITION pools are not shut down here; this is a
 * potential area to improve, see https://github.com/apache/helix/issues/1245.
 */
void reset() {
  LOG.info("Reset HelixTaskExecutor");
  if (_messageQueueMonitor != null) {
    _messageQueueMonitor.reset();
  }
  synchronized (_hdlrFtyRegistry) {
    for (String msgType : _hdlrFtyRegistry.keySet()) {
      // don't un-register factories, just shutdown all executors
      ExecutorService pool = _executorMap.remove(msgType);
      _monitor.removeExecutorMonitor(msgType);
      if (pool != null) {
        LOG.info("Reset executor for msgType: {}, pool: {}", msgType, pool);
        shutdownAndAwaitTermination(pool);
      }
      // unregisterMessageHandlerFactory does not lock the registry, so the item may already be
      // gone; guard against a NullPointerException.
      MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
      if (item != null && item.factory() != null) {
        try {
          item.factory().reset();
        } catch (Exception ex) {
          LOG.error("Failed to reset the factory {} of message type {}.", item.factory(),
              msgType, ex);
        }
      }
    }
  }
  // Log all tasks that fail to terminate
  StringBuilder sb = new StringBuilder();
  for (Map.Entry<String, MessageTaskInfo> entry : _taskMap.entrySet()) {
    sb.append("Task: ").append(entry.getKey()).append(" fails to terminate. Message: ")
        .append(entry.getValue()._task.getMessage()).append("\n");
  }
  if (sb.length() > 0) {
    LOG.info(sb.toString());
  }
  _taskMap.clear();
  _messageTaskMap.clear();
  _knownMessageIds.clear();
  _lastSessionSyncTime = null;
}
/**
 * Re-initializes this executor, e.g. on an INIT notification. It initializes the message queue
 * monitor (if any) and clears the shutting-down flag. It also creates a fixed thread pool, plus
 * its executor monitor, for every registered message type that currently has no pool, such as
 * after {@code reset()} shut them down.
 */
void init() {
  LOG.info("Init HelixTaskExecutor");
  if (_messageQueueMonitor != null) {
    _messageQueueMonitor.init();
  }
  _isShuttingDown = false;
  // Re-init all existing factories. Iterating entries gives each type a non-null item even if
  // the registry changes concurrently.
  for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry : _hdlrFtyRegistry.entrySet()) {
    final String msgType = entry.getKey();
    final MsgHandlerFactoryRegistryItem item = entry.getValue();
    ExecutorService pool = _executorMap.computeIfAbsent(msgType, type -> {
      ExecutorService newPool = Executors.newFixedThreadPool(item.threadPoolSize(),
          r -> new Thread(r, "HelixTaskExecutor-message_handle_" + type));
      _monitor.createExecutorMonitor(type, newPool);
      return newPool;
    });
    // SLF4J substitutes {} placeholders; %s would be logged literally.
    LOG.info("Setup the thread pool for type: {}, isShutdown: {}", msgType, pool.isShutdown());
  }
}
/**
 * Sends a PARTICIPANT_SESSION_CHANGE message to the controller. It does this at most once per
 * {@code SESSION_SYNC_INTERVAL} ms, and only when no {@code SESSION-SYNC} controller message is
 * already pending.
 *
 * @param manager manager of this participant
 */
private void syncSessionToController(HelixManager manager) {
  boolean syncedRecently = _lastSessionSyncTime != null
      && System.currentTimeMillis() - _lastSessionSyncTime <= SESSION_SYNC_INTERVAL;
  if (syncedRecently) {
    return;
  }
  HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
  PropertyKey syncKey = new Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC);
  if (dataAccessor.getProperty(syncKey) != null) {
    // A sync request is already pending on the controller side.
    return;
  }
  LOG.info(String
      .format("Participant %s syncs session with controller", manager.getInstanceName()));
  Message syncMessage = new Message(MessageType.PARTICIPANT_SESSION_CHANGE, SESSION_SYNC);
  syncMessage.setSrcName(manager.getInstanceName());
  syncMessage.setTgtSessionId("*");
  syncMessage.setMsgState(MessageState.NEW);
  syncMessage.setMsgId(SESSION_SYNC);
  Criteria recipients = new Criteria();
  recipients.setRecipientInstanceType(InstanceType.CONTROLLER);
  recipients.setSessionSpecific(false);
  manager.getMessagingService().send(recipients, syncMessage);
  _lastSessionSyncTime = System.currentTimeMillis();
}
/**
 * Reads from ZK the messages that are not yet known to this executor. For MESSAGE changes these
 * are the participant's messages; for MESSAGES_CONTROLLER changes they are the controller
 * messages.
 *
 * @param manager manager used to obtain the data accessor
 * @param instanceName instance whose messages are read (for MESSAGE changes)
 * @param changeType MESSAGE or MESSAGES_CONTROLLER; any other type yields an empty list
 * @return the newly read messages; messages deleted before they could be read are omitted
 */
private List<Message> readNewMessagesFromZK(HelixManager manager, String instanceName,
    HelixConstants.ChangeType changeType) {
  HelixDataAccessor accessor = manager.getHelixDataAccessor();
  Builder keyBuilder = accessor.keyBuilder();
  boolean participantScope = changeType.equals(HelixConstants.ChangeType.MESSAGE);
  boolean controllerScope =
      !participantScope && changeType.equals(HelixConstants.ChangeType.MESSAGES_CONTROLLER);
  if (!participantScope && !controllerScope) {
    LOG.warn("Unexpected ChangeType for Message Change CallbackHandler: " + changeType);
    return Collections.emptyList();
  }
  Set<String> unseenIds = new HashSet<>();
  unseenIds.addAll(participantScope
      ? accessor.getChildNames(keyBuilder.messages(instanceName))
      : accessor.getChildNames(keyBuilder.controllerMessages()));
  // Skip messages that are already known.
  unseenIds.removeAll(_knownMessageIds);
  List<PropertyKey> messageKeys = new ArrayList<>(unseenIds.size());
  for (String id : unseenIds) {
    messageKeys.add(participantScope
        ? keyBuilder.message(instanceName, id)
        : keyBuilder.controllerMessage(id));
  }
  /*
   * Do not throw on a partial read:
   * 1. The participant cannot resolve the error. Failing here risks ignoring the message change
   *    event, which could leave the participant stuck.
   * 2. A partial read is retried anyway. Processing messages touches the message folder, which
   *    triggers another message event.
   */
  List<Message> fetched = accessor.getProperty(messageKeys, false);
  // Messages removed between listing and reading come back as null; drop them.
  for (Iterator<Message> it = fetched.iterator(); it.hasNext(); ) {
    if (it.next() == null) {
      it.remove();
    }
  }
  return fetched;
}
@Override
@PreFetch(enabled = false)
public void onMessage(String instanceName, List<Message> messages,
NotificationContext changeContext) {
HelixManager manager = changeContext.getManager();
// If FINALIZE notification comes, reset all handler factories
// and terminate all the thread pools
// TODO: see if we should have a separate notification call for resetting
if (changeContext.getType() == Type.FINALIZE) {
reset();
return;
}
if (changeContext.getType() == Type.INIT) {
init();
// continue to process messages
}
// if prefetch is disabled in MessageListenerCallback, we need to read all new messages from zk.
if (messages == null || messages.isEmpty()) {
// If no messages are given, check and read all new messages.
messages = readNewMessagesFromZK(manager, instanceName, changeContext.getChangeType());
}
if (_isShuttingDown) {
StringBuilder sb = new StringBuilder();
for (Message message : messages) {
sb.append(message.getMsgId() + ",");
}
LOG.info(
"Helix task executor is shutting down, ignore unprocessed messages : " + sb.toString());
return;
}
// Update message count
if (_messageQueueMonitor != null) {
_messageQueueMonitor.setMessageQueueBacklog(messages.size());
}
if (messages.isEmpty()) {
LOG.info("No Messages to process");
return;
}
// sort message by creation timestamp, so message created earlier is processed first
Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);
HelixDataAccessor accessor = manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
// message handlers and corresponding contexts created
Map<String, MessageHandler> stateTransitionHandlers = new HashMap<>();
Map<String, NotificationContext> stateTransitionContexts = new HashMap<>();
List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>();
List<NotificationContext> nonStateTransitionContexts = new ArrayList<>();
// message to be updated in ZK
Map<String, Message> msgsToBeUpdated = new HashMap<>();
String sessionId = manager.getSessionId();
List<String> curResourceNames =
accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
List<String> taskCurResourceNames =
accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId));
List<PropertyKey> createCurStateKeys = new ArrayList<>();
List<CurrentState> metaCurStates = new ArrayList<>();
Set<String> createCurStateNames = new HashSet<>();
for (Message message : messages) {
if (checkAndProcessNoOpMessage(message, instanceName, changeContext, manager, sessionId,
stateTransitionHandlers)) {
// skip the following operations for the no-op messages.
continue;
}
NotificationContext msgWorkingContext = changeContext.clone();
MessageHandler msgHandler = null;
try {
// create message handlers, if handlers not found but no exception, leave its state as NEW
msgHandler = createMessageHandler(message, msgWorkingContext);
} catch (Exception ex) {
// Failed to create message handler and there is an Exception.
int remainingRetryCount = message.getRetryCount();
LOG.error(
"Exception happens when creating Message Handler for message {}. Current remaining retry count is {}.",
message.getMsgId(), remainingRetryCount);
// Reduce the message retry count to avoid infinite retrying.
message.setRetryCount(remainingRetryCount - 1);
message.setExecuteSessionId(sessionId);
// Note that we are re-using the retry count of Message that was original designed to control
// timeout retries. So it is not checked before the first try in order to ensure consistent
// behavior. It is possible that we introduce a new behavior for this method. But it requires
// us to split the configuration item so as to avoid confusion.
if (message.getRetryCount() <= 0) {
// If no more retry count remains, then mark the message to be UNPROCESSABLE.
String errorMsg = String.format("No available message Handler found!"
+ " Stop processing message %s since it has zero or negative remaining retry count %d!",
message.getMsgId(), message.getRetryCount());
updateUnprocessableMessage(message, null, errorMsg, manager);
}
msgsToBeUpdated.put(message.getId(), message);
// continue processing in the next section where handler object is double-checked.
}
if (msgHandler == null) {
// Skip processing this message in this callback. The same message process will be retried
// in the next round if retry count > 0.
LOG.warn("There is no existing handler for message {}."
+ " Skip processing it for now. Will retry on the next callback.", message.getMsgId());
continue;
}
if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
.equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
if (validateAndProcessStateTransitionMessage(message, manager, stateTransitionHandlers,
msgHandler)) {
// Need future process by triggering state transition
String msgTarget =
getMessageTarget(message.getResourceName(), message.getPartitionName());
stateTransitionHandlers.put(msgTarget, msgHandler);
stateTransitionContexts.put(msgTarget, msgWorkingContext);
} else {
// Skip the following operations for the invalid/expired state transition messages.
// Also remove the message since it might block the other state transition messages.
removeMessageFromZK(accessor, message, instanceName);
continue;
}
} else {
// Need future process non state transition messages by triggering the handler
nonStateTransitionHandlers.add(msgHandler);
nonStateTransitionContexts.add(msgWorkingContext);
}
// Update the normally processed messages
Message markedMsg = markReadMessage(message, msgWorkingContext, manager);
msgsToBeUpdated.put(markedMsg.getId(), markedMsg);
// batch creation of all current state meta data
// do it for non-controller and state transition messages only
if (!message.isControlerMsg() && message.getMsgType()
.equals(Message.MessageType.STATE_TRANSITION.name())) {
String resourceName = message.getResourceName();
if (!curResourceNames.contains(resourceName) && !taskCurResourceNames.contains(resourceName)
&& !createCurStateNames.contains(resourceName)) {
createCurStateNames.add(resourceName);
PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName);
if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean
.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName);
}
createCurStateKeys.add(curStateKey);
CurrentState metaCurState = new CurrentState(resourceName);
metaCurState.setBucketSize(message.getBucketSize());
metaCurState.setStateModelDefRef(message.getStateModelDef());
metaCurState.setSessionId(sessionId);
metaCurState.setBatchMessageMode(message.getBatchMessageMode());
String ftyName = message.getStateModelFactoryName();
if (ftyName != null) {
metaCurState.setStateModelFactoryName(ftyName);
} else {
metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
}
metaCurStates.add(metaCurState);
}
}
}
// batch create curState meta
if (createCurStateKeys.size() > 0) {
try {
accessor.createChildren(createCurStateKeys, metaCurStates);
} catch (Exception e) {
LOG.error("fail to create cur-state znodes for messages: " + msgsToBeUpdated, e);
}
}
// update message state in batch and schedule tasks for all read messages
updateMessageState(msgsToBeUpdated.values(), accessor, instanceName);
for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
MessageHandler handler = handlerEntry.getValue();
NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
if (!scheduleTaskForMessage(instanceName, accessor, handler, context) && !_isShuttingDown) {
/**
* TODO: Checking _isShuttingDown is a workaround to avoid unnecessary ERROR partition.
* TODO: We shall improve the shutdown process of the participant to clean up the workflow
* TODO: completely. In detail, there isa race condition between TaskExecutor thread
* TODO: pool shutting down and Message handler stops listening. In this gap, the message
* TODO: will still be processed but schedule will fail. If we mark partition into ERROR
* TODO: state, then the controller side logic might be confused.
*/
try {
// Record error state to the message handler.
handler.onError(new HelixException(String
.format("Failed to schedule the task for executing message handler for %s.",
handler._message.getMsgId())), MessageHandler.ErrorCode.ERROR,
MessageHandler.ErrorType.FRAMEWORK);
} catch (Exception ex) {
LOG.error("Failed to trigger onError method of the message handler for {}",
handler._message.getMsgId(), ex);
}
}
}
for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
MessageHandler handler = nonStateTransitionHandlers.get(i);
NotificationContext context = nonStateTransitionContexts.get(i);
scheduleTaskForMessage(instanceName, accessor, handler, context);
}
}
/**
 * Inspect the message. Report and remove it if no operation needs to be done.
 *
 * <p>The checks run in this order; the first one that matches ends the inspection with
 * {@code true}:
 * <ol>
 * <li>NO_OP messages are discarded and removed.</li>
 * <li>Messages whose target session id is neither the current session nor {@code "*"} are
 * discarded and removed. A message without a target session id (null) is treated as a mismatch.
 * A participant also proactively syncs its session to the controller when the message was created
 * after the current session started.</li>
 * <li>A PARTICIPANT_SESSION_CHANGE message received by a controller triggers a live-instance
 * change notification and is removed as completed.</li>
 * <li>Messages that are not in NEW state are skipped without being removed.</li>
 * <li>Expired messages are discarded and removed.</li>
 * <li>STATE_TRANSITION_CANCELLATION messages are done if
 * {@code cancelNotStartedStateTransition} reports success.</li>
 * <li>PARTICIPANT_STATUS_CHANGE messages update the participant status and are removed as
 * completed.</li>
 * </ol>
 * Otherwise the message is reported as received to the monitor and {@code false} is returned. If
 * any check throws, the message is dropped from the in-memory task map and removed from ZK.
 *
 * @param message the message to inspect
 * @param instanceName name of the instance whose message queue holds the message
 * @param changeContext notification context forwarded to the controller on participant session
 *     change
 * @param manager the Helix manager of this instance
 * @param sessionId the current session id of this instance
 * @param stateTransitionHandlers handlers accepted so far in this batch, keyed by message target
 * @return True if the message is no-op message and no other process step is required.
 */
private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
    NotificationContext changeContext, HelixManager manager, String sessionId,
    Map<String, MessageHandler> stateTransitionHandlers) {
  HelixDataAccessor accessor = manager.getHelixDataAccessor();
  try {
    // nop messages are simply removed. It is used to trigger onMessage() in
    // situations such as register a new message handler factory
    if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) {
      LOG.info("Dropping NO-OP message. mid: {}, from: {}", message.getId(),
          message.getMsgSrc());
      reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
      return true;
    }

    String tgtSessionId = message.getTgtSessionId();
    // sessionId mismatch normally means message comes from expired session, just remove it.
    // "*".equals(...) keeps the check null-safe: a message without a target session id is
    // handled as a mismatch instead of failing with a NullPointerException.
    if (!sessionId.equals(tgtSessionId) && !"*".equals(tgtSessionId)) {
      String warningMessage = "SessionId does NOT match. expected sessionId: " + sessionId
          + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId();
      LOG.warn(warningMessage);
      reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
      _statusUpdateUtil
          .logWarning(message, HelixStateMachineEngine.class, warningMessage, manager);

      // Proactively send a session sync message from participant to controller
      // upon session mismatch after a new session is established
      if (manager.getInstanceType() == InstanceType.PARTICIPANT
          || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
        if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
          syncSessionToController(manager);
        }
      }
      return true;
    }

    if ((manager.getInstanceType() == InstanceType.CONTROLLER
        || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
        && MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
      LOG.info("Controller received PARTICIPANT_SESSION_CHANGE msg from src: {}",
          message.getMsgSrc());
      PropertyKey key = new Builder(manager.getClusterName()).liveInstances();
      List<LiveInstance> liveInstances = manager.getHelixDataAccessor().getChildValues(key, true);
      _controller.onLiveInstanceChange(liveInstances, changeContext);
      reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.COMPLETED);
      return true;
    }

    // don't process message that is of READ or UNPROCESSABLE state
    if (MessageState.NEW != message.getMsgState()) {
      // It happens because we don't delete message right after
      // read. Instead we keep it until the current state is updated.
      // We will read the message again if there is a new message but we
      // check for the status and ignore if its already read
      if (LOG.isTraceEnabled()) {
        LOG.trace("Message already read. msgId: {}", message.getMsgId());
      }
      return true;
    }

    if (message.isExpired()) {
      LOG.info("Dropping expired message. mid: {}, from: {} relayed from: {}", message.getId(),
          message.getMsgSrc(), message.getRelaySrcHost());
      reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
      return true;
    }

    // State Transition Cancellation
    if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
      boolean success =
          cancelNotStartedStateTransition(message, stateTransitionHandlers, accessor,
              instanceName);
      if (success) {
        return true;
      }
    }

    if (MessageType.PARTICIPANT_STATUS_CHANGE.name().equals(message.getMsgType())) {
      LiveInstanceStatus toStatus = LiveInstanceStatus.valueOf(message.getToState());
      changeParticipantStatus(instanceName, toStatus, manager);
      reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.COMPLETED);
      return true;
    }

    _monitor.reportReceivedMessage(message);
  } catch (Exception e) {
    // The exception is the trailing argument and is not bound to a placeholder, so SLF4J logs
    // its full stack trace instead of leaving an unfilled "{}" in the message.
    LOG.error("Failed to process the message {}. Deleting the message from ZK.", message, e);
    removeMessageFromTaskAndFutureMap(message);
    removeMessageFromZK(accessor, message, instanceName);
    return true;
  }
  return false;
}
/**
 * Validate a state transition (or state transition cancellation) message before scheduling it.
 *
 * <p>The message is rejected and marked UNPROCESSABLE when any of these hold, checked in order:
 * <ul>
 * <li>it is a STATE_TRANSITION message and another transition for the same partition is still in
 * progress (the controller resends the message if it is still valid);</li>
 * <li>its handler is a {@link HelixStateTransitionHandler} whose stale-message validation
 * fails;</li>
 * <li>an earlier message in the same batch already targets the same partition.</li>
 * </ul>
 * Any exception raised while validating also rejects the message.
 *
 * @param message the message to validate
 * @param manager the Helix manager used for status-update logging
 * @param stateTransitionHandlers handlers accepted so far in this batch, keyed by message target
 * @param createHandler the handler created for this message
 * @return true if the transition is valid and must be scheduled; false if no further work is
 *     required for this message
 */
private boolean validateAndProcessStateTransitionMessage(Message message, HelixManager manager,
    Map<String, MessageHandler> stateTransitionHandlers, MessageHandler createHandler) {
  final String target = getMessageTarget(message.getResourceName(), message.getPartitionName());
  try {
    boolean isPlainTransition = message.getMsgType().equals(MessageType.STATE_TRANSITION.name());
    if (isPlainTransition && isStateTransitionInProgress(target)) {
      Message runningMessage = _taskMap.get(_messageTaskMap.get(target)).getTask().getMessage();
      String inProgressReason = String.format(
          "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
          message.getResourceName(), message.getPartitionName(), runningMessage.getMsgId(),
          runningMessage.isRelayMessage(), runningMessage.getReadTimeStamp(),
          System.currentTimeMillis(), message.getFromState(), message.getToState());
      updateUnprocessableMessage(message, null /* exception */, inProgressReason, manager);
      return false;
    }

    if (createHandler instanceof HelixStateTransitionHandler) {
      // The to-state is only checked once no ST task for this partition is scheduled/executing.
      HelixStateTransitionHandler transitionHandler = (HelixStateTransitionHandler) createHandler;
      HelixStateTransitionHandler.StaleMessageValidateResult validation =
          transitionHandler.staleMessageValidator();
      if (!validation.isValid) {
        updateUnprocessableMessage(message, null /* exception */,
            validation.exception.getMessage(), manager);
        return false;
      }
    }

    if (!stateTransitionHandlers.containsKey(target)) {
      return true;
    }
    // Two messages in the same batch target the same partition: keep the first, drop this one.
    Message acceptedMessage = stateTransitionHandlers.get(target)._message;
    String duplicateReason = String.format(
        "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
        message.getMsgId(), acceptedMessage.getFromState(), acceptedMessage.getToState(),
        message.getFromState(), message.getToState());
    updateUnprocessableMessage(message, null /* exception */, duplicateReason, manager);
    return false;
  } catch (Exception validationError) {
    updateUnprocessableMessage(message, validationError,
        "State transition validation failed with Exception.", manager);
    return false;
  }
}
/**
 * Wrap the handler in a {@code HelixTask} and hand it to the scheduler.
 *
 * <p>If scheduling fails, the message is dropped from the in-memory task map and removed from ZK.
 *
 * @param instanceName name of the instance whose message queue holds the message
 * @param accessor data accessor used to remove the message on failure
 * @param handler the handler to execute; its message is the one being scheduled
 * @param context notification context passed to the task
 * @return True if schedule the task successfully. False otherwise.
 */
private boolean scheduleTaskForMessage(String instanceName, HelixDataAccessor accessor,
    MessageHandler handler, NotificationContext context) {
  Message pendingMessage = handler._message;
  HelixTask task = new HelixTask(pendingMessage, context, handler, this);
  boolean scheduled = scheduleTask(task);
  if (!scheduled) {
    // Remove message if schedule tasks are failed.
    removeMessageFromTaskAndFutureMap(pendingMessage);
    removeMessageFromZK(accessor, pendingMessage, instanceName);
  }
  return scheduled;
}
/**
 * Check, while holding {@code _lock}, whether a task for the given message target exists and has
 * not finished yet. This assumes the target corresponds to a state transition task.
 *
 * @param messageTarget message target generated by getMessageTarget()
 * @return true if a task with the same message target is still running; false otherwise
 */
private boolean isStateTransitionInProgress(String messageTarget) {
  synchronized (_lock) {
    if (!_messageTaskMap.containsKey(messageTarget)) {
      return false;
    }
    String runningTaskId = _messageTaskMap.get(messageTarget);
    boolean finished = _taskMap.get(runningTaskId).getFuture().isDone();
    return !finished;
  }
}
/**
 * Try to cancel a state transition that has not been started yet.
 *
 * <p>There are three types of cancellation:
 * <ol>
 * <li>the cancellation message arrives in the same batch as the state transition message;</li>
 * <li>the state transition message was handled but its task has not started;</li>
 * <li>the state transition message was handled and its task has already started.</li>
 * </ol>
 * This method handles the first two cases.
 *
 * <p>The original state transition message is removed from ZK and reported as discarded only if
 * it was actually cancelled, i.e. its from/to states match the cancellation message. If they do
 * not match, the existing transition is left alone so it can complete normally. In both cases the
 * cancellation message itself is reported and removed: COMPLETED if it cancelled something,
 * DISCARDED otherwise.
 *
 * @param message the STATE_TRANSITION_CANCELLATION message
 * @param stateTransitionHandlers handlers accepted so far in this batch, keyed by message target
 * @param accessor data accessor used to remove messages from ZK
 * @param instanceName name of the instance whose message queue holds the messages
 * @return true if no further cancellation is needed; false if the state transition could not be
 *     cancelled here (no known transition for the target, or its task has already started)
 */
private boolean cancelNotStartedStateTransition(Message message,
    Map<String, MessageHandler> stateTransitionHandlers, HelixDataAccessor accessor,
    String instanceName) {
  String targetMessageName =
      getMessageTarget(message.getResourceName(), message.getPartitionName());
  Message targetStateTransitionMessage;
  boolean cancelled;

  if (stateTransitionHandlers.containsKey(targetMessageName)) {
    // State transition message and cancel message are in same batch
    targetStateTransitionMessage = stateTransitionHandlers.get(targetMessageName).getMessage();
    cancelled = isCancelingSameStateTransition(targetStateTransitionMessage, message);
    if (cancelled) {
      stateTransitionHandlers.remove(targetMessageName);
    }
  } else if (_messageTaskMap.containsKey(targetMessageName)) {
    // Cancel the task future without interrupting the state transition that is already
    // started. If the state transition is already started, we should call cancel in the
    // state model.
    String taskId = _messageTaskMap.get(targetMessageName);
    HelixTask task = (HelixTask) _taskMap.get(taskId).getTask();
    Future<HelixTaskResult> future = _taskMap.get(taskId).getFuture();
    targetStateTransitionMessage = task.getMessage();
    cancelled = isCancelingSameStateTransition(targetStateTransitionMessage, message);
    if (cancelled) {
      if (!task.cancel()) {
        // the state transition is already started, need further cancellation.
        return false;
      }
      future.cancel(false);
      _messageTaskMap.remove(targetMessageName);
      _taskMap.remove(taskId);
    }
  } else {
    return false;
  }

  if (cancelled) {
    // Remove the original state-transition message only when it has really been cancelled;
    // otherwise it is still scheduled/running and will be cleaned up when it completes.
    removeMessageFromZK(accessor, targetStateTransitionMessage, instanceName);
    _monitor.reportProcessedMessage(targetStateTransitionMessage,
        ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
  }

  // remove the state transition cancellation message
  reportAndRemoveMessage(message, accessor, instanceName,
      cancelled ? ProcessedMessageState.COMPLETED : ProcessedMessageState.DISCARDED);
  return true;
}
/**
 * Report the message to the monitor as received and then as processed with the given state, and
 * remove it from ZK.
 *
 * @param message the message to report and remove
 * @param accessor data accessor used for the removal
 * @param instanceName name of the instance whose message queue holds the message
 * @param messageProcessState the processed state reported to the monitor
 */
private void reportAndRemoveMessage(Message message, HelixDataAccessor accessor,
    String instanceName, ProcessedMessageState messageProcessState) {
  // Count the message as received first so that received/processed stay balanced.
  _monitor.reportReceivedMessage(message);
  _monitor.reportProcessedMessage(message, messageProcessState);
  removeMessageFromZK(accessor, message, instanceName);
}
/**
 * Mark the message as READ.
 *
 * <p>The read timestamp is set to the current time, and the execute session id is set to the
 * session of the context's manager. A "New Message" status update is also logged.
 *
 * @param message the message to mark; it is modified in place
 * @param context notification context whose manager supplies the execute session id
 * @param manager the Helix manager used for status-update logging
 * @return the same message instance, after modification
 */
private Message markReadMessage(Message message, NotificationContext context,
    HelixManager manager) {
  message.setMsgState(MessageState.READ);
  message.setReadTimeStamp(System.currentTimeMillis());
  String executingSessionId = context.getManager().getSessionId();
  message.setExecuteSessionId(executingSessionId);
  _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", manager);
  return message;
}
/**
 * Mark the message as UNPROCESSABLE, log the reason, and report it to the monitor as FAILED.
 *
 * <p>The caller-supplied reason is included in the logged error whether or not an exception is
 * given. It is separated from the dumped message record.
 *
 * @param message the message that cannot be processed; its state is set to UNPROCESSABLE
 * @param exception the cause of the failure, or null if the failure is not exception-driven
 * @param errorMsg human-readable reason; appended to the logged error if non-empty
 * @param manager the Helix manager used for status-update logging
 */
private void updateUnprocessableMessage(Message message, Exception exception, String errorMsg,
    HelixManager manager) {
  String error = "Message " + message.getMsgId() + " cannot be processed: " + message.getRecord();
  if (errorMsg != null && !errorMsg.isEmpty()) {
    // Keep the reason even when an exception is given, and separate it from the record dump.
    error = error + ". Reason: " + errorMsg;
  }
  if (exception != null) {
    LOG.error(error, exception);
    _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, exception, error, manager);
  } else {
    LOG.error(error);
    _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, errorMsg, manager);
  }
  message.setMsgState(MessageState.UNPROCESSABLE);
  _monitor.reportProcessedMessage(message, ProcessedMessageState.FAILED);
}
/**
 * Create a handler for the message using the factory registered for its message type.
 *
 * <p>Before the handler is created, this executor is added to the context under
 * {@code MapKey.TASK_EXECUTOR}, because batch message handlers need it to schedule sub-messages.
 *
 * @param message the message to handle
 * @param changeContext notification context passed to the factory
 * @return the created handler, or null if no factory is registered for the message type (the
 *     message is kept so it can be handled once a factory is registered)
 */
public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {
  String msgType = message.getMsgType();
  MsgHandlerFactoryRegistryItem registryItem = _hdlrFtyRegistry.get(msgType);
  if (registryItem == null) {
    LOG.warn("Fail to find message handler factory for type: {} msgId: {}", msgType,
        message.getMsgId());
    return null;
  }
  MessageHandlerFactory factory = registryItem.factory();
  changeContext.add(MapKey.TASK_EXECUTOR.toString(), this);
  return factory.createHandler(message, changeContext);
}
/**
 * Forget the message id in {@code _knownMessageIds} and drop its target's entry from
 * {@code _messageTaskMap}, if present.
 *
 * @param message the message whose bookkeeping is cleared
 */
private void removeMessageFromTaskAndFutureMap(Message message) {
  _knownMessageIds.remove(message.getId());
  // remove() is a no-op when the key is absent, so no containsKey() pre-check is needed.
  _messageTaskMap.remove(getMessageTarget(message.getResourceName(), message.getPartitionName()));
}
/**
 * Check whether the cancellation message targets the same from/to state pair as the state
 * transition message. States are compared case-insensitively.
 *
 * @param stateTransitionMessage the state transition message that may be cancelled
 * @param cancellationMessage the cancellation message
 * @return true if both the from-state and the to-state match
 */
private boolean isCancelingSameStateTransition(Message stateTransitionMessage,
    Message cancellationMessage) {
  boolean sameFromState = stateTransitionMessage.getFromState()
      .equalsIgnoreCase(cancellationMessage.getFromState());
  return sameFromState && stateTransitionMessage.getToState()
      .equalsIgnoreCase(cancellationMessage.getToState());
}
/**
 * Build the key that identifies a partition of a resource, in the form
 * {@code <resourceName>_<partitionName>}.
 */
String getMessageTarget(String resourceName, String partitionName) {
  return resourceName + "_" + partitionName;
}
/**
 * Move this participant into or out of freeze mode.
 *
 * <p>For {@code PAUSED}, the current session is recorded as the freeze session, the new status is
 * remembered, and the STATUS field is written to the live instance znode. For {@code NORMAL}, the
 * factory state is re-synced and previous current states are carried over if the session changed
 * since the freeze. The freeze state is then cleared and the STATUS field removed from the live
 * instance znode. Other statuses are logged as unsupported.
 *
 * @param instanceName the participant instance name
 * @param toStatus the requested status; null is ignored with a warning
 * @param manager the Helix manager used to access the live instance znode
 */
private void changeParticipantStatus(String instanceName,
    LiveInstance.LiveInstanceStatus toStatus, HelixManager manager) {
  if (toStatus == null) {
    LOG.warn("To status is null! Skip participant status change.");
    return;
  }
  LOG.info("Changing participant {} status to {} from {}", instanceName, toStatus,
      _liveInstanceStatus);

  HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
  String currentSessionId = manager.getSessionId();
  String liveInstancePath = dataAccessor.keyBuilder().liveInstance(instanceName).getPath();
  boolean updated = false;

  if (toStatus == LiveInstance.LiveInstanceStatus.PAUSED) {
    _freezeSessionId = currentSessionId;
    _liveInstanceStatus = toStatus;
    // Entering freeze mode: write the status into the live instance.
    // If the update fails, the controller will send another freeze message.
    updated = dataAccessor.getBaseDataAccessor().update(liveInstancePath, record -> {
      record.setEnumField(LiveInstance.LiveInstanceProperty.STATUS.name(), toStatus);
      return record;
    }, AccessOption.EPHEMERAL);
  } else if (toStatus == LiveInstance.LiveInstanceStatus.NORMAL) {
    // Exiting freeze mode. If the session changed meanwhile, the state model must be synced.
    boolean sessionChangedSinceFreeze =
        _freezeSessionId != null && !_freezeSessionId.equals(currentSessionId);
    if (sessionChangedSinceFreeze) {
      syncFactoryState();
      ParticipantManager.carryOverPreviousCurrentState(dataAccessor, instanceName,
          currentSessionId, manager.getStateMachineEngine(), false);
    }
    _freezeSessionId = null;
    _liveInstanceStatus = null;
    updated = dataAccessor.getBaseDataAccessor().update(liveInstancePath, record -> {
      // Remove the status field for backwards compatibility
      record.getSimpleFields().remove(LiveInstance.LiveInstanceProperty.STATUS.name());
      return record;
    }, AccessOption.EPHEMERAL);
  } else {
    LOG.warn("To status {} is not supported", toStatus);
  }

  LOG.info("Changed participant {} status to {}. FreezeSessionId={}, update success={}",
      instanceName, _liveInstanceStatus, _freezeSessionId, updated);
}
/**
 * Build a state transition type key of the form {@code <prefix>.<fromState>.<toState>}.
 *
 * @return the key, or null if any of the three parts is null
 */
private String getStateTransitionType(String prefix, String fromState, String toState) {
  boolean incomplete = prefix == null || fromState == null || toState == null;
  return incomplete ? null : String.join(".", prefix, fromState, toState);
}
/**
 * Build the name of the per-resource state transition thread pool, in the form
 * {@code STATE_TRANSITION.<resourceName>}.
 */
private String getPerResourceStateTransitionPoolName(String resourceName) {
  return String.format("%s.%s", MessageType.STATE_TRANSITION.name(), resourceName);
}
/**
 * Return the participant status most recently set by a PARTICIPANT_STATUS_CHANGE message.
 *
 * @return {@code PAUSED} while frozen; null when not frozen (the field is cleared on exiting
 *     freeze mode)
 */
public LiveInstanceStatus getLiveInstanceStatus() {
  return this._liveInstanceStatus;
}
/**
 * Remove the message from the instance's message queue through {@code HelixUtil}, logging
 * whether the removal succeeded.
 *
 * @param accessor data accessor used for the removal
 * @param message the message to remove
 * @param instanceName name of the instance whose message queue holds the message
 */
private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
    String instanceName) {
  boolean removed = HelixUtil.removeMessageFromZK(accessor, message, instanceName);
  if (!removed) {
    LOG.warn("Failed to remove message {} from ZK.", message.getMsgId());
    return;
  }
  LOG.info("Successfully removed message {} from ZK.", message.getMsgId());
}
/**
 * Write a NO_OP message from {@code instanceName} to itself into its message queue.
 *
 * <p>Failures, whether reported by the accessor or thrown, are logged and not propagated.
 *
 * @param accessor data accessor used to write the message
 * @param instanceName the instance used as both source and target of the message
 */
private void sendNopMessage(HelixDataAccessor accessor, String instanceName) {
  try {
    Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
    nopMsg.setSrcName(instanceName);
    nopMsg.setTgtName(instanceName);
    boolean written = accessor
        .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
    if (written) {
      LOG.info("Send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId());
    } else {
      // setProperty reports failure via its return value; don't claim the message was sent.
      LOG.warn("Failed to send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(),
          nopMsg.getId());
    }
  } catch (Exception e) {
    LOG.error("Failed to send NO_OP message to {}.", instanceName, e);
  }
}
/**
 * Shut down this task executor.
 *
 * <p>The shutting-down flag is set first. While it is set, a failure to schedule a state
 * transition task is not recorded as an error on the message handler. After that, the timer is
 * cancelled, reset() is called, and the monitor is shut down. The statement order is intentional.
 */
@Override
public void shutdown() {
LOG.info("Shutting down HelixTaskExecutor");
// Must be set before teardown so concurrent scheduling failures are treated as shutdown noise.
_isShuttingDown = true;
_timer.cancel();
reset();
_monitor.shutDown();
LOG.info("Shutdown HelixTaskExecutor finished");
}
}