| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| #' @include record-batch.R |
| #' @title Table class |
| #' @description A Table is a sequence of [chunked arrays][ChunkedArray]. They |
| #' have a similar interface to [record batches][RecordBatch], but they can be |
| #' composed from multiple record batches or chunked arrays. |
| #' @usage NULL |
| #' @format NULL |
| #' @docType class |
| #' |
| #' @section S3 Methods and Usage: |
| #' Tables are data-frame-like, and many methods you expect to work on |
| #' a `data.frame` are implemented for `Table`. This includes `[`, `[[`, |
| #' `$`, `names`, `dim`, `nrow`, `ncol`, `head`, and `tail`. You can also pull |
| #' the data from an Arrow table into R with `as.data.frame()`. See the |
| #' examples. |
| #' |
| #' A caveat about the `$` method: because `Table` is an `R6` object, |
| #' `$` is also used to access the object's methods (see below). Methods take |
| #' precedence over the table's columns. So, `tab$Slice` would return the |
| #' "Slice" method function even if there were a column in the table called |
| #' "Slice". |
| #' |
| #' @section R6 Methods: |
| #' In addition to the more R-friendly S3 methods, a `Table` object has |
| #' the following R6 methods that map onto the underlying C++ methods: |
| #' |
| #' - `$column(i)`: Extract a `ChunkedArray` by integer position from the table |
| #' - `$ColumnNames()`: Get all column names (called by `names(tab)`) |
| #' - `$nbytes()`: Total number of bytes consumed by the elements of the table |
| #' - `$RenameColumns(value)`: Set all column names (called by `names(tab) <- value`) |
| #' - `$GetColumnByName(name)`: Extract a `ChunkedArray` by string name |
| #' - `$field(i)`: Extract a `Field` from the table schema by integer position |
| #' - `$SelectColumns(indices)`: Return new `Table` with specified columns, expressed as 0-based integers. |
| #' - `$Slice(offset, length = NULL)`: Create a zero-copy view starting at the |
| #' indicated integer offset and going for the given length, or to the end |
| #' of the table if `NULL`, the default. |
#' - `$Take(i)`: return a `Table` with rows at positions given by
#'    integers `i`. If `i` is an Arrow `Array` or `ChunkedArray`, it will be
#'    coerced to an R vector before taking.
#' - `$Filter(i, keep_na = TRUE)`: return a `Table` with rows at positions where logical
#'    vector or Arrow boolean-type `(Chunked)Array` `i` is `TRUE`.
| #' - `$SortIndices(names, descending = FALSE)`: return an `Array` of integer row |
| #' positions that can be used to rearrange the `Table` in ascending or descending |
| #' order by the first named column, breaking ties with further named columns. |
| #' `descending` can be a logical vector of length one or of the same length as |
| #' `names`. |
| #' - `$serialize(output_stream, ...)`: Write the table to the given |
| #' [OutputStream] |
#' - `$cast(target_schema, safe = TRUE, options = cast_options(safe))`: Alter
#'    the schema of the table.
| #' |
| #' There are also some active bindings: |
| #' - `$num_columns` |
| #' - `$num_rows` |
| #' - `$schema` |
| #' - `$metadata`: Returns the key-value metadata of the `Schema` as a named list. |
| #' Modify or replace by assigning in (`tab$metadata <- new_metadata`). |
| #' All list elements are coerced to string. See `schema()` for more information. |
| #' - `$columns`: Returns a list of `ChunkedArray`s |
| #' @rdname Table-class |
| #' @export |
Table <- R6Class("Table",
  inherit = ArrowTabular,
  public = list(
    # Extract a single column as a ChunkedArray, by integer position
    column = function(i) {
      Table__column(self, i)
    },
    # All column names, in schema order (backs `names(tab)`)
    ColumnNames = function() {
      Table__ColumnNames(self)
    },
    # Size in bytes of the buffers referenced by this table
    nbytes = function() {
      Table__ReferencedBufferSize(self)
    },
    # Replace all column names at once (backs `names(tab) <- value`)
    RenameColumns = function(value) {
      Table__RenameColumns(self, value)
    },
    # Extract a single column as a ChunkedArray, by its (scalar) string name
    GetColumnByName = function(name) {
      assert_is(name, "character")
      assert_that(length(name) == 1)
      Table__GetColumnByName(self, name)
    },
    RemoveColumn = function(i) {
      Table__RemoveColumn(self, i)
    },
    AddColumn = function(i, new_field, value) {
      Table__AddColumn(self, i, new_field, value)
    },
    SetColumn = function(i, new_field, value) {
      Table__SetColumn(self, i, new_field, value)
    },
    # Return a table whose schema carries `new` as key-value metadata
    # (values are normalized by prepare_key_value_metadata())
    ReplaceSchemaMetadata = function(new) {
      Table__ReplaceSchemaMetadata(self, prepare_key_value_metadata(new))
    },
    # Extract a Field from the table's schema by integer position
    field = function(i) {
      Table__field(self, i)
    },
    # Write the table to the given OutputStream; `...` is forwarded to write_table()
    serialize = function(output_stream, ...) {
      write_table(self, output_stream, ...)
    },
    # Materialize the table as an R data.frame
    to_data_frame = function() {
      Table__to_dataframe(self, use_threads = option_use_threads())
    },
    # Cast to target_schema; the target must have exactly the same column names
    cast = function(target_schema, safe = TRUE, ..., options = cast_options(safe, ...)) {
      assert_is(target_schema, "Schema")
      assert_that(identical(self$schema$names, target_schema$names), msg = "incompatible schemas")
      Table__cast(self, target_schema, options)
    },
    # New Table containing only the columns at `indices` (0-based integers)
    SelectColumns = function(indices) {
      Table__SelectColumns(self, indices)
    },
    # Zero-copy view starting at `offset`; a NULL `length` means "to the end"
    Slice = function(offset, length = NULL) {
      if (is.null(length)) {
        Table__Slice1(self, offset)
      } else {
        Table__Slice2(self, offset, length)
      }
    },
    # Take, Filter, and SortIndices are methods on ArrowTabular
    Equals = function(other, check_metadata = FALSE, ...) {
      inherits(other, "Table") && Table__Equals(self, other, isTRUE(check_metadata))
    },
    Validate = function() {
      Table__Validate(self)
    },
    ValidateFull = function() {
      Table__ValidateFull(self)
    }
  ),
  active = list(
    num_columns = function() Table__num_columns(self),
    num_rows = function() Table__num_rows(self),
    schema = function() Table__schema(self),
    columns = function() Table__columns(self)
  )
)
| |
Table$create <- function(..., schema = NULL) {
  dots <- list2(...)
  # Guarantee a names attribute so downstream code can rely on it
  # (unnamed inputs get "")
  if (is.null(names(dots))) {
    names(dots) <- rep_len("", length(dots))
  }

  # A schema with no data: build an empty table with that schema
  if (length(dots) == 0 && inherits(schema, "Schema")) {
    return(Table__from_schema(schema))
  }

  stopifnot(length(dots) > 0)

  # All inputs are record batches: stack them into one table
  if (all_record_batches(dots)) {
    return(Table__from_record_batches(dots, schema))
  }

  # A single reader: drain it, then cast if an explicit schema was given
  is_reader <- length(dots) == 1 &&
    inherits(dots[[1]], c("RecordBatchReader", "RecordBatchFileReader"))
  if (is_reader) {
    out <- dots[[1]]$read_table()
    if (!is.null(schema)) {
      out <- out$cast(schema)
    }
    return(out)
  }

  # If any arrays are length 1, recycle them to the common length
  dots <- recycle_scalars(dots)

  Table__from_dots(dots, schema, option_use_threads())
}
| |
#' @export
names.Table <- function(x) {
  # Delegates to the Table's ColumnNames() R6 method
  x$ColumnNames()
}
| |
| #' Concatenate one or more Tables |
| #' |
| #' Concatenate one or more [Table] objects into a single table. This operation |
| #' does not copy array data, but instead creates new chunked arrays for each |
| #' column that point at existing array data. |
| #' |
#' @param ... One or more [Table] objects to concatenate
| #' @param unify_schemas If TRUE, the schemas of the tables will be first unified |
| #' with fields of the same name being merged, then each table will be promoted |
| #' to the unified schema before being concatenated. Otherwise, all tables should |
| #' have the same schema. |
| #' @examples |
| #' tbl <- arrow_table(name = rownames(mtcars), mtcars) |
| #' prius <- arrow_table(name = "Prius", mpg = 58, cyl = 4, disp = 1.8) |
| #' combined <- concat_tables(tbl, prius) |
| #' tail(combined)$to_data_frame() |
| #' @export |
concat_tables <- function(..., unify_schemas = TRUE) {
  tables <- list2(...)

  if (length(tables) == 0) {
    abort("Must pass at least one Table.")
  }

  if (!unify_schemas) {
    # Without unification, every table's schema must equal the first one;
    # report the first mismatch with both schemas printed for comparison.
    schema <- tables[[1]]$schema
    matches_first <- vapply(tables, function(x) x$schema == schema, logical(1))
    if (!all(matches_first)) {
      unequal_schema_idx <- which(!matches_first)[1]
      abort(c(
        sprintf("Schema at index %i does not match the first schema.", unequal_schema_idx),
        i = paste0("Schema 1:\n", schema$ToString()),
        i = paste0(
          sprintf("Schema %d:\n", unequal_schema_idx),
          tables[[unequal_schema_idx]]$schema$ToString()
        )
      ))
    }
  }

  Table__ConcatenateTables(tables, unify_schemas)
}
| |
#' @export
rbind.Table <- function(...) {
  # Row-binding requires all inputs to share one schema, so no unification
  concat_tables(..., unify_schemas = FALSE)
}
| |
#' @export
cbind.Table <- function(...) {
  call <- sys.call()
  inputs <- list(...)
  arg_names <- if (is.null(names(inputs))) {
    rep("", length(inputs))
  } else {
    names(inputs)
  }

  # All inputs must have a compatible number of rows
  cbind_check_length(inputs, call)

  # Flatten each input into a named list of column values:
  # tabular inputs contribute all of their columns, data.frames their
  # columns, and bare vectors/arrays contribute one named column each.
  columns <- flatten(map(seq_along(inputs), function(i) {
    input <- inputs[[i]]
    name <- arg_names[i]

    if (inherits(input, "ArrowTabular")) {
      set_names(input$columns, names(input))
    } else if (inherits(input, "data.frame")) {
      as.list(input)
    } else {
      if (name == "") {
        # The message and its "i" hint must be passed together as a character
        # vector: named strings in rlang::abort()'s `...` are stored as
        # condition fields, not rendered as bullets.
        abort(c(
          "Vector and array arguments must have names",
          i = sprintf("Argument ..%d is missing a name", i)
        ))
      }
      list2("{name}" := input)
    }
  }))

  Table$create(!!!columns)
}
| |
#' Create an Arrow Table
#'
#' @param ... A `data.frame` or a named set of Arrays or vectors. If given a
#' mixture of data.frames and named vectors, the inputs will be autospliced together
#' (see examples). Alternatively, you can provide a single Arrow IPC
#' `InputStream`, `Message`, `Buffer`, or R `raw` object containing a `Buffer`.
#' @param schema a [Schema], or `NULL` (the default) to infer the schema from
#' the data in `...`. When providing an Arrow IPC buffer, `schema` is required.
#' @return A [Table]
#' @rdname table
#' @examples
#' tbl <- arrow_table(name = rownames(mtcars), mtcars)
#' dim(tbl)
#' dim(head(tbl))
#' names(tbl)
#' tbl$mpg
#' tbl[["cyl"]]
#' as.data.frame(tbl[4:8, c("gear", "hp", "wt")])
#' @seealso [Table]
#' @export
# arrow_table() is simply an alias for the R6 generator's create() method
arrow_table <- Table$create
| |
| |
#' Convert an object to an Arrow Table
#'
#' Whereas [arrow_table()] constructs a table from one or more columns,
#' `as_arrow_table()` converts a single object to an Arrow [Table].
#'
#' @param x An object to convert to an Arrow Table
#' @param ... Passed to S3 methods
#' @inheritParams arrow_table
#'
#' @return A [Table]
#' @export
#'
#' @examples
#' # use as_arrow_table() for a single object
#' as_arrow_table(data.frame(col1 = 1, col2 = "two"))
#'
#' # use arrow_table() to create from columns
#' arrow_table(col1 = 1, col2 = "two")
#'
as_arrow_table <- function(x, ..., schema = NULL) {
  # S3 generic: dispatches to as_arrow_table.<class>(x, ...) methods
  UseMethod("as_arrow_table")
}
| |
#' @rdname as_arrow_table
#' @export
as_arrow_table.default <- function(x, ...) {
  # Signal a classed error so as_writable_table() can catch it and
  # customize the error message for its callers.
  msg <- sprintf(
    "No method for `as_arrow_table()` for object of class %s",
    paste(class(x), collapse = " / ")
  )
  abort(msg, class = "arrow_no_method_as_arrow_table")
}
| |
#' @rdname as_arrow_table
#' @export
as_arrow_table.Table <- function(x, ..., schema = NULL) {
  # Already a Table: cast only when a target schema is supplied
  if (!is.null(schema)) {
    return(x$cast(schema))
  }
  x
}
| |
#' @rdname as_arrow_table
#' @export
as_arrow_table.RecordBatch <- function(x, ..., schema = NULL) {
  # Cast the batch first (if a schema was given), then wrap it in a Table
  batch <- if (is.null(schema)) x else x$cast(schema)
  Table$create(batch)
}
| |
#' @rdname as_arrow_table
#' @export
as_arrow_table.data.frame <- function(x, ..., schema = NULL) {
  # Reject data frames whose columns are not all named before converting
  check_named_cols(x)
  Table$create(x, schema = schema)
}
| |
#' @rdname as_arrow_table
#' @export
as_arrow_table.RecordBatchReader <- function(x, ...) {
  # Drain the reader into a single Table
  result <- x$read_table()
  result
}
| |
#' @rdname as_arrow_table
#' @export
as_arrow_table.Dataset <- function(x, ...) {
  # Scan the dataset and materialize the results as a Table
  scanner <- Scanner$create(x)
  scanner$ToTable()
}
| |
#' @rdname as_arrow_table
#' @export
as_arrow_table.arrow_dplyr_query <- function(x, ...) {
  # Evaluate the query through a reader, releasing it once we are done
  rb_reader <- as_record_batch_reader(x)
  on.exit(rb_reader$.unsafe_delete())

  tab <- as_arrow_table(rb_reader)
  # arrow_dplyr_query holds group_by information. Set it on the table metadata.
  set_group_attributes(
    tab,
    dplyr::group_vars(x),
    dplyr::group_by_drop_default(x)
  )
}
| |
#' @rdname as_arrow_table
#' @export
as_arrow_table.Schema <- function(x, ...) {
  # Build a table with this schema and no data
  Table__from_schema(x)
}