blob: a0a517c9eaea67cd08c9cab6c71ea8c8524c0e03 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ConcurrentMap;
/**
* This base class allows to use the MiniYARNCluster.
* The cluster is re-used for all tests.
*
* This class is located in a different package which is built after flink-dist. This way,
* we can use the YARN uberjar of flink to start a Flink YARN session.
*
* The test is not thread-safe. Parallel execution of tests is not possible!
*/
public abstract class YarnTestBase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
// Original stdout/stderr; kept so they can be restored after tests redirect
// System.out/System.err into byte buffers (see sendOutput()).
protected final static PrintStream originalStdout = System.out;
protected final static PrintStream originalStderr = System.err;
// Configuration key under which the MiniYARNCluster name is stored.
protected static String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name";
// Number of NodeManagers started by the MiniYARNCluster.
protected final static int NUM_NODEMANAGERS = 2;
/** The tests are scanning for these strings in the final output. */
protected final static String[] PROHIBITED_STRINGS = {
"Exception", // we don't want any exceptions to happen
"Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode.
};
/** These strings are white-listed, overriding the prohibited strings */
protected final static String[] WHITELISTED_STRINGS = {
"akka.remote.RemoteTransportExceptionNoStackTrace",
// workaround for annoying InterruptedException logging:
// https://issues.apache.org/jira/browse/YARN-1022
"java.lang.InterruptedException"
};
// Temp directory which is deleted after the unit test.
@ClassRule
public static TemporaryFolder tmp = new TemporaryFolder();
// The (re-used) mini cluster; created lazily in startYARNWithConfig().
protected static MiniYARNCluster yarnCluster = null;
/**
 * Uberjar (fat jar) file of Flink
 */
protected static File flinkUberjar;
// Shared YARN configuration, used for the mini cluster and for the clients.
protected static final Configuration yarnConfiguration;
/**
 * lib/ folder of the flink distribution.
 */
protected static File flinkLibFolder;
static {
yarnConfiguration = new YarnConfiguration();
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways
yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4);
yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
// so we have to change the number of cores for testing.
yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
}
/**
 * Pauses briefly between tests, since the YARN cluster is re-used across them.
 */
@After
public void sleep() {
	final long pauseMillis = 500L;
	try {
		Thread.sleep(pauseMillis);
	}
	catch (InterruptedException e) {
		Assert.fail("Should not happen");
	}
}
// Shared client for querying the YARN cluster; created lazily on first use.
private YarnClient yarnClient = null;

/**
 * Verifies before each test that no application is still running on the YARN
 * cluster, so that tests cannot influence each other through leftovers.
 *
 * @throws IOException if communication with the cluster fails
 * @throws YarnException if the application listing cannot be retrieved
 */
@Before
public void checkClusterEmpty() throws IOException, YarnException {
	if (yarnClient == null) {
		yarnClient = YarnClient.createYarnClient();
		yarnClient.init(yarnConfiguration);
		yarnClient.start();
	}
	List<ApplicationReport> apps = yarnClient.getApplications();
	for (ApplicationReport app : apps) {
		if (app.getYarnApplicationState() != YarnApplicationState.FINISHED
			&& app.getYarnApplicationState() != YarnApplicationState.KILLED
			&& app.getYarnApplicationState() != YarnApplicationState.FAILED) {
			// fixed message: was "...is not finished.App X..." (grammar + missing space)
			Assert.fail("There is at least one application on the cluster that is not finished. " +
				"App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState());
		}
	}
}
/**
 * Recursively searches the directory tree rooted at the given path for the
 * first file accepted by the supplied filter.
 *
 * @param startAt root directory of the search
 * @param fnf filter deciding which file counts as a match
 * @return the first matching file encountered, or null if none exists
 */
public static File findFile(String startAt, FilenameFilter fnf) {
	File root = new File(startAt);
	String[] entries = root.list();
	if (entries == null) {
		// not a directory (or not readable): nothing to search
		return null;
	}
	for (String entry : entries) {
		File candidate = new File(startAt + File.separator + entry);
		if (!candidate.isDirectory()) {
			if (fnf.accept(candidate.getParentFile(), candidate.getName())) {
				return candidate;
			}
		} else {
			// descend depth-first into the sub-directory
			File match = findFile(candidate.getAbsolutePath(), fnf);
			if (match != null) {
				return match;
			}
		}
	}
	return null;
}
/**
 * Filter to find the flink-dist uberjar in the lib/ directory of the
 * flink distribution.
 */
public static class RootDirFilenameFilter implements FilenameFilter {
	@Override
	public boolean accept(File dir, String name) {
		// use File.separator instead of the hard-coded "/" so the path check
		// also works on Windows.
		return name.startsWith("flink-dist") && name.endsWith(".jar")
			&& dir.toString().contains(File.separator + "lib");
	}
}
/**
 * {@link FilenameFilter} that accepts a file if its name contains every one of
 * a set of substrings, optionally rejecting files whose directory path
 * contains a given exclusion string.
 */
public static class ContainsName implements FilenameFilter {
	private final String[] names;
	private final String excludeInPath;

	/**
	 * @param names which have to be included in the filename.
	 */
	public ContainsName(String[] names) {
		this(names, null);
	}

	/**
	 * @param names which have to be included in the filename.
	 * @param excludeInPath directories whose path contains this substring are rejected.
	 */
	public ContainsName(String[] names, String excludeInPath) {
		this.names = names;
		this.excludeInPath = excludeInPath;
	}

	@Override
	public boolean accept(File dir, String name) {
		// the name check is identical with and without an exclusion path, so the
		// previously duplicated loop is factored out here.
		for (String n : names) {
			if (!name.contains(n)) {
				return false;
			}
		}
		return excludeInPath == null || !dir.toString().contains(excludeInPath);
	}
}
/**
 * Writes the given YARN configuration as a yarn-site.xml file into a fresh
 * temporary folder.
 *
 * @param yarnConf the configuration to serialize
 * @return the written yarn-site.xml file
 * @throws IOException if the file cannot be created or written
 */
public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
	tmp.create();
	File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
	FileWriter writer = new FileWriter(yarnSiteXML);
	try {
		yarnConf.writeXml(writer);
		writer.flush();
	} finally {
		// always close the writer, even if writeXml() throws, to avoid leaking
		// the file handle.
		writer.close();
	}
	return yarnSiteXML;
}
/**
 * This method checks the written TaskManager and JobManager log files
 * for exceptions.
 *
 * WARN: Please make sure the tool doesn't find old logfiles from previous test runs.
 * So always run "mvn clean" before running the tests here.
 *
 * @param prohibited strings that must not appear in any log file
 * @param whitelisted strings that render an otherwise prohibited line acceptable
 */
