blob: 010affcba36ee534a80a629dcc0f58273b0dfb06 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import com.google.common.base.Preconditions;
import com.google.common.math.LongMath;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TQueryOptions;
import java.math.RoundingMode;
import java.util.List;
import java.util.function.Supplier;
/**
 * A base class that encapsulates processing cost, which models the total cost or amount
 * of work shared across all instances of a specific {@link PlanNode}, {@link DataSink},
 * or {@link PlanFragment}.
 * <p>
 * Besides the abstract total cost, every instance tracks the number of rows it consumes
 * and produces, an optional label, and an optional supplier that overrides the number
 * of instances expected to share the work.
 */
public abstract class ProcessingCost implements Cloneable {
  /**
   * Creates a placeholder cost built from a cardinality of -1.
   * NOTE(review): presumably {@link #isValid()} returns false for it; confirm against
   * BaseProcessingCost.
   */
  public static ProcessingCost invalid() { return new BaseProcessingCost(-1, 1, 0); }

  /** Creates a base cost built from a cardinality of 0. */
  public static ProcessingCost zero() { return new BaseProcessingCost(0, 1, 0); }

  /**
   * Returns whichever of the two costs has the larger total cost. The chosen instance
   * itself is returned (not a copy); 'a' wins on ties.
   */
  public static ProcessingCost maxCost(ProcessingCost a, ProcessingCost b) {
    return (a.getTotalCost() >= b.getTotalCost()) ? a : b;
  }

  /** Combines two costs into a new {@link SumProcessingCost}. */
  public static ProcessingCost sumCost(ProcessingCost a, ProcessingCost b) {
    return new SumProcessingCost(a, b);
  }

  /** Wraps 'cost' and 'factor' into a new {@link ScaledProcessingCost}. */
  public static ProcessingCost scaleCost(ProcessingCost cost, long factor) {
    return new ScaledProcessingCost(cost, factor);
  }

  /**
   * Wraps 'cost' and an instance count supplier into a new
   * {@link BroadcastProcessingCost}.
   */
  public static ProcessingCost broadcastCost(
      ProcessingCost cost, Supplier<Integer> numInstanceSupplier) {
    return new BroadcastProcessingCost(cost, numInstanceSupplier);
  }

  /**
   * Tries to adjust the expected instance count of 'consumer' relative to 'producer'.
   * <p>
   * If the producer has a positive cost per row produced and the consumer can either be
   * reduced or increased by 'nodeStepCount', the consumer's instance count becomes the
   * producer's instance count scaled by the consumer/producer cost ratio, rounded up to
   * a multiple of 'nodeStepCount', and clamped to [minParallelism, maxParallelism].
   * Otherwise, the consumer is only capped at 'maxParallelism' if it currently exceeds
   * it.
   *
   * @param nodeStepCount granularity used when stepping the instance count.
   * @param minParallelism lower bound for the adjusted instance count.
   * @param maxParallelism upper bound for the adjusted instance count.
   * @param producer cost of the producing side; not modified.
   * @param consumer cost of the consuming side; its instance count may be replaced.
   */
  protected static void tryAdjustConsumerParallelism(int nodeStepCount,
      int minParallelism, int maxParallelism, ProcessingCost producer,
      ProcessingCost consumer) {
    Preconditions.checkState(consumer.getNumInstancesExpected() > 0);
    Preconditions.checkState(producer.getNumInstancesExpected() > 0);
    if (producer.getCostPerRowProduced() > 0
        && (consumer.canReducedBy(nodeStepCount, minParallelism, producer)
            || (consumer.canIncreaseBy(nodeStepCount, maxParallelism, producer)))) {
      // Adjust consumer's concurrency following producer's parallelism and their
      // produce-consume rate ratio.
      float consProdRatio = consumer.consumerProducerRatio(producer);
      int stepsNeeded = (int) Math.ceil(
          consProdRatio * producer.getNumInstancesExpected() / nodeStepCount);
      // Multiply in long: with a large ratio, 'stepsNeeded * nodeStepCount' can exceed
      // the int range and wrap around (possibly to a negative value), which would
      // wrongly pick minParallelism in the clamp below.
      long adjustedCount = (long) stepsNeeded * nodeStepCount;
      // The clamped value always lies within the int bounds given by the caller
      // (assuming minParallelism <= maxParallelism), so narrowing is lossless.
      final int finalCount =
          (int) Math.max(minParallelism, Math.min(maxParallelism, adjustedCount));
      consumer.setNumInstanceExpected(() -> finalCount);
    } else if (maxParallelism < consumer.getNumInstancesExpected()) {
      consumer.setNumInstanceExpected(() -> maxParallelism);
    }
  }

