reduce some unnecessarily long WindowTest tests
diff --git a/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java b/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
index 45d4c61..e563d38 100644
--- a/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
+++ b/api/window/src/test/java/org/apache/edgent/test/window/WindowTest.java
@@ -20,6 +20,7 @@
import static org.apache.edgent.function.Functions.unpartitioned;
import static org.apache.edgent.window.Policies.alwaysInsert;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
@@ -35,6 +36,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.window.InsertionTimeList;
@@ -293,7 +295,7 @@
window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
- long endTime = System.currentTimeMillis() + 8000;
+ long endTime = System.currentTimeMillis() + 4000;
List<Thread> threads = new ArrayList<>();
int NUM_THREADS = 10;
// Create 10 threads. Each inserts at 1,000 Hz
@@ -375,7 +377,7 @@
window.insert(1);
}, 0, 10, TimeUnit.MILLISECONDS);
- Thread.sleep(11000);
+ Thread.sleep(4000);
sf.cancel(true);
double tolerance = .08;
for(int i = 0; i < numBatches.size(); i++){
@@ -415,7 +417,7 @@
window.insert(i);
}, 0, 1, TimeUnit.MILLISECONDS);
- Thread.sleep(11000);
+ Thread.sleep(4000);
sf.cancel(true);
try {
sf.get();
@@ -457,19 +459,22 @@
window.registerScheduledExecutorService(new ScheduledThreadPoolExecutor(5));
+ AtomicInteger count = new AtomicInteger();
+ int MAX_TUP_CNT = 300;
ScheduledFuture<?> sf = ses.scheduleAtFixedRate(new Runnable(){
- private int count = 0;
@Override
public void run() {
- if(count < 1000){
- window.insert(count++);
+ if(count.get() < MAX_TUP_CNT){
+ window.insert(count.incrementAndGet());
}
}
}, 0, 10, TimeUnit.MILLISECONDS);
- Thread.sleep(11000);
+ long insertMsec = MAX_TUP_CNT * 10 /*10msec/tup*/;
+ Thread.sleep(insertMsec + 1000/*extra sec*/);
sf.cancel(true);
+ assertEquals("Invalid test", MAX_TUP_CNT, count.get());
int numTuples = 0;
for(int i = 0; i < batches.size() - 1; i++){
List<Integer> batch = batches.get(i);
@@ -480,7 +485,8 @@
}
numTuples += batches.get(batches.size() -1).size();
- assertTrue("Number of tuples submitted (1000) != number of tuples processed in batch (" + numTuples + ")", numTuples == 1000);
+ assertEquals("Number of batch tuples", count.get(), numTuples);
+ assertEquals("Number of batches", MAX_TUP_CNT/100, batches.size());
}
private void assertOnTimeEvictions(List<Long> diffs) {