/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.hadoop.hdds.utils.db;

import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.RocksDBStoreMBean;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.metrics2.util.MBeans;

import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TransactionLogIterator;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* RocksDB Store that supports creating Tables in DB.
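 * <p>
 * A minimal usage sketch (the path, options, and "exampleTable" family are
 * hypothetical, and the test-visible constructor is used for brevity; when
 * opening an existing DB, RocksDB also expects the "default" column family
 * to be in the set):
 * <pre>{@code
 * Set<TableConfig> families = new HashSet<>();
 * families.add(new TableConfig("default",
 *     DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE.getColumnFamilyOptions()));
 * families.add(new TableConfig("exampleTable",
 *     DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE.getColumnFamilyOptions()));
 * DBOptions options = new DBOptions()
 *     .setCreateIfMissing(true)
 *     .setCreateMissingColumnFamilies(true);
 * try (DBStore store =
 *     new RDBStore(new File("/tmp/example-db"), options, families)) {
 *   Table<byte[], byte[]> table = store.getTable("exampleTable");
 *   table.put(StringUtils.string2Bytes("key"),
 *       StringUtils.string2Bytes("value"));
 * }
 * }</pre>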
*/
public class RDBStore implements DBStore {
private static final Logger LOG =
LoggerFactory.getLogger(RDBStore.class);
private RocksDB db;
private File dbLocation;
private final WriteOptions writeOptions;
private final DBOptions dbOptions;
private final CodecRegistry codecRegistry;
private final Map<String, ColumnFamilyHandle> handleTable;
private ObjectName statMBeanName;
private RDBCheckpointManager checkPointManager;
private String checkpointsParentDir;
private List<ColumnFamilyHandle> columnFamilyHandles;
  private RDBMetrics rdbMetrics;

@VisibleForTesting
public RDBStore(File dbFile, DBOptions options,
Set<TableConfig> families) throws IOException {
this(dbFile, options, new WriteOptions(), families, new CodecRegistry(),
false);
  }

public RDBStore(File dbFile, DBOptions options,
WriteOptions writeOptions, Set<TableConfig> families,
CodecRegistry registry, boolean readOnly)
throws IOException {
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
Preconditions.checkNotNull(families);
Preconditions.checkArgument(!families.isEmpty());
handleTable = new HashMap<>();
codecRegistry = registry;
final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>();
columnFamilyHandles = new ArrayList<>();
for (TableConfig family : families) {
columnFamilyDescriptors.add(family.getDescriptor());
}
dbOptions = options;
dbLocation = dbFile;
this.writeOptions = writeOptions;
try {
// This logic has been added to support old column families that have
// been removed, or those that may have been created in a future version.
// TODO : Revisit this logic during upgrade implementation.
List<TableConfig> columnFamiliesInDb = getColumnFamiliesInExistingDb();
List<TableConfig> extraCf = columnFamiliesInDb.stream().filter(
cf -> !families.contains(cf)).collect(Collectors.toList());
if (!extraCf.isEmpty()) {
LOG.info("Found the following extra column families in existing DB : " +
"{}", extraCf);
extraCf.forEach(cf -> columnFamilyDescriptors.add(cf.getDescriptor()));
}
if (readOnly) {
db = RocksDB.openReadOnly(dbOptions, dbLocation.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
} else {
db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);
}
      for (ColumnFamilyHandle handle : columnFamilyHandles) {
        handleTable.put(StringUtils.bytes2String(handle.getName()), handle);
      }
if (dbOptions.statistics() != null) {
Map<String, String> jmxProperties = new HashMap<>();
jmxProperties.put("dbName", dbFile.getName());
statMBeanName = HddsUtils.registerWithJmxProperties(
"Ozone", "RocksDbStore", jmxProperties,
RocksDBStoreMBean.create(dbOptions.statistics(),
dbFile.getName()));
if (statMBeanName == null) {
LOG.warn("jmx registration failed during RocksDB init, db path :{}",
dbFile.getAbsolutePath());
}
}
      // Create the checkpoints directory if it does not exist.
checkpointsParentDir =
Paths.get(dbLocation.getParent(), "db.checkpoints").toString();
File checkpointsDir = new File(checkpointsParentDir);
if (!checkpointsDir.exists()) {
boolean success = checkpointsDir.mkdir();
if (!success) {
LOG.warn("Unable to create RocksDB checkpoint directory");
}
}
      // Initialize the checkpoint manager.
checkPointManager = new RDBCheckpointManager(db, "rdb");
rdbMetrics = RDBMetrics.create();
} catch (RocksDBException e) {
String msg = "Failed init RocksDB, db path : " + dbFile.getAbsolutePath()
+ ", " + "exception :" + (e.getCause() == null ?
e.getClass().getCanonicalName() + " " + e.getMessage() :
e.getCause().getClass().getCanonicalName() + " " +
e.getCause().getMessage());
throw toIOException(msg, e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("RocksDB successfully opened.");
LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
}
  }

/**
* Read DB and return existing column families.
* @return List of column families
* @throws RocksDBException on Error.
*/
private List<TableConfig> getColumnFamiliesInExistingDb()
throws RocksDBException {
    final List<byte[]> bytes;
    // Use try-with-resources so the native Options handle is released.
    try (Options options = new Options()) {
      bytes = RocksDB.listColumnFamilies(options,
          dbLocation.getAbsolutePath());
    }
List<TableConfig> columnFamiliesInDb = bytes.stream()
.map(cfbytes -> new TableConfig(StringUtils.bytes2String(cfbytes),
DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE.getColumnFamilyOptions()))
.collect(Collectors.toList());
if (LOG.isDebugEnabled()) {
LOG.debug("Found column Families in DB : {}",
columnFamiliesInDb);
}
return columnFamiliesInDb;
  }

public static IOException toIOException(String msg, RocksDBException e) {
String statusCode = e.getStatus() == null ? "N/A" :
e.getStatus().getCodeString();
String errMessage = e.getMessage() == null ? "Unknown error" :
e.getMessage();
String output = msg + "; status : " + statusCode
+ "; message : " + errMessage;
return new IOException(output, e);
  }

@Override
public void compactDB() throws IOException {
if (db != null) {
try {
db.compactRange();
} catch (RocksDBException e) {
throw toIOException("Failed to compact db", e);
}
}
  }

@Override
public void close() throws IOException {
for (final ColumnFamilyHandle handle : handleTable.values()) {
handle.close();
}
if (statMBeanName != null) {
MBeans.unregister(statMBeanName);
statMBeanName = null;
}
RDBMetrics.unRegister();
if (db != null) {
db.close();
}
if (dbOptions != null) {
dbOptions.close();
}
if (writeOptions != null) {
writeOptions.close();
}
}
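
  /**
   * The move overloads below commit the put into {@code dest} and the delete
   * from {@code source} as a single write batch, so the two changes are
   * applied atomically.
   */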
@Override
public <K, V> void move(K key, Table<K, V> source,
Table<K, V> dest) throws IOException {
try (BatchOperation batchOperation = initBatchOperation()) {
V value = source.get(key);
dest.putWithBatch(batchOperation, key, value);
source.deleteWithBatch(batchOperation, key);
commitBatchOperation(batchOperation);
}
  }

@Override
public <K, V> void move(K key, V value, Table<K, V> source,
Table<K, V> dest) throws IOException {
move(key, key, value, source, dest);
  }

@Override
public <K, V> void move(K sourceKey, K destKey, V value,
Table<K, V> source,
Table<K, V> dest) throws IOException {
try (BatchOperation batchOperation = initBatchOperation()) {
dest.putWithBatch(batchOperation, destKey, value);
source.deleteWithBatch(batchOperation, sourceKey);
commitBatchOperation(batchOperation);
}
  }

@Override
public long getEstimatedKeyCount() throws IOException {
try {
return db.getLongProperty("rocksdb.estimate-num-keys");
} catch (RocksDBException e) {
throw toIOException("Unable to get the estimated count.", e);
}
  }

@Override
public BatchOperation initBatchOperation() {
return new RDBBatchOperation();
}
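
  /**
   * Commits a batch created via {@link #initBatchOperation()}. A sketch of
   * the intended pattern (the table and key names are hypothetical):
   * <pre>{@code
   * try (BatchOperation batch = store.initBatchOperation()) {
   *   table.putWithBatch(batch, key, value);
   *   table.deleteWithBatch(batch, staleKey);
   *   store.commitBatchOperation(batch);
   * }
   * }</pre>
   */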
@Override
public void commitBatchOperation(BatchOperation operation)
throws IOException {
((RDBBatchOperation) operation).commit(db, writeOptions);
  }

@VisibleForTesting
protected ObjectName getStatMBeanName() {
return statMBeanName;
  }

@Override
public Table<byte[], byte[]> getTable(String name) throws IOException {
ColumnFamilyHandle handle = handleTable.get(name);
if (handle == null) {
throw new IOException("No such table in this DB. TableName : " + name);
}
return new RDBTable(this.db, handle, this.writeOptions, rdbMetrics);
}
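
  /**
   * A typed view over a raw table; keys and values are converted through the
   * {@link CodecRegistry}. A sketch, assuming codecs for String are
   * registered (the default CodecRegistry registers them) and a hypothetical
   * "exampleTable" exists:
   * <pre>{@code
   * Table<String, String> table =
   *     store.getTable("exampleTable", String.class, String.class);
   * table.put("volumeKey", "volumeInfo");
   * }</pre>
   */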
@Override
public <K, V> Table<K, V> getTable(String name,
Class<K> keyType, Class<V> valueType) throws IOException {
return new TypedTable<>(getTable(name), codecRegistry, keyType,
valueType);
  }

@Override
public <K, V> Table<K, V> getTable(String name,
Class<K> keyType, Class<V> valueType,
TableCache.CacheType cacheType) throws IOException {
return new TypedTable<>(getTable(name), codecRegistry, keyType,
valueType, cacheType);
  }

@Override
public ArrayList<Table> listTables() {
ArrayList<Table> returnList = new ArrayList<>();
for (ColumnFamilyHandle handle : handleTable.values()) {
returnList.add(new RDBTable(db, handle, writeOptions, rdbMetrics));
}
return returnList;
  }

@Override
public void flushDB() throws IOException {
try (FlushOptions flushOptions = new FlushOptions()) {
flushOptions.setWaitForFlush(true);
db.flush(flushOptions);
} catch (RocksDBException e) {
throw toIOException("Unable to Flush RocksDB data", e);
}
  }

@Override
public void flushLog(boolean sync) throws IOException {
if (db != null) {
try {
        // For RocksDB it is sufficient to flush the WAL, as the entire DB
        // can be reconstructed from it.
db.flushWal(sync);
} catch (RocksDBException e) {
throw toIOException("Failed to flush db", e);
}
}
}
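
  /**
   * Creates a RocksDB checkpoint under the {@code db.checkpoints} directory
   * next to the DB location. When {@code flush} is true, memtables are
   * flushed first so all committed data is in SST files before the
   * checkpoint is taken.
   */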
@Override
public DBCheckpoint getCheckpoint(boolean flush) throws IOException {
if (flush) {
this.flushDB();
}
return checkPointManager.createCheckpoint(checkpointsParentDir);
  }

@Override
public File getDbLocation() {
return dbLocation;
  }

@Override
public Map<Integer, String> getTableNames() {
Map<Integer, String> tableNames = new HashMap<>();
StringCodec stringCodec = new StringCodec();
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
try {
tableNames.put(columnFamilyHandle.getID(), stringCodec
.fromPersistedFormat(columnFamilyHandle.getName()));
} catch (RocksDBException | IOException e) {
LOG.error("Unexpected exception while reading column family handle " +
"name", e);
}
}
return tableNames;
  }

@Override
public CodecRegistry getCodecRegistry() {
return codecRegistry;
}
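
  /**
   * Returns WAL entries with sequence numbers strictly greater than
   * {@code sequenceNumber}. If the WAL no longer starts at or before
   * {@code sequenceNumber + 1}, the requested updates may already have been
   * flushed to SST files, and a {@link SequenceNumberNotFoundException} is
   * thrown.
   */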
@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException {
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
    // Use try-with-resources so the iterator's native handle is released.
    try (TransactionLogIterator transactionLogIterator =
        db.getUpdatesSince(sequenceNumber)) {
      // Only the first record needs to be checked, i.e. whether its sequence
      // number is < (1 + passed_in_sequence_number). For example, if the
      // sequence number passed in is 100, then we can read from the WAL ONLY
      // if the first sequence number is <= 101. If it is 102, then 101 may
      // already have been flushed to an SST file. If it is 99, we can skip
      // 99 and 100, and then read from 101.
boolean checkValidStartingSeqNumber = true;
while (transactionLogIterator.isValid()) {
TransactionLogIterator.BatchResult result =
transactionLogIterator.getBatch();
long currSequenceNumber = result.sequenceNumber();
if (checkValidStartingSeqNumber &&
currSequenceNumber > 1 + sequenceNumber) {
          throw new SequenceNumberNotFoundException("Unable to read data"
              + " from RocksDB WAL to get delta updates. It may have already"
              + " been flushed to SST files.");
}
        // The first record has passed the check; later records need not be
        // validated.
checkValidStartingSeqNumber = false;
if (currSequenceNumber <= sequenceNumber) {
transactionLogIterator.next();
continue;
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
transactionLogIterator.next();
}
} catch (RocksDBException e) {
LOG.error("Unable to get delta updates since sequenceNumber {} ",
sequenceNumber, e);
}
return dbUpdatesWrapper;
  }

@VisibleForTesting
public RocksDB getDb() {
return db;
  }

public RDBMetrics getMetrics() {
return rdbMetrics;
}
}