IMPALA-5378: Disk IO manager needs to understand ADLS

The Disk IO Manager had customized support for S3 and remote HDFS that
allows for these to use a separate queue and have a customized number
of IO threads. ADLS did not have this support.

Based on the code in DiskIoMgr::Init and DiskIoMgr::AssignQueue, IOs
for ADLS were previously put in local disk queues. Since local disks
are considered rotational unless we can confirm otherwise by looking at
the /sys filesystem, this means that THREADS_PER_ROTATIONAL_DISK=1 was
being applied as the thread count.

This patch adds customized support for ADLS, similar to how it was done
for S3. We set 16 threads as the default number of IO threads for ADLS.
For smaller clusters, setting a higher number like 64 would work better.
We keep the thread count to a lower default of 16 since there is an
undocumented concurrency limit for clusters, which is around 500-700
connections, which means we would hurt node level parallelism if we
have higher thread level parallelism, for larger clusters.

We also set the default maximum chunk size for ADLS as 128k. This is due
to the fact that direct reads aren't supported for ADLS, which means that
the JNI array allocation and the memcpy adds significant overhead for
larger buffers. 128k was chosen emperically for S3 for the same reason.
Since this reason also holds for ADLS, we keep the same value. A new
flag called FLAGS_adls_read_chunk_size is used to control this value.

TODO: Settle on a buffer size with the most optimal buffer size
emperically.

Change-Id: I067f053fec941e3631610c5cc89a384f257ba906
Reviewed-on: http://gerrit.cloudera.org:8080/7033
Reviewed-by: Sailesh Mukil <sailesh@cloudera.com>
Tested-by: Impala Public Jenkins
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 9ccd1da..0668eb6 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -33,6 +33,13 @@
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
     "(assuming the HDFS client is configured to do so).");
 
+// TODO: Run perf tests and empirically settle on the most optimal default value for the
+// read buffer size. Currently setting it as 128k for the same reason as for S3, i.e.
+// due to JNI array allocation and memcpy overhead, 128k was emperically found to have the
+// least overhead.
+DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
+    "reading from ADLS.");
+
 // Implementation of the ScanRange functionality. Each ScanRange contains a queue
 // of ready buffers. For each ScanRange, there is only a single producer and
 // consumer thread, i.e. only one disk thread will push to a scan range at
@@ -389,6 +396,10 @@
     DCHECK(IsS3APath(file()));
     return 128 * 1024;
   }
+  if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
+    DCHECK(IsADLSPath(file()));
+    return FLAGS_adls_read_chunk_size;
+  }
   // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
   return numeric_limits<int>::max();
 }
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 1f28b1a..536ed88 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -51,6 +51,11 @@
 // open to S3 and use of multiple CPU cores since S3 reads are relatively compute
 // expensive (SSL and JNI buffer overheads).
 DEFINE_int32(num_s3_io_threads, 16, "number of S3 I/O threads");
+// The maximum number of ADLS I/O threads. This number is a good default to have for
+// clusters that may vary widely in size, due to an undocumented concurrency limit
+// enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
+// (~10 nodes), 64 threads would be more ideal.
+DEFINE_int32(num_adls_io_threads, 16, "number of ADLS I/O threads");
 // The read size is the size of the reads sent to hdfs/os.
 // There is a trade off of latency and throughout, trying to keep disks busy but
 // not introduce seeks.  The literature seems to agree that with 8 MB reads, random
@@ -388,6 +393,8 @@
       num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads;
     } else if (i == RemoteS3DiskId()) {
       num_threads_per_disk = FLAGS_num_s3_io_threads;
+    } else if (i == RemoteAdlsDiskId()) {
+      num_threads_per_disk = FLAGS_num_adls_io_threads;
     } else if (num_threads_per_disk_ != 0) {
       num_threads_per_disk = num_threads_per_disk_;
     } else if (DiskInfo::is_rotational(i)) {
@@ -1248,9 +1255,11 @@
       return RemoteDfsDiskId();
     }
     if (IsS3APath(file)) return RemoteS3DiskId();
+    if (IsADLSPath(file)) return RemoteAdlsDiskId();
   }
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file)); // S3 is always remote.
+  DCHECK(!IsADLSPath(file)); // ADLS is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
     disk_id = next_disk_id_.Add(1);
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b70c9c8..41d3226 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -756,6 +756,9 @@
   /// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
   int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; }
 
+  /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses.
+  int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; }
+
   /// Dumps the disk IoMgr queues (for readers and disks)
   std::string DebugString();
 
@@ -787,6 +790,7 @@
   enum {
     REMOTE_DFS_DISK_OFFSET = 0,
     REMOTE_S3_DISK_OFFSET,
+    REMOTE_ADLS_DISK_OFFSET,
     REMOTE_NUM_DISKS
   };