blob: bd75239cc75d1c6330baffc2f0a710063498d8a9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <boost/thread/shared_mutex.hpp>
#include "common/atomic.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/io/file-writer.h"
#include "runtime/tmp-file-mgr.h"
namespace impala {
/// TmpFile is a handle to a physical file in a temporary directory. File space
/// can be allocated and files removed using AllocateSpace() and Remove(). Used
/// internally by TmpFileMgr.
///
/// Creation of the physical file in the file system is deferred until the file is
/// written by DiskIoMgr.
///
/// Methods of TmpFile are not thread-safe.
class TmpFile {
 public:
  TmpFile(TmpFileGroup* file_group, TmpFileMgr::DeviceId device_id,
      const std::string& path, bool expected_local = true);

  virtual ~TmpFile() = default;

  /// Allocates 'num_bytes' bytes in this file for a new block of data if there is
  /// free capacity in this temporary directory. If there is insufficient capacity,
  /// returns false. Otherwise, updates state and returns true.
  /// This function does not actually perform any file operations.
  /// On success, sets 'offset' to the file offset of the first byte in the allocated
  /// range.
  virtual bool AllocateSpace(int64_t num_bytes, int64_t* offset) = 0;

  /// Called when an IO error is encountered for this file. Logs the error and blacklists
  /// the file. Returns true if the file just became blacklisted.
  bool Blacklist(const ErrorMsg& msg);

  /// Deletes the physical file, including the buffer on disk, if one was created.
  /// It is not valid to read or write to a file after calling Remove().
  virtual Status Remove() = 0;

  /// Gets the disk ID that should be used for IO mgr queueing.
  int AssignDiskQueue(bool is_local_buffer = false) const;

  /// Tries to punch a hole in the file of size 'len' at 'offset'.
  Status PunchHole(int64_t offset, int64_t len);

  /// Returns the exact file handle for writing. For spilling to remote, the default
  /// is to return the local buffer file instead of the remote file.
  virtual io::DiskFile* GetWriteFile() = 0;

  /// Returns the path of the TmpFile.
  /// If it is a remote temporary file, the path should be a remote scratch space path.
  /// Similarly, if it is a local one, the path should be a local path.
  const std::string& path() const { return path_; }

  /// Caller must hold TmpFileMgr::FileGroup::lock_.
  bool is_blacklisted() const { return blacklisted_; }

  /// Returns the current length of the file, i.e. the bytes handed out so far by
  /// AllocateSpace().
  int64_t len() const { return allocation_offset_; }

  /// Returns the disk id of the temporary file.
  int disk_id() const { return disk_id_; }

  /// Returns true if the temporary file is expected to be in the local file system.
  bool is_local() const { return expected_local_; }

  /// Returns the path of the local buffer of the TmpFile.
  const std::string& LocalBuffPath() const { return local_buffer_path_; }

  std::string DebugString();

  /// Helper to get the TmpDir that this file is associated with.
  TmpFileMgr::TmpDir* GetDir();

  /// Helper to get the TmpFileGroup that this file is associated with.
  TmpFileGroup* FileGroup() const { return file_group_; }

  /// Returns the pointer of the disk file (not owned by the caller).
  io::DiskFile* DiskFile() { return disk_file_.get(); }

  /// Returns the type of the disk where the file is located.
  io::DiskFileType disk_type() const { return disk_type_; }

 private:
  // The concrete file kinds below read and write the private state of this class.
  friend class TmpFileMgrTest;
  friend class TmpFileLocal;
  friend class TmpFileRemote;
  friend class TmpFileDummy;
  friend class TmpWriteHandle;

  /// The name of the sub-directory that Impala creates within each configured scratch
  /// directory.
  const static std::string TMP_SUB_DIR_NAME;

  /// Space (in MB) that must ideally be available for writing on a scratch
  /// directory. A warning is issued if available space is less than this threshold.
  const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB;

  /// The TmpFileGroup this belongs to. Cannot be null.
  TmpFileGroup* const file_group_;

  /// Path of the physical file in the filesystem.
  const std::string path_;

  /// The temporary device this file is stored on.
  const TmpFileMgr::DeviceId device_id_;

  /// The id of the disk on which the physical file lies.
  int disk_id_;

  /// True if the file is expected to be in the local file system.
  bool expected_local_;

