// blob: 34e44bf781caa22dc8f558d6637c663336537e5f [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.kudu.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import org.apache.commons.io.FileUtils;
import org.apache.kudu.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* Utility class to start and manipulate Kudu clusters. Relies on being IN the Kudu source code with
* both the kudu-master and kudu-tserver binaries already compiled. {@link BaseKuduTest} should be
* extended instead of directly using this class in almost all cases.
*/
public class MiniKuduCluster implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MiniKuduCluster.class);
// TS and Master ports will be assigned starting with this one.
private static final int PORT_START = 64030;
// Daemon threads that forward the output of the started processes to LOG.
// They are interrupted in shutdown().
private final List<Thread> PROCESS_INPUT_PRINTERS = new ArrayList<>();
// Map of RPC ports to master processes. An entry is removed when the master is killed.
private final Map<Integer, Process> masterProcesses = new ConcurrentHashMap<>();
// Map of RPC ports to tablet server processes. An entry is removed when the TS is killed.
private final Map<Integer, Process> tserverProcesses = new ConcurrentHashMap<>();
// Map of ports to process command lines. Never removed from. Used to restart processes.
private final Map<Integer, String[]> commandLines = new ConcurrentHashMap<>();
// Data directories and temporary flag files that shutdown() deletes.
private final List<String> pathsToDelete = new ArrayList<>();
// Host and RPC port of every master, filled in by startMasters().
private final List<HostAndPort> masterHostPorts = new ArrayList<>();
// RPC ports of all tablet servers, in the order they were started.
private List<Integer> tserverPorts = new ArrayList<>();
// Client we can use for common operations.
private final KuduClient syncClient;
// Timeout (in milliseconds) given to syncClient and used as the upper bound of
// waitForTabletServers().
private final int defaultTimeoutMs;
// String form of masterHostPorts, as produced by NetUtil.hostsAndPortsToString().
private String masterAddresses;
private MiniKuduCluster(int numMasters, int numTservers, int defaultTimeoutMs) throws Exception {
this.defaultTimeoutMs = defaultTimeoutMs;
startCluster(numMasters, numTservers);
syncClient = new KuduClient.KuduClientBuilder(getMasterAddresses())
.defaultAdminOperationTimeoutMs(defaultTimeoutMs)
.defaultOperationTimeoutMs(defaultTimeoutMs)
.build();
}
/**
 * Polls the internal client for the number of registered tablet servers until enough of
 * them are listed or this instance's "default timeout" has elapsed. Each poll is preceded
 * by a 200 ms sleep, including the first one.
 * @param expected How many TS are expected
 * @return true if there are at least as many TS as expected, otherwise false
 * @throws Exception if the sleep is interrupted or listing the tablet servers fails
 */
public boolean waitForTabletServers(int expected) throws Exception {
  final Stopwatch timer = Stopwatch.createStarted();
  int seen = 0;
  for (;;) {
    if (seen >= expected) {
      break;
    }
    if (timer.elapsed(TimeUnit.MILLISECONDS) >= defaultTimeoutMs) {
      break;
    }
    Thread.sleep(200);
    seen = syncClient.listTabletServers().getTabletServersCount();
  }
  return seen >= expected;
}
/**
 * Starts a Kudu cluster composed of the provided masters and tablet servers.
 * The masters are started first via {@link #startMasters(int, int, String)}; the tablet
 * servers then get their ports from the range that follows the masters' ports. Two ports
 * are requested per tablet server: one for RPCs and one for the web server. Each tablet
 * server uses a single directory for both its WAL and its data.
 * @param numMasters how many masters to start, must be positive
 * @param numTservers how many tablet servers to start, must be positive
 * @throws IllegalArgumentException if either count is not positive
 * @throws Exception if a process cannot be started
 */
