/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.fate.zookeeper;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

import java.util.List;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

public class ZooReaderWriter extends ZooReader {
  public interface Mutator {
    byte[] mutate(byte[] currentValue) throws AcceptableThriftTableOperationException;
  }

  public ZooReaderWriter(AccumuloConfiguration conf) {
    this(conf.get(Property.INSTANCE_ZK_HOST),
        (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
        conf.get(Property.INSTANCE_SECRET));
  }

  private final byte[] auth;

  public ZooReaderWriter(String keepers, int timeoutInMillis, String secret) {
    super(keepers, timeoutInMillis);
    this.auth = ("accumulo" + ":" + secret).getBytes(UTF_8);
  }

  @Override
  public ZooKeeper getZooKeeper() {
    return ZooSession.getAuthenticatedSession(keepers, timeout, "digest", auth);
  }

  /**
   * Retrieve the ACL list that was on the node
   */
  public List<ACL> getACL(String zPath) throws KeeperException, InterruptedException {
    return retryLoop(zk -> zk.getACL(zPath, null));
  }

  /**
   * Create a persistent node with the default ACL
   *
   * @return true if the data was set on a new node or overwritten, and false if an existing node
   *         was skipped
   */
  public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy)
      throws KeeperException, InterruptedException {
    return putPersistentData(zPath, data, policy, ZooUtil.PUBLIC);
  }

  /**
   * Create a persistent node with the private ACL
   *
   * @return true if the data was set on a new node or overwritten, and false if an existing node
   *         was skipped
   */
  public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy)
      throws KeeperException, InterruptedException {
    return putPersistentData(zPath, data, policy, ZooUtil.PRIVATE);
  }

  /**
   * Create a persistent node with the provided ACLs
   *
   * @return true if the data was set on a new node or overwritten, and false if an existing node
   *         was skipped
   */
  public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy,
      List<ACL> acls) throws KeeperException, InterruptedException {
    // zk allows null ACLs, but it's probably a bug in Accumulo if we see it used in our code
    requireNonNull(acls);
    requireNonNull(policy);
    return retryLoop(zk -> {
      try {
        zk.create(zPath, data, acls, CreateMode.PERSISTENT);
        return true;
      } catch (KeeperException e) {
        if (e.code() == Code.NODEEXISTS) {
          switch (policy) {
            case SKIP:
              return false;
            case OVERWRITE:
              zk.setData(zPath, data, -1);
              return true;
            case FAIL:
            default:
              // re-throw below
          }
        }
        throw e;
      }
    },
        // if OVERWRITE policy is used, create() can fail with NODEEXISTS;
        // then, the node can be deleted, causing setData() to fail with NONODE;
        // if that happens, the following code ensures we retry
        e -> e.code() == Code.NONODE && policy == NodeExistsPolicy.OVERWRITE);
  }

  /**
   * Create a persistent sequential node with the default ACL
   *
   * @return the actual path of the created node
   */
  public String putPersistentSequential(String zPath, byte[] data)
      throws KeeperException, InterruptedException {
    return retryLoop(
        zk -> zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL));
  }

  /**
   * Create an ephemeral node with the default ACL
   */
  public void putEphemeralData(String zPath, byte[] data)
      throws KeeperException, InterruptedException {
    retryLoop(zk -> zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL));
  }

  /**
   * Create an ephemeral sequential node with the default ACL
   *
   * @return the actual path of the created node
   */
  public String putEphemeralSequential(String zPath, byte[] data)
      throws KeeperException, InterruptedException {
    return retryLoop(zk -> zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL));
  }

  /**
   * Recursively copy any persistent data from the source to the destination, using the default ACL
   * to create any missing nodes and skipping over any ephemeral data.
   */
  public void recursiveCopyPersistentOverwrite(String source, String destination)
      throws KeeperException, InterruptedException {
    var stat = new Stat();
    byte[] data = getData(source, stat);
    // only copy persistent data
    if (stat.getEphemeralOwner() != 0) {
      return;
    }
    putPersistentData(destination, data, NodeExistsPolicy.OVERWRITE);
    if (stat.getNumChildren() > 0) {
      for (String child : getChildren(source)) {
        recursiveCopyPersistentOverwrite(source + "/" + child, destination + "/" + child);
      }
    }
  }

  /**
   * Update an existing ZK node using the provided mutator function. If it's possible the node
   * doesn't exist yet, use {@link #mutateOrCreate(String, byte[], Mutator)} instead.
   *
   * @return the value set on the node
   */
  public byte[] mutateExisting(String zPath, Mutator mutator)
      throws KeeperException, InterruptedException, AcceptableThriftTableOperationException {
    requireNonNull(mutator);
    return retryLoopMutator(zk -> {
      var stat = new Stat();
      byte[] data = zk.getData(zPath, null, stat);
      // this mutator can throw AcceptableThriftTableOperationException
      data = mutator.mutate(data);
      if (data != null) {
        zk.setData(zPath, data, stat.getVersion());
      }
      return data;
    }, e -> e.code() == Code.BADVERSION); // always retry if bad version
  }

  /**
   * Create a new {@link CreateMode#PERSISTENT} ZK node with the default ACL if it does not exist.
   * If it does already exist, then update it with the provided mutator function. If it is known to
   * exist already, use {@link #mutateExisting(String, Mutator)} instead.
   *
   * @return the value set on the node
   */
  public byte[] mutateOrCreate(String zPath, byte[] createValue, Mutator mutator)
      throws KeeperException, InterruptedException, AcceptableThriftTableOperationException {
    requireNonNull(mutator);
    return putPersistentData(zPath, createValue, NodeExistsPolicy.SKIP) ? createValue
        : mutateExisting(zPath, mutator);
  }

  /**
   * Ensure the provided path exists, using persistent nodes, empty data, and the default ACL for
   * any missing path elements.
   */
  public void mkdirs(String path) throws KeeperException, InterruptedException {
    if (path.equals("")) {
      // terminal condition for recursion
      return;
    }
    if (!path.startsWith("/")) {
      throw new IllegalArgumentException(path + "does not start with /");
    }
    if (exists(path)) {
      return;
    }
    String parent = path.substring(0, path.lastIndexOf("/"));
    mkdirs(parent);
    putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP);
  }

  /**
   * Delete the specified node, and ignore NONODE exceptions.
   */
  public void delete(String path) throws KeeperException, InterruptedException {
    retryLoop(zk -> {
      try {
        zk.delete(path, -1);
      } catch (KeeperException e) {
        // ignore the case where the node doesn't exist
        if (e.code() != Code.NONODE) {
          throw e;
        }
      }
      return null;
    });
  }

  /**
   * This method will delete a node and all its children.
   */
  public void recursiveDelete(String zPath, NodeMissingPolicy policy)
      throws KeeperException, InterruptedException {
    ZooUtil.recursiveDelete(getZooKeeper(), zPath, policy);
  }
}
