| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "util/file_reader_writer.h" |
| |
| #include <algorithm> |
| #include <mutex> |
| |
| #include "monitoring/histogram.h" |
| #include "monitoring/iostats_context_imp.h" |
| #include "port/port.h" |
| #include "util/random.h" |
| #include "util/rate_limiter.h" |
| #include "util/sync_point.h" |
| |
| namespace rocksdb { |
| |
| #ifndef NDEBUG |
| namespace { |
| bool IsFileSectorAligned(const size_t off, size_t sector_size) { |
| return off % sector_size == 0; |
| } |
| } |
| #endif |
| |
| Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { |
| Status s; |
| if (use_direct_io()) { |
| #ifndef ROCKSDB_LITE |
| size_t offset = offset_.fetch_add(n); |
| size_t alignment = file_->GetRequiredBufferAlignment(); |
| size_t aligned_offset = TruncateToPageBoundary(alignment, offset); |
| size_t offset_advance = offset - aligned_offset; |
| size_t size = Roundup(offset + n, alignment) - aligned_offset; |
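    // For example, with alignment = 4096, offset = 5000 and n = 100:
    // aligned_offset = 4096, offset_advance = 904 and size = 4096, i.e. we
    // read one whole aligned page and copy the 100 requested bytes out of
    // its middle below.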
| size_t r = 0; |
| AlignedBuffer buf; |
| buf.Alignment(alignment); |
| buf.AllocateNewBuffer(size); |
| Slice tmp; |
| s = file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart()); |
| if (s.ok() && offset_advance < tmp.size()) { |
| buf.Size(tmp.size()); |
| r = buf.Read(scratch, offset_advance, |
| std::min(tmp.size() - offset_advance, n)); |
| } |
| *result = Slice(scratch, r); |
| #endif // !ROCKSDB_LITE |
| } else { |
| s = file_->Read(n, result, scratch); |
| } |
| IOSTATS_ADD(bytes_read, result->size()); |
| return s; |
| } |
| |
| |
| Status SequentialFileReader::Skip(uint64_t n) { |
| #ifndef ROCKSDB_LITE |
| if (use_direct_io()) { |
| offset_ += n; |
| return Status::OK(); |
| } |
| #endif // !ROCKSDB_LITE |
| return file_->Skip(n); |
| } |
| |
| Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, |
| char* scratch) const { |
| Status s; |
| uint64_t elapsed = 0; |
| { |
| StopWatch sw(env_, stats_, hist_type_, |
| (stats_ != nullptr) ? &elapsed : nullptr); |
| IOSTATS_TIMER_GUARD(read_nanos); |
| if (use_direct_io()) { |
| #ifndef ROCKSDB_LITE |
| size_t alignment = file_->GetRequiredBufferAlignment(); |
| size_t aligned_offset = TruncateToPageBoundary(alignment, offset); |
| size_t offset_advance = offset - aligned_offset; |
| size_t read_size = Roundup(offset + n, alignment) - aligned_offset; |
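      // Same alignment arithmetic as in SequentialFileReader::Read(): e.g.
      // with alignment = 4096, offset = 8000 and n = 1000, aligned_offset =
      // 4096, offset_advance = 3904 and read_size = 8192, i.e. the two whole
      // pages covering [8000, 9000).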
| AlignedBuffer buf; |
| buf.Alignment(alignment); |
| buf.AllocateNewBuffer(read_size); |
| while (buf.CurrentSize() < read_size) { |
| size_t allowed; |
| if (rate_limiter_ != nullptr) { |
| allowed = rate_limiter_->RequestToken( |
| buf.Capacity() - buf.CurrentSize(), buf.Alignment(), |
| Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); |
| } else { |
| assert(buf.CurrentSize() == 0); |
| allowed = read_size; |
| } |
| Slice tmp; |
| s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp, |
| buf.Destination()); |
| buf.Size(buf.CurrentSize() + tmp.size()); |
| if (!s.ok() || tmp.size() < allowed) { |
| break; |
| } |
| } |
| size_t res_len = 0; |
| if (s.ok() && offset_advance < buf.CurrentSize()) { |
| res_len = buf.Read(scratch, offset_advance, |
| std::min(buf.CurrentSize() - offset_advance, n)); |
| } |
| *result = Slice(scratch, res_len); |
| #endif // !ROCKSDB_LITE |
| } else { |
| size_t pos = 0; |
| const char* res_scratch = nullptr; |
| while (pos < n) { |
| size_t allowed; |
| if (for_compaction_ && rate_limiter_ != nullptr) { |
| allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */, |
| Env::IOPriority::IO_LOW, stats_, |
| RateLimiter::OpType::kRead); |
| } else { |
| allowed = n; |
| } |
| Slice tmp_result; |
| s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos); |
| if (res_scratch == nullptr) { |
| // we can't simply use `scratch` because reads of mmap'd files return |
| // data in a different buffer. |
| res_scratch = tmp_result.data(); |
| } else { |
| // make sure chunks are inserted contiguously into `res_scratch`. |
| assert(tmp_result.data() == res_scratch + pos); |
| } |
| pos += tmp_result.size(); |
| if (!s.ok() || tmp_result.size() < allowed) { |
| break; |
| } |
| } |
| *result = Slice(res_scratch, s.ok() ? pos : 0); |
| } |
| IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); |
| } |
| if (stats_ != nullptr && file_read_hist_ != nullptr) { |
| file_read_hist_->Add(elapsed); |
| } |
| return s; |
| } |
| |
| Status WritableFileWriter::Append(const Slice& data) { |
| const char* src = data.data(); |
| size_t left = data.size(); |
| Status s; |
| pending_sync_ = true; |
| |
| TEST_KILL_RANDOM("WritableFileWriter::Append:0", |
| rocksdb_kill_odds * REDUCE_ODDS2); |
| |
| { |
| IOSTATS_TIMER_GUARD(prepare_write_nanos); |
| TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); |
| writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left); |
| } |
| |
| // See whether we need to enlarge the buffer to avoid the flush |
| if (buf_.Capacity() - buf_.CurrentSize() < left) { |
| for (size_t cap = buf_.Capacity(); |
| cap < max_buffer_size_; // There is still room to increase |
| cap *= 2) { |
| // See whether the next available size is large enough. |
| // Buffer will never be increased to more than max_buffer_size_. |
| size_t desired_capacity = std::min(cap * 2, max_buffer_size_); |
| if (desired_capacity - buf_.CurrentSize() >= left || |
| (use_direct_io() && desired_capacity == max_buffer_size_)) { |
| buf_.AllocateNewBuffer(desired_capacity, true); |
| break; |
| } |
| } |
| } |
| |
  // Flush only when using buffered I/O
| if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) { |
| if (buf_.CurrentSize() > 0) { |
| s = Flush(); |
| if (!s.ok()) { |
| return s; |
| } |
| } |
| assert(buf_.CurrentSize() == 0); |
| } |
| |
  // With direct I/O we never write directly to disk; data always goes
  // through the buffer. Otherwise we use the buffer for its original
  // purpose: to accumulate many small chunks before a larger write.
| if (use_direct_io() || (buf_.Capacity() >= left)) { |
| while (left > 0) { |
| size_t appended = buf_.Append(src, left); |
| left -= appended; |
| src += appended; |
| |
| if (left > 0) { |
| s = Flush(); |
| if (!s.ok()) { |
| break; |
| } |
| } |
| } |
| } else { |
| // Writing directly to file bypassing the buffer |
| assert(buf_.CurrentSize() == 0); |
| s = WriteBuffered(src, left); |
| } |
| |
| TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds); |
| if (s.ok()) { |
| filesize_ += data.size(); |
| } |
| return s; |
| } |
| |
| Status WritableFileWriter::Close() { |
| |
  // Do not quit immediately on failure; the file MUST be closed.
  Status s;

  // It is now possible to close the file twice, since we MUST close it in
  // the destructor and simply flushing is not enough: on Windows
  // pre-allocation does not fill the file with zeros, and with unbuffered
  // (direct) access we also need to set the end of data.
| if (!writable_file_) { |
| return s; |
| } |
| |
| s = Flush(); // flush cache to OS |
| |
| Status interim; |
| // In direct I/O mode we write whole pages so |
| // we need to let the file know where data ends. |
| if (use_direct_io()) { |
| interim = writable_file_->Truncate(filesize_); |
| if (!interim.ok() && s.ok()) { |
| s = interim; |
| } |
| } |
| |
| TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds); |
| interim = writable_file_->Close(); |
| if (!interim.ok() && s.ok()) { |
| s = interim; |
| } |
| |
| writable_file_.reset(); |
| TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds); |
| |
| return s; |
| } |
| |
// Write out the cached data to the OS cache, or to storage if direct I/O
// is enabled.
| Status WritableFileWriter::Flush() { |
| Status s; |
| TEST_KILL_RANDOM("WritableFileWriter::Flush:0", |
| rocksdb_kill_odds * REDUCE_ODDS2); |
| |
| if (buf_.CurrentSize() > 0) { |
| if (use_direct_io()) { |
| #ifndef ROCKSDB_LITE |
| s = WriteDirect(); |
| #endif // !ROCKSDB_LITE |
| } else { |
| s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); |
| } |
| if (!s.ok()) { |
| return s; |
| } |
| } |
| |
| s = writable_file_->Flush(); |
| |
| if (!s.ok()) { |
| return s; |
| } |
| |
  // Sync the OS cache to disk every bytes_per_sync_ bytes.
  // TODO: give log files and sst files different options (log
  // files could potentially be cached in the OS for their whole
  // lifetime, so we might not want to flush them at all).

  // We avoid syncing the last 1MB of data, for two reasons:
  // (1) to avoid rewriting a page that is modified later;
  // (2) on older OS versions, the write can block while writing out
  //     the page.
  // XFS does neighbor-page flushing outside of the specified ranges, so we
  // need to make sure the sync range is far from the write offset.
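  // For example, with bytes_per_sync_ = 1MB, last_sync_size_ = 2MB and
  // filesize_ = 5MB, offset_sync_to becomes 4MB and we RangeSync() the 2MB
  // span [2MB, 4MB), leaving the most recent 1MB unsynced.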
| if (!use_direct_io() && bytes_per_sync_) { |
    const uint64_t kBytesNotSyncRange = 1024 * 1024;  // recent 1MB not synced
    const uint64_t kBytesAlignWhenSync = 4 * 1024;    // align to 4KB
| if (filesize_ > kBytesNotSyncRange) { |
| uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; |
| offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; |
| assert(offset_sync_to >= last_sync_size_); |
| if (offset_sync_to > 0 && |
| offset_sync_to - last_sync_size_ >= bytes_per_sync_) { |
| s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); |
| last_sync_size_ = offset_sync_to; |
| } |
| } |
| } |
| |
| return s; |
| } |
| |
| Status WritableFileWriter::Sync(bool use_fsync) { |
| Status s = Flush(); |
| if (!s.ok()) { |
| return s; |
| } |
| TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds); |
| if (!use_direct_io() && pending_sync_) { |
| s = SyncInternal(use_fsync); |
| if (!s.ok()) { |
| return s; |
| } |
| } |
| TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds); |
| pending_sync_ = false; |
| return Status::OK(); |
| } |
| |
| Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) { |
| if (!writable_file_->IsSyncThreadSafe()) { |
| return Status::NotSupported( |
| "Can't WritableFileWriter::SyncWithoutFlush() because " |
| "WritableFile::IsSyncThreadSafe() is false"); |
| } |
| TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); |
| Status s = SyncInternal(use_fsync); |
| TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); |
| return s; |
| } |
| |
| Status WritableFileWriter::SyncInternal(bool use_fsync) { |
| Status s; |
| IOSTATS_TIMER_GUARD(fsync_nanos); |
| TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); |
| if (use_fsync) { |
| s = writable_file_->Fsync(); |
| } else { |
| s = writable_file_->Sync(); |
| } |
| return s; |
| } |
| |
| Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { |
| IOSTATS_TIMER_GUARD(range_sync_nanos); |
| TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); |
| return writable_file_->RangeSync(offset, nbytes); |
| } |
| |
// This method writes the specified data to disk, making use of the rate
// limiter if one is available.
| Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { |
| Status s; |
| assert(!use_direct_io()); |
| const char* src = data; |
| size_t left = size; |
| |
| while (left > 0) { |
| size_t allowed; |
| if (rate_limiter_ != nullptr) { |
| allowed = rate_limiter_->RequestToken( |
| left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_, |
| RateLimiter::OpType::kWrite); |
| } else { |
| allowed = left; |
| } |
| |
| { |
| IOSTATS_TIMER_GUARD(write_nanos); |
| TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); |
| s = writable_file_->Append(Slice(src, allowed)); |
| if (!s.ok()) { |
| return s; |
| } |
| } |
| |
| IOSTATS_ADD(bytes_written, allowed); |
| TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds); |
| |
| left -= allowed; |
| src += allowed; |
| } |
| buf_.Size(0); |
| return s; |
| } |
| |
| |
// This flushes the accumulated data in the buffer. We pad the data with
// zeros to a whole page if necessary; during automatic flushes padding is
// not necessary. We always use the RateLimiter if it is available. Any
// buffer bytes left over beyond a whole number of pages are moved (refit)
// to be written again on the next flush, because we can only write at
// aligned offsets.
| #ifndef ROCKSDB_LITE |
| Status WritableFileWriter::WriteDirect() { |
| assert(use_direct_io()); |
| Status s; |
| const size_t alignment = buf_.Alignment(); |
| assert((next_write_offset_ % alignment) == 0); |
| |
| // Calculate whole page final file advance if all writes succeed |
| size_t file_advance = |
| TruncateToPageBoundary(alignment, buf_.CurrentSize()); |
| |
  // Calculate the leftover tail: we write it here padded with zeros, BUT we
  // will write it again in the future, either on Close() OR when the
  // current whole page fills out.
| size_t leftover_tail = buf_.CurrentSize() - file_advance; |
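  // For example, with alignment = 4096 and CurrentSize() = 10000:
  // file_advance = 8192 and leftover_tail = 1808; after the padding below
  // the buffer holds 12288 bytes, and the 1808 tail bytes are written now
  // (zero-padded) and rewritten on the next flush.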
| |
| // Round up and pad |
| buf_.PadToAlignmentWith(0); |
| |
| const char* src = buf_.BufferStart(); |
| uint64_t write_offset = next_write_offset_; |
| size_t left = buf_.CurrentSize(); |
| |
| while (left > 0) { |
| // Check how much is allowed |
| size_t size; |
| if (rate_limiter_ != nullptr) { |
| size = rate_limiter_->RequestToken(left, buf_.Alignment(), |
| writable_file_->GetIOPriority(), |
| stats_, RateLimiter::OpType::kWrite); |
| } else { |
| size = left; |
| } |
| |
| { |
| IOSTATS_TIMER_GUARD(write_nanos); |
| TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); |
| // direct writes must be positional |
| s = writable_file_->PositionedAppend(Slice(src, size), write_offset); |
| if (!s.ok()) { |
| buf_.Size(file_advance + leftover_tail); |
| return s; |
| } |
| } |
| |
| IOSTATS_ADD(bytes_written, size); |
| left -= size; |
| src += size; |
| write_offset += size; |
    assert((write_offset % alignment) == 0);
| } |
| |
| if (s.ok()) { |
| // Move the tail to the beginning of the buffer |
| // This never happens during normal Append but rather during |
| // explicit call to Flush()/Sync() or Close() |
| buf_.RefitTail(file_advance, leftover_tail); |
    // This is where we start writing next time, which may or may not be
    // the actual file size on disk. They match if the buffer size is a
    // multiple of whole pages; otherwise filesize_ trails by leftover_tail
    // bytes.
| next_write_offset_ += file_advance; |
| } |
| return s; |
| } |
| #endif // !ROCKSDB_LITE |
| |
| namespace { |
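// ReadaheadRandomAccessFile wraps a RandomAccessFile and serves reads that
// are small relative to readahead_size from an internal aligned buffer,
// which it refills with larger aligned reads from the underlying file.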
| class ReadaheadRandomAccessFile : public RandomAccessFile { |
| public: |
| ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file, |
| size_t readahead_size) |
| : file_(std::move(file)), |
| alignment_(file_->GetRequiredBufferAlignment()), |
| readahead_size_(Roundup(readahead_size, alignment_)), |
| buffer_(), |
| buffer_offset_(0), |
| buffer_len_(0) { |
| |
| buffer_.Alignment(alignment_); |
| buffer_.AllocateNewBuffer(readahead_size_); |
| } |
| |
| ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete; |
| |
| ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete; |
| |
| virtual Status Read(uint64_t offset, size_t n, Slice* result, |
| char* scratch) const override { |
| |
| if (n + alignment_ >= readahead_size_) { |
| return file_->Read(offset, n, result, scratch); |
| } |
| |
| std::unique_lock<std::mutex> lk(lock_); |
| |
    size_t cached_len = 0;
    // Check whether there is a cache hit, meaning that [offset, offset + n)
    // is either completely or partially in the buffer. If it is completely
    // cached, including the end-of-file case when offset + n is greater
    // than EOF, return.
| if (TryReadFromCache(offset, n, &cached_len, scratch) && |
| (cached_len == n || |
| // End of file |
| buffer_len_ < readahead_size_)) { |
| *result = Slice(scratch, cached_len); |
| return Status::OK(); |
| } |
    size_t advanced_offset = offset + cached_len;
    // In the case of a cache hit, advanced_offset is already aligned, which
    // means that chunk_offset equals advanced_offset.
| size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset); |
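    // For example, with alignment_ = 4096, readahead_size_ = 8192 and the
    // buffer holding [0, 8192), a Read(7168, 2048, ...) copies the cached
    // 1024-byte tail above, then refills the buffer from chunk_offset = 8192
    // and copies the remaining 1024 bytes below.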
| |
| Status s = ReadIntoBuffer(chunk_offset, readahead_size_); |
| if (s.ok()) { |
      // In the case of a cache miss, i.e. when cached_len equals 0, the
      // offset can exceed the end-of-file position, so the following check
      // is required.
      if (advanced_offset < chunk_offset + buffer_len_) {
        // In the case of a cache miss, the first chunk_padding bytes in
        // buffer_ are stored for alignment only and must be skipped.
| size_t chunk_padding = advanced_offset - chunk_offset; |
| auto remaining_len = |
| std::min(buffer_len_ - chunk_padding, n - cached_len); |
| memcpy(scratch + cached_len, buffer_.BufferStart() + chunk_padding, |
| remaining_len); |
| *result = Slice(scratch, cached_len + remaining_len); |
| } else { |
| *result = Slice(scratch, cached_len); |
| } |
| } |
| return s; |
| } |
| |
| virtual Status Prefetch(uint64_t offset, size_t n) override { |
| size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset); |
| if (prefetch_offset == buffer_offset_) { |
| return Status::OK(); |
| } |
| return ReadIntoBuffer(prefetch_offset, |
| Roundup(offset + n, alignment_) - prefetch_offset); |
| } |
| |
| virtual size_t GetUniqueId(char* id, size_t max_size) const override { |
| return file_->GetUniqueId(id, max_size); |
| } |
| |
| virtual void Hint(AccessPattern pattern) override { file_->Hint(pattern); } |
| |
| virtual Status InvalidateCache(size_t offset, size_t length) override { |
| return file_->InvalidateCache(offset, length); |
| } |
| |
| virtual bool use_direct_io() const override { |
| return file_->use_direct_io(); |
| } |
| |
| private: |
| bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len, |
| char* scratch) const { |
| if (offset < buffer_offset_ || offset >= buffer_offset_ + buffer_len_) { |
| *cached_len = 0; |
| return false; |
| } |
| uint64_t offset_in_buffer = offset - buffer_offset_; |
| *cached_len = |
| std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n); |
| memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); |
| return true; |
| } |
| |
| Status ReadIntoBuffer(uint64_t offset, size_t n) const { |
| if (n > buffer_.Capacity()) { |
| n = buffer_.Capacity(); |
| } |
| assert(IsFileSectorAligned(offset, alignment_)); |
| assert(IsFileSectorAligned(n, alignment_)); |
| Slice result; |
| Status s = file_->Read(offset, n, &result, buffer_.BufferStart()); |
| if (s.ok()) { |
| buffer_offset_ = offset; |
| buffer_len_ = result.size(); |
| } |
| return s; |
| } |
| |
| std::unique_ptr<RandomAccessFile> file_; |
| const size_t alignment_; |
| size_t readahead_size_; |
| |
| mutable std::mutex lock_; |
| mutable AlignedBuffer buffer_; |
| mutable uint64_t buffer_offset_; |
| mutable size_t buffer_len_; |
| }; |
| } // namespace |
| |
| Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, |
| uint64_t offset, size_t n) { |
| size_t alignment = reader->file()->GetRequiredBufferAlignment(); |
  // Align the read: round the offset down and the end of the range up, so
  // that the aligned read covers the entire requested range
  // [offset, offset + n).
  uint64_t rounddown_offset = TruncateToPageBoundary(alignment, offset);
  uint64_t roundup_len = Roundup(offset + n, alignment) - rounddown_offset;
  buffer_.Alignment(alignment);
  buffer_.AllocateNewBuffer(roundup_len);

  Slice result;
  Status s = reader->Read(rounddown_offset, roundup_len, &result,
                          buffer_.BufferStart());
  if (s.ok()) {
    buffer_offset_ = rounddown_offset;
    buffer_len_ = result.size();
| } |
| return s; |
| } |
| |
| bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n, |
| Slice* result) const { |
| if (offset < buffer_offset_ || offset + n > buffer_offset_ + buffer_len_) { |
| return false; |
| } |
| uint64_t offset_in_buffer = offset - buffer_offset_; |
| *result = Slice(buffer_.BufferStart() + offset_in_buffer, n); |
| return true; |
| } |
| |
| std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile( |
| std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) { |
| std::unique_ptr<RandomAccessFile> result( |
| new ReadaheadRandomAccessFile(std::move(file), readahead_size)); |
| return result; |
| } |
| |
| Status NewWritableFile(Env* env, const std::string& fname, |
| unique_ptr<WritableFile>* result, |
| const EnvOptions& options) { |
| Status s = env->NewWritableFile(fname, result, options); |
| TEST_KILL_RANDOM("NewWritableFile:0", rocksdb_kill_odds * REDUCE_ODDS2); |
| return s; |
| } |
| |
| } // namespace rocksdb |