| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
#' Read serialized streams of Arrow data
#'
#' Reads serialized Arrow data from a raw vector, a connection, a file path,
#' or a URL. Arrow documentation usually calls this format "Arrow IPC"
#' because it was first designed for sending tables between processes
#' (e.g., between several R sessions). The same bytes can also be stored in
#' files or served from URLs. This makes the format a high-performance
#' alternative to a CSV file that does a better job of preserving types.
#'
#' The nanoarrow package cannot currently write serialized IPC data. To write
#' data from R, use [arrow::write_ipc_stream()]. Alternatively, use the
#' equivalent writer from another Arrow implementation (Python, C++, Rust,
#' JavaScript, Julia, C#, and others).
#'
#' Arrow streams have the media type `application/vnd.apache.arrow.stream`;
#' the recommended file extension is `.arrows`.
#'
#' @param x A `raw()` vector, a connection, or a file path or URL from which
#'   to read binary data. File paths with a common compression extension
#'   (.gz, .bz2, .zip) are decompressed automatically. A .zip archive must
#'   contain exactly one file, and compressed files cannot be read from a
#'   URL. A connection that is not already open is opened in binary mode and
#'   is closed when the returned stream is released.
#' @param lazy By default (`lazy = FALSE`), `read_nanoarrow()` reads and
#'   discards a copy of the stream's schema so that an invalid stream is
#'   reported as early as possible. Use `lazy = TRUE` to defer this check
#'   until the stream is actually consumed.
#' @param ... Currently unused.
#'
#' @return A [nanoarrow_array_stream][as_nanoarrow_array_stream]
#' @export
#'
#' @examples
#' as.data.frame(read_nanoarrow(example_ipc_stream()))
#'
read_nanoarrow <- function(x, ..., lazy = FALSE) {
  # Dispatch on the class of `x`. The methods in this file handle raw
  # vectors, character paths/URLs, and connections.
  UseMethod("read_nanoarrow", x)
}
| |
#' @export
read_nanoarrow.raw <- function(x, ..., lazy = FALSE) {
  # Wrap the raw bytes as a nanoarrow buffer and hand that buffer to the
  # native IPC reader. The reader reads directly from the in-memory bytes.
  reader <- .Call(
    nanoarrow_c_ipc_array_reader_buffer,
    as_nanoarrow_buffer(x)
  )

  # Optionally validate eagerly by reading the schema.
  check_stream_if_requested(reader, lazy)
}
| |
#' @export
read_nanoarrow.character <- function(x, ..., lazy = FALSE) {
  # A single path/URL maps to exactly one connection.
  if (length(x) != 1) {
    stop(sprintf("Can't interpret character(%d) as file path", length(x)))
  }

  # Choose the base R connection constructor (file, url, gzfile, bzfile,
  # unz) and assemble its arguments.
  con_fun <- guess_connection_type(x)
  con_args <- list(x)
  if (identical(con_fun, "unz")) {
    # unz() also needs the name of the archive member to read.
    con_args$filename <- guess_zip_filename(x)
  }
  con <- do.call(con_fun, con_args)

  # Create the reader lazily through the connection method, then validate
  # here. Any error is then reported against this call, which helps when
  # reading invalid files.
  reader <- read_nanoarrow(con, lazy = TRUE)
  check_stream_if_requested(reader, lazy)
}
| |
#' @export
read_nanoarrow.connection <- function(x, ..., lazy = FALSE) {
  # An unopened connection is opened here (in binary mode) and is closed when
  # the array stream is released. An already-open connection is read as-is
  # and left for the caller to manage.
  if (!isOpen(x)) {
    # If either opening the connection or creating the reader fails, close
    # (destroy) the connection before re-raising the error. Otherwise an
    # unopened connection, such as file() on a path that does not exist,
    # would stay registered with R until it is garbage collected.
    stream <- tryCatch(
      {
        open(x, "rb")
        .Call(nanoarrow_c_ipc_array_reader_connection, x)
      },
      error = function(e) {
        close(x)
        stop(e)
      }
    )

    # Close the connection when the array stream is released. The finalizer
    # gets a minimal environment (only `x`, with the base environment as
    # parent), so it does not capture this function's whole frame.
    stream_finalizer <- function() {
      close(x)
    }

    finalizer_env <- new.env(parent = baseenv())
    finalizer_env$x <- x
    environment(stream_finalizer) <- finalizer_env

    reader <- array_stream_set_finalizer(stream, stream_finalizer)
  } else {
    reader <- .Call(nanoarrow_c_ipc_array_reader_connection, x)
  }

  check_stream_if_requested(reader, lazy)
}
| |
#' @rdname read_nanoarrow
#' @export
example_ipc_stream <- function() {
  # Returns a raw() vector holding a small Arrow IPC stream that can be
  # passed to read_nanoarrow(): one schema message followed by one record
  # batch message. The bytes are hard-coded because nanoarrow has no IPC
  # writer of its own.
  #
  # Both messages start with the IPC continuation marker (0xff x 4),
  # followed by the length of the message metadata.

  # data.frame(some_col = c(1L, 2L, 3L)) as a serialized schema/batch.
  # The ASCII runs in the bytes below also show key/value metadata:
  # "some_key"/"some_value" on the schema and
  # "some_key_field"/"some_value_field" on the "some_col" field.
  schema <- as.raw(c(
    0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00,
    0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00,
    0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00,
    0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x84, 0xff,
    0xff, 0xff, 0x18, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00,
    0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x00, 0x00, 0x08, 0x00,
    0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x00, 0x00, 0x00, 0x00,
    0x01, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x00, 0x18, 0x00,
    0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x14, 0x00,
    0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x14, 0x00, 0x00, 0x00, 0x70, 0x00,
    0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x00, 0x00,
    0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00,
    0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x00,
    0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c,
    0x75, 0x65, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x00,
    0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x65,
    0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
  ))

  # Record batch message. Its last 16 bytes are the column data: the
  # little-endian int32 values 1, 2, 3 followed by padding.
  batch <- as.raw(c(
    0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00,
    0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x10, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00,
    0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00,
    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
    0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00
  ))

  # The schema message must come first in the stream, followed by the batch.
  c(schema, batch)
}
| |
# Optionally validate a freshly created reader by reading its schema
#
# When `lazy` is FALSE, the reader's schema is requested once and discarded.
# If that fails, the reader is released and the error is re-raised with its
# call replaced by the call of the function that invoked this helper (the
# read_nanoarrow() method). Returns `reader` in all cases.
check_stream_if_requested <- function(reader, lazy) {
  if (!lazy) {
    # Captured in this frame so that it refers to our caller's call.
    caller_call <- sys.call(-1)

    release_and_rethrow <- function(cnd) {
      # Free the reader before propagating the failure.
      reader$release()
      cnd$call <- caller_call
      stop(cnd)
    }

    tryCatch(reader$get_schema(), error = release_and_rethrow)
  }

  reader
}
| |
# Choose the name of the base R connection constructor used to read `x`
#
# `x` is a single file path or URL. Returns "url" for anything containing
# "://". Otherwise returns "gzfile", "bzfile", or "unz" for a .gz, .bz2, or
# .zip extension (matched case-insensitively), and "file" for everything
# else. A URL with a compression extension raises an error because reading
# compressed streams from URLs is not supported.
guess_connection_type <- function(x) {
  is_url <- grepl("://", x)

  # Compare extensions case-insensitively, so names like "DATA.GZ" or
  # "data.Zip" are decompressed just like their lower-case equivalents.
  extension <- tolower(tools::file_ext(x))

  # switch() yields NULL for "" (no extension) or any unlisted extension.
  compressed_con <- switch(
    extension,
    "gz" = "gzfile",
    "bz2" = "bzfile",
    "zip" = "unz"
  )

  if (is_url && !is.null(compressed_con)) {
    stop("Reading compressed streams from URLs is not supported")
  }

  if (is_url) {
    "url"
  } else if (is.null(compressed_con)) {
    "file"
  } else {
    compressed_con
  }
}
| |
# Find the name of the single file stored in the zip archive at path `x`
#
# Directory entries (names ending in "/") are ignored because they hold no
# data. A file nested inside a folder, e.g. "dir/data.arrows", is therefore
# still accepted. Raises an error unless exactly one file entry remains.
# Returns that entry's name, suitable for the `filename` argument of unz().
guess_zip_filename <- function(x) {
  entries <- utils::unzip(x, list = TRUE)[[1]]
  files <- entries[!grepl("/$", entries)]

  if (length(files) != 1) {
    stop(
      sprintf(
        "Unzip only supported of archives with exactly one file (found %d)",
        length(files)
      )
    )
  }

  files
}