blob: e080ae79a96b5c6bd0af62045f06978fa6cfa755 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.testutils.logging.TestLoggerResource;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import static org.apache.flink.yarn.util.TestUtils.getTestJarPath;
/**
* This test starts a MiniYARNCluster with a FIFO scheduler. There are no queues for that scheduler.
*/
public class YARNSessionFIFOITCase extends YarnTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);

    /** Captures WARN-level output of {@link YarnClusterDescriptor} for log-based assertions. */
    @Rule
    public final TestLoggerResource yarnTestLoggerResource =
            new TestLoggerResource(YarnClusterDescriptor.class, Level.WARN);

    /**
     * Overrides the cluster initialization to use the FIFO scheduler. There are no queues for
     * that scheduler.
     */
    @BeforeClass
    public static void setup() {
        YARN_CONFIGURATION.setClass(
                YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
        YARN_CONFIGURATION.setInt(YarnConfiguration.NM_PMEM_MB, 768);
        YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
        YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");
        startYARNWithConfig(YARN_CONFIGURATION);
    }

    /** Fails the test if any prohibited string showed up in the collected YARN log files. */
    @After
    public void checkForProhibitedLogContents() {
        ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
    }

    @Test(timeout = 60000)
    public void testDetachedMode() throws Exception {
        runTest(() -> runDetachedModeTest(Collections.emptyMap()));
    }

    /**
     * Test regular operation, including command line parameter parsing.
     *
     * <p>Starts a detached YARN session, submits a detached WordCount job against it, waits for
     * the job to finish, then kills the application "externally" through a separate
     * {@link YarnClient} and cleans up the yarn-properties file.
     *
     * @param securityProperties extra {@code -Dkey=value} options passed to the session CLI; may
     *     be {@code null} or empty
     * @return the {@link ApplicationId} of the session that was started and killed
     * @throws Exception if starting the session, submitting the job, or waiting for any of the
     *     expected states fails
     */
    ApplicationId runDetachedModeTest(Map<String, String> securityProperties) throws Exception {
        LOG.info("Starting testDetachedMode()");
        File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
        // get temporary file for reading input data for wordcount example
        File tmpInFile = tmp.newFile();
        FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
        ArrayList<String> args = new ArrayList<>();
        args.add("-j");
        args.add(flinkUberjar.getAbsolutePath());
        args.add("-t");
        args.add(flinkLibFolder.getAbsolutePath());
        args.add("-jm");
        args.add("768m");
        args.add("-tm");
        args.add("1024m");
        if (securityProperties != null) {
            for (Map.Entry<String, String> property : securityProperties.entrySet()) {
                args.add("-D" + property.getKey() + "=" + property.getValue());
            }
        }
        args.add("--name");
        args.add("MyCustomName");
        args.add("--applicationType");
        args.add("Apache Flink 1.x");
        args.add("--detached");
        Runner clusterRunner =
                startWithArgs(
                        args.toArray(new String[0]),
                        "JobManager Web Interface:",
                        RunTypes.YARN_SESSION);
        // before checking any strings outputted by the CLI, first give it time to return
        clusterRunner.join();
        // actually run a program, otherwise we wouldn't necessarily see any TaskManagers
        // be brought up
        Runner jobRunner =
                startWithArgs(
                        new String[] {
                            "run",
                            "--detached",
                            exampleJarLocation.getAbsolutePath(),
                            "--input",
                            tmpInFile.getAbsoluteFile().toString()
                        },
                        "Job has been submitted with JobID",
                        RunTypes.CLI_FRONTEND);
        jobRunner.join();
        final Duration timeout = Duration.ofMinutes(1);
        final long testConditionIntervalInMillis = 500;
        // in "new" mode we can only wait after the job is submitted, because TMs
        // are spun up lazily
        // wait until two containers are running
        LOG.info("Waiting until two containers are running");
        CommonTestUtils.waitUntilCondition(
                () -> getRunningContainers() >= 2,
                Deadline.fromNow(timeout),
                testConditionIntervalInMillis,
                "We didn't reach the state of two containers running (instead: "
                        + getRunningContainers()
                        + ")");
        LOG.info("Waiting until the job reaches FINISHED state");
        final ApplicationId applicationId = getOnlyApplicationReport().getApplicationId();
        CommonTestUtils.waitUntilCondition(
                () ->
                        verifyStringsInNamedLogFiles(
                                new String[] {"switched from state RUNNING to FINISHED"},
                                applicationId,
                                "jobmanager.log"),
                Deadline.fromNow(timeout),
                testConditionIntervalInMillis,
                // use getSeconds(): Duration#toString would render as "PT1M", not a seconds count
                "The deployed job didn't finish on time reaching the timeout of "
                        + timeout.getSeconds()
                        + " seconds. The application will be cancelled forcefully.");
        // kill application "externally".
        try {
            YarnClient yc = YarnClient.createYarnClient();
            yc.init(YARN_CONFIGURATION);
            yc.start();
            List<ApplicationReport> apps =
                    getApplicationReportWithRetryOnNPE(
                            yc, EnumSet.of(YarnApplicationState.RUNNING));
            Assert.assertEquals(1, apps.size()); // Only one running
            ApplicationReport app = apps.get(0);
            Assert.assertEquals("MyCustomName", app.getName());
            Assert.assertEquals("Apache Flink 1.x", app.getApplicationType());
            ApplicationId id = app.getApplicationId();
            yc.killApplication(id);
            CommonTestUtils.waitUntilCondition(
                    () ->
                            !getApplicationReportWithRetryOnNPE(
                                            yc,
                                            EnumSet.of(
                                                    YarnApplicationState.KILLED,
                                                    YarnApplicationState.FINISHED))
                                    .isEmpty(),
                    Deadline.fromNow(timeout),
                    testConditionIntervalInMillis);
        } catch (Throwable t) {
            LOG.warn("Killing failed", t);
            Assert.fail("Killing the application failed: " + t.getMessage());
        } finally {
            // cleanup the yarn-properties file
            // NOTE(review): log prefixes below say "testDetachedPerJobYarnClusterInternal" but
            // this is runDetachedModeTest — looks copy-pasted; left unchanged because the test
            // suite scans log contents (see checkForProhibitedLogContents).
            String confDirPath = System.getenv("FLINK_CONF_DIR");
            File configDirectory = new File(confDirPath);
            LOG.info(
                    "testDetachedPerJobYarnClusterInternal: Using configuration directory "
                            + configDirectory.getAbsolutePath());
            // load the configuration
            LOG.info("testDetachedPerJobYarnClusterInternal: Trying to load configuration file");
            Configuration configuration =
                    GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
            try {
                File yarnPropertiesFile =
                        FlinkYarnSessionCli.getYarnPropertiesLocation(
                                configuration.getString(
                                        YarnConfigOptions.PROPERTIES_FILE_LOCATION));
                if (yarnPropertiesFile.exists()) {
                    LOG.info(
                            "testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}",
                            yarnPropertiesFile.getAbsolutePath());
                    // check the result: a silently failed delete would leak state into other tests
                    if (!yarnPropertiesFile.delete()) {
                        LOG.warn(
                                "Could not delete temporary Yarn address reference: {}",
                                yarnPropertiesFile.getAbsolutePath());
                    }
                }
            } catch (Exception e) {
                LOG.warn(
                        "testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file",
                        e);
            }
        }
        LOG.info("Finished testDetachedMode()");
        return applicationId;
    }

    /**
     * Test querying the YARN cluster.
     *
     * <p>This test validates through 666*2 cores in the "cluster".
     */
    @Test
    public void testQueryCluster() throws Exception {
        runTest(
                () -> {
                    LOG.info("Starting testQueryCluster()");
                    runWithArgs(
                            new String[] {"-q"},
                            "Summary: totalMemory 8192 totalCores 1332",
                            null,
                            RunTypes.YARN_SESSION,
                            0); // we have 666*2 cores.
                    LOG.info("Finished testQueryCluster()");
                });
    }
}