blob: d052c01683775eaff5af16d7b122d66c2f3edc3e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/io/interfaces.h"
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <list>
#include <memory>
#include <mutex>
#include <sstream>
#include <typeinfo>
#include <utility>
#include "arrow/buffer.h"
#include "arrow/io/concurrency.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
using internal::Executor;
using internal::TaskHints;
using internal::ThreadPool;
namespace io {
// Process-wide IOContext. It is returned by default_io_context() and by the
// base Readable::io_context().
static IOContext g_default_io_context{};

// Convenience constructor that delegates to the three-argument form, always
// using the global IO thread pool from internal::GetIOThreadPool() as executor.
IOContext::IOContext(MemoryPool* pool, StopToken stop_token)
    : IOContext(pool, internal::GetIOThreadPool(), std::move(stop_token)) {}

const IOContext& default_io_context() {
  return g_default_io_context;
}
FileInterface::~FileInterface() = default;

// Base implementation of Abort(): simply delegates to Close().
Status FileInterface::Abort() {
  return Close();
}
namespace {

// Adapter that turns an InputStream into a sequence of buffers, each obtained
// from a Read() of at most `block_size` bytes. Once a read comes back empty,
// the stream reference is dropped and every further Next() yields a null
// buffer.
class InputStreamBlockIterator {
 public:
  InputStreamBlockIterator(std::shared_ptr<InputStream> stream, int64_t block_size)
      : stream_(std::move(stream)), block_size_(block_size) {}

  Result<std::shared_ptr<Buffer>> Next() {
    if (done_) {
      return nullptr;
    }
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> block, stream_->Read(block_size_));
    if (block->size() != 0) {
      return block;
    }
    // Empty read: remember that we are finished and release the stream early.
    done_ = true;
    stream_.reset();
    return nullptr;
  }

 protected:
  std::shared_ptr<InputStream> stream_;
  int64_t block_size_;
  bool done_ = false;
};

}  // namespace
// Base io_context(): readers without a context of their own use the global
// default one.
const IOContext& Readable::io_context() const {
  return g_default_io_context;
}

// Base Advance(): performs a real Read() of `nbytes`, discards the returned
// data and reports only the resulting status.
Status InputStream::Advance(int64_t nbytes) {
  auto discarded = Read(nbytes);
  return discarded.status();
}

// Base Peek(): look-ahead is not supported unless a subclass provides it.
Result<util::string_view> InputStream::Peek(int64_t ARROW_ARG_UNUSED(nbytes)) {
  return Status::NotImplemented("Peek not implemented");
}

// Base supports_zero_copy(): zero-copy reads are not advertised.
bool InputStream::supports_zero_copy() const {
  return false;
}
// Wrap `stream` in an Iterator producing consecutive buffers obtained by
// Read() calls of at most `block_size` bytes. After a read returns zero bytes,
// the iterator yields a null buffer.
//
// Returns Status::Invalid if `stream` is already closed. `block_size` must be
// positive; this is only checked in debug builds.
Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
    std::shared_ptr<InputStream> stream, int64_t block_size) {
  if (stream->closed()) {
    return Status::Invalid("Cannot take iterator on closed stream");
  }
  DCHECK_GT(block_size, 0);
  // `stream` is a by-value sink parameter. Move it into the iterator instead of
  // copying, which avoids a redundant atomic refcount increment/decrement.
  return Iterator<std::shared_ptr<Buffer>>(
      InputStreamBlockIterator(std::move(stream), block_size));
}
// Private state of RandomAccessFile. It holds the mutex used by the base
// ReadAt() implementations to keep their Seek()+Read() sequence together.
struct RandomAccessFile::Impl {
  std::mutex lock_;
};

RandomAccessFile::RandomAccessFile() : interface_impl_(new Impl()) {}

RandomAccessFile::~RandomAccessFile() = default;
// Base positional read into caller memory, emulated as Seek() then Read().
// The mutex keeps the seek/read pair from interleaving with other ReadAt()
// calls on this object. As a side effect, the file position is moved.
Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
  std::lock_guard<std::mutex> guard(interface_impl_->lock_);
  RETURN_NOT_OK(Seek(position));
  return Read(nbytes, out);
}

