/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.zk;

import java.io.IOException;

import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * ZooKeeper provides only atomic operations.  ZooKeeperExt provides additional
 * non-atomic operations that are useful.  It also provides wrappers to
 * deal with ConnectionLossException.  All methods of this class
 * should be thread-safe.
 */
public class ZooKeeperExt {
  /**
   * Length of the ZK sequence number.  getChildrenExt() parses this many
   * trailing characters of each child name as an int when sorting.
   */
  public static final int SEQUENCE_NUMBER_LENGTH = 10;
  /** Internal logger */
  private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
  /** Underlying ZooKeeper client that every operation is delegated to */
  private final ZooKeeper zooKeeper;
  /**
   * Optional progress reporter, may be null (the constructor without a
   * Progressable passes null).  When set, progress() is called on each
   * step of the recursive create and delete loops.
   */
  private final Progressable progressable;
  /**
   * Upper bound on the number of attempts an operation makes when it keeps
   * failing due to connection loss (attempts are counted from zero).
   */
  private final int maxRetryAttempts;
  /** Milliseconds to sleep after an attempt failed due to connection loss */
  private final int retryWaitMsecs;

  /**
   * Connects to ZooKeeper without a progress reporter.  Delegates to the
   * full constructor with a null Progressable.
   *
   * @param connectString Comma separated host:port pairs, each corresponding
   *        to a zk server. e.g.
   *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
   *        chroot suffix is used the example would look
   *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
   *        where the client would be rooted at "/app/a" and all paths
   *        would be relative to this root - ie getting/setting/etc...
   *        "/foo/bar" would result in operations being run on
   *        "/app/a/foo/bar" (from the server perspective).
   * @param sessionTimeout Session timeout in milliseconds
   * @param maxRetryAttempts Max attempts made by an operation that keeps
   *        failing due to connection loss
   * @param retryWaitMsecs Msecs to wait after an attempt failed due to
   *        connection loss
   * @param watcher A watcher object which will be notified of state changes,
   *        may also be notified for node events
   * @throws IOException if the underlying ZooKeeper client cannot be created
   */
  public ZooKeeperExt(String connectString,
                      int sessionTimeout,
                      int maxRetryAttempts,
                      int retryWaitMsecs,
                      Watcher watcher) throws IOException {
    // No Progressable: the recursive loops skip their progress() calls.
    this(connectString,
        sessionTimeout,
        maxRetryAttempts,
        retryWaitMsecs,
        watcher,
        null);
  }

  /**
   * Connects to ZooKeeper and optionally reports progress during the longer
   * (recursive) operations.
   *
   * @param connectString Comma separated host:port pairs, each corresponding
   *        to a zk server. e.g.
   *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
   *        chroot suffix is used the example would look
   *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
   *        where the client would be rooted at "/app/a" and all paths
   *        would be relative to this root - ie getting/setting/etc...
   *        "/foo/bar" would result in operations being run on
   *        "/app/a/foo/bar" (from the server perspective).
   * @param sessionTimeout Session timeout in milliseconds
   * @param maxRetryAttempts Max attempts made by an operation that keeps
   *        failing due to connection loss
   * @param retryWaitMsecs Msecs to wait after an attempt failed due to
   *        connection loss
   * @param watcher A watcher object which will be notified of state changes,
   *        may also be notified for node events
   * @param progressable Makes progress for longer operations, may be null
   * @throws IOException if the underlying ZooKeeper client cannot be created
   */
  public ZooKeeperExt(String connectString,
      int sessionTimeout,
      int maxRetryAttempts,
      int retryWaitMsecs,
      Watcher watcher,
      Progressable progressable) throws IOException {
    // Retry policy and progress reporter first, then open the connection.
    this.maxRetryAttempts = maxRetryAttempts;
    this.retryWaitMsecs = retryWaitMsecs;
    this.progressable = progressable;
    this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
  }

