/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import static org.apache.gobblin.service.ExecutionStatus.*;
/**
* This class implements a manager for the life cycle of a {@link Dag}. A {@link Dag} is submitted to the
* {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. On receiving a {@link Dag}, the
* {@link DagManager} first persists the {@link Dag} to the {@link DagStateStore}, and then submits it to the specific
* {@link DagManagerThread}'s {@link BlockingQueue} based on the flowExecutionId of the Flow.
* This guarantees that each {@link Dag} received by the {@link DagManager} can be recovered in case of a leadership
* change or service restart.
*
* The implementation of the {@link DagManager} is multi-threaded. Each {@link DagManagerThread} polls the
* {@link BlockingQueue} for new Dag submissions at fixed intervals. It dequeues any newly submitted Dags and coordinates
* the execution of individual jobs in the Dag. The coordination logic involves polling the {@link JobStatus}es of running
* jobs. Upon completion of a job, it will either schedule the next job in the Dag (on SUCCESS) or mark the Dag as failed
* (on FAILURE). Upon completion of a Dag execution, it will perform the required clean up actions.
*
* For deleteSpec/cancellation requests for a flow URI, {@link DagManager} finds out the flowExecutionId using
* {@link JobStatusRetriever}, and forwards the request to the {@link DagManagerThread} which handled the addSpec request
* for this flow. We need a separate {@link BlockingQueue} for each {@link DagManagerThread} because
* cancellation requires state that is held only by the {@link DagManagerThread} that owns the flow.
*
* The {@link DagManager} is active only in the leader mode. To ensure recoverability, each {@link Dag} managed by a {@link DagManager} is
* checkpointed to a persistent location. On start up or leadership change,
* the {@link DagManager} loads all the checkpointed {@link Dag}s and adds them to the {@link BlockingQueue}.
* Current implementation supports only FileSystem-based checkpointing of the Dag statuses.
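*
* <p>For illustration, a minimal service configuration wiring up the keys defined below. The values are
* hypothetical; the {@code perUserQuota} entries follow the {@code user:quota} format parsed in the constructor:
* <pre>{@code
* gobblin.service.dagManager.numThreads=3
* gobblin.service.dagManager.pollingInterval=10
* gobblin.service.dagManager.jobStatusRetriever.class=org.apache.gobblin.service.monitoring.FsJobStatusRetriever
* gobblin.service.dagManager.dagStateStoreClass=org.apache.gobblin.service.modules.orchestration.FSDagStateStore
* gobblin.service.dagManager.defaultJobQuota=20
* gobblin.service.dagManager.perUserQuota=["alice:5","bob:10"]
* }</pre>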
*/
@Alpha
@Slf4j
public class DagManager extends AbstractIdleService {
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
public static final Integer DEFAULT_NUM_THREADS = 3;
private static final Integer TERMINATION_TIMEOUT = 30;
public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads";
public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval";
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = JOB_STATUS_RETRIEVER_KEY + ".class";
private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
private static final String FAILED_DAG_RETENTION_TIME_UNIT = FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
private static final String FAILED_DAG_RETENTION_TIME = FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
public static final String FAILED_DAG_POLLING_INTERVAL = FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + "defaultJobQuota";
private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + "perUserQuota";
private static final String QUOTA_SEPARATOR = ":";
/**
* Action to be performed on a {@link Dag} in case of a job failure. Currently, 2 modes are supported:
* <ul>
* <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
* <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to finish, as long as all the dependencies
* of the job are successful.</li>
* </ul>
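* For example, in a Dag where job A precedes jobs B and C, if B fails while C is running: FINISH_RUNNING lets C
* finish but submits no further jobs, whereas FINISH_ALL_POSSIBLE also submits any remaining job whose
* dependencies have all succeeded. A third value, CANCEL, is declared below but not yet acted upon (see the
* TODO in DagManagerThread#onJobFinish).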
*/
public enum FailureOption {
FINISH_RUNNING("FINISH_RUNNING"),
CANCEL("CANCEL"),
FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");
private final String failureOption;
FailureOption(final String failureOption) {
this.failureOption = failureOption;
}
@Override
public String toString() {
return this.failureOption;
}
}
private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
private BlockingQueue<String>[] cancelQueue;
private BlockingQueue<String>[] resumeQueue;
DagManagerThread[] dagManagerThreads;
private ScheduledExecutorService scheduledExecutorPool;
private boolean instrumentationEnabled;
private DagStateStore dagStateStore;
private DagStateStore failedDagStateStore;
private Map<URI, TopologySpec> topologySpecMap;
@Getter
private final Integer numThreads;
private final Integer pollingInterval;
private final Integer retentionPollingInterval;
@Getter
private final JobStatusRetriever jobStatusRetriever;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
private final long failedDagRetentionTime;
private volatile boolean isActive = false;
public DagManager(Config config, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
this.queue = initializeDagQueue(this.numThreads);
this.cancelQueue = initializeDagQueue(this.numThreads);
this.resumeQueue = initializeDagQueue(this.numThreads);
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
if (instrumentationEnabled) {
MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
} else {
this.eventSubmitter = Optional.absent();
}
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
this.perUserQuota = new HashMap<>();
for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
this.perUserQuota.put(userQuota.split(QUOTA_SEPARATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPARATOR)[1]));
}
try {
this.jobStatusRetriever = createJobStatusRetriever(config);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Exception encountered during DagManager initialization", e);
}
TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME));
}
JobStatusRetriever createJobStatusRetriever(Config config) throws ReflectiveOperationException {
Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
return (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
}
KafkaJobStatusMonitor createJobStatusMonitor(Config config) throws ReflectiveOperationException {
return new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
}
DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
try {
Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
// Initializes and returns an array of blocking queues of size numThreads, one queue per DagManagerThread
private static LinkedBlockingDeque[] initializeDagQueue(int numThreads) {
LinkedBlockingDeque[] queue = new LinkedBlockingDeque[numThreads];
for (int i=0; i< numThreads; i++) {
queue[i] = new LinkedBlockingDeque<>();
}
return queue;
}
public DagManager(Config config) {
this(config, true);
}
/** Start the service. The fixed pool of {@link DagManagerThread}s is launched and any checkpointed {@link Dag}s are
* loaded in {@link #setActive(boolean)} when this instance acquires leadership, so this method is a no-op.
*/
@Override
protected void startUp() {
//Do nothing.
}
/**
* Method to submit a {@link Dag} to the {@link DagManager}. The {@link DagManager} optionally persists the
* submitted dag to the {@link DagStateStore} and then adds the dag to a {@link BlockingQueue} to be picked up
* by one of the {@link DagManagerThread}s.
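* <p>For illustration, a hypothetical call site:
* <pre>{@code
* // persist the dag for recovery across restarts/leadership changes and mark all of its jobs PENDING
* dagManager.addDag(dag, true, true);
* }</pre>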
* @param dag {@link Dag} to be added
* @param persist whether to persist the dag to the {@link DagStateStore}
* @param setStatus if true, set all jobs in the dag to pending
*/
synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException {
if (persist) {
//Persist the dag
this.dagStateStore.writeCheckpoint(dag);
}
int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
// Add the dag to the specific queue determined by flowExecutionId
// Flow cancellation request has to be forwarded to the same DagManagerThread where the
// flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the
// specific DagManagerThread queue
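// (Assumption: DagManagerUtils.getDagQueueId derives the queue index deterministically from the
// flowExecutionId, e.g. modulo numThreads, so a given flow execution always maps to the same thread.)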
if (!this.queue[queueId].offer(dag)) {
throw new IOException("Could not add dag" + DagManagerUtils.generateDagId(dag) + "to queue");
}
if (setStatus) {
submitEventsAndSetStatus(dag);
}
}
private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
if (this.eventSubmitter.isPresent()) {
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
jobExecutionPlan.setExecutionStatus(PENDING);
}
}
}
/**
* Method to submit a cancellation request for a flow {@link URI} to the {@link DagManager}.
* The {@link DagManager} adds the dag id to the cancel {@link BlockingQueue} to be picked up by one of the {@link DagManagerThread}s.
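* <p>For illustration, a hypothetical call site that cancels recent executions of a flow spec:
* <pre>{@code
* dagManager.stopDag(flowSpec.getUri());
* }</pre>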
*/
synchronized public void stopDag(URI uri) throws IOException {
String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
String flowName = FlowSpec.Utils.getFlowName(uri);
List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
log.info("Found {} flows to cancel.", flowExecutionIds.size());
for (long flowExecutionId : flowExecutionIds) {
killFlow(flowGroup, flowName, flowExecutionId);
}
}
/**
* Add the specified flow to {@link DagManager#cancelQueue}
*/
private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
int queueId = DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
if (!this.cancelQueue[queueId].offer(dagId)) {
throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
}
}
@Subscribe
public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {
log.info("Received kill request for flow ({}, {}, {})", killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
try {
killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
} catch (IOException e) {
log.warn("Failed to kill flow", e);
}
}
@Subscribe
public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
log.info("Received resume request for flow ({}, {}, {})", resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
String dagId = DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
int queueId = DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(), this.numThreads);
if (!this.resumeQueue[queueId].offer(dagId)) {
log.warn("Could not add dag " + dagId + " to resume queue");
}
}
public synchronized void setTopologySpecMap(Map<URI, TopologySpec> topologySpecMap) {
this.topologySpecMap = topologySpecMap;
}
/**
* When a {@link DagManager} becomes active, it loads the serialized representations of the currently running {@link Dag}s
* from the checkpoint directory, deserializes the {@link Dag}s and adds them to a queue to be consumed by
* the {@link DagManagerThread}s.
* @param active a boolean to indicate if the {@link DagManager} is the leader.
*/
public synchronized void setActive(boolean active) {
if (this.isActive == active) {
log.info("DagManager already {}, skipping further actions.", (!active) ? "inactive" : "active");
return;
}
this.isActive = active;
try {
if (this.isActive) {
log.info("Activating DagManager.");
log.info("Scheduling {} DagManager threads", numThreads);
//Initializing state store for persisting Dags.
this.dagStateStore = createDagStateStore(config, topologySpecMap);
this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
Set<String> failedDagIds = Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
//On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
queue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
}
FailedDagRetentionThread failedDagRetentionThread = new FailedDagRetentionThread(failedDagStateStore, failedDagIds, failedDagRetentionTime);
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, retentionPollingInterval, TimeUnit.MINUTES);
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
log.info("Loading " + dags.size() + " dags from dag state store");
for (Dag<JobExecutionPlan> dag : dags) {
addDag(dag, false, false);
}
} else { //Mark the DagManager inactive.
log.info("Inactivating the DagManager. Shutting down all DagManager threads");
this.scheduledExecutorPool.shutdown();
try {
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Exception encountered when shutting down DagManager threads.", e);
}
}
} catch (IOException e) {
log.error("Exception encountered when activating the new DagManager", e);
throw new RuntimeException(e);
}
}
/**
* Each {@link DagManagerThread} performs 2 actions when scheduled:
* <ol>
* <li> Dequeues any newly submitted {@link Dag}s from the Dag queue. All the {@link JobExecutionPlan}s which
* are part of the dequeued {@link Dag} will be managed by this thread. </li>
* <li> Polls the job status store for the current job statuses of all the running jobs it manages.</li>
* </ol>
*/
public static class DagManagerThread implements Runnable {
private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
private Set<String> failedDagIds;
private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
final Map<String, Long> dagToSLA = new HashMap<>();
private final Set<String> failedDagIdsFinishRunning = new HashSet<>();
private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
private final MetricContext metricContext;
private final Optional<EventSubmitter> eventSubmitter;
private final Optional<Timer> jobStatusPolledTimer;
private final int defaultQuota;
private final Map<String, Integer> perUserQuota;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
private static Map<String, FlowState> flowGauges = Maps.newHashMap();
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
private DagStateStore failedDagStateStore;
private BlockingQueue<Dag<JobExecutionPlan>> queue;
private BlockingQueue<String> cancelQueue;
private BlockingQueue<String> resumeQueue;
/**
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
this.failedDagIds = failedDagIds;
this.queue = queue;
this.cancelQueue = cancelQueue;
this.resumeQueue = resumeQueue;
this.defaultQuota = defaultQuota;
this.perUserQuota = perUserQuota;
if (instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
() -> orchestrationDelay.get());
this.metricContext.register(orchestrationDelayMetric);
} else {
this.metricContext = null;
this.eventSubmitter = Optional.absent();
this.jobStatusPolledTimer = Optional.absent();
}
}
/**
* Main body of the {@link DagManagerThread}. Dequeue the next item from the queue and poll job statuses of currently
* running jobs.
* Because this thread runs in a regular interval, we should avoid doing repetitive work inside it.
*/
@Override
public void run() {
try {
String nextDagToCancel = cancelQueue.poll();
//Poll the cancelQueue for a new Dag to cancel.
if (nextDagToCancel != null) {
cancelDag(nextDagToCancel);
}
while (!queue.isEmpty()) {
Dag<JobExecutionPlan> dag = queue.poll();
//Poll the queue for a new Dag to execute.
if (dag != null) {
if (dag.isEmpty()) {
log.info("Empty dag; ignoring the dag");
continue;
}
//Initialize dag.
initialize(dag);
}
}
while (!resumeQueue.isEmpty()) {
String dagId = resumeQueue.poll();
beginResumingDag(dagId);
}
finishResumingDags();
log.debug("Polling job statuses..");
//Poll and update the job statuses of running jobs.
pollAndAdvanceDag();
log.debug("Poll done.");
//Clean up any finished dags
log.debug("Cleaning up finished dags..");
cleanUp();
log.debug("Clean up done");
} catch (Exception e) {
log.error("Exception encountered in {}", getClass().getName(), e);
}
}
/**
* Begin resuming a dag by setting the status of both the dag and the failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
* and also sending events so that this status will be reflected in the job status state store.
*/
private void beginResumingDag(String dagId) throws IOException {
if (!this.failedDagIds.contains(dagId)) {
log.warn("No dag found with dagId " + dagId + ", so cannot resume flow");
return;
}
Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
if (dag == null) {
log.error("Dag " + dagId + " was found in memory but not found in failed dag state store");
return;
}
long flowResumeTime = System.currentTimeMillis();
// Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point before it failed
DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
node.getValue().setExecutionStatus(PENDING_RESUME);
// Only emit the event when instrumentation is enabled; eventSubmitter is absent otherwise
if (this.eventSubmitter.isPresent()) {
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
}
}
// Set flowStartTime so that the flow SLA will be based on the current time instead of the original flow start time
node.getValue().setFlowStartTime(flowResumeTime);
}
this.resumingDags.put(dagId, dag);
}
/**
* Finish resuming dags by first verifying the status is correct (flow should be {@link ExecutionStatus#PENDING_RESUME}
* and jobs should not be {@link ExecutionStatus#FAILED} or {@link ExecutionStatus#CANCELLED}) and then calling
* {@link #initialize}. This is separated from {@link #beginResumingDag} because it could take some time for the
* job status state store to reflect the updated status.
*/
private void finishResumingDags() throws IOException {
for (Iterator<Map.Entry<String, Dag<JobExecutionPlan>>> iterator = this.resumingDags.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, Dag<JobExecutionPlan>> dag = iterator.next();
JobStatus flowStatus = pollFlowStatus(dag.getValue());
if (flowStatus == null || !flowStatus.getEventName().equals(PENDING_RESUME.name())) {
continue;
}
boolean dagReady = true;
for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
JobStatus jobStatus = pollJobStatus(node);
if (jobStatus == null || jobStatus.getEventName().equals(FAILED.name()) || jobStatus.getEventName().equals(CANCELLED.name())) {
dagReady = false;
break;
}
}
if (dagReady) {
this.dagStateStore.writeCheckpoint(dag.getValue());
this.failedDagStateStore.cleanUp(dag.getValue());
this.failedDagIds.remove(dag.getKey());
// Remove through the iterator to avoid a ConcurrentModificationException while iterating the map
iterator.remove();
initialize(dag.getValue());
}
}
}
/**
* Cancels the dag and sends a cancellation tracking event.
* @param dagToCancel id of the dag to cancel
* @throws ExecutionException if the underlying job future fails while cancelling
* @throws InterruptedException if the thread is interrupted while cancelling
*/
private void cancelDag(String dagToCancel) throws ExecutionException, InterruptedException {
log.info("Cancel flow with DagId {}", dagToCancel);
if (this.dagToJobs.containsKey(dagToCancel)) {
List<DagNode<JobExecutionPlan>> dagNodesToCancel = this.dagToJobs.get(dagToCancel);
log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
cancelDagNode(dagNodeToCancel);
}
this.dags.get(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
this.dags.get(dagToCancel).setMessage("Flow killed by request");
} else {
log.warn("Did not find Dag with id {}, it might be already cancelled/finished.", dagToCancel);
}
}
private void cancelDagNode(DagNode<JobExecutionPlan> dagNodeToCancel) throws ExecutionException, InterruptedException {
Properties props = new Properties();
if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
Future future = dagNodeToCancel.getValue().getJobFuture().get();
String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
sendCancellationEvent(dagNodeToCancel.getValue());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
}
private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
if (this.eventSubmitter.isPresent()) {
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
jobExecutionPlan.setExecutionStatus(CANCELLED);
}
}
/**
* This method determines the next set of jobs to execute from the dag and submits them for execution.
* This method updates internal data structures tracking currently running Dags and jobs.
*/
private void initialize(Dag<JobExecutionPlan> dag)
throws IOException {
//Add Dag to the map of running dags
String dagId = DagManagerUtils.generateDagId(dag);
log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
if (this.dags.containsKey(dagId)) {
log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
return;
}
this.dags.put(dagId, dag);
log.debug("Dag {} - determining if any jobs are already running.", DagManagerUtils.getFullyQualifiedDagName(dag));
//A flag to indicate if the flow is already running.
boolean isDagRunning = false;
//Are there any jobs already in the running state? This check is for Dags already running
//before a leadership change occurs.
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
addJobState(dagId, dagNode);
//Update the running jobs counter.
getRunningJobsCounter(dagNode).inc();
isDagRunning = true;
}
}
FlowId flowId = DagManagerUtils.getFlowId(dag);
if (!flowGauges.containsKey(flowId.toString())) {
String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(),
flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
flowGauges.put(flowId.toString(), FlowState.RUNNING);
ContextAwareGauge<Integer> gauge = RootMetricContext
.get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
RootMetricContext.get().register(flowStateGaugeName, gauge);
}
log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
//Determine the next set of jobs to run and submit them for execution
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
for (DagNode<JobExecutionPlan> dagNode : nextSubmitted.get(dagId)) {
addJobState(dagId, dagNode);
}
// Set flow status to running
DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
flowGauges.put(flowId.toString(), FlowState.RUNNING);
// Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
// the time difference between the instant when a flow first transitions to the running state and the instant
// when the flow is submitted to Gobblin service.
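// That is, delay = System.currentTimeMillis() - flowExecutionId, relying on the convention that the flow
// execution ID is the submission timestamp in epoch millis (the same convention FailedDagRetentionThread uses)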
if (!isDagRunning) {
this.orchestrationDelay.set(System.currentTimeMillis() - DagManagerUtils.getFlowExecId(dag));
}
log.info("Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
}
/**
* Proceed the execution of each dag node based on job status.
*/
private void pollAndAdvanceDag() throws IOException, ExecutionException, InterruptedException {
this.failedDagIdsFinishRunning.clear();
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
boolean slaKilled = slaKillIfNeeded(node);
JobStatus jobStatus = pollJobStatus(node);
boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
ExecutionStatus status = getJobExecutionStatus(slaKilled, killOrphanFlow, jobStatus);
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node);
switch (status) {
case COMPLETE:
jobExecutionPlan.setExecutionStatus(COMPLETE);
nextSubmitted.putAll(onJobFinish(node));
nodesToCleanUp.add(node);
break;
case FAILED:
jobExecutionPlan.setExecutionStatus(FAILED);
nextSubmitted.putAll(onJobFinish(node));
nodesToCleanUp.add(node);
break;
case CANCELLED:
jobExecutionPlan.setExecutionStatus(CANCELLED);
nextSubmitted.putAll(onJobFinish(node));
nodesToCleanUp.add(node);
break;
case PENDING:
jobExecutionPlan.setExecutionStatus(PENDING);
break;
case PENDING_RETRY:
jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
break;
default:
jobExecutionPlan.setExecutionStatus(RUNNING);
break;
}
if (jobStatus != null && jobStatus.isShouldRetry()) {
log.info("Retrying job: {}, current attempts: {}, max attempts: {}", DagManagerUtils.getFullyQualifiedJobName(node),
jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
submitJob(node);
}
}
for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: nextSubmitted.entrySet()) {
String dagId = entry.getKey();
Set<DagNode<JobExecutionPlan>> dagNodes = entry.getValue();
for (DagNode<JobExecutionPlan> dagNode: dagNodes) {
addJobState(dagId, dagNode);
}
}
for (DagNode<JobExecutionPlan> dagNode: nodesToCleanUp) {
String dagId = DagManagerUtils.generateDagId(dagNode);
deleteJobState(dagId, dagNode);
}
}
/**
* Cancel the job if the job has been "orphaned". A job is orphaned if it has been in the ORCHESTRATED
* {@link ExecutionStatus} for longer than the configured job start SLA.
* @param node {@link DagNode} representing the job
* @param jobStatus current {@link JobStatus} of the job
* @return true if the total time that the job remains in the ORCHESTRATED state exceeds
* {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
*/
private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobStatus)
throws ExecutionException, InterruptedException {
if (jobStatus == null) {
return false;
}
ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
long timeOutForJobStart = DagManagerUtils.getJobStartSla(node);
long jobOrchestratedTime = jobStatus.getOrchestratedTime();
if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
DagManagerUtils.getJobName(node),
DagManagerUtils.getFullyQualifiedDagName(node),
timeOutForJobStart);
cancelDagNode(node);
String dagId = DagManagerUtils.generateDagId(node);
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
this.dags.get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
return true;
} else {
return false;
}
}
private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean killOrphanFlow, JobStatus jobStatus) {
if (slaKilled || killOrphanFlow) {
return CANCELLED;
} else {
if (jobStatus == null) {
return PENDING;
} else {
return valueOf(jobStatus.getEventName());
}
}
}
/**
* Check if the SLA is configured for the flow this job belongs to.
* If it is, this method will try to cancel the job when SLA is reached.
*
* @param node dag node of the job
* @return true if the job is killed because it reached its SLA
* @throws ExecutionException if the underlying job future fails while cancelling
* @throws InterruptedException if the thread is interrupted while cancelling
*/
private boolean slaKillIfNeeded(DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
long flowStartTime = DagManagerUtils.getFlowStartTime(node);
long currentTime = System.currentTimeMillis();
String dagId = DagManagerUtils.generateDagId(node);
long flowSla;
if (dagToSLA.containsKey(dagId)) {
flowSla = dagToSLA.get(dagId);
} else {
flowSla = DagManagerUtils.getFlowSLA(node);
dagToSLA.put(dagId, flowSla);
}
if (currentTime > flowStartTime + flowSla) {
log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
cancelDagNode(node);
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
this.dags.get(dagId).setMessage("Flow killed due to exceeding SLA of " + flowSla + " ms");
return true;
}
return false;
}
/**
* Retrieve the {@link JobStatus} of the job represented by the given {@link DagNode}.
*/
private JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
}
/**
* Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag
*/
private JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
if (dag == null || dag.isEmpty()) {
return null;
}
Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY);
}
private JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {
long pollStartTime = System.nanoTime();
Iterator<JobStatus> jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
if (jobStatusIterator.hasNext()) {
return jobStatusIterator.next();
} else {
return null;
}
}
/**
* Submit the next set of Dag nodes in the Dag identified by the provided dagId.
* @param dagId The dagId that should be processed.
* @return a map from the dagId to the set of {@link DagNode}s submitted for execution.
* @throws IOException if the dag state cannot be checkpointed to the {@link DagStateStore}.
*/
synchronized Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
//Submit jobs from the dag ready for execution.
for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
submitJob(dagNode);
}
//Checkpoint the dag state
this.dagStateStore.writeCheckpoint(dag);
Map<String, Set<DagNode<JobExecutionPlan>>> dagIdToNextJobs = Maps.newHashMap();
dagIdToNextJobs.put(dagId, nextNodes);
return dagIdToNextJobs;
}
/**
* Submits a {@link JobSpec} to a {@link org.apache.gobblin.runtime.api.SpecExecutor}.
*/
private void submitJob(DagNode<JobExecutionPlan> dagNode) {
DagManagerUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
jobExecutionPlan.setExecutionStatus(RUNNING);
JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
// Run this spec on selected executor
SpecProducer producer = null;
try {
checkQuota(dagNode);
producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
// Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
// The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
// either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
// blocks (by calling Future#get()) until the submission is completed.
Future addSpecFuture = producer.addSpec(jobSpec);
dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
//Persist the dag
this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode)));
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).inc();
getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.inc());
}
addSpecFuture.get();
jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri));
if (jobOrchestrationTimer != null) {
jobOrchestrationTimer.stop(jobMetadata);
}
log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
} catch (Exception e) {
TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
String message = "Cannot submit job " + DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
}
}
}
private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
if (proxyUser != null) {
proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount, proxyUser, dagNode);
}
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
String requesterMessage = null;
if (serializedRequesters != null) {
for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
if (!requesterCheck && requesterMessage == null) {
requesterMessage = "Quota exceeded for requester " + requester.getName() + " on executor " + specExecutorUri + ": quota="
+ getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
}
}
}
// Throw errors for exceeded quotas at the end to avoid inconsistent job counts
if (!proxyUserCheck) {
throw new IOException("Quota exceeded for proxy user " + proxyUser + " on executor " + specExecutorUri +
": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" + proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
}
if (!requesterCheck) {
throw new IOException(requesterMessage);
}
}
/**
* Increment the job count by one for the given map and key.
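* For example, with a quota of 2 and 2 jobs already counted for a user, a new first-attempt job raises the
* count to 3 and the check returns false; a retry (currentAttempts > 1) does not increment the count.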
* @return true if the quota is not exceeded for this user, false otherwise.
*/
private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
int jobCount = quotaMap.getOrDefault(key, 0);
// Only increment job count for first attempt, since job is considered running between retries
if (dagNode.getValue().getCurrentAttempts() == 1) {
jobCount++;
quotaMap.put(key, jobCount);
}
return jobCount <= getQuotaForUser(user);
}
private int getQuotaForUser(String user) {
return perUserQuota.getOrDefault(user, defaultQuota);
}
/**
* Method that defines the actions to be performed when a job finishes either successfully or with failure.
* This method updates the state of the dag and performs clean up actions as necessary.
* TODO : Dag should have a status field, like JobExecutionPlan has. This method should update that field,
* which should be used by cleanup(). It may also remove the need for failedDagIdsFinishRunning,
* failedDagIdsFinishAllPossible.
*/
private Map<String, Set<DagNode<JobExecutionPlan>>> onJobFinish(DagNode<JobExecutionPlan> dagNode)
throws IOException {
Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
String dagId = DagManagerUtils.generateDagId(dag);
String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
releaseQuota(dagNode);
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).dec();
getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec());
}
switch (jobStatus) {
// TODO : For now treat canceled as failed, till we introduce failure option - CANCEL
case FAILED:
dag.setMessage("Flow failed because job " + jobName + " failed");
if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
this.failedDagIdsFinishRunning.add(dagId);
} else {
this.failedDagIdsFinishAllPossible.add(dagId);
}
return Maps.newHashMap();
case CANCELLED:
if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
this.failedDagIdsFinishRunning.add(dagId);
} else {
this.failedDagIdsFinishAllPossible.add(dagId);
}
return Maps.newHashMap();
case COMPLETE:
return submitNext(dagId);
default:
log.warn("It should not reach here. Job status is unexpected.");
return Maps.newHashMap();
}
}
/**
* Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link DagNode}.
*/
private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
if (proxyUser != null) {
String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
if (proxyUserToJobCount.containsKey(proxyUserKey) && proxyUserToJobCount.get(proxyUserKey) > 0) {
proxyUserToJobCount.put(proxyUserKey, proxyUserToJobCount.get(proxyUserKey) - 1);
}
}
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
if (serializedRequesters != null) {
try {
for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
if (requesterToJobCount.containsKey(requesterKey) && requesterToJobCount.get(requesterKey) > 0) {
requesterToJobCount.put(requesterKey, requesterToJobCount.get(requesterKey) - 1);
}
}
} catch (IOException e) {
log.error("Failed to release quota for requester list " + serializedRequesters, e);
}
}
}
private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
this.jobToDag.remove(dagNode);
this.dagToJobs.get(dagId).remove(dagNode);
this.dagToSLA.remove(dagId);
}
private void addJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
this.jobToDag.put(dagNode, dag);
if (this.dagToJobs.containsKey(dagId)) {
this.dagToJobs.get(dagId).add(dagNode);
} else {
LinkedList<DagNode<JobExecutionPlan>> dagNodeList = Lists.newLinkedList();
dagNodeList.add(dagNode);
this.dagToJobs.put(dagId, dagNodeList);
}
}
private boolean hasRunningJobs(String dagId) {
return !this.dagToJobs.get(dagId).isEmpty();
}
private ContextAwareCounter getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
return metricContext.contextAwareCounter(
MetricRegistry.name(
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.RUNNING_FLOWS_COUNTER,
dagNode.getValue().getSpecExecutor().getUri().toString()));
}
private List<ContextAwareCounter> getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
Config configs = dagNode.getValue().getJobSpec().getConfig();
String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
List<ContextAwareCounter> counters = new ArrayList<>();
if (StringUtils.isNotEmpty(proxy)) {
counters.add(metricContext.contextAwareCounter(
MetricRegistry.name(
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SERVICE_USERS, proxy)));
}
try {
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
if (StringUtils.isNotEmpty(serializedRequesters)) {
List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
for (ServiceRequester requester : requesters) {
counters.add(metricContext.contextAwareCounter(MetricRegistry
.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
}
}
} catch (IOException e) {
log.error("Error while fetching requester list.", e);
}
return counters;
}
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
*/
private void cleanUp() throws IOException {
List<String> dagIdsToClean = new ArrayList<>();
//Clean up failed dags
for (String dagId : this.failedDagIdsFinishRunning) {
addFailedDag(dagId);
//Skip monitoring of any other jobs of the failed dag.
LinkedList<DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId);
while (!dagNodeList.isEmpty()) {
DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
deleteJobState(dagId, dagNode);
}
log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
// send an event before cleaning up dag
DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
dagIdsToClean.add(dagId);
}
//Clean up completed dags
for (String dagId : this.dags.keySet()) {
if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
addFailedDag(dagId);
status = TimingEvent.FlowTimings.FLOW_FAILED;
this.failedDagIdsFinishAllPossible.remove(dagId);
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.FAILED);
} else {
flowGauges.put(DagManagerUtils.getFlowId(this.dags.get(dagId)).toString(), FlowState.SUCCESSFUL);
}
log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
// send an event before cleaning up dag
DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), status);
dagIdsToClean.add(dagId);
}
}
for (String dagId : dagIdsToClean) {
cleanUpDag(dagId);
}
}
/**
* Add a dag to failed dag state store
*/
private synchronized void addFailedDag(String dagId) {
try {
log.info("Adding dag " + dagId + " to failed dag state store");
this.failedDagStateStore.writeCheckpoint(this.dags.get(dagId));
} catch (IOException e) {
log.error("Failed to add dag " + dagId + " to failed dag state store", e);
}
this.failedDagIds.add(dagId);
}
/**
* Note that removal of a {@link Dag} entry in {@link #dags} needs to happen after {@link #cleanUp()},
* since the real {@link Dag} object is required for {@link #cleanUp()},
* and cleaning of all relevant states needs to be atomic.
* @param dagId id of the dag to clean up
*/
private synchronized void cleanUpDag(String dagId) {
// Clear the flow event after a cancelled job so that a subsequent resume can set its own event status
this.dags.get(dagId).setFlowEvent(null);
try {
this.dagStateStore.cleanUp(dags.get(dagId));
} catch (IOException ioe) {
log.error(String.format("Failed to clean %s from backStore due to:", dagId), ioe);
}
this.dags.remove(dagId);
this.dagToJobs.remove(dagId);
}
}
private enum FlowState {
FAILED(-1),
RUNNING(0),
SUCCESSFUL(1);
public int value;
FlowState(int value) {
this.value = value;
}
}
/**
* Thread that runs retention on failed dags based on their original start time (which is the flow execution ID).
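* For example, with the default retention of 7 DAYS, a failed dag whose flow execution ID (a start timestamp
* in epoch millis) is more than 7 days old at poll time is removed from the failed dag state store.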
*/
public static class FailedDagRetentionThread implements Runnable {
private final DagStateStore failedDagStateStore;
private final Set<String> failedDagIds;
private final long failedDagRetentionTime;
FailedDagRetentionThread(DagStateStore failedDagStateStore, Set<String> failedDagIds, long failedDagRetentionTime) {
this.failedDagStateStore = failedDagStateStore;
this.failedDagIds = failedDagIds;
this.failedDagRetentionTime = failedDagRetentionTime;
}
@Override
public void run() {
try {
log.info("Cleaning failed dag state store");
long startTime = System.currentTimeMillis();
List<String> dagIdsToClean = new ArrayList<>();
for (String dagId : this.failedDagIds) {
if (this.failedDagRetentionTime > 0L && startTime > DagManagerUtils.getFlowExecId(dagId) + this.failedDagRetentionTime) {
this.failedDagStateStore.cleanUp(dagId);
dagIdsToClean.add(dagId);
}
}
for (String dagId : dagIdsToClean) {
this.failedDagIds.remove(dagId);
}
log.info("Cleaned " + dagIdsToClean.size() + " dags from the failed dag state store");
} catch (Exception e) {
log.error("Failed to run retention on failed dag state store", e);
}
}
}
/** Stop the service. */
@Override
protected void shutDown()
throws Exception {
this.scheduledExecutorPool.shutdown();
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
}
}