/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.utils;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.ratis.thirdparty.com.google.common.annotations.
VisibleForTesting;
import org.rocksdb.DbPath;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * RocksDB implementation of the Ozone metadata store.
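 *
 * <p>A minimal usage sketch. The path and option values below are
 * illustrative assumptions, not requirements of this class:
 * <pre>{@code
 *   Options options = new Options();
 *   options.setCreateIfMissing(true);
 *   MetadataStore store =
 *       new RocksDBStore(new File("/tmp/ozone-meta.db"), options);
 *   try {
 *     store.put("key".getBytes(StandardCharsets.UTF_8),
 *         "value".getBytes(StandardCharsets.UTF_8));
 *     byte[] value = store.get("key".getBytes(StandardCharsets.UTF_8));
 *   } finally {
 *     store.close();
 *   }
 * }</pre>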
*/
public class RocksDBStore implements MetadataStore {
private static final Logger LOG =
LoggerFactory.getLogger(RocksDBStore.class);
private RocksDB db = null;
private File dbLocation;
private WriteOptions writeOptions;
private Options dbOptions;
private ObjectName statMBeanName;
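  /**
   * Opens (creating it if the supplied options allow) a RocksDB instance at
   * the given location and, when statistics collection is enabled in the
   * options, registers the statistics as a JMX MBean.
   *
   * @param dbFile  DB file location, must not be null
   * @param options RocksDB options used to open the database
   * @throws IOException if the underlying RocksDB instance cannot be opened
   */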
public RocksDBStore(File dbFile, Options options)
throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
RocksDB.loadLibrary();
dbOptions = options;
dbLocation = dbFile;
writeOptions = new WriteOptions();
try {
db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath());
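      // Expose the RocksDB statistics through JMX when statistics
      // collection has been enabled on the supplied options.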
if (dbOptions.statistics() != null) {
        Map<String, String> jmxProperties = new HashMap<>();
jmxProperties.put("dbName", dbFile.getName());
statMBeanName = HddsUtils.registerWithJmxProperties(
"Ozone", "RocksDbStore", jmxProperties,
RocksDBStoreMBean.create(dbOptions.statistics(),
dbFile.getName()));
if (statMBeanName == null) {
LOG.warn("jmx registration failed during RocksDB init, db path :{}",
dbFile.getAbsolutePath());
}
}
} catch (RocksDBException e) {
      throw new IOException(
          "Failed to init RocksDB, db path: " + dbFile.getAbsolutePath(), e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("RocksDB successfully opened.");
LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
LOG.debug("[Option] compactionPriority= {}", options.compactionStyle());
LOG.debug("[Option] compressionType= {}", options.compressionType());
LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
LOG.debug("[Option] writeBufferSize= {}", options.writeBufferSize());
}
}
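  /**
   * Converts a {@link RocksDBException} into an {@link IOException} that
   * carries the RocksDB status code and message alongside the given prefix.
   *
   * @param msg context message describing the failed operation
   * @param e   the underlying RocksDB exception
   * @return an IOException wrapping the RocksDB failure
   */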
public static IOException toIOException(String msg, RocksDBException e) {
String statusCode = e.getStatus() == null ? "N/A" :
e.getStatus().getCodeString();
String errMessage = e.getMessage() == null ? "Unknown error" :
e.getMessage();
String output = msg + "; status : " + statusCode
+ "; message : " + errMessage;
return new IOException(output, e);
}
@Override
public void put(byte[] key, byte[] value) throws IOException {
try {
db.put(writeOptions, key, value);
} catch (RocksDBException e) {
throw toIOException("Failed to put key-value to metadata store", e);
}
}
@Override
public boolean isEmpty() throws IOException {
RocksIterator it = null;
try {
it = db.newIterator();
it.seekToFirst();
return !it.isValid();
} finally {
if (it != null) {
it.close();
}
}
}
@Override
public byte[] get(byte[] key) throws IOException {
try {
return db.get(key);
} catch (RocksDBException e) {
throw toIOException("Failed to get the value for the given key", e);
}
}
@Override
public void delete(byte[] key) throws IOException {
try {
db.delete(key);
} catch (RocksDBException e) {
throw toIOException("Failed to delete the given key", e);
}
}
@Override
public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
int count, MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
return getRangeKVs(startKey, count, false, filters);
}
@Override
public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
int count, MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
return getRangeKVs(startKey, count, true, filters);
}
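  /**
   * Iterates the DB from {@code startKey} (or from the first key when it is
   * null) and collects up to {@code count} entries that pass all the supplied
   * filters. Each filter is given the previous and next keys of the current
   * entry so it can make context-sensitive decisions. When {@code sequential}
   * is true, iteration stops at the first non-matching entry once at least
   * one match has been collected.
   */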
private List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
int count, boolean sequential,
MetadataKeyFilters.MetadataKeyFilter... filters)
throws IOException, IllegalArgumentException {
List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
long start = System.currentTimeMillis();
if (count < 0) {
      throw new IllegalArgumentException(
          "Invalid count given " + count + ", count must not be negative");
}
RocksIterator it = null;
try {
it = db.newIterator();
if (startKey == null) {
it.seekToFirst();
} else {
        if (get(startKey) == null) {
// Key not found, return empty list
return result;
}
it.seek(startKey);
}
      while (it.isValid() && result.size() < count) {
byte[] currentKey = it.key();
byte[] currentValue = it.value();
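        // Peek at the neighbouring keys for the filters, then re-seek to the
        // current key and advance so the next loop round starts at nextKey.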
it.prev();
final byte[] prevKey = it.isValid() ? it.key() : null;
it.seek(currentKey);
it.next();
final byte[] nextKey = it.isValid() ? it.key() : null;
if (filters == null) {
result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
currentValue));
} else {
          if (Arrays.stream(filters)
              .allMatch(entry -> entry.filterKey(prevKey,
                  currentKey, nextKey))) {
result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
currentValue));
} else {
            if (!result.isEmpty() && sequential) {
              // The caller asked for a sequential range of results and we
              // hit a mismatch, so abort the iteration here. If the result
              // list is still empty, keep looking for the first match.
break;
}
}
}
}
} finally {
if (it != null) {
it.close();
}
long end = System.currentTimeMillis();
long timeConsumed = end - start;
if (LOG.isDebugEnabled()) {
if (filters != null) {
for (MetadataKeyFilters.MetadataKeyFilter filter : filters) {
int scanned = filter.getKeysScannedNum();
int hinted = filter.getKeysHintedNum();
if (scanned > 0 || hinted > 0) {
              LOG.debug(
                  "getRangeKVs ({}) numOfKeysScanned={}, numOfKeysHinted={}",
                  filter.getClass().getSimpleName(), scanned, hinted);
}
}
}
LOG.debug("Time consumed for getRangeKVs() is {}ms,"
+ " result length is {}.", timeConsumed, result.size());
}
}
return result;
}
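  /**
   * Applies the queued PUT and DELETE operations atomically through a single
   * RocksDB {@link WriteBatch}.
   */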
@Override
public void writeBatch(BatchOperation operation)
throws IOException {
List<BatchOperation.SingleOperation> operations =
operation.getOperations();
if (!operations.isEmpty()) {
try (WriteBatch writeBatch = new WriteBatch()) {
for (BatchOperation.SingleOperation opt : operations) {
switch (opt.getOpt()) {
case DELETE:
writeBatch.delete(opt.getKey());
break;
case PUT:
writeBatch.put(opt.getKey(), opt.getValue());
break;
default:
throw new IllegalArgumentException("Invalid operation "
+ opt.getOpt());
}
}
db.write(writeOptions, writeBatch);
} catch (RocksDBException e) {
throw toIOException("Batch write operation failed", e);
}
}
}
@Override
public void compactDB() throws IOException {
if (db != null) {
try {
db.compactRange();
} catch (RocksDBException e) {
throw toIOException("Failed to compact db", e);
}
}
}
@Override
public void flushDB(boolean sync) throws IOException {
if (db != null) {
try {
        // For RocksDB it is sufficient to flush the WAL, as the entire db
        // can be reconstructed from it.
db.flushWal(sync);
} catch (RocksDBException e) {
throw toIOException("Failed to flush db", e);
}
}
}
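  /**
   * Deletes the given file or directory recursively, logging (rather than
   * propagating) any failure.
   */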
private void deleteQuietly(File fileOrDir) {
if (fileOrDir != null && fileOrDir.exists()) {
try {
FileUtils.forceDelete(fileOrDir);
} catch (IOException e) {
LOG.warn("Failed to delete dir {}", fileOrDir.getAbsolutePath(), e);
}
}
}
@Override
public void destroy() throws IOException {
// Make sure db is closed.
close();
    // There is no destroyDB Java API available, so instead we delete
    // all of the db directories.
deleteQuietly(dbLocation);
deleteQuietly(new File(dbOptions.dbLogDir()));
deleteQuietly(new File(dbOptions.walDir()));
List<DbPath> dbPaths = dbOptions.dbPaths();
if (dbPaths != null) {
dbPaths.forEach(dbPath -> {
deleteQuietly(new File(dbPath.toString()));
});
}
}
@Override
public ImmutablePair<byte[], byte[]> peekAround(int offset,
byte[] from) throws IOException, IllegalArgumentException {
RocksIterator it = null;
try {
it = db.newIterator();
if (from == null) {
it.seekToFirst();
} else {
it.seek(from);
}
if (!it.isValid()) {
return null;
}
switch (offset) {
case 0:
break;
case 1:
it.next();
break;
case -1:
it.prev();
break;
default:
        throw new IllegalArgumentException(
            "Position can only be -1, 0 or 1, but found " + offset);
}
return it.isValid() ? new ImmutablePair<>(it.key(), it.value()) : null;
} finally {
if (it != null) {
it.close();
}
}
}
@Override
public void iterate(byte[] from, EntryConsumer consumer)
throws IOException {
RocksIterator it = null;
try {
it = db.newIterator();
if (from != null) {
it.seek(from);
} else {
it.seekToFirst();
}
while (it.isValid()) {
if (!consumer.consume(it.key(), it.value())) {
break;
}
it.next();
}
} finally {
if (it != null) {
it.close();
}
}
}
@Override
public void close() throws IOException {
if (statMBeanName != null) {
MBeans.unregister(statMBeanName);
statMBeanName = null;
}
if (db != null) {
db.close();
}
}
@VisibleForTesting
protected ObjectName getStatMBeanName() {
return statMBeanName;
}
@Override
public MetaStoreIterator<KeyValue> iterator() {
return new RocksDBStoreIterator(db.newIterator());
}
}