/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.start.Main;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.hadoop.conf.Configuration;
import org.junit.BeforeClass;
import org.junit.Test;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
* This test verifies tserver behavior when the node experiences certain failure states that cause
* it to be unresponsive to network I/O and/or disk access.
*
* <p>
* This failure state is simulated in a tserver by running that tserver with a shared library that
* hooks the read/write system calls. When the shared library sees a specific trigger file, it
* pauses the system calls and prints "sleeping", until that file is deleted, after which it proxies
* the system call to the real system implementation.
*
* <p>
* In response to failures of the type this test simulates, the tserver should recover if the system
* call is stalled for less than the ZooKeeper timeout. Otherwise, it should lose its lock in
* ZooKeeper and terminate itself.
*/
public class HalfDeadTServerIT extends ConfigurableMacBase {
/**
 * Configures the mini cluster for this test: one tablet server, a 15s ZooKeeper timeout, a 5s
 * general RPC timeout, and the tserver native map disabled.
 *
 * @param cfg the mini cluster configuration to adjust
 * @param hadoopCoreSite the Hadoop core-site configuration; not touched by this method
 */
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
  // mini won't let us configure 0 tservers, so ask for exactly 1; the test kills that one and
  // starts its own tserver inside the desired simulation environment
  cfg.setNumTservers(1);
  cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
  cfg.setProperty(Property.GENERAL_RPC_TIMEOUT, "5s");
  cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.toString(false));
}
/**
 * Overrides the superclass timeout value.
 *
 * @return four minutes, expressed in seconds (240)
 */
@Override
protected int defaultTimeoutSeconds() {
  return (int) TimeUnit.MINUTES.toSeconds(4);
}
// set to true only when gcc exits with status 0 in buildSharedLib(); consulted before each test
private static final AtomicBoolean sharedLibBuilt = new AtomicBoolean(false);

/**
 * Compiles {@code src/test/c/fake_disk_failure.c} (relative to {@code user.dir}) into
 * {@code target/fake_disk_failure.so} with gcc and records whether the compiler succeeded.
 *
 * @throws IOException if the gcc process cannot be started
 * @throws InterruptedException if interrupted while waiting for gcc to finish
 */
@SuppressFBWarnings(value = "COMMAND_INJECTION",
    justification = "command executed is not from user input")
@BeforeClass
public static void buildSharedLib() throws IOException, InterruptedException {
  final String projectDir = System.getProperty("user.dir");
  final String cSource = projectDir + "/src/test/c/fake_disk_failure.c";
  final String sharedObject = projectDir + "/target/fake_disk_failure.so";

  // NOTE(review): JVMs on macOS typically report os.name as "Mac OS X"; confirm whether the
  // "Darwin" comparison can ever match before relying on the first set of flags
  final boolean darwin = System.getProperty("os.name").equals("Darwin");
  final String[] compileCommand = darwin
      ? new String[] {"gcc", "-arch", "x86_64", "-arch", "i386", "-dynamiclib", "-O3", "-fPIC",
          cSource, "-o", sharedObject}
      : new String[] {"gcc", "-D_GNU_SOURCE", "-Wall", "-fPIC", cSource, "-shared", "-o",
          sharedObject, "-ldl"};

  // inherit IO so the compiler's output shows up on the current console
  final Process compiler = new ProcessBuilder(compileCommand).inheritIO().start();

  // block until gcc finishes; a zero exit status means the native library was built
  final int exitStatus = compiler.waitFor();
  sharedLibBuilt.set(exitStatus == 0);
}
/**
 * Daemon thread that drains a launched process' output stream line by line, echoing every line
 * to a {@link PrintStream} and keeping a copy so the test can inspect it afterwards.
 *
 * <p>
 * The captured text can be requested through {@link #getCaptured()} while this thread is still
 * reading, so every access to the buffer is guarded by the buffer's own monitor. Each line and
 * its trailing newline are appended under one lock, so a reader never sees a partial line.
 */
