// blob: e0f4d6e2fad6ab765630af22cbf0274558d84a6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.executiongraph;
import org.junit.jupiter.api.Test;
import static org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll;
import static org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils.computeVertexInputInfoForPointwise;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Tests for {@link VertexInputInfoComputationUtils}.
 *
 * <p>Covers the consumed-subpartition-range computation for individual consumers as well as the
 * all-to-all and pointwise vertex input info computations, each exercised with the dynamic-graph
 * flag both set and unset.
 */
class VertexInputInfoComputationUtilsTest {

    @Test
    void testComputeConsumedSubpartitionRange3to1() {
        // One consumer, three subpartitions: the lone consumer is expected to get [0, 2].
        assertThat(computeConsumedSubpartitionRange(0, 1, 3)).isEqualTo(new IndexRange(0, 2));
    }

    @Test
    void testComputeConsumedSubpartitionRange3to2() {
        // Three subpartitions over two consumers: the expected split is [0, 0] and [1, 2].
        assertThat(computeConsumedSubpartitionRange(0, 2, 3)).isEqualTo(new IndexRange(0, 0));
        assertThat(computeConsumedSubpartitionRange(1, 2, 3)).isEqualTo(new IndexRange(1, 2));
    }

    @Test
    void testComputeConsumedSubpartitionRange6to4() {
        // Expected range per consumer index when six subpartitions go to four consumers.
        final IndexRange[] expectedRanges = {
            new IndexRange(0, 0), new IndexRange(1, 2), new IndexRange(3, 3), new IndexRange(4, 5)
        };

        for (int consumerIndex = 0; consumerIndex < expectedRanges.length; consumerIndex++) {
            assertThat(computeConsumedSubpartitionRange(consumerIndex, 4, 6))
                    .isEqualTo(expectedRanges[consumerIndex]);
        }
    }

    @Test
    void testComputeBroadcastConsumedSubpartitionRange() {
        // Dynamic graph, broadcast, a single subpartition: every consumer is expected to be
        // assigned that one subpartition.
        final int numConsumers = 3;
        final int numSubpartitions = 1;

        for (int consumerIndex = 0; consumerIndex < numConsumers; consumerIndex++) {
            assertThat(
                            computeConsumedSubpartitionRange(
                                    consumerIndex, numConsumers, numSubpartitions, true, true))
                    .isEqualTo(new IndexRange(0, 0));
        }
    }

    @Test
    void testComputeConsumedSubpartitionRangeForNonDynamicGraph() {
        // The subpartition count is passed as -1 here; consumer i is expected to get exactly
        // the range [i, i].
        final int numConsumers = 3;

        for (int consumerIndex = 0; consumerIndex < numConsumers; consumerIndex++) {
            assertThat(
                            computeConsumedSubpartitionRange(
                                    consumerIndex, numConsumers, -1, false, false))
                    .isEqualTo(new IndexRange(consumerIndex, consumerIndex));
        }
    }

    @Test
    void testComputeVertexInputInfoForAllToAllWithNonDynamicGraph() {
        // With the dynamic-graph flag unset, the same result is expected whether or not the
        // broadcast flag is set.
        final ExecutionVertexInputInfo[] expectedInfos = {
            new ExecutionVertexInputInfo(0, new IndexRange(0, 1), new IndexRange(0, 0)),
            new ExecutionVertexInputInfo(1, new IndexRange(0, 1), new IndexRange(1, 1)),
            new ExecutionVertexInputInfo(2, new IndexRange(0, 1), new IndexRange(2, 2))
        };

        // Non-broadcast first, then broadcast.
        for (boolean isBroadcast : new boolean[] {false, true}) {
            final JobVertexInputInfo inputInfo =
                    computeVertexInputInfoForAllToAll(2, 3, ignored -> 3, false, isBroadcast);

            assertThat(inputInfo.getExecutionVertexInputInfos())
                    .containsExactlyInAnyOrder(expectedInfos);
        }
    }