private void startCluster(int numMasters, int numTservers) throws Exception {
  Preconditions.checkArgument(numMasters > 0, "Need at least one master");
  Preconditions.checkArgument(numTservers > 0, "Need at least one tablet server");
  // The following props are set via kudu-client's pom.
  String baseDirPath = TestUtils.getBaseDir();
  String localhost = TestUtils.getUniqueLocalhost();
  long now = System.currentTimeMillis();
  // startMasters() logs its own "Starting N masters..." line.
  int startPort = startMasters(PORT_START, numMasters, baseDirPath);
  LOG.info("Starting {} tablet servers...", numTservers);
  List<Integer> ports = TestUtils.findFreePorts(startPort, numTservers * 2);
  for (int i = 0; i < numTservers; i++) {
    int rpcPort = ports.get(i * 2);
    // Use the port that was requested for the web server instead of assuming that
    // rpcPort + 1 is usable: the returned ports are presumably not guaranteed to be
    // consecutive (NOTE(review): confirm against TestUtils.findFreePorts).
    int webPort = ports.get(i * 2 + 1);
    tserverPorts.add(rpcPort);
    String dataDirPath = baseDirPath + "/ts-" + i + "-" + now;
    String flagsPath = TestUtils.getFlagsPath();
    String[] tsCmdLine = {
        TestUtils.findBinary("kudu-tserver"),
        "--flagfile=" + flagsPath,
        "--fs_wal_dir=" + dataDirPath,
        "--fs_data_dirs=" + dataDirPath,
        "--flush_threshold_mb=1",
        "--tserver_master_addrs=" + masterAddresses,
        "--webserver_interface=" + localhost,
        "--local_ip_for_outbound_sockets=" + localhost,
        "--webserver_port=" + webPort,
        "--rpc_bind_addresses=" + localhost + ":" + rpcPort};
    tserverProcesses.put(rpcPort, configureAndStartProcess(rpcPort, tsCmdLine));
    // Remember the command line so that the tablet server can be restarted later.
    commandLines.put(rpcPort, tsCmdLine);
    if (flagsPath.startsWith(baseDirPath)) {
      // We made a temporary copy of the flags; delete them later.
      pathsToDelete.add(flagsPath);
    }
    pathsToDelete.add(dataDirPath);
  }
}
/**
 * Starts the requested number of masters. A web port and an RPC port are looked up for
 * every master before any of them is started; the masters are then launched on those
 * ports and recorded in the master process map, the command line map and the list of
 * paths to delete.
 * @param masterStartPort the starting point of the port range for the masters
 * @param numMasters number of masters to start
 * @param baseDirPath the base directory where the mini cluster stores its data
 * @return the next free port
 * @throws Exception if we are unable to start the masters
 */
private int startMasters(int masterStartPort, int numMasters,
    String baseDirPath) throws Exception {
  LOG.info("Starting {} masters...", numMasters);
  // Two ports per master: even positions of the list are RPC ports, odd positions are the
  // web ports we also want to reserve for the consensus configuration.
  final String bindHost = TestUtils.getUniqueLocalhost();
  final List<Integer> freePorts = TestUtils.findFreePorts(masterStartPort, numMasters * 2);
  final int highestPort = freePorts.get(freePorts.size() - 1);
  final List<Integer> rpcPorts = Lists.newArrayListWithCapacity(numMasters);
  final List<Integer> webPorts = Lists.newArrayListWithCapacity(numMasters);
  for (int m = 0; m < numMasters; m++) {
    Integer rpcPort = freePorts.get(2 * m);
    rpcPorts.add(rpcPort);
    masterHostPorts.add(HostAndPort.fromParts(bindHost, rpcPort));
    webPorts.add(freePorts.get(2 * m + 1));
  }
  masterAddresses = NetUtil.hostsAndPortsToString(masterHostPorts);

  final long timestamp = System.currentTimeMillis();
  for (int m = 0; m < numMasters; m++) {
    final int rpcPort = rpcPorts.get(m);
    final String dataDir = baseDirPath + "/master-" + m + "-" + timestamp;
    final String flagFile = TestUtils.getFlagsPath();
    // The web port is passed explicitly rather than letting the master bind to any port
    // (0). Otherwise the first master could grab, as its web port, an RPC port that was
    // picked for a master that has not been started yet: findFreePorts does not reserve
    // the ports, it only checks that they could be used at the time it was called.
    List<String> args = new ArrayList<>();
    args.add(TestUtils.findBinary("kudu-master"));
    args.add("--flagfile=" + flagFile);
    args.add("--fs_wal_dir=" + dataDir);
    args.add("--fs_data_dirs=" + dataDir);
    args.add("--webserver_interface=" + bindHost);
    args.add("--local_ip_for_outbound_sockets=" + bindHost);
    args.add("--rpc_bind_addresses=" + bindHost + ":" + rpcPort);
    args.add("--webserver_port=" + webPorts.get(m));
    // Make leader elections faster for faster tests.
    args.add("--raft_heartbeat_interval_ms=200");
    if (numMasters > 1) {
      args.add("--master_addresses=" + masterAddresses);
    }
    String[] argv = args.toArray(new String[0]);
    masterProcesses.put(rpcPort, configureAndStartProcess(rpcPort, argv));
    commandLines.put(rpcPort, argv);
    if (flagFile.startsWith(baseDirPath)) {
      // We made a temporary copy of the flags; delete them later.
      pathsToDelete.add(flagFile);
    }
    pathsToDelete.add(dataDir);
  }
  return highestPort + 1;
}
/**
 * Launches the given command with stderr merged into stdout, and starts a daemon thread
 * that reads the process' output and forwards it to LOG. The thread is named after the
 * last path component of the binary plus the port. After a 300 ms pause the process is
 * checked once; if it has already exited, an exception carrying its exit value is thrown.
 * @param port rpc port used to identify the process
 * @param command process and options
 * @return The started process
 * @throws Exception if an error prevents us from starting the process, or if the process
 * was started but had already exited when it was checked
 */
