SOLR-10391: Add overwrite option to UPLOAD ConfigSet action (#1861)

When set to true, Solr will overwrite an existing configset in ZooKeeper in an UPLOAD.
A new cleanup parameter can also be passed to let Solr know what to do with the files that existed in the old configset, but no longer exist in the new configset (remove or keep)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0b50271..f4909e3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -160,6 +160,8 @@
 
 * SOLR-14875: Make SolrEventListeners load from packages (noble)
 
+* SOLR-10391: ConfigSet handler now supports overrides on existing configsets. (Tomás Fernández Löbbe)
+
 Improvements
 ---------------------
 
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ConfigSetsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/ConfigSetsHandler.java
index 728bf9f..bd06069 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ConfigSetsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ConfigSetsHandler.java
@@ -20,9 +20,12 @@
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -38,6 +41,7 @@
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkMaintenanceUtils;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.params.ConfigSetParams;
 import org.apache.solr.common.params.ConfigSetParams.ConfigSetAction;
@@ -154,7 +158,9 @@
     SolrZkClient zkClient = coreContainer.getZkController().getZkClient();
     String configPathInZk = ZkConfigManager.CONFIGS_ZKNODE + "/" + configSetName;
 
-    if (zkClient.exists(configPathInZk, true)) {
+    boolean overwritesExisting = zkClient.exists(configPathInZk, true);
+
+    if (overwritesExisting && !req.getParams().getBool(ConfigSetParams.OVERWRITE, false)) {
       throw new SolrException(ErrorCode.BAD_REQUEST,
           "The configuration " + configSetName + " already exists in zookeeper");
     }
@@ -169,25 +175,99 @@
     InputStream inputStream = contentStreamsIterator.next().getStream();
 
     // Create a node for the configuration in zookeeper
-    boolean trusted = getTrusted(req);
-    zkClient.makePath(configPathInZk, ("{\"trusted\": " + Boolean.toString(trusted) + "}").
-        getBytes(StandardCharsets.UTF_8), true);
+    boolean cleanup = req.getParams().getBool(ConfigSetParams.CLEANUP, false);
+    
+    Set<String> filesToDelete;
+    if (overwritesExisting && cleanup) {
+      filesToDelete = getAllConfigsetFiles(zkClient, configPathInZk);
+    } else {
+      filesToDelete = Collections.emptySet();
+    }
+    createBaseZnode(zkClient, overwritesExisting, isTrusted(req), cleanup, configPathInZk);
 
     ZipInputStream zis = new ZipInputStream(inputStream, StandardCharsets.UTF_8);
     ZipEntry zipEntry = null;
     while ((zipEntry = zis.getNextEntry()) != null) {
       String filePathInZk = configPathInZk + "/" + zipEntry.getName();
+      if (filePathInZk.endsWith("/")) {
+        filesToDelete.remove(filePathInZk.substring(0, filePathInZk.length() -1));
+      } else {
+        filesToDelete.remove(filePathInZk);
+      }
       if (zipEntry.isDirectory()) {
-        zkClient.makePath(filePathInZk, true);
+        zkClient.makePath(filePathInZk, false,  true);
       } else {
         createZkNodeIfNotExistsAndSetData(zkClient, filePathInZk,
             IOUtils.toByteArray(zis));
       }
     }
     zis.close();
+    deleteUnusedFiles(zkClient, filesToDelete);
   }
 
-  boolean getTrusted(SolrQueryRequest req) {
+  private void createBaseZnode(SolrZkClient zkClient, boolean overwritesExisting, boolean requestIsTrusted, boolean cleanup, String configPathInZk) throws KeeperException, InterruptedException {
+    byte[] baseZnodeData =  ("{\"trusted\": " + Boolean.toString(requestIsTrusted) + "}").getBytes(StandardCharsets.UTF_8);
+
+    if (overwritesExisting) {
+      if (cleanup && requestIsTrusted) {
+        zkClient.setData(configPathInZk, baseZnodeData, true);
+      } else if (!requestIsTrusted) {
+        ensureOverwritingUntrustedConfigSet(zkClient, configPathInZk);
+      }
+    } else {
+      zkClient.makePath(configPathInZk, baseZnodeData, true);
+    }
+  }
+
+  private void deleteUnusedFiles(SolrZkClient zkClient, Set<String> filesToDelete) throws InterruptedException, KeeperException {
+    if (!filesToDelete.isEmpty()) {
+      if (log.isInfoEnabled()) {
+        log.info("Cleaning up {} unused files", filesToDelete.size());
+      }
+      if (log.isDebugEnabled()) {
+        log.debug("Cleaning up unused files: {}", filesToDelete);
+      }
+      for (String f:filesToDelete) {
+        try {
+          zkClient.delete(f, -1, true);
+        } catch (KeeperException.NoNodeException nne) {
+        }
+      }
+    }
+  }
+
+  private Set<String> getAllConfigsetFiles(SolrZkClient zkClient, String configPathInZk) throws KeeperException, InterruptedException {
+    final Set<String> files = new HashSet<>();
+    if (!configPathInZk.startsWith(ZkConfigManager.CONFIGS_ZKNODE + "/")) {
+      throw new IllegalArgumentException("\"" + configPathInZk + "\" not recognized as a configset path");
+    }
+    ZkMaintenanceUtils.traverseZkTree(zkClient, configPathInZk, ZkMaintenanceUtils.VISIT_ORDER.VISIT_POST, files::add);
+    files.remove(configPathInZk);
+    return files;
+  }
+
+  /*
+   * Fail if an untrusted request tries to update a trusted ConfigSet
+   */
+  private void ensureOverwritingUntrustedConfigSet(SolrZkClient zkClient, String configSetZkPath) {
+    byte[] configSetNodeContent;
+    try {
+      configSetNodeContent = zkClient.getData(configSetZkPath, null, null, true);
+    } catch (KeeperException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Exception while fetching current configSet at " + configSetZkPath, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted while fetching current configSet at " + configSetZkPath, e);
+    }
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> contentMap = (Map<Object, Object>) Utils.fromJSON(configSetNodeContent);
+    boolean isCurrentlyTrusted = (boolean) contentMap.getOrDefault("trusted", true);
+    if (isCurrentlyTrusted) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Trying to make an unstrusted ConfigSet update on a trusted configSet");
+    }
+  }
+
+  boolean isTrusted(SolrQueryRequest req) {
     AuthenticationPlugin authcPlugin = coreContainer.getAuthenticationPlugin();
     if (log.isInfoEnabled()) {
       log.info("Trying to upload a configset. authcPlugin: {}, user principal: {}",
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestConfigSetsAPI.java b/solr/core/src/test/org/apache/solr/cloud/TestConfigSetsAPI.java
index 2f89df1..6fea2cb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestConfigSetsAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestConfigSetsAPI.java
@@ -32,7 +32,6 @@
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.security.Principal;
 import java.util.Arrays;
 import java.util.Collection;
@@ -40,6 +39,8 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -87,7 +88,9 @@
 import org.apache.solr.util.ExternalPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.apache.zookeeper.data.Stat;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -95,6 +98,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.core.ConfigSetProperties.DEFAULT_FILENAME;
 import static org.junit.matchers.JUnitMatchers.containsString;
@@ -122,6 +126,7 @@
   }
 
   @Override
+  @After
   public void tearDown() throws Exception {
     cluster.deleteAllCollections();
     cluster.deleteAllConfigSets();
@@ -185,7 +190,7 @@
     FileUtils.copyDirectory(configDir, tmpConfigDir);
     if (oldProps != null) {
       FileUtils.write(new File(tmpConfigDir, ConfigSetProperties.DEFAULT_FILENAME),
-          getConfigSetProps(oldProps), StandardCharsets.UTF_8);
+          getConfigSetProps(oldProps), UTF_8);
     }
     zkConfigManager.uploadConfigDir(tmpConfigDir.toPath(), baseConfigSetName);
   }
@@ -231,7 +236,7 @@
     }
 
     if (oldPropsData != null) {
-      InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(oldPropsData), StandardCharsets.UTF_8);
+      InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(oldPropsData), UTF_8);
       try {
         return ConfigSetProperties.readFromInputStream(reader);
       } finally {
@@ -309,9 +314,9 @@
     // Create dummy config files in zookeeper
     zkClient.makePath("/configs/myconf", true);
     zkClient.create("/configs/myconf/firstDummyFile",
-        "first dummy content".getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, true);
+        "first dummy content".getBytes(UTF_8), CreateMode.PERSISTENT, true);
     zkClient.create("/configs/myconf/anotherDummyFile",
-        "second dummy content".getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, true);
+        "second dummy content".getBytes(UTF_8), CreateMode.PERSISTENT, true);
 
     // Checking error when configuration name specified already exists
     ignoreException("already exists");
