| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.slider.test |
| |
| import groovy.json.JsonOutput |
| import groovy.transform.CompileStatic |
| import groovy.util.logging.Slf4j |
| import org.apache.commons.httpclient.HttpClient |
| import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager |
| import org.apache.commons.httpclient.methods.GetMethod |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.fs.FileStatus |
| import org.apache.hadoop.fs.FileSystem as HadoopFS |
| import org.apache.hadoop.fs.Path |
| import org.apache.hadoop.yarn.api.records.ApplicationReport |
| import org.apache.hadoop.yarn.conf.YarnConfiguration |
| import org.apache.slider.api.ClusterDescription |
| import org.apache.slider.api.ClusterNode |
| import org.apache.slider.api.RoleKeys |
| import org.apache.slider.client.SliderClient |
| import org.apache.slider.common.params.Arguments |
| import org.apache.slider.common.tools.Duration |
| import org.apache.slider.common.tools.SliderUtils |
| import org.apache.slider.core.conf.AggregateConf |
| import org.apache.slider.core.exceptions.BadClusterStateException |
| import org.apache.slider.core.exceptions.SliderException |
| import org.apache.slider.core.exceptions.WaitTimeoutException |
| import org.apache.slider.core.main.ServiceLaunchException |
| import org.apache.slider.core.main.ServiceLauncher |
| import org.apache.slider.core.persist.JsonSerDeser |
| import org.apache.slider.core.registry.docstore.PublishedConfigSet |
| import org.apache.slider.core.registry.info.ServiceInstanceData |
| import org.apache.slider.server.services.curator.CuratorServiceInstance |
| import org.junit.Assert |
| import org.junit.Assume |
| |
| import static Arguments.ARG_OPTION |
| |
| /** |
| * Static utils for tests in this package and in other test projects. |
| * |
| * It is designed to work with mini clusters as well as remote ones |
| * |
| * This class is not final and may be extended for test cases. |
| * |
| * Some of these methods are derived from the SwiftUtils and SwiftTestUtils |
| * classes -replicated here so that they are available in Hadoop-2.0 code |
| */ |
| @Slf4j |
| @CompileStatic |
| class SliderTestUtils extends Assert { |
| |
| public static void describe(String s) { |
| log.info(""); |
| log.info("==============================="); |
| log.info(s); |
| log.info("==============================="); |
| log.info(""); |
| } |
| |
| public static String prettyPrint(String json) { |
| JsonOutput.prettyPrint(json) |
| } |
| |
| public static void skip(String message) { |
| log.warn("Skipping test: " + message) |
| Assume.assumeTrue(message, false); |
| } |
| |
| public static void assume(boolean condition, String message) { |
| if (!condition) { |
| log.warn("Skipping test: " + message) |
| Assume.assumeTrue(message, false); |
| } |
| } |
| |
| |
| public static void assertListEquals(List left, List right) { |
| assert left.size() == right.size(); |
| for (int i = 0; i < left.size(); i++) { |
| assert left[0] == right[0] |
| } |
| } |
| |
| /** |
| * Assume that a string option is set and not equal to "" |
| * @param conf configuration file |
| * @param key key to look for |
| */ |
| public static void assumeStringOptionSet(Configuration conf, String key) { |
| if (!conf.getTrimmed(key)) { |
| skip("Configuration key $key not set") |
| } |
| } |
| |
| |
| /** |
| * assert that a string option is set and not equal to "" |
| * @param conf configuration file |
| * @param key key to look for |
| */ |
| public static void assertStringOptionSet(Configuration conf, String key) { |
| getRequiredConfOption(conf, key) |
| } |
| |
| |
| |
| /** |
| * Assume that a boolean option is set and true. |
| * Unset or false triggers a test skip |
| * @param conf configuration file |
| * @param key key to look for |
| */ |
| public static void assumeBoolOptionTrue(Configuration conf, String key) { |
| assumeBoolOption(conf, key, false) |
| } |
| |
| /** |
| * Assume that a boolean option is true. |
| * False triggers a test skip |
| * @param conf configuration file |
| * @param key key to look for |
| * @param defval default value if the property is not defined |
| */ |
| public static void assumeBoolOption( |
| Configuration conf, String key, boolean defval) { |
| assume(conf.getBoolean(key, defval), |
| "Configuration key $key is false") |
| } |
| |
| /** |
| * Get a required config option (trimmed, incidentally). |
| * Test will fail if not set |
| * @param conf configuration |
| * @param key key |
| * @return the string |
| */ |
| public static String getRequiredConfOption(Configuration conf, String key) { |
| String val = conf.getTrimmed(key) |
| if (!val) { |
| fail("Missing configuration option $key") |
| } |
| return val; |
| } |
| |
| /** |
| * Fails a test because required behavior has not been implemented. |
| */ |
| public static void failNotImplemented() { |
| fail("Not implemented") |
| } |
| |
| /** |
| * Wait for the cluster live; fail if it isn't within the (standard) timeout |
| * @param sliderClient client |
| * @return the app report of the live cluster |
| */ |
| public static ApplicationReport waitForClusterLive(SliderClient sliderClient,int goLiveTime) { |
| ApplicationReport report = sliderClient.monitorAppToRunning( |
| new Duration(goLiveTime)); |
| assertNotNull( |
| "Cluster did not go live in the time $goLiveTime", |
| report); |
| return report; |
| } |
| |
| protected static String[] toArray(List<Object> args) { |
| String[] converted = new String[args.size()]; |
| for (int i = 0; i < args.size(); i++) { |
| def elt = args.get(i) |
| assert args.get(i) != null |
| converted[i] = elt.toString(); |
| } |
| return converted; |
| } |
| |
| public static void waitWhileClusterLive(SliderClient client, int timeout) { |
| Duration duration = new Duration(timeout); |
| duration.start() |
| while (client.actionExists(client.deployedClusterName, true) && |
| !duration.limitExceeded) { |
| sleep(1000); |
| } |
| if (duration.limitExceeded) { |
| fail("Cluster ${client.deployedClusterName} still live after $timeout ms") |
| } |
| } |
| |
| public static void waitUntilClusterLive(SliderClient client, int timeout) { |
| Duration duration = new Duration(timeout); |
| duration.start() |
| while (!client.actionExists(client.deployedClusterName, true) && |
| !duration.limitExceeded) { |
| sleep(1000); |
| } |
| if (duration.limitExceeded) { |
| fail("Cluster ${client.deployedClusterName} not live after $timeout ms") |
| } |
| } |
| |
| /** |
| * Spin waiting for the Slider role count to match expected |
| * @param client client |
| * @param role role to look for |
| * @param desiredCount RS count |
| * @param timeout timeout |
| */ |
| public static ClusterDescription waitForRoleCount( |
| SliderClient client, |
| String role, |
| int desiredCount, |
| int timeout) { |
| return waitForRoleCount(client, [(role): desiredCount], timeout) |
| } |
| |
| /** |
| * Spin waiting for the Slider role count to match expected |
| * @param client client |
| * @param roles map of roles to look for |
| * @param desiredCount RS count |
| * @param timeout timeout |
| */ |
| public static ClusterDescription waitForRoleCount( |
| SliderClient client, |
| Map<String, Integer> roles, |
| int timeout, |
| String operation = "startup") { |
| String clustername = client.deployedClusterName; |
| ClusterDescription status = null |
| Duration duration = new Duration(timeout); |
| duration.start() |
| boolean roleCountFound = false; |
| while (!roleCountFound) { |
| StringBuilder details = new StringBuilder() |
| |
| boolean timedOut = duration.limitExceeded |
| try { |
| status = client.getClusterDescription(clustername) |
| roleCountFound = true; |
| for (Map.Entry<String, Integer> entry : roles.entrySet()) { |
| String role = entry.key |
| int desiredCount = entry.value |
| List<String> instances = status.instances[role] |
| int instanceCount = instances != null ? instances.size() : 0; |
| if (instanceCount != desiredCount) { |
| roleCountFound = false; |
| } |
| details.append("[$role]: desired: $desiredCount;" + |
| " actual: $instanceCount ") |
| |
| // call out requested count, as this is a cause of problems on |
| // overloaded functional test clusters |
| def requested = status.roles[role][RoleKeys.ROLE_REQUESTED_INSTANCES] |
| if (requested != "0") { |
| details.append("requested: $requested ") |
| } |
| } |
| if (roleCountFound) { |
| //successful |
| log.info("$operation: role count as desired: $details") |
| break; |
| } |
| } catch (BadClusterStateException e) { |
| // cluster not live yet; ignore or rethrow |
| if (timedOut) { |
| throw e; |
| } |
| details.append(e.toString()); |
| } |
| if (timedOut) { |
| duration.finish(); |
| describe("$operation: role count not met after $duration: $details") |
| log.info(prettyPrint(status.toJsonString())) |
| fail("$operation: role counts not met after $duration: " + |
| details.toString() + |
| " in \n$status ") |
| } |
| log.debug("Waiting: " + details) |
| Thread.sleep(1000) |
| } |
| return status |
| } |
| |
| /** |
| * Wait for the hbase master to be live (or past it in the lifecycle) |
| * @param clustername cluster |
| * @param spintime time to wait |
| * @return true if the cluster came out of the sleep time live |
| * @throws IOException |
| * @throws SliderException |
| */ |
| public static boolean spinForClusterStartup(SliderClient client, long spintime, |
| String role) |
| throws WaitTimeoutException, IOException, SliderException { |
| int state = client.waitForRoleInstanceLive(role, spintime); |
| return state == ClusterDescription.STATE_LIVE; |
| } |
| |
| public static ClusterDescription dumpClusterStatus(SliderClient client, String text) { |
| ClusterDescription status = client.clusterDescription; |
| dumpClusterDescription(text, status) |
| return status; |
| } |
| |
| public static List<ClusterNode> listNodesInRole(SliderClient client, String role) { |
| return client.listClusterNodesInRole(role) |
| } |
| |
| public static void dumpClusterDescription(String text, ClusterDescription status) { |
| describe(text) |
| log.info(prettyPrint(status.toJsonString())) |
| } |
| |
| |
| public static void dumpClusterDescription(String text, AggregateConf status) { |
| describe(text) |
| log.info(status.toString()) |
| } |
| |
| |
| /** |
| * Fetch the current site config from the Slider AM, from the |
| * <code>clientProperties</code> field of the ClusterDescription |
| * @param client client |
| * @param clustername name of the cluster |
| * @return the site config |
| */ |
| public static Configuration fetchClientSiteConfig(SliderClient client) { |
| ClusterDescription status = client.clusterDescription; |
| Configuration siteConf = new Configuration(false) |
| status.clientProperties.each { String key, String val -> |
| siteConf.set(key, val, "slider cluster"); |
| } |
| return siteConf; |
| } |
| |
| /** |
| * Fetch a web page |
| * @param url URL |
| * @return the response body |
| */ |
| |
| public static String GET(URL url) { |
| return fetchWebPageWithoutError(url.toString()) |
| } |
| |
| public static String GET(URL url, String path) { |
| return GET(url.toString(), path) |
| } |
| |
| public static String GET(String base, String path) { |
| String s = appendToURL(base, path) |
| return GET(s) |
| |
| } |
| |
| def static String GET(String s) { |
| return fetchWebPageWithoutError(s) |
| } |
| |
| public static String appendToURL(String base, String path) { |
| return SliderUtils.appendToURL(base, path) |
| } |
| |
| public static String appendToURL(String base, String... paths) { |
| return SliderUtils.appendToURL(base, paths) |
| } |
| |
| /** |
| * Fetch a web page |
| * @param url URL |
| * @return the response body |
| */ |
| |
| public static String fetchWebPage(String url) { |
| log.info("GET $url") |
| def httpclient = new HttpClient(new MultiThreadedHttpConnectionManager()); |
| httpclient.httpConnectionManager.params.connectionTimeout = 10000; |
| GetMethod get = new GetMethod(url); |
| |
| get.followRedirects = true; |
| int resultCode |
| try { |
| resultCode = httpclient.executeMethod(get); |
| if (resultCode!=200) { |
| log.warn("Result code of $resultCode") |
| } |
| } catch (IOException e) { |
| log.error("Failed on $url: $e",e) |
| throw e; |
| } |
| String body = get.responseBodyAsString; |
| return body; |
| } |
| |
| /** |
| * Fetches a web page asserting that the response code is between 200 and 400. |
| * Will error on 400 and 500 series response codes and let 200 and 300 through. |
| * @param url |
| * @return |
| */ |
| public static String fetchWebPageWithoutError(String url) { |
| assert null != url |
| |
| log.info("Fetching HTTP content at " + url); |
| |
| def client = new HttpClient(new MultiThreadedHttpConnectionManager()); |
| client.httpConnectionManager.params.connectionTimeout = 10000; |
| GetMethod get = new GetMethod(url); |
| |
| get.followRedirects = true; |
| int resultCode = client.executeMethod(get); |
| |
| def body = get.responseBodyAsString |
| if (!(resultCode >= 200 && resultCode < 400)) { |
| def message = "Request to $url failed with exit code $resultCode, body length ${body?.length()}:\n$body" |
| log.error(message) |
| fail(message) |
| } |
| return body; |
| } |
| |
| /** |
| * Assert that a service operation succeeded |
| * @param service service |
| */ |
| public static void assertSucceeded(ServiceLauncher service) { |
| assert 0 == service.serviceExitCode; |
| } |
| |
| /** |
| * Make an assertion about the exit code of an exception |
| * @param ex exception |
| * @param exitCode exit code |
| * @param text error text to look for in the exception |
| */ |
| static void assertExceptionDetails( |
| ServiceLaunchException ex, |
| int exitCode, |
| String text = "") { |
| if (exitCode != ex.exitCode) { |
| log.warn( |
| "Wrong exit code, expected $exitCode but got $ex.exitCode in $ex", |
| ex) |
| assert exitCode == ex.exitCode |
| } |
| if (text) { |
| if (!(ex.toString().contains(text))) { |
| log.warn("String match failed in $ex", ex) |
| assert ex.toString().contains(text); |
| } |
| } |
| } |
| |
| /** |
| * Launch the slider client with the specific args; no validation |
| * of return code takes place |
| * @param conf configuration |
| * @param args arg list |
| * @return the return code |
| */ |
| protected static ServiceLauncher<SliderClient> execSliderCommand( |
| Configuration conf, |
| List args) { |
| ServiceLauncher<SliderClient> serviceLauncher = |
| new ServiceLauncher<SliderClient>(SliderClient.name); |
| serviceLauncher.launchService(conf, |
| toArray(args), |
| false); |
| return serviceLauncher |
| } |
| |
| public static ServiceLauncher launch(Class serviceClass, |
| Configuration conf, |
| List<Object> args) throws |
| Throwable { |
| ServiceLauncher serviceLauncher = |
| new ServiceLauncher(serviceClass.name); |
| serviceLauncher.launchService(conf, |
| toArray(args), |
| false); |
| return serviceLauncher; |
| } |
| |
| public static Throwable launchExpectingException(Class serviceClass, |
| Configuration conf, |
| String expectedText, |
| List args) |
| throws Throwable { |
| try { |
| ServiceLauncher launch = launch(serviceClass, conf, args); |
| throw new AssertionError("Expected an exception with text containing " + expectedText |
| + " -but the service completed with exit code " |
| + launch.serviceExitCode); |
| } catch (Throwable thrown) { |
| if (expectedText && !thrown.toString().contains(expectedText)) { |
| //not the right exception -rethrow |
| throw thrown; |
| } |
| return thrown; |
| } |
| } |
| |
| |
| public static ServiceLauncher<SliderClient> launchClientAgainstRM( |
| String address, |
| List args, |
| Configuration conf) { |
| assert address != null |
| log.info("Connecting to rm at ${address}") |
| if (!args.contains(Arguments.ARG_MANAGER)) { |
| args += [Arguments.ARG_MANAGER, address] |
| } |
| ServiceLauncher<SliderClient> launcher = execSliderCommand(conf, args) |
| return launcher |
| } |
| |
| /** |
| * Add a configuration parameter as a cluster configuration option |
| * @param extraArgs extra arguments |
| * @param conf config |
| * @param option option |
| */ |
| public static void addClusterConfigOption( |
| List<String> extraArgs, |
| YarnConfiguration conf, |
| String option) { |
| |
| conf.getTrimmed(option); |
| extraArgs << ARG_OPTION << option << getRequiredConfOption(conf, option) |
| |
| } |
| |
| /** |
| * Assert that a path refers to a directory |
| * @param fs filesystem |
| * @param path path of the directory |
| * @throws IOException on File IO problems |
| */ |
| public static void assertIsDirectory(HadoopFS fs, |
| Path path) throws IOException { |
| FileStatus fileStatus = fs.getFileStatus(path); |
| assertIsDirectory(fileStatus); |
| } |
| |
| /** |
| * Assert that a path refers to a directory |
| * @param fileStatus stats to check |
| */ |
| public static void assertIsDirectory(FileStatus fileStatus) { |
| assertTrue("Should be a dir -but isn't: " + fileStatus, |
| fileStatus.isDirectory()); |
| } |
| |
| /** |
| * Assert that a path exists -but make no assertions as to the |
| * type of that entry |
| * |
| * @param fileSystem filesystem to examine |
| * @param message message to include in the assertion failure message |
| * @param path path in the filesystem |
| * @throws IOException IO problems |
| */ |
| public static void assertPathExists( |
| HadoopFS fileSystem, |
| String message, |
| Path path) throws IOException { |
| if (!fileSystem.exists(path)) { |
| //failure, report it |
| fail(message + ": not found \"" + path + "\" in " + path.getParent() + "-" + |
| ls(fileSystem, path.getParent())); |
| } |
| } |
| |
| /** |
| * Assert that a path does not exist |
| * |
| * @param fileSystem filesystem to examine |
| * @param message message to include in the assertion failure message |
| * @param path path in the filesystem |
| * @throws IOException IO problems |
| */ |
| public static void assertPathDoesNotExist( |
| HadoopFS fileSystem, |
| String message, |
| Path path) throws IOException { |
| try { |
| FileStatus status = fileSystem.getFileStatus(path); |
| // a status back implies there is a file here |
| fail(message + ": unexpectedly found " + path + " as " + status); |
| } catch (FileNotFoundException expected) { |
| //this is expected |
| |
| } |
| } |
| |
| /** |
| * Assert that a FileSystem.listStatus on a dir finds the subdir/child entry |
| * @param fs filesystem |
| * @param dir directory to scan |
| * @param subdir full path to look for |
| * @throws IOException IO probles |
| */ |
| public static void assertListStatusFinds(HadoopFS fs, |
| Path dir, |
| Path subdir) throws IOException { |
| FileStatus[] stats = fs.listStatus(dir); |
| boolean found = false; |
| StringBuilder builder = new StringBuilder(); |
| for (FileStatus stat : stats) { |
| builder.append(stat.toString()).append('\n'); |
| if (stat.getPath().equals(subdir)) { |
| found = true; |
| } |
| } |
| assertTrue("Path " + subdir |
| + " not found in directory " + dir + ":" + builder, |
| found); |
| } |
| |
| /** |
| * List a a path to string |
| * @param fileSystem filesystem |
| * @param path directory |
| * @return a listing of the filestatuses of elements in the directory, one |
| * to a line, precedeed by the full path of the directory |
| * @throws IOException connectivity problems |
| */ |
| public static String ls(HadoopFS fileSystem, Path path) |
| throws |
| IOException { |
| if (path == null) { |
| //surfaces when someone calls getParent() on something at the top of the path |
| return "/"; |
| } |
| FileStatus[] stats; |
| String pathtext = "ls " + path; |
| try { |
| stats = fileSystem.listStatus(path); |
| } catch (FileNotFoundException e) { |
| return pathtext + " -file not found"; |
| } catch (IOException e) { |
| return pathtext + " -failed: " + e; |
| } |
| return pathtext + fileStatsToString(stats, "\n"); |
| } |
| |
| /** |
| * Take an array of filestats and convert to a string (prefixed w/ a [01] counter |
| * @param stats array of stats |
| * @param separator separator after every entry |
| * @return a stringified set |
| */ |
| public static String fileStatsToString(FileStatus[] stats, String separator) { |
| StringBuilder buf = new StringBuilder(stats.length * 128); |
| for (int i = 0; i < stats.length; i++) { |
| buf.append(String.format("[%02d] %s", i, stats[i])).append(separator); |
| } |
| return buf.toString(); |
| } |
| |
| public static void waitWhileClusterLive(SliderClient sliderClient) { |
| waitWhileClusterLive(sliderClient, 30000) |
| } |
| |
| public static void dumpRegistryInstances( |
| List<CuratorServiceInstance<ServiceInstanceData>> instances) { |
| describe "service registry slider instances" |
| JsonSerDeser<ServiceInstanceData> serDeser = new JsonSerDeser<>( |
| ServiceInstanceData) |
| |
| instances.each { CuratorServiceInstance<ServiceInstanceData> svc -> |
| ServiceInstanceData payload = svc.payload |
| def json = serDeser.toJson(payload) |
| log.info("service $svc payload=\n$json") |
| } |
| describe "end list service registry slider instances" |
| } |
| |
| public static void dumpRegistryInstanceIDs(List<String> instanceIds) { |
| describe "service registry instance IDs" |
| dumpCollection(instanceIds) |
| } |
| |
| public static void dumpRegistryServiceTypes(Collection<String> entries) { |
| describe "service registry types" |
| dumpCollection(entries) |
| } |
| |
| def static void dumpCollection(Collection<String> entries) { |
| log.info("number of entries: ${entries.size()}") |
| entries.each { String it -> log.info(it) } |
| } |
| |
| /** |
| * Get a time option in seconds if set, otherwise the default value (also in seconds). |
| * This operation picks up the time value as a system property if set -that |
| * value overrides anything in the test file |
| * @param conf |
| * @param key |
| * @param defVal |
| * @return |
| */ |
| public static int getTimeOptionMillis(Configuration conf, String key, int defValMillis) { |
| int val = conf.getInt(key, 0) |
| val = Integer.getInteger(key, val) |
| int time = 1000 * val |
| if (time == 0) { |
| time = defValMillis |
| } |
| return time; |
| } |
| |
| def dumpConfigurationSet(PublishedConfigSet confSet) { |
| confSet.keys().each { String key -> |
| def config = confSet.get(key) |
| log.info "$key -- ${config.description}" |
| } |
| } |
| } |