// blob: 53ac7f54d6a74ade8004111f8c4823b97780792f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.providers.accumulo
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.accumulo.core.client.ZooKeeperInstance
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.core.main.ServiceLauncher
import org.apache.slider.api.ClusterDescription
import org.apache.slider.api.ResourceKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.test.YarnZKMiniClusterTestBase
import static org.apache.slider.common.SliderXMLConfKeysForTesting.*
import static org.apache.slider.providers.accumulo.AccumuloKeys.*
import static org.apache.slider.common.params.Arguments.ARG_PROVIDER
import static org.apache.slider.common.params.Arguments.ARG_RES_COMP_OPT
/**
 * Test base for Accumulo clusters: adds Accumulo-specific setup/teardown,
 * cluster creation helpers and a "flex" test driver on top of
 * {@link YarnZKMiniClusterTestBase}.
 */
@CompileStatic
@Slf4j
public abstract class AccumuloTestBase extends YarnZKMiniClusterTestBase {

  // NOTE(review): these two constants have no initializer and no static
  // initializer block is visible in this class. If nothing else assigns them,
  // every timeout derived from ACCUMULO_LAUNCH_WAIT_TIME below would presumably
  // be 0 -- confirm where they are meant to be set (e.g. from the test
  // configuration).
  public static final int ACCUMULO_LAUNCH_WAIT_TIME
  public static final boolean ACCUMULO_TESTS_ENABLED

  /** Time allowed for a cluster to start; an alias of the launch wait time. */
  public static final int ACCUMULO_CLUSTER_STARTUP_TIME = ACCUMULO_LAUNCH_WAIT_TIME

  /** Time allowed for a cluster to stop (presumably milliseconds: one minute). */
  public static final int ACCUMULO_CLUSTER_STOP_TIME = 1 * 60 * 1000

  /**
   * The time to wait before trying to talk to the Accumulo cluster and
   * expect meaningful results. Used as the timeout passed to
   * {@code waitForRoleCount} in {@link #flexAccClusterTestRun}.
   */
  public static final int ACCUMULO_CLUSTER_STARTUP_TO_LIVE_TIME = ACCUMULO_CLUSTER_STARTUP_TIME

  /**
   * Pause, in milliseconds, inserted after each role-count wait in
   * {@link #flexAccClusterTestRun} (it is handed straight to {@code sleep}).
   */
  public static final int ACCUMULO_GO_LIVE_TIME = 60000

  /**
   * Get the path of the test configuration resource.
   * @return {@code "src/main/resources/"} followed by {@code CONF_RESOURCE}
   */
  @Override
  public String getTestConfigurationPath() {
    return "src/main/resources/" + CONF_RESOURCE
  }

  /**
   * Set up the test: after the superclass setup, apply the assumptions that
   * the Accumulo tests are enabled, that the archive and application home are
   * defined, and that the other required settings are present.
   */
  @Override
  void setup() {
    super.setup()
    assumeBoolOption(SLIDER_CONFIG, KEY_TEST_ACCUMULO_ENABLED, true)
    assumeArchiveDefined()
    assumeApplicationHome()
    YarnConfiguration conf = testConfiguration
    assumeOtherSettings(conf)
  }

  /**
   * Teardown: run the superclass teardown, then kill any remaining Accumulo
   * processes if {@code teardownKillall} is set.
   */
  @Override
  void teardown() {
    super.teardown()
    if (teardownKillall) {
      killAllAccumuloProcesses()
    }
  }

  /**
   * Send SIGKILL to every Java process matched by the Accumulo launcher
   * class name {@code org.apache.accumulo.start.Main}.
   */
  void killAllAccumuloProcesses() {
    killJavaProcesses("org.apache.accumulo.start.Main", SIGKILL)
  }

  /**
   * Get the configuration key naming the Accumulo archive.
   * @return the archive key
   */
  @Override
  public String getArchiveKey() {
    return KEY_TEST_ACCUMULO_TAR
  }

  /**
   * Get the configuration key naming the Accumulo home directory.
   * @return the application home key
   */
  @Override
  public String getApplicationHomeKey() {
    return KEY_TEST_ACCUMULO_HOME
  }

  /**
   * Assume that the ZooKeeper home option is set in the configuration.
   * Only the presence of the option is checked here; this method does not
   * validate that the path exists.
   * @param conf configuration to examine
   */
  public void assumeOtherSettings(YarnConfiguration conf) {
    assumeStringOptionSet(conf, OPTION_ZK_HOME)
  }

  /**
   * Create a cluster with one master and the requested number of tablet
   * role instances.
   * @param clustername cluster name
   * @param tablets number of instances of the tablet role
   * @param extraArgs list of extra args to add to the creation command;
   * the list is not modified
   * @param deleteExistingData should the data of any existing cluster
   * of this name be deleted
   * @param blockUntilRunning block until the AM is running
   * @return launcher which will have executed the command.
   */
  public ServiceLauncher<SliderClient> createAccCluster(
      String clustername,
      int tablets,
      List<String> extraArgs,
      boolean deleteExistingData,
      boolean blockUntilRunning) {
    Map<String, Integer> roles = [
        (ROLE_MASTER): 1,
        (ROLE_TABLET): tablets,
    ]
    return createAccCluster(clustername, roles, extraArgs, deleteExistingData, blockUntilRunning)
  }

