blob: c9ee065c04053f3883f71851a06702b44bae9c2e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeManager;
import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ContainerStorageManager is a per-container object that manages
* the restore of per-task partitions.
*
* It is responsible for
* a) performing all container-level actions for restore such as, initializing and shutting down
* taskStorage managers, starting, registering and stopping consumers, etc.
*
* b) performing individual task stores' restores in parallel.
*
* and
* c) restoring sideInputs.
* It provides bootstrap semantics for side inputs -- the main thread is blocked until
* all sideInputSSPs have caught up. Side input store flushes are not in sync with task-commit, although
* they happen at the same frequency.
* If a user explicitly requests a task-commit, that commit will not include committing side inputs.
*/
public class ContainerStorageManager {
  private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
  private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
  private static final int RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS = 60;

  /** Maps containing relevant per-task objects */
  private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
  private final Map<TaskName, TaskInstanceCollector> taskInstanceCollectors;
  private final Map<TaskName, Map<String, StorageEngine>> inMemoryStores; // subset of taskStores after #start()
  private Map<TaskName, Map<String, StorageEngine>> taskStores; // Will be available in #start() after #restoreStores()
  private final Map<String, SystemConsumer> storeConsumers; // Mapping from store name to SystemConsumers
  private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories; // Map of storageEngineFactories indexed by store name
  private final Map<String, SystemStream> changelogSystemStreams;
  private final Map<String, SystemStream> activeTaskChangelogSystemStreams; // Map of changelog system-streams indexed by store name
  private final Map<String, Serde<Object>> serdes; // Map of Serde objects indexed by serde name (specified in config)
  private final SerdeManager serdeManager;
  private final SystemAdmins systemAdmins;
  private final Map<String, SystemFactory> systemFactories;
  private final Clock clock;
  private final Map<String, StateBackendFactory> restoreStateBackendFactories;
  private final Map<String, Set<SystemStream>> sideInputSystemStreams;
  private final StreamMetadataCache streamMetadataCache;
  private final SamzaContainerMetrics samzaContainerMetrics;
  private final CheckpointManager checkpointManager;

  /* Parameters required to re-create taskStores post-restoration */
  private final ContainerModel containerModel;
  private final JobContext jobContext;
  private final ContainerContext containerContext;
  private final File loggedStoreBaseDirectory;
  private final File nonLoggedStoreBaseDirectory;
  private final Set<Path> storeDirectoryPaths; // the set of store directory paths, used by SamzaContainer to initialize its disk-space-monitor

  private final ExecutorService restoreExecutor;
  private final Config config;
  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
  private final Set<String> sideInputStoreNames;
  private SideInputsManager sideInputsManager; // created in start() after restoreStores() for regular stores is complete.
  private boolean isStarted = false;

  public ContainerStorageManager(
      CheckpointManager checkpointManager,
      ContainerModel containerModel,
      StreamMetadataCache streamMetadataCache,
      SystemAdmins systemAdmins,
      Map<String, SystemStream> changelogSystemStreams,
      Map<String, Set<SystemStream>> sideInputSystemStreams,
      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
      Map<String, SystemFactory> systemFactories,
      Map<String, Serde<Object>> serdes,
      Config config,
      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
      SamzaContainerMetrics samzaContainerMetrics,
      JobContext jobContext,
      ContainerContext containerContext,
      Map<String, StateBackendFactory> restoreStateBackendFactories,
      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
      File loggedStoreBaseDirectory,
      File nonLoggedStoreBaseDirectory,
      SerdeManager serdeManager,
      Clock clock) {
    this.checkpointManager = checkpointManager;
    this.containerModel = containerModel;
    this.streamMetadataCache = streamMetadataCache;
    this.systemAdmins = systemAdmins;
    this.changelogSystemStreams = changelogSystemStreams;
    this.sideInputSystemStreams = sideInputSystemStreams;
    this.storageEngineFactories = storageEngineFactories;
    this.systemFactories = systemFactories;
    this.serdes = serdes;
    this.config = config;
    this.taskInstanceMetrics = taskInstanceMetrics;
    this.samzaContainerMetrics = samzaContainerMetrics;
    this.jobContext = jobContext;
    this.containerContext = containerContext;
    this.restoreStateBackendFactories = restoreStateBackendFactories;
    this.taskInstanceCollectors = taskInstanceCollectors;
    this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
    this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
    this.serdeManager = serdeManager;
    this.clock = clock;

    this.sideInputStoreNames =
        ContainerStorageManagerUtil.getSideInputStoreNames(sideInputSystemStreams, changelogSystemStreams, containerModel);
    this.activeTaskChangelogSystemStreams =
        ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(changelogSystemStreams, containerModel);

    LOG.info("Starting with changelogSystemStreams = {} activeTaskChangelogSystemStreams = {} sideInputSystemStreams = {}",
        changelogSystemStreams, activeTaskChangelogSystemStreams, sideInputSystemStreams);

    if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
      // Note the trailing space: the two literals are concatenated into one sentence.
      LOG.warn("Logged and non-logged store base directory are configured to same path: {}. It is recommended to configure "
              + "them separately to ensure clean up of non-logged store data doesn't accidentally impact logged store data.",
          loggedStoreBaseDirectory);
    }

