blob: 42d0471433414cb771931b161b795cb4b83c8f16 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
*
* We still use triplets to store DatanodeStorageInfo for each block in the
* block group, as well as the previous/next block in the corresponding
* DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
* are sorted and strictly mapped to the corresponding block.
*
* Normally each block belonging to group is stored in only one DataNode.
* However, it is possible that some block is over-replicated. Thus the triplet
* array's size can be larger than (m+k). Thus currently we use an extra byte
* array to record the block index for each triplet.
*/
@InterfaceAudience.Private
public class BlockInfoStriped extends BlockInfo {
private final ErasureCodingPolicy ecPolicy;
/**
* Always the same size with triplets. Record the block index for each triplet
* TODO: actually this is only necessary for over-replicated block. Thus can
* be further optimized to save memory usage.
*/
private byte[] indices;
public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) {
super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()));
indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];
initIndices();
this.ecPolicy = ecPolicy;
}
public short getTotalBlockNum() {
return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
}
public short getDataBlockNum() {
return (short) ecPolicy.getNumDataUnits();
}
public short getParityBlockNum() {
return (short) ecPolicy.getNumParityUnits();
}
public int getCellSize() {
return ecPolicy.getCellSize();
}
/**
* If the block is committed/completed and its length is less than a full
* stripe, it returns the the number of actual data blocks.
* Otherwise it returns the number of data units specified by erasure coding policy.
*/
public short getRealDataBlockNum() {
if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
return (short) Math.min(getDataBlockNum(),
(getNumBytes() - 1) / ecPolicy.getCellSize() + 1);
} else {
return getDataBlockNum();
}
}
public short getRealTotalBlockNum() {
return (short) (getRealDataBlockNum() + getParityBlockNum());
}
public ErasureCodingPolicy getErasureCodingPolicy() {
return ecPolicy;
}
private void initIndices() {
for (int i = 0; i < indices.length; i++) {
indices[i] = -1;
}
}
private int findSlot() {
int i = getTotalBlockNum();
for (; i < getCapacity(); i++) {
if (getStorageInfo(i) == null) {
return i;
}
}
// need to expand the triplet size
ensureCapacity(i + 1, true);
return i;
}
@Override
boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
Preconditions.checkArgument(BlockIdManager.isStripedBlockID(
reportedBlock.getBlockId()), "reportedBlock is not striped");
Preconditions.checkArgument(BlockIdManager.convertToStripedID(
reportedBlock.getBlockId()) == this.getBlockId(),
"reported blk_%s does not belong to the group of stored blk_%s",
reportedBlock.getBlockId(), this.getBlockId());
int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
int index = blockIndex;
DatanodeStorageInfo old = getStorageInfo(index);
if (old != null && !old.equals(storage)) { // over replicated
// check if the storage has been stored
int i = findStorageInfo(storage);
if (i == -1) {
index = findSlot();
} else {
return true;
}
}
addStorage(storage, index, blockIndex);
return true;
}
private void addStorage(DatanodeStorageInfo storage, int index,
int blockIndex) {
setStorageInfo(index, storage);
setNext(index, null);
setPrevious(index, null);
indices[index] = (byte) blockIndex;
}
private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
final int len = getCapacity();
for(int idx = len - 1; idx >= 0; idx--) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if (storage.equals(cur)) {
return idx;
}
}
return -1;
}
@VisibleForTesting
public byte getStorageBlockIndex(DatanodeStorageInfo storage) {
int i = this.findStorageInfo(storage);
return i == -1 ? -1 : indices[i];
}
/**
* Identify the block stored in the given datanode storage. Note that
* the returned block has the same block Id with the one seen/reported by the
* DataNode.
*/
Block getBlockOnStorage(DatanodeStorageInfo storage) {
int index = getStorageBlockIndex(storage);
if (index < 0) {
return null;
} else {
Block block = new Block(this);
block.setBlockId(this.getBlockId() + index);
return block;
}
}
@Override
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfoFromEnd(storage);
if (dnIndex < 0) { // the node is not found
return false;
}
assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
"Block is still in the list and must be removed first.";
// set the triplet to null
setStorageInfo(dnIndex, null);
setNext(dnIndex, null);
setPrevious(dnIndex, null);
indices[dnIndex] = -1;
return true;
}
private void ensureCapacity(int totalSize, boolean keepOld) {
if (getCapacity() < totalSize) {
Object[] old = triplets;
byte[] oldIndices = indices;
triplets = new Object[totalSize * 3];
indices = new byte[totalSize];
initIndices();
if (keepOld) {
System.arraycopy(old, 0, triplets, 0, old.length);
System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
}
}
}
public long spaceConsumed() {
// In case striped blocks, total usage by this striped blocks should
// be the total of data blocks and parity blocks because
// `getNumBytes` is the total of actual data block size.
return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(),
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(),
ecPolicy.getCellSize());
}
@Override
public final boolean isStriped() {
return true;
}
@Override
public BlockType getBlockType() {
return BlockType.STRIPED;
}
@Override
public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
int num = 0;
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getStorageInfo(idx) != null) {
num++;
}
}
return num;
}
@Override
final boolean hasNoStorage() {
final int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
if (getStorageInfo(idx) != null) {
return false;
}
}
return true;
}
/**
* Striped blocks on Provided Storage is not supported. All blocks on
* Provided storage are assumed to be "contiguous".
*/
@Override
boolean isProvided() {
return false;
}
/**
* This class contains datanode storage information and block index in the
* block group.
*/
public static class StorageAndBlockIndex {
private final DatanodeStorageInfo storage;
private final byte blockIndex;
StorageAndBlockIndex(DatanodeStorageInfo storage, byte blockIndex) {
this.storage = storage;
this.blockIndex = blockIndex;
}
/**
* @return storage in the datanode.
*/
public DatanodeStorageInfo getStorage() {
return storage;
}
/**
* @return block index in the block group.
*/
public byte getBlockIndex() {
return blockIndex;
}
}
public Iterable<StorageAndBlockIndex> getStorageAndIndexInfos() {
return new Iterable<StorageAndBlockIndex>() {
@Override
public Iterator<StorageAndBlockIndex> iterator() {
return new Iterator<StorageAndBlockIndex>() {
private int index = 0;
@Override
public boolean hasNext() {
while (index < getCapacity() && getStorageInfo(index) == null) {
index++;
}
return index < getCapacity();
}
@Override
public StorageAndBlockIndex next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
int i = index++;
return new StorageAndBlockIndex(
(DatanodeStorageInfo) triplets[i * 3], indices[i]);
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported");
}
};
}
};
}
}