// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/io/disk-io-mgr-internal.h"
#include "runtime/io/hdfs-file-reader.h"
#include "runtime/io/local-file-reader.h"
#include "util/error-util.h"
#include "util/hdfs-util.h"
#include "common/names.h"
using namespace impala;
using namespace impala::io;
DECLARE_bool(cache_remote_file_handles);
DECLARE_bool(cache_s3_file_handles);
// Implementation of the ScanRange functionality. Each ScanRange contains a queue
// of ready buffers. For each ScanRange, there is only a single producer and
// consumer thread, i.e. only one disk thread will push to a scan range at
// any time and only one thread will remove from the queue. This is to guarantee
// that buffers are queued and read in file order.
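// Enqueues a buffer that is ready for the reader to consume. Returns false and frees
// the buffer if the range has already been cancelled; otherwise adds it to
// 'ready_buffers_', records whether the end-of-scan-range buffer has been queued and
// wakes up a thread waiting in GetNext().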
bool ScanRange::EnqueueReadyBuffer(unique_ptr<BufferDescriptor> buffer) {
DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer";
{
unique_lock<mutex> scan_range_lock(lock_);
DCHECK(Validate()) << DebugString();
DCHECK(!eosr_queued_);
if (!buffer->is_cached()) {
// All non-cached buffers are enqueued by disk threads. Indicate that the read
// finished.
DCHECK(read_in_flight_);
read_in_flight_ = false;
}
if (!cancel_status_.ok()) {
// This range has been cancelled, no need to enqueue the buffer.
CleanUpBuffer(scan_range_lock, move(buffer));
// One or more threads may be blocked in WaitForInFlightRead() waiting for the read
// to complete. Wake up all of them.
buffer_ready_cv_.NotifyAll();
return false;
}
// Clean up any surplus buffers. E.g. we may have allocated too many if the file was
// shorter than expected.
if (buffer->eosr()) CleanUpUnusedBuffers(scan_range_lock);
eosr_queued_ = buffer->eosr();
ready_buffers_.emplace_back(move(buffer));
}
buffer_ready_cv_.NotifyOne();
return true;
}
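// Returns the next ready buffer, blocking until one is available or the range is
// finished. If the range was cancelled, waits for any in-flight read to finish and
// returns the cancellation status. On success, the returned buffer is counted as
// being held by the reader.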
Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
DCHECK(*buffer == nullptr);
bool eosr;
{
unique_lock<mutex> scan_range_lock(lock_);
DCHECK(Validate()) << DebugString();
while (!all_buffers_returned(scan_range_lock) && ready_buffers_.empty()) {
buffer_ready_cv_.Wait(scan_range_lock);
}
// No more buffers to return - return the cancel status or OK if not cancelled.
if (all_buffers_returned(scan_range_lock)) {
// Wait until read finishes to ensure buffers are freed.
while (read_in_flight_) buffer_ready_cv_.Wait(scan_range_lock);
DCHECK_EQ(0, ready_buffers_.size());
return cancel_status_;
}
// Remove the first ready buffer from the queue and return it
DCHECK(!ready_buffers_.empty());
*buffer = move(ready_buffers_.front());
ready_buffers_.pop_front();
eosr = (*buffer)->eosr();
DCHECK(!eosr || unused_iomgr_buffers_.empty()) << DebugString();
}
// Update tracking counters. The buffer has now moved from the IoMgr to the caller.
if (eosr) reader_->RemoveActiveScanRange(this);
num_buffers_in_reader_.Add(1);
return Status::OK();
}
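// Returns a single buffer from the reader to the scan range. Thin wrapper around
// AddUnusedBuffers().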
void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
vector<unique_ptr<BufferDescriptor>> buffers;
buffers.emplace_back(move(buffer_desc));
AddUnusedBuffers(move(buffers), true);
}
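// Makes 'buffers' available to the scan range for reuse. 'returned' indicates that the
// buffers were previously handed out to the reader. Buffers are freed instead of kept
// if the range uses external buffers, was cancelled, already queued eosr, or already
// holds enough buffers to read the remainder of the range; otherwise, if the range was
// blocked waiting for a buffer, it is rescheduled on the reader.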
void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
bool returned) {
DCHECK_GT(buffers.size(), 0);
// Keep track of whether the range was unblocked in this function. If so, we need
// to schedule it so it resumes progress.
bool unblocked = false;
{
unique_lock<mutex> scan_range_lock(lock_);
if (returned) {
// Buffers were in reader but now aren't.
num_buffers_in_reader_.Add(-buffers.size());
}
for (unique_ptr<BufferDescriptor>& buffer : buffers) {
// We should not hold onto the buffers in the following cases:
// 1. the scan range is using external buffers, e.g. cached buffers.
// 2. the scan range is cancelled
// 3. the scan range already hit eosr
// 4. we already have enough buffers to read the remainder of the scan range.
if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER
|| !cancel_status_.ok()
|| eosr_queued_
|| unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_cumulative_bytes_used_) {
CleanUpBuffer(scan_range_lock, move(buffer));
} else {
unused_iomgr_buffer_bytes_ += buffer->buffer_len();
unused_iomgr_buffers_.emplace_back(move(buffer));
if (blocked_on_buffer_) {
blocked_on_buffer_ = false;
unblocked = true;
}
}
}
}
// Must drop the ScanRange lock before acquiring the RequestContext lock.
if (unblocked) {
unique_lock<mutex> reader_lock(reader_->lock_);
// Reader may have been cancelled after we dropped 'scan_range_lock' above.
if (reader_->state_ == RequestContext::Cancelled) {
DCHECK(!cancel_status_.ok());
} else {
reader_->ScheduleScanRange(reader_lock, this);
}
}
}
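// Pops a buffer from the cache of unused I/O manager buffers, or returns nullptr if
// none is available. 'lock_' must be held by the caller.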
unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
const unique_lock<mutex>& scan_range_lock) {
DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
if (unused_iomgr_buffers_.empty()) return nullptr;
unique_ptr<BufferDescriptor> result = move(unused_iomgr_buffers_.back());
unused_iomgr_buffers_.pop_back();
unused_iomgr_buffer_bytes_ -= result->buffer_len();
return result;
}
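// Performs the next read for this scan range on a disk thread: picks a buffer (or
// blocks the range if none is available), reads the next chunk of the file - possibly
// through the file handle cache - and enqueues the resulting buffer. Returns the
// outcome of the read, cancelling the range on a read error.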
ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
DCHECK_GT(bytes_remaining, 0);
unique_ptr<BufferDescriptor> buffer_desc;
{
unique_lock<mutex> lock(lock_);
DCHECK(!read_in_flight_);
if (!cancel_status_.ok()) return ReadOutcome::CANCELLED;
if (external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
this, client_buffer_.data, client_buffer_.len));
} else {
DCHECK(external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
<< "This code path does not handle other buffer types, i.e. HDFS cache. "
<< "external_buffer_tag_=" << static_cast<int>(external_buffer_tag_);
buffer_desc = GetUnusedBuffer(lock);
if (buffer_desc == nullptr) {
// No buffer available - the range will be rescheduled when a buffer is added.
blocked_on_buffer_ = true;
return ReadOutcome::BLOCKED_ON_BUFFER;
}
iomgr_buffer_cumulative_bytes_used_ += buffer_desc->buffer_len();
}
read_in_flight_ = true;
}
// No locks in this section. Only working on local vars. We don't want to hold a
// lock across the read call.
// To use the file handle cache:
// 1. It must be enabled at the daemon level.
// 2. The file cannot be erasure coded.
// 3. The file is a local HDFS file (expected_local_), OR it is a remote HDFS file and
//    'cache_remote_file_handles' is true, OR it is an S3 file and
//    'cache_s3_file_handles' is true.
// Note: ADLS and ABFS file handles are not cached. Erasure coded HDFS files are also
// not cached (IMPALA-8178), due to excessive memory usage (see HDFS-14308).
bool use_file_handle_cache = false;
if (is_file_handle_caching_enabled() && !is_erasure_coded_ &&
(expected_local_ ||
(FLAGS_cache_remote_file_handles && disk_id_ == io_mgr_->RemoteDfsDiskId()) ||
(FLAGS_cache_s3_file_handles && disk_id_ == io_mgr_->RemoteS3DiskId()))) {
use_file_handle_cache = true;
}
Status read_status = file_reader_->Open(use_file_handle_cache);
bool eof = false;
if (read_status.ok()) {
COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
if (sub_ranges_.empty()) {
DCHECK(cache_.data == nullptr);
read_status = file_reader_->ReadFromPos(queue, offset_ + bytes_read_,
buffer_desc->buffer_,
min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
&buffer_desc->len_, &eof);
} else {
read_status = ReadSubRanges(queue, buffer_desc.get(), &eof);
}
COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
}
DCHECK(buffer_desc->buffer_ != nullptr);
DCHECK(!buffer_desc->is_cached()) <<
"Pure HDFS cache reads don't go through this code path.";
if (!read_status.ok()) {
// Free buffer to release resources before we cancel the range so that all buffers
// are freed at cancellation.
buffer_desc->Free();
buffer_desc.reset();
// Propagate 'read_status' to the scan range. This will also wake up any waiting
// threads.
CancelInternal(read_status, true);
// At this point we cannot touch the state of this range because the client
// may notice cancellation, then reuse the scan range.
return ReadOutcome::CANCELLED;
}
bytes_read_ += buffer_desc->len();
DCHECK_LE(bytes_read_, bytes_to_read_);
// It is end of stream if we hit end of file or have read all the requested bytes.
buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
// After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
// Store the state we need before calling EnqueueReadyBuffer().
bool eosr = buffer_desc->eosr();
// No more reads for this scan range - we can close it.
if (eosr) file_reader_->Close();
// Read successful - enqueue the buffer and return the appropriate outcome.
if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED;
// At this point, if eosr=true, then we cannot touch the state of this scan range
// because the client may notice eos, then reuse the scan range.
return eosr ? ReadOutcome::SUCCESS_EOSR : ReadOutcome::SUCCESS_NO_EOSR;
}
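// Fills 'buffer_desc' from the scan range's sub-ranges, either by copying from the
// cached data or by reading from the file, advancing 'sub_range_pos_' as sub-ranges
// are consumed. '*eof' is set by the underlying file reader.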
Status ScanRange::ReadSubRanges(
DiskQueue* queue, BufferDescriptor* buffer_desc, bool* eof) {
buffer_desc->len_ = 0;
while (buffer_desc->len() < buffer_desc->buffer_len()
&& sub_range_pos_.index < sub_ranges_.size()) {
SubRange& sub_range = sub_ranges_[sub_range_pos_.index];
int64_t offset = sub_range.offset + sub_range_pos_.bytes_read;
int64_t bytes_to_read = min(sub_range.length - sub_range_pos_.bytes_read,
buffer_desc->buffer_len() - buffer_desc->len());
if (cache_.data != nullptr) {
memcpy(buffer_desc->buffer_ + buffer_desc->len(),
cache_.data + offset, bytes_to_read);
} else {
int64_t current_bytes_read;
Status read_status = file_reader_->ReadFromPos(queue, offset,
buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read, &current_bytes_read,
eof);
if (!read_status.ok()) return read_status;
if (current_bytes_read != bytes_to_read) {
DCHECK(*eof);
DCHECK_LT(current_bytes_read, bytes_to_read);
return Status(TErrorCode::SCANNER_INCOMPLETE_READ, bytes_to_read,
current_bytes_read, file(), offset);
}
}
buffer_desc->len_ += bytes_to_read;
sub_range_pos_.bytes_read += bytes_to_read;
if (sub_range_pos_.bytes_read == sub_range.length) {
sub_range_pos_.index += 1;
sub_range_pos_.bytes_read = 0;
}
}
return Status::OK();
}
void ScanRange::SetBlockedOnBuffer() {
unique_lock<mutex> lock(lock_);
blocked_on_buffer_ = true;
}
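// Frees a single buffer and, once all buffers have been returned and none remain with
// the reader, closes the file reader. 'lock_' must be held by the caller.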
void ScanRange::CleanUpBuffer(
const boost::unique_lock<boost::mutex>& scan_range_lock,
unique_ptr<BufferDescriptor> buffer_desc) {
DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
DCHECK(buffer_desc != nullptr);
DCHECK_EQ(this, buffer_desc->scan_range_);
buffer_desc->Free();
if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_.Load() == 0) {
// Close the scan range if there are no more buffers in the reader and no more buffers
// will be returned to readers in the future. Close() is idempotent, so it is safe to
// call it multiple times during cleanup as long as the range is actually finished.
file_reader_->Close();
}
}
void ScanRange::CleanUpBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers) {
unique_lock<mutex> lock(lock_);
for (unique_ptr<BufferDescriptor>& buffer : buffers) CleanUpBuffer(lock, move(buffer));
}
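// Frees all buffers held in the unused buffer cache. 'lock_' must be held by the
// caller.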
void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock) {
while (!unused_iomgr_buffers_.empty()) {
CleanUpBuffer(scan_range_lock, GetUnusedBuffer(scan_range_lock));
}
}
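// Cancels the scan range with 'status' and waits for any in-flight read to finish
// before removing the range from the reader's active set.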
void ScanRange::Cancel(const Status& status) {
// Ignore the cancellation if the range was never started.
if (io_mgr_ == nullptr) return;
CancelInternal(status, false);
// Wait until an in-flight read is finished. The read thread will clean up the
// buffer it used. Once the range is cancelled, no more reads should be started.
WaitForInFlightRead();
reader_->RemoveActiveScanRange(this);
}
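// Marks the range as cancelled, preserving the first cancellation status, frees queued
// and unused buffers and wakes up any threads waiting on the range. 'read_error'
// indicates that the caller is a disk thread that hit a read error, in which case the
// in-flight read is marked as finished.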
void ScanRange::CancelInternal(const Status& status, bool read_error) {
DCHECK(io_mgr_ != nullptr);
DCHECK(!status.ok());
{
// Grab both locks to make sure that we don't change 'cancel_status_' while other
// threads are in critical sections.
unique_lock<mutex> scan_range_lock(lock_);
{
unique_lock<SpinLock> fs_lock(file_reader_->lock());
DCHECK(Validate()) << DebugString();
// If already cancelled, preserve the original reason for cancellation. Most of the
// cleanup is not required if already cancelled, but we need to set
// 'read_in_flight_' to false.
if (cancel_status_.ok()) cancel_status_ = status;
}
// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
// from seeing inconsistent state.
while (!ready_buffers_.empty()) {
CleanUpBuffer(scan_range_lock, move(ready_buffers_.front()));
ready_buffers_.pop_front();
}
// Clean up buffers that we don't need any more because we won't read any more data.
CleanUpUnusedBuffers(scan_range_lock);
if (read_error) {
DCHECK(read_in_flight_);
read_in_flight_ = false;
}
}
buffer_ready_cv_.NotifyAll();
// For cached buffers, we can't close the range until the cached buffer is returned.
// Close() is called from ScanRange::CleanUpBuffer().
// TODO: IMPALA-4249 - this Close() call makes it unsafe to reuse a cancelled scan
// range, because there is no synchronisation between this Close() call and the
// client adding the ScanRange back into the IoMgr.
if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) file_reader_->Close();
}
void ScanRange::WaitForInFlightRead() {
unique_lock<mutex> scan_range_lock(lock_);
while (read_in_flight_) buffer_ready_cv_.Wait(scan_range_lock);
}
string ScanRange::DebugString() const {
stringstream ss;
ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_;
if (file_reader_) ss << " " << file_reader_->DebugString();
ss << " cancel_status=" << cancel_status_.GetDetail()
<< " buffer_queue=" << ready_buffers_.size()
<< " num_buffers_in_readers=" << num_buffers_in_reader_.Load()
<< " unused_iomgr_buffers=" << unused_iomgr_buffers_.size()
<< " unused_iomgr_buffer_bytes=" << unused_iomgr_buffer_bytes_
<< " blocked_on_buffer=" << blocked_on_buffer_;
return ss.str();
}
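// Sanity checks the internal state of the scan range. Returns false and logs an error
// if an invariant does not hold.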
bool ScanRange::Validate() {
if (bytes_read_ > bytes_to_read_) {
LOG(ERROR) << "Bytes read tracking is wrong. Too many bytes have been read."
<< " bytes_read_=" << bytes_read_
<< " bytes_to_read_=" << bytes_to_read_;
return false;
}
if (!cancel_status_.ok() && !ready_buffers_.empty()) {
LOG(ERROR) << "Cancelled range should not have queued buffers " << DebugString();
return false;
}
int64_t unused_iomgr_buffer_bytes = 0;
for (auto& buffer : unused_iomgr_buffers_)
unused_iomgr_buffer_bytes += buffer->buffer_len();
if (unused_iomgr_buffer_bytes != unused_iomgr_buffer_bytes_) {
LOG(ERROR) << "unused_iomgr_buffer_bytes_ incorrect actual: "
<< unused_iomgr_buffer_bytes_
<< " vs. expected: " << unused_iomgr_buffer_bytes;
return false;
}
bool is_finished = !cancel_status_.ok() || eosr_queued_;
if (is_finished && !unused_iomgr_buffers_.empty()) {
LOG(ERROR) << "Held onto too many buffers " << unused_iomgr_buffers_.size()
<< " bytes: " << unused_iomgr_buffer_bytes_
<< " cancel_status: " << cancel_status_.GetDetail()
<< " eosr_queued: " << eosr_queued_;
return false;
}
if (!is_finished && blocked_on_buffer_ && !unused_iomgr_buffers_.empty()) {
LOG(ERROR) << "Blocked despite having buffers: " << DebugString();
return false;
}
return true;
}
ScanRange::ScanRange()
: RequestRange(RequestType::READ),
external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {}
ScanRange::~ScanRange() {
DCHECK(!read_in_flight_);
DCHECK_EQ(0, ready_buffers_.size());
DCHECK_EQ(0, num_buffers_in_reader_.Load());
}
void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
const BufferOpts& buffer_opts, void* meta_data) {
Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime,
buffer_opts, {}, meta_data);
}
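// Resets the scan range to read 'len' bytes at 'offset' of 'file', optionally
// restricted to 'sub_ranges'. Chooses an HDFS or local file reader depending on
// whether 'fs' is set.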
void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges, void* meta_data) {
DCHECK(ready_buffers_.empty());
DCHECK(!read_in_flight_);
DCHECK(file != nullptr);
DCHECK_GE(len, 0);
DCHECK_GE(offset, 0);
DCHECK(buffer_opts.client_buffer_ == nullptr ||
buffer_opts.client_buffer_len_ >= len);
fs_ = fs;
if (fs_) {
file_reader_ = make_unique<HdfsFileReader>(this, fs_, expected_local);
} else {
file_reader_ = make_unique<LocalFileReader>(this);
}
file_ = file;
len_ = len;
bytes_to_read_ = len;
offset_ = offset;
disk_id_ = disk_id;
cache_options_ = buffer_opts.cache_options_;
// HDFS ranges must have an mtime > 0. Local ranges do not use mtime.
if (fs_) DCHECK_GT(mtime, 0);
mtime_ = mtime;
meta_data_ = meta_data;
if (buffer_opts.client_buffer_ != nullptr) {
external_buffer_tag_ = ExternalBufferTag::CLIENT_BUFFER;
client_buffer_.data = buffer_opts.client_buffer_;
client_buffer_.len = buffer_opts.client_buffer_len_;
} else {
external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
}
// Erasure coded files should not be considered local (see IMPALA-7019).
DCHECK(!(expected_local && is_erasure_coded));
expected_local_ = expected_local;
is_erasure_coded_ = is_erasure_coded;
io_mgr_ = nullptr;
reader_ = nullptr;
sub_ranges_.clear();
sub_range_pos_ = {};
InitSubRanges(move(sub_ranges));
}
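// Installs 'sub_ranges' into the scan range, merging adjacent sub-ranges, and adjusts
// 'bytes_to_read_' to the total length of the sub-ranges.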
void ScanRange::InitSubRanges(vector<SubRange>&& sub_ranges) {
sub_ranges_ = std::move(sub_ranges);
DCHECK(ValidateSubRanges());
MergeSubRanges();
DCHECK(ValidateSubRanges());
sub_range_pos_ = {};
if (sub_ranges_.empty()) return;
int64_t length_sum = 0;
for (auto& sub_range : sub_ranges_) {
length_sum += sub_range.length;
}
bytes_to_read_ = length_sum;
}
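// Returns false if any sub-range has a non-positive length, lies outside the scan
// range, or is not ordered before and disjoint from the following sub-range.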
bool ScanRange::ValidateSubRanges() {
for (int i = 0; i < sub_ranges_.size(); ++i) {
SubRange& sub_range = sub_ranges_[i];
if (sub_range.length <= 0) return false;
if (sub_range.offset < offset_) return false;
if (sub_range.offset + sub_range.length > offset_ + len_) return false;
if (i == sub_ranges_.size() - 1) break;
SubRange& next_sub_range = sub_ranges_[i+1];
if (sub_range.offset + sub_range.length > next_sub_range.offset) return false;
}
return true;
}
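// Coalesces sub-ranges that are contiguous in the file into a single sub-range.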
void ScanRange::MergeSubRanges() {
if (sub_ranges_.empty()) return;
for (int i = 0; i < sub_ranges_.size() - 1; ++i) {
SubRange& current = sub_ranges_[i];
int j = i + 1;
for (; j < sub_ranges_.size(); ++j) {
SubRange& sr_j = sub_ranges_[j];
if (sr_j.offset == current.offset + current.length) {
current.length += sr_j.length;
} else {
break;
}
}
if (j > i + 1) {
sub_ranges_.erase(sub_ranges_.begin() + i + 1, sub_ranges_.begin() + j);
}
}
}
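// Resets the per-execution state of the scan range before the I/O manager starts
// reading it on behalf of 'reader'.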
void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
DCHECK(!read_in_flight_);
io_mgr_ = io_mgr;
reader_ = reader;
unused_iomgr_buffer_bytes_ = 0;
iomgr_buffer_cumulative_bytes_used_ = 0;
cancel_status_ = Status::OK();
eosr_queued_ = false;
blocked_on_buffer_ = false;
bytes_read_ = 0;
sub_range_pos_ = {};
file_reader_->ResetState();
DCHECK(Validate()) << DebugString();
}
void ScanRange::SetFileReader(unique_ptr<FileReader> file_reader) {
file_reader_ = move(file_reader);
}
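// Attempts to serve the scan range from the HDFS cache. Sets '*read_succeeded' to true
// if the whole range is cached; unless there are sub-ranges, a single buffer pointing
// at the cached data is enqueued. If the data is missing or truncated in the cache, the
// file reader is closed and OK is returned so that the caller can fall back to the
// disk read path. The reader's lock must be held by the caller.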
Status ScanRange::ReadFromCache(
const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
DCHECK(UseHdfsCache());
DCHECK_EQ(bytes_read_, 0);
*read_succeeded = false;
Status status = file_reader_->Open(false);
if (!status.ok()) return status;
// Check cancel status.
{
unique_lock<mutex> lock(lock_);
RETURN_IF_ERROR(cancel_status_);
}
file_reader_->CachedFile(&cache_.data, &cache_.len);
// Data was not cached, caller will fall back to normal read path.
if (cache_.data == nullptr) {
VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
<< ". Switching to disk read path.";
// Clean up the scan range state before re-issuing it.
file_reader_->Close();
return Status::OK();
}
// A partial read can happen when files are truncated.
// TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
// between errors and partially cached blocks here.
if (cache_.len < len()) {
VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
<< len() << " bytes, but read " << cache_.len << ". Switching to disk read path.";
// Close the scan range. 'read_succeeded' is still false, so the caller will fall back
// to non-cached read of this scan range.
file_reader_->Close();
return Status::OK();
}
*read_succeeded = true;
// If there are sub-ranges, then we need to memcpy() them from the cached buffer.
if (HasSubRanges()) return Status::OK();
DCHECK(external_buffer_tag_ != ExternalBufferTag::CLIENT_BUFFER);
external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
bytes_read_ = cache_.len;
// Create a single buffer desc for the entire scan range and enqueue that.
// The memory is owned by the HDFS java client, not the Impala backend.
unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
this, cache_.data, 0));
desc->len_ = cache_.len;
desc->eosr_ = true;
EnqueueReadyBuffer(move(desc));
COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, cache_.len);
return Status::OK();
}
BufferDescriptor::BufferDescriptor(ScanRange* scan_range,
uint8_t* buffer, int64_t buffer_len)
: scan_range_(scan_range),
buffer_(buffer),
buffer_len_(buffer_len) {
DCHECK(scan_range != nullptr);
DCHECK(buffer != nullptr);
DCHECK_GE(buffer_len, 0);
}
BufferDescriptor::BufferDescriptor(ScanRange* scan_range,
BufferPool::ClientHandle* bp_client, BufferPool::BufferHandle handle) :
scan_range_(scan_range),
buffer_(handle.data()),
buffer_len_(handle.len()),
bp_client_(bp_client),
handle_(move(handle)) {
DCHECK(scan_range != nullptr);
DCHECK(bp_client_->is_registered());
DCHECK(handle_.is_open());
}
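// Releases the buffer's memory back to the buffer pool if it was allocated by the
// I/O manager; cached and client-owned buffers are not freed here.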
void BufferDescriptor::Free() {
DCHECK(buffer_ != nullptr);
if (!is_cached() && !is_client_buffer()) {
// Only buffers that were allocated by DiskIoMgr need to be freed.
ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
bp_client_, &handle_);
}
buffer_ = nullptr;
}