blob: 0cdf44cbd0ca4518ebcb7b0837f7cf657b5833ba [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.singleton;
/**
 * Keeps track of the state stores registered against one state directory, restores them
 * from their changelog topics, and records the changelog offsets reached so that they can
 * be checkpointed when the manager is closed.
 */
public class ProcessorStateManager {
private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
// suffix appended to "<applicationId>-<storeName>" by storeChangelogTopic()
public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
// name of the offset checkpoint file kept inside the state directory
public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
// name of the file inside the state directory on which the directory lock is taken
public static final String LOCK_FILE_NAME = ".lock";
// prefix used when deriving changelog topic names
private final String applicationId;
// partition returned by getPartition() for topics that have no entry in partitionForTopic
private final int defaultPartition;
// source topic name -> source TopicPartition, filled from the constructor's `sources`
private final Map<String, TopicPartition> partitionForTopic;
// state directory holding the lock file and the checkpoint file
private final File baseDir;
// lock obtained on the state directory by the constructor
private final FileLock directoryLock;
// registered stores; looked up by name in getStore()
private final Map<String, StateStore> stores;
// names of the stores whose changes are logged to a derived changelog topic (see close())
private final Set<String> loggingEnabled;
// consumer used to query changelog partitions and to read changelog records during restoration
private final Consumer<byte[], byte[]> restoreConsumer;
// changelog partition -> offset reached by restoration / standby updates
private final Map<TopicPartition, Long> restoredOffsets;
// changelog partition -> checkpointed offset
// NOTE(review): presumably loaded from the checkpoint file in the constructor; the load is not visible here
private final Map<TopicPartition, Long> checkpointedOffsets;
// changelog partition -> exclusive upper bound on offsets to apply; absent means no bound (see offsetLimit())
private final Map<TopicPartition, Long> offsetLimits;
// true when this manager backs a standby task
private final boolean isStandby;
// only allocated when isStandby is true, null otherwise
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
/**
 * @throws IOException if any error happens while creating or locking the state directory
 */
public ProcessorStateManager(String applicationId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
// Sets up the empty bookkeeping collections, takes the state directory lock, and then
// prepares the checkpointed offsets.
// NOTE(review): closing braces and some statements (e.g. the directory-creation call and the
// checkpoint read) are not present in this view of the file -- confirm against the full source.
this.applicationId = applicationId;
this.defaultPartition = defaultPartition;
this.partitionForTopic = new HashMap<>();
// index every source partition by its topic name; getPartition() looks topics up here
for (TopicPartition source : sources) {
this.partitionForTopic.put(source.topic(), source);
this.baseDir = baseDir;
this.stores = new HashMap<>();
this.loggingEnabled = new HashSet<>();
this.restoreConsumer = restoreConsumer;
this.restoredOffsets = new HashMap<>();
this.isStandby = isStandby;
// the callback map is only needed for standby tasks; it stays null for active tasks
this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
this.offsetLimits = new HashMap<>();
// create the state directory for this task if missing (we won't create the parent directory)
// try to acquire the exclusive lock on the state directory
// the second argument is the retry budget passed to lockStateDirectory(File, int)
directoryLock = lockStateDirectory(baseDir, 5);
// a null lock means the directory could not be locked, which is reported as an IOException
if (directoryLock == null) {
throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
// load the checkpoint information
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
// NOTE(review): the constructor argument on the next line is truncated in this view
this.checkpointedOffsets = new HashMap<>(;
// delete the checkpoint file after finish loading its stored offsets
/**
 * Helper operating on the given state directory.
 * NOTE(review): only the existence check is visible in this view; the statements executed
 * when the directory does not exist are cut off -- confirm against the full source.
 *
 * @param stateDir the state directory to check
 * @throws IOException declared by the signature; the throwing code is not visible here
 */
private static void createStateDirectory(File stateDir) throws IOException {
if (!stateDir.exists()) {
/**
 * Builds the changelog topic name for a store as
 * {@code applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX}.
 *
 * @param applicationId the application id used as the topic name prefix
 * @param storeName the name of the state store
 * @return the derived changelog topic name
 */
public static String storeChangelogTopic(String applicationId, String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
/**
 * @throws IOException if any error happens when locking the state directory
 */
public static FileLock lockStateDirectory(File stateDir) throws IOException {
// single attempt: delegate with a retry budget of 0, so the retry loop is never entered
return lockStateDirectory(stateDir, 0);
/**
 * Opens the lock file ({@code LOCK_FILE_NAME}) inside {@code stateDir} in read/write mode and
 * tries to lock its channel, trying again while the lock is unavailable and {@code retry} is
 * positive.
 * NOTE(review): the statements inside the retry loop's {@code try} and inside the final
 * {@code if (lock == null)} are not visible in this view (neither a change to {@code retry}
 * nor any handling of the open channel can be seen) -- confirm against the full source.
 *
 * @param stateDir the state directory that contains (or will contain) the lock file
 * @param retry the retry budget; the loop only runs while this is greater than zero
 * @return the acquired lock, or {@code null} if it could not be acquired
 * @throws IOException if opening the lock file or locking the channel fails
 */
private static FileLock lockStateDirectory(File stateDir, int retry) throws IOException {
File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
// first attempt
FileLock lock = lockStateDirectory(channel);
while (lock == null && retry > 0) {
try {
} catch (Exception ex) {
// do nothing
// try again on the same channel
lock = lockStateDirectory(channel);
if (lock == null) {
return lock;
/**
 * Makes one non-blocking attempt to lock the given channel via {@code FileChannel.tryLock()}.
 *
 * @param channel the channel of the lock file
 * @return whatever {@code tryLock()} returns, or {@code null} when it throws
 *         {@link OverlappingFileLockException}
 * @throws IOException if {@code tryLock()} fails with an I/O error
 */
private static FileLock lockStateDirectory(FileChannel channel) throws IOException {
try {
return channel.tryLock();
} catch (OverlappingFileLockException e) {
// an overlapping lock is reported to the caller as "not acquired" rather than as an error
return null;
/**
 * @return the state directory this manager was constructed with
 */
public File baseDir() {
return this.baseDir;
/**
 * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
 * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
 * @throws StreamsException if the store's change log does not contain the partition
 */
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
// Validates the store name, waits for the store's changelog partition to become visible,
// records the store, and then either remembers the restore callback (standby) or restores
// the store right away (active).
// NOTE(review): several expressions below are truncated in this view (the store-name
// arguments are missing, as are some statements and all closing braces) -- confirm against
// the full source before relying on the details.
if (
throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
// a store may only be registered once
if (this.stores.containsKey(
throw new IllegalArgumentException("Store " + + " has already been registered.");
// NOTE(review): the statement guarded by this condition is not visible here
if (loggingEnabled)
// check that the underlying change log topic exist or not
String topic;
// with logging enabled the topic name is derived via storeChangelogTopic(); the alternative
// assignment is truncated in this view
if (loggingEnabled)
topic = storeChangelogTopic(this.applicationId,;
else topic =;
// block until the partition is ready for this state changelog topic or time has elapsed
int partition = getPartition(topic);
boolean partitionNotFound = true;
long startTime = System.currentTimeMillis();
long waitTime = 5000L; // hard-code the value since we should not block after KIP-4
// poll the topic metadata until the expected partition shows up or waitTime has passed
do {
try {
} catch (InterruptedException e) {
// ignore
// NOTE(review): the interrupt is swallowed; no re-assertion of the interrupt flag is visible
for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(topic)) {
if (partitionInfo.partition() == partition) {
partitionNotFound = false;
} while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
if (partitionNotFound)
throw new StreamsException("Store " + + "'s change log (" + topic + ") does not contain partition " + partition);
this.stores.put(, store);
// standby: only persistent stores get their callback recorded (keyed by topic);
// active: the store is restored immediately
if (isStandby) {
if (store.persistent())
restoreCallbacks.put(topic, stateRestoreCallback);
} else {
restoreActiveState(topic, stateRestoreCallback);
/**
 * Replays the changelog partition of {@code topicName} through {@code stateRestoreCallback},
 * stopping at the partition's offset limit, and records the offset reached in
 * {@code restoredOffsets}.
 * NOTE(review): the consumer assign/seek/unassign calls, the bodies of the loop-exit branches
 * and all closing braces are not visible in this view -- confirm against the full source.
 *
 * @param topicName the changelog topic to restore from
 * @param stateRestoreCallback receives the key and value of every record that is applied
 * @throws IllegalStateException if the restore consumer already has a subscription, or if its
 *         position moves past the end offset that was read before restoring
 */
private void restoreActiveState(String topicName, StateRestoreCallback stateRestoreCallback) {
// ---- try to restore the state from change-log ---- //
// subscribe to the store's partition
if (!restoreConsumer.subscription().isEmpty()) {
throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName));
try {
// calculate the end offset of the partition
// TODO: this is a bit hacky to first seek then position to get the end offset
long endOffset = restoreConsumer.position(storePartition);
// restore from the checkpointed offset of the change log if it is persistent and the offset exists;
// restore the state from the beginning of the change log otherwise
// NOTE(review): the call on the next line is truncated in this view
if (checkpointedOffsets.containsKey(storePartition)) {, checkpointedOffsets.get(storePartition));
} else {
// restore its state from changelog records
// exclusive upper bound: records at or beyond this offset are not applied
long limit = offsetLimit(storePartition);
while (true) {
long offset = 0L;
for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
offset = record.offset();
if (offset >= limit) break;
stateRestoreCallback.restore(record.key(), record.value());
// NOTE(review): the bodies of the first two branches below are not visible in this view
if (offset >= limit) {
} else if (restoreConsumer.position(storePartition) == endOffset) {
} else if (restoreConsumer.position(storePartition) > endOffset) {
// For a logging enabled changelog (no offset limit),
// the log end offset should not change while restoring since it is only written by this thread.
throw new IllegalStateException("Log end offset should not change while restoring");
// record the restored offset for its change log partition
// never record an offset beyond the limit, even if the consumer position has moved past it
long newOffset = Math.min(limit, restoreConsumer.position(storePartition));
restoredOffsets.put(storePartition, newOffset);
} finally {
// un-assign the change log partition
/**
 * Builds a map with one entry per topic in {@code restoreCallbacks}: the changelog partition
 * mapped to its checkpointed offset, or to {@code -1} when no offset is checkpointed for it.
 * Note that {@code restoreCallbacks} is only allocated when the manager was created with
 * {@code isStandby == true}.
 *
 * @return a new map from changelog partition to checkpointed offset (or {@code -1})
 */
public Map<TopicPartition, Long> checkpointedOffsets() {
Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
String topicName = entry.getKey();
int partition = getPartition(topicName);
TopicPartition storePartition = new TopicPartition(topicName, partition);
if (checkpointedOffsets.containsKey(storePartition)) {
partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition));
} else {
// -1 marks a partition without a checkpointed offset
partitionsAndOffsets.put(storePartition, -1L);
return partitionsAndOffsets;
/**
 * Applies the given changelog records to the restore callback registered for the partition's
 * topic, as long as their offsets are below the partition's offset limit, and records
 * {@code lastOffset + 1} in {@code restoredOffsets}.
 * NOTE(review): in this view nothing updates {@code count} or adds records to
 * {@code remainingRecords}, and the closing braces are missing -- confirm against the full source.
 *
 * @param storePartition the changelog partition the records were read from
 * @param records the records to apply
 * @return {@code remainingRecords}, which is only allocated once a record at or beyond the
 *         limit is seen and is {@code null} otherwise
 */
public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
long limit = offsetLimit(storePartition);
List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
// restore states from changelog records
StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
long lastOffset = -1L;
int count = 0;
for (ConsumerRecord<byte[], byte[]> record : records) {
if (record.offset() < limit) {
restoreCallback.restore(record.key(), record.value());
lastOffset = record.offset();
} else {
// allocated lazily, sized as records.size() - count
if (remainingRecords == null)
remainingRecords = new ArrayList<>(records.size() - count);
// record the restored offset for its change log partition
restoredOffsets.put(storePartition, lastOffset + 1);
return remainingRecords;
/**
 * Sets the offset limit for a partition, replacing any previous limit.
 * Restoration only applies records whose offset is below this limit.
 *
 * @param partition the changelog partition the limit applies to
 * @param limit the exclusive upper bound on offsets to apply
 */
public void putOffsetLimit(TopicPartition partition, long limit) {
offsetLimits.put(partition, limit);
/**
 * @param partition the changelog partition to look up
 * @return the limit stored for {@code partition}, or {@code Long.MAX_VALUE} when none is stored
 */
private long offsetLimit(TopicPartition partition) {
Long limit = offsetLimits.get(partition);
// no stored limit means "unbounded"
return limit != null ? limit : Long.MAX_VALUE;
/**
 * @param name the name the store was registered under
 * @return the store held under {@code name}, or {@code null} if there is none
 */
public StateStore getStore(String name) {
return stores.get(name);
/**
 * Iterates over all registered stores; does nothing when no store is registered.
 * NOTE(review): the statement executed for each store is not visible in this view --
 * confirm against the full source.
 */
public void flush() {
if (!this.stores.isEmpty()) {
log.debug("Flushing stores.");
for (StateStore store : this.stores.values())
/**
 * @throws IOException if any error happens when flushing or closing the state stores
 */
public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
// Walks the registered stores, collects the changelog offsets to checkpoint for the
// persistent ones, and prepares the checkpoint file; the state directory lock is dealt with
// in the finally block.
// NOTE(review): the per-store close call, the `else` between the two `part` assignments,
// the checkpoint write, the lock release and all closing braces are not visible in this
// view -- confirm against the full source.
try {
if (!stores.isEmpty()) {
log.debug("Closing stores.");
for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
log.debug("Closing storage engine {}", entry.getKey());
Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
for (String storeName : stores.keySet()) {
TopicPartition part;
// logged stores use the derived changelog topic name; the second assignment uses the
// store name itself as the topic
if (loggingEnabled.contains(storeName))
part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
part = new TopicPartition(storeName, getPartition(storeName));
// only checkpoint the offset to the offsets file if it is persistent;
if (stores.get(storeName).persistent()) {
// prefer the acked offset passed in by the caller, falling back to the restored offset
Long offset = ackedOffsets.get(part);
if (offset != null) {
// store the last offset + 1 (the log position after restoration)
checkpointOffsets.put(part, offset + 1);
} else {
// if no record was produced. we need to check the restored offset.
offset = restoredOffsets.get(part);
if (offset != null)
checkpointOffsets.put(part, offset);
// write the checkpoint file before closing, to indicate clean shutdown
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
} finally {
// release the state directory directoryLock
/**
 * Resolves the partition number to use for a topic.
 *
 * @param topic the topic name to look up in {@code partitionForTopic}
 * @return the partition of the source registered for {@code topic}, or
 *         {@code defaultPartition} when the topic has no entry
 */
private int getPartition(String topic) {
TopicPartition partition = partitionForTopic.get(topic);
return partition == null ? defaultPartition : partition.partition();