blob: 2d49de7402ca45cc6bafb5797befc11f8bfe74cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.task;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.ThreadUtil;
/**
 * TaskCallbackManager manages the life cycle of {@link AsyncStreamTask} callbacks,
 * including creation, update and status. Internally it maintains a PriorityQueue
 * for the callbacks based on the sequence number, and updates the offsets for checkpointing
 * by always moving forward to the latest contiguous callback (uses the high watermark).
 */
public class TaskCallbackManager {

  /**
   * Holds completed callbacks that cannot be released yet because a callback with a
   * smaller sequence number is still outstanding. All access to the queue and to
   * {@code nextSeqNum} happens while holding {@code lock}.
   */
  private static final class TaskCallbacks {
    // No comparator is supplied, so the queue relies on TaskCallbackImpl's natural ordering.
    private final Queue<TaskCallbackImpl> callbacks = new PriorityQueue<>();
    private final Object lock = new Object();
    // Sequence number the head of the queue must match before it can be released.
    private long nextSeqNum = 0L;

    /**
     * Adds the newly completed callback to the queue, then drains the queue for as long
     * as its head matches the next expected sequence number.
     *
     * <p>Draining stops right after a callback whose coordinator carries a commit request;
     * any further contiguous callbacks stay queued until a later call to this method.
     *
     * @param cb new callback completed
     * @return the callbacks released by this call, in the order they were polled;
     *         empty if the head of the queue does not match the expected sequence number
     */
    List<TaskCallbackImpl> update(TaskCallbackImpl cb) {
      synchronized (lock) {
        callbacks.add(cb);

        List<TaskCallbackImpl> callbacksToUpdate = new ArrayList<>();
        // look for the last contiguous callback
        while (!callbacks.isEmpty() && callbacks.peek().matchSeqNum(nextSeqNum)) {
          ++nextSeqNum;
          TaskCallbackImpl callback = callbacks.poll();
          callbacksToUpdate.add(callback);
          // A pending commit request ends this batch at the requesting callback.
          if (callback.coordinator.commitRequest().isDefined()) {
            break;
          }
        }
        return callbacksToUpdate;
      }
    }
  }

  // Sequence number handed to the next created callback. It is incremented without
  // synchronization. NOTE(review): presumably createCallback is only invoked from a
  // single thread -- confirm against the caller.
  private long seqNum = 0L;
  private final TaskCallbacks completedCallbacks = new TaskCallbacks();
  private final ScheduledExecutorService timer;
  private final TaskCallbackListener listener;
  private final long timeout;
  private final int maxConcurrency;
  private final HighResolutionClock clock;

  /**
   * @param listener listener passed to every callback created by this manager
   * @param timer executor used to schedule callback timeouts; when {@code null},
   *              no timeout task is scheduled
   * @param timeout callback timeout in milliseconds (only used when {@code timer} is non-null)
   * @param maxConcurrency when greater than 1, completed callbacks are re-ordered by
   *                       sequence number before being returned from {@link #updateCallback}
   * @param clock clock whose {@code nanoTime()} reading is passed to each created callback
   */
  public TaskCallbackManager(TaskCallbackListener listener,
      ScheduledExecutorService timer,
      long timeout,
      int maxConcurrency,
      HighResolutionClock clock) {
    this.listener = listener;
    this.timer = timer;
    this.timeout = timeout;
    this.maxConcurrency = maxConcurrency;
    this.clock = clock;
  }

  /**
   * Creates a callback carrying the next sequence number and, if a timer is configured,
   * schedules a task that fails the callback once the timeout elapses.
   *
   * @param taskName name of the task the callback belongs to
   * @param envelope message the callback is created for
   * @param coordinator coordinator associated with the callback
   * @return the newly created callback
   */
  public TaskCallbackImpl createCallback(TaskName taskName,
      IncomingMessageEnvelope envelope,
      ReadableCoordinator coordinator) {
    final TaskCallbackImpl callback =
        new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++, clock.nanoTime());
    if (timer != null) {
      Runnable timerTask = new Runnable() {
        @Override
        public void run() {
          // Capture a thread dump first so the stuck task can be diagnosed from the logs.
          ThreadUtil.logThreadDump("Thread dump at task callback timeout");
          // The message is built by concatenation, so it must not contain a "{}" placeholder.
          String msg = "Callback for task " + callback.taskName + " timed out after " + timeout + " ms.";
          callback.failure(new SamzaException(msg));
        }
      };
      ScheduledFuture<?> scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
      // Hand the future to the callback; NOTE(review): presumably so it can cancel the
      // timeout on completion -- confirm in TaskCallbackImpl.
      callback.setScheduledFuture(scheduledFuture);
    }

    return callback;
  }

  /**
   * Update the task callbacks with the new callback completed.
   * It uses a high-watermark model to roll the callbacks for checkpointing.
   *
   * @param callback new completed callback
   * @return the list of callbacks for checkpointing; with a max concurrency of 1 or less
   *         this is always an immutable single-element list containing {@code callback}
   */
  public List<TaskCallbackImpl> updateCallback(TaskCallbackImpl callback) {
    if (maxConcurrency > 1) {
      // Use the completedCallbacks queue to handle the out-of-order case when max concurrency is larger than 1
      return completedCallbacks.update(callback);
    } else {
      return ImmutableList.of(callback);
    }
  }
}