| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/tmp-file-mgr.h" |
| |
| #include <limits> |
| |
| #include <boost/algorithm/string.hpp> |
| #include <boost/filesystem.hpp> |
| #include <boost/lexical_cast.hpp> |
| #include <boost/thread/locks.hpp> |
| #include <boost/uuid/random_generator.hpp> |
| #include <boost/uuid/uuid_io.hpp> |
| #include <gutil/strings/join.h> |
| #include <gutil/strings/substitute.h> |
| |
| #include "runtime/io/disk-io-mgr.h" |
| #include "runtime/io/request-context.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/tmp-file-mgr-internal.h" |
| #include "util/bit-util.h" |
| #include "util/collection-metrics.h" |
| #include "util/debug-util.h" |
| #include "util/disk-info.h" |
| #include "util/filesystem-util.h" |
| #include "util/parse-util.h" |
| #include "util/pretty-printer.h" |
| #include "util/runtime-profile-counters.h" |
| |
| #include "common/names.h" |
| |
// Command-line flags controlling spill-to-disk. Parsed at process startup and read by
// TmpFileMgr::Init() to configure the scratch directories.
DEFINE_bool(disk_spill_encryption, true,
    "Set this to encrypt and perform an integrity "
    "check on all data spilled to disk during a query");
DEFINE_string(scratch_dirs, "/tmp",
    "Writable scratch directories. "
    "This is a comma-separated list of directories. Each directory is "
    "specified as the directory path and an optional limit on the bytes that will "
    "be allocated in that directory. If the optional limit is provided, the path and "
    "the limit are separated by a colon. E.g. '/dir1:10G,/dir2:5GB,/dir3' will allow "
    "allocating up to 10GB of scratch in /dir1, 5GB of scratch in /dir2 and an "
    "unlimited amount in /dir3.");
DEFINE_bool(allow_multiple_scratch_dirs_per_device, true,
    "If false and --scratch_dirs contains multiple directories on the same device, "
    "then only the first writable directory is used");
