blob: 98f04ea736d7978319ae72fee34d4b1f8c0713a3 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.test.window;
import static org.apache.edgent.function.Functions.unpartitioned;
import static org.apache.edgent.window.Policies.alwaysInsert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.window.InsertionTimeList;
import org.apache.edgent.window.Policies;
import org.apache.edgent.window.Window;
import org.apache.edgent.window.Windows;
import org.junit.Test;
public class WindowTest {
/**
* Verifies that the state of the window is correct after each tuple offer.
*/
@Test
public void lastCountTest(){
final int COUNT = 100;
// The window implementation
Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, unpartitioned());
// The states of the window as it slides
LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
// A processor that records the states of the window
BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
};
window.registerPartitionProcessor(wp);
// Generate sliding window correct incremental state to compare
// against the window's
LinkedList<LinkedList<Integer>> correctWindowStates = new LinkedList<>();
LinkedList<Integer> previous = null;
LinkedList<Integer> current = null;
for(int i = 0; i < COUNT; i++){
current = new LinkedList<>();
if(previous != null)
current.addAll(previous);
current.addLast(i);
if(current.size() > 10){
current.removeFirst();
}
previous = current;
correctWindowStates.addLast(current);
}
// Add tuples to window, populating the incrementalWindowStates list.
for(int i = 0; i < COUNT; i++){
window.insert(i);
}
// Compare correct window states to the window implementation's
assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
for(int i = 0; i < correctWindowStates.size(); i++){
assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
}
}
@Test
public void keyedWindowTest(){
final int COUNT = 1000;
// The window implementation
// The window implementation
Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple->tuple%10);
// The states of the window as it slides
LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
// A processor that records the states of the window
BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
};
window.registerPartitionProcessor(wp);
Map<Integer, LinkedList<Integer>> correctPartitionedStates = new HashMap<>();
List<List<Integer> > correctWindowStates = new ArrayList<>();
for(int i = 0; i < 10; i++){
correctPartitionedStates.put(i, new LinkedList<>());
}
for(int i = 0; i < COUNT; i++){
correctPartitionedStates.get(i%10).add(i);
if(correctPartitionedStates.get(i%10).size() > 10){
correctPartitionedStates.get(i%10).removeFirst();
}
correctWindowStates.add(new ArrayList<>(correctPartitionedStates.get(i%10)));
window.insert(i);
}
// Compare correct window states to the window implementation's
assertTrue(correctWindowStates.size() == incrementalWindowStates.size());
for(int i = 0; i < correctWindowStates.size(); i++){
assertTrue(correctWindowStates.get(i).containsAll(incrementalWindowStates.get(i)));
assertTrue(incrementalWindowStates.get(i).containsAll(correctWindowStates.get(i)));
}
}
@Test
public void accessPartitionKeyTest(){
LinkedList<List<Integer> > incrementalWindowStates = new LinkedList<>();
Window<Integer, Integer, ? extends List<Integer>> window = Windows.<Integer, Integer, LinkedList<Integer>>window(
(partition, tuple) -> {
if (partition.getKey().equals(1) || partition.getKey().equals(3)) {
return false;
}
return true;
},
(partition, tuple) -> { // Contents policy
},
(partition) -> { // Evict determiner
partition.getContents().clear();
},
Policies.processOnInsert(),
tuple -> tuple,
() -> new LinkedList<Integer>());
// A processor that records the states of the window
BiConsumer<List<Integer>, Integer> wp = (tuples, key) -> {
incrementalWindowStates.addLast(new LinkedList<Integer>(tuples));
};
window.registerPartitionProcessor(wp);
for(Integer i = 0; i < 5; i++){
window.insert(i);
}
assertTrue(incrementalWindowStates.size() == 3);
assertTrue(incrementalWindowStates.get(0).get(0)==0);
assertTrue(incrementalWindowStates.get(1).get(0)==2);
assertTrue(incrementalWindowStates.get(2).get(0)==4);
}
@Test
public void concurrentWindowAccessTest() throws InterruptedException {
Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(10, tuple -> 0);
window.registerPartitionProcessor((tuples, key) -> {
// ensure that the window state doesn't change after .05 seconds
// Copy window state
LinkedList<Integer> list_copy = new LinkedList<Integer>(tuples);
// Wait .05 seconds
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Verify that the partition is unchanged
assertTrue(list_copy.containsAll(tuples));
assertTrue(tuples.containsAll(list_copy));
});
// Run for five seconds.
long finishTime = System.currentTimeMillis() + 3000;
List<Thread> threads = new ArrayList<Thread>();
// Ten threads concurrently attempt to insert tuples into the window
for(int i = 0; i < 10; i++){
threads.add(new Thread(new Runnable(){
Random r = new Random();
@Override
public void run() {
while(System.currentTimeMillis() < finishTime){
try{
window.insert(r.nextInt());
}
catch(ConcurrentModificationException cme){
org.junit.Assert.fail("State of window changed while processing");
}
}
}
}));
}
for(Thread thread : threads){
thread.start();
Thread.sleep(10);
}
for(Thread thread : threads)
thread.join();
}
@Test
public void noWaitConcurrentWindowAccessTest() throws InterruptedException {
Window<Integer, Integer, ? extends List<Integer>> window = Windows.lastNProcessOnInsert(100, tuple -> 0);
window.registerPartitionProcessor((tuples, key) -> {});
long finishTime = System.currentTimeMillis() + 3000;
List<Thread> threads = new ArrayList<Thread>();
// Ten threads concurrently attempt to insert tuples into the window
for(int i = 0; i < 10; i++){
threads.add(new Thread(new Runnable(){
Random r = new Random();
@Override
public void run() {
while(System.currentTimeMillis() < finishTime){
try{
window.insert(r.nextInt());
}
catch(ConcurrentModificationException cme){
org.junit.Assert.fail("State of window changed while processing");
}
}
}
}));
}
for(Thread thread : threads){
thread.start();
Thread.sleep(10);
}
for(Thread thread : threads)
thread.join();
}
@Test
public void timeActionTest() throws InterruptedException {
// Timing variances on shared machines can cause this test to fail
assumeTrue(!Boolean.getBoolean("edgent.build.ci"));
List<Long> diffs = Collections.synchronizedList(new ArrayList<>());
List<Boolean> initialized = Collections.synchronizedList(new ArrayList<>());
initialized.add(false);
Window<Long, Integer, InsertionTimeList<Long>> window = // new TimeWindowImpl<Long, Integer, LinkedList<Long>>(
Windows.window(
Policies.alwaysInsert(), // insertion policy
Policies.scheduleEvictIfEmpty(1000, TimeUnit.MILLISECONDS),
// Policies.evictOlderThan(1000, TimeUnit.MILLISECONDS),
Policies.evictOlderWithProcess(1000, TimeUnit.MILLISECONDS),
(partition, tuple) -> {
if(initialized.get(0).booleanValue() == false){
initialized.set(0, true);
ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
ses.scheduleAtFixedRate(() -> {partition.process();}, 0, 1000, TimeUnit.MILLISECONDS);
}},
unpartitioned(),
() -> new InsertionTimeList<Long>());
window.registerPartitionProcessor((tuples, key) -> {
if(tuples.size() > 1)
diffs.add(tuples.get(tuples.size()-1) - tuples.get(0));
});
window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
long endTime = System.currentTimeMillis() + 4000;
List<Thread> threads = new ArrayList<>();
int NUM_THREADS = 10;
// Create 10 threads. Each inserts at 1,000 Hz
for(int i = 0; i < NUM_THREADS; i++){
threads.add(new Thread(new Runnable() {
@Override
public void run() {
while(System.currentTimeMillis() < endTime){
window.insert(System.currentTimeMillis());
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}));
}
for(int i = 0; i < NUM_THREADS; i++){
threads.get(i).start();
}
for(int i = 0; i < NUM_THREADS; i++){
threads.get(i).join();
}
assertOnTimeEvictions(diffs);
}
@SuppressWarnings("serial")
@Test
public void countBatchWindowTest(){
List<Integer> numBatches = new LinkedList<>();
Window<Integer, Integer, List<Integer>> window =
Windows.window(
alwaysInsert(),
Policies.doNothing(),
Policies.evictAll(),
Policies.processWhenFullAndEvict(113),
tuple -> 0,
() -> new ArrayList<Integer>());
window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> t, Integer u) {
numBatches.add(1);
}
});
for(int i = 0; i < 1000; i++){
window.insert(i);
}
assertTrue(numBatches.size() == 8);
}
@SuppressWarnings("serial")
@Test
public void timeBatchWindowTest() throws InterruptedException{
List<Long> numBatches = new LinkedList<>();
ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
Window<Integer, Integer, List<Integer>> window =
Windows.window(
alwaysInsert(),
Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
(partiton, tuple) -> {},
tuple -> 0,
() -> new ArrayList<Integer>());
window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> t, Integer u) {
numBatches.add((long)t.size());
}
});
window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
ScheduledFuture<?> sf = ses.scheduleAtFixedRate(() -> {
window.insert(1);
}, 0, 10, TimeUnit.MILLISECONDS);
Thread.sleep(4000);
sf.cancel(true);
double tolerance = .08;
for(int i = 0; i < numBatches.size(); i++){
assertTrue("Values:" + numBatches.toString(), withinTolerance(100.0, numBatches.get(i).doubleValue(), tolerance));
}
}
@SuppressWarnings("serial")
@Test
public void keyedTimeBatchWindowTest() throws InterruptedException{
Map<Integer, List<Long> > numBatches = Collections.synchronizedMap(new HashMap<>());
for(int i = 0; i < 5; i++)
numBatches.put(i, new LinkedList<Long>());
ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
Window<Integer, Integer, List<Integer>> window =
Windows.window(
alwaysInsert(),
Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
(partiton, tuple) -> {},
tuple -> tuple,
() -> new ArrayList<Integer>());
window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> t, Integer u) {
List<Long> l = numBatches.get(u);
l.add((long)t.size());
}
});
window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
ScheduledFuture<?> sf = ses.scheduleAtFixedRate(() -> {
for(int i = 0; i < 5; i++)
window.insert(i);
}, 0, 1, TimeUnit.MILLISECONDS);
Thread.sleep(4000);
sf.cancel(true);
try {
sf.get();
} catch (Exception e) {
// expected
}
double tolerance = .12;
for(Integer key : numBatches.keySet()){
List<Long> batch = numBatches.get(key);
for(Long l : batch){
assertTrue("Values:" + batch.toString(), withinTolerance(1000.0, l.doubleValue(), tolerance));
}
}
}
@SuppressWarnings("serial")
@Test
public void timeBatchEnsureUnique() throws InterruptedException{
List<List<Integer>> batches = Collections.synchronizedList(new LinkedList<>());
ScheduledExecutorService ses = new ScheduledThreadPoolExecutor(5);
Window<Integer, Integer, List<Integer>> window =
Windows.window(
alwaysInsert(),
Policies.scheduleEvictOnFirstInsert(1, TimeUnit.SECONDS),
Policies.evictAllAndScheduleEvictWithProcess(1, TimeUnit.SECONDS),
(partiton, tuple) -> {},
tuple -> 0,
() -> new ArrayList<Integer>());
window.registerPartitionProcessor(new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> t, Integer u) {
batches.add(new ArrayList<Integer>(t));
}
});
window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
AtomicInteger count = new AtomicInteger();
int MAX_TUP_CNT = 300;
ScheduledFuture<?> sf = ses.scheduleAtFixedRate(new Runnable(){
@Override
public void run() {
if(count.get() < MAX_TUP_CNT){
window.insert(count.incrementAndGet());
}
}
}, 0, 10, TimeUnit.MILLISECONDS);
long insertMsec = MAX_TUP_CNT * 10 /*10msec/tup*/;
Thread.sleep(insertMsec + 1000/*extra sec*/);
sf.cancel(true);
assertEquals("Invalid test", MAX_TUP_CNT, count.get());
int numTuples = 0;
for(int i = 0; i < batches.size() - 1; i++){
List<Integer> batch = batches.get(i);
numTuples += batch.size();
for(int j = i + 1; j < batches.size(); j++){
assertTrue("Batches have overlapping tuples", Collections.disjoint(batches.get(i), batches.get(j)));
}
}
numTuples += batches.get(batches.size() -1).size();
assertEquals("Number of batch tuples", count.get(), numTuples);
assertTrue("Number of batches exp:"+MAX_TUP_CNT/100+" got:"+batches.size(),
withinToleranceAmt((double)MAX_TUP_CNT/100, (double)batches.size(), 1)); // +/- 1
}
private void assertOnTimeEvictions(List<Long> diffs) {
double tolerance = .08;
for(int i = 1; i < diffs.size(); i++){
assertTrue(withinTolerance(1000.0, diffs.get(i).doubleValue(), tolerance));
}
}
public static boolean withinTolerance(double expected, Double actual, double tolerance) {
double lowBound = (1.0 - tolerance) * expected;
double highBound = (1.0 + tolerance) * expected;
return (actual < highBound && actual > lowBound);
}
public static boolean withinToleranceAmt(double expected, Double actual, double toleranceAmt) {
double lowBound = expected - toleranceAmt;
double highBound = expected + toleranceAmt;
return (actual <= highBound && actual >= lowBound);
}
}