public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
	File cwd = new File("target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
	Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
	Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());
	File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
		@Override
		public boolean accept(File dir, String name) {
			// scan each file for prohibited strings.
			File f = new File(dir.getAbsolutePath() + "/" + name);
			Scanner scanner = null;
			try {
				scanner = new Scanner(f);
				while (scanner.hasNextLine()) {
					final String lineFromFile = scanner.nextLine();
					for (String aProhibited : prohibited) {
						if (lineFromFile.contains(aProhibited)) {
							// a prohibited string is only fatal if no whitelisted
							// string appears in the same line.
							boolean whitelistedFound = false;
							for (String white : whitelisted) {
								if (lineFromFile.contains(white)) {
									whitelistedFound = true;
									break;
								}
							}
							if (!whitelistedFound) {
								// logging in FATAL to see the actual message in TRAVIS tests.
								Marker fatal = MarkerFactory.getMarker("FATAL");
								LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
								return true;
							}
						}
					}
				}
			} catch (FileNotFoundException e) {
				LOG.warn("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
			} finally {
				// close the scanner to release the underlying file handle
				// (was leaked before).
				if (scanner != null) {
					scanner.close();
				}
			}
			return false;
		}
	});
	if (foundFile != null) {
		Scanner scanner = null;
		try {
			scanner = new Scanner(foundFile);
		} catch (FileNotFoundException e) {
			Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
		}
		LOG.warn("Found a file with a prohibited string. Printing contents:");
		while (scanner.hasNextLine()) {
			LOG.warn("LINE: "+scanner.nextLine());
		}
		// release the file handle before failing the test (was leaked before).
		scanner.close();
		Assert.fail("Found a file "+foundFile+" with a prohibited string: "+Arrays.toString(prohibited));
	}
}
/**
 * Sleeps for the given number of milliseconds, logging (instead of
 * propagating) an interruption.
 *
 * @param time sleep duration in milliseconds
 */
public static void sleep(int time) {
	try {
		Thread.sleep(time);
	} catch (InterruptedException e) {
		LOG.warn("Interrupted", e);
		// restore the interrupt flag so callers can still observe the interruption
		Thread.currentThread().interrupt();
	}
}
/**
 * Counts the containers currently registered across all NodeManagers of the
 * mini cluster.
 *
 * @return total number of containers on all NodeManagers
 */
public static int getRunningContainers() {
	int total = 0;
	for (int id = 0; id < NUM_NODEMANAGERS; id++) {
		ConcurrentMap<ContainerId, Container> running =
			yarnCluster.getNodeManager(id).getNMContext().getContainers();
		total += running.size();
	}
	return total;
}
/**
 * Starts the MiniYARNCluster (if not already running) with the given configuration
 * and prepares the environment ("user.home", FLINK_CONF_DIR, YARN_CONF_DIR,
 * IN_TESTS) so that the Flink YARN client can be used against it.
 *
 * NOTE: the statement order matters: the uberjar must be located before the
 * environment variables are derived from its location.
 */
