// blob: b43bcb839c0ba6916cb2d10f1b27854e32ca5893 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBComparator;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
* A LevelDB implementation of {@link YarnConfigurationStore}.
*/
public class LeveldbConfigurationStore extends YarnConfigurationStore {
public static final Logger LOG =
LoggerFactory.getLogger(LeveldbConfigurationStore.class);
private static final String DB_NAME = "yarn-conf-store";
private static final String LOG_KEY = "log";
private static final String VERSION_KEY = "version";
private static final String CONF_VERSION_NAME = "conf-version-store";
private static final String CONF_VERSION_KEY = "conf-version";
private DB db;
private DB versiondb;
private long maxLogs;
private Configuration conf;
@VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
private Timer compactionTimer;
private long compactionIntervalMsec;
@Override
public void initialize(Configuration config, Configuration schedConf,
RMContext rmContext) throws IOException {
this.conf = config;
try {
initDatabase(schedConf);
this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
this.compactionIntervalMsec = config.getLong(
YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
YarnConfiguration
.DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
startCompactionTimer();
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public void format() throws Exception {
close();
FileSystem fs = FileSystem.getLocal(conf);
fs.delete(getStorageDir(DB_NAME), true);
}
private void initDatabase(Configuration config) throws Exception {
Path storeRoot = createStorageDir(DB_NAME);
Options options = new Options();
options.createIfMissing(false);
options.comparator(new DBComparator() {
@Override
public int compare(byte[] key1, byte[] key2) {
String key1Str = new String(key1, StandardCharsets.UTF_8);
String key2Str = new String(key2, StandardCharsets.UTF_8);
if (key1Str.equals(key2Str)) {
return 0;
} else if (key1Str.equals(VERSION_KEY)) {
return 1;
} else if (key2Str.equals(VERSION_KEY)) {
return -1;
} else if (key1Str.equals(LOG_KEY)) {
return 1;
} else if (key2Str.equals(LOG_KEY)) {
return -1;
}
return key1Str.compareTo(key2Str);
}
@Override
public String name() {
return "keyComparator";
}
public byte[] findShortestSeparator(byte[] start, byte[] limit) {
return start;
}
public byte[] findShortSuccessor(byte[] key) {
return key;
}
});
Path confVersion = createStorageDir(CONF_VERSION_NAME);
Options confOptions = new Options();
confOptions.createIfMissing(false);
LOG.info("Using conf version at " + confVersion);
File confVersionFile = new File(confVersion.toString());
try {
versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating conf version at " + confVersionFile);
confOptions.createIfMissing(true);
try {
versiondb = JniDBFactory.factory.open(confVersionFile, confOptions);
versiondb.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
} else {
throw e;
}
}
LOG.info("Using conf database at " + storeRoot);
File dbfile = new File(storeRoot.toString());
try {
db = JniDBFactory.factory.open(dbfile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating conf database at " + dbfile);
options.createIfMissing(true);
try {
db = JniDBFactory.factory.open(dbfile, options);
// Write the initial scheduler configuration
WriteBatch initBatch = db.createWriteBatch();
for (Map.Entry<String, String> kv : config) {
initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
}
db.write(initBatch);
long configVersion = getConfigVersion() + 1L;
versiondb.put(bytes(CONF_VERSION_KEY),
bytes(String.valueOf(configVersion)));
} catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr);
}
} else {
throw e;
}
}
}
private Path createStorageDir(String storageName) throws IOException {
Path root = getStorageDir(storageName);
FileSystem fs = FileSystem.getLocal(conf);
fs.mkdirs(root, new FsPermission((short) 0700));
return root;
}
private Path getStorageDir(String storageName) throws IOException {
String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
if (storePath == null) {
throw new IOException("No store location directory configured in " +
YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
}
return new Path(storePath, storageName);
}
@Override
public void close() throws IOException {
if (db != null) {
db.close();
}
if (versiondb != null) {
versiondb.close();
}
}
@Override
public void logMutation(LogMutation logMutation) throws IOException {
if (maxLogs > 0) {
LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
logs.add(logMutation);
if (logs.size() > maxLogs) {
logs.removeFirst();
}
db.put(bytes(LOG_KEY), serLogMutations(logs));
}
}
@Override
public void confirmMutation(LogMutation pendingMutation,
boolean isValid) throws IOException {
WriteBatch updateBatch = db.createWriteBatch();
if (isValid) {
for (Map.Entry<String, String> changes :
pendingMutation.getUpdates().entrySet()) {
if (changes.getValue() == null || changes.getValue().isEmpty()) {
updateBatch.delete(bytes(changes.getKey()));
} else {
updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
}
}
long configVersion = getConfigVersion() + 1L;
versiondb.put(bytes(CONF_VERSION_KEY),
bytes(String.valueOf(configVersion)));
}
db.write(updateBatch);
}
private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutput oos = new ObjectOutputStream(baos)) {
oos.writeObject(mutations);
oos.flush();
return baos.toByteArray();
}
}
private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
IOException {
if (mutations == null) {
return new LinkedList<>();
}
try (ObjectInput input = new ObjectInputStream(
new ByteArrayInputStream(mutations))) {
return (LinkedList<LogMutation>) input.readObject();
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
@Override
public synchronized Configuration retrieve() {
DBIterator itr = db.iterator();
itr.seekToFirst();
Configuration config = new Configuration(false);
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
String value = new String(entry.getValue(), StandardCharsets.UTF_8);
if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
break;
}
config.set(key, value);
}
return config;
}
@Override
public long getConfigVersion() {
String version = new String(versiondb.get(bytes(CONF_VERSION_KEY)),
StandardCharsets.UTF_8);
return Long.parseLong(version);
}
@Override
public List<LogMutation> getConfirmedConfHistory(long fromId) {
return null; // unimplemented
}
// TODO below was taken from LeveldbRMStateStore, it can probably be
// refactored
private void startCompactionTimer() {
if (compactionIntervalMsec > 0) {
compactionTimer = new Timer(
this.getClass().getSimpleName() + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(),
compactionIntervalMsec, compactionIntervalMsec);
}
}
// TODO: following is taken from LeveldbRMStateStore
@Override
public Version getConfStoreVersion() throws Exception {
Version version = null;
try {
byte[] data = db.get(bytes(VERSION_KEY));
if (data != null) {
version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
.parseFrom(data));
}
} catch (DBException e) {
throw new IOException(e);
}
return version;
}
@VisibleForTesting
protected LinkedList<LogMutation> getLogs() throws Exception {
return deserLogMutations(db.get(bytes(LOG_KEY)));
}
@VisibleForTesting
protected DB getDB() {
return db;
}
@Override
public void storeVersion() throws Exception {
storeVersion(CURRENT_VERSION_INFO);
}
@VisibleForTesting
protected void storeVersion(Version version) throws Exception {
byte[] data = ((VersionPBImpl) version).getProto()
.toByteArray();
try {
db.put(bytes(VERSION_KEY), data);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
private class CompactionTimerTask extends TimerTask {
@Override
public void run() {
long start = Time.monotonicNow();
LOG.info("Starting full compaction cycle");
try {
db.compactRange(null, null);
} catch (DBException e) {
LOG.error("Error compacting database", e);
}
long duration = Time.monotonicNow() - start;
LOG.info("Full compaction cycle completed in " + duration + " msec");
}
}
}