/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.aurora.scheduler.app;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Inject;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;

import org.apache.aurora.GuavaUtils.ServiceManagerIface;
import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.inject.Bindings;
import org.apache.aurora.common.stats.Stats;
import org.apache.aurora.common.zookeeper.SingletonService;
import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.SchedulerLifecycle;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.config.CommandLine;
import org.apache.aurora.scheduler.config.validators.NotEmptyString;
import org.apache.aurora.scheduler.configuration.executor.ExecutorModule;
import org.apache.aurora.scheduler.cron.quartz.CronModule;
import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig;
import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule;
import org.apache.aurora.scheduler.events.WebhookModule;
import org.apache.aurora.scheduler.http.HttpService;
import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory.FrameworkInfoFactoryImpl.SchedulerProtocol;
import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule;
import org.apache.aurora.scheduler.stats.StatsModule;
import org.apache.aurora.scheduler.storage.Storage.Volatile;
import org.apache.aurora.scheduler.storage.backup.BackupModule;
import org.apache.aurora.scheduler.storage.durability.DurableStorageModule;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
import org.apache.aurora.scheduler.storage.log.LogPersistenceModule;
import org.apache.aurora.scheduler.storage.log.SnapshotModule;
import org.apache.aurora.scheduler.storage.log.SnapshotterImpl;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

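/**
 * Launcher for the Aurora scheduler: parses command line options, assembles the Guice modules,
 * and runs the scheduler until the application lifecycle is shut down.
 */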
public class SchedulerMain {
  private static final Logger LOG = LoggerFactory.getLogger(SchedulerMain.class);

  /**
   * Command line options for the scheduler's main entry point.
   *
   * <p>Fields are populated from the flags named in their {@link Parameter} annotations; flag
   * names and values are separated by {@code =}.
   */
  @Parameters(separators = "=")
  public static class Options {
    /** Name of the cluster being served (required). Published as part of the server info. */
    @Parameter(names = "-cluster_name",
        required = true,
        description = "Name to identify the cluster being served.")
    public String clusterName;

    /** ZooKeeper ServerSet path to register at (required, validated by NotEmptyString). */
    @Parameter(
        names = "-serverset_path",
        required = true,
        validateValueWith = NotEmptyString.class,
        description = "ZooKeeper ServerSet path to register at.")
    public String serversetPath;

    /**
     * Name under which the scheduler's HTTP endpoint is published; defaults to {@code "http"}.
     * Also bound as the {@code SchedulerProtocol} string by {@code ProtocolModule}.
     */
    // TODO(zmanji): Consider making this an enum of HTTP or HTTPS.
    @Parameter(names = "-serverset_endpoint_name",
        description = "Name of the scheduler endpoint published in ZooKeeper.")
    public String serversetEndpointName = "http";

    /** URL prefix for job container stats; defaults to the empty string. */
    // TODO(Suman Karumuri): Rename viz_job_url_prefix to stats_job_url_prefix for consistency.
    @Parameter(names = "-viz_job_url_prefix", description = "URL prefix for job container stats.")
    public String statsUrlPrefix = "";

    /**
     * Whether jobs may request the Mesos GPU resource; defaults to {@code false}.
     * Declared with arity 1 -- presumably the flag must be followed by an explicit boolean
     * value (confirm against the JCommander documentation).
     */
    @Parameter(names = "-allow_gpu_resource",
        description = "Allow jobs to request Mesos GPU resource.",
        arity = 1)
    public boolean allowGpuResource = false;

    /** The Mesos driver implementations selectable via {@code -mesos_driver}. */
    public enum DriverKind {
      // TODO(zmanji): Remove this option once V0_DRIVER has been proven out in production.
      // This is the original driver that libmesos shipped with. Uses unversioned protobufs, and has
      // minimal backwards compatibility guarantees.
      SCHEDULER_DRIVER,
      // These are the new drivers that libmesos ships with. They use versioned (V1) protobufs for
      // the Java API.
      // V0 Driver offers the V1 API over the old Scheduler Driver. It does not fully support
      // the V1 API (e.g. Mesos maintenance).
      V0_DRIVER,
      // V1 Driver offers the V1 API over a full HTTP API implementation. It allows for maintenance
      // primitives and other new features.
      V1_DRIVER,
    }

    /** Which Mesos driver to use; defaults to {@link DriverKind#SCHEDULER_DRIVER}. */
    @Parameter(names = "-mesos_driver", description = "Which Mesos Driver to use")
    public DriverKind driverImpl = DriverKind.SCHEDULER_DRIVER;
  }

  /**
   * Guice module that binds the {@link SchedulerProtocol}-annotated {@link String} to the
   * serverset endpoint name taken from the supplied options.
   */
  public static class ProtocolModule extends AbstractModule {
    private final Options mainOptions;

    /**
     * @param options Main options whose {@code serversetEndpointName} is bound by this module.
     */
    public ProtocolModule(Options options) {
      mainOptions = options;
    }

    @Override
    protected void configure() {
      // The endpoint name is read when the module is configured, not at construction time.
      String endpointName = mainOptions.serversetEndpointName;
      bind(String.class).annotatedWith(SchedulerProtocol.class).toInstance(endpointName);
    }
  }

  // Collaborators are supplied through Guice member injection (note the @Inject annotations)
  // rather than through a constructor, so they are unset until an injector has injected members
  // into this instance.
  @Inject private SingletonService schedulerService;
  @Inject private HttpService httpService;
  @Inject private SchedulerLifecycle schedulerLifecycle;
  @Inject private Lifecycle appLifecycle;
  // Services annotated with @AppStartup; started in run() and stopped in stop().
  @Inject
  @AppStartup
  private ServiceManagerIface startupServices;

  /**
   * Requests that the startup services stop, waits a bounded time for them to do so, and then
   * shuts down the application lifecycle. A timeout while waiting is logged and otherwise
   * ignored so that the lifecycle shutdown still happens.
   */
  private void stop() {
    final long stopTimeoutSecs = 5L;

    LOG.info("Stopping scheduler services.");
    try {
      startupServices.stopAsync().awaitStopped(stopTimeoutSecs, TimeUnit.SECONDS);
    } catch (TimeoutException timedOut) {
      LOG.info("Shutdown did not complete in time: " + timedOut);
    }

    appLifecycle.shutdown();
  }

  /**
   * Starts the startup services, registers a JVM shutdown hook that calls {@code stop()},
   * prepares the scheduler lifecycle and joins leader election, advertising the HTTP service
   * address under the configured endpoint name. Afterwards waits for the application lifecycle
   * to shut down and stops the services.
   *
   * @param options Main options; {@code serversetEndpointName} names the advertised endpoint.
   * @throws IllegalStateException if leading the service fails, or if the thread is interrupted
   *     while joining the scheduler service group (the interrupt status is restored first).
   */
  void run(Options options) {
    try {
      startupServices.startAsync();
      Runtime.getRuntime().addShutdownHook(new Thread(SchedulerMain.this::stop, "ShutdownHook"));
      startupServices.awaitHealthy();

      LeadershipListener leaderListener = schedulerLifecycle.prepare();

      // The address is advertised unresolved, i.e. no lookup of the host name is attempted here.
      HostAndPort httpAddress = httpService.getAddress();
      InetSocketAddress httpSocketAddress =
          InetSocketAddress.createUnresolved(httpAddress.getHost(), httpAddress.getPort());

      schedulerService.lead(
          httpSocketAddress,
          ImmutableMap.of(options.serversetEndpointName, httpSocketAddress),
          leaderListener);
    } catch (SingletonService.LeadException e) {
      throw new IllegalStateException("Failed to lead service.", e);
    } catch (InterruptedException e) {
      // Restore the interrupt status before translating the exception so that the cleanup below
      // and any callers can still observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while joining scheduler service group.", e);
    } finally {
      // NOTE(review): this also runs when the try block failed, so a startup or leadership
      // failure only propagates after awaitShutdown() returns -- confirm that is intended.
      appLifecycle.awaitShutdown();
      stop();
    }
  }

  /**
   * Builds the combination of modules that this launcher installs regardless of the execution
   * environment.
   *
   * @param options Parsed command line options used to configure the modules.
   * @return A single module combining the protocol, lifecycle, stats, app, cron and in-memory
   *     storage modules.
   */
  @VisibleForTesting
  static Module getUniversalModule(CliOptions options) {
    ImmutableList<Module> universalModules = ImmutableList.<Module>of(
        new ProtocolModule(options.main),
        new LifecycleModule(),
        new StatsModule(options.stats),
        new AppModule(options),
        new CronModule(options.cron),
        new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)));
    return Modules.combine(universalModules);
  }

  /**
   * Runs the scheduler with modules configured from command line arguments, in addition to the
   * provided environment-specific module.
   *
   * <p>Installs a default uncaught exception handler that counts and logs uncaught exceptions,
   * creates the injector, injects a new {@link SchedulerMain} and runs it. The injected
   * {@link Lifecycle} is shut down when this method exits, provided it was obtained.
   *
   * @param options Parsed command line options.
   * @param appEnvironmentModule Additional modules based on the execution environment.
   */
  @VisibleForTesting
  public static void flagConfiguredMain(CliOptions options, Module appEnvironmentModule) {
    AtomicLong uncaughtCount = Stats.exportLong("uncaught_exceptions");
    Thread.UncaughtExceptionHandler countingHandler = (thread, error) -> {
      uncaughtCount.incrementAndGet();

      LOG.error("Uncaught exception from " + thread + ":" + error, error);
    };
    Thread.setDefaultUncaughtExceptionHandler(countingHandler);

    // Modules are constructed in the same order in which they are combined below.
    Module universalModule = getUniversalModule(options);
    Module discoveryModule = new ServiceDiscoveryModule(
        FlaggedZooKeeperConfig.create(options.zk),
        options.main.serversetPath);
    Module backupModule = new BackupModule(options.backup, SnapshotterImpl.class);
    Module executorModule = new ExecutorModule(options.executor);
    Module serverInfoModule = new AbstractModule() {
      @Override
      protected void configure() {
        bind(CliOptions.class).toInstance(options);

        ServerInfo serverInfo = new ServerInfo()
            .setClusterName(options.main.clusterName)
            .setStatsUrlPrefix(options.main.statsUrlPrefix);
        bind(IServerInfo.class).toInstance(IServerInfo.build(serverInfo));
      }
    };

    Module combined = Modules.combine(
        appEnvironmentModule,
        universalModule,
        discoveryModule,
        backupModule,
        executorModule,
        serverInfoModule);

    // Stays null if injector creation or the Lifecycle lookup fails, in which case there is
    // nothing to shut down.
    Lifecycle appLifecycle = null;
    try {
      Injector injector = Guice.createInjector(combined);
      appLifecycle = injector.getInstance(Lifecycle.class);

      SchedulerMain main = new SchedulerMain();
      injector.injectMembers(main);
      try {
        main.run(options.main);
      } finally {
        LOG.info("Application run() exited.");
      }
    } finally {
      if (appLifecycle != null) {
        appLifecycle.shutdown();
      }
    }
  }

  /**
   * Process entry point: parses the command line, assembles the environment-specific modules
   * (Mesos driver settings, libmesos loading, durable storage, Mesos log stream, log
   * persistence, snapshots, tiers and webhooks) and hands them to
   * {@link #flagConfiguredMain(CliOptions, Module)}.
   *
   * @param args Command line arguments.
   */
  public static void main(String... args) {
    CliOptions options = CommandLine.parseOptions(args);

    List<Module> environmentModules = ImmutableList.<Module>of(
        new CommandLineDriverSettingsModule(options.driver, options.main.allowGpuResource),
        new LibMesosLoadingModule(options.main.driverImpl),
        new DurableStorageModule(),
        new MesosLogStreamModule(options.mesosLog, FlaggedZooKeeperConfig.create(options.zk)),
        new LogPersistenceModule(options.logPersistence),
        new SnapshotModule(options.snapshot),
        new TierModule(options.tiers),
        new WebhookModule(options.webhook));

    Module environmentModule = Modules.combine(environmentModules);
    flagConfiguredMain(options, environmentModule);
  }
}
