/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samza.zk;

import com.google.common.annotations.VisibleForTesting;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.LeaderElector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of Leader Elector using Zookeeper.
 *
 * <p>
 * Each participant in the leader election process creates an instance of this class and tries to become the leader.
 * The participant with the lowest sequence number in the ZK subtree for election becomes the leader. Every non-leader
 * sets a watcher on its predecessor, where the predecessor is the participant with the largest sequence number
 * that is less than the current participant's sequence number.
 * </p>
 */
public class ZkLeaderElector implements LeaderElector {
  public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class);

  /** Exclusive upper bound, in milliseconds, of the random pause taken before an election round is repeated. */
  private static final int MAX_RETRY_BACKOFF_MS = 1000;

  private final ZkUtils zkUtils;
  private final String processorIdStr;
  private final ZkKeyBuilder keyBuilder;
  private final String hostName;

  /** True after an election round ranked this participant first; cleared by a later round or by resigning. */
  private final AtomicBoolean isLeader = new AtomicBoolean(false);
  /** Listener registered on the predecessor's znode; replaced with a fresh instance whenever the predecessor changes. */
  private IZkDataListener previousProcessorChangeListener;
  /** Optional callback invoked when this participant is ranked first; may be {@code null}. */
  private LeaderElectorListener leaderElectorListener;
  /** Id of the predecessor znode currently being watched, or {@code null} if nothing has been subscribed yet. */
  private String currentSubscription;
  private final Random random = new Random();

  /**
   * Creates an elector that watches its predecessor with a {@link PreviousProcessorChangeListener} and
   * asks {@code zkUtils} to validate the processors path.
   *
   * @param processorIdStr identifier of this processor; sent to ZK on registration and used as a log prefix
   * @param zkUtils helper through which all ZK operations of this elector are performed
   * @throws SamzaException if the local host name cannot be determined
   */
  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
    this.processorIdStr = processorIdStr;
    this.zkUtils = zkUtils;
    this.keyBuilder = zkUtils.getKeyBuilder();
    this.hostName = getHostName();
    this.previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);

    zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath()});
  }

  /**
   * Test-only constructor that lets the caller supply the listener used for the first predecessor subscription.
   * Unlike the public production constructor, it does not validate the processors path.
   *
   * @param processorIdStr identifier of this processor; sent to ZK on registration and used as a log prefix
   * @param zkUtils helper through which all ZK operations of this elector are performed
   * @param previousProcessorChangeListener listener to register on the predecessor's znode
   * @throws SamzaException if the local host name cannot be determined
   */
  @VisibleForTesting
  public ZkLeaderElector(String processorIdStr,
                         ZkUtils zkUtils,
                         IZkDataListener previousProcessorChangeListener) {
    this.processorIdStr = processorIdStr;
    this.zkUtils = zkUtils;
    this.keyBuilder = zkUtils.getKeyBuilder();
    this.hostName = getHostName();
    this.previousProcessorChangeListener = previousProcessorChangeListener;
  }

  // TODO: This should go away once we integrate with Zk based Job Coordinator
  /**
   * Resolves the name of the local host.
   *
   * @return the local host name
   * @throws SamzaException wrapping the {@link UnknownHostException} if the lookup fails
   */
  private String getHostName() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      LOG.error("Failed to fetch hostname of the processor", e);
      throw new SamzaException(e);
    }
  }

  /**
   * Register a LeaderElectorListener
   *
   * @param listener {@link LeaderElectorListener} interfaces to be invoked upon completion of leader election participation
   */
  @Override
  public void setLeaderElectorListener(LeaderElectorListener listener) {
    this.leaderElectorListener = listener;
  }

  /**
   * Participates in leader election.
   *
   * <p>
   * Runs election rounds until one of them settles: either this participant is ranked first (it is then marked
   * as leader and the registered listener, if any, is notified), or it is subscribed to a predecessor znode that
   * still exists. When the predecessor has already disappeared, the round is repeated after a random pause of
   * less than one second.
   * </p>
   *
   * @throws SamzaException if this participant's id is not among the active processors reported by ZK
   */
  @Override
  public void tryBecomeLeader() {
    // Iterative retry: a round that does not settle is simply run again, so repeated retries do not grow the stack.
    while (!runElectionRound()) {
      pauseBeforeRetry();
      LOG.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
    }
  }

  /**
   * Performs a single election round: registers this processor, determines its rank among the active
   * processors and either claims leadership or makes sure the predecessor is being watched.
   *
   * @return {@code true} if the round settled (leader, or non-leader with an existing predecessor);
   *         {@code false} if the predecessor no longer exists and the round has to be repeated
   * @throws SamzaException if this participant's id is not among the active processors reported by ZK
   */
  private boolean runElectionRound() {
    String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));

    List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
    LOG.debug(zLog("Current active processors - " + children));
    int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));

    LOG.info("tryBecomeLeader: index = " + index + " for path=" + currentPath + " out of " + Arrays.toString(children.toArray()));
    if (children.isEmpty() || index == -1) {
      throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
    }

    if (index == 0) {
      isLeader.set(true);
      LOG.info(zLog("Eligible to become the leader!"));
      if (leaderElectorListener != null) {
        leaderElectorListener.onBecomingLeader();
      }
      return true;
    }

    isLeader.set(false);
    LOG.info("Index = " + index + " Not eligible to be a leader yet!");
    watchPredecessor(children.get(index - 1));

    /*
     * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
     * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't
     * exist during subscription, it is not going to get created in the future.
     */
    boolean predecessorExists = zkUtils.exists(processorPath(currentSubscription));
    if (predecessorExists) {
      LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
    }
    return predecessorExists;
  }

  /**
   * Makes {@code predecessor} the watched znode. Does nothing if it is already the current subscription;
   * otherwise drops the previous subscription (if any), installs a fresh listener and subscribes to the new path.
   *
   * @param predecessor id of the znode that directly precedes this participant in the sorted list
   */
  private void watchPredecessor(String predecessor) {
    if (predecessor.equals(currentSubscription)) {
      return;
    }
    if (currentSubscription != null) {
      LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
      zkUtils.unsubscribeDataChanges(processorPath(currentSubscription), previousProcessorChangeListener);
      previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
    }
    currentSubscription = predecessor;
    LOG.info(zLog("Subscribing data change for " + predecessor));
    zkUtils.subscribeDataChanges(processorPath(currentSubscription), previousProcessorChangeListener);
  }

  /**
   * Sleeps for a random duration below {@link #MAX_RETRY_BACKOFF_MS} milliseconds. If the sleep is interrupted,
   * the thread's interrupt status is restored and the method returns early.
   */
  private void pauseBeforeRetry() {
    try {
      Thread.sleep(random.nextInt(MAX_RETRY_BACKOFF_MS));
    } catch (InterruptedException e) {
      // The exception cleared the interrupt status; set it again so the interruption is not lost
      // to code running later on this thread.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Builds the full path of a processor znode from its id.
   *
   * @param processorZnodeId id of the znode below the processors path
   * @return the processors path, a slash and the given id
   */
  private String processorPath(String processorZnodeId) {
    return keyBuilder.getProcessorsPath() + "/" + processorZnodeId;
  }

  /**
   * Clears the local leader flag if it is currently set. No ZK operation is performed here.
   */
  @Override
  public void resignLeadership() {
    isLeader.compareAndSet(true, false);
  }

  /**
   * @return {@code true} if the local leader flag is currently set
   */
  @Override
  public boolean amILeader() {
    return isLeader.get();
  }

  /**
   * Prefixes a log message with this processor's id.
   *
   * @param logMessage message to decorate
   * @return the message in the form {@code [Processor-<id>] <message>}
   */
  private String zLog(String logMessage) {
    return String.format("[Processor-%s] %s", processorIdStr, logMessage);
  }

  /**
   * Listener placed on the predecessor's znode. A data change is only logged; a deletion triggers a new
   * election attempt on the enclosing elector.
   */
  class PreviousProcessorChangeListener extends ZkUtils.GenerationAwareZkDataListener {

    public PreviousProcessorChangeListener(ZkUtils zkUtils) {
      super(zkUtils, "PreviousProcessorChangeListener");
    }

    @Override
    public void doHandleDataChange(String dataPath, Object data) {
      LOG.info("Data change on path: {} for data: {}", dataPath, data);
    }

    @Override
    public void doHandleDataDeleted(String dataPath) {
      LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader."));
      tryBecomeLeader();
    }
  }
}
