// blob: d4c97a807e861d6d6af67b6992568c98d783981a
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.optimizer.channels;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.platform.ChannelDescriptor;
import java.util.Collection;
import java.util.function.BiFunction;
import java.util.function.Supplier;
/**
 * Default implementation of the {@link ChannelConversion}. The conversion is carried out by a single
 * {@link ExecutionOperator} that is obtained from a factory. Can be used without further subclassing.
 */
public class DefaultChannelConversion extends ChannelConversion {

    /**
     * Produces the {@link ExecutionOperator} that performs the conversion. It is invoked with the source
     * {@link Channel} when an actual conversion is set up and with {@code null} as {@link Channel} when only
     * costs or filters are evaluated.
     */
    private final BiFunction<Channel, Configuration, ExecutionOperator> executionOperatorFactory;

    /**
     * For debug purposes.
     */
    private final String name;

    /**
     * Creates a new instance whose name is derived from the simple class name of an {@link ExecutionOperator}
     * obtained from the factory. Note that the factory is invoked once for that purpose.
     *
     * @param sourceChannelDescriptor  describes the {@link Channel}s to convert from
     * @param targetChannelDescriptor  describes the {@link Channel}s to convert to
     * @param executionOperatorFactory supplies the converting {@link ExecutionOperator}s
     */
    public DefaultChannelConversion(
            ChannelDescriptor sourceChannelDescriptor,
            ChannelDescriptor targetChannelDescriptor,
            Supplier<ExecutionOperator> executionOperatorFactory) {
        this(sourceChannelDescriptor,
                targetChannelDescriptor,
                executionOperatorFactory,
                "via " + executionOperatorFactory.get().getClass().getSimpleName());
    }

    /**
     * Creates a new instance with an explicit name. The {@link Supplier} is adapted to the more general
     * factory signature; the {@link Channel} and {@link Configuration} arguments are ignored.
     *
     * @param sourceChannelDescriptor  describes the {@link Channel}s to convert from
     * @param targetChannelDescriptor  describes the {@link Channel}s to convert to
     * @param executionOperatorFactory supplies the converting {@link ExecutionOperator}s
     * @param name                     used in {@link #toString()}
     */
    public DefaultChannelConversion(
            ChannelDescriptor sourceChannelDescriptor,
            ChannelDescriptor targetChannelDescriptor,
            Supplier<ExecutionOperator> executionOperatorFactory,
            String name) {
        this(sourceChannelDescriptor,
                targetChannelDescriptor,
                (channel, config) -> executionOperatorFactory.get(),
                name);
    }

    /**
     * Creates a new instance.
     *
     * @param sourceChannelDescriptor  describes the {@link Channel}s to convert from
     * @param targetChannelDescriptor  describes the {@link Channel}s to convert to
     * @param executionOperatorFactory creates the converting {@link ExecutionOperator}s from a source
     *                                 {@link Channel} (may be {@code null}) and a {@link Configuration}
     * @param name                     used in {@link #toString()}
     */
    public DefaultChannelConversion(
            ChannelDescriptor sourceChannelDescriptor,
            ChannelDescriptor targetChannelDescriptor,
            BiFunction<Channel, Configuration, ExecutionOperator> executionOperatorFactory,
            String name) {
        super(sourceChannelDescriptor, targetChannelDescriptor);
        this.name = name;
        this.executionOperatorFactory = executionOperatorFactory;
    }

