| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| |
| import org.apache.hadoop.util.Shell; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.UnsupportedFileSystemException; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.concurrent.HadoopExecutors; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; |
| import org.apache.hadoop.yarn.server.nodemanager.Context; |
| import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * The launcher for the containers. This service should be started only after |
| * the {@link ResourceLocalizationService} is started as it depends on creation |
| * of system directories on the local file-system. |
| * |
| */ |
| public class ContainersLauncher extends AbstractService |
| implements AbstractContainersLauncher { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(ContainersLauncher.class); |
| |
| private Context context; |
| private ContainerExecutor exec; |
| private Dispatcher dispatcher; |
| private ContainerManagerImpl containerManager; |
| |
| private LocalDirsHandlerService dirsHandler; |
| @VisibleForTesting |
| public ExecutorService containerLauncher = |
| HadoopExecutors.newCachedThreadPool( |
| new ThreadFactoryBuilder() |
| .setNameFormat("ContainersLauncher #%d") |
| .build()); |
| @VisibleForTesting |
| public final Map<ContainerId, ContainerLaunch> running = |
| Collections.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>()); |
| |
| public ContainersLauncher() { |
| super("containers-launcher"); |
| } |
| |
| @VisibleForTesting |
| public ContainersLauncher(Context context, Dispatcher dispatcher, |
| ContainerExecutor exec, LocalDirsHandlerService dirsHandler, |
| ContainerManagerImpl containerManager) { |
| this(); |
| init(context, dispatcher, exec, dirsHandler, containerManager); |
| } |
| |
| @Override |
| public void init(Context nmContext, Dispatcher nmDispatcher, |
| ContainerExecutor containerExec, LocalDirsHandlerService nmDirsHandler, |
| ContainerManagerImpl nmContainerManager) { |
| this.exec = containerExec; |
| this.context = nmContext; |
| this.dispatcher = nmDispatcher; |
| this.dirsHandler = nmDirsHandler; |
| this.containerManager = nmContainerManager; |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| try { |
| //TODO Is this required? |
| FileContext.getLocalFSFileContext(conf); |
| } catch (UnsupportedFileSystemException e) { |
| throw new YarnRuntimeException("Failed to start ContainersLauncher", e); |
| } |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| containerLauncher.shutdownNow(); |
| super.serviceStop(); |
| } |
| |
| @Override |
| public void handle(ContainersLauncherEvent event) { |
| // TODO: ContainersLauncher launches containers one by one!! |
| Container container = event.getContainer(); |
| ContainerId containerId = container.getContainerId(); |
| switch (event.getType()) { |
| case LAUNCH_CONTAINER: |
| Application app = |
| context.getApplications().get( |
| containerId.getApplicationAttemptId().getApplicationId()); |
| |
| ContainerLaunch launch = |
| new ContainerLaunch(context, getConfig(), dispatcher, exec, app, |
| event.getContainer(), dirsHandler, containerManager); |
| containerLauncher.submit(launch); |
| running.put(containerId, launch); |
| break; |
| case RELAUNCH_CONTAINER: |
| app = context.getApplications().get( |
| containerId.getApplicationAttemptId().getApplicationId()); |
| |
| ContainerRelaunch relaunch = |
| new ContainerRelaunch(context, getConfig(), dispatcher, exec, app, |
| event.getContainer(), dirsHandler, containerManager); |
| containerLauncher.submit(relaunch); |
| running.put(containerId, relaunch); |
| break; |
| case RECOVER_CONTAINER: |
| app = context.getApplications().get( |
| containerId.getApplicationAttemptId().getApplicationId()); |
| launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher, |
| exec, app, event.getContainer(), dirsHandler, containerManager); |
| containerLauncher.submit(launch); |
| running.put(containerId, launch); |
| break; |
| case RECOVER_PAUSED_CONTAINER: |
| app = context.getApplications().get( |
| containerId.getApplicationAttemptId().getApplicationId()); |
| launch = new RecoverPausedContainerLaunch(context, getConfig(), |
| dispatcher, exec, app, event.getContainer(), dirsHandler, |
| containerManager); |
| containerLauncher.submit(launch); |
| break; |
| case CLEANUP_CONTAINER: |
| case CLEANUP_CONTAINER_FOR_REINIT: |
| ContainerLaunch launcher = running.remove(containerId); |
| if (launcher == null) { |
| // Container not launched. |
| // triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition. |
| dispatcher.getEventHandler().handle( |
| new ContainerExitEvent(containerId, |
| ContainerEventType.CONTAINER_KILLED_ON_REQUEST, |
| Shell.WINDOWS ? ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() : |
| ContainerExecutor.ExitCode.TERMINATED.getExitCode(), |
| "Container terminated before launch.")); |
| return; |
| } |
| |
| // Cleanup a container whether it is running/killed/completed, so that |
| // no sub-processes are alive. |
| try { |
| launcher.cleanupContainer(); |
| } catch (IOException e) { |
| LOG.warn("Got exception while cleaning container " + containerId |
| + ". Ignoring."); |
| } |
| break; |
| case SIGNAL_CONTAINER: |
| SignalContainersLauncherEvent signalEvent = |
| (SignalContainersLauncherEvent) event; |
| ContainerLaunch runningContainer = running.get(containerId); |
| if (runningContainer == null) { |
| // Container not launched. So nothing needs to be done. |
| LOG.info("Container " + containerId + " not running, nothing to signal."); |
| return; |
| } |
| |
| try { |
| runningContainer.signalContainer(signalEvent.getCommand()); |
| } catch (IOException e) { |
| LOG.warn("Got exception while signaling container " + containerId |
| + " with command " + signalEvent.getCommand()); |
| } |
| break; |
| case PAUSE_CONTAINER: |
| ContainerLaunch launchedContainer = running.get(containerId); |
| if (launchedContainer == null) { |
| // Container not launched. So nothing needs to be done. |
| return; |
| } |
| |
| // Pause the container |
| try { |
| launchedContainer.pauseContainer(); |
| } catch (Exception e) { |
| LOG.info("Got exception while pausing container: " + |
| StringUtils.stringifyException(e)); |
| } |
| break; |
| case RESUME_CONTAINER: |
| ContainerLaunch launchCont = running.get(containerId); |
| if (launchCont == null) { |
| // Container not launched. So nothing needs to be done. |
| return; |
| } |
| |
| // Resume the container. |
| try { |
| launchCont.resumeContainer(); |
| } catch (Exception e) { |
| LOG.info("Got exception while resuming container: " + |
| StringUtils.stringifyException(e)); |
| } |
| break; |
| } |
| } |
| } |