[GOBBLIN-1437] cleaning/refactoring flowConfig/delete and flowExecutions/delete code
Closes #3275 from arjun4084346/deleteKillSemantics
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
index c045c5a..2e26571 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -55,6 +55,8 @@
* a consumer on the physical executor side. */
Future<? extends SpecProducer<Spec>> getProducer();
+ String VERB_KEY = "Verb";
+
public static enum Verb {
ADD(1, "add"),
UPDATE(2, "update"),
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
index 558827f..eb7dfb4 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -61,4 +61,9 @@
default Future<?> deserializeAddSpecResponse(String serializedResponse) {
return new CompletedFuture(serializedResponse, null);
}
+
+ /** Cancel the job execution identified by jobURI */
+ default Future<?> cancelJob(URI jobURI, Properties properties) {
+ return new CompletedFuture<>(jobURI, null);
+ }
}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index 9085d3a..5f1445f 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -45,7 +45,6 @@
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
@@ -105,7 +104,7 @@
public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
- .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name()))
+ .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
.setProperties(Maps.fromProperties(headers)).build();
log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
@@ -114,6 +113,17 @@
}
@Override
+ public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
+ AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
+ .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
+ .setProperties(Maps.fromProperties(properties)).build();
+
+ log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
+
+ return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
+ }
+
+ @Override
public Future<? extends List<Spec>> listSpecs() {
throw new UnsupportedOperationException();
}
@@ -144,7 +154,7 @@
avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
.setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
- .setMetadata(ImmutableMap.of(VERB_KEY, verb.name()));
+ .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, verb.name()));
if (jobSpec.getTemplateURI().isPresent()) {
avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 5ca1fed..727c438 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -610,7 +610,7 @@
props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
sendCancellationEvent(dagNodeToCancel.getValue());
}
- DagManagerUtils.getSpecProducer(dagNodeToCancel).deleteSpec(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
+ DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
}
private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f790361..cde02b3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -365,38 +365,42 @@
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all executions
if (spec instanceof FlowSpec) {
+ //Send the dag to the DagManager to stop it.
+ //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
if (this.dagManager.isPresent()) {
- //Send the dag to the DagManager.
_log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
this.dagManager.get().stopDag(spec.getUri());
- } else {
- // If DagManager is not enabled, we need to recompile the flow to find the spec producer,
- // If compilation results is different, it remove request can go to some different spec producer
- Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
-
- if (jobExecutionPlanDag.isEmpty()) {
- _log.warn("Cannot determine an executor to delete Spec: " + spec);
- return;
- }
-
- // Delete all compiled JobSpecs on their respective Executor
- for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
- JobExecutionPlan jobExecutionPlan = dagNode.getValue();
- Spec jobSpec = jobExecutionPlan.getJobSpec();
- try {
- SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
- _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
- producer.deleteSpec(jobSpec.getUri(), headers);
- } catch (Exception e) {
- _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
- }
- }
}
+ // We need to recompile the flow to find the spec producer,
+ // If compilation result is different, its remove request can go to some different spec producer
+ deleteFromExecutor(spec, headers);
} else {
throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec);
}
}
+ private void deleteFromExecutor(Spec spec, Properties headers) {
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (jobExecutionPlanDag.isEmpty()) {
+ _log.warn("Cannot determine an executor to delete Spec: " + spec);
+ return;
+ }
+
+ // Delete all compiled JobSpecs on their respective Executor
+ for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+ Spec jobSpec = jobExecutionPlan.getJobSpec();
+ try {
+ SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+ _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
+ producer.deleteSpec(jobSpec.getUri(), headers);
+ } catch (Exception e) {
+ _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
+ }
+ }
+ }
+
private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
JobStatusRetriever jobStatusRetriever;
try {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index f9b8bd9..67ade43 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -106,15 +106,15 @@
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
assertTrue(input -> dagManager.dagManagerThreads[queue3].dagToJobs.containsKey(dagId3), ERROR_MESSAGE);
- // mock delete spec
+ // mock cancel job
dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group0").setFlowName("flow0")));
dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group1").setFlowName("flow1")));
dagManager.stopDag(FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowGroup("group2").setFlowName("flow2")));
- // verify deleteSpec() of specProducer is called once
- AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag1), ERROR_MESSAGE);
- AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag2), ERROR_MESSAGE);
- AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new DeletePredicate(dag3), ERROR_MESSAGE);
+ // verify cancelJob() of specProducer is called once
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new CancelPredicate(dag1), ERROR_MESSAGE);
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new CancelPredicate(dag2), ERROR_MESSAGE);
+ AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).assertTrue(new CancelPredicate(dag3), ERROR_MESSAGE);
// mock flow cancellation tracking event
Mockito.doReturn(DagManagerTest.getMockJobStatus("flow0", "group0", flowExecutionId1,
@@ -165,10 +165,10 @@
// check the SLA value
Assert.assertEquals(dagManager.dagManagerThreads[queue].dagToSLA.get(dagId).longValue(), DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
- // verify deleteSpec() of the specProducer is not called once
+ // verify cancelJob() of the specProducer is not called once
// which means job cancellation was triggered
try {
- AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new CancelPredicate(dag), ERROR_MESSAGE);
} catch (TimeoutException e) {
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
@@ -209,9 +209,9 @@
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
- // verify deleteSpec() of specProducer is called once
+ // verify cancelJob() of specProducer is called once
// which means job cancellation was triggered
- AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new CancelPredicate(dag), ERROR_MESSAGE);
// check removal of dag from dagToSLA map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -248,9 +248,9 @@
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
assertTrue(input -> dagManager.dagManagerThreads[queue].dagToJobs.containsKey(dagId), ERROR_MESSAGE);
- // verify deleteSpec() of specProducer is called once
+ // verify cancelJob() of specProducer is called once
// which means job cancellation was triggered
- AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new DeletePredicate(dag), ERROR_MESSAGE);
+ AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).assertTrue(new CancelPredicate(dag), ERROR_MESSAGE);
// check removal of dag from dagToSLA map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -277,16 +277,16 @@
}
}
-class DeletePredicate implements Predicate<Void> {
+class CancelPredicate implements Predicate<Void> {
private final Dag<JobExecutionPlan> dag;
- public DeletePredicate(Dag<JobExecutionPlan> dag) {
+ public CancelPredicate(Dag<JobExecutionPlan> dag) {
this.dag = dag;
}
@Override
public boolean apply(@Nullable Void input) {
try {
- verify(dag.getNodes().get(0).getValue().getSpecExecutor().getProducer().get()).deleteSpec(any(), any());
+ verify(dag.getNodes().get(0).getValue().getSpecExecutor().getProducer().get()).cancelJob(any(), any());
} catch (Throwable e) {
return false;
}