package org.apache.helix.manager.zk;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.StringReader;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.ImmutableList;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.util.StatusUpdateUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Factory for handlers of SCHEDULER_MSG type messages.
 *
 * The current implementation supports throttling on STATE-TRANSITION type of messages
 * (the SCHEDULED-COMPLETED transition).
 */
public class DefaultSchedulerMessageHandlerFactory implements MultiTypeMessageHandlerFactory {
  /**
   * Simple-field key on the scheduler message; its value is parsed as a boolean and selects
   * sendAndWait over send when the message is dispatched directly.
   */
  public static final String WAIT_ALL = "WAIT_ALL";
  /** Key in the task result map under which the scheduler message id is reported. */
  public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
  /**
   * Simple-field key on the scheduler message that names the task queue resource. The same
   * string is also used as the state model definition reference of the task queue ideal state.
   */
  public static final String SCHEDULER_TASK_QUEUE = "SchedulerTaskQueue";
  /** Simple-field key set on every queued task; holds the id of the scheduler message. */
  public static final String CONTROLLER_MSG_ID = "controllerMsgId";
  /** Bucket size applied to the task queue ideal state. */
  public static final int TASKQUEUE_BUCKET_NUM = 10;

  /**
   * Callback attached to the messages sent on behalf of a scheduler message. Every reply's result
   * map is collected under a unique key; once the callback reports done, or when it times out,
   * the collected results plus a "Summary" entry are written into the controller task status of
   * the original scheduler message.
   */
  public static class SchedulerAsyncCallback extends AsyncCallback {
    StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
    Message _originalMessage;
    HelixManager _manager;
    final Map<String, Map<String, String>> _resultSummaryMap =
        new ConcurrentHashMap<String, Map<String, String>>();

    /**
     * @param originalMessage the scheduler message whose status update receives the results
     * @param manager the manager used to reach the data accessor and to log status updates
     */
    public SchedulerAsyncCallback(Message originalMessage, HelixManager manager) {
      this._originalMessage = originalMessage;
      this._manager = manager;
    }

    /** Logs the timeout, records it as an error status update and writes the summary. */
    @Override
    public void onTimeOut() {
      String timeoutNotice =
          "Scheduler msg timeout " + _originalMessage.getMsgId() + " timout with " + _timeout
              + " Ms";
      _logger.info(timeoutNotice);

      _statusUpdateUtil.logError(_originalMessage, SchedulerAsyncCallback.class, "Task timeout",
          _manager);
      addSummary(_resultSummaryMap, _originalMessage, _manager, true);
    }

    /**
     * Stores the reply's result map and, if this reply completes the callback, logs completion
     * and writes the summary.
     *
     * @param message the reply message
     */
    @Override
    public void onReplyMessage(Message message) {
      String replySource = message.getMsgSrc();
      _logger.info("Update for scheduler msg " + _originalMessage.getMsgId() + " Message "
          + replySource + " id " + message.getCorrelationId() + " completed");

      // The random suffix keeps several replies from the same source from overwriting each other.
      String resultKey = "MessageResult " + message.getMsgSrc() + " " + UUID.randomUUID();
      _resultSummaryMap.put(resultKey, message.getResultMap());

      if (!this.isDone()) {
        return;
      }

      _logger.info("Scheduler msg " + _originalMessage.getMsgId() + " completed");
      _statusUpdateUtil.logInfo(_originalMessage, SchedulerAsyncCallback.class,
          "Scheduler task completed", _manager);
      addSummary(_resultSummaryMap, _originalMessage, _manager, false);
    }

    /**
     * Adds a "Summary" entry to the given results and stores all results as map fields of the
     * scheduler message's controller task status.
     *
     * @param results collected reply results; the summary entry is added to this map
     * @param schedulerMsg the scheduler message whose status update is written
     * @param helixManager manager providing the data accessor
     * @param timedOut value recorded under the "Timeout" summary key
     */
    private void addSummary(Map<String, Map<String, String>> results, Message schedulerMsg,
        HelixManager helixManager, boolean timedOut) {
      // The count is taken before the summary entry itself is inserted.
      Map<String, String> summaryEntry = new TreeMap<String, String>();
      summaryEntry.put("TotalMessages:", String.valueOf(results.size()));
      summaryEntry.put("Timeout", String.valueOf(timedOut));
      results.put("Summary", summaryEntry);

      HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
      Builder keys = dataAccessor.keyBuilder();
      String taskType = MessageType.SCHEDULER_MSG.name();

      ZNRecord statusRecord =
          dataAccessor.getProperty(keys.controllerTaskStatus(taskType, schedulerMsg.getMsgId()))
              .getRecord();
      statusRecord.getMapFields().putAll(results);

      dataAccessor.setProperty(keys.controllerTaskStatus(taskType, schedulerMsg.getMsgId()),
          new StatusUpdate(statusRecord));
    }
  }

