blob: d848186463016e875384a136eb15d3d85d6b619f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.dataproperties;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.junit.Test;
import static org.junit.Assert.*;
public class RequestedGlobalPropertiesFilteringTest {
private TupleTypeInfo<
Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>
tupleInfo =
new TupleTypeInfo<
Tuple8<
Integer,
Integer,
Integer,
Integer,
Integer,
Integer,
Integer,
Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO);
@Test(expected = NullPointerException.class)
public void testNullProps() {
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setAnyPartitioning(new FieldSet(0, 1, 2));
rgProps.filterBySemanticProperties(null, 0);
}
@Test
public void testEraseAll1() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setAnyPartitioning(new FieldSet(0, 1, 2));
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testEraseAll2() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"3;4"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setAnyPartitioning(new FieldSet(0, 1, 2));
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testHashPartitioningPreserved1() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"0;3;4"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNotNull(filtered);
assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
assertNotNull(filtered.getPartitionedFields());
assertEquals(3, filtered.getPartitionedFields().size());
assertTrue(filtered.getPartitionedFields().contains(0));
assertTrue(filtered.getPartitionedFields().contains(3));
assertTrue(filtered.getPartitionedFields().contains(4));
assertNull(filtered.getDataDistribution());
assertNull(filtered.getCustomPartitioner());
assertNull(filtered.getOrdering());
}
@Test
public void testHashPartitioningPreserved2() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNotNull(filtered);
assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
assertNotNull(filtered.getPartitionedFields());
assertEquals(3, filtered.getPartitionedFields().size());
assertTrue(filtered.getPartitionedFields().contains(1));
assertTrue(filtered.getPartitionedFields().contains(2));
assertTrue(filtered.getPartitionedFields().contains(7));
assertNull(filtered.getDataDistribution());
assertNull(filtered.getCustomPartitioner());
assertNull(filtered.getOrdering());
}
@Test
public void testHashPartitioningErased() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"1;2"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testAnyPartitioningPreserved1() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"0;3;4"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNotNull(filtered);
assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
assertNotNull(filtered.getPartitionedFields());
assertEquals(3, filtered.getPartitionedFields().size());
assertTrue(filtered.getPartitionedFields().contains(0));
assertTrue(filtered.getPartitionedFields().contains(3));
assertTrue(filtered.getPartitionedFields().contains(4));
assertNull(filtered.getDataDistribution());
assertNull(filtered.getCustomPartitioner());
assertNull(filtered.getOrdering());
}
@Test
public void testAnyPartitioningPreserved2() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNotNull(filtered);
assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
assertNotNull(filtered.getPartitionedFields());
assertEquals(3, filtered.getPartitionedFields().size());
assertTrue(filtered.getPartitionedFields().contains(1));
assertTrue(filtered.getPartitionedFields().contains(2));
assertTrue(filtered.getPartitionedFields().contains(7));
assertNull(filtered.getDataDistribution());
assertNull(filtered.getCustomPartitioner());
assertNull(filtered.getOrdering());
}
@Test
public void testAnyPartitioningErased() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"1;2"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testRangePartitioningPreserved1() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"1;3;6"}, null, null, tupleInfo, tupleInfo);
Ordering o = new Ordering();
o.appendOrdering(3, LongValue.class, Order.DESCENDING);
o.appendOrdering(1, IntValue.class, Order.ASCENDING);
o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setRangePartitioned(o);
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNotNull(filtered);
assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
assertNotNull(filtered.getOrdering());
assertEquals(3, filtered.getOrdering().getNumberOfFields());
assertEquals(3, filtered.getOrdering().getFieldNumber(0).intValue());
assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
assertEquals(LongValue.class, filtered.getOrdering().getType(0));
assertEquals(IntValue.class, filtered.getOrdering().getType(1));
assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
assertNull(filtered.getPartitionedFields());
assertNull(filtered.getDataDistribution());
assertNull(filtered.getCustomPartitioner());
}
@Test
public void testRangePartitioningPreserved2() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
Ordering o = new Ordering();
o.appendOrdering(3, LongValue.class, Order.DESCENDING);
o.appendOrdering(1, IntValue.class, Order.ASCENDING);
o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setRangePartitioned(o);
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNotNull(filtered);
assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
assertNotNull(filtered.getOrdering());
assertEquals(3, filtered.getOrdering().getNumberOfFields());
assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
assertEquals(LongValue.class, filtered.getOrdering().getType(0));
assertEquals(IntValue.class, filtered.getOrdering().getType(1));
assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
assertNull(filtered.getPartitionedFields());
assertNull(filtered.getDataDistribution());
assertNull(filtered.getCustomPartitioner());
}
@Test
public void testRangePartitioningPreserved3() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
DataDistribution dd = new MockDistribution();
Ordering o = new Ordering();
o.appendOrdering(3, LongValue.class, Order.DESCENDING);
o.appendOrdering(1, IntValue.class, Order.ASCENDING);
o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setRangePartitioned(o, dd);
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNotNull(filtered);
assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
assertNotNull(filtered.getOrdering());
assertEquals(3, filtered.getOrdering().getNumberOfFields());
assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
assertEquals(LongValue.class, filtered.getOrdering().getType(0));
assertEquals(IntValue.class, filtered.getOrdering().getType(1));
assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
assertNotNull(filtered.getDataDistribution());
assertEquals(dd, filtered.getDataDistribution());
assertNull(filtered.getPartitionedFields());
assertNull(filtered.getCustomPartitioner());
}
@Test
public void testRangePartitioningErased() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"1;2"}, null, null, tupleInfo, tupleInfo);
Ordering o = new Ordering();
o.appendOrdering(3, LongValue.class, Order.DESCENDING);
o.appendOrdering(1, IntValue.class, Order.ASCENDING);
o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setRangePartitioned(o);
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testCustomPartitioningErased() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"0;1;2"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setCustomPartitioned(new FieldSet(0, 1, 2), new MockPartitioner());
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testRandomDistributionErased() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"0;1;2"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setRandomPartitioning();
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testReplicationErased() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"0;1;2"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setFullyReplicated();
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testRebalancingErased() {
SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sProp, new String[] {"0;1;2"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
rgProps.setForceRebalancing();
RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
assertNull(filtered);
}
@Test
public void testDualHashPartitioningPreserved() {
DualInputSemanticProperties dprops = new DualInputSemanticProperties();
SemanticPropUtil.getSemanticPropsDualFromString(
dprops,
new String[] {"0;2;4"},
new String[] {"1->3;4->6;3->7"},
null,
null,
null,
null,
tupleInfo,
tupleInfo,
tupleInfo);
RequestedGlobalProperties gprops1 = new RequestedGlobalProperties();
RequestedGlobalProperties gprops2 = new RequestedGlobalProperties();
gprops1.setHashPartitioned(new FieldSet(2, 0, 4));
gprops2.setHashPartitioned(new FieldSet(3, 6, 7));
RequestedGlobalProperties filtered1 = gprops1.filterBySemanticProperties(dprops, 0);
RequestedGlobalProperties filtered2 = gprops2.filterBySemanticProperties(dprops, 1);
assertNotNull(filtered1);
assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered1.getPartitioning());
assertNotNull(filtered1.getPartitionedFields());
assertEquals(3, filtered1.getPartitionedFields().size());
assertTrue(filtered1.getPartitionedFields().contains(0));
assertTrue(filtered1.getPartitionedFields().contains(2));
assertTrue(filtered1.getPartitionedFields().contains(4));
assertNull(filtered1.getOrdering());
assertNull(filtered1.getCustomPartitioner());
assertNull(filtered1.getDataDistribution());
assertNotNull(filtered2);
assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered2.getPartitioning());
assertNotNull(filtered2.getPartitionedFields());
assertEquals(3, filtered2.getPartitionedFields().size());
assertTrue(filtered2.getPartitionedFields().contains(1));
assertTrue(filtered2.getPartitionedFields().contains(3));
assertTrue(filtered2.getPartitionedFields().contains(4));
assertNull(filtered2.getOrdering());
assertNull(filtered2.getCustomPartitioner());
assertNull(filtered2.getDataDistribution());
}
@Test(expected = IndexOutOfBoundsException.class)
public void testInvalidInputIndex() {
SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
SemanticPropUtil.getSemanticPropsSingleFromString(
sprops, new String[] {"0;1"}, null, null, tupleInfo, tupleInfo);
RequestedGlobalProperties gprops = new RequestedGlobalProperties();
gprops.setHashPartitioned(new FieldSet(0, 1));
gprops.filterBySemanticProperties(sprops, 1);
}
}