| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.wayang.core.optimizer.cardinality; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.stream.Stream; |
| import org.apache.commons.lang3.Validate; |
| import org.apache.wayang.core.api.Configuration; |
| import org.apache.wayang.core.optimizer.OptimizationContext; |
| import org.apache.wayang.core.plan.wayangplan.InputSlot; |
| import org.apache.wayang.core.plan.wayangplan.Operator; |
| import org.apache.wayang.core.plan.wayangplan.OperatorContainer; |
| import org.apache.wayang.core.plan.wayangplan.OutputSlot; |
| import org.apache.wayang.core.plan.wayangplan.PlanTraversal; |
| import org.apache.wayang.core.util.OneTimeExecutable; |
| import org.apache.wayang.core.util.WayangCollections; |
| |
| /** |
| * {@link CardinalityEstimator} that subsumes a DAG of operators, each one providing a local {@link CardinalityEstimator}. |
| */ |
| public class CardinalityEstimationTraversal { |
| |
| private final Collection<Activation> inputActivations; |
| |
| private final Collection<? extends Activator> sourceActivators; |
| |
| /** |
| * Create an instance that pushes {@link CardinalityEstimate}s through a data flow plan starting at the given |
| * {@code inputSlots} and {@code sourceOperators}, thereby putting {@link CardinalityEstimate}s into the |
| * {@code cache}. |
| * |
| * @param operatorContainer that should be traversed |
| * @param configuration provides utilities for the estimation |
| */ |
| public static CardinalityEstimationTraversal createPushTraversal(OperatorContainer operatorContainer, Configuration configuration) { |
| if (operatorContainer.isSource()) { |
| return createPushTraversal( |
| Collections.emptyList(), |
| Collections.singleton(operatorContainer.getSource()), |
| configuration |
| ); |
| } else { |
| return createPushTraversal( |
| operatorContainer.getMappedInputs(), |
| Collections.emptyList(), |
| configuration |
| ); |
| } |
| } |
| |
| /** |
| * Create an instance that pushes {@link CardinalityEstimate}s through a data flow plan starting at the given |
| * {@code inputSlots} and {@code sourceOperators}, thereby putting {@link CardinalityEstimate}s into the |
| * {@code cache}. |
| * |
| * @param inputSlots open {@link InputSlot}s that will be initially activated |
| * @param sourceOperators {@link Operator} that will be initially activated |
| * @param configuration provides utilties for the estimation |
| */ |
| public static CardinalityEstimationTraversal createPushTraversal(Collection<InputSlot<?>> inputSlots, |
| Collection<Operator> sourceOperators, |
| Configuration configuration) { |
| return createPushTraversal(inputSlots, Collections.emptySet(), sourceOperators, configuration); |
| } |
| |
| |
| /** |
| * Create an instance that pushes {@link CardinalityEstimate}s through a data flow plan starting at the given |
| * {@code inputSlots} and {@code sourceOperators}, thereby putting {@link CardinalityEstimate}s into the |
| * {@code cache}. |
| * |
| * @param inputSlots open {@link InputSlot}s that will be initially activated |
| * @param borderInputSlots that will not be followed; they are terminal in addition to {@link OutputSlot}s that |
| * have no occupied {@link InputSlot}s |
| * @param sourceOperators {@link Operator} that will be initially activated |
| * @param configuration provides utilties for the estimation |
| */ |
| public static CardinalityEstimationTraversal createPushTraversal(Collection<InputSlot<?>> inputSlots, |
| Collection<InputSlot<?>> borderInputSlots, |
| Collection<Operator> sourceOperators, |
| Configuration configuration) { |
| Validate.notNull(inputSlots); |
| Validate.notNull(sourceOperators); |
| Validate.notNull(configuration); |
| |
| // Starting from the an output, find all required inputs. |
| return new Builder(inputSlots, borderInputSlots, sourceOperators, configuration).build(); |
| } |
| |
| |
| /** |
| * Creates a new instance. |
| * |
| * @param inputActivations {@link Activation}s that will be satisfied by the parameters of |
| * {@link CardinalityEstimator#estimate(Configuration, CardinalityEstimate...)} }; |
| * the indices of the {@link Activation}s match those |
| * of the {@link CardinalityEstimate}s |
| * @param sourceActivators {@link Activator}s of source {@link CardinalityEstimator} |
| */ |
| private CardinalityEstimationTraversal(final Collection<Activation> inputActivations, |
| Collection<? extends Activator> sourceActivators) { |
| this.inputActivations = inputActivations; |
| this.sourceActivators = sourceActivators; |
| } |
| |
| /** |
| * Traverse and update {@link CardinalityEstimate}s. |
| * |
| * @param optimizationContext provides input {@link CardinalityEstimate}s and stores all produces |
| * {@link CardinalityEstimate}s alongside the push traversal |
| * @param configuration provides the applicable {@link Configuration} |
| * @return whether any {@link CardinalityEstimate}s have been updated |
| */ |
| public boolean traverse(OptimizationContext optimizationContext, Configuration configuration) { |
| boolean isUpdated = false; |
| try { |
| final Queue<Activator> activators = this.initializeActivatorQueue(); |
| do { |
| assert !activators.isEmpty() : String.format("No source activators. (input activations: %s)", this.inputActivations); |
| final Activator activator = activators.poll(); |
| isUpdated |= activator.process(optimizationContext, configuration, activators); |
| } while (!activators.isEmpty()); |
| } finally { |
| this.reset(); |
| } |
| return isUpdated; |
| } |
| |
| /** |
| * Set up a queue of initial {@link Activator}s for an estimation pass. |
| */ |
| private Queue<Activator> initializeActivatorQueue() { |
| Queue<Activator> activatedActivators = new LinkedList<>(this.sourceActivators); |
| this.inputActivations.forEach(activation -> activation.fire(activatedActivators)); |
| return activatedActivators; |
| } |
| |
| /** |
| * Resets this instance, so that it perform a new traversal. |
| */ |
| private void reset() { |
| this.resetAll(Stream.concat( |
| this.inputActivations.stream().map(Activation::getTargetActivator), |
| this.sourceActivators.stream() |
| )); |
| } |
| |
| private void resetAll(Stream<Activator> activatorStream) { |
| activatorStream |
| .filter(Activator::reset) |
| .flatMap(Activator::getAllDependentActivations) |
| .map(Activation::getTargetActivator) |
| .forEach(this::resetDownstream); |
| } |
| |
| private void resetDownstream(Activator activator) { |
| this.resetAll(Stream.of(activator)); |
| } |
| |
| /** |
| * Wraps a {@link CardinalityEstimator}, thereby caching its input {@link CardinalityEstimate}s and keeping track |
| * of its dependent {@link CardinalityEstimator}s. |
| */ |
| private static class Activator { |
| |
| private final boolean[] isActivated; |
| |
| private final CardinalityPusher pusher; |
| |
| private final Collection<Activation>[] dependentActivations; |
| |
| /** |
| * The {@link Operator} being wrapped by this instance. |
| */ |
| private final Operator operator; |
| |
| @SuppressWarnings("unchecked") |
| Activator(Operator operator, Configuration configuration) { |
| this.operator = operator; |
| this.isActivated = new boolean[operator.getNumInputs()]; |
| this.dependentActivations = new Collection[operator.getNumOutputs()]; |
| for (int outputIndex = 0; outputIndex < this.dependentActivations.length; outputIndex++) { |
| this.dependentActivations[outputIndex] = new ArrayList<>(2); |
| } |
| this.pusher = operator.getCardinalityPusher(configuration); |
| } |
| |
| /** |
| * Execute this instance, thereby activating new instances and putting them on the queue. |
| * |
| * @param optimizationContext the current {@link OptimizationContext} in which the push should take place |
| * @param activatorQueue accepts newly activated {@link CardinalityEstimator}s |
| */ |
| boolean process(OptimizationContext optimizationContext, Configuration configuration, Queue<Activator> activatorQueue) { |
| OptimizationContext.OperatorContext opCtx = optimizationContext.getOperatorContext(this.operator); |
| assert opCtx != null : String.format("Could not find OperatorContext for %s.", this.operator); |
| |
| // Do the local estimation. |
| boolean isUpdated = this.pusher.push(opCtx, configuration); |
| opCtx.pushCardinalitiesForward(); |
| |
| for (int outputIndex = 0; outputIndex < this.operator.getNumOutputs(); outputIndex++) { |
| // Trigger follow-up operators. |
| this.processDependentActivations(this.dependentActivations[outputIndex], activatorQueue); |
| } |
| |
| return isUpdated; |
| } |
| |
| /** |
| * Triggers the {@link #dependentActivations} and puts newly activated {@link Activator}s onto the |
| * {@code activatorQueue}. |
| */ |
| private void processDependentActivations(Collection<Activation> activations, Queue<Activator> activatorQueue) { |
| // Otherwise, we update/activate the dependent estimators. |
| activations.forEach(activation -> activation.fire(activatorQueue)); |
| } |
| |
| /** |
| * Tells whether all input {@link Activation}s have fired. |
| */ |
| boolean canBeActivated() { |
| for (boolean isActivated : this.isActivated) { |
| if (!isActivated) return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Resets this instance. |
| * |
| * @return whether this instance was not already reset |
| */ |
| boolean reset() { |
| boolean isStateChanged = false; |
| for (int inputIndex = 0; inputIndex < this.isActivated.length; inputIndex++) { |
| isStateChanged |= this.isActivated[inputIndex]; |
| this.isActivated[inputIndex] = false; |
| } |
| return isStateChanged || this.isActivated.length == 0; |
| } |
| |
| |
| protected Activation createActivation(int inputIndex) { |
| return new Activation(inputIndex, this); |
| } |
| |
| |
| protected Stream<Activation> getAllDependentActivations() { |
| return Arrays.stream(this.dependentActivations).flatMap(Collection::stream); |
| } |
| |
| protected Collection<Activation> getDependentActivations(OutputSlot<?> outputSlot) { |
| return this.dependentActivations[outputSlot.getIndex()]; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder activations = new StringBuilder(this.isActivated.length); |
| for (boolean b : this.isActivated) { |
| activations.append(b ? "|" : "-"); |
| } |
| return String.format("%s[%s, %s]", this.getClass().getSimpleName(), this.operator, activations); |
| } |
| } |
| |
| |
| /** |
| * Describes a reference to an input of an {@link Activator}. |
| */ |
| private static class Activation { |
| |
| /** |
| * The input index on which the {@link #activator} will be activated. |
| */ |
| final int inputIndex; |
| |
| /** |
| * The {@link Activator} to be activated by this instance. |
| */ |
| final Activator activator; |
| |
| public Activation(int inputIndex, Activator activator) { |
| this.activator = activator; |
| this.inputIndex = inputIndex; |
| } |
| |
| public Activator getTargetActivator() { |
| return this.activator; |
| } |
| |
| public void fire(Queue<Activator> activatorQueue) { |
| assert !this.activator.isActivated[this.inputIndex] |
| : String.format("%s is already activated at input %d.", this.activator.operator, this.inputIndex); |
| this.activator.isActivated[this.inputIndex] = true; |
| if (this.activator.canBeActivated()) { |
| activatorQueue.add(this.activator); |
| } |
| } |
| } |
| |
| /** |
| * Utility to create a {@link CardinalityEstimationTraversal}, |
| */ |
| private static class Builder extends OneTimeExecutable { |
| |
| /** |
| * {@link Configuration} that provides crucial information for the {@link CardinalityEstimate}s to be created. |
| */ |
| final Configuration configuration; |
| |
| /** |
| * {@link InputSlot}s that will provide initial {@link CardinalityEstimate}s. |
| */ |
| final Collection<InputSlot<?>> inputSlots; |
| |
| /** |
| * {@link InputSlot}s that will not be followed; they are terminal. Note that these are not necessarily |
| * the only {@link OutputSlot}s in the created {@link CardinalityEstimationTraversal} that will not be |
| * followed -- some {@link OutputSlot}s might not have occupied {@link InputSlot}s. |
| */ |
| final Set<InputSlot<?>> borderInputSlots; |
| |
| /** |
| * Source {@link Operator}s that should be part of the pushing. |
| */ |
| private final Collection<Operator> sourceOperators; |
| |
| /** |
| * Keeps {@link Activator}s around that have already been created. |
| */ |
| private Map<Operator, Activator> createdActivators = new HashMap<>(); |
| |
| /** |
| * The finally build {@link CardinalityEstimationTraversal}. |
| */ |
| private CardinalityEstimationTraversal result; |
| |
| /** |
| * Creates a new instance. |
| * |
| * @param inputSlots see {@link #inputSlots} |
| * @param borderInputSlots see {@link #borderInputSlots} |
| * @param sourceOperators see {@link #sourceOperators} |
| * @param configuration see {@link #configuration} |
| */ |
| private Builder(Collection<InputSlot<?>> inputSlots, Collection<InputSlot<?>> borderInputSlots, Collection<Operator> sourceOperators, Configuration configuration) { |
| this.inputSlots = inputSlots; |
| this.borderInputSlots = WayangCollections.asSet(borderInputSlots); |
| this.configuration = configuration; |
| this.sourceOperators = sourceOperators; |
| } |
| |
| /** |
| * Build the {@link CardinalityEstimationTraversal}. |
| * |
| * @return the build {@link CardinalityEstimationTraversal} |
| */ |
| CardinalityEstimationTraversal build() { |
| this.execute(); |
| return this.result; |
| } |
| |
| /** |
| * Builds an instance starting from {@link InputSlot}s and source {@link Operator}. |
| */ |
| @Override |
| public void doExecute() { |
| Set<InputSlot<?>> distinctInputs = new HashSet<>(this.inputSlots); |
| |
| // Go through all relevant operators of and create EstimatorActivators. |
| PlanTraversal.downstream() |
| .withCallback(this::addAndRegisterActivator) |
| .followingInputsDownstreamIf(input -> !this.borderInputSlots.contains(input)) |
| .traverse(this.sourceOperators) |
| .traverse(distinctInputs.stream().map(InputSlot::getOwner)); |
| |
| // Gather the required activations. |
| final Collection<Activation> requiredActivations = new LinkedList<>(); |
| for (InputSlot<?> inputSlot : distinctInputs) { |
| final Operator owner = inputSlot.getOwner(); |
| final Activator activator = this.createdActivators.get(owner); |
| if (activator != null) { |
| requiredActivations.add(activator.createActivation(inputSlot.getIndex())); |
| } |
| } |
| |
| // Gather the source activators. |
| Collection<Activator> sourceActivators = new LinkedList<>(); |
| for (Operator source : this.sourceOperators) { |
| final Activator activator = this.createdActivators.get(source); |
| if (activator != null) { |
| sourceActivators.add(activator); |
| } |
| } |
| |
| this.result = new CardinalityEstimationTraversal(requiredActivations, sourceActivators); |
| } |
| |
| /** |
| * If there is no registered {@link Activator} for the {@code operator} and/or no |
| * {@link Activation}, then these will be created, registered, and connected. |
| * |
| * @param operator |
| */ |
| private void addAndRegisterActivator(Operator operator) { |
| // The operator should not have been processed yet. |
| assert !this.createdActivators.containsKey(operator); |
| |
| // Otherwise, try to create the activator. |
| Activator activator = this.createActivator(operator); |
| |
| // Register existing dependent activators. |
| for (OutputSlot<?> outputSlot : operator.getAllOutputs()) { |
| this.registerDependentActivations(outputSlot, activator); |
| } |
| |
| // Register with required activators. |
| this.registerAsDependentActivation(activator); |
| |
| } |
| |
| /** |
| * @return the {@link Activator} that is associated to the owner of the {@code outputSlot} |
| */ |
| protected Activator getCachedActivator(OutputSlot<?> outputSlot) { |
| return this.createdActivators.get(outputSlot.getOwner()); |
| } |
| |
| /** |
| * Create and register an {@link Activator} for the {@code operator}. |
| * |
| * @return the newly created {@link Activator} |
| */ |
| protected Activator createActivator(Operator operator) { |
| final Activator pusherActivator = new Activator(operator, this.configuration); |
| this.createdActivators.put(operator, pusherActivator); |
| return pusherActivator; |
| } |
| |
| /** |
| * Connect the {@code activator} with already existing {@link Activator}s that are fed by the {@code outputSlot} |
| * via a new {@link Activation}. |
| */ |
| protected void registerDependentActivations(OutputSlot<?> outputSlot, Activator activator) { |
| for (InputSlot<?> inputSlot : outputSlot.getOccupiedSlots()) { |
| Arrays.stream(inputSlot.getOwner().getAllOutputs()) |
| .map(this::getCachedActivator) |
| .filter(Objects::nonNull) |
| .map(dependentActivator -> dependentActivator.createActivation(inputSlot.getIndex())) |
| .forEach(activator.getDependentActivations(outputSlot)::add); |
| } |
| } |
| |
| /** |
| * Connect the {@code activator} with already existing {@link Activator}s that are fed by the {@code outputSlot} |
| * via a new {@link Activation}. |
| */ |
| protected void registerAsDependentActivation(Activator activator) { |
| for (InputSlot<?> inputSlot : activator.operator.getAllInputs()) { |
| final OutputSlot<?> occupant = inputSlot.getOccupant(); |
| if (Objects.isNull(occupant)) { |
| continue; |
| } |
| final Activator requiredActivator = this.getCachedActivator(occupant); |
| if (requiredActivator == null) { |
| continue; |
| } |
| requiredActivator.getDependentActivations(occupant).add(activator.createActivation(inputSlot.getIndex())); |
| } |
| } |
| } |
| |
| } |