blob: 032a12fcfac360f5b5fca9b181a81bac43d13fa4 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// PLASMA STORE: This is a simple object store server process
//
// It accepts incoming client connections on a unix domain socket
// (name passed in via the -s option of the executable) and uses a
// single thread to serve the clients. Each client establishes a
// connection and can create objects, wait for objects and seal
// objects through that connection.
//
// It keeps a hash table that maps object_ids (which are 20 byte long,
// just enough to store a SHA1 hash) to memory mapped files.
#include "plasma/store.h"
#include <assert.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/statvfs.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <ctime>
#include <deque>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include "arrow/status.h"
#include "arrow/util/config.h"
#include "plasma/common.h"
#include "plasma/common_generated.h"
#include "plasma/fling.h"
#include "plasma/io.h"
#include "plasma/malloc.h"
#include "plasma/plasma_allocator.h"
#include "plasma/protocol.h"
#ifdef PLASMA_CUDA
#include "arrow/gpu/cuda_api.h"
using arrow::cuda::CudaBuffer;
using arrow::cuda::CudaContext;
using arrow::cuda::CudaDeviceManager;
#endif
using arrow::util::ArrowLog;
using arrow::util::ArrowLogLevel;
namespace fb = plasma::flatbuf;
namespace plasma {
void SetMallocGranularity(int value);
/// Bookkeeping for one outstanding Get call issued by a client. A request is
/// heap-allocated in ProcessGetRequest and destroyed in RemoveGetRequest.
struct GetRequest {
/// Creates a request on behalf of `client` for the given `object_ids`.
GetRequest(Client* client, const std::vector<ObjectID>& object_ids);
/// The client that called get.
Client* client;
/// The ID of the timer that will time out and cause this wait to return to
/// the client if it hasn't already returned. The value -1 means that no timer
/// is registered for this request.
int64_t timer;
/// The object IDs involved in this request, in the order supplied by the
/// caller (duplicates are kept). This is used in the reply.
std::vector<ObjectID> object_ids;
/// The object information for the objects in this request, keyed by object
/// ID. This is used in the reply. An entry whose data_size is -1 marks an
/// object that is not present.
std::unordered_map<ObjectID, PlasmaObject> objects;
/// The minimum number of objects to wait for in this request. The
/// constructor sets this to the number of distinct IDs in `object_ids`.
int64_t num_objects_to_wait_for;
/// The number of object requests in this wait request that are already
/// satisfied.
int64_t num_satisfied;
};
/// Initializes a get request issued by `client` for `object_ids`.
///
/// The ID list is copied as given, no timer is attached yet (timer == -1) and
/// nothing is satisfied. The number of objects to wait for is the number of
/// distinct IDs, so repeated IDs in the request count once.
GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)
    : client(client),
      timer(-1),
      object_ids(object_ids),
      objects(object_ids.size()),
      num_satisfied(0) {
  // Collapse duplicates to find out how many different objects are requested.
  const std::unordered_set<ObjectID> distinct_ids(object_ids.begin(), object_ids.end());
  num_objects_to_wait_for = distinct_ids.size();
}
/// Wraps the connection socket `fd`. The client starts without a notification
/// socket (notification_fd == -1) until it subscribes to updates.
Client::Client(int fd)
    : fd(fd),
      notification_fd(-1) {
}
/// Builds a store that is driven by the event loop `loop`.
///
/// \param loop Event loop used for socket callbacks and timers.
/// \param directory Recorded in the store info as the backing directory.
/// \param hugepages_enabled Recorded in the store info.
/// \param socket_name Not used by this constructor.
/// \param external_store Optional external store; may be null.
PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled,
                         const std::string& socket_name,
                         std::shared_ptr<ExternalStore> external_store)
    : loop_(loop),
      eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()),
      external_store_(external_store) {
  // The eviction policy above only keeps a pointer to store_info_; the fields
  // themselves are filled in here.
  store_info_.hugepages_enabled = hugepages_enabled;
  store_info_.directory = directory;
}
/// Destructor. Performs no explicit cleanup of its own.
// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
PlasmaStore::~PlasmaStore() {
}
/// Returns a read-only pointer to the store's internal bookkeeping structure.
/// The pointer refers to a member of this store.
const PlasmaStoreInfo* PlasmaStore::GetPlasmaStoreInfo() {
  const PlasmaStoreInfo* info = &store_info_;
  return info;
}
/// Records that `client` is using the object `object_id`.
///
/// Does nothing if the client already holds the object. Otherwise the entry's
/// reference count is incremented and the ID is added to the client's set. When
/// the object had no users before, the eviction policy is told that access
/// has begun.
void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
                                       Client* client) {
  // The client already holds a reference: nothing to update.
  if (client->object_ids.count(object_id) != 0) {
    return;
  }
  // First user of this object, so let the eviction policy know it is in use.
  const bool first_user = (entry->ref_count == 0);
  if (first_user) {
    eviction_policy_.BeginObjectAccess(object_id);
  }
  entry->ref_count += 1;
  client->object_ids.insert(object_id);
}
/// Allocates `size` bytes from the Plasma allocator, evicting if permitted.
///
/// \param size Number of bytes to allocate.
/// \param evict_if_full If true, enforce the per-client quota first and evict
///        objects until the allocation succeeds or no more space can be freed.
/// \param fd,map_size,offset Filled with the mapping information of the
///        returned pointer; only written when the allocation succeeds.
/// \param client The client on whose behalf memory is allocated.
/// \param is_create Forwarded to the per-client quota check.
/// \return The allocated pointer, or nullptr if no memory could be obtained.
uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, int* fd,
                                     int64_t* map_size, ptrdiff_t* offset, Client* client,
                                     bool is_create) {
  if (evict_if_full) {
    // Quota enforcement comes first; it may name objects from the client's own
    // queue that have to go.
    std::vector<ObjectID> quota_victims;
    if (!eviction_policy_.EnforcePerClientQuota(client, size, is_create,
                                                &quota_victims)) {
      return nullptr;
    }
    EvictObjects(quota_victims);
  }

  uint8_t* block = nullptr;
  bool keep_trying = true;
  while (keep_trying) {
    // Memalign rather than malloc is used to get a 64-byte aligned region.
    // This is not strictly necessary, but it can speed up hashing of the data
    // (see compute_object_hash_parallel in plasma_client.cc). The matching
    // pointer in the client is not guaranteed to be 64-byte aligned, although
    // in practice it often will be.
    block = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size));
    // Stop on success, or on failure when eviction is not allowed.
    if (block != nullptr || !evict_if_full) {
      break;
    }
    // Ask the eviction policy to free enough space and retry. If it could not
    // free enough, give up after evicting whatever it selected.
    std::vector<ObjectID> victims;
    keep_trying = eviction_policy_.RequireSpace(size, &victims);
    EvictObjects(victims);
  }

  if (block == nullptr) {
    return nullptr;
  }
  GetMallocMapinfo(block, fd, map_size, offset);
  ARROW_CHECK(*fd != -1);
  return block;
}
#ifdef PLASMA_CUDA
/// Looks up the CUDA context for the Plasma device number `device_num`.
///
/// Plasma device numbers are offset by one relative to the index passed to the
/// CUDA device manager; `device_num` must therefore not be 0.
arrow::Result<std::shared_ptr<CudaContext>> PlasmaStore::GetCudaContext(int device_num) {
  DCHECK_NE(device_num, 0);
  ARROW_ASSIGN_OR_RAISE(auto device_manager, CudaDeviceManager::Instance());
  const int cuda_device_index = device_num - 1;
  return device_manager->GetContext(cuda_device_index);
}
/// Allocates `size` bytes on the GPU identified by `device_num`.
///
/// \param[out] out_pointer Receives the device address of the allocation.
/// \param[out] out_ipc_handle Receives the IPC handle exported for the buffer.
/// \return The first error from context lookup, allocation or IPC export.
Status PlasmaStore::AllocateCudaMemory(
    int device_num, int64_t size, uint8_t** out_pointer,
    std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle) {
  ARROW_ASSIGN_OR_RAISE(auto cuda_context, GetCudaContext(device_num));
  ARROW_ASSIGN_OR_RAISE(auto device_buffer, cuda_context->Allocate(size));
  *out_pointer = reinterpret_cast<uint8_t*>(device_buffer->address());
  // The IPC handle will keep the buffer memory alive
  return device_buffer->ExportForIpc().Value(out_ipc_handle);
}
/// Releases `size` bytes at `pointer` on the GPU identified by `device_num`.
/// \return An error if the context lookup or the free operation fails.
Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t* pointer) {
  ARROW_ASSIGN_OR_RAISE(auto cuda_context, GetCudaContext(device_num));
  RETURN_NOT_OK(cuda_context->Free(pointer, size));
  return Status::OK();
}
#endif
/// Creates a new, unsealed object and registers it in the object table.
///
/// \param object_id ID of the object to create.
/// \param evict_if_full Whether other objects may be evicted to make room.
/// \param data_size Size of the data section in bytes.
/// \param metadata_size Size of the metadata section in bytes.
/// \param device_num 0 for host memory; any other value selects the CUDA path.
/// \param client The creating client; it is recorded as a user of the object.
/// \param[out] result Filled with the location of the new object.
/// \return ObjectExists if the ID is already in the table, OutOfMemory if no
///         memory could be allocated (or CUDA was requested but is not
///         compiled in), and OK otherwise.
PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_full,
                                      int64_t data_size, int64_t metadata_size,
                                      int device_num, Client* client,
                                      PlasmaObject* result) {
  ARROW_LOG(DEBUG) << "creating object " << object_id.hex();

  // An object with this ID is already known to the store: refuse the request.
  if (GetObjectTableEntry(&store_info_, object_id) != nullptr) {
    return PlasmaError::ObjectExists;
  }

  int fd = -1;
  int64_t map_size = 0;
  ptrdiff_t offset = 0;
  uint8_t* pointer = nullptr;
  const auto total_size = data_size + metadata_size;

  if (device_num != 0) {
#ifdef PLASMA_CUDA
    /// IPC GPU handle to share with clients.
    std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle;
    auto st = AllocateCudaMemory(device_num, total_size, &pointer, &ipc_handle);
    if (!st.ok()) {
      ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString();
      return PlasmaError::OutOfMemory;
    }
    result->ipc_handle = ipc_handle;
#else
    ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled";
    return PlasmaError::OutOfMemory;
#endif
  } else {
    pointer =
        AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true);
    if (pointer == nullptr) {
      ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex()
                       << ", data_size=" << data_size
                       << ", metadata_size=" << metadata_size
                       << ", will send a reply of PlasmaError::OutOfMemory";
      return PlasmaError::OutOfMemory;
    }
  }

  // Insert a fresh table entry and describe the new allocation in it.
  auto inserted = store_info_.objects.emplace(
      object_id, std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry()));
  ObjectTableEntry* entry = inserted.first->second.get();
  entry->data_size = data_size;
  entry->metadata_size = metadata_size;
  entry->pointer = pointer;
  // TODO(pcm): Set the other fields.
  entry->fd = fd;
  entry->map_size = map_size;
  entry->offset = offset;
  entry->state = ObjectState::PLASMA_CREATED;
  entry->device_num = device_num;
  entry->create_time = std::time(nullptr);
  entry->construct_duration = -1;
