Merge pull request #428 from peihe/disable-unbounded-read

Revert PR #427 to re-enable bounded reads in streaming mode
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index d77fc86..d54927d 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -361,15 +361,7 @@
       builder.put(View.AsList.class, StreamingViewAsList.class);
       builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
       builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
-      if (options.getExperiments() == null
-          || !options.getExperiments().contains("enable_streaming_bounded_read")) {
-        builder.put(Read.Bounded.class, UnsupportedIO.class);
-        builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
-        builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
-        builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
-      } else {
-        builder.put(Read.Bounded.class, StreamingBoundedRead.class);
-      }
+      builder.put(Read.Bounded.class, StreamingBoundedRead.class);
       builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
       builder.put(Window.Bound.class, AssignWindows.class);
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
index 7ad99db..1dac6c4 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
@@ -46,8 +46,6 @@
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
 import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.AvroSource;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
@@ -931,33 +929,6 @@
   }
 
   @Test
-  public void testBoundedSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(
-        AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true);
-  }
-
-  @Test
-  public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(
-        BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true);
-  }
-
-  @Test
-  public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(AvroIO.Read.from("foo"), "AvroIO.Read", true);
-  }
-
-  @Test
-  public void testTextIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true);
-  }
-
-  @Test
-  public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true);
-  }
-
-  @Test
   public void testReadUnboundedUnsupportedInBatch() throws Exception {
     testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
   }