blob: c5d508be21f10ccc17cd70db3e88f5e4321f2120 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.test;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.rest.WebServer;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Path;
import java.util.Collection;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({SlowTest.class})
public class TestGracefulShutdown extends BaseTestQuery {
/**
 * JUnit rule obtained from {@code TestTools.getTimeoutRule(120_000)} and applied to every test
 * in this class.
 * NOTE(review): the argument is presumably a timeout in milliseconds (two minutes) - confirm
 * against TestTools. Several tests below poll in loops that have no exit condition other than
 * success, so this rule appears to be what ends them when the expected state is never reached.
 */
@Rule
public final TestRule TIMEOUT = TestTools.getTimeoutRule(120_000);
/**
 * Prepares the input data once for the whole class by calling
 * {@code setupFile} for every index from 0 to 299 inclusive.
 *
 * @throws Exception if creating any of the files fails
 */
@BeforeClass
public static void setUpTestData() throws Exception {
  final int fileCount = 300;
  int fileIndex = 0;
  while (fileIndex < fileCount) {
    setupFile(fileIndex);
    fileIndex++;
  }
}
/**
 * Configures the given builder for tests that talk to the drillbit web server.
 * Applies everything {@link #enableDrillPortHunting(ClusterFixtureBuilder)} applies, then
 * enables the HTTP server together with HTTP port hunting and sets the
 * {@code SLICE_TARGET} session option to 10.
 *
 * @param builder cluster fixture builder to configure in place
 */
private static void enableWebServer(ClusterFixtureBuilder builder) {
  // Base settings shared with the tests that do not need the web server.
  enableDrillPortHunting(builder);

  // Web-server specific settings; each call touches a different key.
  builder.configBuilder.put(ExecConstants.HTTP_PORT_HUNT, true);
  builder.configBuilder.put(ExecConstants.HTTP_ENABLE, true);

  builder.sessionOption(ExecConstants.SLICE_TARGET, 10);
}
/**
 * Applies the boot configuration shared by the multi-drillbit tests in this class:
 * loopback address binding is allowed, drillbit port hunting is switched on and the
 * grace period is set to 500.
 * NOTE(review): the grace period is presumably expressed in milliseconds - confirm
 * against ExecConstants.GRACE_PERIOD.
 *
 * @param builder cluster fixture builder to configure in place
 */
private static void enableDrillPortHunting(ClusterFixtureBuilder builder) {
  builder.configBuilder.put(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true);
  builder.configBuilder.put(ExecConstants.DRILL_PORT_HUNT, true);
  builder.configBuilder.put(ExecConstants.GRACE_PERIOD, 500);
}
/**
 * Starts three drillbits, closes "db2" and then repeatedly queries the online endpoints
 * of the cluster until the endpoint of the closed drillbit is no longer listed.
 *
 * <p>The polling loop only ends on success; between polls the test sleeps for the
 * {@code ZK_REFRESH} value read from the drillbit configuration. If the endpoint never
 * disappears, the test relies on the class's {@code TIMEOUT} rule to end it.
 */
@Test
public void testOnlineEndPoints() throws Exception {
  final String[] bitNames = {"db1", "db2", "db3"};
  ClusterFixtureBuilder fixtureBuilder =
      ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(bitNames);
  enableDrillPortHunting(fixtureBuilder);

  try (ClusterFixture cluster = fixtureBuilder.build()) {
    // Capture everything needed from the drillbit before it is closed.
    Drillbit stoppedBit = cluster.drillbit("db2");
    int sleepMillis = stoppedBit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);
    DrillbitEndpoint stoppedEndpoint = stoppedBit.getRegistrationHandle().getEndPoint();

    cluster.closeDrillbit("db2");

    boolean stillListed = true;
    while (stillListed) {
      // Re-resolve the coordinator on every poll, exactly as a fresh lookup would.
      Collection<DrillbitEndpoint> online = cluster.drillbit()
          .getContext()
          .getClusterCoordinator()
          .getOnlineEndPoints();
      stillListed = online.contains(stoppedEndpoint);
      if (stillListed) {
        Thread.sleep(sleepMillis);
      }
    }
    // Success: the closed drillbit is gone from the online endpoints.
  }
}
/**
 * Tests graceful shutdown through the REST API.
 *
 * <p>Starts three drillbits with the web server enabled, submits a query, then POSTs to
 * {@code /gracefulShutdown} on the web port of "db1". After the query future reports
 * completion, the test waits until the number of available endpoints is one less than
 * the number of drillbits that were started.
 *
 * <p>The HTTP connection is always disconnected once the response code has been read,
 * so the test does not leave the connection open while the drillbit is shutting down.
 *
 * @throws RuntimeException if the shutdown request does not answer with HTTP 200
 */
