/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.twill.internal.appmaster;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.RunId;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.ServiceMain;
import org.apache.twill.internal.TwillRuntimeSpecification;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.logging.Loggings;
import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
import org.apache.twill.internal.yarn.YarnAMClient;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
 * Main class for launching {@link ApplicationMasterService}.
 */
public final class ApplicationMasterMain extends ServiceMain {

  private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterMain.class);
  private final TwillRuntimeSpecification twillRuntimeSpec;

  /**
   * Starts the application master.
   */
  public static void main(String[] args) throws Exception {
    File twillSpec = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.TWILL_SPEC);
    TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpec);

    new ApplicationMasterMain(twillRuntimeSpec).doMain();
  }

  private ApplicationMasterMain(TwillRuntimeSpecification twillRuntimeSpec) {
    this.twillRuntimeSpec = twillRuntimeSpec;
  }

  private void doMain() throws Exception {
    RunId runId = twillRuntimeSpec.getTwillAppRunId();

    ZKClientService zkClientService = createZKClient();
    Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
    setRMSchedulerAddress(conf, twillRuntimeSpec.getRmSchedulerAddr());

    final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
    ApplicationMasterService service =
      new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, conf,
                                   createAppLocation(conf, twillRuntimeSpec.getFsUser(),
                                                     twillRuntimeSpec.getTwillAppDir()));
    TrackerService trackerService = new TrackerService(service);

    List<Service> prerequisites = Lists.newArrayList(
      new YarnAMClientService(amClient, trackerService),
      zkClientService,
      new AppMasterTwillZKPathService(zkClientService, runId)
    );

    if (twillRuntimeSpec.isLogCollectionEnabled()) {
      prerequisites.add(new ApplicationKafkaService(zkClientService, twillRuntimeSpec.getKafkaZKConnect()));
    } else {
      LOG.info("Log collection through kafka disabled");
    }

    new ApplicationMasterMain(twillRuntimeSpec)
      .doMain(
        service,
        prerequisites.toArray(new Service[prerequisites.size()])
      );
  }

  /**
   * Optionally sets the RM scheduler address based on the environment variable if it is not set in the cluster config.
   */
  private static void setRMSchedulerAddress(Configuration conf, String schedulerAddress) {
    if (schedulerAddress == null) {
      return;
    }

    // If the RM scheduler address is not in the config or it's from yarn-default.xml,
    // replace it with the one from the env, which is the same as the one client connected to.
    String[] sources = conf.getPropertySources(YarnConfiguration.RM_SCHEDULER_ADDRESS);
    if (sources == null || sources.length == 0 || "yarn-default.xml".equals(sources[sources.length - 1])) {
      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, schedulerAddress);
    }
  }

  @Override
  protected String getHostname() {
    try {
      return InetAddress.getLocalHost().getCanonicalHostName();
    } catch (UnknownHostException e) {
      return "unknown";
    }
  }

  @Override
  protected TwillRuntimeSpecification getTwillRuntimeSpecification() {
    return twillRuntimeSpec;
  }

  @Nullable
  @Override
  protected String getRunnableName() {
    // No runnable name for the AM
    return null;
  }

  /**
   * A service wrapper for starting/stopping {@link EmbeddedKafkaServer} and make sure the ZK path for
   * Kafka exists before starting the Kafka server.
   */
  private static final class ApplicationKafkaService extends AbstractIdleService {

    private static final Logger LOG = LoggerFactory.getLogger(ApplicationKafkaService.class);

    private final ZKClient zkClient;
    private final EmbeddedKafkaServer kafkaServer;
    private final String kafkaZKPath;

    private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
      this.zkClient = zkClient;
      this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(kafkaZKConnect));
      this.kafkaZKPath = kafkaZKConnect.substring(zkClient.getConnectString().length());
    }

    @Override
    protected void startUp() throws Exception {
      // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
      // no left over content from previous AM attempt.
      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
      ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get();
      kafkaServer.startAndWait();
    }

    @Override
    protected void shutDown() throws Exception {
      // Flush all logs before shutting down Kafka server
      Loggings.forceFlush();
      // Delay for 2 seconds to give clients chance to poll the last batch of log messages.
      try {
        TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
        // Ignore
        LOG.info("Kafka shutdown delay interrupted", e);
      } finally {
        kafkaServer.stopAndWait();
      }
    }

    private Properties generateKafkaConfig(String kafkaZKConnect) {
      Properties prop = new Properties();
      prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
      prop.setProperty("broker.id", "1");
      prop.setProperty("socket.send.buffer.bytes", "1048576");
      prop.setProperty("socket.receive.buffer.bytes", "1048576");
      prop.setProperty("socket.request.max.bytes", "104857600");
      prop.setProperty("num.partitions", "1");
      prop.setProperty("log.retention.hours", "24");
      prop.setProperty("log.flush.interval.messages", "10000");
      prop.setProperty("log.flush.interval.ms", "1000");
      prop.setProperty("log.segment.bytes", "536870912");
      prop.setProperty("zookeeper.connect", kafkaZKConnect);
      // Set the connection timeout to relatively short time (3 seconds).
      // It is only used by the org.I0Itec.zkclient.ZKClient inside KafkaServer
      // to block and wait for ZK connection goes into SyncConnected state.
      // However, due to race condition described in TWILL-139 in the ZK client library used by Kafka,
      // when ZK authentication is enabled, the ZK client may hang until connection timeout.
      // Setting it to lower value allow the AM to retry multiple times if race happens.
      prop.setProperty("zookeeper.connection.timeout.ms", "3000");
      prop.setProperty("default.replication.factor", "1");
      return prop;
    }
  }


  /**
   * A Service wrapper that starts {@link TrackerService} and {@link YarnAMClient}. It is needed because
   * the tracker host and url needs to be provided to {@link YarnAMClient} before it starts {@link YarnAMClient}.
   */
  private static final class YarnAMClientService extends AbstractIdleService {

    private final YarnAMClient yarnAMClient;
    private final TrackerService trackerService;

    private YarnAMClientService(YarnAMClient yarnAMClient, TrackerService trackerService) {
      this.yarnAMClient = yarnAMClient;
      this.trackerService = trackerService;
    }

    @Override
    protected void startUp() throws Exception {
      trackerService.setHost(yarnAMClient.getHost());
      trackerService.startAndWait();

      yarnAMClient.setTracker(trackerService.getBindAddress(), trackerService.getUrl());
      try {
        yarnAMClient.startAndWait();
      } catch (Exception e) {
        trackerService.stopAndWait();
        throw e;
      }
    }

    @Override
    protected void shutDown() throws Exception {
      try {
        yarnAMClient.stopAndWait();
      } finally {
        trackerService.stopAndWait();
      }
    }
  }

  private static final class AppMasterTwillZKPathService extends TwillZKPathService {

    private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);
    private final ZKClient zkClient;

    AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) {
      super(zkClient, runId);
      this.zkClient = zkClient;
    }

    @Override
    protected void shutDown() throws Exception {
      super.shutDown();

      // Deletes ZK nodes created for the application execution.
      // We don't have to worry about a race condition if another instance of the same app starts at the same time
      // as when removal is performed. This is because we always create nodes with "createParent == true",
      // which takes care of the parent node recreation if it is removed from here.

      // Try to delete the /instances path. It may throws NotEmptyException if there are other instances of the
      // same app running, which we can safely ignore and return.
      if (!delete(Constants.INSTANCES_PATH_PREFIX)) {
        return;
      }

      // Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances
      // of the same app running that has discovery services running.
      List<String> children = getChildren(Constants.DISCOVERY_PATH_PREFIX);
      List<OperationFuture<?>> deleteFutures = new ArrayList<>();
      for (String child : children) {
        String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child;
        LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
        deleteFutures.add(zkClient.delete(path));
      }
      Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
      for (OperationFuture<?> future : deleteFutures) {
        try {
          future.get();
        } catch (ExecutionException e) {
          if (e.getCause() instanceof KeeperException.NotEmptyException) {
            // If any deletion of the service failed with not empty, if means there are other apps running,
            // hence just return
            return;
          }
          if (e.getCause() instanceof KeeperException.NoNodeException) {
            // If the service node is gone, it maybe deleted by another app instance that is also shutting down,
            // hence just keep going
            continue;
          }
          throw e;
        }
      }

      // Delete the /discovery. It may fail with NotEmptyException (due to race between apps),
      // which can safely ignore and return.
      if (!delete(Constants.DISCOVERY_PATH_PREFIX)) {
        return;
      }

      // Delete the ZK path for the app namespace.
      delete("/");
    }

    /**
     * Deletes the given ZK path.
     *
     * @param path path to delete
     * @return true if the path was deleted, false if failed to delete due to {@link KeeperException.NotEmptyException}.
     * @throws Exception if failed to delete the path
     */
    private boolean delete(String path) throws Exception {
      try {
        LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path);
        zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        return true;
      } catch (ExecutionException e) {
        if (e.getCause() instanceof KeeperException.NotEmptyException) {
          return false;
        }
        if (e.getCause() instanceof KeeperException.NoNodeException) {
          // If the node to be deleted was not created or is already gone, it is the same as delete successfully.
          return true;
        }
        throw e;
      }
    }

    /**
     * Returns the list of children node under the given path.
     *
     * @param path path to get children
     * @return the list of children or empty list if the path doesn't exist.
     * @throws Exception if failed to get children
     */
    private List<String> getChildren(String path) throws Exception {
      try {
        return zkClient.getChildren(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren();
      } catch (ExecutionException e) {
        if (e.getCause() instanceof KeeperException.NoNodeException) {
          // If the node doesn't exists, return an empty list
          return Collections.emptyList();
        }
        throw e;
      }
    }
  }
}
