blob: 6b717ecdfbff21660e33863215deac21256c2e37 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class to handle reading packets one-at-a-time from the wire.
* These packets are used both for reading and writing data to/from
* DataNodes.
*/
@InterfaceAudience.Private
public class PacketReceiver implements Closeable {
/**
* The max size of any single packet. This prevents OOMEs when
* invalid data is sent.
*/
public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class);
private static final DirectBufferPool bufferPool = new DirectBufferPool();
private final boolean useDirectBuffers;
/**
* The entirety of the most recently read packet.
* The first PKT_LENGTHS_LEN bytes of this buffer are the
* length prefixes.
*/
private ByteBuffer curPacketBuf = null;
/**
* A slice of {@link #curPacketBuf} which contains just the checksums.
*/
private ByteBuffer curChecksumSlice = null;
/**
* A slice of {@link #curPacketBuf} which contains just the data.
*/
private ByteBuffer curDataSlice = null;
/**
* The packet header of the most recently read packet.
*/
private PacketHeader curHeader;
public PacketReceiver(boolean useDirectBuffers) {
this.useDirectBuffers = useDirectBuffers;
reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
}
public PacketHeader getHeader() {
return curHeader;
}
public ByteBuffer getDataSlice() {
return curDataSlice;
}
public ByteBuffer getChecksumSlice() {
return curChecksumSlice;
}
/**
* Reads all of the data for the next packet into the appropriate buffers.
*
* The data slice and checksum slice members will be set to point to the
* user data and corresponding checksums. The header will be parsed and
* set.
*/
public void receiveNextPacket(ReadableByteChannel in) throws IOException {
doRead(in, null);
}
/**
* @see #receiveNextPacket(ReadableByteChannel)
*/
public void receiveNextPacket(InputStream in) throws IOException {
doRead(null, in);
}
private void doRead(ReadableByteChannel ch, InputStream in)
throws IOException {
// Each packet looks like:
// PLEN HLEN HEADER CHECKSUMS DATA
// 32-bit 16-bit <protobuf> <variable length>
//
// PLEN: Payload length
// = length(PLEN) + length(CHECKSUMS) + length(DATA)
// This length includes its own encoded length in
// the sum for historical reasons.
//
// HLEN: Header length
// = length(HEADER)
//
// HEADER: the actual packet header fields, encoded in protobuf
// CHECKSUMS: the crcs for the data chunk. May be missing if
// checksums were not requested
// DATA the actual block data
Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
curPacketBuf.clear();
curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
doReadFully(ch, in, curPacketBuf);
curPacketBuf.flip();
int payloadLen = curPacketBuf.getInt();
if (payloadLen < Ints.BYTES) {
// The "payload length" includes its own length. Therefore it
// should never be less than 4 bytes
throw new IOException("Invalid payload length " +
payloadLen);
}
int dataPlusChecksumLen = payloadLen - Ints.BYTES;
int headerLen = curPacketBuf.getShort();
if (headerLen < 0) {
throw new IOException("Invalid header length " + headerLen);
}
LOG.trace("readNextPacket: dataPlusChecksumLen={}, headerLen={}",
dataPlusChecksumLen, headerLen);
// Sanity check the buffer size so we don't allocate too much memory
// and OOME.
int totalLen = payloadLen + headerLen;
if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
throw new IOException("Incorrect value for packet payload size: " +
payloadLen);
}
// Make sure we have space for the whole packet, and
// read it.
reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
dataPlusChecksumLen + headerLen);
curPacketBuf.clear();
curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
dataPlusChecksumLen + headerLen);
doReadFully(ch, in, curPacketBuf);
curPacketBuf.flip();
curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
// Extract the header from the front of the buffer (after the length prefixes)
byte[] headerBuf = new byte[headerLen];
curPacketBuf.get(headerBuf);
if (curHeader == null) {
curHeader = new PacketHeader();
}
curHeader.setFieldsFromData(payloadLen, headerBuf);
// Compute the sub-slices of the packet
int checksumLen = dataPlusChecksumLen - curHeader.getDataLen();
if (checksumLen < 0) {
throw new IOException("Invalid packet: data length in packet header " +
"exceeds data length received. dataPlusChecksumLen=" +
dataPlusChecksumLen + " header: " + curHeader);
}
reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
}
/**
* Rewrite the last-read packet on the wire to the given output stream.
*/
public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
Preconditions.checkState(!useDirectBuffers,
"Currently only supported for non-direct buffers");
mirrorOut.write(curPacketBuf.array(),
curPacketBuf.arrayOffset(),
curPacketBuf.remaining());
}
private static void doReadFully(ReadableByteChannel ch, InputStream in,
ByteBuffer buf) throws IOException {
if (ch != null) {
readChannelFully(ch, buf);
} else {
Preconditions.checkState(!buf.isDirect(),
"Must not use direct buffers with InputStream API");
IOUtils.readFully(in, buf.array(),
buf.arrayOffset() + buf.position(),
buf.remaining());
buf.position(buf.position() + buf.remaining());
}
}
private void reslicePacket(
int headerLen, int checksumsLen, int dataLen) {
// Packet structure (refer to doRead() for details):
// PLEN HLEN HEADER CHECKSUMS DATA
// 32-bit 16-bit <protobuf> <variable length>
// |--- lenThroughHeader ----|
// |----------- lenThroughChecksums ----|
// |------------------- lenThroughData ------|
int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
int lenThroughChecksums = lenThroughHeader + checksumsLen;
int lenThroughData = lenThroughChecksums + dataLen;
assert dataLen >= 0 : "invalid datalen: " + dataLen;
assert curPacketBuf.position() == lenThroughHeader;
assert curPacketBuf.limit() == lenThroughData :
"headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
" rem=" + curPacketBuf.remaining();
// Slice the checksums.
curPacketBuf.position(lenThroughHeader);
curPacketBuf.limit(lenThroughChecksums);
curChecksumSlice = curPacketBuf.slice();
// Slice the data.
curPacketBuf.position(lenThroughChecksums);
curPacketBuf.limit(lenThroughData);
curDataSlice = curPacketBuf.slice();
// Reset buffer to point to the entirety of the packet (including
// length prefixes)
curPacketBuf.position(0);
curPacketBuf.limit(lenThroughData);
}
private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
throws IOException {
while (buf.remaining() > 0) {
int n = ch.read(buf);
if (n < 0) {
throw new IOException("Premature EOF reading from " + ch);
}
}
}
private void reallocPacketBuf(int atLeastCapacity) {
// Realloc the buffer if this packet is longer than the previous
// one.
if (curPacketBuf == null ||
curPacketBuf.capacity() < atLeastCapacity) {
ByteBuffer newBuf;
if (useDirectBuffers) {
newBuf = bufferPool.getBuffer(atLeastCapacity);
} else {
newBuf = ByteBuffer.allocate(atLeastCapacity);
}
// If reallocing an existing buffer, copy the old packet length
// prefixes over
if (curPacketBuf != null) {
curPacketBuf.flip();
newBuf.put(curPacketBuf);
}
returnPacketBufToPool();
curPacketBuf = newBuf;
}
}
private void returnPacketBufToPool() {
if (curPacketBuf != null && curPacketBuf.isDirect()) {
bufferPool.returnBuffer(curPacketBuf);
curPacketBuf = null;
}
}
@Override // Closeable
public void close() {
returnPacketBufToPool();
}
@Override
protected void finalize() throws Throwable {
try {
// just in case it didn't get closed, we
// may as well still try to return the buffer
returnPacketBufToPool();
} finally {
super.finalize();
}
}
}