blob: 7b1d66092953ba2135b0f2e58fb01a336282c328 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' Open a multi-file dataset
#'
#' Arrow Datasets allow you to query against data that has been split across
#' multiple files. This sharding of data may indicate partitioning, which
#' can accelerate queries that only touch some partitions (files). Call
#' `open_dataset()` to point to a directory of data files and return a
#' `Dataset`, then use `dplyr` methods to query it.
#'
#' @param sources Either:
#' * a string path to a directory containing data files
#' * a list of `Dataset` objects as created by this function
#' * a list of `DatasetFactory` objects as created by [dataset_factory()].
#' @param schema [Schema] for the dataset. If `NULL` (the default), the schema
#' will be inferred from the data sources.
#' @param partitioning When `sources` is a file path, one of
#' * a `Schema`, in which case the file paths relative to `sources` will be
#' parsed, and path segments will be matched with the schema fields. For
#' example, `schema(year = int16(), month = int8())` would create partitions
#' for file paths like "2019/01/file.parquet", "2019/02/file.parquet", etc.
#' * a character vector that defines the field names corresponding to those
#' path segments (that is, you're providing the names that would correspond
#' to a `Schema` but the types will be autodetected)
#' * a `HivePartitioning` or `HivePartitioningFactory`, as returned
#' by [hive_partition()] which parses explicit or autodetected fields from
#' Hive-style path segments
#' * `NULL` for no partitioning
#'
#' The default is to autodetect Hive-style partitions.
#' @param unify_schemas logical: should all data fragments (files, `Dataset`s)
#' be scanned in order to create a unified schema from them? If `FALSE`, only
#' the first fragment will be inspected for its schema. Use this
#' fast path when you know and trust that all fragments have an identical schema.
#' The default is `FALSE` when creating a dataset from a file path (because
#' there may be many files and scanning may be slow) but `TRUE` when `sources`
#' is a list of `Dataset`s (because there should be few `Dataset`s in the list
#' and their `Schema`s are already in memory).
#' @param ... additional arguments passed to `dataset_factory()` when
#' `sources` is a file path, otherwise ignored. These may include "format" to
#' indicate the file format, or other format-specific options.
#' @return A [Dataset] R6 object. Use `dplyr` methods on it to query the data,
#' or call [`$NewScan()`][Scanner] to construct a query directly.
#' @export
#' @seealso `vignette("dataset", package = "arrow")`
#' @include arrow-package.R
open_dataset <- function(sources,
schema = NULL,
partitioning = hive_partition(),
unify_schemas = NULL,
...) {
if (is_list_of(sources, "Dataset")) {
if (is.null(schema)) {
if (is.null(unify_schemas) || isTRUE(unify_schemas)) {
# Default is to unify schemas here
schema <- unify_schemas(schemas = map(sources, ~.$schema))
} else {
# Take the first one.
schema <- sources[[1]]$schema
}
}
# Enforce that all datasets have the same schema
sources <- lapply(sources, function(x) {
x$schema <- schema
x
})
return(shared_ptr(UnionDataset, dataset___UnionDataset__create(sources, schema)))
}
factory <- DatasetFactory$create(sources, partitioning = partitioning, ...)
# Default is _not_ to inspect/unify schemas
factory$Finish(schema, isTRUE(unify_schemas))
}
#' Multi-file datasets
#'
#' @description
#' Arrow Datasets allow you to query against data that has been split across
#' multiple files. This sharding of data may indicate partitioning, which
#' can accelerate queries that only touch some partitions (files).
#'
#' A `Dataset` contains one or more `Fragments`, such as files, of potentially
#' differing type and partitioning.
#'
#' For `Dataset$create()`, see [open_dataset()], which is an alias for it.
#'
#' `DatasetFactory` is used to provide finer control over the creation of `Dataset`s.
#'
#' @section Factory:
#' `DatasetFactory` is used to create a `Dataset`, inspect the [Schema] of the
#' fragments contained in it, and declare a partitioning.
#' `FileSystemDatasetFactory` is a subclass of `DatasetFactory` for
#' discovering files in the local file system, the only currently supported
#' file system.
#'
#' For the `DatasetFactory$create()` factory method, see [dataset_factory()], an
#' alias for it. A `DatasetFactory` has:
#'
#' - `$Inspect(unify_schemas)`: If `unify_schemas` is `TRUE`, all fragments
#' will be scanned and a unified [Schema] will be created from them; if `FALSE`
#' (default), only the first fragment will be inspected for its schema. Use this
#' fast path when you know and trust that all fragments have an identical schema.
#' - `$Finish(schema, unify_schemas)`: Returns a `Dataset`. If `schema` is provided,
#' it will be used for the `Dataset`; if omitted, a `Schema` will be created from
#' inspecting the fragments (files) in the dataset, following `unify_schemas`
#' as described above.
#'
#' `FileSystemDatasetFactory$create()` is a lower-level factory method and
#' takes the following arguments:
#' * `filesystem`: A [FileSystem]
#' * `selector`: A [FileSelector]
#' * `format`: A [FileFormat]
#' * `partitioning`: Either `Partitioning`, `PartitioningFactory`, or `NULL`
#' @section Methods:
#'
#' A `Dataset` has the following methods:
#' - `$NewScan()`: Returns a [ScannerBuilder] for building a query
#' - `$schema`: Active binding that returns the [Schema] of the Dataset; you
#' may also replace the dataset's schema by using `ds$schema <- new_schema`.
#' This method currently supports only adding, removing, or reordering
#' fields in the schema: you cannot alter or cast the field types.
#'
#' `FileSystemDataset` has the following methods:
#' - `$files`: Active binding, returns the files of the `FileSystemDataset`
#' - `$format`: Active binding, returns the [FileFormat] of the `FileSystemDataset`
#'
#' `UnionDataset` has the following methods:
#' - `$children`: Active binding, returns all child `Dataset`s.
#'
#' @export
#' @seealso [open_dataset()] for a simple interface to creating a `Dataset`
Dataset <- R6Class("Dataset", inherit = ArrowObject,
public = list(
..dispatch = function() {
type <- self$type
if (type == "union") {
shared_ptr(UnionDataset, self$pointer())
} else if (type == "filesystem") {
shared_ptr(FileSystemDataset, self$pointer())
} else {
self
}
},
# @description
# Start a new scan of the data
# @return A [ScannerBuilder]
NewScan = function() unique_ptr(ScannerBuilder, dataset___Dataset__NewScan(self)),
ToString = function() self$schema$ToString()
),
active = list(
schema = function(schema) {
if (missing(schema)) {
shared_ptr(Schema, dataset___Dataset__schema(self))
} else {
assert_is(schema, "Schema")
invisible(shared_ptr(Dataset, dataset___Dataset__ReplaceSchema(self, schema)))
}
},
metadata = function() self$schema$metadata,
num_rows = function() {
warning("Number of rows unknown; returning NA", call. = FALSE)
NA_integer_
},
num_cols = function() length(self$schema),
# @description
# Return the Dataset's type.
type = function() dataset___Dataset__type_name(self)
)
)
Dataset$create <- open_dataset
#' @name FileSystemDataset
#' @rdname Dataset
#' @export
FileSystemDataset <- R6Class("FileSystemDataset", inherit = Dataset,
public = list(
.class_title = function() {
nfiles <- length(self$files)
file_type <- self$format$type
pretty_file_type <- list(
parquet = "Parquet",
ipc = "Feather"
)[[file_type]]
paste(
class(self)[[1]],
"with",
nfiles,
pretty_file_type %||% file_type,
ifelse(nfiles == 1, "file", "files")
)
}
),
active = list(
# @description
# Return the files contained in this `FileSystemDataset`
files = function() dataset___FileSystemDataset__files(self),
# @description
# Return the format of files in this `Dataset`
format = function() {
shared_ptr(FileFormat, dataset___FileSystemDataset__format(self))$..dispatch()
},
# @description
# Return the filesystem of files in this `Dataset`
filesystem = function() {
shared_ptr(FileSystem, dataset___FileSystemDataset__filesystem(self))$..dispatch()
},
num_rows = function() {
if (inherits(self$format, "ParquetFileFormat")) {
# It's generally fast enough to skim the files directly
sum(map_int(self$files, ~ParquetFileReader$create(.x)$num_rows))
} else {
# TODO: implement for other file formats
warning("Number of rows unknown; returning NA", call. = FALSE)
NA_integer_
# Could do a scan, picking only the last column, which hopefully is virtual
# But this is can be slow
# Scanner$create(self, projection = tail(names(self), 1))$ToTable()$num_rows
# See also https://issues.apache.org/jira/browse/ARROW-9697
}
}
)
)
#' @name UnionDataset
#' @rdname Dataset
#' @export
UnionDataset <- R6Class("UnionDataset", inherit = Dataset,
active = list(
# @description
# Return the UnionDataset's child `Dataset`s
children = function() {
map(dataset___UnionDataset__children(self), ~shared_ptr(Dataset, .)$..dispatch())
}
)
)
#' @name InMemoryDataset
#' @rdname Dataset
#' @export
InMemoryDataset <- R6Class("InMemoryDataset", inherit = Dataset)
InMemoryDataset$create <- function(x) {
if (!inherits(x, "Table")) {
x <- Table$create(x)
}
shared_ptr(InMemoryDataset, dataset___InMemoryDataset__create(x))
}
#' @export
names.Dataset <- function(x) names(x$schema)
#' @export
dim.Dataset <- function(x) c(x$num_rows, x$num_cols)
#' @export
c.Dataset <- function(...) Dataset$create(list(...))
#' @export
head.Dataset <- function(x, n = 6L, ...) {
assert_that(n > 0) # For now
scanner <- Scanner$create(ensure_group_vars(x))
shared_ptr(Table, dataset___Scanner__head(scanner, n))
}
#' @export
tail.Dataset <- function(x, n = 6L, ...) {
assert_that(n > 0) # For now
result <- list()
batch_num <- 0
scanner <- Scanner$create(ensure_group_vars(x))
for (scan_task in rev(dataset___Scanner__Scan(scanner))) {
for (batch in rev(shared_ptr(ScanTask, scan_task)$Execute())) {
batch_num <- batch_num + 1
result[[batch_num]] <- tail(batch, n)
n <- n - nrow(batch)
if (n <= 0) break
}
if (n <= 0) break
}
Table$create(!!!rev(result))
}
#' @export
`[.Dataset` <- function(x, i, j, ..., drop = FALSE) {
if (nargs() == 2L) {
# List-like column extraction (x[i])
return(x[, i])
}
if (!missing(j)) {
x <- select.Dataset(x, j)
}
if (!missing(i)) {
x <- take_dataset_rows(x, i)
}
x
}
take_dataset_rows <- function(x, i) {
# TODO: move this to cpp
if (!is.numeric(i) || any(i < 0)) {
stop("Only slicing with positive indices is supported", call. = FALSE)
}
result <- list()
result_order <- order(i)
i <- sort(i) - 1L
scanner <- Scanner$create(ensure_group_vars(x))
for (scan_task in dataset___Scanner__Scan(scanner)) {
for (batch in shared_ptr(ScanTask, scan_task)$Execute()) {
# Take all rows that are in this batch
this_batch_nrows <- batch$num_rows
in_this_batch <- i > -1L & i < this_batch_nrows
if (any(in_this_batch)) {
result[[length(result) + 1L]] <- batch$Take(i[in_this_batch])
}
i <- i - this_batch_nrows
if (all(i < 0L)) break
}
if (all(i < 0L)) break
}
tab <- Table$create(!!!result)
# Now sort
tab$Take(result_order - 1L)
}