Merge pull request #12376: [BEAM-10575] Eliminate legacy rawtypes from GCP IOs and some others; enable -Xlint:rawtypes -Werror

diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 44d966b..3393e6d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -169,7 +169,7 @@
       TableRow row = new TableRow();
       row.set("string", "abc");
       byte[] rawbytes = {(byte) 0xab, (byte) 0xac};
-      row.set("bytes", new String(Base64.getEncoder().encodeToString(rawbytes)));
+      row.set("bytes", Base64.getEncoder().encodeToString(rawbytes));
       row.set("integer", 5);
       row.set("float", 0.5);
       row.set("numeric", 5);
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 7795db4..2775c8b 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -146,7 +146,7 @@
     List<CompletionCandidate> all = new ArrayList<>();
     for (String s : entries) {
       String[] countValue = s.split(":", -1);
-      all.add(new CompletionCandidate(countValue[0], Integer.valueOf(countValue[1])));
+      all.add(new CompletionCandidate(countValue[0], Integer.parseInt(countValue[1])));
     }
     return all;
   }
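
`Integer.parseInt` yields a primitive `int`, while `Integer.valueOf` returns a boxed `Integer` that the surrounding call immediately unboxes. A small sketch of the difference (hypothetical standalone example):

```java
public class ParseDemo {
  public static void main(String[] args) {
    String count = "10";
    int viaParse = Integer.parseInt(count);      // primitive, no allocation
    Integer viaValueOf = Integer.valueOf(count); // boxed, unboxed again at the call site
    System.out.println(viaParse + " " + viaValueOf);
  }
}
```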
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/game/StatefulTeamScoreTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/game/StatefulTeamScoreTest.java
index a4bae4e..6d4c964 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/game/StatefulTeamScoreTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/game/StatefulTeamScoreTest.java
@@ -159,7 +159,6 @@
             .apply(ParDo.of(new UpdateTeamScoreFn(100)));
 
     String redTeam = TestUser.RED_ONE.getTeam();
-    String blueTeam = TestUser.BLUE_ONE.getTeam();
 
     IntervalWindow window1 = new IntervalWindow(baseTime, teamWindowDuration);
     IntervalWindow window2 = new IntervalWindow(window1.end(), teamWindowDuration);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 8bf99bf..7c5c80f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -21,10 +21,11 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
 /** A wrapper interface that represents the execution of a {@link DoFn}. */
-public interface DoFnRunner<InputT, OutputT> {
+public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nullable Object> {
   /** Prepares and calls a {@link DoFn DoFn's} {@link DoFn.StartBundle @StartBundle} method. */
   void startBundle();
 
@@ -38,7 +39,7 @@
    * Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer in the
    * given window.
    */
-  <KeyT> void onTimer(
+  <KeyT extends @Nullable Object> void onTimer(
       String timerId,
       String timerFamilyId,
       KeyT key,
@@ -57,7 +58,8 @@
    * Calls a {@link DoFn DoFn's} {@link DoFn.OnWindowExpiration @OnWindowExpiration} method and
    * performs additional tasks, such as extracting a value saved in state before garbage collection.
    */
-  <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key);
+  <KeyT extends @Nullable Object> void onWindowExpiration(
+      BoundedWindow window, Instant timestamp, KeyT key);
 
   /**
    * @since 2.5.0
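
Under the Checker Framework, a bare type variable `<T>` defaults to the upper bound `@NonNull Object`, so `T extends @Nullable Object` is what permits callers to instantiate the variable with a nullable type. A minimal sketch of the effect, using a hypothetical `Runner` interface:

```java
import org.checkerframework.checker.nullness.qual.Nullable;

// Hypothetical illustration: the bound controls which instantiations the
// Checker Framework accepts; it has no effect at runtime.
interface Runner<T extends @Nullable Object> {
  void process(T element);
}

class BoundDemo {
  Runner<String> onNonNull = element -> {};            // fine with or without the bound
  Runner<@Nullable String> onNullable = element -> {}; // accepted only because of the bound
}
```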
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 1537ad5..6c216f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -19,6 +19,7 @@
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -70,6 +71,8 @@
 
     return new ReadableState<PaneInfo>() {
       @Override
+      @SuppressFBWarnings(
+          "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
       public ReadableState<PaneInfo> readLater() {
         previousPaneFuture.readLater();
         return this;
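
The suppressed `RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT` fires because `readLater()` returns the state for chaining, while these call sites invoke it purely for its prefetch side effect. A minimal sketch of the idiom, with Beam's `ReadableState` signatures simplified:

```java
// Sketch only: readLater() schedules an asynchronous fetch and returns `this`
// for chaining, so dropping its return value is deliberate.
interface ReadableState<T> {
  T read();
  ReadableState<T> readLater();
}

class PrefetchDemo {
  void prefetchThenRead(ReadableState<String> state) {
    state.readLater(); // return value intentionally ignored: only the prefetch matters
    // ... later, read() can complete without blocking because the fetch is in flight
    String value = state.read();
  }
}
```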
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index d555da6..f667bd4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -46,6 +47,7 @@
    * Prefetch all bag state in {@code address} across all windows under merge in {@code context},
    * except for the bag state in the final state address window which we can blindly append to.
    */
+  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
   public static <K, T, W extends BoundedWindow> void prefetchBags(
       MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
     Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index a4d51ba..89e8a63 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
@@ -116,6 +117,7 @@
   }
 
   @Override
+  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
   public void prefetchOnTrigger(StateAccessor<K> state) {
     state.access(bufferTag).readLater();
   }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 948b281..1498e77 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -19,6 +19,7 @@
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
@@ -69,7 +70,10 @@
       StateTags.makeSystemTagInternal(
           StateTags.watermarkStateInternal("extra", TimestampCombiner.EARLIEST));
 
+  // [BEAM-420] Seems likely these should all be transient or this class should not be Serializable
+  @SuppressFBWarnings("SE_BAD_FIELD")
   private final TimerInternals timerInternals;
+
   private final WindowingStrategy<?, W> windowingStrategy;
   private final StateTag<WatermarkHoldState> elementHoldTag;
 
@@ -270,6 +274,7 @@
   }
 
   /** Prefetch watermark holds in preparation for merging. */
+  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
   public void prefetchOnMerge(MergingStateAccessor<?, W> context) {
     Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(elementHoldTag);
     WatermarkHoldState result = context.access(elementHoldTag);
@@ -298,6 +303,7 @@
    * all of the existing holds. For example, if the new window implies a later watermark hold, then
    * earlier holds may be released.
    */
+  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
   public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
     WindowTracing.debug(
         "WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; " + "outputWatermark:{}",
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 1045813..b75f031 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
@@ -385,21 +386,33 @@
     private final BlockingQueue<VisibleExecutorUpdate> updates = new LinkedBlockingQueue<>();
 
     @Override
+    // updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered
+    // update
+    @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
     public void failed(Exception e) {
       updates.offer(VisibleExecutorUpdate.fromException(e));
     }
 
     @Override
+    // updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered
+    // update
+    @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
     public void failed(Error e) {
       updates.offer(VisibleExecutorUpdate.fromError(e));
     }
 
     @Override
+    // updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered
+    // update
+    @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
     public void cancelled() {
       updates.offer(VisibleExecutorUpdate.cancelled());
     }
 
     @Override
+    // updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered
+    // update
+    @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
     public void completed() {
       updates.offer(VisibleExecutorUpdate.finished());
     }
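
The justification for `RV_RETURN_VALUE_IGNORED_BAD_PRACTICE` here is that a no-arg `LinkedBlockingQueue` is bounded only by `Integer.MAX_VALUE`, so `offer` effectively always succeeds. A quick standalone check:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OfferDemo {
  public static void main(String[] args) {
    // With no capacity argument, the queue can hold Integer.MAX_VALUE elements,
    // so offer() cannot refuse an element in practice.
    BlockingQueue<String> updates = new LinkedBlockingQueue<>();
    boolean accepted = updates.offer("update");
    System.out.println(accepted); // true
  }
}
```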
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index c0c05e4..e3ff795 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,6 +19,7 @@
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -125,7 +126,10 @@
 
   static class GbkThenStatefulParDo<K, InputT, OutputT>
       extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
+    @SuppressFBWarnings(
+        "SE_TRANSIENT_FIELD_NOT_RESTORED") // PTransforms do not actually support serialization.
     private final transient DoFn<KV<K, InputT>, OutputT> doFn;
+
     private final TupleTagList additionalOutputTags;
     private final TupleTag<OutputT> mainOutputTag;
     private final List<PCollectionView<?>> sideInputs;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
index 83a707d..d6262e7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
@@ -40,6 +41,8 @@
   private PrintStream ps;
 
   @Override
+  @SuppressFBWarnings(
+      "DM_DEFAULT_ENCODING") // should this method specify the encoding for the PrintStream?
   public void open(MetricConfig config) {
     synchronized (this) {
       if (path == null) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
index 41c9824..57494d6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -22,6 +22,7 @@
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
@@ -142,6 +143,7 @@
    * Return {@code true} if the job succeeded or {@code false} if it terminated in any other manner.
    */
   @SuppressWarnings("FutureReturnValueIgnored") // Job status checked via job.waitUntilFinish
+  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
   private boolean waitForStreamingJobTermination(
       final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
     // In streaming, there are infinite retries, so rather than timeout
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
index 9b9ced3..21732e6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
@@ -21,6 +21,7 @@
 
 import com.google.api.client.json.GenericJson;
 import com.google.api.client.util.Key;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Objects;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -30,6 +31,7 @@
  */
 public final class OutputReference extends GenericJson {
   @Key("@type")
+  @SuppressFBWarnings("SS_SHOULD_BE_STATIC") // read via reflection so must be Field just like this
   public final String type = "OutputReference";
 
   @Key("step_name")
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index 327234c..6d80563 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -22,7 +22,6 @@
 import com.google.api.client.json.GenericJson;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.JsonGenerator;
-import com.google.api.client.util.Charsets;
 import com.google.api.services.dataflow.model.InstructionOutput;
 import com.google.api.services.dataflow.model.ParallelInstruction;
 import com.google.api.services.dataflow.model.SideInputInfo;
@@ -42,6 +41,7 @@
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 
 /** Container class for different types of network nodes. All nodes only have reference equality. */
@@ -169,7 +169,7 @@
       generator.enablePrettyPrint();
       generator.serialize(json);
       generator.flush();
-      return byteStream.toString();
+      return byteStream.toString(Charsets.UTF_8.name());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
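
`ByteArrayOutputStream.toString()` decodes with the platform default charset, which varies across JVMs; naming the charset makes the output deterministic. A minimal sketch (hypothetical example, not from this PR):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class CharsetDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    byteStream.write("Beam \u2714".getBytes(StandardCharsets.UTF_8));
    // toString() would use the platform default charset; toString(String)
    // decodes with exactly the charset the bytes were written in.
    System.out.println(byteStream.toString(StandardCharsets.UTF_8.name()));
  }
}
```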
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
index e4de8cb9..08a995d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.Charset;
@@ -37,6 +38,7 @@
 import java.util.logging.LogRecord;
 import java.util.logging.Logger;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 
 /**
  * A {@link PrintStream} factory that creates {@link PrintStream}s which output to the specified JUL
@@ -66,14 +68,17 @@
     private int carryOverBytes;
     private byte[] carryOverByteArray;
 
-    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel)
+        throws UnsupportedEncodingException {
       super(
           new OutputStream() {
             @Override
             public void write(int i) throws IOException {
               throw new RuntimeException("All methods should be overridden so this is unused");
             }
-          });
+          },
+          false,
+          Charsets.UTF_8.name());
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
@@ -401,7 +406,11 @@
    * specified {@code loggerName} and {@code level}.
    */
   static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
-    return new JulHandlerPrintStream(handler, loggerName, messageLevel);
+    try {
+      return new JulHandlerPrintStream(handler, loggerName, messageLevel);
+    } catch (UnsupportedEncodingException exc) {
+      throw new RuntimeException("Encoding not supported: " + Charsets.UTF_8.name(), exc);
+    }
   }
 
   @VisibleForTesting
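
The pre-Java-10 `PrintStream(OutputStream, boolean, String)` constructor declares `UnsupportedEncodingException`, which the factory converts to an unchecked exception since every JVM is required to support UTF-8. A condensed sketch of the same wrapping:

```java
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class Utf8PrintStreams {
  static PrintStream createUtf8(OutputStream out) {
    try {
      // (OutputStream, autoFlush, encoding) is the pre-Java-10 way to pin a charset.
      return new PrintStream(out, false, StandardCharsets.UTF_8.name());
    } catch (UnsupportedEncodingException exc) {
      // Unreachable in practice: UTF-8 support is mandatory for every JVM.
      throw new RuntimeException("Encoding not supported: UTF-8", exc);
    }
  }
}
```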
diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java
index 71397dc..ef05075 100644
--- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java
+++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.jobsubmission;
 
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -90,7 +92,7 @@
             .as(PortablePipelineOptions.class);
 
     final String jobName = jobInfo.jobName();
-    File outputFile = new File(pipelineOptions.getOutputExecutablePath());
+    File outputFile = new File(checkArgumentNotNull(pipelineOptions.getOutputExecutablePath()));
     LOG.info("Creating jar {} for job {}", outputFile.getAbsolutePath(), jobName);
     outputStream =
         new JarOutputStream(new FileOutputStream(outputFile), createManifest(mainClass, jobName));
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
index 16a1298..d87cbd2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark.metrics.sink;
 
 import com.codahale.metrics.MetricRegistry;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Properties;
 import org.apache.beam.runners.spark.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
@@ -26,6 +27,8 @@
 /**
  * A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to a CSV file.
  */
+// Intentionally overriding parent name because inheritors should replace the parent.
+@SuppressFBWarnings("NM_SAME_SIMPLE_NAME_AS_SUPERCLASS")
 public class CsvSink extends org.apache.spark.metrics.sink.CsvSink {
   public CsvSink(
       final Properties properties,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
index c25e8cd..eca1b2b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
@@ -18,12 +18,15 @@
 package org.apache.beam.runners.spark.metrics.sink;
 
 import com.codahale.metrics.MetricRegistry;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.Properties;
 import org.apache.beam.runners.spark.metrics.AggregatorMetric;
 import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
 import org.apache.spark.metrics.sink.Sink;
 
 /** A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to Graphite. */
+// Intentionally overriding parent name because inheritors should replace the parent.
+@SuppressFBWarnings("NM_SAME_SIMPLE_NAME_AS_SUPERCLASS")
 public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink {
   public GraphiteSink(
       final Properties properties,
diff --git a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
index 7811398..939b58c 100644
--- a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
@@ -16,418 +16,60 @@
     limitations under the License.
 -->
 <FindBugsFilter>
-  <!-- Ignored bug categories. Bugs in these categories should not block the build. -->
-  <Bug category="I18N"/>
-  <Bug pattern="DM_STRING_CTOR"/>
-  <Bug pattern="EI_EXPOSE_REP" />
-  <Bug pattern="EI_EXPOSE_REP2" />
+  <!--
+    Beam UDFs are intended to be serialized for transmission only. There is no expectation that they can
+    be stored and read by a different version of the code.
+  -->
   <Bug pattern="SE_NO_SERIALVERSIONID"/>
 
-  <!-- The uncallable method error fails on @ProcessElement style methods -->
+  <!--
+    The preferred way to find nullness errors is the Checker Framework (enableChecker: true).
+    Spotbugs remains useful for other coding issues, so rather than disabling Spotbugs in modules
+    with many nullness findings, its nullness checks are disabled globally here.
+
+    More context: the migration from FindBugs to Spotbugs caused a loss of coverage for nullness,
+    and the upgrade from Spotbugs 3.1.12 to 4.0.6 then surfaced hundreds of new nullness errors
+    spread throughout all modules. Fixing them piecemeal is not worthwhile; instead they are
+    suppressed here, and the modules should eventually pass the Checker Framework nullness type system.
+  -->
+  <Bug code="NP,RCN" />
+
+  <!--
+    Spotbugs warns whenever arrays are passed into or out of objects, yet most Java objects are
+    mutable in some way and carry the same risk. Because Beam deals in serialized data, these
+    warning codes produce a large number of unhelpful errors about passing byte[] arrays around.
+    Trusting callers and callees not to mutate things inappropriately is inherent to Java.
+  -->
+  <Bug code="EI,EI2" />
+
+  <!--
+    Beam DoFn methods are located via their annotations and invoked reflectively. To Spotbugs,
+    such methods on anonymous classes appear to be uncallable.
+  -->
   <Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
 
-  <!-- Suppress checking of AutoValue internals -->
+  <!--
+    Many test classes are marked `implements Serializable` only because the lambdas defined in
+    them capture `this`. They are never actually serialized, so the TestPipeline they employ does
+    not need to be transient or serializable.
+  -->
+  <Match>
+    <Bug pattern="SE_BAD_FIELD" />
+    <Class name=".*Test$" />
+    <Field type="org.apache.beam.sdk.testing.TestPipeline" />
+  </Match>
+
+  <!-- Suppress checking of autogenerated classes -->
   <Match>
     <Class name="~.*AutoValue_.*"/>
   </Match>
-
-  <!--
-          Suppressed findbugs issues. All new issues should include a comment why they're
-          suppressed.
-
-          Suppressions should go in this file rather than inline using @SuppressFBWarnings to avoid
-          unapproved artifact license.
-	-->
   <Match>
-    <Class name="org.apache.beam.sdk.coders.AvroCoder$SerializableSchemaSupplier"/>
-    <Field name="schema"/>
-    <Bug pattern="SE_BAD_FIELD"/>
-    <!--
-    writeReplace makes this object serializable. This is a limitation of FindBugs as discussed here:
-    http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.coders.SerializableCoder"/>
-    <Field name="typeDescriptor"/>
-    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
-    <!--
-    the field is used only in getEncodedTypeDescriptor, where it is restored if it is not present due to
-    serialization
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.io.jms.JmsRecord"/>
-    <Field name="jmsDestination"/>
-    <Bug pattern="SE_BAD_FIELD"/>
-    <!--
-    JMS destination is serializable according to the JMS spec even if it doesn't implement
-    Serializable.
-     -->
-  </Match>
-  <Match>
-    <Class name="org.apache.beam.sdk.io.jms.JmsRecord"/>
-    <Field name="jmsReplyTo"/>
-    <Bug pattern="SE_BAD_FIELD"/>
-    <!--
-    JMS ReplyTo destination is serializable according to the JMS spec even if it doesn't implement
-    Serializable.
-     -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.coders.InstantCoder$LexicographicLongConverter"/>
-    <Bug pattern="HE_INHERITS_EQUALS_USE_HASHCODE"/>
-    <!-- Converter overrides .equals() to add documentation but does not change behavior -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.testing.PAssert$PCollectionViewAssert"/>
-    <Method name="equals" />
-    <Bug pattern="EQ_UNUSUAL"/>
-    <!-- Unsupported operation -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.testing.PAssert$PCollectionContentsAssert"/>
-    <Method name="equals" />
-    <Bug pattern="EQ_UNUSUAL"/>
-    <!-- Unsupported operation -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.testing.SerializableMatchers$SerializableArrayViaCoder"/>
-    <Field name="value" />
-    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
-    <!-- Cached value is lazily restored on read. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.transforms.Mean$CountSum"/>
-    <Method name="equals" />
-    <Bug pattern="FE_FLOATING_POINT_EQUALITY"/>
-    <!-- Comparing doubles directly since equals method is only used in coder test. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.util.ExposedByteArrayInputStream"/>
-    <Method name="readAll" />
-    <Bug pattern="EI_EXPOSE_REP"/>
-    <!-- Returns internal buffer by design. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.util.ExposedByteArrayOutputStream"/>
-    <Method name="toByteArray" />
-    <Bug pattern="EI_EXPOSE_REP"/>
-    <!-- Returns internal buffer by design. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.util.ExposedByteArrayOutputStream"/>
-    <Method name="toByteArray" />
-    <Bug pattern="EI_EXPOSE_REP"/>
-    <!-- Returns internal buffer by design. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.util.ExposedByteArrayOutputStream"/>
-    <Method name="writeAndOwn" />
-    <Bug pattern="EI_EXPOSE_REP"/>
-    <!-- Takes ownership of input buffer -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.dataflow.util.MonitoringUtilTest" />
-    <Field name="thrown" />
-    <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
-    <!-- TestRule used automatically by JUnit framework -->
-  </Match>
-  <Match>
-    <Class name="org.apache.beam.runners.dataflow.util.OutputReference" />
-    <Field name="type" />
-    <Bug pattern="SS_SHOULD_BE_STATIC" />
-    <!-- Field read via reflection -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.dataflow.options.DataflowPipelineOptionsTest" />
-    <Field name="restoreSystemProperties" />
-    <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
-    <!-- TestRule used automatically by JUnit framework -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.dataflow.TestDataflowRunner" />
-    <Method name="waitForStreamingJobTermination" />
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
-    <!-- waitForStreamingJobTermination checks status via job.waitUntilFinish() -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$QueueMessageReceiver" />
-    <Or>
-      <Method name="failed" />
-      <Method name="cancelled" />
-      <Method name="completed" />
-    </Or>
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
-    <!-- updates is a non-capacity-limited LinkedBlockingQueue, which
-      can never refuse an offered update -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/>
-    <Or>
-      <Field name="bcast" />
-      <Field name="value" />
-    </Or>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-    <!--
-      Spark's Broadcast variables are a distributed and cached objects
-      and should not be treated as "normal" objects.
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/>
-    <Or>
-      <Field name="bcast" />
-      <Field name="value" />
-    </Or>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-    <!--
-      Spark's Broadcast variables are a distributed and cached objects
-      and should not be treated as "normal" objects.
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.spark.metrics.sink.CsvSink"/>
-    <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
-    <!-- Intentionally overriding parent name because inheritors should replace the parent. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.spark.metrics.sink.GraphiteSink"/>
-    <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
-    <!-- Intentionally overriding parent name because inheritors should replace the parent. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.io.LocalResourceId"/>
-    <Method name="getCurrentDirectory" />
-    <Bug pattern="NP_NULL_PARAM_DEREF"/>
-    <!--
-      Path.getParent() could return null. However, we check the returned Path is not null.
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.extensions.gcp.storage.GcsResourceId"/>
-    <Method name="getCurrentDirectory" />
-    <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
-    <!--
-      GcsPath.getParent() could return null. However, we check the returned Path is not null.
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.util.ZipFiles"/>
-    <Method name="zipDirectory" />
-    <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
-    <!--
-      File.listFiles() will return null if the File instance is not a directory. Null dereference is
-      not a possibility here since we validate sourceDirectory is directory via
-      sourceDirectory.isDirectory()
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.util.ZipFiles"/>
-    <Method name="zipDirectoryInternal" />
-    <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
-    <!--
-      File.listFiles() will return null if the File instance is not a directory. Null dereference is
-      not a possibility here since we validate sourceDirectory is directory via
-      sourceDirectory.isDirectory()
-    -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.core.StateMerging"/>
-    <Method name="prefetchRead" />
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
-    <!-- prefetch call readLater -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runner.core.PaneInfoTracker"/>
-    <Method name="getNextPaneInfo" />
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
-    <!-- prefetch side effect -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runner.core.SystemReduceFn"/>
-    <Method name="prefetchOnTrigger" />
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
-    <!-- prefetch side effect -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runner.core.WatermarkHold"/>
-    <Method name="extractAndRelease" />
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
-    <!-- prefetch call readLater -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine"/>
-    <Method name="prefetchOnElement" />
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
-    <!-- prefetch side effect -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine"/>
-    <Method name="prefetchShouldFire" />
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
-    <!-- prefetch side effect -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.core.triggers.AfterPaneStateMachine"/>
-    <Method name="prefetchShouldFire" />
-    <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
-    <!-- prefetch side effect -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.core.triggers.TriggerStateMachines$StateMachineConverter"/>
-    <Method name="evaluateSpecific" />
-    <Bug pattern="UPM_UNCALLED_PRIVATE_METHOD"/>
-    <!-- Called via reflection -->
-  </Match>
-
-  <!--
-    Baseline issues. No new issues should be added below this line and all existing issues should
-    have an associated JIRA
-  -->
-
-  <Match>
-    <Class name="org.apache.beam.sdk.coders.JAXBCoder"/>
-    <Method name="getContext"/>
-    <Bug pattern="DC_DOUBLECHECK"/>
-    <!--[BEAM-398] Possible double check of field-->
-  </Match>
-  <Match>
-    <Class name="org.apache.beam.sdk.testing.WindowSupplier"/>
-    <Field name="windows"/>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-    <!--[BEAM-407] Inconsistent synchronization -->
-  </Match>
-  <Match>
-    <Class name="org.apache.beam.sdk.util.CombineFnUtil$NonSerializableBoundedCombineFn"/>
-    <Field name="context"/>
-    <Bug pattern="SE_BAD_FIELD"/>
-    <!--
-      The class is not meant to be serializable, writeObject() just throws an exception. Therefore
-      it's reasonable for this field to also not be serializable.
-    -->
-  </Match>
-  <Match>
-    <Class name="org.apache.beam.runners.core.WatermarkHold"/>
-    <Field name="timerInternals"/>
-    <Bug pattern="SE_BAD_FIELD"/>
-    <!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
-  </Match>
-  <Match>
-    <Class name="StateSpecs$CombiningStateSpec"/>
-    <Method name="equals"/>
-    <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
-    <!--[BEAM-421] Class doesn't override equals in superclass-->
-  </Match>
-  <Match>
-    <Class name="org.apache.beam.sdk.util.ClassPath$ClassInfo"/>
-    <Method name="equals"/>
-    <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
-    <!--[BEAM-1676] Class doesn't override equals in superclass-->
-  </Match>
-  <Match>
-    <Class name="org.apache.beam.sdk.util.AutoValue_GcsUtil_StorageObjectOrIOException"/>
-    <Bug pattern="NM_CLASS_NOT_EXCEPTION"/>
-    <!-- It is clear from the name that this class holds either StorageObject or IOException. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.extensions.gcp.util.GcsUtil$StorageObjectOrIOException"/>
-    <Bug pattern="NM_CLASS_NOT_EXCEPTION"/>
-    <!-- It is clear from the name that this class holds either StorageObject or IOException. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.direct.ParDoMultiOverrideFactory$StatefulParDo"/>
-    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
-    <!-- PTransforms do not actually support serialization. -->
-  </Match>
-
-  <Match>
-    <Class name="org.apache.beam.sdk.options.ProxyInvocationHandler"/>
-    <Field name="~.*"/>
-    <Bug pattern="SE_BAD_FIELD"/>
-    <!--
-      ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
-      exception in writeObject()
-    -->
-  </Match>
-
-  <Match>
-    <!--
-  Classes in this package is auto-generated, let's disable the findbugs for it.
-  -->
     <Package name="org.apache.beam.sdk.extensions.sql.impl.parser.impl"/>
   </Match>
-
   <Match>
-    <!--
-  Classes in this package is auto-generated, let's disable the findbugs for it.
-  -->
     <Package name="org.apache.beam.sdk.schemas.parser.generated"/>
   </Match>
-
   <Match>
-    <!--
-  Classes in this package is auto-generated, let's disable the findbugs for it.
-  -->
     <Package name="org.apache.beam.sdk.io.clickhouse.impl.parser"/>
   </Match>
-
-  <Match>
-    <Class name="org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory$SimpleStageBundleFactory"/>
-    <Field name="wrappedClient"/>
-    <Bug pattern="URF_UNREAD_FIELD"/>
-    <!-- Fix build. -->
-  </Match>
-
-  <Match>
-    <!--
-      This is a false positive. Spotbugs does not recognize the use of try-with-resources, so it thinks that
-      the connection is not correctly closed.
-    -->
-    <Or>
-      <And>
-        <Class name="org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn"/>
-        <Method name="processElement"/>
-      </And>
-      <And>
-        <Class name="org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows"/>
-        <Method name="inferBeamSchema"/>
-      </And>
-    </Or>
-
-    <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
-  </Match>
 </FindBugsFilter>
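
Most of the deleted per-class entries reappear in this PR as inline `@SuppressFBWarnings` annotations next to the code they justify. A hypothetical example of the inline form (names invented for illustration):

```java
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

public class InlineSuppressionDemo {
  @SuppressFBWarnings(
      value = "DM_DEFAULT_ENCODING",
      justification = "platform default encoding is acceptable for this local tool")
  byte[] toBytes(String s) {
    return s.getBytes(); // uses the platform default charset, which Spotbugs flags
  }
}
```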
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 83bc428..8b77446 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -210,6 +211,10 @@
    * serialization and hence is able to encode the {@link Schema} object directly.
    */
   private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
+    // writeReplace makes this object serializable. This is a limitation of FindBugs as discussed
+    // here:
+    // http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
+    @SuppressFBWarnings("SE_BAD_FIELD")
     private final Schema schema;
 
     private SerializableSchemaSupplier(Schema schema) {
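
`SE_BAD_FIELD` is suppressed because the class relies on the serialization-proxy pattern: `writeReplace` substitutes a serializable stand-in, so the non-`Serializable` `Schema` field never reaches default serialization. A minimal sketch of the pattern, with `Payload` as a hypothetical stand-in for Avro's `Schema`:

```java
import java.io.Serializable;

class Payload { // deliberately not Serializable
  final String text;

  Payload(String text) {
    this.text = text;
  }
}

class PayloadSupplier implements Serializable {
  // Not Serializable and not transient: safe because writeReplace means default
  // serialization never touches this field, which Spotbugs cannot see.
  private final Payload payload;

  PayloadSupplier(Payload payload) {
    this.payload = payload;
  }

  // The JVM serializes this proxy instead of PayloadSupplier itself.
  private Object writeReplace() {
    return new SerializedForm(payload.text);
  }

  private static class SerializedForm implements Serializable {
    private final String text;

    SerializedForm(String text) {
      this.text = text;
    }

    // On deserialization, rebuild the original object from the portable form.
    private Object readResolve() {
      return new PayloadSupplier(new Payload(text));
    }
  }
}
```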
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index c0cc6c5..ffdf3fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -166,6 +167,8 @@
   private final Class<T> type;
 
   /** Access via {@link #getEncodedTypeDescriptor()}. */
+  // the field is restored lazily if it is not present due to serialization
+  @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
   private transient @Nullable TypeDescriptor<T> typeDescriptor;
 
   protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
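
`SE_TRANSIENT_FIELD_NOT_RESTORED` normally flags transient fields that `readObject` never restores; it is suppressed here because the field is a cache rebuilt on first access after deserialization. A minimal sketch of that idiom:

```java
import java.io.Serializable;

class LazyCache implements Serializable {
  private final String source;

  // Null after deserialization; every access path rebuilds it on demand,
  // which is why no readObject restoration is needed.
  private transient String derived;

  LazyCache(String source) {
    this.source = source;
  }

  String getDerived() {
    if (derived == null) {
      derived = source.toUpperCase(); // recompute instead of restoring
    }
    return derived;
  }
}
```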
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
index fb922b0..bbdd3a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
@@ -30,5 +30,5 @@
 public interface ExternalTransformRegistrar {
 
   /** A mapping from URN to an {@link ExternalTransformBuilder} class. */
-  Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders();
+  Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders();
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java
index 3560464..07f940e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java
@@ -154,7 +154,7 @@
   @Experimental(Experimental.Kind.FILESYSTEM)
   public static class ClassLoaderFileSystemRegistrar implements FileSystemRegistrar {
     @Override
-    public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
+    public Iterable<FileSystem<?>> fromOptions(@Nullable PipelineOptions options) {
       return ImmutableList.of(new ClassLoaderFileSystem());
     }
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
index 7da675c..1e9ff1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
@@ -40,5 +40,5 @@
    * <p>Each {@link FileSystem#getScheme() scheme} is required to be unique among all {@link
    * FileSystem}s registered by all {@link FileSystemRegistrar}s.
    */
-  Iterable<FileSystem> fromOptions(PipelineOptions options);
+  Iterable<FileSystem<?>> fromOptions(PipelineOptions options);
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
index f93cb8f..7c5c483 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
@@ -134,7 +134,7 @@
     public static final String URN = "beam:external:java:generate_sequence:v1";
 
     @Override
-    public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+    public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
       return ImmutableMap.of(URN, AutoValue_GenerateSequence.Builder.class);
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
index 3a15c67..09f1571 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
@@ -29,7 +29,7 @@
 @Experimental(Kind.FILESYSTEM)
 public class LocalFileSystemRegistrar implements FileSystemRegistrar {
   @Override
-  public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
+  public Iterable<FileSystem<?>> fromOptions(@Nullable PipelineOptions options) {
     return ImmutableList.of(new LocalFileSystem());
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 7c75893..ff8e1c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -33,6 +33,7 @@
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.beans.PropertyDescriptor;
 import java.io.IOException;
 import java.io.NotSerializableException;
@@ -95,8 +96,17 @@
   private final int hashCode = ThreadLocalRandom.current().nextInt();
 
   private final Set<Class<? extends PipelineOptions>> knownInterfaces;
+
+  // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
+  // exception in writeObject()
+  @SuppressFBWarnings("SE_BAD_FIELD")
   private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+
+  // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
+  // exception in writeObject()
+  @SuppressFBWarnings("SE_BAD_FIELD")
   private final Map<String, BoundValue> options;
+
   private final Map<String, JsonNode> jsonOptions;
   private final Map<String, String> gettersToPropertyNames;
   private final Map<String, String> settersToPropertyNames;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 937be48..05b1914 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -681,6 +681,7 @@
      */
     @Deprecated
     @Override
+    @SuppressFBWarnings("EQ_UNUSUAL")
     public boolean equals(@Nullable Object o) {
       throw new UnsupportedOperationException(
           "If you meant to test object equality, use .containsInAnyOrder instead.");
@@ -1048,6 +1049,7 @@
      */
     @Deprecated
     @Override
+    @SuppressFBWarnings("EQ_UNUSUAL")
     public boolean equals(@Nullable Object o) {
       throw new UnsupportedOperationException(
           String.format(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
index 97fd9b4..70d35b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
@@ -20,6 +20,7 @@
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.core.Is.is;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
@@ -796,6 +797,7 @@
    */
   private static class SerializableArrayViaCoder<T> implements SerializableSupplier<T[]> {
     /** Cached value that is not serialized. */
+    @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
     private transient T @Nullable [] value;
 
     /** The bytes of {@link #value} when encoded via {@link #coder}. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
index 4c27ecd..0458817 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.testing;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Serializable;
 import java.util.Collection;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,6 +34,8 @@
  * supplier is {@link Serializable}, and handles encoding and decoding the windows with a {@link
  * Coder} provided for the windows.
  */
+// Spotbugs thinks synchronization is for the fields, but it is for the act of decoding
+@SuppressFBWarnings("IS2_INCONSISTENT_SYNC")
 final class WindowSupplier implements Supplier<Collection<BoundedWindow>>, Serializable {
   private final Coder<? extends BoundedWindow> coder;
   private final Collection<byte[]> encodedWindows;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 6e50010..925cf89 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -149,6 +150,9 @@
     }
 
     @Override
+    // Comparing doubles directly is acceptable since the class is package-private and the equals
+    // method is only used in coder tests.
+    @SuppressFBWarnings("FE_FLOATING_POINT_EQUALITY")
     public boolean equals(@Nullable Object other) {
       if (!(other instanceof CountSum)) {
         return false;
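
`FE_FLOATING_POINT_EQUALITY` warns because arithmetically equal doubles can differ in binary representation; the suppression notes that this method is only exercised by coder tests on exact copies. The general hazard, in a standalone sketch:

```java
public class FloatEqDemo {
  public static void main(String[] args) {
    double a = 0.1 + 0.2;
    double b = 0.3;
    // Direct == comparison fails under binary rounding, which is what the
    // warning guards against in general-purpose equals() implementations.
    System.out.println(a == b);                 // false
    System.out.println(Math.abs(a - b) < 1e-9); // true (tolerance-based check)
  }
}
```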
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
index 73885e6..20d6325 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.util;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.NotSerializableException;
 import java.io.ObjectOutputStream;
@@ -115,6 +116,9 @@
   private static class NonSerializableBoundedCombineFn<InputT, AccumT, OutputT>
       extends CombineFn<InputT, AccumT, OutputT> {
     private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
+
+    // The class is not meant to be serializable; writeObject() just throws an exception.
+    @SuppressFBWarnings("SE_BAD_FIELD")
     private final Context context;
 
     private NonSerializableBoundedCombineFn(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExposedByteArrayOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExposedByteArrayOutputStream.java
index 78ab0f5..fca7491f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExposedByteArrayOutputStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExposedByteArrayOutputStream.java
@@ -62,6 +62,7 @@
    *
    * <p><i>Note: After passing any byte array to this method, it must not be modified again.</i>
    */
+  // Takes ownership of the input buffer by design; Spotbugs is right that this is dangerous.
   public synchronized void writeAndOwn(byte[] b) throws IOException {
     if (b.length == 0) {
       return;
@@ -91,6 +92,7 @@
   }
 
   @Override
+  // Exposes the internal mutable buffer by design; Spotbugs is right that this is dangerous.
   public synchronized byte[] toByteArray() {
     // Note: count == buf.length is not a correct criteria to "return buf;", because the internal
     // buf may be reused after reset().
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
index af7e1f4..c263146 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
@@ -38,7 +38,8 @@
     for (FileSystemRegistrar registrar :
         Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
       if (registrar instanceof LocalFileSystemRegistrar) {
-        Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
+        Iterable<FileSystem<?>> fileSystems =
+            registrar.fromOptions(PipelineOptionsFactory.create());
         assertThat(fileSystems, contains(instanceOf(LocalFileSystem.class)));
         return;
       }
diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index fe6dcf5..c20e8d2 100644
--- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -107,10 +107,10 @@
           ImmutableMap.builder();
       for (ExternalTransformRegistrar registrar :
           ServiceLoader.load(ExternalTransformRegistrar.class)) {
-        for (Map.Entry<String, Class<? extends ExternalTransformBuilder>> entry :
+        for (Map.Entry<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> entry :
             registrar.knownBuilders().entrySet()) {
           String urn = entry.getKey();
-          Class<? extends ExternalTransformBuilder> builderClass = entry.getValue();
+          Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass = entry.getValue();
           builder.put(
               urn,
               spec -> {
@@ -130,7 +130,7 @@
 
     private static PTransform<?, ?> translate(
         ExternalTransforms.ExternalConfigurationPayload payload,
-        Class<? extends ExternalTransformBuilder> builderClass)
+        Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass)
         throws Exception {
       Preconditions.checkState(
           ExternalTransformBuilder.class.isAssignableFrom(builderClass),
@@ -142,8 +142,8 @@
       return buildTransform(builderClass, configObject);
     }
 
-    private static Object initConfiguration(Class<? extends ExternalTransformBuilder> builderClass)
-        throws Exception {
+    private static Object initConfiguration(
+        Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass) throws Exception {
       for (Method method : builderClass.getMethods()) {
         if (method.getName().equals("buildExternal")) {
           Preconditions.checkState(
@@ -241,9 +241,9 @@
     }
 
     private static PTransform<?, ?> buildTransform(
-        Class<? extends ExternalTransformBuilder> builderClass, Object configObject)
+        Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass, Object configObject)
         throws Exception {
-      Constructor<? extends ExternalTransformBuilder> constructor =
+      Constructor<? extends ExternalTransformBuilder<?, ?, ?>> constructor =
           builderClass.getDeclaredConstructor();
       constructor.setAccessible(true);
       ExternalTransformBuilder<?, ?, ?> externalTransformBuilder = constructor.newInstance();
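
With raw types banned, the reflective plumbing carries wildcard-parameterized `Class` tokens end to end. A condensed sketch of the pattern, with `Builder` as a hypothetical stand-in for `ExternalTransformBuilder`:

```java
import java.lang.reflect.Constructor;

interface Builder<InputT, OutputT, ConfigT> {}

class ReflectDemo {
  static Builder<?, ?, ?> instantiate(Class<? extends Builder<?, ?, ?>> builderClass)
      throws Exception {
    // getDeclaredConstructor() on a wildcard-parameterized Class token keeps
    // the wildcards; no raw Constructor or raw Builder appears anywhere.
    Constructor<? extends Builder<?, ?, ?>> constructor = builderClass.getDeclaredConstructor();
    constructor.setAccessible(true);
    return constructor.newInstance();
  }
}
```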
diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle
index 5629d98..bd23376 100644
--- a/sdks/java/extensions/google-cloud-platform-core/build.gradle
+++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle
@@ -21,7 +21,6 @@
 plugins { id 'org.apache.beam.module' }
 applyJavaNature(
   enableChecker: false,
-  ignoreRawtypeErrors: true,
   automaticModuleName: 'org.apache.beam.sdk.extensions.gcp')
 
 description = "Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core"
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
index 6caff00..b090359 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -35,7 +35,7 @@
 public class GcsFileSystemRegistrar implements FileSystemRegistrar {
 
   @Override
-  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+  public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
     checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
     return ImmutableList.of(new GcsFileSystem(options.as(GcsOptions.class)));
   }
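
The registrar hunks in this PR (here and for S3, Azure, and HDFS below) all change the return type from the raw `Iterable<FileSystem>` to `Iterable<FileSystem<?>>`, which can legitimately hold filesystems with different type arguments. A minimal sketch with hypothetical names:

```java
import java.util.List;

public class WildcardIterableSketch {
  abstract static class Fs<ResourceT> {}

  static class LocalFs extends Fs<java.nio.file.Path> {}

  static class GcsFs extends Fs<java.net.URI> {}

  // Iterable<Fs<?>> admits elements with different type arguments; the raw
  // Iterable<Fs> only permitted this by suppressing generic checks entirely.
  static Iterable<Fs<?>> fromOptions() {
    return List.of(new LocalFs(), new GcsFs());
  }

  public static void main(String[] args) {
    for (Fs<?> fs : fromOptions()) {
      System.out.println(fs.getClass().getSimpleName());
    }
  }
}
```
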
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 5db0978..6845af8 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -44,6 +44,7 @@
 import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
 import com.google.cloud.hadoop.util.ResilientOperation;
 import com.google.cloud.hadoop.util.RetryDeterminer;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.SeekableByteChannel;
@@ -807,6 +808,8 @@
   }
 
   /** A class that holds either a {@link StorageObject} or an {@link IOException}. */
+  // It is clear from the name that this class holds either StorageObject or IOException.
+  @SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION")
   @AutoValue
   public abstract static class StorageObjectOrIOException {
 
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
index 83a213b..10752dd 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
@@ -40,7 +40,8 @@
     for (FileSystemRegistrar registrar :
         Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
       if (registrar instanceof GcsFileSystemRegistrar) {
-        Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
+        Iterable<FileSystem<?>> fileSystems =
+            registrar.fromOptions(PipelineOptionsFactory.create());
         assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class)));
         return;
       }
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
index a253a4b0..f5d6def 100644
--- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
@@ -42,7 +42,7 @@
           .build();
 
   @Override
-  public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
     return org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.of(
         URN, Builder.class);
   }
diff --git a/sdks/java/extensions/sql/jdbc/src/main/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLine.java b/sdks/java/extensions/sql/jdbc/src/main/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLine.java
index 0b1506f..6dd8fc1 100644
--- a/sdks/java/extensions/sql/jdbc/src/main/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLine.java
+++ b/sdks/java/extensions/sql/jdbc/src/main/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLine.java
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import sqlline.SqlLine;
 import sqlline.SqlLine.Status;
@@ -65,11 +66,11 @@
     SqlLine sqlLine = new SqlLine();
 
     if (outputStream != null) {
-      sqlLine.setOutputStream(new PrintStream(outputStream));
+      sqlLine.setOutputStream(new PrintStream(outputStream, false, Charsets.UTF_8.name()));
     }
 
     if (errorStream != null) {
-      sqlLine.setErrorStream(new PrintStream(errorStream));
+      sqlLine.setErrorStream(new PrintStream(errorStream, false, Charsets.UTF_8.name()));
     }
 
     return sqlLine.begin(modifiedArgs, inputStream, true);
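
The `PrintStream` change above addresses the platform-default-charset warning by naming the encoding explicitly. A minimal standalone sketch, using the JDK's `StandardCharsets` in place of Beam's vendored `Charsets`:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class ExplicitCharsetSketch {
  public static void main(String[] args) throws UnsupportedEncodingException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    // autoFlush=false, charset named explicitly instead of the JVM default.
    PrintStream out = new PrintStream(buffer, false, StandardCharsets.UTF_8.name());
    out.println("déjà vu");
    out.flush();
    System.out.println(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
  }
}
```
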
diff --git a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
index 3da48a2..6d4fa60 100644
--- a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
+++ b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
@@ -49,6 +49,7 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -141,7 +142,7 @@
     sqlEnv =
         BeamSqlEnv.builder(inMemoryMetaStore)
             .setPipelineOptions(PipelineOptionsFactory.create())
-            .setRuleSets(new RuleSet[] {RuleSets.ofList(ruleList)})
+            .setRuleSets(ImmutableList.of(RuleSets.ofList(ruleList)))
             .build();
     sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, Method.DIRECT_READ.toString()));
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index b8b0e72..9c43664 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -22,6 +22,7 @@
 import java.lang.reflect.Method;
 import java.sql.SQLException;
 import java.util.AbstractMap.SimpleEntry;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -146,7 +147,7 @@
     private boolean autoLoadBuiltinFunctions;
     private boolean autoLoadUdfs;
     private PipelineOptions pipelineOptions;
-    private RuleSet[] ruleSets;
+    private Collection<RuleSet> ruleSets;
 
     private BeamSqlEnvBuilder(TableProvider tableProvider) {
       checkNotNull(tableProvider, "Table provider for the default schema must be set.");
@@ -178,7 +179,7 @@
     }
 
     /** Set the ruleSet used for query optimizer. */
-    public BeamSqlEnvBuilder setRuleSets(RuleSet[] ruleSets) {
+    public BeamSqlEnvBuilder setRuleSets(Collection<RuleSet> ruleSets) {
       this.ruleSets = ruleSets;
       return this;
     }
@@ -309,16 +310,28 @@
       }
     }
 
-    private QueryPlanner instantiatePlanner(JdbcConnection jdbcConnection, RuleSet[] ruleSets) {
+    private QueryPlanner instantiatePlanner(
+        JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
+      Class<?> queryPlannerClass;
       try {
-        return (QueryPlanner)
-            Class.forName(queryPlannerClassName)
-                .getConstructor(JdbcConnection.class, RuleSet[].class)
-                .newInstance(jdbcConnection, ruleSets);
-      } catch (Exception e) {
+        queryPlannerClass = Class.forName(queryPlannerClassName);
+      } catch (ClassNotFoundException exc) {
         throw new RuntimeException(
-            String.format("Cannot construct query planner %s", queryPlannerClassName), e);
+            "Cannot find requested QueryPlanner class: " + queryPlannerClassName, exc);
       }
+
+      QueryPlanner.Factory factory;
+      try {
+        factory = (QueryPlanner.Factory) queryPlannerClass.getField("FACTORY").get(null);
+      } catch (NoSuchFieldException | IllegalAccessException exc) {
+        throw new RuntimeException(
+            String.format(
+                "QueryPlanner class %s does not have an accessible static field 'FACTORY' of type QueryPlanner.Factory",
+                queryPlannerClassName),
+            exc);
+      }
+
+      return factory.createPlanner(jdbcConnection, ruleSets);
     }
   }
 }
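
The `instantiatePlanner` rewrite above stops reflectively invoking a constructor with a hard-coded signature; it now only resolves the class by name and reads a public static `FACTORY` field typed as `QueryPlanner.Factory`, which then constructs the planner through a normal, compile-checked call. A minimal sketch of the same shape, with hypothetical `Planner` and `Factory` names:

```java
import java.util.Collection;
import java.util.List;

public class FactoryFieldSketch {
  interface Planner {}

  interface Factory {
    Planner create(Collection<String> rules);
  }

  public static class MyPlanner implements Planner {
    // The only reflective contract: a public static field named FACTORY.
    public static final Factory FACTORY = MyPlanner::new;

    public MyPlanner(Collection<String> rules) {}
  }

  static Planner instantiate(String className, Collection<String> rules) throws Exception {
    Class<?> clazz = Class.forName(className);
    // Read the static field; construction itself is type-checked via the interface.
    Factory factory = (Factory) clazz.getField("FACTORY").get(null);
    return factory.create(rules);
  }

  public static void main(String[] args) throws Exception {
    System.out.println(instantiate(FactoryFieldSketch.MyPlanner.class.getName(), List.of("rule")));
  }
}
```
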
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index cea8fba..7e7517b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.Factory;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters.Kind;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
 import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats;
@@ -76,12 +78,21 @@
   private final JdbcConnection connection;
 
   /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */
-  public CalciteQueryPlanner(JdbcConnection connection, RuleSet[] ruleSets) {
+  public CalciteQueryPlanner(JdbcConnection connection, Collection<RuleSet> ruleSets) {
     this.connection = connection;
     this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets));
   }
 
-  public FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] ruleSets) {
+  public static final Factory FACTORY =
+      new Factory() {
+        @Override
+        public QueryPlanner createPlanner(
+            JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
+          return new CalciteQueryPlanner(jdbcConnection, ruleSets);
+        }
+      };
+
+  public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleSet> ruleSets) {
     final CalciteConnectionConfig config = connection.config();
     final SqlParser.ConfigBuilder parserConfig =
         SqlParser.configBuilder()
@@ -115,7 +126,7 @@
         .defaultSchema(defaultSchema)
         .traitDefs(traitDefs)
         .context(Contexts.of(connection.config()))
-        .ruleSets(ruleSets)
+        .ruleSets(ruleSets.toArray(new RuleSet[0]))
         .costFactory(BeamCostModel.FACTORY)
         .typeSystem(connection.getTypeFactory().getTypeSystem())
         .operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
index a690082..20e5604 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
@@ -18,12 +18,14 @@
 package org.apache.beam.sdk.extensions.sql.impl;
 
 import com.google.auto.value.AutoOneOf;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 
 /**
  * An interface that planners should implement to convert sql statement to {@link BeamRelNode} or
@@ -70,4 +72,8 @@
           ImmutableList.copyOf(positionalParams));
     }
   }
+
+  interface Factory {
+    QueryPlanner createPlanner(JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets);
+  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 1b8083c..2ecc26b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.planner;
 
+import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -168,14 +169,14 @@
   private static final List<RelOptRule> BEAM_TO_ENUMERABLE =
       ImmutableList.of(BeamEnumerableConverterRule.INSTANCE);
 
-  public static RuleSet[] getRuleSets() {
-    return new RuleSet[] {
-      RuleSets.ofList(
-          ImmutableList.<RelOptRule>builder()
-              .addAll(BEAM_CONVERTERS)
-              .addAll(BEAM_TO_ENUMERABLE)
-              .addAll(LOGICAL_OPTIMIZATIONS)
-              .build())
-    };
+  public static Collection<RuleSet> getRuleSets() {
+
+    return ImmutableList.of(
+        RuleSets.ofList(
+            ImmutableList.<RelOptRule>builder()
+                .addAll(BEAM_CONVERTERS)
+                .addAll(BEAM_TO_ENUMERABLE)
+                .addAll(LOGICAL_OPTIMIZATIONS)
+                .build()));
   }
 }
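
`getRuleSets()` now returns `Collection<RuleSet>` instead of `RuleSet[]`; arrays and generics compose poorly, so the array is produced only at the Calcite boundary via `toArray(new RuleSet[0])` (see the `defaultConfig` hunks elsewhere in this PR). A minimal sketch of that boundary pattern, with hypothetical names:

```java
import java.util.Collection;
import java.util.List;

public class CollectionBoundarySketch {
  static Collection<String> getRuleSets() {
    return List.of("converters", "logical-optimizations");
  }

  // Stand-in for a legacy API (like Calcite's FrameworkConfig) that takes an array.
  static void legacyConfigure(String... ruleSets) {
    System.out.println(String.join(", ", ruleSets));
  }

  public static void main(String[] args) {
    // toArray(new String[0]) is the idiomatic, type-safe bridge to the array API.
    legacyConfigure(getRuleSets().toArray(new String[0]));
  }
}
```
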
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
index 37fbc61..5cd2676 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
@@ -45,7 +45,6 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.Pair;
 import org.junit.Before;
@@ -88,7 +87,7 @@
     sqlEnv =
         BeamSqlEnv.builder(tableProvider)
             .setPipelineOptions(PipelineOptionsFactory.create())
-            .setRuleSets(new RuleSet[] {RuleSets.ofList(defaultRules)})
+            .setRuleSets(ImmutableList.of(RuleSets.ofList(defaultRules)))
             .build();
   }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
index 2d0a1be..a967fc3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
@@ -232,7 +232,7 @@
     TestTableProvider tableProvider = new TestTableProvider();
     createThreeTables(tableProvider);
     List<RelOptRule> ruleSet =
-        Arrays.stream(BeamRuleSets.getRuleSets())
+        BeamRuleSets.getRuleSets().stream()
             .flatMap(rules -> StreamSupport.stream(rules.spliterator(), false))
             .filter(rule -> !(rule instanceof BeamJoinPushThroughJoinRule))
             .filter(rule -> !(rule instanceof BeamJoinAssociateRule))
@@ -242,7 +242,7 @@
     BeamSqlEnv env =
         BeamSqlEnv.builder(tableProvider)
             .setPipelineOptions(PipelineOptionsFactory.create())
-            .setRuleSets(new RuleSet[] {RuleSets.ofList(ruleSet)})
+            .setRuleSets(ImmutableList.of(RuleSets.ofList(ruleSet)))
             .build();
 
     // This is Join(Join(medium, large), small) which should be converted to a join that large table
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
index 1acb94f..d3221e0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
@@ -48,7 +48,6 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
 import org.joda.time.Duration;
 import org.junit.Before;
@@ -93,7 +92,7 @@
     sqlEnv =
         BeamSqlEnv.builder(tableProvider)
             .setPipelineOptions(PipelineOptionsFactory.create())
-            .setRuleSets(new RuleSet[] {RuleSets.ofList(rulesWithPushDown)})
+            .setRuleSets(ImmutableList.of(RuleSets.ofList(rulesWithPushDown)))
             .build();
   }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
index e64a103..9a67950 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
@@ -50,7 +50,6 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Duration;
@@ -96,7 +95,7 @@
     sqlEnv =
         BeamSqlEnv.builder(tableProvider)
             .setPipelineOptions(PipelineOptionsFactory.create())
-            .setRuleSets(new RuleSet[] {RuleSets.ofList(rulesWithPushDown)})
+            .setRuleSets(ImmutableList.of(RuleSets.ofList(rulesWithPushDown)))
             .build();
   }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
index 363c0f2..941e984 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
@@ -48,7 +48,6 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
 import org.joda.time.Duration;
 import org.junit.Before;
@@ -92,7 +91,7 @@
     sqlEnv =
         BeamSqlEnv.builder(tableProvider)
             .setPipelineOptions(PipelineOptionsFactory.create())
-            .setRuleSets(new RuleSet[] {RuleSets.ofList(rulesWithPushDown)})
+            .setRuleSets(ImmutableList.of(RuleSets.ofList(rulesWithPushDown)))
             .build();
   }
 
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
index 9b8a417..334af11 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
@@ -19,6 +19,7 @@
 
 import com.google.zetasql.LanguageOptions;
 import com.google.zetasql.Value;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
@@ -72,7 +73,7 @@
    * Called by {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv}.instantiatePlanner()
    * reflectively.
    */
-  public ZetaSQLQueryPlanner(JdbcConnection jdbcConnection, RuleSet[] ruleSets) {
+  public ZetaSQLQueryPlanner(JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
     plannerImpl =
         new ZetaSQLPlannerImpl(defaultConfig(jdbcConnection, modifyRuleSetsForZetaSql(ruleSets)));
     setDefaultTimezone(
@@ -82,15 +83,24 @@
             .getZetaSqlDefaultTimezone());
   }
 
-  public static RuleSet[] getZetaSqlRuleSets() {
+  public static final Factory FACTORY =
+      new Factory() {
+        @Override
+        public QueryPlanner createPlanner(
+            JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
+          return new ZetaSQLQueryPlanner(jdbcConnection, ruleSets);
+        }
+      };
+
+  public static Collection<RuleSet> getZetaSqlRuleSets() {
     return modifyRuleSetsForZetaSql(BeamRuleSets.getRuleSets());
   }
 
-  private static RuleSet[] modifyRuleSetsForZetaSql(RuleSet[] ruleSets) {
-    RuleSet[] ret = new RuleSet[ruleSets.length];
-    for (int i = 0; i < ruleSets.length; i++) {
+  private static Collection<RuleSet> modifyRuleSetsForZetaSql(Collection<RuleSet> ruleSets) {
+    ImmutableList.Builder<RuleSet> ret = ImmutableList.builder();
+    for (RuleSet ruleSet : ruleSets) {
       ImmutableList.Builder<RelOptRule> bd = ImmutableList.builder();
-      for (RelOptRule rule : ruleSets[i]) {
+      for (RelOptRule rule : ruleSet) {
         // TODO[BEAM-9075]: Fix join re-ordering for ZetaSQL planner. Currently join re-ordering
         //  requires the JoinCommuteRule, which doesn't work without struct flattening.
         if (rule instanceof JoinCommuteRule) {
@@ -109,9 +119,9 @@
           bd.add(rule);
         }
       }
-      ret[i] = RuleSets.ofList(bd.build());
+      ret.add(RuleSets.ofList(bd.build()));
     }
-    return ret;
+    return ret.build();
   }
 
   public String getDefaultTimezone() {
@@ -177,7 +187,8 @@
     return (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
   }
 
-  private static FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] ruleSets) {
+  private static FrameworkConfig defaultConfig(
+      JdbcConnection connection, Collection<RuleSet> ruleSets) {
     final CalciteConnectionConfig config = connection.config();
     final SqlParser.ConfigBuilder parserConfig =
         SqlParser.configBuilder()
@@ -210,7 +221,7 @@
         .parserConfig(parserConfig.build())
         .defaultSchema(defaultSchema)
         .traitDefs(traitDefs)
-        .ruleSets(ruleSets)
+        .ruleSets(ruleSets.toArray(new RuleSet[0]))
         .costFactory(BeamCostModel.FACTORY)
         .typeSystem(connection.getTypeFactory().getTypeSystem())
         .operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
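
`modifyRuleSetsForZetaSql` above drops the index-based array loop in favor of iterating the input `Collection` and accumulating into an immutable result. A minimal sketch of the same rewrite, using plain JDK collections where the PR uses Beam's vendored Guava `ImmutableList.Builder`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class RewriteRuleSetsSketch {
  static Collection<List<String>> modify(Collection<List<String>> ruleSets) {
    List<List<String>> result = new ArrayList<>();
    for (List<String> ruleSet : ruleSets) {
      List<String> kept = new ArrayList<>();
      for (String rule : ruleSet) {
        // Drop rules the planner cannot handle, keep the rest.
        if (!rule.startsWith("join-commute")) {
          kept.add(rule);
        }
      }
      result.add(Collections.unmodifiableList(kept));
    }
    return Collections.unmodifiableList(result);
  }

  public static void main(String[] args) {
    System.out.println(modify(List.of(List.of("join-commute", "filter-pushdown"))));
  }
}
```
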
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java
index c529425..80ea234 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java
@@ -43,6 +43,7 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.FrameworkConfig;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.Duration;
 import org.junit.BeforeClass;
@@ -186,7 +187,7 @@
             .defaultSchema(defaultSchemaPlus)
             .traitDefs(traitDefs)
             .context(Contexts.of(contexts))
-            .ruleSets(ZetaSQLQueryPlanner.getZetaSqlRuleSets())
+            .ruleSets(ZetaSQLQueryPlanner.getZetaSqlRuleSets().toArray(new RuleSet[0]))
             .costFactory(BeamCostModel.FACTORY)
             .typeSystem(jdbcConnection.getTypeFactory().getTypeSystem())
             .build();
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
index eacb6b8..6d9ba67 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
@@ -30,6 +30,7 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.FrameworkConfig;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 
 /** Common setup for ZetaSQL tests. */
@@ -77,7 +78,7 @@
             .defaultSchema(jdbcConnection.getCurrentSchemaPlus())
             .traitDefs(ImmutableList.of(ConventionTraitDef.INSTANCE))
             .context(Contexts.of(jdbcConnection.config()))
-            .ruleSets(ZetaSQLQueryPlanner.getZetaSqlRuleSets())
+            .ruleSets(ZetaSQLQueryPlanner.getZetaSqlRuleSets().toArray(new RuleSet[0]))
             .costFactory(BeamCostModel.FACTORY)
             .typeSystem(jdbcConnection.getTypeFactory().getTypeSystem())
             .build();
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
index 49252ed..e9e5ace 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
@@ -35,7 +35,7 @@
 public class S3FileSystemRegistrar implements FileSystemRegistrar {
 
   @Override
-  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+  public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
     checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
     return ImmutableList.of(new S3FileSystem(options.as(S3Options.class)));
   }
diff --git a/sdks/java/io/azure/build.gradle b/sdks/java/io/azure/build.gradle
index a2deb94..3860aca 100644
--- a/sdks/java/io/azure/build.gradle
+++ b/sdks/java/io/azure/build.gradle
@@ -19,8 +19,7 @@
 plugins { id 'org.apache.beam.module' }
 applyJavaNature(
   automaticModuleName: 'org.apache.beam.sdk.io.azure',
-  enableChecker: false,
-  ignoreRawtypeErrors: true)
+  enableChecker: false)
 
 dependencies {
   compile project(path: ":sdks:java:core", configuration: "shadow")
diff --git a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.java b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.java
index 14569c6..33a7054 100644
--- a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.java
+++ b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.java
@@ -27,7 +27,7 @@
 
 public class AzureBlobStoreFileSystemRegistrar implements FileSystemRegistrar {
   @Override
-  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+  public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
     checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
     // TODO
     return ImmutableList.of();
diff --git a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceIdTest.java b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceIdTest.java
index abd380c..ee28187 100644
--- a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceIdTest.java
+++ b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceIdTest.java
@@ -56,7 +56,7 @@
     public String expectedResult;
 
     @Parameterized.Parameters
-    public static Collection paths() {
+    public static Collection<Object[]> paths() {
       return Arrays.asList(
           new Object[][] {
             {"azfs://account/container/", "", RESOLVE_DIRECTORY, "azfs://account/container/"},
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 14e1c4e..175044b 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -21,7 +21,6 @@
 plugins { id 'org.apache.beam.module' }
 applyJavaNature(
   enableChecker: false,
-  ignoreRawtypeErrors: true,
   automaticModuleName: 'org.apache.beam.sdk.io.gcp',
   enableSpotbugs: false,
 )
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c62dccb..56ab4e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1024,9 +1024,9 @@
 
   static class BigQueryServerStreamImpl<T> implements BigQueryServerStream<T> {
 
-    private final ServerStream serverStream;
+    private final ServerStream<T> serverStream;
 
-    public BigQueryServerStreamImpl(ServerStream serverStream) {
+    public BigQueryServerStreamImpl(ServerStream<T> serverStream) {
       this.serverStream = serverStream;
     }
 
@@ -1069,7 +1069,7 @@
 
     @Override
     public BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request) {
-      return new BigQueryServerStreamImpl(client.readRowsCallable().call(request));
+      return new BigQueryServerStreamImpl<>(client.readRowsCallable().call(request));
     }
 
     @Override
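
`BigQueryServerStreamImpl` now carries its element type `T` on the field and constructor, so the call site's `new BigQueryServerStreamImpl<>(...)` can infer the parameterization instead of creating a raw instance. A minimal sketch of that wrapper shape, with hypothetical names:

```java
import java.util.Iterator;
import java.util.List;

public class GenericWrapperSketch {
  interface Stream<T> extends Iterable<T> {}

  static class Wrapper<T> implements Iterable<T> {
    private final Stream<T> stream; // was: a raw Stream field

    Wrapper(Stream<T> stream) {
      this.stream = stream;
    }

    @Override
    public Iterator<T> iterator() {
      return stream.iterator();
    }
  }

  public static void main(String[] args) {
    Stream<String> source = () -> List.of("a", "b").iterator();
    // The diamond infers Wrapper<String> from the constructor argument.
    for (String s : new Wrapper<>(source)) {
      System.out.println(s);
    }
  }
}
```
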
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index d460c5d..06bf36f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -82,7 +82,7 @@
    * Stream}.
    */
   public BigQueryStorageStreamSource<T> fromExisting(Stream newStream) {
-    return new BigQueryStorageStreamSource(
+    return new BigQueryStorageStreamSource<>(
         readSession, newStream, jsonTableSchema, parseFn, outputCoder, bqServices);
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index c333c1c..f5774fc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -55,6 +55,7 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Instant;
@@ -362,7 +363,7 @@
   }
 
   private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
-      new ToTableRow(SerializableFunctions.identity());
+      new ToTableRow<>(SerializableFunctions.identity());
 
   /** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
   public static SerializableFunction<Row, TableRow> toTableRow() {
@@ -423,7 +424,7 @@
     return output;
   }
 
-  private static Object fromBeamField(FieldType fieldType, Object fieldValue) {
+  private static @Nullable Object fromBeamField(FieldType fieldType, Object fieldValue) {
     if (fieldValue == null) {
       if (!fieldType.getNullable()) {
         throw new IllegalArgumentException("Field is not nullable.");
@@ -435,8 +436,8 @@
       case ARRAY:
       case ITERABLE:
         FieldType elementType = fieldType.getCollectionElementType();
-        Iterable items = (Iterable) fieldValue;
-        List convertedItems = Lists.newArrayListWithCapacity(Iterables.size(items));
+        Iterable<?> items = (Iterable<?>) fieldValue;
+        List<Object> convertedItems = Lists.newArrayListWithCapacity(Iterables.size(items));
         for (Object item : items) {
           convertedItems.add(fromBeamField(elementType, item));
         }
@@ -670,7 +671,7 @@
       FieldType beamField, Object value, BigQueryUtils.ConversionOptions options) {
     // Check whether the type of array element is equal.
     List<Object> values = (List<Object>) value;
-    List<Object> ret = new ArrayList();
+    List<Object> ret = new ArrayList<>();
     FieldType collectionElement = beamField.getCollectionElementType();
     for (Object v : values) {
       ret.add(convertAvroFormat(collectionElement, v, options));
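
The `BigQueryUtils` hunks above make two related fixes: the conversion helper declares its `@Nullable` return explicitly, and the raw `Iterable`/`List` locals become `Iterable<?>` and `List<Object>`. A minimal sketch, assuming the checker-framework annotations jar (which this PR already imports) is on the classpath:

```java
import java.util.ArrayList;
import java.util.List;
import org.checkerframework.checker.nullness.qual.Nullable;

public class NullableConversionSketch {
  static @Nullable Object convert(@Nullable Object value) {
    if (value == null) {
      return null;
    }
    if (value instanceof Iterable) {
      Iterable<?> items = (Iterable<?>) value; // was: raw Iterable
      List<Object> converted = new ArrayList<>(); // was: raw List
      for (Object item : items) {
        converted.add(convert(item));
      }
      return converted;
    }
    return value.toString();
  }

  public static void main(String[] args) {
    System.out.println(convert(List.of(1, List.of(2, 3))));
  }
}
```
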
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 6f382fe..b56f324 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -35,8 +35,8 @@
     this.elementCoder = elementCoder;
   }
 
-  public static <ElementT> TableRowInfoCoder of(Coder<ElementT> elementCoder) {
-    return new TableRowInfoCoder(elementCoder);
+  public static <ElementT> TableRowInfoCoder<ElementT> of(Coder<ElementT> elementCoder) {
+    return new TableRowInfoCoder<>(elementCoder);
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index 46f86af..fbc95c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -33,7 +33,7 @@
  */
 @VisibleForTesting
 class TagWithUniqueIds<ElementT>
-    extends DoFn<KV<ShardedKey<String>, ElementT>, KV<ShardedKey<String>, TableRowInfo>> {
+    extends DoFn<KV<ShardedKey<String>, ElementT>, KV<ShardedKey<String>, TableRowInfo<ElementT>>> {
   private transient String randomUUID;
   private transient long sequenceNo = 0L;
 
@@ -50,6 +50,7 @@
     // BigQuery.
     context.output(
         KV.of(
-            context.element().getKey(), new TableRowInfo(context.element().getValue(), uniqueId)));
+            context.element().getKey(),
+            new TableRowInfo<>(context.element().getValue(), uniqueId)));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 0dd6f8a..b214838 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -379,7 +379,9 @@
             .setUseAvroLogicalTypes(useAvroLogicalTypes);
     if (schemaUpdateOptions != null) {
       List<String> options =
-          schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
+          schemaUpdateOptions.stream()
+              .map(Enum<SchemaUpdateOption>::name)
+              .collect(Collectors.toList());
       loadConfig.setSchemaUpdateOptions(options);
     }
     if (timePartitioning != null) {
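
The method-reference fix above replaces the raw `Enum::name` with `Enum<SchemaUpdateOption>::name`; naming the concrete enum type directly, as the test hunk later in this PR does with `SchemaUpdateOption::name`, works equally well. A minimal sketch showing both forms are equivalent, with a hypothetical enum:

```java
import java.util.List;
import java.util.stream.Collectors;

public class EnumNameRefSketch {
  enum Color {
    RED,
    GREEN
  }

  public static void main(String[] args) {
    List<Color> colors = List.of(Color.RED, Color.GREEN);
    // Spelling the receiver's type parameter avoids the raw Enum reference.
    List<String> viaParameterizedEnum =
        colors.stream().map(Enum<Color>::name).collect(Collectors.toList());
    // Naming the concrete enum type is the other raw-type-free option.
    List<String> viaConcreteType =
        colors.stream().map(Color::name).collect(Collectors.toList());
    System.out.println(viaParameterizedEnum.equals(viaConcreteType)); // true
  }
}
```
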
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareIOErrorCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareIOErrorCoder.java
index da139ee..6ad7ae0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareIOErrorCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareIOErrorCoder.java
@@ -39,7 +39,7 @@
     this.originalCoder = NullableCoder.of(originalCoder);
   }
 
-  public static <T> HealthcareIOErrorCoder of(Coder<T> originalCoder) {
+  public static <T> HealthcareIOErrorCoder<T> of(Coder<T> originalCoder) {
     return new HealthcareIOErrorCoder<>(originalCoder);
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
index 926e79b..93612ae 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
@@ -44,7 +44,7 @@
   public static final String URN = "beam:external:java:pubsub:read:v1";
 
   @Override
-  public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
     return ImmutableMap.of(URN, ReadBuilder.class);
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 7ab956c..d4df6f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -42,7 +42,7 @@
   public static final String URN = "beam:external:java:pubsub:write:v1";
 
   @Override
-  public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
     return ImmutableMap.of(URN, WriteBuilder.class);
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index 5e05c05..7dcc44b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -27,6 +27,8 @@
 import com.google.cloud.spanner.DatabaseId;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerOptions;
+import com.google.spanner.v1.CommitRequest;
+import com.google.spanner.v1.CommitResponse;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -105,7 +107,7 @@
     if (commitDeadline != null && commitDeadline.get().getMillis() > 0) {
 
       // Set the GRPC deadline on the Commit API call.
-      UnaryCallSettings.Builder commitSettings =
+      UnaryCallSettings.Builder<CommitRequest, CommitResponse> commitSettings =
           builder.getSpannerStubSettingsBuilder().commitSettings();
       RetrySettings.Builder commitRetrySettings = commitSettings.getRetrySettings().toBuilder();
       commitSettings.setRetrySettings(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 6a2658c..0554b74 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -801,7 +801,7 @@
 
     abstract FailureMode getFailureMode();
 
-    abstract @Nullable PCollection getSchemaReadySignal();
+    abstract @Nullable PCollection<?> getSchemaReadySignal();
 
     abstract OptionalInt getGroupingFactor();
 
@@ -820,7 +820,7 @@
 
       abstract Builder setFailureMode(FailureMode failureMode);
 
-      abstract Builder setSchemaReadySignal(PCollection schemaReadySignal);
+      abstract Builder setSchemaReadySignal(PCollection<?> schemaReadySignal);
 
       abstract Builder setGroupingFactor(int groupingFactor);
 
@@ -947,7 +947,7 @@
      *
      * @see Wait.OnSignal
      */
-    public Write withSchemaReadySignal(PCollection signal) {
+    public Write withSchemaReadySignal(PCollection<?> signal) {
       return toBuilder().setSchemaReadySignal(signal).build();
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
index 37d8cbf..412864d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
@@ -120,7 +120,7 @@
 
   // Longs tend to get converted back to Integers due to JSON serialization. Convert them back.
   public static TableRow convertNumbers(TableRow tableRow) {
-    for (TableRow.Entry entry : tableRow.entrySet()) {
+    for (TableRow.Entry<?, Object> entry : tableRow.entrySet()) {
       if (entry.getValue() instanceof Integer) {
         entry.setValue(Long.valueOf((Integer) entry.getValue()));
       }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 06bb0a1..6970ef8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -138,7 +138,7 @@
   }
 
   private void checkTypedReadQueryObject(
-      BigQueryIO.TypedRead read, String query, String kmsKey, String tempDataset) {
+      BigQueryIO.TypedRead<?> read, String query, String kmsKey, String tempDataset) {
     checkTypedReadQueryObjectWithValidate(read, query, kmsKey, tempDataset, true);
   }
 
@@ -159,7 +159,7 @@
   }
 
   private void checkTypedReadQueryObjectWithValidate(
-      BigQueryIO.TypedRead read,
+      BigQueryIO.TypedRead<?> read,
       String query,
       String kmsKey,
       String tempDataset,
@@ -226,7 +226,7 @@
 
   @Test
   public void testBuildQueryBasedTypedReadSource() {
-    BigQueryIO.TypedRead read =
+    BigQueryIO.TypedRead<?> read =
         BigQueryIO.readTableRows()
             .fromQuery("foo_query")
             .withKmsKey("kms_key")
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 1b55843..5c9bea5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -226,7 +226,7 @@
         .withMethod(Method.DIRECT_READ);
   }
 
-  private void checkTypedReadQueryObject(TypedRead typedRead, String query) {
+  private void checkTypedReadQueryObject(TypedRead<?> typedRead, String query) {
     assertNull(typedRead.getTable());
     assertEquals(query, typedRead.getQuery().get());
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 118d994..00ffba2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -213,7 +213,7 @@
   }
 
   private void checkTypedReadTableObject(
-      TypedRead typedRead, String project, String dataset, String table) {
+      TypedRead<?> typedRead, String project, String dataset, String table) {
     assertEquals(project, typedRead.getTable().getProjectId());
     assertEquals(dataset, typedRead.getTable().getDatasetId());
     assertEquals(table, typedRead.getTable().getTableId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index ebe706c..37999aa 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1641,7 +1641,7 @@
                   .toString();
           TableRowWriter<TableRow> writer =
               new TableRowWriter<>(filename, SerializableFunctions.identity());
-          try (TableRowWriter ignored = writer) {
+          try (TableRowWriter<TableRow> ignored = writer) {
             TableRow tableRow = new TableRow().set("name", tableName);
             writer.write(tableRow);
           }
@@ -1992,7 +1992,7 @@
     p.run();
 
     List<String> expectedOptions =
-        schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
+        schemaUpdateOptions.stream().map(SchemaUpdateOption::name).collect(Collectors.toList());
 
     for (Job job : fakeJobService.getAllJobs()) {
       JobConfigurationLoad configuration = job.getConfiguration().getLoad();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index c159148..e08a24b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -193,7 +193,7 @@
 
     RunnerApi.ParDoPayload parDoPayload =
         RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload());
-    DoFn pubsubWriter = ParDoTranslation.getDoFn(parDoPayload);
+    DoFn<?, ?> pubsubWriter = ParDoTranslation.getDoFn(parDoPayload);
 
     String idAttributeActual = (String) Whitebox.getInternalState(pubsubWriter, "idAttribute");
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 03fd170..b0eb872 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -62,6 +62,7 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
@@ -743,7 +744,8 @@
     BatchableMutationFilterFn testFn =
         new BatchableMutationFilterFn(null, null, 10000000, 3 * CELLS_PER_KEY, 1000);
 
-    ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
+    BatchableMutationFilterFn.ProcessContext mockProcessContext =
+        Mockito.mock(ProcessContext.class);
     when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
 
     // Capture the outputs.
@@ -797,7 +799,8 @@
     BatchableMutationFilterFn testFn =
         new BatchableMutationFilterFn(null, null, mutationSize * 3, 1000, 1000);
 
-    ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
+    DoFn<MutationGroup, MutationGroup>.ProcessContext mockProcessContext =
+        Mockito.mock(ProcessContext.class);
     when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
 
     // Capture the outputs.
@@ -850,7 +853,8 @@
     long mutationSize = MutationSizeEstimator.sizeOf(m(1L));
     BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 1000, 1000, 3);
 
-    ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
+    BatchableMutationFilterFn.ProcessContext mockProcessContext =
+        Mockito.mock(ProcessContext.class);
     when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
 
     // Capture the outputs.
@@ -888,7 +892,8 @@
 
     BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 0, 0, 0);
 
-    ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
+    BatchableMutationFilterFn.ProcessContext mockProcessContext =
+        Mockito.mock(ProcessContext.class);
     when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
 
     // Capture the outputs.
@@ -921,8 +926,10 @@
             100, // groupingFactor
             null);
 
-    ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
-    FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class);
+    GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+        Mockito.mock(ProcessContext.class);
+    GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+        Mockito.mock(FinishBundleContext.class);
     when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
 
     // Capture the outputs.
@@ -982,8 +989,10 @@
             3, // groupingFactor
             null);
 
-    ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
-    FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class);
+    GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+        Mockito.mock(ProcessContext.class);
+    GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+        Mockito.mock(FinishBundleContext.class);
     when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
     OutputReceiver<Iterable<MutationGroup>> mockOutputReceiver = mock(OutputReceiver.class);
 
@@ -1090,8 +1099,10 @@
   }
 
   private void testAndVerifyBatches(GatherSortCreateBatchesFn testFn) throws Exception {
-    ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
-    FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class);
+    GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+        Mockito.mock(ProcessContext.class);
+    GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+        Mockito.mock(FinishBundleContext.class);
     when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
 
     // Capture the output at finish bundle.
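
The Spanner test hunks above replace raw `ProcessContext`/`FinishBundleContext` declarations with qualified forms: either through the concrete subclass (`BatchableMutationFilterFn.ProcessContext`) or with the outer class's type arguments spelled out (`DoFn<MutationGroup, MutationGroup>.ProcessContext`). A minimal sketch of that second, less familiar syntax, with hypothetical `Outer`/`Inner` names:

```java
public class QualifiedInnerTypeSketch {
  static class Outer<InputT, OutputT> {
    class Inner {
      String describe() {
        return "inner of " + Outer.this.getClass().getSimpleName();
      }
    }
  }

  public static void main(String[] args) {
    Outer<String, Long> outer = new Outer<>();
    // The inner class of a generic outer class is written Outer<A, B>.Inner,
    // never a raw Outer.Inner.
    Outer<String, Long>.Inner inner = outer.new Inner();
    System.out.println(inner.describe());
  }
}
```
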
diff --git a/sdks/java/io/hadoop-file-system/build.gradle b/sdks/java/io/hadoop-file-system/build.gradle
index bb3c05b..f83f318 100644
--- a/sdks/java/io/hadoop-file-system/build.gradle
+++ b/sdks/java/io/hadoop-file-system/build.gradle
@@ -19,7 +19,6 @@
 plugins { id 'org.apache.beam.module' }
 applyJavaNature(
   enableChecker:false,
-  ignoreRawtypeErrors:true,
   automaticModuleName: 'org.apache.beam.sdk.io.hdfs')
 
 description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop File System"
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
index 8368aff..bdfc94e 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -50,7 +50,7 @@
   private static final String CONFIG_KEY_DFS_NAMESERVICES = "dfs.nameservices";
 
   @Override
-  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+  public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
     final List<Configuration> configurations =
         options.as(HadoopFileSystemOptions.class).getHdfsConfiguration();
     if (configurations == null) {
@@ -63,7 +63,7 @@
             "The %s currently only supports at most a single Hadoop configuration.",
             HadoopFileSystemRegistrar.class.getSimpleName()));
 
-    final ImmutableList.Builder<FileSystem> builder = ImmutableList.builder();
+    final ImmutableList.Builder<FileSystem<?>> builder = ImmutableList.builder();
     final Set<String> registeredSchemes = new HashSet<>();
 
     // this will only do zero or one loop
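
FileSystem is generic in its resource-id type, so returning Iterable<FileSystem<?>> (and collecting into ImmutableList.Builder<FileSystem<?>>) keeps the registrar's heterogeneous collection fully typed instead of raw. A minimal, self-contained analogy; Box stands in for FileSystem and is not Beam API:

    import java.util.ArrayList;
    import java.util.List;

    public class WildcardElementSketch {
      // Box stands in for the generic FileSystem class; each instance may be
      // parameterized differently, just as each registered file system is.
      static class Box<T> {
        final T value;
        Box(T value) { this.value = value; }
      }

      static Iterable<Box<?>> boxes() {
        // Mirrors ImmutableList.Builder<FileSystem<?>> above: the wildcard
        // element type admits differently-parameterized instances while each
        // element stays a checked Box<?> rather than a raw Box.
        List<Box<?>> out = new ArrayList<>();
        out.add(new Box<>("hdfs")); // Box<String>
        out.add(new Box<>(8020));   // Box<Integer>
        return out;
      }

      public static void main(String[] args) {
        for (Box<?> b : boxes()) {
          System.out.println(b.value);
        }
      }
    }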
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
index 23e5e06..d1fc7c4 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
@@ -59,8 +59,8 @@
     assertThat(
         deserializedConfiguration,
         Matchers.<Map.Entry<String, String>>containsInAnyOrder(
-            new AbstractMap.SimpleEntry("testPropertyA", "A"),
-            new AbstractMap.SimpleEntry("testPropertyB", "B"),
-            new AbstractMap.SimpleEntry("testPropertyC", "baseC")));
+            new AbstractMap.SimpleEntry<>("testPropertyA", "A"),
+            new AbstractMap.SimpleEntry<>("testPropertyB", "B"),
+            new AbstractMap.SimpleEntry<>("testPropertyC", "baseC")));
   }
 }
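
Here and in HadoopFileSystemOptionsTest below, the fix is the diamond operator: new AbstractMap.SimpleEntry<>(...) lets the compiler infer <String, String> from the constructor arguments, where the raw constructor produced an entry typed over Object plus a rawtypes warning. A self-contained sketch:

    import java.util.AbstractMap;
    import java.util.Map;

    public class DiamondSketch {
      public static void main(String[] args) {
        // Raw (the old code): compiles, but only with a rawtypes warning, and
        // the entry's key and value are effectively Objects.
        //   Map.Entry rawEntry = new AbstractMap.SimpleEntry("testPropertyA", "A");
        // Diamond (the new code): <String, String> is inferred.
        Map.Entry<String, String> entry =
            new AbstractMap.SimpleEntry<>("testPropertyA", "A");
        System.out.println(entry.getKey() + "=" + entry.getValue());
      }
    }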
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
index 8f3fa3d..f2f289f 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -57,11 +57,11 @@
     assertThat(
         options.getHdfsConfiguration().get(0),
         Matchers.<Map.Entry<String, String>>contains(
-            new AbstractMap.SimpleEntry("propertyA", "A")));
+            new AbstractMap.SimpleEntry<>("propertyA", "A")));
     assertThat(
         options.getHdfsConfiguration().get(1),
         Matchers.<Map.Entry<String, String>>contains(
-            new AbstractMap.SimpleEntry("propertyB", "B")));
+            new AbstractMap.SimpleEntry<>("propertyB", "B")));
   }
 
   @Test
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
index 52e67c7..f8aaeb6 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
@@ -68,7 +68,7 @@
     for (FileSystemRegistrar registrar :
         Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
       if (registrar instanceof HadoopFileSystemRegistrar) {
-        Iterable<FileSystem> fileSystems = registrar.fromOptions(options);
+        Iterable<FileSystem<?>> fileSystems = registrar.fromOptions(options);
         assertEquals(
             hdfsClusterBaseUri.getScheme(),
             ((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme());
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 892074d..c2ef750 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -21,6 +21,7 @@
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.Serializable;
 import java.sql.Connection;
@@ -559,6 +560,8 @@
       return rows;
     }
 
+    // Spotbugs does not seem to understand the multi-statement try-with-resources
+    @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION")
     private Schema inferBeamSchema() {
       DataSource ds = getDataSourceProviderFn().apply(null);
       try (Connection conn = ds.getConnection();
@@ -889,6 +892,8 @@
     }
 
     @ProcessElement
+    // Spotbugs does not seem to understand the nested try-with-resources
+    @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION")
     public void processElement(ProcessContext context) throws Exception {
       // Only acquire the connection if we need to perform a read.
       if (connection == null) {
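
OBL_UNSATISFIED_OBLIGATION is SpotBugs' resource-leak detector; as the new comments say, it can misfire on try-with-resources statements that manage several resources at once, and the method-level @SuppressFBWarnings silences the false positive without disabling the detector globally. A sketch of the shape being suppressed; countRows and its query are illustrative, not JdbcIO code:

    import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import javax.sql.DataSource;

    public class ObligationSketch {
      // All three resources are closed by try-with-resources, but SpotBugs may
      // still report the method; the annotation documents the false positive.
      @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION")
      static int countRows(DataSource ds, String table) throws SQLException {
        // The table name is assumed trusted; this illustrates the resource
        // handling, not a pattern for untrusted input.
        try (Connection conn = ds.getConnection();
            PreparedStatement stmt =
                conn.prepareStatement("SELECT COUNT(*) FROM " + table);
            ResultSet rs = stmt.executeQuery()) {
          rs.next();
          return rs.getInt(1);
        }
      }
    }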
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java
index cc623a2..72c8b05 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java
@@ -38,7 +38,7 @@
   public static final String URN = "beam:external:java:jdbc:read_rows:v1";
 
   @Override
-  public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
     return ImmutableMap.of(URN, JdbcReadRowsRegistrar.Builder.class);
   }
 
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java
index 46e363d..39997e0 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java
@@ -38,7 +38,7 @@
   public static final String URN = "beam:external:java:jdbc:write:v1";
 
   @Override
-  public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
     return ImmutableMap.of(URN, JdbcWriteRegistrar.Builder.class);
   }
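
These two registrars (and TestExpansionService at the end of this patch) get the same signature change: knownBuilders() now maps URNs to Class<? extends ExternalTransformBuilder<?, ?, ?>>. Because JdbcReadRowsRegistrar.Builder and JdbcWriteRegistrar.Builder are non-generic classes, their class literals satisfy the wildcard bound directly, with no cast. A simplified sketch; the interface here is a three-parameter stand-in for the real ExternalTransformBuilder, and java.util.Map.of stands in for the vendored ImmutableMap.of:

    import java.util.Map;

    public class KnownBuildersSketch {
      interface ExternalTransformBuilder<ConfigT, InputT, OutputT> {}

      // Non-generic implementation: Builder.class has type Class<Builder>,
      // which already is a Class<? extends ExternalTransformBuilder<?, ?, ?>>
      // because Builder is a subtype of ExternalTransformBuilder<String, String, String>.
      static class Builder implements ExternalTransformBuilder<String, String, String> {}

      static Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
        return Map.of("beam:external:java:jdbc:write:v1", Builder.class);
      }
    }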
 
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
index ad8e0ae..11434c3 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.jms;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
@@ -32,8 +33,17 @@
   private final @Nullable String jmsMessageID;
   private final long jmsTimestamp;
   private final String jmsCorrelationID;
+
+  // JMS ReplyTo destination is serializable according to the JMS spec even if it doesn't implement
+  // Serializable.
+  @SuppressFBWarnings("SE_BAD_FIELD")
   private final @Nullable Destination jmsReplyTo;
+
+  // JMS destination is serializable according to the JMS spec even if it doesn't implement
+  // Serializable.
+  @SuppressFBWarnings("SE_BAD_FIELD")
   private final Destination jmsDestination;
+
   private final int jmsDeliveryMode;
   private final boolean jmsRedelivered;
   private final String jmsType;
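
SE_BAD_FIELD fires on any non-transient field of a Serializable class whose declared type is not itself Serializable; the field-level annotation records why each finding is a false positive (the JMS contract for Destination, per the new comments) rather than suppressing the whole class. A minimal sketch of the pattern, with Endpoint as a stand-in for javax.jms.Destination:

    import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
    import java.io.Serializable;

    public class RecordSketch implements Serializable {
      // Does not extend Serializable, like javax.jms.Destination.
      interface Endpoint {}

      // Without the annotation, SpotBugs reports SE_BAD_FIELD: a non-transient,
      // non-Serializable field in a Serializable class. The suppression asserts
      // that concrete Endpoint implementations are serializable by contract.
      @SuppressFBWarnings("SE_BAD_FIELD")
      private final Endpoint replyTo;

      RecordSketch(Endpoint replyTo) {
        this.replyTo = replyTo;
      }
    }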
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b7f2c09..13aabc8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -492,8 +492,11 @@
       public static final String URN = "beam:external:java:kafka:read:v1";
 
       @Override
-      public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
-        return ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);
+      public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+        return ImmutableMap.of(
+            URN,
+            (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+                (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
       }
 
       /** Parameters class to expose the Read transform to an external SDK. */
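
Unlike the Jdbc registrars, AutoValue_KafkaIO_Read.Builder is itself generic, so its class literal has the raw type Class<Builder>, which does not convert cleanly to the wildcard-parameterized target; the idiom above widens to Class<?> first so that a single unchecked narrowing does the job. A simplified sketch of the same shape; the names are stand-ins, and the @SuppressWarnings is added here only to keep the sketch warning-free:

    import java.util.Map;

    public class DoubleCastSketch {
      interface ExternalTransformBuilder<ConfigT, InputT, OutputT> {}

      // Generic implementation, like AutoValue_KafkaIO_Read.Builder<K, V>: its
      // class literal is the raw Class<Builder>.
      static class Builder<K, V> implements ExternalTransformBuilder<K, K, V> {}

      @SuppressWarnings("unchecked")
      static Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
        return Map.of(
            "beam:external:java:kafka:read:v1",
            // Widen to Class<?>, then make one unchecked cast to the wildcard
            // target; a direct cast from the raw literal is not clean.
            (Class<? extends ExternalTransformBuilder<?, ?, ?>>) (Class<?>) Builder.class);
      }
    }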
@@ -1324,9 +1327,10 @@
         }
         throw new UnsupportedOperationException(
             runner
-                + " is not a runner known to be compatible with Kafka exactly-once sink. "
-                + "This implementation of exactly-once sink relies on specific checkpoint guarantees. "
-                + "Only the runners with known to have compatible checkpoint semantics are allowed.");
+                + " is not a runner known to be compatible with the Kafka exactly-once sink. This"
+                + " implementation of the exactly-once sink relies on specific checkpoint"
+                + " guarantees. Only runners known to have compatible checkpoint semantics are"
+                + " allowed.");
       }
     }
 
@@ -1417,8 +1421,10 @@
       public static final String URN = "beam:external:java:kafka:write:v1";
 
       @Override
-      public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
-        return ImmutableMap.of(URN, AutoValue_KafkaIO_Write.Builder.class);
+      public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+        return ImmutableMap.of(
+            URN,
+            (Class<KafkaIO.Write.Builder<?, ?>>) (Class<?>) AutoValue_KafkaIO_Write.Builder.class);
       }
 
       /** Parameters class to expose the Write transform to an external SDK. */
diff --git a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
index 2f9b26d..75fb73b 100644
--- a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
+++ b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
@@ -164,8 +164,8 @@
     private static Schema schema = new Schema.Parser().parse(rawSchema);
 
     @Override
-    public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
-      ImmutableMap.Builder<String, Class<? extends ExternalTransformBuilder>> builder =
+    public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+      ImmutableMap.Builder<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> builder =
           ImmutableMap.builder();
       builder.put(TEST_PREFIX_URN, PrefixBuilder.class);
       builder.put(TEST_MULTI_URN, MultiBuilder.class);