SOLR-13608: Incremental backup for Solr
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index ceec4e2..ceac8af 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -77,29 +77,32 @@
     }
     String backupName = message.getStr(NAME);
     String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    boolean incremental = message.getBool(CoreAdminParams.BACKUP_INCREMENTAL, false);
 
     Instant startTime = Instant.now();
 
     CoreContainer cc = ocmh.overseer.getCoreContainer();
     BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
-    BackupManager backupMgr = new BackupManager(repository, ocmh.zkStateReader);
 
     // Backup location
     URI location = repository.createURI(message.getStr(CoreAdminParams.BACKUP_LOCATION));
     URI backupPath = repository.resolve(location, backupName);
 
+    BackupManager backupMgr = BackupManager.forBackup(repository, ocmh.zkStateReader, location, backupName);
+
     //Validating if the directory already exists.
     if (repository.exists(backupPath)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The backup directory already exists: " + backupPath);
+      if (!incremental)
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The backup directory already exists: " + backupPath);
+    } else {
+      // Create a directory to store backup details.
+      repository.createDirectory(backupPath);
     }
 
-    // Create a directory to store backup details.
-    repository.createDirectory(backupPath);
-
     String strategy = message.getStr(CollectionAdminParams.INDEX_BACKUP_STRATEGY, CollectionAdminParams.COPY_FILES_STRATEGY);
     switch (strategy) {
       case CollectionAdminParams.COPY_FILES_STRATEGY: {
-        copyIndexFiles(backupPath, collectionName, message, results);
+        copyIndexFiles(backupPath, collectionName, message, results, incremental);
         break;
       }
       case CollectionAdminParams.NO_INDEX_BACKUP_STRATEGY: {
@@ -111,15 +114,16 @@
 
     //Download the configs
     String configName = ocmh.zkStateReader.readConfigName(collectionName);
-    backupMgr.downloadConfigDir(location, backupName, configName);
+    backupMgr.downloadConfigDir(configName);
 
     //Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json
     //Since we don't want to distinguish we extract the state and back it up as a separate json
     DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
-    backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
+    backupMgr.writeCollectionState(collectionName, collectionState);
+    backupMgr.downloadCollectionProperties(collectionName);
 
+    // We should write the properties last, the exist of file guarantee that the backup success
     Properties properties = new Properties();
-
     properties.put(BackupManager.BACKUP_NAME_PROP, backupName);
     properties.put(BackupManager.COLLECTION_NAME_PROP, collectionName);
     properties.put(BackupManager.COLLECTION_ALIAS_PROP, extCollectionName);
@@ -130,9 +134,7 @@
     //if they are not the same then we can throw an error or have an 'overwriteConfig' flag
     //TODO save numDocs for the shardLeader. We can use it to sanity check the restore.
 
-    backupMgr.writeBackupProperties(location, backupName, properties);
-
-    backupMgr.downloadCollectionProperties(location, backupName, collectionName);
+    backupMgr.writeBackupProperties(properties);
 
     log.info("Completed backing up ZK data for backupName={}", backupName);
   }
@@ -142,7 +144,7 @@
     // If that is not possible, we choose any other replica for the given shard.
     Collection<CoreSnapshotMetaData> snapshots = snapshotMeta.getReplicaSnapshotsForShard(slice.getName());
 
-    Optional<CoreSnapshotMetaData> leaderCore = snapshots.stream().filter(x -> x.isLeader()).findFirst();
+    Optional<CoreSnapshotMetaData> leaderCore = snapshots.stream().filter(CoreSnapshotMetaData::isLeader).findFirst();
     if (leaderCore.isPresent()) {
       log.info("Replica {} was the leader when snapshot {} was created.", leaderCore.get().getCoreName(), snapshotMeta.getName());
       Replica r = slice.getReplica(leaderCore.get().getCoreName());
@@ -155,7 +157,7 @@
                                .filter(x -> x.getState() != State.DOWN && snapshotMeta.isSnapshotExists(slice.getName(), x))
                                .findFirst();
 
-    if (!r.isPresent()) {
+    if (r.isEmpty()) {
       throw new SolrException(ErrorCode.SERVER_ERROR,
           "Unable to find any live replica with a snapshot named " + snapshotMeta.getName() + " for shard " + slice.getName());
     }
@@ -163,7 +165,7 @@
     return r.get();
   }
 
-  private void copyIndexFiles(URI backupPath, String collectionName, ZkNodeProps request, NamedList results) throws Exception {
+  private void copyIndexFiles(URI backupPath, String collectionName, ZkNodeProps request, NamedList results, boolean incremental) throws Exception {
     String backupName = request.getStr(NAME);
     String asyncId = request.getStr(ASYNC);
     String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
@@ -174,7 +176,7 @@
     if (commitName != null) {
       SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
       snapshotMeta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
-      if (!snapshotMeta.isPresent()) {
+      if (snapshotMeta.isEmpty()) {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Snapshot with name " + commitName
             + " does not exist for collection " + collectionName);
       }
@@ -194,7 +196,7 @@
 
     final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
     for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
-      Replica replica = null;
+      Replica replica;
 
       if (snapshotMeta.isPresent()) {
         if (!shardsToConsider.contains(slice.getName())) {
@@ -219,9 +221,8 @@
       params.set(CoreAdminParams.BACKUP_REPOSITORY, repoName);
       params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString()); // note: index dir will be here then the "snapshot." + slice name
       params.set(CORE_NAME_PROP, coreName);
-      if (snapshotMeta.isPresent()) {
-        params.set(CoreAdminParams.COMMIT_NAME, snapshotMeta.get().getName());
-      }
+      params.set(CoreAdminParams.BACKUP_INCREMENTAL, incremental);
+      snapshotMeta.ifPresent(collectionSnapshotMetaData -> params.set(CoreAdminParams.COMMIT_NAME, collectionSnapshotMetaData.getName()));
 
       shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
       log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 20dfc6d..de62ed9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -103,12 +103,12 @@
     URI location = repository.createURI(message.getStr(CoreAdminParams.BACKUP_LOCATION));
     URI backupPath = repository.resolve(location, backupName);
     ZkStateReader zkStateReader = ocmh.zkStateReader;
-    BackupManager backupMgr = new BackupManager(repository, zkStateReader);
+    BackupManager backupMgr = BackupManager.forRestore(repository, zkStateReader, location, backupName);
 
-    Properties properties = backupMgr.readBackupProperties(location, backupName);
+    Properties properties = backupMgr.readBackupProperties();
     String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
     String backupCollectionAlias = properties.getProperty(BackupManager.COLLECTION_ALIAS_PROP);
-    DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
+    DocCollection backupCollectionState = backupMgr.readCollectionState(backupCollection);
 
     // Get the Solr nodes to restore a collection.
     final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
@@ -149,7 +149,7 @@
       //TODO add overwrite option?
     } else {
       log.info("Uploading config {}", restoreConfigName);
-      backupMgr.uploadConfigDir(location, backupName, configName, restoreConfigName);
+      backupMgr.uploadConfigDir(configName, restoreConfigName);
     }
 
     log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
@@ -209,7 +209,7 @@
     }
 
     // Restore collection properties
-    backupMgr.uploadCollectionProperties(location, backupName, restoreCollectionName);
+    backupMgr.uploadCollectionProperties(restoreCollectionName);
 
     DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
 
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
index b15bbfe..0820d1b 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
@@ -28,6 +28,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 
 import com.google.common.base.Preconditions;
@@ -70,9 +71,61 @@
   protected final ZkStateReader zkStateReader;
   protected final BackupRepository repository;
 
-  public BackupManager(BackupRepository repository, ZkStateReader zkStateReader) {
+  // Multiple generations of a backup can coexist under a backupId
+  protected FileGeneration propertiesFile;
+  // In case of generation > 0, all zk related files are stored under this subFolder
+  // If generation = 0, value of subFolder will be ignored to maintain backward compatibility
+  protected final String subFolder;
+  protected final URI backupPath;
+
+  private BackupManager(BackupRepository repository,
+                       ZkStateReader zkStateReader,
+                       URI backupPath,
+                       FileGeneration propertiesFile) {
     this.repository = Objects.requireNonNull(repository);
     this.zkStateReader = Objects.requireNonNull(zkStateReader);
+    this.backupPath = backupPath;
+    this.propertiesFile = propertiesFile;
+    this.subFolder = "gen-" + propertiesFile.gen;
+  }
+
+  public static BackupManager forBackup(BackupRepository repository,
+                                        ZkStateReader stateReader,
+                                        URI backupLoc,
+                                        String backupId) throws IOException{
+    Objects.requireNonNull(repository);
+    Objects.requireNonNull(stateReader);
+
+    URI backupPath = repository.resolve(backupLoc, backupId);
+
+    Optional<FileGeneration> opFileGen = FileGeneration.findMostRecent(BACKUP_PROPS_FILE,
+        repository.listAllOrEmpty(backupPath));
+    if (opFileGen.isPresent()) {
+      return new BackupManager(repository, stateReader, backupPath, opFileGen.get().nextGen());
+    } else {
+      return new BackupManager(repository, stateReader, backupPath, FileGeneration.zeroGen(BACKUP_PROPS_FILE));
+    }
+  }
+
+  public static BackupManager forRestore(BackupRepository repository,
+                                         ZkStateReader stateReader,
+                                         URI backupLoc,
+                                         String backupId) throws IOException {
+    Objects.requireNonNull(repository);
+    Objects.requireNonNull(stateReader);
+
+    URI backupPath = repository.resolve(backupLoc, backupId);
+    if (!repository.exists(backupPath)) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Couldn't restore since doesn't exist: " + backupPath);
+    }
+
+    Optional<FileGeneration> opFileGen = FileGeneration.findMostRecent(BACKUP_PROPS_FILE,
+        repository.listAll(repository.resolve(backupLoc, backupId)));
+    if (opFileGen.isPresent()) {
+      return new BackupManager(repository, stateReader, backupPath, opFileGen.get());
+    } else {
+      throw new IllegalStateException("No " + BACKUP_PROPS_FILE + " was found, the backup does not exist or not complete");
+    }
   }
 
   /**
@@ -85,24 +138,13 @@
   /**
    * This method returns the configuration parameters for the specified backup.
    *
-   * @param backupLoc The base path used to store the backup data.
-   * @param backupId  The unique name for the backup whose configuration params are required.
    * @return the configuration parameters for the specified backup.
    * @throws IOException In case of errors.
    */
-  public Properties readBackupProperties(URI backupLoc, String backupId) throws IOException {
-    Objects.requireNonNull(backupLoc);
-    Objects.requireNonNull(backupId);
-
-    // Backup location
-    URI backupPath = repository.resolve(backupLoc, backupId);
-    if (!repository.exists(backupPath)) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Couldn't restore since doesn't exist: " + backupPath);
-    }
-
+  public Properties readBackupProperties() throws IOException {
     Properties props = new Properties();
     try (Reader is = new InputStreamReader(new PropertiesInputStream(
-        repository.openInput(backupPath, BACKUP_PROPS_FILE, IOContext.DEFAULT)), StandardCharsets.UTF_8)) {
+        repository.openInput(backupPath, propertiesFile.getFileName(), IOContext.DEFAULT)), StandardCharsets.UTF_8)) {
       props.load(is);
       return props;
     }
@@ -111,13 +153,11 @@
   /**
    * This method stores the backup properties at the specified location in the repository.
    *
-   * @param backupLoc  The base path used to store the backup data.
-   * @param backupId  The unique name for the backup whose configuration params are required.
    * @param props The backup properties
    * @throws IOException in case of I/O error
    */
-  public void writeBackupProperties(URI backupLoc, String backupId, Properties props) throws IOException {
-    URI dest = repository.resolve(backupLoc, backupId, BACKUP_PROPS_FILE);
+  public void writeBackupProperties(Properties props) throws IOException {
+    URI dest = repository.resolve(backupPath, propertiesFile.getFileName());
     try (Writer propsWriter = new OutputStreamWriter(repository.createOutput(dest), StandardCharsets.UTF_8)) {
       props.store(propsWriter, "Backup properties file");
     }
@@ -126,16 +166,14 @@
   /**
    * This method reads the meta-data information for the backed-up collection.
    *
-   * @param backupLoc The base path used to store the backup data.
-   * @param backupId The unique name for the backup.
    * @param collectionName The name of the collection whose meta-data is to be returned.
    * @return the meta-data information for the backed-up collection.
    * @throws IOException in case of errors.
    */
-  public DocCollection readCollectionState(URI backupLoc, String backupId, String collectionName) throws IOException {
+  public DocCollection readCollectionState(String collectionName) throws IOException {
     Objects.requireNonNull(collectionName);
 
-    URI zkStateDir = repository.resolve(backupLoc, backupId, ZK_STATE_DIR);
+    URI zkStateDir = getUriUnderSubFolder(ZK_STATE_DIR);
     try (IndexInput is = repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) {
       byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should be small.
       is.readBytes(arr, 0, (int) is.length());
@@ -144,18 +182,32 @@
     }
   }
 
+  private URI getUriUnderSubFolder(String... files) {
+    if (propertiesFile.gen == 0) {
+      if (files.length == 0) {
+        return backupPath;
+      }
+
+      return repository.resolve(backupPath, files);
+    } else {
+      URI subFolderURI = repository.resolve(backupPath, subFolder);
+      if (files.length == 0) {
+        return subFolderURI;
+      }
+      return repository.resolve(subFolderURI, files);
+    }
+  }
+
   /**
    * This method writes the collection meta-data to the specified location in the repository.
    *
-   * @param backupLoc The base path used to store the backup data.
-   * @param backupId  The unique name for the backup.
    * @param collectionName The name of the collection whose meta-data is being stored.
    * @param collectionState The collection meta-data to be stored.
    * @throws IOException in case of I/O errors.
    */
-  public void writeCollectionState(URI backupLoc, String backupId, String collectionName,
+  public void writeCollectionState(String collectionName,
                                    DocCollection collectionState) throws IOException {
-    URI dest = repository.resolve(backupLoc, backupId, ZK_STATE_DIR, COLLECTION_PROPS_FILE);
+    URI dest = getUriUnderSubFolder(ZK_STATE_DIR, COLLECTION_PROPS_FILE);
     try (OutputStream collectionStateOs = repository.createOutput(dest)) {
       collectionStateOs.write(Utils.toJSON(Collections.singletonMap(collectionName, collectionState)));
     }
@@ -164,15 +216,13 @@
   /**
    * This method uploads the Solr configuration files to the desired location in Zookeeper.
    *
-   * @param backupLoc  The base path used to store the backup data.
-   * @param backupId  The unique name for the backup.
    * @param sourceConfigName The name of the config to be copied
    * @param targetConfigName  The name of the config to be created.
    * @throws IOException in case of I/O errors.
    */
-  public void uploadConfigDir(URI backupLoc, String backupId, String sourceConfigName, String targetConfigName)
+  public void uploadConfigDir(String sourceConfigName, String targetConfigName)
       throws IOException {
-    URI source = repository.resolve(backupLoc, backupId, ZK_STATE_DIR, CONFIG_STATE_DIR, sourceConfigName);
+    URI source = getUriUnderSubFolder(ZK_STATE_DIR, CONFIG_STATE_DIR, sourceConfigName);
     String zkPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + targetConfigName;
     uploadToZk(zkStateReader.getZkClient(), source, zkPath);
   }
@@ -180,22 +230,24 @@
   /**
    * This method stores the contents of a specified Solr config at the specified location in repository.
    *
-   * @param backupLoc  The base path used to store the backup data.
-   * @param backupId  The unique name for the backup.
    * @param configName The name of the config to be saved.
    * @throws IOException in case of I/O errors.
    */
-  public void downloadConfigDir(URI backupLoc, String backupId, String configName) throws IOException {
-    URI dest = repository.resolve(backupLoc, backupId, ZK_STATE_DIR, CONFIG_STATE_DIR, configName);
-    repository.createDirectory(repository.resolve(backupLoc, backupId, ZK_STATE_DIR));
-    repository.createDirectory(repository.resolve(backupLoc, backupId, ZK_STATE_DIR, CONFIG_STATE_DIR));
+  public void downloadConfigDir(String configName) throws IOException {
+    URI generationFolder = getUriUnderSubFolder();
+    if (!repository.exists(generationFolder))
+      repository.createDirectory(generationFolder);
+
+    URI dest = repository.resolve(generationFolder, ZK_STATE_DIR, CONFIG_STATE_DIR, configName);
+    repository.createDirectory(repository.resolve(generationFolder, ZK_STATE_DIR));
+    repository.createDirectory(repository.resolve(generationFolder, ZK_STATE_DIR, CONFIG_STATE_DIR));
     repository.createDirectory(dest);
 
     downloadFromZK(zkStateReader.getZkClient(), ZkConfigManager.CONFIGS_ZKNODE + "/" + configName, dest);
   }
 
-  public void uploadCollectionProperties(URI backupLoc, String backupId, String collectionName) throws IOException {
-    URI sourceDir = repository.resolve(backupLoc, backupId, ZK_STATE_DIR);
+  public void uploadCollectionProperties(String collectionName) throws IOException {
+    URI sourceDir = getUriUnderSubFolder(ZK_STATE_DIR);
     URI source = repository.resolve(sourceDir, ZkStateReader.COLLECTION_PROPS_ZKNODE);
     if (!repository.exists(source)) {
       // No collection properties to restore
@@ -213,8 +265,8 @@
     }
   }
 
-  public void downloadCollectionProperties(URI backupLoc, String backupId, String collectionName) throws IOException {
-    URI dest = repository.resolve(backupLoc, backupId, ZK_STATE_DIR, ZkStateReader.COLLECTION_PROPS_ZKNODE);
+  public void downloadCollectionProperties(String collectionName) throws IOException {
+    URI dest = getUriUnderSubFolder(ZK_STATE_DIR, ZkStateReader.COLLECTION_PROPS_ZKNODE);
     String zkPath = ZkStateReader.COLLECTIONS_ZKNODE + '/' + collectionName + '/' + ZkStateReader.COLLECTION_PROPS_ZKNODE;
 
 
diff --git a/solr/core/src/java/org/apache/solr/core/backup/FileGeneration.java b/solr/core/src/java/org/apache/solr/core/backup/FileGeneration.java
new file mode 100644
index 0000000..763e9c7
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/backup/FileGeneration.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core.backup;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Finding and represent file with different generation.
+ * i.e: backup-10.properties
+ */
+class FileGeneration {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final String prefix;
+  private final String suffix;
+  public final long gen;
+
+  private FileGeneration(String prefix, String suffix, long gen) {
+    this.prefix = prefix;
+    this.suffix = suffix;
+    this.gen = gen;
+  }
+
+  FileGeneration nextGen() {
+    return new FileGeneration(prefix, suffix, gen+1);
+  }
+
+  static FileGeneration zeroGen(String zeroGenFile) {
+    int index = zeroGenFile.indexOf('.');
+    if (index == -1) {
+      return new FileGeneration(zeroGenFile, "", 0);
+    }
+    return new FileGeneration(zeroGenFile.substring(0, index), zeroGenFile.substring(index), 0);
+  }
+
+  static Optional<FileGeneration> findMostRecent(String zeroGenFile, String[] listFiles) {
+    if (listFiles == null || listFiles.length == 0) {
+      return Optional.empty();
+    }
+
+    int index = zeroGenFile.indexOf('.');
+    if (index == -1) {
+      return findMostRecent(zeroGenFile, "", listFiles);
+    }
+    return findMostRecent(zeroGenFile.substring(0, index), zeroGenFile.substring(index), listFiles);
+  }
+
+  private static Optional<FileGeneration> findMostRecent(String prefix, String suffix, String[] listFiles) {
+    long gen = -1;
+    for (String file : listFiles) {
+      if (file.startsWith(prefix) && file.endsWith(suffix)) {
+        if (prefix.length() + suffix.length() == file.length()) {
+          gen = Math.max(gen, 0);
+        } else {
+          if (file.charAt(prefix.length()) != '-') {
+            log.info("This file {} does not match with the file generation", file);
+            continue;
+          }
+
+          try {
+            gen = Math.max(gen, Long.parseLong(file.substring(prefix.length() + 1, file.length() - suffix.length())));
+          } catch (NumberFormatException e) {
+            log.info("This file {} does not match with the file generation", file);
+          }
+        }
+      }
+    }
+
+    if (gen == -1)
+      return Optional.empty();
+
+    return Optional.of(new FileGeneration(prefix, suffix, gen));
+  }
+
+  public String getFileName() {
+    if (gen == 0) {
+      return prefix + suffix;
+    }
+    else return String.format("%s-%d%s", prefix, gen, suffix);
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java
index 875be18..e7e47a3 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepository.java
@@ -20,8 +20,12 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.Collection;
+import java.util.Objects;
 import java.util.Optional;
 
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -181,4 +185,77 @@
    *           in case of errors.
    */
   void copyFileTo(URI sourceRepo, String fileName, Directory dest) throws IOException;
+
+  /**
+   * Delete {@code files} at {@code path}
+   * @since 8.2.0
+   */
+  default void delete(URI path, Collection<String> files) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Get checksum of {@code fileName} at {@code path}
+   * This method only be called on Lucene index files
+   * @since 8.2.0
+   */
+  default Checksum checksum(URI path, String fileName) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Get checksum of {@code fileName} at {@code dir}.
+   * This method only be called on Lucene index files
+   * @since 8.2.0
+   */
+  default Checksum checksum(Directory dir, String fileName) throws IOException {
+    try (IndexInput in = dir.openChecksumInput(fileName, IOContext.READONCE)) {
+      final long length = in.length();
+      if (length < CodecUtil.footerLength()) {
+        throw new CorruptIndexException("File: " + fileName + " is corrupted, its length must be >= " +
+            CodecUtil.footerLength() + " but was: " + in.length(), in);
+      }
+
+      return new BackupRepository.Checksum(fileName, String.valueOf(CodecUtil.retrieveChecksum(in)), in.length());
+    }
+  }
+
+  /**
+   * List all files or directories directly under {@code path}.
+   * @return an empty array in case of IOException
+   */
+  default String[] listAllOrEmpty(URI path) {
+    try {
+      return this.listAll(path);
+    } catch (IOException e) {
+      return new String[0];
+    }
+  }
+
+  class Checksum {
+    public final String fileName;
+    public final String checksum;
+    public final long size;
+
+    Checksum(String fileName, String checksum, long size) {
+      this.fileName = fileName;
+      this.checksum = checksum;
+      this.size = size;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      Checksum checksum = (Checksum) o;
+      return size == checksum.size &&
+          Objects.equals(fileName, checksum.fileName) &&
+          Objects.equals(this.checksum, checksum.checksum);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(fileName, checksum, size);
+    }
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepositoryFactory.java b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepositoryFactory.java
index 9e02b21..bfc1e51 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepositoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/repository/BackupRepositoryFactory.java
@@ -72,6 +72,11 @@
         "Could not find a backup repository with name " + name);
 
     BackupRepository result = loader.newInstance(repo.className, BackupRepository.class);
+    if ("trackingBackupRepo".equals(name) && repo.initArgs.get("factory") == null) {
+      repo.initArgs.add("factory", this);
+      repo.initArgs.add("loader", loader);
+    }
+
     result.init(repo.initArgs);
     return result;
   }
diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/HdfsBackupRepository.java b/solr/core/src/java/org/apache/solr/core/backup/repository/HdfsBackupRepository.java
index 6c0b04c..be46478 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/repository/HdfsBackupRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/repository/HdfsBackupRepository.java
@@ -21,6 +21,7 @@
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.Objects;
 
 import com.google.common.base.Preconditions;
@@ -196,4 +197,19 @@
       dest.copyFrom(dir, fileName, fileName, DirectoryFactory.IOCONTEXT_NO_CACHE);
     }
   }
+
+  @Override
+  public void delete(URI path, Collection<String> files) throws IOException {
+    for (String file : files) {
+      fileSystem.delete(new Path(new Path(path), file), true);
+    }
+  }
+
+  @Override
+  public Checksum checksum(URI repo, String fileName) throws IOException {
+    try (HdfsDirectory dir = new HdfsDirectory(new Path(repo), NoLockFactory.INSTANCE,
+        hdfsConfig, copyBufferSize)) {
+      return checksum(dir, fileName);
+    }
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/backup/repository/LocalFileSystemRepository.java b/solr/core/src/java/org/apache/solr/core/backup/repository/LocalFileSystemRepository.java
index 01810f6..8173701 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/repository/LocalFileSystemRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/repository/LocalFileSystemRepository.java
@@ -27,6 +27,7 @@
 import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collection;
 import java.util.Objects;
 
 import org.apache.lucene.store.Directory;
@@ -129,6 +130,12 @@
 
   @Override
   public String[] listAll(URI dirPath) throws IOException {
+    // It is better to check the existence of the directory first since
+    // creating a FSDirectory will create a corresponds folder if the directory does not exist
+    if (!exists(dirPath)) {
+      return new String[0];
+    }
+
     try (FSDirectory dir = new SimpleFSDirectory(Paths.get(dirPath), NoLockFactory.INSTANCE)) {
       return dir.listAll();
     }
@@ -154,5 +161,24 @@
   }
 
   @Override
+  public Checksum checksum(URI repo, String fileName) throws IOException {
+    try (FSDirectory dir = new SimpleFSDirectory(Paths.get(repo), NoLockFactory.INSTANCE)) {
+      return checksum(dir, fileName);
+    }
+  }
+
+  @Override
+  public void delete(URI path, Collection<String> files) throws IOException {
+    if (files.isEmpty())
+      return;
+
+    try (FSDirectory dir = new SimpleFSDirectory(Paths.get(path), NoLockFactory.INSTANCE)) {
+      for (String file : files) {
+        dir.deleteFile(file);
+      }
+    }
+  }
+
+  @Override
   public void close() throws IOException {}
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/SnapShooter.java b/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
index 2c3c691..0192a29 100644
--- a/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
+++ b/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
@@ -22,15 +22,19 @@
 import java.nio.file.Paths;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Consumer;
 
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.common.SolrException;
@@ -62,8 +66,10 @@
   private URI snapshotDirPath = null;
   private BackupRepository backupRepo = null;
   private String commitName; // can be null
+  private final boolean incremental;
 
   @Deprecated
+  // Deprecated since 8.2.0
   public SnapShooter(SolrCore core, String location, String snapshotName) {
     String snapDirStr = null;
     // Note - This logic is only applicable to the usecase where a shared file-system is exposed via
@@ -74,10 +80,16 @@
     } else {
       snapDirStr = core.getCoreDescriptor().getInstanceDir().resolve(location).normalize().toString();
     }
+    this.incremental = false;
     initialize(new LocalFileSystemRepository(), core, Paths.get(snapDirStr).toUri(), snapshotName, null);
   }
 
   public SnapShooter(BackupRepository backupRepo, SolrCore core, URI location, String snapshotName, String commitName) {
+    this(backupRepo, core, location, snapshotName, commitName, false);
+  }
+
+  public SnapShooter(BackupRepository backupRepo, SolrCore core, URI location, String snapshotName, String commitName, boolean incremental) {
+    this.incremental = incremental;
     initialize(backupRepo, core, location, snapshotName, commitName);
   }
 
@@ -142,7 +154,7 @@
           " Directory does not exist: " + snapshotDirPath);
     }
 
-    if (backupRepo.exists(snapshotDirPath)) {
+    if (!incremental && backupRepo.exists(snapshotDirPath)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
           "Snapshot directory already exists: " + snapshotDirPath);
     }
@@ -234,8 +246,12 @@
       Collection<String> files = indexCommit.getFileNames();
       Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
       try {
-        for(String fileName : files) {
-          backupRepo.copyFileFrom(dir, fileName, snapshotDirPath);
+        if (incremental) {
+          incrementalCopy(indexCommit, files, dir);
+        } else {
+          for(String fileName : files) {
+            backupRepo.copyFileFrom(dir, fileName, snapshotDirPath);
+          }
         }
       } finally {
         solrCore.getDirectoryFactory().release(dir);
@@ -260,6 +276,54 @@
     }
   }
 
+  private void incrementalCopy(IndexCommit indexCommit, Collection<String> indexFiles, Directory dir) throws IOException {
+    Set<String> existedFiles = new HashSet<>(Arrays.asList(backupRepo.listAllOrEmpty(snapshotDirPath)));
+
+    // Files in destination with same name as files in indexCommit but with different checksum or length should be deleted first
+    List<String> corruptedFiles = new ArrayList<>();
+    List<String> filesNeedCopyOver = new ArrayList<>();
+
+    for(String fileName : indexFiles) {
+      if (existedFiles.contains(fileName)) {
+        BackupRepository.Checksum originalFileCS = backupRepo.checksum(dir, fileName);
+        try {
+          BackupRepository.Checksum existedFileCS = backupRepo.checksum(snapshotDirPath, fileName);
+          if (Objects.equals(originalFileCS, existedFileCS)) {
+            continue;
+          }
+        } catch (CorruptIndexException e) {
+          log.info("Found a corrupted file in backup repository {}", fileName);
+        }
+
+        corruptedFiles.add(fileName);
+      }
+
+      filesNeedCopyOver.add(fileName);
+    }
+
+    backupRepo.delete(snapshotDirPath, corruptedFiles);
+
+    boolean copySegmentsFile = false;
+    for (String fileName : filesNeedCopyOver) {
+      if (fileName.equals(indexCommit.getSegmentsFileName())) {
+        copySegmentsFile = true;
+        continue;
+      }
+
+      backupRepo.copyFileFrom(dir, fileName, snapshotDirPath);
+    }
+
+    if (copySegmentsFile) {
+      // copy segments_N last, in case of failures on copy new files, the backup still work
+      backupRepo.copyFileFrom(dir, indexCommit.getSegmentsFileName(), snapshotDirPath);
+    }
+
+    // finally delete unused files
+    //TODO keeping multiple indexCommit
+    existedFiles.removeAll(indexFiles);
+    backupRepo.delete(snapshotDirPath, existedFiles);
+  }
+
   private void deleteOldBackups(int numberToKeep) throws IOException {
     String[] paths = backupRepo.listAll(baseSnapDirPath);
     List<OldBackupDirectory> dirs = new ArrayList<>();
@@ -303,6 +367,14 @@
     replicationHandler.snapShootDetails = details;
   }
 
+  private static String[] listAllOrEmpty(BackupRepository repo, URI dir) {
+    try {
+      return repo.listAll(dir);
+    } catch (IOException e) {
+      return new String[0];
+    }
+  }
+
   public static final String DATE_FMT = "yyyyMMddHHmmssSSS";
 
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/BackupCoreOp.java b/solr/core/src/java/org/apache/solr/handler/admin/BackupCoreOp.java
index 503eed0..3e61c4f 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/BackupCoreOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/BackupCoreOp.java
@@ -37,6 +37,7 @@
 
     String cname = params.required().get(CoreAdminParams.CORE);
     String name = params.required().get(NAME);
+    boolean incremental = params.getBool(CoreAdminParams.BACKUP_INCREMENTAL, false);
 
     String repoName = params.get(CoreAdminParams.BACKUP_REPOSITORY);
     BackupRepository repository = it.handler.coreContainer.newBackupRepository(Optional.ofNullable(repoName));
@@ -53,7 +54,7 @@
 
     URI locationUri = repository.createURI(location);
     try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
-      SnapShooter snapShooter = new SnapShooter(repository, core, locationUri, name, commitName);
+      SnapShooter snapShooter = new SnapShooter(repository, core, locationUri, name, commitName, incremental);
       // validateCreateSnapshot will create parent dirs instead of throw; that choice is dubious.
       //  But we want to throw. One reason is that
       //  this dir really should, in fact must, already exist here if triggered via a collection backup on a shared
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index d292976..a2d4cc2 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1085,6 +1085,7 @@
         }
       }
 
+      boolean incremental = req.getParams().getBool(CoreAdminParams.BACKUP_INCREMENTAL, false);
       // Check if the specified location is valid for this repository.
       URI uri = repository.createURI(location);
       try {
@@ -1099,10 +1100,11 @@
       if (!CollectionAdminParams.INDEX_BACKUP_STRATEGIES.contains(strategy)) {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown index backup strategy " + strategy);
       }
-
-      Map<String, Object> params = copy(req.getParams(), null, NAME, COLLECTION_PROP, FOLLOW_ALIASES, CoreAdminParams.COMMIT_NAME);
+      
+      Map<String, Object> params = copy(req.getParams(), null, NAME, COLLECTION_PROP, FOLLOW_ALIASES, CoreAdminParams.COMMIT_NAME, CoreAdminParams.BACKUP_REPOSITORY);
       params.put(CoreAdminParams.BACKUP_LOCATION, location);
       params.put(CollectionAdminParams.INDEX_BACKUP_STRATEGY, strategy);
+      params.put(CoreAdminParams.BACKUP_INCREMENTAL, incremental);
       return params;
     }),
     RESTORE_OP(RESTORE, (req, rsp, h) -> {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
index 9a41afe..b3dd3b1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
@@ -16,19 +16,27 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.not;
-
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
+import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -36,6 +44,7 @@
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest.ClusterProp;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
 import org.apache.solr.cloud.SolrCloudTestCase;
@@ -44,14 +53,23 @@
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.TrackingBackupRepository;
+import org.apache.solr.core.backup.repository.BackupRepository;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.core.TrackingBackupRepository.copiedFiles;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.not;
+
 /**
  * This class implements the logic required to test Solr cloud backup/restore capability.
  */
@@ -80,11 +98,6 @@
   public abstract String getCollectionNamePrefix();
 
   /**
-   * @return The name of the backup repository to use.
-   */
-  public abstract String getBackupRepoName();
-
-  /**
    * @return The absolute path for the backup location.
    *         Could return null.
    */
@@ -104,9 +117,7 @@
     setTestSuffix("testok");
     boolean isImplicit = random().nextBoolean();
     boolean doSplitShardOperation = !isImplicit && random().nextBoolean();
-    replFactor = TestUtil.nextInt(random(), 1, 2);
-    numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
-    numPullReplicas = TestUtil.nextInt(random(), 0, 1);
+    randomizeReplicaTypes();
     int backupReplFactor = replFactor + numPullReplicas + numTlogReplicas;
 
     CollectionAdminRequest.Create create = isImplicit ?
@@ -162,11 +173,86 @@
   }
 
   @Test
+  @Slow
+  public void testBackupIncremental() throws Exception {
+    TrackingBackupRepository.clear();
+
+    setTestSuffix("testbackupinc");
+    randomizeReplicaTypes();
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    CollectionAdminRequest
+        .createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas)
+        .setMaxShardsPerNode(-1)
+        .process(solrClient);
+
+    indexDocs(getCollectionName(), false);
+
+    String backupName = BACKUPNAME_PREFIX + testSuffix;
+    try (BackupRepository repository = cluster.getJettySolrRunner(0).getCoreContainer()
+        .newBackupRepository(Optional.of(getBackupRepoName()))) {
+      String backupLocation = repository.getBackupLocation(getBackupLocation());
+      IncrementalBackupVerifier verifier = new IncrementalBackupVerifier(repository, backupLocation, backupName);
+      backupRestoreCorruptBackupFilesThenCheck(solrClient, verifier, backupLocation, backupName);
+
+      indexDocs(getCollectionName(), false);
+      backupRestoreCorruptBackupFilesThenCheck(solrClient, verifier, backupLocation, backupName);
+
+      for (int i = 0; i < 15; i++) {
+        indexDocs(getCollectionName(), 5,false);
+      }
+
+      backupRestoreCorruptBackupFilesThenCheck(solrClient, verifier, backupLocation, backupName);
+
+      new UpdateRequest()
+          .deleteByQuery("*:*")
+          .commit(cluster.getSolrClient(), getCollectionName());
+
+      backupRestoreCorruptBackupFilesThenCheck(solrClient, verifier, backupLocation, backupName);
+
+      indexDocs(getCollectionName(), false);
+      backupRestoreCorruptBackupFilesThenCheck(solrClient, verifier, backupLocation, backupName);
+    }
+  }
+
+  private void backupRestoreCorruptBackupFilesThenCheck(CloudSolrClient solrClient,
+                                                        IncrementalBackupVerifier verifier,
+                                                        String backupLocation,
+                                                        String backupName) throws Exception {
+    verifier.incrementalBackupThenVerify();
+
+    if( random().nextBoolean() )
+      simpleRestoreAndCheckDocCount(solrClient, backupLocation, backupName);
+
+    if ( random().nextBoolean() )
+      verifier.corruptBackupFiles();
+  }
+
+  private void simpleRestoreAndCheckDocCount(CloudSolrClient solrClient, String backupLocation, String backupName) throws Exception{
+    Map<String, Integer> origShardToDocCount = getShardToDocCountMap(solrClient, getCollectionState(getCollectionName()));
+
+    String restoreCollectionName = getCollectionName() + "_restored";
+    CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
+        .setLocation(backupLocation).setRepositoryName(getBackupRepoName()).process(solrClient);
+
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(
+        restoreCollectionName, cluster.getSolrClient().getZkStateReader(), log.isDebugEnabled(), true, 30);
+
+    // check num docs are the same
+    assertEquals(origShardToDocCount, getShardToDocCountMap(solrClient, getCollectionState(restoreCollectionName)));
+
+    // this methods may get invoked multiple times, collection must be cleanup
+    CollectionAdminRequest.deleteCollection(restoreCollectionName).process(solrClient);
+  }
+
+  protected String getBackupRepoName() {
+    return "trackingBackupRepo";
+  }
+
+  @Test
   public void testRestoreFailure() throws Exception {
     setTestSuffix("testfailure");
-    replFactor = TestUtil.nextInt(random(), 1, 2);
-    numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
-    numPullReplicas = TestUtil.nextInt(random(), 0, 1);
+    randomizeReplicaTypes();
 
     CollectionAdminRequest.Create create =
         CollectionAdminRequest.createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas);
@@ -213,6 +299,12 @@
     }
   }
 
+  private void randomizeReplicaTypes() {
+    replFactor = TestUtil.nextInt(random(), 1, 2);
+    numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
+    numPullReplicas = TestUtil.nextInt(random(), 0, 1);
+  }
+
   /**
    * This test validates the backup of collection configuration using
    *  {@linkplain CollectionAdminParams#NO_INDEX_BACKUP_STRATEGY}.
@@ -266,18 +358,13 @@
     return cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName).getActiveSlices().size();
   }
 
-  private int indexDocs(String collectionName, boolean useUUID) throws Exception {
-    Random random = new Random(docsSeed);// use a constant seed for the whole test run so that we can easily re-index.
-    int numDocs = random.nextInt(100);
-    if (numDocs == 0) {
-      log.info("Indexing ZERO test docs");
-      return 0;
-    }
+  private void indexDocs(String collectionName, int numDocs, boolean useUUID) throws Exception {
+    Random random = new Random(docsSeed);
 
     List<SolrInputDocument> docs = new ArrayList<>(numDocs);
     for (int i=0; i<numDocs; i++) {
       SolrInputDocument doc = new SolrInputDocument();
-      doc.addField("id", ((useUUID == true) ? java.util.UUID.randomUUID().toString() : i));
+      doc.addField("id", (useUUID ? java.util.UUID.randomUUID().toString() : i));
       doc.addField("shard_s", "shard" + (1 + random.nextInt(NUM_SHARDS))); // for implicit router
       docs.add(doc);
     }
@@ -287,7 +374,17 @@
     client.commit(collectionName);
 
     log.info("Indexed {} docs to collection: {}", numDocs, collectionName);
+  }
 
+  private int indexDocs(String collectionName, boolean useUUID) throws Exception {
+    Random random = new Random(docsSeed);// use a constant seed for the whole test run so that we can easily re-index.
+    int numDocs = random.nextInt(100);
+    if (numDocs == 0) {
+      log.info("Indexing ZERO test docs");
+      return 0;
+    }
+
+    indexDocs(collectionName, numDocs, useUUID);
     return numDocs;
   }
 
@@ -373,7 +470,7 @@
     props.setProperty("customKey", "customVal");
     restore.setProperties(props);
 
-    if (sameConfig==false) {
+    if (!sameConfig) {
       restore.setConfigName("customConfigName");
     }
     if (random().nextBoolean()) {
@@ -451,4 +548,132 @@
     }
     return shardToDocCount;
   }
+
+  private class IncrementalBackupVerifier {
+    private BackupRepository repository;
+    private String backupLocation;
+    private String backupName;
+    private Map<String, Collection<String>> lastShardCommitToBackupFiles = new HashMap<>();
+    private List<URI> corruptedFiles = new ArrayList<>();
+
+    IncrementalBackupVerifier(BackupRepository repository, String backupLocation, String backupName) {
+      this.repository = repository;
+      this.backupLocation = backupLocation;
+      this.backupName = backupName;
+    }
+
+    private void corruptBackupFiles() throws Exception {
+      for(Slice slice : getCollectionState(getCollectionName()).getSlices()) {
+        Replica leader = slice.getLeader();
+
+        try (SolrCore solrCore = cluster.getReplicaJetty(leader).getCoreContainer().getCore(leader.getCoreName())) {
+          Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
+          try {
+            URI snapshotDirPath = repository.resolve(repository.createURI(backupLocation), backupName, "snapshot." + slice.getName());
+
+            String[] listFiles = repository.listAll(snapshotDirPath);
+            for (String file : listFiles) {
+              if (random().nextBoolean())
+                continue;
+
+              repository.delete(snapshotDirPath, Collections.singleton(file));
+              try (OutputStream os = repository.createOutput(repository.resolve(snapshotDirPath, file))) {
+                byte[] corruptedData = new byte[random().nextInt(30) + 1];
+                random().nextBytes(corruptedData);
+                os.write(corruptedData);
+              }
+              corruptedFiles.add(repository.resolve(snapshotDirPath, file));
+            }
+          } finally {
+            solrCore.getDirectoryFactory().release(dir);
+          }
+        }
+      }
+    }
+
+    private void backupThenWait() throws SolrServerException, IOException {
+      CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
+          .setLocation(backupLocation).setIncremental(true).setRepositoryName(getBackupRepoName());
+      assertEquals(0, backup.process(cluster.getSolrClient()).getStatus());
+    }
+
+    void incrementalBackupThenVerify() throws IOException, SolrServerException {
+      int numCopiedFiles = copiedFiles().size();
+      backupThenWait();
+      List<URI> newFilesCopiedOver = copiedFiles().subList(numCopiedFiles, copiedFiles().size());
+      verify(newFilesCopiedOver, corruptedFiles);
+
+      // these corrupted files are no longer valid for the next check since they are healed by the last backup
+      corruptedFiles.clear();
+    }
+
+    public void verify(List<URI> newFilesCopiedOver, List<URI> corruptedFiles) throws IOException {
+      //TODO Datcm verify each backup will reupload config files
+
+      // verify indexes file
+      for(Slice slice : getCollectionState(getCollectionName()).getSlices()) {
+        Replica leader = slice.getLeader();
+
+        try (SolrCore solrCore = cluster.getReplicaJetty(leader).getCoreContainer().getCore(leader.getCoreName())) {
+          Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
+          try {
+
+            URI snapshotDirPath = repository.resolve(repository.createURI(backupLocation), backupName, "snapshot." + slice.getName());
+            IndexCommit lastCommit = solrCore.getDeletionPolicy().getLatestCommit();
+            Collection<URI> newFilesInIndex = newFilesComparedToLastBackup(slice.getName(), lastCommit)
+                .stream()
+                .map(f -> repository.resolve(snapshotDirPath, f))
+                .collect(Collectors.toList());
+
+            String[] listFiles = repository.listAll(snapshotDirPath);
+
+            lastCommit.getFileNames().forEach(
+                f -> {
+                  URI remoteURI = repository.resolve(snapshotDirPath, f);
+                  if (newFilesInIndex.contains(remoteURI) || corruptedFiles.contains(remoteURI)) {
+                    assertTrue(newFilesCopiedOver.contains(remoteURI));
+                  } else {
+                    assertFalse(newFilesCopiedOver.contains(remoteURI));
+                  }
+                }
+            );
+
+            assertEquals("Incremental backup stored more files than needed stored: " + Arrays.toString(listFiles) +
+                " commit-files: " + lastCommit.getFileNames(),lastCommit.getFileNames().size(), listFiles.length);
+
+            Set<String> allChecksums = new HashSet<>();
+            // assert checksum are same
+            Arrays.stream(listFiles)
+                .forEach(f -> {
+                  try {
+                    BackupRepository.Checksum localChecksum = repository.checksum(dir, f);
+                    BackupRepository.Checksum remoteChecksum = repository.checksum(snapshotDirPath, f);
+                    allChecksums.add(localChecksum.checksum);
+                    allChecksums.add(remoteChecksum.checksum);
+                    assertEquals(repository.checksum(dir, f), repository.checksum(snapshotDirPath, f));
+                  } catch (IOException e) {
+                    throw new IllegalStateException(e);
+                  }
+                });
+
+            if (listFiles.length > 7) {
+              assertNotEquals("All files have same checksum, this likely be a bug in checksum implementation",1, allChecksums.size());
+            }
+          } finally {
+            solrCore.getDirectoryFactory().release(dir);
+          }
+        }
+      }
+    }
+
+    private Collection<String> newFilesComparedToLastBackup(String shardName, IndexCommit currentCommit) throws IOException {
+      Collection<String> oldFiles = lastShardCommitToBackupFiles.put(shardName, currentCommit.getFileNames());
+      if (oldFiles == null)
+        oldFiles = new ArrayList<>();
+
+      List<String> newFiles = new ArrayList<>(currentCommit.getFileNames());
+      newFiles.removeAll(oldFiles);
+      return newFiles;
+    }
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestHdfsCloudBackupRestore.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestHdfsCloudBackupRestore.java
index 2e7ea10..388d2ef 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestHdfsCloudBackupRestore.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestHdfsCloudBackupRestore.java
@@ -88,6 +88,9 @@
       "  </solrcloud>\n" +
       "  \n" +
       "  <backup>\n" +
+      "    <repository name=\"trackingBackupRepo\" default=\"true\" class=\"org.apache.solr.core.TrackingBackupRepository\"> \n" +
+      "      <str name=\"delegateRepoName\">hdfs</str>\n" +
+      "    </repository>\n" +
       "    <repository  name=\"hdfs\" class=\"org.apache.solr.core.backup.repository.HdfsBackupRepository\"> \n" +
       "      <str name=\"location\">${solr.hdfs.default.backup.path}</str>\n" +
       "      <str name=\"solr.hdfs.home\">${solr.hdfs.home:}</str>\n" +
@@ -162,11 +165,6 @@
   }
 
   @Override
-  public String getBackupRepoName() {
-    return "hdfs";
-  }
-
-  @Override
   public String getBackupLocation() {
     return null;
   }
@@ -186,17 +184,17 @@
 
     HdfsBackupRepository repo = new HdfsBackupRepository();
     repo.init(new NamedList<>(params));
-    BackupManager mgr = new BackupManager(repo, solrClient.getZkStateReader());
 
     URI baseLoc = repo.createURI("/backup");
 
-    Properties props = mgr.readBackupProperties(baseLoc, backupName);
+    BackupManager mgr = BackupManager.forRestore(repo, solrClient.getZkStateReader(), baseLoc, backupName);
+    Properties props = mgr.readBackupProperties();
     assertNotNull(props);
     assertEquals(collectionName, props.getProperty(COLLECTION_NAME_PROP));
     assertEquals(backupName, props.getProperty(BACKUP_NAME_PROP));
     assertEquals(configName, props.getProperty(COLL_CONF));
 
-    DocCollection collectionState = mgr.readCollectionState(baseLoc, backupName, collectionName);
+    DocCollection collectionState = mgr.readCollectionState(collectionName);
     assertNotNull(collectionState);
     assertEquals(collectionName, collectionState.getName());
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
index 2b6abf1..5c4228b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
@@ -17,7 +17,6 @@
 package org.apache.solr.cloud.api.collections;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.apache.solr.common.cloud.ZkConfigManager;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -27,8 +26,40 @@
  * Solr backup/restore still requires a "shared" file-system. Its just that in this case such file-system would be
  * exposed via local file-system API.
  */
-@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12866")
 public class TestLocalFSCloudBackupRestore extends AbstractCloudBackupRestoreTestCase {
+  public static final String SOLR_XML = "<solr>\n" +
+      "\n" +
+      "  <str name=\"shareSchema\">${shareSchema:false}</str>\n" +
+      "  <str name=\"configSetBaseDir\">${configSetBaseDir:configsets}</str>\n" +
+      "  <str name=\"coreRootDirectory\">${coreRootDirectory:.}</str>\n" +
+      "\n" +
+      "  <shardHandlerFactory name=\"shardHandlerFactory\" class=\"HttpShardHandlerFactory\">\n" +
+      "    <str name=\"urlScheme\">${urlScheme:}</str>\n" +
+      "    <int name=\"socketTimeout\">${socketTimeout:90000}</int>\n" +
+      "    <int name=\"connTimeout\">${connTimeout:15000}</int>\n" +
+      "  </shardHandlerFactory>\n" +
+      "\n" +
+      "  <solrcloud>\n" +
+      "    <str name=\"host\">127.0.0.1</str>\n" +
+      "    <int name=\"hostPort\">${hostPort:8983}</int>\n" +
+      "    <str name=\"hostContext\">${hostContext:solr}</str>\n" +
+      "    <int name=\"zkClientTimeout\">${solr.zkclienttimeout:30000}</int>\n" +
+      "    <bool name=\"genericCoreNodeNames\">${genericCoreNodeNames:true}</bool>\n" +
+      "    <int name=\"leaderVoteWait\">10000</int>\n" +
+      "    <int name=\"distribUpdateConnTimeout\">${distribUpdateConnTimeout:45000}</int>\n" +
+      "    <int name=\"distribUpdateSoTimeout\">${distribUpdateSoTimeout:340000}</int>\n" +
+      "  </solrcloud>\n" +
+      "  \n" +
+      "  <backup>\n" +
+      "    <repository name=\"trackingBackupRepo\" class=\"org.apache.solr.core.TrackingBackupRepository\"> \n" +
+      "      <str name=\"delegateRepoName\">localfs</str>\n" +
+      "    </repository>\n" +
+      "    <repository name=\"localfs\" class=\"org.apache.solr.core.backup.repository.LocalFileSystemRepository\"> \n" +
+      "    </repository>\n" +
+      "  </backup>\n" +
+      "  \n" +
+      "</solr>\n";
+
   private static String backupLocation;
 
   @BeforeClass
@@ -36,6 +67,7 @@
     configureCluster(NUM_SHARDS)// nodes
         .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
         .addConfig("confFaulty", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .withSolrXml(SOLR_XML)
         .configure();
     cluster.getZkClient().delete(ZkConfigManager.CONFIGS_ZKNODE + Path.SEPARATOR + "confFaulty" + Path.SEPARATOR + "solrconfig.xml", -1, true);
 
@@ -53,11 +85,6 @@
   }
 
   @Override
-  public String getBackupRepoName() {
-    return null;
-  }
-
-  @Override
   public String getBackupLocation() {
     return backupLocation;
   }
diff --git a/solr/core/src/test/org/apache/solr/core/backup/FileGenerationTest.java b/solr/core/src/test/org/apache/solr/core/backup/FileGenerationTest.java
new file mode 100644
index 0000000..0401f85
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/core/backup/FileGenerationTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core.backup;
+
+import java.util.Optional;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class FileGenerationTest {
+
+  @Test
+  public void test() {
+    FileGeneration fileGen = FileGeneration.zeroGen("backup.properties");
+    assertEquals("backup.properties", fileGen.getFileName());
+    fileGen = fileGen.nextGen();
+    assertEquals("backup-1.properties", fileGen.getFileName());
+    fileGen = fileGen.nextGen();
+    assertEquals("backup-2.properties", fileGen.getFileName());
+
+    fileGen = FileGeneration.findMostRecent("backup.properties",
+        new String[] {"aaa", "baa.properties", "backup.properties",
+            "backup-1.properties", "backup-2.properties", "backup-neqewq.properties", "backup999.properties"}).get();
+    assertEquals("backup-2.properties", fileGen.getFileName());
+    fileGen = fileGen.nextGen();
+    assertEquals("backup-3.properties", fileGen.getFileName());
+
+    Optional<FileGeneration> op = FileGeneration.findMostRecent("backup.properties",
+        new String[0]);
+    assertTrue(op.isEmpty());
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 7f3cbd4..c461b37 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1008,6 +1008,7 @@
     protected String location;
     protected Optional<String> commitName = Optional.empty();
     protected Optional<String> indexBackupStrategy = Optional.empty();
+    protected boolean incremental = false;
 
     public Backup(String collection, String name) {
       super(CollectionAction.BACKUP, collection);
@@ -1051,6 +1052,11 @@
       return this;
     }
 
+    public Backup setIncremental(boolean incremental) {
+      this.incremental = incremental;
+      return this;
+    }
+
     @Override
     public SolrParams getParams() {
       ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
@@ -1066,6 +1072,7 @@
       if (indexBackupStrategy.isPresent()) {
         params.set(CollectionAdminParams.INDEX_BACKUP_STRATEGY, indexBackupStrategy.get());
       }
+      params.set(CoreAdminParams.BACKUP_INCREMENTAL, incremental);
       return params;
     }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
index 9103450..736a406 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
@@ -121,6 +121,11 @@
   public static final String BACKUP_LOCATION = "location";
 
   /**
+   * A parameter to specify whether incremental backup is used
+   */
+  public static final String BACKUP_INCREMENTAL = "incremental";
+
+  /**
    * A parameter to specify the name of the commit to be stored during the backup operation.
    */
   public static final String COMMIT_NAME = "commitName";
diff --git a/solr/test-framework/src/java/org/apache/solr/core/TrackingBackupRepository.java b/solr/test-framework/src/java/org/apache/solr/core/TrackingBackupRepository.java
new file mode 100644
index 0000000..376baee
--- /dev/null
+++ b/solr/test-framework/src/java/org/apache/solr/core/TrackingBackupRepository.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.core.backup.repository.BackupRepositoryFactory;
+
+public class TrackingBackupRepository implements BackupRepository {
+  private static final List<URI> COPIED_FILES = Collections.synchronizedList(new ArrayList<>());
+
+  private BackupRepository delegate;
+
+  @Override
+  public <T> T getConfigProperty(String name) {
+    return delegate.getConfigProperty(name);
+  }
+
+  @Override
+  public URI createURI(String path) {
+    return delegate.createURI(path);
+  }
+
+  @Override
+  public URI resolve(URI baseUri, String... pathComponents) {
+    return delegate.resolve(baseUri, pathComponents);
+  }
+
+  @Override
+  public boolean exists(URI path) throws IOException {
+    return delegate.exists(path);
+  }
+
+  @Override
+  public PathType getPathType(URI path) throws IOException {
+    return delegate.getPathType(path);
+  }
+
+  @Override
+  public String[] listAll(URI path) throws IOException {
+    return delegate.listAll(path);
+  }
+
+  @Override
+  public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) throws IOException {
+    return delegate.openInput(dirPath, fileName, ctx);
+  }
+
+  @Override
+  public OutputStream createOutput(URI path) throws IOException {
+    return delegate.createOutput(path);
+  }
+
+  @Override
+  public void createDirectory(URI path) throws IOException {
+    delegate.createDirectory(path);
+  }
+
+  @Override
+  public void deleteDirectory(URI path) throws IOException {
+    delegate.deleteDirectory(path);
+  }
+
+  @Override
+  public void copyFileFrom(Directory sourceDir, String fileName, URI dest) throws IOException {
+    COPIED_FILES.add(delegate.resolve(dest, fileName));
+    delegate.copyFileFrom(sourceDir, fileName, dest);
+  }
+
+  @Override
+  public void copyFileTo(URI sourceRepo, String fileName, Directory dest) throws IOException {
+    delegate.copyFileTo(sourceRepo, fileName, dest);
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+
+  @Override
+  public void delete(URI path, Collection<String> files) throws IOException {
+    delegate.delete(path, files);
+  }
+
+  @Override
+  public Checksum checksum(URI path, String fileName) throws IOException {
+    return delegate.checksum(path, fileName);
+  }
+
+  @Override
+  public Checksum checksum(Directory dir, String fileName) throws IOException {
+    return delegate.checksum(dir, fileName);
+  }
+
+  @Override
+  public void init(NamedList args) {
+    BackupRepositoryFactory factory = (BackupRepositoryFactory) args.get("factory");
+    SolrResourceLoader loader = (SolrResourceLoader) args.get("loader");
+    String repoName = (String) args.get("delegateRepoName");
+
+    this.delegate = factory.newInstance(loader, repoName);
+  }
+
+  /**
+   * @return list of files were copied by using {@link #copyFileFrom(Directory, String, URI)}
+   */
+  public static List<URI> copiedFiles() {
+    return new ArrayList<>(COPIED_FILES);
+  }
+
+  /**
+   * Clear all tracking data
+   */
+  public static void clear() {
+    COPIED_FILES.clear();
+  }
+
+}