Merge pull request #12376: [BEAM-10575] Eliminate legacy rawtypes from GCP IOs and some others; enable -Wrawtypes -Werror
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 44d966b..3393e6d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -169,7 +169,7 @@
TableRow row = new TableRow();
row.set("string", "abc");
byte[] rawbytes = {(byte) 0xab, (byte) 0xac};
- row.set("bytes", new String(Base64.getEncoder().encodeToString(rawbytes)));
+ row.set("bytes", Base64.getEncoder().encodeToString(rawbytes));
row.set("integer", 5);
row.set("float", 0.5);
row.set("numeric", 5);
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 7795db4..2775c8b 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -146,7 +146,7 @@
List<CompletionCandidate> all = new ArrayList<>();
for (String s : entries) {
String[] countValue = s.split(":", -1);
- all.add(new CompletionCandidate(countValue[0], Integer.valueOf(countValue[1])));
+ all.add(new CompletionCandidate(countValue[0], Integer.parseInt(countValue[1])));
}
return all;
}
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/game/StatefulTeamScoreTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/game/StatefulTeamScoreTest.java
index a4bae4e..6d4c964 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/game/StatefulTeamScoreTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/game/StatefulTeamScoreTest.java
@@ -159,7 +159,6 @@
.apply(ParDo.of(new UpdateTeamScoreFn(100)));
String redTeam = TestUser.RED_ONE.getTeam();
- String blueTeam = TestUser.BLUE_ONE.getTeam();
IntervalWindow window1 = new IntervalWindow(baseTime, teamWindowDuration);
IntervalWindow window2 = new IntervalWindow(window1.end(), teamWindowDuration);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 8bf99bf..7c5c80f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -21,10 +21,11 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
/** An wrapper interface that represents the execution of a {@link DoFn}. */
-public interface DoFnRunner<InputT, OutputT> {
+public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nullable Object> {
/** Prepares and calls a {@link DoFn DoFn's} {@link DoFn.StartBundle @StartBundle} method. */
void startBundle();
@@ -38,7 +39,7 @@
* Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer in the
* given window.
*/
- <KeyT> void onTimer(
+ <KeyT extends @Nullable Object> void onTimer(
String timerId,
String timerFamilyId,
KeyT key,
@@ -57,7 +58,8 @@
* Calls a {@link DoFn DoFn's} {@link DoFn.OnWindowExpiration @OnWindowExpiration} method and
* performs additional task, such as extracts a value saved in a state before garbage collection.
*/
- <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, KeyT key);
+ <KeyT extends @Nullable Object> void onWindowExpiration(
+ BoundedWindow window, Instant timestamp, KeyT key);
/**
* @since 2.5.0
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 1537ad5..6c216f5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -19,6 +19,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
@@ -70,6 +71,8 @@
return new ReadableState<PaneInfo>() {
@Override
+ @SuppressFBWarnings(
+ "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
public ReadableState<PaneInfo> readLater() {
previousPaneFuture.readLater();
return this;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index d555da6..f667bd4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.core;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -46,6 +47,7 @@
* Prefetch all bag state in {@code address} across all windows under merge in {@code context},
* except for the bag state in the final state address window which we can blindly append to.
*/
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
public static <K, T, W extends BoundedWindow> void prefetchBags(
MergingStateAccessor<K, W> context, StateTag<BagState<T>> address) {
Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index a4d51ba..89e8a63 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.core;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
@@ -116,6 +117,7 @@
}
@Override
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
public void prefetchOnTrigger(StateAccessor<K> state) {
state.access(bufferTag).readLater();
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 948b281..1498e77 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -19,6 +19,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
@@ -69,7 +70,10 @@
StateTags.makeSystemTagInternal(
StateTags.watermarkStateInternal("extra", TimestampCombiner.EARLIEST));
+ // [BEAM-420] Seems likely these should all be transient or this class should not be Serializable
+ @SuppressFBWarnings("SE_BAD_FIELD")
private final TimerInternals timerInternals;
+
private final WindowingStrategy<?, W> windowingStrategy;
private final StateTag<WatermarkHoldState> elementHoldTag;
@@ -270,6 +274,7 @@
}
/** Prefetch watermark holds in preparation for merging. */
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
public void prefetchOnMerge(MergingStateAccessor<?, W> context) {
Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(elementHoldTag);
WatermarkHoldState result = context.access(elementHoldTag);
@@ -298,6 +303,7 @@
* all of the existing holds. For example, if the new window implies a later watermark hold, then
* earlier holds may be released.
*/
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") // just prefetch calls to readLater
public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
WindowTracing.debug(
"WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; " + "outputWatermark:{}",
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 1045813..b75f031 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -385,21 +386,33 @@
private final BlockingQueue<VisibleExecutorUpdate> updates = new LinkedBlockingQueue<>();
@Override
+ // updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered
+ // update
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
public void failed(Exception e) {
updates.offer(VisibleExecutorUpdate.fromException(e));
}
@Override
+ // updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered
+ // update
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
public void failed(Error e) {
updates.offer(VisibleExecutorUpdate.fromError(e));
}
@Override
+ // updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered
+ // update
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
public void cancelled() {
updates.offer(VisibleExecutorUpdate.cancelled());
}
@Override
+ // updates is a non-capacity-limited LinkedBlockingQueue, which can never refuse an offered
+ // update
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
public void completed() {
updates.offer(VisibleExecutorUpdate.finished());
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index c0c05e4..e3ff795 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,6 +19,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -125,7 +126,10 @@
static class GbkThenStatefulParDo<K, InputT, OutputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
+ @SuppressFBWarnings(
+ "SE_TRANSIENT_FIELD_NOT_RESTORED") // PTransforms do not actually support serialization.
private final transient DoFn<KV<K, InputT>, OutputT> doFn;
+
private final TupleTagList additionalOutputTags;
private final TupleTag<OutputT> mainOutputTag;
private final List<PCollectionView<?>> sideInputs;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
index 83a707d..d6262e7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.metrics;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
@@ -40,6 +41,8 @@
private PrintStream ps;
@Override
+ @SuppressFBWarnings(
+ "DM_DEFAULT_ENCODING") // should this method specify the encoding for the PrintStream?
public void open(MetricConfig config) {
synchronized (this) {
if (path == null) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
index 41c9824..57494d6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -22,6 +22,7 @@
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
@@ -142,6 +143,7 @@
* Return {@code true} if the job succeeded or {@code false} if it terminated in any other manner.
*/
@SuppressWarnings("FutureReturnValueIgnored") // Job status checked via job.waitUntilFinish
+ @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
private boolean waitForStreamingJobTermination(
final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
// In streaming, there are infinite retries, so rather than timeout
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
index 9b9ced3..21732e6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/OutputReference.java
@@ -21,6 +21,7 @@
import com.google.api.client.json.GenericJson;
import com.google.api.client.util.Key;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Objects;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -30,6 +31,7 @@
*/
public final class OutputReference extends GenericJson {
@Key("@type")
+ @SuppressFBWarnings("SS_SHOULD_BE_STATIC") // read via reflection, so it must stay an instance field
public final String type = "OutputReference";
@Key("step_name")
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index 327234c..6d80563 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -22,7 +22,6 @@
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.JsonGenerator;
-import com.google.api.client.util.Charsets;
import com.google.api.services.dataflow.model.InstructionOutput;
import com.google.api.services.dataflow.model.ParallelInstruction;
import com.google.api.services.dataflow.model.SideInputInfo;
@@ -42,6 +41,7 @@
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
/** Container class for different types of network nodes. All nodes only have reference equality. */
@@ -169,7 +169,7 @@
generator.enablePrettyPrint();
generator.serialize(json);
generator.flush();
- return byteStream.toString();
+ return byteStream.toString(Charsets.UTF_8.name());
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
index e4de8cb9..08a995d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
@@ -37,6 +38,7 @@
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
/**
* A {@link PrintStream} factory that creates {@link PrintStream}s which output to the specified JUL
@@ -66,14 +68,17 @@
private int carryOverBytes;
private byte[] carryOverByteArray;
- private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+ private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel)
+ throws UnsupportedEncodingException {
super(
new OutputStream() {
@Override
public void write(int i) throws IOException {
throw new RuntimeException("All methods should be overwritten so this is unused");
}
- });
+ },
+ false,
+ Charsets.UTF_8.name());
this.handler = handler;
this.loggerName = loggerName;
this.messageLevel = logLevel;
@@ -401,7 +406,11 @@
* specified {@code loggerName} and {@code level}.
*/
static PrintStream create(Handler handler, String loggerName, Level messageLevel) {
- return new JulHandlerPrintStream(handler, loggerName, messageLevel);
+ try {
+ return new JulHandlerPrintStream(handler, loggerName, messageLevel);
+ } catch (UnsupportedEncodingException exc) {
+ throw new RuntimeException("Encoding not supported: " + Charsets.UTF_8.name(), exc);
+ }
}
@VisibleForTesting
diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java
index 71397dc..ef05075 100644
--- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java
+++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/PortablePipelineJarCreator.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.jobsubmission;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -90,7 +92,7 @@
.as(PortablePipelineOptions.class);
final String jobName = jobInfo.jobName();
- File outputFile = new File(pipelineOptions.getOutputExecutablePath());
+ File outputFile = new File(checkArgumentNotNull(pipelineOptions.getOutputExecutablePath()));
LOG.info("Creating jar {} for job {}", outputFile.getAbsolutePath(), jobName);
outputStream =
new JarOutputStream(new FileOutputStream(outputFile), createManifest(mainClass, jobName));
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
index 16a1298..d87cbd2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/CsvSink.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.spark.metrics.sink;
import com.codahale.metrics.MetricRegistry;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Properties;
import org.apache.beam.runners.spark.metrics.AggregatorMetric;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
@@ -26,6 +27,8 @@
/**
* A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to a CSV file.
*/
+// Intentionally overriding parent name because inheritors should replace the parent.
+@SuppressFBWarnings("NM_SAME_SIMPLE_NAME_AS_SUPERCLASS")
public class CsvSink extends org.apache.spark.metrics.sink.CsvSink {
public CsvSink(
final Properties properties,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
index c25e8cd..eca1b2b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java
@@ -18,12 +18,15 @@
package org.apache.beam.runners.spark.metrics.sink;
import com.codahale.metrics.MetricRegistry;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Properties;
import org.apache.beam.runners.spark.metrics.AggregatorMetric;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
import org.apache.spark.metrics.sink.Sink;
/** A Spark {@link Sink} that is tailored to report {@link AggregatorMetric} metrics to Graphite. */
+// Intentionally overriding parent name because inheritors should replace the parent.
+@SuppressFBWarnings("NM_SAME_SIMPLE_NAME_AS_SUPERCLASS")
public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink {
public GraphiteSink(
final Properties properties,
diff --git a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
index 7811398..939b58c 100644
--- a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
@@ -16,418 +16,60 @@
limitations under the License.
-->
<FindBugsFilter>
- <!-- Ignored bug categories. Bugs in these categories should not block the build. -->
- <Bug category="I18N"/>
- <Bug pattern="DM_STRING_CTOR"/>
- <Bug pattern="EI_EXPOSE_REP" />
- <Bug pattern="EI_EXPOSE_REP2" />
+ <!--
+ Beam UDFs are intended to be serialized for transmission only. There is no expectation that they can
+ be stored and read by a different version of the code.
+ -->
<Bug pattern="SE_NO_SERIALVERSIONID"/>
- <!-- The uncallable method error fails on @ProcessElement style methods -->
+ <!--
+ The preferred way to find nullness errors is {@code enableChecker: true}. If a module has
+ many Spotbugs nullness errors, it is not worthwhile to fix them. But Spotbugs is useful for
+ other coding issues. Instead of disabling spotbugs, turn on this flag.
+
+ More context: The migration from Findbugs to Spotbugs caused a loss of coverage for nullness. Then the
+ upgrade from Spotbugs 3.1.12 to 4.0.6 found hundreds of new nullness errors spread throughout all modules.
+ It is not worthwhile to fix them; instead they are disabled, and the modules should eventually be
+ fixed to pass the Checker Framework nullness type system.
+ -->
+ <Bug code="NP,RCN" />
+
+ <!--
+ When arrays are passed in and out of objects, spotbugs warns. Yet most objects are mutable in some way in
+ Java and carry the same risk. These warning codes produce a large number of unhelpful errors having to do with
+ passing byte[] arrays around, since Beam deals in serialized data. Trusting callers and callees not to mutate
+ things inappropriately is inherent to Java.
+ -->
+ <Bug code="EI,EI2" />
+
+ <!--
+ Beam DoFns are invoked via reflection by looking at the annotations. To spotbugs, these methods
+ seem to be uncallable because they are on anonymous classes.
+ -->
<Bug pattern="UMAC_UNCALLABLE_METHOD_OF_ANONYMOUS_CLASS"/>
- <!-- Suppress checking of AutoValue internals -->
+ <!--
+ Many test classes are captured by lambdas and marked `implements Serializable`. They are not
+ actually serializable, so the TestPipeline that they employ does not need to be transient or
+ serializable. The leading `~` makes the class name a regular expression instead of a literal name.
+ -->
+ <Match>
+ <Bug pattern="SE_BAD_FIELD" />
+ <Class name="~.*Test$" />
+ <Field type="org.apache.beam.sdk.testing.TestPipeline" />
+ </Match>
+
+ <!-- Suppress checking of autogenerated classes -->
<Match>
<Class name="~.*AutoValue_.*"/>
</Match>
-
- <!--
- Suppressed findbugs issues. All new issues should include a comment why they're
- suppressed.
-
- Suppressions should go in this file rather than inline using @SuppressFBWarnings to avoid
- unapproved artifact license.
- -->
<Match>
- <Class name="org.apache.beam.sdk.coders.AvroCoder$SerializableSchemaSupplier"/>
- <Field name="schema"/>
- <Bug pattern="SE_BAD_FIELD"/>
- <!--
- writeReplace makes this object serializable. This is a limitation of FindBugs as discussed here:
- http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.coders.SerializableCoder"/>
- <Field name="typeDescriptor"/>
- <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
- <!--
- the field is used only in getEncodedTypeDescriptor, where it is restored if it is not present due to
- serialization
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.io.jms.JmsRecord"/>
- <Field name="jmsDestination"/>
- <Bug pattern="SE_BAD_FIELD"/>
- <!--
- JMS destination is serializable according to the JMS spec even if it doesn't implement
- Serializable.
- -->
- </Match>
- <Match>
- <Class name="org.apache.beam.sdk.io.jms.JmsRecord"/>
- <Field name="jmsReplyTo"/>
- <Bug pattern="SE_BAD_FIELD"/>
- <!--
- JMS ReplyTo destination is serializable according to the JMS spec even if it doesn't implement
- Serializable.
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.coders.InstantCoder$LexicographicLongConverter"/>
- <Bug pattern="HE_INHERITS_EQUALS_USE_HASHCODE"/>
- <!-- Converter overrides .equals() to add documentation but does not change behavior -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.testing.PAssert$PCollectionViewAssert"/>
- <Method name="equals" />
- <Bug pattern="EQ_UNUSUAL"/>
- <!-- Unsupported operation -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.testing.PAssert$PCollectionContentsAssert"/>
- <Method name="equals" />
- <Bug pattern="EQ_UNUSUAL"/>
- <!-- Unsupported operation -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.testing.SerializableMatchers$SerializableArrayViaCoder"/>
- <Field name="value" />
- <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
- <!-- Cached value is lazily restored on read. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.transforms.Mean$CountSum"/>
- <Method name="equals" />
- <Bug pattern="FE_FLOATING_POINT_EQUALITY"/>
- <!-- Comparing doubles directly since equals method is only used in coder test. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.util.ExposedByteArrayInputStream"/>
- <Method name="readAll" />
- <Bug pattern="EI_EXPOSE_REP"/>
- <!-- Returns internal buffer by design. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.util.ExposedByteArrayOutputStream"/>
- <Method name="toByteArray" />
- <Bug pattern="EI_EXPOSE_REP"/>
- <!-- Returns internal buffer by design. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.util.ExposedByteArrayOutputStream"/>
- <Method name="toByteArray" />
- <Bug pattern="EI_EXPOSE_REP"/>
- <!-- Returns internal buffer by design. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.util.ExposedByteArrayOutputStream"/>
- <Method name="writeAndOwn" />
- <Bug pattern="EI_EXPOSE_REP"/>
- <!-- Takes ownership of input buffer -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.dataflow.util.MonitoringUtilTest" />
- <Field name="thrown" />
- <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
- <!-- TestRule used automatically by JUnit framework -->
- </Match>
- <Match>
- <Class name="org.apache.beam.runners.dataflow.util.OutputReference" />
- <Field name="type" />
- <Bug pattern="SS_SHOULD_BE_STATIC" />
- <!-- Field read via reflection -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.dataflow.options.DataflowPipelineOptionsTest" />
- <Field name="restoreSystemProperties" />
- <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
- <!-- TestRule used automatically by JUnit framework -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.dataflow.TestDataflowRunner" />
- <Method name="waitForStreamingJobTermination" />
- <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
- <!-- waitForStreamingJobTermination checks status via job.waitUntilFinish() -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$QueueMessageReceiver" />
- <Or>
- <Method name="failed" />
- <Method name="cancelled" />
- <Method name="completed" />
- </Or>
- <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
- <!-- updates is a non-capacity-limited LinkedBlockingQueue, which
- can never refuse an offered update -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$CodedBroadcastHelper"/>
- <Or>
- <Field name="bcast" />
- <Field name="value" />
- </Or>
- <Bug pattern="IS2_INCONSISTENT_SYNC"/>
- <!--
- Spark's Broadcast variables are a distributed and cached objects
- and should not be treated as "normal" objects.
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.spark.util.BroadcastHelper$DirectBroadcastHelper"/>
- <Or>
- <Field name="bcast" />
- <Field name="value" />
- </Or>
- <Bug pattern="IS2_INCONSISTENT_SYNC"/>
- <!--
- Spark's Broadcast variables are a distributed and cached objects
- and should not be treated as "normal" objects.
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.spark.metrics.sink.CsvSink"/>
- <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
- <!-- Intentionally overriding parent name because inheritors should replace the parent. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.spark.metrics.sink.GraphiteSink"/>
- <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
- <!-- Intentionally overriding parent name because inheritors should replace the parent. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.io.LocalResourceId"/>
- <Method name="getCurrentDirectory" />
- <Bug pattern="NP_NULL_PARAM_DEREF"/>
- <!--
- Path.getParent() could return null. However, we check the returned Path is not null.
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.extensions.gcp.storage.GcsResourceId"/>
- <Method name="getCurrentDirectory" />
- <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
- <!--
- GcsPath.getParent() could return null. However, we check the returned Path is not null.
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.util.ZipFiles"/>
- <Method name="zipDirectory" />
- <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
- <!--
- File.listFiles() will return null if the File instance is not a directory. Null dereference is
- not a possibility here since we validate sourceDirectory is directory via
- sourceDirectory.isDirectory()
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.util.ZipFiles"/>
- <Method name="zipDirectoryInternal" />
- <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
- <!--
- File.listFiles() will return null if the File instance is not a directory. Null dereference is
- not a possibility here since we validate sourceDirectory is directory via
- sourceDirectory.isDirectory()
- -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.core.StateMerging"/>
- <Method name="prefetchRead" />
- <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
- <!-- prefetch call readLater -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runner.core.PaneInfoTracker"/>
- <Method name="getNextPaneInfo" />
- <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
- <!-- prefetch side effect -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runner.core.SystemReduceFn"/>
- <Method name="prefetchOnTrigger" />
- <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
- <!-- prefetch side effect -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runner.core.WatermarkHold"/>
- <Method name="extractAndRelease" />
- <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
- <!-- prefetch call readLater -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine"/>
- <Method name="prefetchOnElement" />
- <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
- <!-- prefetch side effect -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine"/>
- <Method name="prefetchShouldFire" />
- <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
- <!-- prefetch side effect -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.core.triggers.AfterPaneStateMachine"/>
- <Method name="prefetchShouldFire" />
- <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
- <!-- prefetch side effect -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.core.triggers.TriggerStateMachines$StateMachineConverter"/>
- <Method name="evaluateSpecific" />
- <Bug pattern="UPM_UNCALLED_PRIVATE_METHOD"/>
- <!-- Called via reflection -->
- </Match>
-
- <!--
- Baseline issues. No new issues should be added below this line and all existing issues should
- have an associated JIRA
- -->
-
- <Match>
- <Class name="org.apache.beam.sdk.coders.JAXBCoder"/>
- <Method name="getContext"/>
- <Bug pattern="DC_DOUBLECHECK"/>
- <!--[BEAM-398] Possible double check of field-->
- </Match>
- <Match>
- <Class name="org.apache.beam.sdk.testing.WindowSupplier"/>
- <Field name="windows"/>
- <Bug pattern="IS2_INCONSISTENT_SYNC"/>
- <!--[BEAM-407] Inconsistent synchronization -->
- </Match>
- <Match>
- <Class name="org.apache.beam.sdk.util.CombineFnUtil$NonSerializableBoundedCombineFn"/>
- <Field name="context"/>
- <Bug pattern="SE_BAD_FIELD"/>
- <!--
- The class is not meant to be serializable, writeObject() just throws an exception. Therefore
- it's reasonable for this field to also not be serializable.
- -->
- </Match>
- <Match>
- <Class name="org.apache.beam.runners.core.WatermarkHold"/>
- <Field name="timerInternals"/>
- <Bug pattern="SE_BAD_FIELD"/>
- <!--[BEAM-420] Non-transient non-serializable instance field in serializable class-->
- </Match>
- <Match>
- <Class name="StateSpecs$CombiningStateSpec"/>
- <Method name="equals"/>
- <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
- <!--[BEAM-421] Class doesn't override equals in superclass-->
- </Match>
- <Match>
- <Class name="org.apache.beam.sdk.util.ClassPath$ClassInfo"/>
- <Method name="equals"/>
- <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
- <!--[BEAM-1676] Class doesn't override equals in superclass-->
- </Match>
- <Match>
- <Class name="org.apache.beam.sdk.util.AutoValue_GcsUtil_StorageObjectOrIOException"/>
- <Bug pattern="NM_CLASS_NOT_EXCEPTION"/>
- <!-- It is clear from the name that this class holds either StorageObject or IOException. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.extensions.gcp.util.GcsUtil$StorageObjectOrIOException"/>
- <Bug pattern="NM_CLASS_NOT_EXCEPTION"/>
- <!-- It is clear from the name that this class holds either StorageObject or IOException. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.runners.direct.ParDoMultiOverrideFactory$StatefulParDo"/>
- <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
- <!-- PTransforms do not actually support serialization. -->
- </Match>
-
- <Match>
- <Class name="org.apache.beam.sdk.options.ProxyInvocationHandler"/>
- <Field name="~.*"/>
- <Bug pattern="SE_BAD_FIELD"/>
- <!--
- ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
- exception in writeObject()
- -->
- </Match>
-
- <Match>
- <!--
- Classes in this package is auto-generated, let's disable the findbugs for it.
- -->
<Package name="org.apache.beam.sdk.extensions.sql.impl.parser.impl"/>
</Match>
-
<Match>
- <!--
- Classes in this package is auto-generated, let's disable the findbugs for it.
- -->
<Package name="org.apache.beam.sdk.schemas.parser.generated"/>
</Match>
-
<Match>
- <!--
- Classes in this package is auto-generated, let's disable the findbugs for it.
- -->
<Package name="org.apache.beam.sdk.io.clickhouse.impl.parser"/>
</Match>
-
- <Match>
- <Class name="org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory$SimpleStageBundleFactory"/>
- <Field name="wrappedClient"/>
- <Bug pattern="URF_UNREAD_FIELD"/>
- <!-- Fix build. -->
- </Match>
-
- <Match>
- <!--
- This is a false positive. Spotbugs does not recognize the use of try-with-resources, so it thinks that
- the connection is not correctly closed.
- -->
- <Or>
- <And>
- <Class name="org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn"/>
- <Method name="processElement"/>
- </And>
- <And>
- <Class name="org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows"/>
- <Method name="inferBeamSchema"/>
- </And>
- </Or>
-
- <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
- </Match>
</FindBugsFilter>
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 83bc428..8b77446 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.coders;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -210,6 +211,10 @@
* serialization and hence is able to encode the {@link Schema} object directly.
*/
private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
+ // writeReplace makes this object serializable. This is a limitation of FindBugs as discussed
+ // here:
+ // http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
+ @SuppressFBWarnings("SE_BAD_FIELD")
private final Schema schema;
private SerializableSchemaSupplier(Schema schema) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index c0cc6c5..ffdf3fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.coders;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -166,6 +167,8 @@
private final Class<T> type;
/** Access via {@link #getEncodedTypeDescriptor()}. */
+ // the field is restored lazily if it is not present due to serialization
+ @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
private transient @Nullable TypeDescriptor<T> typeDescriptor;
protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
index fb922b0..bbdd3a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java
@@ -30,5 +30,5 @@
public interface ExternalTransformRegistrar {
/** A mapping from URN to an {@link ExternalTransformBuilder} class. */
- Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders();
+ Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders();
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java
index 3560464..07f940e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java
@@ -154,7 +154,7 @@
@Experimental(Experimental.Kind.FILESYSTEM)
public static class ClassLoaderFileSystemRegistrar implements FileSystemRegistrar {
@Override
- public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
+ public Iterable<FileSystem<?>> fromOptions(@Nullable PipelineOptions options) {
return ImmutableList.of(new ClassLoaderFileSystem());
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
index 7da675c..1e9ff1c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
@@ -40,5 +40,5 @@
* <p>Each {@link FileSystem#getScheme() scheme} is required to be unique among all {@link
* FileSystem}s registered by all {@link FileSystemRegistrar}s.
*/
- Iterable<FileSystem> fromOptions(PipelineOptions options);
+ Iterable<FileSystem<?>> fromOptions(PipelineOptions options);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
index f93cb8f..7c5c483 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java
@@ -134,7 +134,7 @@
public static final String URN = "beam:external:java:generate_sequence:v1";
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(URN, AutoValue_GenerateSequence.Builder.class);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
index 3a15c67..09f1571 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
@@ -29,7 +29,7 @@
@Experimental(Kind.FILESYSTEM)
public class LocalFileSystemRegistrar implements FileSystemRegistrar {
@Override
- public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
+ public Iterable<FileSystem<?>> fromOptions(@Nullable PipelineOptions options) {
return ImmutableList.of(new LocalFileSystem());
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 7c75893..ff8e1c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -33,6 +33,7 @@
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.io.NotSerializableException;
@@ -95,8 +96,17 @@
private final int hashCode = ThreadLocalRandom.current().nextInt();
private final Set<Class<? extends PipelineOptions>> knownInterfaces;
+
+ // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
+ // exception in writeObject()
+ @SuppressFBWarnings("SE_BAD_FIELD")
private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
+
+ // ProxyInvocationHandler implements Serializable only for the sake of throwing an informative
+ // exception in writeObject()
+ @SuppressFBWarnings("SE_BAD_FIELD")
private final Map<String, BoundValue> options;
+
private final Map<String, JsonNode> jsonOptions;
private final Map<String, String> gettersToPropertyNames;
private final Map<String, String> settersToPropertyNames;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 937be48..05b1914 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -681,6 +681,7 @@
*/
@Deprecated
@Override
+ @SuppressFBWarnings("EQ_UNUSUAL")
public boolean equals(@Nullable Object o) {
throw new UnsupportedOperationException(
"If you meant to test object equality, use .containsInAnyOrder instead.");
@@ -1048,6 +1049,7 @@
*/
@Deprecated
@Override
+ @SuppressFBWarnings("EQ_UNUSUAL")
public boolean equals(@Nullable Object o) {
throw new UnsupportedOperationException(
String.format(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
index 97fd9b4..70d35b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
@@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.in;
import static org.hamcrest.core.Is.is;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
@@ -796,6 +797,7 @@
*/
private static class SerializableArrayViaCoder<T> implements SerializableSupplier<T[]> {
/** Cached value that is not serialized. */
+ @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
private transient T @Nullable [] value;
/** The bytes of {@link #value} when encoded via {@link #coder}. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
index 4c27ecd..0458817 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowSupplier.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.testing;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Collection;
import org.apache.beam.sdk.coders.Coder;
@@ -33,6 +34,8 @@
* supplier is {@link Serializable}, and handles encoding and decoding the windows with a {@link
* Coder} provided for the windows.
*/
+// Spotbugs thinks synchronization is for the fields, but it is for the act of decoding
+@SuppressFBWarnings("IS2_INCONSISTENT_SYNC")
final class WindowSupplier implements Supplier<Collection<BoundedWindow>>, Serializable {
private final Coder<? extends BoundedWindow> coder;
private final Collection<byte[]> encodedWindows;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 6e50010..925cf89 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -149,6 +150,9 @@
}
@Override
+ // Comparing doubles directly since class is package private and equals method is only used in
+ // coder test.
+ @SuppressFBWarnings("FE_FLOATING_POINT_EQUALITY")
public boolean equals(@Nullable Object other) {
if (!(other instanceof CountSum)) {
return false;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
index 73885e6..20d6325 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.util;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
@@ -115,6 +116,9 @@
private static class NonSerializableBoundedCombineFn<InputT, AccumT, OutputT>
extends CombineFn<InputT, AccumT, OutputT> {
private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn;
+
+ // The class is not meant to be serializable, writeObject() just throws an exception.
+ @SuppressFBWarnings("SE_BAD_FIELD")
private final Context context;
private NonSerializableBoundedCombineFn(
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExposedByteArrayOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExposedByteArrayOutputStream.java
index 78ab0f5..fca7491f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExposedByteArrayOutputStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExposedByteArrayOutputStream.java
@@ -62,6 +62,7 @@
*
* <p><i>Note: After passing any byte array to this method, it must not be modified again.</i>
*/
+ // Takes ownership of input buffer by design - Spotbugs is right to warn that this is dangerous
public synchronized void writeAndOwn(byte[] b) throws IOException {
if (b.length == 0) {
return;
@@ -91,6 +92,7 @@
}
@Override
+ // Exposes internal mutable reference by design - Spotbugs is right to warn that this is dangerous
public synchronized byte[] toByteArray() {
// Note: count == buf.length is not a correct criteria to "return buf;", because the internal
// buf may be reused after reset().
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
index af7e1f4..c263146 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemRegistrarTest.java
@@ -38,7 +38,8 @@
for (FileSystemRegistrar registrar :
Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
if (registrar instanceof LocalFileSystemRegistrar) {
- Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
+ Iterable<FileSystem<?>> fileSystems =
+ registrar.fromOptions(PipelineOptionsFactory.create());
assertThat(fileSystems, contains(instanceOf(LocalFileSystem.class)));
return;
}
diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index fe6dcf5..c20e8d2 100644
--- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -107,10 +107,10 @@
ImmutableMap.builder();
for (ExternalTransformRegistrar registrar :
ServiceLoader.load(ExternalTransformRegistrar.class)) {
- for (Map.Entry<String, Class<? extends ExternalTransformBuilder>> entry :
+ for (Map.Entry<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> entry :
registrar.knownBuilders().entrySet()) {
String urn = entry.getKey();
- Class<? extends ExternalTransformBuilder> builderClass = entry.getValue();
+ Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass = entry.getValue();
builder.put(
urn,
spec -> {
@@ -130,7 +130,7 @@
private static PTransform<?, ?> translate(
ExternalTransforms.ExternalConfigurationPayload payload,
- Class<? extends ExternalTransformBuilder> builderClass)
+ Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass)
throws Exception {
Preconditions.checkState(
ExternalTransformBuilder.class.isAssignableFrom(builderClass),
@@ -142,8 +142,8 @@
return buildTransform(builderClass, configObject);
}
- private static Object initConfiguration(Class<? extends ExternalTransformBuilder> builderClass)
- throws Exception {
+ private static Object initConfiguration(
+ Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass) throws Exception {
for (Method method : builderClass.getMethods()) {
if (method.getName().equals("buildExternal")) {
Preconditions.checkState(
@@ -241,9 +241,9 @@
}
private static PTransform<?, ?> buildTransform(
- Class<? extends ExternalTransformBuilder> builderClass, Object configObject)
+ Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass, Object configObject)
throws Exception {
- Constructor<? extends ExternalTransformBuilder> constructor =
+ Constructor<? extends ExternalTransformBuilder<?, ?, ?>> constructor =
builderClass.getDeclaredConstructor();
constructor.setAccessible(true);
ExternalTransformBuilder<?, ?, ?> externalTransformBuilder = constructor.newInstance();
diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle
index 5629d98..bd23376 100644
--- a/sdks/java/extensions/google-cloud-platform-core/build.gradle
+++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle
@@ -21,7 +21,6 @@
plugins { id 'org.apache.beam.module' }
applyJavaNature(
enableChecker: false,
- ignoreRawtypeErrors: true,
automaticModuleName: 'org.apache.beam.sdk.extensions.gcp')
description = "Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core"
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
index 6caff00..b090359 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -35,7 +35,7 @@
public class GcsFileSystemRegistrar implements FileSystemRegistrar {
@Override
- public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+ public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
return ImmutableList.of(new GcsFileSystem(options.as(GcsOptions.class)));
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 5db0978..6845af8 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -44,6 +44,7 @@
import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
import com.google.cloud.hadoop.util.ResilientOperation;
import com.google.cloud.hadoop.util.RetryDeterminer;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
@@ -807,6 +808,8 @@
}
/** A class that holds either a {@link StorageObject} or an {@link IOException}. */
+ // It is clear from the name that this class holds either StorageObject or IOException.
+ @SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION")
@AutoValue
public abstract static class StorageObjectOrIOException {
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
index 83a213b..10752dd 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
@@ -40,7 +40,8 @@
for (FileSystemRegistrar registrar :
Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
if (registrar instanceof GcsFileSystemRegistrar) {
- Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
+ Iterable<FileSystem<?>> fileSystems =
+ registrar.fromOptions(PipelineOptionsFactory.create());
assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class)));
return;
}
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
index a253a4b0..f5d6def 100644
--- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
@@ -42,7 +42,7 @@
.build();
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.of(
URN, Builder.class);
}
diff --git a/sdks/java/extensions/sql/jdbc/src/main/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLine.java b/sdks/java/extensions/sql/jdbc/src/main/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLine.java
index 0b1506f..6dd8fc1 100644
--- a/sdks/java/extensions/sql/jdbc/src/main/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLine.java
+++ b/sdks/java/extensions/sql/jdbc/src/main/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLine.java
@@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.checkerframework.checker.nullness.qual.Nullable;
import sqlline.SqlLine;
import sqlline.SqlLine.Status;
@@ -65,11 +66,11 @@
SqlLine sqlLine = new SqlLine();
if (outputStream != null) {
- sqlLine.setOutputStream(new PrintStream(outputStream));
+ sqlLine.setOutputStream(new PrintStream(outputStream, false, Charsets.UTF_8.name()));
}
if (errorStream != null) {
- sqlLine.setErrorStream(new PrintStream(errorStream));
+ sqlLine.setErrorStream(new PrintStream(errorStream, false, Charsets.UTF_8.name()));
}
return sqlLine.begin(modifiedArgs, inputStream, true);
diff --git a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
index 3da48a2..6d4fa60 100644
--- a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
+++ b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
@@ -49,6 +49,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -141,7 +142,7 @@
sqlEnv =
BeamSqlEnv.builder(inMemoryMetaStore)
.setPipelineOptions(PipelineOptionsFactory.create())
- .setRuleSets(new RuleSet[] {RuleSets.ofList(ruleList)})
+ .setRuleSets(ImmutableList.of(RuleSets.ofList(ruleList)))
.build();
sqlEnv.executeDdl(String.format(CREATE_TABLE_STATEMENT, Method.DIRECT_READ.toString()));
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index b8b0e72..9c43664 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -22,6 +22,7 @@
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.util.AbstractMap.SimpleEntry;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -146,7 +147,7 @@
private boolean autoLoadBuiltinFunctions;
private boolean autoLoadUdfs;
private PipelineOptions pipelineOptions;
- private RuleSet[] ruleSets;
+ private Collection<RuleSet> ruleSets;
private BeamSqlEnvBuilder(TableProvider tableProvider) {
checkNotNull(tableProvider, "Table provider for the default schema must be sets.");
@@ -178,7 +179,7 @@
}
/** Set the ruleSet used for query optimizer. */
- public BeamSqlEnvBuilder setRuleSets(RuleSet[] ruleSets) {
+ public BeamSqlEnvBuilder setRuleSets(Collection<RuleSet> ruleSets) {
this.ruleSets = ruleSets;
return this;
}
@@ -309,16 +310,44 @@
}
}
- private QueryPlanner instantiatePlanner(JdbcConnection jdbcConnection, RuleSet[] ruleSets) {
+ /**
+ * Creates the configured {@link QueryPlanner} through the {@code public static FACTORY} field
+ * that the planner class is expected to expose.
+ *
+ * @throws RuntimeException if the class cannot be loaded, or if {@code FACTORY} is missing,
+ * inaccessible, not static, null, or not a {@link QueryPlanner.Factory}
+ */
+ private QueryPlanner instantiatePlanner(
+ JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
+ Class<?> queryPlannerClass;
try {
- return (QueryPlanner)
- Class.forName(queryPlannerClassName)
- .getConstructor(JdbcConnection.class, RuleSet[].class)
- .newInstance(jdbcConnection, ruleSets);
- } catch (Exception e) {
+ queryPlannerClass = Class.forName(queryPlannerClassName);
+ } catch (ClassNotFoundException exc) {
throw new RuntimeException(
- String.format("Cannot construct query planner %s", queryPlannerClassName), e);
+ "Cannot find requested QueryPlanner class: " + queryPlannerClassName, exc);
}
+
+ String factoryError =
+ String.format(
+ "QueryPlanner class %s does not have an accessible static field 'FACTORY' of type QueryPlanner.Factory",
+ queryPlannerClassName);
+ Object factory;
+ try {
+ java.lang.reflect.Field factoryField = queryPlannerClass.getField("FACTORY");
+ if (!java.lang.reflect.Modifier.isStatic(factoryField.getModifiers())) {
+ // Field.get(null) would throw a bare NullPointerException for an instance field.
+ throw new RuntimeException(factoryError);
+ }
+ factory = factoryField.get(null);
+ } catch (NoSuchFieldException | IllegalAccessException exc) {
+ throw new RuntimeException(factoryError, exc);
+ }
+ if (!(factory instanceof QueryPlanner.Factory)) {
+ // Covers both a null FACTORY and a FACTORY of an unrelated type.
+ throw new RuntimeException(factoryError);
+ }
+
+ return ((QueryPlanner.Factory) factory).createPlanner(jdbcConnection, ruleSets);
}
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index cea8fba..7e7517b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -17,9 +17,11 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.Factory;
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters.Kind;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats;
@@ -76,12 +78,21 @@
private final JdbcConnection connection;
- /** Called by {@link BeamSqlEnv}.instantiatePlanner() reflectively. */
+ /** Invoked through {@link #FACTORY}; only that field is looked up reflectively. */
- public CalciteQueryPlanner(JdbcConnection connection, RuleSet[] ruleSets) {
+ public CalciteQueryPlanner(JdbcConnection connection, Collection<RuleSet> ruleSets) {
this.connection = connection;
this.planner = Frameworks.getPlanner(defaultConfig(connection, ruleSets));
}
- public FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] ruleSets) {
+ public static final Factory FACTORY =
+ new Factory() {
+ @Override
+ public QueryPlanner createPlanner(
+ JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
+ return new CalciteQueryPlanner(jdbcConnection, ruleSets);
+ }
+ };
+
+ public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleSet> ruleSets) {
final CalciteConnectionConfig config = connection.config();
final SqlParser.ConfigBuilder parserConfig =
SqlParser.configBuilder()
@@ -115,7 +126,7 @@
.defaultSchema(defaultSchema)
.traitDefs(traitDefs)
.context(Contexts.of(connection.config()))
- .ruleSets(ruleSets)
+ .ruleSets(ruleSets.toArray(new RuleSet[0]))
.costFactory(BeamCostModel.FACTORY)
.typeSystem(connection.getTypeFactory().getTypeSystem())
.operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
index a690082..20e5604 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
@@ -18,12 +18,14 @@
package org.apache.beam.sdk.extensions.sql.impl;
import com.google.auto.value.AutoOneOf;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
/**
* An interface that planners should implement to convert sql statement to {@link BeamRelNode} or
@@ -70,4 +72,12 @@
ImmutableList.copyOf(positionalParams));
}
}
+
+ /**
+ * Creates {@link QueryPlanner} instances. {@link BeamSqlEnv} reads a planner class's public
+ * static {@code FACTORY} field reflectively and expects it to hold an implementation of this.
+ */
+ interface Factory {
+ QueryPlanner createPlanner(JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets);
+ }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
index 1b8083c..2ecc26b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl.planner;
+import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -168,14 +169,14 @@
private static final List<RelOptRule> BEAM_TO_ENUMERABLE =
ImmutableList.of(BeamEnumerableConverterRule.INSTANCE);
- public static RuleSet[] getRuleSets() {
- return new RuleSet[] {
- RuleSets.ofList(
- ImmutableList.<RelOptRule>builder()
- .addAll(BEAM_CONVERTERS)
- .addAll(BEAM_TO_ENUMERABLE)
- .addAll(LOGICAL_OPTIMIZATIONS)
- .build())
- };
+ public static Collection<RuleSet> getRuleSets() {
+
+ return ImmutableList.of(
+ RuleSets.ofList(
+ ImmutableList.<RelOptRule>builder()
+ .addAll(BEAM_CONVERTERS)
+ .addAll(BEAM_TO_ENUMERABLE)
+ .addAll(LOGICAL_OPTIMIZATIONS)
+ .build()));
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
index 37fbc61..5cd2676 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
@@ -45,7 +45,6 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.Pair;
import org.junit.Before;
@@ -88,7 +87,7 @@
sqlEnv =
BeamSqlEnv.builder(tableProvider)
.setPipelineOptions(PipelineOptionsFactory.create())
- .setRuleSets(new RuleSet[] {RuleSets.ofList(defaultRules)})
+ .setRuleSets(ImmutableList.of(RuleSets.ofList(defaultRules)))
.build();
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
index 2d0a1be..a967fc3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/JoinReorderingTest.java
@@ -232,7 +232,7 @@
TestTableProvider tableProvider = new TestTableProvider();
createThreeTables(tableProvider);
List<RelOptRule> ruleSet =
- Arrays.stream(BeamRuleSets.getRuleSets())
+ BeamRuleSets.getRuleSets().stream()
.flatMap(rules -> StreamSupport.stream(rules.spliterator(), false))
.filter(rule -> !(rule instanceof BeamJoinPushThroughJoinRule))
.filter(rule -> !(rule instanceof BeamJoinAssociateRule))
@@ -242,7 +242,7 @@
BeamSqlEnv env =
BeamSqlEnv.builder(tableProvider)
.setPipelineOptions(PipelineOptionsFactory.create())
- .setRuleSets(new RuleSet[] {RuleSets.ofList(ruleSet)})
+ .setRuleSets(ImmutableList.of(RuleSets.ofList(ruleSet)))
.build();
// This is Join(Join(medium, large), small) which should be converted to a join that large table
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
index 1acb94f..d3221e0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
@@ -48,7 +48,6 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
import org.joda.time.Duration;
import org.junit.Before;
@@ -93,7 +92,7 @@
sqlEnv =
BeamSqlEnv.builder(tableProvider)
.setPipelineOptions(PipelineOptionsFactory.create())
- .setRuleSets(new RuleSet[] {RuleSets.ofList(rulesWithPushDown)})
+ .setRuleSets(ImmutableList.of(RuleSets.ofList(rulesWithPushDown)))
.build();
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
index e64a103..9a67950 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
@@ -50,7 +50,6 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Duration;
@@ -96,7 +95,7 @@
sqlEnv =
BeamSqlEnv.builder(tableProvider)
.setPipelineOptions(PipelineOptionsFactory.create())
- .setRuleSets(new RuleSet[] {RuleSets.ofList(rulesWithPushDown)})
+ .setRuleSets(ImmutableList.of(RuleSets.ofList(rulesWithPushDown)))
.build();
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
index 363c0f2..941e984 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
@@ -48,7 +48,6 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
import org.joda.time.Duration;
import org.junit.Before;
@@ -92,7 +91,7 @@
sqlEnv =
BeamSqlEnv.builder(tableProvider)
.setPipelineOptions(PipelineOptionsFactory.create())
- .setRuleSets(new RuleSet[] {RuleSets.ofList(rulesWithPushDown)})
+ .setRuleSets(ImmutableList.of(RuleSets.ofList(rulesWithPushDown)))
.build();
}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
index 9b8a417..334af11 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
@@ -19,6 +19,7 @@
import com.google.zetasql.LanguageOptions;
import com.google.zetasql.Value;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
@@ -72,7 +73,7 @@
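- * Called by {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv}.instantiatePlanner()
- * reflectively.
+ * Invoked through {@link #FACTORY}; only that field is looked up reflectively, by {@link
+ * org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv}.instantiatePlanner().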
*/
- public ZetaSQLQueryPlanner(JdbcConnection jdbcConnection, RuleSet[] ruleSets) {
+ public ZetaSQLQueryPlanner(JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
plannerImpl =
new ZetaSQLPlannerImpl(defaultConfig(jdbcConnection, modifyRuleSetsForZetaSql(ruleSets)));
setDefaultTimezone(
@@ -82,15 +83,24 @@
.getZetaSqlDefaultTimezone());
}
- public static RuleSet[] getZetaSqlRuleSets() {
+ public static final Factory FACTORY =
+ new Factory() {
+ @Override
+ public QueryPlanner createPlanner(
+ JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
+ return new ZetaSQLQueryPlanner(jdbcConnection, ruleSets);
+ }
+ };
+
+ public static Collection<RuleSet> getZetaSqlRuleSets() {
return modifyRuleSetsForZetaSql(BeamRuleSets.getRuleSets());
}
- private static RuleSet[] modifyRuleSetsForZetaSql(RuleSet[] ruleSets) {
- RuleSet[] ret = new RuleSet[ruleSets.length];
- for (int i = 0; i < ruleSets.length; i++) {
+ private static Collection<RuleSet> modifyRuleSetsForZetaSql(Collection<RuleSet> ruleSets) {
+ ImmutableList.Builder<RuleSet> ret = ImmutableList.builder();
+ for (RuleSet ruleSet : ruleSets) {
ImmutableList.Builder<RelOptRule> bd = ImmutableList.builder();
- for (RelOptRule rule : ruleSets[i]) {
+ for (RelOptRule rule : ruleSet) {
// TODO[BEAM-9075]: Fix join re-ordering for ZetaSQL planner. Currently join re-ordering
// requires the JoinCommuteRule, which doesn't work without struct flattening.
if (rule instanceof JoinCommuteRule) {
@@ -109,9 +119,9 @@
bd.add(rule);
}
}
- ret[i] = RuleSets.ofList(bd.build());
+ ret.add(RuleSets.ofList(bd.build()));
}
- return ret;
+ return ret.build();
}
public String getDefaultTimezone() {
@@ -177,7 +187,8 @@
return (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
}
- private static FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] ruleSets) {
+ private static FrameworkConfig defaultConfig(
+ JdbcConnection connection, Collection<RuleSet> ruleSets) {
final CalciteConnectionConfig config = connection.config();
final SqlParser.ConfigBuilder parserConfig =
SqlParser.configBuilder()
@@ -210,7 +221,7 @@
.parserConfig(parserConfig.build())
.defaultSchema(defaultSchema)
.traitDefs(traitDefs)
- .ruleSets(ruleSets)
+ .ruleSets(ruleSets.toArray(new RuleSet[0]))
.costFactory(BeamCostModel.FACTORY)
.typeSystem(connection.getTypeFactory().getTypeSystem())
.operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java
index c529425..80ea234 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPushDownTest.java
@@ -43,6 +43,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.FrameworkConfig;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.BeforeClass;
@@ -186,7 +187,7 @@
.defaultSchema(defaultSchemaPlus)
.traitDefs(traitDefs)
.context(Contexts.of(contexts))
- .ruleSets(ZetaSQLQueryPlanner.getZetaSqlRuleSets())
+ .ruleSets(ZetaSQLQueryPlanner.getZetaSqlRuleSets().toArray(new RuleSet[0]))
.costFactory(BeamCostModel.FACTORY)
.typeSystem(jdbcConnection.getTypeFactory().getTypeSystem())
.build();
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
index eacb6b8..6d9ba67 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java
@@ -30,6 +30,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.ConventionTraitDef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.FrameworkConfig;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
/** Common setup for ZetaSQL tests. */
@@ -77,7 +78,7 @@
.defaultSchema(jdbcConnection.getCurrentSchemaPlus())
.traitDefs(ImmutableList.of(ConventionTraitDef.INSTANCE))
.context(Contexts.of(jdbcConnection.config()))
- .ruleSets(ZetaSQLQueryPlanner.getZetaSqlRuleSets())
+ .ruleSets(ZetaSQLQueryPlanner.getZetaSqlRuleSets().toArray(new RuleSet[0]))
.costFactory(BeamCostModel.FACTORY)
.typeSystem(jdbcConnection.getTypeFactory().getTypeSystem())
.build();
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
index 49252ed..e9e5ace 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java
@@ -35,7 +35,7 @@
public class S3FileSystemRegistrar implements FileSystemRegistrar {
@Override
- public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+ public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
return ImmutableList.of(new S3FileSystem(options.as(S3Options.class)));
}
diff --git a/sdks/java/io/azure/build.gradle b/sdks/java/io/azure/build.gradle
index a2deb94..3860aca 100644
--- a/sdks/java/io/azure/build.gradle
+++ b/sdks/java/io/azure/build.gradle
@@ -19,8 +19,7 @@
plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.azure',
- enableChecker: false,
- ignoreRawtypeErrors: true)
+ enableChecker: false)
dependencies {
compile project(path: ":sdks:java:core", configuration: "shadow")
diff --git a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.java b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.java
index 14569c6..33a7054 100644
--- a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.java
+++ b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.java
@@ -27,7 +27,7 @@
public class AzureBlobStoreFileSystemRegistrar implements FileSystemRegistrar {
@Override
- public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+ public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
// TODO
return ImmutableList.of();
diff --git a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceIdTest.java b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceIdTest.java
index abd380c..ee28187 100644
--- a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceIdTest.java
+++ b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzfsResourceIdTest.java
@@ -56,7 +56,7 @@
public String expectedResult;
@Parameterized.Parameters
- public static Collection paths() {
+ public static Collection<Object[]> paths() {
return Arrays.asList(
new Object[][] {
{"azfs://account/container/", "", RESOLVE_DIRECTORY, "azfs://account/container/"},
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 14e1c4e..175044b 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -21,7 +21,6 @@
plugins { id 'org.apache.beam.module' }
applyJavaNature(
enableChecker: false,
- ignoreRawtypeErrors: true,
automaticModuleName: 'org.apache.beam.sdk.io.gcp',
enableSpotbugs: false,
)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c62dccb..56ab4e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1024,9 +1024,9 @@
static class BigQueryServerStreamImpl<T> implements BigQueryServerStream<T> {
- private final ServerStream serverStream;
+ private final ServerStream<T> serverStream;
- public BigQueryServerStreamImpl(ServerStream serverStream) {
+ public BigQueryServerStreamImpl(ServerStream<T> serverStream) {
this.serverStream = serverStream;
}
@@ -1069,7 +1069,7 @@
@Override
public BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request) {
- return new BigQueryServerStreamImpl(client.readRowsCallable().call(request));
+ return new BigQueryServerStreamImpl<>(client.readRowsCallable().call(request));
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index d460c5d..06bf36f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -82,7 +82,7 @@
* Stream}.
*/
public BigQueryStorageStreamSource<T> fromExisting(Stream newStream) {
- return new BigQueryStorageStreamSource(
+ return new BigQueryStorageStreamSource<>(
readSession, newStream, jsonTableSchema, parseFn, outputCoder, bqServices);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index c333c1c..f5774fc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -55,6 +55,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
@@ -362,7 +363,7 @@
}
private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
- new ToTableRow(SerializableFunctions.identity());
+ new ToTableRow<>(SerializableFunctions.identity());
/** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
public static SerializableFunction<Row, TableRow> toTableRow() {
@@ -423,7 +424,7 @@
return output;
}
- private static Object fromBeamField(FieldType fieldType, Object fieldValue) {
+ private static @Nullable Object fromBeamField(FieldType fieldType, Object fieldValue) {
if (fieldValue == null) {
if (!fieldType.getNullable()) {
throw new IllegalArgumentException("Field is not nullable.");
@@ -435,8 +436,8 @@
case ARRAY:
case ITERABLE:
FieldType elementType = fieldType.getCollectionElementType();
- Iterable items = (Iterable) fieldValue;
- List convertedItems = Lists.newArrayListWithCapacity(Iterables.size(items));
+ Iterable<?> items = (Iterable<?>) fieldValue;
+ List<Object> convertedItems = Lists.newArrayListWithCapacity(Iterables.size(items));
for (Object item : items) {
convertedItems.add(fromBeamField(elementType, item));
}
@@ -670,7 +671,7 @@
FieldType beamField, Object value, BigQueryUtils.ConversionOptions options) {
// Check whether the type of array element is equal.
List<Object> values = (List<Object>) value;
- List<Object> ret = new ArrayList();
+ List<Object> ret = new ArrayList<>();
FieldType collectionElement = beamField.getCollectionElementType();
for (Object v : values) {
ret.add(convertAvroFormat(collectionElement, v, options));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 6f382fe..b56f324 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -35,8 +35,8 @@
this.elementCoder = elementCoder;
}
- public static <ElementT> TableRowInfoCoder of(Coder<ElementT> elementCoder) {
- return new TableRowInfoCoder(elementCoder);
+ public static <ElementT> TableRowInfoCoder<ElementT> of(Coder<ElementT> elementCoder) {
+ return new TableRowInfoCoder<>(elementCoder);
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index 46f86af..fbc95c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -33,7 +33,7 @@
*/
@VisibleForTesting
class TagWithUniqueIds<ElementT>
- extends DoFn<KV<ShardedKey<String>, ElementT>, KV<ShardedKey<String>, TableRowInfo>> {
+ extends DoFn<KV<ShardedKey<String>, ElementT>, KV<ShardedKey<String>, TableRowInfo<ElementT>>> {
private transient String randomUUID;
private transient long sequenceNo = 0L;
@@ -50,6 +50,7 @@
// BigQuery.
context.output(
KV.of(
- context.element().getKey(), new TableRowInfo(context.element().getValue(), uniqueId)));
+ context.element().getKey(),
+ new TableRowInfo<>(context.element().getValue(), uniqueId)));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 0dd6f8a..b214838 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -379,7 +379,9 @@
.setUseAvroLogicalTypes(useAvroLogicalTypes);
if (schemaUpdateOptions != null) {
List<String> options =
- schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
+ schemaUpdateOptions.stream()
+ .map(Enum<SchemaUpdateOption>::name)
+ .collect(Collectors.toList());
loadConfig.setSchemaUpdateOptions(options);
}
if (timePartitioning != null) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareIOErrorCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareIOErrorCoder.java
index da139ee..6ad7ae0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareIOErrorCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareIOErrorCoder.java
@@ -39,7 +39,7 @@
this.originalCoder = NullableCoder.of(originalCoder);
}
- public static <T> HealthcareIOErrorCoder of(Coder<T> originalCoder) {
+ public static <T> HealthcareIOErrorCoder<T> of(Coder<T> originalCoder) {
return new HealthcareIOErrorCoder<>(originalCoder);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
index 926e79b..93612ae 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
@@ -44,7 +44,7 @@
public static final String URN = "beam:external:java:pubsub:read:v1";
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(URN, ReadBuilder.class);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 7ab956c..d4df6f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -42,7 +42,7 @@
public static final String URN = "beam:external:java:pubsub:write:v1";
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(URN, WriteBuilder.class);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index 5e05c05..7dcc44b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -27,6 +27,8 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
+import com.google.spanner.v1.CommitRequest;
+import com.google.spanner.v1.CommitResponse;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -105,7 +107,7 @@
if (commitDeadline != null && commitDeadline.get().getMillis() > 0) {
// Set the GRPC deadline on the Commit API call.
- UnaryCallSettings.Builder commitSettings =
+ UnaryCallSettings.Builder<CommitRequest, CommitResponse> commitSettings =
builder.getSpannerStubSettingsBuilder().commitSettings();
RetrySettings.Builder commitRetrySettings = commitSettings.getRetrySettings().toBuilder();
commitSettings.setRetrySettings(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 6a2658c..0554b74 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -801,7 +801,7 @@
abstract FailureMode getFailureMode();
- abstract @Nullable PCollection getSchemaReadySignal();
+ abstract @Nullable PCollection<?> getSchemaReadySignal();
abstract OptionalInt getGroupingFactor();
@@ -820,7 +820,7 @@
abstract Builder setFailureMode(FailureMode failureMode);
- abstract Builder setSchemaReadySignal(PCollection schemaReadySignal);
+ abstract Builder setSchemaReadySignal(PCollection<?> schemaReadySignal);
abstract Builder setGroupingFactor(int groupingFactor);
@@ -947,7 +947,7 @@
*
* @see Wait.OnSignal
*/
- public Write withSchemaReadySignal(PCollection signal) {
+ public Write withSchemaReadySignal(PCollection<?> signal) {
return toBuilder().setSchemaReadySignal(signal).build();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
index 37d8cbf..412864d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java
@@ -120,7 +120,7 @@
// Longs tend to get converted back to Integers due to JSON serialization. Convert them back.
public static TableRow convertNumbers(TableRow tableRow) {
- for (TableRow.Entry entry : tableRow.entrySet()) {
+ for (TableRow.Entry<?, Object> entry : tableRow.entrySet()) {
if (entry.getValue() instanceof Integer) {
entry.setValue(Long.valueOf((Integer) entry.getValue()));
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 06bb0a1..6970ef8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -138,7 +138,7 @@
}
private void checkTypedReadQueryObject(
- BigQueryIO.TypedRead read, String query, String kmsKey, String tempDataset) {
+ BigQueryIO.TypedRead<?> read, String query, String kmsKey, String tempDataset) {
checkTypedReadQueryObjectWithValidate(read, query, kmsKey, tempDataset, true);
}
@@ -159,7 +159,7 @@
}
private void checkTypedReadQueryObjectWithValidate(
- BigQueryIO.TypedRead read,
+ BigQueryIO.TypedRead<?> read,
String query,
String kmsKey,
String tempDataset,
@@ -226,7 +226,7 @@
@Test
public void testBuildQueryBasedTypedReadSource() {
- BigQueryIO.TypedRead read =
+ BigQueryIO.TypedRead<?> read =
BigQueryIO.readTableRows()
.fromQuery("foo_query")
.withKmsKey("kms_key")
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 1b55843..5c9bea5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -226,7 +226,7 @@
.withMethod(Method.DIRECT_READ);
}
- private void checkTypedReadQueryObject(TypedRead typedRead, String query) {
+ private void checkTypedReadQueryObject(TypedRead<?> typedRead, String query) {
assertNull(typedRead.getTable());
assertEquals(query, typedRead.getQuery().get());
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 118d994..00ffba2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -213,7 +213,7 @@
}
private void checkTypedReadTableObject(
- TypedRead typedRead, String project, String dataset, String table) {
+ TypedRead<?> typedRead, String project, String dataset, String table) {
assertEquals(project, typedRead.getTable().getProjectId());
assertEquals(dataset, typedRead.getTable().getDatasetId());
assertEquals(table, typedRead.getTable().getTableId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index ebe706c..37999aa 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1641,7 +1641,7 @@
.toString();
TableRowWriter<TableRow> writer =
new TableRowWriter<>(filename, SerializableFunctions.identity());
- try (TableRowWriter ignored = writer) {
+ try (TableRowWriter<TableRow> ignored = writer) {
TableRow tableRow = new TableRow().set("name", tableName);
writer.write(tableRow);
}
@@ -1992,7 +1992,7 @@
p.run();
List<String> expectedOptions =
- schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
+ schemaUpdateOptions.stream().map(SchemaUpdateOption::name).collect(Collectors.toList());
for (Job job : fakeJobService.getAllJobs()) {
JobConfigurationLoad configuration = job.getConfiguration().getLoad();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index c159148..e08a24b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -193,7 +193,7 @@
RunnerApi.ParDoPayload parDoPayload =
RunnerApi.ParDoPayload.parseFrom(writeParDo.getSpec().getPayload());
- DoFn pubsubWriter = ParDoTranslation.getDoFn(parDoPayload);
+ DoFn<?, ?> pubsubWriter = ParDoTranslation.getDoFn(parDoPayload);
String idAttributeActual = (String) Whitebox.getInternalState(pubsubWriter, "idAttribute");
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 03fd170..b0eb872 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -62,6 +62,7 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
@@ -743,7 +744,8 @@
BatchableMutationFilterFn testFn =
new BatchableMutationFilterFn(null, null, 10000000, 3 * CELLS_PER_KEY, 1000);
- ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
+ BatchableMutationFilterFn.ProcessContext mockProcessContext =
+ Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
// Capture the outputs.
@@ -797,7 +799,8 @@
BatchableMutationFilterFn testFn =
new BatchableMutationFilterFn(null, null, mutationSize * 3, 1000, 1000);
- ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
+ DoFn<MutationGroup, MutationGroup>.ProcessContext mockProcessContext =
+ Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
// Capture the outputs.
@@ -850,7 +853,8 @@
long mutationSize = MutationSizeEstimator.sizeOf(m(1L));
BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 1000, 1000, 3);
- ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
+ BatchableMutationFilterFn.ProcessContext mockProcessContext =
+ Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
// Capture the outputs.
@@ -888,7 +892,8 @@
BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 0, 0, 0);
- ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
+ BatchableMutationFilterFn.ProcessContext mockProcessContext =
+ Mockito.mock(ProcessContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
// Capture the outputs.
@@ -921,8 +926,10 @@
100, // groupingFactor
null);
- ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
- FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class);
+ GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+ Mockito.mock(ProcessContext.class);
+ GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+ Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
// Capture the outputs.
@@ -982,8 +989,10 @@
3, // groupingFactor
null);
- ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
- FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class);
+ GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+ Mockito.mock(ProcessContext.class);
+ GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+ Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
OutputReceiver<Iterable<MutationGroup>> mockOutputReceiver = mock(OutputReceiver.class);
@@ -1090,8 +1099,10 @@
}
private void testAndVerifyBatches(GatherSortCreateBatchesFn testFn) throws Exception {
- ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class);
- FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class);
+ GatherSortCreateBatchesFn.ProcessContext mockProcessContext =
+ Mockito.mock(ProcessContext.class);
+ GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext =
+ Mockito.mock(FinishBundleContext.class);
when(mockProcessContext.sideInput(any())).thenReturn(getSchema());
// Capture the output at finish bundle..
diff --git a/sdks/java/io/hadoop-file-system/build.gradle b/sdks/java/io/hadoop-file-system/build.gradle
index bb3c05b..f83f318 100644
--- a/sdks/java/io/hadoop-file-system/build.gradle
+++ b/sdks/java/io/hadoop-file-system/build.gradle
@@ -19,7 +19,6 @@
plugins { id 'org.apache.beam.module' }
applyJavaNature(
enableChecker:false,
- ignoreRawtypeErrors:true,
automaticModuleName: 'org.apache.beam.sdk.io.hdfs')
description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop File System"
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
index 8368aff..bdfc94e 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -50,7 +50,7 @@
private static final String CONFIG_KEY_DFS_NAMESERVICES = "dfs.nameservices";
@Override
- public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+ public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
final List<Configuration> configurations =
options.as(HadoopFileSystemOptions.class).getHdfsConfiguration();
if (configurations == null) {
@@ -63,7 +63,7 @@
"The %s currently only supports at most a single Hadoop configuration.",
HadoopFileSystemRegistrar.class.getSimpleName()));
- final ImmutableList.Builder<FileSystem> builder = ImmutableList.builder();
+ final ImmutableList.Builder<FileSystem<?>> builder = ImmutableList.builder();
final Set<String> registeredSchemes = new HashSet<>();
// this will only do zero or one loop
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
index 23e5e06..d1fc7c4 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
@@ -59,8 +59,8 @@
assertThat(
deserializedConfiguration,
Matchers.<Map.Entry<String, String>>containsInAnyOrder(
- new AbstractMap.SimpleEntry("testPropertyA", "A"),
- new AbstractMap.SimpleEntry("testPropertyB", "B"),
- new AbstractMap.SimpleEntry("testPropertyC", "baseC")));
+ new AbstractMap.SimpleEntry<>("testPropertyA", "A"),
+ new AbstractMap.SimpleEntry<>("testPropertyB", "B"),
+ new AbstractMap.SimpleEntry<>("testPropertyC", "baseC")));
}
}
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
index 8f3fa3d..f2f289f 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -57,11 +57,11 @@
assertThat(
options.getHdfsConfiguration().get(0),
Matchers.<Map.Entry<String, String>>contains(
- new AbstractMap.SimpleEntry("propertyA", "A")));
+ new AbstractMap.SimpleEntry<>("propertyA", "A")));
assertThat(
options.getHdfsConfiguration().get(1),
Matchers.<Map.Entry<String, String>>contains(
- new AbstractMap.SimpleEntry("propertyB", "B")));
+ new AbstractMap.SimpleEntry<>("propertyB", "B")));
}
@Test
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
index 52e67c7..f8aaeb6 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
@@ -68,7 +68,7 @@
for (FileSystemRegistrar registrar :
Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
if (registrar instanceof HadoopFileSystemRegistrar) {
- Iterable<FileSystem> fileSystems = registrar.fromOptions(options);
+ Iterable<FileSystem<?>> fileSystems = registrar.fromOptions(options);
assertEquals(
hdfsClusterBaseUri.getScheme(),
((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme());
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 892074d..c2ef750 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -21,6 +21,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.value.AutoValue;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
@@ -559,6 +560,8 @@
return rows;
}
+ // Spotbugs seems to not understand the multi-statement try-with-resources
+ @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION")
private Schema inferBeamSchema() {
DataSource ds = getDataSourceProviderFn().apply(null);
try (Connection conn = ds.getConnection();
@@ -889,6 +892,8 @@
}
@ProcessElement
+ // Spotbugs seems to not understand the nested try-with-resources
+ @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION")
public void processElement(ProcessContext context) throws Exception {
// Only acquire the connection if we need to perform a read.
if (connection == null) {
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java
index cc623a2..72c8b05 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadRowsRegistrar.java
@@ -38,7 +38,7 @@
public static final String URN = "beam:external:java:jdbc:read_rows:v1";
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(URN, JdbcReadRowsRegistrar.Builder.class);
}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java
index 46e363d..39997e0 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteRegistrar.java
@@ -38,7 +38,7 @@
public static final String URN = "beam:external:java:jdbc:write:v1";
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(URN, JdbcWriteRegistrar.Builder.class);
}
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
index ad8e0ae..11434c3 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsRecord.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.jms;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
@@ -32,8 +33,17 @@
private final @Nullable String jmsMessageID;
private final long jmsTimestamp;
private final String jmsCorrelationID;
+
+ // JMS ReplyTo destination is serializable according to the JMS spec even if it doesn't implement
+ // Serializable.
+ @SuppressFBWarnings("SE_BAD_FIELD")
private final @Nullable Destination jmsReplyTo;
+
+ // JMS destination is serializable according to the JMS spec even if it doesn't implement
+ // Serializable.
+ @SuppressFBWarnings("SE_BAD_FIELD")
private final Destination jmsDestination;
+
private final int jmsDeliveryMode;
private final boolean jmsRedelivered;
private final String jmsType;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b7f2c09..13aabc8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -492,8 +492,11 @@
public static final String URN = "beam:external:java:kafka:read:v1";
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
- return ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+ return ImmutableMap.of(
+ URN,
+ (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+ (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
}
/** Parameters class to expose the Read transform to an external SDK. */
@@ -1324,9 +1327,10 @@
}
throw new UnsupportedOperationException(
runner
- + " is not a runner known to be compatible with Kafka exactly-once sink. "
- + "This implementation of exactly-once sink relies on specific checkpoint guarantees. "
- + "Only the runners with known to have compatible checkpoint semantics are allowed.");
+ + " is not a runner known to be compatible with Kafka exactly-once sink. This"
+ + " implementation of exactly-once sink relies on specific checkpoint guarantees."
+ + " Only the runners with known to have compatible checkpoint semantics are"
+ + " allowed.");
}
}
@@ -1417,8 +1421,10 @@
public static final String URN = "beam:external:java:kafka:write:v1";
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
- return ImmutableMap.of(URN, AutoValue_KafkaIO_Write.Builder.class);
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+ return ImmutableMap.of(
+ URN,
+ (Class<KafkaIO.Write.Builder<?, ?>>) (Class<?>) AutoValue_KafkaIO_Write.Builder.class);
}
/** Parameters class to expose the Write transform to an external SDK. */
diff --git a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
index 2f9b26d..75fb73b 100644
--- a/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
+++ b/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java
@@ -164,8 +164,8 @@
private static Schema schema = new Schema.Parser().parse(rawSchema);
@Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
- ImmutableMap.Builder<String, Class<? extends ExternalTransformBuilder>> builder =
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+ ImmutableMap.Builder<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> builder =
ImmutableMap.builder();
builder.put(TEST_PREFIX_URN, PrefixBuilder.class);
builder.put(TEST_MULTI_URN, MultiBuilder.class);