| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.storage; |
| |
| import com.fasterxml.jackson.core.JsonParseException; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.JsonMappingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.ObjectWriter; |
| import com.google.common.collect.ImmutableMap; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.StandardCopyOption; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Semaphore; |
| import java.util.stream.Collectors; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.checkpoint.CheckpointId; |
| import org.apache.samza.checkpoint.CheckpointV2; |
| import org.apache.samza.clustermanager.StandbyTaskUtil; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.StorageConfig; |
| import org.apache.samza.container.TaskName; |
| import org.apache.samza.job.model.TaskMode; |
| import org.apache.samza.job.model.TaskModel; |
| import org.apache.samza.serializers.CheckpointV2Serde; |
| import org.apache.samza.serializers.model.SamzaObjectMapper; |
| import org.apache.samza.system.SystemAdmin; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.system.SystemStreamPartition; |
| import org.apache.samza.util.Clock; |
| import org.apache.samza.util.FileUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| public class StorageManagerUtil { |
| private static final Logger LOG = LoggerFactory.getLogger(StorageManagerUtil.class); |
| public static final String CHECKPOINT_FILE_NAME = "CHECKPOINT-V2"; |
| public static final String OFFSET_FILE_NAME_NEW = "OFFSET-v2"; |
| public static final String OFFSET_FILE_NAME_LEGACY = "OFFSET"; |
| public static final String SIDE_INPUT_OFFSET_FILE_NAME_LEGACY = "SIDE-INPUT-OFFSETS"; |
| private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper(); |
| private static final TypeReference<Map<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE = |
| new TypeReference<Map<SystemStreamPartition, String>>() { }; |
| private static final ObjectWriter SSP_OFFSET_OBJECT_WRITER = OBJECT_MAPPER.writerFor(OFFSETS_TYPE_REFERENCE); |
| private static final String SST_FILE_SUFFIX = ".sst"; |
| private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde(); |
| |
| /** |
| * Unlike checkpoint or offset files, side input offset file can have multiple writers / readers, |
| * since they are written during SideInputTask commit and copied to store checkpoint directory |
| * by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops). |
| * |
| * We use a (process-wide) semaphore to ensure that such write and copy operations are thread-safe. |
| * |
| * To avoid deadlocks between the two run loops, the semaphore should be acquired and released within |
| * the same method call, and any such methods should not call each other while holding a permit. |
| * We use a Semaphore instead of a ReentrantLock to make such cases easier to detect. |
| */ |
| private static final Semaphore SIDE_INPUT_OFFSET_FILE_SEMAPHORE = new Semaphore(1); |
| |
| /** |
| * Fetch the starting offset for the input {@link SystemStreamPartition} |
| * |
| * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and |
| * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET} configurations. It will use the locally |
| * checkpointed offset if it is valid, or fall back to oldest offset of the stream. |
| * |
| * @param ssp system stream partition for which starting offset is requested |
| * @param admin system admin associated with the ssp |
| * @param fileOffset local file offset for the ssp |
| * @param oldestOffset oldest offset for the ssp from the source |
| * @return starting offset for the incoming {@link SystemStreamPartition} |
| */ |
| public String getStartingOffset( |
| SystemStreamPartition ssp, SystemAdmin admin, String fileOffset, String oldestOffset) { |
| String startingOffset = oldestOffset; |
| if (fileOffset != null) { |
| // File offset was the last message written to the local checkpoint that is also reflected in the store, |
| // so we start with the NEXT offset |
| String resumeOffset = admin.getOffsetsAfter(ImmutableMap.of(ssp, fileOffset)).get(ssp); |
| if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) { |
| startingOffset = resumeOffset; |
| } else { |
| // If the offset we plan to use is older than the oldest offset, just use the oldest offset. |
| // This can happen with source of the store(changelog, etc) configured with a TTL cleanup policy |
| LOG.warn("Local store offset {} is lower than the oldest offset {} of the source stream." |
| + " The values between these offsets cannot be restored.", resumeOffset, oldestOffset); |
| } |
| } |
| LOG.info("Starting offset for SystemStreamPartition {} is {}, fileOffset: {}, oldestOffset from source: {}", ssp, |
| startingOffset, fileOffset, oldestOffset); |
| return startingOffset; |
| } |
| |
| /** |
| * Checks if the store is stale. If the time elapsed since the last modified time of the offset file is greater than |
| * the {@code storeDeleteRetentionInMs}, then the store is considered stale. |
| * |
| * @param storeDir the base directory of the store |
| * @param storeDeleteRetentionInMs store delete retention in millis |
| * @param currentTimeMs current time in ms |
| * @param isSideInput true if store is a side-input store, false if it is a regular store |
| * @return true if the store is stale, false otherwise |
| */ |
| public boolean isStaleStore(File storeDir, long storeDeleteRetentionInMs, long currentTimeMs, boolean isSideInput) { |
| long offsetFileLastModifiedTime; |
| boolean isStaleStore = false; |
| String storePath = storeDir.toPath().toString(); |
| |
| if (storeDir.exists()) { |
| |
| // We check if the new offset-file exists, if so we use its last-modified time, if it doesn't we use the legacy file |
| // depending on if it is a side-input or not, |
| // if neither exists, we use 0L (the default return value of lastModified() when file does not exist |
| File offsetFileRefNew = new File(storeDir, OFFSET_FILE_NAME_NEW); |
| File offsetFileRefLegacy = new File(storeDir, OFFSET_FILE_NAME_LEGACY); |
| File checkpointV2File = new File(storeDir, CHECKPOINT_FILE_NAME); |
| |
| if (checkpointV2File.exists()) { |
| offsetFileLastModifiedTime = checkpointV2File.lastModified(); |
| } else if (offsetFileRefNew.exists()) { |
| offsetFileLastModifiedTime = offsetFileRefNew.lastModified(); |
| } else if (!isSideInput && offsetFileRefLegacy.exists()) { |
| offsetFileLastModifiedTime = offsetFileRefLegacy.lastModified(); |
| } else if (isSideInput) { |
| try { |
| SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly(); |
| File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY); |
| if (sideInputOffsetFileRefLegacy.exists()) { |
| offsetFileLastModifiedTime = sideInputOffsetFileRefLegacy.lastModified(); |
| } else { |
| offsetFileLastModifiedTime = 0L; |
| } |
| } finally { |
| SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release(); |
| } |
| } else { |
| offsetFileLastModifiedTime = 0L; |
| } |
| |
| if ((currentTimeMs - offsetFileLastModifiedTime) >= storeDeleteRetentionInMs) { |
| LOG.info( |
| String.format("Store: %s is stale since lastModifiedTime of offset file: %d, is older than store deleteRetentionMs: %d.", |
| storePath, offsetFileLastModifiedTime, storeDeleteRetentionInMs)); |
| isStaleStore = true; |
| } |
| } else { |
| LOG.info("Storage partition directory: {} does not exist.", storePath); |
| } |
| return isStaleStore; |
| } |
| |
| /** |
| * Directory loggedStoreDir associated with the store storeName is valid if all of the following |
| * conditions are true: |
| * a) If the store is a persistent store. |
| * b) If there is a valid offset file associated with the store. |
| * c) If the store has not gone stale. |
| * |
| * @return true if the logged store is valid, false otherwise. |
| */ |
| public boolean isLoggedStoreValid(String storeName, File loggedStoreDir, Config config, |
| Map<String, SystemStream> storeChangelogs, TaskModel taskModel, Clock clock, Map<String, StorageEngine> taskStores) { |
| long changeLogDeleteRetentionInMs = new StorageConfig(config).getChangeLogDeleteRetentionInMs(storeName); |
| |
| if (storeChangelogs.containsKey(storeName)) { |
| SystemStreamPartition changelogSSP = new SystemStreamPartition( |
| storeChangelogs.get(storeName), taskModel.getChangelogPartition()); |
| |
| return taskStores.get(storeName).getStoreProperties().isPersistedToDisk() |
| && isOffsetFileValid(loggedStoreDir, Collections.singleton(changelogSSP), false) |
| && !isStaleStore(loggedStoreDir, changeLogDeleteRetentionInMs, clock.currentTimeMillis(), false); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * An offset file associated with logged store {@code storeDir} is valid if it exists, is not empty, |
| * and the offsets are for the store's current changelog or side input SSPs. |
| * |
| * @param storeDir the base directory of the store |
| * @param storeSSPs ssps (if any) associated with the store |
| * @param isSideInput true if store is a side-input store, false if it is a regular store |
| * @return true if the offset file is valid. false otherwise. |
| */ |
| public boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) { |
| boolean hasValidOffsetFile = false; |
| if (storeDir.exists()) { |
| Map<SystemStreamPartition, String> offsetContents = readOffsetFile(storeDir, storeSSPs, isSideInput); |
| if (offsetContents == null) { |
| LOG.info("Offset file is invalid since it does not exist. Store directory: {}", storeDir.toPath()); |
| } else if (offsetContents.isEmpty()) { |
| LOG.info("Offset file is invalid since it is empty. Store directory: {}", storeDir.toPath()); |
| } else if (!offsetContents.keySet().equals(storeSSPs)) { |
| LOG.info("Offset file is invalid since changelog or side input SSPs don't match. " |
| + "Store directory: {}. SSPs from offset-file: {} SSPs expected: {} ", |
| storeDir.toPath(), offsetContents.keySet(), storeSSPs); |
| } else { |
| hasValidOffsetFile = true; |
| } |
| } |
| |
| return hasValidOffsetFile; |
| } |
| |
| /** |
| * Write the given SSP-Offset map into the offsets file. |
| * @param storeDir the directory of the store |
| * @param offsets The SSP-offset to write |
| * @param isSideInput true if store is a side-input store, false if it is a regular store |
| * @throws IOException because of deserializing to json |
| */ |
| public void writeOffsetFile(File storeDir, Map<SystemStreamPartition, String> offsets, boolean isSideInput) throws IOException { |
| |
| // First, we write the new-format offset file |
| File offsetFile = new File(storeDir, OFFSET_FILE_NAME_NEW); |
| String fileContents = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(offsets); |
| FileUtil fileUtil = new FileUtil(); |
| fileUtil.writeWithChecksum(offsetFile, fileContents); |
| |
| // Now we write the old format offset file, which are different for store-offset and side-inputs |
| if (isSideInput) { |
| SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly(); |
| try { |
| offsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY); |
| fileContents = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(offsets); |
| fileUtil.writeWithChecksum(offsetFile, fileContents); |
| } finally { |
| SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release(); |
| } |
| } else { |
| offsetFile = new File(storeDir, OFFSET_FILE_NAME_LEGACY); |
| fileUtil.writeWithChecksum(offsetFile, offsets.entrySet().iterator().next().getValue()); |
| } |
| } |
| |
| /** |
| * Writes the checkpoint to the store checkpoint directory based on the checkpointId. |
| * |
| * @param storeDir store or store checkpoint directory to write the checkpoint to |
| * @param checkpoint checkpoint v2 containing the checkpoint Id |
| */ |
| public void writeCheckpointV2File(File storeDir, CheckpointV2 checkpoint) { |
| File offsetFile = new File(storeDir, CHECKPOINT_FILE_NAME); |
| byte[] fileContents = CHECKPOINT_V2_SERDE.toBytes(checkpoint); |
| FileUtil fileUtil = new FileUtil(); |
| fileUtil.writeWithChecksum(offsetFile, new String(fileContents)); |
| } |
| |
| public void copySideInputOffsetFileToCheckpointDir(File storeBaseDir, |
| TaskName taskName, String storeName, TaskMode taskMode, CheckpointId checkpointId) { |
| File storeDir = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode); |
| File checkpointDir = new File(getStoreCheckpointDir(storeDir, checkpointId)); |
| |
| SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly(); |
| try { |
| File storeSideInputOffsetsFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY); |
| File checkpointDirSideInputOffsetsFile = new File(checkpointDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY); |
| if (storeSideInputOffsetsFile.exists()) { |
| FileUtils.copyFile(storeSideInputOffsetsFile, checkpointDirSideInputOffsetsFile); |
| } else { |
| LOG.info("Did not find the file to copy: {} for taskName: {} taskMode: {} storeName: {} checkpointId: {}", |
| SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, taskName, taskMode, storeName, checkpointId); |
| } |
| } catch (IOException e) { |
| String msg = String.format( |
| "Error copying %s file to checkpoint dir for taskName: %s taskMode: %s storeName: %s checkpointId: %s", |
| SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, taskName, taskMode, storeName, checkpointId); |
| LOG.error(msg, e); |
| throw new SamzaException(msg, e); |
| } finally { |
| SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release(); |
| } |
| } |
| |
| /** |
| * Delete the offset file for this store, if one exists. |
| * @param storeDir the directory of the store |
| */ |
| public void deleteOffsetFile(File storeDir) { |
| deleteOffsetFile(storeDir, OFFSET_FILE_NAME_NEW); |
| deleteOffsetFile(storeDir, OFFSET_FILE_NAME_LEGACY); |
| } |
| |
| /** |
| * Delete the given offsetFile for the store if it exists. |
| */ |
| private void deleteOffsetFile(File storeDir, String offsetFileName) { |
| File offsetFile = new File(storeDir, offsetFileName); |
| if (offsetFile.exists()) { |
| new FileUtil().rm(offsetFile); |
| } |
| } |
| |
| /** |
| * Check if a store's disk footprint exists. |
| * |
| * @param storeDir the base directory of the store |
| * @return true if a non-empty storeDir exists, false otherwise |
| */ |
| public boolean storeExists(File storeDir) { |
| return storeDir.exists() && storeDir.list().length > 0; |
| } |
| |
| /** |
| * Read and return the offset from the directory's offset file |
| * |
| * @param storagePartitionDir the base directory of the store |
| * @param storeSSPs SSPs associated with the store (if any) |
| * @param isSideInput, true if the store is a side-input store, false otherwise |
| * @return the content of the offset file if it exists for the store, null otherwise. |
| */ |
| public Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) { |
| |
| File offsetFileRefNew = new File(storagePartitionDir, OFFSET_FILE_NAME_NEW); |
| File offsetFileRefLegacy = new File(storagePartitionDir, OFFSET_FILE_NAME_LEGACY); |
| |
| // First we check if the new offset file exists, if it does we read offsets from it regardless of old or new format, |
| // if it doesn't exist, we check if the store is non-sideInput and legacy-offset file exists, if so we read offsets |
| // from the old non-side-input offset file (regardless of the offset format), |
| // last, we check if the store is a sideInput and the old side-input-offset file exists |
| if (offsetFileRefNew.exists()) { |
| return readOffsetFile(storagePartitionDir, offsetFileRefNew.getName(), storeSSPs); |
| } else if (!isSideInput && offsetFileRefLegacy.exists()) { |
| return readOffsetFile(storagePartitionDir, offsetFileRefLegacy.getName(), storeSSPs); |
| } else if (isSideInput) { |
| Map<SystemStreamPartition, String> result = new HashMap<>(); |
| SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly(); |
| try { |
| File sideInputOffsetFileRefLegacy = new File(storagePartitionDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY); |
| if (sideInputOffsetFileRefLegacy.exists()) { |
| result = readOffsetFile(storagePartitionDir, sideInputOffsetFileRefLegacy.getName(), storeSSPs); |
| } |
| } finally { |
| SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release(); |
| } |
| return result; |
| } else { |
| return new HashMap<>(); |
| } |
| } |
| |
| /** |
| * Read and return the {@link CheckpointV2} from the directory's {@link #CHECKPOINT_FILE_NAME} file. |
| * If the file does not exist, returns null. |
| * |
| * @param storagePartitionDir store directory to read the checkpoint file from |
| * @return the {@link CheckpointV2} object retrieved from the checkpoint file if found, otherwise return null |
| */ |
| // TODO dchen use checkpoint v2 file before migrating off of dual checkpoints |
| public CheckpointV2 readCheckpointV2File(File storagePartitionDir) { |
| File checkpointFile = new File(storagePartitionDir, CHECKPOINT_FILE_NAME); |
| if (checkpointFile.exists()) { |
| String serializedCheckpointV2 = new FileUtil().readWithChecksum(checkpointFile); |
| return new CheckpointV2Serde().fromBytes(serializedCheckpointV2.getBytes()); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Read and return the contents of the offset file. |
| * |
| * @param storagePartitionDir the base directory of the store |
| * @param offsetFileName the name of the offset file |
| * @param storeSSPs SSPs associated with the store (if any) |
| * @return the content of the offset file if it exists for the store, null otherwise. |
| */ |
| private Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, String offsetFileName, Set<SystemStreamPartition> storeSSPs) { |
| Map<SystemStreamPartition, String> offsets = new HashMap<>(); |
| String fileContents = null; |
| File offsetFileRef = new File(storagePartitionDir, offsetFileName); |
| String storePath = storagePartitionDir.getPath(); |
| |
| if (offsetFileRef.exists()) { |
| LOG.debug("Found offset file in storage partition directory: {}", storePath); |
| try { |
| fileContents = new FileUtil().readWithChecksum(offsetFileRef); |
| offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE); |
| } catch (JsonParseException | JsonMappingException e) { |
| LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), offsetFileName); |
| final String finalFileContents = fileContents; |
| offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(ssp -> ssp, offset -> finalFileContents)) : offsets; |
| } catch (Exception e) { |
| LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e); |
| } |
| } else { |
| LOG.info("No offset file found in storage partition directory: {}", storePath); |
| } |
| |
| return offsets; |
| } |
| |
| /** |
| * Creates and returns a File pointing to the directory for the given store and task, given a particular base directory. |
| * In case of a standby task (TaskMode.Standby), the storeDirectory is the same as it would be for an active task. |
| * |
| * @param storeBaseDir the base directory to use |
| * @param storeName the store name to use |
| * @param taskName the task name which is referencing the store |
| * @param taskMode the mode of the given task |
| * @return the partition directory for the store |
| */ |
| public File getTaskStoreDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) { |
| TaskName taskNameForDirName = taskName; |
| if (taskMode.equals(TaskMode.Standby)) { |
| taskNameForDirName = StandbyTaskUtil.getActiveTaskName(taskName); |
| } |
| return new File(storeBaseDir, (storeName + File.separator + taskNameForDirName.toString()).replace(' ', '_')); |
| } |
| |
| public List<File> getTaskStoreCheckpointDirs(File storeBaseDir, String storeName, |
| TaskName taskName, TaskMode taskMode) { |
| try { |
| File storeDir = new File(storeBaseDir, storeName); |
| String taskStoreName = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode).getName(); |
| |
| if (storeDir.exists()) { // new store or no local state |
| List<File> checkpointDirs = Files.list(storeDir.toPath()) |
| .map(Path::toFile) |
| .filter(file -> file.getName().contains(taskStoreName + "-")) |
| .collect(Collectors.toList()); |
| return checkpointDirs; |
| } else { |
| return Collections.emptyList(); |
| } |
| } catch (IOException e) { |
| throw new SamzaException( |
| String.format("Error finding checkpoint dirs for task: %s mode: %s store: %s in dir: %s", |
| taskName, taskMode, storeName, storeBaseDir), e); |
| } |
| } |
| |
| /** |
| * Returns the path for a storage engine to create its checkpoint based on the current checkpoint id. |
| * |
| * @param taskStoreDir directory of the store as returned by {@link #getTaskStoreDir} |
| * @param checkpointId current checkpoint id |
| * @return String denoting the file path of the store with the given checkpoint id |
| */ |
| public String getStoreCheckpointDir(File taskStoreDir, CheckpointId checkpointId) { |
| return taskStoreDir.getPath() + "-" + checkpointId.serialize(); |
| } |
| |
| public void restoreCheckpointFiles(File checkpointDir, File storeDir) { |
| // the current task store dir should already be deleted for restore |
| assert !storeDir.exists(); |
| |
| try { |
| Files.createDirectory(storeDir.toPath()); |
| |
| for (File file : checkpointDir.listFiles()) { |
| String fileName = file.getName(); |
| File targetFile = new File(storeDir, fileName); |
| |
| if (fileName.endsWith(SST_FILE_SUFFIX)) { |
| // hardlink'ing the immutable sst-files. |
| Files.createLink(targetFile.toPath(), file.toPath()); |
| } else { |
| // true copy for all other files. |
| Files.copy(file.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); |
| } |
| } |
| } catch (Exception e) { |
| throw new SamzaException(String.format( |
| "Failed to restore store from checkpoint dir: %s to current dir: %s", checkpointDir, storeDir), e); |
| } |
| } |
| } |