blob: f6e5804efc3b8ca4c02947d53d874efe1941c8ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.plan.executionplan;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.plan.wayangplan.Slot;
import org.apache.wayang.core.platform.Breakpoint;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.Executor;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.types.DataSetType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Models the data movement between two {@link ExecutionTask}s.
*/
public abstract class Channel {

    protected final Logger logger = LogManager.getLogger(this.getClass());

    /**
     * Was used to set up this instance.
     */
    private final ChannelDescriptor descriptor;

    /**
     * {@link OutputSlot} that creates this instance. May be {@code null}.
     */
    private final OutputSlot<?> producerSlot;

    /**
     * Produces the data flowing through this instance. May be {@code null} until {@link #setProducer(ExecutionTask)}
     * has been called.
     */
    private ExecutionTask producer;

    /**
     * Consuming {@link ExecutionTask}s of this instance.
     */
    private final List<ExecutionTask> consumers = new LinkedList<>();

    /**
     * Root original instance that this instance mimes, or {@code null} if this instance is not a copy.
     */
    private final Channel original;

    /**
     * Flag whether this instance should be instrumented to detect its actual cardinality.
     */
    private boolean isMarkedForInstrumentation = false;

    /**
     * Other {@link Channel}s that represent the same {@link OutputSlot}-to-{@link InputSlot} connection from a
     * {@link WayangPlan} and share properties such as {@link #getCardinalityEstimate(OptimizationContext)} and
     * {@link #getDataSetType()}. {@link #relateTo(Channel)} never adds an instance to its own siblings.
     */
    private final Set<Channel> siblings = new HashSet<>(2);

    /**
     * Creates a new, non-hierarchical instance, i.e., one that is not a copy of another {@link Channel}.
     *
     * @param descriptor   used to create this instance
     * @param producerSlot the {@link OutputSlot} that creates this instance; may be {@code null}
     */
    protected Channel(ChannelDescriptor descriptor, OutputSlot<?> producerSlot) {
        this.descriptor = descriptor;
        this.original = null;
        this.producerSlot = producerSlot;
    }

    /**
     * Creates a new, hierarchical instance. Mimes the {@code original}'s descriptor, producer, and producer slot.
     * The {@link #consumers}, the {@link #siblings}, and the instrumentation flag are not taken over.
     *
     * @param original the original instance whose properties will be mimed
     */
    protected Channel(Channel original) {
        this.descriptor = original.getDescriptor();
        // Always refer to the root original, so that copies of copies do not form chains.
        this.original = original.getOriginal();
        assert this.original == null || !this.original.isCopy();
        this.producer = original.getProducer();
        this.producerSlot = original.getProducerSlot();
    }

    /**
     * Set up a consumer {@link ExecutionTask} for this instance. Adding a consumer that is already registered has
     * no effect.
     *
     * @param consumer   the new consumer
     * @param inputIndex the input index for this instance into the consumer
     */
    public void addConsumer(ExecutionTask consumer, int inputIndex) {
        if (!this.consumers.contains(consumer)) {
            assert this.isReusable() || this.consumers.isEmpty() :
                    String.format("Cannot add %s as consumer of non-reusable %s, there is already %s.",
                            consumer, this, this.consumers);
            this.consumers.add(consumer);
            consumer.setInputChannel(inputIndex, this);
        }
    }

    /**
     * Declares whether this is not a read-once instance.
     *
     * @return whether this instance can have multiple consumers
     */
    public boolean isReusable() {
        return this.getDescriptor().isReusable();
    }

    /**
     * Tells whether this instance lends itself for {@link Breakpoint}s. That is particularly the case if:
     * <ol>
     * <li>it is produced immediately by its producer ({@link #getProducer()});</li>
     * <li>the contained data are at rest;</li>
     * <li>and, as a bonus, the cardinality of the data can be observed.</li>
     * </ol>
     *
     * @return whether this instance lends itself for {@link Breakpoint}s
     */
    public boolean isSuitableForBreakpoint() {
        return this.getDescriptor().isSuitableForBreakpoint();
    }

