blob: 727c438c5f3bf1c7a03367afe487488f41856b42 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.gobblin.service.modules.orchestration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import static org.apache.gobblin.service.ExecutionStatus.*;
* This class implements a manager to manage the life cycle of a {@link Dag}. A {@link Dag} is submitted to the
* {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. On receiving a {@link Dag}, the
* {@link DagManager} first persists the {@link Dag} to the {@link DagStateStore}, and then submits it to the specific
* {@link DagManagerThread}'s {@link BlockingQueue} based on the flowExecutionId of the Flow.
* This guarantees that each {@link Dag} received by the {@link DagManager} can be recovered in case of a leadership
* change or service restart.
* The implementation of the {@link DagManager} is multi-threaded. Each {@link DagManagerThread} polls the
* {@link BlockingQueue} for new Dag submissions at fixed intervals. It dequeues any newly submitted Dags and coordinates
* the execution of individual jobs in the Dag. The coordination logic involves polling the {@link JobStatus}es of running
* jobs. Upon completion of a job, it will either schedule the next job in the Dag (on SUCCESS) or mark the Dag as failed
* (on FAILURE). Upon completion of a Dag execution, it will perform the required clean up actions.
* For deleteSpec/cancellation requests for a flow URI, {@link DagManager} finds out the flowExecutionId using
* {@link JobStatusRetriever}, and forwards the request to the {@link DagManagerThread} which handled the addSpec request
* for this flow. We need separate {@link BlockingQueue}s for each {@link DagManagerThread} because
* cancellation needs the information which is stored only in the same {@link DagManagerThread}.
* The {@link DagManager} is active only in the leader mode. To ensure that Dags survive a leadership change, each {@link Dag} managed by a {@link DagManager} is
* checkpointed to a persistent location. On start up or leadership change,
* the {@link DagManager} loads all the checkpointed {@link Dag}s and adds them to the {@link BlockingQueue}.
* Current implementation supports only FileSystem-based checkpointing of the Dag statuses.
public class DagManager extends AbstractIdleService {
// NOTE(review): the initializer of this constant is empty on this line — confirm the intended default.
public static final String DEFAULT_FLOW_FAILURE_OPTION =;
// Prefix shared by the DagManager configuration keys built below.
public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
// NOTE(review): presumably seconds, matching the TimeUnit.SECONDS used when DagManagerThreads are scheduled — confirm.
private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
public static final Integer DEFAULT_NUM_THREADS = 3;
// Seconds to wait in awaitTermination when the DagManager is deactivated.
private static final Integer TERMINATION_TIMEOUT = 30;
public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads";
public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval";
// Class name of the JobStatusRetriever to instantiate; falls back to FsJobStatusRetriever.
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = JOB_STATUS_RETRIEVER_KEY + ".class";
private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
// Note: the failed-dag keys below are NOT prefixed with DAG_MANAGER_PREFIX.
private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
// Name of a java.util.concurrent.TimeUnit constant; parsed with TimeUnit.valueOf in the constructor.
private static final String FAILED_DAG_RETENTION_TIME_UNIT = FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
// Retention duration, expressed in the time unit configured above (default: 7 DAYS).
private static final String FAILED_DAG_RETENTION_TIME = FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
// Interval, in minutes, at which the FailedDagRetentionThread is scheduled.
public static final String FAILED_DAG_POLLING_INTERVAL = FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
// Quota used when no per-user entry exists; the default is Integer.MAX_VALUE.
private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + "defaultJobQuota";
private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
// String list whose entries are split on QUOTA_SEPERATOR: the first part is the map key, the second an integer quota.
private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + "perUserQuota";
private static final String QUOTA_SEPERATOR = ":";
* Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
* <ul>
* <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
* <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to finish, as long as all the dependencies
* of the job are successful.</li>
* </ul>
// Enumerates the failure-handling modes of a Dag; each constant carries a string label.
public enum FailureOption {
// String label of this option; this is what toString() returns.
private final String failureOption;
FailureOption(final String failureOption) {
this.failureOption = failureOption;
// Returns the label passed to the constructor rather than the enum constant name.
public String toString() {
return this.failureOption;
// One queue per DagManagerThread (arrays of size numThreads); element i is handed to thread i in setActive.
// Newly submitted dags waiting to be picked up.
private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
// Dag ids waiting to be cancelled.
private BlockingQueue<String>[] cancelQueue;
// Dag ids waiting to be resumed.
private BlockingQueue<String>[] resumeQueue;
// Populated in setActive(true); package-private.
DagManagerThread[] dagManagerThreads;
private ScheduledExecutorService scheduledExecutorPool;
private boolean instrumentationEnabled;
// Both state stores are created in setActive(true), not in the constructor.
private DagStateStore dagStateStore;
private DagStateStore failedDagStateStore;
// Supplied through setTopologySpecMap and passed to the state stores on creation.
private Map<URI, TopologySpec> topologySpecMap;
private final Integer numThreads;
// Period, in seconds, between runs of each DagManagerThread.
private final Integer pollingInterval;
// Period, in minutes, between runs of the FailedDagRetentionThread.
private final Integer retentionPollingInterval;
private final JobStatusRetriever jobStatusRetriever;
private final Config config;
// Absent when instrumentation is disabled.
private final Optional<EventSubmitter> eventSubmitter;
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
// Retention of failed dags, converted to milliseconds in the constructor.
private final long failedDagRetentionTime;
// Toggled by setActive; volatile so the latest value is visible across threads.
private volatile boolean isActive = false;
// Builds a DagManager from configuration: sizes the per-thread queues and the scheduler pool,
// sets up optional event submission, parses quotas, creates the JobStatusRetriever and
// computes the failed-dag retention time.
// @param config service configuration
// @param instrumentationEnabled whether an EventSubmitter should be created
public DagManager(Config config, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
// One dag queue, one cancel queue and one resume queue per DagManagerThread.
this.queue = initializeDagQueue(this.numThreads);
this.cancelQueue = initializeDagQueue(this.numThreads);
this.resumeQueue = initializeDagQueue(this.numThreads);
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
if (instrumentationEnabled) {
// The metric context is derived from an empty config, not from the service config.
MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
} else {
this.eventSubmitter = Optional.absent();
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
this.perUserQuota = new HashMap<>();
for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
// NOTE(review): the entry is split twice and not validated; an entry without the separator
// or with a non-numeric quota throws here (ArrayIndexOutOfBounds / NumberFormatException).
this.perUserQuota.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
try {
this.jobStatusRetriever = createJobStatusRetriever(config);
} catch (ReflectiveOperationException e) {
// Reflection failures are surfaced as unchecked exceptions with the cause preserved.
throw new RuntimeException("Exception encountered during DagManager initialization", e);
// TimeUnit.valueOf throws IllegalArgumentException for an unknown unit name.
TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
// Stored in milliseconds regardless of the configured unit.
this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME));
// Instantiates the JobStatusRetriever whose class name is configured under
// JOB_STATUS_RETRIEVER_CLASS_KEY (default: FsJobStatusRetriever), passing config to its constructor.
// @throws ReflectiveOperationException if the class cannot be loaded or constructed
JobStatusRetriever createJobStatusRetriever(Config config) throws ReflectiveOperationException {
Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
return (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
// Creates a KafkaJobStatusMonitor by delegating to a new KafkaJobStatusMonitorFactory.
// @throws ReflectiveOperationException declared by this method's signature
KafkaJobStatusMonitor createJobStatusMonitor(Config config) throws ReflectiveOperationException {
return new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
// Instantiates the DagStateStore whose class name is configured under DAG_STATESTORE_CLASS_KEY
// (default: FSDagStateStore), passing config and topologySpecMap to its constructor.
// Reflection failures are rethrown as RuntimeException with the cause attached.
DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
try {
Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
// Initializes and returns an array of numThreads queues, each a fresh, empty LinkedBlockingDeque.
// The array is raw-typed, so the same helper serves the dag, cancel and resume queues.
private static LinkedBlockingDeque[] initializeDagQueue(int numThreads) {
LinkedBlockingDeque[] queue = new LinkedBlockingDeque[numThreads];
for (int i=0; i< numThreads; i++) {
// No capacity argument is given, so each deque is bounded only by Integer.MAX_VALUE.
queue[i] = new LinkedBlockingDeque<>();
return queue;
// Convenience constructor: same as DagManager(config, true), i.e. with instrumentation enabled.
public DagManager(Config config) {
this(config, true);
/** Start the service. On startup, the service launches a fixed pool of {@link DagManagerThread}s, which are scheduled at
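* fixed intervals. The service also loads any checkpointed {@link Dag}s. In this implementation both steps are performed by {@link #setActive(boolean)}; startUp itself does nothing.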
// Service start-up hook; intentionally empty. Thread scheduling and dag loading happen in setActive(true).
protected void startUp() {
//Do nothing.
* Method to submit a {@link Dag} to the {@link DagManager}. The {@link DagManager} optionally persists the
* submitted dag to the {@link DagStateStore} and then adds the dag to a {@link BlockingQueue} to be picked up
* by one of the {@link DagManagerThread}s.
* @param dag {@link Dag} to be added
* @param persist whether to persist the dag to the {@link DagStateStore}
* @param setStatus if true, set all jobs in the dag to pending
// Enqueues a dag for processing by the DagManagerThread that owns its queue.
// @throws IOException if the dag cannot be offered to the selected queue
synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException {
if (persist) {
//Persist the dag
// The queue index is derived from the dag and the number of threads.
int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
// Add the dag to the specific queue determined by flowExecutionId
// Flow cancellation request has to be forwarded to the same DagManagerThread where the
// flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the
// specific DagManagerThread queue
if (!this.queue[queueId].offer(dag)) {
// offer() is non-blocking; a rejected dag is reported to the caller as an IOException.
throw new IOException("Could not add dag" + DagManagerUtils.generateDagId(dag) + "to queue");
if (setStatus) {
// Walks every node of the dag and builds its job metadata; it does nothing when no EventSubmitter is configured.
private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
if (this.eventSubmitter.isPresent()) {
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
// Metadata is built into a fresh map for each node.
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
* Method to submit a {@link URI} for cancellation requests to the {@link DagManager}.
* The {@link DagManager} adds the dag to the {@link BlockingQueue} to be picked up by one of the {@link DagManagerThread}s.
// Requests cancellation of the latest executions of the flow identified by uri.
// @throws IOException if a dag id cannot be added to the cancellation queue
synchronized public void stopDag(URI uri) throws IOException {
String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
String flowName = FlowSpec.Utils.getFlowName(uri);
// NOTE(review): the literal 10 is presumably the maximum number of executions to look up — confirm against JobStatusRetriever.
List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);"Found {} flows to cancel.", flowExecutionIds.size());
// Each execution id is routed to its own cancel queue by killFlow.
for (long flowExecutionId : flowExecutionIds) {
killFlow(flowGroup, flowName, flowExecutionId);
* Add the specified flow to {@link DagManager#cancelQueue}
// Builds the dag id for the given flow execution and offers it to the cancel queue
// selected by the flow execution id.
// @throws IOException if the cancel queue rejects the dag id
private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
// The queue index is computed from flowExecutionId and the number of threads.
int queueId = DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
if (!this.cancelQueue[queueId].offer(dagId)) {
throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
// Handler for KillFlowEvent: forwards the event's flow group, name and execution id to killFlow.
// An IOException from killFlow is logged as a warning and not propagated to the caller.
public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {"Received kill request for flow ({}, {}, {})", killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
try {
killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
} catch (IOException e) {
log.warn("Failed to kill flow", e);
// Handler for ResumeFlowEvent: builds the dag id from the event and offers it to the resume
// queue selected by the flow execution id. A rejected offer is only logged as a warning.
public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {"Received resume request for flow ({}, {}, {})", resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
String dagId = DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
int queueId = DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(), this.numThreads);
if (!this.resumeQueue[queueId].offer(dagId)) {
log.warn("Could not add dag " + dagId + " to resume queue");
// Stores the topology spec map; it is read later when the state stores are created in setActive(true).
public synchronized void setTopologySpecMap(Map<URI, TopologySpec> topologySpecMap) {
this.topologySpecMap = topologySpecMap;
* When a {@link DagManager} becomes active, it loads the serialized representations of the currently running {@link Dag}s
* from the checkpoint directory, deserializes the {@link Dag}s and adds them to a queue to be consumed by
* the {@link DagManagerThread}s.
* @param active a boolean to indicate if the {@link DagManager} is the leader.
// Switches the DagManager between active (leader) and inactive mode.
// Activation creates the state stores, schedules the worker and retention threads and
// re-enqueues the dags found in the dag state store.
// An IOException raised during the switch is logged and rethrown as RuntimeException.
public synchronized void setActive(boolean active) {
// Requests that do not change the current state are logged.
if (this.isActive == active) {"DagManager already {}, skipping further actions.", (!active) ? "inactive" : "active");
this.isActive = active;
try {
if (this.isActive) {"Activating DagManager.");"Scheduling {} DagManager threads", numThreads);
//Initializing state store for persisting Dags.
this.dagStateStore = createDagStateStore(config, topologySpecMap);
// The failed-dag store reads the "failedDagStateStore" sub-config first and falls back to the full config.
this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
// Wrapped in a synchronized set because the same instance is handed to every DagManagerThread and to the retention thread.
Set<String> failedDagIds = Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
//On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
// Thread i is given the i-th dag, cancel and resume queues.
DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
queue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds);
this.dagManagerThreads[i] = dagManagerThread;
// First run is immediate (delay 0); subsequent runs every pollingInterval seconds.
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
// The retention thread shares the same pool and runs every retentionPollingInterval minutes.
FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime);
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES);
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();"Loading " + dags.size() + " dags from dag state store");
for (Dag<JobExecutionPlan> dag : dags) {
// Reloaded dags are neither persisted again nor have their status reset (persist=false, setStatus=false).
addDag(dag, false, false);
} else { //Mark the DagManager inactive."Inactivating the DagManager. Shutting down all DagManager threads");
try {
// Waits up to TERMINATION_TIMEOUT seconds for the pool to terminate.
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// NOTE(review): the interrupt is logged but the thread's interrupt flag is not restored here.
log.error("Exception encountered when shutting down DagManager threads.", e);
} catch (IOException e) {
log.error("Exception encountered when activating the new DagManager", e);
throw new RuntimeException(e);
* Each {@link DagManagerThread} performs 2 actions when scheduled:
* <ol>
* <li> Dequeues any newly submitted {@link Dag}s from the Dag queue. All the {@link JobExecutionPlan}s which
* are part of the dequeued {@link Dag} will be managed by this thread. </li>
* <li> Polls the job status store for the current job statuses of all the running jobs it manages.</li>
* </ol>
public static class DagManagerThread implements Runnable {
// Maps each tracked job node to the dag it belongs to; iterated in pollAndAdvanceDag.
private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
// Static, hence shared by all DagManagerThread instances; backed by ConcurrentHashMap.
private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
// Dags tracked by this thread, keyed by dag id.
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
// Ids of failed dags; the instance is supplied by the DagManager through the constructor.
private Set<String> failedDagIds;
// Dags for which a resume has begun but is not yet finished, keyed by dag id.
private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
// Caches the flow SLA per dag id so it is looked up only once (see slaKillIfNeeded).
final Map<String, Long> dagToSLA = new HashMap<>();
private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
// Null when instrumentation is disabled.
private final MetricContext metricContext;
private final Optional<EventSubmitter> eventSubmitter;
private final Optional<Timer> jobStatusPolledTimer;
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
// Last computed orchestration delay in milliseconds; exposed through a gauge when instrumentation is on.
private final AtomicLong orchestrationDelay = new AtomicLong(0);
// NOTE(review): static and created via Maps.newHashMap(), yet written from initialize() of every thread — confirm thread-safety.
private static Map<String, FlowState> flowGauges = Maps.newHashMap();
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
private DagStateStore failedDagStateStore;
private BlockingQueue<Dag<JobExecutionPlan>> queue;
private BlockingQueue<String> cancelQueue;
private BlockingQueue<String> resumeQueue;
* Constructor.
// Stores the collaborators and queues handed over by the DagManager and, when instrumentation
// is enabled, sets up the metric context, event submitter, polling timer and orchestration-delay gauge.
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
this.failedDagIds = failedDagIds;
this.queue = queue;
this.cancelQueue = cancelQueue;
this.resumeQueue = resumeQueue;
this.defaultQuota = defaultQuota;
this.perUserQuota = perUserQuota;
if (instrumentationEnabled) {
// The metric context is derived from an empty config.
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
// The gauge reads the current value of orchestrationDelay each time it is sampled.
ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
() -> orchestrationDelay.get());
} else {
// Without instrumentation there is no metric context and both optionals are absent.
this.metricContext = null;
this.eventSubmitter = Optional.absent();
this.jobStatusPolledTimer = Optional.absent();
* Main body of the {@link DagManagerThread}. Dequeue the next item from the queue and poll job statuses of currently
* running jobs.
* Because this thread runs in a regular interval, we should avoid doing repetitive work inside it.
// One scheduled iteration: handle a cancel request, drain new dags, drain resume requests,
// poll job statuses, then clean up finished dags.
public void run() {
try {
// At most one cancel request is taken per iteration (single poll, no loop).
String nextDagToCancel = cancelQueue.poll();
//Poll the cancelQueue for a new Dag to cancel.
if (nextDagToCancel != null) {
// Unlike cancellation, all pending dags are drained in one iteration.
while (!queue.isEmpty()) {
Dag<JobExecutionPlan> dag = queue.poll();
//Poll the queue for a new Dag to execute.
if (dag != null) {
if (dag.isEmpty()) {"Empty dag; ignoring the dag");
//Initialize dag.
// All pending resume requests are drained as well.
while (!resumeQueue.isEmpty()) {
String dagId = resumeQueue.poll();
log.debug("Polling job statuses..");
//Poll and update the job statuses of running jobs.
log.debug("Poll done.");
//Clean up any finished dags
log.debug("Cleaning up finished dags..");
log.debug("Clean up done");
} catch (Exception e) {
// Every exception is logged and not rethrown; presumably so that the fixed-rate schedule,
// which stops rescheduling a task that throws, keeps running — confirm.
log.error("Exception encountered in {}", getClass().getName(), e);
* Begin resuming a dag by setting the status of both the dag and the failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
* and also sending events so that this status will be reflected in the job status state store.
// First phase of a resume: loads the failed dag, emits the PENDING_RESUME flow event and
// registers the dag in resumingDags so that finishResumingDags can complete the resume later.
// @throws IOException declared by this method's signature
private void beginResumingDag(String dagId) throws IOException {
// Only dags known to have failed can be resumed.
if (!this.failedDagIds.contains(dagId)) {
log.warn("No dag found with dagId " + dagId + ", so cannot resume flow");
Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
// The in-memory id set and the failed dag state store can disagree; that case is logged as an error.
if (dag == null) {
log.error("Dag " + dagId + " was found in memory but not found in failed dag state store");
long flowResumeTime = System.currentTimeMillis();
// Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point before it failed
DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
// Only FAILED and CANCELLED nodes are selected here.
if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
// Set flowStartTime so that flow SLA will be based on current time instead of original flow
this.resumingDags.put(dagId, dag);
* Finish resuming dags by first verifying the status is correct (flow should be {@link ExecutionStatus#PENDING_RESUME}
* and jobs should not be {@link ExecutionStatus#FAILED} or {@link ExecutionStatus#CANCELLED}) and then calling
* {@link #initialize}. This is separated from {@link #beginResumingDag} because it could take some time for the
* job status state store to reflect the updated status.
// Second phase of a resume: for every dag in resumingDags, polls the flow status and then the
// status of each node, tracking in dagReady whether the dag can proceed.
// @throws IOException declared by this method's signature
private void finishResumingDags() throws IOException {
for (Map.Entry<String, Dag<JobExecutionPlan>> dag : this.resumingDags.entrySet()) {
JobStatus flowStatus = pollFlowStatus(dag.getValue());
// A missing flow status is checked before the event name is compared.
if (flowStatus == null || !flowStatus.getEventName().equals( {
boolean dagReady = true;
for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
JobStatus jobStatus = pollJobStatus(node);
// A node with no status, or with either of the two compared event names, marks the dag as not ready.
if (jobStatus == null || jobStatus.getEventName().equals( || jobStatus.getEventName().equals( {
dagReady = false;
if (dagReady) {
* Cancels the dag and sends a cancellation tracking event.
* @param dagToCancel id of the dag to cancel
* @throws ExecutionException executionException
* @throws InterruptedException interruptedException
// Cancels the dag with the given id by iterating over its entries in dagToJobs and recording
// a kill message on the dag. An id that is not tracked in dagToJobs is only logged.
private void cancelDag(String dagToCancel) throws ExecutionException, InterruptedException {"Cancel flow with DagId {}", dagToCancel);
if (this.dagToJobs.containsKey(dagToCancel)) {
List<DagNode<JobExecutionPlan>> dagNodesToCancel = this.dagToJobs.get(dagToCancel);"Found {} DagNodes to cancel.", dagNodesToCancel.size());
for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
// The message is stored on the dag object held in the dags map.
this.dags.get(dagToCancel).setMessage("Flow killed by request");
} else {
log.warn("Did not find Dag with id {}, it might be already cancelled/finished.", dagToCancel);
// Asks the node's SpecProducer to cancel the job identified by the node's JobSpec URI.
// When the node holds a job future, its serialized form is passed along in the properties.
private void cancelDagNode(DagNode<JobExecutionPlan> dagNodeToCancel) throws ExecutionException, InterruptedException {
Properties props = new Properties();
if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
Future future = dagNodeToCancel.getValue().getJobFuture().get();
// The future is serialized by the same SpecProducer that will receive the cancel call.
String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
// Builds the job metadata for a cancellation event; it does nothing when no EventSubmitter is configured.
private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
if (this.eventSubmitter.isPresent()) {
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
* This method determines the next set of jobs to execute from the dag and submits them for execution.
* This method updates internal data structures tracking currently running Dags and jobs.
// Starts tracking a dag on this thread: registers it in dags, picks up nodes that are already
// RUNNING, registers a per-flow state gauge, submits the next runnable jobs and emits FLOW_RUNNING.
private void initialize(Dag<JobExecutionPlan> dag)
throws IOException {
//Add Dag to the map of running dags
String dagId = DagManagerUtils.generateDagId(dag);"Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
// A dag id that is already tracked is reported with a warning.
if (this.dags.containsKey(dagId)) {
log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
this.dags.put(dagId, dag);
log.debug("Dag {} - determining if any jobs are already running.", DagManagerUtils.getFullyQualifiedDagName(dag));
//A flag to indicate if the flow is already running.
boolean isDagRunning = false;
//Are there any jobs already in the running state? This check is for Dags already running
//before a leadership change occurs.
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
addJobState(dagId, dagNode);
//Update the running jobs counter.
isDagRunning = true;
FlowId flowId = DagManagerUtils.getFlowId(dag);
// The gauge is created and registered only the first time a flow id is seen; flowGauges is static.
if (!flowGauges.containsKey(flowId.toString())) {
String flowStateGaugeName =, flowId.getFlowGroup(),
flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
flowGauges.put(flowId.toString(), FlowState.RUNNING);
// The gauge reads the flow's current state from flowGauges each time it is sampled.
ContextAwareGauge<Integer> gauge = RootMetricContext
.get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
RootMetricContext.get().register(flowStateGaugeName, gauge);
log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
//Determine the next set of jobs to run and submit them for execution
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
// Every node just submitted for this dag is added to the job-tracking state.
for (DagNode dagNode: nextSubmitted.get(dagId)) {
addJobState(dagId, dagNode);
// Set flow status to running
DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
flowGauges.put(flowId.toString(), FlowState.RUNNING);
// Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
// the time difference between the instant when a flow first transitions to the running state and the instant
// when the flow is submitted to Gobblin service.
if (!isDagRunning) {
// NOTE(review): the subtraction treats the flow execution id as a timestamp in epoch millis — confirm.
this.orchestrationDelay.set(System.currentTimeMillis() - DagManagerUtils.getFlowExecId(dag));
}"Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
* Proceed the execution of each dag node based on job status.
// For every tracked job node: applies the flow SLA check, polls the job status, applies the
// orphan check and derives the effective ExecutionStatus that drives the switch below.
// Newly submitted nodes are then added to the tracking state and finished nodes removed from it.
private void pollAndAdvanceDag() throws IOException, ExecutionException, InterruptedException {
// Both collections are declared before the loop and applied after it, so jobToDag is not modified while it is iterated.
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
// The SLA check runs before the status poll; the orphan check needs the polled status.
boolean slaKilled = slaKillIfNeeded(node);
JobStatus jobStatus = pollJobStatus(node);
boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
ExecutionStatus status = getJobExecutionStatus(slaKilled, killOrphanFlow, jobStatus);
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node);
switch (status) {
case FAILED:
// jobStatus can be null here, hence the null check before asking whether to retry.
if (jobStatus != null && jobStatus.isShouldRetry()) {"Retrying job: {}, current attempts: {}, max attempts: {}", DagManagerUtils.getFullyQualifiedJobName(node),
jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
// Register every node that was submitted, grouped by dag id.
for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: nextSubmitted.entrySet()) {
String dagId = entry.getKey();
Set<DagNode<JobExecutionPlan>> dagNodes = entry.getValue();
for (DagNode<JobExecutionPlan> dagNode: dagNodes) {
addJobState(dagId, dagNode);
// Drop the tracking state of every node collected for clean-up.
for (DagNode<JobExecutionPlan> dagNode: nodesToCleanUp) {
String dagId = DagManagerUtils.generateDagId(dagNode);
deleteJobState(dagId, dagNode);
* Cancel the job if the job has been "orphaned". A job is orphaned if it has been in ORCHESTRATED
* {@link ExecutionStatus} for some specific amount of time.
* @param node {@link DagNode} representing the job
* @param jobStatus current {@link JobStatus} of the job
* @return true if the total time that the job remains in the ORCHESTRATED state exceeds
* {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
// Returns true when the job is still ORCHESTRATED and more than the job-start SLA has elapsed
// since its orchestrated time; in that case a kill message is recorded on the owning dag.
// Returns false when jobStatus is null or the condition does not hold.
private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobStatus)
throws ExecutionException, InterruptedException {
// Without a status there is no orchestrated time to compare against.
if (jobStatus == null) {
return false;
// valueOf is ExecutionStatus.valueOf (static import); it throws IllegalArgumentException for an unknown event name.
ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
long timeOutForJobStart = DagManagerUtils.getJobStartSla(node);
long jobOrchestratedTime = jobStatus.getOrchestratedTime();
// Both values are compared against System.currentTimeMillis(), i.e. in milliseconds.
if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {"Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
String dagId = DagManagerUtils.generateDagId(node);
this.dags.get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
return true;
} else {
return false;
// Derives the effective status of a job: a kill decision (SLA or orphan) takes precedence over
// the polled status; with no polled status the job is treated as PENDING; otherwise the
// status is parsed from the polled event name.
private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean killOrphanFlow, JobStatus jobStatus) {
if (slaKilled || killOrphanFlow) {
} else {
if (jobStatus == null) {
return PENDING;
} else {
// ExecutionStatus.valueOf via static import; throws IllegalArgumentException for an unknown name.
return valueOf(jobStatus.getEventName());
* Check if the SLA is configured for the flow this job belongs to.
* If it is, this method will try to cancel the job when SLA is reached.
* @param node dag node of the job
* @return true if the job is killed because it reached sla
* @throws ExecutionException exception
* @throws InterruptedException exception
// Returns true when the current time is past flowStartTime + flowSla for the flow this node
// belongs to; in that case a kill message is recorded on the owning dag. Returns false otherwise.
private boolean slaKillIfNeeded(DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
long flowStartTime = DagManagerUtils.getFlowStartTime(node);
long currentTime = System.currentTimeMillis();
String dagId = DagManagerUtils.generateDagId(node);
long flowSla;
// The SLA is looked up once per dag and cached in dagToSLA for later polls.
if (dagToSLA.containsKey(dagId)) {
flowSla = dagToSLA.get(dagId);
} else {
flowSla = DagManagerUtils.getFlowSLA(node);
dagToSLA.put(dagId, flowSla);
// Start time and SLA are compared against System.currentTimeMillis(), i.e. in milliseconds.
if (currentTime > flowStartTime + flowSla) {"Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
this.dags.get(dagId).setMessage("Flow killed due to exceeding SLA of " + flowSla + " ms");
return true;
return false;
* Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
private JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
/**
 * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag.
 */
private JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
if (dag == null || dag.isEmpty()) {
return null;
Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY);
private JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {
long pollStartTime = System.nanoTime();
Iterator<JobStatus> jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
if (jobStatusIterator.hasNext()) {
} else {
return null;
/**
 * Submit the next set of Dag nodes in the Dag identified by the provided dagId.
 * @param dagId The dagId that should be processed.
 * @return a map from the provided dagId to the set of Dag nodes selected for submission
 * @throws IOException
 */
synchronized Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
// The nodes that DagManagerUtils.getNext reports for this dag.
Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
//Submit jobs from the dag ready for execution.
for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
// NOTE(review): the loop body and the checkpoint call announced by the comment below are missing
// from this excerpt — confirm against version control.
//Checkpoint the dag state
// The result maps the single dagId to the set of nodes selected above.
Map<String, Set<DagNode<JobExecutionPlan>>> dagIdToNextJobs = Maps.newHashMap();
dagIdToNextJobs.put(dagId, nextNodes);
return dagIdToNextJobs;
/**
 * Submits a {@link JobSpec} to a {@link org.apache.gobblin.runtime.api.SpecExecutor}.
 */
private void submitJob(DagNode<JobExecutionPlan> dagNode) {
// Gather everything derived from the node up front so it is also available to the failure path.
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
// Run this spec on selected executor
SpecProducer producer = null;
try {
producer = DagManagerUtils.getSpecProducer(dagNode);
// The timing event is only created when an event submitter is configured; null otherwise.
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
// Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
// The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
// either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
// blocks (by calling Future#get()) until the submission is completed.
Future addSpecFuture = producer.addSpec(jobSpec);
// NOTE(review): the statements that persist the dag, update the running-job counters, block on the
// future and stop the timer are truncated or missing in the lines below — restore from version control.
//Persist the dag
if (this.metricContext != null) {
getRunningJobsCounterForUser(dagNode).forEach(counter ->;
// Attach the execution link reported by the producer to the job metadata.
jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri));
if (jobOrchestrationTimer != null) {
}"Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
} catch (Exception e) {
// Any failure during submission is logged and recorded in the job metadata together with its cause message.
TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
String message = "Cannot submit job " + DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
// NOTE(review): the body of this null check is missing from this excerpt.
if (jobFailedTimer != null) {
private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
if (proxyUser != null) {
proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount, proxyUser, dagNode);
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
String requesterMessage = null;
if (serializedRequesters != null) {
for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
if (!requesterCheck && requesterMessage == null) {
requesterMessage = "Quota exceeded for requester " + requester.getName() + " on executor " + specExecutorUri + ": quota="
+ getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
// Throw errors for reach quota at the end to avoid inconsistent job counts
if (!proxyUserCheck) {
throw new IOException("Quota exceeded for proxy user " + proxyUser + " on executor " + specExecutorUri +
": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" + proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
if (!requesterCheck) {
throw new IOException(requesterMessage);
/**
 * Increment quota by one for the given map and key.
 * @return true if quota is not reached for this user or user is whitelisted, false otherwise.
 */
private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
int jobCount = quotaMap.getOrDefault(key, 0);
// Only increment job count for first attempt, since job is considered running between retries
if (dagNode.getValue().getCurrentAttempts() == 1) {
quotaMap.put(key, jobCount);
return jobCount <= getQuotaForUser(user);
private int getQuotaForUser(String user) {
return perUserQuota.getOrDefault(user, defaultQuota);
/**
 * Method that defines the actions to be performed when a job finishes either successfully or with failure.
 * This method updates the state of the dag and performs clean up actions as necessary.
 * TODO : Dag should have a status field, like JobExecutionPlan has. This method should update that field,
 *        which should be used by cleanup(). It may also remove the need of failedDagIdsFinishRunning,
 *        failedDagIdsFinishAllPossible.
 */
private Map<String, Set<DagNode<JobExecutionPlan>>> onJobFinish(DagNode<JobExecutionPlan> dagNode)
throws IOException {
// Resolve the owning dag and the identifiers used below.
Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
String dagId = DagManagerUtils.generateDagId(dag);
String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
// NOTE(review): the logging statement fused onto the end of the next line is truncated.
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);"Job {} of Dag {} has finished with status {}", jobName, dagId,;
// The job has finished, so decrement the per-user running-job counters.
if (this.metricContext != null) {
getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec());
switch (jobStatus) {
// TODO : For now treat canceled as failed, till we introduce failure option - CANCEL
// NOTE(review): only the FAILED label survives in this excerpt; the other case labels and the bodies
// of the FINISH_RUNNING branches are missing — restore from version control.
case FAILED:
dag.setMessage("Flow failed because job " + jobName + " failed");
if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
} else {
}
// Nothing further is submitted on this path: an empty map is returned.
return Maps.newHashMap();
if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
} else {
return Maps.newHashMap();
// Presumably the success path: continue with the dag's next nodes — TODO confirm the case label.
return submitNext(dagId);
log.warn("It should not reach here. Job status is unexpected.");
return Maps.newHashMap();
/**
 * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link DagNode}.
 */
private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
if (proxyUser != null) {
String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
if (proxyUserToJobCount.containsKey(proxyUserKey) && proxyUserToJobCount.get(proxyUserKey) > 0) {
proxyUserToJobCount.put(proxyUserKey, proxyUserToJobCount.get(proxyUserKey) - 1);
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
if (serializedRequesters != null) {
try {
for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
if (requesterToJobCount.containsKey(requesterKey) && requesterToJobCount.get(requesterKey) > 0) {
requesterToJobCount.put(requesterKey, requesterToJobCount.get(requesterKey) - 1);
} catch (IOException e) {
log.error("Failed to release quota for requester list " + serializedRequesters, e);
// Called by cleanUp() for each node polled from a failed dag's job list.
// NOTE(review): the body of this method is missing from this excerpt; presumably it undoes what
// addJobState records — TODO confirm against version control.
private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
private void addJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
this.jobToDag.put(dagNode, dag);
if (this.dagToJobs.containsKey(dagId)) {
} else {
LinkedList<DagNode<JobExecutionPlan>> dagNodeList = Lists.newLinkedList();
this.dagToJobs.put(dagId, dagNodeList);
private boolean hasRunningJobs(String dagId) {
return !this.dagToJobs.get(dagId).isEmpty();
// Returns a counter obtained from metricContext for the given dag node.
// NOTE(review): the arguments that build the counter name are missing from this excerpt.
private ContextAwareCounter getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
return metricContext.contextAwareCounter(
private List<ContextAwareCounter> getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
Config configs = dagNode.getValue().getJobSpec().getConfig();
String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
List<ContextAwareCounter> counters = new ArrayList<>();
if (StringUtils.isNotEmpty(proxy)) {
ServiceMetricNames.SERVICE_USERS, proxy)));
try {
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
if (StringUtils.isNotEmpty(serializedRequesters)) {
List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
for (ServiceRequester requester : requesters) {
.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
} catch (IOException e) {
log.error("Error while fetching requester list.", e);
return counters;
/**
 * Perform clean up. Remove a dag from the dag store if the dag is complete and update internal state.
 */
private void cleanUp() throws IOException {
// Dag ids gathered by the two passes below and processed by the final loop.
// NOTE(review): the statements that add ids to this list, the logging call prefixes and the body of
// the final loop are missing from this excerpt — restore from version control.
List<String> dagIdstoClean = new ArrayList<>();
//Clean up failed dags
for (String dagId : this.failedDagIdsFinishRunning) {
//Skip monitoring of any other jobs of the failed dag.
LinkedList<DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId);
// Drain the dag's job list, dropping the state of each node.
while (!dagNodeList.isEmpty()) {
DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
deleteJobState(dagId, dagNode);
}"Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
// send an event before cleaning up dag
DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
//Clean up completed dags
for (String dagId : this.dags.keySet()) {
// Only dags with no running jobs that were not already handled by the failed-dag pass above.
if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
// The flow counts as succeeded unless the dag is listed in failedDagIdsFinishAllPossible.
String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
status = TimingEvent.FlowTimings.FLOW_FAILED;
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
} else {
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.SUCCESSFUL);
}"Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
// send an event before cleaning up dag
DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), status);
for (String dagId: dagIdstoClean) {
/**
 * Add a dag to the failed dag state store.
 */
private synchronized void addFailedDag(String dagId) {
// NOTE(review): the logging call prefix and the statement that writes to the failed dag state store
// are missing from the try block in this excerpt.
try {"Adding dag " + dagId + " to failed dag state store");
} catch (IOException e) {
// A failed write is logged and not rethrown.
log.error("Failed to add dag " + dagId + " to failed dag state store", e);
/**
 * Note that removal of a {@link Dag} entry in {@link #dags} needs to happen after {@link #cleanUp()}
 * since the real {@link Dag} object is required for {@link #cleanUp()},
 * and cleaning of all relevant states needs to be atomic.
 * @param dagId
 */
private synchronized void cleanUpDag(String dagId) {
// NOTE(review): the statements that clear the flow event and remove the dag's state are missing from
// this excerpt, including the body of the try block.
// clears flow event after cancelled job to allow resume event status to be set
try {
} catch (IOException ioe) {
// A failure to clean the backing store is logged and not rethrown.
log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
// Flow outcome marker carrying an integer value; cleanUp() stores these in flowGauges.
// NOTE(review): the enum constants are missing from this excerpt (FAILED and SUCCESSFUL are
// referenced elsewhere in the file) — restore them with their values from version control.
private enum FlowState {
// Integer value associated with the state.
public int value;
FlowState(int value) {
this.value = value;
/**
 * Thread that runs retention on failed dags based on their original start time (which is the flow execution ID).
 */
public static class FailedDagRetentionThread implements Runnable {
// Store of failed dags; the calls made on it are not visible in this excerpt.
private final DagStateStore failedDagStateStore;
// Ids of the failed dags examined by run(); the caller's set is kept by reference, not copied.
private final Set<String> failedDagIds;
// Added to a dag's flow execution id and compared against System.currentTimeMillis() in run().
private final long failedDagRetentionTime;
FailedDagRetentionThread(DagStateStore failedDagStateStore, Set<String> failedDagIds, long failedDagRetentionTime) {
this.failedDagStateStore = failedDagStateStore;
this.failedDagIds = failedDagIds;
this.failedDagRetentionTime = failedDagRetentionTime;
public void run() {
// NOTE(review): the logging call prefixes and the bodies of the condition and of both loops are
// missing from this excerpt — restore from version control.
try {"Cleaning failed dag state store");
long startTime = System.currentTimeMillis();
List<String> dagIdsToClean = new ArrayList<>();
for (String dagId : this.failedDagIds) {
// A dag is past retention once the current time exceeds its flow execution id plus the retention
// time; a retention time of zero or less never matches.
if (this.failedDagRetentionTime > 0L && startTime > DagManagerUtils.getFlowExecId(dagId) + this.failedDagRetentionTime) {
for (String dagId : dagIdsToClean) {
}"Cleaned " + dagIdsToClean.size() + " dags from the failed dag state store");
} catch (Exception e) {
// Any failure during retention is logged and not rethrown.
log.error("Failed to run retention on failed dag state store", e);
/** Stop the service. */
protected void shutDown()
throws Exception {
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);