blob: 649a121462dd1cf6c083a4c73076c8ce898392d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.plan.wayangplan;
import org.apache.commons.lang3.Validate;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.optimizer.SanityChecker;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
/**
* A Wayang plan consists of a set of {@link Operator}s.
*/
public class WayangPlan {
private final Logger logger = LogManager.getLogger(this.getClass());
private final Collection<Operator> sinks = new LinkedList<>();
private boolean isLoopsIsolated = false;
private boolean isPruned = false;
/**
* Prepares this {@link WayangPlan} for the optimization process if not already done.
*/
public void prepare() {
this.prune();
LoopIsolator.isolateLoops(this);
}
/**
* Creates a new instance and does some preprocessing (such as loop isolation).
*
* @param sinks the sinks of the new instance
*/
public WayangPlan(Operator... sinks) {
for (Operator sink : sinks) {
this.addSink(sink);
}
}
/**
* @deprecated Use {@link WayangPlan#WayangPlan(Operator...)}.
*/
@Deprecated
public void addSink(Operator sink) {
if (this.isLoopsIsolated || this.isPruned) {
throw new IllegalStateException("Too late to add more sinks.");
}
Validate.isTrue(sink.isSink(), "%s is not a sink.", sink);
Validate.isTrue(sink.getParent() == null, "%s is nested.", sink);
this.sinks.add(sink);
}
/**
* Replaces an {@code oldSink} with a {@code newSink}. As with all modifications to this instance,
* it is up to the caller to ensure that this does not destroy the integrity of this instance.
*/
public void replaceSink(Operator oldSink, Operator newSink) {
if (oldSink == newSink) return;
assert newSink.isSink();
assert oldSink.isSink();
if (this.sinks.remove(oldSink)) {
assert newSink.getParent() == null;
this.sinks.add(newSink);
}
}
public Collection<Operator> getSinks() {
return this.sinks;
}
/**
* Find the source {@link Operator}s that are reachable from the sinks.
*
* @return the reachable sources, only top-level operators are considered
* @see #getSinks()
*/
public Collection<Operator> collectReachableTopLevelSources() {
return PlanTraversal.upstream().traverse(this.sinks).getTraversedNodesWith(Operator::isSource);
}
/**
* Find the {@link Operator}s in this instance by their name.
*
* @param name the name of the {@link Operator}s
* @return the matching {@link Operator}s; only top-level (non-nested) ones are considered
*/
public Collection<Operator> collectTopLevelOperatorsByName(String name) {
return PlanTraversal.upstream().traverse(this.sinks).getTraversedNodesWith(
operator -> name.equals(operator.getName())
);
}
/**
* Find the {@link Operator} in this instance by their name.
*
* @param name the name of the {@link Operator}s
* @return the matching {@link Operator} or {@code null} if none
* @throws WayangException if there is more than one such {@link Operator}
*/
public Operator collectTopLevelOperatorByName(String name) throws WayangException{
return WayangCollections.getSingleOrNull(this.collectTopLevelOperatorsByName(name));
}
/**
* Prunes {@link Operator}s that do not (indirectly) contribute to a sink.
*/
public void prune() {
if (this.isPruned) return;
final Set<Operator> reachableOperators = new HashSet<>();
PlanTraversal.upstream()
.withCallback((operator, input, output) -> {
reachableOperators.add(operator);
if (!operator.isElementary()) {
this.logger.warn("Not yet considering nested operators during Wayang plan pruning.");
}
})
.traverse(this.sinks);
PlanTraversal.upstream()
.withCallback(operator -> this.pruneUnreachableSuccessors(operator, reachableOperators))
.traverse(this.sinks);
this.isPruned = true;
}
/**
* Prune any successor {@link Operator}s of the {@code baseOperator} that are not reachable/
*/
private void pruneUnreachableSuccessors(Operator baseOperator, Set<Operator> reachableOperators) {
for (int outputIndex = 0; outputIndex < baseOperator.getNumOutputs(); outputIndex++) {
final OutputSlot<?> output = baseOperator.getOutput(outputIndex);
new ArrayList<>(output.getOccupiedSlots()).stream()
.filter(occupiedInput -> !reachableOperators.contains(occupiedInput.getOwner()))
.forEach(occupiedInput -> {
this.logger.warn("Pruning unreachable {} from Wayang plan.", occupiedInput.getOwner());
output.unchecked().disconnectFrom(occupiedInput.unchecked());
});
}
}
/**
* Apply all available transformations in the {@code configuration} to this instance.
*/
public void applyTransformations(Collection<PlanTransformation> transformations) {
boolean isAnyChange;
int epoch = Operator.FIRST_EPOCH;
do {
epoch++;
final int numTransformations = this.applyAndCountTransformations(transformations, epoch);
this.logger.debug("Applied {} transformations in epoch {}.", numTransformations, epoch);
isAnyChange = numTransformations > 0;
} while (isAnyChange);
}
/**
* Apply all {@code transformations} to the {@code plan}.
*
* @param transformations transformations to apply
* @param epoch the new epoch
* @return the number of applied transformations
*/
private int applyAndCountTransformations(Collection<PlanTransformation> transformations, int epoch) {
return transformations.stream()
.mapToInt(transformation -> transformation.transform(this, epoch))
.sum();
}
/**
* Check that the given {@link WayangPlan} is as we expect it to be in the following steps.
*/
public boolean isSane() {
// We make some assumptions on the hyperplan. Make sure that they hold. After all, the transformations might
// have bugs.
return new SanityChecker(this).checkAllCriteria();
}
/**
* Tells whether potential loops within this instance have been isolated.
*/
public boolean isLoopsIsolated() {
return this.isLoopsIsolated;
}
/**
* Tells that potential loops within this instance have been isolated.
*/
public void setLoopsIsolated() {
this.isLoopsIsolated = true;
}
}