blob: 731902a65b6764911b25d01b17c5d36f6a58d829 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import jnr.constants.platform.Signal;
import jnr.posix.POSIXFactory;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
/**
* Monkeys and chaos.
*/
// TODO: clean up this test
public class ChaosMonkeyIntegrationTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ChaosMonkeyIntegrationTest.class);
private static final String TOTAL_RECORD_COUNT = "1000";
private static final String SEGMENT_COUNT = "25";
private static final String AVRO_DIR = "/tmp/temp-avro-" + ChaosMonkeyIntegrationTest.class.getName();
private static final String SEGMENT_DIR = "/tmp/temp-segment-" + ChaosMonkeyIntegrationTest.class.getName();
private final List<Process> _processes = new ArrayList<>();
private Process runAdministratorCommand(String[] args) {
String classpath = System.getProperty("java.class.path");
System.getProperties().setProperty("pinot.admin.system.exit", "false");
List<String> completeArgs = new ArrayList<>();
completeArgs.add("java");
completeArgs.add("-Xms4G");
completeArgs.add("-Xmx4G");
completeArgs.add("-cp");
completeArgs.add(classpath);
completeArgs.add(PinotAdministrator.class.getName());
completeArgs.addAll(Arrays.asList(args));
try {
Process process = new ProcessBuilder(completeArgs).redirectError(ProcessBuilder.Redirect.INHERIT).
redirectOutput(ProcessBuilder.Redirect.INHERIT).start();
synchronized (_processes) {
_processes.add(process);
}
return process;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void sendSignalToProcess(Process process, Signal signal) {
int processPid = getProcessPid(process);
if (processPid != -1) {
LOGGER.info("Sending signal {} to process {}", signal.intValue(), processPid);
POSIXFactory.getNativePOSIX().kill(processPid, signal.intValue());
}
}
private int getProcessPid(Process process) {
try {
Class<?> processHandle =
Class.forName("java.lang.ProcessHandle", false, ChaosMonkeyIntegrationTest.class.getClassLoader());
return (int) MethodHandles.lookup().findStatic(processHandle, "pid", MethodType.methodType(long.class))
.invoke();
} catch (Throwable t) {
LOGGER.warn("failed to invoke java.lang.ProcessHandle#pid - running on JDK8?", t);
Class<? extends Process> clazz = process.getClass();
try {
Field field = clazz.getDeclaredField("pid");
field.setAccessible(true);
return field.getInt(process);
} catch (NoSuchFieldException | IllegalAccessException e) {
return -1;
}
}
}
private Process startZookeeper() {
return runAdministratorCommand(new String[]{"StartZookeeper", "-zkPort", "2191"});
}
private Process startController() {
return runAdministratorCommand(new String[]{
"StartController", "-zkAddress", "localhost:2191", "-controllerPort", "39000", "-dataDir",
"/tmp/ChaosMonkeyClusterController"
});
}
private Process startBroker() {
return runAdministratorCommand(new String[]{"StartBroker", "-brokerPort", "8099", "-zkAddress", "localhost:2191"});
}
private Process startServer() {
return runAdministratorCommand(new String[]{
"StartServer", "-serverPort", "8098", "-zkAddress", "localhost:2191", "-dataDir",
"/tmp/ChaosMonkeyCluster/data", "-segmentDir", "/tmp/ChaosMonkeyCluster/segments"
});
}
private void generateData()
throws InterruptedException {
String schemaFile = TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().
getResource("chaos-monkey-schema.json"));
String schemaAnnotationsFile = TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().
getResource("chaos-monkey-schema-annotations.json"));
runAdministratorCommand(new String[]{
"GenerateData", "-numRecords", TOTAL_RECORD_COUNT, "-numFiles", SEGMENT_COUNT, "-schemaFile", schemaFile,
"-schemaAnnotationFile", schemaAnnotationsFile, "-overwrite", "-outDir", AVRO_DIR
}).waitFor();
}
private void createTable()
throws InterruptedException {
String schemaFile = TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().
getResource("chaos-monkey-schema.json"));
String createTableFile = TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().
getResource("chaos-monkey-create-table.json"));
runAdministratorCommand(new String[]{
"AddTable", "-controllerPort", "39000", "-schemaFile", schemaFile, "-tableConfigFile", createTableFile, "-exec"
}).waitFor();
}
private void convertData()
throws InterruptedException {
String schemaFile = TestUtils.getFileFromResourceUrl(ChaosMonkeyIntegrationTest.class.getClassLoader().
getResource("chaos-monkey-schema.json"));
runAdministratorCommand(new String[]{
"CreateSegment", "-schemaFile", schemaFile, "-dataDir", AVRO_DIR, "-tableName", "myTable", "-segmentName",
"my_table", "-outDir", SEGMENT_DIR, "-overwrite"
}).waitFor();
}
private void uploadData()
throws InterruptedException {
runAdministratorCommand(new String[]{"UploadSegment", "-controllerPort", "39000", "-segmentDir", SEGMENT_DIR})
.waitFor();
}
private int countRecords() {
Connection connection = ConnectionFactory.fromHostList("localhost:8099");
return connection.execute("select count(*) from myTable").getResultSet(0).getInt(0);
}
@Test(enabled = false)
public void testShortZookeeperFreeze()
throws Exception {
testFreezeZookeeper(10000L);
}
@Test(enabled = false)
public void testLongZookeeperFreeze()
throws Exception {
testFreezeZookeeper(60000L);
}
public void testFreezeZookeeper(long freezeLength)
throws Exception {
Process zookeeper = startZookeeper();
Thread.sleep(1000L);
startController();
Thread.sleep(3000L);
startServer();
startBroker();
Thread.sleep(3000L);
createTable();
generateData();
convertData();
uploadData();
Thread.sleep(5000L);
long timeInTwoMinutes = System.currentTimeMillis() + 120000L;
int currentRecordCount = countRecords();
int expectedRecordCount = Integer.parseInt(TOTAL_RECORD_COUNT);
while (currentRecordCount != expectedRecordCount && System.currentTimeMillis() < timeInTwoMinutes) {
Thread.sleep(1000L);
currentRecordCount = countRecords();
}
Assert.assertEquals(currentRecordCount, expectedRecordCount, "All segments did not load within 120 seconds");
sendSignalToProcess(zookeeper, Signal.SIGSTOP);
Thread.sleep(freezeLength);
sendSignalToProcess(zookeeper, Signal.SIGCONT);
Thread.sleep(5000L);
timeInTwoMinutes = System.currentTimeMillis() + 120000L;
currentRecordCount = countRecords();
while (currentRecordCount != expectedRecordCount && System.currentTimeMillis() < timeInTwoMinutes) {
Thread.sleep(1000L);
currentRecordCount = countRecords();
}
Assert.assertEquals(currentRecordCount, expectedRecordCount,
"Record count still inconsistent 120 seconds after zookeeper restart");
}
@AfterMethod
public void tearDown() {
for (Process process : _processes) {
process.destroy();
}
FileUtils.deleteQuietly(new File(AVRO_DIR));
FileUtils.deleteQuietly(new File(SEGMENT_DIR));
}
}