Merge pull request #1512 from dxichen/checkpoint-restore-precendence-cleanup

Merging miscellaneous incremental fixes related to blob store state backends, async commit, and checkpoint/restore precedence cleanup.
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/JsonCheckpointIdMixin.java b/samza-api/src/main/java/org/apache/samza/serializers/JsonCheckpointIdMixin.java
deleted file mode 100644
index 752e329..0000000
--- a/samza-api/src/main/java/org/apache/samza/serializers/JsonCheckpointIdMixin.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers;
-
-import org.apache.samza.checkpoint.CheckpointId;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-
-/**
- * A mix-in Jackson class to convert {@link CheckpointId} to/from JSON.
- */
-public abstract class JsonCheckpointIdMixin {
-  @JsonCreator
-  private JsonCheckpointIdMixin(
-      @JsonProperty("millis") long millis,
-      @JsonProperty("nanos") long nanos) {
-  }
-
-  @JsonProperty("millis")
-  abstract long getMillis();
-
-  @JsonProperty("nanos")
-  abstract long getNanos();
-}
diff --git a/samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java b/samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
index 961a85b..ed4601a 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
@@ -20,7 +20,6 @@
 package org.apache.samza.storage;
 
 import java.util.Map;
-import java.util.Set;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemConsumer;
@@ -36,7 +35,6 @@
   private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories;
   private final Map<String, Serde<Object>> serdes;
   private final MessageCollector collector;
-  private final Set<String> storeNames;
 
   public KafkaChangelogRestoreParams(
       Map<String, SystemConsumer> storeConsumers,
@@ -44,15 +42,13 @@
       Map<String, SystemAdmin> systemAdmins,
       Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
       Map<String, Serde<Object>> serdes,
-      MessageCollector collector,
-      Set<String> storeNames) {
+      MessageCollector collector) {
     this.storeConsumers = storeConsumers;
     this.inMemoryStores = inMemoryStores;
     this.systemAdmins = systemAdmins;
     this.storageEngineFactories = storageEngineFactories;
     this.serdes = serdes;
     this.collector = collector;
-    this.storeNames = storeNames;
   }
 
   public Map<String, SystemConsumer> getStoreConsumers() {
@@ -78,8 +74,4 @@
   public MessageCollector getCollector() {
     return collector;
   }
-
-  public Set<String> getStoreNames() {
-    return storeNames;
-  }
 }
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
index f51d414..91f3df3 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
@@ -20,6 +20,7 @@
 package org.apache.samza.storage;
 
 import java.io.File;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.samza.config.Config;
 import org.apache.samza.context.ContainerContext;
@@ -51,6 +52,7 @@
       TaskModel taskModel,
       ExecutorService restoreExecutor,
       MetricsRegistry metricsRegistry,
+      Set<String> storesToRestore,
       Config config,
       Clock clock,
       File loggedStoreBaseDir,
diff --git a/samza-api/src/main/java/org/apache/samza/storage/TaskRestoreManager.java b/samza-api/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
index 999325e..1f5a5ce 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/TaskRestoreManager.java
@@ -47,7 +47,7 @@
   void restore() throws InterruptedException;
 
   /**
-   * Closes all initiated ressources include storage engines
+   * Closes all initiated resources, including storage engines
    */
   void close();
 }
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 7535d65..4367244 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -72,10 +72,12 @@
       "org.apache.samza.storage.KafkaChangelogStateBackendFactory";
   public static final List<String> DEFAULT_BACKUP_FACTORIES = ImmutableList.of(
       KAFKA_STATE_BACKEND_FACTORY);
+  public static final String JOB_BACKUP_FACTORIES = STORE_PREFIX + "backup.factories";
   public static final String STORE_BACKUP_FACTORIES = STORE_PREFIX + "%s.backup.factories";
-  // TODO BLOCKER dchen make this per store
-  public static final String RESTORE_FACTORY_SUFFIX = "restore.factory";
-  public static final String STORE_RESTORE_FACTORY = STORE_PREFIX + RESTORE_FACTORY_SUFFIX;
+  public static final String RESTORE_FACTORIES_SUFFIX = "restore.factories";
+  public static final String STORE_RESTORE_FACTORIES = STORE_PREFIX + "%s." + RESTORE_FACTORIES_SUFFIX;
+  public static final String JOB_RESTORE_FACTORIES = STORE_PREFIX + RESTORE_FACTORIES_SUFFIX;
+  public static final List<String> DEFAULT_RESTORE_FACTORIES = ImmutableList.of(KAFKA_STATE_BACKEND_FACTORY);
 
   static final String CHANGELOG_SYSTEM = "job.changelog.system";
   static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
@@ -101,8 +103,7 @@
     for (String key : subConfig.keySet()) {
       if (key.endsWith(SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX)) {
         storeNames.add(key.substring(0, key.length() - SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX.length()));
-      } else if (key.endsWith(FACTORY_SUFFIX) && !key.equals(RESTORE_FACTORY_SUFFIX)) {
-        // TODO HIGH dchen STORE_RESTORE_FACTORY added here to be ignored. Update/remove after changing restore factory -> factories
+      } else if (key.endsWith(FACTORY_SUFFIX)) {
         storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length()));
       }
     }
@@ -290,42 +291,103 @@
         .count();
   }
 
-  public List<String> getStoreBackupFactory(String storeName) {
-    List<String> storeBackupManagers = getList(String.format(STORE_BACKUP_FACTORIES, storeName), new ArrayList<>());
-    // For backwards compatibility if the changelog is enabled, we use default kafka backup factory
-    if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
-      storeBackupManagers = DEFAULT_BACKUP_FACTORIES;
+  private List<String> getJobStoreBackupFactories() {
+    return getList(JOB_BACKUP_FACTORIES, new ArrayList<>());
+  }
+
+  /**
+   * Backup state backend factories are resolved with the following precedence:
+   *
+   * 1. If the stores.store-name.backup.factories config key exists for the store, that value is used
+   * 2. If stores.backup.factories is set for the job, that value is used
+   * 3. If stores.store-name.changelog is set for the store, the default Kafka changelog state backend factory is used
+   * 4. Otherwise no backup factories will be configured for the store
+   *
+   * Note that 2 taking precedence over 3 enables job-level migration off of changelog-based backups
+   * @return List of backup factories for the store in order of backup precedence
+   */
+  public List<String> getStoreBackupFactories(String storeName) {
+    List<String> storeBackupManagers;
+    if (containsKey(String.format(STORE_BACKUP_FACTORIES, storeName))) {
+      storeBackupManagers = getList(String.format(STORE_BACKUP_FACTORIES, storeName), new ArrayList<>());
+    } else {
+      storeBackupManagers = getJobStoreBackupFactories();
+      // For backwards compatibility if the changelog is enabled, we use default kafka backup factory
+      if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+        storeBackupManagers = DEFAULT_BACKUP_FACTORIES;
+      }
     }
     return storeBackupManagers;
   }
 
   public Set<String> getBackupFactories() {
     return getStoreNames().stream()
-        .flatMap((storeName) -> getStoreBackupFactory(storeName).stream())
+        .flatMap((storeName) -> getStoreBackupFactories(storeName).stream())
         .collect(Collectors.toSet());
   }
 
-  public String getRestoreFactory() {
-    return get(STORE_RESTORE_FACTORY, KAFKA_STATE_BACKEND_FACTORY);
-  }
-
   public List<String> getStoresWithBackupFactory(String backendFactoryName) {
     return getStoreNames().stream()
-        .filter((storeName) -> getStoreBackupFactory(storeName)
+        .filter((storeName) -> getStoreBackupFactories(storeName)
             .contains(backendFactoryName))
         .collect(Collectors.toList());
   }
 
-  // TODO BLOCKER dchen update when making restore managers per store
+  public List<String> getPersistentStoresWithBackupFactory(String backendFactoryName) {
+    return getStoreNames().stream()
+        .filter(storeName -> {
+          Optional<String> storeFactory = getStorageFactoryClassName(storeName);
+          return storeFactory.isPresent() &&
+              !storeFactory.get().equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY);
+        })
+        .filter((storeName) -> getStoreBackupFactories(storeName)
+            .contains(backendFactoryName))
+        .collect(Collectors.toList());
+  }
+
+  private List<String> getJobStoreRestoreFactories() {
+    return getList(JOB_RESTORE_FACTORIES, new ArrayList<>());
+  }
+
+  /**
+   * Restore state backend factories are resolved with the following precedence:
+   *
+   * 1. If the stores.store-name.restore.factories config key exists for the store, that value is used
+   * 2. If stores.restore.factories is set for the job, that value is used
+   * 3. If stores.store-name.changelog is set for the store, the default Kafka changelog state backend factory is used
+   * 4. Otherwise no restore factories will be configured for the store
+   *
+   * Note that 2 taking precedence over 3 enables job-level migration off of changelog-based restores
+   * @return List of restore factories for the store in order of restoration precedence
+   */
+  public List<String> getStoreRestoreFactories(String storeName) {
+    List<String> storeRestoreManagers;
+    if (containsKey(String.format(STORE_RESTORE_FACTORIES, storeName))) {
+      storeRestoreManagers = getList(String.format(STORE_RESTORE_FACTORIES, storeName), new ArrayList<>());
+    } else {
+      storeRestoreManagers = getJobStoreRestoreFactories();
+      // For backwards compatibility, if the changelog is enabled, we use the default Kafka restore factory
+      if (storeRestoreManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+        storeRestoreManagers = DEFAULT_RESTORE_FACTORIES;
+      }
+    }
+    return storeRestoreManagers;
+  }
+
+  public Set<String> getRestoreFactories() {
+    return getStoreNames().stream()
+        .flatMap((storeName) -> getStoreRestoreFactories(storeName).stream())
+        .collect(Collectors.toSet());
+  }
+
   public List<String> getStoresWithRestoreFactory(String backendFactoryName) {
     return getStoreNames().stream()
-        .filter((storeName) -> getRestoreFactory().equals(backendFactoryName))
+        .filter((storeName) -> getStoreRestoreFactories(storeName).contains(backendFactoryName))
         .collect(Collectors.toList());
   }
 
   /**
    * Helper method to get if logged store dirs should be deleted regardless of their contents.
-   * @return
    */
   public boolean cleanLoggedStoreDirsOnStart(String storeName) {
     return getBoolean(String.format(CLEAN_LOGGED_STOREDIRS_ON_START, storeName), false);
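
For illustration only, not part of the patch: a minimal sketch of how the new backup/restore factory precedence resolves, assuming the "stores."-prefixed keys and factory class names defined in StorageConfig above. The store names and changelog stream below are hypothetical.

// Hypothetical sketch of the precedence rules above (not part of this patch).
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;

public class StateBackendPrecedenceExample {
  public static void main(String[] args) {
    Map<String, String> cfg = new HashMap<>();
    // Job-level default restore factories for all stores (rule 2).
    cfg.put("stores.restore.factories",
        "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
    // Per-store override takes precedence over the job-level value (rule 1).
    cfg.put("stores.store-a.restore.factories",
        "org.apache.samza.storage.KafkaChangelogStateBackendFactory");
    // store-b only declares a changelog; no backup keys are set for it.
    cfg.put("stores.store-b.changelog", "kafka.store-b-changelog");

    StorageConfig storageConfig = new StorageConfig(new MapConfig(cfg));
    // Rule 1: the per-store key wins for store-a.
    System.out.println(storageConfig.getStoreRestoreFactories("store-a"));
    // Rule 2 beats rule 3: the job-level restore factories apply to store-b despite its changelog,
    // which is what allows a job-wide migration off of changelog restores.
    System.out.println(storageConfig.getStoreRestoreFactories("store-b"));
    // Rule 3: no backup keys are set, so store-b's changelog implies the default Kafka backup factory.
    System.out.println(storageConfig.getStoreBackupFactories("store-b"));
  }
}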
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 8db1d2a..06a8727 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -63,7 +63,7 @@
   // COMMIT_MAX_DELAY_MS have passed since the pending commit start. if the pending commit
   // does not complete within this timeout, the container will shut down.
   public static final String COMMIT_TIMEOUT_MS = "task.commit.timeout.ms";
-  static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(1).toMillis();
+  static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();
 
   // how long to wait for a clean shutdown
   public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
@@ -125,8 +125,8 @@
   public static final List<String> DEFAULT_CHECKPOINT_WRITE_VERSIONS = ImmutableList.of("1", "2");
 
   // checkpoint version to read during container startup
-  public static final String CHECKPOINT_READ_VERSION = "task.checkpoint.read.version";
-  public static final short DEFAULT_CHECKPOINT_READ_VERSION = 1;
+  public static final String CHECKPOINT_READ_VERSIONS = "task.checkpoint.read.versions";
+  public static final List<String> DEFAULT_CHECKPOINT_READ_VERSIONS = ImmutableList.of("1");
 
   public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
@@ -348,8 +348,16 @@
         .stream().map(Short::valueOf).collect(Collectors.toList());
   }
 
-  public short getCheckpointReadVersion() {
-    return getShort(CHECKPOINT_READ_VERSION, DEFAULT_CHECKPOINT_READ_VERSION);
+  public List<Short> getCheckpointReadVersions() {
+    List<Short> checkpointReadPriorityList = getList(CHECKPOINT_READ_VERSIONS, DEFAULT_CHECKPOINT_READ_VERSIONS)
+        .stream().map(Short::valueOf).collect(Collectors.toList());
+    if (checkpointReadPriorityList.isEmpty()) {
+      // the user explicitly defined the checkpoint read versions list to be empty
+      throw new IllegalArgumentException("No checkpoint read versions defined for the job. "
+          + "Please remove the task.checkpoint.read.versions config or define valid checkpoint read versions");
+    } else {
+      return checkpointReadPriorityList;
+    }
   }
 
   public boolean getTransactionalStateCheckpointEnabled() {
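
A hedged sketch, not part of the patch, of how the new task.checkpoint.read.versions list behaves, assuming the TaskConfig accessor added above; the first entry is treated as the preferred read version (as used by the CheckpointTool change later in this diff).

// Hypothetical illustration of checkpoint read version resolution (not part of this patch).
import java.util.Collections;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;

public class CheckpointReadVersionsExample {
  public static void main(String[] args) {
    // Prefer reading checkpoint v2, falling back to v1.
    TaskConfig taskConfig = new TaskConfig(
        new MapConfig(Collections.singletonMap("task.checkpoint.read.versions", "2,1")));
    System.out.println(taskConfig.getCheckpointReadVersions()); // [2, 1]

    // Unset: defaults to [1] for backwards compatibility.
    System.out.println(new TaskConfig(new MapConfig()).getCheckpointReadVersions()); // [1]

    // Explicitly set to an empty value: getCheckpointReadVersions() throws IllegalArgumentException.
  }
}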
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
index c172288..083b483 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -244,7 +244,7 @@
    * The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
    * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace
    * to a "fan out" namespace.
-   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
+   * This method is neither atomic nor thread-safe. The intent is for the Samza Processor's coordinator to use this
    * method to assign the Startpoints to the appropriate tasks.
    * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to.
    * @return The set of active {@link TaskName}s that were fanned out to.
diff --git a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
index be419b6..8433996 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
@@ -91,6 +91,7 @@
       TaskModel taskModel,
       ExecutorService restoreExecutor,
       MetricsRegistry metricsRegistry,
+      Set<String> storesToRestore,
       Config config,
       Clock clock,
       File loggedStoreBaseDir,
@@ -108,7 +109,7 @@
 
     if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
       return new TransactionalStateTaskRestoreManager(
-          kafkaChangelogRestoreParams.getStoreNames(),
+          storesToRestore,
           jobContext,
           containerContext,
           taskModel,
@@ -128,7 +129,7 @@
       );
     } else {
       return new NonTransactionalStateTaskRestoreManager(
-          kafkaChangelogRestoreParams.getStoreNames(),
+          storesToRestore,
           jobContext,
           containerContext,
           taskModel,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index 35fa375..33af367 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -326,7 +326,6 @@
     return storageManagerUtil.getStartingOffset(systemStreamPartition, systemAdmin, fileOffset, oldestOffset);
   }
 
-  // TODO dchen put this in common code path for transactional and non-transactional
   private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, JobContext jobContext,
       ContainerContext containerContext, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
       Map<String, Serde<Object>> serdes, MetricsRegistry metricsRegistry,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 057b248..badeb28 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -113,7 +113,6 @@
    * @param isSideInput true if store is a side-input store, false if it is a regular store
    * @return true if the store is stale, false otherwise
    */
-  // TODO BLOCKER dchen do these methods need to be updated to also read the new checkpoint file?
   public boolean isStaleStore(File storeDir, long storeDeleteRetentionInMs, long currentTimeMs, boolean isSideInput) {
     long offsetFileLastModifiedTime;
     boolean isStaleStore = false;
@@ -127,8 +126,11 @@
       File offsetFileRefNew = new File(storeDir, OFFSET_FILE_NAME_NEW);
       File offsetFileRefLegacy = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
       File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+      File checkpointV2File = new File(storeDir, CHECKPOINT_FILE_NAME);
 
-      if (offsetFileRefNew.exists()) {
+      if (checkpointV2File.exists()) {
+        offsetFileLastModifiedTime = checkpointV2File.lastModified();
+      } else if (offsetFileRefNew.exists()) {
         offsetFileLastModifiedTime = offsetFileRefNew.lastModified();
       } else if (!isSideInput && offsetFileRefLegacy.exists()) {
         offsetFileLastModifiedTime = offsetFileRefLegacy.lastModified();
@@ -184,7 +186,6 @@
    * @param isSideInput true if store is a side-input store, false if it is a regular store
    * @return true if the offset file is valid. false otherwise.
    */
-  // TODO BLOCKER dchen do these methods need to be updated to also read the new checkpoint file?
   public boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) {
     boolean hasValidOffsetFile = false;
     if (storeDir.exists()) {
@@ -305,11 +306,11 @@
   /**
    * Read and return the {@link CheckpointV2} from the directory's {@link #CHECKPOINT_FILE_NAME} file.
    * If the file does not exist, returns null.
-   * // TODO HIGH dchen add tests at all call sites for handling null value.
    *
    * @param storagePartitionDir store directory to read the checkpoint file from
    * @return the {@link CheckpointV2} object retrieved from the checkpoint file if found, otherwise return null
    */
+  // TODO dchen use checkpoint v2 file before migrating off of dual checkpoints
   public CheckpointV2 readCheckpointV2File(File storagePartitionDir) {
     File checkpointFile = new File(storagePartitionDir, CHECKPOINT_FILE_NAME);
     if (checkpointFile.exists()) {
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 8091e48..6b657d5 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.config.Config;
@@ -215,7 +217,10 @@
    */
   @SuppressWarnings("rawtypes")
   private void getContainerStorageManagers() {
-    String factoryClass = new StorageConfig(jobConfig).getRestoreFactory();
+    Set<String> factoryClasses = new StorageConfig(jobConfig).getRestoreFactories();
+    Map<String, StateBackendFactory> stateBackendFactories = factoryClasses.stream().collect(
+        Collectors.toMap(factoryClass -> factoryClass,
+          factoryClass -> ReflectionUtil.getObj(factoryClass, StateBackendFactory.class)));
     Clock clock = SystemClock.instance();
     StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock);
     // don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways
@@ -242,7 +247,7 @@
               new SamzaContainerMetrics(containerModel.getId(), new MetricsRegistryMap(), ""),
               JobContextImpl.fromConfigWithDefaults(jobConfig, jobModel),
               containerContext,
-              ReflectionUtil.getObj(factoryClass, StateBackendFactory.class),
+              stateBackendFactories,
               new HashMap<>(),
               storeBaseDir,
               storeBaseDir,
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 20f653c..f33bb5b 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -611,7 +611,7 @@
     } else if (checkpoint instanceof CheckpointV1) {
       // If the checkpoint v1 is used, we need to fetch the changelog SSPs in the inputOffsets in order to get the
       // store offset.
-      Map<SystemStreamPartition, String> checkpointedOffsets = ((CheckpointV1) checkpoint).getOffsets();
+      Map<SystemStreamPartition, String> checkpointedOffsets = checkpoint.getOffsets();
       storeChangelogs.forEach((storeName, systemStream) -> {
         Partition changelogPartition = taskModel.getChangelogPartition();
         SystemStreamPartition storeChangelogSSP = new SystemStreamPartition(systemStream, changelogPartition);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
index 15cc87b..fb04ce5 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
@@ -114,7 +114,7 @@
     this.storageManagerUtil = storageManagerUtil;
     StorageConfig storageConfig = new StorageConfig(config);
     this.storesToBackup =
-        storageConfig.getStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
+        storageConfig.getPersistentStoresWithBackupFactory(BlobStoreStateBackendFactory.class.getName());
     this.loggedStoreBaseDir = loggedStoreBaseDir;
     this.blobStoreManager = blobStoreManager;
     this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, blobStoreTaskBackupMetrics);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
index 052e49d..64bc74e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
@@ -84,7 +84,7 @@
   private final File loggedBaseDir;
   private final File nonLoggedBaseDir;
   private final String taskName;
-  private final List<String> storesToRestore;
+  private final Set<String> storesToRestore;
   private final BlobStoreRestoreManagerMetrics metrics;
 
   private BlobStoreManager blobStoreManager;
@@ -95,13 +95,13 @@
    */
   private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes;
 
-  public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor,
+  public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor, Set<String> storesToRestore,
       BlobStoreRestoreManagerMetrics metrics, Config config, File loggedBaseDir, File nonLoggedBaseDir,
       StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
     this.taskModel = taskModel;
     this.jobName = new JobConfig(config).getName().get();
     this.jobId = new JobConfig(config).getJobId();
-    this.executor = restoreExecutor; // TODO BLOCKER dchen1 dont block on restore executor
+    this.executor = restoreExecutor;
     this.config = config;
     this.storageConfig = new StorageConfig(config);
     this.blobStoreConfig = new BlobStoreConfig(config);
@@ -113,9 +113,7 @@
     this.loggedBaseDir = loggedBaseDir;
     this.nonLoggedBaseDir = nonLoggedBaseDir;
     this.taskName = taskModel.getTaskName().getTaskName();
-    StorageConfig storageConfig = new StorageConfig(config);
-    this.storesToRestore =
-        storageConfig.getStoresWithRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+    this.storesToRestore = storesToRestore;
     this.metrics = metrics;
   }
 
@@ -168,7 +166,7 @@
 
   /**
    * Deletes blob store contents for stores that were present in the last checkpoint but are either no longer
-   * present in job configs (removed by user since last deploymetn) or are no longer configured to be backed
+   * present in job configs (removed by user since last deployment) or are no longer configured to be backed
    * up using blob stores.
    *
    * This method blocks until all the necessary store contents and snapshot index blobs have been marked for deletion.
@@ -209,7 +207,7 @@
    * Restores all eligible stores in the task.
    */
   @VisibleForTesting
-  static void restoreStores(String jobName, String jobId, TaskName taskName, List<String> storesToRestore,
+  static void restoreStores(String jobName, String jobId, TaskName taskName, Set<String> storesToRestore,
       Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
       File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
       StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil,
@@ -284,7 +282,7 @@
     FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> {
       LOG.info("Restore completed for task: {} stores", taskName);
       metrics.restoreNs.set(System.nanoTime() - restoreStartTime);
-    }).join(); // TODO BLOCKER dchen1 make non-blocking.
+    }).join(); // TODO dchen make non-blocking for the restore executor
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
index 23482c7..e2512b4 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
@@ -21,6 +21,7 @@
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.BlobStoreConfig;
@@ -73,6 +74,7 @@
       TaskModel taskModel,
       ExecutorService restoreExecutor,
       MetricsRegistry metricsRegistry,
+      Set<String> storesToRestore,
       Config config,
       Clock clock,
       File loggedStoreBaseDir,
@@ -84,7 +86,7 @@
     BlobStoreManagerFactory factory = ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class);
     BlobStoreManager blobStoreManager = factory.getRestoreBlobStoreManager(config, restoreExecutor);
     BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(metricsRegistry);
-    return new BlobStoreRestoreManager(taskModel, restoreExecutor, metrics, config, loggedStoreBaseDir,
+    return new BlobStoreRestoreManager(taskModel, restoreExecutor, storesToRestore, metrics, config, loggedStoreBaseDir,
         nonLoggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager);
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/serde/SnapshotIndexSerde.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/serde/SnapshotIndexSerde.java
index 2e0e50c..9e316ad 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/serde/SnapshotIndexSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/serde/SnapshotIndexSerde.java
@@ -24,9 +24,8 @@
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import org.apache.samza.SamzaException;
-import org.apache.samza.checkpoint.CheckpointId;
-import org.apache.samza.serializers.JsonCheckpointIdMixin;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.storage.blobstore.index.DirIndex;
 import org.apache.samza.storage.blobstore.index.FileBlob;
 import org.apache.samza.storage.blobstore.index.FileIndex;
@@ -37,7 +36,7 @@
 
 public class SnapshotIndexSerde implements Serde<SnapshotIndex> {
 
-  private final static ObjectMapper MAPPER = new ObjectMapper();
+  private final static ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper();
   private TypeReference<SnapshotIndex> typeReference;
   private final ObjectWriter objectWriter;
 
@@ -48,8 +47,7 @@
         .addMixIn(DirIndex.class, JsonDirIndexMixin.class)
         .addMixIn(FileIndex.class, JsonFileIndexMixin.class)
         .addMixIn(FileMetadata.class, JsonFileMetadataMixin.class)
-        .addMixIn(FileBlob.class, JsonFileBlobMixin.class)
-        .addMixIn(CheckpointId.class, JsonCheckpointIdMixin.class);
+        .addMixIn(FileBlob.class, JsonFileBlobMixin.class);
 
     this.typeReference = new TypeReference<SnapshotIndex>() { };
     this.objectWriter = MAPPER.writerFor(typeReference);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
index 9c8b61f..b312b36 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java
@@ -456,7 +456,7 @@
    * @return A future containing the {@link FileIndex} for the uploaded file.
    */
   @VisibleForTesting
-  CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) {
+  public CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) {
     if (file == null || !file.isFile()) {
       String message = file != null ? "Dir or Symbolic link" : "null";
       throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message));
@@ -483,7 +483,7 @@
 
         fileBlobFuture = blobStoreManager.put(inputStream, metadata)
             .thenApplyAsync(id -> {
-              LOG.trace("Put complete. Closing input stream for file: {}.", file.getPath());
+              LOG.trace("Put complete. Received Blob ID {}. Closing input stream for file: {}.", id, file.getPath());
               try {
                 finalInputStream.close();
               } catch (Exception e) {
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 92b4e59..1f98b22 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -185,22 +185,38 @@
       taskNames.foreach(checkpointManager.register)
       checkpointManager.start()
 
-      // TODO dchen make add support for checkpointv2
+      // Get the preferred checkpoint read version for the checkpoint tool
+      val checkpointReadVersion = taskConfig.getCheckpointReadVersions.get(0)
+      val defaultCheckpoint = if (checkpointReadVersion == 1) {
+        new CheckpointV1(new java.util.HashMap[SystemStreamPartition, String]())
+      } else if (checkpointReadVersion == 2) {
+        new CheckpointV2(CheckpointId.create(), new java.util.HashMap[SystemStreamPartition, String](),
+          new java.util.HashMap[String, util.Map[String, String]]())
+      } else {
+        throw new SamzaException("Unrecognized checkpoint read version: " + checkpointReadVersion)
+      }
+
       val lastCheckpoints = taskNames.map(taskName => {
         taskName -> Option(checkpointManager.readLastCheckpoint(taskName))
-          .getOrElse(new CheckpointV1(new java.util.HashMap[SystemStreamPartition, String]()))
-          .getOffsets
-          .asScala
-          .toMap
+          .getOrElse(defaultCheckpoint)
       }).toMap
 
-      lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2, "Current checkpoint for task: "+ lcp._1))
+      lastCheckpoints.foreach(lcp => logCheckpoint(lcp._1, lcp._2.getOffsets.asScala.toMap,
+        "Current checkpoint for task: " + lcp._1))
 
       if (newOffsets != null) {
         newOffsets.foreach {
           case (taskName: TaskName, offsets: Map[SystemStreamPartition, String]) =>
             logCheckpoint(taskName, offsets, "New offset to be written for task: " + taskName)
-            val checkpoint = new CheckpointV1(offsets.asJava)
+            val checkpoint = if (checkpointReadVersion == 1) {
+              new CheckpointV1(offsets.asJava)
+            } else if (checkpointReadVersion == 2) {
+              val lastSCMs = lastCheckpoints.getOrElse(taskName, defaultCheckpoint)
+                .asInstanceOf[CheckpointV2].getStateCheckpointMarkers
+              new CheckpointV2(CheckpointId.create(), offsets.asJava, lastSCMs)
+            } else {
+              throw new SamzaException("Unrecognized checkpoint read version: " + checkpointReadVersion)
+            }
             checkpointManager.writeCheckpoint(taskName, checkpoint)
             info(s"Updated the checkpoint of the task: $taskName to: $offsets")
         }
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
index b740a13..a068e2a 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
@@ -43,6 +43,9 @@
   def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints")
 
   def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
+    if (!checkpoint.isInstanceOf[CheckpointV1]) {
+      throw new SamzaException("Unsupported checkpoint version: " + checkpoint.getVersion)
+    }
     val bytes = serde.toBytes(checkpoint.asInstanceOf[CheckpointV1])
     val fos = new FileOutputStream(getCheckpointFile(taskName))
 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index b3360f2..fe27552 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -23,12 +23,10 @@
 import java.lang.management.ManagementFactory
 import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
-import java.time.Duration
 import java.util
 import java.util.concurrent._
 import java.util.function.Consumer
 import java.util.{Base64, Optional}
-
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.samza.SamzaException
@@ -502,9 +500,22 @@
     val loggedStorageBaseDir = getLoggedStorageBaseDir(jobConfig, defaultStoreBaseDir)
     info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
 
-    // TODO dchen should we enforce restore factories to be subset of backup factories?
-    val stateStorageBackendRestoreFactory = ReflectionUtil
-      .getObj(storageConfig.getRestoreFactory(), classOf[StateBackendFactory])
+    val backupFactoryNames = storageConfig.getBackupFactories
+    val restoreFactoryNames = storageConfig.getRestoreFactories
+
+    // Restore factories should be a subset of backup factories
+    if (!backupFactoryNames.containsAll(restoreFactoryNames)) {
+      restoreFactoryNames.removeAll(backupFactoryNames)
+      throw new SamzaException("Restore state backend factories are not a subset of backup state backend factories, " +
+        "missing factories: " + restoreFactoryNames.toString)
+    }
+
+    val stateStorageBackendBackupFactories = backupFactoryNames.asScala.map(
+      ReflectionUtil.getObj(_, classOf[StateBackendFactory])
+    )
+    val stateStorageBackendRestoreFactories = restoreFactoryNames.asScala.map(
+      factoryName => (factoryName , ReflectionUtil.getObj(factoryName, classOf[StateBackendFactory])))
+      .toMap.asJava
 
     val containerStorageManager = new ContainerStorageManager(
       checkpointManager,
@@ -521,7 +532,7 @@
       samzaContainerMetrics,
       jobContext,
       containerContext,
-      stateStorageBackendRestoreFactory,
+      stateStorageBackendRestoreFactories,
       taskCollectors.asJava,
       loggedStorageBaseDir,
       nonLoggedStorageBaseDir,
@@ -530,9 +541,7 @@
 
     storeWatchPaths.addAll(containerStorageManager.getStoreDirectoryPaths)
 
-    val stateStorageBackendBackupFactories = storageConfig.getBackupFactories().asScala.map(
-      ReflectionUtil.getObj(_, classOf[StateBackendFactory])
-    )
+
 
     // Create taskInstances
     val taskInstances: Map[TaskName, TaskInstance] = taskModels
@@ -684,19 +693,6 @@
       containerStorageManager = containerStorageManager,
       diagnosticsManager = diagnosticsManager)
   }
-
-  /**
-    * Builds the set of SSPs for all changelogs on this container.
-    */
-  @VisibleForTesting
-  private[container] def getChangelogSSPsForContainer(containerModel: ContainerModel,
-    changeLogSystemStreams: util.Map[String, SystemStream]): Set[SystemStreamPartition] = {
-    containerModel.getTasks.values().asScala
-      .map(taskModel => taskModel.getChangelogPartition)
-      .flatMap(changelogPartition => changeLogSystemStreams.asScala.map { case (_, systemStream) =>
-        new SystemStreamPartition(systemStream, changelogPartition) })
-      .toSet
-  }
 }
 
 class SamzaContainer(
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 8a872de..f91aae1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -290,6 +290,7 @@
 
         if (!commitInProgress.tryAcquire(commitTimeoutMs, TimeUnit.MILLISECONDS)) {
           val timeSinceLastCommit = System.currentTimeMillis() - lastCommitStartTimeMs
+          metrics.commitsTimedOut.set(metrics.commitsTimedOut.getValue + 1)
           throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " +
             "%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " +
             "and commit timeout beyond that is %s ms" format (taskName, timeSinceLastCommit,
@@ -356,14 +357,11 @@
           val checkpointWriteFuture: CompletableFuture[util.Map[String, util.Map[String, String]]] =
             uploadSCMsFuture.thenApplyAsync(writeCheckpoint(checkpointId, inputOffsets), commitThreadPool)
 
-          val cleanupStartTimeNs = System.nanoTime()
           val cleanUpFuture: CompletableFuture[Void] =
             checkpointWriteFuture.thenComposeAsync(cleanUp(checkpointId), commitThreadPool)
           cleanUpFuture.whenComplete(new BiConsumer[Void, Throwable] {
             override def accept(v: Void, throwable: Throwable): Unit = {
-              if (throwable == null) {
-                metrics.asyncCleanupNs.update(System.nanoTime() - cleanupStartTimeNs)
-              } else {
+              if (throwable != null) {
                 warn("Commit cleanup did not complete successfully for taskName: %s checkpointId: %s with error msg: %s"
                   format (taskName, checkpointId, throwable.getMessage))
               }
@@ -380,6 +378,7 @@
       }
     })
 
+    metrics.lastCommitNs.set(System.nanoTime() - commitStartNs)
     metrics.commitSyncNs.update(System.nanoTime() - commitStartNs)
     debug("Finishing sync stage of commit for taskName: %s checkpointId: %s" format (taskName, checkpointId))
   }
@@ -428,6 +427,7 @@
       override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
         // Perform cleanup on unused checkpoints
         debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
+        val cleanUpStartTime = System.nanoTime()
         try {
           commitManager.cleanUp(checkpointId, uploadSCMs)
         } catch {
@@ -438,6 +438,8 @@
             throw new SamzaException(
               "Failed to remove old checkpoint state for taskName: %s checkpointId: %s."
                 format(taskName, checkpointId), e)
+        } finally {
+          metrics.asyncCleanupNs.update(System.nanoTime() - cleanUpStartTime)
         }
       }
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index f13e37a..54d3665 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -38,9 +38,10 @@
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
   val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
-
+  val commitsTimedOut = newGauge("commits-timed-out", 0)
   val commitsSkipped = newGauge("commits-skipped", 0)
   val commitNs = newTimer("commit-ns")
+  val lastCommitNs = newGauge("last-commit-ns", 0L)
   val commitSyncNs = newTimer("commit-sync-ns")
   val commitAsyncNs = newTimer("commit-async-ns")
   val snapshotNs = newTimer("snapshot-ns")
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 5cc79b3..f804a5e 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -44,6 +44,7 @@
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV2;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StorageConfig;
@@ -120,7 +121,6 @@
   private static final int RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS = 60;
 
   /** Maps containing relevant per-task objects */
-  private final Map<TaskName, TaskRestoreManager> taskRestoreManagers;
   private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
   private final Map<TaskName, TaskInstanceCollector> taskInstanceCollectors;
   private final Map<TaskName, Map<String, StorageEngine>> inMemoryStores; // subset of taskStores after #start()
@@ -131,6 +131,8 @@
   private final Map<String, SystemStream> changelogSystemStreams; // Map of changelog system-streams indexed by store name
   private final Map<String, Serde<Object>> serdes; // Map of Serde objects indexed by serde name (specified in config)
   private final SystemAdmins systemAdmins;
+  private final Clock clock;
+  private final Map<String, StateBackendFactory> restoreStateBackendFactories;
 
   private final StreamMetadataCache streamMetadataCache;
   private final SamzaContainerMetrics samzaContainerMetrics;
@@ -153,7 +155,7 @@
   private final Set<String> sideInputStoreNames;
   private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
   private SystemConsumers sideInputSystemConsumers;
-  private volatile Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread
+  private final Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread
   private volatile boolean shouldShutdown = false;
   private RunLoop sideInputRunLoop;
 
@@ -183,7 +185,7 @@
       SamzaContainerMetrics samzaContainerMetrics,
       JobContext jobContext,
       ContainerContext containerContext,
-      StateBackendFactory stateBackendFactory,
+      Map<String, StateBackendFactory> restoreStateBackendFactories,
       Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
       File loggedStoreBaseDirectory,
       File nonLoggedStoreBaseDirectory,
@@ -203,6 +205,8 @@
 
     LOG.info("Starting with changelogSystemStreams = {} taskSideInputStoreSSPs = {}", this.changelogSystemStreams, this.taskSideInputStoreSSPs);
 
+    this.clock = clock;
+    this.restoreStateBackendFactories = restoreStateBackendFactories;
     this.storageEngineFactories = storageEngineFactories;
     this.serdes = serdes;
     this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
@@ -260,16 +264,13 @@
     JobConfig jobConfig = new JobConfig(config);
     int restoreThreadPoolSize =
         Math.min(
-            Math.max(containerModel.getTasks().size() * 2, jobConfig.getRestoreThreadPoolSize()),
+            Math.max(containerModel.getTasks().size() * restoreStateBackendFactories.size() * 2,
+                jobConfig.getRestoreThreadPoolSize()),
             jobConfig.getRestoreThreadPoolMaxSize()
         );
     this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
 
-    // creating task restore managers
-    this.taskRestoreManagers = createTaskRestoreManagers(stateBackendFactory, clock,
-        this.samzaContainerMetrics);
-
     this.sspSideInputHandlers = createSideInputHandlers(clock);
 
     // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
@@ -393,27 +394,95 @@
     return storeConsumers;
   }
 
-  private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(StateBackendFactory factory, Clock clock,
-      SamzaContainerMetrics samzaContainerMetrics) {
-    Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
+  private Map<String, TaskRestoreManager> createTaskRestoreManagers(Map<String, StateBackendFactory> factories,
+      Map<String, Set<String>> backendFactoryStoreNames, Clock clock, SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
+      TaskModel taskModel) {
+    // For each configured state backend factory, create a restore manager for the task's stores assigned to that factory
+    Map<String, TaskRestoreManager> backendFactoryRestoreManagers = new HashMap<>(); // backendFactoryName -> restoreManager
+    MetricsRegistry taskMetricsRegistry =
+        taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
 
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      MetricsRegistry taskMetricsRegistry =
-          taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-      Set<String> nonSideInputStoreNames = storageEngineFactories.keySet().stream()
-          .filter(storeName -> !sideInputStoreNames.contains(storeName))
-          .collect(Collectors.toSet());
+    backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
+      StateBackendFactory factory = factories.get(factoryName);
       KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers,
           inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes,
-          taskInstanceCollectors.get(taskName), nonSideInputStoreNames);
+          taskInstanceCollectors.get(taskName));
+      TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor,
+          taskMetricsRegistry, storeNames, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
+          kafkaChangelogRestoreParams);
 
-      taskRestoreManagers.put(taskName,
-          factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor,
-              taskMetricsRegistry, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
-              kafkaChangelogRestoreParams));
-      samzaContainerMetrics.addStoresRestorationGauge(taskName);
+      backendFactoryRestoreManagers.put(factoryName, restoreManager);
     });
-    return taskRestoreManagers;
+    samzaContainerMetrics.addStoresRestorationGauge(taskName);
+    return backendFactoryRestoreManagers;
+  }
+
+  /**
+   * Return a map of state backend factory names to the set of stores that should be restored using that factory
+   */
+  @VisibleForTesting
+  Map<String, Set<String>> getBackendFactoryStoreNames(Checkpoint checkpoint, Set<String> storeNames,
+      StorageConfig storageConfig) {
+    Map<String, Set<String>> backendFactoryStoreNames = new HashMap<>(); // backendFactoryName -> set(storeNames)
+
+    if (checkpoint != null && checkpoint.getVersion() == 1) {
+      // Only restore stores with changelog streams configured
+      Set<String> changelogStores = storeNames.stream()
+          .filter(storeName -> storageConfig.getChangelogStream(storeName).isPresent())
+          .collect(Collectors.toSet());
+      // Default to changelog backend factory when using checkpoint v1 for backwards compatibility
+      if (!changelogStores.isEmpty()) {
+        backendFactoryStoreNames.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, changelogStores);
+      }
+      if (storeNames.size() > changelogStores.size()) {
+        Set<String> nonChangelogStores = storeNames.stream()
+            .filter(storeName -> !changelogStores.contains(storeName))
+            .collect(Collectors.toSet());
+        LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1,"
+                + "restore for the store will be skipped",
+            nonChangelogStores);
+      }
+    } else if (checkpoint == null ||  checkpoint.getVersion() == 2) {
+      // Extract the state checkpoint markers if checkpoint exists
+      Map<String, Map<String, String>> stateCheckpointMarkers = checkpoint == null ? Collections.emptyMap() :
+          ((CheckpointV2) checkpoint).getStateCheckpointMarkers();
+
+      // Find stores associated to each state backend factory
+      storeNames.forEach(storeName -> {
+        List<String> storeFactories = storageConfig.getStoreRestoreFactories(storeName);
+
+        if (storeFactories.isEmpty()) {
+          // No restore factories are configured for the store and the store does not have a changelog topic
+          LOG.info("Non-side input store: {} has no configured restore factories or store changelog; "
+                  + "restore for the store will be skipped",
+              storeName);
+        } else {
+          // Search the ordered list for the first matched state backend factory in the checkpoint
+          // If the checkpoint does not exist or no state checkpoint markers exist, we use the first configured
+          // restore factory
+          Optional<String> factoryNameOpt = storeFactories.stream()
+              .filter(factoryName -> stateCheckpointMarkers.containsKey(factoryName) &&
+                  stateCheckpointMarkers.get(factoryName).containsKey(storeName))
+              .findFirst();
+          String factoryName;
+          if (factoryNameOpt.isPresent()) {
+            factoryName = factoryNameOpt.get();
+          } else { // Restore factories configured but no checkpoints found
+            // Use first configured restore factory
+            factoryName = storeFactories.get(0);
+            LOG.warn("No matching checkpoints found for configured factories: {}, " +
+                "defaulting to using the first configured factory with no checkpoints", storeFactories);
+          }
+          if (!backendFactoryStoreNames.containsKey(factoryName)) {
+            backendFactoryStoreNames.put(factoryName, new HashSet<>());
+          }
+          backendFactoryStoreNames.get(factoryName).add(storeName);
+        }
+      });
+    } else {
+      throw new SamzaException(String.format("Unsupported checkpoint version %s", checkpoint.getVersion()));
+    }
+    return backendFactoryStoreNames;
   }
 
   // Helper method to filter active Tasks from the container model
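
For illustration only, not part of the patch: a self-contained sketch that mirrors the checkpoint V2 store-to-factory matching in getBackendFactoryStoreNames above. The selectRestoreFactory helper is hypothetical, not a Samza API; it only restates the selection logic for a single store.

// Hypothetical helper mirroring the checkpoint V2 factory selection above (not a Samza API).
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class RestoreFactorySelectionExample {
  // Pick the first configured factory that has a state checkpoint marker for the store;
  // otherwise fall back to the first configured factory (restore proceeds without a marker).
  static String selectRestoreFactory(String storeName, List<String> configuredFactories,
      Map<String, Map<String, String>> stateCheckpointMarkers) {
    return configuredFactories.stream()
        .filter(factory -> stateCheckpointMarkers.containsKey(factory)
            && stateCheckpointMarkers.get(factory).containsKey(storeName))
        .findFirst()
        .orElse(configuredFactories.get(0));
  }

  public static void main(String[] args) {
    List<String> configured = Arrays.asList(
        "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory",
        "org.apache.samza.storage.KafkaChangelogStateBackendFactory");
    // The last checkpoint only carries a Kafka changelog marker for store-a, e.g. right after
    // a job is reconfigured from changelog restores to blob store restores.
    Map<String, Map<String, String>> markers = Collections.singletonMap(
        "org.apache.samza.storage.KafkaChangelogStateBackendFactory",
        Collections.singletonMap("store-a", "changelog-offset-marker"));

    // store-a keeps restoring from the Kafka changelog factory, since that is where its marker lives.
    System.out.println(selectRestoreFactory("store-a", configured, markers));
    // store-b has no marker anywhere, so the first configured factory (blob store) is used.
    System.out.println(selectRestoreFactory("store-b", configured, markers));
  }
}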
@@ -443,9 +512,9 @@
       }
 
       for (String storeName : storesToCreate) {
-        List<String> storeBackupManager = storageConfig.getStoreBackupFactory(storeName);
+        List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
         // A store is considered durable if it is backed by a changelog or another backupManager factory
-        boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManager.isEmpty();
+        boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
         boolean isSideInput = this.sideInputStoreNames.contains(storeName);
         // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
         // for non logged stores
@@ -467,7 +536,7 @@
         // add created store to map
         taskStores.get(taskName).put(storeName, storageEngine);
 
-        LOG.info("Created non side input store store {} in read-write mode for task {}", storeName, taskName);
+        LOG.info("Created task store {} in read-write mode for task {} in path {}", storeName, taskName, storeDirectory.getAbsolutePath());
       }
     }
     return taskStores;
@@ -646,30 +715,53 @@
   private void restoreStores() throws InterruptedException {
     LOG.info("Store Restore started");
     Set<TaskName> activeTasks = getTasks(containerModel, TaskMode.Active).keySet();
+    // TODO HIGH dchen verify davinci lifecycle
+    // Find all non-side input stores
+    Set<String> nonSideInputStoreNames = storageEngineFactories.keySet()
+        .stream()
+        .filter(storeName -> !sideInputStoreNames.contains(storeName))
+        .collect(Collectors.toSet());
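+    // Side input stores are excluded here since they are not restored through this code path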
 
-    // initialize each TaskStorageManager
-    this.taskRestoreManagers.forEach((taskName, taskRestoreManager) -> {
+    // Obtain the checkpoint and create the per-factory restore managers for each task
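+    // Maps each task to its restore managers, keyed by state backend factory name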
+    Map<TaskName, Map<String, TaskRestoreManager>> taskRestoreManagers = new HashMap<>();
+    Map<TaskName, Checkpoint> taskCheckpoints = new HashMap<>();
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
       Checkpoint taskCheckpoint = null;
       if (checkpointManager != null && activeTasks.contains(taskName)) {
         // only pass in checkpoints for active tasks
         taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
         LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
       }
-      taskRestoreManager.init(taskCheckpoint);
+      taskCheckpoints.put(taskName, taskCheckpoint);
+      Map<String, Set<String>> backendFactoryStoreNames = getBackendFactoryStoreNames(taskCheckpoint, nonSideInputStoreNames,
+          new StorageConfig(config));
+      Map<String, TaskRestoreManager> taskStoreRestoreManagers = createTaskRestoreManagers(restoreStateBackendFactories,
+          backendFactoryStoreNames, clock, samzaContainerMetrics, taskName, taskModel);
+      taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
     });
 
-    // Start each store consumer once
+    // Initialize each TaskStorageManager
+    taskRestoreManagers.forEach((taskName, restoreManagers) ->
+        restoreManagers.forEach((factoryName, taskRestoreManager) ->
+            taskRestoreManager.init(taskCheckpoints.get(taskName))
+        )
+    );
+
+    // Start each store consumer once.
+    // Note: These consumers are per system, and only the changelog system consumers will be started here.
+    // Some TaskRestoreManagers may not require the consumer to be started, but since ContainerStorageManager is
+    // agnostic to the restore backend, we always start the changelog consumers here in case they are required.
     this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
 
-    List<Future> taskRestoreFutures = new ArrayList<>(this.taskRestoreManagers.entrySet().size());
+    List<Future> taskRestoreFutures = new ArrayList<>();
 
     // Submit restore callable for each taskInstance
-    this.taskRestoreManagers.forEach((taskInstance, taskRestoreManager) -> {
-      taskRestoreFutures.add(restoreExecutor.submit(
-          new TaskRestoreCallable(this.samzaContainerMetrics, taskInstance, taskRestoreManager)));
-    });
+    taskRestoreManagers.forEach((taskInstance, restoreManagersMap) ->
+        // Submit for each restore factory
+        restoreManagersMap.forEach((factoryName, taskRestoreManager) -> taskRestoreFutures.add(restoreExecutor.submit(
+            new TaskRestoreCallable(this.samzaContainerMetrics, taskInstance, taskRestoreManager)))));
 
-    // loop-over the future list to wait for each thread to finish, catch any exceptions during restore and throw
+    // Loop over the future list to wait for each thread to finish, catch any exceptions during restore and throw
     // as samza exceptions
     for (Future future : taskRestoreFutures) {
       try {
@@ -689,9 +781,6 @@
     this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
 
     // Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is
-    Set<String> nonSideInputStoreNames = storageEngineFactories.keySet().stream()
-        .filter(storeName -> !sideInputStoreNames.contains(storeName))
-        .collect(Collectors.toSet());
     this.taskStores = createTaskStores(nonSideInputStoreNames, this.containerModel, jobContext, containerContext,
         storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors);
     // Add in memory stores
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 91f1fa1..5dba698 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -30,6 +30,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStream;
 import org.junit.Test;
 
 import static org.apache.samza.config.StorageConfig.*;
@@ -71,7 +72,7 @@
   }
 
   /**
-   * Test verifies that the {@link StorageConfig#STORE_RESTORE_FACTORY} which matches pattern for store.%s.factory
+   * Test verifies that the {@link StorageConfig#STORE_RESTORE_FACTORIES} config, which matches the pattern for store.%s.factory,
    * is not picked up in the store names list
    */
   @Test
@@ -83,8 +84,7 @@
     // has stores
     StorageConfig storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "store0.factory.class",
-            String.format(StorageConfig.FACTORY, STORE_NAME1), "store1.factory.class",
-            STORE_RESTORE_FACTORY, "org.apache.class")));
+            String.format(StorageConfig.FACTORY, STORE_NAME1), "store1.factory.class", STORE_RESTORE_FACTORIES, "org.apache.class")));
 
     List<String> actual = storageConfig.getStoreNames();
     // ordering shouldn't matter
@@ -101,57 +101,72 @@
     StorageConfig storageConfig = new StorageConfig(
         new MapConfig(ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "")));
     assertEquals(Optional.empty(), storageConfig.getChangelogStream(STORE_NAME0));
+    assertEquals(Collections.emptyMap(), storageConfig.getStoreChangelogs());
 
     // store has full changelog system-stream defined
     storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0),
-            "changelog-system.changelog-stream0")));
+            "changelog-system.changelog-stream0", String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
     assertEquals(Optional.of("changelog-system.changelog-stream0"), storageConfig.getChangelogStream(STORE_NAME0));
+    assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "changelog-stream0")), storageConfig.getStoreChangelogs());
 
     // store has changelog stream defined, but system comes from job.changelog.system
     storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "changelog-stream0",
+            String.format(FACTORY, STORE_NAME0), "store0.factory.class",
             StorageConfig.CHANGELOG_SYSTEM, "changelog-system")));
     assertEquals(Optional.of("changelog-system.changelog-stream0"), storageConfig.getChangelogStream(STORE_NAME0));
+    assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "changelog-stream0")), storageConfig.getStoreChangelogs());
 
     // batch mode: create unique stream name
     storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0),
-            "changelog-system.changelog-stream0", ApplicationConfig.APP_MODE,
+            "changelog-system.changelog-stream0", String.format(FACTORY, STORE_NAME0), "store0.factory.class",
+            ApplicationConfig.APP_MODE,
             ApplicationConfig.ApplicationMode.BATCH.name().toLowerCase(), ApplicationConfig.APP_RUN_ID, "run-id")));
     assertEquals(Optional.of("changelog-system.changelog-stream0-run-id"),
         storageConfig.getChangelogStream(STORE_NAME0));
+    assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "changelog-stream0-run-id")), storageConfig.getStoreChangelogs());
 
     // job has no changelog stream defined
     storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
-            "should-not-be-used")));
+            "should-not-be-used", String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
     assertEquals(Optional.empty(), storageConfig.getChangelogStream(STORE_NAME0));
+    assertEquals(Collections.emptyMap(), storageConfig.getStoreChangelogs());
 
     // job.changelog.system takes precedence over job.default.system when changelog is specified as just streamName
     storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
-            "should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName")));
+            "should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName",
+            String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
     assertEquals("changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
+    assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "streamName")), storageConfig.getStoreChangelogs());
 
     // job.changelog.system takes precedence over job.default.system when changelog is specified as {systemName}.{streamName}
     storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM,
-            "should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "changelog-system.streamName")));
+            "should-not-be-used", String.format(CHANGELOG_STREAM, STORE_NAME0), "changelog-system.streamName",
+            String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
     assertEquals("changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
+    assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("changelog-system", "streamName")), storageConfig.getStoreChangelogs());
 
     // systemName specified using stores.{storeName}.changelog = {systemName}.{streamName} should take precedence even
     // when job.changelog.system and job.default.system are specified
     storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "default-changelog-system",
             JobConfig.JOB_DEFAULT_SYSTEM, "default-system",
-            String.format(CHANGELOG_STREAM, STORE_NAME0), "nondefault-changelog-system.streamName")));
+            String.format(CHANGELOG_STREAM, STORE_NAME0), "nondefault-changelog-system.streamName",
+            String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
     assertEquals("nondefault-changelog-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
+    assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("nondefault-changelog-system", "streamName")), storageConfig.getStoreChangelogs());
 
     // fall back to job.default.system if job.changelog.system is not specified
     storageConfig = new StorageConfig(new MapConfig(
-        ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system", String.format(CHANGELOG_STREAM, STORE_NAME0), "streamName")));
+        ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "default-system", String.format(CHANGELOG_STREAM, STORE_NAME0),
+            "streamName", String.format(FACTORY, STORE_NAME0), "store0.factory.class")));
     assertEquals("default-system.streamName", storageConfig.getChangelogStream(STORE_NAME0).get());
+    assertEquals(ImmutableMap.of(STORE_NAME0, new SystemStream("default-system", "streamName")), storageConfig.getStoreChangelogs());
   }
 
   @Test(expected = SamzaException.class)
@@ -188,12 +203,12 @@
     assertTrue(factories.contains(factory3));
     assertTrue(factories.contains(KAFKA_STATE_BACKEND_FACTORY));
     assertEquals(4, factories.size());
-    assertEquals(ImmutableList.of(factory1, factory2), storageConfig.getStoreBackupFactory(STORE_NAME0));
-    assertEquals(ImmutableList.of(factory1), storageConfig.getStoreBackupFactory(STORE_NAME1));
-    assertEquals(ImmutableList.of(factory3), storageConfig.getStoreBackupFactory(STORE_NAME2));
-    assertEquals(DEFAULT_BACKUP_FACTORIES, storageConfig.getStoreBackupFactory(STORE_NAME3));
-    assertTrue(storageConfig.getStoreBackupFactory("emptyStore").isEmpty());
-    assertTrue(storageConfig.getStoreBackupFactory("noFactoryStore").isEmpty());
+    assertEquals(ImmutableList.of(factory1, factory2), storageConfig.getStoreBackupFactories(STORE_NAME0));
+    assertEquals(ImmutableList.of(factory1), storageConfig.getStoreBackupFactories(STORE_NAME1));
+    assertEquals(ImmutableList.of(factory3), storageConfig.getStoreBackupFactories(STORE_NAME2));
+    assertEquals(DEFAULT_BACKUP_FACTORIES, storageConfig.getStoreBackupFactories(STORE_NAME3));
+    assertTrue(storageConfig.getStoreBackupFactories("emptyStore").isEmpty());
+    assertTrue(storageConfig.getStoreBackupFactories("noFactoryStore").isEmpty());
   }
 
   @Test
@@ -415,4 +430,148 @@
     configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), String.valueOf(storeSpecificLagOverride));
     assertEquals(storeSpecificLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
   }
+
+  @Test
+  public void testGetRestoreManagers() {
+    String storeName = "store1";
+    String storeName2 = "store2";
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(FACTORY, storeName), "store1.factory.class");
+    configMap.put(String.format(FACTORY, storeName2), "store2.factory.class");
+
+    // empty config, return no restore managers
+    assertEquals(Collections.emptySet(), new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+    assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+    assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+
+    // changelog set, should default to kafka state backend restore
+    String changelogStreamOverride = "changelogStream";
+    configMap.put(String.format(CHANGELOG_STREAM, storeName), changelogStreamOverride);
+    configMap.put(String.format(CHANGELOG_STREAM, storeName2), changelogStreamOverride);
+    configMap.put(StorageConfig.CHANGELOG_SYSTEM, "changelog-system");
+    configMap.put(String.format(StorageConfig.CHANGELOG_STREAM, storeName), "changelog-stream0");
+    assertEquals(ImmutableSet.of(KAFKA_STATE_BACKEND_FACTORY), new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+    assertEquals(DEFAULT_RESTORE_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+    assertEquals(DEFAULT_RESTORE_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+
+    // job-level restore factories config should override the default restore factories
+    String jobRestoreFactory1 = "jobBackendRestoreFactory1";
+    String jobRestoreFactory2 = "jobBackendRestoreFactory2";
+    String jobRestoreFactoryOverride = jobRestoreFactory1 + "," + jobRestoreFactory2;
+    configMap.put(JOB_RESTORE_FACTORIES, jobRestoreFactoryOverride);
+    assertEquals(ImmutableSet.of(jobRestoreFactory1, jobRestoreFactory2),
+        new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+    assertEquals(ImmutableList.of(jobRestoreFactory1, jobRestoreFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+    assertEquals(ImmutableList.of(jobRestoreFactory1, jobRestoreFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+
+    // store-specific restore factories override the job-level restore factories
+    String storeRestoreFactory1 = "storeBackendRestoreFactory1";
+    String storeRestoreFactory2 = "storeBackendRestoreFactory2";
+    String storeRestoreFactoryOverride = storeRestoreFactory1 + "," + storeRestoreFactory2;
+    configMap.put(String.format(STORE_RESTORE_FACTORIES, storeName), storeRestoreFactoryOverride);
+    assertEquals(ImmutableSet.of(jobRestoreFactory1, jobRestoreFactory2, storeRestoreFactory1, storeRestoreFactory2),
+        new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+    assertEquals(ImmutableList.of(storeRestoreFactory1, storeRestoreFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+    assertEquals(ImmutableList.of(jobRestoreFactory1, jobRestoreFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+
+    // an explicitly empty store-level override removes all restore factories for that store
+    String emptyRestoreFactories = "";
+    configMap.put(String.format(STORE_RESTORE_FACTORIES, storeName), emptyRestoreFactories);
+    assertEquals(ImmutableSet.of(jobRestoreFactory1, jobRestoreFactory2),
+        new StorageConfig(new MapConfig(configMap)).getRestoreFactories());
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName));
+    assertEquals(ImmutableList.of(jobRestoreFactory1, jobRestoreFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreRestoreFactories(storeName2));
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithRestoreFactory(KAFKA_STATE_BACKEND_FACTORY));
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithRestoreFactory(storeRestoreFactory1));
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithRestoreFactory(storeRestoreFactory2));
+  }
+
+  @Test
+  public void testGetBackupManagers() {
+    String storeName = "store1";
+    String storeName2 = "store2";
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(FACTORY, storeName), "store1.factory.class");
+    configMap.put(String.format(FACTORY, storeName2), "store2.factory.class");
+
+    // empty config, return no backup managers
+    assertEquals(Collections.emptySet(), new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+    assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+    assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+
+    // changelog set, should default to kafka state backend backup
+    String changelogStreamOverride = "changelogStream";
+    configMap.put(String.format(CHANGELOG_STREAM, storeName), changelogStreamOverride);
+    configMap.put(String.format(CHANGELOG_STREAM, storeName2), changelogStreamOverride);
+    configMap.put(StorageConfig.CHANGELOG_SYSTEM, "changelog-system");
+    configMap.put(String.format(StorageConfig.CHANGELOG_STREAM, storeName), "changelog-stream0");
+    assertEquals(ImmutableSet.of(KAFKA_STATE_BACKEND_FACTORY), new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+    assertEquals(DEFAULT_BACKUP_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+    assertEquals(DEFAULT_BACKUP_FACTORIES, new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+    assertEquals(ImmutableList.of(storeName2, storeName),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+
+    // job-level backup factories config should override the default backup factories
+    String jobBackupFactory1 = "jobBackendBackupFactory1";
+    String jobBackupFactory2 = "jobBackendBackupFactory2";
+    String jobBackupFactoryOverride = jobBackupFactory1 + "," + jobBackupFactory2;
+    configMap.put(JOB_BACKUP_FACTORIES, jobBackupFactoryOverride);
+    assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
+        new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+    assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+    assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+    assertEquals(ImmutableList.of(storeName2, storeName),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory1));
+    assertEquals(ImmutableList.of(storeName2, storeName),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory2));
+
+    // store-specific backup factories override the job-level backup factories
+    String storeBackupFactory1 = "storeBackendBackupFactory1";
+    String storeBackupFactory2 = "storeBackendBackupFactory2";
+    String storeBackupFactoryOverride = storeBackupFactory1 + "," + storeBackupFactory2;
+    configMap.put(String.format(STORE_BACKUP_FACTORIES, storeName), storeBackupFactoryOverride);
+    assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2, storeBackupFactory1, storeBackupFactory2),
+        new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+    assertEquals(ImmutableList.of(storeBackupFactory1, storeBackupFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+    assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+    assertEquals(ImmutableList.of(storeName2),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory1));
+    assertEquals(ImmutableList.of(storeName2),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(jobBackupFactory2));
+    assertEquals(ImmutableList.of(storeName),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory1));
+    assertEquals(ImmutableList.of(storeName),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory2));
+
+    String emptyBackupFactory = "";
+    configMap.put(String.format(STORE_BACKUP_FACTORIES, storeName), emptyBackupFactory);
+    assertEquals(ImmutableSet.of(jobBackupFactory1, jobBackupFactory2),
+        new StorageConfig(new MapConfig(configMap)).getBackupFactories());
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName));
+    assertEquals(ImmutableList.of(jobBackupFactory1, jobBackupFactory2),
+        new StorageConfig(new MapConfig(configMap)).getStoreBackupFactories(storeName2));
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(KAFKA_STATE_BACKEND_FACTORY));
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory1));
+    assertEquals(Collections.emptyList(),
+        new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory2));
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
index 7e0da6d..ddc0c8e 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import java.io.File;
@@ -28,8 +29,8 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.lang3.tuple.Pair;
@@ -176,7 +177,7 @@
     TaskName taskName = mock(TaskName.class);
     BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap());
     metrics.initStoreMetrics(ImmutableList.of("storeName"));
-    List<String> storesToRestore = ImmutableList.of("storeName");
+    Set<String> storesToRestore = ImmutableSet.of("storeName");
     SnapshotIndex snapshotIndex = mock(SnapshotIndex.class);
     Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
         ImmutableMap.of("storeName", Pair.of("blobId", snapshotIndex));
@@ -222,7 +223,7 @@
     TaskName taskName = mock(TaskName.class);
     BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap());
     metrics.initStoreMetrics(ImmutableList.of("storeName"));
-    List<String> storesToRestore = ImmutableList.of("storeName");
+    Set<String> storesToRestore = ImmutableSet.of("storeName");
     SnapshotIndex snapshotIndex = mock(SnapshotIndex.class);
     Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
         ImmutableMap.of("storeName", Pair.of("blobId", snapshotIndex));
@@ -274,7 +275,7 @@
     TaskName taskName = mock(TaskName.class);
     BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap());
     metrics.initStoreMetrics(ImmutableList.of("storeName"));
-    List<String> storesToRestore = ImmutableList.of("storeName");
+    Set<String> storesToRestore = ImmutableSet.of("storeName");
     SnapshotIndex snapshotIndex = mock(SnapshotIndex.class);
     Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
         ImmutableMap.of("storeName", Pair.of("blobId", snapshotIndex));
@@ -332,7 +333,7 @@
     TaskName taskName = mock(TaskName.class);
     BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap());
     metrics.initStoreMetrics(ImmutableList.of("newStoreName"));
-    List<String> storesToRestore = ImmutableList.of("newStoreName"); // new store in config
+    Set<String> storesToRestore = ImmutableSet.of("newStoreName"); // new store in config
     SnapshotIndex snapshotIndex = mock(SnapshotIndex.class);
     Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes = mock(Map.class);
     when(prevStoreSnapshotIndexes.containsKey("newStoreName")).thenReturn(false);
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 5bcbdcd..85b2011 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -19,8 +19,7 @@
 
 package org.apache.samza.checkpoint
 
-import java.util.Properties
-
+import java.util.{Collections, Properties}
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.CheckpointTool.{CheckpointToolCommandLine, TaskNameToCheckpointMap}
 import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
@@ -32,9 +31,10 @@
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
+import org.junit.Assert.{assertEquals, assertNotNull}
 import org.junit.{Before, Test}
 import org.mockito.Matchers._
-import org.mockito.Mockito
+import org.mockito.{ArgumentCaptor, Matchers, Mockito}
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
@@ -67,8 +67,10 @@
 
   val tn0 = new TaskName("Partition 0")
   val tn1 = new TaskName("Partition 1")
+  val tn2 = new TaskName("Partition 2")
   val p0 = new Partition(0)
   val p1 = new Partition(1)
+  val p2 = new Partition(2)
 
   @Before
   def setup() {
@@ -83,8 +85,9 @@
     ).asJava)
     config = JobPlanner.generateSingleJobConfig(userDefinedConfig)
     val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
-      new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
-      new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
+      p0 -> new SystemStreamPartitionMetadata("0", "100", "101"),
+      p1 -> new SystemStreamPartitionMetadata("0", "200", "201"),
+      p2 -> new SystemStreamPartitionMetadata("0", "300", "301")
     ).asJava)
     TestCheckpointTool.checkpointManager = mock[CheckpointManager]
     TestCheckpointTool.systemAdmin = mock[SystemAdmin]
@@ -94,6 +97,11 @@
       .thenReturn(new CheckpointV1(Map(new SystemStreamPartition("test", "foo", p0) -> "1234").asJava))
     when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
       .thenReturn(new CheckpointV1(Map(new SystemStreamPartition("test", "foo", p1) -> "4321").asJava))
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn2))
+      .thenReturn(new CheckpointV2(null,
+        Map(new SystemStreamPartition("test", "foo", p2) -> "5678").asJava,
+        Map("BackupFactory" -> Map("StoreName" -> "offset").asJava).asJava
+      ))
   }
 
   @Test
@@ -119,6 +127,42 @@
   }
 
   @Test
+  def testOverwriteCheckpointV2() {
+    // Return no existing v1 checkpoints for these tasks
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0))
+      .thenReturn(null)
+    when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
+      .thenReturn(null)
+
+    val toOverwrite = Map(
+      tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
+      tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"),
+      tn2 -> Map(new SystemStreamPartition("test", "foo", p2) -> "45"))
+
+    val checkpointV2Config = new MapConfig(config, Map(TaskConfig.CHECKPOINT_READ_VERSIONS -> "2").asJava)
+
+    val argument = ArgumentCaptor.forClass(classOf[CheckpointV2])
+    val checkpointTool = CheckpointTool(checkpointV2Config, toOverwrite)
+
+    checkpointTool.run()
+    verify(TestCheckpointTool.checkpointManager)
+      .writeCheckpoint(Matchers.same(tn0), argument.capture())
+    assertNotNull(argument.getValue.getCheckpointId)
+    assertEquals(Map(new SystemStreamPartition("test", "foo", p0) -> "42").asJava, argument.getValue.getOffsets)
+    assertEquals(Collections.emptyMap(), argument.getValue.getStateCheckpointMarkers)
+    verify(TestCheckpointTool.checkpointManager)
+      .writeCheckpoint(Matchers.same(tn1), argument.capture())
+    assertNotNull(argument.getValue.getCheckpointId)
+    assertEquals(Map(new SystemStreamPartition("test", "foo", p1) -> "43").asJava, argument.getValue.getOffsets)
+    assertEquals(Collections.emptyMap(), argument.getValue.getStateCheckpointMarkers)
+    verify(TestCheckpointTool.checkpointManager)
+      .writeCheckpoint(Matchers.same(tn2), argument.capture())
+    assertNotNull(argument.getValue.getCheckpointId)
+    assertEquals(Map(new SystemStreamPartition("test", "foo", p2) -> "45").asJava, argument.getValue.getOffsets)
+    assertEquals(Map("BackupFactory" -> Map("StoreName" -> "offset").asJava).asJava, argument.getValue.getStateCheckpointMarkers)
+  }
+
+  @Test
   def testGrouping(): Unit = {
     val config : java.util.Properties = new Properties()
     config.put("tasknames.Partition 0.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.0", "0000")
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 606f86d..e357fe5 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -227,6 +227,8 @@
     when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
     val commitTimer = mock[Timer]
     when(this.metrics.commitNs).thenReturn(commitTimer)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
     val commitSyncTimer = mock[Timer]
     when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
     val commitAsyncTimer = mock[Timer]
@@ -320,6 +322,8 @@
     when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
     val commitTimer = mock[Timer]
     when(this.metrics.commitNs).thenReturn(commitTimer)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
     val commitSyncTimer = mock[Timer]
     when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
     val commitAsyncTimer = mock[Timer]
@@ -389,6 +393,8 @@
     when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = Map(SYSTEM_STREAM_PARTITION -> "4").asJava
     val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
@@ -460,6 +466,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -510,6 +518,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -560,6 +570,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -611,6 +623,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -662,6 +676,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -714,6 +730,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -728,10 +746,19 @@
 
     taskInstance.commit // async stage will be run by caller due to direct executor
 
+    val asyncCommitTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
+    val uploadTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
+    val cleanUpTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
+
     verify(commitsCounter).inc()
     verify(snapshotTimer).update(anyLong())
     verify(uploadTimer).update(anyLong())
     verify(commitTimer).update(anyLong())
+    verify(commitAsyncTimer).update(asyncCommitTimeCaptor.capture())
+    verify(uploadTimer).update(uploadTimeCaptor.capture())
+    verify(cleanUpTimer).update(cleanUpTimeCaptor.capture())
+
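+    // The overall async commit time covers the upload and cleanup stages, so it should exceed their sum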
+    assertTrue((cleanUpTimeCaptor.getValue + uploadTimeCaptor.getValue) < asyncCommitTimeCaptor.getValue)
 
     taskInstance.commit
 
@@ -763,6 +790,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -794,7 +823,6 @@
 
     verify(commitsCounter, times(1)).inc() // should only have been incremented once on the initial commit
     verify(snapshotTimer).update(anyLong())
-    verify(uploadTimer).update(anyLong())
     verifyZeroInteractions(commitTimer)
 
     cleanUpFuture.complete(null) // just to unblock shared executor
@@ -818,6 +846,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
@@ -879,6 +909,8 @@
     when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
     val skippedCounter = mock[Gauge[Int]]
     when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
+    val lastCommitGauge = mock[Gauge[Long]]
+    when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
 
     val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
     inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index f381f3d..8645e4a 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -18,16 +18,24 @@
  */
 package org.apache.samza.storage;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstance;
@@ -170,6 +178,8 @@
     Map<String, String> configMap = new HashMap<>();
     configMap.put("stores." + STORE_NAME + ".key.serde", "stringserde");
     configMap.put("stores." + STORE_NAME + ".msg.serde", "stringserde");
+    configMap.put("stores." + STORE_NAME + ".factory", mockStorageEngineFactory.getClass().getName());
+    configMap.put("stores." + STORE_NAME + ".changelog", SYSTEM_NAME + "." + STREAM_NAME);
     configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
     configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
     Config config = new MapConfig(configMap);
@@ -215,7 +225,7 @@
     ContainerModel mockContainerModel = new ContainerModel("samza-container-test", tasks);
     when(mockContainerContext.getContainerModel()).thenReturn(mockContainerModel);
 
-    // Reset the  expected number of sysConsumer create, start and stop calls, and store.restore() calls
+    // Reset the expected number of sysConsumer create, start and stop calls, and store.restore() calls
     this.systemConsumerCreationCount = 0;
     this.systemConsumerStartCount = 0;
     this.systemConsumerStopCount = 0;
@@ -223,7 +233,7 @@
 
     StateBackendFactory backendFactory = mock(StateBackendFactory.class);
     TaskRestoreManager restoreManager = mock(TaskRestoreManager.class);
-    when(backendFactory.getRestoreManager(any(), any(), any(), any(), any(), any(), any(), any(), any(), any()))
+    when(backendFactory.getRestoreManager(any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()))
         .thenReturn(restoreManager);
     doAnswer(invocation -> {
       storeRestoreCallCount++;
@@ -246,7 +256,7 @@
         samzaContainerMetrics,
         mock(JobContext.class),
         mockContainerContext,
-        backendFactory,
+        ImmutableMap.of(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, backendFactory),
         mock(Map.class),
         DEFAULT_LOGGED_STORE_BASE_DIR,
         DEFAULT_STORE_BASE_DIR,
@@ -270,4 +280,224 @@
     Assert.assertEquals("systemConsumerStopCount count should be 1", 1, this.systemConsumerStopCount);
     Assert.assertEquals("systemConsumerStartCount count should be 1", 1, this.systemConsumerStartCount);
   }
+
+  @Test
+  public void testNoConfiguredDurableStores() throws InterruptedException {
+    taskRestoreMetricGauges = new HashMap<>();
+    this.tasks = new HashMap<>();
+    this.taskInstanceMetrics = new HashMap<>();
+
+    // Add two mocked tasks
+    addMockedTask("task 0", 0);
+    addMockedTask("task 1", 1);
+
+    // Mock container metrics
+    samzaContainerMetrics = mock(SamzaContainerMetrics.class);
+    when(samzaContainerMetrics.taskStoreRestorationMetrics()).thenReturn(taskRestoreMetricGauges);
+
+    // Create mocked configs for specifying serdes
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
+    configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
+    Config config = new MapConfig(configMap);
+
+    Map<String, Serde<Object>> serdes = new HashMap<>();
+    serdes.put("stringserde", mock(Serde.class));
+
+    CheckpointManager checkpointManager = mock(CheckpointManager.class);
+    when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>()));
+
+    ContainerContext mockContainerContext = mock(ContainerContext.class);
+    ContainerModel mockContainerModel = new ContainerModel("samza-container-test", tasks);
+    when(mockContainerContext.getContainerModel()).thenReturn(mockContainerModel);
+
+    // Reset the expected number of sysConsumer create, start and stop calls, and store.restore() calls
+    this.systemConsumerCreationCount = 0;
+    this.systemConsumerStartCount = 0;
+    this.systemConsumerStopCount = 0;
+    this.storeRestoreCallCount = 0;
+
+    StateBackendFactory backendFactory = mock(StateBackendFactory.class);
+    TaskRestoreManager restoreManager = mock(TaskRestoreManager.class);
+    when(backendFactory.getRestoreManager(any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()))
+        .thenReturn(restoreManager);
+    doAnswer(invocation -> {
+      storeRestoreCallCount++;
+      return null;
+    }).when(restoreManager).restore();
+
+    // Create the container storage manager
+    ContainerStorageManager containerStorageManager = new ContainerStorageManager(
+        checkpointManager,
+        mockContainerModel,
+        mock(StreamMetadataCache.class),
+        mock(SystemAdmins.class),
+        new HashMap<>(),
+        new HashMap<>(),
+        new HashMap<>(),
+        new HashMap<>(),
+        serdes,
+        config,
+        taskInstanceMetrics,
+        samzaContainerMetrics,
+        mock(JobContext.class),
+        mockContainerContext,
+        new HashMap<>(),
+        mock(Map.class),
+        DEFAULT_LOGGED_STORE_BASE_DIR,
+        DEFAULT_STORE_BASE_DIR,
+        null,
+        new SystemClock());
+
+    containerStorageManager.start();
+    containerStorageManager.shutdown();
+
+    for (Gauge gauge : taskRestoreMetricGauges.values()) {
+      Assert.assertTrue("Restoration time gauge should never be updated",
+          mockingDetails(gauge).getInvocations().size() == 0);
+    }
+
+    Assert.assertEquals("Store restore count should be 2 because there are 0 stores", 0, this.storeRestoreCallCount);
+    Assert.assertEquals(0,
+        this.systemConsumerCreationCount);
+    Assert.assertEquals(0, this.systemConsumerStopCount);
+    Assert.assertEquals(0, this.systemConsumerStartCount);
+  }
+
+  @Test
+  public void testCheckpointBasedRestoreFactoryCreation() {
+    Set<String> storeNames = ImmutableSet.of("storeName0", "storeName1", "storeName2");
+
+    StorageConfig mockConfig = mock(StorageConfig.class);
+    when(mockConfig.getStoreRestoreFactories("storeName0"))
+        .thenReturn(ImmutableList.of("factory0", "factory1", "factory2"));
+    when(mockConfig.getStoreRestoreFactories("storeName1"))
+        .thenReturn(ImmutableList.of("factory2", "factory1"));
+    when(mockConfig.getStoreRestoreFactories("storeName2"))
+        .thenReturn(Collections.emptyList());
+
+    when(mockConfig.getChangelogStream("storeName0"))
+        .thenReturn(Optional.empty());
+    when(mockConfig.getChangelogStream("storeName1"))
+        .thenReturn(Optional.of("changelog"));
+    when(mockConfig.getChangelogStream("storeName2"))
+        .thenReturn(Optional.of("changelog"));
+
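+    // With a CheckpointV1, only changelog-backed stores are restored, and they map to the Kafka state backend factory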
+    CheckpointV1 checkpointV1 = mock(CheckpointV1.class);
+    when(checkpointV1.getVersion()).thenReturn((short) 1);
+    Map<String, Set<String>> factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(checkpointV1, storeNames, mockConfig);
+
+    Assert.assertEquals(1, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName1", "storeName2"),
+        factoriesToStores.get(StorageConfig.KAFKA_STATE_BACKEND_FACTORY));
+
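+    // With no checkpoint, each store falls back to its first configured restore factory; stores without any
+    // configured restore factories are skipped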
+    factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(null, storeNames, mockConfig);
+
+    Assert.assertEquals(2, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName0"),
+        factoriesToStores.get("factory0"));
+    Assert.assertEquals(ImmutableSet.of("storeName1"),
+        factoriesToStores.get("factory2"));
+  }
+
+  @Test
+  public void testCheckpointV2BasedRestoreFactoryCreation() {
+    Set<String> storeNames = ImmutableSet.of("storeName0", "storeName1", "storeName2");
+
+    StorageConfig mockConfig = mock(StorageConfig.class);
+    when(mockConfig.getStoreRestoreFactories("storeName0"))
+        .thenReturn(ImmutableList.of("factory0", "factory1", "factory2"));
+    when(mockConfig.getStoreRestoreFactories("storeName1"))
+        .thenReturn(ImmutableList.of("factory2", "factory1"));
+    when(mockConfig.getStoreRestoreFactories("storeName2"))
+        .thenReturn(Collections.emptyList());
+
+    when(mockConfig.getChangelogStream("storeName0"))
+        .thenReturn(Optional.empty());
+    when(mockConfig.getChangelogStream("storeName1"))
+        .thenReturn(Optional.of("changelog"));
+    when(mockConfig.getChangelogStream("storeName2"))
+        .thenReturn(Optional.of("changelog"));
+
+    CheckpointV2 checkpointV2 = mock(CheckpointV2.class);
+    when(checkpointV2.getVersion()).thenReturn((short) 2);
+    when(checkpointV2.getStateCheckpointMarkers())
+        .thenReturn(ImmutableMap.of(
+            "factory0", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
+            "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
+            "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
+
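+    // Every factory has markers for every store, so each store picks the first factory in its configured list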
+    Map<String, Set<String>> factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+    Assert.assertEquals(2, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName0"),
+        factoriesToStores.get("factory0"));
+    Assert.assertEquals(ImmutableSet.of("storeName1"),
+        factoriesToStores.get("factory2"));
+
+    when(checkpointV2.getStateCheckpointMarkers())
+        .thenReturn(ImmutableMap.of(
+            "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
+    factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+    Assert.assertEquals(1, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName1", "storeName0"),
+        factoriesToStores.get("factory2"));
+
+    when(checkpointV2.getStateCheckpointMarkers())
+        .thenReturn(ImmutableMap.of(
+            "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
+            "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
+    factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+    Assert.assertEquals(2, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName0"),
+        factoriesToStores.get("factory1"));
+    Assert.assertEquals(ImmutableSet.of("storeName1"),
+        factoriesToStores.get("factory2"));
+
+    when(checkpointV2.getStateCheckpointMarkers())
+        .thenReturn(ImmutableMap.of(
+            "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
+            "factory2", ImmutableMap.of("storeName0", "", "storeName2", "")));
+    factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+    Assert.assertEquals(1, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName0", "storeName1"),
+        factoriesToStores.get("factory1"));
+
+    when(checkpointV2.getStateCheckpointMarkers())
+        .thenReturn(ImmutableMap.of(
+            "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
+    factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+    Assert.assertEquals(1, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName0", "storeName1"),
+        factoriesToStores.get("factory1"));
+
+    when(checkpointV2.getStateCheckpointMarkers())
+        .thenReturn(Collections.emptyMap());
+    factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+    Assert.assertEquals(2, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName0"),
+        factoriesToStores.get("factory0"));
+    Assert.assertEquals(ImmutableSet.of("storeName1"),
+        factoriesToStores.get("factory2"));
+
+    when(checkpointV2.getStateCheckpointMarkers())
+        .thenReturn(ImmutableMap.of(
+            "factory0", ImmutableMap.of("storeName1", "", "storeName2", ""),
+            "factory1", ImmutableMap.of("storeName1", "", "storeName2", ""),
+            "factory2", ImmutableMap.of("storeName0", "", "storeName2", "")));
+    factoriesToStores = this.containerStorageManager
+        .getBackendFactoryStoreNames(checkpointV2, storeNames, mockConfig);
+    Assert.assertEquals(2, factoriesToStores.size());
+    Assert.assertEquals(ImmutableSet.of("storeName1"),
+        factoriesToStores.get("factory1"));
+    Assert.assertEquals(ImmutableSet.of("storeName0"),
+        factoriesToStores.get("factory2"));
+  }
 }
diff --git a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
index 0f0b071..01c22f5 100644
--- a/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
+++ b/samza-kafka/src/main/java/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.java
@@ -19,6 +19,8 @@
 package org.apache.samza.checkpoint.kafka;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
 import org.apache.samza.container.TaskName;
 
 /**
@@ -28,6 +30,10 @@
 
   public static final String CHECKPOINT_V1_KEY_TYPE = "checkpoint";
   public static final String CHECKPOINT_V2_KEY_TYPE = "checkpoint-v2";
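+  // Maps each checkpoint key type in the checkpoint topic to its corresponding checkpoint version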
+  public static final Map<String, Short> CHECKPOINT_KEY_VERSIONS = ImmutableMap.of(
+      CHECKPOINT_V1_KEY_TYPE, (short) 1,
+      CHECKPOINT_V2_KEY_TYPE, (short) 2
+  );
   /**
    * The SystemStreamPartitionGrouperFactory configured for this job run. Since, checkpoints of different
    * groupers are not compatible, we persist and validate them across job runs.
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 8fd21fe..5c061ae 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -152,9 +152,8 @@
     }
 
     if (topicPartitionToSSP.size() == 0) {
-      String msg = String.format("Cannot start KafkaConsumerProxy without any registered TopicPartitions for %s", systemName);
-      LOG.error(msg);
-      throw new SamzaException(msg);
+      String msg = String.format("Started KafkaConsumerProxy without any registered TopicPartitions for %s", systemName);
+      LOG.warn(msg);
     }
   }
 
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index a83a34b..e719439 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -34,6 +34,7 @@
 import org.apache.samza.util.Logging
 import org.apache.samza.{Partition, SamzaException}
 
+import java.{lang, util}
 import scala.collection.mutable
 
 /**
@@ -80,7 +81,7 @@
   // for active containers, this will be set to true, while false for standby containers.
   val stopConsumerAfterFirstRead: Boolean = new TaskConfig(config).getCheckpointManagerConsumerStopAfterFirstRead
 
-  val checkpointReadVersion: Short = new TaskConfig(config).getCheckpointReadVersion
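+  // Ordered by priority: checkpoint versions earlier in this list take precedence when a task has checkpoints of multiple versions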
+  val checkpointReadVersions: util.List[lang.Short] = new TaskConfig(config).getCheckpointReadVersions
 
   /**
     * Create checkpoint stream prior to start.
@@ -287,16 +288,17 @@
         val msgBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
         try {
           // if checkpoint key version does not match configured checkpoint version to read, skip the message.
-          if (checkpointReadVersion == CheckpointV1.CHECKPOINT_VERSION &&
-            KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE.equals(checkpointKey.getType)) {
-            val msgBytes = checkpointEnvelope.getMessage.asInstanceOf[Array[Byte]]
-            val checkpoint = checkpointV1MsgSerde.fromBytes(msgBytes)
-            checkpoints.put(checkpointKey.getTaskName, checkpoint)
-          } else if (checkpointReadVersion == CheckpointV2.CHECKPOINT_VERSION &&
-            KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE.equals(checkpointKey.getType)) {
-            val checkpoint = checkpointV2MsgSerde.fromBytes(msgBytes)
-            checkpoints.put(checkpointKey.getTaskName, checkpoint)
-          } // else ignore and skip the message
+          if (checkpointReadVersions.contains(
+            KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(checkpointKey.getType))) {
+            if (!checkpoints.contains(checkpointKey.getTaskName) ||
+              shouldOverrideCheckpoint(checkpoints.get(checkpointKey.getTaskName), checkpointKey)) {
+              checkpoints.put(checkpointKey.getTaskName, deserializeCheckpoint(checkpointKey, msgBytes))
+            } // else ignore the de-prioritized checkpoint
+          } else {
+            // Ignore and skip the unknown checkpoint key type. We do not want to throw any exceptions for this
+            // case, for forward compatibility with newer checkpoint versions in the checkpoint topic.
+            warn(s"Ignoring unknown checkpoint key type for checkpoint key: $checkpointKey")
+          }
         } catch {
           case e: Exception =>
             if (validateCheckpoint) {
@@ -372,4 +374,29 @@
       case _ => throw new SamzaException("Unknown checkpoint version: " + checkpoint.getVersion)
     }
   }
+
+  private def shouldOverrideCheckpoint(currentCheckpoint: Option[Checkpoint],
+    newCheckpointKey: KafkaCheckpointLogKey): Boolean = {
+    val newCheckpointVersion = KafkaCheckpointLogKey.CHECKPOINT_KEY_VERSIONS.get(newCheckpointKey.getType)
+    if (newCheckpointVersion == null) {
+      // Unknown checkpoint version
+      throw new IllegalArgumentException("Unknown checkpoint key type: " + newCheckpointKey.getType +
+        " for checkpoint key: " + newCheckpointKey)
+    }
+    // Override the checkpoint if no checkpoint is currently held for the task, or if the new checkpoint has a
+    // higher restore priority than the currently held checkpoint
+    currentCheckpoint.isEmpty ||
+      checkpointReadVersions.indexOf(newCheckpointVersion) <=
+        checkpointReadVersions.indexOf(currentCheckpoint.get.getVersion)
+  }
+
+  private def deserializeCheckpoint(checkpointKey: KafkaCheckpointLogKey, checkpointMsgBytes: Array[Byte]): Checkpoint = {
+    if (KafkaCheckpointLogKey.CHECKPOINT_V1_KEY_TYPE.equals(checkpointKey.getType)) {
+      checkpointV1MsgSerde.fromBytes(checkpointMsgBytes)
+    } else if (KafkaCheckpointLogKey.CHECKPOINT_V2_KEY_TYPE.equals(checkpointKey.getType)) {
+      checkpointV2MsgSerde.fromBytes(checkpointMsgBytes)
+    } else {
+      throw new IllegalArgumentException("Unknown checkpoint key type: " + checkpointKey.getType)
+    }
+  }
 }
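
For reference, a minimal standalone Scala sketch of the precedence rule implemented by shouldOverrideCheckpoint above, assuming checkpointReadVersions holds the configured priority list (e.g. [2, 1]). Versions are modeled as plain Ints here for brevity; this is a sketch, not the production method.

object CheckpointPrecedenceSketch {
  // A checkpoint read from the topic replaces the one currently held for a task only when its version
  // appears at the same or an earlier position in the configured read-versions list.
  def shouldOverride(readVersions: List[Int], currentVersion: Option[Int], newVersion: Int): Boolean =
    currentVersion.isEmpty || readVersions.indexOf(newVersion) <= readVersions.indexOf(currentVersion.get)

  def main(args: Array[String]): Unit = {
    val readVersions = List(2, 1) // prefer v2 checkpoints, fall back to v1
    println(shouldOverride(readVersions, None, 1))    // true: nothing read for the task yet
    println(shouldOverride(readVersions, Some(1), 2)) // true: v2 outranks v1 in the list
    println(shouldOverride(readVersions, Some(2), 1)) // false: keep the already-read v2 checkpoint
  }
}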
diff --git a/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
index 167bc78..4c8d8e4 100644
--- a/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -767,8 +767,12 @@
       "stores.loggedStore1.clean.on.container.start" -> cleanStoreDirsOnStart.toString,
       "stores.store1.key.serde" -> classOf[StringSerdeFactory].getCanonicalName,
       "stores.store1.msg.serde" -> classOf[StringSerdeFactory].getCanonicalName,
+      "stores.store1.factory" -> mockStorageEngineFactory.getClass.getName,
+      "stores.store1.changelog" -> "system.stream",
+      "stores.loggedStore1.factory" -> mockStorageEngineFactory.getClass.getName,
       "stores.loggedStore1.key.serde" -> classOf[StringSerdeFactory].getCanonicalName,
       "stores.loggedStore1.msg.serde" -> classOf[StringSerdeFactory].getCanonicalName,
+      "stores.loggedStore1.changelog" -> "system.stream",
       TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED -> "false").asJava)
 
     var mockSerdes: Map[String, Serde[AnyRef]] = HashMap[String, Serde[AnyRef]]((classOf[StringSerdeFactory].getCanonicalName, Mockito.mock(classOf[Serde[AnyRef]])))
@@ -800,7 +804,8 @@
       Mockito.mock(classOf[SamzaContainerMetrics]),
       mockJobContext,
       mockContainerContext,
-      new mockKafkaChangelogBackendManager(changeLogSystemStreams),
+      ImmutableMap.of(StorageConfig.KAFKA_STATE_BACKEND_FACTORY,
+        new mockKafkaChangelogBackendManager(changeLogSystemStreams)),
       new HashMap[TaskName, TaskInstanceCollector].asJava,
       loggedStoreBaseDir,
       TaskStorageManagerBuilder.defaultStoreBaseDir,
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index f940292..866904b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -194,7 +194,7 @@
       .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokerList)
       .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)
       .put("task.checkpoint.system", checkpointSystemName)
-      .put(TaskConfig.CHECKPOINT_READ_VERSION, "2")
+      .put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2")
       .build())
 
     // Skips reading any v1 checkpoints
@@ -211,6 +211,59 @@
   }
 
   @Test
+  def testCheckpointV1AndV2WriteAndReadV1V2PrecedenceList(): Unit = {
+    val checkpointTopic = "checkpoint-topic-1"
+    val kcm1 = createKafkaCheckpointManager(checkpointTopic)
+    kcm1.register(taskName)
+    kcm1.createResources
+    kcm1.start
+    kcm1.stop
+
+    // check that start actually creates the topic with log compaction enabled
+    val topicConfig = adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties())
+
+    assertEquals(topicConfig, new KafkaConfig(config).getCheckpointTopicProperties())
+    assertEquals("compact", topicConfig.get("cleanup.policy"))
+    assertEquals("26214400", topicConfig.get("segment.bytes"))
+
+    // reading before any checkpoint has been written should return a null checkpoint
+    val readCp = readCheckpoint(checkpointTopic, taskName)
+    assertNull(readCp)
+
+    val checkpointV1 = new CheckpointV1(ImmutableMap.of(ssp, "offset-1"))
+    val checkpointV2 = new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, "offset-2"),
+      ImmutableMap.of("factory1", ImmutableMap.of("store1", "changelogOffset")))
+
+    val overrideConfig = new MapConfig(new ImmutableMap.Builder[String, String]()
+      .put(JobConfig.JOB_NAME, "some-job-name")
+      .put(JobConfig.JOB_ID, "i001")
+      .put(s"systems.$checkpointSystemName.samza.factory", classOf[KafkaSystemFactory].getCanonicalName)
+      .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokerList)
+      .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)
+      .put("task.checkpoint.system", checkpointSystemName)
+      .put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2,1")
+      .build())
+
+    // Still reads v1 checkpoints due to the precedence list
+    writeCheckpoint(checkpointTopic, taskName, checkpointV1)
+    assertEquals(checkpointV1, readCheckpoint(checkpointTopic, taskName, overrideConfig))
+
+    // writing a v2 checkpoint would allow reading it back
+    writeCheckpoint(checkpointTopic, taskName, checkpointV2)
+    assertEquals(checkpointV2, readCheckpoint(checkpointTopic, taskName, overrideConfig))
+
+    // writing v1 checkpoint is still skipped
+    writeCheckpoint(checkpointTopic, taskName, checkpointV1)
+    assertEquals(checkpointV2, readCheckpoint(checkpointTopic, taskName, overrideConfig))
+
+    val newCheckpointV2 = new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, "offset-3"),
+      ImmutableMap.of("factory1", ImmutableMap.of("store1", "changelogOffset")))
+    // writing another v2 checkpoint makes it the checkpoint that is read back
+    writeCheckpoint(checkpointTopic, taskName, newCheckpointV2)
+    assertEquals(newCheckpointV2, readCheckpoint(checkpointTopic, taskName, overrideConfig))
+  }
+
+  @Test
   def testCheckpointValidationSkipped(): Unit = {
     val checkpointTopic = "checkpoint-topic-1"
     val kcm1 = createKafkaCheckpointManager(checkpointTopic, serde = new MockCheckpointSerde(),
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
index d5059b3..34d407f 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
@@ -106,8 +106,8 @@
       storePropertiesBuilder.setPersistedToDisk(true);
     }
     // The store is durable iff it is backed by the task backup manager
-    List<String> storeBackupManager = storageConfig.getStoreBackupFactory(storeName);
-    storePropertiesBuilder.setIsDurable(!storeBackupManager.isEmpty());
+    List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
+    storePropertiesBuilder.setIsDurable(!storeBackupManagers.isEmpty());
 
     int batchSize = storageConfigSubset.getInt(WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
     int cacheSize = storageConfigSubset.getInt(OBJECT_CACHE_SIZE, Math.max(batchSize, DEFAULT_OBJECT_CACHE_SIZE));
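
A minimal sketch of the durability rule above, assuming a store's configured backup factories are available as a plain list (the real lookup is StorageConfig.getStoreBackupFactories); the factory class name below is purely illustrative.

object StoreDurabilitySketch {
  // A store is treated as durable only when at least one backup factory is configured for it.
  def isDurable(storeBackupFactories: List[String]): Boolean = storeBackupFactories.nonEmpty

  def main(args: Array[String]): Unit = {
    println(isDurable(Nil))                                       // false: no backups, not durable
    println(isDurable(List("com.example.MyStateBackendFactory"))) // true: backed up, hence durable
  }
}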
diff --git a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index d0f1c2f..66eb79c 100644
--- a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -84,7 +84,7 @@
         Arrays.asList(null, null, "98", "99", "4", "5", "5");
     List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
     Map<String, String> configOverrides = new HashMap<>(CONFIGS);
-    configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSION, "2");
+    configOverrides.put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2");
     secondRun(CHANGELOG_TOPIC,
         expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, configOverrides);
   }
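
For completeness, a small sketch of how the read-versions precedence list is expressed in job config, assuming TaskConfig.CHECKPOINT_READ_VERSIONS resolves to the key "task.checkpoint.read.versions" (an assumption for this sketch).

object CheckpointReadVersionsConfigSketch {
  def main(args: Array[String]): Unit = {
    val configOverrides = new java.util.HashMap[String, String]()
    // "2,1" prefers v2 checkpoints but falls back to v1 when no v2 checkpoint has been written yet;
    // "2" alone (as in the integration test above) skips v1 checkpoints entirely.
    configOverrides.put("task.checkpoint.read.versions", "2,1")
    println(configOverrides)
  }
}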