| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.core.optimizer; |
| |
| import org.apache.wayang.core.api.Configuration; |
| import org.apache.wayang.core.api.Job; |
| import org.apache.wayang.core.api.exception.WayangException; |
| import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; |
| import org.apache.wayang.core.optimizer.channels.ChannelConversionGraph; |
| import org.apache.wayang.core.optimizer.costs.EstimationContext; |
| import org.apache.wayang.core.optimizer.costs.LoadProfile; |
| import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator; |
| import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators; |
| import org.apache.wayang.core.optimizer.costs.LoadProfileToTimeConverter; |
| import org.apache.wayang.core.optimizer.costs.TimeEstimate; |
| import org.apache.wayang.core.optimizer.costs.TimeToCostConverter; |
| import org.apache.wayang.core.optimizer.enumeration.PlanEnumerationPruningStrategy; |
| import org.apache.wayang.core.plan.wayangplan.CompositeOperator; |
| import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; |
| import org.apache.wayang.core.plan.wayangplan.InputSlot; |
| import org.apache.wayang.core.plan.wayangplan.LoopHeadOperator; |
| import org.apache.wayang.core.plan.wayangplan.LoopSubplan; |
| import org.apache.wayang.core.plan.wayangplan.Operator; |
| import org.apache.wayang.core.plan.wayangplan.OperatorContainer; |
| import org.apache.wayang.core.plan.wayangplan.OutputSlot; |
| import org.apache.wayang.core.plan.wayangplan.PlanTraversal; |
| import org.apache.wayang.core.plan.wayangplan.WayangPlan; |
| import org.apache.wayang.core.plan.wayangplan.Slot; |
| import org.apache.wayang.core.platform.Platform; |
| import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; |
| import org.apache.wayang.core.util.ReflectionUtils; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.function.ToDoubleFunction; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| /** |
| * Manages contextual information required during the optimization of a {@link WayangPlan}. |
| * <p>A single {@link Operator} can have multiple contexts in a {@link WayangPlan} - namely if it appears in a loop. |
| * We manage these contexts in a hierarchical fashion.</p> |
| */ |
| public abstract class OptimizationContext { |
| |
| protected final Logger logger = LogManager.getLogger(this.getClass()); |
| |
| /** |
| * The {@link Job} whose {@link WayangPlan} is to be optimized. |
| */ |
| protected final Job job; |
| |
| /** |
| * The instance in that this instance is nested - or {@code null} if it is top-level. |
| */ |
| protected final LoopContext hostLoopContext; |
| |
| /** |
| * The iteration number of this instance within its {@link #hostLoopContext} (starting from {@code 0}) - or {@code -1} |
| * if there is no {@link #hostLoopContext}. |
| */ |
| private int iterationNumber; |
| |
| /** |
| * Forked {@link OptimizationContext}s can have a base. |
| */ |
| private final OptimizationContext base; |
| |
| /** |
| * {@link ChannelConversionGraph} used for the optimization. |
| */ |
| private final ChannelConversionGraph channelConversionGraph; |
| |
| /** |
| * {@link PlanEnumerationPruningStrategy}s to be used during optimization (in the given order). |
| */ |
| private final List<PlanEnumerationPruningStrategy> pruningStrategies; |
| |
| /** |
| * Create a new, plain instance. |
| */ |
| public OptimizationContext(Job job) { |
| this( |
| job, |
| null, |
| null, |
| -1, |
| new ChannelConversionGraph(job.getConfiguration()), |
| initializePruningStrategies(job.getConfiguration()) |
| ); |
| } |
| |
| /** |
| * Creates a new instance. Useful for testing. |
| * |
| * @param operator the single {@link Operator} of this instance |
| */ |
| public OptimizationContext(Job job, Operator operator) { |
| this( |
| job, |
| null, |
| null, |
| -1, |
| new ChannelConversionGraph(job.getConfiguration()), |
| initializePruningStrategies(job.getConfiguration()) |
| ); |
| this.addOneTimeOperator(operator); |
| } |
| |
| /** |
| * Base constructor. |
| */ |
| protected OptimizationContext(Job job, |
| OptimizationContext base, |
| LoopContext hostLoopContext, |
| int iterationNumber, |
| ChannelConversionGraph channelConversionGraph, |
| List<PlanEnumerationPruningStrategy> pruningStrategies) { |
| this.job = job; |
| this.base = base; |
| this.hostLoopContext = hostLoopContext; |
| this.iterationNumber = iterationNumber; |
| this.channelConversionGraph = channelConversionGraph; |
| this.pruningStrategies = pruningStrategies; |
| } |
| |
| /** |
| * Initializes the {@link PlanEnumerationPruningStrategy}s from the {@link Configuration}. |
| * |
| * @param configuration defines the {@link PlanEnumerationPruningStrategy}s |
| * @return a {@link List} of configured {@link PlanEnumerationPruningStrategy}s |
| */ |
| private static List<PlanEnumerationPruningStrategy> initializePruningStrategies(Configuration configuration) { |
| return configuration.getPruningStrategyClassProvider().provideAll().stream() |
| .map(strategyClass -> OptimizationUtils.createPruningStrategy(strategyClass, configuration)) |
| .collect(Collectors.toList()); |
| } |
| |
| /** |
| * Add {@link OperatorContext}s for the {@code operator} that is executed once within this instance. Also |
| * add its encased {@link Operator}s. |
| * Potentially invoke {@link #addOneTimeLoop(OperatorContext)} as well. |
| */ |
| public abstract OperatorContext addOneTimeOperator(Operator operator); |
| |
| /** |
| * Add {@link OperatorContext}s for all the contained {@link Operator}s of the {@code container}. |
| */ |
| public void addOneTimeOperators(OperatorContainer container) { |
| final CompositeOperator compositeOperator = container.toOperator(); |
| final Stream<Operator> innerOutputOperatorStream = compositeOperator.isSink() ? |
| Stream.of(container.getSink()) : |
| Arrays.stream(compositeOperator.getAllOutputs()) |
| .map(container::traceOutput) |
| .filter(Objects::nonNull) |
| .map(Slot::getOwner); |
| PlanTraversal.upstream() |
| .withCallback(this::addOneTimeOperator) |
| .traverse(innerOutputOperatorStream); |
| } |
| |
| /** |
| * Add {@link OptimizationContext}s for the {@code loop} that is executed once within this instance. |
| */ |
| public abstract void addOneTimeLoop(OperatorContext operatorContext); |
| |
| /** |
| * Return the {@link OperatorContext} of the {@code operator}. |
| * |
| * @param operator a one-time {@link Operator} (i.e., not in a nested loop) |
| * @return the {@link OperatorContext} for the {@link Operator} or {@code null} if none |
| */ |
| public abstract OperatorContext getOperatorContext(Operator operator); |
| |
| /** |
| * Retrieve the {@link LoopContext} for the {@code loopSubplan}. |
| */ |
| public abstract LoopContext getNestedLoopContext(LoopSubplan loopSubplan); |
| |
| /** |
| * @return if this instance describes an iteration within a {@link LoopSubplan}, return the number of that iteration |
| * (starting at {@code 0}); otherwise {@code -1} |
| */ |
| public int getIterationNumber() { |
| return this.iterationNumber; |
| } |
| |
| /** |
| * @return whether this instance is the first iteration within a {@link LoopContext}; this instance must be embedded |
| * in a {@link LoopContext} |
| */ |
| public boolean isInitialIteration() { |
| assert this.hostLoopContext != null : "Not within a LoopContext."; |
| return this.iterationNumber == 0; |
| } |
| |
| |
| /** |
| * @return whether this instance is the final iteration within a {@link LoopContext}; this instance must be embedded |
| * in a {@link LoopContext} |
| */ |
| public boolean isFinalIteration() { |
| assert this.hostLoopContext != null; |
| return this.iterationNumber == this.hostLoopContext.getIterationContexts().size() - 1; |
| } |
| |
| /** |
| * Retrieve the {@link LoopContext} in which this instance resides. |
| * |
| * @return the {@link LoopContext} or {@code null} if none |
| */ |
| public LoopContext getLoopContext() { |
| return this.hostLoopContext; |
| } |
| |
| /** |
| * @return if this instance describes an iteration within a {@link LoopSubplan}, return the instance in which |
| * this instance is nested |
| */ |
| public OptimizationContext getParent() { |
| return this.hostLoopContext == null ? null : this.hostLoopContext.getOptimizationContext(); |
| } |
| |
| public OptimizationContext getNextIterationContext() { |
| assert this.hostLoopContext != null : String.format("%s is the last iteration.", this); |
| assert !this.isFinalIteration(); |
| return this.hostLoopContext.getIterationContexts().get(this.iterationNumber + 1); |
| } |
| |
| /** |
| * Calls {@link OperatorContext#clearMarks()} for all nested {@link OperatorContext}s. |
| */ |
| public abstract void clearMarks(); |
| |
| public Configuration getConfiguration() { |
| return this.job.getConfiguration(); |
| } |
| |
| /** |
| * @return the {@link OperatorContext}s of this instance (exclusive of any base instance) |
| */ |
| public abstract Map<Operator, OperatorContext> getLocalOperatorContexts(); |
| |
| /** |
| * @return whether there is a {@link TimeEstimate} for each {@link ExecutionOperator} |
| */ |
| public abstract boolean isTimeEstimatesComplete(); |
| |
| public ChannelConversionGraph getChannelConversionGraph() { |
| return this.channelConversionGraph; |
| } |
| |
| public OptimizationContext getBase() { |
| return this.base; |
| } |
| |
| public abstract void mergeToBase(); |
| |
| public List<PlanEnumerationPruningStrategy> getPruningStrategies() { |
| return this.pruningStrategies; |
| } |
| |
| /** |
| * Get the top-level parent containing this instance. |
| * |
| * @return the top-level parent, which can also be this instance |
| */ |
| public OptimizationContext getRootParent() { |
| OptimizationContext optimizationContext = this; |
| while (true) { |
| final OptimizationContext parent = optimizationContext.getParent(); |
| if (parent == null) return optimizationContext; |
| optimizationContext = parent; |
| } |
| } |
| |
| /** |
| * Get the {@link DefaultOptimizationContext}s represented by this instance. |
| * |
| * @return a {@link Collection} of said {@link DefaultOptimizationContext}s |
| */ |
| public abstract List<DefaultOptimizationContext> getDefaultOptimizationContexts(); |
| |
| /** |
| * Provide the {@link Job} whose optimization is supported by this instance. |
| * |
| * @return the {@link Job} |
| */ |
| public Job getJob() { |
| return this.job; |
| } |
| |
| /** |
| * Stores a value into the {@link Job}-global cache. |
| * |
| * @param key identifies the value |
| * @param value the value |
| * @return the value previously associated with the key or else {@code null} |
| */ |
| public Object putIntoJobCache(String key, Object value) { |
| return this.getJob().getCache().put(key, value); |
| } |
| |
| /** |
| * Queries the {@link Job}-global cache. |
| * |
| * @param key that is associated with the value to be retrieved |
| * @return the value associated with the key or else {@code null} |
| */ |
| public Object queryJobCache(String key) { |
| return this.getJob().getCache().get(key); |
| } |
| |
| /** |
| * Queries the {@link Job}-global cache. |
| * |
| * @param key that is associated with the value to be retrieved |
| * @param resultClass the expected {@link Class} of the retrieved value |
| * @return the value associated with the key or else {@code null} |
| */ |
| public <T> T queryJobCache(String key, Class<T> resultClass) { |
| final Object value = this.queryJobCache(key); |
| try { |
| return resultClass.cast(value); |
| } catch (ClassCastException e) { |
| throw new WayangException("Job-cache value cannot be casted as requested.", e); |
| } |
| } |
| |
| /** |
| * Represents a single optimization context of an {@link Operator}. This can be thought of as a single, virtual |
| * execution of the {@link Operator}. |
| */ |
| public class OperatorContext implements EstimationContext { |
| |
| /** |
| * The {@link Operator} that is being decorated with this instance. |
| */ |
| private final Operator operator; |
| |
| /** |
| * {@link CardinalityEstimate}s that align with the {@link #operator}s {@link InputSlot}s and |
| * {@link OutputSlot}s. |
| */ |
| private final CardinalityEstimate[] inputCardinalities, outputCardinalities; |
| |
| /** |
| * Used to mark changed {@link #inputCardinalities} and {@link #outputCardinalities}. |
| */ |
| private final boolean[] inputCardinalityMarkers, outputCardinalityMarkers; |
| |
| /** |
| * {@link LoadProfile} of the {@link Operator}. |
| */ |
| private LoadProfile loadProfile; |
| |
| /** |
| * {@link TimeEstimate} for the {@link ExecutionOperator}. |
| */ |
| protected TimeEstimate timeEstimate; |
| |
| /** |
| * Cost estimate for the {@link ExecutionOperator}. |
| */ |
| private ProbabilisticDoubleInterval costEstimate; |
| |
| /** |
| * The squashed version of the {@link #costEstimate}. |
| */ |
| private double squashedCostEstimate; |
| |
| /** |
| * Reflects the number of executions of the {@link #operator}. This, e.g., relevant in {@link LoopContext}s. |
| */ |
| private int numExecutions = 1; |
| |
| /** |
| * In case this instance corresponds to an execution, this field keeps track of this execution. |
| */ |
| private ExecutionLineageNode lineage; |
| |
| /** |
| * Creates a new instance. |
| */ |
| protected OperatorContext(Operator operator) { |
| this.operator = operator; |
| this.inputCardinalities = new CardinalityEstimate[this.operator.getNumInputs()]; |
| this.inputCardinalityMarkers = new boolean[this.inputCardinalities.length]; |
| this.outputCardinalities = new CardinalityEstimate[this.operator.getNumOutputs()]; |
| this.outputCardinalityMarkers = new boolean[this.outputCardinalities.length]; |
| } |
| |
| public Operator getOperator() { |
| return this.operator; |
| } |
| |
| public CardinalityEstimate getOutputCardinality(int outputIndex) { |
| return this.outputCardinalities[outputIndex]; |
| } |
| |
| public CardinalityEstimate getInputCardinality(int inputIndex) { |
| return this.inputCardinalities[inputIndex]; |
| } |
| |
| public boolean isInputMarked(int inputIndex) { |
| return this.inputCardinalityMarkers[inputIndex]; |
| } |
| |
| public boolean isOutputMarked(int outputIndex) { |
| return this.outputCardinalityMarkers[outputIndex]; |
| } |
| |
| /** |
| * Resets the marks for all {@link InputSlot}s and {@link OutputSlot}s. |
| */ |
| public void clearMarks() { |
| Arrays.fill(this.inputCardinalityMarkers, false); |
| Arrays.fill(this.outputCardinalityMarkers, false); |
| } |
| |
| @Override |
| public CardinalityEstimate[] getInputCardinalities() { |
| return this.inputCardinalities; |
| } |
| |
| @Override |
| public CardinalityEstimate[] getOutputCardinalities() { |
| return this.outputCardinalities; |
| } |
| |
| /** |
| * Sets the {@link CardinalityEstimate} for a certain {@link OutputSlot}. If the {@link CardinalityEstimate} |
| * changes, the {@link OutputSlot} is marked. |
| */ |
| public void setInputCardinality(int inputIndex, CardinalityEstimate cardinality) { |
| this.inputCardinalityMarkers[inputIndex] |= !Objects.equals(this.inputCardinalities[inputIndex], cardinality); |
| if (OptimizationContext.this.logger.isDebugEnabled() && this.inputCardinalityMarkers[inputIndex]) { |
| OptimizationContext.this.logger.debug( |
| "Changing cardinality of {} from {} to {}.", |
| this.operator.getInput(inputIndex), |
| this.inputCardinalities[inputIndex], |
| cardinality |
| ); |
| } |
| this.inputCardinalities[inputIndex] = cardinality; |
| |
| // Invalidate dependent estimate caches. |
| this.timeEstimate = null; |
| this.costEstimate = null; |
| } |
| |
| /** |
| * Sets the {@link CardinalityEstimate} for a certain {@link InputSlot}. If the {@link CardinalityEstimate} |
| * changes, the {@link InputSlot} is marked. |
| */ |
| public void setOutputCardinality(int outputIndex, CardinalityEstimate cardinality) { |
| this.outputCardinalityMarkers[outputIndex] |= !Objects.equals(this.outputCardinalities[outputIndex], cardinality); |
| if (OptimizationContext.this.logger.isDebugEnabled() && this.outputCardinalityMarkers[outputIndex]) { |
| OptimizationContext.this.logger.debug( |
| "Changing cardinality of {} from {} to {}.", |
| this.operator.getOutput(outputIndex), |
| this.outputCardinalities[outputIndex], |
| cardinality |
| ); |
| } |
| this.outputCardinalities[outputIndex] = cardinality; |
| |
| // Invalidate dependent estimate caches. |
| this.timeEstimate = null; |
| this.costEstimate = null; |
| } |
| |
| /** |
| * Push forward all marked {@link CardinalityEstimate}s within the same {@link OptimizationContext}. |
| * |
| * @see Operator#propagateOutputCardinality(int, OperatorContext) |
| */ |
| public void pushCardinalitiesForward() { |
| OptimizationContext targetContext = this.getOptimizationContext(); |
| for (int outputIndex = 0; outputIndex < this.outputCardinalities.length; outputIndex++) { |
| this.pushCardinalityForward(outputIndex, targetContext); |
| } |
| } |
| |
| /** |
| * Pushes the {@link CardinalityEstimate} corresponding to the {@code outputIndex} forward to the |
| * {@code targetContext} if it is marked. |
| */ |
| public void pushCardinalityForward(int outputIndex, OptimizationContext targetContext) { |
| if (!this.outputCardinalityMarkers[outputIndex]) return; |
| this.operator.propagateOutputCardinality(outputIndex, this, targetContext); |
| } |
| |
| @Override |
| public double getDoubleProperty(String propertyKey, double fallback) { |
| try { |
| return ReflectionUtils.toDouble(ReflectionUtils.getProperty(this.operator, propertyKey)); |
| } catch (Exception e) { |
| logger.error("Could not retrieve property \"{}\" from {}.", propertyKey, this.operator, e); |
| return fallback; |
| } |
| } |
| |
| @Override |
| public Collection<String> getPropertyKeys() { |
| return this.operator.getEstimationContextProperties(); |
| } |
| |
| /** |
| * @return the {@link OptimizationContext} in which this instance resides |
| */ |
| public OptimizationContext getOptimizationContext() { |
| return OptimizationContext.this; |
| } |
| |
| /** |
| * Update the {@link LoadProfile} and {@link TimeEstimate} of this instance. |
| */ |
| public void updateCostEstimate() { |
| this.updateCostEstimate(this.getOptimizationContext().getConfiguration()); |
| } |
| |
| /** |
| * Update the {@link LoadProfile} and {@link TimeEstimate} of this instance. |
| * |
| * @param configuration provides the necessary functions |
| */ |
| private void updateCostEstimate(Configuration configuration) { |
| if (!this.operator.isExecutionOperator()) return; |
| |
| // Estimate the LoadProfile. |
| final LoadProfileEstimator loadProfileEstimator = this.getLoadProfileEstimator(); |
| try { |
| this.loadProfile = LoadProfileEstimators.estimateLoadProfile(this, loadProfileEstimator); |
| } catch (Exception e) { |
| throw new WayangException(String.format("Load profile estimation for %s failed.", this.operator), e); |
| } |
| |
| // Calculate the TimeEstimate. |
| final ExecutionOperator executionOperator = (ExecutionOperator) this.operator; |
| final Platform platform = executionOperator.getPlatform(); |
| final LoadProfileToTimeConverter timeConverter = configuration.getLoadProfileToTimeConverterProvider().provideFor(platform); |
| this.timeEstimate = TimeEstimate.MINIMUM.plus(timeConverter.convert(this.loadProfile)); |
| if (OptimizationContext.this.logger.isDebugEnabled()) { |
| OptimizationContext.this.logger.debug( |
| "Setting time estimate of {} to {}.", this.operator, this.timeEstimate |
| ); |
| } |
| |
| // Calculate the cost estimate. |
| final TimeToCostConverter timeToCostConverter = configuration.getTimeToCostConverterProvider().provideFor(platform); |
| this.costEstimate = timeToCostConverter.convertWithoutFixCosts(this.timeEstimate); |
| |
| // Squash the cost estimate. |
| final ToDoubleFunction<ProbabilisticDoubleInterval> costSquasher = configuration.getCostSquasherProvider().provide(); |
| this.squashedCostEstimate = costSquasher.applyAsDouble(this.costEstimate); |
| } |
| |
| /** |
| * Get the {@link LoadProfileEstimator} for the {@link ExecutionOperator} in this instance. |
| * |
| * @return the {@link LoadProfileEstimator} |
| */ |
| public LoadProfileEstimator getLoadProfileEstimator() { |
| if (!(this.operator instanceof ExecutionOperator)) { |
| return null; |
| } |
| return this.getOptimizationContext().getConfiguration() |
| .getOperatorLoadProfileEstimatorProvider() |
| .provideFor((ExecutionOperator) this.operator); |
| } |
| |
| /** |
| * Merges {@code that} instance into this instance. The lineage is not merged, though. |
| * |
| * @param that the other instance |
| * @return this instance |
| */ |
| protected OperatorContext merge(OperatorContext that) { |
| assert this.operator == that.operator; |
| assert this.inputCardinalities.length == that.inputCardinalities.length; |
| assert this.outputCardinalities.length == that.outputCardinalities.length; |
| |
| System.arraycopy(that.inputCardinalities, 0, this.inputCardinalities, 0, that.inputCardinalities.length); |
| System.arraycopy(that.inputCardinalityMarkers, 0, this.inputCardinalityMarkers, 0, that.inputCardinalityMarkers.length); |
| System.arraycopy(that.outputCardinalities, 0, this.outputCardinalities, 0, that.outputCardinalities.length); |
| System.arraycopy(that.outputCardinalityMarkers, 0, this.outputCardinalityMarkers, 0, that.outputCardinalityMarkers.length); |
| |
| this.loadProfile = that.loadProfile; |
| this.timeEstimate = that.timeEstimate; |
| this.costEstimate = that.costEstimate; |
| this.squashedCostEstimate = that.squashedCostEstimate; |
| this.numExecutions = that.numExecutions; |
| |
| return this; |
| } |
| |
| /** |
| * Increases the {@link CardinalityEstimate}, {@link TimeEstimate}, and {@link ProbabilisticDoubleInterval cost estimate} |
| * of this instance by those of the given instance. |
| * |
| * @param that the increment |
| */ |
| public void increaseBy(OperatorContext that) { |
| assert this.operator.equals(that.operator); |
| this.addTo(this.inputCardinalities, that.inputCardinalities); |
| this.addTo(this.inputCardinalityMarkers, that.inputCardinalityMarkers); |
| this.addTo(this.outputCardinalities, that.outputCardinalities); |
| this.addTo(this.outputCardinalityMarkers, that.outputCardinalityMarkers); |
| if (that.costEstimate != null) { |
| if (this.costEstimate == null) { |
| this.loadProfile = that.loadProfile; |
| this.timeEstimate = that.timeEstimate; |
| this.costEstimate = that.costEstimate; |
| this.squashedCostEstimate = that.squashedCostEstimate; |
| } else { |
| this.loadProfile = this.loadProfile.plus(that.loadProfile); |
| this.timeEstimate = this.timeEstimate.plus(that.timeEstimate); |
| this.costEstimate = this.costEstimate.plus(that.costEstimate); |
| this.squashedCostEstimate += that.squashedCostEstimate; |
| } |
| } |
| this.numExecutions += that.numExecutions; |
| } |
| |
| /** |
| * Adds up {@link CardinalityEstimate}s. |
| * |
| * @param aggregate the accumulator |
| * @param delta the increment |
| */ |
| private void addTo(CardinalityEstimate[] aggregate, CardinalityEstimate[] delta) { |
| assert aggregate.length == delta.length; |
| for (int i = 0; i < aggregate.length; i++) { |
| CardinalityEstimate aggregateCardinality = aggregate[i]; |
| CardinalityEstimate deltaCardinality = delta[i]; |
| if (aggregateCardinality == null) { |
| aggregate[i] = deltaCardinality; |
| } else if (deltaCardinality != null) { |
| aggregate[i] = aggregateCardinality.plus(deltaCardinality); |
| } |
| } |
| } |
| |
| private void addTo(boolean[] aggregate, boolean[] delta) { |
| assert aggregate.length == delta.length; |
| for (int i = 0; i < aggregate.length; i++) { |
| aggregate[i] |= delta[i]; |
| } |
| } |
| |
| public void setNumExecutions(int numExecutions) { |
| this.numExecutions = numExecutions; |
| } |
| |
| public int getNumExecutions() { |
| return this.numExecutions; |
| } |
| |
| public LoadProfile getLoadProfile() { |
| if (this.loadProfile == null) { |
| this.updateCostEstimate(); |
| } |
| return this.loadProfile; |
| } |
| |
| public TimeEstimate getTimeEstimate() { |
| if (this.timeEstimate == null) { |
| this.updateCostEstimate(); |
| } |
| return this.timeEstimate; |
| } |
| |
| /** |
| * Get the estimated costs incurred by this instance (without fix costs). |
| * |
| * @return the cost estimate |
| */ |
| public ProbabilisticDoubleInterval getCostEstimate() { |
| if (this.costEstimate == null) { |
| this.updateCostEstimate(); |
| } |
| return this.costEstimate; |
| } |
| |
| /** |
| * Get the squashed estimated costs incurred by this instance (without fix costs). |
| * |
| * @return the squashed cost estimate |
| */ |
| public double getSquashedCostEstimate() { |
| if (this.costEstimate == null) { |
| this.updateCostEstimate(); |
| } |
| return this.squashedCostEstimate; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%s[%s]", this.getClass().getSimpleName(), this.getOperator()); |
| } |
| |
| /** |
| * Resets the estimates of this instance. |
| */ |
| public void resetEstimates() { |
| Arrays.fill(this.inputCardinalities, null); |
| Arrays.fill(this.inputCardinalityMarkers, false); |
| Arrays.fill(this.outputCardinalities, null); |
| Arrays.fill(this.outputCardinalityMarkers, false); |
| this.loadProfile = null; |
| this.timeEstimate = null; |
| this.costEstimate = null; |
| this.squashedCostEstimate = 0d; |
| } |
| } |
| |
| /** |
| * Maintains {@link OptimizationContext}s for the iterations of a {@link LoopSubplan}. |
| */ |
| public class LoopContext { |
| |
| private final OperatorContext loopSubplanContext; |
| |
| private final List<OptimizationContext> iterationContexts; |
| |
| private AggregateOptimizationContext aggregateOptimizationContext; |
| |
| protected LoopContext(OperatorContext loopSubplanContext) { |
| assert loopSubplanContext.getOptimizationContext() == OptimizationContext.this; |
| assert loopSubplanContext.getOperator() instanceof LoopSubplan; |
| |
| this.loopSubplanContext = loopSubplanContext; |
| |
| LoopSubplan loop = (LoopSubplan) loopSubplanContext.getOperator(); |
| final int numIterationContexts = loop.getNumExpectedIterations() + 1; |
| this.iterationContexts = new ArrayList<>(numIterationContexts); |
| for (int iterationNumber = 0; iterationNumber < numIterationContexts; iterationNumber++) { |
| this.iterationContexts.add( |
| new DefaultOptimizationContext(loop, this, iterationNumber) |
| ); |
| } |
| } |
| |
| public OperatorContext getLoopSubplanContext() { |
| return this.loopSubplanContext; |
| } |
| |
| /** |
| * Retrieves the iteration {@link OptimizationContext}s. |
| * <p> |
| * <p>Note that for {@code n} iterations, there are |
| * {@code n+1} {@link OptimizationContext}s because the {@link LoopHeadOperator} is triggered {@code n+1} times. |
| * The first {@code n} represent the iterations, the final represents the final state of the loop, in which |
| * only the {@link LoopHeadOperator} is run the last time.</p> |
| * |
| * @return the {@link OptimizationContext} for each iteration; order by execution order |
| */ |
| public List<OptimizationContext> getIterationContexts() { |
| return this.iterationContexts; |
| } |
| |
| public OptimizationContext getIterationContext(int iteration) { |
| return this.iterationContexts.get(iteration); |
| } |
| |
| /** |
| * @return the {@link OptimizationContext} in that the {@link LoopSubplan} resides |
| */ |
| public OptimizationContext getOptimizationContext() { |
| return this.getLoopSubplanContext().getOptimizationContext(); |
| } |
| |
| public OptimizationContext getInitialIterationContext() { |
| return this.iterationContexts.get(0); |
| } |
| |
| public OptimizationContext getFinalIterationContext() { |
| return this.iterationContexts.get(this.iterationContexts.size() - 1); |
| } |
| |
| /** |
| * Add a new iteration {@link OptimizationContext} between second-to-last and final iteration. |
| * |
| * @return the added {@link OptimizationContext} |
| */ |
| public OptimizationContext appendIterationContext() { |
| // Shift the final iteration context by one. |
| final OptimizationContext finalIterationContext = this.getFinalIterationContext(); |
| this.iterationContexts.add(finalIterationContext); |
| finalIterationContext.iterationNumber++; |
| |
| // Copy the second-to-last iteration context. |
| OptimizationContext oldSecondToLastIterationContext = this.getIterationContext(this.iterationContexts.size() - 3); |
| OptimizationContext newSecondToLastIterationContext = ((DefaultOptimizationContext) oldSecondToLastIterationContext).copy(); |
| newSecondToLastIterationContext.iterationNumber++; |
| |
| // Insert and return the copied iteration context. |
| this.iterationContexts.set(iterationContexts.size() - 2, newSecondToLastIterationContext); |
| return newSecondToLastIterationContext; |
| } |
| |
| public LoopSubplan getLoop() { |
| return (LoopSubplan) this.loopSubplanContext.getOperator(); |
| } |
| |
| public AggregateOptimizationContext getAggregateContext() { |
| if (this.aggregateOptimizationContext == null) { |
| this.aggregateOptimizationContext = new AggregateOptimizationContext(this); |
| } |
| return this.aggregateOptimizationContext; |
| } |
| } |
| |
| |
| } |