blob: 8030d9783ba68c0797409a0edb8dacde87519cba [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.multiwindow;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.apache.commons.lang.ObjectUtils.Null;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.datatorrent.lib.testbench.CollectorTestSink;
/**
* Unit tests verifying that the SortedMovingWindow operator can:
* 1. sort simple comparable tuples
* 2. sort tuples by a given order (Comparator)
* 3. group tuples into different categories and sort each category by a given order
*
*/
public class SortedMovingWindowTest
{
  /**
   * Feeds integer tuples that arrive out of order across several windows and
   * checks that they are emitted in their natural (ascending) order.
   */
  @Test
  public void testSortingSimpleNumberTuple()
  {
    SortedMovingWindow<Integer, Null> smw = new SortedMovingWindow<Integer, Null>();
    CollectorTestSink<Object> testSink = new CollectorTestSink<Object>();
    smw.outputPort.setSink(testSink);
    smw.setup(null);
    smw.setWindowSize(2);

    // The incoming 6 integer tuples are disordered among 4 windows
    Integer[][] windows = {{1, 3}, {2, 5}, {4}, {6}};
    emitObjects(smw, windows);
    emitEmptyWindows(smw, windows.length, 2);

    // The outcome is sorted
    Assert.assertEquals(Lists.newArrayList(1, 2, 3, 4, 5, 6), testSink.collectedTuples);
  }

  /**
   * Feeds single-entry map tuples and a custom {@link Comparator} (no grouping
   * function is set) and checks that the tuples are emitted ordered by the
   * value stored under the key "number".
   */
  @SuppressWarnings("rawtypes")
  @Test
  public void testSortingMapTupleWithoutKey()
  {
    SortedMovingWindow<Map<String, Integer>, Null> smw = new SortedMovingWindow<Map<String, Integer>, Null>();
    final String[] keys = {"number"};
    smw.setComparator(new Comparator<Map<String, Integer>>()
    {
      @Override
      public int compare(Map<String, Integer> o1, Map<String, Integer> o2)
      {
        // order the map by the value of key "number";
        // compareTo instead of subtraction so extreme values cannot overflow
        return o1.get(keys[0]).compareTo(o2.get(keys[0]));
      }
    });
    CollectorTestSink<Object> testSink = new CollectorTestSink<Object>();
    smw.outputPort.setSink(testSink);
    smw.setup(null);
    smw.setWindowSize(2);

    // The incoming 6 simple map tuples are disordered among 4 windows
    Map[][] windows = {
      createHashMapTuples(keys, new Integer[][]{{1}, {3}}),
      createHashMapTuples(keys, new Integer[][]{{2}, {5}}),
      createHashMapTuples(keys, new Integer[][]{{4}}),
      createHashMapTuples(keys, new Integer[][]{{6}})
    };
    emitObjects(smw, windows);
    emitEmptyWindows(smw, windows.length, 2);

    // The outcome is ordered by the value of the key "number"
    Assert.assertEquals(
        Arrays.asList(createHashMapTuples(keys, new Integer[][]{{1}, {2}, {3}, {4}, {5}, {6}})),
        testSink.collectedTuples);
  }

  /**
   * Feeds two-entry map tuples together with a grouping function (key "name")
   * and a custom {@link Comparator} (key "number"), and checks the emitted
   * sequence: within each "name" the tuples appear ordered by "number".
   */
  @SuppressWarnings("rawtypes")
  @Test
  public void testSortingMapTupleWithKey()
  {
    SortedMovingWindow<Map<String, Object>, String> smw = new SortedMovingWindow<Map<String, Object>, String>();
    final String[] keys = {"name", "number"};
    smw.setComparator(new Comparator<Map<String, Object>>()
    {
      @Override
      public int compare(Map<String, Object> o1, Map<String, Object> o2)
      {
        // order by key "number";
        // compareTo instead of subtraction so extreme values cannot overflow
        Integer left = (Integer)o1.get(keys[1]);
        Integer right = (Integer)o2.get(keys[1]);
        return left.compareTo(right);
      }
    });
    smw.setFunction(new Function<Map<String, Object>, String>()
    {
      @Override
      public String apply(Map<String, Object> input)
      {
        // tuples sharing the same value of key "name" are ordered together
        return (String)input.get(keys[0]);
      }
    });
    CollectorTestSink<Object> testSink = new CollectorTestSink<Object>();
    smw.outputPort.setSink(testSink);
    smw.setup(null);
    smw.setWindowSize(2);

    // The incoming 9 complex map tuples are disordered with same name among 4 windows
    Map[][] windows = {
      createHashMapTuples(keys, new Object[][]{{"bob", 1}, {"jim", 1}}),
      createHashMapTuples(keys, new Object[][]{{"jim", 2}, {"bob", 3}}),
      createHashMapTuples(keys, new Object[][]{{"bob", 2}, {"jim", 4}}),
      createHashMapTuples(keys, new Object[][]{{"bob", 5}, {"jim", 3}, {"bob", 4}})
    };
    emitObjects(smw, windows);
    emitEmptyWindows(smw, windows.length, 2);

    // All tuples with same "name" are sorted by key "number"
    Object[][] expected = {
      {"bob", 1}, {"jim", 1}, {"jim", 2}, {"bob", 2}, {"bob", 3},
      {"jim", 3}, {"jim", 4}, {"bob", 4}, {"bob", 5}
    };
    Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, expected)), testSink.collectedTuples);
  }

  /**
   * Pushes the given tuples through the operator, one window per outer array
   * element. Window ids start at 0 and increase by one per window.
   *
   * @param win operator under test
   * @param obj tuples to emit; {@code obj[i]} holds the tuples of window {@code i}
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  private void emitObjects(SortedMovingWindow win, Object[][] obj)
  {
    for (int windowId = 0; windowId < obj.length; windowId++) {
      win.beginWindow(windowId);
      for (Object tuple : obj[windowId]) {
        win.data.process(tuple);
      }
      win.endWindow();
    }
  }

  /**
   * Runs {@code count} consecutive windows that carry no tuples, starting at
   * {@code firstWindowId}.
   * NOTE(review): presumably this lets the operator release tuples it still
   * holds for the sliding window; confirm against SortedMovingWindow.
   *
   * @param win operator under test
   * @param firstWindowId id of the first empty window
   * @param count number of empty windows to run
   */
  @SuppressWarnings("rawtypes")
  private void emitEmptyWindows(SortedMovingWindow win, int firstWindowId, int count)
  {
    for (int offset = 0; offset < count; offset++) {
      win.beginWindow(firstWindowId + offset);
      win.endWindow();
    }
  }

  /**
   * Builds one map tuple per row of {@code values}. The j-th value of a row is
   * stored under the j-th column name, so a row must not have more values than
   * there are column names.
   *
   * @param cols column names used as map keys
   * @param values rows of values, one row per resulting tuple
   * @return array of maps in the same order as the rows of {@code values}
   */
  @SuppressWarnings("rawtypes")
  private Map[] createHashMapTuples(String[] cols, Object[][] values)
  {
    Map[] tuples = new Map[values.length];
    for (int row = 0; row < values.length; row++) {
      Map<String, Object> tuple = new HashMap<String, Object>();
      for (int col = 0; col < values[row].length; col++) {
        tuple.put(cols[col], values[row][col]);
      }
      tuples[row] = tuple;
    }
    return tuples;
  }
}