Merge pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

diff --git a/CHANGES.md b/CHANGES.md
index e03466d..acc0201 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,7 +65,7 @@
 
 ## Breaking Changes
 
-* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* The Python SDK now requires `--job_endpoint` to be set when using `--runner=PortableRunner` ([BEAM-9860](https://issues.apache.org/jira/browse/BEAM-9860)). Users seeking the old default behavior should set `--runner=FlinkRunner` instead.
 
 ## Deprecations
 
diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle
index 234f3f6..90f5562 100644
--- a/buildSrc/build.gradle
+++ b/buildSrc/build.gradle
@@ -52,7 +52,7 @@
   runtime "ca.coglinc:javacc-gradle-plugin:2.4.0"                                                     // Enable the JavaCC parser generator
   runtime "net.linguica.gradle:maven-settings-plugin:0.5"
   runtime "gradle.plugin.io.pry.gradle.offline_dependencies:gradle-offline-dependencies-plugin:0.5.0" // Enable creating an offline repository
-  runtime "net.ltgt.gradle:gradle-errorprone-plugin:0.0.13"                                           // Enable errorprone Java static analysis
+  runtime "net.ltgt.gradle:gradle-errorprone-plugin:1.1.1"                                           // Enable errorprone Java static analysis
   runtime "org.ajoberstar.grgit:grgit-gradle:3.0.0"                                                   // Enable website git publish to asf-site branch
   runtime "com.avast.gradle:gradle-docker-compose-plugin:0.8.8"                                       // Enable docker compose tasks
   runtime "ca.cutterslade.gradle:gradle-dependency-analyze:1.3.1"                                     // Enable dep analysis
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 830cf47..9e89d8f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -447,7 +447,7 @@
         commons_io                                  : "commons-io:commons-io:2.6",
         commons_lang3                               : "org.apache.commons:commons-lang3:3.9",
         commons_math3                               : "org.apache.commons:commons-math3:3.6.1",
-        error_prone_annotations                     : "com.google.errorprone:error_prone_annotations:2.0.15",
+        error_prone_annotations                     : "com.google.errorprone:error_prone_annotations:2.3.1",
         gax                                         : "com.google.api:gax:$gax_version",
         gax_grpc                                    : "com.google.api:gax-grpc:$gax_version",
         google_api_client                           : "com.google.api-client:google-api-client:$google_clients_version",
@@ -693,10 +693,7 @@
         options.compilerArgs += ([
           '-parameters',
           '-Xlint:all',
-          '-Werror',
-          '-XepDisableWarningsInGeneratedCode',
-          '-XepExcludedPaths:(.*/)?(build/generated-src|build/generated.*avro-java|build/generated)/.*',
-          '-Xep:MutableConstantField:OFF' // Guava's immutable collections cannot appear on API surface.
+          '-Werror'
         ]
         + (defaultLintSuppressions + configuration.disableLintWarnings).collect { "-Xlint:-${it}" })
       }
@@ -873,8 +870,21 @@
       // Enable errorprone static analysis
       project.apply plugin: 'net.ltgt.errorprone'
 
+      project.dependencies {
+        errorprone("com.google.errorprone:error_prone_core:2.3.1")
+        // At least a JDK 9 compiler is required; JDK 8 can still be used with the additional errorproneJavac
+        // configuration below. For more details, see https://github.com/tbroyer/gradle-errorprone-plugin#jdk-8-support
+        errorproneJavac("com.google.errorprone:javac:9+181-r4173-1")
+      }
+
       project.configurations.errorprone { resolutionStrategy.force 'com.google.errorprone:error_prone_core:2.3.1' }
 
+      project.tasks.withType(JavaCompile) {
+        options.errorprone.disableWarningsInGeneratedCode = true
+        options.errorprone.excludedPaths = '(.*/)?(build/generated-src|build/generated.*avro-java|build/generated)/.*'
+        options.errorprone.errorproneArgs.add("-Xep:MutableConstantField:OFF") // Guava's immutable collections cannot appear on API surface.
+      }
+
       if (configuration.shadowClosure) {
         // Enables a plugin which can perform shading of classes. See the general comments
         // above about dependency management for Java projects and how the shadow plugin
@@ -1987,11 +1997,10 @@
       }
 
       def addPortableWordCountTask = { boolean isStreaming, String runner ->
-        def taskName = 'portableWordCount' + (runner.equals("PortableRunner") ? "" : runner) + (isStreaming ? 'Streaming' : 'Batch')
+        def taskName = 'portableWordCount' + runner + (isStreaming ? 'Streaming' : 'Batch')
         project.task(taskName) {
           dependsOn = ['installGcpTest']
           mustRunAfter = [
-            ':runners:flink:1.10:job-server-container:docker',
             ':runners:flink:1.10:job-server:shadowJar',
             ':runners:spark:job-server:shadowJar',
             ':sdks:python:container:py2:docker',
@@ -2045,8 +2054,6 @@
       }
       project.ext.addPortableWordCountTasks = {
         ->
-        addPortableWordCountTask(false, "PortableRunner")
-        addPortableWordCountTask(true, "PortableRunner")
         addPortableWordCountTask(false, "FlinkRunner")
         addPortableWordCountTask(true, "FlinkRunner")
         addPortableWordCountTask(false, "SparkRunner")
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index e100166..3fca68c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -51,10 +51,13 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.Watch;
@@ -68,7 +71,6 @@
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -785,4 +787,72 @@
 
     }
   }
+
+  public static class PeriodicallyUpdatingSideInputs {
+
+    public static PCollection<Long> main(
+        Pipeline p,
+        Instant startAt,
+        Instant stopAt,
+        Duration interval1,
+        Duration interval2,
+        String fileToRead) {
+      // [START PeriodicallyUpdatingSideInputs]
+      PCollectionView<List<Long>> sideInput =
+          p.apply(
+                  "SIImpulse",
+                  PeriodicImpulse.create()
+                      .startAt(startAt)
+                      .stopAt(stopAt)
+                      .withInterval(interval1)
+                      .applyWindowing())
+              .apply(
+                  "FileToRead",
+                  ParDo.of(
+                      new DoFn<Instant, String>() {
+                        @DoFn.ProcessElement
+                        public void process(@Element Instant notUsed, OutputReceiver<String> o) {
+                          o.output(fileToRead);
+                        }
+                      }))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches())
+              .apply(TextIO.readFiles())
+              .apply(
+                  ParDo.of(
+                      new DoFn<String, String>() {
+                        @ProcessElement
+                        public void process(@Element String src, OutputReceiver<String> o) {
+                          o.output(src);
+                        }
+                      }))
+              .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
+              .apply(View.asList());
+
+      PCollection<Instant> mainInput =
+          p.apply(
+              "MIImpulse",
+              PeriodicImpulse.create()
+                  .startAt(startAt.minus(Duration.standardSeconds(1)))
+                  .stopAt(stopAt.minus(Duration.standardSeconds(1)))
+                  .withInterval(interval2)
+                  .applyWindowing());
+
+      // Consume side input. PeriodicImpulse above generates test data.
+      // Use a real source (like PubSubIO or KafkaIO) in production.
+      PCollection<Long> result =
+          mainInput.apply(
+              "generateOutput",
+              ParDo.of(
+                      new DoFn<Instant, Long>() {
+                        @ProcessElement
+                        public void process(ProcessContext c) {
+                          c.output((long) c.sideInput(sideInput).size());
+                        }
+                      })
+                  .withSideInputs(sideInput));
+      // [END PeriodicallyUpdatingSideInputs]
+      return result;
+    }
+  }
 }
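The PeriodicallyUpdatingSideInputs snippet above is exercised end to end by the windowed side-input test added to SnippetsTest below, which drives both impulses at one-second intervals against a temporary single-line file.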
diff --git a/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java b/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
index 0cf53a3..b65840f 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java
@@ -17,23 +17,33 @@
  */
 package org.apache.beam.examples.snippets;
 
+import java.io.BufferedWriter;
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -143,4 +153,40 @@
 
     p.run();
   }
+
+  @Test
+  @Category({NeedsRunner.class, UsesStatefulParDo.class})
+  public void testSlowlyUpdatingSideInputsWindowed() {
+    Instant startAt = Instant.now().minus(Duration.standardMinutes(3));
+    Duration duration = Duration.standardSeconds(10);
+    Instant stopAt = startAt.plus(duration);
+    Duration interval1 = Duration.standardSeconds(1);
+    Duration interval2 = Duration.standardSeconds(1);
+
+    File f = null;
+    try {
+      f = File.createTempFile("testSlowlyUpdatingSIWindowed", "txt");
+      try (BufferedWriter fw = Files.newWriter(f, Charset.forName("UTF-8"))) {
+        fw.append("testdata");
+      }
+    } catch (IOException e) {
+      Assert.fail("failed to create temp file: " + e.toString());
+      throw new RuntimeException("Should never reach here");
+    }
+
+    PCollection<Long> result =
+        Snippets.PeriodicallyUpdatingSideInputs.main(
+            p, startAt, stopAt, interval1, interval2, f.getPath());
+
+    ArrayList<Long> expectedResults = new ArrayList<Long>();
+    expectedResults.add(0L);
+    for (long i = startAt.getMillis(); i < stopAt.getMillis(); i += interval2.getMillis()) {
+      expectedResults.add(1L);
+    }
+
+    PAssert.that(result).containsInAnyOrder(expectedResults);
+
+    p.run().waitUntilFinish();
+    f.deleteOnExit();
+  }
 }
diff --git a/model/job-management/src/main/proto/beam_expansion_api.proto b/model/job-management/src/main/proto/beam_expansion_api.proto
index e358c56..c2c9a72 100644
--- a/model/job-management/src/main/proto/beam_expansion_api.proto
+++ b/model/job-management/src/main/proto/beam_expansion_api.proto
@@ -30,6 +30,7 @@
 option java_outer_classname = "ExpansionApi";
 
 import "beam_runner_api.proto";
+import "google/protobuf/struct.proto";
 
 message ExpansionRequest {
   // Set of components needed to interpret the transform, or which
@@ -46,6 +47,9 @@
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // The pipeline options specified by the caller
+  google.protobuf.Struct pipeline_options = 4;
 }
 
 message ExpansionResponse {
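The new pipeline_options field lets callers of the expansion service forward their pipeline options alongside the transform to be expanded. A minimal caller-side sketch in Java, assuming the standard protobuf-generated builders for this file (the option key and value shown are illustrative, not defined by the proto):

import org.apache.beam.model.jobmanagement.v1.ExpansionApi.ExpansionRequest;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Value;

class ExpansionRequestSketch {
  // Build an ExpansionRequest that carries the caller's pipeline options as a Struct.
  static ExpansionRequest buildRequest() {
    Struct pipelineOptions =
        Struct.newBuilder()
            .putFields(
                "beam:option:experiments:v1", // illustrative key; the proto does not prescribe one
                Value.newBuilder().setStringValue("use_deprecated_read").build())
            .build();
    return ExpansionRequest.newBuilder()
        .setNamespace("external_1")
        .setPipelineOptions(pipelineOptions)
        .build();
  }
}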
diff --git a/release/src/main/scripts/build_release_candidate.sh b/release/src/main/scripts/build_release_candidate.sh
index 9842fc5..be5a71d 100755
--- a/release/src/main/scripts/build_release_candidate.sh
+++ b/release/src/main/scripts/build_release_candidate.sh
@@ -176,6 +176,7 @@
   cd sdks/python
   virtualenv ${LOCAL_PYTHON_VIRTUALENV}
   source ${LOCAL_PYTHON_VIRTUALENV}/bin/activate
+  pip install -r build-requirements.txt
   python setup.py sdist --format=zip
   cd dist
 
@@ -275,7 +276,7 @@
   git clone ${GIT_REPO_URL}
   cd ${BEAM_ROOT_DIR}
   git checkout ${RELEASE_BRANCH}
-  cd sdks/python && tox -e docs
+  cd sdks/python && pip install -r build-requirements.txt && tox -e py37-docs
   GENERATED_PYDOC=~/${LOCAL_WEBSITE_UPDATE_DIR}/${LOCAL_PYTHON_DOC}/${BEAM_ROOT_DIR}/sdks/python/target/docs/_build
   rm -rf ${GENERATED_PYDOC}/.doctrees
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 7fbfaf0..241b49d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -40,13 +40,13 @@
   Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create();
 
   /** Pending input watermark timers, in timestamp order. */
-  private NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
 
   /** Pending processing time timers, in timestamp order. */
-  private NavigableSet<TimerData> processingTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> processingTimers = new TreeSet<>();
 
   /** Pending synchronized processing time timers, in timestamp order. */
-  private NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
 
   /** Current input watermark. */
   private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -66,6 +66,11 @@
     return outputWatermarkTime;
   }
 
+  /** Returns true when there are still timers to be fired. */
+  public boolean hasPendingTimers() {
+    return !existingTimers.isEmpty();
+  }
+
   /**
    * Returns when the next timer in the given time domain will fire, or {@code null} if there are no
    * timers scheduled in that time domain.
@@ -160,9 +165,9 @@
   @Deprecated
   @Override
   public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
-    TimerData existing = existingTimers.get(namespace, timerId + '+' + timerFamilyId);
-    if (existing != null) {
-      deleteTimer(existing);
+    TimerData removedTimer = existingTimers.remove(namespace, timerId + '+' + timerFamilyId);
+    if (removedTimer != null) {
+      timersForDomain(removedTimer.getDomain()).remove(removedTimer);
     }
   }
 
@@ -170,10 +175,7 @@
   @Deprecated
   @Override
   public void deleteTimer(TimerData timer) {
-    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
-    existingTimers.remove(
-        timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId());
-    timersForDomain(timer.getDomain()).remove(timer);
+    deleteTimer(timer.getNamespace(), timer.getTimerId(), timer.getTimerFamilyId());
   }
 
   @Override
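The new hasPendingTimers() accessor exists so that batch runners can drain timers in a loop: firing a timer can register further timers, so a single pass over the pending timers is not enough. A rough sketch of the intended drain loop, using only methods that exist on InMemoryTimerInternals (the fire consumer is a stand-in for delivering the timer back to the SDK harness; the real call sites below open a new RemoteBundle per iteration instead):

  // Sketch: advance time to the end, then fire timers until none remain.
  static void drainTimers(
      InMemoryTimerInternals timerInternals,
      java.util.function.Consumer<TimerInternals.TimerData> fire)
      throws Exception {
    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
    while (timerInternals.hasPendingTimers()) {
      TimerInternals.TimerData timer;
      while ((timer = timerInternals.removeNextEventTimer()) != null) {
        fire.accept(timer); // may register or delete further timers via setTimer()/deleteTimer()
      }
      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
        fire.accept(timer);
      }
      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
        fire.accept(timer);
      }
    }
  }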
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index baead79..818bdb0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -156,11 +156,11 @@
     StateInternals stateInternals = stepContext.stateInternals();
     TimerInternals timerInternals = stepContext.timerInternals();
 
-    Instant outputWatermark =
+    Instant inputWatermark =
         MoreObjects.firstNonNull(
-            timerInternals.currentOutputWatermarkTime(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+            timerInternals.currentInputWatermarkTime(), BoundedWindow.TIMESTAMP_MIN_VALUE);
 
-    if (!outputWatermark.isAfter(
+    if (!inputWatermark.isAfter(
         value.getTimestamp().plus(windowingStrategy.getAllowedLateness()))) {
 
       StateNamespace namespace = StateNamespaces.window(windowCoder, window);
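Put differently: with allowed lateness L, an element with timestamp T keeps being processed while the input watermark is at or before T + L; once the input watermark passes that point the element is dropped and counted against the dropped-due-to-lateness counter, which is what the new StatefulDoFnRunnerTest case below asserts after advancing the input watermark.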
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
index db18f78..9695a20 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
@@ -28,11 +28,15 @@
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobMetricsResponse;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.util.JsonFormat;
 
 /**
  * Metrics containers by step.
@@ -173,7 +177,15 @@
 
   @Override
   public String toString() {
-    return asAttemptedOnlyMetricResults(this).toString();
+    JobApi.MetricResults results =
+        JobApi.MetricResults.newBuilder().addAllAttempted(getMonitoringInfos()).build();
+    GetJobMetricsResponse response = GetJobMetricsResponse.newBuilder().setMetrics(results).build();
+    try {
+      JsonFormat.Printer printer = JsonFormat.printer();
+      return printer.print(response);
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   private Iterable<MetricsContainerImpl> getMetricsContainers() {
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 6947a4c..7e72efc 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -133,6 +133,47 @@
     testOutput(true, (fn, output) -> createStatefulDoFnRunner(fn, output, false));
   }
 
+  @Test
+  public void testDataDroppedBasedOnInputWatermarkWhenOrdered() throws Exception {
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+    Instant timestamp = new Instant(0);
+
+    MyDoFn fn = MyDoFn.create(true);
+
+    DoFnRunner<KV<String, Integer>, Integer> runner = createStatefulDoFnRunner(fn);
+
+    runner.startBundle();
+
+    IntervalWindow window = new IntervalWindow(timestamp, timestamp.plus(WINDOW_SIZE));
+
+    runner.processElement(
+        WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
+
+    long droppedValues =
+        container
+            .getCounter(
+                MetricName.named(
+                    StatefulDoFnRunner.class, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER))
+            .getCumulative();
+    assertEquals(0L, droppedValues);
+
+    timerInternals.advanceInputWatermark(timestamp.plus(ALLOWED_LATENESS + 1));
+
+    runner.processElement(
+        WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING));
+
+    droppedValues =
+        container
+            .getCounter(
+                MetricName.named(
+                    StatefulDoFnRunner.class, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER))
+            .getCumulative();
+    assertEquals(1L, droppedValues);
+
+    runner.finishBundle();
+  }
+
   private void testLateDropping(boolean ordered) throws Exception {
     MetricsContainerImpl container = new MetricsContainerImpl("any");
     MetricsEnvironment.setCurrentContainer(container);
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 98dca8e..02eb5bf 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -95,6 +95,8 @@
   group = "Verification"
   description = "Runs tests that require a runner to validate that pipelines/transforms work correctly"
 
+  testLogging.showStandardStreams = true
+
   def pipelineOptions = JsonOutput.toJson(["--runner=DirectRunner", "--runnerDeterminedSharding=false"])
   systemProperty "beamTestPipelineOptions", pipelineOptions
 
@@ -112,6 +114,10 @@
     excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
     excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
   }
+  testLogging {
+    outputs.upToDateWhen {false}
+    showStandardStreams = true
+  }
 }
 
 // NOTE: This will also run 'NeedsRunner' tests, which are run in the :needsRunnerTests task as well.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 7ff4ee4..d99aba3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.runners.core.construction.resources.PipelineResources;
@@ -123,6 +124,16 @@
    */
   private static void prepareFilesToStageForRemoteClusterExecution(FlinkPipelineOptions options) {
     if (!options.getFlinkMaster().matches("\\[auto\\]|\\[collection\\]|\\[local\\]")) {
+      if (options.getFilesToStage() == null) {
+        options.setFilesToStage(
+            detectClassPathResourcesToStage(FlinkRunner.class.getClassLoader(), options));
+        LOG.info(
+            "PipelineOptions.filesToStage was not specified. "
+                + "Defaulting to files from the classpath: will stage {} files. "
+                + "Enable logging at DEBUG level to see which files will be staged.",
+            options.getFilesToStage().size());
+        LOG.debug("Classpath elements: {}", options.getFilesToStage());
+      }
       options.setFilesToStage(
           PipelineResources.prepareFilesForStaging(
               options.getFilesToStage(),
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 56a169f..7a69fb4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
-
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -37,7 +34,6 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.slf4j.Logger;
@@ -64,27 +60,6 @@
   public static FlinkRunner fromOptions(PipelineOptions options) {
     FlinkPipelineOptions flinkOptions =
         PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
-    ArrayList<String> missing = new ArrayList<>();
-
-    if (flinkOptions.getAppName() == null) {
-      missing.add("appName");
-    }
-    if (missing.size() > 0) {
-      throw new IllegalArgumentException(
-          "Missing required values: " + Joiner.on(',').join(missing));
-    }
-
-    if (flinkOptions.getFilesToStage() == null) {
-      flinkOptions.setFilesToStage(
-          detectClassPathResourcesToStage(FlinkRunner.class.getClassLoader(), options));
-      LOG.info(
-          "PipelineOptions.filesToStage was not specified. "
-              + "Defaulting to files from the classpath: will stage {} files. "
-              + "Enable logging at DEBUG level to see which files will be staged.",
-          flinkOptions.getFilesToStage().size());
-      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
-    }
-
     return new FlinkRunner(flinkOptions);
   }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index ea1c583..4ee46ae 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -53,7 +53,6 @@
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -229,7 +228,11 @@
             stageBundleFactory,
             (Timer<?> timer, TimerInternals.TimerData timerData) -> {
               currentTimerKey = timer.getUserKey();
-              timerInternals.setTimer(timerData);
+              if (timer.getClearBit()) {
+                timerInternals.deleteTimer(timerData);
+              } else {
+                timerInternals.setTimer(timerData);
+              }
             },
             windowCoder);
 
@@ -247,25 +250,14 @@
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
+    while (timerInternals.hasPendingTimers()) {
+      try (RemoteBundle bundle =
+          stageBundleFactory.getBundle(
+              receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
 
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-            }
-          },
-          currentTimerKey);
+        PipelineTranslatorUtils.fireEligibleTimers(
+            timerInternals, bundle.getTimerReceivers(), currentTimerKey);
+      }
     }
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
index 5a04f7e..a008ba7 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
@@ -66,6 +66,7 @@
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
@@ -242,6 +243,7 @@
      * <p>This test verifies that watermarks are correctly forwarded.
      */
     @Test(timeout = 30_000)
+    @Ignore("https://issues.apache.org/jira/browse/BEAM-9164")
     public void testWatermarkEmission() throws Exception {
       final int numElements = 500;
       PipelineOptions options = PipelineOptionsFactory.create();
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index 936b972..a35a007 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -38,6 +38,7 @@
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.state.StateSpec;
@@ -197,6 +198,10 @@
     options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
     options.setFilesToStage(new ArrayList<>());
     options.setGcsUtil(mockGcsUtil);
+
+    // Enable the FileSystems API to know about gs:// URIs in this test.
+    FileSystems.setDefaultPipelineOptions(options);
+
     return options;
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
index 43e49dd..473a181 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java
@@ -33,6 +33,8 @@
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
 import org.apache.beam.sdk.util.common.Reiterable;
 import org.apache.beam.sdk.util.common.Reiterator;
 import org.apache.beam.sdk.values.KV;
@@ -154,7 +156,8 @@
    * {@link Reiterable} representing a view of all elements in a base {@link Reiterator} that are in
    * a given window.
    */
-  private static class WindowReiterable<V> implements Reiterable<V> {
+  private static class WindowReiterable<V>
+      extends ElementByteSizeObservableIterable<V, WindowReiterator<V>> implements Reiterable<V> {
     private PeekingReiterator<WindowedValue<V>> baseIterator;
     private BoundedWindow window;
 
@@ -165,12 +168,17 @@
     }
 
     @Override
-    public Reiterator<V> iterator() {
+    public WindowReiterator<V> iterator() {
+      return createIterator();
+    }
+
+    @Override
+    protected WindowReiterator<V> createIterator() {
       // We don't copy the baseIterator when creating the first WindowReiterator
       // so that the WindowReiterator can advance the baseIterator.  We have to
       // make a copy afterwards so that future calls to iterator() will start
       // at the right spot.
-      Reiterator<V> result = new WindowReiterator<V>(baseIterator, window);
+      WindowReiterator<V> result = new WindowReiterator<V>(baseIterator, window);
       baseIterator = baseIterator.copy();
       return result;
     }
@@ -184,7 +192,8 @@
   /**
    * The {@link Reiterator} used by {@link BatchGroupAlsoByWindowViaIteratorsFn.WindowReiterable}.
    */
-  private static class WindowReiterator<V> implements Reiterator<V> {
+  private static class WindowReiterator<V> extends ElementByteSizeObservableIterator<V>
+      implements Reiterator<V> {
     private PeekingReiterator<WindowedValue<V>> iterator;
     private BoundedWindow window;
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index c624e5c..1fa4ef0 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -22,7 +22,8 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.function.BiConsumer;
+import java.util.Locale;
+import java.util.Map;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
@@ -36,6 +37,7 @@
 import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -109,7 +111,7 @@
    */
   public static void fireEligibleTimers(
       InMemoryTimerInternals timerInternals,
-      BiConsumer<KV<String, String>, Timer<?>> timerConsumer,
+      Map<KV<String, String>, FnDataReceiver<Timer>> timerReceivers,
       Object currentTimerKey) {
 
     boolean hasFired;
@@ -119,22 +121,22 @@
 
       while ((timer = timerInternals.removeNextEventTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
       while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
       while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
     } while (hasFired);
   }
 
   private static void fireTimer(
       TimerInternals.TimerData timer,
-      BiConsumer<KV<String, String>, Timer<?>> timerConsumer,
+      Map<KV<String, String>, FnDataReceiver<Timer>> timerReceivers,
       Object currentTimerKey) {
     StateNamespace namespace = timer.getNamespace();
     Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
@@ -149,7 +151,16 @@
             timestamp,
             outputTimestamp,
             PaneInfo.NO_FIRING);
-    timerConsumer.accept(
-        TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId()), timerValue);
+    KV<String, String> transformAndTimerId =
+        TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId());
+    FnDataReceiver<Timer> fnTimerReceiver = timerReceivers.get(transformAndTimerId);
+    Preconditions.checkNotNull(
+        fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
+    try {
+      fnTimerReceiver.accept(timerValue);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue), e);
+    }
   }
 }
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 2325e2b..ddb999f 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -137,6 +137,7 @@
     includeCategories 'org.apache.beam.runners.spark.UsesCheckpointRecovery'
     excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
     excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
     // Unbounded
     excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
     excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
@@ -206,6 +207,7 @@
     excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
     excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
     excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
     // Metrics
     excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
     excludeCategories 'org.apache.beam.sdk.testing.UsesSystemMetrics'
diff --git a/runners/spark/job-server/build.gradle b/runners/spark/job-server/build.gradle
index 478e9f9..0c037d9 100644
--- a/runners/spark/job-server/build.gradle
+++ b/runners/spark/job-server/build.gradle
@@ -107,6 +107,7 @@
       excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
       excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
       excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
+      excludeCategories 'org.apache.beam.sdk.testing.UsesKeyInParDo'
       excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
       //SplitableDoFnTests
       excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 954ccc5..233e095 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -59,8 +59,6 @@
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -128,66 +126,7 @@
         StateRequestHandler stateRequestHandler =
             getStateRequestHandler(
                 executableStage, stageBundleFactory.getProcessBundleDescriptor());
-        if (executableStage.getTimers().size() > 0) {
-          // Used with Batch, we know that all the data is available for this key. We can't use the
-          // timer manager from the context because it doesn't exist. So we create one and advance
-          // time to the end after processing all elements.
-          final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-          timerInternals.advanceProcessingTime(Instant.now());
-          timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
-          ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
-
-          TimerReceiverFactory timerReceiverFactory =
-              new TimerReceiverFactory(
-                  stageBundleFactory,
-                  (Timer<?> timer, TimerInternals.TimerData timerData) -> {
-                    currentTimerKey = timer.getUserKey();
-                    timerInternals.setTimer(timerData);
-                  },
-                  windowCoder);
-
-          // Process inputs.
-          processElements(
-              executableStage,
-              stateRequestHandler,
-              receiverFactory,
-              timerReceiverFactory,
-              stageBundleFactory,
-              inputs);
-
-          // Finish any pending windows by advancing the input watermark to infinity.
-          timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-          // Finally, advance the processing time to infinity to fire any timers.
-          timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-          timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-          // Now we fire the timers and process elements generated by timers (which may be timers
-          // itself)
-          try (RemoteBundle bundle =
-              stageBundleFactory.getBundle(
-                  receiverFactory,
-                  timerReceiverFactory,
-                  stateRequestHandler,
-                  getBundleProgressHandler())) {
-
-            PipelineTranslatorUtils.fireEligibleTimers(
-                timerInternals,
-                (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-                  FnDataReceiver<Timer> fnTimerReceiver =
-                      bundle.getTimerReceivers().get(transformAndTimerId);
-                  Preconditions.checkNotNull(
-                      fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-                  try {
-                    fnTimerReceiver.accept(timerValue);
-                  } catch (Exception e) {
-                    throw new RuntimeException(
-                        String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-                  }
-                },
-                currentTimerKey);
-          }
-        } else {
+        if (executableStage.getTimers().size() == 0) {
           ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
           processElements(
               executableStage,
@@ -196,6 +135,58 @@
               null,
               stageBundleFactory,
               inputs);
+          return collector.iterator();
+        }
+        // In batch mode, all of the data for this key is already available. We can't use the
+        // timer manager from the context because it doesn't exist, so we create one and advance
+        // time to the end after processing all elements.
+        final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+        timerInternals.advanceProcessingTime(Instant.now());
+        timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+        ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
+
+        TimerReceiverFactory timerReceiverFactory =
+            new TimerReceiverFactory(
+                stageBundleFactory,
+                (Timer<?> timer, TimerInternals.TimerData timerData) -> {
+                  currentTimerKey = timer.getUserKey();
+                  if (timer.getClearBit()) {
+                    timerInternals.deleteTimer(timerData);
+                  } else {
+                    timerInternals.setTimer(timerData);
+                  }
+                },
+                windowCoder);
+
+        // Process inputs.
+        processElements(
+            executableStage,
+            stateRequestHandler,
+            receiverFactory,
+            timerReceiverFactory,
+            stageBundleFactory,
+            inputs);
+
+        // Finish any pending windows by advancing the input watermark to infinity.
+        timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        // Finally, advance the processing time to infinity to fire any timers.
+        timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+        // Now we fire the timers and process elements generated by timers (which may themselves
+        // be timers).
+        while (timerInternals.hasPendingTimers()) {
+          try (RemoteBundle bundle =
+              stageBundleFactory.getBundle(
+                  receiverFactory,
+                  timerReceiverFactory,
+                  stateRequestHandler,
+                  getBundleProgressHandler())) {
+
+            PipelineTranslatorUtils.fireEligibleTimers(
+                timerInternals, bundle.getTimerReceivers(), currentTimerKey);
+          }
         }
         return collector.iterator();
       }
diff --git a/sdks/go/examples/stringsplit/offsetrange/offsetrange.go b/sdks/go/examples/stringsplit/offsetrange/offsetrange.go
index f075a9e..71ca193 100644
--- a/sdks/go/examples/stringsplit/offsetrange/offsetrange.go
+++ b/sdks/go/examples/stringsplit/offsetrange/offsetrange.go
@@ -88,22 +88,25 @@
 	return tracker.Err
 }
 
-// TrySplit splits at the nearest integer greater than the given fraction of the remainder.
-func (tracker *Tracker) TrySplit(fraction float64) (interface{}, error) {
+// TrySplit splits at the nearest integer greater than the given fraction of the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *Tracker) TrySplit(fraction float64) (primary, residual interface{}, err error) {
 	if tracker.Stopped || tracker.IsDone() {
-		return nil, nil
+		return tracker.Rest, nil, nil
 	}
-	if fraction < 0 || fraction > 1 {
-		return nil, errors.New("fraction must be in range [0, 1]")
+	if fraction < 0 {
+		fraction = 0
+	} else if fraction > 1 {
+		fraction = 1
 	}
 
-	splitPt := tracker.Rest.Start + int64(fraction*float64(tracker.Rest.End-tracker.Rest.Start))
-	if splitPt == tracker.Rest.End {
-		return nil, nil
+	splitPt := tracker.Claimed + int64(fraction*float64(tracker.Rest.End-tracker.Claimed))
+	if splitPt >= tracker.Rest.End {
+		return tracker.Rest, nil, nil
 	}
-	residual := Restriction{splitPt, tracker.Rest.End}
+	residual = Restriction{splitPt, tracker.Rest.End}
 	tracker.Rest.End = splitPt
-	return residual, nil
+	return tracker.Rest, residual, nil
 }
 
 // GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction.
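As a worked example of the split arithmetic above: with Rest = [0, 100), Claimed = 40, and fraction = 0.5, splitPt = 40 + 0.5 * (100 - 40) = 70, so the primary becomes [0, 70) and the residual [70, 100). A fraction above 1 is clamped to 1, which makes splitPt reach Rest.End and returns the whole remaining restriction as the primary with a nil residual.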
diff --git a/sdks/go/examples/stringsplit/stringsplit.go b/sdks/go/examples/stringsplit/stringsplit.go
index a75707a..8092ca1 100644
--- a/sdks/go/examples/stringsplit/stringsplit.go
+++ b/sdks/go/examples/stringsplit/stringsplit.go
@@ -112,7 +112,7 @@
 //
 // Example: If BufSize is 100, then a restriction of 75 to 325 should emit the
 // following substrings: [100, 200], [200, 300], [300, 400]
-func (fn *StringSplitFn) ProcessElement(rt *offsetrange.Tracker, elem string, emit func(string)) error {
+func (fn *StringSplitFn) ProcessElement(rt *offsetrange.Tracker, elem string, emit func(string)) {
 	log.Debugf(context.Background(), "StringSplit ProcessElement: Tracker = %v", rt)
 	i := rt.Rest.Start
 	if rem := i % fn.BufSize; rem != 0 {
@@ -128,8 +128,6 @@
 		}
 		i += fn.BufSize
 	}
-	// TODO(BEAM-9799): Remove this check once error checking is automatic.
-	return rt.GetError()
 }
 
 // LogFn is a DoFn to log our split output.
diff --git a/sdks/go/pkg/beam/core/graph/fn_test.go b/sdks/go/pkg/beam/core/graph/fn_test.go
index 612fce7..e530512 100644
--- a/sdks/go/pkg/beam/core/graph/fn_test.go
+++ b/sdks/go/pkg/beam/core/graph/fn_test.go
@@ -569,8 +569,8 @@
 func (rt *RTrackerT) GetError() error {
 	return nil
 }
-func (rt *RTrackerT) TrySplit(fraction float64) (interface{}, error) {
-	return nil, nil
+func (rt *RTrackerT) TrySplit(fraction float64) (interface{}, interface{}, error) {
+	return nil, nil, nil
 }
 func (rt *RTrackerT) GetProgress() (float64, float64) {
 	return 0, 0
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 8e4707a..9158972 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -138,6 +138,9 @@
 		if err != nil {
 			return n.fail(err)
 		}
+		if mainIn.RTracker != nil && !mainIn.RTracker.IsDone() {
+			return rtErrHelper(mainIn.RTracker.GetError())
+		}
 
 		// Forward direct output, if any. It is always a main output.
 		if val != nil {
@@ -152,6 +155,9 @@
 			if err != nil {
 				return n.fail(err)
 			}
+			if mainIn.RTracker != nil && !mainIn.RTracker.IsDone() {
+				return rtErrHelper(mainIn.RTracker.GetError())
+			}
 
 			// Forward direct output, if any. It is always a main output.
 			if val != nil {
@@ -162,6 +168,13 @@
 	return nil
 }
 
+func rtErrHelper(err error) error {
+	if err != nil {
+		return err
+	}
+	return errors.New("DoFn terminated without fully processing restriction")
+}
+
 // mustExplodeWindows returns true iff we need to call the function
 // for each window. It is needed if the function observes the
 // window, either directly or indirectly via (windowed) side inputs.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers_test.go b/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers_test.go
index 6eedee9..fdc443e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers_test.go
@@ -220,11 +220,13 @@
 	Val  int
 }
 
-func (rt *RTracker) TryClaim(interface{}) bool                      { return false }
-func (rt *RTracker) GetError() error                                { return nil }
-func (rt *RTracker) TrySplit(fraction float64) (interface{}, error) { return nil, nil }
-func (rt *RTracker) GetProgress() (float64, float64)                { return 0, 0 }
-func (rt *RTracker) IsDone() bool                                   { return false }
+func (rt *RTracker) TryClaim(interface{}) bool       { return false }
+func (rt *RTracker) GetError() error                 { return nil }
+func (rt *RTracker) GetProgress() (float64, float64) { return 0, 0 }
+func (rt *RTracker) IsDone() bool                    { return true }
+func (rt *RTracker) TrySplit(fraction float64) (interface{}, interface{}, error) {
+	return nil, nil, nil
+}
 
 // In order to test that these methods get called properly, each one has an
 // implementation that lets us confirm that each argument was passed properly.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index 8f1cf47..f614f72 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -182,11 +182,13 @@
 type testRT struct {
 }
 
-func (rt *testRT) TryClaim(interface{}) bool                      { return false }
-func (rt *testRT) GetError() error                                { return nil }
-func (rt *testRT) TrySplit(fraction float64) (interface{}, error) { return nil, nil }
-func (rt *testRT) GetProgress() (float64, float64)                { return 0, 0 }
-func (rt *testRT) IsDone() bool                                   { return false }
+func (rt *testRT) TryClaim(interface{}) bool       { return false }
+func (rt *testRT) GetError() error                 { return nil }
+func (rt *testRT) GetProgress() (float64, float64) { return 0, 0 }
+func (rt *testRT) IsDone() bool                    { return true }
+func (rt *testRT) TrySplit(fraction float64) (interface{}, interface{}, error) {
+	return nil, nil, nil
+}
 
 // splitPickFn is used for the SDF test, and just needs to fulfill SDF method
 // signatures.
diff --git a/sdks/go/pkg/beam/core/sdf/sdf.go b/sdks/go/pkg/beam/core/sdf/sdf.go
index 4843615..7162d7a 100644
--- a/sdks/go/pkg/beam/core/sdf/sdf.go
+++ b/sdks/go/pkg/beam/core/sdf/sdf.go
@@ -13,55 +13,62 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package sdf contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
+//
+// All RTracker methods should be thread-safe for dynamic splits to function correctly.
 type RTracker interface {
-	// TryClaim attempts to claim the block of work in the current restriction located at a given
-	// position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-	// work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-	// work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-	// the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
+	// TryClaim attempts to claim the block of work located in the given position of the
+	// restriction. This method must be called in ProcessElement to claim work before it can be
+	// processed. Processing work without claiming it first can lead to incorrect output.
+	//
+	// The position type is up to individual implementations, and will usually be related to the
+	// kind of restriction used. For example, a simple restriction representing a numeric range
+	// might use an int64. A more complex restriction, such as one representing a multidimensional
+	// space, might use a more complex type.
+	//
+	// If the claim is successful, the DoFn must process the entire block. If the claim is
+	// unsuccessful, the ProcessElement method of the DoFn must return without performing
 	// any additional work or emitting any outputs.
 	//
-	// TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-	// returns a boolean indicating whether the claim succeeded.
-	//
-	// If the claim fails due to an error, that error can be retrieved with GetError.
-	//
-	// For SDFs to work properly, claims must always be monotonically increasing in reference to the
-	// restriction's start and end points, and every block of work in a restriction must be claimed.
+	// If the claim fails due to an error, that error is stored and will be automatically emitted
+	// when the RTracker is validated, or can be manually retrieved with GetError.
 	//
 	// This pseudocode example illustrates the typical usage of TryClaim:
 	//
-	// 	pos = position of first block after restriction.start
+	// 	pos = position of first block within the restriction
 	// 	for TryClaim(pos) == true {
 	// 		// Do all work in the claimed block and emit outputs.
-	// 		pos = position of next block
+	// 		pos = position of next block within the restriction
 	// 	}
 	// 	return
 	TryClaim(pos interface{}) (ok bool)
 
-	// GetError returns the error that made this RTracker stop executing, and it returns nil if no
-	// error occurred. If IsDone fails while validating this RTracker, this method will be
-	// called to log the error.
+	// GetError returns the error that made this RTracker stop executing, and returns nil if no
+	// error occurred. This is the error that is emitted if automated validation fails.
 	GetError() error
 
-	// TrySplit splits the current restriction into a primary and residual based on a fraction of the
-	// work remaining. The split is performed along the first valid split point located after the
-	// given fraction of the remainder. This method is called by the SDK harness when receiving a
-	// split request by the runner.
+	// TrySplit splits the current restriction into a primary (currently executing work) and
+	// residual (work to be split off) based on a fraction of work remaining. The split is performed
+	// at the first valid split point located after the given fraction of remaining work.
 	//
-	// The current restriction is split into two by modifying the current restriction's endpoint to
-	// turn it into the primary, and returning a new restriction tracker representing the residual.
-	// If no valid split point exists, this method returns nil instead of a residual, but does not
-	// return an error. If this method is unable to split due to some error then it returns nil and
-	// an error.
-	TrySplit(fraction float64) (residual interface{}, err error)
+	// For example, a fraction of 0.5 means to split at the halfway point of the remaining work. If
+	// 50% of the work is done and 50% remains, then a fraction of 0.5 would split after 75% of the
+	// total work.
+	//
+	// This method modifies the underlying restriction in the RTracker to reflect the primary. It
+	// then returns a copy of the newly modified restriction as a primary, and returns a new
+	// restriction for the residual. If the split would produce an empty residual (i.e. the only
+	// split point is the end of the restriction), then the returned residual is nil.
+	//
+	// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
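+	//
+	// This pseudocode sketch illustrates TrySplit on a hypothetical numeric-range restriction
+	// (the restriction type and values are assumptions for illustration only):
+	//
+	// 	// Restriction covers [0, 100) and work up to position 40 has been claimed.
+	// 	primary, residual, err := rt.TrySplit(0.5)
+	// 	// primary now covers [0, 70) and residual covers [70, 100).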
+	TrySplit(fraction float64) (primary, residual interface{}, err error)
 
 	// GetProgress returns two abstract scalars representing the amount of done and remaining work.
 	// These values have no specific units, but are used to estimate work in relation to each other
@@ -69,7 +76,8 @@
 	GetProgress() (done float64, remaining float64)
 
 	// IsDone returns a boolean indicating whether all blocks inside the restriction have been
-	// claimed. This method is called by the SDK Harness to validate that a Splittable DoFn has
-	// correctly processed all work in a restriction before finishing.
+	// claimed. This method is called by the SDK Harness to validate that a splittable DoFn has
+	// correctly processed all work in a restriction before finishing. If this method returns false
+	// then GetError is expected to return a non-nil error.
 	IsDone() bool
 }
diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index 2645fa3..d53e270 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -222,6 +222,87 @@
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns (Experimental)
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting it into two or more
+// restrictions that together cover the entire input space of the original one.
+// In other words, processing all the split restrictions should produce output
+// identical to processing the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Only initially split restrictions can be
+//   distributed by liquid sharding. Stragglers will not be split during
+//   execution with dynamic splitting.
+//
+// Splittable DoFn Methods
+//
+// Making a splittable DoFn requires the following methods to be implemented on
+// a DoFn in addition to the usual DoFn requirements. In the following
+// method signatures `elem` represents the main input elements to the DoFn, and
+// should match the types used in ProcessElement. `restriction` represents the
+// user-defined restriction, and can be any type as long as it is consistent
+// throughout all the splittable DoFn methods:
+//
+// * `CreateInitialRestriction(element) restriction`
+//     CreateInitialRestriction creates an initial restriction encompassing an
+//     entire element. The restriction created stays associated with the element
+//     it describes.
+// * `SplitRestriction(elem, restriction) []restriction`
+//     SplitRestriction takes an element and its initial restriction, and
+//     optionally performs an initial split on it, returning a slice of all the
+//     split restrictions. If no splits are desired, the method returns a slice
+//     containing only the original restriction. This method will always be
+//     called on each newly created restriction before it is processed.
+// * `RestrictionSize(elem, restriction) float64`
+//     RestrictionSize returns a cheap size estimation for a restriction. This
+//     size is an abstract scalar value that represents how much work a
+//     restriction takes compared to other restrictions in the same DoFn. For
+//     example, a size of 200 represents twice as much work as a size of
+//     100, but the numbers do not represent anything on their own. Size is
+//     used by runners to estimate work for liquid sharding.
+// * `CreateTracker(restriction) restrictionTracker`
+//     CreateTracker creates and returns a restriction tracker (a concrete type
+//     implementing the `sdf.RTracker` interface) given a restriction. The
+//     restriction tracker is used to track progress processing a restriction,
+//     and to allow for dynamic splits. This method is called on each
+//     restriction right before processing begins.
+// * `ProcessElement(sdf.RTracker, element, func emit(output))`
+//     For splittable DoFns, ProcessElement requires a restriction tracker as its
+//     first parameter, before any inputs, and generally requires emits to be used for outputs, since
+//     restrictions will generally produce multiple outputs. For more details
+//     on processing restrictions in a splittable DoFn, see `sdf.RTracker`.
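+//
+// The following sketch shows the shape of these methods on a hypothetical DoFn.
+// The names, the restriction type, and the tracker type are illustrative
+// assumptions, not part of the SDK:
+//
+// 	type offsetRange struct{ Start, End int64 }
+//
+// 	func (fn *readFn) CreateInitialRestriction(file string) offsetRange { ... }
+// 	func (fn *readFn) SplitRestriction(file string, rest offsetRange) []offsetRange { ... }
+// 	func (fn *readFn) RestrictionSize(file string, rest offsetRange) float64 { ... }
+// 	func (fn *readFn) CreateTracker(rest offsetRange) *offsetTracker { ... }
+// 	func (fn *readFn) ProcessElement(rt *offsetTracker, file string, emit func(string)) { ... }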
+//
 // Fault Tolerance
 //
 // In a distributed system, things can fail: machines can crash, machines can
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
index bb7bc53..456d7bb 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
@@ -43,7 +43,7 @@
 		return "", nil
 	}
 
-  return StageViaLegacyApi(ctx, cc, binary, st)
+	return StageViaLegacyApi(ctx, cc, binary, st)
 }
 
 func StageViaPortableApi(ctx context.Context, cc *grpc.ClientConn, binary, st string) error {
diff --git a/sdks/java/container/build.gradle b/sdks/java/container/build.gradle
index 7f579e6..e352780 100644
--- a/sdks/java/container/build.gradle
+++ b/sdks/java/container/build.gradle
@@ -107,8 +107,8 @@
   dockerPrepare.dependsOn pullLicenses
 } else {
   task createFile(type: Exec) {
-    commandLine 'mkdir', '-p', 'build/target/third_party_licenses'
-    commandLine 'touch', 'build/target/third_party_licenses/skip'
+      executable "sh"
+      args "-c", "mkdir -p build/target/third_party_licenses && touch build/target/third_party_licenses/skip"
   }
   dockerPrepare.dependsOn createFile
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index f84d1c3..59850d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -145,9 +145,6 @@
 
   /** Constructs a pipeline from the provided {@link PipelineOptions}. */
   public static Pipeline create(PipelineOptions options) {
-    // TODO: fix runners that mutate PipelineOptions in this method, then remove this line
-    PipelineRunner.fromOptions(options);
-
     Pipeline pipeline = new Pipeline(options);
     LOG.debug("Creating {}", pipeline);
     return pipeline;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKeyInParDo.java
similarity index 80%
rename from sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java
rename to sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKeyInParDo.java
index c25d25a..b5dbd1b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKeyInParDo.java
@@ -19,10 +19,12 @@
 
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import org.apache.beam.sdk.transforms.DoFn.OnWindowExpiration;
 
 /**
- * Category tag for validation tests which use key. Tests tagged with {@link UsesKey} should be run
- * for runners which support key parameter in {@link OnTimer}.
+ * Category tag for validation tests which use a key. Tests tagged with {@link UsesKeyInParDo}
+ * should be run for runners which support the key parameter in {@link OnTimer} and
+ * {@link OnWindowExpiration}.
  */
 @Internal
-public interface UsesKey {}
+public interface UsesKeyInParDo {}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
new file mode 100644
index 0000000..fc836bc
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicImpulse.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} which produces a sequence of elements at fixed runtime intervals.
+ *
+ * <p>If applyWindowing() is specified, each element will be assigned to its own fixed window.
+ *
+ * <p>See {@link PeriodicSequence}.
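+ *
+ * <p>A minimal usage sketch (the surrounding {@code Pipeline p} is assumed):
+ *
+ * <pre>{@code
+ * PCollection<Instant> ticks =
+ *     p.apply(
+ *         PeriodicImpulse.create()
+ *             .startAt(Instant.now())
+ *             .stopAt(Instant.now().plus(Duration.standardMinutes(10)))
+ *             .withInterval(Duration.standardSeconds(30))
+ *             .applyWindowing());
+ * }</pre>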
+ */
+public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {
+
+  Instant startTimestamp = Instant.now();
+  Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+  Duration fireInterval = Duration.standardMinutes(1);
+  boolean applyWindowing = false;
+
+  private PeriodicImpulse() {}
+
+  public static PeriodicImpulse create() {
+    return new PeriodicImpulse();
+  }
+
+  public PeriodicImpulse startAt(Instant startTime) {
+    this.startTimestamp = startTime;
+    return this;
+  }
+
+  public PeriodicImpulse stopAt(Instant stopTime) {
+    this.stopTimestamp = stopTime;
+    return this;
+  }
+
+  public PeriodicImpulse withInterval(Duration interval) {
+    this.fireInterval = interval;
+    return this;
+  }
+
+  public PeriodicImpulse applyWindowing() {
+    this.applyWindowing = true;
+    return this;
+  }
+
+  @Override
+  public PCollection<Instant> expand(PBegin input) {
+    PCollection<Instant> result =
+        input
+            .apply(
+                Create.<PeriodicSequence.SequenceDefinition>of(
+                    new PeriodicSequence.SequenceDefinition(
+                        startTimestamp, stopTimestamp, fireInterval)))
+            .apply(PeriodicSequence.create());
+
+    if (this.applyWindowing) {
+      result =
+          result.apply(
+              Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
+    }
+
+    return result;
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
new file mode 100644
index 0000000..bcf1b57
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} which generates a sequence of timestamped elements at given runtime
+ * intervals.
+ *
+ * <p>The transform assigns each element a timestamp and will only output an element once the
+ * worker clock has reached that timestamp. The transform will not output elements prior to the
+ * target time, but may output them at any time afterwards.
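+ *
+ * <p>A minimal usage sketch (the surrounding {@code Pipeline p} is assumed):
+ *
+ * <pre>{@code
+ * PCollection<Instant> ticks =
+ *     p.apply(
+ *             Create.of(
+ *                 new PeriodicSequence.SequenceDefinition(
+ *                     Instant.now(), Instant.now().plus(Duration.standardMinutes(10)),
+ *                     Duration.standardSeconds(30))))
+ *         .apply(PeriodicSequence.create());
+ * }</pre>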
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class PeriodicSequence
+    extends PTransform<PCollection<PeriodicSequence.SequenceDefinition>, PCollection<Instant>> {
+
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class SequenceDefinition {
+    public Instant first;
+    public Instant last;
+    public Long durationMilliSec;
+
+    public SequenceDefinition() {}
+
+    public SequenceDefinition(Instant first, Instant last, Duration duration) {
+      this.first = first;
+      this.last = last;
+      this.durationMilliSec = duration.getMillis();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+
+      if (obj == null || obj.getClass() != this.getClass()) {
+        return false;
+      }
+
+      SequenceDefinition src = (SequenceDefinition) obj;
+      return src.first.equals(this.first)
+          && src.last.equals(this.last)
+          && src.durationMilliSec.equals(this.durationMilliSec);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(first, last, durationMilliSec);
+      return result;
+    }
+  }
+
+  private PeriodicSequence() {}
+
+  public static PeriodicSequence create() {
+    return new PeriodicSequence();
+  }
+
+  @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+  public static class OutputRangeTracker extends RestrictionTracker<OffsetRange, Long>
+      implements RestrictionTracker.HasProgress {
+    private OffsetRange range;
+    @Nullable private Long lastClaimedOffset = null;
+    @Nullable private Long lastAttemptedOffset = null;
+
+    public OutputRangeTracker(OffsetRange range) {
+      this.range = checkNotNull(range);
+      lastClaimedOffset = this.range.getFrom();
+      lastAttemptedOffset = lastClaimedOffset;
+    }
+
+    @Override
+    public OffsetRange currentRestriction() {
+      return range;
+    }
+
+    @Override
+    public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+      if (fractionOfRemainder != 0) {
+        return null;
+      }
+      OffsetRange res = new OffsetRange(lastClaimedOffset, range.getTo());
+      this.range = new OffsetRange(range.getFrom(), lastClaimedOffset);
+      return SplitResult.of(range, res);
+    }
+
+    @Override
+    public boolean tryClaim(Long i) {
+      checkArgument(
+          i > lastAttemptedOffset,
+          "Trying to claim offset %s while last attempted was %s",
+          i,
+          lastAttemptedOffset);
+      checkArgument(
+          i >= range.getFrom(), "Trying to claim offset %s before start of the range %s", i, range);
+      lastAttemptedOffset = i;
+      if (i > range.getTo()) {
+        return false;
+      }
+      lastClaimedOffset = i;
+      return true;
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {
+      checkState(
+          lastAttemptedOffset >= range.getTo() - 1,
+          "Last attempted offset was %s in range %s, claiming work in (%s, %s] was not attempted",
+          lastAttemptedOffset,
+          range,
+          lastAttemptedOffset,
+          range.getTo());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("range", range)
+          .add("lastClaimedOffset", lastClaimedOffset)
+          .add("lastAttemptedOffset", lastAttemptedOffset)
+          .toString();
+    }
+
+    @Override
+    public Progress getProgress() {
+      double workRemaining = Math.max(range.getTo() - lastAttemptedOffset, 0);
+      return Progress.from(range.getTo() - range.getFrom() - workRemaining, workRemaining);
+    }
+  }
+
+  private static class PeriodicSequenceFn extends DoFn<SequenceDefinition, Instant> {
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(@Element SequenceDefinition element) {
+      return new OffsetRange(
+          element.first.getMillis() - element.durationMilliSec, element.last.getMillis());
+    }
+
+    @NewTracker
+    public RestrictionTracker<OffsetRange, Long> newTracker(@Restriction OffsetRange restriction) {
+      return new OutputRangeTracker(restriction);
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+        @Element SequenceDefinition srcElement,
+        OutputReceiver<Instant> out,
+        RestrictionTracker<OffsetRange, Long> restrictionTracker) {
+
+      OffsetRange restriction = restrictionTracker.currentRestriction();
+      Long interval = srcElement.durationMilliSec;
+      Long nextOutput = restriction.getFrom() + interval;
+
+      boolean claimSuccess = true;
+
+      while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
+        claimSuccess = restrictionTracker.tryClaim(nextOutput);
+        if (claimSuccess) {
+          Instant output = Instant.ofEpochMilli(nextOutput);
+          out.outputWithTimestamp(output, output);
+          nextOutput = nextOutput + interval;
+        }
+      }
+
+      ProcessContinuation continuation = ProcessContinuation.stop();
+      if (claimSuccess) {
+        Duration offset = new Duration(Instant.now(), Instant.ofEpochMilli(nextOutput));
+        continuation = ProcessContinuation.resume().withResumeDelay(offset);
+      }
+      return continuation;
+    }
+  }
+
+  @Override
+  public PCollection<Instant> expand(PCollection<SequenceDefinition> input) {
+    return input.apply(ParDo.of(new PeriodicSequenceFn()));
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 2594dee..4b93596 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -59,6 +59,14 @@
   }
 
   /**
+   * Returns a {@link CombineFn} that computes a single and potentially non-uniform sample value of
+   * its inputs.
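+   *
+   * <p>For example (a sketch; {@code words} is an assumed {@code PCollection<String>}):
+   *
+   * <pre>{@code
+   * PCollection<String> anyWord =
+   *     words.apply(Combine.globally(Sample.<String>anyValueCombineFn()));
+   * }</pre>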
+   */
+  public static <T> CombineFn<T, ?, T> anyValueCombineFn() {
+    return new AnyValueCombineFn<>();
+  }
+
+  /**
    * {@code Sample#any(long)} takes a {@code PCollection<T>} and a limit, and produces a new {@code
    * PCollection<T>} containing up to limit elements of the input {@code PCollection}.
    *
@@ -246,6 +254,36 @@
     }
   }
 
+  /** A {@link CombineFn} that combines into a single element. */
+  private static class AnyValueCombineFn<T> extends CombineFn<T, List<T>, T> {
+    private SampleAnyCombineFn<T> internal;
+
+    private AnyValueCombineFn() {
+      internal = new SampleAnyCombineFn<>(1);
+    }
+
+    @Override
+    public List<T> createAccumulator() {
+      return internal.createAccumulator();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      return internal.addInput(accumulator, input);
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      return internal.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public T extractOutput(List<T> accumulator) {
+      Iterator<T> it = internal.extractOutput(accumulator).iterator();
+      return it.hasNext() ? it.next() : null;
+    }
+  }
+
   /**
    * {@code CombineFn} that computes a fixed-size sample of a collection of values.
    *
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableBiConsumer.java
similarity index 65%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableBiConsumer.java
index c25d25a..43fc1c7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableBiConsumer.java
@@ -15,14 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.testing;
+package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import java.io.Serializable;
+import java.util.function.BiConsumer;
 
 /**
- * Category tag for validation tests which use key. Tests tagged with {@link UsesKey} should be run
- * for runners which support key parameter in {@link OnTimer}.
+ * A union of the {@link BiConsumer} and {@link Serializable} interfaces.
+ *
+ * @param <FirstInputT> first input value type
+ * @param <SecondInputT> second input value type
  */
-@Internal
-public interface UsesKey {}
+public interface SerializableBiConsumer<FirstInputT, SecondInputT>
+    extends BiConsumer<FirstInputT, SecondInputT>, Serializable {}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 7c9d61d..1560f0f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -121,10 +121,21 @@
   public void testPipelineOptionsImplException() {
     PipelineOptions pipelineOptions = mock(PipelineOptions.class);
 
-    // Check pipeline creation correctly throws exception
+    // Check pipeline run correctly throws exception
     // since it doesn't accept user-implemented PipelineOptions.
     thrown.expect(IllegalArgumentException.class);
-    Pipeline.create(pipelineOptions);
+    Pipeline.create(pipelineOptions).run();
+  }
+
+  @Test
+  public void testPipelineOptionsImplExceptionRunOverride() {
+    PipelineOptions pipelineOptions = mock(PipelineOptions.class);
+
+    // Check pipeline run correctly throws exception
+    // since it doesn't accept user-implemented PipelineOptions.
+    // Same as testPipelineOptionsImplException, but verify we check the options set in run()
+    thrown.expect(IllegalArgumentException.class);
+    Pipeline.create().run(pipelineOptions);
   }
 
   @Test
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d71e0fd..c71e963 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -91,7 +91,7 @@
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.testing.UsesKey;
+import org.apache.beam.sdk.testing.UsesKeyInParDo;
 import org.apache.beam.sdk.testing.UsesMapState;
 import org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput;
 import org.apache.beam.sdk.testing.UsesSetState;
@@ -4980,7 +4980,7 @@
     @Category({
       ValidatesRunner.class,
       UsesTimersInParDo.class,
-      UsesKey.class,
+      UsesKeyInParDo.class,
     })
     public void testKeyInOnTimer() throws Exception {
       final String timerId = "foo";
@@ -5012,7 +5012,7 @@
     @Category({
       ValidatesRunner.class,
       UsesTimersInParDo.class,
-      UsesKey.class,
+      UsesKeyInParDo.class,
     })
     public void testKeyInOnTimerWithGenericKey() throws Exception {
       final String timerId = "foo";
@@ -5046,7 +5046,7 @@
     @Category({
       ValidatesRunner.class,
       UsesTimersInParDo.class,
-      UsesKey.class,
+      UsesKeyInParDo.class,
     })
     public void testKeyInOnTimerWithWrongKeyType() throws Exception {
 
@@ -5078,7 +5078,7 @@
     @Category({
       ValidatesRunner.class,
       UsesTimersInParDo.class,
-      UsesKey.class,
+      UsesKeyInParDo.class,
     })
     public void testKeyInOnTimerWithoutKV() throws Exception {
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java
new file mode 100644
index 0000000..2e01cd7
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicImpulseTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesImpulse;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for PeriodicImpulse. */
+@RunWith(JUnit4.class)
+public class PeriodicImpulseTest {
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  public static class ExtractTsDoFn<InputT> extends DoFn<InputT, KV<InputT, Instant>> {
+    @ProcessElement
+    public void processElement(DoFn<InputT, KV<InputT, Instant>>.ProcessContext c)
+        throws Exception {
+      c.output(KV.of(c.element(), c.timestamp()));
+    }
+  }
+
+  @Test
+  @Category({
+    NeedsRunner.class,
+    UsesImpulse.class,
+    UsesStatefulParDo.class,
+  })
+  public void testOutputsProperElements() {
+    Instant instant = Instant.now();
+
+    Instant startTime = instant.minus(Duration.standardHours(100));
+    long duration = 500;
+    Duration interval = Duration.millis(250);
+    long intervalMillis = interval.getMillis();
+    Instant stopTime = startTime.plus(duration);
+
+    PCollection<KV<Instant, Instant>> result =
+        p.apply(PeriodicImpulse.create().startAt(startTime).stopAt(stopTime).withInterval(interval))
+            .apply(ParDo.of(new ExtractTsDoFn<>()));
+
+    ArrayList<KV<Instant, Instant>> expectedResults =
+        new ArrayList<>((int) (duration / intervalMillis + 1));
+    for (long i = 0; i <= duration; i += intervalMillis) {
+      Instant el = startTime.plus(i);
+      expectedResults.add(KV.of(el, el));
+    }
+
+    PAssert.that(result).containsInAnyOrder(expectedResults);
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
new file mode 100644
index 0000000..cd6bc01
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesImpulse;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for PeriodicSequence. */
+@RunWith(JUnit4.class)
+public class PeriodicSequenceTest {
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  public static class ExtractTsDoFn<InputT> extends DoFn<InputT, KV<InputT, Instant>> {
+    @ProcessElement
+    public void processElement(DoFn<InputT, KV<InputT, Instant>>.ProcessContext c)
+        throws Exception {
+      c.output(KV.of(c.element(), c.timestamp()));
+    }
+  }
+
+  @Test
+  @Category({
+    NeedsRunner.class,
+    UsesImpulse.class,
+    UsesStatefulParDo.class,
+  })
+  public void testOutputsProperElements() {
+    Instant instant = Instant.now();
+
+    Instant startTime = instant.minus(Duration.standardHours(100));
+    long duration = 500;
+    Duration interval = Duration.millis(250);
+    long intervalMillis = interval.getMillis();
+    Instant stopTime = startTime.plus(duration);
+
+    PCollection<KV<Instant, Instant>> result =
+        p.apply(
+                Create.<PeriodicSequence.SequenceDefinition>of(
+                    new PeriodicSequence.SequenceDefinition(startTime, stopTime, interval)))
+            .apply(PeriodicSequence.create())
+            .apply(ParDo.of(new ExtractTsDoFn<>())); // used to validate timestamp
+
+    ArrayList<KV<Instant, Instant>> expectedResults =
+        new ArrayList<>((int) (duration / intervalMillis + 1));
+    for (long i = 0; i <= duration; i += intervalMillis) {
+      Instant el = startTime.plus(i);
+      expectedResults.add(KV.of(el, el));
+    }
+
+    PAssert.that(result).containsInAnyOrder(expectedResults);
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
index 7242ad4..d1cdf09 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
@@ -72,6 +72,7 @@
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableDuration;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -306,6 +307,7 @@
 
   @Test
   @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
+  @Ignore("https://issues.apache.org/jira/browse/BEAM-8035")
   public void testMultiplePollsWithManyResults() {
     final long numResults = 3000;
     List<Integer> all = Lists.newArrayList();
diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index e19754d..0e88313 100644
--- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -38,6 +38,7 @@
 import org.apache.beam.runners.core.construction.BeamUrns;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.SdkComponents;
@@ -313,8 +314,11 @@
         request.getTransform().getUniqueName(),
         request.getTransform().getSpec().getUrn());
     LOG.debug("Full transform: {}", request.getTransform());
+
+    Pipeline pipeline =
+        Pipeline.create(PipelineOptionsTranslation.fromProto(request.getPipelineOptions()));
+
     Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();
-    Pipeline pipeline = Pipeline.create();
     ExperimentalOptions.addExperiment(
         pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
     RehydratedComponents rehydratedComponents =
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index ad99c28..106e609 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -38,6 +38,7 @@
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.Sample;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
@@ -48,6 +49,7 @@
   public static final Map<String, Function<Schema.FieldType, CombineFn<?, ?, ?>>>
       BUILTIN_AGGREGATOR_FACTORIES =
           ImmutableMap.<String, Function<Schema.FieldType, CombineFn<?, ?, ?>>>builder()
+              .put("ANY_VALUE", typeName -> Sample.anyValueCombineFn())
               .put("COUNT", typeName -> Count.combineFn())
               .put("MAX", BeamBuiltinAggregations::createMax)
               .put("MIN", BeamBuiltinAggregations::createMin)
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index d350062..80964f5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -27,8 +27,10 @@
 
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
@@ -241,6 +243,53 @@
     pipeline.run().waitUntilFinish();
   }
 
+  /** GROUP-BY with the any_value aggregation function. */
+  @Test
+  public void testAnyValueFunction() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    Schema schema = Schema.builder().addInt32Field("key").addInt32Field("col").build();
+
+    PCollection<Row> inputRows =
+        pipeline
+            .apply(
+                Create.of(
+                    TestUtils.rowsBuilderOf(schema)
+                        .addRows(
+                            0, 1,
+                            0, 2,
+                            1, 3,
+                            2, 4,
+                            2, 5)
+                        .getRows()))
+            .setRowSchema(schema);
+
+    String sql = "SELECT key, any_value(col) as any_value FROM PCOLLECTION GROUP BY key";
+
+    PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
+
+    Map<Integer, List<Integer>> allowedTuples = new HashMap<>();
+    allowedTuples.put(0, Arrays.asList(1, 2));
+    allowedTuples.put(1, Arrays.asList(3));
+    allowedTuples.put(2, Arrays.asList(4, 5));
+
+    PAssert.that(result)
+        .satisfies(
+            input -> {
+              Iterator<Row> iter = input.iterator();
+              while (iter.hasNext()) {
+                Row row = iter.next();
+                List<Integer> values = allowedTuples.remove(row.getInt32("key"));
+                assertTrue(values != null);
+                assertTrue(values.contains(row.getInt32("any_value")));
+              }
+              assertTrue(allowedTuples.isEmpty());
+              return null;
+            });
+
+    pipeline.run();
+  }
+
   private static class CheckerBigDecimalDivide
       implements SerializableFunction<Iterable<Row>, Void> {
 
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
index 0f31fa6..22b2de9 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlStdOperatorMappingTable.java
@@ -32,6 +32,7 @@
   static final List<FunctionSignatureId> ZETASQL_BUILTIN_FUNCTION_WHITELIST =
       ImmutableList.of(
           FunctionSignatureId.FN_AND,
+          FunctionSignatureId.FN_ANY_VALUE,
           FunctionSignatureId.FN_OR,
           FunctionSignatureId.FN_NOT,
           FunctionSignatureId.FN_MULTIPLY_DOUBLE,
@@ -204,7 +205,7 @@
           .put("min", SqlStdOperatorTable.MIN)
           .put("avg", SqlStdOperatorTable.AVG)
           .put("sum", SqlStdOperatorTable.SUM)
-          // .put("any_value", SqlStdOperatorTable.ANY_VALUE)
+          .put("any_value", SqlStdOperatorTable.ANY_VALUE)
           .put("count", SqlStdOperatorTable.COUNT)
 
           // aggregate UDF
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
index d222dac..7e00fd0 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
@@ -44,7 +44,9 @@
   /** This is the case of {@code table [LEFT|INNER] JOIN UNNEST(table.array_field) on join_expr}. */
   @Override
   public boolean canConvert(ResolvedArrayScan zetaNode) {
-    return zetaNode.getInputScan() != null && zetaNode.getJoinExpr() != null;
+    return zetaNode.getArrayExpr() instanceof ResolvedColumnRef
+        && zetaNode.getInputScan() != null
+        && zetaNode.getJoinExpr() != null;
   }
 
   /** Left input is converted from input scan. */
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java
index 1506882..ab12377 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java
@@ -81,18 +81,19 @@
 
   /** Collation is a sort order, as in ORDER BY DESCENDING/ASCENDING. */
   private static RelCollation getRelCollation(ResolvedOrderByScan node) {
+    final long inputOffset = node.getColumnList().get(0).getId();
     List<RelFieldCollation> fieldCollations =
         node.getOrderByItemList().stream()
-            .map(LimitOffsetScanToOrderByLimitConverter::orderByItemToFieldCollation)
+            .map(item -> orderByItemToFieldCollation(item, inputOffset))
             .collect(toList());
     return RelCollationImpl.of(fieldCollations);
   }
 
-  private static RelFieldCollation orderByItemToFieldCollation(ResolvedOrderByItem item) {
-    // TODO: might need a column ref mapping here.
+  private static RelFieldCollation orderByItemToFieldCollation(
+      ResolvedOrderByItem item, long inputOffset) {
     Direction sortDirection = item.getIsDescending() ? DESCENDING : ASCENDING;
-    int fieldIndex = (int) item.getColumnRef().getColumn().getId();
-    return new RelFieldCollation(fieldIndex, sortDirection);
+    final long fieldIndex = item.getColumnRef().getColumn().getId() - inputOffset;
+    return new RelFieldCollation((int) fieldIndex, sortDirection);
   }
 
   private RelNode convertOrderByScanToLogicalScan(ResolvedOrderByScan node, RelNode input) {
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 26e34ca..d02421b 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -44,6 +44,7 @@
 import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TIMESTAMP_TABLE_TWO;
 import static org.apache.beam.sdk.extensions.sql.zetasql.TestInput.TIME_TABLE;
 import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
+import static org.junit.Assert.assertTrue;
 
 import com.google.protobuf.ByteString;
 import com.google.zetasql.SqlException;
@@ -56,6 +57,8 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -1323,6 +1326,19 @@
   }
 
   @Test
+  public void testZetaSQLSelectFromTableOrderLimit() {
+    String sql =
+        "SELECT x, y FROM (SELECT 1 as x, 0 as y UNION ALL SELECT 0, 0 "
+            + "UNION ALL SELECT 1, 0 UNION ALL SELECT 1, 1) ORDER BY x LIMIT 1";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    final Schema schema = Schema.builder().addInt64Field("field1").addInt64Field("field2").build();
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(0L, 0L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testZetaSQLSelectFromTableLimitOffset() {
     String sql =
         "SELECT COUNT(a) FROM (\n"
@@ -1497,6 +1513,36 @@
   }
 
   @Test
+  public void testZetaSQLAnyValueInGroupBy() {
+    String sql =
+        "SELECT rowCol.row_id as key, ANY_VALUE(rowCol.data) as any_value FROM table_with_struct_two GROUP BY rowCol.row_id";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    Map<Long, List<String>> allowedTuples = new HashMap<>();
+    allowedTuples.put(1L, Arrays.asList("data1"));
+    allowedTuples.put(2L, Arrays.asList("data2"));
+    allowedTuples.put(3L, Arrays.asList("data2", "data3"));
+
+    PAssert.that(stream)
+        .satisfies(
+            input -> {
+              Iterator<Row> iter = input.iterator();
+              while (iter.hasNext()) {
+                Row row = iter.next();
+                List<String> values = allowedTuples.remove(row.getInt64("key"));
+                assertTrue(values != null);
+                assertTrue(values.contains(row.getString("any_value")));
+              }
+              assertTrue(allowedTuples.isEmpty());
+              return null;
+            });
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testZetaSQLStructFieldAccessInGroupBy2() {
     String sql =
         "SELECT rowCol.data, MAX(rowCol.row_id), MIN(rowCol.row_id) FROM table_with_struct_two"
@@ -3010,6 +3056,44 @@
   }
 
   @Test
+  public void testUnnestJoinStruct() {
+    String sql =
+        "SELECT b, x FROM UNNEST("
+            + "[STRUCT(true AS b, [3, 5] AS arr), STRUCT(false AS b, [7, 9] AS arr)]) t "
+            + "LEFT JOIN UNNEST(t.arr) x ON b";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(UnsupportedOperationException.class);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testUnnestJoinLiteral() {
+    String sql =
+        "SELECT a, b "
+            + "FROM UNNEST([1, 1, 2, 3, 5, 8, 13, NULL]) a "
+            + "JOIN UNNEST([1, 2, 3, 5, 7, 11, 13, NULL]) b "
+            + "ON a = b";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(UnsupportedOperationException.class);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testUnnestJoinSubquery() {
+    String sql =
+        "SELECT a, b "
+            + "FROM UNNEST([1, 2, 3]) a "
+            + "JOIN UNNEST(ARRAY(SELECT b FROM UNNEST([3, 2, 1]) b)) b "
+            + "ON a = b";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(UnsupportedOperationException.class);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
   public void testCaseNoValue() {
     String sql = "SELECT CASE WHEN 1 > 2 THEN 'not possible' ELSE 'seems right' END";
 
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java
new file mode 100644
index 0000000..9b154ce
--- /dev/null
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.sns;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.net.URI;
+import javax.annotation.Nullable;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sns.SnsAsyncClient;
+import software.amazon.awssdk.services.sns.SnsAsyncClientBuilder;
+
+/** Basic implementation of {@link SnsAsyncClientProvider} used by default in {@link SnsIO}. */
+class BasicSnsAsyncClientProvider implements SnsAsyncClientProvider {
+  private final AwsCredentialsProvider awsCredentialsProvider;
+  private final String region;
+  @Nullable private final URI serviceEndpoint;
+
+  BasicSnsAsyncClientProvider(
+      AwsCredentialsProvider awsCredentialsProvider, String region, @Nullable URI serviceEndpoint) {
+    checkArgument(awsCredentialsProvider != null, "awsCredentialsProvider can not be null");
+    checkArgument(region != null, "region can not be null");
+    this.awsCredentialsProvider = awsCredentialsProvider;
+    this.region = region;
+    this.serviceEndpoint = serviceEndpoint;
+  }
+
+  @Override
+  public SnsAsyncClient getSnsAsyncClient() {
+    SnsAsyncClientBuilder builder =
+        SnsAsyncClient.builder()
+            .credentialsProvider(awsCredentialsProvider)
+            .region(Region.of(region));
+
+    if (serviceEndpoint != null) {
+      builder.endpointOverride(serviceEndpoint);
+    }
+
+    return builder.build();
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java
similarity index 64%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java
copy to sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java
index c25d25a..372ea73 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java
@@ -15,14 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.testing;
+package org.apache.beam.sdk.io.aws2.sns;
 
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import java.io.Serializable;
+import software.amazon.awssdk.services.sns.SnsAsyncClient;
 
 /**
- * Category tag for validation tests which use key. Tests tagged with {@link UsesKey} should be run
- * for runners which support key parameter in {@link OnTimer}.
+ * Provides instances of the asynchronous SNS client.
+ *
+ * <p>Please note that any instance of {@link SnsAsyncClientProvider} must be {@link Serializable}
+ * to ensure it can be sent to worker machines.
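+ *
+ * <p>For example (an illustrative sketch using the AWS SDK's default client factory):
+ *
+ * <pre>{@code
+ * SnsAsyncClientProvider provider = () -> SnsAsyncClient.create();
+ * }</pre>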
  */
-@Internal
-public interface UsesKey {}
+public interface SnsAsyncClientProvider extends Serializable {
+  SnsAsyncClient getSnsAsyncClient();
+}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsClientProvider.java
index b1a3af2..2882303 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsClientProvider.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsClientProvider.java
@@ -21,7 +21,7 @@
 import software.amazon.awssdk.services.sns.SnsClient;
 
 /**
- * Provides instances of DynamoDB clients.
+ * Provides instances of the SNS client.
  *
  * <p>Please note, that any instance of {@link SnsClientProvider} must be {@link Serializable} to
  * ensure it can be sent to worker machines.
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
index a4be203..fa45dac 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
@@ -18,15 +18,18 @@
 package org.apache.beam.sdk.io.aws2.sns;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
+import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -45,6 +48,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.services.sns.SnsAsyncClient;
 import software.amazon.awssdk.services.sns.SnsClient;
 import software.amazon.awssdk.services.sns.model.GetTopicAttributesRequest;
 import software.amazon.awssdk.services.sns.model.GetTopicAttributesResponse;
@@ -55,7 +59,7 @@
 /**
  * {@link PTransform}s for writing to <a href="https://aws.amazon.com/sns/">SNS</a>.
  *
- * <h3>Writing to SNS</h3>
+ * <h3>Writing to SNS Synchronously</h3>
  *
  * <p>Example usage:
  *
@@ -79,15 +83,51 @@
  *   <li>AwsCredentialsProvider, which you can pass on to BasicSnsClientProvider
  *   <li>publishRequestFn, a function to convert your message into PublishRequest
  * </ul>
+ *
+ * <h3>Writing to SNS Asynchronously</h3>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * PCollection<String> data = ...;
+ *
+ * data.apply(SnsIO.<String>writeAsync()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withPublishRequestFn(createPublishRequestFn())
+ *     .withSnsClientProvider(awsCredentialsProvider, region));
+ * }</pre>
+ *
+ * <p>To inspect the result of each publish call, keep the returned {@link PCollection}:
+ *
+ * <pre>{@code
+ * PCollection<String> data = ...;
+ *
+ * PCollection<SnsResponse<String>> responses = data.apply(SnsIO.<String>writeAsync()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withPublishRequestFn(createPublishRequestFn())
+ *     .withSnsClientProvider(awsCredentialsProvider, region));
+ * }</pre>
+ *
+ * <p>As a client, you need to provide at least the following:
+ *
+ * <ul>
+ *   <li>Coder for the element type T
+ *   <li>publishRequestFn, a function to convert your message into a PublishRequest
+ *   <li>SnsAsyncClientProvider, a provider to create an asynchronous SNS client
+ * </ul>
  */
 @Experimental(Kind.SOURCE_SINK)
 public final class SnsIO {
 
-  // Write data tp SNS
+  // Write data to SNS (synchronous)
   public static <T> Write<T> write() {
     return new AutoValue_SnsIO_Write.Builder().build();
   }
 
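+  // Write data to SNS (asynchronous)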
+  public static <T> WriteAsync<T> writeAsync() {
+    return new AutoValue_SnsIO_WriteAsync.Builder().build();
+  }
+
   /**
    * A POJO encapsulating a configuration for retry behavior when issuing requests to SNS. A retry
    * will be attempted until the maxAttempts or maxDuration is exceeded, whichever comes first, for
@@ -369,4 +409,148 @@
       }
     }
   }
+
+  /** Implementation of {@link #writeAsync}. */
+  @AutoValue
+  public abstract static class WriteAsync<T>
+      extends PTransform<PCollection<T>, PCollection<SnsResponse<T>>> {
+
+    @Nullable
+    abstract SnsAsyncClientProvider getSnsClientProvider();
+
+    /** SerializableFunction to create PublishRequest. */
+    @Nullable
+    abstract SerializableFunction<T, PublishRequest> getPublishRequestFn();
+
+    /** Coder for element T. */
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setSnsClientProvider(SnsAsyncClientProvider asyncClientProvider);
+
+      abstract Builder<T> setCoder(Coder<T> elementCoder);
+
+      abstract Builder<T> setPublishRequestFn(
+          SerializableFunction<T, PublishRequest> publishRequestFn);
+
+      abstract WriteAsync<T> build();
+    }
+
+    /**
+     * Specify a {@link Coder} for the elements of the input {@link PCollection}.
+     *
+     * @param elementCoder the element {@link Coder}
+     */
+    public WriteAsync<T> withCoder(Coder<T> elementCoder) {
+      checkNotNull(elementCoder, "elementCoder cannot be null");
+      return builder().setCoder(elementCoder).build();
+    }
+
+    /**
+     * Specify a function for converting a message into a {@link PublishRequest} object.
+     *
+     * @param publishRequestFn the function creating a {@link PublishRequest} from an element
+     */
+    public WriteAsync<T> withPublishRequestFn(
+        SerializableFunction<T, PublishRequest> publishRequestFn) {
+      checkNotNull(publishRequestFn, "publishRequestFn cannot be null");
+      return builder().setPublishRequestFn(publishRequestFn).build();
+    }
+
+    /**
+     * Allows specifying a custom {@link SnsAsyncClientProvider}. The {@link
+     * SnsAsyncClientProvider} creates a new {@link SnsAsyncClient} which is later used for
+     * writing to an SNS topic.
+     */
+    public WriteAsync<T> withSnsClientProvider(SnsAsyncClientProvider asyncClientProvider) {
+      checkNotNull(asyncClientProvider, "asyncClientProvider cannot be null");
+      return builder().setSnsClientProvider(asyncClientProvider).build();
+    }
+
+    /**
+     * Specify credential details and region to be used to write to SNS. If you need a more
+     * sophisticated credential protocol, you should look at {@link
+     * WriteAsync#withSnsClientProvider(SnsAsyncClientProvider)}.
+     */
+    public WriteAsync<T> withSnsClientProvider(
+        AwsCredentialsProvider credentialsProvider, String region) {
+      checkNotNull(credentialsProvider, "credentialsProvider cannot be null");
+      checkNotNull(region, "region cannot be null");
+      return withSnsClientProvider(credentialsProvider, region, null);
+    }
+
+    /**
+     * Specify credential details and region to be used to write to SNS. If you need a more
+     * sophisticated credential protocol, you should look at {@link
+     * WriteAsync#withSnsClientProvider(SnsAsyncClientProvider)}.
+     *
+     * <p>The {@code serviceEndpoint} sets an alternative service host.
+     */
+    public WriteAsync<T> withSnsClientProvider(
+        AwsCredentialsProvider credentialsProvider, String region, URI serviceEndpoint) {
+      checkNotNull(credentialsProvider, "credentialsProvider cannot be null");
+      checkNotNull(region, "region cannot be null");
+      return withSnsClientProvider(
+          new BasicSnsAsyncClientProvider(credentialsProvider, region, serviceEndpoint));
+    }
+
+    @Override
+    public PCollection<SnsResponse<T>> expand(PCollection<T> input) {
+      checkArgument(getSnsClientProvider() != null, "withSnsClientProvider() needs to be called");
+      checkArgument(getPublishRequestFn() != null, "withPublishRequestFn() needs to be called");
+      checkArgument(getCoder() != null, "withCoder() needs to be called");
+
+      return input
+          .apply(ParDo.of(new SnsWriteAsyncFn<>(this)))
+          .setCoder(SnsResponseCoder.of(getCoder()));
+    }
+
+    private static class SnsWriteAsyncFn<T> extends DoFn<T, SnsResponse<T>> {
+
+      private static final Logger LOG = LoggerFactory.getLogger(SnsWriteAsyncFn.class);
+
+      private final WriteAsync<T> spec;
+      private transient SnsAsyncClient client;
+
+      SnsWriteAsyncFn(WriteAsync<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        this.client = spec.getSnsClientProvider().getSnsAsyncClient();
+      }
+
+      @SuppressWarnings("FutureReturnValueIgnored")
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        PublishRequest publishRequest = spec.getPublishRequestFn().apply(context.element());
+        client.publish(publishRequest).whenComplete(getPublishResponse(context));
+      }
+
+      private BiConsumer<? super PublishResponse, ? super Throwable> getPublishResponse(
+          DoFn<T, SnsResponse<T>>.ProcessContext context) {
+        return (response, ex) -> {
+          if (ex == null) {
+            SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
+            context.output(snsResponse);
+          } else {
+            LOG.error("Error while publishing request to SNS", ex);
+            throw new SnsWriteException("Error while publishing request to SNS", ex);
+          }
+        };
+      }
+    }
+  }
+
+  /** Exception class for SNS write exceptions. */
+  protected static class SnsWriteException extends RuntimeException {
+
+    SnsWriteException(String message, Throwable error) {
+      super(message, error);
+    }
+  }
 }
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java
new file mode 100644
index 0000000..b41541d
--- /dev/null
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.sns;
+
+import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.OptionalInt;
+import software.amazon.awssdk.services.sns.model.PublishResponse;
+
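+/**
+ * Wraps an input element together with the HTTP status code and status text of the SNS publish
+ * call made for it. Both status fields are empty when no {@link PublishResponse} was received.
+ */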
+@AutoValue
+abstract class SnsResponse<T> implements Serializable {
+
+  public abstract T element();
+
+  public abstract OptionalInt statusCode();
+
+  public abstract Optional<String> statusText();
+
+  static <T> SnsResponse<T> create(
+      @NonNull T element, OptionalInt statusCode, Optional<String> statusText) {
+
+    return new AutoValue_SnsResponse<>(element, statusCode, statusText);
+  }
+
+  public static <T> SnsResponse<T> of(@NonNull T element, @Nullable PublishResponse response) {
+
+    final Optional<PublishResponse> publishResponse = Optional.ofNullable(response);
+    OptionalInt statusCode =
+        publishResponse
+            .map(r -> OptionalInt.of(r.sdkHttpResponse().statusCode()))
+            .orElse(OptionalInt.empty());
+
+    Optional<String> statusText = publishResponse.flatMap(r -> r.sdkHttpResponse().statusText());
+
+    return create(element, statusCode, statusText);
+  }
+}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java
new file mode 100644
index 0000000..b627822
--- /dev/null
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.sns;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** Custom {@link Coder} for {@link SnsResponse}. */
+class SnsResponseCoder<T> extends StructuredCoder<SnsResponse<T>> {
+
+  private final Coder<T> elementCoder;
+  private static final VarIntCoder STATUS_CODE_CODER = VarIntCoder.of();
+  private static final StringUtf8Coder STATUS_TEXT_CODER = StringUtf8Coder.of();
+
+  public SnsResponseCoder(Coder<T> elementCoder) {
+    this.elementCoder = elementCoder;
+  }
+
+  static <T> SnsResponseCoder<T> of(Coder<T> elementCoder) {
+    return new SnsResponseCoder<>(elementCoder);
+  }
+
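+  /**
+   * Encoding layout: the element, followed by a boolean flag and the status code when present,
+   * followed by a boolean flag and the status text when present.
+   */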
+  @Override
+  public void encode(SnsResponse<T> value, OutputStream outStream) throws IOException {
+    T element = value.element();
+    elementCoder.encode(element, outStream);
+
+    OptionalInt statusCode = value.statusCode();
+    if (statusCode.isPresent()) {
+      BooleanCoder.of().encode(Boolean.TRUE, outStream);
+      STATUS_CODE_CODER.encode(statusCode.getAsInt(), outStream);
+    } else {
+      BooleanCoder.of().encode(Boolean.FALSE, outStream);
+    }
+
+    Optional<String> statusText = value.statusText();
+    if (statusText.isPresent()) {
+      BooleanCoder.of().encode(Boolean.TRUE, outStream);
+      STATUS_TEXT_CODER.encode(statusText.get(), outStream);
+    } else {
+      BooleanCoder.of().encode(Boolean.FALSE, outStream);
+    }
+  }
+
+  @Override
+  public SnsResponse<T> decode(InputStream inStream) throws IOException {
+    T element = elementCoder.decode(inStream);
+
+    OptionalInt statusCode = OptionalInt.empty();
+    if (BooleanCoder.of().decode(inStream)) {
+      statusCode = OptionalInt.of(STATUS_CODE_CODER.decode(inStream));
+    }
+
+    Optional<String> statusText = Optional.empty();
+    if (BooleanCoder.of().decode(inStream)) {
+      statusText = Optional.of(STATUS_TEXT_CODER.decode(inStream));
+    }
+    return SnsResponse.create(element, statusCode, statusText);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(elementCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    elementCoder.verifyDeterministic();
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java
similarity index 69%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java
copy to sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java
index c25d25a..697ae8b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesKey.java
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java
@@ -15,14 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.testing;
+package org.apache.beam.sdk.io.aws2.sns;
 
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.transforms.DoFn.OnTimer;
+import java.io.Serializable;
+import software.amazon.awssdk.services.sns.SnsAsyncClient;
 
-/**
- * Category tag for validation tests which use key. Tests tagged with {@link UsesKey} should be run
- * for runners which support key parameter in {@link OnTimer}.
- */
-@Internal
-public interface UsesKey {}
+class MockSnsAsyncBaseClient implements SnsAsyncClient, Serializable {
+  @Override
+  public String serviceName() {
+    return null;
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java
new file mode 100644
index 0000000..160a6e5
--- /dev/null
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.sns;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.services.sns.model.PublishRequest;
+import software.amazon.awssdk.services.sns.model.PublishResponse;
+
+final class MockSnsAsyncClient extends MockSnsAsyncBaseClient {
+  private final int statusCode;
+
+  private MockSnsAsyncClient(int statusCode) {
+    this.statusCode = statusCode;
+  }
+
+  static MockSnsAsyncClient withStatusCode(int statusCode) {
+    return new MockSnsAsyncClient(statusCode);
+  }
+
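+  /**
+   * Completes immediately with a fabricated {@link PublishResponse} carrying the configured
+   * status code.
+   */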
+  @Override
+  public CompletableFuture<PublishResponse> publish(PublishRequest publishRequest) {
+    SdkHttpResponse sdkHttpResponse = SdkHttpResponse.builder().statusCode(statusCode).build();
+    PublishResponse.Builder builder = PublishResponse.builder();
+    builder.messageId(UUID.randomUUID().toString());
+    builder.sdkHttpResponse(sdkHttpResponse);
+    PublishResponse response = builder.build();
+    return CompletableFuture.completedFuture(response);
+  }
+}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java
new file mode 100644
index 0000000..d126001
--- /dev/null
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.sns;
+
+import java.util.concurrent.CompletableFuture;
+import software.amazon.awssdk.services.sns.model.PublishRequest;
+import software.amazon.awssdk.services.sns.model.PublishResponse;
+
+final class MockSnsAsyncExceptionClient extends MockSnsAsyncBaseClient {
+  private MockSnsAsyncExceptionClient() {}
+
+  static MockSnsAsyncExceptionClient create() {
+    return new MockSnsAsyncExceptionClient();
+  }
+
+  @Override
+  public CompletableFuture<PublishResponse> publish(PublishRequest publishRequest) {
+    CompletableFuture<PublishResponse> completableFuture = new CompletableFuture<>();
+    completableFuture.completeExceptionally(
+        new RuntimeException("Error occurred during publish call"));
+    return completableFuture;
+  }
+}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java
new file mode 100644
index 0000000..8b9f795
--- /dev/null
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.sns;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import software.amazon.awssdk.services.sns.model.PublishRequest;
+
+@RunWith(JUnit4.class)
+public class SnsIOWriteTest implements Serializable {
+  private static final String TOPIC = "test";
+  private static final int FAILURE_STATUS_CODE = 400;
+  private static final int SUCCESS_STATUS_CODE = 200;
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void shouldReturnResponseOnPublishSuccess() {
+    String testMessage1 = "test1";
+    String testMessage2 = "test2";
+    String testMessage3 = "test3";
+
+    PCollection<SnsResponse<String>> result =
+        pipeline
+            .apply(
+                Create.of(testMessage1, testMessage2, testMessage3).withCoder(StringUtf8Coder.of()))
+            .apply(
+                SnsIO.<String>writeAsync()
+                    .withCoder(StringUtf8Coder.of())
+                    .withPublishRequestFn(createPublishRequestFn())
+                    .withSnsClientProvider(
+                        () -> MockSnsAsyncClient.withStatusCode(SUCCESS_STATUS_CODE)));
+
+    PAssert.that(result)
+        .satisfies(
+            (responses) -> {
+              ImmutableSet<String> messagesInResponse =
+                  StreamSupport.stream(responses.spliterator(), false)
+                      .filter(response -> response.statusCode().getAsInt() == SUCCESS_STATUS_CODE)
+                      .map(SnsResponse::element)
+                      .collect(ImmutableSet.toImmutableSet());
+
+              Set<String> originalMessages =
+                  Sets.newHashSet(testMessage1, testMessage2, testMessage3);
+              Sets.SetView<String> difference =
+                  Sets.difference(messagesInResponse, originalMessages);
+
+              assertEquals(3, messagesInResponse.size());
+              assertEquals(0, difference.size());
+              return null;
+            });
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void shouldReturnResponseOnPublishFailure() {
+    String testMessage1 = "test1";
+    String testMessage2 = "test2";
+
+    PCollection<SnsResponse<String>> result =
+        pipeline
+            .apply(Create.of(testMessage1, testMessage2).withCoder(StringUtf8Coder.of()))
+            .apply(
+                SnsIO.<String>writeAsync()
+                    .withCoder(StringUtf8Coder.of())
+                    .withPublishRequestFn(createPublishRequestFn())
+                    .withSnsClientProvider(
+                        () -> MockSnsAsyncClient.withStatusCode(FAILURE_STATUS_CODE)));
+
+    PAssert.that(result)
+        .satisfies(
+            (responses) -> {
+              ImmutableSet<String> messagesInResponse =
+                  StreamSupport.stream(responses.spliterator(), false)
+                      .filter(response -> response.statusCode().getAsInt() != SUCCESS_STATUS_CODE)
+                      .map(SnsResponse::element)
+                      .collect(ImmutableSet.toImmutableSet());
+
+              Set<String> originalMessages = Sets.newHashSet(testMessage1, testMessage2);
+              Sets.SetView<String> difference =
+                  Sets.difference(messagesInResponse, originalMessages);
+
+              assertEquals(2, messagesInResponse.size());
+              assertEquals(0, difference.size());
+              return null;
+            });
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  @SuppressWarnings("MissingFail")
+  public void shouldThrowIfThrowErrorOptionSet() {
+    String testMessage1 = "test1";
+
+    pipeline
+        .apply(Create.of(testMessage1))
+        .apply(
+            SnsIO.<String>writeAsync()
+                .withCoder(StringUtf8Coder.of())
+                .withPublishRequestFn(createPublishRequestFn())
+                .withSnsClientProvider(
+                    () -> MockSnsAsyncClient.withStatusCode(FAILURE_STATUS_CODE)));
+    try {
+      pipeline.run().waitUntilFinish();
+    } catch (final Pipeline.PipelineExecutionException e) {
+      assertThrows(IOException.class, () -> e.getCause().getClass());
+    }
+  }
+
+  @Test
+  @SuppressWarnings("MissingFail")
+  public void shouldThrowIfThrowErrorOptionSetOnInternalException() {
+    String testMessage1 = "test1";
+
+    pipeline
+        .apply(Create.of(testMessage1))
+        .apply(
+            SnsIO.<String>writeAsync()
+                .withCoder(StringUtf8Coder.of())
+                .withPublishRequestFn(createPublishRequestFn())
+                .withSnsClientProvider(MockSnsAsyncExceptionClient::create));
+    try {
+      pipeline.run().waitUntilFinish();
+    } catch (final Pipeline.PipelineExecutionException e) {
+      assertThrows(IOException.class, () -> e.getCause().getClass());
+    }
+  }
+
+  private SerializableFunction<String, PublishRequest> createPublishRequestFn() {
+    return (input) -> PublishRequest.builder().topicArn(TOPIC).message(input).build();
+  }
+}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java
new file mode 100644
index 0000000..f0d4563
--- /dev/null
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.sns;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SnsResponseCoderTest {
+
+  @Test
+  public void verifyResponseWithStatusCodeAndText() throws IOException {
+
+    SnsResponse<String> expected =
+        SnsResponse.create("test-1", OptionalInt.of(200), Optional.of("OK"));
+
+    SnsResponseCoder<String> coder = SnsResponseCoder.of(StringUtf8Coder.of());
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+    SnsResponse<String> actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void verifyResponseWithStatusAndNoText() throws IOException {
+    SnsResponse<String> expected =
+        SnsResponse.create("test-2", OptionalInt.of(200), Optional.empty());
+
+    SnsResponseCoder<String> coder = SnsResponseCoder.of(StringUtf8Coder.of());
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+    SnsResponse<String> actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void verifyResponseWithNoStatusCodeAndText() throws IOException {
+
+    SnsResponse<String> expected =
+        SnsResponse.create("test-3", OptionalInt.empty(), Optional.empty());
+
+    SnsResponseCoder<String> coder = SnsResponseCoder.of(StringUtf8Coder.of());
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    coder.encode(expected, output);
+
+    ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
+    SnsResponse<String> actual = coder.decode(in);
+
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 03d2131..0f91448 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -153,18 +153,32 @@
 
   private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
     int tried = 0;
-    while (tried < 3) {
+    int delay = 5000;
+    Exception exception = null;
+    while (tried < 5) {
       try {
         return builder.buildNativeCluster();
       } catch (NoHostAvailableException e) {
+        if (exception == null) {
+          exception = e;
+        } else {
+          exception.addSuppressed(e);
+        }
         tried++;
         try {
-          Thread.sleep(1000L);
+          Thread.sleep(delay);
         } catch (InterruptedException e1) {
+          Thread thread = Thread.currentThread();
+          thread.interrupt();
+          throw new RuntimeException(
+              String.format("Thread %s was interrupted", thread.getName()), e1);
         }
       }
     }
-    throw new RuntimeException("Unable to create embedded Cassandra cluster");
+    throw new RuntimeException(
+        String.format(
+            "Unable to create embedded Cassandra cluster: tried %d times with %d delay",
+            tried, delay),
+        exception);
   }
 
   @AfterClass
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/SchemaUtils.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/SchemaUtils.java
index ea2a2bf..bc1562e 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/SchemaUtils.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/SchemaUtils.java
@@ -27,29 +27,30 @@
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
 
 /** Utils to convert between HCatalog schema types and Beam schema types. */
 @Experimental(Kind.SCHEMAS)
 class SchemaUtils {
 
-  private static final Map<String, FieldType> PRIMITIVE_SERDE_TYPES_MAP =
-      ImmutableMap.<String, FieldType>builder()
-          .put(serdeConstants.BINARY_TYPE_NAME, FieldType.BYTES)
-          .put(serdeConstants.BOOLEAN_TYPE_NAME, FieldType.BOOLEAN)
-          .put(serdeConstants.TINYINT_TYPE_NAME, FieldType.BYTE)
-          .put(serdeConstants.CHAR_TYPE_NAME, FieldType.STRING)
-          .put(serdeConstants.DATE_TYPE_NAME, FieldType.DATETIME)
-          .put(serdeConstants.DATETIME_TYPE_NAME, FieldType.DATETIME)
-          .put(serdeConstants.DECIMAL_TYPE_NAME, FieldType.DECIMAL)
-          .put(serdeConstants.DOUBLE_TYPE_NAME, FieldType.DOUBLE)
-          .put(serdeConstants.FLOAT_TYPE_NAME, FieldType.FLOAT)
-          .put(serdeConstants.INT_TYPE_NAME, FieldType.INT32)
-          .put(serdeConstants.BIGINT_TYPE_NAME, FieldType.INT64)
-          .put(serdeConstants.SMALLINT_TYPE_NAME, FieldType.INT16)
-          .put(serdeConstants.STRING_TYPE_NAME, FieldType.STRING)
-          .put(serdeConstants.TIMESTAMP_TYPE_NAME, FieldType.DATETIME)
-          .put(serdeConstants.VARCHAR_TYPE_NAME, FieldType.STRING)
+  private static final Map<HCatFieldSchema.Type, FieldType> HCAT_TO_BEAM_TYPES_MAP =
+      ImmutableMap.<HCatFieldSchema.Type, FieldType>builder()
+          .put(HCatFieldSchema.Type.BOOLEAN, FieldType.BOOLEAN)
+          .put(HCatFieldSchema.Type.TINYINT, FieldType.BYTE)
+          .put(HCatFieldSchema.Type.SMALLINT, FieldType.INT16)
+          .put(HCatFieldSchema.Type.INT, FieldType.INT32)
+          .put(HCatFieldSchema.Type.BIGINT, FieldType.INT64)
+          .put(HCatFieldSchema.Type.FLOAT, FieldType.FLOAT)
+          .put(HCatFieldSchema.Type.DOUBLE, FieldType.DOUBLE)
+          .put(HCatFieldSchema.Type.DECIMAL, FieldType.DECIMAL)
+          .put(HCatFieldSchema.Type.STRING, FieldType.STRING)
+          .put(HCatFieldSchema.Type.CHAR, FieldType.STRING)
+          .put(HCatFieldSchema.Type.VARCHAR, FieldType.STRING)
+          .put(HCatFieldSchema.Type.BINARY, FieldType.BYTES)
+          .put(HCatFieldSchema.Type.DATE, FieldType.DATETIME)
+          .put(HCatFieldSchema.Type.TIMESTAMP, FieldType.DATETIME)
           .build();
 
   static Schema toBeamSchema(List<FieldSchema> fields) {
@@ -58,12 +59,35 @@
 
   private static Schema.Field toBeamField(FieldSchema field) {
     String name = field.getName();
-    if (!PRIMITIVE_SERDE_TYPES_MAP.containsKey(field.getType())) {
+    HCatFieldSchema hCatFieldSchema;
+
+    try {
+      hCatFieldSchema = HCatSchemaUtils.getHCatFieldSchema(field);
+    } catch (HCatException e) {
+      // Converting checked Exception to unchecked Exception.
       throw new UnsupportedOperationException(
-          "The type '" + field.getType() + "' of field '" + name + "' is not supported.");
+          "Error while converting FieldSchema to HCatFieldSchema", e);
     }
 
-    FieldType fieldType = PRIMITIVE_SERDE_TYPES_MAP.get(field.getType());
-    return Schema.Field.of(name, fieldType).withNullable(true);
+    switch (hCatFieldSchema.getCategory()) {
+      case PRIMITIVE:
+        {
+          if (!HCAT_TO_BEAM_TYPES_MAP.containsKey(hCatFieldSchema.getType())) {
+            throw new UnsupportedOperationException(
+                "The Primitive HCat type '"
+                    + field.getType()
+                    + "' of field '"
+                    + name
+                    + "' cannot be converted to Beam FieldType");
+          }
+
+          FieldType fieldType = HCAT_TO_BEAM_TYPES_MAP.get(hCatFieldSchema.getType());
+          return Schema.Field.of(name, fieldType).withNullable(true);
+        }
+        // TODO: Add Support for Complex Types i.e. ARRAY, MAP, STRUCT
+      default:
+        throw new UnsupportedOperationException(
+            "The category '" + hCatFieldSchema.getCategory() + "' is not supported.");
+    }
   }
 }
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/SchemaUtilsTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/SchemaUtilsTest.java
new file mode 100644
index 0000000..5b748da
--- /dev/null
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/SchemaUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hcatalog;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SchemaUtilsTest {
+  @Test
+  public void testParameterizedTypesToBeamTypes() {
+    List<FieldSchema> listOfFieldSchema = new ArrayList<>();
+    listOfFieldSchema.add(new FieldSchema("parameterizedChar", "char(10)", null));
+    listOfFieldSchema.add(new FieldSchema("parameterizedVarchar", "varchar(100)", null));
+    listOfFieldSchema.add(new FieldSchema("parameterizedDecimal", "decimal(30,16)", null));
+
+    Schema expectedSchema =
+        Schema.builder()
+            .addNullableField("parameterizedChar", Schema.FieldType.STRING)
+            .addNullableField("parameterizedVarchar", Schema.FieldType.STRING)
+            .addNullableField("parameterizedDecimal", Schema.FieldType.DECIMAL)
+            .build();
+
+    Schema actualSchema = SchemaUtils.toBeamSchema(listOfFieldSchema);
+    Assert.assertEquals(expectedSchema, actualSchema);
+  }
+}
diff --git a/sdks/python/.pylintrc b/sdks/python/.pylintrc
index 8e46902..1f92812 100644
--- a/sdks/python/.pylintrc
+++ b/sdks/python/.pylintrc
@@ -86,11 +86,9 @@
   bad-super-call,
   bad-continuation,
   broad-except,
-  chained-comparison,
   comparison-with-callable,
   consider-using-enumerate,
   consider-using-in,
-  consider-using-set-comprehension,
   consider-using-sys-exit,
   cyclic-import,
   design,
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 8031fdd..3c8b9a9 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -590,7 +590,7 @@
 
   For internal use."""
   def __init__(self, key_coder, window_coder):
-    # type: (Coder) -> None
+    # type: (Coder, Coder) -> None
     self._key_coder = key_coder
     self._window_coder = window_coder
 
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 0cc5271..881e57f 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -60,7 +60,8 @@
   if not os.path.exists(test_yaml):
     raise ValueError('Could not find the test spec: %s' % test_yaml)
   with open(test_yaml, 'rb') as coder_spec:
-    for ix, spec in enumerate(yaml.load_all(coder_spec)):
+    for ix, spec in enumerate(
+        yaml.load_all(coder_spec, Loader=yaml.SafeLoader)):
       spec['index'] = ix
       name = spec.get('name', spec['coder']['urn'].split(':')[-2])
       yield [name, spec]
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 565849c..81d5ea8 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -92,7 +92,7 @@
   with beam.Pipeline(argv=pipeline_args) as p:
 
     # Read the table rows into a PCollection.
-    rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
+    rows = p | 'read' >> beam.io.ReadFromBigQuery(table=known_args.input)
     counts = count_tornadoes(rows)
 
     # Write the output using a "Write" transform that has side effects.
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 37e2d5b..c13c104 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -29,6 +29,7 @@
 import sys
 import unittest
 import uuid
+import warnings
 
 from hamcrest.library.text import stringmatches
 from nose.plugins.attrib import attr
@@ -50,6 +51,9 @@
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 
+warnings.filterwarnings(
+    'ignore', category=FutureWarning, module='apache_beam.io.fileio_test')
+
 
 def _get_file_reader(readable_file):
   if sys.version_info >= (3, 0):
@@ -357,8 +361,8 @@
 
   CSV_HEADERS = ['project', 'foundation']
 
-  SIMPLE_COLLECTION_VALIDATION_SET = set([(elm['project'], elm['foundation'])
-                                          for elm in SIMPLE_COLLECTION])
+  SIMPLE_COLLECTION_VALIDATION_SET = {(elm['project'], elm['foundation'])
+                                      for elm in SIMPLE_COLLECTION}
 
   class CsvSink(fileio.TextSink):
     def __init__(self, headers):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 78231c5..fd6b370 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -525,10 +525,6 @@
 FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')
 
 
-def _to_bool(value):
-  return value == 'true'
-
-
 def _to_decimal(value):
   return decimal.Decimal(value)
 
@@ -547,7 +543,6 @@
         'INTEGER': int,
         'INT64': int,
         'FLOAT': float,
-        'BOOLEAN': _to_bool,
         'NUMERIC': _to_decimal,
         'BYTES': _to_bytes,
     }
@@ -607,6 +602,7 @@
       project=None,
       query=None,
       validate=False,
+      pipeline_options=None,
       coder=None,
       use_standard_sql=False,
       flatten_results=True,
@@ -637,6 +633,15 @@
     self.coder = coder or _JsonToDictCoder
     self.kms_key = kms_key
     self.split_result = None
+    self.options = pipeline_options
+
+  def display_data(self):
+    return {
+        'table': str(self.table_reference),
+        'query': str(self.query),
+        'project': str(self.project),
+        'use_legacy_sql': self.use_legacy_sql,
+    }
 
   def estimate_size(self):
     bq = bigquery_tools.BigQueryWrapper()
@@ -654,8 +659,9 @@
           table_ref.projectId, table_ref.datasetId, table_ref.tableId)
       return int(table.numBytes)
     elif self.query is not None and self.query.is_accessible():
+      project = self._get_project()
       job = bq._start_query_job(
-          self.project,
+          project,
           self.query.get(),
           self.use_legacy_sql,
           self.flatten_results,
@@ -669,6 +675,16 @@
       # no access to the query that we're running.
       return None
 
+  def _get_project(self):
+    """Returns the project that queries and exports will be billed to."""
+
+    project = self.options.view_as(GoogleCloudOptions).project
+    if isinstance(project, vp.ValueProvider):
+      project = project.get()
+    if not project:
+      project = self.project
+    return project
+
   def split(self, desired_bundle_size, start_position=None, stop_position=None):
     if self.split_result is None:
       bq = bigquery_tools.BigQueryWrapper()
@@ -687,7 +703,7 @@
               self.coder(schema)) for metadata in metadata_list
       ]
       if self.query is not None:
-        bq.clean_up_temporary_dataset(self.project)
+        bq.clean_up_temporary_dataset(self._get_project())
 
     for source in self.split_result:
       yield SourceBundle(0, source, None, None)
@@ -709,13 +725,13 @@
   @check_accessible(['query'])
   def _setup_temporary_dataset(self, bq):
     location = bq.get_query_location(
-        self.project, self.query.get(), self.use_legacy_sql)
-    bq.create_temporary_dataset(self.project, location)
+        self._get_project(), self.query.get(), self.use_legacy_sql)
+    bq.create_temporary_dataset(self._get_project(), location)
 
   @check_accessible(['query'])
   def _execute_query(self, bq):
     job = bq._start_query_job(
-        self.project,
+        self._get_project(),
         self.query.get(),
         self.use_legacy_sql,
         self.flatten_results,
@@ -723,7 +739,7 @@
         kms_key=self.kms_key)
     job_ref = job.jobReference
     bq.wait_for_bq_job(job_ref)
-    return bq._get_temp_table(self.project)
+    return bq._get_temp_table(self._get_project())
 
   def _export_files(self, bq):
     """Runs a BigQuery export job.
@@ -736,6 +752,7 @@
                                      job_id,
                                      self.table_reference,
                                      bigquery_tools.FileFormat.JSON,
+                                     project=self._get_project(),
                                      include_header=False)
     bq.wait_for_bq_job(job_ref)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
@@ -1638,6 +1655,7 @@
             _CustomBigQuerySource(
                 gcs_location=gcs_location,
                 validate=self.validate,
+                pipeline_options=pcoll.pipeline.options,
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
index f7d7f9c..67be83b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
@@ -87,7 +87,7 @@
 
 
 def table_field_to_avro_field(table_field, namespace):
-  # type: (Dict[Text, Any]) -> Dict[Text, Any]
+  # type: (Dict[Text, Any], str) -> Dict[Text, Any]
 
   """Convert a BigQuery field to an avro field.
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 83bbf26..f9e0212 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -104,8 +104,7 @@
     }),
 ]
 
-_DISTINCT_DESTINATIONS = list(
-    set([elm[0] for elm in _DESTINATION_ELEMENT_PAIRS]))
+_DISTINCT_DESTINATIONS = list({elm[0] for elm in _DESTINATION_ELEMENT_PAIRS})
 
 _ELEMENTS = [elm[1] for elm in _DESTINATION_ELEMENT_PAIRS]
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 6eaa454..2fc1f72 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -56,6 +56,7 @@
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import DoFn
+from apache_beam.typehints.typehints import Any
 from apache_beam.utils import retry
 
 # Protect against environments where bigquery library is not available.
@@ -694,6 +695,7 @@
       job_id,
       table_reference,
       destination_format,
+      project=None,
       include_header=True,
       compression=ExportCompression.NONE):
     """Starts a job to export data from BigQuery.
@@ -701,10 +703,10 @@
     Returns:
       bigquery.JobReference with the information about the job that was started.
     """
-    job_reference = bigquery.JobReference(
-        jobId=job_id, projectId=table_reference.projectId)
+    job_project = project or table_reference.projectId
+    job_reference = bigquery.JobReference(jobId=job_id, projectId=job_project)
     request = bigquery.BigqueryJobsInsertRequest(
-        projectId=table_reference.projectId,
+        projectId=job_project,
         job=bigquery.Job(
             configuration=bigquery.JobConfiguration(
                 extract=bigquery.JobConfigurationExtract(
@@ -1183,6 +1185,9 @@
   def decode(self, encoded_table_row):
     return json.loads(encoded_table_row.decode('utf-8'))
 
+  def to_type_hint(self):
+    return Any
+
 
 class JsonRowWriter(io.IOBase):
   """
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 17fcf53..b01150c 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -117,7 +117,7 @@
       stream = download.stream
 
       def get_range_callback(start, end):
-        if not (start >= 0 and end >= start and end < len(f.contents)):
+        if not 0 <= start <= end < len(f.contents):
           raise ValueError(
               'start=%d end=%d len=%s' % (start, end, len(f.contents)))
         stream.write(f.contents[start:end + 1])
diff --git a/sdks/python/apache_beam/io/restriction_trackers.py b/sdks/python/apache_beam/io/restriction_trackers.py
index 29788d9..c623f38 100644
--- a/sdks/python/apache_beam/io/restriction_trackers.py
+++ b/sdks/python/apache_beam/io/restriction_trackers.py
@@ -137,7 +137,7 @@
           'of the range. Tried to claim position %r for the range [%r, %r)' %
           (position, self._range.start, self._range.stop))
 
-    if position >= self._range.start and position < self._range.stop:
+    if self._range.start <= position < self._range.stop:
       self._current_position = position
       return True
 
diff --git a/sdks/python/apache_beam/metrics/execution_test.py b/sdks/python/apache_beam/metrics/execution_test.py
index a91eeca..d1086da 100644
--- a/sdks/python/apache_beam/metrics/execution_test.py
+++ b/sdks/python/apache_beam/metrics/execution_test.py
@@ -104,9 +104,11 @@
     self.assertEqual(len(cumulative.gauges), 10)
 
     self.assertEqual(
-        set(all_values), set([v for _, v in cumulative.counters.items()]))
+        set(all_values), {v
+                          for _, v in cumulative.counters.items()})
     self.assertEqual(
-        set(all_values), set([v.value for _, v in cumulative.gauges.items()]))
+        set(all_values), {v.value
+                          for _, v in cumulative.gauges.items()})
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 1fb0508..e14045e 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -121,7 +121,7 @@
   All the transforms applied to the pipeline must have distinct full labels.
   If same transform instance needs to be applied then the right shift operator
   should be used to designate new names
-  (e.g. ``input | "label" >> my_tranform``).
+  (e.g. ``input | "label" >> my_transform``).
   """
 
   # TODO: BEAM-9001 - set environment ID in all transforms and allow runners to
@@ -878,8 +878,10 @@
     root_transform_id, = proto.root_transform_ids
     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
     # TODO(robertwb): These are only needed to continue construction. Omit?
-    p.applied_labels = set(
-        [t.unique_name for t in proto.components.transforms.values()])
+    p.applied_labels = {
+        t.unique_name
+        for t in proto.components.transforms.values()
+    }
     for id in proto.components.pcollections:
       pcollection = context.pcollections.get_by_id(id)
       pcollection.pipeline = p
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index f970758..0011109 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -262,8 +262,8 @@
 
     visitor = PipelineTest.Visitor(visited=[])
     pipeline.visit(visitor)
-    self.assertEqual(
-        set([pcoll1, pcoll2, pcoll3, pcoll4, pcoll5]), set(visitor.visited))
+    self.assertEqual({pcoll1, pcoll2, pcoll3, pcoll4, pcoll5},
+                     set(visitor.visited))
     self.assertEqual(set(visitor.enter_composite), set(visitor.leave_composite))
     self.assertEqual(2, len(visitor.enter_composite))
     self.assertEqual(visitor.enter_composite[1].transform, transform)
@@ -779,31 +779,31 @@
 
   def test_dir(self):
     options = Breakfast()
-    self.assertEqual(
-        set([
-            'from_dictionary',
-            'get_all_options',
-            'slices',
-            'style',
-            'view_as',
-            'display_data'
-        ]),
-        set([
-            attr for attr in dir(options)
-            if not attr.startswith('_') and attr != 'next'
-        ]))
-    self.assertEqual(
-        set([
-            'from_dictionary',
-            'get_all_options',
-            'style',
-            'view_as',
-            'display_data'
-        ]),
-        set([
-            attr for attr in dir(options.view_as(Eggs))
-            if not attr.startswith('_') and attr != 'next'
-        ]))
+    self.assertEqual({
+        'from_dictionary',
+        'get_all_options',
+        'slices',
+        'style',
+        'view_as',
+        'display_data'
+    },
+                     {
+                         attr
+                         for attr in dir(options)
+                         if not attr.startswith('_') and attr != 'next'
+                     })
+    self.assertEqual({
+        'from_dictionary',
+        'get_all_options',
+        'style',
+        'view_as',
+        'display_data'
+    },
+                     {
+                         attr
+                         for attr in dir(options.view_as(Eggs))
+                         if not attr.startswith('_') and attr != 'next'
+                     })
 
 
 class RunnerApiTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 156340f..cafa4a1 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -221,7 +221,8 @@
     if self.watermark_estimator_provider is None:
       self.watermark_estimator_provider = NoOpWatermarkEstimatorProvider()
 
-  def invoke_timer_callback(self, user_state_context, key, window, timestamp):
+  def invoke_timer_callback(
+      self, user_state_context, key, window, timestamp, pane_info):
     # TODO(ccy): support side inputs.
     kwargs = {}
     if self.has_userstate_arguments:
@@ -229,10 +230,10 @@
         kwargs[kw] = user_state_context.get_state(state_spec, key, window)
       for kw, timer_spec in self.timer_args_to_replace.items():
         kwargs[kw] = user_state_context.get_timer(
-            timer_spec, key, window, None, None)
+            timer_spec, key, window, timestamp, pane_info)
 
     if self.timestamp_arg_name:
-      kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp)
+      kwargs[self.timestamp_arg_name] = Timestamp.of(timestamp)
     if self.window_arg_name:
       kwargs[self.window_arg_name] = window
     if self.key_arg_name:
@@ -509,12 +510,12 @@
     """
     self.signature.teardown_lifecycle_method.method_value()
 
-  def invoke_user_timer(self, timer_spec, key, window, timestamp):
+  def invoke_user_timer(self, timer_spec, key, window, timestamp, pane_info):
     # self.output_processor is Optional, but in practice it won't be None here
     self.output_processor.process_outputs(
         WindowedValue(None, timestamp, (window, )),
         self.signature.timer_methods[timer_spec].invoke_timer_callback(
-            self.user_state_context, key, window, timestamp))
+            self.user_state_context, key, window, timestamp, pane_info))
 
   def invoke_create_watermark_estimator(self, estimator_state):
     return self.signature.create_watermark_estimator_method.method_value(
@@ -983,9 +984,10 @@
     assert isinstance(self.do_fn_invoker, PerWindowInvoker)
     return self.do_fn_invoker.current_element_progress()
 
-  def process_user_timer(self, timer_spec, key, window, timestamp):
+  def process_user_timer(self, timer_spec, key, window, timestamp, pane_info):
     try:
-      self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
+      self.do_fn_invoker.invoke_user_timer(
+          timer_spec, key, window, timestamp, pane_info)
     except BaseException as exn:
       self._reraise_augmented(exn)
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f71e566..07c5f88 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -117,13 +117,15 @@
   # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import CombineValuesPTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride
-  from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import JrhReadPTransformOverride
+  from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
+  from apache_beam.runners.dataflow.ptransform_overrides import NativeReadPTransformOverride
 
-  # Thesse overrides should be applied before the proto representation of the
+  # These overrides should be applied before the proto representation of the
   # graph is created.
   _PTRANSFORM_OVERRIDES = [
-      CombineValuesPTransformOverride()
+      CombineValuesPTransformOverride(),
+      NativeReadPTransformOverride(),
   ]  # type: List[PTransformOverride]
 
   _JRH_PTRANSFORM_OVERRIDES = [
@@ -325,7 +327,7 @@
     return SetPDoneVisitor(pipeline)
 
   @staticmethod
-  def side_input_visitor(use_unified_worker=False):
+  def side_input_visitor(use_unified_worker=False, use_fn_api=False):
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.pipeline import PipelineVisitor
@@ -343,7 +345,7 @@
           for ix, side_input in enumerate(transform_node.side_inputs):
             access_pattern = side_input._side_input_data().access_pattern
             if access_pattern == common_urns.side_inputs.ITERABLE.urn:
-              if use_unified_worker:
+              if use_unified_worker or not use_fn_api:
                 # TODO(BEAM-9173): Stop patching up the access pattern to
                 # appease Dataflow when using the UW and hardcode the output
                 # type to be Any since the Dataflow JSON and pipeline proto
@@ -382,8 +384,9 @@
                   'Unsupported access pattern for %r: %r' %
                   (transform_node.full_label, access_pattern))
             new_side_inputs.append(new_side_input)
-          transform_node.side_inputs = new_side_inputs
-          transform_node.transform.side_inputs = new_side_inputs
+          if use_fn_api:
+            transform_node.side_inputs = new_side_inputs
+            transform_node.transform.side_inputs = new_side_inputs
 
     return SideInputVisitor()
 
@@ -451,9 +454,10 @@
     self._maybe_add_unified_worker_missing_options(options)
 
     # Convert all side inputs into a form acceptable to Dataflow.
-    if apiclient._use_fnapi(options):
-      pipeline.visit(
-          self.side_input_visitor(apiclient._use_unified_worker(options)))
+    pipeline.visit(
+        self.side_input_visitor(
+            apiclient._use_unified_worker(options),
+            apiclient._use_fnapi(options)))
 
     # Performing configured PTransform overrides.  Note that this is currently
     # done before Runner API serialization, since the new proto needs to contain
@@ -971,32 +975,31 @@
     transform_id = self.proto_context.transforms.get_id(transform_node)
     use_fnapi = apiclient._use_fnapi(options)
     use_unified_worker = apiclient._use_unified_worker(options)
+    # Patch side input ids to be unique across a given pipeline.
+    if (label_renames and
+        transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
+      # Patch PTransform proto.
+      for old, new in iteritems(label_renames):
+        transform_proto.inputs[new] = transform_proto.inputs[old]
+        del transform_proto.inputs[old]
+
+      # Patch ParDo proto.
+      proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
+      proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type)
+      for old, new in iteritems(label_renames):
+        proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
+        del proto.side_inputs[old]
+      transform_proto.spec.payload = proto.SerializeToString()
+      # We need to update the pipeline proto.
+      del self.proto_pipeline.components.transforms[transform_id]
+      (
+          self.proto_pipeline.components.transforms[transform_id].CopyFrom(
+              transform_proto))
     # The data transmitted in SERIALIZED_FN is different depending on whether
     # this is a fnapi pipeline or not.
     if (use_fnapi and
         (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
          use_unified_worker)):
-      # Patch side input ids to be unique across a given pipeline.
-      if (label_renames and
-          transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
-        # Patch PTransform proto.
-        for old, new in iteritems(label_renames):
-          transform_proto.inputs[new] = transform_proto.inputs[old]
-          del transform_proto.inputs[old]
-
-        # Patch ParDo proto.
-        proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
-        proto = proto_utils.parse_Bytes(
-            transform_proto.spec.payload, proto_type)
-        for old, new in iteritems(label_renames):
-          proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
-          del proto.side_inputs[old]
-        transform_proto.spec.payload = proto.SerializeToString()
-        # We need to update the pipeline proto.
-        del self.proto_pipeline.components.transforms[transform_id]
-        (
-            self.proto_pipeline.components.transforms[transform_id].CopyFrom(
-                transform_proto))
       serialized_data = transform_id
     else:
       serialized_data = pickler.dumps(
@@ -1138,13 +1141,6 @@
     })
     step.add_property(PropertyNames.OUTPUT_INFO, outputs)
 
-  def apply_Read(self, transform, pbegin, options):
-    if hasattr(transform.source, 'format'):
-      # Consider native Read to be a primitive for dataflow.
-      return beam.pvalue.PCollection.from_(pbegin)
-    else:
-      return self.apply_PTransform(transform, pbegin, options)
-
   def run_Read(self, transform_node, options):
     transform = transform_node.transform
     step = self._add_step(
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 233b22c..87f8785 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -34,6 +34,7 @@
 
 import apache_beam as beam
 import apache_beam.transforms as ptransform
+from apache_beam.coders import BytesCoder
 from apache_beam.coders import coders
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -249,7 +250,7 @@
     self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo')
     self.assertEqual(job_dict[u'steps'][2][u'kind'], u'ParallelDo')
 
-  def test_biqquery_read_streaming_fail(self):
+  def test_bigquery_read_streaming_fail(self):
     remote_runner = DataflowRunner()
     self.default_properties.append("--streaming")
     with self.assertRaisesRegex(ValueError,
@@ -436,7 +437,8 @@
         beam.pvalue.AsSingleton(pc),
         beam.pvalue.AsMultiMap(pc))
     applied_transform = AppliedPTransform(None, transform, "label", [pc])
-    DataflowRunner.side_input_visitor().visit_transform(applied_transform)
+    DataflowRunner.side_input_visitor(
+        use_fn_api=True).visit_transform(applied_transform)
     self.assertEqual(2, len(applied_transform.side_inputs))
     for side_input in applied_transform.side_inputs:
       self.assertEqual(
@@ -594,6 +596,73 @@
     self.assertIn(
         u'CombineValues', set(step[u'kind'] for step in job_dict[u'steps']))
 
+  def expect_correct_override(self, job, step_name, step_kind):
+    """Expects that a transform was correctly overriden."""
+
+    # If the typing information isn't being forwarded correctly, the component
+    # encodings here will be incorrect.
+    expected_output_info = [{
+        "encoding": {
+            "@type": "kind:windowed_value",
+            "component_encodings": [{
+                "@type": "kind:bytes"
+            }, {
+                "@type": "kind:global_window"
+            }],
+            "is_wrapper": True
+        },
+        "output_name": "out",
+        "user_name": step_name + ".out"
+    }]
+
+    job_dict = json.loads(str(job))
+    maybe_step = [
+        s for s in job_dict[u'steps']
+        if s[u'properties'][u'user_name'] == step_name
+    ]
+    self.assertTrue(maybe_step, 'Could not find step {}'.format(step_name))
+
+    step = maybe_step[0]
+    self.assertEqual(step[u'kind'], step_kind)
+
+    # The display data here is forwarded because the replacement transform is
+    # subclassed from iobase.Read.
+    self.assertGreater(len(step[u'properties']['display_data']), 0)
+    self.assertEqual(step[u'properties']['output_info'], expected_output_info)
+
+  def test_read_create_translation(self):
+    runner = DataflowRunner()
+
+    with beam.Pipeline(runner=runner,
+                       options=PipelineOptions(self.default_properties)) as p:
+      # pylint: disable=expression-not-assigned
+      p | beam.Create([b'a', b'b', b'c'])
+
+    self.expect_correct_override(runner.job, u'Create/Read', u'ParallelRead')
+
+  def test_read_bigquery_translation(self):
+    runner = DataflowRunner()
+
+    with beam.Pipeline(runner=runner,
+                       options=PipelineOptions(self.default_properties)) as p:
+      # pylint: disable=expression-not-assigned
+      p | beam.io.Read(beam.io.BigQuerySource('some.table', coder=BytesCoder()))
+
+    self.expect_correct_override(runner.job, u'Read', u'ParallelRead')
+
+  def test_read_pubsub_translation(self):
+    runner = DataflowRunner()
+
+    self.default_properties.append("--streaming")
+
+    with beam.Pipeline(runner=runner,
+                       options=PipelineOptions(self.default_properties)) as p:
+      # pylint: disable=expression-not-assigned
+      p | beam.io.ReadFromPubSub(topic='projects/project/topics/topic')
+
+    self.expect_correct_override(
+        runner.job, u'ReadFromPubSub/Read', u'ParallelRead')
+
 
 class CustomMergingWindowFn(window.WindowFn):
   def assign(self, assign_context):
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 2c58234..9d6e5d3 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -146,3 +146,41 @@
         return PCollection.from_(pcoll)
 
     return CombineValuesReplacement(self.transform)
+
+
+class NativeReadPTransformOverride(PTransformOverride):
+  """A ``PTransformOverride`` for ``Read`` using native sources.
+
+  The DataflowRunner expects the Read PTransform using native sources to act
+  as a primitive, so this override replaces the Read with a primitive.
+  """
+  def matches(self, applied_ptransform):
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.io import Read
+
+    # Treat the native Read as a primitive for Dataflow by replacing it.
+    return (
+        isinstance(applied_ptransform.transform, Read) and
+        not getattr(applied_ptransform.transform, 'override', False) and
+        hasattr(applied_ptransform.transform.source, 'format'))
+
+  def get_replacement_transform(self, ptransform):
+    # Imported here to avoid circular dependencies.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam import pvalue
+    from apache_beam.io import iobase
+
+    # This is purposely subclassed from the Read transform to take advantage of
+    # the existing windowing, typing, and display data.
+    class Read(iobase.Read):
+      override = True
+
+      def expand(self, pbegin):
+        return pvalue.PCollection.from_(pbegin)
+
+    # Use the source's coder type hint as this replacement's output. Otherwise,
+    # the typing information is not properly forwarded to the DataflowRunner,
+    # which will then choose the incorrect coder for this transform.
+    return Read(ptransform.source).with_output_types(
+        ptransform.source.coder.to_type_hint())
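
For context on how an override like ``NativeReadPTransformOverride`` takes effect: the runner
collects its configured overrides and applies them with ``Pipeline.replace_all()`` before the
pipeline is serialized. A minimal sketch of that mechanism, using a hypothetical override and
transform label rather than anything from this change:

```py
# Sketch only: a toy PTransformOverride and how a runner would apply it.
import apache_beam as beam
from apache_beam.pipeline import PTransformOverride


class SwapMapOverride(PTransformOverride):
  """Hypothetical override that replaces one Map with another."""
  def matches(self, applied_ptransform):
    # Match on the transform's full label in the pipeline graph.
    return applied_ptransform.full_label == 'MyMap'

  def get_replacement_transform(self, ptransform):
    return beam.Map(lambda x: ('replaced', x))


p = beam.Pipeline()
_ = p | beam.Create([1, 2, 3]) | 'MyMap' >> beam.Map(lambda x: x)
# Runners call this before converting the pipeline to its proto form.
p.replace_all([SwapMapOverride()])
```
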
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 163b512..bc6f397 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -868,7 +868,9 @@
         timer_spec,
         self.key_coder.decode(timer_firing.encoded_key),
         timer_firing.window,
-        timer_firing.timestamp)
+        timer_firing.timestamp,
+        # TODO: Add paneinfo to timer_firing in DirectRunner.
+        None)
 
   def process_element(self, element):
     self.runner.process(element)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 40f9774..421e8f9 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -525,7 +525,7 @@
   results = []
   for e in elements:
     results.append(e)
-    if len(results) >= n and n > 0:
+    if len(results) >= n > 0:
       break
 
   return elements_to_df(results, include_window_info=include_window_info)
diff --git a/sdks/python/apache_beam/runners/job/utils.py b/sdks/python/apache_beam/runners/job/utils.py
index 1a90adb..672ab4c 100644
--- a/sdks/python/apache_beam/runners/job/utils.py
+++ b/sdks/python/apache_beam/runners/job/utils.py
@@ -28,6 +28,17 @@
 from google.protobuf import struct_pb2
 
 
+def pipeline_options_dict_to_struct(options):
+  # type: (dict) -> struct_pb2.Struct
+  # TODO: Define URNs for options.
+  # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+  return dict_to_struct({
+      'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
+      for k,
+      v in options.items() if v is not None
+  })
+
+
 def dict_to_struct(dict_obj):
   # type: (dict) -> struct_pb2.Struct
   return json_format.ParseDict(dict_obj, struct_pb2.Struct())
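
The new ``pipeline_options_dict_to_struct`` helper centralizes the option-to-``Struct``
conversion that was previously inlined in the portable runner. A standalone sketch of the same
behaviour, assuming nothing beyond the protobuf ``json_format`` API already imported above:
integer values are stringified, ``None`` values are dropped, and each key is namespaced as
``beam:option:<name>:v1``.

```py
# Standalone sketch of the helper's behaviour (not the checked-in code).
from google.protobuf import json_format
from google.protobuf import struct_pb2


def options_to_struct(options):
  filtered = {
      'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
      for k, v in options.items() if v is not None
  }
  return json_format.ParseDict(filtered, struct_pb2.Struct())


struct = options_to_struct(
    {'streaming': True, 'direct_num_workers': 2, 'job_name': None})
# struct now holds 'beam:option:streaming:v1' -> True and
# 'beam:option:direct_num_workers:v1' -> '2'; 'job_name' is dropped.
```
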
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index 51bb4ad..1434f54 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -119,7 +119,7 @@
     return self.get_id_to_proto_map()[id]
 
   def put_proto(self, id, proto, ignore_duplicates=False):
-    # type: (str, message.Message) -> str
+    # type: (str, message.Message, bool) -> str
     if not ignore_duplicates and id in self._id_to_proto:
       raise ValueError("Id '%s' is already taken." % id)
     elif (ignore_duplicates and id in self._id_to_proto and
diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
index 1b2de62..77b6292 100644
--- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
+++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
@@ -30,6 +30,7 @@
 import zipfile
 
 import requests
+from google.protobuf import json_format
 
 from apache_beam.options import pipeline_options
 from apache_beam.portability.api import beam_job_api_pb2
@@ -94,6 +95,14 @@
         options,
         artifact_port=self._artifact_port)
 
+  def GetJobMetrics(self, request, context=None):
+    if request.job_id not in self._jobs:
+      raise LookupError("Job {} does not exist".format(request.job_id))
+    metrics_text = self._jobs[request.job_id].get_metrics()
+    response = beam_job_api_pb2.GetJobMetricsResponse()
+    json_format.Parse(metrics_text, response)
+    return response
+
 
 class FlinkBeamJob(abstract_job_service.UberJarBeamJob):
   """Runs a single Beam job on Flink by staging all contents into a Jar
@@ -237,3 +246,12 @@
         break
       else:
         yield state, timestamp
+
+  def get_metrics(self):
+    accumulators = self.get('v1/jobs/%s/accumulators' %
+                            self._flink_job_id)['user-task-accumulators']
+    for accumulator in accumulators:
+      if accumulator['name'] == '__metricscontainers':
+        return accumulator['value']
+    raise LookupError(
+        "Found no metrics container for job {}".format(self._flink_job_id))
diff --git a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server_test.py b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server_test.py
index d8a0d10..8579be2 100644
--- a/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server_test.py
@@ -56,6 +56,42 @@
     self.assertEqual(job_server.flink_version(), "3.1")
 
   @requests_mock.mock()
+  def test_get_job_metrics(self, http_mock):
+    response = {
+        "user-task-accumulators": [{
+            "name": "__metricscontainers",
+            "type": "MetricsAccumulator",
+            "value": "{\"metrics\": {\"attempted\": [{\"urn\": "
+            "\"metric_urn\", \"type\": \"beam:metrics:sum_int64:v1\", "
+            "\"payload\": \"AA==\", \"labels\": "
+            "{\"PTRANSFORM\": \"ptransform_id\"}}]}}"
+        }]
+    }
+    http_mock.get(
+        'http://flink/v1/jobs/flink_job_id/accumulators', json=response)
+    options = pipeline_options.FlinkRunnerOptions()
+    job_server = flink_uber_jar_job_server.FlinkUberJarJobServer(
+        'http://flink', options)
+    job = flink_uber_jar_job_server.FlinkBeamJob(
+        'http://flink', None, 'job_id', 'job_name', None, options)
+    job._flink_job_id = 'flink_job_id'
+    job_server._jobs['job_id'] = job
+    request = beam_job_api_pb2.GetJobMetricsRequest(job_id='job_id')
+    expected = beam_job_api_pb2.GetJobMetricsResponse(
+        metrics=beam_job_api_pb2.MetricResults(
+            attempted=[{
+                "urn": "metric_urn",
+                "type": "beam:metrics:sum_int64:v1",
+                "payload": b'\000',
+                "labels": {
+                    "PTRANSFORM": "ptransform_id"
+                }
+            }]))
+
+    actual = job_server.GetJobMetrics(request)
+    self.assertEqual(actual, expected)
+
+  @requests_mock.mock()
   def test_end_to_end(self, http_mock):
     with temp_name(suffix='fake.jar') as fake_jar:
       # Create the jar file with some trivial contents.
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
index 2f29515..4bd8cc6 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
@@ -132,6 +132,12 @@
     self._inputs = []
     self._grouped_output = None
 
+  def reset(self):
+    """Resets a cleared buffer for reuse."""
+    if not self.cleared:
+      raise RuntimeError('Trying to reset a non-cleared ListBuffer.')
+    self.cleared = False
+
 
 class GroupingBuffer(object):
   """Used to accumulate groupded (shuffled) results."""
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 1f162ac..58db57c 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -845,10 +845,13 @@
           (result_future.is_done() and result_future.get().error)):
         if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
           with BundleManager._lock:
-            self.bundle_context_manager.get_buffer(
+            timer_buffer = self.bundle_context_manager.get_buffer(
                 expected_output_timers[(
                     output.transform_id, output.timer_family_id)],
-                output.transform_id).append(output.timers)
+                output.transform_id)
+            if timer_buffer.cleared:
+              timer_buffer.reset()
+            timer_buffer.append(output.timers)
         if isinstance(output, beam_fn_api_pb2.Elements.Data) and not dry_run:
           with BundleManager._lock:
             self.bundle_context_manager.get_buffer(
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index ce99d87..6334dc3 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -351,6 +351,7 @@
 
   def test_pardo_timers(self):
     timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+    state_spec = userstate.CombiningValueStateSpec('num_called', sum)
 
     class TimerDoFn(beam.DoFn):
       def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
@@ -359,7 +360,14 @@
         timer.set(2 * ts)
 
       @userstate.on_timer(timer_spec)
-      def process_timer(self):
+      def process_timer(
+          self,
+          ts=beam.DoFn.TimestampParam,
+          timer=beam.DoFn.TimerParam(timer_spec),
+          state=beam.DoFn.StateParam(state_spec)):
+        if state.read() == 0:
+          state.add(1)
+          timer.set(timestamp.Timestamp(micros=2 * ts.micros))
         yield 'fired'
 
     with self.create_pipeline() as p:
@@ -369,7 +377,7 @@
           | beam.ParDo(TimerDoFn())
           | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))
 
-      expected = [('fired', ts) for ts in (20, 200)]
+      expected = [('fired', ts) for ts in (20, 200, 40, 400)]
       assert_that(actual, equal_to(expected))
 
   def test_pardo_timers_clear(self):
@@ -1227,8 +1235,8 @@
     pipeline_options = PipelineOptions(direct_num_workers=2)
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
-    #TODO(BEAM-8444): Fix these tests..
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_metrics(self):
@@ -1247,8 +1255,8 @@
         direct_num_workers=2, direct_running_mode='multi_threading')
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
-    #TODO(BEAM-8444): Fix these tests..
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_metrics(self):
@@ -1275,7 +1283,8 @@
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(bundle_repeat=3),
         options=pipeline_options)
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_register_finalizations(self):
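
The widened expectation in ``test_pardo_timers`` follows from the new timer callback: ``process``
sets the timer at ``2 * ts`` for each element, and the callback now re-sets it once more at twice
the firing time, with the ``num_called`` state guarding against further re-firing. A rough
walk-through of that arithmetic, assuming input timestamps of 10 and 100 as implied by the
original ``(20, 200)`` expectation:

```py
# Hypothetical walk-through of the expected firings under the assumed inputs.
inputs = [10, 100]
first_firings = [2 * ts for ts in inputs]       # [20, 200], set in process()
refirings = [2 * ts for ts in first_firings]    # [40, 400], set once in on_timer
expected = [('fired', ts) for ts in first_firings + refirings]
print(expected)
# [('fired', 20), ('fired', 200), ('fired', 40), ('fired', 400)]
```
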
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
index 235aec8..14a9a9a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
@@ -897,7 +897,8 @@
               setattr(proto, name, value)
           if 'unique_name' not in kwargs and hasattr(proto, 'unique_name'):
             proto.unique_name = unique_name(
-                set([p.unique_name for p in protos.values()]),
+                {p.unique_name
+                 for p in protos.values()},
                 original.unique_name + suffix)
           return new_id
 
diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py
index 66c43c6..ec40ae8 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -22,8 +22,6 @@
 import atexit
 import shutil
 import signal
-import subprocess
-import sys
 import tempfile
 import threading
 
@@ -158,79 +156,3 @@
             job_port, self._artifact_port, self._expansion_port,
             artifacts_dir)),
             'localhost:%s' % job_port)
-
-
-class DockerizedJobServer(SubprocessJobServer):
-  """
-  Spins up the JobServer in a docker container for local execution.
-  """
-  def __init__(
-      self,
-      job_host="localhost",
-      job_port=None,
-      artifact_port=None,
-      expansion_port=None,
-      harness_port_range=(8100, 8200),
-      max_connection_retries=5):
-    super(DockerizedJobServer, self).__init__()
-    self.job_host = job_host
-    self.job_port = job_port
-    self.expansion_port = expansion_port
-    self.artifact_port = artifact_port
-    self.harness_port_range = harness_port_range
-    self.max_connection_retries = max_connection_retries
-
-  def subprocess_cmd_and_endpoint(self):
-    # TODO This is hardcoded to Flink at the moment but should be changed
-    job_server_image_name = "apache/beam_flink%s_job_server:latest" % (
-        pipeline_options.FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1])
-    docker_path = subprocess.check_output(['which',
-                                           'docker']).strip().decode('utf-8')
-    cmd = [
-        "docker",
-        "run",
-        # We mount the docker binary and socket to be able to spin up
-        # "sibling" containers for the SDK harness.
-        "-v",
-        ':'.join([docker_path, "/bin/docker"]),
-        "-v",
-        "/var/run/docker.sock:/var/run/docker.sock"
-    ]
-
-    self.job_port, self.artifact_port, self.expansion_port = (
-        subprocess_server.pick_port(
-            self.job_port, self.artifact_port, self.expansion_port))
-
-    args = [
-        '--job-host',
-        self.job_host,
-        '--job-port',
-        str(self.job_port),
-        '--artifact-port',
-        str(self.artifact_port),
-        '--expansion-port',
-        str(self.expansion_port)
-    ]
-
-    if sys.platform == "darwin":
-      # Docker-for-Mac doesn't support host networking, so we need to explictly
-      # publish ports from the Docker container to be able to connect to it.
-      # Also, all other containers need to be aware that they run Docker-on-Mac
-      # to connect against the internal Docker-for-Mac address.
-      cmd += ["-e", "DOCKER_MAC_CONTAINER=1"]
-      cmd += ["-p", "{}:{}".format(self.job_port, self.job_port)]
-      cmd += ["-p", "{}:{}".format(self.artifact_port, self.artifact_port)]
-      cmd += ["-p", "{}:{}".format(self.expansion_port, self.expansion_port)]
-      cmd += [
-          "-p",
-          "{0}-{1}:{0}-{1}".format(
-              self.harness_port_range[0], self.harness_port_range[1])
-      ]
-    else:
-      # This shouldn't be set for MacOS because it detroys port forwardings,
-      # even though host networking is not supported on MacOS.
-      cmd.append("--network=host")
-
-    cmd.append(job_server_image_name)
-
-    return cmd + args, '%s:%s' % (self.job_host, self.job_port)
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 10f81e6..b416352 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -18,7 +18,9 @@
 # pytype: skip-file
 
 from __future__ import absolute_import
+from __future__ import division
 
+import atexit
 import functools
 import itertools
 import logging
@@ -162,14 +164,7 @@
     all_options = self.options.get_all_options(
         add_extra_args_fn=add_runner_options,
         retain_unknown_options=self._retain_unknown_options)
-    # TODO: Define URNs for options.
-    # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
-    p_options = {
-        'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
-        for k,
-        v in all_options.items() if v is not None
-    }
-    return job_utils.dict_to_struct(p_options)
+    return job_utils.pipeline_options_dict_to_struct(all_options)
 
   def prepare(self, proto_pipeline):
     # type: (beam_runner_api_pb2.Pipeline) -> beam_job_api_pb2.PrepareJobResponse
@@ -303,13 +298,10 @@
     return env_class.from_options(portable_options)
 
   def default_job_server(self, options):
-    # type: (PipelineOptions) -> job_server.JobServer
-    # TODO Provide a way to specify a container Docker URL
-    # https://issues.apache.org/jira/browse/BEAM-6328
-    if not self._dockerized_job_server:
-      self._dockerized_job_server = job_server.StopOnExitJobServer(
-          job_server.DockerizedJobServer())
-    return self._dockerized_job_server
+    raise NotImplementedError(
+        'You must specify a --job_endpoint when using --runner=PortableRunner. '
+        'Alternatively, you may specify which portable runner you intend to '
+        'use, such as --runner=FlinkRunner or --runner=SparkRunner.')
 
   def create_job_service_handle(self, job_service, options):
     return JobServiceHandle(job_service, options)
@@ -433,13 +425,15 @@
         state_stream,
         cleanup_callbacks)
     if cleanup_callbacks:
-      # We wait here to ensure that we run the cleanup callbacks.
+      # Register an exit handler to ensure cleanup on exit.
+      atexit.register(functools.partial(result._cleanup, on_exit=True))
       _LOGGER.info(
-          'Waiting until the pipeline has finished because the '
-          'environment "%s" has started a component necessary for the '
-          'execution.',
+          'Environment "%s" has started a component necessary for the '
+          'execution. Be sure to run the pipeline using\n'
+          '  with Pipeline() as p:\n'
+          '    p.apply(..)\n'
+          'This ensures that the pipeline finishes before this program exits.',
           portable_options.environment_type)
-      result.wait_until_finish()
     return result
 
 
@@ -486,6 +480,7 @@
     self._state_stream = state_stream
     self._cleanup_callbacks = cleanup_callbacks
     self._metrics = None
+    self._runtime_exception = None
 
   def cancel(self):
     try:
@@ -535,7 +530,12 @@
     else:
       return 'unknown error'
 
-  def wait_until_finish(self):
+  def wait_until_finish(self, duration=None):
+    """
+    :param duration: The maximum time in milliseconds to wait for the result of
+    the execution. If None or zero, will wait until the pipeline finishes.
+    :return: The final state of the pipeline.
+    """
     def read_messages():
       previous_state = -1
       for message in self._message_stream:
@@ -553,27 +553,56 @@
             previous_state = current_state
         self._messages.append(message)
 
-    t = threading.Thread(target=read_messages, name='wait_until_finish_read')
-    t.daemon = True
-    t.start()
+    message_thread = threading.Thread(
+        target=read_messages, name='wait_until_finish_read')
+    message_thread.daemon = True
+    message_thread.start()
 
+    if duration:
+      state_thread = threading.Thread(
+          target=functools.partial(self._observe_state, message_thread),
+          name='wait_until_finish_state_observer')
+      state_thread.daemon = True
+      state_thread.start()
+      start_time = time.time()
+      duration_secs = duration / 1000
+      while (time.time() - start_time < duration_secs and
+             state_thread.is_alive()):
+        time.sleep(1)
+    else:
+      self._observe_state(message_thread)
+
+    if self._runtime_exception:
+      raise self._runtime_exception
+
+    return self._state
+
+  def _observe_state(self, message_thread):
     try:
       for state_response in self._state_stream:
         self._state = self._runner_api_state_to_pipeline_state(
             state_response.state)
         if state_response.state in TERMINAL_STATES:
           # Wait for any last messages.
-          t.join(10)
+          message_thread.join(10)
           break
       if self._state != runner.PipelineState.DONE:
-        raise RuntimeError(
+        self._runtime_exception = RuntimeError(
             'Pipeline %s failed in state %s: %s' %
             (self._job_id, self._state, self._last_error_message()))
-      return self._state
+    except Exception as e:
+      self._runtime_exception = e
     finally:
       self._cleanup()
 
-  def _cleanup(self):
+  def _cleanup(self, on_exit=False):
+    if on_exit and self._cleanup_callbacks:
+      _LOGGER.info(
+          'Running cleanup on exit. If your pipeline should continue running, '
+          'be sure to use the following syntax:\n'
+          '  with Pipeline() as p:\n'
+          '    p.apply(..)\n'
+          'This ensures that the pipeline finishes before this program exits.')
     has_exception = None
     for callback in self._cleanup_callbacks:
       try:
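
The removed ``wait_until_finish()`` call and the new ``atexit`` cleanup change how a portable
pipeline should be driven: as the log messages above recommend, the context-manager form runs the
pipeline and blocks until it finishes, so runner-started components are only cleaned up afterwards.
A small usage sketch under assumed flags (the ``--job_endpoint`` value and ``LOOPBACK``
environment are illustrative, not part of this change):

```py
# Sketch of the recommended usage pattern; flag values are assumptions.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',   # assumed local job server
    '--environment_type=LOOPBACK',
])

# __exit__ calls run() and then wait_until_finish(), so cleanup happens
# only after the job completes.
with beam.Pipeline(options=options) as p:
  _ = p | beam.Create([1, 2, 3]) | beam.Map(print)

# Equivalent explicit form, using the new optional timeout in milliseconds:
# result = beam.Pipeline(options=options).run()
# result.wait_until_finish(duration=60 * 1000)
```
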
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index 0ac092f..d3ab22d 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -67,7 +67,7 @@
     sampler.stop()
     sampler.commit_counters()
 
-    actual_counter_names = set([c.name for c in counter_factory.get_counters()])
+    actual_counter_names = {c.name for c in counter_factory.get_counters()}
     expected_counter_names = set([
         # Counter names for STEP 1
         counters.CounterName(
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 544efc4..ced3135 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -691,7 +691,8 @@
         timer_spec,
         timer_data.user_key,
         timer_data.windows[0],
-        timer_data.fire_timestamp)
+        timer_data.fire_timestamp,
+        timer_data.paneinfo)
 
   def finish(self):
     # type: () -> None
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index d8090e3..5254ea5 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -40,7 +40,6 @@
 from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import CoGroupByKey
-from apache_beam.utils.annotations import experimental
 
 __all__ = [
     'assert_that',
@@ -317,7 +316,6 @@
   actual | AssertThat()  # pylint: disable=expression-not-assigned
 
 
-@experimental()
 def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
   """Returns a composite file of all shards matching the given glob pattern.
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index adda3fd..92b88f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1349,7 +1349,7 @@
     return key_coder, window_coder
 
   def to_runner_api_parameter(self, context, **extra_kwargs):
-    # type: (PipelineContext) -> typing.Tuple[str, message.Message]
+    # type: (PipelineContext, Any) -> typing.Tuple[str, message.Message]
     assert isinstance(self, ParDo), \
         "expected instance of ParDo, but got %s" % self.__class__
     picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data())
@@ -2242,11 +2242,10 @@
       reify_output_type = typehints.KV[
           key_type, typehints.WindowedValue[value_type]]  # type: ignore[misc]
       gbk_input_type = (
-          typehints.
-          KV[key_type,
-             typehints.Iterable[
-                 typehints.WindowedValue[  # type: ignore[misc]
-                     value_type]]])
+          typehints.KV[
+              key_type,
+              typehints.Iterable[typehints.WindowedValue[  # type: ignore[misc]
+                  value_type]]])
       gbk_output_type = typehints.KV[key_type, typehints.Iterable[value_type]]
 
       # pylint: disable=bad-continuation
@@ -2565,7 +2564,7 @@
     return super(WindowInto, self).expand(pcoll)
 
   def to_runner_api_parameter(self, context, **extra_kwargs):
-    # type: (PipelineContext) -> typing.Tuple[str, message.Message]
+    # type: (PipelineContext, Any) -> typing.Tuple[str, message.Message]
     return (
         common_urns.primitives.ASSIGN_WINDOWS.urn,
         self.windowing.to_runner_api(context))
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index a71ef09..6061987 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -258,7 +258,7 @@
 
   @classmethod
   def from_container_image(cls, container_image, artifacts=()):
-    # type: (str) -> DockerEnvironment
+    # type: (str, Iterable[beam_runner_api_pb2.ArtifactInformation]) -> DockerEnvironment
     return cls(
         container_image=container_image,
         capabilities=python_sdk_capabilities(),
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 4f31cbb..cfb5e67 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -37,6 +37,7 @@
 from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
 from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
 from apache_beam.runners import pipeline_context
+from apache_beam.runners.job import utils as job_utils
 from apache_beam.transforms import ptransform
 from apache_beam.typehints.native_type_compatibility import convert_to_beam_type
 from apache_beam.typehints.trivial_inference import instance_to_type
@@ -309,10 +310,16 @@
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
     components = context.to_runner_api()
+
+    # Retain unknown options since they may only be relevant to the expanding
+    # SDK.
+    options = pipeline._options.get_all_options(
+        drop_default=True, retain_unknown_options=True)
     request = beam_expansion_api_pb2.ExpansionRequest(
         components=components,
         namespace=self._namespace,  # type: ignore  # mypy thinks self._namespace is threading.local
-        transform=transform_proto)
+        transform=transform_proto,
+        pipeline_options=job_utils.pipeline_options_dict_to_struct(options))
 
     if isinstance(self._expansion_service, str):
       # Some environments may not support unsecure channels. Hence using a
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index a5c685e..40a21af 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -664,7 +664,7 @@
       return register
 
   def to_runner_api(self, context, has_parts=False, **extra_kwargs):
-    # type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+    # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
     from apache_beam.portability.api import beam_runner_api_pb2
     urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
     if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 4cfdad1..ee007ff 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -710,13 +710,6 @@
       result = pcoll.apply(beam.Distinct())
       assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
 
-  def test_remove_duplicates(self):
-    with TestPipeline() as pipeline:
-      pcoll = pipeline | 'Start' >> beam.Create(
-          [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
-      result = pcoll.apply(beam.RemoveDuplicates())
-      assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
-
   def test_chained_ptransforms(self):
     with TestPipeline() as pipeline:
       t = (
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 02224c7..985a665 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -61,7 +61,8 @@
     return TimeDomain._RUNNER_API_MAPPING[domain]
 
 
-class TimestampCombinerImpl(with_metaclass(ABCMeta, object)):  # type: ignore[misc]
+class TimestampCombinerImpl(with_metaclass(ABCMeta,
+                                           object)):  # type: ignore[misc]
   """Implementation of TimestampCombiner."""
   @abstractmethod
   def assign_output_time(self, window, input_timestamp):
@@ -86,7 +87,8 @@
     return self.combine_all(merging_timestamps)
 
 
-class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)):  # type: ignore[misc]
+class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)
+                          ):  # type: ignore[misc]
   """TimestampCombinerImpl that only depends on the window."""
   def merge(self, result_window, unused_merging_timestamps):
     # Since we know that the result only depends on the window, we can ignore
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6ce9ba1..57393d8 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -617,7 +617,8 @@
     return self.underlying.has_ontime_pane()
 
 
-class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)):  # type: ignore[misc]
+class _ParallelTriggerFn(with_metaclass(ABCMeta,
+                                        TriggerFn)):  # type: ignore[misc]
   def __init__(self, *triggers):
     self.triggers = triggers
 
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 202bdbd..82df307 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -572,7 +572,8 @@
   # runner does not execute this method directly as a test.
   @classmethod
   def _create_tests(cls, transcript_filename):
-    for spec in yaml.load_all(open(transcript_filename)):
+    for spec in yaml.load_all(open(transcript_filename),
+                              Loader=yaml.SafeLoader):
       cls._create_test(spec)
 
   def _run_log_test(self, spec):
@@ -1005,7 +1006,7 @@
 
     with TestPipeline() as p:
       # TODO(BEAM-8601): Pass this during pipeline construction.
-      p.options.view_as(StandardOptions).streaming = True
+      p._options.view_as(StandardOptions).streaming = True
 
       # We can have at most one test stream per pipeline, so we share it.
       inputs_and_expected = p | read_test_stream
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 56ba8d9..d9ad166 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -158,7 +158,7 @@
     return '%s(%s)' % (self.__class__.__name__, self.name)
 
   def to_runner_api(self, context, key_coder, window_coder):
-    # type: (PipelineContext) -> beam_runner_api_pb2.TimerFamilySpec
+    # type: (PipelineContext, Coder, Coder) -> beam_runner_api_pb2.TimerFamilySpec
     return beam_runner_api_pb2.TimerFamilySpec(
         time_domain=TimeDomain.to_runner_api(self.time_domain),
         timer_family_coder_id=context.coders.get_id(
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 5602766..e261f93 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -29,7 +29,6 @@
 import re
 import sys
 import time
-import warnings
 from builtins import filter
 from builtins import object
 from builtins import range
@@ -758,9 +757,6 @@
     Arguments:
       batch_size: (required) How many elements should be in a batch
     """
-    warnings.warn(
-        'Use of GroupIntoBatches transform requires State/Timer '
-        'support from the runner')
     self.batch_size = batch_size
 
   def expand(self, pcoll):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index e586627..870b104 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -29,6 +29,7 @@
 import re
 import time
 import unittest
+import warnings
 from builtins import object
 from builtins import range
 
@@ -61,6 +62,9 @@
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.windowed_value import WindowedValue
 
+warnings.filterwarnings(
+    'ignore', category=FutureWarning, module='apache_beam.transforms.util_test')
+
 
 class FakeClock(object):
   def __init__(self):
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a92b7e0..4889d34 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -119,7 +119,8 @@
       raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
 
 
-class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)):  # type: ignore[misc]
+class WindowFn(with_metaclass(abc.ABCMeta,
+                              urns.RunnerApiFn)):  # type: ignore[misc]
   """An abstract windowing function defining a basic assign and merge."""
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 92b60b5..3ecd9fa 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -161,8 +161,9 @@
   BEAM_GROUP_ID = 'org.apache.beam'
   JAR_CACHE = os.path.expanduser("~/.apache_beam/cache/jars")
 
-  _BEAM_SERVICES = threading.local()
-  _BEAM_SERVICES.replacements = {}
+  _BEAM_SERVICES = type(
+      'local', (threading.local, ),
+      dict(__init__=lambda self: setattr(self, 'replacements', {})))()
 
   def __init__(self, stub_class, path_to_jar, java_arguments):
     super(JavaJarServer, self).__init__(
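
The ``_BEAM_SERVICES`` change addresses a thread-locality subtlety: an attribute assigned on a
plain ``threading.local()`` instance is visible only in the thread that set it, while a
``threading.local`` subclass with ``__init__`` re-creates the attribute in every thread that
first touches the object. A minimal illustration of the difference (standard-library behaviour,
not Beam code):

```py
import threading

plain = threading.local()
plain.replacements = {}            # only exists in the creating thread

PerThread = type(
    'local', (threading.local, ),
    dict(__init__=lambda self: setattr(self, 'replacements', {})))
per_thread = PerThread()


def worker():
  print(hasattr(plain, 'replacements'))   # False in a new thread
  print(per_thread.replacements)          # {} - initialized per thread


t = threading.Thread(target=worker)
t.start()
t.join()
```
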
diff --git a/sdks/python/mypy.ini b/sdks/python/mypy.ini
index 1e4748f..151eebf 100644
--- a/sdks/python/mypy.ini
+++ b/sdks/python/mypy.ini
@@ -58,3 +58,65 @@
 [mypy-apache_beam.typehints.typehints_test_py3]
 # error: Signature of "process" incompatible with supertype "DoFn"  [override]
 ignore_errors = true
+
+
+# TODO(BEAM-7746): Remove the lines below.
+[mypy-apache_beam.coders.coders]
+ignore_errors = true
+
+[mypy-apache_beam.coders.*]
+ignore_errors = true
+
+[mypy-apache_beam.dataframe.*]
+ignore_errors = true
+
+[mypy-apache_beam.io.*]
+ignore_errors = true
+
+[mypy-apache_beam.ml.gcp.*]
+ignore_errors = true
+
+[mypy-apache_beam.pipeline]
+ignore_errors = true
+
+[mypy-apache_beam.pvalue]
+ignore_errors = true
+
+[mypy-apache_beam.runners.common]
+ignore_errors = true
+
+[mypy-apache_beam.runners.dataflow.dataflow_runner]
+ignore_errors = true
+
+[mypy-apache_beam.runners.direct.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.interactive.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.pipeline_context]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.artifact_service]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.fn_api_runner.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.portable_runner]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.stager]
+ignore_errors = true
+
+[mypy-apache_beam.runners.worker.*]
+ignore_errors = true
+
+[mypy-apache_beam.testing.synthetic_pipeline]
+ignore_errors = true
+
+[mypy-apache_beam.transforms.*]
+ignore_errors = true
+
+[mypy-apache_beam.typehints.*]
+ignore_errors = true
diff --git a/sdks/python/test-suites/portable/py2/build.gradle b/sdks/python/test-suites/portable/py2/build.gradle
index 84322d9..6bce3b2 100644
--- a/sdks/python/test-suites/portable/py2/build.gradle
+++ b/sdks/python/test-suites/portable/py2/build.gradle
@@ -29,17 +29,14 @@
 addPortableWordCountTasks()
 
 task preCommitPy2() {
-  dependsOn ':runners:flink:1.10:job-server-container:docker'
   dependsOn ':sdks:python:container:py2:docker'
-  dependsOn portableWordCountBatch
-  dependsOn portableWordCountStreaming
+  dependsOn ':runners:flink:1.10:job-server:shadowJar'
+  dependsOn portableWordCountFlinkRunnerBatch
+  dependsOn portableWordCountFlinkRunnerStreaming
 }
 
 task postCommitPy2() {
   dependsOn 'setupVirtualenv'
-  dependsOn ':runners:flink:1.10:job-server:shadowJar'
-  dependsOn portableWordCountFlinkRunnerBatch
-  dependsOn portableWordCountFlinkRunnerStreaming
   dependsOn 'postCommitPy2IT'
   dependsOn ':runners:spark:job-server:shadowJar'
   dependsOn portableWordCountSparkRunnerBatch
@@ -47,36 +44,6 @@
 
 // TODO: Move the rest of this file into ../common.gradle.
 
-// Before running this, you need to:
-//
-// 1. Build the SDK container:
-//
-//    ./gradlew -p sdks/python/container buildAll
-//
-// 2. Either a) or b)
-//  a) If you want the Job Server to run in a Docker container:
-//
-//    ./gradlew :runners:flink:1.10:job-server-container:docker
-//
-//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
-//    (in a separate shell since it continues to run):
-//
-//    ./gradlew :runners:flink:1.10:job-server:runShadow
-//
-// Then you can run this example:
-//
-//  Docker (2a):
-//
-//    ./gradlew :sdks:python:test-suites:portable:py2:portableWordCount
-//
-//  Local JobService (2b):
-//
-//    ./gradlew :sdks:python:test-suites:portable:py2:portableWordCount -PjobEndpoint=localhost:8099
-//
-task portableWordCount {
-  dependsOn project.hasProperty("streaming") ? portableWordCountStreaming : portableWordCountBatch
-}
-
 /*************************************************************************************************/
 
 task crossLanguagePythonJavaDirect {
diff --git a/sdks/python/test-suites/portable/py35/build.gradle b/sdks/python/test-suites/portable/py35/build.gradle
index 6e263ea..1e28333 100644
--- a/sdks/python/test-suites/portable/py35/build.gradle
+++ b/sdks/python/test-suites/portable/py35/build.gradle
@@ -28,17 +28,14 @@
 addPortableWordCountTasks()
 
 task preCommitPy35() {
-    dependsOn ':runners:flink:1.10:job-server-container:docker'
     dependsOn ':sdks:python:container:py35:docker'
-    dependsOn portableWordCountBatch
-    dependsOn portableWordCountStreaming
+    dependsOn ':runners:flink:1.10:job-server:shadowJar'
+    dependsOn portableWordCountFlinkRunnerBatch
+    dependsOn portableWordCountFlinkRunnerStreaming
 }
 
 task postCommitPy35() {
     dependsOn 'setupVirtualenv'
-    dependsOn ':runners:flink:1.10:job-server:shadowJar'
-    dependsOn portableWordCountFlinkRunnerBatch
-    dependsOn portableWordCountFlinkRunnerStreaming
     dependsOn 'postCommitPy35IT'
     dependsOn ':runners:spark:job-server:shadowJar'
     dependsOn portableWordCountSparkRunnerBatch
diff --git a/sdks/python/test-suites/portable/py36/build.gradle b/sdks/python/test-suites/portable/py36/build.gradle
index e6330a1..6a45b31 100644
--- a/sdks/python/test-suites/portable/py36/build.gradle
+++ b/sdks/python/test-suites/portable/py36/build.gradle
@@ -28,17 +28,14 @@
 addPortableWordCountTasks()
 
 task preCommitPy36() {
-    dependsOn ':runners:flink:1.10:job-server-container:docker'
     dependsOn ':sdks:python:container:py36:docker'
-    dependsOn portableWordCountBatch
-    dependsOn portableWordCountStreaming
+    dependsOn ':runners:flink:1.10:job-server:shadowJar'
+    dependsOn portableWordCountFlinkRunnerBatch
+    dependsOn portableWordCountFlinkRunnerStreaming
 }
 
 task postCommitPy36() {
     dependsOn 'setupVirtualenv'
-    dependsOn ':runners:flink:1.10:job-server:shadowJar'
-    dependsOn portableWordCountFlinkRunnerBatch
-    dependsOn portableWordCountFlinkRunnerStreaming
     dependsOn 'postCommitPy36IT'
     dependsOn ':runners:spark:job-server:shadowJar'
     dependsOn portableWordCountSparkRunnerBatch
diff --git a/sdks/python/test-suites/portable/py37/build.gradle b/sdks/python/test-suites/portable/py37/build.gradle
index d232514..8c18fc2 100644
--- a/sdks/python/test-suites/portable/py37/build.gradle
+++ b/sdks/python/test-suites/portable/py37/build.gradle
@@ -28,17 +28,14 @@
 addPortableWordCountTasks()
 
 task preCommitPy37() {
-    dependsOn ':runners:flink:1.10:job-server-container:docker'
     dependsOn ':sdks:python:container:py37:docker'
-    dependsOn portableWordCountBatch
-    dependsOn portableWordCountStreaming
+    dependsOn ':runners:flink:1.10:job-server:shadowJar'
+    dependsOn portableWordCountFlinkRunnerBatch
+    dependsOn portableWordCountFlinkRunnerStreaming
 }
 
 task postCommitPy37() {
     dependsOn 'setupVirtualenv'
-    dependsOn ':runners:flink:1.10:job-server:shadowJar'
-    dependsOn portableWordCountFlinkRunnerBatch
-    dependsOn portableWordCountFlinkRunnerStreaming
     dependsOn 'postCommitPy37IT'
     dependsOn ':runners:spark:job-server:shadowJar'
     dependsOn portableWordCountSparkRunnerBatch
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index da689a7..3b66c28 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -193,10 +193,9 @@
 # make extras available in case any of these libs are typed
 extras =
   gcp
-# TODO: enable c test failures
 commands =
   mypy --version
-  - python setup.py mypy
+  python setup.py mypy
 
 [testenv:py37-docs]
 extras = test,gcp,docs,interactive
diff --git a/website/src/contribute/release-guide.md b/website/src/contribute/release-guide.md
index 8747989..3c50735 100644
--- a/website/src/contribute/release-guide.md
+++ b/website/src/contribute/release-guide.md
@@ -665,6 +665,7 @@
 
 Build python binaries in release branch in sdks/python dir.
 
+    pip install -r build-requirements.txt
     python setup.py sdist --format=zip
     cd dist
     cp apache-beam-${RELEASE}.zip staging/apache-beam-${RELEASE}-python.zip
@@ -780,7 +781,7 @@
 ```
 Create the Python SDK documentation using sphinx by running a helper script.
 ```
-cd sdks/python && tox -e docs
+cd sdks/python && pip install -r build-requirements.txt && tox -e py37-docs
 ```
 By default the Pydoc is generated in `sdks/python/target/docs/_build`. Let `${PYDOC_ROOT}` be the absolute path to `_build`.
 
diff --git a/website/src/documentation/patterns/side-inputs.md b/website/src/documentation/patterns/side-inputs.md
index 875b337..99aca3a 100644
--- a/website/src/documentation/patterns/side-inputs.md
+++ b/website/src/documentation/patterns/side-inputs.md
@@ -79,8 +79,10 @@
 1. Apply the side input.
 
 ```java
-No sample present.
+{% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:PeriodicallyUpdatingSideInputs
+%}
 ```
+
 ```py
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:SideInputSlowUpdateSnip1
 %}