IMPALA-10526: Fix BufferPoolTest.Multi8RandomSpillToRemoteMix failed in sanitizer builds

Fixed a data race issue when running testcase
BufferPoolTest.Multi8RandomSpillToRemoteMix in the tsan build. The
solution is to access ScanRange::use_local_buffer_ under
ScanRange::lock_.

Tests:
Rerun testcase BufferPoolTest.Multi8RandomSpillToRemoteMix.

Change-Id: Ic77dd85d20efc7066758b2ba61e2138745cbd90b
Reviewed-on: http://gerrit.cloudera.org:8080/17096
Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index dca4398..d70304e 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -375,7 +375,9 @@
   ReadOutcome DoRead(DiskQueue* queue, int disk_id);
 
   /// The function runs the actual read logic to read content with the specific reader.
-  ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, FileReader* file_reader);
+  /// If use_local_buffer is true, it will read from the local buffer with the local
+  /// buffer reader.
+  ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buffer);
 
   /// Cleans up a buffer that was not returned to the client.
   /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
@@ -585,7 +587,7 @@
   /// If set to true, the scan range is using local_buffer_reader_ to do scan operations.
   /// The flag is set during DoRead(). If the path is a remote path and the file has
   /// a local buffer, the flag is set to true, otherwise the flag is false.
-  bool use_local_buffer_{false};
+  bool use_local_buffer_ = false;
 
   /// If not empty, the ScanRange will only read these parts from the file.
   std::vector<SubRange> sub_ranges_;
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 569ef57..d1d75c9 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -164,11 +164,12 @@
 }
 
 ReadOutcome ScanRange::DoReadInternal(
-    DiskQueue* queue, int disk_id, FileReader* file_reader) {
+    DiskQueue* queue, int disk_id, bool use_local_buff) {
   int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
 
   unique_ptr<BufferDescriptor> buffer_desc;
+  FileReader* file_reader = nullptr;
   {
     unique_lock<mutex> lock(lock_);
     DCHECK(!read_in_flight_);
@@ -190,7 +191,15 @@
       iomgr_buffer_cumulative_bytes_used_ += buffer_desc->buffer_len();
     }
     read_in_flight_ = true;
+    if (use_local_buff) {
+      file_reader = local_buffer_reader_.get();
+      file_ = disk_buffer_file_->path();
+    } else {
+      file_reader = file_reader_.get();
+    }
+    use_local_buffer_ = use_local_buff;
   }
+  DCHECK(file_reader != nullptr);
 
   // No locks in this section.  Only working on local vars.  We don't want to hold a
   // lock across the read call.
@@ -265,7 +274,7 @@
 }
 
 ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
-  FileReader* file_reader = file_reader_.get();
+  bool use_local_buffer = false;
   if (disk_file_ != nullptr && disk_file_->disk_type() != DiskFileType::LOCAL) {
     // The sequence for acquiring the locks should always be from the local to
     // the remote to avoid deadlocks.
@@ -283,18 +292,15 @@
       // If the local buffer exists, we can read from the local buffer, otherwise,
       // we will read from the remote file system.
       if (!disk_buffer_file_->is_deleted(buffer_file_lock)) {
-        file_reader = local_buffer_reader_.get();
-        file_ = disk_buffer_file_->path();
-        use_local_buffer_ = true;
+        use_local_buffer = true;
       } else {
         // Read from the remote file. The remote file must be in persisted status.
         DCHECK(disk_file_->is_persisted(file_lock));
-        use_local_buffer_ = false;
       }
     }
-    return DoReadInternal(queue, disk_id, file_reader);
+    return DoReadInternal(queue, disk_id, use_local_buffer);
   }
-  return DoReadInternal(queue, disk_id, file_reader);
+  return DoReadInternal(queue, disk_id, use_local_buffer);
 }
 
 Status ScanRange::ReadSubRanges(
@@ -382,13 +388,13 @@
 void ScanRange::CancelInternal(const Status& status, bool read_error) {
   DCHECK(io_mgr_ != nullptr);
   DCHECK(!status.ok());
-  FileReader* file_reader =
-      use_local_buffer_ ? local_buffer_reader_.get() : file_reader_.get();
+  FileReader* file_reader = nullptr;
   {
     // Grab both locks to make sure that we don't change 'cancel_status_' while other
     // threads are in critical sections.
     unique_lock<mutex> scan_range_lock(lock_);
     {
+      file_reader = use_local_buffer_ ? local_buffer_reader_.get() : file_reader_.get();
       unique_lock<SpinLock> fs_lock(file_reader->lock());
       DCHECK(Validate()) << DebugString();
       // If already cancelled, preserve the original reason for cancellation. Most of the
@@ -418,7 +424,10 @@
   // TODO: IMPALA-4249 - this Close() call makes it unsafe to reuse a cancelled scan
   // range, because there is no synchronisation between this Close() call and the
   // client adding the ScanRange back into the IoMgr.
-  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) file_reader->Close();
+  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
+    DCHECK(file_reader != nullptr);
+    file_reader->Close();
+  }
 }
 
 void ScanRange::WaitForInFlightRead() {