/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.test
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem as HadoopFS
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hadoop.service.ServiceOperations
import org.apache.hadoop.yarn.api.records.ApplicationReport
import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.apache.slider.api.ClusterNode
import org.apache.slider.client.SliderClient
import org.apache.slider.common.SliderExitCodes
import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.common.params.ActionFreezeArgs
import org.apache.slider.common.params.Arguments
import org.apache.slider.common.params.SliderActions
import org.apache.slider.common.tools.Duration
import org.apache.slider.common.tools.SliderFileSystem
import org.apache.slider.common.tools.SliderUtils
import org.apache.slider.core.exceptions.ErrorStrings
import org.apache.slider.core.exceptions.SliderException
import org.apache.slider.core.main.ServiceLauncher
import org.apache.slider.core.main.ServiceLauncherBaseTest
import org.apache.slider.server.appmaster.SliderAppMaster
import org.junit.After
import org.junit.BeforeClass
import org.junit.Rule
import org.junit.rules.Timeout
import static org.apache.slider.test.KeysForTests.*
import static org.apache.slider.common.SliderXMLConfKeysForTesting.*
/**
* Base class for mini cluster tests - creates a field for the
* mini YARN cluster, plus helpers for creating, monitoring and
* tearing down Slider clusters inside it.
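*
* A minimal usage sketch (illustrative only: "worker" is a hypothetical
* role name, and a real subclass must also supply the keys this class
* leaves unimplemented, such as the test configuration path):
* <pre>
* class MyClusterTest extends YarnMiniClusterTestBase {
*   void testClusterCreate() {
*     String name = createMiniCluster("", configuration, 1, 1, 1, true)
*     def launcher = createCluster(name, [worker: 1], [], true, true, [:])
*     addToTeardown(launcher)
*   }
* }
* </pre>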
*/
//@CompileStatic
@Slf4j
public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest {
/**
* Timeouts, in milliseconds, for mini cluster operations
*/
public static final int CLUSTER_GO_LIVE_TIME = 3 * 60 * 1000
public static final int CLUSTER_STOP_TIME = 1 * 60 * 1000
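// Signal codes, stored negated (as they appear on a kill command line).
// Note: 17 is SIGSTOP on BSD/OS X; on Linux x86 SIGSTOP is 19.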
public static final int SIGTERM = -15
public static final int SIGKILL = -9
public static final int SIGSTOP = -17
public static final String NO_ARCHIVE_DEFINED = "Archive configuration option not set: "
/**
* RAM for the YARN containers: {@value}
*/
public static final String YRAM = "256"
public static final String FIFO_SCHEDULER = "org.apache.hadoop.yarn.server" +
".resourcemanager.scheduler.fifo.FifoScheduler";
public static final YarnConfiguration SLIDER_CONFIG = SliderUtils.createConfiguration();
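// Tune the shared test configuration: a single AM restart limit, a high
// RM AM-attempt limit, NM memory checks disabled (so small test
// containers are not killed), AM dependency checks off, and a 1MB
// scheduler minimum allocation.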
static {
SLIDER_CONFIG.setInt(SliderXmlConfKeys.KEY_AM_RESTART_LIMIT, 1)
SLIDER_CONFIG.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 100)
SLIDER_CONFIG.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false)
SLIDER_CONFIG.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false)
SLIDER_CONFIG.setBoolean(SliderXmlConfKeys.KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED,
true)
SLIDER_CONFIG.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1)
}
public int thawWaitTime = DEFAULT_THAW_WAIT_TIME_SECONDS * 1000
public int freezeWaitTime = DEFAULT_TEST_FREEZE_WAIT_TIME_SECONDS * 1000
public int sliderTestTimeout = DEFAULT_TEST_TIMEOUT_SECONDS * 1000
public boolean teardownKillall = DEFAULT_TEARDOWN_KILLALL
public boolean accumuloTestsEnabled = true
public int accumuloLaunchWaitTime = DEFAULT_ACCUMULO_LAUNCH_TIME_SECONDS * 1000
public boolean hbaseTestsEnabled = true
public int hbaseLaunchWaitTime = DEFAULT_HBASE_LAUNCH_TIME_SECONDS * 1000
protected MiniDFSCluster hdfsCluster
protected MiniYARNCluster miniCluster
protected boolean switchToImageDeploy = false
protected boolean imageIsRemote = false
protected URI remoteImageURI
private int clusterCount = 1;
protected List<SliderClient> clustersToTeardown = [];
/**
* The test timeout rule; the limit can be overridden via the
* {@code KEY_TEST_TIMEOUT} option in the test configuration
*/
@Rule
public Timeout testTimeout = new Timeout(
getTimeOptionMillis(testConfiguration,
KEY_TEST_TIMEOUT,
DEFAULT_TEST_TIMEOUT_SECONDS * 1000)
)
/**
* Client side test: validate the system environment before launch
*/
@BeforeClass
public static void checkClientEnv() {
SliderUtils.validateSliderClientEnvironment(null)
}
protected String buildClustername(String clustername) {
return clustername ?: createClusterName()
}
/**
* Create the cluster name from the method name and an auto-incrementing
* counter.
* @return a cluster name
*/
protected String createClusterName() {
def base = methodName.getMethodName().toLowerCase(Locale.ENGLISH)
// append the counter to every cluster name after the first
if (clusterCount > 1) {
base += "-$clusterCount"
}
clusterCount++
return base
}
@Override
void setup() {
super.setup()
def testConf = testConfiguration;
thawWaitTime = getTimeOptionMillis(testConf,
KEY_TEST_THAW_WAIT_TIME,
thawWaitTime)
freezeWaitTime = getTimeOptionMillis(testConf,
KEY_TEST_FREEZE_WAIT_TIME,
freezeWaitTime)
sliderTestTimeout = getTimeOptionMillis(testConf,
KEY_TEST_TIMEOUT,
sliderTestTimeout)
teardownKillall =
testConf.getBoolean(KEY_TEST_TEARDOWN_KILLALL,
teardownKillall)
hbaseTestsEnabled =
testConf.getBoolean(KEY_TEST_HBASE_ENABLED, hbaseTestsEnabled)
hbaseLaunchWaitTime = getTimeOptionMillis(testConf,
KEY_TEST_HBASE_LAUNCH_TIME,
hbaseLaunchWaitTime)
accumuloTestsEnabled =
testConf.getBoolean(KEY_TEST_ACCUMULO_ENABLED, accumuloTestsEnabled)
accumuloLaunchWaitTime = getTimeOptionMillis(testConf,
KEY_ACCUMULO_LAUNCH_TIME,
accumuloLaunchWaitTime)
}
@After
public void teardown() {
describe("teardown")
stopRunningClusters();
stopMiniCluster();
}
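/**
* Register a slider client for teardown at the end of the test case
* @param client client bonded to the running cluster
*/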
protected void addToTeardown(SliderClient client) {
clustersToTeardown << client;
}
protected void addToTeardown(ServiceLauncher<SliderClient> launcher) {
SliderClient sliderClient = launcher?.service
if (sliderClient) {
addToTeardown(sliderClient)
}
}
protected YarnConfiguration getConfiguration() {
return SLIDER_CONFIG;
}
/**
* Stop any running clusters that have been added to the teardown list
*/
public void stopRunningClusters() {
clustersToTeardown.each { SliderClient client ->
try {
maybeStopCluster(client, "", "Teardown at end of test case", true);
} catch (Exception e) {
log.warn("While stopping cluster " + e, e);
}
}
}
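/**
* Stop the mini YARN cluster quietly, then shut down any mini HDFS
* cluster
*/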
public void stopMiniCluster() {
Log commonslog = LogFactory.getLog(this.class)
ServiceOperations.stopQuietly(commonslog, miniCluster)
hdfsCluster?.shutdown();
}
/**
* Create and start a minicluster
* @param name cluster/test name; if empty one is created from the junit method
* @param conf configuration to use
* @param noOfNodeManagers #of NMs
* @param numLocalDirs #of local dirs
* @param numLogDirs #of log dirs
* @param startHDFS create an HDFS mini cluster
* @return the (possibly generated) name of the cluster
*/
protected String createMiniCluster(String name,
YarnConfiguration conf,
int noOfNodeManagers,
int numLocalDirs,
int numLogDirs,
boolean startHDFS) {
assertNativeLibrariesPresent();
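// a FIFO scheduler and a small minimum allocation keep scheduling in
// the mini cluster simple and predictable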
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 64);
conf.set(YarnConfiguration.RM_SCHEDULER, FIFO_SCHEDULER);
SliderUtils.patchConfiguration(conf)
name = buildClustername(name)
miniCluster = new MiniYARNCluster(
name,
noOfNodeManagers,
numLocalDirs,
numLogDirs)
miniCluster.init(conf)
miniCluster.start();
if (startHDFS) {
createMiniHDFSCluster(name, conf)
}
return name
}
/**
* Create a mini HDFS cluster and save it to the hdfsCluster field
* @param name cluster name
* @param conf configuration to use
*/
public void createMiniHDFSCluster(String name, YarnConfiguration conf) {
hdfsCluster = buildMiniHDFSCluster(name, conf)
}
/**
* Inner work of building the mini DFS cluster
* @param name cluster name (used for the base directory under ./target/hdfs)
* @param conf configuration to use
* @return the newly built cluster
*/
public static MiniDFSCluster buildMiniHDFSCluster(
String name,
YarnConfiguration conf) {
assertNativeLibrariesPresent();
File baseDir = new File("./target/hdfs/$name").absoluteFile;
// recursively delete any data left over from previous runs
FileUtil.fullyDelete(baseDir)
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.absolutePath)
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
def cluster = builder.build()
return cluster
}
/**
* Launch the client with the specific args against the MiniMR cluster;
* the launcher is expected to have completed successfully
* @param conf configuration
* @param args arg list
* @return the launcher
* @throws SliderException if the exit code is nonzero
*/
protected ServiceLauncher<SliderClient> launchClientAgainstMiniMR(Configuration conf,
List args) {
ServiceLauncher<SliderClient> launcher = launchClientNoExitCodeCheck(conf, args)
int exited = launcher.serviceExitCode
if (exited != 0) {
throw new SliderException(exited, "Launch failed with exit code $exited")
}
return launcher;
}
/**
* Launch the client with the specific args against the MiniMR cluster
* without any checks for exit codes
* @param conf configuration
* @param args arg list
* @return the launcher
*/
public ServiceLauncher<SliderClient> launchClientNoExitCodeCheck(
Configuration conf,
List args) {
assert miniCluster != null
return launchClientAgainstRM(RMAddr, args, conf)
}
/**
* Kill any running Slider AM processes
* @param signal signal to send
* @return the exit code of the kill operation
*/
public int killAM(int signal) {
killJavaProcesses(SliderAppMaster.SERVICE_CLASSNAME_SHORT, signal)
}
/**
* List all java processes (via {@code jps -v}), logging the output
* @return the combined output and error text of the jps command
*/
public String lsJavaProcesses() {
Process jps = ["jps", "-v"].execute()
jps.waitFor()
String out = jps.in.text
log.info(out)
String err = jps.err.text
log.error(err)
return out + "\n" + err
}
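/**
* Get the test configuration: the shared slider configuration with
* {@code SLIDER_TEST_XML} added as a resource
* @return the configuration to use in tests
*/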
public YarnConfiguration getTestConfiguration() {
YarnConfiguration conf = getConfiguration()
conf.addResource(SLIDER_TEST_XML)
return conf;
}
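/**
* Get the RM address of the running mini cluster
* @return the host:port binding of the RM
*/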
protected String getRMAddr() {
assert miniCluster != null
String addr = miniCluster.config.get(YarnConfiguration.RM_ADDRESS)
assert addr != null;
assert addr != "";
return addr
}
/**
* Return the default filesystem: HDFS if the mini DFS cluster is up,
* file:// if not
* @return a filesystem URI string to pass down
*/
protected String getFsDefaultName() {
return buildFsDefaultName(hdfsCluster)
}
public static String buildFsDefaultName(MiniDFSCluster miniDFSCluster) {
if (miniDFSCluster) {
return "hdfs://localhost:${miniDFSCluster.nameNodePort}/"
} else {
return "file:///"
}
}
protected String getWaitTimeArg() {
return WAIT_TIME_ARG;
}
protected int getWaitTimeMillis(Configuration conf) {
return WAIT_TIME * 1000;
}
/**
* Create a cluster
* @param clustername cluster name
* @param roles map of rolename to count
* @param extraArgs list of extra args to add to the creation command
* @param deleteExistingData should the data of any existing cluster
* of this name be deleted
* @param blockUntilRunning block until the AM is running
* @param clusterOps map of key=value cluster options to set with the --option arg
* @return launcher which will have executed the command.
*/
public ServiceLauncher<SliderClient> createCluster(
String clustername,
Map<String, Integer> roles,
List<String> extraArgs,
boolean deleteExistingData,
boolean blockUntilRunning,
Map<String, String> clusterOps) {
createOrBuildCluster(
SliderActions.ACTION_CREATE,
clustername,
roles,
extraArgs,
deleteExistingData,
blockUntilRunning,
clusterOps)
}
/**
* Create or build a cluster (the operation is selected by the action argument)
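*
* Example (illustrative; "worker" is a hypothetical role name):
* <pre>
*   createOrBuildCluster(SliderActions.ACTION_BUILD, "cluster1",
*       [worker: 2], [], true, false, [:])
* </pre>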
* @param action operation to invoke: SliderActions.ACTION_CREATE or SliderActions.ACTION_BUILD
* @param clustername cluster name
* @param roles map of rolename to count
* @param extraArgs list of extra args to add to the creation command
* @param deleteExistingData should the data of any existing cluster
* of this name be deleted
* @param blockUntilRunning block until the AM is running
* @param clusterOps map of key=value cluster options to set with the --option arg
* @return launcher which will have executed the command.
*/
public ServiceLauncher<SliderClient> createOrBuildCluster(
String action,
String clustername,
Map<String, Integer> roles,
List<String> extraArgs,
boolean deleteExistingData,
boolean blockUntilRunning,
Map<String, String> clusterOps) {
assert clustername != null
assert miniCluster != null
// update action should keep existing data
def config = miniCluster.config
if (deleteExistingData && !SliderActions.ACTION_UPDATE.equals(action)) {
HadoopFS dfs = HadoopFS.get(new URI(fsDefaultName), config)
Path clusterDir = new SliderFileSystem(dfs, config).buildClusterDirPath(clustername)
log.info("deleting customer data at $clusterDir")
//this is a safety check to stop us doing something stupid like deleting /
assert clusterDir.toString().contains("/.slider/")
dfs.delete(clusterDir, true)
}
List<String> componentList = [];
roles.each { String role, Integer val ->
log.info("Component $role := $val")
componentList << Arguments.ARG_COMPONENT << role << Integer.toString(val)
}
List<String> argsList = [
action, clustername,
Arguments.ARG_MANAGER, RMAddr,
Arguments.ARG_FILESYSTEM, fsDefaultName,
Arguments.ARG_DEBUG,
Arguments.ARG_CONFDIR, confDir
]
if (blockUntilRunning) {
argsList << Arguments.ARG_WAIT << WAIT_TIME_ARG
}
argsList += extraCLIArgs
argsList += componentList;
argsList += imageCommands
//now inject any cluster options
clusterOps.each { String opt, String val ->
argsList << Arguments.ARG_OPTION << opt << val;
}
if (extraArgs != null) {
argsList += extraArgs;
}
ServiceLauncher<SliderClient> launcher = launchClientAgainstMiniMR(
//config includes RM binding info
new YarnConfiguration(config),
//varargs list of command line params
argsList
)
assert launcher.serviceExitCode == 0
SliderClient client = launcher.service
if (blockUntilRunning) {
client.monitorAppToRunning(new Duration(CLUSTER_GO_LIVE_TIME))
}
return launcher;
}
/**
* Add arguments to launch Slider with.
*
* Extra arguments are added after standard arguments and before roles.
*
* @return additional arguments to launch Slider with
*/
protected List<String> getExtraCLIArgs() {
[]
}
public String getConfDir() {
return resourceConfDirURI
}
/**
* Get the configuration key for the application home path;
* subclasses must override this
* @return the key string
*/
public String getApplicationHomeKey() {
failNotImplemented()
null
}
/**
* Get the archive path - which defaults to the local one
* @return the archive path, or null/empty if undefined
*/
public String getArchivePath() {
return localArchive
}
/**
* Get the local archive - the one defined in the test configuration
* @return a possibly null/empty string
*/
public final String getLocalArchive() {
return testConfiguration.getTrimmed(archiveKey)
}
/**
* Get the test configuration key for the archive path;
* subclasses must override this
* @return the key string
*/
public String getArchiveKey() {
failNotImplemented()
null
}
/**
* Merge a k-v pair into a simple k=v string; simple utility
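* e.g. {@code define("fs.defaultFS", "file:///")} returns {@code fs.defaultFS=file:///}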
* @param key key
* @param val value
* @return the string to use after a -D option
*/
public String define(String key, String val) {
return "$key=$val"
}
public void assumeTestEnabled(boolean flag) {
assume(flag, "test disabled")
}
public void assumeArchiveDefined() {
String archive = archivePath
boolean defined = archive != null && archive != ""
if (!defined) {
log.warn(NO_ARCHIVE_DEFINED + archiveKey);
}
assume(defined,NO_ARCHIVE_DEFINED + archiveKey)
}
/**
* Assume that application home is defined. This does not check that the
* path is valid; tests that require a valid application home are
* expected to fail if it is not.
*/
public void assumeApplicationHome() {
assume(applicationHome != null && applicationHome != "",
"Application home dir option not set " + applicationHomeKey)
}
public String getApplicationHome() {
return testConfiguration.getTrimmed(applicationHomeKey)
}
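/**
* Build the CLI arguments selecting the deployment image: a local
* archive, a remote image URI, or the application home directory
* @return the argument list to add to the launch command
*/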
public List<String> getImageCommands() {
if (switchToImageDeploy) {
// it's an image, which had better be defined
assert archivePath
if (!imageIsRemote) {
// it's not remote, so assert it exists
File f = new File(archivePath)
assert f.exists()
return [Arguments.ARG_IMAGE, f.toURI().toString()]
} else {
assert remoteImageURI
// if it is remote, then it's whatever the remoteImageURI field refers to
return [Arguments.ARG_IMAGE, remoteImageURI.toString()];
}
} else {
assert applicationHome
assert new File(applicationHome).exists();
return [Arguments.ARG_APP_HOME, applicationHome]
}
}
/**
* Start a cluster that has already been defined
* @param clustername cluster name
* @param extraArgs list of extra args to add to the creation command
* @param blockUntilRunning block until the AM is running
* @return launcher which will have executed the command.
*/
public ServiceLauncher<SliderClient> thawCluster(String clustername, List<String> extraArgs, boolean blockUntilRunning) {
assert clustername != null
assert miniCluster != null
List<String> argsList = [
SliderActions.ACTION_THAW, clustername,
Arguments.ARG_MANAGER, RMAddr,
Arguments.ARG_WAIT, WAIT_TIME_ARG,
Arguments.ARG_FILESYSTEM, fsDefaultName,
]
argsList += extraCLIArgs
if (extraArgs != null) {
argsList += extraArgs;
}
ServiceLauncher<SliderClient> launcher = launchClientAgainstMiniMR(
//config includes RM binding info
new YarnConfiguration(miniCluster.config),
//varargs list of command line params
argsList
)
assert launcher.serviceExitCode == 0
SliderClient client = (SliderClient) launcher.service
if (blockUntilRunning) {
client.monitorAppToRunning(new Duration(CLUSTER_GO_LIVE_TIME))
}
return launcher;
}
/**
* Get the resource configuration dir in the source tree
* @return the directory; fails if it does not exist
*/
public File getResourceConfDir() {
File f = new File(testConfigurationPath).absoluteFile;
if (!f.exists()) {
throw new FileNotFoundException("Resource configuration directory $f not found")
}
return f;
}
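/**
* Get the path of the test configuration directory in the source tree;
* subclasses must override this
* @return the path
*/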
public String getTestConfigurationPath() {
failNotImplemented()
null;
}
/**
* Get a URI string to the resource conf dir that is suitable for passing
* down to the AM - and works even when the default FS is HDFS
*/
public String getResourceConfDirURI() {
return resourceConfDir.absoluteFile.toURI().toString()
}
/**
* Log an application report
* @param report
*/
public void logReport(ApplicationReport report) {
log.info(SliderUtils.reportToString(report));
}
/**
* Log a list of application reports
* @param apps
*/
public void logApplications(List<ApplicationReport> apps) {
apps.each { ApplicationReport r -> logReport(r) }
}
/**
* Wait for the cluster to go live; fail if it doesn't within the
* (standard) timeout
* @param client client
* @return the app report of the live cluster
*/
public ApplicationReport waitForClusterLive(SliderClient client) {
return waitForClusterLive(client, CLUSTER_GO_LIVE_TIME)
}
/**
* Wait for the application to finish; force kill it if it hasn't
* shut down cleanly within the wait time
* @param client client to talk to
* @return the final application report
*/
public ApplicationReport waitForAppToFinish(SliderClient client) {
int waitTime = getWaitTimeMillis(client.config)
return waitForAppToFinish(client, waitTime)
}
/**
* Wait for the application to finish; force kill it if it hasn't
* shut down cleanly within the wait time
* @param client client to talk to
* @param waitTime time in milliseconds to wait
* @return the final application report
*/
public static ApplicationReport waitForAppToFinish(
SliderClient client,
int waitTime) {
ApplicationReport report = client.monitorAppToState(
YarnApplicationState.FINISHED,
new Duration(waitTime));
if (report == null) {
log.info("Forcibly killing application")
dumpClusterStatus(client, "final application status")
//list all the nodes' details
List<ClusterNode> nodes = listNodesInRole(client, "")
if (nodes.empty) {
log.info("No live nodes")
}
nodes.each { ClusterNode node -> log.info(node.toString())}
client.forceKillApplication("timed out waiting for application to complete");
report = client.applicationReport
}
return report;
}
/**
* Stop the cluster via the stop action, waiting up to
* {@link #CLUSTER_STOP_TIME} for the cluster to stop; if it doesn't,
* the nonzero exit code is logged as a warning
* @param sliderClient client
* @param clustername cluster
* @param message message to include in the stop request
* @param force force the shutdown
* @return the exit code
*/
public int clusterActionFreeze(SliderClient sliderClient, String clustername,
String message = "action stop",
boolean force = false) {
log.info("Stopping cluster $clustername: $message")
ActionFreezeArgs freezeArgs = new ActionFreezeArgs();
freezeArgs.waittime = CLUSTER_STOP_TIME
freezeArgs.message = message
freezeArgs.force = force
int exitCode = sliderClient.actionFreeze(clustername,
freezeArgs);
if (exitCode != 0) {
log.warn("Cluster stop failed with error code $exitCode")
}
return exitCode
}
/**
* Teardown-time cluster termination; will stop the cluster iff the client
* is not null
* @param sliderClient client
* @param clustername name of cluster to teardown
* @param message message to include in the stop request
* @param force force the shutdown
* @return the exit code of the stop operation; 0 if no stop was attempted
*/
public int maybeStopCluster(
SliderClient sliderClient,
String clustername,
String message,
boolean force = false) {
if (sliderClient != null) {
if (!clustername) {
clustername = sliderClient.deployedClusterName;
}
//only stop a cluster that exists
if (clustername) {
return clusterActionFreeze(sliderClient, clustername, message, force);
}
}
return 0;
}
String roleMapToString(Map<String,Integer> roles) {
StringBuilder builder = new StringBuilder()
roles.each { String name, int value ->
builder.append("$name->$value ")
}
return builder.toString()
}
/**
* Turn on test runs against a copy of the archive that is
* uploaded to HDFS - this method copies up the
* archive then switches the tests into archive mode
*/
public void enableTestRunAgainstUploadedArchive() {
Path remotePath = copyLocalArchiveToHDFS(localArchive)
// image mode
switchToRemoteImageDeploy(remotePath);
}
/**
* Switch to deploying a remote image
* @param remotePath the remote path to use
*/
public void switchToRemoteImageDeploy(Path remotePath) {
switchToImageDeploy = true
imageIsRemote = true
remoteImageURI = remotePath.toUri()
}
/**
* Copy a local archive to HDFS
* @param localArchive local archive
* @return the path of the uploaded image
*/
public Path copyLocalArchiveToHDFS(String localArchive) {
assert localArchive
File localArchiveFile = new File(localArchive)
assert localArchiveFile.exists()
assert hdfsCluster
Path remoteUnresolvedArchive = new Path(localArchiveFile.name)
assert FileUtil.copy(
localArchiveFile,
hdfsCluster.fileSystem,
remoteUnresolvedArchive,
false,
testConfiguration)
Path remotePath = hdfsCluster.fileSystem.resolvePath(
remoteUnresolvedArchive)
return remotePath
}
/**
* Assert that an operation failed because a cluster is in use
* @param e exception
*/
public static void assertFailureClusterInUse(SliderException e) {
assertExceptionDetails(e,
SliderExitCodes.EXIT_APPLICATION_IN_USE,
ErrorStrings.E_CLUSTER_RUNNING)
}
/**
* Create a SliderFileSystem instance bound to the running FS.
* The YARN cluster must be up and running already
* @return a new SliderFileSystem
*/
public SliderFileSystem createSliderFileSystem() {
HadoopFS dfs = HadoopFS.get(new URI(fsDefaultName), configuration)
SliderFileSystem hfs = new SliderFileSystem(dfs, configuration)
return hfs
}
}