blob: 1387286015a1d3054dfb55bbfc6e81c3d725d64b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.plan.executionplan;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.Platform;
/**
* Serves as an adapter to include {@link ExecutionOperator}s, which are usually parts of {@link WayangPlan}s, in
* {@link ExecutionPlan}s.
*/
public class ExecutionTask {

    /**
     * {@link ExecutionOperator} that is being adapted by this instance.
     */
    private final ExecutionOperator operator;

    /**
     * {@link Channel}s that this instance reads from. Align with {@link #operator}'s {@link InputSlot}s
     *
     * @see ExecutionOperator#getAllInputs()
     */
    private final Channel[] inputChannels;

    /**
     * {@link Channel}s that this instance writes to. Align with {@link #operator}'s {@link OutputSlot}s
     *
     * @see ExecutionOperator#getAllOutputs()
     */
    private final Channel[] outputChannels;

    /**
     * Captures if this instance is part of a {@link ExecutionStage} and of which. Is {@code null} until
     * {@link #setStage(ExecutionStage)} has been called.
     */
    private ExecutionStage stage;

    /**
     * Creates a new instance with as many input/output {@link Channel} positions as the given
     * {@link ExecutionOperator} reports inputs/outputs.
     *
     * @param operator the {@link ExecutionOperator} to be adapted
     */
    public ExecutionTask(ExecutionOperator operator) {
        this(operator, operator.getNumInputs(), operator.getNumOutputs());
    }

    /**
     * Creates a new instance with an explicit number of input and output {@link Channel} positions.
     * All positions are initially unset ({@code null}).
     *
     * @param operator          the {@link ExecutionOperator} to be adapted
     * @param numInputChannels  the number of input {@link Channel} positions
     * @param numOutputChannels the number of output {@link Channel} positions
     */
    public ExecutionTask(ExecutionOperator operator, int numInputChannels, int numOutputChannels) {
        this.operator = operator;
        this.inputChannels = new Channel[numInputChannels];
        this.outputChannels = new Channel[numOutputChannels];
    }

    /**
     * @return the {@link ExecutionOperator} adapted by this instance
     */
    public ExecutionOperator getOperator() {
        return this.operator;
    }

    /**
     * @return the internal array of input {@link Channel}s (not a copy); unset positions are {@code null}
     */
    public Channel[] getInputChannels() {
        return this.inputChannels;
    }

    /**
     * @return the number of input {@link Channel} positions of this instance
     */
    public int getNumInputChannels() {
        return this.getInputChannels().length;
    }

    /**
     * @param index the position of the requested input {@link Channel}
     * @return the input {@link Channel} at the given position or {@code null} if it is not set
     */
    public Channel getInputChannel(int index) {
        return this.getInputChannels()[index];
    }

    /**
     * Sets an input {@link Channel} for this instance. Consider using {@link Channel#addConsumer(ExecutionTask, int)}
     * instead.
     *
     * @param index   the position of the input {@link Channel}
     * @param channel the new input {@link Channel}; {@code null} clears the position
     */
    void setInputChannel(int index, Channel channel) {
        // An occupied position may only be cleared, never silently overwritten.
        assert channel == null || this.getInputChannel(index) == null
                : String.format("Cannot set up %s for %s@%d: There is already %s.",
                channel, this.getOperator(), index, this.getInputChannel(index));
        this.getInputChannels()[index] = channel;
    }

    /**
     * Exchanges input {@link Channel}. Will also update the {@link Channel}'s consumers appropriately.
     *
     * @param currentChannel the input {@link Channel} to be replaced
     * @param newChannel     the replacement {@link Channel}; must not be {@code null}
     * @throws IllegalArgumentException if {@code currentChannel} is not an input of this instance
     * @throws NullPointerException     if {@code newChannel} is {@code null}
     */
    public void exchangeInputChannel(Channel currentChannel, Channel newChannel) {
        for (int inputIndex = 0; inputIndex < this.getNumInputChannels(); inputIndex++) {
            if (this.getInputChannel(inputIndex) == currentChannel) {
                // Validate before mutating, so that a failure does not leave this instance detached from
                // the current channel without being attached to the new one.
                if (newChannel == null) {
                    throw new NullPointerException("The new input channel must not be null.");
                }
                currentChannel.getConsumers().remove(this);
                // Clear the position first; it must be empty before a new channel can be registered for it.
                this.setInputChannel(inputIndex, null);
                newChannel.addConsumer(this, inputIndex);
                return;
            }
        }
        throw new IllegalArgumentException(String.format("%s is not an input of %s.", currentChannel, this));
    }

