blob: 3039c7e6e6ae1d30d6a7e15939cabd23789e62a5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#define R_NO_REMAP
#include <R.h>
#include <Rinternals.h>
#include "nanoarrow_ipc.h"
#include "buffer.h"
#include "nanoarrow/r.h"
#include "util.h"
static void finalize_input_stream_xptr(SEXP input_stream_xptr) {
struct ArrowIpcInputStream* input_stream =
(struct ArrowIpcInputStream*)R_ExternalPtrAddr(input_stream_xptr);
if (input_stream != NULL && input_stream->release != NULL) {
input_stream->release(input_stream);
}
if (input_stream != NULL) {
ArrowFree(input_stream);
}
}
static SEXP input_stream_owning_xptr(void) {
struct ArrowIpcInputStream* input_stream =
(struct ArrowIpcInputStream*)ArrowMalloc(sizeof(struct ArrowIpcInputStream));
input_stream->release = NULL;
SEXP input_stream_xptr =
PROTECT(R_MakeExternalPtr(input_stream, R_NilValue, R_NilValue));
R_RegisterCFinalizer(input_stream_xptr, &finalize_input_stream_xptr);
UNPROTECT(1);
return input_stream_xptr;
}
SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr) {
struct ArrowBuffer* buffer = buffer_from_xptr(buffer_xptr);
SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
struct ArrowArrayStream* array_stream =
nanoarrow_output_array_stream_from_xptr(array_stream_xptr);
SEXP input_stream_xptr = PROTECT(input_stream_owning_xptr());
struct ArrowIpcInputStream* input_stream =
(struct ArrowIpcInputStream*)R_ExternalPtrAddr(input_stream_xptr);
int code = ArrowIpcInputStreamInitBuffer(input_stream, buffer);
if (code != NANOARROW_OK) {
Rf_error("ArrowIpcInputStreamInitBuffer() failed");
}
code = ArrowIpcArrayStreamReaderInit(array_stream, input_stream, NULL);
if (code != NANOARROW_OK) {
Rf_error("ArrowIpcArrayStreamReaderInit() failed");
}
UNPROTECT(2);
return array_stream_xptr;
}
struct ConnectionInputStreamHandler {
SEXP con;
uint8_t* buf;
int64_t buf_size_bytes;
int64_t* size_read_out;
struct ArrowError* error;
int return_code;
};
static SEXP handle_readbin_error(SEXP cond, void* hdata) {
struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata;
SEXP fun = PROTECT(Rf_install("conditionMessage"));
SEXP call = PROTECT(Rf_lang2(fun, cond));
SEXP result = PROTECT(Rf_eval(call, R_BaseEnv));
SEXP result0 = STRING_ELT(result, 0);
const char* cond_msg = Rf_translateCharUTF8(result0);
ArrowErrorSet(data->error, "R execution error: %s", cond_msg);
data->return_code = EIO;
UNPROTECT(3);
return R_NilValue;
}
static SEXP call_readbin(void* hdata) {
struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata;
SEXP n = PROTECT(Rf_ScalarReal((double)data->buf_size_bytes));
SEXP call = PROTECT(Rf_lang4(nanoarrow_sym_readbin, data->con, nanoarrow_ptype_raw, n));
SEXP result = PROTECT(Rf_eval(call, R_BaseEnv));
R_xlen_t bytes_read = Rf_xlength(result);
memcpy(data->buf, RAW(result), bytes_read);
*(data->size_read_out) = bytes_read;
UNPROTECT(3);
return R_NilValue;
}
static ArrowErrorCode read_con_input_stream(struct ArrowIpcInputStream* stream,
uint8_t* buf, int64_t buf_size_bytes,
int64_t* size_read_out,
struct ArrowError* error) {
if (!nanoarrow_is_main_thread()) {
ArrowErrorSet(error, "Can't read from R connection on a non-R thread");
return EIO;
}
struct ConnectionInputStreamHandler data;
data.con = (SEXP)stream->private_data;
data.buf = buf;
data.buf_size_bytes = buf_size_bytes;
data.size_read_out = size_read_out;
data.error = error;
data.return_code = NANOARROW_OK;
R_tryCatchError(&call_readbin, &data, &handle_readbin_error, &data);
return data.return_code;
}
static void release_con_input_stream(struct ArrowIpcInputStream* stream) {
nanoarrow_release_sexp((SEXP)stream->private_data);
}
SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) {
SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
struct ArrowArrayStream* array_stream =
nanoarrow_output_array_stream_from_xptr(array_stream_xptr);
SEXP input_stream_xptr = PROTECT(input_stream_owning_xptr());
struct ArrowIpcInputStream* input_stream =
(struct ArrowIpcInputStream*)R_ExternalPtrAddr(input_stream_xptr);
input_stream->read = &read_con_input_stream;
input_stream->release = &release_con_input_stream;
input_stream->private_data = (SEXP)con;
nanoarrow_preserve_sexp(con);
int code = ArrowIpcArrayStreamReaderInit(array_stream, input_stream, NULL);
if (code != NANOARROW_OK) {
Rf_error("ArrowIpcArrayStreamReaderInit() failed");
}
UNPROTECT(2);
return array_stream_xptr;
}