/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
 */

package org.apache.pinot.core.plan;

import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.combine.AggregationCombineOperator;
import org.apache.pinot.core.operator.combine.BaseCombineOperator;
import org.apache.pinot.core.operator.combine.CombineOperatorUtils;
import org.apache.pinot.core.operator.combine.DistinctCombineOperator;
import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator;
import org.apache.pinot.core.operator.combine.SelectionOrderByCombineOperator;
import org.apache.pinot.core.operator.streaming.StreamingSelectionOnlyCombineOperator;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.core.util.trace.TraceCallable;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.trace.InvocationRecording;
import org.apache.pinot.spi.trace.InvocationScope;
import org.apache.pinot.spi.trace.Tracing;

/**
* The <code>CombinePlanNode</code> class provides the execution plan for combining results from multiple segments.
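 * <p>
 * Minimal usage sketch (the local variables here are illustrative placeholders supplied by the caller, not part of
 * this class):
 * <pre>{@code
 * // planNodes, queryContext and executorService are assumed to be built by the caller;
 * // the stream observer may be null for non-streaming queries.
 * CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, executorService, null);
 * BaseCombineOperator combineOperator = combinePlanNode.run();
 * }</pre>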
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class CombinePlanNode implements PlanNode {
  // Try to schedule 10 plans for each thread, or evenly distribute the plans across at most
  // MAX_NUM_THREADS_PER_QUERY threads
  private static final int TARGET_NUM_PLANS_PER_THREAD = 10;

  private final List<PlanNode> _planNodes;
private final QueryContext _queryContext;
private final ExecutorService _executorService;
  private final StreamObserver<Server.ServerResponse> _streamObserver;

  /**
* Constructor for the class.
*
* @param planNodes List of underlying plan nodes
* @param queryContext Query context
* @param executorService Executor service
* @param streamObserver Optional stream observer for streaming query
*/
public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService,
@Nullable StreamObserver<Server.ServerResponse> streamObserver) {
_planNodes = planNodes;
_queryContext = queryContext;
_executorService = executorService;
_streamObserver = streamObserver;
  }

  @Override
public BaseCombineOperator run() {
try (InvocationScope ignored = Tracing.getTracer().createScope(CombinePlanNode.class)) {
return getCombineOperator();
}
  }

  private BaseCombineOperator getCombineOperator() {
InvocationRecording recording = Tracing.activeRecording();
int numPlanNodes = _planNodes.size();
recording.setNumChildren(numPlanNodes);
List<Operator> operators = new ArrayList<>(numPlanNodes);
if (numPlanNodes <= TARGET_NUM_PLANS_PER_THREAD) {
// Small number of plan nodes, run them sequentially
for (PlanNode planNode : _planNodes) {
operators.add(planNode.run());
}
} else {
// Large number of plan nodes, run them in parallel
      // NOTE: Even if the executor has only a single thread, still run the plans on a separate thread so that the
      //       timeout can be honored
int maxExecutionThreads = _queryContext.getMaxExecutionThreads();
if (maxExecutionThreads <= 0) {
maxExecutionThreads = CombineOperatorUtils.MAX_NUM_THREADS_PER_QUERY;
}
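      // numTasks = ceil(numPlanNodes / TARGET_NUM_PLANS_PER_THREAD), capped at maxExecutionThreads.
      // E.g. 95 plan nodes with an 8-thread cap gives min(ceil(95 / 10), 8) = 8 tasks.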
int numTasks =
Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) / TARGET_NUM_PLANS_PER_THREAD, maxExecutionThreads);
recording.setNumTasks(numTasks);
      // Use a Phaser to ensure all the Futures are done (i.e. each one is either never scheduled, finished, or
      // interrupted) before the main thread returns. No execution may be left running after the main thread returns
      // because the main thread holds the reference to the segments; if the segments are deleted/refreshed, they can
      // be released after the main thread returns, leading to undefined behavior (even a JVM crash) when queries
      // execute against them.
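      // The main thread counts as the initial registered party (hence the constructor argument of 1).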
Phaser phaser = new Phaser(1);
// Submit all jobs
Future[] futures = new Future[numTasks];
for (int i = 0; i < numTasks; i++) {
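        // Capture the loop variable in an effectively final local so the anonymous TraceCallable below can use it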
int index = i;
futures[i] = _executorService.submit(new TraceCallable<List<Operator>>() {
@Override
public List<Operator> callJob() {
try {
              // Register the thread to the phaser.
              // If the phaser is terminated (register() returns a negative value), the query execution has timed
              // out, and the main thread has already deregistered itself and returned the result. Return early, as
              // the execution result will not be consumed.
if (phaser.register() < 0) {
return Collections.emptyList();
}
List<Operator> operators = new ArrayList<>();
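              // Round-robin assignment: task `index` runs plan nodes index, index + numTasks, index + 2 * numTasks,
              // ... so the plans are spread evenly across the tasks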
for (int i = index; i < numPlanNodes; i += numTasks) {
operators.add(_planNodes.get(i).run());
}
return operators;
} finally {
phaser.arriveAndDeregister();
}
}
});
}
// Get all results
try {
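        // Each get() waits only for the time remaining until the query deadline, so the overall wait is bounded by
        // the query timeout rather than growing with the number of tasks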
for (Future future : futures) {
List<Operator> ops = (List<Operator>) future.get(_queryContext.getEndTimeMs() - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
operators.addAll(ops);
}
} catch (Exception e) {
        // Future.get() wraps execution failures in an ExecutionException; check the cause to determine whether the
        // failure was caused by a bad query request
Throwable cause = e.getCause();
if (cause instanceof BadQueryRequestException) {
throw (BadQueryRequestException) cause;
} else if (e instanceof InterruptedException) {
throw new QueryCancelledException("Cancelled while running CombinePlanNode", e);
} else {
throw new RuntimeException("Caught exception while running CombinePlanNode.", e);
}
} finally {
// Cancel all ongoing jobs
for (Future future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
        // Deregister the main thread and wait for all worker threads to be done
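        // (Once all parties have deregistered, the phaser terminates, which is what makes register() return a
        // negative value in the workers above.)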
phaser.awaitAdvance(phaser.arriveAndDeregister());
}
}
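    // Pick the combine operator based on the query type: streaming selection, aggregation (with or without
    // group-by), selection (with or without order-by), or distinct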
if (_streamObserver != null && QueryContextUtils.isSelectionOnlyQuery(_queryContext)) {
      // Streaming query (only selection-only queries are supported)
return new StreamingSelectionOnlyCombineOperator(operators, _queryContext, _executorService, _streamObserver);
}
if (QueryContextUtils.isAggregationQuery(_queryContext)) {
if (_queryContext.getGroupByExpressions() == null) {
// Aggregation only
return new AggregationCombineOperator(operators, _queryContext, _executorService);
} else {
// Aggregation group-by
return new GroupByCombineOperator(operators, _queryContext, _executorService);
}
} else if (QueryContextUtils.isSelectionQuery(_queryContext)) {
if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) {
// Selection only
return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService);
} else {
// Selection order-by
return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService);
}
} else {
assert QueryContextUtils.isDistinctQuery(_queryContext);
return new DistinctCombineOperator(operators, _queryContext, _executorService);
}
}
}