| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.slider.funtest.framework |
| |
| import groovy.transform.CompileStatic |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.fs.FileSystem as HadoopFS |
| import org.apache.hadoop.fs.Path |
| import org.apache.hadoop.hdfs.HdfsConfiguration |
| import org.apache.hadoop.registry.client.api.RegistryConstants |
| import org.apache.hadoop.util.ExitUtil |
| import org.apache.hadoop.util.Shell |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState |
| import org.apache.hadoop.yarn.conf.YarnConfiguration |
| import org.apache.slider.api.StatusKeys |
| import org.apache.slider.common.tools.ConfigHelper |
| import org.apache.slider.common.tools.Duration |
| import org.apache.slider.core.launch.SerializedApplicationReport |
| import org.apache.slider.core.main.ServiceLauncher |
| import org.apache.slider.common.SliderKeys |
| import org.apache.slider.common.SliderXmlConfKeys |
| import org.apache.slider.api.ClusterDescription |
| import org.apache.slider.common.tools.SliderUtils |
| import org.apache.slider.client.SliderClient |
| import org.apache.slider.core.persist.ApplicationReportSerDeser |
| import org.apache.slider.test.SliderTestUtils |
| import org.junit.Before |
| import org.junit.BeforeClass |
| import org.junit.Rule |
| import org.junit.rules.Timeout |
| import org.slf4j.Logger |
| import org.slf4j.LoggerFactory |
| import static org.apache.slider.common.SliderExitCodes.* |
| import static org.apache.slider.core.main.LauncherExitCodes.* |
| import static org.apache.slider.funtest.framework.FuntestProperties.* |
| import static org.apache.slider.common.params.Arguments.* |
| import static org.apache.slider.common.params.SliderActions.* |
| import static org.apache.slider.common.SliderXMLConfKeysForTesting.* |
| |
| @CompileStatic |
| abstract class CommandTestBase extends SliderTestUtils { |
/** Logger shared by the static helpers of this class. */
private static final Logger log = LoggerFactory.getLogger(CommandTestBase)

/** Value of the mandatory system property naming the slider conf dir. */
public static final String SLIDER_CONF_DIR = sysprop(SLIDER_CONF_DIR_PROP)

/** Value of the mandatory system property naming the slider bin dir. */
public static final String SLIDER_TAR_DIR = sysprop(SLIDER_BIN_DIR_PROP)

/** Canonical form of {@link #SLIDER_TAR_DIR}. */
public static final File SLIDER_TAR_DIRECTORY =
    new File(SLIDER_TAR_DIR).getCanonicalFile()

/** Launcher script under {@link #SLIDER_TAR_DIRECTORY}. */
public static final File SLIDER_SCRIPT =
    new File(SLIDER_TAR_DIRECTORY, BIN_SLIDER).getCanonicalFile()

/** Python launcher script under {@link #SLIDER_TAR_DIRECTORY}. */
public static final File SLIDER_SCRIPT_PYTHON =
    new File(SLIDER_TAR_DIRECTORY, BIN_SLIDER_PYTHON).getCanonicalFile()

/** Canonical form of {@link #SLIDER_CONF_DIR}. */
public static final File SLIDER_CONF_DIRECTORY =
    new File(SLIDER_CONF_DIR).getCanonicalFile()

/** Client configuration file inside {@link #SLIDER_CONF_DIRECTORY}. */
public static final File SLIDER_CONF_XML =
    new File(SLIDER_CONF_DIRECTORY, CLIENT_CONFIG_FILENAME).getCanonicalFile()

/** Configuration loaded from {@link #SLIDER_CONF_XML} by the static initializer. */
public static final YarnConfiguration SLIDER_CONFIG

/** Thaw wait time in milliseconds; set by the static initializer. */
public static final int THAW_WAIT_TIME

/** Freeze wait time in milliseconds; set by the static initializer. */
public static final int FREEZE_WAIT_TIME

/** Per-test timeout in milliseconds; set by the static initializer. */
public static final int SLIDER_TEST_TIMEOUT

/** YARN RAM request option read from the test configuration. */
public static final String YARN_RAM_REQUEST

/**
 * Keytab for secure cluster
 */
public static final String TEST_AM_KEYTAB
static File keytabFile

/**
 * shell-escaped ~ symbol. On windows this does
 * not need to be escaped
 */
public static final String TILDE

/** Container launch timeout; presumably milliseconds, as it is passed to probe timeouts. */
public static final int CONTAINER_LAUNCH_TIMEOUT = 90000

/** Sleep between probe attempts, as passed to {@code repeatUntilSuccess}. */
public static final int PROBE_SLEEP_TIME = 4000

/** Timeout used when probing for registry accessibility. */
public static final int REGISTRY_STARTUP_TIMEOUT = 60000

/** Failure text used when an application does not come up. */
public static final String E_LAUNCH_FAIL = 'Application did not start'
| |
/*
 * Static initializer for the test configuration constants.
 * Any exception raised here (and there may be some) stops the class
 * from being loaded, so no test derived from it can be instantiated.
 */
static {
  // created only for its side effects; presumably this registers the
  // HDFS default resources before the slider config is read -- TODO confirm
  new HdfsConfiguration()
  ConfigHelper.registerDeprecatedConfigItems()
  SLIDER_CONFIG = ConfLoader.loadSliderConf(SLIDER_CONF_XML, true)

  // defaults are declared in seconds and scaled up to milliseconds
  THAW_WAIT_TIME = getTimeOptionMillis(
      SLIDER_CONFIG,
      KEY_TEST_THAW_WAIT_TIME,
      DEFAULT_THAW_WAIT_TIME_SECONDS * 1000)
  FREEZE_WAIT_TIME = getTimeOptionMillis(
      SLIDER_CONFIG,
      KEY_TEST_FREEZE_WAIT_TIME,
      DEFAULT_TEST_FREEZE_WAIT_TIME_SECONDS * 1000)
  SLIDER_TEST_TIMEOUT = getTimeOptionMillis(
      SLIDER_CONFIG,
      KEY_TEST_TIMEOUT,
      DEFAULT_TEST_TIMEOUT_SECONDS * 1000)

  YARN_RAM_REQUEST =
      SLIDER_CONFIG.getTrimmed(KEY_TEST_YARN_RAM_REQUEST, DEFAULT_YARN_RAM_REQUEST)

  // no default: this is null unless the option is set
  TEST_AM_KEYTAB = SLIDER_CONFIG.getTrimmed(KEY_TEST_AM_KEYTAB)

  // the tilde only needs shell escaping off windows
  TILDE = Shell.WINDOWS ? "~" : "\\~"
}

/** JUnit rule applying {@link #SLIDER_TEST_TIMEOUT} to every test method. */
@Rule
public final Timeout testTimeout = new Timeout(SLIDER_TEST_TIMEOUT)
| |
| |
/**
 * One-time class setup: points {@link SliderShell} at the test
 * configuration directory and launcher script, propagates any configured
 * Hadoop configuration directory into the shell environment, and
 * initializes security if the freshly loaded client configuration
 * requires it.
 */
@BeforeClass
public static void setupTestBase() {
  Configuration conf = loadSliderConf()

  SliderShell.confDir = SLIDER_CONF_DIRECTORY

  // the python launcher is chosen on windows or when the launch key asks for it
  boolean usePython = SLIDER_CONFIG.getBoolean(KEY_LAUNCH_PYTHON, false)
  if (SliderShell.windows || usePython) {
    SliderShell.scriptFile = SLIDER_SCRIPT_PYTHON
  } else {
    SliderShell.scriptFile = SLIDER_SCRIPT
  }

  // record the configuration directory in the shared configuration
  SLIDER_CONFIG.set(ENV_SLIDER_CONF_DIR, SLIDER_CONF_DIRECTORY.absolutePath)

  // if a hadoop conf dir is declared, hand its canonical path to the client
  def hadoopConf = SLIDER_CONFIG.getTrimmed(ENV_HADOOP_CONF_DIR)
  if (hadoopConf) {
    File hadoopConfDir = new File(hadoopConf).canonicalFile
    SliderShell.setEnv(ENV_HADOOP_CONF_DIR, hadoopConfDir.absolutePath)
  }

  boolean secure = SliderUtils.maybeInitSecurity(conf)
  if (!secure) {
    log.info "Security is off"
  } else {
    log.debug("Security enabled")
    SliderUtils.forceLogin()
    // keytab validation is currently disabled:
    /*
    if (!TEST_AM_KEYTAB) {
      fail("Security keytab is not defined in $KEY_TEST_AM_KEYTAB")
    }
    keytabFile = new File(TEST_AM_KEYTAB)
    if (!keytabFile.exists()) {
      throw new FileNotFoundException("Security keytab ${keytabFile} " +
          " defined in $KEY_TEST_AM_KEYTAB")
    }
    */
  }

  def defaultFs = HadoopFS.getDefaultUri(SLIDER_CONFIG)
  def rmAddress = SLIDER_CONFIG.get(YarnConfiguration.RM_ADDRESS)
  log.info("Test using ${defaultFs} " +
      "and YARN RM @ ${rmAddress}")
}
| |
/**
 * Dump the captured output of a shell.
 * @param shell shell whose output is dumped
 */
public static void logShell(SliderShell shell) {
  shell.dumpOutput()
}
| |
/**
 * Rename the thread running the test to "JUnit" before each test.
 */
@Before
public void nameThread() {
  Thread.currentThread().setName("JUnit")
}
| |
/**
 * Merge a configuration file into {@link #SLIDER_CONFIG} if it exists.
 * @param dir directory containing the file
 * @param filename filename
 * @return true if the file was found and loaded; false if it is absent
 * @throws IOException loading problems (other than a missing file)
 */
public static boolean maybeAddConfFile(File dir, String filename) throws IOException {
  File confFile = new File(dir, filename)
  if (!confFile.isFile()) {
    log.debug("Did not find $confFile —skipping load")
    return false
  }
  ConfigHelper.addConfigurationFile(SLIDER_CONFIG, confFile, true)
  log.debug("Loaded $confFile")
  return true
}
| |
/**
 * Locate the JAR containing a class and append its absolute path to the
 * extra slider classpath, unless that path is already listed.
 * @param clazz class inside the JAR
 */
public static void addExtraJar(Class clazz) {
  def path = SliderUtils.findContainingJarOrFail(clazz).absolutePath
  if (SliderShell.slider_classpath_extra.contains(path)) {
    // already registered: nothing to do
    return
  }
  SliderShell.slider_classpath_extra << path
}
| |
/**
 * Look up a mandatory system property.
 * @param key property name
 * @return the value, never null or empty
 * @throws RuntimeException if the property is unset or empty
 */
public static String sysprop(String key) {
  String value = System.getProperty(key)
  if (value) {
    return value
  }
  throw new RuntimeException("Undefined property $key")
}
| |
/**
 * Write a line of text to standard output.
 * @param s text to print
 */
static void println(String s) {
  PrintStream stdout = System.out
  stdout.println(s)
}

/**
 * Write an empty line to standard output.
 */
static void println() {
  PrintStream stdout = System.out
  stdout.println()
}
| |
/**
 * Run a slider command through a new shell. The exit code is not
 * checked here; callers read it from the returned shell.
 * @param commands command line arguments
 * @return the shell, after execution
 */
public static SliderShell slider(Collection<String> commands) {
  final SliderShell launched = new SliderShell(commands)
  launched.execute()
  return launched
}
| |
/**
 * Run a slider command, declaring the exit code it is expected to
 * produce; the work is delegated to {@code SliderShell.run}.
 * @param exitCode expected exit code
 * @param commands command line arguments
 * @return the shell returned by {@code SliderShell.run}
 */
public static SliderShell slider(int exitCode, Collection<String> commands) {
  SliderShell completed = SliderShell.run(exitCode, commands)
  return completed
}
| |
/**
 * Load a fresh copy of the client XML configuration file.
 * @return the configuration read from {@link #SLIDER_CONF_XML}
 */
public static Configuration loadSliderConf() {
  return ConfLoader.loadSliderConf(SLIDER_CONF_XML, true)
}
| |
/**
 * Get the filesystem selected by {@link #SLIDER_CONFIG}.
 * @return the filesystem instance
 */
public static HadoopFS getClusterFS() {
  HadoopFS fs = HadoopFS.get(SLIDER_CONFIG)
  return fs
}
| |
/**
 * Run the destroy action; the exit code is not checked.
 * @param name cluster name
 * @return the executed shell
 */
static SliderShell destroy(String name) {
  List<String> command = [ACTION_DESTROY, name]
  return slider(command)
}

/**
 * Run the destroy action with an expected exit code.
 * @param result expected exit code
 * @param name cluster name
 * @return the executed shell
 */
static SliderShell destroy(int result, String name) {
  List<String> command = [ACTION_DESTROY, name]
  return slider(result, command)
}
| |
/**
 * Run the exists action; the exit code is not checked.
 * @param name cluster name
 * @param live if true (the default) the live flag is appended
 * @return the executed shell
 */
static SliderShell exists(String name, boolean live = true) {
  return slider(existsCommand(name, live))
}

/**
 * Run the exists action with an expected exit code.
 * @param result expected exit code
 * @param name cluster name
 * @param live if true (the default) the live flag is appended
 * @return the executed shell
 */
static SliderShell exists(int result, String name, boolean live = true) {
  return slider(result, existsCommand(name, live))
}

/** Build the argument list shared by both exists variants. */
private static List<String> existsCommand(String name, boolean live) {
  List<String> command = [ACTION_EXISTS, name]
  if (live) {
    command.add(ARG_LIVE)
  }
  return command
}
| |
/**
 * Run the freeze action; the exit code is not checked.
 * @param name cluster name
 * @return the executed shell
 */
static SliderShell freeze(String name) {
  List<String> command = [ACTION_FREEZE, name]
  return slider(command)
}

/**
 * Run the freeze action with extra arguments and an expected exit code.
 * @param exitCode expected exit code
 * @param name cluster name
 * @param args extra arguments appended after the name
 * @return the executed shell
 */
static SliderShell freeze(
    int exitCode,
    String name,
    Collection<String> args) {
  List<String> command = [ACTION_FREEZE, name] + args
  return slider(exitCode, command)
}

/**
 * Run the freeze action with extra arguments: no exit code checking.
 * @param name cluster name
 * @param args extra arguments appended after the name
 * @return the executed shell
 */
static SliderShell freeze(String name, Collection<String> args) {
  List<String> command = [ACTION_FREEZE, name] + args
  return slider(command)
}

/**
 * Freeze with the force flag and a wait argument of "10000";
 * the exit code is not checked.
 * @param name cluster name
 * @return the executed shell
 */
static SliderShell freezeForce(String name) {
  List<String> forceArgs = [ARG_FORCE, ARG_WAIT, "10000"]
  return freeze(name, forceArgs)
}
| |
/**
 * Run the kill-container action, expecting exit code 0.
 * @param name cluster name
 * @param containerID ID of the container to kill
 * @return the executed shell
 */
static SliderShell killContainer(String name, String containerID) {
  List<String> command = [ACTION_KILL_CONTAINER, name, containerID]
  return slider(0, command)
}
| |
/**
 * Run the list action; the exit code is not checked.
 * @param name cluster name to add to the command, or null to omit it
 * @return the executed shell
 */
static SliderShell list(String name) {
  List<String> cmd = (name != null) ? [ACTION_LIST, name] : [ACTION_LIST]
  return slider(cmd)
}
| |
/**
 * Run the lookup action with an expected exit code.
 * @param result expected exit code
 * @param id application ID; must be set
 * @param out file to pass as the output argument, or null for none
 * @return the executed shell
 */
static SliderShell lookup(int result, String id, File out) {
  return slider(result, lookupCommand(id, out))
}

/**
 * Run the lookup action; the exit code is not checked.
 * @param id application ID; must be set
 * @param out file to pass as the output argument, or null for none
 * @return the executed shell
 */
static SliderShell lookup(String id, File out) {
  return slider(lookupCommand(id, out))
}

/** Build the argument list shared by both lookup variants. */
private static List<String> lookupCommand(String id, File out) {
  assert id
  List<String> commands = [ACTION_LOOKUP, ARG_ID, id]
  if (out) {
    commands.add(ARG_OUTPUT)
    commands.add(out.absolutePath)
  }
  return commands
}
| |
/**
 * Run the list action with extra arguments and an expected exit code.
 * @param result expected exit code
 * @param commands extra arguments (default: none)
 * @return the executed shell
 */
static SliderShell list(int result, Collection<String> commands = []) {
  List<String> command = [ACTION_LIST] + commands
  return slider(result, command)
}
| |
/**
 * Run the status action; the exit code is not checked.
 * @param name cluster name
 * @return the executed shell
 */
static SliderShell status(String name) {
  List<String> command = [ACTION_STATUS, name]
  return slider(command)
}

/**
 * Run the status action with an expected exit code.
 * @param result expected exit code
 * @param name cluster name
 * @return the executed shell
 */
static SliderShell status(int result, String name) {
  List<String> command = [ACTION_STATUS, name]
  return slider(result, command)
}
| |
/**
 * Run the thaw action; the exit code is not checked.
 * @param name cluster name
 * @return the executed shell
 */
static SliderShell thaw(String name) {
  List<String> command = [ACTION_THAW, name]
  return slider(command)
}

/**
 * Run the thaw action with an expected exit code.
 * @param result expected exit code
 * @param name cluster name
 * @return the executed shell
 */
static SliderShell thaw(int result, String name) {
  List<String> command = [ACTION_THAW, name]
  return slider(result, command)
}

/**
 * Run the thaw action with extra arguments, expecting exit code 0.
 * @param name cluster name
 * @param args extra arguments appended after the name
 * @return the executed shell
 */
static SliderShell thaw(String name, Collection<String> args) {
  List<String> command = [ACTION_THAW, name] + args
  return slider(0, command)
}
| |
/**
 * Run the resolve action with an expected exit code.
 * @param result expected exit code
 * @param commands arguments following the action
 * @return the executed shell
 */
static SliderShell resolve(int result, Collection<String> commands) {
  List<String> command = [ACTION_RESOLVE] + commands
  return slider(result, command)
}

/**
 * Run the registry action with an expected exit code.
 * @param result expected exit code
 * @param commands arguments following the action
 * @return the executed shell
 */
static SliderShell registry(int result, Collection<String> commands) {
  List<String> command = [ACTION_REGISTRY] + commands
  return slider(result, command)
}

/**
 * Run the registry action, expecting exit code 0.
 * @param commands arguments following the action
 * @return the executed shell
 */
static SliderShell registry(Collection<String> commands) {
  List<String> command = [ACTION_REGISTRY] + commands
  return slider(0, command)
}
| |
/**
 * Force-freeze a cluster and then destroy it. The freeze may exit with
 * 0 or with {@code EXIT_UNKNOWN_INSTANCE}; any other exit code fails the
 * assertion. The destroy itself must exit with 0.
 * @param name cluster name
 */
static void ensureClusterDestroyed(String name) {
  def frozen = freezeForce(name)
  def code = frozen.ret

  boolean acceptable = (code == 0) || (code == EXIT_UNKNOWN_INSTANCE)
  if (!acceptable) {
    // raises the assertion with the shell's own diagnostics
    frozen.assertExitCode(0)
  }
  destroy(0, name)
}
| |
/**
 * Prepare for a test run by making sure no cluster of the given name
 * exists: it is force-frozen and destroyed.
 * @param cluster cluster name
 */
static void setupCluster(String cluster) {
  describe("setting up $cluster")
  ensureClusterDestroyed(cluster)
}

/**
 * Teardown operation: force-freezes the cluster without checking the
 * exit code. The cluster is not destroyed, so it is retained for
 * later inspection.
 * @param name cluster name
 */
static void teardown(String name) {
  freezeForce(name)
}
| |
/**
 * Assert that a shell exited with code 0.
 * @param shell shell
 */
public static void assertSuccess(SliderShell shell) {
  shell.assertExitCode(0)
}

/**
 * Assert that a shell exited with {@code EXIT_UNKNOWN_INSTANCE}.
 * @param shell shell
 */
public static void assertUnknownCluster(SliderShell shell) {
  shell.assertExitCode(EXIT_UNKNOWN_INSTANCE)
}

/**
 * Assert that a shell exited with a given code; the check itself is
 * delegated to the shell.
 * @param shell shell
 * @param errorCode expected exit code
 */
public static void assertExitCode(SliderShell shell, int errorCode) {
  shell.assertExitCode(errorCode)
}
| |
/**
 * Assert that the shell output contains the given string.
 * When it does not, the missing text is logged and the output dumped
 * before the assertion is raised.
 * @param shell shell to examine
 * @param lookThisUp text to look for
 * @param n number of times (default = 1). NOTE(review): this value is
 * never read by the implementation; only presence is checked.
 */
public static void assertOutputContains(
    SliderShell shell,
    String lookThisUp,
    int n = 1) {
  if (shell.outputContains(lookThisUp)) {
    return
  }
  log.error("Missing $lookThisUp from:")
  shell.dumpOutput()
  assert shell.outputContains(lookThisUp)
}
| |
/**
 * Create a client bonded to a cluster by launching the "exists" action
 * in-process against the resource manager named in the configuration.
 *
 * @param conf configuration, which must define the RM address
 * @param clustername cluster name
 * @return the client, with its deployed cluster name set
 * @throws ExitUtil.ExitException if the launched service exits non-zero
 */
SliderClient bondToCluster(Configuration conf, String clustername) {
  String rmAddress = getRequiredConfOption(conf, YarnConfiguration.RM_ADDRESS)
  List<String> existsArgs = ["exists", clustername]

  ServiceLauncher<SliderClient> launcher =
      launchClientAgainstRM(rmAddress, existsArgs, conf)

  int exitCode = launcher.serviceExitCode
  if (exitCode != 0) {
    throw new ExitUtil.ExitException(exitCode, "exit code = $exitCode")
  }
  SliderClient client = launcher.service
  client.deployedClusterName = clustername
  return client
}
| |
/**
 * Create or build a slider cluster (the action is set by the first verb).
 * The command is assembled in this order: action and name, the ZK hosts
 * option, the optional wait option, one component option per role, one
 * option per cluster option, then the extra arguments. The command must
 * exit with code 0.
 * @param action operation to invoke: ACTION_CREATE or ACTION_BUILD
 * @param clustername cluster name
 * @param roles map of rolename to count; may be null or empty
 * @param extraArgs list of extra args to add to the creation command;
 * may be null
 * @param blockUntilRunning if true, a wait option of
 * {@link #THAW_WAIT_TIME} is added
 * @param clusterOps map of key=value cluster options to set with the
 * --option arg; may be null or empty
 * @return shell which will have executed the command.
 */
public SliderShell createOrBuildSliderCluster(
    String action,
    String clustername,
    Map<String, Integer> roles,
    List<String> extraArgs,
    boolean blockUntilRunning,
    Map<String, String> clusterOps) {
  assert action != null
  assert clustername != null

  List<String> argsList = [action, clustername]

  argsList << ARG_ZKHOSTS <<
      SLIDER_CONFIG.getTrimmed(RegistryConstants.KEY_REGISTRY_ZK_QUORUM)

  if (blockUntilRunning) {
    argsList << ARG_WAIT << Integer.toString(THAW_WAIT_TIME)
  }

  // a null map is treated the same way as an empty one, matching the
  // null tolerance already applied to extraArgs
  roles?.each { String role, Integer val ->
    log.info("Role $role := $val")
    argsList << ARG_COMPONENT << role << Integer.toString(val)
  }

  // now inject any cluster options
  clusterOps?.each { String opt, String val ->
    argsList << ARG_OPTION << opt << val
  }

  if (extraArgs != null) {
    argsList.addAll(extraArgs)
  }
  slider(0, argsList)
}
| |
/**
 * Create a slider cluster: a shortcut for
 * {@link #createOrBuildSliderCluster} with the create action.
 * @param clustername cluster name
 * @param roles map of rolename to count
 * @param extraArgs list of extra args to add to the creation command
 * @param blockUntilRunning block until the AM is running
 * @param clusterOps map of key=value cluster options to set with the --option arg
 * @return shell which will have executed the command.
 */
public SliderShell createSliderApplication(
    String clustername,
    Map<String, Integer> roles,
    List<String> extraArgs,
    boolean blockUntilRunning,
    Map<String, String> clusterOps) {
  SliderShell created = createOrBuildSliderCluster(
      ACTION_CREATE, clustername, roles, extraArgs, blockUntilRunning, clusterOps)
  return created
}
| |
/**
 * Create a templated slider app, failing the test if the create
 * command exits with a non-zero code.
 * On failure the shell output is dumped and, if the launch report file
 * identifies an application, that application's report is looked up and
 * its diagnostics added to the failure text. Such a report may be absent
 * (e.g. after a configuration failure) or stale (from a previous run).
 * @param name name
 * @param appTemplate application template
 * @param resourceTemplate resource template
 * @param extraArgs extra arguments appended to the command; null or
 * empty for none
 * @param launchReportFile file the launch report is written to; if null a
 * new temporary filename is generated. Any existing file is deleted first.
 * @return the shell
 */
public SliderShell createTemplatedSliderApplication(
    String name,
    String appTemplate,
    String resourceTemplate,
    List<String> extraArgs = [],
    File launchReportFile = null) {

  if (!launchReportFile) {
    launchReportFile = createAppReportFile()
  }
  // delete any previous copy of the file
  launchReportFile.delete()

  List<String> commands = [
      ACTION_CREATE, name,
      ARG_TEMPLATE, appTemplate,
      ARG_RESOURCES, resourceTemplate,
      ARG_OUTPUT, launchReportFile.absolutePath,
      ARG_WAIT, Integer.toString(THAW_WAIT_TIME)
  ]

  // pass down each keytab-related setting that the test configuration defines
  List<String> amOptionKeys = [
      SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME,
      SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR,
      SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH,
      SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL
  ]
  for (String key : amOptionKeys) {
    maybeAddCommandOption(commands,
        [ARG_COMP_OPT, SliderKeys.COMPONENT_AM, key],
        SLIDER_CONFIG.getTrimmed(key))
  }

  // tolerate an explicit null as well as the default empty list
  if (extraArgs) {
    commands.addAll(extraArgs)
  }

  SliderShell shell = new SliderShell(commands)
  if (0 != shell.execute()) {
    // app has failed.
    log.error("Launch failed with exit code ${shell.ret}")
    shell.dumpOutput()

    // now grab that app report if it is there
    def appReport = maybeLookupFromLaunchReport(launchReportFile)
    String extraText = ""
    if (appReport) {
      log.error("Application report:\n$appReport")
      // missing diagnostics must not surface as the text "null"
      extraText = appReport.diagnostics ?: ""
    }

    fail("Application Launch Failure, exit code ${shell.ret}\n${extraText}")
  }
  return shell
}
| |
/**
 * Generate a unique report filename under the relative directory
 * "target". The temporary file is created to reserve a name and then
 * deleted, so the returned file does not exist.
 * @return the (non-existent) report file
 */
public static File createAppReportFile() {
  File targetDir = new File("target")
  File reportFile = File.createTempFile("launch", ".json", targetDir)
  reportFile.delete()
  return reportFile
}
| |
/**
 * Append a command and its option value to an argument list, but only
 * when the option value is set.
 * @param args arg list being built up; modified in place
 * @param commands command elements to add ahead of the option
 * @param option option value to probe and use
 * @return the (possibly extended) list passed in as {@code args}
 */
public static List<String> maybeAddCommandOption(
    List<String> args, List<String> commands, String option) {
  if (!SliderUtils.isSet(option)) {
    return args
  }
  args.addAll(commands)
  args.add(option)
  return args
}
| |
/**
 * Load an application report if the file exists and is non-empty.
 * @param reportFile file to read
 * @return the parsed report, or null if the file is missing or empty
 */
public static SerializedApplicationReport maybeLoadAppReport(File reportFile) {
  boolean hasContent = reportFile.exists() && reportFile.length() > 0
  if (!hasContent) {
    return null
  }
  return new ApplicationReportSerDeser().fromFile(reportFile)
}

/**
 * Load an application report, requiring the file to exist and be
 * non-empty.
 * @param reportFile file to read
 * @return the parsed report
 * @throws FileNotFoundException if the file is missing or empty
 */
public static SerializedApplicationReport loadAppReport(File reportFile) {
  boolean hasContent = reportFile.exists() && reportFile.length() > 0
  if (!hasContent) {
    throw new FileNotFoundException(reportFile.absolutePath)
  }
  return new ApplicationReportSerDeser().fromFile(reportFile)
}
| |
/**
 * Read the application ID from a launch report, if one can be loaded,
 * and look up the current report for that application.
 * @param launchReport launch report file
 * @return the looked-up report, or null if no launch report was loaded
 * or the lookup failed
 */
public static SerializedApplicationReport maybeLookupFromLaunchReport(File launchReport) {
  def launched = maybeLoadAppReport(launchReport)
  return launched ? lookupApplication(launched.applicationId) : null
}
| |
/**
 * Look up an application through the lookup command, which writes its
 * report to a temporary file. The file is always deleted afterwards.
 * @param id application ID
 * @return the application report; null if the command exited non-zero
 * or produced no report
 */
public static SerializedApplicationReport lookupApplication(String id) {
  File reportFile = createAppReportFile()
  try {
    def shell = lookup(id, reportFile)
    if (shell.ret != 0) {
      log.warn("Lookup operation failed with ${shell.ret}")
      shell.dumpOutput()
      return null
    }
    return maybeLoadAppReport(reportFile)
  } finally {
    reportFile.delete()
  }
}
| |
| |
/**
 * Build the path of a cluster's directory, relative to the home
 * directory of the cluster filesystem.
 * @param clustername cluster name
 * @return the path
 */
public Path buildClusterPath(String clustername) {
  Path home = getClusterFS().getHomeDirectory()
  String relative = "${SliderKeys.SLIDER_BASE_DIRECTORY}/cluster/${clustername}"
  return new Path(home, relative)
}
| |
| |
/**
 * Tell the AM to exit, pause, then wait until the application is
 * reported as up again by name.
 * @param sliderClient client used to fetch the final cluster description
 * @param cluster cluster name; must be set
 * @return the cluster description after the restart
 */
public ClusterDescription killAmAndWaitForRestart(
    SliderClient sliderClient, String cluster) {
  assert cluster
  requestAmSuicideAndPause(cluster)
  ensureApplicationIsUp(cluster)
  return sliderClient.clusterDescription
}

/**
 * Tell the AM to exit, pause, then wait until the YARN application with
 * the given ID is reported as running.
 * @param sliderClient client used to fetch the final cluster description
 * @param cluster cluster name; must be set
 * @param appId YARN application ID to probe
 * @return the cluster description after the restart
 */
public ClusterDescription killAmAndWaitForRestart(
    SliderClient sliderClient, String cluster, String appId) {
  assert cluster
  requestAmSuicideAndPause(cluster)
  ensureYarnApplicationIsUp(appId)
  return sliderClient.clusterDescription
}

/** Issue the AM suicide command (which must exit 0), then sleep 5000. */
private void requestAmSuicideAndPause(String cluster) {
  List<String> command = [
      ACTION_AM_SUICIDE, cluster,
      ARG_EXITCODE, "1",
      ARG_WAIT, "1000",
      ARG_MESSAGE, "suicide"
  ]
  slider(0, command)
  sleep(5000)
}
| |
/**
 * Probe the registry for an application until the registry command
 * succeeds or {@link #REGISTRY_STARTUP_TIMEOUT} is exceeded, in which
 * case diagnostics are produced and the test fails.
 * @param application application name
 */
protected void ensureRegistryCallSucceeds(String application) {
  repeatUntilSuccess(this.&isRegistryAccessible,
      REGISTRY_STARTUP_TIMEOUT,
      PROBE_SLEEP_TIME,
      [application: application],
      true,
      "Application registry is not accessible after $REGISTRY_STARTUP_TIMEOUT") {
    describe "Not able to access registry after $REGISTRY_STARTUP_TIMEOUT"
    exists(application, true).dumpOutput()
    // one last attempt that must exit with 0; the shell itself is not needed
    registry(0, [
        ARG_NAME,
        application,
        ARG_LISTEXP
    ])
  }
}
| |
| |
/**
 * Probe by name until the application is reported as running, or the
 * configured instance launch time is exceeded; on timeout the output of
 * a final exists command is dumped and the test fails.
 * @param application application name
 */
protected void ensureApplicationIsUp(String application) {
  // the option is declared in seconds; the probe loop works in milliseconds
  int launchSeconds = SLIDER_CONFIG.getInt(
      KEY_TEST_INSTANCE_LAUNCH_TIME,
      DEFAULT_INSTANCE_LAUNCH_TIME_SECONDS)
  int launchTimeout = launchSeconds * 1000

  Closure onFailure = {
    describe "final state of app that tests say is not up"
    exists(application, true).dumpOutput()
  }
  repeatUntilSuccess(this.&isApplicationRunning,
      launchTimeout,
      PROBE_SLEEP_TIME,
      [application: application],
      true,
      E_LAUNCH_FAIL,
      onFailure)
}
| |
/**
 * Probe: can the registry entries of an application be listed?
 * The registry command is run exactly once per probe; its exit code is
 * read back from the shell rather than by executing the shell again.
 * @param args argument map; must contain "application"
 * @return Success if the command exited with {@code EXIT_SUCCESS},
 * Retry otherwise (after dumping the shell output)
 */
protected Outcome isRegistryAccessible(Map<String, String> args) {
  String applicationName = args['application']
  // slider() has already executed the command by the time it returns
  SliderShell shell = slider(
      [
          ACTION_REGISTRY,
          ARG_NAME,
          applicationName,
          ARG_LISTEXP
      ])
  boolean accessible = (EXIT_SUCCESS == shell.ret)
  if (!accessible) {
    logShell(shell)
  }
  return Outcome.fromBool(accessible)
}
| |
/**
 * Probe: is the named application in the RUNNING state?
 * @param args argument map; must contain "application"
 * @return Success if it is, Retry if not
 */
protected Outcome isApplicationRunning(Map<String, String> args) {
  String applicationName = args.get('application')
  boolean up = isApplicationUp(applicationName)
  return Outcome.fromBool(up)
}

/**
 * Is the named application in the RUNNING state?
 * @param applicationName application name
 * @return true if it is
 */
protected boolean isApplicationUp(String applicationName) {
  return isApplicationInState(applicationName, YarnApplicationState.RUNNING)
}
| |
/**
 * Is an application in a desired YARN state? This runs the exists
 * action with the state argument and checks for exit code 0.
 * @param applicationName application name
 * @param yarnState state to ask for
 * @return true if the command exited with 0
 */
public static boolean isApplicationInState(
    String applicationName,
    YarnApplicationState yarnState) {
  List<String> probe = [ACTION_EXISTS, applicationName, ARG_STATE, yarnState.toString()]
  return slider(probe).ret == 0
}
| |
| |
/**
 * Probe: is the YARN application running?
 * @param args argument map; must contain "applicationId"
 * @return the outcome of {@link #isYarnApplicationRunning(String)}
 */
protected Outcome isYarnApplicationRunning(Map<String, String> args) {
  return isYarnApplicationRunning(args.get('applicationId'))
}

/**
 * Compare the current state of a YARN application with RUNNING.
 * The comparison is made on the declaration order of the
 * {@link YarnApplicationState} values.
 * @param applicationId YARN application ID
 * @return Success if the app is RUNNING; Fail if its state is declared
 * after RUNNING (it has gone past); Retry if it is declared before
 * (it may still be on its way)
 */
public static Outcome isYarnApplicationRunning(
    String applicationId) {
  YarnApplicationState current = lookupYarnAppState(applicationId)
  int position = current.compareTo(YarnApplicationState.RUNNING)
  if (position == 0) {
    return Outcome.Success
  }
  // a positive value means the app has passed beyond hope
  return position > 0 ? Outcome.Fail : Outcome.Retry
}
| |
/**
 * Look up the YARN state of an application. The lookup must return a
 * report: a null report fails the assertion.
 * @param applicationId YARN application ID
 * @return the state named in the application report
 */
public static YarnApplicationState lookupYarnAppState(String applicationId) {
  def sar = lookupApplication(applicationId)
  assert sar != null;
  return YarnApplicationState.valueOf(sar.state)
}
| |
/**
 * Assert that a YARN application is in the expected state.
 * A failed lookup is reported as an assertion failure rather than
 * surfacing as a NullPointerException.
 * @param applicationId YARN application ID
 * @param expectedState state the application report must name
 */
public static void assertInYarnState(String applicationId,
    YarnApplicationState expectedState) {
  def applicationReport = lookupApplication(applicationId)
  // lookupApplication returns null when the lookup command fails
  assert applicationReport != null
  assert expectedState.toString() == applicationReport.state
}
| |
/**
 * Wait for the YARN app named in a launch report to come up.
 * This will fail fast.
 * @param launchReportFile launch time file containing app id
 * @return the app ID
 */
protected String ensureYarnApplicationIsUp(File launchReportFile) {
  String appId = loadAppReport(launchReportFile).applicationId
  ensureYarnApplicationIsUp(appId)
  return appId
}
/**
 * Wait for the YARN app to come up. This will fail fast: the probe
 * stops as soon as the application is seen in a state past RUNNING.
 * The wait is bounded by the configured instance launch time, which is
 * declared in seconds and converted to the milliseconds the probe loop
 * expects.
 * @param applicationId YARN application ID
 */
protected void ensureYarnApplicationIsUp(String applicationId) {
  int launchTimeoutMillis = SLIDER_CONFIG.getInt(
      KEY_TEST_INSTANCE_LAUNCH_TIME,
      DEFAULT_INSTANCE_LAUNCH_TIME_SECONDS) * 1000
  repeatUntilSuccess(this.&isYarnApplicationRunning,
      launchTimeoutMillis,
      PROBE_SLEEP_TIME,
      [applicationId: applicationId],
      true,
      E_LAUNCH_FAIL) {
    describe "final state of app that tests say is not up"
    def sar = lookupApplication(applicationId)

    // include the last known application report in the failure text
    def message = E_LAUNCH_FAIL + "\n$sar"
    log.error(message)
    fail(message)
  }
}
| |
/**
 * Outcome for probes: one of the three shared instances
 * {@link #Success}, {@link #Retry} and {@link #Fail}.
 * The constructor is private, so no other instances are created by
 * outside code and the constants can be compared by identity.
 */
static class Outcome {

  /** Display name of the outcome. */
  public final String name;

  private Outcome(String name) {
    this.name = name
  }

  /** The probe succeeded. */
  static final Outcome Success = new Outcome("Success")
  /** The probe did not succeed yet, but may do so later. */
  static final Outcome Retry = new Outcome("Retry")
  /** The probe failed and retrying is pointless. */
  static final Outcome Fail = new Outcome("Fail")

  /**
   * build from a bool, where false is mapped to retry
   * @param b boolean
   * @return an outcome
   */
  static Outcome fromBool(boolean b) {
    return b ? Success : Retry;
  }

  /**
   * @return the outcome name, so that logged outcomes are readable
   */
  @Override
  String toString() {
    return name
  }

}
| |
/**
 * Repeat a probe until it succeeds, fails fast, or the time limit is
 * exceeded. If it did not succeed and {@code failIfUnsuccessful} is set,
 * the failure closure (if any) is invoked and then an exception is
 * raised with the supplied message.
 * The limit is only checked after a Retry outcome, so the probe is always
 * invoked at least once.
 * @param probe probe; called with {@code args} and must return an
 * {@link Outcome}
 * @param timeout time in millis before giving up
 * @param sleepDur sleep between failing attempts
 * @param args map of arguments to the probe
 * @param failIfUnsuccessful if the probe fails after all the attempts
 * —should it raise an exception
 * @param failureMessage message to include in exception raised
 * @param failureHandler closure to invoke prior to the failure being
 * raised; may be null
 */
protected void repeatUntilSuccess(Closure probe,
    int timeout, int sleepDur,
    Map args,
    boolean failIfUnsuccessful,
    String failureMessage,
    Closure failureHandler) {
  int attemptCount = 0
  boolean succeeded = false
  boolean completed = false
  Duration duration = new Duration(timeout)
  duration.start()
  while (!completed) {
    // count every probe invocation, including the one that succeeds
    attemptCount++
    Outcome outcome = (Outcome) probe(args)
    if (outcome.equals(Outcome.Success)) {
      // success
      log.debug("Success after $attemptCount attempt(s)")
      succeeded = true
      completed = true
    } else if (outcome.equals(Outcome.Retry)) {
      // failed but retry possible
      completed = duration.limitExceeded
      if (!completed) {
        sleep(sleepDur)
      }
    } else if (outcome.equals(Outcome.Fail)) {
      // fast fail
      completed = true
    }
  }

  if (failIfUnsuccessful && !succeeded) {
    if (failureHandler) {
      failureHandler()
    }
    fail(failureMessage)
  }
}
| |
/**
 * Fetch the "info.am.web.url" entry from the status of an application.
 * @param applicationName application name
 * @return the value of the info entry
 */
public String getInfoAmWebUrl(String applicationName) {
  return execStatus(applicationName).getInfo("info.am.web.url")
}
| |
/**
 * Run the status action, writing its output to a temporary file, and
 * parse that file into a cluster description. The command must exit with
 * {@code EXIT_SUCCESS}. The temporary file is always deleted.
 * @param application application name
 * @return the parsed cluster description
 */
public ClusterDescription execStatus(String application) {
  File statusFile = File.createTempFile("status", ".json")
  try {
    slider(EXIT_SUCCESS,
        [
            ACTION_STATUS,
            application,
            ARG_OUTPUT, statusFile.absolutePath
        ])

    assert statusFile.exists()
    return ClusterDescription.fromFile(statusFile)
  } finally {
    statusFile.delete()
  }
}
| |
/**
 * Read the number of containers requested for a role from the
 * statistics section of the application status.
 * @param application application name
 * @param role role name
 * @return the requested-containers statistic; 0 if the role has no
 * statistics entry
 */
public int queryRequestedCount(String application, String role) {
  def statistics = execStatus(application).statistics

  if (statistics.isEmpty()) {
    log.debug("No statistics entries")
  }

  def statsForRole = statistics[role]
  if (!statsForRole) {
    log.debug("No stats for role $role")
    return 0
  }

  def requested = statsForRole[StatusKeys.STATISTICS_CONTAINERS_REQUESTED]
  assert null != statsForRole[StatusKeys.STATISTICS_CONTAINERS_REQUESTED]
  return requested
}
| |
/**
 * Probe: has the requested container count of a role reached a limit?
 * @param args argument map; must contain "application", "role" and "limit"
 * @return Success if the requested count is at or above the limit,
 * Retry otherwise
 */
Outcome hasRequestedContainerCountReached(Map<String, String> args) {
  String application = args.get('application')
  String role = args.get('role')
  int expectedCount = args.get('limit').toInteger()

  int requestedCount = queryRequestedCount(application, role)
  log.debug("requested $role count = $requestedCount; expected=$expectedCount")
  boolean reached = requestedCount >= expectedCount
  return Outcome.fromBool(reached)
}
| |
/**
 * Wait for the requested container count of a role to reach a limit,
 * failing the test with the parsed status logged if it does not.
 * @param application application name
 * @param role role name
 * @param limit count to reach
 * @param container_launch_timeout time limit passed to the probe loop
 */
void expectContainerRequestedCountReached(String application, String role, int limit,
    int container_launch_timeout) {

  Map<String, String> probeArgs = [
      limit      : Integer.toString(limit),
      role       : role,
      application: application
  ]

  Closure onFailure = {
    int requestedCount = queryRequestedCount(application, role)

    def message = "expected count of $role = $limit not reached: $requestedCount" +
        " after $container_launch_timeout mS"
    describe message
    ClusterDescription cd = execStatus(application)
    log.info("Parsed status \n$cd")
    fail(message)
  }

  repeatUntilSuccess(
      this.&hasRequestedContainerCountReached,
      container_launch_timeout,
      PROBE_SLEEP_TIME,
      probeArgs,
      true,
      "countainer count not reached",
      onFailure)
}
| |
/**
 * Fetch the status of a cluster and run the live-container assertion
 * for a component against it.
 * @param clustername cluster name
 * @param component component name
 * @param count count handed to the assertion
 * @return the cluster description that was checked
 */
public ClusterDescription assertContainersLive(String clustername,
    String component,
    int count) {
  ClusterDescription status = execStatus(clustername)
  assertContainersLive(status, component, count)
  return status
}
| |
/**
 * Outcome checker for the live container count of a component.
 * @param args argument map, must contain "application", "component" and "live"
 * @return Success if the live count is at or above the expected value,
 * Retry otherwise
 */
Outcome hasLiveContainerCountReached(Map<String, String> args) {
  assert args['application']
  assert args['component']
  assert args['live']
  String application = args.get('application')
  String component = args.get('component')
  int expectedCount = args.get('live').toInteger()

  ClusterDescription status = execStatus(application)
  def actual = extractLiveContainerCount(status, component)
  log.debug(
      "live $component count = $actual; expected=$expectedCount")
  boolean reached = actual >= expectedCount
  return Outcome.fromBool(reached)
}
| |
/**
 * Wait for the live container count of a component to be reached.
 * If it is not, the live-container assertion is run one last time
 * before the test is failed.
 * @param application application name
 * @param component component name
 * @param expected expected count
 * @param container_launch_timeout launch timeout
 */
void expectLiveContainerCountReached(
    String application,
    String component,
    int expected,
    int container_launch_timeout) {

  Map<String, String> probeArgs = [
      live       : Integer.toString(expected),
      component  : component,
      application: application
  ]

  Closure onFailure = {
    describe "container count not reached"
    assertContainersLive(application, component, expected)
  }

  repeatUntilSuccess(
      this.&hasLiveContainerCountReached,
      container_launch_timeout,
      PROBE_SLEEP_TIME,
      probeArgs,
      true,
      "countainer count not reached",
      onFailure)
}
| } |