  /** Builds a {@link BaseProcessingCost}, treating negative cardinality as 0. */
  private static ProcessingCost computeValidBaseCost(
      long cardinality, float exprsCost, float materializationCost) {
    return new BaseProcessingCost(
        Math.max(0, cardinality), exprsCost, materializationCost);
  }

  /**
   * Creates a labeled base cost. Negative 'cardinality' is treated as 0.
   */
  public static ProcessingCost basicCost(
      String label, long cardinality, float exprsCost, float materializationCost) {
    ProcessingCost processingCost =
        computeValidBaseCost(cardinality, exprsCost, materializationCost);
    processingCost.setLabel(label);
    return processingCost;
  }

  /**
   * Creates a labeled base cost with a materialization cost of 0. Negative
   * 'cardinality' is treated as 0.
   */
  public static ProcessingCost basicCost(
      String label, long cardinality, float exprsCost) {
    ProcessingCost processingCost = computeValidBaseCost(cardinality, exprsCost, 0);
    processingCost.setLabel(label);
    return processingCost;
  }

  /**
   * Adds two longs, returning Long.MAX_VALUE or Long.MIN_VALUE instead of wrapping
   * around when the exact sum does not fit in a long.
   */
  private static long saturatedAdd(long a, long b) {
    long sum = a + b;
    // Overflow happened iff both operands have the same sign and the sum's sign differs.
    if (((a ^ sum) & (b ^ sum)) < 0) {
      return (a < 0) ? Long.MIN_VALUE : Long.MAX_VALUE;
    }
    return sum;
  }

  /**
   * Merge multiple ProcessingCost into a single new ProcessingCost.
   * <p>
   * The resulting ProcessingCost will have the total cost, number of rows produced,
   * and number of rows consumed as a sum of respective properties of all ProcessingCost
   * in the given list. Row counts saturate at Long.MAX_VALUE rather than overflowing.
   * Meanwhile, the number of instances expected is the maximum among all ProcessingCost
   * in the list (and at least 1).
   *
   * @param costs list of all ProcessingCost to merge. Must not be null or empty.
   * @return A new combined ProcessingCost.
   */
  protected static ProcessingCost fullMergeCosts(List<ProcessingCost> costs) {
    Preconditions.checkNotNull(costs);
    Preconditions.checkArgument(!costs.isEmpty());

    ProcessingCost resultingCost = ProcessingCost.zero();
    long inputCardinality = 0;
    long outputCardinality = 0;
    int maxProducerParallelism = 1;
    for (ProcessingCost cost : costs) {
      resultingCost = ProcessingCost.sumCost(resultingCost, cost);
      // A wrapped (negative) sum would be silently clamped to 0 by the setters below,
      // so saturate instead.
      inputCardinality = saturatedAdd(inputCardinality, cost.getNumRowToConsume());
      outputCardinality = saturatedAdd(outputCardinality, cost.getNumRowToProduce());
      maxProducerParallelism =
          Math.max(maxProducerParallelism, cost.getNumInstancesExpected());
    }
    resultingCost.setNumRowToConsume(inputCardinality);
    resultingCost.setNumRowToProduce(outputCardinality);
    final int finalProducerParallelism = maxProducerParallelism;
    resultingCost.setNumInstanceExpected(() -> finalProducerParallelism);
    return resultingCost;
  }

  // Supplies the overridden number of instances; null until setNumInstanceExpected()
  // is called.
  protected Supplier<Integer> numInstanceSupplier_ = null;

  // Row counts; never negative since the setters clamp them at 0.
  private long numRowToProduce_ = 0;
  private long numRowToConsume_ = 0;

  // Optional label printed by debugString().
  private String label_ = null;

  // Track whether the respective setter has been called at least once.
  private boolean isSetNumRowToProduce_ = false;
  private boolean isSetNumRowToConsume_ = false;

  /** Returns the total cost modeled by this object. */
  public abstract long getTotalCost();

  /** Returns whether this cost is valid. */
  public abstract boolean isValid();

  @Override
  public abstract ProcessingCost clone();

