| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.storage; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.io.IOException; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutorService; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.filefilter.WildcardFileFilter; |
| import org.apache.samza.Partition; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.checkpoint.Checkpoint; |
| import org.apache.samza.checkpoint.CheckpointId; |
| import org.apache.samza.checkpoint.CheckpointManager; |
| import org.apache.samza.checkpoint.CheckpointV1; |
| import org.apache.samza.checkpoint.CheckpointV2; |
| import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.container.TaskInstanceMetrics; |
| import org.apache.samza.container.TaskName; |
| import org.apache.samza.job.model.TaskMode; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.system.SystemStreamPartition; |
| import org.apache.samza.util.FutureUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Handles the commit of a task's state stores across all configured state backends. |
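| * <p> |
| * Illustrative commit sequence (a sketch of how callers typically drive this class, not verbatim framework |
| * code; {@code inputOffsets} below stands in for the task's input offsets): |
| * <pre>{@code |
| * commitManager.init(); // after ContainerStorageManager#start() has created the stores |
| * CheckpointId checkpointId = CheckpointId.create(); |
| * // synchronous phase: flush and checkpoint the local stores |
| * Map<String, Map<String, String>> snapshotSCMs = commitManager.snapshot(checkpointId); |
| * // asynchronous phase: back up the state, then persist the resulting checkpoint and clean up |
| * commitManager.upload(checkpointId, snapshotSCMs).thenAccept(uploadSCMs -> { |
| *   CheckpointV2 checkpointV2 = new CheckpointV2(checkpointId, inputOffsets, uploadSCMs); |
| *   commitManager.writeCheckpointToStoreDirectories(checkpointV2); |
| *   // the checkpoint is also written to the checkpoint topic before cleanUp in the actual commit path |
| *   commitManager.cleanUp(checkpointId, uploadSCMs); |
| * }); |
| * }</pre> |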
| */ |
| public class TaskStorageCommitManager { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class); |
| |
| private final TaskName taskName; |
| private final CheckpointManager checkpointManager; |
| private final ContainerStorageManager containerStorageManager; |
| private final Map<String, TaskBackupManager> stateBackendToBackupManager; |
| private final Partition taskChangelogPartition; |
| private final StorageManagerUtil storageManagerUtil; |
| private final ExecutorService backupExecutor; |
| private final File durableStoreBaseDir; |
| private final Map<String, SystemStream> storeChangelogs; |
| private final TaskInstanceMetrics metrics; |
| |
| // Available after init(), since stores are created by ContainerStorageManager#start() |
| private Map<String, StorageEngine> storageEngines; |
| |
| public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager, |
| ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition, |
| CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor, |
| StorageManagerUtil storageManagerUtil, File durableStoreBaseDir, TaskInstanceMetrics metrics) { |
| this.taskName = taskName; |
| this.containerStorageManager = containerStorageManager; |
| this.stateBackendToBackupManager = stateBackendToBackupManager; |
| this.taskChangelogPartition = changelogPartition; |
| this.checkpointManager = checkpointManager; |
| this.backupExecutor = backupExecutor; |
| this.durableStoreBaseDir = durableStoreBaseDir; |
| this.storeChangelogs = storeChangelogs; |
| this.storageManagerUtil = storageManagerUtil; |
| this.metrics = metrics; |
| } |
| |
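| /** |
| * Initializes this commit manager. Fetches the task's {@link StorageEngine}s from the |
| * {@link ContainerStorageManager} (which must already be started) and initializes each configured |
| * {@link TaskBackupManager} with the last committed {@link Checkpoint}, or with {@code null} if no |
| * {@link CheckpointManager} is configured. |
| */ |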
| public void init() { |
| // Assumes that the container storage manager has already started and created the stores |
| storageEngines = containerStorageManager.getAllStores(taskName); |
| if (checkpointManager != null) { |
| Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName); |
| LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, checkpoint); |
| stateBackendToBackupManager.values() |
| .forEach(storageBackupManager -> storageBackupManager.init(checkpoint)); |
| } else { |
| stateBackendToBackupManager.values() |
| .forEach(storageBackupManager -> storageBackupManager.init(null)); |
| } |
| } |
| |
| /** |
| * Synchronously captures the current state of the stores in order to persist it to the backup manager |
| * in the async {@link #upload(CheckpointId, Map)} phase. Returns a map of state backend factory name to |
| * a map of store name to state checkpoint markers for all configured state backends and stores. |
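| * <p> |
| * Illustrative shape of the returned map (the factory and store names below are hypothetical, not actual |
| * configuration values): |
| * <pre>{@code |
| * { |
| *   "com.example.ChangelogBackendFactory" : { "store-1" : "changelogSCM-1", "store-2" : "changelogSCM-2" }, |
| *   "com.example.BlobBackendFactory"      : { "store-1" : "blobSCM-1",      "store-2" : "blobSCM-2" } |
| * } |
| * }</pre> |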
| * |
| * @param checkpointId {@link CheckpointId} of the current commit |
| * @return a map of state backend factory name to a map of store name to state checkpoint markers |
| */ |
| public Map<String, Map<String, String>> snapshot(CheckpointId checkpointId) { |
| // Flush all stores |
| storageEngines.values().forEach(StorageEngine::flush); |
| LOG.debug("Flushed all storage engines for taskName: {}, checkpoint id: {}", |
| taskName, checkpointId); |
| |
| // Checkpoint all persisted and durable stores |
| long checkpointStartNs = System.nanoTime(); |
| storageEngines.forEach((storeName, storageEngine) -> { |
| if (storageEngine.getStoreProperties().isPersistedToDisk() && |
| storageEngine.getStoreProperties().isDurableStore()) { |
| storageEngine.checkpoint(checkpointId); |
| } |
| }); |
| long checkpointNs = System.nanoTime() - checkpointStartNs; |
| metrics.storeCheckpointNs().update(checkpointNs); |
| LOG.debug("Checkpointed all storage engines for taskName: {}, checkpoint id: {} in {} ns", |
| taskName, checkpointId, checkpointNs); |
| |
| // state backend factory -> store Name -> state checkpoint marker |
| Map<String, Map<String, String>> stateBackendToStoreSCMs = new HashMap<>(); |
| |
| // for each configured state backend factory, snapshot the state of all stores in this task. |
| stateBackendToBackupManager.forEach((stateBackendFactoryName, backupManager) -> { |
| Map<String, String> snapshotSCMs = backupManager.snapshot(checkpointId); |
| LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state backend: {}. Snapshot SCMs: {}", |
| taskName, checkpointId, stateBackendFactoryName, snapshotSCMs); |
| stateBackendToStoreSCMs.put(stateBackendFactoryName, snapshotSCMs); |
| }); |
| |
| return stateBackendToStoreSCMs; |
| } |
| |
| /** |
| * Asynchronously backs up the local state to the remote storage and returns a future containing the committed |
| * map of state backend factory name to the map of store name to state checkpoint marker. |
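| * <p> |
| * For example (illustrative only; the actual commit path composes this future with the other commit steps |
| * rather than blocking on it): |
| * <pre>{@code |
| * Map<String, Map<String, String>> uploadSCMs = commitManager.upload(checkpointId, snapshotSCMs).join(); |
| * }</pre> |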
| * |
| * @param checkpointId the {@link CheckpointId} associated with this commit |
| * @param snapshotSCMs the map of state backend factory name to the map of store name to state checkpoint |
| *                     marker returned by {@link #snapshot(CheckpointId)} for this commit |
| * @return a future containing a map of state backend factory name to a map of store name to state checkpoint marker |
| */ |
| public CompletableFuture<Map<String, Map<String, String>>> upload( |
| CheckpointId checkpointId, Map<String, Map<String, String>> snapshotSCMs) { |
| // state backend factory -> store Name -> state checkpoint marker |
| Map<String, CompletableFuture<Map<String, String>>> stateBackendToStoreSCMs = new HashMap<>(); |
| |
| // for each configured state backend factory, back up the state for all stores in this task. |
| stateBackendToBackupManager.forEach((stateBackendFactoryName, backupManager) -> { |
| try { |
| Map<String, String> factorySnapshotSCMs = |
| snapshotSCMs.getOrDefault(stateBackendFactoryName, Collections.emptyMap()); |
| LOG.debug("Starting upload for taskName: {}, checkpoint id: {}, state backend snapshot SCM: {}", |
| taskName, checkpointId, factorySnapshotSCMs); |
| |
| CompletableFuture<Map<String, String>> uploadFuture = |
| backupManager.upload(checkpointId, factorySnapshotSCMs); |
| uploadFuture.thenAccept(uploadSCMs -> |
| LOG.debug("Finished upload for taskName: {}, checkpoint id: {}, state backend: {}. Upload SCMs: {}", |
| taskName, checkpointId, stateBackendFactoryName, uploadSCMs)); |
| |
| stateBackendToStoreSCMs.put(stateBackendFactoryName, uploadFuture); |
| } catch (Exception e) { |
| throw new SamzaException( |
| String.format("Error backing up local state for taskName: %s, checkpoint id: %s, state backend: %s", |
| taskName, checkpointId, stateBackendFactoryName), e); |
| } |
| }); |
| |
| return FutureUtil.toFutureOfMap(stateBackendToStoreSCMs); |
| } |
| |
| /** |
| * Writes the {@link Checkpoint} information returned by {@link #upload(CheckpointId, Map)} |
| * in each store directory and store checkpoint directory. Written content depends on the type of {@code checkpoint}. |
| * For {@link CheckpointV2}, writes the entire task {@link CheckpointV2}. |
| * For {@link CheckpointV1}, only writes the changelog ssp offsets in the OFFSET* files. |
| * |
| * Note: The assumption is that this method will be invoked once for each {@link Checkpoint} version that the |
| * task needs to write as determined by {@link org.apache.samza.config.TaskConfig#getCheckpointWriteVersions()}. |
| * This is required for upgrade and rollback compatibility. |
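| * <p> |
| * For example, a task configured to write both checkpoint versions would call this method twice |
| * (illustrative only; {@code checkpointV1} and {@code checkpointV2} are hypothetical variables): |
| * <pre>{@code |
| * commitManager.writeCheckpointToStoreDirectories(checkpointV1); // writes changelog offsets to OFFSET files |
| * commitManager.writeCheckpointToStoreDirectories(checkpointV2); // writes the full CheckpointV2 |
| * }</pre> |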
| * |
| * @param checkpoint the latest checkpoint to be persisted to local file system |
| */ |
| public void writeCheckpointToStoreDirectories(Checkpoint checkpoint) { |
| if (checkpoint instanceof CheckpointV1) { |
| LOG.debug("Writing CheckpointV1 to store and checkpoint directories for taskName: {} with checkpoint: {}", |
| taskName, checkpoint); |
| // Write CheckpointV1 changelog offsets to store and checkpoint directories |
| writeChangelogOffsetFiles(checkpoint.getOffsets()); |
| } else if (checkpoint instanceof CheckpointV2) { |
| LOG.debug("Writing CheckpointV2 to store and checkpoint directories for taskName: {} with checkpoint: {}", |
| taskName, checkpoint); |
| storageEngines.forEach((storeName, storageEngine) -> { |
| // Only write the checkpoint file if the store is durable and persisted to disk |
| if (storageEngine.getStoreProperties().isDurableStore() && |
| storageEngine.getStoreProperties().isPersistedToDisk()) { |
| CheckpointV2 checkpointV2 = (CheckpointV2) checkpoint; |
| |
| try { |
| File storeDir = storageManagerUtil.getTaskStoreDir(durableStoreBaseDir, storeName, taskName, TaskMode.Active); |
| storageManagerUtil.writeCheckpointV2File(storeDir, checkpointV2); |
| |
| CheckpointId checkpointId = checkpointV2.getCheckpointId(); |
| File checkpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId)).toFile(); |
| storageManagerUtil.writeCheckpointV2File(checkpointDir, checkpointV2); |
| } catch (Exception e) { |
| throw new SamzaException( |
| String.format("Write checkpoint file failed for task: %s, storeName: %s, checkpointId: %s", |
| taskName, storeName, ((CheckpointV2) checkpoint).getCheckpointId()), e); |
| } |
| } |
| }); |
| } else { |
| throw new SamzaException("Unsupported checkpoint version: " + checkpoint.getVersion()); |
| } |
| } |
| |
| /** |
| * Performs any post-commit and cleanup actions after the {@link Checkpoint} is successfully written to the |
| * checkpoint topic. Invokes {@link TaskBackupManager#cleanUp(CheckpointId, Map)} on each of the configured task |
| * backup managers. Deletes all local store checkpoint directories older than the {@code latestCheckpointId}. |
| * |
| * @param latestCheckpointId CheckpointId of the most recent successful commit |
| * @param stateCheckpointMarkers map of state backend factory name to map of store name to state checkpoint |
| *                               marker from the latest commit |
| * @return a future that completes when cleanup has finished for all backup managers and all local store |
| *         checkpoint directories older than {@code latestCheckpointId} have been deleted |
| */ |
| public CompletableFuture<Void> cleanUp(CheckpointId latestCheckpointId, |
| Map<String, Map<String, String>> stateCheckpointMarkers) { |
| List<CompletableFuture<Void>> cleanUpFutures = new ArrayList<>(); |
| |
| // Call cleanup on each backup manager |
| stateCheckpointMarkers.forEach((factoryName, storeSCMs) -> { |
| if (stateBackendToBackupManager.containsKey(factoryName)) { |
| LOG.debug("Cleaning up commit for factory: {} for task: {}", factoryName, taskName); |
| TaskBackupManager backupManager = stateBackendToBackupManager.get(factoryName); |
| cleanUpFutures.add(backupManager.cleanUp(latestCheckpointId, storeSCMs)); |
| } else { |
| // This may happen during migration from one state backend to another, where the latest commit contains |
| // a state backend that is no longer supported for the current commit manager |
| LOG.warn("Ignored cleanup for scm: {} due to unknown factory: {} ", storeSCMs, factoryName); |
| } |
| }); |
| |
| return FutureUtil.allOf(cleanUpFutures) |
| .thenAcceptAsync(aVoid -> deleteOldCheckpointDirs(latestCheckpointId), backupExecutor); |
| } |
| |
| private void deleteOldCheckpointDirs(CheckpointId latestCheckpointId) { |
| // Delete directories for checkpoints older than latestCheckpointId |
| if (latestCheckpointId != null) { |
| LOG.debug("Deleting checkpoints older than checkpoint id: {}", latestCheckpointId); |
| File[] files = durableStoreBaseDir.listFiles(); |
| if (files != null) { |
| for (File storeDir : files) { |
| String storeName = storeDir.getName(); |
| String taskStoreName = storageManagerUtil |
| .getTaskStoreDir(durableStoreBaseDir, storeName, taskName, TaskMode.Active).getName(); |
| FileFilter fileFilter = new WildcardFileFilter(taskStoreName + "-*"); |
| File[] checkpointDirs = storeDir.listFiles(fileFilter); |
| if (checkpointDirs != null) { |
| for (File checkpointDir : checkpointDirs) { |
| if (!checkpointDir.getName().contains(latestCheckpointId.serialize())) { |
| try { |
| FileUtils.deleteDirectory(checkpointDir); |
| } catch (IOException e) { |
| throw new SamzaException( |
| String.format("Unable to delete checkpoint directory: %s", checkpointDir.getName()), e); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Closes all the configured state backup managers. |
| */ |
| public void close() { |
| LOG.debug("Stopping backup managers for task {}.", taskName); |
| stateBackendToBackupManager.values().forEach(storageBackupManager -> { |
| if (storageBackupManager != null) { |
| storageBackupManager.close(); |
| } |
| }); |
| } |
| |
| /** |
| * Writes the newest changelog ssp offset for each logged and persistent store to the OFFSET file in the current |
| * store directory (for allowing rollbacks). If the Kafka transactional backup manager is enabled, also writes to |
| * the store checkpoint directory. |
| * |
| * These files are used during container startup to ensure transactional state, and to determine whether |
| * there is any new information in the changelog that is not reflected in the on-disk copy of the store. |
| * If there is any delta, it is replayed from the changelog. E.g. this can happen if the job was run on this host, |
| * then another host, and then back to this host. |
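| * <p> |
| * Schematic on-disk layout after this call (paths are illustrative; the actual directory and file names are |
| * determined by {@link StorageManagerUtil}): |
| * <pre>{@code |
| * <durableStoreBaseDir>/<storeName>/<taskStoreDir>/                OFFSET file(s) with the newest changelog offset |
| * <durableStoreBaseDir>/<storeName>/<taskStoreDir>-<checkpointId>/ OFFSET file(s) in the store checkpoint directory |
| * }</pre> |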
| */ |
| @VisibleForTesting |
| void writeChangelogOffsetFiles(Map<SystemStreamPartition, String> checkpointOffsets) { |
| if (storageEngines == null) { |
| throw new SamzaException(String.format( |
| "Storage engines are not initialized and writeChangelogOffsetFiles not be written for task %s", taskName)); |
| } |
| storeChangelogs.forEach((storeName, systemStream) -> { |
| SystemStreamPartition changelogSSP = new SystemStreamPartition( |
| systemStream.getSystem(), systemStream.getStream(), taskChangelogPartition); |
| |
| // Only write if the store is durable and persisted to disk |
| if (checkpointOffsets.containsKey(changelogSSP) && |
| storageEngines.containsKey(storeName) && |
| storageEngines.get(storeName).getStoreProperties().isDurableStore() && |
| storageEngines.get(storeName).getStoreProperties().isPersistedToDisk()) { |
| LOG.debug("Writing changelog offset for taskName {} store {} changelog {}.", taskName, storeName, systemStream); |
| File currentStoreDir = storageManagerUtil.getTaskStoreDir(durableStoreBaseDir, storeName, taskName, TaskMode.Active); |
| try { |
| KafkaChangelogSSPOffset kafkaChangelogSSPOffset = KafkaChangelogSSPOffset |
| .fromString(checkpointOffsets.get(changelogSSP)); |
| // Write the changelog offset to the file system only if it is non-null |
| String newestOffset = kafkaChangelogSSPOffset.getChangelogOffset(); |
| if (newestOffset != null) { |
| // Write changelog SSP offset to the OFFSET files in the task store directory |
| writeChangelogOffsetFile(storeName, changelogSSP, newestOffset, currentStoreDir); |
| |
| // Write changelog SSP offset to the OFFSET files in the store checkpoint directory |
| File checkpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir( |
| currentStoreDir, kafkaChangelogSSPOffset.getCheckpointId())).toFile(); |
| writeChangelogOffsetFile(storeName, changelogSSP, newestOffset, checkpointDir); |
| } else { |
| // If newestOffset is null, then it means the changelog ssp is (or has become) empty. This could be |
| // either because the changelog topic was newly added, repartitioned, or manually deleted and recreated. |
| // No need to persist the offset file. |
| LOG.debug("Deleting OFFSET file for taskName {} store {} changelog ssp {} since the newestOffset is null.", |
| taskName, storeName, changelogSSP); |
| storageManagerUtil.deleteOffsetFile(currentStoreDir); |
| } |
| } catch (IOException e) { |
| throw new SamzaException( |
| String.format("Error storing offset for taskName %s store %s changelog %s.", taskName, storeName, |
| systemStream), e); |
| } |
| } |
| }); |
| LOG.debug("Done writing OFFSET files for logged persistent key value stores for task {}", taskName); |
| } |
| |
| @VisibleForTesting |
| void writeChangelogOffsetFile(String storeName, SystemStreamPartition ssp, String newestOffset, |
| File writeDirectory) throws IOException { |
| LOG.debug("Storing newest offset {} for taskName {} store {} changelog ssp {} in OFFSET file at path: {}.", |
| newestOffset, taskName, storeName, ssp, writeDirectory); |
| storageManagerUtil.writeOffsetFile(writeDirectory, Collections.singletonMap(ssp, newestOffset), false); |
| } |
| } |