blob: 2045f1134818c25898ca7a3316df899c45d57e47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.test
import groovy.json.JsonOutput
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.commons.httpclient.HttpClient
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager
import org.apache.commons.httpclient.methods.GetMethod
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem as HadoopFS
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.ApplicationReport
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.api.ClusterDescription
import org.apache.slider.api.ClusterNode
import org.apache.slider.api.RoleKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.common.params.Arguments
import org.apache.slider.common.tools.Duration
import org.apache.slider.common.tools.SliderUtils
import org.apache.slider.core.conf.AggregateConf
import org.apache.slider.core.exceptions.BadClusterStateException
import org.apache.slider.core.exceptions.SliderException
import org.apache.slider.core.exceptions.WaitTimeoutException
import org.apache.slider.core.main.ServiceLaunchException
import org.apache.slider.core.main.ServiceLauncher
import org.apache.slider.core.persist.JsonSerDeser
import org.apache.slider.core.registry.docstore.PublishedConfigSet
import org.apache.slider.core.registry.info.ServiceInstanceData
import org.apache.slider.server.services.curator.CuratorServiceInstance
import org.junit.Assert
import org.junit.Assume
import static Arguments.ARG_OPTION
/**
 * Static utils for tests in this package and in other test projects.
 *
 * It is designed to work with mini clusters as well as remote ones.
 *
 * This class is not final and may be extended for test cases.
 *
 * Some of these methods are derived from the SwiftUtils and SwiftTestUtils
 * classes, replicated here so that they are available in Hadoop-2.0 code.
 */
@Slf4j
@CompileStatic
class SliderTestUtils extends Assert {

  /**
   * Log a text string surrounded by banner lines, so that test phases
   * are easy to find in the log.
   * @param s text to log
   */
  public static void describe(String s) {
    log.info("");
    log.info("===============================");
    log.info(s);
    log.info("===============================");
    log.info("");
  }

  /**
   * Pretty-print a JSON string.
   * @param json JSON text
   * @return the reformatted JSON text
   */
  public static String prettyPrint(String json) {
    JsonOutput.prettyPrint(json)
  }

  /**
   * Skip the current test: the message is logged and a JUnit
   * assumption is failed.
   * @param message reason for skipping the test
   */
  public static void skip(String message) {
    log.warn("Skipping test: " + message)
    Assume.assumeTrue(message, false);
  }

  /**
   * Skip the current test unless a condition holds.
   * @param condition condition that must be true for the test to continue
   * @param message reason logged and reported if the test is skipped
   */
  public static void assume(boolean condition, String message) {
    if (!condition) {
      log.warn("Skipping test: " + message)
      Assume.assumeTrue(message, false);
    }
  }

  /**
   * Assert that two lists have the same size and that the elements at
   * each index are equal (Groovy {@code ==} semantics).
   * @param left first list
   * @param right second list
   */
  public static void assertListEquals(List left, List right) {
    assert left.size() == right.size();
    for (int i = 0; i < left.size(); i++) {
      // compare each position, not just the head of the lists
      assert left[i] == right[i]
    }
  }

  /**
   * Assume that a string option is set and not equal to "";
   * the test is skipped otherwise.
   * @param conf configuration
   * @param key key to look for
   */
  public static void assumeStringOptionSet(Configuration conf, String key) {
    if (!conf.getTrimmed(key)) {
      skip("Configuration key $key not set")
    }
  }

  /**
   * Assert that a string option is set and not equal to "".
   * @param conf configuration
   * @param key key to look for
   */
  public static void assertStringOptionSet(Configuration conf, String key) {
    getRequiredConfOption(conf, key)
  }

  /**
   * Assume that a boolean option is set and true.
   * Unset or false triggers a test skip.
   * @param conf configuration
   * @param key key to look for
   */
  public static void assumeBoolOptionTrue(Configuration conf, String key) {
    assumeBoolOption(conf, key, false)
  }

  /**
   * Assume that a boolean option is true.
   * False triggers a test skip.
   * @param conf configuration
   * @param key key to look for
   * @param defval default value if the property is not defined
   */
  public static void assumeBoolOption(
      Configuration conf, String key, boolean defval) {
    assume(conf.getBoolean(key, defval),
        "Configuration key $key is false")
  }

