// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/disk-io-mgr-internal.h"
#include "common/names.h"
using namespace impala;
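
// Cancels this context with 'status' (which must be a non-OK status). All scan
// ranges on this reader's queues are cancelled directly, while the completion
// callbacks of queued write ranges are collected under the lock but invoked only
// after it is released, so a callback can re-enter the IoMgr without deadlocking.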
void DiskIoRequestContext::Cancel(const Status& status) {
  DCHECK(!status.ok());

  // Callbacks are collected in this vector and invoked while no lock is held.
  vector<WriteRange::WriteDoneCallback> write_callbacks;
  {
    lock_guard<mutex> lock(lock_);
    DCHECK(Validate()) << endl << DebugString();

    // Already being cancelled.
    if (state_ == DiskIoRequestContext::Cancelled) return;

    DCHECK(status_.ok());
    status_ = status;

    // The reader is put into the cancelled state and stays there until cleanup is
    // complete.
    state_ = DiskIoRequestContext::Cancelled;

    // Cancel all scan ranges for this reader. Each range could be on one of
    // several queues.
    for (int i = 0; i < disk_states_.size(); ++i) {
      DiskIoRequestContext::PerDiskState& state = disk_states_[i];
      RequestRange* range = NULL;
      while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
        if (range->request_type() == RequestType::READ) {
          static_cast<ScanRange*>(range)->Cancel(status);
        } else {
          DCHECK(range->request_type() == RequestType::WRITE);
          write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
        }
      }

      ScanRange* scan_range;
      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
        scan_range->Cancel(status);
      }
      WriteRange* write_range;
      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) {
        write_callbacks.push_back(write_range->callback_);
      }
    }

    ScanRange* range = NULL;
    while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
      range->Cancel(status);
    }
    while ((range = blocked_ranges_.Dequeue()) != NULL) {
      range->Cancel(status);
    }
    while ((range = cached_ranges_.Dequeue()) != NULL) {
      range->Cancel(status);
    }

    // Schedule the reader on all disks. The disks will notice it is cancelled and do
    // any required cleanup.
    for (int i = 0; i < disk_states_.size(); ++i) {
      DiskIoRequestContext::PerDiskState& state = disk_states_[i];
      state.ScheduleContext(this, i);
    }
  }

  for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
    write_callback(status_);
  }

  // Signal the reader and unblock the GetNext()/Read thread. That read will fail
  // with a cancelled status.
  ready_to_start_ranges_cv_.notify_all();
}
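
// Adds 'range' to the queues of its assigned disk, re-activating that disk's state
// if it was previously marked done. Scan ranges are either scheduled immediately or
// parked on the unstarted queue until GetNextRange() starts them; write ranges are
// always enqueued as unstarted. The context is (re)scheduled on the disk queue when
// needed.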
void DiskIoRequestContext::AddRequestRange(
    DiskIoMgr::RequestRange* range, bool schedule_immediately) {
  // DCHECK(lock_.is_locked()); // TODO: boost should have this API
  DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
  if (state.done()) {
    DCHECK_EQ(state.num_remaining_ranges(), 0);
    state.set_done(false);
    ++num_disks_with_ranges_;
  }

  bool schedule_context;
  if (range->request_type() == RequestType::READ) {
    DiskIoMgr::ScanRange* scan_range = static_cast<DiskIoMgr::ScanRange*>(range);
    if (schedule_immediately) {
      ScheduleScanRange(scan_range);
    } else {
      state.unstarted_scan_ranges()->Enqueue(scan_range);
      num_unstarted_scan_ranges_.Add(1);
    }
    // If next_scan_range_to_start is NULL, schedule this DiskIoRequestContext so
    // that it will be set. If it's not NULL, this context will be scheduled when
    // GetNextRange() is invoked.
    schedule_context = state.next_scan_range_to_start() == NULL;
  } else {
    DCHECK(range->request_type() == RequestType::WRITE);
    DCHECK(!schedule_immediately);
    DiskIoMgr::WriteRange* write_range = static_cast<DiskIoMgr::WriteRange*>(range);
    state.unstarted_write_ranges()->Enqueue(write_range);

    // ScheduleContext() has no effect if the context is already scheduled,
    // so this is safe.
    schedule_context = true;
  }

  if (schedule_context) state.ScheduleContext(this, range->disk_id());
  ++state.num_remaining_ranges();
}
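
// Contexts are pooled and reused by the IoMgr rather than constructed per reader,
// so the constructor only wires up the parent and sizes the per-disk state vector;
// the remaining fields are (re)initialized in Reset().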
DiskIoRequestContext::DiskIoRequestContext(DiskIoMgr* parent, int num_disks)
  : parent_(parent),
    bytes_read_counter_(NULL),
    read_timer_(NULL),
    active_read_thread_counter_(NULL),
    disks_accessed_bitmap_(NULL),
    state_(Inactive),
    disk_states_(num_disks) {
}

// Resets this object so it can be reused for a new reader.
void DiskIoRequestContext::Reset(MemTracker* tracker) {
  DCHECK_EQ(state_, Inactive);
  status_ = Status::OK();

  bytes_read_counter_ = NULL;
  read_timer_ = NULL;
  active_read_thread_counter_ = NULL;
  disks_accessed_bitmap_ = NULL;

  state_ = Active;
  mem_tracker_ = tracker;

  num_unstarted_scan_ranges_.Store(0);
  num_disks_with_ranges_ = 0;
  num_used_buffers_.Store(0);
  num_buffers_in_reader_.Store(0);
  num_ready_buffers_.Store(0);
  total_range_queue_capacity_.Store(0);
  num_finished_ranges_.Store(0);
  num_remote_ranges_.Store(0);
  bytes_read_local_.Store(0);
  bytes_read_short_circuit_.Store(0);
  bytes_read_dn_cache_.Store(0);
  unexpected_remote_bytes_.Store(0);
  initial_queue_capacity_ = DiskIoMgr::DEFAULT_QUEUE_CAPACITY;

  DCHECK(ready_to_start_ranges_.empty());
  DCHECK(blocked_ranges_.empty());
  DCHECK(cached_ranges_.empty());

  for (int i = 0; i < disk_states_.size(); ++i) {
    disk_states_[i].Reset();
  }
}

// Dumps out request context information. The lock should be taken by the caller.
string DiskIoRequestContext::DebugString() const {
  stringstream ss;
  ss << endl << " DiskIoRequestContext: " << (void*)this << " (state=";
  if (state_ == DiskIoRequestContext::Inactive) ss << "Inactive";
  if (state_ == DiskIoRequestContext::Cancelled) ss << "Cancelled";
  if (state_ == DiskIoRequestContext::Active) ss << "Active";
  if (state_ != DiskIoRequestContext::Inactive) {
    ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
       << " #ready_buffers=" << num_ready_buffers_.Load()
       << " #used_buffers=" << num_used_buffers_.Load()
       << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
       << " #finished_scan_ranges=" << num_finished_ranges_.Load()
       << " #disks_with_ranges=" << num_disks_with_ranges_;
    for (int i = 0; i < disk_states_.size(); ++i) {
      ss << endl << " " << i << ": "
         << "is_on_queue=" << disk_states_[i].is_on_queue()
         << " done=" << disk_states_[i].done()
         << " #num_remaining_scan_ranges=" << disk_states_[i].num_remaining_ranges()
         << " #in_flight_ranges=" << disk_states_[i].in_flight_ranges()->size()
         << " #unstarted_scan_ranges="
         << disk_states_[i].unstarted_scan_ranges()->size()
         << " #unstarted_write_ranges="
         << disk_states_[i].unstarted_write_ranges()->size()
         << " #reading_threads=" << disk_states_[i].num_threads_in_op();
    }
  }
  ss << ")";
  return ss.str();
}
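
// Checks the internal consistency of this context, logging the first violated
// invariant: counters must be non-negative, per-disk queue sizes must agree with
// num_remaining_ranges and num_unstarted_scan_ranges_, and a cancelled context must
// have no queued ranges left. The caller is expected to hold lock_.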
bool DiskIoRequestContext::Validate() const {
  if (state_ == DiskIoRequestContext::Inactive) {
    LOG(WARNING) << "state_ == DiskIoRequestContext::Inactive";
    return false;
  }

  if (num_used_buffers_.Load() < 0) {
    LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load();
    return false;
  }

  if (num_ready_buffers_.Load() < 0) {
    LOG(WARNING) << "num_ready_buffers_ < 0: #ready=" << num_ready_buffers_.Load();
    return false;
  }

  int total_unstarted_ranges = 0;
  for (int i = 0; i < disk_states_.size(); ++i) {
    const PerDiskState& state = disk_states_[i];
    bool on_queue = state.is_on_queue();
    int num_reading_threads = state.num_threads_in_op();

    total_unstarted_ranges += state.unstarted_scan_ranges()->size();

    if (num_reading_threads < 0) {
      LOG(WARNING) << "disk_id=" << i
                   << " state.num_threads_in_op < 0: #threads="
                   << num_reading_threads;
      return false;
    }

    if (state_ != DiskIoRequestContext::Cancelled) {
      if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
          state.num_remaining_ranges()) {
        LOG(WARNING) << "disk_id=" << i
                     << " state.unstarted_scan_ranges.size()"
                     << " + state.in_flight_ranges.size()"
                     << " > state.num_remaining_ranges:"
                     << " #unscheduled=" << state.unstarted_scan_ranges()->size()
                     << " #in_flight=" << state.in_flight_ranges()->size()
                     << " #remaining=" << state.num_remaining_ranges();
        return false;
      }

      // If we have an in-flight range, the reader must be on the queue or have a
      // thread actively reading for it.
      if (!state.in_flight_ranges()->empty() && !on_queue
          && num_reading_threads == 0) {
        LOG(WARNING) << "disk_id=" << i
                     << " reader has in-flight ranges but is not on the disk queue"
                     << " and has no reading threads:"
                     << " #in_flight_ranges=" << state.in_flight_ranges()->size()
                     << " #reading_threads=" << num_reading_threads
                     << " on_queue=" << on_queue;
        return false;
      }

      if (state.done() && num_reading_threads > 0) {
        LOG(WARNING) << "disk_id=" << i
                     << " state set to done but there are still threads working:"
                     << " #reading_threads=" << num_reading_threads;
        return false;
      }
    } else {
      // The context is cancelled: all per-disk queues must have been drained.
      if (!state.in_flight_ranges()->empty()) {
        LOG(WARNING) << "disk_id=" << i
                     << " reader cancelled but has in-flight ranges.";
        return false;
      }
      if (!state.unstarted_scan_ranges()->empty()) {
        LOG(WARNING) << "disk_id=" << i
                     << " reader cancelled but has unstarted ranges.";
        return false;
      }
    }

    if (state.done() && on_queue) {
      LOG(WARNING) << "disk_id=" << i
                   << " state set to done but the reader is still on the disk queue:"
                   << " state.done=true and state.is_on_queue=true";
      return false;
    }
  }

  if (state_ != DiskIoRequestContext::Cancelled) {
    if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
      LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
                   << " sum_in_states=" << num_unstarted_scan_ranges_.Load();
      return false;
    }
  } else {
    if (!ready_to_start_ranges_.empty()) {
      LOG(WARNING) << "Reader cancelled but has ready-to-start ranges.";
      return false;
    }
    if (!blocked_ranges_.empty()) {
      LOG(WARNING) << "Reader cancelled but has blocked ranges.";
      return false;
    }
  }

  return true;
}