| /** |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.app; |
| |
| import java.net.InetSocketAddress; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import javax.inject.Inject; |
| |
| import com.beust.jcommander.Parameter; |
| import com.beust.jcommander.Parameters; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.net.HostAndPort; |
| import com.google.inject.AbstractModule; |
| import com.google.inject.Guice; |
| import com.google.inject.Injector; |
| import com.google.inject.Module; |
| import com.google.inject.util.Modules; |
| |
| import org.apache.aurora.GuavaUtils.ServiceManagerIface; |
| import org.apache.aurora.common.application.Lifecycle; |
| import org.apache.aurora.common.inject.Bindings; |
| import org.apache.aurora.common.stats.Stats; |
| import org.apache.aurora.common.zookeeper.SingletonService; |
| import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; |
| import org.apache.aurora.gen.ServerInfo; |
| import org.apache.aurora.scheduler.AppStartup; |
| import org.apache.aurora.scheduler.SchedulerLifecycle; |
| import org.apache.aurora.scheduler.TierModule; |
| import org.apache.aurora.scheduler.config.CliOptions; |
| import org.apache.aurora.scheduler.config.CommandLine; |
| import org.apache.aurora.scheduler.config.validators.NotEmptyString; |
| import org.apache.aurora.scheduler.configuration.executor.ExecutorModule; |
| import org.apache.aurora.scheduler.cron.quartz.CronModule; |
| import org.apache.aurora.scheduler.discovery.FlaggedZooKeeperConfig; |
| import org.apache.aurora.scheduler.discovery.ServiceDiscoveryModule; |
| import org.apache.aurora.scheduler.events.WebhookModule; |
| import org.apache.aurora.scheduler.http.HttpService; |
| import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule; |
| import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule; |
| import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory.FrameworkInfoFactoryImpl.SchedulerProtocol; |
| import org.apache.aurora.scheduler.mesos.LibMesosLoadingModule; |
| import org.apache.aurora.scheduler.stats.StatsModule; |
| import org.apache.aurora.scheduler.storage.Storage.Volatile; |
| import org.apache.aurora.scheduler.storage.backup.BackupModule; |
| import org.apache.aurora.scheduler.storage.durability.DurableStorageModule; |
| import org.apache.aurora.scheduler.storage.entities.IServerInfo; |
| import org.apache.aurora.scheduler.storage.log.LogPersistenceModule; |
| import org.apache.aurora.scheduler.storage.log.SnapshotModule; |
| import org.apache.aurora.scheduler.storage.log.SnapshotterImpl; |
| import org.apache.aurora.scheduler.storage.mem.MemStorageModule; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Launcher for the aurora scheduler. |
| */ |
| public class SchedulerMain { |
| private static final Logger LOG = LoggerFactory.getLogger(SchedulerMain.class); |
| |
| @Parameters(separators = "=") |
| public static class Options { |
| @Parameter(names = "-cluster_name", |
| required = true, |
| description = "Name to identify the cluster being served.") |
| public String clusterName; |
| |
| @Parameter( |
| names = "-serverset_path", |
| required = true, |
| validateValueWith = NotEmptyString.class, |
| description = "ZooKeeper ServerSet path to register at.") |
| public String serversetPath; |
| |
| // TODO(zmanji): Consider making this an enum of HTTP or HTTPS. |
| @Parameter(names = "-serverset_endpoint_name", |
| description = "Name of the scheduler endpoint published in ZooKeeper.") |
| public String serversetEndpointName = "http"; |
| |
| // TODO(Suman Karumuri): Rename viz_job_url_prefix to stats_job_url_prefix for consistency. |
| @Parameter(names = "-viz_job_url_prefix", description = "URL prefix for job container stats.") |
| public String statsUrlPrefix = ""; |
| |
| @Parameter(names = "-allow_gpu_resource", |
| description = "Allow jobs to request Mesos GPU resource.", |
| arity = 1) |
| public boolean allowGpuResource = false; |
| |
| public enum DriverKind { |
| // TODO(zmanji): Remove this option once V0_DRIVER has been proven out in production. |
| // This is the original driver that libmesos shipped with. Uses unversioned protobufs, and has |
| // minimal backwards compatibility guarantees. |
| SCHEDULER_DRIVER, |
| // These are the new drivers that libmesos ships with. They use versioned (V1) protobufs for |
| // the Java API. |
| // V0 Driver offers the V1 API over the old Scheduler Driver. It does not fully support |
| // the V1 API (ie mesos maintenance). |
| V0_DRIVER, |
| // V1 Driver offers the V1 API over a full HTTP API implementation. It allows for maintenance |
| // primitives and other new features. |
| V1_DRIVER, |
| } |
| |
| @Parameter(names = "-mesos_driver", description = "Which Mesos Driver to use") |
| public DriverKind driverImpl = DriverKind.SCHEDULER_DRIVER; |
| } |
| |
| public static class ProtocolModule extends AbstractModule { |
| private final Options options; |
| |
| public ProtocolModule(Options options) { |
| this.options = options; |
| } |
| |
| @Override |
| protected void configure() { |
| bind(String.class) |
| .annotatedWith(SchedulerProtocol.class) |
| .toInstance(options.serversetEndpointName); |
| } |
| } |
| |
| @Inject private SingletonService schedulerService; |
| @Inject private HttpService httpService; |
| @Inject private SchedulerLifecycle schedulerLifecycle; |
| @Inject private Lifecycle appLifecycle; |
| @Inject |
| @AppStartup |
| private ServiceManagerIface startupServices; |
| |
| private void stop() { |
| LOG.info("Stopping scheduler services."); |
| try { |
| startupServices.stopAsync().awaitStopped(5L, TimeUnit.SECONDS); |
| } catch (TimeoutException e) { |
| LOG.info("Shutdown did not complete in time: " + e); |
| } |
| appLifecycle.shutdown(); |
| } |
| |
| void run(Options options) { |
| try { |
| startupServices.startAsync(); |
| Runtime.getRuntime().addShutdownHook(new Thread(SchedulerMain.this::stop, "ShutdownHook")); |
| startupServices.awaitHealthy(); |
| |
| LeadershipListener leaderListener = schedulerLifecycle.prepare(); |
| |
| HostAndPort httpAddress = httpService.getAddress(); |
| InetSocketAddress httpSocketAddress = |
| InetSocketAddress.createUnresolved(httpAddress.getHost(), httpAddress.getPort()); |
| |
| schedulerService.lead( |
| httpSocketAddress, |
| ImmutableMap.of(options.serversetEndpointName, httpSocketAddress), |
| leaderListener); |
| } catch (SingletonService.LeadException e) { |
| throw new IllegalStateException("Failed to lead service.", e); |
| } catch (InterruptedException e) { |
| throw new IllegalStateException("Interrupted while joining scheduler service group.", e); |
| } finally { |
| appLifecycle.awaitShutdown(); |
| stop(); |
| } |
| } |
| |
| @VisibleForTesting |
| static Module getUniversalModule(CliOptions options) { |
| return Modules.combine( |
| new ProtocolModule(options.main), |
| new LifecycleModule(), |
| new StatsModule(options.stats), |
| new AppModule(options), |
| new CronModule(options.cron), |
| new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class))); |
| } |
| |
| /** |
| * Runs the scheduler by including modules configured from command line arguments in |
| * addition to the provided environment-specific module. |
| * |
| * @param appEnvironmentModule Additional modules based on the execution environment. |
| */ |
| @VisibleForTesting |
| public static void flagConfiguredMain(CliOptions options, Module appEnvironmentModule) { |
| AtomicLong uncaughtExceptions = Stats.exportLong("uncaught_exceptions"); |
| Thread.setDefaultUncaughtExceptionHandler((t, e) -> { |
| uncaughtExceptions.incrementAndGet(); |
| |
| LOG.error("Uncaught exception from " + t + ":" + e, e); |
| }); |
| |
| Module module = Modules.combine( |
| appEnvironmentModule, |
| getUniversalModule(options), |
| new ServiceDiscoveryModule( |
| FlaggedZooKeeperConfig.create(options.zk), |
| options.main.serversetPath), |
| new BackupModule(options.backup, SnapshotterImpl.class), |
| new ExecutorModule(options.executor), |
| new AbstractModule() { |
| @Override |
| protected void configure() { |
| bind(CliOptions.class).toInstance(options); |
| bind(IServerInfo.class).toInstance( |
| IServerInfo.build( |
| new ServerInfo() |
| .setClusterName(options.main.clusterName) |
| .setStatsUrlPrefix(options.main.statsUrlPrefix))); |
| } |
| }); |
| |
| Lifecycle lifecycle = null; |
| try { |
| Injector injector = Guice.createInjector(module); |
| lifecycle = injector.getInstance(Lifecycle.class); |
| SchedulerMain scheduler = new SchedulerMain(); |
| injector.injectMembers(scheduler); |
| try { |
| scheduler.run(options.main); |
| } finally { |
| LOG.info("Application run() exited."); |
| } |
| } finally { |
| if (lifecycle != null) { |
| lifecycle.shutdown(); |
| } |
| } |
| } |
| |
| public static void main(String... args) { |
| CliOptions options = CommandLine.parseOptions(args); |
| |
| List<Module> modules = ImmutableList.<Module>builder() |
| .add( |
| new CommandLineDriverSettingsModule(options.driver, options.main.allowGpuResource), |
| new LibMesosLoadingModule(options.main.driverImpl), |
| new DurableStorageModule(), |
| new MesosLogStreamModule(options.mesosLog, FlaggedZooKeeperConfig.create(options.zk)), |
| new LogPersistenceModule(options.logPersistence), |
| new SnapshotModule(options.snapshot), |
| new TierModule(options.tiers), |
| new WebhookModule(options.webhook) |
| ) |
| .build(); |
| flagConfiguredMain(options, Modules.combine(modules)); |
| } |
| } |