blob: 4272861c57e2dfe5e7507f9fb74999d7cacc8a15 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.keyvalue.impl;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.hdds.utils.BatchOperation;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
/**
* This class is for performing block related operations on the KeyValue
* Container.
*/
/**
 * This class is for performing block related operations on the KeyValue
 * Container.
 */
public class BlockManagerImpl implements BlockManager {

  static final Logger LOG = LoggerFactory.getLogger(BlockManagerImpl.class);

  // DB key under which a container's highest committed block commit
  // sequence id (BCSId) is persisted alongside the block entries.
  private static final byte[] BLOCK_COMMIT_SEQUENCE_ID_KEY =
      DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);

  private final Configuration config;

  /**
   * Constructs a Block Manager.
   *
   * @param conf - Ozone configuration
   */
  public BlockManagerImpl(Configuration conf) {
    Preconditions.checkNotNull(conf, "Config cannot be null");
    this.config = conf;
  }

  /**
   * Puts or overwrites a block.
   *
   * @param container - Container for which block need to be added.
   * @param data - BlockData.
   * @return length of the block.
   * @throws IOException if the DB write fails.
   */
  @Override
  public long putBlock(Container container, BlockData data) throws IOException {
    Preconditions.checkNotNull(data, "BlockData cannot be null for put " +
        "operation.");
    Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
        "cannot be negative");
    // We are not locking the key manager since LevelDb serializes all actions
    // against a single DB. We rely on DB level locking to avoid conflicts.
    try (ReferenceCountedDB db = BlockUtils.
        getDB((KeyValueContainerData) container.getContainerData(), config)) {
      // This is a post condition that acts as a hint to the user.
      // Should never fail.
      Preconditions.checkNotNull(db, "DB cannot be null here");
      long bcsId = data.getBlockCommitSequenceId();
      long containerBCSId = ((KeyValueContainerData) container.
          getContainerData()).getBlockCommitSequenceId();
      // default blockCommitSequenceId for any block is 0. If the putBlock
      // request is not coming via Ratis(for test scenarios), it will be 0.
      // In such cases, we should overwrite the block as well
      if (bcsId != 0 && bcsId <= containerBCSId) {
        // Since the blockCommitSequenceId stored in the db is greater than
        // equal to blockCommitSequenceId to be updated, it means the putBlock
        // transaction is reapplied in the ContainerStateMachine on restart.
        // It also implies that the given block must already exist in the db.
        // just log and return
        LOG.warn("blockCommitSequenceId {} in the Container Db is greater "
                + "than or equal to the supplied value {}. Ignoring it",
            containerBCSId, bcsId);
        return data.getSize();
      }
      // Update the blockData as well as the blockCommitSequenceId in one
      // atomic batch so the two can never get out of sync in the db.
      BatchOperation batch = new BatchOperation();
      batch.put(Longs.toByteArray(data.getLocalID()),
          data.getProtoBufMessage().toByteArray());
      batch.put(BLOCK_COMMIT_SEQUENCE_ID_KEY, Longs.toByteArray(bcsId));
      db.getStore().writeBatch(batch);
      container.updateBlockCommitSequenceId(bcsId);
      // Increment keycount here
      container.getContainerData().incrKeyCount();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Block {} successfully committed with bcsId {} chunk size {}",
            data.getBlockID(), bcsId, data.getChunks().size());
      }
      return data.getSize();
    }
  }

  /**
   * Gets an existing block.
   *
   * @param container - Container from which block need to be fetched.
   * @param blockID - BlockID of the block.
   * @return Key Data.
   * @throws IOException if the block is missing, or the requested bcsId is
   *     newer than anything committed to this container.
   */
  @Override
  public BlockData getBlock(Container container, BlockID blockID)
      throws IOException {
    // Validate arguments before dereferencing them; the original code read
    // blockID.getBlockCommitSequenceId() before the null check, so the
    // check could never fire.
    Preconditions.checkNotNull(blockID,
        "BlockID cannot be null in GetBlock request");
    Preconditions.checkNotNull(container,
        "Container cannot be null");
    long bcsId = blockID.getBlockCommitSequenceId();
    KeyValueContainerData containerData = (KeyValueContainerData) container
        .getContainerData();
    try (ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
      // This is a post condition that acts as a hint to the user.
      // Should never fail.
      Preconditions.checkNotNull(db, "DB cannot be null here");
      long containerBCSId = containerData.getBlockCommitSequenceId();
      // The container has not yet committed up to the requested bcsId.
      if (containerBCSId < bcsId) {
        throw new StorageContainerException(
            "Unable to find the block with bcsID " + bcsId + " .Container "
                + container.getContainerData().getContainerID() + " bcsId is "
                + containerBCSId + ".", UNKNOWN_BCSID);
      }
      byte[] kData = db.getStore().get(Longs.toByteArray(blockID.getLocalID()));
      if (kData == null) {
        throw new StorageContainerException("Unable to find the block." +
            blockID, NO_SUCH_BLOCK);
      }
      ContainerProtos.BlockData blockData =
          ContainerProtos.BlockData.parseFrom(kData);
      long id = blockData.getBlockID().getBlockCommitSequenceId();
      // The stored block's bcsId must be at least the requested one.
      if (id < bcsId) {
        throw new StorageContainerException(
            "bcsId " + bcsId + " mismatches with existing block Id "
                + id + " for block " + blockID + ".", BCSID_MISMATCH);
      }
      return BlockData.getFromProtoBuf(blockData);
    }
  }

  /**
   * Returns the length of the committed block.
   *
   * @param container - Container from which block need to be fetched.
   * @param blockID - BlockID of the block.
   * @return length of the block.
   * @throws IOException in case, the block key does not exist in db.
   */
  @Override
  public long getCommittedBlockLength(Container container, BlockID blockID)
      throws IOException {
    KeyValueContainerData containerData = (KeyValueContainerData) container
        .getContainerData();
    try (ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
      // This is a post condition that acts as a hint to the user.
      // Should never fail.
      Preconditions.checkNotNull(db, "DB cannot be null here");
      byte[] kData = db.getStore().get(Longs.toByteArray(blockID.getLocalID()));
      if (kData == null) {
        // Include the blockID for diagnostics, consistent with getBlock.
        throw new StorageContainerException("Unable to find the block." +
            blockID, NO_SUCH_BLOCK);
      }
      ContainerProtos.BlockData blockData =
          ContainerProtos.BlockData.parseFrom(kData);
      return blockData.getSize();
    }
  }

  /**
   * Deletes an existing block.
   *
   * @param container - Container from which block need to be deleted.
   * @param blockID - ID of the block.
   * @throws StorageContainerException if the block does not exist.
   */
  @Override
  public void deleteBlock(Container container, BlockID blockID) throws
      IOException {
    Preconditions.checkNotNull(blockID, "block ID cannot be null.");
    Preconditions.checkState(blockID.getContainerID() >= 0,
        "Container ID cannot be negative.");
    Preconditions.checkState(blockID.getLocalID() >= 0,
        "Local ID cannot be negative.");
    KeyValueContainerData cData = (KeyValueContainerData) container
        .getContainerData();
    try (ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
      // This is a post condition that acts as a hint to the user.
      // Should never fail.
      Preconditions.checkNotNull(db, "DB cannot be null here");
      // Note : There is a race condition here, since get and delete
      // are not atomic. Leaving it here since the impact is refusing
      // to delete a Block which might have just gotten inserted after
      // the get check.
      byte[] kKey = Longs.toByteArray(blockID.getLocalID());
      byte[] kData = db.getStore().get(kKey);
      if (kData == null) {
        throw new StorageContainerException("Unable to find the block." +
            blockID, NO_SUCH_BLOCK);
      }
      db.getStore().delete(kKey);
      // Decrement blockcount here
      container.getContainerData().decrKeyCount();
    }
  }

  /**
   * List blocks in a container.
   *
   * @param container - Container from which blocks need to be listed.
   * @param startLocalID - Key to start from, 0 to begin.
   * @param count - Number of blocks to return.
   * @return List of Blocks that match the criteria.
   * @throws IOException if the DB scan fails.
   */
  @Override
  public List<BlockData> listBlock(Container container, long startLocalID, int
      count) throws IOException {
    Preconditions.checkNotNull(container, "container cannot be null");
    Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be " +
        "negative");
    Preconditions.checkArgument(count > 0,
        "Count must be a positive number.");
    // Hold the container read lock so the container is not deleted or
    // closed out from under the scan.
    container.readLock();
    try {
      KeyValueContainerData cData =
          (KeyValueContainerData) container.getContainerData();
      try (ReferenceCountedDB db = BlockUtils.getDB(cData, config)) {
        List<BlockData> result = new ArrayList<>();
        byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
        List<Map.Entry<byte[], byte[]>> range = db.getStore()
            .getSequentialRangeKVs(startKeyInBytes, count,
                MetadataKeyFilters.getNormalKeyFilter());
        for (Map.Entry<byte[], byte[]> entry : range) {
          BlockData value = BlockUtils.getBlockData(entry.getValue());
          // Only the block IDs are returned, not the chunk lists.
          BlockData data = new BlockData(value.getBlockID());
          result.add(data);
        }
        return result;
      }
    } finally {
      container.readUnlock();
    }
  }

  /**
   * Shutdown KeyValueContainerManager.
   */
  @Override
  public void shutdown() {
    BlockUtils.shutdownCache(ContainerCache.getInstance(config));
  }
}