    /**
     * Appends a conversion {@link ExecutionTask} to the {@code sourceChannel} and returns the output
     * {@link Channel} of that task. The estimates of the task are registered in the given
     * {@link OptimizationContext}s.
     */
    @Override
    public Channel convert(Channel sourceChannel,
                           Configuration configuration,
                           Collection<OptimizationContext> optimizationContexts,
                           CardinalityEstimate optCardinality) {
        // Obtain the conversion operator and flag it as auxiliary.
        final ExecutionOperator conversionOperator =
                this.executionOperatorFactory.apply(sourceChannel, configuration);
        assert conversionOperator.getNumInputs() <= 1 && conversionOperator.getNumOutputs() <= 1;
        conversionOperator.setAuxiliary(true);

        // Hook a new task into the source channel and let it create its output channel.
        final ExecutionTask conversionTask = new ExecutionTask(conversionOperator, 1, 1);
        sourceChannel.addConsumer(conversionTask, 0);
        final Channel targetChannel = conversionTask.initializeOutputChannel(0, configuration);
        sourceChannel.addSibling(targetChannel);

        // Make the estimates for the new task available.
        this.setCardinalityAndTimeEstimates(sourceChannel, optimizationContexts, optCardinality, conversionTask);

        return targetChannel;
    }

    /**
     * Refreshes the estimates of the {@link ExecutionTask} that produces the {@code targetChannel}.
     */
    @Override
    public void update(Channel sourceChannel,
                       Channel targetChannel,
                       Collection<OptimizationContext> optimizationContexts,
                       CardinalityEstimate cardinality) {
        this.setCardinalityAndTimeEstimates(
                sourceChannel, optimizationContexts, cardinality, targetChannel.getProducer()
        );
    }

    /**
     * Update the key figure estimates for the given {@link ExecutionTask}.
     *
     * @param sourceChannel        provides the {@link CardinalityEstimate}
     * @param optimizationContexts in which the estimates should be updated; also provides the estimates for the {@code sourceChannel}
     * @param optCardinality       overrides the {@link CardinalityEstimate} or else {@code null}
     * @param task                 whose key figure estimates should be updated
     */
    private void setCardinalityAndTimeEstimates(Channel sourceChannel,
                                                Collection<OptimizationContext> optimizationContexts,
                                                CardinalityEstimate optCardinality,
                                                ExecutionTask task) {
        for (OptimizationContext context : optimizationContexts) {
            // An explicitly given cardinality takes precedence over the one stored in the context.
            final CardinalityEstimate effectiveCardinality;
            if (optCardinality != null) {
                effectiveCardinality = optCardinality;
            } else {
                effectiveCardinality = this.determineCardinality(sourceChannel, context);
            }
            this.setCardinalityAndTimeEstimate(task, context, effectiveCardinality);
        }
    }

    /**
     * Try to extract the {@link CardinalityEstimate} for a {@link Channel} within an {@link OptimizationContext}.
     * The output cardinality of the producing slot is used if the {@link Channel} has one; otherwise, the
     * input cardinality of the producing {@link ExecutionOperator} is used, provided it has exactly one input.
     *
     * @param channel             whose {@link CardinalityEstimate} is sought
     * @param optimizationContext provides {@link CardinalityEstimate}s
     * @return the {@link CardinalityEstimate}
     * @throws IllegalStateException if neither of the two lookups is applicable
     */
    private CardinalityEstimate determineCardinality(Channel channel, OptimizationContext optimizationContext) {
        final ExecutionOperator producer = channel.getProducerOperator();
        final OptimizationContext.OperatorContext producerContext = optimizationContext.getOperatorContext(producer);
        assert producerContext != null : String.format("No OperatorContext found for %s.", producer);

        final OutputSlot<?> slot = channel.getProducerSlot();
        if (slot != null) {
            return producerContext.getOutputCardinality(slot.getIndex());
        }
        if (producer.getNumInputs() != 1) {
            throw new IllegalStateException(String.format("Could not determine cardinality of %s.", channel));
        }
        return producerContext.getInputCardinality(0);
    }