#ifdef PLASMA_CUDA
  entry->ipc_handle = result->ipc_handle;
#endif

  // Describe the object to the caller; metadata directly follows the data.
  result->store_fd = fd;
  result->data_offset = offset;
  result->metadata_offset = offset + data_size;
  result->data_size = data_size;
  result->metadata_size = metadata_size;
  result->device_num = device_num;

  // Notify the eviction policy that this object was created. This must be done
  // immediately before the call to AddToClientObjectIds so that the
  // eviction policy does not have an opportunity to evict the object.
  eviction_policy_.ObjectCreated(object_id, client, true);
  // The creator is the first user of the object.
  AddToClientObjectIds(object_id, entry, client);
  return PlasmaError::OK;
}
/// Fills `object` with the location of the sealed table entry `entry`.
///
/// Both pointers must be non-null and the entry must be in the sealed state
/// (checked in debug builds). The metadata offset is the data offset plus the
/// data size.
void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
  DCHECK(object != nullptr);
  DCHECK(entry != nullptr);
  DCHECK(entry->state == ObjectState::PLASMA_SEALED);
#ifdef PLASMA_CUDA
  // Only objects that live on a GPU carry an IPC handle.
  if (entry->device_num != 0) {
    object->ipc_handle = entry->ipc_handle;
  }
