YARN-10002. Code cleanup and improvements in ConfigurationStoreBaseTest. Contributed by Benjamin Teke
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
index eeb38d3..97d9933 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -337,6 +338,12 @@
   }
 
   @Override
+  protected LinkedList<LogMutation> getLogs() {
+    // Unimplemented.
+    return null;
+  }
+
+  @Override
   protected Version getConfStoreVersion() throws Exception {
     return null;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index d031ea9..dc2c724 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -87,6 +88,12 @@
   }
 
   @Override
+  protected LinkedList<LogMutation> getLogs() {
+    // Unimplemented.
+    return null;
+  }
+
+  @Override
   public Version getConfStoreVersion() throws Exception {
     // Does nothing.
     return null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index b43bcb8..f90bdea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -333,6 +333,7 @@
   }
 
   @VisibleForTesting
+  @Override
   protected LinkedList<LogMutation> getLogs() throws Exception {
     return deserLogMutations(db.get(bytes(LOG_KEY)));
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index e8e3ecf..8ab3e44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -27,6 +27,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -156,6 +157,13 @@
   protected abstract Version getConfStoreVersion() throws Exception;
 
   /**
+   * Get a list of configuration mutations.
+   * @return list of configuration mutations.
+   * @throws Exception On mutation fetch failure
+   */
+  protected abstract LinkedList<LogMutation> getLogs() throws Exception;
+
+  /**
    * Persist the hard-coded schema version to the conf store.
    * @throws Exception On storage failure
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
index 0fa48b4..fae4929 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
@@ -119,6 +119,7 @@
   }
 
   @VisibleForTesting
+  @Override
   protected LinkedList<LogMutation> getLogs() throws Exception {
     return (LinkedList<LogMutation>)
         deserializeObject(zkManager.getData(logsPath));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
index 4b3153a..3a8c362 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -34,16 +34,13 @@
  * Base class for {@link YarnConfigurationStore} implementations.
  */
 public abstract class ConfigurationStoreBaseTest {
+  static final String TEST_USER = "testUser";
+  YarnConfigurationStore confStore = createConfStore();
+  Configuration conf;
+  Configuration schedConf;
+  RMContext rmContext;
 
-  protected YarnConfigurationStore confStore = createConfStore();
-
-  protected abstract YarnConfigurationStore createConfStore();
-
-  protected Configuration conf;
-  protected Configuration schedConf;
-  protected RMContext rmContext;
-
-  protected static final String TEST_USER = "testUser";
+  abstract YarnConfigurationStore createConfStore();
 
   @Before
   public void setUp() throws Exception {
@@ -59,20 +56,12 @@
     confStore.initialize(conf, schedConf, rmContext);
     assertEquals("val1", confStore.retrieve().get("key1"));
 
-    Map<String, String> update1 = new HashMap<>();
-    update1.put("keyUpdate1", "valUpdate1");
-    YarnConfigurationStore.LogMutation mutation1 =
-        new YarnConfigurationStore.LogMutation(update1, TEST_USER);
-    confStore.logMutation(mutation1);
-    confStore.confirmMutation(mutation1, true);
+    confStore.confirmMutation(prepareLogMutation("keyUpdate1", "valUpdate1"),
+        true);
     assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
 
-    Map<String, String> update2 = new HashMap<>();
-    update2.put("keyUpdate2", "valUpdate2");
-    YarnConfigurationStore.LogMutation mutation2 =
-        new YarnConfigurationStore.LogMutation(update2, TEST_USER);
-    confStore.logMutation(mutation2);
-    confStore.confirmMutation(mutation2, false);
+    confStore.confirmMutation(prepareLogMutation("keyUpdate2", "valUpdate2"),
+        false);
     assertNull("Configuration should not be updated",
         confStore.retrieve().get("keyUpdate2"));
     confStore.close();
@@ -84,13 +73,20 @@
     confStore.initialize(conf, schedConf, rmContext);
     assertEquals("val", confStore.retrieve().get("key"));
 
+    confStore.confirmMutation(prepareLogMutation("key", null), true);
+    assertNull(confStore.retrieve().get("key"));
+    confStore.close();
+  }
+
+  YarnConfigurationStore.LogMutation prepareLogMutation(String key,
+                                                        String value)
+      throws Exception {
     Map<String, String> update = new HashMap<>();
-    update.put("key", null);
+    update.put(key, value);
     YarnConfigurationStore.LogMutation mutation =
         new YarnConfigurationStore.LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(mutation, true);
-    assertNull(confStore.retrieve().get("key"));
-    confStore.close();
+
+    return mutation;
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
new file mode 100644
index 0000000..169c36d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.junit.Test;
+
+import java.util.LinkedList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Base class for the persistent {@link YarnConfigurationStore}
+ * implementations, namely {@link TestLeveldbConfigurationStore} and
+ * {@link TestZKConfigurationStore}.
+ */
+public abstract class PersistentConfigurationStoreBaseTest extends
+    ConfigurationStoreBaseTest {
+
+  abstract Version getVersion();
+
+  @Test
+  public void testGetConfigurationVersion() throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    long v1 = confStore.getConfigVersion();
+    assertEquals(1, v1);
+    confStore.confirmMutation(prepareLogMutation("keyver", "valver"), true);
+    long v2 = confStore.getConfigVersion();
+    assertEquals(2, v2);
+    confStore.close();
+  }
+
+  @Test
+  public void testPersistConfiguration() throws Exception {
+    schedConf.set("key", "val");
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+    confStore.close();
+
+    // Create a new configuration store, and check for old configuration
+    confStore = createConfStore();
+    schedConf.set("key", "badVal");
+    // Should ignore passed-in scheduler configuration.
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+    confStore.close();
+  }
+
+  @Test
+  public void testPersistUpdatedConfiguration() throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    assertNull(confStore.retrieve().get("key"));
+
+    confStore.confirmMutation(prepareLogMutation("key", "val"), true);
+    assertEquals("val", confStore.retrieve().get("key"));
+    confStore.close();
+
+    // Create a new configuration store, and check for updated configuration
+    confStore = createConfStore();
+    schedConf.set("key", "badVal");
+    // Should ignore passed-in scheduler configuration.
+    confStore.initialize(conf, schedConf, rmContext);
+    assertEquals("val", confStore.retrieve().get("key"));
+    confStore.close();
+  }
+
+
+  @Test
+  public void testVersion() throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    assertNull(confStore.getConfStoreVersion());
+    confStore.checkVersion();
+    assertEquals(getVersion(),
+        confStore.getConfStoreVersion());
+    confStore.close();
+  }
+
+  @Test
+  public void testMaxLogs() throws Exception {
+    conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
+    confStore.initialize(conf, schedConf, rmContext);
+    LinkedList<YarnConfigurationStore.LogMutation> logs = confStore.getLogs();
+    assertEquals(0, logs.size());
+
+    YarnConfigurationStore.LogMutation mutation =
+        prepareLogMutation("key1", "val1");
+    logs = confStore.getLogs();
+    assertEquals(1, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    confStore.confirmMutation(mutation, true);
+    assertEquals(1, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+
+    mutation = prepareLogMutation("key2", "val2");
+    logs = confStore.getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+    confStore.confirmMutation(mutation, true);
+    assertEquals(2, logs.size());
+    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+
+    // Next update should purge first update from logs.
+    mutation = prepareLogMutation("key3", "val3");
+    logs = confStore.getLogs();
+    assertEquals(2, logs.size());
+    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+    confStore.confirmMutation(mutation, true);
+    assertEquals(2, logs.size());
+    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+    confStore.close();
+  }
+
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
index e4ca3d3..f5da5ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -32,19 +32,22 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
+import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 
 
 /**
  * Tests {@link FSSchedulerConfigurationStore}.
  */
 public class TestFSSchedulerConfigurationStore {
+  private static final String TEST_USER = "test";
   private FSSchedulerConfigurationStore configurationStore;
   private Configuration conf;
   private File testSchedulerConfigurationDir;
@@ -90,35 +93,29 @@
     Configuration storeConf = configurationStore.retrieve();
     compareConfig(conf, storeConf);
 
-    Map<String, String> updates = new HashMap<>();
-    updates.put("a", null);
-    updates.put("b", "bb");
-
     Configuration expectConfig = new Configuration(conf);
     expectConfig.unset("a");
     expectConfig.set("b", "bb");
 
-    LogMutation logMutation = new LogMutation(updates, "test");
-    configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(logMutation, true);
+    prepareParameterizedLogMutation(configurationStore, true,
+        "a", null, "b", "bb");
     storeConf = configurationStore.retrieve();
-    assertEquals(null, storeConf.get("a"));
+    assertNull(storeConf.get("a"));
     assertEquals("bb", storeConf.get("b"));
     assertEquals("c", storeConf.get("c"));
 
     compareConfig(expectConfig, storeConf);
 
-    updates.put("b", "bbb");
-    configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(logMutation, true);
+    prepareParameterizedLogMutation(configurationStore, true,
+        "a", null, "b", "bbb");
     storeConf = configurationStore.retrieve();
-    assertEquals(null, storeConf.get("a"));
+    assertNull(storeConf.get("a"));
     assertEquals("bbb", storeConf.get("b"));
     assertEquals("c", storeConf.get("c"));
   }
 
   @Test
-  public void confirmMutationWithInValid() throws Exception {
+  public void confirmMutationWithInvalid() throws Exception {
     conf.set("a", "a");
     conf.set("b", "b");
     conf.set("c", "c");
@@ -127,13 +124,8 @@
     Configuration storeConf = configurationStore.retrieve();
     compareConfig(conf, storeConf);
 
-    Map<String, String> updates = new HashMap<>();
-    updates.put("a", null);
-    updates.put("b", "bb");
-
-    LogMutation logMutation = new LogMutation(updates, "test");
-    configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(logMutation, false);
+    prepareParameterizedLogMutation(configurationStore, false,
+        "a", null, "b", "bb");
     storeConf = configurationStore.retrieve();
 
     compareConfig(conf, storeConf);
@@ -142,7 +134,7 @@
   @Test
   public void testFileSystemClose() throws Exception {
     MiniDFSCluster hdfsCluster = null;
-    FileSystem fs = null;
+    FileSystem fs;
     Path path = new Path("/tmp/confstore");
     try {
       HdfsConfiguration hdfsConfig = new HdfsConfiguration();
@@ -164,11 +156,8 @@
       fs.close();
 
       try {
-        Map<String, String> updates = new HashMap<>();
-        updates.put("testkey", "testvalue");
-        LogMutation logMutation = new LogMutation(updates, "test");
-        configStore.logMutation(logMutation);
-        configStore.confirmMutation(logMutation, true);
+        prepareParameterizedLogMutation(configStore, true,
+            "testkey", "testvalue");
       } catch (IOException e) {
         if (e.getMessage().contains("Filesystem closed")) {
           fail("FSSchedulerConfigurationStore failed to handle " +
@@ -178,13 +167,12 @@
         }
       }
     } finally {
+      assert hdfsCluster != null;
       fs = hdfsCluster.getFileSystem();
       if (fs.exists(path)) {
         fs.delete(path, true);
       }
-      if (hdfsCluster != null) {
-        hdfsCluster.shutdown();
-      }
+      hdfsCluster.shutdown();
     }
   }
 
@@ -197,15 +185,14 @@
     Configuration storedConfig = configurationStore.retrieve();
     assertEquals("a", storedConfig.get("a"));
     configurationStore.format();
-    boolean exceptionCaught = false;
     try {
-      storedConfig = configurationStore.retrieve();
+      configurationStore.retrieve();
+      fail("Expected an IOException with message containing \"no capacity " +
+          "scheduler file in\" to be thrown");
     } catch (IOException e) {
-      if (e.getMessage().contains("no capacity scheduler file in")) {
-        exceptionCaught = true;
-      }
+      assertThat(e.getMessage(),
+          CoreMatchers.containsString("no capacity scheduler file in"));
     }
-    assertTrue(exceptionCaught);
   }
 
   @Test
@@ -243,4 +230,27 @@
           schedulerConf.get(entry.getKey()));
     }
   }
+
+  private void prepareParameterizedLogMutation(
+      FSSchedulerConfigurationStore configStore,
+      boolean validityFlag, String... values) throws Exception {
+    Map<String, String> updates = new HashMap<>();
+    String key;
+    String value;
+
+    if (values.length % 2 != 0) {
+      throw new IllegalArgumentException("The number of parameters should be " +
+          "even.");
+    }
+
+    for (int i = 1; i <= values.length; i += 2) {
+      key = values[i - 1];
+      value = values[i];
+      updates.put(key, value);
+    }
+
+    LogMutation logMutation = new LogMutation(updates, TEST_USER);
+    configStore.logMutation(logMutation);
+    configStore.confirmMutation(logMutation, validityFlag);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
index 785a910..3565ed1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
@@ -38,8 +38,6 @@
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -49,7 +47,8 @@
 /**
  * Tests {@link LeveldbConfigurationStore}.
  */
-public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
+public class TestLeveldbConfigurationStore extends
+    PersistentConfigurationStoreBaseTest {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(TestLeveldbConfigurationStore.class);
@@ -58,8 +57,6 @@
           System.getProperty("java.io.tmpdir")),
       TestLeveldbConfigurationStore.class.getName());
 
-  private ResourceManager rm;
-
   @Before
   public void setUp() throws Exception {
     super.setUp();
@@ -69,16 +66,6 @@
     conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString());
   }
 
-  @Test
-  public void testVersioning() throws Exception {
-    confStore.initialize(conf, schedConf, rmContext);
-    assertNull(confStore.getConfStoreVersion());
-    confStore.checkVersion();
-    assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
-        confStore.getConfStoreVersion());
-    confStore.close();
-  }
-
   @Test(expected = YarnConfStoreVersionIncompatibleException.class)
   public void testIncompatibleVersion() throws Exception {
     try {
@@ -97,54 +84,11 @@
   }
 
   @Test
-  public void testPersistConfiguration() throws Exception {
-    schedConf.set("key", "val");
-    confStore.initialize(conf, schedConf, rmContext);
-    assertEquals("val", confStore.retrieve().get("key"));
-    confStore.close();
-
-    // Create a new configuration store, and check for old configuration
-    confStore = createConfStore();
-    schedConf.set("key", "badVal");
-    // Should ignore passed-in scheduler configuration.
-    confStore.initialize(conf, schedConf, rmContext);
-    assertEquals("val", confStore.retrieve().get("key"));
-    confStore.close();
-  }
-
-  @Test
-  public void testPersistUpdatedConfiguration() throws Exception {
-    confStore.initialize(conf, schedConf, rmContext);
-    assertNull(confStore.retrieve().get("key"));
-
-    Map<String, String> update = new HashMap<>();
-    update.put("key", "val");
-    YarnConfigurationStore.LogMutation mutation =
-        new YarnConfigurationStore.LogMutation(update, TEST_USER);
-    confStore.logMutation(mutation);
-    confStore.confirmMutation(mutation, true);
-    assertEquals("val", confStore.retrieve().get("key"));
-    confStore.close();
-
-    // Create a new configuration store, and check for updated configuration
-    confStore = createConfStore();
-    schedConf.set("key", "badVal");
-    // Should ignore passed-in scheduler configuration.
-    confStore.initialize(conf, schedConf, rmContext);
-    assertEquals("val", confStore.retrieve().get("key"));
-    confStore.close();
-  }
-
-  @Test
   public void testDisableAuditLogs() throws Exception {
     conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
     confStore.initialize(conf, schedConf, rmContext);
 
-    Map<String, String> update = new HashMap<>();
-    update.put("key1", "val1");
-    YarnConfigurationStore.LogMutation mutation =
-        new YarnConfigurationStore.LogMutation(update, TEST_USER);
-    confStore.logMutation(mutation);
+    prepareLogMutation("key1", "val1");
 
     boolean logKeyPresent = false;
     DB db = ((LeveldbConfigurationStore) confStore).getDB();
@@ -162,55 +106,6 @@
     confStore.close();
   }
 
-  @Test
-  public void testMaxLogs() throws Exception {
-    conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
-    confStore.initialize(conf, schedConf, rmContext);
-    LinkedList<YarnConfigurationStore.LogMutation> logs =
-        ((LeveldbConfigurationStore) confStore).getLogs();
-    assertEquals(0, logs.size());
-
-    Map<String, String> update1 = new HashMap<>();
-    update1.put("key1", "val1");
-    YarnConfigurationStore.LogMutation mutation =
-        new YarnConfigurationStore.LogMutation(update1, TEST_USER);
-    confStore.logMutation(mutation);
-    logs = ((LeveldbConfigurationStore) confStore).getLogs();
-    assertEquals(1, logs.size());
-    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    confStore.confirmMutation(mutation, true);
-    assertEquals(1, logs.size());
-    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-
-    Map<String, String> update2 = new HashMap<>();
-    update2.put("key2", "val2");
-    mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
-    confStore.logMutation(mutation);
-    logs = ((LeveldbConfigurationStore) confStore).getLogs();
-    assertEquals(2, logs.size());
-    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-    confStore.confirmMutation(mutation, true);
-    assertEquals(2, logs.size());
-    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-
-    // Next update should purge first update from logs.
-    Map<String, String> update3 = new HashMap<>();
-    update3.put("key3", "val3");
-    mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
-    confStore.logMutation(mutation);
-    logs = ((LeveldbConfigurationStore) confStore).getLogs();
-    assertEquals(2, logs.size());
-    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
-    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
-    confStore.confirmMutation(mutation, true);
-    assertEquals(2, logs.size());
-    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
-    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
-    confStore.close();
-  }
-
   /**
    * When restarting, RM should read from current state of store, including
    * any updates from the previous RM instance.
@@ -240,7 +135,7 @@
         .getConfStore().retrieve().get("key"));
     // Next update is not persisted, it should not be recovered
     schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
-    log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
     rm1.close();
 
     // Start RM2 and verifies it starts with updated configuration
@@ -258,4 +153,10 @@
   public YarnConfigurationStore createConfStore() {
     return new LeveldbConfigurationStore();
   }
+
+  @Override
+  Version getVersion() {
+    return LeveldbConfigurationStore.CURRENT_VERSION_INFO;
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
index 6715a05..b9a25802 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -18,13 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
-import org.apache.hadoop.util.curator.ZKCuratorManager;
-import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
@@ -34,8 +27,11 @@
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
@@ -45,14 +41,17 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -63,8 +62,8 @@
 /**
  * Tests {@link ZKConfigurationStore}.
  */
-public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
-
+public class TestZKConfigurationStore extends
+    PersistentConfigurationStoreBaseTest {
   public static final Logger LOG =
       LoggerFactory.getLogger(TestZKConfigurationStore.class);
 
@@ -90,6 +89,7 @@
   }
 
   @Before
+  @Override
   public void setUp() throws Exception {
     super.setUp();
     curatorTestingServer = setupCuratorServer();
@@ -109,15 +109,6 @@
     curatorTestingServer.stop();
   }
 
-  @Test
-  public void testVersioning() throws Exception {
-    confStore.initialize(conf, schedConf, rmContext);
-    assertNull(confStore.getConfStoreVersion());
-    confStore.checkVersion();
-    assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
-        confStore.getConfStoreVersion());
-  }
-
   @Test(expected = YarnConfStoreVersionIncompatibleException.class)
   public void testIncompatibleVersion() throws Exception {
     confStore.initialize(conf, schedConf, rmContext);
@@ -142,23 +133,6 @@
   }
 
   @Test
