[Runtime] Fix checkpoint causing empty results in PipelineServiceExecutorContext (#543) (#544)

* fix checkpoint causing empty results in interactive query
* skip checkpoint when context is PipelineServiceExecutorContext
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-pipeline/src/main/java/com/antgroup/geaflow/runtime/pipeline/runner/PipelineRunner.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-pipeline/src/main/java/com/antgroup/geaflow/runtime/pipeline/runner/PipelineRunner.java
index bcdfc26..c08bae8 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-pipeline/src/main/java/com/antgroup/geaflow/runtime/pipeline/runner/PipelineRunner.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-pipeline/src/main/java/com/antgroup/geaflow/runtime/pipeline/runner/PipelineRunner.java
@@ -106,11 +106,13 @@
 
             Map<Integer, List<ExecutionTask>> vertex2Tasks = ExecutionCycleTaskAssigner.assign(graph);
 
+            // Skip checkpoint if it's a PipelineServiceExecutorContext
+            boolean skipCheckpoint = pipelineExecutorContext instanceof PipelineServiceExecutorContext;
             IExecutionCycle cycle = ExecutionCycleBuilder.buildExecutionCycle(graph, vertex2Tasks,
                 pipelineContext.getConfig(), ExecutionIdGenerator.getInstance().generateId(),
                 pipelineExecutorContext.getPipelineTaskId(), pipelineExecutorContext.getPipelineTaskName(),
                 ExecutionIdGenerator.getInstance().generateId(), pipelineExecutorContext.getDriverId(),
-                pipelineExecutorContext.getDriverIndex());
+                pipelineExecutorContext.getDriverIndex(), skipCheckpoint);
             return CycleSchedulerContextFactory.create(cycle, null);
         });
         return context;
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilder.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilder.java
index 7fc4292..3fffd47 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilder.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilder.java
@@ -45,7 +45,8 @@
                                                       String name,
                                                       long schedulerId,
                                                       String driverId,
-                                                      int driverIndex) {
+                                                      int driverIndex,
+                                                      boolean skipCheckpoint) {
 
         int flyingCount = executionGraph.getCycleGroupMeta().getFlyingCount();
         long iterationCount = executionGraph.getCycleGroupMeta().getIterationCount();
@@ -54,7 +55,7 @@
         for (ExecutionVertexGroup vertexGroup : executionGraph.getVertexGroupMap().values()) {
             ExecutionNodeCycle nodeCycle = buildExecutionCycle(vertexGroup,
                 vertex2Tasks, config, pipelineId, pipelineTaskId, name, schedulerId, driverId, driverIndex);
-            graphCycle.addCycle(nodeCycle);
+            graphCycle.addCycle(nodeCycle, skipCheckpoint);
         }
         return graphCycle;
     }
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionGraphCycle.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionGraphCycle.java
index d59c1e6..01a2c77 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionGraphCycle.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionGraphCycle.java
@@ -68,7 +68,7 @@
         return haLevel;
     }
 
-    public void addCycle(IExecutionCycle cycle) {
+    public void addCycle(IExecutionCycle cycle, boolean skipCheckpoint) {
         if (cycleMap.containsKey(cycle.getCycleId())) {
             throw new GeaflowRuntimeException(String.format("cycle %d already added", cycle.getCycleId()));
         }
@@ -80,8 +80,8 @@
         cycleParents.get(cycle.getCycleId()).addAll(nodeCycle.getVertexGroup().getParentVertexGroupIds());
         cycleChildren.get(cycle.getCycleId()).addAll(nodeCycle.getVertexGroup().getChildrenVertexGroupIds());
 
-        if (iterationCount > 1 && haLevel != HighAvailableLevel.CHECKPOINT
-            && (cycle.getType() == ExecutionCycleType.ITERATION || cycle.getType() == ExecutionCycleType.ITERATION_WITH_AGG)) {
+        if (!skipCheckpoint && (iterationCount > 1 && haLevel != HighAvailableLevel.CHECKPOINT
+            && (cycle.getType() == ExecutionCycleType.ITERATION || cycle.getType() == ExecutionCycleType.ITERATION_WITH_AGG))) {
             haLevel = HighAvailableLevel.CHECKPOINT;
         }
     }
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/ExecutionGraphCycleSchedulerTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/ExecutionGraphCycleSchedulerTest.java
index fdbe4a3..77baf5d 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/ExecutionGraphCycleSchedulerTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/ExecutionGraphCycleSchedulerTest.java
@@ -206,7 +206,7 @@
             1, 10, configuration, "driver_id", 0);
         ExecutionNodeCycle nodeCycle = buildMockIterationNodeCycle(configuration);
 
-        graphCycle.addCycle(nodeCycle);
+        graphCycle.addCycle(nodeCycle, false);
         try {
             ReflectionUtil.setField(graphCycle, "haLevel", CHECKPOINT);
         } catch (Exception e) {
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/context/ExecutionGraphCycleSchedulerContextTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/context/ExecutionGraphCycleSchedulerContextTest.java
index 656d9ad..ecf261d 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/context/ExecutionGraphCycleSchedulerContextTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/context/ExecutionGraphCycleSchedulerContextTest.java
@@ -40,7 +40,7 @@
         parentContext.init(1);
 
         ExecutionNodeCycle iterationCycle = buildPipelineCycle(false, finishIterationId);
-        graph.addCycle(iterationCycle);
+        graph.addCycle(iterationCycle, false);
         CheckpointSchedulerContext iterationContext = new CheckpointSchedulerContext(iterationCycle, parentContext);
         iterationContext.init();
 
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilderTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilderTest.java
index 664ec30..b7ea8b0 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilderTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilderTest.java
@@ -35,7 +35,7 @@
         meta.setFlyingCount(0);
         meta.setGroupType(CycleGroupType.windowed);
 
-        ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0,null, 0);
+        ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0,null, 0, false);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
@@ -45,7 +45,7 @@
         CycleGroupMeta meta = executionGraph.getCycleGroupMeta();
         meta.setIterationCount(0);
         meta.setGroupType(CycleGroupType.windowed);
-        ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0, null, 0);
+        ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0, null, 0, false);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
@@ -62,7 +62,7 @@
         executionGraph.getVertexGroupMap().put(1, vertexGroup);
 
         ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0,  null, 0, null,
-            0);
+            0, false);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
@@ -79,6 +79,6 @@
         executionGraph.getVertexGroupMap().put(1, vertexGroup);
 
         ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0,  null, 0, null,
-            0);
+            0, false);
     }
 }