blob: 8db645e48d049d549fe526df952f0cc3816687c3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "reader/block_reader.h"
#include "reader/datatransfer.h"
#include "common/continuation/continuation.h"
#include "common/continuation/asio.h"
#include "common/logging.h"
#include "common/util.h"
#include <future>
#include <boost/asio/buffer.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/completion_condition.hpp>
namespace hdfs {
#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
#define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_
#define FMT_THIS_ADDR "this=" << (void*)this
// Build a fully-populated OpReadBlockProto for the OP_READ_BLOCK request.
// Sub-messages are created through the protobuf mutable_* accessors, so all
// ownership stays inside the message and is released by the protobuf runtime.
hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name,
    bool verify_checksum, const hadoop::common::TokenProto *token,
    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
{
  hadoop::hdfs::OpReadBlockProto op;

  // Client + block identity live in the nested operation header.
  auto *header = op.mutable_header();
  header->set_clientname(client_name);
  auto *base_header = header->mutable_baseheader();
  base_header->mutable_block()->CopyFrom(*block);
  if (token) {
    // The security token is optional; only attach it when one was supplied.
    base_header->mutable_token()->CopyFrom(*token);
  }

  op.set_offset(offset);
  op.set_len(length);
  op.set_sendchecksums(verify_checksum);
  // TODO: op.set_allocated_cachingstrategy();
  return op;
}
//
// Notes about the BlockReader and associated object lifecycles (9/29/16)
// -We have several stages in the read pipeline. Each stage represents a logical
// step in the HDFS block transfer logic. They are implemented as continuations
// for now, and in some cases the stage may have a nested continuation as well.
// It's important to make sure that continuations, nested or otherwise, cannot
// outlive the objects they depend on.
//
// -The BlockReader holds a shared_ptr to the DataNodeConnection that's used in each
// pipeline stage. The connection object must never be destroyed while operations are
// pending on the ASIO side (see HDFS-10931). In order to prevent a state where the
// BlockReader or one of the corresponding pipelines outlives the connection each
// pipeline stage must explicitly hold a shared pointer copied from BlockReaderImpl::dn_.
//
// Wire preamble for an unsecured read request: 2-byte big-endian protocol
// version followed by the 1-byte read-block opcode.
static int8_t unsecured_request_block_header[3] = {0, kDataTransferVersion, Operation::kReadBlock};
// Sends the OP_READ_BLOCK request to the DataNode and asynchronously reads
// the delimited BlockOpResponseProto reply. `handler` is invoked exactly once
// with the final status; on success the reader enters kReadPacketHeader and
// chunk_padding_bytes_ records how far the DN rewound to a chunk boundary.
void BlockReaderImpl::AsyncRequestBlock(const std::string &client_name,
const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
uint64_t offset, const std::function<void(Status)> &handler)
{
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock("
<< FMT_THIS_ADDR << ", ..., length="
<< length << ", offset=" << offset << ", ...) called");
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
// to send more than this amount if the read starts/ends mid-chunk.
bytes_to_read_ = length;
// Pipeline state: the serialized request bytes we write and the protobuf
// reply we parse. Owned by the pipeline so it outlives the async operations.
struct State {
std::string header;
hadoop::hdfs::OpReadBlockProto request;
hadoop::hdfs::BlockOpResponseProto response;
};
auto m = continuation::Pipeline<State>::Create(cancel_state_);
State *s = &m->state();
s->request = ReadBlockProto(client_name, options_.verify_checksum,
dn_->token_.get(), block, length, offset);
// Request bytes = 3-byte preamble + varint-delimited OpReadBlockProto.
s->header = std::string((const char*)unsecured_request_block_header, 3);
bool serialize_success = true;
s->header += SerializeDelimitedProtobufMessage(&s->request, &serialize_success);
if(!serialize_success) {
// Fail fast without touching the wire; handler still fires exactly once.
handler(Status::Error("Unable to serialize protobuf message"));
return;
}
auto read_pb_message =
new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(dn_, &s->response);
// Stage 1: write the request; stage 2: read the delimited response.
m->Push(asio_continuation::Write(dn_, boost::asio::buffer(s->header))).Push(read_pb_message);
m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status;
if (stat.ok()) {
const auto &resp = s.response;
if(this->event_handlers_) {
// Test hook: the event handler may convert a success into an error.
event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (stat.ok() && event_resp.response_type() == event_response::kTest_Error) {
stat = Status::Error("Test error");
}
#endif
}
if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
if (resp.has_readopchecksuminfo()) {
// The DN starts the stream at a chunk boundary; remember how many
// leading bytes must be skipped before the caller's data begins.
const auto &checksum_info = resp.readopchecksuminfo();
chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
}
state_ = kReadPacketHeader;
} else {
// DN rejected the request; surface its message to the caller.
stat = Status::Error(s.response.message().c_str());
}
}
handler(stat);
});
}
// Synchronous facade over AsyncRequestBlock: publish the asynchronous result
// through a promise and block the calling thread until it is fulfilled.
// Must not be called from the thread running the asio event loop.
Status BlockReaderImpl::RequestBlock(const std::string &client_name,
                                     const hadoop::hdfs::ExtendedBlockProto *block,
                                     uint64_t length, uint64_t offset)
{
  LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock("
                          << FMT_THIS_ADDR <<"..., length="
                          << length << ", offset=" << offset << ") called");

  auto result = std::make_shared<std::promise<Status>>();
  std::future<Status> result_future = result->get_future();
  AsyncRequestBlock(client_name, block, length, offset,
                    [result](const Status &status) { result->set_value(status); });
  return result_future.get();
}
// Reads and parses the preamble + header that precede every data packet:
//   [4-byte payload length][2-byte header length][PacketHeaderProto]
// Both length fields are big-endian. On success the reader's per-packet
// counters are reset and state_ advances to kReadChecksum.
struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation
{
ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run("
<< FMT_CONT_AND_PARENT_ADDR << ") called");
// Start a fresh packet: nothing consumed yet, length not yet known.
parent_->packet_data_read_bytes_ = 0;
parent_->packet_len_ = 0;
auto handler = [next, this](const boost::system::error_code &ec, size_t) {
Status status;
if (ec) {
status = Status(ec.value(), ec.message().c_str());
} else {
// Preamble and protobuf header are both in buf_ at this point.
parent_->packet_len_ = packet_length();
parent_->header_.Clear();
bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
header_length());
assert(v && "Failed to parse the header");
(void)v; //avoids unused variable warning
parent_->state_ = kReadChecksum;
}
if(parent_->event_handlers_) {
// Test hook: the event handler may convert a success into an error.
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status);
};
// CompletionHandler acts as the asio completion condition: it keeps the
// read running until the 6-byte preamble plus the full serialized header
// have arrived in buf_.
boost::asio::async_read(*parent_->dn_, boost::asio::buffer(buf_),
std::bind(&ReadPacketHeader::CompletionHandler, this,
std::placeholders::_1, std::placeholders::_2), handler);
}
private:
// Wire-layout constants for the packet preamble.
static const size_t kMaxHeaderSize = 512;
static const size_t kPayloadLenOffset = 0;
static const size_t kPayloadLenSize = sizeof(int32_t);
static const size_t kHeaderLenOffset = 4;
static const size_t kHeaderLenSize = sizeof(int16_t);
static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
BlockReaderImpl *parent_;
std::array<char, kMaxHeaderSize> buf_;
// Big-endian 4-byte payload length at the start of the preamble.
// NOTE(review): reinterpret_cast from char storage assumes suitable
// alignment/aliasing; buf_ is a char array so this holds in practice.
size_t packet_length() const {
return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
}
// Big-endian 2-byte length of the serialized PacketHeaderProto.
size_t header_length() const {
return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
}
// Completion condition: returns the number of bytes asio still needs
// (0 terminates the read). Reads the fixed preamble first, then exactly
// header_length() additional bytes.
size_t CompletionHandler(const boost::system::error_code &ec, size_t transferred) {
if (ec) {
return 0;
} else if (transferred < kHeaderStart) {
return kHeaderStart - transferred;
} else {
return kHeaderStart + header_length() - transferred;
}
}
// Keep the DN connection alive
std::shared_ptr<DataNodeConnection> shared_conn_;
};
// Reads the checksum section of the current packet into parent->checksum_.
// The checksum block is everything between the 4-byte payload-length word
// and the packet's data bytes: packet_len_ - sizeof(int) - datalen.
// A pass-through no-op when the reader is not in kReadChecksum.
struct BlockReaderImpl::ReadChecksum : continuation::Continuation
{
ReadChecksum(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run("
<< FMT_CONT_AND_PARENT_ADDR << ") called");
auto parent = parent_;
if (parent->state_ != kReadChecksum) {
// Nothing to do for this packet; let the pipeline continue unchanged.
next(Status::OK());
return;
}
// Copy the connection into the callback so it cannot be destroyed while
// the async read is in flight (see lifecycle notes at the top of file).
std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
auto handler = [parent, next, this, keep_conn_alive_](const boost::system::error_code &ec, size_t)
{
Status status;
if (ec) {
status = Status(ec.value(), ec.message().c_str());
} else {
// If the read started mid-chunk, padding must be skipped before the
// caller's data; otherwise go straight to the data stage.
parent->state_ = parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
}
if(parent->event_handlers_) {
// Test hook: the event handler may convert a success into an error.
event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status);
};
// NOTE(review): assumes packet_len_ >= sizeof(int) + datalen; a malformed
// header would make this size computation wrap -- verify upstream checks.
parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->header_.datalen());
boost::asio::async_read(*parent->dn_, boost::asio::buffer(parent->checksum_), handler);
}
private:
BlockReaderImpl *parent_;
// Keep the DataNodeConnection alive
std::shared_ptr<DataNodeConnection> shared_conn_;
};
// Copies packet payload bytes from the DN into the supplied buffer, updating
// the shared transferred-byte counter and the reader's per-packet/total
// counters. Once the current packet's payload is fully consumed, the state
// machine flips back to kReadPacketHeader for the next packet.
struct BlockReaderImpl::ReadData : continuation::Continuation
{
ReadData(BlockReaderImpl *parent, std::shared_ptr<size_t> bytes_transferred,
const boost::asio::mutable_buffers_1 &buf) : parent_(parent),
bytes_transferred_(bytes_transferred), buf_(buf), shared_conn_(parent->dn_)
{
// NOTE(review): this iterator call has no visible effect; it appears to
// be vestigial -- confirm before removing.
buf_.begin();
}
~ReadData() {
// NOTE(review): same as buf_.begin() above; appears to be a no-op.
buf_.end();
}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
<< FMT_CONT_AND_PARENT_ADDR << ") called");
auto handler =
[next, this](const boost::system::error_code &ec, size_t transferred) {
Status status;
if (ec) {
status = Status(ec.value(), ec.message().c_str());
}
// Counters are updated even on error so partial progress is recorded.
*bytes_transferred_ += transferred;
parent_->bytes_to_read_ -= transferred;
parent_->packet_data_read_bytes_ += transferred;
if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
// This packet is exhausted; the next pass parses a new header.
parent_->state_ = kReadPacketHeader;
}
if(parent_->event_handlers_) {
// Test hook: the event handler may convert a success into an error.
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status);
};
// Request exactly the unread remainder of this packet's payload; asio
// also stops early if buf_ fills first.
auto data_len = parent_->header_.datalen() - parent_->packet_data_read_bytes_;
boost::asio::async_read(*parent_->dn_, buf_, boost::asio::transfer_exactly(data_len), handler);
}
private:
BlockReaderImpl *parent_;
std::shared_ptr<size_t> bytes_transferred_;
const boost::asio::mutable_buffers_1 buf_;
// Keep DNConnection alive.
std::shared_ptr<DataNodeConnection> shared_conn_;
};
// Consumes the chunk padding that precedes the caller's requested offset.
// When a read starts mid-chunk the DN rewinds to the chunk boundary so
// checksums can be verified; the bytes before `offset` are read into a
// scratch vector and discarded. A pass-through no-op unless the reader is in
// kReadPadding and padding bytes remain.
struct BlockReaderImpl::ReadPadding : continuation::Continuation
{
  ReadPadding(BlockReaderImpl *parent) : parent_(parent),
      padding_(parent->chunk_padding_bytes_),
      bytes_transferred_(std::make_shared<size_t>(0)),
      read_data_(new ReadData(parent, bytes_transferred_, boost::asio::buffer(padding_))),
      shared_conn_(parent->dn_) {}