#endif
  object->device_num = entry->device_num;
  object->store_fd = entry->fd;
  object->data_size = entry->data_size;
  object->metadata_size = entry->metadata_size;
  object->data_offset = entry->offset;
  object->metadata_offset = entry->offset + entry->data_size;
}
/// Unregisters `get_request` everywhere and destroys it.
///
/// The request is removed from the waiter list of each of its object IDs (it
/// should only still be listed there if it timed out or its client has
/// disconnected), its timer is cancelled if one is set, and the request is
/// deleted. The pointer must not be used afterwards.
void PlasmaStore::RemoveGetRequest(GetRequest* get_request) {
  for (const ObjectID& id : get_request->object_ids) {
    auto waiters_it = object_get_requests_.find(id);
    if (waiters_it == object_get_requests_.end()) {
      continue;
    }
    auto& waiters = waiters_it->second;
    auto position = std::find(waiters.begin(), waiters.end(), get_request);
    if (position == waiters.end()) {
      continue;
    }
    waiters.erase(position);
    // Drop the map entry once nobody is waiting for this object anymore.
    if (waiters.empty()) {
      object_get_requests_.erase(waiters_it);
    }
  }
  // A timer value of -1 means that no timer was registered.
  if (get_request->timer != -1) {
    ARROW_CHECK(loop_->RemoveTimer(get_request->timer) == kEventLoopOk);
  }
  delete get_request;
}
/// Removes the pending get request (if any) that was issued by `client`.
///
/// The requests are collected first and removed afterwards, because removal
/// modifies object_get_requests_.
void PlasmaStore::RemoveGetRequestsForClient(Client* client) {
  std::unordered_set<GetRequest*> pending;
  for (const auto& waiters : object_get_requests_) {
    for (GetRequest* request : waiters.second) {
      if (request->client != client) {
        continue;
      }
      pending.insert(request);
    }
  }

  // It shouldn't be possible for a given client to be in the middle of multiple get
  // requests.
  ARROW_CHECK(pending.size() <= 1);
  for (GetRequest* request : pending) {
    RemoveGetRequest(request);
  }
}
/// Sends the reply for `get_req` to its client and destroys the request.
///
/// The reply lists every requested object. For objects that are present, the
/// distinct store file descriptors and their mmap sizes are included, and any
/// descriptor the client has not received before is sent after the reply.
/// `get_req` is deleted by this call and must not be used afterwards.
void PlasmaStore::ReturnFromGet(GetRequest* get_req) {
  Client* client = get_req->client;

  // Figure out which file descriptors the reply refers to (each one once).
  std::unordered_set<int> fds_to_send;
  std::vector<int> store_fds;
  std::vector<int64_t> mmap_sizes;
  for (const auto& object_id : get_req->object_ids) {
    const PlasmaObject& object = get_req->objects[object_id];
    const int fd = object.store_fd;
    // A data size of -1 marks an absent object; -1 is also not a valid fd.
    if (object.data_size != -1 && fd != -1 && fds_to_send.insert(fd).second) {
      store_fds.push_back(fd);
      mmap_sizes.push_back(GetMmapSize(fd));
    }
  }

  // Send the get reply to the client. data() is used instead of &ids[0] so
  // that a request without any object IDs does not index an empty vector.
  Status s = SendGetReply(client->fd, get_req->object_ids.data(), get_req->objects,
                          get_req->object_ids.size(), store_fds, mmap_sizes);
  WarnIfSigpipe(s.ok() ? 0 : -1, client->fd);

  // If we successfully sent the get reply message to the client, then also send
  // the file descriptors.
  if (s.ok()) {
    for (int store_fd : store_fds) {
      // Only send the file descriptor if it hasn't been sent (see analogous
      // logic in GetStoreFd in client.cc).
      if (client->used_fds.find(store_fd) == client->used_fds.end()) {
        WarnIfSigpipe(send_fd(client->fd, store_fd), client->fd);
        client->used_fds.insert(store_fd);
      }
    }
  }

  // Remove the get request from each of the relevant object_get_requests hash
  // tables if it is present there. It should only be present there if the get
  // request timed out.
  RemoveGetRequest(get_req);
}
/// Satisfies the pending get requests that are waiting for `object_id`.
///
/// Called from SealObjects once the object has been sealed. Every waiting
/// request gets the object's location filled in and its client is recorded as
/// a user of the object. Requests that are thereby complete are answered (and
/// destroyed) via ReturnFromGet. Afterwards no request waits for this ID.
void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) {
auto it = object_get_requests_.find(object_id);
// If there are no get requests involving this object, then return.
if (it == object_get_requests_.end()) {
return;
}
auto& get_requests = it->second;
// After finishing the loop below, get_requests and it will have been
// invalidated by the removal of object_id from object_get_requests_.
// The number of iterations is fixed up front because ReturnFromGet shrinks
// the vector while we walk it; `index` tracks the next unprocessed slot.
// NOTE(review): this appears to assume each GetRequest is listed at most once
// in this vector; a request listed twice would be deleted by ReturnFromGet
// while a second pointer to it remains here -- TODO confirm whether a request
// with duplicate IDs can reach this point.
size_t index = 0;
size_t num_requests = get_requests.size();
for (size_t i = 0; i < num_requests; ++i) {
auto get_req = get_requests[index];
auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != nullptr);
PlasmaObject_init(&get_req->objects[object_id], entry);
get_req->num_satisfied += 1;
// Record the fact that this client will be using this object and will
// be responsible for releasing this object.
AddToClientObjectIds(object_id, entry, get_req->client);
// If this get request is done, reply to the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
ReturnFromGet(get_req);
} else {
// The call to ReturnFromGet will remove the current element in the
// array, so we only increment the counter in the else branch.
index += 1;
}
}
// No get requests should be waiting for this object anymore. The object ID
// may have been removed from the object_get_requests_ by ReturnFromGet, but
// if the get request has not returned yet, then remove the object ID from the
// map here.
it = object_get_requests_.find(object_id);
if (it != object_get_requests_.end()) {
object_get_requests_.erase(object_id);
}
}
/// Handles a Get call from `client` for `object_ids`.
///
/// Sealed objects are added to the reply right away. Objects that were evicted
/// to the external store get memory re-allocated and are fetched back in one
/// batch. All other objects are marked absent (data_size == -1) and the
/// request is registered as a waiter for them.
///
/// \param client The requesting client.
/// \param object_ids The requested object IDs.
/// \param timeout_ms 0 replies immediately with whatever is available, -1
///        waits without a timer, any other value arms a timer after which the
///        reply is sent.
void PlasmaStore::ProcessGetRequest(Client* client,
                                    const std::vector<ObjectID>& object_ids,
                                    int64_t timeout_ms) {
  // Create a get request for this object.
  auto get_req = new GetRequest(client, object_ids);
  std::vector<ObjectID> evicted_ids;
  std::vector<ObjectTableEntry*> evicted_entries;
  for (const auto& object_id : object_ids) {
    // Check if this object is already present locally. If so, record that the
    // object is being used and mark it as accounted for.
    auto entry = GetObjectTableEntry(&store_info_, object_id);
    if (entry && entry->state == ObjectState::PLASMA_SEALED) {
      // Update the get request to take into account the present object.
      PlasmaObject_init(&get_req->objects[object_id], entry);
      get_req->num_satisfied += 1;
      // If necessary, record that this client is using this object. In the case
      // where entry == NULL, this will be called from SealObject.
      AddToClientObjectIds(object_id, entry, client);
    } else if (entry && entry->state == ObjectState::PLASMA_EVICTED) {
      // Make sure the object pointer is not already allocated
      ARROW_CHECK(!entry->pointer);

      entry->pointer =
          AllocateMemory(entry->data_size + entry->metadata_size, /*evict=*/true,
                         &entry->fd, &entry->map_size, &entry->offset, client, false);
      if (entry->pointer) {
        entry->state = ObjectState::PLASMA_CREATED;
        entry->create_time = std::time(nullptr);
        eviction_policy_.ObjectCreated(object_id, client, false);
        AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
        evicted_ids.push_back(object_id);
        evicted_entries.push_back(entry);
      } else {
        // We are out of memory and cannot allocate memory for this object.
        // Change the state of the object back to PLASMA_EVICTED so some
        // other request can try again.
        entry->state = ObjectState::PLASMA_EVICTED;
        // Report the object as absent. Without this the reply would carry a
        // default-constructed PlasmaObject, which ReturnFromGet does not
        // recognize as missing because it looks for data_size == -1.
        get_req->objects[object_id].data_size = -1;
      }
    } else {
      // Add a placeholder plasma object to the get request to indicate that the
      // object is not present. This will be parsed by the client. We set the
      // data size to -1 to indicate that the object is not present.
      get_req->objects[object_id].data_size = -1;
      // Add the get request to the relevant data structures.
      object_get_requests_[object_id].push_back(get_req);
    }
  }

  if (!evicted_ids.empty()) {
    unsigned char digest[kDigestSize] = {};
    std::vector<std::shared_ptr<Buffer>> buffers;
    buffers.reserve(evicted_ids.size());
    for (size_t i = 0; i < evicted_ids.size(); ++i) {
      ARROW_CHECK(evicted_entries[i]->pointer != nullptr);
      // EvictObjects hands data and metadata to the external store as one
      // buffer, so the destination has to cover both sections as well.
      buffers.emplace_back(new arrow::MutableBuffer(
          evicted_entries[i]->pointer,
          evicted_entries[i]->data_size + evicted_entries[i]->metadata_size));
    }
    if (external_store_->Get(evicted_ids, buffers).ok()) {
      for (size_t i = 0; i < evicted_ids.size(); ++i) {
        evicted_entries[i]->state = ObjectState::PLASMA_SEALED;
        // The digest of a restored object is set to all zeros.
        std::memcpy(&evicted_entries[i]->digest[0], &digest[0], kDigestSize);
        evicted_entries[i]->construct_duration =
            std::time(nullptr) - evicted_entries[i]->create_time;
        PlasmaObject_init(&get_req->objects[evicted_ids[i]], evicted_entries[i]);
        get_req->num_satisfied += 1;
      }
    } else {
      // We tried to get the objects from the external store, but could not get them.
      // Set the state of these objects back to PLASMA_EVICTED so some other request
      // can try again.
      // NOTE(review): the memory allocated above and the client's reference
      // stay in place here, so a later get for the same object would hit the
      // ARROW_CHECK(!entry->pointer) above -- needs a proper rollback.
      for (size_t i = 0; i < evicted_ids.size(); ++i) {
        evicted_entries[i]->state = ObjectState::PLASMA_EVICTED;
        // Report the object as absent in the reply (see the comment in the
        // out-of-memory branch above).
        get_req->objects[evicted_ids[i]].data_size = -1;
      }
    }
  }

  // If all of the objects are present already or if the timeout is 0, return to
  // the client.
  if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) {
    ReturnFromGet(get_req);
  } else if (timeout_ms != -1) {
    // Set a timer that will cause the get request to return to the client. Note
    // that a timeout of -1 is used to indicate that no timer should be set.
    get_req->timer = loop_->AddTimer(timeout_ms, [this, get_req](int64_t timer_id) {
      ReturnFromGet(get_req);
      return kEventLoopTimerDone;
    });
  }
}
/// Drops `client`'s reference to the object `object_id`.
///
/// When the last reference goes away, the object is either handed back to the
/// eviction policy or, if a deletion was requested while it was in use
/// (it is in the deletion cache), evicted right away.
///
/// \return 1 if the client was using the object and has been removed,
///         0 if the client was not using it.
int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id,
                                           ObjectTableEntry* entry, Client* client) {
  auto held = client->object_ids.find(object_id);
  if (held == client->object_ids.end()) {
    // The client was not using this object.
    return 0;
  }

  client->object_ids.erase(held);
  entry->ref_count--;
  if (entry->ref_count == 0) {
    const bool deletion_pending = deletion_cache_.count(object_id) != 0;
    if (deletion_pending) {
      // A delete was requested earlier while the object was still in use;
      // carry it out now that the last user is gone.
      deletion_cache_.erase(object_id);
      EvictObjects({object_id});
    } else {
      // Tell the eviction policy that this object is no longer being used.
      eviction_policy_.EndObjectAccess(object_id);
    }
  }
  return 1;
}
void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) {
auto& object = store_info_.objects[object_id];
auto buff_size = object->data_size + object->metadata_size;
if (object->device_num == 0) {
PlasmaAllocator::Free(object->pointer, buff_size);
} else {
#ifdef PLASMA_CUDA
ARROW_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer));
#endif
}
store_info_.objects.erase(object_id);
}
void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) {
auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != nullptr);
// Remove the client from the object's array of clients.
ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
}
/// Reports whether the store has the object `object_id`.
///
/// An object counts as found when its table entry is either sealed or evicted.
/// Unknown objects and objects in any other state are reported as not found.
ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) {
  const auto entry = GetObjectTableEntry(&store_info_, object_id);
  if (!entry) {
    return ObjectStatus::OBJECT_NOT_FOUND;
  }
  const bool available = entry->state == ObjectState::PLASMA_SEALED ||
                         entry->state == ObjectState::PLASMA_EVICTED;
  if (available) {
    return ObjectStatus::OBJECT_FOUND;
  }
  return ObjectStatus::OBJECT_NOT_FOUND;
}
void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& digests) {
std::vector<ObjectInfoT> infos;
ARROW_LOG(DEBUG) << "sealing " << object_ids.size() << " objects";
for (size_t i = 0; i < object_ids.size(); ++i) {
ObjectInfoT object_info;
auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
ARROW_CHECK(entry != nullptr);
ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED);
// Set the state of object to SEALED.
entry->state = ObjectState::PLASMA_SEALED;
// Set the object digest.
std::memcpy(&entry->digest[0], digests[i].c_str(), kDigestSize);
// Set object construction duration.
entry->construct_duration = std::time(nullptr) - entry->create_time;
object_info.object_id = object_ids[i].binary();
object_info.data_size = entry->data_size;
object_info.metadata_size = entry->metadata_size;
object_info.digest = digests[i];
infos.push_back(object_info);
}
PushNotifications(infos);
for (size_t i = 0; i < object_ids.size(); ++i) {
UpdateObjectGetRequests(object_ids[i]);
}
}
/// Aborts the creation of the unsealed object `object_id`.
///
/// The object must be in the table and must not be sealed (fatal checks). The
/// abort is only carried out when `client` is using the object.
///
/// \return 1 if the object was freed, 0 if the client is not its creator.
int PlasmaStore::AbortObject(const ObjectID& object_id, Client* client) {
  auto entry = GetObjectTableEntry(&store_info_, object_id);
  ARROW_CHECK(entry != nullptr) << "To abort an object it must be in the object table.";
  ARROW_CHECK(entry->state != ObjectState::PLASMA_SEALED)
      << "To abort an object it must not have been sealed.";

  auto owned = client->object_ids.find(object_id);
  if (owned != client->object_ids.end()) {
    // The client requesting the abort is the creator. Free the object first,
    // then forget the client's reference to it.
    EraseFromObjectTable(object_id);
    client->object_ids.erase(owned);
    return 1;
  }
  // If the client requesting the abort is not the creator, do not
  // perform the abort.
  return 0;
}
/// Deletes the object `object_id` if it is sealed and unused.
///
/// Objects that are not sealed yet or are still referenced are put into the
/// deletion cache instead, so that they can be removed later.
///
/// \return ObjectNotFound if the object is unknown, ObjectNotSealed or
///         ObjectInUse if the deletion had to be deferred, OK otherwise.
PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) {
  auto entry = GetObjectTableEntry(&store_info_, object_id);
  // TODO(rkn): This should probably not fail, but should instead throw an
  // error. Maybe we should also support deleting objects that have been
  // created but not sealed.
  if (entry == nullptr) {
    // To delete an object it must be in the object table.
    return PlasmaError::ObjectNotFound;
  }

  // To delete an object it must have been sealed and there must be no clients
  // currently using it. Otherwise remember the request in the deletion cache.
  const bool sealed = (entry->state == ObjectState::PLASMA_SEALED);
  if (!sealed || entry->ref_count != 0) {
    deletion_cache_.emplace(object_id);
    return sealed ? PlasmaError::ObjectInUse : PlasmaError::ObjectNotSealed;
  }

  eviction_policy_.RemoveObject(object_id);
  EraseFromObjectTable(object_id);

  // Inform all subscribers that the object has been deleted.
  fb::ObjectInfoT deletion;
  deletion.object_id = object_id.binary();
  deletion.is_deletion = true;
  PushNotification(&deletion);
  return PlasmaError::OK;
}
void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) {
if (object_ids.size() == 0) {
return;
}
std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data;
std::vector<ObjectTableEntry*> evicted_entries;
for (const auto& object_id : object_ids) {
ARROW_LOG(DEBUG) << "evicting object " << object_id.hex();
auto entry = GetObjectTableEntry(&store_info_, object_id);
// TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been
// created but not sealed.
ARROW_CHECK(entry != nullptr) << "To evict an object it must be in the object table.";
ARROW_CHECK(entry->state == ObjectState::PLASMA_SEALED)
<< "To evict an object it must have been sealed.";
ARROW_CHECK(entry->ref_count == 0)
<< "To evict an object, there must be no clients currently using it.";
// If there is a backing external store, then mark object for eviction to
// external store, free the object data pointer and keep a placeholder
// entry in ObjectTable
if (external_store_) {
evicted_object_data.push_back(std::make_shared<arrow::Buffer>(
entry->pointer, entry->data_size + entry->metadata_size));
evicted_entries.push_back(entry);
} else {
// If there is no backing external store, just erase the object entry
// and send a deletion notification.
EraseFromObjectTable(object_id);
// Inform all subscribers that the object has been deleted.
fb::ObjectInfoT notification;
notification.object_id = object_id.binary();
notification.is_deletion = true;
PushNotification(&notification);
}
}
if (external_store_ && !object_ids.empty()) {
ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data));
for (auto entry : evicted_entries) {
PlasmaAllocator::Free(entry->pointer, entry->data_size + entry->metadata_size);
entry->pointer = nullptr;
entry->state = ObjectState::PLASMA_EVICTED;
}
}
}
void PlasmaStore::ConnectClient(int listener_sock) {
int client_fd = AcceptClient(listener_sock);
Client* client = new Client(client_fd);
connected_clients_[client_fd] = std::unique_ptr<Client>(client);
// Add a callback to handle events on this socket.
// TODO(pcm): Check return value.
loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) {
Status s = ProcessMessage(client);
if (!s.ok()) {
ARROW_LOG(FATAL) << "Failed to process file event: " << s;
}
});
ARROW_LOG(DEBUG) << "New connection with fd " << client_fd;
}
void PlasmaStore::DisconnectClient(int client_fd) {
ARROW_CHECK(client_fd > 0);
auto it = connected_clients_.find(client_fd);
ARROW_CHECK(it != connected_clients_.end());
loop_->RemoveFileEvent(client_fd);
// Close the socket.
close(client_fd);
ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
// Release all the objects that the client was using.
auto client = it->second.get();
eviction_policy_.ClientDisconnected(client);
std::unordered_map<ObjectID, ObjectTableEntry*> sealed_objects;
for (const auto& object_id : client->object_ids) {
auto it = store_info_.objects.find(object_id);
if (it == store_info_.objects.end()) {
continue;
}
if (it->second->state == ObjectState::PLASMA_SEALED) {
// Add sealed objects to a temporary list of object IDs. Do not perform
// the remove here, since it potentially modifies the object_ids table.
sealed_objects[it->first] = it->second.get();
} else {
// Abort unsealed object.
// Don't call AbortObject() because client->object_ids would be modified.
EraseFromObjectTable(object_id);
}
}
/// Remove all of the client's GetRequests.
RemoveGetRequestsForClient(client);
for (const auto& entry : sealed_objects) {
RemoveFromClientObjectIds(entry.first, entry.second, client);
}
if (client->notification_fd > 0) {
// This client has subscribed for notifications.
auto notify_fd = client->notification_fd;
loop_->RemoveFileEvent(notify_fd);
// Close socket.
close(notify_fd);
// Remove notification queue for this fd from global map.
pending_notifications_.erase(notify_fd);
// Reset fd.
client->notification_fd = -1;
}
connected_clients_.erase(it);
}
/// Send notifications about sealed objects to the subscribers. This is called
/// in SealObject. If the socket's send buffer is full, the notification will
/// be buffered, and this will be called again when the send buffer has room.
/// Since we call erase on pending_notifications_, all iterators get
/// invalidated, which is why we return a valid iterator to the next client to
/// be used in PushNotification.
///
/// If the subscriber's socket reports a broken pipe, the socket is removed
/// from the event loop and closed, and the subscriber's queue is dropped.
///
/// \param it Iterator that points to the client to send the notification to.
/// \return Iterator pointing to the next client.
PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications(
    PlasmaStore::NotificationMap::iterator it) {
  int client_fd = it->first;
  auto& notifications = it->second.object_notifications;

  int num_processed = 0;
  bool closed = false;
  // Loop over the array of pending notifications and send as many of them as
  // possible.
  for (size_t i = 0; i < notifications.size(); ++i) {
    auto& notification = notifications.at(i);
    // Decode the length, which is the first bytes of the message.
    int64_t size = *(reinterpret_cast<int64_t*>(notification.get()));

    // Attempt to send a notification about this object ID.
    ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) + size, 0);
    // Capture errno immediately: the logging calls below may overwrite it.
    const int send_errno = errno;
    if (nbytes >= 0) {
      ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
    } else if (nbytes == -1 && (send_errno == EAGAIN || send_errno == EWOULDBLOCK ||
                                send_errno == EINTR)) {
      ARROW_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this "
                          "notification and will send it later.";
      // Add a callback to the event loop to send queued notifications whenever
      // there is room in the socket's send buffer. Callbacks can be added
      // more than once here and will be overwritten. The callback is removed
      // at the end of the method.
      // TODO(pcm): Introduce status codes and check in case the file descriptor
      // is added twice.
      loop_->AddFileEvent(client_fd, kEventLoopWrite, [this, client_fd](int events) {
        // The subscriber may be gone by the time the socket becomes writable,
        // so never hand an end() iterator to SendNotifications.
        auto pending = pending_notifications_.find(client_fd);
        if (pending != pending_notifications_.end()) {
          SendNotifications(pending);
        }
      });
      break;
    } else {
      ARROW_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd;
      if (send_errno == EPIPE) {
        closed = true;
        break;
      }
      // Any other error: the notification is dropped (counted as processed).
    }
    num_processed += 1;
  }
  // Remove the sent notifications from the array.
  notifications.erase(notifications.begin(), notifications.begin() + num_processed);

  // If we have sent all notifications, remove the fd from the event loop. The
  // same applies when the pipe is broken: the fd is about to be closed and
  // must not stay registered with a callback that refers to it.
  if (notifications.empty() || closed) {
    loop_->RemoveFileEvent(client_fd);
  }

  // Stop sending notifications if the pipe was broken.
  if (closed) {
    close(client_fd);
    return pending_notifications_.erase(it);
  } else {
    return ++it;
  }
}
/// Queues a notification about `object_info` for every subscriber and tries to
/// send each subscriber's queue right away.
void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info) {
  // SendNotifications returns the iterator of the next subscriber, which stays
  // valid even if the current subscriber had to be dropped.
  for (auto subscriber = pending_notifications_.begin();
       subscriber != pending_notifications_.end();
       subscriber = SendNotifications(subscriber)) {
    std::vector<fb::ObjectInfoT> single_info(1, *object_info);
    auto buffer = CreatePlasmaNotificationBuffer(single_info);
    subscriber->second.object_notifications.emplace_back(std::move(buffer));
  }
}
/// Queues one notification message covering all entries of `object_info` for
/// every subscriber and tries to send each subscriber's queue right away.
void PlasmaStore::PushNotifications(std::vector<fb::ObjectInfoT>& object_info) {
  // SendNotifications returns the iterator of the next subscriber, which stays
  // valid even if the current subscriber had to be dropped.
  for (auto subscriber = pending_notifications_.begin();
       subscriber != pending_notifications_.end();
       subscriber = SendNotifications(subscriber)) {
    auto buffer = CreatePlasmaNotificationBuffer(object_info);
    subscriber->second.object_notifications.emplace_back(std::move(buffer));
  }
}
/// Queues a notification about `object_info` for the single subscriber whose
/// notification socket is `client_fd` and tries to send its queue. Does
/// nothing if no subscriber is registered under that descriptor.
void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info, int client_fd) {
  auto subscriber = pending_notifications_.find(client_fd);
  if (subscriber == pending_notifications_.end()) {
    return;
  }
  std::vector<fb::ObjectInfoT> single_info(1, *object_info);
  auto buffer = CreatePlasmaNotificationBuffer(single_info);
  subscriber->second.object_notifications.emplace_back(std::move(buffer));
  SendNotifications(subscriber);
}
/// Subscribes `client` to notifications about sealed objects.
///
/// The notification socket is received from the client over its connection.
/// Once registered, the new subscriber is sent one notification for every
/// object that is currently sealed. A client that is already subscribed is
/// left untouched.
void PlasmaStore::SubscribeToUpdates(Client* client) {
  ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
  // This client has already subscribed: nothing to do.
  if (client->notification_fd > 0) {
    return;
  }

  // TODO(rkn): The store could block here if the client doesn't send a file
  // descriptor.
  const int notify_fd = recv_fd(client->fd);
  if (notify_fd < 0) {
    // This may mean that the client died before sending the file descriptor.
    ARROW_LOG(WARNING) << "Failed to receive file descriptor from client on fd "
                       << client->fd << ".";
    return;
  }

  // Accessing the map creates the (empty) notification queue for this fd,
  // which is needed for this client to receive notifications.
  pending_notifications_[notify_fd];
  client->notification_fd = notify_fd;

  // Push notifications to the new subscriber about existing sealed objects.
  for (const auto& table_entry : store_info_.objects) {
    const auto& object = table_entry.second;
    if (object->state != ObjectState::PLASMA_SEALED) {
      continue;
    }
    ObjectInfoT info;
    info.object_id = table_entry.first.binary();
    info.data_size = object->data_size;
    info.metadata_size = object->metadata_size;
    info.digest = std::string(reinterpret_cast<char*>(&object->digest[0]), kDigestSize);
    PushNotification(&info, notify_fd);
  }
}
// Read one request from the client's socket and dispatch on its message type.
//
// Most branches follow the same shape: parse the request out of input_buffer_
// with the matching Read*Request helper (a parse error is returned to the
// caller through RETURN_NOT_OK), perform the operation on the store, and send
// the reply on client->fd wrapped in HANDLE_SIGPIPE.
// NOTE(review): HANDLE_SIGPIPE and WarnIfSigpipe are defined outside this
// chunk; presumably they deal with a broken pipe to the client - confirm.
//
// \param client The client whose connection (client->fd) has a pending message.
// \return Status::OK() after the message was dispatched, or the error status
// propagated by RETURN_NOT_OK from one of the Read*Request helpers.
Status PlasmaStore::ProcessMessage(Client* client) {
fb::MessageType type;
Status s = ReadMessage(client->fd, &type, &input_buffer_);
// Only success or an I/O error is tolerated here; any other status fails the
// check.
// NOTE(review): after an I/O error the switch below still runs on `type`;
// presumably ReadMessage sets it to a disconnect type in that case - confirm.
ARROW_CHECK(s.ok() || s.IsIOError());
uint8_t* input = input_buffer_.data();
size_t input_size = input_buffer_.size();
ObjectID object_id;
PlasmaObject object = {};
// Process the different types of requests.
switch (type) {
// Create a new (unsealed) object and reply with its description.
case fb::MessageType::PlasmaCreateRequest: {
bool evict_if_full;
int64_t data_size;
int64_t metadata_size;
int device_num;
RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &evict_if_full,
&data_size, &metadata_size, &device_num));
PlasmaError error_code = CreateObject(object_id, evict_if_full, data_size,
metadata_size, device_num, client, &object);
// The mmap size is only looked up for successfully created objects on
// device 0; otherwise 0 is reported.
int64_t mmap_size = 0;
if (error_code == PlasmaError::OK && device_num == 0) {
mmap_size = GetMmapSize(object.store_fd);
}
HANDLE_SIGPIPE(
SendCreateReply(client->fd, object_id, &object, error_code, mmap_size),
client->fd);
// Only send the file descriptor if it hasn't been sent (see analogous
// logic in GetStoreFd in client.cc). Similar in ReturnFromGet.
if (error_code == PlasmaError::OK && device_num == 0 &&
client->used_fds.find(object.store_fd) == client->used_fds.end()) {
WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd);
client->used_fds.insert(object.store_fd);
}
} break;
// Create an object from data inlined in the request and seal it right away.
case fb::MessageType::PlasmaCreateAndSealRequest: {
bool evict_if_full;
std::string data;
std::string metadata;
std::string digest;
digest.reserve(kDigestSize);
RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id,
&evict_if_full, &data, &metadata, &digest));
// CreateAndSeal currently only supports device_num = 0, which corresponds
// to the host.
int device_num = 0;
PlasmaError error_code = CreateObject(object_id, evict_if_full, data.size(),
metadata.size(), device_num, client, &object);
// If the object was successfully created, fill out the object data and seal it.
if (error_code == PlasmaError::OK) {
auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != nullptr);
// Write the inlined data and metadata into the allocated object.
// The metadata is placed directly behind the data.
std::memcpy(entry->pointer, data.data(), data.size());
std::memcpy(entry->pointer + data.size(), metadata.data(), metadata.size());
SealObjects({object_id}, {digest});
// Remove the client from the object's array of clients because the
// object is not being used by any client. The client was added to the
// object's array of clients in CreateObject. This is analogous to the
// Release call that happens in the client's Seal method.
ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
}
// Reply to the client.
HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code), client->fd);
} break;
// Batched variant of CreateAndSeal: either all objects are created and
// sealed, or the ones created so far are aborted.
case fb::MessageType::PlasmaCreateAndSealBatchRequest: {
bool evict_if_full;
std::vector<ObjectID> object_ids;
std::vector<std::string> data;
std::vector<std::string> metadata;
std::vector<std::string> digests;
RETURN_NOT_OK(ReadCreateAndSealBatchRequest(
input, input_size, &object_ids, &evict_if_full, &data, &metadata, &digests));
// NOTE(review): data[i] and metadata[i] below are indexed by positions in
// object_ids; this assumes the reader yields vectors of equal length - the
// lengths are not checked here, confirm they are validated upstream.
// CreateAndSeal currently only supports device_num = 0, which corresponds
// to the host.
int device_num = 0;
size_t i = 0;
PlasmaError error_code = PlasmaError::OK;
// Stop at the first failure; `i` then holds the number of objects that were
// created successfully and is used by the abort loop further down.
for (i = 0; i < object_ids.size(); i++) {
error_code = CreateObject(object_ids[i], evict_if_full, data[i].size(),
metadata[i].size(), device_num, client, &object);
if (error_code != PlasmaError::OK) {
break;
}
}
// if OK, seal all the objects,
// if error, abort the previous i objects immediately
if (error_code == PlasmaError::OK) {
for (i = 0; i < object_ids.size(); i++) {
auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
ARROW_CHECK(entry != nullptr);
// Write the inlined data and metadata into the allocated object.
std::memcpy(entry->pointer, data[i].data(), data[i].size());
std::memcpy(entry->pointer + data[i].size(), metadata[i].data(),
metadata[i].size());
}
SealObjects(object_ids, digests);
// Remove the client from the object's array of clients because the
// object is not being used by any client. The client was added to the
// object's array of clients in CreateObject. This is analogous to the
// Release call that happens in the client's Seal method.
for (i = 0; i < object_ids.size(); i++) {
auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
ARROW_CHECK(RemoveFromClientObjectIds(object_ids[i], entry, client) == 1);
}
} else {
for (size_t j = 0; j < i; j++) {
AbortObject(object_ids[j], client);
}
}
HANDLE_SIGPIPE(SendCreateAndSealBatchReply(client->fd, error_code), client->fd);
} break;
// Abort an object; the check fails unless AbortObject returns 1.
case fb::MessageType::PlasmaAbortRequest: {
RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
ARROW_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only "
"client currently using it "
"must be the creator.";
HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
} break;
// Get: no reply is sent from this branch; the request is handed to
// ProcessGetRequest.
case fb::MessageType::PlasmaGetRequest: {
std::vector<ObjectID> object_ids_to_get;
int64_t timeout_ms;
RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms));
ProcessGetRequest(client, object_ids_to_get, timeout_ms);
} break;
// Release: no reply is sent from this branch.
case fb::MessageType::PlasmaReleaseRequest: {
RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
ReleaseObject(object_id, client);
} break;
// Delete a list of objects and report one error code per object, in request
// order.
case fb::MessageType::PlasmaDeleteRequest: {
std::vector<ObjectID> object_ids;
std::vector<PlasmaError> error_codes;
RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids));
error_codes.reserve(object_ids.size());
// Note: this loop variable shadows the function-level `object_id`.
for (auto& object_id : object_ids) {
error_codes.push_back(DeleteObject(object_id));
}
HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_ids, error_codes), client->fd);
} break;
// Contains: reply 1 if the object was found, 0 otherwise.
case fb::MessageType::PlasmaContainsRequest: {
RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
if (ContainsObject(object_id) == ObjectStatus::OBJECT_FOUND) {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
} else {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
}
} break;
// List: reply with the store's object table.
case fb::MessageType::PlasmaListRequest: {
RETURN_NOT_OK(ReadListRequest(input, input_size));
HANDLE_SIGPIPE(SendListReply(client->fd, store_info_.objects), client->fd);
} break;
// Seal a single object; the reply always carries PlasmaError::OK.
case fb::MessageType::PlasmaSealRequest: {
std::string digest;
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest));
SealObjects({object_id}, {digest});
HANDLE_SIGPIPE(SendSealReply(client->fd, object_id, PlasmaError::OK), client->fd);
} break;
case fb::MessageType::PlasmaEvictRequest: {
// This code path should only be used for testing.
int64_t num_bytes;
RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes));
std::vector<ObjectID> objects_to_evict;
int64_t num_bytes_evicted =
eviction_policy_.ChooseObjectsToEvict(num_bytes, &objects_to_evict);
EvictObjects(objects_to_evict);
HANDLE_SIGPIPE(SendEvictReply(client->fd, num_bytes_evicted), client->fd);
} break;
// Forward the listed object ids to the eviction policy's RefreshObjects.
case fb::MessageType::PlasmaRefreshLRURequest: {
std::vector<ObjectID> object_ids;
RETURN_NOT_OK(ReadRefreshLRURequest(input, input_size, &object_ids));
eviction_policy_.RefreshObjects(object_ids);
HANDLE_SIGPIPE(SendRefreshLRUReply(client->fd), client->fd);
} break;
// Subscribe: no reply is sent from this branch.
case fb::MessageType::PlasmaSubscribeRequest:
SubscribeToUpdates(client);
break;
// Connect: reply with the allocator's footprint limit.
case fb::MessageType::PlasmaConnectRequest: {
HANDLE_SIGPIPE(SendConnectReply(client->fd, PlasmaAllocator::GetFootprintLimit()),
client->fd);
} break;
case fb::MessageType::PlasmaDisconnectClient:
ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
DisconnectClient(client->fd);
break;
// SetOptions: store the client's name and ask the eviction policy to set its
// quota; a rejected quota is reported as OutOfMemory.
case fb::MessageType::PlasmaSetOptionsRequest: {
std::string client_name;
int64_t output_memory_quota;
RETURN_NOT_OK(
ReadSetOptionsRequest(input, input_size, &client_name, &output_memory_quota));
client->name = client_name;
bool success = eviction_policy_.SetClientQuota(client, output_memory_quota);
HANDLE_SIGPIPE(SendSetOptionsReply(client->fd, success ? PlasmaError::OK
: PlasmaError::OutOfMemory),
client->fd);
} break;
// Debug string: reply with the eviction policy's DebugString().
case fb::MessageType::PlasmaGetDebugStringRequest: {
HANDLE_SIGPIPE(SendGetDebugStringReply(client->fd, eviction_policy_.DebugString()),
client->fd);
} break;
default:
// This code should be unreachable.
ARROW_CHECK(0);
}
return Status::OK();
}
// Owns the event loop together with the PlasmaStore served by it, and drives
// their start-up, stop and teardown.
class PlasmaStoreRunner {
 public:
  PlasmaStoreRunner() {}

