blob: d5118089027b0064791b490324b02f5fb8e6a856 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "reader/block_reader.h"
#include "filehandle.h"
#include "common/continuation/continuation.h"
#include "common/logging.h"
#include "connection/datanodeconnection.h"
#include "hdfspp/events.h"
#include "x-platform/types.h"
#include <future>
#include <memory>
#include <string>
#include <tuple>
#include <boost/asio/buffer.hpp>
// Log-stream fragment that prints the address of the current object as
// "this=<address>". It expands to an expression on `this`, so it can only be
// used inside non-static member functions.
#define FMT_THIS_ADDR "this=" << (void*)this
namespace hdfs {
using ::hadoop::hdfs::LocatedBlocksProto;
/* Out-of-line destructor for the FileHandle base; it performs no work of its own. */
FileHandle::~FileHandle() = default;
/**
 * Constructs a handle for one file.
 *
 * Each argument is stored in the member of the matching name. The read
 * position (offset_) and the bytes-read counter both start at 0, and
 * cancel_state_ is initialised from CancelTracker::New().
 *
 * @param cluster_name   cluster identifier passed on to event callbacks
 * @param path           path of the file, used in logs and event callbacks
 * @param io_service     service handed to newly created datanode connections
 * @param client_name    client name sent with block read requests; reads
 *                       fail with an error status when it is null
 * @param file_info      file length and block list used to serve reads
 * @param bad_data_nodes tracker used as the default node exclusion rule
 * @param event_handlers event hooks; may be replaced later through
 *                       SetFileEventCallback
 */
FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
                               const std::string & path,
                               std::shared_ptr<IoService> io_service, const std::shared_ptr<std::string> &client_name,
                               const std::shared_ptr<const struct FileInfo> file_info,
                               std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
                               std::shared_ptr<LibhdfsEvents> event_handlers)
    : cluster_name_(cluster_name),
      path_(path),
      io_service_(io_service),
      client_name_(client_name),
      file_info_(file_info),
      bad_node_tracker_(bad_data_nodes),
      offset_(0),
      cancel_state_(CancelTracker::New()),
      event_handlers_(event_handlers),
      bytes_read_(0) {
  LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
                         << FMT_THIS_ADDR << ", ...) called");
}
void FileHandleImpl::PositionRead(
void *buf, size_t buf_size, uint64_t offset,
const std::function<void(const Status &, size_t)> &handler) {
LOG_DEBUG(kFileHandle, << "FileHandleImpl::PositionRead("
<< FMT_THIS_ADDR << ", buf=" << buf
<< ", buf_size=" << std::to_string(buf_size) << ") called");
/* prevent usage after cancelation */
if(cancel_state_->is_canceled()) {
handler(Status::Canceled(), 0);
auto callback = [this, handler](const Status &status,
const std::string &contacted_datanode,
size_t bytes_read) {
/* determine if DN gets marked bad */
if (ShouldExclude(status)) {
bytes_read_ += bytes_read;
handler(status, bytes_read);
AsyncPreadSome(offset, boost::asio::buffer(buf, buf_size), bad_node_tracker_, callback);
/**
 * Blocking positional read.
 *
 * Wraps the asynchronous PositionRead overload in a promise/future pair and
 * waits for its completion callback.
 *
 * @param buf        destination buffer
 * @param buf_size   capacity of buf in bytes
 * @param offset     absolute file offset to start reading from
 * @param bytes_read receives the number of bytes read; written only when the
 *                   returned status is OK
 * @return the status reported by the asynchronous read
 */
Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) {
  LOG_DEBUG(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
                         << FMT_THIS_ADDR << ", buf=" << buf
                         << ", buf_size=" << std::to_string(buf_size)
                         << ", offset=" << offset << ") called");

  // The promise is held through a shared_ptr so the callback can be copied
  // freely while still referring to the same shared state.
  auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
  std::future<std::tuple<Status, size_t>> future(callstate->get_future());

  /* wrap async call with promise/future to make it blocking */
  auto callback = [callstate](const Status &s, size_t bytes) {
    // Publish the result; without this the future below never becomes ready.
    callstate->set_value(std::make_tuple(s, bytes));
  };

  PositionRead(buf, buf_size, offset, callback);

  /* wait for async to finish */
  auto returnstate = future.get();
  auto stat = std::get<0>(returnstate);

  if (!stat.ok()) {
    return stat;
  }

  *bytes_read = std::get<1>(returnstate);
  return stat;
}
/**
 * Blocking sequential read from the handle's current position.
 *
 * Reads through the blocking PositionRead at offset_ and, on success,
 * advances offset_ by the number of bytes actually read.
 *
 * @param buf        destination buffer
 * @param buf_size   capacity of buf in bytes
 * @param bytes_read receives the number of bytes read
 * @return the failing status from PositionRead, or Status::OK()
 */
Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) {
  LOG_DEBUG(kFileHandle, << "FileHandleImpl::Read("
                         << FMT_THIS_ADDR << ", buf=" << buf
                         << ", buf_size=" << std::to_string(buf_size) << ") called");

  const Status read_status = PositionRead(buf, buf_size, offset_, bytes_read);
  if (!read_status.ok()) {
    // The position is left untouched when the read fails.
    return read_status;
  }

  offset_ += *bytes_read;
  return Status::OK();
}
/**
 * Moves the handle's read position.
 *
 * @param offset in: displacement relative to `whence`;
 *               out: the resulting absolute position (written on success only)
 * @param whence std::ios_base::beg, cur or end
 * @return Status::Canceled() if the handle has been canceled,
 *         Status::InvalidArgument for an unsupported `whence` or a target
 *         outside [0, file length], otherwise Status::OK()
 */
Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
  LOG_DEBUG(kFileHandle, << "FileHandleImpl::Seek("
                         << FMT_THIS_ADDR
                         << ", offset=" << *offset << ", ...) called");

  if(cancel_state_->is_canceled()) {
    return Status::Canceled();
  }

  off_t new_offset = -1;

  // Each case must end in `break`; otherwise control falls through to the
  // unsupported-whence return below for every input.
  switch (whence) {
    case std::ios_base::beg:
      new_offset = *offset;
      break;
    case std::ios_base::cur:
      new_offset = offset_ + *offset;
      break;
    case std::ios_base::end:
      new_offset = file_info_->file_length_ + *offset;
      break;
    default:
      /* unsupported */
      return Status::InvalidArgument("Invalid Seek whence argument");
  }

  if(!CheckSeekBounds(new_offset)) {
    return Status::InvalidArgument("Seek offset out of bounds");
  }
  offset_ = new_offset;

  // Report the absolute position back to the caller.
  *offset = offset_;
  return Status::OK();
}
/*
 * Return false if seek will be out of bounds.
 *
 * A position is in bounds when it lies in [0, file length]; seeking exactly
 * to the end of the file is allowed.
 */
bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) {
  const ssize_t file_length = file_info_->file_length_;
  return desired_position >= 0 && desired_position <= file_length;
}
* Note that this method must be thread-safe w.r.t. the unsafe operations occurring
* on the FileHandle
void FileHandleImpl::AsyncPreadSome(
size_t offset, const MutableBuffer &buffer,
std::shared_ptr<NodeExclusionRule> excluded_nodes,
const std::function<void(const Status &, const std::string &, size_t)> handler) {
using ::hadoop::hdfs::DatanodeInfoProto;
using ::hadoop::hdfs::LocatedBlockProto;
LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
<< FMT_THIS_ADDR << ", ...) called");
if(cancel_state_->is_canceled()) {
handler(Status::Canceled(), "", 0);
if(offset == file_info_->file_length_) {
handler(Status::OK(), "", 0);
} else if(offset > file_info_->file_length_){
handler(Status::InvalidOffset("AsyncPreadSome: trying to begin a read past the EOF"), "", 0);
if (client_name_ == nullptr) {
handler(Status::Error("AsyncPreadSome: Unable to generate random client name"), "", 0);
* Note: block and chosen_dn will end up pointing to things inside
* the blocks_ vector. They shouldn't be directly deleted.
auto block = std::find_if(
file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) {
return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
if (block == file_info_->blocks_.end()) {
LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" << FMT_THIS_ADDR
<< ", ...) Cannot find corresponding blocks");
handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
* If user supplies a rule use it, otherwise use the tracker.
* User is responsible for making sure one of them isn't null.
std::shared_ptr<NodeExclusionRule> rule =
excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
auto datanodes = block->locs();
auto it = std::find_if(datanodes.begin(), datanodes.end(),
[rule](const DatanodeInfoProto &dn) {
return !rule->IsBadNode(;
if (it == datanodes.end()) {
LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
<< FMT_THIS_ADDR << ", ...) No datanodes available");
handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
DatanodeInfoProto &chosen_dn = *it;
std::string dnIpAddr =;
std::string dnHostName =;
uint64_t offset_within_block = offset - block->offset();
uint64_t size_within_block = std::min<uint64_t>(
block->b().numbytes() - offset_within_block, boost::asio::buffer_size(buffer));
LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
<< FMT_THIS_ADDR << "), ...) Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr
<< ", file path=\"" << path_ << "\", offset=" << std::to_string(offset) << ", read size=" << size_within_block);
// This is where we will put the logic for re-using a DN connection; we can
// steal the FileHandle's dn and put it back when we're done
std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, &block->blocktoken());
std::string dn_id = dn->uuid_;
std::string client_name = *client_name_;
// Wrap the DN in a block reader to handle the state and logic of the
// block request protocol
std::shared_ptr<BlockReader> reader;
reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_);
// Lambdas cannot capture copies of member variables so we'll make explicit
// copies for it
auto event_handlers = event_handlers_;
auto path = path_;
auto cluster_name = cluster_name_;
auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
if (event_resp.response_type() == event_response::kTest_Error) {
handler(event_resp.status(), dn_id, transferred);
handler(status, dn_id, transferred);
auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffer, reader, dn_id, client_name]
(Status status, std::shared_ptr<DataNodeConnection> dn) {
event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
if (event_resp.response_type() == event_response::kTest_Error) {
status = event_resp.status();
if (status.ok()) {
client_name, *block, offset_within_block,
boost::asio::buffer(buffer, size_within_block), read_handler);
} else {
handler(status, dn_id, 0);
std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
std::shared_ptr<DataNodeConnection> dn,
std::shared_ptr<LibhdfsEvents> event_handlers)
std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_, event_handlers);
LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
<< ", ..., dnconn=" << dn.get()
<< ") called. New BlockReader = " << reader.get());
return reader;
/**
 * Creates a connection object for the given datanode.
 *
 * @param io_service service the connection is constructed with
 * @param dn         description of the datanode to connect to
 * @param token      block token passed to the connection
 * @return a new DataNodeConnectionImpl; it receives a raw pointer to this
 *         handle's current event handlers
 */
std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
    std::shared_ptr<IoService> io_service,
    const ::hadoop::hdfs::DatanodeInfoProto & dn,
    const hadoop::common::TokenProto * token) {
  LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection("
                         << FMT_THIS_ADDR << ", ...) called");
  return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token, event_handlers_.get());
}
/* Returns the event handler set currently installed on this handle. */
std::shared_ptr<LibhdfsEvents> FileHandleImpl::get_event_handlers() {
  return event_handlers_;
}
void FileHandleImpl::CancelOperations() {
LOG_INFO(kFileHandle, << "FileHandleImpl::CancelOperations("
<< FMT_THIS_ADDR << ") called");
/* Push update to BlockReaders that may be hung in an asio call */
std::vector<std::shared_ptr<BlockReader>> live_readers = readers_.GetLiveReaders();
for(auto reader : live_readers) {
/**
 * Installs a file event callback on this handle.
 *
 * The current handler set is copied (or a new one is created when none is
 * installed), the callback is set on the copy, and the copy then replaces
 * event_handlers_. Code that already holds the previous shared_ptr keeps
 * using the old, unmodified set.
 *
 * @param callback callback to invoke for file events
 */
void FileHandleImpl::SetFileEventCallback(file_event_callback callback) {
  std::shared_ptr<LibhdfsEvents> new_event_handlers;
  if (event_handlers_) {
    new_event_handlers = std::make_shared<LibhdfsEvents>(*event_handlers_);
  } else {
    new_event_handlers = std::make_shared<LibhdfsEvents>();
  }
  new_event_handlers->set_file_callback(callback);
  event_handlers_ = new_event_handlers;
}
/**
 * Decides whether a read status should cause the contacted datanode to be
 * marked bad.
 *
 * @param s status reported for a read
 * @return false for an OK status and for kResourceUnavailable /
 *         kOperationCanceled; true for every other error code
 */
bool FileHandle::ShouldExclude(const Status &s) {
  if (s.ok()) {
    return false;
  }

  switch (s.code()) {
    /* client side resource exhaustion */
    case Status::kResourceUnavailable:
    case Status::kOperationCanceled:
      return false;
    case Status::kInvalidArgument:
    case Status::kUnimplemented:
    case Status::kException:
    default:
      // Any code not listed above is treated as a reason to exclude, which
      // also guarantees every path through the function returns a value.
      return true;
  }
}
/* Returns the current value of the bytes-read counter. */
uint64_t FileHandleImpl::get_bytes_read() {
  return bytes_read_.load();
}
void FileHandleImpl::clear_bytes_read() {; }