blob: ce3b055650141b51bec48b77fb139fb70924b4ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.plan.wayangplan;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.optimizer.costs.LoadProfile;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.platform.lineage.LazyExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
/**
* An execution operator is handled by a certain platform.
*/
public interface ExecutionOperator extends ElementaryOperator {
/**
* @return the platform that can run this operator
*/
Platform getPlatform();
/**
* @return a copy of this instance; it's {@link Slot}s will not be connected
*/
ExecutionOperator copy();
/**
* @return this instance or, if it was derived via {@link #copy()}, the original instance
*/
ExecutionOperator getOriginal();
/**
* Developers of {@link ExecutionOperator}s can provide a default {@link LoadProfileEstimator} via this method.
*
* @param configuration in which the {@link LoadProfile} should be estimated.
* @return an {@link Optional} that might contain the {@link LoadProfileEstimator} (but {@link Optional#empty()}
* by default)
*/
default Optional<LoadProfileEstimator> createLoadProfileEstimator(Configuration configuration) {
Collection<String> configurationKeys = this.getLoadProfileEstimatorConfigurationKeys();
LoadProfileEstimator mainEstimator = createLoadProfileEstimators(configuration, configurationKeys);
return Optional.ofNullable(mainEstimator);
}
/**
* Creates a {@link LoadProfileEstimator} according to the {@link Configuration}.
*
* @param configuration the {@link Configuration}
* @param configurationKeys keys for the specification within the {@link Configuration}
* @return the {@link LoadProfileEstimator} or {@code null} if none could be created
*/
static LoadProfileEstimator createLoadProfileEstimators(Configuration configuration, Collection<String> configurationKeys) {
LoadProfileEstimator mainEstimator = null;
for (String configurationKey : configurationKeys) {
final LoadProfileEstimator loadProfileEstimator = LoadProfileEstimators.createFromSpecification(configurationKey, configuration);
if (mainEstimator == null) {
mainEstimator = loadProfileEstimator;
} else {
mainEstimator.nest(loadProfileEstimator);
}
}
return mainEstimator;
}
/**
* Provide the {@link Configuration} keys for the {@link LoadProfileEstimator} specification of this instance.
*
* @return the {@link Configuration} keys
*/
default Collection<String> getLoadProfileEstimatorConfigurationKeys() {
final String singleKey = this.getLoadProfileEstimatorConfigurationKey();
return singleKey == null ?
Collections.emptyList() :
Collections.singletonList(singleKey);
}
/**
* @deprecated Use {@link #getLoadProfileEstimatorConfigurationKeys()}
*/
default String getLoadProfileEstimatorConfigurationKey() {
return null;
}
/**
* Provides a base key for configuring input and output limits.
* @return the limit base key or {@code null}
*/
default String getLimitBaseKey() {
// By default, try to infer the key.
String loadProfileEstimatorConfigurationKey = this.getLoadProfileEstimatorConfigurationKey();
if (loadProfileEstimatorConfigurationKey != null && loadProfileEstimatorConfigurationKey.endsWith(".load")) {
return loadProfileEstimatorConfigurationKey
.substring(0, loadProfileEstimatorConfigurationKey.length() - 5)
.concat(".limit");
}
return null;
}
/**
* Tells whether this instance should not be executed on the face of the given
* {@link OptimizationContext.OperatorContext}. For instance, when this instance
* might not be able to handle the amount of data.
*
* @param operatorContext an {@link OptimizationContext.OperatorContext} within which this instance might be
* executed
* @return whether this instance should <b>not</b> be used for execution
*/
default boolean isFiltered(OptimizationContext.OperatorContext operatorContext) {
assert operatorContext.getOperator() == this;
// By default, we look for configuration keys formed like this:
// <my.operator.limit.key>.<input/output name>
// If such a key exists, we compare it to values in the operatorContext.
String limitBaseKey = this.getLimitBaseKey();
if (limitBaseKey != null) {
Configuration configuration = operatorContext.getOptimizationContext().getConfiguration();
// Check the inputs.
for (InputSlot<?> input : this.getAllInputs()) {
String key = limitBaseKey + "." + input.getName();
long limit = configuration.getLongProperty(key, -1);
if (limit >= 0) {
CardinalityEstimate cardinality = operatorContext.getInputCardinality(input.getIndex());
if (cardinality != null && cardinality.getGeometricMeanEstimate() > limit) return true;
}
}
// Check the outputs.
for (OutputSlot<?> output : this.getAllOutputs()) {
String key = limitBaseKey + "." + output.getName();
long limit = configuration.getLongProperty(key, -1);
if (limit >= 0) {
CardinalityEstimate cardinality = operatorContext.getOutputCardinality(output.getIndex());
if (cardinality != null && cardinality.getGeometricMeanEstimate() > limit) return true;
}
}
}
return false;
}
/**
* Display the supported {@link Channel}s for a certain {@link InputSlot}.
*
* @param index the index of the {@link InputSlot}
* @return an {@link List} of {@link Channel}s' {@link Class}es, ordered by their preference of use
*/
List<ChannelDescriptor> getSupportedInputChannels(int index);
/**
* Display the supported {@link Channel}s for a certain {@link OutputSlot}.
*
* @param index the index of the {@link OutputSlot}
* @return an {@link List} of {@link Channel}s' {@link Class}es, ordered by their preference of use
* @see #getOutputChannelDescriptor(int)
* @deprecated {@link ExecutionOperator}s should only support a single {@link ChannelDescriptor}
*/
@Deprecated
List<ChannelDescriptor> getSupportedOutputChannels(int index);
/**
* Display the {@link Channel} used to implement a certain {@link OutputSlot}.
*
* @param index index of the {@link OutputSlot}
* @return the {@link ChannelDescriptor} for the mentioned {@link Channel}
*/
default ChannelDescriptor getOutputChannelDescriptor(int index) {
final List<ChannelDescriptor> supportedOutputChannels = this.getSupportedOutputChannels(index);
assert !supportedOutputChannels.isEmpty() : String.format("No supported output channels for %s.", this);
if (supportedOutputChannels.size() > 1) {
LogManager.getLogger(this.getClass()).warn("Treat {} as the only supported channel for {}.",
supportedOutputChannels.get(0), this.getOutput(index)
);
}
return supportedOutputChannels.get(0);
}
/**
* Create output {@link ChannelInstance}s for this instance.
*
* @param task the {@link ExecutionTask} in which this instance is being wrapped
* @param producerOperatorContext the {@link OptimizationContext.OperatorContext} for this instance
* @param inputChannelInstances the input {@link ChannelInstance}s for the {@code task}
* @return
*/
default ChannelInstance[] createOutputChannelInstances(Executor executor,
ExecutionTask task,
OptimizationContext.OperatorContext producerOperatorContext,
List<ChannelInstance> inputChannelInstances) {
assert task.getOperator() == this;
ChannelInstance[] channelInstances = new ChannelInstance[task.getNumOuputChannels()];
for (int outputIndex = 0; outputIndex < channelInstances.length; outputIndex++) {
final Channel outputChannel = task.getOutputChannel(outputIndex);
final ChannelInstance outputChannelInstance = outputChannel.createInstance(executor, producerOperatorContext, outputIndex);
channelInstances[outputIndex] = outputChannelInstance;
}
return channelInstances;
}
/**
* Models eager execution by marking all {@link LazyExecutionLineageNode}s as executed and collecting all marked ones.
*
* @param inputs the input {@link ChannelInstance}s
* @param outputs the output {@link ChannelInstance}s
* @param operatorContext the executed {@link OptimizationContext.OperatorContext}
* @return the executed {@link OptimizationContext.OperatorContext} and produced {@link ChannelInstance}s
*/
static Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> modelEagerExecution(
ChannelInstance[] inputs,
ChannelInstance[] outputs,
OptimizationContext.OperatorContext operatorContext) {
final ExecutionLineageNode executionLineageNode = new ExecutionLineageNode(operatorContext);
executionLineageNode.addAtomicExecutionFromOperatorContext();
LazyExecutionLineageNode.connectAll(inputs, executionLineageNode, outputs);
final Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> collectors;
if (outputs.length == 0) {
collectors = executionLineageNode.collectAndMark();
} else {
collectors = new Tuple<>(new LinkedList<>(), new LinkedList<>());
for (ChannelInstance output : outputs) {
output.getLineage().collectAndMark(collectors.getField0(), collectors.getField1());
}
}
return collectors;
}
/**
* Models eager execution by marking all {@link LazyExecutionLineageNode}s as executed and collecting all marked ones.
* However, the output {@link ChannelInstance}s are not yet produced.
*
* @param inputs the input {@link ChannelInstance}s
* @param outputs the output {@link ChannelInstance}s
* @param operatorContext the executed {@link OptimizationContext.OperatorContext}
* @return the executed {@link OptimizationContext.OperatorContext} and produced {@link ChannelInstance}s
*/
static Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> modelQuasiEagerExecution(
ChannelInstance[] inputs,
ChannelInstance[] outputs,
OptimizationContext.OperatorContext operatorContext) {
final ExecutionLineageNode executionLineageNode = new ExecutionLineageNode(operatorContext);
executionLineageNode.addAtomicExecutionFromOperatorContext();
LazyExecutionLineageNode.connectAll(inputs, executionLineageNode, outputs);
return executionLineageNode.collectAndMark();
}
/**
* Models lazy execution by not marking any {@link LazyExecutionLineageNode}s.
*
* @param inputs the input {@link ChannelInstance}s
* @param outputs the output {@link ChannelInstance}s
* @param operatorContext the executed {@link OptimizationContext.OperatorContext}
* @return the executed {@link OptimizationContext.OperatorContext} and produced {@link ChannelInstance}s
*/
static Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>>
modelLazyExecution(ChannelInstance[] inputs,
ChannelInstance[] outputs,
OptimizationContext.OperatorContext operatorContext) {
final ExecutionLineageNode executionLineageNode = new ExecutionLineageNode(operatorContext);
executionLineageNode.addAtomicExecutionFromOperatorContext();
LazyExecutionLineageNode.connectAll(inputs, executionLineageNode, outputs);
return new Tuple<>(Collections.emptyList(), Collections.emptyList());
}
}