  // Build the event loop and the store, pre-size the allocator's backing
  // file, bind the listening socket and run the event loop. This call returns
  // when the event loop's Start() returns.
  void Start(char* socket_name, std::string directory, bool hugepages_enabled,
             std::shared_ptr<ExternalStore> external_store) {
    // Create the event loop.
    loop_.reset(new EventLoop);
    store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled, socket_name,
                                 external_store));
    plasma_config = store_->GetPlasmaStoreInfo();

    // We are using a single memory-mapped file by mallocing and freeing a single
    // large amount of space up front. According to the documentation,
    // dlmalloc might need up to 128*sizeof(size_t) bytes for internal
    // bookkeeping.
    const auto upfront_bytes = [] {
      return PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t);
    };
    void* upfront_block =
        plasma::PlasmaAllocator::Memalign(kBlockSize, upfront_bytes());
    ARROW_CHECK(upfront_block != nullptr);
    // This will unmap the file, but the next one created will be as large
    // as this one (this is an implementation detail of dlmalloc).
    plasma::PlasmaAllocator::Free(upfront_block, upfront_bytes());

    const int listen_fd = BindIpcSock(socket_name, true);
    // TODO(pcm): Check return value.
    ARROW_CHECK(listen_fd >= 0);
    // Each read event on the listening socket is turned into a ConnectClient
    // call on the store.
    loop_->AddFileEvent(listen_fd, kEventLoopRead, [this, listen_fd](int /*events*/) {
      this->store_->ConnectClient(listen_fd);
    });
    loop_->Start();
  }

  // Ask the event loop to stop.
  void Stop() { loop_->Stop(); }

  // Shut the event loop down and release the loop first, then the store.
  void Shutdown() {
    loop_->Shutdown();
    loop_.reset();
    store_.reset();
  }

 private:
  std::unique_ptr<EventLoop> loop_;
  std::unique_ptr<PlasmaStore> store_;
};
// Process-wide store runner. StartServer() creates it, HandleSignal() uses it
// to stop the event loop on SIGTERM, and main() shuts it down and releases it
// once StartServer() has returned.
static std::unique_ptr<PlasmaStoreRunner> g_runner = nullptr;
// Signal handler installed by StartServer(). Only SIGTERM is acted upon: it is
// logged and, if a runner exists, the runner is asked to stop.
void HandleSignal(int signal) {
  if (signal != SIGTERM) {
    return;
  }
  ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server...";
  if (g_runner) {
    g_runner->Stop();
  }
}
void StartServer(char* socket_name, std::string plasma_directory, bool hugepages_enabled,
std::shared_ptr<ExternalStore> external_store) {
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);
g_runner.reset(new PlasmaStoreRunner());
signal(SIGTERM, HandleSignal);
g_runner->Start(socket_name, plasma_directory, hugepages_enabled, external_store);
}
// Report a usage-style error and terminate the process with exit status 1.
//
// Meant for errors detected before the main server loop starts (for example a
// forgotten command-line switch): unlike ARROW_LOG(FATAL) it does not produce
// a backtrace. The message is written to stderr, prefixed with the short
// program name.
void ExitWithUsageError(const char* error_msg) {
  const auto program_name = gflags::ProgramInvocationShortName();
  std::cerr << program_name << ": " << error_msg << '\n' << std::flush;
  exit(1);
}
} // namespace plasma
// Default value of the -d flag (directory for the memory-backed file):
// /dev/shm when compiling for Linux, /tmp everywhere else.
#ifdef __linux__
#define SHM_DEFAULT_PATH "/dev/shm"
#else
#define SHM_DEFAULT_PATH "/tmp"
#endif
// Command-line flags. Each definition below is read in main() through the
// corresponding FLAGS_<name> variable (FLAGS_d, FLAGS_e, FLAGS_h, FLAGS_s,
// FLAGS_m).
// -d: defaults to SHM_DEFAULT_PATH.
DEFINE_string(d, SHM_DEFAULT_PATH, "directory where to create the memory-backed file");
// -e: optional; an empty value means no external store is used.
DEFINE_string(e, "",
"endpoint for external storage service, where objects "
"evicted from Plasma store can be written to, optional");
// -h: off by default.
DEFINE_bool(h, false, "whether to enable hugepage support");
// -s: required; main() exits with a usage error when it is empty.
DEFINE_string(s, "",
"socket name where the Plasma store will listen for requests, required");
// -m: required; declared as a string and parsed into an integer in main().
DEFINE_string(m, "", "amount of memory in bytes to use for Plasma store, required");
// Entry point of the Plasma store executable.
//
// Parses and validates the command-line flags (-s socket and -m memory are
// required), configures the allocator's footprint limit, optionally connects
// to an external store, and then runs the server until its event loop
// returns. Usage errors terminate the process through ExitWithUsageError.
//
// \return 0 after a clean shutdown.
int main(int argc, char* argv[]) {
  ArrowLog::StartArrowLog(argv[0], ArrowLogLevel::ARROW_INFO);
  ArrowLog::InstallFailureSignalHandler();
  gflags::SetUsageMessage("Shared-memory server for Arrow data.\nUsage: ");
  gflags::SetVersionString(ARROW_VERSION_STRING);

  char* socket_name = nullptr;
  // Directory where plasma memory mapped files are stored.
  std::string plasma_directory;
  std::string external_store_endpoint;
  bool hugepages_enabled = false;
  // -1 doubles as the "flag not given" marker checked further down.
  int64_t system_memory = -1;

  gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/true);
  plasma_directory = FLAGS_d;
  external_store_endpoint = FLAGS_e;
  hugepages_enabled = FLAGS_h;
  if (!FLAGS_s.empty()) {
    // We only check below if socket_name is null, so don't set it if the flag was empty.
    socket_name = const_cast<char*>(FLAGS_s.c_str());
  }

  if (!FLAGS_m.empty()) {
    // The trailing %c only matches when something follows the number, so
    // exactly one converted item means "a plain integer and nothing else".
    char extra;
    int scanned = sscanf(FLAGS_m.c_str(), "%" SCNd64 "%c", &system_memory, &extra);
    if (scanned != 1) {
      plasma::ExitWithUsageError(
          "-m switch takes memory in bytes, with no letter suffix allowed");
    }
    // Zero and negative amounts cannot be served; a negative value would also
    // wrap around to a huge limit in the cast to size_t below.
    if (system_memory <= 0) {
      plasma::ExitWithUsageError("-m switch takes a positive amount of memory in bytes");
    }
    // Set system memory capacity
    plasma::PlasmaAllocator::SetFootprintLimit(static_cast<size_t>(system_memory));
    ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
                    << static_cast<double>(system_memory) / 1000000000 << "GB of memory.";
  }

  // Sanity check command line options.
  if (socket_name == nullptr && system_memory == -1) {
    // Nicer error message for the case where the user ran the program without
    // any of the required command-line switches.
    plasma::ExitWithUsageError(
        "please specify socket for incoming connections with -s, "
        "and the amount of memory (in bytes) to use with -m");
  } else if (socket_name == nullptr) {
    plasma::ExitWithUsageError("please specify socket for incoming connections with -s");
  } else if (system_memory == -1) {
    plasma::ExitWithUsageError(
        "please specify the amount of memory (in bytes) to use with -m");
  }
  if (hugepages_enabled && plasma_directory.empty()) {
    plasma::ExitWithUsageError(
        "if you want to use hugepages, please specify path to huge pages "
        "filesystem with -d");
  }
  ARROW_CHECK(!plasma_directory.empty());
  ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory
                  << " and huge page support "
                  << (hugepages_enabled ? "enabled" : "disabled");

