[Runtime] Fix checkpoint causing empty results in PipelineServiceExecutorContext (#543) (#544)
* fix checkpoint causing empty results in interactive query
* skip checkpoint when context is PipelineServiceExecutorContext
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-pipeline/src/main/java/com/antgroup/geaflow/runtime/pipeline/runner/PipelineRunner.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-pipeline/src/main/java/com/antgroup/geaflow/runtime/pipeline/runner/PipelineRunner.java
index bcdfc26..c08bae8 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-pipeline/src/main/java/com/antgroup/geaflow/runtime/pipeline/runner/PipelineRunner.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-pipeline/src/main/java/com/antgroup/geaflow/runtime/pipeline/runner/PipelineRunner.java
@@ -106,11 +106,13 @@
Map<Integer, List<ExecutionTask>> vertex2Tasks = ExecutionCycleTaskAssigner.assign(graph);
+ // Skip checkpoint if it's a PipelineServiceExecutorContext
+ boolean skipCheckpoint = pipelineExecutorContext instanceof PipelineServiceExecutorContext;
IExecutionCycle cycle = ExecutionCycleBuilder.buildExecutionCycle(graph, vertex2Tasks,
pipelineContext.getConfig(), ExecutionIdGenerator.getInstance().generateId(),
pipelineExecutorContext.getPipelineTaskId(), pipelineExecutorContext.getPipelineTaskName(),
ExecutionIdGenerator.getInstance().generateId(), pipelineExecutorContext.getDriverId(),
- pipelineExecutorContext.getDriverIndex());
+ pipelineExecutorContext.getDriverIndex(), skipCheckpoint);
return CycleSchedulerContextFactory.create(cycle, null);
});
return context;
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilder.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilder.java
index 7fc4292..3fffd47 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilder.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilder.java
@@ -45,7 +45,8 @@
String name,
long schedulerId,
String driverId,
- int driverIndex) {
+ int driverIndex,
+ boolean skipCheckpoint) {
int flyingCount = executionGraph.getCycleGroupMeta().getFlyingCount();
long iterationCount = executionGraph.getCycleGroupMeta().getIterationCount();
@@ -54,7 +55,7 @@
for (ExecutionVertexGroup vertexGroup : executionGraph.getVertexGroupMap().values()) {
ExecutionNodeCycle nodeCycle = buildExecutionCycle(vertexGroup,
vertex2Tasks, config, pipelineId, pipelineTaskId, name, schedulerId, driverId, driverIndex);
- graphCycle.addCycle(nodeCycle);
+ graphCycle.addCycle(nodeCycle, skipCheckpoint);
}
return graphCycle;
}
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionGraphCycle.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionGraphCycle.java
index d59c1e6..01a2c77 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionGraphCycle.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/main/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionGraphCycle.java
@@ -68,7 +68,7 @@
return haLevel;
}
- public void addCycle(IExecutionCycle cycle) {
+ public void addCycle(IExecutionCycle cycle, boolean skipCheckpoint) {
if (cycleMap.containsKey(cycle.getCycleId())) {
throw new GeaflowRuntimeException(String.format("cycle %d already added", cycle.getCycleId()));
}
@@ -80,8 +80,8 @@
cycleParents.get(cycle.getCycleId()).addAll(nodeCycle.getVertexGroup().getParentVertexGroupIds());
cycleChildren.get(cycle.getCycleId()).addAll(nodeCycle.getVertexGroup().getChildrenVertexGroupIds());
- if (iterationCount > 1 && haLevel != HighAvailableLevel.CHECKPOINT
- && (cycle.getType() == ExecutionCycleType.ITERATION || cycle.getType() == ExecutionCycleType.ITERATION_WITH_AGG)) {
+ if (!skipCheckpoint && (iterationCount > 1 && haLevel != HighAvailableLevel.CHECKPOINT
+ && (cycle.getType() == ExecutionCycleType.ITERATION || cycle.getType() == ExecutionCycleType.ITERATION_WITH_AGG))) {
haLevel = HighAvailableLevel.CHECKPOINT;
}
}
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/ExecutionGraphCycleSchedulerTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/ExecutionGraphCycleSchedulerTest.java
index fdbe4a3..77baf5d 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/ExecutionGraphCycleSchedulerTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/ExecutionGraphCycleSchedulerTest.java
@@ -206,7 +206,7 @@
1, 10, configuration, "driver_id", 0);
ExecutionNodeCycle nodeCycle = buildMockIterationNodeCycle(configuration);
- graphCycle.addCycle(nodeCycle);
+ graphCycle.addCycle(nodeCycle, false);
try {
ReflectionUtil.setField(graphCycle, "haLevel", CHECKPOINT);
} catch (Exception e) {
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/context/ExecutionGraphCycleSchedulerContextTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/context/ExecutionGraphCycleSchedulerContextTest.java
index 656d9ad..ecf261d 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/context/ExecutionGraphCycleSchedulerContextTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/context/ExecutionGraphCycleSchedulerContextTest.java
@@ -40,7 +40,7 @@
parentContext.init(1);
ExecutionNodeCycle iterationCycle = buildPipelineCycle(false, finishIterationId);
- graph.addCycle(iterationCycle);
+ graph.addCycle(iterationCycle, false);
CheckpointSchedulerContext iterationContext = new CheckpointSchedulerContext(iterationCycle, parentContext);
iterationContext.init();
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilderTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilderTest.java
index 664ec30..b7ea8b0 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilderTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/src/test/java/com/antgroup/geaflow/runtime/core/scheduler/cycle/ExecutionCycleBuilderTest.java
@@ -35,7 +35,7 @@
meta.setFlyingCount(0);
meta.setGroupType(CycleGroupType.windowed);
- ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0,null, 0);
+ ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0,null, 0, false);
}
@Test(expectedExceptions = IllegalArgumentException.class,
@@ -45,7 +45,7 @@
CycleGroupMeta meta = executionGraph.getCycleGroupMeta();
meta.setIterationCount(0);
meta.setGroupType(CycleGroupType.windowed);
- ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0, null, 0);
+ ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0, null, 0, false);
}
@Test(expectedExceptions = IllegalArgumentException.class,
@@ -62,7 +62,7 @@
executionGraph.getVertexGroupMap().put(1, vertexGroup);
ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0, null,
- 0);
+ 0, false);
}
@Test(expectedExceptions = IllegalArgumentException.class,
@@ -79,6 +79,6 @@
executionGraph.getVertexGroupMap().put(1, vertexGroup);
ExecutionCycleBuilder.buildExecutionCycle(executionGraph, null, null, 0, 0, null, 0, null,
- 0);
+ 0, false);
}
}