| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.lib.multiwindow; |
| |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import org.apache.commons.lang.ObjectUtils.Null; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.Lists; |
| |
| import com.datatorrent.lib.testbench.CollectorTestSink; |
| |
| /** |
| * A unit test to test SortedMovingWindow operator can either: |
| * 1. sort simple comparable tuples |
| * 2. sort tuples by given order (Comparator) |
| * 3. group tuples into different category and sort the category by a given order |
| * |
| */ |
| public class SortedMovingWindowTest |
| { |
| |
| /** |
| * Test sorting simple comparable tuples within the sliding window |
| */ |
| @Test |
| public void testSortingSimpleNumberTuple() |
| { |
| SortedMovingWindow<Integer, Null> smw = new SortedMovingWindow<Integer, Null>(); |
| CollectorTestSink<Object> testSink = new CollectorTestSink<Object>(); |
| smw.outputPort.setSink(testSink); |
| smw.setup(null); |
| |
| smw.setWindowSize(2); |
| // The incoming 6 integer tuples are disordered among 4 windows |
| emitObjects(smw, new Integer[][]{{1,3}, {2,5}, {4}, {6}}); |
| smw.beginWindow(4); |
| smw.endWindow(); |
| smw.beginWindow(5); |
| smw.endWindow(); |
| |
| // The outcome is sorted |
| Assert.assertEquals(Lists.newArrayList(1, 2, 3, 4, 5, 6), testSink.collectedTuples); |
| |
| } |
| |
| /** |
| * Given sorting key, sorting function, test sorting the map tuples within the sliding window |
| */ |
| @Test |
| public void testSortingMapTupleWithoutKey() |
| { |
| |
| SortedMovingWindow<Map<String, Integer>, Null> smw = new SortedMovingWindow<Map<String, Integer>, Null>(); |
| |
| final String[] keys = {"number"}; |
| |
| smw.setComparator(new Comparator<Map<String, Integer>>() |
| { |
| @Override |
| public int compare(Map<String, Integer> o1, Map<String, Integer> o2) |
| { |
| // order the map by the value of key "number" |
| return o1.get(keys[0]) - o2.get(keys[0]); |
| } |
| }); |
| CollectorTestSink<Object> testSink = new CollectorTestSink<Object>(); |
| smw.outputPort.setSink(testSink); |
| smw.setup(null); |
| smw.setWindowSize(2); |
| |
| // The incoming 6 simple map tuples are disordered among 4 windows |
| emitObjects(smw, new Map[][]{createHashMapTuples(keys, new Integer[][]{{1}, {3}}), |
| createHashMapTuples(keys, new Integer[][]{{2}, {5}}), |
| createHashMapTuples(keys, new Integer[][]{{4}}), createHashMapTuples(keys, new Integer[][]{{6}})}); |
| smw.beginWindow(4); |
| smw.endWindow(); |
| smw.beginWindow(5); |
| smw.endWindow(); |
| |
| // The outcome is ordered by the value of the key "number" |
| Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, new Integer[][]{{1}, {2}, {3}, {4}, {5}, {6}})), |
| testSink.collectedTuples); |
| } |
| |
| |
| /** |
| * Given grouping key, sorting key and sorting function, test sorting the map tuples within the sliding window |
| */ |
| @Test |
| public void testSortingMapTupleWithKey() |
| { |
| |
| SortedMovingWindow<Map<String, Object>, String> smw = new SortedMovingWindow<Map<String, Object>, String>(); |
| |
| final String[] keys = {"name", "number"}; |
| |
| smw.setComparator(new Comparator<Map<String, Object>>() |
| { |
| @Override |
| public int compare(Map<String, Object> o1, Map<String, Object> o2) |
| { |
| // order by key "number" |
| return (Integer)o1.get(keys[1]) - (Integer)o2.get(keys[1]); |
| } |
| }); |
| |
| smw.setFunction(new Function<Map<String,Object>, String>() |
| { |
| @Override |
| public String apply(Map<String, Object> input) |
| { |
| // order tuple with same key "name" |
| return (String)input.get(keys[0]); |
| } |
| }); |
| CollectorTestSink<Object> testSink = new CollectorTestSink<Object>(); |
| smw.outputPort.setSink(testSink); |
| smw.setup(null); |
| smw.setWindowSize(2); |
| |
| // The incoming 9 complex map tuples are disordered with same name among 4 windows |
| emitObjects(smw, new Map[][]{createHashMapTuples(keys, new Object[][]{{"bob", 1}, {"jim", 1}}), |
| createHashMapTuples(keys, new Object[][]{{"jim", 2}, {"bob", 3}}), |
| createHashMapTuples(keys, new Object[][]{{"bob", 2}, {"jim", 4}}), |
| createHashMapTuples(keys, new Object[][]{{"bob", 5}, {"jim", 3}, {"bob", 4}})}); |
| smw.beginWindow(4); |
| smw.endWindow(); |
| smw.beginWindow(5); |
| smw.endWindow(); |
| |
| // All tuples with same "name" are sorted by key "number" |
| Assert.assertEquals(Arrays.asList(createHashMapTuples(keys, |
| new Object[][]{{"bob", 1}, {"jim", 1}, {"jim", 2}, {"bob", 2}, {"bob", 3}, {"jim", 3}, {"jim", 4}, {"bob", 4}, {"bob", 5}})), testSink.collectedTuples); |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| private void emitObjects(SortedMovingWindow win, Object[][] obj) |
| { |
| for (int i = 0; i < obj.length; i++) { |
| win.beginWindow(i); |
| for (int j = 0; j < obj[i].length; j++) { |
| win.data.process(obj[i][j]); |
| } |
| win.endWindow(); |
| } |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| private Map[] createHashMapTuples(String[] cols, Object[][] values) |
| { |
| |
| HashMap[] maps = new HashMap[values.length]; |
| int index = -1; |
| for (Object[] vs : values) { |
| maps[++index] = new HashMap<String, Object>(); |
| int colIndex = 0; |
| for (Object value : vs) { |
| maps[index].put(cols[colIndex++], value); |
| } |
| } |
| |
| return maps; |
| } |
| |
| } |