/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.zk;

import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.FileUtils;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY;


/**
 * Manages the election of ZooKeeper servers, starting/stopping the services,
 * etc.
 */
public class ZooKeeperManager {
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
  /**
   * Separates the fields (hostname, task and, for ready stamps, port) that
   * are encoded in the names of the coordination files
   */
  private static final String HOSTNAME_TASK_SEPARATOR = " ";
  /** Filename prefix of the server list file kept in the base directory */
  private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
      "zkServerList_";
  /** Job context (mainly for progress) */
  private Mapper<?, ?, ?, ?>.Context context;
  /** Giraph/Hadoop configuration of this job */
  private final ImmutableClassesGiraphConfiguration conf;
  /** Task partition, to ensure uniqueness */
  private final int taskPartition;
  /** Base directory on the file system for all file-based coordination */
  private final Path baseDirectory;
  /**
   * Directory ("_task" under the base directory) holding the per-task
   * candidate stamps and computation-done stamps
   */
  private final Path taskDirectory;
  /**
   * Directory ("_zkServer" under the base directory) holding the
   * ZooKeeper server ready stamp
   */
  private final Path serverDirectory;
  /** Path of the stamp whose existence marks this task as done */
  private final Path myClosedPath;
  /** Milliseconds to sleep between two polls of the file system */
  private final int pollMsecs;
  /** File system used for the coordination files */
  private final FileSystem fs;
  /**
   * Zookeeper wrapper; only non-null while this task runs the server.
   * Accessed under synchronized (this).
   */
  private ZooKeeperRunner zkRunner;
  /** ZooKeeper local file system directory (data and data log) */
  private final String zkDir;
  /** ZooKeeper server configuration object handed to the runner */
  private final ZookeeperConfig config;
  /** ZooKeeper server host, parsed from the server list file name */
  private String zkServerHost;
  /** ZooKeeper server task, parsed from the server list file name */
  private int zkServerTask;
  /**
   * ZooKeeper port; starts as the configured value and is replaced by the
   * port reported by the runner or read from the server ready stamp
   */
  private int zkBasePort;
  /** ZooKeeper "host:port" string handed out to clients */
  private String zkServerPortString;
  /** My hostname */
  private String myHostname = null;
  /** Job id, to ensure uniqueness */
  private final String jobId;
  /** Time object for tracking timeouts */
  private final Time time = SystemTime.get();

  /**
   * State of the application, passed to
   * {@link #offlineZooKeeperServers(State)} when a task shuts down.
   */
  public enum State {
    /** Failure occurred; no closed stamp is written for this task */
    FAILED,
    /** Application finished; the task writes its closed stamp */
    FINISHED
  }

