Revert "remove processContext usage across examples (java and kotlin)" (#38341)

* Revert "remove processContext usage across examples (java and kotlin) (#37937)"

This reverts commit 4151dded54fc3b120e8a2b382ce64f9eee576a06.

* Restore SKILLs.md as it does not affect Beam playground
diff --git a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java
index 167d301..2a5f85e 100644
--- a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java
+++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java
@@ -28,8 +28,6 @@
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -76,8 +74,9 @@
 
   static class ExtractBrowserTransactionsFn extends DoFn<Row, KV<String, Long>> {
     @ProcessElement
-    public void processElement(@Element Row row, OutputReceiver<KV<String, Long>> receiver) {
-      receiver.output(
+    public void processElement(ProcessContext c) {
+      Row row = c.element();
+      c.output(
           KV.of(
               Preconditions.checkStateNotNull(row.getString("browser")),
               Preconditions.checkStateNotNull(row.getInt64("transactions"))));
@@ -86,13 +85,13 @@
 
   static class FormatCountsFn extends DoFn<KV<String, Long>, Row> {
     @ProcessElement
-    public void processElement(@Element KV<String, Long> element, OutputReceiver<Row> receiver) {
+    public void processElement(ProcessContext c) {
       Row row =
           Row.withSchema(AGGREGATED_SCHEMA)
-              .withFieldValue("browser", element.getKey())
-              .withFieldValue("transaction_count", element.getValue())
+              .withFieldValue("browser", c.element().getKey())
+              .withFieldValue("transaction_count", c.element().getValue())
               .build();
-      receiver.output(row);
+      c.output(row);
     }
   }
 
diff --git a/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java
index 8415568..5b8e4a3 100644
--- a/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java
+++ b/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java
@@ -42,8 +42,6 @@
 import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -103,9 +101,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java
index 9ca2dda..9c2302c 100644
--- a/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java
+++ b/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java
@@ -41,8 +41,6 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
@@ -97,9 +95,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java b/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java
index 0f968e8..9e2a96b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java
@@ -24,8 +24,6 @@
 import org.apache.beam.sdk.transforms.ApproximateQuantiles;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
@@ -72,9 +70,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java
index 1aafd50..0ee7d7b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java
@@ -46,8 +46,6 @@
 import org.apache.beam.sdk.transforms.CombineFns;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -187,16 +185,13 @@
                 new DoFn<
                     KV<Long, CombineFns.CoCombineResult>, KV<Long, Iterable<KV<String, Long>>>>() {
                   @ProcessElement
-                  public void processElement(
-                      @Element KV<Long, CombineFns.CoCombineResult> element,
-                      OutputReceiver<KV<Long, Iterable<KV<String, Long>>>> receiver)
-                      throws Exception {
-                    CombineFns.CoCombineResult e = element.getValue();
+                  public void processElement(ProcessContext c) throws Exception {
+                    CombineFns.CoCombineResult e = c.element().getValue();
                     ArrayList<KV<String, Long>> o = new ArrayList<KV<String, Long>>();
                     o.add(KV.of(minTag.getId(), e.get(minTag)));
                     o.add(KV.of(maxTag.getId(), e.get(maxTag)));
                     o.add(KV.of(sumTag.getId(), e.get(sumTag)));
-                    receiver.output(KV.of(element.getKey(), o));
+                    c.output(KV.of(c.element().getKey(), o));
                   }
                 }));
 
@@ -215,9 +210,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java
index 46905ad..c77708b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -86,9 +84,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java b/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java
index 5f6901e..24bed27 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java
@@ -23,8 +23,6 @@
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.PCollection;
@@ -70,9 +68,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CountExample.java b/examples/java/src/main/java/org/apache/beam/examples/CountExample.java
index f95ec3f..cb0bd0e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CountExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CountExample.java
@@ -23,8 +23,6 @@
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
@@ -65,9 +63,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java
index f02c262..9bf9bf1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java
@@ -23,8 +23,6 @@
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -69,9 +67,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java b/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java
index b5eb5ba..5943ffa 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java
@@ -27,8 +27,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -81,9 +79,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 07c5be7..7c54e23 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -46,8 +46,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -117,19 +115,18 @@
     private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
 
     @ProcessElement
-    public void processElement(
-        @Element KV<String, Long> element, OutputReceiver<KV<String, Long>> receiver) {
-      if (filter.matcher(element.getKey()).matches()) {
+    public void processElement(ProcessContext c) {
+      if (filter.matcher(c.element().getKey()).matches()) {
         // Log at the "DEBUG" level each element that we match. When executing this pipeline
         // these log lines will appear only if the log level is set to "DEBUG" or lower.
-        LOG.debug("Matched: {}", element.getKey());
+        LOG.debug("Matched: {}", c.element().getKey());
         matchedWords.inc();
-        receiver.output(element);
+        c.output(c.element());
       } else {
         // Log at the "TRACE" level each element that is not matched. Different log levels
         // can be used to control the verbosity of logging providing an effective mechanism
         // to filter less important information.
-        LOG.trace("Did not match: {}", element.getKey());
+        LOG.trace("Did not match: {}", c.element().getKey());
         unmatchedWords.inc();
       }
     }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java
index 28fd4af..d3ff9d6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java
@@ -23,8 +23,6 @@
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
@@ -70,9 +68,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java
index 4e9493a..71d05ac 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java
@@ -24,8 +24,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.InferableFunction;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -80,9 +78,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java b/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java
index 2120795..78e898b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -76,9 +74,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java
index 5b26455..20f7023 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java
@@ -55,8 +55,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Values;
@@ -111,13 +109,10 @@
             ParDo.of(
                 new DoFn<String, KV<Integer, Integer>>() {
                   @ProcessElement
-                  public void processElement(
-                      @Element String element, OutputReceiver<KV<Integer, Integer>> receiver)
-                      throws JsonProcessingException {
+                  public void processElement(ProcessContext c) throws JsonProcessingException {
                     final VendorToPassengerDTO result =
-                        om.readValue(element, new TypeReference<VendorToPassengerDTO>() {});
-                    receiver.output(
-                        KV.of(result.getVendorIdField(), result.getPassengerCountField()));
+                        om.readValue(c.element(), new TypeReference<VendorToPassengerDTO>() {});
+                    c.output(KV.of(result.getVendorIdField(), result.getPassengerCountField()));
                   }
                 }))
         .apply(
@@ -129,11 +124,11 @@
                 new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
                   @ProcessElement
                   public void processElement(
-                      OutputReceiver<KV<Integer, Integer>> out,
-                      @Element KV<Integer, Integer> element) {
+                      ProcessContext c, OutputReceiver<KV<Integer, Integer>> out) {
                     System.out.printf(
-                        "Vendor: %s, Passengers: %s%n", element.getKey(), element.getValue());
-                    out.output(element);
+                        "Vendor: %s, Passengers: %s%n",
+                        c.element().getKey(), c.element().getValue());
+                    out.output(c.element());
                   }
                 }));
     p.run().waitUntilFinish();
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java
index 327d321..f0b0922 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java
@@ -49,8 +49,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -177,9 +175,8 @@
       private static final Random RANDOM = new Random();
 
       @ProcessElement
-      public void processElement(
-          @Element Object element, OutputReceiver<KV<String, Integer>> receiver) {
-        receiver.output(generate());
+      public void processElement(ProcessContext c) {
+        c.output(generate());
       }
 
       public KV<String, Integer> generate() {
@@ -293,17 +290,17 @@
 
   static class LogResults extends DoFn<Map<String, Integer>, Map<String, Integer>> {
     @ProcessElement
-    public void processElement(
-        PaneInfo pane,
-        IntervalWindow w,
-        @Element Map<String, Integer> scores,
-        OutputReceiver<Map<String, Integer>> receiver)
-        throws Exception {
+    public void processElement(ProcessContext c, IntervalWindow w) throws Exception {
+      Map<String, Integer> map = c.element();
+      if (map == null) {
+        c.output(c.element());
+        return;
+      }
 
       String startTime = w.start().toString(dateTimeFormatter);
       String endTime = w.end().toString(dateTimeFormatter);
 
-      PaneInfo.Timing timing = pane.getTiming();
+      PaneInfo.Timing timing = c.pane().getTiming();
 
       switch (timing) {
         case EARLY:
@@ -319,7 +316,7 @@
           throw new RuntimeException("Unknown timing value");
       }
 
-      for (Map.Entry<String, Integer> entry : scores.entrySet()) {
+      for (Map.Entry<String, Integer> entry : map.entrySet()) {
         System.out.printf("%10s: %-10s%n", entry.getKey(), entry.getValue());
       }
 
@@ -329,7 +326,7 @@
         System.out.println();
       }
 
-      receiver.output(scores);
+      c.output(c.element());
     }
   }
 
@@ -343,9 +340,9 @@
 
     static class LogErrorFn extends DoFn<BadRecord, BadRecord> {
       @ProcessElement
-      public void processElement(@Element BadRecord badRecord, OutputReceiver<BadRecord> receiver) {
-        System.out.println(badRecord);
-        receiver.output(badRecord);
+      public void processElement(@Element BadRecord record, OutputReceiver<BadRecord> receiver) {
+        System.out.println(record);
+        receiver.output(record);
       }
     }
   }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
index fad5668..9e2da24 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java
@@ -51,8 +51,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -105,11 +103,10 @@
             ParDo.of(
                 new DoFn<String, String>() {
                   @ProcessElement
-                  public void processElement(
-                      @Element String element, OutputReceiver<String> receiver) {
-                    for (String word : element.split(TOKENIZER_PATTERN, 0)) {
+                  public void processElement(ProcessContext c) {
+                    for (String word : c.element().split(TOKENIZER_PATTERN, 0)) {
                       if (!word.isEmpty()) {
-                        receiver.output(word);
+                        c.output(word);
                       }
                     }
                   }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
index 036cc67..355614b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java
@@ -52,8 +52,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -106,11 +104,10 @@
             ParDo.of(
                 new DoFn<String, String>() {
                   @ProcessElement
-                  public void processElement(
-                      @Element String element, OutputReceiver<String> receiver) {
-                    for (String word : element.split(TOKENIZER_PATTERN, 0)) {
+                  public void processElement(ProcessContext c) {
+                    for (String word : c.element().split(TOKENIZER_PATTERN, 0)) {
                       if (!word.isEmpty()) {
-                        receiver.output(word);
+                        c.output(word);
                       }
                     }
                   }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java b/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java
index c9dc401..155834b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -72,9 +70,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java b/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java
index 192dc89..090779d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.KvSwap;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -71,9 +69,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java b/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java
index ef4c50d..5e9662a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Latest;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithTimestamps;
@@ -86,9 +84,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java
index 60908cf..93d8857 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -78,9 +76,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java b/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java
index 845b107..9173d11 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -65,9 +63,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java
index 5ad156a..f5eda81 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -69,9 +67,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java b/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java
index fbb5163..a319079 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Mean;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -65,9 +63,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java
index 97f598d..aecccee 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Mean;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -69,9 +67,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinExample.java b/examples/java/src/main/java/org/apache/beam/examples/MinExample.java
index db46f13..a76bcdc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -65,9 +63,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java
index e8c3c51..d3c0312 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -69,9 +67,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("LogOutput: {} {}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("LogOutput: {} {}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java b/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java
index c202aed..b34f2bd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java
@@ -32,8 +32,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Partition;
 import org.apache.beam.sdk.values.PCollection;
@@ -194,9 +192,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
index 57b7a08..3ec8fce 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
@@ -31,8 +31,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -100,8 +98,8 @@
     }
 
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<String> receiver)
-        throws Exception {
+    public void processElement(ProcessContext c) throws Exception {
+      String element = c.element();
       try {
         Preconditions.checkNotNull(rateLimiter).allow(1);
       } catch (Exception e) {
@@ -111,7 +109,7 @@
       // Simulate external API call
       LOG.info("Processing: {}", element);
       Thread.sleep(100);
-      receiver.output("Processed: " + element);
+      c.output("Processed: " + element);
     }
   }
   // [END RateLimiterSimpleJava]
diff --git a/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java b/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java
index d80bc9f..a0d4677 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Regex;
 import org.apache.beam.sdk.values.PCollection;
@@ -77,9 +75,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java b/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java
index 8d9db0c..ed1d906 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sample;
 import org.apache.beam.sdk.values.KV;
@@ -78,9 +76,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/SumExample.java b/examples/java/src/main/java/org/apache/beam/examples/SumExample.java
index a571d8d..00fcc86 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/SumExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/SumExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.PCollection;
@@ -65,9 +63,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java
index a2c92ef..45d4a9f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
@@ -69,9 +67,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java b/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java
index 54e7832..23e3db6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.values.KV;
@@ -76,9 +74,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/TopExample.java b/examples/java/src/main/java/org/apache/beam/examples/TopExample.java
index f602aa3..520af0f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/TopExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/TopExample.java
@@ -23,8 +23,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.values.PCollection;
@@ -66,9 +64,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java b/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java
index 1b9839a..3fc9e84 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java
@@ -22,8 +22,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.values.KV;
@@ -72,9 +70,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java b/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java
index d40d6cc..01c1b1c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java
@@ -23,9 +23,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.values.KV;
@@ -86,8 +83,10 @@
                       @ProcessElement
                       public void processElement(
                           @Element KV<String, String> person,
-                          @SideInput("citiesToCountries") Map<String, String> citiesToCountries,
-                          OutputReceiver<KV<String, String>> out) {
+                          OutputReceiver<KV<String, String>> out,
+                          ProcessContext context) {
+                        Map<String, String> citiesToCountries =
+                            context.sideInput(citiesToCountriesView);
                         String city = person.getValue();
                         String country = citiesToCountries.get(city);
                         if (country == null) {
@@ -96,7 +95,7 @@
                         out.output(KV.of(person.getKey(), country));
                       }
                     })
-                .withSideInput("citiesToCountries", citiesToCountriesView));
+                .withSideInputs(citiesToCountriesView));
     // [END main_section]
 
     output.apply("Log", ParDo.of(new LogOutput<>("Output: ")));
@@ -113,11 +112,9 @@
     }
 
     @ProcessElement
-    public void processElement(
-        @Element KV<String, String> element, OutputReceiver<KV<String, String>> receiver)
-        throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java b/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java
index d6a3dff..244a42b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java
@@ -25,8 +25,6 @@
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -89,9 +87,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) throws Exception {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) throws Exception {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 24a3c1b..1111404 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -52,8 +52,6 @@
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -133,11 +131,10 @@
                   ParDo.of(
                       new DoFn<KV<String, Long>, CompletionCandidate>() {
                         @ProcessElement
-                        public void processElement(
-                            @Element KV<String, Long> element,
-                            OutputReceiver<CompletionCandidate> receiver) {
-                          receiver.output(
-                              new CompletionCandidate(element.getKey(), element.getValue()));
+                        public void processElement(ProcessContext c) {
+                          c.output(
+                              new CompletionCandidate(
+                                  c.element().getKey(), c.element().getValue()));
                         }
                       }));
 
@@ -213,11 +210,9 @@
     private static class FlattenTops
         extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
       @ProcessElement
-      public void processElement(
-          @Element KV<String, List<CompletionCandidate>> element,
-          OutputReceiver<CompletionCandidate> receiver) {
-        for (CompletionCandidate cc : element.getValue()) {
-          receiver.output(cc);
+      public void processElement(ProcessContext c) {
+        for (CompletionCandidate cc : c.element().getValue()) {
+          c.output(cc);
         }
       }
     }
@@ -272,12 +267,10 @@
     }
 
     @ProcessElement
-    public void processElement(
-        @Element CompletionCandidate element,
-        OutputReceiver<KV<String, CompletionCandidate>> receiver) {
-      String word = element.value;
+    public void processElement(ProcessContext c) {
+      String word = c.element().value;
       for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
-        receiver.output(KV.of(word.substring(0, i), element));
+        c.output(KV.of(word.substring(0, i), c.element()));
       }
     }
   }
@@ -339,24 +332,23 @@
   /** Takes as input a set of strings, and emits each #hashtag found therein. */
   static class ExtractHashtags extends DoFn<String, String> {
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<String> receiver) {
-      Matcher m = Pattern.compile("#\\S+").matcher(element);
+    public void processElement(ProcessContext c) {
+      Matcher m = Pattern.compile("#\\S+").matcher(c.element());
       while (m.find()) {
-        receiver.output(m.group().substring(1));
+        c.output(m.group().substring(1));
       }
     }
   }
 
   static class FormatForBigquery extends DoFn<KV<String, List<CompletionCandidate>>, TableRow> {
     @ProcessElement
-    public void processElement(
-        @Element KV<String, List<CompletionCandidate>> element, OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c) {
       List<TableRow> completions = new ArrayList<>();
-      for (CompletionCandidate cc : element.getValue()) {
+      for (CompletionCandidate cc : c.element().getValue()) {
         completions.add(new TableRow().set("count", cc.getCount()).set("tag", cc.getValue()));
       }
-      TableRow row = new TableRow().set("prefix", element.getKey()).set("tags", completions);
-      receiver.output(row);
+      TableRow row = new TableRow().set("prefix", c.element().getKey()).set("tags", completions);
+      c.output(row);
     }
 
     /** Defines the BigQuery schema used for the output. */
@@ -394,16 +386,15 @@
     }
 
     @ProcessElement
-    public void processElement(
-        @Element KV<String, List<CompletionCandidate>> element, OutputReceiver<Entity> receiver) {
+    public void processElement(ProcessContext c) {
       Entity.Builder entityBuilder = Entity.newBuilder();
       com.google.datastore.v1.Key key =
-          makeKey(makeKey(kind, ancestorKey).build(), kind, element.getKey()).build();
+          makeKey(makeKey(kind, ancestorKey).build(), kind, c.element().getKey()).build();
 
       entityBuilder.setKey(key);
       List<Value> candidates = new ArrayList<>();
       Map<String, Value> properties = new HashMap<>();
-      for (CompletionCandidate tag : element.getValue()) {
+      for (CompletionCandidate tag : c.element().getValue()) {
         Entity.Builder tagEntity = Entity.newBuilder();
         properties.put("tag", makeValue(tag.value).build());
         properties.put("count", makeValue(tag.count).build());
@@ -411,7 +402,7 @@
       }
       properties.put("candidates", makeValue(candidates).build());
       entityBuilder.putAllProperties(properties);
-      receiver.output(entityBuilder.build());
+      c.output(entityBuilder.build());
     }
   }
 
@@ -536,12 +527,11 @@
                   ParDo.of(
                       new DoFn<KV<String, List<CompletionCandidate>>, Long>() {
                         @ProcessElement
-                        public void process(
-                            @Element KV<String, List<CompletionCandidate>> elm,
-                            OutputReceiver<Long> receiver) {
+                        public void process(ProcessContext c) {
+                          KV<String, List<CompletionCandidate>> elm = c.element();
                           Long listHash =
-                              elm.getValue().stream().mapToLong(cc -> cc.hashCode()).sum();
-                          receiver.output(Long.valueOf(elm.getKey().hashCode()) + listHash);
+                              c.element().getValue().stream().mapToLong(cc -> cc.hashCode()).sum();
+                          c.output(Long.valueOf(elm.getKey().hashCode()) + listHash);
                         }
                       }))
               .apply(Sum.longsGlobally());
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index ea5f2e5..e4ce5e3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -34,8 +34,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 
 /**
@@ -57,12 +55,12 @@
   /** A {@link DoFn} that tokenizes lines of text into individual words. */
   static class ExtractWords extends DoFn<String, String> {
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<String> receiver) {
-      String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
+    public void processElement(ProcessContext c) {
+      String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN, -1);
 
       for (String word : words) {
         if (!word.isEmpty()) {
-          receiver.output(word);
+          c.output(word);
         }
       }
     }
@@ -71,8 +69,8 @@
   /** A {@link DoFn} that uppercases a word. */
   static class Uppercase extends DoFn<String, String> {
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<String> receiver) {
-      receiver.output(element.toUpperCase());
+    public void processElement(ProcessContext c) {
+      c.output(c.element().toUpperCase());
     }
   }
 
@@ -80,8 +78,8 @@
   static class StringToRowConverter extends DoFn<String, TableRow> {
     /** In this example, put the whole string into single BigQuery field. */
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<TableRow> receiver) {
-      receiver.output(new TableRow().set("string_field", element));
+    public void processElement(ProcessContext c) {
+      c.output(new TableRow().set("string_field", c.element()));
     }
 
     static TableSchema getSchema() {
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index a61bd4c..b3e5fd0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -56,9 +56,6 @@
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -239,11 +236,9 @@
               ParDo.of(
                   new DoFn<KV<URI, String>, KV<URI, String>>() {
                     @ProcessElement
-                    public void processElement(
-                        @Element KV<URI, String> element,
-                        OutputReceiver<KV<URI, String>> receiver) {
-                      URI uri = element.getKey();
-                      String line = element.getValue();
+                    public void processElement(ProcessContext c) {
+                      URI uri = c.element().getKey();
+                      String line = c.element().getValue();
                       for (String word : line.split("\\W+", -1)) {
                         // Log INFO messages when the word “love” is found.
                         if ("love".equalsIgnoreCase(word)) {
@@ -251,7 +246,7 @@
                         }
 
                         if (!word.isEmpty()) {
-                          receiver.output(KV.of(uri, word.toLowerCase()));
+                          c.output(KV.of(uri, word.toLowerCase()));
                         }
                       }
                     }
@@ -286,13 +281,11 @@
               ParDo.of(
                   new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
                     @ProcessElement
-                    public void processElement(
-                        @Element KV<KV<URI, String>, Long> element,
-                        OutputReceiver<KV<URI, KV<String, Long>>> receiver) {
-                      URI uri = element.getKey().getKey();
-                      String word = element.getKey().getValue();
-                      Long occurrences = element.getValue();
-                      receiver.output(KV.of(uri, KV.of(word, occurrences)));
+                    public void processElement(ProcessContext c) {
+                      URI uri = c.element().getKey().getKey();
+                      String word = c.element().getKey().getValue();
+                      Long occurrences = c.element().getValue();
+                      c.output(KV.of(uri, KV.of(word, occurrences)));
                     }
                   }));
 
@@ -329,18 +322,16 @@
               ParDo.of(
                   new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                     @ProcessElement
-                    public void processElement(
-                        @Element KV<URI, CoGbkResult> element,
-                        OutputReceiver<KV<String, KV<URI, Double>>> receiver) {
-                      URI uri = element.getKey();
-                      Long wordTotal = element.getValue().getOnly(wordTotalsTag);
+                    public void processElement(ProcessContext c) {
+                      URI uri = c.element().getKey();
+                      Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
 
                       for (KV<String, Long> wordAndCount :
-                          element.getValue().getAll(wordCountsTag)) {
+                          c.element().getValue().getAll(wordCountsTag)) {
                         String word = wordAndCount.getKey();
                         Long wordCount = wordAndCount.getValue();
                         Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue();
-                        receiver.output(KV.of(word, KV.of(uri, termFrequency)));
+                        c.output(KV.of(word, KV.of(uri, termFrequency)));
                       }
                     }
                   }));
@@ -357,19 +348,17 @@
               ParDo.of(
                       new DoFn<KV<String, Long>, KV<String, Double>>() {
                         @ProcessElement
-                        public void processElement(
-                            @SideInput("totalDocuments") Long documentTotal,
-                            @Element KV<String, Long> element,
-                            OutputReceiver<KV<String, Double>> receiver) {
-                          String word = element.getKey();
-                          Long documentCount = element.getValue();
+                        public void processElement(ProcessContext c) {
+                          String word = c.element().getKey();
+                          Long documentCount = c.element().getValue();
+                          Long documentTotal = c.sideInput(totalDocuments);
                           Double documentFrequency =
                               documentCount.doubleValue() / documentTotal.doubleValue();
 
-                          receiver.output(KV.of(word, documentFrequency));
+                          c.output(KV.of(word, documentFrequency));
                         }
                       })
-                  .withSideInput("totalDocuments", totalDocuments));
+                  .withSideInputs(totalDocuments));
 
       // Join the term frequency and document frequency
       // collections, each keyed on the word.
@@ -391,17 +380,15 @@
               ParDo.of(
                   new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
                     @ProcessElement
-                    public void processElement(
-                        @Element KV<String, CoGbkResult> element,
-                        OutputReceiver<KV<String, KV<URI, Double>>> receiver) {
-                      String word = element.getKey();
-                      Double df = element.getValue().getOnly(dfTag);
+                    public void processElement(ProcessContext c) {
+                      String word = c.element().getKey();
+                      Double df = c.element().getValue().getOnly(dfTag);
 
-                      for (KV<URI, Double> uriAndTf : element.getValue().getAll(tfTag)) {
+                      for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) {
                         URI uri = uriAndTf.getKey();
                         Double tf = uriAndTf.getValue();
                         Double tfIdf = tf * Math.log(1 / df);
-                        receiver.output(KV.of(word, KV.of(uri, tfIdf)));
+                        c.output(KV.of(word, KV.of(uri, tfIdf)));
                       }
                     }
                   }));
@@ -432,15 +419,13 @@
               ParDo.of(
                   new DoFn<KV<String, KV<URI, Double>>, String>() {
                     @ProcessElement
-                    public void processElement(
-                        @Element KV<String, KV<URI, Double>> element,
-                        OutputReceiver<String> receiver) {
-                      receiver.output(
+                    public void processElement(ProcessContext c) {
+                      c.output(
                           String.format(
                               "%s,\t%s,\t%f",
-                              element.getKey(),
-                              element.getValue().getKey(),
-                              element.getValue().getValue()));
+                              c.element().getKey(),
+                              c.element().getValue().getKey(),
+                              c.element().getValue().getValue()));
                     }
                   }))
           .apply(TextIO.write().to(output).withSuffix(".csv"));
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index d156777..b06cd8d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -48,8 +48,6 @@
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -95,7 +93,8 @@
   /** Extracts user and timestamp from a TableRow representing a Wikipedia edit. */
   static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<String> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       int timestamp;
       // TODO(BEAM-5390): Avoid this workaround.
       try {
@@ -106,7 +105,7 @@
       String userName = (String) row.get("contributor_username");
       if (userName != null) {
         // Sets the implicit timestamp field to be used in windowing.
-        receiver.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+        c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
       }
     }
   }
@@ -144,24 +143,18 @@
 
   static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {
     @ProcessElement
-    public void processElement(
-        BoundedWindow window,
-        @Element KV<String, Long> element,
-        OutputReceiver<KV<String, Long>> receiver) {
-      receiver.output(KV.of(element.getKey() + " : " + window, element.getValue()));
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      c.output(KV.of(c.element().getKey() + " : " + window, c.element().getValue()));
     }
   }
 
   static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String> {
     @ProcessElement
-    public void processElement(
-        BoundedWindow window,
-        @Element List<KV<String, Long>> element,
-        OutputReceiver<String> receiver) {
-      for (KV<String, Long> item : element) {
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      for (KV<String, Long> item : c.element()) {
         String session = item.getKey();
         long count = item.getValue();
-        receiver.output(session + " : " + count + " : " + ((IntervalWindow) window).start());
+        c.output(session + " : " + count + " : " + ((IntervalWindow) window).start());
       }
     }
   }
@@ -194,11 +187,10 @@
               ParDo.of(
                   new DoFn<String, String>() {
                     @ProcessElement
-                    public void processElement(
-                        @Element String element, OutputReceiver<String> receiver) {
-                      if (Math.abs((long) element.hashCode())
+                    public void processElement(ProcessContext c) {
+                      if (Math.abs((long) c.element().hashCode())
                           <= Integer.MAX_VALUE * samplingThreshold) {
-                        receiver.output(element);
+                        c.output(c.element());
                       }
                     }
                   }))
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index b22e3fe..6f75e2e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -58,9 +58,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -185,14 +182,13 @@
         DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
 
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<String> receiver)
-        throws Exception {
-      String[] items = element.split(",", -1);
+    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+      String[] items = c.element().split(",", -1);
 
       if (items.length > 0) {
         try {
           String timestamp = items[0];
-          receiver.outputWithTimestamp(element, new Instant(dateTimeFormat.parseMillis(timestamp)));
+          c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
         } catch (IllegalArgumentException e) {
           // Skip the invalid input.
         }
@@ -210,9 +206,8 @@
   static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
 
     @ProcessElement
-    public void processElement(
-        @Element String element, OutputReceiver<KV<String, LaneInfo>> receiver) {
-      String[] items = element.split(",", -1);
+    public void processElement(ProcessContext c) {
+      String[] items = c.element().split(",", -1);
       if (items.length < 48) {
         // Skip the invalid input.
         return;
@@ -241,7 +236,7 @@
                 laneAvgOccupancy,
                 laneAvgSpeed,
                 totalFlow);
-        receiver.output(KV.of(stationId, laneInfo));
+        c.output(KV.of(stationId, laneInfo));
       }
     }
   }
@@ -275,15 +270,12 @@
    */
   static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
     @ProcessElement
-    public void processElement(
-        @Element KV<String, LaneInfo> element,
-        @Timestamp Instant timestamp,
-        OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c) {
 
-      LaneInfo laneInfo = element.getValue();
+      LaneInfo laneInfo = c.element().getValue();
       TableRow row =
           new TableRow()
-              .set("station_id", element.getKey())
+              .set("station_id", c.element().getKey())
               .set("direction", laneInfo.getDirection())
               .set("freeway", laneInfo.getFreeway())
               .set("lane_max_flow", laneInfo.getLaneFlow())
@@ -292,8 +284,8 @@
               .set("avg_speed", laneInfo.getLaneAS())
               .set("total_flow", laneInfo.getTotalFlow())
               .set("recorded_timestamp", laneInfo.getRecordedTimestamp())
-              .set("window_timestamp", timestamp.toString());
-      receiver.output(row);
+              .set("window_timestamp", c.timestamp().toString());
+      c.output(row);
     }
 
     /** Defines the BigQuery schema used for the output. */
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 38ba432..9584156 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -63,9 +63,6 @@
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -190,13 +187,12 @@
         DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
 
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<String> receiver)
-        throws Exception {
-      String[] items = element.split(",");
+    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
+      String[] items = c.element().split(",");
       String timestamp = tryParseTimestamp(items);
       if (timestamp != null) {
         try {
-          receiver.outputWithTimestamp(element, new Instant(dateTimeFormat.parseMillis(timestamp)));
+          c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
         } catch (IllegalArgumentException e) {
           // Skip the invalid input.
         }
@@ -211,11 +207,8 @@
   static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
 
     @ProcessElement
-    public void processElement(
-        @Timestamp Instant timestamp,
-        @Element String element,
-        OutputReceiver<KV<String, StationSpeed>> receiver) {
-      String[] items = element.split(",");
+    public void processElement(ProcessContext c) {
+      String[] items = c.element().split(",");
       String stationType = tryParseStationType(items);
       // For this analysis, use only 'main line' station types
       if ("ML".equals(stationType)) {
@@ -223,10 +216,11 @@
         String stationId = tryParseStationId(items);
         // For this simple example, filter out everything but some hardwired routes.
         if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
-          StationSpeed stationSpeed = new StationSpeed(stationId, avgSpeed, timestamp.getMillis());
+          StationSpeed stationSpeed =
+              new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis());
           // The tuple key is the 'route' name stored in the 'sdStations' hash.
           KV<String, StationSpeed> outputValue = KV.of(sdStations.get(stationId), stationSpeed);
-          receiver.output(outputValue);
+          c.output(outputValue);
         }
       }
     }
@@ -240,16 +234,13 @@
    */
   static class GatherStats extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
     @ProcessElement
-    public void processElement(
-        @Element KV<String, Iterable<StationSpeed>> element,
-        OutputReceiver<KV<String, RouteInfo>> receiver)
-        throws IOException {
-      String route = element.getKey();
+    public void processElement(ProcessContext c) throws IOException {
+      String route = c.element().getKey();
       double speedSum = 0.0;
       int speedCount = 0;
       int speedups = 0;
       int slowdowns = 0;
-      List<StationSpeed> infoList = Lists.newArrayList(element.getValue());
+      List<StationSpeed> infoList = Lists.newArrayList(c.element().getValue());
       // StationSpeeds sort by embedded timestamp.
       Collections.sort(infoList);
       Map<String, Double> prevSpeeds = new HashMap<>();
@@ -277,25 +268,22 @@
       double speedAvg = speedSum / speedCount;
       boolean slowdownEvent = slowdowns >= 2 * speedups;
       RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
-      receiver.output(KV.of(route, routeInfo));
+      c.output(KV.of(route, routeInfo));
     }
   }
 
   /** Format the results of the slowdown calculations to a TableRow, to save to BigQuery. */
   static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
     @ProcessElement
-    public void processElement(
-        @Element KV<String, RouteInfo> element,
-        @Timestamp Instant timestamp,
-        OutputReceiver<TableRow> receiver) {
-      RouteInfo routeInfo = element.getValue();
+    public void processElement(ProcessContext c) {
+      RouteInfo routeInfo = c.element().getValue();
       TableRow row =
           new TableRow()
               .set("avg_speed", routeInfo.getAvgSpeed())
               .set("slowdown_event", routeInfo.getSlowdownEvent())
-              .set("route", element.getKey())
-              .set("window_timestamp", timestamp.toString());
-      receiver.output(row);
+              .set("route", c.element().getKey())
+              .set("window_timestamp", c.timestamp().toString());
+      c.output(row);
     }
 
     /** Defines the BigQuery schema used for the output. */
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
index 6e5321d..cf097c8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
@@ -37,9 +37,6 @@
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -202,24 +199,20 @@
 
     @ProcessElement
     @SuppressWarnings("argument")
-    public void process(
-        @Element KV<Integer, Iterable<Row>> element,
-        OutputReceiver<Row> mainReceiver,
-        MultiOutputReceiver multiReceiver) {
+    public void process(@Element KV<Integer, Iterable<Row>> element, ProcessContext context) {
       Iterable<Row> rows = element.getValue();
 
       try {
         for (Row outputRow : getTokenizedRow(rows)) {
-          mainReceiver.output(outputRow);
+          context.output(outputRow);
         }
       } catch (Exception e) {
         for (Row outputRow : rows) {
-          multiReceiver
-              .get(failureTag)
-              .output(
-                  FailsafeElement.of(outputRow, outputRow)
-                      .setErrorMessage(e.getMessage())
-                      .setStacktrace(Throwables.getStackTraceAsString(e)));
+          context.output(
+              failureTag,
+              FailsafeElement.of(outputRow, outputRow)
+                  .setErrorMessage(e.getMessage())
+                  .setStacktrace(Throwables.getStackTraceAsString(e)));
         }
       }
     }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java
index 667e26f..fe8f4c1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java
@@ -26,8 +26,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
@@ -92,8 +90,9 @@
   public static class RowToTableRowFn extends DoFn<Row, TableRow> {
 
     @ProcessElement
-    public void processElement(@Element Row row, OutputReceiver<TableRow> receiver) {
-      receiver.output(BigQueryUtils.toTableRow(row));
+    public void processElement(ProcessContext context) {
+      Row row = context.element();
+      context.output(BigQueryUtils.toTableRow(row));
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java
index 435620a..d7d1c3e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java
@@ -75,10 +75,8 @@
 
     @ProcessElement
     public void processElement(
-        @Element Row in,
-        OutputReceiver<KV<ByteString, Iterable<Mutation>>> out,
-        PipelineOptions pipelineOptions) {
-      DataTokenizationOptions options = pipelineOptions.as(DataTokenizationOptions.class);
+        @Element Row in, OutputReceiver<KV<ByteString, Iterable<Mutation>>> out, ProcessContext c) {
+      DataTokenizationOptions options = c.getPipelineOptions().as(DataTokenizationOptions.class);
       // Mapping every field in provided Row to Mutation.SetCell, which will create/update
       // cell content with provided data
       Set<Mutation> mutations =
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java
index 9fead4e..d827e4b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java
@@ -264,6 +264,8 @@
     @Override
     public PCollectionTuple expand(PCollectionTuple lines) {
 
+      PCollectionView<String> headersView = null;
+
       // Convert csv lines into Failsafe elements so that we can recover over multiple transforms.
       PCollection<FailsafeElement<String, String>> lineFailsafeElements =
           lines
@@ -283,14 +285,16 @@
 
         return lineFailsafeElements.apply(
             "LineToDocumentUsingSchema",
-            ParDo.of(new FailsafeElementToJsonFn(schema, delimiter(), udfDeadletterTag()))
+            ParDo.of(
+                    new FailsafeElementToJsonFn(
+                        headersView, schema, delimiter(), udfDeadletterTag()))
                 .withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag())));
       }
 
       // Run if using headers
-      PCollectionView<String> headersView =
-          lines.get(headerTag()).apply(Sample.any(1)).apply(View.asSingleton());
+      headersView = lines.get(headerTag()).apply(Sample.any(1)).apply(View.asSingleton());
 
+      PCollectionView<String> finalHeadersView = headersView;
       lines
           .get(headerTag())
           .apply(
@@ -298,24 +302,23 @@
               ParDo.of(
                       new DoFn<String, String>() {
                         @ProcessElement
-                        public void processElement(
-                            @SideInput("finalHeadersView") String headers,
-                            @Element String element) {
-                          if (!element.equals(headers)) {
+                        public void processElement(ProcessContext c) {
+                          String headers = c.sideInput(finalHeadersView);
+                          if (!c.element().equals(headers)) {
                             LOG.error("Headers do not match, consistency cannot be guaranteed");
                             throw new RuntimeException(
                                 "Headers do not match, consistency cannot be guaranteed");
                           }
                         }
                       })
-                  .withSideInput("finalHeadersView", headersView));
+                  .withSideInputs(finalHeadersView));
 
       return lineFailsafeElements.apply(
           "LineToDocumentWithHeaders",
           ParDo.of(
-                  new FailsafeElementToJsonWithHeadersFn(
-                      jsonSchemaPath(), delimiter(), udfDeadletterTag()))
-              .withSideInput("finalHeadersView", headersView)
+                  new FailsafeElementToJsonFn(
+                      headersView, jsonSchemaPath(), delimiter(), udfDeadletterTag()))
+              .withSideInputs(headersView)
               .withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag())));
     }
 
@@ -354,90 +357,45 @@
     @Nullable public final String jsonSchema;
     public final String delimiter;
     public final TupleTag<FailsafeElement<String, String>> udfDeadletterTag;
+    @Nullable private final PCollectionView<String> headersView;
     private Counter successCounter =
         Metrics.counter(FailsafeElementToJsonFn.class, SUCCESSFUL_TO_JSON_COUNTER);
     private Counter failedCounter =
         Metrics.counter(FailsafeElementToJsonFn.class, FAILED_TO_JSON_COUNTER);
 
     FailsafeElementToJsonFn(
+        PCollectionView<String> headersView,
         String jsonSchema,
         String delimiter,
         TupleTag<FailsafeElement<String, String>> udfDeadletterTag) {
+      this.headersView = headersView;
       this.jsonSchema = jsonSchema;
       this.delimiter = delimiter;
       this.udfDeadletterTag = udfDeadletterTag;
     }
 
     @ProcessElement
-    public void processElement(
-        @Element FailsafeElement<String, String> element,
-        OutputReceiver<FailsafeElement<String, String>> receiver,
-        MultiOutputReceiver multiReceiver) {
-      List<String> header = null;
-      List<String> record = Arrays.asList(element.getOriginalPayload().split(this.delimiter));
-
-      try {
-        String json = buildJsonString(header, record, this.jsonSchema);
-        receiver.output(FailsafeElement.of(element.getOriginalPayload(), json));
-        successCounter.inc();
-      } catch (Exception e) {
-        failedCounter.inc();
-        multiReceiver
-            .get(this.udfDeadletterTag)
-            .output(
-                FailsafeElement.of(element)
-                    .setErrorMessage(e.getMessage())
-                    .setStacktrace(Throwables.getStackTraceAsString(e)));
-      }
-    }
-  }
-
-  public static class FailsafeElementToJsonWithHeadersFn
-      extends DoFn<FailsafeElement<String, String>, FailsafeElement<String, String>> {
-
-    @Nullable public final String jsonSchema;
-    public final String delimiter;
-    public final TupleTag<FailsafeElement<String, String>> udfDeadletterTag;
-    private Counter successCounter =
-        Metrics.counter(FailsafeElementToJsonWithHeadersFn.class, SUCCESSFUL_TO_JSON_COUNTER);
-    private Counter failedCounter =
-        Metrics.counter(FailsafeElementToJsonWithHeadersFn.class, FAILED_TO_JSON_COUNTER);
-
-    FailsafeElementToJsonWithHeadersFn(
-        String jsonSchema,
-        String delimiter,
-        TupleTag<FailsafeElement<String, String>> udfDeadletterTag) {
-      this.jsonSchema = jsonSchema;
-      this.delimiter = delimiter;
-      this.udfDeadletterTag = udfDeadletterTag;
-    }
-
-    @ProcessElement
-    public void processElement(
-        @Element FailsafeElement<String, String> element,
-        OutputReceiver<FailsafeElement<String, String>> receiver,
-        MultiOutputReceiver multiReceiver,
-        @SideInput("finalHeadersView") String headersStr) {
+    public void processElement(ProcessContext context) {
+      FailsafeElement<String, String> element = context.element();
       List<String> header = null;
 
-      if (headersStr != null) {
-        header = Arrays.asList(headersStr.split(this.delimiter));
+      if (this.headersView != null) {
+        header = Arrays.asList(context.sideInput(this.headersView).split(this.delimiter));
       }
 
       List<String> record = Arrays.asList(element.getOriginalPayload().split(this.delimiter));
 
       try {
         String json = buildJsonString(header, record, this.jsonSchema);
-        receiver.output(FailsafeElement.of(element.getOriginalPayload(), json));
+        context.output(FailsafeElement.of(element.getOriginalPayload(), json));
         successCounter.inc();
       } catch (Exception e) {
         failedCounter.inc();
-        multiReceiver
-            .get(this.udfDeadletterTag)
-            .output(
-                FailsafeElement.of(element)
-                    .setErrorMessage(e.getMessage())
-                    .setStacktrace(Throwables.getStackTraceAsString(e)));
+        context.output(
+            this.udfDeadletterTag,
+            FailsafeElement.of(element)
+                .setErrorMessage(e.getMessage())
+                .setStacktrace(Throwables.getStackTraceAsString(e)));
       }
     }
   }
@@ -449,9 +407,9 @@
   static class LineToFailsafeElementFn extends DoFn<String, FailsafeElement<String, String>> {
 
     @ProcessElement
-    public void processElement(
-        @Element String message, OutputReceiver<FailsafeElement<String, String>> receiver) {
-      receiver.output(FailsafeElement.of(message, message));
+    public void processElement(ProcessContext context) {
+      String message = context.element();
+      context.output(FailsafeElement.of(message, message));
     }
   }
 
@@ -552,12 +510,13 @@
     }
 
     @ProcessElement
-    public void processElement(@Element ReadableFile file, MultiOutputReceiver outputReceiver) {
+    public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) {
+      ReadableFile f = context.element();
       String headers;
       List<String> records = null;
       String delimiter = String.valueOf(this.csvFormat.getDelimiter());
       try {
-        String csvFileString = file.readFullyAsUTF8String();
+        String csvFileString = f.readFullyAsUTF8String();
         StringReader reader = new StringReader(csvFileString);
         CSVParser parser = CSVParser.parse(reader, this.csvFormat.withFirstRecordAsHeader());
         records =
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
index 99981cb..01377ad 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
@@ -29,9 +29,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -46,7 +43,6 @@
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Duration;
-import org.joda.time.Instant;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
@@ -127,17 +123,16 @@
     }
 
     @ProcessElement
-    public void processElement(
-        @Element FailsafeElement<String, String> failsafeElement,
-        @Timestamp Instant timestamp,
-        OutputReceiver<String> receiver) {
+    public void processElement(ProcessContext context) {
+      FailsafeElement<String, String> failsafeElement = context.element();
       ArrayList<String> outputRow = new ArrayList<>();
       final String message = failsafeElement.getOriginalPayload();
 
       // Format the timestamp for insertion
-      String timestampStr = TIMESTAMP_FORMATTER.print(timestamp.toDateTime(DateTimeZone.UTC));
+      String timestamp =
+          TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
 
-      outputRow.add(timestampStr);
+      outputRow.add(timestamp);
       outputRow.add(MoreObjects.firstNonNull(failsafeElement.getErrorMessage(), ""));
 
       // Only set the payload if it's populated on the message.
@@ -145,7 +140,7 @@
         outputRow.add(message);
       }
 
-      receiver.output(String.join(csvDelimiter, outputRow));
+      context.output(String.join(csvDelimiter, outputRow));
     }
   }
 
@@ -203,20 +198,19 @@
         DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
 
     @ProcessElement
-    public void processElement(
-        @Timestamp Instant timestamp,
-        @Element FailsafeElement<String, String> failsafeElement,
-        OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext context) {
+      FailsafeElement<String, String> failsafeElement = context.element();
       final String message = failsafeElement.getOriginalPayload();
 
       // Format the timestamp for insertion
-      String timestampStr = TIMESTAMP_FORMATTER.print(timestamp.toDateTime(DateTimeZone.UTC));
+      String timestamp =
+          TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
 
       // Build the table row
       @SuppressWarnings("nullness") // TableRow.set not annotated but does accept nulls
       final TableRow failedRow =
           new TableRow()
-              .set("timestamp", timestampStr)
+              .set("timestamp", timestamp)
               .set("errorMessage", failsafeElement.getErrorMessage())
               .set("stacktrace", failsafeElement.getStacktrace());
 
@@ -227,7 +221,7 @@
             .set("payloadBytes", message.getBytes(StandardCharsets.UTF_8));
       }
 
-      receiver.output(failedRow);
+      context.output(failedRow);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 0018201..a3ed04b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -34,9 +34,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.Mean;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -129,23 +126,21 @@
                             Metrics.counter("main", "SpammerUsers");
 
                         @ProcessElement
-                        public void processElement(
-                            @SideInput("globalMeanScore") Double gmc,
-                            @Element KV<String, Integer> element,
-                            OutputReceiver<KV<String, Integer>> receiver) {
-                          Integer score = element.getValue();
+                        public void processElement(ProcessContext c) {
+                          Integer score = c.element().getValue();
+                          Double gmc = c.sideInput(globalMeanScore);
                           if (score > (gmc * SCORE_WEIGHT)) {
                             LOG.info(
                                 "user {} spammer score {} with mean {}",
-                                element.getKey(),
+                                c.element().getKey(),
                                 score,
                                 gmc);
                             numSpammerUsers.inc();
-                            receiver.output(element);
+                            c.output(c.element());
                           }
                         }
                       })
-                  .withSideInput("globalMeanScore", globalMeanScore));
+                  .withSideInputs(globalMeanScore));
       return filtered;
     }
   }
@@ -154,10 +149,10 @@
   /** Calculate and output an element's session duration. */
   private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer> {
     @ProcessElement
-    public void processElement(BoundedWindow window, OutputReceiver<Integer> receiver) {
+    public void processElement(ProcessContext c, BoundedWindow window) {
       IntervalWindow w = (IntervalWindow) window;
       int duration = new Duration(w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
-      receiver.output(duration);
+      c.output(duration);
     }
   }
 
@@ -197,21 +192,22 @@
       configureWindowedWrite() {
     Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
     tableConfigure.put(
-        "team", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey()));
+        "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
     tableConfigure.put(
-        "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue()));
+        "total_score",
+        new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
     tableConfigure.put(
         "window_start",
         new WriteToBigQuery.FieldInfo<>(
             "STRING",
-            (e, w, t, p) -> {
+            (c, w) -> {
               IntervalWindow window = (IntervalWindow) w;
               return GameConstants.DATE_TIME_FORMATTER.print(window.start());
             }));
     tableConfigure.put(
         "processing_time",
         new WriteToBigQuery.FieldInfo<>(
-            "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
+            "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
     return tableConfigure;
   }
 
@@ -226,12 +222,12 @@
         "window_start",
         new WriteToBigQuery.FieldInfo<>(
             "STRING",
-            (e, w, t, p) -> {
+            (c, w) -> {
               IntervalWindow window = (IntervalWindow) w;
               return GameConstants.DATE_TIME_FORMATTER.print(window.start());
             }));
     tableConfigure.put(
-        "mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (e, w, t, p) -> e));
+        "mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (c, w) -> c.element()));
     return tableConfigure;
   }
 
@@ -292,17 +288,14 @@
             ParDo.of(
                     new DoFn<GameActionInfo, GameActionInfo>() {
                       @ProcessElement
-                      public void processElement(
-                          @SideInput("spammersView") Map<String, Integer> spammers,
-                          @Element GameActionInfo element,
-                          OutputReceiver<GameActionInfo> receiver) {
+                      public void processElement(ProcessContext c) {
                         // If the user is not in the spammers Map, output the data element.
-                        if (spammers.get(element.getUser().trim()) == null) {
-                          receiver.output(element);
+                        if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
+                          c.output(c.element());
                         }
                       }
                     })
-                .withSideInput("spammersView", spammersView))
+                .withSideInputs(spammersView))
         // Extract and sum teamname/score pairs from the event data.
         .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
         // [END DocInclude_FilterAndCalc]
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index a94f699..c57d9ba 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -112,11 +112,11 @@
    */
   protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>> configureOutput() {
     Map<String, WriteToText.FieldFn<KV<String, Integer>>> config = new HashMap<>();
-    config.put("team", (e, w, t, p) -> e.getKey());
-    config.put("total_score", (e, w, t, p) -> e.getValue());
+    config.put("team", (c, w) -> c.element().getKey());
+    config.put("total_score", (c, w) -> c.element().getValue());
     config.put(
         "window_start",
-        (e, w, t, p) -> {
+        (c, w) -> {
           IntervalWindow window = (IntervalWindow) w;
           return GameConstants.DATE_TIME_FORMATTER.print(window.start());
         });
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 3750b33..832c0ad 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -135,24 +135,25 @@
 
     Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
     tableConfigure.put(
-        "team", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey()));
+        "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
     tableConfigure.put(
-        "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue()));
+        "total_score",
+        new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
     tableConfigure.put(
         "window_start",
         new WriteToBigQuery.FieldInfo<>(
             "STRING",
-            (e, w, t, p) -> {
+            (c, w) -> {
               IntervalWindow window = (IntervalWindow) w;
               return GameConstants.DATE_TIME_FORMATTER.print(window.start());
             }));
     tableConfigure.put(
         "processing_time",
         new WriteToBigQuery.FieldInfo<>(
-            "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
+            "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
     tableConfigure.put(
         "timing",
-        new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> p.getTiming().toString()));
+        new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.pane().getTiming().toString()));
     return tableConfigure;
   }
 
@@ -164,9 +165,10 @@
       configureBigQueryWrite() {
     Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
     tableConfigure.put(
-        "user", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey()));
+        "user", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
     tableConfigure.put(
-        "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue()));
+        "total_score",
+        new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
     return tableConfigure;
   }
 
@@ -182,7 +184,7 @@
     tableConfigure.put(
         "processing_time",
         new WriteToBigQuery.FieldInfo<>(
-            "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
+            "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
     return tableConfigure;
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
index 3caa1e6..b28db26 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java
@@ -37,8 +37,6 @@
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -103,12 +101,12 @@
   private static Map<String, FieldInfo<KV<String, Integer>>> configureCompleteWindowedTableWrite() {
 
     Map<String, FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
-    tableConfigure.put("team", new FieldInfo<>("STRING", (e, w, t, p) -> e.getKey()));
-    tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue()));
+    tableConfigure.put("team", new FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
+    tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
     tableConfigure.put(
         "processing_time",
         new FieldInfo<>(
-            "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
+            "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
     return tableConfigure;
   }
 
@@ -205,11 +203,9 @@
      */
     @ProcessElement
     public void processElement(
-        @Element KV<String, GameActionInfo> element,
-        @StateId(TOTAL_SCORE) ValueState<Integer> totalScore,
-        OutputReceiver<KV<String, Integer>> receiver) {
-      String teamName = element.getKey();
-      GameActionInfo gInfo = element.getValue();
+        ProcessContext c, @StateId(TOTAL_SCORE) ValueState<Integer> totalScore) {
+      String teamName = c.element().getKey();
+      GameActionInfo gInfo = c.element().getValue();
 
       // ValueState cells do not contain a default value. If the state is possibly not written, make
       // sure to check for null on read.
@@ -222,7 +218,7 @@
       // the new total is 2002, and the threshold is 1000, 1999 / 1000 = 1, 2002 / 1000 = 2.
       // Therefore, this team passed the threshold.
       if (oldTotalScore / this.thresholdScore < totalScore.read() / this.thresholdScore) {
-        receiver.output(KV.of(teamName, totalScore.read()));
+        c.output(KV.of(teamName, totalScore.read()));
       }
     }
   }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 054ce7a..b30b466 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -34,8 +34,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -163,18 +161,18 @@
     private final Counter numParseErrors = Metrics.counter("main", "ParseErrors");
 
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<GameActionInfo> receiver) {
-      String[] components = element.split(",", -1);
+    public void processElement(ProcessContext c) {
+      String[] components = c.element().split(",", -1);
       try {
         String user = components[0].trim();
         String team = components[1].trim();
         Integer score = Integer.parseInt(components[2].trim());
         Long timestamp = Long.parseLong(components[3].trim());
         GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
-        receiver.output(gInfo);
+        c.output(gInfo);
       } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
         numParseErrors.inc();
-        LOG.info("Parse error on {}", element, e);
+        LOG.info("Parse error on {}", c.element(), e);
       }
     }
   }
@@ -234,8 +232,8 @@
    */
   protected static Map<String, WriteToText.FieldFn<KV<String, Integer>>> configureOutput() {
     Map<String, WriteToText.FieldFn<KV<String, Integer>>> config = new HashMap<>();
-    config.put("user", (e, w, t, p) -> e.getKey());
-    config.put("total_score", (e, w, t, p) -> e.getValue());
+    config.put("user", (c, w) -> c.element().getKey());
+    config.put("total_score", (c, w) -> c.element().getValue());
     return config;
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 5486025..eef4bc9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -30,16 +30,11 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Instant;
 
 /**
  * Generate, format, and write BigQuery table row information. Use provided information about the
@@ -69,11 +64,11 @@
   }
 
   /**
-   * A {@link Serializable} function from an element and {@link BoundedWindow} to the value for that
-   * field.
+   * A {@link Serializable} function from a {@link DoFn.ProcessContext} and {@link BoundedWindow} to
+   * the value for that field.
    */
   public interface FieldFn<InputT> extends Serializable {
-    Object apply(InputT element, BoundedWindow window, Instant timestamp, PaneInfo pane);
+    Object apply(DoFn<InputT, TableRow>.ProcessContext context, BoundedWindow window);
   }
 
   /** Define a class to hold information about output table field definitions. */
@@ -101,21 +96,16 @@
   protected class BuildRowFn extends DoFn<InputT, TableRow> {
 
     @ProcessElement
-    public void processElement(
-        @Element InputT element,
-        @Timestamp Instant timestamp,
-        PaneInfo pane,
-        BoundedWindow window,
-        OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c, BoundedWindow window) {
 
       TableRow row = new TableRow();
       for (Map.Entry<String, FieldInfo<InputT>> entry : fieldInfo.entrySet()) {
         String key = entry.getKey();
         FieldInfo<InputT> fcnInfo = entry.getValue();
         FieldFn<InputT> fcn = fcnInfo.getFieldFn();
-        row.set(key, fcn.apply(element, window, timestamp, pane));
+        row.set(key, fcn.apply(c, window));
       }
-      receiver.output(row);
+      c.output(row);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
index 3547116..330769e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -32,9 +32,6 @@
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -73,32 +70,26 @@
   }
 
   /**
-   * A {@link Serializable} function from an element and {@link BoundedWindow} to the value for that
-   * field.
+   * A {@link Serializable} function from a {@link DoFn.ProcessContext} and {@link BoundedWindow} to
+   * the value for that field.
    */
   public interface FieldFn<InputT> extends Serializable {
-    Object apply(
-        InputT element, BoundedWindow window, org.joda.time.Instant timestamp, PaneInfo pane);
+    Object apply(DoFn<InputT, String>.ProcessContext context, BoundedWindow window);
   }
 
   /** Convert each key/score pair into a row as specified by fieldFn. */
   protected class BuildRowFn extends DoFn<InputT, String> {
 
     @ProcessElement
-    public void processElement(
-        @Element InputT element,
-        @Timestamp org.joda.time.Instant timestamp,
-        PaneInfo pane,
-        BoundedWindow window,
-        OutputReceiver<String> receiver) {
+    public void processElement(ProcessContext c, BoundedWindow window) {
       List<String> fields = new ArrayList<>();
       for (Map.Entry<String, FieldFn<InputT>> entry : fieldFn.entrySet()) {
         String key = entry.getKey();
         FieldFn<InputT> fcn = entry.getValue();
-        fields.add(key + ": " + fcn.apply(element, window, timestamp, pane));
+        fields.add(key + ": " + fcn.apply(c, window));
       }
       String result = fields.stream().collect(Collectors.joining(", "));
-      receiver.output(result);
+      c.output(result);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 77a59f2..36fa18a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -24,12 +24,8 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 
@@ -48,20 +44,15 @@
   /** Convert each key/score pair into a BigQuery TableRow. */
   protected class BuildRowFn extends DoFn<T, TableRow> {
     @ProcessElement
-    public void processElement(
-        @Element T element,
-        @Timestamp org.joda.time.Instant timestamp,
-        PaneInfo pane,
-        BoundedWindow window,
-        OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c, BoundedWindow window) {
 
       TableRow row = new TableRow();
       for (Map.Entry<String, FieldInfo<T>> entry : fieldInfo.entrySet()) {
         String key = entry.getKey();
         FieldInfo<T> fcnInfo = entry.getValue();
-        row.set(key, fcnInfo.getFieldFn().apply(element, window, timestamp, pane));
+        row.set(key, fcnInfo.getFieldFn().apply(c, window));
       }
-      receiver.output(row);
+      c.output(row);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java
index d662cde..3b2653b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java
@@ -32,9 +32,6 @@
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -88,9 +85,10 @@
    */
   static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<Integer> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       if (Boolean.TRUE.equals(row.get("tornado"))) {
-        receiver.output(Integer.parseInt((String) row.get("month")));
+        c.output(Integer.parseInt((String) row.get("month")));
       }
     }
   }
@@ -101,16 +99,13 @@
    */
   static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
     @ProcessElement
-    public void processElement(
-        @Element KV<Integer, Long> element,
-        @Timestamp Instant timestamp,
-        OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c) {
       TableRow row =
           new TableRow()
-              .set("ts", timestamp.toString())
-              .set("month", element.getKey())
-              .set("tornado_count", element.getValue());
-      receiver.output(row);
+              .set("ts", c.timestamp().toString())
+              .set("month", c.element().getKey())
+              .set("tornado_count", c.element().getValue());
+      c.output(row);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 67e0661..43d720e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -32,8 +32,6 @@
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -81,9 +79,10 @@
    */
   static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<Integer> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       if ((Boolean) row.get("tornado")) {
-        receiver.output(Integer.parseInt((String) row.get("month")));
+        c.output(Integer.parseInt((String) row.get("month")));
       }
     }
   }
@@ -94,11 +93,12 @@
    */
   static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
     @ProcessElement
-    public void processElement(
-        @Element KV<Integer, Long> element, OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c) {
       TableRow row =
-          new TableRow().set("month", element.getKey()).set("tornado_count", element.getValue());
-      receiver.output(row);
+          new TableRow()
+              .set("month", c.element().getKey())
+              .set("tornado_count", c.element().getValue());
+      c.output(row);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 5f5b5d0..2a581d7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -33,8 +33,6 @@
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -80,11 +78,12 @@
     private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords");
 
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       String playName = (String) row.get("corpus");
       String word = (String) row.get("word");
       if (word.length() >= MIN_WORD_LENGTH) {
-        receiver.output(KV.of(word, playName));
+        c.output(KV.of(word, playName));
       } else {
         // Track how many smaller words we're not including. This information will be
         // visible in the Monitoring UI.
@@ -99,11 +98,10 @@
    */
   static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
     @ProcessElement
-    public void processElement(
-        @Element KV<String, String> element, OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c) {
       TableRow row =
-          new TableRow().set("word", element.getKey()).set("all_plays", element.getValue());
-      receiver.output(row);
+          new TableRow().set("word", c.element().getKey()).set("all_plays", c.element().getValue());
+      c.output(row);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 26a6659..9187bb8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -31,9 +31,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
 import org.apache.beam.sdk.transforms.Mean;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -90,7 +87,8 @@
    */
   static class ProjectionFn extends DoFn<TableRow, TableRow> {
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       // Grab year, month, day, mean_temp from the row
       Integer year = Integer.parseInt((String) row.get("year"));
       Integer month = Integer.parseInt((String) row.get("month"));
@@ -103,7 +101,7 @@
               .set("month", month)
               .set("day", day)
               .set("mean_temp", meanTemp);
-      receiver.output(outRow);
+      c.output(outRow);
     }
   }
 
@@ -121,11 +119,12 @@
     }
 
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       Integer month;
       month = (Integer) row.get("month");
       if (month.equals(this.monthFilter)) {
-        receiver.output(row);
+        c.output(row);
       }
     }
   }
@@ -136,9 +135,10 @@
    */
   static class ExtractTempFn extends DoFn<TableRow, Double> {
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<Double> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
-      receiver.output(meanTemp);
+      c.output(meanTemp);
     }
   }
 
@@ -178,17 +178,16 @@
               ParDo.of(
                       new DoFn<TableRow, TableRow>() {
                         @ProcessElement
-                        public void processElement(
-                            @SideInput("globalMeanTemp") Double gTemp,
-                            @Element TableRow element,
-                            OutputReceiver<TableRow> receiver) {
-                          Double meanTemp = Double.parseDouble(element.get("mean_temp").toString());
+                        public void processElement(ProcessContext c) {
+                          Double meanTemp =
+                              Double.parseDouble(c.element().get("mean_temp").toString());
+                          Double gTemp = c.sideInput(globalMeanTemp);
                           if (meanTemp < gTemp) {
-                            receiver.output(element);
+                            c.output(c.element());
                           }
                         }
                       })
-                  .withSideInput("globalMeanTemp", globalMeanTemp));
+                  .withSideInputs(globalMeanTemp));
 
       return filteredRows;
     }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index e6f8573..f78df0c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -26,8 +26,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -92,14 +90,13 @@
             ParDo.of(
                 new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
                   @ProcessElement
-                  public void processElement(
-                      @Element KV<String, CoGbkResult> element,
-                      OutputReceiver<KV<String, String>> receiver) {
-                    String countryCode = element.getKey();
-                    String countryName = element.getValue().getOnly(countryInfoTag);
-                    for (String eventInfo : element.getValue().getAll(eventInfoTag)) {
+                  public void processElement(ProcessContext c) {
+                    KV<String, CoGbkResult> e = c.element();
+                    String countryCode = e.getKey();
+                    String countryName = e.getValue().getOnly(countryInfoTag);
+                    for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
                       // Generate a string that combines information from both collection values
-                      receiver.output(
+                      c.output(
                           KV.of(
                               countryCode,
                               "Country name: " + countryName + ", Event info: " + eventInfo));
@@ -114,11 +111,10 @@
             ParDo.of(
                 new DoFn<KV<String, String>, String>() {
                   @ProcessElement
-                  public void processElement(
-                      @Element KV<String, String> element, OutputReceiver<String> receiver) {
+                  public void processElement(ProcessContext c) {
                     String outputstring =
-                        "Country code: " + element.getKey() + ", " + element.getValue();
-                    receiver.output(outputstring);
+                        "Country code: " + c.element().getKey() + ", " + c.element().getValue();
+                    c.output(outputstring);
                   }
                 }));
     return formattedResults;
@@ -130,13 +126,14 @@
    */
   static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       String countryCode = (String) row.get("ActionGeo_CountryCode");
       String sqlDate = (String) row.get("SQLDATE");
       String actor1Name = (String) row.get("Actor1Name");
       String sourceUrl = (String) row.get("SOURCEURL");
       String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
-      receiver.output(KV.of(countryCode, eventInfo));
+      c.output(KV.of(countryCode, eventInfo));
     }
   }
 
@@ -146,10 +143,11 @@
    */
   static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       String countryCode = (String) row.get("FIPSCC");
       String countryName = (String) row.get("HumanName");
-      receiver.output(KV.of(countryCode, countryName));
+      c.output(KV.of(countryCode, countryName));
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index 85df56a..8760d56 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -30,8 +30,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -75,22 +73,23 @@
    */
   static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> {
     @ProcessElement
-    public void processElement(
-        @Element TableRow row, OutputReceiver<KV<Integer, Double>> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       Integer month = Integer.parseInt((String) row.get("month"));
       Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
-      receiver.output(KV.of(month, meanTemp));
+      c.output(KV.of(month, meanTemp));
     }
   }
 
   /** Format the results to a TableRow, to save to BigQuery. */
   static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> {
     @ProcessElement
-    public void processElement(
-        @Element KV<Integer, Double> element, OutputReceiver<TableRow> receiver) {
+    public void processElement(ProcessContext c) {
       TableRow row =
-          new TableRow().set("month", element.getKey()).set("max_mean_temp", element.getValue());
-      receiver.output(row);
+          new TableRow()
+              .set("month", c.element().getKey())
+              .set("max_mean_temp", c.element().getValue());
+      c.output(row);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java
index 0c39c60..713af1d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java
@@ -26,8 +26,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -75,9 +73,10 @@
    */
   static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
     @ProcessElement
-    public void processElement(@Element TableRow row, OutputReceiver<Integer> receiver) {
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
       if ((Boolean) row.get("tornado")) {
-        receiver.output(Integer.parseInt((String) row.get("month")));
+        c.output(Integer.parseInt((String) row.get("month")));
       }
     }
   }
@@ -88,9 +87,8 @@
    */
   static class FormatCountsFn extends DoFn<KV<Integer, Long>, String> {
     @ProcessElement
-    public void processElement(
-        @Element KV<Integer, Long> element, OutputReceiver<String> receiver) {
-      receiver.output(element.getKey() + ": " + element.getValue());
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + ": " + c.element().getValue());
     }
   }
 
@@ -136,9 +134,9 @@
     }
 
     @ProcessElement
-    public void processElement(@Element T element, OutputReceiver<T> receiver) {
-      LOG.info("{}{}", prefix, element);
-      receiver.output(element);
+    public void processElement(ProcessContext c) {
+      LOG.info("{}{}", prefix, c.element());
+      c.output(c.element());
     }
   }
 }
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 5270077..cd3c6dd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -38,9 +38,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -49,7 +46,6 @@
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -368,18 +364,15 @@
                   new DoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {
 
                     @ProcessElement
-                    public void processElement(
-                        @Element KV<String, Iterable<Integer>> element,
-                        OutputReceiver<KV<String, String>> receiver)
-                        throws Exception {
-                      Iterable<Integer> flows = element.getValue();
+                    public void processElement(ProcessContext c) throws Exception {
+                      Iterable<Integer> flows = c.element().getValue();
                       Integer sum = 0;
                       Long numberOfRecords = 0L;
                       for (Integer value : flows) {
                         sum += value;
                         numberOfRecords++;
                       }
-                      receiver.output(KV.of(element.getKey(), sum + "," + numberOfRecords));
+                      c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords));
                     }
                   }));
       PCollection<TableRow> output = results.apply(ParDo.of(new FormatTotalFlow(triggerType)));
@@ -399,27 +392,21 @@
     }
 
     @ProcessElement
-    public void processElement(
-        PaneInfo pane,
-        @Timestamp Instant timestamp,
-        BoundedWindow window,
-        @Element KV<String, String> element,
-        OutputReceiver<TableRow> receiver)
-        throws Exception {
-      String[] values = element.getValue().split(",", -1);
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      String[] values = c.element().getValue().split(",", -1);
       TableRow row =
           new TableRow()
               .set("trigger_type", triggerType)
-              .set("freeway", element.getKey())
+              .set("freeway", c.element().getKey())
               .set("total_flow", Integer.parseInt(values[0]))
               .set("number_of_records", Long.parseLong(values[1]))
               .set("window", window.toString())
-              .set("isFirst", pane.isFirst())
-              .set("isLast", pane.isLast())
-              .set("timing", pane.getTiming().toString())
-              .set("event_time", timestamp.toString())
+              .set("isFirst", c.pane().isFirst())
+              .set("isLast", c.pane().isLast())
+              .set("timing", c.pane().getTiming().toString())
+              .set("event_time", c.timestamp().toString())
               .set("processing_time", Instant.now().toString());
-      receiver.output(row);
+      c.output(row);
     }
   }
 
@@ -431,9 +418,8 @@
     private static final int VALID_NUM_FIELDS = 50;
 
     @ProcessElement
-    public void processElement(
-        @Element String element, OutputReceiver<KV<String, Integer>> receiver) throws Exception {
-      String[] laneInfo = element.split(",", -1);
+    public void processElement(ProcessContext c) throws Exception {
+      String[] laneInfo = c.element().split(",", -1);
       if ("timestamp".equals(laneInfo[0])) {
         // Header row
         return;
@@ -449,7 +435,7 @@
       if (totalFlow == null || totalFlow <= 0) {
         return;
       }
-      receiver.output(KV.of(freeway, totalFlow));
+      c.output(KV.of(freeway, totalFlow));
     }
   }
 
@@ -523,8 +509,7 @@
     }
 
     @ProcessElement
-    public void processElement(@Element String element, OutputReceiver<String> receiver)
-        throws Exception {
+    public void processElement(ProcessContext c) throws Exception {
       Instant timestamp = Instant.now();
       if (random.nextDouble() < THRESHOLD) {
         int range = MAX_DELAY - MIN_DELAY;
@@ -532,7 +517,7 @@
         long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
         timestamp = new Instant(timestamp.getMillis() - delayInMillis);
       }
-      receiver.outputWithTimestamp(element, timestamp);
+      c.outputWithTimestamp(c.element(), timestamp);
     }
   }
 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 99aada2..4f24c69 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -76,10 +76,6 @@
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.SideInput;
-import org.apache.beam.sdk.transforms.DoFn.Timestamp;
 import org.apache.beam.sdk.transforms.Latest;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -544,14 +540,14 @@
             ParDo.of(
                 new DoFn<KV<String, CoGbkResult>, String>() {
                   @ProcessElement
-                  public void processElement(
-                      @Element KV<String, CoGbkResult> e, OutputReceiver<String> receiver) {
+                  public void processElement(ProcessContext c) {
+                    KV<String, CoGbkResult> e = c.element();
                     String name = e.getKey();
                     Iterable<String> emailsIter = e.getValue().getAll(emailsTag);
                     Iterable<String> phonesIter = e.getValue().getAll(phonesTag);
                     String formattedResult =
                         Snippets.formatCoGbkResults(name, emailsIter, phonesIter);
-                    receiver.output(formattedResult);
+                    c.output(formattedResult);
                   }
                 }));
     // [END CoGroupByKeyTuple]
@@ -646,23 +642,20 @@
                     new DoFn<Long, KV<Long, Long>>() {
 
                       @ProcessElement
-                      public void process(
-                          @Timestamp Instant timestamp,
-                          @Element Long element,
-                          @SideInput("mapIterable") Iterable<Map<String, String>> si,
-                          OutputReceiver<KV<Long, Long>> receiver) {
+                      public void process(ProcessContext c, @Timestamp Instant timestamp) {
+                        Iterable<Map<String, String>> si = c.sideInput(mapIterable);
                         // Take an element from the side input iterable (likely length 1)
                         Map<String, String> keyMap = si.iterator().next();
-                        receiver.outputWithTimestamp(KV.of(1L, element), Instant.now());
+                        c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
 
                         LOG.info(
                             "Value is {} with timestamp {}, using key A from side input with time {}.",
-                            element,
+                            c.element(),
                             timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")),
                             keyMap.get("Key_A"));
                       }
                     })
-                .withSideInput("mapIterable", mapIterable));
+                .withSideInputs(mapIterable));
 
     p.run();
   }
@@ -708,9 +701,9 @@
 
                   // Define the DoFn that logs the ValueProvider value.
                   @ProcessElement
-                  public void process(PipelineOptions options) {
+                  public void process(ProcessContext c) {
 
-                    MyOptions ops = options.as(MyOptions.class);
+                    MyOptions ops = c.getPipelineOptions().as(MyOptions.class);
                     // This example logs the ValueProvider value, but you could store it by
                     // pushing it to an external database.
 
@@ -950,13 +943,11 @@
               ParDo.of(
                       new DoFn<Instant, Long>() {
                         @ProcessElement
-                        public void process(
-                            @SideInput("sideInput") List<Long> sideInputValue,
-                            OutputReceiver<Long> receiver) {
-                          receiver.output((long) sideInputValue.size());
+                        public void process(ProcessContext c) {
+                          c.output((long) c.sideInput(sideInput).size());
                         }
                       })
-                  .withSideInput("sideInput", sideInput));
+                  .withSideInputs(sideInput));
       // [END PeriodicallyUpdatingSideInputs]
       return result;
     }
@@ -1197,10 +1188,7 @@
     private static class BundleFinalizationDoFn extends DoFn<String, Integer> {
       // [START BundleFinalize]
       @ProcessElement
-      public void processElement(
-          @Element String element,
-          OutputReceiver<Integer> receiver,
-          BundleFinalizer bundleFinalizer) {
+      public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) {
         // ... produce output ...
 
         bundleFinalizer.afterBundleCommit(
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
index b7fb7b8..4aa20fc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
@@ -28,8 +28,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
@@ -86,13 +84,11 @@
     }
 
     @ProcessElement
-    public void processElement(
-        @Element KV<String, String> element, OutputReceiver<KV<String, String>> receiver)
-        throws Exception {
+    public void processElement(ProcessContext c) throws Exception {
       try {
         // Our Library takes a single command in position 0 which it will echo back in the result
         SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
-        Command command = new Command(0, String.valueOf(element.getValue()));
+        Command command = new Command(0, String.valueOf(c.element().getValue()));
         commands.putCommand(command);
 
         // The ProcessingKernel deals with the execution of the process
@@ -101,7 +97,7 @@
         // Run the command and work through the results
         List<String> results = kernel.exec(commands);
         for (String s : results) {
-          receiver.output(KV.of(element.getKey(), s));
+          c.output(KV.of(c.element().getKey(), s));
         }
       } catch (Exception ex) {
         LOG.error("Error processing element ", ex);
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index 0d9e257..19c83c6 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -29,8 +29,6 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -148,8 +146,8 @@
 
   static class FormatResults extends DoFn<TableRow, String> {
     @ProcessElement
-    public void processElement(@Element TableRow element, OutputReceiver<String> receiver)
-        throws Exception {
+    public void processElement(ProcessContext c) throws Exception {
+      TableRow element = c.element();
       TableRow row =
           new TableRow()
               .set("trigger_type", element.get("trigger_type"))
@@ -160,7 +158,7 @@
               .set("isLast", element.get("isLast"))
               .set("timing", element.get("timing"))
               .set("window", element.get("window"));
-      receiver.output(canonicalFormat(row));
+      c.output(canonicalFormat(row));
     }
   }
 }
diff --git a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
index 9981b40..01055d9 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
@@ -41,8 +41,6 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Element;
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -146,13 +144,11 @@
     }
 
     @ProcessElement
-    public void processElement(
-        @Element KV<String, String> element, OutputReceiver<KV<String, String>> receiver)
-        throws Exception {
+    public void processElement(ProcessContext c) throws Exception {
       try {
         // Our Library takes a single command in position 0 which it will echo back in the result
         SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
-        Command command = new Command(0, String.valueOf(element.getValue()));
+        Command command = new Command(0, String.valueOf(c.element().getValue()));
         commands.putCommand(command);
 
         // The ProcessingKernel deals with the execution of the process
@@ -161,7 +157,7 @@
         // Run the command and work through the results
         List<String> results = kernel.exec(commands);
         for (String s : results) {
-          receiver.output(KV.of(element.getKey(), s));
+          c.output(KV.of(c.element().getKey(), s));
         }
       } catch (Exception ex) {
         LOG.error("Error processing element ", ex);
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
index 7be4df2..05cb404 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
@@ -25,8 +25,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory
 import org.apache.beam.sdk.testing.PAssert
 import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
 import org.apache.beam.sdk.transforms.ParDo
 import org.apache.beam.sdk.values.KV
 import org.slf4j.LoggerFactory
@@ -86,18 +84,18 @@
         private val unmatchedWords = Metrics.counter(FilterTextFn::class.java, "unmatchedWords")
 
         @ProcessElement
-        fun processElement(@Element element: KV<String, Long>, receiver: OutputReceiver<KV<String, Long>>) {
-            if (filter.matcher(element.key).matches()) {
+        fun processElement(c: ProcessContext) {
+            if (filter.matcher(c.element().key).matches()) {
                 // Log at the "DEBUG" level each element that we match. When executing this pipeline
                 // these log lines will appear only if the log level is set to "DEBUG" or lower.
-                LOG.debug("Matched: ${element.key}")
+                LOG.debug("Matched: ${c.element().key}")
                 matchedWords.inc()
-                receiver.output(element)
+                c.output(c.element())
             } else {
                 // Log at the "TRACE" level each element that is not matched. Different log levels
                 // can be used to control the verbosity of logging providing an effective mechanism
                 // to filter less important information.
-                LOG.trace("Did not match: ${element.key}")
+                LOG.trace("Did not match: ${c.element().key}")
                 unmatchedWords.inc()
             }
         }
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
index 8edc988..ec56bc6 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt
@@ -27,8 +27,6 @@
 import org.apache.beam.sdk.options.*
 import org.apache.beam.sdk.transforms.Count
 import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
 import org.apache.beam.sdk.transforms.PTransform
 import org.apache.beam.sdk.transforms.ParDo
 import org.apache.beam.sdk.values.KV
@@ -75,9 +73,10 @@
      */
     internal class ExtractTornadoesFn : DoFn<TableRow, Int>() {
         @ProcessElement
-        fun processElement(@Element row: TableRow, receiver: OutputReceiver<Int>) {
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
             if (row["tornado"] as Boolean) {
-                receiver.output(Integer.parseInt(row["month"] as String))
+                c.output(Integer.parseInt(row["month"] as String))
             }
         }
     }
@@ -88,11 +87,11 @@
      */
     internal class FormatCountsFn : DoFn<KV<Int, Long>, TableRow>() {
         @ProcessElement
-        fun processElement(@Element element: KV<Int, Long>, receiver: OutputReceiver<TableRow>) {
+        fun processElement(c: ProcessContext) {
             val row = TableRow()
-                    .set("month", element.key)
-                    .set("tornado_count", element.value)
-            receiver.output(row)
+                    .set("month", c.element().key)
+                    .set("tornado_count", c.element().value)
+            c.output(row)
         }
     }
 
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
index 4d89b69..9e38803 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt
@@ -26,8 +26,6 @@
 import org.apache.beam.sdk.metrics.Metrics
 import org.apache.beam.sdk.options.*
 import org.apache.beam.sdk.transforms.*
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
 import org.apache.beam.sdk.values.KV
 import org.apache.beam.sdk.values.PCollection
 
@@ -72,11 +70,12 @@
         private val smallerWords = Metrics.counter(ExtractLargeWordsFn::class.java, "smallerWords")
 
         @ProcessElement
-        fun processElement(@Element row: TableRow, receiver: OutputReceiver<KV<String, String>>) {
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
             val playName = row["corpus"] as String
             val word = row["word"] as String
             if (word.length >= MIN_WORD_LENGTH) {
-                receiver.output(KV.of(word, playName))
+                c.output(KV.of(word, playName))
             } else {
                 // Track how many smaller words we're not including. This information will be
                 // visible in the Monitoring UI.
@@ -91,9 +90,9 @@
      */
     internal class FormatShakespeareOutputFn : DoFn<KV<String, String>, TableRow>() {
         @ProcessElement
-        fun processElement(@Element element: KV<String, String>, receiver: OutputReceiver<TableRow>) {
-            val row = TableRow().set("word", element.key).set("all_plays", element.value)
-            receiver.output(row)
+        fun processElement(c: ProcessContext) {
+            val row = TableRow().set("word", c.element().key).set("all_plays", c.element().value)
+            c.output(row)
         }
     }
 
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
index 2181884..2625f5b 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt
@@ -25,9 +25,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
 import org.apache.beam.sdk.options.*
 import org.apache.beam.sdk.transforms.*
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
-import org.apache.beam.sdk.transforms.DoFn.SideInput
 import org.apache.beam.sdk.values.PCollection
 import java.util.logging.Logger
 
@@ -83,7 +80,8 @@
      */
     internal class ProjectionFn : DoFn<TableRow, TableRow>() {
         @ProcessElement
-        fun processElement(@Element row: TableRow, receiver: OutputReceiver<TableRow>) {
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
             // Grab year, month, day, mean_temp from the row
             val year = Integer.parseInt(row["year"] as String)
             val month = Integer.parseInt(row["month"] as String)
@@ -96,7 +94,7 @@
                     .set("month", month)
                     .set("day", day)
                     .set("mean_temp", meanTemp)
-            receiver.output(outRow)
+            c.output(outRow)
         }
     }
 
@@ -110,10 +108,11 @@
     internal class FilterSingleMonthDataFn(private var monthFilter: Int?) : DoFn<TableRow, TableRow>() {
 
         @ProcessElement
-        fun processElement(@Element row: TableRow, receiver: OutputReceiver<TableRow>) {
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
             val month = row["month"]
             if (month == this.monthFilter) {
-                receiver.output(row)
+                c.output(row)
             }
         }
     }
@@ -124,9 +123,10 @@
      */
     internal class ExtractTempFn : DoFn<TableRow, Double>() {
         @ProcessElement
-        fun processElement(@Element row: TableRow, receiver: OutputReceiver<Double>) {
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
             val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString())
-            receiver.output(meanTemp)
+            c.output(meanTemp)
         }
     }
 
@@ -158,17 +158,15 @@
                     ParDo.of(
                             object : DoFn<TableRow, TableRow>() {
                                 @ProcessElement
-                                fun processElement(
-                                        @SideInput("globalMeanTemp") gTemp: Double,
-                                        @Element element: TableRow,
-                                        receiver: OutputReceiver<TableRow>) {
-                                    val meanTemp = java.lang.Double.parseDouble(element["mean_temp"].toString())
+                                fun processElement(c: ProcessContext) {
+                                    val meanTemp = java.lang.Double.parseDouble(c.element()["mean_temp"].toString())
+                                    val gTemp = c.sideInput(globalMeanTemp)
                                     if (meanTemp < gTemp) {
-                                        receiver.output(element)
+                                        c.output(c.element())
                                     }
                                 }
                             })
-                            .withSideInput("globalMeanTemp", globalMeanTemp))
+                            .withSideInputs(globalMeanTemp))
         }
     }
 
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
index 7f81629..2f2215e 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt
@@ -26,8 +26,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory
 import org.apache.beam.sdk.options.Validation
 import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
 import org.apache.beam.sdk.transforms.ParDo
 import org.apache.beam.sdk.transforms.join.CoGbkResult
 import org.apache.beam.sdk.transforms.join.CoGroupByKey
@@ -91,14 +89,13 @@
                 ParDo.of(
                         object : DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
                             @ProcessElement
-                            fun processElement(
-                                    @Element element: KV<String, CoGbkResult>,
-                                    receiver: OutputReceiver<KV<String, String>>) {
-                                val countryCode = element.key
-                                val countryName = element.value.getOnly(countryInfoTag)
-                                for (ei in element.value.getAll(eventInfoTag)) {
+                            fun processElement(c: ProcessContext) {
+                                val e = c.element()
+                                val countryCode = e.key
+                                val countryName = e.value.getOnly(countryInfoTag)
+                                for (ei in c.element().value.getAll(eventInfoTag)) {
                                     // Generate a string that combines information from both collection values
-                                    receiver.output(
+                                    c.output(
                                             KV.of<String, String>(
                                                     countryCode,
                                                     "Country name: $countryName, Event info: $ei"))
@@ -112,9 +109,9 @@
                 ParDo.of(
                         object : DoFn<KV<String, String>, String>() {
                             @ProcessElement
-                            fun processElement(@Element element: KV<String, String>, receiver: OutputReceiver<String>) {
-                                val outputString = "Country code: ${element.key}, ${element.value}"
-                                receiver.output(outputString)
+                            fun processElement(c: ProcessContext) {
+                                val outputString = "Country code: ${c.element().key}, ${c.element().value}"
+                                c.output(outputString)
                             }
                         }))
     }
@@ -125,13 +122,14 @@
      */
     internal class ExtractEventDataFn : DoFn<TableRow, KV<String, String>>() {
         @ProcessElement
-        fun processElement(@Element row: TableRow, receiver: OutputReceiver<KV<String, String>>) {
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
             val countryCode = row["ActionGeo_CountryCode"] as String
             val sqlDate = row["SQLDATE"] as String
             val actor1Name = row["Actor1Name"] as String
             val sourceUrl = row["SOURCEURL"] as String
             val eventInfo = "Date: $sqlDate, Actor1: $actor1Name, url: $sourceUrl"
-            receiver.output(KV.of(countryCode, eventInfo))
+            c.output(KV.of(countryCode, eventInfo))
         }
     }
 
@@ -141,10 +139,11 @@
      */
     internal class ExtractCountryInfoFn : DoFn<TableRow, KV<String, String>>() {
         @ProcessElement
-        fun processElement(@Element row: TableRow, receiver: OutputReceiver<KV<String, String>>) {
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
             val countryCode = row["FIPSCC"] as String
             val countryName = row["HumanName"] as String
-            receiver.output(KV.of(countryCode, countryName))
+            c.output(KV.of(countryCode, countryName))
         }
     }
 
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
index ad16ecd..11418d3 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt
@@ -26,8 +26,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
 import org.apache.beam.sdk.options.*
 import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
 import org.apache.beam.sdk.transforms.Max
 import org.apache.beam.sdk.transforms.PTransform
 import org.apache.beam.sdk.transforms.ParDo
@@ -75,21 +73,22 @@
      */
     internal class ExtractTempFn : DoFn<TableRow, KV<Int, Double>>() {
         @ProcessElement
-        fun processElement(@Element row: TableRow, receiver: OutputReceiver<KV<Int, Double>>) {
+        fun processElement(c: ProcessContext) {
+            val row = c.element()
             val month = Integer.parseInt(row["month"] as String)
             val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString())
-            receiver.output(KV.of(month, meanTemp))
+            c.output(KV.of(month, meanTemp))
         }
     }
 
     /** Format the results to a TableRow, to save to BigQuery.  */
     internal class FormatMaxesFn : DoFn<KV<Int, Double>, TableRow>() {
         @ProcessElement
-        fun processElement(@Element element: KV<Int, Double>, receiver: OutputReceiver<TableRow>) {
+        fun processElement(c: ProcessContext) {
             val row = TableRow()
-                    .set("month", element.key)
-                    .set("max_mean_temp", element.value)
-            receiver.output(row)
+                    .set("month", c.element().key)
+                    .set("max_mean_temp", c.element().value)
+            c.output(row)
         }
     }
 
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
index bb8c090..4afa7d0 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt
@@ -34,9 +34,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory
 import org.apache.beam.sdk.options.StreamingOptions
 import org.apache.beam.sdk.transforms.DoFn
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
-import org.apache.beam.sdk.transforms.DoFn.Timestamp
 import org.apache.beam.sdk.transforms.GroupByKey
 import org.apache.beam.sdk.transforms.PTransform
 import org.apache.beam.sdk.transforms.ParDo
@@ -367,17 +364,15 @@
 
                                 @ProcessElement
                                 @Throws(Exception::class)
-                                fun processElement(
-                                        @Element element: KV<String, Iterable<Int>>,
-                                        receiver: OutputReceiver<KV<String, String>>) {
-                                    val flows = element.value
+                                fun processElement(c: ProcessContext) {
+                                    val flows = c.element().value
                                     var sum = 0
                                     var numberOfRecords = 0L
                                     for (value in flows) {
                                         sum += value
                                         numberOfRecords++
                                     }
-                                    receiver.output(KV.of<String, String>(element.key, "$sum,$numberOfRecords"))
+                                    c.output(KV.of<String, String>(c.element().key, "$sum,$numberOfRecords"))
                                 }
                             }))
             return results.apply(ParDo.of(FormatTotalFlow(triggerType)))
@@ -392,25 +387,20 @@
 
         @ProcessElement
         @Throws(Exception::class)
-        fun processElement(
-                @Element element: KV<String, String>,
-                window: BoundedWindow,
-                pane: PaneInfo,
-                @Timestamp timestamp: Instant,
-                receiver: OutputReceiver<TableRow>) {
-            val values = element.value.split(",".toRegex()).toTypedArray()
+        fun processElement(c: ProcessContext, window: BoundedWindow) {
+            val values = c.element().value.split(",".toRegex()).toTypedArray()
             val row = TableRow()
                     .set("trigger_type", triggerType)
-                    .set("freeway", element.key)
+                    .set("freeway", c.element().key)
                     .set("total_flow", Integer.parseInt(values[0]))
                     .set("number_of_records", java.lang.Long.parseLong(values[1]))
                     .set("window", window.toString())
-                    .set("isFirst", pane.isFirst)
-                    .set("isLast", pane.isLast)
-                    .set("timing", pane.timing.toString())
-                    .set("event_time", timestamp.toString())
+                    .set("isFirst", c.pane().isFirst)
+                    .set("isLast", c.pane().isLast)
+                    .set("timing", c.pane().timing.toString())
+                    .set("event_time", c.timestamp().toString())
                     .set("processing_time", Instant.now().toString())
-            receiver.output(row)
+            c.output(row)
         }
     }
 
@@ -422,8 +412,8 @@
 
         @ProcessElement
         @Throws(Exception::class)
-        fun processElement(@Element element: String, receiver: OutputReceiver<KV<String, Int>>) {
-            val laneInfo = element.split(",".toRegex()).toTypedArray()
+        fun processElement(c: ProcessContext) {
+            val laneInfo = c.element().split(",".toRegex()).toTypedArray()
             if ("timestamp" == laneInfo[0]) {
                 // Header row
                 return
@@ -439,7 +429,7 @@
             if (totalFlow == null || totalFlow <= 0) {
                 return
             }
-            receiver.output(KV.of<String, Int>(freeway, totalFlow))
+            c.output(KV.of<String, Int>(freeway, totalFlow))
         }
 
         companion object {
@@ -496,7 +486,7 @@
 
         @ProcessElement
         @Throws(Exception::class)
-        fun processElement(@Element element: String, receiver: OutputReceiver<String>) {
+        fun processElement(c: ProcessContext) {
             var timestamp = Instant.now()
             if (random.nextDouble() < THRESHOLD) {
                 val range = MAX_DELAY - MIN_DELAY
@@ -504,7 +494,7 @@
                 val delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes.toLong())
                 timestamp = Instant(timestamp.millis - delayInMillis)
             }
-            receiver.outputWithTimestamp(element, timestamp)
+            c.outputWithTimestamp(c.element(), timestamp)
         }
 
         companion object {
diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
index eb20993..d2f58c2 100644
--- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt
@@ -33,8 +33,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult
 import org.apache.beam.sdk.transforms.*
-import org.apache.beam.sdk.transforms.DoFn.Element
-import org.apache.beam.sdk.transforms.DoFn.OutputReceiver
 import org.apache.beam.sdk.transforms.join.CoGbkResult
 import org.apache.beam.sdk.transforms.join.CoGroupByKey
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
@@ -362,12 +360,13 @@
                 ParDo.of(
                         object : DoFn<KV<String, CoGbkResult>, String>() {
                             @ProcessElement
-                            fun processElement(@Element element: KV<String, CoGbkResult>, receiver: OutputReceiver<String>) {
-                                val name = element.key
-                                val emailsIter = element.value.getAll(emailsTag)
-                                val phonesIter = element.value.getAll(phonesTag)
+                            fun processElement(c: ProcessContext) {
+                                val e = c.element()
+                                val name = e.key
+                                val emailsIter = e.value.getAll(emailsTag)
+                                val phonesIter = e.value.getAll(phonesTag)
                                 val formattedResult = formatCoGbkResults(name, emailsIter, phonesIter)
-                                receiver.output(formattedResult)
+                                c.output(formattedResult)
                             }
                         }))
     }