Add workflow garbage collector  (#705)

The workflow garbage collection is added which scans all the workflow context,
and delete them if workflow config does not exists.
Test is added which checks the functionality of workflow garbage collection.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 7b7bc03..9aca3ef 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -51,5 +51,7 @@
         }
       }
     }
+
+    TaskUtil.workflowGarbageCollection(dataProvider, manager);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index aa24c4d..5c93469 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -37,6 +37,7 @@
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.ResourceConfig;
@@ -1036,6 +1037,49 @@
     setNextJobPurgeTime(workflow, currentTime, purgeInterval, rebalanceScheduler, manager);
   }
 
+  /**
+   * The function that loops through the all existing workflow contexts and removes IdealState and
+   * workflow context of the workflow whose workflow config does not exist.
+   * @param dataProvider
+   * @param manager
+   */
+  public static void workflowGarbageCollection(WorkflowControllerDataProvider dataProvider,
+      final HelixManager manager) {
+    // Garbage collections for conditions where workflow context exists but config is missing.
+    Map<String, ZNRecord> contexts = dataProvider.getContexts();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();
+
+    Set<String> toBeDeletedWorkflows = new HashSet<>();
+    for (Map.Entry<String, ZNRecord> entry : contexts.entrySet()) {
+      if (entry.getValue() != null
+          && entry.getValue().getId().equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
+        if (dataProvider.getWorkflowConfig(entry.getKey()) == null) {
+          toBeDeletedWorkflows.add(entry.getKey());
+        }
+      }
+    }
+
+    for (String workflowName : toBeDeletedWorkflows) {
+      LOG.warn(String.format(
+          "WorkflowContext exists for workflow %s. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
+          workflowName));
+
+      if (!cleanupWorkflowIdealStateExtView(accessor, workflowName)) {
+        LOG.warn(String.format(
+            "Error occurred while trying to remove workflow idealstate/externalview for %s.",
+            workflowName));
+        continue;
+      }
+
+      if (!removeWorkflowContext(propertyStore, workflowName)) {
+        LOG.warn(String.format("Error occurred while trying to remove workflow context for %s.",
+            workflowName));
+        continue;
+      }
+    }
+  }
+
   private static void setNextJobPurgeTime(String workflow, long currentTime, long purgeInterval,
       RebalanceScheduler rebalanceScheduler, HelixManager manager) {
     long nextPurgeTime = currentTime + purgeInterval;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
index 6bff49a..ad75e15 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowContextWithoutConfig.java
@@ -23,6 +23,7 @@
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.TaskState;
@@ -37,6 +38,7 @@
 
 /**
  * Test to check workflow context is not created without workflow config.
+ * Test workflow context will be deleted if workflow config has been removed.
  */
 public class TestWorkflowContextWithoutConfig extends TaskTestBase {
   private HelixAdmin _admin;
@@ -49,36 +51,8 @@
 
   @Test
   public void testWorkflowContextWithoutConfig() throws Exception {
-    final long expiryTime = 5000L;
     String workflowName1 = TestHelper.getTestMethodName() + "_1";
-    Workflow.Builder builder1 = new Workflow.Builder(workflowName1);
-
-    // Workflow DAG Schematic:
-    //          JOB0
-    //           /\
-    //          /  \
-    //         /    \
-    //       JOB1   JOB2
-
-    JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setMaxAttemptsPerTask(1).setWorkflow(workflowName1)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-
-    JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setMaxAttemptsPerTask(1).setWorkflow(workflowName1)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-
-    JobConfig.Builder jobBuilder3 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setMaxAttemptsPerTask(1).setWorkflow(workflowName1)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
-
-    builder1.addParentChildDependency("JOB0", "JOB1");
-    builder1.addParentChildDependency("JOB0", "JOB2");
-    builder1.addJob("JOB0", jobBuilder1);
-    builder1.addJob("JOB1", jobBuilder2);
-    builder1.addJob("JOB2", jobBuilder3);
-    builder1.setExpiry(expiryTime);
-
+    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName1);
     _driver.start(builder1.build());
 
     // Wait until workflow is created and IN_PROGRESS state.
@@ -105,7 +79,7 @@
       WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName1);
       IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName1);
       return (wCtx == null && wCfg == null && idealState == null);
