YARN-10002. Code cleanup and improvements in ConfigurationStoreBaseTest. Contributed by Benjamin Teke
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
index eeb38d3..97d9933 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -337,6 +338,12 @@
}
@Override
+ protected LinkedList<LogMutation> getLogs() {
+ // Unimplemented.
+ return null;
+ }
+
+ @Override
protected Version getConfStoreVersion() throws Exception {
return null;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
index d031ea9..dc2c724 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -87,6 +88,12 @@
}
@Override
+ protected LinkedList<LogMutation> getLogs() {
+ // Unimplemented.
+ return null;
+ }
+
+ @Override
public Version getConfStoreVersion() throws Exception {
// Does nothing.
return null;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index b43bcb8..f90bdea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -333,6 +333,7 @@
}
@VisibleForTesting
+ @Override
protected LinkedList<LogMutation> getLogs() throws Exception {
return deserLogMutations(db.get(bytes(LOG_KEY)));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index e8e3ecf..8ab3e44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -156,6 +157,13 @@
protected abstract Version getConfStoreVersion() throws Exception;
/**
+ * Get a list of configuration mutations.
+ * @return list of configuration mutations.
+ * @throws Exception On mutation fetch failure
+ */
+ protected abstract LinkedList<LogMutation> getLogs() throws Exception;
+
+ /**
* Persist the hard-coded schema version to the conf store.
* @throws Exception On storage failure
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
index 0fa48b4..fae4929 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
@@ -119,6 +119,7 @@
}
@VisibleForTesting
+ @Override
protected LinkedList<LogMutation> getLogs() throws Exception {
return (LinkedList<LogMutation>)
deserializeObject(zkManager.getData(logsPath));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
index 4b3153a..3a8c362 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -34,16 +34,13 @@
* Base class for {@link YarnConfigurationStore} implementations.
*/
public abstract class ConfigurationStoreBaseTest {
+ static final String TEST_USER = "testUser";
+ YarnConfigurationStore confStore = createConfStore();
+ Configuration conf;
+ Configuration schedConf;
+ RMContext rmContext;
- protected YarnConfigurationStore confStore = createConfStore();
-
- protected abstract YarnConfigurationStore createConfStore();
-
- protected Configuration conf;
- protected Configuration schedConf;
- protected RMContext rmContext;
-
- protected static final String TEST_USER = "testUser";
+ abstract YarnConfigurationStore createConfStore();
@Before
public void setUp() throws Exception {
@@ -59,20 +56,12 @@
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val1", confStore.retrieve().get("key1"));
- Map<String, String> update1 = new HashMap<>();
- update1.put("keyUpdate1", "valUpdate1");
- YarnConfigurationStore.LogMutation mutation1 =
- new YarnConfigurationStore.LogMutation(update1, TEST_USER);
- confStore.logMutation(mutation1);
- confStore.confirmMutation(mutation1, true);
+ confStore.confirmMutation(prepareLogMutation("keyUpdate1", "valUpdate1"),
+ true);
assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
- Map<String, String> update2 = new HashMap<>();
- update2.put("keyUpdate2", "valUpdate2");
- YarnConfigurationStore.LogMutation mutation2 =
- new YarnConfigurationStore.LogMutation(update2, TEST_USER);
- confStore.logMutation(mutation2);
- confStore.confirmMutation(mutation2, false);
+ confStore.confirmMutation(prepareLogMutation("keyUpdate2", "valUpdate2"),
+ false);
assertNull("Configuration should not be updated",
confStore.retrieve().get("keyUpdate2"));
confStore.close();
@@ -84,13 +73,20 @@
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
+ confStore.confirmMutation(prepareLogMutation("key", null), true);
+ assertNull(confStore.retrieve().get("key"));
+ confStore.close();
+ }
+
+ YarnConfigurationStore.LogMutation prepareLogMutation(String key,
+ String value)
+ throws Exception {
Map<String, String> update = new HashMap<>();
- update.put("key", null);
+ update.put(key, value);
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
- confStore.confirmMutation(mutation, true);
- assertNull(confStore.retrieve().get("key"));
- confStore.close();
+
+ return mutation;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
new file mode 100644
index 0000000..169c36d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.junit.Test;
+
+import java.util.LinkedList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Base class for the persistent {@link YarnConfigurationStore}
+ * implementations, namely {@link TestLeveldbConfigurationStore} and
+ * {@link TestZKConfigurationStore}.
+ */
+public abstract class PersistentConfigurationStoreBaseTest extends
+ ConfigurationStoreBaseTest {
+
+ abstract Version getVersion();
+
+ @Test
+ public void testGetConfigurationVersion() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ long v1 = confStore.getConfigVersion();
+ assertEquals(1, v1);
+ confStore.confirmMutation(prepareLogMutation("keyver", "valver"), true);
+ long v2 = confStore.getConfigVersion();
+ assertEquals(2, v2);
+ confStore.close();
+ }
+
+ @Test
+ public void testPersistConfiguration() throws Exception {
+ schedConf.set("key", "val");
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+
+ // Create a new configuration store, and check for old configuration
+ confStore = createConfStore();
+ schedConf.set("key", "badVal");
+ // Should ignore passed-in scheduler configuration.
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+ }
+
+ @Test
+ public void testPersistUpdatedConfiguration() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ assertNull(confStore.retrieve().get("key"));
+
+ confStore.confirmMutation(prepareLogMutation("key", "val"), true);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+
+ // Create a new configuration store, and check for updated configuration
+ confStore = createConfStore();
+ schedConf.set("key", "badVal");
+ // Should ignore passed-in scheduler configuration.
+ confStore.initialize(conf, schedConf, rmContext);
+ assertEquals("val", confStore.retrieve().get("key"));
+ confStore.close();
+ }
+
+
+ @Test
+ public void testVersion() throws Exception {
+ confStore.initialize(conf, schedConf, rmContext);
+ assertNull(confStore.getConfStoreVersion());
+ confStore.checkVersion();
+ assertEquals(getVersion(),
+ confStore.getConfStoreVersion());
+ confStore.close();
+ }
+
+ @Test
+ public void testMaxLogs() throws Exception {
+ conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
+ confStore.initialize(conf, schedConf, rmContext);
+ LinkedList<YarnConfigurationStore.LogMutation> logs = confStore.getLogs();
+ assertEquals(0, logs.size());
+
+ YarnConfigurationStore.LogMutation mutation =
+ prepareLogMutation("key1", "val1");
+ logs = confStore.getLogs();
+ assertEquals(1, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ confStore.confirmMutation(mutation, true);
+ assertEquals(1, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+
+ mutation = prepareLogMutation("key2", "val2");
+ logs = confStore.getLogs();
+ assertEquals(2, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+ confStore.confirmMutation(mutation, true);
+ assertEquals(2, logs.size());
+ assertEquals("val1", logs.get(0).getUpdates().get("key1"));
+ assertEquals("val2", logs.get(1).getUpdates().get("key2"));
+
+ // Next update should purge first update from logs.
+ mutation = prepareLogMutation("key3", "val3");
+ logs = confStore.getLogs();
+ assertEquals(2, logs.size());
+ assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+ assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ confStore.confirmMutation(mutation, true);
+ assertEquals(2, logs.size());
+ assertEquals("val2", logs.get(0).getUpdates().get("key2"));
+ assertEquals("val3", logs.get(1).getUpdates().get("key3"));
+ confStore.close();
+ }
+
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
index e4ca3d3..f5da5ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -32,19 +32,22 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
+import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
/**
* Tests {@link FSSchedulerConfigurationStore}.
*/
public class TestFSSchedulerConfigurationStore {
+ private static final String TEST_USER = "test";
private FSSchedulerConfigurationStore configurationStore;
private Configuration conf;
private File testSchedulerConfigurationDir;
@@ -90,35 +93,29 @@
Configuration storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
- Map<String, String> updates = new HashMap<>();
- updates.put("a", null);
- updates.put("b", "bb");
-
Configuration expectConfig = new Configuration(conf);
expectConfig.unset("a");
expectConfig.set("b", "bb");
- LogMutation logMutation = new LogMutation(updates, "test");
- configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(logMutation, true);
+ prepareParameterizedLogMutation(configurationStore, true,
+ "a", null, "b", "bb");
storeConf = configurationStore.retrieve();
- assertEquals(null, storeConf.get("a"));
+ assertNull(storeConf.get("a"));
assertEquals("bb", storeConf.get("b"));
assertEquals("c", storeConf.get("c"));
compareConfig(expectConfig, storeConf);
- updates.put("b", "bbb");
- configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(logMutation, true);
+ prepareParameterizedLogMutation(configurationStore, true,
+ "a", null, "b", "bbb");
storeConf = configurationStore.retrieve();
- assertEquals(null, storeConf.get("a"));
+ assertNull(storeConf.get("a"));
assertEquals("bbb", storeConf.get("b"));
assertEquals("c", storeConf.get("c"));
}
@Test
- public void confirmMutationWithInValid() throws Exception {
+ public void confirmMutationWithInvalid() throws Exception {
conf.set("a", "a");
conf.set("b", "b");
conf.set("c", "c");
@@ -127,13 +124,8 @@
Configuration storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
- Map<String, String> updates = new HashMap<>();
- updates.put("a", null);
- updates.put("b", "bb");
-
- LogMutation logMutation = new LogMutation(updates, "test");
- configurationStore.logMutation(logMutation);
- configurationStore.confirmMutation(logMutation, false);
+ prepareParameterizedLogMutation(configurationStore, false,
+ "a", null, "b", "bb");
storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
@@ -142,7 +134,7 @@
@Test
public void testFileSystemClose() throws Exception {
MiniDFSCluster hdfsCluster = null;
- FileSystem fs = null;
+ FileSystem fs;
Path path = new Path("/tmp/confstore");
try {
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
@@ -164,11 +156,8 @@
fs.close();
try {
- Map<String, String> updates = new HashMap<>();
- updates.put("testkey", "testvalue");
- LogMutation logMutation = new LogMutation(updates, "test");
- configStore.logMutation(logMutation);
- configStore.confirmMutation(logMutation, true);
+ prepareParameterizedLogMutation(configStore, true,
+ "testkey", "testvalue");
} catch (IOException e) {
if (e.getMessage().contains("Filesystem closed")) {
fail("FSSchedulerConfigurationStore failed to handle " +
@@ -178,13 +167,12 @@
}
}
} finally {
+ assert hdfsCluster != null;
fs = hdfsCluster.getFileSystem();
if (fs.exists(path)) {
fs.delete(path, true);
}
- if (hdfsCluster != null) {
- hdfsCluster.shutdown();
- }
+ hdfsCluster.shutdown();
}
}
@@ -197,15 +185,14 @@
Configuration storedConfig = configurationStore.retrieve();
assertEquals("a", storedConfig.get("a"));
configurationStore.format();
- boolean exceptionCaught = false;
try {
- storedConfig = configurationStore.retrieve();
+ configurationStore.retrieve();
+ fail("Expected an IOException with message containing \"no capacity " +
+ "scheduler file in\" to be thrown");
} catch (IOException e) {
- if (e.getMessage().contains("no capacity scheduler file in")) {
- exceptionCaught = true;
- }
+ assertThat(e.getMessage(),
+ CoreMatchers.containsString("no capacity scheduler file in"));
}
- assertTrue(exceptionCaught);
}
@Test
@@ -243,4 +230,27 @@
schedulerConf.get(entry.getKey()));
}
}
+
+ private void prepareParameterizedLogMutation(
+ FSSchedulerConfigurationStore configStore,
+ boolean validityFlag, String... values) throws Exception {
+ Map<String, String> updates = new HashMap<>();
+ String key;
+ String value;
+
+ if (values.length % 2 != 0) {
+ throw new IllegalArgumentException("The number of parameters should be " +
+ "even.");
+ }
+
+ for (int i = 1; i <= values.length; i += 2) {
+ key = values[i - 1];
+ value = values[i];
+ updates.put(key, value);
+ }
+
+ LogMutation logMutation = new LogMutation(updates, TEST_USER);
+ configStore.logMutation(logMutation);
+ configStore.confirmMutation(logMutation, validityFlag);
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
index 785a910..3565ed1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
@@ -38,8 +38,6 @@
import java.io.File;
import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.LinkedList;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -49,7 +47,8 @@
/**
* Tests {@link LeveldbConfigurationStore}.
*/
-public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
+public class TestLeveldbConfigurationStore extends
+ PersistentConfigurationStoreBaseTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestLeveldbConfigurationStore.class);
@@ -58,8 +57,6 @@
System.getProperty("java.io.tmpdir")),
TestLeveldbConfigurationStore.class.getName());
- private ResourceManager rm;
-
@Before
public void setUp() throws Exception {
super.setUp();
@@ -69,16 +66,6 @@
conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString());
}
- @Test
- public void testVersioning() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- assertNull(confStore.getConfStoreVersion());
- confStore.checkVersion();
- assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
- confStore.getConfStoreVersion());
- confStore.close();
- }
-
@Test(expected = YarnConfStoreVersionIncompatibleException.class)
public void testIncompatibleVersion() throws Exception {
try {
@@ -97,54 +84,11 @@
}
@Test
- public void testPersistConfiguration() throws Exception {
- schedConf.set("key", "val");
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- confStore.close();
-
- // Create a new configuration store, and check for old configuration
- confStore = createConfStore();
- schedConf.set("key", "badVal");
- // Should ignore passed-in scheduler configuration.
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- confStore.close();
- }
-
- @Test
- public void testPersistUpdatedConfiguration() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- assertNull(confStore.retrieve().get("key"));
-
- Map<String, String> update = new HashMap<>();
- update.put("key", "val");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
- confStore.confirmMutation(mutation, true);
- assertEquals("val", confStore.retrieve().get("key"));
- confStore.close();
-
- // Create a new configuration store, and check for updated configuration
- confStore = createConfStore();
- schedConf.set("key", "badVal");
- // Should ignore passed-in scheduler configuration.
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- confStore.close();
- }
-
- @Test
public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext);
- Map<String, String> update = new HashMap<>();
- update.put("key1", "val1");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
+ prepareLogMutation("key1", "val1");
boolean logKeyPresent = false;
DB db = ((LeveldbConfigurationStore) confStore).getDB();
@@ -162,55 +106,6 @@
confStore.close();
}
- @Test
- public void testMaxLogs() throws Exception {
- conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
- confStore.initialize(conf, schedConf, rmContext);
- LinkedList<YarnConfigurationStore.LogMutation> logs =
- ((LeveldbConfigurationStore) confStore).getLogs();
- assertEquals(0, logs.size());
-
- Map<String, String> update1 = new HashMap<>();
- update1.put("key1", "val1");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update1, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((LeveldbConfigurationStore) confStore).getLogs();
- assertEquals(1, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- confStore.confirmMutation(mutation, true);
- assertEquals(1, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-
- Map<String, String> update2 = new HashMap<>();
- update2.put("key2", "val2");
- mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((LeveldbConfigurationStore) confStore).getLogs();
- assertEquals(2, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- assertEquals("val2", logs.get(1).getUpdates().get("key2"));
- confStore.confirmMutation(mutation, true);
- assertEquals(2, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-
- // Next update should purge first update from logs.
- Map<String, String> update3 = new HashMap<>();
- update3.put("key3", "val3");
- mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((LeveldbConfigurationStore) confStore).getLogs();
- assertEquals(2, logs.size());
- assertEquals("val2", logs.get(0).getUpdates().get("key2"));
- assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- confStore.confirmMutation(mutation, true);
- assertEquals(2, logs.size());
- assertEquals("val2", logs.get(0).getUpdates().get("key2"));
- assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- confStore.close();
- }
-
/**
* When restarting, RM should read from current state of store, including
* any updates from the previous RM instance.
@@ -240,7 +135,7 @@
.getConfStore().retrieve().get("key"));
// Next update is not persisted, it should not be recovered
schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
- log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+ confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.close();
// Start RM2 and verifies it starts with updated configuration
@@ -258,4 +153,10 @@
public YarnConfigurationStore createConfStore() {
return new LeveldbConfigurationStore();
}
+
+ @Override
+ Version getVersion() {
+ return LeveldbConfigurationStore.CURRENT_VERSION_INFO;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
index 6715a05..b9a25802 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java
@@ -18,13 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
-import org.apache.hadoop.util.curator.ZKCuratorManager;
-import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
@@ -34,8 +27,11 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
@@ -45,14 +41,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -63,8 +62,8 @@
/**
* Tests {@link ZKConfigurationStore}.
*/
-public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
-
+public class TestZKConfigurationStore extends
+ PersistentConfigurationStoreBaseTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestZKConfigurationStore.class);
@@ -90,6 +89,7 @@
}
@Before
+ @Override
public void setUp() throws Exception {
super.setUp();
curatorTestingServer = setupCuratorServer();
@@ -109,15 +109,6 @@
curatorTestingServer.stop();
}
- @Test
- public void testVersioning() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- assertNull(confStore.getConfStoreVersion());
- confStore.checkVersion();
- assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
- confStore.getConfStoreVersion());
- }
-
@Test(expected = YarnConfStoreVersionIncompatibleException.class)
public void testIncompatibleVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
@@ -142,23 +133,6 @@
}
@Test
- public void testPersistConfiguration() throws Exception {
- schedConf.set("key", "val");
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
-
- assertNull(confStore.retrieve().get(YarnConfiguration.RM_HOSTNAME));
-
- // Create a new configuration store, and check for old configuration
- confStore = createConfStore();
- schedConf.set("key", "badVal");
- // Should ignore passed-in scheduler configuration.
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- }
-
-
- @Test
public void testFormatConfiguration() throws Exception {
schedConf.set("key", "val");
confStore.initialize(conf, schedConf, rmContext);
@@ -168,90 +142,6 @@
}
@Test
- public void testGetConfigurationVersion() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- long v1 = confStore.getConfigVersion();
- assertEquals(1, v1);
- Map<String, String> update = new HashMap<>();
- update.put("keyver", "valver");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
- confStore.confirmMutation(mutation, true);
- long v2 = confStore.getConfigVersion();
- assertEquals(2, v2);
- }
-
- @Test
- public void testPersistUpdatedConfiguration() throws Exception {
- confStore.initialize(conf, schedConf, rmContext);
- assertNull(confStore.retrieve().get("key"));
-
- Map<String, String> update = new HashMap<>();
- update.put("key", "val");
- LogMutation mutation = new LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
- confStore.confirmMutation(mutation, true);
- assertEquals("val", confStore.retrieve().get("key"));
-
- // Create a new configuration store, and check for updated configuration
- confStore = createConfStore();
- schedConf.set("key", "badVal");
- // Should ignore passed-in scheduler configuration.
- confStore.initialize(conf, schedConf, rmContext);
- assertEquals("val", confStore.retrieve().get("key"));
- }
-
- @Test
- public void testMaxLogs() throws Exception {
- conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
- confStore.initialize(conf, schedConf, rmContext);
- LinkedList<YarnConfigurationStore.LogMutation> logs =
- ((ZKConfigurationStore) confStore).getLogs();
- assertEquals(0, logs.size());
-
- Map<String, String> update1 = new HashMap<>();
- update1.put("key1", "val1");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update1, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((ZKConfigurationStore) confStore).getLogs();
- assertEquals(1, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- confStore.confirmMutation(mutation, true);
- assertEquals(1, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-
- Map<String, String> update2 = new HashMap<>();
- update2.put("key2", "val2");
- mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((ZKConfigurationStore) confStore).getLogs();
- assertEquals(2, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- assertEquals("val2", logs.get(1).getUpdates().get("key2"));
- confStore.confirmMutation(mutation, true);
- assertEquals(2, logs.size());
- assertEquals("val1", logs.get(0).getUpdates().get("key1"));
- assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-
- // Next update should purge first update from logs.
- Map<String, String> update3 = new HashMap<>();
- update3.put("key3", "val3");
- mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
- confStore.logMutation(mutation);
- logs = ((ZKConfigurationStore) confStore).getLogs();
- assertEquals(2, logs.size());
- assertEquals("val2", logs.get(0).getUpdates().get("key2"));
- assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- confStore.confirmMutation(mutation, true);
- assertEquals(2, logs.size());
- assertEquals("val2", logs.get(0).getUpdates().get("key2"));
- assertEquals("val3", logs.get(1).getUpdates().get("key3"));
- }
-
-
- @Test
public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext);
@@ -262,11 +152,7 @@
byte[] data = null;
((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1);
- Map<String, String> update = new HashMap<>();
- update.put("key1", "val1");
- YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(update, TEST_USER);
- confStore.logMutation(mutation);
+ prepareLogMutation("key1", "val1");
data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath);
assertNull("Failed to Disable Audit Logs", data);
@@ -489,4 +375,9 @@
public YarnConfigurationStore createConfStore() {
return new ZKConfigurationStore();
}
+
+ @Override
+ Version getVersion() {
+ return ZKConfigurationStore.CURRENT_VERSION_INFO;
+ }
}