private Process configureAndStartProcess(int port, String[] command) throws Exception {
  LOG.info("Starting process: {}", Joiner.on(" ").join(command));
  Process started = new ProcessBuilder(command).redirectErrorStream(true).start();

  String binaryName = Iterables.getLast(Splitter.on(File.separatorChar).split(command[0]));
  Thread outputLogger = new Thread(
      new ProcessInputStreamLogPrinterRunnable(started.getInputStream()),
      binaryName + ":" + port);
  outputLogger.setDaemon(true);
  PROCESS_INPUT_PRINTERS.add(outputLogger);
  outputLogger.start();

  Thread.sleep(300);
  int exitCode;
  try {
    exitCode = started.exitValue();
  } catch (IllegalThreadStateException stillRunning) {
    // exitValue() refuses to answer while the process is alive, which is what we want.
    return started;
  }
  throw new Exception("We tried starting a process (" + command[0] + ") but it exited with " +
      "value=" + exitCode);
}
/**
* Starts a previously killed master process on the specified port by delegating to
* restartDeadProcessOnPort() with the master process map.
* @param port which port the master was listening on for RPCs
* @throws RuntimeException if no command line is known for the port, or if a master is
* already registered on it
* @throws Exception if the process cannot be started
*/
public void restartDeadMasterOnPort(int port) throws Exception {
restartDeadProcessOnPort(port, masterProcesses);
}
/**
* Starts a previously killed tablet server process on the specified port by delegating to
* restartDeadProcessOnPort() with the tablet server process map.
* @param port which port the TS was listening on for RPCs
* @throws RuntimeException if no command line is known for the port, or if a tablet server
* is already registered on it
* @throws Exception if the process cannot be started
*/
public void restartDeadTabletServerOnPort(int port) throws Exception {
restartDeadProcessOnPort(port, tserverProcesses);
}
/**
 * Starts again the process that was originally started on the given port, using the
 * command line recorded at that time, and registers it in the given map.
 * @param port rpc port identifying the process
 * @param map the process map (masters or tablet servers) the new process is added to
 * @throws RuntimeException if no command line is recorded for the port, or if the map
 * already holds a process for it
 * @throws Exception if the process cannot be started
 */
