| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.server; |
| |
| import static org.apache.drill.exec.ExecConstants.SLICE_TARGET; |
| import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT; |
| import static org.apache.drill.exec.planner.physical.PlannerSettings.HASHAGG; |
| import static org.apache.drill.exec.planner.physical.PlannerSettings.PARTITION_SENDER_SET_THREADS; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.commons.math3.util.Pair; |
| import org.apache.drill.exec.work.foreman.FragmentsRunner; |
| import org.apache.drill.test.BaseTestQuery; |
| import org.apache.drill.test.QueryTestUtil; |
| import org.apache.drill.SingleRowListener; |
| import org.apache.drill.common.DrillAutoCloseables; |
| import org.apache.drill.common.concurrent.ExtendedLatch; |
| import org.apache.drill.common.config.DrillConfig; |
| import org.apache.drill.common.exceptions.UserException; |
| import org.apache.drill.common.types.TypeProtos.MinorType; |
| import org.apache.drill.common.util.RepeatTestRule.Repeat; |
| import org.apache.drill.exec.ExecConstants; |
| import org.apache.drill.exec.ZookeeperHelper; |
| import org.apache.drill.exec.ZookeeperTestUtil; |
| import org.apache.drill.exec.client.DrillClient; |
| import org.apache.drill.exec.exception.DrillbitStartupException; |
| import org.apache.drill.exec.exception.SchemaChangeException; |
| import org.apache.drill.exec.memory.BufferAllocator; |
| import org.apache.drill.exec.memory.RootAllocatorFactory; |
| import org.apache.drill.exec.physical.impl.ScreenCreator; |
| import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec; |
| import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; |
| import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; |
| import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator; |
| import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; |
| import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch; |
| import org.apache.drill.exec.planner.sql.DrillSqlWorker; |
| import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; |
| import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; |
| import org.apache.drill.exec.proto.UserBitShared; |
| import org.apache.drill.exec.proto.UserBitShared.DrillPBError; |
| import org.apache.drill.exec.proto.UserBitShared.ExceptionWrapper; |
| import org.apache.drill.exec.proto.UserBitShared.QueryData; |
| import org.apache.drill.exec.proto.UserBitShared.QueryId; |
| import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; |
| import org.apache.drill.exec.proto.UserBitShared.QueryType; |
| import org.apache.drill.exec.record.BatchSchema; |
| import org.apache.drill.exec.record.MaterializedField; |
| import org.apache.drill.exec.record.RecordBatchLoader; |
| import org.apache.drill.exec.record.VectorWrapper; |
| import org.apache.drill.exec.rpc.ConnectionThrottle; |
| import org.apache.drill.exec.rpc.DrillRpcFuture; |
| import org.apache.drill.exec.rpc.RpcException; |
| import org.apache.drill.exec.rpc.user.QueryDataBatch; |
| import org.apache.drill.exec.rpc.user.UserResultsListener; |
| import org.apache.drill.exec.store.pojo.PojoRecordReader; |
| import org.apache.drill.exec.testing.Controls; |
| import org.apache.drill.exec.testing.ControlsInjectionUtil; |
| import org.apache.drill.exec.util.Pointer; |
| import org.apache.drill.exec.work.foreman.Foreman; |
| import org.apache.drill.exec.work.foreman.ForemanException; |
| import org.apache.drill.exec.work.foreman.ForemanSetupException; |
| import org.apache.drill.exec.work.fragment.FragmentExecutor; |
| import org.apache.drill.test.DrillTest; |
| import org.apache.drill.categories.SlowTest; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.slf4j.Logger; |
| |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Test how resilient drillbits are to throwing exceptions during various phases of query |
| * execution by injecting exceptions at various points, and to cancellations in various phases. |
| */ |
| @Category({SlowTest.class}) |
| public class TestDrillbitResilience extends DrillTest { |
| private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class); |
| |
| private static ZookeeperHelper zkHelper; |
| private static RemoteServiceSet remoteServiceSet; |
| private static final Map<String, Drillbit> drillbits = new HashMap<>(); |
| private static DrillClient drillClient; |
| |
| /** |
| * The number of times test (that are repeated) should be repeated. |
| */ |
| private static final int NUM_RUNS = 3; |
| |
| /** |
| * Note: Counting sys.memory executes a fragment on every drillbit. This is a better check in comparison to |
| * counting sys.drillbits. |
| */ |
| private static final String TEST_QUERY = "select * from sys.memory"; |
| |
| private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) { |
| if (drillbits.containsKey(name)) { |
| throw new IllegalStateException("Drillbit named \"" + name + "\" already exists"); |
| } |
| |
| try { |
| @SuppressWarnings("resource") |
| final Drillbit drillbit = Drillbit.start(zkHelper.getConfig(), remoteServiceSet); |
| drillbits.put(name, drillbit); |
| } catch (final DrillbitStartupException e) { |
| throw new RuntimeException("Failed to start Drillbit \"" + name + "\"", e); |
| } |
| } |
| |
| /** |
| * Shutdown the specified drillbit. |
| * |
| * @param name name of the drillbit |
| */ |
| private static void stopDrillbit(final String name) { |
| final Drillbit drillbit = drillbits.get(name); |
| if (drillbit == null) { |
| throw new IllegalStateException("No Drillbit named \"" + name + "\" found"); |
| } |
| |
| try { |
| drillbit.close(); |
| } catch (final Exception e) { |
| final String message = "Error shutting down Drillbit \"" + name + "\""; |
| System.err.println(message + '.'); |
| logger.warn(message, e); |
| } |
| } |
| |
| /** |
| * Shutdown all the drillbits. |
| */ |
| private static void stopAllDrillbits() { |
| for (String name : drillbits.keySet()) { |
| stopDrillbit(name); |
| } |
| drillbits.clear(); |
| } |
| |
| /* |
| * Canned drillbit names. |
| */ |
| private final static String DRILLBIT_ALPHA = "alpha"; |
| private final static String DRILLBIT_BETA = "beta"; |
| private final static String DRILLBIT_GAMMA = "gamma"; |
| |
| /** |
| * Get the endpoint for the drillbit, if it is running |
| * @param name name of the drillbit |
| * @return endpoint of the drillbit |
| */ |
| @SuppressWarnings("resource") |
| private static DrillbitEndpoint getEndpoint(final String name) { |
| final Drillbit drillbit = drillbits.get(name); |
| if (drillbit == null) { |
| throw new IllegalStateException("No Drillbit named \"" + name + "\" found."); |
| } |
| return drillbit.getContext().getEndpoint(); |
| } |
| |
| @BeforeClass |
| public static void startSomeDrillbits() throws Exception { |
| // turn off the HTTP server to avoid port conflicts between the drill bits |
| System.setProperty(ExecConstants.HTTP_ENABLE, "false"); |
| |
| ZookeeperTestUtil.setJaasTestConfigFile(); |
| |
| // turn on error for failure in cancelled fragments |
| zkHelper = new ZookeeperHelper(true, true); |
| zkHelper.startZookeeper(1); |
| |
| // use a non-null service set so that the drillbits can use port hunting |
| remoteServiceSet = RemoteServiceSet.getLocalServiceSet(); |
| |
| // create name-addressable drillbits |
| startDrillbit(DRILLBIT_ALPHA, remoteServiceSet); |
| startDrillbit(DRILLBIT_BETA, remoteServiceSet); |
| startDrillbit(DRILLBIT_GAMMA, remoteServiceSet); |
| |
| // create a client |
| final DrillConfig drillConfig = zkHelper.getConfig(); |
| drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, new Properties()); |
| clearAllInjections(); |
| } |
| |
| @AfterClass |
| public static void shutdownAllDrillbits() { |
| if (drillClient != null) { |
| drillClient.close(); |
| drillClient = null; |
| } |
| |
| stopAllDrillbits(); |
| |
| if (remoteServiceSet != null) { |
| try { |
| remoteServiceSet.close(); |
| } catch (Exception e) { |
| logger.warn("Failure on close()", e); |
| } |
| remoteServiceSet = null; |
| } |
| |
| zkHelper.stopZookeeper(); |
| } |
| |
| /** |
| * Clear all injections. |
| */ |
| private static void clearAllInjections() { |
| Preconditions.checkNotNull(drillClient); |
| ControlsInjectionUtil.clearControls(drillClient); |
| } |
| |
| /** |
| * Check that all the drillbits are ok. |
| * <p/> |
| * <p>The current implementation does this by counting the number of drillbits using a query. |
| */ |
| private static void assertDrillbitsOk() { |
| final SingleRowListener listener = new SingleRowListener() { |
| private final BufferAllocator bufferAllocator = RootAllocatorFactory.newRoot(zkHelper.getConfig()); |
| private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator); |
| |
| @Override |
| public void rowArrived(final QueryDataBatch queryResultBatch) { |
| // load the single record |
| final QueryData queryData = queryResultBatch.getHeader(); |
| try { |
| loader.load(queryData.getDef(), queryResultBatch.getData()); |
| // TODO: Clean: DRILL-2933: That load(...) no longer throws |
| // SchemaChangeException, so check/clean catch clause below. |
| } catch (final SchemaChangeException e) { |
| fail(e.toString()); |
| } |
| assertEquals(1, loader.getRecordCount()); |
| |
| // there should only be one column |
| final BatchSchema batchSchema = loader.getSchema(); |
| assertEquals(1, batchSchema.getFieldCount()); |
| |
| // the column should be an integer |
| final MaterializedField countField = batchSchema.getColumn(0); |
| final MinorType fieldType = countField.getType().getMinorType(); |
| assertEquals(MinorType.BIGINT, fieldType); |
| |
| // get the column value |
| final VectorWrapper<?> vw = loader.iterator().next(); |
| final Object obj = vw.getValueVector().getAccessor().getObject(0); |
| assertTrue(obj instanceof Long); |
| final Long countValue = (Long) obj; |
| |
| // assume this means all the drillbits are still ok |
| assertEquals(drillbits.size(), countValue.intValue()); |
| |
| loader.clear(); |
| } |
| |
| @Override |
| public void cleanup() { |
| DrillAutoCloseables.closeNoChecked(bufferAllocator); |
| } |
| }; |
| |
| try { |
| QueryTestUtil.testWithListener(drillClient, QueryType.SQL, "select count(*) from sys.memory", listener); |
| listener.waitForCompletion(); |
| final QueryState state = listener.getQueryState(); |
| assertTrue(String.format("QueryState should be COMPLETED (and not %s).", state), state == QueryState.COMPLETED); |
| } catch (final Exception e) { |
| throw new RuntimeException("Couldn't query active drillbits", e); |
| } |
| |
| final List<DrillPBError> errorList = listener.getErrorList(); |
| assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty()); |
| } |
| |
| @After |
| public void checkDrillbits() { |
| clearAllInjections(); // so that the drillbit check itself doesn't trigger anything |
| assertDrillbitsOk(); // TODO we need a way to do this without using a query |
| } |
| |
| /** |
| * Set the given controls. |
| */ |
| private static void setControls(final String controls) { |
| ControlsInjectionUtil.setControls(drillClient, controls); |
| } |
| |
| /** |
| * Sets a session option. |
| */ |
| private static void setSessionOption(final String option, final String value) { |
| ControlsInjectionUtil.setSessionOption(drillClient, option, value); |
| } |
| |
| private static void resetSessionOption(final String option) { |
| try { |
| final List<QueryDataBatch> results = drillClient.runQuery( |
| UserBitShared.QueryType.SQL, String.format("ALTER session RESET `%s`", |
| option)); |
| for (final QueryDataBatch data : results) { |
| data.release(); |
| } |
| } catch (final RpcException e) { |
| fail("Could not reset option: " + e.toString()); |
| } |
| } |
| |
| /** |
| * Check that the injected exception is what we were expecting. |
| * |
| * @param throwable the throwable that was caught (by the test) |
| * @param exceptionClass the expected exception class |
| * @param desc the expected exception site description |
| */ |
| private static void assertExceptionMessage(final Throwable throwable, final Class<? extends Throwable> exceptionClass, |
| final String desc) { |
| assertTrue("Throwable was not of UserException type.", throwable instanceof UserException); |
| final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException(); |
| assertEquals("Exception class names should match.", exceptionClass.getName(), cause.getExceptionClass()); |
| assertEquals("Exception sites should match.", desc, cause.getMessage()); |
| } |
| |
| @Test |
| public void settingNoOpInjectionsAndQuery() { |
| final long before = countAllocatedMemory(); |
| |
| final String controls = Controls.newBuilder() |
| .addExceptionOnBit(getClass(), "noop", RuntimeException.class, getEndpoint(DRILLBIT_BETA)) |
| .build(); |
| setControls(controls); |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(); |
| QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener); |
| final Pair<QueryState, Exception> pair = listener.waitForCompletion(); |
| assertStateCompleted(pair, QueryState.COMPLETED); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| /** |
| * Test throwing exceptions from sites within the Foreman class, as specified by the site |
| * description |
| * |
| * @param desc site description |
| */ |
| private static void testForeman(final String desc) { |
| final String controls = Controls.newBuilder() |
| .addException(Foreman.class, desc, ForemanException.class) |
| .build(); |
| assertFailsWithException(controls, ForemanException.class, desc); |
| } |
| |
| @Test |
| @Repeat(count = NUM_RUNS) |
| public void foreman_runTryBeginning() { |
| final long before = countAllocatedMemory(); |
| |
| testForeman("run-try-beginning"); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| @Test |
| @Ignore // TODO(DRILL-3163, DRILL-3167) |
| //@Repeat(count = NUM_RUNS) |
| public void foreman_runTryEnd() { |
| final long before = countAllocatedMemory(); |
| |
| testForeman("run-try-end"); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| /** |
| * Tests can use this listener to wait, until the submitted query completes or fails, by |
| * calling #waitForCompletion. |
| */ |
| private static class WaitUntilCompleteListener implements UserResultsListener { |
| private final ExtendedLatch latch = new ExtendedLatch(1); // to signal completion |
| protected QueryId queryId = null; |
| protected volatile Pointer<Exception> ex = new Pointer<>(); |
| protected volatile QueryState state = null; |
| |
| /** |
| * Method that sets the exception if the condition is not met. |
| */ |
| protected final void check(final boolean condition, final String format, final Object... args) { |
| if (!condition) { |
| ex.value = new IllegalStateException(String.format(format, args)); |
| } |
| } |
| |
| /** |
| * Method that cancels and resumes the query, in order. |
| */ |
| protected final void cancelAndResume() { |
| Preconditions.checkNotNull(queryId); |
| final ExtendedLatch trigger = new ExtendedLatch(1); |
| (new CancellingThread(queryId, ex, trigger)).start(); |
| (new ResumingThread(queryId, ex, trigger)).start(); |
| } |
| |
| @Override |
| public void queryIdArrived(final QueryId queryId) { |
| this.queryId = queryId; |
| } |
| |
| @Override |
| public void submissionFailed(final UserException ex) { |
| this.ex.value = ex; |
| state = QueryState.FAILED; |
| latch.countDown(); |
| } |
| |
| @Override |
| public void queryCompleted(final QueryState state) { |
| this.state = state; |
| latch.countDown(); |
| } |
| |
| @Override |
| public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) { |
| result.release(); |
| } |
| |
| public final Pair<QueryState, Exception> waitForCompletion() { |
| latch.awaitUninterruptibly(); |
| return new Pair<>(state, ex.value); |
| } |
| } |
| |
| private static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener { |
| private boolean cancelRequested = false; |
| |
| @Override |
| public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) { |
| if (!cancelRequested) { |
| check(queryId != null, "Query id should not be null, since we have waited long enough."); |
| (new CancellingThread(queryId, ex, null)).start(); |
| cancelRequested = true; |
| } |
| result.release(); |
| } |
| } |
| |
| /** |
| * Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down. |
| */ |
| private static class CancellingThread extends Thread { |
| private final QueryId queryId; |
| private final Pointer<Exception> ex; |
| private final ExtendedLatch latch; |
| |
| public CancellingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) { |
| this.queryId = queryId; |
| this.ex = ex; |
| this.latch = latch; |
| } |
| |
| @Override |
| public void run() { |
| final DrillRpcFuture<Ack> cancelAck = drillClient.cancelQuery(queryId); |
| try { |
| cancelAck.checkedGet(); |
| } catch (final RpcException ex) { |
| this.ex.value = ex; |
| } |
| if (latch != null) { |
| latch.countDown(); |
| } |
| } |
| } |
| |
| /** |
| * Thread that resumes the given query id. After the latch is counted down, the resume signal is sent, until then |
| * the thread waits without interruption. |
| */ |
| private static class ResumingThread extends Thread { |
| private final QueryId queryId; |
| private final Pointer<Exception> ex; |
| private final ExtendedLatch latch; |
| |
| public ResumingThread(final QueryId queryId, final Pointer<Exception> ex, final ExtendedLatch latch) { |
| this.queryId = queryId; |
| this.ex = ex; |
| this.latch = latch; |
| } |
| |
| @Override |
| public void run() { |
| latch.awaitUninterruptibly(); |
| final DrillRpcFuture<Ack> resumeAck = drillClient.resumeQuery(queryId); |
| try { |
| resumeAck.checkedGet(); |
| } catch (final RpcException ex) { |
| this.ex.value = ex; |
| } |
| } |
| } |
| |
| /** |
| * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, |
| * this method fails if the completed state is not as expected, or if an |
| * exception is thrown. The completed state could be COMPLETED or CANCELED. |
| * This state is set when {@link WaitUntilCompleteListener#queryCompleted} is |
| * called. |
| */ |
| private static void assertStateCompleted(final Pair<QueryState, Exception> result, final QueryState expectedState) { |
| final QueryState actualState = result.getFirst(); |
| final Exception exception = result.getSecond(); |
| if (actualState != expectedState || exception != null) { |
| fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s", |
| expectedState, actualState, exception == null ? "none." : exception)); |
| } |
| } |
| |
| /** |
| * Given a set of controls, this method ensures that the given query completes with a CANCELED state. |
| */ |
| private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener, |
| final String query) { |
| setControls(controls); |
| |
| QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener); |
| final Pair<QueryState, Exception> result = listener.waitForCompletion(); |
| assertStateCompleted(result, QueryState.CANCELED); |
| } |
| |
| /** |
| * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state. |
| */ |
| private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) { |
| assertCancelledWithoutException(controls, listener, TEST_QUERY); |
| } |
| |
| @Test // To test pause and resume. Test hangs and times out if resume did not happen. |
| public void passThrough() { |
| final long before = countAllocatedMemory(); |
| |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { |
| @Override |
| public void queryIdArrived(final QueryId queryId) { |
| super.queryIdArrived(queryId); |
| final ExtendedLatch trigger = new ExtendedLatch(1); |
| (new ResumingThread(queryId, ex, trigger)).start(); |
| trigger.countDown(); |
| } |
| }; |
| |
| final String controls = Controls.newBuilder() |
| .addPause(PojoRecordReader.class, "read-next") |
| .build(); |
| setControls(controls); |
| |
| QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener); |
| final Pair<QueryState, Exception> result = listener.waitForCompletion(); |
| assertStateCompleted(result, QueryState.COMPLETED); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| // DRILL-3052: Since root fragment is waiting on data and leaf fragments are cancelled before they send any |
| // data to root, root will never run. This test will timeout if the root did not send the final state to Foreman. |
| // DRILL-2383: Cancellation TC 1: cancel before any result set is returned. |
| @Test |
| @Ignore // TODO(DRILL-3192) |
| //@Repeat(count = NUM_RUNS) |
| public void cancelWhenQueryIdArrives() { |
| final long before = countAllocatedMemory(); |
| |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { |
| |
| @Override |
| public void queryIdArrived(final QueryId queryId) { |
| super.queryIdArrived(queryId); |
| cancelAndResume(); |
| } |
| }; |
| |
| final String controls = Controls.newBuilder() |
| .addPause(FragmentExecutor.class, "fragment-running") |
| .build(); |
| assertCancelledWithoutException(controls, listener); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| @Test // DRILL-2383: Cancellation TC 2: cancel in the middle of fetching result set |
| @Repeat(count = NUM_RUNS) |
| @Ignore("DRILL-6228") |
| public void cancelInMiddleOfFetchingResults() { |
| final long before = countAllocatedMemory(); |
| |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { |
| private boolean cancelRequested = false; |
| |
| @Override |
| public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) { |
| if (!cancelRequested) { |
| check(queryId != null, "Query id should not be null, since we have waited long enough."); |
| cancelAndResume(); |
| cancelRequested = true; |
| } |
| result.release(); |
| } |
| }; |
| |
| // skip once i.e. wait for one batch, so that #dataArrived above triggers #cancelAndResume |
| final String controls = Controls.newBuilder() |
| .addPause(ScreenCreator.class, "sending-data", 1) |
| .build(); |
| assertCancelledWithoutException(controls, listener); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| |
| @Test // DRILL-2383: Cancellation TC 3: cancel after all result set are produced but not all are fetched |
| @Repeat(count = NUM_RUNS) |
| @Ignore("DRILL-6228") |
| public void cancelAfterAllResultsProduced() { |
| final long before = countAllocatedMemory(); |
| |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { |
| private int count = 0; |
| |
| @Override |
| public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) { |
| if (++count == drillbits.size()) { |
| check(queryId != null, "Query id should not be null, since we have waited long enough."); |
| cancelAndResume(); |
| } |
| result.release(); |
| } |
| }; |
| |
| final String controls = Controls.newBuilder() |
| .addPause(ScreenCreator.class, "send-complete") |
| .build(); |
| assertCancelledWithoutException(controls, listener); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| @Test // DRILL-2383: Cancellation TC 4: cancel after everything is completed and fetched |
| @Repeat(count = NUM_RUNS) |
| @Ignore("DRILL-3967") |
| public void cancelAfterEverythingIsCompleted() { |
| final long before = countAllocatedMemory(); |
| |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { |
| private int count = 0; |
| |
| @Override |
| public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) { |
| if (++count == drillbits.size()) { |
| check(queryId != null, "Query id should not be null, since we have waited long enough."); |
| cancelAndResume(); |
| } |
| result.release(); |
| } |
| }; |
| |
| final String controls = Controls.newBuilder() |
| .addPause(Foreman.class, "foreman-cleanup") |
| .build(); |
| assertCancelledWithoutException(controls, listener); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| @Test // DRILL-2383: Completion TC 1: success |
| public void successfullyCompletes() { |
| final long before = countAllocatedMemory(); |
| |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(); |
| QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener); |
| final Pair<QueryState, Exception> result = listener.waitForCompletion(); |
| assertStateCompleted(result, QueryState.COMPLETED); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| /** |
| * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc. |
| */ |
| private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass, |
| final String exceptionDesc, final String query) { |
| setControls(controls); |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(); |
| QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener); |
| final Pair<QueryState, Exception> result = listener.waitForCompletion(); |
| final QueryState state = result.getFirst(); |
| assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED); |
| assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc); |
| } |
| |
| private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass, |
| final String exceptionDesc) { |
| assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY); |
| } |
| |
| @Test // DRILL-2383: Completion TC 2: failed query - before query is executed - while sql parsing |
| public void failsWhenParsing() { |
| final long before = countAllocatedMemory(); |
| |
| final String exceptionDesc = "sql-parsing"; |
| final Class<? extends Throwable> exceptionClass = ForemanSetupException.class; |
| final String controls = Controls.newBuilder() |
| .addException(DrillSqlWorker.class, exceptionDesc, exceptionClass) |
| .build(); |
| assertFailsWithException(controls, exceptionClass, exceptionDesc); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| @Test // DRILL-2383: Completion TC 3: failed query - before query is executed - while sending fragments to other |
| // drillbits |
| public void failsWhenSendingFragments() { |
| final long before = countAllocatedMemory(); |
| |
| final String exceptionDesc = "send-fragments"; |
| final Class<? extends Throwable> exceptionClass = ForemanException.class; |
| final String controls = Controls.newBuilder() |
| .addException(FragmentsRunner.class, exceptionDesc, exceptionClass) |
| .build(); |
| assertFailsWithException(controls, exceptionClass, exceptionDesc); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| @Test // DRILL-2383: Completion TC 4: failed query - during query execution |
| public void failsDuringExecution() { |
| final long before = countAllocatedMemory(); |
| |
| final String exceptionDesc = "fragment-execution"; |
| final Class<? extends Throwable> exceptionClass = IOException.class; |
| final String controls = Controls.newBuilder() |
| .addException(FragmentExecutor.class, exceptionDesc, exceptionClass) |
| .build(); |
| assertFailsWithException(controls, exceptionClass, exceptionDesc); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| /** |
| * Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen. |
| * Specifically tests canceling fragment which has {@link MergingRecordBatch} blocked waiting for data. |
| */ |
| @Test |
| @Repeat(count = NUM_RUNS) |
| public void interruptingBlockedMergingRecordBatch() { |
| final long before = countAllocatedMemory(); |
| |
| final String control = Controls.newBuilder() |
| .addPause(MergingRecordBatch.class, "waiting-for-data", 1) |
| .build(); |
| interruptingBlockedFragmentsWaitingForData(control); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| /** |
| * Test canceling query interrupts currently blocked FragmentExecutor threads waiting for some event to happen. |
| * Specifically tests canceling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data. |
| */ |
| @Test |
| @Repeat(count = NUM_RUNS) |
| public void interruptingBlockedUnorderedReceiverBatch() { |
| final long before = countAllocatedMemory(); |
| |
| final String control = Controls.newBuilder() |
| .addPause(UnorderedReceiverBatch.class, "waiting-for-data", 1) |
| .build(); |
| interruptingBlockedFragmentsWaitingForData(control); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| private static void interruptingBlockedFragmentsWaitingForData(final String control) { |
| try { |
| setSessionOption(SLICE_TARGET, "1"); |
| setSessionOption(HASHAGG.getOptionName(), "false"); |
| |
| final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city"; |
| assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query); |
| } finally { |
| resetSessionOption(SLICE_TARGET); |
| resetSessionOption(HASHAGG.getOptionName()); |
| } |
| } |
| |
| /** |
| * Tests interrupting the fragment thread that is running {@link PartitionSenderRootExec}. |
| * {@link PartitionSenderRootExec} spawns threads for partitioner. Interrupting fragment thread should also interrupt |
| * the partitioner threads. |
| */ |
| @Test |
| @Repeat(count = NUM_RUNS) |
| public void interruptingPartitionerThreadFragment() { |
| try { |
| setSessionOption(SLICE_TARGET, "1"); |
| setSessionOption(HASHAGG.getOptionName(), "true"); |
| setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6"); |
| |
| final long before = countAllocatedMemory(); |
| |
| final String controls = Controls.newBuilder() |
| .addLatch(PartitionerDecorator.class, "partitioner-sender-latch") |
| .addPause(PartitionerDecorator.class, "wait-for-fragment-interrupt", 1) |
| .build(); |
| |
| final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city"; |
| assertCancelledWithoutException(controls, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } finally { |
| resetSessionOption(SLICE_TARGET); |
| resetSessionOption(HASHAGG.getOptionName()); |
| resetSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName()); |
| } |
| } |
| |
| @Test |
| @Ignore // TODO(DRILL-3193) |
| //@Repeat(count = NUM_RUNS) |
| public void interruptingWhileFragmentIsBlockedInAcquiringSendingTicket() { |
| final long before = countAllocatedMemory(); |
| |
| final String control = Controls.newBuilder() |
| .addPause(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1) |
| .build(); |
| assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData()); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| @Test |
| @Repeat(count = NUM_RUNS) |
| public void memoryLeaksWhenCancelled() { |
| setSessionOption(SLICE_TARGET, "10"); |
| |
| final long before = countAllocatedMemory(); |
| |
| try { |
| final String controls = Controls.newBuilder() |
| .addPause(ScreenCreator.class, "sending-data", 1) |
| .build(); |
| String query = null; |
| try { |
| query = BaseTestQuery.getFile("queries/tpch/09.sql"); |
| query = query.substring(0, query.length() - 1); // drop the ";" |
| } catch (final IOException e) { |
| fail("Failed to get query file: " + e); |
| } |
| |
| final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { |
| private volatile boolean cancelRequested = false; |
| |
| @Override |
| public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) { |
| if (!cancelRequested) { |
| check(queryId != null, "Query id should not be null, since we have waited long enough."); |
| cancelAndResume(); |
| cancelRequested = true; |
| } |
| result.release(); |
| } |
| }; |
| |
| assertCancelledWithoutException(controls, listener, query); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } finally { |
| setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT)); |
| } |
| } |
| |
| @Test |
| @Ignore // TODO(DRILL-3194) |
| //@Repeat(count = NUM_RUNS) |
| public void memoryLeaksWhenFailed() { |
| setSessionOption(SLICE_TARGET, "10"); |
| |
| final long before = countAllocatedMemory(); |
| |
| try { |
| final String exceptionDesc = "fragment-execution"; |
| final Class<? extends Throwable> exceptionClass = IOException.class; |
| final String controls = Controls.newBuilder() |
| .addException(FragmentExecutor.class, exceptionDesc, exceptionClass) |
| .build(); |
| |
| String query = null; |
| try { |
| query = BaseTestQuery.getFile("queries/tpch/09.sql"); |
| query = query.substring(0, query.length() - 1); // drop the ";" |
| } catch (final IOException e) { |
| fail("Failed to get query file: " + e); |
| } |
| |
| assertFailsWithException(controls, exceptionClass, exceptionDesc, query); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| |
| } finally { |
| setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT)); |
| } |
| } |
| |
| @Test // DRILL-3065 |
| public void failsAfterMSorterSorting() { |
| |
| // Note: must use an input table that returns more than one |
| // batch. The sort uses an optimization for single-batch inputs |
| // which bypasses the code where this partiucular fault is |
| // injected. |
| |
| final String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name"; |
| final Class<? extends Exception> typeOfException = RuntimeException.class; |
| |
| final long before = countAllocatedMemory(); |
| final String controls = Controls.newBuilder() |
| .addException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SORT, typeOfException) |
| .build(); |
| assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SORT, query); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| @Test // DRILL-3085 |
| public void failsAfterMSorterSetup() { |
| |
| // Note: must use an input table that returns more than one |
| // batch. The sort uses an optimization for single-batch inputs |
| // which bypasses the code where this partiucular fault is |
| // injected. |
| |
| final String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name"; |
| final Class<? extends Exception> typeOfException = RuntimeException.class; |
| |
| final long before = countAllocatedMemory(); |
| final String controls = Controls.newBuilder() |
| .addException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, typeOfException) |
| .build(); |
| assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, query); |
| |
| final long after = countAllocatedMemory(); |
| assertEquals(String.format("We are leaking %d bytes", after - before), before, after); |
| } |
| |
| private static long countAllocatedMemory() { |
| // wait to make sure all fragments finished cleaning up |
| try { |
| Thread.sleep(2000); |
| } catch (final InterruptedException e) { |
| // just ignore |
| } |
| |
| long allocated = 0; |
| for (final String name : drillbits.keySet()) { |
| allocated += drillbits.get(name).getContext().getAllocator().getAllocatedMemory(); |
| } |
| |
| return allocated; |
| } |
| } |