blob: f472bfe722f7b582371369903525a47bfb852748 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.cluster.standalone;
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import org.apache.accumulo.cluster.ClusterControl;
import org.apache.accumulo.cluster.RemoteShell;
import org.apache.accumulo.cluster.RemoteShellOptions;
import org.apache.accumulo.core.master.thrift.ManagerGoalState;
import org.apache.accumulo.manager.state.SetGoalState;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.server.util.Admin;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
 * Uses the {@link RemoteShell} to control a standalone (possibly distributed) Accumulo instance.
 *
 * <p>
 * Host lists are read from files in the server configuration directory ({@code managers},
 * {@code gc}, {@code tservers}, {@code tracers}, {@code monitor}). Servers are started with the
 * {@code bin/accumulo-service} script under {@code accumuloHome} and are stopped by sending a
 * signal to the PID located on the remote host.
 */
public class StandaloneClusterControl implements ClusterControl {
  private static final Logger log = LoggerFactory.getLogger(StandaloneClusterControl.class);

  private static final String ACCUMULO_SERVICE_SCRIPT = "accumulo-service",
      ACCUMULO_SCRIPT = "accumulo";
  private static final String MANAGER_HOSTS_FILE = "managers", GC_HOSTS_FILE = "gc",
      TSERVER_HOSTS_FILE = "tservers", TRACER_HOSTS_FILE = "tracers",
      MONITOR_HOSTS_FILE = "monitor";

  String accumuloHome;
  String clientAccumuloConfDir;
  String serverAccumuloConfDir;
  // Prepended to commands run through execWithStdout
  private final String clientCmdPrefix;
  // Prepended to server-side commands (admin stopAll, SetGoalState, accumulo-service, kill)
  private final String serverCmdPrefix;

  protected RemoteShellOptions options;

  protected String accumuloServicePath, accumuloPath;

