blob: b525ea62d55dabd5bf4bd948907cd0417098e421 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.agent.standalone
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.api.records.ApplicationReport
import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.slider.agent.AgentMiniClusterTestBase
import org.apache.slider.api.ClusterNode
import org.apache.slider.client.SliderClient
import org.apache.slider.common.SliderKeys
import org.apache.slider.common.params.ActionRegistryArgs
import org.apache.slider.core.build.InstanceBuilder
import org.apache.slider.core.conf.AggregateConf
import org.apache.slider.core.exceptions.SliderException
import org.apache.slider.core.launch.LaunchedApplication
import org.apache.slider.core.main.LauncherExitCodes
import org.apache.slider.core.main.ServiceLauncher
import org.apache.slider.core.persist.LockAcquireFailedException
import org.junit.After
import org.junit.Test
@CompileStatic
@Slf4j
class TestStandaloneAgentAM extends AgentMiniClusterTestBase {
/**
 * Resets {@code sliderClientClassName} to {@code DEFAULT_SLIDER_CLIENT} after each test,
 * undoing the switch to {@link ExtendedSliderClient} made by the test method.
 *
 * Declared {@code void} rather than {@code def}: a {@code def} method compiles to an
 * {@code Object} return type, and JUnit 4 only accepts public void no-arg methods
 * for {@code @After}.
 */
@After
void clientname() {
  sliderClientClassName = DEFAULT_SLIDER_CLIENT
}
/**
 * Brings up a standalone AM on a mini cluster and exercises the client against it:
 * checks the AM's ports, lists its nodes and the registered instances, stops it,
 * starts a second instance under the same name, verifies that a further create
 * attempt is rejected as "cluster in use", then stops and destroys the instance.
 *
 * @throws Throwable on any test failure
 */
@Test
public void testStandaloneAgentAM() throws Throwable {
  describe "create a standalone AM then perform actions on it"
  // switch to the client subclass declared in this file, which sets the
  // allowed port range option to 60000-60003
  sliderClientClassName = ExtendedSliderClient.name
  // ports that the tracking URL and the RPC endpoint must fall in
  List<Integer> allowedPorts = [60000, 60001, 60002, 60003]

  //launch fake master
  String clustername = createMiniCluster("", configuration, 1, true)
  ServiceLauncher<SliderClient> launcher =
      createStandaloneAM(clustername, true, false)
  SliderClient client = launcher.service
  addToTeardown(client)

  ApplicationReport report = waitForClusterLive(client)
  URI uri = new URI(report.originalTrackingUrl)
  assert uri.port in allowedPorts
  assert report.rpcPort in allowedPorts
  logReport(report)
  List<ApplicationReport> apps = client.applications

  //get some of its status
  dumpClusterStatus(client, "standalone application status")
  List<ClusterNode> clusterNodes = client.listClusterNodesInRole(
      SliderKeys.COMPONENT_AM)
  assert clusterNodes.size() == 1
  ClusterNode masterNode = clusterNodes[0]
  log.info("Master node = ${masterNode}")

  // the AM must show up exactly once, whichever way the nodes are listed
  List<ClusterNode> nodes
  String[] uuids = client.listNodeUUIDsByRole(SliderKeys.COMPONENT_AM)
  assert uuids.length == 1
  nodes = client.listClusterNodes(uuids)
  assert nodes.size() == 1
  describe "AM Node UUID=${uuids[0]}"
  nodes = listNodesInRole(client, SliderKeys.COMPONENT_AM)
  assert nodes.size() == 1
  nodes = listNodesInRole(client, "")
  assert nodes.size() == 1
  assert nodes[0].role == SliderKeys.COMPONENT_AM

  String username = client.username
  def serviceRegistryClient = client.yarnAppListClient
  describe("list of all applications")
  logApplications(apps)
  describe("apps of user $username")
  List<ApplicationReport> userInstances = serviceRegistryClient.listInstances()
  logApplications(userInstances)
  assert userInstances.size() == 1
  describe("named app $clustername")
  ApplicationReport instance = serviceRegistryClient.findInstance(clustername)
  // check for a missing instance before handing it to the logger, so that the
  // failure is reported by this assertion
  assert instance != null
  logReport(instance)

  //switch to the slider ZK-based registry
  describe "service registry instance IDs"
  def instanceIds = client.listRegisteredSliderInstances()
  log.info("number of instanceIds: ${instanceIds.size()}")
  instanceIds.each { String it -> log.info(it) }
  describe "Yarn registry"
  // retrieved only; nothing further is asserted on it in this test
  def yarnRegistry = client.registryOperations

  describe "teardown of cluster instance #1"
  //now kill that cluster
  assert 0 == clusterActionFreeze(client, clustername)
  //list it & See if it is still there
  ApplicationReport oldInstance = serviceRegistryClient.findInstance(
      clustername)
  assert oldInstance != null
  assert oldInstance.yarnApplicationState >= YarnApplicationState.FINISHED

  //create another AM
  launcher = createStandaloneAM(clustername, true, true)
  client = launcher.service
  ApplicationId i2AppID = client.applicationId
  //expect 2 in the list
  userInstances = serviceRegistryClient.listInstances()
  logApplications(userInstances)
  assert userInstances.size() == 2
  //but when we look up an instance, we get the new App ID
  ApplicationReport instance2 = serviceRegistryClient.findInstance(
      clustername)
  assert instance2 != null
  assert i2AppID == instance2.applicationId

  describe("attempting to create instance #3")
  //now try to create instance #3, and expect an in-use failure
  try {
    createStandaloneAM(clustername, false, true)
    fail("expected a failure, got a standalone AM")
  } catch (SliderException e) {
    assertFailureClusterInUse(e)
  }

  // do a quick registry listing here expecting a usage failure.
  ActionRegistryArgs registryArgs = new ActionRegistryArgs()
  registryArgs.name = clustername
  def exitCode = client.actionRegistry(registryArgs)
  assert LauncherExitCodes.EXIT_USAGE == exitCode

  describe("Stopping instance #2")
  //now stop that cluster
  assert 0 == clusterActionFreeze(client, clustername)
  logApplications(client.listSliderInstances(username))
  //verify it is down; this is a hard assertion, not a JUnit assumption
  ApplicationReport reportFor = client.getApplicationReport(i2AppID)
  assert YarnApplicationState.FINISHED <= reportFor.yarnApplicationState
  ApplicationReport instance3 = serviceRegistryClient.findInstance(
      clustername)
  assert instance3 != null
  assert instance3.yarnApplicationState >= YarnApplicationState.FINISHED

  // destroy it
  client.actionDestroy(clustername)
}
/**
 * {@link SliderClient} subclass used by this test. It writes the value
 * "60000-60003" into the {@code SliderKeys.KEY_ALLOWED_PORT_RANGE} global option
 * of the application configuration, both before the instance definition is
 * persisted and before the application is launched.
 */
static class ExtendedSliderClient extends SliderClient {

