blob: a61e19cb2aadc17b20cb7e7075960d0b28b32fd9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.yarn.driver.unmanaged;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Test REEF Driver in Unmanaged AM mode on YARN.
*/
public final class UnmanagedAmTest implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
private static final Logger LOG = Logger.getLogger(UnmanagedAmTest.class.getName());
@Test
public void testAmShutdown() throws IOException, YarnException {
Assume.assumeTrue(
"This test requires a YARN Resource Manager to connect to",
Boolean.parseBoolean(System.getenv("REEF_TEST_YARN")));
final YarnConfiguration yarnConfig = new YarnConfiguration();
// Start YARN client and register the application
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfig);
yarnClient.start();
final ContainerLaunchContext containerContext = Records.newRecord(ContainerLaunchContext.class);
containerContext.setCommands(Collections.<String>emptyList());
containerContext.setLocalResources(Collections.<String, LocalResource>emptyMap());
containerContext.setEnvironment(Collections.<String, String>emptyMap());
containerContext.setTokens(getTokens());
final ApplicationSubmissionContext appContext = yarnClient.createApplication().getApplicationSubmissionContext();
appContext.setApplicationName("REEF_Unmanaged_AM_Test");
appContext.setAMContainerSpec(containerContext);
appContext.setUnmanagedAM(true);
appContext.setQueue("default");
final ApplicationId applicationId = appContext.getApplicationId();
LOG.log(Level.INFO, "Registered YARN application: {0}", applicationId);
yarnClient.submitApplication(appContext);
LOG.log(Level.INFO, "YARN application submitted: {0}", applicationId);
addToken(yarnClient.getAMRMToken(applicationId));
// Start the AM
final AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this);
rmClient.init(yarnConfig);
rmClient.start();
final NMClientAsync nmClient = new NMClientAsyncImpl(this);
nmClient.init(yarnConfig);
nmClient.start();
final RegisterApplicationMasterResponse registration =
rmClient.registerApplicationMaster(NetUtils.getHostname(), -1, null);
LOG.log(Level.INFO, "Unmanaged AM is running: {0}", registration);
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "Success!", null);
LOG.log(Level.INFO, "Unregistering AM: state {0}", rmClient.getServiceState());
// Shutdown the AM
rmClient.stop();
nmClient.stop();
// Get the final application report
final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
final YarnApplicationState appState = appReport.getYarnApplicationState();
final FinalApplicationStatus finalAttemptStatus = appReport.getFinalApplicationStatus();
LOG.log(Level.INFO, "Application {0} final attempt {1} status: {2}/{3}", new Object[] {
applicationId, appReport.getCurrentApplicationAttemptId(), appState, finalAttemptStatus});
Assert.assertEquals("Application must be in FINISHED state", YarnApplicationState.FINISHED, appState);
Assert.assertEquals("Final status must be SUCCEEDED", FinalApplicationStatus.SUCCEEDED, finalAttemptStatus);
// Shutdown YARN client
yarnClient.stop();
}
private static ByteBuffer getTokens() throws IOException {
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
final Credentials credentials = ugi.getCredentials();
try (DataOutputBuffer dob = new DataOutputBuffer()) {
credentials.writeTokenStorageToStream(dob);
return ByteBuffer.wrap(dob.getData());
}
}
private static void addToken(final Token<AMRMTokenIdentifier> token) throws IOException {
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
ugi.addToken(token);
}
@Override
public void onShutdownRequest() {
LOG.log(Level.INFO, "Shutdown requested by YARN");
}
// Methods below are dummy implementations of the required callbacks.
@Override
public void onContainersCompleted(final List<ContainerStatus> list) {
throw new RuntimeException("This method should never be invoked");
}
@Override
public void onContainersAllocated(final List<Container> list) {
throw new RuntimeException("This method should never be invoked");
}
@Override
public void onNodesUpdated(final List<NodeReport> list) {
throw new RuntimeException("This method should never be invoked");
}
@Override
public float getProgress() {
return 0;
}
@Override
public void onError(final Throwable throwable) {
throw new RuntimeException("This method should never be invoked", throwable);
}
@Override
public void onContainerStarted(final ContainerId containerId, final Map<String, ByteBuffer> map) {
throw new RuntimeException("This method should never be invoked");
}
@Override
public void onContainerStatusReceived(final ContainerId containerId, final ContainerStatus containerStatus) {
throw new RuntimeException("This method should never be invoked");
}
@Override
public void onContainerStopped(final ContainerId containerId) {
throw new RuntimeException("This method should never be invoked");
}
@Override
public void onStartContainerError(final ContainerId containerId, final Throwable throwable) {
throw new RuntimeException("This method should never be invoked");
}
@Override
public void onGetContainerStatusError(final ContainerId containerId, final Throwable throwable) {
throw new RuntimeException("This method should never be invoked");
}
@Override
public void onStopContainerError(final ContainerId containerId, final Throwable throwable) {
throw new RuntimeException("This method should never be invoked");
}
}