/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.heron.statemgr.zookeeper.curator;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Message;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.common.basics.Pair;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.proto.system.ExecutionEnvironment;
import org.apache.heron.proto.system.PackingPlans;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.proto.tmaster.TopologyMaster;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.statemgr.Lock;
import org.apache.heron.spi.statemgr.WatchCallback;
import org.apache.heron.spi.utils.NetworkUtils;
import org.apache.heron.statemgr.FileSystemStateManager;
import org.apache.heron.statemgr.zookeeper.ZkContext;
import org.apache.heron.statemgr.zookeeper.ZkUtils;
import org.apache.heron.statemgr.zookeeper.ZkWatcherCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;

/**
 * State manager that persists topology state in ZooKeeper through an Apache Curator client.
 *
 * <p>If the configuration requires a tunnel to reach the state manager host, the tunnel is set
 * up in {@link #initialize(Config)} and its processes are destroyed in {@link #close()}.
 */
public class CuratorStateManager extends FileSystemStateManager {
  private static final Logger LOG = Logger.getLogger(CuratorStateManager.class.getName());
  private static final int TUNNEL_SETUP_RETRY = 0;  // 0 means no retry
  private static final int TUNNEL_SETUP_RETRY_SLEEP_SEC = 5;

  private CuratorFramework client;
  private String connectionString;
  private boolean isSchedulerService;
  private List<Process> tunnelProcesses;
  private Config config;

  /**
   * Initializes the state manager.
   *
   * <p>Sets up a tunnel when required, starts the Curator client and blocks until it is connected.
   * It then creates the state directory tree if {@code ZkContext.isInitializeTree} is enabled.
   *
   * @param newConfig configuration holding the connection string and ZooKeeper settings
   * @throws IllegalArgumentException if a required tunnel could not be set up
   * @throws RuntimeException if the client does not connect within the configured connection
   *     timeout, or if the wait for the connection is interrupted
   */
  @Override
  public void initialize(Config newConfig) {
    super.initialize(newConfig);

    this.config = newConfig;
    this.connectionString = Context.stateManagerConnectionString(newConfig);
    this.isSchedulerService = Context.schedulerService(newConfig);
    this.tunnelProcesses = new ArrayList<>();

    NetworkUtils.TunnelConfig tunnelConfig =
        NetworkUtils.TunnelConfig.build(config, NetworkUtils.HeronSystem.STATE_MANAGER);

    if (tunnelConfig.isTunnelNeeded()) {
      for (int setupCount = 0;; ++setupCount) {
        Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
        String newConnectionString = tunneledResults.first;

        // An empty connection string means the tunnel could not be set up: retry or bail.
        if (!newConnectionString.isEmpty()) {
          // Success, use the tunneled connection string from now on
          connectionString = newConnectionString;
          tunnelProcesses.addAll(tunneledResults.second);
          break;
        } else {
          if (setupCount < TUNNEL_SETUP_RETRY) {
            try {
              TimeUnit.SECONDS.sleep(TUNNEL_SETUP_RETRY_SLEEP_SEC);
            } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
            }
          } else {
            throw new IllegalArgumentException("Failed to connect to tunnel host '"
                + tunnelConfig.getTunnelHost() + "'");
          }
        }
      }
    }

    // Start it
    client = getCuratorClient();
    LOG.info("Starting Curator client connecting to: " + connectionString);
    client.start();

    try {
      if (!client.blockUntilConnected(ZkContext.connectionTimeoutMs(newConfig),
          TimeUnit.MILLISECONDS)) {
        throw new RuntimeException("Failed to connect to " + connectionString);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted from blockUntilConnected(): " + connectionString, e);
    }

    if (ZkContext.isInitializeTree(newConfig)) {
      initTree();
    }
  }

  /**
   * Lock backed by {@code InterProcessSemaphoreMutex}. Guaranteed to atomically get a
   * distributed ephemeral lock backed by zookeeper. The lock should be explicitly released to
   * avoid unnecessary waiting by other threads waiting on it.
   *
   * <p>Declared static because it only needs the client passed to its constructor, not the
   * enclosing state manager instance.
   */
  private static final class DistributedLock implements Lock {
    private final String path;
    private final InterProcessSemaphoreMutex lock;

    private DistributedLock(CuratorFramework client, String path) {
      this.path = path;
      this.lock = new InterProcessSemaphoreMutex(client, path);
    }

