[GOBBLIN-1999] Ignore concurrent check if the flow execution ID is the same as the c… (#3874)
* Ignore concurrent check if the flow execution ID is the same as the currently running flow execution ID to handle race condition of concurrent hosts misreporting status
* cleanup
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 0bf8b71..775fbeb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -152,13 +152,17 @@
* @param flowGroup
* @return true, if any jobs of the flow are RUNNING.
*/
- public boolean isFlowRunning(String flowName, String flowGroup) {
+ public boolean isFlowRunning(String flowName, String flowGroup, long flowExecutionId) {
List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1, null);
if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
} else {
FlowStatus flowStatus = flowStatusList.get(0);
ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
+ // If the latest flow status is the current job about to get kicked off, we should ignore this check
+ if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+ return false;
+ }
return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
}
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 3801779..601e40e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -38,25 +38,61 @@
public class FlowStatusGeneratorTest {
@Test
- public void testIsFlowRunning() {
+ public void testIsFlowRunningFirstExecution() {
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
String flowName = "testName";
String flowGroup = "testGroup";
+ long currFlowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
- Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId));
+ }
- //If a flow is COMPILED, isFlowRunning() should return true.
+ @Test
+ public void testIsFlowRunningCompiledPastExecution() {
+ JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+ String flowName = "testName";
+ String flowGroup = "testGroup";
long flowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
- when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
- Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
+ jobStatusIterator);
+ FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+ // Block the next execution if the prior one is in compiled as it's considered still running
+ Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId + 1));
+ }
+ @Test
+ public void skipFlowConcurrentCheckSameFlowExecutionId() {
+ JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+ String flowName = "testName";
+ String flowGroup = "testGroup";
+ long flowExecutionId = 1234L;
+ when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
+ Lists.newArrayList(flowExecutionId));
+ JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+ .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+ Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+ when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
+ jobStatusIterator);
+ FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+ // If the flow is compiled but the flow execution status is the same as the one about to be kicked off, do not consider it as running.
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
+ }
+
+ @Test
+ public void testIsFlowRunningJobExecutionIgnored() {
+ String flowName = "testName";
+ String flowGroup = "testGroup";
+ long flowExecutionId = 1234L;
+ JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+ when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
+ Lists.newArrayList(flowExecutionId));
//JobStatuses should be ignored, only the flow level status matters.
String job1 = "job1";
String job2 = "job2";
@@ -69,15 +105,17 @@
.jobName(job3).eventName("CANCELLED").build();
JobStatus flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
- jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
+ Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
+ FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
- Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
- Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId+1));
}
@Test
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 809779f..618d8b0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -247,7 +247,7 @@
TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
- flowName);
+ flowName, flowMetadata);
if (!compiledDagOptional.isPresent()) {
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index cda69eb..7400602 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -76,9 +76,9 @@
specCompiler.awaitHealthy();
TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
- Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
- validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName);
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+ Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+ validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName, flowMetadata);
if (!jobExecutionPlanDagOptional.isPresent()) {
return Optional.absent();
@@ -89,7 +89,6 @@
return Optional.absent();
}
- addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
}
@@ -101,13 +100,18 @@
* @throws IOException
*/
public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
- String flowGroup, String flowName) throws IOException {
+ String flowGroup, String flowName, Map<String,String> flowMetadata) throws IOException {
boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled);
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+ if (jobExecutionPlanDag.isEmpty()) {
+ return Optional.absent();
+ }
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
- if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+ if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution,
+ Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))) {
return Optional.fromNullable(jobExecutionPlanDag);
} else {
log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
@@ -121,9 +125,7 @@
quotaManager.releaseQuota(dagNode);
}
}
-
// Send FLOW_FAILED event
- Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour.");
new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
@@ -140,8 +142,8 @@
* @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
*/
private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup,
- boolean allowConcurrentExecution) {
- return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+ boolean allowConcurrentExecution, long flowExecutionId) {
+ return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
}
/**