/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.providers.accumulo;
import com.google.common.net.HostAndPort;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.RoleKeys;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.launch.CommandLineBuilder;
import org.apache.slider.core.launch.ContainerLauncher;
import org.apache.slider.core.exceptions.BadClusterStateException;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.providers.AbstractProviderService;
import org.apache.slider.providers.ProviderCore;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.zk.BlockingZKWatcher;
import org.apache.slider.common.tools.ConfigHelper;
import org.apache.slider.server.services.utility.EventCallback;
import org.apache.slider.server.services.utility.EventNotifyingService;
import org.apache.slider.server.services.utility.ForkedProcessService;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Server-side Accumulo provider: builds the container launch context for each
 * Accumulo role and, on first start, runs the one-off {@code accumulo init}
 * step before the Accumulo processes are launched.
 */
public class AccumuloProviderService extends AbstractProviderService implements
ProviderCore,
AccumuloKeys,
SliderKeys {
  protected static final Logger log =
      LoggerFactory.getLogger(AccumuloProviderService.class);
private AccumuloClientProvider clientProvider;
private static final ProviderUtils providerUtils = new ProviderUtils(log);
private SliderFileSystem fileSystem = null;
public AccumuloProviderService() {
super("accumulo");
}
@Override
public List<ProviderRole> getRoles() {
return AccumuloRoles.ROLES;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
clientProvider = new AccumuloClientProvider(conf);
}
@Override
public void validateInstanceDefinition(AggregateConf instanceDefinition) throws
SliderException {
clientProvider.validateInstanceDefinition(instanceDefinition);
}
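  /**
   * Load the provider-specific site configuration ({@code SITE_XML})
   * from the local configuration directory.
   */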
@Override
public Configuration loadProviderConfigurationInformation(File confDir)
throws BadCommandArgumentsException, IOException {
return loadProviderConfigurationInformation(confDir, SITE_XML);
}
/*
======================================================================
Server interface below here
======================================================================
*/
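  /**
   * Build the launch context for an Accumulo container: propagate the
   * environment (log dir, Hadoop, configuration and ZooKeeper locations),
   * localize the generated configuration directory and any application image,
   * select the per-role JVM heap option, then assemble the
   * {@code bin/accumulo <service>} command line with its output files.
   */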
@Override
public void buildContainerLaunchContext(ContainerLauncher launcher,
AggregateConf instanceDefinition,
Container container,
String role,
SliderFileSystem fileSystem,
Path generatedConfPath,
MapOperations resourceComponent,
MapOperations appComponent,
Path containerTmpDirPath) throws IOException, SliderException {
this.fileSystem = fileSystem;
// Set the environment
    launcher.putEnv(SliderUtils.buildEnvMap(appComponent));
launcher.setEnv(ACCUMULO_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
ConfTreeOperations appConf =
instanceDefinition.getAppConfOperations();
String hadoop_home =
ApplicationConstants.Environment.HADOOP_COMMON_HOME.$();
MapOperations appConfGlobal = appConf.getGlobalOptions();
hadoop_home = appConfGlobal.getOption(OPTION_HADOOP_HOME, hadoop_home);
launcher.setEnv(HADOOP_HOME, hadoop_home);
launcher.setEnv(HADOOP_PREFIX, hadoop_home);
    // ACCUMULO_HOME is deliberately left unset, so the accumulo script
    // computes it as an absolute path on its own.
launcher.setEnv(ACCUMULO_CONF_DIR,
ProviderUtils.convertToAppRelativePath(
SliderKeys.PROPAGATED_CONF_DIR_NAME));
launcher.setEnv(ZOOKEEPER_HOME, appConfGlobal.getMandatoryOption(OPTION_ZK_HOME));
//local resources
//add the configuration resources
launcher.addLocalResources(fileSystem.submitDirectory(
generatedConfPath,
SliderKeys.PROPAGATED_CONF_DIR_NAME));
//Add binaries
//now add the image if it was set
String imageURI = instanceDefinition.getInternalOperations()
.get(OptionKeys.INTERNAL_APPLICATION_IMAGE_PATH);
fileSystem.maybeAddImagePath(launcher.getLocalResources(), imageURI);
CommandLineBuilder commandLine = new CommandLineBuilder();
    String heapSize = appComponent.getOption(RoleKeys.JVM_HEAP, DEFAULT_JVM_HEAP);
    String opt = "ACCUMULO_OTHER_OPTS";
    if (SliderUtils.isSet(heapSize)) {
switch (role) {
case AccumuloKeys.ROLE_MASTER:
opt = "ACCUMULO_MASTER_OPTS";
break;
case AccumuloKeys.ROLE_TABLET:
opt = "ACCUMULO_TSERVER_OPTS";
break;
case AccumuloKeys.ROLE_MONITOR:
opt = "ACCUMULO_MONITOR_OPTS";
break;
case AccumuloKeys.ROLE_GARBAGE_COLLECTOR:
opt = "ACCUMULO_GC_OPTS";
break;
}
      launcher.setEnv(opt, "-Xmx" + heapSize);
}
//this must stay relative if it is an image
commandLine.add(providerUtils.buildPathToScript(instanceDefinition,
"bin", "accumulo"));
//role is translated to the accumulo one
commandLine.add(AccumuloRoles.serviceForRole(role));
// Add any role specific arguments to the command line
String additionalArgs = ProviderUtils.getAdditionalArgs(appComponent);
if (!StringUtils.isBlank(additionalArgs)) {
commandLine.add(additionalArgs);
}
commandLine.addOutAndErrFiles(role + "-out.txt", role + "-err.txt");
launcher.addCommand(commandLine.build());
}
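  /**
   * Build the command list used to run an Accumulo operation inside the AM
   * (for example {@code init}): populate the environment map with the Hadoop,
   * Accumulo and ZooKeeper locations, then prepend the path of the
   * {@code accumulo} script to the supplied commands.
   *
   * @param instance instance definition
   * @param confDir local configuration directory
   * @param env environment map to populate
   * @param commands accumulo sub-command and its arguments
   * @return the full command list to execute
   * @throws IOException IO problems
   * @throws SliderException on a validation or configuration failure
   */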
public List<String> buildProcessCommandList(AggregateConf instance,
File confDir,
Map<String, String> env,
String... commands) throws
IOException,
SliderException {
env.put(ACCUMULO_LOG_DIR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
String hadoop_home = System.getenv(HADOOP_HOME);
MapOperations globalOptions =
instance.getAppConfOperations().getGlobalOptions();
hadoop_home = globalOptions.getOption(OPTION_HADOOP_HOME, hadoop_home);
if (hadoop_home == null) {
throw new BadConfigException(
"Undefined env variable/config option: " + HADOOP_HOME);
}
ProviderUtils.validatePathReferencesLocalDir("HADOOP_HOME", hadoop_home);
env.put(HADOOP_HOME, hadoop_home);
env.put(HADOOP_PREFIX, hadoop_home);
    // build up the ACCUMULO_HOME env variable as an absolute local path
    String accumulo_home = providerUtils.buildPathToHomeDir(instance,
        "bin", "accumulo");
    String accumuloPath = new File(accumulo_home).getAbsolutePath();
    env.put(ACCUMULO_HOME, accumuloPath);
ProviderUtils.validatePathReferencesLocalDir("ACCUMULO_HOME", accumuloPath);
env.put(ACCUMULO_CONF_DIR, confDir.getAbsolutePath());
String zkHome = globalOptions.getMandatoryOption(OPTION_ZK_HOME);
ProviderUtils.validatePathReferencesLocalDir("ZOOKEEPER_HOME", zkHome);
env.put(ZOOKEEPER_HOME, zkHome);
String accumuloScript = AccumuloClientProvider.buildScriptBinPath(instance);
    List<String> launchSequence = new ArrayList<>(commands.length + 1);
    launchSequence.add(accumuloScript);
Collections.addAll(launchSequence, commands);
return launchSequence;
}
/**
* Accumulo startup is a bit more complex than HBase, as it needs
* to pre-initialize the data directory.
*
* This is done by running an init operation before starting the
* real master. If the init fails, that is reported to the AM, which
* then fails the application.
   * If the init succeeds, an event-notifying service queued behind it
   * waits for the configured container startup delay and then calls back
   * into the AM to trigger the cluster review that starts the Accumulo
   * processes.
*
* @param instanceDefinition component description
* @param confDir local dir with the config
   * @param env environment variables beyond those already generated
* @param execInProgress callback for the event notification
* @throws IOException IO problems
* @throws SliderException anything internal
*/
@Override
public boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
EventCallback execInProgress) throws
IOException,
SliderException {
//now pull in these files and do a bit of last-minute validation
File siteXML = new File(confDir, SITE_XML);
Configuration accumuloSite = ConfigHelper.loadConfFromFile(
siteXML);
String zkQuorum =
accumuloSite.get(AccumuloConfigFileOptions.ZOOKEEPER_HOST);
if (zkQuorum == null) {
throw new BadConfigException("Accumulo site.xml %s does not contain %s",
siteXML,
AccumuloConfigFileOptions.ZOOKEEPER_HOST);
} else {
log.info("ZK Quorum is {}", zkQuorum);
}
    //now probe the quorum; the timeout is in milliseconds
    int timeout = 5000;
try {
verifyZookeeperLive(zkQuorum, timeout);
log.info("Zookeeper is live");
} catch (KeeperException e) {
      throw new BadClusterStateException(
          "Failed to connect to Zookeeper at %s within %d milliseconds",
zkQuorum, timeout);
} catch (InterruptedException ignored) {
throw new BadClusterStateException(
"Interrupted while trying to connect to Zookeeper at %s",
zkQuorum);
}
boolean inited = isInited(instanceDefinition);
if (inited) {
// cluster is inited, so don't run anything
return false;
}
    List<String> commands;
    ConfTreeOperations appConfOperations =
        instanceDefinition.getAppConfOperations();
    ConfTreeOperations internalOperations =
        instanceDefinition.getInternalOperations();
    String accumuloInstanceName = internalOperations.get(OptionKeys.APPLICATION_NAME);
    log.info("Initializing accumulo datastore {}", accumuloInstanceName);
commands = buildProcessCommandList(instanceDefinition, confDir, env,
"init",
PARAM_INSTANCE_NAME,
providerUtils.getUserName() + "-" + accumuloInstanceName,
PARAM_PASSWORD,
appConfOperations.getGlobalOptions().getMandatoryOption(
OPTION_ACCUMULO_PASSWORD),
"--clear-instance-name");
ForkedProcessService accumulo =
queueCommand(getName(), env, commands);
//add a timeout to this process
accumulo.setTimeout(
appConfOperations.getGlobalOptions().getOptionInt(
OPTION_ACCUMULO_INIT_TIMEOUT,
INIT_TIMEOUT_DEFAULT), 1);
//callback to AM to trigger cluster review is set up to happen after
//the init/verify action has succeeded
EventNotifyingService notifier = new EventNotifyingService(execInProgress,
internalOperations.getGlobalOptions().getOptionInt(
OptionKeys.INTERNAL_CONTAINER_STARTUP_DELAY,
OptionKeys.DEFAULT_CONTAINER_STARTUP_DELAY));
// register the service for lifecycle management;
// this service is started after the accumulo process completes
addService(notifier);
// now trigger the command sequence
maybeStartCommandSequence();
return true;
}
  /**
   * Probe to see if the Accumulo datastore has already been initialized,
   * by checking for the instance ID entry under the internal data directory.
   * @param cd aggregate instance definition
   * @return true if the data directory looks initialized
   * @throws IOException IO problems
   * @throws BadConfigException if the data directory option is not set
   */
private boolean isInited(AggregateConf cd) throws
IOException,
BadConfigException {
String dataDir = cd.getInternalOperations()
.getGlobalOptions()
.getMandatoryOption(
OptionKeys.INTERNAL_DATA_DIR_PATH);
Path accumuloInited = new Path(dataDir, INSTANCE_ID);
FileSystem fs2 = FileSystem.get(accumuloInited.toUri(), getConf());
return fs2.exists(accumuloInited);
}
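  /**
   * Verify that a ZooKeeper quorum is reachable by opening a connection,
   * listing the root znode and then blocking until the connection event
   * arrives or the timeout expires.
   * @param zkQuorum comma-separated host:port quorum string
   * @param timeout time in milliseconds to wait for the connection
   * @throws IOException IO problems
   * @throws KeeperException ZK operation failure
   * @throws InterruptedException interrupted while waiting
   */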
private void verifyZookeeperLive(String zkQuorum, int timeout) throws
IOException,
KeeperException,
InterruptedException {
BlockingZKWatcher watcher = new BlockingZKWatcher();
ZooKeeper zookeeper = new ZooKeeper(zkQuorum, 10000, watcher, true);
zookeeper.getChildren("/", watcher);
watcher.waitForZKConnection(timeout);
}
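  /**
   * Build the provider status map; no Accumulo-specific status entries are
   * currently published, so this returns an empty map.
   */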
@Override
public Map<String, String> buildProviderStatus() {
Map<String,String> status = new HashMap<>();
return status;
}
/* non-javadoc
* @see org.apache.slider.providers.ProviderService#buildMonitorDetails()
*/
@Override
public Map<String, String> buildMonitorDetails(ClusterDescription clusterDesc) {
Map<String, String> details = super.buildMonitorDetails(clusterDesc);
details.put("Active Accumulo Master (RPC): " +
getInfoAvoidingNull(clusterDesc,
AccumuloKeys.MASTER_ADDRESS), null);
String monitorKey = "Active Accumulo Monitor: ";
String monitorAddr = getInfoAvoidingNull(clusterDesc, AccumuloKeys.MONITOR_ADDRESS);
if (!StringUtils.isBlank(monitorAddr)) {
HostAndPort hostPort = HostAndPort.fromString(monitorAddr);
details.put(monitorKey,
String.format("http://%s:%d", hostPort.getHostText(), hostPort.getPort()));
} else {
details.put(monitorKey + "N/A", null);
}
return details;
}
}