blob: 85ce839d0ce5750833a2aa0bca7d1ec4f7dd64af [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' @title RecordBatchReader classes
#' @description Apache Arrow defines two formats for [serializing data for interprocess
#' communication (IPC)](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc):
#' a "stream" format and a "file" format, known as Feather.
#' `RecordBatchStreamReader` and `RecordBatchFileReader` are
#' interfaces for accessing record batches from input sources in those formats,
#' respectively.
#'
#' For guidance on how to use these classes, see the examples section.
#'
#' @seealso [read_ipc_stream()] and [read_feather()] provide a much simpler interface
#' for reading data from these formats and are sufficient for many use cases.
#' @usage NULL
#' @format NULL
#' @docType class
#' @section Factory:
#'
#' The `RecordBatchFileReader$create()` and `RecordBatchStreamReader$create()`
#' factory methods instantiate the object and
#' take a single argument, named according to the class:
#'
#' - `file` A raw vector, [Buffer], or Arrow file connection object
#' (e.g. [RandomAccessFile]).
#' - `stream` A raw vector, [Buffer], or [InputStream].
#'
#' @section Methods:
#'
#' - `$read_next_batch()`: Returns a `RecordBatch`, iterating through the
#' Reader. If there are no further batches in the Reader, it returns `NULL`.
#' - `$schema`: Returns a [Schema] (active binding)
#' - `$batches()`: Returns a list of `RecordBatch`es
#' - `$read_table()`: Collects the reader's `RecordBatch`es into a [Table]
#' - `$get_batch(i)`: For `RecordBatchFileReader`, return a particular batch
#' by an integer index.
#' - `$num_record_batches`: For `RecordBatchFileReader`, see how many batches
#' are in the file (active binding).
#'
#' @rdname RecordBatchReader
#' @name RecordBatchReader
#' @include arrow-package.R
#' @examples
#' \donttest{
#' tf <- tempfile()
#' on.exit(unlink(tf))
#'
#' batch <- record_batch(chickwts)
#'
#' # This opens a connection to the file in Arrow
#' file_obj <- FileOutputStream$create(tf)
#' # Pass that to a RecordBatchWriter to write data conforming to a schema
#' writer <- RecordBatchFileWriter$create(file_obj, batch$schema)
#' writer$write(batch)
#' # You may write additional batches to the stream, provided that they have
#' # the same schema.
#' # Call "close" on the writer to indicate end-of-file/stream
#' writer$close()
#' # Then, close the connection--closing the IPC message does not close the file
#' file_obj$close()
#'
#' # Now, we have a file we can read from. Same pattern: open file connection,
#' # then pass it to a RecordBatchReader
#' read_file_obj <- ReadableFile$create(tf)
#' reader <- RecordBatchFileReader$create(read_file_obj)
#' # RecordBatchFileReader knows how many batches it has (StreamReader does not)
#' reader$num_record_batches
#' # We could read the batches one at a time with $get_batch(i),
#' # or we can call $read_table() to pull them all into a Table
#' tab <- reader$read_table()
#' # Call as.data.frame to turn that Table into an R data.frame
#' df <- as.data.frame(tab)
#' # This should be the same data we sent
#' all.equal(df, chickwts, check.attributes = FALSE)
#' # Unlike the Writers, we don't have to close RecordBatchReaders,
#' # but we do still need to close the file connection
#' read_file_obj$close()
#' }
RecordBatchReader <- R6Class("RecordBatchReader",
  inherit = ArrowObject,
  public = list(
    # Ask the underlying reader for its next batch and wrap the returned
    # pointer as a RecordBatch.
    read_next_batch = function() {
      batch_ptr <- RecordBatchReader__ReadNext(self)
      shared_ptr(RecordBatch, batch_ptr)
    }
  ),
  active = list(
    # Active binding: the reader's schema, wrapped as a Schema object.
    schema = function() {
      schema_ptr <- RecordBatchReader__schema(self)
      shared_ptr(Schema, schema_ptr)
    }
  )
)
#' @rdname RecordBatchReader
#' @usage NULL
#' @format NULL
#' @export
RecordBatchStreamReader <- R6Class("RecordBatchStreamReader",
  inherit = RecordBatchReader,
  public = list(
    # Return a list in which every element produced by the stream reader's
    # batches binding is wrapped as a RecordBatch.
    batches = function() {
      batch_ptrs <- ipc___RecordBatchStreamReader__batches(self)
      map(batch_ptrs, shared_ptr, class = RecordBatch)
    },
    # Build a Table from this reader via Table__from_RecordBatchStreamReader().
    read_table = function() {
      table_ptr <- Table__from_RecordBatchStreamReader(self)
      shared_ptr(Table, table_ptr)
    }
  )
)
RecordBatchStreamReader$create <- function(stream) {
  # Raw vectors and Buffers are wrapped in a BufferReader before the
  # InputStream check below.
  is_in_memory <- inherits(stream, c("raw", "Buffer"))
  if (is_in_memory) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    stream <- BufferReader$create(stream)
  }

  assert_is(stream, "InputStream")

  # Open the stream reader and wrap the result in the R6 class.
  reader_ptr <- ipc___RecordBatchStreamReader__Open(stream)
  shared_ptr(RecordBatchStreamReader, reader_ptr)
}
#' @rdname RecordBatchReader
#' @usage NULL
#' @format NULL
#' @export
RecordBatchFileReader <- R6Class("RecordBatchFileReader",
  inherit = ArrowObject,
  # Why doesn't this inherit from RecordBatchReader?
  public = list(
    # Read the record batch at index `i` and wrap it as a RecordBatch.
    get_batch = function(i) {
      batch_ptr <- ipc___RecordBatchFileReader__ReadRecordBatch(self, i)
      shared_ptr(RecordBatch, batch_ptr)
    },
    # Return a list in which every element produced by the file reader's
    # batches binding is wrapped as a RecordBatch.
    batches = function() {
      batch_ptrs <- ipc___RecordBatchFileReader__batches(self)
      map(batch_ptrs, shared_ptr, class = RecordBatch)
    },
    # Build a Table from this reader via Table__from_RecordBatchFileReader().
    read_table = function() {
      table_ptr <- Table__from_RecordBatchFileReader(self)
      shared_ptr(Table, table_ptr)
    }
  ),
  active = list(
    # Active binding: the value reported by the reader's
    # num_record_batches binding.
    num_record_batches = function() {
      ipc___RecordBatchFileReader__num_record_batches(self)
    },
    # Active binding: the reader's schema, wrapped as a Schema object.
    schema = function() {
      schema_ptr <- ipc___RecordBatchFileReader__schema(self)
      shared_ptr(Schema, schema_ptr)
    }
  )
)
RecordBatchFileReader$create <- function(file) {
  # Raw vectors and Buffers are wrapped in a BufferReader before the
  # InputStream check below.
  is_in_memory <- inherits(file, c("raw", "Buffer"))
  if (is_in_memory) {
    # TODO: deprecate this because it doesn't close the connection to the Buffer
    # (that's a problem, right?)
    file <- BufferReader$create(file)
  }

  assert_is(file, "InputStream")

  # Open the file reader and wrap the result in the R6 class.
  reader_ptr <- ipc___RecordBatchFileReader__Open(file)
  shared_ptr(RecordBatchFileReader, reader_ptr)
}