/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

/**
* Tests in this class:
*
* <ul>
* <li>Tests that check the correct handling of the properties and strategies in the case where
* the parallelism between tasks is increased or decreased.
* </ul>
*/
@SuppressWarnings({"serial"})
public class ParallelismChangeTest extends CompilerTestBase {
/**
* Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all
* properties).
*
* <p>Increases the parallelism between the 1st reduce and the 2nd map, so the hash
* partitioning from the 1st reduce is not reusable. The optimizer is expected to re-establish
* the partitioning between the reduce and the map via hash, since a random repartitioning
* would be a full network transfer as well.
*/
@Test
public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
final int p = DEFAULT_PARALLELISM;
// construct the plan
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(p);
DataSet<Long> set1 = env.generateSequence(0, 1).setParallelism(p);
set1.map(new IdentityMapper<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Map1")
.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*")
.setParallelism(p * 2)
.name("Map2")
.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(p * 2)
.name("Reduce2")
.output(new DiscardingOutputFormat<Long>())
.setParallelism(p * 2)
.name("Sink");
Plan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);

// check the optimized plan:
// when reducer 1 distributes its data across the instances of map2, it needs to employ
// hash partitioning, because map2 has twice as many instances and key/value pairs with
// the same key must be processed by the same mapper and reducer
SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
Assert.assertEquals(
"Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
Assert.assertEquals(
"Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
}

/**
* Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all
* properties).
*
* <p>Increases the parallelism between the 2nd map and the 2nd reduce, so the hash
* partitioning from the 1st reduce is not reusable. The optimizer is expected to
* re-establish the partitioning between the map and the reduce via hash partitioning.
*/
@Test
public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
final int p = DEFAULT_PARALLELISM;
// construct the plan
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(p);
DataSet<Long> set1 = env.generateSequence(0, 1).setParallelism(p);
set1.map(new IdentityMapper<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Map1")
.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Map2")
.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(p * 2)
.name("Reduce2")
.output(new DiscardingOutputFormat<Long>())
.setParallelism(p * 2)
.name("Sink");
Plan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);

// check the optimized plan:
// map2 runs with the same parallelism as reduce1 and preserves all fields, so its input
// can simply be forwarded; the parallelism doubles between map2 and reduce2, so the hash
// partitioning must be re-established on the input of reduce2
SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
Assert.assertEquals(
"Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
Assert.assertEquals(
"Invalid ship strategy for an operator.",
ShipStrategyType.PARTITION_HASH,
reduceIn);
}

/**
* Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all
* properties).
*
* <p>Increases the parallelism between the 1st reduce and the 2nd map, such that more
* tasks run on the same instance. The optimizer is expected to re-establish the hash
* partitioning between the map and the reduce.
*/
@Test
public void checkPropertyHandlingWithIncreasingLocalParallelism() {
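// use twice the default parallelism so that additional subtasks are co-located on the
// same instances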
final int p = DEFAULT_PARALLELISM * 2;
// construct the plan
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(p);
DataSet<Long> set1 = env.generateSequence(0, 1).setParallelism(p);
set1.map(new IdentityMapper<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Map1")
.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*")
.setParallelism(p * 2)
.name("Map2")
.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(p * 2)
.name("Reduce2")
.output(new DiscardingOutputFormat<Long>())
.setParallelism(p * 2)
.name("Sink");
Plan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);

// check the optimized plan:
// the parallelism doubles between reduce1 and map2, so the hash partitioning from
// reduce1 cannot be reused; the optimizer may either rebalance randomly into map2 and
// hash partition into reduce2, or hash partition into map2 and forward into reduce2
SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
Assert.assertTrue(
"Invalid ship strategy for an operator.",
(ShipStrategyType.PARTITION_RANDOM == mapIn
&& ShipStrategyType.PARTITION_HASH == reduceIn)
|| (ShipStrategyType.PARTITION_HASH == mapIn
&& ShipStrategyType.FORWARD == reduceIn));
}
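
/**
* Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all
* properties).
*
* <p>Decreases the parallelism between the 1st reduce and the 2nd map, so the hash
* partitioning from the 1st reduce is not reusable. The optimizer is expected to
* re-establish the partitioning (and a sort) on the input of either the 2nd map or the
* 2nd reduce.
*/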
@Test
public void checkPropertyHandlingWithDecreasingParallelism() {
final int p = DEFAULT_PARALLELISM;
// construct the plan
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(p);
env.generateSequence(0, 1)
.setParallelism(p * 2)
.map(new IdentityMapper<Long>())
.withForwardedFields("*")
.setParallelism(p * 2)
.name("Map1")
.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(p * 2)
.name("Reduce1")
.map(new IdentityMapper<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Map2")
.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(p)
.name("Reduce2")
.output(new DiscardingOutputFormat<Long>())
.setParallelism(p)
.name("Sink");
Plan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);

// check the optimized plan:
// the parallelism is halved between reduce1 and map2, so the partitioning from reduce1
// cannot be reused; a hash partitioning (and a sort for the grouping) must be
// established on the input of either map2 or reduce2
SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
Assert.assertTrue(
"Expected a sort as the local strategy on one of the inputs.",
LocalStrategy.SORT == red2Node.getInput().getLocalStrategy()
|| LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
Assert.assertTrue(
"Expected hash partitioning as the ship strategy on one of the inputs.",
ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy()
|| ShipStrategyType.PARTITION_HASH
== map2Node.getInput().getShipStrategy());
}

/**
* Checks that re-partitioning happens when the inputs of a two-input contract have different
* parallelisms.
*
* <p>Test Plan:
*
* <pre>
*
* (source) -> reduce -\
* Match -> (sink)
* (source) -> reduce -/
*
* </pre>
*/
@Test
public void checkPropertyHandlingWithTwoInputs() {
// construct the plan
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
DataSet<Long> set1 = env.generateSequence(0, 1).setParallelism(5);
DataSet<Long> set2 = env.generateSequence(0, 1).setParallelism(7);
DataSet<Long> reduce1 =
set1.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(5);
DataSet<Long> reduce2 =
set2.groupBy("*")
.reduceGroup(new IdentityGroupReducer<Long>())
.withForwardedFields("*")
.setParallelism(7);
reduce1.join(reduce2)
.where("*")
.equalTo("*")
.with(new IdentityJoiner<Long>())
.setParallelism(5)
.output(new DiscardingOutputFormat<Long>())
.setParallelism(5);
Plan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);
JobGraphGenerator jobGen = new JobGraphGenerator();
// Compile plan to verify that no error is thrown
jobGen.compileJobGraph(oPlan);
oPlan.accept(
new Visitor<PlanNode>() {
@Override
public boolean preVisit(PlanNode visitable) {
if (visitable instanceof DualInputPlanNode) {
DualInputPlanNode node = (DualInputPlanNode) visitable;
Channel c1 = node.getInput1();
Channel c2 = node.getInput2();
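// input 1 (reduce1, parallelism 5) matches the join parallelism and is already
// hash partitioned on the key, so it can be forwarded; input 2 (reduce2,
// parallelism 7) must be re-partitioned to the join's parallelism via hash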
Assert.assertEquals(
"Incompatible shipping strategy chosen for match",
ShipStrategyType.FORWARD,
c1.getShipStrategy());
Assert.assertEquals(
"Incompatible shipping strategy chosen for match",
ShipStrategyType.PARTITION_HASH,
c2.getShipStrategy());
return false;
}
return true;
}
@Override
public void postVisit(PlanNode visitable) {
// DO NOTHING
}
});
}
}