/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.yarn;

import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;

import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;

/**
 * This test starts a MiniYARNCluster with a FIFO scheduler.
 * There are no queues for that scheduler.
 */
public class YARNSessionFIFOITCase extends YarnTestBase {
	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOITCase.class);

	/*
	 * Override init with FIFO scheduler.
	 */
	@BeforeClass
	public static void setup() {
		yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
		yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");
		startYARNWithConfig(yarnConfiguration);
	}

	@After
	public void checkForProhibitedLogContents() {
		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
	}

	/**
	 * Test detached mode, including command line parameter parsing.
	 */
	@Test(timeout=60000) // timeout after a minute.
	public void testDetachedMode() {
		LOG.info("Starting testDetachedMode()");
		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
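		// Start a detached YARN session via the CLI; the flags correspond to the Java API
		// setters exercised in testJavaAPI() below: -j = Flink uberjar, -t = files to ship
		// (the lib/ folder), -n = number of TaskManagers, -jm/-tm = JobManager/TaskManager memory in MB.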
		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
						"-t", flinkLibFolder.getAbsolutePath(),
						"-n", "1",
						"-jm", "768",
						"-tm", "1024",
						"--name", "MyCustomName", // test setting a custom name
						"--detached"},
				"Flink JobManager is now running on", RunTypes.YARN_SESSION);
		checkForLogString("The Flink YARN client has been started in detached mode");
		Assert.assertFalse("The runner should detach.", runner.isAlive());
LOG.info("Waiting until two containers are running");
// wait until two containers are running
while(getRunningContainers() < 2) {
sleep(500);
}
LOG.info("Two containers are running. Killing the application");
// kill application "externally".
try {
YarnClient yc = YarnClient.createYarnClient();
yc.init(yarnConfiguration);
yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
Assert.assertEquals(1, apps.size()); // Only one running
ApplicationReport app = apps.get(0);
Assert.assertEquals("MyCustomName", app.getName());
ApplicationId id = app.getApplicationId();
yc.killApplication(id);
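
			// wait until YARN reports the application in the KILLED state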
			while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
				sleep(500);
			}
		} catch(Throwable t) {
			LOG.warn("Killing failed", t);
			Assert.fail();
		}
		LOG.info("Finished testDetachedMode()");
	}

	/**
	 * Test querying the YARN cluster.
	 *
	 * This test expects the "cluster" to report 666 * 2 = 1332 cores and 8192 MB of total memory.
	 */
	@Test
	public void testQueryCluster() {
		LOG.info("Starting testQueryCluster()");
		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores.
		LOG.info("Finished testQueryCluster()");
	}

	/**
	 * Test deployment to a non-existing queue (a user-reported error).
	 *
	 * The deployment still succeeds because the FIFO scheduler has no queues, so we only
	 * check that the session starts.
	 */
	@Test
	public void testNonexistingQueue() {
		LOG.info("Starting testNonexistingQueue()");
		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
				"-t", flinkLibFolder.getAbsolutePath(),
				"-n", "1",
				"-jm", "768",
				"-tm", "1024",
				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
		LOG.info("Finished testNonexistingQueue()");
	}

	/**
	 * The test cluster has the following resources:
	 * - 2 nodes with 4096 MB each.
	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512.
	 *
	 * We allocate:
	 * 1 JobManager with 256 MB (automatically raised to 512 MB because of the minimum allocation)
	 * 5 TaskManagers with 1585 MB each
	 *
	 * The user sees a total request of 256 + 5 * 1585 = 8181 MB (fits).
	 * The system sees a total request of 512 + 5 * 1585 = 8437 MB (does not fit because of the minimum allocation).
	 */
	@Ignore("The test is too resource consuming (8.5 GB of memory)")
	@Test
	public void testResourceComputation() {
		addTestAppender(FlinkYarnClient.class, Level.WARN);
		LOG.info("Starting testResourceComputation()");
		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
				"-n", "5",
				"-jm", "256",
				"-tm", "1585"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
		LOG.info("Finished testResourceComputation()");
		checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
	}

	/**
	 * The test cluster has the following resources:
	 * - 2 nodes with 4096 MB each.
	 * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512.
	 *
	 * We allocate:
	 * 1 JobManager with 256 MB (automatically raised to 512 MB because of the minimum allocation)
	 * 2 TaskManagers with 3840 MB each
	 *
	 * The user sees a total request of 256 + 2 * 3840 = 7936 MB (fits).
	 * The system sees a total request of 512 + 2 * 3840 = 8192 MB (fits).
	 * HOWEVER: the node that also hosts the JobManager needs 3840 + 512 = 4352 MB, which does not fit into 4096 MB.
	 *
	 * --> check that the system properly rejects allocating this session.
	 */
	@Ignore("The test is too resource consuming (8 GB of memory)")
	@Test
	public void testfullAlloc() {
		addTestAppender(FlinkYarnClient.class, Level.WARN);
		LOG.info("Starting testfullAlloc()");
		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
				"-n", "2",
				"-jm", "256",
				"-tm", "3840"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
		LOG.info("Finished testfullAlloc()");
		checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
	}

	/**
	 * Test the YARN Java API.
	 */
	@Test
	public void testJavaAPI() {
		final int WAIT_TIME = 15;
		LOG.info("Starting testJavaAPI()");

		AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
		flinkYarnClient.setTaskManagerCount(1);
		flinkYarnClient.setJobManagerMemory(768);
		flinkYarnClient.setTaskManagerMemory(1024);
		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
		String confDirPath = System.getenv("FLINK_CONF_DIR");
		flinkYarnClient.setConfigurationDirectory(confDirPath);
		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));

		// deploy
		AbstractFlinkYarnCluster yarnCluster = null;
		try {
			yarnCluster = flinkYarnClient.deploy();
			yarnCluster.connectToCluster();
		} catch (Exception e) {
			LOG.warn("Failing test", e);
			Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
		}

		// poll the cluster status (roughly once per second) until it matches the expected status
		FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
		for(int second = 0; second < WAIT_TIME * 2; second++) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				LOG.warn("Interrupted", e);
			}
			FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
			if(status != null && status.equals(expectedStatus)) {
				LOG.info("Cluster reached status " + status);
				break; // all good, cluster started
			}
			if(second > WAIT_TIME) {
				// we waited for WAIT_TIME seconds and the cluster didn't come up correctly
				Assert.fail("The cluster didn't start after " + WAIT_TIME + " seconds");
			}
		}

		// use the cluster
		Assert.assertNotNull(yarnCluster.getJobManagerAddress());
		Assert.assertNotNull(yarnCluster.getWebInterfaceURL());

		LOG.info("Shutting down cluster. All tests passed");
		// shutdown cluster
		yarnCluster.shutdown(false);
		LOG.info("Finished testJavaAPI()");
	}
}