    /**
     * Sets the {@link CardinalityEstimate}s for a conversion {@link ExecutionTask} in a given {@link OptimizationContext}
     * and triggers an update of the cost estimate afterwards.
     *
     * @param conversionTask      the conversion {@link ExecutionTask} that should have at most one input and one output
     * @param optimizationContext stores the {@link CardinalityEstimate}
     * @param cardinality         the {@link CardinalityEstimate}
     */
    private void setCardinalityAndTimeEstimate(ExecutionTask conversionTask,
                                               OptimizationContext optimizationContext,
                                               CardinalityEstimate cardinality) {
        final ExecutionOperator conversionOperator = conversionTask.getOperator();
        final OptimizationContext.OperatorContext context =
                optimizationContext.addOneTimeOperator(conversionOperator);

        final int numInputs = conversionOperator.getNumInputs();
        if (numInputs > 0) {
            assert numInputs == 1;
            context.setInputCardinality(0, cardinality);
        }

        final int numOutputs = conversionOperator.getNumOutputs();
        if (numOutputs > 0) {
            assert numOutputs == 1;
            context.setOutputCardinality(0, cardinality);
        }

        context.updateCostEstimate();
    }

    /**
     * Estimates the cost of this conversion by means of a freshly created {@link ExecutionOperator} that is
     * registered as one-time operator in the {@code optimizationContext}.
     */
    @Override
    public ProbabilisticDoubleInterval estimateConversionCost(CardinalityEstimate cardinality,
                                                              int numExecutions,
                                                              OptimizationContext optimizationContext) {
        final ExecutionOperator probeOperator = this.instantiateProbeOperator(optimizationContext);
        final OptimizationContext.OperatorContext probeContext =
                this.initializeProbeContext(probeOperator, cardinality, numExecutions, optimizationContext);

        // Estimate time.
        probeContext.updateCostEstimate();
        return probeContext.getCostEstimate();
    }

    /**
     * Asks a freshly created {@link ExecutionOperator}, registered as one-time operator in the
     * {@code optimizationContext}, whether it is filtered.
     */
    @Override
    public boolean isFiltered(CardinalityEstimate cardinality, int numExecutions, OptimizationContext optimizationContext) {
        final ExecutionOperator probeOperator = this.instantiateProbeOperator(optimizationContext);
        final OptimizationContext.OperatorContext probeContext =
                this.initializeProbeContext(probeOperator, cardinality, numExecutions, optimizationContext);
        return probeOperator.isFiltered(probeContext);
    }

    /**
     * Creates an {@link ExecutionOperator} without a source {@link Channel}, using the {@link Configuration}
     * of the given {@link OptimizationContext}.
     */
    private ExecutionOperator instantiateProbeOperator(OptimizationContext optimizationContext) {
        return this.executionOperatorFactory.apply(null, optimizationContext.getConfiguration());
    }

    /**
     * Registers the {@code operator} as one-time operator and initializes its number of executions and its
     * input and output cardinalities.
     *
     * @return the {@link OptimizationContext.OperatorContext} created for the {@code operator}
     */
    private OptimizationContext.OperatorContext initializeProbeContext(ExecutionOperator operator,
                                                                       CardinalityEstimate cardinality,
                                                                       int numExecutions,
                                                                       OptimizationContext optimizationContext) {
        final OptimizationContext.OperatorContext context = optimizationContext.addOneTimeOperator(operator);
        context.setNumExecutions(numExecutions);
        this.setCardinality(context, cardinality);
        return context;
    }

    /**
     * Assigns the given {@link CardinalityEstimate} to every input and every output of the operator that
     * belongs to the {@code operatorContext}.
     */
    private void setCardinality(OptimizationContext.OperatorContext operatorContext, CardinalityEstimate cardinality) {
        final int inputCount = operatorContext.getOperator().getNumInputs();
        for (int i = 0; i < inputCount; i++) {
            operatorContext.setInputCardinality(i, cardinality);
        }

        final int outputCount = operatorContext.getOperator().getNumOutputs();
        for (int i = 0; i < outputCount; i++) {
            operatorContext.setOutputCardinality(i, cardinality);
        }
    }

    @Override
    public String toString() {
        final String className = this.getClass().getSimpleName();
        return String.format("%s[%s]", className, this.name);
    }

}