    /**
     * @return the internal array of output {@link Channel}s (not a copy); unset positions are {@code null}
     */
    public Channel[] getOutputChannels() {
        return this.outputChannels;
    }

    /**
     * Misspelled variant of {@link #getNumOutputChannels()} that is retained for backward compatibility.
     *
     * @return the number of output {@link Channel} positions of this instance
     */
    public int getNumOuputChannels() {
        return this.getOutputChannels().length;
    }

    /**
     * Correctly spelled counterpart of {@link #getNumInputChannels()}; yields the same value as
     * {@link #getNumOuputChannels()}.
     *
     * @return the number of output {@link Channel} positions of this instance
     */
    public int getNumOutputChannels() {
        return this.getNumOuputChannels();
    }

    /**
     * @param index the position of the requested output {@link Channel}
     * @return the output {@link Channel} at the given position or {@code null} if it is not set
     */
    public Channel getOutputChannel(int index) {
        return this.getOutputChannels()[index];
    }

    /**
     * Removes the given {@link Channel} as output of this instance and resets the producer of that
     * {@link Channel} to {@code null}.
     *
     * @param outputChannel the {@link Channel} to be removed
     * @return the former output index of the {@link Channel}
     * @throws IllegalArgumentException if the {@link Channel} is not an output of this instance
     */
    public int removeOutputChannel(Channel outputChannel) {
        for (int outputIndex = 0; outputIndex < this.getNumOutputChannels(); outputIndex++) {
            if (this.getOutputChannel(outputIndex) == outputChannel) {
                this.getOutputChannels()[outputIndex] = null;
                outputChannel.setProducer(null);
                return outputIndex;
            }
        }
        throw new IllegalArgumentException(String.format("%s is not an output of %s.", outputChannel, this));
    }

    /**
     * Removes the given {@link Channel} as input of this instance and removes this instance from the
     * consumers of that {@link Channel}.
     *
     * @param inputChannel the {@link Channel} to be removed
     * @return the former input index of the {@link Channel}
     * @throws IllegalArgumentException if the {@link Channel} is not an input of this instance
     */
    public int removeInputChannel(Channel inputChannel) {
        for (int inputIndex = 0; inputIndex < this.getNumInputChannels(); inputIndex++) {
            if (this.getInputChannel(inputIndex) == inputChannel) {
                this.getInputChannels()[inputIndex] = null;
                inputChannel.getConsumers().remove(this);
                return inputIndex;
            }
        }
        throw new IllegalArgumentException(String.format("%s is not an input of %s.", inputChannel, this));
    }

    /**
     * Creates a new {@link Channel} from the {@link ChannelDescriptor} that the {@link ExecutionOperator}
     * provides for the given output index and sets it as output {@link Channel} of this instance.
     *
     * @param index         the output index for which the {@link Channel} should be created
     * @param configuration the {@link Configuration} that is handed to the {@link ChannelDescriptor}
     * @return the newly created {@link Channel}
     */
    public Channel initializeOutputChannel(int index, Configuration configuration) {
        final ChannelDescriptor channelDescriptor = this.operator.getOutputChannelDescriptor(index);
        // Operators without any OutputSlot have no slot to associate the new Channel with.
        final OutputSlot<?> output = this.operator.getNumOutputs() == 0 ? null : this.operator.getOutput(index);
        final Channel channel = channelDescriptor.createChannel(output, configuration);
        this.setOutputChannel(index, channel);
        return channel;
    }