@@ -353,6 +358,132 @@
   }
 
   @Test
+  public void testOverwrite() throws Exception {
+    String configsetName = "regular";
+    String configsetSuffix = "testOverwrite-1";
+    uploadConfigSetWithAssertions(configsetName, configsetSuffix, null);
+    try (SolrZkClient zkClient = new SolrZkClient(cluster.getZkServer().getZkAddress(),
+            AbstractZkTestCase.TIMEOUT, 45000, null)) {
+      int solrconfigZkVersion = getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml");
+      ignoreException("The configuration regulartestOverwrite-1 already exists in zookeeper");
+      assertEquals("Can't overwrite an existing configset unless the overwrite parameter is set",
+              400, uploadConfigSet(configsetName, configsetSuffix, null, zkClient, false, false));
+      unIgnoreException("The configuration regulartestOverwrite-1 already exists in zookeeper");
+      assertEquals("Expecting version to remain equal",
+              solrconfigZkVersion, getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+      assertEquals(0, uploadConfigSet(configsetName, configsetSuffix, null, zkClient, true, false));
+      assertTrue("Expecting version bump",
+              solrconfigZkVersion < getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+    }
+
+  }
+
+  @Test
+  public void testOverwriteWithCleanup() throws Exception {
+    String configsetName = "regular";
+    String configsetSuffix = "testOverwriteWithCleanup-1";
+    uploadConfigSetWithAssertions(configsetName, configsetSuffix, null);
+    try (SolrZkClient zkClient = new SolrZkClient(cluster.getZkServer().getZkAddress(),
+            AbstractZkTestCase.TIMEOUT, 45000, null)) {
+      List<String> extraFiles = Arrays.asList(
+              "/configs/regulartestOverwriteWithCleanup-1/foo1",
+              "/configs/regulartestOverwriteWithCleanup-1/foo2",
+              "/configs/regulartestOverwriteWithCleanup-1/foo2/1",
+              "/configs/regulartestOverwriteWithCleanup-1/foo2/2");
+      for (String f : extraFiles) {
+        zkClient.makePath(f, true);
+      }
+      assertEquals(0, uploadConfigSet(configsetName, configsetSuffix, null, zkClient, true, false));
+      for (String f : extraFiles) {
+        assertTrue("Expecting file " + f + " to exist in ConfigSet but it's gone", zkClient.exists(f, true));
+      }
+      assertEquals(0, uploadConfigSet(configsetName, configsetSuffix, null, zkClient, true, true));
+      for (String f : extraFiles) {
+        assertFalse("Expecting file " + f + " to be deleted from ConfigSet but it wasn't", zkClient.exists(f, true));
+      }
+      assertConfigsetFiles(configsetName, configsetSuffix, zkClient);
+    }
+  }
+
+  @Test
+  public void testOverwriteWithTrust() throws Exception {
+    String configsetName = "regular";
+    String configsetSuffix = "testOverwriteWithTrust-1";
+    uploadConfigSetWithAssertions(configsetName, configsetSuffix, null);
+    try (SolrZkClient zkClient = new SolrZkClient(cluster.getZkServer().getZkAddress(),
+            AbstractZkTestCase.TIMEOUT, 45000, null)) {
+      assertFalse(isTrusted(zkClient, configsetName, configsetSuffix));
+      int solrconfigZkVersion = getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml");
+      // Was untrusted, overwrite with untrusted
+      assertEquals(0, uploadConfigSet(configsetName, configsetSuffix, null, zkClient, true, false));
+      assertTrue("Expecting version bump",
+              solrconfigZkVersion < getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+      assertFalse(isTrusted(zkClient, configsetName, configsetSuffix));
+      solrconfigZkVersion = getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml");
+
+      // Was untrusted, overwrite with trusted but no cleanup
+      assertEquals(0, uploadConfigSet(configsetName, configsetSuffix, "solr", zkClient, true, false));
+      assertTrue("Expecting version bump",
+              solrconfigZkVersion < getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+      assertFalse(isTrusted(zkClient, configsetName, configsetSuffix));
+      solrconfigZkVersion = getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml");
+
+      // Was untrusted, overwrite with trusted with cleanup
+      assertEquals(0, uploadConfigSet(configsetName, configsetSuffix, "solr", zkClient, true, true));
+      assertTrue("Expecting version bump",
+              solrconfigZkVersion < getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+      assertTrue(isTrusted(zkClient, configsetName, configsetSuffix));
+      solrconfigZkVersion = getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml");
+
+      // Was trusted, try to overwrite with untrusted with no cleanup
+      ignoreException("Trying to make an unstrusted ConfigSet update on a trusted configSet");
+      assertEquals("Can't upload a trusted configset with an untrusted request",
+              400, uploadConfigSet(configsetName, configsetSuffix, null, zkClient, true, false));
+      assertEquals("Expecting version to remain equal",
+              solrconfigZkVersion, getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+      assertTrue(isTrusted(zkClient, configsetName, configsetSuffix));
+
+      // Was trusted, try to overwrite with untrusted with cleanup
+      ignoreException("Trying to make an unstrusted ConfigSet update on a trusted configSet");
+      assertEquals("Can't upload a trusted configset with an untrusted request",
+              400, uploadConfigSet(configsetName, configsetSuffix, null, zkClient, true, true));
+      assertEquals("Expecting version to remain equal",
+              solrconfigZkVersion, getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+      assertTrue(isTrusted(zkClient, configsetName, configsetSuffix));
+      unIgnoreException("Trying to make an unstrusted ConfigSet update on a trusted configSet");
+
+      // Was trusted, overwrite with trusted no cleanup
+      assertEquals(0, uploadConfigSet(configsetName, configsetSuffix, "solr", zkClient, true, false));
+      assertTrue("Expecting version bump",
+              solrconfigZkVersion < getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+      assertTrue(isTrusted(zkClient, configsetName, configsetSuffix));
+      solrconfigZkVersion = getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml");
+
+      // Was trusted, overwrite with trusted with cleanup
+      assertEquals(0, uploadConfigSet(configsetName, configsetSuffix, "solr", zkClient, true, true));
+      assertTrue("Expecting version bump",
+              solrconfigZkVersion < getConfigZNodeVersion(zkClient, configsetName, configsetSuffix, "solrconfig.xml"));
+      assertTrue(isTrusted(zkClient, configsetName, configsetSuffix));
+    }
+
+  }
+
+  private boolean isTrusted(SolrZkClient zkClient, String configsetName, String configsetSuffix) throws KeeperException, InterruptedException {
+    String configSetZkPath = String.format(Locale.ROOT,"/configs/%s%s", configsetName, configsetSuffix);
+    byte[] configSetNodeContent = zkClient.getData(configSetZkPath, null, null, true);;
+
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> contentMap = (Map<Object, Object>) Utils.fromJSON(configSetNodeContent);
+    return (boolean) contentMap.getOrDefault("trusted", true);
+  }
+
+  private int getConfigZNodeVersion(SolrZkClient zkClient, String configsetName, String configsetSuffix, String configFile) throws KeeperException, InterruptedException {
+    Stat stat = new Stat();
+    zkClient.getData(String.format(Locale.ROOT, "/configs/%s%s/%s", configsetName, configsetSuffix, configFile), null, stat, true);
+    return stat.getVersion();
+  }
+
+  @Test
   public void testUpload() throws Exception {
     String suffix = "-untrusted";
     uploadConfigSetWithAssertions("regular", suffix, null);
@@ -431,38 +562,44 @@
     try {
       long statusCode = uploadConfigSet(configSetName, suffix, username, zkClient);
       assertEquals(0l, statusCode);
-
-      assertTrue("managed-schema file should have been uploaded",
-          zkClient.exists("/configs/"+configSetName+suffix+"/managed-schema", true));
-      assertTrue("managed-schema file contents on zookeeper are not exactly same as that of the file uploaded in config",
-          Arrays.equals(zkClient.getData("/configs/"+configSetName+suffix+"/managed-schema", null, null, true),
-              readFile("solr/configsets/upload/"+configSetName+"/managed-schema")));
-
-      assertTrue("solrconfig.xml file should have been uploaded",
-          zkClient.exists("/configs/"+configSetName+suffix+"/solrconfig.xml", true));
-      byte data[] = zkClient.getData("/configs/"+configSetName+suffix, null, null, true);
-      //assertEquals("{\"trusted\": false}", new String(data, StandardCharsets.UTF_8));
-      assertTrue("solrconfig.xml file contents on zookeeper are not exactly same as that of the file uploaded in config",
-          Arrays.equals(zkClient.getData("/configs/"+configSetName+suffix+"/solrconfig.xml", null, null, true),
-              readFile("solr/configsets/upload/"+configSetName+"/solrconfig.xml")));
+      assertConfigsetFiles(configSetName, suffix, zkClient);
     } finally {
       zkClient.close();
     }
   }
+  private void assertConfigsetFiles(String configSetName, String suffix, SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
+    assertTrue("managed-schema file should have been uploaded",
+        zkClient.exists("/configs/"+configSetName+suffix+"/managed-schema", true));
+    assertTrue("managed-schema file contents on zookeeper are not exactly same as that of the file uploaded in config",
+        Arrays.equals(zkClient.getData("/configs/"+configSetName+suffix+"/managed-schema", null, null, true),
+            readFile("solr/configsets/upload/"+configSetName+"/managed-schema")));
+
+    assertTrue("solrconfig.xml file should have been uploaded",
+        zkClient.exists("/configs/"+configSetName+suffix+"/solrconfig.xml", true));
+    byte data[] = zkClient.getData("/configs/"+configSetName+suffix, null, null, true);
+    //assertEquals("{\"trusted\": false}", new String(data, StandardCharsets.UTF_8));
+    assertTrue("solrconfig.xml file contents on zookeeper are not exactly same as that of the file uploaded in config",
+        Arrays.equals(zkClient.getData("/configs/"+configSetName+suffix+"/solrconfig.xml", null, null, true),
+            readFile("solr/configsets/upload/"+configSetName+"/solrconfig.xml")));
+  }
 
   private long uploadConfigSet(String configSetName, String suffix, String username,
-      SolrZkClient zkClient) throws IOException {
+                               SolrZkClient zkClient) throws IOException {
+    ZkConfigManager configManager = new ZkConfigManager(zkClient);
+    assertFalse(configManager.configExists(configSetName + suffix));
+    return uploadConfigSet(configSetName, suffix, username, zkClient, false, false);
+  }
+
+  private long uploadConfigSet(String configSetName, String suffix, String username,
+      SolrZkClient zkClient, boolean overwrite, boolean cleanup) throws IOException {
     // Read zipped sample config
     ByteBuffer sampleZippedConfig = TestSolrConfigHandler
         .getFileContent(
             createTempZipFile("solr/configsets/upload/"+configSetName), false);
 
-    ZkConfigManager configManager = new ZkConfigManager(zkClient);
-    assertFalse(configManager.configExists(configSetName+suffix));
-
     @SuppressWarnings({"rawtypes"})
     Map map = postDataAndGetResponse(cluster.getSolrClient(),
-        cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/admin/configs?action=UPLOAD&name="+configSetName+suffix,
+        cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/admin/configs?action=UPLOAD&name="+configSetName+suffix + (overwrite? "&overwrite=true" : "") + (cleanup? "&cleanup=true" : ""),
         sampleZippedConfig, username);
     assertNotNull(map);
     long statusCode = (long) getObjectByPath(map, false, Arrays.asList("responseHeader", "status"));
@@ -581,7 +718,7 @@
       entity = cloudClient.getLbClient().getHttpClient().execute(httpPost)
           .getEntity();
       try {
-        response = EntityUtils.toString(entity, StandardCharsets.UTF_8);
+        response = EntityUtils.toString(entity, UTF_8);
         m = (Map) Utils.fromJSONString(response);
       } catch (JSONParser.ParseException e) {
         System.err.println("err response: " + response);
@@ -634,7 +771,7 @@
     // Ensure ConfigSet is immutable
     FileUtils.copyDirectory(configDir, tmpConfigDir);
     FileUtils.write(new File(tmpConfigDir, "configsetprops.json"),
-        getConfigSetProps(ImmutableMap.<String, String>of("immutable", "true")), StandardCharsets.UTF_8);
+        getConfigSetProps(ImmutableMap.<String, String>of("immutable", "true")), UTF_8);
     zkConfigManager.uploadConfigDir(tmpConfigDir.toPath(), "configSet");
 
     // no ConfigSet name
@@ -747,7 +884,7 @@
   }
 
   private StringBuilder getConfigSetProps(Map<String, String> map) {
-    return new StringBuilder(new String(Utils.toJSON(map), StandardCharsets.UTF_8));
+    return new StringBuilder(new String(Utils.toJSON(map), UTF_8));
   }
 
   public static class CreateNoErrorChecking extends ConfigSetAdminRequest.Create {
diff --git a/solr/solr-ref-guide/src/configsets-api.adoc b/solr/solr-ref-guide/src/configsets-api.adoc
index 9b0cf26..3110189 100644
--- a/solr/solr-ref-guide/src/configsets-api.adoc
+++ b/solr/solr-ref-guide/src/configsets-api.adoc
@@ -92,11 +92,17 @@
 
 If you use any of these parameters or features, you must have enabled security features in your Solr installation and you must upload the configset as an authenticated user.
 
-The `upload` command takes one parameter:
+The `upload` command takes the following parameters:
 
 name::
 The configset to be created when the upload is complete. This parameter is required.
 
+overwrite::
+If set to `true`, Solr will overwrite an existing configset with the same name (if false, the request will fail). Default is `false`.
+
+cleanup::
+When overwriting an existing configset (`overwrite=true`), this parameter tells Solr to delete the files in ZooKeeper that existed in the old configset but not in the one being uploaded. Default is `false`.
+
 The body of the request should be a zip file that contains the configset. The zip file must be created from within the `conf` directory (i.e., `solrconfig.xml` must be the top level entry in the zip file).
 
 Here is an example on how to create the zip file named "myconfig.zip" and upload it as a configset named "myConfigSet":
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/ConfigSetParams.java b/solr/solrj/src/java/org/apache/solr/common/params/ConfigSetParams.java
index fde7e57..0443ea7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/ConfigSetParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/ConfigSetParams.java
@@ -24,6 +24,8 @@
 public interface ConfigSetParams
 {
   public final static String ACTION = "action";
+  public final static String OVERWRITE = "overwrite";
+  public final static String CLEANUP = "cleanup";
 
   public enum ConfigSetAction {
     CREATE,
diff --git a/solr/solrj/src/test/org/apache/solr/common/util/TestZkMaintenanceUtils.java b/solr/solrj/src/test/org/apache/solr/common/util/TestZkMaintenanceUtils.java
index 94e0465..d914382 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/TestZkMaintenanceUtils.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/TestZkMaintenanceUtils.java
@@ -16,11 +16,47 @@
  */
 package org.apache.solr.common.util;
 
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkMaintenanceUtils;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestZkMaintenanceUtils extends SolrTestCaseJ4 {
+
+  protected static ZkTestServer zkServer;
+  private static Path zkDir;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    zkDir = createTempDir("TestZkMaintenanceUtils");
+    zkServer = new ZkTestServer(zkDir);
+    zkServer.run();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException, InterruptedException {
+
+    if (zkServer != null) {
+      zkServer.shutdown();
+      zkServer = null;
+    }
+    if (null != zkDir) {
+      FileUtils.deleteDirectory(zkDir.toFile());
+      zkDir = null;
+    }
+  }
+
   @Test
   public void testPaths() {
     assertEquals("Unexpected path construction"
@@ -52,4 +88,28 @@
         , ZkMaintenanceUtils.getZkParent("/leadingslashonly"));
 
   }
+
+  @Test
+  public void testTraverseZkTree() throws Exception {
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+      zkClient.makePath("/testTraverseZkTree/1/1", true, true);
+      zkClient.makePath("/testTraverseZkTree/1/2", false, true);
+      zkClient.makePath("/testTraverseZkTree/2", false, true);
+      assertEquals(Arrays.asList("/testTraverseZkTree", "/testTraverseZkTree/1", "/testTraverseZkTree/1/1", "/testTraverseZkTree/1/2", "/testTraverseZkTree/2"), getTraverseedZNodes(zkClient, "/testTraverseZkTree", ZkMaintenanceUtils.VISIT_ORDER.VISIT_PRE));
+      assertEquals(Arrays.asList("/testTraverseZkTree/1/1", "/testTraverseZkTree/1/2", "/testTraverseZkTree/1", "/testTraverseZkTree/2", "/testTraverseZkTree"), getTraverseedZNodes(zkClient, "/testTraverseZkTree", ZkMaintenanceUtils.VISIT_ORDER.VISIT_POST));
+
+    }
+  }
+
+  private List<String> getTraverseedZNodes(SolrZkClient zkClient, String path, ZkMaintenanceUtils.VISIT_ORDER visitOrder) throws KeeperException, InterruptedException {
+    List<String> result = new ArrayList<>();
+    ZkMaintenanceUtils.traverseZkTree(zkClient, path, visitOrder, new ZkMaintenanceUtils.ZkVisitor() {
+
+      @Override
+      public void visit(String path) throws InterruptedException, KeeperException {
+        result.add(path);
+      }
+    });
+    return result;
+  }
 }