blob: 5b42d177c5d972bd598a7cbe33543c07feb48d4c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window.accumulation;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.apache.apex.malhar.lib.util.KeyValPair;
import com.google.common.collect.Multimap;
/**
* Test for POJO outer join accumulations
*/
public class PojoOuterJoinTest
{
public static class TestPojo1
{
private int uId;
private String uName;
public TestPojo1()
{
}
public TestPojo1(int id, String name)
{
this.uId = id;
this.uName = name;
}
public int getUId()
{
return uId;
}
public void setUId(int uId)
{
this.uId = uId;
}
public String getUName()
{
return uName;
}
public void setUName(String uName)
{
this.uName = uName;
}
}
public static class TestPojo3
{
private int uId;
private String uNickName;
private int age;
public TestPojo3()
{
}
public TestPojo3(int id, String name, int age)
{
this.uId = id;
this.uNickName = name;
this.age = age;
}
public int getUId()
{
return uId;
}
public void setUId(int uId)
{
this.uId = uId;
}
public String getUNickName()
{
return uNickName;
}
public void setUNickName(String uNickName)
{
this.uNickName = uNickName;
}
public int getAge()
{
return age;
}
public void setAge(int age)
{
this.age = age;
}
}
public static class TestOutClass
{
private int uId;
private String uName;
private String uNickName;
private int age;
public int getUId()
{
return uId;
}
public void setUId(int uId)
{
this.uId = uId;
}
public String getUName()
{
return uName;
}
public void setUName(String uName)
{
this.uName = uName;
}
public String getUNickName()
{
return uNickName;
}
public void setUNickName(String uNickName)
{
this.uNickName = uNickName;
}
public int getAge()
{
return age;
}
public void setAge(int age)
{
this.age = age;
}
}
public static class TestOutMultipleKeysClass
{
private int uId;
private String uName;
private int age;
public int getUId()
{
return uId;
}
public void setUId(int uId)
{
this.uId = uId;
}
public String getUName()
{
return uName;
}
public void setUName(String uName)
{
this.uName = uName;
}
public int getAge()
{
return age;
}
public void setAge(int age)
{
this.age = age;
}
}
@Test
public void PojoLeftOuterJoinTest()
{
String[] leftKeys = {"uId"};
String[] rightKeys = {"uId"};
PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
Assert.assertEquals(2, accu.size());
accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
List result = pij.getOutput(accu);
Assert.assertEquals(2, result.size());
Object o = result.get(0);
Assert.assertTrue(o instanceof TestOutClass);
TestOutClass testOutClass = (TestOutClass)o;
int uId = testOutClass.getUId();
if (uId == 1) {
checkNameAge("Josh",12,testOutClass);
o = result.get(1);
Assert.assertTrue(o instanceof TestOutClass);
testOutClass = (TestOutClass)o;
uId = testOutClass.getUId();
Assert.assertEquals(2, testOutClass.getUId());
checkNameAge("Bob",0,testOutClass);
} else if (uId == 2) {
checkNameAge("Bob",0,testOutClass);
o = result.get(1);
Assert.assertTrue(o instanceof TestOutClass);
testOutClass = (TestOutClass)o;
uId = testOutClass.getUId();
Assert.assertEquals(1, testOutClass.getUId());
checkNameAge("Josh",12,testOutClass);
}
}
public void checkNameAge(String name, int age, TestOutClass testOutClass)
{
Assert.assertEquals(name, testOutClass.getUName());
Assert.assertEquals(age, testOutClass.getAge());
}
@Test
public void PojoRightOuterJoinTest()
{
String[] leftKeys = {"uId"};
String[] rightKeys = {"uId"};
PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
Assert.assertEquals(2, accu.size());
accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
List result = pij.getOutput(accu);
Assert.assertEquals(2, result.size());
Object o = result.get(0);
Assert.assertTrue(o instanceof TestOutClass);
TestOutClass testOutClass = (TestOutClass)o;
int uId = testOutClass.getUId();
if (uId == 1) {
checkNameAge("Josh",12,testOutClass);
o = result.get(1);
Assert.assertTrue(o instanceof TestOutClass);
testOutClass = (TestOutClass)o;
uId = testOutClass.getUId();
Assert.assertEquals(3, testOutClass.getUId());
checkNameAge(null,13,testOutClass);
} else if (uId == 3) {
checkNameAge(null,13,testOutClass);
o = result.get(1);
Assert.assertTrue(o instanceof TestOutClass);
testOutClass = (TestOutClass)o;
uId = testOutClass.getUId();
Assert.assertEquals(1, testOutClass.getUId());
checkNameAge("Josh",12,testOutClass);
}
}
@Test
public void PojoFullOuterJoinTest()
{
String[] leftKeys = {"uId"};
String[] rightKeys = {"uId"};
PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys);
List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
Assert.assertEquals(2, accu.size());
accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
accu = pij.accumulate2(accu, new TestPojo3(1, "NickJosh", 12));
accu = pij.accumulate2(accu, new TestPojo3(3, "NickBob", 13));
Assert.assertEquals(3, pij.getOutput(accu).size());
Set<Integer> checkMap = new HashSet<>();
for ( int i = 0; i < 3; i++ ) {
Object o = pij.getOutput(accu).get(i);
Assert.assertTrue(o instanceof TestOutClass);
TestOutClass testOutClass = (TestOutClass)o;
int uId = testOutClass.getUId();
checkMap.add(uId);
}
Assert.assertEquals(3,checkMap.size());
}
@Test
public void PojoLeftOuterJoinTestWithMap()
{
String[] leftKeys = {"uId", "uName"};
String[] rightKeys = {"uId", "uNickName"};
Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
PojoLeftOuterJoin<TestPojo1, TestPojo3> pij = new PojoLeftOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
Assert.assertEquals(2, accu.size());
accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
accu = pij.accumulate2(accu, new TestPojo3(3, "ECE", 13));
List result = pij.getOutput(accu);
Assert.assertEquals(2, result.size());
Object o = result.get(0);
Assert.assertTrue(o instanceof TestOutClass);
TestOutClass testOutClass = (TestOutClass)o;
int uId = testOutClass.getUId();
if (uId == 1) {
checkNameAge(null,12,testOutClass);
o = result.get(1);
Assert.assertTrue(o instanceof TestOutClass);
testOutClass = (TestOutClass)o;
uId = testOutClass.getUId();
Assert.assertEquals(2, uId);
checkNameAge(null,0,testOutClass);
} else if (uId == 2) {
checkNameAge(null,0,testOutClass);
o = result.get(1);
Assert.assertTrue(o instanceof TestOutClass);
testOutClass = (TestOutClass)o;
uId = testOutClass.getUId();
Assert.assertEquals(1, uId);
checkNameAge(null,12,testOutClass);
}
}
@Test
public void PojoRightOuterJoinTestWithMap()
{
String[] leftKeys = {"uId", "uName"};
String[] rightKeys = {"uId", "uNickName"};
Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
PojoRightOuterJoin<TestPojo1, TestPojo3> pij = new PojoRightOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
Assert.assertEquals(2, accu.size());
accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
accu = pij.accumulate2(accu, new TestPojo3(3, "Bob", 13));
List result = pij.getOutput(accu);
Assert.assertEquals(2, result.size());
Object o = result.get(0);
Assert.assertTrue(o instanceof TestOutClass);
TestOutClass testOutClass = (TestOutClass)o;
int uId = testOutClass.getUId();
if (uId == 1) {
checkNameAge(null,12,testOutClass);
o = result.get(1);
Assert.assertTrue(o instanceof TestOutClass);
testOutClass = (TestOutClass)o;
uId = testOutClass.getUId();
Assert.assertEquals(0, uId);
checkNameAge(null,13,testOutClass);
} else if (uId == 0) {
checkNameAge(null,13,testOutClass);
o = result.get(1);
Assert.assertTrue(o instanceof TestOutClass);
testOutClass = (TestOutClass)o;
uId = testOutClass.getUId();
Assert.assertEquals(1, uId);
checkNameAge(null,12,testOutClass);
}
}
@Test
public void PojoFullOuterJoinTestWithMap()
{
String[] leftKeys = {"uId", "uName"};
String[] rightKeys = {"uId", "uNickName"};
Map<String,KeyValPair<AbstractPojoJoin.STREAM, String>> outputInputMap = new HashMap<>();
outputInputMap.put("uId",new KeyValPair<>(AbstractPojoJoin.STREAM.LEFT,"uId"));
outputInputMap.put("age",new KeyValPair<>(AbstractPojoJoin.STREAM.RIGHT,"age"));
PojoFullOuterJoin<TestPojo1, TestPojo3> pij = new PojoFullOuterJoin<>(TestOutClass.class, leftKeys, rightKeys, outputInputMap);
List<Multimap<List<Object>, Object>> accu = pij.defaultAccumulatedValue();
Assert.assertEquals(2, accu.size());
accu = pij.accumulate(accu, new TestPojo1(1, "Josh"));
accu = pij.accumulate(accu, new TestPojo1(2, "Bob"));
accu = pij.accumulate2(accu, new TestPojo3(1, "Josh", 12));
accu = pij.accumulate2(accu, new TestPojo3(3, "Bob", 13));
Assert.assertEquals(3, pij.getOutput(accu).size());
Set<Integer> checkMap = new HashSet<>();
for ( int i = 0; i < 3; i++ ) {
Object o = pij.getOutput(accu).get(i);
Assert.assertTrue(o instanceof TestOutClass);
TestOutClass testOutClass = (TestOutClass)o;
int uId = testOutClass.getUId();
checkMap.add(uId);
}
Assert.assertEquals(3,checkMap.size());
}
}