blob: 70ec1d451db192300081a38640fe598d2f374dec [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <functional>
#include <memory>
#include <utility>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include "common/object-pool.h"
#include "common/status.h"
#include "gen-cpp/Types_types.h" // for TUniqueId
#include "util/condition-variable.h"
#include "util/mem-range.h"
#include "util/metrics-fwd.h"
#include "util/openssl-util.h"
#include "util/runtime-profile.h"
#include "util/spinlock.h"
namespace impala {
namespace io {
class DiskIoMgr;
class RequestContext;
class ScanRange;
class WriteRange;
/// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
/// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
/// directories across multiple devices, configured via the --scratch_dirs option.
/// TmpFileMgr manages I/O to scratch files in order to abstract away details of which
/// files are allocated and recovery from certain I/O errors. I/O is done via DiskIoMgr.
/// TmpFileMgr encrypts data written to disk if enabled by the --disk_spill_encryption
/// command-line flag.
/// FileGroups manage scratch space across multiple devices. To write to scratch space,
/// first a FileGroup is created, then FileGroup::Write() is called to asynchronously
/// write a memory buffer to one of the scratch files. FileGroup::Write() returns a
/// WriteHandle, which is used by the caller to identify that write operation. The
/// caller is notified when the asynchronous write completes via a callback, after which
/// the caller can use the WriteHandle to read back the data.
/// Each WriteHandle is backed by a range of data in a scratch file. The first call to
/// Write() will create files for the FileGroup with unique filenames on the configured
/// temporary devices. At most one directory per device is used (unless overridden for
/// testing). The file range of a WriteHandle can be replaced with a different one if
/// a write error is encountered and the data instead needs to be written to a different
/// disk.
/// Free Space Management:
/// Free space is managed within a FileGroup: once a WriteHandle is destroyed, the file
/// range backing it can be recycled for a different WriteHandle. Scratch file ranges
/// are grouped into size classes, each for a power-of-two number of bytes. Free file
/// ranges of each size class are managed separately (i.e. there is no splitting or
/// coalescing of ranges).
/// Resource Management:
/// TmpFileMgr provides some basic support for managing local disk space consumption.
/// A FileGroup can be created with a limit on the total number of bytes allocated across
/// all files. Writes that would exceed the limit fail with an error status.
/// TODO: IMPALA-4683: we could implement smarter handling of failures, e.g. to
/// temporarily blacklist devices that show I/O errors.
class TmpFileMgr {
class File; // Needs to be public for TmpFileMgrTest.
class WriteHandle;
/// DeviceId is an internal unique identifier for a temporary device managed by
/// TmpFileMgr. DeviceIds in the range [0, num tmp devices) are allocated arbitrarily.
/// Needs to be public for TmpFileMgrTest.
typedef int DeviceId;
/// Same typedef as io::WriteRange::WriteDoneCallback.
typedef std::function<void(const Status&)> WriteDoneCallback;
/// A configured temporary directory that TmpFileMgr allocates files in.
struct TmpDir {
TmpDir(const std::string& path, int64_t bytes_limit, IntGauge* bytes_used_metric)
: path(path), bytes_limit(bytes_limit), bytes_used_metric(bytes_used_metric) {}
/// Path to the temporary directory.
const std::string path;
/// Limit on bytes that should be written to this path. Set to maximum value
/// of int64_t if there is no limit.
int64_t const bytes_limit;
/// The current bytes of scratch used for this temporary directory.
IntGauge* const bytes_used_metric;
/// Represents a group of temporary files - one per disk with a scratch directory. The
/// total allocated bytes of the group can be bound by setting the space allocation
/// limit. The owner of the FileGroup object is responsible for calling the Close()
/// method to delete all the files in the group.
/// Public methods of FileGroup and WriteHandle are safe to call concurrently from
/// multiple threads as long as different WriteHandle arguments are provided.
class FileGroup {
/// Initialize a new file group, which will create files using 'tmp_file_mgr'
/// and perform I/O using 'io_mgr'. Adds counters to 'profile' to track scratch
/// space used. 'unique_id' is a unique ID that is used to prefix any scratch file
/// names. It is an error to create multiple FileGroups with the same 'unique_id'.
/// 'bytes_limit' is the limit on the total file space to allocate.
FileGroup(TmpFileMgr* tmp_file_mgr, io::DiskIoMgr* io_mgr, RuntimeProfile* profile,
const TUniqueId& unique_id, int64_t bytes_limit = -1);
/// Asynchronously writes 'buffer' to a temporary file of this file group. If there
/// are multiple scratch files, this can write to any of them, and will attempt to
/// recover from I/O errors on one file by writing to a different file. The memory
/// referenced by 'buffer' must remain valid until the write completes. The callee
/// may rewrite the data in 'buffer' in-place (e.g. to do in-place encryption or
/// compression). The caller should not modify the data in 'buffer' until the write
/// completes or is cancelled, otherwise invalid data may be written to disk.
/// Returns an error if the scratch space cannot be allocated or the write cannot
/// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from
/// a different thread when the write completes successfully or unsuccessfully or is
/// cancelled.
/// 'handle' must be destroyed by passing the DestroyWriteHandle() or RestoreData().
Status Write(MemRange buffer, WriteDoneCallback cb,
std::unique_ptr<WriteHandle>* handle) WARN_UNUSED_RESULT;
/// Synchronously read the data referenced by 'handle' from the temporary file into
/// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
/// after a write successfully completes. Should not be called while an async read
/// is in flight. Equivalent to calling ReadAsync() then WaitForAsyncRead().
Status Read(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
/// Asynchronously read the data referenced by 'handle' from the temporary file into
/// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
/// after a write successfully completes. WaitForAsyncRead() must be called before the
/// data in the buffer is valid. Should not be called while an async read
/// is already in flight.
Status ReadAsync(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
/// Wait until the read started for 'handle' by ReadAsync() completes. 'buffer'
/// should be the same buffer passed into ReadAsync(). Returns an error if the
/// read fails. Retrying a failed read by calling ReadAsync() again is allowed.
Status WaitForAsyncRead(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
/// Restore the original data in the 'buffer' passed to Write(), decrypting or
/// decompressing as necessary. Returns an error if restoring the data fails.
/// The write must not be in-flight - the caller is responsible for waiting for
/// the write to complete.
Status RestoreData(
std::unique_ptr<WriteHandle> handle, MemRange buffer) WARN_UNUSED_RESULT;
/// Wait for the in-flight I/Os to complete and destroy resources associated with
/// 'handle'.
void DestroyWriteHandle(std::unique_ptr<WriteHandle> handle);
/// Calls Remove() on all the files in the group and deletes them.
void Close();
std::string DebugString();
const TUniqueId& unique_id() const { return unique_id_; }
TmpFileMgr* tmp_file_mgr() const { return tmp_file_mgr_; }
friend class File;
friend class TmpFileMgrTest;
/// Initializes the file group with one temporary file per disk with a scratch
/// directory. Returns OK if at least one temporary file could be created.
/// Returns an error if no temporary files were successfully created. Must only be
/// called once. Must be called with 'lock_' held.
Status CreateFiles() WARN_UNUSED_RESULT;
/// Allocate 'num_bytes' bytes in a temporary file. Try multiple disks if error
/// occurs. Returns an error only if no temporary files are usable or the scratch
/// limit is exceeded. Must be called without 'lock_' held.
Status AllocateSpace(
int64_t num_bytes, File** tmp_file, int64_t* file_offset) WARN_UNUSED_RESULT;
/// Add the scratch range from 'handle' to 'free_ranges_' and destroy handle. Must be
/// called without 'lock_' held.
void RecycleFileRange(std::unique_ptr<WriteHandle> handle);
/// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt
/// to retry the write. On success or if the write can't be retried, calls
/// handle->WriteComplete().
void WriteComplete(WriteHandle* handle, const Status& write_status);
/// Handles a write error. Logs the write error and blacklists the device for this
/// file group if the cause was an I/O error. Blacklisting limits the number of times
/// a write is retried because each device will only be tried once. Returns OK if it
/// successfully reissued the write. Returns an error status if the original error
/// was unrecoverable or an unrecoverable error is encountered when reissuing the
/// write. The error status will include all previous I/O errors in its details.
Status RecoverWriteError(
WriteHandle* handle, const Status& write_status) WARN_UNUSED_RESULT;
/// Return a SCRATCH_ALLOCATION_FAILED error with the appropriate information,
/// including scratch directories, the amount of scratch allocated and previous
/// errors that caused this failure. If some directories were at capacity,
/// but had not encountered an error, the indices of these directories in
/// tmp_file_mgr_->tmp_dir_ should be included in 'at_capacity_dirs'.
/// 'lock_' must be held by caller.
Status ScratchAllocationFailedStatus(const std::vector<int>& at_capacity_dirs);
/// The TmpFileMgr it is associated with.
TmpFileMgr* const tmp_file_mgr_;
/// DiskIoMgr used for all I/O to temporary files.
io::DiskIoMgr* const io_mgr_;
/// I/O context used for all reads and writes. Registered in constructor.
std::unique_ptr<io::RequestContext> io_ctx_;
/// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be
/// touched by DiskIoMgr even after the scan is finished.
/// TODO: IMPALA-4249: remove once lifetime of ScanRange objects is better defined.
ObjectPool scan_range_pool_;
/// Unique across all FileGroups. Used to prefix file names.
const TUniqueId unique_id_;
/// Max write space allowed (-1 means no limit).
const int64_t bytes_limit_;
/// Number of write operations (includes writes started but not yet complete).
RuntimeProfile::Counter* const write_counter_;
/// Number of bytes written to disk (includes writes started but not yet complete).
RuntimeProfile::Counter* const bytes_written_counter_;
/// Number of read operations (includes reads started but not yet complete).
RuntimeProfile::Counter* const read_counter_;
/// Number of bytes read from disk (includes reads started but not yet complete).
RuntimeProfile::Counter* const bytes_read_counter_;
/// Amount of scratch space allocated in bytes.
RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;
/// Time spent waiting for disk reads.
RuntimeProfile::Counter* const disk_read_timer_;
/// Time spent in disk spill encryption, decryption, and integrity checking.
RuntimeProfile::Counter* encryption_timer_;
/// Protects below members.
SpinLock lock_;
/// List of files representing the FileGroup.
std::vector<std::unique_ptr<File>> tmp_files_;
/// Total space allocated in this group's files.
int64_t current_bytes_allocated_;
/// Index into 'tmp_files' denoting the file to which the next temporary file range
/// should be allocated from. Used to implement round-robin allocation from temporary
/// files.
int next_allocation_index_;
/// Each vector in free_ranges_[i] is a vector of File/offset pairs for free scratch
/// ranges of length 2^i bytes. Has 64 entries so that every int64_t length has a
/// valid list associated with it.
std::vector<std::vector<std::pair<File*, int64_t>>> free_ranges_;
/// Errors encountered when creating/writing scratch files. We store the history so
/// that we can report the original cause of the scratch errors if we run out of
/// devices to write to.
std::vector<Status> scratch_errors_;
/// A handle to a write operation, backed by a range of a temporary file. The operation
/// is either in-flight or has completed. If it completed with no error and wasn't
/// cancelled then the data is in the file and can be read back.
/// WriteHandle is returned from FileGroup::Write(). After the write completes, the
/// handle can be passed to FileGroup::Read() to read back the data zero or more times.
/// FileGroup::DestroyWriteHandle() can be called at any time to destroy the handle and
/// allow reuse of the scratch file range written to. Alternatively,
/// FileGroup::RestoreData() can be called to reverse the effects of FileGroup::Write()
/// by destroying the handle and restoring the original data to the buffer, so long as
/// the data in the buffer was not modified by the caller.
/// Public methods of WriteHandle are safe to call concurrently from multiple threads.
class WriteHandle {
/// The write must be destroyed by passing it to FileGroup - destroying it before
/// the write completes is an error.
/// Cancel any in-flight read synchronously.
void CancelRead();
/// Path of temporary file backing the block. Intended for use in testing.
/// Returns empty string if no backing file allocated.
std::string TmpFilePath() const;
/// The length of the write range in bytes.
int64_t len() const;
std::string DebugString();
friend class FileGroup;
friend class TmpFileMgrTest;
WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb);
/// Starts a write of 'buffer' to 'offset' of 'file'. 'write_in_flight_' must be false
/// before calling. After returning, 'write_in_flight_' is true on success or false on
/// failure and 'is_cancelled_' is set to true on failure.
Status Write(io::RequestContext* io_ctx, File* file,
int64_t offset, MemRange buffer,
WriteDoneCallback callback) WARN_UNUSED_RESULT;
/// Retry the write after the initial write failed with an error, instead writing to
/// 'offset' of 'file'. 'write_in_flight_' must be true before calling.
/// After returning, 'write_in_flight_' is true on success or false on failure.
Status RetryWrite(io::RequestContext* io_ctx, File* file,
int64_t offset) WARN_UNUSED_RESULT;
/// Called when the write has completed successfully or not. Sets 'write_in_flight_'
/// then calls 'cb_'.
void WriteComplete(const Status& write_status);
/// Cancels any in-flight writes or reads. Reads are cancelled synchronously and
/// writes are cancelled asynchronously. After Cancel() is called, writes are not
/// retried. The write callback may be called with a CANCELLED_INTERNALLY status
/// (unless it succeeded or encountered a different error first).
void Cancel();
/// Blocks until the write completes either successfully or unsuccessfully.
/// May return before the write callback has been called.
void WaitForWrite();
/// Encrypts the data in 'buffer' in-place and computes 'hash_'.
Status EncryptAndHash(MemRange buffer) WARN_UNUSED_RESULT;
/// Verifies the integrity hash and decrypts the contents of 'buffer' in place.
Status CheckHashAndDecrypt(MemRange buffer) WARN_UNUSED_RESULT;
/// Callback to be called when the write completes.
WriteDoneCallback cb_;
/// Reference to the FileGroup's 'encryption_timer_'.
RuntimeProfile::Counter* encryption_timer_;
/// The DiskIoMgr write range for this write.
boost::scoped_ptr<io::WriteRange> write_range_;
/// The temporary file being written to.
File* file_;
/// If --disk_spill_encryption is on, a AES 256-bit key and initialization vector.
/// Regenerated for each write.
EncryptionKey key_;
/// If --disk_spill_encryption is on, our hash of the data being written. Filled in
/// on writes; verified on reads. This is calculated _after_ encryption.
IntegrityHash hash_;
/// The scan range for the read that is currently in flight. NULL when no read is in
/// flight.
io::ScanRange* read_range_;
/// Protects all fields below while 'write_in_flight_' is true. At other times, it is
/// invalid to call WriteRange/FileGroup methods concurrently from multiple threads,
/// so no locking is required. This is a terminal lock and should not be held while
/// acquiring other locks or invoking 'cb_'.
boost::mutex write_state_lock_;
/// True if the the write has been cancelled (but is not necessarily complete).
bool is_cancelled_;
/// True if a write is in flight.
bool write_in_flight_;
/// Signalled when the write completes and 'write_in_flight_' becomes false, before
/// 'cb_' is invoked.
ConditionVariable write_complete_cv_;
/// Creates the configured tmp directories. If multiple directories are specified per
/// disk, only one is created and used. Must be called after DiskInfo::Init().
Status Init(MetricGroup* metrics) WARN_UNUSED_RESULT;
/// Custom initialization - initializes with the provided list of directories.
/// If one_dir_per_device is true, only use one temporary directory per device.
/// This interface is intended for testing purposes. 'tmp_dir_specifiers'
/// use the command-line syntax, i.e. <path>[:<limit>]. The first variant takes
/// a comma-separated list, the second takes a vector.
Status InitCustom(const std::string& tmp_dirs_spec, bool one_dir_per_device,
MetricGroup* metrics) WARN_UNUSED_RESULT;
Status InitCustom(const std::vector<std::string>& tmp_dir_specifiers,
bool one_dir_per_device, MetricGroup* metrics) WARN_UNUSED_RESULT;
/// Return the scratch directory path for the device.
std::string GetTmpDirPath(DeviceId device_id) const;
/// Total number of devices with tmp directories that are active. There is one tmp
/// directory per device.
int NumActiveTmpDevices();
/// Return vector with device ids of all tmp devices being actively used.
/// I.e. those that haven't been blacklisted.
std::vector<DeviceId> ActiveTmpDevices();
friend class TmpFileMgrTest;
/// Return a new File handle with a path based on file_group->unique_id. The file is
/// associated with the 'file_group' and the file path is within the (single) scratch
/// directory on the specified device id. The caller owns the returned handle and is
/// responsible for deleting it. The file is not created - creation is deferred until
/// the file is written.
void NewFile(FileGroup* file_group, DeviceId device_id,
std::unique_ptr<File>* new_file);
bool initialized_;
/// The paths of the created tmp directories.
std::vector<TmpDir> tmp_dirs_;
/// Metrics to track active scratch directories.
IntGauge* num_active_scratch_dirs_metric_;
SetMetric<std::string>* active_scratch_dirs_metric_;
/// Metrics to track the scratch space HWM.
AtomicHighWaterMarkGauge* scratch_bytes_used_metric_;