blob: a7fbf7a20ca4bed311c00491c45c86e8036a7738 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/scanner-context.inline.h"
#include <gutil/strings/substitute.h>
#include "exec/hdfs-scan-node-base.h"
#include "exec/hdfs-scan-node.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/io/request-context.h"
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/string-buffer.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
using namespace impala;
using namespace impala::io;
using namespace strings;
// Out-of-class definition of the in-class-initialized static member so that it has
// storage if it is ODR-used.
const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;

namespace {

// Size of the first read issued once a stream has consumed all of its scan range and
// has to read past its end. Later read-past requests double this (up to the I/O
// manager's max buffer size) unless a read-past-size callback supplies an estimate.
constexpr int64_t INIT_READ_PAST_SIZE_BYTES = 64 << 10;

// Status returned by stream operations that observe the parent context was cancelled;
// also passed to ScanRange::Cancel() when a stream's resources are released for good.
const Status& CONTEXT_CANCELLED = Status::CancelledInternal("ScannerContext");

} // anonymous namespace
// Creates a context for scanning ranges of the partition described by 'partition_desc'.
// 'total_reservation' is the initial buffer reservation; TryIncreaseReservation() may
// later update it. 'bp_client' is used when allocating read-past-end buffers.
// 'filter_ctxs' and 'expr_results_pool' are stored in the context as given.
ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
BufferPool::ClientHandle* bp_client, int64_t total_reservation,
HdfsPartitionDescriptor* partition_desc, const vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool)
: state_(state),
scan_node_(scan_node),
bp_client_(bp_client),
total_reservation_(total_reservation),
partition_desc_(partition_desc),
filter_ctxs_(filter_ctxs),
expr_results_pool_(expr_results_pool) {}
// All streams must have been dropped (e.g. via ClearStreams()) before the context is
// destroyed; this is only checked in debug builds.
ScannerContext::~ScannerContext() {
DCHECK(streams_.empty());
}
// Asks the scan node to grow this context's reservation toward 'ideal_reservation' and
// adopts whatever total reservation the scan node reports back (which may be unchanged
// if no additional reservation could be obtained).
void ScannerContext::TryIncreaseReservation(int64_t ideal_reservation) {
  const int64_t granted_total =
      scan_node_->IncreaseReservationIncrementally(total_reservation_, ideal_reservation);
  total_reservation_ = granted_total;
}
void ScannerContext::ReleaseCompletedResources(bool done) {
for (int i = 0; i < streams_.size(); ++i) {
streams_[i]->ReleaseCompletedResources(done);
}
}
// Drops every stream held by this context (the streams are held by owning smart
// pointers, see AddStream()). Must be done before the context is destroyed.
void ScannerContext::ClearStreams() {
streams_.clear();
}
// Creates a stream that reads 'scan_range' of the file described by 'file_desc'.
// 'reservation' bounds the buffers used when reading past the end of the scan range
// (see GetNextBuffer()). The expected file length starts out as the descriptor's length,
// and the boundary buffer is backed by a MemPool created with the scan node's memory
// tracker.
ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
int64_t reservation, const HdfsFileDesc* file_desc)
: parent_(parent),
scan_range_(scan_range),
file_desc_(file_desc),
reservation_(reservation),
file_len_(file_desc->file_length),
next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
}
// Creates a new stream over 'range' (with read-past reservation 'reservation') for the
// file 'range' refers to in this context's partition. The context owns the stream; the
// returned pointer stays valid until the streams are cleared.
ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) {
  const HdfsFileDesc* desc =
      scan_node_->GetFileDesc(partition_desc_->id(), range->file());
  streams_.emplace_back(new Stream(this, range, reservation, desc));
  Stream* added = streams_.back().get();
  return added;
}
// Releases resources this stream no longer needs. If 'done' is true, the scan range is
// cancelled, the boundary buffer's memory is freed and all read cursors are reset; the
// stream must not be read from afterwards. In either case, the current I/O buffer is
// returned if it has been fully consumed.
void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
if (done) {
// Cancel the underlying scan range to clean up any queued buffers there
scan_range_->Cancel(CONTEXT_CANCELLED);
boundary_pool_->FreeAll();
// Reset variables - the stream is no longer valid.
io_buffer_pos_ = nullptr;
io_buffer_bytes_left_ = 0;
boundary_buffer_pos_ = nullptr;
boundary_buffer_bytes_left_ = 0;
boundary_buffer_->Reset();
}
// Check if we're done with the current I/O buffer.
if (io_buffer_ != nullptr && io_buffer_bytes_left_ == 0) ReturnIoBuffer();
}
// Fetches the next I/O buffer for this stream into 'io_buffer_' and points
// 'io_buffer_pos_'/'io_buffer_bytes_left_' at its contents. Any previously held I/O
// buffer is returned first.
//
// Buffers come from 'scan_range_' until its final (eosr) buffer has been received.
// After that, the stream reads past the end of its scan range. It creates a new scan
// range starting at the current offset and reads a single buffer from it. The size of
// that read comes from 'read_past_size_cb_' if the callback is set and returns a
// positive estimate. Otherwise it follows a doubling schedule that starts at
// INIT_READ_PAST_SIZE_BYTES. The size is then raised to at least 'read_past_size' and
// capped by the bytes remaining in the file, the I/O manager's max buffer size, and
// 'reservation_'.
//
// Returns OK, leaving 'io_buffer_' == nullptr, when there is nothing left to read: end
// of file, or a computed read-past size of zero. Returns CONTEXT_CANCELLED if the
// context was cancelled. Returns an error if I/O fails or if 'reservation_' is too small
// to allocate a read-past buffer.
Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
  DCHECK_EQ(0, io_buffer_bytes_left_);
  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
  if (UNLIKELY(parent_->cancelled())) return CONTEXT_CANCELLED;
  if (io_buffer_ != nullptr) ReturnIoBuffer();

  // Nothing to do if we're at the end of the file - return leaving io_buffer_ == nullptr.
  int64_t offset = file_offset() + boundary_buffer_bytes_left_;
  int64_t file_bytes_remaining = file_desc()->file_length - offset;
  if (file_bytes_remaining == 0) return Status::OK();

  if (!scan_range_eosr_) {
    // Get the next buffer from 'scan_range_'.
    SCOPED_TIMER2(parent_->state_->total_storage_wait_timer(),
        parent_->scan_node_->scanner_io_wait_time());
    Status status = scan_range_->GetNext(&io_buffer_);
    DCHECK(!status.ok() || io_buffer_ != nullptr);
    RETURN_IF_ERROR(status);
    scan_range_eosr_ = io_buffer_->eosr();
  } else {
    // Already got all buffers from 'scan_range_' - reading past end.
    SCOPED_TIMER2(parent_->state_->total_storage_wait_timer(),
        parent_->scan_node_->scanner_io_wait_time());
    int64_t read_past_buffer_size = 0;
    int64_t max_buffer_size = io_mgr->max_buffer_size();
    if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset);
    if (read_past_buffer_size <= 0) {
      // Either no callback was set or the callback did not return an estimate. Use
      // the default doubling strategy.
      read_past_buffer_size = next_read_past_size_bytes_;
      next_read_past_size_bytes_ =
          min<int64_t>(next_read_past_size_bytes_ * 2, max_buffer_size);
    }
    // The caller may need at least 'read_past_size' bytes to make progress.
    read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
    // We're reading past the scan range. Be careful not to read past the end of file.
    read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
    read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
    read_past_buffer_size = ::min(read_past_buffer_size, reservation_);
    DCHECK_GE(read_past_buffer_size, 0);
    if (read_past_buffer_size == 0) {
      io_buffer_bytes_left_ = 0;
      return Status::OK();
    }
    int64_t partition_id = parent_->partition_descriptor()->id();
    bool expected_local = false;
    // Disable HDFS caching as we are reading past the end.
    int cache_options = scan_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
    ScanRange* range = parent_->scan_node_->AllocateScanRange(
        scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
        scan_range_->disk_id(), expected_local, scan_range_->is_erasure_coded(),
        scan_range_->mtime(), BufferOpts(cache_options));
    bool needs_buffers;
    RETURN_IF_ERROR(
        parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
    if (needs_buffers) {
      // Allocate fresh buffers. The buffers for 'scan_range_' should be released now
      // since we hit EOS.
      if (reservation_ < io_mgr->min_buffer_size()) {
        // $0 is the file name; previously the reservation was substituted there and
        // the file name was never reported.
        return Status(Substitute("Could not read past end of scan range in file '$0'. "
            "Reservation provided $1 was < the minimum I/O buffer size $2",
            filename(), reservation_, io_mgr->min_buffer_size()));
      }
      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
          parent_->bp_client_, range, reservation_));
    }
    RETURN_IF_ERROR(range->GetNext(&io_buffer_));
    DCHECK(io_buffer_->eosr());
  }

  DCHECK(io_buffer_ != nullptr);
  if (UNLIKELY(io_buffer_ == nullptr)) {
    // This has bitten us before, so we defend against NULL in release builds here. It
    // indicates an error in the IoMgr, which did not return a valid buffer.
    // TODO(IMPALA-5914): Remove this check once we're confident we're not hitting it.
    return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: "
        "Failed to receive buffer from scan range for file $0 at offset $1",
        filename(), offset));
  }
  io_buffer_pos_ = reinterpret_cast<uint8_t*>(io_buffer_->buffer());
  io_buffer_bytes_left_ = io_buffer_->len();
  if (io_buffer_->len() == 0) {
    // An empty buffer means the file is shorter than expected: shrink the expected
    // file length to the current position.
    file_len_ = file_offset() + boundary_buffer_bytes_left_;
    VLOG_FILE << "Unexpectedly read 0 bytes from file=" << filename() << " table="
        << parent_->scan_node_->hdfs_table()->name()
        << ". Setting expected file length=" << file_len_;
  }
  return Status::OK();
}
// Returns in '*out_buffer'/'*len' the next contiguous run of unread bytes, without
// copying. If the boundary buffer has bytes left, it returns their remainder, capped
// at bytes_left() so no bytes past the end of the scan range are returned. Otherwise
// it returns the remainder of the current I/O buffer, first fetching a new I/O buffer
// if the current one is exhausted. If 'peek' is false, the returned bytes are
// consumed. Returns OK with '*out_buffer' == nullptr and '*len' == 0 if eosr() is
// already true, or CONTEXT_CANCELLED if the context was cancelled.
Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_t* len) {
*out_buffer = nullptr;
*len = 0;
if (eosr()) return Status::OK();
if (UNLIKELY(parent_->cancelled())) {
DCHECK(*out_buffer == nullptr);
return CONTEXT_CANCELLED;
}
// Bytes left over in the boundary buffer (e.g. from an earlier peek) are returned
// before anything in the I/O buffer.
if (boundary_buffer_bytes_left_ > 0) {
DCHECK(ValidateBufferPointers());
DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
*out_buffer = boundary_buffer_pos_;
// Don't return more bytes past eosr
*len = min(boundary_buffer_bytes_left_, bytes_left());
DCHECK_GE(*len, 0);
if (!peek) {
AdvanceBufferPos(*len, &boundary_buffer_pos_, &boundary_buffer_bytes_left_);
total_bytes_returned_ += *len;
}
return Status::OK();
}
if (io_buffer_bytes_left_ == 0) {
// We're at the end of the boundary buffer and the current IO buffer. Get a new IO
// buffer and set the current buffer to it.
RETURN_IF_ERROR(GetNextBuffer());
// Check that we're not pointing to the IO buffer if there are bytes left in the
// boundary buffer.
DCHECK_EQ(boundary_buffer_bytes_left_, 0);
// From here on the output cursor tracks the I/O buffer.
output_buffer_pos_ = &io_buffer_pos_;
output_buffer_bytes_left_ = &io_buffer_bytes_left_;
}
DCHECK(io_buffer_ != nullptr);
// Hand out everything left in the current I/O buffer.
*out_buffer = io_buffer_pos_;
*len = io_buffer_bytes_left_;
if (!peek) {
AdvanceBufferPos(*len, &io_buffer_pos_, &io_buffer_bytes_left_);
total_bytes_returned_ += *len;
}
DCHECK_GE(bytes_left(), 0);
DCHECK(ValidateBufferPointers());
return Status::OK();
}
// Slow path for reading 'requested_len' contiguous bytes when the current output buffer
// does not already hold all of them (see the DCHECKs below). It reads I/O buffers until
// 'requested_len' bytes are available or no more bytes can be read (e.g. EOF). Data that
// spans I/O buffers is copied into 'boundary_buffer_' so the result is contiguous. On
// success, '*out_buffer' points at the bytes and '*out_len' is how many are available.
// '*out_len' is less than 'requested_len' only if the reads stopped early. If 'peek' is
// false, the bytes are consumed. Returns CONTEXT_CANCELLED if the context is cancelled
// while reading.
Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
uint8_t** out_buffer, bool peek, int64_t* out_len) {
DCHECK_GT(requested_len, boundary_buffer_bytes_left_);
DCHECK(output_buffer_bytes_left_ != &io_buffer_bytes_left_
|| requested_len > io_buffer_bytes_left_) << "All bytes in output buffer "
<< requested_len << " " << io_buffer_bytes_left_;
*out_buffer = nullptr;
// A fully consumed boundary buffer can be reused from the start.
if (boundary_buffer_bytes_left_ == 0) boundary_buffer_->Clear();
DCHECK(ValidateBufferPointers());
// First this loop ensures, by reading I/O buffers one-by-one, that we've got all of
// the requested bytes in 'boundary_buffer_', 'io_buffer_', or split between the two.
// We may not be able to get all of the bytes if we hit eof.
while (boundary_buffer_bytes_left_ + io_buffer_bytes_left_ < requested_len) {
if (io_buffer_bytes_left_ > 0) {
// Copy the remainder of 'io_buffer_' to 'boundary_buffer_' before getting the next
// 'io_buffer_'. Preallocate 'boundary_buffer_' to avoid unnecessary resizes for
// large reads.
RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
RETURN_IF_ERROR(CopyIoToBoundary(io_buffer_bytes_left_));
}
// Tell GetNextBuffer() how much is still missing so a read past the scan range is
// sized to cover it (subject to GetNextBuffer()'s caps).
int64_t remaining_requested_len = requested_len - boundary_buffer_bytes_left_;
RETURN_IF_ERROR(GetNextBuffer(remaining_requested_len));
if (UNLIKELY(parent_->cancelled())) return CONTEXT_CANCELLED;
// No more bytes (i.e. EOF).
if (io_buffer_bytes_left_ == 0) break;
}
// We have read the full 'requested_len' bytes or hit eof.
// We can assemble the contiguous bytes in two ways:
// 1. if the the read range falls entirely with an I/O buffer, we return a pointer into
// that I/O buffer.
// 2. if the read straddles I/O buffers, we append the data to 'boundary_buffer_'.
// 'boundary_buffer_' may already contain some of the data that we need if we did a
// "peek" earlier.
int64_t requested_bytes_left = requested_len - boundary_buffer_bytes_left_;
DCHECK_GE(requested_bytes_left, 0);
int64_t num_bytes_left_to_copy = min(io_buffer_bytes_left_, requested_bytes_left);
*out_len = boundary_buffer_bytes_left_ + num_bytes_left_to_copy;
DCHECK_LE(*out_len, requested_len);
if (boundary_buffer_bytes_left_ == 0) {
// Case 1: return a pointer into the I/O buffer.
output_buffer_pos_ = &io_buffer_pos_;
output_buffer_bytes_left_ = &io_buffer_bytes_left_;
} else {
// Case 2: return a pointer into the boundary buffer, after copying any required
// data from the I/O buffer.
DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
if (io_buffer_bytes_left_ > 0) {
RETURN_IF_ERROR(CopyIoToBoundary(num_bytes_left_to_copy));
}
}
*out_buffer = *output_buffer_pos_;
if (!peek) {
total_bytes_returned_ += *out_len;
AdvanceBufferPos(*out_len, output_buffer_pos_, output_buffer_bytes_left_);
}
DCHECK(ValidateBufferPointers());
return Status::OK();
}
// Slow path of skipping. It is used once the boundary buffer and the current I/O buffer
// have both been fully consumed (DCHECKed below) while 'bytes_left' bytes of a
// 'length'-byte skip are still outstanding. It fetches I/O buffers and advances past
// their contents until the skip is complete, returning each buffer once it is
// exhausted. Returns true on success. Returns false with '*status' set if fetching a
// buffer fails, or if the end of the file is reached first; in that case '*status' is
// an incomplete-read error recording how many of the 'length' bytes were skipped.
bool ScannerContext::Stream::SkipBytesInternal(
int64_t length, int64_t bytes_left, Status* status) {
DCHECK_GT(bytes_left, 0);
DCHECK_EQ(0, boundary_buffer_bytes_left_);
DCHECK_EQ(0, io_buffer_bytes_left_);
// Skip data in subsequent buffers by simply fetching them.
// TODO: consider adding support to skip ahead in a ScanRange so we can avoid doing
// actual I/O in some cases.
while (bytes_left > 0) {
// Passing 'bytes_left' lets a read past the scan range be sized to cover the skip.
*status = GetNextBuffer(bytes_left);
if (!status->ok()) return false;
if (io_buffer_ == nullptr) {
// Hit end of file before reading the requested bytes.
DCHECK_GT(bytes_left, 0);
*status = ReportIncompleteRead(length, length - bytes_left);
return false;
}
int64_t io_buffer_bytes_to_skip = std::min(bytes_left, io_buffer_bytes_left_);
AdvanceBufferPos(io_buffer_bytes_to_skip, &io_buffer_pos_, &io_buffer_bytes_left_);
// Check if we skipped all data in this I/O buffer.
if (io_buffer_bytes_left_ == 0) ReturnIoBuffer();
bytes_left -= io_buffer_bytes_to_skip;
total_bytes_returned_ += io_buffer_bytes_to_skip;
}
return true;
}
// Appends the next 'num_bytes' unread bytes of the current I/O buffer to
// 'boundary_buffer_' and consumes them from the I/O buffer. It then switches the output
// cursor to the boundary buffer, and returns the I/O buffer if nothing is left in it.
// Returns an error if appending to the boundary buffer fails.
Status ScannerContext::Stream::CopyIoToBoundary(int64_t num_bytes) {
DCHECK(io_buffer_ != nullptr);
DCHECK_GT(io_buffer_bytes_left_, 0);
DCHECK_GE(io_buffer_bytes_left_, num_bytes);
RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
boundary_buffer_bytes_left_ += num_bytes;
// Recompute the read cursor from the buffer's current start rather than adjusting the
// old pointer. Presumably this is because Append() may move the underlying storage.
boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) +
boundary_buffer_->len() - boundary_buffer_bytes_left_;
AdvanceBufferPos(num_bytes, &io_buffer_pos_, &io_buffer_bytes_left_);
// If all data from I/O buffer was returned or copied to boundary buffer, we don't need
// I/O buffer.
if (io_buffer_bytes_left_ == 0) ReturnIoBuffer();
output_buffer_pos_ = &boundary_buffer_pos_;
output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
return Status::OK();
}
void ScannerContext::Stream::ReturnIoBuffer() {
DCHECK(io_buffer_ != nullptr);
ScanRange* range = io_buffer_->scan_range();
range->ReturnBuffer(move(io_buffer_));
io_buffer_pos_ = nullptr;
io_buffer_bytes_left_ = 0;
}
// True if the query has been cancelled. Also true if the scan node uses a row batch
// queue and reports that it is done; the code assumes such a node is an HdfsScanNode.
bool ScannerContext::cancelled() const {
  return state_->is_cancelled()
      || (scan_node_->HasRowBatchQueue()
          && static_cast<HdfsScanNode*>(scan_node_)->done());
}
// Debug invariant: while the boundary buffer still holds unread bytes, both output
// cursor pointers must alias the boundary buffer's cursor, so those bytes are returned
// before anything in the I/O buffer. With an empty boundary buffer there is no
// constraint.
bool ScannerContext::Stream::ValidateBufferPointers() const {
  if (boundary_buffer_bytes_left_ == 0) return true;
  const bool pos_aliases_boundary = output_buffer_pos_ == &boundary_buffer_pos_;
  const bool len_aliases_boundary =
      output_buffer_bytes_left_ == &boundary_buffer_bytes_left_;
  return pos_aliases_boundary && len_aliases_boundary;
}
// Builds a SCANNER_INCOMPLETE_READ error for a read of 'length' bytes of which only
// 'bytes_read' could be obtained, tagged with the file name and current file offset.
Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t bytes_read) {
return Status(TErrorCode::SCANNER_INCOMPLETE_READ, length, bytes_read,
filename(), file_offset());
}
// Builds a SCANNER_INVALID_READ error for a requested read of 'length' bytes, tagged
// with the file name and current file offset.
Status ScannerContext::Stream::ReportInvalidRead(int64_t length) {
return Status(TErrorCode::SCANNER_INVALID_READ, length, filename(), file_offset());
}
// Builds a SCANNER_INVALID_INT error tagged with the file name and current file offset.
Status ScannerContext::Stream::ReportInvalidInt() {
return Status(TErrorCode::SCANNER_INVALID_INT, filename(), file_offset());
}