title: "Zipping Elements in a DataSet"

In certain algorithms, one may need to assign unique identifiers to data set elements. This document shows how {% gh_link /flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java "DataSetUtils" %} can be used for that purpose.

* This will be replaced by the TOC
{:toc}

Zip with a Dense Index

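To assign consecutive labels to the elements, call the `zipWithIndex` method. It receives a data set as input and returns a new data set of `(unique id, initial value)` 2-tuples. This requires two passes over the data, first counting the elements in each partition and then labeling them, so the process cannot be pipelined. For example, the following code: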

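{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F");

DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);

result.writeAsCsv(resultPath, "\n", ",");
env.execute();
{% endhighlight %}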

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // brings zipWithIndex into scope

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F")

val result: DataSet[(Long, String)] = input.zipWithIndex

result.writeAsCsv(resultPath, "\n", ",")
env.execute()
{% endhighlight %}

will yield the tuples: (0,A), (1,B), (2,C), (3,D), (4,E), (5,F)
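To illustrate why dense labeling needs a counting pass before any label can be assigned, here is a plain-Java model of the idea. It is a sketch under stated assumptions, not Flink's implementation: partitions are modeled as in-memory lists, and the class `ZipWithIndexSketch` and its `zipWithIndex` method are hypothetical names chosen for illustration. Each partition's first index is the sum of the sizes of all partitions before it, so no partition can start labeling until every earlier count is known.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of dense index assignment; not Flink code. */
public class ZipWithIndexSketch {

    public static <T> List<Map.Entry<Long, T>> zipWithIndex(List<List<T>> partitions) {
        // Pass 1: count each partition and turn the counts into start offsets
        // (a prefix sum). In a distributed job, counting needs a full pass over
        // every partition; here List.size() stands in for that pass.
        long[] offsets = new long[partitions.size()];
        long running = 0;
        for (int p = 0; p < partitions.size(); p++) {
            offsets[p] = running;
            running += partitions.get(p).size();
        }

        // Pass 2: each partition labels its own elements, starting at its offset.
        List<Map.Entry<Long, T>> out = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            long next = offsets[p];
            for (T value : partitions.get(p)) {
                out.add(new SimpleEntry<>(next++, value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(
                List.of("A", "B"), List.of("C", "D", "E"), List.of("F"));
        // prints [0=A, 1=B, 2=C, 3=D, 4=E, 5=F]
        System.out.println(zipWithIndex(partitions));
    }
}
```

The offsets array is the synchronization point: it can only be built once all partitions have been counted, which is why this style of labeling cannot be pipelined.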


Zip with a Unique Identifier

In many cases, one may not need to assign consecutive labels. `zipWithUniqueId` works in a pipelined fashion, which speeds up the label assignment process; the resulting ids are unique but not necessarily consecutive. This method receives a data set as input and returns a new data set of `(unique id, initial value)` 2-tuples. For example, the following code:

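{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F");

DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);

result.writeAsCsv(resultPath, "\n", ",");
env.execute();
{% endhighlight %}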

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // brings zipWithUniqueId into scope

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F")

val result: DataSet[(Long, String)] = input.zipWithUniqueId

result.writeAsCsv(resultPath, "\n", ",")
env.execute()
{% endhighlight %}

will yield the tuples: (0,A), (2,B), (4,C), (6,D), (8,E), (10,F)
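The reason unique labeling can be pipelined is that each parallel task can generate ids without knowing how many elements the other tasks hold. The plain-Java model below shows one such coordination-free scheme, chosen here for illustration: task `t` of `n` emits `t`, `t + n`, `t + 2n`, and so on, so ids from different tasks never collide. This is an assumption, not necessarily the scheme Flink uses, so it is not expected to reproduce the exact tuples above; the names `ZipWithUniqueIdSketch`, `labelPartition`, and `zipWithUniqueId` are hypothetical, and overflow checks are omitted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of coordination-free unique ids; not Flink code. */
public class ZipWithUniqueIdSketch {

    /** Labels one partition using only its own task index and the parallelism. */
    static <T> List<Map.Entry<Long, T>> labelPartition(List<T> values, int taskId, int parallelism) {
        List<Map.Entry<Long, T>> out = new ArrayList<>();
        long counter = 0;
        for (T value : values) {
            // Every id of task t is congruent to t modulo n, so tasks never collide.
            out.add(new SimpleEntry<>(counter * parallelism + taskId, value));
            counter++;
        }
        return out;
    }

    public static <T> List<Map.Entry<Long, T>> zipWithUniqueId(List<List<T>> partitions) {
        int n = partitions.size();
        List<Map.Entry<Long, T>> out = new ArrayList<>();
        for (int t = 0; t < n; t++) {
            // Calls are independent of each other, so a real job could run them in parallel.
            out.addAll(labelPartition(partitions.get(t), t, n));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = List.of(
                List.of("A", "B"), List.of("C", "D", "E"), List.of("F"));
        // prints [0=A, 3=B, 1=C, 4=D, 7=E, 2=F]
        System.out.println(zipWithUniqueId(partitions));
    }
}
```

With unbalanced partitions the ids are unique but leave gaps (5 and 6 are unused in the example), which is the price paid for skipping the counting pass.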
