Fixes a bug in custom unbounded readers
Custom unbounded readers are read in bundles of at most
10k elements or 10 seconds. A recent change accidentally removed
the 10k element limit. This change reintroduces it and
adds a test.
The previous test also was passing vacuously because
the iteration limit was incorrect (it would always
have only one iteration).
----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=112723469
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
index 5dbeaa0..f159f66 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
@@ -546,14 +546,16 @@
}
}
+ // Commit at least once every 10 seconds or 10k records. This keeps the watermark advancing
+ // smoothly, and ensures that not too much work will have to be reprocessed in the event of
+ // a crash.
+ @VisibleForTesting
+ static final int MAX_UNBOUNDED_BUNDLE_SIZE = 10000;
+ @VisibleForTesting
+ static final Duration MAX_UNBOUNDED_BUNDLE_READ_TIME = Duration.standardSeconds(10);
+
private static class UnboundedReaderIterator<T>
extends NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
- // Commit at least once every 10 seconds or 10k records. This keeps the watermark advancing
- // smoothly, and ensures that not too much work will have to be reprocessed in the event of
- // a crash.
- private static final int MAX_BUNDLE_SIZE = 10000;
- private static final Duration MAX_BUNDLE_READ_TIME = Duration.standardSeconds(10);
-
private final UnboundedSource.UnboundedReader<T> reader;
private final boolean started;
private final Instant endTime;
@@ -561,7 +563,7 @@
private UnboundedReaderIterator(UnboundedSource.UnboundedReader<T> reader, boolean started) {
this.reader = reader;
- this.endTime = Instant.now().plus(MAX_BUNDLE_READ_TIME);
+ this.endTime = Instant.now().plus(MAX_UNBOUNDED_BUNDLE_READ_TIME);
this.elemsRead = 0;
this.started = started;
}
@@ -588,7 +590,7 @@
@Override
public boolean advance() throws IOException {
- if (elemsRead >= MAX_BUNDLE_SIZE
+ if (elemsRead >= MAX_UNBOUNDED_BUNDLE_SIZE
|| Instant.now().isAfter(endTime)) {
return false;
}
@@ -598,6 +600,7 @@
while (true) {
try {
if (reader.advance()) {
+ elemsRead++;
return true;
}
} catch (Exception e) {
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
index 15256df..b4fbb3b 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSourcesTest.java
@@ -32,6 +32,7 @@
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -632,7 +633,8 @@
options.setNumWorkers(5);
ByteString state = ByteString.EMPTY;
- for (int i = 0; i < 100; /* Incremented in inner loop */) {
+ for (int i = 0; i < 10 * CustomSources.MAX_UNBOUNDED_BUNDLE_SIZE;
+ /* Incremented in inner loop */) {
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> value;
// Initialize streaming context with state from previous iteration.
@@ -664,6 +666,8 @@
iterator = reader.iterator();
// Verify data.
+ Instant beforeReading = Instant.now();
+ int numReadOnThisIteration = 0;
for (boolean more = iterator.start(); more; more = iterator.advance()) {
value = iterator.getCurrent();
assertEquals(KV.of(0, i), value.getValue().getValue());
@@ -673,7 +677,14 @@
assertThat(value.getWindows(), contains((BoundedWindow) GlobalWindow.INSTANCE));
assertEquals(i, value.getTimestamp().getMillis());
i++;
+ numReadOnThisIteration++;
}
+ Instant afterReading = Instant.now();
+ assertThat(
+ new Duration(beforeReading, afterReading).getStandardSeconds(),
+ lessThanOrEqualTo(CustomSources.MAX_UNBOUNDED_BUNDLE_READ_TIME.getStandardSeconds() + 1));
+ assertThat(
+ numReadOnThisIteration, lessThanOrEqualTo(CustomSources.MAX_UNBOUNDED_BUNDLE_SIZE));
// Extract and verify state modifications.
context.flushState();