  /// Total bytes of the file that have been given out by AllocateSpace(). Note that
  /// these bytes may not be actually using space on the filesystem, either because the
  /// data hasn't been written or a hole has been punched. Modified by AllocateSpace().
  int64_t allocation_offset_ = 0;

  /// Bytes reclaimed through hole punching.
  AtomicInt64 bytes_reclaimed_{0};

  /// Set to true to indicate that we shouldn't allocate any more space in this file.
  /// Protected by TmpFileMgr::FileGroup::lock_.
  bool blacklisted_ = false;

  /// The type of the disk where the file is located.
  io::DiskFileType disk_type_;

  /// The file path of the local buffer of the TmpFile.
  std::string local_buffer_path_;

  /// The disk file of the temporary file, which is a handle to manage the status of and
  /// operate on the physical file.
  std::unique_ptr<io::DiskFile> disk_file_;
};
/// TmpFileLocal is a derived class of TmpFile to provide methods to handle a
/// physical file in a temporary directory of local filesystem.
class TmpFileLocal : public TmpFile {
TmpFileLocal(TmpFileGroup* file_group, TmpFileMgr::DeviceId device_id,
const std::string& path, bool expected_local = true);
bool AllocateSpace(int64_t num_bytes, int64_t* offset);
io::DiskFile* GetWriteFile();
Status Remove();
/// TmpFileRemote is a derived class of TmpFile to provide methods to handle a
/// physical file in a temporary directory of remote filesystem.
/// Locking:
/// For remote temporary files, a locking mechanism is applied to guarantee a safe write,
/// read or upload on the file.A remote temporary file can have two DiskFiles, a local
/// buffer and a remote file.
/// Each DiskFile owns two type of locks, a file lock and a status lock.
/// DiskFile::lock_ -- file lock
/// DiskFile::status_lock_ -- status lock
/// For doing file deleting operation, a unique file lock is needed. For other types of
/// operations on the file, like reading or writing, a shared file lock is needed to
/// protect the file from deleting.
/// For status transition, the thread needs to hold the status lock, details can be found
/// in the header of TmpFileRemote.
/// If an operation requires locks from two DiskFiles, such as an upload operation, the
/// sequence of acquiring the lock must be from the local file to the remote file to
/// avoid deadlocks, and file locks need to be acquired before status locks.
/// By default, there are two DiskFiles, local buffer and remote file, which are
/// used to manage the status of the temporary file. For each DiskFile, there are
/// three status, InWriting/Persisted/Deleted, details can be found in the header of
/// DiskFile.
/// Assume that:
/// Local buffer: InWriting A, Persisted B, Deleted C
/// Remote file: InWriting D, Persisted E, Deleted F
/// The normal status transition procedure of a remote temporary file should be:
/// AD ---> BD Local Buffer File Closed
/// BD ---> BE Upload Complete
/// BE ---> CE Local Buffer Evicted
/// Any State ---> CF Temporary File Destoryed
/// Most of the state transitions are done in the DiskIoMgr when an IO operation is done.
/// For reading or changing the state, a status lock of the specific DiskFile should be
/// acquired.
class TmpFileRemote : public TmpFile {
TmpFileRemote(TmpFileGroup* file_group, TmpFileMgr::DeviceId device_id,
const std::string& path, const std::string& local_buffer_path,
bool expected_local = false, const char* url = nullptr);
bool AllocateSpace(int64_t num_bytes, int64_t* offset);
io::DiskFile* GetWriteFile();
TmpFileMgr::TmpDir* GetLocalBufferDir() const;
Status Remove();
/// Returns the size of the file.
int64_t file_size() const { return file_size_; }
/// Returns if the disk buffer file pointer.
io::DiskFile* DiskBufferFile() { return disk_buffer_file_.get(); }
/// Set the at_capacity_ indicator to true.
void SetAtCapacity() {
at_capacity_ = true;
/// Set the file is enqueued or not.
/// The function is thread-safe.
void SetEnqueued(bool is_enqueued) {
std::lock_guard<SpinLock> l(lock_);
DCHECK(is_enqueued != enqueued_);
enqueued_ = is_enqueued;
/// Set the buffer of the file is returned to the pool.
/// The function is thread-safe.
void SetBufferReturned() {
std::lock_guard<SpinLock> l(lock_);
buffer_returned_ = true;
/// Returns if the file is enqueued to the tmp file available pool.
/// The function is thread-safe.
bool is_enqueued() {
std::lock_guard<SpinLock> l(lock_);
return enqueued_;
/// Returns if the buffer is returned to the tmp file available pool.
/// The function is thread-safe.
bool is_buffer_returned() {
std::lock_guard<SpinLock> l(lock_);
return buffer_returned_;
friend class TmpWriteHandle;
friend class TmpFileMgr;
friend class TmpFileGroup;
friend class TmpFileBufferPool;
/// The default file size of the temporary file, but the actual file size can be a
/// little over it if the size of the last page written to the file is over the
/// remaining space.
int64_t file_size_ = 0;
/// Bogus value of mtime for HDFS files.
const int64_t mtime_{100000};
/// The range for doing file uploading.
std::unique_ptr<io::RemoteOperRange> upload_range_;
// The pointer of the disk buffer file, which is the local buffer
// of the disk file when disk file is a remote disk file.
std::unique_ptr<io::DiskFile> disk_buffer_file_;
/// The hdfs connection used to connect to the remote scratch path.
hdfsFS hdfs_conn_;
/// at_capacity_ is set to true if the file can't assign space anymore when the
/// assigned space is equal to or just over the default file size.
bool at_capacity_ = false;
/// Protect below members.
SpinLock lock_;
/// Indicates if the file is enqueued to the pool. For debug use.
bool enqueued_ = false;
/// True if the buffer of the file is returned to the pool. We assume that the buffer
/// only returns once and only needs to be returned when the buffer space is reserved.
bool buffer_returned_ = false;
/// TmpFileDummy is a derived class of TmpFile for dummy allocation, used in
/// TmpFileBufferPool only.
class TmpFileDummy : public TmpFile {
TmpFileDummy() : TmpFile(nullptr, -1, "") { disk_type_ = io::DiskFileType::DUMMY; }
bool AllocateSpace(int64_t num_bytes, int64_t* offset) { return true; }
io::DiskFile* GetWriteFile() { return nullptr; }
Status Remove() { return Status::OK(); }
/// The temporary file buffer pool lets temporary files return their buffers to the
/// pool, where they can be evicted to make room for other files. The pool also provides
/// an async way for write ranges to wait until there is available space to reserve
/// before they are sent to the disk queues for writing.
///
/// Synchronization (per the member comments below): 'tmp_files_avail_pool_lock_'
/// protects the available-files pool, which is waited on through
/// 'tmp_files_available_cv_'; 'lock_' protects the write-range bookkeeping members
/// declared after it. 'work_available_' wakes the space reservation thread
/// (presumably waited on under 'lock_' — confirm in the .cc).
class TmpFileBufferPool {
/// Creates a pool that belongs to 'tmp_file_mgr'.
/// NOTE(review): single-argument constructor; consider marking it 'explicit'.
TmpFileBufferPool(TmpFileMgr* tmp_file_mgr);
/// Loop to reserve space for the ranges that are put into the queue write_ranges_. The
/// space is gained from tmp_files_avail_pool_ by calling DequeueTmpFilesPool(). Once
/// the space is reserved, the write ranges that belong to the same file will be
/// sent to the disk queues to write. If an error happens while sending to the disk
/// queue, the callback function of the write ranges is called with the error status.
void TmpFileSpaceReserveThreadLoop();
/// Enqueues the write ranges to wait for buffer space reservation. All ranges of the
/// same file are put into the map write_ranges_to_add_ to wait until the
/// reservation is done, and are sent to the disk queue after that. Specifically, the
/// first write range of a file (offset is 0) is used to do the reservation.
Status EnqueueWriteRange(io::WriteRange* range, TmpFile* file);
/// Removes all enqueued write ranges belonging to "io_ctx". Called when the
/// TmpFileGroup that "io_ctx" belongs to is closing.
void RemoveWriteRanges(io::RequestContext* io_ctx);
/// Enqueues the temporary file whose buffer file is available to be evicted.
/// 'front' selects which end of the pool the file is inserted at.
void EnqueueTmpFilesPool(std::shared_ptr<TmpFile>& tmp_file, bool front);
/// Dequeues a temporary file whose buffer is supposed to be available for eviction
/// from the available pool, to make room for other files' buffers.
Status DequeueTmpFilesPool(std::shared_ptr<TmpFile>* tmp_file, bool quick_return);
friend class TmpFileMgr;
friend class TmpFileMgrTest;
/// The TmpFileMgr the TmpFileBufferPool belongs to.
TmpFileMgr* tmp_file_mgr_ = nullptr;
/// A pool that stores the pointers of remote temporary files whose buffers are
/// available to be evicted.
/// The file is enqueued when:
/// 1. The file is uploaded to the remote directory.
/// 2. The file is deleted without uploading.
/// The file is enqueued by calling EnqueueTmpFilesPool(), and dequeued by
/// DequeueTmpFilesPool().
std::list<std::shared_ptr<TmpFile>> tmp_files_avail_pool_;
/// Condition variable for the tmp files available pool to wait for available
/// tmp files to be evicted. Wait() in DequeueTmpFilesPool() and NotifyOne()
/// in EnqueueTmpFilesPool().
ConditionVariable tmp_files_available_cv_;
/// Protects the tmp files available pool members.
std::mutex tmp_files_avail_pool_lock_;
/// Lock that protects the members below.
std::mutex lock_;
/// Condition variable to signal the space reservation thread that there is work to do
/// or the thread should shut down. The space reservation thread is woken up when
/// a write range is added to the queue write_ranges_.
ConditionVariable work_available_;
/// The write ranges waiting for the buffer space reservation.
/// The key is the pointer of the DiskFile that the ranges are writing into.
std::unordered_map<io::DiskFile*, std::vector<io::WriteRange*>> write_ranges_to_add_;
/// Records the relationship between io_ctx and disk files. Entries are inserted when
/// write ranges are added to wait for the reservation, and removed when the request
/// context is destroyed as the TmpFileGroup is closing.
std::unordered_map<io::RequestContext*, std::unordered_set<io::DiskFile*>>
// NOTE(review): the declarator name for the map above is not visible in this view;
// the declaration appears truncated — confirm against the original source.
/// The write ranges waiting for the buffer space reservation (only the first range of
/// a file).
std::list<io::WriteRange*> write_ranges_;
/// An index to bind the write range to its iterator in write_ranges_ and its TmpFile.
// NOTE(review): the line below looks like the tail of a multi-line declaration whose
// head (container type and key, presumably io::WriteRange*) and declarator name are not
// visible in this view — confirm against the original source.
std::pair<std::list<io::WriteRange*>::const_iterator, TmpFile*>>
/// The current write range that is waiting for the reservation.
io::WriteRange* cur_write_range_ = nullptr;
/// The temporary file that the current write range waiting for reservation is
/// associated with.
std::shared_ptr<TmpFile> cur_tmp_file_;
/// Dummy TmpFile for the case that needs to return the space back to the pool without
/// having a TmpFile.
std::shared_ptr<TmpFile> tmp_file_dummy_;
/// True if the TmpFileMgr is being destroyed.
bool shut_down_ = false;
/// A timer metric for recording the dequeue waiting time.
HistogramMetric* dequeue_timer_metric_ = nullptr;
/// A helper function to add or remove all of the write ranges related to the same disk
/// file in the pool.
/// When the function is called after the reservation of a buffer file is done, it adds
/// all of the ranges of the file to the disk queue. If any add fails, the writeback
/// callbacks of all other ranges are put into the vector "write_callbacks" for
/// the caller to invoke with an error status. All ranges belonging to the file are
/// removed from the pool.
/// When the function is called because the writing tasks are cancelled (which happens
/// when the TmpFileGroup is closing), is_cancelled is set to true and the function
/// removes all the ranges belonging to the specific disk_file from the pool.
/// Caller should hold the lock_.
Status MoveWriteRangesHelper(io::DiskFile* disk_file,
std::vector<TmpFileMgr::WriteDoneCallback>* write_callbacks, bool is_cancelled);
/// Internal function for RemoveWriteRanges(); writeback callbacks to be invoked by
/// the caller are collected into "write_callbacks".
void RemoveWriteRangesInternal(io::RequestContext* io_ctx,
std::vector<TmpFileMgr::WriteDoneCallback>* write_callbacks);
} // namespace impala