blob: 601e40e5ff303422ea01f116e64fb80bfc6dd906 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service.monitoring;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.test.matchers.service.monitoring.FlowStatusMatch;
import org.apache.gobblin.test.matchers.service.monitoring.JobStatusMatch;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link FlowStatusGenerator}, driven by a Mockito-mocked {@link JobStatusRetriever}.
 *
 * <p>The {@code isFlowRunning} tests stub the latest execution id of a flow together with the job statuses of that
 * execution, and then check whether an execution that is about to start is reported as blocked by it. The
 * {@code getFlowStatusesAcrossGroup} test checks filtering of job statuses by tag.
 */
public class FlowStatusGeneratorTest {
  private static final String FLOW_NAME = "testName";
  private static final String FLOW_GROUP = "testGroup";
  private static final long FLOW_EXECUTION_ID = 1234L;

  /** When the retriever knows of no prior execution ({@code null} id list), the flow is not reported as running. */
  @Test
  public void testIsFlowRunningFirstExecution() {
    JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
    when(jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 1)).thenReturn(null);

    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

    Assert.assertFalse(flowStatusGenerator.isFlowRunning(FLOW_NAME, FLOW_GROUP, FLOW_EXECUTION_ID));
  }

  /** A prior execution whose flow-level status is COMPILED blocks a later execution. */
  @Test
  public void testIsFlowRunningCompiledPastExecution() {
    JobStatusRetriever jobStatusRetriever = mockRetrieverWithLatestExecution(FLOW_EXECUTION_ID);
    stubJobStatuses(jobStatusRetriever, FLOW_EXECUTION_ID, flowLevelStatus("COMPILED"));

    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

    // Block the next execution if the prior one is in compiled as it's considered still running
    Assert.assertTrue(flowStatusGenerator.isFlowRunning(FLOW_NAME, FLOW_GROUP, FLOW_EXECUTION_ID + 1));
  }

  /** The latest execution never blocks itself, even when its flow-level status is COMPILED. */
  @Test
  public void skipFlowConcurrentCheckSameFlowExecutionId() {
    JobStatusRetriever jobStatusRetriever = mockRetrieverWithLatestExecution(FLOW_EXECUTION_ID);
    stubJobStatuses(jobStatusRetriever, FLOW_EXECUTION_ID, flowLevelStatus("COMPILED"));

    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

    // If the flow is compiled but the flow execution id is the same as the one about to be kicked off,
    // do not consider it as running.
    Assert.assertFalse(flowStatusGenerator.isFlowRunning(FLOW_NAME, FLOW_GROUP, FLOW_EXECUTION_ID));
  }

  /** Only the flow-level status decides whether a flow is running; the statuses of individual jobs are ignored. */
  @Test
  public void testIsFlowRunningJobExecutionIgnored() {
    JobStatusRetriever jobStatusRetriever = mockRetrieverWithLatestExecution(FLOW_EXECUTION_ID);

    // JobStatuses should be ignored, only the flow level status matters.
    JobStatus jobStatus1 = jobLevelStatus("job1", "COMPLETE");
    JobStatus jobStatus2 = jobLevelStatus("job2", "FAILED");
    JobStatus jobStatus3 = jobLevelStatus("job3", "CANCELLED");

    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

    // Query with the *next* execution id in both halves: with the same id the concurrency check is skipped
    // (see skipFlowConcurrentCheckSameFlowExecutionId), which would let the assertion below pass no matter
    // what the flow-level status is.
    stubJobStatuses(jobStatusRetriever, FLOW_EXECUTION_ID, jobStatus1, jobStatus2, jobStatus3,
        flowLevelStatus("CANCELLED"));
    Assert.assertFalse(flowStatusGenerator.isFlowRunning(FLOW_NAME, FLOW_GROUP, FLOW_EXECUTION_ID + 1));

    // Same job statuses, but the flow itself is now RUNNING. Re-stub, since the previous iterator is spent.
    stubJobStatuses(jobStatusRetriever, FLOW_EXECUTION_ID, jobStatus1, jobStatus2, jobStatus3,
        flowLevelStatus("RUNNING"));
    Assert.assertTrue(flowStatusGenerator.isFlowRunning(FLOW_NAME, FLOW_GROUP, FLOW_EXECUTION_ID + 1));
  }

  /**
   * Checks {@code getFlowStatusesAcrossGroup} with no tag (all jobs returned) and with each job's tag in turn
   * (only the matching job returned alongside the flow-level status).
   */
  @Test
  public void testGetFlowStatusesAcrossGroup() {
    final long jobExecutionId = 987L;
    JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);

    // setup: one flow...
    String flowGroup = "myFlowGroup";
    int countPerFlowName = 2;
    String flowName1 = "flowName1";
    long flowExecutionId1 = 111L;
    ExecutionStatus flowStatus1 = ExecutionStatus.ORCHESTRATED;

    // ...with two jobs, each (differently) tagged.
    String f1Js1Status = ExecutionStatus.COMPLETE.name();
    String f1Js1Tag = "step-1";
    String f1Js1JobGroup = "job-group-x";
    String f1Js1JobName = "job-name-a";
    String f1Js2Status = ExecutionStatus.FAILED.name();
    String f1Js2Tag = "step-2";
    String f1Js2JobGroup = "job-group-y";
    String f1Js2JobName = "job-name-b";

    JobStatus f1Js0 = createFlowJobStatus(flowGroup, flowName1, flowExecutionId1, flowStatus1);
    JobStatus f1Js1 = createJobStatus(flowGroup, flowName1, flowExecutionId1,
        f1Js1Status, f1Js1Tag, f1Js1JobGroup, f1Js1JobName, jobExecutionId);
    JobStatus f1Js2 = createJobStatus(flowGroup, flowName1, flowExecutionId1,
        f1Js2Status, f1Js2Tag, f1Js2JobGroup, f1Js2JobName, jobExecutionId);
    List<JobStatus> f1JobStatuses = Arrays.asList(f1Js0, f1Js1, f1Js2);

    // IMPORTANT: result invariants to honor - ordered by ascending flowName, all of same flowName adjacent,
    // therein descending flowExecutionId
    // NOTE: Three copies of FlowStatus are needed for repeated use, due to mutable, non-rewinding
    // `Iterator FlowStatus.getJobStatusIterator`
    FlowStatus flowStatusCopy1 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, f1JobStatuses);
    FlowStatus flowStatusCopy2 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, f1JobStatuses);
    FlowStatus flowStatusCopy3 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, f1JobStatuses);
    // Stub with the same locals the calls below pass in, so stub and call sites cannot drift apart.
    when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions(flowGroup, countPerFlowName))
        .thenReturn(Collections.singletonList(flowStatusCopy1),
            Collections.singletonList(flowStatusCopy2),
            Collections.singletonList(flowStatusCopy3)); // (for three invocations)

    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

    JobStatusMatch.Dependent f1JsmDep1 =
        JobStatusMatch.Dependent.ofTagged(f1Js1JobGroup, f1Js1JobName, jobExecutionId, f1Js1Status, f1Js1Tag);
    JobStatusMatch.Dependent f1JsmDep2 =
        JobStatusMatch.Dependent.ofTagged(f1Js2JobGroup, f1Js2JobName, jobExecutionId, f1Js2Status, f1Js2Tag);

    // verify all jobs returned when no tag constraint
    List<FlowStatus> flowStatusesResult =
        flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, null);
    Assert.assertEquals(flowStatusesResult.size(), 1);
    assertThat(flowStatusesResult.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1,
        flowExecutionId1, flowStatus1, Arrays.asList(f1JsmDep1, f1JsmDep2)));

    // verify 'flow pseudo status' plus first job returned against first job's tag
    List<FlowStatus> flowStatusesResult2 =
        flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, f1Js1Tag);
    Assert.assertEquals(flowStatusesResult2.size(), 1);
    assertThat(flowStatusesResult2.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1,
        flowExecutionId1, flowStatus1, Arrays.asList(f1JsmDep1)));

    // verify 'flow pseudo status' plus second job returned against second job's tag
    List<FlowStatus> flowStatusesResult3 =
        flowStatusGenerator.getFlowStatusesAcrossGroup(flowGroup, countPerFlowName, f1Js2Tag);
    Assert.assertEquals(flowStatusesResult3.size(), 1);
    assertThat(flowStatusesResult3.get(0), FlowStatusMatch.withDependentJobStatuses(flowGroup, flowName1,
        flowExecutionId1, flowStatus1, Arrays.asList(f1JsmDep2)));
  }

  /**
   * Creates a mocked {@link JobStatusRetriever} that reports {@code latestFlowExecutionId} as the single latest
   * execution id of the {@link #FLOW_NAME}/{@link #FLOW_GROUP} flow.
   */
  private static JobStatusRetriever mockRetrieverWithLatestExecution(long latestFlowExecutionId) {
    JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
    when(jobStatusRetriever.getLatestExecutionIdsForFlow(FLOW_NAME, FLOW_GROUP, 1)).thenReturn(
        Lists.newArrayList(latestFlowExecutionId));
    return jobStatusRetriever;
  }

  /**
   * Stubs the job statuses returned for one execution of the test flow. The stub hands out a single one-shot
   * iterator, so call this again before each further read of the statuses.
   */
  private static void stubJobStatuses(JobStatusRetriever jobStatusRetriever, long flowExecutionId,
      JobStatus... jobStatuses) {
    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatuses).iterator();
    when(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, flowExecutionId)).thenReturn(
        jobStatusIterator);
  }

  /**
   * Builds the flow-level status entry of the test flow: a {@link JobStatus} whose job name and job group are both
   * {@link JobStatusRetriever#NA_KEY}.
   */
  private static JobStatus flowLevelStatus(String eventName) {
    return JobStatus.builder().flowGroup(FLOW_GROUP).flowName(FLOW_NAME).flowExecutionId(FLOW_EXECUTION_ID)
        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(eventName).build();
  }

  /** Builds a status entry for one named job of the test flow; the job group is left unset. */
  private static JobStatus jobLevelStatus(String jobName, String eventName) {
    return JobStatus.builder().flowGroup(FLOW_GROUP).flowName(FLOW_NAME).flowExecutionId(FLOW_EXECUTION_ID)
        .jobName(jobName).eventName(eventName).build();
  }

  /**
   * Builds a {@link FlowStatus} over {@code jobStatuses}. Two separate iterators are taken from the list: one is
   * stored in the {@link FlowStatus}, the other is passed to
   * {@link JobStatusRetriever#getFlowStatusFromJobStatuses} to derive the flow execution status.
   */
  private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId,
      List<JobStatus> jobStatuses) {
    Iterator<JobStatus> statusesForFlowStatus = jobStatuses.iterator();
    ExecutionStatus flowExecutionStatus = JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator());
    return new FlowStatus(flowName, flowGroup, flowExecutionId, statusesForFlowStatus, flowExecutionStatus);
  }

  /**
   * Builds the flow-level status entry for an arbitrary flow: job name and job group are
   * {@link JobStatusRetriever#NA_KEY}, there is no tag, and the job execution id is {@code 0}.
   */
  private JobStatus createFlowJobStatus(String flowGroup, String flowName, long flowExecutionId,
      ExecutionStatus status) {
    return createJobStatus(flowGroup, flowName, flowExecutionId, status.name(), null,
        JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, 0L);
  }

  /** Builds a fully specified {@link JobStatus} from the given flow and job coordinates. */
  private JobStatus createJobStatus(String flowGroup, String flowName, long flowExecutionId, String eventName,
      String jobTag, String jobGroup, String jobName, long jobExecutionId) {
    return JobStatus.builder()
        .flowGroup(flowGroup)
        .flowName(flowName)
        .flowExecutionId(flowExecutionId)
        .eventName(eventName)
        .jobTag(jobTag)
        .jobGroup(jobGroup)
        .jobName(jobName)
        .jobExecutionId(jobExecutionId)
        .build();
  }
}