// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/scanner-context.h"
#include <gutil/strings/substitute.h>
#include "exec/hdfs-scan-node-base.h"
#include "exec/hdfs-scan-node.h"
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/string-buffer.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
using namespace impala;
using namespace strings;
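
// Initial size of a "read past" buffer. GetNextBuffer() doubles the size on each
// subsequent read past the end of the scan range, up to the I/O manager's maximum
// read buffer size.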
static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;

// We always want output_buffer_bytes_left_ to be non-NULL, so we can avoid a NULL check
// in GetBytes(). We use this variable, which is set to 0, to initialize
// output_buffer_bytes_left_. After the first successful call to GetBytes(),
// output_buffer_bytes_left_ points at either the I/O buffer's or the boundary
// buffer's remaining byte count instead.
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
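
// Creates a context over 'scan_range' in 'partition_desc' and registers the initial
// stream for it via AddStream().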
ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
    HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range,
    const vector<FilterContext>& filter_ctxs)
  : state_(state),
    scan_node_(scan_node),
    partition_desc_(partition_desc),
    num_completed_io_buffers_(0),
    filter_ctxs_(filter_ctxs) {
  AddStream(scan_range);
}
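
// ClearStreams() must have been called before the context is destroyed.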
ScannerContext::~ScannerContext() {
  DCHECK(streams_.empty());
}
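
// Forwards to Stream::ReleaseCompletedResources() for every stream in this context.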
void ScannerContext::ReleaseCompletedResources(RowBatch* batch, bool done) {
  for (int i = 0; i < streams_.size(); ++i) {
    streams_[i]->ReleaseCompletedResources(batch, done);
  }
}
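
// Destroys all streams; the destructor DCHECKs that this has happened.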
void ScannerContext::ClearStreams() {
  streams_.clear();
}
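
// The boundary pool is charged against the scan node's memory tracker. The boundary
// buffer is used by GetBytesInternal() to stitch together bytes that straddle I/O
// buffer boundaries.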
ScannerContext::Stream::Stream(ScannerContext* parent)
  : parent_(parent),
    next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
    boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
    boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
}
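
// Initializes a new stream over 'range'. output_buffer_bytes_left_ initially points
// at the zero-valued OUTPUT_BUFFER_BYTES_LEFT_INIT so GetBytes() never needs a NULL
// check.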
ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
  std::unique_ptr<Stream> stream(new Stream(this));
  stream->scan_range_ = range;
  stream->file_desc_ = scan_node_->GetFileDesc(partition_desc_->id(), stream->filename());
  stream->file_len_ = stream->file_desc_->file_length;
  stream->total_bytes_returned_ = 0;
  stream->io_buffer_pos_ = NULL;
  stream->io_buffer_bytes_left_ = 0;
  stream->boundary_buffer_bytes_left_ = 0;
  stream->output_buffer_pos_ = NULL;
  stream->output_buffer_bytes_left_ =
      const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
  stream->contains_tuple_data_ = scan_node_->tuple_desc()->ContainsStringData();
  streams_.push_back(std::move(stream));
  return streams_.back().get();
}
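
// Completed I/O buffers are either attached to 'batch' (when returned rows may
// reference the buffer memory directly) or returned to the DiskIoMgr. When 'done',
// the in-flight buffer is retired as well and the underlying scan range is cancelled
// so that any buffers still queued in the I/O manager are cleaned up.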
void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) {
  DCHECK(batch != nullptr || done || !contains_tuple_data_);
  if (done) {
    // Mark any pending resources as completed
    if (io_buffer_ != nullptr) {
      ++parent_->num_completed_io_buffers_;
      completed_io_buffers_.push_back(move(io_buffer_));
    }
    // Reset the buffer position and byte count to make sure the stream is not
    // used again
    io_buffer_pos_ = nullptr;
    io_buffer_bytes_left_ = 0;
    // Cancel the underlying scan range to clean up any queued buffers there
    scan_range_->Cancel(Status::CANCELLED);
  }

  for (unique_ptr<DiskIoMgr::BufferDescriptor>& buffer : completed_io_buffers_) {
    if (contains_tuple_data_ && batch != nullptr) {
      batch->AddIoBuffer(move(buffer));
      // TODO: We can do row batch compaction here. This is the only place io buffers
      // are queued. A good heuristic would be to compact once too many io buffers
      // are queued.
    } else {
      ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
      parent_->scan_node_->num_owned_io_buffers_.Add(-1);
    }
  }
  parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
  completed_io_buffers_.clear();

  if (contains_tuple_data_ && batch != nullptr) {
    // If we're not done, keep using the last chunk allocated in boundary_pool_ so we
    // don't have to reallocate. If we are done, transfer it to the row batch.
    batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), /* keep_current */ !done);
  }
  if (done) boundary_pool_->FreeAll();
}
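
// Retires the current I/O buffer (if any) and fetches the next one: from the scan
// range itself while it still has buffers, otherwise by issuing a new synchronous
// read past the end of the range. The read-past size starts at
// INIT_READ_PAST_SIZE_BYTES and doubles on each use, clamped to the remaining file
// length and the I/O manager's maximum buffer size; a registered read_past_size_cb_
// can override the estimate.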
Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
  if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;

  // Nothing to do if we've already processed all data in the file
  int64_t offset = file_offset() + boundary_buffer_bytes_left_;
  int64_t file_bytes_remaining = file_desc()->file_length - offset;
  if (io_buffer_ == NULL && file_bytes_remaining == 0) return Status::OK();

  // Otherwise, io_buffer_ should only be null the first time this is called
  DCHECK(io_buffer_ != NULL ||
      (total_bytes_returned_ == 0 && completed_io_buffers_.empty()));

  // We can't use the eosr() function because it reflects how many bytes have been
  // returned, not whether we've fetched all the buffers in the scan range
  bool eosr = false;
  if (io_buffer_ != NULL) {
    eosr = io_buffer_->eosr();
    ++parent_->num_completed_io_buffers_;
    completed_io_buffers_.push_back(move(io_buffer_));
  }

  if (!eosr) {
    SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
    RETURN_IF_ERROR(scan_range_->GetNext(&io_buffer_));
  } else {
    SCOPED_TIMER(parent_->state_->total_storage_wait_timer());

    int64_t read_past_buffer_size = 0;
    int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size();
    if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset);
    if (read_past_buffer_size <= 0) {
      // Either no callback was set or the callback did not return an estimate. Use
      // the default doubling strategy.
      read_past_buffer_size = next_read_past_size_bytes_;
      next_read_past_size_bytes_ =
          min<int64_t>(next_read_past_size_bytes_ * 2, max_buffer_size);
    }
    read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
    read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
    read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
    // We're reading past the scan range. Be careful not to read past the end of file.
    DCHECK_GE(read_past_buffer_size, 0);
    if (read_past_buffer_size == 0) {
      io_buffer_bytes_left_ = 0;
      // TODO: We are leaving io_buffer_ = NULL, revisit.
      return Status::OK();
    }
    int64_t partition_id = parent_->partition_descriptor()->id();
    DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange(
        scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
        scan_range_->disk_id(), false, DiskIoMgr::BufferOpts::Uncached());
    RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
        parent_->scan_node_->reader_context(), range, &io_buffer_));
  }

  DCHECK(io_buffer_ != NULL);
  parent_->scan_node_->num_owned_io_buffers_.Add(1);
  io_buffer_pos_ = reinterpret_cast<uint8_t*>(io_buffer_->buffer());
  io_buffer_bytes_left_ = io_buffer_->len();
  if (io_buffer_->len() == 0) {
    file_len_ = file_offset() + boundary_buffer_bytes_left_;
    VLOG_FILE << "Unexpectedly read 0 bytes from file=" << filename() << " table="
        << parent_->scan_node_->hdfs_table()->name()
        << ". Setting expected file length=" << file_len_;
  }
  return Status::OK();
}
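
// Returns (or, with 'peek' set, previews) a pointer to the next contiguous run of
// bytes, draining any bytes left in the boundary buffer before handing out the
// current I/O buffer. A typical call from a scanner looks roughly like this (a
// sketch only; 'stream' stands for any Stream* a scanner holds):
//
//   uint8_t* buf;
//   int64_t len;
//   RETURN_IF_ERROR(stream->GetBuffer(/* peek */ false, &buf, &len));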
Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_t* len) {
  *out_buffer = NULL;
  *len = 0;
  if (eosr()) return Status::OK();

  if (UNLIKELY(parent_->cancelled())) {
    DCHECK(*out_buffer == NULL);
    return Status::CANCELLED;
  }

  if (boundary_buffer_bytes_left_ > 0) {
    DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
    DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
    *out_buffer = boundary_buffer_pos_;
    // Don't return more bytes past eosr
    *len = min(boundary_buffer_bytes_left_, bytes_left());
    DCHECK_GE(*len, 0);
    if (!peek) {
      boundary_buffer_pos_ += *len;
      boundary_buffer_bytes_left_ -= *len;
      total_bytes_returned_ += *len;
    }
    return Status::OK();
  }

  if (io_buffer_bytes_left_ == 0) {
    // We're at the end of the boundary buffer and the current IO buffer. Get a new IO
    // buffer and set the current buffer to it.
    RETURN_IF_ERROR(GetNextBuffer());
    output_buffer_pos_ = &io_buffer_pos_;
    output_buffer_bytes_left_ = &io_buffer_bytes_left_;
  }
  DCHECK(io_buffer_ != NULL);

  *out_buffer = io_buffer_pos_;
  *len = io_buffer_bytes_left_;
  if (!peek) {
    io_buffer_bytes_left_ = 0;
    io_buffer_pos_ += *len;
    total_bytes_returned_ += *len;
  }
  DCHECK_GE(bytes_left(), 0);
  return Status::OK();
}
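
// Slow path for GetBytes(): called when the requested bytes are not already
// contiguous in the current buffer. Copies partial buffers into boundary_buffer_
// until 'requested_len' bytes are available (or EOF is hit), then points the output
// at either the I/O buffer (no stitching needed) or the boundary buffer.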
Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
    uint8_t** out_buffer, bool peek, int64_t* out_len) {
  DCHECK_GT(requested_len, boundary_buffer_bytes_left_);
  *out_buffer = NULL;

  if (boundary_buffer_bytes_left_ == 0) {
    if (contains_tuple_data_) {
      boundary_buffer_->Reset();
    } else {
      boundary_buffer_->Clear();
    }
  }

  while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
    // We must copy the remainder of 'io_buffer_' to 'boundary_buffer_' before advancing
    // to handle the case when the read straddles a block boundary. Preallocate
    // 'boundary_buffer_' to avoid unnecessary resizes for large reads.
    if (io_buffer_bytes_left_ > 0) {
      RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
      RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_));
      boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
    }

    int64_t remaining_requested_len = requested_len - boundary_buffer_->len();
    RETURN_IF_ERROR(GetNextBuffer(remaining_requested_len));
    if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
    // No more bytes (i.e. EOF).
    if (io_buffer_bytes_left_ == 0) break;
  }

  // We have read the full 'requested_len' bytes or couldn't read more bytes.
  int64_t requested_bytes_left = requested_len - boundary_buffer_bytes_left_;
  DCHECK_GE(requested_bytes_left, 0);
  int64_t num_bytes = min(io_buffer_bytes_left_, requested_bytes_left);
  *out_len = boundary_buffer_bytes_left_ + num_bytes;
  DCHECK_LE(*out_len, requested_len);

  if (boundary_buffer_bytes_left_ == 0) {
    // No stitching, just return the memory
    output_buffer_pos_ = &io_buffer_pos_;
    output_buffer_bytes_left_ = &io_buffer_bytes_left_;
  } else {
    RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, num_bytes));
    boundary_buffer_bytes_left_ += num_bytes;
    boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->buffer()) +
        boundary_buffer_->len() - boundary_buffer_bytes_left_;
    io_buffer_bytes_left_ -= num_bytes;
    io_buffer_pos_ += num_bytes;

    output_buffer_pos_ = &boundary_buffer_pos_;
    output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
  }
  *out_buffer = *output_buffer_pos_;

  if (!peek) {
    total_bytes_returned_ += *out_len;
    if (boundary_buffer_bytes_left_ == 0) {
      io_buffer_bytes_left_ -= num_bytes;
      io_buffer_pos_ += num_bytes;
    } else {
      DCHECK_EQ(boundary_buffer_bytes_left_, *out_len);
      boundary_buffer_bytes_left_ = 0;
    }
  }
  return Status::OK();
}
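
// Only scan nodes with a row batch queue (i.e. HdfsScanNode) signal cancellation
// through done(); for other scan node implementations this context is never
// considered cancelled.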
bool ScannerContext::cancelled() const {
  if (!scan_node_->HasRowBatchQueue()) return false;
  return static_cast<HdfsScanNode*>(scan_node_)->done();
}
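
// Error-construction helpers: each returns a Status tagged with the file name and
// current offset so scanners can report precise parse errors.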
Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t bytes_read) {
  return Status(TErrorCode::SCANNER_INCOMPLETE_READ, length, bytes_read,
      filename(), file_offset());
}

Status ScannerContext::Stream::ReportInvalidRead(int64_t length) {
  return Status(TErrorCode::SCANNER_INVALID_READ, length, filename(), file_offset());
}

Status ScannerContext::Stream::ReportInvalidInt() {
  return Status(TErrorCode::SCANNER_INVALID_INT, filename(), file_offset());
}