| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.runtime.entrypoint; |
| |
| import org.apache.flink.api.common.time.Time; |
| import org.apache.flink.configuration.ClusterOptions; |
| import org.apache.flink.configuration.ConfigOption; |
| import org.apache.flink.configuration.ConfigOptions; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.ConfigurationUtils; |
| import org.apache.flink.configuration.GlobalConfiguration; |
| import org.apache.flink.configuration.HighAvailabilityOptions; |
| import org.apache.flink.configuration.IllegalConfigurationException; |
| import org.apache.flink.configuration.JMXServerOptions; |
| import org.apache.flink.configuration.JobManagerOptions; |
| import org.apache.flink.configuration.RestOptions; |
| import org.apache.flink.configuration.SchedulerExecutionMode; |
| import org.apache.flink.configuration.WebOptions; |
| import org.apache.flink.core.fs.FileSystem; |
| import org.apache.flink.core.plugin.PluginManager; |
| import org.apache.flink.core.plugin.PluginUtils; |
| import org.apache.flink.core.security.FlinkSecurityManager; |
| import org.apache.flink.management.jmx.JMXService; |
| import org.apache.flink.runtime.blob.BlobServer; |
| import org.apache.flink.runtime.clusterframework.ApplicationStatus; |
| import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore; |
| import org.apache.flink.runtime.dispatcher.MiniDispatcher; |
| import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; |
| import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; |
| import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; |
| import org.apache.flink.runtime.heartbeat.HeartbeatServices; |
| import org.apache.flink.runtime.highavailability.HighAvailabilityServices; |
| import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; |
| import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; |
| import org.apache.flink.runtime.metrics.MetricRegistryImpl; |
| import org.apache.flink.runtime.metrics.ReporterSetup; |
| import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup; |
| import org.apache.flink.runtime.metrics.util.MetricUtils; |
| import org.apache.flink.runtime.resourcemanager.ResourceManager; |
| import org.apache.flink.runtime.rpc.AddressResolution; |
| import org.apache.flink.runtime.rpc.FatalErrorHandler; |
| import org.apache.flink.runtime.rpc.RpcService; |
| import org.apache.flink.runtime.rpc.RpcSystem; |
| import org.apache.flink.runtime.rpc.RpcSystemUtils; |
| import org.apache.flink.runtime.rpc.RpcUtils; |
| import org.apache.flink.runtime.security.SecurityConfiguration; |
| import org.apache.flink.runtime.security.SecurityUtils; |
| import org.apache.flink.runtime.security.contexts.SecurityContext; |
| import org.apache.flink.runtime.util.ZooKeeperUtils; |
| import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever; |
| import org.apache.flink.util.AutoCloseableAsync; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.ExecutorUtils; |
| import org.apache.flink.util.FileUtils; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.ShutdownHookUtil; |
| import org.apache.flink.util.concurrent.ExecutorThreadFactory; |
| import org.apache.flink.util.concurrent.FutureUtils; |
| import org.apache.flink.util.concurrent.ScheduledExecutor; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.lang.reflect.UndeclaredThrowableException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| /** |
| * Base class for the Flink cluster entry points. |
| * |
| * <p>Specializations of this class can be used for the session mode and the per-job mode. |
| */ |
| public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler { |
| |
/**
 * Option stored under the key {@code internal.cluster.execution-mode}; its default value is the
 * name of {@link ExecutionMode#NORMAL}.
 */
public static final ConfigOption<String> INTERNAL_CLUSTER_EXECUTION_MODE =
        ConfigOptions.key("internal.cluster.execution-mode")
                .defaultValue(ExecutionMode.NORMAL.toString());

/** Logger of the entrypoint; protected, so it is also visible to subclasses. */
protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);

/** Process exit code used by {@code runClusterEntrypoint} when {@code startCluster} fails. */
protected static final int STARTUP_FAILURE_RETURN_CODE = 1;

/**
 * Process exit code used by {@code onFatalError} and by {@code runClusterEntrypoint} when the
 * termination future completes exceptionally.
 */
protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;

/** How long {@code startCluster} waits for the cleanup shutdown after a failed start-up. */
private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = Time.seconds(30L);

/** The lock to guard startup / shutdown / manipulation methods. */
private final Object lock = new Object();

/**
 * Copy of the configuration handed to the constructor in which the web tmp dir has been replaced
 * by a unique sub directory (see {@code generateClusterConfiguration}).
 */
private final Configuration configuration;

/** Future returned by {@code getTerminationFuture}; completed by {@code shutDownAsync}. */
private final CompletableFuture<ApplicationStatus> terminationFuture;

/** Flipped to {@code true} by the first {@code shutDownAsync} call; later calls skip the work. */
private final AtomicBoolean isShutDown = new AtomicBoolean(false);

// The services below are assigned in runCluster / initializeServices and stay null until then.

@GuardedBy("lock")
private DispatcherResourceManagerComponent clusterComponent;

@GuardedBy("lock")
private MetricRegistryImpl metricRegistry;

@GuardedBy("lock")
private ProcessMetricGroup processMetricGroup;

@GuardedBy("lock")
private HighAvailabilityServices haServices;

@GuardedBy("lock")
private BlobServer blobServer;

@GuardedBy("lock")
private HeartbeatServices heartbeatServices;

@GuardedBy("lock")
private RpcService commonRpcService;

@GuardedBy("lock")
private ExecutorService ioExecutor;

// Assigned in initializeServices and closed in stopClusterServices, both while holding the lock,
// although the field carries no @GuardedBy annotation.
private ExecutionGraphInfoStore executionGraphInfoStore;

/** Shutdown hook registered in the constructor and removed again in {@code closeAsync}. */
private final Thread shutDownHook;

