IMPALA-9033: log on slow HDFS I/Os

When an I/O issued via the HDFS client exceeds the threshold, this logs a
message with the time taken and also logs the basic HDFS read statistics
for the I/O, which indicate whether it was a remote read, a short-circuit
read, etc.

The threshold is 10s and is configurable via
--fs_slow_read_log_threshold_ms in case we want to make it more or less
sensitive.

Here's some example output that I obtained by adding a 500ms sleep
to the code path, and lowering the threshold to 500ms:

I1010 12:09:38.211959 30292 hdfs-file-reader.cc:173] 2448e3196bf9ee94:69adb16f00000001] Slow FS I/O operation on hdfs://localhost:20500/test-warehouse/tpch.lineitem/lineitem.tbl for instance 2448e3196bf9ee94:69adb16f00000001 of query 2448e3196bf9ee94:69adb16f00000000. Last read returned 8.00 MB. This thread has read 8.00 MB/8.00 MB starting at offset 394264576 in this I/O scheduling quantum and taken 584.129ms so far. I/O status: OK
I1010 12:09:38.212011 30292 hdfs-file-reader.cc:353] 2448e3196bf9ee94:69adb16f00000001] Stats for last read by this I/O thread: totalBytesRead=8388608 totalLocalBytesRead=8388608 totalShortCircuitBytesRead=8388608 totalZeroCopyBytesRead=0

Change-Id: I1929921495706b482d91d91cffe27bee4478f5c4
Reviewed-on: http://gerrit.cloudera.org:8080/14406
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 14cf2b5..c862ee0 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -23,6 +23,7 @@
 #include "runtime/io/hdfs-file-reader.h"
 #include "runtime/io/request-context.h"
 #include "runtime/io/request-ranges.h"
+#include "util/debug-util.h"
 #include "util/hdfs-util.h"
 #include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
@@ -36,6 +37,10 @@
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
     "(assuming the HDFS client is configured to do so).");
 
+DEFINE_int64(fs_slow_read_log_threshold_ms, 10L * 1000L,
+    "Log diagnostics about I/Os issued via the HDFS client that take longer than this "
+    "threshold.");
+
 #ifndef NDEBUG
 DECLARE_int32(stress_disk_read_delay_ms);
 #endif
@@ -159,6 +164,25 @@
         status = ReadFromPosInternal(hdfs_file, queue, position_in_file,
             buffer + *bytes_read, chunk_size, &current_bytes_read);
       }
+      // Log diagnostics for failed and successful reads.
+      int64_t elapsed_time = req_context_read_timer.ElapsedTime();
+      bool is_slow_read = elapsed_time
+          > FLAGS_fs_slow_read_log_threshold_ms * NANOS_PER_MICRO * MICROS_PER_MILLI;
+      if (is_slow_read) {
+        LOG(INFO) << "Slow FS I/O operation on " << *scan_range_->file_string() << " for "
+                  << "instance " << PrintId(scan_range_->reader_->instance_id())
+                  << " of query " << PrintId(scan_range_->reader_->query_id()) << ". "
+                  << "Last read returned "
+                  << PrettyPrinter::PrintBytes(current_bytes_read) << ". "
+                  << "This thread has read "
+                  << PrettyPrinter::PrintBytes(*bytes_read + current_bytes_read)
+                  << "/" << PrettyPrinter::PrintBytes(bytes_to_read)
+                  << " starting at offset " << file_offset << " in this I/O scheduling "
+                  << "quantum and taken "
+                  << PrettyPrinter::Print(elapsed_time, TUnit::TIME_NS) << " so far. "
+                  << "I/O status: " << (status.ok() ? "OK" : status.GetDetail());
+      }
+
       if (!status.ok()) {
         break;
       }
@@ -171,7 +195,7 @@
       *bytes_read += current_bytes_read;
 
       // Collect and accumulate statistics
-      GetHdfsStatistics(hdfs_file);
+      GetHdfsStatistics(hdfs_file, is_slow_read);
     }
 
     int64_t cached_bytes_missed = *bytes_read - cached_read;
@@ -274,7 +298,7 @@
 void HdfsFileReader::Close() {
   unique_lock<SpinLock> hdfs_lock(lock_);
   if (exclusive_hdfs_fh_ != nullptr) {
-    GetHdfsStatistics(exclusive_hdfs_fh_->file());
+    GetHdfsStatistics(exclusive_hdfs_fh_->file(), false);
 
     if (cached_buffer_ != nullptr) {
       hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
@@ -314,7 +338,7 @@
   }
 }
 
-void HdfsFileReader::GetHdfsStatistics(hdfsFile hdfs_file) {
+void HdfsFileReader::GetHdfsStatistics(hdfsFile hdfs_file, bool log_stats) {
   struct hdfsReadStatistics* stats;
   if (IsHdfsPath(scan_range_->file())) {
     int success = hdfsFileGetReadStatistics(hdfs_file, &stats);
@@ -326,6 +350,13 @@
       if (stats->totalLocalBytesRead != stats->totalBytesRead) {
         num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
       }
+      if (log_stats) {
+        LOG(INFO) << "Stats for last read by this I/O thread:"
+                  << " totalBytesRead=" << stats->totalBytesRead
+                  << " totalLocalBytesRead=" << stats->totalLocalBytesRead
+                  << " totalShortCircuitBytesRead=" << stats->totalShortCircuitBytesRead
+                  << " totalZeroCopyBytesRead=" << stats->totalZeroCopyBytesRead;
+      }
       hdfsFileFreeReadStatistics(stats);
     }
     hdfsFileClearReadStatistics(hdfs_file);
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index 1e228e2..57b9332 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -74,7 +74,9 @@
   Status ReadFromPosInternal(hdfsFile hdfs_file, DiskQueue* disk_queue,
       int64_t position_in_file, uint8_t* buffer, int64_t chunk_size, int* bytes_read);
 
-  void GetHdfsStatistics(hdfsFile hdfs_file);
+  /// Update counters with HDFS read statistics from 'hdfs_file'. If 'log_stats' is
+  /// true, the statistics are logged.
+  void GetHdfsStatistics(hdfsFile hdfs_file, bool log_stats);
 
   /// Hadoop filesystem that contains the file being read.
   hdfsFS const hdfs_fs_;
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index fe612ce..2777032 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -659,6 +659,9 @@
 
   bool IsCancelled() { return is_cancelled_ != nullptr && *is_cancelled_; }
 
+  /// Return the total elapsed time accumulated by this timer so far.
+  int64_t ElapsedTime() { return sw_.ElapsedTime(); }
+
   /// Update counter when object is destroyed
   ~ScopedTimer() {
     sw_.Stop();