package org.apache.helix.messaging.handling;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.Criteria;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.MapKey;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.manager.zk.ParticipantManager;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.LiveInstance.LiveInstanceStatus;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor.ProcessedMessageState;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class HelixTaskExecutor implements MessageListener, TaskExecutor {
  /**
   * Immutable holder that pairs a message handler factory with the thread pool size
   * requested for it at registration time.
   */
  class MsgHandlerFactoryRegistryItem {
    private final int _threadPoolSize;
    private final MessageHandlerFactory _factory;

    /**
     * @param factory the handler factory being registered; must not be null
     * @param threadPoolSize the requested pool size; must be strictly positive
     * @throws NullPointerException if {@code factory} is null
     * @throws IllegalArgumentException if {@code threadPoolSize} is zero or negative
     */
    public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize) {
      // The null check runs first so a null factory always surfaces as a NullPointerException.
      if (null == factory) {
        throw new NullPointerException("Message handler factory is null");
      }
      if (threadPoolSize < 1) {
        throw new IllegalArgumentException("Illegal thread pool size: " + threadPoolSize);
      }

      this._threadPoolSize = threadPoolSize;
      this._factory = factory;
    }

    /** @return the registered factory; never null */
    MessageHandlerFactory factory() {
      return this._factory;
    }

    /** @return the thread pool size supplied at registration */
    int threadPoolSize() {
      return this._threadPoolSize;
    }
  }

  private static Logger LOG = LoggerFactory.getLogger(HelixTaskExecutor.class);

  // Counter used to give each message-handling thread created at registration a unique name.
  private static AtomicLong thread_uid = new AtomicLong(0);
  // Default thread pool size used when a factory is registered without an explicit size.
  // TODO: we need to further design how to throttle this.
  // From storage point of view, only bootstrap case is expensive
  // and we need to throttle, which is mostly IO / network bounded.
  public static final int DEFAULT_PARALLEL_TASKS = 40;
  // Map of taskId -> bookkeeping info (task, future, optional timeout timer task).
  // TODO: create per-task type threadpool with customizable pool size
  protected final Map<String, MessageTaskInfo> _taskMap;
  // Guards the submit-and-record sequence in scheduleTask and the task bookkeeping in the
  // cancel/finish methods.
  private final Object _lock;
  private final StatusUpdateUtil _statusUpdateUtil;
  private final ParticipantStatusMonitor _monitor;
  // Resource-config key read to size a dedicated per-resource state transition thread pool.
  public static final String MAX_THREADS = "maxThreads";

  private MessageQueueMonitor _messageQueueMonitor;
  private GenericHelixController _controller;
  // Wall-clock time (ms) of the last session-sync message sent; null until the first sync and
  // again after reset().
  private Long _lastSessionSyncTime;
  // NOTE(review): _freezeSessionId and _liveInstanceStatus are not used in the portion of the
  // class visible here; see their usages further down before relying on them.
  private String _freezeSessionId;
  private LiveInstanceStatus _liveInstanceStatus;
  // Minimum interval between two session-sync messages.
  private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
  // Used both as the message id and as the controller message key of the session-sync message.
  private static final String SESSION_SYNC = "SESSION-SYNC";
  /**
   * Map of MsgType->MsgHandlerFactoryRegistryItem
   */
  final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry;

  // Executors keyed by message type; for state transitions it additionally holds dedicated
  // pools keyed by per-resource pool name and by per-transition-type key.
  final ConcurrentHashMap<String, ExecutorService> _executorMap;

  // Pool used for state transition messages that are in batch message mode.
  final ExecutorService _batchMessageExecutorService;

  // Map of message target (derived from resource name and partition name) -> taskId.
  final ConcurrentHashMap<String, String> _messageTaskMap;

  // Ids of messages already written back to ZK in a non-NEW state; these are skipped when
  // new messages are read from ZK.
  final Set<String> _knownMessageIds;

  /* Resources whose configuration for a dedicated thread pool has been checked. */
  final Set<String> _resourcesThreadpoolChecked;
  /* Per-transition-type keys for which a client-supplied executor has already been looked up. */
  final Set<String> _transitionTypeThreadpoolChecked;

  // Timer used to schedule message execution timeout tasks.
  final Timer _timer;

  // While true, onMessage ignores incoming messages; cleared on init() and on factory
  // registration.
  private boolean _isShuttingDown;

  /**
   * Creates an executor with a freshly constructed {@link ParticipantStatusMonitor} and no
   * message queue monitor.
   */
  public HelixTaskExecutor() {
    this(new ParticipantStatusMonitor(false, null), null);
  }

  /**
   * Creates an executor that reports to the given participant monitor and has no message
   * queue monitor.
   * @param participantStatusMonitor monitor that executor pools are registered with
   */
  public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor) {
    this(participantStatusMonitor, null);
  }

  /**
   * Creates an executor with empty registries, a cached thread pool for batch messages
   * (registered with the participant monitor as "BatchMessageExecutor") and a daemon timer
   * for message timeouts.
   * @param participantStatusMonitor monitor that executor pools are registered with
   * @param messageQueueMonitor optional queue monitor; may be null
   */
  public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor,
      MessageQueueMonitor messageQueueMonitor) {
    // Monitors supplied by the caller.
    _messageQueueMonitor = messageQueueMonitor;
    _monitor = participantStatusMonitor;

    // Locking and state flags.
    _lock = new Object();
    _isShuttingDown = false;

    // Task and message bookkeeping.
    _taskMap = new ConcurrentHashMap<>();
    _messageTaskMap = new ConcurrentHashMap<>();
    _knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap<>());

    // Handler factories and the executors that serve them.
    _hdlrFtyRegistry = new ConcurrentHashMap<>();
    _executorMap = new ConcurrentHashMap<>();
    _resourcesThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());
    _transitionTypeThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap<>());

    _batchMessageExecutorService = Executors.newCachedThreadPool();
    _monitor.createExecutorMonitor("BatchMessageExecutor", _batchMessageExecutorService);

    _statusUpdateUtil = new StatusUpdateUtil();

    // Daemon timer thread that fires message execution timeouts.
    _timer = new Timer("HelixTaskExecutor_Timer", true);

    startMonitorThread();
  }

  /**
   * Registers a handler factory for the given message type using
   * {@link #DEFAULT_PARALLEL_TASKS} as the thread pool size.
   * @param type the message type the factory handles
   * @param factory the handler factory to register
   */
  @Override
  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) {
    registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
  }

  /**
   * Registers a handler factory for a message type and creates its fixed-size thread pool.
   * If a factory is already registered for the type, the existing registration is kept and
   * the new one is skipped.
   * @param type the message type the factory handles
   * @param factory the handler factory to register
   * @param threadpoolSize the size of the pool created for this message type
   * @throws HelixException if the factory does not declare support for {@code type}
   */
  @Override
  public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
      int threadpoolSize) {
    // The factory must declare that it handles the requested message type.
    if (factory instanceof MultiTypeMessageHandlerFactory) {
      MultiTypeMessageHandlerFactory multiTypeFactory = (MultiTypeMessageHandlerFactory) factory;
      if (!multiTypeFactory.getMessageTypes().contains(type)) {
        throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
            + multiTypeFactory.getMessageTypes());
      }
    } else if (!factory.getMessageType().equals(type)) {
      throw new HelixException(
          "Message factory type mismatch. Type: " + type + ", factory: " + factory
              .getMessageType());
    }

    _isShuttingDown = false;

    MsgHandlerFactoryRegistryItem existingItem = _hdlrFtyRegistry
        .putIfAbsent(type, new MsgHandlerFactoryRegistryItem(factory, threadpoolSize));
    if (existingItem != null) {
      // First registration wins; leave the existing factory and its pool untouched.
      LOG.info("Skip register message handler factory for type: " + type + ", poolSize: "
          + threadpoolSize + ", factory: " + factory + ", already existing factory: "
          + existingItem.factory());
      return;
    }

    _executorMap.computeIfAbsent(type, msgType -> {
      ExecutorService createdPool = Executors.newFixedThreadPool(threadpoolSize,
          runnable -> new Thread(runnable,
              "HelixTaskExecutor-message_handle_thread_" + thread_uid.getAndIncrement()));
      _monitor.createExecutorMonitor(type, createdPool);
      return createdPool;
    });
    LOG.info(
        "Registered message handler factory for type: " + type + ", poolSize: " + threadpoolSize
            + ", factory: " + factory + ", pool: " + _executorMap.get(type));
  }

  /**
   * Stores the controller reference on this executor.
   * @param controller the controller to associate with this executor
   */
  public void setController(GenericHelixController controller) {
    this._controller = controller;
  }

  /**
   * @return the participant status monitor supplied at construction
   */
  public ParticipantStatusMonitor getParticipantMonitor() {
    return this._monitor;
  }

  /**
   * Placeholder hook invoked at the end of construction. The body is currently empty, so no
   * monitoring thread is actually started.
   */
  private void startMonitorThread() {
    // Intended to start a thread that monitors task completion; not implemented.
  }

  /**
   * Registers a dedicated thread pool for a state transition message if one is configured
   * or supplied by the client. Messages of any other type are ignored.
   *
   * Two lookups are performed, each at most once per key:
   * <ol>
   *   <li>Per transition type (resource + from-state + to-state): the state model factory is
   *   asked for an executor; if it provides one, it is stored and the method returns.</li>
   *   <li>Per resource: the {@link #MAX_THREADS} resource config is read; a positive value
   *   creates a fixed pool of that size, otherwise the state model factory is asked for a
   *   per-resource executor.</li>
   * </ol>
   * @param message the message about to be scheduled
   * @param manager used to reach the state machine engine and the config accessor
   */
  private void updateStateTransitionMessageThreadPool(Message message, HelixManager manager) {
    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
      return;
    }

    String resourceName = message.getResourceName();
    String factoryName = message.getStateModelFactoryName();
    String stateModelName = message.getStateModelDef();

    // Fall back to the default factory name when the message does not name one.
    if (factoryName == null) {
      factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
    }
    StateModelFactory<? extends StateModel> stateModelFactory =
        manager.getStateMachineEngine().getStateModelFactory(stateModelName, factoryName);

    String perStateTransitionTypeKey =
        getStateTransitionType(getPerResourceStateTransitionPoolName(resourceName),
            message.getFromState(), message.getToState());
    if (perStateTransitionTypeKey != null && stateModelFactory != null
        && !_transitionTypeThreadpoolChecked.contains(perStateTransitionTypeKey)) {
      ExecutorService perStateTransitionTypeExecutor = stateModelFactory
          .getExecutorService(resourceName, message.getFromState(), message.getToState());
      // Mark the key as checked even when no executor is supplied, so the factory is asked
      // only once per transition type.
      _transitionTypeThreadpoolChecked.add(perStateTransitionTypeKey);

      if (perStateTransitionTypeExecutor != null) {
        _executorMap.put(perStateTransitionTypeKey, perStateTransitionTypeExecutor);
        LOG.info(String
            .format("Added client specified dedicate threadpool for resource %s from %s to %s",
                getPerResourceStateTransitionPoolName(resourceName), message.getFromState(),
                message.getToState()));
        // A per-transition pool was registered; the per-resource lookup is skipped for this
        // call.
        return;
      }
    }

    if (!_resourcesThreadpoolChecked.contains(resourceName)) {
      // -1 means "no pool size configured".
      int threadpoolSize = -1;
      ConfigAccessor configAccessor = manager.getConfigAccessor();
      // Changes to this configuration on thread pool size will only take effect after the participant get restarted.
      if (configAccessor != null) {
        HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
            .forCluster(manager.getClusterName()).forResource(resourceName).build();

        String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
        try {
          if (threadpoolSizeStr != null) {
            threadpoolSize = Integer.parseInt(threadpoolSizeStr);
          }
        } catch (Exception e) {
          // An unparsable value is logged and treated as "not configured".
          LOG.error(
              "Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, e);
        }
      }
      final String key = getPerResourceStateTransitionPoolName(resourceName);
      if (threadpoolSize > 0) {
        _executorMap.put(key, Executors.newFixedThreadPool(threadpoolSize,
            r -> new Thread(r, "GerenricHelixController-message_handle_" + key)));
        LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: "
            + threadpoolSize);
      } else {
        // No pool size is configured for the resource: check whether the client supplies
        // a customized thread pool through the state model factory.
        if (stateModelFactory != null) {
          ExecutorService executor = stateModelFactory.getExecutorService(resourceName);
          if (executor != null) {
            _executorMap.put(key, executor);
            LOG.info("Added client specified dedicate threadpool for resource: " + key);
          }
        } else {
          LOG.error(String.format(
              "Fail to get dedicate threadpool defined in stateModelFactory %s: using factoryName: %s for resource %s. No stateModelFactory was found!",
              stateModelName, factoryName, resourceName));
        }
      }
      // Record the resource as checked regardless of the outcome, so this lookup runs once.
      _resourcesThreadpoolChecked.add(resourceName);
    }
  }

  /**
   * Finds the executor service that should run the given message.
   *
   * The default is the pool registered for the message type. For state transition messages
   * the selection is, in order of precedence: the batch message executor when the message is
   * in batch mode; a dedicated per-transition-type pool; a dedicated per-resource pool; the
   * per-message-type pool.
   * @param message the message to find an executor for
   * @return the selected executor, or null if no pool is registered for the message type
   */
  ExecutorService findExecutorServiceForMsg(Message message) {
    ExecutorService executorService = _executorMap.get(message.getMsgType());
    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.name())) {
      return executorService;
    }
    if (message.getBatchMessageMode()) {
      return _batchMessageExecutorService;
    }

    String resourceName = message.getResourceName();
    if (resourceName == null) {
      return executorService;
    }

    String key = getPerResourceStateTransitionPoolName(resourceName);
    String perStateTransitionTypeKey =
        getStateTransitionType(key, message.getFromState(), message.getToState());

    // Look each pool up exactly once: a containsKey() followed by get() on a concurrent map
    // is a check-then-act sequence that can observe different states.
    ExecutorService perStateTransitionTypePool =
        perStateTransitionTypeKey == null ? null : _executorMap.get(perStateTransitionTypeKey);
    if (perStateTransitionTypePool != null) {
      LOG.info(String
          .format("Find per state transition type thread pool for resource %s from %s to %s",
              message.getResourceName(), message.getFromState(), message.getToState()));
      return perStateTransitionTypePool;
    }

    ExecutorService perResourcePool = _executorMap.get(key);
    if (perResourcePool != null) {
      LOG.info("Find per-resource thread pool with key: " + key);
      return perResourcePool;
    }
    return executorService;
  }

  /**
   * Runs all given tasks on their common executor service and blocks until they complete or
   * the timeout expires. All tasks must resolve to the same executor service.
   *
   * ExecutorService implementations in the JDK are thread-safe, so no extra locking is done.
   * @param tasks the tasks to run
   * @param timeout the maximum time to wait
   * @param unit the unit of {@code timeout}
   * @return the futures of the tasks, or null if {@code tasks} is null or empty, if no
   *         executor service is available for the tasks, or if the tasks do not share one
   *         executor service
   * @throws InterruptedException if interrupted while waiting for the tasks
   */
  @Override
  public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> tasks, long timeout,
      TimeUnit unit) throws InterruptedException {
    if (tasks == null || tasks.isEmpty()) {
      return null;
    }

    ExecutorService exeSvc = findExecutorServiceForMsg(tasks.get(0).getMessage());
    if (exeSvc == null) {
      // Without this guard every task could resolve to null, pass the "same executor" check
      // below and then fail with a NullPointerException on invokeAll().
      LOG.error("Fail to invoke all tasks because no executor-service is found for message type: "
          + tasks.get(0).getMessage().getMsgType());
      return null;
    }

    // check all tasks use the same executor-service
    for (int i = 1; i < tasks.size(); i++) {
      ExecutorService curExeSvc = findExecutorServiceForMsg(tasks.get(i).getMessage());
      if (curExeSvc != exeSvc) {
        LOG.error("Fail to invoke all tasks because they are not using the same executor-service");
        return null;
      }
    }

    // TODO: check if any of the task has already been scheduled

    // this is a blocking call
    return exeSvc.invokeAll(tasks, timeout, unit);
  }

  /**
   * Cancels the timeout timer of a tracked task and drops the message-level bookkeeping for
   * its message. The entry in the task map itself is left in place.
   * @param task the task whose timeout should be cancelled
   * @return true if the task was tracked, false otherwise
   */
  @Override
  public boolean cancelTimeoutTask(MessageTask task) {
    synchronized (_lock) {
      MessageTaskInfo trackedInfo = _taskMap.get(task.getTaskId());
      if (trackedInfo == null) {
        return false;
      }

      removeMessageFromTaskAndFutureMap(task.getMessage());
      if (trackedInfo._timerTask != null) {
        trackedInfo._timerTask.cancel();
      }
      return true;
    }
  }

  /**
   * Submits a message task to the executor service chosen for its message and records it in
   * the task map, optionally arming a timeout timer.
   * @param task the task to schedule
   * @return true if the task was submitted; false if a task with the same id is already
   *         tracked, if no executor service exists for the message, or if an exception was
   *         raised while scheduling (the exception is logged, not rethrown)
   */
  @Override
  public boolean scheduleTask(MessageTask task) {
    String taskId = task.getTaskId();
    Message message = task.getMessage();
    NotificationContext notificationContext = task.getNotificationContext();
    HelixManager manager = notificationContext.getManager();

    try {
      // Check whether a dedicated thread pool for handling state transition messages is
      // configured or provided, and register it before the executor lookup below.
      updateStateTransitionMessageThreadPool(message, manager);

      LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(),
          message.getPartitionName(), message.getFromState(), message.getToState());

      _statusUpdateUtil
          .logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", manager);

      // Holding the lock makes submitting the task and recording its info in the task map
      // a single step with respect to the other methods that take the same lock.
      synchronized (_lock) {
        if (!_taskMap.containsKey(taskId)) {
          ExecutorService exeSvc = findExecutorServiceForMsg(message);

          if (exeSvc == null) {
            LOG.warn(String
                .format("Threadpool is null for type %s of message %s", message.getMsgType(),
                    message.getMsgId()));
            return false;
          }

          LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
          Future<HelixTaskResult> future = exeSvc.submit(task);

          // Record which task handles this resource/partition target; an existing mapping
          // for the same target is kept.
          _messageTaskMap
              .putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()),
                  taskId);

          // Arm a timeout timer only when the message carries a positive execution timeout.
          TimerTask timerTask = null;
          if (message.getExecutionTimeout() > 0) {
            timerTask = new MessageTimeoutTask(this, task);
            _timer.schedule(timerTask, message.getExecutionTimeout());
            LOG.info(
                "Message starts with timeout " + message.getExecutionTimeout() + " MsgId: " + task
                    .getTaskId());
          } else {
            LOG.debug("Message does not have timeout. MsgId: " + task.getTaskId());
          }
          _taskMap.put(taskId, new MessageTaskInfo(task, future, timerTask));

          LOG.info("Message: " + taskId + " handling task scheduled");
          return true;
        } else {
          // A task with this id is already tracked; fall through and return false.
          _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
              "Message handling task already sheduled for " + taskId, manager);
        }
      }
    } catch (Exception e) {
      // Scheduling failures are reported through the log and status update, then surfaced
      // to the caller as a false return value.
      LOG.error("Error while executing task. " + message, e);
      _statusUpdateUtil
          .logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e,
              manager);
    }
    return false;
  }

  /**
   * Cancels a tracked task: stops its timeout timer, drops the message-level bookkeeping and
   * cancels its future with interruption. The task map entry is removed only when the future
   * reports a successful cancellation.
   * @param task the task to cancel
   * @return true if the future was cancelled; false if the task is not tracked or the
   *         cancellation failed
   */
  @Override
  public boolean cancelTask(MessageTask task) {
    Message message = task.getMessage();
    NotificationContext notificationContext = task.getNotificationContext();
    String taskId = task.getTaskId();

    synchronized (_lock) {
      MessageTaskInfo taskInfo = _taskMap.get(taskId);
      if (taskInfo == null) {
        _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
            "fail to cancel task: " + taskId + ", future not found",
            notificationContext.getManager());
        return false;
      }

      // Stop the timeout timer first so it cannot fire during cancellation.
      if (taskInfo._timerTask != null) {
        taskInfo._timerTask.cancel();
      }

      Future<HelixTaskResult> future = taskInfo.getFuture();
      removeMessageFromTaskAndFutureMap(message);
      _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling task: " + taskId,
          notificationContext.getManager());

      // cancel(true) interrupts the thread if the task is still running, so state transition
      // callbacks should return promptly when they are interrupted.
      if (!future.cancel(true)) {
        _statusUpdateUtil
            .logInfo(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId,
                notificationContext.getManager());
        return false;
      }

      _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " + taskId,
          notificationContext.getManager());
      _taskMap.remove(taskId);
      return true;
    }
  }

  /**
   * Marks a task as finished: logs how long it took, removes it from the task map, drops the
   * message-level bookkeeping and cancels its timeout timer if one was armed.
   * @param task the task that has finished
   */
  @Override
  public void finishTask(MessageTask task) {
    Message message = task.getMessage();
    String taskId = task.getTaskId();
    long elapsedMillis = System.currentTimeMillis() - message.getExecuteStartTimeStamp();
    LOG.info("message finished: " + taskId + ", took " + elapsedMillis);

    synchronized (_lock) {
      MessageTaskInfo finishedInfo = _taskMap.remove(taskId);
      if (finishedInfo == null) {
        LOG.warn("message " + taskId + " not found in task map");
        return;
      }

      removeMessageFromTaskAndFutureMap(message);
      // Cancelling a timer task more than once is harmless.
      if (finishedInfo._timerTask != null) {
        finishedInfo._timerTask.cancel();
      }
    }
  }

  /**
   * Writes the given messages back to ZK and records the ids of those that are no longer in
   * NEW state as known messages. If any message was written in NEW state, a NO-OP message is
   * sent so that another message callback re-processes it.
   * @param msgsToBeUpdated the messages to write; nothing happens when empty
   * @param accessor data accessor used for the ZK update
   * @param instanceName the instance whose message paths are updated
   */
  private void updateMessageState(Collection<Message> msgsToBeUpdated, HelixDataAccessor accessor,
      String instanceName) {
    if (msgsToBeUpdated.isEmpty()) {
      return;
    }

    Builder keyBuilder = accessor.keyBuilder();
    // Snapshot the messages so paths, updaters and results line up by index.
    List<Message> pendingMsgs = new ArrayList<>(msgsToBeUpdated);
    List<String> zkPaths = new ArrayList<>();
    List<DataUpdater<ZNRecord>> zkUpdaters = new ArrayList<>();
    for (Message pending : pendingMsgs) {
      zkPaths.add(pending.getKey(keyBuilder, instanceName).getPath());
      // An updater is used to avoid a race between writing the message to ZK as READ and the
      // message being removed once the state transition is done: if nothing exists at the
      // path the message has already been removed, so it is not written back.
      zkUpdaters.add(existing -> {
        if (existing != null) {
          return pending.getRecord();
        }
        LOG.warn(
            "Message {} targets at {} has already been removed before it is set as READ on instance {}",
            pending.getId(), pending.getTgtName(), instanceName);
        return null;
      });
    }
    boolean[] updateResults = accessor.updateChildren(zkPaths, zkUpdaters, AccessOption.PERSISTENT);

    // Known message ids are cached only after the ZK update has been attempted, to avoid an
    // inconsistent cache.
    boolean anyUpdatedAsNew = false;
    for (int idx = 0; idx < pendingMsgs.size(); idx++) {
      Message updated = pendingMsgs.get(idx);
      if (updated.getMsgState().equals(MessageState.NEW)) {
        // A message written as NEW may need to be processed again soon, so it must not be
        // treated as a known message.
        anyUpdatedAsNew = true;
        continue;
      }

      _knownMessageIds.add(updated.getId());
      if (!updateResults[idx]) {
        // TODO: If the message update fails, maybe we shall not treat the message as a known
        // TODO: message. We shall apply more strict check and retry the update.
        LOG.error("Failed to update the message {}.", updated.getMsgId());
      }
    }

    if (anyUpdatedAsNew) {
      // A NO-OP message triggers another message callback, which re-processes the messages
      // that were written as NEW.
      sendNopMessage(accessor, instanceName);
    }
  }

  /**
   * Shuts a pool down in two phases: an orderly shutdown with a 200ms grace period, followed
   * by a forced shutdown with another 200ms wait if tasks are still running. If the calling
   * thread is interrupted while waiting, the pool is forced down and the interrupt status is
   * restored.
   * @param pool the pool to shut down
   */
  private void shutdownAndAwaitTermination(ExecutorService pool) {
    LOG.info("Shutting down pool: " + pool);
    // Phase 1: stop accepting new tasks and let running ones finish.
    pool.shutdown();
    try {
      if (pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
        return;
      }

      // Phase 2: cancel executing tasks and collect the ones that never started.
      List<Runnable> neverStarted = pool.shutdownNow();
      LOG.info("Tasks that never commenced execution: " + neverStarted);
      boolean terminated = pool.awaitTermination(200, TimeUnit.MILLISECONDS);
      if (!terminated) {
        LOG.error("Pool did not fully terminate in 200ms. pool: " + pool);
      }
    } catch (InterruptedException interrupted) {
      // The current thread was interrupted as well: force the shutdown again.
      LOG.error("Interruped when waiting for shutdown pool: " + pool, interrupted);
      pool.shutdownNow();
      // Preserve the interrupt status for the caller.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Removes the message handler factory registered for a message type, shuts down the
   * associated executor and resets the factory. Calling this for a type with no registered
   * factory is safe: the monitor entry and any executor stored under that key are still
   * removed, and the factory reset is skipped.
   * @param type the message type to unregister
   */
  void unregisterMessageHandlerFactory(String type) {
    MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.remove(type);
    ExecutorService pool = _executorMap.remove(type);
    _monitor.removeExecutorMonitor(type);

    // item is null when nothing was registered for this type, so it must not be
    // dereferenced unconditionally in the log statements.
    MessageHandlerFactory factory = item == null ? null : item.factory();

    LOG.info(
        "Unregistering message handler factory for type: " + type + ", factory: " + factory
            + ", pool: " + pool);

    if (pool != null) {
      shutdownAndAwaitTermination(pool);
    }

    // reset state-model
    if (factory != null) {
      factory.reset();
    }

    LOG.info(
        "Unregistered message handler factory for type: " + type + ", factory: " + factory
            + ", pool: " + pool);
  }

  /**
   * Calls {@code sync()} on every registered message handler factory. A failure in one
   * factory is logged and does not stop the remaining factories from being synced.
   */
  private void syncFactoryState() {
    LOG.info("Start to sync factory state");
    // Holding the registry monitor keeps sync() from interleaving with reset(), which takes
    // the same monitor.
    synchronized (_hdlrFtyRegistry) {
      for (Map.Entry<String, MsgHandlerFactoryRegistryItem> registration : _hdlrFtyRegistry
          .entrySet()) {
        MessageHandlerFactory factory = registration.getValue().factory();
        if (factory == null) {
          continue;
        }
        try {
          factory.sync();
        } catch (Exception ex) {
          LOG.error("Failed to syncState the factory {} of message type {}.", factory,
              registration.getKey(), ex);
        }
      }
    }
  }

  /**
   * Resets the executor: resets the message queue monitor, shuts down the executor of every
   * registered message type, resets (but keeps registered) every handler factory, and clears
   * all task and message bookkeeping. Tasks still tracked at this point are logged as having
   * failed to terminate.
   */
  void reset() {
    LOG.info("Reset HelixTaskExecutor");

    if (_messageQueueMonitor != null) {
      _messageQueueMonitor.reset();
    }

    synchronized (_hdlrFtyRegistry) {
      // Iterate entries rather than keySet() + get(): a registration removed concurrently
      // would otherwise be observed as a null item.
      for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry : _hdlrFtyRegistry.entrySet()) {
        String msgType = entry.getKey();
        // don't un-register factories, just shutdown all executors
        ExecutorService pool = _executorMap.remove(msgType);
        _monitor.removeExecutorMonitor(msgType);
        if (pool != null) {
          LOG.info("Reset exectuor for msgType: " + msgType + ", pool: " + pool);
          shutdownAndAwaitTermination(pool);
        }

        MessageHandlerFactory factory = entry.getValue().factory();
        if (factory != null) {
          try {
            factory.reset();
          } catch (Exception ex) {
            LOG.error("Failed to reset the factory {} of message type {}.", factory, msgType, ex);
          }
        }
      }
    }
    // Thread pools dedicated to state transitions (per-resource / per-transition keys) are
    // not shut down here. This is a potential area to improve:
    // https://github.com/apache/helix/issues/1245

    // Log all tasks that failed to terminate. Entries are iterated directly because a task
    // finishing concurrently may remove itself from the map.
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, MessageTaskInfo> entry : _taskMap.entrySet()) {
      sb.append("Task: ").append(entry.getKey()).append(" fails to terminate. Message: ")
          .append(entry.getValue()._task.getMessage()).append("\n");
    }
    if (sb.length() > 0) {
      LOG.info(sb.toString());
    }

    _taskMap.clear();

    _messageTaskMap.clear();

    _knownMessageIds.clear();

    _lastSessionSyncTime = null;
  }

  /**
   * Initializes the executor: initializes the message queue monitor, clears the shutting-down
   * flag and makes sure every registered message type has a thread pool, creating one with
   * the registered pool size when none exists.
   */
  void init() {
    LOG.info("Init HelixTaskExecutor");

    if (_messageQueueMonitor != null) {
      _messageQueueMonitor.init();
    }

    _isShuttingDown = false;

    // Re-init all existing factories
    for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry : _hdlrFtyRegistry.entrySet()) {
      final String msgType = entry.getKey();
      final MsgHandlerFactoryRegistryItem item = entry.getValue();
      ExecutorService pool = _executorMap.computeIfAbsent(msgType, type -> {
        ExecutorService newPool = Executors.newFixedThreadPool(item.threadPoolSize(),
            r -> new Thread(r, "HelixTaskExecutor-message_handle_" + type));
        _monitor.createExecutorMonitor(type, newPool);
        return newPool;
      });
      // SLF4J substitutes "{}" placeholders; printf-style "%s" is left in the output verbatim.
      LOG.info("Setup the thread pool for type: {}, isShutdown: {}", msgType, pool.isShutdown());
    }
  }

  /**
   * Sends a session-sync message to the controller, at most once per
   * {@code SESSION_SYNC_INTERVAL}, and only when no session-sync controller message is
   * currently present.
   * @param manager the manager used to read the existing message and to send the new one
   */
  private void syncSessionToController(HelixManager manager) {
    boolean syncedWithinInterval = _lastSessionSyncTime != null
        && System.currentTimeMillis() - _lastSessionSyncTime <= SESSION_SYNC_INTERVAL;
    if (syncedWithinInterval) {
      return;
    }

    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    PropertyKey key = new Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC);
    if (accessor.getProperty(key) != null) {
      // A session-sync message is already present; do not send another one.
      return;
    }

    LOG.info(String
        .format("Participant %s syncs session with controller", manager.getInstanceName()));

    Message syncMsg = new Message(MessageType.PARTICIPANT_SESSION_CHANGE, SESSION_SYNC);
    syncMsg.setSrcName(manager.getInstanceName());
    syncMsg.setTgtSessionId("*");
    syncMsg.setMsgState(MessageState.NEW);
    syncMsg.setMsgId(SESSION_SYNC);

    Criteria recipient = new Criteria();
    recipient.setRecipientInstanceType(InstanceType.CONTROLLER);
    recipient.setSessionSpecific(false);

    manager.getMessagingService().send(recipient, syncMsg);
    _lastSessionSyncTime = System.currentTimeMillis();
  }

  /**
   * Reads from ZK the messages that are not yet in the known-message cache.
   * @param manager the manager providing the data accessor
   * @param instanceName the instance whose messages are read for a MESSAGE change
   * @param changeType MESSAGE for participant messages, MESSAGES_CONTROLLER for controller
   *        messages
   * @return the messages that could be read; an empty list for any other change type
   */
  private List<Message> readNewMessagesFromZK(HelixManager manager, String instanceName,
      HelixConstants.ChangeType changeType) {
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    Builder keyBuilder = accessor.keyBuilder();

    final boolean forParticipant = changeType.equals(HelixConstants.ChangeType.MESSAGE);
    final boolean forController =
        !forParticipant && changeType.equals(HelixConstants.ChangeType.MESSAGES_CONTROLLER);
    if (!forParticipant && !forController) {
      LOG.warn("Unexpected ChangeType for Message Change CallbackHandler: " + changeType);
      return Collections.emptyList();
    }

    Set<String> messageIds = new HashSet<>();
    messageIds.addAll(accessor.getChildNames(
        forParticipant ? keyBuilder.messages(instanceName) : keyBuilder.controllerMessages()));

    // Avoid reading the already known messages.
    messageIds.removeAll(_knownMessageIds);

    List<PropertyKey> keys = new ArrayList<>();
    for (String messageId : messageIds) {
      keys.add(forParticipant ? keyBuilder.message(instanceName, messageId)
          : keyBuilder.controllerMessage(messageId));
    }

    // Do not throw an exception on a partial message read:
    // 1. The error cannot be resolved on the participant side, and failing here risks
    //    ignoring the message change event, which might leave the participant stuck.
    // 2. Even with a partial read there is another chance to retry, because as long as the
    //    participant processes messages it touches the message folder and triggers another
    //    message event.
    List<Message> newMessages = accessor.getProperty(keys, false);
    // A message may be removed before it is read; drop the resulting null entries.
    newMessages.removeIf(message -> message == null);
    return newMessages;
  }

  /**
   * Message-change callback: inspects the pending messages of an instance, creates a handler
   * for each processable message and schedules the handlers for execution.
   * <p>
   * Processing steps, in order:
   * <ol>
   * <li>A FINALIZE notification resets the executor and returns; an INIT notification
   * initializes it and then continues with normal processing.</li>
   * <li>If no messages were handed in, the new messages are read from ZK.</li>
   * <li>While the executor is shutting down, the messages are only logged and ignored.</li>
   * <li>Messages are sorted by creation time and checked one by one: no-op messages are
   * skipped, a handler is created, state transition messages are validated, and accepted
   * messages are marked as READ.</li>
   * <li>Missing current-state meta data is created in one batch, the touched messages are
   * written back in one batch, and finally a task is scheduled per handler.</li>
   * </ol>
   * @param instanceName name of the instance whose messages are processed
   * @param messages the messages to process; when null or empty they are read from ZK instead
   * @param changeContext notification context of the callback; cloned for every message
   */
  @Override
  @PreFetch(enabled = false)
  public void onMessage(String instanceName, List<Message> messages,
      NotificationContext changeContext) {
    HelixManager manager = changeContext.getManager();

    // If FINALIZE notification comes, reset all handler factories
    // and terminate all the thread pools
    // TODO: see if we should have a separate notification call for resetting
    if (changeContext.getType() == Type.FINALIZE) {
      reset();
      return;
    }

    if (changeContext.getType() == Type.INIT) {
      init();
      // continue to process messages
    }

    // if prefetch is disabled in MessageListenerCallback, we need to read all new messages from zk.
    if (messages == null || messages.isEmpty()) {
      // If no messages are given, check and read all new messages.
      messages = readNewMessagesFromZK(manager, instanceName, changeContext.getChangeType());
    }

    if (_isShuttingDown) {
      // Only report which messages are left unprocessed; nothing is handled during shutdown.
      StringBuilder sb = new StringBuilder();
      for (Message message : messages) {
        sb.append(message.getMsgId() + ",");
      }
      LOG.info(
          "Helix task executor is shutting down, ignore unprocessed messages : " + sb.toString());
      return;
    }

    // Update message count
    if (_messageQueueMonitor != null) {
      _messageQueueMonitor.setMessageQueueBacklog(messages.size());
    }

    if (messages.isEmpty()) {
      LOG.info("No Messages to process");
      return;
    }

    // sort message by creation timestamp, so message created earlier is processed first
    Collections.sort(messages, Message.CREATE_TIME_COMPARATOR);

    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    Builder keyBuilder = accessor.keyBuilder();

    // message handlers and corresponding contexts created
    // State transition handlers are keyed by message target (resource + partition).
    Map<String, MessageHandler> stateTransitionHandlers = new HashMap<>();
    Map<String, NotificationContext> stateTransitionContexts = new HashMap<>();

    // The two lists below are index-aligned: context i belongs to handler i.
    List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>();
    List<NotificationContext> nonStateTransitionContexts = new ArrayList<>();

    // message to be updated in ZK
    Map<String, Message> msgsToBeUpdated = new HashMap<>();

    String sessionId = manager.getSessionId();
    List<String> curResourceNames =
        accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
    List<String> taskCurResourceNames =
        accessor.getChildNames(keyBuilder.taskCurrentStates(instanceName, sessionId));
    // Keys and values of the current-state meta data to create; both lists are index-aligned.
    List<PropertyKey> createCurStateKeys = new ArrayList<>();
    List<CurrentState> metaCurStates = new ArrayList<>();
    Set<String> createCurStateNames = new HashSet<>();

    for (Message message : messages) {
      if (checkAndProcessNoOpMessage(message, instanceName, changeContext, manager, sessionId,
          stateTransitionHandlers)) {
        // skip the following operations for the no-op messages.
        continue;
      }
      NotificationContext msgWorkingContext = changeContext.clone();
      MessageHandler msgHandler = null;
      try {
        // create message handlers, if handlers not found but no exception, leave its state as NEW
        msgHandler = createMessageHandler(message, msgWorkingContext);
      } catch (Exception ex) {
        // Failed to create message handler and there is an Exception.
        int remainingRetryCount = message.getRetryCount();
        LOG.error(
            "Exception happens when creating Message Handler for message {}. Current remaining retry count is {}.",
            message.getMsgId(), remainingRetryCount);
        // Reduce the message retry count to avoid infinite retrying.
        message.setRetryCount(remainingRetryCount - 1);
        message.setExecuteSessionId(sessionId);
        // Note that we are re-using the retry count of Message that was original designed to control
        // timeout retries. So it is not checked before the first try in order to ensure consistent
        // behavior. It is possible that we introduce a new behavior for this method. But it requires
        // us to split the configuration item so as to avoid confusion.
        if (message.getRetryCount() <= 0) {
          // If no more retry count remains, then mark the message to be UNPROCESSABLE.
          String errorMsg = String.format("No available message Handler found!"
                  + " Stop processing message %s since it has zero or negative remaining retry count %d!",
              message.getMsgId(), message.getRetryCount());
          updateUnprocessableMessage(message, null, errorMsg, manager);
        }
        // Write the reduced retry count (and possibly the UNPROCESSABLE state) back in the batch.
        msgsToBeUpdated.put(message.getId(), message);
        // continue processing in the next section where handler object is double-checked.
      }

      if (msgHandler == null) {
        // Skip processing this message in this callback. The same message process will be retried
        // in the next round if retry count > 0.
        LOG.warn("There is no existing handler for message {}."
            + " Skip processing it for now. Will retry on the next callback.", message.getMsgId());
        continue;
      }

      if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) || message.getMsgType()
          .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
        if (validateAndProcessStateTransitionMessage(message, manager, stateTransitionHandlers,
            msgHandler)) {
          // Need future process by triggering state transition
          String msgTarget =
              getMessageTarget(message.getResourceName(), message.getPartitionName());
          stateTransitionHandlers.put(msgTarget, msgHandler);
          stateTransitionContexts.put(msgTarget, msgWorkingContext);
        } else {
          // Skip the following operations for the invalid/expired state transition messages.
          // Also remove the message since it might block the other state transition messages.
          removeMessageFromZK(accessor, message, instanceName);
          continue;
        }
      } else {
        // Need future process non state transition messages by triggering the handler
        nonStateTransitionHandlers.add(msgHandler);
        nonStateTransitionContexts.add(msgWorkingContext);
      }

      // Update the normally processed messages
      Message markedMsg = markReadMessage(message, msgWorkingContext, manager);
      msgsToBeUpdated.put(markedMsg.getId(), markedMsg);

      // batch creation of all current state meta data
      // do it for non-controller and state transition messages only
      if (!message.isControlerMsg() && message.getMsgType()
          .equals(Message.MessageType.STATE_TRANSITION.name())) {
        String resourceName = message.getResourceName();
        // Only resources without an existing (task) current state and not yet queued are created.
        if (!curResourceNames.contains(resourceName) && !taskCurResourceNames.contains(resourceName)
            && !createCurStateNames.contains(resourceName)) {
          createCurStateNames.add(resourceName);
          PropertyKey curStateKey = keyBuilder.currentState(instanceName, sessionId, resourceName);
          // Task state model messages use the task current state path unless it is disabled
          // through the system property.
          if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean
              .getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
            curStateKey = keyBuilder.taskCurrentState(instanceName, sessionId, resourceName);
          }
          createCurStateKeys.add(curStateKey);

          CurrentState metaCurState = new CurrentState(resourceName);
          metaCurState.setBucketSize(message.getBucketSize());
          metaCurState.setStateModelDefRef(message.getStateModelDef());
          metaCurState.setSessionId(sessionId);
          metaCurState.setBatchMessageMode(message.getBatchMessageMode());
          String ftyName = message.getStateModelFactoryName();
          if (ftyName != null) {
            metaCurState.setStateModelFactoryName(ftyName);
          } else {
            metaCurState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
          }
          metaCurStates.add(metaCurState);
        }
      }
    }

    // batch create curState meta
    if (createCurStateKeys.size() > 0) {
      try {
        accessor.createChildren(createCurStateKeys, metaCurStates);
      } catch (Exception e) {
        // Best effort: the failure is logged and message processing continues.
        LOG.error("fail to create cur-state znodes for messages: " + msgsToBeUpdated, e);
      }
    }

    // update message state in batch and schedule tasks for all read messages
    updateMessageState(msgsToBeUpdated.values(), accessor, instanceName);

    for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
      MessageHandler handler = handlerEntry.getValue();
      NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
      if (!scheduleTaskForMessage(instanceName, accessor, handler, context) && !_isShuttingDown) {
        /**
         * TODO: Checking _isShuttingDown is a workaround to avoid unnecessary ERROR partition.
         * TODO: We shall improve the shutdown process of the participant to clean up the workflow
         * TODO: completely. In detail, there is a race condition between TaskExecutor thread
         * TODO: pool shutting down and Message handler stops listening. In this gap, the message
         * TODO: will still be processed but schedule will fail. If we mark partition into ERROR
         * TODO: state, then the controller side logic might be confused.
         */
        try {
          // Record error state to the message handler.
          handler.onError(new HelixException(String
                  .format("Failed to schedule the task for executing message handler for %s.",
                      handler._message.getMsgId())), MessageHandler.ErrorCode.ERROR,
              MessageHandler.ErrorType.FRAMEWORK);
        } catch (Exception ex) {
          LOG.error("Failed to trigger onError method of the message handler for {}",
              handler._message.getMsgId(), ex);
        }
      }
    }

    // Scheduling failures of non state transition messages are not reported to the handler.
    for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
      MessageHandler handler = nonStateTransitionHandlers.get(i);
      NotificationContext context = nonStateTransitionContexts.get(i);
      scheduleTaskForMessage(instanceName, accessor, handler, context);
    }
  }

  /**
   * Inspect the message. Report and remove it if no operation needs to be done.
   * <p>
   * The checks run in a fixed order: NO_OP message, target session mismatch, participant
   * session change (controller side only), message not in NEW state, expired message, state
   * transition cancellation that can be completed right away, and participant status change.
   * Only a message that passes all checks is reported as received and handed back to the caller
   * for regular processing.
   * <p>
   * Any exception thrown by a check is logged, and the message is then removed from the
   * internal bookkeeping and from ZK.
   * @param message the message to inspect
   * @param instanceName name of the instance that owns the message; used to remove it from ZK
   * @param changeContext notification context, forwarded to the controller on a participant
   *          session change
   * @param manager the Helix manager providing the data accessor, instance type and session info
   * @param sessionId the current session id, compared with the target session id of the message
   * @param stateTransitionHandlers state transition handlers created so far in the current batch,
   *          consulted (and possibly modified) when handling a cancellation message
   * @return True if the message is no-op message and no other process step is required.
   */
  private boolean checkAndProcessNoOpMessage(Message message, String instanceName,
      NotificationContext changeContext, HelixManager manager, String sessionId,
      Map<String, MessageHandler> stateTransitionHandlers) {
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    try {
      // nop messages are simply removed. It is used to trigger onMessage() in
      // situations such as register a new message handler factory
      if (message.getMsgType().equalsIgnoreCase(MessageType.NO_OP.toString())) {
        LOG.info(
            "Dropping NO-OP message. mid: " + message.getId() + ", from: " + message.getMsgSrc());
        reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
        return true;
      }

      String tgtSessionId = message.getTgtSessionId();
      // sessionId mismatch normally means message comes from expired session, just remove it
      // NOTE(review): a null tgtSessionId makes tgtSessionId.equals("*") throw, so such a message
      // ends up in the catch block below instead of this branch - confirm this is intended.
      if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
        String warningMessage = "SessionId does NOT match. expected sessionId: " + sessionId
            + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId();
        LOG.warn(warningMessage);
        reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
        _statusUpdateUtil
            .logWarning(message, HelixStateMachineEngine.class, warningMessage, manager);

        // Proactively send a session sync message from participant to controller
        // upon session mismatch after a new session is established
        if (manager.getInstanceType() == InstanceType.PARTICIPANT
            || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
          // Only messages created after the current session started trigger the sync.
          if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
            syncSessionToController(manager);
          }
        }
        return true;
      }

      // A controller handles PARTICIPANT_SESSION_CHANGE by re-reading the live instances and
      // feeding them to the controller's live instance change handling.
      if ((manager.getInstanceType() == InstanceType.CONTROLLER
          || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
          && MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
        LOG.info(String.format("Controller received PARTICIPANT_SESSION_CHANGE msg from src: %s",
            message.getMsgSrc()));
        PropertyKey key = new Builder(manager.getClusterName()).liveInstances();
        List<LiveInstance> liveInstances = manager.getHelixDataAccessor().getChildValues(key, true);
        _controller.onLiveInstanceChange(liveInstances, changeContext);
        reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.COMPLETED);
        return true;
      }

      // don't process message that is of READ or UNPROCESSABLE state
      if (MessageState.NEW != message.getMsgState()) {
        // It happens because we don't delete message right after
        // read. Instead we keep it until the current state is updated.
        // We will read the message again if there is a new message but we
        // check for the status and ignore if its already read
        if (LOG.isTraceEnabled()) {
          LOG.trace("Message already read. msgId: " + message.getMsgId());
        }
        return true;
      }

      if (message.isExpired()) {
        LOG.info(
            "Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc()
                + " relayed from: " + message.getRelaySrcHost());
        reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
        return true;
      }

      // State Transition Cancellation
      if (message.getMsgType().equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
        boolean success =
            cancelNotStartedStateTransition(message, stateTransitionHandlers, accessor,
                instanceName);
        if (success) {
          return true;
        }
        // Otherwise fall through: the cancellation message continues through regular processing.
      }

      if (MessageType.PARTICIPANT_STATUS_CHANGE.name().equals(message.getMsgType())) {
        // valueOf() throws on an unknown status name; that is handled by the catch block below.
        LiveInstanceStatus toStatus = LiveInstanceStatus.valueOf(message.getToState());
        changeParticipantStatus(instanceName, toStatus, manager);
        reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.COMPLETED);
        return true;
      }

      // The message needs regular processing; count it as received.
      _monitor.reportReceivedMessage(message);
    } catch (Exception e) {
      LOG.error("Failed to process the message {}. Deleting the message from ZK. Exception: {}",
          message, e);
      removeMessageFromTaskAndFutureMap(message);
      removeMessageFromZK(accessor, message, instanceName);
      return true;
    }
    return false;
  }

  /**
   * Decide whether a state transition (or cancellation) message still needs to be executed.
   * <p>
   * The message is rejected and marked as unprocessable when
   * <ul>
   * <li>it is a state transition and another transition for the same target is still running,</li>
   * <li>the handler is a {@link HelixStateTransitionHandler} whose stale message validation
   * fails,</li>
   * <li>a handler for the same target has already been accepted in the current batch, or</li>
   * <li>any of the checks throws an exception.</li>
   * </ul>
   * @param message the message to validate
   * @param manager the Helix manager, used for reporting unprocessable messages
   * @param stateTransitionHandlers handlers accepted so far in the current batch, keyed by target
   * @param createHandler the handler that has been created for the message
   * @return True if the requested state transition is valid and needs to be scheduled.
   *         False if no more operation is required.
   */
  private boolean validateAndProcessStateTransitionMessage(Message message, HelixManager manager,
      Map<String, MessageHandler> stateTransitionHandlers, MessageHandler createHandler) {
    final String target = getMessageTarget(message.getResourceName(), message.getPartitionName());

    try {
      boolean isStateTransition = message.getMsgType().equals(MessageType.STATE_TRANSITION.name());
      if (isStateTransition && isStateTransitionInProgress(target)) {
        // A transition of the same partition is still running: drop the new message, the
        // controller will resend it if it is still valid.
        Message runningMsg = _taskMap.get(_messageTaskMap.get(target)).getTask().getMessage();
        String reason = String.format(
            "Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message",
            message.getResourceName(), message.getPartitionName(), runningMsg.getMsgId(),
            runningMsg.isRelayMessage(), runningMsg.getReadTimeStamp(), System.currentTimeMillis(),
            message.getFromState(), message.getToState());
        updateUnprocessableMessage(message, null /* exception */, reason, manager);
        return false;
      }

      if (createHandler instanceof HelixStateTransitionHandler) {
        // The target state is only validated when no transition task is scheduled or executing.
        HelixStateTransitionHandler.StaleMessageValidateResult staleCheck =
            ((HelixStateTransitionHandler) createHandler).staleMessageValidator();
        if (!staleCheck.isValid) {
          updateUnprocessableMessage(message, null /* exception */,
              staleCheck.exception.getMessage(), manager);
          return false;
        }
      }

      if (!stateTransitionHandlers.containsKey(target)) {
        return true;
      }

      // Two messages of the same batch address the same partition: the later one is discarded.
      Message earlierMsg = stateTransitionHandlers.get(target)._message;
      String reason = String.format(
          "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s",
          message.getMsgId(), earlierMsg.getFromState(), earlierMsg.getToState(),
          message.getFromState(), message.getToState());
      updateUnprocessableMessage(message, null /* exception */, reason, manager);
      return false;
    } catch (Exception validationFailure) {
      updateUnprocessableMessage(message, validationFailure,
          "State transition validation failed with Exception.", manager);
      return false;
    }
  }

  /**
   * Wrap the handler into a {@code HelixTask} and hand it over to the scheduler.
   * When scheduling fails, the handler's message is removed from the internal bookkeeping
   * and from ZK.
   * @param instanceName name of the instance that owns the message
   * @param accessor data accessor used to remove the message from ZK on failure
   * @param handler the handler to execute; its message is the one being scheduled
   * @param context the notification context the task runs with
   * @return True if schedule the task successfully. False otherwise.
   */
  private boolean scheduleTaskForMessage(String instanceName, HelixDataAccessor accessor,
      MessageHandler handler, NotificationContext context) {
    Message handlerMessage = handler._message;
    boolean scheduled = scheduleTask(new HelixTask(handlerMessage, context, handler, this));
    if (scheduled) {
      return true;
    }

    // Scheduling failed: clean up every trace of the message.
    removeMessageFromTaskAndFutureMap(handlerMessage);
    removeMessageFromZK(accessor, handlerMessage, instanceName);
    return false;
  }

  /**
   * Tell whether a task registered for the given message target has not finished yet. The
   * given target is assumed to belong to a state transition task.
   *
   * @param messageTarget message target generated by getMessageTarget()
   * @return true if a task is registered for the target and its future is not done, else false
   */
  private boolean isStateTransitionInProgress(String messageTarget) {
    synchronized (_lock) {
      if (!_messageTaskMap.containsKey(messageTarget)) {
        // Nothing registered for this target.
        return false;
      }
      String registeredTaskId = _messageTaskMap.get(messageTarget);
      boolean finished = _taskMap.get(registeredTaskId).getFuture().isDone();
      return !finished;
    }
  }

  /**
   * Try to cancel a state transition that has not been started yet.
   * <p>
   * Three types of cancellation exist:
   * <ol>
   * <li>The cancellation message arrived together with the state transition message.</li>
   * <li>The state transition message is handled but its task has not started.</li>
   * <li>The state transition message is handled and its task has already started.</li>
   * </ol>
   * This method tries to handle the first two cases.
   * <p>
   * NOTE(review): when a target state transition is found but its from/to states differ from
   * the cancellation message, the cancellation is reported as DISCARDED, yet the target state
   * transition message is still removed from ZK by the common tail of this method - confirm
   * that this is intended.
   * @param message the cancellation message
   * @param stateTransitionHandlers handlers accepted so far in the current batch, keyed by
   *          target; the matching handler is removed on a successful same-batch cancellation
   * @param accessor data accessor used to remove messages from ZK
   * @param instanceName name of the instance that owns the messages
   * @return true if no further cancellation is needed, false if the state transition could not
   *         be cancelled here (i.e. further cancellation is needed)
   */
  private boolean cancelNotStartedStateTransition(Message message,
      Map<String, MessageHandler> stateTransitionHandlers, HelixDataAccessor accessor,
      String instanceName) {
    String targetMessageName =
        getMessageTarget(message.getResourceName(), message.getPartitionName());
    // Processing result reported for the cancellation message itself.
    ProcessedMessageState messageState;
    Message targetStateTransitionMessage;

    // State transition message and cancel message are in same batch
    if (stateTransitionHandlers.containsKey(targetMessageName)) {
      targetStateTransitionMessage = stateTransitionHandlers.get(targetMessageName).getMessage();
      if (isCancelingSameStateTransition(targetStateTransitionMessage, message)) {
        // Dropping the handler prevents the transition from being scheduled at all.
        stateTransitionHandlers.remove(targetMessageName);
        messageState = ProcessedMessageState.COMPLETED;
      } else {
        messageState = ProcessedMessageState.DISCARDED;
      }
    } else if (_messageTaskMap.containsKey(targetMessageName)) {
      // Cancel the task future without interrupting a state transition that is already
      // started. If the state transition is already started, we should call cancel in the
      // state model.
      String taskId = _messageTaskMap.get(targetMessageName);
      HelixTask task = (HelixTask) _taskMap.get(taskId).getTask();
      Future<HelixTaskResult> future = _taskMap.get(taskId).getFuture();
      targetStateTransitionMessage = task.getMessage();

      if (isCancelingSameStateTransition(task.getMessage(), message)) {
        boolean success = task.cancel();
        if (!success) {
          // the state transition is already started, need further cancellation.
          return false;
        }

        future.cancel(false);
        _messageTaskMap.remove(targetMessageName);
        _taskMap.remove(taskId);
        messageState = ProcessedMessageState.COMPLETED;
      } else {
        messageState = ProcessedMessageState.DISCARDED;
      }
    } else {
      // No state transition is known for this target; nothing can be cancelled here.
      return false;
    }

    // remove the original state-transition message been cancelled.
    removeMessageFromZK(accessor, targetStateTransitionMessage, instanceName);
    _monitor.reportProcessedMessage(targetStateTransitionMessage,
        ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);

    // remove the state transition cancellation message
    reportAndRemoveMessage(message, accessor, instanceName, messageState);

    return true;
  }

  /**
   * Report the message to the monitor as received and as processed with the given state, then
   * remove it from ZK.
   * @param message the message to report and remove
   * @param accessor data accessor used to remove the message from ZK
   * @param instanceName name of the instance that owns the message
   * @param messageProcessState the processing result reported to the monitor
   */
  private void reportAndRemoveMessage(Message message, HelixDataAccessor accessor,
      String instanceName, ProcessedMessageState messageProcessState) {
    _monitor.reportReceivedMessage(message);
    _monitor.reportProcessedMessage(message, messageProcessState);
    removeMessageFromZK(accessor, message, instanceName);
  }

  /**
   * Flag the message as READ: set the READ state, the current time as read timestamp and the
   * session id of the context's manager as executing session, then log a status update.
   * @param message the message to mark; it is modified in place
   * @param context notification context whose manager provides the executing session id
   * @param manager the Helix manager used for the status update log
   * @return the same message instance that was passed in
   */
  private Message markReadMessage(Message message, NotificationContext context,
      HelixManager manager) {
    message.setMsgState(MessageState.READ);
    // Current wall-clock time in epoch milliseconds.
    message.setReadTimeStamp(System.currentTimeMillis());
    message.setExecuteSessionId(context.getManager().getSessionId());

    _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", manager);
    return message;
  }

  /**
   * Mark the message as UNPROCESSABLE: log the failure, record an error status update and
   * report the message to the monitor as FAILED.
   * @param message the message that cannot be processed; its state is modified in place
   * @param exception the cause of the failure, or null if there is only an error text
   * @param errorMsg description of the failure; used for the log line and the status update
   *          when no exception is given
   * @param manager the Helix manager used for the status update log
   */
  private void updateUnprocessableMessage(Message message, Exception exception, String errorMsg,
      HelixManager manager) {
    final String summary =
        "Message " + message.getMsgId() + " cannot be processed: " + message.getRecord();
    if (exception == null) {
      LOG.error(summary + errorMsg);
      _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, errorMsg, manager);
    } else {
      LOG.error(summary, exception);
      _statusUpdateUtil
          .logError(message, HelixStateMachineEngine.class, exception, summary, manager);
    }
    message.setMsgState(MessageState.UNPROCESSABLE);
    _monitor.reportProcessedMessage(message, ProcessedMessageState.FAILED);
  }

  /**
   * Create the handler for a message through the handler factory registered for its type.
   * <p>
   * This executor is added to the given context under {@code MapKey.TASK_EXECUTOR} before the
   * factory is invoked.
   * @param message the message a handler is needed for
   * @param changeContext the notification context handed to the factory; it is modified
   * @return the handler created by the factory, or null if no factory is registered for the
   *         message type
   */
  public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {
    final String type = message.getMsgType();
    final MsgHandlerFactoryRegistryItem registryItem = _hdlrFtyRegistry.get(type);

    if (registryItem == null) {
      // No factory yet: the message is kept and gets handled once the corresponding
      // MessageHandlerFactory is registered.
      LOG.warn("Fail to find message handler factory for type: " + type + " msgId: " + message
          .getMsgId());
      return null;
    }

    MessageHandlerFactory factory = registryItem.factory();
    // The batch message handler needs the task executor to schedule its sub-messages.
    changeContext.add(MapKey.TASK_EXECUTOR.toString(), this);
    return factory.createHandler(message, changeContext);
  }

  /**
   * Forget a message: drop its id from the known message ids and drop the task registered for
   * its message target, if there is one.
   * @param message the message to forget
   */
  private void removeMessageFromTaskAndFutureMap(Message message) {
    _knownMessageIds.remove(message.getId());
    String target = getMessageTarget(message.getResourceName(), message.getPartitionName());
    // Removing an absent key is a no-op, so no containsKey check is needed.
    _messageTaskMap.remove(target);
  }

  /**
   * Check whether a cancellation message addresses the given state transition message, i.e.
   * whether both carry the same from-state and the same to-state, ignoring case.
   * @param stateTranstionMessage the state transition message
   * @param cancellationMessage the cancellation message
   * @return true if from-state and to-state of both messages match
   */
  private boolean isCancelingSameStateTransition(Message stateTranstionMessage,
      Message cancellationMessage) {
    if (!stateTranstionMessage.getFromState()
        .equalsIgnoreCase(cancellationMessage.getFromState())) {
      return false;
    }
    return stateTranstionMessage.getToState().equalsIgnoreCase(cancellationMessage.getToState());
  }

  /**
   * Build the message target key of a partition: resource name and partition name joined by
   * an underscore.
   * @param resourceName name of the resource
   * @param partitionName name of the partition
   * @return the message target in the form {@code <resourceName>_<partitionName>}
   */
  String getMessageTarget(String resourceName, String partitionName) {
    return resourceName + "_" + partitionName;
  }

  /**
   * Switch this participant to the requested live instance status and publish the change in
   * the live instance node.
   * <ul>
   * <li>PAUSED: remembers the current session as freeze session and writes the status field.</li>
   * <li>NORMAL: if the session changed since the freeze, syncs the factory state and carries
   * over the previous current state; then clears the freeze session and removes the status
   * field.</li>
   * <li>Any other status is not supported and only logged.</li>
   * </ul>
   * @param instanceName name of the participant instance
   * @param toStatus the status to switch to; a null status is ignored
   * @param manager the Helix manager providing data accessor and session id
   */
  private void changeParticipantStatus(String instanceName,
      LiveInstance.LiveInstanceStatus toStatus, HelixManager manager) {
    if (toStatus == null) {
      LOG.warn("To status is null! Skip participant status change.");
      return;
    }

    LOG.info("Changing participant {} status to {} from {}", instanceName, toStatus,
        _liveInstanceStatus);
    final HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
    final String currentSessionId = manager.getSessionId();
    final String liveInstancePath =
        dataAccessor.keyBuilder().liveInstance(instanceName).getPath();
    final String statusField = LiveInstance.LiveInstanceProperty.STATUS.name();
    final boolean updated;

    if (toStatus == LiveInstance.LiveInstanceStatus.PAUSED) {
      _freezeSessionId = currentSessionId;
      _liveInstanceStatus = toStatus;
      // Entering freeze mode, update live instance status.
      // If the update fails, another new freeze message will be sent by controller.
      updated = dataAccessor.getBaseDataAccessor().update(liveInstancePath, znRecord -> {
        znRecord.setEnumField(statusField, toStatus);
        return znRecord;
      }, AccessOption.EPHEMERAL);
    } else if (toStatus == LiveInstance.LiveInstanceStatus.NORMAL) {
      // Exiting freeze mode. If the session changed while frozen, the state models have to be
      // synced first.
      boolean sessionChanged =
          _freezeSessionId != null && !_freezeSessionId.equals(currentSessionId);
      if (sessionChanged) {
        syncFactoryState();
        ParticipantManager.carryOverPreviousCurrentState(dataAccessor, instanceName,
            currentSessionId, manager.getStateMachineEngine(), false);
      }
      _freezeSessionId = null;
      _liveInstanceStatus = null;
      updated = dataAccessor.getBaseDataAccessor().update(liveInstancePath, znRecord -> {
        // Remove the status field for backwards compatibility
        znRecord.getSimpleFields().remove(statusField);
        return znRecord;
      }, AccessOption.EPHEMERAL);
    } else {
      LOG.warn("To status {} is not supported", toStatus);
      updated = false;
    }

    LOG.info("Changed participant {} status to {}. FreezeSessionId={}, update success={}",
        instanceName, _liveInstanceStatus, _freezeSessionId, updated);
  }

  /**
   * Build the dot-separated name of a state transition type.
   * @param prefix leading part of the name
   * @param fromState state the transition starts from
   * @param toState state the transition ends in
   * @return {@code <prefix>.<fromState>.<toState>}, or null if any argument is null
   */
  private String getStateTransitionType(String prefix, String fromState, String toState) {
    boolean incomplete = prefix == null || fromState == null || toState == null;
    return incomplete ? null : prefix + "." + fromState + "." + toState;
  }

  /**
   * Build the name of the per-resource state transition pool.
   * @param resourceName name of the resource
   * @return the STATE_TRANSITION message type name and the resource name joined by a dot
   */
  private String getPerResourceStateTransitionPoolName(String resourceName) {
    return String.format("%s.%s", MessageType.STATE_TRANSITION.name(), resourceName);
  }

  /**
   * Get the live instance status currently recorded by this executor.
   * @return the recorded status; null after the participant has been switched back to NORMAL
   *         (presumably also when the status was never changed - verify against the field's
   *         initial value)
   */
  public LiveInstanceStatus getLiveInstanceStatus() {
    return _liveInstanceStatus;
  }

  /**
   * Remove a message from ZK through {@code HelixUtil} and log whether the removal succeeded.
   * @param accessor data accessor used for the removal
   * @param message the message to remove
   * @param instanceName name of the instance that owns the message
   */
  private void removeMessageFromZK(HelixDataAccessor accessor, Message message,
      String instanceName) {
    boolean removed = HelixUtil.removeMessageFromZK(accessor, message, instanceName);
    if (!removed) {
      LOG.warn("Failed to remove message {} from ZK.", message.getMsgId());
      return;
    }
    LOG.info("Successfully removed message {} from ZK.", message.getMsgId());
  }

  /**
   * Send a NO_OP message from the given instance to itself. The message gets a random UUID as
   * id and is written to the instance's message path.
   * <p>
   * This is best effort: any exception is logged and not propagated.
   * @param accessor data accessor used to write the message
   * @param instanceName name of the instance that is both source and target of the message
   */
  private void sendNopMessage(HelixDataAccessor accessor, String instanceName) {
    try {
      Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
      nopMsg.setSrcName(instanceName);
      nopMsg.setTgtName(instanceName);
      accessor
          .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
      // The original format string was "{}}", which printed a stray '}' after the target name.
      LOG.info("Send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId());
    } catch (Exception e) {
      LOG.error("Failed to send NO_OP message to {}.", instanceName, e);
    }
  }

  /**
   * Shut down this executor: raise the shutting-down flag, cancel the timer, reset the executor
   * and shut down the monitor.
   */
  @Override
  public void shutdown() {
    LOG.info("Shutting down HelixTaskExecutor");
    // Raised first; onMessage() ignores incoming messages while this flag is set.
    _isShuttingDown = true;
    _timer.cancel();

    reset();
    _monitor.shutDown();
    LOG.info("Shutdown HelixTaskExecutor finished");
  }
}