    this.storeDirectoryPaths = new HashSet<>();

    this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
        activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
        storeDirectoryPaths, containerModel, jobContext, containerContext,
        taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

    // Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams'
    // (which is actually activeTaskChangelogSystemStreams) vs the passed in changelogSystemStreams.
    // create a map from storeNames to changelog system consumers (1 per system in activeTaskChangelogSystemStreams)
    this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(
        activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config);

    // Size the pool at 2 threads per (task, restore backend) pair, bounded below by the configured pool size
    // and above by the configured max pool size.
    JobConfig jobConfig = new JobConfig(config);
    int restoreThreadPoolSize =
        Math.min(
            Math.max(containerModel.getTasks().size() * restoreStateBackendFactories.size() * 2,
                jobConfig.getRestoreThreadPoolSize()),
            jobConfig.getRestoreThreadPoolMaxSize()
        );
    this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
  }

  /**
   * Restores all regular (non-side-input) stores, shuts down the restore executor, then creates and bootstraps
   * side input stores and registers them alongside the restored stores. Stores are only accessible through
   * {@link #getStore} / {@link #getAllStores} after this method returns.
   *
   * @throws SamzaException if restoring state for any task fails
   * @throws InterruptedException if interrupted while waiting for store restoration
   */
  public void start() throws SamzaException, InterruptedException {
    // Restores and recreates stores.
    restoreStores();

    // Shutdown restore executor since it will no longer be used
    shutdownRestoreExecutor();

    // create and restore side input stores
    this.sideInputsManager = new SideInputsManager(
        sideInputSystemStreams, systemFactories,
        changelogSystemStreams, activeTaskChangelogSystemStreams,
        storageEngineFactories, storeDirectoryPaths,
        containerModel, jobContext, containerContext,
        samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
        streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);

    // blocks until initial side inputs restore is complete
    sideInputsManager.start();

    // Refactor Note (prateekm): In the previous version, side input stores were created in constructor,
    // and were added to taskStores (which are created in restoreStores()) at the end of restoreStores().
    // In this version, side input stores are created much later during start() (this method),
    // in the SideInputsManager constructor call above, and are added to taskStores here after side input restore
    // (in sideInputsManager.start()) is complete. Completing the side input restore first isn't strictly necessary.
    sideInputsManager.getSideInputStores().forEach((taskName, stores) ->
        taskStores.computeIfAbsent(taskName, ignored -> new HashMap<>()).putAll(stores));

    isStarted = true;
  }

  /**
   * Shuts down the restore executor after restoration has finished. Waits up to
   * {@link #RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS} seconds for the pool to terminate and forces a shutdown
   * if it does not. Failures are logged rather than propagated because restoration has already completed; an
   * interrupt is recorded by restoring the thread's interrupt status.
   */
  private void shutdownRestoreExecutor() {
    try {
      restoreExecutor.shutdown();
      if (!restoreExecutor.awaitTermination(RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
        LOG.warn("Restore executor did not terminate within {} seconds; forcing shutdown.",
            RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS);
        restoreExecutor.shutdownNow();
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting for the restore executor to terminate; forcing shutdown.");
      restoreExecutor.shutdownNow();
      // Preserve the interrupt so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      LOG.error("Error shutting down restore executor", e);
    }
  }

  /**
   * Restores all non-side-input stores (plus side input stores backed by the blob store state backend) in
   * parallel across tasks and restore backends, then re-creates persistent non-side-input stores in read-write
   * mode and populates {@link #taskStores}.
   *
   * @throws SamzaException if restoring state for any task fails
   * @throws InterruptedException if interrupted while waiting for a restore to complete
   */
  private void restoreStores() throws InterruptedException {
    LOG.info("Store Restore started");
    Set<TaskName> activeTasks = ContainerStorageManagerUtil.getTasks(containerModel, TaskMode.Active).keySet();
    // Find all non-side input stores
    Set<String> nonSideInputStoreNames = storageEngineFactories.keySet()
        .stream()
        .filter(storeName -> !sideInputStoreNames.contains(storeName))
        .collect(Collectors.toSet());

    // Loop-invariant values shared by every task below.
    StorageConfig storageConfig = new StorageConfig(config);
    String blobStoreStateBackendFactory = BlobStoreStateBackendFactory.class.getName();

    // Obtain the checkpoints for each task
    Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers = new HashMap<>();
    Map<TaskName, Checkpoint> taskCheckpoints = new HashMap<>();
    containerModel.getTasks().forEach((taskName, taskModel) -> {
      Checkpoint taskCheckpoint = null;
      if (checkpointManager != null && activeTasks.contains(taskName)) {
        // only pass in checkpoints for active tasks
        taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
        LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
      }
      taskCheckpoints.put(taskName, taskCheckpoint);

      Map<String, Set<String>> backendFactoryToStoreNames =
          ContainerStorageManagerUtil.getBackendFactoryStoreNames(
              nonSideInputStoreNames, taskCheckpoint, storageConfig);

      Map<String, Set<String>> backendFactoryToSideInputStoreNames =
          ContainerStorageManagerUtil.getBackendFactoryStoreNames(
              sideInputStoreNames, taskCheckpoint, storageConfig);

      // include side input stores for (initial bulk) restore if backed up using blob store state backend
      Set<String> blobStoreSideInputStoreNames = backendFactoryToSideInputStoreNames.get(blobStoreStateBackendFactory);
      if (blobStoreSideInputStoreNames != null) {
        backendFactoryToStoreNames
            .computeIfAbsent(blobStoreStateBackendFactory, ignored -> new HashSet<>())
            .addAll(blobStoreSideInputStoreNames);
      }

      Map<String, TaskRestoreManager> taskStoreRestoreManagers =
          ContainerStorageManagerUtil.createTaskRestoreManagers(
              taskName, backendFactoryToStoreNames, restoreStateBackendFactories,
              storageEngineFactories, storeConsumers,
              inMemoryStores, systemAdmins, restoreExecutor,
              taskModel, jobContext, containerContext,
              samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes,
              loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
      taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
    });

    // Initialize each TaskStorageManager
    taskRestoreManagers.forEach((taskName, restoreManagers) ->
        restoreManagers.forEach((factoryName, taskRestoreManager) ->
            taskRestoreManager.init(taskCheckpoints.get(taskName))
        )
    );

    // Start each store consumer once.
    // Note: These consumers are per system and only changelog system store consumers will be started.
    // Some TaskRestoreManagers may not require the consumer to be started, but due to the agnostic nature of
    // ContainerStorageManager we always start the changelog consumer here in case it is required
    this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);

    List<Future<Void>> taskRestoreFutures = new ArrayList<>();

    // Submit restore callable for each taskInstance
    taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
      // Submit for each restore factory
      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
        long startTime = System.currentTimeMillis();
        String taskName = taskInstance.getTaskName();
        LOG.info("Starting restore for state for task: {}", taskName);
        CompletableFuture<Void> restoreFuture = taskRestoreManager.restore().handle((res, ex) -> {
          // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
          // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
          try {
            taskRestoreManager.close();
          } catch (Exception e) {
            LOG.error("Error closing restore manager for task: {} after {} restore",
                taskName, ex != null ? "unsuccessful" : "successful", e);
            // ignore exception from close. container may still be able to continue processing/backups
            // if restore manager close fails.
          }

          long timeToRestore = System.currentTimeMillis() - startTime;
          if (samzaContainerMetrics != null) {
            Gauge taskGauge = samzaContainerMetrics.taskStoreRestorationMetrics().get(taskInstance);
            if (taskGauge != null) {
              taskGauge.set(timeToRestore);
            }
          }

          if (ex != null) {
            // log and rethrow exception to communicate restore failure
            String msg = String.format("Error restoring state for task: %s", taskName);
            LOG.error(msg, ex);
            throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
          }
          return null;
        });

        taskRestoreFutures.add(restoreFuture);
      });
    });

    // Loop-over the future list to wait for each restore to finish, catch any exceptions during restore and throw
    // as samza exceptions
    for (Future<Void> future : taskRestoreFutures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        LOG.warn("Received an interrupt during store restoration. Interrupting the restore executor to exit "
            + "prematurely without restoring full state.");
        restoreExecutor.shutdownNow();
        throw e;
      } catch (Exception e) {
        LOG.error("Exception when restoring state.", e);
        throw new SamzaException("Exception when restoring state.", e);
      }
    }

    // Stop each store consumer once
    this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);

    // Now create persistent non-side-input stores in read-write mode, leave non-persistent and side-input stores as-is
    Set<String> inMemoryStoreNames =
        ContainerStorageManagerUtil.getInMemoryStoreNames(this.storageEngineFactories, this.config);
    Set<String> storesToCreate = nonSideInputStoreNames.stream()
        .filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
    this.taskStores = ContainerStorageManagerUtil.createTaskStores(
        storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
        this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
        this.containerModel, this.jobContext, this.containerContext,
        this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
        this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);

    // Add in memory stores
    this.inMemoryStores.forEach((taskName, stores) ->
        taskStores.computeIfAbsent(taskName, ignored -> new HashMap<>()).putAll(stores));

    LOG.info("Store Restore complete");
  }

  /**
   * Get the {@link StorageEngine} instance with a given name for a given task.
   * @param taskName the task name for which the storage engine is desired.
   * @param storeName the desired store's name.
   * @return the task store, or {@link Optional#empty()} if the task or the store is unknown.
   * @throws SamzaException if called before {@link #start()} has completed.
   */
  public Optional<StorageEngine> getStore(TaskName taskName, String storeName) {
    if (!isStarted) {
      throw new SamzaException(String.format(
          "Attempting to access store %s for task %s before ContainerStorageManager is started.",
          storeName, taskName));
    }
    // A task with no stores has no entry in taskStores; treat it like a missing store instead of throwing an NPE.
    return Optional.ofNullable(this.taskStores.get(taskName)).map(stores -> stores.get(storeName));
  }

  /**
   * Get all {@link StorageEngine} instance used by a given task.
   * @param taskName the task name, all stores for which are desired.
   * @return map of stores used by the given task, indexed by storename (may be null if the task has no stores)
   * @throws SamzaException if called before {@link #start()} has completed.
   */
  public Map<String, StorageEngine> getAllStores(TaskName taskName) {
    if (!isStarted) {
      throw new SamzaException(String.format(
          "Attempting to access stores for task %s before ContainerStorageManager is started.", taskName));
    }
    return this.taskStores.get(taskName);
  }

  /**
   * Set of directory paths for all stores restored by this {@link ContainerStorageManager}.
   * @return the set of all store directory paths
   */
  public Set<Path> getStoreDirectoryPaths() {
    return this.storeDirectoryPaths;
  }

  /** Stops every store in {@link #taskStores}, including side input stores. Intended for tests. */
  @VisibleForTesting
  public void stopStores() {
    this.taskStores.forEach((taskName, storeMap) -> storeMap.forEach((storeName, store) -> store.stop()));
  }

  /**
   * Stops all non-side-input stores (persistent and in-memory) for every task in the container model, then shuts
   * down the side inputs manager if it was created. Safe to call even if {@link #start()} never ran or failed
   * part-way through.
   */
  public void shutdown() {
    // stop all non side input stores including persistent and non-persistent stores
    if (taskStores != null) {
      this.containerModel.getTasks().forEach((taskName, taskModel) -> {
        Map<String, StorageEngine> stores = taskStores.get(taskName);
        if (stores == null) {
          return;
        }
        stores.forEach((storeName, store) -> {
          // Side input stores were created by the SideInputsManager, whose shutdown is invoked below.
          if (!sideInputStoreNames.contains(storeName)) {
            store.stop();
          }
        });
      });
    }

    // sideInputsManager is only created in start(); it is null if start() never ran or failed before creating it.
    if (this.sideInputsManager != null) {
      this.sideInputsManager.shutdown();
    }
    LOG.info("Shutdown complete");
  }
}