blob: 6dcb18f1c3e2701a91fc79377decd3c2335fc63e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' @include arrow-package.R
#' @title RecordBatch class
#' @description A record batch is a collection of equal-length arrays matching
#' a particular [Schema]. It is a table-like data structure that is semantically
#' a sequence of [fields][Field], each a contiguous Arrow [Array].
#' @usage NULL
#' @format NULL
#' @docType class
#'
#' @section S3 Methods and Usage:
#' Record batches are data-frame-like, and many methods you expect to work on
#' a `data.frame` are implemented for `RecordBatch`. This includes `[`, `[[`,
#' `$`, `names`, `dim`, `nrow`, `ncol`, `head`, and `tail`. You can also pull
#' the data from an Arrow record batch into R with `as.data.frame()`. See the
#' examples.
#'
#' A caveat about the `$` method: because `RecordBatch` is an `R6` object,
#' `$` is also used to access the object's methods (see below). Methods take
#' precedence over the table's columns. So, `batch$Slice` would return the
#' "Slice" method function even if there were a column in the table called
#' "Slice".
#'
#' A caveat about the `[` method for row operations: only "slicing" is
#' currently supported. That is, you can select a continuous range of rows
#' from the table, but you can't filter with a `logical` vector or take an
#' arbitrary selection of rows by integer indices.
#'
#' @section R6 Methods:
#' In addition to the more R-friendly S3 methods, a `RecordBatch` object has
#' the following R6 methods that map onto the underlying C++ methods:
#'
#' - `$Equals(other)`: Returns `TRUE` if the `other` record batch is equal
#' - `$column(i)`: Extract an `Array` by integer position from the batch
#' - `$column_name(i)`: Get a column's name by integer position
#' - `$names()`: Get all column names (called by `names(batch)`)
#' - `$GetColumnByName(name)`: Extract an `Array` by string name
#' - `$RemoveColumn(i)`: Drops a column from the batch by integer position
#' - `$select(spec)`: Return a new record batch with a selection of columns.
#' This supports the usual `character`, `numeric`, and `logical` selection
#' methods as well as "tidy select" expressions.
#' - `$Slice(offset, length = NULL)`: Create a zero-copy view starting at the
#' indicated integer offset and going for the given length, or to the end
#' of the table if `NULL`, the default.
#' - `$serialize()`: Returns a raw vector suitable for interprocess communication
#' - `$cast(target_schema, safe = TRUE, options = cast_options(safe))`: Alter
#' the schema of the record batch.
#'
#' There are also some active bindings:
#' - `$num_columns`
#' - `$num_rows`
#' - `$schema`
#' - `$columns`: Returns a list of `Array`s
#' @rdname RecordBatch
#' @name RecordBatch
RecordBatch <- R6Class("RecordBatch", inherit = Object,
  public = list(
    # Extract a single column as an Array. `i` is handed to the C++ binding
    # unchanged; the S3 `[[` method subtracts 1 before calling this.
    column = function(i) {
      assert_is(i, c("numeric", "integer"))
      assert_that(length(i) == 1)
      shared_ptr(Array, RecordBatch__column(self, i))
    },

    # Name of the column at position `i`.
    column_name = function(i) RecordBatch__column_name(self, i),

    # All column names; this is what `names(batch)` calls.
    names = function() RecordBatch__names(self),

    # Compare against another RecordBatch.
    Equals = function(other) {
      assert_is(other, "RecordBatch")
      RecordBatch__Equals(self, other)
    },

    # Extract a single column as an Array, looked up by its (scalar) name.
    GetColumnByName = function(name) {
      assert_is(name, "character")
      assert_that(length(name) == 1)
      shared_ptr(Array, RecordBatch__GetColumnByName(self, name))
    },

    # Return a batch restricted to the columns described by `spec`, which is
    # captured unevaluated and resolved against the column names with
    # vars_select(). A NULL spec returns the batch itself.
    select = function(spec) {
      spec <- enquo(spec)
      if (quo_is_null(spec)) {
        self
      } else {
        all_vars <- self$names()
        vars <- vars_select(all_vars, !!spec)
        indices <- match(vars, all_vars)
        shared_ptr(RecordBatch, RecordBatch__select(self, indices))
      }
    },

    # Return a batch without the column at position `i`.
    RemoveColumn = function(i){
      shared_ptr(RecordBatch, RecordBatch__RemoveColumn(self, i))
    },

    # Return a view of the rows starting at `offset`. With `length = NULL`
    # the one-argument binding is used and the view runs to the end.
    Slice = function(offset, length = NULL) {
      if (is.null(length)) {
        shared_ptr(RecordBatch, RecordBatch__Slice1(self, offset))
      } else {
        shared_ptr(RecordBatch, RecordBatch__Slice2(self, offset, length))
      }
    },

    # Serialize the batch to a raw vector.
    serialize = function() ipc___SerializeRecordBatch__Raw(self),

    # Human-readable summary: dimensions followed by one line per field.
    ToString = function() ToString_tabular(self),

    # Cast the batch to `target_schema`. Only the column types may differ:
    # the column names of both schemas must be identical.
    cast = function(target_schema, safe = TRUE, options = cast_options(safe)) {
      assert_is(target_schema, "Schema")
      assert_is(options, "CastOptions")
      assert_that(identical(self$schema$names, target_schema$names), msg = "incompatible schemas")
      shared_ptr(RecordBatch, RecordBatch__cast(self, target_schema, options))
    }
  ),
  active = list(
    num_columns = function() RecordBatch__num_columns(self),
    num_rows = function() RecordBatch__num_rows(self),
    schema = function() shared_ptr(Schema, RecordBatch__schema(self)),
    # List of Arrays, one per column. Each pointer is wrapped explicitly so
    # that the class is the first argument of shared_ptr(), as in every other
    # call in this class; passing `shared_ptr, Array` straight to map() would
    # put the pointer first and the class second.
    columns = function() {
      map(RecordBatch__columns(self), function(ptr) shared_ptr(Array, ptr))
    }
  )
)
RecordBatch$create <- function(..., schema = NULL){
  # Collect the inputs into a list; at least one is required.
  arrays <- list2(...)
  stopifnot(length(arrays) > 0)

  # The binding below always receives a names attribute, so fill in empty
  # names when none of the inputs was named.
  if (is.null(names(arrays))) {
    names(arrays) <- character(length(arrays))
  }

  batch_ptr <- RecordBatch__from_arrays(schema, arrays)
  shared_ptr(RecordBatch, batch_ptr)
}
#' @param ... A `data.frame` or a named set of Arrays or vectors. If given a
#' mixture of data.frames and vectors, the inputs will be autospliced together
#' (see examples). At least one input must be supplied.
#' @param schema a [Schema], or `NULL` (the default) to infer the schema from
#' the data in `...`
#' @rdname RecordBatch
#' @examples
#' \donttest{
#' batch <- record_batch(name = rownames(mtcars), mtcars)
#' dim(batch)
#' dim(head(batch))
#' names(batch)
#' batch$mpg
#' batch[["cyl"]]
#' as.data.frame(batch[4:8, c("gear", "hp", "wt")])
#' }
#' @export
record_batch <- RecordBatch$create
#' @export
names.RecordBatch <- function(x) {
  # Delegate to the object's own `$names()` method.
  column_names <- x$names()
  column_names
}
#' @importFrom methods as
#' @export
`[.RecordBatch` <- function(x, i, j, ..., drop = FALSE) {
  # Subset a record batch: `i` selects rows, `j` selects columns.
  #
  # Rows: only a contiguous, ascending run of positive row numbers (anything
  # equivalent to a:b) is supported, because it is implemented with
  # `$Slice()`. Any other `i` raises an error.
  # Columns: `j` is forwarded to `$select()`. With `drop = TRUE` a
  # single-column result is returned as that column instead of a batch.
  if (!missing(i)) {
    # Testing finiteness first and comparing successive differences keeps NA,
    # infinite and descending indices (e.g. 5:3) on the error path below,
    # rather than letting them fail inside seq() or the `if` condition with
    # an unrelated message.
    is_row_range <- is.numeric(i) &&
      length(i) > 0 &&
      all(is.finite(i)) &&
      all(i > 0) &&
      all(diff(i) == 1)
    if (!is_row_range) {
      stop('Only row "Slicing" (taking rows a:b) currently supported', call. = FALSE)
    }
    # Row numbers here start at 1; the slice offset starts at 0.
    x <- x$Slice(i[1] - 1, length(i))
  }
  if (!missing(j)) {
    x <- x$select(j)
    if (drop && ncol(x) == 1L) {
      x <- x$column(0)
    }
  }
  x
}
#' @export
`[[.RecordBatch` <- function(x, i, ...) {
  # Extract a single column, either by name (character `i`) or by position
  # (numeric `i`, counted from 1). Any other type of `i` is an error.
  if (is.character(i)) {
    x$GetColumnByName(i)
  } else if (is.numeric(i)) {
    # `$column()` is called with the position minus one.
    x$column(i - 1)
  } else {
    # class() may return several elements (e.g. POSIXct/POSIXt); join them
    # so the message stays readable instead of running the names together.
    stop(
      "'i' must be character or numeric, not ",
      paste(class(i), collapse = "/"),
      call. = FALSE
    )
  }
}
#' @export
`$.RecordBatch` <- function(x, name, ...) {
  # `$` serves two purposes on a record batch: reaching the object's own
  # members and pulling out a column by name. Members are checked first, so
  # they win over a column of the same name.
  assert_is(name, "character")
  assert_that(length(name) == 1L)

  is_member <- name %in% ls(x)
  if (!is_member) {
    return(x$GetColumnByName(name))
  }
  get(name, x)
}
#' @export
dim.RecordBatch <- function(x) {
  # Rows first, then columns, read from the corresponding fields of `x`.
  n_rows <- x$num_rows
  n_cols <- x$num_columns
  c(n_rows, n_cols)
}
#' @export
as.data.frame.RecordBatch <- function(x, row.names = NULL, optional = FALSE, use_threads = TRUE, ...){
  # Convert the record batch to an R data.frame.
  #
  # `row.names` and `optional` are accepted for compatibility with the
  # as.data.frame() generic and are not used.
  # `use_threads`: pass FALSE to turn threading off for this call. Leaving it
  # TRUE (the default) defers to option_use_threads(), so calls that do not
  # set the argument behave exactly as before.
  # NOTE(review): assumes option_use_threads() returns a single TRUE/FALSE -
  # confirm against its definition.
  threads <- isTRUE(use_threads) && option_use_threads()
  RecordBatch__to_dataframe(x, use_threads = threads)
}
#' @importFrom utils head
#' @export
head.RecordBatch <- function(x, n = 6L, ...) {
  # Return the first `n` rows as a slice of `x`. As with utils::head(), a
  # negative `n` means "all but the last |n| rows".
  assert_is(n, c("numeric", "integer"))
  assert_that(length(n) == 1)
  len <- nrow(x)
  if (n < 0) {
    # head(x, negative) means all but the last n rows; dropping more rows
    # than exist yields an empty batch rather than a negative slice length
    n <- max(len + n, 0L)
  } else {
    # never ask for more rows than the batch holds
    n <- min(len, n)
  }
  x$Slice(0, n)
}
#' @importFrom utils tail
#' @export
tail.RecordBatch <- function(x, n = 6L, ...) {
  # Return the last `n` rows as a slice of `x`. As with utils::tail(), a
  # negative `n` means "all but the first |n| rows".
  assert_is(n, c("numeric", "integer"))
  assert_that(length(n) == 1)
  len <- nrow(x)
  if (n < 0) {
    # tail(x, negative) means all but the first n rows; skipping more rows
    # than exist yields an empty batch rather than an offset past the end
    offset <- min(-n, len)
  } else {
    # asking for more rows than exist returns the whole batch rather than
    # producing a negative offset
    offset <- max(len - n, 0L)
  }
  x$Slice(offset)
}
ToString_tabular <- function(x, ...) {
  # Build the printed summary of a tabular object: a dimensions line followed
  # by one "$name <type>" line per schema field. Only `$schema`, nrow() and
  # ncol() are used, so it works for both RecordBatch and Table.
  schema_lines <- unlist(strsplit(x$schema$ToString(), "\n"))
  # Rewrite each "name: type" line of the schema as "$name <type>".
  field_lines <- sub("(.*): (.*)", "$\\1 <\\2>", schema_lines)
  header <- sprintf("%s rows x %s columns", nrow(x), ncol(x))
  paste(c(header, field_lines), collapse = "\n")
}