/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.twill.internal;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.MessageCallback;
import org.apache.twill.internal.state.MessageCodec;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A base implementation of {@link Service} that uses ZooKeeper to transmit states and messages. It uses
 * the following directory structure in ZK:
 *
 * <pre>
 * /instances
 *     |- [runId_1]
 *     |- [runId_2]
 *     |- ...
 * /[runId_1]
 *     |- messages
 *          |- [messageId_1]
 *          |- [messageId_2]
 *          |- ....
 * /[runId_2]
 *     |- messages
 * </pre>
 *
 * It assumes that the zk root node is already namespaced
 * (either with applicationId for AM or runnableId for containers).
 * <p/>
 * The ephemeral nodes under {@code /instances} are the {@code liveNode} for each running instance. It can carries data
 * about that service, which is set by the corresponding implementation.
 * <p/>
 * Each running instance also has its own node named by the runId. Under that node, it has a {@code messages} node for
 * receiving messages from the controller. New message is created by creating a sequence node under the {@code messages}
 * node, with the node data carrying the message content. The message node will be removed once the message
 * is being processed by the service.
 */
public abstract class AbstractTwillService extends AbstractExecutionThreadService implements MessageCallback {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
  private static final Gson GSON = new GsonBuilder().serializeNulls().create();

  protected final ZKClient zkClient;
  protected final RunId runId;
  private ExecutorService messageCallbackExecutor;
  private Cancellable watcherCancellable;

  protected AbstractTwillService(final ZKClient zkClient, RunId runId) {
    this.zkClient = zkClient;
    this.runId = runId;
  }

  /**
   * Override to perform any work during service start.
   */
  protected void doStart() throws Exception {
    // Default no-op
  }

  /**
   * Override to execution service work. When this method returns, this Service will stop.
   */
  protected void doRun() throws Exception {
    // Default no-op
  }

  /**
   * Overrides to perform any work during service shutdown.
   */
  protected void doStop() throws Exception {
    // Default no-op
  }

  /**
   * Returns an Object to be stored in the live node. The object return will be GSon serialized. If {@code null}
   * is returned, no data will be stored to the live node.
   */
  protected Object getLiveNodeData() {
    return null;
  }

  /**
   * Returns a {@link Gson} instance for serializing object returned by the {@link #getLiveNodeData()} method.
   */
  protected Gson getLiveNodeGson() {
    return GSON;
  }

  /**
   * Handles message by simply logging it. Child class should override this method for custom handling of message.
   *
   * @see org.apache.twill.internal.state.MessageCallback
   */
  @Override
  public ListenableFuture<String> onReceived(String messageId, Message message) {
    LOG.info("Message received: {}", message);
    return Futures.immediateCheckedFuture(messageId);
  }

