IMPALA-8742: Switch to ScanRange::bytes_to_read() instead of len()

IMPALA-7543 introduced sub-ranges in scan ranges. These are smaller
parts of the scan ranges that actually need to be read, other parts
of the scan range can be skipped. Currently sub-ranges are only used
in the Parquet scanner during page filtering.

With sub-ranges the scan range has a new field 'bytes_to_read_', that
is the sum of the lengths of the sub-ranges. Or, if there are no
sub-ranges, 'bytes_to_read_' equals to field 'len_' which is the length
of the whole scan range.

At some parts of Impala ScanRange::len() is being used instead of
ScanRange::bytes_to_read(). It doesn't cause a bug because only the
Parquet scanner uses sub-ranges, i.e. bytes_to_read() usually equals to
len(). The Parquet scanner also doesn't hit the bug because it tracks
which pages it reads.

However, it can be a potential source of bugs in the future to leave
the invocations of len() instead of bytes_to_read(). Also, the scanners
might allocate more memory than needed. At couple of places we still
need to invoke len(), e.g. when we test scan-range containment (for
local splits), or when we test whether a scan range contains the
mid-point of a Parquet row group.

Testing:
Added a scanner reservation test.
Ran the exhaustive tests.

Change-Id: Ie896db3f4b5f3e2272d81c2d360049af09c41d9c
Reviewed-on: http://gerrit.cloudera.org:8080/14348
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 40234e3..65fd7d0 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -333,7 +333,7 @@
       context_->partition_descriptor()->id(), filename);
   const vector<ScanRange*>& splits = desc->splits;
   for (int i = 0; i < splits.size(); ++i) {
-    COUNTER_ADD(bytes_skipped_counter_, splits[i]->len());
+    COUNTER_ADD(bytes_skipped_counter_, splits[i]->bytes_to_read());
   }
   scan_node_->SkipFile(file_format(), desc);
 }
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 0286d30..f407ca5 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -560,7 +560,7 @@
     // improve throughput. Note that if this is a columnar format like Parquet,
     // '*scan_range' is the small footer range only so we won't request an increase.
     int64_t ideal_scan_range_reservation =
-        io_mgr->ComputeIdealBufferReservation((*scan_range)->len());
+        io_mgr->ComputeIdealBufferReservation((*scan_range)->bytes_to_read());
     *reservation = IncreaseReservationIncrementally(*reservation, ideal_scan_range_reservation);
     initial_range_ideal_reservation_stats_->UpdateCounter(ideal_scan_range_reservation);
     initial_range_actual_reservation_stats_->UpdateCounter(*reservation);
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 10571b2..0a74f34 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1700,7 +1700,7 @@
 
   vector<int64_t> col_range_lengths(column_readers.size());
   for (int i = 0; i < column_readers.size(); ++i) {
-    col_range_lengths[i] = column_readers[i]->scan_range()->len();
+    col_range_lengths[i] = column_readers[i]->scan_range()->bytes_to_read();
   }
 
   // The scanner-wide stream was used only to read the file footer.  Each column has added
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 9680778..9c93c38 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -141,12 +141,14 @@
     void set_read_past_size_cb(ReadPastSizeCallback cb) { read_past_size_cb_ = cb; }
 
     /// Return the number of bytes left in the range for this stream.
-    int64_t bytes_left() { return scan_range_->len() - total_bytes_returned_; }
+    int64_t bytes_left() { return scan_range_->bytes_to_read() - total_bytes_returned_; }
 
     /// If true, all bytes in this scan range have been returned from this ScannerContext
     /// to callers or we hit eof before reaching the end of the scan range. Callers can
     /// continue to call Read*()/Get*()/Skip*() methods on the stream until eof() is true.
-    bool eosr() const { return total_bytes_returned_ >= scan_range_->len() || eof(); }
+    bool eosr() const {
+      return total_bytes_returned_ >= scan_range_->bytes_to_read() || eof();
+    }
 
     /// If true, the stream has reached the end of the file. After this is true, any
     /// Read*()/Get*()/Skip*() methods will not succeed.
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 5a8b396..c1804ae 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -362,6 +362,11 @@
     return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
         Substitute("Invalid scan range. Non-positive length $0", range->len()));
   }
+  if (range->bytes_to_read() <= 0) {
+    return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
+        Substitute("Invalid scan range. Non-positive bytes to read $0",
+                   range->bytes_to_read()));
+  }
   return Status::OK();
 }
 
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index c7066e9..ceb3afe 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -396,7 +396,7 @@
   // Add each range to the queue of the disk the range is on
   for (ScanRange* range : ranges) {
     // Don't add empty ranges.
-    DCHECK_NE(range->len(), 0);
+    DCHECK_NE(range->bytes_to_read(), 0);
     AddActiveScanRangeLocked(lock, range);
     if (range->UseHdfsCache()) {
       cached_ranges_.Enqueue(range);
@@ -479,7 +479,7 @@
   DCHECK(Validate()) << endl << DebugString();
   if (state_ == RequestContext::Cancelled) return CONTEXT_CANCELLED;
 
-  DCHECK_NE(range->len(), 0);
+  DCHECK_NE(range->bytes_to_read(), 0);
   if (range->UseHdfsCache()) {
     bool cached_read_succeeded;
     RETURN_IF_ERROR(TryReadFromCache(lock, range, &cached_read_succeeded,
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index d9e3423..abcb39c 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -221,7 +221,8 @@
     if (sub_ranges_.empty()) {
       DCHECK(cache_.data == nullptr);
       read_status = file_reader_->ReadFromPos(queue, offset_ + bytes_read_,
-          buffer_desc->buffer_, min(len() - bytes_read_, buffer_desc->buffer_len_),
+          buffer_desc->buffer_,
+          min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
           &buffer_desc->len_, &eof);
     } else {
       read_status = ReadSubRanges(queue, buffer_desc.get(), &eof);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
index 0ca55ff..1003651 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanner-reservation.test
@@ -53,3 +53,11 @@
 row_regex:.*ParquetRowGroupIdealReservation.*Avg: 24.00 MB.*
 row_regex:.*ParquetRowGroupActualReservation.*Avg: 4.00 MB.*
 ====
+---- QUERY
+# IMPALA-8742: Use ScanRange::bytes_to_read() instead of len(), it has an effect
+# on the calculated ideal reservation.
+select * from tpch_parquet.lineitem
+where l_orderkey < 10;
+---- RUNTIME_PROFILE
+row_regex:.*ParquetRowGroupIdealReservation.*Avg: 3.50 MB.*
+====