  /**
   * @param accumuloHome
   *          Accumulo installation directory; {@code bin/accumulo} and
   *          {@code bin/accumulo-service} are resolved against it
   * @param clientAccumuloConfDir
   *          Accumulo client configuration directory
   * @param serverAccumuloConfDir
   *          Accumulo server configuration directory, which also holds the host list files
   * @param clientCmdPrefix
   *          prefix for commands run via {@link #execWithStdout(Class, String[])}
   * @param serverCmdPrefix
   *          prefix for server administration commands
   */
  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
      justification = "code runs in same security context as user who provided input file name")
  public StandaloneClusterControl(String accumuloHome, String clientAccumuloConfDir,
      String serverAccumuloConfDir, String clientCmdPrefix, String serverCmdPrefix) {
    this.options = new RemoteShellOptions();
    this.accumuloHome = accumuloHome;
    this.clientAccumuloConfDir = clientAccumuloConfDir;
    this.serverAccumuloConfDir = serverAccumuloConfDir;
    this.clientCmdPrefix = clientCmdPrefix;
    this.serverCmdPrefix = serverCmdPrefix;

    File bin = new File(accumuloHome, "bin");
    this.accumuloServicePath = new File(bin, ACCUMULO_SERVICE_SCRIPT).getAbsolutePath();
    this.accumuloPath = new File(bin, ACCUMULO_SCRIPT).getAbsolutePath();
  }

  /**
   * Runs {@code command} on {@code hostname} through a {@link RemoteShell}.
   *
   * <p>
   * A failure reported as an {@link ExitCodeException} is logged (with both the exception message
   * and the captured output) and returned, not rethrown.
   *
   * @return the exit code paired with the output captured by the shell
   * @throws IOException
   *           if the remote shell fails for a reason other than a non-zero exit code
   */
  protected Entry<Integer,String> exec(String hostname, String[] command) throws IOException {
    RemoteShell shell = new RemoteShell(hostname, command, options);
    try {
      shell.execute();
    } catch (ExitCodeException e) {
      // capture the stdout of the process as well.
      String output = shell.getOutput();
      // add output for the ExitCodeException.
      ExitCodeException ece = new ExitCodeException(e.getExitCode(),
          "stderr: " + e.getMessage() + ", stdout: " + output);
      log.error("Failed to run command on {}", sanitize(hostname), ece);
      return Maps.immutableEntry(e.getExitCode(), output);
    }
    return Maps.immutableEntry(shell.getExitCode(), shell.getOutput());
  }

  @Override
  public int exec(Class<?> clz, String[] args) throws IOException {
    return execWithStdout(clz, args).getKey();
  }

  /**
   * Runs {@code clz} through {@code bin/accumulo} on the first manager host.
   *
   * <p>
   * Each argument is single-quoted (with embedded single quotes escaped), so the remote shell
   * passes it through literally.
   */
  @Override
  public Entry<Integer,String> execWithStdout(Class<?> clz, String[] args) throws IOException {
    String manager = getManagerHost();
    List<String> cmd = new ArrayList<>(args.length + 3);
    cmd.add(clientCmdPrefix);
    cmd.add(accumuloPath);
    cmd.add(clz.getName());
    // Quote the arguments to prevent shell expansion
    for (String arg : args) {
      cmd.add(singleQuote(arg));
    }
    log.info("Running: '{}' on {}", sanitize(String.join(" ", cmd)), sanitize(manager));
    return exec(manager, cmd.toArray(new String[0]));
  }

  /**
   * Wraps {@code arg} in single quotes for a POSIX shell.
   *
   * <p>
   * Each embedded {@code '} is rewritten as {@code '\''}: close the quote, emit an escaped quote,
   * and reopen. Without this, an argument containing a quote could end the quoting early and
   * inject shell syntax.
   */
  private static String singleQuote(String arg) {
    return "'" + String.valueOf(arg).replace("'", "'\\''") + "'";
  }

  /**
   * Prevent potential CRLF injection into logs from read-in user data. See
   * https://find-sec-bugs.github.io/bugs.htm#CRLF_INJECTION_LOGS
   */
  private static String sanitize(String msg) {
    return msg.replaceAll("[\r\n]", "");
  }

  /**
   * Returns the first host listed in the {@code managers} file. Client commands, admin stopAll and
   * SetGoalState are all run on this host.
   *
   * @throws IOException
   *           if the file cannot be read or lists no hosts
   */
  private String getManagerHost() throws IOException {
    List<String> managers = getHosts(MANAGER_HOSTS_FILE);
    if (managers.isEmpty()) {
      throw new IOException("No hosts found in the '" + MANAGER_HOSTS_FILE
          + "' file in server conf dir " + serverAccumuloConfDir);
    }
    return managers.get(0);
  }

  /**
   * Runs {@code accumulo admin stopAll} on the first manager host.
   *
   * @throws IOException
   *           if the command exits with a non-zero code
   */
  @Override
  public void adminStopAll() throws IOException {
    String manager = getManagerHost();
    String[] cmd = {serverCmdPrefix, accumuloPath, Admin.class.getName(), "stopAll"};
    // Directly invoke the RemoteShell
    Entry<Integer,String> pair = exec(manager, cmd);
    if (pair.getKey() != 0) {
      throw new IOException("stopAll did not finish successfully, retcode=" + pair.getKey()
          + ", stdout=" + pair.getValue());
    }
  }

  /**
   * Wrapper around SetGoalState
   *
   * @param goalState
   *          The goal state to set; must be the name of a {@link ManagerGoalState} constant
   * @throws NullPointerException
   *           if {@code goalState} is null
   * @throws IllegalArgumentException
   *           if {@code goalState} does not name a {@link ManagerGoalState} constant
   * @throws IOException
   *           If SetGoalState returns a non-zero result
   */
  public void setGoalState(String goalState) throws IOException {
    requireNonNull(goalState, "Goal state must not be null");
    checkArgument(isKnownGoalState(goalState), "Unknown goal state: %s", goalState);
    String manager = getManagerHost();
    String[] cmd = {serverCmdPrefix, accumuloPath, SetGoalState.class.getName(), goalState};
    Entry<Integer,String> pair = exec(manager, cmd);
    if (pair.getKey() != 0) {
      throw new IOException("SetGoalState did not finish successfully, retcode=" + pair.getKey()
          + ", stdout=" + pair.getValue());
    }
  }

  /**
   * Returns true when {@code goalState} is exactly the name of a {@link ManagerGoalState} constant.
   * This is used instead of {@code valueOf(..) != null}, which can never be false because
   * {@code valueOf} throws on unknown names.
   */
  private static boolean isKnownGoalState(String goalState) {
    for (ManagerGoalState state : ManagerGoalState.values()) {
      if (state.name().equals(goalState)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Starts {@code server} on every host configured for it.
   *
   * @throws IOException
   *           if a host file cannot be read, if no host can be found for the garbage collector, or
   *           if any start command fails
   * @throws UnsupportedOperationException
   *           for server types this class cannot manage (e.g. ZOOKEEPER)
   */
  @Override
  public void startAllServers(ServerType server) throws IOException {
    List<String> hosts = getAllHosts(server, "start");
    if (server == ServerType.GARBAGE_COLLECTOR && hosts.isEmpty()) {
      throw new IOException("Found no hosts to run garbage collector on");
    }
    for (String host : hosts) {
      start(server, host);
    }
  }

  /**
   * Starts {@code server} on {@code hostname} via {@code accumulo-service <process> start}.
   *
   * @throws IOException
   *           if the command exits with a non-zero code
   */
  @Override
  public void start(ServerType server, String hostname) throws IOException {
    String[] cmd = {serverCmdPrefix, accumuloServicePath, getProcessString(server), "start"};
    Entry<Integer,String> pair = exec(hostname, cmd);
    if (pair.getKey() != 0) {
      throw new IOException("Start " + server + " on " + hostname
          + " failed to execute successfully. stdout=" + pair.getValue());
    }
  }

  /**
   * Stops {@code server} on every host configured for it. The garbage collector is resolved the
   * same way as in {@link #startAllServers(ServerType)}, so a GC started on the fallback manager
   * host is also stopped there.
   *
   * @throws UnsupportedOperationException
   *           for server types this class cannot manage (e.g. ZOOKEEPER)
   */
  @Override
  public void stopAllServers(ServerType server) throws IOException {
    for (String host : getAllHosts(server, "stop")) {
      stop(server, host);
    }
  }

  /**
   * Returns the hosts configured for {@code server}.
   *
   * @param action
   *          verb used in the exception message for unsupported server types
   * @throws UnsupportedOperationException
   *           for server types without a host list (e.g. ZOOKEEPER)
   */
  @SuppressWarnings("removal")
  private List<String> getAllHosts(ServerType server, String action) throws IOException {
    switch (server) {
      case TABLET_SERVER:
        return getHosts(TSERVER_HOSTS_FILE);
      case MASTER:
      case MANAGER:
        return getHosts(MANAGER_HOSTS_FILE);
      case GARBAGE_COLLECTOR:
        return getGcHosts();
      case TRACER:
        return getHosts(TRACER_HOSTS_FILE);
      case MONITOR:
        return getHosts(MONITOR_HOSTS_FILE);
      case ZOOKEEPER:
      default:
        throw new UnsupportedOperationException(
            "Could not " + action + " servers for " + server);
    }
  }

  /**
   * Returns the hosts listed in the {@code gc} file. If that file lists none, returns the first
   * manager host instead. Returns an empty list when neither file lists a host.
   */
  private List<String> getGcHosts() throws IOException {
    List<String> hosts = getHosts(GC_HOSTS_FILE);
    if (hosts.isEmpty()) {
      List<String> managers = getHosts(MANAGER_HOSTS_FILE);
      if (!managers.isEmpty()) {
        hosts = Collections.singletonList(managers.get(0));
      }
    }
    return hosts;
  }

  @Override
  public void stop(ServerType server, String hostname) throws IOException {
    // TODO Use `accumulo admin stop` for tservers, instrument clean stop for GC, monitor, tracer
    // instead kill
    kill(server, hostname);
  }

  /**
   * Sends {@code signal} to the {@code server} process on {@code hostname}. Does nothing (beyond a
   * debug log) if no matching process is found.
   *
   * @param signal
   *          either a signal number (sent as {@code kill -N}) or a signal name (sent as
   *          {@code kill -s NAME})
   * @throws IOException
   *           if the PID lookup or the kill command exits with a non-zero code
   */
  @Override
  public void signal(ServerType server, String hostname, String signal) throws IOException {
    // Use the trimmed PID in the kill command so stray whitespace never becomes part of it
    String pid = getPid(server, accumuloHome, hostname).trim();
    if (pid.isEmpty()) {
      log.debug("Found no processes for {} on {}", sanitize(server.prettyPrint()),
          sanitize(hostname));
      return;
    }

    String[] stopCmd;
    if (isInteger(signal)) {
      stopCmd = new String[] {serverCmdPrefix, "kill", "-" + signal, pid};
    } else {
      stopCmd = new String[] {serverCmdPrefix, "kill", "-s", signal, pid};
    }

    Entry<Integer,String> pair = exec(hostname, stopCmd);
    if (pair.getKey() != 0) {
      throw new IOException("Signal " + signal + " to " + server + " on " + hostname
          + " failed to execute successfully. stdout=" + pair.getValue());
    }
  }

  /** Returns true if {@code value} parses as an {@code int}. */
  private static boolean isInteger(String value) {
    try {
      Integer.parseInt(value);
      return true;
    } catch (NumberFormatException ignored) {
      // not numeric, e.g. a signal name such as SIGKILL
      return false;
    }
  }

  @Override
  public void suspend(ServerType server, String hostname) throws IOException {
    signal(server, hostname, "SIGSTOP");
  }

  @Override
  public void resume(ServerType server, String hostname) throws IOException {
    signal(server, hostname, "SIGCONT");
  }

  @Override
  public void kill(ServerType server, String hostname) throws IOException {
    signal(server, hostname, "SIGKILL");
  }

  /**
   * Runs {@link #getPidCommand(ServerType, String)} on {@code hostname}.
   *
   * @return the command output, which is empty when no matching process was found
   * @throws IOException
   *           if the lookup command exits with a non-zero code
   */
  protected String getPid(ServerType server, String accumuloHome, String hostname)
      throws IOException {
    String[] getPidCommand = getPidCommand(server, accumuloHome);
    Entry<Integer,String> ret = exec(hostname, getPidCommand);
    if (ret.getKey() != 0) {
      throw new IOException(
          "Could not locate PID for " + getProcessString(server) + " on " + hostname);
    }
    return ret.getValue();
  }

  /**
   * Builds a shell pipeline that prints the PID of the first process whose {@code ps aux} line
   * contains both {@code accumuloHome} and the server's process string. Lines containing
   * {@code grep} or {@code ssh} are excluded, and the trailing newline is stripped.
   */
  protected String[] getPidCommand(ServerType server, String accumuloHome) {
    // Lifted from stop-server.sh to get the PID
    return new String[] {"ps", "aux", "|", "fgrep", accumuloHome, "|", "fgrep",
        getProcessString(server), "|", "fgrep", "-v", "grep", "|", "fgrep", "-v", "ssh", "|", "awk",
        "'{print \\$2}'", "|", "head", "-1", "|", "tr", "-d", "'\\n'"};
  }

  /**
   * Maps a {@link ServerType} to the process name used by {@code accumulo-service} and by the PID
   * lookup.
   *
   * @throws UnsupportedOperationException
   *           for server types without a process name (e.g. ZOOKEEPER)
   */
  @SuppressWarnings("removal")
  protected String getProcessString(ServerType server) {
    switch (server) {
      case TABLET_SERVER:
        return "tserver";
      case GARBAGE_COLLECTOR:
        return "gc";
      case MASTER:
      case MANAGER:
        return "manager";
      case TRACER:
        return "tracer";
      case MONITOR:
        return "monitor";
      default:
        throw new UnsupportedOperationException("Unhandled ServerType " + server);
    }
  }

  /**
   * @return the client configuration directory
   * @throws IllegalStateException
   *           if it is missing or not a directory
   */
  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
      justification = "code runs in same security context as user who provided input file name")
  protected File getClientConfDir() {
    File confDir = new File(clientAccumuloConfDir);
    // isDirectory() is false for non-existent paths, so it also covers the existence check
    if (!confDir.isDirectory()) {
      throw new IllegalStateException(
          "Accumulo client conf dir does not exist or is not a directory: " + confDir);
    }
    return confDir;
  }

  /**
   * @return the server configuration directory
   * @throws IllegalStateException
   *           if it is missing or not a directory
   */
  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
      justification = "code runs in same security context as user who provided input file name")
  protected File getServerConfDir() {
    File confDir = new File(serverAccumuloConfDir);
    // isDirectory() is false for non-existent paths, so it also covers the existence check
    if (!confDir.isDirectory()) {
      throw new IllegalStateException(
          "Accumulo server conf dir does not exist or is not a directory: " + confDir);
    }
    return confDir;
  }

  /**
   * Read hosts in the file named by {@code fn} in the Accumulo server conf dir.
   */
  protected List<String> getHosts(String fn) throws IOException {
    return getHosts(new File(getServerConfDir(), fn));
  }

  /**
   * Reads the provided UTF-8 file and returns its lines, trimmed. Blank lines and lines starting
   * with {@code #} are skipped.
   */
  protected List<String> getHosts(File f) throws IOException {
    try (BufferedReader reader = new BufferedReader(new FileReader(f, UTF_8))) {
      List<String> hosts = new ArrayList<>();
      String line;
      while ((line = reader.readLine()) != null) {
        line = line.trim();
        if (!line.isEmpty() && !line.startsWith("#")) {
          hosts.add(line);
        }
      }
      return hosts;
    }
  }
}