  /**
   * Get a required config option (trimmed, incidentally).
   * The test fails if it is not set.
   * @param conf configuration
   * @param key key
   * @return the trimmed string value
   */
  public static String getRequiredConfOption(Configuration conf, String key) {
    String val = conf.getTrimmed(key)
    if (!val) {
      fail("Missing configuration option $key")
    }
    return val;
  }

  /**
   * Fail a test because required behavior has not been implemented.
   */
  public static void failNotImplemented() {
    fail("Not implemented")
  }

  /**
   * Wait for the cluster to go live; fail if it does not do so within
   * the timeout.
   * @param sliderClient client
   * @param goLiveTime time to wait for the application to be running
   * @return the app report of the live cluster
   */
  public static ApplicationReport waitForClusterLive(SliderClient sliderClient, int goLiveTime) {
    ApplicationReport report = sliderClient.monitorAppToRunning(
        new Duration(goLiveTime));
    assertNotNull(
        "Cluster did not go live in the time $goLiveTime",
        report);
    return report;
  }

  /**
   * Convert a list of arguments to a string array via {@code toString()}.
   * Every element must be non-null.
   * @param args argument list
   * @return the converted array
   */
  protected static String[] toArray(List<Object> args) {
    String[] converted = new String[args.size()];
    for (int i = 0; i < args.size(); i++) {
      def elt = args.get(i)
      assert elt != null
      converted[i] = elt.toString();
    }
    return converted;
  }

  /**
   * Poll once a second until the deployed cluster is no longer live.
   * The test fails if the cluster is still live when the timeout expires.
   * @param client client
   * @param timeout timeout in milliseconds
   */
  public static void waitWhileClusterLive(SliderClient client, int timeout) {
    Duration duration = new Duration(timeout);
    duration.start()
    boolean live = probeClusterLive(client)
    while (live && !duration.limitExceeded) {
      sleep(1000);
      live = probeClusterLive(client)
    }
    // decide on the last observed state, not on the clock, so a cluster
    // that stopped just as the limit expired is not reported as a failure
    if (live) {
      fail("Cluster ${client.deployedClusterName} still live after $timeout ms")
    }
  }

  /**
   * Poll once a second until the deployed cluster is live.
   * The test fails if the cluster is still not live when the timeout expires.
   * @param client client
   * @param timeout timeout in milliseconds
   */
  public static void waitUntilClusterLive(SliderClient client, int timeout) {
    Duration duration = new Duration(timeout);
    duration.start()
    boolean live = probeClusterLive(client)
    while (!live && !duration.limitExceeded) {
      sleep(1000);
      live = probeClusterLive(client)
    }
    // decide on the last observed state, not on the clock
    if (!live) {
      fail("Cluster ${client.deployedClusterName} not live after $timeout ms")
    }
  }

  /**
   * Probe the deployed cluster through
   * {@code actionExists(deployedClusterName, true)}, applying Groovy truth
   * to the result.
   * @param client client
   * @return true if the probe result is Groovy-true
   */
  private static boolean probeClusterLive(SliderClient client) {
    if (client.actionExists(client.deployedClusterName, true)) {
      return true
    }
    return false
  }

  /**
   * Spin waiting for the Slider role count to match the expected value.
   * @param client client
   * @param role role to look for
   * @param desiredCount desired number of instances of the role
   * @param timeout timeout
   * @return the last cluster description retrieved
   */
  public static ClusterDescription waitForRoleCount(
      SliderClient client,
      String role,
      int desiredCount,
      int timeout) {
    return waitForRoleCount(client, [(role): desiredCount], timeout)
  }

