blob: 29471b0f5da3833a0014c7ddc678488a05cee640 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' Read serialized streams of Arrow data
#'
#' Reads connections, file paths, URLs, or raw vectors of serialized Arrow
#' data. Arrow documentation typically refers to this format as "Arrow IPC",
#' since its origin was as a means to transmit tables between processes
#' (e.g., multiple R sessions). This format can also be written to and read
#' from files or URLs and is essentially a high performance equivalent of
#' a CSV file that does a better job maintaining types.
#'
#' The nanoarrow package does not currently have the ability to write serialized
#' IPC data: use [arrow::write_ipc_stream()] to write data from R, or use
#' the equivalent writer from another Arrow implementation in Python, C++,
#' Rust, JavaScript, Julia, C#, and beyond.
#'
#' The media type of an Arrow stream is `application/vnd.apache.arrow.stream`
#' and the recommended file extension is `.arrows`.
#'
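#' @param x A `raw()` vector, connection, or file path from which to read
#' binary data. Common extensions indicating compression (.gz, .bz2, .zip)
#' are automatically uncompressed. Compressed streams cannot be read from
#' URLs, and `.zip` archives must contain exactly one file. Connections that
#' are not yet open are opened in binary mode and closed when the returned
#' stream is released.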
#' @param lazy By default, `read_nanoarrow()` will read and discard a copy of
#' the reader's schema to ensure that invalid streams are discovered as
#' soon as possible. Use `lazy = TRUE` to defer this check until the reader
#' is actually consumed.
#' @param ... Currently unused.
#'
#' @return A [nanoarrow_array_stream][as_nanoarrow_array_stream]
#' @export
#'
#' @examples
#' as.data.frame(read_nanoarrow(example_ipc_stream()))
#'
read_nanoarrow <- function(x, ..., lazy = FALSE) {
  # S3 generic: dispatches on the class of `x`. Methods for `raw`,
  # `character` (file path or URL), and `connection` inputs are defined
  # in this file.
  UseMethod("read_nanoarrow")
}
#' @export
read_nanoarrow.raw <- function(x, ..., lazy = FALSE) {
  # Wrap the in-memory bytes as a nanoarrow buffer and hand them to the
  # C-level IPC reader. Unless `lazy` is TRUE, the shared helper validates
  # the stream before returning it.
  ipc_buffer <- as_nanoarrow_buffer(x)
  ipc_reader <- .Call(nanoarrow_c_ipc_array_reader_buffer, ipc_buffer)
  check_stream_if_requested(ipc_reader, lazy)
}
#' @export
read_nanoarrow.character <- function(x, ..., lazy = FALSE) {
  # Only a single path or URL can be turned into a connection
  n_paths <- length(x)
  if (n_paths != 1) {
    stop(sprintf("Can't interpret character(%d) as file path", n_paths))
  }

  # Choose a connection constructor based on the path (url(), file(), or a
  # decompressing connection) and assemble its arguments. unz() additionally
  # needs the name of the archive member to read.
  constructor <- guess_connection_type(x)
  constructor_args <- list(x)
  if (constructor == "unz") {
    constructor_args$filename <- guess_zip_filename(x)
  }
  con <- do.call(constructor, constructor_args)

  # Build the reader lazily and validate it here. When validation is
  # requested, check_stream_if_requested() then reports any error against
  # this call instead of the inner dispatch on `con`.
  stream <- read_nanoarrow(con, lazy = TRUE)
  check_stream_if_requested(stream, lazy)
}
#' @export
read_nanoarrow.connection <- function(x, ..., lazy = FALSE) {
  if (!isOpen(x)) {
    # Unopened connections are opened here in binary mode. From then on the
    # returned stream owns them: they are closed if building the reader fails
    # and when the stream is released.
    stream <- tryCatch(
      {
        open(x, "rb")
        .Call(nanoarrow_c_ipc_array_reader_connection, x)
      },
      error = function(e) {
        # close() also destroys a connection whose open() failed, so an
        # invalid path does not leave an orphaned connection behind
        close(x)
        stop(e)
      }
    )

    # Close the connection when the array stream is released. The finalizer
    # lives in a minimal environment that holds only `x`, so it does not
    # capture this function's frame (including `stream`).
    finalizer_env <- new.env(parent = baseenv())
    finalizer_env$x <- x
    stream_finalizer <- function() {
      close(x)
    }
    environment(stream_finalizer) <- finalizer_env
    reader <- array_stream_set_finalizer(stream, stream_finalizer)
  } else {
    # Connections opened by the caller are used as-is and are not closed by
    # this function
    reader <- .Call(nanoarrow_c_ipc_array_reader_connection, x)
  }

  check_stream_if_requested(reader, lazy)
}
#' @rdname read_nanoarrow
#' @export
example_ipc_stream <- function() {
  # data.frame(some_col = c(1L, 2L, 3L)) as a serialized schema/batch
  #
  # The returned raw vector is two Arrow IPC messages back to back: the
  # schema message followed by a single record batch message.
  # - Each message starts with the 0xFFFFFFFF continuation marker and a
  #   little-endian int32 metadata length: 0x110 (272) for the schema and
  #   0x88 (136) for the batch.
  # - The schema bytes also contain key/value metadata strings: the ASCII
  #   "some_key"/"some_value" and "some_key_field"/"some_value_field" are
  #   visible below.
  # - The batch body ends with the int32 values 1, 2, 3 plus 4 bytes of
  #   padding.
  # NOTE(review): no end-of-stream marker is appended after the batch.
  # Readers presumably treat end of input as end of stream; confirm.
  schema <- as.raw(c(
    0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00,
    0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00,
    0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00,
    0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x84, 0xff,
    0xff, 0xff, 0x18, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00,
    0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x00, 0x00, 0x08, 0x00,
    0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x00, 0x00, 0x00, 0x00,
    0x01, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x00, 0x18, 0x00,
    0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x14, 0x00,
    0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x14, 0x00, 0x00, 0x00, 0x70, 0x00,
    0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x00, 0x00,
    0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00,
    0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x00,
    0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c,
    0x75, 0x65, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x00,
    0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x65,
    0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
  ))
  # Record batch message: 8-byte prefix + 136 bytes of metadata + 16-byte
  # body, 160 bytes in total
  batch <- as.raw(c(
    0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00,
    0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x10, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00,
    0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00,
    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00
  ))
  c(schema, batch)
}
check_stream_if_requested <- function(reader, lazy) {
  # Eagerly fetch (and discard) the schema unless the caller asked for a
  # lazy reader. The reader is returned unchanged in every case.
  should_validate <- !lazy
  if (should_validate) {
    # Capture the caller's call here, directly in this frame and not inside
    # the handler, so errors are reported as coming from read_nanoarrow()
    # rather than from this helper
    caller <- sys.call(-1)
    release_and_rethrow <- function(cnd) {
      # Release the reader before propagating the error
      reader$release()
      cnd$call <- caller
      stop(cnd)
    }
    tryCatch(reader$get_schema(), error = release_and_rethrow)
  }
  reader
}
guess_connection_type <- function(x) {
  # Returns the name of the base R connection constructor to use for the
  # path or URL `x`: "url", "file", or a decompressing connection
  # ("gzfile", "bzfile", "xzfile", "unz").
  is_url <- grepl("://", x, fixed = TRUE)

  # Extensions are compared case-insensitively so that e.g. "DATA.GZ" is
  # still decompressed rather than read as raw compressed bytes
  compressed_types <- c(
    gz = "gzfile",
    bz2 = "bzfile",
    xz = "xzfile",
    zip = "unz"
  )
  ext <- tolower(tools::file_ext(x))
  compressed_con <- if (ext %in% names(compressed_types)) {
    compressed_types[[ext]]
  } else {
    NULL
  }

  if (is_url && !is.null(compressed_con)) {
    stop("Reading compressed streams from URLs is not supported")
  }

  if (is_url) {
    "url"
  } else if (is.null(compressed_con)) {
    "file"
  } else {
    compressed_con
  }
}
guess_zip_filename <- function(x) {
  # Returns the name of the single file stored in the zip archive at `x`,
  # suitable as the `filename` argument of unz(); errors otherwise.
  entries <- utils::unzip(x, list = TRUE)[[1]]

  # Archives may also list directory entries (names ending in "/"), e.g.
  # when a single file was zipped inside a folder. Those are not files to
  # read, so they do not count toward the single-file requirement.
  files <- entries[!endsWith(entries, "/")]

  if (length(files) != 1) {
    stop(
      sprintf(
        "Unzip only supported of archives with exactly one file (found %d)",
        length(files)
      )
    )
  }
  files
}