/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.coord.zk;

import java.util.Iterator;
import java.util.Map;

import javax.annotation.Nullable;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A namespace aware Zookeeper client.
 *
 * The implementation only operates under the given namespace and is safe to use.
 *
 * Note that an instance of this class holds onto resources that must be released via {@link #close()}.
 */
public class ZookeeperClient implements AutoCloseable {
  private static final Logger logger = LoggerFactory.getLogger(ZookeeperClient.class);

  /** Upper bound on payload size accepted by {@code put}/{@code putIfAbsent}; taken from the client-side jute buffer limit. */
  private static final int MAX_DATA_LENGTH = BinaryInputArchive.maxBuffer;

  private final CuratorFramework curator;
  private final String root;
  private final PathChildrenCache cache;
  private final CreateMode mode;

  /**
   * Creates a client that operates under the given root path.
   *
   * The client is not usable until {@link #start()} is called.
   *
   * @param curator curator framework used for all direct Zookeeper calls
   * @param root    absolute root path (must begin with '/') under which all paths are resolved
   * @param mode    create mode applied to every node created by this client
   * @throws NullPointerException     if curator or mode is null
   * @throws IllegalArgumentException if root is null, empty or not absolute
   */
  public ZookeeperClient(final CuratorFramework curator, final String root, final CreateMode mode) {
    this.curator = Preconditions.checkNotNull(curator, "curator is required");
    Preconditions.checkArgument(!Strings.isNullOrEmpty(root), "root path is required");
    Preconditions.checkArgument(root.charAt(0) == '/', "root path must be absolute");
    this.root = root;
    this.mode = Preconditions.checkNotNull(mode, "mode is required");
    this.cache = new PathChildrenCache(curator, root, true);
  }

  /**
   * Starts the client. This call ensures the creation of the root path.
   *
   * @throws Exception  if cache fails to start or root path creation fails.
   * @see #close()
   */
  public void start() throws Exception {
    // ensure root is created
    curator.newNamespaceAwareEnsurePath(root).ensure(curator.getZookeeperClient());
    // build cache at start up, to ensure we get correct results right away
    getCache().start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
  }

  /**
   * Returns the children cache backing the eventually consistent reads of this client.
   *
   * Every method of this class reaches the cache through this accessor rather than the field.
   */
  public PathChildrenCache getCache() {
    return cache;
  }

  /**
   * Returns the root path all operations of this client are resolved against.
   */
  public String getRoot() {
    return root;
  }

  /**
   * Returns the create mode used for nodes created by this client.
   */
  public CreateMode getMode() {
    return mode;
  }

  /**
   * Returns true if path exists in the cache, false otherwise.
   * Note that calls to this method are eventually consistent.
   *
   * @param path path to check
   * @return true if path exists, false otherwise
   */
  public boolean hasPath(final String path) {
    return hasPath(path, false, null);
  }

  /**
   * Returns true if path exists, false otherwise.
   * If consistent flag is set to true, the check is made against Zookeeper directly,
   * else the check is done against the local cache.
   *
   * @param path path to check
   * @param consistent whether the check should be consistent
   * @return true if path exists, false otherwise
   */
  public boolean hasPath(final String path, final boolean consistent) {
    return hasPath(path, consistent, null);
  }

  /**
   * Checks if the given path exists.
   * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly.
   * Otherwise, the check is eventually consistent.
   *
   * If consistency flag is set to true and version holder is not null, passes version holder to get data change version.
   * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed.
   * The version holder is left untouched when the path does not exist or when the check is made against the cache.
   * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
   *
   * @param path path to check
   * @param consistent whether the check should be consistent
   * @param version version holder, may be null
   * @return true if path exists, false otherwise
   * @throws NullPointerException  if path is null
   * @throws DrillRuntimeException if the check itself fails
   */
  public boolean hasPath(final String path, final boolean consistent, final DataChangeVersion version) {
    Preconditions.checkNotNull(path, "path is required");

    final String target = PathUtils.join(root, path);
    try {
      if (consistent) {
        final Stat stat = curator.checkExists().forPath(target);
        if (version != null && stat != null) {
          version.setVersion(stat.getVersion());
        }
        return stat != null;
      } else {
        return getCache().getCurrentData(target) != null;
      }
    } catch (final Exception e) {
      throw new DrillRuntimeException("error while checking path on zookeeper", e);
    }
  }

  /**
   * Returns a value corresponding to the given path if path exists in the cache, null otherwise.
   *
   * Note that calls to this method are eventually consistent.
   *
   * @param path  target path
   * @return cached data for the path, or null if the path is not in the cache
   */
  public byte[] get(final String path) {
    return get(path, false);
  }

  /**
   * Returns the value stored at the given path.
   *
   * If the flag consistent is set, the read is consistent as it is made against Zookeeper directly. Otherwise,
   * the read is eventually consistent and is served from the cache.
   *
   * @param path  target path
   * @param consistent consistency flag
   * @return data stored at the path; for cache reads, null if the path is not in the cache
   * @throws DrillRuntimeException if the consistent read fails
   */
  public byte[] get(final String path, final boolean consistent) {
    return get(path, consistent, null);
  }

  /**
   * Returns the value stored at the given path.
   *
   * The read is consistent as it is made against Zookeeper directly.
   *
   * Passes version holder to get data change version.
   *
   * @param path  target path
   * @param version version holder
   * @return data stored at the path
   * @throws DrillRuntimeException if the read fails
   */
  public byte[] get(final String path, final DataChangeVersion version) {
    return get(path, true, version);
  }

  /**
   * Returns the value stored at the given path.
   *
   * If the flag consistent is set, the read is consistent as it is made against Zookeeper directly. Otherwise,
   * the read is eventually consistent and is served from the cache.
   *
   * If consistency flag is set to true and version holder is not null, passes version holder to get data change version.
   * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed.
   * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
   *
   * @param path  target path
   * @param consistent consistency check
   * @param version version holder, may be null; ignored for cache reads
   * @return data stored at the path; for cache reads, null if the path is not in the cache
   * @throws NullPointerException  if path is null
   * @throws DrillRuntimeException if the consistent read fails for any reason
   *         (NOTE(review): this presumably includes a missing node, which is reported as an error
   *         rather than null - confirm against Curator's getData() behavior)
   */
  public byte[] get(final String path, final boolean consistent, final DataChangeVersion version) {
    Preconditions.checkNotNull(path, "path is required");

    final String target = PathUtils.join(root, path);
    if (consistent) {
      try {
        if (version != null) {
          // capture the stat alongside the data so the caller learns the data change version
          final Stat stat = new Stat();
          final byte[] bytes = curator.getData().storingStatIn(stat).forPath(target);
          version.setVersion(stat.getVersion());
          return bytes;
        }
        return curator.getData().forPath(target);
      } catch (final Exception ex) {
        throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex);
      }
    } else {
      final ChildData data = getCache().getCurrentData(target);
      if (data != null) {
        return data.getData();
      }
    }
    return null;
  }

  /**
   * Creates the given path without placing any data in.
   *
   * @param path  target path
   * @throws NullPointerException  if path is null
   * @throws DrillRuntimeException if the node cannot be created or the cache cannot be refreshed
   */
  public void create(final String path) {
    Preconditions.checkNotNull(path, "path is required");

    final String target = PathUtils.join(root, path);
    try {
      curator.create().withMode(mode).forPath(target);
      getCache().rebuildNode(target);
    } catch (final Exception e) {
      throw new DrillRuntimeException(String.format("unable to create node at %s", target), e);
    }
  }

  /**
   * Puts the given byte sequence into the given path.
   *
   * If path does not exist, this call creates it.
   *
   * @param path  target path
   * @param data  data to store
   * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
   */
  public void put(final String path, final byte[] data) {
    put(path, data, null);
  }

  /**
   * Puts the given byte sequence into the given path.
   * <p>
   * If path does not exist, this call creates it.
   * <p>
   * If version holder is not null and path already exists, passes given version for comparison.
   * Zookeeper maintains stat structure that holds version number which increases each time znode data change is performed.
   * If we pass version that doesn't match the actual version of the data,
   * the update will fail with {@link org.apache.zookeeper.KeeperException.BadVersionException}.
   * We catch such exception and re-throw it as {@link VersionMismatchException}.
   * Link to documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
   *
   * @param path    target path
   * @param data    data to store
   * @param version version holder, may be null
   * @throws NullPointerException if path or data is null
   * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
   * @throws VersionMismatchException if the given version does not match the stored one
   * @throws DrillRuntimeException if the write fails for any other reason
   */
  public void put(final String path, final byte[] data, DataChangeVersion version) {
    Preconditions.checkNotNull(path, "path is required");
    Preconditions.checkNotNull(data, "data is required");
    checkDataSize(data);

    final String target = PathUtils.join(root, path);
    try {
      // we make a consistent read to ensure this call won't fail upon consecutive calls on the same path
      // before cache is updated
      boolean hasNode = hasPath(path, true);
      if (!hasNode) {
        try {
          curator.create().withMode(mode).forPath(target, data);
        } catch (NodeExistsException e) {
          // Handle race conditions since Drill is distributed and other
          // drillbits may have just created the node. This assumes that we do want to
          // override the new node. Makes sense here, because if the node had existed,
          // we'd have updated it.
          hasNode = true;
        }
      }
      if (hasNode) {
        if (version != null) {
          try {
            curator.setData().withVersion(version.getVersion()).forPath(target, data);
          } catch (final KeeperException.BadVersionException e) {
            throw new VersionMismatchException("Unable to put data. Version mismatch is detected.", version.getVersion(), e);
          }
        } else {
          curator.setData().forPath(target, data);
        }
      }
      getCache().rebuildNode(target);
    } catch (final VersionMismatchException e) {
      // surface version conflicts as-is instead of wrapping them below
      throw e;
    } catch (final Exception e) {
      logMaxBufferHint(data.length);
      throw new DrillRuntimeException("unable to put ", e);
    }
  }

  /**
   * Puts the given byte sequence into the given path if path does not exist.
   *
   * @param path  target path
   * @param data  data to store
   * @return null if path was created, else data stored for the given path
   * @throws NullPointerException if path or data is null
   * @throws java.lang.IllegalArgumentException if data size is bigger than jute.maxbuffer value
   * @throws DrillRuntimeException if the node can neither be created nor read
   */
  public byte[] putIfAbsent(final String path, final byte[] data) {
    Preconditions.checkNotNull(path, "path is required");
    Preconditions.checkNotNull(data, "data is required");
    checkDataSize(data);

    final String target = PathUtils.join(root, path);
    try {
      try {
        curator.create().withMode(mode).forPath(target, data);
        getCache().rebuildNode(target);
        return null;
      } catch (NodeExistsException ignored) {
        // the node is already there: fall through and return what is currently stored
      }
      return curator.getData().forPath(target);
    } catch (final Exception e) {
      logMaxBufferHint(data.length);
      throw new DrillRuntimeException("unable to put ", e);
    }
  }

  /**
   * Deletes the given node residing at the given path
   *
   * @param path  target path to delete
   * @throws NullPointerException  if path is null
   * @throws DrillRuntimeException if the node cannot be deleted or the cache cannot be refreshed
   */
  public void delete(final String path) {
    Preconditions.checkNotNull(path, "path is required");

    final String target = PathUtils.join(root, path);
    try {
      curator.delete().forPath(target);
      getCache().rebuildNode(target);
    } catch (final Exception e) {
      throw new DrillRuntimeException(String.format("unable to delete node at %s", target), e);
    }
  }

  /**
   * Returns an iterator of (key, value) pairs residing under {@link #getRoot() root} path.
   *
   * Entries are read from the cache, so the view is eventually consistent. Each key is the cached
   * node path with the root prefix removed.
   */
  public Iterator<Map.Entry<String, byte[]>> entries() {
    final String prefix = PathUtils.join(root, "/");
    return Iterables.transform(getCache().getCurrentData(), new Function<ChildData, Map.Entry<String, byte[]>>() {
      @Nullable
      @Override
      public Map.Entry<String, byte[]> apply(final ChildData data) {
        // normalize key name removing the root prefix. resultant key must be a relative path, not beginning with a '/'.
        final String key = data.getPath().replace(prefix, "");
        return new ImmutableEntry<>(key, data.getData());
      }
    }).iterator();
  }

  /**
   * Closes the children cache held by this client.
   *
   * The curator framework passed to the constructor is not closed here.
   */
  @Override
  public void close() throws Exception {
    getCache().close();
  }

  /**
   * Rejects payloads larger than the client-side jute.maxbuffer limit.
   */
  private static void checkDataSize(final byte[] data) {
    if (data.length > MAX_DATA_LENGTH) {
      throw new IllegalArgumentException(
        String.format("Can't put this data. Data size %d bytes is bigger than jute.maxbuffer %d", data.length, MAX_DATA_LENGTH)
      );
    }
  }

  /**
   * Logs the payload size next to the client-side jute.maxbuffer value to help diagnose failed writes.
   */
  private static void logMaxBufferHint(final int dataLength) {
    logger.info("Data size to persist is: {} bytes, client jute.maxbuffer value is: {}. Make sure that the client " +
      "jute.maxbuffer value corresponds to the zookeeper server value.", dataLength, MAX_DATA_LENGTH);
  }

}
