/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfo;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoEntry;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.testutils.logging.TestLoggerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.util.TestUtils;
import org.apache.flink.shaded.guava18.com.google.common.net.HostAndPort;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
import static org.apache.flink.util.Preconditions.checkState;
import static org.apache.flink.yarn.util.TestUtils.getTestJarPath;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
 * This test starts a MiniYARNCluster with a CapacityScheduler. It has, by default, a queue called
 * "default". The configuration here adds another queue: "qa-team".
 */
public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
    /** RestClient to query the Flink cluster. */
private static RestClient restClient;
/**
* ExecutorService for {@link RestClient}.
*
* @see #restClient
*/
private static ExecutorService restClientExecutor;
/** Toggles checking for prohibited strings in logs after the test has run. */
private boolean checkForProhibitedLogContents = true;
@Rule
public final TestLoggerResource cliTestLoggerResource =
new TestLoggerResource(CliFrontend.class, Level.INFO);
    @Rule
    public final TestLoggerResource yarnTestLoggerResource =
            new TestLoggerResource(YarnClusterDescriptor.class, Level.WARN);
@BeforeClass
public static void setup() throws Exception {
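        // Use the CapacityScheduler and declare two queues; in the CapacityScheduler,
        // queue capacities are percentages of the parent queue and must sum to 100.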
YARN_CONFIGURATION.setClass(
YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
YARN_CONFIGURATION.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
YARN_CONFIGURATION.set(
YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler");
startYARNWithConfig(YARN_CONFIGURATION);
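        // shared RestClient used by the tests below to query the Flink cluster's REST API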
restClientExecutor = Executors.newSingleThreadExecutor();
restClient =
new RestClient(
RestClientConfiguration.fromConfiguration(new Configuration()),
restClientExecutor);
}
@AfterClass
public static void tearDown() throws Exception {
try {
YarnTestBase.teardown();
} finally {
if (restClient != null) {
restClient.shutdown(Time.seconds(5));
}
if (restClientExecutor != null) {
restClientExecutor.shutdownNow();
}
}
}
    /**
     * Tests that a session cluster that uses resources from the <i>qa-team</i> queue can be
     * started from the command line.
     */
@Test
public void testStartYarnSessionClusterInQaTeamQueue() throws Exception {
runTest(
() ->
runWithArgs(
new String[] {
"-j",
flinkUberjar.getAbsolutePath(),
"-t",
flinkLibFolder.getAbsolutePath(),
"-jm",
"768m",
"-tm",
"1024m",
"-qu",
"qa-team"
},
"JobManager Web Interface:",
null,
RunTypes.YARN_SESSION,
0));
}
    /**
     * Tests a per-job YARN cluster.
     *
     * <p>This also tests the prefixed CliFrontend options for the YARN case. We also test whether
     * the requested parallelism of 2 is passed through; the parallelism is requested at the YARN
     * client (-ys).
     */
@Test
public void perJobYarnCluster() throws Exception {
runTest(
() -> {
LOG.info("Starting perJobYarnCluster()");
File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
runWithArgs(
new String[] {
"run",
"-m",
"yarn-cluster",
"-yj",
flinkUberjar.getAbsolutePath(),
"-yt",
flinkLibFolder.getAbsolutePath(),
"-ys",
"2", // test that the job is executed with a DOP of 2
"-yjm",
"768m",
"-ytm",
"1024m",
exampleJarLocation.getAbsolutePath()
},
/* test succeeded after this string */
"Program execution finished",
/* prohibited strings: (to verify the parallelism) */
// (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)"
// instead)
new String[] {"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
RunTypes.CLI_FRONTEND,
0,
cliTestLoggerResource::getMessages);
LOG.info("Finished perJobYarnCluster()");
});
}
    /**
     * Tests a per-job YARN cluster and the memory calculations for off-heap use (see FLINK-7400),
     * with the same job as {@link #perJobYarnCluster()}.
     *
     * <p>This ensures that, with any amount of off-heap memory pre-allocated by us, some off-heap
     * memory remains for Flink's libraries; creating TaskManagers would fail otherwise.
     */
@Test
public void perJobYarnClusterOffHeap() throws Exception {
runTest(
() -> {
LOG.info("Starting perJobYarnCluster()");
File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
// set memory constraints (otherwise this is the same test as
// perJobYarnCluster() above)
final long taskManagerMemoryMB = 1024;
runWithArgs(
new String[] {
"run",
"-m",
"yarn-cluster",
"-yj",
flinkUberjar.getAbsolutePath(),
"-yt",
flinkLibFolder.getAbsolutePath(),
"-ys",
"2", // test that the job is executed with a DOP of 2
"-yjm",
"768m",
"-ytm",
taskManagerMemoryMB + "m",
exampleJarLocation.getAbsolutePath()
},
/* test succeeded after this string */
"Program execution finished",
/* prohibited strings: (to verify the parallelism) */
// (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)"
// instead)
new String[] {"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
RunTypes.CLI_FRONTEND,
0,
cliTestLoggerResource::getMessages);
LOG.info("Finished perJobYarnCluster()");
});
}
/**
* Starts a session cluster on YARN, and submits a streaming job.
*
* <p>Tests
*
* <ul>
* <li>if a custom YARN application name can be set from the command line,
* <li>if the number of TaskManager slots can be set from the command line,
* <li>if dynamic properties from the command line are set,
* <li>if the vcores are set correctly (FLINK-2213),
* <li>if jobmanager hostname/port are shown in web interface (FLINK-1902)
* </ul>
*
* <p><b>Hint: </b> If you think it is a good idea to add more assertions to this test, think
* again!
*/
@Test
public void
testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots()
throws Exception {
runTest(
() -> {
checkForProhibitedLogContents = false;
final Runner yarnSessionClusterRunner =
startWithArgs(
new String[] {
"-j",
flinkUberjar.getAbsolutePath(),
"-t",
flinkLibFolder.getAbsolutePath(),
"-jm",
"768m",
"-tm",
"1024m",
"-s",
"3", // set the slots 3 to check if the vCores are set
// properly!
"-nm",
"customName",
"-Dfancy-configuration-value=veryFancy",
"-D" + YarnConfigOptions.VCORES.key() + "=2"
},
"JobManager Web Interface:",
RunTypes.YARN_SESSION);
try {
final String logs = outContent.toString();
final HostAndPort hostAndPort = parseJobManagerHostname(logs);
final String host = hostAndPort.getHostText();
final int port = hostAndPort.getPort();
LOG.info("Extracted hostname:port: {}:{}", host, port);
submitJob("WindowJoin.jar");
//
// Assert that custom YARN application name "customName" is set
//
final ApplicationReport applicationReport = getOnlyApplicationReport();
assertEquals("customName", applicationReport.getName());
//
// Assert the number of TaskManager slots are set
//
waitForTaskManagerRegistration(host, port, Duration.ofMillis(30_000));
assertNumberOfSlotsPerTask(host, port, 3);
final Map<String, String> flinkConfig = getFlinkConfig(host, port);
//
// Assert dynamic properties
//
assertThat(flinkConfig, hasEntry("fancy-configuration-value", "veryFancy"));
//
// FLINK-2213: assert that vcores are set
//
assertThat(flinkConfig, hasEntry(YarnConfigOptions.VCORES.key(), "2"));
//
// FLINK-1902: check if jobmanager hostname is shown in web interface
//
assertThat(flinkConfig, hasEntry(JobManagerOptions.ADDRESS.key(), host));
} finally {
yarnSessionClusterRunner.sendStop();
yarnSessionClusterRunner.join();
}
});
}
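    /** Extracts the JobManager hostname and port from the given YARN session CLI output. */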
private static HostAndPort parseJobManagerHostname(final String logs) {
final Pattern p =
Pattern.compile("JobManager Web Interface: http://([a-zA-Z0-9.-]+):([0-9]+)");
final Matcher matches = p.matcher(logs);
String hostname = null;
String port = null;
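        // keep the last match: the most recently logged web interface address wins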
while (matches.find()) {
hostname = matches.group(1).toLowerCase();
port = matches.group(2);
}
checkState(hostname != null, "hostname not found in log");
checkState(port != null, "port not found in log");
return HostAndPort.fromParts(hostname, Integer.parseInt(port));
}
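    /** Submits the given example job in detached mode via the CLI frontend. */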
private void submitJob(final String jobFileName) throws IOException, InterruptedException {
Runner jobRunner =
startWithArgs(
new String[] {
"run", "--detached", getTestJarPath(jobFileName).getAbsolutePath()
},
"Job has been submitted with JobID",
RunTypes.CLI_FRONTEND);
jobRunner.join();
}
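    /**
     * Blocks until at least one TaskManager has registered at the cluster, or the given wait
     * duration elapses.
     */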
private static void waitForTaskManagerRegistration(
final String host, final int port, final Duration waitDuration) throws Exception {
CommonTestUtils.waitUntilCondition(
() -> getNumberOfTaskManagers(host, port) > 0, Deadline.fromNow(waitDuration));
}
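    /**
     * Asserts that the TaskManagers provide the expected number of slots each, retrying for up to
     * 30 seconds before failing.
     */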
private static void assertNumberOfSlotsPerTask(
final String host, final int port, final int slotsNumber) throws Exception {
try {
CommonTestUtils.waitUntilCondition(
() -> getNumberOfSlotsPerTaskManager(host, port) == slotsNumber,
Deadline.fromNow(Duration.ofSeconds(30)));
} catch (final TimeoutException e) {
final int currentNumberOfSlots = getNumberOfSlotsPerTaskManager(host, port);
fail(
String.format(
"Expected slots per TM to be %d, was: %d",
slotsNumber, currentNumberOfSlots));
}
}
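    /** Returns the number of connected TaskManagers according to the cluster overview endpoint. */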
private static int getNumberOfTaskManagers(final String host, final int port) throws Exception {
final ClusterOverviewWithVersion clusterOverviewWithVersion =
restClient
.sendRequest(host, port, ClusterOverviewHeaders.getInstance())
.get(30_000, TimeUnit.MILLISECONDS);
return clusterOverviewWithVersion.getNumTaskManagersConnected();
}
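    /**
     * Returns the slot count of the first registered TaskManager, or 0 if none has registered
     * yet; the tests start all TaskManagers with the same number of slots.
     */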
private static int getNumberOfSlotsPerTaskManager(final String host, final int port)
throws Exception {
final TaskManagersInfo taskManagersInfo =
restClient.sendRequest(host, port, TaskManagersHeaders.getInstance()).get();
return taskManagersInfo.getTaskManagerInfos().stream()
.map(TaskManagerInfo::getNumberSlots)
.findFirst()
.orElse(0);
}
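    /** Fetches the cluster's effective Flink configuration via the REST API as a key/value map. */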
private static Map<String, String> getFlinkConfig(final String host, final int port)
throws Exception {
final ClusterConfigurationInfo clusterConfigurationInfoEntries =
restClient
.sendRequest(host, port, ClusterConfigurationInfoHeaders.getInstance())
.get();
return clusterConfigurationInfoEntries.stream()
.collect(
Collectors.toMap(
ClusterConfigurationInfoEntry::getKey,
ClusterConfigurationInfoEntry::getValue));
}
    /**
     * Tests deployment to a non-existing queue and ensures that the system logs a WARN message
     * for the user. (Users saw unexpected behavior of Flink on YARN because they mistyped the
     * target queue; with an error message, we can help users identify the issue.)
     */
@Test
public void testNonexistingQueueWARNmessage() throws Exception {
runTest(
() -> {
LOG.info("Starting testNonexistingQueueWARNmessage()");
try {
runWithArgs(
new String[] {
"-j",
flinkUberjar.getAbsolutePath(),
"-t",
flinkLibFolder.getAbsolutePath(),
"-jm",
"768m",
"-tm",
"1024m",
"-qu",
"doesntExist"
},
"to unknown queue: doesntExist",
null,
RunTypes.YARN_SESSION,
1);
} catch (Exception e) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
e, "to unknown queue: doesntExist")
.isPresent());
}
assertThat(
                            yarnTestLoggerResource.getMessages(),
hasItem(
containsString(
"The specified queue 'doesntExist' does not exist. Available queues")));
LOG.info("Finished testNonexistingQueueWARNmessage()");
});
}
/**
* Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN
* client.
*/
@Test
public void perJobYarnClusterWithParallelism() throws Exception {
runTest(
() -> {
LOG.info("Starting perJobYarnClusterWithParallelism()");
File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
runWithArgs(
new String[] {
"run",
"-p",
"2", // test that the job is executed with a DOP of 2
"-m",
"yarn-cluster",
"-yj",
flinkUberjar.getAbsolutePath(),
"-yt",
flinkLibFolder.getAbsolutePath(),
"-ys",
"2",
"-yjm",
"768m",
"-ytm",
"1024m",
exampleJarLocation.getAbsolutePath()
},
/* test succeeded after this string */
"Program execution finished",
/* prohibited strings: (we want to see "DataSink (...) (2/2) switched to FINISHED") */
new String[] {"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"},
RunTypes.CLI_FRONTEND,
0,
cliTestLoggerResource::getMessages);
LOG.info("Finished perJobYarnClusterWithParallelism()");
});
}
    /** Tests a fire-and-forget submission of a batch job to a YARN cluster. */
@Test(timeout = 60000)
public void testDetachedPerJobYarnCluster() throws Exception {
runTest(
() -> {
LOG.info("Starting testDetachedPerJobYarnCluster()");
File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
LOG.info("Finished testDetachedPerJobYarnCluster()");
});
}
    /** Tests a fire-and-forget submission of a streaming job to a YARN cluster. */
@Test(timeout = 60000)
public void testDetachedPerJobYarnClusterWithStreamingJob() throws Exception {
runTest(
() -> {
LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
});
}
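    /**
     * Submits the given job jar in detached mode, waits for the corresponding YARN application to
     * finish, and verifies the WordCount output as well as the JobManager log contents.
     */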
private void testDetachedPerJobYarnClusterInternal(String job) throws Exception {
YarnClient yc = YarnClient.createYarnClient();
yc.init(YARN_CONFIGURATION);
yc.start();
        // temporary folder for writing the output of the wordcount example
        final File tmpOutFolder = tmp.newFolder();
        // temporary file holding the input data for the wordcount example
        final File tmpInFile = tmp.newFile();
        FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
Runner runner =
startWithArgs(
new String[] {
"run",
"-m",
"yarn-cluster",
"-yj",
flinkUberjar.getAbsolutePath(),
"-yt",
flinkLibFolder.getAbsolutePath(),
"-yjm",
"768m",
"-yD",
YarnConfigOptions.APPLICATION_TAGS.key() + "=test-tag",
"-ytm",
"1024m",
"-ys",
"2", // test requesting slots from YARN.
"-p",
"2",
"--detached",
job,
"--input",
tmpInFile.getAbsoluteFile().toString(),
"--output",
tmpOutFolder.getAbsoluteFile().toString()
},
"Job has been submitted with JobID",
RunTypes.CLI_FRONTEND);
// it should usually be 2, but on slow machines, the number varies
Assert.assertTrue(
"There should be at most 2 containers running", getRunningContainers() <= 2);
// give the runner some time to detach
for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
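                // ignore the interrupt and check the runner again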
}
}
Assert.assertFalse("The runner should detach.", runner.isAlive());
LOG.info("CLI Frontend has returned, so the job is running");
// find out the application id and wait until it has finished.
try {
List<ApplicationReport> apps =
getApplicationReportWithRetryOnNPE(
yc, EnumSet.of(YarnApplicationState.RUNNING));
ApplicationId tmpAppId;
if (apps.size() == 1) {
                // only one application is running, so it must be the right one (though
                // sometimes the app shuts down so fast that this case is missed)
tmpAppId = apps.get(0).getApplicationId();
LOG.info("waiting for the job with appId {} to finish", tmpAppId);
// wait until the app has finished
while (getApplicationReportWithRetryOnNPE(
yc, EnumSet.of(YarnApplicationState.RUNNING))
.size()
> 0) {
sleep(500);
}
} else {
                // get the appId by picking the application with the latest (highest) id
                apps = getApplicationReportWithRetryOnNPE(yc);
                apps.sort(Comparator.comparing(ApplicationReport::getApplicationId).reversed());
tmpAppId = apps.get(0).getApplicationId();
LOG.info(
"Selected {} as the last appId from {}",
tmpAppId,
Arrays.toString(apps.toArray()));
}
final ApplicationId id = tmpAppId;
            // now that the application has finished, check the output files
File[] listOfOutputFiles = tmpOutFolder.listFiles();
Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder);
            // concatenate all output files in the output folder into one string
            StringBuilder contentBuilder = new StringBuilder();
            for (File f : listOfOutputFiles) {
                if (f.isFile()) {
                    contentBuilder.append(FileUtils.readFileToString(f)).append("\n");
                }
            }
            String content = contentBuilder.toString();
// check for some of the wordcount outputs.
            Assert.assertTrue(
                    "Expected string 'da 5', '(da,5)' or '(all,2)' not found in string '"
                            + content
                            + "'",
                    content.contains("da 5")
                            || content.contains("(da,5)")
                            || content.contains("(all,2)"));
            Assert.assertTrue(
                    "Expected string 'der 29', '(der,29)' or '(mind,1)' not found in string '"
                            + content
                            + "'",
                    content.contains("der 29")
                            || content.contains("(der,29)")
                            || content.contains("(mind,1)"));
            // check the JobManager log of this application for the expected startup messages
            File jobmanagerLog =
                    TestUtils.findFile(
                            "..",
                            (dir, name) ->
                                    name.contains("jobmanager.log")
                                            && dir.getAbsolutePath().contains(id.toString()));
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
String expected = "Starting TaskManagers";
Assert.assertTrue(
"Expected string '"
+ expected
+ "' not found in JobManager log: '"
+ jobmanagerLog
+ "'",
content.contains(expected));
expected = " (2/2) (attempt #0) with attempt id ";
Assert.assertTrue(
"Expected string '"
+ expected
+ "' not found in JobManager log."
+ "This string checks that the job has been started with a parallelism of 2. Log contents: '"
+ jobmanagerLog
+ "'",
content.contains(expected));
// make sure the detached app is really finished.
LOG.info("Checking again that app has finished");
ApplicationReport rep;
do {
sleep(500);
rep = yc.getApplicationReport(id);
LOG.info("Got report {}", rep);
} while (rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
verifyApplicationTags(rep);
} finally {
// cleanup the yarn-properties file
String confDirPath = System.getenv("FLINK_CONF_DIR");
File configDirectory = new File(confDirPath);
LOG.info(
"testDetachedPerJobYarnClusterInternal: Using configuration directory "
+ configDirectory.getAbsolutePath());
// load the configuration
LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
Configuration configuration =
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
try {
File yarnPropertiesFile =
FlinkYarnSessionCli.getYarnPropertiesLocation(
configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
if (yarnPropertiesFile.exists()) {
LOG.info(
"testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}",
yarnPropertiesFile.getAbsolutePath());
yarnPropertiesFile.delete();
}
} catch (Exception e) {
LOG.warn(
"testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file",
e);
}
try {
LOG.info("testDetachedPerJobYarnClusterInternal: Closing the yarn client");
yc.stop();
} catch (Exception e) {
LOG.warn(
"testDetachedPerJobYarnClusterInternal: Exception while close the yarn client",
e);
}
}
}
/**
* Ensures that the YARN application tags were set properly.
*
     * <p>Since YARN application tags were only added in Hadoop 2.4 but Flink still supports
     * Hadoop 2.3, reflection is required to invoke the method. If the method does not exist, the
     * check passes trivially.
*/
private void verifyApplicationTags(final ApplicationReport report)
throws InvocationTargetException, IllegalAccessException {
final Method applicationTagsMethod;
Class<ApplicationReport> clazz = ApplicationReport.class;
try {
// this method is only supported by Hadoop 2.4.0 onwards
applicationTagsMethod = clazz.getMethod("getApplicationTags");
} catch (NoSuchMethodException e) {
// only verify the tags if the method exists
return;
}
@SuppressWarnings("unchecked")
Set<String> applicationTags = (Set<String>) applicationTagsMethod.invoke(report);
assertEquals(Collections.singleton("test-tag"), applicationTags);
}
@After
public void checkForProhibitedLogContents() {
if (checkForProhibitedLogContents) {
ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
}
}
}