// Assigned in initializeServices (under the lock); null before that method has run.
private RpcSystem rpcSystem;
| |
| protected ClusterEntrypoint(Configuration configuration) { |
| this.configuration = generateClusterConfiguration(configuration); |
| this.terminationFuture = new CompletableFuture<>(); |
| |
| if (configuration.get(JobManagerOptions.SCHEDULER_MODE) == SchedulerExecutionMode.REACTIVE |
| && !supportsReactiveMode()) { |
| final String msg = |
| "Reactive mode is configured for an unsupported cluster type. At the moment, reactive mode is only supported by standalone application clusters (bin/standalone-job.sh)."; |
| // log message as well, otherwise the error is only shown in the .out file of the |
| // cluster |
| LOG.error(msg); |
| throw new IllegalConfigurationException(msg); |
| } |
| |
| shutDownHook = |
| ShutdownHookUtil.addShutdownHook( |
| () -> this.closeAsync().join(), getClass().getSimpleName(), LOG); |
| } |
| |
| public CompletableFuture<ApplicationStatus> getTerminationFuture() { |
| return terminationFuture; |
| } |
| |
| public void startCluster() throws ClusterEntrypointException { |
| LOG.info("Starting {}.", getClass().getSimpleName()); |
| |
| try { |
| FlinkSecurityManager.setFromConfiguration(configuration); |
| PluginManager pluginManager = |
| PluginUtils.createPluginManagerFromRootFolder(configuration); |
| configureFileSystems(configuration, pluginManager); |
| |
| SecurityContext securityContext = installSecurityContext(configuration); |
| |
| ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration); |
| securityContext.runSecured( |
| (Callable<Void>) |
| () -> { |
| runCluster(configuration, pluginManager); |
| |
| return null; |
| }); |
| } catch (Throwable t) { |
| final Throwable strippedThrowable = |
| ExceptionUtils.stripException(t, UndeclaredThrowableException.class); |
| |
| try { |
| // clean up any partial state |
| shutDownAsync( |
| ApplicationStatus.FAILED, |
| ShutdownBehaviour.STOP_APPLICATION, |
| ExceptionUtils.stringifyException(strippedThrowable), |
| false) |
| .get( |
| INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), |
| TimeUnit.MILLISECONDS); |
| } catch (InterruptedException | ExecutionException | TimeoutException e) { |
| strippedThrowable.addSuppressed(e); |
| } |
| |
| throw new ClusterEntrypointException( |
| String.format( |
| "Failed to initialize the cluster entrypoint %s.", |
| getClass().getSimpleName()), |
| strippedThrowable); |
| } |
| } |
| |
| protected boolean supportsReactiveMode() { |
| return false; |
| } |
| |
| private void configureFileSystems(Configuration configuration, PluginManager pluginManager) { |
| LOG.info("Install default filesystem."); |
| FileSystem.initialize(configuration, pluginManager); |
| } |
| |
| private SecurityContext installSecurityContext(Configuration configuration) throws Exception { |
| LOG.info("Install security context."); |
| |
| SecurityUtils.install(new SecurityConfiguration(configuration)); |
| |
| return SecurityUtils.getInstalledContext(); |
| } |
| |
| private void runCluster(Configuration configuration, PluginManager pluginManager) |
| throws Exception { |
| synchronized (lock) { |
| initializeServices(configuration, pluginManager); |
| |
| // write host information into configuration |
| configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); |
| configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); |
| |
| final DispatcherResourceManagerComponentFactory |
| dispatcherResourceManagerComponentFactory = |
| createDispatcherResourceManagerComponentFactory(configuration); |
| |
| clusterComponent = |
| dispatcherResourceManagerComponentFactory.create( |
| configuration, |
| ioExecutor, |
| commonRpcService, |
| haServices, |
| blobServer, |
| heartbeatServices, |
| metricRegistry, |
| executionGraphInfoStore, |
| new RpcMetricQueryServiceRetriever( |
| metricRegistry.getMetricQueryServiceRpcService()), |
| this); |
| |
| clusterComponent |
| .getShutDownFuture() |
| .whenComplete( |
| (ApplicationStatus applicationStatus, Throwable throwable) -> { |
| if (throwable != null) { |
| shutDownAsync( |
| ApplicationStatus.UNKNOWN, |
| ShutdownBehaviour.STOP_APPLICATION, |
| ExceptionUtils.stringifyException(throwable), |
| false); |
| } else { |
| // This is the general shutdown path. If a separate more |
| // specific shutdown was |
| // already triggered, this will do nothing |
| shutDownAsync( |
| applicationStatus, |
| ShutdownBehaviour.STOP_APPLICATION, |
| null, |
| true); |
| } |
| }); |
| } |
| } |
| |
| protected void initializeServices(Configuration configuration, PluginManager pluginManager) |
| throws Exception { |
| |
| LOG.info("Initializing cluster services."); |
| |
| synchronized (lock) { |
| rpcSystem = RpcSystem.load(configuration); |
| |
| commonRpcService = |
| RpcUtils.createRemoteRpcService( |
| rpcSystem, |
| configuration, |
| configuration.getString(JobManagerOptions.ADDRESS), |
| getRPCPortRange(configuration), |
| configuration.getString(JobManagerOptions.BIND_HOST), |
| configuration.getOptional(JobManagerOptions.RPC_BIND_PORT)); |
| |
| JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT)); |
| |
| // update the configuration used to create the high availability services |
| configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); |
| configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); |
| |
| ioExecutor = |
| Executors.newFixedThreadPool( |
| ClusterEntrypointUtils.getPoolSize(configuration), |
| new ExecutorThreadFactory("cluster-io")); |
| haServices = createHaServices(configuration, ioExecutor, rpcSystem); |
| blobServer = new BlobServer(configuration, haServices.createBlobStore()); |
| blobServer.start(); |
| heartbeatServices = createHeartbeatServices(configuration); |
| metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem); |
| |
| final RpcService metricQueryServiceRpcService = |
| MetricUtils.startRemoteMetricsRpcService( |
| configuration, commonRpcService.getAddress(), rpcSystem); |
| metricRegistry.startQueryService(metricQueryServiceRpcService, null); |
| |
| final String hostname = RpcUtils.getHostname(commonRpcService); |
| |
| processMetricGroup = |
| MetricUtils.instantiateProcessMetricGroup( |
| metricRegistry, |
| hostname, |
| ConfigurationUtils.getSystemResourceMetricsProbingInterval( |
| configuration)); |
| |
| executionGraphInfoStore = |
| createSerializableExecutionGraphStore( |
| configuration, commonRpcService.getScheduledExecutor()); |
| } |
| } |
| |
| /** |
| * Returns the port range for the common {@link RpcService}. |
| * |
| * @param configuration to extract the port range from |
| * @return Port range for the common {@link RpcService} |
| */ |
| protected String getRPCPortRange(Configuration configuration) { |
| if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) { |
| return configuration.getString(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE); |
| } else { |
| return String.valueOf(configuration.getInteger(JobManagerOptions.PORT)); |
| } |
| } |
| |
| protected HighAvailabilityServices createHaServices( |
| Configuration configuration, Executor executor, RpcSystemUtils rpcSystemUtils) |
| throws Exception { |
| return HighAvailabilityServicesUtils.createHighAvailabilityServices( |
| configuration, executor, AddressResolution.NO_ADDRESS_RESOLUTION, rpcSystemUtils); |
| } |
| |
| protected HeartbeatServices createHeartbeatServices(Configuration configuration) { |
| return HeartbeatServices.fromConfiguration(configuration); |
| } |
| |
| protected MetricRegistryImpl createMetricRegistry( |
| Configuration configuration, |
| PluginManager pluginManager, |
| RpcSystemUtils rpcSystemUtils) { |
| return new MetricRegistryImpl( |
| MetricRegistryConfiguration.fromConfiguration( |
| configuration, rpcSystemUtils.getMaximumMessageSizeInBytes(configuration)), |
| ReporterSetup.fromConfiguration(configuration, pluginManager)); |
| } |
| |
| @Override |
| public CompletableFuture<Void> closeAsync() { |
| ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG); |
| |
| return shutDownAsync( |
| ApplicationStatus.UNKNOWN, |
| ShutdownBehaviour.STOP_PROCESS, |
| "Cluster entrypoint has been closed externally.", |
| false) |
| .thenAccept(ignored -> {}); |
| } |
| |
| protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) { |
| final long shutdownTimeout = |
| configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT); |
| |
| synchronized (lock) { |
| Throwable exception = null; |
| |
| final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3); |
| |
| if (blobServer != null) { |
| try { |
| blobServer.close(); |
| } catch (Throwable t) { |
| exception = ExceptionUtils.firstOrSuppressed(t, exception); |
| } |
| } |
| |
| if (haServices != null) { |
| try { |
| if (cleanupHaData) { |
| haServices.closeAndCleanupAllData(); |
| } else { |
| haServices.close(); |
| } |
| } catch (Throwable t) { |
| exception = ExceptionUtils.firstOrSuppressed(t, exception); |
| } |
| } |
| |
| if (executionGraphInfoStore != null) { |
| try { |
| executionGraphInfoStore.close(); |
| } catch (Throwable t) { |
| exception = ExceptionUtils.firstOrSuppressed(t, exception); |
| } |
| } |
| |
| if (processMetricGroup != null) { |
| processMetricGroup.close(); |
| } |
| |
| if (metricRegistry != null) { |
| terminationFutures.add(metricRegistry.shutdown()); |
| } |
| |
| if (ioExecutor != null) { |
| terminationFutures.add( |
| ExecutorUtils.nonBlockingShutdown( |
| shutdownTimeout, TimeUnit.MILLISECONDS, ioExecutor)); |
| } |
| |
| if (commonRpcService != null) { |
| terminationFutures.add(commonRpcService.stopService()); |
| } |
| |
| try { |
| JMXService.stopInstance(); |
| } catch (Throwable t) { |
| exception = ExceptionUtils.firstOrSuppressed(t, exception); |
| } |
| |
| if (exception != null) { |
| terminationFutures.add(FutureUtils.completedExceptionally(exception)); |
| } |
| |
| return FutureUtils.completeAll(terminationFutures); |
| } |
| } |
| |
| @Override |
| public void onFatalError(Throwable exception) { |
| ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(exception); |
| LOG.error("Fatal error occurred in the cluster entrypoint.", exception); |
| |
| FlinkSecurityManager.forceProcessExit(RUNTIME_FAILURE_RETURN_CODE); |
| } |
| |
| // -------------------------------------------------- |
| // Internal methods |
| // -------------------------------------------------- |
| |
| private Configuration generateClusterConfiguration(Configuration configuration) { |
| final Configuration resultConfiguration = |
| new Configuration(Preconditions.checkNotNull(configuration)); |
| |
| final String webTmpDir = configuration.getString(WebOptions.TMP_DIR); |
| final File uniqueWebTmpDir = new File(webTmpDir, "flink-web-" + UUID.randomUUID()); |
| |
| resultConfiguration.setString(WebOptions.TMP_DIR, uniqueWebTmpDir.getAbsolutePath()); |
| |
| return resultConfiguration; |
| } |
| |
| private CompletableFuture<ApplicationStatus> shutDownAsync( |
| ApplicationStatus applicationStatus, |
| ShutdownBehaviour shutdownBehaviour, |
| @Nullable String diagnostics, |
| boolean cleanupHaData) { |
| if (isShutDown.compareAndSet(false, true)) { |
| LOG.info( |
| "Shutting {} down with application status {}. Diagnostics {}.", |
| getClass().getSimpleName(), |
| applicationStatus, |
| diagnostics); |
| |
| final CompletableFuture<Void> shutDownApplicationFuture = |
| closeClusterComponent(applicationStatus, shutdownBehaviour, diagnostics); |
| |
| final CompletableFuture<Void> serviceShutdownFuture = |
| FutureUtils.composeAfterwards( |
| shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData)); |
| |
| final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture = |
| FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close); |
| |
| final CompletableFuture<Void> cleanupDirectoriesFuture = |
| FutureUtils.runAfterwards( |
| rpcSystemClassLoaderCloseFuture, this::cleanupDirectories); |
| |
| cleanupDirectoriesFuture.whenComplete( |
| (Void ignored2, Throwable serviceThrowable) -> { |
| if (serviceThrowable != null) { |
| terminationFuture.completeExceptionally(serviceThrowable); |
| } else { |
| terminationFuture.complete(applicationStatus); |
| } |
| }); |
| } |
| |
| return terminationFuture; |
| } |
| |
| /** |
| * Close cluster components and deregister the Flink application from the resource management |
| * system by signalling the {@link ResourceManager}. |
| * |
| * @param applicationStatus to terminate the application with |
| * @param shutdownBehaviour shutdown behaviour |
| * @param diagnostics additional information about the shut down, can be {@code null} |
| * @return Future which is completed once the shut down |
| */ |
| private CompletableFuture<Void> closeClusterComponent( |
| ApplicationStatus applicationStatus, |
| ShutdownBehaviour shutdownBehaviour, |
| @Nullable String diagnostics) { |
| synchronized (lock) { |
| if (clusterComponent != null) { |
| switch (shutdownBehaviour) { |
| case STOP_APPLICATION: |
| return clusterComponent.stopApplication(applicationStatus, diagnostics); |
| case STOP_PROCESS: |
| default: |
| return clusterComponent.stopProcess(); |
| } |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| } |
| } |
| |
| /** |
| * Clean up of temporary directories created by the {@link ClusterEntrypoint}. |
| * |
| * @throws IOException if the temporary directories could not be cleaned up |
| */ |
| protected void cleanupDirectories() throws IOException { |
| final String webTmpDir = configuration.getString(WebOptions.TMP_DIR); |
| |
| FileUtils.deleteDirectory(new File(webTmpDir)); |
| } |
| |
| // -------------------------------------------------- |
| // Abstract methods |
| // -------------------------------------------------- |
| |
/**
 * Creates the factory that {@code runCluster} uses to build the {@link
 * DispatcherResourceManagerComponent} of this cluster.
 *
 * @param configuration cluster configuration; when called from {@code runCluster} it already
 *     contains the address and port of the common RPC service
 * @return factory for the cluster component
 * @throws IOException NOTE(review): presumably if the factory cannot be set up; confirm against
 *     the implementations
 */
protected abstract DispatcherResourceManagerComponentFactory
        createDispatcherResourceManagerComponentFactory(Configuration configuration)
                throws IOException;
| |
/**
 * Creates the {@link ExecutionGraphInfoStore} of this cluster. It is called at the end of
 * {@code initializeServices}; the returned store is closed in {@code stopClusterServices}.
 *
 * @param configuration cluster configuration
 * @param scheduledExecutor scheduled executor of the common RPC service
 * @return the store handed to the cluster component
 * @throws IOException NOTE(review): presumably if the store cannot be created; confirm against
 *     the implementations
 */
protected abstract ExecutionGraphInfoStore createSerializableExecutionGraphStore(
        Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;
| |
| protected static EntrypointClusterConfiguration parseArguments(String[] args) |
| throws FlinkParseException { |
| final CommandLineParser<EntrypointClusterConfiguration> clusterConfigurationParser = |
| new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); |
| |
| return clusterConfigurationParser.parse(args); |
| } |
| |
| protected static Configuration loadConfiguration( |
| EntrypointClusterConfiguration entrypointClusterConfiguration) { |
| final Configuration dynamicProperties = |
| ConfigurationUtils.createConfiguration( |
| entrypointClusterConfiguration.getDynamicProperties()); |
| final Configuration configuration = |
| GlobalConfiguration.loadConfiguration( |
| entrypointClusterConfiguration.getConfigDir(), dynamicProperties); |
| |
| final int restPort = entrypointClusterConfiguration.getRestPort(); |
| |
| if (restPort >= 0) { |
| configuration.setInteger(RestOptions.PORT, restPort); |
| } |
| |
| final String hostname = entrypointClusterConfiguration.getHostname(); |
| |
| if (hostname != null) { |
| configuration.setString(JobManagerOptions.ADDRESS, hostname); |
| } |
| |
| return configuration; |
| } |
| |
| // -------------------------------------------------- |
| // Helper methods |
| // -------------------------------------------------- |
| |
| public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) { |
| |
| final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName(); |
| try { |
| clusterEntrypoint.startCluster(); |
| } catch (ClusterEntrypointException e) { |
| LOG.error( |
| String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), |
| e); |
| System.exit(STARTUP_FAILURE_RETURN_CODE); |
| } |
| |
| int returnCode; |
| Throwable throwable = null; |
| |
| try { |
| returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode(); |
| } catch (Throwable e) { |
| throwable = ExceptionUtils.stripExecutionException(e); |
| returnCode = RUNTIME_FAILURE_RETURN_CODE; |
| } |
| |
| LOG.info( |
| "Terminating cluster entrypoint process {} with exit code {}.", |
| clusterEntrypointName, |
| returnCode, |
| throwable); |
| System.exit(returnCode); |
| } |
| |
/**
 * Execution mode of the {@link MiniDispatcher}.
 *
 * <p>The name of {@link #NORMAL} is the default value of the {@code
 * internal.cluster.execution-mode} option declared in this class.
 */
public enum ExecutionMode {
    /** Waits until the job result has been served. */
    NORMAL,

    /** Directly stops after the job has finished. */
    DETACHED
}
| |
/** Selects how {@code closeClusterComponent} closes the cluster component on shutdown. */
private enum ShutdownBehaviour {
    /** The component's {@code stopApplication} is called with the status and diagnostics. */
    STOP_APPLICATION,
    /** The component's {@code stopProcess} is called. */
    STOP_PROCESS,
}
| } |