private void restartDeadProcessOnPort(int port, Map<Integer, Process> map) throws Exception {
  // commandLines is a ConcurrentHashMap and cannot hold null values, so a null result
  // means the port was never used.
  String[] savedCommand = commandLines.get(port);
  String problem = null;
  if (savedCommand == null) {
    problem = "Cannot start process on unknown port " + port;
  } else if (map.containsKey(port)) {
    problem = "Process already exists on port " + port;
  }
  if (problem != null) {
    LOG.warn(problem);
    throw new RuntimeException(problem);
  }
  map.put(port, configureAndStartProcess(port, savedCommand));
}
/**
 * Kills the TS registered for the provided port and waits for it to exit. Does nothing
 * when no TS is registered for that port, i.e. it was already killed.
 * @param port port on which the tablet server is listening on
 * @throws InterruptedException if interrupted while waiting for the process to exit
 */
public void killTabletServerOnPort(int port) throws InterruptedException {
  Process victim = tserverProcesses.remove(port);
  if (victim != null) {
    LOG.info("Killing server at port " + port);
    victim.destroy();
    victim.waitFor();
  }
}
/**
 * Kills all tablet servers, one after the other, waiting for each to exit before moving
 * on to the next. The tablet server map is emptied once all of them have exited.
 * @throws InterruptedException if interrupted while waiting for a process to exit
 */
public void killTabletServers() throws InterruptedException {
  Iterator<Process> remaining = tserverProcesses.values().iterator();
  while (remaining.hasNext()) {
    Process tserver = remaining.next();
    tserver.destroy();
    tserver.waitFor();
  }
  tserverProcesses.clear();
}
/**
 * Restarts every tablet server that is currently dead, on the port it was originally
 * started on. Tablet servers that are still registered as running are left untouched.
 * @throws Exception if a tablet server cannot be started
 */
public void restartDeadTabletServers() throws Exception {
  for (int port : tserverPorts) {
    // restartDeadTabletServerOnPort() throws when a process is still registered for the
    // port, so skip the tablet servers that were never killed.
    if (tserverProcesses.containsKey(port)) {
      continue;
    }
    restartDeadTabletServerOnPort(port);
  }
}
/**
 * Kills the master registered for the provided port and waits for it to exit. Does
 * nothing when no master is registered for that port, i.e. it was already killed.
 * @param port port on which the master is listening on
 * @throws InterruptedException if interrupted while waiting for the process to exit
 */
public void killMasterOnPort(int port) throws InterruptedException {
  Process victim = masterProcesses.remove(port);
  if (victim != null) {
    LOG.info("Killing master at port " + port);
    victim.destroy();
    victim.waitFor();
  }
}
/**
* Closes the cluster by delegating to {@link #shutdown()}, which allows instances to be
* used in a try-with-resources statement.
* @throws Exception declared for {@link AutoCloseable}; {@link #shutdown()} itself
* declares no checked exception
*/
@Override
public void close() throws Exception {
shutdown();
}
/**
 * Stops all the processes and deletes the folders used to store data and the flagfile.
 * Every process is signalled first and then waited for, so that the data directories are
 * not deleted while a process may still be writing to them. If the calling thread is
 * interrupted while waiting, the wait is abandoned, the cleanup continues, and the
 * interrupt status is restored before returning. Failures to delete a path are logged,
 * not thrown.
 */
public void shutdown() {
  List<Process> stopping = new ArrayList<>();
  destroyAll(masterProcesses, stopping);
  destroyAll(tserverProcesses, stopping);

  boolean interrupted = false;
  for (Process process : stopping) {
    try {
      process.waitFor();
    } catch (InterruptedException e) {
      // Stop waiting but still clean up; the flag is restored at the end.
      interrupted = true;
      break;
    }
  }

  for (Thread thread : PROCESS_INPUT_PRINTERS) {
    thread.interrupt();
  }

  for (String path : pathsToDelete) {
    try {
      File f = new File(path);
      if (f.isDirectory()) {
        FileUtils.deleteDirectory(f);
      } else if (f.exists() && !f.delete()) {
        // File.delete() reports failure through its return value only.
        LOG.warn("Could not delete path {}", path);
      }
    } catch (Exception e) {
      LOG.warn("Could not delete path {}", path, e);
    }
  }

  if (interrupted) {
    Thread.currentThread().interrupt();
  }
}

/**
 * Destroys every process of the given map, removes it from the map and appends it to the
 * given list so that the caller can wait for it to exit.
 */
