blob: 57df840153ec7e5f26bcb68e293ce0f100e1f0db [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.container;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.CoordinatorRequests;
import org.apache.samza.scheduler.EpochTimeScheduler;
import org.apache.samza.task.ReadableCoordinator;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCallbackFactory;
import org.apache.samza.task.TaskCallbackImpl;
import org.apache.samza.task.TaskCallbackListener;
import org.apache.samza.task.TaskCallbackManager;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.Throttleable;
import org.apache.samza.util.ThrottlingScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* The run loop supports both single-threaded and multi-threaded execution models.
* <p>
* If job.container.thread.pool.size &gt; 1 (multi-threaded), operations like commit, window and timer for all tasks within a container
* happen on a thread pool.
* If job.container.thread.pool.size &lt; 1 (single-threaded), operations for all tasks are multiplexed onto one execution thread.
* </p>
* Note: In both models, process/processAsync for all tasks is invoked on the run loop thread.
public class RunLoop implements Runnable, Throttleable {
// Logger shared by the run loop and its inner worker/state classes.
private static final Logger log = LoggerFactory.getLogger(RunLoop.class);
// One worker per task; built in the constructor and exposed only as an unmodifiable list.
private final List<AsyncTaskWorker> taskWorkers;
// Source of incoming envelopes; also consulted for end-of-stream status of SSPs.
private final SystemConsumers consumerMultiplexer;
// Routes an envelope to every worker whose task consumes the envelope's SSP (unmodifiable).
private final Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToTaskWorkerMapping;
// Pool for window/commit/timer work; when null those operations run on the run loop thread.
private final ExecutorService threadPool;
// Built from the task names; holds pending commit requests and the shutdown request.
private final CoordinatorRequests coordinatorRequests;
// Monitor guarding runLoopResumedSinceLastChecked and used to park/wake the run loop thread.
private final Object latch;
// Upper bound on in-flight process calls per task (checked in AsyncTaskState.isReady).
private final int maxConcurrency;
// Window and commit periods; a value <= 0 disables the corresponding timer in AsyncTaskWorker.init().
private final long windowMs;
private final long commitMs;
// Callback timeout handed to each TaskCallbackManager; callbackTimer is only created when this is > 0.
private final long callbackTimeoutMs;
private final long maxIdleMs;
private final SamzaContainerMetrics containerMetrics;
// Single-threaded scheduler shared by every worker's window/commit timers.
private final ScheduledExecutorService workerTimer;
// Timer for callback timeouts; null when callbackTimeoutMs <= 0.
private final ScheduledExecutorService callbackTimer;
// Runs process-completion bookkeeping (onComplete); its work factor is exposed via Throttleable.
private final ThrottlingScheduler callbackExecutor;
// Set by shutdown() or resume(); checked by the main loop condition.
private volatile boolean shutdownNow = false;
// First failure recorded via abort(); rethrown from run() wrapped in a SamzaException.
private volatile Throwable throwable = null;
private final HighResolutionClock clock;
// When true, commit may proceed while process calls are in flight and vice versa (see isReady).
private final boolean isAsyncCommitEnabled;
// Set true by resume(); cleared by blockIfBusyOrNoNewWork() to decide whether an idle wait is allowed.
private volatile boolean runLoopResumedSinceLastChecked;
// Creates a run loop over the given tasks.
// @param runLoopTasks tasks keyed by name; one AsyncTaskWorker is created per task
// @param threadPool pool for window/commit/timer work, or null to run them on the run loop thread
// @param consumerMultiplexer source of envelopes for all tasks
// @param maxConcurrency maximum in-flight process calls per task
// @param windowMs window period in ms (timer scheduled only when > 0 and the task is windowable)
// @param commitMs commit period in ms (timer scheduled only when > 0)
// @param callbackTimeoutMs callback timeout; a callback timer thread is created only when > 0
// @param maxThrottlingDelayMs bound passed to the ThrottlingScheduler that runs completion callbacks
// @param maxIdleMs stored for the idle-wait logic of the run loop
// @param containerMetrics metrics sink for choose/block/process/window/commit/timer timings
// @param clock time source for all timing measurements
// @param isAsyncCommitEnabled whether commit may overlap in-flight process calls
public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks,
ExecutorService threadPool,
SystemConsumers consumerMultiplexer,
int maxConcurrency,
long windowMs,
long commitMs,
long callbackTimeoutMs,
long maxThrottlingDelayMs,
long maxIdleMs,
SamzaContainerMetrics containerMetrics,
HighResolutionClock clock,
boolean isAsyncCommitEnabled) {
this.threadPool = threadPool;
this.consumerMultiplexer = consumerMultiplexer;
this.containerMetrics = containerMetrics;
this.windowMs = windowMs;
this.commitMs = commitMs;
this.maxConcurrency = maxConcurrency;
this.callbackTimeoutMs = callbackTimeoutMs;
this.maxIdleMs = maxIdleMs;
// No timeout thread is needed when callback timeouts are disabled.
this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
this.callbackExecutor = new ThrottlingScheduler(maxThrottlingDelayMs);
this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet());
this.latch = new Object();
this.workerTimer = Executors.newSingleThreadScheduledExecutor();
this.clock = clock;
// Workers must be built after the fields above: the AsyncTaskWorker constructor reads callbackTimer,
// callbackTimeoutMs, maxConcurrency, clock and (through getWorkingSSPSet) consumerMultiplexer.
Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
for (RunLoopTask task : runLoopTasks.values()) {
workers.put(task.taskName(), new AsyncTaskWorker(task));
// Partitions and tasks assigned to the container will not change during the run loop life time
this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(runLoopTasks, workers));
this.taskWorkers = Collections.unmodifiableList(new ArrayList<>(workers.values()));
this.isAsyncCommitEnabled = isAsyncCommitEnabled;
* Returns a mapping from each SystemStreamPartition to the AsyncTaskWorkers that consume it, used to route envelopes efficiently.
// @param runLoopTasks tasks keyed by name; each task's systemStreamPartitions() are indexed
// @param taskWorkers workers keyed by task name
// @return a mutable map from SSP to its worker list (the constructor wraps it as unmodifiable)
private static Map<SystemStreamPartition, List<AsyncTaskWorker>> getSspToAsyncTaskWorkerMap(
Map<TaskName, RunLoopTask> runLoopTasks, Map<TaskName, AsyncTaskWorker> taskWorkers) {
Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToWorkerMap = new HashMap<>();
for (RunLoopTask task : runLoopTasks.values()) {
Set<SystemStreamPartition> ssps = task.systemStreamPartitions();
for (SystemStreamPartition ssp : ssps) {
// An SSP may be consumed by several tasks (e.g. a broadcast stream), so each SSP maps to a list.
sspToWorkerMap.putIfAbsent(ssp, new ArrayList<>());
// NOTE(review): no statement visible here appends taskWorkers.get(task.taskName()) to the list;
// confirm the add is present, otherwise every routed list stays empty.
return sspToWorkerMap;
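* The run loop chooses messages from the SystemConsumers and runs the ready tasks asynchronously.
* Window and commit run in the thread pool when one is configured (otherwise on the run loop thread), and they are
* mutually exclusive with task process, except that commit may overlap process when async commit is enabled.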
* The loop thread will block if all tasks are busy, and resume if any task finishes.
// Main loop: repeatedly chooses an envelope and records choose/block/utilization metrics until
// shutdown is requested or a failure is recorded through abort(); the failure is then rethrown.
public void run() {
try {
// NOTE(review): the body of this loop is not visible here; presumably each worker's init() is called,
// which schedules its window/commit timers.
for (AsyncTaskWorker taskWorker : taskWorkers) {
long prevNs = clock.nanoTime();
while (!shutdownNow && throwable == null) {
long startNs = clock.nanoTime();
IncomingMessageEnvelope envelope = chooseEnvelope();
long chooseNs = clock.nanoTime();
containerMetrics.chooseNs().update(chooseNs - startNs);
// NOTE(review): no dispatch (runTasks) or wait (blockIfBusyOrNoNewWork) call is visible between the
// choose and block timestamps; confirm against the full file.
long blockNs = clock.nanoTime();
containerMetrics.blockNs().update(blockNs - chooseNs);
long currentNs = clock.nanoTime();
// activeNs spans only from blockNs to now; totalNs spans since the previous iteration's currentNs.
// NOTE(review): only the blockNs metric update lies in the active span, so utilization is small by
// construction; confirm this is the intended definition of "active" time.
long activeNs = currentNs - blockNs;
long totalNs = currentNs - prevNs;
prevNs = currentNs;
if (totalNs != 0) {
// totalNs is not 0 if timer metrics are enabled
containerMetrics.utilization().set(((double) activeNs) / totalNs);
* The current semantics of external shutdown request (RunLoop.shutdown()) is loosely defined and run loop doesn't
* wait for inflight messages to complete and triggers shutdown as soon as it notices the shutdown request.
* Hence, it is possible that the exception may or may not propagated based on order of execution
* between process callback and run loop thread.
if (throwable != null) {
log.error("Caught throwable and stopping run loop", throwable);
throw new SamzaException(throwable);
} finally {
// callbackTimer only exists when callbackTimeoutMs > 0 (see the constructor).
if (callbackTimer != null) callbackTimer.shutdown();
// Throttleable: sets the work factor used when throttling process-completion callbacks.
// NOTE(review): the body is not visible here; getWorkFactor reads callbackExecutor.getWorkFactor(), so this
// presumably forwards to callbackExecutor.setWorkFactor(workFactor) — confirm.
public void setWorkFactor(double workFactor) {
// Throttleable: returns the current work factor of the ThrottlingScheduler that runs completion callbacks.
public double getWorkFactor() {
return callbackExecutor.getWorkFactor();
// Requests the run loop to stop. This only sets the volatile flag; the loop observes it on its next
// condition check and does not wait for in-flight messages to finish.
public void shutdown() {
shutdownNow = true;
* Chooses an envelope from messageChooser without updating it. This enables flow control
* on the SSP level, meaning the task will not get further messages for the SSP if it cannot
* process it. The chooser is updated only after the callback to process is invoked, then the task
* is able to process more messages. This flow control does not block, so in case of an empty message chooser,
* it will return null immediately without blocking, and the chooser will not poll the underlying system
* consumer since there are still messages in the SystemConsumers buffer.
private IncomingMessageEnvelope chooseEnvelope() {
// Non-updating choose; the chooser for the SSP is updated later, when the envelope is first fetched
// for processing (see AsyncTaskState.fetchEnvelope).
IncomingMessageEnvelope envelope = consumerMultiplexer.choose(false);
if (envelope != null) {
log.trace("Choose envelope ssp {} offset {} for processing",
envelope.getSystemStreamPartition(), envelope.getOffset());
} else {
log.trace("No envelope is available");
// May be null when nothing is currently available.
return envelope;
* Insert the envelope into the task pending queues and run all the tasks
private void runTasks(IncomingMessageEnvelope envelope) {
// Nothing is dispatched once shutdown has been requested.
if (!shutdownNow) {
if (envelope != null) {
// One PendingEnvelope is shared by all workers of this SSP, so only the first fetch updates the chooser.
PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
// NOTE(review): Map.get returns null for an SSP that no task owns, which would throw an NPE here.
for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
// NOTE(review): loop bodies are not visible; presumably the loop above enqueues pendingEnvelope into each
// worker's state and the loop below lets every worker run its next operation.
for (AsyncTaskWorker worker: taskWorkers) {;
* Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes,
* it will resume the runloop.
* In addition, delay the RunLoop thread for a short time if there are no new messages to process and the run loop
* has not been resumed since the last time this code was run. This will prevent the main thread from spinning when it
* has no work to distribute. If a task worker finishes or window/commit completes before the timeout then resume
* the RunLoop thread immediately. That event may allow a task worker to start processing a message that has already
* been chosen. In any event it should only delay for a short time. It needs to periodically check for new messages.
private void blockIfBusyOrNoNewWork(IncomingMessageEnvelope envelope) {
// All checks happen under latch, the same monitor resume() synchronizes on.
synchronized (latch) {
// First check to see if we should delay the run loop for a short time. The runLoopResumedSinceLastChecked boolean
// is used to ensure we don't delay if there may already be a task ready to dequeue a previously chosen/pending
// message. It is better to occasionally make one additional loop when there is no work to do than delay the
// runloop when there is work that could be started immediately.
if ((envelope == null) && !runLoopResumedSinceLastChecked) {
try {
log.trace("Start no work wait");
// NOTE(review): the timed wait itself is not visible between these log lines; presumably latch.wait(maxIdleMs).
log.trace("End no work wait");
} catch (InterruptedException e) {
// NOTE(review): the thread's interrupt status is not restored before rethrowing.
throw new SamzaException("Run loop is interrupted", e);
// Clear the flag so a later idle iteration may wait again unless resume() runs in between.
runLoopResumedSinceLastChecked = false;
// Next check to see if we should block if all the tasks are busy.
while (!shutdownNow && throwable == null) {
for (AsyncTaskWorker worker : taskWorkers) {
// A single ready worker means there is work to start, so the loop should not block.
// Note that isReady() may update the worker's state (end-of-stream, commit requests).
if (worker.state.isReady()) {
try {
log.trace("Block loop thread");
// NOTE(review): the untimed wait that resume() is meant to wake is not visible here.
} catch (InterruptedException e) {
throw new SamzaException("Run loop is interrupted", e);
* Resume the runloop thread. This API is triggered in the following scenarios:
* A. A task becomes ready to process a message.
* B. A task has failed when processing a message.
* C. User thread shuts down the run loop.
private void resume() {
log.trace("Resume loop thread");
// A coordinator shutdown request is honored only once no commit requests remain outstanding.
if (coordinatorRequests.shouldShutdownNow() && coordinatorRequests.commitRequests().isEmpty()) {
shutdownNow = true;
synchronized (latch) {
// Prevents the next idle wait in blockIfBusyOrNoNewWork, since a task may now be ready.
runLoopResumedSinceLastChecked = true;
// NOTE(review): the notification that wakes a thread waiting on latch is not visible here.
* Set the throwable and abort run loop. The throwable will be thrown from the run loop thread
* @param t throwable
private void abort(Throwable t) {
// Volatile write: the run loop checks throwable in its loop conditions and rethrows it from run().
throwable = t;
* PendingEnvelope contains an envelope that has not yet been processed by this task, and
* a flag indicating whether it has been processed by any tasks.
private static final class PendingEnvelope {
private final IncomingMessageEnvelope envelope;
// Plain (non-volatile) flag; per the fetchEnvelope doc, it is only touched on the run loop thread.
private boolean processed = false;
PendingEnvelope(IncomingMessageEnvelope envelope) {
this.envelope = envelope;
* Returns true if the envelope has not been processed.
private boolean markProcessed() {
// Test-and-set: only the first caller sees true; every later caller sees false.
boolean oldValue = processed;
processed = true;
return !oldValue;
// Next operation an AsyncTaskWorker may perform, as selected by AsyncTaskState.nextOp().
// NOTE(review): the constants are not visible here; code in this file references WINDOW, COMMIT,
// SCHEDULER, END_OF_STREAM, PROCESS and NO_OP.
private enum WorkerOp {
* The AsyncTaskWorker encapsulates the states of an {@link org.apache.samza.task.AsyncStreamTask}. If the task becomes ready, it
* will run the task asynchronously. It runs window and commit in the provided thread pool.
private class AsyncTaskWorker implements TaskCallbackListener {
private final RunLoopTask task;
// Creates the callbacks handed to task.process and reports their outcome back to this worker (the listener).
private final TaskCallbackManager callbackManager;
// Volatile reference; assigned once in the constructor.
private volatile AsyncTaskState state;
AsyncTaskWorker(RunLoopTask task) {
this.task = task;
this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock);
// Only SSPs that are not already at end of stream start out as in-progress for this task.
Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
// Whether the task has intermediate streams changes end-of-stream detection (see checkEndOfStream).
this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet, !task.intermediateStreams().isEmpty());
// Registers this worker's periodic triggers: window and commit timers on workerTimer, and a listener on
// the task's EpochTimeScheduler when it has one.
private void init() {
// schedule the timer for windowing and committing
// All timers share the single-threaded workerTimer, so a slow timer callback delays the others.
if (task.isWindowableTask() && windowMs > 0L) {
// First run after one full windowMs, then every windowMs.
workerTimer.scheduleAtFixedRate(new Runnable() {
public void run() {
log.trace("Task {} need window", task.taskName());
// NOTE(review): the statements that flag the window and wake the loop are not visible here.
}, windowMs, windowMs, TimeUnit.MILLISECONDS);
if (commitMs > 0L) {
// First run after one full commitMs, then every commitMs.
workerTimer.scheduleAtFixedRate(new Runnable() {
public void run() {
log.trace("Task {} need commit", task.taskName());
}, commitMs, commitMs, TimeUnit.MILLISECONDS);
final EpochTimeScheduler epochTimeScheduler = task.epochTimeScheduler();
// Only tasks that expose an EpochTimeScheduler register a timer listener.
if (epochTimeScheduler != null) {
epochTimeScheduler.registerListener(() -> {
* Returns those partitions for the task for which we have not received end-of-stream from the consumer.
* @param task the task whose system stream partitions are filtered
* @return the subset of the task's SSPs that are not at end of stream.
private Set<SystemStreamPartition> getWorkingSSPSet(RunLoopTask task) {
Set<SystemStreamPartition> allPartitions = task.systemStreamPartitions();
// filter only those SSPs that are not at end of stream.
// NOTE(review): the stream source (e.g. allPartitions.stream()) and the terminal collect are not visible here.
Set<SystemStreamPartition> workingSSPSet =
.filter(ssp -> !consumerMultiplexer.isEndOfStream(ssp))
return workingSSPSet;
* Invoke next task operation based on its state
private void run() {
// nextOp() returns NO_OP when the task is complete or not ready, so this is safe to call on every loop pass.
switch (state.nextOp()) {
// NOTE(review): only the WINDOW and COMMIT labels are visible, with no statements between them.
case WINDOW:
case COMMIT:
//no op
// Handles end of stream for this task: marks the state complete, requests shutdown of the task and
// performs a final commit if one was requested.
private void endOfStream() {
// Marked first so nextOp() returns NO_OP for this task from now on.
state.complete = true;
try {
ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
// issue a request for shutdown of the task
// invoke commit on the task - if the endOfStream callback had requested a final commit.
// remove() both tests for and consumes this task's pending commit request.
boolean needFinalCommit = coordinatorRequests.commitRequests().remove(task.taskName());
if (needFinalCommit) {
} finally {
* Process asynchronously. The callback needs to be fired once the processing is done.
private void process() {
// Dequeues the next pending envelope; the first fetch of a shared envelope also updates the chooser.
final IncomingMessageEnvelope envelope = state.fetchEnvelope();
log.trace("Process ssp {} offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
// Per-message coordinator, captured by the callback created below.
final ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
// The task obtains its completion callback through this factory; callbackManager binds it to this envelope.
TaskCallbackFactory callbackFactory = new TaskCallbackFactory() {
public TaskCallback createCallback() {
return callbackManager.createCallback(task.taskName(), envelope, coordinator);
task.process(envelope, coordinator, callbackFactory);
* Invoke window. Run window in thread pool if not the single thread mode.
private void window() {
// Work unit that times the window operation and warns when the average window time reaches windowMs.
Runnable windowWorker = new Runnable() {
public void run() {
try {;
ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
long startTime = clock.nanoTime();
// Elapsed time feeds the windowNs histogram whose average drives the warning below.
containerMetrics.windowNs().update(clock.nanoTime() - startTime);
* Window calls that execute for more than will starve process calls
* since window has higher priority than process in {@link AsyncTaskState#nextOp()}.
* Warn the users if this is the case.
// i.e. when the average window duration reaches windowMs (note the check below uses >=).
long averageWindowMs = TimeUnit.NANOSECONDS.toMillis(
(long) containerMetrics.windowNs().getSnapshot().getAverage());
if (averageWindowMs >= windowMs) {
log.warn("Average window call duration {} is greater than the configured {}. " +
"This can starve process calls, so consider setting >> {} ms.",
new Object[]{averageWindowMs, windowMs, averageWindowMs});
} catch (Throwable t) {
log.error("Task {} window failed", task.taskName(), t);
} finally {
log.trace("Task {} window completed", task.taskName());
// With a thread pool the window runs there; otherwise it runs inline on the run loop thread.
// NOTE(review): the statements that submit or run windowWorker are not visible here.
if (threadPool != null) {
log.trace("Task {} window on the thread pool", task.taskName());
} else {
log.trace("Task {} window on the run loop thread", task.taskName());;
* Invoke commit. Run commit in thread pool if not the single thread mode.
private void commit() {
// Work unit that times the commit operation (commitNs) and logs failures with the task name.
Runnable commitWorker = new Runnable() {
public void run() {
try {
long startTime = clock.nanoTime();
containerMetrics.commitNs().update(clock.nanoTime() - startTime);
} catch (Throwable t) {
log.error("Task {} commit failed", task.taskName(), t);
} finally {
log.trace("Task {} commit completed", task.taskName());
// With a thread pool the commit runs there; otherwise it runs inline on the run loop thread.
// NOTE(review): the statements that submit or run commitWorker are not visible here.
if (threadPool != null) {
log.trace("Task {} commits on the thread pool", task.taskName());
} else {
log.trace("Task {} commits on the run loop thread", task.taskName());;
// Runs the task's timer (scheduler) operation, on the thread pool when configured, else on the run loop thread.
private void scheduler() {
// Work unit that times the timer operation (timerNs) and logs failures with the task name.
Runnable timerWorker = new Runnable() {
public void run() {
try {
ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
long startTime = clock.nanoTime();
containerMetrics.timerNs().update(clock.nanoTime() - startTime);
} catch (Throwable t) {
log.error("Task {} scheduler failed", task.taskName(), t);
} finally {
log.trace("Task {} scheduler completed", task.taskName());
// NOTE(review): the statements that submit or run timerWorker are not visible here.
if (threadPool != null) {
log.trace("Task {} scheduler runs on the thread pool", task.taskName());
} else {
log.trace("Task {} scheduler runs on the run loop thread", task.taskName());;
* Task process completes successfully, update the offsets based on the high-water mark.
* Then it will trigger the listener for task state change.
* @param callback AsyncStreamTask.processAsync callback
public void onComplete(final TaskCallback callback) {
// Time the task spent on this message, measured from callback creation; passed to the ThrottlingScheduler.
long workNanos = clock.nanoTime() - ((TaskCallbackImpl) callback).getTimeCreatedNs();
// Completion bookkeeping is scheduled on callbackExecutor rather than done inline.
callbackExecutor.schedule(new Runnable() {
public void run() {
try {
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
// Measured when this runnable runs, so processNs includes any delay before it was executed.
containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.getTimeCreatedNs());
log.trace("Got callback complete for task {}, ssp {}",
callbackImpl.getTaskName(), callbackImpl.getSystemStreamPartition());
// The manager returns the callbacks whose offsets may now be recorded (possibly none, possibly several).
List<TaskCallbackImpl> callbacksToUpdate = callbackManager.updateCallback(callbackImpl);
for (TaskCallbackImpl callbackToUpdate : callbacksToUpdate) {
log.trace("Update offset for ssp {}, offset {}", callbackToUpdate.getSystemStreamPartition(),
// update offset
if (task.offsetManager() != null) {
task.offsetManager().update(task.taskName(), callbackToUpdate.getSystemStreamPartition(),
// update coordinator
} catch (Throwable t) {
log.error("Error marking process as complete.", t);
} finally {
}, workNanos);
* Task process fails. Trigger the listener indicating failure.
* @param callback AsyncStreamTask.processAsync callback
* @param t throwable of the failure
public void onFailure(TaskCallback callback, Throwable t) {
try {
// set the exception code ahead of marking the message as processed to make sure the exception
// is visible to the run loop thread promptly. Refer SAMZA-2510 for more details.
// update pending count, but not offset
// NOTE(review): the abort(t) call and pending-count update described above are not visible here.
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
log.error("Got callback failure for task {}", callbackImpl.getTaskName(), t);
} catch (Throwable e) {
log.error("Error marking process as failed.", e);
} finally {
* AsyncTaskState manages the state of the AsyncStreamTask. In summary, a worker has the following states:
* ready - ready for window, commit or process next incoming message.
* busy - doing window, commit or not able to process next message.
* idle - no pending messages, and no window/commit
private final class AsyncTaskState {
// "need" flags: set by needX() (needCommit also by isReady()), cleared by the matching startX().
private volatile boolean needWindow = false;
private volatile boolean needCommit = false;
private volatile boolean needScheduler = false;
// Set by endOfStream(); once true, nextOp() always returns NO_OP.
private volatile boolean complete = false;
// Set by isReady() when checkEndOfStream() reports that this task reached end of stream.
private volatile boolean endOfStream = false;
// "in flight" flags: set by startX(), cleared by doneX().
private volatile boolean windowInFlight = false;
private volatile boolean commitInFlight = false;
private volatile boolean schedulerInFlight = false;
// Number of process calls started but not yet finished (startProcess/doneProcess).
private final AtomicInteger messagesInFlight = new AtomicInteger(0);
// Envelopes routed to this task but not yet fetched; per the insert/fetch docs, used on the run loop thread.
private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
//Set of SSPs that we are currently processing for this task instance
private final Set<SystemStreamPartition> processingSspSet;
private final TaskName taskName;
private final TaskInstanceMetrics taskMetrics;
private final boolean hasIntermediateStreams;
AsyncTaskState(TaskName taskName, TaskInstanceMetrics taskMetrics, Set<SystemStreamPartition> sspSet, boolean hasIntermediateStreams) {
this.taskName = taskName;
this.taskMetrics = taskMetrics;
this.pendingEnvelopeQueue = new ArrayDeque<>();
// Stored by reference (no defensive copy).
this.processingSspSet = sspSet;
this.hasIntermediateStreams = hasIntermediateStreams;
// Reports whether this task has reached end of stream for all of its in-progress SSPs.
private boolean checkEndOfStream() {
// Only considered when an end-of-stream marker is the sole pending envelope.
if (pendingEnvelopeQueue.size() == 1) {
PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
if (envelope.isEndOfStream()) {
// NOTE(review): ssp is unused as shown; the statements that consume the marker and drop ssp from
// processingSspSet are not visible here.
SystemStreamPartition ssp = envelope.getSystemStreamPartition();
if (!hasIntermediateStreams) {
// End of stream once no SSP remains in progress.
return processingSspSet.isEmpty();
* Returns whether the task is ready to do process/window/commit.
private boolean isReady() {
// Despite its name, this mutates state: it latches endOfStream and converts a pending coordinator
// commit request for this task into needCommit.
if (checkEndOfStream()) {
endOfStream = true;
if (coordinatorRequests.commitRequests().remove(taskName)) {
needCommit = true;
// Window, commit and scheduler exclude one another.
boolean opInFlight = windowInFlight || commitInFlight || schedulerInFlight;
* A task is ready to commit, when task.commit(needCommit) is requested either by user or commit thread
* and either of the following conditions are true.
* a) When process, window, commit and scheduler are not in progress.
* b) When task.async.commit is true and window, commit are not in progress.
// Note: opInFlight also includes the scheduler, so in case (b) an in-flight scheduler blocks the commit too.
if (needCommit) {
return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !opInFlight;
} else if (needWindow || needScheduler || endOfStream) {
* A task is ready for window, scheduler or end-of-stream operation.
return messagesInFlight.get() == 0 && !opInFlight;
} else {
* A task is ready to process new message, when number of task.process calls in progress < task.max.concurrency
* and either of the following conditions are true.
* a) When window, commit and scheduler are not in progress.
* b) When task.async.commit is true and window and scheduler are not in progress.
return messagesInFlight.get() < maxConcurrency && !windowInFlight && !schedulerInFlight && (isAsyncCommitEnabled || !commitInFlight);
* Returns the next operation by this taskWorker
private WorkerOp nextOp() {
// A completed task never performs any further operation.
if (complete) return WorkerOp.NO_OP;
if (isReady()) {
// Priority: commit > window > scheduler > end-of-stream (only once the queue is drained) > process.
if (needCommit) return WorkerOp.COMMIT;
else if (needWindow) return WorkerOp.WINDOW;
else if (needScheduler) return WorkerOp.SCHEDULER;
else if (endOfStream && pendingEnvelopeQueue.isEmpty()) return WorkerOp.END_OF_STREAM;
else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS;
return WorkerOp.NO_OP;
// Flags a window operation as due; cleared by startWindow().
private void needWindow() {
needWindow = true;
// Flags a commit as due; cleared by startCommit().
private void needCommit() {
needCommit = true;
// Flags a timer (scheduler) operation as due; cleared by startScheduler().
private void needScheduler() {
needScheduler = true;
// Moves window from "needed" to "in flight"; doneWindow() clears the in-flight flag.
private void startWindow() {
needWindow = false;
windowInFlight = true;
// Moves commit from "needed" to "in flight"; doneCommit() clears the in-flight flag.
private void startCommit() {
needCommit = false;
commitInFlight = true;
// Records the start of one process call (compared against maxConcurrency in isReady()).
private void startProcess() {
// NOTE(review): count is unused as shown; confirm whether a metrics update was intended here.
int count = messagesInFlight.incrementAndGet();
// Moves the timer operation from "needed" to "in flight"; doneScheduler() clears the in-flight flag.
private void startScheduler() {
needScheduler = false;
schedulerInFlight = true;
// Marks the in-flight commit as finished.
private void doneCommit() {
commitInFlight = false;
// Marks the in-flight window as finished.
private void doneWindow() {
windowInFlight = false;
// Records the completion of one process call.
private void doneProcess() {
// NOTE(review): count is unused as shown; confirm whether a metrics update was intended here.
int count = messagesInFlight.decrementAndGet();
// Marks the in-flight timer operation as finished.
private void doneScheduler() {
schedulerInFlight = false;
* Insert a PendingEnvelope into the pending envelope queue.
* The function will be called in the run loop thread so no synchronization.
* @param pendingEnvelope
private void insertEnvelope(PendingEnvelope pendingEnvelope) {
// NOTE(review): no statement adding pendingEnvelope to pendingEnvelopeQueue is visible before the size is
// read, so as shown the logged count is the pre-insert size.
int queueSize = pendingEnvelopeQueue.size();
log.trace("Insert envelope to task {} queue.", taskName);
log.debug("Task {} pending envelope count is {} after insertion.", taskName, queueSize);
* Fetch the pending envelope in the pending queue for the task to process.
* Update the chooser for flow control on the SSP level. Once it's updated, the RunLoop
* will be able to choose new messages from this SSP for the task to process. Note that we
* update only when the envelope is first time being processed. This solves the issue in
* Broadcast stream where a message needs to be processed by multiple tasks. In that case,
* the envelope will be in the pendingEnvelopeQueue of each task. Only the first fetch updates
* the chooser with the next envelope in the broadcast stream partition.
* The function will be called in the run loop thread so no synchronization.
* @return the envelope removed from the head of the pending queue
private IncomingMessageEnvelope fetchEnvelope() {
// remove() throws NoSuchElementException on an empty queue; presumably only reached when nextOp() returned
// PROCESS, which requires a non-empty queue.
PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.remove();
int queueSize = pendingEnvelopeQueue.size();
log.trace("fetch envelope ssp {} offset {} to process.",
pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset());
log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize);
// Only the first task to fetch a shared (e.g. broadcast) envelope updates the chooser for its SSP.
if (pendingEnvelope.markProcessed()) {
SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition();
log.debug("Update chooser for {}", partition);
// NOTE(review): the chooser update call itself is not visible here.
return pendingEnvelope.envelope;