blob: 793a2c8e5dfa0446475cd04dc3afce52cefe0d87 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/memory_pool.h"
#include <algorithm> // IWYU pragma: keep
#include <cstdlib> // IWYU pragma: keep
#include <cstring> // IWYU pragma: keep
#include <iostream> // IWYU pragma: keep
#include <limits>
#include <memory>
#if defined(sun) || defined(__sun)
#include <stdlib.h>
#endif
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h" // IWYU pragma: keep
#include "arrow/util/optional.h"
#include "arrow/util/string.h"
#ifdef ARROW_JEMALLOC
// Needed to support jemalloc 3 and 4
#define JEMALLOC_MANGLE
// Explicitly link to our version of jemalloc
#include "jemalloc_ep/dist/include/jemalloc/jemalloc.h"
#endif
#ifdef ARROW_MIMALLOC
#include <mimalloc.h>
#endif
#ifdef ARROW_JEMALLOC
// Compile-time configuration for jemalloc options.
// Note the prefix ("je_arrow_") must match the symbol prefix given when
// building jemalloc.
// See discussion in https://github.com/jemalloc/jemalloc/issues/1621
// ARROW-6910(wesm): we found that jemalloc's default behavior with respect to
// dirty / muzzy pages (see definitions of these in the jemalloc documentation)
// conflicted with user expectations, and would even cause memory use problems
// in some cases. By enabling the background_thread option and reducing the
// decay time from 10 seconds to 1 seconds, memory is released more
// aggressively (and in the background) to the OS. This can be configured
// further by using the arrow::jemalloc_set_decay_ms API
#undef USE_JEMALLOC_BACKGROUND_THREAD
#ifndef __APPLE__
// ARROW-6977: jemalloc's background_thread isn't always enabled on macOS
#define USE_JEMALLOC_BACKGROUND_THREAD
#endif
// In debug mode, add memory poisoning on alloc / free
#ifdef NDEBUG
#define JEMALLOC_DEBUG_OPTIONS ""
#else
#define JEMALLOC_DEBUG_OPTIONS ",junk:true"
#endif
const char* je_arrow_malloc_conf =
("oversize_threshold:0"
#ifdef USE_JEMALLOC_BACKGROUND_THREAD
",dirty_decay_ms:1000"
",muzzy_decay_ms:1000"
",background_thread:true"
#else
// ARROW-6994: return memory immediately to the OS if the
// background_thread option isn't available
",dirty_decay_ms:0"
",muzzy_decay_ms:0"
#endif
JEMALLOC_DEBUG_OPTIONS); // NOLINT: whitespace/parens
#endif // ARROW_JEMALLOC
namespace arrow {
namespace {
constexpr size_t kAlignment = 64;
constexpr char kDefaultBackendEnvVar[] = "ARROW_DEFAULT_MEMORY_POOL";
enum class MemoryPoolBackend : uint8_t { System, Jemalloc, Mimalloc };
struct SupportedBackend {
const char* name;
MemoryPoolBackend backend;
};
// See ARROW-12248 for why we use static in-function singletons rather than
// global constants below (in SupportedBackends() and UserSelectedBackend()).
// In some contexts (especially R bindings) `default_memory_pool()` may be
// called before all globals are initialized, and then the ARROW_DEFAULT_MEMORY_POOL
// environment variable would be ignored.
const std::vector<SupportedBackend>& SupportedBackends() {
static std::vector<SupportedBackend> backends = {
// ARROW-12316: Apple => mimalloc first, then jemalloc
// non-Apple => jemalloc first, then mimalloc
#if defined(ARROW_JEMALLOC) && !defined(__APPLE__)
{"jemalloc", MemoryPoolBackend::Jemalloc},
#endif
#ifdef ARROW_MIMALLOC
{"mimalloc", MemoryPoolBackend::Mimalloc},
#endif
#if defined(ARROW_JEMALLOC) && defined(__APPLE__)
{"jemalloc", MemoryPoolBackend::Jemalloc},
#endif
{"system", MemoryPoolBackend::System}
};
return backends;
}
// Return the MemoryPoolBackend selected by the user through the
// ARROW_DEFAULT_MEMORY_POOL environment variable, if any.
util::optional<MemoryPoolBackend> UserSelectedBackend() {
static auto user_selected_backend = []() -> util::optional<MemoryPoolBackend> {
auto unsupported_backend = [](const std::string& name) {
std::vector<std::string> supported;
for (const auto backend : SupportedBackends()) {
supported.push_back(std::string("'") + backend.name + "'");
}
ARROW_LOG(WARNING) << "Unsupported backend '" << name << "' specified in "
<< kDefaultBackendEnvVar << " (supported backends are "
<< internal::JoinStrings(supported, ", ") << ")";
};
auto maybe_name = internal::GetEnvVar(kDefaultBackendEnvVar);
if (!maybe_name.ok()) {
return {};
}
const auto name = *std::move(maybe_name);
if (name.empty()) {
// An empty environment variable is considered missing
return {};
}
const auto found = std::find_if(
SupportedBackends().begin(), SupportedBackends().end(),
[&](const SupportedBackend& backend) { return name == backend.name; });
if (found != SupportedBackends().end()) {
return found->backend;
}
unsupported_backend(name);
return {};
}();
return user_selected_backend;
}
MemoryPoolBackend DefaultBackend() {
auto backend = UserSelectedBackend();
if (backend.has_value()) {
return backend.value();
}
struct SupportedBackend default_backend = SupportedBackends().front();
return default_backend.backend;
}
// A static piece of memory for 0-size allocations, so as to return
// an aligned non-null pointer.
alignas(kAlignment) static uint8_t zero_size_area[1];
// Helper class directing allocations to the standard system allocator.
class SystemAllocator {
public:
// Allocate memory according to the alignment requirements for Arrow
// (as of May 2016 64 bytes)
static Status AllocateAligned(int64_t size, uint8_t** out) {
if (size == 0) {
*out = zero_size_area;
return Status::OK();
}
#ifdef _WIN32
// Special code path for Windows
*out = reinterpret_cast<uint8_t*>(
_aligned_malloc(static_cast<size_t>(size), kAlignment));
if (!*out) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
#elif defined(sun) || defined(__sun)
*out = reinterpret_cast<uint8_t*>(memalign(kAlignment, static_cast<size_t>(size)));
if (!*out) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
#else
const int result = posix_memalign(reinterpret_cast<void**>(out), kAlignment,
static_cast<size_t>(size));
if (result == ENOMEM) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
if (result == EINVAL) {
return Status::Invalid("invalid alignment parameter: ", kAlignment);
}
#endif
return Status::OK();
}
static Status ReallocateAligned(int64_t old_size, int64_t new_size, uint8_t** ptr) {
uint8_t* previous_ptr = *ptr;
if (previous_ptr == zero_size_area) {
DCHECK_EQ(old_size, 0);
return AllocateAligned(new_size, ptr);
}
if (new_size == 0) {
DeallocateAligned(previous_ptr, old_size);
*ptr = zero_size_area;
return Status::OK();
}
// Note: We cannot use realloc() here as it doesn't guarantee alignment.
// Allocate new chunk
uint8_t* out = nullptr;
RETURN_NOT_OK(AllocateAligned(new_size, &out));
DCHECK(out);
// Copy contents and release old memory chunk
memcpy(out, *ptr, static_cast<size_t>(std::min(new_size, old_size)));
#ifdef _WIN32
_aligned_free(*ptr);
#else
free(*ptr);
#endif // defined(_WIN32)
*ptr = out;
return Status::OK();
}
static void DeallocateAligned(uint8_t* ptr, int64_t size) {
if (ptr == zero_size_area) {
DCHECK_EQ(size, 0);
} else {
#ifdef _WIN32
_aligned_free(ptr);
#else
free(ptr);
#endif
}
}
};
#ifdef ARROW_JEMALLOC
// Helper class directing allocations to the jemalloc allocator.
class JemallocAllocator {
public:
static Status AllocateAligned(int64_t size, uint8_t** out) {
if (size == 0) {
*out = zero_size_area;
return Status::OK();
}
*out = reinterpret_cast<uint8_t*>(
mallocx(static_cast<size_t>(size), MALLOCX_ALIGN(kAlignment)));
if (*out == NULL) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
return Status::OK();
}
static Status ReallocateAligned(int64_t old_size, int64_t new_size, uint8_t** ptr) {
uint8_t* previous_ptr = *ptr;
if (previous_ptr == zero_size_area) {
DCHECK_EQ(old_size, 0);
return AllocateAligned(new_size, ptr);
}
if (new_size == 0) {
DeallocateAligned(previous_ptr, old_size);
*ptr = zero_size_area;
return Status::OK();
}
*ptr = reinterpret_cast<uint8_t*>(
rallocx(*ptr, static_cast<size_t>(new_size), MALLOCX_ALIGN(kAlignment)));
if (*ptr == NULL) {
*ptr = previous_ptr;
return Status::OutOfMemory("realloc of size ", new_size, " failed");
}
return Status::OK();
}
static void DeallocateAligned(uint8_t* ptr, int64_t size) {
if (ptr == zero_size_area) {
DCHECK_EQ(size, 0);
} else {
dallocx(ptr, MALLOCX_ALIGN(kAlignment));
}
}
};
#endif // defined(ARROW_JEMALLOC)
#ifdef ARROW_MIMALLOC
// Helper class directing allocations to the mimalloc allocator.
class MimallocAllocator {
public:
static Status AllocateAligned(int64_t size, uint8_t** out) {
if (size == 0) {
*out = zero_size_area;
return Status::OK();
}
*out = reinterpret_cast<uint8_t*>(
mi_malloc_aligned(static_cast<size_t>(size), kAlignment));
if (*out == NULL) {
return Status::OutOfMemory("malloc of size ", size, " failed");
}
return Status::OK();
}
static Status ReallocateAligned(int64_t old_size, int64_t new_size, uint8_t** ptr) {
uint8_t* previous_ptr = *ptr;
if (previous_ptr == zero_size_area) {
DCHECK_EQ(old_size, 0);
return AllocateAligned(new_size, ptr);
}
if (new_size == 0) {
DeallocateAligned(previous_ptr, old_size);
*ptr = zero_size_area;
return Status::OK();
}
*ptr = reinterpret_cast<uint8_t*>(
mi_realloc_aligned(previous_ptr, static_cast<size_t>(new_size), kAlignment));
if (*ptr == NULL) {
*ptr = previous_ptr;
return Status::OutOfMemory("realloc of size ", new_size, " failed");
}
return Status::OK();
}
static void DeallocateAligned(uint8_t* ptr, int64_t size) {
if (ptr == zero_size_area) {
DCHECK_EQ(size, 0);
} else {
mi_free(ptr);
}
}
};
#endif // defined(ARROW_MIMALLOC)
} // namespace
int64_t MemoryPool::max_memory() const { return -1; }
///////////////////////////////////////////////////////////////////////
// MemoryPool implementation that delegates its core duty
// to an Allocator class.
#ifndef NDEBUG
static constexpr uint8_t kAllocPoison = 0xBC;
static constexpr uint8_t kReallocPoison = 0xBD;
static constexpr uint8_t kDeallocPoison = 0xBE;
#endif
template <typename Allocator>
class BaseMemoryPoolImpl : public MemoryPool {
public:
~BaseMemoryPoolImpl() override {}
Status Allocate(int64_t size, uint8_t** out) override {
if (size < 0) {
return Status::Invalid("negative malloc size");
}
if (static_cast<uint64_t>(size) >= std::numeric_limits<size_t>::max()) {
return Status::CapacityError("malloc size overflows size_t");
}
RETURN_NOT_OK(Allocator::AllocateAligned(size, out));
#ifndef NDEBUG
// Poison data
if (size > 0) {
DCHECK_NE(*out, nullptr);
(*out)[0] = kAllocPoison;
(*out)[size - 1] = kAllocPoison;
}
#endif
stats_.UpdateAllocatedBytes(size);
return Status::OK();
}
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
if (new_size < 0) {
return Status::Invalid("negative realloc size");
}
if (static_cast<uint64_t>(new_size) >= std::numeric_limits<size_t>::max()) {
return Status::CapacityError("realloc overflows size_t");
}
RETURN_NOT_OK(Allocator::ReallocateAligned(old_size, new_size, ptr));
#ifndef NDEBUG
// Poison data
if (new_size > old_size) {
DCHECK_NE(*ptr, nullptr);
(*ptr)[old_size] = kReallocPoison;
(*ptr)[new_size - 1] = kReallocPoison;
}
#endif
stats_.UpdateAllocatedBytes(new_size - old_size);
return Status::OK();
}
void Free(uint8_t* buffer, int64_t size) override {
#ifndef NDEBUG
// Poison data
if (size > 0) {
DCHECK_NE(buffer, nullptr);
buffer[0] = kDeallocPoison;
buffer[size - 1] = kDeallocPoison;
}
#endif
Allocator::DeallocateAligned(buffer, size);
stats_.UpdateAllocatedBytes(-size);
}
int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
int64_t max_memory() const override { return stats_.max_memory(); }
protected:
internal::MemoryPoolStats stats_;
};
class SystemMemoryPool : public BaseMemoryPoolImpl<SystemAllocator> {
public:
std::string backend_name() const override { return "system"; }
};
#ifdef ARROW_JEMALLOC
class JemallocMemoryPool : public BaseMemoryPoolImpl<JemallocAllocator> {
public:
std::string backend_name() const override { return "jemalloc"; }
};
#endif
#ifdef ARROW_MIMALLOC
class MimallocMemoryPool : public BaseMemoryPoolImpl<MimallocAllocator> {
public:
std::string backend_name() const override { return "mimalloc"; }
};
#endif
std::unique_ptr<MemoryPool> MemoryPool::CreateDefault() {
auto backend = DefaultBackend();
switch (backend) {
case MemoryPoolBackend::System:
return std::unique_ptr<MemoryPool>(new SystemMemoryPool);
#ifdef ARROW_JEMALLOC
case MemoryPoolBackend::Jemalloc:
return std::unique_ptr<MemoryPool>(new JemallocMemoryPool);
#endif
#ifdef ARROW_MIMALLOC
case MemoryPoolBackend::Mimalloc:
return std::unique_ptr<MemoryPool>(new MimallocMemoryPool);
#endif
default:
ARROW_LOG(FATAL) << "Internal error: cannot create default memory pool";
return nullptr;
}
}
static SystemMemoryPool system_pool;
#ifdef ARROW_JEMALLOC
static JemallocMemoryPool jemalloc_pool;
#endif
#ifdef ARROW_MIMALLOC
static MimallocMemoryPool mimalloc_pool;
#endif
MemoryPool* system_memory_pool() { return &system_pool; }
Status jemalloc_memory_pool(MemoryPool** out) {
#ifdef ARROW_JEMALLOC
*out = &jemalloc_pool;
return Status::OK();
#else
return Status::NotImplemented("This Arrow build does not enable jemalloc");
#endif
}
Status mimalloc_memory_pool(MemoryPool** out) {
#ifdef ARROW_MIMALLOC
*out = &mimalloc_pool;
return Status::OK();
#else
return Status::NotImplemented("This Arrow build does not enable mimalloc");
#endif
}
MemoryPool* default_memory_pool() {
auto backend = DefaultBackend();
switch (backend) {
case MemoryPoolBackend::System:
return &system_pool;
#ifdef ARROW_JEMALLOC
case MemoryPoolBackend::Jemalloc:
return &jemalloc_pool;
#endif
#ifdef ARROW_MIMALLOC
case MemoryPoolBackend::Mimalloc:
return &mimalloc_pool;
#endif
default:
ARROW_LOG(FATAL) << "Internal error: cannot create default memory pool";
return nullptr;
}
}
#define RETURN_IF_JEMALLOC_ERROR(ERR) \
do { \
if (err != 0) { \
return Status::UnknownError(std::strerror(ERR)); \
} \
} while (0)
Status jemalloc_set_decay_ms(int ms) {
#ifdef ARROW_JEMALLOC
ssize_t decay_time_ms = static_cast<ssize_t>(ms);
int err = mallctl("arenas.dirty_decay_ms", nullptr, nullptr, &decay_time_ms,
sizeof(decay_time_ms));
RETURN_IF_JEMALLOC_ERROR(err);
err = mallctl("arenas.muzzy_decay_ms", nullptr, nullptr, &decay_time_ms,
sizeof(decay_time_ms));
RETURN_IF_JEMALLOC_ERROR(err);
return Status::OK();
#else
return Status::Invalid("jemalloc support is not built");
#endif
}
///////////////////////////////////////////////////////////////////////
// LoggingMemoryPool implementation
LoggingMemoryPool::LoggingMemoryPool(MemoryPool* pool) : pool_(pool) {}
Status LoggingMemoryPool::Allocate(int64_t size, uint8_t** out) {
Status s = pool_->Allocate(size, out);
std::cout << "Allocate: size = " << size << std::endl;
return s;
}
Status LoggingMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
Status s = pool_->Reallocate(old_size, new_size, ptr);
std::cout << "Reallocate: old_size = " << old_size << " - new_size = " << new_size
<< std::endl;
return s;
}
void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size) {
pool_->Free(buffer, size);
std::cout << "Free: size = " << size << std::endl;
}
int64_t LoggingMemoryPool::bytes_allocated() const {
int64_t nb_bytes = pool_->bytes_allocated();
std::cout << "bytes_allocated: " << nb_bytes << std::endl;
return nb_bytes;
}
int64_t LoggingMemoryPool::max_memory() const {
int64_t mem = pool_->max_memory();
std::cout << "max_memory: " << mem << std::endl;
return mem;
}
std::string LoggingMemoryPool::backend_name() const { return pool_->backend_name(); }
///////////////////////////////////////////////////////////////////////
// ProxyMemoryPool implementation
class ProxyMemoryPool::ProxyMemoryPoolImpl {
public:
explicit ProxyMemoryPoolImpl(MemoryPool* pool) : pool_(pool) {}
Status Allocate(int64_t size, uint8_t** out) {
RETURN_NOT_OK(pool_->Allocate(size, out));
stats_.UpdateAllocatedBytes(size);
return Status::OK();
}
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
stats_.UpdateAllocatedBytes(new_size - old_size);
return Status::OK();
}
void Free(uint8_t* buffer, int64_t size) {
pool_->Free(buffer, size);
stats_.UpdateAllocatedBytes(-size);
}
int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
int64_t max_memory() const { return stats_.max_memory(); }
std::string backend_name() const { return pool_->backend_name(); }
private:
MemoryPool* pool_;
internal::MemoryPoolStats stats_;
};
ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) {
impl_.reset(new ProxyMemoryPoolImpl(pool));
}
ProxyMemoryPool::~ProxyMemoryPool() {}
Status ProxyMemoryPool::Allocate(int64_t size, uint8_t** out) {
return impl_->Allocate(size, out);
}
Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
return impl_->Reallocate(old_size, new_size, ptr);
}
void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size) {
return impl_->Free(buffer, size);
}
int64_t ProxyMemoryPool::bytes_allocated() const { return impl_->bytes_allocated(); }
int64_t ProxyMemoryPool::max_memory() const { return impl_->max_memory(); }
std::string ProxyMemoryPool::backend_name() const { return impl_->backend_name(); }
std::vector<std::string> SupportedMemoryBackendNames() {
std::vector<std::string> supported;
for (const auto backend : SupportedBackends()) {
supported.push_back(backend.name);
}
return supported;
}
} // namespace arrow