# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' Scan the contents of a dataset
#'
#' @description
#' A `Scanner` iterates over a [Dataset]'s fragments and returns data
#' according to the given row filter and column projection. A
#' `ScannerBuilder` can help create one.
#'
#' @section Factory:
#' `Scanner$create()` wraps the `ScannerBuilder` interface to make a `Scanner`.
#' It takes the following arguments:
#'
#' * `dataset`: A `Dataset` or `arrow_dplyr_query` object, as returned by the
#' `dplyr` methods on `Dataset`.
#' * `projection`: A character vector of column names to select.
#' * `filter`: An [Expression] to filter the scanned rows by, or `TRUE`
#' (the default) to keep all rows.
#' * `use_threads`: logical: should scanning use multithreading? Default is
#' `TRUE`.
#' * `...`: Additional arguments, currently ignored.
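#'
#' For example, a minimal scan might look like the following (a sketch
#' assuming `ds` is a [Dataset], e.g. one created with `open_dataset()`,
#' and that it has columns named `x` and `y`):
#'
#' ```
#' scan <- Scanner$create(ds, projection = c("x", "y"), use_threads = TRUE)
#' tab <- scan$ToTable()
#' ```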
#' @section Methods:
#' `ScannerBuilder` has the following methods:
#'
#' - `$Project(cols)`: Indicate that the scan should only return columns given
#' by `cols`, a character vector of column names.
#' - `$Filter(expr)`: Filter rows by an [Expression].
#' - `$UseThreads(threads)`: logical: should the scan use multithreading?
#' The method's default input is `TRUE`, but you must call the method to
#' enable multithreading because the scanner's default is `FALSE`.
#' - `$BatchSize(batch_size)`: integer: The maximum row count of scanned
#' record batches; the default is 32K. If scanned record batches overflow
#' memory, call this method to reduce their size.
#' - `$schema`: Active binding, returns the [Schema] of the Dataset
#' - `$Finish()`: Returns a `Scanner`
#'
#' `Scanner` has two methods: `$ToTable()`, which evaluates the query and
#' returns an Arrow [Table], and `$Scan()`, which returns a list of
#' `ScanTask` objects.
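#'
#' As a sketch of how these pieces fit together (again assuming `ds` is a
#' [Dataset]; every builder method except `$Finish()` returns the builder,
#' so calls can be chained):
#'
#' ```
#' sb <- ds$NewScan()
#' sb$Project(c("x", "y"))$UseThreads()$BatchSize(1024)
#' scanner <- sb$Finish()
#' tab <- scanner$ToTable()
#' ```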
#' @rdname Scanner
#' @name Scanner
#' @export
Scanner <- R6Class("Scanner", inherit = ArrowObject,
  public = list(
    ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)),
    Scan = function() map(dataset___Scanner__Scan(self), shared_ptr, class = ScanTask)
  ),
  active = list(
    schema = function() shared_ptr(Schema, dataset___Scanner__schema(self))
  )
)

Scanner$create <- function(dataset,
                           projection = NULL,
                           filter = TRUE,
                           use_threads = option_use_threads(),
                           batch_size = NULL,
                           ...) {
  if (inherits(dataset, "arrow_dplyr_query")) {
    # Unwrap the query: scan its underlying Dataset with the query's
    # projection and filter, forwarding batch_size as well
    return(Scanner$create(
      dataset$.data,
      dataset$selected_columns,
      dataset$filtered_rows,
      use_threads,
      batch_size,
      ...
    ))
  }
  if (inherits(dataset, c("data.frame", "RecordBatch", "Table"))) {
    dataset <- InMemoryDataset$create(dataset)
  }
  assert_is(dataset, "Dataset")

  scanner_builder <- dataset$NewScan()
  if (use_threads) {
    scanner_builder$UseThreads()
  }
  if (!is.null(projection)) {
    scanner_builder$Project(projection)
  }
  if (!isTRUE(filter)) {
    scanner_builder$Filter(filter)
  }
  if (is_integerish(batch_size)) {
    scanner_builder$BatchSize(batch_size)
  }
  scanner_builder$Finish()
}

#' @export
names.Scanner <- function(x) names(x$schema)

ScanTask <- R6Class("ScanTask", inherit = ArrowObject,
  public = list(
    Execute = function() map(dataset___ScanTask__get_batches(self), shared_ptr, class = RecordBatch)
  )
)

#' Apply a function to a stream of RecordBatches
#'
#' As an alternative to calling `collect()` on a `Dataset` query, you can
#' use this function to access the stream of `RecordBatch`es in the `Dataset`.
#' This lets you aggregate on each chunk and pull the intermediate results into
#' a `data.frame` for further aggregation, even if you couldn't fit the whole
#' `Dataset` result in memory.
#'
#' This is experimental and not recommended for production use.
#'
#' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
#' `dplyr` methods on `Dataset`.
#' @param FUN A function or `purrr`-style lambda expression to apply to each
#' batch
#' @param ... Additional arguments passed to `FUN`
#' @param .data.frame logical: collect the resulting chunks into a single
#' `data.frame`? Default `TRUE`
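#' @details
#' A minimal sketch (assuming `ds` is a [Dataset]; the hypothetical lambda
#' counts the rows in each batch, and the per-batch results are row-bound
#' into one `data.frame` because `.data.frame = TRUE`):
#'
#' ```
#' map_batches(ds, ~data.frame(rows = nrow(.x)))
#' ```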
#' @export
map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
  if (.data.frame) {
    # Shadow lapply with purrr::map_dfr so that results are row-bound
    # into a single data.frame
    lapply <- map_dfr
  }
  scanner <- Scanner$create(ensure_group_vars(X))
  FUN <- as_mapper(FUN)
  # message("Making ScanTasks")
  lapply(scanner$Scan(), function(scan_task) {
    # This outer lapply could be parallelized
    # message("Making Batches")
    lapply(scan_task$Execute(), function(batch) {
      # message("Processing Batch")
      # This inner lapply cannot be parallelized
      # TODO: wrap batch in arrow_dplyr_query with X$selected_columns and X$group_by_vars
      # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
      FUN(batch, ...)
    })
  })
}

#' @usage NULL
#' @format NULL
#' @rdname Scanner
#' @export
ScannerBuilder <- R6Class("ScannerBuilder", inherit = ArrowObject,
  public = list(
    Project = function(cols) {
      assert_is(cols, "character")
      dataset___ScannerBuilder__Project(self, cols)
      self
    },
    Filter = function(expr) {
      assert_is(expr, "Expression")
      dataset___ScannerBuilder__Filter(self, expr)
      self
    },
    UseThreads = function(threads = option_use_threads()) {
      dataset___ScannerBuilder__UseThreads(self, threads)
      self
    },
    BatchSize = function(batch_size) {
      dataset___ScannerBuilder__BatchSize(self, batch_size)
      self
    },
    Finish = function() unique_ptr(Scanner, dataset___ScannerBuilder__Finish(self))
  ),
  active = list(
    schema = function() shared_ptr(Schema, dataset___ScannerBuilder__schema(self))
  )
)

#' @export
names.ScannerBuilder <- function(x) names(x$schema)