    /**
     * Sets an output {@link Channel} for this instance and makes this instance the producer of that
     * {@link Channel}.
     *
     * @param index   the position of the output {@link Channel}; it must not be occupied yet
     * @param channel the new output {@link Channel}
     */
    public void setOutputChannel(int index, Channel channel) {
        assert this.getOutputChannel(index) == null : String.format("Output channel %d of %s is already set to %s.",
                index, this, this.getOutputChannel(index));
        this.getOutputChannels()[index] = channel;
        channel.setProducer(this);
    }

    /**
     * @return the {@link ExecutionStage} this instance has been assigned to or {@code null} if there is none yet
     */
    public ExecutionStage getStage() {
        return this.stage;
    }

    /**
     * @param stage the {@link ExecutionStage} that this instance is part of
     */
    public void setStage(ExecutionStage stage) {
        this.stage = stage;
    }

    @Override
    public String toString() {
        // The stage is unknown until setStage(...) has been called. This method is used to build the error
        // messages of this class, so it must not fail itself when the stage is still missing.
        final Object platformExecution = this.stage == null ? null : this.stage.getPlatformExecution();
        return "T[" + this.operator + " Platform: {" + platformExecution + "}]";
    }

    /**
     * Returns the {@link OutputSlot} of the {@link ExecutionOperator} that is associated to the given {@link Channel}.
     *
     * @param channel an output {@link Channel} of this instance
     * @return the {@link OutputSlot} or {@code null} if none
     * @throws IllegalArgumentException if the {@link Channel} is not an output of this instance
     */
    public OutputSlot<?> getOutputSlotFor(Channel channel) {
        // Simple implementation: linear search.
        for (int outputIndex = 0; outputIndex < this.getNumOutputChannels(); outputIndex++) {
            if (this.getOutputChannel(outputIndex) == channel) {
                // There can be more Channel positions than OutputSlots (see the three-argument constructor).
                return outputIndex < this.getOperator().getNumOutputs() ?
                        this.getOperator().getOutput(outputIndex) :
                        null;
            }
        }
        throw new IllegalArgumentException(String.format("%s does not belong to %s.", channel, this));
    }

    /**
     * Returns the {@link InputSlot} of the {@link ExecutionOperator} that is associated to the given {@link Channel}.
     *
     * @param channel an input {@link Channel} of this instance
     * @return the {@link InputSlot} or {@code null} if none
     * @throws IllegalArgumentException if the {@link Channel} is not an input of this instance
     */
    public InputSlot<?> getInputSlotFor(Channel channel) {
        // Simple implementation: linear search.
        for (int inputIndex = 0; inputIndex < this.getNumInputChannels(); inputIndex++) {
            if (this.getInputChannel(inputIndex) == channel) {
                // There can be more Channel positions than InputSlots (see the three-argument constructor).
                return inputIndex < this.getOperator().getNumInputs() ?
                        this.getOperator().getInput(inputIndex) :
                        null;
            }
        }
        throw new IllegalArgumentException(String.format("%s does not belong to %s.", channel, this));
    }

    /**
     * Determines whether the given input {@link Channel} implements a feedback {@link InputSlot} of the enclosed
     * {@link ExecutionOperator}.
     *
     * @param inputChannel the {@link Channel}
     * @return whether it implements a feedback {@link InputSlot}
     * @throws IllegalArgumentException if the {@link ExecutionOperator} is a loop head and the {@link Channel}
     *                                  is not an input of this instance
     * @see InputSlot#isFeedback()
     */
    public boolean isFeedbackInput(Channel inputChannel) {
        // Check if we have a LoopHeadOperator in this instance.
        final ExecutionOperator operator = this.getOperator();
        if (!operator.isLoopHead()) {
            return false;
        }

        // Check whether and to which InputSlot the inputChannel corresponds.
        final InputSlot<?> input = this.getInputSlotFor(inputChannel);
        if (input == null) {
            return false;
        }

        // Determine if that InputSlot is a feedback InputSlot.
        return input.isFeedback();
    }

    /**
     * @return the {@link Platform} for the encased {@link ExecutionOperator}
     */
    public Platform getPlatform() {
        return this.operator.getPlatform();
    }
}