  /**
   * Sets the allowed port range global option on the given instance definition.
   *
   * @param definition instance definition whose application configuration is updated
   */
  private static void applyTestPortRange(AggregateConf definition) {
    def globals = definition.appConfOperations.globalOptions
    globals[SliderKeys.KEY_ALLOWED_PORT_RANGE] = "60000-60003"
  }

  /**
   * Sets the port range on the builder's instance description, then delegates
   * to the superclass implementation with the unchanged arguments.
   */
  @Override
  protected void persistInstanceDefinition(boolean overwrite,
      Path appconfdir,
      InstanceBuilder builder)
      throws IOException, SliderException, LockAcquireFailedException {
    applyTestPortRange(builder.instanceDescription)
    super.persistInstanceDefinition(overwrite, appconfdir, builder)
  }

  /**
   * Sets the port range on the instance definition, then delegates to the
   * superclass implementation and returns its result.
   */
  @Override
  LaunchedApplication launchApplication(String clustername,
      Path clusterDirectory,
      AggregateConf instanceDefinition,
      boolean debugAM)
      throws YarnException, IOException {
    applyTestPortRange(instanceDefinition)
    LaunchedApplication launched =
        super.launchApplication(clustername, clusterDirectory, instanceDefinition, debugAM)
    return launched
  }
}
}