title: Streaming concepts

An Edgent application is most useful when it is processing data. This page introduces stream processing concepts by visually demonstrating some of the operations that can be invoked on a stream, along with code snippets. The operations covered are:

  • [filter]({{ site.docsurl }}/org/apache/{{ site.data.project.unix_name }}/topology/TStream.html#filter-org.apache.{{ site.data.project.unix_name }}.function.Predicate-)
  • [split]({{ site.docsurl }}/org/apache/{{ site.data.project.unix_name }}/topology/TStream.html#split-int-org.apache.{{ site.data.project.unix_name }}.function.ToIntFunction-)
  • [union]({{ site.docsurl }}/org/apache/{{ site.data.project.unix_name }}/topology/TStream.html#union-org.apache.{{ site.data.project.unix_name }}.topology.TStream-)
  • [partitioned window]({{ site.docsurl }}/org/apache/{{ site.data.project.unix_name }}/topology/TStream.html#last-long-java.util.concurrent.TimeUnit-org.apache.{{ site.data.project.unix_name }}.function.Function-)
  • [continuous aggregation]({{ site.docsurl }}/org/apache/{{ site.data.project.unix_name }}/topology/TWindow.html#aggregate-org.apache.{{ site.data.project.unix_name }}.function.BiFunction-)
  • [batch]({{ site.docsurl }}/org/apache/{{ site.data.project.unix_name }}/topology/TWindow.html#batch-org.apache.{{ site.data.project.unix_name }}.function.BiFunction-)

Filter

Edgent code

TStream<Integer> filtered = stream.filter(t -> t >= 5);

Explanation

Input stream: Tuples, such as sensor readings, are continually produced and represented as a red stream on the left.

Filtered stream: A filter operation of t >= 5 is applied, resulting in a green output stream on the right that contains tuples with values greater than or equal to five.


Split

Edgent code

List<TStream<String>> streams = stream.split(2, tuple -> tuple.getVal());

Explanation

Input stream: Tuples, such as sensor readings, are continually produced and represented as a red stream on the left.

Split streams: A split operation with two output streams is applied, using getVal() as the splitter. Each tuple is routed to the output stream whose index is the splitter result modulo two, resulting in two output streams on the right, one green and one purple. The green stream contains tuples with values that are even integers, while the purple stream contains tuples with values that are odd integers.
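The routing behind a split can be sketched in plain Java, without the Edgent library. This is only an illustration, assuming the rule described in the split Javadoc: the splitter result modulo n selects the output, and a negative result discards the tuple. The class SplitSketch and its split helper are hypothetical names, and lists stand in for streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical stand-in for TStream.split: lists play the role of streams.
class SplitSketch {
    // Routes each tuple to output (r % n), where r is the splitter result.
    // Assumption: a negative r means the tuple is discarded.
    static <T> List<List<T>> split(List<T> input, int n, ToIntFunction<T> splitter) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T tuple : input) {
            int r = splitter.applyAsInt(tuple);
            if (r >= 0) {
                outputs.get(r % n).add(tuple);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<List<Integer>> streams = split(Arrays.asList(4, 7, 10, 3, -1), 2, v -> v);
        System.out.println(streams.get(0)); // prints [4, 10] (even values)
        System.out.println(streams.get(1)); // prints [7, 3] (odd values; -1 is discarded)
    }
}
```

With n set to 2 and the tuple value as the splitter result, even values land on the first output and odd values on the second, matching the green and purple streams described above.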


Union

Edgent code

TStream<String> stream = stream1.union(stream2);

Explanation

Input streams: Tuples, such as sensor readings, are continually produced on two different streams, represented as a red stream and a green stream on the left.

Unioned stream: A union operation is applied, resulting in a single purple output stream on the right. The stream contains tuples from both input streams.


Window (last 5 seconds)

Edgent code

TWindow<Integer> window = stream.last(5, TimeUnit.SECONDS, tuple -> 0);

Explanation

Input stream: Tuples, such as sensor readings, are continually produced and represented as a red stream on the left.

Window: A window declaration of the last five seconds of tuples results in the window outlined in blue containing green tuples. A tuple is inserted into the window when it appears on the stream and is evicted five seconds later, since it will have been in the window for five seconds at that point.
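The insert-then-evict behavior of a time-based window can be sketched in plain Java. This is not how Edgent implements TWindow; it is a minimal model, assuming explicit millisecond timestamps are passed in instead of a real clock, and TimeWindowSketch is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a "last N milliseconds" window over int tuples.
class TimeWindowSketch {
    private static final class Entry {
        final long insertedAtMs;
        final int value;

        Entry(long insertedAtMs, int value) {
            this.insertedAtMs = insertedAtMs;
            this.value = value;
        }
    }

    private final long spanMs;
    private final Deque<Entry> entries = new ArrayDeque<>();

    TimeWindowSketch(long spanMs) {
        this.spanMs = spanMs;
    }

    // Inserts a tuple that appeared on the stream at time nowMs.
    void insert(long nowMs, int value) {
        evictExpired(nowMs);
        entries.addLast(new Entry(nowMs, value));
    }

    // Evicts every tuple that has been in the window for spanMs or longer.
    void evictExpired(long nowMs) {
        while (!entries.isEmpty() && nowMs - entries.peekFirst().insertedAtMs >= spanMs) {
            entries.removeFirst();
        }
    }

    // Returns the window contents as of time nowMs, oldest first.
    List<Integer> contents(long nowMs) {
        evictExpired(nowMs);
        List<Integer> values = new ArrayList<>();
        for (Entry e : entries) {
            values.add(e.value);
        }
        return values;
    }

    public static void main(String[] args) {
        TimeWindowSketch window = new TimeWindowSketch(5000);
        window.insert(0, 3);
        window.insert(2000, 8);
        window.insert(4000, 5);
        System.out.println(window.contents(4999)); // prints [3, 8, 5]
        System.out.println(window.contents(5000)); // prints [8, 5]
        System.out.println(window.contents(9000)); // prints []
    }
}
```

The tuple inserted at time 0 is still present at 4999 ms and gone at 5000 ms, which is the "evicted five seconds later" behavior described above.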


Continuous aggregate (max, last 5 seconds)

Edgent code

TWindow<Integer> window = stream.last(5, TimeUnit.SECONDS, tuple -> 0);
TStream<Integer> max = window.aggregate((tuples, key) -> {
    return Collections.max(tuples);
});

Explanation

Input stream: Tuples, such as sensor readings, are continually produced and represented as a red stream on the left.

Window: A window declaration of the last five seconds of tuples results in the window outlined in blue containing green tuples. A tuple is inserted into the window when it appears on the stream and is evicted five seconds later, since it will have been in the window for five seconds at that point.

Aggregation: The window is continuously aggregated to find the maximum value in the window. Continuously aggregated means that every time the window contents change, the aggregate is calculated and a tuple is produced on the purple output stream. The window changes every time a tuple is inserted or evicted, where time-based window insertions and evictions are independent events.
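To make "insertions and evictions are independent events" concrete, the following plain-Java sketch replays a list of arrivals and recomputes the maximum after each insertion and each eviction. It is an illustration rather than Edgent's implementation. The name ContinuousMaxSketch is hypothetical, timestamps are supplied explicitly, and two choices are assumptions of this sketch: an eviction that coincides with an insertion is processed first, and nothing is emitted when the window becomes empty.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical replay of continuous aggregation over a time-based window.
class ContinuousMaxSketch {
    // timesMs must be sorted ascending; values[i] arrives at timesMs[i]
    // and is evicted at timesMs[i] + spanMs (spanMs must be positive).
    static List<Integer> continuousMax(long[] timesMs, int[] values, long spanMs) {
        List<Integer> window = new ArrayList<>();
        List<Integer> output = new ArrayList<>();
        int nextInsert = 0;
        int nextEvict = 0;
        while (nextEvict < timesMs.length) {
            // Pick whichever event happens first; ties go to the eviction.
            boolean insertFirst = nextInsert < timesMs.length
                    && timesMs[nextInsert] < timesMs[nextEvict] + spanMs;
            if (insertFirst) {
                window.add(values[nextInsert]);
                nextInsert++;
            } else {
                window.remove(0); // the oldest tuple is evicted first
                nextEvict++;
            }
            // Assumption: an empty window produces no output tuple.
            if (!window.isEmpty()) {
                output.add(Collections.max(window));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        long[] timesMs = {0, 2000, 4000};
        int[] values = {3, 8, 5};
        // Three insertions, then three evictions (the last leaves the window empty).
        System.out.println(continuousMax(timesMs, values, 5000)); // prints [3, 8, 8, 8, 5]
    }
}
```

Three arrivals produce five output tuples here: one per insertion, plus one for each eviction that leaves the window non-empty.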


Batch (size, last 5 seconds)

Edgent code

TWindow<Integer> window = stream.last(5, TimeUnit.SECONDS, tuple -> 0);
TStream<Integer> size = window.batch((tuples, key) -> {
    return tuples.size();
});

Explanation

Input stream: Tuples, such as sensor readings, are continually produced and represented as a red stream on the left.

Window: A window declaration of the last five seconds of tuples results in the window outlined in blue containing green tuples. A tuple is inserted into the window when it appears on the stream and is evicted five seconds later, since it will have been in the window for five seconds at that point.

Batch: The window is batched to find the number of tuples in the window. Batching means that every five seconds, the size of the window is calculated and a tuple is produced on the purple output stream.
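In contrast to continuous aggregation, a batch produces one result per five-second period. A minimal plain-Java sketch of that idea follows. BatchSizeSketch is a hypothetical name, timestamps are passed in explicitly, and two details are assumptions of this sketch rather than documented Edgent behavior: batch boundaries fall at fixed multiples of the span starting from time zero, and an empty period still produces a size of zero.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of batching a time-based window to count its tuples.
class BatchSizeSketch {
    // arrivalTimesMs must be sorted ascending. One size is produced for each
    // boundary spanMs, 2 * spanMs, ... up to and including endMs.
    static List<Integer> batchSizes(long[] arrivalTimesMs, long spanMs, long endMs) {
        List<Integer> sizes = new ArrayList<>();
        int next = 0;
        for (long boundary = spanMs; boundary <= endMs; boundary += spanMs) {
            int count = 0;
            // Count the tuples that arrived before this boundary and were not
            // already counted in an earlier batch.
            while (next < arrivalTimesMs.length && arrivalTimesMs[next] < boundary) {
                count++;
                next++;
            }
            sizes.add(count);
        }
        return sizes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 2000, 4000, 6000, 11000};
        System.out.println(batchSizes(arrivals, 5000, 15000)); // prints [3, 1, 1]
    }
}
```

Each tuple is counted in exactly one batch, so five arrivals yield three output tuples over fifteen seconds, whereas continuous aggregation would emit on every insertion and eviction.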