blob: 18279893887d72720e2b7b8a89bf8e0e5d14381b [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "plasma/common.h"
#include "plasma/events.h"
#include "plasma/external_store.h"
#include "plasma/plasma.h"
#include "plasma/protocol.h"
#include "plasma/quota_aware_policy.h"
// Forward-declare arrow::Status so that declarations in this header can name
// it (e.g. as a return type) without including the full Arrow status header.
namespace arrow { class Status; }  // namespace arrow
namespace plasma {
// Forward declaration; this header only refers to GetRequest through pointers.
struct GetRequest;

// Forward declarations of protocol types from the flatbuf namespace, so this
// header can name them without pulling in their full definitions.
namespace flatbuf {
enum class PlasmaError;
struct ObjectInfoT;
}  // namespace flatbuf

// Make the protocol types available under their short names.
using flatbuf::PlasmaError;
using flatbuf::ObjectInfoT;
/// Notifications still waiting to be delivered to a single client.
///
/// Clients are told about objects in the same order in which those objects
/// were sealed or deleted; the queue preserves that order.
struct NotificationQueue {
  /// Pending notifications, each one an owned byte buffer.
  std::deque<std::unique_ptr<uint8_t[]>> object_notifications;
};
/// The plasma object store.
///
/// Holds the object table (through PlasmaStoreInfo), the eviction policy, the
/// set of connected clients with their pending notifications, and services
/// the create / get / seal / release / abort / delete requests issued by
/// those clients.
class PlasmaStore {
 public:
  /// Pending notifications, keyed by client file descriptor.
  using NotificationMap = std::unordered_map<int, NotificationQueue>;

  /// Construct a plasma store.
  ///
  /// \param loop Event loop of the plasma store.
  /// \param directory Presumably the directory backing the object memory
  ///        mappings (inferred from the name — confirm in the definition).
  /// \param hugepages_enabled Presumably whether `directory` uses huge pages
  ///        (inferred from the name — confirm).
  /// \param socket_name Name of the store's socket (inferred from the name —
  ///        confirm).
  /// \param external_store External store used for reading/writing object
  ///        data; NOTE(review): may presumably be null — confirm.
  PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled,
              const std::string& socket_name,
              std::shared_ptr<ExternalStore> external_store);

  ~PlasmaStore();

  // The store owns per-client state (connected_clients_ holds unique_ptrs)
  // and a raw event-loop pointer, so a copy is never meaningful. Delete the
  // copy operations explicitly instead of leaving them implicitly declared.
  PlasmaStore(const PlasmaStore&) = delete;
  PlasmaStore& operator=(const PlasmaStore&) = delete;

  /// Get a const pointer to the internal PlasmaStoreInfo object.
  const PlasmaStoreInfo* GetPlasmaStoreInfo();

  /// Create a new object. The client must call ReleaseObject to tell the
  /// store when it is done with the object.
  ///
  /// \param object_id Object ID of the object to be created.
  /// \param evict_if_full If this is true, then when the object store is full,
  ///        try to evict objects that are not currently referenced before
  ///        creating the object. Else, do not evict any objects and
  ///        immediately return a PlasmaError::OutOfMemory.
  /// \param data_size Size in bytes of the object to be created.
  /// \param metadata_size Size in bytes of the object metadata.
  /// \param device_num The number of the device where the object is being
  ///        created.
  ///        device_num = 0 corresponds to the host,
  ///        device_num = 1 corresponds to GPU0,
  ///        device_num = 2 corresponds to GPU1, etc.
  /// \param client The client that created the object.
  /// \param result The object that has been created.
  /// \return One of the following error codes:
  ///  - PlasmaError::OK, if the object was created successfully.
  ///  - PlasmaError::ObjectExists, if an object with this ID is already
  ///    present in the store. In this case, the client should not release
  ///    the object.
  ///  - PlasmaError::OutOfMemory, if the store is out of memory and
  ///    cannot create the object. In this case, the client should not
  ///    release the object.
  PlasmaError CreateObject(const ObjectID& object_id, bool evict_if_full,
                           int64_t data_size, int64_t metadata_size, int device_num,
                           Client* client, PlasmaObject* result);

  /// Abort a created but unsealed object. If the client is not the
  /// creator, then the abort will fail.
  ///
  /// \param object_id Object ID of the object to be aborted.
  /// \param client The client who created the object. If this does not
  ///        match the creator of the object, then the abort will fail.
  /// \return 1 if the abort succeeds, else 0.
  int AbortObject(const ObjectID& object_id, Client* client);

  /// Delete the object with the given ID from the object table.
  ///
  /// \param object_id Object ID of the object to be deleted.
  /// \return One of the following error codes:
  ///  - PlasmaError::OK, if the object was deleted successfully.
  ///  - PlasmaError::ObjectNotFound, if the object does not exist.
  ///  - PlasmaError::ObjectInUse, if the object is in use.
  // NOTE(review): object_id is taken by non-const reference although it looks
  // read-only; switch to `const ObjectID&` together with the out-of-line
  // definition (changing only this declaration would break the build).
  PlasmaError DeleteObject(ObjectID& object_id);

  /// Evict objects returned by the eviction policy.
  ///
  /// \param object_ids Object IDs of the objects to be evicted.
  void EvictObjects(const std::vector<ObjectID>& object_ids);

  /// Process a get request from a client. This method assumes that we will
  /// eventually have these objects sealed. If one of the objects has not yet
  /// been sealed, the client that requested the object will be notified when it
  /// is sealed.
  ///
  /// For each object, the client must call ReleaseObject to tell the store
  /// when it is done with the object.
  ///
  /// \param client The client making this request.
  /// \param object_ids Object IDs of the objects to be gotten.
  /// \param timeout_ms The timeout for the get request in milliseconds.
  void ProcessGetRequest(Client* client, const std::vector<ObjectID>& object_ids,
                         int64_t timeout_ms);

  /// Seal a vector of objects. The objects are now immutable and can be
  /// accessed with get.
  ///
  /// \param object_ids The vector of Object IDs of the objects to be sealed.
  /// \param digests The vector of digests of the objects. This is used to tell
  ///        if two objects with the same object ID are the same. Presumably
  ///        digests[i] belongs to object_ids[i] — confirm in the definition.
  void SealObjects(const std::vector<ObjectID>& object_ids,
                   const std::vector<std::string>& digests);

  /// Check if the plasma store contains an object.
  ///
  /// \param object_id Object ID that will be checked.
  /// \return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if
  ///         not.
  ObjectStatus ContainsObject(const ObjectID& object_id);

  /// Record the fact that a particular client is no longer using an object.
  ///
  /// \param object_id The object ID of the object that is being released.
  /// \param client The client making this request.
  void ReleaseObject(const ObjectID& object_id, Client* client);

  /// Subscribe a client to updates about newly sealed objects.
  ///
  /// \param client The client making this request.
  void SubscribeToUpdates(Client* client);

  /// Connect a new client to the PlasmaStore.
  ///
  /// \param listener_sock The socket that is listening to incoming connections.
  void ConnectClient(int listener_sock);

  /// Disconnect a client from the PlasmaStore.
  ///
  /// \param client_fd The client file descriptor that is disconnected.
  void DisconnectClient(int client_fd);

  /// Try to deliver the pending notifications held by one entry of
  /// pending_notifications_.
  ///
  /// \param it Entry (client fd -> NotificationQueue) to flush.
  /// \return An iterator into the notification map. NOTE(review): presumably
  ///         follows the map-erase convention (next entry to visit) — confirm
  ///         against the definition before relying on it.
  NotificationMap::iterator SendNotifications(NotificationMap::iterator it);

  /// Handle a message from `client` (presumably one message per call —
  /// confirm in the definition).
  ///
  /// \param client The client that sent the message.
  /// \return Status of processing the message.
  arrow::Status ProcessMessage(Client* client);

 private:
  /// Queue a notification about an object (presumably for every subscribed
  /// client — confirm).
  void PushNotification(ObjectInfoT* object_notification);

  /// Queue a batch of object notifications.
  void PushNotifications(std::vector<ObjectInfoT>& object_notifications);

  /// Queue a notification about an object for the client on `client_fd`.
  void PushNotification(ObjectInfoT* object_notification, int client_fd);

  /// Record that `client` uses the object `object_id` (table entry `entry`);
  /// inferred from the name — confirm in the definition.
  void AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
                            Client* client);

  /// Remove a GetRequest and clean up the relevant data structures.
  ///
  /// \param get_request The GetRequest to remove.
  void RemoveGetRequest(GetRequest* get_request);

  /// Remove all of the GetRequests for a given client.
  ///
  /// \param client The client whose GetRequests should be removed.
  void RemoveGetRequestsForClient(Client* client);

  /// Reply to the client that issued `get_req` (inferred from the name —
  /// confirm).
  void ReturnFromGet(GetRequest* get_req);

  /// Update the get requests in object_get_requests_ that wait on `object_id`.
  void UpdateObjectGetRequests(const ObjectID& object_id);

  /// Inverse of AddToClientObjectIds. NOTE(review): the meaning of the int
  /// result is not visible in this header — confirm in the definition.
  int RemoveFromClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
                                Client* client);

  /// Remove the entry for `object_id` from the object table.
  void EraseFromObjectTable(const ObjectID& object_id);

  /// Allocate `size` bytes of object memory, optionally evicting first.
  /// Presumably writes the mapping's fd, map size and offset through the out
  /// parameters and returns nullptr on failure — confirm in the definition.
  uint8_t* AllocateMemory(size_t size, bool evict_if_full, int* fd, int64_t* map_size,
                          ptrdiff_t* offset, Client* client, bool is_create);