  /**
   * Spin waiting for the Slider role counts to match the expected values.
   * The cluster description is polled once a second. The test fails if the
   * counts are not met before the timeout expires. A BadClusterStateException
   * is tolerated until then, and rethrown if it is raised after the timeout.
   * @param client client
   * @param roles map of role name to desired instance count
   * @param timeout timeout
   * @param operation operation name, used in log and failure messages
   * @return the last cluster description retrieved
   */
  public static ClusterDescription waitForRoleCount(
      SliderClient client,
      Map<String, Integer> roles,
      int timeout,
      String operation = "startup") {
    String clustername = client.deployedClusterName;
    ClusterDescription status = null
    Duration duration = new Duration(timeout);
    duration.start()
    boolean roleCountFound = false;
    while (!roleCountFound) {
      StringBuilder details = new StringBuilder()
      // sample the clock before the probe, so one final probe is always
      // made after the limit is reached
      boolean timedOut = duration.limitExceeded
      try {
        status = client.getClusterDescription(clustername)
        roleCountFound = true;
        for (Map.Entry<String, Integer> entry : roles.entrySet()) {
          String role = entry.key
          int desiredCount = entry.value
          List<String> instances = status.instances[role]
          int instanceCount = instances != null ? instances.size() : 0;
          if (instanceCount != desiredCount) {
            roleCountFound = false;
          }
          details.append("[$role]: desired: $desiredCount;" +
              " actual: $instanceCount ")
          // call out requested count, as this is a cause of problems on
          // overloaded functional test clusters. The role may not yet have
          // an entry, so navigate safely rather than raising an NPE.
          def requested = status.roles[role]?.get(RoleKeys.ROLE_REQUESTED_INSTANCES)
          if (requested != null && requested != "0") {
            details.append("requested: $requested ")
          }
        }
        if (roleCountFound) {
          //successful
          log.info("$operation: role count as desired: $details")
          break;
        }
      } catch (BadClusterStateException e) {
        // cluster not live yet; ignore or rethrow
        if (timedOut) {
          throw e;
        }
        details.append(e.toString());
      }
      if (timedOut) {
        duration.finish();
        describe("$operation: role count not met after $duration: $details")
        log.info(prettyPrint(status.toJsonString()))
        fail("$operation: role counts not met after $duration: " +
            details.toString() +
            " in \n$status ")
      }
      log.debug("Waiting: " + details)
      Thread.sleep(1000)
    }
    return status
  }

  /**
   * Wait for an instance of a role to be live (or past it in the lifecycle).
   * @param client client
   * @param spintime time to wait
   * @param role role whose instance is awaited
   * @return true if the state returned by the wait is
   * {@code ClusterDescription.STATE_LIVE}
   * @throws WaitTimeoutException if the wait timed out
   * @throws IOException
   * @throws SliderException
   */
  public static boolean spinForClusterStartup(SliderClient client, long spintime,
      String role)
      throws WaitTimeoutException, IOException, SliderException {
    int state = client.waitForRoleInstanceLive(role, spintime);
    return state == ClusterDescription.STATE_LIVE;
  }

  /**
   * Fetch the cluster description from the client and log it.
   * @param client client
   * @param text banner text to log first
   * @return the cluster description
   */
  public static ClusterDescription dumpClusterStatus(SliderClient client, String text) {
    ClusterDescription status = client.clusterDescription;
    dumpClusterDescription(text, status)
    return status;
  }

  /**
   * List the cluster nodes in a role.
   * @param client client
   * @param role role name
   * @return the nodes reported by the client
   */
  public static List<ClusterNode> listNodesInRole(SliderClient client, String role) {
    return client.listClusterNodesInRole(role)
  }

  /**
   * Log a banner and then the pretty-printed JSON of a cluster description.
   * @param text banner text
   * @param status cluster description
   */
  public static void dumpClusterDescription(String text, ClusterDescription status) {
    describe(text)
    log.info(prettyPrint(status.toJsonString()))
  }

  /**
   * Log a banner and then an aggregate configuration.
   * @param text banner text
   * @param status aggregate configuration
   */
  public static void dumpClusterDescription(String text, AggregateConf status) {
    describe(text)
    log.info(status.toString())
  }

  /**
   * Fetch the current site config from the Slider AM, from the
   * <code>clientProperties</code> field of the ClusterDescription.
   * @param client client
   * @return the site config
   */
  public static Configuration fetchClientSiteConfig(SliderClient client) {
    ClusterDescription status = client.clusterDescription;
    Configuration siteConf = new Configuration(false)
    status.clientProperties.each { String key, String val ->
      siteConf.set(key, val, "slider cluster");
    }
    return siteConf;
  }

  /**
   * Fetch a web page, failing on 4xx/5xx responses.
   * @param url URL
   * @return the response body
   */
  public static String GET(URL url) {
    return fetchWebPageWithoutError(url.toString())
  }

  /**
   * Fetch a path under a base URL, failing on 4xx/5xx responses.
   * @param url base URL
   * @param path path to append
   * @return the response body
   */
  public static String GET(URL url, String path) {
    return GET(url.toString(), path)
  }

  /**
   * Fetch a path under a base URL, failing on 4xx/5xx responses.
   * @param base base URL
   * @param path path to append
   * @return the response body
   */
  public static String GET(String base, String path) {
    String s = appendToURL(base, path)
    return GET(s)
  }

