IMPALA-8742: Switch to ScanRange::bytes_to_read() instead of len()
IMPALA-7543 introduced sub-ranges in scan ranges. These are smaller
parts of the scan ranges that actually need to be read, other parts
of the scan range can be skipped. Currently sub-ranges are only used
in the Parquet scanner during page filtering.
With sub-ranges the scan range has a new field 'bytes_to_read_', that
is the sum of the lengths of the sub-ranges. Or, if there are no
sub-ranges, 'bytes_to_read_' equals to field 'len_' which is the length
of the whole scan range.
At some parts of Impala ScanRange::len() is being used instead of
ScanRange::bytes_to_read(). It doesn't cause a bug because only the
Parquet scanner uses sub-ranges, i.e. bytes_to_read() usually equals to
len(). The Parquet scanner also doesn't hit the bug because it tracks
which pages it reads.
However, it can be a potential source of bugs in the future to leave
the invocations of len() instead of bytes_to_read(). Also, the scanners
might allocate more memory than needed. At couple of places we still
need to invoke len(), e.g. when we test scan-range containment (for
local splits), or when we test whether a scan range contains the
mid-point of a Parquet row group.
Testing:
Added a scanner reservation test.
Ran the exhaustive tests.
Change-Id: Ie896db3f4b5f3e2272d81c2d360049af09c41d9c
Reviewed-on: http://gerrit.cloudera.org:8080/14348
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 40234e3..65fd7d0 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -333,7 +333,7 @@
context_->partition_descriptor()->id(), filename);
const vector<ScanRange*>& splits = desc->splits;
for (int i = 0; i < splits.size(); ++i) {
- COUNTER_ADD(bytes_skipped_counter_, splits[i]->len());
+ COUNTER_ADD(bytes_skipped_counter_, splits[i]->bytes_to_read());
}
scan_node_->SkipFile(file_format(), desc);
}
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 0286d30..f407ca5 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -560,7 +560,7 @@
// improve throughput. Note that if this is a columnar format like Parquet,
// '*scan_range' is the small footer range only so we won't request an increase.
int64_t ideal_scan_range_reservation =
- io_mgr->ComputeIdealBufferReservation((*scan_range)->len());
+ io_mgr->ComputeIdealBufferReservation((*scan_range)->bytes_to_read());
*reservation = IncreaseReservationIncrementally(*reservation, ideal_scan_range_reservation);
initial_range_ideal_reservation_stats_->UpdateCounter(ideal_scan_range_reservation);
initial_range_actual_reservation_stats_->UpdateCounter(*reservation);
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 10571b2..0a74f34 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1700,7 +1700,7 @@
vector<int64_t> col_range_lengths(column_readers.size());
for (int i = 0; i < column_readers.size(); ++i) {
- col_range_lengths[i] = column_readers[i]->scan_range()->len();
+ col_range_lengths[i] = column_readers[i]->scan_range()->bytes_to_read();
}
// The scanner-wide stream was used only to read the file footer. Each column has added
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 9680778..9c93c38 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -141,12 +141,14 @@
void set_read_past_size_cb(ReadPastSizeCallback cb) { read_past_size_cb_ = cb; }
/// Return the number of bytes left in the range for this stream.
- int64_t bytes_left() { return scan_range_->len() - total_bytes_returned_; }
+ int64_t bytes_left() { return scan_range_->bytes_to_read() - total_bytes_returned_; }
/// If true, all bytes in this scan range have been returned from this ScannerContext
/// to callers or we hit eof before reaching the end of the scan range. Callers can
/// continue to call Read*()/Get*()/Skip*() methods on the stream until eof() is true.
- bool eosr() const { return total_bytes_returned_ >= scan_range_->len() || eof(); }
+ bool eosr() const {
+ return total_bytes_returned_ >= scan_range_->bytes_to_read() || eof();
+ }
/// If true, the stream has reached the end of the file. After this is true, any
/// Read*()/Get*()/Skip*() methods will not succeed.
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 5a8b396..c1804ae 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -362,6 +362,11 @@
return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
Substitute("Invalid scan range. Non-positive length $0", range->len()));
}
+ if (range->bytes_to_read() <= 0) {
+ return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
+ Substitute("Invalid scan range. Non-positive bytes to read $0",
+ range->bytes_to_read()));
+ }
return Status::OK();
}
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index c7066e9..ceb3afe 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -396,7 +396,7 @@
// Add each range to the queue of the disk the range is on
for (ScanRange* range : ranges) {
// Don't add empty ranges.
- DCHECK_NE(range->len(), 0);
+ DCHECK_NE(range->bytes_to_read(), 0);
AddActiveScanRangeLocked(lock, range);
if (range->UseHdfsCache()) {
cached_ranges_.Enqueue(range);
@@ -479,7 +479,7 @@
DCHECK(Validate()) << endl << DebugString();
if (state_ == RequestContext::Cancelled) return CONTEXT_CANCELLED;
- DCHECK_NE(range->len(), 0);
+ DCHECK_NE(range->bytes_to_read(), 0);
if (range->UseHdfsCache()) {
bool cached_read_succeeded;
RETURN_IF_ERROR(TryReadFromCache(lock, range, &cached_read_succeeded,
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index d9e3423..abcb39c 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -221,7 +221,8 @@
if (sub_ranges_.empty()) {
DCHECK(cache_.data == nullptr);
read_status = file_reader_->ReadFromPos(queue, offset_ + bytes_read_,
- buffer_desc->buffer_, min(len() - bytes_read_, buffer_desc->buffer_len_),
+ buffer_desc->buffer_,
+ min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
&buffer_desc->len_, &eof);
} else {
read_status = ReadSubRanges(queue, buffer_desc.get(), &eof);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
index 0ca55ff..1003651 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
@@ -53,3 +53,11 @@
row_regex:.*ParquetRowGroupIdealReservation.*Avg: 24.00 MB.*
row_regex:.*ParquetRowGroupActualReservation.*Avg: 4.00 MB.*
====
+---- QUERY
+# IMPALA-8742: Use ScanRange::bytes_to_read() instead of len(), it has an effect
+# on the calculated ideal reservation.
+select * from tpch_parquet.lineitem
+where l_orderkey < 10;
+---- RUNTIME_PROFILE
+row_regex:.*ParquetRowGroupIdealReservation.*Avg: 3.50 MB.*
+====