    /**
     * Attempts to acquire the lock, waiting at most the given time.
     *
     * @return true if the lock was acquired, false if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
      try {
        return this.lock.acquire(timeout, unit);
      } catch (InterruptedException e) {
        throw e;
        // SUPPRESS CHECKSTYLE IllegalCatch
      } catch (Exception e) {
        throw new RuntimeException("Error while trying to acquire distributed lock at " + path, e);
      }
    }

    /** Releases the lock; any failure from Curator is rethrown as a RuntimeException. */
    @Override
    public void unlock() {
      try {
        this.lock.release();
        // SUPPRESS CHECKSTYLE IllegalCatch
      } catch (Exception e) {
        throw new RuntimeException("Error while trying to release distributed lock at " + path, e);
      }
    }
  }

  /**
   * Builds (but does not start) the Curator client for the current connection string.
   *
   * <p>Retries use {@link ExponentialBackoffRetry}, with the base sleep time and maximum retry
   * count taken from {@code ZkContext.retryIntervalMs} and {@code ZkContext.retryCount}. The
   * connection and session timeouts also come from the configuration.
   */
  protected CuratorFramework getCuratorClient() {
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
        ZkContext.retryIntervalMs(config), ZkContext.retryCount(config));

    // CuratorFrameworkFactory.builder() gives fine-grained control over creation options;
    // see the CuratorFrameworkFactory.Builder javadoc for the full list.
    return CuratorFrameworkFactory.builder()
        .connectString(connectionString)
        .retryPolicy(retryPolicy)
        .connectionTimeoutMs(ZkContext.connectionTimeoutMs(config))
        .sessionTimeoutMs(ZkContext.sessionTimeoutMs(config))
        .build();
  }

  /**
   * Sets up the ZooKeeper tunnel. Protected so tests can override it.
   *
   * @return the tunneled connection string (empty on failure) and the tunnel processes started
   */
  protected Pair<String, List<Process>> setupZkTunnel(NetworkUtils.TunnelConfig tunnelConfig) {
    return ZkUtils.setupZkTunnel(config, tunnelConfig);
  }

  /**
   * Creates the container znode for every {@link StateLocation} directory.
   *
   * @throws RuntimeException if any directory cannot be created
   */
  protected void initTree() {
    // Make necessary directories
    for (StateLocation location : StateLocation.values()) {
      LOG.fine(String.format("%s directory: %s", location.getName(), getStateDirectory(location)));
    }

    try {
      for (StateLocation location : StateLocation.values()) {
        client.createContainers(getStateDirectory(location));
      }

      // Suppress it since createContainers() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      throw new RuntimeException("Failed to initialize tree", e);
    }

    LOG.info("Directory tree initialized.");
  }

  /** Closes the Curator client (if started) and destroys any tunnel processes. */
  @Override
  public void close() {
    if (client != null) {
      LOG.info("Closing the CuratorClient to: " + connectionString);
      client.close();
    }

    // Close the tunneling
    LOG.info("Closing the tunnel processes");
    if (tunnelProcesses != null) {
      for (Process process : tunnelProcesses) {
        process.destroy();
      }
    }
  }

  /** Returns the connection string in use; this is the tunneled one if a tunnel was set up. */
  public String getConnectionString() {
    return connectionString;
  }

  // Make utils class protected for easy unit testing

  /**
   * Synchronously checks whether a znode exists at {@code path}.
   *
   * @return a future already completed with the result, or failed if the check threw
   */
  @Override
  protected ListenableFuture<Boolean> nodeExists(String path) {
    final SettableFuture<Boolean> result = SettableFuture.create();

    try {
      LOG.info("Checking existence of path: " + path);
      safeSetFuture(result, client.checkExists().forPath(path) != null);

      // Suppress it since forPath() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      safeSetException(result, new RuntimeException("Could not check Exist", e));
    }

    return result;
  }

  /** Creates the znode for the given state location and topology; see the path overload. */
  protected ListenableFuture<Boolean> createNode(
      StateLocation location, String topologyName,
      byte[] data,
      boolean isEphemeral) {
    return createNode(getStatePath(location, topologyName), data, isEphemeral);
  }

  /**
   * Synchronously creates a znode at {@code path} holding {@code data}.
   *
   * @param isEphemeral true for an EPHEMERAL node, false for a PERSISTENT one
   * @return a future already completed with true, or failed if creation threw
   */
  @VisibleForTesting
  protected ListenableFuture<Boolean> createNode(
      String path,
      byte[] data,
      boolean isEphemeral) {
    final SettableFuture<Boolean> result = SettableFuture.create();

    try {
      client.create().
          withMode(isEphemeral ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT)
          .forPath(path, data);
      LOG.info("Created node for path: " + path);
      safeSetFuture(result, true);

      // Suppress it since forPath() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      safeSetException(result, new RuntimeException("Could not createNode:", e));
    }
    return result;
  }

  /**
   * Synchronously deletes the znode at {@code path}, regardless of its version.
   *
   * <p>A missing node ({@code NONODE}) is treated as a successful delete.
   *
   * @param deleteChildrenIfNecessary also delete child znodes when true
   * @return a future already completed with true, or failed on any other error
   */
  @Override
  protected ListenableFuture<Boolean> deleteNode(String path, boolean deleteChildrenIfNecessary) {
    final SettableFuture<Boolean> result = SettableFuture.create();

    try {
      DeleteBuilder deleteBuilder = client.delete();
      if (deleteChildrenIfNecessary) {
        deleteBuilder = (DeleteBuilder) deleteBuilder.deletingChildrenIfNeeded();
      }
      // -1 matches any version of the node
      deleteBuilder.withVersion(-1).forPath(path);
      LOG.info("Deleted node for path: " + path);
      safeSetFuture(result, true);

    } catch (KeeperException e) {
      if (KeeperException.Code.NONODE.equals(e.code())) {
        safeSetFuture(result, true);
      } else {
        safeSetException(result, new RuntimeException("Could not deleteNode", e));
      }

      // Suppress it since forPath() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      safeSetException(result, new RuntimeException("Could not deleteNode", e));
    }

    return result;
  }

  /**
   * Asynchronously reads the znode at {@code path} and parses it with {@code builder}.
   *
   * <p>The {@code watcher} is registered on the read. The returned future is completed from a
   * Curator background callback. It fails if no data was returned or if the data cannot be
   * parsed, so it never stays pending because of a parse error.
   */
  @Override
  protected <M extends Message> ListenableFuture<M> getNodeData(
      WatchCallback watcher,
      final String path,
      final Message.Builder builder) {
    final SettableFuture<M> future = SettableFuture.create();

    Watcher wc = ZkWatcherCallback.makeZkWatcher(watcher);

    BackgroundCallback cb = new BackgroundCallback() {
      @Override
      @SuppressWarnings("unchecked") // we don't know what M is until runtime
      public void processResult(CuratorFramework aClient, CuratorEvent event) throws Exception {
        // Guard against a null event before dereferencing it.
        byte[] data = event == null ? null : event.getData();
        if (data == null) {
          safeSetException(future, new RuntimeException("Failed to fetch data from path: "
              + path));
          return;
        }

        try {
          builder.mergeFrom(data);
          safeSetFuture(future, (M) builder.build());
          // A parse/build failure must fail the future; otherwise Curator only logs the
          // exception and callers waiting on the future would block forever.
          // SUPPRESS CHECKSTYLE IllegalCatch
        } catch (Exception e) {
          safeSetException(future, new RuntimeException(
              "Failed to parse data from path: " + path, e));
        }
      }
    };

    try {
      client.getData().usingWatcher(wc).inBackground(cb).forPath(path);

      // Suppress it since forPath() throws Exception
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      safeSetException(future, new RuntimeException(
          "Could not getNodeData using watcher for path: " + path, e));
    }

    return future;
  }

  /** Returns a ZooKeeper-backed distributed lock rooted at {@code path}. */
  @Override
  protected Lock getLock(String path) {
    return new DistributedLock(this.client, path);
  }

  /** Stores the TMaster location in an ephemeral znode. */
  @Override
  public ListenableFuture<Boolean> setTMasterLocation(
      TopologyMaster.TMasterLocation location,
      String topologyName) {
    return createNode(StateLocation.TMASTER_LOCATION, topologyName, location.toByteArray(), true);
  }

  /**
   * Stores the MetricsCache location in an ephemeral znode.
   *
   * <p>Also registers a connection state listener that throws on any state other than
   * CONNECTED (fail fast). Each call registers another listener.
   */
  @Override
  public ListenableFuture<Boolean> setMetricsCacheLocation(
      TopologyMaster.MetricsCacheLocation location,
      String topologyName) {
    client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
      @Override
      public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        if (arg1 != ConnectionState.CONNECTED) {
          // if not the first time successful connection, fail fast
          throw new RuntimeException("Unexpected state change to: " + arg1.name());
        }
      }
    });
    return createNode(
        StateLocation.METRICSCACHE_LOCATION, topologyName, location.toByteArray(), true);
  }

  /** Stores the execution state in a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setExecutionState(
      ExecutionEnvironment.ExecutionState executionState,
      String topologyName) {
    return createNode(
        StateLocation.EXECUTION_STATE, topologyName, executionState.toByteArray(), false);
  }

  /** Stores the topology definition in a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setTopology(
      TopologyAPI.Topology topology,
      String topologyName) {
    return createNode(StateLocation.TOPOLOGY, topologyName, topology.toByteArray(), false);
  }

  /** Stores the physical plan in a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setPhysicalPlan(
      PhysicalPlans.PhysicalPlan physicalPlan,
      String topologyName) {
    return createNode(StateLocation.PHYSICAL_PLAN, topologyName, physicalPlan.toByteArray(), false);
  }

  /** Stores the packing plan in a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setPackingPlan(
      PackingPlans.PackingPlan packingPlan,
      String topologyName) {
    return createNode(StateLocation.PACKING_PLAN, topologyName, packingPlan.toByteArray(), false);
  }

  /** Stores the stateful checkpoint info in a persistent znode. */
  @Override
  public ListenableFuture<Boolean> setStatefulCheckpoints(
      CheckpointManager.StatefulConsistentCheckpoints checkpoint,
      String topologyName) {
    return createNode(StateLocation.STATEFUL_CHECKPOINT, topologyName,
        checkpoint.toByteArray(), false);
  }

  /** Stores the scheduler location; the znode is ephemeral only when the scheduler is a service. */
  @Override
  public ListenableFuture<Boolean> setSchedulerLocation(
      Scheduler.SchedulerLocation location,
      String topologyName) {
    // if isService, set the node as ephemeral node; set as persistent node otherwise
    return createNode(StateLocation.SCHEDULER_LOCATION, topologyName,
        location.toByteArray(),
        isSchedulerService);
  }

  /** No-op that reports success: the TMaster location znode is ephemeral. */
  @Override
  public ListenableFuture<Boolean> deleteTMasterLocation(String topologyName) {
    // It is an EPHEMERAL node and would be removed automatically
    final SettableFuture<Boolean> result = SettableFuture.create();
    safeSetFuture(result, true);
    return result;
  }

  /** No-op that reports success: the MetricsCache location znode is ephemeral. */
  @Override
  public ListenableFuture<Boolean> deleteMetricsCacheLocation(String topologyName) {
    // It is an EPHEMERAL node and would be removed automatically
    final SettableFuture<Boolean> result = SettableFuture.create();
    safeSetFuture(result, true);
    return result;
  }

  /**
   * Deletes the scheduler location znode.
   *
   * <p>When the scheduler is a service, the node is ephemeral, so success is reported without
   * deleting anything.
   */
  @Override
  public ListenableFuture<Boolean> deleteSchedulerLocation(String topologyName) {
    // if scheduler is service, the znode is ephemeral and it's deleted automatically
    if (isSchedulerService) {
      final SettableFuture<Boolean> result = SettableFuture.create();
      safeSetFuture(result, true);
      return result;
    } else {
      return deleteNode(getStatePath(StateLocation.SCHEDULER_LOCATION, topologyName), false);
    }
  }

  /**
   * Command-line entry point.
   *
   * @param args {@code <topology_name> <zookeeper_hostname>}
   */
  public static void main(String[] args) throws ExecutionException, InterruptedException,
      IllegalAccessException, ClassNotFoundException, InstantiationException {
    if (args.length < 2) {
      throw new RuntimeException("Expects 2 arguments: <topology_name> <zookeeper_hostname>");
    }

    String zookeeperHostname = args[1];
    Config config = Config.newBuilder()
        .put(Key.STATEMGR_ROOT_PATH, "/storm/heron/states")
        .put(Key.STATEMGR_CONNECTION_STRING, zookeeperHostname)
        .put(Key.SCHEDULER_IS_SERVICE, false)
        .build();
    CuratorStateManager stateManager = new CuratorStateManager();
    stateManager.doMain(args, config);
  }
}
