// blob: 8350fa928d288b19262f84f1a48ed1cfe6c77f4b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBComparator;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
 * A LevelDB implementation of {@link YarnConfigurationStore}.
 *
 * <p>Two LevelDB databases are kept below the directory configured through
 * {@link YarnConfiguration#RM_SCHEDCONF_STORE_PATH}:
 * <ul>
 *   <li>{@code yarn-conf-store}: one entry per scheduler configuration
 *   property, plus the serialized mutation log stored under the {@code log}
 *   key. A custom comparator sorts the {@code log} and {@code version} keys
 *   after every configuration key.</li>
 *   <li>{@code conf-version-store}: a single {@code conf-version} counter
 *   that is incremented whenever a valid mutation is confirmed.</li>
 * </ul>
 */
public class LeveldbConfigurationStore extends YarnConfigurationStore {

  public static final Logger LOG =
      LoggerFactory.getLogger(LeveldbConfigurationStore.class);

  /** Directory name of the scheduler configuration database. */
  private static final String DB_NAME = "yarn-conf-store";
  /** Key under which the serialized list of logged mutations is stored. */
  private static final String LOG_KEY = "log";
  /** Key handed to {@link DBManager} to store/load the store version. */
  private static final String VERSION_KEY = "version";
  /** Directory name of the configuration version database. */
  private static final String CONF_VERSION_NAME = "conf-version-store";
  /** Key under which the configuration version counter is stored. */
  private static final String CONF_VERSION_KEY = "conf-version";

  private DB db;
  private DBManager dbManager;
  private DBManager versionDbManager;
  private DB versionDb;
  /** Maximum number of logged mutations kept; values <= 0 disable logging. */
  private long maxLogs;
  private Configuration conf;
  /** Scheduler configuration used to seed a newly initialized database. */
  private Configuration initSchedConf;

  @VisibleForTesting
  protected static final Version CURRENT_VERSION_INFO = Version
      .newInstance(0, 1);

  /**
   * Opens both databases, reads the maximum log count and starts the
   * compaction timer of the configuration database.
   *
   * @param config configuration holding the store path and tuning settings
   * @param schedConf scheduler configuration written by {@link #initDb(DB)}
   * @param rmContext not used by this implementation
   * @throws IOException wrapping any exception raised during initialization
   */
  @Override
  public void initialize(Configuration config, Configuration schedConf,
      RMContext rmContext) throws IOException {
    this.conf = config;
    this.initSchedConf = schedConf;
    this.dbManager = new DBManager();
    this.versionDbManager = new DBManager();
    try {
      initDatabase();
      this.maxLogs = config.getLong(
          YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
          YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
      // The interval is configured in seconds; the timer takes milliseconds.
      long compactionIntervalMsec = config.getLong(
          YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
          YarnConfiguration
              .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
      dbManager.startCompactionTimer(compactionIntervalMsec,
          this.getClass().getSimpleName());
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Closes the store and recursively deletes the configuration database
   * directory from the local file system.
   *
   * <p>NOTE(review): only the {@code yarn-conf-store} directory is removed;
   * the {@code conf-version-store} directory is left in place. Confirm that
   * this is intended.
   *
   * @throws Exception if closing the store or deleting the directory fails
   */
  @Override
  public void format() throws Exception {
    close();
    FileSystem fs = FileSystem.getLocal(conf);
    fs.delete(getStorageDir(DB_NAME), true);
  }

  /**
   * Creates the storage directories and opens the version database followed
   * by the configuration database.
   *
   * <p>The version database must be opened first because {@link #initDb(DB)}
   * increments the configuration version stored in it.
   *
   * <p>NOTE(review): both option sets use {@code createIfMissing(false)};
   * presumably {@link DBManager} creates a missing database itself and only
   * then invokes the supplied callback. Confirm against {@code DBManager}.
   */
  private void initDatabase() throws Exception {
    Path confVersion = createStorageDir(CONF_VERSION_NAME);
    Options confOptions = new Options();
    confOptions.createIfMissing(false);
    File confVersionFile = new File(confVersion.toString());
    versionDb = versionDbManager.initDatabase(confVersionFile, confOptions,
        this::initVersionDb);

    Path storeRoot = createStorageDir(DB_NAME);
    Options options = new Options();
    options.createIfMissing(false);
    options.comparator(newKeyComparator());
    LOG.info("Using conf database at {}", storeRoot);
    File dbFile = new File(storeRoot.toString());
    db = dbManager.initDatabase(dbFile, options, this::initDb);
  }

  /**
   * Builds the key comparator of the configuration database. Keys are
   * compared as UTF-8 strings in natural order, except that the
   * {@code version} key sorts after every other key and the {@code log} key
   * sorts after every key but {@code version}. {@link #retrieve()} depends on
   * this ordering to stop at the first of these keys.
   */
  private static DBComparator newKeyComparator() {
    return new DBComparator() {
      @Override
      public int compare(byte[] key1, byte[] key2) {
        String key1Str = new String(key1, StandardCharsets.UTF_8);
        String key2Str = new String(key2, StandardCharsets.UTF_8);
        if (key1Str.equals(key2Str)) {
          return 0;
        } else if (key1Str.equals(VERSION_KEY)) {
          return 1;
        } else if (key2Str.equals(VERSION_KEY)) {
          return -1;
        } else if (key1Str.equals(LOG_KEY)) {
          return 1;
        } else if (key2Str.equals(LOG_KEY)) {
          return -1;
        }
        return key1Str.compareTo(key2Str);
      }

      @Override
      public String name() {
        // Must stay unchanged: existing databases were opened with this name.
        return "keyComparator";
      }

      @Override
      public byte[] findShortestSeparator(byte[] start, byte[] limit) {
        return start;
      }

      @Override
      public byte[] findShortSuccessor(byte[] key) {
        return key;
      }
    };
  }

  /** Seeds the version database with a configuration version of 0. */
  private void initVersionDb(DB database) {
    database.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
  }

  /**
   * Writes every property of the initial scheduler configuration to the
   * given database in one batch and then increments the config version.
   */
  private void initDb(DB database) {
    // WriteBatch is Closeable; release it as soon as it has been written.
    try (WriteBatch initBatch = database.createWriteBatch()) {
      for (Map.Entry<String, String> kv : initSchedConf) {
        initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
      }
      database.write(initBatch);
    } catch (IOException e) {
      // Only close() can raise this; the batch itself was already written.
      LOG.warn("Failed to close the initial write batch", e);
    }
    increaseConfigVersion();
  }

  /**
   * Resolves the directory for the given storage name and creates it on the
   * local file system with permission 0700.
   */
  private Path createStorageDir(String storageName) throws IOException {
    Path root = getStorageDir(storageName);
    FileSystem fs = FileSystem.getLocal(conf);
    fs.mkdirs(root, new FsPermission((short) 0700));
    return root;
  }

  /**
   * Resolves the directory for the given storage name below the configured
   * store path.
   *
   * @throws IOException if no store path is configured
   */
  private Path getStorageDir(String storageName) throws IOException {
    String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
    if (storePath == null) {
      throw new IOException("No store location directory configured in " +
          YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
    }
    return new Path(storePath, storageName);
  }

  /**
   * Closes both database managers. The version database manager is closed
   * even if closing the configuration database manager fails, and managers
   * that were never created (store not initialized) are skipped.
   *
   * @throws IOException if closing either manager fails
   */
  @Override
  public void close() throws IOException {
    try {
      if (dbManager != null) {
        dbManager.close();
      }
    } finally {
      if (versionDbManager != null) {
        versionDbManager.close();
      }
    }
  }

  /**
   * Appends the mutation to the persisted log, dropping the oldest entries
   * so that at most {@code maxLogs} entries remain. Does nothing when
   * {@code maxLogs} is not positive.
   *
   * @param logMutation the mutation to record
   * @throws IOException if the log cannot be (de)serialized
   */
  @Override
  public void logMutation(LogMutation logMutation) throws IOException {
    if (maxLogs > 0) {
      LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
      logs.add(logMutation);
      // Loop rather than trim once: the stored log may exceed the limit by
      // more than one entry if the limit was lowered since it was written.
      while (logs.size() > maxLogs) {
        logs.removeFirst();
      }
      db.put(bytes(LOG_KEY), serLogMutations(logs));
    }
  }

  /**
   * Applies a pending mutation to the configuration database when it is
   * valid. An update whose value is {@code null} or empty deletes the key;
   * any other update overwrites it. The configuration version is increased
   * only after the batch has been written, so a failed write does not leave
   * the version ahead of the stored configuration.
   *
   * @param pendingMutation the mutation whose updates are applied
   * @param isValid whether the mutation should be applied
   */
  @Override
  public void confirmMutation(LogMutation pendingMutation,
      boolean isValid) {
    // WriteBatch is Closeable; release it as soon as it has been written.
    try (WriteBatch updateBatch = db.createWriteBatch()) {
      if (isValid) {
        for (Map.Entry<String, String> changes :
            pendingMutation.getUpdates().entrySet()) {
          if (changes.getValue() == null || changes.getValue().isEmpty()) {
            updateBatch.delete(bytes(changes.getKey()));
          } else {
            updateBatch.put(bytes(changes.getKey()),
                bytes(changes.getValue()));
          }
        }
      }
      db.write(updateBatch);
    } catch (IOException e) {
      // Only close() can raise this; the batch itself was already written.
      LOG.warn("Failed to close the mutation write batch", e);
    }
    if (isValid) {
      increaseConfigVersion();
    }
  }

  /** Serializes the mutation list with Java object serialization. */
  private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
      IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ObjectOutput oos = new ObjectOutputStream(baos)) {
      oos.writeObject(mutations);
      oos.flush();
      return baos.toByteArray();
    }
  }

  /**
   * Deserializes a mutation list written by
   * {@link #serLogMutations(LinkedList)}; {@code null} input yields an empty
   * list.
   *
   * <p>NOTE(review): this uses Java native deserialization. The bytes are
   * read from this store's own database; do not feed it data from any
   * untrusted source.
   */
  // Because of type erasure casting to LinkedList<LogMutation> will be
  // unchecked. A way around that would be to iterate over the logMutations
  // which is overkill in this case.
  @SuppressWarnings("unchecked")
  private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
      IOException {
    if (mutations == null) {
      return new LinkedList<>();
    }
    try (ObjectInput input = new ObjectInputStream(
        new ByteArrayInputStream(mutations))) {
      return (LinkedList<LogMutation>) input.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Reads every configuration property from the database into a new
   * {@link Configuration} created without default resources.
   *
   * @return the stored scheduler configuration
   */
  @Override
  public synchronized Configuration retrieve() {
    Configuration config = new Configuration(false);
    // DBIterator is Closeable; close it once the scan is done.
    try (DBIterator itr = db.iterator()) {
      itr.seekToFirst();
      while (itr.hasNext()) {
        Map.Entry<byte[], byte[]> entry = itr.next();
        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
        // The comparator sorts these two keys last, so nothing after them
        // is a configuration property.
        if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
          break;
        }
        String value = new String(entry.getValue(), StandardCharsets.UTF_8);
        config.set(key, value);
      }
    } catch (IOException e) {
      // Only close() can raise this; the configuration was fully read.
      LOG.warn("Failed to close the configuration database iterator", e);
    }
    return config;
  }

  /** Stores the current configuration version plus one. */
  private void increaseConfigVersion() {
    long configVersion = getConfigVersion() + 1L;
    versionDb.put(bytes(CONF_VERSION_KEY),
        bytes(String.valueOf(configVersion)));
  }

  /**
   * Reads the configuration version counter from the version database.
   *
   * @return the stored configuration version
   */
  @Override
  public long getConfigVersion() {
    String version = new String(versionDb.get(bytes(CONF_VERSION_KEY)),
        StandardCharsets.UTF_8);
    return Long.parseLong(version);
  }

  /**
   * Not implemented by this store.
   *
   * @param fromId ignored
   * @return always {@code null}
   */
  @Override
  public List<LogMutation> getConfirmedConfHistory(long fromId) {
    return null; // unimplemented
  }

  /**
   * Loads the store version through the configuration database manager.
   *
   * @return the version stored under the {@code version} key
   * @throws Exception if the version cannot be loaded
   */
  @Override
  public Version getConfStoreVersion() throws Exception {
    return dbManager.loadVersion(VERSION_KEY);
  }

  /**
   * Reads the persisted mutation log.
   *
   * @return the logged mutations, or an empty list if none are stored
   * @throws Exception if the log cannot be deserialized
   */
  @VisibleForTesting
  @Override
  protected LinkedList<LogMutation> getLogs() throws Exception {
    return deserLogMutations(db.get(bytes(LOG_KEY)));
  }

  /** Exposes the configuration database handle for tests. */
  @VisibleForTesting
  protected DB getDB() {
    return db;
  }

  /**
   * Stores {@link #CURRENT_VERSION_INFO} as the store version.
   *
   * @throws Exception an {@link IOException} wrapping any
   *     {@link DBException} raised while storing the version
   */
  @Override
  public void storeVersion() throws Exception {
    try {
      storeVersion(CURRENT_VERSION_INFO);
    } catch (DBException e) {
      throw new IOException(e);
    }
  }

  /** Stores the given version under the {@code version} key. */
  @VisibleForTesting
  protected void storeVersion(Version version) {
    dbManager.storeVersion(VERSION_KEY, version);
  }

  /**
   * Returns the store version this implementation writes.
   *
   * @return {@link #CURRENT_VERSION_INFO}
   */
  @Override
  public Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }
}