blob: 894e6bf5f4174611dbb8a6154fab85b61253dadb [file] [log] [blame]
* Copyright (c) 2013 - 2014, Pivotal Inc.
* All rights reserved.
* Author: Zhanwei Wang
* 2014 -
* open source under Apache License Version 2.0
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include "BigEndian.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "Packet.h"
#include "PacketHeader.h"
namespace Hdfs {
namespace Internal {
Packet::Packet() :
lastPacketInBlock(false), syncBlock(false), checksumPos(0), checksumSize(0),
checksumStart(0), dataPos(0), dataStart(0), headerStart(0), maxChunks(
0), numChunks(0), offsetInBlock(0), seqno(HEART_BEAT_SEQNO) {
Packet::Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock,
int64_t seqno, int checksumSize) :
lastPacketInBlock(false), syncBlock(false), checksumSize(checksumSize), headerStart(0),
maxChunks(chunksPerPkt), numChunks(0), offsetInBlock(offsetInBlock), seqno(seqno), buffer(pktSize) {
checksumPos = checksumStart = PacketHeader::GetPkgHeaderSize();
dataPos = dataStart = checksumStart + chunksPerPkt * checksumSize;
assert(dataPos >= 0);
void Packet::reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock,
int64_t seqno, int checksumSize) {
lastPacketInBlock = false;
syncBlock = false;
this->checksumSize = checksumSize;
headerStart = 0;
maxChunks = chunksPerPkt;
numChunks = 0;
this->offsetInBlock = offsetInBlock;
this->seqno = seqno;
checksumPos = checksumStart = PacketHeader::GetPkgHeaderSize();
dataPos = dataStart = checksumStart + chunksPerPkt * checksumSize;
if (pktSize > static_cast<int>(buffer.size())) {
assert(dataPos >= 0);
void Packet::addChecksum(uint32_t checksum) {
if (checksumPos + static_cast<int>(sizeof(uint32_t)) > dataStart) {
"Packet: failed to add checksum into packet, checksum is too large");
WriteBigEndian32ToArray(checksum, &buffer[checksumPos]);
checksumPos += checksumSize;
void Packet::addData(const char * buf, int size) {
if (size + dataPos > static_cast<int>(buffer.size())) {
"Packet: failed add data to packet, packet size is too small");
memcpy(&buffer[dataPos], buf, size);
dataPos += size;
assert(dataPos >= 0);
void Packet::setSyncFlag(bool sync) {
syncBlock = sync;
void Packet::increaseNumChunks() {
bool Packet::isFull() {
return numChunks >= maxChunks;
bool Packet::isHeartbeat() {
return HEART_BEAT_SEQNO == seqno;
void Packet::setLastPacketInBlock(bool lastPacket) {
lastPacketInBlock = lastPacket;
int Packet::getDataSize() {
return dataPos - dataStart;
int64_t Packet::getLastByteOffsetBlock() {
assert(offsetInBlock >= 0 && dataPos >= dataStart);
assert(dataPos - dataStart <= maxChunks * static_cast<int>(buffer.size()));
return offsetInBlock + dataPos - dataStart;
const ConstPacketBuffer Packet::getBuffer() {
* Once this is called, no more data can be added to the packet.
* This is called only when the packet is ready to be sent.
int dataLen = dataPos - dataStart;
int checksumLen = checksumPos - checksumStart;
if (checksumPos != dataStart) {
* move the checksum to cover the gap.
* This can happen for the last packet.
memmove(&buffer[dataStart - checksumLen], &buffer[checksumStart],
headerStart = dataStart - checksumPos;
checksumStart += dataStart - checksumPos;
checksumPos = dataStart;
assert(dataPos >= 0);
int pktLen = dataLen + checksumLen;
PacketHeader header(pktLen + sizeof(int32_t)
/* why we add 4 bytes? Because the server will reduce 4 bytes. -_-*/
, offsetInBlock, seqno, lastPacketInBlock, dataLen);
return ConstPacketBuffer(&buffer[headerStart],
PacketHeader::GetPkgHeaderSize() + pktLen);