/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.nio.ByteBuffer; |
| import java.security.PrivilegedAction; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import javax.crypto.SecretKey; |
| |
| import org.apache.commons.codec.binary.Base64; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.io.DataInputByteBuffer; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.SecurityInfo; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.yarn.api.ApplicationConstants; |
| import org.apache.hadoop.yarn.api.ContainerManager; |
| import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationState; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerToken; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; |
| import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; |
| import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo; |
| import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; |
| import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationFinishEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType; |
| |
| public class AMLauncher implements Runnable { |
| |
| private static final Log LOG = LogFactory.getLog(AMLauncher.class); |
| |
| private ContainerManager containerMgrProxy; |
| |
| private final AppContext master; |
| private final Configuration conf; |
| private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| private ApplicationTokenSecretManager applicationTokenSecretManager; |
| private ClientToAMSecretManager clientToAMSecretManager; |
| private AMLauncherEventType event; |
| |
| @SuppressWarnings("rawtypes") |
| private EventHandler handler; |
| |
| @SuppressWarnings("unchecked") |
| public AMLauncher(RMContext asmContext, AppContext master, |
| AMLauncherEventType event,ApplicationTokenSecretManager applicationTokenSecretManager, |
| ClientToAMSecretManager clientToAMSecretManager, Configuration conf) { |
| this.master = master; |
| this.conf = new Configuration(conf); // Just not to touch the sec-info class |
| this.applicationTokenSecretManager = applicationTokenSecretManager; |
| this.clientToAMSecretManager = clientToAMSecretManager; |
| this.conf.setClass( |
| CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME, |
| ContainerManagerSecurityInfo.class, SecurityInfo.class); |
| this.event = event; |
| this.handler = asmContext.getDispatcher().getEventHandler(); |
| } |
| |
| private void connect() throws IOException { |
| ContainerId masterContainerID = master.getMasterContainer().getId(); |
| |
| containerMgrProxy = |
| getContainerMgrProxy(masterContainerID.getAppId()); |
| } |
| |
| private void launch() throws IOException { |
| connect(); |
| ContainerId masterContainerID = master.getMasterContainer().getId(); |
| ApplicationSubmissionContext applicationContext = |
| master.getSubmissionContext(); |
| LOG.info("Setting up container " + master.getMasterContainer() |
| + " for AM " + master.getMaster()); |
| ContainerLaunchContext launchContext = |
| createAMContainerLaunchContext(applicationContext, masterContainerID); |
| StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class); |
| request.setContainerLaunchContext(launchContext); |
| containerMgrProxy.startContainer(request); |
| LOG.info("Done launching container " + master.getMasterContainer() |
| + " for AM " + master.getMaster()); |
| } |
| |
| private void cleanup() throws IOException { |
| connect(); |
| ContainerId containerId = master.getMasterContainer().getId(); |
| StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class); |
| stopRequest.setContainerId(containerId); |
| containerMgrProxy.stopContainer(stopRequest); |
| } |
| |
| private ContainerManager getContainerMgrProxy( |
| final ApplicationId applicationID) throws IOException { |
| |
| Container container = master.getMasterContainer(); |
| |
| final String containerManagerBindAddress = container.getContainerManagerAddress(); |
| |
| final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. |
| |
| UserGroupInformation currentUser = |
| UserGroupInformation.createRemoteUser("TODO"); // TODO |
| if (UserGroupInformation.isSecurityEnabled()) { |
| ContainerToken containerToken = container.getContainerToken(); |
| Token<ContainerTokenIdentifier> token = |
| new Token<ContainerTokenIdentifier>( |
| containerToken.getIdentifier().array(), |
| containerToken.getPassword().array(), new Text( |
| containerToken.getKind()), new Text( |
| containerToken.getService())); |
| currentUser.addToken(token); |
| } |
| return currentUser.doAs(new PrivilegedAction<ContainerManager>() { |
| @Override |
| public ContainerManager run() { |
| return (ContainerManager) rpc.getProxy(ContainerManager.class, |
| NetUtils.createSocketAddr(containerManagerBindAddress), conf); |
| } |
| }); |
| } |
| |
| private ContainerLaunchContext createAMContainerLaunchContext( |
| ApplicationSubmissionContext applicationMasterContext, |
| ContainerId containerID) throws IOException { |
| |
| // Construct the actual Container |
| ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class); |
| container.addAllCommands(applicationMasterContext.getCommandList()); |
| StringBuilder mergedCommand = new StringBuilder(); |
| String failCount = Integer.toString(master.getFailedCount()); |
| List<String> commandList = new ArrayList<String>(); |
| for (String str : container.getCommandList()) { |
| String result = |
| str.replaceFirst(ApplicationConstants.AM_FAIL_COUNT_STRING, |
| failCount); |
| mergedCommand.append(result).append(" "); |
| commandList.add(result); |
| } |
| container.clearCommands(); |
| container.addAllCommands(commandList); |
| /** add the failed count to the app master command line */ |
| |
| LOG.info("Command to launch container " + |
| containerID + " : " + mergedCommand); |
| container.addAllEnv(applicationMasterContext.getAllEnvironment()); |
| |
| container.addAllEnv(setupTokensInEnv(applicationMasterContext)); |
| |
| // Construct the actual Container |
| container.setContainerId(containerID); |
| container.setUser(applicationMasterContext.getUser()); |
| container.setResource(applicationMasterContext.getMasterCapability()); |
| container.addAllLocalResources(applicationMasterContext.getAllResourcesTodo()); |
| container.setContainerTokens(applicationMasterContext.getFsTokensTodo()); |
| return container; |
| } |
| |
| private Map<String, String> setupTokensInEnv( |
| ApplicationSubmissionContext asc) |
| throws IOException { |
| Map<String, String> env = |
| new HashMap<String, String>(); |
| if (UserGroupInformation.isSecurityEnabled()) { |
| // TODO: Security enabled/disabled info should come from RM. |
| |
| Credentials credentials = new Credentials(); |
| |
| DataInputByteBuffer dibb = new DataInputByteBuffer(); |
| if (asc.getFsTokensTodo() != null) { |
| // TODO: Don't do this kind of checks everywhere. |
| dibb.reset(asc.getFsTokensTodo()); |
| credentials.readTokenStorageStream(dibb); |
| } |
| |
| ApplicationTokenIdentifier id = |
| new ApplicationTokenIdentifier(master.getMasterContainer().getId().getAppId()); |
| Token<ApplicationTokenIdentifier> token = |
| new Token<ApplicationTokenIdentifier>(id, |
| this.applicationTokenSecretManager); |
| String schedulerAddressStr = |
| this.conf.get(YarnConfiguration.SCHEDULER_ADDRESS, |
| YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS); |
| InetSocketAddress unresolvedAddr = |
| NetUtils.createSocketAddr(schedulerAddressStr); |
| String resolvedAddr = |
| unresolvedAddr.getAddress().getHostAddress() + ":" |
| + unresolvedAddr.getPort(); |
| token.setService(new Text(resolvedAddr)); |
| String appMasterTokenEncoded = token.encodeToUrlString(); |
| LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded); |
| env.put(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME, |
| appMasterTokenEncoded); |
| |
| // Add the RM token |
| credentials.addToken(new Text(resolvedAddr), token); |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| credentials.writeTokenStorageToStream(dob); |
| asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); |
| |
| ApplicationTokenIdentifier identifier = |
| new ApplicationTokenIdentifier( |
| this.master.getMaster().getApplicationId()); |
| SecretKey clientSecretKey = |
| this.clientToAMSecretManager.getMasterKey(identifier); |
| String encoded = |
| Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded()); |
| LOG.debug("The encoded client secret-key to be put in env : " + encoded); |
| env.put(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded); |
| } |
| return env; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public void run() { |
| switch (event) { |
| case LAUNCH: |
| ApplicationEventType eventType = ApplicationEventType.LAUNCHED; |
| try { |
| LOG.info("Launching master" + master.getMaster()); |
| launch(); |
| } catch(Exception ie) { |
| LOG.info("Error launching ", ie); |
| eventType = ApplicationEventType.LAUNCH_FAILED; |
| } |
| handler.handle(new ASMEvent<ApplicationEventType>(eventType, master)); |
| break; |
| case CLEANUP: |
| try { |
| LOG.info("Cleaning master " + master.getMaster()); |
| cleanup(); |
| } catch(IOException ie) { |
| LOG.info("Error cleaning master ", ie); |
| } |
| handler.handle(new ApplicationFinishEvent(master, |
| ApplicationState.COMPLETED)); // Doesn't matter what state you send :) :( |
| break; |
| default: |
| break; |
| } |
| } |
| |
| public AppContext getApplicationContext() { |
| return master; |
| } |
| } |