blob: 6d68bff3f7562fdf265be41517d98882db394ef0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.List;
import org.apache.impala.analysis.Expr;
import org.apache.impala.thrift.TDataSink;
import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanRootSink;
import org.apache.impala.thrift.TQueryOptions;
import com.google.common.base.Preconditions;
/**
 * Sink for the root of a query plan that produces result rows. Allows coordination
 * between the sender, which produces those rows, and the consumer, which sends them to
 * the client, even though both execute concurrently.
 */
public class PlanRootSink extends DataSink {
  // Default estimate (10 MiB) of the memory consumed by result spooling. Only used when
  // the cardinality or the average row size of the fragment's root node is unknown
  // (-1). 10 MiB should be sufficient to buffer the results of most queries. See
  // IMPALA-4268 for details on how this value was chosen.
  private static final long DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY = 10 * 1024 * 1024;

  // One expression per result column for the query.
  private final List<Expr> outputExprs_;

  /**
   * Creates a root sink whose result rows are produced by 'outputExprs'.
   *
   * @param outputExprs one expression per result column; must not be null. The list is
   *     stored by reference, not copied.
   * @throws IllegalStateException if 'outputExprs' is null
   */
  public PlanRootSink(List<Expr> outputExprs) {
    Preconditions.checkState(outputExprs != null, "outputExprs must not be null");
    outputExprs_ = outputExprs;
  }

  /**
   * Appends the "PLAN-ROOT SINK" header line to 'output'. At EXTENDED explain level or
   * above, also appends a detail line listing the output expressions.
   */
  @Override
  public void appendSinkExplainString(String prefix, String detailPrefix,
      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) {
    output.append(prefix).append("PLAN-ROOT SINK\n");
    if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
      output.append(detailPrefix)
          .append("output exprs: ")
          .append(Expr.getExplainString(outputExprs_, explainLevel))
          .append('\n');
    }
  }

  /** Returns the label used for this sink: "ROOT". */
  @Override
  protected String getLabel() {
    return "ROOT";
  }

  /**
   * Computes the {@link ResourceProfile} for this sink and stores it in
   * {@code resourceProfile_}.
   *
   * <p>If result spooling (SPOOL_QUERY_RESULTS) is disabled, the profile has no
   * reservation and an estimated memory consumption of 0. Without result spooling no rows
   * get buffered and only a single RowBatch is passed to the client at a time; because
   * RowBatch memory is currently unreserved, no reservation is necessary.
   *
   * <p>If result spooling is enabled, the profile is populated as follows:
   * <ul>
   *   <li>spillable buffer size: the DEFAULT_SPILLABLE_BUFFER_SIZE query option.</li>
   *   <li>max row buffer size: computed from the default spillable buffer size and the
   *       MAX_ROW_SIZE query option via
   *       {@link PlanNode#computeMaxSpillableBufferSize}.</li>
   *   <li>minimum reservation: 2x the default spillable buffer size, to account for the
   *       read page and the write page of the BufferedTupleStream used by the backend
   *       plan root sink.</li>
   *   <li>maximum reservation: the MAX_RESULT_SPOOLING_MEM query option, but never less
   *       than the minimum reservation.</li>
   *   <li>estimated memory: the cardinality of the fragment's root node (at least 1)
   *       multiplied by its average row size, rounded up. If either statistic is unknown
   *       (-1), DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY is used instead. The estimate is
   *       capped at the maximum reservation.</li>
   * </ul>
   */
  @Override
  public void computeResourceProfile(TQueryOptions queryOptions) {
    if (!queryOptions.isSpool_query_results()) {
      resourceProfile_ = ResourceProfile.noReservation(0);
      return;
    }

    long bufferSize = queryOptions.getDefault_spillable_buffer_size();
    long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
        bufferSize, queryOptions.getMax_row_size());
    // One read page plus one write page of the spooling tuple stream.
    // NOTE(review): 'maxRowBufferSize' may exceed 'bufferSize' when MAX_ROW_SIZE is
    // large, yet this minimum only covers two default-sized pages; confirm the backend
    // can spool such large rows with this reservation.
    long minMemReservationBytes = 2 * bufferSize;
    // The configured limit may not drop below what the sink needs to operate at all.
    long maxMemReservationBytes = Math.max(
        queryOptions.getMax_result_spooling_mem(), minMemReservationBytes);

    PlanNode inputNode = fragment_.getPlanRoot();
    long memEstimateBytes;
    if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
      // Statistics are missing; fall back to a fixed default.
      memEstimateBytes = DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY;
    } else {
      // Treat an empty input as one row so that the estimate is never 0.
      long inputCardinality = Math.max(1L, inputNode.getCardinality());
      memEstimateBytes = (long) Math.ceil(inputCardinality * inputNode.getAvgRowSize());
    }
    // The sink never buffers beyond its maximum reservation.
    memEstimateBytes = Math.min(memEstimateBytes, maxMemReservationBytes);

    resourceProfile_ = new ResourceProfileBuilder()
        .setMemEstimateBytes(memEstimateBytes)
        .setMinMemReservationBytes(minMemReservationBytes)
        .setMaxMemReservationBytes(maxMemReservationBytes)
        .setMaxRowBufferBytes(maxRowBufferSize)
        .setSpillableBufferBytes(bufferSize)
        .build();
  }

  /**
   * Serializes this sink into 'tsink': sets the plan root sink (carrying the computed
   * resource profile) and the Thrift form of the output expressions.
   */
  @Override
  protected void toThriftImpl(TDataSink tsink) {
    TPlanRootSink tPlanRootSink = new TPlanRootSink(resourceProfile_.toThrift());
    tsink.setPlan_root_sink(tPlanRootSink);
    tsink.setOutput_exprs(Expr.treesToThrift(outputExprs_));
  }

  /** Returns {@link TDataSinkType#PLAN_ROOT_SINK}. */
  @Override
  protected TDataSinkType getSinkType() {
    return TDataSinkType.PLAN_ROOT_SINK;
  }

  /** Adds all output expressions of this sink to 'exprs'. */
  @Override
  public void collectExprs(List<Expr> exprs) {
    exprs.addAll(outputExprs_);
  }
}