private static class DumpOutput extends Thread {
  private final Scanner lineScanner;
  private final StringBuilder capturedOutput;
  private final PrintStream printer;
  private final String printerName;

  /**
   * @param is the stream to drain (e.g. a process' stdout or stderr)
   * @param out where each line is echoed
   * @param name label included in the echoed prefix to tell streams apart
   */
  DumpOutput(InputStream is, PrintStream out, String name) {
    lineScanner = new Scanner(is);
    capturedOutput = new StringBuilder();
    printer = out;
    printerName = name;
    this.setDaemon(true);
  }

  /**
   * Reads lines until the stream is exhausted, recording and echoing each one.
   */
  @Override
  public void run() {
    while (lineScanner.hasNextLine()) {
      String line = lineScanner.nextLine();
      // StringBuilder is not thread-safe; lock so a concurrent getCaptured() is consistent
      synchronized (capturedOutput) {
        capturedOutput.append(line).append('\n');
      }
      printer.printf("%s(%s):%s%n", getClass().getSimpleName(), printerName, line);
    }
  }

  /**
   * Returns a snapshot of everything captured so far; safe to call while the thread is running.
   *
   * @return the captured lines, each terminated by {@code "\n"}
   */
  public String getCaptured() {
    synchronized (capturedOutput) {
      return capturedOutput.toString();
    }
  }
}
/**
 * Stalls the tserver for 10 seconds (below the 15s ZooKeeper timeout set in configure) and
 * does not expect the tserver to die.
 */
@Test
public void testRecover() throws Exception {
  final int stallSeconds = 10;
  final boolean tserverShouldDie = false;
  test(stallSeconds, tserverShouldDie);
}
/**
 * Stalls the tserver for 20 seconds (above the 15s ZooKeeper timeout set in configure) and
 * expects the tserver to die. A missing "Session expired" message in the tserver's stdout is
 * only logged; it does not fail the test.
 */
@Test
public void testTimeout() throws Exception {
  final String tserverStdout = test(20, true);
  if (tserverStdout == null || tserverStdout.contains("Session expired")) {
    return;
  }
  log.info("Failed to find 'Session expired' in output, but TServer did die which is expected");
}
/**
 * Runs one stalled-I/O scenario against a tablet server process launched by this test.
 *
 * <p>
 * The method starts a separate TabletServer JVM with the fake_disk_failure shared library
 * preloaded, kills the tserver that mini started, begins a TestIngest run, and then
 * creates the trick file, leaves it in place for {@code seconds} seconds, and deletes it again.
 * The test is skipped (via {@code assumeTrue}) when the shared library was not built.
 *
 * @param seconds how long the trick file is left in place; values of 10 or less take the
 *        "recover" path (ingest must exit 0 and verify), larger values take the path that waits
 *        for the launched tserver process to exit
 * @param expectTserverDied when true and the tserver reference is still held during cleanup,
 *        the test fails if that process has not exited
 * @return everything the launched tserver wrote to stdout, as captured by the collector thread
 * @throws Exception if any step of the scenario fails
 */
@SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", "COMMAND_INJECTION"},
justification = "path provided by test; command args provided by test")
public String test(int seconds, boolean expectTserverDied) throws Exception {
assumeTrue("Shared library did not build", sharedLibBuilt.get());
try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
while (client.instanceOperations().getTabletServers().isEmpty()) {
// wait until the tserver that we need to kill is running
Thread.sleep(50);
}
// create our own tablet server with the special test library
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
// put the mini cluster's conf directory first on the child's classpath
classpath = new File(cluster.getConfig().getDir(), "conf") + File.pathSeparator + classpath;
String className = TabletServer.class.getName();
ArrayList<String> argList = new ArrayList<>();
argList.addAll(Arrays.asList(javaBin, "-cp", classpath));
argList.addAll(Arrays.asList(Main.class.getName(), className));
ProcessBuilder builder = new ProcessBuilder(argList);
Map<String,String> env = builder.environment();
env.put("ACCUMULO_HOME", cluster.getConfig().getDir().getAbsolutePath());
env.put("ACCUMULO_LOG_DIR", cluster.getConfig().getLogDir().getAbsolutePath());
// path of the trigger file, handed to the child process through its environment
String trickFilename = cluster.getConfig().getLogDir().getAbsolutePath() + "/TRICK_FILE";
env.put("TRICK_FILE", trickFilename);
// preload the library built by buildSharedLib(); both the LD_PRELOAD and DYLD_* variables are
// set regardless of platform
String libPath = System.getProperty("user.dir") + "/target/fake_disk_failure.so";
env.put("LD_PRELOAD", libPath);
env.put("DYLD_INSERT_LIBRARIES", libPath);
env.put("DYLD_FORCE_FLAT_NAMESPACE", "true");
Process ingest = null;
Process tserver = builder.start();
DumpOutput stderrCollector = new DumpOutput(tserver.getErrorStream(), System.err, "stderr");
DumpOutput stdoutCollector = new DumpOutput(tserver.getInputStream(), System.out, "stdout");
try {
stderrCollector.start();
stdoutCollector.start();
sleepUninterruptibly(1, TimeUnit.SECONDS);
// don't need the regular tablet server
cluster.killProcess(ServerType.TABLET_SERVER,
cluster.getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
sleepUninterruptibly(1, TimeUnit.SECONDS);
client.tableOperations().create("test_ingest");
// only the tserver launched above should be left
assertEquals(1, client.instanceOperations().getTabletServers().size());
int rows = 100 * 1000;
// start the ingest in a separate process so it is in flight when I/O gets blocked
ingest =
cluster.exec(TestIngest.class, "-c", cluster.getClientPropsPath(), "--rows", rows + "")
.getProcess();
sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
// block I/O with some side-channel trickiness
File trickFile = new File(trickFilename);
try {
assertTrue(trickFile.createNewFile());
sleepUninterruptibly(seconds, TimeUnit.SECONDS);
} finally {
// always remove the trigger file, even if the assertion above failed
if (!trickFile.delete()) {
log.error("Couldn't delete {}", trickFile);
}
}
// NOTE(review): the branch is selected from seconds, not from expectTserverDied; the two
// callers in this class keep them in agreement - confirm this is intended
if (seconds <= 10) {
// recover path: the ingest must finish successfully and its data must verify
assertEquals(0, ingest.waitFor());
VerifyIngest.VerifyParams params = new VerifyIngest.VerifyParams(getClientProperties());
params.rows = rows;
VerifyIngest.verifyIngest(client, params);
} else {
// wait for the launched tserver to exit, then join the collectors so that all of its
// output has been read before it is inspected
sleepUninterruptibly(5, TimeUnit.SECONDS);
tserver.waitFor();
stderrCollector.join();
stdoutCollector.join();
// the process has exited; null the reference so the cleanup in finally skips it
tserver = null;
}
// verify the process was blocked
// (in the recover path the collector threads have not been joined at this point)
String results = stdoutCollector.getCaptured();
assertTrue(results.contains("sleeping\nsleeping\nsleeping\n"));
return results;
} finally {
if (ingest != null) {
ingest.destroy();
ingest.waitFor();
}
if (tserver != null) {
try {
if (expectTserverDied) {
try {
// exitValue() throws IllegalThreadStateException while the process is still running
tserver.exitValue();
} catch (IllegalThreadStateException e) {
// NOTE(review): fail() thrown from this finally block would replace any exception
// already propagating from the try block above - confirm that is acceptable
fail("Expected TServer to kill itself, but it is still running");
}
}
} finally {
// stop the launched tserver and let the collectors drain the rest of its output
tserver.destroy();
tserver.waitFor();
stderrCollector.join();
stdoutCollector.join();
}
}
}
}
}
}