/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.storage;

import java.io.File;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.Config;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.ContainerContextImpl;
import org.apache.samza.context.JobContextImpl;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.StreamUtil;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Recovers the state stores from their changelog streams and writes the restored
 * state to the directory provided by the user. The changelog streams and storage
 * engine factories are derived from the job's config.
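 *
 * <p>A minimal usage sketch (the path below is hypothetical; the actual command
 * line entry point that wires this up may differ):
 * <pre>
 *   Config config = ...;  // the job config
 *   StorageRecovery storageRecovery = new StorageRecovery(config, "/tmp/recovery");
 *   storageRecovery.run();  // stores are restored under /tmp/recovery/state
 * </pre>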
 */
public class StorageRecovery {

  private static final Logger LOG = LoggerFactory.getLogger(StorageRecovery.class);

  private final Config jobConfig;
  private final File storeBaseDir;
  private final SystemAdmins systemAdmins;
  private final Map<String, SystemStream> changeLogSystemStreams = new HashMap<>();
  private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap<>();
  private final Map<String, ContainerStorageManager> containerStorageManagers = new HashMap<>();
  private int maxPartitionNumber = 0;
  private Map<String, ContainerModel> containers = new HashMap<>();

  /**
   * Constructs the StorageRecovery.
   *
   * @param config the job config
   * @param path the base directory path; stores are recovered into its "state" subdirectory
   */
  StorageRecovery(Config config, String path) {
    jobConfig = config;
    storeBaseDir = new File(path, "state");
    systemAdmins = new SystemAdmins(config);
  }

  /**
   * Setup phase that populates the state shared by all tasks: the container models,
   * the changelog streams, the storage engine factories, the changelog partition
   * count, and one ContainerStorageManager per container.
   */
  private void setup() {
    LOG.info("setting up the recovery...");
    getContainerModels();
    getChangeLogSystemStreamsAndStorageFactories();
    getChangeLogMaxPartitionNumber();
    getContainerStorageManagers();
  }

  /**
   * Runs the setup phase and then restores all task state from the changelogs.
   */
  public void run() {
    setup();
    LOG.info("start recovering...");
    systemAdmins.start();
    this.containerStorageManagers.forEach((containerName, containerStorageManager) -> {
      try {
        containerStorageManager.start();
      } catch (InterruptedException e) {
        // We can ignore the exception since it's only used in the context of a command line tool
        // and bubbling the exception upstream isn't needed.
        LOG.warn("Received an interrupt during store restoration for container {}."
            + " Proceeding with the next container", containerName);
      }
    });
    this.containerStorageManagers.forEach((containerName, containerStorageManager) -> containerStorageManager.shutdown());
    systemAdmins.stop();
    LOG.info("successfully recovered in " + storeBaseDir.toString());
  }

  /**
   * Builds the ContainerModels from the job config persisted in the coordinator
   * stream and puts the results in the containers map.
   */
  private void getContainerModels() {
    MetricsRegistryMap metricsRegistryMap = new MetricsRegistryMap();
    CoordinatorStreamStore coordinatorStreamStore = new CoordinatorStreamStore(jobConfig, metricsRegistryMap);
    coordinatorStreamStore.init();
    try {
      Config configFromCoordinatorStream = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
      ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamStore);
      JobModelManager jobModelManager =
          JobModelManager.apply(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
              coordinatorStreamStore, metricsRegistryMap);
      JobModel jobModel = jobModelManager.jobModel();
      containers = jobModel.getContainers();
    } finally {
      coordinatorStreamStore.close();
    }
  }

  /**
   * Gets the changelog streams and the storage engine factories from the config
   * and puts them into the changeLogSystemStreams and storageEngineFactories maps.
   */
  private void getChangeLogSystemStreamsAndStorageFactories() {
    StorageConfig config = new StorageConfig(jobConfig);
    List<String> storeNames = config.getStoreNames();
    LOG.info("Got store names: " + storeNames.toString());

    for (String storeName : storeNames) {
      Optional<String> streamName = config.getChangelogStream(storeName);
      LOG.info("changelog stream name for " + storeName + " is " + streamName.orElse("<none>"));
      streamName.ifPresent(name -> changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(name)));

      Optional<String> factoryClass = config.getStorageFactoryClassName(storeName);
      if (factoryClass.isPresent()) {
        @SuppressWarnings("unchecked")
        StorageEngineFactory<Object, Object> factory =
            (StorageEngineFactory<Object, Object>) ReflectionUtil.getObj(factoryClass.get(), StorageEngineFactory.class);
        storageEngineFactories.put(storeName, factory);
      } else {
        throw new SamzaException("Missing storage factory for " + storeName + ".");
      }
    }
  }

  /**
   * Computes the changelog partition count as the maximum changelog partition id
   * across all tasks, plus one.
   */
  private void getChangeLogMaxPartitionNumber() {
    int maxPartitionId = 0;
    for (ContainerModel containerModel : containers.values()) {
      for (TaskModel taskModel : containerModel.getTasks().values()) {
        maxPartitionId = Math.max(maxPartitionId, taskModel.getChangelogPartition().getPartitionId());
      }
    }
    maxPartitionNumber = maxPartitionId + 1;
  }
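
  /**
   * Builds a serde instance for each serde name registered in the config, using the
   * configured serde factory class or, when none is configured, the predefined serde
   * factory for that name.
   */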
  private Map<String, Serde<Object>> getSerdes() {
    Map<String, Serde<Object>> serdeMap = new HashMap<>();
    SerializerConfig serializerConfig = new SerializerConfig(jobConfig);

    // Adding all serdes from factories
    serializerConfig.getSerdeNames()
        .forEach(serdeName -> {
          String serdeClassName = serializerConfig.getSerdeFactoryClass(serdeName)
              .orElseGet(() -> SerializerConfig.getPredefinedSerdeFactoryName(serdeName));
          @SuppressWarnings("unchecked")
          Serde<Object> serde =
              ReflectionUtil.getObj(serdeClassName, SerdeFactory.class).getSerde(serdeName, serializerConfig);
          serdeMap.put(serdeName, serde);
        });

    return serdeMap;
  }

  /**
   * Creates one ContainerStorageManager per container and adds them all to the
   * containerStorageManagers map.
   */
  @SuppressWarnings("rawtypes")
  private void getContainerStorageManagers() {
    Clock clock = SystemClock.instance();
    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock);
    // don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways
    Map<String, SystemFactory> systemFactories = new SystemConfig(jobConfig).getSystemFactories();
    CheckpointManager checkpointManager = new TaskConfig(jobConfig)
        .getCheckpointManager(new MetricsRegistryMap()).orElse(null);

    for (ContainerModel containerModel : containers.values()) {
      ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap());

      // One changelog SSP per (changelog stream, task changelog partition) pair in this container.
      Set<SystemStreamPartition> changelogSSPs = changeLogSystemStreams.values().stream()
          .flatMap(ss -> containerModel.getTasks().values().stream()
              .map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
          .collect(Collectors.toSet());
      SSPMetadataCache sspMetadataCache = new SSPMetadataCache(systemAdmins, Duration.ofMillis(5000), clock, changelogSSPs);
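
      // The empty maps and the null argument passed below appear to stand in for
      // collaborators (e.g. side inputs and per-task metrics) that this offline tool
      // does not exercise; see the ContainerStorageManager constructor for the exact
      // parameter meanings.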
      ContainerStorageManager containerStorageManager =
          new ContainerStorageManager(
              checkpointManager,
              containerModel,
              streamMetadataCache,
              sspMetadataCache,
              systemAdmins,
              changeLogSystemStreams,
              new HashMap<>(),
              storageEngineFactories,
              systemFactories,
              this.getSerdes(),
              jobConfig,
              new HashMap<>(),
              new SamzaContainerMetrics(containerModel.getId(), new MetricsRegistryMap()),
              JobContextImpl.fromConfigWithDefaults(jobConfig),
              containerContext,
              new HashMap<>(),
              storeBaseDir,
              storeBaseDir,
              maxPartitionNumber,
              null,
              new SystemClock());

      this.containerStorageManagers.put(containerModel.getId(), containerStorageManager);
    }
  }
}