-    }, 30 * 1000);
+    }, TestHelper.WAIT_DURATION);
     Assert.assertTrue(workflowExpired);
 
     // Write idealState to ZooKeeper
@@ -115,8 +89,7 @@
     // Create and start a new workflow just to make sure pipeline runs several times and context
     // will not be created for workflow1 again
     String workflowName2 = TestHelper.getTestMethodName() + "_2";
-    Workflow.Builder builder2 = new Workflow.Builder(workflowName2);
-    builder2.addJob("JOB0", jobBuilder1);
+    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
     _driver.start(builder2.build());
     _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
 
@@ -126,7 +99,106 @@
       WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName1);
       IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName1);
       return (wCtx == null && wCfg == null && idealState != null);
-    }, 30 * 1000);
+    }, TestHelper.WAIT_DURATION);
     Assert.assertTrue(workflowContextNotCreated);
   }
+
+  @Test
+  public void testWorkflowContextGarbageCollection() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder1 = createSimpleWorkflowBuilder(workflowName);
+    _driver.start(builder1.build());
+
+    // Wait until workflow is created and IN_PROGRESS state.
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this
+    // workflow
+    Assert.assertNotNull(_driver.getWorkflowConfig(workflowName));
+    Assert.assertNotNull(_driver.getWorkflowContext(workflowName));
+    Assert.assertNotNull(_admin.getResourceIdealState(CLUSTER_NAME, workflowName));
+
+    String workflowContextPath =
+        "/" + CLUSTER_NAME + "/PROPERTYSTORE/TaskRebalancer/" + workflowName + "/Context";
+
+    ZNRecord record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+        null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(record);
+
+    // Wait until workflow is completed.
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    // Verify that WorkflowConfig, WorkflowContext, and IdealState are removed after workflow got
+    // expired.
+    boolean workflowExpired = TestHelper.verify(() -> {
+      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+      WorkflowConfig wCfg = _driver.getWorkflowConfig(workflowName);
+      IdealState idealState = _admin.getResourceIdealState(CLUSTER_NAME, workflowName);
+      return (wCtx == null && wCfg == null && idealState == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(workflowExpired);
+
+
+    _controller.syncStop();
+
+    // Write workflow context to ZooKeeper
+    _manager.getHelixDataAccessor().getBaseDataAccessor().set(workflowContextPath, record,
+        AccessOption.PERSISTENT);
+
+    // Verify context is written back to ZK.
+    record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowContextPath,
+        null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(record);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Create and start new workflow just to make sure controller is running and new workflow is
+    // scheduled successfully.
+    String workflowName2 = TestHelper.getTestMethodName() + "_2";
+    Workflow.Builder builder2 = createSimpleWorkflowBuilder(workflowName2);
+    _driver.start(builder2.build());
+    _driver.pollForWorkflowState(workflowName2, TaskState.COMPLETED);
+
+    // Verify that WorkflowContext will be deleted
+    boolean contextDeleted = TestHelper.verify(() -> {
+      WorkflowContext wCtx = _driver.getWorkflowContext(workflowName);
+      return (wCtx == null);
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(contextDeleted);
+  }
+
+  Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
+    final long expiryTime = 5000L;
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+
+    // Workflow DAG Schematic:
+    //          JOB0
+    //           /\
+    //          /  \
+    //         /    \
+    //       JOB1   JOB2
+
+    JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    JobConfig.Builder jobBuilder3 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"));
+
+    builder.addParentChildDependency("JOB0", "JOB1");
+    builder.addParentChildDependency("JOB0", "JOB2");
+    builder.addJob("JOB0", jobBuilder1);
+    builder.addJob("JOB1", jobBuilder2);
+    builder.addJob("JOB2", jobBuilder3);
+    builder.setExpiry(expiryTime);
+    return builder;
+  }
 }