/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.topology.plumbing;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.ToIntFunction;
import org.apache.edgent.oplet.plumbing.Barrier;
import org.apache.edgent.oplet.plumbing.Isolate;
import org.apache.edgent.oplet.plumbing.PressureReliever;
import org.apache.edgent.oplet.plumbing.UnorderedIsolate;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.TopologyProvider;
/**
* Plumbing utilities for {@link TStream}.
* Methods that manipulate the flow of tuples in a streaming topology,
* but are not part of the logic of the application.
*/
public class PlumbingStreams {
/**
* Insert a blocking delay between tuples.
* Returned stream is the input stream delayed by {@code delay}.
* <p>
* Delays less than 1msec are translated to a 0 delay.
* <p>
* This function always adds the {@code delay} amount after receiving
* a tuple before forwarding it.
* <p>
* Downstream tuple processing delays will affect
* the overall delay of a subsequent tuple.
* <p>
* e.g., the input stream contains two tuples t1 and t2 and
* the delay is 100ms. The forwarding of t1 is delayed by 100ms.
* Then if a downstream processing delay of 80ms occurs, this function
* receives t2 80ms after it forwarded t1 and it will delay another
* 100ms before forwarding t2. Hence the overall delay between forwarding
* t1 and t2 is 180ms.
* See {@link #blockingThrottle(TStream, long, TimeUnit) blockingThrottle}.
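* <P>
* Sample use (a sketch; assumes an existing {@code Topology topology}):
* </P>
* <pre>{@code
* TStream<String> stream = topology.strings("a", "b", "c");
* // Forward each tuple only after a 100ms blocking delay.
* TStream<String> delayed = PlumbingStreams.blockingDelay(stream, 100, TimeUnit.MILLISECONDS);
* }</pre>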
*
* @param <T> Tuple type
* @param stream Stream to be delayed.
* @param delay Amount of time to delay a tuple.
* @param unit Time unit for {@code delay}.
*
* @return Stream that will be delayed.
*/
public static <T> TStream<T> blockingDelay(TStream<T> stream, long delay, TimeUnit unit) {
return stream.map(t -> {
    try {
        Thread.sleep(unit.toMillis(delay));
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    return t;
});
}
/**
* Maintain a constant blocking delay between tuples.
* The returned stream is the input stream throttled by {@code delay}.
* <p>
* Delays less than 1msec are translated to a 0 delay.
* <p>
* Sample use:
* <pre>{@code
* TStream<String> stream = topology.strings("a", "b", "c");
* // Create a stream with tuples throttled to 1 second intervals.
* TStream<String> throttledStream = blockingThrottle(stream, 1, TimeUnit.SECONDS);
* // print out the throttled tuples as they arrive
* throttledStream.peek(t -> System.out.println(new Date() + " - " + t));
* }</pre>
* <p>
* The function adjusts for downstream processing delays.
* The first tuple is not delayed. If {@code delay} has already
* elapsed since the prior tuple was forwarded, the tuple
* is forwarded immediately.
* Otherwise, forwarding the tuple is delayed to achieve
* a {@code delay} amount since forwarding the prior tuple.
* <p>
* e.g., the input stream contains two tuples t1 and t2 and
* the delay is 100ms. The forwarding of t1 is delayed by 100ms.
* Then if a downstream processing delay of 80ms occurs, this function
* receives t2 80ms after it forwarded t1 and it will only delay another
* 20ms (100ms - 80ms) before forwarding t2.
* Hence the overall delay between forwarding t1 and t2 remains 100ms.
*
* @param <T> tuple type
* @param stream the stream to throttle
* @param delay Amount of time to delay a tuple.
* @param unit Time unit for {@code delay}.
* @return the throttled stream
*/
public static <T> TStream<T> blockingThrottle(TStream<T> stream, long delay, TimeUnit unit) {
return stream.map( blockingThrottle(delay, unit) );
}
private static <T> Function<T,T> blockingThrottle(long delay, TimeUnit unit) {
long[] nextTupleTime = { 0 };
return t -> {
long now = System.currentTimeMillis();
if (nextTupleTime[0] != 0) {
if (now < nextTupleTime[0]) {
try {
Thread.sleep(nextTupleTime[0] - now);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
now = System.currentTimeMillis();
}
}
nextTupleTime[0] = now + unit.toMillis(delay);
return t;
};
}
/**
* Insert a blocking delay before forwarding the first tuple and
* no delay for subsequent tuples.
* <p>
* Delays less than 1msec are translated to a 0 delay.
* <p>
* Sample use:
* <pre>{@code
* TStream<String> stream = topology.strings("a", "b", "c");
* // create a stream where the first tuple is delayed by 5 seconds.
* TStream<String> oneShotDelayedStream =
* stream.map( blockingOneShotDelay(5, TimeUnit.SECONDS) );
* }</pre>
*
* @param <T> tuple type
* @param stream input stream
* @param delay Amount of time to delay a tuple.
* @param unit Time unit for {@code delay}.
* @return the delayed stream
*/
public static <T> TStream<T> blockingOneShotDelay(TStream<T> stream, long delay, TimeUnit unit) {
return stream.map( blockingOneShotDelay(delay, unit) );
}
private static <T> Function<T,T> blockingOneShotDelay(long delay, TimeUnit unit) {
long[] initialDelay = { unit.toMillis(delay) };
return t -> {
if (initialDelay[0] != -1) {
try {
Thread.sleep(initialDelay[0]);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
initialDelay[0] = -1;
}
return t;
};
}
/**
* Relieve pressure on upstream processing by discarding tuples.
* This method ensures that upstream processing is not
* constrained by any delay in downstream processing,
* for example by a connector not being able to connect
* to its external system.
* <P>
* Any downstream processing of the returned stream is isolated
* from {@code stream} so that any slow down does not affect {@code stream}.
* When the downstream processing cannot keep up with the rate of
* {@code stream}, tuples will be dropped from the returned stream.
* <BR>
* Up to {@code count} of the most recent tuples per key from {@code stream}
* are maintained when downstream processing is slow; any older tuples
* that have not been submitted to the returned stream are discarded.
* <BR>
* Tuple order is maintained within a partition but is not guaranteed to
* be maintained across partitions.
* </P>
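* <P>
* Sample use (a sketch; {@code Reading} and {@code getSensorId()} are
* hypothetical application types):
* </P>
* <pre>{@code
* TStream<Reading> readings = ...;
* // Keep only the single most recent reading per sensor when the
* // downstream connector can't keep up; older readings are dropped.
* TStream<Reading> relieved =
*     PlumbingStreams.pressureReliever(readings, reading -> reading.getSensorId(), 1);
* }</pre>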
*
* @param stream Stream to be isolated from downstream processing.
* @param keyFunction Function defining the key of each tuple.
* @param count Maximum number of tuples to maintain when downstream processing is backing up.
* @return Stream that is isolated from and thus relieves pressure on {@code stream}.
*
* @param <T> Tuple type.
* @param <K> Key type.
* @see #isolate(TStream, int) isolate
*/
public static <T,K> TStream<T> pressureReliever(TStream<T> stream, Function<T,K> keyFunction, int count) {
return stream.pipe(new PressureReliever<>(count, keyFunction));
}
/**
* Isolate upstream processing from downstream processing.
* <BR>
* Implementations may throw an {@code OutOfMemoryError}
* if the processing against the returned stream cannot keep up
* with the arrival rate of tuples on {@code stream}.
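* <P>
* Sample use (a sketch; assumes an existing {@code TStream<String> events}):
* </P>
* <pre>{@code
* // Run downstream analytics on their own thread, preserving arrival order.
* TStream<String> isolated = PlumbingStreams.isolate(events, true);
* }</pre>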
*
* @param <T> Tuple type
* @param stream Stream to be isolated from downstream processing.
* @param ordered {@code true} to maintain arrival order on the returned stream,
* {@code false} if arrival order need not be maintained.
* @return Stream that is isolated from {@code stream}.
*/
public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
return stream.pipe(
ordered ? new Isolate<T>() : new UnorderedIsolate<T>());
}
/**
* Isolate upstream processing from downstream processing.
* <P>
* If the processing against the returned stream cannot keep up
* with the arrival rate of tuples on {@code stream}, upstream
* processing will block until there is space in the queue between
* the streams.
* </P><P>
* Processing of tuples occurs in the order they were received.
* </P>
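* <P>
* Sample use (a sketch; assumes an existing {@code TStream<String> events}):
* </P>
* <pre>{@code
* // Buffer up to 1000 tuples between the streams; upstream processing
* // blocks only when the buffer is full.
* TStream<String> isolated = PlumbingStreams.isolate(events, 1000);
* }</pre>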
*
* @param <T> Tuple type
* @param stream Stream to be isolated from downstream processing.
* @param queueCapacity size of the queue between {@code stream} and
* the returned stream.
* @return Stream that is isolated from {@code stream}.
* @see #pressureReliever(TStream, Function, int) pressureReliever
*/
public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
return stream.pipe(new Isolate<T>(queueCapacity));
}
/**
* Perform analytics concurrently.
* <P>
* This is a convenience function that calls
* {@link #concurrent(TStream, List, Function)} after
* creating {@code pipeline} and {@code combiner} functions
* from the supplied {@code mappers} and {@code combiner} arguments.
* </P><P>
* That is, it is logically, if not exactly, the same as:
* </P>
* <pre>{@code
* List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
* for (Function<T,U> mapper : mappers)
* pipelines.add(s -> s.map(mapper));
* concurrent(stream, pipelines, combiner);
* }</pre>
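* <P>
* Sample use (a sketch; {@code analyticA} and {@code analyticB} are
* hypothetical application functions and {@code readings} is an
* existing {@code TStream<Double>}):
* </P>
* <pre>{@code
* List<Function<Double,Double>> mappers = new ArrayList<>();
* mappers.add(t -> analyticA(t));  // hypothetical analytic
* mappers.add(t -> analyticB(t));  // hypothetical analytic
* // Average the two independently computed results.
* TStream<Double> results = PlumbingStreams.concurrentMap(readings, mappers,
*     list -> (list.get(0) + list.get(1)) / 2);
* }</pre>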
*
* @param <T> Tuple type on input stream.
* @param <U> Tuple type generated by mappers.
* @param <R> Tuple type of the result.
*
* @param stream input stream
* @param mappers functions to be run concurrently. Each mapper MUST
* return a non-null result.
* A runtime error will be generated if a null result
* is returned.
* @param combiner function to create a result tuple from the list of
* results from {@code mappers}.
* The input list order is 1:1 with the {@code mappers} list.
* I.e., list entry [0] is the result from mappers[0],
* list entry [1] is the result from mappers[1], etc.
* @return result stream
*/
public static <T,U,R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T,U>> mappers, Function<List<U>,R> combiner) {
Objects.requireNonNull(stream, "stream");
Objects.requireNonNull(mappers, "mappers");
Objects.requireNonNull(combiner, "combiner");
List<Function<TStream<T>,TStream<U>>> pipelines = new ArrayList<>();
for (Function<T,U> mapper : mappers) {
pipelines.add(s -> s.map(mapper));
}
return concurrent(stream, pipelines, combiner);
}
/**
* Perform analytics concurrently.
* <P>
* Process input tuples one at a time, invoking the specified
* analytics ({@code pipelines}) concurrently, combine the results,
* and then process the next input tuple in the same manner.
* </P><P>
* Logically, instead of doing this:
* </P>
* <pre>{@code
* sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
* }</pre>
* create a graph that's logically like this:
* <pre>{@code
*                      |-> A1 ->|
* sensorReadings<T> -> |-> A2 ->| -> results<R>
*                      |-> A3 ->|
* }</pre>
* more specifically a graph like this:
* <pre>{@code
*           |-> isolate(1) -> pipeline1 -> |
* stream -> |-> isolate(1) -> pipeline2 -> |-> barrier(10) -> combiner
*           |-> isolate(1) -> pipeline3 -> |
*               . . .
* }</pre>
* <P>
* The typical use case for this is when an application has a collection
* of independent analytics to perform on each tuple and the analytics
* are sufficiently long running such that performing them concurrently
* is desired.
* </P><P>
* Note, this is in contrast to "parallel" stream processing,
* which in Java8 Streams and other contexts means processing multiple
* tuples in parallel, each on a replicated processing pipeline.
* </P><P>
* Threadsafety - one of the following must be true:
* </P>
* <ul>
* <li>the tuples from {@code stream} are threadsafe</li>
* <li>the {@code pipelines} do not modify the input tuples</li>
* <li>the {@code pipelines} provide their own synchronization controls
* to protect concurrent modifications of the input tuples</li>
* </ul>
* <P>
* Logically, a thread is allocated for each of the {@code pipelines}.
* The actual degree of concurrency may be {@link TopologyProvider} dependent.
* </P>
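* <P>
* Sample use (a sketch; {@code analyticA} and {@code analyticB} are
* hypothetical application functions and {@code readings} is an
* existing {@code TStream<Double>}):
* </P>
* <pre>{@code
* List<Function<TStream<Double>,TStream<Double>>> pipelines = new ArrayList<>();
* pipelines.add(s -> s.map(t -> analyticA(t)).tag("pipelineA"));
* pipelines.add(s -> s.map(t -> analyticB(t)).tag("pipelineB"));
* // Emit the list of each pipeline's result for every input tuple.
* TStream<List<Double>> results =
*     PlumbingStreams.concurrent(readings, pipelines, list -> list);
* }</pre>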
*
* @param <T> Tuple type on input stream.
* @param <U> Tuple type generated by pipelines.
* @param <R> Tuple type of the result.
*
* @param stream input stream
* @param pipelines a list of functions to add a pipeline to the topology.
* Each {@code pipeline.apply()} is called with {@code stream}
* as the input, yielding the pipeline's result stream.
* For each input tuple, a pipeline MUST create exactly one output tuple.
* Tuple flow into the pipelines will cease if that requirement
* is not met.
* @param combiner function to create a result tuple from the list of
* results from {@code pipelines}.
* The input tuple list's order is 1:1 with the {@code pipelines} list.
* I.e., list entry [0] is the result from pipelines[0],
* list entry [1] is the result from pipelines[1], etc.
* @return result stream
* @see #barrier(List, int) barrier
*/
public static <T,U,R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>,TStream<U>>> pipelines, Function<List<U>,R> combiner) {
Objects.requireNonNull(stream, "stream");
Objects.requireNonNull(pipelines, "pipelines");
Objects.requireNonNull(combiner, "combiner");
int barrierQueueCapacity = 10; // don't preclude pipelines from getting ahead some.
// Add concurrent (isolated) fanouts
List<TStream<T>> fanouts = new ArrayList<>(pipelines.size());
for (int i = 0; i < pipelines.size(); i++)
fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
// Add pipelines
List<TStream<U>> results = new ArrayList<>(pipelines.size());
int ch = 0;
for (Function<TStream<T>,TStream<U>> pipeline : pipelines) {
results.add(pipeline.apply(fanouts.get(ch)).tag("concurrent-ch"+ch));
ch++;
}
// Add the barrier
TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
// Add the combiner
return barrier.map(combiner);
}
/**
* A tuple synchronization barrier.
* <P>
* Same as {@code barrier(streams, 1)}
* </P>
* @param <T> Tuple type
* @param streams input streams
* @return the output stream
* @see #barrier(List, int)
*/
public static <T> TStream<List<T>> barrier(List<TStream<T>> streams) {
return barrier(streams, 1);
}
/**
* A tuple synchronization barrier.
* <P>
* A barrier has n input streams with tuple type {@code T}
* and one output stream with tuple type {@code List<T>}.
* Once the barrier receives one tuple on each of its input streams,
* it generates an output tuple containing one tuple from each input stream.
* It then waits until it has received another tuple from each input stream.
* </P><P>
* Input stream 0's tuple is in the output tuple's list[0],
* stream 1's tuple in list[1], and so on.
* </P><P>
* The barrier's output stream is isolated from the input streams.
* </P><P>
* The barrier has a queue of size {@code queueCapacity} for each
* input stream. When a tuple for an input stream is received it is
* added to its queue. The stream will block if the queue is full.
* </P>
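* <P>
* Sample use (a sketch; assumes two existing {@code TStream<Integer>}
* streams {@code s1} and {@code s2} that emit tuples at similar rates):
* </P>
* <pre>{@code
* TStream<List<Integer>> pairs = PlumbingStreams.barrier(Arrays.asList(s1, s2), 10);
* // pair.get(0) is s1's tuple, pair.get(1) is s2's tuple
* pairs.sink(pair -> System.out.println("s1=" + pair.get(0) + " s2=" + pair.get(1)));
* }</pre>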
*
* @param <T> Type of the tuple.
*
* @param streams the list of input streams
* @param queueCapacity the size of each input stream's queue
* @return the output stream
* @see Barrier
*/
public static <T> TStream<List<T>> barrier(List<TStream<T>> streams, int queueCapacity) {
List<TStream<T>> others = new ArrayList<>(streams);
TStream<T> s1 = others.remove(0);
return s1.fanin(new Barrier<T>(queueCapacity), others);
}
/**
* Perform an analytic function on tuples in parallel.
* <P>
* Same as {@code parallel(stream, width, splitter, (s,ch) -> s.map(t -> mapper.apply(t, ch)))}
* </P>
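* <P>
* Sample use (a sketch; {@code expensiveAnalytic} is a hypothetical
* application function and {@code readings} an existing {@code TStream<Double>}):
* </P>
* <pre>{@code
* // Process tuples on 4 channels, assigning tuples round-robin.
* TStream<Double> results = PlumbingStreams.parallelMap(readings, 4,
*     PlumbingStreams.roundRobinSplitter(4),
*     (reading, channel) -> expensiveAnalytic(reading));
* }</pre>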
* @param <T> Input stream tuple type
* @param <U> Result stream tuple type
* @param stream input stream
* @param splitter the tuple channel allocation function
* @param mapper analytic function
* @param width number of channels
* @return the unordered result stream
* @see #roundRobinSplitter(int) roundRobinSplitter
* @see #concurrentMap(TStream, List, Function) concurrentMap
*/
public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
return parallel(stream, width, splitter, pipeline);
}
/**
* Perform an analytic pipeline on tuples in parallel.
* <P>
* Splits {@code stream} into {@code width} parallel processing channels,
* partitioning tuples among the channels using {@code splitter}.
* Each channel runs a copy of {@code pipeline}.
* The resulting stream is isolated from the upstream parallel channels.
* </P><P>
* The ordering of tuples in {@code stream} is not maintained in the
* results from {@code parallel}.
* </P><P>
* {@code pipeline} is not required to yield a result for each input
* tuple.
* </P><P>
* A common splitter function is a {@link #roundRobinSplitter(int) roundRobinSplitter}.
* </P><P>
* The generated graph looks like this:
* </P>
* <pre>{@code
*                                    |-> isolate(10) -> pipeline-ch1 -> |
* stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch2 -> |-> union -> isolate(width)
*                                    |-> isolate(10) -> pipeline-ch3 -> |
*                                        . . .
* }</pre>
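* <P>
* Sample use (a sketch; {@code expensiveAnalytic} is a hypothetical
* application function and {@code readings} an existing {@code TStream<Double>}):
* </P>
* <pre>{@code
* TStream<Double> results = PlumbingStreams.parallel(readings, 4,
*     PlumbingStreams.roundRobinSplitter(4),
*     (channelStream, channel) -> channelStream.map(t -> expensiveAnalytic(t)));
* }</pre>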
*
* @param <T> Input stream tuple type
* @param <R> Result stream tuple type
*
* @param stream the input stream
* @param width number of parallel processing channels
* @param splitter the tuple channel allocation function
* @param pipeline the pipeline for each channel.
* {@code pipeline.apply(inputStream,channel)}
* is called to generate the pipeline for each channel.
* @return the isolated unordered result from each parallel channel
* @see #roundRobinSplitter(int) roundRobinSplitter
* @see #concurrent(TStream, List, Function) concurrent
*/
public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
Objects.requireNonNull(stream, "stream");
if (width < 1)
throw new IllegalArgumentException("width");
Objects.requireNonNull(splitter, "splitter");
Objects.requireNonNull(pipeline, "pipeline");
// Add the splitter
List<TStream<T>> channels = stream.split(width, splitter);
for (int ch = 0; ch < width; ch++)
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
// Add concurrency (isolation) to the channels
int chBufferSize = 10; // don't immediately block stream if channel is busy
for (int ch = 0; ch < width; ch++)
channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
// Add pipelines
List<TStream<R>> results = new ArrayList<>(width);
for (int ch = 0; ch < width; ch++) {
results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
}
// Add the Union
TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
// Add the isolate - keep channel threads to just their pipeline processing
return isolate(result, width);
}
/**
* Perform an analytic pipeline on tuples in parallel.
* <P>
* Splits {@code stream} into {@code width} parallel processing channels,
* partitioning tuples among the channels in a load balanced fashion.
* Each channel runs a copy of {@code pipeline}.
* The resulting stream is isolated from the upstream parallel channels.
* </P><P>
* The ordering of tuples in {@code stream} is not maintained in the
* results from {@code parallel}.
* </P><P>
* A {@code pipeline} <b>MUST</b> yield a result for each input
* tuple. Failure to do so will result in the channel remaining
* in a busy state and no longer available to process additional tuples.
* </P><P>
* A {@link LoadBalancedSplitter} is used to distribute tuples.
* </P><P>
* The generated graph looks like this:
* </P>
* <pre>{@code
*                                    |-> isolate(1) -> pipeline-ch1 -> peek(splitter.channelDone()) -> |
* stream -> split(width,splitter) -> |-> isolate(1) -> pipeline-ch2 -> peek(splitter.channelDone()) -> |-> union -> isolate(width)
*                                    |-> isolate(1) -> pipeline-ch3 -> peek(splitter.channelDone()) -> |
*                                        . . .
* }</pre>
* <P>
* Note, this implementation requires that the splitter is used from
* only a single JVM. The {@code org.apache.edgent.providers.direct.DirectProvider}
* provider meets this requirement.
* </P>
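* <P>
* Sample use (a sketch; {@code variableLatencyAnalytic} is a hypothetical
* application function and {@code readings} an existing {@code TStream<Double>}):
* </P>
* <pre>{@code
* // Each tuple goes to whichever of the 4 channels is free.
* TStream<Double> results = PlumbingStreams.parallelBalanced(readings, 4,
*     (channelStream, channel) -> channelStream.map(t -> variableLatencyAnalytic(t)));
* }</pre>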
*
* @param <T> Input stream tuple type
* @param <R> Result stream tuple type
*
* @param stream the input stream
* @param width number of parallel processing channels
* @param pipeline the pipeline for each channel.
* {@code pipeline.apply(inputStream,channel)}
* is called to generate the pipeline for each channel.
* @return the isolated unordered result from each parallel channel
* @see #parallel(TStream, int, ToIntFunction, BiFunction)
* @see LoadBalancedSplitter
*/
public static <T,R> TStream<R> parallelBalanced(TStream<T> stream, int width, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
Objects.requireNonNull(stream, "stream");
if (width < 1)
throw new IllegalArgumentException("width");
Objects.requireNonNull(pipeline, "pipeline");
LoadBalancedSplitter<T> splitter = new LoadBalancedSplitter<>(width);
// Add the splitter
List<TStream<T>> channels = stream.split(width, splitter);
for (int ch = 0; ch < width; ch++)
channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
// Add concurrency (isolation) to the channels
int chBufferSize = 1; // 1 is enough with load balanced impl
for (int ch = 0; ch < width; ch++)
channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
// Add pipelines
List<TStream<R>> results = new ArrayList<>(width);
for (int ch = 0; ch < width; ch++) {
final int finalCh = ch;
results.add(pipeline.apply(channels.get(ch), ch)
.tag("parallel-ch"+ch)
.peek(tuple -> splitter.channelDone(finalCh)));
}
// Add the Union
TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
// Add the isolate - keep channel threads to just their pipeline processing
return isolate(result, width);
}
/**
* A round-robin splitter {@code ToIntFunction}.
* <P>
* The splitter function cycles among the {@code width} channels
* on successive calls to {@code roundRobinSplitter.applyAsInt()},
* returning {@code 0, 1, ..., width-1, 0, 1, ..., width-1}.
* </P>
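* <P>
* Sample use (a sketch; assumes an existing {@code TStream<String> stream}):
* </P>
* <pre>{@code
* // Successive tuples are assigned to channels 0, 1, 2, 0, 1, 2, ...
* List<TStream<String>> channels = stream.split(3, PlumbingStreams.roundRobinSplitter(3));
* }</pre>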
* @param <T> Tuple type
* @param width number of splitter channels
* @return the splitter
* @see TStream#split(int, ToIntFunction) TStream.split
* @see PlumbingStreams#parallel(TStream, int, ToIntFunction, BiFunction) parallel
*/
public static <T> ToIntFunction<T> roundRobinSplitter(int width) {
AtomicInteger cnt = new AtomicInteger();
// mask the sign bit so the channel stays non-negative if the counter overflows
return tuple -> (cnt.getAndIncrement() & Integer.MAX_VALUE) % width;
}
/**
* Control the flow of tuples to an output stream.
* <P>
* A {@link Semaphore} is used to control the flow of tuples
* through the {@code gate}.
* The gate acquires a permit from the
* semaphore to pass the tuple through, blocking until a permit is
* acquired (and applying backpressure upstream while blocked).
* Elsewhere, some code calls {@link Semaphore#release(int)}
* to make permits available.
* </P><P>
* If a TopologyProvider is used that can distribute a topology's
* streams to different JVMs, the gate and the code releasing the
* permits must be in the same JVM.
* </P><P>
* Sample use:
* <BR>
* Suppose you wanted to control processing such that concurrent
* pipelines processed each tuple in lock-step.
* I.e., you want all of the pipelines to start processing a tuple
* at the same time and not start a new tuple until the current
* tuple has been fully processed by each of them:
* </P>
* <pre>{@code
* TStream<Integer> readings = ...;
* Semaphore gateControl = new Semaphore(1); // allow the first to pass through
* TStream<Integer> gated = PlumbingStreams.gate(readings, gateControl);
*
* // Create the concurrent pipeline combiner and have it
* // signal that concurrent processing of the tuple has completed.
* // In this sample the combiner just returns the received list of
* // each pipeline result.
*
* Function<TStream<List<Integer>>,TStream<List<Integer>>> combiner =
* stream -> stream.map(list -> {
* gateControl.release();
* return list;
* });
*
* TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
* }</pre>
* @param <T> Tuple type
* @param stream the input stream
* @param semaphore gate control
* @return gated stream
*/
public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
return stream.map(tuple -> {
try {
semaphore.acquire();
return tuple;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("interrupted", e);
}
});
}
}