blob: 7d369d529825e51e7483fb324882a25eb72e83a1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.funtest.framework
import groovy.transform.CompileStatic
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem as HadoopFS
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.HdfsConfiguration
import org.apache.hadoop.registry.client.api.RegistryConstants
import org.apache.hadoop.util.ExitUtil
import org.apache.hadoop.util.Shell
import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.api.StatusKeys
import org.apache.slider.common.tools.ConfigHelper
import org.apache.slider.core.launch.SerializedApplicationReport
import org.apache.slider.core.main.ServiceLauncher
import org.apache.slider.common.SliderKeys
import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.api.ClusterDescription
import org.apache.slider.common.tools.SliderUtils
import org.apache.slider.client.SliderClient
import org.apache.slider.core.persist.ApplicationReportSerDeser
import org.apache.slider.test.SliderTestUtils
import org.apache.slider.test.Outcome;
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.rules.Timeout
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import static org.apache.slider.common.SliderExitCodes.*
import static org.apache.slider.core.main.LauncherExitCodes.*
import static org.apache.slider.funtest.framework.FuntestProperties.*
import static org.apache.slider.common.params.Arguments.*
import static org.apache.slider.common.params.SliderActions.*
import static org.apache.slider.common.SliderXMLConfKeysForTesting.*
@CompileStatic
abstract class CommandTestBase extends SliderTestUtils {
// Logger shared by the static helpers and instance methods of this class.
private static final Logger log =
LoggerFactory.getLogger(CommandTestBase.class);
// Slider configuration directory, read from the system property named by
// SLIDER_CONF_DIR_PROP; sysprop() throws if the property is unset or empty.
public static final String SLIDER_CONF_DIR = sysprop(SLIDER_CONF_DIR_PROP)
// Slider binary/tar directory, read from the system property named by
// SLIDER_BIN_DIR_PROP; sysprop() throws if the property is unset or empty.
public static final String SLIDER_TAR_DIR = sysprop(SLIDER_BIN_DIR_PROP)
// Canonical form of SLIDER_TAR_DIR.
public static final File SLIDER_TAR_DIRECTORY = new File(
SLIDER_TAR_DIR).canonicalFile
// Launcher script (BIN_SLIDER) resolved under the tar directory.
public static final File SLIDER_SCRIPT = new File(
SLIDER_TAR_DIRECTORY,
BIN_SLIDER).canonicalFile
// Python launcher script (BIN_SLIDER_PYTHON) resolved under the tar directory.
public static final File SLIDER_SCRIPT_PYTHON = new File(
SLIDER_TAR_DIRECTORY,
BIN_SLIDER_PYTHON).canonicalFile
// Canonical form of SLIDER_CONF_DIR.
public static final File SLIDER_CONF_DIRECTORY = new File(
SLIDER_CONF_DIR).canonicalFile
// Client configuration file (CLIENT_CONFIG_FILENAME) inside the conf directory.
public static final File SLIDER_CONF_XML = new File(SLIDER_CONF_DIRECTORY,
CLIENT_CONFIG_FILENAME).canonicalFile
// The finals below have no initializer here; they are assigned in the static
// initializer that follows.
// Configuration loaded from SLIDER_CONF_XML.
public static final YarnConfiguration SLIDER_CONFIG
// Wait time for thaw/create operations, read via getTimeOptionMillis().
public static final int THAW_WAIT_TIME
// Wait time for freeze operations, read via getTimeOptionMillis().
public static final int FREEZE_WAIT_TIME
// Per-test timeout used by the JUnit Timeout rule, read via getTimeOptionMillis().
public static final int SLIDER_TEST_TIMEOUT
// Value of KEY_TEST_YARN_RAM_REQUEST, defaulting to DEFAULT_YARN_RAM_REQUEST.
public static final String YARN_RAM_REQUEST
/**
* Keytab for a secure cluster: the trimmed value of KEY_TEST_AM_KEYTAB,
* which is null when the option is not set.
*/
public static final String TEST_AM_KEYTAB
// Only referenced from the commented-out keytab check in setupTestBase();
// nothing in this class currently assigns it.
static File keytabFile
/**
* Shell-escaped ~ symbol. On Windows this does
* not need to be escaped, so the bare character is used there.
*/
public static final String TILDE
// Timeouts and probe interval handed to repeatUntilSuccess();
// presumably milliseconds — TODO confirm against SliderTestUtils.
public static final int CONTAINER_LAUNCH_TIMEOUT = 90000
public static final int PROBE_SLEEP_TIME = 4000
public static final int REGISTRY_STARTUP_TIMEOUT = 60000
// Failure text used when an application does not reach the running state.
public static final String E_LAUNCH_FAIL = 'Application did not start'
/*
Static initializer for test configurations. It loads the client
configuration from SLIDER_CONF_XML and derives the static final settings
declared above from it. If this code throws exceptions
(which it may) the class will not be instantiable.
*/
static {
// Instantiated only for its side effects; presumably this forces the HDFS
// default resources to be registered before any configuration is read
// — TODO confirm against HdfsConfiguration.
new HdfsConfiguration()
ConfigHelper.registerDeprecatedConfigItems();
SLIDER_CONFIG = ConfLoader.loadSliderConf(SLIDER_CONF_XML, true);
// The three time options default to their *_SECONDS constants scaled by 1000.
THAW_WAIT_TIME = getTimeOptionMillis(SLIDER_CONFIG,
KEY_TEST_THAW_WAIT_TIME,
1000 * DEFAULT_THAW_WAIT_TIME_SECONDS)
FREEZE_WAIT_TIME = getTimeOptionMillis(SLIDER_CONFIG,
KEY_TEST_FREEZE_WAIT_TIME,
1000 * DEFAULT_TEST_FREEZE_WAIT_TIME_SECONDS)
SLIDER_TEST_TIMEOUT = getTimeOptionMillis(SLIDER_CONFIG,
KEY_TEST_TIMEOUT,
1000 * DEFAULT_TEST_TIMEOUT_SECONDS)
YARN_RAM_REQUEST = SLIDER_CONFIG.getTrimmed(
KEY_TEST_YARN_RAM_REQUEST,
DEFAULT_YARN_RAM_REQUEST)
// No default is supplied, so this is null when the key is absent.
TEST_AM_KEYTAB = SLIDER_CONFIG.getTrimmed(
KEY_TEST_AM_KEYTAB)
// The tilde is backslash-escaped everywhere except on Windows.
TILDE = Shell.WINDOWS? "~" : "\\~"
}
/**
* JUnit rule that applies SLIDER_TEST_TIMEOUT, as loaded by the static
* initializer, to every test method of the subclass.
*/
@Rule
public final Timeout testTimeout = new Timeout(SLIDER_TEST_TIMEOUT);
/**
* One-off class setup: points {@link SliderShell} at the configuration
* directory and launcher script, propagates any Hadoop configuration
* directory into the shell environment, and initialises security if the
* loaded configuration asks for it.
*/
@BeforeClass
public static void setupTestBase() {
// A freshly loaded copy of the client configuration; it is only used for
// the security initialisation further down.
Configuration conf = loadSliderConf();
SliderShell.confDir = SLIDER_CONF_DIRECTORY
// choose python script if on windows or the launch key recommends it
//
boolean python = SLIDER_CONFIG.getBoolean(KEY_LAUNCH_PYTHON, false)
SliderShell.scriptFile =
(SliderShell.windows || python) ? SLIDER_SCRIPT_PYTHON : SLIDER_SCRIPT
//set the property of the configuration directory
def path = SLIDER_CONF_DIRECTORY.absolutePath
SLIDER_CONFIG.set(ENV_SLIDER_CONF_DIR, path)
// locate any hadoop conf dir
def hadoopConf = SLIDER_CONFIG.getTrimmed(ENV_HADOOP_CONF_DIR)
if (hadoopConf) {
File hadoopConfDir = new File(hadoopConf).canonicalFile
// propagate the value to the client config
SliderShell.setEnv(ENV_HADOOP_CONF_DIR, hadoopConfDir.absolutePath)
}
if (SliderUtils.maybeInitSecurity(conf)) {
log.debug("Security enabled")
SliderUtils.forceLogin()
// now look for the security key
// NOTE(review): the keytab validation below is disabled, so TEST_AM_KEYTAB
// and keytabFile are not checked here.
/*
if (!TEST_AM_KEYTAB) {
fail("Security keytab is not defined in $KEY_TEST_AM_KEYTAB")
}
keytabFile = new File(TEST_AM_KEYTAB)
if (!keytabFile.exists()) {
throw new FileNotFoundException("Security keytab ${keytabFile} " +
" defined in $KEY_TEST_AM_KEYTAB")
}
*/
} else {
log.info "Security is off"
}
// Record which filesystem and resource manager the tests will talk to.
log.info("Test using ${HadoopFS.getDefaultUri(SLIDER_CONFIG)} " +
"and YARN RM @ ${SLIDER_CONFIG.get(YarnConfiguration.RM_ADDRESS)}")
}
/**
 * Dump the captured output of a shell.
 * @param shell shell whose output is to be dumped
 */
public static void logShell(SliderShell shell) {
  shell.dumpOutput()
}
/**
 * Name the current (test) thread "JUnit" before each test.
 */
@Before
public void nameThread() {
  Thread current = Thread.currentThread()
  current.setName("JUnit")
}
/**
 * Add a configuration file to {@code SLIDER_CONFIG} if it exists.
 * @param dir directory containing the file
 * @param filename filename
 * @return true if the file was found and loaded, false if it was absent
 * @throws IOException loading problems (other than a missing file)
 */
public static boolean maybeAddConfFile(File dir, String filename) throws IOException {
  File confFile = new File(dir, filename)
  // nothing to load: report and bail out
  if (!confFile.isFile()) {
    log.debug("Did not find $confFile —skipping load")
    return false
  }
  ConfigHelper.addConfigurationFile(SLIDER_CONFIG, confFile, true)
  log.debug("Loaded $confFile")
  return true
}
/**
 * Add a JAR to the extra slider classpath, identified by a class it contains.
 * A path which is already listed is not added a second time.
 * @param clazz class inside the JAR
 */
public static void addExtraJar(Class clazz) {
  String jarPath = SliderUtils.findContainingJarOrFail(clazz).absolutePath
  if (SliderShell.slider_classpath_extra.contains(jarPath)) {
    // already registered
    return
  }
  SliderShell.slider_classpath_extra << jarPath
}
/**
 * Resolve a mandatory JVM system property.
 * @param key property name
 * @return the property value, never null or empty
 * @throws RuntimeException if the property is unset or empty
 */
public static String sysprop(String key) {
  String value = System.getProperty(key)
  boolean defined = value != null && !value.isEmpty()
  if (defined) {
    return value
  }
  throw new RuntimeException("Undefined property $key")
}
/**
 * Print a line to standard output.
 * @param s text to print
 */
static void println(String s) {
  PrintStream stdout = System.out
  stdout.println(s)
}
/**
 * Print an empty line to standard output.
 */
static void println() {
  PrintStream stdout = System.out
  stdout.println()
}
/**
 * Execute any slider command; the exit code is not checked.
 * @param commands command line arguments
 * @return the shell, after execution
 */
public static SliderShell slider(Collection<String> commands) {
  SliderShell launched = new SliderShell(commands)
  launched.execute()
  return launched
}
/**
 * Execute a slider command, stating the exit code that is expected.
 * The work is delegated to {@code SliderShell.run}.
 * @param exitCode expected exit code
 * @param commands command line arguments
 * @return the shell returned by {@code SliderShell.run}
 */
public static SliderShell slider(int exitCode, Collection<String> commands) {
  SliderShell launched = SliderShell.run(exitCode, commands)
  return launched
}
/**
 * Load the client XML configuration file {@code SLIDER_CONF_XML}.
 * @return a newly loaded configuration
 */
public static Configuration loadSliderConf() {
  return ConfLoader.loadSliderConf(SLIDER_CONF_XML, true)
}
/**
 * Get the filesystem selected by the test configuration.
 * @return the filesystem for {@code SLIDER_CONFIG}
 */
public static HadoopFS getClusterFS() {
  HadoopFS filesystem = HadoopFS.get(SLIDER_CONFIG)
  return filesystem
}
/**
 * Destroy an application instance; the exit code is not checked.
 * @param name instance name
 * @return the executed shell
 */
static SliderShell destroy(String name) {
  List<String> command = [ACTION_DESTROY, name]
  return slider(command)
}
/**
 * Destroy an application instance, expecting a specific exit code.
 * @param result expected exit code
 * @param name instance name
 * @return the executed shell
 */
static SliderShell destroy(int result, String name) {
  List<String> command = [ACTION_DESTROY, name]
  return slider(result, command)
}
/**
 * Run the exists command; the exit code is not checked.
 * @param name instance name
 * @param live if true (the default) the live flag is appended
 * @return the executed shell
 */
static SliderShell exists(String name, boolean live = true) {
  List<String> command = live ?
      [ACTION_EXISTS, name, ARG_LIVE] :
      [ACTION_EXISTS, name]
  return slider(command)
}
/**
 * Run the exists command, expecting a specific exit code.
 * @param result expected exit code
 * @param name instance name
 * @param live if true (the default) the live flag is appended
 * @return the executed shell
 */
static SliderShell exists(int result, String name, boolean live = true) {
  List<String> command = live ?
      [ACTION_EXISTS, name, ARG_LIVE] :
      [ACTION_EXISTS, name]
  return slider(result, command)
}
/**
 * Freeze (stop) an application instance; the exit code is not checked.
 * @param name instance name
 * @return the executed shell
 */
static SliderShell freeze(String name) {
  List<String> command = [ACTION_FREEZE, name]
  return slider(command)
}
/**
 * Freeze (stop) an application instance with extra arguments, expecting
 * a specific exit code.
 * @param exitCode expected exit code
 * @param name instance name
 * @param args arguments appended after the name
 * @return the executed shell
 */
static SliderShell freeze(
    int exitCode,
    String name,
    Collection<String> args) {
  List<String> command = [ACTION_FREEZE, name]
  command.addAll(args)
  return slider(exitCode, command)
}
/**
 * Freeze (stop) an application instance with extra arguments.
 * No exit code checking takes place.
 * @param name instance name
 * @param args arguments appended after the name
 * @return the executed shell
 */
static SliderShell freeze(String name, Collection<String> args) {
  List<String> command = [ACTION_FREEZE, name]
  command.addAll(args)
  return slider(command)
}
/**
 * Forced freeze of an instance, passing the force flag and a wait
 * argument of "10000"; the exit code is not checked.
 * @param name instance name
 * @return the executed shell
 */
static SliderShell freezeForce(String name) {
  List<String> forceArgs = [ARG_FORCE, ARG_WAIT, "10000"]
  return freeze(name, forceArgs)
}
/**
 * Kill a container of an application instance; the command must exit with 0.
 * @param name instance name
 * @param containerID ID of the container to kill
 * @return the executed shell
 */
static SliderShell killContainer(String name, String containerID) {
  List<String> command = [ACTION_KILL_CONTAINER, name, containerID]
  return slider(0, command)
}
/**
 * Run the list command; the exit code is not checked.
 * @param name instance name to list, or null to omit the name
 * @return the executed shell
 */
static SliderShell list(String name) {
  List<String> command = (name != null) ?
      [ACTION_LIST, name] :
      [ACTION_LIST]
  return slider(command)
}
/**
 * Look up an application by ID, expecting a specific exit code.
 * @param result expected exit code
 * @param id application ID; must be set
 * @param out optional file for the output; null to omit the output argument
 * @return the executed shell
 */
static SliderShell lookup(int result, String id, File out) {
  assert id
  List<String> command = [ACTION_LOOKUP, ARG_ID, id]
  if (out != null) {
    command.add(ARG_OUTPUT)
    command.add(out.absolutePath)
  }
  return slider(result, command)
}
/**
 * Look up an application by ID; the exit code is not checked.
 * @param id application ID; must be set
 * @param out optional file for the output; null to omit the output argument
 * @return the executed shell
 */
static SliderShell lookup(String id, File out) {
  assert id
  List<String> command = [ACTION_LOOKUP, ARG_ID, id]
  if (out != null) {
    command.add(ARG_OUTPUT)
    command.add(out.absolutePath)
  }
  return slider(command)
}
/**
 * Run the list command with extra arguments, expecting a specific exit code.
 * @param result expected exit code
 * @param commands arguments appended after the list action (default: none)
 * @return the executed shell
 */
static SliderShell list(int result, Collection<String> commands =[]) {
  List<String> command = [ACTION_LIST]
  command.addAll(commands)
  return slider(result, command)
}
/**
 * Run the status command; the exit code is not checked.
 * @param name instance name
 * @return the executed shell
 */
static SliderShell status(String name) {
  List<String> command = [ACTION_STATUS, name]
  return slider(command)
}
/**
 * Run the status command, expecting a specific exit code.
 * @param result expected exit code
 * @param name instance name
 * @return the executed shell
 */
static SliderShell status(int result, String name) {
  List<String> command = [ACTION_STATUS, name]
  return slider(result, command)
}
/**
 * Thaw (start) an application instance; the exit code is not checked.
 * @param name instance name
 * @return the executed shell
 */
static SliderShell thaw(String name) {
  List<String> command = [ACTION_THAW, name]
  return slider(command)
}
/**
 * Thaw (start) an application instance, expecting a specific exit code.
 * @param result expected exit code
 * @param name instance name
 * @return the executed shell
 */
static SliderShell thaw(int result, String name) {
  List<String> command = [ACTION_THAW, name]
  return slider(result, command)
}
/**
 * Thaw (start) an application instance with extra arguments.
 * Unlike {@code thaw(String)}, the command must exit with 0.
 * @param name instance name
 * @param args arguments appended after the name
 * @return the executed shell
 */
static SliderShell thaw(String name, Collection<String> args) {
  List<String> command = [ACTION_THAW, name]
  command.addAll(args)
  return slider(0, command)
}
/**
 * Run the resolve command, expecting a specific exit code.
 * @param result expected exit code
 * @param commands arguments appended after the resolve action
 * @return the executed shell
 */
static SliderShell resolve(int result, Collection<String> commands) {
  List<String> command = [ACTION_RESOLVE]
  command.addAll(commands)
  return slider(result, command)
}
/**
 * Run the registry command, expecting a specific exit code.
 * @param result expected exit code
 * @param commands arguments appended after the registry action
 * @return the executed shell
 */
static SliderShell registry(int result, Collection<String> commands) {
  List<String> command = [ACTION_REGISTRY]
  command.addAll(commands)
  return slider(result, command)
}
/**
 * Run the registry command; it must exit with 0.
 * @param commands arguments appended after the registry action
 * @return the executed shell
 */
static SliderShell registry(Collection<String> commands) {
  List<String> command = [ACTION_REGISTRY]
  command.addAll(commands)
  return slider(0, command)
}
/**
 * Ensure that a cluster has been destroyed: force-freeze it, then destroy it.
 * The freeze may exit with 0 or with {@code EXIT_UNKNOWN_INSTANCE}; any other
 * exit code fails the assertion. The destroy must exit with 0.
 * @param name cluster name
 */
static void ensureClusterDestroyed(String name) {
  SliderShell froze = freezeForce(name)
  int freezeExit = froze.ret
  boolean tolerated = freezeExit == 0 || freezeExit == EXIT_UNKNOWN_INSTANCE
  if (!tolerated) {
    // raise the assertion through the shell
    froze.assertExitCode(0)
  }
  destroy(0, name)
}
/**
 * Prepare for a test run against a named cluster by making sure that no
 * cluster of that name is left over.
 * @param cluster cluster name
 */
static void setupCluster(String cluster) {
  describe("setting up $cluster")
  ensureClusterDestroyed(cluster)
}
/**
 * Teardown operation: force-freezes the cluster. The cluster is not
 * destroyed, so its state is retained for inspection after the test.
 * @param name cluster name
 */
static void teardown(String name) {
  freeze(name, [ARG_FORCE, ARG_WAIT, "10000"])
}
/**
 * Assert that a shell exited with code 0.
 * @param shell shell
 */
public static void assertSuccess(SliderShell shell) {
  shell.assertExitCode(0)
}
/**
 * Assert that a shell exited with {@code EXIT_UNKNOWN_INSTANCE}, i.e. that
 * the cluster was reported as unknown.
 * @param shell shell
 */
public static void assertUnknownCluster(SliderShell shell) {
  shell.assertExitCode(EXIT_UNKNOWN_INSTANCE)
}
/**
 * Assert that a shell exited with a given code; the check itself is
 * delegated to the shell.
 * @param shell shell
 * @param errorCode expected exit code
 */
public static void assertExitCode(SliderShell shell, int errorCode) {
  SliderShell checked = shell
  checked.assertExitCode(errorCode)
}
/**
 * Assert that the output of the shell contains the string to look for.
 * If it does not, the output is dumped before the assertion is raised.
 * @param shell shell to inspect
 * @param lookThisUp text to look for
 * @param n number of times (default = 1).
 *   NOTE(review): this value is not currently used by the check.
 */
public static void assertOutputContains(
    SliderShell shell,
    String lookThisUp,
    int n = 1) {
  if (shell.outputContains(lookThisUp)) {
    return
  }
  // not found: show what was actually produced, then fail
  log.error("Missing $lookThisUp from:")
  shell.dumpOutput()
  assert shell.outputContains(lookThisUp)
}
/**
 * Create a client bonded to a cluster by launching the "exists" command
 * against the resource manager named in the configuration.
 *
 * @param conf configuration; must define the RM address
 * @param clustername cluster to bond to
 * @return the client service, with its deployed cluster name set
 * @throws ExitUtil.ExitException if the launched command exits with a
 * non-zero code
 */
SliderClient bondToCluster(Configuration conf, String clustername) {
  String rmAddress = getRequiredConfOption(conf, YarnConfiguration.RM_ADDRESS)
  List<String> command = ["exists", clustername]
  ServiceLauncher<SliderClient> launcher =
      launchClientAgainstRM(rmAddress, command, conf)
  int exitCode = launcher.serviceExitCode
  if (exitCode != 0) {
    throw new ExitUtil.ExitException(exitCode, "exit code = $exitCode")
  }
  SliderClient client = launcher.service
  client.deployedClusterName = clustername
  return client
}
/**
 * Create or build a slider cluster (the action is set by the first verb).
 * The command must exit with 0.
 * @param action operation to invoke: ACTION_CREATE or ACTION_BUILD
 * @param clustername cluster name
 * @param roles map of rolename to count
 * @param extraArgs list of extra args to add to the creation command;
 * may be null
 * @param blockUntilRunning if set, a wait argument of THAW_WAIT_TIME is added
 * @param clusterOps map of key=value cluster options to set with the --option arg
 * @return shell which will have executed the command.
 */
public SliderShell createOrBuildSliderCluster(
    String action,
    String clustername,
    Map<String, Integer> roles,
    List<String> extraArgs,
    boolean blockUntilRunning,
    Map<String, String> clusterOps) {
  assert action != null
  assert clustername != null
  List<String> command = [action, clustername]
  command.add(ARG_ZKHOSTS)
  command.add(SLIDER_CONFIG.getTrimmed(RegistryConstants.KEY_REGISTRY_ZK_QUORUM))
  if (blockUntilRunning) {
    command.add(ARG_WAIT)
    command.add(Integer.toString(THAW_WAIT_TIME))
  }
  // one component argument triple per role
  for (Map.Entry<String, Integer> entry : roles.entrySet()) {
    String role = entry.key
    Integer val = entry.value
    log.info("Role $role := $val")
    command.add(ARG_COMPONENT)
    command.add(role)
    command.add(Integer.toString(val))
  }
  // then the cluster options
  for (Map.Entry<String, String> option : clusterOps.entrySet()) {
    command.add(ARG_OPTION)
    command.add(option.key.toString())
    command.add(option.value.toString())
  }
  if (extraArgs != null) {
    command.addAll(extraArgs)
  }
  return slider(0, command)
}
/**
 * Create a slider cluster using the create action.
 * @param clustername cluster name
 * @param roles map of rolename to count
 * @param extraArgs list of extra args to add to the creation command
 * @param blockUntilRunning block until the AM is running
 * @param clusterOps map of key=value cluster options to set with the --option arg
 * @return shell which will have executed the command.
 */
public SliderShell createSliderApplication(
    String clustername,
    Map<String, Integer> roles,
    List<String> extraArgs,
    boolean blockUntilRunning,
    Map<String, String> clusterOps) {
  SliderShell created = createOrBuildSliderCluster(
      ACTION_CREATE, clustername, roles, extraArgs, blockUntilRunning, clusterOps)
  return created
}
/**
 * Create a templated slider app and fail the test if the launch does not
 * exit with 0. Keytab-related AM options are added for every such key
 * that is set in the test configuration.
 * @param name name
 * @param appTemplate application template
 * @param resourceTemplate resource template
 * @param extraArgs extra arguments appended to the command (default: none)
 * @param launchReportFile file for the launch report; when null a new
 * report file is created. Any existing copy is deleted before the launch
 * @return the shell
 */
public SliderShell createTemplatedSliderApplication(
    String name,
    String appTemplate,
    String resourceTemplate,
    List<String> extraArgs = [],
    File launchReportFile = null) {
  File reportFile = launchReportFile ?: createAppReportFile()
  // delete any previous copy of the file
  reportFile.delete()

  List<String> commands = [
      ACTION_CREATE, name,
      ARG_TEMPLATE, appTemplate,
      ARG_RESOURCES, resourceTemplate,
      ARG_OUTPUT, reportFile.absolutePath,
      ARG_WAIT, Integer.toString(THAW_WAIT_TIME)
  ]

  // propagate each security option of the AM that the configuration defines
  List<String> amOptionKeys = [
      SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME,
      SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR,
      SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH,
      SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL
  ]
  for (String optionKey : amOptionKeys) {
    maybeAddCommandOption(commands,
        [ARG_COMP_OPT, SliderKeys.COMPONENT_AM, optionKey],
        SLIDER_CONFIG.getTrimmed(optionKey))
  }
  commands.addAll(extraArgs)

  SliderShell shell = new SliderShell(commands)
  int launchExit = shell.execute()
  if (launchExit == 0) {
    return shell
  }

  // app has failed.
  // grab the app report of the last known instance of this app
  // which may not be there if it was a config failure; may be out of date
  // from a previous run
  log.error("Launch failed with exit code ${shell.ret}")
  shell.dumpOutput()
  String extraText = ""
  def appReport = maybeLookupFromLaunchReport(reportFile)
  if (appReport) {
    log.error("Application report:\n$appReport")
    extraText = appReport.diagnostics
  }
  fail("Application Launch Failure, exit code ${shell.ret}\n${extraText}")
  return shell
}
/**
 * Pick a unique filename for a launch report under the "target" directory.
 * The temporary file is created to reserve the name and then deleted, so
 * the returned file does not exist when the method returns.
 * @return the path of a not-yet-existing report file
 */
public static File createAppReportFile() {
  File targetDir = new File("target")
  File reportFile = File.createTempFile("launch", ".json", targetDir)
  reportFile.delete()
  return reportFile
}
/**
 * If the option is set (per {@code SliderUtils.isSet}), append the command
 * elements followed by the option to the argument list.
 * @param args arg list being built up; modified in place
 * @param commands command elements to add before the option
 * @param option option to probe and use
 * @return the (possibly extended) list
 */
public static List<String> maybeAddCommandOption(
    List<String> args, List<String> commands, String option) {
  if (!SliderUtils.isSet(option)) {
    return args
  }
  args.addAll(commands)
  args.add(option)
  return args
}
/**
 * Load an application report if the file exists and is not empty.
 * @param reportFile file to load
 * @return the report, or null if the file is missing or empty
 */
public static SerializedApplicationReport maybeLoadAppReport(File reportFile) {
  boolean hasContent = reportFile.exists() && reportFile.length() > 0
  if (!hasContent) {
    return null
  }
  return new ApplicationReportSerDeser().fromFile(reportFile)
}
/**
 * Load an application report, which must exist and be non-empty.
 * @param reportFile file to load
 * @return the report
 * @throws FileNotFoundException if the file is missing or empty
 */
public static SerializedApplicationReport loadAppReport(File reportFile) {
  boolean hasContent = reportFile.exists() && reportFile.length() > 0
  if (!hasContent) {
    throw new FileNotFoundException(reportFile.absolutePath)
  }
  return new ApplicationReportSerDeser().fromFile(reportFile)
}
/**
 * Load a launch report and, if one was loaded, look up the application
 * it names.
 * @param launchReport launch report file
 * @return the looked-up application report, or null if no launch report
 * could be loaded or the lookup failed
 */
public static SerializedApplicationReport maybeLookupFromLaunchReport(File launchReport) {
  def report = maybeLoadAppReport(launchReport)
  return report ? lookupApplication(report.applicationId) : null
}
/**
 * Look up an application through the lookup command, writing the report
 * to a temporary file which is always deleted afterwards.
 * @param id application ID
 * @return an application report, or null if the lookup command failed or
 * produced no report
 */
public static SerializedApplicationReport lookupApplication(String id) {
  File reportFile = createAppReportFile()
  try {
    SliderShell shell = lookup(id, reportFile)
    if (shell.ret != 0) {
      log.warn("Lookup operation failed with ${shell.ret}")
      shell.dumpOutput()
      return null
    }
    return maybeLoadAppReport(reportFile)
  } finally {
    reportFile.delete()
  }
}
/**
 * Build the path of a cluster's directory beneath the home directory of
 * the cluster filesystem.
 * @param clustername cluster name
 * @return the path
 */
public Path buildClusterPath(String clustername) {
  Path home = clusterFS.homeDirectory
  String relative = "${SliderKeys.SLIDER_BASE_DIRECTORY}/cluster/${clustername}"
  return new Path(home, relative)
}
/**
 * Ask the AM of a cluster to kill itself, pause, then wait until the
 * application is reported as up again.
 * @param sliderClient client used to fetch the cluster description afterwards
 * @param cluster cluster name; must be set
 * @return the cluster description read from the client after the restart
 */
public ClusterDescription killAmAndWaitForRestart(
    SliderClient sliderClient, String cluster) {
  assert cluster
  List<String> suicide = [
      ACTION_AM_SUICIDE, cluster,
      ARG_EXITCODE, "1",
      ARG_WAIT, "1000",
      ARG_MESSAGE, "suicide"
  ]
  slider(0, suicide)
  // pause before probing for the restarted application
  sleep(5000)
  ensureApplicationIsUp(cluster)
  return sliderClient.clusterDescription
}
/**
 * Kill an AM and await its restart, probing by YARN application ID.
 * @param application application name; must be set
 * @param appId YARN application ID to wait for
 */
public void killAmAndWaitForRestart(String application, String appId) {
  assert application
  List<String> suicide = [
      ACTION_AM_SUICIDE, application,
      ARG_EXITCODE, "1",
      ARG_WAIT, "1000",
      ARG_MESSAGE, "suicide"
  ]
  slider(0, suicide)
  // pause before probing for the restarted application
  sleep(5000)
  ensureYarnApplicationIsUp(appId)
}
/**
 * Spinning operation to perform a registry call: probes
 * {@code isRegistryAccessible} every PROBE_SLEEP_TIME until it succeeds or
 * REGISTRY_STARTUP_TIMEOUT is reached. On failure the state of the
 * application is dumped and the registry command is run once more with an
 * expected exit code of 0.
 * @param application application
 */
protected void ensureRegistryCallSucceeds(String application) {
  repeatUntilSuccess(this.&isRegistryAccessible,
      REGISTRY_STARTUP_TIMEOUT,
      PROBE_SLEEP_TIME,
      [application: application],
      true,
      "Application registry is not accessible after $REGISTRY_STARTUP_TIMEOUT") {
    describe "Not able to access registry after $REGISTRY_STARTUP_TIMEOUT"
    exists(application, true).dumpOutput()
    // final attempt; a non-zero exit code is reported through the expected-code check
    registry(0, [
        ARG_NAME,
        application,
        ARG_LISTEXP
    ])
  }
}
/**
 * Wait for an application to come up, probing with
 * {@code isApplicationRunning}. The time limit is the configured instance
 * launch time in seconds, scaled by 1000.
 * @param application application name
 */
protected void ensureApplicationIsUp(String application) {
  int launchSeconds = SLIDER_CONFIG.getInt(KEY_TEST_INSTANCE_LAUNCH_TIME,
      DEFAULT_INSTANCE_LAUNCH_TIME_SECONDS)
  int launchTimeout = launchSeconds * 1000
  Map<String, String> probeArgs = [application: application]
  repeatUntilSuccess(this.&isApplicationRunning,
      launchTimeout,
      PROBE_SLEEP_TIME,
      probeArgs,
      true,
      E_LAUNCH_FAIL) {
    describe "final state of app that tests say is not up"
    exists(application, true).dumpOutput()
  }
}
/**
 * Is the registry accessible for an application?
 * Runs the registry listing command once and maps its exit code to an
 * outcome; the shell output is logged when the command fails.
 * @param args argument map containing <code>"application"</code>
 * @return probe outcome
 */
protected Outcome isRegistryAccessible(Map<String, String> args) {
  String applicationName = args['application']
  // slider() has already executed the command, so the recorded exit code is
  // read rather than calling execute() again and re-running the command.
  SliderShell shell = slider(
      [
          ACTION_REGISTRY,
          ARG_NAME,
          applicationName,
          ARG_LISTEXP
      ])
  boolean accessible = EXIT_SUCCESS == shell.ret
  if (!accessible) {
    logShell(shell)
  }
  return Outcome.fromBool(accessible)
}
/**
 * Probe for an application running; uses the <code>exists</code> operation.
 * @param args argument map containing <code>"application"</code>
 * @return the outcome built from whether the application is up
 */
protected Outcome isApplicationRunning(Map<String, String> args) {
  boolean up = isApplicationUp(args.get('application'))
  return Outcome.fromBool(up)
}
/**
 * Use the <code>exists</code> operation to probe for an application being
 * in the YARN RUNNING state.
 * @param applicationName app name
 * @return true if it is running
 */
protected boolean isApplicationUp(String applicationName) {
  YarnApplicationState wanted = YarnApplicationState.RUNNING
  return isApplicationInState(applicationName, wanted)
}
/**
 * Is an application in a desired YARN state? Uses the <code>exists</code>
 * CLI operation with a state argument.
 * @param applicationName application name
 * @param yarnState desired state
 * @return true if the command exited with 0
 */
public static boolean isApplicationInState(
    String applicationName,
    YarnApplicationState yarnState) {
  List<String> command = [
      ACTION_EXISTS, applicationName, ARG_STATE, yarnState.toString()
  ]
  int exit = slider(command).ret
  return exit == 0
}
/**
 * Probe callback: is the YARN application running or not?
 * @param args map in which 'applicationId' holds the application ID
 * @return the probe outcome
 */
protected Outcome isYarnApplicationRunning(Map<String, String> args) {
  String id = args.get('applicationId')
  return isYarnApplicationRunning(id)
}
/**
 * Is a YARN application in the RUNNING state?
 * @param applicationId application ID
 * @return an outcome indicating whether the app is at the state, on its way
 * or has gone past
 */
public static Outcome isYarnApplicationRunning(
    String applicationId) {
  return isYarnApplicationInState(applicationId, YarnApplicationState.RUNNING)
}
/**
 * Probe for a YARN application being in a given state. States are compared
 * by their enum ordinal.
 * @param applicationId app id
 * @param yarnState desired state
 * @return success for a match, retry if state below desired, and fail if
 * above it
 */
public static Outcome isYarnApplicationInState(
    String applicationId,
    YarnApplicationState yarnState) {
  YarnApplicationState appState = lookupYarnAppState(applicationId)
  if (yarnState == appState) {
    return Outcome.Success
  }
  // a later state means the app has passed beyond hope; an earlier one
  // means it may still get there
  boolean overshot = appState.ordinal() > yarnState.ordinal()
  return overshot ? Outcome.Fail : Outcome.Retry
}
/**
 * Look up the YARN application by ID and parse the state held in its
 * application report. The lookup must return a report.
 * @param applicationId the application ID
 * @return the application state
 */
public static YarnApplicationState lookupYarnAppState(String applicationId) {
  def sar = lookupApplication(applicationId)
  assert sar != null;
  return YarnApplicationState.valueOf(sar.state)
}
/**
 * Assert an application is in a given state; fail if not.
 * A failed lookup (no report) is reported as an assertion failure rather
 * than surfacing as a NullPointerException.
 * @param applicationId appId
 * @param expectedState expected state
 */
public static void assertInYarnState(String applicationId,
    YarnApplicationState expectedState) {
  def applicationReport = lookupApplication(applicationId)
  // lookupApplication() returns null when the lookup command fails
  assert applicationReport != null : "No application report for $applicationId"
  assert expectedState.toString() == applicationReport.state
}
/**
 * Wait for the YARN app named in a launch report to come up.
 * This will fail fast.
 * @param launchReportFile launch time file containing app id
 * @return the app ID
 */
protected String ensureYarnApplicationIsUp(File launchReportFile) {
  String applicationId = loadAppReport(launchReportFile).applicationId
  ensureYarnApplicationIsUp(applicationId)
  return applicationId
}
/**
 * Wait for the YARN app to come up. This will fail fast: the probe reports
 * failure as soon as the application has moved past the RUNNING state.
 * On timeout the last application report is logged and the test fails.
 * @param applicationId application ID
 */
protected void ensureYarnApplicationIsUp(String applicationId) {
  // instanceLaunchTime is read from a *_SECONDS option, while the other
  // repeatUntilSuccess callers in this class pass millisecond-scale timeouts,
  // so the value is scaled the same way ensureApplicationIsUp scales it.
  int launchTimeout = instanceLaunchTime * 1000
  repeatUntilSuccess(this.&isYarnApplicationRunning,
      launchTimeout,
      PROBE_SLEEP_TIME,
      [applicationId: applicationId],
      true,
      E_LAUNCH_FAIL) {
    describe "final state of app that tests say is not up"
    def sar = lookupApplication(applicationId)
    def message = E_LAUNCH_FAIL + "\n$sar"
    log.error(message)
    fail(message)
  }
}
/**
 * Get the expected launch time: the value of the configuration option
 * KEY_TEST_INSTANCE_LAUNCH_TIME, falling back to
 * DEFAULT_INSTANCE_LAUNCH_TIME_SECONDS. The value is returned exactly as
 * configured, with no scaling; presumably it is in seconds, going by the
 * name of the default — TODO confirm.
 * @return the configured launch time
 */
public int getInstanceLaunchTime() {
  int configured = SLIDER_CONFIG.getInt(
      KEY_TEST_INSTANCE_LAUNCH_TIME,
      DEFAULT_INSTANCE_LAUNCH_TIME_SECONDS)
  return configured
}
/**
 * Get the "info.am.web.url" info entry from the status of an application.
 * @param applicationName application name
 * @return the value of the entry
 */
public String getInfoAmWebUrl(String applicationName) {
  return execStatus(applicationName).getInfo("info.am.web.url")
}
/**
 * Run the status command for an application, writing the result to a
 * temporary file, and parse that file into a cluster description.
 * The command must exit with EXIT_SUCCESS; the temporary file is always
 * deleted.
 * @param application application name
 * @return the parsed cluster description
 */
public ClusterDescription execStatus(String application) {
  File statusFile = File.createTempFile("status", ".json")
  try {
    slider(EXIT_SUCCESS,
        [
            ACTION_STATUS,
            application,
            ARG_OUTPUT, statusFile.absolutePath
        ])
    assert statusFile.exists()
    return ClusterDescription.fromFile(statusFile)
  } finally {
    statusFile.delete()
  }
}
/**
 * Query the status of an application for the number of containers
 * requested for a role.
 * @param application application name
 * @param role role name
 * @return the requested count, or 0 if there are no statistics for the role
 */
public int queryRequestedCount(String application, String role) {
  ClusterDescription cd = execStatus(application)
  if (cd.statistics.size() == 0) {
    log.debug("No statistics entries")
  }
  def statsForRole = cd.statistics[role]
  if (!statsForRole) {
    log.debug("No stats for role $role")
    return 0
  }
  // the entry must be present before it is returned
  assert null != statsForRole[StatusKeys.STATISTICS_CONTAINERS_REQUESTED]
  return statsForRole[StatusKeys.STATISTICS_CONTAINERS_REQUESTED]
}
/**
 * Outcome checker for the requested container count.
 * @param args argument map with "application", "role" and "limit" entries
 * @return the outcome built from whether the requested count has reached
 * the limit
 */
Outcome hasRequestedContainerCountReached(Map<String, String> args) {
  String role = args.get('role')
  int expectedCount = args.get('limit').toInteger()
  int requestedCount = queryRequestedCount(args.get('application'), role)
  log.debug("requested $role count = $requestedCount; expected=$expectedCount")
  boolean reached = requestedCount >= expectedCount
  return Outcome.fromBool(reached)
}
/**
 * Wait for the requested container count of a role to reach a limit;
 * on timeout the parsed status is logged and the test fails.
 * @param application application name
 * @param role role name
 * @param limit count to reach
 * @param container_launch_timeout time limit handed to the probe loop
 */
void expectContainerRequestedCountReached(String application, String role, int limit,
    int container_launch_timeout) {
  Map<String, String> probeArgs = [
      limit      : Integer.toString(limit),
      role       : role,
      application: application
  ]
  repeatUntilSuccess(
      this.&hasRequestedContainerCountReached,
      container_launch_timeout,
      PROBE_SLEEP_TIME,
      probeArgs,
      true,
      "countainer count not reached") {
    int requestedCount = queryRequestedCount(application, role)
    def message = "expected count of $role = $limit not reached: $requestedCount" +
        " after $container_launch_timeout mS"
    describe message
    ClusterDescription cd = execStatus(application)
    log.info("Parsed status \n$cd")
    fail(message)
  }
}
/**
 * Fetch the status of a cluster and assert on the live container count
 * of a component.
 * @param clustername cluster name
 * @param component component name
 * @param count expected count
 * @return the cluster description the assertion was made against
 */
public ClusterDescription assertContainersLive(String clustername,
    String component,
    int count) {
  ClusterDescription status = execStatus(clustername)
  assertContainersLive(status, component, count)
  return status
}
/**
 * Outcome checker for the live container count.
 * @param args argument map, must contain "application", "component" and "live"
 * @return the outcome built from whether the live count has reached the
 * expected value
 */
Outcome hasLiveContainerCountReached(Map<String, String> args) {
  assert args['application']
  assert args['component']
  assert args['live']
  String component = args.get('component')
  int expectedCount = args.get('live').toInteger()
  ClusterDescription status = execStatus(args.get('application'))
  def actual = extractLiveContainerCount(status, component)
  log.debug(
      "live $component count = $actual; expected=$expectedCount")
  return Outcome.fromBool(actual >= expectedCount)
}
/**
 * Wait for the live container count to be reached; on timeout the live
 * container assertion is run one final time.
 * @param application application name
 * @param component component name
 * @param expected expected count
 * @param container_launch_timeout launch timeout
 */
void expectLiveContainerCountReached(
    String application,
    String component,
    int expected,
    int container_launch_timeout) {
  Map<String, String> probeArgs = [
      live       : Integer.toString(expected),
      component  : component,
      application: application
  ]
  repeatUntilSuccess(
      this.&hasLiveContainerCountReached,
      container_launch_timeout,
      PROBE_SLEEP_TIME,
      probeArgs,
      true,
      "countainer count not reached") {
    describe "container count not reached"
    assertContainersLive(application, component, expected)
  }
}
}