| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.slider.server.appmaster; |
| |
| import com.google.common.base.Preconditions; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.slider.common.SliderKeys; |
| import org.apache.slider.common.tools.SliderFileSystem; |
| import org.apache.slider.core.conf.AggregateConf; |
| import org.apache.slider.core.conf.MapOperations; |
| import org.apache.slider.core.launch.ContainerLauncher; |
| import org.apache.slider.providers.ProviderRole; |
| import org.apache.slider.providers.ProviderService; |
| import org.apache.slider.providers.agent.AgentKeys; |
| import org.apache.slider.server.appmaster.actions.ActionStartContainer; |
| import org.apache.slider.server.appmaster.actions.QueueAccess; |
| import org.apache.slider.server.appmaster.state.ContainerAssignment; |
| import org.apache.slider.server.appmaster.state.RoleInstance; |
| import org.apache.slider.server.appmaster.state.RoleStatus; |
| import org.apache.slider.server.services.workflow.WorkflowExecutorService; |
| import org.apache.slider.server.services.workflow.ServiceThreadFactory; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * A service for launching containers |
| */ |
| public class RoleLaunchService |
| extends WorkflowExecutorService<ExecutorService> { |
| protected static final Logger log = |
| LoggerFactory.getLogger(RoleLaunchService.class); |
| |
| public static final String ROLE_LAUNCH_SERVICE = "RoleLaunchService"; |
| |
| |
| /** |
| * Queue submission API |
| */ |
| private final QueueAccess actionQueue; |
| |
| /** |
| * Provider building up the command |
| */ |
| private final ProviderService provider; |
| |
| /** |
| * Filesystem to use for the launch |
| */ |
| private final SliderFileSystem fs; |
| |
| /** |
| * Path in the launch filesystem that refers to a configuration directory |
| * -the interpretation of it is left to the Provider |
| */ |
| private final Path generatedConfDirPath; |
| /** |
| * Path in the launch filesystem that refers to a temp directory |
| * which will be cleaned up at (some) time in the future |
| */ |
| private final Path launcherTmpDirPath; |
| |
| private Map<String, String> envVars; |
| |
| /** |
| * Construct an instance of the launcher |
| * @param startOperation the callback to start the opreation |
| * @param actionQueue |
| * @param provider the provider |
| * @param fs filesystem |
| * @param generatedConfDirPath path in the FS for the generated dir |
| * @param envVars environment variables |
| * @param launcherTmpDirPath path for a temporary data in the launch process |
| */ |
| public RoleLaunchService(QueueAccess queueAccess, |
| ProviderService provider, |
| SliderFileSystem fs, |
| Path generatedConfDirPath, |
| Map<String, String> envVars, |
| Path launcherTmpDirPath) { |
| super(ROLE_LAUNCH_SERVICE); |
| this.actionQueue = queueAccess; |
| this.fs = fs; |
| this.generatedConfDirPath = generatedConfDirPath; |
| this.launcherTmpDirPath = launcherTmpDirPath; |
| this.provider = provider; |
| this.envVars = envVars; |
| } |
| |
| @Override |
| public void init(Configuration conf) { |
| super.init(conf); |
| setExecutor(Executors.newCachedThreadPool( |
| new ServiceThreadFactory(ROLE_LAUNCH_SERVICE, true))); |
| } |
| |
| /** |
| * Start an asychronous launch operation |
| * @param container container target |
| * @param role role |
| * @param clusterSpec cluster spec to use for template |
| * @param credentials credentials to use |
| */ |
| public void launchRole(ContainerAssignment assignment, |
| AggregateConf clusterSpec, |
| Credentials credentials) { |
| RoleStatus role = assignment.role; |
| String roleName = role.getName(); |
| // prelaunch safety check |
| Preconditions.checkArgument(provider.isSupportedRole(roleName)); |
| RoleLaunchService.RoleLauncher launcher = |
| new RoleLaunchService.RoleLauncher(assignment, |
| clusterSpec, |
| clusterSpec.getResourceOperations().getOrAddComponent(roleName), |
| clusterSpec.getAppConfOperations().getOrAddComponent(roleName), |
| credentials); |
| execute(launcher); |
| } |
| |
| /** |
| * Thread that runs on the AM to launch a container |
| */ |
| private class RoleLauncher implements Runnable { |
| |
| private final ContainerAssignment assignment; |
| // Allocated container |
| public final Container container; |
| public final String containerRole; |
| private final MapOperations resourceComponent; |
| private final MapOperations appComponent; |
| private final AggregateConf instanceDefinition; |
| public final ProviderRole role; |
| private final Credentials credentials; |
| private Exception raisedException; |
| |
| public RoleLauncher(ContainerAssignment assignment, |
| AggregateConf instanceDefinition, |
| MapOperations resourceComponent, |
| MapOperations appComponent, |
| Credentials credentials) { |
| this.assignment = assignment; |
| this.credentials = credentials; |
| this.container = assignment.container; |
| RoleStatus roleStatus = assignment.role; |
| |
| assert resourceComponent != null; |
| assert appComponent != null; |
| ProviderRole providerRole = roleStatus.getProviderRole(); |
| assert providerRole != null; |
| this.containerRole = providerRole.name; |
| this.role = providerRole; |
| this.resourceComponent = resourceComponent; |
| this.appComponent = appComponent; |
| this.instanceDefinition = instanceDefinition; |
| } |
| |
| public Exception getRaisedException() { |
| return raisedException; |
| } |
| |
| @Override |
| public String toString() { |
| return "RoleLauncher{" + |
| "container=" + container.getId() + |
| ", containerRole='" + containerRole + '\'' + |
| '}'; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| ContainerLauncher containerLauncher = |
| new ContainerLauncher(getConfig(), fs, container, credentials); |
| containerLauncher.setupUGI(); |
| containerLauncher.putEnv(envVars); |
| |
| log.debug("Launching container {} into role {}", |
| container.getId(), |
| containerRole); |
| |
| //now build up the configuration data |
| Path containerTmpDirPath = |
| new Path(launcherTmpDirPath, container.getId().toString()); |
| provider.buildContainerLaunchContext(containerLauncher, |
| instanceDefinition, |
| container, |
| containerRole, |
| fs, |
| generatedConfDirPath, |
| resourceComponent, |
| appComponent, |
| containerTmpDirPath); |
| |
| RoleInstance instance = new RoleInstance(container); |
| String[] envDescription = containerLauncher.dumpEnvToString(); |
| |
| String commandsAsString = containerLauncher.getCommandsAsString(); |
| log.info("Starting container with command: {}", |
| commandsAsString); |
| |
| instance.command = commandsAsString; |
| instance.role = containerRole; |
| instance.roleId = role.id; |
| instance.appVersion = instanceDefinition.getAppConfOperations() |
| .getGlobalOptions().get(SliderKeys.APP_VERSION); |
| instance.environment = envDescription; |
| int delay = appComponent.getOptionInt( |
| AgentKeys.KEY_CONTAINER_LAUNCH_DELAY, 0); |
| int maxDelay = |
| getConfig().getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS); |
| if (delay > maxDelay/1000) { |
| log.warn("Container launch delay of {} exceeds the maximum allowed of" |
| + " {} seconds. Delay will not be utilized.", |
| delay, maxDelay/1000); |
| delay = 0; |
| } |
| log.info("Container launch delay for {} set to {} seconds", |
| role.name, delay); |
| actionQueue.schedule(new ActionStartContainer("starting " + containerRole, |
| container, |
| containerLauncher.completeContainerLaunch(), |
| instance, |
| delay, |
| TimeUnit.SECONDS)); |
| } catch (Exception e) { |
| log.error("Exception thrown while trying to start {}: {}", |
| containerRole, e, e); |
| raisedException = e; |
| } |
| } |
| |
| } |
| } |