---
title: "Side Outputs"
nav-title: "Side Outputs"
nav-parent_id: streaming
nav-pos: 36
---

* This will be replaced by the TOC
{:toc}

In addition to the main stream that results from DataStream operations, you can also produce any number of additional side output result streams. The type of data in the result streams does not have to match the type of data in the main stream, and the types of the different side outputs can also differ. Side outputs are useful when you want to split a stream of data: without them, you would have to replicate the stream and then filter out of each copy the data that you do not want.

When using side outputs, you first need to define an OutputTag that will be used to identify a side output stream:

{% highlight java %}
// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
{% endhighlight %}

Notice how the OutputTag is typed according to the type of elements that the side output stream contains.
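The comment in the snippet above says the tag must be an anonymous inner class so that its type can be analyzed. The following is a minimal plain-Java sketch of why the trailing {} matters. TypedTag and TypeCaptureDemo are hypothetical names used only for illustration, and this is not Flink's OutputTag implementation; it only demonstrates the general Java behavior that an anonymous subclass keeps its generic superclass readable via reflection, whereas a plain instance loses the type argument to erasure.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Demo entry point. All names here are hypothetical, not Flink API.
class TypeCaptureDemo {
    public static void main(String[] args) {
        // The trailing {} creates an anonymous subclass of TypedTag<String>.
        TypedTag<String> captured = new TypedTag<String>("side-output") {};
        System.out.println(captured.getElementType()); // class java.lang.String

        // Without {}, the runtime class is plain TypedTag and the type argument is erased.
        TypedTag<String> erased = new TypedTag<String>("side-output");
        System.out.println(erased.getElementType()); // null
    }
}

// Hypothetical stand-in for a typed tag.
class TypedTag<T> {
    private final String id;
    private final Type elementType; // null when the type argument was erased

    TypedTag(String id) {
        this.id = id;
        // For an anonymous subclass, the generic superclass is TypedTag<String>,
        // which is a ParameterizedType carrying the actual type argument.
        Type superType = getClass().getGenericSuperclass();
        this.elementType = (superType instanceof ParameterizedType)
            ? ((ParameterizedType) superType).getActualTypeArguments()[0]
            : null;
    }

    String getId() {
        return id;
    }

    Type getElementType() {
        return elementType;
    }
}
```

In this sketch only the instance created with {} can report its element type at runtime, which is the kind of information a framework needs in order to pick a serializer for the side output.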

Emitting data to a side output is possible from the following functions:

  • [ProcessFunction]({{ site.baseurl }}/dev/stream/operators/process_function.html)
  • CoProcessFunction
  • [ProcessWindowFunction]({{ site.baseurl }}/dev/stream/operators/windows.html#processwindowfunction)
  • ProcessAllWindowFunction

You can use the Context parameter, which is exposed to users in the above functions, to emit data to a side output identified by an OutputTag. Here is an example of emitting side output data from a ProcessFunction:

{% highlight java %}
DataStream<Integer> input = ...;

// the tag is typed to the elements of the side output (String here)
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = input
  .process(new ProcessFunction<Integer, Integer>() {

      @Override
      public void processElement(
          Integer value,
          Context ctx,
          Collector<Integer> out) throws Exception {
        // emit data to regular output
        out.collect(value);

        // emit data to side output
        ctx.output(outputTag, "sideout-" + String.valueOf(value));
      }
    });
{% endhighlight %}

{% highlight scala %}
val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")

val mainDataStream = input
  .process(new ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit = {
      // emit data to regular output
      out.collect(value)

      // emit data to side output
      ctx.output(outputTag, "sideout-" + String.valueOf(value))
    }
  })
{% endhighlight %}

To retrieve the side output stream, call getSideOutput(OutputTag) on the result of the DataStream operation. This returns a DataStream that is typed to the elements of the side output stream:

{% highlight java %}
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = ...;

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
{% endhighlight %}

{% highlight scala %}
val outputTag = OutputTag[String]("side-output")

val mainDataStream = ...

val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
{% endhighlight %}
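To make the relationship between emitting with a tag and retrieving with the same tag more concrete, here is a small plain-Java model. It is only a sketch under stated assumptions: Tag, SideOutputs, and SideOutputModelDemo are hypothetical classes that use in-memory lists, and they do not reflect how Flink's runtime routes records. The point it illustrates is that the tag's type parameter ties the type of the emitted values to the type of the stream you get back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Demo entry point. All classes here are hypothetical, not Flink API.
class SideOutputModelDemo {
    public static void main(String[] args) {
        Tag<String> sideTag = new Tag<>("side-output");
        SideOutputs outputs = new SideOutputs();
        List<Integer> mainOutput = new ArrayList<>();

        for (int value : new int[] {1, 2, 3}) {
            mainOutput.add(value);                       // regular output
            outputs.output(sideTag, "sideout-" + value); // side output
        }

        // The tag's type parameter determines the element type of the result.
        List<String> sideOutput = outputs.getSideOutput(sideTag);
        System.out.println(mainOutput); // [1, 2, 3]
        System.out.println(sideOutput); // [sideout-1, sideout-2, sideout-3]
    }
}

// Hypothetical typed tag: the id identifies the output, T fixes its element type.
final class Tag<T> {
    final String id;

    Tag(String id) {
        this.id = id;
    }
}

// Hypothetical collector that stores side output values per tag id.
final class SideOutputs {
    private final Map<String, List<Object>> byTagId = new HashMap<>();

    // Only values of the tag's element type are accepted at compile time.
    <T> void output(Tag<T> tag, T value) {
        byTagId.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    // Safe as long as each id is used with a single element type,
    // because output() only ever stores values of type T under that id.
    @SuppressWarnings("unchecked")
    <T> List<T> getSideOutput(Tag<T> tag) {
        List<Object> raw = byTagId.getOrDefault(tag.id, new ArrayList<>());
        return (List<T>) (List<?>) raw;
    }
}
```

The model also suggests why a tag id should be paired with one element type only: reusing the same id with a different type would defeat the compile-time guarantee in this sketch.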

{% top %}