public static void startYARNWithConfig(Configuration conf) {
// set the home directory to a tmp directory. Flink on YARN is using the home dir to distribute the file
File homeDir = null;
try {
homeDir = tmp.newFolder();
} catch (IOException e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
System.setProperty("user.home", homeDir.getAbsolutePath());
// search upwards from the module directory for the flink-dist uberjar
String uberjarStartLoc = "..";
LOG.info("Trying to locate uberjar in {}", new File(uberjarStartLoc));
flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter());
Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());
if (!flinkUberjar.exists()) {
Assert.fail("Unable to locate yarn-uberjar.jar");
}
try {
LOG.info("Starting up MiniYARNCluster");
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY), NUM_NODEMANAGERS, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
}
// export the config locations through environment variables for the clients
Map<String, String> map = new HashMap<String, String>(System.getenv());
File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
Assert.assertNotNull(flinkConfDirPath);
map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent());
File yarnConfFile = writeYarnSiteConfigXML(conf);
map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
TestBaseUtils.setEnv(map);
Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
// wait for the nodeManagers to connect
while(!yarnCluster.waitForNodeManagersToConnect(500)) {
LOG.info("Waiting for Nodemanagers to connect");
}
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("setup failure", ex);
Assert.fail();
}
}
/**
 * Default @BeforeClass impl. Overwrite this for passing a different configuration
 */
@BeforeClass
public static void setup() {
// starts (or re-uses) the MiniYARNCluster with the default test configuration
startYARNWithConfig(yarnConfiguration);
}
// -------------------------- Runner -------------------------- //
// Byte buffers capturing everything written to System.out / System.err while a
// Runner is active (streams are redirected in startWithArgs()/runWithArgs()).
protected static ByteArrayOutputStream outContent;
protected static ByteArrayOutputStream errContent;
// The kinds of command line frontends a Runner thread can drive.
enum RunTypes {
YARN_SESSION, CLI_FRONTEND
}
/**
 * This method returns once the "startedAfterString" has been seen.
 *
 * Redirects stdout/stderr into byte buffers, starts a {@link Runner} thread for
 * the given arguments, and polls the captured output once per second (for at
 * most 60 seconds) until the expected string appears. Fails the test if the
 * runner thread dies or the string does not show up within the timeout.
 */
protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) {
LOG.info("Running with args {}", Arrays.toString(args));
outContent = new ByteArrayOutputStream();
errContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
final int START_TIMEOUT_SECONDS = 60;
Runner runner = new Runner(args, type);
runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
runner.start();
for(int second = 0; second < START_TIMEOUT_SECONDS; second++) {
sleep(1000);
// check output for correct TaskManager startup.
if(outContent.toString().contains(startedAfterString)
|| errContent.toString().contains(startedAfterString) ) {
LOG.info("Found expected output in redirected streams");
return runner;
}
// check if thread died
if(!runner.isAlive()) {
sendOutput();
Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue());
}
}
sendOutput();
Assert.fail("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
"expected string did not show up");
return null; // unreachable: Assert.fail() above always throws
}
/**
 * Convenience overload that does not additionally check the log4j log for the
 * terminate string.
 */
protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) {
runWithArgs(args,terminateAfterString, failOnStrings, type, returnCode, false);
}
/**
 * The test has been passed once the "terminateAfterString" has been seen.
 * @param args Command line arguments for the runner
 * @param terminateAfterString the runner is searching the stdout and stderr for this string. as soon as it appears, the test has passed
 * @param failOnStrings The runner is searching stdout and stderr for the strings specified here. If one appears, the test has failed
 * @param type Set the type of the runner
 * @param returnCode Expected return code from the runner.
 * @param checkLogForTerminateString If true, the runner checks also the log4j logger for the terminate string
 */
protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode, boolean checkLogForTerminateString) {
LOG.info("Running with args {}", Arrays.toString(args));
// capture stdout/stderr so they can be searched for the expected strings
outContent = new ByteArrayOutputStream();
errContent = new ByteArrayOutputStream();
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
// we wait for at most three minutes
final int START_TIMEOUT_SECONDS = 180;
final long deadline = System.currentTimeMillis() + (START_TIMEOUT_SECONDS * 1000);
Runner runner = new Runner(args, type);
runner.start();
boolean expectedStringSeen = false;
boolean testPassedFromLog4j = false;
do {
sleep(1000);
String outContentString = outContent.toString();
String errContentString = errContent.toString();
if(failOnStrings != null) {
// fail fast as soon as any of the forbidden strings shows up in the output
for (String failOnString : failOnStrings) {
if (outContentString.contains(failOnString)
|| errContentString.contains(failOnString)) {
LOG.warn("Failing test. Output contained illegal string '" + failOnString + "'");
sendOutput();
// stopping runner.
runner.sendStop();
Assert.fail("Output contained illegal string '" + failOnString + "'");
}
}
}
// check output for the expected terminateAfterString.
if(checkLogForTerminateString) {
LoggingEvent matchedEvent = UtilsTest.getEventContainingString(terminateAfterString);
if(matchedEvent != null) {
testPassedFromLog4j = true;
LOG.info("Found expected output in logging event {}", matchedEvent);
}
}
if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) || testPassedFromLog4j ) {
expectedStringSeen = true;
LOG.info("Found expected output in redirected streams");
// send "stop" command to command line interface
LOG.info("RunWithArgs: request runner to stop");
runner.sendStop();
// wait for the thread to stop
try {
runner.join(30000);
}
catch (InterruptedException e) {
LOG.warn("Interrupted while stopping runner", e);
}
LOG.warn("RunWithArgs runner stopped.");
}
else {
// check if thread died
if (!runner.isAlive()) {
if (runner.getReturnValue() != 0) {
Assert.fail("Runner thread died before the test was finished. Return value = "
+ runner.getReturnValue());
} else {
LOG.info("Runner stopped earlier than expected with return value = 0");
}
// leave loop: the runner died, so we can not expect new strings to show up.
break;
}
}
}
while (!expectedStringSeen && System.currentTimeMillis() < deadline);
sendOutput();
Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
"expected string did not show up", expectedStringSeen);
// check for 0 return code
Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue());
LOG.info("Test was successful");
}
/**
 * Restores the original stdout/stderr streams and forwards everything captured
 * in the redirected byte buffers through the logger.
 */
protected static void sendOutput() {
System.setOut(originalStdout);
System.setErr(originalStderr);
LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
}
/**
 * Thread that drives either a FlinkYarnSessionCli or a CliFrontend with the
 * given arguments and records the frontend's return value.
 */
public static class Runner extends Thread {
	private final String[] args;
	private final RunTypes type;

	// return value of the frontend; read by the test after the thread finished
	private int returnValue;
	// only set for YARN_SESSION runs; used by sendStop()
	private FlinkYarnSessionCli yCli;

	public Runner(String[] args, RunTypes type) {
		this.args = args;
		this.type = type;
	}

	public int getReturnValue() {
		return returnValue;
	}

	@Override
	public void run() {
		switch (type) {
			case YARN_SESSION:
				yCli = new FlinkYarnSessionCli("", "");
				returnValue = yCli.run(args);
				break;
			case CLI_FRONTEND:
				try {
					CliFrontend cli = new CliFrontend();
					returnValue = cli.parseParameters(args);
				} catch (Exception e) {
					throw new RuntimeException(e);
				}
				break;
			default:
				throw new RuntimeException("Unknown type " + type);
		}
		if (returnValue != 0) {
			// fixed message: the return value is an int, so "non-zero", not "non-null"
			Assert.fail("The YARN session returned with non-zero value=" + returnValue);
		}
	}

	/** Asks a YARN session frontend to shut down; no-op for CLI_FRONTEND runs. */
	public void sendStop() {
		if (yCli != null) {
			yCli.stop();
		}
	}
}
// -------------------------- Tear down -------------------------- //

/**
 * Copies the MiniYARNCluster log files out of the JUnit temp folder when
 * running on Travis, so the CI watchdog can upload them.
 */
@AfterClass
public static void copyOnTravis() {
	// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
	// to <flinkRoot>/target/flink-yarn-tests-*.
	// The files from there are picked up by the ./tools/travis_watchdog.sh script
	// to upload them to Amazon S3.
	if (isOnTravis()) {
		File target = new File("../target/" + yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
		// mkdirs() also returns false when the directory already exists, so only
		// warn if the directory is really missing afterwards (avoids a spurious warning).
		if (!target.mkdirs() && !target.isDirectory()) {
			LOG.warn("Error creating dirs to {}", target);
		}
		File src = tmp.getRoot();
		LOG.info("copying the final files from {} to {}", src.getAbsolutePath(), target.getAbsolutePath());
		try {
			FileUtils.copyDirectoryToDirectory(src, target);
		} catch (IOException e) {
			LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e);
		}
	}
}
/**
 * Checks whether the tests are running on Travis CI, i.e. the TRAVIS
 * environment variable is set to "true".
 */
public static boolean isOnTravis() {
	// "true".equals(...) is null-safe: a missing variable yields false
	return "true".equals(System.getenv("TRAVIS"));
}
}