blob: d92edf2706820592222c7b8b3758d16d9fde07e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.basic.mapping;
import org.apache.wayang.basic.operators.CollectionSource;
import org.apache.wayang.basic.operators.LoopOperator;
import org.apache.wayang.basic.operators.MapOperator;
import org.apache.wayang.basic.operators.RepeatOperator;
import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.mapping.OperatorPattern;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
import org.apache.wayang.core.plan.wayangplan.Subplan;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.WayangCollections;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
/**
* This {@link Mapping} translates a {@link RepeatOperator} into a {@link Subplan} with the {@link LoopOperator}.
*/
@SuppressWarnings("unused")
public class RepeatMapping implements Mapping {
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(this.createTransformation());
}
private PlanTransformation createTransformation() {
return new PlanTransformation(
this.createPattern(),
this.createReplacementFactory()
);
}
private SubplanPattern createPattern() {
return SubplanPattern.createSingleton(new OperatorPattern<>(
"repeat",
new RepeatOperator<>(1, DataSetType.none()),
false
));
}
private ReplacementSubplanFactory createReplacementFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<>(this::createLoopOperatorSubplan);
}
private Subplan createLoopOperatorSubplan(RepeatOperator<?> repeatOperator, int epoch) {
final int numIterations = repeatOperator.getNumIterations();
// Create
final CollectionSource<Integer> incSource = CollectionSource.singleton(0, Integer.class);
incSource.setName(String.format("%s (init)", repeatOperator.getName()));
final LoopOperator<?, ?> loopOperator = new LoopOperator<>(
repeatOperator.getType().unchecked(),
DataSetType.createDefault(Integer.class),
ints -> WayangCollections.getSingle(ints) >= numIterations,
numIterations
);
loopOperator.setName(repeatOperator.getName());
final MapOperator<Integer, Integer> increment = new MapOperator<>(
i -> i + 1, Integer.class, Integer.class
);
increment.setName(String.format("%s (inc)", repeatOperator.getName()));
incSource.connectTo(0, loopOperator, LoopOperator.INITIAL_CONVERGENCE_INPUT_INDEX);
loopOperator.connectTo(LoopOperator.ITERATION_CONVERGENCE_OUTPUT_INDEX, increment, 0);
increment.connectTo(0, loopOperator, LoopOperator.ITERATION_CONVERGENCE_INPUT_INDEX);
// Carefully align with the slots of RepeatOperator.
// TODO: This unfortunately does not work because we are introducing a non-source Subplan with a Source.
return Subplan.wrap(
Arrays.asList(
loopOperator.getInput(LoopOperator.INITIAL_INPUT_INDEX),
loopOperator.getInput(LoopOperator.ITERATION_INPUT_INDEX)
),
Arrays.asList(
loopOperator.getOutput(LoopOperator.ITERATION_OUTPUT_INDEX),
loopOperator.getOutput(LoopOperator.FINAL_OUTPUT_INDEX)
),
null
);
}
}