Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores (#1657)
If a blob store state backend is available, side input store restores can be sped up significantly by bulk-restoring the initial state from the blob store and then using the Kafka side input topic only to catch up on the delta since the last checkpoint.
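For reference, a minimal sketch (in Java, mirroring the integration test configuration added in this patch) of the job-level settings that opt all stores, including side input stores, into blob store backup and restore. The blob store manager factory class name here is a placeholder; a real deployment supplies its own implementation:

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.BlobStoreConfig;
import org.apache.samza.config.StorageConfig;

public class BlobStoreBackendConfigSketch {
  public static Map<String, String> blobStoreBackendConfigs() {
    Map<String, String> configs = new HashMap<>();
    // Back up and restore job state via the blob store state backend.
    configs.put(StorageConfig.JOB_BACKUP_FACTORIES,
        "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
    configs.put(StorageConfig.JOB_RESTORE_FACTORIES,
        "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
    // Placeholder class name: point this at the BlobStoreManagerFactory for your blob store.
    configs.put(BlobStoreConfig.BLOB_STORE_MANAGER_FACTORY,
        "com.example.MyBlobStoreManagerFactory");
    return configs;
  }
}

With these settings, side input stores are bulk-restored from the blob store like any other store, and the side input topic is consumed only from the restored offsets onward.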
Also fixes a bug found during additional testing: in-memory stores configured with a changelog were not being restored correctly. The store instance created initially was retained but never restored; instead, a new store instance was created, restored from the changelog, and then thrown away.
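The fix, applied to both NonTransactionalStateTaskRestoreManager and TransactionalStateTaskRestoreManager below, is to skip store names that already have a non-persisted (in-memory) instance, so that the instance registered up front is also the one that gets restored. A simplified sketch of the corrected loop, with StorageEngine and the creation logic stubbed out by a hypothetical createStore helper:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class StoreCreationSketch {
  // Hypothetical stand-in for the real StorageEngine creation logic.
  static Object createStore(String storeName) {
    return new Object();
  }

  static Map<String, Object> createStorageEngines(Set<String> storeNames,
      Map<String, Object> nonPersistedStores) {
    Map<String, Object> storageEngines = new HashMap<>();
    // Reuse the in-memory (non-persisted) store instances created earlier.
    nonPersistedStores.forEach(storageEngines::put);
    // Previously this loop ran over every store name, so each in-memory store with a
    // changelog got a second, throwaway instance that received the restored state.
    storeNames.stream()
        .filter(storeName -> !nonPersistedStores.containsKey(storeName))
        .forEach(storeName -> storageEngines.put(storeName, createStore(storeName)));
    return storageEngines;
  }
}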
diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index 88e4a5b..b1c0b78 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -339,7 +339,7 @@
// Put non persisted stores
nonPersistedStores.forEach(storageEngines::put);
// Create persisted stores
- storeNames.forEach(storeName -> {
+ storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
boolean isLogged = this.storeChangelogs.containsKey(storeName);
File storeBaseDir = isLogged ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(),
diff --git a/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java b/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
index 981e847..1e5a298 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
@@ -30,6 +30,7 @@
import org.apache.samza.task.ReadableCoordinator;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCallbackFactory;
+import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,16 +45,20 @@
private final Set<SystemStreamPartition> taskSSPs;
private final TaskSideInputHandler taskSideInputHandler;
private final TaskInstanceMetrics metrics;
+ private final long commitMs;
+ private volatile boolean loggedManualCommitWarning = false;
public SideInputTask(
TaskName taskName,
Set<SystemStreamPartition> taskSSPs,
TaskSideInputHandler taskSideInputHandler,
- TaskInstanceMetrics metrics) {
+ TaskInstanceMetrics metrics,
+ long commitMs) {
this.taskName = taskName;
this.taskSSPs = taskSSPs;
this.taskSideInputHandler = taskSideInputHandler;
this.metrics = metrics;
+ this.commitMs = commitMs;
}
@Override
@@ -69,6 +74,14 @@
try {
this.taskSideInputHandler.process(envelope);
this.metrics.messagesActuallyProcessed().inc();
+      if (commitMs <= 0) {
+        if (!loggedManualCommitWarning) {
+          loggedManualCommitWarning = true; // log warning once
+          LOG.warn("Manual commit is enabled (task.commit.ms < 0). " +
+              "Side input task will request a commit after processing every message. " +
+              "This will incur a significant performance penalty and is not recommended.");
+        }
+        // commit every message. useful for integration tests. not desirable for regular use.
+        coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+      }
callback.complete();
} catch (Exception e) {
callback.failure(e);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index badeb28..15028b9 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -35,7 +35,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointV2;
@@ -70,6 +72,19 @@
private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
/**
+ * Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+ * since they are written during SideInputTask commit and copied to store checkpoint directory
+ * by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+ *
+ * We use a (process-wide) semaphore to ensure that such write and copy operations are thread-safe.
+ *
+ * To avoid deadlocks between the two run loops, the semaphore should be acquired and released within
+ * the same method call, and any such methods should not call each other while holding a permit.
+ * We use a Semaphore instead of a ReentrantLock to make such cases easier to detect.
+ */
+ private static final Semaphore SIDE_INPUT_OFFSET_FILE_SEMAPHORE = new Semaphore(1);
+
+ /**
* Fetch the starting offset for the input {@link SystemStreamPartition}
*
* Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
@@ -125,7 +140,6 @@
// if neither exists, we use 0L (the default return value of lastModified() when file does not exist
File offsetFileRefNew = new File(storeDir, OFFSET_FILE_NAME_NEW);
File offsetFileRefLegacy = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
- File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
File checkpointV2File = new File(storeDir, CHECKPOINT_FILE_NAME);
if (checkpointV2File.exists()) {
@@ -134,8 +148,18 @@
offsetFileLastModifiedTime = offsetFileRefNew.lastModified();
} else if (!isSideInput && offsetFileRefLegacy.exists()) {
offsetFileLastModifiedTime = offsetFileRefLegacy.lastModified();
- } else if (isSideInput && sideInputOffsetFileRefLegacy.exists()) {
- offsetFileLastModifiedTime = sideInputOffsetFileRefLegacy.lastModified();
+ } else if (isSideInput) {
+ try {
+ SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+ File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+ if (sideInputOffsetFileRefLegacy.exists()) {
+ offsetFileLastModifiedTime = sideInputOffsetFileRefLegacy.lastModified();
+ } else {
+ offsetFileLastModifiedTime = 0L;
+ }
+ } finally {
+ SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+ }
} else {
offsetFileLastModifiedTime = 0L;
}
@@ -223,9 +247,14 @@
// Now we write the old format offset file, which are different for store-offset and side-inputs
if (isSideInput) {
- offsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
- fileContents = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(offsets);
- fileUtil.writeWithChecksum(offsetFile, fileContents);
+ SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+ try {
+ offsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+ fileContents = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(offsets);
+ fileUtil.writeWithChecksum(offsetFile, fileContents);
+ } finally {
+ SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+ }
} else {
offsetFile = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
fileUtil.writeWithChecksum(offsetFile, offsets.entrySet().iterator().next().getValue());
@@ -245,6 +274,32 @@
fileUtil.writeWithChecksum(offsetFile, new String(fileContents));
}
+ public void copySideInputOffsetFileToCheckpointDir(File storeBaseDir,
+ TaskName taskName, String storeName, TaskMode taskMode, CheckpointId checkpointId) {
+ File storeDir = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode);
+ File checkpointDir = new File(getStoreCheckpointDir(storeDir, checkpointId));
+
+ SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+ try {
+ File storeSideInputOffsetsFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+ File checkpointDirSideInputOffsetsFile = new File(checkpointDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+ if (storeSideInputOffsetsFile.exists()) {
+ FileUtils.copyFile(storeSideInputOffsetsFile, checkpointDirSideInputOffsetsFile);
+ } else {
+ LOG.info("Did not find the file to copy: {} for taskName: {} taskMode: {} storeName: {} checkpointId: {}",
+ SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, taskName, taskMode, storeName, checkpointId);
+ }
+ } catch (IOException e) {
+ String msg = String.format(
+ "Error copying %s file to checkpoint dir for taskName: %s taskMode: %s storeName: %s checkpointId: %s",
+ SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, taskName, taskMode, storeName, checkpointId);
+ LOG.error(msg, e);
+ throw new SamzaException(msg, e);
+ } finally {
+ SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+ }
+ }
+
/**
* Delete the offset file for this store, if one exists.
* @param storeDir the directory of the store
@@ -286,7 +341,6 @@
File offsetFileRefNew = new File(storagePartitionDir, OFFSET_FILE_NAME_NEW);
File offsetFileRefLegacy = new File(storagePartitionDir, OFFSET_FILE_NAME_LEGACY);
- File sideInputOffsetFileRefLegacy = new File(storagePartitionDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
// First we check if the new offset file exists, if it does we read offsets from it regardless of old or new format,
// if it doesn't exist, we check if the store is non-sideInput and legacy-offset file exists, if so we read offsets
@@ -296,8 +350,18 @@
return readOffsetFile(storagePartitionDir, offsetFileRefNew.getName(), storeSSPs);
} else if (!isSideInput && offsetFileRefLegacy.exists()) {
return readOffsetFile(storagePartitionDir, offsetFileRefLegacy.getName(), storeSSPs);
- } else if (isSideInput && sideInputOffsetFileRefLegacy.exists()) {
- return readOffsetFile(storagePartitionDir, sideInputOffsetFileRefLegacy.getName(), storeSSPs);
+ } else if (isSideInput) {
+ Map<SystemStreamPartition, String> result = new HashMap<>();
+ SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+ try {
+ File sideInputOffsetFileRefLegacy = new File(storagePartitionDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+ if (sideInputOffsetFileRefLegacy.exists()) {
+ result = readOffsetFile(storagePartitionDir, sideInputOffsetFileRefLegacy.getName(), storeSSPs);
+ }
+ } finally {
+ SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+ }
+ return result;
} else {
return new HashMap<>();
}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index 0c01fa1..dca0fa2 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -23,12 +23,14 @@
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.commons.io.FileUtils;
@@ -42,6 +44,7 @@
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
@@ -63,6 +66,7 @@
private final ContainerStorageManager containerStorageManager;
private final Map<String, TaskBackupManager> stateBackendToBackupManager;
private final Partition taskChangelogPartition;
+ private final StorageConfig config;
private final StorageManagerUtil storageManagerUtil;
private final ExecutorService backupExecutor;
private final File durableStoreBaseDir;
@@ -81,6 +85,7 @@
this.stateBackendToBackupManager = stateBackendToBackupManager;
this.taskChangelogPartition = changelogPartition;
this.checkpointManager = checkpointManager;
+ this.config = new StorageConfig(config);
this.backupExecutor = backupExecutor;
this.durableStoreBaseDir = durableStoreBaseDir;
this.storeChangelogs = storeChangelogs;
@@ -122,7 +127,15 @@
storageEngines.forEach((storeName, storageEngine) -> {
if (storageEngine.getStoreProperties().isPersistedToDisk() &&
storageEngine.getStoreProperties().isDurableStore()) {
- storageEngine.checkpoint(checkpointId);
+ Optional<Path> checkpointDir = storageEngine.checkpoint(checkpointId);
+
+ // if checkpoint is for a side input store
+ if (checkpointDir.isPresent() && !config.getSideInputs(storeName).isEmpty()) {
+ storageManagerUtil.copySideInputOffsetFileToCheckpointDir(
+ durableStoreBaseDir, taskName, storeName, TaskMode.Active, checkpointId);
+ LOG.debug("Copied side input offsets file to checkpoint dir for taskName: {} storeName: {} checkpointId: {}",
+ taskName, storeName, checkpointId);
+ }
}
});
long checkpointNs = System.nanoTime() - checkpointStartNs;
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 104bc2c..2122a16 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -178,9 +178,10 @@
public void close() {
TaskName taskName = taskModel.getTaskName();
storeEngines.forEach((storeName, storeEngine) -> {
- if (storeEngine.getStoreProperties().isPersistedToDisk())
+ if (storeEngine.getStoreProperties().isPersistedToDisk()) {
storeEngine.stop();
- LOG.info("Stopped persistent store: {} in task: {}", storeName, taskName);
+ LOG.info("Stopped persistent store: {} in task: {}", storeName, taskName);
+ }
});
}
@@ -192,7 +193,7 @@
// Put non persisted stores
nonPersistedStores.forEach(storageEngines::put);
// Create persisted stores
- storeNames.forEach(storeName -> {
+ storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
boolean isLogged = this.storeChangelogs.containsKey(storeName);
File storeBaseDir = isLogged ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(),
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 64e52c5..c9ee065 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -51,6 +51,7 @@
import org.apache.samza.metrics.Gauge;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeManager;
+import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
@@ -270,12 +271,30 @@
LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
}
taskCheckpoints.put(taskName, taskCheckpoint);
- Map<String, Set<String>> backendFactoryStoreNames =
+
+ Map<String, Set<String>> backendFactoryToStoreNames =
ContainerStorageManagerUtil.getBackendFactoryStoreNames(
nonSideInputStoreNames, taskCheckpoint, new StorageConfig(config));
+
+ Map<String, Set<String>> backendFactoryToSideInputStoreNames =
+ ContainerStorageManagerUtil.getBackendFactoryStoreNames(
+ sideInputStoreNames, taskCheckpoint, new StorageConfig(config));
+
+ // include side input stores for (initial bulk) restore if backed up using blob store state backend
+ String blobStoreStateBackendFactory = BlobStoreStateBackendFactory.class.getName();
+ if (backendFactoryToSideInputStoreNames.containsKey(blobStoreStateBackendFactory)) {
+ Set<String> sideInputStoreNames = backendFactoryToSideInputStoreNames.get(blobStoreStateBackendFactory);
+
+ if (backendFactoryToStoreNames.containsKey(blobStoreStateBackendFactory)) {
+ backendFactoryToStoreNames.get(blobStoreStateBackendFactory).addAll(sideInputStoreNames);
+ } else {
+ backendFactoryToStoreNames.put(blobStoreStateBackendFactory, sideInputStoreNames);
+ }
+ }
+
Map<String, TaskRestoreManager> taskStoreRestoreManagers =
ContainerStorageManagerUtil.createTaskRestoreManagers(
- taskName, backendFactoryStoreNames, restoreStateBackendFactories,
+ taskName, backendFactoryToStoreNames, restoreStateBackendFactories,
storageEngineFactories, storeConsumers,
inMemoryStores, systemAdmins, restoreExecutor,
taskModel, jobContext, containerContext,
@@ -360,9 +379,13 @@
// Stop each store consumer once
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
- // Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is
+ // Now create persistent non-side-input stores in read-write mode, leave non-persistent and side-input stores as-is
+ Set<String> inMemoryStoreNames =
+ ContainerStorageManagerUtil.getInMemoryStoreNames(this.storageEngineFactories, this.config);
+ Set<String> storesToCreate = nonSideInputStoreNames.stream()
+ .filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
this.taskStores = ContainerStorageManagerUtil.createTaskStores(
- nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames,
+ storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
this.containerModel, this.jobContext, this.containerContext,
this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
index 5b3d90a..d9c9cd4 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
@@ -171,14 +171,7 @@
StorageManagerUtil storageManagerUtil,
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
Config config) {
- StorageConfig storageConfig = new StorageConfig(config);
- Set<String> inMemoryStoreNames = storageEngineFactories.keySet().stream()
- .filter(storeName -> {
- Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
- return storeFactory.isPresent() && storeFactory.get()
- .equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY);
- })
- .collect(Collectors.toSet());
+ Set<String> inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config);
return ContainerStorageManagerUtil.createTaskStores(
inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, storeDirectoryPaths,
@@ -187,6 +180,19 @@
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
}
+ public static Set<String> getInMemoryStoreNames(
+ Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+ Config config) {
+ StorageConfig storageConfig = new StorageConfig(config);
+ return storageEngineFactories.keySet().stream()
+ .filter(storeName -> {
+ Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
+ return storeFactory.isPresent() && storeFactory.get()
+ .equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY);
+ })
+ .collect(Collectors.toSet());
+ }
+
/**
* Returns a map from store names to their corresponding changelog system consumers.
* @return map of store name to its changelog system consumer.
@@ -308,7 +314,10 @@
}
/**
- * Return a map of backend factory names to set of stores that should be restored using it
+ * Returns a map of backend factory names to subset of provided storeNames that should be restored using it.
+ * For CheckpointV1, only includes stores that should be restored using a configured changelog.
+ * For CheckpointV2, associates stores with the highest precedence configured restore factory that has a SCM in
+ * the checkpoint, or the highest precedence restore factory configured if there are no SCMs in the checkpoint.
*/
public static Map<String, Set<String>> getBackendFactoryStoreNames(
Set<String> storeNames, Checkpoint checkpoint, StorageConfig storageConfig) {
@@ -327,8 +336,8 @@
Set<String> nonChangelogStores = storeNames.stream()
.filter(storeName -> !changelogStores.contains(storeName))
.collect(Collectors.toSet());
- LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1,"
- + "restore for the store will be skipped",
+ LOG.info("Stores: {}, do not have a configured store changelogs for checkpoint V1,"
+ + "changelog restore for the store will be skipped.",
nonChangelogStores);
}
} else if (checkpoint == null || checkpoint.getVersion() == 2) {
@@ -342,8 +351,8 @@
if (storeFactories.isEmpty()) {
// If the restore factory is not configured for the store and the store does not have a changelog topic
- LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs,"
- + "restore for the store will be skipped",
+ LOG.info("Store: {} does not have a configured restore factory or a changelog topic, "
+ + "restore for the store will be skipped.",
storeName);
} else {
// Search the ordered list for the first matched state backend factory in the checkpoint
@@ -359,8 +368,8 @@
} else { // Restore factories configured but no checkpoints found
// Use first configured restore factory
factoryName = storeFactories.get(0);
- LOG.warn("No matching checkpoints found for configured factories: {}, " +
- "defaulting to using the first configured factory with no checkpoints", storeFactories);
+ LOG.warn("No matching SCMs found for configured restore factories: {} for storeName: {}, " +
+ "defaulting to using the first configured factory with no SCM.", storeFactories, storeName);
}
if (!backendFactoryStoreNames.containsKey(factoryName)) {
backendFactoryStoreNames.put(factoryName, new HashSet<>());
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
index 96390ba..3ccaf1a 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
@@ -222,7 +222,8 @@
sideInputTaskMetrics.put(taskName, sideInputMetrics);
RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs,
- taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+ taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName),
+ new TaskConfig(config).getCommitMs());
sideInputTasks.put(taskName, sideInputTask);
}
});
diff --git a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index ef8409c..99a9e77 100644
--- a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -19,12 +19,15 @@
package org.apache.samza.checkpoint;
+import com.google.common.collect.ImmutableSet;
+
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.samza.config.JobConfig;
@@ -135,7 +138,11 @@
// run the application
RunApplicationContext context = runApplication(
- new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", configs);
+ new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC,
+ ImmutableSet.of(STORE_NAME), Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC),
+ Collections.emptySet(), Collections.emptyMap(),
+ Optional.empty(), Optional.empty(), Optional.empty()),
+ "myApp", configs);
// wait for the application to finish
context.getRunner().waitForFinish();
@@ -162,7 +169,11 @@
// run the application
RunApplicationContext context = runApplication(
- new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, changelogTopic)), "myApp", overriddenConfigs);
+ new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC,
+ ImmutableSet.of(STORE_NAME), Collections.singletonMap(STORE_NAME, changelogTopic),
+ Collections.emptySet(), Collections.emptyMap(),
+ Optional.empty(), Optional.empty(), Optional.empty()),
+ "myApp", overriddenConfigs);
// wait for the application to finish
context.getRunner().waitForFinish();
diff --git a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
index 99c54b0..9ab8706 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
@@ -19,11 +19,14 @@
package org.apache.samza.storage;
+import com.google.common.collect.ImmutableList;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
@@ -34,6 +37,7 @@
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
@@ -61,15 +65,34 @@
public static final Logger LOG = LoggerFactory.getLogger(MyStatefulApplication.class);
private static Map<String, List<String>> initialStoreContents = new HashMap<>();
+ private static Map<String, List<String>> initialInMemoryStoreContents = new HashMap<>();
+ private static Map<String, List<String>> initialSideInputStoreContents = new HashMap<>();
private static boolean crashedOnce = false;
+
private final String inputSystem;
private final String inputTopic;
- private final Map<String, String> storeToChangelog;
+ private final Set<String> storeNames;
+ private final Map<String, String> storeNamesToChangelog;
+ private final Set<String> inMemoryStoreNames;
+ private final Map<String, String> inMemoryStoreNamesToChangelog;
+ private final Optional<String> sideInputStoreName;
+ private final Optional<String> sideInputTopic;
+ private final Optional<SideInputsProcessor> sideInputProcessor;
- public MyStatefulApplication(String inputSystem, String inputTopic, Map<String, String> storeToChangelog) {
+ public MyStatefulApplication(String inputSystem, String inputTopic,
+ Set<String> storeNames, Map<String, String> storeNamesToChangelog,
+ Set<String> inMemoryStoreNames, Map<String, String> inMemoryStoreNamesToChangelog,
+ Optional<String> sideInputStoreName, Optional<String> sideInputTopic,
+ Optional<SideInputsProcessor> sideInputProcessor) {
this.inputSystem = inputSystem;
this.inputTopic = inputTopic;
- this.storeToChangelog = storeToChangelog;
+ this.storeNames = storeNames;
+ this.storeNamesToChangelog = storeNamesToChangelog;
+ this.inMemoryStoreNames = inMemoryStoreNames;
+ this.inMemoryStoreNamesToChangelog = inMemoryStoreNamesToChangelog;
+ this.sideInputStoreName = sideInputStoreName;
+ this.sideInputTopic = sideInputTopic;
+ this.sideInputProcessor = sideInputProcessor;
}
@Override
@@ -81,18 +104,46 @@
TaskApplicationDescriptor desc = appDescriptor
.withInputStream(isd)
- .withTaskFactory((StreamTaskFactory) () -> new MyTask(storeToChangelog.keySet()));
+ .withTaskFactory((StreamTaskFactory) () -> new MyTask(storeNames, inMemoryStoreNames, sideInputStoreName));
- storeToChangelog.forEach((storeName, changelogTopic) -> {
- RocksDbTableDescriptor<String, String> td = new RocksDbTableDescriptor<>(storeName, serde)
- .withChangelogStream(changelogTopic)
- .withChangelogReplicationFactor(1);
+ inMemoryStoreNames.forEach(storeName -> {
+ InMemoryTableDescriptor<String, String> imtd;
+ if (inMemoryStoreNamesToChangelog.containsKey(storeName)) {
+ imtd = new InMemoryTableDescriptor<>(storeName, serde)
+ .withChangelogStream(inMemoryStoreNamesToChangelog.get(storeName));
+ } else {
+ imtd = new InMemoryTableDescriptor<>(storeName, serde);
+ }
+
+ desc.withTable(imtd);
+ });
+
+ storeNames.forEach(storeName -> {
+ RocksDbTableDescriptor<String, String> td;
+ if (storeNamesToChangelog.containsKey(storeName)) {
+ String changelogTopic = storeNamesToChangelog.get(storeName);
+ td = new RocksDbTableDescriptor<>(storeName, serde)
+ .withChangelogStream(changelogTopic)
+ .withChangelogReplicationFactor(1);
+ } else {
+ td = new RocksDbTableDescriptor<>(storeName, serde);
+ }
desc.withTable(td);
});
+
+ if (sideInputStoreName.isPresent()) {
+ RocksDbTableDescriptor<String, String> sideInputStoreTd =
+ new RocksDbTableDescriptor<>(sideInputStoreName.get(), serde)
+ .withSideInputs(ImmutableList.of(sideInputTopic.get()))
+ .withSideInputsProcessor(sideInputProcessor.get());
+ desc.withTable(sideInputStoreTd);
+ }
}
public static void resetTestState() {
initialStoreContents = new HashMap<>();
+ initialInMemoryStoreContents = new HashMap<>();
+ initialSideInputStoreContents = new HashMap<>();
crashedOnce = false;
}
@@ -100,27 +151,64 @@
return initialStoreContents;
}
+ public static Map<String, List<String>> getInitialInMemoryStoreContents() {
+ return initialInMemoryStoreContents;
+ }
+
+ public static Map<String, List<String>> getInitialSideInputStoreContents() {
+ return initialSideInputStoreContents;
+ }
+
static class MyTask implements StreamTask, InitableTask {
private final Set<KeyValueStore<String, String>> stores = new HashSet<>();
private final Set<String> storeNames;
+ private final Set<String> inMemoryStoreNames;
+ private final Optional<String> sideInputStoreName;
- MyTask(Set<String> storeNames) {
+ MyTask(Set<String> storeNames, Set<String> inMemoryStoreNames, Optional<String> sideInputStoreName) {
this.storeNames = storeNames;
+ this.inMemoryStoreNames = inMemoryStoreNames;
+ this.sideInputStoreName = sideInputStoreName;
}
@Override
public void init(Context context) {
storeNames.forEach(storeName -> {
KeyValueStore<String, String> store = (KeyValueStore<String, String>) context.getTaskContext().getStore(storeName);
- stores.add(store);
+ stores.add(store); // any input messages will be written to all 'stores'
KeyValueIterator<String, String> storeEntries = store.all();
- List<String> storeInitialChangelog = new ArrayList<>();
+ List<String> storeInitialContents = new ArrayList<>();
while (storeEntries.hasNext()) {
- storeInitialChangelog.add(storeEntries.next().getValue());
+ storeInitialContents.add(storeEntries.next().getValue());
}
- initialStoreContents.put(storeName, storeInitialChangelog);
+ initialStoreContents.put(storeName, storeInitialContents);
storeEntries.close();
});
+
+ inMemoryStoreNames.forEach(storeName -> {
+ KeyValueStore<String, String> store =
+ (KeyValueStore<String, String>) context.getTaskContext().getStore(storeName);
+ stores.add(store); // any input messages will be written to all 'stores'.
+ KeyValueIterator<String, String> storeEntries = store.all();
+ List<String> storeInitialContents = new ArrayList<>();
+ while (storeEntries.hasNext()) {
+ storeInitialContents.add(storeEntries.next().getValue());
+ }
+ initialInMemoryStoreContents.put(storeName, storeInitialContents);
+ storeEntries.close();
+ });
+
+ if (sideInputStoreName.isPresent()) {
+ KeyValueStore<String, String> sideInputStore =
+ (KeyValueStore<String, String>) context.getTaskContext().getStore(sideInputStoreName.get());
+ KeyValueIterator<String, String> sideInputStoreEntries = sideInputStore.all();
+ List<String> sideInputStoreInitialContents = new ArrayList<>();
+ while (sideInputStoreEntries.hasNext()) {
+ sideInputStoreInitialContents.add(sideInputStoreEntries.next().getValue());
+ }
+ initialSideInputStoreContents.put(sideInputStoreName.get(), sideInputStoreInitialContents);
+ sideInputStoreEntries.close();
+ }
}
@Override
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java
new file mode 100644
index 0000000..15b4d94
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.storage.SideInputsProcessor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
+import org.apache.samza.util.FileUtil;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseStateBackendIntegrationTest extends StreamApplicationIntegrationTestHarness {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseStateBackendIntegrationTest.class);
+
+ public void initialRun(
+ String inputSystem,
+ String inputTopicName,
+ String sideInputTopicName,
+ List<String> inputMessages,
+ List<String> sideInputMessages,
+ Set<String> regularStoreNames,
+ Map<String, String> regularStoreChangelogTopics,
+ Set<String> inMemoryStoreNames,
+ Map<String, String> inMemoryStoreChangelogTopics,
+ String sideInputStoreName,
+ List<String> expectedChangelogMessagesAfterInitialRun,
+ Map<String, String> overriddenConfigs) {
+ // create input topic and produce the first batch of input messages
+ createTopic(inputTopicName, 1);
+ inputMessages.forEach(m -> produceMessage(inputTopicName, 0, m, m));
+
+ // create side input topic and produce the first batch of side input messages
+ createTopic(sideInputTopicName, 1);
+ sideInputMessages.forEach(m -> produceMessage(sideInputTopicName, 0, m, m));
+
+ // verify that the input messages were produced successfully
+ if (inputMessages.size() > 0) {
+ List<ConsumerRecord<String, String>> inputRecords =
+ consumeMessages(inputTopicName, inputMessages.size());
+ List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+ Assert.assertEquals(inputMessages, readInputMessages);
+ }
+
+ // verify that the side input messages were produced successfully
+ if (sideInputMessages.size() > 0) {
+ List<ConsumerRecord<String, String>> sideInputRecords =
+ consumeMessages(sideInputTopicName, sideInputMessages.size());
+ List<String> readSideInputMessages = sideInputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+ Assert.assertEquals(sideInputMessages, readSideInputMessages);
+ }
+
+ // run the application
+ RunApplicationContext context = runApplication(
+ new MyStatefulApplication(inputSystem, inputTopicName,
+ regularStoreNames, regularStoreChangelogTopics,
+ inMemoryStoreNames, inMemoryStoreChangelogTopics,
+ Optional.of(sideInputStoreName), Optional.of(sideInputTopicName), Optional.of(new MySideInputProcessor())),
+ "myApp", overriddenConfigs);
+
+ // wait for the application to finish
+ context.getRunner().waitForFinish();
+
+ // consume and verify the changelog messages
+ HashSet<String> changelogTopics = new HashSet<>(regularStoreChangelogTopics.values());
+ changelogTopics.addAll(inMemoryStoreChangelogTopics.values());
+ changelogTopics.forEach(changelogTopicName -> {
+ if (expectedChangelogMessagesAfterInitialRun.size() > 0) {
+ List<ConsumerRecord<String, String>> changelogRecords =
+ consumeMessages(changelogTopicName, expectedChangelogMessagesAfterInitialRun.size());
+ List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+ Assert.assertEquals(expectedChangelogMessagesAfterInitialRun, changelogMessages);
+ }
+ });
+
+ LOG.info("Finished initial run");
+ }
+
+ public void secondRun(
+ boolean hostAffinity,
+ String loggedStoreBaseDir,
+ String inputSystem,
+ String inputTopicName,
+ String sideInputTopicName,
+ List<String> inputMessages,
+ List<String> sideInputMessages,
+ Set<String> regularStoreNames,
+ Map<String, String> regularStoreChangelogTopics,
+ Set<String> inMemoryStoreNames,
+ Map<String, String> inMemoryStoreChangelogTopics,
+ String sideInputStoreName,
+ List<String> expectedChangelogMessagesAfterSecondRun,
+ List<String> expectedInitialStoreContents,
+ List<String> expectedInitialInMemoryStoreContents,
+ List<String> expectedInitialSideInputStoreContents,
+ Map<String, String> overriddenConfigs) {
+ // clear the local store directory
+ if (!hostAffinity) {
+ new FileUtil().rm(new File(loggedStoreBaseDir));
+ }
+
+ // produce the second batch of input messages
+ inputMessages.forEach(m -> produceMessage(inputTopicName, 0, m, m));
+
+ // produce the second batch of side input messages
+ sideInputMessages.forEach(m -> produceMessage(sideInputTopicName, 0, m, m));
+
+ // run the application
+ RunApplicationContext context = runApplication(
+ new MyStatefulApplication(inputSystem, inputTopicName,
+ regularStoreNames, regularStoreChangelogTopics,
+ inMemoryStoreNames, inMemoryStoreChangelogTopics,
+ Optional.of(sideInputStoreName), Optional.of(sideInputTopicName), Optional.of(new MySideInputProcessor())),
+ "myApp", overriddenConfigs);
+
+ // wait for the application to finish
+ context.getRunner().waitForFinish();
+
+ // verify the store contents during startup
+ for (String storeName: regularStoreNames) {
+ Assert.assertEquals(expectedInitialStoreContents,
+ MyStatefulApplication.getInitialStoreContents().get(storeName));
+ }
+
+ // verify the memory store contents during startup
+ for (String storeName: inMemoryStoreNames) {
+ Assert.assertEquals(expectedInitialInMemoryStoreContents,
+ MyStatefulApplication.getInitialInMemoryStoreContents().get(storeName));
+ }
+
+ // verify that the side input store contents during startup include messages up to head of topic
+ Assert.assertEquals(expectedInitialSideInputStoreContents,
+ MyStatefulApplication.getInitialSideInputStoreContents().get(sideInputStoreName));
+
+ // consume and verify any additional changelog messages for stores with changelogs
+ HashSet<String> changelogTopics = new HashSet<>(regularStoreChangelogTopics.values());
+ changelogTopics.addAll(inMemoryStoreChangelogTopics.values());
+ changelogTopics.forEach(changelogTopicName -> {
+ List<ConsumerRecord<String, String>> changelogRecords =
+ consumeMessages(changelogTopicName, expectedChangelogMessagesAfterSecondRun.size());
+ List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+ Assert.assertEquals(expectedChangelogMessagesAfterSecondRun, changelogMessages);
+ });
+ }
+
+ static class MySideInputProcessor implements SideInputsProcessor, Serializable {
+ @Override
+ public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
+ return ImmutableSet.of(new Entry<>(message.getKey(), message.getMessage()));
+ }
+ }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
new file mode 100644
index 0000000..f654459
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.BlobStoreConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.storage.SideInputsProcessor;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.test.util.TestBlobStoreManager;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class BlobStoreStateBackendIntegrationTest extends BaseStateBackendIntegrationTest {
+ @Parameterized.Parameters(name = "hostAffinity={0}")
+ public static Collection<Boolean> data() {
+ return Arrays.asList(true, false);
+ }
+
+ private static final String INPUT_SYSTEM = "kafka";
+ private static final String INPUT_TOPIC = "inputTopic";
+ private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+ private static final String REGULAR_STORE_NAME = "regularStore";
+ private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+ private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+ private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+
+ private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+ private static final String BLOB_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "blob-store").getAbsolutePath();
+ private static final String BLOB_STORE_LEDGER_DIR = new File(BLOB_STORE_BASE_DIR, "ledger").getAbsolutePath();
+
+ private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+ put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+ put(JobConfig.PROCESSOR_ID, "0");
+ put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+ put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1");
+ put(TaskConfig.CHECKPOINT_WRITE_VERSIONS, "1, 2");
+ put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+ put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+ put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+ put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
+
+ // override store level state backend for in memory stores to use Kafka changelogs
+ put(String.format(StorageConfig.STORE_BACKUP_FACTORIES, IN_MEMORY_STORE_NAME),
+ "org.apache.samza.storage.KafkaChangelogStateBackendFactory");
+ put(String.format(StorageConfig.STORE_RESTORE_FACTORIES, IN_MEMORY_STORE_NAME),
+ "org.apache.samza.storage.KafkaChangelogStateBackendFactory");
+
+ put(StorageConfig.JOB_BACKUP_FACTORIES, "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
+ put(StorageConfig.JOB_RESTORE_FACTORIES, "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
+ put(BlobStoreConfig.BLOB_STORE_MANAGER_FACTORY, "org.apache.samza.test.util.TestBlobStoreManagerFactory");
+
+ put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+ put(TestBlobStoreManager.BLOB_STORE_BASE_DIR, BLOB_STORE_BASE_DIR);
+ put(TestBlobStoreManager.BLOB_STORE_LEDGER_DIR, BLOB_STORE_LEDGER_DIR);
+ } };
+
+ private final boolean hostAffinity;
+
+ public BlobStoreStateBackendIntegrationTest(boolean hostAffinity) {
+ this.hostAffinity = hostAffinity;
+ }
+
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+ // reset static state shared with task between each parameterized iteration
+ MyStatefulApplication.resetTestState();
+ FileUtil fileUtil = new FileUtil();
+ fileUtil.rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+ // no need to clear ledger dir since subdir of blob store base dir
+ fileUtil.rm(new File(BLOB_STORE_BASE_DIR)); // always clear local "blob store" on startup
+ }
+
+ @Test
+ public void testStopAndRestart() {
+ List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+ List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+ initialRun(
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesOnInitialRun,
+ sideInputMessagesOnInitialRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ Collections.emptyMap(),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ Collections.emptyList(),
+ CONFIGS);
+
+ Pair<String, SnapshotIndex> lastRegularSnapshot =
+ verifyLedger(REGULAR_STORE_NAME, Optional.empty(), hostAffinity, false, false);
+ Pair<String, SnapshotIndex> lastSideInputSnapshot =
+ verifyLedger(SIDE_INPUT_STORE_NAME, Optional.empty(), hostAffinity, true,
+ false /* no side input offsets file will be present during initial restore */);
+
+ // verifies transactional state too
+ List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+ List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+ List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+ // verifies that in-memory stores backed by changelogs work correctly
+ // (requires overriding store level state backends explicitly)
+ List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+ List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+ expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+ secondRun(
+ hostAffinity,
+ LOGGED_STORE_BASE_DIR,
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesBeforeSecondRun,
+ sideInputMessagesBeforeSecondRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ Collections.emptyMap(),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ Collections.emptyList(),
+ expectedInitialStoreContentsOnSecondRun,
+ expectedInitialInMemoryStoreContentsOnSecondRun,
+ expectedInitialSideInputStoreContentsOnSecondRun,
+ CONFIGS);
+
+ verifyLedger(REGULAR_STORE_NAME, Optional.of(lastRegularSnapshot), hostAffinity, false, false);
+ verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot), hostAffinity, true, true);
+ }
+
+ /**
+ * Verifies the ledger for TestBlobStoreManager.
+ * @param startingSnapshot snapshot file name and files present in snapshot at the beginning of verification (from previous run), if any.
+ * @return Pair of the latest snapshot file name and its SnapshotIndex at the time of verification
+ */
+ private static Pair<String, SnapshotIndex> verifyLedger(String storeName,
+ Optional<Pair<String, SnapshotIndex>> startingSnapshot,
+ boolean hostAffinity, boolean verifySideInputOffsetsUploaded, boolean verifySideInputOffsetsRestored) {
+ Path ledgerLocation = Paths.get(BLOB_STORE_LEDGER_DIR);
+ try {
+ File filesAddedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_ADDED).toFile();
+ Set<String> filesAdded = Files.lines(filesAddedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+ File filesReadLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_READ).toFile();
+ Set<String> filesRead = Files.lines(filesReadLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+ File filesDeletedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_DELETED).toFile();
+ Set<String> filesDeleted = Files.lines(filesDeletedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+ File filesTTLUpdatedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_TTL_UPDATED).toFile();
+ Set<String> filesTTLUpdated = Files.lines(filesTTLUpdatedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+
+ // 1. test that files read = files present in last snapshot *at run start* + snapshot file itself + previous snapshot files
+ if (startingSnapshot.isPresent() && !hostAffinity) { // no restore if host affinity (local state already present)
+ Set<String> filesPresentInStartingSnapshot = startingSnapshot.get().getRight()
+ .getDirIndex().getFilesPresent().stream()
+ .map(fi -> fi.getBlobs().get(0).getBlobId()).collect(Collectors.toSet());
+ Set<String> filesToRestore = new HashSet<>();
+ filesToRestore.add(startingSnapshot.get().getLeft());
+ filesToRestore.addAll(filesPresentInStartingSnapshot);
+ // assert that all files to restore in starting snapshot + starting snapshot itself are present in files read
+ assertTrue(Sets.difference(filesToRestore, filesRead).isEmpty());
+ // assert that the remaining read files are all snapshot indexes (for post commit cleanup)
+ assertTrue(Sets.difference(filesRead, filesToRestore).stream().allMatch(s -> s.contains(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)));
+ }
+
+ // read files added again as ordered list, not set, to get last file added
+ List<String> filesAddedLines = Files.readAllLines(filesAddedLedger.toPath()).stream().filter(l -> l.contains(storeName)).collect(Collectors.toList());
+ String lastFileAdded = filesAddedLines.get(filesAddedLines.size() - 1); // get last line.
+ assertTrue(lastFileAdded.contains(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)); // assert is a snapshot
+ SnapshotIndex lastSnapshotIndex = new SnapshotIndexSerde().fromBytes(Files.readAllBytes(Paths.get(lastFileAdded)));
+ Set<String> filesPresentInLastSnapshot = lastSnapshotIndex.getDirIndex().getFilesPresent().stream()
+ .map(fi -> fi.getBlobs().get(0).getBlobId()).collect(Collectors.toSet());
+
+ // 2. test that all added files were ttl reset
+ assertEquals(filesAdded, filesTTLUpdated);
+
+ // 3. test that files deleted = files added - files present in last snapshot + snapshot file itself
+ // i.e., net remaining files (files added - files deleted) = files present in last snapshot + snapshot file itself.
+ assertEquals(Sets.difference(filesAdded, filesDeleted),
+ Sets.union(filesPresentInLastSnapshot, Collections.singleton(lastFileAdded)));
+
+ // 4. test that the files restored/added for side input stores contains side input offsets file
+ if (verifySideInputOffsetsUploaded) {
+ assertTrue(filesAdded.stream().anyMatch(f -> f.contains(StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY)));
+ }
+
+ if (!hostAffinity && verifySideInputOffsetsRestored) { // only read / restored if no host affinity
+ assertTrue(filesRead.stream().anyMatch(f -> f.contains(StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY)));
+ }
+
+ return Pair.of(lastFileAdded, lastSnapshotIndex);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static class MySideInputProcessor implements SideInputsProcessor, Serializable {
+ @Override
+ public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
+ return ImmutableSet.of(new Entry<>(message.getKey(), message.getMessage()));
+ }
+ }
+}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
new file mode 100644
index 0000000..66ef3eb
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+@RunWith(value = Parameterized.class)
+public class KafkaNonTransactionalStateIntegrationTest extends BaseStateBackendIntegrationTest {
+ @Parameterized.Parameters(name = "hostAffinity={0}")
+ public static Collection<Boolean> data() {
+ return Arrays.asList(true, false);
+ }
+
+ private static final String INPUT_SYSTEM = "kafka";
+ private static final String INPUT_TOPIC = "inputTopic";
+ private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+ private static final String REGULAR_STORE_NAME = "regularStore";
+ private static final String REGULAR_STORE_CHANGELOG_TOPIC = "changelog";
+ private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+ private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+ private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+ private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+
+ private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+ put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+ put(JobConfig.PROCESSOR_ID, "0");
+ put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+ put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+ put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+ put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+ put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in-progress commits
+
+ put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "false");
+ put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
+ put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+ } };
+
+ private final boolean hostAffinity;
+
+ public KafkaNonTransactionalStateIntegrationTest(boolean hostAffinity) {
+ this.hostAffinity = hostAffinity;
+ }
+
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+ // reset static state shared with the task between parameterized iterations
+ MyStatefulApplication.resetTestState();
+ new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+ }
+
+ @Test
+ public void testStopAndRestart() {
+ List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+ List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+ List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+ initialRun(
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesOnInitialRun,
+ sideInputMessagesOnInitialRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ expectedChangelogMessagesAfterInitialRun,
+ CONFIGS);
+
+ // first two are the uncommitted writes for keys 98 and 99 from the last run, re-applied on re-processing (non-transactional state does not revert them)
+ List<String> expectedChangelogMessagesAfterSecondRun =
+ Arrays.asList("98", "99", "4", "5", "5");
+ List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+ List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+ List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "98", "99");
+ List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "98", "99");
+ List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+ expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+ secondRun(
+ hostAffinity,
+ LOGGED_STORE_BASE_DIR,
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesBeforeSecondRun,
+ sideInputMessagesBeforeSecondRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ expectedChangelogMessagesAfterSecondRun,
+ expectedInitialStoreContentsOnSecondRun,
+ expectedInitialInMemoryStoreContentsOnSecondRun,
+ expectedInitialSideInputStoreContentsOnSecondRun,
+ CONFIGS);
+ }
+}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
new file mode 100644
index 0000000..8bec243
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+@RunWith(value = Parameterized.class)
+public class KafkaTransactionalStateIntegrationTest extends BaseStateBackendIntegrationTest {
+ @Parameterized.Parameters(name = "hostAffinity={0}")
+ public static Collection<Boolean> data() {
+ return Arrays.asList(true, false);
+ }
+
+ private static final String INPUT_SYSTEM = "kafka";
+ private static final String INPUT_TOPIC = "inputTopic";
+ private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+ private static final String REGULAR_STORE_NAME = "regularStore";
+ private static final String REGULAR_STORE_CHANGELOG_TOPIC = "changelog";
+ private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+ private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+ private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+ private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+
+ private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+ put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+ put(JobConfig.PROCESSOR_ID, "0");
+ put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+ put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+ put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+ put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+ put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in-progress commits
+
+ put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
+ put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
+ put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+ } };
+
+ private final boolean hostAffinity;
+
+ public KafkaTransactionalStateIntegrationTest(boolean hostAffinity) {
+ this.hostAffinity = hostAffinity;
+ }
+
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+ // reset static state shared with the task between parameterized iterations
+ MyStatefulApplication.resetTestState();
+ new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+ }
+
+ @Test
+ public void testStopAndRestart() {
+ List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+ List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+ List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+ initialRun(
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesOnInitialRun,
+ sideInputMessagesOnInitialRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ expectedChangelogMessagesAfterInitialRun,
+ CONFIGS);
+
+ // first two are reverts for uncommitted messages from last run for keys 98 and 99
+ List<String> expectedChangelogMessagesAfterSecondRun =
+ Arrays.asList(null, null, "98", "99", "4", "5", "5");
+ List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+ List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+ List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+ List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+ List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+ expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+ secondRun(
+ hostAffinity,
+ LOGGED_STORE_BASE_DIR,
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesBeforeSecondRun,
+ sideInputMessagesBeforeSecondRun,
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ expectedChangelogMessagesAfterSecondRun,
+ expectedInitialStoreContentsOnSecondRun,
+ expectedInitialInMemoryStoreContentsOnSecondRun,
+ expectedInitialSideInputStoreContentsOnSecondRun,
+ CONFIGS);
+ }
+
+ @Test
+ public void testWithEmptyChangelogFromInitialRun() {
+ // expected changelog messages will always match since we'll read 0 messages
+ initialRun(
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ ImmutableList.of("crash_once"),
+ Collections.emptyList(),
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ Collections.emptyList(),
+ CONFIGS);
+
+ List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+ List<String> expectedChangelogMessagesAfterSecondRun = Arrays.asList("4", "5", "5");
+ secondRun(hostAffinity,
+ LOGGED_STORE_BASE_DIR,
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesBeforeSecondRun,
+ Collections.emptyList(),
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ expectedChangelogMessagesAfterSecondRun,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ CONFIGS);
+ }
+
+ @Test
+ public void testWithNewChangelogAfterInitialRun() {
+ List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+ List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+ initialRun(
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesOnInitialRun,
+ Collections.emptyList(),
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+ SIDE_INPUT_STORE_NAME,
+ expectedChangelogMessagesAfterInitialRun,
+ CONFIGS);
+
+ // deleting the topic via the admin client doesn't seem to work; it times out after up to 60 seconds.
+ // simulate delete topic by changing the changelog topic instead.
+ String newChangelogTopic = "changelog2";
+ String newInMemoryStoreChangelogTopic = "inMemChangelog2";
+ List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+ List<String> expectedChangelogMessagesAfterSecondRun = Arrays.asList("98", "99", "4", "5", "5");
+ secondRun(hostAffinity,
+ LOGGED_STORE_BASE_DIR,
+ INPUT_SYSTEM,
+ INPUT_TOPIC,
+ SIDE_INPUT_TOPIC,
+ inputMessagesBeforeSecondRun,
+ Collections.emptyList(),
+ ImmutableSet.of(REGULAR_STORE_NAME),
+ ImmutableMap.of(REGULAR_STORE_NAME, newChangelogTopic),
+ ImmutableSet.of(IN_MEMORY_STORE_NAME),
+ ImmutableMap.of(IN_MEMORY_STORE_NAME, newInMemoryStoreChangelogTopic),
+ SIDE_INPUT_STORE_NAME,
+ expectedChangelogMessagesAfterSecondRun,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ CONFIGS);
+ }
+}
\ No newline at end of file
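
The two testStopAndRestart variants differ only in how the uncommitted writes for keys 98 and 99 from the crashed first run show up in the changelog on restart. A hedged sketch of that arithmetic, assuming nothing beyond the message lists used in the tests above:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ExpectedChangelogSketch {
  public static void main(String[] args) {
    List<String> uncommittedFromFirstRun = Arrays.asList("98", "99");
    List<String> secondRunWrites = Arrays.asList("4", "5", "5");

    // non-transactional: uncommitted writes are simply re-applied on re-processing
    List<String> nonTransactional = new ArrayList<>(uncommittedFromFirstRun);
    nonTransactional.addAll(secondRunWrites);
    System.out.println(nonTransactional); // [98, 99, 4, 5, 5]

    // transactional: uncommitted writes are first reverted (null deletes), then re-applied
    List<String> transactional = new ArrayList<>(Collections.nCopies(uncommittedFromFirstRun.size(), (String) null));
    transactional.addAll(uncommittedFromFirstRun);
    transactional.addAll(secondRunWrites);
    System.out.println(transactional); // [null, null, 98, 99, 4, 5, 5]
  }
}
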
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
deleted file mode 100644
index 771f93d..0000000
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import com.google.common.collect.ImmutableList;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.storage.MyStatefulApplication;
-import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
-import org.apache.samza.util.FileUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-@RunWith(value = Parameterized.class)
-public class TransactionalStateIntegrationTest extends StreamApplicationIntegrationTestHarness {
- @Parameterized.Parameters(name = "hostAffinity={0}")
- public static Collection<Boolean> data() {
- return Arrays.asList(true, false);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateIntegrationTest.class);
-
- private static final String INPUT_TOPIC = "inputTopic";
- private static final String INPUT_SYSTEM = "kafka";
- private static final String STORE_NAME = "store";
- private static final String CHANGELOG_TOPIC = "changelog";
- private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
- private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
- put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
- put(JobConfig.PROCESSOR_ID, "0");
- put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
- put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
- put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
- put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
- put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
- put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
- put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
- put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
- } };
-
- private final boolean hostAffinity;
-
- public TransactionalStateIntegrationTest(boolean hostAffinity) {
- this.hostAffinity = hostAffinity;
- }
-
- @Before
- @Override
- public void setUp() {
- super.setUp();
- // reset static state shared with task between each parameterized iteration
- MyStatefulApplication.resetTestState();
- new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
- }
-
- @Test
- public void testStopAndRestart() {
- List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
- // double check collectors.flush
- List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
- initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
- // first two are reverts for uncommitted messages from last run for keys 98 and 99
- List<String> expectedChangelogMessagesAfterSecondRun =
- Arrays.asList(null, null, "98", "99", "4", "5", "5");
- List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
- secondRun(CHANGELOG_TOPIC,
- expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, CONFIGS);
- }
-
- @Test
- public void testWithEmptyChangelogFromInitialRun() {
- // expected changelog messages will always match since we'll read 0 messages
- initialRun(ImmutableList.of("crash_once"), Collections.emptyList());
- secondRun(CHANGELOG_TOPIC, ImmutableList.of("4", "5", "5"), Collections.emptyList(), CONFIGS);
- }
-
- @Test
- public void testWithNewChangelogAfterInitialRun() {
- List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
- List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
- initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesAfterInitialRun);
-
- // admin client delete topic doesn't seem to work, times out up to 60 seconds.
- // simulate delete topic by changing the changelog topic instead.
- String newChangelogTopic = "changelog2";
- LOG.info("Changing changelog topic to: {}", newChangelogTopic);
- secondRun(newChangelogTopic, ImmutableList.of("98", "99", "4", "5", "5"), Collections.emptyList(), CONFIGS);
- }
-
- private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) {
- // create input topic and produce the first batch of input messages
- createTopic(INPUT_TOPIC, 1);
- inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
- // verify that the input messages were produced successfully
- if (inputMessages.size() > 0) {
- List<ConsumerRecord<String, String>> inputRecords =
- consumeMessages(INPUT_TOPIC, inputMessages.size());
- List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
- Assert.assertEquals(inputMessages, readInputMessages);
- }
-
- // run the application
- RunApplicationContext context = runApplication(
- new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)),
- "myApp", CONFIGS);
-
- // wait for the application to finish
- context.getRunner().waitForFinish();
-
- // consume and verify the changelog messages
- if (expectedChangelogMessages.size() > 0) {
- List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(CHANGELOG_TOPIC, expectedChangelogMessages.size());
- List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
- Assert.assertEquals(expectedChangelogMessages, changelogMessages);
- }
-
- LOG.info("Finished initial run");
- }
-
- private void secondRun(String changelogTopic, List<String> expectedChangelogMessages,
- List<String> expectedInitialStoreContents, Map<String, String> overriddenConfigs) {
- // clear the local store directory
- if (!hostAffinity) {
- new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR));
- }
-
- // produce the second batch of input messages
-
- List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown");
- inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
- // run the application
- RunApplicationContext context = runApplication(
- new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, changelogTopic)),
- "myApp", overriddenConfigs);
-
- // wait for the application to finish
- context.getRunner().waitForFinish();
-
- // consume and verify any additional changelog messages
- List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(changelogTopic, expectedChangelogMessages.size());
- List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
- Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-
- // verify the store contents during startup (this is after changelog verification to ensure init has completed)
- Assert.assertEquals(expectedInitialStoreContents, MyStatefulApplication.getInitialStoreContents().get(STORE_NAME));
- }
-}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
deleted file mode 100644
index 765bd45..0000000
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import com.google.common.collect.ImmutableList;
-
-import com.google.common.collect.ImmutableMap;
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.samza.application.SamzaApplication;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.storage.MyStatefulApplication;
-import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
-import org.apache.samza.util.FileUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Runs the same tests as {@link TransactionalStateIntegrationTest}, except with an additional
- * unused store with a changelog. Ensures that the stores are isolated, e.g. if the checkpointed
- * changelog offset for one is null (no data).
- */
-@RunWith(value = Parameterized.class)
-public class TransactionalStateMultiStoreIntegrationTest extends StreamApplicationIntegrationTestHarness {
- @Parameterized.Parameters(name = "hostAffinity={0}")
- public static Collection<Boolean> data() {
- return Arrays.asList(true, false);
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateMultiStoreIntegrationTest.class);
-
- private static final String INPUT_TOPIC = "inputTopic";
- private static final String INPUT_SYSTEM = "kafka";
- private static final String STORE_1_NAME = "store1";
- private static final String STORE_2_NAME = "store2";
- private static final String STORE_1_CHANGELOG = "changelog1";
- private static final String STORE_2_CHANGELOG = "changelog2";
- private static final String APP_NAME = "myApp";
- private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
- private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
- put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
- put(JobConfig.PROCESSOR_ID, "0");
- put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
- put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
- put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
- put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
- put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
- put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
- put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
- put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
- } };
-
- private final boolean hostAffinity;
-
- public TransactionalStateMultiStoreIntegrationTest(boolean hostAffinity) {
- this.hostAffinity = hostAffinity;
- }
-
- @Before
- @Override
- public void setUp() {
- super.setUp();
- // reset static state shared with task between each parameterized iteration
- MyStatefulApplication.resetTestState();
- new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
- }
-
- @Test
- public void testStopAndRestart() {
- List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
- List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
- initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
- // first two are reverts for uncommitted messages from last run for keys 98 and 99
- List<String> expectedChangelogMessagesOnSecondRun =
- Arrays.asList(null, null, "98", "99", "4", "5", "5");
- List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
- secondRun(STORE_1_CHANGELOG,
- expectedChangelogMessagesOnSecondRun, expectedInitialStoreContentsOnSecondRun);
- }
-
- @Test
- public void testWithEmptyChangelogFromInitialRun() {
- // expected changelog messages will always match since we'll read 0 messages
- initialRun(ImmutableList.of("crash_once"), Collections.emptyList());
- secondRun(STORE_1_CHANGELOG, ImmutableList.of("4", "5", "5"), Collections.emptyList());
- }
-
- @Test
- public void testWithNewChangelogAfterInitialRun() {
- List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
- List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
- initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
- // admin client delete topic doesn't seem to work, times out up to 60 seconds.
- // simulate delete topic by changing the changelog topic instead.
- String newChangelogTopic = "changelog3";
- LOG.info("Changing changelog topic to: {}", newChangelogTopic);
- secondRun(newChangelogTopic, ImmutableList.of("98", "99", "4", "5", "5"), Collections.emptyList());
- }
-
- private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) {
- // create input topic and produce the first batch of input messages
- createTopic(INPUT_TOPIC, 1);
- inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
- // verify that the input messages were produced successfully
- if (inputMessages.size() > 0) {
- List<ConsumerRecord<String, String>> inputRecords =
- consumeMessages(INPUT_TOPIC, inputMessages.size());
- List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
- Assert.assertEquals(inputMessages, readInputMessages);
- }
-
- SamzaApplication app = new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, ImmutableMap.of(
- STORE_1_NAME, STORE_1_CHANGELOG,
- STORE_2_NAME, STORE_2_CHANGELOG
- ));
-
- // run the application
- RunApplicationContext context = runApplication(app, APP_NAME, CONFIGS);
-
-
- // consume and verify the changelog messages
- if (expectedChangelogMessages.size() > 0) {
- List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(STORE_1_CHANGELOG, expectedChangelogMessages.size());
- List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
- Assert.assertEquals(expectedChangelogMessages, changelogMessages);
- }
-
- // wait for the application to finish
- context.getRunner().waitForFinish();
- LOG.info("Finished initial run");
- }
-
- private void secondRun(String changelogTopic, List<String> expectedChangelogMessages,
- List<String> expectedInitialStoreContents) {
- // clear the local store directory
- if (!hostAffinity) {
- new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR));
- }
-
- // produce the second batch of input messages
-
- List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown");
- inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
- SamzaApplication app = new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, ImmutableMap.of(
- STORE_1_NAME, changelogTopic,
- STORE_2_NAME, STORE_2_CHANGELOG
- ));
- // run the application
- RunApplicationContext context = runApplication(app, APP_NAME, CONFIGS);
-
- // wait for the application to finish
- context.getRunner().waitForFinish();
-
- // consume and verify any additional changelog messages
- List<ConsumerRecord<String, String>> changelogRecords =
- consumeMessages(changelogTopic, expectedChangelogMessages.size());
- List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
- Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-
- // verify the store contents during startup (this is after changelog verification to ensure init has completed)
- Assert.assertEquals(expectedInitialStoreContents, MyStatefulApplication.getInitialStoreContents().get(STORE_1_NAME));
- }
-}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
new file mode 100644
index 0000000..d247c10
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBlobStoreManager implements BlobStoreManager {
+ private static final Logger LOG = LoggerFactory.getLogger(TestBlobStoreManager.class);
+ public static final String BLOB_STORE_BASE_DIR = "blob.store.base.dir";
+ public static final String BLOB_STORE_LEDGER_DIR = "blob.store.ledger.dir";
+ public static final String LEDGER_FILES_ADDED = "filesAdded";
+ public static final String LEDGER_FILES_READ = "filesRead";
+ public static final String LEDGER_FILES_DELETED = "filesRemoved";
+ public static final String LEDGER_FILES_TTL_UPDATED = "filesTTLUpdated";
+
+ private final Path stateLocation;
+ private final File filesAddedLedger;
+ private final File filesReadLedger;
+ private final File filesDeletedLedger;
+ private final File filesTTLUpdatedLedger;
+
+ public TestBlobStoreManager(Config config, ExecutorService executorService) {
+ this.stateLocation = Paths.get(config.get(BLOB_STORE_BASE_DIR));
+ Path ledgerLocation = Paths.get(config.get(BLOB_STORE_LEDGER_DIR));
+ try {
+ if (Files.notExists(ledgerLocation)) {
+ Files.createDirectories(ledgerLocation);
+ }
+
+ filesAddedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_ADDED).toFile();
+ filesReadLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_READ).toFile();
+ filesDeletedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_DELETED).toFile();
+ filesTTLUpdatedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_TTL_UPDATED).toFile();
+
+ FileUtils.touch(filesAddedLedger);
+ FileUtils.touch(filesReadLedger);
+ FileUtils.touch(filesDeletedLedger);
+ FileUtils.touch(filesTTLUpdatedLedger);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void init() {
+ }
+
+ @Override
+ public CompletionStage<String> put(InputStream inputStream, Metadata metadata) {
+ String payloadPath = metadata.getPayloadPath();
+ String suffix;
+ if (payloadPath.equals(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)) {
+ // include a unique ID (a fake checkpoint ID) in the path for snapshot index blobs to avoid overwriting them
+ suffix = payloadPath + "-" + CheckpointId.create();
+ } else {
+ String[] parts = payloadPath.split("/");
+ String checkpointId = parts[parts.length - 2];
+ String fileName = parts[parts.length - 1];
+ suffix = checkpointId + "/" + fileName;
+ }
+
+ Path destination = Paths.get(stateLocation.toString(), metadata.getJobName(), metadata.getJobId(),
+ metadata.getTaskName(), metadata.getStoreName(), suffix);
+ LOG.info("Creating file at {}", destination);
+ try {
+ FileUtils.writeStringToFile(filesAddedLedger, destination + "\n", Charset.defaultCharset(), true);
+ FileUtils.copyInputStreamToFile(inputStream, destination.toFile());
+ } catch (IOException e) {
+ throw new RuntimeException("Error creating file " + destination, e);
+ }
+ return CompletableFuture.completedFuture(destination.toString());
+ }
+
+ @Override
+ public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata) {
+ LOG.info("Reading file at {}", id);
+ try {
+ FileUtils.writeStringToFile(filesReadLedger, id + "\n", Charset.defaultCharset(), true);
+ Path path = Paths.get(id);
+ Files.copy(path, outputStream);
+ outputStream.flush();
+ } catch (IOException e) {
+ throw new RuntimeException("Error reading file for id " + id, e);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletionStage<Void> delete(String id, Metadata metadata) {
+ LOG.info("Deleting file at {}", id);
+ try {
+ FileUtils.writeStringToFile(filesDeletedLedger, id + "\n", Charset.defaultCharset(), true);
+ Files.delete(Paths.get(id));
+ } catch (IOException e) {
+ throw new RuntimeException("Error deleting file for id " + id, e);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
+ LOG.info("Removing TTL (no-op) for file at {}", blobId);
+ try {
+ FileUtils.writeStringToFile(filesTTLUpdatedLedger, blobId + "\n", Charset.defaultCharset(), true);
+ } catch (IOException e) {
+ throw new RuntimeException("Error updating ttl for id " + blobId, e);
+ }
+
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void close() {
+ }
+}
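
put() above derives blob IDs from the payload path: snapshot index blobs get a unique per-upload suffix so successive uploads don't overwrite each other, while store files keep their checkpointId/fileName suffix. A small sketch of that parsing, using a hypothetical payload path:

public class BlobIdSuffixSketch {
  public static void main(String[] args) {
    // hypothetical store-file payload path of the form .../<checkpointId>/<fileName>
    String payloadPath = "mystore/checkpoint-123/000042.sst";
    String[] parts = payloadPath.split("/");
    String checkpointId = parts[parts.length - 2]; // "checkpoint-123"
    String fileName = parts[parts.length - 1];     // "000042.sst"
    String suffix = checkpointId + "/" + fileName;
    System.out.println(suffix); // prints: checkpoint-123/000042.sst
  }
}
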
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java
new file mode 100644
index 0000000..449f1c1
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;
+
+public class TestBlobStoreManagerFactory implements BlobStoreManagerFactory {
+ @Override
+ public BlobStoreManager getBackupBlobStoreManager(Config config, ExecutorService backupExecutor) {
+ return new TestBlobStoreManager(config, backupExecutor);
+ }
+
+ @Override
+ public BlobStoreManager getRestoreBlobStoreManager(Config config, ExecutorService restoreExecutor) {
+ return new TestBlobStoreManager(config, restoreExecutor);
+ }
+}
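
A minimal usage sketch for the factory, assuming it is invoked directly from a test. The directory paths are hypothetical; the config keys are the ones defined on TestBlobStoreManager above.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.storage.blobstore.BlobStoreManager;
import org.apache.samza.test.util.TestBlobStoreManager;
import org.apache.samza.test.util.TestBlobStoreManagerFactory;

public class TestBlobStoreManagerFactorySketch {
  public static void main(String[] args) {
    Config config = new MapConfig(ImmutableMap.of(
        TestBlobStoreManager.BLOB_STORE_BASE_DIR, "/tmp/test-blob-store",      // hypothetical path
        TestBlobStoreManager.BLOB_STORE_LEDGER_DIR, "/tmp/test-blob-ledger")); // hypothetical path
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // backup and restore managers are independent instances over the same directories
    BlobStoreManager backupManager = new TestBlobStoreManagerFactory().getBackupBlobStoreManager(config, executor);
    backupManager.init();
    // ... exercise put()/delete()/removeTTL() and inspect the ledger files ...
    backupManager.close();
    executor.shutdown();
  }
}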