@Test
public void testRestApi() throws Exception {
  String[] drillbits = {"db1", "db2", "db3"};
  ClusterFixtureBuilder builder =
      ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
  enableWebServer(builder);
  final String sql = "select * from dfs.root.`.`";

  try (ClusterFixture cluster = builder.build();
       final ClientFixture client = cluster.clientFixture()) {
    Drillbit drillbit = cluster.drillbit("db1");
    int port = drillbit.getWebServerPort();
    int zkRefresh = drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);

    // Start the query first so that it is in flight when the shutdown is requested.
    QueryBuilder.QuerySummaryFuture listener = client.queryBuilder().sql(sql).futureSummary();

    URL url = new URL("http://localhost:" + port + "/gracefulShutdown");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setRequestMethod("POST");
      int responseCode = conn.getResponseCode();
      if (responseCode != HttpURLConnection.HTTP_OK) {
        throw new RuntimeException("Failed : HTTP error code : "
            + responseCode);
      }
    } finally {
      // Release the connection whether or not the request succeeded.
      conn.disconnect();
    }

    // Wait for the query submitted above to complete.
    while (!listener.isDone()) {
      Thread.sleep(100L);
    }

    Assert.assertTrue("Timed out", waitAndAssertDrillbitCount(cluster, zkRefresh, drillbits.length));
  }
}
/**
 * Tests the default (non-graceful) shutdown through the REST API.
 *
 * <p>Starts three drillbits with the web server enabled, submits a query and waits for
 * the query future to report completion. Only then does it POST to {@code /shutdown} on
 * the web port of "db1" and wait until the number of available endpoints is one less
 * than the number of drillbits that were started.
 *
 * <p>The HTTP connection is always disconnected once the response code has been read,
 * so the test does not leave the connection open while the drillbit is shutting down.
 *
 * @throws RuntimeException if the shutdown request does not answer with HTTP 200
 */
@Test
public void testRestApiShutdown() throws Exception {
  String[] drillbits = {"db1", "db2", "db3"};
  ClusterFixtureBuilder builder =
      ClusterFixture.bareBuilder(dirTestWatcher).withLocalZk().withBits(drillbits);
  enableWebServer(builder);
  final String sql = "select * from dfs.root.`.`";

  try (ClusterFixture cluster = builder.build();
       final ClientFixture client = cluster.clientFixture()) {
    Drillbit drillbit = cluster.drillbit("db1");
    int port = drillbit.getWebServerPort();
    int zkRefresh = drillbit.getContext().getConfig().getInt(ExecConstants.ZK_REFRESH);

    QueryBuilder.QuerySummaryFuture listener = client.queryBuilder().sql(sql).futureSummary();

    // Unlike the graceful variant, the query must be finished before shutdown is requested.
    while (!listener.isDone()) {
      Thread.sleep(100L);
    }

    URL url = new URL("http://localhost:" + port + "/shutdown");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try {
      conn.setRequestMethod("POST");
      int responseCode = conn.getResponseCode();
      if (responseCode != HttpURLConnection.HTTP_OK) {
        throw new RuntimeException("Failed : HTTP error code : "
            + responseCode);
      }
    } finally {
      // Release the connection whether or not the request succeeded.
      conn.disconnect();
    }

    Assert.assertTrue("Timed out", waitAndAssertDrillbitCount(cluster, zkRefresh, drillbits.length));
  }
}
/**
 * DRILL-6912: a drillbit whose startup fails because its ports are already taken must
 * still expose a non-null graceful shutdown thread, so that closing it is safe.
 *
 * <p>A first drillbit is started through the cluster fixture; a second one is then built
 * from the same configuration and service set and run. The run is expected to fail with
 * a {@link UserException} reporting that the port could not be bound.
 */