  /**
   * Fetch a web page, failing on 4xx/5xx responses.
   * @param s URL
   * @return the response body
   */
  public static String GET(String s) {
    return fetchWebPageWithoutError(s)
  }

  /**
   * Append a path to a base URL.
   * @param base base URL
   * @param path path
   * @return the combined URL
   */
  public static String appendToURL(String base, String path) {
    return SliderUtils.appendToURL(base, path)
  }

  /**
   * Append a sequence of paths to a base URL.
   * @param base base URL
   * @param paths paths
   * @return the combined URL
   */
  public static String appendToURL(String base, String... paths) {
    return SliderUtils.appendToURL(base, paths)
  }

  /**
   * Fetch a web page. A non-200 response is logged as a warning, but the
   * body is still returned.
   * @param url URL
   * @return the response body
   * @throws IOException on a failure to execute the request or read the body
   */
  public static String fetchWebPage(String url) {
    log.info("GET $url")
    MultiThreadedHttpConnectionManager connectionManager =
        new MultiThreadedHttpConnectionManager()
    HttpClient httpclient = new HttpClient(connectionManager);
    httpclient.httpConnectionManager.params.connectionTimeout = 10000;
    GetMethod get = new GetMethod(url);
    get.followRedirects = true;
    try {
      int resultCode = httpclient.executeMethod(get);
      if (resultCode != 200) {
        log.warn("Result code of $resultCode")
      }
      // the body must be read before the connection is released
      return get.responseBodyAsString;
    } catch (IOException e) {
      log.error("Failed on $url: $e", e)
      throw e;
    } finally {
      // return the connection and stop the manager created for this call
      get.releaseConnection()
      connectionManager.shutdown()
    }
  }

  /**
   * Fetch a web page, asserting that the response code is between 200 and 400.
   * Fails on 400 and 500 series response codes and lets 200 and 300 through.
   * @param url URL to fetch
   * @return the response body
   */
  public static String fetchWebPageWithoutError(String url) {
    assert null != url
    log.info("Fetching HTTP content at " + url);
    MultiThreadedHttpConnectionManager connectionManager =
        new MultiThreadedHttpConnectionManager()
    HttpClient client = new HttpClient(connectionManager);
    client.httpConnectionManager.params.connectionTimeout = 10000;
    GetMethod get = new GetMethod(url);
    get.followRedirects = true;
    int resultCode
    String body
    try {
      resultCode = client.executeMethod(get);
      // the body must be read before the connection is released
      body = get.responseBodyAsString
    } finally {
      get.releaseConnection()
      connectionManager.shutdown()
    }
    if (!(resultCode >= 200 && resultCode < 400)) {
      def message = "Request to $url failed with exit code $resultCode, body length ${body?.length()}:\n$body"
      log.error(message)
      fail(message)
    }
    return body;
  }

  /**
   * Assert that a service operation succeeded.
   * @param service service
   */
  public static void assertSucceeded(ServiceLauncher service) {
    assert 0 == service.serviceExitCode;
  }

  /**
   * Make an assertion about the exit code of an exception.
   * @param ex exception
   * @param exitCode exit code
   * @param text error text to look for in the exception
   */
  static void assertExceptionDetails(
      ServiceLaunchException ex,
      int exitCode,
      String text = "") {
    if (exitCode != ex.exitCode) {
      log.warn(
          "Wrong exit code, expected $exitCode but got $ex.exitCode in $ex",
          ex)
      assert exitCode == ex.exitCode
    }
    if (text) {
      if (!(ex.toString().contains(text))) {
        log.warn("String match failed in $ex", ex)
        assert ex.toString().contains(text);
      }
    }
  }

  /**
   * Launch the slider client with the specific args; no validation
   * of the return code takes place.
   * @param conf configuration
   * @param args arg list
   * @return the service launcher, from which the exit code can be read
   */
  protected static ServiceLauncher<SliderClient> execSliderCommand(
      Configuration conf,
      List args) {
    ServiceLauncher<SliderClient> serviceLauncher =
        new ServiceLauncher<SliderClient>(SliderClient.name);
    serviceLauncher.launchService(conf,
        toArray(args),
        false);
    return serviceLauncher
  }

  /**
   * Launch a service class with the given configuration and arguments.
   * @param serviceClass class of the service to launch
   * @param conf configuration
   * @param args arguments; each is converted with toString()
   * @return the service launcher
   */
  public static ServiceLauncher launch(Class serviceClass,
      Configuration conf,
      List<Object> args) throws
      Throwable {
    ServiceLauncher serviceLauncher =
        new ServiceLauncher(serviceClass.name);
    serviceLauncher.launchService(conf,
        toArray(args),
        false);
    return serviceLauncher;
  }

