/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
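
/**
 * {@link StateBackendFactory} for Samza's Kafka changelog based state backend. It creates the
 * {@link TaskBackupManager} and {@link TaskRestoreManager} instances that back up and restore store
 * contents via Kafka changelog topics, choosing the transactional or non-transactional variants
 * based on the task configuration.
 */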
public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
private StreamMetadataCache streamCache;
/*
 * This keeps track of the changelog SSPs that are associated with the whole container. This is used so that we can
 * prefetch the metadata for all of the changelog SSPs associated with the container whenever we need the
 * metadata about some of the changelog SSPs.
 * An example use case is when Samza writes offset files for stores ({@link TaskStorageManager}). Each task is
 * responsible for its own offset file, but if we can do prefetching, then most tasks will already have cached
 * metadata by the time they need the offset metadata.
 *
 * Note: By using all changelog streams to build the sspsToPrefetch, any fetches done for persisted stores will
 * include the SSPs for non-persisted stores, so this is slightly suboptimal. However, this does not increase the
 * actual number of calls to the {@link SystemAdmin}, and we can decouple this logic from the per-task objects (e.g.
 * {@link TaskStorageManager}).
 */
private SSPMetadataCache sspCache;
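
/**
 * {@inheritDoc}
 * <p>
 * Builds either a {@link KafkaTransactionalStateTaskBackupManager} or a
 * {@link KafkaNonTransactionalStateTaskBackupManager} for the task, depending on whether
 * transactional state checkpointing is enabled in the task config.
 */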
@Override
public TaskBackupManager getBackupManager(JobContext jobContext,
ContainerModel containerModel,
TaskModel taskModel,
ExecutorService backupExecutor,
MetricsRegistry metricsRegistry,
Config config,
Clock clock,
File loggedStoreBaseDir,
File nonLoggedStoreBaseDir) {
SystemAdmins systemAdmins = new SystemAdmins(config);
StorageConfig storageConfig = new StorageConfig(config);
Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
systemAdmins, taskModel.getChangelogPartition());
} else {
return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
systemAdmins, taskModel.getChangelogPartition());
}
}
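
/**
 * {@inheritDoc}
 * <p>
 * Builds either a {@link TransactionalStateTaskRestoreManager} or a
 * {@link NonTransactionalStateTaskRestoreManager} for the task, excluding the changelog SSPs that
 * belong to standby tasks and sharing the stream and SSP metadata caches across restore managers.
 */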
@Override
public TaskRestoreManager getRestoreManager(JobContext jobContext,
ContainerContext containerContext,
TaskModel taskModel,
ExecutorService restoreExecutor,
MetricsRegistry metricsRegistry,
Set<String> storesToRestore,
Config config,
Clock clock,
File loggedStoreBaseDir,
File nonLoggedStoreBaseDir,
KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
// collect the changelog SSPs for every task in the container so that their metadata can be prefetched together
Set<SystemStreamPartition> changelogSSPs = getChangelogSSPForContainer(storeChangelogs, containerContext);
// filter out standby store-ssp pairs
Map<String, SystemStream> filteredStoreChangelogs =
filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
SystemAdmins systemAdmins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());
if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
return new TransactionalStateTaskRestoreManager(
storesToRestore,
jobContext,
containerContext,
taskModel,
filteredStoreChangelogs,
kafkaChangelogRestoreParams.getInMemoryStores(),
kafkaChangelogRestoreParams.getStorageEngineFactories(),
kafkaChangelogRestoreParams.getSerdes(),
systemAdmins,
kafkaChangelogRestoreParams.getStoreConsumers(),
metricsRegistry,
kafkaChangelogRestoreParams.getCollector(),
getSspCache(systemAdmins, clock, changelogSSPs),
loggedStoreBaseDir,
nonLoggedStoreBaseDir,
config,
clock
);
} else {
return new NonTransactionalStateTaskRestoreManager(
storesToRestore,
jobContext,
containerContext,
taskModel,
filteredStoreChangelogs,
kafkaChangelogRestoreParams.getInMemoryStores(),
kafkaChangelogRestoreParams.getStorageEngineFactories(),
kafkaChangelogRestoreParams.getSerdes(),
systemAdmins,
getStreamCache(systemAdmins, clock),
kafkaChangelogRestoreParams.getStoreConsumers(),
metricsRegistry,
kafkaChangelogRestoreParams.getCollector(),
jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
loggedStoreBaseDir,
nonLoggedStoreBaseDir,
config,
clock
);
}
}

@Override
//TODO HIGH snjain implement this
public StateBackendAdmin getAdmin(JobModel jobModel, Config config) {
return new NoOpKafkaChangelogStateBackendAdmin();
}
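
/**
 * Builds the set of changelog SSPs for every task in the container, i.e. one SSP per
 * (changelog stream, task changelog partition) pair.
 *
 * @param storeChangelogs map of store name to changelog system stream
 * @param containerContext container context holding the container's task models
 * @return the set of changelog SSPs for the container
 */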
public Set<SystemStreamPartition> getChangelogSSPForContainer(Map<String, SystemStream> storeChangelogs,
ContainerContext containerContext) {
return storeChangelogs.values().stream()
.flatMap(ss -> containerContext.getContainerModel().getTasks().values().stream()
.map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
.collect(Collectors.toSet());
}

/**
 * Shared cache across all KafkaRestoreManagers for Kafka stream metadata, created lazily on first use.
 *
 * @param admins system admins used to fetch the stream metadata
 * @param clock clock used for cache invalidation
 * @return StreamMetadataCache containing the stream metadata
 */
@VisibleForTesting
StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
if (streamCache == null) {
// cache stream metadata for 5 seconds (TTL in milliseconds), matching the SSP metadata cache TTL below
streamCache = new StreamMetadataCache(admins, 5000, clock);
}
return streamCache;
}

/**
 * Shared cache across KafkaRestoreManagers for Kafka partition (SSP) metadata, created lazily on first use.
 *
 * @param admins system admins used to fetch the partition metadata
 * @param clock clock used for cache invalidation
 * @param ssps SSPs for which metadata should be prefetched
 * @return SSPMetadataCache containing the partition metadata
 */
private SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
if (sspCache == null) {
sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
}
return sspCache;
}
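
/**
 * Filters the store-to-changelog mapping down to the entries whose changelog SSPs are owned by the
 * container's active tasks, dropping the SSPs that belong to standby tasks.
 *
 * @param changelogSystemStreams map of store name to changelog system stream
 * @param containerModel model of the container whose tasks are inspected
 * @return filtered map of store name to changelog system stream
 */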
@VisibleForTesting
Map<String, SystemStream> filterStandbySystemStreams(Map<String, SystemStream> changelogSystemStreams,
ContainerModel containerModel) {
Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
changelogSystemStreams.forEach((storeName, systemStream) ->
containerModel.getTasks().forEach((taskName, taskModel) ->
changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName))
);
Set<TaskModel> standbyTaskModels = containerModel.getTasks().values().stream()
.filter(taskModel -> taskModel.getTaskMode().equals(TaskMode.Standby))
.collect(Collectors.toSet());
// remove all standby task changelog ssps
standbyTaskModels.forEach((taskModel) -> {
changelogSystemStreams.forEach((storeName, systemStream) -> {
SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
changelogSSPToStore.remove(ssp);
});
});
// the remaining entries correspond only to active tasks (the changelog SSPs of standby tasks were removed above)
return MapUtils.invertMap(changelogSSPToStore).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getSystemStream()));
}
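
/**
 * No-op {@link StateBackendAdmin} for the Kafka changelog state backend: changelog topic creation and
 * validation are handled by {@link ChangelogStreamManager}, so there is nothing to do here.
 */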
public class NoOpKafkaChangelogStateBackendAdmin implements StateBackendAdmin {
@Override
public void createResources() {
// all the changelog creations are handled by {@link ChangelogStreamManager}
}
@Override
public void validateResources() {
// all the changelog validations are handled by {@link ChangelogStreamManager}
}
}
}