blob: 1fa88086e473e0082a8023f7cf7bc651ae002a83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.runtime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
import lombok.Getter;
import static com.codahale.metrics.MetricRegistry.name;
/**
* A class for executing {@link Task}s and retrying failed ones as well as for executing {@link Fork}s.
*
* @author Yinan Li
*/
public class TaskExecutor extends AbstractIdleService {

  private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);

  // Thread pool executor for running tasks; scheduled so that retries can be delayed (see retry())
  private final ScheduledExecutorService taskExecutor;

  // A separate thread pool executor for running forks of tasks
  @Getter
  private final ExecutorService forkExecutor;

  // Base task retry interval in seconds; actual delay scales linearly with the retry count
  @Getter
  private final long retryIntervalInSeconds;

  // The maximum number of items in the queued task time map.
  @Getter
  private final int queuedTaskTimeMaxSize;

  // The maximum age (in milliseconds) of the items in the queued task time map.
  @Getter
  private final long queuedTaskTimeMaxAge ;

  // Map of queued task ids to queue times. The key is the task id, the value is the time the task was queued. If the
  // task is being retried, the time may be in the future. Entries with time in the future will not be counted as
  // queued until the time is in the past.
  private final Map<String, Long> queuedTasks = Maps.newConcurrentMap();

  // Set of historical queued task times. The key is the UTC epoch time the task started, the value is the milliseconds
  // the task waited to start.
  private final ConcurrentSkipListMap<Long, Long> queuedTaskTimeHistorical = new ConcurrentSkipListMap<>();

  // The timestamp for the last time the metrics were calculated (used by calculateMetrics() to throttle recomputation).
  private long lastCalculationTime = 0;

  // The total number of tasks currently queued and queued over the historical lookback period.
  @Getter
  private AtomicInteger queuedTaskCount = new AtomicInteger();

  // The total number of tasks currently queued.
  @Getter
  private AtomicInteger currentQueuedTaskCount = new AtomicInteger();

  // The total number of tasks queued over the historical lookback period.
  @Getter
  private AtomicInteger historicalQueuedTaskCount = new AtomicInteger();

  // The total time tasks have currently been in the queue and were in the queue during the historical lookback period.
  @Getter
  private AtomicLong queuedTaskTotalTime = new AtomicLong();

  // The total time tasks have currently been in the queue.
  @Getter
  private AtomicLong currentQueuedTaskTotalTime = new AtomicLong();

  // The total time tasks have been in the queue during the historical lookback period.
  @Getter
  private AtomicLong historicalQueuedTaskTotalTime = new AtomicLong();

  // Count of running tasks.
  @Getter
  private final Counter runningTaskCount = new Counter();

  // Meter of successful tasks.
  @Getter
  private final Meter successfulTaskCount = new Meter();

  // Meter of failed tasks.
  @Getter
  private final Meter failedTaskCount = new Meter();

  // Timer over the interval from work unit creation to task start (sliding time window, see constructor)
  @Getter
  private final Timer taskCreateAndRunTimer;

  // The metric set exposed from the task executor.
  private final TaskExecutorQueueMetricSet metricSet = new TaskExecutorQueueMetricSet();
/**
 * Constructor used internally.
 *
 * @param taskExecutorThreadPoolSize size of the fixed pool used to run tasks; must be positive
 * @param coreRetryThreadPoolSize legacy parameter, currently unused (retries are scheduled on the task executor)
 * @param retryIntervalInSeconds base retry interval; must be positive
 * @param queuedTaskTimeMaxSize maximum number of historical queue-time samples kept; must be positive
 * @param queuedTaskTimeMaxAge maximum age in milliseconds of historical queue-time samples; must be positive
 * @param timerWindowSize sliding time window (in minutes) of the task create-and-run timer; must be positive
 */
