blob: 644ac4fa2e43d1c435a9fe8e762e9d1ec70e0c1d [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.test.topology;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Functions;
import org.apache.edgent.function.ToIntFunction;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.tester.Condition;
import org.junit.Ignore;
import org.junit.Test;
public abstract class PlumbingTest extends TopologyAbstractTest {
public void testBlockingDelay() throws Exception {
// Timing variances on shared machines can cause this test to fail
Topology topology = newTopology();
TStream<String> strings = topology.strings("a", "b", "c", "d");
TStream<Long> starts = -> System.currentTimeMillis());
// delay stream
starts = PlumbingStreams.blockingDelay(starts, 300, TimeUnit.MILLISECONDS);
// calculate delay
starts = starts.modify(v -> System.currentTimeMillis() - v);
starts = starts.filter(v -> v >= 300);
Condition<Long> tc = topology.getTester().tupleCount(starts, 4);
complete(topology, tc);
assertTrue("valid:" + tc.getResult(), tc.valid());
public void testBlockingThrottle() throws Exception {
// Timing variances on shared machines can cause this test to fail
Topology topology = newTopology();
TStream<String> strings = topology.strings("a", "b", "c", "d");
TStream<Long> emittedDelays = -> 0L);
// throttle stream
long[] lastEmittedTimestamp = { 0 };
emittedDelays = PlumbingStreams.blockingThrottle(emittedDelays, 300, TimeUnit.MILLISECONDS)
.map(t -> {
// compute the delay since the last emitted tuple
long now = System.currentTimeMillis();
if (lastEmittedTimestamp[0] == 0)
lastEmittedTimestamp[0] = now;
t = now - lastEmittedTimestamp[0];
lastEmittedTimestamp[0] = now;
// System.out.println("### "+t);
return t;
.map(t -> {
// simulate 200ms downstream processing delay
try {
} catch (InterruptedException e) {
throw new RuntimeException(e);
} return t;
}) ;
// should end up with throttled delays close to 300 (not 500 like
// a blockingDelay() under these same conditions would yield)
emittedDelays = emittedDelays.filter(v -> v <= 320);
Condition<Long> tc = topology.getTester().tupleCount(emittedDelays, 4);
complete(topology, tc);
assertTrue("valid:" + tc.getResult(), tc.valid());
public void testOneShotDelay() throws Exception {
Topology topology = newTopology();
TStream<String> strings = topology.strings("a", "b", "c", "d");
TStream<Long> starts = -> System.currentTimeMillis());
// delay stream
starts = PlumbingStreams.blockingOneShotDelay(starts, 300, TimeUnit.MILLISECONDS);
// calculate display
starts = starts.modify(v -> System.currentTimeMillis() - v);
// the first tuple shouldn't satisfy the predicate
starts = starts.filter(v -> v < 300);
Condition<Long> tc = topology.getTester().tupleCount(starts, 3);
complete(topology, tc);
assertTrue("valid:" + tc.getResult(), tc.valid());
public static class TimeAndId {
private static AtomicInteger ids = new AtomicInteger();
long ms;
final int id;
public TimeAndId() { = System.currentTimeMillis(); = ids.incrementAndGet();
public TimeAndId(TimeAndId tai) { = System.currentTimeMillis() -; =;
public String toString() {
return "TAI:" + id + "@" + ms;
public void testPressureRelieverDrop() throws Exception {
Topology topology = newTopology();
// Verify the pressureReliever drops and retains the most recent when
// backpressure exists.
// Here, all the tuples hit the reliever at once, the downstream processing (oneShotDelay)
// causes a backup causing the reliever's queue to become full and drop tuples.
// The first tuple should be processed, then the last (most recent) N (N==queue depth).
String[] tuples = {"A", "B", "C", "D", "E", "F", "G", "H"};
String[] expTuples = {"A", "F", "G", "H"}; // with queue depth of 3
TStream<String> raw = topology.strings(tuples);
TStream<String> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 3);
TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 1, TimeUnit.SECONDS);
Condition<Long> tcCount = topology.getTester().tupleCount(pr2, expTuples.length);
Condition<List<String>> contents = topology.getTester().streamContents(pr2, expTuples);
complete(topology, tcCount);
public void testPressureRelieverNoDrop() throws Exception {
Topology topology = newTopology();
// Same pipeline config as testPressureRelieverDrop but the reliever queue is
// big enough to avoid drops
String[] tuples = {"A", "B", "C", "D", "E", "F", "G", "H"};
TStream<String> raw = topology.strings(tuples);
TStream<String> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 100);
TStream<String> pr2 = PlumbingStreams.blockingOneShotDelay(pr, 1, TimeUnit.SECONDS);
Condition<Long> tcCount = topology.getTester().tupleCount(pr2, tuples.length);
Condition<List<String>> contents = topology.getTester().streamContents(pr2, tuples);
complete(topology, tcCount);
/**
 * Exercises pressureReliever under sustained backpressure.
 * <p>
 * A poll source creates a TimeAndId every 10ms. One branch goes through a
 * depth-1 reliever and a 200ms blocking delay (the "slow" path); the other
 * branch is a pass-through filter on the raw stream (the "fast" path). The
 * run ends once the slow path has produced at least 10 tuples, after which
 * the collected tuples of both paths are inspected.
 * <p>
 * NOTE(review): several assertions below are incomplete in the reviewed text
 * (missing operands, unclosed calls and blocks), and no {@code @Test}
 * annotation is visible. Restore them from the upstream file before relying
 * on this method.
 */
public void testPressureRelieverContinuous() throws Exception {
// Timing variances on shared machines can cause this test to fail
// Try to verify more continuous reliever behavior instead of just the
// the other pressure reliever tests where the backpressure only exists
// at the beginning.
// Generate @ 100tps, consume @ 5tps.
// With reliever depth=1, roughly should process every 20th tuple, with essentially
// no delay in the queue (certainly less than say 50% of the consumer delay, hence < 0.5 * 200ms)
Topology topology = newTopology();
TStream<TimeAndId> raw = topology.poll(() -> new TimeAndId(), 10, TimeUnit.MILLISECONDS);
TStream<TimeAndId> pr = PlumbingStreams.pressureReliever(raw, Functions.unpartitioned(), 1);
TStream<TimeAndId> slow = PlumbingStreams.blockingDelay(pr, 200, TimeUnit.MILLISECONDS);
// calculate the delay (queue time + consumer processing)
TStream<TimeAndId> slowM = slow.modify(v -> new TimeAndId(v));
// Also process raw that should be unaffected by the slow path
TStream<TimeAndId> processed = raw.filter(t -> true);
// tcSlowMCount ends the run; the two streamContents conditions only collect tuples
Condition<Long> tcSlowMCount = topology.getTester().atLeastTupleCount(slowM, 10);
Condition<List<TimeAndId>> tcSlowM = topology.getTester().streamContents(slowM);
Condition<List<TimeAndId>> tcProcessed = topology.getTester().streamContents(processed);
complete(topology, tcSlowMCount);
System.out.println(String.format("testPressureRelieverContinuous() fastCnt:%d slowCnt:%d",
tcProcessed.getResult().size(), tcSlowM.getResult().size()));
System.out.println("slow: "+tcSlowM.getResult());
// No lost tuples in the fast path (successive Ids, starting @ 1)
// NOTE(review): the expected/actual arguments of this assertEquals are missing
assertEquals("fastpath tuples dropped",
// No dup tuples in the fast path
// TimeAndId shows no equals/hashCode override, so the set de-duplicates by object identity
Set<TimeAndId> uniqRaw = new HashSet<>(tcProcessed.getResult());
assertEquals("fastpath tuples duplicated", tcProcessed.getResult().size(), uniqRaw.size());
// fastpath count should be roughly 20x the slow delayed/relieved count
// (the check itself only requires 15x, leaving slack for host load)
assertTrue("rawCnt:"+tcProcessed.getResult().size()+" slowMCnt:"+tcSlowM.getResult().size(),
tcProcessed.getResult().size() >= 15 * +tcSlowM.getResult().size());
// slow should process roughly every 20th tuple... not successive ones
TimeAndId prevId = null;
for (TimeAndId id : tcSlowM.getResult()) {
if (prevId == null) {
// should have processed the 1st tuple
// NOTE(review): the actual-value argument is missing; presumably the tuple's id
assertEquals("slow firstId", 1,;
else {
// seems like this could be sensitive to host load
// NOTE(review): both comparison operands are missing; presumably the current id
// must lie within prevId's id + 15 .. + 25
assertTrue("slow ids prevId:"+prevId+" id:"+id, >= + 15
&& <= + 25);
prevId = id;
// every slow tuple should be processed near instantaneously - shouldn't wait
// long in the queue.
for (TimeAndId id : tcSlowM.getResult()) {
// NOTE(review): the left operand is missing; presumably the tuple's measured delay (ms)
assertTrue("slow delays prevId:"+prevId+" id:"+id, <= 300); // 200ms consumer processing + up to 50% of that waiting
/**
 * Constructs a Valve three ways: with the no-arg constructor, with
 * {@code true} and with {@code false}.
 * <p>
 * NOTE(review): no assertions on the valve's state are visible after any of
 * the constructions, and the method is not closed in the reviewed text. The
 * boolean is presumably the initial open state (see the InitiallyOpen and
 * InitiallyClosed tests) - confirm against the Valve API and restore the
 * missing checks.
 */
public void testValveState() throws Exception {
// default construction
Valve<Integer> valve = new Valve<>();
// explicit initial state: true
valve = new Valve<>(true);
// explicit initial state: false
valve = new Valve<>(false);
/**
 * Feeds the integers 1..10 through a pipeline that uses a default-constructed
 * Valve and expects 1,2,3,7,8,9,10 to come out (4,5,6 rejected).
 * <p>
 * NOTE(review): the statements inside the peek lambda's branches and the rest
 * of the stream chain (presumably toggling the valve and filtering with it)
 * are missing from the reviewed text. Restore them from the upstream file.
 */
public void testValveInitiallyOpen() throws Exception {
Topology top = newTopology("testValve");
TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Valve<Integer> valve = new Valve<>();
// counts the tuples seen so far; drives the branch taken in the peek lambda
AtomicInteger cnt = new AtomicInteger();
TStream<Integer> filtered = values
.peek(tuple -> {
// reject 4,5,6
int curCnt = cnt.incrementAndGet();
// NOTE(review): neither branch has a body here
if (curCnt > 6)
else if (curCnt > 3)
// expect seven tuples in total, with 4,5,6 absent
Condition<Long> count = top.getTester().tupleCount(filtered, 7);
Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 1,2,3,7,8,9,10 );
complete(top, count);
assertTrue(contents.getResult().toString(), contents.valid());
/**
 * Feeds the integers 1..10 through a pipeline that uses a Valve constructed
 * with {@code false} and expects only 4,5,6 to come out.
 * <p>
 * NOTE(review): the statements inside the peek lambda's branches and the rest
 * of the stream chain (presumably toggling the valve and filtering with it)
 * are missing from the reviewed text. Restore them from the upstream file.
 */
public void testValveInitiallyClosed() throws Exception {
Topology top = newTopology("testValve");
TStream<Integer> values = top.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Valve<Integer> valve = new Valve<>(false);
// counts the tuples seen so far; drives the branch taken in the peek lambda
AtomicInteger cnt = new AtomicInteger();
TStream<Integer> filtered = values
.peek(tuple -> {
// reject all but 4,5,6
int curCnt = cnt.incrementAndGet();
// NOTE(review): neither branch has a body here
if (curCnt > 6)
else if (curCnt > 3)
// expect exactly the three tuples 4,5,6
Condition<Long> count = top.getTester().tupleCount(filtered, 3);
Condition<List<Integer>> contents = top.getTester().streamContents(filtered, 4,5,6 );
complete(top, count);
assertTrue(contents.getResult().toString(), contents.valid());
private Function<Integer,JsonObject> fakeAnalytic(int channel, long period, TimeUnit unit) {
return value -> {
try {
JsonObject jo = new JsonObject();
jo.addProperty("channel", channel);
jo.addProperty("result", value);
return jo;
} catch (InterruptedException e) {
throw new RuntimeException("channel="+channel+" interrupted", e);
private Function<TStream<Integer>,TStream<JsonObject>> fakePipeline(int channel, long period, TimeUnit unit) {
return stream ->, period, unit)).filter(t->true).tag("pipeline-ch"+channel);
/**
 * Runs PlumbingStreams.concurrentMap() with eight fake analytics (six taking
 * 100ms, two much faster) over the tuples 1,2,3 and checks both the combined
 * results and, coarsely, that the mappers ran concurrently.
 * <p>
 * The combiner asserts that results arrive in channel order and sums their
 * "result" values.
 * <p>
 * NOTE(review): the resultTuples initializer, the closing of the combiner
 * loop/lambda and the branch structure around the performance check are
 * missing from the reviewed text. Restore them from the upstream file.
 */
public void testConcurrentMap() throws Exception {
Topology top = newTopology("testConcurrentMap");
// ch is post-incremented so every mapper gets the next channel number
int ch = 0;
List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
// a couple much faster just in case something's amiss with queues
mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
Function<List<JsonObject>,Integer> combiner = list -> {
int sum = 0;
int cnt = 0;
System.out.println("combiner: "+list);
for(JsonObject jo : list) {
// the i-th list entry must come from channel i
assertEquals(cnt++, jo.get("channel").getAsInt());
sum += jo.get("result").getAsInt();
return sum;
TStream<Integer> values = top.of(1, 2, 3);
// NOTE(review): the array contents are missing; given the combiner above each expected
// value is presumably the input multiplied by mappers.size()
Integer[] resultTuples = new Integer[]{
TStream<Integer> result = PlumbingStreams.concurrentMap(values, mappers, combiner);
Condition<Long> count = top.getTester().tupleCount(result, 3);
Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
long begin = System.currentTimeMillis();
complete(top, count);
long end = System.currentTimeMillis();
assertTrue(contents.getResult().toString(), contents.valid());
long actDuration = end - begin;
// duration if every mapper ran one after another for every tuple
long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
// duration if all mappers of a tuple ran fully in parallel
long expMinDuration = resultTuples.length * 100;
System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
// a gross level performance check w/concurrent channels
// NOTE(review): the property name is empty, so getBoolean() returns false and the skip
// never triggers; the real name and an "else" before the assert appear to be missing
if (Boolean.getBoolean(""))
System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
actDuration < 0.5 * expMinSerialDuration);
/**
 * Runs PlumbingStreams.concurrent() with five fake 100ms pipelines over the
 * tuples 1,2,3 and checks both the combined results and, coarsely, that the
 * pipelines ran concurrently.
 * <p>
 * The combiner asserts that results arrive in channel order and sums their
 * "result" values.
 * <p>
 * NOTE(review): the resultTuples initializer, the closing of the combiner
 * loop/lambda and the branch structure around the performance check are
 * missing from the reviewed text. Restore them from the upstream file.
 */
public void testConcurrent() throws Exception {
Topology top = newTopology("testConcurrent");
// ch is post-incremented so every pipeline gets the next channel number
int ch = 0;
List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
Function<List<JsonObject>,Integer> combiner = list -> {
int sum = 0;
int cnt = 0;
System.out.println("combiner: "+list);
for(JsonObject jo : list) {
// the i-th list entry must come from channel i
assertEquals(cnt++, jo.get("channel").getAsInt());
sum += jo.get("result").getAsInt();
return sum;
TStream<Integer> values = top.of(1, 2, 3);
// NOTE(review): the array contents are missing; given the combiner above each expected
// value is presumably the input multiplied by pipelines.size()
Integer[] resultTuples = new Integer[]{
TStream<Integer> result = PlumbingStreams.concurrent(values, pipelines, combiner).tag("result");
Condition<Long> count = top.getTester().tupleCount(result, 3);
Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
long begin = System.currentTimeMillis();
complete(top, count);
long end = System.currentTimeMillis();
assertTrue(contents.getResult().toString(), contents.valid());
long actDuration = end - begin;
// duration if every pipeline ran one after another for every tuple
long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
// duration if all pipelines of a tuple ran fully in parallel
long expMinDuration = resultTuples.length * 100;
System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
// a gross level performance check w/concurrent channels
// NOTE(review): the property name is empty, so getBoolean() returns false and the skip
// never triggers; the real name and an "else" before the assert appear to be missing
if (Boolean.getBoolean(""))
System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
actDuration < 0.5 * expMinSerialDuration);
private BiFunction<Integer,Integer,JsonObject> fakeParallelAnalytic(long period, TimeUnit unit) {
return (value,channel) -> {
try {
Thread.sleep(unit.toMillis(period)); // simulate work for this period
JsonObject jo = new JsonObject();
jo.addProperty("channel", channel);
jo.addProperty("result", value);
return jo;
} catch (InterruptedException e) {
throw new RuntimeException("channel="+channel+" interrupted", e);
/**
 * Builds a stand-in pipeline for parallel tests: given a stream and its
 * channel number, maps every value through fakeParallelAnalytic(period, unit).
 * <p>
 * NOTE(review): the chain is not terminated in the reviewed text; further
 * stages (fakePipeline adds a pass-through filter and a "pipeline-ch" tag)
 * may be missing - confirm against the upstream file.
 */
private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
return (stream,channel) -> stream
// a fresh analytic is created for every tuple and applied with the pipeline's channel
.map(value -> fakeParallelAnalytic(period, unit).apply(value,channel))
private Function<JsonObject,JsonObject> fakeJsonAnalytic(int channel, long period, TimeUnit unit) {
return jo -> {
try {
Thread.sleep(unit.toMillis(period)); // simulate work for this period
return jo;
} catch (InterruptedException e) {
throw new RuntimeException("channel="+channel+" interrupted", e);
/**
 * Builds a stand-in pipeline that brackets fakeJsonAnalytic with timestamps:
 * "startPipelineMsec" is added before the simulated work and
 * "endPipelineMsec" after it.
 * <p>
 * Within the visible text this helper is referenced only by the
 * commented-out testParallelTiming.
 * <p>
 * NOTE(review): the chain is not terminated in the reviewed text; trailing
 * stages may be missing - confirm against the upstream file.
 */
private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
return (stream,channel) -> stream
// record when the tuple enters the pipeline
.map(jo -> { jo.addProperty("startPipelineMsec", System.currentTimeMillis());
return jo; })
.map(fakeJsonAnalytic(channel, period, unit))
// record when the simulated work has finished
.map(jo -> { jo.addProperty("endPipelineMsec", System.currentTimeMillis());
return jo; })
/**
 * Runs PlumbingStreams.parallelMap() with width 5 over the tuples 1..15,
 * splitting by {@code tuple % width}, and checks that every tuple comes back
 * (in any order), that each was handled by the channel the splitter chose,
 * and, coarsely, that the channels ran in parallel.
 * <p>
 * NOTE(review): the source expression of result2, the closing of its lambda
 * and the branch structure around the performance check are missing from the
 * reviewed text. Restore them from the upstream file.
 */
public void testParallelMap() throws Exception {
Topology top = newTopology("testParallelMap");
BiFunction<Integer,Integer,JsonObject> mapper =
fakeParallelAnalytic(100, TimeUnit.MILLISECONDS);
int width = 5;
ToIntFunction<Integer> splitter = tuple -> tuple % width;
// the inputs double as the expected outputs
Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
TStream<Integer> values = top.of(resultTuples);
TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
// NOTE(review): the stream being mapped is missing; presumably result
TStream<Integer> result2 = -> {
int r = jo.get("result").getAsInt();
// the recorded channel must match what the splitter yields for the value
assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
return r;
Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
long begin = System.currentTimeMillis();
complete(top, count);
long end = System.currentTimeMillis();
assertTrue(contents.getResult().toString(), contents.valid());
long actDuration = end - begin;
// duration if all tuples were processed one after another
long expMinSerialDuration = resultTuples.length * 100;
// duration if the tuples were spread evenly over all channels
long expMinDuration = (resultTuples.length / width) * 100;
System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
// a gross level performance check w/parallel channels
// NOTE(review): the property name is empty, so getBoolean() returns false and the skip
// never triggers; the real name and an "else" before the assert appear to be missing
if (Boolean.getBoolean(""))
System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
actDuration < 0.5 * expMinSerialDuration);
/**
 * Runs PlumbingStreams.parallel() with width 5 over the tuples 1..15,
 * splitting by {@code tuple % width}, and checks that every tuple comes back
 * (in any order), that each was handled by the channel the splitter chose,
 * and, coarsely, that the channels ran in parallel.
 * <p>
 * NOTE(review): the source expression of result2, the closing of its lambda
 * and the branch structure around the performance check are missing from the
 * reviewed text. Restore them from the upstream file.
 */
public void testParallel() throws Exception {
Topology top = newTopology("testParallel");
BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
fakeParallelPipeline(100, TimeUnit.MILLISECONDS);
int width = 5;
ToIntFunction<Integer> splitter = tuple -> tuple % width;
// the inputs double as the expected outputs
Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
TStream<Integer> values = top.of(resultTuples);
TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
// NOTE(review): the stream being mapped is missing; presumably result
TStream<Integer> result2 = -> {
int r = jo.get("result").getAsInt();
// the recorded channel must match what the splitter yields for the value
assertEquals(splitter.applyAsInt(r), jo.get("channel").getAsInt());
return r;
Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
long begin = System.currentTimeMillis();
complete(top, count);
long end = System.currentTimeMillis();
assertTrue(contents.getResult().toString(), contents.valid());
long actDuration = end - begin;
// duration if all tuples were processed one after another
long expMinSerialDuration = resultTuples.length * 100;
// duration if the tuples were spread evenly over all channels
long expMinDuration = (resultTuples.length / width) * 100;
System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
// a gross level performance check w/parallel channels
// NOTE(review): the property name is empty, so getBoolean() returns false and the skip
// never triggers; the real name and an "else" before the assert appear to be missing
if (Boolean.getBoolean(""))
System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
actDuration < 0.5 * expMinSerialDuration);
/**
 * Runs PlumbingStreams.parallelBalanced() with width 4 over the tuples 0..59,
 * where even channels take 10ms per tuple and odd channels 20ms, and checks
 * that every tuple comes back, that the run was faster than serial
 * processing, and that even channels handled about two thirds of the tuples
 * (within 15%).
 * <p>
 * NOTE(review): the pipeline's return expression, the source expression of
 * result2, the per-channel count update and the head of the final assertion
 * are missing from the reviewed text. Restore them from the upstream file.
 */
public void testParallelBalanced() throws Exception {
Topology top = newTopology("testParallelBalanced");
// arrange for even channels to process ~2x as many as odd channels.
BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
(stream,ch) -> {
long delay = (ch % 2 == 0) ? 10 : 20;
// NOTE(review): the returned expression is garbled; presumably the stream mapped
// through a fake analytic using this channel and delay
return, delay, TimeUnit.MILLISECONDS));
int width = 4;
int tupCnt = 60;
int expEvenChCnt = 2 * (tupCnt / 3); // even:2/3rds, odd:1/3rd
Integer[] resultTuples = new Integer[tupCnt];
for (int i = 0; i < tupCnt; i++)
resultTuples[i] = i;
// one counter per channel
AtomicInteger[] chCounts = new AtomicInteger[width];
for (int ch = 0; ch < width; ch++)
chCounts[ch] = new AtomicInteger();
TStream<Integer> values = top.of(resultTuples);
TStream<JsonObject> result = PlumbingStreams.parallelBalanced(values, width, pipeline).tag("result");
// NOTE(review): the stream being mapped is missing; presumably result
TStream<Integer> result2 = -> {
int r = jo.get("result").getAsInt();
int ch = jo.get("channel").getAsInt();
// NOTE(review): ch is read but not used here; the chCounts[ch] increment appears to be missing
return r;
Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
long begin = System.currentTimeMillis();
complete(top, count);
long end = System.currentTimeMillis();
assertTrue(contents.getResult().toString(), contents.valid());
long actDuration = end - begin;
// serial bound computed with the slower (20ms) per-tuple delay
long expMinSerialDuration = resultTuples.length * 20;
long expMinDuration = (resultTuples.length / width) * 20;
System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
System.out.println(top.getName()+" chCounts="+Arrays.asList(chCounts));
// a gross level performance check w/parallel channels
// NOTE(review): the property name is empty, so getBoolean() returns false and the skip
// never triggers; the real name and an "else" before the assert appear to be missing
if (Boolean.getBoolean(""))
System.err.println(top.getName()+" WARNING skipped performance check on 'ci' system use");
assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
actDuration < 0.5 * expMinSerialDuration);
// Verify the balancing seemed to work.
// On loaded systems we've seen eCnt:37 oCnt:23. Settle for expEvenCnt +- 15%
int thresholdCnt = (int) (expEvenChCnt * 0.15);
int evenChCnt = 0;
for (int ch = 0; ch < width; ch++) {
// every channel must have processed at least one tuple
assertTrue("ch:"+ch, chCounts[ch].get() != 0);
if (ch % 2 == 0)
evenChCnt += chCounts[ch].get();
// NOTE(review): the loop's closing brace and the opening "assertTrue(" of the check below are missing
String.format("evenChCnt:%d expEvenChCnt:%d +-:%d", evenChCnt, expEvenChCnt, thresholdCnt),
evenChCnt >= (expEvenChCnt - thresholdCnt)
&& evenChCnt <= (expEvenChCnt + thresholdCnt));
// @Test
// public void testParallelTiming() throws Exception {
// Topology top = newTopology("testParallelTiming");
// BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> pipeline =
// fakeParallelPipelineTiming(100, TimeUnit.MILLISECONDS);
// int width = 5;
// // ToIntFunction<Integer> splitter = tuple -> tuple % width;
// ToIntFunction<JsonObject> splitter = jo -> jo.get("result").getAsInt() % width;
// Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
// TStream<Integer> values = top.of(resultTuples);
// TStream<JsonObject> inStream = -> {
// JsonObject jo = new JsonObject();
// jo.addProperty("result", value);
// jo.addProperty("channel", splitter.applyAsInt(jo));
// jo.addProperty("enterParallelMsec", System.currentTimeMillis());
// return jo;
// });
// TStream<JsonObject> result = PlumbingStreams.parallel(inStream, width, splitter, pipeline).tag("result");
// TStream<Integer> result2 = -> {
// jo.addProperty("exitParallelMsec", System.currentTimeMillis());
// System.out.println("ch="+jo.get("channel").getAsInt()
// +" endPipeline-startPipeline="
// +(jo.get("endPipelineMsec").getAsLong()
// - jo.get("startPipelineMsec").getAsLong())
// +" exitParallel-startPipeine="
// +(jo.get("exitParallelMsec").getAsLong()
// - jo.get("startPipelineMsec").getAsLong()));
// int r = jo.get("result").getAsInt();
// assertEquals(splitter.applyAsInt(jo), jo.get("channel").getAsInt());
// return r;
// });
// Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
// Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
// long begin = System.currentTimeMillis();
// complete(top, count);
// long end = System.currentTimeMillis();
// assertTrue(contents.getResult().toString(), contents.valid());
// long actDuration = end - begin;
// long expMinSerialDuration = resultTuples.length * 100;
// long expMinDuration = (resultTuples.length / width) * 100;
// System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
// // a gross level performance check w/parallel channels
// assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
// actDuration < 0.5 * expMinSerialDuration);
// }
/**
 * Passes five strings through PlumbingStreams.gate() guarded by a
 * single-permit Semaphore and expects all five to come out in order, with the
 * recorded permit counts matching an expected list.
 * <p>
 * NOTE(review): the body of the for loop (presumably filling arrayResult) and
 * the body of the sink lambda (presumably recording available permits and
 * releasing the semaphore) are missing from the reviewed text. Restore them
 * from the upstream file.
 */
public void testGate() throws Exception {
Topology topology = newTopology("testGate");
TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
// one permit: the gate can let a single tuple through until a permit is released
Semaphore semaphore = new Semaphore(1);
raw = PlumbingStreams.gate(raw, semaphore);
// permit counts observed while running vs. the counts the test expects
ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
ArrayList<Integer> arrayResult = new ArrayList<>();
// NOTE(review): loop body missing
for (int i = 0; i < 5; i++) {
// NOTE(review): sink body missing; only its two comments remain
raw.sink(t -> {
//Add 0 to list because semaphore.acquire() in sync has occurred
//Add 1 to list because semaphore.release() has executed
Condition<List<String>> contents = topology.getTester()
.streamContents(raw, "a", "b", "c", "d", "e");
complete(topology, contents);
assertTrue("valid:" + contents.getResult(), contents.valid());
assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
/**
 * Passes five strings through PlumbingStreams.gate() guarded by a
 * three-permit Semaphore and expects only the first three ("a", "b", "c") to
 * come out within the one-second completion window.
 * <p>
 * NOTE(review): the body of the sink lambda (presumably recording available
 * permits without releasing any) and whatever fills arrayResult are missing
 * from the reviewed text. Restore them from the upstream file.
 */
public void testGateWithLocking() throws Exception {
Topology topology = newTopology("testGateWithLocking");
TStream<String> raw = topology.strings("a", "b", "c", "d", "e");
// three permits for five tuples
Semaphore semaphore = new Semaphore(3);
raw = PlumbingStreams.gate(raw, semaphore);
// permit counts observed while running vs. the counts the test expects
ArrayList<Integer> resultAvailablePermits = new ArrayList<>();
ArrayList<Integer> arrayResult = new ArrayList<>();
// NOTE(review): sink body missing; only its comment remains
raw.sink(t -> {
//Add number of availablePermits
Condition<List<String>> contents = topology.getTester().streamContents(raw, "a", "b", "c");
// bounded wait: the topology is given 1000ms to satisfy the condition
complete(topology, contents, 1000, TimeUnit.MILLISECONDS);
assertTrue("valid:" + contents.getResult(), contents.valid());
assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));