  /**
   * Builds a manager for one task. Derives the coordination directories,
   * the local ZooKeeper directory, the polling period and the initial
   * server port from the configuration, and opens the file system.
   *
   * @param context Context to be stored internally
   * @param configuration Configuration
   * @throws IOException propagated from the calls made here (for instance
   *                     when the file system cannot be obtained)
   */
  public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context,
                          ImmutableClassesGiraphConfiguration configuration)
    throws IOException {
    this.context = context;
    conf = configuration;
    taskPartition = conf.getTaskPartition();
    // jobId must be set before getFinalZooKeeperPath() reads it below
    jobId = conf.get("mapred.job.id", "Unknown Job");

    // Coordination layout on the shared file system
    baseDirectory = new Path(
        ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(
            conf, getFinalZooKeeperPath()));
    taskDirectory = new Path(baseDirectory, "_task");
    serverDirectory = new Path(baseDirectory, "_zkServer");
    myClosedPath = new Path(
        taskDirectory, new ComputationDoneName(taskPartition).getName());
    pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);

    // Local ZooKeeper directory: under job.local.dir for non-local jobs,
    // otherwise under the current working directory
    String jobLocalDir = conf.get("job.local.dir");
    String zkDirDefault = (jobLocalDir == null) ?
        System.getProperty("user.dir") + "/" +
            ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue() :
        jobLocalDir + "/_bspZooKeeper";
    zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
    config = new ZookeeperConfig();
    zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);

    myHostname = conf.getLocalHostname();
    fs = FileSystem.get(conf);
  }

  /**
   * Build the default coordination directory for this job: the default
   * value of the manager directory option, a slash, and the job id.
   *
   * @return directory path with job id
   */
  private String getFinalZooKeeperPath() {
    StringBuilder path = new StringBuilder();
    path.append(ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue());
    path.append("/");
    path.append(jobId);
    return path.toString();
  }

  /**
   * Look up the base ZNode under which Giraph should place every ZNode it
   * creates (useful for instance on a multi-tenant ZooKeeper).  An unset
   * value yields the empty string; any other value has to be absolute.
   *
   * @param conf  Necessary to access user-provided values
   * @return  String of path without trailing slash
   * @throws IllegalArgumentException if the configured value is non-empty
   *         and does not start with a slash
   */
  public static String getBasePath(Configuration conf) {
    String basePath = conf.get(BASE_ZNODE_KEY, "");
    if (basePath.isEmpty() || basePath.startsWith("/")) {
      return basePath;
    }
    throw new IllegalArgumentException("Value for " +
        BASE_ZNODE_KEY + " must start with /: " + basePath);
  }

  /**
   * Create this task's candidate stamp, then obtain the ZooKeeper server
   * list.  Task partition 0 creates the server list file if it does not
   * exist yet; every task then polls until the file is visible and records
   * the elected server host and task from its name.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  public void setup() throws IOException, InterruptedException {
    createCandidateStamp();
    getZooKeeperServerList();
  }

  /**
   * Create a stamp file for this task on the shared file system.  If
   * another task attempt already created it, creation fails or reports
   * that the file exists, which is fine.
   *
   * The base and server directories are created on a best-effort basis
   * first; failures there are only logged.  The base directory is then
   * verified, since nothing can work without it.
   *
   * @throws IllegalArgumentException if the base directory cannot be
   *         inspected or is not a directory
   */
  public void createCandidateStamp() {
    makeDirectoryBestEffort(baseDirectory);
    makeDirectoryBestEffort(serverDirectory);

    // Check that the base directory exists and is a directory
    try {
      if (!fs.getFileStatus(baseDirectory).isDir()) {
        throw new IllegalArgumentException(
            "createCandidateStamp: " + baseDirectory +
            " is not a directory, but should be.");
      }
    } catch (IOException e) {
      throw new IllegalArgumentException(
          "createCandidateStamp: Couldn't get file status " +
              "for base directory " + baseDirectory + ".  If there is an " +
              "issue with this directory, please set an accesible " +
              "base directory with the Hadoop configuration option " +
              ZOOKEEPER_MANAGER_DIRECTORY.getKey(), e);
    }

    Path myCandidacyPath = new Path(
        taskDirectory, myHostname +
        HOSTNAME_TASK_SEPARATOR + taskPartition);
    try {
      if (LOG.isInfoEnabled()) {
        LOG.info("createCandidateStamp: Creating my filestamp " +
            myCandidacyPath);
      }
      // createNewFile reports an already existing file through its result
      if (!fs.createNewFile(myCandidacyPath) && LOG.isInfoEnabled()) {
        LOG.info("createCandidateStamp: Filestamp " + myCandidacyPath +
            " was not created (it probably exists already)");
      }
    } catch (IOException e) {
      LOG.error("createCandidateStamp: Failed (maybe previous task " +
          "failed) to create filestamp " + myCandidacyPath, e);
    }
  }

  /**
   * Try to create a directory, logging (with the cause) instead of
   * failing when that is not possible.
   *
   * @param directory Directory to create
   */
  private void makeDirectoryBestEffort(Path directory) {
    try {
      if (fs.mkdirs(directory)) {
        LOG.info("createCandidateStamp: Made the directory " + directory);
      } else {
        LOG.warn("createCandidateStamp: mkdirs reported failure for " +
            directory);
      }
    } catch (IOException e) {
      // Keep the exception in the log so the reason is not lost
      LOG.error("createCandidateStamp: Failed to mkdirs " + directory, e);
    }
  }

  /**
   * Create a new file with retries if it fails.  Waits between two
   * attempts, but not after the last one, and reports the last failure as
   * the cause when every attempt failed.
   *
   * @param fs File system where the new file is created
   * @param path Path of the new file
   * @param maxAttempts Maximum number of attempts
   * @param retryWaitMsecs Milliseconds to wait before retrying
   * @throws IllegalStateException if the file could not be created within
   *         maxAttempts attempts (also when maxAttempts is not positive)
   */
  private static void createNewFileWithRetries(
      FileSystem fs, Path path, int maxAttempts, int retryWaitMsecs) {
    IOException lastFailure = null;
    int attempt = 0;
    while (attempt < maxAttempts) {
      try {
        fs.createNewFile(path);
        return;
      } catch (IOException e) {
        lastFailure = e;
        LOG.warn("createNewFileWithRetries: Failed to create file at path " +
            path + " on attempt " + attempt + " of " + maxAttempts + ".", e);
      }
      ++attempt;
      // Sleeping only makes sense when another attempt follows
      if (attempt < maxAttempts) {
        Uninterruptibles.sleepUninterruptibly(
            retryWaitMsecs, TimeUnit.MILLISECONDS);
      }
    }
    throw new IllegalStateException(
        "createNewFileWithRetries: Failed to create file at path " +
            path + " after " + attempt + " attempts", lastFailure);
  }

  /**
   * Write this task's closed stamp.  The task that runs the ZooKeeper
   * server counts these stamps to learn when it may shut down, and the
   * stamp also records that this task already completed.  Creation is
   * retried according to the configured retry count and wait time.
   */
  private void createZooKeeperClosedStamp() {
    LOG.info("createZooKeeperClosedStamp: Creating my filestamp " +
        myClosedPath);
    int maxAttempts = conf.getHdfsFileCreationRetries();
    int retryWaitMsecs = conf.getHdfsFileCreationRetryWaitMs();
    createNewFileWithRetries(fs, myClosedPath, maxAttempts, retryWaitMsecs);
  }

  /**
   * Check whether this task's closed stamp exists, i.e. whether this task
   * partition already finished its computation.
   *
   * @return true if the closed stamp of this task exists
   */
  public boolean computationDone() {
    final boolean closedStampExists;
    try {
      closedStampExists = fs.exists(myClosedPath);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return closedStampExists;
  }

  /**
   * Task 0 will call this to create the ZooKeeper server list.  The result is
   * a file that describes the ZooKeeper server through the filename.
   *
   * Polls the task directory until it contains at least one entry, takes
   * the first entry as the elected candidate (its name must be
   * "hostname task") and creates the server list file named after it in
   * the base directory.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  private void createZooKeeperServerList() throws IOException,
      InterruptedException {
    String host;
    String task;
    while (true) {
      FileStatus[] candidates = fs.listStatus(taskDirectory);
      // Some file systems report a missing directory as null
      if ((candidates != null) && (candidates.length > 0)) {
        String candidateName = candidates[0].getPath().getName();
        String[] hostnameTaskArray =
            candidateName.split(HOSTNAME_TASK_SEPARATOR);
        checkState(hostnameTaskArray.length == 2,
            "createZooKeeperServerList: Task 0 failed " +
            "to parse " + candidateName);
        host = hostnameTaskArray[0];
        task = hostnameTaskArray[1];
        break;
      }
      Thread.sleep(pollMsecs);
    }
    String serverListFile =
        ZOOKEEPER_SERVER_LIST_FILE_PREFIX + host +
        HOSTNAME_TASK_SEPARATOR + task;
    Path serverListPath =
        new Path(baseDirectory, serverListFile);
    if (LOG.isInfoEnabled()) {
      LOG.info("createZooKeeperServerList: Creating the final " +
          "ZooKeeper file '" + serverListPath + "'");
    }
    fs.createNewFile(serverListPath);
  }

  /**
   * Make an attempt to get the server list file by looking for a file in
   * the base directory with the prefix ZOOKEEPER_SERVER_LIST_FILE_PREFIX.
   * The first matching entry of the listing wins.
   *
   * @return null if not found or the filename if found
   * @throws IOException
   */
  private String getServerListFile() throws IOException {
    FileStatus[] fileStatusArray = fs.listStatus(baseDirectory);
    if (fileStatusArray == null) {
      // Some file systems report a missing directory as null
      return null;
    }
    for (FileStatus fileStatus : fileStatusArray) {
      String name = fileStatus.getPath().getName();
      if (name.startsWith(ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
        return name;
      }
    }
    return null;
  }

  /**
   * Task 0 is the designated master and will generate the server list
   * (unless it has already done so).  Other
   * tasks will consume the file after it is created (just the filename).
   *
   * Polls until the server list file is visible, then parses the elected
   * server host and task out of its name and refreshes the host:port
   * string.
   *
   * @throws IOException
   * @throws InterruptedException if interrupted while waiting for the
   *         server list file
   * @throws IllegalStateException if the file name cannot be parsed
   */
  private void getZooKeeperServerList() throws IOException,
      InterruptedException {
    if ((taskPartition == 0) && (getServerListFile() == null)) {
      createZooKeeperServerList();
    }

    String serverListFile;
    while (true) {
      serverListFile = getServerListFile();
      if (LOG.isInfoEnabled()) {
        LOG.info("getZooKeeperServerList: For task " + taskPartition +
            ", got file '" + serverListFile +
            "' (polling period is " +
            pollMsecs + ")");
      }
      if (serverListFile != null) {
        break;
      }
      // An interrupt is a request to stop waiting; hand it to the caller
      // instead of swallowing it and polling forever
      Thread.sleep(pollMsecs);
    }

    String[] serverHostList = serverListFile.substring(
        ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
            HOSTNAME_TASK_SEPARATOR);
    if (LOG.isInfoEnabled()) {
      LOG.info("getZooKeeperServerList: Found " +
          Arrays.toString(serverHostList) +
          " hosts in filename '" + serverListFile + "'");
    }
    checkState(serverHostList.length >= 2,
        "getZooKeeperServerList: Failed to parse host and task from " +
        "filename '" + serverListFile + "'");

    zkServerHost = serverHostList[0];
    zkServerTask = Integer.parseInt(serverHostList[1]);
    updateZkPortString();
  }

  /**
   * Update zookeeper host:port string from the current values of
   * zkServerHost and zkBasePort.  Has to be called again whenever one of
   * the two changes.
   */
  private void updateZkPortString() {
    zkServerPortString = zkServerHost + ":" + zkBasePort;
  }

  /**
   * Users can get the server port string to connect to ZooKeeper
   *
   * @return server port string in the form host:port for the single
   *         elected server; null until the server list has been read
   */
  public String getZooKeeperServerPortString() {
    return zkServerPortString;
  }

  /**
   * The task elected as ZooKeeper server prepares its local setup here:
   * the local ZooKeeper directory is created and the server configuration
   * object is filled in (data and data log directory, client port, session
   * timeout bounds).
   */
  private void generateZooKeeperConfig() {
    if (LOG.isInfoEnabled()) {
      LOG.info("generateZooKeeperConfig: with base port " +
          zkBasePort);
    }

    File localDirectory = new File(zkDir);
    boolean created = localDirectory.mkdirs();
    if (LOG.isInfoEnabled()) {
      LOG.info("generateZooKeeperConfigFile: Make directory of " +
          localDirectory.getName() + " = " + created);
    }

    // Data and transaction log share the same local directory
    config.setDataDir(zkDir);
    config.setDataLogDir(zkDir);
    InetSocketAddress clientAddress = new InetSocketAddress(zkBasePort);
    config.setClientPortAddress(clientAddress);
    config.setMinSessionTimeout(conf.getZooKeeperMinSessionTimeout());
    config.setMaxSessionTimeout(conf.getZooKeeperMaxSessionTimeout());
  }

  /**
   * If this task has been selected, online a ZooKeeper server.  Otherwise,
   * wait until this task knows that the ZooKeeper servers have been onlined.
   *
   * The elected task starts the server, probes it with a test connection
   * and then drops a ready stamp; all other tasks poll for that stamp and
   * take the server port from its name.
   *
   * @throws IllegalStateException if the elected task cannot connect to
   *         its own server within the configured number of attempts
   */
  public void onlineZooKeeperServer() {
    if (zkServerTask == taskPartition) {
      startLocalZooKeeperServer();
      waitForLocalServerConnection();
      createServerReadyStamp();
    } else {
      waitForServerReadyStamp();
    }
  }

  /**
   * Wipe the old local ZooKeeper directory, generate the configuration and
   * start the server, adopting the port the runner reports.
   */
  private void startLocalZooKeeperServer() {
    File zkDirFile = new File(this.zkDir);
    try {
      if (LOG.isInfoEnabled()) {
        LOG.info("onlineZooKeeperServers: Trying to delete old " +
            "directory " + this.zkDir);
      }
      FileUtils.deleteDirectory(zkDirFile);
    } catch (IOException e) {
      LOG.warn("onlineZooKeeperServers: Failed to delete " +
          "directory " + this.zkDir, e);
    }
    generateZooKeeperConfig();
    synchronized (this) {
      zkRunner = createRunner();
      int port = zkRunner.start(zkDir, config);
      if (port > 0) {
        zkBasePort = port;
        updateZkPortString();
      }
    }
  }

  /**
   * Repeatedly open a test connection to the local server until one
   * succeeds or the configured number of attempts is used up.
   */
  private void waitForLocalServerConnection() {
    final int maxConnectAttempts =
        conf.getZookeeperConnectionAttempts();
    int connectAttempts = 0;
    while (connectAttempts < maxConnectAttempts) {
      if (LOG.isInfoEnabled()) {
        LOG.info("onlineZooKeeperServers: Connect attempt " +
            connectAttempts + " of " +
            maxConnectAttempts +
            " max trying to connect to " +
            myHostname + ":" + zkBasePort +
            " with poll msecs = " + pollMsecs);
      }
      InetSocketAddress zkServerAddress =
          new InetSocketAddress(myHostname, zkBasePort);
      Socket testServerSock = new Socket();
      try {
        testServerSock.connect(zkServerAddress, 5000);
        if (LOG.isInfoEnabled()) {
          LOG.info("onlineZooKeeperServers: Connected to " +
              zkServerAddress + "!");
        }
        return;
      } catch (SocketTimeoutException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "SocketTimeoutException", e);
      } catch (ConnectException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "ConnectException", e);
      } catch (IOException e) {
        LOG.warn("onlineZooKeeperServers: Got " +
            "IOException", e);
      } finally {
        // The socket is only a probe; always release it, and never let a
        // failing close turn a successful probe into a failed attempt
        try {
          testServerSock.close();
        } catch (IOException e) {
          LOG.warn("onlineZooKeeperServers: Failed to close the test " +
              "socket", e);
        }
      }

      ++connectAttempts;
      try {
        Thread.sleep(pollMsecs);
      } catch (InterruptedException e) {
        LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
            " interrupted - " + e.getMessage());
      }
    }
    // Reached only when no attempt succeeded (or none was allowed)
    throw new IllegalStateException(
        "onlineZooKeeperServers: Failed to connect in " +
            connectAttempts + " tries!");
  }

  /**
   * Notify the other tasks that the server is up and running by dropping
   * a ready stamp named "hostname task port".
   */
  private void createServerReadyStamp() {
    Path myReadyPath = new Path(
        serverDirectory, myHostname +
        HOSTNAME_TASK_SEPARATOR + taskPartition +
        HOSTNAME_TASK_SEPARATOR + zkBasePort);
    try {
      if (LOG.isInfoEnabled()) {
        LOG.info("onlineZooKeeperServers: Creating my filestamp " +
            myReadyPath);
      }
      fs.createNewFile(myReadyPath);
    } catch (IOException e) {
      LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
          "task failed) to create filestamp " + myReadyPath, e);
    }
  }

  /**
   * Poll the server directory until the last ready stamp listed belongs to
   * the elected server host, adopting the port found in the stamp names.
   */
  private void waitForServerReadyStamp() {
    int readyRetrievalAttempt = 0;
    String foundServer = null;
    while (true) {
      try {
        FileStatus[] fileStatusArray =
            fs.listStatus(serverDirectory);
        if ((fileStatusArray != null) &&
            (fileStatusArray.length > 0)) {
          for (FileStatus fileStatus : fileStatusArray) {
            String stampName = fileStatus.getPath().getName();
            String[] hostnameTaskArray =
                stampName.split(HOSTNAME_TASK_SEPARATOR);
            if (hostnameTaskArray.length != 3) {
              throw new RuntimeException(
                  "getZooKeeperServerList: Task 0 failed " +
                      "to parse " + stampName);
            }
            foundServer = hostnameTaskArray[0];
            zkBasePort = Integer.parseInt(hostnameTaskArray[2]);
            updateZkPortString();
          }
          if (LOG.isInfoEnabled()) {
            LOG.info("onlineZooKeeperServers: Got " +
                foundServer + " on port " +
                zkBasePort +
                " (polling period is " +
                pollMsecs + ") on attempt " +
                readyRetrievalAttempt);
          }
          if (zkServerHost.equals(foundServer)) {
            return;
          }
        } else {
          if (LOG.isInfoEnabled()) {
            LOG.info("onlineZooKeeperServers: Empty " +
                "directory " + serverDirectory +
                ", waiting " + pollMsecs + " msecs.");
          }
        }
        Thread.sleep(pollMsecs);
        ++readyRetrievalAttempt;
      } catch (IOException e) {
        throw new RuntimeException(e);
      } catch (InterruptedException e) {
        LOG.warn("onlineZooKeeperServers: Strange interrupt from " +
            e.getMessage(), e);
      }
    }
  }

  /**
   * Wait for all workers to signal completion.  Will wait up to
   * conf.getWaitTaskDoneTimeoutMs() milliseconds for this to complete
   * before reporting an error.
   *
   * Each round lists the task directory and counts the computation-done
   * stamps.  Between rounds the method always sleeps for the polling
   * period and reports progress, including after a failed listing, so a
   * failing file system is not queried in a tight loop.
   *
   * @param totalWorkers Number of workers to wait for
   * @throws IllegalStateException if the tasks did not finish in time
   */
  private void waitUntilAllTasksDone(int totalWorkers) {
    int attempt = 0;
    long maxMs = time.getMilliseconds() +
        conf.getWaitTaskDoneTimeoutMs();
    while (true) {
      boolean[] taskDoneArray = new boolean[totalWorkers];
      try {
        FileStatus[] fileStatusArray =
            fs.listStatus(taskDirectory);
        int totalDone = 0;
        // Some file systems report a missing directory as null
        if (fileStatusArray != null) {
          for (FileStatus fileStatus : fileStatusArray) {
            String name = fileStatus.getPath().getName();
            if (ComputationDoneName.isName(name)) {
              ++totalDone;
              int workerId =
                  ComputationDoneName.fromName(name).getWorkerId();
              // The array only feeds the log below; skip ids outside it
              if ((workerId >= 0) && (workerId < taskDoneArray.length)) {
                taskDoneArray[workerId] = true;
              }
            }
          }
        }
        if (LOG.isInfoEnabled()) {
          LOG.info("waitUntilAllTasksDone: Got " + totalDone +
              " and " + totalWorkers +
              " desired (polling period is " +
              pollMsecs + ") on attempt " +
              attempt);
        }
        if (totalDone >= totalWorkers) {
          return;
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < taskDoneArray.length; ++i) {
          if (!taskDoneArray[i]) {
            sb.append(i).append(", ");
          }
        }
        LOG.info("waitUntilAllTasksDone: Still waiting on tasks " +
            sb.toString());
      } catch (IOException e) {
        LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
      }

      ++attempt;
      try {
        Thread.sleep(pollMsecs);
      } catch (InterruptedException e) {
        LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
      }
      context.progress();

      if (time.getMilliseconds() > maxMs) {
        throw new IllegalStateException("waitUntilAllTasksDone: Tasks " +
            "did not finish by the maximum time of " +
            conf.getWaitTaskDoneTimeoutMs() + " milliseconds");
      }
    }
  }

  /**
   * Notify the ZooKeeper servers that this partition is done with all
   * ZooKeeper communication.  If this task is running a ZooKeeper server,
   * kill it when all partitions are done and wait for
   * completion.  Clean up the ZooKeeper local directory as well.
   *
   * The closed stamp is only written when the application finished.  A
   * failure to delete the local directory is logged and otherwise ignored.
   *
   * @param state State of the application
   */
  public void offlineZooKeeperServers(State state) {
    if (state == State.FINISHED) {
      createZooKeeperClosedStamp();
    }
    synchronized (this) {
      if (zkRunner != null) {
        boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
        int totalWorkers = conf.getMapTasks();
        // A Yarn job always spawns MAX_WORKERS + 1 containers
        if (isYarnJob) {
          totalWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1;
        }
        LOG.info("offlineZooKeeperServers: Will wait for " +
            totalWorkers + " tasks");
        waitUntilAllTasksDone(totalWorkers);
        zkRunner.stop();
        try {
          FileUtils.deleteDirectory(new File(zkDir));
          // Only claim the deletion when it actually succeeded
          if (LOG.isInfoEnabled()) {
            LOG.info("offlineZooKeeperServers: deleted directory " + zkDir);
          }
        } catch (IOException e) {
          LOG.warn("offlineZooKeeperServers: " +
                  "IOException, but continuing",
              e);
        }
        zkRunner = null;
      }
    }
  }

  /**
   * Create the zookeeper wrapper used to run the server.  This
   * implementation always creates an {@link InProcessZooKeeperRunner} and
   * hands it the job configuration.
   *
   * @return in process wrapper, configured with the job configuration
   */
  private ZooKeeperRunner createRunner() {
    ZooKeeperRunner runner = new InProcessZooKeeperRunner();
    runner.setConf(conf);
    return runner;
  }

  /**
   *  Tell whether this task currently holds a ZooKeeper runner.  Only
   *  could be true if called after onlineZooKeeperServers().
   *
   *  @return true if running a ZooKeeper server, false otherwise
   */
  public boolean runsZooKeeper() {
    boolean hasRunner;
    synchronized (this) {
      hasRunner = zkRunner != null;
    }
    return hasRunner;
  }

  /**
   * Let the zookeeper wrapper do its cleanup.  Does nothing when this task
   * holds no wrapper.
   */
  public void cleanup() {
    synchronized (this) {
      if (zkRunner == null) {
        return;
      }
      zkRunner.cleanup();
    }
  }
}
