blob: f91a6674436c7bf3c5bfb5d490e8acea240b675a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.TestRuleIgnoreTestSuites;
import org.apache.lucene.util.TestUtil;
import org.junit.Assume;
import org.junit.BeforeClass;
/** Child process with silly naive TCP socket server to handle
* between-node commands, launched for each node by TestNRTReplication. */
@SuppressCodecs({"MockRandom", "Direct", "SimpleText"})
@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for debugging a failure")
@SuppressForbidden(reason = "We need Unsafe to actually crush :-)")
public class TestSimpleServer extends LuceneTestCase {
final static Set<Thread> clientThreads = Collections.synchronizedSet(new HashSet<>());
final static AtomicBoolean stop = new AtomicBoolean();
/** Handles one client connection */
private static class ClientHandler extends Thread {
// We hold this just so we can close it to exit the process:
private final ServerSocket ss;
private final Socket socket;
private final Node node;
private final int bufferSize;
public ClientHandler(ServerSocket ss, Node node, Socket socket) {
this.ss = ss;
this.node = node;
this.socket = socket;
this.bufferSize = TestUtil.nextInt(random(), 128, 65536);
if (Node.VERBOSE_CONNECTIONS) {
node.message("new connection socket=" + socket);
}
}
@Override
public void run() {
boolean success = false;
try {
//node.message("using stream buffer size=" + bufferSize);
InputStream is = new BufferedInputStream(socket.getInputStream(), bufferSize);
DataInput in = new InputStreamDataInput(is);
BufferedOutputStream bos = new BufferedOutputStream(socket.getOutputStream(), bufferSize);
DataOutput out = new OutputStreamDataOutput(bos);
if (node instanceof SimplePrimaryNode) {
((SimplePrimaryNode) node).handleOneConnection(random(), ss, stop, is, socket, in, out, bos);
} else {
((SimpleReplicaNode) node).handleOneConnection(ss, stop, is, socket, in, out, bos);
}
bos.flush();
if (Node.VERBOSE_CONNECTIONS) {
node.message("bos.flush done");
}
success = true;
} catch (Throwable t) {
if (t instanceof SocketException == false && t instanceof NodeCommunicationException == false) {
node.message("unexpected exception handling client connection; now failing test:");
t.printStackTrace(System.out);
IOUtils.closeWhileHandlingException(ss);
// Test should fail with this:
throw new RuntimeException(t);
} else {
node.message("exception handling client connection; ignoring:");
t.printStackTrace(System.out);
}
} finally {
if (success) {
try {
IOUtils.close(socket);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
IOUtils.closeWhileHandlingException(socket);
}
}
if (Node.VERBOSE_CONNECTIONS) {
node.message("socket.close done");
}
}
}
/**
* currently, this only works/tested on Sun and IBM.
*/
// poached from TestIndexWriterOnJRECrash ... should we factor out to TestUtil? seems dangerous to give it such "publicity"?
private static void crashJRE() {
final String vendor = Constants.JAVA_VENDOR;
final boolean supportsUnsafeNpeDereference =
vendor.startsWith("Oracle") ||
vendor.startsWith("Sun") ||
vendor.startsWith("Apple");
try {
if (supportsUnsafeNpeDereference) {
try {
Class<?> clazz = Class.forName("sun.misc.Unsafe");
java.lang.reflect.Field field = clazz.getDeclaredField("theUnsafe");
field.setAccessible(true);
Object o = field.get(null);
Method m = clazz.getMethod("putAddress", long.class, long.class);
m.invoke(o, 0L, 0L);
} catch (Throwable e) {
System.out.println("Couldn't kill the JVM via Unsafe.");
e.printStackTrace(System.out);
}
}
// Fallback attempt to Runtime.halt();
Runtime.getRuntime().halt(-1);
} catch (Exception e) {
System.out.println("Couldn't kill the JVM.");
e.printStackTrace(System.out);
}
// We couldn't get the JVM to crash for some reason.
throw new RuntimeException("JVM refuses to die!");
}
static void writeFilesMetaData(DataOutput out, Map<String,FileMetaData> files) throws IOException {
out.writeVInt(files.size());
for(Map.Entry<String,FileMetaData> ent : files.entrySet()) {
out.writeString(ent.getKey());
FileMetaData fmd = ent.getValue();
out.writeVLong(fmd.length);
out.writeVLong(fmd.checksum);
out.writeVInt(fmd.header.length);
out.writeBytes(fmd.header, 0, fmd.header.length);
out.writeVInt(fmd.footer.length);
out.writeBytes(fmd.footer, 0, fmd.footer.length);
}
}
static Map<String,FileMetaData> readFilesMetaData(DataInput in) throws IOException {
int fileCount = in.readVInt();
//System.out.println("readFilesMetaData: fileCount=" + fileCount);
Map<String,FileMetaData> files = new HashMap<>();
for(int i=0;i<fileCount;i++) {
String fileName = in.readString();
//System.out.println("readFilesMetaData: fileName=" + fileName);
long length = in.readVLong();
long checksum = in.readVLong();
byte[] header = new byte[in.readVInt()];
in.readBytes(header, 0, header.length);
byte[] footer = new byte[in.readVInt()];
in.readBytes(footer, 0, footer.length);
files.put(fileName, new FileMetaData(header, footer, length, checksum));
}
return files;
}
/** Pulls CopyState off the wire */
static CopyState readCopyState(DataInput in) throws IOException {
// Decode a new CopyState
byte[] infosBytes = new byte[in.readVInt()];
in.readBytes(infosBytes, 0, infosBytes.length);
long gen = in.readVLong();
long version = in.readVLong();
Map<String,FileMetaData> files = readFilesMetaData(in);
int count = in.readVInt();
Set<String> completedMergeFiles = new HashSet<>();
for(int i=0;i<count;i++) {
completedMergeFiles.add(in.readString());
}
long primaryGen = in.readVLong();
return new CopyState(files, version, gen, infosBytes, completedMergeFiles, primaryGen, null);
}
@BeforeClass
public static void ensureNested() {
Assume.assumeTrue(TestRuleIgnoreTestSuites.isRunningNested());
}
@SuppressWarnings("try")
public void test() throws Exception {
int id = Integer.parseInt(System.getProperty("tests.nrtreplication.nodeid"));
Thread.currentThread().setName("main child " + id);
Path indexPath = Paths.get(System.getProperty("tests.nrtreplication.indexpath"));
boolean isPrimary = System.getProperty("tests.nrtreplication.isPrimary") != null;
int primaryTCPPort;
long forcePrimaryVersion;
if (isPrimary == false) {
forcePrimaryVersion = -1;
primaryTCPPort = Integer.parseInt(System.getProperty("tests.nrtreplication.primaryTCPPort"));
} else {
primaryTCPPort = -1;
forcePrimaryVersion = Long.parseLong(System.getProperty("tests.nrtreplication.forcePrimaryVersion"));
}
long primaryGen = Long.parseLong(System.getProperty("tests.nrtreplication.primaryGen"));
Node.globalStartNS = Long.parseLong(System.getProperty("tests.nrtreplication.startNS"));
boolean doRandomCrash = "true".equals(System.getProperty("tests.nrtreplication.doRandomCrash"));
boolean doRandomClose = "true".equals(System.getProperty("tests.nrtreplication.doRandomClose"));
boolean doFlipBitsDuringCopy = "true".equals(System.getProperty("tests.nrtreplication.doFlipBitsDuringCopy"));
boolean doCheckIndexOnClose = "true".equals(System.getProperty("tests.nrtreplication.checkonclose"));
// Create server socket that we listen for incoming requests on:
try (final ServerSocket ss = new ServerSocket(0, 0, InetAddress.getLoopbackAddress())) {
int tcpPort = ((InetSocketAddress) ss.getLocalSocketAddress()).getPort();
System.out.println("\nPORT: " + tcpPort);
final Node node;
if (isPrimary) {
node = new SimplePrimaryNode(random(), indexPath, id, tcpPort, primaryGen, forcePrimaryVersion, null, doFlipBitsDuringCopy, doCheckIndexOnClose);
System.out.println("\nCOMMIT VERSION: " + ((PrimaryNode) node).getLastCommitVersion());
} else {
try {
node = new SimpleReplicaNode(random(), id, tcpPort, indexPath, primaryGen, primaryTCPPort, null, doCheckIndexOnClose);
} catch (RuntimeException re) {
if (re.getMessage().startsWith("replica cannot start")) {
// this is "OK": it means MDW's refusal to delete a segments_N commit point means we cannot start:
assumeTrue(re.getMessage(), false);
}
throw re;
}
}
System.out.println("\nINFOS VERSION: " + node.getCurrentSearchingVersion());
if (doRandomClose || doRandomCrash) {
final int waitForMS;
if (isPrimary) {
waitForMS = TestUtil.nextInt(random(), 20000, 60000);
} else {
waitForMS = TestUtil.nextInt(random(), 5000, 60000);
}
boolean doClose;
if (doRandomCrash == false) {
doClose = true;
} else if (doRandomClose) {
doClose = random().nextBoolean();
} else {
doClose = false;
}
if (doClose) {
node.message("top: will close after " + (waitForMS/1000.0) + " seconds");
} else {
node.message("top: will crash after " + (waitForMS/1000.0) + " seconds");
}
Thread t = new Thread() {
@Override
public void run() {
long endTime = System.nanoTime() + waitForMS*1000000L;
while (System.nanoTime() < endTime) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
if (stop.get()) {
break;
}
}
if (stop.get() == false) {
if (doClose) {
try {
node.message("top: now force close server socket after " + (waitForMS/1000.0) + " seconds");
node.state = "top-closing";
ss.close();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
} else {
node.message("top: now crash JVM after " + (waitForMS/1000.0) + " seconds");
crashJRE();
}
}
}
};
if (isPrimary) {
t.setName("crasher P" + id);
} else {
t.setName("crasher R" + id);
}
// So that if node exits naturally, this thread won't prevent process exit:
t.setDaemon(true);
t.start();
}
System.out.println("\nNODE STARTED");
//List<Thread> clientThreads = new ArrayList<>();
// Naive thread-per-connection server:
while (true) {
Socket socket;
try {
socket = ss.accept();
} catch (SocketException se) {
// when ClientHandler closes our ss we will hit this
node.message("top: server socket exc; now exit");
break;
}
Thread thread = new ClientHandler(ss, node, socket);
thread.setDaemon(true);
thread.start();
clientThreads.add(thread);
// Prune finished client threads:
Iterator<Thread> it = clientThreads.iterator();
while (it.hasNext()) {
Thread t = it.next();
if (t.isAlive() == false) {
it.remove();
}
}
//node.message(clientThreads.size() + " client threads are still alive");
}
stop.set(true);
// Make sure all client threads are done, else we get annoying (yet ultimately "harmless") messages about threads still running /
// lingering for them to finish from the child processes:
for(Thread clientThread : clientThreads) {
node.message("top: join clientThread=" + clientThread);
clientThread.join();
node.message("top: done join clientThread=" + clientThread);
}
node.message("done join all client threads; now close node");
node.close();
}
}
}