| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // PLASMA STORE: This is a simple object store server process |
| // |
| // It accepts incoming client connections on a unix domain socket |
| // (name passed in via the -s option of the executable) and uses a |
| // single thread to serve the clients. Each client establishes a |
| // connection and can create objects, wait for objects and seal |
| // objects through that connection. |
| // |
| // It keeps a hash table that maps object_ids (which are 20 bytes long, |
| // just enough to store a SHA1 hash) to memory mapped files. |
| |
| #include "plasma/store.h" |
| |
| #include <assert.h> |
| #include <fcntl.h> |
| #include <getopt.h> |
| #include <limits.h> |
| #include <signal.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <sys/ioctl.h> |
| #include <sys/socket.h> |
| #include <sys/statvfs.h> |
| #include <sys/types.h> |
| #include <sys/un.h> |
| #include <unistd.h> |
| |
| #include <ctime> |
| #include <deque> |
| #include <iostream> |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| |
| #include "arrow/status.h" |
| #include "arrow/util/config.h" |
| |
| #include "plasma/common.h" |
| #include "plasma/common_generated.h" |
| #include "plasma/fling.h" |
| #include "plasma/io.h" |
| #include "plasma/malloc.h" |
| #include "plasma/plasma_allocator.h" |
| #include "plasma/protocol.h" |
| |
| #ifdef PLASMA_CUDA |
| #include "arrow/gpu/cuda_api.h" |
| |
| using arrow::cuda::CudaBuffer; |
| using arrow::cuda::CudaContext; |
| using arrow::cuda::CudaDeviceManager; |
| #endif |
| |
| using arrow::util::ArrowLog; |
| using arrow::util::ArrowLogLevel; |
| |
| namespace fb = plasma::flatbuf; |
| |
| namespace plasma { |
| |
| void SetMallocGranularity(int value); |
| |
| struct GetRequest { |
| GetRequest(Client* client, const std::vector<ObjectID>& object_ids); |
| /// The client that called get. |
| Client* client; |
| /// The ID of the timer that will time out and cause this wait to return to |
| /// the client if it hasn't already returned. |
| int64_t timer; |
| /// The object IDs involved in this request. This is used in the reply. |
| std::vector<ObjectID> object_ids; |
| /// The object information for the objects in this request. This is used in |
| /// the reply. |
| std::unordered_map<ObjectID, PlasmaObject> objects; |
| /// The minimum number of objects to wait for in this request. |
| int64_t num_objects_to_wait_for; |
| /// The number of object requests in this wait request that are already |
| /// satisfied. |
| int64_t num_satisfied; |
| }; |
| |
| GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids) |
| : client(client), |
| timer(-1), |
| object_ids(object_ids.begin(), object_ids.end()), |
| objects(object_ids.size()), |
| num_satisfied(0) { |
| std::unordered_set<ObjectID> unique_ids(object_ids.begin(), object_ids.end()); |
| num_objects_to_wait_for = unique_ids.size(); |
| } |
| |
| Client::Client(int fd) : fd(fd), notification_fd(-1) {} |
| |
| PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled, |
| const std::string& socket_name, |
| std::shared_ptr<ExternalStore> external_store) |
| : loop_(loop), |
| eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()), |
| external_store_(external_store) { |
| store_info_.directory = directory; |
| store_info_.hugepages_enabled = hugepages_enabled; |
| } |
| |
| // TODO(pcm): Get rid of this destructor by using RAII to clean up data. |
| PlasmaStore::~PlasmaStore() {} |
| |
| const PlasmaStoreInfo* PlasmaStore::GetPlasmaStoreInfo() { return &store_info_; } |
| |
| // If this client is not already using the object, add the client to the |
| // object's list of clients, otherwise do nothing. |
| void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry, |
| Client* client) { |
| // Check if this client is already using the object. |
| if (client->object_ids.find(object_id) != client->object_ids.end()) { |
| return; |
| } |
| // If there are no other clients using this object, notify the eviction policy |
| // that the object is being used. |
| if (entry->ref_count == 0) { |
| // Tell the eviction policy that this object is being used. |
| eviction_policy_.BeginObjectAccess(object_id); |
| } |
| // Increase reference count. |
| entry->ref_count++; |
| |
| // Add object id to the list of object ids that this client is using. |
| client->object_ids.insert(object_id); |
| } |
| |
| // Allocate memory |
| uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, int* fd, |
| int64_t* map_size, ptrdiff_t* offset, Client* client, |
| bool is_create) { |
| // First free up space from the client's LRU queue if quota enforcement is on. |
| if (evict_if_full) { |
| std::vector<ObjectID> client_objects_to_evict; |
| bool quota_ok = eviction_policy_.EnforcePerClientQuota(client, size, is_create, |
| &client_objects_to_evict); |
| if (!quota_ok) { |
| return nullptr; |
| } |
| EvictObjects(client_objects_to_evict); |
| } |
| |
| // Try to evict objects until there is enough space. |
| uint8_t* pointer = nullptr; |
| while (true) { |
| // Allocate space for the new object. We use memalign instead of malloc |
| // in order to align the allocated region to a 64-byte boundary. This is not |
| // strictly necessary, but it is an optimization that could speed up the |
| // computation of a hash of the data (see compute_object_hash_parallel in |
| // plasma_client.cc). Note that even though this pointer is 64-byte aligned, |
| // it is not guaranteed that the corresponding pointer in the client will be |
| // 64-byte aligned, but in practice it often will be. |
| pointer = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size)); |
| if (pointer || !evict_if_full) { |
| // If we manage to allocate the memory, return the pointer. If we cannot |
| // allocate the space, but we are also not allowed to evict anything to |
| // make more space, return an error to the client. |
| break; |
| } |
| // Tell the eviction policy how much space we need to create this object. |
| std::vector<ObjectID> objects_to_evict; |
| bool success = eviction_policy_.RequireSpace(size, &objects_to_evict); |
| EvictObjects(objects_to_evict); |
| // Return an error to the client if not enough space could be freed to |
| // create the object. |
| if (!success) { |
| break; |
| } |
| } |
| |
| if (pointer != nullptr) { |
| GetMallocMapinfo(pointer, fd, map_size, offset); |
| ARROW_CHECK(*fd != -1); |
| } |
| return pointer; |
| } |
| |
| #ifdef PLASMA_CUDA |
| arrow::Result<std::shared_ptr<CudaContext>> PlasmaStore::GetCudaContext(int device_num) { |
| DCHECK_NE(device_num, 0); |
| ARROW_ASSIGN_OR_RAISE(auto manager, CudaDeviceManager::Instance()); |
| return manager->GetContext(device_num - 1); |
| } |
| |
| Status PlasmaStore::AllocateCudaMemory( |
| int device_num, int64_t size, uint8_t** out_pointer, |
| std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle) { |
| ARROW_ASSIGN_OR_RAISE(auto context, GetCudaContext(device_num)); |
| ARROW_ASSIGN_OR_RAISE(auto cuda_buffer, context->Allocate(static_cast<int64_t>(size))); |
| *out_pointer = reinterpret_cast<uint8_t*>(cuda_buffer->address()); |
| // The IPC handle will keep the buffer memory alive |
| return cuda_buffer->ExportForIpc().Value(out_ipc_handle); |
| } |
| |
| Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t* pointer) { |
| ARROW_ASSIGN_OR_RAISE(auto context, GetCudaContext(device_num)); |
| RETURN_NOT_OK(context->Free(pointer, size)); |
| return Status::OK(); |
| } |
| #endif |
| |
| // Create a new object buffer in the hash table. |
| PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_full, |
| int64_t data_size, int64_t metadata_size, |
| int device_num, Client* client, |
| PlasmaObject* result) { |
| ARROW_LOG(DEBUG) << "creating object " << object_id.hex(); |
| |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| if (entry != nullptr) { |
| // There is already an object with the same ID in the Plasma Store, so |
| // ignore this request. |
| return PlasmaError::ObjectExists; |
| } |
| |
| int fd = -1; |
| int64_t map_size = 0; |
| ptrdiff_t offset = 0; |
| uint8_t* pointer = nullptr; |
| auto total_size = data_size + metadata_size; |
| |
| if (device_num == 0) { |
| pointer = |
| AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true); |
| if (!pointer) { |
| ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() |
| << ", data_size=" << data_size |
| << ", metadata_size=" << metadata_size |
| << ", will send a reply of PlasmaError::OutOfMemory"; |
| return PlasmaError::OutOfMemory; |
| } |
| } else { |
| #ifdef PLASMA_CUDA |
| /// IPC GPU handle to share with clients. |
| std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle; |
| auto st = AllocateCudaMemory(device_num, total_size, &pointer, &ipc_handle); |
| if (!st.ok()) { |
| ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString(); |
| return PlasmaError::OutOfMemory; |
| } |
| result->ipc_handle = ipc_handle; |
| #else |
| ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled"; |
| return PlasmaError::OutOfMemory; |
| #endif |
| } |
| |
| auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry()); |
| entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get(); |
| entry->data_size = data_size; |
| entry->metadata_size = metadata_size; |
| entry->pointer = pointer; |
| // TODO(pcm): Set the other fields. |
| entry->fd = fd; |
| entry->map_size = map_size; |
| entry->offset = offset; |
| entry->state = ObjectState::PLASMA_CREATED; |
| entry->device_num = device_num; |
| entry->create_time = std::time(nullptr); |
| entry->construct_duration = -1; |
| |
| #ifdef PLASMA_CUDA |
| entry->ipc_handle = result->ipc_handle; |
| #endif |
| |
| result->store_fd = fd; |
| result->data_offset = offset; |
| result->metadata_offset = offset + data_size; |
| result->data_size = data_size; |
| result->metadata_size = metadata_size; |
| result->device_num = device_num; |
| // Notify the eviction policy that this object was created. This must be done |
| // immediately before the call to AddToClientObjectIds so that the |
| // eviction policy does not have an opportunity to evict the object. |
| eviction_policy_.ObjectCreated(object_id, client, true); |
| // Record that this client is using this object. |
| AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client); |
| return PlasmaError::OK; |
| } |
| |
| void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) { |
| DCHECK(object != nullptr); |
| DCHECK(entry != nullptr); |
| DCHECK(entry->state == ObjectState::PLASMA_SEALED); |
| #ifdef PLASMA_CUDA |
| if (entry->device_num != 0) { |
| object->ipc_handle = entry->ipc_handle; |
| } |
| #endif |
| object->store_fd = entry->fd; |
| object->data_offset = entry->offset; |
| object->metadata_offset = entry->offset + entry->data_size; |
| object->data_size = entry->data_size; |
| object->metadata_size = entry->metadata_size; |
| object->device_num = entry->device_num; |
| } |
| |
| void PlasmaStore::RemoveGetRequest(GetRequest* get_request) { |
| // Remove the get request from each of the relevant object_get_requests hash |
| // tables if it is present there. It should only be present there if the get |
| // request timed out or if it was issued by a client that has disconnected. |
| for (ObjectID& object_id : get_request->object_ids) { |
| auto object_request_iter = object_get_requests_.find(object_id); |
| if (object_request_iter != object_get_requests_.end()) { |
| auto& get_requests = object_request_iter->second; |
| // Erase get_req from the vector. |
| auto it = std::find(get_requests.begin(), get_requests.end(), get_request); |
| if (it != get_requests.end()) { |
| get_requests.erase(it); |
| // If the vector is empty, remove the object ID from the map. |
| if (get_requests.empty()) { |
| object_get_requests_.erase(object_request_iter); |
| } |
| } |
| } |
| } |
| // Remove the get request. |
| if (get_request->timer != -1) { |
| ARROW_CHECK(loop_->RemoveTimer(get_request->timer) == kEventLoopOk); |
| } |
| delete get_request; |
| } |
| |
| void PlasmaStore::RemoveGetRequestsForClient(Client* client) { |
| std::unordered_set<GetRequest*> get_requests_to_remove; |
| for (auto const& pair : object_get_requests_) { |
| for (GetRequest* get_request : pair.second) { |
| if (get_request->client == client) { |
| get_requests_to_remove.insert(get_request); |
| } |
| } |
| } |
| |
| // It shouldn't be possible for a given client to be in the middle of multiple get |
| // requests. |
| ARROW_CHECK(get_requests_to_remove.size() <= 1); |
| for (GetRequest* get_request : get_requests_to_remove) { |
| RemoveGetRequest(get_request); |
| } |
| } |
| |
| void PlasmaStore::ReturnFromGet(GetRequest* get_req) { |
| // Figure out how many file descriptors we need to send. |
| std::unordered_set<int> fds_to_send; |
| std::vector<int> store_fds; |
| std::vector<int64_t> mmap_sizes; |
| for (const auto& object_id : get_req->object_ids) { |
| PlasmaObject& object = get_req->objects[object_id]; |
| int fd = object.store_fd; |
| if (object.data_size != -1 && fds_to_send.count(fd) == 0 && fd != -1) { |
| fds_to_send.insert(fd); |
| store_fds.push_back(fd); |
| mmap_sizes.push_back(GetMmapSize(fd)); |
| } |
| } |
| |
| // Send the get reply to the client. |
| Status s = SendGetReply(get_req->client->fd, &get_req->object_ids[0], get_req->objects, |
| get_req->object_ids.size(), store_fds, mmap_sizes); |
| WarnIfSigpipe(s.ok() ? 0 : -1, get_req->client->fd); |
| // If we successfully sent the get reply message to the client, then also send |
| // the file descriptors. |
| if (s.ok()) { |
| // Send all of the file descriptors for the present objects. |
| for (int store_fd : store_fds) { |
| // Only send the file descriptor if it hasn't been sent (see analogous |
| // logic in GetStoreFd in client.cc). |
| if (get_req->client->used_fds.find(store_fd) == get_req->client->used_fds.end()) { |
| WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd); |
| get_req->client->used_fds.insert(store_fd); |
| } |
| } |
| } |
| |
| // Remove the get request from each of the relevant object_get_requests hash |
| // tables if it is present there. It should only be present there if the get |
| // request timed out. |
| RemoveGetRequest(get_req); |
| } |
| |
| void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) { |
| auto it = object_get_requests_.find(object_id); |
| // If there are no get requests involving this object, then return. |
| if (it == object_get_requests_.end()) { |
| return; |
| } |
| |
| auto& get_requests = it->second; |
| |
| // After finishing the loop below, get_requests and it will have been |
| // invalidated by the removal of object_id from object_get_requests_. |
| size_t index = 0; |
| size_t num_requests = get_requests.size(); |
| for (size_t i = 0; i < num_requests; ++i) { |
| auto get_req = get_requests[index]; |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| ARROW_CHECK(entry != nullptr); |
| |
| PlasmaObject_init(&get_req->objects[object_id], entry); |
| get_req->num_satisfied += 1; |
| // Record the fact that this client will be using this object and will |
| // be responsible for releasing this object. |
| AddToClientObjectIds(object_id, entry, get_req->client); |
| |
| // If this get request is done, reply to the client. |
| if (get_req->num_satisfied == get_req->num_objects_to_wait_for) { |
| ReturnFromGet(get_req); |
| } else { |
| // The call to ReturnFromGet will remove the current element in the |
| // array, so we only increment the counter in the else branch. |
| index += 1; |
| } |
| } |
| |
| // No get requests should be waiting for this object anymore. The object ID |
| // may have been removed from the object_get_requests_ by ReturnFromGet, but |
| // if the get request has not returned yet, then remove the object ID from the |
| // map here. |
| it = object_get_requests_.find(object_id); |
| if (it != object_get_requests_.end()) { |
| object_get_requests_.erase(object_id); |
| } |
| } |
| |
| void PlasmaStore::ProcessGetRequest(Client* client, |
| const std::vector<ObjectID>& object_ids, |
| int64_t timeout_ms) { |
| // Create a get request for this object. |
| auto get_req = new GetRequest(client, object_ids); |
| std::vector<ObjectID> evicted_ids; |
| std::vector<ObjectTableEntry*> evicted_entries; |
| for (auto object_id : object_ids) { |
| // Check if this object is already present locally. If so, record that the |
| // object is being used and mark it as accounted for. |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| if (entry && entry->state == ObjectState::PLASMA_SEALED) { |
| // Update the get request to take into account the present object. |
| PlasmaObject_init(&get_req->objects[object_id], entry); |
| get_req->num_satisfied += 1; |
| // If necessary, record that this client is using this object. In the case |
| // where entry == NULL, this will be called from SealObject. |
| AddToClientObjectIds(object_id, entry, client); |
| } else if (entry && entry->state == ObjectState::PLASMA_EVICTED) { |
| // Make sure the object pointer is not already allocated |
| ARROW_CHECK(!entry->pointer); |
| |
| entry->pointer = |
| AllocateMemory(entry->data_size + entry->metadata_size, /*evict=*/true, |
| &entry->fd, &entry->map_size, &entry->offset, client, false); |
| if (entry->pointer) { |
| entry->state = ObjectState::PLASMA_CREATED; |
| entry->create_time = std::time(nullptr); |
| eviction_policy_.ObjectCreated(object_id, client, false); |
| AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client); |
| evicted_ids.push_back(object_id); |
| evicted_entries.push_back(entry); |
| } else { |
| // We are out of memory and cannot allocate memory for this object. |
| // Change the state of the object back to PLASMA_EVICTED so some |
| // other request can try again. |
| entry->state = ObjectState::PLASMA_EVICTED; |
| } |
| } else { |
| // Add a placeholder plasma object to the get request to indicate that the |
| // object is not present. This will be parsed by the client. We set the |
| // data size to -1 to indicate that the object is not present. |
| get_req->objects[object_id].data_size = -1; |
| // Add the get request to the relevant data structures. |
| object_get_requests_[object_id].push_back(get_req); |
| } |
| } |
| |
| if (!evicted_ids.empty()) { |
| unsigned char digest[kDigestSize] = {}; |
| std::vector<std::shared_ptr<Buffer>> buffers; |
| for (size_t i = 0; i < evicted_ids.size(); ++i) { |
| ARROW_CHECK(evicted_entries[i]->pointer != nullptr); |
| buffers.emplace_back(new arrow::MutableBuffer(evicted_entries[i]->pointer, |
| evicted_entries[i]->data_size)); |
| } |
| if (external_store_->Get(evicted_ids, buffers).ok()) { |
| for (size_t i = 0; i < evicted_ids.size(); ++i) { |
| evicted_entries[i]->state = ObjectState::PLASMA_SEALED; |
| std::memcpy(&evicted_entries[i]->digest[0], &digest[0], kDigestSize); |
| evicted_entries[i]->construct_duration = |
| std::time(nullptr) - evicted_entries[i]->create_time; |
| PlasmaObject_init(&get_req->objects[evicted_ids[i]], evicted_entries[i]); |
| get_req->num_satisfied += 1; |
| } |
| } else { |
| // We tried to get the objects from the external store, but could not get them. |
| // Set the state of these objects back to PLASMA_EVICTED so some other request |
| // can try again. |
| for (size_t i = 0; i < evicted_ids.size(); ++i) { |
| evicted_entries[i]->state = ObjectState::PLASMA_EVICTED; |
| } |
| } |
| } |
| |
| // If all of the objects are present already or if the timeout is 0, return to |
| // the client. |
| if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) { |
| ReturnFromGet(get_req); |
| } else if (timeout_ms != -1) { |
| // Set a timer that will cause the get request to return to the client. Note |
| // that a timeout of -1 is used to indicate that no timer should be set. |
| get_req->timer = loop_->AddTimer(timeout_ms, [this, get_req](int64_t timer_id) { |
| ReturnFromGet(get_req); |
| return kEventLoopTimerDone; |
| }); |
| } |
| } |
| |
| int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id, |
| ObjectTableEntry* entry, Client* client) { |
| auto it = client->object_ids.find(object_id); |
| if (it != client->object_ids.end()) { |
| client->object_ids.erase(it); |
| // Decrease reference count. |
| entry->ref_count--; |
| |
| // If no more clients are using this object, notify the eviction policy |
| // that the object is no longer being used. |
| if (entry->ref_count == 0) { |
| if (deletion_cache_.count(object_id) == 0) { |
| // Tell the eviction policy that this object is no longer being used. |
| eviction_policy_.EndObjectAccess(object_id); |
| } else { |
| // Above code does not really delete an object. Instead, it just put an |
| // object to LRU cache which will be cleaned when the memory is not enough. |
| deletion_cache_.erase(object_id); |
| EvictObjects({object_id}); |
| } |
| } |
| // Return 1 to indicate that the client was removed. |
| return 1; |
| } else { |
| // Return 0 to indicate that the client was not removed. |
| return 0; |
| } |
| } |
| |
| void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) { |
| auto& object = store_info_.objects[object_id]; |
| auto buff_size = object->data_size + object->metadata_size; |
| if (object->device_num == 0) { |
| PlasmaAllocator::Free(object->pointer, buff_size); |
| } else { |
| #ifdef PLASMA_CUDA |
| ARROW_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer)); |
| #endif |
| } |
| store_info_.objects.erase(object_id); |
| } |
| |
| void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) { |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| ARROW_CHECK(entry != nullptr); |
| // Remove the client from the object's array of clients. |
| ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1); |
| } |
| |
| // Check if an object is present. |
| ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) { |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| return entry && (entry->state == ObjectState::PLASMA_SEALED || |
| entry->state == ObjectState::PLASMA_EVICTED) |
| ? ObjectStatus::OBJECT_FOUND |
| : ObjectStatus::OBJECT_NOT_FOUND; |
| } |
| |
| void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids, |
| const std::vector<std::string>& digests) { |
| std::vector<ObjectInfoT> infos; |
| |
| ARROW_LOG(DEBUG) << "sealing " << object_ids.size() << " objects"; |
| for (size_t i = 0; i < object_ids.size(); ++i) { |
| ObjectInfoT object_info; |
| auto entry = GetObjectTableEntry(&store_info_, object_ids[i]); |
| ARROW_CHECK(entry != nullptr); |
| ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED); |
| // Set the state of object to SEALED. |
| entry->state = ObjectState::PLASMA_SEALED; |
| // Set the object digest. |
| std::memcpy(&entry->digest[0], digests[i].c_str(), kDigestSize); |
| // Set object construction duration. |
| entry->construct_duration = std::time(nullptr) - entry->create_time; |
| |
| object_info.object_id = object_ids[i].binary(); |
| object_info.data_size = entry->data_size; |
| object_info.metadata_size = entry->metadata_size; |
| object_info.digest = digests[i]; |
| infos.push_back(object_info); |
| } |
| |
| PushNotifications(infos); |
| |
| for (size_t i = 0; i < object_ids.size(); ++i) { |
| UpdateObjectGetRequests(object_ids[i]); |
| } |
| } |
| |
| int PlasmaStore::AbortObject(const ObjectID& object_id, Client* client) { |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| ARROW_CHECK(entry != nullptr) << "To abort an object it must be in the object table."; |
| ARROW_CHECK(entry->state != ObjectState::PLASMA_SEALED) |
| << "To abort an object it must not have been sealed."; |
| auto it = client->object_ids.find(object_id); |
| if (it == client->object_ids.end()) { |
| // If the client requesting the abort is not the creator, do not |
| // perform the abort. |
| return 0; |
| } else { |
| // The client requesting the abort is the creator. Free the object. |
| EraseFromObjectTable(object_id); |
| client->object_ids.erase(it); |
| return 1; |
| } |
| } |
| |
| PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) { |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| // TODO(rkn): This should probably not fail, but should instead throw an |
| // error. Maybe we should also support deleting objects that have been |
| // created but not sealed. |
| if (entry == nullptr) { |
| // To delete an object it must be in the object table. |
| return PlasmaError::ObjectNotFound; |
| } |
| |
| if (entry->state != ObjectState::PLASMA_SEALED) { |
| // To delete an object it must have been sealed. |
| // Put it into deletion cache, it will be deleted later. |
| deletion_cache_.emplace(object_id); |
| return PlasmaError::ObjectNotSealed; |
| } |
| |
| if (entry->ref_count != 0) { |
| // To delete an object, there must be no clients currently using it. |
| // Put it into deletion cache, it will be deleted later. |
| deletion_cache_.emplace(object_id); |
| return PlasmaError::ObjectInUse; |
| } |
| |
| eviction_policy_.RemoveObject(object_id); |
| EraseFromObjectTable(object_id); |
| // Inform all subscribers that the object has been deleted. |
| fb::ObjectInfoT notification; |
| notification.object_id = object_id.binary(); |
| notification.is_deletion = true; |
| PushNotification(¬ification); |
| |
| return PlasmaError::OK; |
| } |
| |
| void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) { |
| if (object_ids.size() == 0) { |
| return; |
| } |
| |
| std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data; |
| std::vector<ObjectTableEntry*> evicted_entries; |
| for (const auto& object_id : object_ids) { |
| ARROW_LOG(DEBUG) << "evicting object " << object_id.hex(); |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| // TODO(rkn): This should probably not fail, but should instead throw an |
| // error. Maybe we should also support deleting objects that have been |
| // created but not sealed. |
| ARROW_CHECK(entry != nullptr) << "To evict an object it must be in the object table."; |
| ARROW_CHECK(entry->state == ObjectState::PLASMA_SEALED) |
| << "To evict an object it must have been sealed."; |
| ARROW_CHECK(entry->ref_count == 0) |
| << "To evict an object, there must be no clients currently using it."; |
| |
| // If there is a backing external store, then mark object for eviction to |
| // external store, free the object data pointer and keep a placeholder |
| // entry in ObjectTable |
| if (external_store_) { |
| evicted_object_data.push_back(std::make_shared<arrow::Buffer>( |
| entry->pointer, entry->data_size + entry->metadata_size)); |
| evicted_entries.push_back(entry); |
| } else { |
| // If there is no backing external store, just erase the object entry |
| // and send a deletion notification. |
| EraseFromObjectTable(object_id); |
| // Inform all subscribers that the object has been deleted. |
| fb::ObjectInfoT notification; |
| notification.object_id = object_id.binary(); |
| notification.is_deletion = true; |
| PushNotification(¬ification); |
| } |
| } |
| |
| if (external_store_ && !object_ids.empty()) { |
| ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data)); |
| for (auto entry : evicted_entries) { |
| PlasmaAllocator::Free(entry->pointer, entry->data_size + entry->metadata_size); |
| entry->pointer = nullptr; |
| entry->state = ObjectState::PLASMA_EVICTED; |
| } |
| } |
| } |
| |
| void PlasmaStore::ConnectClient(int listener_sock) { |
| int client_fd = AcceptClient(listener_sock); |
| |
| Client* client = new Client(client_fd); |
| connected_clients_[client_fd] = std::unique_ptr<Client>(client); |
| |
| // Add a callback to handle events on this socket. |
| // TODO(pcm): Check return value. |
| loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) { |
| Status s = ProcessMessage(client); |
| if (!s.ok()) { |
| ARROW_LOG(FATAL) << "Failed to process file event: " << s; |
| } |
| }); |
| ARROW_LOG(DEBUG) << "New connection with fd " << client_fd; |
| } |
| |
| void PlasmaStore::DisconnectClient(int client_fd) { |
| ARROW_CHECK(client_fd > 0); |
| auto it = connected_clients_.find(client_fd); |
| ARROW_CHECK(it != connected_clients_.end()); |
| loop_->RemoveFileEvent(client_fd); |
| // Close the socket. |
| close(client_fd); |
| ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd; |
| // Release all the objects that the client was using. |
| auto client = it->second.get(); |
| eviction_policy_.ClientDisconnected(client); |
| std::unordered_map<ObjectID, ObjectTableEntry*> sealed_objects; |
| for (const auto& object_id : client->object_ids) { |
| auto it = store_info_.objects.find(object_id); |
| if (it == store_info_.objects.end()) { |
| continue; |
| } |
| |
| if (it->second->state == ObjectState::PLASMA_SEALED) { |
| // Add sealed objects to a temporary list of object IDs. Do not perform |
| // the remove here, since it potentially modifies the object_ids table. |
| sealed_objects[it->first] = it->second.get(); |
| } else { |
| // Abort unsealed object. |
| // Don't call AbortObject() because client->object_ids would be modified. |
| EraseFromObjectTable(object_id); |
| } |
| } |
| |
| /// Remove all of the client's GetRequests. |
| RemoveGetRequestsForClient(client); |
| |
| for (const auto& entry : sealed_objects) { |
| RemoveFromClientObjectIds(entry.first, entry.second, client); |
| } |
| |
| if (client->notification_fd > 0) { |
| // This client has subscribed for notifications. |
| auto notify_fd = client->notification_fd; |
| loop_->RemoveFileEvent(notify_fd); |
| // Close socket. |
| close(notify_fd); |
| // Remove notification queue for this fd from global map. |
| pending_notifications_.erase(notify_fd); |
| // Reset fd. |
| client->notification_fd = -1; |
| } |
| |
| connected_clients_.erase(it); |
| } |
| |
| /// Send notifications about sealed objects to the subscribers. This is called |
| /// in SealObject. If the socket's send buffer is full, the notification will |
| /// be buffered, and this will be called again when the send buffer has room. |
| /// Since we call erase on pending_notifications_, all iterators get |
| /// invalidated, which is why we return a valid iterator to the next client to |
| /// be used in PushNotification. |
| /// |
| /// \param it Iterator that points to the client to send the notification to. |
| /// \return Iterator pointing to the next client. |
| PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications( |
| PlasmaStore::NotificationMap::iterator it) { |
| int client_fd = it->first; |
| auto& notifications = it->second.object_notifications; |
| |
| int num_processed = 0; |
| bool closed = false; |
| // Loop over the array of pending notifications and send as many of them as |
| // possible. |
| for (size_t i = 0; i < notifications.size(); ++i) { |
| auto& notification = notifications.at(i); |
| // Decode the length, which is the first bytes of the message. |
| int64_t size = *(reinterpret_cast<int64_t*>(notification.get())); |
| |
| // Attempt to send a notification about this object ID. |
| ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) + size, 0); |
| if (nbytes >= 0) { |
| ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size); |
| } else if (nbytes == -1 && |
| (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { |
| ARROW_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this " |
| "notification and will send it later."; |
| // Add a callback to the event loop to send queued notifications whenever |
| // there is room in the socket's send buffer. Callbacks can be added |
| // more than once here and will be overwritten. The callback is removed |
| // at the end of the method. |
| // TODO(pcm): Introduce status codes and check in case the file descriptor |
| // is added twice. |
| loop_->AddFileEvent(client_fd, kEventLoopWrite, [this, client_fd](int events) { |
| SendNotifications(pending_notifications_.find(client_fd)); |
| }); |
| break; |
| } else { |
| ARROW_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd; |
| if (errno == EPIPE) { |
| closed = true; |
| break; |
| } |
| } |
| num_processed += 1; |
| } |
| // Remove the sent notifications from the array. |
| notifications.erase(notifications.begin(), notifications.begin() + num_processed); |
| |
| // If we have sent all notifications, remove the fd from the event loop. |
| if (notifications.empty()) { |
| loop_->RemoveFileEvent(client_fd); |
| } |
| |
| // Stop sending notifications if the pipe was broken. |
| if (closed) { |
| close(client_fd); |
| return pending_notifications_.erase(it); |
| } else { |
| return ++it; |
| } |
| } |
| |
| void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info) { |
| auto it = pending_notifications_.begin(); |
| while (it != pending_notifications_.end()) { |
| std::vector<fb::ObjectInfoT> info; |
| info.push_back(*object_info); |
| auto notification = CreatePlasmaNotificationBuffer(info); |
| it->second.object_notifications.emplace_back(std::move(notification)); |
| it = SendNotifications(it); |
| } |
| } |
| |
| void PlasmaStore::PushNotifications(std::vector<fb::ObjectInfoT>& object_info) { |
| auto it = pending_notifications_.begin(); |
| while (it != pending_notifications_.end()) { |
| auto notifications = CreatePlasmaNotificationBuffer(object_info); |
| it->second.object_notifications.emplace_back(std::move(notifications)); |
| it = SendNotifications(it); |
| } |
| } |
| |
| void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info, int client_fd) { |
| auto it = pending_notifications_.find(client_fd); |
| if (it != pending_notifications_.end()) { |
| std::vector<fb::ObjectInfoT> info; |
| info.push_back(*object_info); |
| auto notification = CreatePlasmaNotificationBuffer(info); |
| it->second.object_notifications.emplace_back(std::move(notification)); |
| SendNotifications(it); |
| } |
| } |
| |
| // Subscribe to notifications about sealed objects. |
| void PlasmaStore::SubscribeToUpdates(Client* client) { |
| ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd; |
| if (client->notification_fd > 0) { |
| // This client has already subscribed. Return. |
| return; |
| } |
| |
| // TODO(rkn): The store could block here if the client doesn't send a file |
| // descriptor. |
| int fd = recv_fd(client->fd); |
| if (fd < 0) { |
| // This may mean that the client died before sending the file descriptor. |
| ARROW_LOG(WARNING) << "Failed to receive file descriptor from client on fd " |
| << client->fd << "."; |
| return; |
| } |
| |
| // Add this fd to global map, which is needed for this client to receive notifications. |
| pending_notifications_[fd]; |
| client->notification_fd = fd; |
| |
| // Push notifications to the new subscriber about existing sealed objects. |
| for (const auto& entry : store_info_.objects) { |
| if (entry.second->state == ObjectState::PLASMA_SEALED) { |
| ObjectInfoT info; |
| info.object_id = entry.first.binary(); |
| info.data_size = entry.second->data_size; |
| info.metadata_size = entry.second->metadata_size; |
| info.digest = |
| std::string(reinterpret_cast<char*>(&entry.second->digest[0]), kDigestSize); |
| PushNotification(&info, fd); |
| } |
| } |
| } |
| |
| Status PlasmaStore::ProcessMessage(Client* client) { |
| fb::MessageType type; |
| Status s = ReadMessage(client->fd, &type, &input_buffer_); |
| ARROW_CHECK(s.ok() || s.IsIOError()); |
| |
| uint8_t* input = input_buffer_.data(); |
| size_t input_size = input_buffer_.size(); |
| ObjectID object_id; |
| PlasmaObject object = {}; |
| |
| // Process the different types of requests. |
| switch (type) { |
| case fb::MessageType::PlasmaCreateRequest: { |
| bool evict_if_full; |
| int64_t data_size; |
| int64_t metadata_size; |
| int device_num; |
| RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &evict_if_full, |
| &data_size, &metadata_size, &device_num)); |
| PlasmaError error_code = CreateObject(object_id, evict_if_full, data_size, |
| metadata_size, device_num, client, &object); |
| int64_t mmap_size = 0; |
| if (error_code == PlasmaError::OK && device_num == 0) { |
| mmap_size = GetMmapSize(object.store_fd); |
| } |
| HANDLE_SIGPIPE( |
| SendCreateReply(client->fd, object_id, &object, error_code, mmap_size), |
| client->fd); |
| // Only send the file descriptor if it hasn't been sent (see analogous |
| // logic in GetStoreFd in client.cc). Similar in ReturnFromGet. |
| if (error_code == PlasmaError::OK && device_num == 0 && |
| client->used_fds.find(object.store_fd) == client->used_fds.end()) { |
| WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd); |
| client->used_fds.insert(object.store_fd); |
| } |
| } break; |
| case fb::MessageType::PlasmaCreateAndSealRequest: { |
| bool evict_if_full; |
| std::string data; |
| std::string metadata; |
| std::string digest; |
| digest.reserve(kDigestSize); |
| RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id, |
| &evict_if_full, &data, &metadata, &digest)); |
| // CreateAndSeal currently only supports device_num = 0, which corresponds |
| // to the host. |
| int device_num = 0; |
| PlasmaError error_code = CreateObject(object_id, evict_if_full, data.size(), |
| metadata.size(), device_num, client, &object); |
| |
| // If the object was successfully created, fill out the object data and seal it. |
| if (error_code == PlasmaError::OK) { |
| auto entry = GetObjectTableEntry(&store_info_, object_id); |
| ARROW_CHECK(entry != nullptr); |
| // Write the inlined data and metadata into the allocated object. |
| std::memcpy(entry->pointer, data.data(), data.size()); |
| std::memcpy(entry->pointer + data.size(), metadata.data(), metadata.size()); |
| SealObjects({object_id}, {digest}); |
| // Remove the client from the object's array of clients because the |
| // object is not being used by any client. The client was added to the |
| // object's array of clients in CreateObject. This is analogous to the |
| // Release call that happens in the client's Seal method. |
| ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1); |
| } |
| |
| // Reply to the client. |
| HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code), client->fd); |
| } break; |
| case fb::MessageType::PlasmaCreateAndSealBatchRequest: { |
| bool evict_if_full; |
| std::vector<ObjectID> object_ids; |
| std::vector<std::string> data; |
| std::vector<std::string> metadata; |
| std::vector<std::string> digests; |
| |
| RETURN_NOT_OK(ReadCreateAndSealBatchRequest( |
| input, input_size, &object_ids, &evict_if_full, &data, &metadata, &digests)); |
| |
| // CreateAndSeal currently only supports device_num = 0, which corresponds |
| // to the host. |
| int device_num = 0; |
| size_t i = 0; |
| PlasmaError error_code = PlasmaError::OK; |
| for (i = 0; i < object_ids.size(); i++) { |
| error_code = CreateObject(object_ids[i], evict_if_full, data[i].size(), |
| metadata[i].size(), device_num, client, &object); |
| if (error_code != PlasmaError::OK) { |
| break; |
| } |
| } |
| |
| // if OK, seal all the objects, |
| // if error, abort the previous i objects immediately |
| if (error_code == PlasmaError::OK) { |
| for (i = 0; i < object_ids.size(); i++) { |
| auto entry = GetObjectTableEntry(&store_info_, object_ids[i]); |
| ARROW_CHECK(entry != nullptr); |
| // Write the inlined data and metadata into the allocated object. |
| std::memcpy(entry->pointer, data[i].data(), data[i].size()); |
| std::memcpy(entry->pointer + data[i].size(), metadata[i].data(), |
| metadata[i].size()); |
| } |
| |
| SealObjects(object_ids, digests); |
| // Remove the client from the object's array of clients because the |
| // object is not being used by any client. The client was added to the |
| // object's array of clients in CreateObject. This is analogous to the |
| // Release call that happens in the client's Seal method. |
| for (i = 0; i < object_ids.size(); i++) { |
| auto entry = GetObjectTableEntry(&store_info_, object_ids[i]); |
| ARROW_CHECK(RemoveFromClientObjectIds(object_ids[i], entry, client) == 1); |
| } |
| } else { |
| for (size_t j = 0; j < i; j++) { |
| AbortObject(object_ids[j], client); |
| } |
| } |
| |
| HANDLE_SIGPIPE(SendCreateAndSealBatchReply(client->fd, error_code), client->fd); |
| } break; |
| case fb::MessageType::PlasmaAbortRequest: { |
| RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id)); |
| ARROW_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only " |
| "client currently using it " |
| "must be the creator."; |
| HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd); |
| } break; |
| case fb::MessageType::PlasmaGetRequest: { |
| std::vector<ObjectID> object_ids_to_get; |
| int64_t timeout_ms; |
| RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms)); |
| ProcessGetRequest(client, object_ids_to_get, timeout_ms); |
| } break; |
| case fb::MessageType::PlasmaReleaseRequest: { |
| RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id)); |
| ReleaseObject(object_id, client); |
| } break; |
| case fb::MessageType::PlasmaDeleteRequest: { |
| std::vector<ObjectID> object_ids; |
| std::vector<PlasmaError> error_codes; |
| RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids)); |
| error_codes.reserve(object_ids.size()); |
| for (auto& object_id : object_ids) { |
| error_codes.push_back(DeleteObject(object_id)); |
| } |
| HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_ids, error_codes), client->fd); |
| } break; |
| case fb::MessageType::PlasmaContainsRequest: { |
| RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id)); |
| if (ContainsObject(object_id) == ObjectStatus::OBJECT_FOUND) { |
| HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd); |
| } else { |
| HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd); |
| } |
| } break; |
| case fb::MessageType::PlasmaListRequest: { |
| RETURN_NOT_OK(ReadListRequest(input, input_size)); |
| HANDLE_SIGPIPE(SendListReply(client->fd, store_info_.objects), client->fd); |
| } break; |
| case fb::MessageType::PlasmaSealRequest: { |
| std::string digest; |
| RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest)); |
| SealObjects({object_id}, {digest}); |
| HANDLE_SIGPIPE(SendSealReply(client->fd, object_id, PlasmaError::OK), client->fd); |
| } break; |
| case fb::MessageType::PlasmaEvictRequest: { |
| // This code path should only be used for testing. |
| int64_t num_bytes; |
| RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes)); |
| std::vector<ObjectID> objects_to_evict; |
| int64_t num_bytes_evicted = |
| eviction_policy_.ChooseObjectsToEvict(num_bytes, &objects_to_evict); |
| EvictObjects(objects_to_evict); |
| HANDLE_SIGPIPE(SendEvictReply(client->fd, num_bytes_evicted), client->fd); |
| } break; |
| case fb::MessageType::PlasmaRefreshLRURequest: { |
| std::vector<ObjectID> object_ids; |
| RETURN_NOT_OK(ReadRefreshLRURequest(input, input_size, &object_ids)); |
| eviction_policy_.RefreshObjects(object_ids); |
| HANDLE_SIGPIPE(SendRefreshLRUReply(client->fd), client->fd); |
| } break; |
| case fb::MessageType::PlasmaSubscribeRequest: |
| SubscribeToUpdates(client); |
| break; |
| case fb::MessageType::PlasmaConnectRequest: { |
| HANDLE_SIGPIPE(SendConnectReply(client->fd, PlasmaAllocator::GetFootprintLimit()), |
| client->fd); |
| } break; |
| case fb::MessageType::PlasmaDisconnectClient: |
| ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd; |
| DisconnectClient(client->fd); |
| break; |
| case fb::MessageType::PlasmaSetOptionsRequest: { |
| std::string client_name; |
| int64_t output_memory_quota; |
| RETURN_NOT_OK( |
| ReadSetOptionsRequest(input, input_size, &client_name, &output_memory_quota)); |
| client->name = client_name; |
| bool success = eviction_policy_.SetClientQuota(client, output_memory_quota); |
| HANDLE_SIGPIPE(SendSetOptionsReply(client->fd, success ? PlasmaError::OK |
| : PlasmaError::OutOfMemory), |
| client->fd); |
| } break; |
| case fb::MessageType::PlasmaGetDebugStringRequest: { |
| HANDLE_SIGPIPE(SendGetDebugStringReply(client->fd, eviction_policy_.DebugString()), |
| client->fd); |
| } break; |
| default: |
| // This code should be unreachable. |
| ARROW_CHECK(0); |
| } |
| return Status::OK(); |
| } |
| |
| class PlasmaStoreRunner { |
| public: |
| PlasmaStoreRunner() {} |
| |
| void Start(char* socket_name, std::string directory, bool hugepages_enabled, |
| std::shared_ptr<ExternalStore> external_store) { |
| // Create the event loop. |
| loop_.reset(new EventLoop); |
| store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled, socket_name, |
| external_store)); |
| plasma_config = store_->GetPlasmaStoreInfo(); |
| |
| // We are using a single memory-mapped file by mallocing and freeing a single |
| // large amount of space up front. According to the documentation, |
| // dlmalloc might need up to 128*sizeof(size_t) bytes for internal |
| // bookkeeping. |
| void* pointer = plasma::PlasmaAllocator::Memalign( |
| kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); |
| ARROW_CHECK(pointer != nullptr); |
| // This will unmap the file, but the next one created will be as large |
| // as this one (this is an implementation detail of dlmalloc). |
| plasma::PlasmaAllocator::Free( |
| pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t)); |
| |
| int socket = BindIpcSock(socket_name, true); |
| // TODO(pcm): Check return value. |
| ARROW_CHECK(socket >= 0); |
| |
| loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) { |
| this->store_->ConnectClient(socket); |
| }); |
| loop_->Start(); |
| } |
| |
| void Stop() { loop_->Stop(); } |
| |
| void Shutdown() { |
| loop_->Shutdown(); |
| loop_ = nullptr; |
| store_ = nullptr; |
| } |
| |
| private: |
| std::unique_ptr<EventLoop> loop_; |
| std::unique_ptr<PlasmaStore> store_; |
| }; |
| |
| static std::unique_ptr<PlasmaStoreRunner> g_runner = nullptr; |
| |
| void HandleSignal(int signal) { |
| if (signal == SIGTERM) { |
| ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server..."; |
| if (g_runner != nullptr) { |
| g_runner->Stop(); |
| } |
| } |
| } |
| |
| void StartServer(char* socket_name, std::string plasma_directory, bool hugepages_enabled, |
| std::shared_ptr<ExternalStore> external_store) { |
| // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write |
| // to a client that has already died, the store could die. |
| signal(SIGPIPE, SIG_IGN); |
| |
| g_runner.reset(new PlasmaStoreRunner()); |
| signal(SIGTERM, HandleSignal); |
| g_runner->Start(socket_name, plasma_directory, hugepages_enabled, external_store); |
| } |
| |
| // Function to use (instead of ARROW_LOG(FATAL)) for usage, etc. errors before |
| // the main server loop starts, so users don't get a backtrace if they |
| // simply forgot a command-line switch. |
| void ExitWithUsageError(const char* error_msg) { |
| std::cerr << gflags::ProgramInvocationShortName() << ": " << error_msg << std::endl; |
| exit(1); |
| } |
| |
| } // namespace plasma |
| |
| #ifdef __linux__ |
| #define SHM_DEFAULT_PATH "/dev/shm" |
| #else |
| #define SHM_DEFAULT_PATH "/tmp" |
| #endif |
| |
| // Command-line flags. |
| DEFINE_string(d, SHM_DEFAULT_PATH, "directory where to create the memory-backed file"); |
| DEFINE_string(e, "", |
| "endpoint for external storage service, where objects " |
| "evicted from Plasma store can be written to, optional"); |
| DEFINE_bool(h, false, "whether to enable hugepage support"); |
| DEFINE_string(s, "", |
| "socket name where the Plasma store will listen for requests, required"); |
| DEFINE_string(m, "", "amount of memory in bytes to use for Plasma store, required"); |
| |
| int main(int argc, char* argv[]) { |
| ArrowLog::StartArrowLog(argv[0], ArrowLogLevel::ARROW_INFO); |
| ArrowLog::InstallFailureSignalHandler(); |
| |
| gflags::SetUsageMessage("Shared-memory server for Arrow data.\nUsage: "); |
| gflags::SetVersionString(ARROW_VERSION_STRING); |
| |
| char* socket_name = nullptr; |
| // Directory where plasma memory mapped files are stored. |
| std::string plasma_directory; |
| std::string external_store_endpoint; |
| bool hugepages_enabled = false; |
| int64_t system_memory = -1; |
| |
| gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/true); |
| plasma_directory = FLAGS_d; |
| external_store_endpoint = FLAGS_e; |
| hugepages_enabled = FLAGS_h; |
| if (!FLAGS_s.empty()) { |
| // We only check below if socket_name is null, so don't set it if the flag was empty. |
| socket_name = const_cast<char*>(FLAGS_s.c_str()); |
| } |
| |
| if (!FLAGS_m.empty()) { |
| char extra; |
| int scanned = sscanf(FLAGS_m.c_str(), "%" SCNd64 "%c", &system_memory, &extra); |
| if (scanned != 1) { |
| plasma::ExitWithUsageError( |
| "-m switch takes memory in bytes, with no letter suffix allowed"); |
| } |
| |
| // Set system memory capacity |
| plasma::PlasmaAllocator::SetFootprintLimit(static_cast<size_t>(system_memory)); |
| ARROW_LOG(INFO) << "Allowing the Plasma store to use up to " |
| << static_cast<double>(system_memory) / 1000000000 << "GB of memory."; |
| } |
| |
| // Sanity check command line options. |
| if (socket_name == nullptr && system_memory == -1) { |
| // Nicer error message for the case where the user ran the program without |
| // any of the required command-line switches. |
| plasma::ExitWithUsageError( |
| "please specify socket for incoming connections with -s, " |
| "and the amount of memory (in bytes) to use with -m"); |
| } else if (socket_name == nullptr) { |
| plasma::ExitWithUsageError("please specify socket for incoming connections with -s"); |
| } else if (system_memory == -1) { |
| plasma::ExitWithUsageError( |
| "please specify the amount of memory (in bytes) to use with -m"); |
| } |
| if (hugepages_enabled && plasma_directory.empty()) { |
| plasma::ExitWithUsageError( |
| "if you want to use hugepages, please specify path to huge pages " |
| "filesystem with -d"); |
| } |
| ARROW_CHECK(!plasma_directory.empty()); |
| ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory |
| << " and huge page support " |
| << (hugepages_enabled ? "enabled" : "disabled"); |
| |
| #ifdef __linux__ |
| if (!hugepages_enabled) { |
| // On Linux, check that the amount of memory available in /dev/shm is large |
| // enough to accommodate the request. If it isn't, then fail. |
| int shm_fd = open(plasma_directory.c_str(), O_RDONLY); |
| struct statvfs shm_vfs_stats; |
| fstatvfs(shm_fd, &shm_vfs_stats); |
| // The value shm_vfs_stats.f_bsize is the block size, and the value |
| // shm_vfs_stats.f_bavail is the number of available blocks. |
| int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail; |
| close(shm_fd); |
| // Keep some safety margin for allocator fragmentation. |
| shm_mem_avail = 9 * shm_mem_avail / 10; |
| if (system_memory > shm_mem_avail) { |
| ARROW_LOG(WARNING) |
| << "System memory request exceeds memory available in " << plasma_directory |
| << ". The request is for " << system_memory |
| << " bytes, and the amount available is " << shm_mem_avail |
| << " bytes. You may be able to free up space by deleting files in " |
| "/dev/shm. If you are inside a Docker container, you may need to " |
| "pass an argument with the flag '--shm-size' to 'docker run'."; |
| system_memory = shm_mem_avail; |
| } |
| } else { |
| plasma::SetMallocGranularity(1024 * 1024 * 1024); // 1 GiB |
| } |
| #endif |
| |
| // Get external store |
| std::shared_ptr<plasma::ExternalStore> external_store{nullptr}; |
| if (!external_store_endpoint.empty()) { |
| std::string name; |
| ARROW_CHECK_OK( |
| plasma::ExternalStores::ExtractStoreName(external_store_endpoint, &name)); |
| external_store = plasma::ExternalStores::GetStore(name); |
| if (external_store == nullptr) { |
| std::ostringstream error_msg; |
| error_msg << "no such external store \"" << name << "\""; |
| plasma::ExitWithUsageError(error_msg.str().c_str()); |
| } |
| ARROW_LOG(DEBUG) << "connecting to external store..."; |
| ARROW_CHECK_OK(external_store->Connect(external_store_endpoint)); |
| } |
| |
| ARROW_LOG(DEBUG) << "starting server listening on " << socket_name; |
| plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, external_store); |
| plasma::g_runner->Shutdown(); |
| plasma::g_runner = nullptr; |
| |
| ArrowLog::UninstallSignalAction(); |
| ArrowLog::ShutDownArrowLog(); |
| return 0; |
| } |