-  public void testPersistConfiguration() throws Exception {
-    schedConf.set("key", "val");
-    confStore.initialize(conf, schedConf, rmContext);
-    assertEquals("val", confStore.retrieve().get("key"));
-
-    assertNull(confStore.retrieve().get(YarnConfiguration.RM_HOSTNAME));
-
-    // Create a new configuration store, and check for old configuration
-    confStore = createConfStore();
-    schedConf.set("key", "badVal");
-    // Should ignore passed-in scheduler configuration.
-    confStore.initialize(conf, schedConf, rmContext);
-    assertEquals("val", confStore.retrieve().get("key"));
-  }
-
-
-  @Test
   public void testFormatConfiguration() throws Exception {
     schedConf.set("key", "val");
     confStore.initialize(conf, schedConf, rmContext);
@@ -168,90 +142,6 @@
   }
 
   @Test
-  public void testGetConfigurationVersion() throws Exception {
-    confStore.initialize(conf, schedConf, rmContext);
-    long v1 = confStore.getConfigVersion();
-    assertEquals(1, v1);
-    Map<String, String> update = new HashMap<>();
-    update.put("keyver", "valver");
-    YarnConfigurationStore.LogMutation mutation =
-        new YarnConfigurationStore.LogMutation(update, TEST_USER);
-    confStore.logMutation(mutation);
-    confStore.confirmMutation(mutation, true);
-    long v2 = confStore.getConfigVersion();
-    assertEquals(2, v2);
-  }
-
-  @Test
-  public void testPersistUpdatedConfiguration() throws Exception {
-    confStore.initialize(conf, schedConf, rmContext);
-    assertNull(confStore.retrieve().get("key"));
-
-    Map<String, String> update = new HashMap<>();
-    update.put("key", "val");
-    LogMutation mutation = new LogMutation(update, TEST_USER);
-    confStore.logMutation(mutation);
-    confStore.confirmMutation(mutation, true);
-    assertEquals("val", confStore.retrieve().get("key"));
-
-    // Create a new configuration store, and check for updated configuration
-    confStore = createConfStore();
-    schedConf.set("key", "badVal");
-    // Should ignore passed-in scheduler configuration.
-    confStore.initialize(conf, schedConf, rmContext);
-    assertEquals("val", confStore.retrieve().get("key"));
-  }
-
-  @Test
-  public void testMaxLogs() throws Exception {
-    conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
-    confStore.initialize(conf, schedConf, rmContext);
-    LinkedList<YarnConfigurationStore.LogMutation> logs =
-        ((ZKConfigurationStore) confStore).getLogs();
-    assertEquals(0, logs.size());
-
-    Map<String, String> update1 = new HashMap<>();
-    update1.put("key1", "val1");
-    YarnConfigurationStore.LogMutation mutation =
-        new YarnConfigurationStore.LogMutation(update1, TEST_USER);
-    confStore.logMutation(mutation);
-    logs = ((ZKConfigurationStore) confStore).getLogs();
-    assertEquals(1, logs.size());
-    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    confStore.confirmMutation(mutation, true);
-    assertEquals(1, logs.size());
-    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-
-    Map<String, String> update2 = new HashMap<>();
-    update2.put("key2", "val2");
-    mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
-    confStore.logMutation(mutation);
-    logs = ((ZKConfigurationStore) confStore).getLogs();
-    assertEquals(2, logs.size());
-    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-    confStore.confirmMutation(mutation, true);
-    assertEquals(2, logs.size());
-    assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-
-    // Next update should purge first update from logs.
-    Map<String, String> update3 = new HashMap<>();
-    update3.put("key3", "val3");
-    mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
-    confStore.logMutation(mutation);
-    logs = ((ZKConfigurationStore) confStore).getLogs();
-    assertEquals(2, logs.size());
-    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
-    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
-    confStore.confirmMutation(mutation, true);
-    assertEquals(2, logs.size());
-    assertEquals("val2", logs.get(0).getUpdates().get("key2"));
-    assertEquals("val3", logs.get(1).getUpdates().get("key3"));
-  }
-
-
-  @Test
   public void testDisableAuditLogs() throws Exception {
     conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
     confStore.initialize(conf, schedConf, rmContext);
@@ -262,11 +152,7 @@
     byte[] data = null;
     ((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1);
 
-    Map<String, String> update = new HashMap<>();
-    update.put("key1", "val1");
-    YarnConfigurationStore.LogMutation mutation =
-        new YarnConfigurationStore.LogMutation(update, TEST_USER);
-    confStore.logMutation(mutation);
+    prepareLogMutation("key1", "val1");
 
     data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath);
     assertNull("Failed to Disable Audit Logs", data);
@@ -489,4 +375,9 @@
   public YarnConfigurationStore createConfStore() {
     return new ZKConfigurationStore();
   }
+
+  @Override
+  Version getVersion() {
+    return ZKConfigurationStore.CURRENT_VERSION_INFO;
+  }
 }