@Test // DRILL-6912
public void testDrillbitWithSamePortContainsShutdownThread() throws Exception {
  ClusterFixtureBuilder clusterBuilder = ClusterFixture.bareBuilder(dirTestWatcher)
      .withLocalZk()
      .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true)
      .configProperty(ExecConstants.INITIAL_USER_PORT, QueryTestUtil.getFreePortNumber(31170, 300))
      .configProperty(ExecConstants.INITIAL_BIT_PORT, QueryTestUtil.getFreePortNumber(31180, 300));

  try (ClusterFixture running = clusterBuilder.build();
       Drillbit duplicate = new Drillbit(running.config(),
           clusterBuilder.configBuilder().getDefinitions(), running.serviceSet())) {
    // Precondition 1: the first drillbit instance started normally.
    assertNotNull("First drillbit instance should be initialized", running.drillbit());

    // Precondition 2: starting the second instance fails, because the first one holds the ports.
    UserException bindFailure = null;
    try {
      duplicate.run();
    } catch (UserException e) {
      bindFailure = e;
    }
    if (bindFailure == null) {
      fail("Invocation of 'drillbitWithSamePort.run()' should throw UserException");
    }

    assertThat(bindFailure.getMessage(), containsString("RESOURCE ERROR: Drillbit could not bind to port"));
    // The drillbit that failed to start must still be safe to close.
    assertNotNull("Drillbit.gracefulShutdownThread shouldn't be null, otherwise close() may throw NPE (if so, check suppressed exception).",
        duplicate.getGracefulShutdownThread());
  }
}
/**
 * DRILL-7056: the temporary JavaScript directory of the first drillbit must exist while
 * that drillbit is running and must be gone once the fixture has been closed.
 *
 * <p>While the first drillbit is up, a second drillbit is built from the same configuration
 * and service set and run; that run is expected to fail with a {@link UserException}
 * reporting that the port could not be bound.
 */
@Test // DRILL-7056
public void testDrillbitTempDir() throws Exception {
  File firstBitTempDir = null;
  ClusterFixtureBuilder clusterBuilder = ClusterFixture.bareBuilder(dirTestWatcher)
      .withLocalZk()
      .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true)
      .configProperty(ExecConstants.INITIAL_USER_PORT, QueryTestUtil.getFreePortNumber(31170, 300))
      .configProperty(ExecConstants.INITIAL_BIT_PORT, QueryTestUtil.getFreePortNumber(31180, 300));

  try (ClusterFixture running = clusterBuilder.build();
       Drillbit twin = new Drillbit(running.config(),
           clusterBuilder.configBuilder().getDefinitions(), running.serviceSet())) {
    // Precondition 1: the first drillbit instance started normally and has its temp dir.
    Drillbit firstBit = running.drillbit();
    assertNotNull("First drillbit instance should be initialized", firstBit);
    firstBitTempDir = getWebServerTempDirPath(firstBit);
    assertTrue("First drillbit instance should have a temporary Javascript dir initialized", firstBitTempDir.exists());

    // Precondition 2: starting the twin fails, because the first instance holds the ports.
    UserException bindFailure = null;
    try {
      twin.run();
    } catch (UserException userEx) {
      bindFailure = userEx;
    }
    if (bindFailure == null) {
      fail("Invocation of 'twinDrillbitOnSamePort.run()' should throw UserException");
    }
    assertThat(bindFailure.getMessage(), containsString("RESOURCE ERROR: Drillbit could not bind to port"));
  }

  // Both drillbits are closed at this point; the directory must have been removed.
  assertFalse("First drillbit instance should have a temporary Javascript dir deleted", firstBitTempDir.exists());
}
/**
 * Reads the {@code webServer} field of the given drillbit reflectively (forcing access)
 * and returns the directory reported by {@code getOrCreateTmpJavaScriptDir()} on it.
 *
 * @param drillbit drillbit whose web server is inspected
 * @return the web server's temporary JavaScript directory
 * @throws IllegalAccessException if the field cannot be read
 */