// Base positional read returning a Buffer; same Seek()+Read() emulation and
// locking as the overload above.
Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
                                                         int64_t nbytes) {
  std::lock_guard<std::mutex> guard(interface_impl_->lock_);
  RETURN_NOT_OK(Seek(position));
  return Read(nbytes);
}
// Default ReadAsync() implementation: submit a blocking ReadAt(position, nbytes)
// through internal::SubmitIO() on `ctx`, and return a Future for its result.
//
// The task captures a strong reference (`self`) to this file, so the file
// stays alive until the task has run even if the caller drops its own
// reference. Because of shared_from_this(), this requires the file to be owned
// by a std::shared_ptr.
//
// Any submission error from SubmitIO() is turned into a failed Future by
// DeferNotOk().
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx,
                                                            int64_t position,
                                                            int64_t nbytes) {
  auto self = shared_from_this();
  return DeferNotOk(internal::SubmitIO(
      ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
}
// Convenience overload: schedule the read using this file's own io_context().
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
                                                            int64_t nbytes) {
  const IOContext& own_context = io_context();
  return ReadAsync(own_context, position, nbytes);
}
// Default WillNeed() implementation: a no-op. The hinted `ranges` are ignored
// and success is reported. The parameter is marked unused (as Peek() does) so
// -Wunused-parameter builds stay quiet.
Status RandomAccessFile::WillNeed(
    const std::vector<ReadRange>& ARROW_ARG_UNUSED(ranges)) {
  return Status::OK();
}
// Convenience overload: write all bytes viewed by `data`.
Status Writable::Write(util::string_view data) {
  const auto length = static_cast<int64_t>(data.size());
  return Write(data.data(), length);
}

// Convenience overload: write the whole contents of `data`. The pointer is
// dereferenced without a null check, so it must not be null.
Status Writable::Write(const std::shared_ptr<Buffer>& data) {
  const Buffer& buffer = *data;
  return Write(buffer.data(), buffer.size());
}

// Base Flush(): a no-op that reports success.
Status Writable::Flush() {
  return Status::OK();
}
// InputStream over the byte range [file_offset, file_offset + nbytes) of a
// RandomAccessFile. It keeps its own stream position (relative to the segment
// start) and issues positional ReadAt() calls on the underlying file. Each
// request is clamped so it never goes past the end of the segment.
class FileSegmentReader
    : public internal::InputStreamConcurrencyWrapper<FileSegmentReader> {
 public:
  FileSegmentReader(std::shared_ptr<RandomAccessFile> file, int64_t file_offset,
                    int64_t nbytes)
      : file_(std::move(file)), file_offset_(file_offset), nbytes_(nbytes) {
    FileInterface::set_mode(FileMode::READ);
  }

  Status CheckOpen() const {
    return closed_ ? Status::IOError("Stream is closed") : Status::OK();
  }

  Status DoClose() {
    closed_ = true;
    return Status::OK();
  }

  Result<int64_t> DoTell() const {
    RETURN_NOT_OK(CheckOpen());
    return position_;
  }

  bool closed() const override { return closed_; }

  Result<int64_t> DoRead(int64_t nbytes, void* out) {
    RETURN_NOT_OK(CheckOpen());
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                          file_->ReadAt(CurrentFileOffset(), ClampToSegment(nbytes), out));
    position_ += bytes_read;
    return bytes_read;
  }

  Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes) {
    RETURN_NOT_OK(CheckOpen());
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
                          file_->ReadAt(CurrentFileOffset(), ClampToSegment(nbytes)));
    position_ += buffer->size();
    return buffer;
  }

 private:
  // Absolute offset in `file_` that corresponds to the current stream position.
  int64_t CurrentFileOffset() const { return file_offset_ + position_; }

  // Shrink a request of `nbytes` so it does not extend past the segment end.
  int64_t ClampToSegment(int64_t nbytes) const {
    return std::min(nbytes, nbytes_ - position_);
  }

  std::shared_ptr<RandomAccessFile> file_;
  bool closed_ = false;
  int64_t position_ = 0;
  int64_t file_offset_;
  int64_t nbytes_;
};
std::shared_ptr<InputStream> RandomAccessFile::GetStream(
std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t nbytes) {
return std::make_shared<FileSegmentReader>(std::move(file), file_offset, nbytes);
}
// -----------------------------------------------------------------------
// Implement utilities exported from concurrency.h and util_internal.h
namespace internal {
// Close `file` on behalf of a destructor, where a failing Status cannot be
// returned to anyone.
// - NDEBUG builds log the error (ERROR level) and continue.
// - Debug builds emit it at FATAL level, prefixed with the dynamic file type.
void CloseFromDestructor(FileInterface* file) {
  Status st = file->Close();
  if (st.ok()) {
    return;
  }
  const char* file_type = typeid(*file).name();
#ifdef NDEBUG
  ARROW_LOG(ERROR) << "Error ignored when destroying file of type " << file_type << ": "
                   << st;
#else
  std::stringstream ss;
  ss << "When destroying file of type " << file_type << ": " << st.message();
  ARROW_LOG(FATAL) << st.WithMessage(ss.str());
#endif
}
// Validate a read of `size` bytes at `offset` in a file of `file_size` bytes.
// Returns the number of bytes actually readable, i.e. `size` truncated at end
// of file. An offset exactly at end of file is accepted and yields 0.
// Negative offset/size -> Invalid; offset beyond end of file -> IOError.
Result<int64_t> ValidateReadRange(int64_t offset, int64_t size, int64_t file_size) {
  if (offset < 0 || size < 0) {
    return Status::Invalid("Invalid read (offset = ", offset, ", size = ", size, ")");
  }
  if (offset > file_size) {
    return Status::IOError("Read out of bounds (offset = ", offset, ", size = ", size,
                           ") in file of size ", file_size);
  }
  const int64_t available = file_size - offset;
  return std::min(size, available);
}
// Validate a write of `size` bytes at `offset` in a file of `file_size` bytes.
// Unlike reads, writes are not truncated: the whole range [offset, offset + size)
// must lie inside the file.
// Negative offset/size -> Invalid; range extending past end of file -> IOError.
Status ValidateWriteRange(int64_t offset, int64_t size, int64_t file_size) {
  if (offset < 0 || size < 0) {
    return Status::Invalid("Invalid write (offset = ", offset, ", size = ", size, ")");
  }
  // Equivalent to `offset + size > file_size`, but written without the sum.
  // Both values are non-negative here, so the sum can overflow int64_t, which
  // is undefined behavior. Once `offset <= file_size` is known,
  // `file_size - offset` lies in [0, file_size] and cannot overflow.
  if (offset > file_size || size > file_size - offset) {
    return Status::IOError("Write out of bounds (offset = ", offset, ", size = ", size,
                           ") in file of size ", file_size);
  }
  return Status::OK();
}
// Accept any IO range whose offset and size are both non-negative.
Status ValidateRange(int64_t offset, int64_t size) {
  const bool non_negative = offset >= 0 && size >= 0;
  if (non_negative) {
    return Status::OK();
  }
  return Status::Invalid("Invalid IO range (offset = ", offset, ", size = ", size, ")");
}
#ifndef NDEBUG
// Debug mode concurrency checking
//
// The checker never blocks callers. It counts how many callers currently hold
// the "shared" and "exclusive" roles, and fails an ARROW_CHECK when an
// acquisition conflicts with the current state. The mutex only protects the
// two counters during each update.
struct SharedExclusiveChecker::Impl {
std::mutex mutex;
// Number of callers between LockShared() and UnlockShared().
int64_t n_shared = 0;
// Number of callers between LockExclusive() and UnlockExclusive(); the checks
// below keep it at 0 or 1.
int64_t n_exclusive = 0;
};
SharedExclusiveChecker::SharedExclusiveChecker() : impl_(new Impl) {}
// Enter the shared role; fails if an exclusive holder is active. Any number of
// shared holders may coexist.
void SharedExclusiveChecker::LockShared() {
std::lock_guard<std::mutex> lock(impl_->mutex);
// XXX The error message doesn't really describe the actual situation
// (e.g. ReadAt() called while Read() call in progress)
ARROW_CHECK_EQ(impl_->n_exclusive, 0)
<< "Attempted to take shared lock while locked exclusive";
++impl_->n_shared;
}
// Leave the shared role; fails if no shared holder is recorded.
void SharedExclusiveChecker::UnlockShared() {
std::lock_guard<std::mutex> lock(impl_->mutex);
ARROW_CHECK_GT(impl_->n_shared, 0);
--impl_->n_shared;
}
// Enter the exclusive role; fails if any shared or exclusive holder is active.
void SharedExclusiveChecker::LockExclusive() {
std::lock_guard<std::mutex> lock(impl_->mutex);
ARROW_CHECK_EQ(impl_->n_shared, 0)
<< "Attempted to take exclusive lock while locked shared";
ARROW_CHECK_EQ(impl_->n_exclusive, 0)
<< "Attempted to take exclusive lock while already locked exclusive";
++impl_->n_exclusive;
}
// Leave the exclusive role; fails unless exactly one exclusive holder is
// recorded.
void SharedExclusiveChecker::UnlockExclusive() {
std::lock_guard<std::mutex> lock(impl_->mutex);
ARROW_CHECK_EQ(impl_->n_exclusive, 1);
--impl_->n_exclusive;
}
#else
// Release mode no-op concurrency checking
// All methods are empty and impl_ is not initialized explicitly here; no
// method touches it.
struct SharedExclusiveChecker::Impl {};
SharedExclusiveChecker::SharedExclusiveChecker() {}
void SharedExclusiveChecker::LockShared() {}
void SharedExclusiveChecker::UnlockShared() {}
void SharedExclusiveChecker::LockExclusive() {}
void SharedExclusiveChecker::UnlockExclusive() {}
#endif
// Create the process-wide IO thread pool with 8 threads via
// ThreadPool::MakeEternal(). Failure is treated as unrecoverable: the status
// is aborted with an explanatory message.
static std::shared_ptr<ThreadPool> MakeIOThreadPool() {
  constexpr int kNumIOThreads = 8;
  auto pool_result = ThreadPool::MakeEternal(/*threads=*/kNumIOThreads);
  if (!pool_result.ok()) {
    pool_result.status().Abort("Failed to create global IO thread pool");
  }
  return *std::move(pool_result);
}
// Return the global IO thread pool, creating it on first use. Initialization
// of a function-local static is thread-safe since C++11, so concurrent first
// calls are fine. The returned pointer is non-owning; the static shared_ptr
// keeps the pool alive.
ThreadPool* GetIOThreadPool() {
  static const std::shared_ptr<ThreadPool> io_pool = MakeIOThreadPool();
  return io_pool.get();
}
// -----------------------------------------------------------------------
// CoalesceReadRanges
namespace {
struct ReadRangeCombiner {
std::vector<ReadRange> Coalesce(std::vector<ReadRange> ranges) {
if (ranges.empty()) {
return ranges;
}
// Remove zero-sized ranges
auto end = std::remove_if(ranges.begin(), ranges.end(),
[](const ReadRange& range) { return range.length == 0; });
ranges.resize(end - ranges.begin());
// Sort in position order
std::sort(ranges.begin(), ranges.end(),
[](const ReadRange& a, const ReadRange& b) { return a.offset < b.offset; });
// Skip further processing if ranges is empty after removing zero-sized ranges.
if (ranges.empty()) {
return ranges;
}
#ifndef NDEBUG
for (size_t i = 0; i < ranges.size() - 1; ++i) {
const auto& left = ranges[i];
const auto& right = ranges[i + 1];
DCHECK_LE(left.offset, right.offset);
DCHECK_LE(left.offset + left.length, right.offset) << "Some read ranges overlap";
}
#endif
std::vector<ReadRange> coalesced;
auto itr = ranges.begin();
// Ensure ranges is not empty.
DCHECK_LE(itr, ranges.end());
// Start of the current coalesced range and end (exclusive) of previous range.
// Both are initialized with the start of first range which is a placeholder value.
int64_t coalesced_start = itr->offset;
int64_t prev_range_end = coalesced_start;
for (; itr < ranges.end(); ++itr) {
const int64_t current_range_start = itr->offset;
const int64_t current_range_end = current_range_start + itr->length;
// We don't expect to have 0 sized ranges.
DCHECK_LT(current_range_start, current_range_end);
// At this point, the coalesced range is [coalesced_start, prev_range_end).
// Stop coalescing if:
// - coalesced range is too large, or
// - distance (hole/gap) between consecutive ranges is too large.
if (current_range_end - coalesced_start > range_size_limit_ ||
current_range_start - prev_range_end > hole_size_limit_) {
DCHECK_LE(coalesced_start, prev_range_end);
// Append the coalesced range only if coalesced range size > 0.
if (prev_range_end > coalesced_start) {
coalesced.push_back({coalesced_start, prev_range_end - coalesced_start});
}
// Start a new coalesced range.
coalesced_start = current_range_start;
}
// Update the prev_range_end with the current range.
prev_range_end = current_range_end;
}
// Append the coalesced range only if coalesced range size > 0.
if (prev_range_end > coalesced_start) {
coalesced.push_back({coalesced_start, prev_range_end - coalesced_start});
}
DCHECK_EQ(coalesced.front().offset, ranges.front().offset);
DCHECK_EQ(coalesced.back().offset + coalesced.back().length,
ranges.back().offset + ranges.back().length);
return coalesced;
}
const int64_t hole_size_limit_;
const int64_t range_size_limit_;
};
}; // namespace
std::vector<ReadRange> CoalesceReadRanges(std::vector<ReadRange> ranges,
int64_t hole_size_limit,
int64_t range_size_limit) {
DCHECK_GT(range_size_limit, hole_size_limit);
ReadRangeCombiner combiner{hole_size_limit, range_size_limit};
return combiner.Coalesce(std::move(ranges));
}
} // namespace internal
} // namespace io
} // namespace arrow