/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage.blobstore;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.config.BlobStoreConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.storage.TaskRestoreManager;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.storage.blobstore.index.SnapshotIndex;
import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
import org.apache.samza.storage.blobstore.util.DirDiffUtil;
import org.apache.samza.util.FileUtil;
import org.apache.samza.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
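/**
* A {@link TaskRestoreManager} that restores task store state from a remote blob store using the
* {@link SnapshotIndex} recorded in the last successful task checkpoint. Stores whose local checkpoint
* directory already matches the remote snapshot are restored by renaming the checkpoint directory instead
* of downloading from the blob store.
*
* Illustrative lifecycle sketch (the actual wiring is performed by the surrounding storage framework;
* the constructor arguments below are placeholders):
* <pre>{@code
* BlobStoreRestoreManager restoreManager = new BlobStoreRestoreManager(taskModel, restoreExecutor,
*     storesToRestore, metrics, config, loggedBaseDir, nonLoggedBaseDir, storageManagerUtil, blobStoreManager);
* restoreManager.init(lastTaskCheckpoint); // resolves previous snapshot indexes and cleans up unused stores
* restoreManager.restore().join();         // restores all eligible stores, downloading from the blob store if needed
* restoreManager.close();
* }</pre>
*/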
public class BlobStoreRestoreManager implements TaskRestoreManager {
private static final Logger LOG = LoggerFactory.getLogger(BlobStoreRestoreManager.class);
// when checking if a checkpoint dir is the same as the remote snapshot, exclude the "OFFSET" family of files
// that are written to the checkpoint dir after the remote upload is complete as part of
// TaskStorageCommitManager#writeCheckpointToStoreDirectories.
private static final Set<String> FILES_TO_IGNORE = ImmutableSet.of(
StorageManagerUtil.OFFSET_FILE_NAME_LEGACY,
StorageManagerUtil.OFFSET_FILE_NAME_NEW,
StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY,
StorageManagerUtil.CHECKPOINT_FILE_NAME);
private final TaskModel taskModel;
private final String jobName;
private final String jobId;
private final ExecutorService executor;
private final Config config;
private final StorageConfig storageConfig;
private final BlobStoreConfig blobStoreConfig;
private final StorageManagerUtil storageManagerUtil;
private final BlobStoreUtil blobStoreUtil;
private final DirDiffUtil dirDiffUtil;
private final File loggedBaseDir;
private final File nonLoggedBaseDir;
private final String taskName;
private final Set<String> storesToRestore;
private final BlobStoreRestoreManagerMetrics metrics;
private BlobStoreManager blobStoreManager;
/**
* Map of store name to a Pair of the SnapshotIndex blob id and the corresponding SnapshotIndex from the last
* successful task checkpoint
*/
private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes;
public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor, Set<String> storesToRestore,
BlobStoreRestoreManagerMetrics metrics, Config config, File loggedBaseDir, File nonLoggedBaseDir,
StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
this.taskModel = taskModel;
this.jobName = new JobConfig(config).getName().get();
this.jobId = new JobConfig(config).getJobId();
this.executor = restoreExecutor;
this.config = config;
this.storageConfig = new StorageConfig(config);
this.blobStoreConfig = new BlobStoreConfig(config);
this.storageManagerUtil = storageManagerUtil;
this.blobStoreManager = blobStoreManager;
this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, metrics);
this.dirDiffUtil = new DirDiffUtil();
this.prevStoreSnapshotIndexes = new HashMap<>();
this.loggedBaseDir = loggedBaseDir;
this.nonLoggedBaseDir = nonLoggedBaseDir;
this.taskName = taskModel.getTaskName().getTaskName();
this.storesToRestore = storesToRestore;
this.metrics = metrics;
}
@Override
public void init(Checkpoint checkpoint) {
long startTime = System.nanoTime();
LOG.debug("Initializing blob store restore manager for task: {}", taskName);
blobStoreManager.init();
// get previous SCMs from checkpoint
prevStoreSnapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, storesToRestore);
metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
LOG.trace("Found previous snapshot index during blob store restore manager init for task: {} to be: {}",
taskName, prevStoreSnapshotIndexes);
metrics.initStoreMetrics(storesToRestore);
// Note: blocks the caller thread.
deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig, blobStoreConfig, prevStoreSnapshotIndexes,
blobStoreUtil, executor);
metrics.initNs.set(System.nanoTime() - startTime);
}
/**
* Restores state from local checkpoint directories and remote state snapshots.
* For every store, the SnapshotIndex of the previous commit is first retrieved from the prevStoreSnapshotIndexes
* map. The local store directory is always deleted, and the store is then restored either from its local checkpoint
* directory or from the remote snapshot. If no local checkpoint directory is found, or if its contents differ from
* the remote snapshot, the checkpoint directory is deleted as well and the store is restored from the remote blob
* store by downloading the state asynchronously and in parallel.
*/
@Override
public CompletableFuture<Void> restore() {
return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir,
storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor);
}
@Override
public void close() {
blobStoreManager.close();
}
@VisibleForTesting
protected BlobStoreUtil createBlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor,
BlobStoreRestoreManagerMetrics metrics) {
return new BlobStoreUtil(blobStoreManager, executor, null, metrics);
}
/**
* Deletes blob store contents for stores that were present in the last checkpoint but are either no longer
* present in job configs (removed by the user since the last deployment) or are no longer configured to be
* backed up or restored using the blob store.
*
* This method blocks until all the necessary store contents and snapshot index blobs have been marked for deletion.
*/
@VisibleForTesting
static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, String taskName, StorageConfig storageConfig,
BlobStoreConfig blobStoreConfig, Map<String, Pair<String, SnapshotIndex>> initialStoreSnapshotIndexes,
BlobStoreUtil blobStoreUtil, ExecutorService executor) {
List<String> storesToBackup =
storageConfig.getStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
List<String> storesToRestore =
storageConfig.getStoresWithRestoreFactory(BlobStoreStateBackendFactory.class.getName());
List<CompletionStage<Void>> storeDeletionFutures = new ArrayList<>();
initialStoreSnapshotIndexes.forEach((storeName, scmAndSnapshotIndex) -> {
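// a store is considered unused if it is no longer configured for either backup or restore via the blob store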
if (!storesToBackup.contains(storeName) && !storesToRestore.contains(storeName)) {
LOG.info("Removing task: {} store: {} from blob store. It is either no longer used, " +
"or is no longer configured to be backed up or restored with blob store.", taskName, storeName);
DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex();
Metadata requestMetadata =
new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(), jobName, jobId, taskName, storeName);
CompletionStage<Void> storeDeletionFuture =
blobStoreUtil.cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal
.thenComposeAsync(v ->
blobStoreUtil.deleteDir(dirIndex, requestMetadata), executor) // deleted files and dirs still present
.thenComposeAsync(v -> blobStoreUtil.deleteSnapshotIndexBlob(
scmAndSnapshotIndex.getLeft(), requestMetadata),
executor); // delete the snapshot index blob
storeDeletionFutures.add(storeDeletionFuture);
}
});
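// block until all unused store contents and snapshot index blobs have been marked for deletion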
FutureUtil.allOf(storeDeletionFutures).join();
}
/**
* Restores all eligible stores in the task.
*/
@VisibleForTesting
static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskName taskName, Set<String> storesToRestore,
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil,
ExecutorService executor) {
long restoreStartTime = System.nanoTime();
List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
LOG.debug("Starting restore for task: {} stores: {}", taskName, storesToRestore);
storesToRestore.forEach(storeName -> {
if (!prevStoreSnapshotIndexes.containsKey(storeName)) {
LOG.info("No checkpointed snapshot index found for task: {} store: {}. Skipping restore.", taskName, storeName);
// TODO HIGH shesharm what should we do with the local state already present on disk, if any?
// E.g. this will be the case if user changes a store from changelog based backup and restore to
// blob store based backup and restore, both at the same time.
return;
}
Pair<String, SnapshotIndex> scmAndSnapshotIndex = prevStoreSnapshotIndexes.get(storeName);
long storeRestoreStartTime = System.nanoTime();
SnapshotIndex snapshotIndex = scmAndSnapshotIndex.getRight();
DirIndex dirIndex = snapshotIndex.getDirIndex();
DirIndex.Stats stats = DirIndex.getStats(dirIndex);
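// record the total files and bytes to restore for this store and initialize the remaining-work counters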
metrics.filesToRestore.getValue().addAndGet(stats.filesPresent);
metrics.bytesToRestore.getValue().addAndGet(stats.bytesPresent);
metrics.filesRemaining.getValue().addAndGet(stats.filesPresent);
metrics.bytesRemaining.getValue().addAndGet(stats.bytesPresent);
CheckpointId checkpointId = snapshotIndex.getSnapshotMetadata().getCheckpointId();
File storeDir = storageManagerUtil.getTaskStoreDir(loggedBaseDir, storeName, taskName, TaskMode.Active);
Path storeCheckpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId));
LOG.trace("Got task: {} store: {} local store directory: {} and local store checkpoint directory: {}",
taskName, storeName, storeDir, storeCheckpointDir);
// we always delete the store dir to preserve transactional state guarantees.
try {
LOG.debug("Deleting local store directory: {}. Will be restored from local store checkpoint directory " +
"or remote snapshot.", storeDir);
FileUtils.deleteDirectory(storeDir);
} catch (IOException e) {
throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
}
boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, dirIndex,
storeCheckpointDir, storageConfig, dirDiffUtil);
if (shouldRestore) { // restore the store from the remote blob store
// delete all store checkpoint directories. if we only delete the store directory and don't
// delete the checkpoint directories, the store size on disk will grow to 2x after restore
// until the first commit is completed and older checkpoint dirs are deleted. This is
// because the hard-linked checkpoint dir files will no longer be de-duped with the
// now-deleted main store directory contents and will take up additional space of their
// own during the restore.
deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil);
metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);
enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime,
restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor);
} else {
LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " +
"to the remote snapshot.", storeCheckpointDir, storeDir);
// atomically rename the checkpoint dir to the store dir
new FileUtil().move(storeCheckpointDir.toFile(), storeDir);
// delete any other checkpoint dirs.
deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil);
}
});
// wait for all restores to finish
return FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> {
LOG.info("Restore completed for task: {} stores", taskName);
metrics.restoreNs.set(System.nanoTime() - restoreStartTime);
});
}
/**
* Determines if the store needs to be restored from remote snapshot based on local and remote state.
*/
@VisibleForTesting
static boolean shouldRestore(String taskName, String storeName, DirIndex dirIndex,
Path storeCheckpointDir, StorageConfig storageConfig, DirDiffUtil dirDiffUtil) {
// if a store checkpoint directory exists for the last successful task checkpoint, try to use it.
boolean restoreStore;
if (Files.exists(storeCheckpointDir)) {
if (storageConfig.cleanLoggedStoreDirsOnStart(storeName)) {
LOG.debug("Restoring task: {} store: {} from remote snapshot since the store is configured to be " +
"restored on each restart.", taskName, storeName);
restoreStore = true;
} else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, false).test(storeCheckpointDir.toFile(), dirIndex)) {
restoreStore = false; // no restore required for this store.
} else {
// we don't optimize for the case when the local host doesn't contain the most recent store checkpoint
// directory but contains an older checkpoint directory which could have partial overlap with the remote
// snapshot. we also don't try to optimize for any edge cases where the most recent checkpoint directory
// contents could be partially different than the remote store (afaik, there is no known valid scenario
// where this could happen right now, except for the offset file handling above).
// it's simpler and fast enough for now to restore the entire store instead.
LOG.error("Local store checkpoint directory: {} contents are not the same as the remote snapshot. " +
"Queuing for restore from remote snapshot.", storeCheckpointDir);
restoreStore = true;
}
} else { // did not find last checkpoint dir, restore the store from the remote blob store
LOG.debug("No local store checkpoint directory found at: {}. " +
"Queuing for restore from remote snapshot.", storeCheckpointDir);
restoreStore = true;
}
return restoreStore;
}
/**
* Starts the restore for the store, enqueuing all restore-completion futures into {@param restoreFutures}.
*/
@VisibleForTesting
static void enqueueRestore(String jobName, String jobId, String taskName, String storeName, File storeDir, DirIndex dirIndex,
long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures, BlobStoreUtil blobStoreUtil,
DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor) {
Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(), Optional.empty(), jobName, jobId, taskName, storeName);
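// restore the store directory from the remote dir index, then verify the restored contents against the
// remote snapshot on the restore executor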
CompletableFuture<Void> restoreFuture =
blobStoreUtil.restoreDir(storeDir, dirIndex, requestMetadata).thenRunAsync(() -> {
metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);
long postRestoreStartTime = System.nanoTime();
LOG.trace("Comparing restored store directory: {} and remote directory to verify restore.", storeDir);
if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir, dirIndex)) {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime);
throw new SamzaException(
String.format("Restored store directory: %s contents are not the same as the remote snapshot.",
storeDir.getAbsolutePath()));
} else {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime);
LOG.info("Restore from remote snapshot completed for store: {}", storeDir);
}
}, executor);
restoreFutures.add(restoreFuture);
}
private static void deleteCheckpointDirs(TaskName taskName, String storeName, File loggedBaseDir, StorageManagerUtil storageManagerUtil) {
try {
List<File> checkpointDirs = storageManagerUtil.getTaskStoreCheckpointDirs(
loggedBaseDir, storeName, taskName, TaskMode.Active);
for (File checkpointDir: checkpointDirs) {
LOG.debug("Deleting local store checkpoint directory: {} before restore.", checkpointDir);
FileUtils.deleteDirectory(checkpointDir);
}
} catch (Exception e) {
throw new SamzaException(
String.format("Error deleting checkpoint directory for task: %s store: %s.",
taskName, storeName), e);
}
}
}