  /**
   * Provides a possibility of a creating a path consisting of more than one
   * znode (not atomic).  If recursive is false, operates exactly the
   * same as create(), apart from the connection loss handling below.
   * <p>
   * When recursive is true, the znode is first created directly; only if
   * that fails because a parent is missing are the ancestors created (as
   * persistent znodes without data) before the final create is repeated.
   * <p>
   * An attempt that fails with a ConnectionLossException is repeated after
   * sleeping retryWaitMsecs, for at most maxRetryAttempts attempts.
   *
   * @param path path to create
   * @param data data to set on the final znode
   * @param acl acls on each znode created
   * @param createMode only affects the final znode
   * @param recursive if true, creates all ancestors
   * @return Actual created path
   * @throws KeeperException If the server signals an error other than a
   *         connection loss
   * @throws InterruptedException If the call or the wait between attempts
   *         is interrupted
   * @throws IllegalStateException if every attempt failed with a connection
   *         loss; the last ConnectionLossException is attached as the cause
   */
  public String createExt(
      final String path,
      byte[] data,
      List<ACL> acl,
      CreateMode createMode,
      boolean recursive) throws KeeperException, InterruptedException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("createExt: Creating path " + path);
    }

    // Remembered so the final failure can report why the attempts failed.
    KeeperException.ConnectionLossException lastLoss = null;
    int attempt = 0;
    while (attempt < maxRetryAttempts) {
      try {
        if (!recursive) {
          return zooKeeper.create(path, data, acl, createMode);
        }

        // Optimistic path: the parents usually exist already.
        try {
          return zooKeeper.create(path, data, acl, createMode);
        } catch (KeeperException.NoNodeException e) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("createExt: Cannot directly create node " + path);
          }
        }

        createAncestors(path, acl);
        return zooKeeper.create(path, data, acl, createMode);
      } catch (KeeperException.ConnectionLossException e) {
        lastLoss = e;
        LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
            "waiting " + retryWaitMsecs + " msecs before retrying.", e);
      }
      ++attempt;
      Thread.sleep(retryWaitMsecs);
    }
    throw new IllegalStateException("createExt: Failed to create " + path  +
        " after " + attempt + " tries!", lastLoss);
  }

  /**
   * Create every missing proper ancestor of the path, from the top down,
   * as a persistent znode without data.
   *
   * @param path path whose ancestors should exist afterwards
   * @param acl acls on each znode created
   * @throws KeeperException If the server signals an error other than the
   *         ancestor already existing
   * @throws InterruptedException If the server transaction is interrupted
   */
  private void createAncestors(String path, List<ACL> acl)
    throws KeeperException, InterruptedException {
    // Start searching at index 1 so the leading "/" is not treated as an
    // (empty) ancestor.
    for (int pos = path.indexOf("/", 1); pos != -1;
        pos = path.indexOf("/", pos + 1)) {
      if (progressable != null) {
        progressable.progress();
      }
      String ancestor = path.substring(0, pos);
      try {
        if (zooKeeper.exists(ancestor, false) == null) {
          zooKeeper.create(ancestor, null, acl, CreateMode.PERSISTENT);
        }
      } catch (KeeperException.NodeExistsException e) {
        // The ancestor appeared between exists() and create(); that is the
        // state we wanted anyway.
        if (LOG.isDebugEnabled()) {
          LOG.debug("createExt: Znode " + ancestor +
              " already exists");
        }
      }
    }
  }

  /**
   * Immutable holder for the outcome of createOrSetExt() and
   * createOnceExt(): the path of a created znode and/or the stat of a znode
   * whose data was set.  Either part may be null.
   */
  public static class PathStat {
    /** Path to created znode (if any) */
    private final String path;
    /** Stat from set znode (if any) */
    private final Stat stat;

    /**
     * Put in results from createOrSetExt() or createOnceExt()
     *
     * @param path Path to created znode (or null)
     * @param stat Stat from set znode (or null if nothing was set)
     */
    public PathStat(String path, Stat stat) {
      this.path = path;
      this.stat = stat;
    }

    /**
     * Get the path of the created znode if it was created.
     *
     * @return Path of created znode or null if not created
     */
    public String getPath() {
      return path;
    }

    /**
     * Get the stat of the set znode if set
     *
     * @return Stat of set znode or null if not set
     */
    public Stat getStat() {
      return stat;
    }
  }

  /**
   * Create a znode, or overwrite its data when a znode already exists on
   * the path.  Creation goes through createExt(); the fallback is a single
   * setData() call on the underlying client.
   *
   * @param path path to create
   * @param data data to set on the final znode
   * @param acl acls on each znode created
   * @param createMode only affects the final znode
   * @param recursive if true, creates all ancestors
   * @param version Version to set if setting
   * @return PathStat holding the created path (stat is null) when the znode
   *         was created, or the stat of the set znode (path is null) when
   *         it already existed
   * @throws InterruptedException
   * @throws KeeperException
   */
  public PathStat createOrSetExt(final String path,
      byte[] data,
      List<ACL> acl,
      CreateMode createMode,
      boolean recursive,
      int version) throws KeeperException, InterruptedException {
    try {
      String newPath = createExt(path, data, acl, createMode, recursive);
      return new PathStat(newPath, null);
    } catch (KeeperException.NodeExistsException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("createOrSet: Node exists on path " + path);
      }
    }
    // The znode was already there: replace its data instead.
    Stat updatedStat = zooKeeper.setData(path, data, version);
    return new PathStat(null, updatedStat);
  }

  /**
   * Create a znode if there is no other znode there.  An existing znode is
   * left untouched (its data is not set).
   *
   * @param path path to create
   * @param data data to set on the final znode
   * @param acl acls on each znode created
   * @param createMode only affects the final znode
   * @param recursive if true, creates all ancestors
   * @return PathStat whose path is the created znode, or null if the znode
   *         already existed; its stat is always null
   * @throws InterruptedException
   * @throws KeeperException
   */
  public PathStat createOnceExt(final String path,
      byte[] data,
      List<ACL> acl,
      CreateMode createMode,
      boolean recursive) throws KeeperException, InterruptedException {
    String createdPath = null;
    try {
      createdPath = createExt(path, data, acl, createMode, recursive);
    } catch (KeeperException.NodeExistsException e) {
      // Losing the race is fine here: somebody else created the znode.
      if (LOG.isDebugEnabled()) {
        LOG.debug("createOnceExt: Node already exists on path " + path);
      }
    }
    // Nothing is ever set by this method, so there is no stat to report.
    return new PathStat(createdPath, null);
  }

  /**
   * Delete a path recursively.  When the deletion is recursive, it is a
   * non-atomic operation, hence, not part of ZooKeeper.
   * <p>
   * A recursive delete first tries to delete the znode directly; only if it
   * is not empty are the children deleted (each recursively, with any
   * version) before the delete of the znode itself is repeated.
   * <p>
   * An attempt that fails with a ConnectionLossException is repeated after
   * sleeping retryWaitMsecs, for at most maxRetryAttempts attempts.
   *
   * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
   * @param version expected version (-1 for all); only applies to the znode
   *        at path, children are always deleted with version -1
   * @param recursive if true, remove all children, otherwise behave like
   *        delete()
   * @throws InterruptedException If the call or the wait between attempts
   *         is interrupted
   * @throws KeeperException If the server signals an error other than a
   *         connection loss
   * @throws IllegalStateException if every attempt failed with a connection
   *         loss; the last ConnectionLossException is attached as the cause
   */
  public void deleteExt(final String path, int version, boolean recursive)
    throws InterruptedException, KeeperException {
    // Remembered so the final failure can report why the attempts failed.
    KeeperException.ConnectionLossException lastLoss = null;
    int attempt = 0;
    while (attempt < maxRetryAttempts) {
      try {
        if (!recursive) {
          zooKeeper.delete(path, version);
          return;
        }

        // Optimistic path: a znode without children can go right away.
        try {
          zooKeeper.delete(path, version);
          return;
        } catch (KeeperException.NotEmptyException e) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("deleteExt: Cannot directly remove node " + path);
          }
        }

        List<String> childList = zooKeeper.getChildren(path, false);
        for (String child : childList) {
          if (progressable != null) {
            progressable.progress();
          }
          deleteExt(path + "/" + child, -1, true);
        }

        zooKeeper.delete(path, version);
        return;
      } catch (KeeperException.ConnectionLossException e) {
        lastLoss = e;
        LOG.warn("deleteExt: Connection loss on attempt " +
            attempt + ", waiting " + retryWaitMsecs +
            " msecs before retrying.", e);
      }
      ++attempt;
      Thread.sleep(retryWaitMsecs);
    }
    throw new IllegalStateException("deleteExt: Failed to delete " + path  +
        " after " + attempt + " tries!", lastLoss);
  }

  /**
   * Return the stat of the node of the given path. Return null if no such a
   * node exists.
   * <p>
   * If the watch is true and the call is successful (no exception is thrown),
   * a watch will be left on the node with the given path. The watch will be
   * triggered by a successful operation that creates/delete the node or sets
   * the data on the node.
   * <p>
   * An attempt that fails with a ConnectionLossException is repeated after
   * sleeping retryWaitMsecs, for at most maxRetryAttempts attempts.
   *
   * @param path
   *                the node path
   * @param watch
   *                whether need to watch this node
   * @return the stat of the node of the given path; return null if no such a
   *         node exists.
   * @throws KeeperException If the server signals an error other than a
   *         connection loss
   * @throws InterruptedException If the server transaction or the wait
   *         between attempts is interrupted.
   * @throws IllegalStateException if every attempt failed with a connection
   *         loss; the last ConnectionLossException is attached as the cause
   */
  public Stat exists(String path, boolean watch) throws KeeperException,
      InterruptedException {
    // Remembered so the final failure can report why the attempts failed.
    KeeperException.ConnectionLossException lastLoss = null;
    int attempt = 0;
    while (attempt < maxRetryAttempts) {
      try {
        return zooKeeper.exists(path, watch);
      } catch (KeeperException.ConnectionLossException e) {
        lastLoss = e;
        LOG.warn("exists: Connection loss on attempt " +
            attempt + ", waiting " + retryWaitMsecs +
            " msecs before retrying.", e);
      }
      ++attempt;
      Thread.sleep(retryWaitMsecs);
    }
    throw new IllegalStateException("exists: Failed to check " + path  +
        " after " + attempt + " tries!", lastLoss);
  }

  /**
   * Return the stat of the node of the given path. Return null if no such a
   * node exists.
   * <p>
   * If the watch is non-null and the call is successful (no exception is
   * thrown), a watch will be left on the node with the given path. The
   * watch will be triggered by a successful operation that
   * creates/delete the node or sets the data on the node.
   * <p>
   * An attempt that fails with a ConnectionLossException is repeated after
   * sleeping retryWaitMsecs, for at most maxRetryAttempts attempts.
   *
   * @param path the node path
   * @param watcher explicit watcher
   * @return the stat of the node of the given path; return null if no such a
   *         node exists.
   * @throws KeeperException If the server signals an error other than a
   *         connection loss
   * @throws InterruptedException If the server transaction or the wait
   *         between attempts is interrupted.
   * @throws IllegalArgumentException if an invalid path is specified
   * @throws IllegalStateException if every attempt failed with a connection
   *         loss; the last ConnectionLossException is attached as the cause
   */
  public Stat exists(final String path, Watcher watcher)
    throws KeeperException, InterruptedException {
    // Remembered so the final failure can report why the attempts failed.
    KeeperException.ConnectionLossException lastLoss = null;
    int attempt = 0;
    while (attempt < maxRetryAttempts) {
      try {
        return zooKeeper.exists(path, watcher);
      } catch (KeeperException.ConnectionLossException e) {
        lastLoss = e;
        LOG.warn("exists: Connection loss on attempt " +
            attempt + ", waiting " + retryWaitMsecs +
            " msecs before retrying.", e);
      }
      ++attempt;
      Thread.sleep(retryWaitMsecs);
    }
    throw new IllegalStateException("exists: Failed to check " + path  +
        " after " + attempt + " tries!", lastLoss);
  }

  /**
   * Return the data and the stat of the node of the given path.
   * <p>
   * If the watch is non-null and the call is successful (no exception is
   * thrown), a watch will be left on the node with the given path. The watch
   * will be triggered by a successful operation that sets data on the node, or
   * deletes the node.
   * <p>
   * A KeeperException with error code KeeperException.NoNode will be thrown
   * if no node with the given path exists.
   * <p>
   * An attempt that fails with a ConnectionLossException is repeated after
   * sleeping retryWaitMsecs, for at most maxRetryAttempts attempts.
   *
   * @param path the given path
   * @param watcher explicit watcher
   * @param stat the stat of the node
   * @return the data of the node
   * @throws KeeperException If the server signals an error with a non-zero
   *         error code other than a connection loss
   * @throws InterruptedException If the server transaction or the wait
   *         between attempts is interrupted.
   * @throws IllegalArgumentException if an invalid path is specified
   * @throws IllegalStateException if every attempt failed with a connection
   *         loss; the last ConnectionLossException is attached as the cause
   */
  public byte[] getData(final String path, Watcher watcher, Stat stat)
    throws KeeperException, InterruptedException {
    // Remembered so the final failure can report why the attempts failed.
    KeeperException.ConnectionLossException lastLoss = null;
    int attempt = 0;
    while (attempt < maxRetryAttempts) {
      try {
        return zooKeeper.getData(path, watcher, stat);
      } catch (KeeperException.ConnectionLossException e) {
        lastLoss = e;
        LOG.warn("getData: Connection loss on attempt " +
            attempt + ", waiting " + retryWaitMsecs +
            " msecs before retrying.", e);
      }
      ++attempt;
      Thread.sleep(retryWaitMsecs);
    }
    throw new IllegalStateException("getData: Failed to get " + path  +
        " after " + attempt + " tries!", lastLoss);
  }

  /**
   * Return the data and the stat of the node of the given path.
   * <p>
   * If the watch is true and the call is successful (no exception is
   * thrown), a watch will be left on the node with the given path. The watch
   * will be triggered by a successful operation that sets data on the node, or
   * deletes the node.
   * <p>
   * A KeeperException with error code KeeperException.NoNode will be thrown
   * if no node with the given path exists.
   * <p>
   * An attempt that fails with a ConnectionLossException is repeated after
   * sleeping retryWaitMsecs, for at most maxRetryAttempts attempts.
   *
   * @param path the given path
   * @param watch whether need to watch this node
   * @param stat the stat of the node
   * @return the data of the node
   * @throws KeeperException If the server signals an error with a non-zero
   *         error code other than a connection loss
   * @throws InterruptedException If the server transaction or the wait
   *         between attempts is interrupted.
   * @throws IllegalStateException if every attempt failed with a connection
   *         loss; the last ConnectionLossException is attached as the cause
   */
  public byte[] getData(String path, boolean watch, Stat stat)
    throws KeeperException, InterruptedException {
    // Remembered so the final failure can report why the attempts failed.
    KeeperException.ConnectionLossException lastLoss = null;
    int attempt = 0;
    while (attempt < maxRetryAttempts) {
      try {
        return zooKeeper.getData(path, watch, stat);
      } catch (KeeperException.ConnectionLossException e) {
        lastLoss = e;
        LOG.warn("getData: Connection loss on attempt " +
            attempt + ", waiting " + retryWaitMsecs +
            " msecs before retrying.", e);
      }
      ++attempt;
      Thread.sleep(retryWaitMsecs);
    }
    throw new IllegalStateException("getData: Failed to get " + path  +
        " after " + attempt + " tries!", lastLoss);
  }

  /**
   * Get the children of the path with extensions.
   * Extension 1: Sort the children based on sequence number
   * Extension 2: Get the full path instead of relative path
   * <p>
   * The sequence number of a child is its last SEQUENCE_NUMBER_LENGTH
   * characters parsed as an int; sorting therefore requires every compared
   * child name to be longer than SEQUENCE_NUMBER_LENGTH.
   * <p>
   * An attempt that fails with a ConnectionLossException is repeated after
   * sleeping retryWaitMsecs, for at most maxRetryAttempts attempts.
   *
   * @param path path to znode
   * @param watch set the watch?
   * @param sequenceSorted sort by the sequence number
   * @param fullPath if true, get the fully znode path back
   * @return list of children
   * @throws InterruptedException If the call or the wait between attempts
   *         is interrupted
   * @throws KeeperException If the server signals an error other than a
   *         connection loss
   * @throws IllegalStateException if every attempt failed with a connection
   *         loss; the last ConnectionLossException is attached as the cause
   * @throws RuntimeException if sorting is requested and a compared child
   *         name is not longer than SEQUENCE_NUMBER_LENGTH
   */
  public List<String> getChildrenExt(
      final String path,
      boolean watch,
      boolean sequenceSorted,
      boolean fullPath) throws KeeperException, InterruptedException {
    // Remembered so the final failure can report why the attempts failed.
    KeeperException.ConnectionLossException lastLoss = null;
    int attempt = 0;
    while (attempt < maxRetryAttempts) {
      try {
        List<String> childList = zooKeeper.getChildren(path, watch);
        /* Sort children according to the sequence number, if desired */
        if (sequenceSorted) {
          Collections.sort(childList, new Comparator<String>() {
            public int compare(String s1, String s2) {
              if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
                  (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
                throw new RuntimeException(
                    "getChildrenExt: Invalid length for sequence " +
                        " sorting > " +
                        SEQUENCE_NUMBER_LENGTH +
                        " for s1 (" +
                        s1.length() + ") or s2 (" + s2.length() + ")");
              }
              int s1sequenceNumber = Integer.parseInt(
                  s1.substring(s1.length() -
                      SEQUENCE_NUMBER_LENGTH));
              int s2sequenceNumber = Integer.parseInt(
                  s2.substring(s2.length() -
                      SEQUENCE_NUMBER_LENGTH));
              // Compare explicitly: subtracting the two values can overflow
              // and flip the sign when one of them is negative.
              if (s1sequenceNumber < s2sequenceNumber) {
                return -1;
              }
              return (s1sequenceNumber == s2sequenceNumber) ? 0 : 1;
            }
          });
        }
        if (fullPath) {
          List<String> fullChildList =
              new ArrayList<String>(childList.size());
          for (String child : childList) {
            fullChildList.add(path + "/" + child);
          }
          return fullChildList;
        }
        return childList;
      } catch (KeeperException.ConnectionLossException e) {
        lastLoss = e;
        LOG.warn("getChildrenExt: Connection loss on attempt " +
            attempt + ", waiting " + retryWaitMsecs +
            " msecs before retrying.", e);
      }
      ++attempt;
      Thread.sleep(retryWaitMsecs);
    }
    throw new IllegalStateException(
        "getChildrenExt: Failed to get children of " + path  +
        " after " + attempt + " tries!", lastLoss);
  }

  /**
   * Close the underlying ZooKeeper client. Once the client is closed, its
   * session becomes invalid. All the ephemeral nodes in the ZooKeeper server
   * associated with the session will be removed. The watches left on those
   * nodes (and on their parents) will be triggered.
   *
   * @throws InterruptedException
   */
  public void close() throws InterruptedException {
    // Pure delegation; no retry handling is applied to close().
    this.zooKeeper.close();
  }
}