private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds,
    int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge, int timerWindowSize) {
  Preconditions.checkArgument(taskExecutorThreadPoolSize > 0, "Task executor thread pool size should be positive");
  Preconditions.checkArgument(retryIntervalInSeconds > 0, "Task retry interval should be positive");
  Preconditions.checkArgument(queuedTaskTimeMaxSize > 0, "Queued task time max size should be positive");
  Preconditions.checkArgument(queuedTaskTimeMaxAge > 0, "Queued task time max age should be positive");
  // Validate the timer window as well, for consistency with the checks above; a non-positive
  // window would make the taskCreateAndRunTimer reservoir useless.
  Preconditions.checkArgument(timerWindowSize > 0, "Metric timer window size should be positive");

  // Currently a fixed-size thread pool is used to execute tasks. We probably need to revisit this later.
  this.taskExecutor = ExecutorsUtils.loggingDecorator(Executors.newScheduledThreadPool(
      taskExecutorThreadPoolSize,
      ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d"))));

  this.retryIntervalInSeconds = retryIntervalInSeconds;
  this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
  this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
  this.taskCreateAndRunTimer = new Timer(new SlidingTimeWindowReservoir(timerWindowSize, TimeUnit.MINUTES));

  this.forkExecutor = ExecutorsUtils.loggingDecorator(
      new ThreadPoolExecutor(
          // The core thread pool size is equal to that of the task
          // executor as there's at least one fork per task
          taskExecutorThreadPoolSize,
          // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of
          // a task get a thread to run so all forks of the task are making progress. This is necessary since
          // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and
          // that fork has not yet started to run because of no available thread. The task cannot proceed in
          // this case because it has to make sure every records go to every forks.
          Integer.MAX_VALUE,
          0L,
          TimeUnit.MILLISECONDS,
          // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork.
          new SynchronousQueue<Runnable>(),
          ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d"))));
}
/**
 * Constructor to work with {@link java.util.Properties}.
 *
 * Each argument is read from the given properties, falling back to the corresponding
 * default in {@link ConfigurationKeys} when the key is absent. Note that malformed
 * numeric values will surface as {@link NumberFormatException}.
 */
public TaskExecutor(Properties properties) {
  this(Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY,
      Integer.toString(ConfigurationKeys.DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE))),
      Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY,
          Integer.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_THREAD_POOL_CORE_SIZE))),
      Long.parseLong(properties.getProperty(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY,
          Long.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC))),
      Integer.parseInt(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
          Integer.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE))),
      Long.parseLong(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
          Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))),
      Integer.parseInt(properties.getProperty(ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
          Integer.toString(ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES))));
}
/**
 * Constructor to work with Hadoop {@link org.apache.hadoop.conf.Configuration}.
 *
 * Reads the same keys as the {@link java.util.Properties} constructor, then applies any
 * log4j level overrides configured via {@link Log4jConfigurationHelper#LOG_LEVEL_OVERRIDE_MAP}.
 */
public TaskExecutor(Configuration conf) {
  this(conf.getInt(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY,
      ConfigurationKeys.DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE),
      conf.getInt(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY,
          ConfigurationKeys.DEFAULT_TASK_RETRY_THREAD_POOL_CORE_SIZE),
      conf.getLong(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY,
          ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC),
      conf.getInt(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
          ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE),
      conf.getLong(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
          ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE),
      conf.getInt(ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
          ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES));
  // Allow per-job log level overrides to take effect as early as executor construction.
  Log4jConfigurationHelper.setLogLevel(conf.getTrimmedStringCollection(Log4jConfigurationHelper.LOG_LEVEL_OVERRIDE_MAP));
}
@Override
protected void startUp()
    throws Exception {
  LOG.info("Starting the task executor");
  // Both pools must be usable before the service is considered started.
  ensureUsable(this.taskExecutor, "Task");
  ensureUsable(this.forkExecutor, "Fork");
}

