Merge pull request #428 from peihe/disable-unbounded-read
Revert PR-427 to re-enable streaming bounded read
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index d77fc86..d54927d 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -361,15 +361,7 @@
builder.put(View.AsList.class, StreamingViewAsList.class);
builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
- if (options.getExperiments() == null
- || !options.getExperiments().contains("enable_streaming_bounded_read")) {
- builder.put(Read.Bounded.class, UnsupportedIO.class);
- builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
- builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
- builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
- } else {
- builder.put(Read.Bounded.class, StreamingBoundedRead.class);
- }
+ builder.put(Read.Bounded.class, StreamingBoundedRead.class);
builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
builder.put(Window.Bound.class, AssignWindows.class);
// In streaming mode must use either the custom Pubsub unbounded source/sink or
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
index 7ad99db..1dac6c4 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
@@ -46,8 +46,6 @@
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.AvroSource;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
@@ -931,33 +929,6 @@
}
@Test
- public void testBoundedSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(
- AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true);
- }
-
- @Test
- public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(
- BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true);
- }
-
- @Test
- public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(AvroIO.Read.from("foo"), "AvroIO.Read", true);
- }
-
- @Test
- public void testTextIOSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true);
- }
-
- @Test
- public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
- testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true);
- }
-
- @Test
public void testReadUnboundedUnsupportedInBatch() throws Exception {
testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
}