// blob: 8f482e3531b7d1b8789bd2df945c7dc5ecaf434a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
/** Maximum size, in bytes, of each buffer produced by getBlocksBuffers(). */
private static final int CHUNK_SIZE = 64*1024; // 64K
/** Old-style long encoding returned by the empty report. */
private static final long[] EMPTY_LONGS = new long[]{0, 0};
/**
 * Shared block report that contains no replicas: zero blocks, an empty
 * buffer and an iterator with no elements.
 */
public static final BlockListAsLongs EMPTY = new BlockListAsLongs() {
  @Override
  public int getNumberOfBlocks() {
    return 0;
  }
  @Override
  public ByteString getBlocksBuffer() {
    return ByteString.EMPTY;
  }
  @Override
  public long[] getBlockListAsLongs() {
    // return a copy: arrays are mutable, and handing out the shared
    // instance would let one caller corrupt the result seen by all others
    return EMPTY_LONGS.clone();
  }
  @Override
  public Iterator<BlockReportReplica> iterator() {
    return Collections.emptyIterator();
  }
};
/**
 * Wrap an encoded buffer in an instance that decodes it in place.
 * @param numBlocks - number of blocks held in the buffer
 * @param blocksBuf - ByteString of encoded varints
 * @param maxDataLength - maximum allowable data size in protobuf message
 * @return BlockListAsLongs backed by the given buffer
 */
public static BlockListAsLongs decodeBuffer(final int numBlocks,
    final ByteString blocksBuf, final int maxDataLength) {
  final BlockListAsLongs decoder =
      new BufferDecoder(numBlocks, blocksBuf, maxDataLength);
  return decoder;
}
/**
 * Wrap a list of encoded buffers in an instance that decodes them in
 * place, using the default IPC maximum data length.
 * @param numBlocks - number of blocks held in the buffers
 * @param blocksBufs - list of ByteStrings of encoded varints
 * @return BlockListAsLongs backed by the given buffers
 */
@VisibleForTesting
public static BlockListAsLongs decodeBuffers(final int numBlocks,
    final List<ByteString> blocksBufs) {
  final int maxDataLength = IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
  return decodeBuffers(numBlocks, blocksBufs, maxDataLength);
}
/**
 * Wrap a list of encoded buffers in an instance that decodes them in
 * place. The buffers are joined into a single ByteString first.
 * @param numBlocks - number of blocks held in the buffers
 * @param blocksBufs - list of ByteStrings of encoded varints
 * @param maxDataLength - maximum allowable data size in protobuf message
 * @return BlockListAsLongs backed by the joined buffers
 */
public static BlockListAsLongs decodeBuffers(final int numBlocks,
    final List<ByteString> blocksBufs, final int maxDataLength) {
  // this doesn't actually copy the data
  final ByteString joined = ByteString.copyFrom(blocksBufs);
  return decodeBuffer(numBlocks, joined, maxDataLength);
}
/**
 * Wrap an old-style list of Longs in an instance that decodes it in
 * place, using the default IPC maximum data length. Decoding ByteString
 * buffers is much more efficient; this exists only for compatibility.
 * @param blocksList - list of longs
 * @return BlockListAsLongs
 */
public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
  final int maxDataLength = IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
  return decodeLongs(blocksList, maxDataLength);
}
/**
 * Wrap an old-style list of Longs in an instance that decodes it in
 * place. Decoding ByteString buffers is much more efficient; this exists
 * only for compatibility. An empty list yields the shared EMPTY report.
 * @param blocksList - list of longs
 * @param maxDataLength - maximum allowable data size in protobuf message
 * @return BlockListAsLongs
 */
public static BlockListAsLongs decodeLongs(List<Long> blocksList,
    int maxDataLength) {
  if (blocksList.isEmpty()) {
    return EMPTY;
  }
  return new LongsDecoder(blocksList, maxDataLength);
}
/**
 * Encode a collection of replicas into an efficient ByteString backed
 * block report, using the default IPC maximum data length.
 * @param replicas - replicas to encode
 * @return BlockListAsLongs
 */
