IMPALA-10526: Fix BufferPoolTest.Multi8RandomSpillToRemoteMix failure in sanitizer builds
Fixed a data race reported when running the test case
BufferPoolTest.Multi8RandomSpillToRemoteMix in the TSAN build. The
fix is to access ScanRange::use_local_buffer_ only while holding
ScanRange::lock_.
Tests:
Reran the test case BufferPoolTest.Multi8RandomSpillToRemoteMix.
Change-Id: Ic77dd85d20efc7066758b2ba61e2138745cbd90b
Reviewed-on: http://gerrit.cloudera.org:8080/17096
Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index dca4398..d70304e 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -375,7 +375,9 @@
ReadOutcome DoRead(DiskQueue* queue, int disk_id);
/// The function runs the actual read logic to read content with the specific reader.
- ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, FileReader* file_reader);
+ /// If use_local_buffer is true, it will read from the local buffer with the local
+ /// buffer reader.
+ ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buffer);
/// Cleans up a buffer that was not returned to the client.
/// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
@@ -585,7 +587,7 @@
/// If set to true, the scan range is using local_buffer_reader_ to do scan operations.
/// The flag is set during DoRead(). If the path is a remote path and the file has
/// a local buffer, the flag is set to true, otherwise the flag is false.
- bool use_local_buffer_{false};
+ bool use_local_buffer_ = false;
/// If not empty, the ScanRange will only read these parts from the file.
std::vector<SubRange> sub_ranges_;
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 569ef57..d1d75c9 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -164,11 +164,12 @@
}
ReadOutcome ScanRange::DoReadInternal(
- DiskQueue* queue, int disk_id, FileReader* file_reader) {
+ DiskQueue* queue, int disk_id, bool use_local_buff) {
int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
DCHECK_GT(bytes_remaining, 0);
unique_ptr<BufferDescriptor> buffer_desc;
+ FileReader* file_reader = nullptr;
{
unique_lock<mutex> lock(lock_);
DCHECK(!read_in_flight_);
@@ -190,7 +191,15 @@
iomgr_buffer_cumulative_bytes_used_ += buffer_desc->buffer_len();
}
read_in_flight_ = true;
+ if (use_local_buff) {
+ file_reader = local_buffer_reader_.get();
+ file_ = disk_buffer_file_->path();
+ } else {
+ file_reader = file_reader_.get();
+ }
+ use_local_buffer_ = use_local_buff;
}
+ DCHECK(file_reader != nullptr);
// No locks in this section. Only working on local vars. We don't want to hold a
// lock across the read call.
@@ -265,7 +274,7 @@
}
ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
- FileReader* file_reader = file_reader_.get();
+ bool use_local_buffer = false;
if (disk_file_ != nullptr && disk_file_->disk_type() != DiskFileType::LOCAL) {
// The sequence for acquiring the locks should always be from the local to
// the remote to avoid deadlocks.
@@ -283,18 +292,15 @@
// If the local buffer exists, we can read from the local buffer, otherwise,
// we will read from the remote file system.
if (!disk_buffer_file_->is_deleted(buffer_file_lock)) {
- file_reader = local_buffer_reader_.get();
- file_ = disk_buffer_file_->path();
- use_local_buffer_ = true;
+ use_local_buffer = true;
} else {
// Read from the remote file. The remote file must be in persisted status.
DCHECK(disk_file_->is_persisted(file_lock));
- use_local_buffer_ = false;
}
}
- return DoReadInternal(queue, disk_id, file_reader);
+ return DoReadInternal(queue, disk_id, use_local_buffer);
}
- return DoReadInternal(queue, disk_id, file_reader);
+ return DoReadInternal(queue, disk_id, use_local_buffer);
}
Status ScanRange::ReadSubRanges(
@@ -382,13 +388,13 @@
void ScanRange::CancelInternal(const Status& status, bool read_error) {
DCHECK(io_mgr_ != nullptr);
DCHECK(!status.ok());
- FileReader* file_reader =
- use_local_buffer_ ? local_buffer_reader_.get() : file_reader_.get();
+ FileReader* file_reader = nullptr;
{
// Grab both locks to make sure that we don't change 'cancel_status_' while other
// threads are in critical sections.
unique_lock<mutex> scan_range_lock(lock_);
{
+ file_reader = use_local_buffer_ ? local_buffer_reader_.get() : file_reader_.get();
unique_lock<SpinLock> fs_lock(file_reader->lock());
DCHECK(Validate()) << DebugString();
// If already cancelled, preserve the original reason for cancellation. Most of the
@@ -418,7 +424,10 @@
// TODO: IMPALA-4249 - this Close() call makes it unsafe to reuse a cancelled scan
// range, because there is no synchronisation between this Close() call and the
// client adding the ScanRange back into the IoMgr.
- if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) file_reader->Close();
+ if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
+ DCHECK(file_reader != nullptr);
+ file_reader->Close();
+ }
}
void ScanRange::WaitForInFlightRead() {