/** Throws {@link IllegalStateException} if the given executor can no longer accept work. */
private static void ensureUsable(ExecutorService executor, String role) {
  if (executor.isShutdown() || executor.isTerminated()) {
    throw new IllegalStateException(role + " thread pool executor is shutdown or terminated");
  }
}
@Override
protected void shutDown()
    throws Exception {
  LOG.info("Stopping the task executor");
  try {
    ExecutorsUtils.shutdownExecutorService(this.taskExecutor, Optional.of(LOG));
  } finally {
    // The finally block guarantees the fork executor is shut down even if shutting down
    // the task executor throws (e.g. on interruption).
    ExecutorsUtils.shutdownExecutorService(this.forkExecutor, Optional.of(LOG));
  }
}
/**
 * Execute a {@link Task}.
 *
 * @param task {@link Task} to be executed
 */
public void execute(Task task) {
  // Use SLF4J parameterized logging instead of eager String.format
  LOG.info("Executing task {}", task.getTaskId());
  this.taskExecutor.execute(new TrackingTask(task));
}
/**
 * Submit a {@link Task} to run.
 *
 * @param task {@link Task} to be submitted
 * @return a {@link java.util.concurrent.Future} for the submitted {@link Task}
 */
public Future<?> submit(Task task) {
  // Use SLF4J parameterized logging instead of eager String.format
  LOG.info("Submitting task {}", task.getTaskId());
  return this.taskExecutor.submit(new TrackingTask(task));
}
/**
 * Execute a {@link Fork}.
 *
 * @param fork {@link Fork} to be executed
 */
public void execute(Fork fork) {
  // Use SLF4J parameterized logging instead of eager String.format
  LOG.info("Executing fork {} of task {}", fork.getIndex(), fork.getTaskId());
  this.forkExecutor.execute(fork);
}
/**
 * Submit a {@link Fork} to run.
 *
 * @param fork {@link Fork} to be submitted
 * @return a {@link java.util.concurrent.Future} for the submitted {@link Fork}
 */
public Future<?> submit(Fork fork) {
  // Use SLF4J parameterized logging instead of eager String.format
  LOG.info("Submitting fork {} of task {}", fork.getIndex(), fork.getTaskId());
  return this.forkExecutor.submit(fork);
}
/**
 * Retry a failed {@link Task}.
 *
 * The retry is scheduled on the task executor with a delay that grows linearly with the
 * task's retry count ({@code retryCount * retryIntervalInSeconds}).
 *
 * @param task failed {@link Task} to be retried
 */
public void retry(Task task) {
  if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit())
      && task.getTaskState().contains(ConfigurationKeys.FORK_BRANCHES_KEY)) {
    // Adjust metrics to clean up numbers from the failed task
    task.getTaskState()
        .adjustJobMetricsOnRetry(task.getTaskState().getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY));
  }

  // Task retry interval increases linearly with the number of retries
  long interval = task.getRetryCount() * this.retryIntervalInSeconds;
  // Schedule the retry; TrackingTask records the future run time so the wait is not
  // counted as queue time until it elapses (see calculateMetrics()).
  this.taskExecutor.schedule(new TrackingTask(task, interval, TimeUnit.SECONDS), interval, TimeUnit.SECONDS);
  // Use SLF4J parameterized logging instead of eager String.format
  LOG.info("Scheduled retry of failed task {} to run in {} seconds", task.getTaskId(), interval);
  task.incrementRetryCount();
}
/**
 * Get the {@link MetricSet} exposing this executor's queued/running/successful/failed task metrics.
 */
public MetricSet getTaskExecutorQueueMetricSet() {
  return this.metricSet;
}
/**
 * Recompute the queued-task gauges from {@link #queuedTasks} and {@link #queuedTaskTimeHistorical}.
 *
 * Synchronized and throttled: recalculation happens at most once every 10 seconds, so the
 * several gauges in {@link TaskExecutorQueueMetricSet} that each call this method share one pass.
 * As a side effect, historical samples older than {@link #queuedTaskTimeMaxAge} or beyond
 * {@link #queuedTaskTimeMaxSize} (newest kept first) are pruned.
 */
