blob: eba6c814c56ff2a8c4739af704a36fb806043238 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.singleton;
/**
 * Owns a set of active and standby tasks and drives their lifecycle: creating or resuming them
 * for a new assignment (createTasks), suspending them (suspendTasksAndState), shutting them down
 * (shutdown), and committing / processing / punctuating them.
 * <p>
 * NOTE(review): presumably one instance per stream thread, given the
 * StreamThread.AbstractTaskCreator collaborators — confirm.
 */
public class TaskManager {
// Logger whose context is built from logPrefix in the constructor.
private final Logger log;
// Identifier of this process, returned by processId().
private final UUID processId;
// Active tasks. Per the original note, these need to be safe for concurrent access because they
// can be accessed by QueryableState.
private final AssignedStreamsTasks active;
// Standby tasks.
private final AssignedStandbyTasks standby;
// Used by updateNewAndRestoringTasks() to restore the active tasks (changelogReader.restore(active)).
private final ChangelogReader changelogReader;
// Prefix for log lines and exception messages.
private final String logPrefix;
// The "restore consumer". NOTE(review): the statements that use it are not visible in this copy
// (see the "remove the changelog partitions from restore consumer" comments) — confirm usage.
private final Consumer<byte[], byte[]> restoreConsumer;
// Factory for new active tasks; also supplies the state directory and the topology builder.
private final StreamThread.AbstractTaskCreator<StreamTask> taskCreator;
// Factory for new standby tasks.
private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
// Host-to-partitions metadata, refreshed by setPartitionsByHostState() and printed by toString().
private final StreamsMetadataState streamsMetadataState;
// Sends delete-records requests (maybePurgeCommitedRecords); package-private, also returned by getAdminClient().
final AdminClient adminClient;
// Most recently sent delete-records request, or null if none has been sent yet.
private DeleteRecordsResult deleteRecordsResult;
// following information is updated during rebalance phase by the partition assignor
// (via setClusterMetadata() and setAssignmentMetadata()).
private Cluster cluster;
private Map<TaskId, Set<TopicPartition>> assignedActiveTasks;
private Map<TaskId, Set<TopicPartition>> assignedStandbyTasks;
// Main consumer; null until setConsumer() is called (createTasks() rejects a null consumer).
private Consumer<byte[], byte[]> consumer;
/**
 * Creates a task manager. The main consumer is not passed here; it must be supplied later via
 * {@code setConsumer} before tasks can be created.
 *
 * @param changelogReader    reader used to restore active tasks
 * @param processId          identifier of this process
 * @param logPrefix          prefix for log lines and exception messages
 * @param restoreConsumer    the restore consumer
 * @param streamsMetadataState host-to-partitions metadata holder
 * @param taskCreator        factory for active tasks
 * @param standbyTaskCreator factory for standby tasks
 * @param adminClient        client used to delete records
 * @param active             container of active tasks
 * @param standby            container of standby tasks
 */
TaskManager(final ChangelogReader changelogReader,
            final UUID processId,
            final String logPrefix,
            final Consumer<byte[], byte[]> restoreConsumer,
            final StreamsMetadataState streamsMetadataState,
            final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
            final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
            final AdminClient adminClient,
            final AssignedStreamsTasks active,
            final AssignedStandbyTasks standby) {
    this.changelogReader = changelogReader;
    this.processId = processId;
    this.logPrefix = logPrefix;
    this.streamsMetadataState = streamsMetadataState;
    this.restoreConsumer = restoreConsumer;
    this.taskCreator = taskCreator;
    this.standbyTaskCreator = standbyTaskCreator;
    // previously garbled into "... = standbyTaskCreator; = active;", leaving the final field unassigned
    this.active = active;
    this.standby = standby;

    final LogContext logContext = new LogContext(logPrefix);
    this.log = logContext.logger(getClass());

    this.adminClient = adminClient;
}
/**
 * Entry point for setting up tasks after an assignment.
 *
 * @param assignment the partitions assigned to the consumer
 * @throws IllegalStateException if no consumer has been set via setConsumer()
 */
// NOTE(review): in this copy the method only performs the consumer null check. The statements the
// comments below describe (handling suspended tasks, adding tasks, pausing partitions) and the
// closing braces are absent, so the body appears truncated — confirm against the upstream source.
void createTasks(final Collection<TopicPartition> assignment) {
if (consumer == null) {
throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
// do this first as we may have suspended standby tasks that
// will become active or vice versa
// Pause all the new partitions until the underlying state store is ready for all the active tasks.
/**
 * Resumes or creates the active tasks recorded in assignedActiveTasks.
 * <p>
 * A task is only considered when all of its partitions are contained in {@code assignment};
 * otherwise a warning is logged and the task is skipped. Suspended tasks are resumed through
 * active.maybeResumeSuspendedTask(); tasks that were not resumed are collected and passed to
 * taskCreator.createTasks().
 *
 * @param assignment the partitions assigned to the consumer
 * @throws StreamsException if resuming a suspended task fails (logged at error level, then rethrown)
 */
private void addStreamTasks(final Collection<TopicPartition> assignment) {
if (assignedActiveTasks.isEmpty()) {
// NOTE(review): the body of this condition is missing in this copy; presumably an early return
// when no active tasks are assigned — confirm.
final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
// collect newly assigned tasks and reopen re-assigned tasks
log.debug("Adding assigned tasks as active: {}", assignedActiveTasks);
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedActiveTasks.entrySet()) {
final TaskId taskId = entry.getKey();
final Set<TopicPartition> partitions = entry.getValue();
// only tasks whose partitions are all part of this assignment are resumed or created
if (assignment.containsAll(partitions)) {
try {
// maybeResumeSuspendedTask() returning false means the task still has to be created
if (!active.maybeResumeSuspendedTask(taskId, partitions)) {
newTasks.put(taskId, partitions);
} catch (final StreamsException e) {
log.error("Failed to resume an active task {} due to the following error:", taskId, e);
throw e;
} else {
log.warn("Task {} owned partitions {} are not contained in the assignment {}", taskId, partitions, assignment);
if (newTasks.isEmpty()) {
// NOTE(review): the body of this condition is missing in this copy; presumably an early return
// when there is nothing to create — confirm.
// create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedTasks(); eventually
log.trace("New active tasks to be created: {}", newTasks);
// NOTE(review): the loop body is missing in this copy; presumably each created task is registered
// with `active` — confirm.
for (final StreamTask task : taskCreator.createTasks(consumer, newTasks)) {
/**
 * Resumes or creates the standby tasks recorded in assignedStandbyTasks.
 * <p>
 * Unlike the active-task path, partitions are not checked against the consumer assignment here.
 * Suspended standby tasks are resumed through standby.maybeResumeSuspendedTask(); the rest are
 * passed to standbyTaskCreator.createTasks().
 */
private void addStandbyTasks() {
// local copy of the field reference; it shadows the field for the rest of the method
final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = this.assignedStandbyTasks;
if (assignedStandbyTasks.isEmpty()) {
// NOTE(review): the body of this condition is missing in this copy; presumably an early return
// when no standby tasks are assigned — confirm.
log.debug("Adding assigned standby tasks {}", assignedStandbyTasks);
final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>();
// collect newly assigned standby tasks and reopen re-assigned standby tasks
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedStandbyTasks.entrySet()) {
final TaskId taskId = entry.getKey();
final Set<TopicPartition> partitions = entry.getValue();
// maybeResumeSuspendedTask() returning false means the task still has to be created
if (!standby.maybeResumeSuspendedTask(taskId, partitions)) {
newStandbyTasks.put(taskId, partitions);
if (newStandbyTasks.isEmpty()) {
// NOTE(review): the body of this condition is missing in this copy; presumably an early return
// when there is nothing to create — confirm.
// create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedStandbyTasks(); eventually
log.trace("New standby tasks to be created: {}", newStandbyTasks);
// NOTE(review): the loop body is missing in this copy; presumably each created task is registered
// with `standby` — confirm.
for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
Set<TaskId> activeTaskIds() {
return active.allAssignedTaskIds();
Set<TaskId> standbyTaskIds() {
return standby.allAssignedTaskIds();
public Set<TaskId> prevActiveTaskIds() {
return active.previousTaskIds();
* Returns ids of tasks whose states are kept on the local storage.
public Set<TaskId> cachedTasksIds() {
// A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
// 1) the client is actively maintaining standby tasks by maintaining their states from the change log.
// 2) the client has just got some tasks migrated out of itself to other clients while these task states
// have not been cleaned up yet (this can happen in a rolling bounce upgrade, for example).
final HashSet<TaskId> tasks = new HashSet<>();
final File[] stateDirs = taskCreator.stateDirectory().listTaskDirectories();
if (stateDirs != null) {
for (final File dir : stateDirs) {
try {
final TaskId id = TaskId.parse(dir.getName());
// if the checkpoint file exists, the state is valid.
if (new File(dir, StateManagerUtil.CHECKPOINT_FILE_NAME).exists()) {
} catch (final TaskIdFormatException e) {
// there may be some unknown files that sits in the same directory,
// we should ignore these files instead trying to delete them as well
return tasks;
public UUID processId() {
return processId;
InternalTopologyBuilder builder() {
return taskCreator.builder();
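/**
 * Suspends all active and standby tasks. Similar to {@link #shutdown(boolean)}, but the tasks are
 * suspended rather than closed, in the hope that they will soon be assigned again. Active tasks
 * that are still restoring are closed and will be re-created when they are reassigned.
 *
 * @throws TaskMigratedException if the task producer got fenced (EOS only)
 */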
void suspendTasksAndState() {
// Every step is attempted; the first failure returned by a step is kept and reported at the end.
// NOTE(review): any collected exception is wrapped in a new StreamsException below, so a
// TaskMigratedException returned by suspend() reaches the caller only as the cause — confirm this
// matches the documented @throws contract.
log.debug("Suspending all active tasks {} and standby tasks {}", active.runningTaskIds(), standby.runningTaskIds());
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// suspend() returns its failure (null on success); compareAndSet keeps only the first one
firstException.compareAndSet(null, active.suspend());
// close all restoring tasks as well and then reset changelog reader;
// for those restoring and still assigned tasks, they will be re-created
// in addStreamTasks.
// NOTE(review): no changelog-reader reset is visible in this copy — confirm it was not lost.
firstException.compareAndSet(null, active.closeAllRestoringTasks());
firstException.compareAndSet(null, standby.suspend());
// remove the changelog partitions from restore consumer
// NOTE(review): the statement this comment refers to is missing in this copy — confirm.
final Exception exception = firstException.get();
if (exception != null) {
throw new StreamsException(logPrefix + "failed to suspend stream tasks", exception);
/**
 * Shuts down all active and standby tasks. Each step runs in its own try block; the first
 * {@link RuntimeException} caught is remembered and rethrown after all steps were attempted.
 *
 * @param clean NOTE(review): not referenced by any statement visible in this copy — presumably
 *              selects a clean vs. unclean close; confirm
 * @throws RuntimeException the first exception caught during shutdown
 */
// NOTE(review): both try blocks below are empty in this copy, so the actual shutdown calls appear
// to be missing — confirm against the upstream source.
void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
active.previousTaskIds(), standby.previousTaskIds());
try {
// NOTE(review): empty in this copy.
} catch (final RuntimeException fatalException) {
firstException.compareAndSet(null, fatalException);
// remove the changelog partitions from restore consumer
try {
// NOTE(review): empty in this copy; the restore-consumer cleanup mentioned above is not visible.
} catch (final RuntimeException fatalException) {
firstException.compareAndSet(null, fatalException);
final RuntimeException fatalException = firstException.get();
if (fatalException != null) {
throw fatalException;
AdminClient getAdminClient() {
return adminClient;
Set<TaskId> suspendedActiveTaskIds() {
return active.previousTaskIds();
Set<TaskId> suspendedStandbyTaskIds() {
return standby.previousTaskIds();
StreamTask activeTask(final TopicPartition partition) {
return active.runningTaskFor(partition);
StandbyTask standbyTask(final TopicPartition partition) {
return standby.runningTaskFor(partition);
Map<TaskId, StreamTask> activeTasks() {
return active.runningTaskMap();
Map<TaskId, StandbyTask> standbyTasks() {
return standby.runningTaskMap();
void setConsumer(final Consumer<byte[], byte[]> consumer) {
this.consumer = consumer;
void pausePartitions() {
log.trace("Pausing partitions: {}", consumer.assignment());
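/**
 * Restores active tasks from their changelogs and reports whether everything is running.
 *
 * @return {@code true} only if all active tasks and all standby tasks are running
 * @throws IllegalStateException if a store is registered after initialization has already finished
 * @throws StreamsException if the store's changelog does not contain the partition
 */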
boolean updateNewAndRestoringTasks() {
// NOTE(review): `restored` is not used by any statement visible in this copy; presumably it fed
// back into `active` — confirm against the upstream source.
final Collection<TopicPartition> restored = changelogReader.restore(active);
if (active.allTasksRunning()) {
final Set<TopicPartition> assignment = consumer.assignment();
log.trace("Resuming partitions {}", assignment);
// NOTE(review): the log above announces resuming, but no resume call (nor any standby-partition
// assignment) is visible here — statements appear to be missing; confirm.
return standby.allTasksRunning();
return false;
boolean hasActiveRunningTasks() {
return active.hasRunningTasks();
boolean hasStandbyRunningTasks() {
return standby.hasRunningTasks();
/**
 * Intended to position the changelog partitions of the running standby tasks by their
 * checkpointed offsets.
 * <p>
 * NOTE(review): this copy is damaged. The outer loop body is empty, so {@code checkpointedOffsets}
 * is never filled. The non-negative-offset branch is garbled (see below), and the else branch is
 * empty. Presumably the restore consumer is assigned/seeked here — confirm against upstream.
 */
private void assignStandbyPartitions() {
final Collection<StandbyTask> running = standby.running();
final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
for (final StandbyTask standbyTask : running) {
for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
final TopicPartition partition = entry.getKey();
final long offset = entry.getValue();
// NOTE(review): the next line is garbled ("{, offset);"); the statement that used `offset` for
// a non-negative checkpoint has been lost.
if (offset >= 0) {, offset);
} else {
public void setClusterMetadata(final Cluster cluster) {
this.cluster = cluster;
public void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) {
this.streamsMetadataState.onChange(partitionsByHostState, cluster);
public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks) {
this.assignedActiveTasks = activeTasks;
this.assignedStandbyTasks = standbyTasks;
public void updateSubscriptionsFromAssignment(final List<TopicPartition> partitions) {
if (builder().sourceTopicPattern() != null) {
final Set<String> assignedTopics = new HashSet<>();
for (final TopicPartition topicPartition : partitions) {
final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
if (!existingTopics.containsAll(assignedTopics)) {
builder().updateSubscribedTopics(assignedTopics, logPrefix);
public void updateSubscriptionsFromMetadata(final Set<String> topics) {
if (builder().sourceTopicPattern() != null) {
final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
if (!existingTopics.equals(topics)) {
builder().updateSubscribedTopics(topics, logPrefix);
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
int commitAll() {
final int committed = active.commit();
return committed + standby.commit();
* @throws TaskMigratedException if the task producer got fenced (EOS only)
int process(final long now) {
return active.process(now);
* @throws TaskMigratedException if the task producer got fenced (EOS only)
int punctuate() {
return active.punctuate();
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
int maybeCommitActiveTasksPerUserRequested() {
return active.maybeCommitPerUserRequested();
void maybePurgeCommitedRecords() {
// we do not check any possible exceptions since none of them are fatal
// that should cause the application to fail, and we will try delete with
// newer offsets anyways.
if (deleteRecordsResult == null || deleteRecordsResult.all().isDone()) {
if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks());
final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) {
recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
if (!recordsToDelete.isEmpty()) {
deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
log.trace("Sent delete-records request: {}", recordsToDelete);
* Produces a string representation containing useful information about the TaskManager.
* This is useful in debugging scenarios.
* @return A string representation of the TaskManager instance.
public String toString() {
return toString("");
public String toString(final String indent) {
final StringBuilder builder = new StringBuilder();
builder.append(streamsMetadataState.toString(indent + "\t\t"));
builder.append(indent).append("\tActive tasks:\n");
builder.append(active.toString(indent + "\t\t"));
builder.append(indent).append("\tStandby tasks:\n");
builder.append(standby.toString(indent + "\t\t"));
return builder.toString();
// the following functions are for testing only
Map<TaskId, Set<TopicPartition>> assignedActiveTasks() {
return assignedActiveTasks;
Map<TaskId, Set<TopicPartition>> assignedStandbyTasks() {
return assignedStandbyTasks;