Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores (#1657)

Allow bulk restore from the blob store for side input stores. If a blob store state backend is configured, side input store restores can be sped up significantly by bulk-restoring the initial state from the blob store and using the Kafka side input topic only to catch up on the delta since the last checkpoint.
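
For illustration, a minimal sketch of the configuration that opts a job into this path (the factory class name and StorageConfig constants are the real ones exercised by the integration test below; the wrapper class is hypothetical):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.StorageConfig;

    public class BlobStoreRestoreConfigSketch {
      public static Map<String, String> configs() {
        Map<String, String> configs = new HashMap<>();
        // Back up and restore all durable stores (including side input stores) via the blob store.
        // On restart, side input stores are bulk-restored from the latest blob store checkpoint,
        // and the Kafka side input topic is consumed only from the checkpointed offsets onwards.
        configs.put(StorageConfig.JOB_BACKUP_FACTORIES,
            "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
        configs.put(StorageConfig.JOB_RESTORE_FACTORIES,
            "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
        return configs;
      }
    }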

Also fixes a bug found during additional testing: in-memory stores configured with a changelog were not being restored correctly. The store instance created initially was retained but never restored; instead, a new store instance was created, restored from the changelog, and then thrown away.
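
The shape of the fix is the stream().filter(...) guard added to both NonTransactionalStateTaskRestoreManager and TransactionalStateTaskRestoreManager below. A self-contained sketch, with placeholder strings standing in for storage engine instances:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class RestoreManagerFixSketch {
      public static void main(String[] args) {
        Set<String> storeNames = Set.of("inMemoryStore", "rocksDbStore");
        Map<String, String> nonPersistedStores = Map.of("inMemoryStore", "initialInstance");
        Map<String, String> storageEngines = new HashMap<>();

        // Register the in-memory (non-persisted) instances that were already created up front.
        nonPersistedStores.forEach(storageEngines::put);

        // The fix: skip store names that already have a non-persisted instance, so the restore
        // manager restores the instance the task actually uses, rather than creating a second
        // instance, restoring it from the changelog, and then discarding it.
        storeNames.stream()
            .filter(s -> !nonPersistedStores.containsKey(s))
            .forEach(storeName -> storageEngines.put(storeName, "persistedInstance"));

        // prints both entries; inMemoryStore keeps its initially created instance
        System.out.println(storageEngines);
      }
    }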
diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index 88e4a5b..b1c0b78 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -339,7 +339,7 @@
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
       boolean isLogged = this.storeChangelogs.containsKey(storeName);
       File storeBaseDir = isLogged ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
       File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(),
diff --git a/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java b/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
index 981e847..1e5a298 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
@@ -30,6 +30,7 @@
 import org.apache.samza.task.ReadableCoordinator;
 import org.apache.samza.task.TaskCallback;
 import org.apache.samza.task.TaskCallbackFactory;
+import org.apache.samza.task.TaskCoordinator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,16 +45,20 @@
   private final Set<SystemStreamPartition> taskSSPs;
   private final TaskSideInputHandler taskSideInputHandler;
   private final TaskInstanceMetrics metrics;
+  private final long commitMs;
+  private volatile boolean loggedManualCommitWarning = false;
 
   public SideInputTask(
       TaskName taskName,
       Set<SystemStreamPartition> taskSSPs,
       TaskSideInputHandler taskSideInputHandler,
-      TaskInstanceMetrics metrics) {
+      TaskInstanceMetrics metrics,
+      long commitMs) {
     this.taskName = taskName;
     this.taskSSPs = taskSSPs;
     this.taskSideInputHandler = taskSideInputHandler;
     this.metrics = metrics;
+    this.commitMs = commitMs;
   }
 
   @Override
@@ -69,6 +74,16 @@
     try {
       this.taskSideInputHandler.process(envelope);
       this.metrics.messagesActuallyProcessed().inc();
+      if (commitMs <= 0) {
+        if (!loggedManualCommitWarning) {
+          loggedManualCommitWarning = true; // log the warning only once
+          LOG.warn("Manual commit is enabled (task.commit.ms <= 0). " +
+              "Side input task will request a commit after processing every message. " +
+              "This will incur a significant performance penalty and is not recommended.");
+        }
+        // commit after every message; useful for integration tests, not desirable for regular use
+        coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+      }
       callback.complete();
     } catch (Exception e) {
       callback.failure(e);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index badeb28..15028b9 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -35,7 +35,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.checkpoint.CheckpointV2;
@@ -70,6 +72,19 @@
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
   /**
+   * Unlike checkpoint or offset files, the side input offset file can have multiple writers / readers,
+   * since it is written during SideInputTask commit and copied to the store checkpoint directory
+   * by TaskStorageCommitManager during regular TaskInstance commit (these commits run on separate run loops).
+   *
+   * We use a (process-wide) semaphore to ensure that such write and copy operations are thread-safe.
+   *
+   * To avoid deadlocks between the two run loops, the semaphore should be acquired and released within
+   * the same method call, and any such methods should not call each other while holding a permit.
+   * We use a Semaphore instead of a ReentrantLock to make such cases easier to detect.
+   */
+  private static final Semaphore SIDE_INPUT_OFFSET_FILE_SEMAPHORE = new Semaphore(1);
+
+  /**
    * Fetch the starting offset for the input {@link SystemStreamPartition}
    *
    * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
@@ -125,7 +140,6 @@
       // if neither exists, we use 0L (the default return value of lastModified() when file does not exist
       File offsetFileRefNew = new File(storeDir, OFFSET_FILE_NAME_NEW);
       File offsetFileRefLegacy = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
-      File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
       File checkpointV2File = new File(storeDir, CHECKPOINT_FILE_NAME);
 
       if (checkpointV2File.exists()) {
@@ -134,8 +148,18 @@
         offsetFileLastModifiedTime = offsetFileRefNew.lastModified();
       } else if (!isSideInput && offsetFileRefLegacy.exists()) {
         offsetFileLastModifiedTime = offsetFileRefLegacy.lastModified();
-      } else if (isSideInput && sideInputOffsetFileRefLegacy.exists()) {
-        offsetFileLastModifiedTime = sideInputOffsetFileRefLegacy.lastModified();
+      } else if (isSideInput) {
+        try {
+          SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+          File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+          if (sideInputOffsetFileRefLegacy.exists()) {
+            offsetFileLastModifiedTime = sideInputOffsetFileRefLegacy.lastModified();
+          } else {
+            offsetFileLastModifiedTime = 0L;
+          }
+        } finally {
+          SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+        }
       } else {
         offsetFileLastModifiedTime = 0L;
       }
@@ -223,9 +247,14 @@
 
     // Now we write the old format offset file, which are different for store-offset and side-inputs
     if (isSideInput) {
-      offsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
-      fileContents = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(offsets);
-      fileUtil.writeWithChecksum(offsetFile, fileContents);
+      SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+      try {
+        offsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+        fileContents = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(offsets);
+        fileUtil.writeWithChecksum(offsetFile, fileContents);
+      } finally {
+        SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+      }
     } else {
       offsetFile = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
       fileUtil.writeWithChecksum(offsetFile, offsets.entrySet().iterator().next().getValue());
@@ -245,6 +274,37 @@
     fileUtil.writeWithChecksum(offsetFile, new String(fileContents));
   }
 
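+  /**
+   * Copies the side input offset file for the provided store, if present, from the current store directory
+   * to the store checkpoint directory for the provided checkpoint id. Guarded by
+   * SIDE_INPUT_OFFSET_FILE_SEMAPHORE since the file may be written concurrently during SideInputTask commit.
+   */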
+  public void copySideInputOffsetFileToCheckpointDir(File storeBaseDir,
+      TaskName taskName, String storeName, TaskMode taskMode, CheckpointId checkpointId) {
+    File storeDir = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode);
+    File checkpointDir = new File(getStoreCheckpointDir(storeDir, checkpointId));
+
+    SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+    try {
+      File storeSideInputOffsetsFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+      File checkpointDirSideInputOffsetsFile = new File(checkpointDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+      if (storeSideInputOffsetsFile.exists()) {
+        FileUtils.copyFile(storeSideInputOffsetsFile, checkpointDirSideInputOffsetsFile);
+      } else {
+        LOG.info("Did not find the file to copy: {} for taskName: {} taskMode: {} storeName: {} checkpointId: {}",
+            SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, taskName, taskMode, storeName, checkpointId);
+      }
+    } catch (IOException e) {
+      String msg = String.format(
+          "Error copying %s file to checkpoint dir for taskName: %s taskMode: %s storeName: %s checkpointId: %s",
+          SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, taskName, taskMode, storeName, checkpointId);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    } finally {
+      SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+    }
+  }
+
   /**
    * Delete the offset file for this store, if one exists.
    * @param storeDir the directory of the store
@@ -286,7 +346,6 @@
 
     File offsetFileRefNew = new File(storagePartitionDir, OFFSET_FILE_NAME_NEW);
     File offsetFileRefLegacy = new File(storagePartitionDir, OFFSET_FILE_NAME_LEGACY);
-    File sideInputOffsetFileRefLegacy = new File(storagePartitionDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
 
     // First we check if the new offset file exists, if it does we read offsets from it regardless of old or new format,
     // if it doesn't exist, we check if the store is non-sideInput and legacy-offset file exists, if so we read offsets
@@ -296,8 +355,18 @@
       return readOffsetFile(storagePartitionDir, offsetFileRefNew.getName(), storeSSPs);
     } else if (!isSideInput && offsetFileRefLegacy.exists()) {
       return readOffsetFile(storagePartitionDir, offsetFileRefLegacy.getName(), storeSSPs);
-    } else if (isSideInput && sideInputOffsetFileRefLegacy.exists()) {
-      return readOffsetFile(storagePartitionDir, sideInputOffsetFileRefLegacy.getName(), storeSSPs);
+    } else if (isSideInput) {
+      Map<SystemStreamPartition, String> result = new HashMap<>();
+      SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+      try {
+        File sideInputOffsetFileRefLegacy = new File(storagePartitionDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+        if (sideInputOffsetFileRefLegacy.exists()) {
+          result = readOffsetFile(storagePartitionDir, sideInputOffsetFileRefLegacy.getName(), storeSSPs);
+        }
+      } finally {
+        SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+      }
+      return result;
     } else {
       return new HashMap<>();
     }
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index 0c01fa1..dca0fa2 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -23,12 +23,14 @@
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.io.FileUtils;
@@ -42,6 +44,7 @@
 import org.apache.samza.checkpoint.CheckpointV2;
 import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
@@ -63,6 +66,7 @@
   private final ContainerStorageManager containerStorageManager;
   private final Map<String, TaskBackupManager> stateBackendToBackupManager;
   private final Partition taskChangelogPartition;
+  private final StorageConfig config;
   private final StorageManagerUtil storageManagerUtil;
   private final ExecutorService backupExecutor;
   private final File durableStoreBaseDir;
@@ -81,6 +85,7 @@
     this.stateBackendToBackupManager = stateBackendToBackupManager;
     this.taskChangelogPartition = changelogPartition;
     this.checkpointManager = checkpointManager;
+    this.config = new StorageConfig(config);
     this.backupExecutor = backupExecutor;
     this.durableStoreBaseDir = durableStoreBaseDir;
     this.storeChangelogs = storeChangelogs;
@@ -122,7 +127,15 @@
     storageEngines.forEach((storeName, storageEngine) -> {
       if (storageEngine.getStoreProperties().isPersistedToDisk() &&
           storageEngine.getStoreProperties().isDurableStore()) {
-        storageEngine.checkpoint(checkpointId);
+        Optional<Path> checkpointDir = storageEngine.checkpoint(checkpointId);
+
+        // if checkpoint is for a side input store
+        if (checkpointDir.isPresent() && !config.getSideInputs(storeName).isEmpty()) {
+          storageManagerUtil.copySideInputOffsetFileToCheckpointDir(
+              durableStoreBaseDir, taskName, storeName, TaskMode.Active, checkpointId);
+          LOG.debug("Copied side input offsets file to checkpoint dir for taskName: {} storeName: {} checkpointId: {}",
+              taskName, storeName, checkpointId);
+        }
       }
     });
     long checkpointNs = System.nanoTime() - checkpointStartNs;
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 104bc2c..2122a16 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -178,9 +178,10 @@
   public void close() {
     TaskName taskName = taskModel.getTaskName();
     storeEngines.forEach((storeName, storeEngine) -> {
-      if (storeEngine.getStoreProperties().isPersistedToDisk())
+      if (storeEngine.getStoreProperties().isPersistedToDisk()) {
         storeEngine.stop();
-      LOG.info("Stopped persistent store: {} in task: {}", storeName, taskName);
+        LOG.info("Stopped persistent store: {} in task: {}", storeName, taskName);
+      }
     });
   }
 
@@ -192,7 +193,7 @@
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
       boolean isLogged = this.storeChangelogs.containsKey(storeName);
       File storeBaseDir = isLogged ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
       File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(),
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 64e52c5..c9ee065 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -51,6 +51,7 @@
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeManager;
+import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemConsumer;
@@ -270,12 +271,30 @@
         LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
       }
       taskCheckpoints.put(taskName, taskCheckpoint);
-      Map<String, Set<String>> backendFactoryStoreNames =
+
+      Map<String, Set<String>> backendFactoryToStoreNames =
           ContainerStorageManagerUtil.getBackendFactoryStoreNames(
               nonSideInputStoreNames, taskCheckpoint, new StorageConfig(config));
+
+      Map<String, Set<String>> backendFactoryToSideInputStoreNames =
+          ContainerStorageManagerUtil.getBackendFactoryStoreNames(
+              sideInputStoreNames, taskCheckpoint, new StorageConfig(config));
+
+      // include side input stores for (initial bulk) restore if backed up using blob store state backend
+      String blobStoreStateBackendFactory = BlobStoreStateBackendFactory.class.getName();
+      if (backendFactoryToSideInputStoreNames.containsKey(blobStoreStateBackendFactory)) {
+        Set<String> sideInputStoreNames = backendFactoryToSideInputStoreNames.get(blobStoreStateBackendFactory);
+
+        if (backendFactoryToStoreNames.containsKey(blobStoreStateBackendFactory)) {
+          backendFactoryToStoreNames.get(blobStoreStateBackendFactory).addAll(sideInputStoreNames);
+        } else {
+          backendFactoryToStoreNames.put(blobStoreStateBackendFactory, sideInputStoreNames);
+        }
+      }
+
       Map<String, TaskRestoreManager> taskStoreRestoreManagers =
           ContainerStorageManagerUtil.createTaskRestoreManagers(
-              taskName, backendFactoryStoreNames, restoreStateBackendFactories,
+              taskName, backendFactoryToStoreNames, restoreStateBackendFactories,
               storageEngineFactories, storeConsumers,
               inMemoryStores, systemAdmins, restoreExecutor,
               taskModel, jobContext, containerContext,
@@ -360,9 +379,13 @@
     // Stop each store consumer once
     this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
 
-    // Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is
+    // Now create persistent non-side-input stores in read-write mode, leave non-persistent and side-input stores as-is
+    Set<String> inMemoryStoreNames =
+        ContainerStorageManagerUtil.getInMemoryStoreNames(this.storageEngineFactories, this.config);
+    Set<String> storesToCreate = nonSideInputStoreNames.stream()
+        .filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
     this.taskStores = ContainerStorageManagerUtil.createTaskStores(
-        nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames,
+        storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
         this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
         this.containerModel, this.jobContext, this.containerContext,
         this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
index 5b3d90a..d9c9cd4 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
@@ -171,14 +171,7 @@
       StorageManagerUtil storageManagerUtil,
       File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
       Config config) {
-    StorageConfig storageConfig = new StorageConfig(config);
-    Set<String> inMemoryStoreNames = storageEngineFactories.keySet().stream()
-        .filter(storeName -> {
-          Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
-          return storeFactory.isPresent() && storeFactory.get()
-              .equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY);
-        })
-        .collect(Collectors.toSet());
+    Set<String> inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config);
     return ContainerStorageManagerUtil.createTaskStores(
         inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
         activeTaskChangelogSystemStreams, storeDirectoryPaths,
@@ -187,6 +180,19 @@
         loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
   }
 
+  public static Set<String> getInMemoryStoreNames(
+      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Config config) {
+    StorageConfig storageConfig = new StorageConfig(config);
+    return storageEngineFactories.keySet().stream()
+        .filter(storeName -> {
+          Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
+          return storeFactory.isPresent() && storeFactory.get()
+              .equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY);
+        })
+        .collect(Collectors.toSet());
+  }
+
   /**
    * Returns a map from store names to their corresponding changelog system consumers.
    * @return map of store name to its changelog system consumer.
@@ -308,7 +314,10 @@
   }
 
   /**
-   * Return a map of backend factory names to set of stores that should be restored using it
+   * Returns a map of backend factory names to the subset of the provided storeNames that should be restored using it.
+   * For CheckpointV1, only includes stores that should be restored using a configured changelog.
+   * For CheckpointV2, associates each store with the highest precedence configured restore factory that has an SCM in
+   * the checkpoint, or with the highest precedence configured restore factory if there are no SCMs in the checkpoint.
    */
   public static Map<String, Set<String>> getBackendFactoryStoreNames(
       Set<String> storeNames, Checkpoint checkpoint, StorageConfig storageConfig) {
@@ -327,8 +336,8 @@
         Set<String> nonChangelogStores = storeNames.stream()
             .filter(storeName -> !changelogStores.contains(storeName))
             .collect(Collectors.toSet());
-        LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1,"
-                + "restore for the store will be skipped",
+        LOG.info("Stores: {}, do not have a configured store changelogs for checkpoint V1,"
+                + "changelog restore for the store will be skipped.",
             nonChangelogStores);
       }
     } else if (checkpoint == null ||  checkpoint.getVersion() == 2) {
@@ -342,8 +351,8 @@
 
         if (storeFactories.isEmpty()) {
           // If the restore factory is not configured for the store and the store does not have a changelog topic
-          LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs,"
-                  + "restore for the store will be skipped",
+          LOG.info("Store: {} does not have a configured restore factory or a changelog topic, "
+                  + "restore for the store will be skipped.",
               storeName);
         } else {
           // Search the ordered list for the first matched state backend factory in the checkpoint
@@ -359,8 +368,8 @@
           } else { // Restore factories configured but no checkpoints found
             // Use first configured restore factory
             factoryName = storeFactories.get(0);
-            LOG.warn("No matching checkpoints found for configured factories: {}, " +
-                "defaulting to using the first configured factory with no checkpoints", storeFactories);
+            LOG.warn("No matching SCMs found for configured restore factories: {} for storeName: {}, " +
+                "defaulting to using the first configured factory with no SCM.", storeFactories, storeName);
           }
           if (!backendFactoryStoreNames.containsKey(factoryName)) {
             backendFactoryStoreNames.put(factoryName, new HashSet<>());
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
index 96390ba..3ccaf1a 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
@@ -222,7 +222,8 @@
           sideInputTaskMetrics.put(taskName, sideInputMetrics);
 
           RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs,
-              taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+              taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName),
+              new TaskConfig(config).getCommitMs());
           sideInputTasks.put(taskName, sideInputTask);
         }
       });
diff --git a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index ef8409c..99a9e77 100644
--- a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.samza.checkpoint;
 
+import com.google.common.collect.ImmutableSet;
+
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.samza.config.JobConfig;
@@ -135,7 +138,11 @@
 
     // run the application
     RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", configs);
+        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC,
+            ImmutableSet.of(STORE_NAME), Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC),
+            Collections.emptySet(), Collections.emptyMap(),
+            Optional.empty(), Optional.empty(), Optional.empty()),
+        "myApp", configs);
 
     // wait for the application to finish
     context.getRunner().waitForFinish();
@@ -162,7 +169,11 @@
 
     // run the application
     RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, changelogTopic)), "myApp", overriddenConfigs);
+        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC,
+            ImmutableSet.of(STORE_NAME), Collections.singletonMap(STORE_NAME, changelogTopic),
+            Collections.emptySet(), Collections.emptyMap(),
+            Optional.empty(), Optional.empty(), Optional.empty()),
+        "myApp", overriddenConfigs);
 
     // wait for the application to finish
     context.getRunner().waitForFinish();
diff --git a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
index 99c54b0..9ab8706 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
@@ -19,11 +19,14 @@
 
 package org.apache.samza.storage;
 
+import com.google.common.collect.ImmutableList;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
@@ -34,6 +37,7 @@
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
@@ -61,15 +65,34 @@
   public static final Logger LOG = LoggerFactory.getLogger(MyStatefulApplication.class);
 
   private static Map<String, List<String>> initialStoreContents = new HashMap<>();
+  private static Map<String, List<String>> initialInMemoryStoreContents = new HashMap<>();
+  private static Map<String, List<String>> initialSideInputStoreContents = new HashMap<>();
   private static boolean crashedOnce = false;
+
   private final String inputSystem;
   private final String inputTopic;
-  private final Map<String, String> storeToChangelog;
+  private final Set<String> storeNames;
+  private final Map<String, String> storeNamesToChangelog;
+  private final Set<String> inMemoryStoreNames;
+  private final Map<String, String> inMemoryStoreNamesToChangelog;
+  private final Optional<String> sideInputStoreName;
+  private final Optional<String> sideInputTopic;
+  private final Optional<SideInputsProcessor> sideInputProcessor;
 
-  public MyStatefulApplication(String inputSystem, String inputTopic, Map<String, String> storeToChangelog) {
+  public MyStatefulApplication(String inputSystem, String inputTopic,
+      Set<String> storeNames, Map<String, String> storeNamesToChangelog,
+      Set<String> inMemoryStoreNames, Map<String, String> inMemoryStoreNamesToChangelog,
+      Optional<String> sideInputStoreName, Optional<String> sideInputTopic,
+      Optional<SideInputsProcessor> sideInputProcessor) {
     this.inputSystem = inputSystem;
     this.inputTopic = inputTopic;
-    this.storeToChangelog = storeToChangelog;
+    this.storeNames = storeNames;
+    this.storeNamesToChangelog = storeNamesToChangelog;
+    this.inMemoryStoreNames = inMemoryStoreNames;
+    this.inMemoryStoreNamesToChangelog = inMemoryStoreNamesToChangelog;
+    this.sideInputStoreName = sideInputStoreName;
+    this.sideInputTopic = sideInputTopic;
+    this.sideInputProcessor = sideInputProcessor;
   }
 
   @Override
@@ -81,18 +104,46 @@
 
     TaskApplicationDescriptor desc = appDescriptor
         .withInputStream(isd)
-        .withTaskFactory((StreamTaskFactory) () -> new MyTask(storeToChangelog.keySet()));
+        .withTaskFactory((StreamTaskFactory) () -> new MyTask(storeNames, inMemoryStoreNames, sideInputStoreName));
 
-    storeToChangelog.forEach((storeName, changelogTopic) -> {
-      RocksDbTableDescriptor<String, String> td = new RocksDbTableDescriptor<>(storeName, serde)
-          .withChangelogStream(changelogTopic)
-          .withChangelogReplicationFactor(1);
+    inMemoryStoreNames.forEach(storeName -> {
+      InMemoryTableDescriptor<String, String> imtd;
+      if (inMemoryStoreNamesToChangelog.containsKey(storeName)) {
+        imtd = new InMemoryTableDescriptor<>(storeName, serde)
+            .withChangelogStream(inMemoryStoreNamesToChangelog.get(storeName));
+      } else {
+        imtd = new InMemoryTableDescriptor<>(storeName, serde);
+      }
+
+      desc.withTable(imtd);
+    });
+
+    storeNames.forEach(storeName -> {
+      RocksDbTableDescriptor<String, String> td;
+      if (storeNamesToChangelog.containsKey(storeName)) {
+        String changelogTopic = storeNamesToChangelog.get(storeName);
+        td = new RocksDbTableDescriptor<>(storeName, serde)
+            .withChangelogStream(changelogTopic)
+            .withChangelogReplicationFactor(1);
+      } else {
+        td = new RocksDbTableDescriptor<>(storeName, serde);
+      }
       desc.withTable(td);
     });
+
+    if (sideInputStoreName.isPresent()) {
+      RocksDbTableDescriptor<String, String> sideInputStoreTd =
+          new RocksDbTableDescriptor<>(sideInputStoreName.get(), serde)
+              .withSideInputs(ImmutableList.of(sideInputTopic.get()))
+              .withSideInputsProcessor(sideInputProcessor.get());
+      desc.withTable(sideInputStoreTd);
+    }
   }
 
   public static void resetTestState() {
     initialStoreContents = new HashMap<>();
+    initialInMemoryStoreContents = new HashMap<>();
+    initialSideInputStoreContents = new HashMap<>();
     crashedOnce = false;
   }
 
@@ -100,27 +151,64 @@
     return initialStoreContents;
   }
 
+  public static Map<String, List<String>> getInitialInMemoryStoreContents() {
+    return initialInMemoryStoreContents;
+  }
+
+  public static Map<String, List<String>> getInitialSideInputStoreContents() {
+    return initialSideInputStoreContents;
+  }
+
   static class MyTask implements StreamTask, InitableTask {
     private final Set<KeyValueStore<String, String>> stores = new HashSet<>();
     private final Set<String> storeNames;
+    private final Set<String> inMemoryStoreNames;
+    private final Optional<String> sideInputStoreName;
 
-    MyTask(Set<String> storeNames) {
+    MyTask(Set<String> storeNames, Set<String> inMemoryStoreNames, Optional<String> sideInputStoreName) {
       this.storeNames = storeNames;
+      this.inMemoryStoreNames = inMemoryStoreNames;
+      this.sideInputStoreName = sideInputStoreName;
     }
 
     @Override
     public void init(Context context) {
       storeNames.forEach(storeName -> {
         KeyValueStore<String, String> store = (KeyValueStore<String, String>) context.getTaskContext().getStore(storeName);
-        stores.add(store);
+        stores.add(store); // any input messages will be written to all 'stores'
         KeyValueIterator<String, String> storeEntries = store.all();
-        List<String> storeInitialChangelog = new ArrayList<>();
+        List<String> storeInitialContents = new ArrayList<>();
         while (storeEntries.hasNext()) {
-          storeInitialChangelog.add(storeEntries.next().getValue());
+          storeInitialContents.add(storeEntries.next().getValue());
         }
-        initialStoreContents.put(storeName, storeInitialChangelog);
+        initialStoreContents.put(storeName, storeInitialContents);
         storeEntries.close();
       });
+
+      inMemoryStoreNames.forEach(storeName -> {
+        KeyValueStore<String, String> store =
+            (KeyValueStore<String, String>) context.getTaskContext().getStore(storeName);
+        stores.add(store); // any input messages will be written to all 'stores'.
+        KeyValueIterator<String, String> storeEntries = store.all();
+        List<String> storeInitialContents = new ArrayList<>();
+        while (storeEntries.hasNext()) {
+          storeInitialContents.add(storeEntries.next().getValue());
+        }
+        initialInMemoryStoreContents.put(storeName, storeInitialContents);
+        storeEntries.close();
+      });
+
+      if (sideInputStoreName.isPresent()) {
+        KeyValueStore<String, String> sideInputStore =
+            (KeyValueStore<String, String>) context.getTaskContext().getStore(sideInputStoreName.get());
+        KeyValueIterator<String, String> sideInputStoreEntries = sideInputStore.all();
+        List<String> sideInputStoreInitialContents = new ArrayList<>();
+        while (sideInputStoreEntries.hasNext()) {
+          sideInputStoreInitialContents.add(sideInputStoreEntries.next().getValue());
+        }
+        initialSideInputStoreContents.put(sideInputStoreName.get(), sideInputStoreInitialContents);
+        sideInputStoreEntries.close();
+      }
     }
 
     @Override
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java
new file mode 100644
index 0000000..15b4d94
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.storage.SideInputsProcessor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
+import org.apache.samza.util.FileUtil;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseStateBackendIntegrationTest extends StreamApplicationIntegrationTestHarness {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseStateBackendIntegrationTest.class);
+
+  public void initialRun(
+      String inputSystem,
+      String inputTopicName,
+      String sideInputTopicName,
+      List<String> inputMessages,
+      List<String> sideInputMessages,
+      Set<String> regularStoreNames,
+      Map<String, String> regularStoreChangelogTopics,
+      Set<String> inMemoryStoreNames,
+      Map<String, String> inMemoryStoreChangelogTopics,
+      String sideInputStoreName,
+      List<String> expectedChangelogMessagesAfterInitialRun,
+      Map<String, String> overriddenConfigs) {
+    // create input topic and produce the first batch of input messages
+    createTopic(inputTopicName, 1);
+    inputMessages.forEach(m -> produceMessage(inputTopicName, 0, m, m));
+
+    // create side input topic and produce the first batch of side input messages
+    createTopic(sideInputTopicName, 1);
+    sideInputMessages.forEach(m -> produceMessage(sideInputTopicName, 0, m, m));
+
+    // verify that the input messages were produced successfully
+    if (inputMessages.size() > 0) {
+      List<ConsumerRecord<String, String>> inputRecords =
+          consumeMessages(inputTopicName, inputMessages.size());
+      List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+      Assert.assertEquals(inputMessages, readInputMessages);
+    }
+
+    // verify that the side input messages were produced successfully
+    if (sideInputMessages.size() > 0) {
+      List<ConsumerRecord<String, String>> sideInputRecords =
+          consumeMessages(sideInputTopicName, sideInputMessages.size());
+      List<String> readSideInputMessages = sideInputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+      Assert.assertEquals(sideInputMessages, readSideInputMessages);
+    }
+
+    // run the application
+    RunApplicationContext context = runApplication(
+        new MyStatefulApplication(inputSystem, inputTopicName,
+            regularStoreNames, regularStoreChangelogTopics,
+            inMemoryStoreNames, inMemoryStoreChangelogTopics,
+            Optional.of(sideInputStoreName), Optional.of(sideInputTopicName), Optional.of(new MySideInputProcessor())),
+        "myApp", overriddenConfigs);
+
+    // wait for the application to finish
+    context.getRunner().waitForFinish();
+
+    // consume and verify the changelog messages
+    HashSet<String> changelogTopics = new HashSet<>(regularStoreChangelogTopics.values());
+    changelogTopics.addAll(inMemoryStoreChangelogTopics.values());
+    changelogTopics.forEach(changelogTopicName -> {
+      if (expectedChangelogMessagesAfterInitialRun.size() > 0) {
+        List<ConsumerRecord<String, String>> changelogRecords =
+            consumeMessages(changelogTopicName, expectedChangelogMessagesAfterInitialRun.size());
+        List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+        Assert.assertEquals(expectedChangelogMessagesAfterInitialRun, changelogMessages);
+      }
+    });
+
+    LOG.info("Finished initial run");
+  }
+
+  public void secondRun(
+      boolean hostAffinity,
+      String loggedStoreBaseDir,
+      String inputSystem,
+      String inputTopicName,
+      String sideInputTopicName,
+      List<String> inputMessages,
+      List<String> sideInputMessages,
+      Set<String> regularStoreNames,
+      Map<String, String> regularStoreChangelogTopics,
+      Set<String> inMemoryStoreNames,
+      Map<String, String> inMemoryStoreChangelogTopics,
+      String sideInputStoreName,
+      List<String> expectedChangelogMessagesAfterSecondRun,
+      List<String> expectedInitialStoreContents,
+      List<String> expectedInitialInMemoryStoreContents,
+      List<String> expectedInitialSideInputStoreContents,
+      Map<String, String> overriddenConfigs) {
+    // clear the local store directory
+    if (!hostAffinity) {
+      new FileUtil().rm(new File(loggedStoreBaseDir));
+    }
+
+    // produce the second batch of input messages
+    inputMessages.forEach(m -> produceMessage(inputTopicName, 0, m, m));
+
+    // produce the second batch of side input messages
+    sideInputMessages.forEach(m -> produceMessage(sideInputTopicName, 0, m, m));
+
+    // run the application
+    RunApplicationContext context = runApplication(
+        new MyStatefulApplication(inputSystem, inputTopicName,
+            regularStoreNames, regularStoreChangelogTopics,
+            inMemoryStoreNames, inMemoryStoreChangelogTopics,
+            Optional.of(sideInputStoreName), Optional.of(sideInputTopicName), Optional.of(new MySideInputProcessor())),
+        "myApp", overriddenConfigs);
+
+    // wait for the application to finish
+    context.getRunner().waitForFinish();
+
+    // verify the store contents during startup
+    for (String storeName: regularStoreNames) {
+      Assert.assertEquals(expectedInitialStoreContents,
+          MyStatefulApplication.getInitialStoreContents().get(storeName));
+    }
+
+    // verify the memory store contents during startup
+    for (String storeName: inMemoryStoreNames) {
+      Assert.assertEquals(expectedInitialInMemoryStoreContents,
+          MyStatefulApplication.getInitialInMemoryStoreContents().get(storeName));
+    }
+
+    // verify that the side input store contents during startup include messages up to head of topic
+    Assert.assertEquals(expectedInitialSideInputStoreContents,
+        MyStatefulApplication.getInitialSideInputStoreContents().get(sideInputStoreName));
+
+    // consume and verify any additional changelog messages for stores with changelogs
+    HashSet<String> changelogTopics = new HashSet<>(regularStoreChangelogTopics.values());
+    changelogTopics.addAll(inMemoryStoreChangelogTopics.values());
+    changelogTopics.forEach(changelogTopicName -> {
+      List<ConsumerRecord<String, String>> changelogRecords =
+          consumeMessages(changelogTopicName, expectedChangelogMessagesAfterSecondRun.size());
+      List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+      Assert.assertEquals(expectedChangelogMessagesAfterSecondRun, changelogMessages);
+    });
+  }
+
+  static class MySideInputProcessor implements SideInputsProcessor, Serializable {
+    @Override
+    public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
+      return ImmutableSet.of(new Entry<>(message.getKey(), message.getMessage()));
+    }
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
new file mode 100644
index 0000000..f654459
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.BlobStoreConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.storage.SideInputsProcessor;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.test.util.TestBlobStoreManager;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class BlobStoreStateBackendIntegrationTest extends BaseStateBackendIntegrationTest {
+  @Parameterized.Parameters(name = "hostAffinity={0}")
+  public static Collection<Boolean> data() {
+    return Arrays.asList(true, false);
+  }
+
+  private static final String INPUT_SYSTEM = "kafka";
+  private static final String INPUT_TOPIC = "inputTopic";
+  private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+  private static final String REGULAR_STORE_NAME = "regularStore";
+  private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+  private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+  private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+
+  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+  private static final String BLOB_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "blob-store").getAbsolutePath();
+  private static final String BLOB_STORE_LEDGER_DIR = new File(BLOB_STORE_BASE_DIR, "ledger").getAbsolutePath();
+
+  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+      put(JobConfig.PROCESSOR_ID, "0");
+      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+      put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1");
+      put(TaskConfig.CHECKPOINT_WRITE_VERSIONS, "1, 2");
+      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
+
+      // override store level state backend for in memory stores to use Kafka changelogs
+      put(String.format(StorageConfig.STORE_BACKUP_FACTORIES, IN_MEMORY_STORE_NAME),
+          "org.apache.samza.storage.KafkaChangelogStateBackendFactory");
+      put(String.format(StorageConfig.STORE_RESTORE_FACTORIES, IN_MEMORY_STORE_NAME),
+          "org.apache.samza.storage.KafkaChangelogStateBackendFactory");
+
+      put(StorageConfig.JOB_BACKUP_FACTORIES, "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
+      put(StorageConfig.JOB_RESTORE_FACTORIES, "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
+      put(BlobStoreConfig.BLOB_STORE_MANAGER_FACTORY, "org.apache.samza.test.util.TestBlobStoreManagerFactory");
+
+      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+      put(TestBlobStoreManager.BLOB_STORE_BASE_DIR, BLOB_STORE_BASE_DIR);
+      put(TestBlobStoreManager.BLOB_STORE_LEDGER_DIR, BLOB_STORE_LEDGER_DIR);
+    } };
+
+  private final boolean hostAffinity;
+
+  public BlobStoreStateBackendIntegrationTest(boolean hostAffinity) {
+    this.hostAffinity = hostAffinity;
+  }
+
+  @Before
+  @Override
+  public void setUp() {
+    super.setUp();
+    // reset static state shared with task between each parameterized iteration
+    MyStatefulApplication.resetTestState();
+    FileUtil fileUtil = new FileUtil();
+    fileUtil.rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+    // no need to clear the ledger dir separately since it is a subdir of the blob store base dir
+    fileUtil.rm(new File(BLOB_STORE_BASE_DIR)); // always clear local "blob store" on startup
+  }
+
+  @Test
+  public void testStopAndRestart() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        sideInputMessagesOnInitialRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        Collections.emptyMap(),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        CONFIGS);
+
+    Pair<String, SnapshotIndex> lastRegularSnapshot =
+        verifyLedger(REGULAR_STORE_NAME, Optional.empty(), hostAffinity, false, false);
+    Pair<String, SnapshotIndex> lastSideInputSnapshot =
+        verifyLedger(SIDE_INPUT_STORE_NAME, Optional.empty(), hostAffinity, true,
+            false /* no side input offsets file will be present during initial restore */);
+
+    // verifies transactional state too
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+    // verifies that in-memory stores backed by changelogs work correctly
+    // (requires overriding store level state backends explicitly)
+    List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+    List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+    expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+    secondRun(
+        hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        sideInputMessagesBeforeSecondRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        Collections.emptyMap(),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        expectedInitialStoreContentsOnSecondRun,
+        expectedInitialInMemoryStoreContentsOnSecondRun,
+        expectedInitialSideInputStoreContentsOnSecondRun,
+        CONFIGS);
+
+    verifyLedger(REGULAR_STORE_NAME, Optional.of(lastRegularSnapshot), hostAffinity, false, false);
+    verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot), hostAffinity, true, true);
+  }
+
+  /**
+   * Verifies the ledger for TestBlobStoreManager.
+   * @param startingSnapshot snapshot file name and snapshot index at the start of verification (from the previous run), if any.
+   * @return Pair of the latest snapshot file name and its SnapshotIndex at the time of verification.
+   */
+  private static Pair<String, SnapshotIndex> verifyLedger(String storeName,
+      Optional<Pair<String, SnapshotIndex>> startingSnapshot,
+      boolean hostAffinity, boolean verifySideInputOffsetsUploaded, boolean verifySideInputOffsetsRestored) {
+    Path ledgerLocation = Paths.get(BLOB_STORE_LEDGER_DIR);
+    try {
+      File filesAddedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_ADDED).toFile();
+      Set<String> filesAdded = Files.lines(filesAddedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+      File filesReadLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_READ).toFile();
+      Set<String> filesRead = Files.lines(filesReadLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+      File filesDeletedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_DELETED).toFile();
+      Set<String> filesDeleted = Files.lines(filesDeletedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+      File filesTTLUpdatedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_TTL_UPDATED).toFile();
+      Set<String> filesTTLUpdated = Files.lines(filesTTLUpdatedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+
+      // 1. test that files read = files present in last snapshot *at run start* + snapshot file itself + previous snapshot files
+      if (startingSnapshot.isPresent() && !hostAffinity) { // no restore if host affinity (local state already present)
+        Set<String> filesPresentInStartingSnapshot = startingSnapshot.get().getRight()
+            .getDirIndex().getFilesPresent().stream()
+            .map(fi -> fi.getBlobs().get(0).getBlobId()).collect(Collectors.toSet());
+        Set<String> filesToRestore = new HashSet<>();
+        filesToRestore.add(startingSnapshot.get().getLeft());
+        filesToRestore.addAll(filesPresentInStartingSnapshot);
+        // assert that all files to restore in starting snapshot + starting snapshot itself are present in files read
+        assertTrue(Sets.difference(filesToRestore, filesRead).isEmpty());
+        // assert that the remaining read files are all snapshot indexes (for post commit cleanup)
+        assertTrue(Sets.difference(filesRead, filesToRestore).stream().allMatch(s -> s.contains(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)));
+      }
+
+      // read files added again as ordered list, not set, to get last file added
+      List<String> filesAddedLines = Files.readAllLines(filesAddedLedger.toPath()).stream().filter(l -> l.contains(storeName)).collect(Collectors.toList());
+      String lastFileAdded = filesAddedLines.get(filesAddedLines.size() - 1); // get last line.
+      assertTrue(lastFileAdded.contains(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)); // assert is a snapshot
+      SnapshotIndex lastSnapshotIndex = new SnapshotIndexSerde().fromBytes(Files.readAllBytes(Paths.get(lastFileAdded)));
+      Set<String> filesPresentInLastSnapshot = lastSnapshotIndex.getDirIndex().getFilesPresent().stream()
+          .map(fi -> fi.getBlobs().get(0).getBlobId()).collect(Collectors.toSet());
+
+      // 2. test that all added files were ttl reset
+      assertEquals(filesAdded, filesTTLUpdated);
+
+      // 3. test that files deleted = files added - (files present in last snapshot + the snapshot index file itself),
+      // i.e., net remaining files (files added - files deleted) = files present in last snapshot + the snapshot index file itself.
+      assertEquals(Sets.difference(filesAdded, filesDeleted),
+          Sets.union(filesPresentInLastSnapshot, Collections.singleton(lastFileAdded)));
+
+      // 4. test that the files restored/added for side input stores contains side input offsets file
+      if (verifySideInputOffsetsUploaded) {
+        assertTrue(filesAdded.stream().anyMatch(f -> f.contains(StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY)));
+      }
+
+      if (!hostAffinity && verifySideInputOffsetsRestored) { // only read / restored if no host affinity
+        assertTrue(filesRead.stream().anyMatch(f -> f.contains(StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY)));
+      }
+
+      return Pair.of(lastFileAdded, lastSnapshotIndex);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static class MySideInputProcessor implements SideInputsProcessor, Serializable {
+    @Override
+    public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
+      return ImmutableSet.of(new Entry<>(message.getKey(), message.getMessage()));
+    }
+  }
+}
\ No newline at end of file
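
Aside for reviewers: the set algebra in verifyLedger's step 3 is the crux of the ledger check. Below is a minimal, self-contained sketch of the same invariant, assuming only Guava's Sets utility; all names are illustrative, not from the patch.

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Set;

public class LedgerInvariantSketch {
  public static void main(String[] args) {
    Set<String> filesAdded = ImmutableSet.of("f1", "f2", "f3", "snapshotIndex");
    Set<String> filesDeleted = ImmutableSet.of("f2"); // f2 was replaced by a later commit
    Set<String> filesInLastSnapshot = ImmutableSet.of("f1", "f3");
    String snapshotIndexFile = "snapshotIndex";

    // net remaining files = files present in the last snapshot + the snapshot index file itself
    Set<String> netRemaining = Sets.difference(filesAdded, filesDeleted);
    Set<String> expected = Sets.union(filesInLastSnapshot, ImmutableSet.of(snapshotIndexFile));
    System.out.println(netRemaining.equals(expected)); // prints true
  }
}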
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
new file mode 100644
index 0000000..66ef3eb
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+@RunWith(value = Parameterized.class)
+public class KafkaNonTransactionalStateIntegrationTest extends BaseStateBackendIntegrationTest {
+  @Parameterized.Parameters(name = "hostAffinity={0}")
+  public static Collection<Boolean> data() {
+    return Arrays.asList(true, false);
+  }
+
+  private static final String INPUT_SYSTEM = "kafka";
+  private static final String INPUT_TOPIC = "inputTopic";
+  private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+  private static final String REGULAR_STORE_NAME = "regularStore";
+  private static final String REGULAR_STORE_CHANGELOG_TOPIC = "changelog";
+  private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+  private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+  private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+
+  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+      put(JobConfig.PROCESSOR_ID, "0");
+      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
+
+      put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "false");
+      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
+      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+    } };
+
+  private final boolean hostAffinity;
+
+  public KafkaNonTransactionalStateIntegrationTest(boolean hostAffinity) {
+    this.hostAffinity = hostAffinity;
+  }
+
+  @Before
+  @Override
+  public void setUp() {
+    super.setUp();
+    // reset static state shared with the task between parameterized iterations
+    MyStatefulApplication.resetTestState();
+    new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+  }
+
+  @Test
+  public void testStopAndRestart() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+    List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        sideInputMessagesOnInitialRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterInitialRun,
+        CONFIGS);
+
+    // first two are re-written changelog entries for uncommitted messages from the last run for keys 98 and 99
+    // (no null reverts here, since transactional state restore is disabled)
+    List<String> expectedChangelogMessagesAfterSecondRun =
+        Arrays.asList("98", "99", "4", "5", "5");
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "98", "99");
+    List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "98", "99");
+    List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+    expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+    secondRun(
+        hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        sideInputMessagesBeforeSecondRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterSecondRun,
+        expectedInitialStoreContentsOnSecondRun,
+        expectedInitialInMemoryStoreContentsOnSecondRun,
+        expectedInitialSideInputStoreContentsOnSecondRun,
+        CONFIGS);
+  }
+}
\ No newline at end of file
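
Note on the CONFIGS maps in these tests: the double-brace initializer creates an anonymous HashMap subclass per test class (capturing an outer-class reference). A sketch of an equivalent, arguably more idiomatic construction with Guava's ImmutableMap builder; only a representative subset of the keys is shown to keep it short:

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfig;

public class ConfigsSketch {
  // same key/value pairs as the double-brace version above, built immutably
  static final Map<String, String> CONFIGS = ImmutableMap.<String, String>builder()
      .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory")
      .put(JobConfig.PROCESSOR_ID, "0")
      .put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory")
      .put(TaskConfig.COMMIT_MS, "-1") // manual commit only
      .build();
}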
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
new file mode 100644
index 0000000..8bec243
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+@RunWith(value = Parameterized.class)
+public class KafkaTransactionalStateIntegrationTest extends BaseStateBackendIntegrationTest {
+  @Parameterized.Parameters(name = "hostAffinity={0}")
+  public static Collection<Boolean> data() {
+    return Arrays.asList(true, false);
+  }
+
+  private static final String INPUT_SYSTEM = "kafka";
+  private static final String INPUT_TOPIC = "inputTopic";
+  private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+  private static final String REGULAR_STORE_NAME = "regularStore";
+  private static final String REGULAR_STORE_CHANGELOG_TOPIC = "changelog";
+  private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+  private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+  private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+
+  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+      put(JobConfig.PROCESSOR_ID, "0");
+      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
+
+      put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
+      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
+      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+    } };
+
+  private final boolean hostAffinity;
+
+  public KafkaTransactionalStateIntegrationTest(boolean hostAffinity) {
+    this.hostAffinity = hostAffinity;
+  }
+
+  @Before
+  @Override
+  public void setUp() {
+    super.setUp();
+    // reset static state shared with the task between parameterized iterations
+    MyStatefulApplication.resetTestState();
+    new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+  }
+
+  @Test
+  public void testStopAndRestart() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+    List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        sideInputMessagesOnInitialRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterInitialRun,
+        CONFIGS);
+
+    // first two (nulls) are revert tombstones for uncommitted messages from the last run for keys 98 and 99
+    List<String> expectedChangelogMessagesAfterSecondRun =
+        Arrays.asList(null, null, "98", "99", "4", "5", "5");
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+    List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+    List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+    expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+    secondRun(
+        hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        sideInputMessagesBeforeSecondRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterSecondRun,
+        expectedInitialStoreContentsOnSecondRun,
+        expectedInitialInMemoryStoreContentsOnSecondRun,
+        expectedInitialSideInputStoreContentsOnSecondRun,
+        CONFIGS);
+  }
+
+  @Test
+  public void testWithEmptyChangelogFromInitialRun() {
+    // expected changelog messages will always match since we'll read 0 messages
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        ImmutableList.of("crash_once"),
+        Collections.emptyList(),
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        CONFIGS);
+
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> expectedChangelogMessagesAfterSecondRun = Arrays.asList("4", "5", "5");
+    secondRun(hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        Collections.emptyList(),
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterSecondRun,
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        CONFIGS);
+  }
+
+  @Test
+  public void testWithNewChangelogAfterInitialRun() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        Collections.emptyList(),
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterInitialRun,
+        CONFIGS);
+
+    // deleting the topic via the admin client doesn't seem to work; it times out after up to 60 seconds.
+    // simulate a topic deletion by switching to a new changelog topic instead.
+    String newChangelogTopic = "changelog2";
+    String newInMemoryStoreChangelogTopic = "inMemChangelog2";
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> expectedChangelogMessagesAfterSecondRun = Arrays.asList("98", "99", "4", "5", "5");
+    secondRun(hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        Collections.emptyList(),
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, newChangelogTopic),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, newInMemoryStoreChangelogTopic),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterSecondRun,
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        CONFIGS);
+  }
+}
\ No newline at end of file
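
The only difference between this test's expectations and the non-transactional variant above is the pair of leading nulls: with transactional state restore, uncommitted writes are reverted via null tombstones before the messages are re-applied. A standalone sketch of how null changelog entries behave as delete tombstones during a restore (illustrative only; this is not Samza's restore code):

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogRestoreSketch {
  public static void main(String[] args) {
    List<Map.Entry<String, String>> changelog = new ArrayList<>();
    changelog.add(new SimpleEntry<>("98", "98"));
    changelog.add(new SimpleEntry<>("99", "99"));
    changelog.add(new SimpleEntry<>("98", null)); // revert tombstone for an uncommitted write
    changelog.add(new SimpleEntry<>("99", null)); // revert tombstone for an uncommitted write

    Map<String, String> store = new HashMap<>();
    for (Map.Entry<String, String> entry : changelog) {
      if (entry.getValue() == null) {
        store.remove(entry.getKey()); // null value means delete the key
      } else {
        store.put(entry.getKey(), entry.getValue());
      }
    }
    System.out.println(store); // prints {} -- the uncommitted writes were reverted
  }
}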
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
deleted file mode 100644
index 771f93d..0000000
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import com.google.common.collect.ImmutableList;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.storage.MyStatefulApplication;
-import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
-import org.apache.samza.util.FileUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-@RunWith(value = Parameterized.class)
-public class TransactionalStateIntegrationTest extends StreamApplicationIntegrationTestHarness {
-  @Parameterized.Parameters(name = "hostAffinity={0}")
-  public static Collection<Boolean> data() {
-    return Arrays.asList(true, false);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateIntegrationTest.class);
-
-  private static final String INPUT_TOPIC = "inputTopic";
-  private static final String INPUT_SYSTEM = "kafka";
-  private static final String STORE_NAME = "store";
-  private static final String CHANGELOG_TOPIC = "changelog";
-  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
-  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
-      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-      put(JobConfig.PROCESSOR_ID, "0");
-      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
-      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
-      put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
-      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
-      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
-      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
-      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
-    } };
-
-  private final boolean hostAffinity;
-
-  public TransactionalStateIntegrationTest(boolean hostAffinity) {
-    this.hostAffinity = hostAffinity;
-  }
-
-  @Before
-  @Override
-  public void setUp() {
-    super.setUp();
-    // reset static state shared with task between each parameterized iteration
-    MyStatefulApplication.resetTestState();
-    new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
-  }
-
-  @Test
-  public void testStopAndRestart() {
-    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
-    // double check collectors.flush
-    List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
-    // first two are reverts for uncommitted messages from last run for keys 98 and 99
-    List<String> expectedChangelogMessagesAfterSecondRun =
-        Arrays.asList(null, null, "98", "99", "4", "5", "5");
-    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
-    secondRun(CHANGELOG_TOPIC,
-        expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, CONFIGS);
-  }
-
-  @Test
-  public void testWithEmptyChangelogFromInitialRun() {
-    // expected changelog messages will always match since we'll read 0 messages
-    initialRun(ImmutableList.of("crash_once"), Collections.emptyList());
-    secondRun(CHANGELOG_TOPIC, ImmutableList.of("4", "5", "5"), Collections.emptyList(), CONFIGS);
-  }
-
-  @Test
-  public void testWithNewChangelogAfterInitialRun() {
-    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
-    List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesAfterInitialRun);
-
-    // admin client delete topic doesn't seem to work, times out up to 60 seconds.
-    // simulate delete topic by changing the changelog topic instead.
-    String newChangelogTopic = "changelog2";
-    LOG.info("Changing changelog topic to: {}", newChangelogTopic);
-    secondRun(newChangelogTopic, ImmutableList.of("98", "99", "4", "5", "5"), Collections.emptyList(), CONFIGS);
-  }
-
-  private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) {
-    // create input topic and produce the first batch of input messages
-    createTopic(INPUT_TOPIC, 1);
-    inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
-    // verify that the input messages were produced successfully
-    if (inputMessages.size() > 0) {
-      List<ConsumerRecord<String, String>> inputRecords =
-          consumeMessages(INPUT_TOPIC, inputMessages.size());
-      List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(inputMessages, readInputMessages);
-    }
-
-    // run the application
-    RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)),
-        "myApp", CONFIGS);
-
-    // wait for the application to finish
-    context.getRunner().waitForFinish();
-
-    // consume and verify the changelog messages
-    if (expectedChangelogMessages.size() > 0) {
-      List<ConsumerRecord<String, String>> changelogRecords =
-          consumeMessages(CHANGELOG_TOPIC, expectedChangelogMessages.size());
-      List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-    }
-
-    LOG.info("Finished initial run");
-  }
-
-  private void secondRun(String changelogTopic, List<String> expectedChangelogMessages,
-      List<String> expectedInitialStoreContents, Map<String, String> overriddenConfigs) {
-    // clear the local store directory
-    if (!hostAffinity) {
-      new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR));
-    }
-
-    // produce the second batch of input messages
-
-    List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown");
-    inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
-    // run the application
-    RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, changelogTopic)),
-        "myApp", overriddenConfigs);
-
-    // wait for the application to finish
-    context.getRunner().waitForFinish();
-
-    // consume and verify any additional changelog messages
-    List<ConsumerRecord<String, String>> changelogRecords =
-        consumeMessages(changelogTopic, expectedChangelogMessages.size());
-    List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-    Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-
-    // verify the store contents during startup (this is after changelog verification to ensure init has completed)
-    Assert.assertEquals(expectedInitialStoreContents, MyStatefulApplication.getInitialStoreContents().get(STORE_NAME));
-  }
-}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
deleted file mode 100644
index 765bd45..0000000
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import com.google.common.collect.ImmutableList;
-
-import com.google.common.collect.ImmutableMap;
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.samza.application.SamzaApplication;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.storage.MyStatefulApplication;
-import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
-import org.apache.samza.util.FileUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Runs the same tests as {@link TransactionalStateIntegrationTest}, except with an additional
- * unused store with a changelog. Ensures that the stores are isolated, e.g. if the checkpointed
- * changelog offset for one is null (no data).
- */
-@RunWith(value = Parameterized.class)
-public class TransactionalStateMultiStoreIntegrationTest extends StreamApplicationIntegrationTestHarness {
-  @Parameterized.Parameters(name = "hostAffinity={0}")
-  public static Collection<Boolean> data() {
-    return Arrays.asList(true, false);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateMultiStoreIntegrationTest.class);
-
-  private static final String INPUT_TOPIC = "inputTopic";
-  private static final String INPUT_SYSTEM = "kafka";
-  private static final String STORE_1_NAME = "store1";
-  private static final String STORE_2_NAME = "store2";
-  private static final String STORE_1_CHANGELOG = "changelog1";
-  private static final String STORE_2_CHANGELOG = "changelog2";
-  private static final String APP_NAME = "myApp";
-  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
-  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
-      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-      put(JobConfig.PROCESSOR_ID, "0");
-      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
-      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
-      put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
-      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
-      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
-      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
-      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
-    } };
-
-  private final boolean hostAffinity;
-
-  public TransactionalStateMultiStoreIntegrationTest(boolean hostAffinity) {
-    this.hostAffinity = hostAffinity;
-  }
-
-  @Before
-  @Override
-  public void setUp() {
-    super.setUp();
-    // reset static state shared with task between each parameterized iteration
-    MyStatefulApplication.resetTestState();
-    new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
-  }
-
-  @Test
-  public void testStopAndRestart() {
-    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
-    List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
-    // first two are reverts for uncommitted messages from last run for keys 98 and 99
-    List<String> expectedChangelogMessagesOnSecondRun =
-        Arrays.asList(null, null, "98", "99", "4", "5", "5");
-    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
-    secondRun(STORE_1_CHANGELOG,
-        expectedChangelogMessagesOnSecondRun, expectedInitialStoreContentsOnSecondRun);
-  }
-
-  @Test
-  public void testWithEmptyChangelogFromInitialRun() {
-    // expected changelog messages will always match since we'll read 0 messages
-    initialRun(ImmutableList.of("crash_once"), Collections.emptyList());
-    secondRun(STORE_1_CHANGELOG, ImmutableList.of("4", "5", "5"), Collections.emptyList());
-  }
-
-  @Test
-  public void testWithNewChangelogAfterInitialRun() {
-    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
-    List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
-    // admin client delete topic doesn't seem to work, times out up to 60 seconds.
-    // simulate delete topic by changing the changelog topic instead.
-    String newChangelogTopic = "changelog3";
-    LOG.info("Changing changelog topic to: {}", newChangelogTopic);
-    secondRun(newChangelogTopic, ImmutableList.of("98", "99", "4", "5", "5"), Collections.emptyList());
-  }
-
-  private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) {
-    // create input topic and produce the first batch of input messages
-    createTopic(INPUT_TOPIC, 1);
-    inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
-    // verify that the input messages were produced successfully
-    if (inputMessages.size() > 0) {
-      List<ConsumerRecord<String, String>> inputRecords =
-          consumeMessages(INPUT_TOPIC, inputMessages.size());
-      List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(inputMessages, readInputMessages);
-    }
-
-    SamzaApplication app =  new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, ImmutableMap.of(
-        STORE_1_NAME, STORE_1_CHANGELOG,
-        STORE_2_NAME, STORE_2_CHANGELOG
-    ));
-
-    // run the application
-    RunApplicationContext context = runApplication(app, APP_NAME, CONFIGS);
-
-
-    // consume and verify the changelog messages
-    if (expectedChangelogMessages.size() > 0) {
-      List<ConsumerRecord<String, String>> changelogRecords =
-          consumeMessages(STORE_1_CHANGELOG, expectedChangelogMessages.size());
-      List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-    }
-
-    // wait for the application to finish
-    context.getRunner().waitForFinish();
-    LOG.info("Finished initial run");
-  }
-
-  private void secondRun(String changelogTopic, List<String> expectedChangelogMessages,
-      List<String> expectedInitialStoreContents) {
-    // clear the local store directory
-    if (!hostAffinity) {
-      new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR));
-    }
-
-    // produce the second batch of input messages
-
-    List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown");
-    inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
-    SamzaApplication app =  new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, ImmutableMap.of(
-        STORE_1_NAME, changelogTopic,
-        STORE_2_NAME, STORE_2_CHANGELOG
-    ));
-    // run the application
-    RunApplicationContext context = runApplication(app, APP_NAME, CONFIGS);
-
-    // wait for the application to finish
-    context.getRunner().waitForFinish();
-
-    // consume and verify any additional changelog messages
-    List<ConsumerRecord<String, String>> changelogRecords =
-        consumeMessages(changelogTopic, expectedChangelogMessages.size());
-    List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-    Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-
-    // verify the store contents during startup (this is after changelog verification to ensure init has completed)
-    Assert.assertEquals(expectedInitialStoreContents, MyStatefulApplication.getInitialStoreContents().get(STORE_1_NAME));
-  }
-}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
new file mode 100644
index 0000000..d247c10
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBlobStoreManager implements BlobStoreManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TestBlobStoreManager.class);
+  public static final String BLOB_STORE_BASE_DIR = "blob.store.base.dir";
+  public static final String BLOB_STORE_LEDGER_DIR = "blob.store.ledger.dir";
+  public static final String LEDGER_FILES_ADDED = "filesAdded";
+  public static final String LEDGER_FILES_READ = "filesRead";
+  public static final String LEDGER_FILES_DELETED = "filesRemoved";
+  public static final String LEDGER_FILES_TTL_UPDATED = "filesTTLUpdated";
+
+  private final Path stateLocation;
+  private final File filesAddedLedger;
+  private final File filesReadLedger;
+  private final File filesDeletedLedger;
+  private final File filesTTLUpdatedLedger;
+
+  public TestBlobStoreManager(Config config, ExecutorService executorService) {
+    this.stateLocation = Paths.get(config.get(BLOB_STORE_BASE_DIR));
+    Path ledgerLocation = Paths.get(config.get(BLOB_STORE_LEDGER_DIR));
+    try {
+      if (Files.notExists(ledgerLocation)) {
+        Files.createDirectories(ledgerLocation);
+      }
+
+      filesAddedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_ADDED).toFile();
+      filesReadLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_READ).toFile();
+      filesDeletedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_DELETED).toFile();
+      filesTTLUpdatedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_TTL_UPDATED).toFile();
+
+      FileUtils.touch(filesAddedLedger);
+      FileUtils.touch(filesReadLedger);
+      FileUtils.touch(filesDeletedLedger);
+      FileUtils.touch(filesTTLUpdatedLedger);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void init() {
+  }
+
+  @Override
+  public CompletionStage<String> put(InputStream inputStream, Metadata metadata) {
+    String payloadPath = metadata.getPayloadPath();
+    String suffix;
+    if (payloadPath.equals(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)) {
+      // include a unique ID (a fake checkpoint ID) in the path for snapshot index blobs to avoid overwriting previous snapshot indexes
+      suffix = payloadPath + "-" + CheckpointId.create();
+    } else {
+      String[] parts = payloadPath.split("/");
+      String checkpointId = parts[parts.length - 2];
+      String fileName = parts[parts.length - 1];
+      suffix = checkpointId + "/" + fileName;
+    }
+
+    Path destination = Paths.get(stateLocation.toString(), metadata.getJobName(), metadata.getJobId(),
+        metadata.getTaskName(), metadata.getStoreName(), suffix);
+    LOG.info("Creating file at {}", destination);
+    try {
+      FileUtils.writeStringToFile(filesAddedLedger, destination + "\n", Charset.defaultCharset(), true);
+      FileUtils.copyInputStreamToFile(inputStream, destination.toFile());
+    } catch (IOException e) {
+      throw new RuntimeException("Error creating file " + destination, e);
+    }
+    return CompletableFuture.completedFuture(destination.toString());
+  }
+
+  @Override
+  public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata) {
+    LOG.info("Reading file at {}", id);
+    try {
+      FileUtils.writeStringToFile(filesReadLedger, id + "\n", Charset.defaultCharset(), true);
+      Path path = Paths.get(id);
+      Files.copy(path, outputStream);
+      outputStream.flush();
+    } catch (IOException e) {
+      throw new RuntimeException("Error reading file for id " + id, e);
+    }
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public CompletionStage<Void> delete(String id, Metadata metadata) {
+    LOG.info("Deleting file at {}", id);
+    try {
+      FileUtils.writeStringToFile(filesDeletedLedger, id + "\n", Charset.defaultCharset(), true);
+      Files.delete(Paths.get(id));
+    } catch (IOException e) {
+      throw new RuntimeException("Error deleting file for id " + id, e);
+    }
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
+    LOG.info("Removing TTL (no-op) for file at {}", blobId);
+    try {
+      FileUtils.writeStringToFile(filesTTLUpdatedLedger, blobId + "\n", Charset.defaultCharset(), true);
+    } catch (IOException e) {
+      throw new RuntimeException("Error updating ttl for id " + blobId, e);
+    }
+
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public void close() {
+  }
+}
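
The ledgers above are plain append-only text files: each blob operation appends one line, and verifyLedger later re-reads the lines as sets. A self-contained round-trip sketch of that pattern using the same commons-io and java.nio calls (file names below are arbitrary):

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;

public class LedgerRoundTripSketch {
  public static void main(String[] args) throws IOException {
    File ledger = File.createTempFile("filesAdded", ".ledger");
    FileUtils.touch(ledger); // mirrors the manager's constructor bootstrap (harmless here)

    // append one line per operation, as TestBlobStoreManager does
    FileUtils.writeStringToFile(ledger, "/state/store1/cp-1/file.sst\n", Charset.defaultCharset(), true);
    FileUtils.writeStringToFile(ledger, "/state/store2/cp-1/file.sst\n", Charset.defaultCharset(), true);

    // re-read the ledger filtered by store name, as verifyLedger does
    try (Stream<String> lines = Files.lines(ledger.toPath())) {
      Set<String> store1Entries = lines.filter(l -> l.contains("store1")).collect(Collectors.toSet());
      System.out.println(store1Entries); // [/state/store1/cp-1/file.sst]
    }
  }
}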
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java
new file mode 100644
index 0000000..449f1c1
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;
+
+public class TestBlobStoreManagerFactory implements BlobStoreManagerFactory {
+  @Override
+  public BlobStoreManager getBackupBlobStoreManager(Config config, ExecutorService backupExecutor) {
+    return new TestBlobStoreManager(config, backupExecutor);
+  }
+
+  @Override
+  public BlobStoreManager getRestoreBlobStoreManager(Config config, ExecutorService restoreExecutor) {
+    return new TestBlobStoreManager(config, restoreExecutor);
+  }
+}
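
A sketch of how a factory like this is typically wired up: the fully-qualified factory class name is read from config and instantiated reflectively. The config key below ("blob.store.manager.factory") is hypothetical, purely for illustration; consult the Samza blob store configuration for the real key name.

import com.google.common.collect.ImmutableMap;
import java.nio.file.Files;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.storage.blobstore.BlobStoreManager;
import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;

public class FactoryLoadingSketch {
  public static void main(String[] args) throws Exception {
    Config config = new MapConfig(ImmutableMap.of(
        "blob.store.manager.factory", TestBlobStoreManagerFactory.class.getName(), // hypothetical key
        TestBlobStoreManager.BLOB_STORE_BASE_DIR, Files.createTempDirectory("blob-store").toString(),
        TestBlobStoreManager.BLOB_STORE_LEDGER_DIR, Files.createTempDirectory("blob-ledger").toString()));

    // load the factory reflectively from the configured class name
    String factoryClassName = config.get("blob.store.manager.factory");
    BlobStoreManagerFactory factory =
        (BlobStoreManagerFactory) Class.forName(factoryClassName).getDeclaredConstructor().newInstance();

    ExecutorService executor = Executors.newSingleThreadExecutor();
    BlobStoreManager manager = factory.getBackupBlobStoreManager(config, executor);
    manager.init();
    manager.close();
    executor.shutdown();
  }
}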