package org.apache.helix.metaclient.impl.zk;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.AsyncCallback;
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.DirectChildChangeListener;
import org.apache.helix.metaclient.api.DirectChildSubscribeResult;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.StateChangeListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientCreateCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientDeleteCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientExistCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientGetCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientSetCallbackHandler;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;


public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
  private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
  // Underlying Helix ZkClient that the operations of this class delegate to.
  private final ZkClient _zkClient;
  // Timeout (ms) handed to ZkClient when establishing the initial connection.
  private final long _initConnectionTimeout;
  // Delay (ms) after a disconnect before the reconnect monitor checks the connection state.
  private final long _reconnectTimeout;

  // After ZkClient gets disconnected from the ZK server, it keeps retrying the connection until it
  // is re-established or ZkClient is closed. We need a separate thread to monitor ZkClient
  // reconnect and close ZkClient if it is not able to reconnect within the user specified timeout.
  private final ScheduledExecutorService _zkClientReconnectMonitor;
  // Pending reconnect check, if any. Read and written while holding _zkClientConnectionMutex.
  private ScheduledFuture<?> _reconnectMonitorFuture;
  // Listener registered on ZkClient in connect() to drive the reconnect monitor.
  private final ReconnectStateChangeListener _reconnectStateChangeListener;
  // Lock all activities related to ZkClient connection. Final: the lock object must never be
  // swapped out while another thread may be holding it.
  private final ReentrantLock _zkClientConnectionMutex = new ReentrantLock();


  public ZkMetaClient(ZkMetaClientConfig config) {
    _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
    _reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
    // TODO: Right new ZkClient reconnect using exp backoff with fixed max backoff interval. We should
    // Allow user to config reconnect policy
    _zkClient = new ZkClient(
        new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
        (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/,
        config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(),
        config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false, true);
    _zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
    _reconnectStateChangeListener = new ReconnectStateChangeListener();
  }

  /**
   * Creates an entry at {@code key} holding {@code data}, using {@code EntryMode.PERSISTENT}.
   *
   * @param key path of the entry to create
   * @param data payload to store
   * @throws MetaClientException if the creation fails
   */
  @Override
  public void create(String key, Object data) {
    try {
      create(key, data, EntryMode.PERSISTENT);
    } catch (MetaClientException e) {
      // The delegate already reports failures as MetaClientException; rethrow it unchanged
      // instead of nesting it inside a second MetaClientException.
      throw e;
    } catch (Exception e) {
      throw new MetaClientException(e);
    }
  }

  /**
   * Creates an entry at {@code key} holding {@code data} with the requested entry mode.
   *
   * @param key path of the entry to create
   * @param data payload to store
   * @param mode MetaClient entry mode, converted to the ZooKeeper create mode
   * @throws MetaClientException if the mode cannot be converted or ZkClient reports a failure
   */
  @Override
  public void create(String key, Object data, MetaClientInterface.EntryMode mode) {
    try {
      _zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode));
    } catch (ZkException e) {
      // Translate like the other CRUD methods in this class so callers get the specific
      // MetaClient exception rather than a generic wrapper.
      throw translateZkExceptionToMetaclientException(e);
    } catch (KeeperException e) {
      throw new MetaClientException(e);
    }
  }

  /**
   * Creates a persistent entry with a time-to-live via ZkClient.
   *
   * @param key path of the entry to create
   * @param data payload to store
   * @param ttl time-to-live handed to ZkClient unchanged (unit presumably milliseconds - confirm)
   * @throws MetaClientException translated from any ZkException raised by ZkClient
   */
  @Override
  public void createWithTTL(String key, T data, long ttl) {
    try {
      _zkClient.createPersistentWithTTL(key, data, ttl);
    } catch (ZkException zkError) {
      final MetaClientException translated = translateZkExceptionToMetaclientException(zkError);
      throw translated;
    }
  }

  /**
   * Renews a TTL node by writing its current data back with the node's current version.
   *
   * @param key path of the TTL node to renew
   * @throws MetaClientNoNodeException if no data can be read for {@code key}, or the node has
   *         disappeared by the time its version is looked up
   */
  @Override
  public void renewTTLNode(String key) {
    T oldData = get(key);
    if (oldData == null) {
      throw new MetaClientNoNodeException("Node at " + key + " does not exist.");
    }
    // getStat returns null for a missing node (see exists()); the node may have been removed
    // between the read above and this lookup, so guard against a NullPointerException.
    org.apache.zookeeper.data.Stat stat = _zkClient.getStat(key);
    if (stat == null) {
      throw new MetaClientNoNodeException("Node at " + key + " does not exist.");
    }
    set(key, oldData, stat.getVersion());
  }

  /**
   * Writes {@code data} to the entry at {@code key} through ZkClient.
   *
   * @param key path of the entry to overwrite
   * @param data new payload
   * @param version version passed to ZkClient with the write (presumably the expected
   *        version of the entry - confirm against ZkClient)
   * @throws MetaClientException translated from any ZkException raised by ZkClient
   */
  @Override
  public void set(String key, T data, int version) {
    try {
      _zkClient.writeData(key, data, version);
    } catch (ZkException zkError) {
      final MetaClientException translated = translateZkExceptionToMetaclientException(zkError);
      throw translated;
    }
  }

  /**
   * Reads the entry at {@code key}, applies {@code updater} to it and writes the result back
   * using the version observed by the read.
   *
   * @param key path of the entry to update
   * @param updater function producing the new data from the current data
   * @return the data that was written
   * @throws MetaClientException translated from any ZkException raised while reading or writing
   */
  @Override
  public T update(String key, DataUpdater<T> updater) {
    // TODO: add retry logic for ZkBadVersionException.
    final org.apache.zookeeper.data.Stat readStat = new org.apache.zookeeper.data.Stat();
    try {
      // readData fills readStat, giving us the version to write against.
      final T current = _zkClient.readData(key, readStat);
      final T updated = updater.update(current);
      set(key, updated, readStat.getVersion());
      return updated;
    } catch (ZkException zkError) {
      throw translateZkExceptionToMetaclientException(zkError);
    }
  }

  //TODO: Get Expiry Time in Stat
  /**
   * Looks up the ZooKeeper stat for {@code key} and converts it to a MetaClient {@code Stat}.
   *
   * @param key path of the entry to check
   * @return the converted stat, or {@code null} when ZkClient reports no stat for the key
   * @throws MetaClientException translated from any ZkException raised by ZkClient
   */
  @Override
  public Stat exists(String key) {
    try {
      final org.apache.zookeeper.data.Stat zkStat = _zkClient.getStat(key);
      // The last constructor argument is a fixed -1 (see the expiry-time TODO above).
      return zkStat == null
          ? null
          : new Stat(convertZkEntryModeToMetaClientEntryMode(zkStat.getEphemeralOwner()),
              zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), -1);
    } catch (ZkException zkError) {
      throw translateZkExceptionToMetaclientException(zkError);
    }
  }

  /**
   * Reads the data stored at {@code key}.
   *
   * @param key path of the entry to read
   * @return the stored data; may be {@code null} (renewTTLNode treats a null result as a
   *         missing node, so the {@code true} flag presumably asks ZkClient to return null
   *         instead of throwing when the node does not exist - confirm)
   * @throws MetaClientException translated from any ZkException raised by ZkClient
   */
  @Override
  public T get(String key) {
    try {
      return _zkClient.readData(key, true);
    } catch (ZkException e) {
      // Keep ZooKeeper-specific exceptions from leaking through the MetaClient API, in line
      // with the other read/write methods of this class.
      throw translateZkExceptionToMetaclientException(e);
    }
  }

  /**
   * Lists the children that ZkClient reports for {@code key}.
   *
   * @param key path of the parent entry
   * @return the child list returned by ZkClient
   * @throws MetaClientException translated from any ZkException raised by ZkClient
   */
  @Override
  public List<String> getDirectChildrenKeys(String key) {
    final List<String> children;
    try {
      children = _zkClient.getChildren(key);
    } catch (ZkException zkError) {
      throw translateZkExceptionToMetaclientException(zkError);
    }
    return children;
  }

  /**
   * Returns the number of children ZkClient reports for {@code key}.
   *
   * @param key path of the parent entry
   * @return the child count returned by ZkClient
   * @throws MetaClientException translated from any ZkException raised by ZkClient
   */
  @Override
  public int countDirectChildren(String key) {
    try {
      return _zkClient.countChildren(key);
    } catch (ZkException e) {
      // Translate for consistency with getDirectChildrenKeys and the other accessors.
      throw translateZkExceptionToMetaclientException(e);
    }
  }

  /**
   * Deletes the entry at {@code key} through ZkClient.
   *
   * @param key path of the entry to delete
   * @return the boolean result reported by ZkClient's delete
   * @throws MetaClientException translated from any ZkException raised by ZkClient
   */
  @Override
  public boolean delete(String key) {
    final boolean deleted;
    try {
      deleted = _zkClient.delete(key);
    } catch (ZkException zkError) {
      throw translateZkExceptionToMetaclientException(zkError);
    }
    return deleted;
  }

  /**
   * Deletes the entry at {@code key} recursively through ZkClient.
   *
   * @param key path of the root entry to delete
   * @return always {@code true} when ZkClient's call returns normally
   */
  @Override
  public boolean recursiveDelete(String key) {
    // NOTE(review): exceptions from ZkClient propagate as-is here; they are not translated
    // to MetaClient exceptions like in delete().
    _zkClient.deleteRecursively(key);
    return true;
  }

  // In the current ZkClient, async CRUD operations are retried automatically on connection loss
  // or session mismatch, using the existing retry handling logic in ZkClient (defined in
  // ZkAsyncCallbacks). ZkClient executes async callbacks in the ZkClient main thread, while
  // retries are handled in a separate retry thread. In our first version of the implementation,
  // we keep similar behavior: callbacks are executed in the ZkClient event thread, and ZkClient's
  // retry logic is reused.

  // It is highly recommended *NOT* to perform any blocking operation inside the callbacks.
  // If you block the thread the meta client won't process other events.

  // corresponding callbacks for each operation are invoked in order.
  /**
   * Not supported by this implementation.
   *
   * @param poolSize ignored
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setAsyncExecPoolSize(int poolSize) {
    final String reason = "All async calls are executed in a single thread to maintain sequence.";
    throw new UnsupportedOperationException(reason);
  }

  /**
   * Asynchronously creates an entry; the outcome is delivered to {@code cb} through a
   * ZkMetaClientCreateCallbackHandler.
   *
   * @param key path of the entry to create
   * @param data payload to store
   * @param mode MetaClient entry mode, converted to the ZooKeeper create mode up front
   * @param cb callback receiving the result
   * @throws MetaClientException if the entry mode cannot be converted
   */
  @Override
  public void asyncCreate(String key, Object data, EntryMode mode, AsyncCallback.VoidCallback cb) {
    final CreateMode zkMode;
    try {
      zkMode = ZkMetaClientUtil.convertMetaClientMode(mode);
    } catch (ZkException | KeeperException conversionError) {
      throw new MetaClientException(conversionError);
    }
    final ZkMetaClientCreateCallbackHandler handler = new ZkMetaClientCreateCallbackHandler(cb);
    _zkClient.asyncCreate(key, data, zkMode, handler);
  }

  /**
   * Not supported yet.
   *
   * @throws NotImplementedException always
   */
  @Override
  public void asyncUpdate(String key, DataUpdater<T> updater, AsyncCallback.DataCallback cb) {
    // TODO: Only Helix has potential use of this API as of now (ZkBaseDataAccessor.update()).
    //  The impl will move here from ZkBaseDataAccessor when ZkBaseDataAccessor is retired.
    final String message = "Currently asyncUpdate is not supported in ZkMetaClient.";
    throw new NotImplementedException(message);
  }

  /**
   * Asynchronously reads the data at {@code key}; the result is delivered to {@code cb}
   * through a ZkMetaClientGetCallbackHandler.
   *
   * @param key path of the entry to read
   * @param cb callback receiving the result
   */
  @Override
  public void asyncGet(String key, AsyncCallback.DataCallback cb) {
    final ZkMetaClientGetCallbackHandler handler = new ZkMetaClientGetCallbackHandler(cb);
    _zkClient.asyncGetData(key, handler);
  }

  /**
   * Not supported yet.
   *
   * @throws NotImplementedException always
   */
  @Override
  public void asyncCountChildren(String key, AsyncCallback.DataCallback cb) {
    // TODO: Only Helix has potential use of this API as of now (ZkBaseDataAccessor.getChildren()).
    //  The impl will move here from ZkBaseDataAccessor when ZkBaseDataAccessor is retired.
    final String message = "Currently asyncCountChildren is not supported in ZkMetaClient.";
    throw new NotImplementedException(message);
  }

  /**
   * Asynchronously checks the entry at {@code key}; the result is delivered to {@code cb}
   * through a ZkMetaClientExistCallbackHandler.
   *
   * @param key path of the entry to check
   * @param cb callback receiving the result
   */
  @Override
  public void asyncExist(String key, AsyncCallback.StatCallback cb) {
    final ZkMetaClientExistCallbackHandler handler = new ZkMetaClientExistCallbackHandler(cb);
    _zkClient.asyncExists(key, handler);
  }

  /**
   * Asynchronously deletes the entry at {@code key}; the outcome is delivered to {@code cb}
   * through a ZkMetaClientDeleteCallbackHandler.
   *
   * @param key path of the entry to delete
   * @param cb callback receiving the result
   */
  public void asyncDelete(String key, AsyncCallback.VoidCallback cb) {
    final ZkMetaClientDeleteCallbackHandler handler = new ZkMetaClientDeleteCallbackHandler(cb);
    _zkClient.asyncDelete(key, handler);
  }

  /**
   * Not supported yet.
   *
   * @throws NotImplementedException always
   */
  @Override
  public void asyncTransaction(Iterable<Op> ops, AsyncCallback.TransactionCallback cb) {
    // TODO: There is no active use case for async transactions.
    final String message = "Currently asyncTransaction is not supported in ZkMetaClient.";
    throw new NotImplementedException(message);
  }

  /**
   * Asynchronously writes {@code data} to {@code key}; the result is delivered to {@code cb}
   * through a ZkMetaClientSetCallbackHandler.
   *
   * @param key path of the entry to overwrite
   * @param data new payload
   * @param version version passed to ZkClient with the write
   * @param cb callback receiving the result
   */
  @Override
  public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) {
    final ZkMetaClientSetCallbackHandler handler = new ZkMetaClientSetCallbackHandler(cb);
    _zkClient.asyncSetData(key, data, version, handler);
  }

  /**
   * Connects the underlying ZkClient and registers the reconnect state listener, all while
   * holding the connection mutex.
   *
   * @throws MetaClientException translated from any ZkException raised while connecting
   */
  @Override
  public void connect() {
    // Acquire the lock before entering the try block, so that the finally clause only ever
    // releases a lock this thread actually holds.
    _zkClientConnectionMutex.lock();
    try {
      _zkClient.connect(_initConnectionTimeout, _zkClient);
      // Register _reconnectStateChangeListener as a state change listener to react to ZkClient
      // connect state change events. When ZkClient is disconnected from ZK, it keeps auto
      // reconnecting until ZkClient is closed or the connection is re-established.
      // We need to close ZkClient ourselves when the user has set a reconnect timeout.
      _zkClient.subscribeStateChanges(_reconnectStateChangeListener);
    } catch (ZkException e) {
      throw translateZkExceptionToMetaclientException(e);
    } finally {
      _zkClientConnectionMutex.unlock();
    }
  }

  /**
   * Closes the underlying ZkClient, cancels any pending reconnect check and shuts down the
   * reconnect monitor executor.
   */
  @Override
  public void disconnect() {
    // Cancel the scheduled reconnect check (if any) and close ZkClient under the connection lock.
    cleanUpAndClose(true, true);
    // The monitor executor is not needed once the client is closed.
    _zkClientReconnectMonitor.shutdownNow();
  }

  /**
   * Not implemented yet.
   *
   * @return always {@code null}
   */
  @Override
  public ConnectState getClientConnectionState() {
    // NOTE(review): placeholder - the actual ZkClient connection state is never consulted.
    return null;
  }

  /**
   * Subscribes {@code listener} to data changes of the entry at {@code key}.
   *
   * @param key path of the entry to watch
   * @param listener listener to notify, wrapped in a DataListenerAdapter
   * @param skipWatchingNonExistNode when {@code true}, no subscription is made if the entry
   *        does not currently exist
   * @return {@code true} if the listener was subscribed, {@code false} if it was skipped
   *         because the entry does not exist
   */
  @Override
  public boolean subscribeDataChange(String key, DataChangeListener listener, boolean skipWatchingNonExistNode) {
    // Honour skipWatchingNonExistNode the same way subscribeChildChanges does; previously the
    // flag was silently ignored.
    if (skipWatchingNonExistNode && exists(key) == null) {
      return false;
    }
    _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener));
    return true;
  }

  /**
   * Subscribes {@code listener} to direct child changes of {@code key} and converts ZkClient's
   * subscribe result into a DirectChildSubscribeResult.
   *
   * @param key path of the parent entry
   * @param listener listener to notify, wrapped in a DirectChildListenerAdapter
   * @param skipWatchingNonExistNode forwarded to ZkClient unchanged
   * @return the children and installed flag reported by ZkClient
   */
  @Override
  public DirectChildSubscribeResult subscribeDirectChildChange(String key,
      DirectChildChangeListener listener, boolean skipWatchingNonExistNode) {
    final DirectChildListenerAdapter adapter = new DirectChildListenerAdapter(listener);
    final ChildrenSubscribeResult zkResult =
        _zkClient.subscribeChildChanges(key, adapter, skipWatchingNonExistNode);
    return new DirectChildSubscribeResult(zkResult.getChildren(), zkResult.isInstalled());
  }

  /**
   * Subscribes {@code listener} to connection state changes of the underlying ZkClient.
   *
   * @param listener listener to notify, wrapped in a StateChangeListenerAdapter
   * @return always {@code true}
   */
  @Override
  public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
    final StateChangeListenerAdapter adapter = new StateChangeListenerAdapter(listener);
    _zkClient.subscribeStateChanges(adapter);
    return true;
  }

  /**
   * Subscribes {@code listener} as a persistent recursive listener on {@code key}.
   *
   * @param key path of the root entry to watch
   * @param listener listener to notify, wrapped in a ChildListenerAdapter
   * @param skipWatchingNonExistNode when {@code true}, nothing is subscribed if the entry
   *        does not currently exist
   * @return {@code true} if the listener was subscribed, {@code false} if it was skipped
   */
  @Override
  public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) {
    // The existence check only runs when the caller asked to skip missing nodes.
    if (!skipWatchingNonExistNode || exists(key) != null) {
      final ChildListenerAdapter adapter = new ChildListenerAdapter(listener);
      _zkClient.subscribePersistRecursiveListener(key, adapter);
      return true;
    }
    return false;
  }

  /**
   * Removes a data change subscription for {@code key}.
   *
   * @param key path of the watched entry
   * @param listener listener to remove
   */
  @Override
  public void unsubscribeDataChange(String key, DataChangeListener listener) {
    // NOTE(review): a fresh adapter is created here, so removal presumably relies on
    // DataListenerAdapter's equals/hashCode - confirm.
    final DataListenerAdapter adapter = new DataListenerAdapter(listener);
    _zkClient.unsubscribeDataChanges(key, adapter);
  }

  /**
   * Removes a direct child change subscription for {@code key}.
   *
   * @param key path of the watched parent entry
   * @param listener listener to remove
   */
  @Override
  public void unsubscribeDirectChildChange(String key, DirectChildChangeListener listener) {
    // NOTE(review): a fresh adapter is created here, so removal presumably relies on
    // DirectChildListenerAdapter's equals/hashCode - confirm.
    final DirectChildListenerAdapter adapter = new DirectChildListenerAdapter(listener);
    _zkClient.unsubscribeChildChanges(key, adapter);
  }

  /**
   * Removes a persistent recursive listener subscription for {@code key}.
   *
   * @param key path of the watched root entry
   * @param listener listener to remove
   */
  @Override
  public void unsubscribeChildChanges(String key, ChildChangeListener listener) {
    // NOTE(review): a fresh adapter is created here, so removal presumably relies on
    // ChildListenerAdapter's equals/hashCode - confirm.
    final ChildListenerAdapter adapter = new ChildListenerAdapter(listener);
    _zkClient.unsubscribePersistRecursiveListener(key, adapter);
  }

  /**
   * Removes a connection state change subscription from the underlying ZkClient.
   *
   * @param listener listener to remove
   */
  @Override
  public void unsubscribeConnectStateChanges(ConnectStateChangeListener listener) {
    // Must call unsubscribe: the previous code called subscribeStateChanges here, which
    // registered yet another listener instead of removing the existing one.
    // NOTE(review): a fresh adapter is created here, so removal presumably relies on
    // StateChangeListenerAdapter's equals/hashCode - confirm.
    _zkClient.unsubscribeStateChanges(new StateChangeListenerAdapter(listener));
  }

  /**
   * Not implemented yet: ignores all arguments.
   *
   * @return always {@code false}
   */
  @Override
  public boolean waitUntilExists(String key, TimeUnit timeUnit, long time) {
    // NOTE(review): placeholder - no waiting or existence check is performed.
    return false;
  }

  /**
   * Batch create with explicit entry modes. Not implemented yet: nothing is created.
   *
   * @return always an empty array
   */
  @Override
  public boolean[] create(List<String> key, List<T> data, List<EntryMode> mode) {
    // NOTE(review): placeholder - all arguments are ignored.
    return new boolean[0];
  }

  /**
   * Batch create. Not implemented yet: nothing is created.
   *
   * @return always an empty array
   */
  @Override
  public boolean[] create(List<String> key, List<T> data) {
    // NOTE(review): placeholder - all arguments are ignored.
    return new boolean[0];
  }

  /**
   * Batch delete. Not implemented yet: nothing is deleted.
   *
   * @return always an empty array
   */
  @Override
  public boolean[] delete(List<String> keys) {
    // NOTE(review): placeholder - the keys are ignored.
    return new boolean[0];
  }

  /**
   * Batch existence check. Not implemented yet.
   *
   * @return always {@code null}
   */
  @Override
  public List<Stat> exists(List<String> keys) {
    // NOTE(review): placeholder - the keys are ignored.
    return null;
  }

  /**
   * Batch read. Not implemented yet.
   *
   * @return always {@code null}
   */
  @Override
  public List<T> get(List<String> keys) {
    // NOTE(review): placeholder - the keys are ignored.
    return null;
  }

  /**
   * Batch update. Not implemented yet: nothing is updated.
   *
   * @return always {@code null}
   */
  @Override
  public List<T> update(List<String> keys, List<DataUpdater<T>> updater) {
    // NOTE(review): placeholder - all arguments are ignored.
    return null;
  }

  /**
   * Batch set. Not implemented yet: nothing is written.
   *
   * @return always an empty array
   */
  @Override
  public boolean[] set(List<String> keys, List<T> datas, List<Integer> version) {
    // NOTE(review): placeholder - all arguments are ignored.
    return new boolean[0];
  }

  /**
   * AutoCloseable entry point; simply delegates to {@link #disconnect()}.
   */
  @Override
  public void close() {
    disconnect();
  }

  /**
   * Executes the given operations through ZkClient's {@code multi} call.
   *
   * @param ops MetaClient operations to execute
   * @return the ZooKeeper results converted to MetaClient OpResults
   */
  @Override
  public List<OpResult> transactionOP(Iterable<Op> ops) {
    // MetaClient ops -> Zk ops -> ZkClient.multi -> Zk results -> MetaClient results.
    return ZkMetaClientUtil.zkOpResultToMetaClientOpResults(
        _zkClient.multi(ZkMetaClientUtil.metaClientOpsToZkOps(ops)));
  }

  /**
   * Serializes {@code data} by delegating to ZkClient's serializer.
   *
   * @param data object to serialize
   * @param path path passed through to ZkClient's serialize call
   * @return the serialized bytes
   */
  @Override
  public byte[] serialize(T data, String path) {
    final byte[] serialized = _zkClient.serialize(data, path);
    return serialized;
  }

  /**
   * Deserializes {@code bytes} by delegating to ZkClient's serializer.
   *
   * @param bytes serialized payload
   * @param path path passed through to ZkClient's deserialize call
   * @return the deserialized object
   */
  @Override
  public T deserialize(byte[] bytes, String path) {
    final T deserialized = _zkClient.deserialize(bytes, path);
    return deserialized;
  }

  /**
   * Clean up helper invoked on connect state changes and when MetaClient is closing. Both
   * steps run while holding the connection mutex: ZkClient is closed first (if requested and
   * not already closed), then the pending reconnect check is cancelled (if requested and present).
   * @param cancel Whether to cancel the scheduled reconnect monitor task.
   * @param close Whether to close ZkClient.
   */
  private void cleanUpAndClose(boolean cancel, boolean close) {
    _zkClientConnectionMutex.lock();
    try {
      final boolean shouldClose = close && !_zkClient.isClosed();
      if (shouldClose) {
        _zkClient.close();
        // TODO: need to unsubscribe all persist watcher from ZK
        // Add this in ZkClient when persist watcher change is in
        // Also need to manually send CLOSED state change to state
        // change listener (in change adapter)
        LOG.info("ZkClient is closed");
      }

      final boolean shouldCancel = cancel && _reconnectMonitorFuture != null;
      if (shouldCancel) {
        _reconnectMonitorFuture.cancel(true);
        LOG.info("ZkClient reconnect monitor thread is canceled");
      }
    } finally {
      _zkClientConnectionMutex.unlock();
    }
  }

  /**
   * MetaClient uses Helix ZkClient (@see org.apache.helix.zookeeper.impl.client.ZkClient) to connect
   * to ZK. The current implementation of ZkClient auto-reconnects indefinitely. We use a monitor
   * thread in ZkMetaClient to monitor the reconnect status and close ZkClient if the client is
   * still in disconnected state when the reconnect timeout is reached.
   *
   *
   * case 1: Start the monitor thread when ZkMetaClient gets a disconnected event, to check the
   *         connect state when the timeout is reached. If not re-connected by then, kill the
   *         monitor thread and close ZkClient.
   * [MetaClient thread]        ---------------------------------------------------------------
   *                              ( When disconnected, schedule a event
   *                              to check connect state after timeout)
   * [Reconnect monitor thread]          --------------------------------------
   *                                   ^                                     |  not reconnected when timed out
   *                                  /                                      |
   *                                 | disconnected event                    v
   * [ZkClient]               -------X---------------------------------------X zkClient.close()
   * [ZkClient exp back              |         X            X
   *  -off retry connection]         |--------|--------------|--------------
   *
   *
   * case 2: Start the monitor thread when ZkMetaClient gets a disconnected event, to check the
   *         connect state when the timeout is reached. If re-connected before the timeout,
   *         cancel the delayed monitor thread.
   *
   * [MetaClient thread]       ---------------------------------------------------------------
   *                            (cancel scheduled task when reconnected)
   * [Reconnect monitor]               ---------------------------------X
   *                                  ^                                ^
   *                                 /                                /
   *                                | disconnected event             |  reconnected event
   * [ZkClient]                -----X------------------------------------------------------
   * [ZkClient exp back             |        X                      Y  Reconnected before timed out
   *  -off retry connection]        |--------| ---------------------|
   *
   *
   * case 3: Start the monitor thread when ZkMetaClient gets a disconnected event, to check the
   *         connect state when the timeout is reached. If the reconnect attempt errors out,
   *         cancel the delayed monitor thread and close ZkClient.
   * [MetaClient thread]       ---------------------------------------------------------------
   *                          (cancel scheduled task and close ZkClient when reconnected error)
   * [Reconnect monitor]              ----------------------------------X
   *                                 ^                               ^  |
   *                                /                           err /   |
   *                               | disconnected event            |    v close ZkClient
   * [ZkClient]               -----X-------------------------------X ---X
   * [ZkClient exp back            |        X                     ^ Reconnect error
   *  -off retry connection]       |--------| --------------------|
   *
   */

  private class ReconnectStateChangeListener implements IZkStateListener {
    // Schedule a monitor to track ZkClient auto reconnect when Disconnected
    // Cancel the monitor thread when connected.
    @Override
    public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
      if (state == Watcher.Event.KeeperState.Disconnected) {                        // ------case 1
        // Expired. start a new event monitoring retry
        _zkClientConnectionMutex.lockInterruptibly();
        try {
          if (_reconnectMonitorFuture == null || _reconnectMonitorFuture.isCancelled()
              || _reconnectMonitorFuture.isDone()) {
            _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
              if (!_zkClient.getConnection().getZookeeperState().isConnected()) {
                cleanUpAndClose(false, true);
              }
            }, _reconnectTimeout, TimeUnit.MILLISECONDS);
            LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}",
                _reconnectTimeout);
          }
        } finally {
          _zkClientConnectionMutex.unlock();
        }
      } else if (state == Watcher.Event.KeeperState.SyncConnected
          || state == Watcher.Event.KeeperState.ConnectedReadOnly) {               // ------ case 2
        cleanUpAndClose(true, false);
        LOG.info("ZkClient is SyncConnected, reconnect monitor thread is canceled (if any)");
      }
    }

    // Cancel the monitor thread when connected.
    @Override
    public void handleNewSession(String sessionId) throws Exception {             // ------ case 2
      cleanUpAndClose(true, false);
      LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)");
    }

    // Cancel the monitor thread and close ZkClient when connect error.
    @Override
    public void handleSessionEstablishmentError(Throwable error) throws Exception {    // -- case 3
      cleanUpAndClose(true, true);
      LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)");
    }
  }

  /**
   * Exposes the underlying ZkClient; package-private and marked for test use only.
   *
   * @return the ZkClient instance this MetaClient delegates to
   */
  @VisibleForTesting
  ZkClient getZkClient() {
    return _zkClient;
  }
}