private static void destroyAll(Map<Integer, Process> processes, List<Process> destroyed) {
  for (Iterator<Process> it = processes.values().iterator(); it.hasNext(); ) {
    Process process = it.next();
    process.destroy();
    destroyed.add(process);
    it.remove();
  }
}
/**
* Returns the comma-separated list of master addresses. The value is computed in
* startMasters() from the masters' host and RPC port pairs.
* @return master addresses
*/
public String getMasterAddresses() {
return masterAddresses;
}
/**
 * Returns a list of master addresses, one host and RPC port pair per master. The list is
 * a read-only view, like the process maps returned by the other getters of this class,
 * so that callers cannot alter the cluster's own bookkeeping.
 * @return an unmodifiable list of master addresses
 */
public List<HostAndPort> getMasterHostPorts() {
  return Collections.unmodifiableList(masterHostPorts);
}
/**
* Returns an unmodifiable map of all tablet servers in pairs of RPC port -&gt; Process.
* The map is a read-only view of the internal map, so tablet servers that are killed or
* restarted later are reflected in it.
* @return an unmodifiable map of all tablet servers
*/
@VisibleForTesting
Map<Integer, Process> getTabletServerProcesses() {
return Collections.unmodifiableMap(tserverProcesses);
}
/**
* Returns an unmodifiable map of all masters in pairs of RPC port -&gt; Process.
* The map is a read-only view of the internal map, so masters that are killed or
* restarted later are reflected in it.
* @return an unmodifiable map of all masters
*/
@VisibleForTesting
Map<Integer, Process> getMasterProcesses() {
return Collections.unmodifiableMap(masterProcesses);
}
/**
 * Helper runnable that reads a process' output line by line and logs every line at INFO
 * level. It returns when the end of the stream is reached or when reading fails.
 */
private static class ProcessInputStreamLogPrinterRunnable implements Runnable {
  private final InputStream is;

  /**
   * @param is the stream to read from, typically the stdout of a started process
   */
  public ProcessInputStreamLogPrinterRunnable(InputStream is) {
    this.is = is;
  }

  @Override
  public void run() {
    // try-with-resources closes the reader even when readLine() throws.
    try (BufferedReader in = new BufferedReader(new InputStreamReader(is))) {
      String line;
      while ((line = in.readLine()) != null) {
        LOG.info(line);
      }
    } catch (Exception e) {
      // A "Stream closed" error is not reported. The message of an exception may be
      // null, in which case the error is reported like any other.
      String message = e.getMessage();
      if (message == null || !message.contains("Stream closed")) {
        LOG.error("Caught error while reading a process' output", e);
      }
    }
  }
}
/**
* Builder for {@link MiniKuduCluster}. Unless overridden, the cluster is built with
* 1 master, 3 tablet servers and a default timeout of 50000 milliseconds.
*/
public static class MiniKuduClusterBuilder {
// Defaults used when the corresponding setter is not called.
private int numMasters = 1;
private int numTservers = 3;
private int defaultTimeoutMs = 50000;
/**
* Sets how many masters the cluster starts.
* @param numMasters number of masters
* @return this instance
*/
public MiniKuduClusterBuilder numMasters(int numMasters) {
this.numMasters = numMasters;
return this;
}
/**
* Sets how many tablet servers the cluster starts.
* @param numTservers number of tablet servers
* @return this instance
*/
public MiniKuduClusterBuilder numTservers(int numTservers) {
this.numTservers = numTservers;
return this;
}
/**
* Configures the internal client to use the given timeout for all operations. Also uses the
* timeout for tasks like waiting for tablet servers to check in with the master.
* @param defaultTimeoutMs timeout in milliseconds
* @return this instance
*/
public MiniKuduClusterBuilder defaultTimeoutMs(int defaultTimeoutMs) {
this.defaultTimeoutMs = defaultTimeoutMs;
return this;
}
/**
* Creates the cluster with the configured values; the constructor it calls starts the
* cluster processes.
* @return the new cluster
* @throws Exception if the cluster cannot be started
*/
public MiniKuduCluster build() throws Exception {
return new MiniKuduCluster(numMasters, numTservers, defaultTimeoutMs);
}
}
}