  // Logger used by this factory and by both nested classes.
  private static Logger _logger = LoggerFactory.getLogger(DefaultSchedulerMessageHandlerFactory.class);
  // Manager passed on to every handler created by this factory.
  HelixManager _manager;

  /**
   * Creates a factory whose handlers operate through the given manager.
   *
   * @param manager the manager handed to each created handler
   */
  public DefaultSchedulerMessageHandlerFactory(HelixManager manager) {
    this._manager = manager;
  }

  /**
   * Creates a scheduler message handler for the given message.
   *
   * @param message the message to handle; its type must equal {@link #getMessageType()}
   * @param context the notification context passed on to the handler
   * @return a new handler bound to this factory's manager
   * @throws HelixException if the message has a different type
   */
  @Override
  public MessageHandler createHandler(Message message, NotificationContext context) {
    final String msgType = message.getMsgType();

    if (msgType.equals(getMessageType())) {
      return new DefaultSchedulerMessageHandler(message, context, _manager);
    }

    throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:"
        + message.getMsgType());
  }

  /** Returns the name of the SCHEDULER_MSG message type, the only type this factory accepts. */
  @Override
  public String getMessageType() {
    final MessageType handledType = MessageType.SCHEDULER_MSG;
    return handledType.name();
  }

  /** Returns an immutable single-element list holding the name of the SCHEDULER_MSG type. */
  @Override
  public List<String> getMessageTypes() {
    final String schedulerType = MessageType.SCHEDULER_MSG.name();
    return ImmutableList.of(schedulerType);
  }

  /** No-op: this implementation does nothing on reset. */
  @Override
  public void reset() {
  }

  public static class DefaultSchedulerMessageHandler extends MessageHandler {
    HelixManager _manager;

    /**
     * Creates a handler for one scheduler message.
     *
     * @param message the scheduler message to process
     * @param context the notification context forwarded to the base handler
     * @param manager the manager used for messaging and data access
     */
    public DefaultSchedulerMessageHandler(Message message, NotificationContext context,
        HelixManager manager) {
      super(message, context);
      this._manager = manager;
    }

    /**
     * Queues the messages generated for the given criteria as partitions of a task queue ideal
     * state instead of sending them directly, then records how many were queued in the scheduler
     * message's controller task status.
     *
     * Each generated message becomes one partition named {@code <queue>_<n>}: its list field holds
     * the target instance and its map field holds the message's simple fields.
     *
     * @param recipientCriteria criteria selecting the recipients; must not name another cluster
     * @param messageTemplate template from which the per-recipient messages are generated
     * @param controllerMsgId id of the scheduler message, stamped on every queued task
     * @throws HelixException if the criteria names a cluster other than the manager's, or if
     *         messages were generated but the scheduler message has no task queue name
     */
    void handleMessageUsingScheduledTaskQueue(Criteria recipientCriteria, Message messageTemplate,
        String controllerMsgId) {
      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
      Builder keyBuilder = accessor.keyBuilder();

      String targetCluster = recipientCriteria.getClusterName();
      boolean namesOtherCluster =
          targetCluster != null && !targetCluster.equals(_manager.getClusterName());
      if (namesOtherCluster) {
        throw new HelixException(String.format(
            "ScheduledTaskQueue cannot send message to another cluster. Local cluster name %s, remote cluster name %s.",
            _manager.getClusterName(), targetCluster));
      }

      Map<String, String> sentCount = new HashMap<String, String>();
      sentCount.put("MessageCount", "0");
      Map<InstanceType, List<Message>> generated =
          _manager.getMessagingService().generateMessage(recipientCriteria, messageTemplate);

      // Tasks are written into the ideal state of the task queue resource: the list field of a
      // partition is the destination node and the message parameters go into its map field.
      // Task throttling can then be done on the task queue resource.
      if (!generated.isEmpty()) {
        String queueName = _message.getRecord().getSimpleField(SCHEDULER_TASK_QUEUE);
        if (queueName == null) {
          throw new HelixException("SchedulerTaskMessage need to have " + SCHEDULER_TASK_QUEUE
              + " specified.");
        }

        IdealState queuedTasks = new IdealState(queueName);
        queuedTasks.setBucketSize(TASKQUEUE_BUCKET_NUM);
        queuedTasks.setStateModelDefRef(SCHEDULER_TASK_QUEUE);

        // Reading the current top partition id and appending the new partitions happen under one
        // lock on the manager.
        synchronized (_manager) {
          IdealState existingQueue =
              _manager.getHelixDataAccessor().getProperty(
                  accessor.keyBuilder().idealStates(queuedTasks.getId()));
          int nextPartitionNum =
              existingQueue == null ? 0 : findTopPartitionId(existingQueue) + 1;

          // Only the first entry of the generated map is used.
          List<Message> tasks = generated.values().iterator().next();
          ZNRecord queueRecord = queuedTasks.getRecord();
          for (Message task : tasks) {
            String partitionId = queueName + "_" + nextPartitionNum;
            nextPartitionNum++;

            String targetInstance = task.getTgtName();
            ZNRecord taskRecord = task.getRecord();
            queuedTasks.setPartitionState(partitionId, targetInstance, "COMPLETED");
            taskRecord.setSimpleField(targetInstance, "COMPLETED");
            taskRecord.setSimpleField(CONTROLLER_MSG_ID, controllerMsgId);

            List<String> preference = new LinkedList<String>();
            preference.add(targetInstance);
            queueRecord.setListField(partitionId, preference);
            queueRecord.setMapField(partitionId, taskRecord.getSimpleFields());

            _logger.info("Scheduling for controllerMsg " + controllerMsgId + " , sending task "
                + partitionId + " " + task.getMsgId() + " to " + targetInstance);
            if (_logger.isDebugEnabled()) {
              _logger.debug(taskRecord.getSimpleFields().toString());
            }
          }

          _manager.getHelixDataAccessor().updateProperty(
              accessor.keyBuilder().idealStates(queuedTasks.getId()), queuedTasks);
          sentCount.put("MessageCount", String.valueOf(tasks.size()));
        }
      }

      // Record the number of messages sent into scheduler message status updates
      String taskType = MessageType.SCHEDULER_MSG.name();
      ZNRecord statusRecord =
          accessor.getProperty(keyBuilder.controllerTaskStatus(taskType, _message.getMsgId()))
              .getRecord();
      statusRecord.getMapFields().put("SentMessageCount", sentCount);

      accessor.updateProperty(keyBuilder.controllerTaskStatus(taskType, _message.getMsgId()),
          new StatusUpdate(statusRecord));
    }

    /**
     * Finds the largest numeric suffix among the partition names of the given task queue. The
     * suffix is the text after the last '_' of a partition name; names whose suffix cannot be
     * parsed are logged and skipped.
     *
     * @param currentTaskQueue the ideal state whose partition names are scanned
     * @return the highest suffix found, or 0 if none is greater than 0
     */
    private int findTopPartitionId(IdealState currentTaskQueue) {
      int highest = 0;
      for (String name : currentTaskQueue.getPartitionSet()) {
        try {
          int suffixStart = name.lastIndexOf('_') + 1;
          int suffix = Integer.parseInt(name.substring(suffixStart));
          highest = Math.max(highest, suffix);
        } catch (Exception e) {
          _logger.error("", e);
        }
      }
      return highest;
    }

    /**
     * Processes the scheduler message.
     *
     * The message's "TIMEOUT" simple field, "MessageTemplate" map field and JSON "Criteria" simple
     * field are read first. If the criteria targets PARTICIPANT instances and the message names a
     * task queue, the work is queued through
     * {@code handleMessageUsingScheduledTaskQueue}. Otherwise the template is sent directly
     * (blocking when the WAIT_ALL field parses to true) with a {@link SchedulerAsyncCallback}, and
     * the number of messages sent is recorded in the message's controller task status.
     *
     * @return a successful result carrying the scheduler message id, or a failed result holding
     *         the exception when the criteria cannot be parsed
     * @throws HelixException if the message is not of type SCHEDULER_MSG
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public HelixTaskResult handleMessage() throws InterruptedException {
      String type = _message.getMsgType();
      HelixTaskResult result = new HelixTaskResult();
      if (!type.equals(MessageType.SCHEDULER_MSG.name())) {
        throw new HelixException("Unexpected msg type for message " + _message.getMsgId()
            + " type:" + _message.getMsgType());
      }

      // Parse timeout value; the default of -1 is kept when the field is absent or invalid.
      int timeOut = -1;
      if (_message.getRecord().getSimpleFields().containsKey("TIMEOUT")) {
        String rawTimeout = _message.getRecord().getSimpleFields().get("TIMEOUT");
        try {
          timeOut = Integer.parseInt(rawTimeout);
        } catch (NumberFormatException e) {
          // Still best-effort, but no longer silent: a bad value is reported instead of hidden.
          _logger.warn("Ignoring invalid TIMEOUT value \"" + rawTimeout + "\" in scheduler msg "
              + _message.getMsgId() + ", using " + timeOut, e);
        }
      }

      // Parse the message template
      ZNRecord record = new ZNRecord("templateMessage");
      record.getSimpleFields().putAll(_message.getRecord().getMapField("MessageTemplate"));
      Message messageTemplate = new Message(record);

      // Parse the criteria
      StringReader sr = new StringReader(_message.getRecord().getSimpleField("Criteria"));
      ObjectMapper mapper = new ObjectMapper();
      Criteria recipientCriteria;
      try {
        recipientCriteria = mapper.readValue(sr, Criteria.class);
      } catch (Exception e) {
        _logger.error("", e);
        result.setException(e);
        result.setSuccess(false);
        return result;
      }
      _logger.info("Scheduler sending message, criteria:" + recipientCriteria);

      // Boolean.parseBoolean never throws and maps null to false, so a missing or malformed
      // WAIT_ALL field simply means "do not wait".
      boolean waitAll =
          Boolean.parseBoolean(_message.getRecord().getSimpleField(
              DefaultSchedulerMessageHandlerFactory.WAIT_ALL));

      boolean hasSchedulerTaskQueue =
          _message.getRecord().getSimpleFields().containsKey(SCHEDULER_TASK_QUEUE);
      // If the target is PARTICIPANT, use the ScheduledTaskQueue
      if (InstanceType.PARTICIPANT == recipientCriteria.getRecipientInstanceType()
          && hasSchedulerTaskQueue) {
        handleMessageUsingScheduledTaskQueue(recipientCriteria, messageTemplate,
            _message.getMsgId());
        result.setSuccess(true);
        result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
        result.getTaskResultMap().put("ControllerResult",
            "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
        return result;
      }

      _logger.info("Scheduler sending message to Controller");
      int nMsgsSent = 0;
      SchedulerAsyncCallback callback = new SchedulerAsyncCallback(_message, _manager);
      if (waitAll) {
        nMsgsSent =
            _manager.getMessagingService().sendAndWait(recipientCriteria, messageTemplate,
                callback, timeOut);
      } else {
        nMsgsSent =
            _manager.getMessagingService().send(recipientCriteria, messageTemplate, callback,
                timeOut);
      }
      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
      Builder keyBuilder = accessor.keyBuilder();

      // Record the number of messages sent into status updates
      Map<String, String> sendSummary = new HashMap<String, String>();
      sendSummary.put("MessageCount", "" + nMsgsSent);

      ZNRecord statusUpdate =
          accessor.getProperty(
              keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
                  _message.getMsgId())).getRecord();

      statusUpdate.getMapFields().put("SentMessageCount", sendSummary);

      // The callback created above writes reply results into this same status node and may do
      // so between the read above and this write. updateProperty (also used by the task-queue
      // path) is used rather than setProperty so the copy read above does not replace the node
      // and drop those results.
      accessor.updateProperty(keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.name(),
          _message.getMsgId()), new StatusUpdate(statusUpdate));

      result.getTaskResultMap().put("ControllerResult",
          "msg " + _message.getMsgId() + " from " + _message.getMsgSrc() + " processed");
      result.getTaskResultMap().put(SCHEDULER_MSG_ID, _message.getMsgId());
      result.setSuccess(true);
      return result;
    }

    /**
     * Logs the exception raised while handling the message, together with the message id.
     *
     * @param e the exception that occurred
     * @param code the error code (not used here)
     * @param type the error type (not used here)
     */
    @Override
    public void onError(Exception e, ErrorCode code, ErrorType type) {
      String failureNotice =
          "Message handling pipeline get an exception. MsgId:" + _message.getMsgId();
      _logger.error(failureNotice, e);
    }
  }
}