  /**
   * Returns a single-line, space separated summary of this cost: total cost, maximum
   * instances, adjusted instances (only if an adjusted count is set), cost per
   * instance, consumed:produced row counts, and the reduction / per-row costs for
   * whichever row counts have been explicitly set.
   */
  public String getDetails() {
    StringBuilder output = new StringBuilder();
    output.append("cost-total=")
        .append(getTotalCost())
        .append(" max-instances=")
        .append(getNumInstanceMax());
    if (hasAdjustedInstanceCount()) {
      output.append(" adj-instances=").append(getNumInstancesExpected());
    }
    output.append(" cost/inst=")
        .append(getPerInstanceCost())
        .append(" #cons:#prod=")
        .append(numRowToConsume_)
        .append(":")
        .append(numRowToProduce_);
    if (isSetNumRowToConsume_ && isSetNumRowToProduce_) {
      output.append(" reduction=").append(getReduction());
    }
    if (isSetNumRowToConsume_) {
      output.append(" cost/cons=").append(getCostPerRowConsumed());
    }
    if (isSetNumRowToProduce_) {
      output.append(" cost/prod=").append(getCostPerRowProduced());
    }
    return output.toString();
  }

  /** Returns {@link #toString()}, prefixed with "label=" if a label has been set. */
  public String debugString() {
    StringBuilder output = new StringBuilder();
    if (label_ != null) {
      output.append(label_);
      output.append("=");
    }
    output.append(this);
    return output.toString();
  }

  /** Returns {@link #getDetails()} wrapped in curly braces. */
  @Override
  public String toString() {
    return "{" + getDetails() + "}";
  }

  /**
   * Returns {@link #getDetails()} prefixed with 'detailPrefix'. This base
   * implementation does not use 'fullExplain'.
   */
  public String getExplainString(String detailPrefix, boolean fullExplain) {
    return detailPrefix + getDetails();
  }

  /**
   * Overrides the number of instances expected to share this cost.
   *
   * @param countSupplier supplier of the instance count. It is evaluated once here to
   *     validate that it currently yields a value greater than 0, and evaluated again
   *     on each later read.
   */
  public void setNumInstanceExpected(Supplier<Integer> countSupplier) {
    Preconditions.checkArgument(
        countSupplier.get() > 0, "Number of instance must be greater than 0!");
    numInstanceSupplier_ = countSupplier;
  }

  /**
   * Returns the supplied instance count if one has been set and is positive, otherwise
   * falls back to {@link #getNumInstanceMax()}.
   */
  public int getNumInstancesExpected() {
    return hasAdjustedInstanceCount() ? numInstanceSupplier_.get() : getNumInstanceMax();
  }

  /** True if an instance count supplier is set and currently yields a positive value. */
  private boolean hasAdjustedInstanceCount() {
    return numInstanceSupplier_ != null && numInstanceSupplier_.get() > 0;
  }

  /** Same as {@link #getNumInstanceMax(int)} with a single node. */
  protected int getNumInstanceMax() { return getNumInstanceMax(1); }

  /**
   * Returns the total cost divided by the backend's minimum processing per thread
   * (rounded up), then rounded up to a multiple of 'numNodes'.
   */
  protected int getNumInstanceMax(int numNodes) {
    long maxParallelism = LongMath.divide(getTotalCost(),
        BackendConfig.INSTANCE.getMinProcessingPerThread(), RoundingMode.CEILING);
    return roundUpNumNodeMultiple(maxParallelism, numNodes);
  }

  /**
   * Rounds 'parallelism' up to the nearest multiple of 'numNodes' and fits the result
   * into a positive int.
   *
   * @return 1 if the rounded value is not positive; Integer.MAX_VALUE floored to a
   *     multiple of 'numNodes' if the rounded value exceeds the int range; the rounded
   *     value otherwise.
   */
  protected static int roundUpNumNodeMultiple(long parallelism, int numNodes) {
    // Anything above Integer.MAX_VALUE ends up floored to the same value below, so cap
    // first. This keeps the multiplication from overflowing long when 'parallelism' is
    // close to Long.MAX_VALUE, which would otherwise wrap negative and yield 1.
    long cappedParallelism = Math.min(parallelism, Integer.MAX_VALUE);
    // Round up to the nearest multiple of numNodes.
    // Little over-parallelize is better than under-parallelize.
    long maxParallelism =
        LongMath.divide(cappedParallelism, numNodes, RoundingMode.CEILING) * numNodes;
    if (maxParallelism <= 0) {
      maxParallelism = 1;
    } else if (maxParallelism > Integer.MAX_VALUE) {
      // Floor Integer.MAX_VALUE to the nearest multiple of numNodes.
      maxParallelism = Integer.MAX_VALUE - (Integer.MAX_VALUE % numNodes);
    }
    return (int) maxParallelism;
  }

