blob: 8c656a3c8f54b8ae43dc6b9b8cce88434fab81c6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "BigEndian.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "PacketHeader.h"
namespace hdfs {
namespace internal {
int PacketHeader::PkgHeaderSize = PacketHeader::CalcPkgHeaderSize();
int PacketHeader::CalcPkgHeaderSize() {
PacketHeaderProto header;
header.set_offsetinblock(0);
header.set_datalen(0);
header.set_lastpacketinblock(false);
header.set_seqno(0);
return header.ByteSize() + sizeof(int32_t) /*packet length*/ + sizeof(int16_t)/* proto length */;
}
int PacketHeader::GetPkgHeaderSize() {
return PkgHeaderSize;
}
PacketHeader::PacketHeader() :
packetLen(0) {
}
PacketHeader::PacketHeader(int packetLen, int64_t offsetInBlock, int64_t seqno,
bool lastPacketInBlock, int dataLen) :
packetLen(packetLen) {
proto.set_offsetinblock(offsetInBlock);
proto.set_seqno(seqno);
proto.set_lastpacketinblock(lastPacketInBlock);
proto.set_datalen(dataLen);
}
int PacketHeader::getDataLen() {
return proto.datalen();
}
bool PacketHeader::isLastPacketInBlock() {
return proto.lastpacketinblock();
}
bool PacketHeader::sanityCheck(int64_t lastSeqNo) {
// We should only have a non-positive data length for the last packet
if (proto.datalen() <= 0 && !proto.lastpacketinblock())
return false;
// The last packet should not contain data
if (proto.lastpacketinblock() && proto.datalen() != 0)
return false;
// Seqnos should always increase by 1 with each packet received
if (proto.seqno() != lastSeqNo + 1)
return false;
return true;
}
int64_t PacketHeader::getSeqno() {
return proto.seqno();
}
int64_t PacketHeader::getOffsetInBlock() {
return proto.offsetinblock();
}
int PacketHeader::getPacketLen() {
return packetLen;
}
void PacketHeader::readFields(const char * buf, size_t size) {
int16_t protoLen;
assert(size > sizeof(packetLen) + sizeof(protoLen));
packetLen = ReadBigEndian32FromArray(buf);
protoLen = ReadBigEndian16FromArray(buf + sizeof(packetLen));
if (packetLen < static_cast<int>(sizeof(int32_t)) || protoLen < 0
|| static_cast<int>(sizeof(packetLen) + sizeof(protoLen)) + protoLen > static_cast<int>(size)) {
THROW(HdfsIOException, "Invalid PacketHeader, packetLen is %d, protoLen is %hd, buf size is %zu", packetLen,
protoLen, size);
}
if (!proto.ParseFromArray(buf + sizeof(packetLen) + sizeof(protoLen),
protoLen)) {
THROW(HdfsIOException,
"PacketHeader cannot parse PacketHeaderProto from datanode response.");
}
}
void PacketHeader::writeInBuffer(char * buf, size_t size) {
buf = WriteBigEndian32ToArray(packetLen, buf);
buf = WriteBigEndian16ToArray(proto.ByteSize(), buf);
proto.SerializeToArray(buf, size - sizeof(int32_t) - sizeof(int16_t));
}
}
}