blob: b1f3e85e56930f0fbcf9d58eacacc481da7456ef [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' Convert an Array Stream into an R vector
#'
#' Converts `array_stream` to the type specified by `to`. This is a low-level
#' interface; most users should use `as.data.frame()` or `as.vector()` unless
#' finer-grained control is needed over the conversion. See [convert_array()]
#' for details of the conversion process; see [infer_nanoarrow_ptype()] for
#' default inferences of `to`.
#'
#' @param array_stream A [nanoarrow_array_stream][as_nanoarrow_array_stream].
#' @param size The exact size of the output, if known. If specified, a
#' slightly more efficient implementation may be used to collect the output.
#' @param n The maximum number of batches to pull from the array stream.
#' @inheritParams convert_array
#' @inheritParams basic_array_stream
#'
#' @return
#' - `convert_array_stream()`: An R vector of type `to`.
#' - `collect_array_stream()`: A `list()` of [nanoarrow_array][as_nanoarrow_array]
#' @export
#'
#' @examples
#' stream <- as_nanoarrow_array_stream(data.frame(x = 1:5))
#' str(convert_array_stream(stream))
#' str(convert_array_stream(stream, to = data.frame(x = double())))
#'
#' stream <- as_nanoarrow_array_stream(data.frame(x = 1:5))
#' collect_array_stream(stream)
#'
convert_array_stream <- function(array_stream, to = NULL, size = NULL, n = Inf) {
  # Pulls batches from `array_stream` and converts them into one R vector
  # whose type is described by `to`.
  stopifnot(inherits(array_stream, "nanoarrow_array_stream"))

  schema <- .Call(nanoarrow_c_array_stream_get_schema, array_stream)

  # Resolve the target prototype: infer it from the schema when `to` is NULL,
  # let a user-supplied function pick it (given the schema and the inferred
  # default), or use `to` as supplied.
  ptype <- if (is.null(to)) {
    infer_nanoarrow_ptype(schema)
  } else if (is.function(to)) {
    to(schema, infer_nanoarrow_ptype(schema))
  } else {
    to
  }

  # Only the first element of `n` is used, as a double.
  max_batches <- as.double(n)[1]

  # Known output size: nanoarrow_c_convert_array_stream() needs the total
  # length of all batches up front, so a caller-supplied `size` lets us hand
  # the stream straight to it without collecting first.
  if (!is.null(size)) {
    return(
      .Call(
        nanoarrow_c_convert_array_stream,
        array_stream,
        ptype,
        as.double(size)[1],
        max_batches
      )
    )
  }

  # Unknown output size: materialize the batches so that their total length
  # can be computed before converting.
  arrays <- collect_array_stream(
    array_stream,
    max_batches,
    schema = schema,
    validate = FALSE
  )

  # A lone batch goes through the single-array conversion, which currently
  # has a more efficient code path for types that can be converted as ALTREP
  # (e.g., strings).
  if (length(arrays) == 1L) {
    return(.Call(nanoarrow_c_convert_array, arrays[[1]], ptype))
  }

  # Zero or several batches: total up their lengths, wrap them in a fresh
  # stream, and convert with the now-known size. The inputs have already been
  # type checked above, hence the direct .Call()s.
  total_length <- .Call(nanoarrow_c_array_list_total_length, arrays)
  replay_stream <- .Call(
    nanoarrow_c_basic_array_stream,
    arrays,
    schema,
    FALSE
  )

  .Call(
    nanoarrow_c_convert_array_stream,
    replay_stream,
    ptype,
    as.double(total_length),
    Inf
  )
}
#' @rdname convert_array_stream
#' @export
collect_array_stream <- function(array_stream, n = Inf, schema = NULL,
                                 validate = TRUE) {
  # Pulls at most `n` arrays from `array_stream` by repeatedly calling its
  # `get_next()` member and returns them as a list, in the order received.
  # Collection stops early when `get_next()` returns NULL. If `schema` is
  # NULL it is fetched from the stream; `schema` and `validate` are passed
  # through to every `get_next()` call.
  stopifnot(
    inherits(array_stream, "nanoarrow_array_stream")
  )

  # Normalize `n` to a scalar double, exactly as convert_array_stream() does,
  # so that the loop condition below is always a single, numeric comparison
  # (a longer or character `n` would otherwise error or compare as strings).
  # A missing limit is rejected up front instead of failing inside `while`.
  n <- as.double(n)[1]
  stopifnot(!is.na(n))

  if (is.null(schema)) {
    schema <- .Call(nanoarrow_c_array_stream_get_schema, array_stream)
  }

  # Start with room for 1024 batches; unused slots are trimmed on return.
  batches <- vector("list", 1024L)
  n_batches <- 0
  get_next <- array_stream$get_next

  while (n_batches < n) {
    array <- get_next(schema, validate = validate)
    if (is.null(array)) {
      break
    }

    n_batches <- n_batches + 1

    # This assignment has reasonable (but not great) performance when
    # n_batches > 1024 in recent versions of R because R overallocates vectors
    # slightly to support this pattern. It may be worth moving this
    # implementation to C or C++ in the future if the collect step becomes a
    # bottleneck.
    batches[[n_batches]] <- array
  }

  batches[seq_len(n_batches)]
}