#ifdef PLASMA_CUDA
  arrow::Result<std::shared_ptr<arrow::cuda::CudaContext>> GetCudaContext(int device_num);
  Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer,
                            std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle);
  Status FreeCudaMemory(int device_num, int64_t size, uint8_t* out_pointer);
#endif

  /// Event loop of the plasma store.
  EventLoop* loop_;
  /// The plasma store information, including the object tables, that is exposed
  /// to the eviction policy.
  PlasmaStoreInfo store_info_;
  /// The state that is managed by the eviction policy.
  QuotaAwarePolicy eviction_policy_;
  /// Input buffer. This is allocated only once to avoid mallocs for every
  /// call to ProcessMessage.
  std::vector<uint8_t> input_buffer_;
  /// A hash table mapping object IDs to a vector of the get requests that are
  /// waiting for the object to arrive.
  std::unordered_map<ObjectID, std::vector<GetRequest*>> object_get_requests_;
  /// The pending notifications that have not been sent to subscribers because
  /// the socket send buffers were full. This is a hash table from client file
  /// descriptor to the queue of notification buffers still to be sent to that
  /// client.
  /// TODO(pcm): Consider putting this into the Client data structure and
  /// reorganize the code slightly.
  NotificationMap pending_notifications_;
  /// Connected clients; presumably keyed by client file descriptor (compare
  /// DisconnectClient(int client_fd)) — confirm.
  std::unordered_map<int, std::unique_ptr<Client>> connected_clients_;
  /// Object IDs held for deferred deletion. NOTE(review): presumably objects
  /// whose deletion was requested while still in use — confirm.
  std::unordered_set<ObjectID> deletion_cache_;
  /// Manages worker threads for handling asynchronous/multi-threaded requests
  /// for reading/writing data to/from external store.
  std::shared_ptr<ExternalStore> external_store_;
};
} // namespace plasma