/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.zk;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.runtime.LocationId;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Util class to help manage Zk connection and ZkClient.
 * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree.
 *
 * <p>
 *  <b>Note on ZkClient:</b>
 *  {@link ZkClient} consists of two threads - I/O thread and Event thread.
 *  I/O thread manages heartbeats to the Zookeeper server in the ensemble and handles responses to synchronous methods
 *  in Zookeeper API.
 *  Event thread typically receives all the Watcher events and delivers them to the registered listeners. It also
 *  handles responses to asynchronous methods in Zookeeper API.
 * </p>
 *
 * <p>
 *  <b>Note on Session disconnect handling:</b>
 *  After a session has timed out and been restored, we may still receive notifications that originated in the
 *  old session. To guard against this, we keep a currentGeneration member, which starts at 0 and is incremented
 *  each time a new session is established. Each Listener captures the current value of this member when it is
 *  created, so a Callback that arrives on a Listener carrying an old generation id is ignored.
 * </p>
 *
 * <p>
 *   <b>Note on Session Management:</b>
 *   Session management, if needed, should be handled by the caller. This can be done by implementing
 *   {@link org.I0Itec.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change
 *   callbacks are invoked in the context of the Event thread of the ZkClient. So, it is advised to do non-blocking
 *   processing in the callbacks.
 * </p>
 */
public class ZkUtils {
  // Logger shared by this class and its nested listener classes.
  private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
  // Protocol version that validateZkVersion() writes to, and checks against, the data of the root znode.
  /* package private */static final String ZK_PROTOCOL_VERSION = "1.0";

  // Client through which every ZooKeeper operation of this class is issued.
  private final ZkClient zkClient;
  // Path of the ephemeral sequential znode created by registerProcessorAndGetId(); null while none is cached.
  private volatile String ephemeralPath = null;
  // Supplies the znode paths (processors, job model, task locality, ...) used by this class.
  private final ZkKeyBuilder keyBuilder;
  // Maximum time, in milliseconds, that connect() waits for the client to become connected.
  private final int connectionTimeoutMs;
  // Generation counter; initialised to 0 in the constructor and advanced by incGeneration().
  private final AtomicInteger currentGeneration;
  // Counters updated by this class: reads, writes, deletions, subscriptions and zkConnectionError.
  private final ZkUtilsMetrics metrics;
  // Session timeout in milliseconds; registerProcessorAndGetId() keeps retrying for twice this long.
  private final int sessionTimeoutMs;

  /**
   * Advances the generation counter by one.
   */
  public void incGeneration() {
    currentGeneration.getAndIncrement();
  }

  /**
   * @return the current value of the generation counter
   */
  public int getGeneration() {
    return currentGeneration.intValue();
  }

  /**
   * Creates the utility around an existing client. The generation counter starts at 0.
   *
   * @param zkKeyBuilder builder for the znode paths used by this instance
   * @param zkClient client used for all ZooKeeper operations
   * @param connectionTimeoutMs time in milliseconds that {@link #connect()} waits for a connection
   * @param sessionTimeOutMs session timeout in milliseconds
   * @param metricsRegistry registry handed to the {@link ZkUtilsMetrics} created for this instance
   */
  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, int sessionTimeOutMs, MetricsRegistry metricsRegistry) {
    // collaborators
    this.zkClient = zkClient;
    this.keyBuilder = zkKeyBuilder;
    this.metrics = new ZkUtilsMetrics(metricsRegistry);
    // timeouts
    this.connectionTimeoutMs = connectionTimeoutMs;
    this.sessionTimeoutMs = sessionTimeOutMs;
    // session-generation tracking
    this.currentGeneration = new AtomicInteger();
  }

  /**
   * Blocks until the client is connected or the connection timeout elapses.
   *
   * @throws ZkInterruptedException declared for interruption while waiting
   * @throws RuntimeException if the client is not connected within the connection timeout; the
   *         zkConnectionError metric is incremented before the exception is thrown
   */
  public void connect() throws ZkInterruptedException {
    if (zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
      return;
    }
    metrics.zkConnectionError.inc();
    throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
  }

  /**
   * Resets all zk-session specific state by clearing the cached ephemeral path.
   */
  public synchronized void unregister() {
    this.ephemeralPath = null;
  }

  /**
   * @return the client this instance was constructed with
   */
  public ZkClient getZkClient() {
    return this.zkClient;
  }

  /**
   * @return the key builder this instance was constructed with
   */
  public ZkKeyBuilder getKeyBuilder() {
    return this.keyBuilder;
  }

  /**
   * Returns a ZK generated identifier for this client.
   * If no ephemeral path is cached, an ephemeral sequential node is created under the processors path and the
   * registration is validated against the other registered processors. While another node with the same
   * processorId takes precedence, validation is retried once per second for up to twice the session timeout;
   * after that the created node is deleted, the cached path is cleared and the registration fails.
   * If an ephemeral path is already cached, it is returned without contacting zookeeper.
   *
   * @param data Object that should be written as data in the registered ephemeral ZK node
   * @return String representing the absolute ephemeralPath of this client in the current session
   * @throws SamzaException if the processor is still a duplicate once the retry window has elapsed
   */
  public synchronized String registerProcessorAndGetId(final ProcessorData data) {
    final long startTimeMs = System.currentTimeMillis();
    // Widen to long before multiplying so that a large session timeout cannot overflow int arithmetic.
    final long retryTimeOutMs = 2L * sessionTimeoutMs;
    final String processorId = data.getProcessorId();

    if (ephemeralPath != null) {
      LOG.info("Ephemeral path: {} exists for processor: {} in zookeeper.", ephemeralPath, data);
      return ephemeralPath;
    }

    final String createdPath = zkClient.createEphemeralSequential(keyBuilder.getProcessorsPath() + "/", data.toString());
    ephemeralPath = createdPath;
    LOG.info("Created ephemeral path: {} for processor: {} in zookeeper.", createdPath, data);

    final ProcessorNode processorNode = new ProcessorNode(data, createdPath);
    // Determine if there are duplicate processors with this.processorId after registration.
    while (!isValidRegisteredProcessor(processorNode)) {
      final long elapsedMs = System.currentTimeMillis() - startTimeMs;
      if (elapsedMs >= retryTimeOutMs) {
        LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: {}.", processorId, createdPath);
        zkClient.delete(createdPath);
        // The node is gone; drop the cached path so a later call does not report it as registered.
        ephemeralPath = null;
        metrics.deletions.inc();
        throw new SamzaException(String.format("Processor: %s is duplicate in the group. Registration failed.", processorId));
      }
      LOG.info("Processor: {} is duplicate. Retrying registration again.", processorId);
      timeDelay(1000);
    }
    return ephemeralPath;
  }

  /**
   * Sleeps the calling thread for the given duration.
   * If the sleep is interrupted, the interruption is logged and the thread's interrupt status is restored so
   * that the caller can still observe it; no exception is propagated.
   *
   * @param sleepTimeInMillis time to sleep, in milliseconds
   */
  public void timeDelay(int sleepTimeInMillis) {
    try {
      Thread.sleep(sleepTimeInMillis);
    } catch (InterruptedException e) {
      LOG.error("Interrupted exception on wait.", e);
      // Thread.sleep() clears the interrupt status when it throws; set it again instead of discarding it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Deletes the cached ephemeral node of this processor from zookeeper, if one is cached.
   * Any exception raised by the deletion is logged and not propagated.
   *
   * @param processorId uniqueId identifying the stream processor; used in the log message only.
   */
  public synchronized void deleteProcessorNode(String processorId) {
    if (ephemeralPath == null) {
      // Nothing is registered, so there is nothing to delete.
      return;
    }
    try {
      LOG.info("Deleting the ephemeral node: {} of the processor: {} in zookeeper.", ephemeralPath, processorId);
      zkClient.delete(ephemeralPath);
    } catch (Exception e) {
      LOG.error("Exception occurred on deletion of ephemeral node: {}.", ephemeralPath, e);
    }
  }

  /**
   * Decides whether the given processor registration is the valid one for its processorId.
   *
   * When several registered nodes carry the same processorId, only the node whose ephemeral path is
   * lexicographically smallest is valid; every other node is a duplicate. When at most one node carries the
   * processorId, the registration is valid.
   *
   * @param processor registration to check against the currently registered processor nodes.
   * @return true if the processor is valid. false otherwise.
   */
  private boolean isValidRegisteredProcessor(final ProcessorNode processor) {
    final String processorId = processor.getProcessorData().getProcessorId();

    // Keep only the registered nodes that share this processorId.
    final List<ProcessorNode> sameIdNodes = new ArrayList<>();
    for (ProcessorNode registeredNode : getAllProcessorNodes()) {
      if (registeredNode.getProcessorData().getProcessorId().equals(processorId)) {
        sameIdNodes.add(registeredNode);
      }
    }

    if (sameIdNodes.size() <= 1) {
      // No duplicates: this is a valid registered processor.
      return true;
    }

    LOG.debug("Processor nodes in zookeeper: {} for processorId: {}.", sameIdNodes, processorId);
    final List<String> ephemeralPaths = new ArrayList<>(sameIdNodes.size());
    for (ProcessorNode duplicate : sameIdNodes) {
      ephemeralPaths.add(duplicate.getEphemeralPath());
    }
    // The registration is valid only if it owns the smallest path among the duplicates.
    final String smallestPath = Collections.min(ephemeralPaths);
    return smallestPath.equals(processor.getEphemeralPath());
  }

  /**
   * Reads every znode under the processors path and wraps each one that has data into a {@link ProcessorNode}.
   * Znodes whose data reads as null are skipped.
   *
   * @return a list of {@link ProcessorNode}, one per registered processor znode with data, ordered by znode name.
   */
  List<ProcessorNode> getAllProcessorNodes() {
    final List<String> processorZNodes = getSortedActiveProcessorsZnodes();
    LOG.debug("Active ProcessorZNodes in zookeeper: {}.", processorZNodes);

    final List<ProcessorNode> result = new ArrayList<>();
    for (String zNodeName : processorZNodes) {
      final String nodePath = String.format("%s/%s", keyBuilder.getProcessorsPath(), zNodeName);
      final String nodeData = readProcessorData(nodePath);
      if (nodeData == null) {
        continue;
      }
      result.add(new ProcessorNode(new ProcessorData(nodeData), nodePath));
    }
    return result;
  }

  /**
   * Stores the location of a task in zookeeper.
   * The task's znode under the task locality path is created first if it does not exist, then the id of the
   * location is written into it.
   *
   * @param taskName task whose locality is recorded
   * @param locationId location to record for the task
   */
  public void writeTaskLocality(TaskName taskName, LocationId locationId) {
    final String localityNodePath = String.format("%s/%s", keyBuilder.getTaskLocalityPath(), taskName);
    final String[] requiredPaths = {localityNodePath};
    validatePaths(requiredPaths);
    writeData(localityNodePath, locationId.getId());
  }

  /**
   * Reads the recorded location of every task found under the task locality path.
   * Children whose data reads as null are left out of the result.
   *
   * @return mapping from task name to its recorded location; empty if the task locality path does not exist
   */
  public Map<TaskName, LocationId> readTaskLocality() {
    final Map<TaskName, LocationId> localityByTask = new HashMap<>();
    final String localityRoot = keyBuilder.getTaskLocalityPath();
    if (!zkClient.exists(localityRoot)) {
      return localityByTask;
    }

    final List<String> taskNames = zkClient.getChildren(localityRoot);
    for (String name : taskNames) {
      final String taskNodePath = String.format("%s/%s", keyBuilder.getTaskLocalityPath(), name);
      final String storedLocation = zkClient.readData(taskNodePath, true);
      if (storedLocation == null) {
        continue;
      }
      localityByTask.put(new TaskName(name), new LocationId(storedLocation));
    }
    return localityByTask;
  }

  /**
   * Fetches the children of the processors path, i.e. the currently active/registered processor znodes, and
   * sorts them in natural order.
   *
   * @return sorted list of child znode names; callers in this class prefix them with the processors path to
   *         obtain absolute paths
   */
  public List<String> getSortedActiveProcessorsZnodes() {
    final List<String> childNames = zkClient.getChildren(keyBuilder.getProcessorsPath());
    if (!childNames.isEmpty()) {
      childNames.sort(Comparator.naturalOrder());
      LOG.info("Found these children - " + childNames);
    }
    return childNames;
  }

  /**
   * Reads the data stored in a processor's znode and counts the read in the metrics.
   *
   * @param fullPath absolute path to the znode
   * @return processor's data; may be null (callers in this class skip null results)
   * @throws SamzaException wrapping any exception raised while reading the znode
   */
  private String readProcessorData(String fullPath) {
    final String processorData;
    try {
      processorData = zkClient.readData(fullPath, true);
      metrics.reads.inc();
    } catch (Exception e) {
      throw new SamzaException(String.format("Cannot read ZK node: %s", fullPath), e);
    }
    return processorData;
  }

  /**
   * Resolves the processor ids of all currently active/registered processor znodes.
   *
   * @return sorted list of processorIds
   */
  public List<String> getSortedActiveProcessorsIDs() {
    final List<String> sortedZnodes = getSortedActiveProcessorsZnodes();
    return getActiveProcessorsIDs(sortedZnodes);
  }

  /**
   * Resolves the processor ids stored in the given processor znodes and returns them <i>sorted</i>.
   * Znodes whose data reads as null contribute no id.
   *
   * @param znodeIds - list of relative paths of the children's znodes
   * @return sorted list of processor ids for the given znodes; empty when no znodes are given
   */
  public List<String> getActiveProcessorsIDs(List<String> znodeIds) {
    final String processorsRoot = keyBuilder.getProcessorsPath();
    final List<String> processorIds = new ArrayList<>(znodeIds.size());
    if (znodeIds.isEmpty()) {
      return processorIds;
    }

    for (String znodeId : znodeIds) {
      final String rawData = readProcessorData(String.format("%s/%s", processorsRoot, znodeId));
      if (rawData == null) {
        continue;
      }
      processorIds.add(new ProcessorData(rawData).getProcessorId());
    }
    processorIds.sort(Comparator.naturalOrder());
    LOG.info("Found these children - " + znodeIds);
    LOG.info("Found these processorIds - " + processorIds);
    return processorIds;
  }

  /* Wrapper for standard I0Itec methods */
  /**
   * Removes a data listener from a path by delegating to the underlying client.
   *
   * @param path znode path the listener was subscribed to
   * @param dataListener listener to remove
   */
  public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
    this.zkClient.unsubscribeDataChanges(path, dataListener);
  }

  /**
   * Registers a data listener on a path through the underlying client and counts the subscription.
   *
   * @param path znode path to watch
   * @param dataListener listener to register
   */
  public void subscribeDataChanges(String path, IZkDataListener dataListener) {
    this.zkClient.subscribeDataChanges(path, dataListener);
    this.metrics.subscriptions.inc();
  }

  /**
   * Registers a child listener on a path through the underlying client and counts the subscription.
   *
   * @param path znode path whose children are watched
   * @param listener listener to register
   */
  public void subscribeChildChanges(String path, IZkChildListener listener) {
    this.zkClient.subscribeChildChanges(path, listener);
    this.metrics.subscriptions.inc();
  }

  /**
   * Removes a child listener from a path by delegating to the underlying client.
   *
   * @param path znode path the listener was subscribed to
   * @param childListener listener to remove
   */
  public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
    this.zkClient.unsubscribeChildChanges(path, childListener);
  }

  /**
   * Writes data to a znode through the underlying client and counts the write.
   *
   * @param path znode path to write to
   * @param object data to store
   */
  public void writeData(String path, Object object) {
    this.zkClient.writeData(path, object);
    this.metrics.writes.inc();
  }

  /**
   * Checks, through the underlying client, whether a znode exists.
   *
   * @param path znode path to check
   * @return the result reported by the client for the path
   */
  public boolean exists(String path) {
    return this.zkClient.exists(path);
  }

  /**
   * Closes the underlying client.
   * If the first attempt fails with a {@link ZkInterruptedException}, the interrupt status of the calling
   * thread is cleared, the close is attempted a second time, and the interrupt status is then set again.
   * An exception thrown by the second attempt is propagated.
   */
  public void close() {
    try {
      zkClient.close();
    } catch (ZkInterruptedException e) {
      LOG.warn("Interrupted when closing zkClient. Clearing the interrupted status and retrying.", e);
      // Clear the interrupt status before retrying the close.
      Thread.interrupted();
      zkClient.close();
      // Re-assert the interrupt so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * An {@link IZkChildListener} bound to the generation that was current when it was constructed.
   * A child-change callback is forwarded to {@link #doHandleChildChange(String, List)} only while the
   * generation reported by {@link ZkUtils#getGeneration()} still equals the captured one; otherwise the
   * callback is logged and dropped, so that events belonging to a previous generation are not acted on.
   */
  public abstract static class GenerationAwareZkChildListener implements IZkChildListener {
    private final int generation;
    private final ZkUtils zkUtils;
    private final String listenerName;

    /**
     * @param zkUtils source of the current generation; its value at construction time is captured
     * @param listenerName name used to identify this listener in log messages
     */
    public GenerationAwareZkChildListener(ZkUtils zkUtils, String listenerName) {
      this.zkUtils = zkUtils;
      this.listenerName = listenerName;
      this.generation = zkUtils.getGeneration();
    }

    @Override
    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
      final int liveGeneration = zkUtils.getGeneration();
      if (liveGeneration == generation) {
        doHandleChildChange(parentPath, currentChildren);
        return;
      }
      LOG.warn(String.format("Skipping handleChildChange for %s from wrong generation. Current generation: %s; " +
          "Callback generation: %s", listenerName, liveGeneration, generation));
    }

    /**
     * Invoked for child-change callbacks that belong to the listener's own generation.
     *
     * @param path path passed to the child-change callback
     * @param children children passed to the child-change callback
     * @throws Exception if handling the change fails
     */
    public abstract void doHandleChildChange(String path, List<String> children) throws Exception;
  }

  /**
   * An {@link IZkDataListener} bound to the generation that was current when it was constructed.
   * Data-change and data-deleted callbacks are forwarded to the {@code doHandle*} methods only while the
   * generation reported by {@link ZkUtils#getGeneration()} still equals the captured one; otherwise the
   * callback is logged and dropped, so that events belonging to a previous generation are not acted on.
   */
  public abstract static class GenerationAwareZkDataListener implements IZkDataListener {
    private final int generation;
    private final ZkUtils zkUtils;
    private final String listenerName;

    /**
     * @param zkUtils source of the current generation; its value at construction time is captured
     * @param listenerName name used to identify this listener in log messages
     */
    public GenerationAwareZkDataListener(ZkUtils zkUtils, String listenerName) {
      generation = zkUtils.getGeneration();
      this.zkUtils = zkUtils;
      this.listenerName = listenerName;
    }

    @Override
    public void handleDataChange(String path, Object data) {
      // Read the generation once so the check and the log message agree.
      final int currentGeneration = zkUtils.getGeneration();
      if (currentGeneration != generation) {
        LOG.warn(String.format("Skipping handleDataChange for %s from wrong generation. Current generation: %s; " +
            "Callback generation: %s", listenerName, currentGeneration, generation));
        return;
      }
      doHandleDataChange(path, data);
    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {
      // Read the generation once so the check and the log message agree.
      final int currentGeneration = zkUtils.getGeneration();
      if (currentGeneration != generation) {
        LOG.warn(String.format("Skipping handleDataDeleted for %s from wrong generation. Current generation: %s; " +
            "Callback generation: %s", listenerName, currentGeneration, generation));
        return;
      }
      doHandleDataDeleted(dataPath);
    }

    /**
     * Invoked for data-change callbacks that belong to the listener's own generation.
     *
     * @param path path passed to the data-change callback
     * @param data data passed to the data-change callback
     */
    public abstract void doHandleDataChange(String path, Object data);

    /**
     * Invoked for data-deleted callbacks that belong to the listener's own generation.
     *
     * @param path path passed to the data-deleted callback
     */
    public abstract void doHandleDataDeleted(String path);

    /**
     * @return true while the generation captured at construction is still the current one
     */
    private boolean isValid() {
      return generation == zkUtils.getGeneration();
    }
  }

  /**
   * Subscribes a listener to data changes of the JobModel version znode and counts the subscription.
   *
   * @param dataListener generation-aware listener to register on the JobModel version path
   */
  public void subscribeToJobModelVersionChange(GenerationAwareZkDataListener dataListener) {
    LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
    final String versionPath = keyBuilder.getJobModelVersionPath();
    this.zkClient.subscribeDataChanges(versionPath, dataListener);
    this.metrics.subscriptions.inc();
  }

  /**
   * Serializes the job model to a pretty-printed string and stores it in a new persistent znode at the
   * job model path of the given version.
   * This call should FAIL if the node already exists.
   *
   * @param jobModelVersion version of the jobModel to publish
   * @param jobModel jobModel to publish
   * @throws SamzaException wrapping any exception raised during serialization or node creation
   */
  public void publishJobModel(String jobModelVersion, JobModel jobModel) {
    try {
      final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
      final String serializedModel = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
      LOG.info("jobModelAsString=" + serializedModel);
      final String targetPath = keyBuilder.getJobModelPath(jobModelVersion);
      zkClient.createPersistent(targetPath, serializedModel);
      LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
    } catch (Exception e) {
      LOG.error("JobModel publish failed for version=" + jobModelVersion, e);
      throw new SamzaException(e);
    }
  }

  /**
   * Reads the job model stored in ZK for the given version and deserializes it.
   *
   * @param jobModelVersion jobModel version to get
   * @return job model for this version
   * @throws SamzaException if the stored data cannot be deserialized into a {@link JobModel}
   */
  public JobModel getJobModel(String jobModelVersion) {
    LOG.info("Read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
    final Object rawModel = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
    metrics.reads.inc();
    final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
    try {
      return mapper.readValue((String) rawModel, JobModel.class);
    } catch (IOException e) {
      throw new SamzaException("failed to read JobModel from ZK", e);
    }
  }

  /**
   * Reads the data of the jobmodel version znode from ZK and counts the read.
   *
   * @return jobmodel version as a string
   */
  public String getJobModelVersion() {
    final String versionPath = keyBuilder.getJobModelVersionPath();
    final String version = zkClient.readData(versionPath, true);
    metrics.reads.inc();
    return version;
  }

  /**
   * Reads the data of the active job model version znode, i.e. the most recent version agreed on by the
   * quorum, and counts the read.
   *
   * @return most recent active job model version
   */
  public String getLastActiveJobModelVersion() {
    final String activeVersionPath = keyBuilder.getActiveJobModelVersionPath();
    final String activeVersion = zkClient.readData(activeVersionPath, true);
    metrics.reads.inc();
    return activeVersion;
  }

  /**
   * Computes the JobModel version that a processor group should use next during a rebalancing phase.
   * Without a current version the result is "1"; otherwise it is one more than the larger of the current
   * version and the highest version published under the job model path prefix.
   *
   * @param currentJobModelVersion the current version of JobModel.
   * @return the next JobModel version.
   */
  public String getNextJobModelVersion(String currentJobModelVersion) {
    if (currentJobModelVersion == null) {
      return "1";
    }

    /*
     * There's inconsistency between the maximum published jobModel version and value stored in jobModelVersion
     * zookeeper node. Short term fix is to read all published jobModel versions and choose the maximum version,
     * so that the next version is always greater than anything already published.
     */
    final List<String> publishedVersions = zkClient.getChildren(keyBuilder.getJobModelPathPrefix());
    metrics.reads.inc(publishedVersions.size());
    final String highestPublished = publishedVersions.stream()
        .max(Comparator.comparingInt(Integer::valueOf))
        .orElse("0");
    final int current = Integer.valueOf(currentJobModelVersion);
    final int published = Integer.valueOf(highestPublished);
    return String.valueOf(Math.max(current, published) + 1);
  }

  /**
   * Publishes the version number of the next JobModel.
   * The version znode is read together with its ZK data version; if it holds a non-null value different from
   * {@code oldVersion} the publish is rejected. Otherwise the new version is written conditionally on the
   * data version that was read, so a concurrent update makes the write fail.
   *
   * @param oldVersion - used to validate, that no one has changed the version in the meanwhile.
   * @param newVersion - new version.
   * @throws SamzaException if the stored version differs from {@code oldVersion} or the write fails
   */
  public void publishJobModelVersion(String oldVersion, String newVersion) {
    Stat stat = new Stat();
    String currentVersion = zkClient.readData(keyBuilder.getJobModelVersionPath(), stat);
    metrics.reads.inc();
    LOG.info("publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
        .getVersion() + ")");

    if (currentVersion != null && !currentVersion.equals(oldVersion)) {
      // The message previously ran "expected" and the version together; separate them for readability.
      throw new SamzaException(
          "Someone changed JobModelVersion while the leader was generating one: expected " + oldVersion + ", got " + currentVersion);
    }
    // data version is the ZK version of the data from the ZK.
    int dataVersion = stat.getVersion();
    try {
      stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
      metrics.writes.inc();
    } catch (Exception e) {
      String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
      LOG.error(msg, e);
      throw new SamzaException(msg, e);
    }
    LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) +
        "(actual data version after update = " + stat.getVersion() + ")");
  }

  /**
   * Writes the most recent job model version agreed on by the quorum to the active job model version znode.
   * The write is best-effort: a failure is logged as a warning and not propagated.
   *
   * @param version active job model version
   */
  public void publishActiveJobModelVersion(String version) {
    try {
      final String activeVersionPath = keyBuilder.getActiveJobModelVersionPath();
      zkClient.writeData(activeVersionPath, version);
      metrics.writes.inc();
      LOG.info("Published the active job model version = {} to zookeeper successfully.", version);
    } catch (Exception e) {
      /*
       * Failure to write the most recent job model version has the following implications
       *  1. Processors no longer benefit from the optimization to start the container upon restarts based
       *     on the most recent active job model. It is useful in scenarios where processors leave the quorum and
       *     come back within debounce time and the work assignment for new quorum remains unchanged.
       *  2. During rolling upgrades, processors that are upgraded initially will wait for the min(deployment
       *     window, T + debounce time) where T is the time at which the last change notification of the quorum
       *     was received by the leader.
       *
       *  That said, failures don't impact correctness, and it is better to continue processing than to bring
       *  down the processor with a fatal error.
       */
      LOG.warn("Failed to persist the active job model version = {} due to {}", version, e);
    }
  }

  /**
   * Validates that the Zk protocol currently used by the job is the same as in this participant.
   * The protocol version is stored as the data of the root znode. If the root does not exist it is created
   * with this participant's version and validation succeeds. If the root exists without a value, "1.0" is
   * written for backward compatibility before the comparison is made.
   *
   * @throws SamzaException if the version stored in the root znode differs from {@link #ZK_PROTOCOL_VERSION},
   *         including the case where no version can be read at all
   */
  public void validateZkVersion() {

    // Version of the protocol is written into root znode. If root does not exist yet we need to create one.
    String rootPath = keyBuilder.getRootPath();
    if (!zkClient.exists(rootPath)) {
      try {
        // attempt to create the root with the correct version
        zkClient.createPersistent(rootPath, ZK_PROTOCOL_VERSION);
        LOG.info("Created zk root node: " + rootPath + " with zk version " + ZK_PROTOCOL_VERSION);
        return;
      } catch (ZkNodeExistsException e) {
        // Someone else created the root between the check and the create; fall through and verify its version.
        LOG.warn("root path " + rootPath + " already exists.");
      }
    }
    // if exists, verify the version
    Stat stat = new Stat();
    String version = zkClient.readData(rootPath, stat);
    if (version == null) {
      // for backward compatibility, if no value - assume 1.0
      try {
        zkClient.writeData(rootPath, "1.0", stat.getVersion());
      } catch (ZkBadVersionException e) {
        // if the write failed with ZkBadVersionException it means someone else already wrote a version, so we can ignore it.
      }
      // re-read the updated version
      version = zkClient.readData(rootPath);
    }
    LOG.info("Current version for zk root node: " + rootPath + " is " + version + ", expected version is " + ZK_PROTOCOL_VERSION);
    // Compare from the constant so that a still-missing version is reported as a mismatch, not as an NPE.
    if (!ZK_PROTOCOL_VERSION.equals(version)) {
      throw new SamzaException("ZK Protocol mismatch. Expected " + ZK_PROTOCOL_VERSION + "; found " + version);
    }
  }

  /**
   * Makes sure that each of the given paths exists in ZK, creating any missing path as a persistent node
   * together with its missing parents.
   *
   * @param paths - paths to verify or create
   */
  public void validatePaths(String[] paths) {
    for (int i = 0; i < paths.length; i++) {
      final String candidate = paths[i];
      if (zkClient.exists(candidate)) {
        continue;
      }
      zkClient.createPersistent(candidate, true);
    }
  }

  /**
   * Subscribes a listener to child changes of the processors path in ZK and counts the subscription.
   *
   * @param listener - will be called when a processor is added or removed.
   */
  public void subscribeToProcessorChange(IZkChildListener listener) {
    LOG.info("Subscribing for child change at:" + keyBuilder.getProcessorsPath());
    final String processorsPath = keyBuilder.getProcessorsPath();
    this.zkClient.subscribeChildChanges(processorsPath, listener);
    this.metrics.subscriptions.inc();
  }

  /**
   * Cleans up old data from ZK: first old barrier versions, then old job models.
   *
   * @param numVersionsToLeave - number of versions to leave in each of the two groups
   */
  public void cleanupZK(int numVersionsToLeave) {
    this.deleteOldBarrierVersions(numVersionsToLeave);
    this.deleteOldJobModels(numVersionsToLeave);
  }

  /**
   * Deletes the oldest published job models, keeping the given number of most recent versions.
   * Versions are ordered numerically; the job model znode name format is {@code <num>}.
   *
   * @param numVersionsToLeave number of most recent job model versions to keep
   */
  void deleteOldJobModels(int numVersionsToLeave) {
    // read current list of JMs
    String path = keyBuilder.getJobModelPathPrefix();
    LOG.info("About to delete jm path=" + path);
    List<String> znodeIds = zkClient.getChildren(path);
    // Integer.compare avoids the overflow-prone subtraction idiom when ordering the numeric names.
    deleteOldVersionPath(path, znodeIds, numVersionsToLeave,
        (left, right) -> Integer.compare(Integer.parseInt(left), Integer.parseInt(right)));
  }

  /**
   * Deletes the oldest version barriers, keeping the given number of most recent versions.
   * Barriers are ordered by the version that {@code ZkBarrierForVersionUpgrade.getVersion} extracts from the
   * znode name; the barrier's name format is {@code barrier_<num>}.
   *
   * @param numVersionsToLeave number of most recent barrier versions to keep
   */
  void deleteOldBarrierVersions(int numVersionsToLeave) {
    // read current list of barriers
    String path = keyBuilder.getJobModelVersionBarrierPrefix();
    LOG.info("About to delete old barrier paths from " + path);
    List<String> znodeIds = zkClient.getChildren(path);
    LOG.info("List of all zkNodes: " + znodeIds);
    // Integer.compare avoids the overflow-prone subtraction idiom when ordering the barrier versions.
    deleteOldVersionPath(path, znodeIds, numVersionsToLeave,
        (left, right) -> Integer.compare(ZkBarrierForVersionUpgrade.getVersion(left), ZkBarrierForVersionUpgrade.getVersion(right)));
  }

  /**
   * Deletes the oldest children of a path so that only the newest {@code numVersionsToLeave} remain.
   * The given list is sorted in place with the comparator, and every entry except the last
   * {@code numVersionsToLeave} is deleted recursively. A failed deletion is logged and does not stop the
   * cleanup of the remaining entries. Nothing happens when the list is not larger than the number to leave.
   *
   * @param path parent path of the versioned znodes
   * @param zNodeIds names of the children of {@code path}; sorted in place by this method
   * @param numVersionsToLeave number of newest entries to keep
   * @param c comparator ordering the names from oldest to newest
   */
  void deleteOldVersionPath(String path, List<String> zNodeIds, int numVersionsToLeave, Comparator<String> c) {
    if (StringUtils.isEmpty(path) || zNodeIds == null) {
      LOG.warn("cannot cleanup empty path or empty list in ZK");
      return;
    }

    final int size = zNodeIds.size();
    if (size <= numVersionsToLeave) {
      // Not more entries than we are asked to keep.
      return;
    }

    zNodeIds.sort(c);
    // get the znodes to delete
    final List<String> zNodesToDelete = zNodeIds.subList(0, size - numVersionsToLeave);
    LOG.info("Starting cleanup of barrier version zkNodes. From size=" + size + " to size " + zNodesToDelete.size() + "; numberToLeave=" + numVersionsToLeave);
    for (String staleId : zNodesToDelete) {
      final String stalePath = path + "/" + staleId;
      try {
        LOG.info("deleting " + stalePath);
        zkClient.deleteRecursive(stalePath);
        metrics.deletions.inc();
      } catch (Exception e) {
        LOG.warn("delete of node " + stalePath + " failed.", e);
      }
    }
  }
  /**
   * Pairs the data of a registered processor with the path of its ephemeral znode in zookeeper.
   */
  static class ProcessorNode {
    private final ProcessorData processorData;

    // Ex: /test/processors/0000000000
    private final String ephemeralProcessorPath;

    /**
     * @param processorData data stored in the processor's znode
     * @param ephemeralProcessorPath path of the processor's ephemeral znode
     */
    ProcessorNode(ProcessorData processorData, String ephemeralProcessorPath) {
      this.ephemeralProcessorPath = ephemeralProcessorPath;
      this.processorData = processorData;
    }

    /**
     * @return data stored in the processor's znode
     */
    ProcessorData getProcessorData() {
      return this.processorData;
    }

    /**
     * @return path of the processor's ephemeral znode
     */
    String getEphemeralPath() {
      return this.ephemeralProcessorPath;
    }

    @Override
    public String toString() {
      return String.format("[ProcessorData: %s, ephemeralProcessorPath: %s]", processorData, ephemeralProcessorPath);
    }

    @Override
    public int hashCode() {
      return Objects.hash(processorData, ephemeralProcessorPath);
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      final ProcessorNode that = (ProcessorNode) obj;
      final boolean sameData = Objects.equals(processorData, that.processorData);
      return sameData && Objects.equals(ephemeralProcessorPath, that.ephemeralProcessorPath);
    }
  }
}
