ExpiredJob Workaround for Selective Update Race Conditions (#1470)

This PR implements a workaround for determining expired jobs
that avoids the cache's selective-update race condition: if a JobConfig
does not exist in the cache, it is read directly from ZooKeeper (ZK).
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 82def48..0b0d7d4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -81,8 +81,8 @@
         if (nextPurgeTime <= currentTime) {
           nextPurgeTime = currentTime + purgeInterval;
           // Find jobs that are ready to be purged
-          Set<String> expiredJobs =
-              TaskUtil.getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext);
+          Set<String> expiredJobs = TaskUtil
+              .getExpiredJobsFromCache(dataProvider, workflowConfig, workflowContext, manager);
           if (!expiredJobs.isEmpty()) {
             expiredJobsMap.put(workflowConfig.getWorkflowId(), expiredJobs);
           }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index fa89cde..890b151 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -789,7 +789,7 @@
    */
   public static Set<String> getExpiredJobsFromCache(
       WorkflowControllerDataProvider workflowControllerDataProvider, WorkflowConfig workflowConfig,
-      WorkflowContext workflowContext) {
+      WorkflowContext workflowContext, HelixManager manager) {
     Set<String> expiredJobs = new HashSet<>();
     Map<String, TaskState> jobStates = workflowContext.getJobStates();
     for (String job : workflowConfig.getJobDag().getAllNodes()) {
@@ -797,6 +797,11 @@
         continue;
       }
       JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(job);
+      // TODO: Temporary solution for cache selective update race conditions
+      if (jobConfig == null) {
+        jobConfig = TaskUtil.getJobConfig(manager, job);
+      }
+
       JobContext jobContext = workflowControllerDataProvider.getJobContext(job);
       TaskState jobState = jobStates.get(job);
       if (isJobExpired(job, jobConfig, jobContext, jobState)) {
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
index a85ba67..c930954 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTaskUtil.java
@@ -98,7 +98,7 @@
     expectedJobs.add(workflowName + "_Job_3");
     Assert.assertEquals(TaskUtil
         .getExpiredJobsFromCache(workflowControllerDataProvider, jobQueue.getWorkflowConfig(),
-            workflowContext), expectedJobs);
+            workflowContext, _manager), expectedJobs);
   }
 
   @Test
@@ -188,7 +188,7 @@
     expectedJobs.add(workflowName + "_Job_8");
     Assert.assertEquals(TaskUtil
         .getExpiredJobsFromCache(workflowControllerDataProvider, workflow.getWorkflowConfig(),
-            workflowContext), expectedJobs);
+            workflowContext, _manager), expectedJobs);
   }
 
   @Test