# blob: ac2cbc1440f5b1f5ea98faaeee21b29e0d029fa5
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' @include record-batch.R
#' @title Table class
#' @description A Table is a sequence of [chunked arrays][ChunkedArray]. They
#' have a similar interface to [record batches][RecordBatch], but they can be
#' composed from multiple record batches or chunked arrays.
#' @usage NULL
#' @format NULL
#' @docType class
#'
#' @section S3 Methods and Usage:
#' Tables are data-frame-like, and many methods you expect to work on
#' a `data.frame` are implemented for `Table`. This includes `[`, `[[`,
#' `$`, `names`, `dim`, `nrow`, `ncol`, `head`, and `tail`. You can also pull
#' the data from an Arrow table into R with `as.data.frame()`. See the
#' examples.
#'
#' A caveat about the `$` method: because `Table` is an `R6` object,
#' `$` is also used to access the object's methods (see below). Methods take
#' precedence over the table's columns. So, `tab$Slice` would return the
#' "Slice" method function even if there were a column in the table called
#' "Slice".
#'
#' @section R6 Methods:
#' In addition to the more R-friendly S3 methods, a `Table` object has
#' the following R6 methods that map onto the underlying C++ methods:
#'
#' - `$column(i)`: Extract a `ChunkedArray` by integer position from the table
#' - `$ColumnNames()`: Get all column names (called by `names(tab)`)
#' - `$nbytes()`: Total number of bytes consumed by the elements of the table
#' - `$RenameColumns(value)`: Set all column names (called by `names(tab) <- value`)
#' - `$GetColumnByName(name)`: Extract a `ChunkedArray` by string name
#' - `$field(i)`: Extract a `Field` from the table schema by integer position
#' - `$SelectColumns(indices)`: Return new `Table` with specified columns, expressed as 0-based integers.
#' - `$Slice(offset, length = NULL)`: Create a zero-copy view starting at the
#' indicated integer offset and going for the given length, or to the end
#' of the table if `NULL`, the default.
#' - `$Take(i)`: return a `Table` with rows at positions given by
#' integers `i`. If `i` is an Arrow `Array` or `ChunkedArray`, it will be
#' coerced to an R vector before taking.
#' - `$Filter(i, keep_na = TRUE)`: return a `Table` with rows at positions where logical
#' vector or Arrow boolean-type `(Chunked)Array` `i` is `TRUE`.
#' - `$SortIndices(names, descending = FALSE)`: return an `Array` of integer row
#' positions that can be used to rearrange the `Table` in ascending or descending
#' order by the first named column, breaking ties with further named columns.
#' `descending` can be a logical vector of length one or of the same length as
#' `names`.
#' - `$serialize(output_stream, ...)`: Write the table to the given
#' [OutputStream]
#' - `$cast(target_schema, safe = TRUE, options = cast_options(safe))`: Alter
#' the schema of the table.
#'
#' There are also some active bindings:
#' - `$num_columns`
#' - `$num_rows`
#' - `$schema`
#' - `$metadata`: Returns the key-value metadata of the `Schema` as a named list.
#' Modify or replace by assigning in (`tab$metadata <- new_metadata`).
#' All list elements are coerced to string. See `schema()` for more information.
#' - `$columns`: Returns a list of `ChunkedArray`s
#' @rdname Table-class
#' @export
Table <- R6Class("Table",
  inherit = ArrowTabular,
  public = list(
    # Most methods below are thin wrappers over the corresponding C++
    # bindings (Table__*); see the roxygen block above for user-facing docs.
    column = function(i) Table__column(self, i),
    ColumnNames = function() Table__ColumnNames(self),
    # Total bytes referenced by the table's buffers
    nbytes = function() Table__ReferencedBufferSize(self),
    RenameColumns = function(value) Table__RenameColumns(self, value),
    GetColumnByName = function(name) {
      # Lookup by name requires exactly one string
      assert_is(name, "character")
      assert_that(length(name) == 1)
      Table__GetColumnByName(self, name)
    },
    RemoveColumn = function(i) Table__RemoveColumn(self, i),
    AddColumn = function(i, new_field, value) Table__AddColumn(self, i, new_field, value),
    SetColumn = function(i, new_field, value) Table__SetColumn(self, i, new_field, value),
    ReplaceSchemaMetadata = function(new) {
      # `new` is coerced to string key-value metadata before being applied
      Table__ReplaceSchemaMetadata(self, prepare_key_value_metadata(new))
    },
    field = function(i) Table__field(self, i),
    serialize = function(output_stream, ...) write_table(self, output_stream, ...),
    to_data_frame = function() {
      Table__to_dataframe(self, use_threads = option_use_threads())
    },
    cast = function(target_schema, safe = TRUE, ..., options = cast_options(safe, ...)) {
      # Casting requires the target schema to have the same column names,
      # in the same order, as the current schema
      assert_is(target_schema, "Schema")
      assert_that(identical(self$schema$names, target_schema$names), msg = "incompatible schemas")
      Table__cast(self, target_schema, options)
    },
    SelectColumns = function(indices) Table__SelectColumns(self, indices),
    Slice = function(offset, length = NULL) {
      # Zero-copy view; NULL length means "to the end of the table"
      if (is.null(length)) {
        Table__Slice1(self, offset)
      } else {
        Table__Slice2(self, offset, length)
      }
    },
    # Take, Filter, and SortIndices are methods on ArrowTabular
    Equals = function(other, check_metadata = FALSE, ...) {
      # Non-Table objects are never equal; metadata only compared on request
      inherits(other, "Table") && Table__Equals(self, other, isTRUE(check_metadata))
    },
    Validate = function() Table__Validate(self),
    ValidateFull = function() Table__ValidateFull(self)
  ),
  active = list(
    num_columns = function() Table__num_columns(self),
    num_rows = function() Table__num_rows(self),
    schema = function() Table__schema(self),
    columns = function() Table__columns(self)
  )
)
Table$create <- function(..., schema = NULL) {
  # Factory for Table objects. Accepts record batches, a single reader,
  # or a mix of arrays/vectors/data frames (see arrow_table() docs).
  inputs <- list2(...)

  # Guarantee a names attribute so downstream code can rely on it
  if (is.null(names(inputs))) {
    names(inputs) <- rep_len("", length(inputs))
  }

  # No data supplied: an empty table can still be built from a schema alone
  if (length(inputs) == 0 && inherits(schema, "Schema")) {
    return(Table__from_schema(schema))
  }
  stopifnot(length(inputs) > 0)

  # All inputs are record batches: concatenate them directly
  if (all_record_batches(inputs)) {
    return(Table__from_record_batches(inputs, schema))
  }

  # A single reader argument: drain it into a table, casting if requested
  is_reader <- length(inputs) == 1 &&
    inherits(inputs[[1]], c("RecordBatchReader", "RecordBatchFileReader"))
  if (is_reader) {
    out <- inputs[[1]]$read_table()
    if (!is.null(schema)) {
      out <- out$cast(schema)
    }
    return(out)
  }

  # Otherwise: recycle any length-1 inputs, then assemble column-wise
  Table__from_dots(recycle_scalars(inputs), schema, option_use_threads())
}
#' @export
# S3 method: column names of a Table, delegating to the R6 ColumnNames() binding
names.Table <- function(x) x$ColumnNames()
#' Concatenate one or more Tables
#'
#' Concatenate one or more [Table] objects into a single table. This operation
#' does not copy array data, but instead creates new chunked arrays for each
#' column that point at existing array data.
#'
#' @param ... A [Table]
#' @param unify_schemas If TRUE, the schemas of the tables will be first unified
#' with fields of the same name being merged, then each table will be promoted
#' to the unified schema before being concatenated. Otherwise, all tables should
#' have the same schema.
#' @examples
#' tbl <- arrow_table(name = rownames(mtcars), mtcars)
#' prius <- arrow_table(name = "Prius", mpg = 58, cyl = 4, disp = 1.8)
#' combined <- concat_tables(tbl, prius)
#' tail(combined)$to_data_frame()
#' @export
concat_tables <- function(..., unify_schemas = TRUE) {
  # Concatenate Tables without copying array data; each output column is a
  # new ChunkedArray pointing at the inputs' existing buffers.
  tables <- list2(...)
  if (length(tables) == 0) {
    abort("Must pass at least one Table.")
  }

  if (!unify_schemas) {
    # Without unification, every table must match the first table's schema.
    schema <- tables[[1]]$schema
    # vapply (not lapply) produces an atomic logical vector, so which.min
    # does not depend on implicit list coercion. FALSE (0) sorts before
    # TRUE (1), so this finds the first mismatching index, or 1 when all
    # schemas match.
    schemas_equal <- vapply(tables, function(x) x$schema == schema, logical(1))
    unequal_schema_idx <- which.min(schemas_equal)
    if (unequal_schema_idx != 1) {
      abort(c(
        sprintf("Schema at index %i does not match the first schema.", unequal_schema_idx),
        i = paste0("Schema 1:\n", schema$ToString()),
        i = paste0(
          sprintf("Schema %d:\n", unequal_schema_idx),
          tables[[unequal_schema_idx]]$schema$ToString()
        )
      ))
    }
  }

  Table__ConcatenateTables(tables, unify_schemas)
}
#' @export
# Row-bind Tables. rbind() requires all inputs to share an identical schema,
# so schema unification is explicitly disabled.
rbind.Table <- function(...) {
  concat_tables(..., unify_schemas = FALSE)
}
#' @export
cbind.Table <- function(...) {
  # Column-bind Tables, data frames, and named vectors/arrays into a new
  # Table. Tabular inputs contribute all of their columns; bare vectors
  # must be named since their name becomes the column name.
  call <- sys.call()
  inputs <- list(...)
  # Normalize argument names to a character vector (possibly all empty)
  arg_names <- if (is.null(names(inputs))) {
    rep("", length(inputs))
  } else {
    names(inputs)
  }
  # All inputs must have a compatible number of rows
  cbind_check_length(inputs, call)

  columns <- flatten(map(seq_along(inputs), function(i) {
    input <- inputs[[i]]
    name <- arg_names[i]
    if (inherits(input, "ArrowTabular")) {
      set_names(input$columns, names(input))
    } else if (inherits(input, "data.frame")) {
      as.list(input)
    } else {
      if (name == "") {
        # Fix: bundle the `i` bullet into the message vector via c() so
        # rlang displays it as a bullet; previously it was passed as a
        # bare argument to abort() and became an invisible condition field
        # (compare the abort(c(...)) call in concat_tables()).
        abort(c(
          "Vector and array arguments must have names",
          i = sprintf("Argument ..%d is missing a name", i)
        ))
      }
      list2("{name}" := input)
    }
  }))

  Table$create(!!!columns)
}
#' Create an Arrow Table
#'
#' @param ... A `data.frame` or a named set of Arrays or vectors. If given a
#' mixture of data.frames and named vectors, the inputs will be autospliced together
#' (see examples). Alternatively, you can provide a single Arrow IPC
#' `InputStream`, `Message`, `Buffer`, or R `raw` object containing a `Buffer`.
#' @param schema a [Schema], or `NULL` (the default) to infer the schema from
#' the data in `...`. When providing an Arrow IPC buffer, `schema` is required.
#' @rdname table
#' @examples
#' tbl <- arrow_table(name = rownames(mtcars), mtcars)
#' dim(tbl)
#' dim(head(tbl))
#' names(tbl)
#' tbl$mpg
#' tbl[["cyl"]]
#' as.data.frame(tbl[4:8, c("gear", "hp", "wt")])
#' @seealso [Table]
#' @export
# arrow_table() is simply an alias for the Table$create() factory
arrow_table <- Table$create
#' Convert an object to an Arrow Table
#'
#' Whereas [arrow_table()] constructs a table from one or more columns,
#' `as_arrow_table()` converts a single object to an Arrow [Table].
#'
#' @param x An object to convert to an Arrow Table
#' @param ... Passed to S3 methods
#' @inheritParams arrow_table
#'
#' @return A [Table]
#' @export
#'
#' @examples
#' # use as_arrow_table() for a single object
#' as_arrow_table(data.frame(col1 = 1, col2 = "two"))
#'
#' # use arrow_table() to create from columns
#' arrow_table(col1 = 1, col2 = "two")
#'
as_arrow_table <- function(x, ..., schema = NULL) {
  # S3 generic; see the class-specific methods below for supported inputs
  UseMethod("as_arrow_table")
}
#' @rdname as_arrow_table
#' @export
as_arrow_table.default <- function(x, ...) {
  # No conversion exists for this class. The error carries a dedicated
  # condition class so as_writable_table() can customize the message.
  classes <- paste(class(x), collapse = " / ")
  abort(
    sprintf("No method for `as_arrow_table()` for object of class %s", classes),
    class = "arrow_no_method_as_arrow_table"
  )
}
#' @rdname as_arrow_table
#' @export
as_arrow_table.Table <- function(x, ..., schema = NULL) {
  # Already a Table: cast only when a target schema was requested
  if (!is.null(schema)) {
    return(x$cast(schema))
  }
  x
}
#' @rdname as_arrow_table
#' @export
as_arrow_table.RecordBatch <- function(x, ..., schema = NULL) {
  # Cast the batch first (when a schema is given), then promote it to a
  # single-batch Table
  batch <- if (is.null(schema)) x else x$cast(schema)
  Table$create(batch)
}
#' @rdname as_arrow_table
#' @export
as_arrow_table.data.frame <- function(x, ..., schema = NULL) {
  # Reject data frames whose columns are unnamed before converting
  check_named_cols(x)
  Table$create(x, schema = schema)
}
#' @rdname as_arrow_table
#' @export
as_arrow_table.RecordBatchReader <- function(x, ...) {
  # Drain the reader into a single Table. NOTE(review): unlike the other
  # methods, this one has no `schema` argument — a schema passed via `...`
  # is silently ignored; confirm whether casting should be supported here.
  x$read_table()
}
#' @rdname as_arrow_table
#' @export
as_arrow_table.Dataset <- function(x, ...) {
  # Materialize the whole dataset by scanning it into memory
  Scanner$create(x)$ToTable()
}
#' @rdname as_arrow_table
#' @export
as_arrow_table.arrow_dplyr_query <- function(x, ...) {
  # Evaluate the query as a stream of record batches, then collect it
  reader <- as_record_batch_reader(x)
  # Ensure the reader's resources are released even if collection fails
  on.exit(reader$.unsafe_delete())
  out <- as_arrow_table(reader)
  # arrow_dplyr_query holds group_by information. Set it on the table metadata.
  set_group_attributes(
    out,
    dplyr::group_vars(x),
    dplyr::group_by_drop_default(x)
  )
}
#' @rdname as_arrow_table
#' @export
as_arrow_table.Schema <- function(x, ...) {
  # A schema alone yields an empty (zero-row) Table with that schema
  Table__from_schema(x)
}