[GOBBLIN-1999] Ignore concurrent check if the flow execution ID is the same as the currently running flow execution ID (#3874)

* Ignore the concurrent-execution check if the flow execution ID is the same as the currently running flow execution ID, to handle the race condition in which concurrent hosts misreport the flow status

* cleanup
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 0bf8b71..775fbeb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -152,13 +152,17 @@
    * @param flowGroup
    * @return true, if any jobs of the flow are RUNNING.
    */
-  public boolean isFlowRunning(String flowName, String flowGroup) {
+  public boolean isFlowRunning(String flowName, String flowGroup, long flowExecutionId) {
     List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1, null);
     if (flowStatusList == null || flowStatusList.isEmpty()) {
       return false;
     } else {
       FlowStatus flowStatus = flowStatusList.get(0);
       ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
+      // If the latest flow status is the current job about to get kicked off, we should ignore this check
+      if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+        return false;
+      }
       return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
     }
   }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 3801779..601e40e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -38,25 +38,61 @@
 public class FlowStatusGeneratorTest {
 
   @Test
-  public void testIsFlowRunning() {
+  public void testIsFlowRunningFirstExecution() {
     JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     String flowName = "testName";
     String flowGroup = "testGroup";
+    long currFlowExecutionId = 1234L;
     when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
 
     FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
-    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId));
+  }
 
-    //If a flow is COMPILED, isFlowRunning() should return true.
+  @Test
+  public void testIsFlowRunningCompiledPastExecution() {
+    JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+    String flowName = "testName";
+    String flowGroup = "testGroup";
     long flowExecutionId = 1234L;
     when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
         Lists.newArrayList(flowExecutionId));
     JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
     Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
-    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
-    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
+        jobStatusIterator);
+    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+    // Block the next execution if the prior one is still in the COMPILED state, as it is considered still running
+    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId + 1));
+  }
 
+  @Test
+  public void skipFlowConcurrentCheckSameFlowExecutionId() {
+    JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    long flowExecutionId = 1234L;
+    when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
+        Lists.newArrayList(flowExecutionId));
+    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+    when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
+        jobStatusIterator);
+    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+    // If the flow is COMPILED but its flow execution ID is the same as the one about to be kicked off, do not consider it running.
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
+  }
+
+    @Test
+    public void testIsFlowRunningJobExecutionIgnored() {
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    long flowExecutionId = 1234L;
+    JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
+    when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
+        Lists.newArrayList(flowExecutionId));
     //JobStatuses should be ignored, only the flow level status matters.
     String job1 = "job1";
     String job2 = "job2";
@@ -69,15 +105,17 @@
         .jobName(job3).eventName("CANCELLED").build();
     JobStatus flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
-    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
+    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
+
     when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
-    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
 
     flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
     jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
     when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
-    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId+1));
   }
 
   @Test
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 809779f..618d8b0 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -247,7 +247,7 @@
         TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
         Optional<Dag<JobExecutionPlan>> compiledDagOptional =
             this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
-                flowName);
+                flowName, flowMetadata);
 
         if (!compiledDagOptional.isPresent()) {
           Instrumented.markMeter(this.flowOrchestrationFailedMeter);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index cda69eb..7400602 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -76,9 +76,9 @@
     specCompiler.awaitHealthy();
 
     TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
-    Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
-        validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName);
     Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+    Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+        validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName, flowMetadata);
 
     if (!jobExecutionPlanDagOptional.isPresent()) {
       return Optional.absent();
@@ -89,7 +89,6 @@
       return Optional.absent();
     }
 
-    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
     flowCompilationTimer.stop(flowMetadata);
     return jobExecutionPlanDagOptional;
   }
@@ -101,13 +100,18 @@
    * @throws IOException
    */
   public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
-      String flowGroup, String flowName) throws IOException {
+      String flowGroup, String flowName, Map<String,String> flowMetadata) throws IOException {
     boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
         ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled);
 
     Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    if (jobExecutionPlanDag.isEmpty()) {
+      return Optional.absent();
+    }
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
 
-    if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+    if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution,
+        Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))) {
       return Optional.fromNullable(jobExecutionPlanDag);
     } else {
       log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
@@ -121,9 +125,7 @@
           quotaManager.releaseQuota(dagNode);
         }
       }
-
       // Send FLOW_FAILED event
-      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
       flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
           + "executions are disabled. Set flow.allowConcurrentExecution to true in the flowSpec to change this behaviour.");
       new TimingEvent(eventSubmitter, TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
@@ -140,8 +142,8 @@
    * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
    */
   private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup,
-      boolean allowConcurrentExecution) {
-    return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+      boolean allowConcurrentExecution, long flowExecutionId) {
+    return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
   }
 
   /**