#ifdef __linux__
  if (!hugepages_enabled) {
    // On Linux, check that the amount of memory available in the plasma
    // directory (/dev/shm by default) is large enough to accommodate the
    // request. If it isn't, warn and lower the limit to what is available.
    bool have_shm_stats = false;
    struct statvfs shm_vfs_stats;
    const int shm_fd = open(plasma_directory.c_str(), O_RDONLY);
    if (shm_fd < 0) {
      // Save errno before streaming, which may overwrite it.
      const int open_errno = errno;
      ARROW_LOG(WARNING) << "Failed to open " << plasma_directory
                         << " to check the available memory ("
                         << strerror(open_errno) << "); skipping the check.";
    } else {
      if (fstatvfs(shm_fd, &shm_vfs_stats) == 0) {
        have_shm_stats = true;
      } else {
        const int stat_errno = errno;
        ARROW_LOG(WARNING) << "Failed to query the available memory in "
                           << plasma_directory << " (" << strerror(stat_errno)
                           << "); skipping the check.";
      }
      close(shm_fd);
    }
    // Without valid statistics there is nothing to compare against, so the
    // requested limit is left untouched.
    if (have_shm_stats) {
      // The value shm_vfs_stats.f_bsize is the block size, and the value
      // shm_vfs_stats.f_bavail is the number of available blocks.
      int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
      // Keep some safety margin for allocator fragmentation.
      shm_mem_avail = 9 * shm_mem_avail / 10;
      if (system_memory > shm_mem_avail) {
        ARROW_LOG(WARNING)
            << "System memory request exceeds memory available in " << plasma_directory
            << ". The request is for " << system_memory
            << " bytes, and the amount available is " << shm_mem_avail
            << " bytes. You may be able to free up space by deleting files in "
               "/dev/shm. If you are inside a Docker container, you may need to "
               "pass an argument with the flag '--shm-size' to 'docker run'.";
        if (shm_mem_avail <= 0) {
          std::ostringstream error_msg;
          error_msg << "no memory available for the object store in "
                    << plasma_directory;
          plasma::ExitWithUsageError(error_msg.str().c_str());
        }
        system_memory = shm_mem_avail;
        // The footprint limit was already set from the raw -m value above, so
        // it must be set again here; otherwise the reduced value never
        // reaches the allocator.
        plasma::PlasmaAllocator::SetFootprintLimit(static_cast<size_t>(system_memory));
      }
    }
  } else {
    plasma::SetMallocGranularity(1024 * 1024 * 1024);  // 1 GiB
  }
#endif

  // Get external store
  std::shared_ptr<plasma::ExternalStore> external_store{nullptr};
  if (!external_store_endpoint.empty()) {
    std::string name;
    ARROW_CHECK_OK(
        plasma::ExternalStores::ExtractStoreName(external_store_endpoint, &name));
    external_store = plasma::ExternalStores::GetStore(name);
    if (external_store == nullptr) {
      std::ostringstream error_msg;
      error_msg << "no such external store \"" << name << "\"";
      plasma::ExitWithUsageError(error_msg.str().c_str());
    }
    ARROW_LOG(DEBUG) << "connecting to external store...";
    ARROW_CHECK_OK(external_store->Connect(external_store_endpoint));
  }

  ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
  // StartServer only returns once the event loop has returned; after that the
  // runner is torn down and logging is shut down.
  plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, external_store);
  plasma::g_runner->Shutdown();
  plasma::g_runner = nullptr;

  ArrowLog::UninstallSignalAction();
  ArrowLog::ShutDownArrowLog();
  return 0;
}