| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.kafka.streams.processor.internals; |
| |
| import org.apache.kafka.clients.admin.AdminClient; |
| import org.apache.kafka.clients.admin.DeleteRecordsResult; |
| import org.apache.kafka.clients.admin.RecordsToDelete; |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.common.Cluster; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.utils.LogContext; |
| import org.apache.kafka.streams.errors.StreamsException; |
| import org.apache.kafka.streams.errors.TaskIdFormatException; |
| import org.apache.kafka.streams.errors.TaskMigratedException; |
| import org.apache.kafka.streams.processor.TaskId; |
| import org.apache.kafka.streams.state.HostInfo; |
| import org.slf4j.Logger; |
| |
| import java.io.File; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static java.util.Collections.singleton; |
| |
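/**
 * The {@code TaskManager} owns the active and standby tasks of a single stream thread: it creates
 * tasks from the assignment handed out by the partition assignor, suspends and resumes them across
 * rebalances, drives state-store restoration via the {@link ChangelogReader}, commits and
 * punctuates the active tasks, and eventually shuts everything down.
 * <p>
 * As a rough, non-normative sketch, the owning stream thread is expected to drive it in roughly
 * this order (all methods shown are defined below):
 * <pre>{@code
 * taskManager.setConsumer(consumer);
 * // on partition assignment:
 * taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
 * taskManager.createTasks(assignedPartitions);
 * // in the poll loop, until all tasks are running:
 * taskManager.updateNewAndRestoringTasks();
 * // steady state:
 * taskManager.process(now);
 * taskManager.commitAll();
 * // on the next rebalance, and finally on shutdown:
 * taskManager.suspendTasksAndState();
 * taskManager.shutdown(true);
 * }</pre>
 */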
| public class TaskManager { |
    // the active tasks need to support concurrent access, as they can also be
    // queried via queryable state (interactive queries)
| private final Logger log; |
| private final UUID processId; |
| private final AssignedStreamsTasks active; |
| private final AssignedStandbyTasks standby; |
| private final ChangelogReader changelogReader; |
| private final String logPrefix; |
| private final Consumer<byte[], byte[]> restoreConsumer; |
| private final StreamThread.AbstractTaskCreator<StreamTask> taskCreator; |
| private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator; |
| private final StreamsMetadataState streamsMetadataState; |
| |
| final AdminClient adminClient; |
| private DeleteRecordsResult deleteRecordsResult; |
| |
    // the following information is updated during the rebalance phase by the partition assignor
| private Cluster cluster; |
| private Map<TaskId, Set<TopicPartition>> assignedActiveTasks; |
| private Map<TaskId, Set<TopicPartition>> assignedStandbyTasks; |
| |
| private Consumer<byte[], byte[]> consumer; |
| |
| TaskManager(final ChangelogReader changelogReader, |
| final UUID processId, |
| final String logPrefix, |
| final Consumer<byte[], byte[]> restoreConsumer, |
| final StreamsMetadataState streamsMetadataState, |
| final StreamThread.AbstractTaskCreator<StreamTask> taskCreator, |
| final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator, |
| final AdminClient adminClient, |
| final AssignedStreamsTasks active, |
| final AssignedStandbyTasks standby) { |
| this.changelogReader = changelogReader; |
| this.processId = processId; |
| this.logPrefix = logPrefix; |
| this.streamsMetadataState = streamsMetadataState; |
| this.restoreConsumer = restoreConsumer; |
| this.taskCreator = taskCreator; |
| this.standbyTaskCreator = standbyTaskCreator; |
| this.active = active; |
| this.standby = standby; |
| |
| final LogContext logContext = new LogContext(logPrefix); |
| |
| this.log = logContext.logger(getClass()); |
| |
| this.adminClient = adminClient; |
| } |
| |
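    /**
     * Creates the tasks for the given assignment: closes suspended tasks that are no longer
     * assigned, resumes or creates the assigned active and standby tasks, and pauses all
     * assigned partitions until the state stores of the active tasks are ready.
     *
     * @param assignment the partitions assigned to the consumer by the group coordinator
     * @throws IllegalStateException if no consumer has been set via {@link #setConsumer(Consumer)}
     */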
| void createTasks(final Collection<TopicPartition> assignment) { |
| if (consumer == null) { |
| throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen."); |
| } |
| |
| // do this first as we may have suspended standby tasks that |
| // will become active or vice versa |
| standby.closeNonAssignedSuspendedTasks(assignedStandbyTasks); |
| active.closeNonAssignedSuspendedTasks(assignedActiveTasks); |
| |
| addStreamTasks(assignment); |
| addStandbyTasks(); |
| |
        // pause all assigned partitions until the underlying state stores are ready for all the active tasks
| pausePartitions(); |
| } |
| |
| private void addStreamTasks(final Collection<TopicPartition> assignment) { |
| if (assignedActiveTasks.isEmpty()) { |
| return; |
| } |
| final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>(); |
| // collect newly assigned tasks and reopen re-assigned tasks |
| log.debug("Adding assigned tasks as active: {}", assignedActiveTasks); |
| for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedActiveTasks.entrySet()) { |
| final TaskId taskId = entry.getKey(); |
| final Set<TopicPartition> partitions = entry.getValue(); |
| |
| if (assignment.containsAll(partitions)) { |
| try { |
| if (!active.maybeResumeSuspendedTask(taskId, partitions)) { |
| newTasks.put(taskId, partitions); |
| } |
| } catch (final StreamsException e) { |
| log.error("Failed to resume an active task {} due to the following error:", taskId, e); |
| throw e; |
| } |
| } else { |
| log.warn("Task {} owned partitions {} are not contained in the assignment {}", taskId, partitions, assignment); |
| } |
| } |
| |
| if (newTasks.isEmpty()) { |
| return; |
| } |
| |
| // create all newly assigned tasks (guard against race condition with other thread via backoff and retry) |
| // -> other thread will call removeSuspendedTasks(); eventually |
| log.trace("New active tasks to be created: {}", newTasks); |
| |
| for (final StreamTask task : taskCreator.createTasks(consumer, newTasks)) { |
| active.addNewTask(task); |
| } |
| } |
| |
| private void addStandbyTasks() { |
| final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = this.assignedStandbyTasks; |
| if (assignedStandbyTasks.isEmpty()) { |
| return; |
| } |
| log.debug("Adding assigned standby tasks {}", assignedStandbyTasks); |
| final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>(); |
| // collect newly assigned standby tasks and reopen re-assigned standby tasks |
| for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedStandbyTasks.entrySet()) { |
| final TaskId taskId = entry.getKey(); |
| final Set<TopicPartition> partitions = entry.getValue(); |
| if (!standby.maybeResumeSuspendedTask(taskId, partitions)) { |
| newStandbyTasks.put(taskId, partitions); |
| } |
| } |
| |
| if (newStandbyTasks.isEmpty()) { |
| return; |
| } |
| |
| // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry) |
| // -> other thread will call removeSuspendedStandbyTasks(); eventually |
| log.trace("New standby tasks to be created: {}", newStandbyTasks); |
| |
| for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) { |
| standby.addNewTask(task); |
| } |
| } |
| |
| Set<TaskId> activeTaskIds() { |
| return active.allAssignedTaskIds(); |
| } |
| |
| Set<TaskId> standbyTaskIds() { |
| return standby.allAssignedTaskIds(); |
| } |
| |
| public Set<TaskId> prevActiveTaskIds() { |
| return active.previousTaskIds(); |
| } |
| |
| /** |
     * Returns the ids of tasks whose states are kept in local storage.
| */ |
| public Set<TaskId> cachedTasksIds() { |
| // A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios: |
        // 1) the client is actively maintaining standby tasks by replicating their states from the changelog.
| // 2) the client has just got some tasks migrated out of itself to other clients while these task states |
| // have not been cleaned up yet (this can happen in a rolling bounce upgrade, for example). |
| |
| final HashSet<TaskId> tasks = new HashSet<>(); |
| |
| final File[] stateDirs = taskCreator.stateDirectory().listTaskDirectories(); |
| if (stateDirs != null) { |
| for (final File dir : stateDirs) { |
| try { |
| final TaskId id = TaskId.parse(dir.getName()); |
| // if the checkpoint file exists, the state is valid. |
| if (new File(dir, StateManagerUtil.CHECKPOINT_FILE_NAME).exists()) { |
| tasks.add(id); |
| } |
| } catch (final TaskIdFormatException e) { |
                    // there may be some unknown files that sit in the same directory;
                    // we should ignore them rather than trying to delete them
| } |
| } |
| } |
| |
| return tasks; |
| } |
| |
| public UUID processId() { |
| return processId; |
| } |
| |
| InternalTopologyBuilder builder() { |
| return taskCreator.builder(); |
| } |
| |
| /** |
     * Similar to {@link #shutdown(boolean)}, but does not close the tasks, in the hope that
     * they will soon be assigned to this instance again.
| * @throws TaskMigratedException if the task producer got fenced (EOS only) |
| */ |
| void suspendTasksAndState() { |
| log.debug("Suspending all active tasks {} and standby tasks {}", active.runningTaskIds(), standby.runningTaskIds()); |
| |
| final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); |
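        // compareAndSet(null, ...) keeps only the first exception encountered; the remaining
        // suspension steps below are still attempted before it is finally re-thrown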
| |
| firstException.compareAndSet(null, active.suspend()); |
        // close all restoring tasks as well and then reset the changelog reader;
        // restoring tasks that are still assigned will be re-created in addStreamTasks
| firstException.compareAndSet(null, active.closeAllRestoringTasks()); |
| changelogReader.reset(); |
| |
| firstException.compareAndSet(null, standby.suspend()); |
| |
| // remove the changelog partitions from restore consumer |
| restoreConsumer.unsubscribe(); |
| |
| final Exception exception = firstException.get(); |
| if (exception != null) { |
| throw new StreamsException(logPrefix + "failed to suspend stream tasks", exception); |
| } |
| } |
| |
| void shutdown(final boolean clean) { |
| final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); |
| |
| log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(), |
| active.previousTaskIds(), standby.previousTaskIds()); |
| |
| try { |
| active.close(clean); |
| } catch (final RuntimeException fatalException) { |
| firstException.compareAndSet(null, fatalException); |
| } |
| standby.close(clean); |
| |
| // remove the changelog partitions from restore consumer |
| try { |
| restoreConsumer.unsubscribe(); |
| } catch (final RuntimeException fatalException) { |
| firstException.compareAndSet(null, fatalException); |
| } |
| taskCreator.close(); |
| standbyTaskCreator.close(); |
| |
| final RuntimeException fatalException = firstException.get(); |
| if (fatalException != null) { |
| throw fatalException; |
| } |
| } |
| |
| AdminClient getAdminClient() { |
| return adminClient; |
| } |
| |
| Set<TaskId> suspendedActiveTaskIds() { |
| return active.previousTaskIds(); |
| } |
| |
| Set<TaskId> suspendedStandbyTaskIds() { |
| return standby.previousTaskIds(); |
| } |
| |
| StreamTask activeTask(final TopicPartition partition) { |
| return active.runningTaskFor(partition); |
| } |
| |
| StandbyTask standbyTask(final TopicPartition partition) { |
| return standby.runningTaskFor(partition); |
| } |
| |
| Map<TaskId, StreamTask> activeTasks() { |
| return active.runningTaskMap(); |
| } |
| |
| Map<TaskId, StandbyTask> standbyTasks() { |
| return standby.runningTaskMap(); |
| } |
| |
| void setConsumer(final Consumer<byte[], byte[]> consumer) { |
| this.consumer = consumer; |
| } |
| |
| void pausePartitions() { |
| log.trace("Pausing partitions: {}", consumer.assignment()); |
| consumer.pause(consumer.assignment()); |
| } |
| |
| /** |
| * @throws IllegalStateException If store gets registered after initialized is already finished |
| * @throws StreamsException if the store's change log does not contain the partition |
| */ |
| boolean updateNewAndRestoringTasks() { |
| active.initializeNewTasks(); |
| standby.initializeNewTasks(); |
| |
| final Collection<TopicPartition> restored = changelogReader.restore(active); |
| |
| active.updateRestored(restored); |
| |
| if (active.allTasksRunning()) { |
| final Set<TopicPartition> assignment = consumer.assignment(); |
| log.trace("Resuming partitions {}", assignment); |
| consumer.resume(assignment); |
| assignStandbyPartitions(); |
| return standby.allTasksRunning(); |
| } |
| return false; |
| } |
| |
| boolean hasActiveRunningTasks() { |
| return active.hasRunningTasks(); |
| } |
| |
| boolean hasStandbyRunningTasks() { |
| return standby.hasRunningTasks(); |
| } |
| |
| private void assignStandbyPartitions() { |
| final Collection<StandbyTask> running = standby.running(); |
| final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>(); |
| for (final StandbyTask standbyTask : running) { |
| checkpointedOffsets.putAll(standbyTask.checkpointedOffsets()); |
| } |
| |
| restoreConsumer.assign(checkpointedOffsets.keySet()); |
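        // seek to the checkpointed offset where one exists; a negative offset means no local
        // checkpoint was found, so restore the standby state from the beginning of the changelog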
| for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) { |
| final TopicPartition partition = entry.getKey(); |
| final long offset = entry.getValue(); |
| if (offset >= 0) { |
| restoreConsumer.seek(partition, offset); |
| } else { |
| restoreConsumer.seekToBeginning(singleton(partition)); |
| } |
| } |
| } |
| |
| public void setClusterMetadata(final Cluster cluster) { |
| this.cluster = cluster; |
| } |
| |
| public void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) { |
| this.streamsMetadataState.onChange(partitionsByHostState, cluster); |
| } |
| |
| public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks, |
| final Map<TaskId, Set<TopicPartition>> standbyTasks) { |
| this.assignedActiveTasks = activeTasks; |
| this.assignedStandbyTasks = standbyTasks; |
| } |
| |
| public void updateSubscriptionsFromAssignment(final List<TopicPartition> partitions) { |
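        // only relevant when the source topics are subscribed to via a pattern: merge the topics
        // of the newly assigned partitions into the set of matched topics known to the builder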
| if (builder().sourceTopicPattern() != null) { |
| final Set<String> assignedTopics = new HashSet<>(); |
| for (final TopicPartition topicPartition : partitions) { |
| assignedTopics.add(topicPartition.topic()); |
| } |
| |
| final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates(); |
| if (!existingTopics.containsAll(assignedTopics)) { |
| assignedTopics.addAll(existingTopics); |
| builder().updateSubscribedTopics(assignedTopics, logPrefix); |
| } |
| } |
| } |
| |
| public void updateSubscriptionsFromMetadata(final Set<String> topics) { |
| if (builder().sourceTopicPattern() != null) { |
| final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates(); |
| if (!existingTopics.equals(topics)) { |
| builder().updateSubscribedTopics(topics, logPrefix); |
| } |
| } |
| } |
| |
| /** |
| * @throws TaskMigratedException if committing offsets failed (non-EOS) |
| * or if the task producer got fenced (EOS) |
| */ |
| int commitAll() { |
| final int committed = active.commit(); |
| return committed + standby.commit(); |
| } |
| |
| /** |
| * @throws TaskMigratedException if the task producer got fenced (EOS only) |
| */ |
| int process(final long now) { |
| return active.process(now); |
| } |
| |
| /** |
| * @throws TaskMigratedException if the task producer got fenced (EOS only) |
| */ |
| int punctuate() { |
| return active.punctuate(); |
| } |
| |
| /** |
| * @throws TaskMigratedException if committing offsets failed (non-EOS) |
| * or if the task producer got fenced (EOS) |
| */ |
| int maybeCommitActiveTasksPerUserRequested() { |
| return active.maybeCommitPerUserRequested(); |
| } |
| |
| void maybePurgeCommitedRecords() { |
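        // best-effort purge of records that the active tasks have marked as safe to delete
        // (typically committed records of internal repartition topics); at most one
        // delete-records request is kept in flight at a time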
        // we do not check for any exceptions since none of them are fatal enough to
        // cause the application to fail, and we will retry the delete with newer
        // offsets anyway
| if (deleteRecordsResult == null || deleteRecordsResult.all().isDone()) { |
| |
| if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) { |
| log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks()); |
| } |
| |
| final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>(); |
| for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) { |
| recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())); |
| } |
| if (!recordsToDelete.isEmpty()) { |
| deleteRecordsResult = adminClient.deleteRecords(recordsToDelete); |
| log.trace("Sent delete-records request: {}", recordsToDelete); |
| } |
| } |
| } |
| |
| /** |
     * Produces a string representation containing useful information about the TaskManager,
     * primarily intended for debugging.
| * |
| * @return A string representation of the TaskManager instance. |
| */ |
| @Override |
| public String toString() { |
| return toString(""); |
| } |
| |
| public String toString(final String indent) { |
| final StringBuilder builder = new StringBuilder(); |
| builder.append("TaskManager\n"); |
| builder.append(indent).append("\tMetadataState:\n"); |
| builder.append(streamsMetadataState.toString(indent + "\t\t")); |
| builder.append(indent).append("\tActive tasks:\n"); |
| builder.append(active.toString(indent + "\t\t")); |
| builder.append(indent).append("\tStandby tasks:\n"); |
| builder.append(standby.toString(indent + "\t\t")); |
| return builder.toString(); |
| } |
| |
    // the following methods are for testing only
| Map<TaskId, Set<TopicPartition>> assignedActiveTasks() { |
| return assignedActiveTasks; |
| } |
| |
| Map<TaskId, Set<TopicPartition>> assignedStandbyTasks() { |
| return assignedStandbyTasks; |
| } |
| } |