    /**
     * Tells whether this instance connects different {@link PlatformExecution}s. The answer is not necessarily
     * only determined by whether this instance connects {@link ExecutionTask}s of different {@link Platform}s.
     *
     * @return whether this instance connects different {@link PlatformExecution}s
     */
    public boolean isExecutionBreaker() {
        // Default implementation: Check if all producers/consumers are from the same Platform.
        Platform singlePlatform = null;
        if (this.producer != null) {
            singlePlatform = this.producer.getOperator().getPlatform();
        }
        for (ExecutionTask consumer : this.consumers) {
            if (singlePlatform == null) {
                singlePlatform = consumer.getOperator().getPlatform();
            } else if (!singlePlatform.equals(consumer.getOperator().getPlatform())) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the producing {@link ExecutionTask} or {@code null} if none has been set
     */
    public ExecutionTask getProducer() {
        return this.producer;
    }

    /**
     * @return the live (mutable) {@link List} of consumer {@link ExecutionTask}s of this instance
     */
    public List<ExecutionTask> getConsumers() {
        return this.consumers;
    }

    /**
     * Looks up the {@link CardinalityEstimate} of the producer {@link OutputSlot} of this instance or any of its
     * {@link #siblings} in the given {@link OptimizationContext}.
     *
     * @param optimizationContext provides the {@link OptimizationContext.OperatorContext}s to query
     * @return the first {@link CardinalityEstimate} found
     * @throws IllegalStateException if neither this instance nor its siblings yield an estimate
     */
    public CardinalityEstimate getCardinalityEstimate(OptimizationContext optimizationContext) {
        return this.withSiblings(false)
                .map(sibling -> {
                    final OutputSlot<?> output = sibling.getProducerSlot();
                    if (output == null) return null;
                    final OptimizationContext.OperatorContext operatorCtx =
                            optimizationContext.getOperatorContext(output.getOwner());
                    if (operatorCtx == null) return null;
                    return operatorCtx.getOutputCardinality(output.getIndex());
                }).filter(Objects::nonNull)
                .findAny()
                .orElseThrow(() -> new IllegalStateException(String.format(
                        "No CardinalityEstimate for %s (available: %s).",
                        this, optimizationContext.getLocalOperatorContexts()
                )));
    }

    /**
     * @return whether this instance or any of its {@link #siblings} is marked for instrumentation
     */
    public boolean isMarkedForInstrumentation() {
        return this.withSiblings(false).anyMatch(sibling -> sibling.isMarkedForInstrumentation);
    }

    /**
     * Creates a {@link Stream} of this instance and its siblings. The sibling relationship must not be altered
     * while processing the {@link Stream}.
     *
     * @return the {@link Stream}
     */
    public Stream<Channel> withSiblings() {
        return this.withSiblings(false);
    }

    /**
     * Creates a {@link Stream} of this instance and its siblings.
     *
     * @param isWithConcurrentModification whether {@link #siblings} may be modified while processing the
     *                                     {@link Stream}; if so, the {@link Stream} runs over a snapshot
     * @return the {@link Stream}
     */
    private Stream<Channel> withSiblings(boolean isWithConcurrentModification) {
        return Stream.concat(
                Stream.of(this),
                (isWithConcurrentModification ? new ArrayList<>(this.siblings) : this.siblings).stream()
        );
    }

    /**
     * Marks this instance and all its {@link #siblings} for instrumentation.
     */
    public void markForInstrumentation() {
        this.withSiblings(false).forEach(channel -> {
            channel.isMarkedForInstrumentation = true;
            this.logger.debug("Marked {} for instrumentation.", channel);
        });
    }

    @Override
    public String toString() {
        return String.format("%s[%s->%s]",
                this.getClass().getSimpleName(),
                this.getProducer() == null ? this.getProducerSlot() : this.getProducer(),
                this.getConsumers());
    }

    /**
     * Acquaints the given instance with this instance and all existing {@link #siblings}. Siblings that the given
     * instance already has are not related to this instance's siblings.
     *
     * @param sibling the {@link Channel} to become a sibling; ignored if it is this very instance
     */
    public void addSibling(Channel sibling) {
        if (sibling == this) return;
        // Stream over a snapshot, because relateTo(...) adds to the sibling sets.
        this.withSiblings(true).forEach(olderSibling -> olderSibling.relateTo(sibling));
    }

    /**
     * Detaches this instance from all its {@link #siblings}.
     */
    public void removeSiblings() {
        this.removeSiblingsWhere((channel) -> true);
    }

    /**
     * Detaches this instance from those of its {@link #siblings} that satisfy the given {@code condition}. The
     * detached siblings are also detached from the remaining siblings, but stay related among each other.
     *
     * @param condition selects the siblings to detach
     */
    public void removeSiblingsWhere(Predicate<Channel> condition) {
        // Detach with siblings.
        List<Channel> removedSiblings = new LinkedList<>();
        for (Iterator<Channel> i = this.siblings.iterator(); i.hasNext(); ) {
            final Channel sibling = i.next();
            if (condition.test(sibling)) {
                i.remove();
                sibling.siblings.remove(this);
                removedSiblings.add(sibling);
            }
        }

        // Bring the lingering and former siblings into a consistent state.
        for (Channel sibling : this.siblings) {
            sibling.siblings.removeAll(removedSiblings);
        }
        for (Channel removedSibling : removedSiblings) {
            removedSibling.siblings.removeAll(this.siblings);
        }
    }

    /**
     * Makes this and the given instance siblings. Relating an instance to itself has no effect.
     */
    private void relateTo(Channel sibling) {
        // addSibling(...) streams over this instance's siblings, which may already contain the new sibling.
        // Without this guard, that sibling would become its own sibling and show up twice in withSiblings().
        if (sibling == this) return;
        this.siblings.add(sibling);
        sibling.siblings.add(this);
    }

    /**
     * @return all {@link InputSlot}s and {@link OutputSlot}s that are represented by this instance and its
     * {@link #siblings}
     */
    public Collection<Slot<?>> getCorrespondingSlots() {
        return this.withSiblings(false)
                .flatMap(Channel::getCorrespondingSlotsLocal)
                .collect(Collectors.toList());
    }

    /**
     * Collect the {@link OutputSlot} of the producer and the {@link InputSlot}s of the consumers that are implemented
     * by this instance.
     *
     * @return a {@link Stream} of said {@link Slot}s
     */
    private Stream<Slot<?>> getCorrespondingSlotsLocal() {
        final Stream<? extends OutputSlot<?>> outputSlotStream = streamNullable(this.getProducerSlot());
        final Stream<? extends InputSlot<?>> inputSlotStream
                = this.consumers.stream().flatMap(consumer -> streamNullable(consumer.getInputSlotFor(this)));
        return Stream.concat(inputSlotStream, outputSlotStream);
    }

    /**
     * @return an empty {@link Stream} if given {@code null}, otherwise a singleton {@link Stream}
     */
    private static <T> Stream<T> streamNullable(T nullable) {
        return nullable == null ? Stream.empty() : Stream.of(nullable);
    }

    /**
     * Scrap any consumer {@link ExecutionTask}s and sibling {@link Channel}s that are not within the given
     * {@link ExecutionStage}s.
     *
     * @param retainableStages the {@link ExecutionStage}s whose consumers and siblings should be kept
     * @return whether consumer {@link ExecutionTask}s have been removed
     */
    public boolean retain(Set<ExecutionStage> retainableStages) {
        boolean isConsumersRemoved = this.consumers.removeIf(consumer -> !retainableStages.contains(consumer.getStage()));
        // NOTE(review): assumes every sibling has a producer; a sibling without one causes a NullPointerException.
        this.removeSiblingsWhere((sibling) -> !retainableStages.contains(sibling.getProducer().getStage()));
        return isConsumersRemoved;
    }

    /**
     * Create a copy of this instance. Mimics everything apart from the consumers. Also delimits already executed
     * {@link ExecutionTask}s and those that are not executed yet. Be careful when revising this invariant.
     */
    public abstract Channel copy();

    /**
     * @return if this is not a copy, then this instance, otherwise the root original instance
     */
    public Channel getOriginal() {
        return this.original == null ? this : this.original;
    }

    /**
     * Tells whether this instance is a copy. If so, it delimits already executed
     * {@link ExecutionTask}s and those that are not executed yet. Be careful when revising this invariant.
     *
     * @see #copy()
     * @see #getOriginal()
     */
    public boolean isCopy() {
        return this.original != null;
    }

    /**
     * Merges this instance into the original instance ({@link #getOriginal()}). Does nothing if this instance is
     * not a copy.
     * <p>Each consumer of this instance is re-wired via {@code ExecutionTask#exchangeInputChannel} to the original
     * instance, and the siblings of this instance are handed over to the original instance. For all other
     * properties, the original and this instance should agree.</p>
     */
    public void mergeIntoOriginal() {
        if (!this.isCopy()) return;
        this.getOriginal().copyConsumersFrom(this);
        this.getOriginal().adoptSiblings(this);
    }

    /**
     * Re-wires the consumers of the given {@code channel}, which must be a copy of this instance, to this instance.
     */
    private void copyConsumersFrom(Channel channel) {
        assert channel.getOriginal() == this;
        // Iterate over a snapshot, as exchanging input channels may alter the consumers of the given channel.
        for (ExecutionTask consumer : new ArrayList<>(channel.getConsumers())) {
            // We must take care not to copy back consumers that we already have in the original.
            assert this.consumers.stream()
                    .noneMatch(existingConsumer -> existingConsumer.getOperator().equals(consumer.getOperator())) :
                    String.format("Conflict when copying consumers from %s (%s) to %s (%s).",
                            this, this.consumers,
                            channel, channel.getConsumers()
                    );
            consumer.exchangeInputChannel(channel, this);
        }
    }

    /**
     * @return the {@link DataSetType} of this instance, taken from the producer {@link OutputSlot} of this instance
     * or one of its {@link #siblings}
     * @throws IllegalStateException if neither this instance nor any sibling has a producer {@link OutputSlot}
     */
    public DataSetType<?> getDataSetType() {
        return this.withSiblings(false)
                .filter(sibling -> sibling.getProducerSlot() != null)
                .findAny()
                .orElseThrow(() -> new IllegalStateException(String.format("No DataSetType for %s.", this)))
                .getProducerSlot().getType();
    }

    /**
     * Makes the siblings of the given {@code channel} siblings of this instance and then detaches the given
     * {@code channel} from all its siblings.
     */
    private void adoptSiblings(Channel channel) {
        for (Channel newSibling : channel.siblings) {
            this.addSibling(newSibling);
        }
        channel.removeSiblings();
    }

    /**
     * Sets the producing {@link ExecutionTask}. If a producer {@link OutputSlot} is known, the task's operator must
     * own it.
     */
    void setProducer(ExecutionTask producer) {
        assert this.producerSlot == null || producer.getOperator() == this.producerSlot.getOwner();
        this.producer = producer;
    }

    public ChannelDescriptor getDescriptor() {
        return this.descriptor;
    }

    /**
     * @return the {@link OutputSlot} that creates this instance or {@code null} if none is known
     */
    public OutputSlot<?> getProducerSlot() {
        return this.producerSlot;
    }

    /**
     * Try to obtain the {@link ExecutionOperator} producing this instance, either from a given {@link OutputSlot} or
     * a {@link ExecutionTask} that was specified as producer.
     *
     * @return the {@link ExecutionOperator} or {@code null} if none is set up
     */
    public ExecutionOperator getProducerOperator() {
        if (this.producerSlot != null) {
            return (ExecutionOperator) this.producerSlot.getOwner();
        } else if (this.producer != null) {
            return this.producer.getOperator();
        }
        return null;
    }

    /**
     * @return the live (mutable) {@link Set} of siblings of this instance
     */
    public Set<Channel> getSiblings() {
        return this.siblings;
    }

    /**
     * Create a {@link ChannelInstance} for this instance.
     *
     * @param executor                that manages the resource or {@code null} if none
     * @param producerOperatorContext {@link OptimizationContext.OperatorContext} of the producing operator (as the
     *                                name suggests; see implementations)
     * @param producerOutputIndex     output index of the producing operator (as the name suggests; see implementations)
     */
    public abstract ChannelInstance createInstance(Executor executor,
                                                   OptimizationContext.OperatorContext producerOperatorContext,
                                                   int producerOutputIndex);

    /**
     * Tests for inter-stage instances.
     *
     * @return whether this instance connects {@link ExecutionTask}s of different {@link ExecutionStage}s.
     */
    public boolean isBetweenStages() {
        if (this.producer == null || this.consumers.isEmpty()) {
            return false;
        }
        final ExecutionStage producerStage = this.producer.getStage();
        if (producerStage == null) {
            return false;
        }
        for (ExecutionTask consumer : this.consumers) {
            if (consumer.getStage() != null && !producerStage.equals(consumer.getStage())) {
                return true;
            }
        }
        return false;
    }
}