IMPALA-8525: preads should use hdfsPreadFully rather than hdfsPread

Modifies HdfsFileReader so that it calls hdfsPreadFully instead of
hdfsPread. hdfsPreadFully is a new libhdfs API introduced by HDFS-14564
(Add libhdfs APIs for readFully; add readFully to
ByteBufferPositionedReadable). hdfsPreadFully improves performance of
preads, especially when reading data from S3. The major difference
between hdfsPread and hdfsPreadFully is that hdfsPreadFully is
guaranteed to read all the requested bytes, whereas hdfsPread is only
guaranteed to read up to the number of requested bytes.

hdfsPreadFully reduces the amount of JNI array allocations necessary
when reading data from S3. When any read method in libhdfs is called,
the method allocates an array whose size is equal to the amount of data
requested. The issue is that Java's InputStream#read only guarantees
that it will read up to the amount of data requested. This can lead to
issues where a libhdfs read request allocates a large Java array, even
though the read request only partially fills it up.
PositionedReadable#readFully on the other hand, guarantees that all
requested data will be read, thus preventing any unnecessary JNI array
allocations.

hdfsPreadFully improves the effectiveness of
fs.s3a.experimental.input.fadvise=RANDOM (HADOOP-13203). S3A recommends
setting fadvise=RANDOM when doing random reads, which is common in
Impala when reading Parquet or ORC files. fadvise=RANDOM causes the
HTTP GET request that reads the S3 data to simply request the data
bounded by the parameters of the current read request (e.g. for
'read(long position, ..., int length)' it requests 'length' bytes). The
chunk-size optimization in HdfsFileReader hurts performance when
fadvise=RANDOM because each HTTP GET request will only request
'chunk-size' amount of bytes at a time. Which is why this patch removes
the chunk-size optimization as well. hdfsPreadFully helps here because
all the data in the scan range will be requested by a single HTTP GET
request.

Since hdfsPreadFully improves S3 read performance, this patch enables
preads for S3A files by default. Even if fadvise=SEQUENTIAL,
hdfsPreadFully still improves performance since it avoids unnecessary
JNI allocation overhead.

The chunk-size optimization (added in
https://gerrit.cloudera.org/#/c/63/) is no longer necessary after this
patch. hdfsPreadFully prevents any unnecessary array allocations.
Furthermore, it is likely the chunk-size optimization was added due to
overhead fixed by HDFS-14285.

Fixes a bug in IMPALA-8884 where the
'impala-server.io-mgr.queue-$i.read-size' statistics were being updated
with the chunk-size passed to HdfsFileReader::ReadFromPosInternal, which
is not necessarily equivalent to the amount of data actually read.

Testing:
* Ran core tests
* Ran core tests on S3
* Ad-hoc functional and performance testing on ABFS; no perf regression
observed; planning to further investigate the interaction between
hdfsPreadFully + ABFS in a future JIRA

Change-Id: I29ea34897096bc790abdeb98073a47f1c4c10feb
Reviewed-on: http://gerrit.cloudera.org:8080/14635
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 0404d19..c2a4803 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -320,7 +320,9 @@
       return true; \
     });
 
-REMOVED_FLAG(authorization_policy_file)
+REMOVED_FLAG(abfs_read_chunk_size);
+REMOVED_FLAG(adls_read_chunk_size);
+REMOVED_FLAG(authorization_policy_file);
 REMOVED_FLAG(be_service_threads);
 REMOVED_FLAG(cgroup_hierarchy_path);
 REMOVED_FLAG(disable_admission_control);
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 92b8b98..edb655f 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -35,7 +35,8 @@
 
 DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
-    "(assuming the HDFS client is configured to do so).");
+    "(assuming the HDFS client is configured to do so). Preads are always enabled for "
+    "S3A reads.");
 
 DEFINE_int64(fs_slow_read_log_threshold_ms, 10L * 1000L,
     "Log diagnostics about I/Os issued via the HDFS client that take longer than this "
@@ -116,7 +117,6 @@
     }
   });
 