| |
| using boost::algorithm::is_any_of; |
| using boost::algorithm::join; |
| using boost::algorithm::split; |
| using boost::algorithm::token_compress_on; |
| using boost::filesystem::absolute; |
| using boost::filesystem::path; |
| using boost::uuids::random_generator; |
| using namespace impala::io; |
| using namespace strings; |
| |
| namespace impala { |
| |
// Name of the subdirectory created under each configured scratch directory. It is
// removed and recreated at startup (see InitCustom()) so it starts out empty.
const string TMP_SUB_DIR_NAME = "impala-scratch";
// If the filesystem backing a scratch directory has less than this much space free at
// startup, a warning is logged (the directory is still used).
const uint64_t AVAILABLE_SPACE_THRESHOLD_MB = 1024;

// Metric keys
const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dirs";
const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST =
    "tmp-file-mgr.active-scratch-dirs.list";
const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK =
    "tmp-file-mgr.scratch-space-bytes-used-high-water-mark";
const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED =
    "tmp-file-mgr.scratch-space-bytes-used";
// Per-directory usage gauge key; $0 is the directory's index in --scratch_dirs.
const string SCRATCH_DIR_BYTES_USED_FORMAT =
    "tmp-file-mgr.scratch-space-bytes-used.dir-$0";
| |
| TmpFileMgr::TmpFileMgr() |
| : initialized_(false), |
| num_active_scratch_dirs_metric_(nullptr), |
| active_scratch_dirs_metric_(nullptr), |
| scratch_bytes_used_metric_(nullptr) {} |
| |
| Status TmpFileMgr::Init(MetricGroup* metrics) { |
| return InitCustom( |
| FLAGS_scratch_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics); |
| } |
| |
| Status TmpFileMgr::InitCustom( |
| const string& tmp_dirs_spec, bool one_dir_per_device, MetricGroup* metrics) { |
| vector<string> all_tmp_dirs; |
| // Empty string should be interpreted as no scratch |
| if (!tmp_dirs_spec.empty()) { |
| split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on); |
| } |
| return InitCustom(all_tmp_dirs, one_dir_per_device, metrics); |
| } |
| |
| Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers, |
| bool one_dir_per_device, MetricGroup* metrics) { |
| DCHECK(!initialized_); |
| if (tmp_dir_specifiers.empty()) { |
| LOG(WARNING) << "Running without spill to disk: no scratch directories provided."; |
| } |
| vector<TmpDir> tmp_dirs; |
| // Parse the directory specifiers. Don't return an error on parse errors, just log a |
| // warning - we don't want to abort process startup because of misconfigured scratch, |
| // since queries will generally still be runnable. |
| for (const string& tmp_dir_spec : tmp_dir_specifiers) { |
| vector<string> toks; |
| split(toks, tmp_dir_spec, is_any_of(":"), token_compress_on); |
| if (toks.size() > 2) { |
| LOG(ERROR) << "Could not parse temporary dir specifier, too many colons: '" |
| << tmp_dir_spec << "'"; |
| continue; |
| } |
| int64_t bytes_limit = numeric_limits<int64_t>::max(); |
| if (toks.size() == 2) { |
| bool is_percent; |
| bytes_limit = ParseUtil::ParseMemSpec(toks[1], &is_percent, 0); |
| if (bytes_limit < 0 || is_percent) { |
| LOG(ERROR) << "Malformed data cache capacity configuration '" << tmp_dir_spec |
| << "'"; |
| continue; |
| } else if (bytes_limit == 0) { |
| // Interpret -1, 0 or empty string as no limit. |
| bytes_limit = numeric_limits<int64_t>::max(); |
| } |
| } |
| IntGauge* bytes_used_metric = metrics->AddGauge( |
| SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs.size())); |
| tmp_dirs.emplace_back(toks[0], bytes_limit, bytes_used_metric); |
| } |
| |
| vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false); |
| // For each tmp directory, find the disk it is on, |
| // so additional tmp directories on the same disk can be skipped. |
| for (int i = 0; i < tmp_dirs.size(); ++i) { |
| path tmp_path(trim_right_copy_if(tmp_dirs[i].path, is_any_of("/"))); |
| tmp_path = absolute(tmp_path); |
| path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME); |
| // tmp_path must be a writable directory. |
| Status status = FileSystemUtil::VerifyIsDirectory(tmp_path.string()); |
| if (!status.ok()) { |
| LOG(WARNING) << "Cannot use directory " << tmp_path.string() << " for scratch: " |
| << status.msg().msg(); |
| continue; |
| } |
| // Find the disk id of tmp_path. Add the scratch directory if there isn't another |
| // directory on the same disk (or if we don't know which disk it is on). |
| int disk_id = DiskInfo::disk_id(tmp_path.c_str()); |
| if (!one_dir_per_device || disk_id < 0 || !is_tmp_dir_on_disk[disk_id]) { |
| uint64_t available_space; |
| RETURN_IF_ERROR(FileSystemUtil::GetSpaceAvailable(tmp_path.string(), |
| &available_space)); |
| if (available_space < AVAILABLE_SPACE_THRESHOLD_MB * 1024 * 1024) { |
| LOG(WARNING) << "Filesystem containing scratch directory " << tmp_path |
| << " has less than " << AVAILABLE_SPACE_THRESHOLD_MB |
| << "MB available."; |
| } |
| // Create the directory, destroying if already present. If this succeeds, we will |
| // have an empty writable scratch directory. |
| status = FileSystemUtil::RemoveAndCreateDirectory(scratch_subdir_path.string()); |
| if (status.ok()) { |
| if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true; |
| LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on " |
| << "disk " << disk_id |
| << " limit: " << PrettyPrinter::PrintBytes(tmp_dirs[i].bytes_limit); |
| tmp_dirs_.emplace_back(scratch_subdir_path.string(), tmp_dirs[i].bytes_limit, |
| tmp_dirs[i].bytes_used_metric); |
| } else { |
| LOG(WARNING) << "Could not remove and recreate directory " |
| << scratch_subdir_path.string() << ": cannot use it for scratch. " |
| << "Error was: " << status.msg().msg(); |
| } |
| } |
| } |
| |
| DCHECK(metrics != nullptr); |
| num_active_scratch_dirs_metric_ = |
| metrics->AddGauge(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0); |
| active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister( |
| metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>()); |
| num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size()); |
| for (int i = 0; i < tmp_dirs_.size(); ++i) { |
| active_scratch_dirs_metric_->Add(tmp_dirs_[i].path); |
| } |
| scratch_bytes_used_metric_ = |
| metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK, |
| TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED, 0); |
| |
| initialized_ = true; |
| |
| if (tmp_dirs_.empty() && !tmp_dirs.empty()) { |
| LOG(ERROR) << "Running without spill to disk: could not use any scratch " |
| << "directories in list: " << join(tmp_dir_specifiers, ",") |
| << ". See previous warnings for information on causes."; |
| } |
| return Status::OK(); |
| } |
| |
| void TmpFileMgr::NewFile( |
| FileGroup* file_group, DeviceId device_id, unique_ptr<File>* new_file) { |
| DCHECK(initialized_); |
| DCHECK_GE(device_id, 0); |
| DCHECK_LT(device_id, tmp_dirs_.size()); |
| DCHECK(file_group != nullptr); |
| // Generate the full file path. |
| string unique_name = lexical_cast<string>(random_generator()()); |
| stringstream file_name; |
| file_name << PrintId(file_group->unique_id()) << "_" << unique_name; |
| path new_file_path(tmp_dirs_[device_id].path); |
| new_file_path /= file_name.str(); |
| |
| new_file->reset(new File(file_group, device_id, new_file_path.string())); |
| } |
| |
| string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const { |
| DCHECK(initialized_); |
| DCHECK_GE(device_id, 0); |
| DCHECK_LT(device_id, tmp_dirs_.size()); |
| return tmp_dirs_[device_id].path; |
| } |
| |
| int TmpFileMgr::NumActiveTmpDevices() { |
| DCHECK(initialized_); |
| return tmp_dirs_.size(); |
| } |
| |
| vector<TmpFileMgr::DeviceId> TmpFileMgr::ActiveTmpDevices() { |
| vector<TmpFileMgr::DeviceId> devices; |
| for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) { |
| devices.push_back(device_id); |
| } |
| return devices; |
| } |
| |
| TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string& path) |
| : file_group_(file_group), |
| path_(path), |
| device_id_(device_id), |
| disk_id_(DiskInfo::disk_id(path.c_str())), |
| bytes_allocated_(0), |
| blacklisted_(false) { |
| DCHECK(file_group != nullptr); |
| } |
| |
| bool TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) { |
| DCHECK_GT(num_bytes, 0); |
| TmpDir* dir = GetDir(); |
| // Increment optimistically and roll back if the limit is exceeded. |
| if (dir->bytes_used_metric->Increment(num_bytes) > dir->bytes_limit) { |
| dir->bytes_used_metric->Increment(-num_bytes); |
| return false; |
| } |
| *offset = bytes_allocated_; |
| bytes_allocated_ += num_bytes; |
| return true; |
| } |
| |
| int TmpFileMgr::File::AssignDiskQueue() const { |
| return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, false); |
| } |
| |
| void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) { |
| LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg(); |
| blacklisted_ = true; |
| } |
| |
| Status TmpFileMgr::File::Remove() { |
| // Remove the file if present (it may not be present if no writes completed). |
| Status status = FileSystemUtil::RemovePaths({path_}); |
| GetDir()->bytes_used_metric->Increment(-bytes_allocated_); |
| return status; |
| } |
| |
| TmpFileMgr::TmpDir* TmpFileMgr::File::GetDir() { |
| return &file_group_->tmp_file_mgr_->tmp_dirs_[device_id_]; |
| } |
| |
// Human-readable dump of this file's state for logs and diagnostics.
string TmpFileMgr::File::DebugString() {
  return Substitute("File $0 path '$1' device id $2 disk id $3 bytes allocated $4 "
      "blacklisted $5", this, path_, device_id_, disk_id_, bytes_allocated_,
      blacklisted_);
}
| |
// Constructs a FileGroup that accounts scratch usage for one spilling client
// (identified by 'unique_id') and registers its I/O counters with 'profile'.
// 'bytes_limit' of -1 means unlimited (see AllocateSpace()). Physical scratch files
// are created lazily on the first write (see CreateFiles()).
TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
    RuntimeProfile* profile, const TUniqueId& unique_id, int64_t bytes_limit)
  : tmp_file_mgr_(tmp_file_mgr),
    io_mgr_(io_mgr),
    io_ctx_(nullptr),
    unique_id_(unique_id),
    bytes_limit_(bytes_limit),
    write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)),
    bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)),
    read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)),
    bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)),
    scratch_space_bytes_used_counter_(
        ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)),
    disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
    encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")),
    current_bytes_allocated_(0),
    next_allocation_index_(0),
    free_ranges_(64) {  // One free list per power-of-two size class of scratch range.
  DCHECK(tmp_file_mgr != nullptr);
  // Register a per-group context with the I/O manager; unregistered in Close().
  io_ctx_ = io_mgr_->RegisterContext();
}
| |
| TmpFileMgr::FileGroup::~FileGroup() { |
| DCHECK_EQ(tmp_files_.size(), 0); |
| } |
| |
| Status TmpFileMgr::FileGroup::CreateFiles() { |
| lock_.DCheckLocked(); |
| DCHECK(tmp_files_.empty()); |
| vector<DeviceId> tmp_devices = tmp_file_mgr_->ActiveTmpDevices(); |
| int files_allocated = 0; |
| // Initialize the tmp files and the initial file to use. |
| for (int i = 0; i < tmp_devices.size(); ++i) { |
| TmpFileMgr::DeviceId device_id = tmp_devices[i]; |
| unique_ptr<TmpFileMgr::File> tmp_file; |
| tmp_file_mgr_->NewFile(this, device_id, &tmp_file); |
| tmp_files_.emplace_back(std::move(tmp_file)); |
| ++files_allocated; |
| } |
| DCHECK_EQ(tmp_files_.size(), files_allocated); |
| if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus({}); |
| // Start allocating on a random device to avoid overloading the first device. |
| next_allocation_index_ = rand() % tmp_files_.size(); |
| return Status::OK(); |
| } |
| |
// Tears down the group: stops I/O, removes all scratch files and returns this
// group's usage to the process-wide metric. Must be called before destruction.
void TmpFileMgr::FileGroup::Close() {
  // Cancel writes before deleting the files, since in-flight writes could re-create
  // deleted files.
  if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_.get());
  // Best-effort removal: failures are logged but not propagated, since Close() has
  // no way to report them.
  for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) {
    Status status = file->Remove();
    if (!status.ok()) {
      LOG(WARNING) << "Error removing scratch file '" << file->path()
                   << "': " << status.msg().msg();
    }
  }
  // Give back this group's contribution to the global scratch usage metric.
  tmp_file_mgr_->scratch_bytes_used_metric_->Increment(
      -1 * scratch_space_bytes_used_counter_->value());

  tmp_files_.clear();
}
| |
// Allocates a scratch range of at least 'num_bytes' (rounded up to a power of two),
// preferring a previously recycled range of the same size class. On success sets
// '*tmp_file' and '*file_offset'. Returns an error if the group's byte limit would
// be exceeded or no scratch file can accept the range.
Status TmpFileMgr::FileGroup::AllocateSpace(
    int64_t num_bytes, File** tmp_file, int64_t* file_offset) {
  lock_guard<SpinLock> lock(lock_);
  // Round up to a power of two so recycled ranges are interchangeable within a size
  // class; clamp to >= 1 byte so the log2 below is well-defined.
  int64_t scratch_range_bytes = max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(num_bytes));
  int free_ranges_idx = BitUtil::Log2Ceiling64(scratch_range_bytes);
  // Fast path: reuse a recycled range of the same size class.
  if (!free_ranges_[free_ranges_idx].empty()) {
    *tmp_file = free_ranges_[free_ranges_idx].back().first;
    *file_offset = free_ranges_[free_ranges_idx].back().second;
    free_ranges_[free_ranges_idx].pop_back();
    return Status::OK();
  }

  // A 'bytes_limit_' of -1 means no per-group limit.
  if (bytes_limit_ != -1
      && current_bytes_allocated_ + scratch_range_bytes > bytes_limit_) {
    return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_, GetBackendString());
  }

  // Lazily create the files on the first write.
  if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles());

  // Track the indices of any directories where we failed due to capacity. This is
  // required for error reporting if we are totally out of capacity so that it's clear
  // that some disks were at capacity.
  vector<int> at_capacity_dirs;

  // Find the next physical file in round-robin order and allocate a range from it.
  for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
    int idx = next_allocation_index_;
    next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
    *tmp_file = tmp_files_[idx].get();
    // Skip files that previously hit I/O errors (see File::Blacklist()).
    if ((*tmp_file)->is_blacklisted()) continue;
    if (!(*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset)) {
      at_capacity_dirs.push_back(idx);
      continue;
    }
    scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
    tmp_file_mgr_->scratch_bytes_used_metric_->Increment(scratch_range_bytes);
    // NOTE(review): the limit check above charges the rounded 'scratch_range_bytes'
    // but this accumulates the unrounded 'num_bytes' - confirm which one should be
    // tracked against 'bytes_limit_'.
    current_bytes_allocated_ += num_bytes;
    return Status::OK();
  }
  return ScratchAllocationFailedStatus(at_capacity_dirs);
}
| |
| void TmpFileMgr::FileGroup::RecycleFileRange(unique_ptr<WriteHandle> handle) { |
| int64_t scratch_range_bytes = |
| max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(handle->len())); |
| int free_ranges_idx = BitUtil::Log2Ceiling64(scratch_range_bytes); |
| lock_guard<SpinLock> lock(lock_); |
| free_ranges_[free_ranges_idx].emplace_back( |
| handle->file_, handle->write_range_->offset()); |
| } |
| |
// Asynchronously writes 'buffer' to scratch: allocates a range, hands it to a new
// WriteHandle (which encrypts in place when spill encryption is on) and queues the
// write. 'cb' runs on completion; '*handle' is returned for the later read-back.
Status TmpFileMgr::FileGroup::Write(
    MemRange buffer, WriteDoneCallback cb, unique_ptr<TmpFileMgr::WriteHandle>* handle) {
  DCHECK_GE(buffer.len(), 0);

  File* tmp_file;
  int64_t file_offset;
  RETURN_IF_ERROR(AllocateSpace(buffer.len(), &tmp_file, &file_offset));

  unique_ptr<WriteHandle> tmp_handle(new WriteHandle(encryption_timer_, cb));
  WriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into lambda.
  // The lambda must not capture 'tmp_handle' itself: ownership of the handle is
  // moved to the caller below while the write may still be in flight.
  WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
      const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); };
  RETURN_IF_ERROR(
      tmp_handle->Write(io_ctx_.get(), tmp_file, file_offset, buffer, callback));
  write_counter_->Add(1);
  bytes_written_counter_->Add(buffer.len());
  *handle = move(tmp_handle);
  return Status::OK();
}
| |
// Synchronous read: issue the async read, then block until it completes.
Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
  RETURN_IF_ERROR(ReadAsync(handle, buffer));
  return WaitForAsyncRead(handle, buffer);
}
| |
// Starts an asynchronous read of the data previously written via 'handle' into
// 'buffer'. The caller must follow up with WaitForAsyncRead(). Requires that the
// write has completed and no other read is pending on this handle.
Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
  DCHECK(handle->write_range_ != nullptr);
  DCHECK(!handle->is_cancelled_);
  DCHECK_EQ(buffer.len(), handle->len());
  Status status;

  // Don't grab 'write_state_lock_' in this method - it is not necessary because we
  // don't touch any members that it protects and could block other threads for the
  // duration of the synchronous read.
  DCHECK(!handle->write_in_flight_);
  DCHECK(handle->read_range_ == nullptr);
  DCHECK(handle->write_range_ != nullptr);
  // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
  // since the write is not in flight.
  handle->read_range_ = scan_range_pool_.Add(new ScanRange);
  // Read back exactly the written byte range, directly into 'buffer' and bypassing
  // any caching.
  handle->read_range_->Reset(nullptr, handle->write_range_->file(),
      handle->write_range_->len(), handle->write_range_->offset(),
      handle->write_range_->disk_id(), false, false, ScanRange::INVALID_MTIME,
      BufferOpts::ReadInto(buffer.data(), buffer.len(), BufferOpts::NO_CACHING));
  read_counter_->Add(1);
  bytes_read_counter_->Add(buffer.len());
  bool needs_buffers;
  RETURN_IF_ERROR(io_ctx_->StartScanRange(handle->read_range_, &needs_buffers));
  DCHECK(!needs_buffers) << "Already provided a buffer";
  return Status::OK();
}
| |
// Blocks until the read started by ReadAsync() finishes, verifies the full range was
// read and (when spill encryption is on) checks integrity and decrypts in place.
// Always returns the I/O manager's buffer and clears 'read_range_' before returning.
Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buffer) {
  DCHECK(handle->read_range_ != nullptr);
  // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
  // since the write is not in flight.
  SCOPED_TIMER(disk_read_timer_);
  unique_ptr<BufferDescriptor> io_mgr_buffer;
  Status status = handle->read_range_->GetNext(&io_mgr_buffer);
  if (!status.ok()) goto exit;
  DCHECK(io_mgr_buffer != NULL);
  DCHECK(io_mgr_buffer->eosr());
  DCHECK_LE(io_mgr_buffer->len(), buffer.len());
  if (io_mgr_buffer->len() < buffer.len()) {
    // The read was truncated - this is an error.
    status = Status(TErrorCode::SCRATCH_READ_TRUNCATED, buffer.len(),
        handle->write_range_->file(), GetBackendString(), handle->write_range_->offset(),
        io_mgr_buffer->len());
    goto exit;
  }
  // ReadAsync() requested the read directly into 'buffer', so no copy is needed here.
  DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());

  if (FLAGS_disk_spill_encryption) {
    status = handle->CheckHashAndDecrypt(buffer);
    if (!status.ok()) goto exit;
  }
