Revert "remove processContext usage across examples (java and kotlin)" (#38341)
* Revert "remove processContext usage across examples (java and kotlin) (#37937)"
This reverts commit 4151dded54fc3b120e8a2b382ce64f9eee576a06.
* Restore SKILLs.md as it does not affect Beam playground
diff --git a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java
index 167d301..2a5f85e 100644
--- a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java
+++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java
@@ -28,8 +28,6 @@
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -76,8 +74,9 @@
static class ExtractBrowserTransactionsFn extends DoFn<Row, KV<String, Long>> {
@ProcessElement
- public void processElement(@Element Row row, OutputReceiver<KV<String, Long>> receiver) {
- receiver.output(
+ public void processElement(ProcessContext c) {
+ Row row = c.element();
+ c.output(
KV.of(
Preconditions.checkStateNotNull(row.getString("browser")),
Preconditions.checkStateNotNull(row.getInt64("transactions"))));
@@ -86,13 +85,13 @@
static class FormatCountsFn extends DoFn<KV<String, Long>, Row> {
@ProcessElement
- public void processElement(@Element KV<String, Long> element, OutputReceiver<Row> receiver) {
+ public void processElement(ProcessContext c) {
Row row =
Row.withSchema(AGGREGATED_SCHEMA)
- .withFieldValue("browser", element.getKey())
- .withFieldValue("transaction_count", element.getValue())
+ .withFieldValue("browser", c.element().getKey())
+ .withFieldValue("transaction_count", c.element().getValue())
.build();
- receiver.output(row);
+ c.output(row);
}
}
diff --git a/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java
index 8415568..5b8e4a3 100644
--- a/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java
+++ b/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java
@@ -42,8 +42,6 @@
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
@@ -103,9 +101,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java
index 9ca2dda..9c2302c 100644
--- a/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java
+++ b/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java
@@ -41,8 +41,6 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -97,9 +95,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java b/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java
index 0f968e8..9e2a96b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java
@@ -24,8 +24,6 @@
import org.apache.beam.sdk.transforms.ApproximateQuantiles;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
@@ -72,9 +70,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java
index 1aafd50..0ee7d7b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java
@@ -46,8 +46,6 @@
import org.apache.beam.sdk.transforms.CombineFns;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
@@ -187,16 +185,13 @@
new DoFn<
KV<Long, CombineFns.CoCombineResult>, KV<Long, Iterable<KV<String, Long>>>>() {
@ProcessElement
- public void processElement(
- @Element KV<Long, CombineFns.CoCombineResult> element,
- OutputReceiver<KV<Long, Iterable<KV<String, Long>>>> receiver)
- throws Exception {
- CombineFns.CoCombineResult e = element.getValue();
+ public void processElement(ProcessContext c) throws Exception {
+ CombineFns.CoCombineResult e = c.element().getValue();
ArrayList<KV<String, Long>> o = new ArrayList<KV<String, Long>>();
o.add(KV.of(minTag.getId(), e.get(minTag)));
o.add(KV.of(maxTag.getId(), e.get(maxTag)));
o.add(KV.of(sumTag.getId(), e.get(sumTag)));
- receiver.output(KV.of(element.getKey(), o));
+ c.output(KV.of(c.element().getKey(), o));
}
}));
@@ -215,9 +210,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java
index 46905ad..c77708b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -86,9 +84,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java b/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java
index 5f6901e..24bed27 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java
@@ -23,8 +23,6 @@
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
@@ -70,9 +68,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CountExample.java b/examples/java/src/main/java/org/apache/beam/examples/CountExample.java
index f95ec3f..cb0bd0e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CountExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CountExample.java
@@ -23,8 +23,6 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
@@ -65,9 +63,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java
index f02c262..9bf9bf1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java
@@ -23,8 +23,6 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -69,9 +67,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java b/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java
index b5eb5ba..5943ffa 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java
@@ -27,8 +27,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -81,9 +79,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 07c5be7..7c54e23 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -46,8 +46,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -117,19 +115,18 @@
private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
@ProcessElement
- public void processElement(
- @Element KV<String, Long> element, OutputReceiver<KV<String, Long>> receiver) {
- if (filter.matcher(element.getKey()).matches()) {
+ public void processElement(ProcessContext c) {
+ if (filter.matcher(c.element().getKey()).matches()) {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
// these log lines will appear only if the log level is set to "DEBUG" or lower.
- LOG.debug("Matched: {}", element.getKey());
+ LOG.debug("Matched: {}", c.element().getKey());
matchedWords.inc();
- receiver.output(element);
+ c.output(c.element());
} else {
// Log at the "TRACE" level each element that is not matched. Different log levels
// can be used to control the verbosity of logging providing an effective mechanism
// to filter less important information.
- LOG.trace("Did not match: {}", element.getKey());
+ LOG.trace("Did not match: {}", c.element().getKey());
unmatchedWords.inc();
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java
index 28fd4af..d3ff9d6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java
@@ -23,8 +23,6 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
@@ -70,9 +68,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java
index 4e9493a..71d05ac 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java
@@ -24,8 +24,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.InferableFunction;
import org.apache.beam.sdk.transforms.ParDo;
@@ -80,9 +78,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java b/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java
index 2120795..78e898b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -76,9 +74,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java
index 5b26455..20f7023 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java
@@ -55,8 +55,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
@@ -111,13 +109,10 @@
ParDo.of(
new DoFn<String, KV<Integer, Integer>>() {
@ProcessElement
- public void processElement(
- @Element String element, OutputReceiver<KV<Integer, Integer>> receiver)
- throws JsonProcessingException {
+ public void processElement(ProcessContext c) throws JsonProcessingException {
final VendorToPassengerDTO result =
- om.readValue(element, new TypeReference<VendorToPassengerDTO>() {});
- receiver.output(
- KV.of(result.getVendorIdField(), result.getPassengerCountField()));
+ om.readValue(c.element(), new TypeReference<VendorToPassengerDTO>() {});
+ c.output(KV.of(result.getVendorIdField(), result.getPassengerCountField()));
}
}))
.apply(
@@ -129,11 +124,11 @@
new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
@ProcessElement
public void processElement(
- OutputReceiver<KV<Integer, Integer>> out,
- @Element KV<Integer, Integer> element) {
+ ProcessContext c, OutputReceiver<KV<Integer, Integer>> out) {
System.out.printf(
- "Vendor: %s, Passengers: %s%n", element.getKey(), element.getValue());
- out.output(element);
+ "Vendor: %s, Passengers: %s%n",
+ c.element().getKey(), c.element().getValue());
+ out.output(c.element());
}
}));
p.run().waitUntilFinish();
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java
index 327d321..f0b0922 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java
@@ -49,8 +49,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -177,9 +175,8 @@
private static final Random RANDOM = new Random();
@ProcessElement
- public void processElement(
- @Element Object element, OutputReceiver<KV<String, Integer>> receiver) {
- receiver.output(generate());
+ public void processElement(ProcessContext c) {
+ c.output(generate());
}
public KV<String, Integer> generate() {
@@ -293,17 +290,17 @@
static class LogResults extends DoFn<Map<String, Integer>, Map<String, Integer>> {
@ProcessElement
- public void processElement(
- PaneInfo pane,
- IntervalWindow w,
- @Element Map<String, Integer> scores,
- OutputReceiver<Map<String, Integer>> receiver)
- throws Exception {
+ public void processElement(ProcessContext c, IntervalWindow w) throws Exception {
+ Map<String, Integer> map = c.element();
+ if (map == null) {
+ c.output(c.element());
+ return;
+ }
String startTime = w.start().toString(dateTimeFormatter);
String endTime = w.end().toString(dateTimeFormatter);
- PaneInfo.Timing timing = pane.getTiming();
+ PaneInfo.Timing timing = c.pane().getTiming();
switch (timing) {
case EARLY:
@@ -319,7 +316,7 @@
throw new RuntimeException("Unknown timing value");
}
- for (Map.Entry<String, Integer> entry : scores.entrySet()) {
+ for (Map.Entry<String, Integer> entry : map.entrySet()) {
System.out.printf("%10s: %-10s%n", entry.getKey(), entry.getValue());
}
@@ -329,7 +326,7 @@
System.out.println();
}
- receiver.output(scores);
+ c.output(c.element());
}
}
@@ -343,9 +340,9 @@
static class LogErrorFn extends DoFn<BadRecord, BadRecord> {
@ProcessElement
- public void processElement(@Element BadRecord badRecord, OutputReceiver<BadRecord> receiver) {
- System.out.println(badRecord);
- receiver.output(badRecord);
+ public void processElement(@Element BadRecord record, OutputReceiver<BadRecord> receiver) {
+ System.out.println(record);
+ receiver.output(record);
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
index fad5668..9e2da24 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
@@ -51,8 +51,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -105,11 +103,10 @@
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(
- @Element String element, OutputReceiver<String> receiver) {
- for (String word : element.split(TOKENIZER_PATTERN, 0)) {
+ public void processElement(ProcessContext c) {
+ for (String word : c.element().split(TOKENIZER_PATTERN, 0)) {
if (!word.isEmpty()) {
- receiver.output(word);
+ c.output(word);
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
index 036cc67..355614b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
@@ -52,8 +52,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -106,11 +104,10 @@
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(
- @Element String element, OutputReceiver<String> receiver) {
- for (String word : element.split(TOKENIZER_PATTERN, 0)) {
+ public void processElement(ProcessContext c) {
+ for (String word : c.element().split(TOKENIZER_PATTERN, 0)) {
if (!word.isEmpty()) {
- receiver.output(word);
+ c.output(word);
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java b/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java
index c9dc401..155834b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -72,9 +70,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java b/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java
index 192dc89..090779d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.KvSwap;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -71,9 +69,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java b/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java
index ef4c50d..5e9662a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithTimestamps;
@@ -86,9 +84,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java
index 60908cf..93d8857 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -78,9 +76,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java b/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java
index 845b107..9173d11 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -65,9 +63,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java
index 5ad156a..f5eda81 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -69,9 +67,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java b/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java
index fbb5163..a319079 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -65,9 +63,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java
index 97f598d..aecccee 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -69,9 +67,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinExample.java b/examples/java/src/main/java/org/apache/beam/examples/MinExample.java
index db46f13..a76bcdc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -65,9 +63,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java
index e8c3c51..d3c0312 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -69,9 +67,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("LogOutput: {} {}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("LogOutput: {} {}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java b/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java
index c202aed..b34f2bd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java
@@ -32,8 +32,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
@@ -194,9 +192,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
index 57b7a08..3ec8fce 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
@@ -31,8 +31,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -100,8 +98,8 @@
}
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<String> receiver)
- throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
+ String element = c.element();
try {
Preconditions.checkNotNull(rateLimiter).allow(1);
} catch (Exception e) {
@@ -111,7 +109,7 @@
// Simulate external API call
LOG.info("Processing: {}", element);
Thread.sleep(100);
- receiver.output("Processed: " + element);
+ c.output("Processed: " + element);
}
}
// [END RateLimiterSimpleJava]
diff --git a/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java b/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java
index d80bc9f..a0d4677 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Regex;
import org.apache.beam.sdk.values.PCollection;
@@ -77,9 +75,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java b/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java
index 8d9db0c..ed1d906 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.values.KV;
@@ -78,9 +76,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/SumExample.java b/examples/java/src/main/java/org/apache/beam/examples/SumExample.java
index a571d8d..00fcc86 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/SumExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/SumExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
@@ -65,9 +63,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java
index a2c92ef..45d4a9f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
@@ -69,9 +67,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java b/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java
index 54e7832..23e3db6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.values.KV;
@@ -76,9 +74,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/TopExample.java b/examples/java/src/main/java/org/apache/beam/examples/TopExample.java
index f602aa3..520af0f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/TopExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/TopExample.java
@@ -23,8 +23,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;
@@ -66,9 +64,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java b/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java
index 1b9839a..3fc9e84 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java
@@ -22,8 +22,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
@@ -72,9 +70,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java b/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java
index d40d6cc..01c1b1c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java
@@ -23,9 +23,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
@@ -86,8 +83,10 @@
@ProcessElement
public void processElement(
@Element KV<String, String> person,
- @SideInput("citiesToCountries") Map<String, String> citiesToCountries,
- OutputReceiver<KV<String, String>> out) {
+ OutputReceiver<KV<String, String>> out,
+ ProcessContext context) {
+ Map<String, String> citiesToCountries =
+ context.sideInput(citiesToCountriesView);
String city = person.getValue();
String country = citiesToCountries.get(city);
if (country == null) {
@@ -96,7 +95,7 @@
out.output(KV.of(person.getKey(), country));
}
})
- .withSideInput("citiesToCountries", citiesToCountriesView));
+ .withSideInputs(citiesToCountriesView));
// [END main_section]
output.apply("Log", ParDo.of(new LogOutput<>("Output: ")));
@@ -113,11 +112,9 @@
}
@ProcessElement
- public void processElement(
- @Element KV<String, String> element, OutputReceiver<KV<String, String>> receiver)
- throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java b/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java
index d6a3dff..244a42b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java
@@ -25,8 +25,6 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -89,9 +87,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 24a3c1b..1111404 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -52,8 +52,6 @@
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
@@ -133,11 +131,10 @@
ParDo.of(
new DoFn<KV<String, Long>, CompletionCandidate>() {
@ProcessElement
- public void processElement(
- @Element KV<String, Long> element,
- OutputReceiver<CompletionCandidate> receiver) {
- receiver.output(
- new CompletionCandidate(element.getKey(), element.getValue()));
+ public void processElement(ProcessContext c) {
+ c.output(
+ new CompletionCandidate(
+ c.element().getKey(), c.element().getValue()));
}
}));
@@ -213,11 +210,9 @@
private static class FlattenTops
extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
@ProcessElement
- public void processElement(
- @Element KV<String, List<CompletionCandidate>> element,
- OutputReceiver<CompletionCandidate> receiver) {
- for (CompletionCandidate cc : element.getValue()) {
- receiver.output(cc);
+ public void processElement(ProcessContext c) {
+ for (CompletionCandidate cc : c.element().getValue()) {
+ c.output(cc);
}
}
}
@@ -272,12 +267,10 @@
}
@ProcessElement
- public void processElement(
- @Element CompletionCandidate element,
- OutputReceiver<KV<String, CompletionCandidate>> receiver) {
- String word = element.value;
+ public void processElement(ProcessContext c) {
+ String word = c.element().value;
for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
- receiver.output(KV.of(word.substring(0, i), element));
+ c.output(KV.of(word.substring(0, i), c.element()));
}
}
}
@@ -339,24 +332,23 @@
/** Takes as input a set of strings, and emits each #hashtag found therein. */
static class ExtractHashtags extends DoFn<String, String> {
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<String> receiver) {
- Matcher m = Pattern.compile("#\\S+").matcher(element);
+ public void processElement(ProcessContext c) {
+ Matcher m = Pattern.compile("#\\S+").matcher(c.element());
while (m.find()) {
- receiver.output(m.group().substring(1));
+ c.output(m.group().substring(1));
}
}
}
static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> {
@ProcessElement
- public void processElement(
- @Element KV<String, List<CompletionCandidate>> element, OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c) {
List<TableRow> completions = new ArrayList<>();
- for (CompletionCandidate cc : element.getValue()) {
+ for (CompletionCandidate cc : c.element().getValue()) {
completions.add(new TableRow().set("count", cc.getCount()).set("tag", cc.getValue()));
}
- TableRow row = new TableRow().set("prefix", element.getKey()).set("tags", completions);
- receiver.output(row);
+ TableRow row = new TableRow().set("prefix", c.element().getKey()).set("tags", completions);
+ c.output(row);
}
/** Defines the BigQuery schema used for the output. */
@@ -394,16 +386,15 @@
}
@ProcessElement
- public void processElement(
- @Element KV<String, List<CompletionCandidate>> element, OutputReceiver<Entity> receiver) {
+ public void processElement(ProcessContext c) {
Entity.Builder entityBuilder = Entity.newBuilder();
com.google.datastore.v1.Key key =
- makeKey(makeKey(kind, ancestorKey).build(), kind, element.getKey()).build();
+ makeKey(makeKey(kind, ancestorKey).build(), kind, c.element().getKey()).build();
entityBuilder.setKey(key);
List<Value> candidates = new ArrayList<>();
Map<String, Value> properties = new HashMap<>();
- for (CompletionCandidate tag : element.getValue()) {
+ for (CompletionCandidate tag : c.element().getValue()) {
Entity.Builder tagEntity = Entity.newBuilder();
properties.put("tag", makeValue(tag.value).build());
properties.put("count", makeValue(tag.count).build());
@@ -411,7 +402,7 @@
}
properties.put("candidates", makeValue(candidates).build());
entityBuilder.putAllProperties(properties);
- receiver.output(entityBuilder.build());
+ c.output(entityBuilder.build());
}
}
@@ -536,12 +527,11 @@
ParDo.of(
new DoFn<KV<String, List<CompletionCandidate>>, Long>() {
@ProcessElement
- public void process(
- @Element KV<String, List<CompletionCandidate>> elm,
- OutputReceiver<Long> receiver) {
+ public void process(ProcessContext c) {
+ KV<String, List<CompletionCandidate>> elm = c.element();
Long listHash =
- elm.getValue().stream().mapToLong(cc -> cc.hashCode()).sum();
- receiver.output(Long.valueOf(elm.getKey().hashCode()) + listHash);
+ c.element().getValue().stream().mapToLong(cc -> cc.hashCode()).sum();
+ c.output(Long.valueOf(elm.getKey().hashCode()) + listHash);
}
}))
.apply(Sum.longsGlobally());
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index ea5f2e5..e4ce5e3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -34,8 +34,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
/**
@@ -57,12 +55,12 @@
/** A {@link DoFn} that tokenizes lines of text into individual words. */
static class ExtractWords extends DoFn<String, String> {
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<String> receiver) {
- String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
+ public void processElement(ProcessContext c) {
+ String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN, -1);
for (String word : words) {
if (!word.isEmpty()) {
- receiver.output(word);
+ c.output(word);
}
}
}
@@ -71,8 +69,8 @@
/** A {@link DoFn} that uppercases a word. */
static class Uppercase extends DoFn<String, String> {
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<String> receiver) {
- receiver.output(element.toUpperCase());
+ public void processElement(ProcessContext c) {
+ c.output(c.element().toUpperCase());
}
}
@@ -80,8 +78,8 @@
static class StringToRowConverter extends DoFn<String, TableRow> {
/** In this example, put the whole string into single BigQuery field. */
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<TableRow> receiver) {
- receiver.output(new TableRow().set("string_field", element));
+ public void processElement(ProcessContext c) {
+ c.output(new TableRow().set("string_field", c.element()));
}
static TableSchema getSchema() {
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index a61bd4c..b3e5fd0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -56,9 +56,6 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
@@ -239,11 +236,9 @@
ParDo.of(
new DoFn<KV<URI, String>, KV<URI, String>>() {
@ProcessElement
- public void processElement(
- @Element KV<URI, String> element,
- OutputReceiver<KV<URI, String>> receiver) {
- URI uri = element.getKey();
- String line = element.getValue();
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ String line = c.element().getValue();
for (String word : line.split("\\W+", -1)) {
// Log INFO messages when the word “love” is found.
if ("love".equalsIgnoreCase(word)) {
@@ -251,7 +246,7 @@
}
if (!word.isEmpty()) {
- receiver.output(KV.of(uri, word.toLowerCase()));
+ c.output(KV.of(uri, word.toLowerCase()));
}
}
}
@@ -286,13 +281,11 @@
ParDo.of(
new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
@ProcessElement
- public void processElement(
- @Element KV<KV<URI, String>, Long> element,
- OutputReceiver<KV<URI, KV<String, Long>>> receiver) {
- URI uri = element.getKey().getKey();
- String word = element.getKey().getValue();
- Long occurrences = element.getValue();
- receiver.output(KV.of(uri, KV.of(word, occurrences)));
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey().getKey();
+ String word = c.element().getKey().getValue();
+ Long occurrences = c.element().getValue();
+ c.output(KV.of(uri, KV.of(word, occurrences)));
}
}));
@@ -329,18 +322,16 @@
ParDo.of(
new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
@ProcessElement
- public void processElement(
- @Element KV<URI, CoGbkResult> element,
- OutputReceiver<KV<String, KV<URI, Double>>> receiver) {
- URI uri = element.getKey();
- Long wordTotal = element.getValue().getOnly(wordTotalsTag);
+ public void processElement(ProcessContext c) {
+ URI uri = c.element().getKey();
+ Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
for (KV<String, Long> wordAndCount :
- element.getValue().getAll(wordCountsTag)) {
+ c.element().getValue().getAll(wordCountsTag)) {
String word = wordAndCount.getKey();
Long wordCount = wordAndCount.getValue();
Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
- receiver.output(KV.of(word, KV.of(uri, termFrequency)));
+ c.output(KV.of(word, KV.of(uri, termFrequency)));
}
}
}));
@@ -357,19 +348,17 @@
ParDo.of(
new DoFn<KV<String, Long>, KV<String, Double>>() {
@ProcessElement
- public void processElement(
- @SideInput("totalDocuments") Long documentTotal,
- @Element KV<String, Long> element,
- OutputReceiver<KV<String, Double>> receiver) {
- String word = element.getKey();
- Long documentCount = element.getValue();
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Long documentCount = c.element().getValue();
+ Long documentTotal = c.sideInput(totalDocuments);
Double documentFrequency =
documentCount.doubleValue() / documentTotal.doubleValue();
- receiver.output(KV.of(word, documentFrequency));
+ c.output(KV.of(word, documentFrequency));
}
})
- .withSideInput("totalDocuments", totalDocuments));
+ .withSideInputs(totalDocuments));
// Join the term frequency and document frequency
// collections, each keyed on the word.
@@ -391,17 +380,15 @@
ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
@ProcessElement
- public void processElement(
- @Element KV<String, CoGbkResult> element,
- OutputReceiver<KV<String, KV<URI, Double>>> receiver) {
- String word = element.getKey();
- Double df = element.getValue().getOnly(dfTag);
+ public void processElement(ProcessContext c) {
+ String word = c.element().getKey();
+ Double df = c.element().getValue().getOnly(dfTag);
- for (KV<URI, Double> uriAndTf : element.getValue().getAll(tfTag)) {
+ for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
URI uri = uriAndTf.getKey();
Double tf = uriAndTf.getValue();
Double tfIdf = tf * Math.log(1 / df);
- receiver.output(KV.of(word, KV.of(uri, tfIdf)));
+ c.output(KV.of(word, KV.of(uri, tfIdf)));
}
}
}));
@@ -432,15 +419,13 @@
ParDo.of(
new DoFn<KV<String, KV<URI, Double>>, String>() {
@ProcessElement
- public void processElement(
- @Element KV<String, KV<URI, Double>> element,
- OutputReceiver<String> receiver) {
- receiver.output(
+ public void processElement(ProcessContext c) {
+ c.output(
String.format(
"%s,\t%s,\t%f",
- element.getKey(),
- element.getValue().getKey(),
- element.getValue().getValue()));
+ c.element().getKey(),
+ c.element().getValue().getKey(),
+ c.element().getValue().getValue()));
}
}))
.apply(TextIO.write().to(output).withSuffix(".csv"));
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index d156777..b06cd8d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -48,8 +48,6 @@
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -95,7 +93,8 @@
/** Extracts user and timestamp from a TableRow representing a Wikipedia edit. */
static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<String> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
int timestamp;
// TODO(BEAM-5390): Avoid this workaround.
try {
@@ -106,7 +105,7 @@
String userName = (String) row.get("contributor_username");
if (userName != null) {
// Sets the implicit timestamp field to be used in windowing.
- receiver.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+ c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
}
}
}
@@ -144,24 +143,18 @@
static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {
@ProcessElement
- public void processElement(
- BoundedWindow window,
- @Element KV<String, Long> element,
- OutputReceiver<KV<String, Long>> receiver) {
- receiver.output(KV.of(element.getKey() + " : " + window, element.getValue()));
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ c.output(KV.of(c.element().getKey() + " : " + window, c.element().getValue()));
}
}
static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String> {
@ProcessElement
- public void processElement(
- BoundedWindow window,
- @Element List<KV<String, Long>> element,
- OutputReceiver<String> receiver) {
- for (KV<String, Long> item : element) {
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ for (KV<String, Long> item : c.element()) {
String session = item.getKey();
long count = item.getValue();
- receiver.output(session + " : " + count + " : " + ((IntervalWindow) window).start());
+ c.output(session + " : " + count + " : " + ((IntervalWindow) window).start());
}
}
}
@@ -194,11 +187,10 @@
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(
- @Element String element, OutputReceiver<String> receiver) {
- if (Math.abs((long) element.hashCode())
+ public void processElement(ProcessContext c) {
+ if (Math.abs((long) c.element().hashCode())
<= Integer.MAX_VALUE * samplingThreshold) {
- receiver.output(element);
+ c.output(c.element());
}
}
}))
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index b22e3fe..6f75e2e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -58,9 +58,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -185,14 +182,13 @@
DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<String> receiver)
- throws Exception {
- String[] items = element.split(",", -1);
+ public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+ String[] items = c.element().split(",", -1);
if (items.length > 0) {
try {
String timestamp = items[0];
- receiver.outputWithTimestamp(element, new Instant(dateTimeFormat.parseMillis(timestamp)));
+ c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
} catch (IllegalArgumentException e) {
// Skip the invalid input.
}
@@ -210,9 +206,8 @@
static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
@ProcessElement
- public void processElement(
- @Element String element, OutputReceiver<KV<String, LaneInfo>> receiver) {
- String[] items = element.split(",", -1);
+ public void processElement(ProcessContext c) {
+ String[] items = c.element().split(",", -1);
if (items.length < 48) {
// Skip the invalid input.
return;
@@ -241,7 +236,7 @@
laneAvgOccupancy,
laneAvgSpeed,
totalFlow);
- receiver.output(KV.of(stationId, laneInfo));
+ c.output(KV.of(stationId, laneInfo));
}
}
}
@@ -275,15 +270,12 @@
*/
static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
@ProcessElement
- public void processElement(
- @Element KV<String, LaneInfo> element,
- @Timestamp Instant timestamp,
- OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c) {
- LaneInfo laneInfo = element.getValue();
+ LaneInfo laneInfo = c.element().getValue();
TableRow row =
new TableRow()
- .set("station_id", element.getKey())
+ .set("station_id", c.element().getKey())
.set("direction", laneInfo.getDirection())
.set("freeway", laneInfo.getFreeway())
.set("lane_max_flow", laneInfo.getLaneFlow())
@@ -292,8 +284,8 @@
.set("avg_speed", laneInfo.getLaneAS())
.set("total_flow", laneInfo.getTotalFlow())
.set("recorded_timestamp", laneInfo.getRecordedTimestamp())
- .set("window_timestamp", timestamp.toString());
- receiver.output(row);
+ .set("window_timestamp", c.timestamp().toString());
+ c.output(row);
}
/** Defines the BigQuery schema used for the output. */
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 38ba432..9584156 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -63,9 +63,6 @@
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -190,13 +187,12 @@
DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<String> receiver)
- throws Exception {
- String[] items = element.split(",");
+ public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+ String[] items = c.element().split(",");
String timestamp = tryParseTimestamp(items);
if (timestamp != null) {
try {
- receiver.outputWithTimestamp(element, new Instant(dateTimeFormat.parseMillis(timestamp)));
+ c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
} catch (IllegalArgumentException e) {
// Skip the invalid input.
}
@@ -211,11 +207,8 @@
static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
@ProcessElement
- public void processElement(
- @Timestamp Instant timestamp,
- @Element String element,
- OutputReceiver<KV<String, StationSpeed>> receiver) {
- String[] items = element.split(",");
+ public void processElement(ProcessContext c) {
+ String[] items = c.element().split(",");
String stationType = tryParseStationType(items);
// For this analysis, use only 'main line' station types
if ("ML".equals(stationType)) {
@@ -223,10 +216,11 @@
String stationId = tryParseStationId(items);
// For this simple example, filter out everything but some hardwired routes.
if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
- StationSpeed stationSpeed = new StationSpeed(stationId, avgSpeed, timestamp.getMillis());
+ StationSpeed stationSpeed =
+ new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis());
// The tuple key is the 'route' name stored in the 'sdStations' hash.
KV<String, StationSpeed> outputValue = KV.of(sdStations.get(stationId), stationSpeed);
- receiver.output(outputValue);
+ c.output(outputValue);
}
}
}
@@ -240,16 +234,13 @@
*/
static class GatherStats extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
@ProcessElement
- public void processElement(
- @Element KV<String, Iterable<StationSpeed>> element,
- OutputReceiver<KV<String, RouteInfo>> receiver)
- throws IOException {
- String route = element.getKey();
+ public void processElement(ProcessContext c) throws IOException {
+ String route = c.element().getKey();
double speedSum = 0.0;
int speedCount = 0;
int speedups = 0;
int slowdowns = 0;
- List<StationSpeed> infoList = Lists.newArrayList(element.getValue());
+ List<StationSpeed> infoList = Lists.newArrayList(c.element().getValue());
// StationSpeeds sort by embedded timestamp.
Collections.sort(infoList);
Map<String, Double> prevSpeeds = new HashMap<>();
@@ -277,25 +268,22 @@
double speedAvg = speedSum / speedCount;
boolean slowdownEvent = slowdowns >= 2 * speedups;
RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
- receiver.output(KV.of(route, routeInfo));
+ c.output(KV.of(route, routeInfo));
}
}
/** Format the results of the slowdown calculations to a TableRow, to save to BigQuery. */
static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
@ProcessElement
- public void processElement(
- @Element KV<String, RouteInfo> element,
- @Timestamp Instant timestamp,
- OutputReceiver<TableRow> receiver) {
- RouteInfo routeInfo = element.getValue();
+ public void processElement(ProcessContext c) {
+ RouteInfo routeInfo = c.element().getValue();
TableRow row =
new TableRow()
.set("avg_speed", routeInfo.getAvgSpeed())
.set("slowdown_event", routeInfo.getSlowdownEvent())
- .set("route", element.getKey())
- .set("window_timestamp", timestamp.toString());
- receiver.output(row);
+ .set("route", c.element().getKey())
+ .set("window_timestamp", c.timestamp().toString());
+ c.output(row);
}
/** Defines the BigQuery schema used for the output. */
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
index 6e5321d..cf097c8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
@@ -37,9 +37,6 @@
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -202,24 +199,20 @@
@ProcessElement
@SuppressWarnings("argument")
- public void process(
- @Element KV<Integer, Iterable<Row>> element,
- OutputReceiver<Row> mainReceiver,
- MultiOutputReceiver multiReceiver) {
+ public void process(@Element KV<Integer, Iterable<Row>> element, ProcessContext context) {
Iterable<Row> rows = element.getValue();
try {
for (Row outputRow : getTokenizedRow(rows)) {
- mainReceiver.output(outputRow);
+ context.output(outputRow);
}
} catch (Exception e) {
for (Row outputRow : rows) {
- multiReceiver
- .get(failureTag)
- .output(
- FailsafeElement.of(outputRow, outputRow)
- .setErrorMessage(e.getMessage())
- .setStacktrace(Throwables.getStackTraceAsString(e)));
+ context.output(
+ failureTag,
+ FailsafeElement.of(outputRow, outputRow)
+ .setErrorMessage(e.getMessage())
+ .setStacktrace(Throwables.getStackTraceAsString(e)));
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java
index 667e26f..fe8f4c1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java
@@ -26,8 +26,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
@@ -92,8 +90,9 @@
public static class RowToTableRowFn extends DoFn<Row, TableRow> {
@ProcessElement
- public void processElement(@Element Row row, OutputReceiver<TableRow> receiver) {
- receiver.output(BigQueryUtils.toTableRow(row));
+ public void processElement(ProcessContext context) {
+ Row row = context.element();
+ context.output(BigQueryUtils.toTableRow(row));
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java
index 435620a..d7d1c3e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java
@@ -75,10 +75,8 @@
@ProcessElement
public void processElement(
- @Element Row in,
- OutputReceiver<KV<ByteString, Iterable<Mutation>>> out,
- PipelineOptions pipelineOptions) {
- DataTokenizationOptions options = pipelineOptions.as(DataTokenizationOptions.class);
+ @Element Row in, OutputReceiver<KV<ByteString, Iterable<Mutation>>> out, ProcessContext c) {
+ DataTokenizationOptions options = c.getPipelineOptions().as(DataTokenizationOptions.class);
// Mapping every field in provided Row to Mutation.SetCell, which will create/update
// cell content with provided data
Set<Mutation> mutations =
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java
index 9fead4e..d827e4b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java
@@ -264,6 +264,8 @@
@Override
public PCollectionTuple expand(PCollectionTuple lines) {
+ PCollectionView<String> headersView = null;
+
// Convert csv lines into Failsafe elements so that we can recover over multiple transforms.
PCollection<FailsafeElement<String, String>> lineFailsafeElements =
lines
@@ -283,14 +285,16 @@
return lineFailsafeElements.apply(
"LineToDocumentUsingSchema",
- ParDo.of(new FailsafeElementToJsonFn(schema, delimiter(), udfDeadletterTag()))
+ ParDo.of(
+ new FailsafeElementToJsonFn(
+ headersView, schema, delimiter(), udfDeadletterTag()))
.withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag())));
}
// Run if using headers
- PCollectionView<String> headersView =
- lines.get(headerTag()).apply(Sample.any(1)).apply(View.asSingleton());
+ headersView = lines.get(headerTag()).apply(Sample.any(1)).apply(View.asSingleton());
+ PCollectionView<String> finalHeadersView = headersView;
lines
.get(headerTag())
.apply(
@@ -298,24 +302,23 @@
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
- public void processElement(
- @SideInput("finalHeadersView") String headers,
- @Element String element) {
- if (!element.equals(headers)) {
+ public void processElement(ProcessContext c) {
+ String headers = c.sideInput(finalHeadersView);
+ if (!c.element().equals(headers)) {
LOG.error("Headers do not match, consistency cannot be guaranteed");
throw new RuntimeException(
"Headers do not match, consistency cannot be guaranteed");
}
}
})
- .withSideInput("finalHeadersView", headersView));
+ .withSideInputs(finalHeadersView));
return lineFailsafeElements.apply(
"LineToDocumentWithHeaders",
ParDo.of(
- new FailsafeElementToJsonWithHeadersFn(
- jsonSchemaPath(), delimiter(), udfDeadletterTag()))
- .withSideInput("finalHeadersView", headersView)
+ new FailsafeElementToJsonFn(
+ headersView, jsonSchemaPath(), delimiter(), udfDeadletterTag()))
+ .withSideInputs(headersView)
.withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag())));
}
@@ -354,90 +357,45 @@
@Nullable public final String jsonSchema;
public final String delimiter;
public final TupleTag<FailsafeElement<String, String>> udfDeadletterTag;
+ @Nullable private final PCollectionView<String> headersView;
private Counter successCounter =
Metrics.counter(FailsafeElementToJsonFn.class, SUCCESSFUL_TO_JSON_COUNTER);
private Counter failedCounter =
Metrics.counter(FailsafeElementToJsonFn.class, FAILED_TO_JSON_COUNTER);
FailsafeElementToJsonFn(
+ PCollectionView<String> headersView,
String jsonSchema,
String delimiter,
TupleTag<FailsafeElement<String, String>> udfDeadletterTag) {
+ this.headersView = headersView;
this.jsonSchema = jsonSchema;
this.delimiter = delimiter;
this.udfDeadletterTag = udfDeadletterTag;
}
@ProcessElement
- public void processElement(
- @Element FailsafeElement<String, String> element,
- OutputReceiver<FailsafeElement<String, String>> receiver,
- MultiOutputReceiver multiReceiver) {
- List<String> header = null;
- List<String> record = Arrays.asList(element.getOriginalPayload().split(this.delimiter));
-
- try {
- String json = buildJsonString(header, record, this.jsonSchema);
- receiver.output(FailsafeElement.of(element.getOriginalPayload(), json));
- successCounter.inc();
- } catch (Exception e) {
- failedCounter.inc();
- multiReceiver
- .get(this.udfDeadletterTag)
- .output(
- FailsafeElement.of(element)
- .setErrorMessage(e.getMessage())
- .setStacktrace(Throwables.getStackTraceAsString(e)));
- }
- }
- }
-
- public static class FailsafeElementToJsonWithHeadersFn
- extends DoFn<FailsafeElement<String, String>, FailsafeElement<String, String>> {
-
- @Nullable public final String jsonSchema;
- public final String delimiter;
- public final TupleTag<FailsafeElement<String, String>> udfDeadletterTag;
- private Counter successCounter =
- Metrics.counter(FailsafeElementToJsonWithHeadersFn.class, SUCCESSFUL_TO_JSON_COUNTER);
- private Counter failedCounter =
- Metrics.counter(FailsafeElementToJsonWithHeadersFn.class, FAILED_TO_JSON_COUNTER);
-
- FailsafeElementToJsonWithHeadersFn(
- String jsonSchema,
- String delimiter,
- TupleTag<FailsafeElement<String, String>> udfDeadletterTag) {
- this.jsonSchema = jsonSchema;
- this.delimiter = delimiter;
- this.udfDeadletterTag = udfDeadletterTag;
- }
-
- @ProcessElement
- public void processElement(
- @Element FailsafeElement<String, String> element,
- OutputReceiver<FailsafeElement<String, String>> receiver,
- MultiOutputReceiver multiReceiver,
- @SideInput("finalHeadersView") String headersStr) {
+ public void processElement(ProcessContext context) {
+ FailsafeElement<String, String> element = context.element();
List<String> header = null;
- if (headersStr != null) {
- header = Arrays.asList(headersStr.split(this.delimiter));
+ if (this.headersView != null) {
+ header = Arrays.asList(context.sideInput(this.headersView).split(this.delimiter));
}
List<String> record = Arrays.asList(element.getOriginalPayload().split(this.delimiter));
try {
String json = buildJsonString(header, record, this.jsonSchema);
- receiver.output(FailsafeElement.of(element.getOriginalPayload(), json));
+ context.output(FailsafeElement.of(element.getOriginalPayload(), json));
successCounter.inc();
} catch (Exception e) {
failedCounter.inc();
- multiReceiver
- .get(this.udfDeadletterTag)
- .output(
- FailsafeElement.of(element)
- .setErrorMessage(e.getMessage())
- .setStacktrace(Throwables.getStackTraceAsString(e)));
+ context.output(
+ this.udfDeadletterTag,
+ FailsafeElement.of(element)
+ .setErrorMessage(e.getMessage())
+ .setStacktrace(Throwables.getStackTraceAsString(e)));
}
}
}
@@ -449,9 +407,9 @@
static class LineToFailsafeElementFn extends DoFn<String, FailsafeElement<String, String>> {
@ProcessElement
- public void processElement(
- @Element String message, OutputReceiver<FailsafeElement<String, String>> receiver) {
- receiver.output(FailsafeElement.of(message, message));
+ public void processElement(ProcessContext context) {
+ String message = context.element();
+ context.output(FailsafeElement.of(message, message));
}
}
@@ -552,12 +510,13 @@
}
@ProcessElement
- public void processElement(@Element ReadableFile file, MultiOutputReceiver outputReceiver) {
+ public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) {
+ ReadableFile f = context.element();
String headers;
List<String> records = null;
String delimiter = String.valueOf(this.csvFormat.getDelimiter());
try {
- String csvFileString = file.readFullyAsUTF8String();
+ String csvFileString = f.readFullyAsUTF8String();
StringReader reader = new StringReader(csvFileString);
CSVParser parser = CSVParser.parse(reader, this.csvFormat.withFirstRecordAsHeader());
records =
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
index 99981cb..01377ad 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
@@ -29,9 +29,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -46,7 +43,6 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
-import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -127,17 +123,16 @@
}
@ProcessElement
- public void processElement(
- @Element FailsafeElement<String, String> failsafeElement,
- @Timestamp Instant timestamp,
- OutputReceiver<String> receiver) {
+ public void processElement(ProcessContext context) {
+ FailsafeElement<String, String> failsafeElement = context.element();
ArrayList<String> outputRow = new ArrayList<>();
final String message = failsafeElement.getOriginalPayload();
// Format the timestamp for insertion
- String timestampStr = TIMESTAMP_FORMATTER.print(timestamp.toDateTime(DateTimeZone.UTC));
+ String timestamp =
+ TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
- outputRow.add(timestampStr);
+ outputRow.add(timestamp);
outputRow.add(MoreObjects.firstNonNull(failsafeElement.getErrorMessage(), ""));
// Only set the payload if it's populated on the message.
@@ -145,7 +140,7 @@
outputRow.add(message);
}
- receiver.output(String.join(csvDelimiter, outputRow));
+ context.output(String.join(csvDelimiter, outputRow));
}
}
@@ -203,20 +198,19 @@
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
@ProcessElement
- public void processElement(
- @Timestamp Instant timestamp,
- @Element FailsafeElement<String, String> failsafeElement,
- OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext context) {
+ FailsafeElement<String, String> failsafeElement = context.element();
final String message = failsafeElement.getOriginalPayload();
// Format the timestamp for insertion
- String timestampStr = TIMESTAMP_FORMATTER.print(timestamp.toDateTime(DateTimeZone.UTC));
+ String timestamp =
+ TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
// Build the table row
@SuppressWarnings("nullness") // TableRow.set not annotated but does accept nulls
final TableRow failedRow =
new TableRow()
- .set("timestamp", timestampStr)
+ .set("timestamp", timestamp)
.set("errorMessage", failsafeElement.getErrorMessage())
.set("stacktrace", failsafeElement.getStacktrace());
@@ -227,7 +221,7 @@
.set("payloadBytes", message.getBytes(StandardCharsets.UTF_8));
}
- receiver.output(failedRow);
+ context.output(failedRow);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 0018201..a3ed04b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -34,9 +34,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.PTransform;
@@ -129,23 +126,21 @@
Metrics.counter("main", "SpammerUsers");
@ProcessElement
- public void processElement(
- @SideInput("globalMeanScore") Double gmc,
- @Element KV<String, Integer> element,
- OutputReceiver<KV<String, Integer>> receiver) {
- Integer score = element.getValue();
+ public void processElement(ProcessContext c) {
+ Integer score = c.element().getValue();
+ Double gmc = c.sideInput(globalMeanScore);
if (score > (gmc * SCORE_WEIGHT)) {
LOG.info(
"user {} spammer score {} with mean {}",
- element.getKey(),
+ c.element().getKey(),
score,
gmc);
numSpammerUsers.inc();
- receiver.output(element);
+ c.output(c.element());
}
}
})
- .withSideInput("globalMeanScore", globalMeanScore));
+ .withSideInputs(globalMeanScore));
return filtered;
}
}
@@ -154,10 +149,10 @@
/** Calculate and output an element's session duration. */
private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer> {
@ProcessElement
- public void processElement(BoundedWindow window, OutputReceiver<Integer> receiver) {
+ public void processElement(ProcessContext c, BoundedWindow window) {
IntervalWindow w = (IntervalWindow) window;
int duration = new Duration(w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
- receiver.output(duration);
+ c.output(duration);
}
}
@@ -197,21 +192,22 @@
configureWindowedWrite() {
Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
tableConfigure.put(
- "team", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey()));
+ "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
tableConfigure.put(
- "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue()));
+ "total_score",
+ new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
tableConfigure.put(
"window_start",
new WriteToBigQuery.FieldInfo<>(
"STRING",
- (e, w, t, p) -> {
+ (c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return GameConstants.DATE_TIME_FORMATTER.print(window.start());
}));
tableConfigure.put(
"processing_time",
new WriteToBigQuery.FieldInfo<>(
- "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
+ "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
return tableConfigure;
}
@@ -226,12 +222,12 @@
"window_start",
new WriteToBigQuery.FieldInfo<>(
"STRING",
- (e, w, t, p) -> {
+ (c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return GameConstants.DATE_TIME_FORMATTER.print(window.start());
}));
tableConfigure.put(
- "mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (e, w, t, p) -> e));
+ "mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (c, w) -> c.element()));
return tableConfigure;
}
@@ -292,17 +288,14 @@
ParDo.of(
new DoFn<GameActionInfo, GameActionInfo>() {
@ProcessElement
- public void processElement(
- @SideInput("spammersView") Map<String, Integer> spammers,
- @Element GameActionInfo element,
- OutputReceiver<GameActionInfo> receiver) {
+ public void processElement(ProcessContext c) {
// If the user is not in the spammers Map, output the data element.
- if (spammers.get(element.getUser().trim()) == null) {
- receiver.output(element);
+ if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
+ c.output(c.element());
}
}
})
- .withSideInput("spammersView", spammersView))
+ .withSideInputs(spammersView))
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
// [END DocInclude_FilterAndCalc]
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index a94f699..c57d9ba 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -112,11 +112,11 @@
*/
protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>> configureOutput() {
Map<String, WriteToText.FieldFn<KV<String, Integer>>> config = new HashMap<>();
- config.put("team", (e, w, t, p) -> e.getKey());
- config.put("total_score", (e, w, t, p) -> e.getValue());
+ config.put("team", (c, w) -> c.element().getKey());
+ config.put("total_score", (c, w) -> c.element().getValue());
config.put(
"window_start",
- (e, w, t, p) -> {
+ (c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return GameConstants.DATE_TIME_FORMATTER.print(window.start());
});
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 3750b33..832c0ad 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -135,24 +135,25 @@
Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
tableConfigure.put(
- "team", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey()));
+ "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
tableConfigure.put(
- "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue()));
+ "total_score",
+ new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
tableConfigure.put(
"window_start",
new WriteToBigQuery.FieldInfo<>(
"STRING",
- (e, w, t, p) -> {
+ (c, w) -> {
IntervalWindow window = (IntervalWindow) w;
return GameConstants.DATE_TIME_FORMATTER.print(window.start());
}));
tableConfigure.put(
"processing_time",
new WriteToBigQuery.FieldInfo<>(
- "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
+ "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
tableConfigure.put(
"timing",
- new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> p.getTiming().toString()));
+ new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.pane().getTiming().toString()));
return tableConfigure;
}
@@ -164,9 +165,10 @@
configureBigQueryWrite() {
Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
tableConfigure.put(
- "user", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey()));
+ "user", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
tableConfigure.put(
- "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue()));
+ "total_score",
+ new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
return tableConfigure;
}
@@ -182,7 +184,7 @@
tableConfigure.put(
"processing_time",
new WriteToBigQuery.FieldInfo<>(
- "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
+ "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
return tableConfigure;
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
index 3caa1e6..b28db26 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
@@ -37,8 +37,6 @@
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -103,12 +101,12 @@
private static Map<String, FieldInfo<KV<String, Integer>>> configureCompleteWindowedTableWrite() {
Map<String, FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
- tableConfigure.put("team", new FieldInfo<>("STRING", (e, w, t, p) -> e.getKey()));
- tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue()));
+ tableConfigure.put("team", new FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
+ tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
tableConfigure.put(
"processing_time",
new FieldInfo<>(
- "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
+ "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
return tableConfigure;
}
@@ -205,11 +203,9 @@
*/
@ProcessElement
public void processElement(
- @Element KV<String, GameActionInfo> element,
- @StateId(TOTAL_SCORE) ValueState<Integer> totalScore,
- OutputReceiver<KV<String, Integer>> receiver) {
- String teamName = element.getKey();
- GameActionInfo gInfo = element.getValue();
+ ProcessContext c, @StateId(TOTAL_SCORE) ValueState<Integer> totalScore) {
+ String teamName = c.element().getKey();
+ GameActionInfo gInfo = c.element().getValue();
// ValueState cells do not contain a default value. If the state is possibly not written, make
// sure to check for null on read.
@@ -222,7 +218,7 @@
// the new total is 2002, and the threshold is 1000, 1999 / 1000 = 1, 2002 / 1000 = 2.
// Therefore, this team passed the threshold.
if (oldTotalScore / this.thresholdScore < totalScore.read() / this.thresholdScore) {
- receiver.output(KV.of(teamName, totalScore.read()));
+ c.output(KV.of(teamName, totalScore.read()));
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 054ce7a..b30b466 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -34,8 +34,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -163,18 +161,18 @@
private final Counter numParseErrors = Metrics.counter("main", "ParseErrors");
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<GameActionInfo> receiver) {
- String[] components = element.split(",", -1);
+ public void processElement(ProcessContext c) {
+ String[] components = c.element().split(",", -1);
try {
String user = components[0].trim();
String team = components[1].trim();
Integer score = Integer.parseInt(components[2].trim());
Long timestamp = Long.parseLong(components[3].trim());
GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
- receiver.output(gInfo);
+ c.output(gInfo);
} catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
numParseErrors.inc();
- LOG.info("Parse error on {}", element, e);
+ LOG.info("Parse error on {}", c.element(), e);
}
}
}
@@ -234,8 +232,8 @@
*/
protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>> configureOutput() {
Map<String, WriteToText.FieldFn<KV<String, Integer>>> config = new HashMap<>();
- config.put("user", (e, w, t, p) -> e.getKey());
- config.put("total_score", (e, w, t, p) -> e.getValue());
+ config.put("user", (c, w) -> c.element().getKey());
+ config.put("total_score", (c, w) -> c.element().getValue());
return config;
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 5486025..eef4bc9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -30,16 +30,11 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Instant;
/**
* Generate, format, and write BigQuery table row information. Use provided information about the
@@ -69,11 +64,11 @@
}
/**
- * A {@link Serializable} function from an element and {@link BoundedWindow} to the value for that
- * field.
+ * A {@link Serializable} function from a {@link DoFn.ProcessContext} and {@link BoundedWindow} to
+ * the value for that field.
*/
public interface FieldFn<InputT> extends Serializable {
- Object apply(InputT element, BoundedWindow window, Instant timestamp, PaneInfo pane);
+ Object apply(DoFn<InputT, TableRow>.ProcessContext context, BoundedWindow window);
}
/** Define a class to hold information about output table field definitions. */
@@ -101,21 +96,16 @@
protected class BuildRowFn extends DoFn<InputT, TableRow> {
@ProcessElement
- public void processElement(
- @Element InputT element,
- @Timestamp Instant timestamp,
- PaneInfo pane,
- BoundedWindow window,
- OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c, BoundedWindow window) {
TableRow row = new TableRow();
for (Map.Entry<String, FieldInfo<InputT>> entry : fieldInfo.entrySet()) {
String key = entry.getKey();
FieldInfo<InputT> fcnInfo = entry.getValue();
FieldFn<InputT> fcn = fcnInfo.getFieldFn();
- row.set(key, fcn.apply(element, window, timestamp, pane));
+ row.set(key, fcn.apply(c, window));
}
- receiver.output(row);
+ c.output(row);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
index 3547116..330769e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -32,9 +32,6 @@
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -73,32 +70,26 @@
}
/**
- * A {@link Serializable} function from an element and {@link BoundedWindow} to the value for that
- * field.
+ * A {@link Serializable} function from a {@link DoFn.ProcessContext} and {@link BoundedWindow} to
+ * the value for that field.
*/
public interface FieldFn<InputT> extends Serializable {
- Object apply(
- InputT element, BoundedWindow window, org.joda.time.Instant timestamp, PaneInfo pane);
+ Object apply(DoFn<InputT, String>.ProcessContext context, BoundedWindow window);
}
/** Convert each key/score pair into a row as specified by fieldFn. */
protected class BuildRowFn extends DoFn<InputT, String> {
@ProcessElement
- public void processElement(
- @Element InputT element,
- @Timestamp org.joda.time.Instant timestamp,
- PaneInfo pane,
- BoundedWindow window,
- OutputReceiver<String> receiver) {
+ public void processElement(ProcessContext c, BoundedWindow window) {
List<String> fields = new ArrayList<>();
for (Map.Entry<String, FieldFn<InputT>> entry : fieldFn.entrySet()) {
String key = entry.getKey();
FieldFn<InputT> fcn = entry.getValue();
- fields.add(key + ": " + fcn.apply(element, window, timestamp, pane));
+ fields.add(key + ": " + fcn.apply(c, window));
}
String result = fields.stream().collect(Collectors.joining(", "));
- receiver.output(result);
+ c.output(result);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 77a59f2..36fa18a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -24,12 +24,8 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -48,20 +44,15 @@
/** Convert each key/score pair into a BigQuery TableRow. */
protected class BuildRowFn extends DoFn<T, TableRow> {
@ProcessElement
- public void processElement(
- @Element T element,
- @Timestamp org.joda.time.Instant timestamp,
- PaneInfo pane,
- BoundedWindow window,
- OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c, BoundedWindow window) {
TableRow row = new TableRow();
for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
String key = entry.getKey();
FieldInfo<T> fcnInfo = entry.getValue();
- row.set(key, fcnInfo.getFieldFn().apply(element, window, timestamp, pane));
+ row.set(key, fcnInfo.getFieldFn().apply(c, window));
}
- receiver.output(row);
+ c.output(row);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java
index d662cde..3b2653b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java
@@ -32,9 +32,6 @@
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -88,9 +85,10 @@
*/
static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<Integer> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
if (Boolean.TRUE.equals(row.get("tornado"))) {
- receiver.output(Integer.parseInt((String) row.get("month")));
+ c.output(Integer.parseInt((String) row.get("month")));
}
}
}
@@ -101,16 +99,13 @@
*/
static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
@ProcessElement
- public void processElement(
- @Element KV<Integer, Long> element,
- @Timestamp Instant timestamp,
- OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c) {
TableRow row =
new TableRow()
- .set("ts", timestamp.toString())
- .set("month", element.getKey())
- .set("tornado_count", element.getValue());
- receiver.output(row);
+ .set("ts", c.timestamp().toString())
+ .set("month", c.element().getKey())
+ .set("tornado_count", c.element().getValue());
+ c.output(row);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 67e0661..43d720e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -32,8 +32,6 @@
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -81,9 +79,10 @@
*/
static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<Integer> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
if ((Boolean) row.get("tornado")) {
- receiver.output(Integer.parseInt((String) row.get("month")));
+ c.output(Integer.parseInt((String) row.get("month")));
}
}
}
@@ -94,11 +93,12 @@
*/
static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
@ProcessElement
- public void processElement(
- @Element KV<Integer, Long> element, OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c) {
TableRow row =
- new TableRow().set("month", element.getKey()).set("tornado_count", element.getValue());
- receiver.output(row);
+ new TableRow()
+ .set("month", c.element().getKey())
+ .set("tornado_count", c.element().getValue());
+ c.output(row);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 5f5b5d0..2a581d7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -33,8 +33,6 @@
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -80,11 +78,12 @@
private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords");
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
String playName = (String) row.get("corpus");
String word = (String) row.get("word");
if (word.length() >= MIN_WORD_LENGTH) {
- receiver.output(KV.of(word, playName));
+ c.output(KV.of(word, playName));
} else {
// Track how many smaller words we're not including. This information will be
// visible in the Monitoring UI.
@@ -99,11 +98,10 @@
*/
static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
@ProcessElement
- public void processElement(
- @Element KV<String, String> element, OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c) {
TableRow row =
- new TableRow().set("word", element.getKey()).set("all_plays", element.getValue());
- receiver.output(row);
+ new TableRow().set("word", c.element().getKey()).set("all_plays", c.element().getValue());
+ c.output(row);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 26a6659..9187bb8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -31,9 +31,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -90,7 +87,8 @@
*/
static class ProjectionFn extends DoFn<TableRow, TableRow> {
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
// Grab year, month, day, mean_temp from the row
Integer year = Integer.parseInt((String) row.get("year"));
Integer month = Integer.parseInt((String) row.get("month"));
@@ -103,7 +101,7 @@
.set("month", month)
.set("day", day)
.set("mean_temp", meanTemp);
- receiver.output(outRow);
+ c.output(outRow);
}
}
@@ -121,11 +119,12 @@
}
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
Integer month;
month = (Integer) row.get("month");
if (month.equals(this.monthFilter)) {
- receiver.output(row);
+ c.output(row);
}
}
}
@@ -136,9 +135,10 @@
*/
static class ExtractTempFn extends DoFn<TableRow, Double> {
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<Double> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
- receiver.output(meanTemp);
+ c.output(meanTemp);
}
}
@@ -178,17 +178,16 @@
ParDo.of(
new DoFn<TableRow, TableRow>() {
@ProcessElement
- public void processElement(
- @SideInput("globalMeanTemp") Double gTemp,
- @Element TableRow element,
- OutputReceiver<TableRow> receiver) {
- Double meanTemp = Double.parseDouble(element.get("mean_temp").toString());
+ public void processElement(ProcessContext c) {
+ Double meanTemp =
+ Double.parseDouble(c.element().get("mean_temp").toString());
+ Double gTemp = c.sideInput(globalMeanTemp);
if (meanTemp < gTemp) {
- receiver.output(element);
+ c.output(c.element());
}
}
})
- .withSideInput("globalMeanTemp", globalMeanTemp));
+ .withSideInputs(globalMeanTemp));
return filteredRows;
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index e6f8573..f78df0c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -26,8 +26,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -92,14 +90,13 @@
ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
@ProcessElement
- public void processElement(
- @Element KV<String, CoGbkResult> element,
- OutputReceiver<KV<String, String>> receiver) {
- String countryCode = element.getKey();
- String countryName = element.getValue().getOnly(countryInfoTag);
- for (String eventInfo : element.getValue().getAll(eventInfoTag)) {
+ public void processElement(ProcessContext c) {
+ KV<String, CoGbkResult> e = c.element();
+ String countryCode = e.getKey();
+ String countryName = e.getValue().getOnly(countryInfoTag);
+ for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
// Generate a string that combines information from both collection values
- receiver.output(
+ c.output(
KV.of(
countryCode,
"Country name: " + countryName + ", Event info: " + eventInfo));
@@ -114,11 +111,10 @@
ParDo.of(
new DoFn<KV<String, String>, String>() {
@ProcessElement
- public void processElement(
- @Element KV<String, String> element, OutputReceiver<String> receiver) {
+ public void processElement(ProcessContext c) {
String outputstring =
- "Country code: " + element.getKey() + ", " + element.getValue();
- receiver.output(outputstring);
+ "Country code: " + c.element().getKey() + ", " + c.element().getValue();
+ c.output(outputstring);
}
}));
return formattedResults;
@@ -130,13 +126,14 @@
*/
static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
String countryCode = (String) row.get("ActionGeo_CountryCode");
String sqlDate = (String) row.get("SQLDATE");
String actor1Name = (String) row.get("Actor1Name");
String sourceUrl = (String) row.get("SOURCEURL");
String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
- receiver.output(KV.of(countryCode, eventInfo));
+ c.output(KV.of(countryCode, eventInfo));
}
}
@@ -146,10 +143,11 @@
*/
static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
String countryCode = (String) row.get("FIPSCC");
String countryName = (String) row.get("HumanName");
- receiver.output(KV.of(countryCode, countryName));
+ c.output(KV.of(countryCode, countryName));
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index 85df56a..8760d56 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -30,8 +30,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -75,22 +73,23 @@
*/
static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> {
@ProcessElement
- public void processElement(
- @Element TableRow row, OutputReceiver<KV<Integer, Double>> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
Integer month = Integer.parseInt((String) row.get("month"));
Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
- receiver.output(KV.of(month, meanTemp));
+ c.output(KV.of(month, meanTemp));
}
}
/** Format the results to a TableRow, to save to BigQuery. */
static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> {
@ProcessElement
- public void processElement(
- @Element KV<Integer, Double> element, OutputReceiver<TableRow> receiver) {
+ public void processElement(ProcessContext c) {
TableRow row =
- new TableRow().set("month", element.getKey()).set("max_mean_temp", element.getValue());
- receiver.output(row);
+ new TableRow()
+ .set("month", c.element().getKey())
+ .set("max_mean_temp", c.element().getValue());
+ c.output(row);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java
index 0c39c60..713af1d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java
@@ -26,8 +26,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -75,9 +73,10 @@
*/
static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
@ProcessElement
- public void processElement(@Element TableRow row, OutputReceiver<Integer> receiver) {
+ public void processElement(ProcessContext c) {
+ TableRow row = c.element();
if ((Boolean) row.get("tornado")) {
- receiver.output(Integer.parseInt((String) row.get("month")));
+ c.output(Integer.parseInt((String) row.get("month")));
}
}
}
@@ -88,9 +87,8 @@
*/
static class FormatCountsFn extends DoFn<KV<Integer, Long>, String> {
@ProcessElement
- public void processElement(
- @Element KV<Integer, Long> element, OutputReceiver<String> receiver) {
- receiver.output(element.getKey() + ": " + element.getValue());
+ public void processElement(ProcessContext c) {
+ c.output(c.element().getKey() + ": " + c.element().getValue());
}
}
@@ -136,9 +134,9 @@
}
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> receiver) {
- LOG.info("{}{}", prefix, element);
- receiver.output(element);
+ public void processElement(ProcessContext c) {
+ LOG.info("{}{}", prefix, c.element());
+ c.output(c.element());
}
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 5270077..cd3c6dd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -38,9 +38,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -49,7 +46,6 @@
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -368,18 +364,15 @@
new DoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {
@ProcessElement
- public void processElement(
- @Element KV<String, Iterable<Integer>> element,
- OutputReceiver<KV<String, String>> receiver)
- throws Exception {
- Iterable<Integer> flows = element.getValue();
+ public void processElement(ProcessContext c) throws Exception {
+ Iterable<Integer> flows = c.element().getValue();
Integer sum = 0;
Long numberOfRecords = 0L;
for (Integer value : flows) {
sum += value;
numberOfRecords++;
}
- receiver.output(KV.of(element.getKey(), sum + "," + numberOfRecords));
+ c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords));
}
}));
PCollection<TableRow> output = results.apply(ParDo.of(new FormatTotalFlow(triggerType)));
@@ -399,27 +392,21 @@
}
@ProcessElement
- public void processElement(
- PaneInfo pane,
- @Timestamp Instant timestamp,
- BoundedWindow window,
- @Element KV<String, String> element,
- OutputReceiver<TableRow> receiver)
- throws Exception {
- String[] values = element.getValue().split(",", -1);
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ String[] values = c.element().getValue().split(",", -1);
TableRow row =
new TableRow()
.set("trigger_type", triggerType)
- .set("freeway", element.getKey())
+ .set("freeway", c.element().getKey())
.set("total_flow", Integer.parseInt(values[0]))
.set("number_of_records", Long.parseLong(values[1]))
.set("window", window.toString())
- .set("isFirst", pane.isFirst())
- .set("isLast", pane.isLast())
- .set("timing", pane.getTiming().toString())
- .set("event_time", timestamp.toString())
+ .set("isFirst", c.pane().isFirst())
+ .set("isLast", c.pane().isLast())
+ .set("timing", c.pane().getTiming().toString())
+ .set("event_time", c.timestamp().toString())
.set("processing_time", Instant.now().toString());
- receiver.output(row);
+ c.output(row);
}
}
@@ -431,9 +418,8 @@
private static final int VALID_NUM_FIELDS = 50;
@ProcessElement
- public void processElement(
- @Element String element, OutputReceiver<KV<String, Integer>> receiver) throws Exception {
- String[] laneInfo = element.split(",", -1);
+ public void processElement(ProcessContext c) throws Exception {
+ String[] laneInfo = c.element().split(",", -1);
if ("timestamp".equals(laneInfo[0])) {
// Header row
return;
@@ -449,7 +435,7 @@
if (totalFlow == null || totalFlow <= 0) {
return;
}
- receiver.output(KV.of(freeway, totalFlow));
+ c.output(KV.of(freeway, totalFlow));
}
}
@@ -523,8 +509,7 @@
}
@ProcessElement
- public void processElement(@Element String element, OutputReceiver<String> receiver)
- throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
Instant timestamp = Instant.now();
if (random.nextDouble() < THRESHOLD) {
int range = MAX_DELAY - MIN_DELAY;
@@ -532,7 +517,7 @@
long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
timestamp = new Instant(timestamp.getMillis() - delayInMillis);
}
- receiver.outputWithTimestamp(element, timestamp);
+ c.outputWithTimestamp(c.element(), timestamp);
}
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 99aada2..4f24c69 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -76,10 +76,6 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
@@ -544,14 +540,14 @@
ParDo.of(
new DoFn<KV<String, CoGbkResult>, String>() {
@ProcessElement
- public void processElement(
- @Element KV<String, CoGbkResult> e, OutputReceiver<String> receiver) {
+ public void processElement(ProcessContext c) {
+ KV<String, CoGbkResult> e = c.element();
String name = e.getKey();
Iterable<String> emailsIter = e.getValue().getAll(emailsTag);
Iterable<String> phonesIter = e.getValue().getAll(phonesTag);
String formattedResult =
Snippets.formatCoGbkResults(name, emailsIter, phonesIter);
- receiver.output(formattedResult);
+ c.output(formattedResult);
}
}));
// [END CoGroupByKeyTuple]
@@ -646,23 +642,20 @@
new DoFn<Long, KV<Long, Long>>() {
@ProcessElement
- public void process(
- @Timestamp Instant timestamp,
- @Element Long element,
- @SideInput("mapIterable") Iterable<Map<String, String>> si,
- OutputReceiver<KV<Long, Long>> receiver) {
+ public void process(ProcessContext c, @Timestamp Instant timestamp) {
+ Iterable<Map<String, String>> si = c.sideInput(mapIterable);
// Take an element from the side input iterable (likely length 1)
Map<String, String> keyMap = si.iterator().next();
- receiver.outputWithTimestamp(KV.of(1L, element), Instant.now());
+ c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.info(
"Value is {} with timestamp {}, using key A from side input with time {}.",
- element,
+ c.element(),
timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")),
keyMap.get("Key_A"));
}
})
- .withSideInput("mapIterable", mapIterable));
+ .withSideInputs(mapIterable));
p.run();
}
@@ -708,9 +701,9 @@
// Define the DoFn that logs the ValueProvider value.
@ProcessElement
- public void process(PipelineOptions options) {
+ public void process(ProcessContext c) {
- MyOptions ops = options.as(MyOptions.class);
+ MyOptions ops = c.getPipelineOptions().as(MyOptions.class);
// This example logs the ValueProvider value, but you could store it by
// pushing it to an external database.
@@ -950,13 +943,11 @@
ParDo.of(
new DoFn<Instant, Long>() {
@ProcessElement
- public void process(
- @SideInput("sideInput") List<Long> sideInputValue,
- OutputReceiver<Long> receiver) {
- receiver.output((long) sideInputValue.size());
+ public void process(ProcessContext c) {
+ c.output((long) c.sideInput(sideInput).size());
}
})
- .withSideInput("sideInput", sideInput));
+ .withSideInputs(sideInput));
// [END PeriodicallyUpdatingSideInputs]
return result;
}
@@ -1197,10 +1188,7 @@
private static class BundleFinalizationDoFn extends DoFn<String, Integer> {
// [START BundleFinalize]
@ProcessElement
- public void processElement(
- @Element String element,
- OutputReceiver<Integer> receiver,
- BundleFinalizer bundleFinalizer) {
+ public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) {
// ... produce output ...
bundleFinalizer.afterBundleCommit(
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
index b7fb7b8..4aa20fc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
@@ -28,8 +28,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
@@ -86,13 +84,11 @@
}
@ProcessElement
- public void processElement(
- @Element KV<String, String> element, OutputReceiver<KV<String, String>> receiver)
- throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
try {
// Our Library takes a single command in position 0 which it will echo back in the result
SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
- Command command = new Command(0, String.valueOf(element.getValue()));
+ Command command = new Command(0, String.valueOf(c.element().getValue()));
commands.putCommand(command);
// The ProcessingKernel deals with the execution of the process
@@ -101,7 +97,7 @@
// Run the command and work through the results
List<String> results = kernel.exec(commands);
for (String s : results) {
- receiver.output(KV.of(element.getKey(), s));
+ c.output(KV.of(c.element().getKey(), s));
}
} catch (Exception ex) {
LOG.error("Error processing element ", ex);
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index 0d9e257..19c83c6 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -29,8 +29,6 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -148,8 +146,8 @@
static class FormatResults extends DoFn<TableRow, String> {
@ProcessElement
- public void processElement(@Element TableRow element, OutputReceiver<String> receiver)
- throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
+ TableRow element = c.element();
TableRow row =
new TableRow()
.set("trigger_type", element.get("trigger_type"))
@@ -160,7 +158,7 @@
.set("isLast", element.get("isLast"))
.set("timing", element.get("timing"))
.set("window", element.get("window"));
- receiver.output(canonicalFormat(row));
+ c.output(canonicalFormat(row));
}
}
}
diff --git a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
index 9981b40..01055d9 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
@@ -41,8 +41,6 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -146,13 +144,11 @@
}
@ProcessElement
- public void processElement(
- @Element KV<String, String> element, OutputReceiver<KV<String, String>> receiver)
- throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
try {
// Our Library takes a single command in position 0 which it will echo back in the result
SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
- Command command = new Command(0, String.valueOf(element.getValue()));
+ Command command = new Command(0, String.valueOf(c.element().getValue()));
commands.putCommand(command);
// The ProcessingKernel deals with the execution of the process
@@ -161,7 +157,7 @@
// Run the command and work through the results
List<String> results = kernel.exec(commands);
for (String s : results) {
- receiver.output(KV.of(element.getKey(), s));
+ c.output(KV.of(c.element().getKey(), s));
}
} catch (Exception ex) {
LOG.error("Error processing element ", ex);
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
index 7be4df2..05cb404 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
@@ -25,8 +25,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV
import org.slf4j.LoggerFactory
@@ -86,18 +84,18 @@
private val unmatchedWords = Metrics.counter(FilterTextFn::class.java, "unmatchedWords")
@ProcessElement
- fun processElement(@Element element: KV<String, Long>, receiver: OutputReceiver<KV<String, Long>>) {
- if (filter.matcher(element.key).matches()) {
+ fun processElement(c: ProcessContext) {
+ if (filter.matcher(c.element().key).matches()) {
// Log at the "DEBUG" level each element that we match. When executing this pipeline
// these log lines will appear only if the log level is set to "DEBUG" or lower.
- LOG.debug("Matched: ${element.key}")
+ LOG.debug("Matched: ${c.element().key}")
matchedWords.inc()
- receiver.output(element)
+ c.output(c.element())
} else {
// Log at the "TRACE" level each element that is not matched. Different log levels
// can be used to control the verbosity of logging providing an effective mechanism
// to filter less important information.
- LOG.trace("Did not match: ${element.key}")
+ LOG.trace("Did not match: ${c.element().key}")
unmatchedWords.inc()
}
}
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
index 8edc988..ec56bc6 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
@@ -27,8 +27,6 @@
import org.apache.beam.sdk.options.*
import org.apache.beam.sdk.transforms.Count
import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV
@@ -75,9 +73,10 @@
*/
internal class ExtractTornadoesFn : DoFn<TableRow, Int>() {
@ProcessElement
- fun processElement(@Element row: TableRow, receiver: OutputReceiver<Int>) {
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
if (row["tornado"] as Boolean) {
- receiver.output(Integer.parseInt(row["month"] as String))
+ c.output(Integer.parseInt(row["month"] as String))
}
}
}
@@ -88,11 +87,11 @@
*/
internal class FormatCountsFn : DoFn<KV<Int, Long>, TableRow>() {
@ProcessElement
- fun processElement(@Element element: KV<Int, Long>, receiver: OutputReceiver<TableRow>) {
+ fun processElement(c: ProcessContext) {
val row = TableRow()
- .set("month", element.key)
- .set("tornado_count", element.value)
- receiver.output(row)
+ .set("month", c.element().key)
+ .set("tornado_count", c.element().value)
+ c.output(row)
}
}
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
index 4d89b69..9e38803 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
@@ -26,8 +26,6 @@
import org.apache.beam.sdk.metrics.Metrics
import org.apache.beam.sdk.options.*
import org.apache.beam.sdk.transforms.*
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
@@ -72,11 +70,12 @@
private val smallerWords = Metrics.counter(ExtractLargeWordsFn::class.java, "smallerWords")
@ProcessElement
- fun processElement(@Element row: TableRow, receiver: OutputReceiver<KV<String, String>>) {
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
val playName = row["corpus"] as String
val word = row["word"] as String
if (word.length >= MIN_WORD_LENGTH) {
- receiver.output(KV.of(word, playName))
+ c.output(KV.of(word, playName))
} else {
// Track how many smaller words we're not including. This information will be
// visible in the Monitoring UI.
@@ -91,9 +90,9 @@
*/
internal class FormatShakespeareOutputFn : DoFn<KV<String, String>, TableRow>() {
@ProcessElement
- fun processElement(@Element element: KV<String, String>, receiver: OutputReceiver<TableRow>) {
- val row = TableRow().set("word", element.key).set("all_plays", element.value)
- receiver.output(row)
+ fun processElement(c: ProcessContext) {
+ val row = TableRow().set("word", c.element().key).set("all_plays", c.element().value)
+ c.output(row)
}
}
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
index 2181884..2625f5b 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
@@ -25,9 +25,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
import org.apache.beam.sdk.options.*
import org.apache.beam.sdk.transforms.*
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
-import org.apache.beam.sdk.transforms.DoFn.SideInput
import org.apache.beam.sdk.values.PCollection
import java.util.logging.Logger
@@ -83,7 +80,8 @@
*/
internal class ProjectionFn : DoFn<TableRow, TableRow>() {
@ProcessElement
- fun processElement(@Element row: TableRow, receiver: OutputReceiver<TableRow>) {
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
// Grab year, month, day, mean_temp from the row
val year = Integer.parseInt(row["year"] as String)
val month = Integer.parseInt(row["month"] as String)
@@ -96,7 +94,7 @@
.set("month", month)
.set("day", day)
.set("mean_temp", meanTemp)
- receiver.output(outRow)
+ c.output(outRow)
}
}
@@ -110,10 +108,11 @@
internal class FilterSingleMonthDataFn(private var monthFilter: Int?) : DoFn<TableRow, TableRow>() {
@ProcessElement
- fun processElement(@Element row: TableRow, receiver: OutputReceiver<TableRow>) {
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
val month = row["month"]
if (month == this.monthFilter) {
- receiver.output(row)
+ c.output(row)
}
}
}
@@ -124,9 +123,10 @@
*/
internal class ExtractTempFn : DoFn<TableRow, Double>() {
@ProcessElement
- fun processElement(@Element row: TableRow, receiver: OutputReceiver<Double>) {
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString())
- receiver.output(meanTemp)
+ c.output(meanTemp)
}
}
@@ -158,17 +158,15 @@
ParDo.of(
object : DoFn<TableRow, TableRow>() {
@ProcessElement
- fun processElement(
- @SideInput("globalMeanTemp") gTemp: Double,
- @Element element: TableRow,
- receiver: OutputReceiver<TableRow>) {
- val meanTemp = java.lang.Double.parseDouble(element["mean_temp"].toString())
+ fun processElement(c: ProcessContext) {
+ val meanTemp = java.lang.Double.parseDouble(c.element()["mean_temp"].toString())
+ val gTemp = c.sideInput(globalMeanTemp)
if (meanTemp < gTemp) {
- receiver.output(element)
+ c.output(c.element())
}
}
})
- .withSideInput("globalMeanTemp", globalMeanTemp))
+ .withSideInputs(globalMeanTemp))
}
}
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
index 7f81629..2f2215e 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
@@ -26,8 +26,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.Validation
import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.join.CoGbkResult
import org.apache.beam.sdk.transforms.join.CoGroupByKey
@@ -91,14 +89,13 @@
ParDo.of(
object : DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
@ProcessElement
- fun processElement(
- @Element element: KV<String, CoGbkResult>,
- receiver: OutputReceiver<KV<String, String>>) {
- val countryCode = element.key
- val countryName = element.value.getOnly(countryInfoTag)
- for (ei in element.value.getAll(eventInfoTag)) {
+ fun processElement(c: ProcessContext) {
+ val e = c.element()
+ val countryCode = e.key
+ val countryName = e.value.getOnly(countryInfoTag)
+ for (ei in c.element().value.getAll(eventInfoTag)) {
// Generate a string that combines information from both collection values
- receiver.output(
+ c.output(
KV.of<String, String>(
countryCode,
"Country name: $countryName, Event info: $ei"))
@@ -112,9 +109,9 @@
ParDo.of(
object : DoFn<KV<String, String>, String>() {
@ProcessElement
- fun processElement(@Element element: KV<String, String>, receiver: OutputReceiver<String>) {
- val outputString = "Country code: ${element.key}, ${element.value}"
- receiver.output(outputString)
+ fun processElement(c: ProcessContext) {
+ val outputString = "Country code: ${c.element().key}, ${c.element().value}"
+ c.output(outputString)
}
}))
}
@@ -125,13 +122,14 @@
*/
internal class ExtractEventDataFn : DoFn<TableRow, KV<String, String>>() {
@ProcessElement
- fun processElement(@Element row: TableRow, receiver: OutputReceiver<KV<String, String>>) {
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
val countryCode = row["ActionGeo_CountryCode"] as String
val sqlDate = row["SQLDATE"] as String
val actor1Name = row["Actor1Name"] as String
val sourceUrl = row["SOURCEURL"] as String
val eventInfo = "Date: $sqlDate, Actor1: $actor1Name, url: $sourceUrl"
- receiver.output(KV.of(countryCode, eventInfo))
+ c.output(KV.of(countryCode, eventInfo))
}
}
@@ -141,10 +139,11 @@
*/
internal class ExtractCountryInfoFn : DoFn<TableRow, KV<String, String>>() {
@ProcessElement
- fun processElement(@Element row: TableRow, receiver: OutputReceiver<KV<String, String>>) {
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
val countryCode = row["FIPSCC"] as String
val countryName = row["HumanName"] as String
- receiver.output(KV.of(countryCode, countryName))
+ c.output(KV.of(countryCode, countryName))
}
}
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
index ad16ecd..11418d3 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
@@ -26,8 +26,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
import org.apache.beam.sdk.options.*
import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
import org.apache.beam.sdk.transforms.Max
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.transforms.ParDo
@@ -75,21 +73,22 @@
*/
internal class ExtractTempFn : DoFn<TableRow, KV<Int, Double>>() {
@ProcessElement
- fun processElement(@Element row: TableRow, receiver: OutputReceiver<KV<Int, Double>>) {
+ fun processElement(c: ProcessContext) {
+ val row = c.element()
val month = Integer.parseInt(row["month"] as String)
val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString())
- receiver.output(KV.of(month, meanTemp))
+ c.output(KV.of(month, meanTemp))
}
}
/** Format the results to a TableRow, to save to BigQuery. */
internal class FormatMaxesFn : DoFn<KV<Int, Double>, TableRow>() {
@ProcessElement
- fun processElement(@Element element: KV<Int, Double>, receiver: OutputReceiver<TableRow>) {
+ fun processElement(c: ProcessContext) {
val row = TableRow()
- .set("month", element.key)
- .set("max_mean_temp", element.value)
- receiver.output(row)
+ .set("month", c.element().key)
+ .set("max_mean_temp", c.element().value)
+ c.output(row)
}
}
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
index bb8c090..4afa7d0 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
@@ -34,9 +34,6 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.StreamingOptions
import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
-import org.apache.beam.sdk.transforms.DoFn.Timestamp
import org.apache.beam.sdk.transforms.GroupByKey
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.transforms.ParDo
@@ -367,17 +364,15 @@
@ProcessElement
@Throws(Exception::class)
- fun processElement(
- @Element element: KV<String, Iterable<Int>>,
- receiver: OutputReceiver<KV<String, String>>) {
- val flows = element.value
+ fun processElement(c: ProcessContext) {
+ val flows = c.element().value
var sum = 0
var numberOfRecords = 0L
for (value in flows) {
sum += value
numberOfRecords++
}
- receiver.output(KV.of<String, String>(element.key, "$sum,$numberOfRecords"))
+ c.output(KV.of<String, String>(c.element().key, "$sum,$numberOfRecords"))
}
}))
return results.apply(ParDo.of(FormatTotalFlow(triggerType)))
@@ -392,25 +387,20 @@
@ProcessElement
@Throws(Exception::class)
- fun processElement(
- @Element element: KV<String, String>,
- window: BoundedWindow,
- pane: PaneInfo,
- @Timestamp timestamp: Instant,
- receiver: OutputReceiver<TableRow>) {
- val values = element.value.split(",".toRegex()).toTypedArray()
+ fun processElement(c: ProcessContext, window: BoundedWindow) {
+ val values = c.element().value.split(",".toRegex()).toTypedArray()
val row = TableRow()
.set("trigger_type", triggerType)
- .set("freeway", element.key)
+ .set("freeway", c.element().key)
.set("total_flow", Integer.parseInt(values[0]))
.set("number_of_records", java.lang.Long.parseLong(values[1]))
.set("window", window.toString())
- .set("isFirst", pane.isFirst)
- .set("isLast", pane.isLast)
- .set("timing", pane.timing.toString())
- .set("event_time", timestamp.toString())
+ .set("isFirst", c.pane().isFirst)
+ .set("isLast", c.pane().isLast)
+ .set("timing", c.pane().timing.toString())
+ .set("event_time", c.timestamp().toString())
.set("processing_time", Instant.now().toString())
- receiver.output(row)
+ c.output(row)
}
}
@@ -422,8 +412,8 @@
@ProcessElement
@Throws(Exception::class)
- fun processElement(@Element element: String, receiver: OutputReceiver<KV<String, Int>>) {
- val laneInfo = element.split(",".toRegex()).toTypedArray()
+ fun processElement(c: ProcessContext) {
+ val laneInfo = c.element().split(",".toRegex()).toTypedArray()
if ("timestamp" == laneInfo[0]) {
// Header row
return
@@ -439,7 +429,7 @@
if (totalFlow == null || totalFlow <= 0) {
return
}
- receiver.output(KV.of<String, Int>(freeway, totalFlow))
+ c.output(KV.of<String, Int>(freeway, totalFlow))
}
companion object {
@@ -496,7 +486,7 @@
@ProcessElement
@Throws(Exception::class)
- fun processElement(@Element element: String, receiver: OutputReceiver<String>) {
+ fun processElement(c: ProcessContext) {
var timestamp = Instant.now()
if (random.nextDouble() < THRESHOLD) {
val range = MAX_DELAY - MIN_DELAY
@@ -504,7 +494,7 @@
val delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes.toLong())
timestamp = Instant(timestamp.millis - delayInMillis)
}
- receiver.outputWithTimestamp(element, timestamp)
+ c.outputWithTimestamp(c.element(), timestamp)
}
companion object {
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
index eb20993..d2f58c2 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
@@ -33,8 +33,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
import org.apache.beam.sdk.transforms.*
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
import org.apache.beam.sdk.transforms.join.CoGbkResult
import org.apache.beam.sdk.transforms.join.CoGroupByKey
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
@@ -362,12 +360,13 @@
ParDo.of(
object : DoFn<KV<String, CoGbkResult>, String>() {
@ProcessElement
- fun processElement(@Element element: KV<String, CoGbkResult>, receiver: OutputReceiver<String>) {
- val name = element.key
- val emailsIter = element.value.getAll(emailsTag)
- val phonesIter = element.value.getAll(phonesTag)
+ fun processElement(c: ProcessContext) {
+ val e = c.element()
+ val name = e.key
+ val emailsIter = e.value.getAll(emailsTag)
+ val phonesIter = e.value.getAll(phonesTag)
val formattedResult = formatCoGbkResults(name, emailsIter, phonesIter)
- receiver.output(formattedResult)
+ c.output(formattedResult)
}
}))
}