/********************************************************************
* 2014 -
* open source under Apache License Version 2.0
********************************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DateTime.h"
#include "Pipeline.h"
#include "Logger.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "OutputStreamInter.h"
#include "FileSystemInter.h"
#include "DataTransferProtocolSender.h"
#include "datatransfer.pb.h"
#include <inttypes.h>
#include <netdb.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
namespace Hdfs {
namespace Internal {
PipelineImpl::PipelineImpl(bool append, const char * path, const SessionConfig & conf,
                           shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
                           int replication, int64_t bytesSent, PacketPool & packetPool,
                           shared_ptr<LocatedBlock> lastBlock) :
    checksumType(checksumType), chunkSize(chunkSize), errorIndex(-1), replication(replication),
    bytesAcked(bytesSent), bytesSent(bytesSent), packetPool(packetPool), filesystem(filesystem),
    lastBlock(lastBlock), path(path) {
canAddDatanode = conf.canAddDatanode();
blockWriteRetry = conf.getBlockWriteRetry();
connectTimeout = conf.getOutputConnTimeout();
readTimeout = conf.getOutputReadTimeout();
writeTimeout = conf.getOutputWriteTimeout();
clientName = filesystem->getClientName();
if (append) {
LOG(DEBUG2, "create pipeline for file %s to append to %s at position %" PRId64,
path, lastBlock->toString().c_str(), lastBlock->getNumBytes());
stage = PIPELINE_SETUP_APPEND;
assert(lastBlock);
nodes = lastBlock->getLocations();
storageIDs = lastBlock->getStorageIDs();
buildForAppendOrRecovery(false);
stage = DATA_STREAMING;
} else {
LOG(DEBUG2, "create pipeline for file %s to write to a new block", path);
stage = PIPELINE_SETUP_CREATE;
buildForNewBlock();
stage = DATA_STREAMING;
}
}
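/*
 * Find the index of the datanode that the namenode newly added to the
 * pipeline by comparing the current node list against the original one.
 * Throws HdfsIOException if exactly one new node was not acquired or the
 * new node cannot be identified.
 */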
int PipelineImpl::findNewDatanode(const std::vector<DatanodeInfo> & original) {
if (nodes.size() != original.size() + 1) {
THROW(HdfsIOException, "Failed to acquire a datanode for block %s from namenode.",
lastBlock->toString().c_str());
}
for (size_t i = 0; i < nodes.size(); i++) {
size_t j = 0;
for (; j < original.size() && !(nodes[i] == original[j]); j++)
;
if (j == original.size()) {
return i;
}
}
THROW(HdfsIOException, "Cannot add new datanode for block %s.", lastBlock->toString().c_str());
}
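/*
 * Ask the source datanode to copy the block replica to the target
 * datanodes, then read and validate its BlockOpResponseProto reply.
 */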
void PipelineImpl::transfer(const ExtendedBlock & blk, const DatanodeInfo & src,
const std::vector<DatanodeInfo> & targets, const Token & token) {
shared_ptr<Socket> so(new TcpSocketImpl);
shared_ptr<BufferedSocketReader> in(new BufferedSocketReaderImpl(*so));
so->connect(src.getIpAddr().c_str(), src.getXferPort(), connectTimeout);
DataTransferProtocolSender sender(*so, writeTimeout, src.formatAddress());
sender.transferBlock(blk, token, clientName.c_str(), targets);
int size = in->readVarint32(readTimeout);
std::vector<char> buf(size);
in->readFully(&buf[0], size, readTimeout);
BlockOpResponseProto resp;
if (!resp.ParseFromArray(&buf[0], size)) {
THROW(HdfsIOException, "cannot parse datanode response from %s fro block %s.",
src.formatAddress().c_str(), lastBlock->toString().c_str());
}
if (Status::DT_PROTO_SUCCESS != resp.status()) {
THROW(HdfsIOException, "Failed to transfer block to a new datanode %s for block %s.",
targets[0].formatAddress().c_str(),
lastBlock->toString().c_str());
}
}
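/*
 * Ask the namenode for one additional datanode, splice it into the current
 * pipeline, and replicate the block data to it from a neighboring node.
 * Returns true on success. Fatal conditions (cancellation, closed
 * filesystem, safe mode) are rethrown; other HDFS errors are logged and
 * reported by returning false.
 */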
bool PipelineImpl::addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes) {
try {
/*
* get a new datanode
*/
std::vector<DatanodeInfo> original = nodes;
shared_ptr<LocatedBlock> lb;
lb = filesystem->getAdditionalDatanode(path, *lastBlock, nodes, storageIDs,
excludedNodes, 1);
nodes = lb->getLocations();
storageIDs = lb->getStorageIDs();
/*
 * failed to add a new datanode into the pipeline.
 */
if (original.size() == nodes.size()) {
LOG(LOG_ERROR,
"Failed to add new datanode into pipeline for block: %s file %s.",
lastBlock->toString().c_str(), path.c_str());
} else {
/*
* find the new datanode
*/
int d = findNewDatanode(original);
/*
 * in case the block transfer fails.
 */
errorIndex = d;
/*
* transfer replica
*/
DatanodeInfo & src = d == 0 ? nodes[1] : nodes[d - 1];
std::vector<DatanodeInfo> targets;
targets.push_back(nodes[d]);
LOG(INFO, "Replicate block %s from %s to %s for file %s.", lastBlock->toString().c_str(),
src.formatAddress().c_str(), targets[0].formatAddress().c_str(), path.c_str());
transfer(*lastBlock, src, targets, lb->getToken());
errorIndex = -1;
return true;
}
} catch (const HdfsCanceled & e) {
throw;
} catch (const HdfsFileSystemClosed & e) {
throw;
} catch (const SafeModeException & e) {
throw;
} catch (const HdfsException & e) {
std::string buffer;
LOG(LOG_ERROR,
"Failed to add a new datanode into pipeline for block: %s file %s.\n%s",
lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
}
return false;
}
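/*
 * Warn when the pipeline contains fewer datanodes than the configured
 * replication factor.
 */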
void PipelineImpl::checkPipelineWithReplicas() {
if (static_cast<int>(nodes.size()) < replication) {
std::stringstream ss;
ss.imbue(std::locale::classic());
int size = nodes.size();
for (int i = 0; i < size - 1; ++i) {
ss << nodes[i].formatAddress() << ", ";
}
if (nodes.empty()) {
ss << "Empty";
} else {
ss << nodes.back().formatAddress();
}
LOG(WARNING,
"The number of nodes in the pipeline is %d [%s], which is less than the expected number of replicas %d for block %s file %s",
static_cast<int>(nodes.size()), ss.str().c_str(), replication,
lastBlock->toString().c_str(), path.c_str());
}
}
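/*
 * (Re)build the pipeline for append or error recovery: remove the bad
 * datanode if one is known, optionally ask the namenode for a replacement,
 * obtain a new generation stamp and access token, reconnect to the first
 * datanode, and finally commit the new pipeline to the namenode via
 * updatePipeline().
 */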
void PipelineImpl::buildForAppendOrRecovery(bool recovery) {
int64_t gs = 0;
int retry = blockWriteRetry;
exception_ptr lastException;
std::vector<DatanodeInfo> excludedNodes;
shared_ptr<LocatedBlock> lb;
std::string buffer;
do {
/*
* Remove bad datanode from list of datanodes.
* If errorIndex was not set (i.e. appends), then do not remove
* any datanodes
*/
if (errorIndex >= 0) {
assert(lastBlock);
LOG(LOG_ERROR, "Pipeline: node %s is invalid and removed from pipeline when %s block %s for file %s, stage = %s.",
nodes[errorIndex].formatAddress().c_str(),
(recovery ? "recovery" : "append to"), lastBlock->toString().c_str(),
path.c_str(), StageToString(stage));
excludedNodes.push_back(nodes[errorIndex]);
nodes.erase(nodes.begin() + errorIndex);
if (!storageIDs.empty()) {
storageIDs.erase(storageIDs.begin() + errorIndex);
}
if (nodes.empty()) {
THROW(HdfsIOException,
"Build pipeline to %s block %s failed: all datanodes are bad.",
(recovery ? "recover" : "append to"), lastBlock->toString().c_str());
}
errorIndex = -1;
}
try {
gs = 0;
/*
 * Check whether the number of datanodes in the pipeline satisfies the
 * replication requirement, and add a new datanode if it does not.
 */
if (stage != PIPELINE_SETUP_CREATE && stage != PIPELINE_CLOSE
&& static_cast<int>(nodes.size()) < replication && canAddDatanode) {
if (!addDatanodeToPipeline(excludedNodes)) {
THROW(HdfsIOException,
"Failed to add new datanode into pipeline for block: %s file %s, "
"set \"output.replace-datanode-on-failure\" to \"false\" to disable this feature.",
lastBlock->toString().c_str(), path.c_str());
}
}
if (errorIndex >= 0) {
continue;
}
checkPipelineWithReplicas();
/*
* Update generation stamp and access token
*/
lb = filesystem->updateBlockForPipeline(*lastBlock);
gs = lb->getGenerationStamp();
/*
* Try to build pipeline
*/
createBlockOutputStream(lb->getToken(), gs, recovery);
/*
* everything is ok, reset errorIndex.
*/
errorIndex = -1;
lastException = exception_ptr();
break; //break on success
} catch (const HdfsInvalidBlockToken & e) {
lastException = current_exception();
recovery = true;
LOG(LOG_ERROR,
"Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
LOG(INFO, "Try to recovery pipeline for block %s file %s.",
lastBlock->toString().c_str(), path.c_str());
} catch (const HdfsTimeoutException & e) {
lastException = current_exception();
recovery = true;
LOG(LOG_ERROR,
"Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
LOG(INFO, "Try to recovery pipeline for block %s file %s.",
lastBlock->toString().c_str(), path.c_str());
} catch (const HdfsIOException & e) {
lastException = current_exception();
/*
 * Set the recovery flag in case we failed to create a pipeline for append.
 */
recovery = true;
LOG(LOG_ERROR,
"Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
LOG(INFO, "Try to recovery pipeline for block %s file %s.", lastBlock->toString().c_str(), path.c_str());
}
/*
 * We don't know what happened and no datanode reported a failure, so reduce
 * the retry count to avoid an infinite loop. This can happen when an RPC
 * call throws HdfsIOException.
 */
if (errorIndex < 0) {
--retry;
}
} while (retry > 0);
if (lastException) {
rethrow_exception(lastException);
}
/*
* Update pipeline at the namenode, non-idempotent RPC call.
*/
lb->setPoolId(lastBlock->getPoolId());
lb->setBlockId(lastBlock->getBlockId());
lb->setLocations(nodes);
lb->setStorageIDs(storageIDs);
lb->setNumBytes(lastBlock->getNumBytes());
lb->setOffset(lastBlock->getOffset());
filesystem->updatePipeline(*lastBlock, *lb, nodes, storageIDs);
lastBlock = lb;
}
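/*
 * Ask the namenode to allocate the next block of the file, retrying
 * NotReplicatedYetException with exponential backoff starting at 100
 * milliseconds and capped at 5 seconds.
 */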
void PipelineImpl::locateNextBlock(
const std::vector<DatanodeInfo> & excludedNodes) {
milliseconds sleeptime(100);
milliseconds fiveSeconds(5000);
int retry = blockWriteRetry;
while (true) {
try {
lastBlock = filesystem->addBlock(path, lastBlock.get(),
excludedNodes);
assert(lastBlock);
return;
} catch (const NotReplicatedYetException & e) {
LOG(DEBUG1, "Got NotReplicatedYetException when try to addBlock for block %s, "
"already retry %d times, max retry %d times", lastBlock->toString().c_str(),
blockWriteRetry - retry, blockWriteRetry);
if (retry--) {
try {
sleep_for(sleeptime);
} catch (...) {
}
sleeptime *= 2;
sleeptime = sleeptime < fiveSeconds ? sleeptime : fiveSeconds;
} else {
throw;
}
}
}
}
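/*
 * Format the excluded datanode list as "[addr1, addr2, ...]" for logging.
 */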
static std::string FormatExcludedNodes(
const std::vector<DatanodeInfo> & excludedNodes) {
std::stringstream ss;
ss.imbue(std::locale::classic());
ss << "[";
int size = excludedNodes.size();
for (int i = 0; i < size - 1; ++i) {
ss << excludedNodes[i].formatAddress() << ", ";
}
if (excludedNodes.empty()) {
ss << "Empty";
} else {
ss << excludedNodes.back().formatAddress();
}
ss << "]";
return ss.str();
}
void PipelineImpl::buildForNewBlock() {
int retryAllocNewBlock = 0, retry = blockWriteRetry;
std::vector<DatanodeInfo> excludedNodes;
shared_ptr<LocatedBlock> block = lastBlock;
std::string buffer;
do {
errorIndex = -1;
lastBlock = block;
try {
locateNextBlock(excludedNodes);
lastBlock->setNumBytes(0);
nodes = lastBlock->getLocations();
storageIDs = lastBlock->getStorageIDs();
} catch (const HdfsRpcException & e) {
std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
LOG(LOG_ERROR,
"Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer));
if (retryAllocNewBlock > blockWriteRetry) {
throw;
}
LOG(INFO, "Retry to allocate a new empty block for file %s, last block %s, excluded nodes %s.",
path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str());
++retryAllocNewBlock;
continue;
} catch (const HdfsException & e) {
std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
LOG(LOG_ERROR,
"Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer));
throw;
}
retryAllocNewBlock = 0;
checkPipelineWithReplicas();
if (nodes.empty()) {
THROW(HdfsIOException,
"No datanode is available to create a pipeline for block %s file %s.",
lastBlock->toString().c_str(), path.c_str());
}
try {
createBlockOutputStream(lastBlock->getToken(), 0, false);
break; //break on success
} catch (const HdfsInvalidBlockToken & e) {
LOG(LOG_ERROR,
"Failed to setup the pipeline for new block %s file %s.\n%s",
lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
} catch (const HdfsTimeoutException & e) {
LOG(LOG_ERROR,
"Failed to setup the pipeline for new block %s file %s.\n%s",
lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
} catch (const HdfsIOException & e) {
LOG(LOG_ERROR,
"Failed to setup the pipeline for new block %s file %s.\n%s",
lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
}
LOG(INFO, "Abandoning block: %s for file %s.", lastBlock->toString().c_str(), path.c_str());
try {
filesystem->abandonBlock(*lastBlock, path);
} catch (const HdfsException & e) {
LOG(LOG_ERROR,
"Failed to abandon useless block %s for file %s.\n%s",
lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
throw;
}
if (errorIndex >= 0) {
LOG(INFO, "Excluding invalid datanode: %s for block %s for file %s",
nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(), path.c_str());
excludedNodes.push_back(nodes[errorIndex]);
} else {
/*
 * We don't know what happened and no datanode reported a failure, so reduce
 * the retry count to avoid an infinite loop.
 */
--retry;
}
} while (retry);
}
/*
 * A bad link node must be either empty or an "IP:PORT" pair.
 */
void PipelineImpl::checkBadLinkFormat(const std::string & n) {
std::string node = n;
if (node.empty()) {
return;
}
do {
const char * host = &node[0], *port;
size_t pos = node.find_last_of(":");
if (pos == node.npos || pos + 1 == node.length()) {
break;
}
node[pos] = 0;
port = &node[pos + 1];
struct addrinfo hints, *addrs;
memset(&hints, 0, sizeof(hints));
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
long p;
char * end;
p = strtol(port, &end, 10);
if (p >= 65536 || p <= 0 || end != port + strlen(port)) {
break;
}
if (getaddrinfo(host, port, &hints, &addrs)) {
break;
}
freeaddrinfo(addrs);
return;
} while (0);
LOG(FATAL, "Cannot parser the firstBadLink string %s, it should be a bug or protocol incompatible.",
n.c_str());
THROW(HdfsException, "Cannot parser the firstBadLink string %s, it should be a bug or protocol incompatible.",
n.c_str());
}
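/*
 * Connect to the first datanode and send the write-block request that sets
 * up the whole pipeline. On failure, map the firstBadLink reported in the
 * reply (if any) back to a pipeline index; otherwise blame the first node.
 */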
void PipelineImpl::createBlockOutputStream(const Token & token, int64_t gs, bool recovery) {
std::string firstBadLink;
exception_ptr lastError;
bool needWrapException = true;
try {
sock = shared_ptr<Socket>(new TcpSocketImpl);
reader = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock));
sock->connect(nodes[0].getIpAddr().c_str(), nodes[0].getXferPort(),
connectTimeout);
std::vector<DatanodeInfo> targets;
for (size_t i = 1; i < nodes.size(); ++i) {
targets.push_back(nodes[i]);
}
DataTransferProtocolSender sender(*sock, writeTimeout,
nodes[0].formatAddress());
sender.writeBlock(*lastBlock, token, clientName.c_str(), targets,
(recovery ? (stage | 0x1) : stage), nodes.size(),
lastBlock->getNumBytes(), bytesSent, gs, checksumType, chunkSize);
int size = reader->readVarint32(readTimeout);
std::vector<char> buf(size);
reader->readFully(&buf[0], size, readTimeout);
BlockOpResponseProto resp;
if (!resp.ParseFromArray(&buf[0], size)) {
THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
nodes[0].formatAddress().c_str(), lastBlock->toString().c_str());
}
Status pipelineStatus = resp.status();
firstBadLink = resp.firstbadlink();
if (Status::DT_PROTO_SUCCESS != pipelineStatus) {
needWrapException = false;
if (Status::DT_PROTO_ERROR_ACCESS_TOKEN == pipelineStatus) {
THROW(HdfsInvalidBlockToken,
"Got access token error for connect ack with firstBadLink as %s for block %s",
firstBadLink.c_str(), lastBlock->toString().c_str());
} else {
THROW(HdfsIOException, "Bad connect ack with firstBadLink as %s for block %s",
firstBadLink.c_str(), lastBlock->toString().c_str());
}
}
return;
} catch (...) {
errorIndex = 0;
lastError = current_exception();
}
checkBadLinkFormat(firstBadLink);
if (!firstBadLink.empty()) {
for (size_t i = 0; i < nodes.size(); ++i) {
if (nodes[i].getXferAddr() == firstBadLink) {
errorIndex = i;
break;
}
}
}
assert(lastError);
if (!needWrapException) {
rethrow_exception(lastError);
}
try {
rethrow_exception(lastError);
} catch (const HdfsException & e) {
NESTED_THROW(HdfsIOException,
"Cannot create block output stream for block %s, "
"recovery flag: %s, with last generation stamp %" PRId64 ".",
lastBlock->toString().c_str(), (recovery ? "true" : "false"), gs);
}
}
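/*
 * Retransmit every packet that was sent but has not been acknowledged yet;
 * used after the pipeline has been rebuilt.
 */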
void PipelineImpl::resend() {
assert(stage != PIPELINE_CLOSE);
for (size_t i = 0; i < packets.size(); ++i) {
ConstPacketBuffer b = packets[i]->getBuffer();
sock->writeFully(b.getBuffer(), b.getSize(), writeTimeout);
int64_t tmp = packets[i]->getLastByteOffsetBlock();
bytesSent = bytesSent > tmp ? bytesSent : tmp;
}
}
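/*
 * Queue one packet and write it to the pipeline, then poll for acks. On an
 * I/O error, rebuild the pipeline and retransmit the outstanding packets
 * until the write succeeds.
 */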
void PipelineImpl::send(shared_ptr<Packet> packet) {
ConstPacketBuffer buffer = packet->getBuffer();
if (!packet->isHeartbeat()) {
packets.push_back(packet);
}
/*
 * Too many packets are pending acknowledgement; wait to avoid consuming too much memory.
 */
if (static_cast<int>(packets.size()) > packetPool.getMaxSize()) {
waitForAcks(false);
}
bool failover = false;
do {
try {
if (failover) {
resend();
} else {
assert(sock);
sock->writeFully(buffer.getBuffer(), buffer.getSize(),
writeTimeout);
int64_t tmp = packet->getLastByteOffsetBlock();
bytesSent = bytesSent > tmp ? bytesSent : tmp;
}
checkResponse(false);
return;
} catch (const HdfsIOException & e) {
if (errorIndex < 0) {
errorIndex = 0;
}
sock.reset();
}
buildForAppendOrRecovery(true);
failover = true;
if (stage == PIPELINE_CLOSE) {
assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
packets.clear();
break;
}
} while (true);
}
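/*
 * Apply a single pipeline ack. On success, advance bytesAcked and retire
 * the head packet; on failure, record the index of the most downstream
 * datanode that reported an error and throw HdfsIOException.
 */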
void PipelineImpl::processAck(PipelineAck & ack) {
assert(!ack.isInvalid());
int64_t seqno = ack.getSeqno();
if (HEART_BEAT_SEQNO == seqno) {
return;
}
assert(!packets.empty());
Packet & packet = *packets[0];
if (ack.isSuccess()) {
if (packet.getSeqno() != seqno) {
THROW(HdfsIOException,
"processAck: pipeline ack expecting seqno %" PRId64 " but received %" PRId64 " for block %s.",
packet.getSeqno(), seqno, lastBlock->toString().c_str());
}
int64_t tmp = packet.getLastByteOffsetBlock();
bytesAcked = tmp > bytesAcked ? tmp : bytesAcked;
assert(lastBlock);
lastBlock->setNumBytes(bytesAcked);
if (packet.isLastPacketInBlock()) {
sock.reset();
}
packetPool.relesePacket(packets[0]);
packets.pop_front();
} else {
for (int i = ack.getNumOfReplies() - 1; i >= 0; --i) {
if (Status::DT_PROTO_SUCCESS != ack.getReply(i)) {
errorIndex = i;
/*
 * Handle block token expiration the same way as HdfsIOException.
 */
THROW(HdfsIOException,
"processAck: ack report error at node: %s for block %s.",
nodes[i].formatAddress().c_str(), lastBlock->toString().c_str());
}
}
}
}
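/*
 * Read one length-prefixed PipelineAck from the pipeline socket, validate
 * it, and apply it.
 */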
void PipelineImpl::processResponse() {
PipelineAck ack;
std::vector<char> buf;
int size = reader->readVarint32(readTimeout);
ack.reset();
buf.resize(size);
reader->readFully(&buf[0], size, readTimeout);
ack.readFrom(&buf[0], size);
if (ack.isInvalid()) {
THROW(HdfsIOException,
"processResponse: got an invalid DataStreamer packet ack for block %s",
lastBlock->toString().c_str());
}
processAck(ack);
}
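/*
 * Poll the pipeline socket and process a response if one is available.
 * When wait is true, block up to readTimeout and treat a timeout as an
 * error.
 */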
void PipelineImpl::checkResponse(bool wait) {
int timeout = wait ? readTimeout : 0;
bool readable = reader->poll(timeout);
if (readable) {
processResponse();
} else if (wait) {
THROW(HdfsIOException, "Timeout when reading response for block %s, datanode %s do not response.",
lastBlock->toString().c_str(),
nodes[0].formatAddress().c_str());
}
}
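/*
 * Block until every queued packet has been acknowledged.
 */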
void PipelineImpl::flush() {
waitForAcks(true);
}
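/*
 * Drain outstanding acks. When force is false, return as soon as the
 * pending window drops below the packet pool limit. On an ack error,
 * rebuild the pipeline and resend the outstanding packets.
 */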
void PipelineImpl::waitForAcks(bool force) {
bool failover = false;
while (!packets.empty()) {
/*
 * Just wait for some acks to avoid consuming too much memory.
 */
if (!force && static_cast<int>(packets.size()) < packetPool.getMaxSize()) {
return;
}
try {
if (failover) {
resend();
}
checkResponse(true);
failover = false;
} catch (const HdfsIOException & e) {
if (errorIndex < 0) {
errorIndex = 0;
}
std::string buffer;
LOG(LOG_ERROR,
"Failed to flush pipeline on datanode %s for block %s file %s.\n%s",
nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(),
path.c_str(), GetExceptionDetail(e, buffer));
LOG(INFO, "Rebuild pipeline to flush for block %s file %s.", lastBlock->toString().c_str(), path.c_str());
sock.reset();
failover = true;
}
if (failover) {
buildForAppendOrRecovery(true);
if (stage == PIPELINE_CLOSE) {
assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
packets.clear();
break;
}
}
}
}
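/*
 * Finish the block: flush outstanding packets, send the final
 * last-in-block packet, wait for all acks, and return the finalized block
 * with its acknowledged length.
 */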
shared_ptr<LocatedBlock> PipelineImpl::close(shared_ptr<Packet> lastPacket) {
waitForAcks(true);
lastPacket->setLastPacketInBlock(true);
stage = PIPELINE_CLOSE;
send(lastPacket);
waitForAcks(true);
sock.reset();
lastBlock->setNumBytes(bytesAcked);
LOG(DEBUG2, "close pipeline for file %s, block %s with length %" PRId64,
path.c_str(), lastBlock->toString().c_str(),
lastBlock->getNumBytes());
return lastBlock;
}
}
}