/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.costs;

import org.apache.flink.optimizer.dag.EstimateProvider;

/**
* A default cost estimator that has access to basic size and cardinality estimates.
*
* <p>This estimator works with actual estimates (as far as they are available) and falls back to
* setting relative costs, if no estimates are available. That way, the estimator makes sure that
* plans with different strategies are costed differently, also in the absence of estimates. The
* different relative costs in the absence of estimates represent this estimator's heuristic
* guidance towards certain strategies.
*
* <p>For robustness reasons, we always assume that the whole data is shipped during a repartition
* step. We deviate from the typical estimate of <code>(n - 1) / n</code> (with <i>n</i> being the
* number of nodes), because for a parallelism of 1, that would yield a shipping of zero bytes.
 * While this is usually correct, the runtime scheduling may still choose to move tasks to
 * different nodes, so we cannot be certain that no data is shipped.
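 *
 * <p>For example, with a parallelism of <i>n</i> = 4, the typical estimate would charge only
 * 3/4 of the data volume for a repartition; this estimator charges the full volume instead.
 *
 * <p>A minimal usage sketch (hypothetical wiring, assuming {@code Costs} has a no-argument
 * constructor; within the optimizer this class is invoked internally during plan enumeration):
 *
 * <pre>{@code
 * CostEstimator estimator = new DefaultCostEstimator();
 * Costs costs = new Costs();
 * // node is any DAG node implementing EstimateProvider
 * estimator.addHashPartitioningCost(node, costs);
 * }</pre>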
*/
public class DefaultCostEstimator extends CostEstimator {
    /**
     * The base for the estimation of all relative costs. We heuristically pick a very large data
     * volume, which will favor strategies that are less expensive on large data volumes. This is
     * robust and biases the optimizer towards strategies that remain cheap at scale.
     */
private static final long HEURISTIC_COST_BASE = 1000000000L;
    // The numbers for the CPU effort are rather magic at the moment and should be seen as
    // ordinal (relative) rather than absolute measures
private static final float MATERIALIZATION_CPU_FACTOR = 1;
private static final float HASHING_CPU_FACTOR = 4;
private static final float SORTING_CPU_FACTOR = 9;
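    // Illustrative comparison (ordinal reading only): for the same s bytes, materialization is
    // charged 1 * s CPU units, hashing 4 * s, and sorting 9 * s, so hash-based strategies are
    // preferred over sort-based ones when all other estimates are equal.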
// --------------------------------------------------------------------------------------------
// Shipping Strategy Cost
// --------------------------------------------------------------------------------------------
@Override
public void addRandomPartitioningCost(EstimateProvider estimates, Costs costs) {
        // conservative estimate: we need to ship the whole data over the network to establish the
// partitioning. no disk costs.
final long estOutShipSize = estimates.getEstimatedOutputSize();
if (estOutShipSize <= 0) {
costs.setNetworkCost(Costs.UNKNOWN);
} else {
costs.addNetworkCost(estOutShipSize);
}
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
}
@Override
public void addHashPartitioningCost(EstimateProvider estimates, Costs costs) {
        // conservative estimate: we need to ship the whole data over the network to establish the
// partitioning. no disk costs.
final long estOutShipSize = estimates.getEstimatedOutputSize();
if (estOutShipSize <= 0) {
costs.setNetworkCost(Costs.UNKNOWN);
} else {
costs.addNetworkCost(estOutShipSize);
}
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
}
@Override
public void addRangePartitionCost(EstimateProvider estimates, Costs costs) {
final long dataSize = estimates.getEstimatedOutputSize();
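        // Worked example (illustrative numbers, not from the original source): for an estimated
        // dataSize of 1 GB, the 10% sample is 100 MB, yielding 1.1 GB of network traffic and
        // 200 MB of disk I/O for spilling and re-reading the sample.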
        if (dataSize > 0) {
            // Assume sampling of 10% of the data and spilling it to disk
            final long sampled = (long) (dataSize * 0.1f);
            // set shipping costs: the data itself plus the shipped sample
            costs.addNetworkCost(dataSize + sampled);
            // the sample is written to disk once and read back once
            costs.addDiskCost(2 * sampled);
        } else {
            costs.setNetworkCost(Costs.UNKNOWN);
            costs.setDiskCost(Costs.UNKNOWN);
        }
        // regardless of the estimates, apply the same sampling assumption to the heuristic costs
final long sampled = (long) (HEURISTIC_COST_BASE * 0.1f);
costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE + sampled);
costs.addHeuristicDiskCost(2 * sampled);
}
@Override
public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
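        // Worked example (illustrative numbers, not from the original source): broadcasting an
        // estimated 1 GB output to replicationFactor = 4 receivers is charged as 4 GB of
        // network traffic.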
        // a replication factor that is not positive makes no sense for a broadcast
        if (replicationFactor <= 0) {
            throw new IllegalArgumentException("The replication factor must be larger than zero.");
        }

        // assumption: we need to ship the whole data over the network to each node.
        final long estOutShipSize = estimates.getEstimatedOutputSize();
        if (estOutShipSize <= 0) {
            costs.setNetworkCost(Costs.UNKNOWN);
        } else {
            costs.addNetworkCost(replicationFactor * estOutShipSize);
        }
        costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 10 * replicationFactor);
}
// --------------------------------------------------------------------------------------------
// Local Strategy Cost
// --------------------------------------------------------------------------------------------
@Override
public void addFileInputCost(long fileSizeInBytes, Costs costs) {
if (fileSizeInBytes >= 0) {
costs.addDiskCost(fileSizeInBytes);
} else {
costs.setDiskCost(Costs.UNKNOWN);
}
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE);
}
@Override
public void addLocalSortCost(EstimateProvider estimates, Costs costs) {
final long s = estimates.getEstimatedOutputSize();
// we assume a two phase merge sort, so all in all 2 I/O operations per block
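        // (write the sorted runs once, read them back once during the merge, hence the factor 2)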
if (s <= 0) {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
} else {
costs.addDiskCost(2 * s);
costs.addCpuCost((long) (s * SORTING_CPU_FACTOR));
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * SORTING_CPU_FACTOR));
}
@Override
public void addLocalMergeCost(
EstimateProvider input1, EstimateProvider input2, Costs costs, int costWeight) {
        // costs nothing. the very rarely incurred cost for a spilling block nested loops join
        // in the presence of massively re-occurring duplicate keys is ignored, because it
        // cannot be assessed
}
@Override
public void addHybridHashCosts(
EstimateProvider buildSideInput,
EstimateProvider probeSideInput,
Costs costs,
int costWeight) {
long bs = buildSideInput.getEstimatedOutputSize();
long ps = probeSideInput.getEstimatedOutputSize();
if (bs > 0 && ps > 0) {
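            // assumed I/O pattern: in the spilling case the build side is written once and read
            // back once (hence 2 * bs), while the probe side is streamed over a single time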
long overall = 2 * bs + ps;
costs.addDiskCost(overall);
costs.addCpuCost((long) (overall * HASHING_CPU_FACTOR));
} else {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.addHeuristicCpuCost((long) (2 * HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
// cost weight applies to everything
costs.multiplyWith(costWeight);
}
/**
     * Calculates the costs for the cached variant of the hybrid hash join. We assume by default
     * that half of the cached hash table fits into memory.
*/
@Override
public void addCachedHybridHashCosts(
EstimateProvider buildSideInput,
EstimateProvider probeSideInput,
Costs costs,
int costWeight) {
if (costWeight < 1) {
throw new IllegalArgumentException("The cost weight must be at least one.");
}
long bs = buildSideInput.getEstimatedOutputSize();
long ps = probeSideInput.getEstimatedOutputSize();
if (bs > 0 && ps > 0) {
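            // assumed I/O pattern: the build side is materialized once (written and re-read,
            // hence 2 * bs), while the probe side is streamed once per use of the cached
            // table, i.e. costWeight times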
long overall = 2 * bs + costWeight * ps;
costs.addDiskCost(overall);
costs.addCpuCost((long) (overall * HASHING_CPU_FACTOR));
} else {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
}
        // one time the build side plus cost-weight times the probe side
costs.addHeuristicDiskCost((1 + costWeight) * HEURISTIC_COST_BASE);
costs.addHeuristicCpuCost(
(long) ((1 + costWeight) * HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
}
@Override
public void addStreamedNestedLoopsCosts(
EstimateProvider outerSide,
EstimateProvider innerSide,
long bufferSize,
Costs costs,
int costWeight) {
long is = innerSide.getEstimatedOutputSize();
long oc = outerSide.getEstimatedNumRecords();
if (is > 0 && oc >= 0) {
// costs, if the inner side cannot be cached
if (is > bufferSize) {
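                // the inner side does not fit into the buffer, so it is re-read from disk for
                // every outer record: oc records times is bytes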
costs.addDiskCost(oc * is);
}
costs.addCpuCost((long) (oc * is * MATERIALIZATION_CPU_FACTOR));
} else {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
}
// hack: assume 100k loops (should be expensive enough)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 100000);
costs.addHeuristicCpuCost(
(long) (HEURISTIC_COST_BASE * 100000 * MATERIALIZATION_CPU_FACTOR));
costs.multiplyWith(costWeight);
}
@Override
public void addBlockNestedLoopsCosts(
EstimateProvider outerSide,
EstimateProvider innerSide,
long blockSize,
Costs costs,
int costWeight) {
long is = innerSide.getEstimatedOutputSize();
long os = outerSide.getEstimatedOutputSize();
if (is > 0 && os > 0) {
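            // the outer side is consumed in blocks of blockSize bytes; each block triggers one
            // full scan of the inner side, giving roughly os / blockSize passes over is bytes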
long loops = Math.max(os / blockSize, 1);
costs.addDiskCost(loops * is);
costs.addCpuCost((long) (loops * is * MATERIALIZATION_CPU_FACTOR));
} else {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
}
// hack: assume 1k loops (much cheaper than the streamed variant!)
costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 1000);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 1000 * MATERIALIZATION_CPU_FACTOR));
costs.multiplyWith(costWeight);
}
// --------------------------------------------------------------------------------------------
// Damming Cost
// --------------------------------------------------------------------------------------------
@Override
public void addArtificialDamCost(EstimateProvider estimates, long bufferSize, Costs costs) {
final long s = estimates.getEstimatedOutputSize();
// we assume spilling and re-reading
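        // (the dammed data is written once and read back once, hence 2 * s bytes of disk I/O)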
if (s <= 0) {
costs.setDiskCost(Costs.UNKNOWN);
costs.setCpuCost(Costs.UNKNOWN);
} else {
costs.addDiskCost(2 * s);
            costs.addCpuCost((long) (s * MATERIALIZATION_CPU_FACTOR));
}
costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * MATERIALIZATION_CPU_FACTOR));
}
}