ExpiredJob Workaround for Selective Update Race Conditions (#1470)
This PR implements a workaround for determining expired jobs
that avoids selective update race condition: if JobConfig doesn't
exist in the cache, check ZK directly.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 82def48..0b0d7d4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -81,8 +81,8 @@
if (nextPurgeTime <= currentTime) {
nextPurgeTime = currentTime + purgeInterval;
// Find jobs that are ready to be purged
- Set<String> expiredJobs =
- TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
+ Set<String> expiredJobs = TaskUtil
+ .getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext, manager);
if (!expiredJobs.isEmpty()) {
expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index fa89cde..890b151 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -789,7 +789,7 @@
*/
public static Set<String> getExpiredJobsFromCache(
WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
- WorkflowContext workflowContext) {
+ WorkflowContext workflowContext, HelixManager manager) {
Set<String> expiredJobs = new HashSet<>();
Map<String, TaskState> jobStates = workflowContext.getJobStates();
for (String job : workflowConfig.getJobDag().getAllNodes()) {
@@ -797,6 +797,11 @@
continue;
}
JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+ // TODO: Temporary solution for cache selective update race conditions
+ if (jobConfig == null) {
+ jobConfig = TaskUtil.getJobConfig(manager, job);
+ }
+
JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
TaskState jobState = jobStates.get(job);
if (isJobExpired(job, jobConfig, jobContext, jobState)) {
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
index a85ba67..c930954 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -98,7 +98,7 @@
expectedJobs.add(workflowName + "_Job_3");
Assert.assertEquals(TaskUtil
.getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
- workflowContext), expectedJobs);
+ workflowContext, _manager), expectedJobs);
}
@Test
@@ -188,7 +188,7 @@
expectedJobs.add(workflowName + "_Job_8");
Assert.assertEquals(TaskUtil
.getExpiredJobsFromCache(workflowControllerDataProvider, workflow.getWorkflowConfig(),
- workflowContext), expectedJobs);
+ workflowContext, _manager), expectedJobs);
}
@Test