Beam-2535 : Added original PR watermark hold functionality.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index e1080e5..bf5c0bb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -44,12 +44,14 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -71,6 +73,8 @@
 
   private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;
 
+  private final EvaluationContext evaluationContext;
+
   StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
     this.delegateFactory =
         new ParDoEvaluatorFactory<>(
@@ -92,6 +96,8 @@
         CacheBuilder.newBuilder()
             .weakValues()
             .build(new CleanupSchedulingLoader(evaluationContext));
+
+    this.evaluationContext = evaluationContext;
   }
 
   @Override
@@ -146,7 +152,13 @@
             application.getTransform().getSchemaInformation(),
             application.getTransform().getSideInputMapping());
 
-    return new StatefulParDoEvaluator<>(delegateEvaluator);
+    DirectStepContext stepContext =
+            evaluationContext
+                    .getExecutionContext(application, inputBundle.getKey())
+                    .getStepContext(evaluationContext.getStepName(application));
+
+    stepContext.stateInternals().commit();
+    return new StatefulParDoEvaluator<>(delegateEvaluator, stepContext);
   }
 
   private class CleanupSchedulingLoader
@@ -241,10 +253,13 @@
     private final List<TimerData> pushedBackTimers = new ArrayList<>();
     private final DirectTimerInternals timerInternals;
 
+    DirectStepContext stepContext;
+
     public StatefulParDoEvaluator(
-        DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) {
+        DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator, DirectStepContext stepContext) {
       this.delegateEvaluator = delegateEvaluator;
       this.timerInternals = delegateEvaluator.getParDoEvaluator().getStepContext().timerInternals();
+      this.stepContext = stepContext;
     }
 
     @Override
@@ -269,6 +284,14 @@
         WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
         BoundedWindow timerWindow = windowNamespace.getWindow();
         delegateEvaluator.onTimer(timer, timerWindow);
+
+        StateTag<WatermarkHoldState> timerTag =
+                StateTags.makeSystemTagInternal(
+                        StateTags.watermarkStateInternal(
+                                "timer-" + timer.getTimerId(), TimestampCombiner.EARLIEST));
+        stepContext.stateInternals().state(timer.getNamespace(), timerTag).clear();
+        stepContext.stateInternals().commit();
+
         if (timerInternals.containsUpdateForTimeBefore(currentInputWatermark)) {
           break;
         }
@@ -278,15 +301,44 @@
 
     @Override
     public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
+
       TransformResult<KV<K, InputT>> delegateResult = delegateEvaluator.finishBundle();
+      Boolean isTimerDeclared = false;
+      for (TimerData timerData : delegateResult.getTimerUpdate().getSetTimers()) {
+        StateTag<WatermarkHoldState> timerTag =
+                StateTags.makeSystemTagInternal(
+                        StateTags.watermarkStateInternal(
+                                "timer-" + timerData.getTimerId(), TimestampCombiner.EARLIEST));
+
+        stepContext
+                .stateInternals()
+                .state(timerData.getNamespace(), timerTag)
+                .add(timerData.getOutputTimestamp());
+        isTimerDeclared = true;
+      }
+
+      CopyOnAccessInMemoryStateInternals state;
+      Instant watermarkHold;
+
+      if(isTimerDeclared && delegateResult.getState() != null){
+        state = delegateResult.getState();
+        watermarkHold = stepContext.commitState().getEarliestWatermarkHold();
+      } else if(isTimerDeclared){
+        state = stepContext.commitState();
+        watermarkHold = state.getEarliestWatermarkHold();
+      }else {
+        state = delegateResult.getState();
+        watermarkHold = delegateResult.getWatermarkHold();
+      }
+
       TimerUpdate timerUpdate =
           delegateResult.getTimerUpdate().withPushedBackTimers(pushedBackTimers);
       pushedBackTimers.clear();
       StepTransformResult.Builder<KeyedWorkItem<K, KV<K, InputT>>> regroupedResult =
           StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(
-                  delegateResult.getTransform(), delegateResult.getWatermarkHold())
+                  delegateResult.getTransform(), watermarkHold)
               .withTimerUpdate(timerUpdate)
-              .withState(delegateResult.getState())
+              .withState(state)
               .withMetricUpdates(delegateResult.getLogicalMetricUpdates())
               .addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index d9e7ac2..8c0872d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -464,8 +464,9 @@
     public synchronized WatermarkUpdate refresh() {
       Instant oldWatermark = currentWatermark.get();
       Instant newWatermark =
-          INSTANT_ORDERING.min(
-              inputWatermark.get(), inputWatermark.getEarliestTimerTimestamp(), holds.getMinHold());
+              INSTANT_ORDERING.min(
+                      inputWatermark.get(), holds.getMinHold(), inputWatermark.getEarliestTimerTimestamp());
+
       newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
       currentWatermark.set(newWatermark);
       return updateAndTrace(getName(), oldWatermark, newWatermark);