  virtual void Run(const Next &next) override {
    LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run("
                            << FMT_CONT_AND_PARENT_ADDR << ") called");
    if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
      next(Status::OK());
      return;
    }

    // Copy the connection into the callback so it cannot be destroyed while
    // the nested read is in flight (see lifecycle notes at the top of file).
    std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;

    auto h = [next, this, keep_conn_alive_](const Status &stat) {
      Status status = stat;
      if (status.ok()) {
        // Bug fix: compare values rather than reinterpreting raw bytes. The
        // previous reinterpret_cast<const int&>(*bytes_transferred_) read
        // only the first sizeof(int) bytes of a size_t, which yields the
        // wrong value on big-endian 64-bit platforms.
        assert(*bytes_transferred_ ==
               static_cast<size_t>(parent_->chunk_padding_bytes_));
        parent_->chunk_padding_bytes_ = 0;
        parent_->state_ = kReadData;
      }
      if(parent_->event_handlers_) {
        // Test hook: the event handler may convert a success into an error.
        event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
        if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
          status = Status::Error("Test error");
        }
#endif
      }
      next(status);
    };
    read_data_->Run(h);
  }

private:
  BlockReaderImpl *parent_;
  std::vector<char> padding_;                 // scratch sink for discarded bytes
  std::shared_ptr<size_t> bytes_transferred_; // bytes consumed by the nested read
  std::shared_ptr<continuation::Continuation> read_data_;

  ReadPadding(const ReadPadding &) = delete;
  ReadPadding &operator=(const ReadPadding &) = delete;

  // Keep DNConnection alive.
  std::shared_ptr<DataNodeConnection> shared_conn_;
};
// Final per-packet stage: once every requested byte has been delivered,
// acknowledge the block read by sending a ClientReadStatusProto back to the
// DN (CHECKSUM_OK when checksums were verified, SUCCESS otherwise) and mark
// the reader kFinished. A no-op while bytes remain outstanding.
struct BlockReaderImpl::AckRead : continuation::Continuation
{
AckRead(BlockReaderImpl *parent) : parent_(parent), shared_conn_(parent->dn_) {}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called");
if (parent_->bytes_to_read_ > 0) {
// Still mid-block; the ack is only sent after the last packet.
next(Status::OK());
return;
}
// Nested pipeline whose state is the ack message itself.
auto m = continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_);
m->state().set_status(parent_->options_.verify_checksum
? hadoop::hdfs::Status::CHECKSUM_OK
: hadoop::hdfs::Status::SUCCESS);
m->Push(continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
// Copy the connection into the callback so it cannot be destroyed while
// the nested write is in flight (see lifecycle notes at the top of file).
std::shared_ptr<DataNodeConnection> keep_conn_alive_ = shared_conn_;
m->Run([this, next, keep_conn_alive_](const Status &stat, const hadoop::hdfs::ClientReadStatusProto &)
{
Status status = stat;
if (status.ok()) {
parent_->state_ = BlockReaderImpl::kFinished;
}
if(parent_->event_handlers_) {
// Test hook: the event handler may convert a success into an error.
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response_type() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status);
});
}
private:
BlockReaderImpl *parent_;
// Keep DNConnection alive.
std::shared_ptr<DataNodeConnection> shared_conn_;
};
// Reads one packet's worth of data into `buffer` by chaining the per-packet
// stages: header -> checksum -> padding -> data -> (possibly) ack.
// `handler` receives the final status and the number of payload bytes copied.
void BlockReaderImpl::AsyncReadPacket(const MutableBuffer &buffer,
const std::function<void(const Status &, size_t bytes_transferred)> &handler)
{
// A packet read is only legal after AsyncRequestBlock has advanced the
// state machine past kOpen.
assert(state_ != kOpen && "Not connected");
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called");
// Pipeline state: a shared running count of payload bytes delivered.
// shared_ptr because nested stages update it from their own callbacks.
struct State {
std::shared_ptr<size_t> bytes_transferred;
};
auto m = continuation::Pipeline<State>::Create(cancel_state_);
m->state().bytes_transferred = std::make_shared<size_t>(0);
// Note: some of these continuations have nested pipelines.
m->Push(new ReadPacketHeader(this))
.Push(new ReadChecksum(this))
.Push(new ReadPadding(this))
.Push(new ReadData(
this, m->state().bytes_transferred, buffer))
.Push(new AckRead(this));
// Hold a self-reference so this reader outlives the async pipeline.
auto self = this->shared_from_this();
m->Run([self, handler](const Status &status, const State &state) {
handler(status, *state.bytes_transferred);
});
}
// Blocking facade over AsyncReadPacket: wait on a promise that the async
// completion handler fulfills once the packet has been consumed. Returns the
// number of payload bytes copied; the Status lands in *status.
// Must not be called from the thread running the asio event loop.
size_t BlockReaderImpl::ReadPacket(const MutableBuffer &buffer, Status *status)
{
  LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called");

  size_t bytes_read = 0;
  auto completion = std::make_shared<std::promise<void>>();
  std::future<void> completion_future = completion->get_future();

  AsyncReadPacket(buffer,
      [status, &bytes_read, completion](const Status &result, size_t count) {
        *status = result;
        bytes_read = count;
        completion->set_value();
      });

  completion_future.wait();
  return bytes_read;
}
// Pipeline stage that issues the OP_READ_BLOCK negotiation. It owns private
// copies of every argument so the continuation remains valid regardless of
// the caller's lifetime.
struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation
{
  RequestBlockContinuation(BlockReader *reader, const std::string &client_name,
      const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, uint64_t offset)
      : reader_(reader), client_name_(client_name), block_(*block),
        length_(length), offset_(offset) {}

  virtual void Run(const Next &next) override {
    LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run("
                            << FMT_CONT_AND_READER_ADDR << ") called");
    // Forward the pipeline's `next` directly as the async completion handler.
    reader_->AsyncRequestBlock(client_name_, &block_, length_, offset_, next);
  }

private:
  BlockReader *reader_;
  const std::string client_name_;
  hadoop::hdfs::ExtendedBlockProto block_;  // owned copy of the block id
  uint64_t length_;
  uint64_t offset_;
};
// Drives repeated AsyncReadPacket calls until the caller's buffer is full or
// an error occurs. OnReadData re-arms itself as the packet-read callback, so
// this continuation is effectively an asynchronous loop.
struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation
{
ReadBlockContinuation(BlockReader *reader, MutableBuffer buffer, size_t *transferred)
: reader_(reader), buffer_(buffer), buffer_size_(boost::asio::buffer_size(buffer)), transferred_(transferred) {}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run("
<< FMT_CONT_AND_READER_ADDR << ") called");
*transferred_ = 0;
// Save the pipeline's continuation; OnReadData invokes it when done.
next_ = next;
// Prime the loop with a synthetic "zero bytes read" success.
OnReadData(Status::OK(), 0);
}
private:
BlockReader *reader_;
const MutableBuffer buffer_;
const size_t buffer_size_;
size_t *transferred_;
std::function<void(const Status &)> next_;
// Loop body and callback in one: accumulate progress, then either finish
// (error or buffer full) or read the next packet into the unfilled tail.
void OnReadData(const Status &status, size_t transferred) {
using std::placeholders::_1;
using std::placeholders::_2;
*transferred_ += transferred;
if (!status.ok()) {
next_(status);
} else if (*transferred_ >= buffer_size_) {
next_(status);
} else {
reader_->AsyncReadPacket(
boost::asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
}
}
};
// Reads an entire block (up to the buffer's size) asynchronously: first
// negotiates the transfer with the DN, then pulls packets until the buffer
// is full. `handler` receives the final status and total bytes delivered.
void BlockReaderImpl::AsyncReadBlock(
    const std::string & client_name,
    const hadoop::hdfs::LocatedBlockProto &block,
    size_t offset,
    const MutableBuffer &buffer,
    const std::function<void(const Status &, size_t)> handler)
{
  LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock("
                          << FMT_THIS_ADDR << ") called");

  // Pipeline state is just the running byte count for the whole block read.
  auto pipeline = continuation::Pipeline<size_t>::Create(cancel_state_);
  size_t *total_transferred = &pipeline->state();
  const size_t request_len = boost::asio::buffer_size(buffer);

  // Stage 1: negotiate the transfer; stage 2: drain packets into `buffer`.
  pipeline->Push(new RequestBlockContinuation(this, client_name, &block.b(),
                                              request_len, offset))
          .Push(new ReadBlockContinuation(this, buffer, total_transferred));

  pipeline->Run([handler](const Status &status, const size_t total) {
    handler(status, total);
  });
}
// Cancels any in-flight reads. Cancellation is implemented by the connection
// object, so this simply delegates to it.
void BlockReaderImpl::CancelOperation() {
  LOG_TRACE(kBlockReader, << "BlockReaderImpl::CancelOperation("
                          << FMT_THIS_ADDR << ") called");
  dn_->Cancel();
}
}