-  int64_t max_chunk_size = scan_range_->MaxReadChunkSize();
   Status status = Status::OK();
   {
     ScopedTimer<MonotonicStopWatch> req_context_read_timer(
@@ -134,10 +134,10 @@
     }
 
     while (*bytes_read < bytes_to_read) {
-      int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
-      DCHECK_GT(chunk_size, 0);
+      int bytes_remaining = bytes_to_read - *bytes_read;
+      DCHECK_GT(bytes_remaining, 0);
       // The hdfsRead() length argument is an int.
-      DCHECK_LE(chunk_size, numeric_limits<int>::max());
+      DCHECK_LE(bytes_remaining, numeric_limits<int>::max());
       int current_bytes_read = -1;
       // bytes_read_ is only updated after the while loop
       int64_t position_in_file = file_offset + *bytes_read;
@@ -145,7 +145,7 @@
       // ReadFromPosInternal() might fail due to a bad file handle.
       // If that was the case, allow for a retry to fix it.
       status = ReadFromPosInternal(hdfs_file, queue, position_in_file,
-          buffer + *bytes_read, chunk_size, &current_bytes_read);
+          buffer + *bytes_read, bytes_remaining, &current_bytes_read);
 
       // Retry if:
       // - first read was not successful
@@ -163,7 +163,7 @@
                   << scan_range_->mtime() << " offset " << file_offset;
         req_context_read_timer.Start();
         status = ReadFromPosInternal(hdfs_file, queue, position_in_file,
-            buffer + *bytes_read, chunk_size, &current_bytes_read);
+            buffer + *bytes_read, bytes_remaining, &current_bytes_read);
       }
       // Log diagnostics for failed and successful reads.
       int64_t elapsed_time = req_context_read_timer.ElapsedTime();
@@ -217,18 +217,18 @@
 }
 
 Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, DiskQueue* queue,
-    int64_t position_in_file, uint8_t* buffer, int64_t chunk_size, int* bytes_read) {
-  queue->read_size()->Update(chunk_size);
+    int64_t position_in_file, uint8_t* buffer, int64_t bytes_to_read, int* bytes_read) {
   ScopedHistogramTimer read_timer(queue->read_latency());
   // For file handles from the cache, any of the below file operations may fail
   // due to a bad file handle.
-  if (FLAGS_use_hdfs_pread) {
-    *bytes_read = hdfsPread(hdfs_fs_, hdfs_file, position_in_file, buffer, chunk_size);
-    if (*bytes_read == -1) {
+  if (FLAGS_use_hdfs_pread || IsS3APath(scan_range_->file_string()->c_str())) {
+    if (hdfsPreadFully(
+          hdfs_fs_, hdfs_file, position_in_file, buffer, bytes_to_read) == -1) {
       return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
           GetHdfsErrorMsg("Error reading from HDFS file: ",
               *scan_range_->file_string()));
     }
+    *bytes_read = bytes_to_read;
   } else {
     const int64_t cur_offset = hdfsTell(hdfs_fs_, hdfs_file);
     if (cur_offset == -1) {
@@ -245,13 +245,14 @@
                 position_in_file, *scan_range_->file_string(), GetHdfsErrorMsg("")));
       }
     }
-    *bytes_read = hdfsRead(hdfs_fs_, hdfs_file, buffer, chunk_size);
+    *bytes_read = hdfsRead(hdfs_fs_, hdfs_file, buffer, bytes_to_read);
     if (*bytes_read == -1) {
       return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
           GetHdfsErrorMsg("Error reading from HDFS file: ",
               *scan_range_->file_string()));
     }
   }
+  queue->read_size()->Update(*bytes_read);
   return Status::OK();
 }
 
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index 57b9332..29f0e2c 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -66,13 +66,13 @@
   void WriteDataCache(DataCache* remote_data_cache, int64_t file_offset,
       const uint8_t* buffer, int64_t buffer_len, int64_t cached_bytes_missed);
 
