/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.container.metadata;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
import org.apache.hadoop.hdds.utils.db.DBProfile;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedStatistics;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.utils.db.DatanodeDBProfile;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.StatsLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.NoSuchElementException;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.HDDS_DEFAULT_DB_PROFILE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;

/**
 * Implementation of the {@link DatanodeStore} interface that contains
 * functionality common to all derived datanode store implementations.
 */
public abstract class AbstractDatanodeStore implements DatanodeStore {

  private Table<String, Long> metadataTable;
  private Table<String, BlockData> blockDataTable;
  private Table<String, BlockData> blockDataTableWithIterator;
  private Table<String, ChunkInfoList> deletedBlocksTable;

  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractDatanodeStore.class);

  private volatile DBStore store;
  private final AbstractDatanodeDBDefinition dbDef;
  private final ManagedColumnFamilyOptions cfOptions;
  private static DatanodeDBProfile dbProfile;
  private final boolean openReadOnly;

  /**
   * Constructs the metadata store and starts the DB services.
   *
   * @param config - Ozone Configuration.
   * @param dbDef - DB definition describing this store's tables.
   * @param openReadOnly - whether to open the store in read-only mode.
   * @throws IOException - on Failure.
   */
  protected AbstractDatanodeStore(ConfigurationSource config,
      AbstractDatanodeDBDefinition dbDef, boolean openReadOnly)
      throws IOException {
    dbProfile = DatanodeDBProfile
        .getProfile(config.getEnum(HDDS_DB_PROFILE, HDDS_DEFAULT_DB_PROFILE));

    // The same config instance is used on each datanode, so we can share the
    // corresponding column family options, providing a single shared cache
    // for all containers on a datanode.
    cfOptions = dbProfile.getColumnFamilyOptions(config);

    this.dbDef = dbDef;
    this.openReadOnly = openReadOnly;
    start(config);
  }
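
  /**
   * Opens the underlying RocksDB-backed store if it has not been opened yet;
   * calling this on an already started store is a no-op.
   */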
  @Override
  public void start(ConfigurationSource config)
      throws IOException {
    if (this.store == null) {
      ManagedDBOptions options = dbProfile.getDBOptions();
      options.setCreateIfMissing(true);
      options.setCreateMissingColumnFamilies(true);
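
      // Schema V1 and V2 keep a separate RocksDB instance per container, so
      // cap the total WAL size to bound the per-DB overhead.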
      if (this.dbDef instanceof DatanodeSchemaOneDBDefinition ||
          this.dbDef instanceof DatanodeSchemaTwoDBDefinition) {
        long maxWalSize = DBProfile.toLong(StorageUnit.MB.toBytes(2));
        options.setMaxTotalWalSize(maxWalSize);
      }
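
      // Collect RocksDB statistics at the configured level unless statistics
      // collection is explicitly turned off.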
      String rocksDbStat = config.getTrimmed(
          OZONE_METADATA_STORE_ROCKSDB_STATISTICS,
          OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT);
      if (!rocksDbStat.equals(OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF)) {
        ManagedStatistics statistics = new ManagedStatistics();
        statistics.setStatsLevel(StatsLevel.valueOf(rocksDbStat));
        options.setStatistics(statistics);
      }
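
      // Schema V3 keeps all containers of a volume in a single RocksDB
      // instance, so it gets additional info-log tuning and a per-volume
      // JMX bean name.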
      if (this.dbDef instanceof DatanodeSchemaThreeDBDefinition) {
        DatanodeConfiguration dc =
            config.getObject(DatanodeConfiguration.class);
        // Configure the RocksDB info log files from user settings.
        InfoLogLevel level = InfoLogLevel.valueOf(
            dc.getRocksdbLogLevel() + "_LEVEL");
        options.setInfoLogLevel(level);
        options.setMaxLogFileSize(dc.getRocksdbMaxFileSize());
        options.setKeepLogFileNum(dc.getRocksdbMaxFileNum());
        options.setDeleteObsoleteFilesPeriodMicros(
            dc.getRocksdbDeleteObsoleteFilesPeriod());

        // For V3, every RocksDB directory has the same "container.db" name,
        // so use parentDirName (the storage UUID) plus dbDirName as the DB
        // metrics name.
        this.store = DBStoreBuilder.newBuilder(config, dbDef)
            .setDBOptions(options)
            .setDefaultCFOptions(cfOptions)
            .setOpenReadOnly(openReadOnly)
            .setDBJmxBeanNameName(dbDef.getDBLocation(config).getName() + "-" +
                dbDef.getName())
            .build();
      } else {
        this.store = DBStoreBuilder.newBuilder(config, dbDef)
            .setDBOptions(options)
            .setDefaultCFOptions(cfOptions)
            .setOpenReadOnly(openReadOnly)
            .build();
      }

      // Use the DatanodeTable wrapper to disable the table iterator on
      // existing Table implementations retrieved from the DBDefinition.
      // See the DatanodeTable's Javadoc for an explanation of why this is
      // necessary.
      metadataTable = new DatanodeTable<>(
          dbDef.getMetadataColumnFamily().getTable(this.store));
      checkTableStatus(metadataTable, metadataTable.getName());

      // The block iterator this class returns will need to use the table
      // iterator internally, so construct a block data table instance
      // that does not have the iterator disabled by DatanodeTable.
      blockDataTableWithIterator =
          dbDef.getBlockDataColumnFamily().getTable(this.store);

      blockDataTable = new DatanodeTable<>(blockDataTableWithIterator);
      checkTableStatus(blockDataTable, blockDataTable.getName());

      deletedBlocksTable = new DatanodeTable<>(
          dbDef.getDeletedBlocksColumnFamily().getTable(this.store));
      checkTableStatus(deletedBlocksTable, deletedBlocksTable.getName());
    }
  }

  @Override
  public synchronized void stop() throws Exception {
    if (store != null) {
      store.close();
      store = null;
    }
  }

  @Override
  public DBStore getStore() {
    return this.store;
  }

  @Override
  public BatchOperationHandler getBatchHandler() {
    return this.store;
  }

  @Override
  public Table<String, Long> getMetadataTable() {
    return metadataTable;
  }

  @Override
  public Table<String, BlockData> getBlockDataTable() {
    return blockDataTable;
  }

  @Override
  public Table<String, ChunkInfoList> getDeletedBlocksTable() {
    return deletedBlocksTable;
  }

  @Override
  public BlockIterator<BlockData> getBlockIterator(long containerID)
      throws IOException {
    return new KeyValueBlockIterator(containerID,
        blockDataTableWithIterator.iterator());
  }

  @Override
  public BlockIterator<BlockData> getBlockIterator(long containerID,
      KeyPrefixFilter filter) throws IOException {
    return new KeyValueBlockIterator(containerID,
        blockDataTableWithIterator.iterator(), filter);
  }

  @Override
  public synchronized boolean isClosed() {
    if (this.store == null) {
      return true;
    }
    return this.store.isClosed();
  }
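
  /**
   * Closes the store and releases the shared column family options. Unlike
   * {@link #stop()}, this does not reset the store reference, so a later
   * call to {@link #start} will not reopen the store.
   */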
  @Override
  public void close() throws IOException {
    this.store.close();
    this.cfOptions.close();
  }

  @Override
  public void flushDB() throws IOException {
    store.flushDB();
  }

  @Override
  public void flushLog(boolean sync) throws IOException {
    store.flushLog(sync);
  }

  @Override
  public void compactDB() throws IOException {
    store.compactDB();
  }

  @VisibleForTesting
  public DatanodeDBProfile getDbProfile() {
    return dbProfile;
  }

  protected AbstractDatanodeDBDefinition getDbDef() {
    return this.dbDef;
  }

  protected Table<String, BlockData> getBlockDataTableWithIterator() {
    return this.blockDataTableWithIterator;
  }
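
  /**
   * Verifies that the given table handle was created successfully, logging
   * and throwing an IOException if the reference is null.
   */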
  private static void checkTableStatus(Table<?, ?> table, String name)
      throws IOException {
    String logMessage = "Unable to get a reference to %s table. Cannot " +
        "continue.";
    String errMsg = "Inconsistent DB state, Table - %s. Please check the" +
        " logs for more info.";
    if (table == null) {
      LOG.error(String.format(logMessage, name));
      throw new IOException(String.format(errMsg, name));
    }
  }

  /**
   * Block Iterator for KeyValue Container. This block iterator returns
   * blocks that match the {@link MetadataKeyFilters.KeyPrefixFilter}. If no
   * filter is specified, the default filter
   * {@link MetadataKeyFilters#getUnprefixedKeyFilter()} is used.
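   *
   * <p>Illustrative usage via {@link DatanodeStore#getBlockIterator(long,
   * KeyPrefixFilter)}; {@code store}, {@code containerID} and {@code filter}
   * below are placeholder names, not members of this class:
   * <pre>{@code
   * BlockIterator<BlockData> iter =
   *     store.getBlockIterator(containerID, filter);
   * while (iter.hasNext()) {
   *   BlockData block = iter.nextBlock();
   *   // process block...
   * }
   * }</pre>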
   */
  @InterfaceAudience.Public
  public static class KeyValueBlockIterator implements
      BlockIterator<BlockData>, Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(
        KeyValueBlockIterator.class);
    private static final KeyPrefixFilter DEFAULT_BLOCK_FILTER =
        MetadataKeyFilters.getUnprefixedKeyFilter();

    private final TableIterator<String, ? extends Table.KeyValue<String,
        BlockData>> blockIterator;
    private final KeyPrefixFilter blockFilter;
    private BlockData nextBlock;
    private final long containerID;

    /**
     * KeyValueBlockIterator to iterate unprefixed blocks in a container.
     * @param containerID - ID of the container whose blocks are iterated.
     * @param iterator - The underlying iterator to apply the block filter to.
     */
    KeyValueBlockIterator(long containerID,
        TableIterator<String, ? extends Table.KeyValue<String, BlockData>>
            iterator) {
      this.containerID = containerID;
      this.blockIterator = iterator;
      this.blockFilter = DEFAULT_BLOCK_FILTER;
    }

    /**
     * KeyValueBlockIterator to iterate blocks in a container.
     * @param containerID - ID of the container whose blocks are iterated.
     * @param iterator - The underlying iterator to apply the block filter to.
     * @param filter - Block filter to be applied to the blocks.
     */
    KeyValueBlockIterator(long containerID,
        TableIterator<String, ? extends Table.KeyValue<String, BlockData>>
            iterator, KeyPrefixFilter filter) {
      this.containerID = containerID;
      this.blockIterator = iterator;
      this.blockFilter = filter;
    }

    /**
     * Returns the next block matching the filter.
     * @return the next matching block
     * @throws NoSuchElementException if the iterator has no more blocks
     * @throws IOException on failure to read from the underlying iterator
     */
    @Override
    public BlockData nextBlock() throws IOException, NoSuchElementException {
      if (nextBlock != null) {
        BlockData currentBlock = nextBlock;
        nextBlock = null;
        return currentBlock;
      }
      if (hasNext()) {
        return nextBlock();
      }
      throw new NoSuchElementException("Block Iterator reached end for " +
          "ContainerID " + containerID);
    }
    @Override
    public boolean hasNext() throws IOException {
      if (nextBlock != null) {
        return true;
      }
      while (blockIterator.hasNext()) {
        Table.KeyValue<String, BlockData> keyValue = blockIterator.next();
        byte[] keyBytes = StringUtils.string2Bytes(keyValue.getKey());
        if (blockFilter.filterKey(null, keyBytes, null)) {
          nextBlock = keyValue.getValue();
          if (LOG.isTraceEnabled()) {
            LOG.trace("Block matching the filter found: blockID is {} for " +
                "containerID {}", nextBlock.getLocalID(), containerID);
          }
          return true;
        }
      }
      return false;
    }

    @Override
    public void seekToFirst() {
      nextBlock = null;
      blockIterator.seekToFirst();
    }

    @Override
    public void seekToLast() {
      nextBlock = null;
      blockIterator.seekToLast();
    }

    @Override
    public void close() throws IOException {
      blockIterator.close();
    }
  }
}