exit:
  // Always return the buffer before exiting to avoid leaking it.
  if (io_mgr_buffer != nullptr) handle->read_range_->ReturnBuffer(move(io_mgr_buffer));
  handle->read_range_ = nullptr;
  return status;
}
| |
| Status TmpFileMgr::FileGroup::RestoreData( |
| unique_ptr<WriteHandle> handle, MemRange buffer) { |
| DCHECK_EQ(handle->write_range_->data(), buffer.data()); |
| DCHECK_EQ(handle->len(), buffer.len()); |
| DCHECK(!handle->write_in_flight_); |
| DCHECK(handle->read_range_ == nullptr); |
| // Decrypt after the write is finished, so that we don't accidentally write decrypted |
| // data to disk. |
| Status status; |
| if (FLAGS_disk_spill_encryption) { |
| status = handle->CheckHashAndDecrypt(buffer); |
| } |
| RecycleFileRange(move(handle)); |
| return status; |
| } |
| |
| void TmpFileMgr::FileGroup::DestroyWriteHandle(unique_ptr<WriteHandle> handle) { |
| handle->Cancel(); |
| handle->WaitForWrite(); |
| RecycleFileRange(move(handle)); |
| } |
| |
| void TmpFileMgr::FileGroup::WriteComplete( |
| WriteHandle* handle, const Status& write_status) { |
| Status status; |
| if (!write_status.ok()) { |
| status = RecoverWriteError(handle, write_status); |
| if (status.ok()) return; |
| } else { |
| status = write_status; |
| } |
| handle->WriteComplete(status); |
| } |
| |
// Attempts to recover from a failed write by blacklisting the bad file and
// re-issuing the write on a different scratch file. Returns OK if the retry was
// queued; otherwise returns the error to report (cancellation, memory limit, or
// no usable alternative file).
Status TmpFileMgr::FileGroup::RecoverWriteError(
    WriteHandle* handle, const Status& write_status) {
  DCHECK(!write_status.ok());
  DCHECK(handle->file_ != nullptr);

  // We can't recover from cancellation or memory limit exceeded.
  if (write_status.IsCancelled() || write_status.IsMemLimitExceeded()) {
    return write_status;
  }

  // Save and report the error before retrying so that the failure isn't silent.
  {
    lock_guard<SpinLock> lock(lock_);
    scratch_errors_.push_back(write_status);
  }
  handle->file_->Blacklist(write_status.msg());

  // Do not retry cancelled writes or propagate the error, simply return CANCELLED.
  if (handle->is_cancelled_) return Status::CancelledInternal("TmpFileMgr write");

  TmpFileMgr::File* tmp_file;
  int64_t file_offset;
  // Discard the scratch file range - we will not reuse ranges from a bad file.
  // Choose another file to try. Blacklisting ensures we don't retry the same file.
  // If this fails, the status will include all the errors in 'scratch_errors_'.
  RETURN_IF_ERROR(AllocateSpace(handle->len(), &tmp_file, &file_offset));
  return handle->RetryWrite(io_ctx_.get(), tmp_file, file_offset);
}
| |
| Status TmpFileMgr::FileGroup::ScratchAllocationFailedStatus( |
| const vector<int>& at_capacity_dirs) { |
| vector<string> tmp_dir_paths; |
| for (TmpDir& tmp_dir : tmp_file_mgr_->tmp_dirs_) { |
| tmp_dir_paths.push_back(tmp_dir.path); |
| } |
| vector<string> at_capacity_dir_paths; |
| for (int dir_idx : at_capacity_dirs) { |
| at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx].path); |
| } |
| Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED, join(tmp_dir_paths, ","), |
| GetBackendString(), |
| PrettyPrinter::PrintBytes( |
| tmp_file_mgr_->scratch_bytes_used_metric_->current_value()->GetValue()), |
| PrettyPrinter::PrintBytes(current_bytes_allocated_), |
| join(at_capacity_dir_paths, ",")); |
| // Include all previous errors that may have caused the failure. |
| for (Status& err : scratch_errors_) status.MergeStatus(err); |
| return status; |
| } |
| |
| string TmpFileMgr::FileGroup::DebugString() { |
| lock_guard<SpinLock> lock(lock_); |
| stringstream ss; |
| ss << "FileGroup " << this << " bytes limit " << bytes_limit_ |
| << " current bytes allocated " << current_bytes_allocated_ |
| << " next allocation index " << next_allocation_index_ << " writes " |
| << write_counter_->value() << " bytes written " << bytes_written_counter_->value() |
| << " reads " << read_counter_->value() << " bytes read " |
| << bytes_read_counter_->value() << " scratch bytes used " |
| << scratch_space_bytes_used_counter_ << " dist read timer " |
| << disk_read_timer_->value() << " encryption timer " << encryption_timer_->value() |
| << endl |
| << " " << tmp_files_.size() << " files:" << endl; |
| for (unique_ptr<File>& file : tmp_files_) { |
| ss << " " << file->DebugString() << endl; |
| } |
| return ss.str(); |
| } |
| |
| TmpFileMgr::WriteHandle::WriteHandle( |
| RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb) |
| : cb_(cb), |
| encryption_timer_(encryption_timer), |
| file_(nullptr), |
| read_range_(nullptr), |
| is_cancelled_(false), |
| write_in_flight_(false) {} |
| |
// Callers must cancel and drain the handle (e.g. via FileGroup::DestroyWriteHandle())
// before destruction: no write may be in flight and any read must have been cancelled.
TmpFileMgr::WriteHandle::~WriteHandle() {
  DCHECK(!write_in_flight_);
  DCHECK(read_range_ == nullptr);
}
| |
| string TmpFileMgr::WriteHandle::TmpFilePath() const { |
| if (file_ == nullptr) return ""; |
| return file_->path(); |
| } |
| |
// Length of the written data, as recorded on the write range. Only valid after
// Write() has created 'write_range_'.
int64_t TmpFileMgr::WriteHandle::len() const {
  return write_range_->len();
}
| |
// Issues the asynchronous write of 'buffer' to ('file', 'offset'). When spill
// encryption is enabled the buffer is encrypted and hashed in place first.
// 'callback' fires when the I/O manager completes the write. On error the handle is
// marked cancelled so it can still be destroyed cleanly.
Status TmpFileMgr::WriteHandle::Write(RequestContext* io_ctx,
    File* file, int64_t offset, MemRange buffer,
    WriteRange::WriteDoneCallback callback) {
  DCHECK(!write_in_flight_);

  if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));

  // Set all member variables before calling AddWriteRange(): after it succeeds,
  // WriteComplete() may be called concurrently with the remainder of this function.
  file_ = file;
  write_range_.reset(
      new WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
  write_range_->SetData(buffer.data(), buffer.len());
  write_in_flight_ = true;
  Status status = io_ctx->AddWriteRange(write_range_.get());
  if (!status.ok()) {
    // The write will not be in flight if we returned with an error.
    write_in_flight_ = false;
    // We won't return this WriteHandle to the client of FileGroup, so it won't be
    // cancelled in the normal way. Mark the handle as cancelled so it can be
    // cleanly destroyed.
    is_cancelled_ = true;
    return status;
  }
  return Status::OK();
}
| |
// Re-issues a failed write to a different file/offset. Called from
// FileGroup::RecoverWriteError() while 'write_in_flight_' is still true, reusing the
// existing write range (the data was already encrypted by the original Write()).
Status TmpFileMgr::WriteHandle::RetryWrite(
    RequestContext* io_ctx, File* file, int64_t offset) {
  DCHECK(write_in_flight_);
  file_ = file;
  write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());
  Status status = io_ctx->AddWriteRange(write_range_.get());
  if (!status.ok()) {
    // The write will not be in flight if we returned with an error.
    write_in_flight_ = false;
    return status;
  }
  return Status::OK();
}
| |
// Marks the write as finished, wakes WaitForWrite() waiters, and finally invokes the
// client callback. Ordering is delicate because the client may destroy this handle
// as soon as it observes completion.
void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) {
  WriteDoneCallback cb;
  {
    lock_guard<mutex> lock(write_state_lock_);
    DCHECK(write_in_flight_);
    write_in_flight_ = false;
    // Need to extract 'cb_' because once 'write_in_flight_' is false and we release
    // 'write_state_lock_', 'this' may be destroyed.
    cb = move(cb_);

    // Notify before releasing the lock - after the lock is released 'this' may be
    // destroyed.
    write_complete_cv_.NotifyAll();
  }
  // Call 'cb' last - once 'cb' is called client code may call Read() or destroy this
  // handle.
  cb(write_status);
}
| |
| void TmpFileMgr::WriteHandle::Cancel() { |
| CancelRead(); |
| { |
| unique_lock<mutex> lock(write_state_lock_); |
| is_cancelled_ = true; |
| // TODO: in future, if DiskIoMgr supported write cancellation, we could cancel it |
| // here. |
| } |
| } |
| |
| void TmpFileMgr::WriteHandle::CancelRead() { |
| if (read_range_ != nullptr) { |
| read_range_->Cancel(Status::CancelledInternal("TmpFileMgr read")); |
| read_range_ = nullptr; |
| } |
| } |
| |
// Blocks until any in-flight write finishes. WriteComplete() clears
// 'write_in_flight_' and notifies under the same lock.
void TmpFileMgr::WriteHandle::WaitForWrite() {
  unique_lock<mutex> lock(write_state_lock_);
  while (write_in_flight_) write_complete_cv_.Wait(lock);
}
| |
// Encrypts 'buffer' in place with a freshly generated key/IV and, for non-GCM
// cipher modes, computes an integrity hash of the ciphertext for verification at
// read-back time (see CheckHashAndDecrypt()).
Status TmpFileMgr::WriteHandle::EncryptAndHash(MemRange buffer) {
  DCHECK(FLAGS_disk_spill_encryption);
  SCOPED_TIMER(encryption_timer_);
  // Since we're using GCM/CTR/CFB mode, we must take care not to reuse a
  // key/IV pair. Regenerate a new key and IV for every data buffer we write.
  key_.InitializeRandom();
  RETURN_IF_ERROR(key_.Encrypt(buffer.data(), buffer.len(), buffer.data()));

  // GCM mode carries its own integrity check; otherwise hash the ciphertext.
  if (!key_.IsGcmMode()) {
    hash_.Compute(buffer.data(), buffer.len());
  }
  return Status::OK();
}
| |
// Verifies the integrity of 'buffer' (read back from scratch) and decrypts it in
// place, reversing EncryptAndHash(). Both a hash mismatch and a decryption failure
// are reported as SCRATCH_READ_VERIFY_FAILED.
Status TmpFileMgr::WriteHandle::CheckHashAndDecrypt(MemRange buffer) {
  DCHECK(FLAGS_disk_spill_encryption);
  DCHECK(write_range_ != nullptr);
  SCOPED_TIMER(encryption_timer_);

  // GCM mode will verify the integrity by itself
  if (!key_.IsGcmMode()) {
    if (!hash_.Verify(buffer.data(), buffer.len())) {
      return Status(TErrorCode::SCRATCH_READ_VERIFY_FAILED, buffer.len(),
          write_range_->file(), GetBackendString(), write_range_->offset());
    }
  }
  Status decrypt_status = key_.Decrypt(buffer.data(), buffer.len(), buffer.data());
  if (!decrypt_status.ok()) {
    // Treat decryption failing as a verification failure, but include extra info from
    // the decryption status.
    Status result_status(TErrorCode::SCRATCH_READ_VERIFY_FAILED, buffer.len(),
        write_range_->file(), GetBackendString(), write_range_->offset());
    result_status.MergeStatus(decrypt_status);
    return result_status;
  }
  return Status::OK();
}
| |
| string TmpFileMgr::WriteHandle::DebugString() { |
| unique_lock<mutex> lock(write_state_lock_); |
| stringstream ss; |
| ss << "Write handle " << this << " file '" << file_->path() << "'" |
| << " is cancelled " << is_cancelled_ << " write in flight " << write_in_flight_; |
| if (write_range_ != NULL) { |
| ss << " data " << write_range_->data() << " len " << write_range_->len() |
| << " file offset " << write_range_->offset() |
| << " disk id " << write_range_->disk_id(); |
| } |
| return ss.str(); |
| } |
| } // namespace impala |