    @Test
    void testComputeVertexInputInfoForAllToAllWithDynamicGraph() {
        // Non-broadcast: ten subpartitions are expected to be split 3 / 3 / 4 over the three
        // entries.
        final JobVertexInputInfo withoutBroadcast =
                computeVertexInputInfoForAllToAll(2, 3, ignored -> 10, true, false);
        final ExecutionVertexInputInfo[] expectedWithoutBroadcast = {
            new ExecutionVertexInputInfo(0, new IndexRange(0, 1), new IndexRange(0, 2)),
            new ExecutionVertexInputInfo(1, new IndexRange(0, 1), new IndexRange(3, 5)),
            new ExecutionVertexInputInfo(2, new IndexRange(0, 1), new IndexRange(6, 9))
        };
        assertThat(withoutBroadcast.getExecutionVertexInputInfos())
                .containsExactlyInAnyOrder(expectedWithoutBroadcast);

        // Broadcast with a single subpartition: all three entries are expected to carry [0, 0].
        final JobVertexInputInfo withBroadcast =
                computeVertexInputInfoForAllToAll(2, 3, ignored -> 1, true, true);
        final ExecutionVertexInputInfo[] expectedWithBroadcast = {
            new ExecutionVertexInputInfo(0, new IndexRange(0, 1), new IndexRange(0, 0)),
            new ExecutionVertexInputInfo(1, new IndexRange(0, 1), new IndexRange(0, 0)),
            new ExecutionVertexInputInfo(2, new IndexRange(0, 1), new IndexRange(0, 0))
        };
        assertThat(withBroadcast.getExecutionVertexInputInfos())
                .containsExactlyInAnyOrder(expectedWithBroadcast);
    }

    @Test
    void testComputeVertexInputInfoForPointwiseWithNonDynamicGraph() {
        final ExecutionVertexInputInfo[] expectedInfos = {
            new ExecutionVertexInputInfo(0, new IndexRange(0, 0), new IndexRange(0, 0)),
            new ExecutionVertexInputInfo(1, new IndexRange(0, 0), new IndexRange(1, 1)),
            new ExecutionVertexInputInfo(2, new IndexRange(1, 1), new IndexRange(0, 0))
        };

        final JobVertexInputInfo pointwiseInfo =
                computeVertexInputInfoForPointwise(2, 3, ignored -> 3, false);

        assertThat(pointwiseInfo.getExecutionVertexInputInfos())
                .containsExactlyInAnyOrder(expectedInfos);
    }

    @Test
    void testComputeVertexInputInfoForPointwiseWithDynamicGraph() {
        final ExecutionVertexInputInfo[] expectedInfos = {
            new ExecutionVertexInputInfo(0, new IndexRange(0, 0), new IndexRange(0, 1)),
            new ExecutionVertexInputInfo(1, new IndexRange(0, 0), new IndexRange(2, 3)),
            new ExecutionVertexInputInfo(2, new IndexRange(1, 1), new IndexRange(0, 3))
        };

        final JobVertexInputInfo pointwiseInfo =
                computeVertexInputInfoForPointwise(2, 3, ignored -> 4, true);

        assertThat(pointwiseInfo.getExecutionVertexInputInfos())
                .containsExactlyInAnyOrder(expectedInfos);
    }

    /**
     * Computes the consumed subpartition range with the dynamic-graph flag set and the broadcast
     * flag unset.
     *
     * @param consumerIndex index of the consumer to compute the range for
     * @param numConsumers total number of consumers
     * @param numSubpartitions number of subpartitions reported to the computation
     * @return the range returned by {@link VertexInputInfoComputationUtils}
     */
    private static IndexRange computeConsumedSubpartitionRange(
            int consumerIndex, int numConsumers, int numSubpartitions) {
        final boolean isDynamicGraph = true;
        final boolean isBroadcast = false;
        return computeConsumedSubpartitionRange(
                consumerIndex, numConsumers, numSubpartitions, isDynamicGraph, isBroadcast);
    }

    /**
     * Thin adapter around {@link VertexInputInfoComputationUtils#computeConsumedSubpartitionRange}
     * that supplies a fixed subpartition count through a supplier.
     *
     * @param consumerIndex index of the consumer to compute the range for
     * @param numConsumers total number of consumers
     * @param numSubpartitions value the supplier handed to the utility will return
     * @param isDynamicGraph forwarded unchanged to the utility
     * @param isBroadcast forwarded unchanged to the utility
     * @return the range returned by the utility
     */
    private static IndexRange computeConsumedSubpartitionRange(
            int consumerIndex,
            int numConsumers,
            int numSubpartitions,
            boolean isDynamicGraph,
            boolean isBroadcast) {
        return VertexInputInfoComputationUtils.computeConsumedSubpartitionRange(
                consumerIndex, numConsumers, () -> numSubpartitions, isDynamicGraph, isBroadcast);
    }
}