/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;

import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;

import org.junit.Test;
import org.mockito.Matchers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * This class contains tests concerning the correct conversion from {@link JobGraph} to {@link
* ExecutionGraph} objects. It also tests that {@link EdgeManagerBuildUtil#connectVertexToResult}
* builds {@link DistributionPattern#ALL_TO_ALL} connections correctly.
*/
public class DefaultExecutionGraphConstructionTest {
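
    /**
     * Creates an ExecutionGraph whose vertex parallelism store is computed from the given
     * vertices.
     */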
private ExecutionGraph createDefaultExecutionGraph(List<JobVertex> vertices) throws Exception {
return TestingDefaultExecutionGraphBuilder.newBuilder()
.setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(vertices))
.build();
}
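
    /**
     * Tests that attaching the same set of vertices to two separate ExecutionGraphs yields
     * disjoint sets of ExecutionAttemptIDs.
     */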
@Test
public void testExecutionAttemptIdInTwoIdenticalJobsIsNotSame() throws Exception {
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
JobVertex v3 = new JobVertex("vertex3");
v1.setParallelism(5);
v2.setParallelism(7);
v3.setParallelism(2);
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
v3.setInvokableClass(AbstractInvokable.class);
List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg1 = createDefaultExecutionGraph(ordered);
ExecutionGraph eg2 = createDefaultExecutionGraph(ordered);
eg1.attachJobGraph(ordered);
eg2.attachJobGraph(ordered);
assertThat(
Sets.intersection(
eg1.getRegisteredExecutions().keySet(),
eg2.getRegisteredExecutions().keySet()),
is(empty()));
    }

    /**
* Creates a JobGraph of the following form.
*
     * <pre>
     *  v1--->v2-->\
     *              \
     *               v4 --->\
     *        ----->/        \
     *  v3-->/                v5
     *       \               /
     *        ------------->/
     * </pre>
*/
@Test
public void testCreateSimpleGraphBipartite() throws Exception {
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
JobVertex v3 = new JobVertex("vertex3");
JobVertex v4 = new JobVertex("vertex4");
JobVertex v5 = new JobVertex("vertex5");
v1.setParallelism(5);
v2.setParallelism(7);
v3.setParallelism(2);
v4.setParallelism(11);
v5.setParallelism(4);
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
v3.setInvokableClass(AbstractInvokable.class);
v4.setInvokableClass(AbstractInvokable.class);
v5.setInvokableClass(AbstractInvokable.class);
v2.connectNewDataSetAsInput(
v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v4.connectNewDataSetAsInput(
v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v4.connectNewDataSetAsInput(
v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectNewDataSetAsInput(
v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectNewDataSetAsInput(
v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v4, v5));
ExecutionGraph eg = createDefaultExecutionGraph(ordered);
try {
eg.attachJobGraph(ordered);
} catch (JobException e) {
e.printStackTrace();
fail("Job failed with exception: " + e.getMessage());
}
verifyTestGraph(eg, v1, v2, v3, v4, v5);
}
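
    /**
     * Tests that an ExecutionGraph can be built up in two stages, where the second batch of
     * vertices consumes intermediate data sets produced by the first.
     */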
@Test
public void testAttachViaDataSets() throws Exception {
// construct part one of the execution graph
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
JobVertex v3 = new JobVertex("vertex3");
v1.setParallelism(5);
v2.setParallelism(7);
v3.setParallelism(2);
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
v3.setInvokableClass(AbstractInvokable.class);
// this creates an intermediate result for v1
v2.connectNewDataSetAsInput(
v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
// create results for v2 and v3
        IntermediateDataSet v2Result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
IntermediateDataSet v3Result1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
IntermediateDataSet v3Result2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
JobVertex v4 = new JobVertex("vertex4");
JobVertex v5 = new JobVertex("vertex5");
v4.setParallelism(11);
v5.setParallelism(4);
v4.setInvokableClass(AbstractInvokable.class);
v5.setInvokableClass(AbstractInvokable.class);
        v4.connectDataSetAsInput(v2Result, DistributionPattern.ALL_TO_ALL);
v4.connectDataSetAsInput(v3Result1, DistributionPattern.ALL_TO_ALL);
v5.connectNewDataSetAsInput(
v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectDataSetAsInput(v3Result2, DistributionPattern.ALL_TO_ALL);
List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
List<JobVertex> ordered2 = Arrays.asList(v4, v5);
ExecutionGraph eg =
createDefaultExecutionGraph(
Stream.concat(ordered.stream(), ordered2.stream())
.collect(Collectors.toList()));
try {
eg.attachJobGraph(ordered);
} catch (JobException e) {
e.printStackTrace();
fail("Job failed with exception: " + e.getMessage());
}
// attach the second part of the graph
try {
eg.attachJobGraph(ordered2);
} catch (JobException e) {
e.printStackTrace();
fail("Job failed with exception: " + e.getMessage());
}
// verify
verifyTestGraph(eg, v1, v2, v3, v4, v5);
}
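
    /** Verifies the upstream and downstream connections of each vertex in the test graph. */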
private void verifyTestGraph(
ExecutionGraph eg,
JobVertex v1,
JobVertex v2,
JobVertex v3,
JobVertex v4,
JobVertex v5) {
ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
eg, v1, null, Collections.singletonList(v2));
ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
eg, v2, Collections.singletonList(v1), Collections.singletonList(v4));
ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
eg, v3, null, Arrays.asList(v4, v5));
ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
eg, v4, Arrays.asList(v2, v3), Collections.singletonList(v5));
ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
eg, v5, Arrays.asList(v4, v3), null);
}
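
    /** Tests that attaching vertices in a non-topological order is rejected. */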
@Test
public void testCannotConnectWrongOrder() throws Exception {
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
JobVertex v3 = new JobVertex("vertex3");
JobVertex v4 = new JobVertex("vertex4");
JobVertex v5 = new JobVertex("vertex5");
v1.setParallelism(5);
v2.setParallelism(7);
v3.setParallelism(2);
v4.setParallelism(11);
v5.setParallelism(4);
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
v3.setInvokableClass(AbstractInvokable.class);
v4.setInvokableClass(AbstractInvokable.class);
v5.setInvokableClass(AbstractInvokable.class);
v2.connectNewDataSetAsInput(
v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v4.connectNewDataSetAsInput(
v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v4.connectNewDataSetAsInput(
v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectNewDataSetAsInput(
v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectNewDataSetAsInput(
v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v5, v4));
ExecutionGraph eg = createDefaultExecutionGraph(ordered);
try {
eg.attachJobGraph(ordered);
fail("Attached wrong jobgraph");
} catch (JobException e) {
// expected
}
}
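
    /**
     * Tests that the input split assigners provided by the vertices' input split sources are
     * registered with the corresponding execution job vertices.
     */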
@Test
public void testSetupInputSplits() {
try {
final InputSplit[] emptySplits = new InputSplit[0];
InputSplitAssigner assigner1 = mock(InputSplitAssigner.class);
InputSplitAssigner assigner2 = mock(InputSplitAssigner.class);
@SuppressWarnings("unchecked")
InputSplitSource<InputSplit> source1 = mock(InputSplitSource.class);
@SuppressWarnings("unchecked")
InputSplitSource<InputSplit> source2 = mock(InputSplitSource.class);
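            // both sources hand out empty split arrays and report the mocked assigners for them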
when(source1.createInputSplits(Matchers.anyInt())).thenReturn(emptySplits);
when(source2.createInputSplits(Matchers.anyInt())).thenReturn(emptySplits);
when(source1.getInputSplitAssigner(emptySplits)).thenReturn(assigner1);
when(source2.getInputSplitAssigner(emptySplits)).thenReturn(assigner2);
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
JobVertex v3 = new JobVertex("vertex3");
JobVertex v4 = new JobVertex("vertex4");
JobVertex v5 = new JobVertex("vertex5");
v1.setParallelism(5);
v2.setParallelism(7);
v3.setParallelism(2);
v4.setParallelism(11);
v5.setParallelism(4);
v1.setInvokableClass(AbstractInvokable.class);
v2.setInvokableClass(AbstractInvokable.class);
v3.setInvokableClass(AbstractInvokable.class);
v4.setInvokableClass(AbstractInvokable.class);
v5.setInvokableClass(AbstractInvokable.class);
v2.connectNewDataSetAsInput(
v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v4.connectNewDataSetAsInput(
v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v4.connectNewDataSetAsInput(
v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectNewDataSetAsInput(
v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v5.connectNewDataSetAsInput(
v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v3.setInputSplitSource(source1);
v5.setInputSplitSource(source2);
            List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v4, v5));
ExecutionGraph eg = createDefaultExecutionGraph(ordered);
try {
eg.attachJobGraph(ordered);
} catch (JobException e) {
e.printStackTrace();
fail("Job failed with exception: " + e.getMessage());
}
assertEquals(assigner1, eg.getAllVertices().get(v3.getID()).getSplitAssigner());
assertEquals(assigner2, eg.getAllVertices().get(v5.getID()).getSplitAssigner());
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
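
    /**
     * Tests that attaching a job graph in which an intermediate data set has more than one
     * consumer is rejected.
     */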
@Test
public void testMoreThanOneConsumerForIntermediateResult() {
try {
JobVertex v1 = new JobVertex("vertex1");
JobVertex v2 = new JobVertex("vertex2");
JobVertex v3 = new JobVertex("vertex3");
v1.setParallelism(5);
v2.setParallelism(7);
v3.setParallelism(2);
IntermediateDataSet result =
v1.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
v2.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
v3.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
            List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
ExecutionGraph eg = createDefaultExecutionGraph(ordered);
try {
eg.attachJobGraph(ordered);
fail("Should not be possible");
} catch (RuntimeException e) {
// expected
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
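
    /**
     * Tests that for an ALL_TO_ALL blocking edge, both produced partitions share a single {@link
     * ConsumedPartitionGroup} containing exactly those two partitions.
     */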
@Test
public void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
JobVertex v1 = new JobVertex("source");
JobVertex v2 = new JobVertex("sink");
v1.setParallelism(2);
v2.setParallelism(2);
v2.connectNewDataSetAsInput(
v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2));
ExecutionGraph eg = createDefaultExecutionGraph(ordered);
eg.attachJobGraph(ordered);
IntermediateResult result =
Objects.requireNonNull(eg.getJobVertex(v1.getID())).getProducedDataSets()[0];
IntermediateResultPartition partition1 = result.getPartitions()[0];
IntermediateResultPartition partition2 = result.getPartitions()[1];
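        // with ALL_TO_ALL distribution, both partitions must share the same ConsumedPartitionGroup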
assertEquals(
partition1.getConsumedPartitionGroups().get(0),
partition2.getConsumedPartitionGroups().get(0));
ConsumedPartitionGroup consumedPartitionGroup =
partition1.getConsumedPartitionGroups().get(0);
Set<IntermediateResultPartitionID> partitionIds = new HashSet<>();
for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
partitionIds.add(partitionId);
}
assertThat(
partitionIds,
containsInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId()));
}
}