| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.storage; |
| |
| import java.io.File; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import org.apache.samza.Partition; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.StorageConfig; |
| import org.apache.samza.job.model.TaskModel; |
| import org.apache.samza.system.ChangelogSSPIterator; |
| import org.apache.samza.system.StreamMetadataCache; |
| import org.apache.samza.system.StreamSpec; |
| import org.apache.samza.system.SystemAdmin; |
| import org.apache.samza.system.SystemAdmins; |
| import org.apache.samza.system.SystemConsumer; |
| import org.apache.samza.system.SystemStream; |
| import org.apache.samza.system.SystemStreamMetadata; |
| import org.apache.samza.system.SystemStreamPartition; |
| import org.apache.samza.util.Clock; |
| import org.apache.samza.util.FileUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import scala.collection.JavaConverters; |
| |
| |
| /** |
| * This is the legacy state restoration, based on changelog and offset files. |
| * |
| * Restore logic for all stores of a task including directory cleanup, setup, changelogSSP validation, registering |
| * with the respective consumer, restoring stores, and stopping stores. |
| */ |
| class NonTransactionalStateTaskRestoreManager implements TaskRestoreManager { |
| private static final Logger LOG = LoggerFactory.getLogger(NonTransactionalStateTaskRestoreManager.class); |
| |
| private final Map<String, StorageEngine> taskStores; // Map of all StorageEngines for this task indexed by store name |
| private final Set<String> taskStoresToRestore; |
| // Set of store names which need to be restored by consuming using system-consumers (see registerStartingOffsets) |
| |
| private final TaskModel taskModel; |
| private final Clock clock; // Clock value used to validate base-directories for staleness. See isLoggedStoreValid. |
| private Map<SystemStream, String> changeLogOldestOffsets; // Map of changelog oldest known offsets |
| private final Map<SystemStreamPartition, String> fileOffsets; // Map of offsets read from offset file indexed by changelog SSP |
| private final Map<String, SystemStream> changelogSystemStreams; // Map of change log system-streams indexed by store name |
| private final SystemAdmins systemAdmins; |
| private final File loggedStoreBaseDirectory; |
| private final File nonLoggedStoreBaseDirectory; |
| private final StreamMetadataCache streamMetadataCache; |
| private final Map<String, SystemConsumer> storeConsumers; |
| private final int maxChangeLogStreamPartitions; |
| private final StorageConfig storageConfig; |
| private final StorageManagerUtil storageManagerUtil; |
| |
| NonTransactionalStateTaskRestoreManager( |
| TaskModel taskModel, |
| Map<String, SystemStream> changelogSystemStreams, |
| Map<String, StorageEngine> taskStores, |
| SystemAdmins systemAdmins, |
| StreamMetadataCache streamMetadataCache, |
| Map<String, SystemConsumer> storeConsumers, |
| int maxChangeLogStreamPartitions, |
| File loggedStoreBaseDirectory, |
| File nonLoggedStoreBaseDirectory, |
| Config config, |
| Clock clock) { |
| this.taskStores = taskStores; |
| this.taskModel = taskModel; |
| this.clock = clock; |
| this.changelogSystemStreams = changelogSystemStreams; |
| this.systemAdmins = systemAdmins; |
| this.fileOffsets = new HashMap<>(); |
| this.taskStoresToRestore = this.taskStores.entrySet().stream() |
| .filter(x -> x.getValue().getStoreProperties().isLoggedStore()) |
| .map(x -> x.getKey()).collect(Collectors.toSet()); |
| this.loggedStoreBaseDirectory = loggedStoreBaseDirectory; |
| this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory; |
| this.streamMetadataCache = streamMetadataCache; |
| this.storeConsumers = storeConsumers; |
| this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions; |
| this.storageConfig = new StorageConfig(config); |
| this.storageManagerUtil = new StorageManagerUtil(); |
| } |
| |
| /** |
| * Cleans up and sets up store directories, validates changeLog SSPs for all stores of this task, |
| * and registers SSPs with the respective consumers. |
| */ |
| @Override |
| public void init(Map<SystemStreamPartition, String> checkpointedChangelogSSPOffsets) { |
| cleanBaseDirsAndReadOffsetFiles(); |
| setupBaseDirs(); |
| validateChangelogStreams(); |
| getOldestChangeLogOffsets(); |
| registerStartingOffsets(); |
| } |
| |
| /** For each store for this task, |
| * a. Deletes the corresponding non-logged-store base dir. |
| * b. Deletes the logged-store-base-dir (depending on {@param cleanLoggedStoreDirs} value). See {@link #isLoggedStoreValid} for validation semantics. |
| * c. If the logged-store-base-dir is valid, this method reads the offset file and stores each offset. |
| */ |
| private void cleanBaseDirsAndReadOffsetFiles() { |
| LOG.debug("Cleaning base directories for stores."); |
| |
| FileUtil fileUtil = new FileUtil(); |
| taskStores.forEach((storeName, storageEngine) -> { |
| if (!storageEngine.getStoreProperties().isLoggedStore()) { |
| File nonLoggedStorePartitionDir = |
| storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode()); |
| LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString()); |
| |
| if (nonLoggedStorePartitionDir.exists()) { |
| LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString()); |
| fileUtil.rm(nonLoggedStorePartitionDir); |
| } |
| } else { |
| File loggedStorePartitionDir = |
| storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode()); |
| LOG.info("Got logged storage partition directory as " + loggedStorePartitionDir.toPath().toString()); |
| |
| // Delete the logged store if it is not valid. |
| if (!isLoggedStoreValid(storeName, loggedStorePartitionDir) || storageConfig.getCleanLoggedStoreDirsOnStart(storeName)) { |
| LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString()); |
| fileUtil.rm(loggedStorePartitionDir); |
| } else { |
| |
| SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()); |
| Map<SystemStreamPartition, String> offset = |
| storageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP), false); |
| LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir); |
| |
| if (offset.containsKey(changelogSSP)) { |
| fileOffsets.put(changelogSSP, offset.get(changelogSSP)); |
| } |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Directory loggedStoreDir associated with the logged store storeName is determined to be valid |
| * if all of the following conditions are true. |
| * a) If the store has to be persisted to disk. |
| * b) If there is a valid offset file associated with the logged store. |
| * c) If the logged store has not gone stale. |
| * |
| * @return true if the logged store is valid, false otherwise. |
| */ |
| private boolean isLoggedStoreValid(String storeName, File loggedStoreDir) { |
| long changeLogDeleteRetentionInMs = storageConfig.getChangeLogDeleteRetentionInMs(storeName); |
| |
| if (changelogSystemStreams.containsKey(storeName)) { |
| SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()); |
| return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk() |
| && storageManagerUtil.isOffsetFileValid(loggedStoreDir, Collections.singleton(changelogSSP), false) |
| && !storageManagerUtil.isStaleStore(loggedStoreDir, changeLogDeleteRetentionInMs, clock.currentTimeMillis(), false); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Create stores' base directories for logged-stores if they dont exist. |
| */ |
| private void setupBaseDirs() { |
| LOG.debug("Setting up base directories for stores."); |
| taskStores.forEach((storeName, storageEngine) -> { |
| if (storageEngine.getStoreProperties().isLoggedStore()) { |
| |
| File loggedStorePartitionDir = |
| storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode()); |
| |
| LOG.info("Using logged storage partition directory: " + loggedStorePartitionDir.toPath().toString() |
| + " for store: " + storeName); |
| |
| if (!loggedStorePartitionDir.exists()) { |
| loggedStorePartitionDir.mkdirs(); |
| } |
| } else { |
| File nonLoggedStorePartitionDir = |
| storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode()); |
| LOG.info("Using non logged storage partition directory: " + nonLoggedStorePartitionDir.toPath().toString() |
| + " for store: " + storeName); |
| nonLoggedStorePartitionDir.mkdirs(); |
| } |
| }); |
| } |
| |
| /** |
| * Validates each changelog system-stream with its respective SystemAdmin. |
| */ |
| private void validateChangelogStreams() { |
| LOG.info("Validating change log streams: " + changelogSystemStreams); |
| |
| for (SystemStream changelogSystemStream : changelogSystemStreams.values()) { |
| SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(changelogSystemStream.getSystem()); |
| StreamSpec changelogSpec = |
| StreamSpec.createChangeLogStreamSpec(changelogSystemStream.getStream(), changelogSystemStream.getSystem(), |
| maxChangeLogStreamPartitions); |
| |
| systemAdmin.validateStream(changelogSpec); |
| } |
| } |
| |
| /** |
| * Get the oldest offset for each changelog SSP based on the stream's metadata (obtained from streamMetadataCache). |
| */ |
| private void getOldestChangeLogOffsets() { |
| |
| Map<SystemStream, SystemStreamMetadata> changeLogMetadata = JavaConverters.mapAsJavaMapConverter( |
| streamMetadataCache.getStreamMetadata( |
| JavaConverters.asScalaSetConverter(new HashSet<>(changelogSystemStreams.values())).asScala().toSet(), |
| false)).asJava(); |
| |
| LOG.info("Got change log stream metadata: {}", changeLogMetadata); |
| |
| changeLogOldestOffsets = |
| getChangeLogOldestOffsetsForPartition(taskModel.getChangelogPartition(), changeLogMetadata); |
| LOG.info("Assigning oldest change log offsets for taskName {} : {}", taskModel.getTaskName(), |
| changeLogOldestOffsets); |
| } |
| |
| /** |
| * Builds a map from SystemStreamPartition to oldest offset for changelogs. |
| */ |
| private Map<SystemStream, String> getChangeLogOldestOffsetsForPartition(Partition partition, |
| Map<SystemStream, SystemStreamMetadata> inputStreamMetadata) { |
| |
| Map<SystemStream, String> retVal = new HashMap<>(); |
| |
| // NOTE: do not use Collectors.Map because of https://bugs.openjdk.java.net/browse/JDK-8148463 |
| inputStreamMetadata.entrySet() |
| .stream() |
| .filter(x -> x.getValue().getSystemStreamPartitionMetadata().get(partition) != null) |
| .forEach(e -> retVal.put(e.getKey(), |
| e.getValue().getSystemStreamPartitionMetadata().get(partition).getOldestOffset())); |
| |
| return retVal; |
| } |
| |
| /** |
| * Determines the starting offset for each store SSP (based on {@link #getStartingOffset(SystemStreamPartition, SystemAdmin)}) and |
| * registers it with the respective SystemConsumer for starting consumption. |
| */ |
| private void registerStartingOffsets() { |
| |
| for (Map.Entry<String, SystemStream> changelogSystemStreamEntry : changelogSystemStreams.entrySet()) { |
| SystemStreamPartition systemStreamPartition = |
| new SystemStreamPartition(changelogSystemStreamEntry.getValue(), taskModel.getChangelogPartition()); |
| SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(changelogSystemStreamEntry.getValue().getSystem()); |
| SystemConsumer systemConsumer = storeConsumers.get(changelogSystemStreamEntry.getKey()); |
| |
| String offset = getStartingOffset(systemStreamPartition, systemAdmin); |
| |
| if (offset != null) { |
| LOG.info("Registering change log consumer with offset " + offset + " for %" + systemStreamPartition); |
| systemConsumer.register(systemStreamPartition, offset); |
| } else { |
| LOG.info("Skipping change log restoration for {} because stream appears to be empty (offset was null).", |
| systemStreamPartition); |
| taskStoresToRestore.remove(changelogSystemStreamEntry.getKey()); |
| } |
| } |
| } |
| |
| /** |
| * Returns the offset with which the changelog consumer should be initialized for the given SystemStreamPartition. |
| * |
| * If a file offset exists, it represents the last changelog offset which is also reflected in the on-disk state. |
| * In that case, we use the next offset after the file offset, as long as it is newer than the oldest offset |
| * currently available in the stream. |
| * |
| * If there isn't a file offset or it's older than the oldest available offset, we simply start with the oldest. |
| * |
| * @param systemStreamPartition the changelog partition for which the offset is needed. |
| * @param systemAdmin the [[SystemAdmin]] for the changelog. |
| * @return the offset to from which the changelog consumer should be initialized. |
| */ |
| private String getStartingOffset(SystemStreamPartition systemStreamPartition, SystemAdmin systemAdmin) { |
| String fileOffset = fileOffsets.get(systemStreamPartition); |
| |
| // NOTE: changeLogOldestOffsets may contain a null-offset for the given SSP (signifying an empty stream) |
| // therefore, we need to differentiate that from the case where the offset is simply missing |
| if (!changeLogOldestOffsets.containsKey(systemStreamPartition.getSystemStream())) { |
| throw new SamzaException("Missing a change log offset for " + systemStreamPartition); |
| } |
| |
| String oldestOffset = changeLogOldestOffsets.get(systemStreamPartition.getSystemStream()); |
| return storageManagerUtil.getStartingOffset(systemStreamPartition, systemAdmin, fileOffset, oldestOffset); |
| } |
| |
| /** |
| * Restore each store in taskStoresToRestore sequentially |
| */ |
| @Override |
| public void restore() throws InterruptedException { |
| for (String storeName : taskStoresToRestore) { |
| LOG.info("Restoring store: {} for task: {}", storeName, taskModel.getTaskName()); |
| SystemConsumer systemConsumer = storeConsumers.get(storeName); |
| SystemStream systemStream = changelogSystemStreams.get(storeName); |
| SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem()); |
| ChangelogSSPIterator changelogSSPIterator = new ChangelogSSPIterator(systemConsumer, |
| new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), null, systemAdmin, false); |
| |
| taskStores.get(storeName).restore(changelogSSPIterator); |
| } |
| } |
| |
| /** |
| * Stop only persistent stores. In case of certain stores and store mode (such as RocksDB), this |
| * can invoke compaction. |
| */ |
| public void stopPersistentStores() { |
| |
| Map<String, StorageEngine> persistentStores = this.taskStores.entrySet().stream().filter(e -> { |
| return e.getValue().getStoreProperties().isPersistedToDisk(); |
| }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
| |
| persistentStores.forEach((storeName, storageEngine) -> { |
| storageEngine.stop(); |
| this.taskStores.remove(storeName); |
| }); |
| LOG.info("Stopped persistent stores {}", persistentStores); |
| } |
| } |