/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdds.utils.db;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.utils.RocksDBStoreMBean;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedTransactionLogIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
import org.apache.hadoop.metrics2.util.MBeans;
import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.rocksdb.RocksDBException;
import org.rocksdb.TransactionLogIterator.BatchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * RocksDB store that supports creating tables (backed by RocksDB column
 * families) in the DB.
 */
public class RDBStore implements DBStore {
private static final Logger LOG =
LoggerFactory.getLogger(RDBStore.class);
private final RocksDatabase db;
private final File dbLocation;
private final CodecRegistry codecRegistry;
private ObjectName statMBeanName;
private final RDBCheckpointManager checkPointManager;
private final String checkpointsParentDir;
private final RDBMetrics rdbMetrics;
private final String dbJmxBeanName;
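  /**
   * Test-only convenience constructor: opens the DB read-write with default
   * write options and a fresh {@link CodecRegistry}.
   */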
@VisibleForTesting
public RDBStore(File dbFile, ManagedDBOptions options,
Set<TableConfig> families) throws IOException {
this(dbFile, options, new ManagedWriteOptions(), families,
new CodecRegistry(), false, null);
}
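  /**
   * Opens (or creates) a RocksDB instance at the given location.
   *
   * @param dbFile        directory holding the RocksDB files
   * @param dbOptions     RocksDB options; if statistics are enabled, they are
   *                      exposed through a JMX MBean
   * @param writeOptions  default write options for this store
   * @param families      column families (tables) to open; must be non-empty
   * @param registry      codec registry used by typed tables
   * @param readOnly      whether to open the DB in read-only mode
   * @param dbJmxBeanName name for the JMX MBean; defaults to the DB file name
   *                      when null
   */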
  public RDBStore(File dbFile, ManagedDBOptions dbOptions,
      ManagedWriteOptions writeOptions, Set<TableConfig> families,
      CodecRegistry registry, boolean readOnly,
      String dbJmxBeanName) throws IOException {
    Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
    Preconditions.checkNotNull(families, "Table families cannot be null");
    Preconditions.checkArgument(!families.isEmpty(),
        "Table families cannot be empty");
    codecRegistry = registry;
    dbLocation = dbFile;
    this.dbJmxBeanName = dbJmxBeanName == null
        ? dbFile.getName() : dbJmxBeanName;
try {
db = RocksDatabase.open(dbFile, dbOptions, writeOptions,
families, readOnly);
if (dbOptions.statistics() != null) {
Map<String, String> jmxProperties = new HashMap<>();
jmxProperties.put("dbName", dbJmxBeanName);
statMBeanName = HddsUtils.registerWithJmxProperties(
"Ozone", "RocksDbStore", jmxProperties,
RocksDBStoreMBean.create(dbOptions.statistics(), dbJmxBeanName));
        if (statMBeanName == null) {
          LOG.warn("JMX registration failed during RocksDB init, db path: {}",
              dbJmxBeanName);
        } else {
          LOG.debug("JMX registration succeeded during RocksDB init, "
              + "db path: {}", dbJmxBeanName);
        }
}
      // Create the checkpoints directory if it does not exist.
      checkpointsParentDir =
          Paths.get(dbLocation.getParent(), "db.checkpoints").toString();
      File checkpointsDir = new File(checkpointsParentDir);
      if (!checkpointsDir.exists()) {
        boolean success = checkpointsDir.mkdir();
        if (!success) {
          LOG.warn("Unable to create RocksDB checkpoint directory: {}",
              checkpointsParentDir);
        }
      }
      // Initialize the checkpoint manager and metrics.
checkPointManager = new RDBCheckpointManager(db, dbLocation.getName());
rdbMetrics = RDBMetrics.create();
} catch (IOException e) {
      String msg = "Failed to init RocksDB, db path: "
          + dbFile.getAbsolutePath() + ", exception: "
          + (e.getCause() == null
          ? e.getClass().getCanonicalName() + " " + e.getMessage()
          : e.getCause().getClass().getCanonicalName() + " "
              + e.getCause().getMessage());
throw new IOException(msg, e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("RocksDB successfully opened.");
LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
LOG.debug("[Option] createIfMissing = {}", dbOptions.createIfMissing());
LOG.debug("[Option] maxOpenFiles= {}", dbOptions.maxOpenFiles());
}
}
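  /** Triggers a RocksDB compaction over the full key range. */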
@Override
public void compactDB() throws IOException {
db.compactRange();
}
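  /**
   * Unregisters the statistics MBean and RDB metrics, then closes the
   * checkpoint manager and the underlying RocksDB instance.
   */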
@Override
public void close() throws IOException {
if (statMBeanName != null) {
MBeans.unregister(statMBeanName);
statMBeanName = null;
}
RDBMetrics.unRegister();
checkPointManager.close();
db.close();
}
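  /**
   * Moves the value stored under {@code key} from the source table to the
   * destination table; the put and delete are committed in one atomic batch.
   * The caller should ensure the key exists in the source table.
   */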
@Override
public <K, V> void move(K key, Table<K, V> source,
Table<K, V> dest) throws IOException {
try (BatchOperation batchOperation = initBatchOperation()) {
V value = source.get(key);
dest.putWithBatch(batchOperation, key, value);
source.deleteWithBatch(batchOperation, key);
commitBatchOperation(batchOperation);
}
}
@Override
public <K, V> void move(K key, V value, Table<K, V> source,
Table<K, V> dest) throws IOException {
move(key, key, value, source, dest);
}
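  /**
   * Writes {@code value} under {@code destKey} in the destination table and
   * deletes {@code sourceKey} from the source table in one atomic batch.
   */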
@Override
public <K, V> void move(K sourceKey, K destKey, V value,
Table<K, V> source,
Table<K, V> dest) throws IOException {
try (BatchOperation batchOperation = initBatchOperation()) {
dest.putWithBatch(batchOperation, destKey, value);
source.deleteWithBatch(batchOperation, sourceKey);
commitBatchOperation(batchOperation);
}
}
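  /**
   * Returns RocksDB's estimate of the key count; the value is approximate
   * and may differ from the exact number of keys.
   */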
@Override
public long getEstimatedKeyCount() throws IOException {
return db.estimateNumKeys();
}
@Override
public BatchOperation initBatchOperation() {
return new RDBBatchOperation();
}
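  /** Atomically applies all operations staged in the given batch to the DB. */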
@Override
public void commitBatchOperation(BatchOperation operation)
throws IOException {
((RDBBatchOperation) operation).commit(db);
}
@VisibleForTesting
protected ObjectName getStatMBeanName() {
return statMBeanName;
}
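  /**
   * Returns a raw byte[]-keyed view of the column family backing the given
   * table name.
   */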
@Override
public Table<byte[], byte[]> getTable(String name) throws IOException {
final ColumnFamily handle = db.getColumnFamily(name);
if (handle == null) {
throw new IOException("No such table in this DB. TableName : " + name);
}
return new RDBTable(this.db, handle, rdbMetrics);
}
@Override
public <K, V> Table<K, V> getTable(String name,
Class<K> keyType, Class<V> valueType) throws IOException {
return new TypedTable<>(getTable(name), codecRegistry, keyType,
valueType);
}
@Override
public <K, V> Table<K, V> getTable(String name,
Class<K> keyType, Class<V> valueType,
TableCache.CacheType cacheType) throws IOException {
return new TypedTable<>(getTable(name), codecRegistry, keyType,
valueType, cacheType);
}
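  /** Returns a raw table view for each column family in this store. */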
@Override
public ArrayList<Table> listTables() {
ArrayList<Table> returnList = new ArrayList<>();
for (ColumnFamily family : getColumnFamilies()) {
returnList.add(new RDBTable(db, family, rdbMetrics));
}
return returnList;
}
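  /** Flushes all in-memory memtables to SST files on disk. */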
@Override
public void flushDB() throws IOException {
db.flush();
}
@Override
public void flushLog(boolean sync) throws IOException {
    // For RocksDB, flushing the WAL is sufficient: the entire DB can be
    // reconstructed from it.
db.flushWal(sync);
}
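  /**
   * Creates an on-disk checkpoint under the db.checkpoints directory. When
   * {@code flush} is true, memtables are flushed first so that the checkpoint
   * captures all in-memory state.
   */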
@Override
public DBCheckpoint getCheckpoint(boolean flush) throws IOException {
if (flush) {
this.flushDB();
}
return checkPointManager.createCheckpoint(checkpointsParentDir);
}
@Override
public File getDbLocation() {
return dbLocation;
}
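  /** Returns a mapping from column family ID to column family (table) name. */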
@Override
public Map<Integer, String> getTableNames() {
Map<Integer, String> tableNames = new HashMap<>();
StringCodec stringCodec = new StringCodec();
for (ColumnFamily columnFamily : getColumnFamilies()) {
tableNames.put(columnFamily.getID(), columnFamily.getName(stringCodec));
}
return tableNames;
}
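  /** Returns the column families exposed as tables by this store. */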
public Collection<ColumnFamily> getColumnFamilies() {
return db.getExtraColumnFamilies();
}
@Override
public CodecRegistry getCodecRegistry() {
return codecRegistry;
}
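  /**
   * Returns all WAL write batches newer than the given sequence number,
   * with no effective limit on the number of entries returned.
   */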
@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException {
return getUpdatesSince(sequenceNumber, Long.MAX_VALUE);
}
@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws SequenceNumberNotFoundException {
    if (limitCount <= 0) {
      throw new IllegalArgumentException(
          "Illegal count for getUpdatesSince: " + limitCount);
    }
DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
try (ManagedTransactionLogIterator logIterator =
db.getUpdatesSince(sequenceNumber)) {
// If Recon's sequence number is out-of-date and the iterator is invalid,
// throw SNNFE and let Recon fall back to full snapshot.
// This could happen after OM restart.
if (db.getLatestSequenceNumber() != sequenceNumber &&
!logIterator.get().isValid()) {
throw new SequenceNumberNotFoundException(
"Invalid transaction log iterator when getting updates since "
+ "sequence number " + sequenceNumber);
}
      // Only the first record's sequence number needs to be checked against
      // (1 + the passed-in sequence number). For example, if the sequence
      // number passed in is 100, we can read from the WAL only if the first
      // sequence number is <= 101. If it is 102, then 101 may already have
      // been flushed to an SST file. If it is 99, we skip 99 and 100 and
      // start reading from 101.
boolean checkValidStartingSeqNumber = true;
while (logIterator.get().isValid()) {
BatchResult result = logIterator.get().getBatch();
try {
long currSequenceNumber = result.sequenceNumber();
if (checkValidStartingSeqNumber &&
currSequenceNumber > 1 + sequenceNumber) {
throw new SequenceNumberNotFoundException("Unable to read data from"
+ " RocksDB wal to get delta updates. It may have already been"
+ " flushed to SSTs.");
}
          // The starting sequence number has been validated; no need to
          // check subsequent records.
checkValidStartingSeqNumber = false;
if (currSequenceNumber <= sequenceNumber) {
logIterator.get().next();
continue;
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
if (currSequenceNumber - sequenceNumber >= limitCount) {
break;
}
} finally {
result.writeBatch().close();
}
logIterator.get().next();
}
} catch (SequenceNumberNotFoundException e) {
LOG.warn("Unable to get delta updates since sequenceNumber {}. "
+ "This exception will be thrown to the client",
sequenceNumber, e);
// Throw the exception back to Recon. Expect Recon to fall back to
// full snapshot.
throw e;
} catch (RocksDBException | IOException e) {
LOG.error("Unable to get delta updates since sequenceNumber {}. "
+ "This exception will not be thrown to the client ",
sequenceNumber, e);
}
dbUpdatesWrapper.setLatestSequenceNumber(db.getLatestSequenceNumber());
return dbUpdatesWrapper;
}
@Override
public boolean isClosed() {
return db.isClosed();
}
@VisibleForTesting
public RocksDatabase getDb() {
return db;
}
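  /**
   * Returns the value of the given RocksDB property (e.g. "rocksdb.stats")
   * for the default column family; the overload below queries a specific
   * column family.
   */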
public String getProperty(String property) throws IOException {
return db.getProperty(property);
}
public String getProperty(ColumnFamily family, String property)
throws IOException {
return db.getProperty(family, property);
}
public RDBMetrics getMetrics() {
return rdbMetrics;
}
}