  /**
   * Set num rows to produce.
   *
   * @param numRowToProduce Number of rows to produce by plan node or data sink associated
   *     with this cost. Assume 0 rows if negative value is given.
   */
  public void setNumRowToProduce(long numRowToProduce) {
    numRowToProduce_ = Math.max(0, numRowToProduce);
    isSetNumRowToProduce_ = true;
  }

  /**
   * Set num rows to consume.
   *
   * @param numRowToConsume Number of rows to consume by plan node or data sink associated
   *     with this cost. Assume 0 rows if negative value is given.
   */
  protected void setNumRowToConsume(long numRowToConsume) {
    numRowToConsume_ = Math.max(0, numRowToConsume);
    isSetNumRowToConsume_ = true;
  }

  /** Sets the label printed by {@link #debugString()}. May be null. */
  public void setLabel(String label) { label_ = label; }

  /** Returns the number of rows to consume (0 if never set). */
  public long getNumRowToConsume() { return numRowToConsume_; }

  /** Returns the number of rows to produce (0 if never set). */
  public long getNumRowToProduce() { return numRowToProduce_; }

  /**
   * Returns the total cost divided by the expected number of instances, rounded up.
   * Computed in exact long arithmetic so that large costs are neither rounded by a
   * float conversion nor truncated to the int range.
   */
  private long getPerInstanceCost() {
    int numInstances = getNumInstancesExpected();
    Preconditions.checkState(numInstances > 0);
    return LongMath.divide(getTotalCost(), numInstances, RoundingMode.CEILING);
  }

  /** Rows consumed per row produced; the divisor is at least 1. */
  private float getReduction() {
    return (float) numRowToConsume_ / Math.max(1, numRowToProduce_);
  }

  /** Total cost per row produced; the divisor is at least 1. */
  private float getCostPerRowProduced() {
    return (float) getTotalCost() / Math.max(1, numRowToProduce_);
  }

  /** Total cost per row consumed; the divisor is at least 1. */
  private float getCostPerRowConsumed() {
    return (float) getTotalCost() / Math.max(1, numRowToConsume_);
  }

  /** Ratio of this cost's expected instance count over the other's. */
  private float instanceRatio(ProcessingCost other) {
    Preconditions.checkState(getNumInstancesExpected() > 0);
    return (float) getNumInstancesExpected() / other.getNumInstancesExpected();
  }

  /**
   * Ratio of this (consumer) cost per row consumed over the other (producer) cost per
   * row produced. The producer's per-row cost is treated as at least 1.
   */
  private float consumerProducerRatio(ProcessingCost other) {
    return getCostPerRowConsumed() / Math.max(1, other.getCostPerRowProduced());
  }

  /**
   * True if removing 'nodeStepCount' instances would either drop below
   * 'minParallelism' or push the instance ratio below the consumer/producer cost ratio.
   */
  private boolean isAtLowestInstanceRatio(
      int nodeStepCount, int minParallelism, ProcessingCost other) {
    if (getNumInstancesExpected() - nodeStepCount < minParallelism) {
      return true;
    } else {
      float lowerRatio = (float) (getNumInstancesExpected() - nodeStepCount)
          / other.getNumInstancesExpected();
      return lowerRatio < consumerProducerRatio(other);
    }
  }

  /**
   * True if adding 'nodeStepCount' instances would either exceed 'maxInstance' or push
   * the instance ratio above the consumer/producer cost ratio.
   */
  private boolean isAtHighestInstanceRatio(
      int nodeStepCount, int maxInstance, ProcessingCost other) {
    if (getNumInstancesExpected() + nodeStepCount > maxInstance) {
      return true;
    } else {
      float higherRatio = (float) (getNumInstancesExpected() + nodeStepCount)
          / other.getNumInstancesExpected();
      return higherRatio > consumerProducerRatio(other);
    }
  }

  /**
   * True if this cost is not yet at its lowest instance ratio and its cost ratio
   * against 'other' is smaller than its current instance ratio.
   */
  private boolean canReducedBy(
      int nodeStepCount, int minParallelism, ProcessingCost other) {
    return !isAtLowestInstanceRatio(nodeStepCount, minParallelism, other)
        && consumerProducerRatio(other) < instanceRatio(other);
  }

  /**
   * True if this cost is not yet at its highest instance ratio and its cost ratio
   * against 'other' is larger than its current instance ratio.
   */
  private boolean canIncreaseBy(
      int nodeStepCount, int maxInstance, ProcessingCost other) {
    return !isAtHighestInstanceRatio(nodeStepCount, maxInstance, other)
        && consumerProducerRatio(other) > instanceRatio(other);
  }
}