private File getWebServerTempDirPath(Drillbit drillbit) throws IllegalAccessException {
  final Field field = FieldUtils.getField(drillbit.getClass(), "webServer", true);
  final Object fieldValue = FieldUtils.readField(field, drillbit, true);
  final WebServer server = (WebServer) fieldValue;
  return server.getOrCreateTmpJavaScriptDir();
}
/**
 * Polls the available endpoints of the cluster until exactly {@code bitsNum - 1} of them
 * are reported, sleeping {@code zkRefresh} milliseconds between polls.
 *
 * <p>This method has no timeout of its own: it either returns {@code true} or keeps
 * polling, so it never returns {@code false}.
 *
 * @param cluster   cluster fixture whose coordinator is queried on every poll
 * @param zkRefresh time passed to {@link Thread#sleep(long)} between two polls
 * @param bitsNum   number of drillbits originally started
 * @return {@code true} once the expected endpoint count has been observed
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private boolean waitAndAssertDrillbitCount(ClusterFixture cluster, int zkRefresh, int bitsNum)
    throws InterruptedException {
  final int expectedCount = bitsNum - 1;
  while (cluster.drillbit()
      .getContext()
      .getClusterCoordinator()
      .getAvailableEndpoints()
      .size() != expectedCount) {
    Thread.sleep(zkRefresh);
  }
  return true;
}
/**
 * Appends six employee records (one JSON object per line) to the file
 * {@code employee<file_num>.json} in the root directory of {@code dirTestWatcher}.
 *
 * <p>I/O failures are propagated to the caller with their original stack trace instead of
 * being reduced to a message. Because {@link PrintWriter} does not throw on write errors,
 * its error flag is checked explicitly after writing.
 *
 * @param file_num index used to build the file name
 * @throws IOException if the file cannot be opened or the data cannot be written
 */
private static void setupFile(int file_num) throws Exception {
  final String file = "employee" + file_num + ".json";
  final Path path = dirTestWatcher.getRootDir().toPath().resolve(file);
  // The FileWriter is opened in append mode (second argument true).
  try (PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(path.toFile(), true)))) {
    out.println("{\"employee_id\":1,\"full_name\":\"Sheri Nowmer\",\"first_name\":\"Sheri\",\"last_name\":\"Nowmer\",\"position_id\":1,\"position_title\":\"President\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1961-08-26\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":80000.0000,\"supervisor_id\":0,\"education_level\":\"Graduate Degree\",\"marital_status\":\"S\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" +
        "{\"employee_id\":2,\"full_name\":\"Derrick Whelply\",\"first_name\":\"Derrick\",\"last_name\":\"Whelply\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1915-07-03\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate Degree\",\"marital_status\":\"M\",\"gender\":\"M\",\"management_role\":\"Senior Management\"}\n" +
        "{\"employee_id\":4,\"full_name\":\"Michael Spence\",\"first_name\":\"Michael\",\"last_name\":\"Spence\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1969-06-20\",\"hire_date\":\"1998-01-01 00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate Degree\",\"marital_status\":\"S\",\"gender\":\"M\",\"management_role\":\"Senior Management\"}\n" +
        "{\"employee_id\":5,\"full_name\":\"Maya Gutierrez\",\"first_name\":\"Maya\",\"last_name\":\"Gutierrez\",\"position_id\":2,\"position_title\":\"VP Country Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1951-05-10\",\"hire_date\":\"1998-01-01 00:00:00.0\",\"end_date\":null,\"salary\":35000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" +
        "{\"employee_id\":6,\"full_name\":\"Roberta Damstra\",\"first_name\":\"Roberta\",\"last_name\":\"Damstra\",\"position_id\":3,\"position_title\":\"VP Information Systems\",\"store_id\":0,\"department_id\":2,\"birth_date\":\"1942-10-08\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":25000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n" +
        "{\"employee_id\":7,\"full_name\":\"Rebecca Kanagaki\",\"first_name\":\"Rebecca\",\"last_name\":\"Kanagaki\",\"position_id\":4,\"position_title\":\"VP Human Resources\",\"store_id\":0,\"department_id\":3,\"birth_date\":\"1949-03-27\",\"hire_date\":\"1994-12-01 00:00:00.0\",\"end_date\":null,\"salary\":15000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior Management\"}\n");
    // checkError() flushes the writer and reports any write failure it recorded silently.
    if (out.checkError()) {
      throw new IOException("Failed to write test data file " + path);
    }
  }
}
}