BEAM-2535: Added the watermark hold functionality from the original PR.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index e1080e5..bf5c0bb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -44,12 +44,14 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -71,6 +73,8 @@
private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
+ private final EvaluationContext evaluationContext;
+
StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
this.delegateFactory =
new ParDoEvaluatorFactory<>(
@@ -92,6 +96,8 @@
CacheBuilder.newBuilder()
.weakValues()
.build(new CleanupSchedulingLoader(evaluationContext));
+
+ this.evaluationContext = evaluationContext;
}
@Override
@@ -146,7 +152,13 @@
application.getTransform().getSchemaInformation(),
application.getTransform().getSideInputMapping());
- return new StatefulParDoEvaluator<>(delegateEvaluator);
+ DirectStepContext stepContext =
+ evaluationContext
+ .getExecutionContext(application, inputBundle.getKey())
+ .getStepContext(evaluationContext.getStepName(application));
+
+ stepContext.stateInternals().commit();
+ return new StatefulParDoEvaluator<>(delegateEvaluator, stepContext);
}
private class CleanupSchedulingLoader
@@ -241,10 +253,14 @@
private final List<TimerData> pushedBackTimers = new ArrayList<>();
private final DirectTimerInternals timerInternals;
+ private final DirectStepContext stepContext; // state used for timer watermark holds
+
public StatefulParDoEvaluator(
- DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) {
+ DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator,
+ DirectStepContext stepContext) {
this.delegateEvaluator = delegateEvaluator;
this.timerInternals = delegateEvaluator.getParDoEvaluator().getStepContext().timerInternals();
+ this.stepContext = stepContext;
}
@Override
@@ -269,6 +284,14 @@
WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
BoundedWindow timerWindow = windowNamespace.getWindow();
delegateEvaluator.onTimer(timer, timerWindow);
+
+ StateTag<WatermarkHoldState> timerTag =
+ StateTags.makeSystemTagInternal(
+ StateTags.watermarkStateInternal(
+ "timer-" + timer.getTimerId(), TimestampCombiner.EARLIEST));
+ stepContext.stateInternals().state(timer.getNamespace(), timerTag).clear();
+ stepContext.stateInternals().commit();
+
if (timerInternals.containsUpdateForTimeBefore(currentInputWatermark)) {
break;
}
@@ -278,15 +301,44 @@
@Override
public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
+
TransformResult<KV<K, InputT>> delegateResult = delegateEvaluator.finishBundle();
+ // Add a watermark hold at the output timestamp of each timer set in this bundle. The
+ // matching "timer-<id>" hold is cleared right after the delegate's onTimer call for it.
+ boolean isTimerDeclared = false;
+ for (TimerData timerData : delegateResult.getTimerUpdate().getSetTimers()) {
+ StateTag<WatermarkHoldState> timerTag =
+ StateTags.makeSystemTagInternal(
+ StateTags.watermarkStateInternal(
+ "timer-" + timerData.getTimerId(), TimestampCombiner.EARLIEST));
+ stepContext
+ .stateInternals()
+ .state(timerData.getNamespace(), timerTag)
+ .add(timerData.getOutputTimestamp());
+ isTimerDeclared = true;
+ }
+
+ CopyOnAccessInMemoryStateInternals state;
+ Instant watermarkHold;
+ if (isTimerDeclared && delegateResult.getState() != null) {
+ state = delegateResult.getState();
+ watermarkHold = stepContext.commitState().getEarliestWatermarkHold();
+ } else if (isTimerDeclared) {
+ state = stepContext.commitState();
+ watermarkHold = state.getEarliestWatermarkHold();
+ } else {
+ state = delegateResult.getState();
+ watermarkHold = delegateResult.getWatermarkHold();
+ }
+
TimerUpdate timerUpdate =
delegateResult.getTimerUpdate().withPushedBackTimers(pushedBackTimers);
pushedBackTimers.clear();
StepTransformResult.Builder<KeyedWorkItem<K, KV<K, InputT>>> regroupedResult =
StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(
- delegateResult.getTransform(), delegateResult.getWatermarkHold())
+ delegateResult.getTransform(), watermarkHold)
.withTimerUpdate(timerUpdate)
- .withState(delegateResult.getState())
+ .withState(state)
.withMetricUpdates(delegateResult.getLogicalMetricUpdates())
.addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index d9e7ac2..8c0872d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -464,8 +464,9 @@
public synchronized WatermarkUpdate refresh() {
Instant oldWatermark = currentWatermark.get();
Instant newWatermark =
- INSTANT_ORDERING.min(
- inputWatermark.get(), inputWatermark.getEarliestTimerTimestamp(), holds.getMinHold());
+ INSTANT_ORDERING.min(
+ inputWatermark.get(), holds.getMinHold(), inputWatermark.getEarliestTimerTimestamp());
+
newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
currentWatermark.set(newWatermark);
return updateAndTrace(getName(), oldWatermark, newWatermark);