/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.utils.db;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_CHECKPOINTS_DIR_NAME;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.utils.RocksDBStoreMBean;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.metrics2.util.MBeans;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.cache.TableCacheImpl;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TransactionLogIterator;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * RocksDB-backed {@link DBStore} implementation that supports creating and
 * looking up {@link Table}s (column families) in the DB.
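 * <p>
 * A minimal usage sketch. The directory, table name and option values are
 * illustrative only, and the snippet assumes the two-argument
 * {@code TableConfig(name, columnFamilyOptions)} constructor:
 * <pre>{@code
 *   Set<TableConfig> families = new HashSet<>();
 *   families.add(new TableConfig("default", new ColumnFamilyOptions()));
 *   try (DBStore store = new RDBStore(new File("/tmp/testdb"),
 *       new DBOptions().setCreateIfMissing(true), families)) {
 *     Table<byte[], byte[]> table = store.getTable("default");
 *     table.put(keyBytes, valueBytes); // keyBytes/valueBytes are byte[]
 *   }
 * }</pre>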
*/
public class RDBStore implements DBStore {
private static final Logger LOG =
LoggerFactory.getLogger(RDBStore.class);
private RocksDB db;
private File dbLocation;
private final WriteOptions writeOptions;
private final DBOptions dbOptions;
private final CodecRegistry codecRegistry;
private final Hashtable<String, ColumnFamilyHandle> handleTable;
private ObjectName statMBeanName;
private RDBCheckpointManager checkPointManager;
private String checkpointsParentDir;
private List<ColumnFamilyHandle> columnFamilyHandles;
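
  /**
   * Test-only convenience constructor that uses a default
   * {@link CodecRegistry}.
   */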
@VisibleForTesting
public RDBStore(File dbFile, DBOptions options,
Set<TableConfig> families) throws IOException {
this(dbFile, options, families, new CodecRegistry());
}
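
  /**
   * Opens (or creates) the RocksDB instance at the given location with the
   * given set of column families.
   *
   * @param dbFile   directory holding the RocksDB files.
   * @param options  DB-wide options; owned and eventually closed by this
   *                 store.
   * @param families column families (tables) to open; must be non-empty and
   *                 should include the RocksDB default column family.
   * @param registry codec registry used by the typed table views.
   * @throws IOException if RocksDB cannot be opened.
   */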
public RDBStore(File dbFile, DBOptions options, Set<TableConfig> families,
CodecRegistry registry)
throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
    Preconditions.checkNotNull(families, "Table families cannot be null");
    Preconditions.checkArgument(!families.isEmpty(),
        "At least one table family is required");
handleTable = new Hashtable<>();
codecRegistry = registry;
final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>();
columnFamilyHandles = new ArrayList<>();
for (TableConfig family : families) {
columnFamilyDescriptors.add(family.getDescriptor());
}
dbOptions = options;
dbLocation = dbFile;
    // TODO: Read WriteOptions from the configuration.
writeOptions = new WriteOptions();
try {
db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
for (int x = 0; x < columnFamilyHandles.size(); x++) {
handleTable.put(
DFSUtil.bytes2String(columnFamilyHandles.get(x).getName()),
columnFamilyHandles.get(x));
}
if (dbOptions.statistics() != null) {
Map<String, String> jmxProperties = new HashMap<>();
jmxProperties.put("dbName", dbFile.getName());
statMBeanName = HddsUtils.registerWithJmxProperties(
"Ozone", "RocksDbStore", jmxProperties,
RocksDBStoreMBean.create(dbOptions.statistics(),
dbFile.getName()));
if (statMBeanName == null) {
LOG.warn("jmx registration failed during RocksDB init, db path :{}",
dbFile.getAbsolutePath());
}
}
      // Create the checkpoints directory if it does not exist.
checkpointsParentDir = Paths.get(dbLocation.getParent(),
OM_DB_CHECKPOINTS_DIR_NAME).toString();
File checkpointsDir = new File(checkpointsParentDir);
if (!checkpointsDir.exists()) {
boolean success = checkpointsDir.mkdir();
if (!success) {
LOG.warn("Unable to create RocksDB checkpoint directory");
}
}
      // Initialize the checkpoint manager.
checkPointManager = new RDBCheckpointManager(db, "om");
} catch (RocksDBException e) {
      throw toIOException(
          "Failed to init RocksDB, db path: " + dbFile.getAbsolutePath(), e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("RocksDB successfully opened.");
LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
}
}
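
  /**
   * Wraps a {@link RocksDBException} in an {@link IOException}, preserving
   * the RocksDB status code and message in the new exception text.
   *
   * @param msg context message to prepend.
   * @param e   the underlying RocksDB exception.
   * @return an IOException with {@code e} as its cause.
   */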
public static IOException toIOException(String msg, RocksDBException e) {
String statusCode = e.getStatus() == null ? "N/A" :
e.getStatus().getCodeString();
String errMessage = e.getMessage() == null ? "Unknown error" :
e.getMessage();
String output = msg + "; status : " + statusCode
+ "; message : " + errMessage;
return new IOException(output, e);
}
@Override
public void compactDB() throws IOException {
if (db != null) {
try {
db.compactRange();
} catch (RocksDBException e) {
throw toIOException("Failed to compact db", e);
}
}
}
@Override
public void close() throws IOException {
for (final ColumnFamilyHandle handle : handleTable.values()) {
handle.close();
}
if (statMBeanName != null) {
MBeans.unregister(statMBeanName);
statMBeanName = null;
}
if (db != null) {
db.close();
}
if (dbOptions != null) {
dbOptions.close();
}
if (writeOptions != null) {
writeOptions.close();
}
}
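
  /**
   * Moves a key from the source table to the destination table. The put into
   * the destination and the delete from the source are committed in a single
   * write batch, so the move is atomic.
   */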
@Override
public <KEY, VALUE> void move(KEY key, Table<KEY, VALUE> source,
Table<KEY, VALUE> dest) throws IOException {
try (BatchOperation batchOperation = initBatchOperation()) {
VALUE value = source.get(key);
dest.putWithBatch(batchOperation, key, value);
source.deleteWithBatch(batchOperation, key);
commitBatchOperation(batchOperation);
}
}
@Override
public <KEY, VALUE> void move(KEY key, VALUE value, Table<KEY, VALUE> source,
Table<KEY, VALUE> dest) throws IOException {
move(key, key, value, source, dest);
}
@Override
public <KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
Table<KEY, VALUE> source,
Table<KEY, VALUE> dest) throws IOException {
try (BatchOperation batchOperation = initBatchOperation()) {
dest.putWithBatch(batchOperation, destKey, value);
source.deleteWithBatch(batchOperation, sourceKey);
commitBatchOperation(batchOperation);
}
}
@Override
public long getEstimatedKeyCount() throws IOException {
try {
return db.getLongProperty("rocksdb.estimate-num-keys");
} catch (RocksDBException e) {
throw toIOException("Unable to get the estimated count.", e);
}
}
@Override
public BatchOperation initBatchOperation() {
return new RDBBatchOperation();
}
@Override
public void commitBatchOperation(BatchOperation operation)
throws IOException {
((RDBBatchOperation) operation).commit(db, writeOptions);
}
@VisibleForTesting
protected ObjectName getStatMBeanName() {
return statMBeanName;
}
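
  /**
   * Returns the raw {@code byte[]} table backed by the named column family.
   *
   * @throws IOException if no column family with that name was opened.
   */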
@Override
public Table<byte[], byte[]> getTable(String name) throws IOException {
ColumnFamilyHandle handle = handleTable.get(name);
if (handle == null) {
throw new IOException("No such table in this DB. TableName : " + name);
}
return new RDBTable(this.db, handle, this.writeOptions);
}
@Override
public <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType) throws IOException {
return new TypedTable<KEY, VALUE>(getTable(name), codecRegistry, keyType,
valueType);
}
@Override
public <KEY, VALUE> Table<KEY, VALUE> getTable(String name,
Class<KEY> keyType, Class<VALUE> valueType,
TableCacheImpl.CacheCleanupPolicy cleanupPolicy) throws IOException {
return new TypedTable<KEY, VALUE>(getTable(name), codecRegistry, keyType,
valueType, cleanupPolicy);
}
@Override
public ArrayList<Table> listTables() throws IOException {
ArrayList<Table> returnList = new ArrayList<>();
for (ColumnFamilyHandle handle : handleTable.values()) {
returnList.add(new RDBTable(db, handle, writeOptions));
}
return returnList;
}
@Override
  public void flush() throws IOException {
    // FlushOptions is a native resource and must be closed.
    try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
      db.flush(flushOptions);
    } catch (RocksDBException e) {
      LOG.error("Unable to flush RocksDB data", e);
      throw toIOException("Unable to flush RocksDB data", e);
    }
  }
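
  /**
   * Creates an on-disk checkpoint of the DB under the checkpoints parent
   * directory. When {@code flush} is true, the memtables are flushed first
   * so that the checkpoint data is fully contained in SST files.
   */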
@Override
  public DBCheckpoint getCheckpoint(boolean flush) {
    if (flush) {
      try (FlushOptions flushOptions =
          new FlushOptions().setWaitForFlush(true)) {
        db.flush(flushOptions);
      } catch (RocksDBException e) {
        LOG.error("Unable to flush RocksDB data before creating checkpoint",
            e);
      }
    }
    return checkPointManager.createCheckpoint(checkpointsParentDir);
  }
@Override
public File getDbLocation() {
return dbLocation;
}
@Override
public Map<Integer, String> getTableNames() {
Map<Integer, String> tableNames = new HashMap<>();
StringCodec stringCodec = new StringCodec();
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
try {
tableNames.put(columnFamilyHandle.getID(), stringCodec
.fromPersistedFormat(columnFamilyHandle.getName()));
} catch (RocksDBException | IOException e) {
LOG.error("Unexpected exception while reading column family handle " +
"name", e);
}
}
return tableNames;
}
@Override
public CodecRegistry getCodecRegistry() {
return codecRegistry;
}
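
  /**
   * Returns the write batches applied after the given sequence number by
   * replaying the RocksDB write-ahead log (WAL).
   * <p>
   * A sketch of a caller, assuming {@code DBUpdatesWrapper} exposes the
   * collected batches via a {@code getData()} accessor:
   * <pre>{@code
   *   DBUpdatesWrapper updates = store.getUpdatesSince(lastAppliedSequence);
   *   for (byte[] batch : updates.getData()) {
   *     // replay the serialized WriteBatch on the consumer side
   *   }
   * }</pre>
   *
   * @throws SequenceNumberNotFoundException if the WAL no longer contains
   *         the batch immediately following {@code sequenceNumber}.
   */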
@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException {
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
    // The TransactionLogIterator is a native resource and must be closed.
    try (TransactionLogIterator transactionLogIterator =
        db.getUpdatesSince(sequenceNumber)) {
      // Only the first record needs to be checked: we can serve the request
      // from the WAL only if its sequence number is <= (1 + sequenceNumber).
      // For example, if the sequenceNumber passed in is 100, we can read from
      // the WAL only if the first sequence number is <= 101. If it is 102,
      // then 101 may already have been flushed to an SST file. If it is 99,
      // we skip 99 and 100 and read from 101 onwards.
boolean checkValidStartingSeqNumber = true;
while (transactionLogIterator.isValid()) {
TransactionLogIterator.BatchResult result =
transactionLogIterator.getBatch();
long currSequenceNumber = result.sequenceNumber();
if (checkValidStartingSeqNumber &&
currSequenceNumber > 1 + sequenceNumber) {
          throw new SequenceNumberNotFoundException("Unable to read data from" +
              " RocksDB WAL to get delta updates. It may have already been" +
              " flushed to SSTs.");
}
        // The first record passed the check; the flag can be cleared for the
        // remaining batches.
checkValidStartingSeqNumber = false;
if (currSequenceNumber <= sequenceNumber) {
transactionLogIterator.next();
continue;
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
transactionLogIterator.next();
}
} catch (RocksDBException e) {
LOG.error("Unable to get delta updates since sequenceNumber {} ",
sequenceNumber, e);
}
return dbUpdatesWrapper;
}
@VisibleForTesting
public RocksDB getDb() {
return db;
}
}