blob: 8c37d578c833b565532163b8c3b307fee795734d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.window.impl;
import java.util.List;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
import org.apache.apex.malhar.lib.window.accumulation.CoGroup;
import com.google.common.base.Function;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KeyValPair;
/**
* Unit tests for Windowed Join Operator
*/
public class WindowedMergeOperatorTest
{
@Test
public void extractTimestampTest()
{
WindowedMergeOperatorImpl op = createDefaultWindowedMergeOperator();
Function<Integer, Long> timestampExtractor = new Function<Integer, Long>()
{
@Override
public Long apply(Integer input)
{
return (input * 10L);
}
};
Assert.assertEquals(1000L, op.extractTimestamp(new Tuple.PlainTuple<Integer>(100), timestampExtractor));
Assert.assertEquals(2000L, op.extractTimestamp(new Tuple.PlainTuple<Integer>(200), timestampExtractor));
Assert.assertEquals(200L, op.extractTimestamp(new Tuple.TimestampedTuple<Integer>(200L, 10), null));
}
@Test
public void windowedMergeOperatorMergeTest()
{
WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op = createDefaultWindowedMergeOperator();
Window global = Window.GlobalWindow.INSTANCE;
op.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
op.setWindowOption(new WindowOption.GlobalWindow());
op.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
op.processTuple(new Tuple.WindowedTuple<Integer>(global, 100));
Assert.assertEquals(1, op.dataStorage.get(global).get(0).size());
op.processTuple2(new Tuple.WindowedTuple<Integer>(global, 200));
Assert.assertEquals(1, op.dataStorage.get(global).get(1).size());
op.processTuple(new Tuple.WindowedTuple<Integer>(global, 300));
Assert.assertEquals(2, op.dataStorage.get(global).get(0).size());
Assert.assertEquals(2, op.accumulation.getOutput(op.dataStorage.get(global)).size());
}
@Test
public void keyedWindowedMergeOperatorMergeTest()
{
KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> op
= createDefaultKeyedWindowedMergeOperator();
Window global = Window.GlobalWindow.INSTANCE;
op.setDataStorage(new InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
op.setWindowOption(new WindowOption.GlobalWindow());
op.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
op.processTuple(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("A", 100)));
Assert.assertEquals(1, op.dataStorage.get(global, "A").get(0).size());
Assert.assertTrue(op.dataStorage.get(global, "A").get(0).contains(100));
op.processTuple2(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("A", 200)));
Assert.assertEquals(1, op.dataStorage.get(global, "A").get(1).size());
Assert.assertTrue(op.dataStorage.get(global, "A").get(1).contains(200));
op.processTuple2(new Tuple.WindowedTuple<KeyValPair<String, Integer>>(global, new KeyValPair<String, Integer>("B", 300)));
Assert.assertEquals(1, op.dataStorage.get(global, "A").get(1).size());
Assert.assertEquals(1, op.dataStorage.get(global, "B").get(1).size());
Assert.assertTrue(op.dataStorage.get(global, "B").get(1).contains(300));
Assert.assertEquals(2, op.accumulation.getOutput(op.dataStorage.get(global, "A")).size());
}
@Test
public void windowedMergeOperatorWatermarkTest()
{
WindowedMergeOperatorImpl op = createDefaultWindowedMergeOperator();
CollectorTestSink<WatermarkImpl> sink = new CollectorTestSink<>();
op.controlOutput.setSink(sink);
// No watermark is generated if the Merge operator haven't seen all watermarks from all input streams.
op.controlInput.process(new WatermarkImpl(1000000));
op.endWindow();
Assert.assertEquals(-1, op.currentWatermark);
Assert.assertEquals(0, sink.collectedTuples.size());
// Once both input streams sent watermarks to Merge operator, it should generate a watermark and send to downstream.
op.controlInput2.process(new WatermarkImpl(200000));
op.endWindow();
Assert.assertEquals(200000, op.currentWatermark);
Assert.assertEquals(1, sink.collectedTuples.size());
// If the minimum of the latest input watermarks changes, Merge operator should also generate a new watermark.
op.controlInput2.process(new WatermarkImpl(2100000));
op.endWindow();
Assert.assertEquals(1000000, op.currentWatermark);
Assert.assertEquals(2, sink.collectedTuples.size());
// Current watermark of Merge operator could only change during endWindow() event.
op.controlInput.process(new WatermarkImpl(1100000));
op.endWindow();
Assert.assertEquals(1100000, op.currentWatermark);
Assert.assertEquals(3, sink.collectedTuples.size());
// If the upstreams sent a watermark but the minimum of the latest input watermarks doesn't change, the Merge
// operator should not generate a new watermark, thus nothing will be sent to downstream.
op.controlInput.process(new WatermarkImpl(1100000));
op.endWindow();
Assert.assertEquals(1100000, op.currentWatermark);
Assert.assertEquals(3, sink.collectedTuples.size());
}
private WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultWindowedMergeOperator()
{
WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> windowedMergeOperator = new WindowedMergeOperatorImpl<>();
windowedMergeOperator.setDataStorage(new InMemoryWindowedStorage<List<Set<Integer>>>());
windowedMergeOperator.setRetractionStorage(new InMemoryWindowedStorage<List<List<Integer>>>());
windowedMergeOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
windowedMergeOperator.setAccumulation(new CoGroup<Integer>());
return windowedMergeOperator;
}
private KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultKeyedWindowedMergeOperator()
{
KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> windowedMergeOperator = new KeyedWindowedMergeOperatorImpl<>();
windowedMergeOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, List<Set<Integer>>>());
windowedMergeOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, List<List<Integer>>>());
windowedMergeOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
windowedMergeOperator.setAccumulation(new CoGroup<Integer>());
return windowedMergeOperator;
}
}