Merge pull request #1512 from dxichen/checkpoint-restore-precendence-cleanup
Merging miscellaneous incremental fixes related to blob store state backends, checkpoint restore precedence, and async commit.
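
For context, a minimal sketch of how the config keys introduced or renamed in this PR fit together. The store name "my-store" is an illustrative stand-in; the key names and factory class names are taken from StorageConfig and TaskConfig in the diff below:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;

public class StateBackendConfigExample {
  public static void main(String[] args) {
    Config config = new MapConfig(ImmutableMap.of(
        // New job-level defaults (stores.backup.factories / stores.restore.factories)
        "stores.backup.factories", "org.apache.samza.storage.KafkaChangelogStateBackendFactory",
        "stores.restore.factories", "org.apache.samza.storage.KafkaChangelogStateBackendFactory",
        // New per-store override (stores.%s.restore.factories); "my-store" is illustrative
        "stores.my-store.restore.factories",
        "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory",
        // Checkpoint read versions are now an ordered priority list
        "task.checkpoint.read.versions", "2,1"));
    System.out.println(config);
  }
}
```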
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/JsonCheckpointIdMixin.java b/samza-api/src/main/java/org/apache/samza/serializers/JsonCheckpointIdMixin.java
deleted file mode 100644
index 752e329..0000000
--- a/samza-api/src/main/java/org/apache/samza/serializers/JsonCheckpointIdMixin.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers;
-
-import org.apache.samza.checkpoint.CheckpointId;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-
-/**
- * A mix-in Jackson class to convert {@link CheckpointId} to/from JSON.
- */
-public abstract class JsonCheckpointIdMixin {
- @JsonCreator
- private JsonCheckpointIdMixin(
- @JsonProperty("millis") long millis,
- @JsonProperty("nanos") long nanos) {
- }
-
- @JsonProperty("millis")
- abstract long getMillis();
-
- @JsonProperty("nanos")
- abstract long getNanos();
-}
diff --git a/samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java b/samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
index 961a85b..ed4601a 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
@@ -20,7 +20,6 @@
package org.apache.samza.storage;
import java.util.Map;
-import java.util.Set;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
@@ -36,7 +35,6 @@
private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories;
private final Map<String, Serde<Object>> serdes;
private final MessageCollector collector;
- private final Set<String> storeNames;
public KafkaChangelogRestoreParams(
Map<String, SystemConsumer> storeConsumers,
@@ -44,15 +42,13 @@
Map<String, SystemAdmin> systemAdmins,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Map<String, Serde<Object>> serdes,
- MessageCollector collector,
- Set<String> storeNames) {
+ MessageCollector collector) {
this.storeConsumers = storeConsumers;
this.inMemoryStores = inMemoryStores;
this.systemAdmins = systemAdmins;
this.storageEngineFactories = storageEngineFactories;
this.serdes = serdes;
this.collector = collector;
- this.storeNames = storeNames;
}
public Map<String, SystemConsumer> getStoreConsumers() {
@@ -78,8 +74,4 @@
public MessageCollector getCollector() {
return collector;
}
-
- public Set<String> getStoreNames() {
- return storeNames;
- }
}
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
index f51d414..91f3df3 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
@@ -20,6 +20,7 @@
package org.apache.samza.storage;
import java.io.File;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.samza.config.Config;
import org.apache.samza.context.ContainerContext;
@@ -51,6 +52,7 @@
TaskModel taskModel,
ExecutorService restoreExecutor,
MetricsRegistry metricsRegistry,
+ Set<String> storesToRestore,
Config config,
Clock clock,
File loggedStoreBaseDir,
diff --git a/samza-api/src/main/java/org/apache/samza/storage/TaskRestoreManager.java b/samza-api/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
index 999325e..1f5a5ce 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
@@ -47,7 +47,7 @@
void restore() throws InterruptedException;
/**
- * Closes all initiated ressources include storage engines
+ * Closes all initiated resources, including storage engines
*/
void close();
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 7535d65..4367244 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -72,10 +72,12 @@
"org.apache.samza.storage.KafkaChangelogStateBackendFactory";
public static final List<String> DEFAULT_BACKUP_FACTORIES = ImmutableList.of(
KAFKA_STATE_BACKEND_FACTORY);
+ public static final String JOB_BACKUP_FACTORIES = STORE_PREFIX + "backup.factories";
public static final String STORE_BACKUP_FACTORIES = STORE_PREFIX + "%s.backup.factories";
- // TODO BLOCKER dchen make this per store
- public static final String RESTORE_FACTORY_SUFFIX = "restore.factory";
- public static final String STORE_RESTORE_FACTORY = STORE_PREFIX + RESTORE_FACTORY_SUFFIX;
+ public static final String RESTORE_FACTORIES_SUFFIX = "restore.factories";
+ public static final String STORE_RESTORE_FACTORIES = STORE_PREFIX + "%s." + RESTORE_FACTORIES_SUFFIX;
+ public static final String JOB_RESTORE_FACTORIES = STORE_PREFIX + RESTORE_FACTORIES_SUFFIX;
+ public static final List<String> DEFAULT_RESTORE_FACTORIES = ImmutableList.of(KAFKA_STATE_BACKEND_FACTORY);
static final String CHANGELOG_SYSTEM = "job.changelog.system";
static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
@@ -101,8 +103,7 @@
for (String key : subConfig.keySet()) {
if (key.endsWith(SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX)) {
storeNames.add(key.substring(0, key.length() - SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX.length()));
- } else if (key.endsWith(FACTORY_SUFFIX) && !key.equals(RESTORE_FACTORY_SUFFIX)) {
- // TODO HIGH dchen STORE_RESTORE_FACTORY added here to be ignored. Update/remove after changing restore factory -> factories
+ } else if (key.endsWith(FACTORY_SUFFIX)) {
storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length()));
}
}
@@ -290,42 +291,103 @@
.count();
}
- public List<String> getStoreBackupFactory(String storeName) {
- List<String> storeBackupManagers = getList(String.format(STORE_BACKUP_FACTORIES, storeName), new ArrayList<>());
- // For backwards compatibility if the changelog is enabled, we use default kafka backup factory
- if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
- storeBackupManagers = DEFAULT_BACKUP_FACTORIES;
+ private List<String> getJobStoreBackupFactories() {
+ return getList(JOB_BACKUP_FACTORIES, new ArrayList<>());
+ }
+
+ /**
+   * Backup state backend factories are resolved with the following precedence (see the example after this method):
+   *
+   * 1. If the stores.store-name.backup.factories config key exists for the store-name, that value is used
+   * 2. Otherwise, if stores.backup.factories is set for the job, that value is used
+   * 3. Otherwise, if stores.store-name.changelog is set for the store-name, the default Kafka changelog state backend factory is used
+   * 4. Otherwise, no backup factories are configured for the store
+   *
+   * Note: 2 taking precedence over 3 enables job-level migration off of changelog-based backups
+ * @return List of backup factories for the store in order of backup precedence
+ */
+ public List<String> getStoreBackupFactories(String storeName) {
+ List<String> storeBackupManagers;
+ if (containsKey(String.format(STORE_BACKUP_FACTORIES, storeName))) {
+ storeBackupManagers = getList(String.format(STORE_BACKUP_FACTORIES, storeName), new ArrayList<>());
+ } else {
+ storeBackupManagers = getJobStoreBackupFactories();
+      // For backwards compatibility, if the changelog is enabled, we use the default Kafka backup factory
+ if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+ storeBackupManagers = DEFAULT_BACKUP_FACTORIES;
+ }
}
return storeBackupManagers;
}
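
A hedged sketch of the backup precedence documented above, exercising the new getStoreBackupFactories; the store names and the com.example factory classes are illustrative stand-ins, not real factories:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;

public class BackupFactoryPrecedenceExample {
  public static void main(String[] args) {
    // 1. The per-store key wins over the job-level key.
    StorageConfig perStore = new StorageConfig(new MapConfig(ImmutableMap.of(
        "stores.store-a.backup.factories", "com.example.BlobBackupFactory",
        "stores.backup.factories", "com.example.JobWideBackupFactory")));
    System.out.println(perStore.getStoreBackupFactories("store-a")); // [com.example.BlobBackupFactory]

    // 3. With no backup keys set, a configured changelog falls back to
    //    the default Kafka changelog state backend factory.
    StorageConfig changelogOnly = new StorageConfig(new MapConfig(ImmutableMap.of(
        "stores.store-b.changelog", "kafka.store-b-changelog")));
    System.out.println(changelogOnly.getStoreBackupFactories("store-b"));
    // [org.apache.samza.storage.KafkaChangelogStateBackendFactory]

    // 4. Otherwise the store gets no backup factories.
    StorageConfig none = new StorageConfig(new MapConfig(ImmutableMap.of()));
    System.out.println(none.getStoreBackupFactories("store-c")); // []
  }
}
```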
public Set<String> getBackupFactories() {
return getStoreNames().stream()
- .flatMap((storeName) -> getStoreBackupFactory(storeName).stream())
+ .flatMap((storeName) -> getStoreBackupFactories(storeName).stream())
.collect(Collectors.toSet());
}
- public String getRestoreFactory() {
- return get(STORE_RESTORE_FACTORY, KAFKA_STATE_BACKEND_FACTORY);
- }
-
public List<String> getStoresWithBackupFactory(String backendFactoryName) {
return getStoreNames().stream()
- .filter((storeName) -> getStoreBackupFactory(storeName)
+ .filter((storeName) -> getStoreBackupFactories(storeName)
.contains(backendFactoryName))
.collect(Collectors.toList());
}
- // TODO BLOCKER dchen update when making restore managers per store
+ public List<String> getPersistentStoresWithBackupFactory(String backendFactoryName) {
+ return getStoreNames().stream()
+ .filter(storeName -> {
+ Optional<String> storeFactory = getStorageFactoryClassName(storeName);
+ return storeFactory.isPresent() &&
+ !storeFactory.get().equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY);
+ })
+ .filter((storeName) -> getStoreBackupFactories(storeName)
+ .contains(backendFactoryName))
+ .collect(Collectors.toList());
+ }
+
+ private List<String> getJobStoreRestoreFactories() {
+ return getList(JOB_RESTORE_FACTORIES, new ArrayList<>());
+ }
+
+ /**
+   * Restore state backend factories are resolved with the following precedence (see the example after this method):
+   *
+   * 1. If the stores.store-name.restore.factories config key exists for the store-name, that value is used
+   * 2. Otherwise, if stores.restore.factories is set for the job, that value is used
+   * 3. Otherwise, if stores.store-name.changelog is set for the store-name, the default Kafka changelog state backend factory is used
+   * 4. Otherwise, no restore factories are configured for the store
+   *
+   * Note: 2 taking precedence over 3 enables job-level migration off of changelog-based restores
+ * @return List of restore factories for the store in order of restoration precedence
+ */
+ public List<String> getStoreRestoreFactories(String storeName) {
+ List<String> storeRestoreManagers;
+ if (containsKey(String.format(STORE_RESTORE_FACTORIES, storeName))) {
+ storeRestoreManagers = getList(String.format(STORE_RESTORE_FACTORIES, storeName), new ArrayList<>());
+ } else {
+ storeRestoreManagers = getJobStoreRestoreFactories();
+      // For backwards compatibility, if the changelog is enabled, we use the default Kafka restore factory
+ if (storeRestoreManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+ storeRestoreManagers = DEFAULT_RESTORE_FACTORIES;
+ }
+ }
+ return storeRestoreManagers;
+ }
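
The restore counterpart of the earlier example, highlighting the note on rule 2 vs rule 3: a job-level stores.restore.factories entry overrides the changelog fallback, which is what enables migrating a whole job off changelog restores. Names are again illustrative:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;

public class RestoreFactoryPrecedenceExample {
  public static void main(String[] args) {
    StorageConfig config = new StorageConfig(new MapConfig(ImmutableMap.of(
        // store-a has a changelog configured (rule 3) ...
        "stores.store-a.changelog", "kafka.store-a-changelog",
        // ... but the job-level restore factories (rule 2) take precedence
        "stores.restore.factories", "com.example.BlobRestoreFactory")));
    System.out.println(config.getStoreRestoreFactories("store-a")); // [com.example.BlobRestoreFactory]
  }
}
```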
+
+ public Set<String> getRestoreFactories() {
+ return getStoreNames().stream()
+        .flatMap((storeName) -> getStoreRestoreFactories(storeName).stream())
+ .collect(Collectors.toSet());
+ }
+
public List<String> getStoresWithRestoreFactory(String backendFactoryName) {
return getStoreNames().stream()
- .filter((storeName) -> getRestoreFactory().equals(backendFactoryName))
+ .filter((storeName) -> getStoreRestoreFactories(storeName).contains(backendFactoryName))
.collect(Collectors.toList());
}
/**
* Helper method to get if logged store dirs should be deleted regardless of their contents.
- * @return
*/
public boolean cleanLoggedStoreDirsOnStart(String storeName) {
return getBoolean(String.format(CLEAN_LOGGED_STOREDIRS_ON_START, storeName), false);
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 8db1d2a..06a8727 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -63,7 +63,7 @@
// COMMIT_MAX_DELAY_MS have passed since the pending commit start. if the pending commit
// does not complete within this timeout, the container will shut down.
public static final String COMMIT_TIMEOUT_MS = "task.commit.timeout.ms";
- static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(1).toMillis();
+ static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();
// how long to wait for a clean shutdown
public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
@@ -125,8 +125,8 @@
public static final List<String> DEFAULT_CHECKPOINT_WRITE_VERSIONS = ImmutableList.of("1", "2");
// checkpoint version to read during container startup
- public static final String CHECKPOINT_READ_VERSION = "task.checkpoint.read.version";
- public static final short DEFAULT_CHECKPOINT_READ_VERSION = 1;
+ public static final String CHECKPOINT_READ_VERSIONS = "task.checkpoint.read.versions";
+ public static final List<String> DEFAULT_CHECKPOINT_READ_VERSIONS = ImmutableList.of("1");
public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
@@ -348,8 +348,16 @@
.stream().map(Short::valueOf).collect(Collectors.toList());
}
- public short getCheckpointReadVersion() {
- return getShort(CHECKPOINT_READ_VERSION, DEFAULT_CHECKPOINT_READ_VERSION);
+ public List<Short> getCheckpointReadVersions() {
+ List<Short> checkpointReadPriorityList = getList(CHECKPOINT_READ_VERSIONS, DEFAULT_CHECKPOINT_READ_VERSIONS)
+ .stream().map(Short::valueOf).collect(Collectors.toList());
+ if (checkpointReadPriorityList.isEmpty()) {
+      // the user explicitly defined an empty checkpoint read versions list
+      throw new IllegalArgumentException("No checkpoint read versions defined for the job. "
+          + "Please remove the task.checkpoint.read.versions config or define valid checkpoint versions");
+ } else {
+ return checkpointReadPriorityList;
+ }
}
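
A small sketch of the new ordered read-version list (assuming Samza's comma-separated getList parsing): prefer checkpoint v2 at startup and fall back to v1.

```java
import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;

public class CheckpointReadVersionsExample {
  public static void main(String[] args) {
    TaskConfig taskConfig = new TaskConfig(new MapConfig(ImmutableMap.of(
        "task.checkpoint.read.versions", "2,1")));
    // Ordered priority list: try v2 checkpoints first, then v1.
    System.out.println(taskConfig.getCheckpointReadVersions()); // [2, 1]
    // An explicitly empty list makes getCheckpointReadVersions() throw
    // IllegalArgumentException, per the guard above.
  }
}
```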
public boolean getTransactionalStateCheckpointEnabled() {
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index c172288..083b483 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -244,7 +244,7 @@
* The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
* {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace
* to a "fan out" namespace.
- * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
+   * This method is neither atomic nor thread-safe. The intent is for the Samza Processor's coordinator to use this
* method to assign the Startpoints to the appropriate tasks.
* @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to.
* @return The set of active {@link TaskName}s that were fanned out to.
diff --git a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
index be419b6..8433996 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
@@ -91,6 +91,7 @@
TaskModel taskModel,
ExecutorService restoreExecutor,
MetricsRegistry metricsRegistry,
+ Set<String> storesToRestore,
Config config,
Clock clock,
File loggedStoreBaseDir,
@@ -108,7 +109,7 @@
if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
return new TransactionalStateTaskRestoreManager(
- kafkaChangelogRestoreParams.getStoreNames(),
+ storesToRestore,
jobContext,
containerContext,
taskModel,
@@ -128,7 +129,7 @@
);
} else {
return new NonTransactionalStateTaskRestoreManager(
- kafkaChangelogRestoreParams.getStoreNames(),
+ storesToRestore,
jobContext,
containerContext,
taskModel,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index 35fa375..33af367 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -326,7 +326,6 @@
return storageManagerUtil.getStartingOffset(systemStreamPartition, systemAdmin, fileOffset, oldestOffset);
}
- // TODO dchen put this in common code path for transactional and non-transactional
private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, JobContext jobContext,
ContainerContext containerContext, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Map<String, Serde<Object>> serdes, MetricsRegistry metricsRegistry,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 057b248..badeb28 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -113,7 +113,6 @@
* @param isSideInput true if store is a side-input store, false if it is a regular store
* @return true if the store is stale, false otherwise
*/
- // TODO BLOCKER dchen do these methods need to be updated to also read the new checkpoint file?
public boolean isStaleStore(File storeDir, long storeDeleteRetentionInMs, long currentTimeMs, boolean isSideInput) {
long offsetFileLastModifiedTime;
boolean isStaleStore = false;
@@ -127,8 +126,11 @@
File offsetFileRefNew = new File(storeDir, OFFSET_FILE_NAME_NEW);
File offsetFileRefLegacy = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+ File checkpointV2File = new File(storeDir, CHECKPOINT_FILE_NAME);
- if (offsetFileRefNew.exists()) {
+ if (checkpointV2File.exists()) {
+ offsetFileLastModifiedTime = checkpointV2File.lastModified();
+ } else if (offsetFileRefNew.exists()) {
offsetFileLastModifiedTime = offsetFileRefNew.lastModified();
} else if (!isSideInput && offsetFileRefLegacy.exists()) {
offsetFileLastModifiedTime = offsetFileRefLegacy.lastModified();
@@ -184,7 +186,6 @@
* @param isSideInput true if store is a side-input store, false if it is a regular store
* @return true if the offset file is valid. false otherwise.
*/
- // TODO BLOCKER dchen do these methods need to be updated to also read the new checkpoint file?
public boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) {
boolean hasValidOffsetFile = false;
if (storeDir.exists()) {
@@ -305,11 +306,11 @@
/**
* Read and return the {@link CheckpointV2} from the directory's {@link #CHECKPOINT_FILE_NAME} file.
* If the file does not exist, returns null.
- * // TODO HIGH dchen add tests at all call sites for handling null value.
*
* @param storagePartitionDir store directory to read the checkpoint file from
* @return the {@link CheckpointV2} object retrieved from the checkpoint file if found, otherwise return null
*/
+ // TODO dchen use checkpoint v2 file before migrating off of dual checkpoints
public CheckpointV2 readCheckpointV2File(File storagePartitionDir) {
File checkpointFile = new File(storagePartitionDir, CHECKPOINT_FILE_NAME);
if (checkpointFile.exists()) {
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 8091e48..6b657d5 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.Config;
@@ -215,7 +217,10 @@
*/
@SuppressWarnings("rawtypes")
private void getContainerStorageManagers() {
- String factoryClass = new StorageConfig(jobConfig).getRestoreFactory();
+ Set<String> factoryClasses = new StorageConfig(jobConfig).getRestoreFactories();
+ Map<String, StateBackendFactory> stateBackendFactories = factoryClasses.stream().collect(
+ Collectors.toMap(factoryClass -> factoryClass,
+ factoryClass -> ReflectionUtil.getObj(factoryClass, StateBackendFactory.class)));
Clock clock = SystemClock.instance();
StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock);
// don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways
@@ -242,7 +247,7 @@
new SamzaContainerMetrics(containerModel.getId(), new MetricsRegistryMap(), ""),
JobContextImpl.fromConfigWithDefaults(jobConfig, jobModel),
containerContext,
- ReflectionUtil.getObj(factoryClass, StateBackendFactory.class),
+ stateBackendFactories,
new HashMap<>(),
storeBaseDir,
storeBaseDir,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 20f653c..f33bb5b 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -611,7 +611,7 @@
} else if (checkpoint instanceof CheckpointV1) {
// If the checkpoint v1 is used, we need to fetch the changelog SSPs in the inputOffsets in order to get the
// store offset.
- Map<SystemStreamPartition, String> checkpointedOffsets = ((CheckpointV1) checkpoint).getOffsets();
+ Map<SystemStreamPartition, String> checkpointedOffsets = checkpoint.getOffsets();
storeChangelogs.forEach((storeName, systemStream) -> {
Partition changelogPartition = taskModel.getChangelogPartition();
SystemStreamPartition storeChangelogSSP = new SystemStreamPartition(systemStream, changelogPartition);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 15cc87b..fb04ce5 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -114,7 +114,7 @@
this.storageManagerUtil = storageManagerUtil;
StorageConfig storageConfig = new StorageConfig(config);
this.storesToBackup =
- storageConfig.getStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
+ storageConfig.getPersistentStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
this.loggedStoreBaseDir = loggedStoreBaseDir;
this.blobStoreManager = blobStoreManager;
this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, blobStoreTaskBackupMetrics);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
index 052e49d..64bc74e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
@@ -84,7 +84,7 @@
private final File loggedBaseDir;
private final File nonLoggedBaseDir;
private final String taskName;
- private final List<String> storesToRestore;
+ private final Set<String> storesToRestore;
private final BlobStoreRestoreManagerMetrics metrics;
private BlobStoreManager blobStoreManager;
@@ -95,13 +95,13 @@
*/
private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes;
- public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor,
+ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor, Set<String> storesToRestore,
BlobStoreRestoreManagerMetrics metrics, Config config, File loggedBaseDir, File nonLoggedBaseDir,
StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
this.taskModel = taskModel;
this.jobName = new JobConfig(config).getName().get();
this.jobId = new JobConfig(config).getJobId();
- this.executor = restoreExecutor; // TODO BLOCKER dchen1 dont block on restore executor
+ this.executor = restoreExecutor;
this.config = config;
this.storageConfig = new StorageConfig(config);
this.blobStoreConfig = new BlobStoreConfig(config);
@@ -113,9 +113,7 @@
this.loggedBaseDir = loggedBaseDir;
this.nonLoggedBaseDir = nonLoggedBaseDir;
this.taskName = taskModel.getTaskName().getTaskName();
- StorageConfig storageConfig = new StorageConfig(config);
- this.storesToRestore =
- storageConfig.getStoresWithRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+ this.storesToRestore = storesToRestore;
this.metrics = metrics;
}
@@ -168,7 +166,7 @@
/**
* Deletes blob store contents for stores that were present in the last checkpoint but are either no longer
- * present in job configs (removed by user since last deploymetn) or are no longer configured to be backed
+ * present in job configs (removed by user since last deployment) or are no longer configured to be backed
* up using blob stores.
*
* This method blocks until all the necessary store contents and snapshot index blobs have been marked for deletion.
@@ -209,7 +207,7 @@
* Restores all eligible stores in the task.
*/
@VisibleForTesting
- static void restoreStores(String jobName, String jobId, TaskName taskName, List<String> storesToRestore,
+ static void restoreStores(String jobName, String jobId, TaskName taskName, Set<String> storesToRestore,
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil,
@@ -284,7 +282,7 @@
FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> {
LOG.info("Restore completed for task: {} stores", taskName);
metrics.restoreNs.set(System.nanoTime() - restoreStartTime);
- }).join(); // TODO BLOCKER dchen1 make non-blocking.
+ }).join(); // TODO dchen make non-blocking for the restore executor
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
index 23482c7..e2512b4 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
@@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import java.io.File;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.BlobStoreConfig;
@@ -73,6 +74,7 @@
TaskModel taskModel,
ExecutorService restoreExecutor,
MetricsRegistry metricsRegistry,
+ Set<String> storesToRestore,
Config config,
Clock clock,
File loggedStoreBaseDir,
@@ -84,7 +86,7 @@
BlobStoreManagerFactory factory = ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class);
BlobStoreManager blobStoreManager = factory.getRestoreBlobStoreManager(config, restoreExecutor);
BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(metricsRegistry);
- return new BlobStoreRestoreManager(taskModel, restoreExecutor, metrics, config, loggedStoreBaseDir,
+ return new BlobStoreRestoreManager(taskModel, restoreExecutor, storesToRestore, metrics, config, loggedStoreBaseDir,
nonLoggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager);
}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/serde/SnapshotIndexSerde.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/serde/SnapshotIndexSerde.java
index 2e0e50c..9e316ad 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/serde/SnapshotIndexSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/serde/SnapshotIndexSerde.java
@@ -24,9 +24,8 @@
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.apache.samza.SamzaException;
-import org.apache.samza.checkpoint.CheckpointId;
-import org.apache.samza.serializers.JsonCheckpointIdMixin;
import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.storage.blobstore.index.FileBlob;
import org.apache.samza.storage.blobstore.index.FileIndex;
@@ -37,7 +36,7 @@
public class SnapshotIndexSerde implements Serde<SnapshotIndex> {
- private final static ObjectMapper MAPPER = new ObjectMapper();
+ private final static ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper();
private TypeReference<SnapshotIndex> typeReference;
private final ObjectWriter objectWriter;
@@ -48,8 +47,7 @@
.addMixIn(DirIndex.class, JsonDirIndexMixin.class)
.addMixIn(FileIndex.class, JsonFileIndexMixin.class)
.addMixIn(FileMetadata.class, JsonFileMetadataMixin.class)
- .addMixIn(FileBlob.class, JsonFileBlobMixin.class)
- .addMixIn(CheckpointId.class, JsonCheckpointIdMixin.class);
+ .addMixIn(FileBlob.class, JsonFileBlobMixin.class);
this.typeReference = new TypeReference<SnapshotIndex>() { };
this.objectWriter = MAPPER.writerFor(typeReference);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
index 9c8b61f..b312b36 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
@@ -456,7 +456,7 @@
* @return A future containing the {@link FileIndex} for the uploaded file.
*/
@VisibleForTesting
- CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) {
+ public CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) {
if (file == null || !file.isFile()) {
String message = file != null ? "Dir or Symbolic link" : "null";
throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message));
@@ -483,7 +483,7 @@
fileBlobFuture = blobStoreManager.put(inputStream, metadata)
.thenApplyAsync(id -> {
- LOG.trace("Put complete. Closing input stream for file: {}.", file.getPath());
+ LOG.trace("Put complete. Received Blob ID {}. Closing input stream for file: {}.", id, file.getPath());
try {
finalInputStream.close();
} catch (Exception e) {
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 92b4e59..1f98b22 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -185,22 +185,38 @@
taskNames.foreach(checkpointManager.register)
checkpointManager.start()
- // TODO dchen make add support for checkpointv2
+    // Get the preferred checkpoint read version for the tool
+ val checkpointReadVersion = taskConfig.getCheckpointReadVersions.get(0)
+ val defaultCheckpoint = if (checkpointReadVersion == 1) {
+ new CheckpointV1(new java.util.HashMap[SystemStreamPartition, String]())
+ } else if (checkpointReadVersion == 2) {
+ new CheckpointV2(CheckpointId.create(), new java.util.HashMap[SystemStreamPartition, String](),
+ new java.util.HashMap[String, util.Map[String, String]]())
+ } else {
+ throw new SamzaException("Unrecognized checkpoint read version: " + checkpointReadVersion)
+ }
+
val lastCheckpoints = taskNames.map(taskName => {
taskName -> Option(checkpointManager.readLastCheckpoint(taskName))
- .getOrElse(new CheckpointV1(new java.util.HashMap[SystemStreamPartition, String]()))
- .getOffsets
- .asScala
- .toMap
+ .getOrElse(defaultCheckpoint)
}).toMap
- lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current checkpoint for task: "+ lcp._1))
+ lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2.getOffsets.asScala.toMap,
+ "Current checkpoint for task: " + lcp._1))
if (newOffsets != null) {
newOffsets.foreach {
case (taskName: TaskName, offsets: Map[SystemStreamPartition, String]) =>
logCheckpoint(taskName, offsets, "New offset to be written for task: " + taskName)
- val checkpoint = new CheckpointV1(offsets.asJava)
+ val checkpoint = if (checkpointReadVersion == 1) {
+ new CheckpointV1(offsets.asJava)
+ } else if (checkpointReadVersion == 2) {
+ val lastSCMs = lastCheckpoints.getOrElse(taskName, defaultCheckpoint)
+ .asInstanceOf[CheckpointV2].getStateCheckpointMarkers
+ new CheckpointV2(CheckpointId.create(), offsets.asJava, lastSCMs)
+ } else {
+ throw new SamzaException("Unrecognized checkpoint read version: " + checkpointReadVersion)
+ }
checkpointManager.writeCheckpoint(taskName, checkpoint)
info(s"Updated the checkpoint of the task: $taskName to: $offsets")
}
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
index b740a13..a068e2a 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
@@ -43,6 +43,9 @@
def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints")
def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
+ if (!checkpoint.isInstanceOf[CheckpointV1]) {
+ throw new SamzaException("Unsupported checkpoint version: " + checkpoint.getVersion)
+ }
val bytes = serde.toBytes(checkpoint.asInstanceOf[CheckpointV1])
val fos = new FileOutputStream(getCheckpointFile(taskName))
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index b3360f2..fe27552 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -23,12 +23,10 @@
import java.lang.management.ManagementFactory
import java.net.{URL, UnknownHostException}
import java.nio.file.Path
-import java.time.Duration
import java.util
import java.util.concurrent._
import java.util.function.Consumer
import java.util.{Base64, Optional}
-
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.samza.SamzaException
@@ -502,9 +500,22 @@
val loggedStorageBaseDir = getLoggedStorageBaseDir(jobConfig, defaultStoreBaseDir)
info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
- // TODO dchen should we enforce restore factories to be subset of backup factories?
- val stateStorageBackendRestoreFactory = ReflectionUtil
- .getObj(storageConfig.getRestoreFactory(), classOf[StateBackendFactory])
+ val backupFactoryNames = storageConfig.getBackupFactories
+ val restoreFactoryNames = storageConfig.getRestoreFactories
+
+ // Restore factories should be a subset of backup factories
+ if (!backupFactoryNames.containsAll(restoreFactoryNames)) {
+      restoreFactoryNames.removeAll(backupFactoryNames)
+      throw new SamzaException("Restore state backend factories are not a subset of backup state backend factories, " +
+        "missing factories: " + restoreFactoryNames.toString)
+ }
+
+ val stateStorageBackendBackupFactories = backupFactoryNames.asScala.map(
+ ReflectionUtil.getObj(_, classOf[StateBackendFactory])
+ )
+ val stateStorageBackendRestoreFactories = restoreFactoryNames.asScala.map(
+      factoryName => (factoryName, ReflectionUtil.getObj(factoryName, classOf[StateBackendFactory])))
+ .toMap.asJava
val containerStorageManager = new ContainerStorageManager(
checkpointManager,
@@ -521,7 +532,7 @@
samzaContainerMetrics,
jobContext,
containerContext,
- stateStorageBackendRestoreFactory,
+ stateStorageBackendRestoreFactories,
taskCollectors.asJava,
loggedStorageBaseDir,
nonLoggedStorageBaseDir,
@@ -530,9 +541,7 @@
storeWatchPaths.addAll(containerStorageManager.getStoreDirectoryPaths)
- val stateStorageBackendBackupFactories = storageConfig.getBackupFactories().asScala.map(
- ReflectionUtil.getObj(_, classOf[StateBackendFactory])
- )
+
// Create taskInstances
val taskInstances: Map[TaskName, TaskInstance] = taskModels
@@ -684,19 +693,6 @@
containerStorageManager = containerStorageManager,
diagnosticsManager = diagnosticsManager)
}
-
- /**
- * Builds the set of SSPs for all changelogs on this container.
- */
- @VisibleForTesting
- private[container] def getChangelogSSPsForContainer(containerModel: ContainerModel,
- changeLogSystemStreams: util.Map[String, SystemStream]): Set[SystemStreamPartition] = {
- containerModel.getTasks.values().asScala
- .map(taskModel => taskModel.getChangelogPartition)
- .flatMap(changelogPartition => changeLogSystemStreams.asScala.map { case (_, systemStream) =>
- new SystemStreamPartition(systemStream, changelogPartition) })
- .toSet
- }
}
class SamzaContainer(
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 8a872de..f91aae1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -290,6 +290,7 @@
if (!commitInProgress.tryAcquire(commitTimeoutMs, TimeUnit.MILLISECONDS)) {
val timeSinceLastCommit = System.currentTimeMillis() - lastCommitStartTimeMs
+ metrics.commitsTimedOut.set(metrics.commitsTimedOut.getValue + 1)
throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " +
"%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " +
"and commit timeout beyond that is %s ms" format (taskName, timeSinceLastCommit,
@@ -356,14 +357,11 @@
val checkpointWriteFuture: CompletableFuture[util.Map[String, util.Map[String, String]]] =
uploadSCMsFuture.thenApplyAsync(writeCheckpoint(checkpointId, inputOffsets), commitThreadPool)
- val cleanupStartTimeNs = System.nanoTime()
val cleanUpFuture: CompletableFuture[Void] =
checkpointWriteFuture.thenComposeAsync(cleanUp(checkpointId), commitThreadPool)
cleanUpFuture.whenComplete(new BiConsumer[Void, Throwable] {
override def accept(v: Void, throwable: Throwable): Unit = {
- if (throwable == null) {
- metrics.asyncCleanupNs.update(System.nanoTime() - cleanupStartTimeNs)
- } else {
+ if (throwable != null) {
warn("Commit cleanup did not complete successfully for taskName: %s checkpointId: %s with error msg: %s"
format (taskName, checkpointId, throwable.getMessage))
}
@@ -380,6 +378,7 @@
}
})
+ metrics.lastCommitNs.set(System.nanoTime() - commitStartNs)
metrics.commitSyncNs.update(System.nanoTime() - commitStartNs)
debug("Finishing sync stage of commit for taskName: %s checkpointId: %s" format (taskName, checkpointId))
}
@@ -428,6 +427,7 @@
override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
// Perform cleanup on unused checkpoints
debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
+ val cleanUpStartTime = System.nanoTime()
try {
commitManager.cleanUp(checkpointId, uploadSCMs)
} catch {
@@ -438,6 +438,8 @@
throw new SamzaException(
"Failed to remove old checkpoint state for taskName: %s checkpointId: %s."
format(taskName, checkpointId), e)
+ } finally {
+ metrics.asyncCleanupNs.update(System.nanoTime() - cleanUpStartTime)
}
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index f13e37a..54d3665 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -38,9 +38,10 @@
val pendingMessages = newGauge("pending-messages", 0)
val messagesInFlight = newGauge("messages-in-flight", 0)
val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
-
+ val commitsTimedOut = newGauge("commits-timed-out", 0)
val commitsSkipped = newGauge("commits-skipped", 0)
val commitNs = newTimer("commit-ns")
+ val lastCommitNs = newGauge("last-commit-ns", 0L)
val commitSyncNs = newTimer("commit-sync-ns")
val commitAsyncNs = newTimer("commit-async-ns")
val snapshotNs = newTimer("snapshot-ns")
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 5cc79b3..f804a5e 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -44,6 +44,7 @@
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StorageConfig;
@@ -120,7 +121,6 @@
private static final int RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS = 60;
/** Maps containing relevant per-task objects */
- private final Map<TaskName, TaskRestoreManager> taskRestoreManagers;
private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
private final Map<TaskName, TaskInstanceCollector> taskInstanceCollectors;
private final Map<TaskName, Map<String, StorageEngine>> inMemoryStores; // subset of taskStores after #start()
@@ -131,6 +131,8 @@
private final Map<String, SystemStream> changelogSystemStreams; // Map of changelog system-streams indexed by store name
private final Map<String, Serde<Object>> serdes; // Map of Serde objects indexed by serde name (specified in config)
private final SystemAdmins systemAdmins;
+ private final Clock clock;
+ private final Map<String, StateBackendFactory> restoreStateBackendFactories;
private final StreamMetadataCache streamMetadataCache;
private final SamzaContainerMetrics samzaContainerMetrics;
@@ -153,7 +155,7 @@
private final Set<String> sideInputStoreNames;
private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
private SystemConsumers sideInputSystemConsumers;
- private volatile Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread
+ private final Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread
private volatile boolean shouldShutdown = false;
private RunLoop sideInputRunLoop;
@@ -183,7 +185,7 @@
SamzaContainerMetrics samzaContainerMetrics,
JobContext jobContext,
ContainerContext containerContext,
- StateBackendFactory stateBackendFactory,
+ Map<String, StateBackendFactory> restoreStateBackendFactories,
Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
File loggedStoreBaseDirectory,
File nonLoggedStoreBaseDirectory,
@@ -203,6 +205,8 @@
LOG.info("Starting with changelogSystemStreams = {} taskSideInputStoreSSPs = {}", this.changelogSystemStreams, this.taskSideInputStoreSSPs);
+ this.clock = clock;
+ this.restoreStateBackendFactories = restoreStateBackendFactories;
this.storageEngineFactories = storageEngineFactories;
this.serdes = serdes;
this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
@@ -260,16 +264,13 @@
JobConfig jobConfig = new JobConfig(config);
int restoreThreadPoolSize =
Math.min(
- Math.max(containerModel.getTasks().size() * 2, jobConfig.getRestoreThreadPoolSize()),
+ Math.max(containerModel.getTasks().size() * restoreStateBackendFactories.size() * 2,
+ jobConfig.getRestoreThreadPoolSize()),
jobConfig.getRestoreThreadPoolMaxSize()
);
this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
- // creating task restore managers
- this.taskRestoreManagers = createTaskRestoreManagers(stateBackendFactory, clock,
- this.samzaContainerMetrics);
-
this.sspSideInputHandlers = createSideInputHandlers(clock);
// create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
@@ -393,27 +394,95 @@
return storeConsumers;
}
- private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(StateBackendFactory factory, Clock clock,
- SamzaContainerMetrics samzaContainerMetrics) {
- Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
+ private Map<String, TaskRestoreManager> createTaskRestoreManagers(Map<String, StateBackendFactory> factories,
+ Map<String, Set<String>> backendFactoryStoreNames, Clock clock, SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
+ TaskModel taskModel) {
+    // Build one restore manager per state backend factory, for the stores this task restores from that factory
+ Map<String, TaskRestoreManager> backendFactoryRestoreManagers = new HashMap<>(); // backendFactoryName -> restoreManager
+ MetricsRegistry taskMetricsRegistry =
+ taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
- containerModel.getTasks().forEach((taskName, taskModel) -> {
- MetricsRegistry taskMetricsRegistry =
- taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
- Set<String> nonSideInputStoreNames = storageEngineFactories.keySet().stream()
- .filter(storeName -> !sideInputStoreNames.contains(storeName))
- .collect(Collectors.toSet());
+ backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
+ StateBackendFactory factory = factories.get(factoryName);
KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers,
inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes,
- taskInstanceCollectors.get(taskName), nonSideInputStoreNames);
+ taskInstanceCollectors.get(taskName));
+ TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor,
+ taskMetricsRegistry, storeNames, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
+ kafkaChangelogRestoreParams);
- taskRestoreManagers.put(taskName,
- factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor,
- taskMetricsRegistry, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
- kafkaChangelogRestoreParams));
- samzaContainerMetrics.addStoresRestorationGauge(taskName);
+ backendFactoryRestoreManagers.put(factoryName, restoreManager);
});
- return taskRestoreManagers;
+ samzaContainerMetrics.addStoresRestorationGauge(taskName);
+ return backendFactoryRestoreManagers;
+ }
+
+ /**
+ * Return a map of backend factory names to set of stores that should be restored using it
+   * Returns a map of state backend factory names to the set of stores that should be restored using that factory
+ @VisibleForTesting
+ Map<String, Set<String>> getBackendFactoryStoreNames(Checkpoint checkpoint, Set<String> storeNames,
+ StorageConfig storageConfig) {
+ Map<String, Set<String>> backendFactoryStoreNames = new HashMap<>(); // backendFactoryName -> set(storeNames)
+
+ if (checkpoint != null && checkpoint.getVersion() == 1) {
+ // Only restore stores with changelog streams configured
+ Set<String> changelogStores = storeNames.stream()
+ .filter(storeName -> storageConfig.getChangelogStream(storeName).isPresent())
+ .collect(Collectors.toSet());
+ // Default to changelog backend factory when using checkpoint v1 for backwards compatibility
+ if (!changelogStores.isEmpty()) {
+ backendFactoryStoreNames.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, changelogStores);
+ }
+ if (storeNames.size() > changelogStores.size()) {
+ Set<String> nonChangelogStores = storeNames.stream()
+ .filter(storeName -> !changelogStores.contains(storeName))
+ .collect(Collectors.toSet());
+        LOG.info("Non-side input stores: {} do not have configured changelogs for checkpoint V1; "
+            + "restore for these stores will be skipped",
+            nonChangelogStores);
+ }
+ } else if (checkpoint == null || checkpoint.getVersion() == 2) {
+ // Extract the state checkpoint markers if checkpoint exists
+ Map<String, Map<String, String>> stateCheckpointMarkers = checkpoint == null ? Collections.emptyMap() :
+ ((CheckpointV2) checkpoint).getStateCheckpointMarkers();
+
+ // Find stores associated to each state backend factory
+ storeNames.forEach(storeName -> {
+ List<String> storeFactories = storageConfig.getStoreRestoreFactories(storeName);
+
+ if (storeFactories.isEmpty()) {
+ // If the restore factory is not configured for the store and the store does not have a changelog topic
+            LOG.info("Non-side input store: {} has neither configured restore factories nor a store changelog; "
+                + "restore for the store will be skipped",
+ storeName);
+ } else {
+            // Search the ordered restore factory list for the first factory with a matching state checkpoint marker.
+            // If the checkpoint or its state checkpoint markers do not exist, we fall back to the first
+            // configured restore factory
+ Optional<String> factoryNameOpt = storeFactories.stream()
+ .filter(factoryName -> stateCheckpointMarkers.containsKey(factoryName) &&
+ stateCheckpointMarkers.get(factoryName).containsKey(storeName))
+ .findFirst();
+ String factoryName;
+ if (factoryNameOpt.isPresent()) {
+ factoryName = factoryNameOpt.get();
+ } else { // Restore factories configured but no checkpoints found
+ // Use first configured restore factory
+ factoryName = storeFactories.get(0);
+                LOG.warn("No matching checkpoints found for the configured factories: {}; " +
+                    "defaulting to the first configured factory", storeFactories);
+ }
+ if (!backendFactoryStoreNames.containsKey(factoryName)) {
+ backendFactoryStoreNames.put(factoryName, new HashSet<>());
+ }
+ backendFactoryStoreNames.get(factoryName).add(storeName);
+ }
+ });
+ } else {
+ throw new SamzaException(String.format("Unsupported checkpoint version %s", checkpoint.getVersion()));
+ }
+ return backendFactoryStoreNames;
}
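
To make the checkpoint-v2 branch above concrete, a self-contained sketch of the per-store resolution: the first factory in the store's ordered restore list with a state checkpoint marker wins; with no marker, the store falls back to its first configured factory. Store and factory names are invented for illustration:

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class FactoryResolutionSketch {
  public static void main(String[] args) {
    // CheckpointV2 state checkpoint markers: factoryName -> (storeName -> marker).
    Map<String, Map<String, String>> markers = ImmutableMap.of(
        "com.example.BlobFactory", ImmutableMap.of("store-a", "blob-marker"));

    // Ordered restore factories per store, as resolved from StorageConfig.
    Map<String, List<String>> storeFactories = ImmutableMap.of(
        "store-a", ImmutableList.of("com.example.BlobFactory", "com.example.KafkaFactory"),
        "store-b", ImmutableList.of("com.example.KafkaFactory"));

    storeFactories.forEach((store, factories) -> {
      Optional<String> matched = factories.stream()
          .filter(f -> markers.containsKey(f) && markers.get(f).containsKey(store))
          .findFirst();
      // store-a matches BlobFactory via its marker; store-b has no marker and
      // falls back to its first configured factory, KafkaFactory.
      System.out.println(store + " -> " + matched.orElse(factories.get(0)));
    });
  }
}
```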
// Helper method to filter active Tasks from the container model
@@ -443,9 +512,9 @@
}
for (String storeName : storesToCreate) {
- List<String> storeBackupManager = storageConfig.getStoreBackupFactory(storeName);
+ List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
// A store is considered durable if it is backed by a changelog or another backupManager factory
- boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManager.isEmpty();
+ boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
boolean isSideInput = this.sideInputStoreNames.contains(storeName);
// Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
// for non logged stores
@@ -467,7 +536,7 @@
// add created store to map
taskStores.get(taskName).put(storeName, storageEngine);
- LOG.info("Created non side input store store {} in read-write mode for task {}", storeName, taskName);
+ LOG.info("Created task store {} in read-write mode for task {} in path {}", storeName, taskName, storeDirectory.getAbsolutePath());
}
}
return taskStores;
@@ -646,30 +715,53 @@
private void restoreStores() throws InterruptedException {
LOG.info("Store Restore started");
Set<TaskName> activeTasks = getTasks(containerModel, TaskMode.Active).keySet();
+ // TODO HIGH dchen verify davinci lifecycle
+ // Find all non-side input stores
+ Set<String> nonSideInputStoreNames = storageEngineFactories.keySet()
+ .stream()
+ .filter(storeName -> !sideInputStoreNames.contains(storeName))
+ .collect(Collectors.toSet());
- // initialize each TaskStorageManager
- this.taskRestoreManagers.forEach((taskName, taskRestoreManager) -> {
+ // Obtain the checkpoints for each task
+ Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers = new HashMap<>();
+ Map<TaskName, Checkpoint> taskCheckpoints = new HashMap<>();
+ containerModel.getTasks().forEach((taskName, taskModel) -> {
Checkpoint taskCheckpoint = null;
if (checkpointManager != null && activeTasks.contains(taskName)) {
// only pass in checkpoints for active tasks
taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
}
- taskRestoreManager.init(taskCheckpoint);
+ taskCheckpoints.put(taskName, taskCheckpoint);
+ Map<String, Set<String>> backendFactoryStoreNames = getBackendFactoryStoreNames(taskCheckpoint, nonSideInputStoreNames,
+ new StorageConfig(config));
+ Map<String, TaskRestoreManager> taskStoreRestoreManagers = createTaskRestoreManagers(restoreStateBackendFactories,
+ backendFactoryStoreNames, clock, samzaContainerMetrics, taskName, taskModel);
+ taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
});
- // Start each store consumer once
+ // Initialize each TaskStorageManager
+ taskRestoreManagers.forEach((taskName, restoreManagers) ->
+ restoreManagers.forEach((factoryName, taskRestoreManager) ->
+ taskRestoreManager.init(taskCheckpoints.get(taskName))
+ )
+ );
+
+ // Start each store consumer once.
+ // Note: These consumers are per system and only changelog system store consumers will be started.
+    // Some TaskRestoreManagers may not require the consumer to be started, but due to the agnostic nature of
+ // ContainerStorageManager we always start the changelog consumer here in case it is required
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
- List<Future> taskRestoreFutures = new ArrayList<>(this.taskRestoreManagers.entrySet().size());
+ List<Future> taskRestoreFutures = new ArrayList<>();
// Submit restore callable for each taskInstance
- this.taskRestoreManagers.forEach((taskInstance, taskRestoreManager) -> {
- taskRestoreFutures.add(restoreExecutor.submit(
- new TaskRestoreCallable(this.samzaContainerMetrics, taskInstance, taskRestoreManager)));
- });
+ taskRestoreManagers.forEach((taskInstance, restoreManagersMap) ->
+ // Submit for each restore factory
+ restoreManagersMap.forEach((factoryName, taskRestoreManager) -> taskRestoreFutures.add(restoreExecutor.submit(
+ new TaskRestoreCallable(this.samzaContainerMetrics, taskInstance, taskRestoreManager)))));
- // loop-over the future list to wait for each thread to finish, catch any exceptions during restore and throw
+ // Loop-over the future list to wait for each thread to finish, catch any exceptions during restore and throw
// as samza exceptions
for (Future future : taskRestoreFutures) {
try {
@@ -689,9 +781,6 @@
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
// Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is
- Set<String> nonSideInputStoreNames = storageEngineFactories.keySet().stream()
- .filter(storeName -> !sideInputStoreNames.contains(storeName))
- .collect(Collectors.toSet());
this.taskStores = createTaskStores(nonSideInputStoreNames, this.containerModel, jobContext, containerContext,
storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors);
// Add in memory stores
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 91f1fa1..5dba698 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -30,6 +30,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStream;
import org.junit.Test;
import static org.apache.samza.config.StorageConfig.*;
@@ -71,7 +72,7 @@
}
/**
- * Test verifies that the {@link StorageConfig#STORE_RESTORE_FACTORY} which matches pattern for store.%s.factory
+ * Test verifies that the {@link StorageConfig#STORE_RESTORE_FACTORIES} which matches pattern for store.%s.factory
* is not picked up in the store names list
*/
@Test
@@ -83,8 +84,7 @@
// has stores
StorageConfig storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "store0.factory.class",
- String.format(StorageConfig.FACTORY, STORE_NAME1), "store1.factory.class",
- STORE_RESTORE_FACTORY, "org.apache.class")));
+ String.format(StorageConfig.FACTORY, STORE_NAME1), "store1.factory.class", STORE_RESTORE_FACTORIES, "org.apache.class")));
List<String> actual = storageConfig.getStoreNames();
// ordering shouldn't matter
@@ -101,57 +101,72 @@
StorageConfig storageConfig = new StorageConfig(
new MapConfig(ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "")));
assertEquals(Optional.empty(), storageConfig.getChangelogStream(STORE_NAME0));
+ assertEquals(Collections.emptyMap(), storageConfig.getStoreChangelogs());
// store has full changelog system-stream defined
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0),
- "changelog-system.changelog-stream0")));
+ "changelog-system.changelog-stream0", String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
assertEquals(Optional.of("changelog-system.changelog-stream0"), storageConfig.getChangelogStream(STORE_NAME0));
+ assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "changelog-stream0")), storageConfig.getStoreChangelogs());
// store has changelog stream defined, but system comes from job.changelog.system
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "changelog-stream0",
+ String.format(FACTORY, STORE_NAME0), "store0.factory.class",
StorageConfig.CHANGELOG_SYSTEM, "changelog-system")));
assertEquals(Optional.of("changelog-system.changelog-stream0"), storageConfig.getChangelogStream(STORE_NAME0));
+ assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "changelog-stream0")), storageConfig.getStoreChangelogs());
// batch mode: create unique stream name
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0),
- "changelog-system.changelog-stream0", ApplicationConfig.APP_MODE,
+ "changelog-system.changelog-stream0", String.format(FACTORY, STORE_NAME0), "store0.factory.class",
+ ApplicationConfig.APP_MODE,
ApplicationConfig.ApplicationMode.BATCH.name().toLowerCase(), ApplicationConfig.APP_RUN_ID, "run-id")));
assertEquals(Optional.of("changelog-system.changelog-stream0-run-id"),
storageConfig.getChangelogStream(STORE_NAME0));
+ assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "changelog-stream0-run-id")), storageConfig.getStoreChangelogs());
// job has no changelog stream defined
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
- "should-not-be-used")));
+ "should-not-be-used", String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
assertEquals(Optional.empty(), storageConfig.getChangelogStream(STORE_NAME0));
+ assertEquals(Collections.emptyMap(), storageConfig.getStoreChangelogs());
// job.changelog.system takes precedence over job.default.system when changelog is specified as just streamName
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
- "should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName")));
+ "should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName",
+ String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
assertEquals("changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
+ assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "streamName")), storageConfig.getStoreChangelogs());
// job.changelog.system takes precedence over job.default.system when changelog is specified as {systemName}.{streamName}
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
- "should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "changelog-system.streamName")));
+ "should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "changelog-system.streamName",
+ String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
assertEquals("changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
+ assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "streamName")), storageConfig.getStoreChangelogs());
// systemName specified using stores.{storeName}.changelog = {systemName}.{streamName} should take precedence even
// when job.changelog.system and job.default.system are specified
storageConfig = new StorageConfig(new MapConfig(
ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "default-changelog-system",
JobConfig.JOB_DEFAULT_SYSTEM, "default-system",
- String.format(CHANGELOG_STREAM, STORE_NAME0), "nondefault-changelog-system.streamName")));
+ String.format(CHANGELOG_STREAM, STORE_NAME0), "nondefault-changelog-system.streamName",
+ String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
assertEquals("nondefault-changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
+ assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("nondefault-changelog-system", "streamName")), storageConfig.getStoreChangelogs());
// fall back to job.default.system if job.changelog.system is not specified
storageConfig = new StorageConfig(new MapConfig(
- ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system", String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName")));
+ ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system", String.format(CHANGELOG_STREAM, STORE_NAME0),
+ "streamName", String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
assertEquals("default-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
+ assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("default-system", "streamName")), storageConfig.getStoreChangelogs());
}
@Test(expected = SamzaException.class)
@@ -188,12 +203,12 @@
assertTrue(factories.contains(factory3));
assertTrue(factories.contains(KAFKA_STATE_BACKEND_FACTORY));
assertEquals(4, factories.size());
- assertEquals(ImmutableList.of(factory1, factory2), storageConfig.getStoreBackupFactory(STORE_NAME0));
- assertEquals(ImmutableList.of(factory1), storageConfig.getStoreBackupFactory(STORE_NAME1));
- assertEquals(ImmutableList.of(factory3), storageConfig.getStoreBackupFactory(STORE_NAME2));
- assertEquals(DEFAULT_BACKUP_FACTORIES, storageConfig.getStoreBackupFactory(STORE_NAME3));
- assertTrue(storageConfig.getStoreBackupFactory("emptyStore").isEmpty());
- assertTrue(storageConfig.getStoreBackupFactory("noFactoryStore").isEmpty());
+ assertEquals(ImmutableList.of(factory1, factory2), storageConfig.getStoreBackupFactories(STORE_NAME0));
+ assertEquals(ImmutableList.of(factory1), storageConfig.getStoreBackupFactories(STORE_NAME1));
+ assertEquals(ImmutableList.of(factory3), storageConfig.getStoreBackupFactories(STORE_NAME2));
+ assertEquals(DEFAULT_BACKUP_FACTORIES, storageConfig.getStoreBackupFactories(STORE_NAME3));
+ assertTrue(storageConfig.getStoreBackupFactories("emptyStore").isEmpty());
+ assertTrue(storageConfig.getStoreBackupFactories("noFactoryStore").isEmpty());
}
@Test
@@ -415,4 +430,148 @@
configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), String.valueOf(storeSpecificLagOverride));
assertEquals(storeSpecificLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
}
+
+ @Test
+ public void testGetRestoreManagers() {
+ String storeName = "store1";
+ String storeName2 = "store2";
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(String.format(FACTORY, storeName), "store1.factory.class");
+ configMap.put(String.format(FACTORY, storeName2), "store2.factory.class");
+
+ // empty config, return no restore managers
+ assertEquals(Collections.emptySet(), new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+ assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+ assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+
+ // changelog set, should default to kafka state backend restore
+ String changelogStreamOverride = "changelogStream";
+ configMap.put(String.format(CHANGELOG_STREAM, storeName), changelogStreamOverride);
+ configMap.put(String.format(CHANGELOG_STREAM, storeName2), changelogStreamOverride);
+ configMap.put(StorageConfig.CHANGELOG_SYSTEM, "changelog-system");
+ configMap.put(String.format(StorageConfig.CHANGELOG_STREAM, storeName), "changelog-stream0");
+ assertEquals(ImmutableSet.of(KAFKA_STATE_BACKEND_FACTORY), new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+ assertEquals(DEFAULT_RESTORE_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+ assertEquals(DEFAULT_RESTORE_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+
+ // job-level restore factories config should override the default with the job backend factories
+ String jobRestoreFactory1 = "jobBackendRestoreFactory1";
+ String jobRestoreFactory2 = "jobBackendRestoreFactory2";
+ String jobRestoreFactoryOverride = jobRestoreFactory1 + "," + jobRestoreFactory2;
+ configMap.put(JOB_RESTORE_FACTORIES, jobRestoreFactoryOverride);
+ assertEquals(ImmutableSet.of(jobRestoreFactory1, jobRestoreFactory2),
+ new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+ assertEquals(ImmutableList.of(jobRestoreFactory1, jobRestoreFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+ assertEquals(ImmutableList.of(jobRestoreFactory1, jobRestoreFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+
+ // store-specific restore factories should take precedence over the job-level factories
+ String storeRestoreFactory1 = "storeBackendRestoreFactory1";
+ String storeRestoreFactory2 = "storeBackendRestoreFactory2";
+ String storeRestoreFactoryOverride = storeRestoreFactory1 + "," + storeRestoreFactory2;
+ configMap.put(String.format(STORE_RESTORE_FACTORIES, storeName), storeRestoreFactoryOverride);
+ assertEquals(ImmutableSet.of(jobRestoreFactory1, jobRestoreFactory2, storeRestoreFactory1, storeRestoreFactory2),
+ new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+ assertEquals(ImmutableList.of(storeRestoreFactory1, storeRestoreFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+ assertEquals(ImmutableList.of(jobRestoreFactory1, jobRestoreFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+
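+ // an empty store-level restore factories list disables restore for that store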
+ String emptyRestoreFactory = "";
+ configMap.put(String.format(STORE_RESTORE_FACTORIES, storeName), emptyRestoreFactory);
+ assertEquals(ImmutableSet.of(jobRestoreFactory1, jobRestoreFactory2),
+ new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+ assertEquals(ImmutableList.of(jobRestoreFactory1, jobRestoreFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithRestoreFactory(KAFKA_STATE_BACKEND_FACTORY));
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithRestoreFactory(storeRestoreFactory1));
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithRestoreFactory(storeRestoreFactory2));
+ }
+
+ @Test
+ public void testGetBackupManagers() {
+ String storeName = "store1";
+ String storeName2 = "store2";
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(String.format(FACTORY, storeName), "store1.factory.class");
+ configMap.put(String.format(FACTORY, storeName2), "store2.factory.class");
+
+ // empty config, return no backup managers
+ assertEquals(Collections.emptySet(), new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+ assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+ assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+
+ // changelog set, should default to kafka state backend backup
+ String changelogStreamOverride = "changelogStream";
+ configMap.put(String.format(CHANGELOG_STREAM, storeName), changelogStreamOverride);
+ configMap.put(String.format(CHANGELOG_STREAM, storeName2), changelogStreamOverride);
+ configMap.put(StorageConfig.CHANGELOG_SYSTEM, "changelog-system");
+ configMap.put(String.format(StorageConfig.CHANGELOG_STREAM, storeName), "changelog-stream0");
+ assertEquals(ImmutableSet.of(KAFKA_STATE_BACKEND_FACTORY), new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+ assertEquals(DEFAULT_BACKUP_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+ assertEquals(DEFAULT_BACKUP_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+ assertEquals(ImmutableList.of(storeName2, storeName),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+
+ // job-level backup factories config should override the default with the job backend factories
+ String jobBackupFactory1 = "jobBackendBackupFactory1";
+ String jobBackupFactory2 = "jobBackendBackupFactory2";
+ String jobBackupFactoryOverride = jobBackupFactory1 + "," + jobBackupFactory2;
+ configMap.put(JOB_BACKUP_FACTORIES, jobBackupFactoryOverride);
+ assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
+ new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+ assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+ assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+ assertEquals(ImmutableList.of(storeName2, storeName),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory1));
+ assertEquals(ImmutableList.of(storeName2, storeName),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory2));
+
+ // store-specific backup factories should take precedence over the job-level factories
+ String storeBackupFactory1 = "storeBackendBackupFactory1";
+ String storeBackupFactory2 = "storeBackendBackupFactory2";
+ String storeBackupFactoryOverride = storeBackupFactory1 + "," + storeBackupFactory2;
+ configMap.put(String.format(STORE_BACKUP_FACTORIES, storeName), storeBackupFactoryOverride);
+ assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2, storeBackupFactory1, storeBackupFactory2),
+ new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+ assertEquals(ImmutableList.of(storeBackupFactory1, storeBackupFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+ assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+ assertEquals(ImmutableList.of(storeName2),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory1));
+ assertEquals(ImmutableList.of(storeName2),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory2));
+ assertEquals(ImmutableList.of(storeName),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory1));
+ assertEquals(ImmutableList.of(storeName),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory2));
+
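+ // an empty store-level backup factories list disables backup for that store and removes it
+ // from getStoresWithBackupFactory results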
+ String emptyBackupFactory = "";
+ configMap.put(String.format(STORE_BACKUP_FACTORIES, storeName), emptyBackupFactory);
+ assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
+ new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+ assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
+ new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory1));
+ assertEquals(Collections.emptyList(),
+ new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory2));
+ }
}
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
index 7e0da6d..ddc0c8e 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
@@ -28,8 +29,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.tuple.Pair;
@@ -176,7 +177,7 @@
TaskName taskName = mock(TaskName.class);
BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap());
metrics.initStoreMetrics(ImmutableList.of("storeName"));
- List<String> storesToRestore = ImmutableList.of("storeName");
+ Set<String> storesToRestore = ImmutableSet.of("storeName");
SnapshotIndex snapshotIndex = mock(SnapshotIndex.class);
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
ImmutableMap.of("storeName", Pair.of("blobId", snapshotIndex));
@@ -222,7 +223,7 @@
TaskName taskName = mock(TaskName.class);
BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap());
metrics.initStoreMetrics(ImmutableList.of("storeName"));
- List<String> storesToRestore = ImmutableList.of("storeName");
+ Set<String> storesToRestore = ImmutableSet.of("storeName");
SnapshotIndex snapshotIndex = mock(SnapshotIndex.class);
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
ImmutableMap.of("storeName", Pair.of("blobId", snapshotIndex));
@@ -274,7 +275,7 @@
TaskName taskName = mock(TaskName.class);
BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap());
metrics.initStoreMetrics(ImmutableList.of("storeName"));
- List<String> storesToRestore = ImmutableList.of("storeName");
+ Set<String> storesToRestore = ImmutableSet.of("storeName");
SnapshotIndex snapshotIndex = mock(SnapshotIndex.class);
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
ImmutableMap.of("storeName", Pair.of("blobId", snapshotIndex));
@@ -332,7 +333,7 @@
TaskName taskName = mock(TaskName.class);
BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap());
metrics.initStoreMetrics(ImmutableList.of("newStoreName"));
- List<String> storesToRestore = ImmutableList.of("newStoreName"); // new store in config
+ Set<String> storesToRestore = ImmutableSet.of("newStoreName"); // new store in config
SnapshotIndex snapshotIndex = mock(SnapshotIndex.class);
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes = mock(Map.class);
when(prevStoreSnapshotIndexes.containsKey("newStoreName")).thenReturn(false);
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 5bcbdcd..85b2011 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -19,8 +19,7 @@
package org.apache.samza.checkpoint
-import java.util.Properties
-
+import java.util.{Collections, Properties}
import org.apache.samza.Partition
import org.apache.samza.checkpoint.CheckpointTool.{CheckpointToolCommandLine, TaskNameToCheckpointMap}
import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
@@ -32,9 +31,10 @@
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system._
+import org.junit.Assert.{assertEquals, assertNotNull}
import org.junit.{Before, Test}
import org.mockito.Matchers._
-import org.mockito.Mockito
+import org.mockito.{ArgumentCaptor, Matchers, Mockito}
import org.mockito.Mockito._
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
@@ -67,8 +67,10 @@
val tn0 = new TaskName("Partition 0")
val tn1 = new TaskName("Partition 1")
+ val tn2 = new TaskName("Partition 2")
val p0 = new Partition(0)
val p1 = new Partition(1)
+ val p2 = new Partition(2)
@Before
def setup() {
@@ -83,8 +85,9 @@
).asJava)
config = JobPlanner.generateSingleJobConfig(userDefinedConfig)
val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
- new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
- new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
+ p0 -> new SystemStreamPartitionMetadata("0", "100", "101"),
+ p1 -> new SystemStreamPartitionMetadata("0", "200", "201"),
+ p2 -> new SystemStreamPartitionMetadata("0", "300", "301")
).asJava)
TestCheckpointTool.checkpointManager = mock[CheckpointManager]
TestCheckpointTool.systemAdmin = mock[SystemAdmin]
@@ -94,6 +97,11 @@
.thenReturn(new CheckpointV1(Map(new SystemStreamPartition("test", "foo", p0) -> "1234").asJava))
when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
.thenReturn(new CheckpointV1(Map(new SystemStreamPartition("test", "foo", p1) -> "4321").asJava))
+ when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn2))
+ .thenReturn(new CheckpointV2(null,
+ Map(new SystemStreamPartition("test", "foo", p2) -> "5678").asJava,
+ Map("BackupFactory"-> Map("StoreName"-> "offset").asJava).asJava
+ ))
}
@Test
@@ -119,6 +127,42 @@
}
@Test
+ def testOverwriteCheckpointV2() {
+ // Return no prior v1 checkpoints for these tasks
+ when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0))
+ .thenReturn(null)
+ when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
+ .thenReturn(null)
+
+ val toOverwrite = Map(
+ tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
+ tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"),
+ tn2 -> Map(new SystemStreamPartition("test", "foo", p2) -> "45"))
+
+ val checkpointV2Config = new MapConfig(config, Map(TaskConfig.CHECKPOINT_READ_VERSIONS -> "2").asJava)
+
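+ // The tool should write a CheckpointV2 for every task: empty state checkpoint markers where no prior
+ // checkpoint exists (tn0, tn1), and the prior markers carried over for tn2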
+ val argument = ArgumentCaptor.forClass(classOf[CheckpointV2])
+ val checkpointTool = CheckpointTool(checkpointV2Config, toOverwrite)
+
+ checkpointTool.run()
+ verify(TestCheckpointTool.checkpointManager)
+ .writeCheckpoint(Matchers.same(tn0), argument.capture())
+ assertNotNull(argument.getValue.getCheckpointId)
+ assertEquals(Map(new SystemStreamPartition("test", "foo", p0) -> "42").asJava, argument.getValue.getOffsets)
+ assertEquals(Collections.emptyMap(), argument.getValue.getStateCheckpointMarkers)
+ verify(TestCheckpointTool.checkpointManager)
+ .writeCheckpoint(Matchers.same(tn1), argument.capture())
+ assertNotNull(argument.getValue.getCheckpointId)
+ assertEquals(Map(new SystemStreamPartition("test", "foo", p1) -> "43").asJava, argument.getValue.getOffsets)
+ assertEquals(Collections.emptyMap(), argument.getValue.getStateCheckpointMarkers)
+ verify(TestCheckpointTool.checkpointManager)
+ .writeCheckpoint(Matchers.same(tn2), argument.capture())
+ assertNotNull(argument.getValue.getCheckpointId)
+ assertEquals(Map(new SystemStreamPartition("test", "foo", p2) -> "45").asJava, argument.getValue.getOffsets)
+ assertEquals(Map("BackupFactory"-> Map("StoreName"-> "offset").asJava).asJava, argument.getValue.getStateCheckpointMarkers)
+ }
+
+ @Test
def testGrouping(): Unit = {
val config : java.util.Properties = new Properties()
config.put("tasknames.Partition 0.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.0", "0000")
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 606f86d..e357fe5 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -227,6 +227,8 @@
when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
val commitTimer = mock[Timer]
when(this.metrics.commitNs).thenReturn(commitTimer)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val commitSyncTimer = mock[Timer]
when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
val commitAsyncTimer = mock[Timer]
@@ -320,6 +322,8 @@
when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
val commitTimer = mock[Timer]
when(this.metrics.commitNs).thenReturn(commitTimer)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val commitSyncTimer = mock[Timer]
when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
val commitAsyncTimer = mock[Timer]
@@ -389,6 +393,8 @@
when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = Map(SYSTEM_STREAM_PARTITION -> "4").asJava
val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
@@ -460,6 +466,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -510,6 +518,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -560,6 +570,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -611,6 +623,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -662,6 +676,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -714,6 +730,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -728,10 +746,19 @@
taskInstance.commit // async stage will be run by caller due to direct executor
+ val asyncCommitTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
+ val uploadTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
+ val cleanUpTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
+
verify(commitsCounter).inc()
verify(snapshotTimer).update(anyLong())
verify(uploadTimer).update(anyLong())
verify(commitTimer).update(anyLong())
+ verify(commitAsyncTimer).update(asyncCommitTimeCaptor.capture())
+ verify(uploadTimer).update(uploadTimeCaptor.capture())
+ verify(cleanUpTimer).update(cleanUpTimeCaptor.capture())
+
+ assertTrue((cleanUpTimeCaptor.getValue + uploadTimeCaptor.getValue) < asyncCommitTimeCaptor.getValue)
taskInstance.commit
@@ -763,6 +790,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -794,7 +823,6 @@
verify(commitsCounter, times(1)).inc() // should only have been incremented once on the initial commit
verify(snapshotTimer).update(anyLong())
- verify(uploadTimer).update(anyLong())
verifyZeroInteractions(commitTimer)
cleanUpFuture.complete(null) // just to unblock shared executor
@@ -818,6 +846,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -879,6 +909,8 @@
when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
val skippedCounter = mock[Gauge[Int]]
when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+ val lastCommitGauge = mock[Gauge[Long]]
+ when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index f381f3d..8645e4a 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -18,16 +18,24 @@
*/
package org.apache.samza.storage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
@@ -170,6 +178,8 @@
Map<String, String> configMap = new HashMap<>();
configMap.put("stores." + STORE_NAME + ".key.serde", "stringserde");
configMap.put("stores." + STORE_NAME + ".msg.serde", "stringserde");
+ configMap.put("stores." + STORE_NAME + ".factory", mockStorageEngineFactory.getClass().getName());
+ configMap.put("stores." + STORE_NAME + ".changelog", SYSTEM_NAME + "." + STREAM_NAME);
configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
Config config = new MapConfig(configMap);
@@ -215,7 +225,7 @@
ContainerModel mockContainerModel = new ContainerModel("samza-container-test", tasks);
when(mockContainerContext.getContainerModel()).thenReturn(mockContainerModel);
- // Reset the expected number of sysConsumer create, start and stop calls, and store.restore() calls
+ // Reset the expected number of sysConsumer create, start and stop calls, and store.restore() calls
this.systemConsumerCreationCount = 0;
this.systemConsumerStartCount = 0;
this.systemConsumerStopCount = 0;
@@ -223,7 +233,7 @@
StateBackendFactory backendFactory = mock(StateBackendFactory.class);
TaskRestoreManager restoreManager = mock(TaskRestoreManager.class);
- when(backendFactory.getRestoreManager(any(), any(), any(), any(), any(), any(), any(), any(), any(), any()))
+ when(backendFactory.getRestoreManager(any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()))
.thenReturn(restoreManager);
doAnswer(invocation -> {
storeRestoreCallCount++;
@@ -246,7 +256,7 @@
samzaContainerMetrics,
mock(JobContext.class),
mockContainerContext,
- backendFactory,
+ ImmutableMap.of(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, backendFactory),
mock(Map.class),
DEFAULT_LOGGED_STORE_BASE_DIR,
DEFAULT_STORE_BASE_DIR,
@@ -270,4 +280,224 @@
Assert.assertEquals("systemConsumerStopCount count should be 1", 1, this.systemConsumerStopCount);
Assert.assertEquals("systemConsumerStartCount count should be 1", 1, this.systemConsumerStartCount);
}
+
+ @Test
+ public void testNoConfiguredDurableStores() throws InterruptedException {
+ taskRestoreMetricGauges = new HashMap<>();
+ this.tasks = new HashMap<>();
+ this.taskInstanceMetrics = new HashMap<>();
+
+ // Add two mocked tasks
+ addMockedTask("task 0", 0);
+ addMockedTask("task 1", 1);
+
+ // Mock container metrics
+ samzaContainerMetrics = mock(SamzaContainerMetrics.class);
+ when(samzaContainerMetrics.taskStoreRestorationMetrics()).thenReturn(taskRestoreMetricGauges);
+
+ // Create mocked configs for specifying serdes
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
+ Config config = new MapConfig(configMap);
+
+ Map<String, Serde<Object>> serdes = new HashMap<>();
+ serdes.put("stringserde", mock(Serde.class));
+
+ CheckpointManager checkpointManager = mock(CheckpointManager.class);
+ when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>()));
+
+ ContainerContext mockContainerContext = mock(ContainerContext.class);
+ ContainerModel mockContainerModel = new ContainerModel("samza-container-test", tasks);
+ when(mockContainerContext.getContainerModel()).thenReturn(mockContainerModel);
+
+ // Reset the expected number of sysConsumer create, start and stop calls, and store.restore() calls
+ this.systemConsumerCreationCount = 0;
+ this.systemConsumerStartCount = 0;
+ this.systemConsumerStopCount = 0;
+ this.storeRestoreCallCount = 0;
+
+ StateBackendFactory backendFactory = mock(StateBackendFactory.class);
+ TaskRestoreManager restoreManager = mock(TaskRestoreManager.class);
+ when(backendFactory.getRestoreManager(any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()))
+ .thenReturn(restoreManager);
+ doAnswer(invocation -> {
+ storeRestoreCallCount++;
+ return null;
+ }).when(restoreManager).restore();
+
+ // Create the container storage manager
+ ContainerStorageManager containerStorageManager = new ContainerStorageManager(
+ checkpointManager,
+ mockContainerModel,
+ mock(StreamMetadataCache.class),
+ mock(SystemAdmins.class),
+ new HashMap<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ new HashMap<>(),
+ serdes,
+ config,
+ taskInstanceMetrics,
+ samzaContainerMetrics,
+ mock(JobContext.class),
+ mockContainerContext,
+ new HashMap<>(),
+ mock(Map.class),
+ DEFAULT_LOGGED_STORE_BASE_DIR,
+ DEFAULT_STORE_BASE_DIR,
+ null,
+ new SystemClock());
+
+ containerStorageManager.start();
+ containerStorageManager.shutdown();
+
+ for (Gauge gauge : taskRestoreMetricGauges.values()) {
+ Assert.assertTrue("Restoration time gauge value should never be invoked",
+ mockingDetails(gauge).getInvocations().size() == 0);
+ }
+
+ Assert.assertEquals("Store restore count should be 2 because there are 0 stores", 0, this.storeRestoreCallCount);
+ Assert.assertEquals(0, this.systemConsumerCreationCount);
+ Assert.assertEquals(0, this.systemConsumerStopCount);
+ Assert.assertEquals(0, this.systemConsumerStartCount);
+ }
+
+ @Test
+ public void testCheckpointBasedRestoreFactoryCreation() {
+ Set<String> storeNames = ImmutableSet.of("storeName0", "storeName1", "storeName2");
+
+ StorageConfig mockConfig = mock(StorageConfig.class);
+ when(mockConfig.getStoreRestoreFactories("storeName0"))
+ .thenReturn(ImmutableList.of("factory0", "factory1", "factory2"));
+ when(mockConfig.getStoreRestoreFactories("storeName1"))
+ .thenReturn(ImmutableList.of("factory2", "factory1"));
+ when(mockConfig.getStoreRestoreFactories("storeName2"))
+ .thenReturn(Collections.emptyList());
+
+ when(mockConfig.getChangelogStream("storeName0"))
+ .thenReturn(Optional.empty());
+ when(mockConfig.getChangelogStream("storeName1"))
+ .thenReturn(Optional.of("changelog"));
+ when(mockConfig.getChangelogStream("storeName2"))
+ .thenReturn(Optional.of("changelog"));
+
+ CheckpointV1 checkpointV1 = mock(CheckpointV1.class);
+ when(checkpointV1.getVersion()).thenReturn((short) 1);
+ Map<String, Set<String>> factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(checkpointV1, storeNames, mockConfig);
+
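+ // For a CheckpointV1, only changelog-backed stores are restored, via the Kafka state backend factory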
+ Assert.assertEquals(1, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName1", "storeName2"),
+ factoriesToStores.get(StorageConfig.KAFKA_STATE_BACKEND_FACTORY));
+
+ factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(null, storeNames, mockConfig);
+
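+ // With no checkpoint, each store falls back to its first configured restore factory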
+ Assert.assertEquals(2, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName0"),
+ factoriesToStores.get("factory0"));
+ Assert.assertEquals(ImmutableSet.of("storeName1"),
+ factoriesToStores.get("factory2"));
+ }
+
+ @Test
+ public void testCheckpointV2BasedRestoreFactoryCreation() {
+ Set<String> storeNames = ImmutableSet.of("storeName0", "storeName1", "storeName2");
+
+ StorageConfig mockConfig = mock(StorageConfig.class);
+ when(mockConfig.getStoreRestoreFactories("storeName0"))
+ .thenReturn(ImmutableList.of("factory0", "factory1", "factory2"));
+ when(mockConfig.getStoreRestoreFactories("storeName1"))
+ .thenReturn(ImmutableList.of("factory2", "factory1"));
+ when(mockConfig.getStoreRestoreFactories("storeName2"))
+ .thenReturn(Collections.emptyList());
+
+ when(mockConfig.getChangelogStream("storeName0"))
+ .thenReturn(Optional.empty());
+ when(mockConfig.getChangelogStream("storeName1"))
+ .thenReturn(Optional.of("changelog"));
+ when(mockConfig.getChangelogStream("storeName2"))
+ .thenReturn(Optional.of("changelog"));
+
+ CheckpointV2 checkpointV2 = mock(CheckpointV2.class);
+ when(checkpointV2.getVersion()).thenReturn((short) 2);
+ when(checkpointV2.getStateCheckpointMarkers())
+ .thenReturn(ImmutableMap.of(
+ "factory0", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
+ "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
+ "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
+
+ Map<String, Set<String>> factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
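+ // For a CheckpointV2, each store is restored by its highest-precedence configured factory that has a
+ // state checkpoint marker for it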
+ Assert.assertEquals(2, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName0"),
+ factoriesToStores.get("factory0"));
+ Assert.assertEquals(ImmutableSet.of("storeName1"),
+ factoriesToStores.get("factory2"));
+
+ when(checkpointV2.getStateCheckpointMarkers())
+ .thenReturn(ImmutableMap.of(
+ "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
+ factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+ Assert.assertEquals(1, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName1", "storeName0"),
+ factoriesToStores.get("factory2"));
+
+ when(checkpointV2.getStateCheckpointMarkers())
+ .thenReturn(ImmutableMap.of(
+ "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
+ "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
+ factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+ Assert.assertEquals(2, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName0"),
+ factoriesToStores.get("factory1"));
+ Assert.assertEquals(ImmutableSet.of("storeName1"),
+ factoriesToStores.get("factory2"));
+
+ when(checkpointV2.getStateCheckpointMarkers())
+ .thenReturn(ImmutableMap.of(
+ "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
+ "factory2", ImmutableMap.of("storeName0", "", "storeName2", "")));
+ factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+ Assert.assertEquals(1, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName0", "storeName1"),
+ factoriesToStores.get("factory1"));
+
+ when(checkpointV2.getStateCheckpointMarkers())
+ .thenReturn(ImmutableMap.of(
+ "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
+ factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+ Assert.assertEquals(1, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName0", "storeName1"),
+ factoriesToStores.get("factory1"));
+
+ when(checkpointV2.getStateCheckpointMarkers())
+ .thenReturn(Collections.emptyMap());
+ factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+ Assert.assertEquals(2, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName0"),
+ factoriesToStores.get("factory0"));
+ Assert.assertEquals(ImmutableSet.of("storeName1"),
+ factoriesToStores.get("factory2"));
+
+ when(checkpointV2.getStateCheckpointMarkers())
+ .thenReturn(ImmutableMap.of(
+ "factory0", ImmutableMap.of("storeName1", "", "storeName2", ""),
+ "factory1", ImmutableMap.of("storeName1", "", "storeName2", ""),
+ "factory2", ImmutableMap.of("storeName0", "", "storeName2", "")));
+ factoriesToStores = this.containerStorageManager
+ .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+ Assert.assertEquals(2, factoriesToStores.size());
+ Assert.assertEquals(ImmutableSet.of("storeName1"),
+ factoriesToStores.get("factory1"));
+ Assert.assertEquals(ImmutableSet.of("storeName0"),
+ factoriesToStores.get("factory2"));
+ }
}
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
index 0f0b071..01c22f5 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -19,6 +19,8 @@
package org.apache.samza.checkpoint.kafka;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
import org.apache.samza.container.TaskName;
/**
@@ -28,6 +30,10 @@
public static final String CHECKPOINT_V1_KEY_TYPE = "checkpoint";
public static final String CHECKPOINT_V2_KEY_TYPE = "checkpoint-v2";
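+ // Maps each checkpoint key type to the checkpoint version it carries, used when applying the configured
+ // checkpoint read version precedence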
+ public static final Map<String, Short> CHECKPOINT_KEY_VERSIONS = ImmutableMap.of(
+ CHECKPOINT_V1_KEY_TYPE, (short) 1,
+ CHECKPOINT_V2_KEY_TYPE, (short) 2
+ );
/**
* The SystemStreamPartitionGrouperFactory configured for this job run. Since, checkpoints of different
* groupers are not compatible, we persist and validate them across job runs.
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 8fd21fe..5c061ae 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -152,9 +152,8 @@
}
if (topicPartitionToSSP.size() == 0) {
- String msg = String.format("Cannot start KafkaConsumerProxy without any registered TopicPartitions for %s", systemName);
- LOG.error(msg);
- throw new SamzaException(msg);
+ String msg = String.format("Started KafkaConsumerProxy without any registered TopicPartitions for %s", systemName);
+ LOG.warn(msg);
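+ // This is no longer fatal: changelog consumers are started unconditionally by the container storage
+ // manager, so a proxy may legitimately have no registered TopicPartitions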
}
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index a83a34b..e719439 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -34,6 +34,7 @@
import org.apache.samza.util.Logging
import org.apache.samza.{Partition, SamzaException}
+import java.{lang, util}
import scala.collection.mutable
/**
@@ -80,7 +81,7 @@
// for active containers, this will be set to true, while false for standby containers.
val stopConsumerAfterFirstRead: Boolean = new TaskConfig(config).getCheckpointManagerConsumerStopAfterFirstRead
- val checkpointReadVersion: Short = new TaskConfig(config).getCheckpointReadVersion
+ val checkpointReadVersions: util.List[lang.Short] = new TaskConfig(config).getCheckpointReadVersions
/**
* Create checkpoint stream prior to start.
@@ -287,16 +288,17 @@
val msgBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
try {
// if checkpoint key version does not match configured checkpoint version to read, skip the message.
- if (checkpointReadVersion == CheckpointV1.CHECKPOINT_VERSION &&
- KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE.equals(checkpointKey.getType)) {
- val msgBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
- val checkpoint = checkpointV1MsgSerde.fromBytes(msgBytes)
- checkpoints.put(checkpointKey.getTaskName, checkpoint)
- } else if (checkpointReadVersion == CheckpointV2.CHECKPOINT_VERSION &&
- KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE.equals(checkpointKey.getType)) {
- val checkpoint = checkpointV2MsgSerde.fromBytes(msgBytes)
- checkpoints.put(checkpointKey.getTaskName, checkpoint)
- } // else ignore and skip the message
+ if (checkpointReadVersions.contains(
+ KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(checkpointKey.getType))) {
+ if (!checkpoints.contains(checkpointKey.getTaskName) ||
+ shouldOverrideCheckpoint(checkpoints.get(checkpointKey.getTaskName), checkpointKey)) {
+ checkpoints.put(checkpointKey.getTaskName, deserializeCheckpoint(checkpointKey, msgBytes))
+ } // else ignore the lower-precedence checkpoint
+ } else {
+ // Ignore and skip the unknown checkpoint key type. We do not want to throw any exceptions for this case,
+ // for forwards compatibility with new checkpoint versions in the checkpoint topic
+ warn(s"Ignoring unknown checkpoint key type for checkpoint key: $checkpointKey")
+ }
} catch {
case e: Exception =>
if (validateCheckpoint) {
@@ -372,4 +374,29 @@
case _ => throw new SamzaException("Unknown checkpoint version: " + checkpoint.getVersion)
}
}
+
+ private def shouldOverrideCheckpoint(currentCheckpoint: Option[Checkpoint],
+ newCheckpointKey: KafkaCheckpointLogKey): Boolean = {
+ val newCheckpointVersion = KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(newCheckpointKey.getType)
+ if (newCheckpointVersion == null) {
+ // Unknown checkpoint version
+ throw new IllegalArgumentException("Unknown checkpoint key type: " + newCheckpointKey.getType +
+ " for checkpoint key: " + newCheckpointKey)
+ }
+ // Override the checkpoint if no checkpoint has been read for the task yet, or if the new checkpoint's
+ // version has higher precedence (appears earlier) in the configured checkpoint read versions list
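+ // e.g., with TaskConfig.CHECKPOINT_READ_VERSIONS = "2,1", a newly read v2 checkpoint replaces a previously
+ // read v1 checkpoint, but a later v1 does not replace an already-read v2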
+ currentCheckpoint.isEmpty ||
+ checkpointReadVersions.indexOf(newCheckpointVersion) <=
+ checkpointReadVersions.indexOf(currentCheckpoint.get.getVersion)
+ }
+
+ private def deserializeCheckpoint(checkpointKey: KafkaCheckpointLogKey, checkpointMsgBytes: Array[Byte]): Checkpoint = {
+ if (KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE.equals(checkpointKey.getType)) {
+ checkpointV1MsgSerde.fromBytes(checkpointMsgBytes)
+ } else if (KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE.equals(checkpointKey.getType)) {
+ checkpointV2MsgSerde.fromBytes(checkpointMsgBytes)
+ } else {
+ throw new IllegalArgumentException("Unknown checkpoint key type: " + checkpointKey.getType)
+ }
+ }
}
diff --git a/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
index 167bc78..4c8d8e4 100644
--- a/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -767,8 +767,12 @@
"stores.loggedStore1.clean.on.container.start" -> cleanStoreDirsOnStart.toString,
"stores.store1.key.serde" -> classOf[StringSerdeFactory].getCanonicalName,
"stores.store1.msg.serde" -> classOf[StringSerdeFactory].getCanonicalName,
+ "stores.store1.factory" -> mockStorageEngineFactory.getClass.getName,
+ "stores.store1.changelog" -> "system.stream",
+ "stores.loggedStore1.factory" -> mockStorageEngineFactory.getClass.getName,
"stores.loggedStore1.key.serde" -> classOf[StringSerdeFactory].getCanonicalName,
"stores.loggedStore1.msg.serde" -> classOf[StringSerdeFactory].getCanonicalName,
+ "stores.loggedStore1.changelog" -> "system.stream",
TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED -> "false").asJava)
var mockSerdes: Map[String, Serde[AnyRef]] = HashMap[String, Serde[AnyRef]]((classOf[StringSerdeFactory].getCanonicalName, Mockito.mock(classOf[Serde[AnyRef]])))
@@ -800,7 +804,8 @@
Mockito.mock(classOf[SamzaContainerMetrics]),
mockJobContext,
mockContainerContext,
- new mockKafkaChangelogBackendManager(changeLogSystemStreams),
+ ImmutableMap.of(StorageConfig.KAFKA_STATE_BACKEND_FACTORY,
+ new mockKafkaChangelogBackendManager(changeLogSystemStreams)),
new HashMap[TaskName, TaskInstanceCollector].asJava,
loggedStoreBaseDir,
TaskStorageManagerBuilder.defaultStoreBaseDir,
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index f940292..866904b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -194,7 +194,7 @@
.put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokerList)
.put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)
.put("task.checkpoint.system", checkpointSystemName)
- .put(TaskConfig.CHECKPOINT_READ_VERSION, "2")
+ .put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2")
.build())
// Skips reading any v1 checkpoints
@@ -211,6 +211,59 @@
}
@Test
+ def testCheckpointV1AndV2WriteAndReadV1V2PrecedenceList(): Unit = {
+ val checkpointTopic = "checkpoint-topic-1"
+ val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+ kcm1.register(taskName)
+ kcm1.createResources
+ kcm1.start
+ kcm1.stop
+
+ // check that start actually creates the topic with log compaction enabled
+ val topicConfig = adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties())
+
+ assertEquals(topicConfig, new KafkaConfig(config).getCheckpointTopicProperties())
+ assertEquals("compact", topicConfig.get("cleanup.policy"))
+ assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+ // read before any checkpoint is written should result in a null checkpoint
+ val readCp = readCheckpoint(checkpointTopic, taskName)
+ assertNull(readCp)
+
+ val checkpointV1 = new CheckpointV1(ImmutableMap.of(ssp, "offset-1"))
+ val checkpointV2 = new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, "offset-2"),
+ ImmutableMap.of("factory1", ImmutableMap.of("store1", "changelogOffset")))
+
+ val overrideConfig = new MapConfig(new ImmutableMap.Builder[String, String]()
+ .put(JobConfig.JOB_NAME, "some-job-name")
+ .put(JobConfig.JOB_ID, "i001")
+ .put(s"systems.$checkpointSystemName.samza.factory", classOf[KafkaSystemFactory].getCanonicalName)
+ .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokerList)
+ .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)
+ .put("task.checkpoint.system", checkpointSystemName)
+ .put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2,1")
+ .build())
+
+ // Still reads any v1 checkpoints due to precedence list
+ writeCheckpoint(checkpointTopic, taskName, checkpointV1)
+ assertEquals(checkpointV1, readCheckpoint(checkpointTopic, taskName, overrideConfig))
+
+ // writing a v2 checkpoint would allow reading it back
+ writeCheckpoint(checkpointTopic, taskName, checkpointV2)
+ assertEquals(checkpointV2, readCheckpoint(checkpointTopic, taskName, overrideConfig))
+
+ // a v1 checkpoint written after a v2 is still ignored on read, since v2 takes precedence
+ writeCheckpoint(checkpointTopic, taskName, checkpointV1)
+ assertEquals(checkpointV2, readCheckpoint(checkpointTopic, taskName, overrideConfig))
+
+ val newCheckpointV2 = new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, "offset-3"),
+ ImmutableMap.of("factory1", ImmutableMap.of("store1", "changelogOffset")))
+ // writing a newer v2 checkpoint returns the new v2 checkpoint on read
+ writeCheckpoint(checkpointTopic, taskName, newCheckpointV2)
+ assertEquals(newCheckpointV2, readCheckpoint(checkpointTopic, taskName, overrideConfig))
+ }
+
+ @Test
def testCheckpointValidationSkipped(): Unit = {
val checkpointTopic = "checkpoint-topic-1"
val kcm1 = createKafkaCheckpointManager(checkpointTopic, serde = new MockCheckpointSerde(),
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
index d5059b3..34d407f 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
@@ -106,8 +106,8 @@
storePropertiesBuilder.setPersistedToDisk(true);
}
// The store is durable iff it is backed by the task backup manager
- List<String> storeBackupManager = storageConfig.getStoreBackupFactory(storeName);
- storePropertiesBuilder.setIsDurable(!storeBackupManager.isEmpty());
+ List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
+ storePropertiesBuilder.setIsDurable(!storeBackupManagers.isEmpty());
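+ // e.g., a store with an empty StorageConfig.STORE_BACKUP_FACTORIES list is treated as non-durable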
int batchSize = storageConfigSubset.getInt(WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
int cacheSize = storageConfigSubset.getInt(OBJECT_CACHE_SIZE, Math.max(batchSize, DEFAULT_OBJECT_CACHE_SIZE));
diff --git a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index d0f1c2f..66eb79c 100644
--- a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -84,7 +84,7 @@
Arrays.asList(null, null, "98", "99", "4", "5", "5");
List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
Map<String, String> configOverrides = new HashMap<>(CONFIGS);
- configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSION, "2");
+ configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2");
secondRun(CHANGELOG_TOPIC,
expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, configOverrides);
}