blob: f479c8101b6303afbfaff3fd56b44d8c36f7aa92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <arrow-glib/buffer.hpp>
#include <arrow-glib/error.hpp>
#ifdef HAVE_ARROW_CUDA
# include <arrow-cuda-glib/cuda.hpp>
#endif
#include <plasma-glib/client.hpp>
#include <plasma-glib/object.hpp>
G_BEGIN_DECLS
/**
* SECTION: client
* @section_id: client-classes
* @title: Client related classes
* @include: plasma-glib/plasma-glib.h
*
* #GPlasmaClientOptions is a class for customizing plasma store
* connection.
*
* #GPlasmaClientCreateOptions is a class for customizing object creation.
*
* #GPlasmaClient is a class for an interface with a plasma store.
*
* Since: 0.12.0
*/
typedef struct GPlasmaClientCreatePrivate_ {
gint n_retries;
} GPlasmaClientOptionsPrivate;
enum {
PROP_N_RETRIES = 1
};
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientOptions,
gplasma_client_options,
G_TYPE_OBJECT)
#define GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object) \
static_cast<GPlasmaClientOptionsPrivate *>( \
gplasma_client_options_get_instance_private( \
GPLASMA_CLIENT_OPTIONS(object)))
static void
gplasma_client_options_set_property(GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object);
switch (prop_id) {
case PROP_N_RETRIES:
priv->n_retries = g_value_get_int(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static void
gplasma_client_options_get_property(GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object);
switch (prop_id) {
case PROP_N_RETRIES:
g_value_set_int(value, priv->n_retries);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static void
gplasma_client_options_init(GPlasmaClientOptions *object)
{
}
static void
gplasma_client_options_class_init(GPlasmaClientOptionsClass *klass)
{
auto gobject_class = G_OBJECT_CLASS(klass);
gobject_class->set_property = gplasma_client_options_set_property;
gobject_class->get_property = gplasma_client_options_get_property;
GParamSpec *spec;
spec = g_param_spec_int("n-retries",
"N retries",
"The number of retries to connect plasma store. "
"-1 means that the system default value is used.",
-1,
G_MAXINT,
-1,
static_cast<GParamFlags>(G_PARAM_READWRITE |
G_PARAM_CONSTRUCT));
g_object_class_install_property(gobject_class, PROP_N_RETRIES, spec);
}
/**
* gplasma_client_options_new:
*
* Returns: A newly created #GPlasmaClientOptions.
*
* Since: 0.12.0
*/
GPlasmaClientOptions *
gplasma_client_options_new(void)
{
auto options = g_object_new(GPLASMA_TYPE_CLIENT_OPTIONS,
NULL);
return GPLASMA_CLIENT_OPTIONS(options);
}
/**
* gplasma_client_options_set_n_retries:
* @options: A #GPlasmaClientOptions.
* @n_retries: The number of retires on connect.
*
* Since: 0.12.0
*/
void
gplasma_client_options_set_n_retries(GPlasmaClientOptions *options,
gint n_retries)
{
auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options);
priv->n_retries = n_retries;
}
/**
* gplasma_client_options_get_n_retries:
* @options: A #GPlasmaClientOptions.
*
* Returns: The number of retries on connect.
*
* Since: 0.12.0
*/
gint
gplasma_client_options_get_n_retries(GPlasmaClientOptions *options)
{
auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options);
return priv->n_retries;
}
typedef struct GPlasmaClientCreateOptionsPrivate_ {
guint8 *metadata;
gsize metadata_size;
gint gpu_device;
} GPlasmaClientCreateOptionsPrivate;
enum {
PROP_GPU_DEVICE = 1
};
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientCreateOptions,
gplasma_client_create_options,
G_TYPE_OBJECT)
#define GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object) \
static_cast<GPlasmaClientCreateOptionsPrivate *>( \
gplasma_client_create_options_get_instance_private( \
GPLASMA_CLIENT_CREATE_OPTIONS(object)))
static void
gplasma_client_create_options_set_property(GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object);
switch (prop_id) {
case PROP_GPU_DEVICE:
priv->gpu_device = g_value_get_int(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static void
gplasma_client_create_options_get_property(GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object);
switch (prop_id) {
case PROP_GPU_DEVICE:
g_value_set_int(value, priv->gpu_device);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static void
gplasma_client_create_options_init(GPlasmaClientCreateOptions *object)
{
}
static void
gplasma_client_create_options_class_init(GPlasmaClientCreateOptionsClass *klass)
{
auto gobject_class = G_OBJECT_CLASS(klass);
gobject_class->set_property = gplasma_client_create_options_set_property;
gobject_class->get_property = gplasma_client_create_options_get_property;
GParamSpec *spec;
spec = g_param_spec_int("gpu-device",
"GPU device",
"The GPU device number. -1 means GPU isn't used.",
-1,
G_MAXINT,
-1,
static_cast<GParamFlags>(G_PARAM_READWRITE |
G_PARAM_CONSTRUCT));
g_object_class_install_property(gobject_class, PROP_GPU_DEVICE, spec);
}
/**
* gplasma_client_create_options_new:
*
* Returns: A newly created #GPlasmaClientCreateOptions.
*
* Since: 0.12.0
*/
GPlasmaClientCreateOptions *
gplasma_client_create_options_new(void)
{
auto options = g_object_new(GPLASMA_TYPE_CLIENT_CREATE_OPTIONS,
NULL);
return GPLASMA_CLIENT_CREATE_OPTIONS(options);
}
/**
* gplasma_client_create_options_set_metadata:
* @options: A #GPlasmaClientCreateOptions.
* @metadata: (nullable) (array length=size): The metadata of a created object.
* @size: The number of bytes of the metadata.
*
* Since: 0.12.0
*/
void
gplasma_client_create_options_set_metadata(GPlasmaClientCreateOptions *options,
const guint8 *metadata,
gsize size)
{
auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
if (priv->metadata) {
g_free(priv->metadata);
}
priv->metadata = static_cast<guint8 *>(g_memdup(metadata, size));
priv->metadata_size = size;
}
/**
* gplasma_client_create_options_get_metadata:
* @options: A #GPlasmaClientCreateOptions.
* @size: (nullable) (out): The number of bytes of the metadata.
*
* Returns: (nullable) (array length=size): The metadata of a created object.
*
* Since: 0.12.0
*/
const guint8 *
gplasma_client_create_options_get_metadata(GPlasmaClientCreateOptions *options,
gsize *size)
{
auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
if (size) {
*size = priv->metadata_size;
}
return priv->metadata;
}
typedef struct GPlasmaClientPrivate_ {
plasma::PlasmaClient *client;
bool disconnected;
} GPlasmaClientPrivate;
enum {
PROP_CLIENT = 1
};
G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClient,
gplasma_client,
G_TYPE_OBJECT)
#define GPLASMA_CLIENT_GET_PRIVATE(object) \
static_cast<GPlasmaClientPrivate *>( \
gplasma_client_get_instance_private( \
GPLASMA_CLIENT(object)))
static void
gplasma_client_finalize(GObject *object)
{
auto priv = GPLASMA_CLIENT_GET_PRIVATE(object);
if (!priv->disconnected) {
auto status = priv->client->Disconnect();
if (!status.ok()) {
g_warning("[plasma][client][finalize] Failed to disconnect: %s",
status.ToString().c_str());
}
}
delete priv->client;
G_OBJECT_CLASS(gplasma_client_parent_class)->finalize(object);
}
static void
gplasma_client_set_property(GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
auto priv = GPLASMA_CLIENT_GET_PRIVATE(object);
switch (prop_id) {
case PROP_CLIENT:
priv->client =
static_cast<plasma::PlasmaClient *>(g_value_get_pointer(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static void
gplasma_client_init(GPlasmaClient *object)
{
}
static void
gplasma_client_class_init(GPlasmaClientClass *klass)
{
GParamSpec *spec;
auto gobject_class = G_OBJECT_CLASS(klass);
gobject_class->finalize = gplasma_client_finalize;
gobject_class->set_property = gplasma_client_set_property;
spec = g_param_spec_pointer("client",
"Client",
"The raw plasma::PlasmaClient *",
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_CLIENT, spec);
}
/**
* gplasma_client_new:
* @store_socket_name: The name of the UNIX domain socket.
* @options: (nullable): The options to custom how to connect to plasma store.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (nullable): A newly created #GPlasmaClient on success,
* %NULL on error.
*
* Since: 0.12.0
*/
GPlasmaClient *
gplasma_client_new(const gchar *store_socket_name,
GPlasmaClientOptions *options,
GError **error)
{
auto plasma_client = new plasma::PlasmaClient();
int n_retries = -1;
if (options) {
n_retries = gplasma_client_options_get_n_retries(options);
}
auto status = plasma_client->Connect(store_socket_name, "", 0, n_retries);
if (garrow_error_check(error, status, "[plasma][client][new]")) {
return gplasma_client_new_raw(plasma_client);
} else {
return NULL;
}
}
/**
* gplasma_client_create:
* @client: A #GPlasmaClient.
* @id: The ID for a newly created object.
* @data_size: The number of bytes of data for a newly created object.
* @options: (nullable): The option for creating an object.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (nullable) (transfer full): A newly created #GPlasmaCreatedObject
* on success, %NULL on error.
*
* Since: 0.12.0
*/
GPlasmaCreatedObject *
gplasma_client_create(GPlasmaClient *client,
GPlasmaObjectID *id,
gsize data_size,
GPlasmaClientCreateOptions *options,
GError **error)
{
const auto context = "[plasma][client][create]";
auto plasma_client = gplasma_client_get_raw(client);
auto plasma_id = gplasma_object_id_get_raw(id);
const uint8_t *raw_metadata = nullptr;
int64_t raw_metadata_size = 0;
int device_number = 0;
if (options) {
auto options_priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
raw_metadata = options_priv->metadata;
raw_metadata_size = options_priv->metadata_size;
if (options_priv->gpu_device >= 0) {
#ifndef HAVE_ARROW_CUDA
g_set_error(error,
GARROW_ERROR,
GARROW_ERROR_INVALID,
"%s Arrow CUDA GLib is needed to use GPU",
context);
return NULL;
#endif
device_number = options_priv->gpu_device + 1;
}
}
std::shared_ptr<arrow::Buffer> plasma_data;
auto status = plasma_client->Create(plasma_id,
data_size,
raw_metadata,
raw_metadata_size,
&plasma_data,
device_number);
if (!garrow_error_check(error, status, context)) {
return NULL;
}
GArrowBuffer *data = nullptr;
if (device_number == 0) {
auto plasma_mutable_data =
std::static_pointer_cast<arrow::MutableBuffer>(plasma_data);
data = GARROW_BUFFER(garrow_mutable_buffer_new_raw(&plasma_mutable_data));
#ifdef HAVE_ARROW_CUDA
} else {
auto plasma_cuda_data =
std::static_pointer_cast<arrow::cuda::CudaBuffer>(plasma_data);
data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_data));
#endif
}
std::shared_ptr<arrow::Buffer> plasma_metadata;
GArrowBuffer *metadata = nullptr;
if (raw_metadata_size > 0) {
plasma_metadata =
std::make_shared<arrow::Buffer>(raw_metadata, raw_metadata_size);
metadata = garrow_buffer_new_raw(&plasma_metadata);
}
return gplasma_created_object_new_raw(client,
id,
&plasma_data,
data,
metadata ? &plasma_metadata : nullptr,
metadata,
device_number - 1);
}
/**
* gplasma_client_refer_object:
* @client: A #GPlasmaClient.
* @id: The ID of the target object.
* @timeout_ms: The timeout in milliseconds. -1 means no timeout.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (nullable) (transfer full): A found #GPlasmaReferredObject
* on success, %NULL on error.
*
* Since: 0.12.0
*/
GPlasmaReferredObject *
gplasma_client_refer_object(GPlasmaClient *client,
GPlasmaObjectID *id,
gint64 timeout_ms,
GError **error)
{
const auto context = "[plasma][client][refer-object]";
auto plasma_client = gplasma_client_get_raw(client);
auto plasma_id = gplasma_object_id_get_raw(id);
std::vector<plasma::ObjectID> plasma_ids;
plasma_ids.push_back(plasma_id);
std::vector<plasma::ObjectBuffer> plasma_object_buffers;
auto status = plasma_client->Get(plasma_ids,
timeout_ms,
&plasma_object_buffers);
if (!garrow_error_check(error, status, context)) {
return NULL;
}
auto plasma_object_buffer = plasma_object_buffers[0];
auto plasma_data = plasma_object_buffer.data;
auto plasma_metadata = plasma_object_buffer.metadata;
GArrowBuffer *data = nullptr;
GArrowBuffer *metadata = nullptr;
if (plasma_object_buffer.device_num == 0) {
data = garrow_buffer_new_raw(&plasma_data);
metadata = garrow_buffer_new_raw(&plasma_metadata);
} else {
#ifdef HAVE_ARROW_CUDA
std::shared_ptr<arrow::cuda::CudaBuffer> plasma_cuda_data;
status = arrow::cuda::CudaBuffer::FromBuffer(plasma_data,
&plasma_cuda_data);
if (!garrow_error_check(error, status, context)) {
return NULL;
}
std::shared_ptr<arrow::cuda::CudaBuffer> plasma_cuda_metadata;
status = arrow::cuda::CudaBuffer::FromBuffer(plasma_metadata,
&plasma_cuda_metadata);
if (!garrow_error_check(error, status, context)) {
return NULL;
}
data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_data));
metadata =
GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_metadata));
#else
g_set_error(error,
GARROW_ERROR,
GARROW_ERROR_INVALID,
"%s Arrow CUDA GLib is needed to use GPU",
context);
return NULL;
#endif
}
return gplasma_referred_object_new_raw(client,
id,
&plasma_data,
data,
&plasma_metadata,
metadata,
plasma_object_buffer.device_num - 1);
}
/**
* gplasma_client_disconnect:
* @client: A #GPlasmaClient.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: %TRUE on success, %FALSE if there was an error.
*
* Since: 0.12.0
*/
gboolean
gplasma_client_disconnect(GPlasmaClient *client,
GError **error)
{
auto priv = GPLASMA_CLIENT_GET_PRIVATE(client);
auto status = priv->client->Disconnect();
if (garrow_error_check(error, status, "[plasma][client][disconnect]")) {
priv->disconnected = true;
return TRUE;
} else {
return FALSE;
}
}
G_END_DECLS
GPlasmaClient *
gplasma_client_new_raw(plasma::PlasmaClient *plasma_client)
{
auto client = g_object_new(GPLASMA_TYPE_CLIENT,
"client", plasma_client,
NULL);
return GPLASMA_CLIENT(client);
}
plasma::PlasmaClient *
gplasma_client_get_raw(GPlasmaClient *client)
{
auto priv = GPLASMA_CLIENT_GET_PRIVATE(client);
return priv->client;
}