# blob: eefa6ae92eac72c1610c6bbdd524ff9acb4ab29e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' Create ArrayStreams from batches
#'
#' @param batches A [list()] of [nanoarrow_array][as_nanoarrow_array] objects
#'   or objects that can be coerced via [as_nanoarrow_array()].
#' @param schema A [nanoarrow_schema][as_nanoarrow_schema], an object that
#'   can be coerced via [as_nanoarrow_schema()], or `NULL` to guess based on
#'   the schema of the first batch.
#' @param validate Use `FALSE` to skip the validation step (i.e., if you
#'   know that the arrays are valid).
#'
#' @return A [nanoarrow_array_stream][as_nanoarrow_array_stream]
#' @export
#'
#' @examples
#' (stream <- basic_array_stream(list(data.frame(a = 1, b = 2))))
#' as.data.frame(stream$get_next())
#' stream$get_next()
#'
basic_array_stream <- function(batches, schema = NULL, validate = TRUE) {
  # Error for everything except a bare list (e.g., so that calling with
  # a data.frame() does not unintentionally loop over columns)
  if (!identical(class(batches), "list")) {
    stop("`batches` must be an unclassed `list()`")
  }

  # Normalize a user-supplied schema once, up front, so that the same
  # schema object is used both to coerce the batches and to construct the
  # stream (mirrors what the data.frame method of
  # as_nanoarrow_array_stream() does before calling this function).
  if (!is.null(schema)) {
    schema <- as_nanoarrow_schema(schema)
  }

  batches <- lapply(batches, as_nanoarrow_array, schema = schema)

  # With no explicit schema, the first batch decides; with no batches
  # there is nothing to infer from.
  if (is.null(schema)) {
    if (length(batches) == 0) {
      stop("Can't infer schema from first batch if there are zero batches")
    }
    schema <- infer_nanoarrow_schema(batches[[1]])
  }

  .Call(nanoarrow_c_basic_array_stream, batches, schema, validate)
}
#' Register an array stream finalizer
#'
#' In some cases, R functions that return a [nanoarrow_array_stream][as_nanoarrow_array_stream]
#' may require that the scope of some other object outlive that of the array
#' stream. If there is a need for that object to be released deterministically
#' (e.g., to close open files), you can register a function to run after the
#' stream's release callback is invoked from the R thread. Note that this
#' finalizer will **not** be run if the stream's release callback is invoked
#' from a **non**-R thread. In this case, the finalizer and its chain of
#' environments will be garbage-collected when `nanoarrow:::preserved_empty()`
#' is run.
#'
#' @param array_stream A [nanoarrow_array_stream][as_nanoarrow_array_stream]
#' @param finalizer A function that will be called with zero arguments.
#'
#' @return A newly allocated `array_stream` whose release callback will call
#' the supplied finalizer.
#' @export
#'
#' @examples
#' stream <- array_stream_set_finalizer(
#' basic_array_stream(list(1:5)),
#' function() message("All done!")
#' )
#' stream$release()
#'
array_stream_set_finalizer <- function(array_stream, finalizer) {
  stopifnot(is.function(finalizer))

  # The finalizer is stored in an otherwise empty environment; the class is
  # attached only after the finalizer has been stored.
  holder <- new.env(parent = emptyenv())
  assign("array_stream_finalizer", finalizer, envir = holder)
  class(holder) <- "nanoarrow_array_stream_finalizer"

  # Attach the holder to the input stream, then export the input into a
  # freshly allocated stream, which is what the caller receives.
  nanoarrow_pointer_set_protected(array_stream, holder)
  exported <- nanoarrow_allocate_array_stream()
  nanoarrow_pointer_export(array_stream, exported)
  exported
}
#' Convert an object to a nanoarrow array_stream
#'
#' In nanoarrow, an 'array stream' corresponds to the `struct ArrowArrayStream`
#' as defined in the Arrow C Stream interface. This object is used to represent
#' a stream of [arrays][as_nanoarrow_array] with a common
#' [schema][as_nanoarrow_schema]. This is similar to an
#' [arrow::RecordBatchReader] except it can be used to represent a stream of
#' any type (not just record batches). Note that a stream of record batches
#' and a stream of non-nullable struct arrays are represented identically.
#' Also note that array streams are mutable objects and are passed by
#' reference and not by value.
#'
#' @param x An object to convert to an array_stream
#' @param ... Passed to S3 methods
#' @inheritParams as_nanoarrow_array
#'
#' @return An object of class 'nanoarrow_array_stream'
#' @export
#'
#' @examples
#' (stream <- as_nanoarrow_array_stream(data.frame(x = 1:5)))
#' stream$get_schema()
#' stream$get_next()
#'
#' # Once all batches have been read, get_next() returns NULL
#' stream$get_next()
#'
#' # Release the stream
#' stream$release()
#'
as_nanoarrow_array_stream <- function(x, ..., schema = NULL) {
# S3 generic: the method is selected from the class of `x`. `schema`
# defaults to NULL; each method decides how a NULL or non-NULL schema
# is handled.
UseMethod("as_nanoarrow_array_stream")
}
#' @export
as_nanoarrow_array_stream.nanoarrow_array_stream <- function(x, ..., schema = NULL) {
  # Only a requested schema that differs from the stream's own schema
  # requires any work; that case is handed to the next method in the
  # dispatch chain.
  if (!is.null(schema)) {
    stream_schema <- infer_nanoarrow_schema(x)
    if (!nanoarrow_schema_identical(schema, stream_schema)) {
      return(NextMethod())
    }
  }

  # No schema requested, or the requested schema already matches: the
  # input stream itself is returned (not a copy).
  x
}
#' @export
as_nanoarrow_array_stream.nanoarrow_array <- function(x, ..., schema = NULL) {
  if (!is.null(schema)) {
    array_schema <- infer_nanoarrow_schema(x)
    if (!nanoarrow_schema_identical(schema, array_schema)) {
      # A different schema was requested: first wrap the array in a stream
      # using its own schema, then re-dispatch on that stream with the
      # requested schema.
      own_stream <- as_nanoarrow_array_stream(x)
      return(as_nanoarrow_array_stream(own_stream, schema = schema))
    }
  }

  # No schema requested, or it is identical to the array's own schema:
  # a one-batch stream is built with validation switched off.
  basic_array_stream(list(x), validate = FALSE)
}
#' @export
as_nanoarrow_array_stream.default <- function(x, ..., schema = NULL) {
  # Fallback for classes without a dedicated method: go through the arrow
  # package's record batch reader conversion, then convert that reader.
  assert_arrow_installed("default coerce to nanoarrow_array_stream")

  # Note: `arrow::as_schema(schema)` is kept inline as an argument. Because
  # R evaluates arguments lazily, the conversion only runs if the arrow
  # method that receives it actually uses `schema`.
  reader <- arrow::as_record_batch_reader(
    x,
    ...,
    schema = arrow::as_schema(schema)
  )

  as_nanoarrow_array_stream(reader, schema = schema)
}
#' @export
as_nanoarrow_array_stream.data.frame <- function(x, ..., schema = NULL) {
  # Settle on a single schema: inferred from the data frame when none was
  # supplied, otherwise the supplied one coerced to a nanoarrow schema.
  schema <- if (is.null(schema)) {
    infer_nanoarrow_schema(x)
  } else {
    as_nanoarrow_schema(schema)
  }

  # The data frame becomes one array, and that array is the stream's only
  # batch.
  batch <- as_nanoarrow_array(x, schema = schema)
  basic_array_stream(list(batch), schema = schema)
}
#' @export
infer_nanoarrow_schema.nanoarrow_array_stream <- function(x, ...) {
  # The schema of a stream is whatever its get_schema() accessor reports.
  get_schema <- x$get_schema
  get_schema()
}
#' @export
as.data.frame.nanoarrow_array_stream <- function(x, ...) {
  # The entire stream is consumed here, so the input is released on exit
  # regardless of whether the conversion succeeds. For more fine-grained
  # behaviour on error, use convert_array_stream() directly.
  on.exit(x$release(), add = TRUE)

  # Only a stream whose inferred prototype is a data.frame can be converted.
  ptype <- infer_nanoarrow_ptype(x$get_schema())
  if (inherits(ptype, "data.frame")) {
    convert_array_stream(x, ptype)
  } else {
    stop("Can't convert non-struct array stream to data.frame")
  }
}
#' @export
as.vector.nanoarrow_array_stream <- function(x, mode) {
  # `mode` is part of the signature but is not used by this method.
  # The stream is released on exit, whether or not the conversion succeeds.
  on.exit(x$release(), add = TRUE)

  convert_array_stream(x)
}
#' @importFrom utils str
#' @export
str.nanoarrow_array_stream <- function(object, ...) {
  # The first line is always the one-line format() summary.
  header <- sprintf("%s\n", format(object))
  cat(header)

  # Nothing more can be shown for a stream whose pointer is not valid.
  if (!nanoarrow_pointer_is_valid(object)) {
    return(invisible(object))
  }

  # Show the accessors using the str() output of an equivalent plain list,
  # minus that output's first line (the "List of ..." header).
  accessors <- list(
    get_schema = object$get_schema,
    get_next = object$get_next,
    release = object$release
  )
  list_lines <- utils::capture.output(str(accessors, ..., give.attr = FALSE))
  cat(paste0(list_lines[-1], collapse = "\n"), "\n", sep = "")

  invisible(object)
}
#' @export
print.nanoarrow_array_stream <- function(x, ...) {
# Printing is delegated to str(), with `...` forwarded unchanged.
str(x, ...)
# Return the input invisibly, as print methods conventionally do.
invisible(x)
}
#' @export
format.nanoarrow_array_stream <- function(x, ...) {
  # A stream whose pointer is not valid gets a fixed placeholder label.
  if (!nanoarrow_pointer_is_valid(x)) {
    return("<nanoarrow_array_stream[invalid pointer]>")
  }

  # Otherwise the label embeds the formatted schema. Any error raised while
  # obtaining or formatting the schema is replaced by a fixed label instead
  # of being propagated to the caller.
  tryCatch(
    {
      schema_label <- nanoarrow_schema_formatted(x$get_schema())
      sprintf("<nanoarrow_array_stream %s>", schema_label)
    },
    error = function(e) "<nanoarrow_array_stream[<error calling get_schema()]>"
  )
}
# This is the list()-like interface to nanoarrow_array_stream that allows $ and [[
# to make nice auto-complete when interacting in an IDE
#' @export
length.nanoarrow_array_stream <- function(x, ...) {
  # One element per accessor exposed by the list()-like interface.
  length(c("get_schema", "get_next", "release"))
}
#' @export
names.nanoarrow_array_stream <- function(x, ...) {
  # The accessor names, in the same order as their numeric positions in
  # the `[[` method.
  accessor_names <- c("get_schema", "get_next", "release")
  accessor_names
}
#' @export
`[[.nanoarrow_array_stream` <- function(x, i, ...) {
  # Evaluate `x` now so the closures returned below capture its value.
  force(x)

  # An accessor can be selected either by its exact name or by its position.
  selects <- function(name, position) {
    identical(i, name) || isTRUE(i == position)
  }

  if (selects("get_schema", 1L)) {
    return(function() .Call(nanoarrow_c_array_stream_get_schema, x))
  }

  if (selects("get_next", 2L)) {
    return(function(schema = x$get_schema(), validate = TRUE) {
      next_array <- .Call(nanoarrow_c_array_stream_get_next, x)
      if (nanoarrow_pointer_is_valid(next_array)) {
        # `schema` is first used here, so its default (x$get_schema()) is
        # only evaluated when a valid array was obtained.
        nanoarrow_array_set_schema(next_array, schema, validate = validate)
        next_array
      } else {
        # An array that is not valid is reported to the caller as NULL.
        NULL
      }
    })
  }

  if (selects("release", 3L)) {
    return(function() nanoarrow_pointer_release(x))
  }

  # Anything else selects nothing.
  NULL
}
#' @export
`$.nanoarrow_array_stream` <- function(x, i, ...) {
# `stream$name` is forwarded to `stream[[name]]`, so both forms of access
# share a single implementation.
x[[i]]
}