[NEMO-377] Fix watermark emission when there are no outputs in GBKWindowTransform (#210)
JIRA: [NEMO-377: Fix watermark emission when there are no outputs in GBKWindowTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-377)
**Major changes:**
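- Use the minimum value (Long.MIN_VALUE) as the watermark hold when data has been received but there are no outputs in GroupByKeyAndWindowDoFnTransform, so that the input watermark is not emitted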
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index cf75cb4..a7c815f 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -53,6 +53,7 @@
private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
private Watermark prevOutputWatermark;
private final Map<K, Watermark> keyAndWatermarkHoldMap;
+ private boolean dataReceived = false;
/**
* GroupByKey constructor.
@@ -123,6 +124,7 @@
@Override
public void onData(final WindowedValue<KV<K, InputT>> element) {
checkAndInvokeBundle();
+ dataReceived = true;
// We can call Beam's DoFnRunner#processElement here,
// but it may generate some overheads if we call the method for each data.
@@ -178,7 +180,8 @@
private void emitOutputWatermark(final Watermark inputWatermark) {
// Find min watermark hold
final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
- ? new Watermark(Long.MAX_VALUE) // set this to MAX, in order to just use the input watermark.
+ ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+ // MIN once data has been received (so the input watermark is not emitted when there are no outputs), else MAX.
: Collections.min(keyAndWatermarkHoldMap.values());
final Watermark outputWatermarkCandidate = new Watermark(
Math.max(prevOutputWatermark.getTimestamp(),
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index 4e51525..1af392c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -173,13 +173,7 @@
doFnTransform.onWatermark(watermark2);
assertEquals(0, oc.outputs.size()); // do not emit anything
- assertEquals(1, oc.watermarks.size());
-
- // check output watermark
- assertEquals(1400,
- oc.watermarks.get(0).getTimestamp());
-
- oc.watermarks.clear();
+ assertEquals(0, oc.watermarks.size());
doFnTransform.onData(WindowedValue.of(
KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));