private synchronized void calculateMetrics() {
  long currentTimeMillis = System.currentTimeMillis();
  // Skip recalculation if the last pass was less than 10 seconds ago.
  if (lastCalculationTime < currentTimeMillis - TimeUnit.SECONDS.toMillis(10)) {
    LOG.debug("Starting metric calculation.");

    // Pass 1: currently queued tasks. Entries with a timestamp in the future are retries
    // that have not become eligible to run yet, so they are excluded from the counts.
    int currentQueuedTaskCount = 0;
    int futureQueuedTaskCount = 0;
    long currentQueuedTaskTotalTime = 0;
    for (Map.Entry<String, Long> queuedTask : this.queuedTasks.entrySet()) {
      if (queuedTask.getValue() <= currentTimeMillis) {
        currentQueuedTaskCount++;
        long currentQueuedTaskTime = currentTimeMillis - queuedTask.getValue();
        currentQueuedTaskTotalTime += currentQueuedTaskTime;
        LOG.debug(String.format("Task %s has been waiting in the queue for %d ms.", queuedTask.getKey(), currentQueuedTaskTime));
      } else {
        futureQueuedTaskCount++;
      }
    }
    if (futureQueuedTaskCount > 0) {
      LOG.debug(String.format("%d tasks were ignored during metric calculations because they are scheduled to run in the future.", futureQueuedTaskCount));
    }
    this.currentQueuedTaskCount.set(currentQueuedTaskCount);
    this.currentQueuedTaskTotalTime.set(currentQueuedTaskTotalTime);
    LOG.debug(String.format("%d current tasks have been waiting for a total of %d ms.", currentQueuedTaskCount, currentQueuedTaskTotalTime));

    // Pass 2: historical queue-time samples, iterated newest-first so that when the size cap
    // is hit, the oldest samples are the ones removed.
    int historicalQueuedTaskCount = 0;
    long historicalQueuedTaskTotalTime = 0;
    long cutoff = currentTimeMillis - queuedTaskTimeMaxAge;
    Iterator<Map.Entry<Long, Long>> iterator = queuedTaskTimeHistorical.descendingMap().entrySet().iterator();
    while (iterator.hasNext()) {
      try {
        Map.Entry<Long, Long> historicalQueuedTask = iterator.next();
        // Prune entries that are too old or exceed the max sample count; otherwise accumulate.
        if (historicalQueuedTask.getKey() < cutoff || historicalQueuedTaskCount >= queuedTaskTimeMaxSize) {
          LOG.debug(String.format("Task started at %d is before the cutoff of %d and is being removed. Queue time %d will be removed from metric calculations.", historicalQueuedTask.getKey(), cutoff, historicalQueuedTask.getValue()));
          iterator.remove();
        } else {
          historicalQueuedTaskCount++;
          historicalQueuedTaskTotalTime += historicalQueuedTask.getValue();
          LOG.debug(String.format("Task started at %d is after cutoff. Queue time %d will be used in metric calculations.", historicalQueuedTask.getKey(), historicalQueuedTask.getValue()));
        }
      } catch (NoSuchElementException e) {
        // Defensive: tolerate concurrent modification of the skip-list map during iteration.
        LOG.warn("Ran out of items in historical task queue time set.");
      }
    }
    this.historicalQueuedTaskCount.set(historicalQueuedTaskCount);
    this.historicalQueuedTaskTotalTime.set(historicalQueuedTaskTotalTime);
    LOG.debug(String.format("%d historical tasks have been waiting for a total of %d ms.", historicalQueuedTaskCount, historicalQueuedTaskTotalTime));

    // Combined gauges: current + historical.
    int totalQueuedTaskCount = currentQueuedTaskCount + historicalQueuedTaskCount;
    long totalQueuedTaskTime = currentQueuedTaskTotalTime + historicalQueuedTaskTotalTime;
    this.queuedTaskCount.set(totalQueuedTaskCount);
    this.queuedTaskTotalTime.set(totalQueuedTaskTime);
    LOG.debug(String.format("%d tasks have been waiting for a total of %d ms.", totalQueuedTaskCount, totalQueuedTaskTime));

    this.lastCalculationTime = currentTimeMillis;
    LOG.debug("Finished metric calculation.");
  } else {
    LOG.debug("Skipped metric calculation because not enough time has elapsed since the last calculation.");
  }
}
/**
 * A {@link MetricSet} exposing the executor's queue gauges plus the running/successful/failed
 * task metrics. Each queue gauge triggers {@link #calculateMetrics()}, which is internally
 * throttled so repeated reads within 10 seconds share one calculation.
 */