  /**
   * Launch a service, expecting the launch to throw.
   * @param serviceClass class of the service to launch
   * @param conf configuration
   * @param expectedText text the exception's toString() must contain;
   * if empty or null, any exception is accepted
   * @param args arguments
   * @return the exception raised by the launch
   * @throws AssertionError if the launch completed without an exception
   * @throws Throwable the raised exception, if it did not contain the expected text
   */
  public static Throwable launchExpectingException(Class serviceClass,
      Configuration conf,
      String expectedText,
      List args)
      throws Throwable {
    ServiceLauncher launched
    try {
      launched = launch(serviceClass, conf, args);
    } catch (Throwable thrown) {
      if (expectedText && !thrown.toString().contains(expectedText)) {
        //not the right exception -rethrow
        throw thrown;
      }
      return thrown;
    }
    // raised outside the try block so that this failure cannot be
    // mistaken for the expected exception
    throw new AssertionError("Expected an exception with text containing " + expectedText
        + " -but the service completed with exit code "
        + launched.serviceExitCode);
  }

  /**
   * Run a slider client command against a specific resource manager.
   * The manager argument is appended unless the caller already supplied one.
   * The caller's list is not modified.
   * @param address RM address
   * @param args command arguments
   * @param conf configuration
   * @return the service launcher
   */
  public static ServiceLauncher<SliderClient> launchClientAgainstRM(
      String address,
      List args,
      Configuration conf) {
    assert address != null
    log.info("Connecting to rm at ${address}")
    if (!args.contains(Arguments.ARG_MANAGER)) {
      args += [Arguments.ARG_MANAGER, address]
    }
    ServiceLauncher<SliderClient> launcher = execSliderCommand(conf, args)
    return launcher
  }

  /**
   * Add a configuration parameter as a cluster configuration option.
   * The test fails if the option is not set in the configuration.
   * @param extraArgs extra arguments, appended to in place
   * @param conf config
   * @param option option
   */
  public static void addClusterConfigOption(
      List<String> extraArgs,
      YarnConfiguration conf,
      String option) {
    extraArgs << ARG_OPTION << option << getRequiredConfOption(conf, option)
  }

  /**
   * Assert that a path refers to a directory.
   * @param fs filesystem
   * @param path path of the directory
   * @throws IOException on File IO problems
   */
  public static void assertIsDirectory(HadoopFS fs,
      Path path) throws IOException {
    FileStatus fileStatus = fs.getFileStatus(path);
    assertIsDirectory(fileStatus);
  }

  /**
   * Assert that a path refers to a directory.
   * @param fileStatus stats to check
   */
  public static void assertIsDirectory(FileStatus fileStatus) {
    assertTrue("Should be a dir -but isn't: " + fileStatus,
        fileStatus.isDirectory());
  }

  /**
   * Assert that a path exists, without making any assertion about the
   * type of that entry.
   *
   * @param fileSystem filesystem to examine
   * @param message message to include in the assertion failure message
   * @param path path in the filesystem
   * @throws IOException IO problems
   */
  public static void assertPathExists(
      HadoopFS fileSystem,
      String message,
      Path path) throws IOException {
    if (!fileSystem.exists(path)) {
      //failure, report it
      fail(message + ": not found \"" + path + "\" in " + path.getParent() + "-" +
          ls(fileSystem, path.getParent()));
    }
  }

  /**
   * Assert that a path does not exist.
   *
   * @param fileSystem filesystem to examine
   * @param message message to include in the assertion failure message
   * @param path path in the filesystem
   * @throws IOException IO problems
   */
  public static void assertPathDoesNotExist(
      HadoopFS fileSystem,
      String message,
      Path path) throws IOException {
    try {
      FileStatus status = fileSystem.getFileStatus(path);
      // a status back implies there is a file here
      fail(message + ": unexpectedly found " + path + " as " + status);
    } catch (FileNotFoundException expected) {
      //this is expected
    }
  }

