blob: 64bcd93b3f89deed9f2b7b8af0aba902f80f22d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.plan.wayangplan;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.core.util.WayangCollections;
/**
* This mapping can be used to encapsulate subplans by connecting slots (usually <b>against</b> the data flow direction,
* i.e., outer output slot to inner output slot, inner input slot to outer input slot).
*/
public class SlotMapping {
private final Logger logger = LogManager.getLogger(this.getClass());
private final Map<Slot<?>, Slot<?>> upstreamMapping = new HashMap<>();
private Map<Slot<?>, Collection> downstreamMapping = null;
/**
* Create a new instance that maps all {@link Slot}s of the given {@link Operator} to themselves.
*
* @param operator that will be decorated with the new instance
* @return the new instance
*/
public static SlotMapping createIdentityMapping(Operator operator) {
return wrap(operator, operator);
}
/**
* Creates a new instance where the {@link Slot}s of the {@code wrapper} are mapped to the {@link Slot}s of the
* {@code wrappee}, thereby matching their {@link Slot}s by their index (as in {@link Operator#getInput(int)} and
* {@link Operator#getOutput(int)}).
*
* @param wrappee the inner {@link Operator}
* @param wrapper the outer {@link Operator}
* @return the new instance
*/
public static SlotMapping wrap(Operator wrappee, Operator wrapper) {
SlotMapping slotMapping = new SlotMapping();
slotMapping.mapAllUpsteam(wrapper.getAllOutputs(), wrappee.getAllOutputs());
slotMapping.mapAllUpsteam(wrappee.getAllInputs(), wrapper.getAllInputs());
return slotMapping;
}
public void mapAllUpsteam(InputSlot<?>[] sources, InputSlot<?>[] targets) {
if (sources.length != targets.length) {
throw new IllegalArgumentException(String.format("Incompatible number of input slots between %s and %s.",
Arrays.toString(sources), Arrays.toString(targets)));
}
for (int i = 0; i < sources.length; i++) {
this.mapUpstream(sources[i], targets[i]);
}
}
public void mapAllUpsteam(OutputSlot<?>[] sources, OutputSlot<?>[] targets) {
if (sources.length != targets.length) throw new IllegalArgumentException();
for (int i = 0; i < sources.length; i++) {
this.mapUpstream(sources[i], targets[i]);
}
}
public void mapUpstream(InputSlot<?> source, InputSlot<?> target) {
if (target == null) {
this.upstreamMapping.remove(source);
this.downstreamMapping = null;
return;
}
if (!source.isCompatibleWith(target)) {
throw new IllegalArgumentException(String.format("Incompatible slots given: %s -> %s", source, target));
}
this.upstreamMapping.put(source, target);
this.downstreamMapping = null;
}
public void mapUpstream(OutputSlot<?> source, OutputSlot<?> target) {
if (target == null) {
this.upstreamMapping.remove(source);
this.downstreamMapping = null;
return;
}
if (!source.isCompatibleWith(target)) {
throw new IllegalArgumentException(String.format("Incompatible slots given: %s -> %s", source, target));
}
this.upstreamMapping.put(source, target);
this.downstreamMapping = null;
}
public <T> InputSlot<T> resolveUpstream(InputSlot<T> source) {
if (source.getOccupant() != null) {
this.logger.warn("Trying to resolve (upstream) an InputSlot with an occupant.");
}
return (InputSlot<T>) this.upstreamMapping.get(source);
}
public <T> OutputSlot<T> resolveUpstream(OutputSlot<T> source) {
return (OutputSlot<T>) this.upstreamMapping.get(source);
}
public <T> Collection<InputSlot<T>> resolveDownstream(InputSlot<T> source) {
return (Collection<InputSlot<T>>) this.getOrCreateDownstreamMapping().getOrDefault(source, Collections.emptyList());
}
public <T> Collection<OutputSlot<T>> resolveDownstream(OutputSlot<T> source) {
if (!source.getOccupiedSlots().isEmpty()) {
this.logger.warn("Trying to resolve (downstream) an OutputSlot with occupiers.");
}
return (Collection<OutputSlot<T>>) this.getOrCreateDownstreamMapping().getOrDefault(source, Collections.emptyList());
}
/**
* Retrieves {@link #downstreamMapping} or creates it if it does not exist.
*
* @return {@link #downstreamMapping}
*/
private Map<Slot<?>, Collection> getOrCreateDownstreamMapping() {
if (this.downstreamMapping == null) {
Map<Slot<?>, Collection> map = new HashMap<>();
for (Entry<Slot<?>, Slot<?>> slotSlotEntry : this.upstreamMapping.entrySet()) {
Object key = slotSlotEntry.getKey();
map.computeIfAbsent(slotSlotEntry.getValue(), k -> new LinkedList<Object>())
.add(key);
}
this.downstreamMapping = map;
}
return this.downstreamMapping;
}
/**
* Replace the mappings from an old, wrapped operator with a new wrapped operator.
*
* @param oldOperator the old wrapped operator
* @param newOperator the new wrapped operator
*/
public void replaceInputSlotMappings(Operator oldOperator, Operator newOperator) {
if (oldOperator.getParent() == newOperator) {
// Default strategy: The oldOperator is now wrapped by the newOperator.
final SlotMapping oldToNewSlotMapping = oldOperator.getContainer().getSlotMapping();
for (int i = 0; i < oldOperator.getNumInputs(); i++) {
final InputSlot<?> oldInput = oldOperator.getInput(i);
assert oldInput != null : String.format("No %dth input for %s (for %s).", i, oldOperator, newOperator);
if (oldInput.getOccupant() != null) continue;
final InputSlot<?> outerInput = this.resolveUpstream(oldInput);
if (outerInput != null) {
this.delete(oldInput);
final InputSlot<?> newInput = oldToNewSlotMapping.resolveUpstream(oldInput);
if (newInput != null) this.mapUpstream(newInput, outerInput);
}
}
} else {
// Fallback strategy.
this.logger.warn("Using bare indices to replace {} (parent {}) with {}.", oldOperator, oldOperator.getParent(), newOperator);
assert oldOperator.getNumInputs() == newOperator.getNumInputs()
: String.format("Operators %s and %s are not matching.", oldOperator, newOperator);
for (int i = 0; i < oldOperator.getNumInputs(); i++) {
final InputSlot<?> oldInput = oldOperator.getInput(i);
final InputSlot<?> newInput = newOperator.getInput(i);
final InputSlot<?> outerInput = this.resolveUpstream(oldInput);
if (outerInput != null) {
this.mapUpstream(newInput, outerInput);
this.delete(oldInput);
}
}
}
}
/**
* Removes an existing mapping.
*
* @param key the key of the mapping to remove
*/
private void delete(InputSlot<?> key) {
this.upstreamMapping.remove(key);
this.downstreamMapping = null;
}
/**
* Removes an existing mapping.
*
* @param key the key of the mapping to remove
*/
private void delete(OutputSlot<?> key) {
this.upstreamMapping.remove(key);
this.downstreamMapping = null;
}
/**
* Replace the mappings from an old, wrapped operator with a new wrapped operator.
*
* @param oldOperator the old wrapped operator
* @param newOperator the new wrapped operator
*/
@SuppressWarnings("unchecked")
public void replaceOutputSlotMappings(Operator oldOperator, Operator newOperator) {
if (oldOperator.getParent() == newOperator) {
// Default strategy: The oldOperator is now wrapped by the newOperator.
final Map<Slot<?>, Collection> downstreamMapping = this.getOrCreateDownstreamMapping();
final SlotMapping oldToNewSlotMapping = oldOperator.getContainer().getSlotMapping();
for (int i = 0; i < oldOperator.getNumOutputs(); i++) {
final OutputSlot<?> oldOutput = oldOperator.getOutput(i);
final Collection<OutputSlot<?>> outerOutputs = downstreamMapping.get(oldOutput);
if (outerOutputs == null) continue;
// We cannot have multiple OutputSlots, we could not map them downstream to a single OutputSlot.
final OutputSlot<?> newOutput = WayangCollections.getSingleOrNull(
(Collection<OutputSlot<?>>) oldToNewSlotMapping
.getOrCreateDownstreamMapping()
.getOrDefault(oldOutput, Collections.emptySet())
);
for (OutputSlot<?> outerOutput : outerOutputs) {
this.mapUpstream(outerOutput, newOutput);
}
}
} else {
// Fallback strategy.
this.logger.warn("Using bare indices to replace {} (parent {}) with {}.", oldOperator, oldOperator.getParent(), newOperator);
assert oldOperator.getNumOutputs() == newOperator.getNumOutputs()
: String.format("Operators %s and %s are not matching.", oldOperator, newOperator);
for (int i = 0; i < oldOperator.getNumOutputs(); i++) {
final OutputSlot<?> oldOutput = oldOperator.getOutput(i);
final OutputSlot<?> newOutput = newOperator.getOutput(i);
this.upstreamMapping.entrySet().stream()
.filter(entry -> entry.getValue() == oldOutput)
.findFirst()
.map(Map.Entry::getKey)
.ifPresent(outerOutput -> this.mapUpstream((OutputSlot<?>) outerOutput, newOutput));
// No need for delete as we are replacing the old mapping.
}
}
}
/**
* Functionally compose two instances. The {@link OutputSlot}s of this instance are followed to {@link InputSlot}s
* to the other instances, thereby applying the mapping step.
*
* @param that the instance to be composed to this one
* @return a {@link Map} mapping inner {@link InputSlot}s of the second instance to the inner {@link OutputSlot}s
* of the first one
*/
public Map<InputSlot, OutputSlot> compose(SlotMapping that) {
Map<InputSlot, OutputSlot> result = new HashMap<>(2);
that.upstreamMapping.entrySet().stream()
.filter(entry -> entry.getKey().isInputSlot())
.forEach(entry -> {
InputSlot thatOuterInputSlot = (InputSlot) entry.getValue();
final OutputSlot allegedThisOuterOutputSlot = thatOuterInputSlot.getOccupant();
if (allegedThisOuterOutputSlot == null) return;
final OutputSlot thisInnerOutputSlot = this.resolveUpstream(allegedThisOuterOutputSlot);
if (thisInnerOutputSlot == null) return;
final InputSlot thatInnerInputSlot = (InputSlot) entry.getKey();
result.put(thatInnerInputSlot, thisInnerOutputSlot);
});
return result;
}
/**
* Retrieves the upstream mapping of {@link Slot}s. Do not modify!
*
* @return the upstream mapping
*/
public Map<Slot<?>, Slot<?>> getUpstreamMapping() {
return this.upstreamMapping;
}
}