@VisibleForTesting
public static BlockListAsLongs encode(
    final Collection<? extends Replica> replicas) {
  final Builder encoder = builder(IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
  final Iterator<? extends Replica> remaining = replicas.iterator();
  while (remaining.hasNext()) {
    encoder.add(remaining.next());
  }
  return encoder.build();
}
/**
 * Read a block report from a protobuf-coded stream. Field 1 supplies the
 * number of blocks and field 2 the encoded blocks buffer; any other
 * field is skipped.
 * @param is - stream to read from
 * @param maxDataLength - maximum allowable data size in protobuf message;
 *        only applied as a size limit when it differs from the default
 * @return the decoded report, or null if the stream did not supply both
 *         the block count and the blocks buffer
 * @throws IOException if the stream cannot be read or parsed
 */
public static BlockListAsLongs readFrom(InputStream is, int maxDataLength)
    throws IOException {
  final CodedInputStream input = CodedInputStream.newInstance(is);
  if (maxDataLength != IPC_MAXIMUM_DATA_LENGTH_DEFAULT) {
    input.setSizeLimit(maxDataLength);
  }
  // -1 and null mark "field not seen yet"
  int blockCount = -1;
  ByteString encoded = null;
  while (!input.isAtEnd()) {
    final int tag = input.readTag();
    final int fieldNumber = WireFormat.getTagFieldNumber(tag);
    if (fieldNumber == 1) {
      blockCount = input.readInt32();
    } else if (fieldNumber == 2) {
      encoded = input.readBytes();
    } else if (fieldNumber != 0) {
      // unknown field: skip it; field number 0 needs no handling
      input.skipField(tag);
    }
  }
  if (blockCount == -1 || encoded == null) {
    return null;
  }
  return decodeBuffer(blockCount, encoded, maxDataLength);
}
/**
 * Write this block report to a stream in protobuf-coded form: the number
 * of blocks as int32 field 1, then the encoded blocks buffer as bytes
 * field 2. The coded stream is flushed before returning.
 * @param os - stream to write to
 * @throws IOException if writing fails
 */
public void writeTo(OutputStream os) throws IOException {
  final CodedOutputStream output = CodedOutputStream.newInstance(os);
  final int blockCount = getNumberOfBlocks();
  output.writeInt32(1, blockCount);
  final ByteString encoded = getBlocksBuffer();
  output.writeBytes(2, encoded);
  output.flush();
}
/**
 * Create a builder that uses the default IPC maximum data length.
 * @return a new Builder
 */
@VisibleForTesting
public static Builder builder() {
  final int maxDataLength = IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
  return builder(maxDataLength);
}
/**
 * Create a builder for encoding replicas into a block report.
 * @param maxDataLength - maximum allowable data size in protobuf message
 * @return a new Builder
 */
public static Builder builder(int maxDataLength) {
  final Builder fresh = new Builder(maxDataLength);
  return fresh;
}
/**
 * The number of blocks in this block report.
 * @return - the number of blocks
 */
public abstract int getNumberOfBlocks();
/**
 * Very efficient encoding of the block report into a ByteString to avoid
 * the overhead of protobuf repeating fields. Primitive repeating fields
 * require re-allocs of an {@code ArrayList<Long>} and the associated
 * (un)boxing overhead which puts pressure on GC.
 *
 * The structure of the buffer is as follows:
 * - each replica is represented by 4 longs:
 *   blockId, block length, genstamp, replica state
 *
 * @return ByteString encoded block report
 */
public abstract ByteString getBlocksBuffer();
/**
 * The encoded block report split into ByteStrings of at most CHUNK_SIZE
 * bytes each. A report that fits in one chunk is returned as a singleton
 * list holding the whole buffer.
 *
 * @return ByteStrings
 */
public List<ByteString> getBlocksBuffers() {
  final ByteString whole = getBlocksBuffer();
  final int total = whole.size();
  if (total <= CHUNK_SIZE) {
    return Collections.singletonList(whole);
  }
  final List<ByteString> chunks = new ArrayList<ByteString>();
  int start = 0;
  while (start < total) {
    final int end = Math.min(start + CHUNK_SIZE, total);
    // this doesn't actually copy the data
    chunks.add(whole.substring(start, end));
    start = end;
  }
  return chunks;
}
/**
 * Convert block report to old-style list of longs.  Only used to
 * re-encode the block report when the DN detects an older NN. This is
 * inefficient, but in practice a DN is unlikely to be upgraded first.
 *
 * The structure of the array is as follows:
 * 0: the length of the finalized replica list;
 * 1: the length of the under-construction replica list;
 * - followed by finalized replica list where each replica is represented by
 *   3 longs: one for the blockId, one for the block length, and one for
 *   the generation stamp;
 * - followed by the invalid replica represented with three -1s;
 * - followed by the under-construction replica list where each replica is
 *   represented by 4 longs: three for the block id, length, generation
 *   stamp, and the fourth for the replica state.
 * @return list of longs
 */
public abstract long[] getBlockListAsLongs();
/**
 * Returns an iterator over the blocks in the block report. The returned
 * blocks must not be added to a collection.
 * @return Iterator
 */
@Override
public abstract Iterator<BlockReportReplica> iterator();
/**
 * Accumulates replicas into a varint-encoded ByteString and produces the
 * resulting block report.
 */
public static class Builder {
  private final ByteString.Output sink;
  private final CodedOutputStream encoder;
  private int blockCount = 0;
  private int finalizedCount = 0;
  private final int maxDataLength;

  /**
   * @param maxDataLength - maximum allowable data size in protobuf message,
   *        passed on to the report created by build()
   */
  Builder(int maxDataLength) {
    this.maxDataLength = maxDataLength;
    this.sink = ByteString.newOutput(64*1024);
    this.encoder = CodedOutputStream.newInstance(sink);
  }

  /**
   * Append one replica: block id, bytes on disk, generation stamp and
   * state, each written as a varint.
   * @param replica - replica to encode
   */
  public void add(Replica replica) {
    try {
      // zig-zag to reduce size of legacy blocks
      encoder.writeSInt64NoTag(replica.getBlockId());
      encoder.writeRawVarint64(replica.getBytesOnDisk());
      encoder.writeRawVarint64(replica.getGenerationStamp());
      final ReplicaState state = replica.getState();
      // although state is not a 64-bit value, using a long varint to
      // allow for future use of the upper bits
      encoder.writeRawVarint64(state.getValue());
      finalizedCount += (state == ReplicaState.FINALIZED) ? 1 : 0;
      blockCount += 1;
    } catch (IOException ioe) {
      throw unexpected(ioe);
    }
  }

  /**
   * @return the number of replicas added so far
   */
  public int getNumberOfBlocks() {
    return blockCount;
  }

  /**
   * Flush the encoder and wrap everything added so far in a block report.
   * @return BlockListAsLongs over the encoded replicas
   */
  public BlockListAsLongs build() {
    try {
      encoder.flush();
    } catch (IOException ioe) {
      throw unexpected(ioe);
    }
    final ByteString encoded = sink.toByteString();
    return new BufferDecoder(blockCount, finalizedCount, encoded,
        maxDataLength);
  }

  // shouldn't happen, ByteString.Output doesn't throw IOE
  private static IllegalStateException unexpected(IOException ioe) {
    return new IllegalStateException(ioe);
  }
}
// decode new-style ByteString buffer based block report
private static class BufferDecoder extends BlockListAsLongs {
  // reserve upper bits for future use.  decoding masks off these bits to
  // allow compatibility for the current through future release that may
  // start using the bits
  // keeps the low 48 bits of the encoded block length
  private static final long NUM_BYTES_MASK = (-1L) >>> (64 - 48);
  // keeps the low 4 bits of the encoded replica state
  private static final long REPLICA_STATE_MASK = (-1L) >>> (64 - 4);
  private final ByteString buffer;
  private final int numBlocks;
  // number of FINALIZED replicas, or -1 while it has not been counted
  private int numFinalized;
  private final int maxDataLength;

  /**
   * Decoder for a buffer whose finalized replica count is not yet known.
   * @param numBlocks - blocks in the buffer
   * @param buf - ByteString encoded varints
   * @param maxDataLength - maximum allowable data size in protobuf message
   */
  BufferDecoder(final int numBlocks, final ByteString buf,
      final int maxDataLength) {
    this(numBlocks, -1, buf, maxDataLength);
  }

  /**
   * @param numBlocks - blocks in the buffer
   * @param numFinalized - finalized blocks in the buffer, or -1 if unknown
   * @param buf - ByteString encoded varints
   * @param maxDataLength - maximum allowable data size in protobuf message
   */
  BufferDecoder(final int numBlocks, final int numFinalized,
      final ByteString buf, final int maxDataLength) {
    this.numBlocks = numBlocks;
    this.numFinalized = numFinalized;
    this.buffer = buf;
    this.maxDataLength = maxDataLength;
  }

  @Override
  public int getNumberOfBlocks() {
    return numBlocks;
  }

  @Override
  public ByteString getBlocksBuffer() {
    return buffer;
  }

  @Override
  public long[] getBlockListAsLongs() {
    // terribly inefficient but only occurs if server tries to transcode
    // an undecoded buffer into longs - ie. it will never happen but let's
    // handle it anyway
    if (numFinalized == -1) {
      int n = 0;
      for (Replica replica : this) {
        if (replica.getState() == ReplicaState.FINALIZED) {
          n++;
        }
      }
      numFinalized = n;
    }
    int numUc = numBlocks - numFinalized;
    // 2 header longs, 3 longs per finalized replica plus the 3-long
    // delimiter, and 4 longs per under-construction replica
    int size = 2 + 3*(numFinalized+1) + 4*(numUc);
    long[] longs = new long[size];
    longs[0] = numFinalized;
    longs[1] = numUc;

    // finalized replicas are written from idx, the delimiter and the
    // under-construction replicas from ucIdx
    int idx = 2;
    int ucIdx = idx + 3*numFinalized;
    // delimiter block
    longs[ucIdx++] = -1;
    longs[ucIdx++] = -1;
    longs[ucIdx++] = -1;

    for (BlockReportReplica block : this) {
      switch (block.getState()) {
        case FINALIZED: {
          longs[idx++] = block.getBlockId();
          longs[idx++] = block.getNumBytes();
          longs[idx++] = block.getGenerationStamp();
          break;
        }
        default: {
          longs[ucIdx++] = block.getBlockId();
          longs[ucIdx++] = block.getNumBytes();
          longs[ucIdx++] = block.getGenerationStamp();
          longs[ucIdx++] = block.getState().getValue();
          break;
        }
      }
    }
    return longs;
  }

  @Override
  public Iterator<BlockReportReplica> iterator() {
    return new Iterator<BlockReportReplica>() {
      // one instance is refilled and returned by every call to next()
      final BlockReportReplica block = new BlockReportReplica();
      final CodedInputStream cis = buffer.newCodedInput();
      private int currentBlockIndex = 0;

      {
        if (maxDataLength != IPC_MAXIMUM_DATA_LENGTH_DEFAULT) {
          cis.setSizeLimit(maxDataLength);
        }
      }

      @Override
      public boolean hasNext() {
        return currentBlockIndex < numBlocks;
      }

      /**
       * @throws java.util.NoSuchElementException if all blocks were read
       * @throws IllegalStateException if the buffer cannot be decoded
       */
      @Override
      public BlockReportReplica next() {
        // honour the Iterator contract instead of reading past the
        // declared number of blocks
        if (!hasNext()) {
          throw new java.util.NoSuchElementException();
        }
        currentBlockIndex++;
        try {
          // zig-zag to reduce size of legacy blocks and mask off bits
          // we don't (yet) understand
          block.setBlockId(cis.readSInt64());
          block.setNumBytes(cis.readRawVarint64() & NUM_BYTES_MASK);
          block.setGenerationStamp(cis.readRawVarint64());
          long state = cis.readRawVarint64() & REPLICA_STATE_MASK;
          block.setState(ReplicaState.getState((int)state));
        } catch (IOException e) {
          throw new IllegalStateException(e);
        }
        return block;
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
}
// decode old style block report of longs
private static class LongsDecoder extends BlockListAsLongs {
  // the report body, i.e. everything after the two header longs
  private final List<Long> values;
  private final int finalizedBlocks;
  private final int numBlocks;
  private final int maxDataLength;

  /**
   * Split the list into its two-long header (finalized count, then
   * under-construction count) and the remaining body.
   * @param values - old-style list of longs, header included
   * @param maxDataLength - maximum allowable data size in protobuf message
   * @throws IllegalArgumentException if the list is shorter than the header
   */
  LongsDecoder(List<Long> values, int maxDataLength) {
    // fail with a clear message rather than an opaque subList error
    Preconditions.checkArgument(values.size() >= 2,
        "Block report is missing its two-long header");
    this.finalizedBlocks = values.get(0).intValue();
    this.numBlocks = finalizedBlocks + values.get(1).intValue();
    this.values = values.subList(2, values.size());
    this.maxDataLength = maxDataLength;
  }

  @Override
  public int getNumberOfBlocks() {
    return numBlocks;
  }

  @Override
  public ByteString getBlocksBuffer() {
    Builder builder = builder(maxDataLength);
    for (Replica replica : this) {
      builder.add(replica);
    }
    return builder.build().getBlocksBuffer();
  }

  @Override
  public long[] getBlockListAsLongs() {
    long[] longs = new long[2+values.size()];
    longs[0] = finalizedBlocks;
    longs[1] = numBlocks - finalizedBlocks;
    // iterate rather than index so the copy stays linear for any List
    int idx = 2;
    for (long value : values) {
      longs[idx++] = value;
    }
    return longs;
  }

  @Override
  public Iterator<BlockReportReplica> iterator() {
    return new Iterator<BlockReportReplica>() {
      // one instance is refilled and returned by every call to next()
      private final BlockReportReplica block = new BlockReportReplica();
      final Iterator<Long> iter = values.iterator();
      private int currentBlockIndex = 0;

      @Override
      public boolean hasNext() {
        return currentBlockIndex < numBlocks;
      }

      /**
       * @throws java.util.NoSuchElementException if all blocks were read
       * @throws IllegalArgumentException if the delimiter block is invalid
       */
      @Override
      public BlockReportReplica next() {
        // honour the Iterator contract instead of decoding trailing longs
        if (!hasNext()) {
          throw new java.util.NoSuchElementException();
        }
        if (currentBlockIndex == finalizedBlocks) {
          // verify the presence of the delimiter block
          readBlock();
          Preconditions.checkArgument(block.getBlockId() == -1 &&
                                      block.getNumBytes() == -1 &&
                                      block.getGenerationStamp() == -1,
                                      "Invalid delimiter block");
        }

        readBlock();
        if (currentBlockIndex++ < finalizedBlocks) {
          block.setState(ReplicaState.FINALIZED);
        } else {
          // under-construction replicas carry their state as a 4th long
          block.setState(ReplicaState.getState(iter.next().intValue()));
        }
        return block;
      }

      // read the next three longs: block id, length, generation stamp
      private void readBlock() {
        block.setBlockId(iter.next());
        block.setNumBytes(iter.next());
        block.setGenerationStamp(iter.next());
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
}
/**
 * A Block together with its ReplicaState, as carried in a block report.
 * Visible length, storage uuid and transient-storage queries are not
 * supported.
 */
@InterfaceAudience.Private
public static class BlockReportReplica extends Block implements Replica {
  private ReplicaState state;

  // used by the decoders, which fill in every field afterwards
  private BlockReportReplica() {
  }

  /**
   * Copy the given block. The state is taken from the block when it is
   * itself a BlockReportReplica, and is FINALIZED otherwise.
   * @param block - block to copy
   */
  public BlockReportReplica(Block block) {
    super(block);
    this.state = (block instanceof BlockReportReplica)
        ? ((BlockReportReplica) block).getState()
        : ReplicaState.FINALIZED;
  }

  public void setState(ReplicaState state) {
    this.state = state;
  }

  @Override
  public ReplicaState getState() {
    return state;
  }

  /**
   * @return the block's number of bytes
   */
  @Override
  public long getBytesOnDisk() {
    return getNumBytes();
  }

  @Override
  public long getVisibleLength() {
    throw new UnsupportedOperationException();
  }

  @Override
  public String getStorageUuid() {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean isOnTransientStorage() {
    throw new UnsupportedOperationException();
  }

  // equality and hashing are those of the underlying Block
  @Override
  public boolean equals(Object o) {
    return super.equals(o);
  }

  @Override
  public int hashCode() {
    return super.hashCode();
  }
}
}