-  /// Read [position_in_file, position_in_file + chunk_size) from 'hdfs_file'
+  /// Read [position_in_file, position_in_file + bytes_to_read) from 'hdfs_file'
   /// into 'buffer'. Update 'bytes_read' on success. Returns error status on
   /// failure. When not using HDFS pread, this function will always implicitly
   /// seek to 'position_in_file' if 'hdfs_file' is not at it already.
   /// 'disk_queue' metrics are updated based on the operation.
   Status ReadFromPosInternal(hdfsFile hdfs_file, DiskQueue* disk_queue,
-      int64_t position_in_file, uint8_t* buffer, int64_t chunk_size, int* bytes_read);
+      int64_t position_in_file, uint8_t* buffer, int64_t bytes_to_read, int* bytes_read);
 
   /// Update counters with HDFS read statistics from 'hdfs_file'. If 'log_stats' is
   /// true, the statistics are logged.
diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc
index 7e612c2..4f3ba3d 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -75,13 +75,13 @@
         Substitute("Could not seek to $0 for file: $1: $2",
             scan_range_->offset(), *scan_range_->file_string(), GetStrErrMsg()));
   }
-  queue->read_size()->Update(bytes_to_read);
   {
     ScopedHistogramTimer read_timer(queue->read_latency());
     *bytes_read = fread(buffer, 1, bytes_to_read, file_);
   }
   DCHECK_GE(*bytes_read, 0);
   DCHECK_LE(*bytes_read, bytes_to_read);
+  queue->read_size()->Update(*bytes_read);
   if (*bytes_read < bytes_to_read) {
     if (ferror(file_) != 0) {
       return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 65cc3bd..e6dc48f 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -405,9 +405,6 @@
   /// cancelled.
   bool EnqueueReadyBuffer(std::unique_ptr<BufferDescriptor> buffer);
 
-  /// Maximum length in bytes for hdfsRead() calls.
-  int64_t MaxReadChunkSize() const;
-
   /// Get the read statistics from the Hdfs file handle and aggregate them to
   /// the RequestContext. This clears the statistics on this file handle.
   /// It is safe to pass hdfsFile by value, as hdfsFile's underlying type is a
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index abcb39c..3c6ff27 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -28,15 +28,6 @@
 using namespace impala;
 using namespace impala::io;
 
-// TODO: Run perf tests and empirically settle on the most optimal default value for the
-// read buffer sizes. Currently setting them as 128k for the same reason as for S3, i.e.
-// due to JNI array allocation and memcpy overhead, 128k was emperically found to have the
-// least overhead.
-DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
-    "reading from ADLS.");
-DEFINE_int64(abfs_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
-    "reading from ABFS.");
-
 DECLARE_bool(cache_remote_file_handles);
 DECLARE_bool(cache_s3_file_handles);
 
@@ -570,29 +561,6 @@
   file_reader_ = move(file_reader);
 }
 
-int64_t ScanRange::MaxReadChunkSize() const {
-  // S3 InputStreams don't support DIRECT_READ (i.e. java.nio.ByteBuffer read()
-  // interface).  So, hdfsRead() needs to allocate a Java byte[] and copy the data out.
-  // Profiles show that both the JNI array allocation and the memcpy adds much more
-  // overhead for larger buffers, so limit the size of each read request.  128K was
-  // chosen empirically by trying values between 4K and 8M and optimizing for lower CPU
-  // utilization and higher S3 througput.
-  if (disk_id_ == io_mgr_->RemoteS3DiskId()) {
-    DCHECK(IsS3APath(file()));
-    return 128 * 1024;
-  }
-  if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
-    DCHECK(IsADLSPath(file()));
-    return FLAGS_adls_read_chunk_size;
-  }
-  if (disk_id_ == io_mgr_->RemoteAbfsDiskId()) {
-    DCHECK(IsABFSPath(file()));
-    return FLAGS_abfs_read_chunk_size;
-  }
-  // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
-  return numeric_limits<int>::max();
-}
-
 Status ScanRange::ReadFromCache(
     const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
   DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());