blob: 3b6a66eabd13942f0872d7fe493f5d31f539e7e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.dag.DataSourceNode;
import org.apache.flink.optimizer.dag.JoinNode;
import org.apache.flink.optimizer.dag.MapNode;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.junit.Test;
import static org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
import static org.junit.Assert.*;
public class FeedbackPropertiesMatchTest {
@Test
public void testNoPartialSolutionFoundSingleInputOnly() {
try {
SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
SourcePlanNode otherTarget = new SourcePlanNode(getSourceNode(), "Source");
Channel toMap1 = new Channel(target);
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map1 =
new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
Channel toMap2 = new Channel(map1);
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map2 =
new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = new LocalProperties();
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(otherTarget, gp, lp);
assertTrue(report == NO_PARTIAL_SOLUTION);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testSingleInputOperators() {
try {
SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
Channel toMap1 = new Channel(target);
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map1 =
new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
Channel toMap2 = new Channel(map1);
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map2 =
new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
// no feedback properties and none are ever required and present
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = new LocalProperties();
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some global feedback properties and none are ever required and present
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 5));
LocalProperties lp = new LocalProperties();
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some local feedback properties and none are ever required and present
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some global and local feedback properties and none are ever required and present
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 5));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// --------------------------- requirements on channel 1 -----------------------
// some required global properties, which are matched exactly
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 5));
LocalProperties lp = new LocalProperties();
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setHashPartitioned(new FieldList(2, 5));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required local properties, which are matched exactly
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(1, 2));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required global and local properties, which are matched exactly
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 5));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setHashPartitioned(new FieldList(2, 5));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(1, 2));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required global and local properties, which are over-fulfilled
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setHashPartitioned(new FieldSet(2, 5));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(1));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required global properties that are not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 1));
LocalProperties lp = new LocalProperties();
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setHashPartitioned(new FieldList(2, 5));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some required local properties that are not met
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2, 1));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some required global and local properties where the global properties are not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 1));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(2, 5));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(1));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some required global and local properties where the local properties are not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(1));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// --------------------------- requirements on channel 2 -----------------------
// some required global properties, which are matched exactly
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 5));
LocalProperties lp = new LocalProperties();
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setHashPartitioned(new FieldList(2, 5));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required local properties, which are matched exactly
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(1, 2));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required global and local properties, which are matched exactly
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 5));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setHashPartitioned(new FieldList(2, 5));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(1, 2));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required global and local properties, which are over-fulfilled
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setHashPartitioned(new FieldSet(2, 5));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(1));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required global properties that are not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 1));
LocalProperties lp = new LocalProperties();
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setHashPartitioned(new FieldSet(2, 5));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some required local properties that are not met
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2, 1));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some required global and local properties where the global properties are not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(2, 1));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldSet(2, 5));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(1));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some required global and local properties where the local properties are not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(1));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// ---------------------- requirements mixed on 1 and 2 -----------------------
// some required global properties at step one and some more at step 2
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.EMPTY;
RequestedGlobalProperties reqGp1 = new RequestedGlobalProperties();
reqGp1.setAnyPartitioning(new FieldList(1, 2));
RequestedGlobalProperties reqGp2 = new RequestedGlobalProperties();
reqGp2.setHashPartitioned(new FieldList(1, 2));
toMap1.setRequiredGlobalProps(reqGp1);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(reqGp2);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required local properties at step one and some more at step 2
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp =
LocalProperties.forOrdering(
new Ordering(3, null, Order.ASCENDING)
.appendOrdering(1, null, Order.DESCENDING));
RequestedLocalProperties reqLp1 = new RequestedLocalProperties();
reqLp1.setGroupedFields(new FieldList(3, 1));
RequestedLocalProperties reqLp2 = new RequestedLocalProperties();
reqLp2.setOrdering(
new Ordering(3, null, Order.ANY).appendOrdering(1, null, Order.ANY));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(reqLp1);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(reqLp2);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required global properties at step one and some local ones at step 2
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(1, 2));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some required local properties at step one and some global ones at step 2
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(1, 2));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some fulfilled global properties at step one and some non-fulfilled local ones at
// step 2
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(1, 2));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2, 3));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some fulfilled local properties at step one and some non-fulfilled global ones at
// step 2
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(2, 3));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2, 1));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some non-fulfilled global properties at step one and some fulfilled local ones at
// step 2
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(2, 3));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2, 1));
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// some non-fulfilled local properties at step one and some fulfilled global ones at
// step 2
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldList(1, 2));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(2, 1, 3));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testSingleInputOperatorsWithReCreation() {
try {
SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
Channel toMap1 = new Channel(target);
SingleInputPlanNode map1 =
new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
Channel toMap2 = new Channel(map1);
SingleInputPlanNode map2 =
new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
// set ship strategy in first channel, so later non matching global properties do not
// matter
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.EMPTY;
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldSet(2, 5));
toMap1.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(2, 5),
DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(reqGp);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(MET, report);
}
// set ship strategy in second channel, so previous non matching global properties void
// the match
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp = LocalProperties.EMPTY;
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldSet(2, 5));
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
toMap2.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(2, 5),
DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// set local strategy in first channel, so later non matching local properties do not
// matter
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp =
LocalProperties.forOrdering(
new Ordering(3, null, Order.ASCENDING)
.appendOrdering(1, null, Order.DESCENDING));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(4, 1));
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(
LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// set local strategy in second channel, so previous non matching local properties void
// the match
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp =
LocalProperties.forOrdering(
new Ordering(3, null, Order.ASCENDING)
.appendOrdering(1, null, Order.DESCENDING));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(4, 1));
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(
LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// create the properties on the same node as the requirement
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(1, 2));
LocalProperties lp =
LocalProperties.forOrdering(
new Ordering(3, null, Order.ASCENDING)
.appendOrdering(1, null, Order.DESCENDING));
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldSet(5, 7));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(5, 7));
toMap1.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(5, 7),
DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(
LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
toMap1.setRequiredGlobalProps(reqGp);
toMap1.setRequiredLocalProps(reqLp);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map2.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(MET, report);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testSingleInputOperatorsChainOfThree() {
try {
SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
Channel toMap1 = new Channel(target);
SingleInputPlanNode map1 =
new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
Channel toMap2 = new Channel(map1);
SingleInputPlanNode map2 =
new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
Channel toMap3 = new Channel(map2);
SingleInputPlanNode map3 =
new SingleInputPlanNode(getMapNode(), "Mapper 3", toMap3, DriverStrategy.MAP);
// set local strategy in first channel, so later non matching local properties do not
// matter
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp =
LocalProperties.forOrdering(
new Ordering(3, null, Order.ASCENDING)
.appendOrdering(1, null, Order.DESCENDING));
RequestedLocalProperties reqLp = new RequestedLocalProperties();
reqLp.setGroupedFields(new FieldList(4, 1));
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(
LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap3.setLocalStrategy(LocalStrategy.NONE);
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
toMap3.setRequiredGlobalProps(null);
toMap3.setRequiredLocalProps(reqLp);
FeedbackPropertiesMeetRequirementsReport report =
map3.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// set global strategy in first channel, so later non matching global properties do not
// matter
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(5, 3));
LocalProperties lp = LocalProperties.EMPTY;
RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
reqGp.setAnyPartitioning(new FieldSet(2, 3));
toMap1.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(1, 2),
DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap3.setLocalStrategy(LocalStrategy.NONE);
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toMap2.setRequiredGlobalProps(null);
toMap2.setRequiredLocalProps(null);
toMap3.setRequiredGlobalProps(reqGp);
toMap3.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
map3.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testNoPartialSolutionFoundTwoInputOperator() {
try {
SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
SourcePlanNode source1 = new SourcePlanNode(getSourceNode(), "Source 1");
SourcePlanNode source2 = new SourcePlanNode(getSourceNode(), "Source 2");
Channel toMap1 = new Channel(source1);
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map1 =
new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
Channel toMap2 = new Channel(source2);
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map2 =
new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
Channel toJoin1 = new Channel(map1);
Channel toJoin2 = new Channel(map2);
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(LocalStrategy.NONE);
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin2.setLocalStrategy(LocalStrategy.NONE);
DualInputPlanNode join =
new DualInputPlanNode(
getJoinNode(),
"Join",
toJoin1,
toJoin2,
DriverStrategy.HYBRIDHASH_BUILD_FIRST);
FeedbackPropertiesMeetRequirementsReport report =
join.checkPartialSolutionPropertiesMet(
target, new GlobalProperties(), new LocalProperties());
assertEquals(NO_PARTIAL_SOLUTION, report);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testTwoOperatorsOneIndependent() {
try {
SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
SourcePlanNode source = new SourcePlanNode(getSourceNode(), "Other Source");
Channel toMap1 = new Channel(target);
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map1 =
new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
Channel toMap2 = new Channel(source);
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map2 =
new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
Channel toJoin1 = new Channel(map1);
Channel toJoin2 = new Channel(map2);
DualInputPlanNode join =
new DualInputPlanNode(
getJoinNode(),
"Join",
toJoin1,
toJoin2,
DriverStrategy.HYBRIDHASH_BUILD_FIRST);
Channel toAfterJoin = new Channel(join);
toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode afterJoin =
new SingleInputPlanNode(
getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
// attach some properties to the non-relevant input
{
toMap2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(
LocalStrategy.SORT, new FieldList(2, 7), new boolean[] {true, true});
RequestedGlobalProperties joinGp = new RequestedGlobalProperties();
joinGp.setFullyReplicated();
RequestedLocalProperties joinLp = new RequestedLocalProperties();
joinLp.setOrdering(
new Ordering(2, null, Order.ASCENDING)
.appendOrdering(7, null, Order.ASCENDING));
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin2.setLocalStrategy(LocalStrategy.NONE);
toJoin2.setRequiredGlobalProps(joinGp);
toJoin2.setRequiredLocalProps(joinLp);
}
// ------------------------------------------------------------------------------------
// no properties from the partial solution, no required properties
{
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(LocalStrategy.NONE);
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = LocalProperties.EMPTY;
FeedbackPropertiesMeetRequirementsReport report =
join.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some properties from the partial solution, no required properties
{
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(LocalStrategy.NONE);
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
FeedbackPropertiesMeetRequirementsReport report =
join.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// produced properties match relevant input
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(0));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(2));
toJoin1.setRequiredGlobalProps(rgp);
toJoin1.setRequiredLocalProps(rlp);
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(LocalStrategy.NONE);
FeedbackPropertiesMeetRequirementsReport report =
join.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// produced properties do not match relevant input
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(0));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(1, 2, 3));
toJoin1.setRequiredGlobalProps(rgp);
toJoin1.setRequiredLocalProps(rlp);
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(LocalStrategy.NONE);
FeedbackPropertiesMeetRequirementsReport report =
join.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// produced properties overridden before join
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(0));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(2, 1));
toMap1.setRequiredGlobalProps(rgp);
toMap1.setRequiredLocalProps(rlp);
toJoin1.setRequiredGlobalProps(null);
toJoin1.setRequiredLocalProps(null);
toJoin1.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(2, 1),
DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(
LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
FeedbackPropertiesMeetRequirementsReport report =
join.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(MET, report);
}
// produced properties before join match, after join match as well
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(0));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(2, 1));
toMap1.setRequiredGlobalProps(null);
toMap1.setRequiredLocalProps(null);
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(LocalStrategy.NONE);
toJoin1.setRequiredGlobalProps(rgp);
toJoin1.setRequiredLocalProps(rlp);
toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
toAfterJoin.setRequiredGlobalProps(rgp);
toAfterJoin.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
join.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// produced properties before join match, after join do not match
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
rgp1.setHashPartitioned(new FieldList(0));
RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
rgp2.setHashPartitioned(new FieldList(3));
RequestedLocalProperties rlp1 = new RequestedLocalProperties();
rlp1.setGroupedFields(new FieldList(2, 1));
RequestedLocalProperties rlp2 = new RequestedLocalProperties();
rlp2.setGroupedFields(new FieldList(3, 4));
toJoin1.setRequiredGlobalProps(rgp1);
toJoin1.setRequiredLocalProps(rlp1);
toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
toAfterJoin.setRequiredGlobalProps(rgp2);
toAfterJoin.setRequiredLocalProps(rlp2);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// produced properties are overridden, does not matter that they do not match
{
GlobalProperties gp = new GlobalProperties();
gp.setAnyPartitioning(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(1));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(1, 2, 3));
toJoin1.setRequiredGlobalProps(null);
toJoin1.setRequiredLocalProps(null);
toJoin1.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(2, 1),
DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(
LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
toAfterJoin.setRequiredGlobalProps(rgp);
toAfterJoin.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(MET, report);
}
// local property overridden before join, local property mismatch after join not
// relevant
{
GlobalProperties gp = new GlobalProperties();
gp.setAnyPartitioning(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(1, 2, 3));
toJoin1.setRequiredGlobalProps(null);
toJoin1.setRequiredLocalProps(null);
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(
LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
toAfterJoin.setRequiredGlobalProps(null);
toAfterJoin.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// local property overridden before join, global property mismatch after join void the
// match
{
GlobalProperties gp = new GlobalProperties();
gp.setAnyPartitioning(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(1));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(1, 2, 3));
toJoin1.setRequiredGlobalProps(null);
toJoin1.setRequiredLocalProps(null);
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(
LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
toAfterJoin.setRequiredGlobalProps(rgp);
toAfterJoin.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testTwoOperatorsBothDependent() {
try {
SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
Channel toMap1 = new Channel(target);
toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap1.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map1 =
new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
Channel toMap2 = new Channel(target);
toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toMap2.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode map2 =
new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
Channel toJoin1 = new Channel(map1);
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(LocalStrategy.NONE);
Channel toJoin2 = new Channel(map2);
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin2.setLocalStrategy(LocalStrategy.NONE);
DualInputPlanNode join =
new DualInputPlanNode(
getJoinNode(),
"Join",
toJoin1,
toJoin2,
DriverStrategy.HYBRIDHASH_BUILD_FIRST);
Channel toAfterJoin = new Channel(join);
toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
SingleInputPlanNode afterJoin =
new SingleInputPlanNode(
getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
// no properties from the partial solution, no required properties
{
GlobalProperties gp = new GlobalProperties();
LocalProperties lp = LocalProperties.EMPTY;
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// some properties from the partial solution, no required properties
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// test requirements on one input and met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(0));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(2, 1));
toJoin1.setRequiredGlobalProps(rgp);
toJoin1.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// test requirements on both input and met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(0));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(2, 1));
toJoin1.setRequiredGlobalProps(rgp);
toJoin1.setRequiredLocalProps(rlp);
toJoin2.setRequiredGlobalProps(rgp);
toJoin2.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
}
// test requirements on both inputs, one not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
rgp1.setHashPartitioned(new FieldList(0));
RequestedLocalProperties rlp1 = new RequestedLocalProperties();
rlp1.setGroupedFields(new FieldList(2, 1));
RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
rgp2.setHashPartitioned(new FieldList(1));
RequestedLocalProperties rlp2 = new RequestedLocalProperties();
rlp2.setGroupedFields(new FieldList(0, 3));
toJoin1.setRequiredGlobalProps(rgp1);
toJoin1.setRequiredLocalProps(rlp1);
toJoin2.setRequiredGlobalProps(rgp2);
toJoin2.setRequiredLocalProps(rlp2);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// test override on both inputs, later requirement ignored
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(1));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(0, 3));
toJoin1.setRequiredGlobalProps(null);
toJoin1.setRequiredLocalProps(null);
toJoin2.setRequiredGlobalProps(null);
toJoin2.setRequiredLocalProps(null);
toJoin1.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(88),
DataExchangeMode.PIPELINED);
toJoin2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
toAfterJoin.setRequiredGlobalProps(rgp);
toAfterJoin.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(MET, report);
}
// test override on one inputs, later requirement met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(0));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(2, 1));
toJoin1.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(88),
DataExchangeMode.PIPELINED);
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toAfterJoin.setRequiredGlobalProps(rgp);
toAfterJoin.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(PENDING, report);
}
// test override on one input, later requirement not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(3));
RequestedLocalProperties rlp = new RequestedLocalProperties();
rlp.setGroupedFields(new FieldList(77, 69));
toJoin1.setShipStrategy(
ShipStrategyType.PARTITION_HASH,
new FieldList(88),
DataExchangeMode.PIPELINED);
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toAfterJoin.setRequiredGlobalProps(rgp);
toAfterJoin.setRequiredLocalProps(rlp);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
// test override on one input locally, later global requirement not met
{
GlobalProperties gp = new GlobalProperties();
gp.setHashPartitioned(new FieldList(0));
LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
RequestedGlobalProperties rgp = new RequestedGlobalProperties();
rgp.setHashPartitioned(new FieldList(3));
toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(
LocalStrategy.SORT, new FieldList(3), new boolean[] {false});
toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
toJoin1.setLocalStrategy(LocalStrategy.NONE);
toAfterJoin.setRequiredGlobalProps(rgp);
toAfterJoin.setRequiredLocalProps(null);
FeedbackPropertiesMeetRequirementsReport report =
afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
assertEquals(NOT_MET, report);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
// --------------------------------------------------------------------------------------------
private static DataSourceNode getSourceNode() {
return new DataSourceNode(
new GenericDataSourceBase<String, TextInputFormat>(
new TextInputFormat(new Path("/")),
new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
}
private static MapNode getMapNode() {
return new MapNode(
new MapOperatorBase<String, String, MapFunction<String, String>>(
new IdentityMapper<String>(),
new UnaryOperatorInformation<String, String>(
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
"map op"));
}
private static JoinNode getJoinNode() {
return new JoinNode(
new InnerJoinOperatorBase<
String, String, String, FlatJoinFunction<String, String, String>>(
new DummyFlatJoinFunction<String>(),
new BinaryOperatorInformation<String, String, String>(
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO),
new int[] {1},
new int[] {2},
"join op"));
}
}