blob: 8bbd92b0a7a75a3431a190532d902c76d49263ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* This is a minimal test to verify that semantic annotations are evaluated against
* the type information properly translated correctly to the common data flow API.
*/
@SuppressWarnings("serial")
public class SemanticPropertiesTranslationTest {
@Test
public void testUnaryFunctionWildcardForwardedAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
input.map(new WildcardForwardedMapper<Tuple3<Long, String, Integer>>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertNotNull(fw3);
assertTrue(fw1.contains(0));
assertTrue(fw2.contains(1));
assertTrue(fw3.contains(2));
}
@Test
public void testUnaryFunctionInPlaceForwardedAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3L, "test", 42));
input.map(new IndividualForwardedMapper<Long, String, Integer>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(0));
assertTrue(fw2.contains(2));
}
@Test
public void testUnaryFunctionMovingForwardedAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new ShufflingMapper<Long>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 1);
FieldSet fw3 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertNotNull(fw3);
assertTrue(fw1.contains(2));
assertTrue(fw2.contains(0));
assertTrue(fw3.contains(1));
}
@Test
public void testUnaryFunctionForwardedInLine1() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new NoAnnotationMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(1));
assertTrue(fw2.contains(2));
}
@Test
public void testUnaryFunctionForwardedInLine2() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(1));
assertTrue(fw2.contains(2));
}
@Test
public void testUnaryFunctionForwardedInLine3() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(1));
assertTrue(fw2.contains(2));
}
@Test
public void testUnaryFunctionAllForwardedExceptAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet fw1 = semantics.getForwardingTargetFields(0, 0);
FieldSet fw2 = semantics.getForwardingTargetFields(0, 2);
assertNotNull(fw1);
assertNotNull(fw2);
assertTrue(fw1.contains(0));
assertTrue(fw2.contains(2));
}
@Test
public void testUnaryFunctionReadFieldsAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput();
SingleInputSemanticProperties semantics = mapper.getSemanticProperties();
FieldSet read = semantics.getReadFields(0);
assertNotNull(read);
assertEquals(2, read.size());
assertTrue(read.contains(0));
assertTrue(read.contains(2));
}
@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
public void testUnaryForwardedOverwritingInLine1() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new WildcardForwardedMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2");
}
@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
public void testUnaryForwardedOverwritingInLine2() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2");
}
@Test
public void testBinaryForwardedAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, String>> input1 = env.fromElements(new Tuple2<Long, String>(3L, "test"));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Double>> input2 = env.fromElements(new Tuple2<Long, Double>(3L, 3.1415));
input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin<Long, String, Long, Double>())
.output(new DiscardingOutputFormat<Tuple2<String, Double>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(0, 0));
assertNotNull(semantics.getForwardingTargetFields(1, 0));
assertEquals(1, semantics.getForwardingTargetFields(0, 1).size());
assertEquals(1, semantics.getForwardingTargetFields(1, 1).size());
assertTrue(semantics.getForwardingTargetFields(0, 1).contains(0));
assertTrue(semantics.getForwardingTargetFields(1, 1).contains(1));
assertEquals(0, semantics.getForwardingTargetFields(0, 0).size());
assertEquals(0, semantics.getForwardingTargetFields(1, 0).size());
}
@Test
public void testBinaryForwardedInLine1() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new NoAnnotationJoin<Long>())
.withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(1, 0));
assertEquals(1, semantics.getForwardingTargetFields(0, 0).size());
assertEquals(1, semantics.getForwardingTargetFields(0, 1).size());
assertEquals(1, semantics.getForwardingTargetFields(1, 1).size());
assertTrue(semantics.getForwardingTargetFields(0, 0).contains(1));
assertTrue(semantics.getForwardingTargetFields(0, 1).contains(2));
assertTrue(semantics.getForwardingTargetFields(1, 1).contains(0));
assertEquals(0, semantics.getForwardingTargetFields(1, 0).size());
}
@Test
public void testBinaryForwardedInLine2() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin<Long>())
.withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(1, 0));
assertEquals(1, semantics.getForwardingTargetFields(0, 0).size());
assertEquals(1, semantics.getForwardingTargetFields(0, 1).size());
assertEquals(1, semantics.getForwardingTargetFields(1, 1).size());
assertTrue(semantics.getForwardingTargetFields(0, 0).contains(1));
assertTrue(semantics.getForwardingTargetFields(0, 1).contains(2));
assertTrue(semantics.getForwardingTargetFields(1, 1).contains(0));
assertNotNull(semantics.getReadFields(0));
assertNotNull(semantics.getReadFields(1));
assertEquals(1, semantics.getReadFields(0).size());
assertEquals(1, semantics.getReadFields(1).size());
assertTrue(semantics.getReadFields(0).contains(1));
assertTrue(semantics.getReadFields(1).contains(0));
assertEquals(0, semantics.getForwardingTargetFields(1, 0).size());
}
@Test
public void testBinaryForwardedAnnotationInLineMixed1() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new ForwardedFirstAnnotationJoin<Long>())
.withForwardedFieldsSecond("1")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(0, 1));
assertNotNull(semantics.getForwardingTargetFields(1, 0));
assertNotNull(semantics.getForwardingTargetFields(0, 0));
assertNotNull(semantics.getForwardingTargetFields(1, 1));
assertEquals(1, semantics.getForwardingTargetFields(0, 0).size());
assertEquals(1, semantics.getForwardingTargetFields(1, 1).size());
assertTrue(semantics.getForwardingTargetFields(0, 0).contains(2));
assertTrue(semantics.getForwardingTargetFields(1, 1).contains(1));
assertEquals(0, semantics.getForwardingTargetFields(0, 1).size());
assertEquals(0, semantics.getForwardingTargetFields(1, 0).size());
}
@Test
public void testBinaryForwardedAnnotationInLineMixed2() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new ForwardedSecondAnnotationJoin<Long>())
.withForwardedFieldsFirst("0->1")
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(0, 1));
assertNotNull(semantics.getForwardingTargetFields(1, 0));
assertNotNull(semantics.getForwardingTargetFields(0, 0));
assertNotNull(semantics.getForwardingTargetFields(1, 1));
assertEquals(1, semantics.getForwardingTargetFields(0, 0).size());
assertEquals(1, semantics.getForwardingTargetFields(1, 1).size());
assertTrue(semantics.getForwardingTargetFields(0, 0).contains(1));
assertTrue(semantics.getForwardingTargetFields(1, 1).contains(2));
assertEquals(0, semantics.getForwardingTargetFields(0, 1).size());
assertEquals(0, semantics.getForwardingTargetFields(1, 0).size());
}
@Test
public void testBinaryAllForwardedExceptAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new Tuple3<Long, Long, Long>(3L, 4L, 5L));
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin<Long>())
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(0, 0));
assertNotNull(semantics.getForwardingTargetFields(0, 2));
assertNotNull(semantics.getForwardingTargetFields(1, 0));
assertNotNull(semantics.getForwardingTargetFields(1, 1));
assertEquals(1, semantics.getForwardingTargetFields(0, 1).size());
assertEquals(1, semantics.getForwardingTargetFields(1, 2).size());
assertTrue(semantics.getForwardingTargetFields(0, 1).contains(1));
assertTrue(semantics.getForwardingTargetFields(1, 2).contains(2));
assertEquals(0, semantics.getForwardingTargetFields(0, 0).size());
assertEquals(0, semantics.getForwardingTargetFields(0, 2).size());
assertEquals(0, semantics.getForwardingTargetFields(1, 0).size());
assertEquals(0, semantics.getForwardingTargetFields(1, 1).size());
}
@Test
public void testBinaryReadFieldsAnnotation() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin<Long>())
.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getReadFields(0));
assertNotNull(semantics.getReadFields(1));
assertEquals(1, semantics.getReadFields(0).size());
assertEquals(1, semantics.getReadFields(1).size());
assertTrue(semantics.getReadFields(0).contains(1));
assertTrue(semantics.getReadFields(1).contains(0));
}
@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
public void testBinaryForwardedOverwritingInLine1() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new ForwardedFirstAnnotationJoin<Long>())
.withForwardedFieldsFirst("0->1");
}
@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
public void testBinaryForwardedOverwritingInLine2() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new ForwardedSecondAnnotationJoin<Long>())
.withForwardedFieldsSecond("0->1");
}
@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
public void testBinaryForwardedOverwritingInLine3() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin<Long, Long, Long, Long>())
.withForwardedFieldsFirst("0->1;");
}
@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
public void testBinaryForwardedOverwritingInLine4() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3L, 4L));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3L, 2L));
input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin<Long, Long, Long, Long>())
.withForwardedFieldsSecond("0->1;");
}
@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
public void testBinaryForwardedOverwritingInLine5() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new Tuple3<Long, Long, Long>(3L, 4L, 5L));
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin<Long>())
.withForwardedFieldsFirst("0->1;");
}
@Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class)
public void testBinaryForwardedOverwritingInLine6() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new Tuple3<Long, Long, Long>(3L, 4L, 5L));
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(3L, 2L, 1L));
input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin<Long>())
.withForwardedFieldsSecond("0->1;");
}
// --------------------------------------------------------------------------------------------
private static class NoAnnotationMapper<T> implements MapFunction<T, T> {
@Override
public T map(T value) {
return value;
}
}
@ForwardedFields("*")
private static class WildcardForwardedMapper<T> implements MapFunction<T, T> {
@Override
public T map(T value) {
return value;
}
}
@ForwardedFields("0;2")
private static class IndividualForwardedMapper<X, Y, Z> implements MapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
@Override
public Tuple3<X, Y, Z> map(Tuple3<X, Y, Z> value) {
return value;
}
}
@ForwardedFields("0->2;1->0;2->1")
private static class ShufflingMapper<X> implements MapFunction<Tuple3<X, X, X>, Tuple3<X, X, X>> {
@Override
public Tuple3<X, X, X> map(Tuple3<X, X, X> value) {
return value;
}
}
@FunctionAnnotation.NonForwardedFields({"1"})
private static class AllForwardedExceptMapper<T> implements MapFunction<T, T> {
@Override
public T map(T value) {
return value;
}
}
@FunctionAnnotation.ReadFields({"0;2"})
private static class ReadSetMapper<T> implements MapFunction<T, T> {
@Override
public T map(T value) {
return value;
}
}
private static class NoAnnotationJoin<X> implements JoinFunction<Tuple2<X, X>, Tuple2<X, X>, Tuple3<X, X, X>> {
@Override
public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> second) throws Exception {
return null;
}
}
@ForwardedFieldsFirst("0->2")
private static class ForwardedFirstAnnotationJoin<X> implements JoinFunction<Tuple2<X, X>, Tuple2<X, X>, Tuple3<X, X, X>> {
@Override
public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> second) throws Exception {
return null;
}
}
@ForwardedFieldsSecond("1->2")
private static class ForwardedSecondAnnotationJoin<X> implements JoinFunction<Tuple2<X, X>, Tuple2<X, X>, Tuple3<X, X, X>> {
@Override
public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> second) throws Exception {
return null;
}
}
@ForwardedFieldsFirst("1 -> 0")
@ForwardedFieldsSecond("1 -> 1")
private static class ForwardedBothAnnotationJoin<A, B, C, D> implements JoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
@Override
public Tuple2<B, D> join(Tuple2<A, B> first, Tuple2<C, D> second) {
return new Tuple2<B, D>(first.f1, second.f1);
}
}
@FunctionAnnotation.NonForwardedFieldsFirst("0;2")
@FunctionAnnotation.NonForwardedFieldsSecond("0;1")
private static class AllForwardedExceptJoin<X> implements JoinFunction<Tuple3<X, X, X>, Tuple3<X, X, X>, Tuple3<X, X, X>> {
@Override
public Tuple3<X, X, X> join(Tuple3<X, X, X> first, Tuple3<X, X, X> second) throws Exception {
return null;
}
}
@FunctionAnnotation.ReadFieldsFirst("1")
@FunctionAnnotation.ReadFieldsSecond("0")
private static class ReadSetJoin<X> implements JoinFunction<Tuple2<X, X>, Tuple2<X, X>, Tuple3<X, X, X>> {
@Override
public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> second) throws Exception {
return null;
}
}
}