  /**
   * Assert that a FileSystem.listStatus on a dir finds the subdir/child entry.
   * @param fs filesystem
   * @param dir directory to scan
   * @param subdir full path to look for
   * @throws IOException IO problems
   */
  public static void assertListStatusFinds(HadoopFS fs,
      Path dir,
      Path subdir) throws IOException {
    FileStatus[] stats = fs.listStatus(dir);
    boolean found = false;
    StringBuilder builder = new StringBuilder();
    for (FileStatus stat : stats) {
      builder.append(stat.toString()).append('\n');
      if (stat.getPath().equals(subdir)) {
        found = true;
      }
    }
    assertTrue("Path " + subdir
        + " not found in directory " + dir + ":" + builder,
        found);
  }

  /**
   * List a path to a string.
   * @param fileSystem filesystem
   * @param path directory
   * @return a listing of the filestatuses of elements in the directory, one
   * to a line, preceded by a line with the full path of the directory
   * @throws IOException connectivity problems
   */
  public static String ls(HadoopFS fileSystem, Path path)
      throws IOException {
    if (path == null) {
      //surfaces when someone calls getParent() on something at the top of the path
      return "/";
    }
    FileStatus[] stats;
    String pathtext = "ls " + path;
    try {
      stats = fileSystem.listStatus(path);
    } catch (FileNotFoundException e) {
      return pathtext + " -file not found";
    } catch (IOException e) {
      return pathtext + " -failed: " + e;
    }
    // start the entries on their own line, as documented
    return pathtext + "\n" + fileStatsToString(stats, "\n");
  }

  /**
   * Take an array of filestats and convert to a string, each entry
   * prefixed with a two-digit counter such as [01].
   * @param stats array of stats
   * @param separator separator after every entry
   * @return a stringified set
   */
  public static String fileStatsToString(FileStatus[] stats, String separator) {
    StringBuilder buf = new StringBuilder(stats.length * 128);
    for (int i = 0; i < stats.length; i++) {
      buf.append(String.format("[%02d] %s", i, stats[i])).append(separator);
    }
    return buf.toString();
  }

  /**
   * Wait up to 30000 ms for the cluster to stop being live.
   * @param sliderClient client
   */
  public static void waitWhileClusterLive(SliderClient sliderClient) {
    waitWhileClusterLive(sliderClient, 30000)
  }

  /**
   * Log every registry instance and its JSON-serialized payload.
   * @param instances registry instances
   */
  public static void dumpRegistryInstances(
      List<CuratorServiceInstance<ServiceInstanceData>> instances) {
    describe "service registry slider instances"
    JsonSerDeser<ServiceInstanceData> serDeser = new JsonSerDeser<>(
        ServiceInstanceData)
    instances.each { CuratorServiceInstance<ServiceInstanceData> svc ->
      ServiceInstanceData payload = svc.payload
      def json = serDeser.toJson(payload)
      log.info("service $svc payload=\n$json")
    }
    describe "end list service registry slider instances"
  }

  /**
   * Log a list of registry instance IDs.
   * @param instanceIds IDs
   */
  public static void dumpRegistryInstanceIDs(List<String> instanceIds) {
    describe "service registry instance IDs"
    dumpCollection(instanceIds)
  }

  /**
   * Log a collection of registry service types.
   * @param entries service types
   */
  public static void dumpRegistryServiceTypes(Collection<String> entries) {
    describe "service registry types"
    dumpCollection(entries)
  }

  /**
   * Log the size of a collection and then each entry on its own line.
   * @param entries entries to log
   */
  public static void dumpCollection(Collection<String> entries) {
    log.info("number of entries: ${entries.size()}")
    entries.each { String it -> log.info(it) }
  }

  /**
   * Get a time option. The value is read in seconds from the configuration.
   * A JVM system property of the same name, if it parses as an integer,
   * overrides the configuration value. The value is then converted to
   * milliseconds. If the resolved value is 0 (including unset), the default
   * is returned instead.
   * @param conf configuration to read
   * @param key option name, also used as the system property name
   * @param defValMillis default in milliseconds
   * @return the time in milliseconds
   */
  public static int getTimeOptionMillis(Configuration conf, String key, int defValMillis) {
    int val = conf.getInt(key, 0)
    val = Integer.getInteger(key, val)
    int time = 1000 * val
    if (time == 0) {
      time = defValMillis
    }
    return time;
  }

  /**
   * Log the key and description of every configuration in a published
   * configuration set.
   * @param confSet configuration set
   * @return the result of iterating over the keys
   */
  def dumpConfigurationSet(PublishedConfigSet confSet) {
    confSet.keys().each { String key ->
      def config = confSet.get(key)
      log.info "$key -- ${config.description}"
    }
  }
}