  /**
   * Create an Accumulo cluster with an explicit role map.
   * <p>
   * The provider argument and a YARN memory option for the master, tablet,
   * monitor and garbage collector roles are appended to the supplied
   * arguments before the cluster is created.
   * @param clustername cluster name
   * @param roles map of role name to desired instance count
   * @param extraArgs list of extra args to add to the creation command;
   * may be null, and is never modified (a private copy is extended instead)
   * @param deleteExistingData should the data of any existing cluster
   * of this name be deleted
   * @param blockUntilRunning block until the AM is running
   * @return the cluster launcher
   */
  public ServiceLauncher<SliderClient> createAccCluster(
      String clustername,
      Map<String, Integer> roles,
      List<String> extraArgs,
      boolean deleteExistingData,
      boolean blockUntilRunning) {
    // Work on a copy so the caller's list is not silently grown on every call.
    List<String> args = new ArrayList<String>()
    if (extraArgs != null) {
      args.addAll(extraArgs)
    }
    args << ARG_PROVIDER << PROVIDER_ACCUMULO

    YarnConfiguration conf = testConfiguration
    def clusterOps = [
        (OPTION_ZK_HOME): conf.getTrimmed(OPTION_ZK_HOME),
        (OPTION_HADOOP_HOME): conf.getTrimmed(OPTION_HADOOP_HOME),
        ("site." + AccumuloConfigFileOptions.MONITOR_PORT_CLIENT): AccumuloConfigFileOptions.MONITOR_PORT_CLIENT_DEFAULT,
        ("site." + AccumuloConfigFileOptions.MASTER_PORT_CLIENT): AccumuloConfigFileOptions.MASTER_PORT_CLIENT_DEFAULT,
    ]

    // Same memory setting for every role, in the original order.
    for (String role : [ROLE_MASTER, ROLE_TABLET, ROLE_MONITOR, ROLE_GARBAGE_COLLECTOR]) {
      args << ARG_RES_COMP_OPT << role << ResourceKeys.YARN_MEMORY << YRAM
    }

    return createCluster(clustername,
        roles,
        args,
        deleteExistingData,
        blockUntilRunning,
        clusterOps)
  }

  /**
   * Query the tablet servers known to an Accumulo instance.
   * <p>
   * NOTE(review): the instance name, ZooKeeper address ("localhost:4") and
   * credentials are hard-coded and look like placeholders -- confirm they
   * are what callers expect.
   * @return the value of {@code instanceOperations().tabletServers}
   */
  def getAccClusterStatus() {
    ZooKeeperInstance instance = new ZooKeeperInstance("", "localhost:4")
    return instance.getConnector("user", "pass").instanceOperations().tabletServers
  }

  /**
   * Fetch a page from a web server on localhost.
   * @param port port of the server
   * @param page path of the page; a leading "/" is added if it is missing
   * @return the result of {@code fetchWebPage} for the assembled URL
   */
  public String fetchLocalPage(int port, String page) {
    String path = page == null ? "" : page
    // Without the separator the path would be glued onto the port number.
    if (!path.isEmpty() && !path.startsWith("/")) {
      path = "/" + path
    }
    String url = "http://localhost:" + port + path
    return fetchWebPage(url)
  }

  /**
   * Run a flex test: create a mini cluster, launch an Accumulo cluster with
   * the first entry of the plan, then flex it to each subsequent entry in
   * turn, waiting for the role counts to match after every step.
   * The cluster is stopped (via {@code maybeStopCluster}) when the run ends,
   * whether or not it succeeded.
   * @param clustername cluster name
   * @param plan non-empty list of role maps; entry 0 is the initial cluster
   * layout, later entries are the flex targets. The list is not modified.
   * @return the cluster description returned by the wait after the last flex
   * step, or null if the plan contained only the initial layout
   */
  public ClusterDescription flexAccClusterTestRun(
      String clustername, List<Map<String, Integer>> plan) {
    int planCount = plan.size()
    assert planCount > 0
    createMiniCluster(clustername, getConfiguration(),
        1,
        true)

    // now launch the cluster with the initial layout
    Map<String, Integer> initialRoles = plan[0]
    ServiceLauncher<SliderClient> launcher = createAccCluster(clustername,
        initialRoles,
        [],
        true,
        true)
    SliderClient sliderClient = launcher.service
    try {
      // verify the number of role instances is as expected
      waitForRoleCount(sliderClient, initialRoles,
          ACCUMULO_CLUSTER_STARTUP_TO_LIVE_TIME)
      sleep(ACCUMULO_GO_LIVE_TIME)

      ClusterDescription cd = null
      // walk the remaining steps by index rather than draining the caller's list
      for (int step = 1; step < planCount; step++) {
        Map<String, Integer> flexTarget = plan[step]
        describe("Flexing " + roleMapToString(flexTarget))
        boolean flexed = 0 == sliderClient.flex(clustername, flexTarget)
        log.info("Flex returned zero exit code: {}", flexed)
        cd = waitForRoleCount(sliderClient, flexTarget,
            ACCUMULO_CLUSTER_STARTUP_TO_LIVE_TIME)
        sleep(ACCUMULO_GO_LIVE_TIME)
      }
      return cd
    } finally {
      maybeStopCluster(sliderClient, null, "end of flex test run")
    }
  }
}