// blob: 4d02f5a3e70d9bb035403d911935105e90021edc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.iteration;
import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
/**
 * Verifies that the job graphs created for iterations satisfy the expectations on vertex names,
 * vertex parallelisms and co-location groups.
 */
public class IterationConstructionTest extends TestLogger {

    /**
     * Builds an unbounded iteration whose body returns its inputs unchanged and checks that the
     * head and tail vertices are created with the environment parallelism and are co-located.
     */
    @Test
    public void testEmptyIterationBody() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setParallelism(4);
        DataStream<Integer> variableSource =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .name("Variable");

        // Only the generated job graph is inspected, so the iteration outputs are not kept.
        Iterations.iterateUnboundedStreams(
                DataStreamList.of(variableSource),
                DataStreamList.of(),
                (variableStreams, dataStreams) ->
                        new IterationBodyResult(variableStreams, dataStreams));

        List<String> expectedVertexNames =
                Arrays.asList(
                        /* 0 */ "Source: Variable -> input-Variable",
                        /* 1 */ "head-Variable",
                        /* 2 */ "tail-head-Variable -> filter-tail");
        List<Integer> expectedParallelisms = Arrays.asList(4, 4, 4);

        List<JobVertex> vertices =
                assertJobVertices(env, expectedVertexNames, expectedParallelisms);
        assertCoLocated(vertices, 1, 2);
    }

    /**
     * Feeds the iteration with a map of parallelism 2 (the source itself runs with parallelism 4)
     * and checks that the head is a separate vertex with the parallelism of the map.
     */
    @Test
    public void testNotChainingHeadOperator() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.setParallelism(4);
        DataStream<Integer> variableSource =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .name("Variable")
                        .map(x -> x)
                        .name("map")
                        .setParallelism(2);

        // Only the generated job graph is inspected, so the iteration outputs are not kept.
        Iterations.iterateUnboundedStreams(
                DataStreamList.of(variableSource),
                DataStreamList.of(),
                (variableStreams, dataStreams) ->
                        new IterationBodyResult(variableStreams, dataStreams));

        List<String> expectedVertexNames =
                Arrays.asList(
                        /* 0 */ "Source: Variable",
                        /* 1 */ "map -> input-map",
                        /* 2 */ "head-map",
                        /* 3 */ "tail-head-map -> filter-tail");
        List<Integer> expectedParallelisms = Arrays.asList(4, 2, 2, 2);

        List<JobVertex> vertices =
                assertJobVertices(env, expectedVertexNames, expectedParallelisms);
        assertCoLocated(vertices, 2, 3);
    }

    /**
     * Builds an unbounded iteration with two variable streams, one constant stream, two feedback
     * streams and one side output, and checks the resulting job graph.
     */
    @Test
    public void testUnboundedIteration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        DataStream<Integer> variableSource1 =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .setParallelism(2)
                        .name("Variable0");
        DataStream<Integer> variableSource2 =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .setParallelism(3)
                        .name("Variable1");
        DataStream<Integer> constantSource =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .setParallelism(3)
                        .name("Constant");

        DataStreamList result =
                Iterations.iterateUnboundedStreams(
                        DataStreamList.of(variableSource1, variableSource2),
                        DataStreamList.of(constantSource),
                        (variableStreams, dataStreams) -> {
                            SingleOutputStreamOperator<Integer> processor =
                                    connectAndProcess(
                                            variableStreams
                                                    .<Integer>get(0)
                                                    .union(variableStreams.<Integer>get(1)),
                                            dataStreams.<Integer>get(0));
                            return new IterationBodyResult(
                                    DataStreamList.of(
                                            processor
                                                    .map(x -> x)
                                                    .name("Feedback0")
                                                    .setParallelism(2),
                                            processor
                                                    .map(x -> x)
                                                    .name("Feedback1")
                                                    .setParallelism(3)),
                                    DataStreamList.of(
                                            processor.getSideOutput(
                                                    new OutputTag<Integer>("output") {})));
                        });
        result.get(0).addSink(new DiscardingSink<>()).name("Sink").setParallelism(4);

        List<String> expectedVertexNames =
                Arrays.asList(
                        /* 0 */ "Source: Variable0 -> input-Variable0",
                        /* 1 */ "Source: Variable1 -> input-Variable1",
                        /* 2 */ "Source: Constant -> input-Constant",
                        /* 3 */ "head-Variable0",
                        /* 4 */ "head-Variable1",
                        /* 5 */ "Processor",
                        /* 6 */ "Feedback0",
                        /* 7 */ "tail-Feedback0 -> filter-tail",
                        /* 8 */ "Feedback1",
                        /* 9 */ "tail-Feedback1 -> filter-tail",
                        /* 10 */ "tail-map-SideOutput",
                        /* 11 */ "output-SideOutput -> Sink: Sink");
        List<Integer> expectedParallelisms = Arrays.asList(2, 3, 3, 2, 3, 4, 2, 2, 3, 3, 1, 4);

        List<JobVertex> vertices =
                assertJobVertices(env, expectedVertexNames, expectedParallelisms);
        assertCoLocated(vertices, 3, 7);
        assertCoLocated(vertices, 4, 9);
    }

    /**
     * Builds a bounded iteration with a termination criteria stream and a non-replayed constant
     * stream, and checks the resulting job graph including the criteria vertices.
     */
    @Test
    public void testBoundedIterationWithTerminationCriteria() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        DataStream<Integer> variableSource1 =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .setParallelism(2)
                        .name("Variable0");
        DataStream<Integer> variableSource2 =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .setParallelism(3)
                        .name("Variable1");
        DataStream<Integer> constantSource =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .setParallelism(3)
                        .name("Constant");

        DataStreamList result =
                Iterations.iterateBoundedStreamsUntilTermination(
                        DataStreamList.of(variableSource1, variableSource2),
                        ReplayableDataStreamList.notReplay(constantSource),
                        IterationConfig.newBuilder().build(),
                        (variableStreams, dataStreams) -> {
                            SingleOutputStreamOperator<Integer> processor =
                                    connectAndProcess(
                                            variableStreams
                                                    .<Integer>get(0)
                                                    .union(variableStreams.<Integer>get(1)),
                                            dataStreams.<Integer>get(0));
                            return new IterationBodyResult(
                                    DataStreamList.of(
                                            processor
                                                    .map(x -> x)
                                                    .name("Feedback0")
                                                    .setParallelism(2),
                                            processor
                                                    .map(x -> x)
                                                    .name("Feedback1")
                                                    .setParallelism(3)),
                                    DataStreamList.of(
                                            processor.getSideOutput(
                                                    new OutputTag<Integer>("output") {})),
                                    processor.map(x -> x).name("Termination").setParallelism(5));
                        });
        result.get(0).addSink(new DiscardingSink<>()).name("Sink").setParallelism(4);

        List<String> expectedVertexNames =
                Arrays.asList(
                        /* 0 */ "Source: Variable0 -> input-Variable0",
                        /* 1 */ "Source: Variable1 -> input-Variable1",
                        /* 2 */ "Source: Constant -> input-Constant",
                        /* 3 */ "Source: Termination -> input-Termination",
                        /* 4 */ "head-Variable0",
                        /* 5 */ "head-Variable1",
                        /* 6 */ "Processor",
                        /* 7 */ "Feedback0",
                        /* 8 */ "tail-Feedback0 -> filter-tail",
                        /* 9 */ "Feedback1",
                        /* 10 */ "tail-Feedback1 -> filter-tail",
                        /* 11 */ "Termination",
                        /* 12 */ "head-Termination",
                        /* 13 */ "criteria-merge",
                        /* 14 */ "tail-criteria-merge -> filter-tail",
                        /* 15 */ "tail-map-SideOutput",
                        /* 16 */ "output-SideOutput -> Sink: Sink");
        List<Integer> expectedParallelisms =
                Arrays.asList(2, 3, 3, 5, 2, 3, 4, 2, 2, 3, 3, 5, 5, 5, 5, 1, 4);

        List<JobVertex> vertices =
                assertJobVertices(env, expectedVertexNames, expectedParallelisms);
        assertCoLocated(vertices, 4, 8);
        assertCoLocated(vertices, 5, 10);
        assertCoLocated(vertices, 12, 14);
    }

    /**
     * Builds a bounded iteration with the {@code PER_ROUND} operator life cycle and a replayed
     * constant stream, and checks that the job graph contains the replayer vertex.
     */
    @Test
    public void testReplayedIteration() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        DataStream<Integer> variableSource =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .setParallelism(2)
                        .name("Variable");
        DataStream<Integer> constantSource =
                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                        .setParallelism(3)
                        .name("Constant");

        DataStreamList result =
                Iterations.iterateBoundedStreamsUntilTermination(
                        DataStreamList.of(variableSource),
                        ReplayableDataStreamList.replay(constantSource),
                        IterationConfig.newBuilder()
                                .setOperatorLifeCycle(IterationConfig.OperatorLifeCycle.PER_ROUND)
                                .build(),
                        (variableStreams, dataStreams) -> {
                            SingleOutputStreamOperator<Integer> processor =
                                    connectAndProcess(
                                            variableStreams.<Integer>get(0),
                                            dataStreams.<Integer>get(0));
                            return new IterationBodyResult(
                                    DataStreamList.of(
                                            processor
                                                    .map(x -> x)
                                                    .name("Feedback")
                                                    .setParallelism(2)),
                                    DataStreamList.of(
                                            processor.getSideOutput(
                                                    new OutputTag<Integer>("output") {})),
                                    processor.map(x -> x).name("Termination").setParallelism(5));
                        });
        result.get(0).addSink(new DiscardingSink<>()).name("Sink").setParallelism(4);

        List<String> expectedVertexNames =
                Arrays.asList(
                        /* 0 */ "Source: Variable -> input-Variable",
                        /* 1 */ "Source: Constant -> input-Constant",
                        /* 2 */ "Source: Termination -> input-Termination",
                        /* 3 */ "head-Variable",
                        /* 4 */ "Replayer-Constant",
                        /* 5 */ "Processor",
                        /* 6 */ "Feedback",
                        /* 7 */ "tail-Feedback -> filter-tail",
                        /* 8 */ "Termination",
                        /* 9 */ "head-Termination",
                        /* 10 */ "criteria-merge",
                        /* 11 */ "tail-criteria-merge -> filter-tail",
                        /* 12 */ "tail-map-SideOutput",
                        /* 13 */ "output-SideOutput -> Sink: Sink");
        List<Integer> expectedParallelisms =
                Arrays.asList(2, 3, 5, 2, 3, 4, 2, 2, 5, 5, 5, 5, 1, 4);

        List<JobVertex> vertices =
                assertJobVertices(env, expectedVertexNames, expectedParallelisms);
        assertCoLocated(vertices, 3, 7);
        assertCoLocated(vertices, 9, 11);
    }

    /**
     * Connects the two streams and applies a no-op co-process function named "Processor" with
     * parallelism 4.
     *
     * @param variableStream the first input of the processor.
     * @param dataStream the second input of the processor.
     * @return the operator created for the processor.
     */
    private static SingleOutputStreamOperator<Integer> connectAndProcess(
            DataStream<Integer> variableStream, DataStream<Integer> dataStream) {
        return variableStream
                .connect(dataStream)
                .process(new NoOpCoProcessFunction())
                .name("Processor")
                .setParallelism(4);
    }

    /**
     * Generates the job graph of the environment and asserts the names and parallelisms of its
     * vertices, taken in topological order starting from the sources.
     *
     * @param env the environment holding the job under test.
     * @param expectedVertexNames the expected vertex names, in topological order.
     * @param expectedParallelisms the expected vertex parallelisms, in the same order.
     * @return the topologically sorted vertices, for further assertions.
     */
    private static List<JobVertex> assertJobVertices(
            StreamExecutionEnvironment env,
            List<String> expectedVertexNames,
            List<Integer> expectedParallelisms) {
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        assertEquals(
                expectedVertexNames,
                vertices.stream().map(JobVertex::getName).collect(Collectors.toList()));
        assertEquals(
                expectedParallelisms,
                vertices.stream().map(JobVertex::getParallelism).collect(Collectors.toList()));
        return vertices;
    }

    /**
     * Asserts that the two vertices have a co-location group and that it is the same instance.
     *
     * @param vertices the topologically sorted vertices of the job graph.
     * @param headIndex the index of the first vertex, whose group must not be null.
     * @param tailIndex the index of the vertex expected to share that group.
     */
    private static void assertCoLocated(List<JobVertex> vertices, int headIndex, int tailIndex) {
        JobVertex head = vertices.get(headIndex);
        JobVertex tail = vertices.get(tailIndex);
        assertNotNull(
                "Vertex '" + head.getName() + "' has no co-location group",
                head.getCoLocationGroup());
        // A non-null group that is the same instance on both sides implies that the second
        // vertex has a group as well.
        assertSame(
                "Vertices '"
                        + head.getName()
                        + "' and '"
                        + tail.getName()
                        + "' are not in the same co-location group",
                head.getCoLocationGroup(),
                tail.getCoLocationGroup());
    }

    /** A co-process function that ignores every record of both inputs. */
    private static class NoOpCoProcessFunction
            extends CoProcessFunction<Integer, Integer, Integer> {

        private static final long serialVersionUID = 1L;

        @Override
        public void processElement1(Integer value, Context ctx, Collector<Integer> out) {
            // Intentionally empty: only the graph structure matters for these tests.
        }

        @Override
        public void processElement2(Integer value, Context ctx, Collector<Integer> out) {
            // Intentionally empty: only the graph structure matters for these tests.
        }
    }
}