private class TaskExecutorQueueMetricSet implements MetricSet {
  @Override
  public Map<String, Metric> getMetrics() {
    final Map<String, Metric> metrics = new HashMap<>();
    // Number of tasks currently waiting in the queue.
    metrics.put(name("queued", "current", "count"), new Gauge<Integer>() {
      @Override
      public Integer getValue() {
        calculateMetrics();
        return currentQueuedTaskCount.intValue();
      }
    });
    // Number of tasks that waited in the queue during the historical lookback period.
    metrics.put(name("queued", "historical", "count"), new Gauge<Integer>() {
      @Override
      public Integer getValue() {
        calculateMetrics();
        return historicalQueuedTaskCount.intValue();
      }
    });
    // Combined current + historical queued task count.
    metrics.put(name("queued", "count"), new Gauge<Integer>() {
      @Override
      public Integer getValue() {
        calculateMetrics();
        return queuedTaskCount.intValue();
      }
    });
    // Total time (ms) currently queued tasks have been waiting.
    metrics.put(name("queued", "current", "time", "total"), new Gauge<Long>() {
      @Override
      public Long getValue() {
        calculateMetrics();
        return currentQueuedTaskTotalTime.longValue();
      }
    });
    // Total time (ms) historical tasks waited before starting.
    metrics.put(name("queued", "historical", "time", "total"), new Gauge<Long>() {
      @Override
      public Long getValue() {
        calculateMetrics();
        return historicalQueuedTaskTotalTime.longValue();
      }
    });
    // Combined current + historical queue time (ms).
    metrics.put(name("queued", "time", "total"), new Gauge<Long>() {
      @Override
      public Long getValue() {
        calculateMetrics();
        return queuedTaskTotalTime.longValue();
      }
    });
    metrics.put(name("running", "count"), runningTaskCount);
    metrics.put(name("successful", "count"), successfulTaskCount);
    metrics.put(name("failed", "count"), failedTaskCount);
    return Collections.unmodifiableMap(metrics);
  }
}
private class TrackingTask implements Runnable {
private Task underlyingTask;
public TrackingTask(Task task) {
this(task, 0, TimeUnit.SECONDS);
}
public TrackingTask(Task task, long interval, TimeUnit timeUnit) {
long now = System.currentTimeMillis();
long timeToRun = now + timeUnit.toMillis(interval);
LOG.debug(String.format("Task %s queued to run %s.", task.getTaskId(), timeToRun <= now ? "now" : "at " + timeToRun));
queuedTasks.putIfAbsent(task.getTaskId(), timeToRun);
this.underlyingTask = task;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
onStart(startTime);
try {
this.underlyingTask.run();
successfulTaskCount.mark();
} catch (Exception e) {
failedTaskCount.mark();
LOG.error(String.format("Task %s failed", underlyingTask.getTaskId()), e);
throw e;
} finally {
runningTaskCount.dec();
}
}
private void onStart(long startTime) {
Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId());
long workUnitCreationTime = this.underlyingTask.getTaskContext().getTaskState().getPropAsLong(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, 0);
long timeInQueue = startTime - queueTime;
long timeSinceWorkUnitCreation = startTime - workUnitCreationTime;
taskCreateAndRunTimer.update(timeSinceWorkUnitCreation, TimeUnit.MILLISECONDS);
LOG.debug(String.format("Task %s started. Saving queued time of %d ms to history.", underlyingTask.getTaskId(), timeInQueue));
queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), timeInQueue);
runningTaskCount.inc();
}
}
}