Fixes a bug in custom unbounded readers

Custom unbounded readers are read in bundles of at most
10k elements or 10 seconds. A recent change accidentally removed
the 10k element limit. This change reintroduces it and
adds a test.

The previous test also was passing vacuously because
the iteration limit was incorrect (it would always
have only one iteration).
----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=112723469
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
index 5dbeaa0..f159f66 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
@@ -546,14 +546,16 @@
     }
   }
 
+  // Commit at least once every 10 seconds or 10k records.  This keeps the watermark advancing
+  // smoothly, and ensures that not too much work will have to be reprocessed in the event of
+  // a crash.
+  @VisibleForTesting
+  static final int MAX_UNBOUNDED_BUNDLE_SIZE = 10000;
+  @VisibleForTesting
+  static final Duration MAX_UNBOUNDED_BUNDLE_READ_TIME = Duration.standardSeconds(10);
+
   private static class UnboundedReaderIterator<T>
       extends NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
-    // Commit at least once every 10 seconds or 10k records.  This keeps the watermark advancing
-    // smoothly, and ensures that not too much work will have to be reprocessed in the event of
-    // a crash.
-    private static final int MAX_BUNDLE_SIZE = 10000;
-    private static final Duration MAX_BUNDLE_READ_TIME = Duration.standardSeconds(10);
-
     private final UnboundedSource.UnboundedReader<T> reader;
     private final boolean started;
     private final Instant endTime;
@@ -561,7 +563,7 @@
 
     private UnboundedReaderIterator(UnboundedSource.UnboundedReader<T> reader, boolean started) {
       this.reader = reader;
-      this.endTime = Instant.now().plus(MAX_BUNDLE_READ_TIME);
+      this.endTime = Instant.now().plus(MAX_UNBOUNDED_BUNDLE_READ_TIME);
       this.elemsRead = 0;
       this.started = started;
     }
@@ -588,7 +590,7 @@
 
     @Override
     public boolean advance() throws IOException {
-      if (elemsRead >= MAX_BUNDLE_SIZE
+      if (elemsRead >= MAX_UNBOUNDED_BUNDLE_SIZE
           || Instant.now().isAfter(endTime)) {
         return false;
       }
@@ -598,6 +600,7 @@
       while (true) {
         try {
           if (reader.advance()) {
+            elemsRead++;
             return true;
           }
         } catch (Exception e) {
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
index 15256df..b4fbb3b 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
@@ -32,6 +32,7 @@
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -632,7 +633,8 @@
     options.setNumWorkers(5);
 
     ByteString state = ByteString.EMPTY;
-    for (int i = 0; i < 100; /* Incremented in inner loop */) {
+    for (int i = 0; i < 10 * CustomSources.MAX_UNBOUNDED_BUNDLE_SIZE;
+         /* Incremented in inner loop */) {
       WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> value;
 
       // Initialize streaming context with state from previous iteration.
@@ -664,6 +666,8 @@
           iterator = reader.iterator();
 
       // Verify data.
+      Instant beforeReading = Instant.now();
+      int numReadOnThisIteration = 0;
       for (boolean more = iterator.start(); more; more = iterator.advance()) {
         value = iterator.getCurrent();
         assertEquals(KV.of(0, i), value.getValue().getValue());
@@ -673,7 +677,14 @@
         assertThat(value.getWindows(), contains((BoundedWindow) GlobalWindow.INSTANCE));
         assertEquals(i, value.getTimestamp().getMillis());
         i++;
+        numReadOnThisIteration++;
       }
+      Instant afterReading = Instant.now();
+      assertThat(
+          new Duration(beforeReading, afterReading).getStandardSeconds(),
+          lessThanOrEqualTo(CustomSources.MAX_UNBOUNDED_BUNDLE_READ_TIME.getStandardSeconds() + 1));
+      assertThat(
+          numReadOnThisIteration, lessThanOrEqualTo(CustomSources.MAX_UNBOUNDED_BUNDLE_SIZE));
 
       // Extract and verify state modifications.
       context.flushState();