blob: 10a06d2fd84a00acd7d176403bd3849e9b3d92e6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.combine;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
/**
* This test mimic the behavior of combining slow operators, where operation is not done by the timeout. When the
* combine operator returns, test whether the slow operators are properly interrupted, and if all the slow operators are
* not running in order to safely release the segment references.
*/
@SuppressWarnings("rawtypes")
public class CombineSlowOperatorsTest {
private static final int NUM_OPERATORS = 10;
private static final int NUM_THREADS = 2;
private static final long TIMEOUT_MS = 100L;
private ExecutorService _executorService;
@BeforeClass
public void setUp() {
_executorService = Executors.newFixedThreadPool(NUM_THREADS);
}
@Test
public void testSelectionOnlyCombineOperator() {
List<Operator> operators = getOperators();
QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
SelectionOnlyCombineOperator combineOperator =
new SelectionOnlyCombineOperator(operators, queryContext, _executorService);
testCombineOperator(operators, combineOperator);
}
// NOTE: Skip the test for SelectionOrderByCombineOperator because it requires SelectionOrderByOperator for the early
// termination optimization.
@Test
public void testAggregationOnlyCombineOperator() {
List<Operator> operators = getOperators();
QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM testTable");
queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
AggregationCombineOperator combineOperator =
new AggregationCombineOperator(operators, queryContext, _executorService);
testCombineOperator(operators, combineOperator);
}
@Test
public void testGroupByOrderByCombineOperator() {
List<Operator> operators = getOperators();
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM testTable GROUP BY column");
queryContext.setEndTimeMs(System.currentTimeMillis() + TIMEOUT_MS);
GroupByCombineOperator combineOperator = new GroupByCombineOperator(operators, queryContext, _executorService);
testCombineOperator(operators, combineOperator);
}
@Test
public void testCancelSelectionOnlyCombineOperator() {
// Just need to wait for one operator to start running.
CountDownLatch ready = new CountDownLatch(1);
List<Operator> operators = getOperators(ready, null);
QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
queryContext.setEndTimeMs(System.currentTimeMillis() + 10000);
SelectionOnlyCombineOperator combineOperator =
new SelectionOnlyCombineOperator(operators, queryContext, _executorService);
testCancelCombineOperator(combineOperator, ready, "Cancelled while merging results blocks");
}
@Test
public void testCancelSelectionOrderByCombineOperator() {
CountDownLatch ready = new CountDownLatch(1);
List<Operator> operators = getOperators(ready, null);
QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY column");
queryContext.setEndTimeMs(System.currentTimeMillis() + 10000);
SelectionOrderByCombineOperator combineOperator =
new SelectionOrderByCombineOperator(operators, queryContext, _executorService);
testCancelCombineOperator(combineOperator, ready, "Cancelled while merging results blocks");
}
@Test
public void testCancelMinMaxValueBasedSelectionOrderByCombineOperator() {
CountDownLatch ready = new CountDownLatch(1);
List<Operator> operators = getOperators(ready, () -> {
IndexSegment seg = mock(IndexSegment.class);
DataSource ds = mock(DataSource.class);
DataSourceMetadata dsmd = mock(DataSourceMetadata.class);
when(dsmd.getMinValue()).thenReturn(100L);
when(dsmd.getMaxValue()).thenReturn(200L);
when(seg.getDataSource(anyString())).thenReturn(ds);
when(ds.getDataSourceMetadata()).thenReturn(dsmd);
return seg;
});
QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY column");
queryContext.setEndTimeMs(System.currentTimeMillis() + 10000);
SelectionOrderByCombineOperator combineOperator =
new SelectionOrderByCombineOperator(operators, queryContext, _executorService);
testCancelCombineOperator(combineOperator, ready, "Cancelled while merging results blocks");
}
@Test
public void testCancelAggregationOnlyCombineOperator() {
CountDownLatch ready = new CountDownLatch(1);
List<Operator> operators = getOperators(ready, null);
QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM testTable");
queryContext.setEndTimeMs(System.currentTimeMillis() + 10000);
AggregationCombineOperator combineOperator =
new AggregationCombineOperator(operators, queryContext, _executorService);
testCancelCombineOperator(combineOperator, ready, "Cancelled while merging results blocks");
}
@Test
public void testCancelGroupByOrderByCombineOperator() {
CountDownLatch ready = new CountDownLatch(1);
List<Operator> operators = getOperators(ready, null);
QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM testTable GROUP BY column");
queryContext.setEndTimeMs(System.currentTimeMillis() + 10000);
GroupByCombineOperator combineOperator = new GroupByCombineOperator(operators, queryContext, _executorService);
testCancelCombineOperator(combineOperator, ready, "Cancelled while merging results blocks");
}
private void testCancelCombineOperator(BaseCombineOperator combineOperator, CountDownLatch ready, String errMsg) {
AtomicReference<Exception> exp = new AtomicReference<>();
ExecutorService combineExecutor = Executors.newSingleThreadExecutor();
try {
Future<?> future = combineExecutor.submit(() -> {
try {
return combineOperator.nextBlock();
} catch (Exception e) {
exp.set(e);
throw e;
}
});
ready.await();
// At this point, the combineOperator is or will be waiting on future.get() for all sub operators, and the
// waiting can be cancelled as below.
future.cancel(true);
} catch (Exception e) {
Assert.fail();
} finally {
combineExecutor.shutdownNow();
}
TestUtils.waitForCondition((aVoid) -> exp.get() instanceof QueryCancelledException, 10_000,
"Should have been cancelled");
assertEquals(exp.get().getMessage(), errMsg);
}
/**
* NOTE: It is hard to test the logger behavior, but only one error message about the query timeout should be logged
* for each query.
*/
private void testCombineOperator(List<Operator> operators, BaseOperator combineOperator) {
BaseResultsBlock intermediateResultsBlock = (BaseResultsBlock) combineOperator.nextBlock();
List<ProcessingException> processingExceptions = intermediateResultsBlock.getProcessingExceptions();
assertNotNull(processingExceptions);
assertEquals(processingExceptions.size(), 1);
assertTrue(processingExceptions.get(0).getMessage().contains(TimeoutException.class.getName()));
// When the CombineOperator returns, all operators should be either not scheduled or interrupted, and no operator
// should be running so that the segment references can be safely released
for (Operator operator : operators) {
SlowOperator slowOperator = (SlowOperator) operator;
assertFalse(slowOperator._operationInProgress.get());
assertFalse(slowOperator._notInterrupted.get());
}
}
@AfterClass
public void tearDown() {
_executorService.shutdown();
}
private List<Operator> getOperators() {
return getOperators(null, null);
}
private List<Operator> getOperators(CountDownLatch ready, Supplier<IndexSegment> segmentSupplier) {
List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
for (int i = 0; i < NUM_OPERATORS; i++) {
operators.add(new SlowOperator(ready, segmentSupplier));
}
return operators;
}
private static class SlowOperator extends BaseOperator {
private static final String EXPLAIN_NAME = "SLOW";
final AtomicBoolean _operationInProgress = new AtomicBoolean();
final AtomicBoolean _notInterrupted = new AtomicBoolean();
private final CountDownLatch _ready;
private final Supplier<IndexSegment> _segmentSupplier;
public SlowOperator(CountDownLatch ready, Supplier<IndexSegment> segmentSupplier) {
_ready = ready;
_segmentSupplier = segmentSupplier;
}
@Override
protected Block getNextBlock() {
_operationInProgress.set(true);
if (_ready != null) {
_ready.countDown();
}
try {
Thread.sleep(3_600_000L);
} catch (InterruptedException e) {
// Thread should be interrupted for early-termination
throw new EarlyTerminationException();
} finally {
// Wait for 100 milliseconds before marking the operation done
try {
Thread.sleep(100L);
_operationInProgress.set(false);
} catch (InterruptedException e) {
// Thread should not be interrupted again, and we should be able to mark the operation done
}
}
_notInterrupted.set(true);
return null;
}
@Override
public String toExplainString() {
return EXPLAIN_NAME;
}
@Override
public List<Operator> getChildOperators() {
return Collections.emptyList();
}
@Override
public ExecutionStatistics getExecutionStatistics() {
return new ExecutionStatistics(0, 0, 0, 0);
}
@Override
public IndexSegment getIndexSegment() {
if (_segmentSupplier != null) {
return _segmentSupplier.get();
}
return super.getIndexSegment();
}
}
}