[GOBBLIN-1437] cleaning/refactoring flowConfig/delete and flowExecutions/delete code

Closes #3275 from arjun4084346/deleteKillSemantics
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
index c045c5a..2e26571 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -55,6 +55,8 @@
    * a consumer on the physical executor side. */
   Future<? extends SpecProducer<Spec>> getProducer();
 
+  String VERB_KEY = "Verb";
+
   public static enum Verb {
     ADD(1, "add"),
     UPDATE(2, "update"),
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
index 558827f..eb7dfb4 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -61,4 +61,9 @@
   default Future<?> deserializeAddSpecResponse(String serializedResponse) {
     return new CompletedFuture(serializedResponse, null);
   }
+
+  /** Cancel the job execution identified by {@code jobURI}; {@code properties} may carry executor-specific context for the cancellation. */
+  default Future<?> cancelJob(URI jobURI, Properties properties) {
+    return new CompletedFuture<>(jobURI, null);
+  }
 }
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index 9085d3a..5f1445f 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -45,7 +45,6 @@
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;
 
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY;
 import javax.annotation.concurrent.NotThreadSafe;
 import lombok.extern.slf4j.Slf4j;
 
@@ -105,7 +104,7 @@
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
 
     AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
-        .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name()))
+        .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
         .setProperties(Maps.fromProperties(headers)).build();
 
     log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
@@ -114,6 +113,17 @@
   }
 
   @Override
+  public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
+        .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
+        .setProperties(Maps.fromProperties(properties)).build();
+
+    log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
+
+    return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
+  }
+
+  @Override
   public Future<? extends List<Spec>> listSpecs() {
     throw new UnsupportedOperationException();
   }
@@ -144,7 +154,7 @@
 
       avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
           .setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
-          .setMetadata(ImmutableMap.of(VERB_KEY, verb.name()));
+          .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, verb.name()));
 
       if (jobSpec.getTemplateURI().isPresent()) {
         avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5ca1fed..727c438 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -610,7 +610,7 @@
         props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
         sendCancellationEvent(dagNodeToCancel.getValue());
       }
-      DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
+      DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
     }
 
     private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f790361..cde02b3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -365,38 +365,42 @@
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all executions
     if (spec instanceof FlowSpec) {
+      //Send the dag to the DagManager to stop it.
+      //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
       if (this.dagManager.isPresent()) {
-        //Send the dag to the DagManager.
         _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
         this.dagManager.get().stopDag(spec.getUri());
-      } else {
-        // If DagManager is not enabled, we need to recompile the flow to find the spec producer,
-        // If compilation results is different, it remove request can go to some different spec producer
-        Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
-
-        if (jobExecutionPlanDag.isEmpty()) {
-          _log.warn("Cannot determine an executor to delete Spec: " + spec);
-          return;
-        }
-
-        // Delete all compiled JobSpecs on their respective Executor
-        for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
-          JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-          Spec jobSpec = jobExecutionPlan.getJobSpec();
-          try {
-            SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-            _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
-            producer.deleteSpec(jobSpec.getUri(), headers);
-          } catch (Exception e) {
-            _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
-          }
-        }
       }
+      // We need to recompile the flow to find the spec producer,
+      // If the compilation result is different, the remove request can go to a different spec producer
+      deleteFromExecutor(spec, headers);
     } else {
       throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec);
     }
   }
 
+  private void deleteFromExecutor(Spec spec, Properties headers) {
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (jobExecutionPlanDag.isEmpty()) {
+      _log.warn("Cannot determine an executor to delete Spec: " + spec);
+      return;
+    }
+
+    // Delete all compiled JobSpecs on their respective Executor
+    for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
+      JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+      Spec jobSpec = jobExecutionPlan.getJobSpec();
+      try {
+        SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+        _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
+        producer.deleteSpec(jobSpec.getUri(), headers);
+      } catch (Exception e) {
+        _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
+      }
+    }
+  }
+
   private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
     JobStatusRetriever jobStatusRetriever;
     try {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index f9b8bd9..67ade43 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -106,15 +106,15 @@
     AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
         assertTrue(input -> dagManager.dagManagerThreads[queue3].dagToJobs.containsKey(dagId3), ERROR_MESSAGE);
 
-    // mock delete spec
+    // mock cancel job
     dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group0").setFlowName("flow0")));
     dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group1").setFlowName("flow1")));
     dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group2").setFlowName("flow2")));
 
-    // verify deleteSpec() of specProducer is called once
-    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag1), ERROR_MESSAGE);
-    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag2), ERROR_MESSAGE);
-    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag3), ERROR_MESSAGE);
+    // verify cancelJob() of specProducer is called once
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new CancelPredicate(dag1), ERROR_MESSAGE);
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new CancelPredicate(dag2), ERROR_MESSAGE);
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new CancelPredicate(dag3), ERROR_MESSAGE);
 
     // mock flow cancellation tracking event
     Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", flowExecutionId1,
@@ -165,10 +165,10 @@
     // check the SLA value
     Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
 
-    // verify deleteSpec() of the specProducer is not called once
+    // verify cancelJob() of the specProducer is not called once
     // which means job cancellation was triggered
     try {
-      AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+      AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new CancelPredicate(dag), ERROR_MESSAGE);
     } catch (TimeoutException e) {
       AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
           assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
@@ -209,9 +209,9 @@
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
         assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
 
-    // verify deleteSpec() of specProducer is called once
+    // verify cancelJob() of specProducer is called once
     // which means job cancellation was triggered
-    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new CancelPredicate(dag), ERROR_MESSAGE);
 
     // check removal of dag from dagToSLA map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -248,9 +248,9 @@
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
         assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
 
-    // verify deleteSpec() of specProducer is called once
+    // verify cancelJob() of specProducer is called once
     // which means job cancellation was triggered
-    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+    AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new CancelPredicate(dag), ERROR_MESSAGE);
 
     // check removal of dag from dagToSLA map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -277,16 +277,16 @@
   }
 }
 
-class DeletePredicate implements Predicate<Void> {
+class CancelPredicate implements Predicate<Void> {
   private final Dag<JobExecutionPlan> dag;
-  public DeletePredicate(Dag<JobExecutionPlan> dag) {
+  public CancelPredicate(Dag<JobExecutionPlan> dag) {
     this.dag = dag;
   }
 
   @Override
   public boolean apply(@Nullable Void input) {
     try {
-      verify(dag.getNodes().get(0).getValue().getSpecExecutor().getProducer().get()).deleteSpec(any(), any());
+      verify(dag.getNodes().get(0).getValue().getSpecExecutor().getProducer().get()).cancelJob(any(), any());
     } catch (Throwable e) {
       return false;
     }