// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/memory_pool_internal.h"
#include <algorithm> // IWYU pragma: keep
#include <atomic>
#include <cstdlib> // IWYU pragma: keep
#include <cstring> // IWYU pragma: keep
#include <functional>
#include <iostream> // IWYU pragma: keep
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>
#if defined(sun) || defined(__sun)
#include <stdlib.h>
#endif
#include "arrow/buffer.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/config.h"
#include "arrow/util/debug.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h" // IWYU pragma: keep
#include "arrow/util/string.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/ubsan.h"
#ifdef __GLIBC__
#include <malloc.h>
#endif
#ifdef ARROW_MIMALLOC
#include <mimalloc.h>
#endif
namespace arrow {
namespace memory_pool {
namespace internal {
alignas(kDefaultBufferAlignment) int64_t zero_size_area[1] = {kDebugXorSuffix};
} // namespace internal
} // namespace memory_pool
namespace {
constexpr char kDefaultBackendEnvVar[] = "ARROW_DEFAULT_MEMORY_POOL";
constexpr char kDebugMemoryEnvVar[] = "ARROW_DEBUG_MEMORY_POOL";
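// Illustrative usage (a sketch; both variables are parsed below):
//
//   $ ARROW_DEFAULT_MEMORY_POOL=mimalloc ARROW_DEBUG_MEMORY_POOL=warn ./app
//
// selects the mimalloc backend (when built in) and installs the "warn"
// debug handler; valid debug values are "abort", "trap", "warn" and "none".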
enum class MemoryPoolBackend : uint8_t { System, Jemalloc, Mimalloc };
struct SupportedBackend {
const char* name;
MemoryPoolBackend backend;
};
// See ARROW-12248 for why we use static in-function singletons rather than
// global constants below (in SupportedBackends() and UserSelectedBackend()).
// In some contexts (especially R bindings) `default_memory_pool()` may be
// called before all globals are initialized, and then the ARROW_DEFAULT_MEMORY_POOL
// environment variable would be ignored.
const std::vector<SupportedBackend>& SupportedBackends() {
static std::vector<SupportedBackend> backends = {
// ARROW-12316: Apple => mimalloc first, then jemalloc
// non-Apple => jemalloc first, then mimalloc
#if defined(ARROW_JEMALLOC) && !defined(__APPLE__)
{"jemalloc", MemoryPoolBackend::Jemalloc},
#endif
#ifdef ARROW_MIMALLOC
{"mimalloc", MemoryPoolBackend::Mimalloc},
#endif
#if defined(ARROW_JEMALLOC) && defined(__APPLE__)
{"jemalloc", MemoryPoolBackend::Jemalloc},
#endif
{"system", MemoryPoolBackend::System}
};
return backends;
}
// Return the MemoryPoolBackend selected by the user through the
// ARROW_DEFAULT_MEMORY_POOL environment variable, if any.
std::optional<MemoryPoolBackend> UserSelectedBackend() {
static auto user_selected_backend = []() -> std::optional<MemoryPoolBackend> {
auto unsupported_backend = [](const std::string& name) {
std::vector<std::string> supported;
for (const auto backend : SupportedBackends()) {
supported.push_back(std::string("'") + backend.name + "'");
}
ARROW_LOG(WARNING) << "Unsupported backend '" << name << "' specified in "
<< kDefaultBackendEnvVar << " (supported backends are "
<< internal::JoinStrings(supported, ", ") << ")";
};
auto maybe_name = internal::GetEnvVar(kDefaultBackendEnvVar);
if (!maybe_name.ok()) {
return {};
}
const auto name = *std::move(maybe_name);
if (name.empty()) {
// An empty environment variable is considered missing
return {};
}
const auto found = std::find_if(
SupportedBackends().begin(), SupportedBackends().end(),
[&](const SupportedBackend& backend) { return name == backend.name; });
if (found != SupportedBackends().end()) {
return found->backend;
}
unsupported_backend(name);
return {};
}();
return user_selected_backend;
}
MemoryPoolBackend DefaultBackend() {
auto backend = UserSelectedBackend();
if (backend.has_value()) {
return backend.value();
}
struct SupportedBackend default_backend = SupportedBackends().front();
return default_backend.backend;
}
using MemoryDebugHandler = std::function<void(uint8_t* ptr, int64_t size, const Status&)>;
struct DebugState {
void Invoke(uint8_t* ptr, int64_t size, const Status& st) {
std::lock_guard<std::mutex> lock(mutex_);
if (handler_) {
handler_(ptr, size, st);
}
}
void SetHandler(MemoryDebugHandler handler) {
std::lock_guard<std::mutex> lock(mutex_);
handler_ = std::move(handler);
}
static DebugState* Instance() {
// Instance is constructed on-demand. If it were a namespace-scope global,
// another global's initializer might use it before it is constructed.
static DebugState instance;
return &instance;
}
private:
DebugState() = default;
ARROW_DISALLOW_COPY_AND_ASSIGN(DebugState);
std::mutex mutex_;
MemoryDebugHandler handler_;
};
void DebugAbort(uint8_t* ptr, int64_t size, const Status& st) { st.Abort(); }
void DebugTrap(uint8_t* ptr, int64_t size, const Status& st) {
ARROW_LOG(ERROR) << st.ToString();
arrow::internal::DebugTrap();
}
void DebugWarn(uint8_t* ptr, int64_t size, const Status& st) {
ARROW_LOG(WARNING) << st.ToString();
}
bool IsDebugEnabled() {
static const bool is_enabled = []() {
auto maybe_env_value = internal::GetEnvVar(kDebugMemoryEnvVar);
if (!maybe_env_value.ok()) {
return false;
}
auto env_value = *std::move(maybe_env_value);
if (env_value.empty() || env_value == "none") {
return false;
}
auto debug_state = DebugState::Instance();
if (env_value == "abort") {
debug_state->SetHandler(DebugAbort);
return true;
}
if (env_value == "trap") {
debug_state->SetHandler(DebugTrap);
return true;
}
if (env_value == "warn") {
debug_state->SetHandler(DebugWarn);
return true;
}
ARROW_LOG(WARNING) << "Invalid value for " << kDebugMemoryEnvVar << ": '" << env_value
<< "'. Valid values are 'abort', 'trap', 'warn', 'none'.";
return false;
}();
return is_enabled;
}
// An allocator wrapper that appends a suffix at the end of each allocation
// in order to detect writes beyond the allocated area.
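//
// Layout of each allocation (kOverhead = sizeof(int64_t) = 8 bytes):
//
//   [ user area: `size` bytes | int64_t suffix = size ^ kDebugXorSuffix ]
//
// CheckAllocatedArea() recovers `size` from the suffix and compares it to the
// size passed by the caller: a mismatch indicates either a wrong size argument
// or an out-of-bounds write that clobbered the suffix.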
template <typename WrappedAllocator>
class DebugAllocator {
public:
static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) {
if (size == 0) {
*out = memory_pool::internal::kZeroSizeArea;
} else {
ARROW_ASSIGN_OR_RAISE(int64_t raw_size, RawSize(size));
DCHECK(raw_size > size) << "bug in raw size computation: " << raw_size
<< " for size " << size;
RETURN_NOT_OK(WrappedAllocator::AllocateAligned(raw_size, alignment, out));
InitAllocatedArea(*out, size);
}
return Status::OK();
}
static void ReleaseUnused() { WrappedAllocator::ReleaseUnused(); }
static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) {
CheckAllocatedArea(*ptr, old_size, "reallocation");
if (*ptr == memory_pool::internal::kZeroSizeArea) {
return AllocateAligned(new_size, alignment, ptr);
}
if (new_size == 0) {
// Note that an overflow check isn't needed as `old_size` is supposed to have
// been successfully passed to AllocateAligned() before.
WrappedAllocator::DeallocateAligned(*ptr, old_size + kOverhead, alignment);
*ptr = memory_pool::internal::kZeroSizeArea;
return Status::OK();
}
ARROW_ASSIGN_OR_RAISE(int64_t raw_new_size, RawSize(new_size));
DCHECK(raw_new_size > new_size)
<< "bug in raw size computation: " << raw_new_size << " for size " << new_size;
RETURN_NOT_OK(WrappedAllocator::ReallocateAligned(old_size + kOverhead, raw_new_size,
alignment, ptr));
InitAllocatedArea(*ptr, new_size);
return Status::OK();
}
static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment) {
CheckAllocatedArea(ptr, size, "deallocation");
if (ptr != memory_pool::internal::kZeroSizeArea) {
WrappedAllocator::DeallocateAligned(ptr, size + kOverhead, alignment);
}
}
private:
static Result<int64_t> RawSize(int64_t size) {
if (ARROW_PREDICT_FALSE(internal::AddWithOverflow(size, kOverhead, &size))) {
return Status::OutOfMemory("Memory allocation size too large");
}
return size;
}
static void InitAllocatedArea(uint8_t* ptr, int64_t size) {
DCHECK_NE(size, 0);
util::SafeStore(ptr + size, size ^ memory_pool::internal::kDebugXorSuffix);
}
static void CheckAllocatedArea(uint8_t* ptr, int64_t size, const char* context) {
// Check that memory wasn't clobbered at the end of the allocated area.
int64_t stored_size =
memory_pool::internal::kDebugXorSuffix ^ util::SafeLoadAs<int64_t>(ptr + size);
if (ARROW_PREDICT_FALSE(stored_size != size)) {
auto st = Status::Invalid("Wrong size on ", context, ": given size = ", size,
", actual size = ", stored_size);
DebugState::Instance()->Invoke(ptr, size, st);
}
}
static constexpr int64_t kOverhead = sizeof(int64_t);
};
// Helper class directing allocations to the standard system allocator.
class SystemAllocator {
public:
// Allocate memory according to the alignment requirements for Arrow
// (as of May 2016, 64 bytes)
static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) {
if (size == 0) {
*out = memory_pool::internal::kZeroSizeArea;
return Status::OK();
}
#ifdef _WIN32
// Special code path for Windows
*out = reinterpret_cast<uint8_t*>(
_aligned_malloc(static_cast<size_t>(size), static_cast<size_t>(alignment)));
if (!*out) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
#elif defined(sun) || defined(__sun)
*out = reinterpret_cast<uint8_t*>(
memalign(static_cast<size_t>(alignment), static_cast<size_t>(size)));
if (!*out) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
#else
const int result =
posix_memalign(reinterpret_cast<void**>(out), static_cast<size_t>(alignment),
static_cast<size_t>(size));
if (result == ENOMEM) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
if (result == EINVAL) {
return Status::Invalid("invalid alignment parameter: ",
static_cast<size_t>(alignment));
}
#endif
return Status::OK();
}
static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) {
uint8_t* previous_ptr = *ptr;
if (previous_ptr == memory_pool::internal::kZeroSizeArea) {
DCHECK_EQ(old_size, 0);
return AllocateAligned(new_size, alignment, ptr);
}
if (new_size == 0) {
DeallocateAligned(previous_ptr, old_size, alignment);
*ptr = memory_pool::internal::kZeroSizeArea;
return Status::OK();
}
// Note: We cannot use realloc() here as it doesn't guarantee alignment.
// Allocate new chunk
uint8_t* out = nullptr;
RETURN_NOT_OK(AllocateAligned(new_size, alignment, &out));
DCHECK(out);
// Copy contents and release old memory chunk
memcpy(out, *ptr, static_cast<size_t>(std::min(new_size, old_size)));
#ifdef _WIN32
_aligned_free(*ptr);
#else
free(*ptr);
#endif // defined(_WIN32)
*ptr = out;
return Status::OK();
}
static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) {
if (ptr == memory_pool::internal::kZeroSizeArea) {
DCHECK_EQ(size, 0);
} else {
#ifdef _WIN32
_aligned_free(ptr);
#else
free(ptr);
#endif
}
}
static void ReleaseUnused() {
#ifdef __GLIBC__
// The return value of malloc_trim() is not an error code; it merely
// indicates whether any memory was actually released, which we do not
// care about here
ARROW_UNUSED(malloc_trim(0));
#endif
}
};
#ifdef ARROW_MIMALLOC
// Helper class directing allocations to the mimalloc allocator.
class MimallocAllocator {
public:
static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) {
if (size == 0) {
*out = memory_pool::internal::kZeroSizeArea;
return Status::OK();
}
*out = reinterpret_cast<uint8_t*>(
mi_malloc_aligned(static_cast<size_t>(size), static_cast<size_t>(alignment)));
if (*out == nullptr) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
return Status::OK();
}
static void ReleaseUnused() { mi_collect(true); }
static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) {
uint8_t* previous_ptr = *ptr;
if (previous_ptr == memory_pool::internal::kZeroSizeArea) {
DCHECK_EQ(old_size, 0);
return AllocateAligned(new_size, alignment, ptr);
}
if (new_size == 0) {
DeallocateAligned(previous_ptr, old_size, alignment);
*ptr = memory_pool::internal::kZeroSizeArea;
return Status::OK();
}
*ptr = reinterpret_cast<uint8_t*>(mi_realloc_aligned(
previous_ptr, static_cast<size_t>(new_size), static_cast<size_t>(alignment)));
if (*ptr == nullptr) {
*ptr = previous_ptr;
return Status::OutOfMemory("realloc of size ", new_size, " failed");
}
return Status::OK();
}
static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) {
if (ptr == memory_pool::internal::kZeroSizeArea) {
DCHECK_EQ(size, 0);
} else {
mi_free(ptr);
}
}
};
#endif // defined(ARROW_MIMALLOC)
} // namespace
int64_t MemoryPool::max_memory() const { return -1; }
///////////////////////////////////////////////////////////////////////
// MemoryPool implementation that delegates its core duty
// to an Allocator class.
#ifndef NDEBUG
static constexpr uint8_t kAllocPoison = 0xBC;
static constexpr uint8_t kReallocPoison = 0xBD;
static constexpr uint8_t kDeallocPoison = 0xBE;
#endif
template <typename Allocator>
class BaseMemoryPoolImpl : public MemoryPool {
public:
~BaseMemoryPoolImpl() override {}
Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override {
if (size < 0) {
return Status::Invalid("negative malloc size");
}
if (static_cast<uint64_t>(size) >= std::numeric_limits<size_t>::max()) {
return Status::OutOfMemory("malloc size overflows size_t");
}
RETURN_NOT_OK(Allocator::AllocateAligned(size, alignment, out));
#ifndef NDEBUG
// Poison data
if (size > 0) {
DCHECK_NE(*out, nullptr);
(*out)[0] = kAllocPoison;
(*out)[size - 1] = kAllocPoison;
}
#endif
stats_.UpdateAllocatedBytes(size);
return Status::OK();
}
Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) override {
if (new_size < 0) {
return Status::Invalid("negative realloc size");
}
if (static_cast<uint64_t>(new_size) >= std::numeric_limits<size_t>::max()) {
return Status::OutOfMemory("realloc overflows size_t");
}
RETURN_NOT_OK(Allocator::ReallocateAligned(old_size, new_size, alignment, ptr));
#ifndef NDEBUG
// Poison data
if (new_size > old_size) {
DCHECK_NE(*ptr, nullptr);
(*ptr)[old_size] = kReallocPoison;
(*ptr)[new_size - 1] = kReallocPoison;
}
#endif
stats_.UpdateAllocatedBytes(new_size - old_size);
return Status::OK();
}
void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
#ifndef NDEBUG
// Poison data
if (size > 0) {
DCHECK_NE(buffer, nullptr);
buffer[0] = kDeallocPoison;
buffer[size - 1] = kDeallocPoison;
}
#endif
Allocator::DeallocateAligned(buffer, size, alignment);
stats_.UpdateAllocatedBytes(-size, /*is_free=*/true);
}
void ReleaseUnused() override { Allocator::ReleaseUnused(); }
int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
int64_t max_memory() const override { return stats_.max_memory(); }
int64_t total_bytes_allocated() const override {
return stats_.total_bytes_allocated();
}
int64_t num_allocations() const override { return stats_.num_allocations(); }
protected:
internal::MemoryPoolStats stats_;
};
class SystemMemoryPool : public BaseMemoryPoolImpl<SystemAllocator> {
public:
std::string backend_name() const override { return "system"; }
};
class SystemDebugMemoryPool : public BaseMemoryPoolImpl<DebugAllocator<SystemAllocator>> {
public:
std::string backend_name() const override { return "system"; }
};
#ifdef ARROW_JEMALLOC
class JemallocMemoryPool
: public BaseMemoryPoolImpl<memory_pool::internal::JemallocAllocator> {
public:
std::string backend_name() const override { return "jemalloc"; }
};
class JemallocDebugMemoryPool
: public BaseMemoryPoolImpl<
DebugAllocator<memory_pool::internal::JemallocAllocator>> {
public:
std::string backend_name() const override { return "jemalloc"; }
};
#endif
#ifdef ARROW_MIMALLOC
class MimallocMemoryPool : public BaseMemoryPoolImpl<MimallocAllocator> {
public:
std::string backend_name() const override { return "mimalloc"; }
};
class MimallocDebugMemoryPool
: public BaseMemoryPoolImpl<DebugAllocator<MimallocAllocator>> {
public:
std::string backend_name() const override { return "mimalloc"; }
};
#endif
std::unique_ptr<MemoryPool> MemoryPool::CreateDefault() {
auto backend = DefaultBackend();
switch (backend) {
case MemoryPoolBackend::System:
return IsDebugEnabled() ? std::unique_ptr<MemoryPool>(new SystemDebugMemoryPool)
: std::unique_ptr<MemoryPool>(new SystemMemoryPool);
#ifdef ARROW_JEMALLOC
case MemoryPoolBackend::Jemalloc:
return IsDebugEnabled() ? std::unique_ptr<MemoryPool>(new JemallocDebugMemoryPool)
: std::unique_ptr<MemoryPool>(new JemallocMemoryPool);
#endif
#ifdef ARROW_MIMALLOC
case MemoryPoolBackend::Mimalloc:
return IsDebugEnabled() ? std::unique_ptr<MemoryPool>(new MimallocDebugMemoryPool)
: std::unique_ptr<MemoryPool>(new MimallocMemoryPool);
#endif
default:
ARROW_LOG(FATAL) << "Internal error: cannot create default memory pool";
return nullptr;
}
}
static struct GlobalState {
~GlobalState() { finalizing_.store(true, std::memory_order_relaxed); }
bool is_finalizing() const { return finalizing_.load(std::memory_order_relaxed); }
MemoryPool* system_memory_pool() {
if (IsDebugEnabled()) {
return &system_debug_pool_;
} else {
return &system_pool_;
}
}
#ifdef ARROW_JEMALLOC
MemoryPool* jemalloc_memory_pool() {
if (IsDebugEnabled()) {
return &jemalloc_debug_pool_;
} else {
return &jemalloc_pool_;
}
}
#endif
#ifdef ARROW_MIMALLOC
MemoryPool* mimalloc_memory_pool() {
if (IsDebugEnabled()) {
return &mimalloc_debug_pool_;
} else {
return &mimalloc_pool_;
}
}
#endif
private:
std::atomic<bool> finalizing_{false}; // constructed first, destroyed last
SystemMemoryPool system_pool_;
SystemDebugMemoryPool system_debug_pool_;
#ifdef ARROW_JEMALLOC
JemallocMemoryPool jemalloc_pool_;
JemallocDebugMemoryPool jemalloc_debug_pool_;
#endif
#ifdef ARROW_MIMALLOC
MimallocMemoryPool mimalloc_pool_;
MimallocDebugMemoryPool mimalloc_debug_pool_;
#endif
} global_state;
MemoryPool* system_memory_pool() { return global_state.system_memory_pool(); }
Status jemalloc_memory_pool(MemoryPool** out) {
#ifdef ARROW_JEMALLOC
*out = global_state.jemalloc_memory_pool();
return Status::OK();
#else
return Status::NotImplemented("This Arrow build does not enable jemalloc");
#endif
}
Status mimalloc_memory_pool(MemoryPool** out) {
#ifdef ARROW_MIMALLOC
*out = global_state.mimalloc_memory_pool();
return Status::OK();
#else
return Status::NotImplemented("This Arrow build does not enable mimalloc");
#endif
}
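// Illustrative use of the default pool (a sketch using only the raw
// Allocate/Free API defined above):
//
//   arrow::MemoryPool* pool = arrow::default_memory_pool();
//   uint8_t* data = nullptr;
//   ARROW_RETURN_NOT_OK(pool->Allocate(128, /*alignment=*/64, &data));
//   // ... use the 128 bytes ...
//   pool->Free(data, 128, /*alignment=*/64);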
MemoryPool* default_memory_pool() {
auto backend = DefaultBackend();
switch (backend) {
case MemoryPoolBackend::System:
return global_state.system_memory_pool();
#ifdef ARROW_JEMALLOC
case MemoryPoolBackend::Jemalloc:
return global_state.jemalloc_memory_pool();
#endif
#ifdef ARROW_MIMALLOC
case MemoryPoolBackend::Mimalloc:
return global_state.mimalloc_memory_pool();
#endif
default:
ARROW_LOG(FATAL) << "Internal error: cannot create default memory pool";
return nullptr;
}
}
#ifndef ARROW_JEMALLOC
Status jemalloc_set_decay_ms(int ms) {
return Status::NotImplemented("jemalloc support is not built");
}
Result<int64_t> jemalloc_get_stat(const char* name) {
return Status::NotImplemented("jemalloc support is not built");
}
Status jemalloc_peak_reset() {
return Status::NotImplemented("jemalloc support is not built");
}
Status jemalloc_stats_print(const char* opts) {
return Status::NotImplemented("jemalloc support is not built");
}
Status jemalloc_stats_print(std::function<void(const char*)> write_cb, const char* opts) {
return Status::NotImplemented("jemalloc support is not built");
}
Result<std::string> jemalloc_stats_string(const char* opts) {
return Status::NotImplemented("jemalloc support is not built");
}
#endif
///////////////////////////////////////////////////////////////////////
// LoggingMemoryPool implementation
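// LoggingMemoryPool wraps another pool and prints every call on stdout.
// Illustrative sketch:
//
//   arrow::LoggingMemoryPool logging_pool(arrow::default_memory_pool());
//   ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateBuffer(64, &logging_pool));
//   // prints something like "Allocate: size = 64, alignment = 64"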
LoggingMemoryPool::LoggingMemoryPool(MemoryPool* pool) : pool_(pool) {}
Status LoggingMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) {
Status s = pool_->Allocate(size, alignment, out);
std::cout << "Allocate: size = " << size << ", alignment = " << alignment << std::endl;
return s;
}
Status LoggingMemoryPool::Reallocate(int64_t old_size, int64_t new_size,
int64_t alignment, uint8_t** ptr) {
Status s = pool_->Reallocate(old_size, new_size, alignment, ptr);
std::cout << "Reallocate: old_size = " << old_size << ", new_size = " << new_size
<< ", alignment = " << alignment << std::endl;
return s;
}
void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
pool_->Free(buffer, size, alignment);
std::cout << "Free: size = " << size << ", alignment = " << alignment << std::endl;
}
int64_t LoggingMemoryPool::bytes_allocated() const {
int64_t nb_bytes = pool_->bytes_allocated();
std::cout << "bytes_allocated: " << nb_bytes << std::endl;
return nb_bytes;
}
int64_t LoggingMemoryPool::max_memory() const {
int64_t mem = pool_->max_memory();
std::cout << "max_memory: " << mem << std::endl;
return mem;
}
int64_t LoggingMemoryPool::total_bytes_allocated() const {
int64_t mem = pool_->total_bytes_allocated();
std::cout << "total_bytes_allocated: " << mem << std::endl;
return mem;
}
int64_t LoggingMemoryPool::num_allocations() const {
int64_t mem = pool_->num_allocations();
std::cout << "num_allocations: " << mem << std::endl;
return mem;
}
std::string LoggingMemoryPool::backend_name() const { return pool_->backend_name(); }
///////////////////////////////////////////////////////////////////////
// ProxyMemoryPool implementation
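// ProxyMemoryPool forwards all calls to a parent pool while tracking its own
// statistics, which is useful for attributing memory usage to one subsystem.
// Illustrative sketch:
//
//   arrow::ProxyMemoryPool scratch_pool(arrow::default_memory_pool());
//   // ... allocate through scratch_pool ...
//   int64_t subsystem_bytes = scratch_pool.bytes_allocated();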
class ProxyMemoryPool::ProxyMemoryPoolImpl {
public:
explicit ProxyMemoryPoolImpl(MemoryPool* pool) : pool_(pool) {}
Status Allocate(int64_t size, int64_t alignment, uint8_t** out) {
RETURN_NOT_OK(pool_->Allocate(size, alignment, out));
stats_.UpdateAllocatedBytes(size);
return Status::OK();
}
Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) {
RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, alignment, ptr));
stats_.UpdateAllocatedBytes(new_size - old_size);
return Status::OK();
}
void Free(uint8_t* buffer, int64_t size, int64_t alignment) {
pool_->Free(buffer, size, alignment);
stats_.UpdateAllocatedBytes(-size, /*is_free=*/true);
}
int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
int64_t max_memory() const { return stats_.max_memory(); }
int64_t total_bytes_allocated() const { return stats_.total_bytes_allocated(); }
int64_t num_allocations() const { return stats_.num_allocations(); }
std::string backend_name() const { return pool_->backend_name(); }
private:
MemoryPool* pool_;
internal::MemoryPoolStats stats_;
};
ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) {
impl_.reset(new ProxyMemoryPoolImpl(pool));
}
ProxyMemoryPool::~ProxyMemoryPool() {}
Status ProxyMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) {
return impl_->Allocate(size, alignment, out);
}
Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
uint8_t** ptr) {
return impl_->Reallocate(old_size, new_size, alignment, ptr);
}
void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) {
return impl_->Free(buffer, size, alignment);
}
int64_t ProxyMemoryPool::bytes_allocated() const { return impl_->bytes_allocated(); }
int64_t ProxyMemoryPool::max_memory() const { return impl_->max_memory(); }
int64_t ProxyMemoryPool::total_bytes_allocated() const {
return impl_->total_bytes_allocated();
}
int64_t ProxyMemoryPool::num_allocations() const { return impl_->num_allocations(); }
std::string ProxyMemoryPool::backend_name() const { return impl_->backend_name(); }
std::vector<std::string> SupportedMemoryBackendNames() {
std::vector<std::string> supported;
for (const auto backend : SupportedBackends()) {
supported.push_back(backend.name);
}
return supported;
}
// -----------------------------------------------------------------------
// Pool buffer and allocation
/// A Buffer whose lifetime is tied to a particular MemoryPool
class PoolBuffer final : public ResizableBuffer {
public:
explicit PoolBuffer(std::shared_ptr<MemoryManager> mm, MemoryPool* pool,
int64_t alignment)
: ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool), alignment_(alignment) {}
~PoolBuffer() override {
// Avoid calling pool_->Free if the global pools are already destroyed
// (XXX this will not work with user-defined pools).
// This can happen when a Future is destroyed on one thread while, or after,
// the memory pools are destroyed on the main thread (there is no guarantee
// of destruction order between threads and memory pools).
uint8_t* ptr = mutable_data();
if (ptr && !global_state.is_finalizing()) {
pool_->Free(ptr, capacity_, alignment_);
}
}
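// Note: capacities are always rounded up to a multiple of 64 bytes, so e.g.
// Reserve(100) allocates a 128-byte area (see bit_util::RoundUpToMultipleOf64).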
Status Reserve(const int64_t capacity) override {
if (capacity < 0) {
return Status::Invalid("Negative buffer capacity: ", capacity);
}
uint8_t* ptr = mutable_data();
if (!ptr || capacity > capacity_) {
int64_t new_capacity = bit_util::RoundUpToMultipleOf64(capacity);
if (ptr) {
RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, alignment_, &ptr));
} else {
RETURN_NOT_OK(pool_->Allocate(new_capacity, alignment_, &ptr));
}
data_ = ptr;
capacity_ = new_capacity;
}
return Status::OK();
}
Status Resize(const int64_t new_size, bool shrink_to_fit = true) override {
if (ARROW_PREDICT_FALSE(new_size < 0)) {
return Status::Invalid("Negative buffer resize: ", new_size);
}
uint8_t* ptr = mutable_data();
if (ptr && shrink_to_fit && new_size <= size_) {
// Buffer is non-null and is not growing, so shrink to the requested size without
// excess space.
int64_t new_capacity = bit_util::RoundUpToMultipleOf64(new_size);
if (capacity_ != new_capacity) {
// The buffer's capacity doesn't yet match the requested size.
RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, alignment_, &ptr));
data_ = ptr;
capacity_ = new_capacity;
}
} else {
RETURN_NOT_OK(Reserve(new_size));
}
size_ = new_size;
return Status::OK();
}
static std::shared_ptr<PoolBuffer> MakeShared(MemoryPool* pool, int64_t alignment) {
std::shared_ptr<MemoryManager> mm;
if (pool == nullptr) {
pool = default_memory_pool();
mm = default_cpu_memory_manager();
} else {
mm = CPUDevice::memory_manager(pool);
}
return std::make_shared<PoolBuffer>(std::move(mm), pool, alignment);
}
static std::unique_ptr<PoolBuffer> MakeUnique(MemoryPool* pool, int64_t alignment) {
std::shared_ptr<MemoryManager> mm;
if (pool == nullptr) {
pool = default_memory_pool();
mm = default_cpu_memory_manager();
} else {
mm = CPUDevice::memory_manager(pool);
}
return std::make_unique<PoolBuffer>(std::move(mm), pool, alignment);
}
private:
MemoryPool* pool_;
int64_t alignment_;
};
namespace {
// A utility that does most of the work of the `AllocateBuffer` and
// `AllocateResizableBuffer` functions. The argument `buffer` should be a
// smart pointer to a PoolBuffer.
template <typename BufferPtr, typename PoolBufferPtr>
inline Result<BufferPtr> ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t size) {
RETURN_NOT_OK(buffer->Resize(size));
buffer->ZeroPadding();
return std::move(buffer);
}
} // namespace
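// Illustrative use of the allocation helpers below (a sketch; the `pool`
// argument defaults to nullptr in the public header, which resolves to the
// default memory pool, see PoolBuffer::MakeUnique above):
//
//   ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::Buffer> buf,
//                         arrow::AllocateBuffer(1024));
//   ARROW_ASSIGN_OR_RAISE(auto resizable, arrow::AllocateResizableBuffer(0));
//   ARROW_RETURN_NOT_OK(resizable->Resize(256));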
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size, MemoryPool* pool) {
return AllocateBuffer(size, kDefaultBufferAlignment, pool);
}
Result<std::unique_ptr<Buffer>> AllocateBuffer(const int64_t size,
const int64_t alignment,
MemoryPool* pool) {
return ResizePoolBuffer<std::unique_ptr<Buffer>>(
PoolBuffer::MakeUnique(pool, alignment), size);
}
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(const int64_t size,
MemoryPool* pool) {
return AllocateResizableBuffer(size, kDefaultBufferAlignment, pool);
}
Result<std::unique_ptr<ResizableBuffer>> AllocateResizableBuffer(const int64_t size,
const int64_t alignment,
MemoryPool* pool) {
return ResizePoolBuffer<std::unique_ptr<ResizableBuffer>>(
PoolBuffer::MakeUnique(pool, alignment), size);
}
} // namespace arrow