  @Override
  protected final void startUp() throws Exception {
    // Single thread executor that will discard task silently if it is already terminated, which only
    // happens when this service is shutting down.
    messageCallbackExecutor = new ThreadPoolExecutor(1, 1,
                                                     0L, TimeUnit.MILLISECONDS,
                                                     new LinkedBlockingQueue<Runnable>(),
                                                     Threads.createDaemonThreadFactory("message-callback"),
                                                     new ThreadPoolExecutor.DiscardPolicy());

    // Watch for session expiration, recreate the live node if reconnected after expiration.
    watcherCancellable = zkClient.addConnectionWatcher(new Watcher() {
      private boolean expired = false;

      @Override
      public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.Expired) {
          LOG.warn("ZK Session expired for service {} with runId {}.", getServiceName(), runId.getId());
          expired = true;
        } else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
          LOG.info("Reconnected after expiration for service {} with runId {}", getServiceName(), runId.getId());
          expired = false;
          logIfFailed(createLiveNode());
        }
      }
    });

    // Create the live node, if succeeded, start the service, otherwise fail out.
    createLiveNode().get();

    // Create node for messaging
    ZKOperations.ignoreError(zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT),
                             KeeperException.NodeExistsException.class, null).get();

    doStart();

    // Starts watching for messages
    watchMessages();
  }

  @Override
  protected final void run() throws Exception {
    doRun();
  }

  @Override
  protected final void shutDown() throws Exception {
    if (watcherCancellable != null) {
      watcherCancellable.cancel();
    }

    messageCallbackExecutor.shutdownNow();
    try {
      doStop();
    } finally {
      // Given at most 5 seconds to cleanup ZK nodes
      removeLiveNode().get(5, TimeUnit.SECONDS);
      LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId());
    }
  }

  /**
   * Update the live node for the service.
   *
   * @return A {@link OperationFuture} that will be completed when the update is done.
   */
  protected final OperationFuture<?> updateLiveNode() {
    String liveNodePath = getLiveNodePath();
    LOG.info("Update live node {}{}", zkClient.getConnectString(), liveNodePath);
    return zkClient.setData(liveNodePath, serializeLiveNode());
  }

  /**
   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
   *
   * @return A {@link OperationFuture} that will be completed when the creation is done.
   */
  private OperationFuture<String> createLiveNode() {
    final String liveNodePath = getLiveNodePath();
    LOG.info("Creating live node {}{}", zkClient.getConnectString(), liveNodePath);
    return ZKOperations.createDeleteIfExists(zkClient, liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL, true);
  }

  private OperationFuture<String> removeLiveNode() {
    String liveNode = getLiveNodePath();
    LOG.info("Remove live node {}{}", zkClient.getConnectString(), liveNode);
    return ZKOperations.ignoreError(zkClient.delete(liveNode), KeeperException.NoNodeException.class, liveNode);
  }

  /**
   * Watches for messages that are sent through ZK messages node.
   */
  private void watchMessages() {
    final String messagesPath = getZKPath("messages");
    Futures.addCallback(zkClient.getChildren(messagesPath, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged && isRunning()) {
          watchMessages();
        }
      }
    }), new FutureCallback<NodeChildren>() {
      @Override
      public void onSuccess(NodeChildren result) {
        // Sort by the name, which is the messageId. Assumption is that message ids is ordered by time.
        List<String> messages = Lists.newArrayList(result.getChildren());
        Collections.sort(messages);
        for (String messageId : messages) {
          processMessage(messagesPath + "/" + messageId, messageId);
        }
      }

      @Override
      public void onFailure(Throwable t) {
        // TODO: what could be done besides just logging?
        LOG.error("Failed to watch messages.", t);
      }
    }, Threads.SAME_THREAD_EXECUTOR);
  }

  private void processMessage(final String path, final String messageId) {
    Futures.addCallback(zkClient.getData(path), new FutureCallback<NodeData>() {
      @Override
      public void onSuccess(NodeData result) {
        Runnable messageRemover = createMessageRemover(path, result.getStat().getVersion());

        Message message = MessageCodec.decode(result.getData());
        if (message == null) {
          LOG.error("Failed to decode message for {} in {}", messageId, path);
          messageRemover.run();
          return;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Message received from {}: {}", path, new String(MessageCodec.encode(message), Charsets.UTF_8));
        }

        // Handle the stop message
        if (handleStopMessage(message, messageRemover)) {
          return;
        }
        // Otherwise, delegate to the child class to handle the message
        handleMessage(messageId, message, messageRemover);
      }

      @Override
      public void onFailure(Throwable t) {
        LOG.error("Failed to fetch message content from {}", path, t);
      }
    }, messageCallbackExecutor);
  }

  /**
   * Handles {@link SystemMessages#STOP_COMMAND} if the given message is a stop command. After this service is stopped,
   * the message node will be removed.
   *
   * @param message Message to process
   * @param messageRemover Runnable to remove the message node when this service is stopped
   * @return {@code true} if the given message is a stop command, {@code false} otherwise
   */
  private boolean handleStopMessage(Message message, final Runnable messageRemover) {
    if (message.getType() != Message.Type.SYSTEM || !SystemMessages.STOP_COMMAND.equals(message.getCommand())) {
      return false;
    }

    // Stop this service.
    Futures.addCallback(stop(), new FutureCallback<State>() {
      @Override
      public void onSuccess(State result) {
        messageRemover.run();
      }

      @Override
      public void onFailure(Throwable t) {
        LOG.error("Stop service failed upon STOP command", t);
        messageRemover.run();
      }
    }, Threads.SAME_THREAD_EXECUTOR);
    return true;
  }


  /**
   * Handles the given message by calling {@link #onReceived(java.lang.String, org.apache.twill.internal.state.Message)}
   * method.
   *
   * @param messageId Id of the message
   * @param message The message
   * @param messageRemover Runnable to remove the message node when the handling of the message is completed
   */
  private void handleMessage(String messageId, final Message message, final Runnable messageRemover) {
    Futures.addCallback(onReceived(messageId, message), new FutureCallback<String>() {
      @Override
      public void onSuccess(String result) {
        messageRemover.run();
      }

      @Override
      public void onFailure(Throwable t) {
        LOG.error("Failed to handle message {}", message, t);
        messageRemover.run();
      }
    }, Threads.SAME_THREAD_EXECUTOR);
  }

  /**
   * Creates a {@link Runnable} that encapsulation the action to remove a particular message node.
   */
  private Runnable createMessageRemover(final String path, final int version) {
    return new Runnable() {
      @Override
      public void run() {
        logIfFailed(zkClient.delete(path, version));
      }
    };
  }

  /**
   * Logs if the given future failed.
   */
  private <T> void logIfFailed(ListenableFuture<T> future) {
    Futures.addCallback(future, new FutureCallback<T>() {
      @Override
      public void onSuccess(T result) {
        // All-good
      }

      @Override
      public void onFailure(Throwable t) {
        LOG.error("Operation failed for service {} with runId {}", getServiceName(), runId, t);
      }
    }, Threads.SAME_THREAD_EXECUTOR);
  }

  private String getZKPath(String path) {
    return String.format("/%s/%s", runId.getId(), path);
  }

  private String getLiveNodePath() {
    return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId());
  }

  private byte[] serializeLiveNode() {
    JsonObject content = new JsonObject();
    Object liveNodeData = getLiveNodeData();
    if (liveNodeData != null) {
      content.add("data", getLiveNodeGson().toJsonTree(liveNodeData));
    }
    return GSON.toJson(content).getBytes(Charsets.UTF_8);
  }
}
