blob: 1312a2676ae4c55c02946b795f6a8f83d5ef331d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' Read a CSV or other delimited file with Arrow
#'
#' These functions use the Arrow C++ CSV reader to read into a `data.frame`.
#' Arrow C++ options have been mapped to argument names that follow those of
#' `readr::read_delim()`, and `col_select` was inspired by `vroom::vroom()`.
#'
#' `read_csv_arrow()` and `read_tsv_arrow()` are wrappers around
#' `read_delim_arrow()` that specify a delimiter.
#'
#' Note that not all `readr` options are currently implemented here. Please file
#' an issue if you encounter one that `arrow` should support.
#'
#' If you need to control Arrow-specific reader parameters that don't have an
#' equivalent in `readr::read_csv()`, you can either provide them in the
#' `parse_options`, `convert_options`, or `read_options` arguments, or you can
#' use [CsvTableReader] directly for lower-level access.
#'
#' @section Specifying column types and names:
#'
#' By default, the CSV reader will infer the column names and data types from the file, but there
#' are a few ways you can specify them directly.
#'
#' One way is to provide an Arrow [Schema] in the `schema` argument,
#' which is an ordered map of column name to type.
#' When provided, it satisfies both the `col_names` and `col_types` arguments.
#' This is good if you know all of this information up front.
#'
#' You can also pass a `Schema` to the `col_types` argument. If you do this,
#' column names will still be inferred from the file unless you also specify
#' `col_names`. In either case, the column names in the `Schema` must match the
#' data's column names, whether they are explicitly provided or inferred. That
#' said, this `Schema` does not have to reference all columns: those omitted
#' will have their types inferred.
#'
#' Alternatively, you can declare column types by providing the compact string representation
#' that `readr` uses to the `col_types` argument. This means you provide a
#' single string, one character per column, where the characters map to Arrow
#' types analogously to the `readr` type mapping:
#'
#' * "c": `utf8()`
#' * "i": `int32()`
#' * "n": `float64()`
#' * "d": `float64()`
#' * "l": `bool()`
#' * "f": `dictionary()`
#' * "D": `date32()`
#' * "T": `time32()`
#' * "t": `timestamp()`
#' * "_": `null()`
#' * "-": `null()`
#' * "?": infer the type from the data
#'
#' If you use the compact string representation for `col_types`, you must also
#' specify `col_names`.
#'
#' Regardless of how types are specified, all columns with a `null()` type will
#' be dropped.
#'
#' Note that if you are specifying column names, whether by `schema` or
#' `col_names`, and the CSV file has a header row that would otherwise be used
#' to identify column names, you'll need to add `skip = 1` to skip that row.
#'
#' @param file A character file name or URI, `raw` vector, an Arrow input stream,
#' or a `FileSystem` with path (`SubTreeFileSystem`).
#' If a file name, a memory-mapped Arrow [InputStream] will be opened and
#' closed when finished; compression will be detected from the file extension
#' and handled automatically. If an input stream is provided, it will be left
#' open.
#' @param delim Single character used to separate fields within a record.
#' @param quote Single character used to quote strings.
#' @param escape_double Does the file escape quotes by doubling them?
#' That is, if this option is `TRUE`, the value `""""` represents
#' a single quote, `\"`.
#' @param escape_backslash Does the file use backslashes to escape special
#' characters? This is more general than `escape_double` as backslashes
#' can be used to escape the delimiter character, the quote character, or
#' to add special characters like `\\n`.
#' @param schema [Schema] that describes the table. If provided, it will be
#' used to satisfy both `col_names` and `col_types`.
#' @param col_names If `TRUE`, the first row of the input will be used as the
#' column names and will not be included in the data frame. If `FALSE`, column
#' names will be generated by Arrow, starting with "f0", "f1", ..., "fN".
#' Alternatively, you can specify a character vector of column names.
#' @param col_types A compact string representation of the column types, an
#' Arrow [Schema], or `NULL` (the default) to infer types from the data.
#' @param col_select A character vector of column names to keep, as in the
#' "select" argument to `data.table::fread()`, or a
#' [tidy selection specification][tidyselect::vars_select()]
#' of columns, as used in `dplyr::select()`.
#' @param na A character vector of strings to interpret as missing values.
#' @param quoted_na Should missing values inside quotes be treated as missing
#' values (the default) or strings? (Note that this is different from the
#' Arrow C++ default for the corresponding convert option,
#' `strings_can_be_null`.)
#' @param skip_empty_rows Should blank rows be ignored altogether? If
#' `TRUE`, blank rows will not be represented at all. If `FALSE`, they will be
#' filled with missings.
#' @param skip Number of lines to skip before reading data.
#' @param timestamp_parsers User-defined timestamp parsers. If more than one
#' parser is specified, the CSV conversion logic will try parsing values
#' starting from the beginning of this vector. Possible values are:
#' - `NULL`: the default, which uses the ISO-8601 parser
#' - a character vector of [strptime][base::strptime()] parse strings
#' - a list of [TimestampParser] objects
#' @param parse_options see [file reader options][CsvReadOptions].
#' If given, this overrides any
#' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
#' @param convert_options see [file reader options][CsvReadOptions]
#' @param read_options see [file reader options][CsvReadOptions]
#' @param as_data_frame Should the function return a `data.frame` (default) or
#' an Arrow [Table]?
#'
#' @return A `data.frame`, or a Table if `as_data_frame = FALSE`.
#' @export
#' @examplesIf arrow_available()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write.csv(mtcars, file = tf)
#' df <- read_csv_arrow(tf)
#' dim(df)
#' # Can select columns
#' df <- read_csv_arrow(tf, col_select = starts_with("d"))
read_delim_arrow <- function(file,
                             delim = ",",
                             quote = '"',
                             escape_double = TRUE,
                             escape_backslash = FALSE,
                             schema = NULL,
                             col_names = TRUE,
                             col_types = NULL,
                             col_select = NULL,
                             na = c("", "NA"),
                             quoted_na = TRUE,
                             skip_empty_rows = TRUE,
                             skip = 0L,
                             parse_options = NULL,
                             convert_options = NULL,
                             read_options = NULL,
                             as_data_frame = TRUE,
                             timestamp_parsers = NULL) {
  # Quote the column selection without evaluating it. It may be a tidyselect
  # expression such as starts_with("d"), and it is applied only after the
  # table has been read.
  selection <- enquo(col_select)

  # A Schema carries both the column names and their types.
  if (inherits(schema, "Schema")) {
    col_names <- names(schema)
    col_types <- schema
  }

  # Any options object the caller did not pass is derived from the
  # readr-style arguments. Read options come before convert options because
  # the latter need the column names the read options resolved.
  if (is.null(parse_options)) {
    parse_options <- readr_to_csv_parse_options(
      delim,
      quote,
      escape_double,
      escape_backslash,
      skip_empty_rows
    )
  }
  if (is.null(read_options)) {
    read_options <- readr_to_csv_read_options(skip, col_names)
  }
  if (is.null(convert_options)) {
    convert_options <- readr_to_csv_convert_options(
      na,
      quoted_na,
      col_types = col_types,
      col_names = read_options$column_names,
      timestamp_parsers = timestamp_parsers
    )
  }

  # Open anything that is not already an InputStream, and close it when this
  # function exits. A stream supplied by the caller is left open.
  if (!inherits(file, "InputStream")) {
    file <- make_readable_file(file)
    on.exit(file$close())
  }

  result <- CsvTableReader$create(
    file,
    read_options = read_options,
    parse_options = parse_options,
    convert_options = convert_options
  )$Read()

  # TODO: move this into convert_options using include_columns
  if (!quo_is_null(selection)) {
    result <- result[vars_select(names(result), !!selection)]
  }

  if (isTRUE(as_data_frame)) as.data.frame(result) else result
}
#' @rdname read_delim_arrow
#' @export
read_csv_arrow <- function(file,
                           quote = '"',
                           escape_double = TRUE,
                           escape_backslash = FALSE,
                           schema = NULL,
                           col_names = TRUE,
                           col_types = NULL,
                           col_select = NULL,
                           na = c("", "NA"),
                           quoted_na = TRUE,
                           skip_empty_rows = TRUE,
                           skip = 0L,
                           parse_options = NULL,
                           convert_options = NULL,
                           read_options = NULL,
                           as_data_frame = TRUE,
                           timestamp_parsers = NULL) {
  # Turn this call into a call to read_delim_arrow() with a comma delimiter,
  # then evaluate it in the caller's environment. Passing the unevaluated
  # call through (instead of the argument values) keeps `col_select` as an
  # expression, so tidyselect helpers keep working.
  redirected <- match.call()
  redirected[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
  redirected$delim <- ","
  eval(redirected, parent.frame())
}
#' @rdname read_delim_arrow
#' @export
read_tsv_arrow <- function(file,
                           quote = '"',
                           escape_double = TRUE,
                           escape_backslash = FALSE,
                           schema = NULL,
                           col_names = TRUE,
                           col_types = NULL,
                           col_select = NULL,
                           na = c("", "NA"),
                           quoted_na = TRUE,
                           skip_empty_rows = TRUE,
                           skip = 0L,
                           parse_options = NULL,
                           convert_options = NULL,
                           read_options = NULL,
                           as_data_frame = TRUE,
                           timestamp_parsers = NULL) {
  # Turn this call into a call to read_delim_arrow() with a tab delimiter,
  # then evaluate it in the caller's environment. Passing the unevaluated
  # call through (instead of the argument values) keeps `col_select` as an
  # expression, so tidyselect helpers keep working.
  redirected <- match.call()
  redirected[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
  redirected$delim <- "\t"
  eval(redirected, parent.frame())
}
#' @title Arrow CSV and JSON table reader classes
#' @rdname CsvTableReader
#' @name CsvTableReader
#' @docType class
#' @usage NULL
#' @format NULL
#' @description `CsvTableReader` and `JsonTableReader` wrap the Arrow C++ CSV
#' and JSON table readers. See their usage in [read_csv_arrow()] and
#' [read_json_arrow()], respectively.
#'
#' @section Factory:
#'
#' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods
#' take the following arguments:
#'
#' - `file` An Arrow [InputStream]
#' - `convert_options` (CSV only), `parse_options`, `read_options`: see
#' [CsvReadOptions]
#' - `...` additional parameters.
#'
#' @section Methods:
#'
#' - `$Read()`: returns an Arrow Table.
#'
#' @include arrow-package.R
#' @export
CsvTableReader <- R6Class(
  classname = "CsvTableReader",
  inherit = ArrowObject,
  public = list(
    # Read the whole input through the C++ reader and return an Arrow Table.
    Read = function() {
      csv___TableReader__Read(self)
    }
  )
)
CsvTableReader$create <- function(file,
                                  read_options = CsvReadOptions$create(),
                                  parse_options = CsvParseOptions$create(),
                                  convert_options = CsvConvertOptions$create(),
                                  ...) {
  # `file` must already be an open Arrow InputStream; the read_*_arrow()
  # helpers are responsible for opening paths. `...` is accepted but unused.
  assert_is(file, "InputStream")
  csv___TableReader__Make(
    file,
    read_options,
    parse_options,
    convert_options
  )
}
#' @title File reader options
#' @rdname CsvReadOptions
#' @name CsvReadOptions
#' @docType class
#' @usage NULL
#' @format NULL
#' @description `CsvReadOptions`, `CsvParseOptions`, `CsvConvertOptions`,
#' `JsonReadOptions`, `JsonParseOptions`, and `TimestampParser` are containers for various
#' file reading options. See their usage in [read_csv_arrow()] and
#' [read_json_arrow()], respectively.
#'
#' @section Factory:
#'
#' The `CsvReadOptions$create()` and `JsonReadOptions$create()` factory methods
#' take the following arguments:
#'
#' - `use_threads` Whether to use the global CPU thread pool
#' - `block_size` Block size we request from the IO layer; also determines
#' the size of chunks when use_threads is `TRUE`. NB: if `FALSE`, JSON input
#' must end with an empty line.
#'
#' `CsvReadOptions$create()` further accepts these additional arguments:
#'
#' - `skip_rows` Number of lines to skip before reading data (default 0)
#' - `column_names` Character vector to supply column names. If length-0
#' (the default), the first non-skipped row will be parsed to generate column
#' names, unless `autogenerate_column_names` is `TRUE`.
#' - `autogenerate_column_names` Logical: generate column names instead of
#' using the first non-skipped row (the default)? If `TRUE`, column names will
#' be "f0", "f1", ..., "fN".
#'
#' `CsvParseOptions$create()` takes the following arguments:
#'
#' - `delimiter` Field delimiting character (default `","`)
#' - `quoting` Logical: are strings quoted? (default `TRUE`)
#' - `quote_char` Quoting character, if `quoting` is `TRUE`
#' - `double_quote` Logical: are quotes inside values double-quoted? (default `TRUE`)
#' - `escaping` Logical: whether escaping is used (default `FALSE`)
#' - `escape_char` Escaping character, if `escaping` is `TRUE`
#' - `newlines_in_values` Logical: are values allowed to contain CR (`0x0d`)
#' and LF (`0x0a`) characters? (default `FALSE`)
#' - `ignore_empty_lines` Logical: should empty lines be ignored (default) or
#' generate a row of missing values (if `FALSE`)?
#'
#' `JsonParseOptions$create()` accepts only the `newlines_in_values` argument.
#'
#' `CsvConvertOptions$create()` takes the following arguments:
#'
#' - `check_utf8` Logical: check UTF8 validity of string columns? (default `TRUE`)
#' - `null_values` character vector of recognized spellings for null values.
#' Analogous to the `na.strings` argument to
#' [`read.csv()`][utils::read.csv()] or `na` in `readr::read_csv()`.
#' - `strings_can_be_null` Logical: can string / binary columns have
#' null values? Similar to the `quoted_na` argument to `readr::read_csv()`.
#' (default `FALSE`)
#' - `true_values` character vector of recognized spellings for `TRUE` values
#' - `false_values` character vector of recognized spellings for `FALSE` values
#' - `col_types` A `Schema` or `NULL` to infer types
#' - `auto_dict_encode` Logical: Whether to try to automatically
#' dictionary-encode string / binary data (think `stringsAsFactors`). Default `FALSE`.
#' This setting is ignored for non-inferred columns (those in `col_types`).
#' - `auto_dict_max_cardinality` If `auto_dict_encode`, string/binary columns
#' are dictionary-encoded up to this number of unique values (default 50),
#' after which it switches to regular encoding.
#' - `include_columns` If non-empty, indicates the names of columns from the
#' CSV file that should be actually read and converted (in the vector's order).
#' - `include_missing_columns` Logical: if `include_columns` is provided, should
#' columns named in it but not found in the data be included as a column of
#' type `null()`? The default (`FALSE`) means that the reader will instead
#' raise an error.
#' - `timestamp_parsers` User-defined timestamp parsers. If more than one
#' parser is specified, the CSV conversion logic will try parsing values
#' starting from the beginning of this vector. Possible values are
#' (a) `NULL`, the default, which uses the ISO-8601 parser;
#' (b) a character vector of [strptime][base::strptime()] parse strings; or
#' (c) a list of [TimestampParser] objects.
#'
#' `TimestampParser$create()` takes an optional `format` string argument.
#' See [`strptime()`][base::strptime()] for example syntax.
#' The default is to use an ISO-8601 format parser.
#'
#' The `CsvWriteOptions$create()` factory method takes the following arguments:
#' - `include_header` Whether to write an initial header line with column names (default `TRUE`)
#' - `batch_size` Maximum number of rows processed at a time. Default is 1024.
#'
#' @section Active bindings:
#'
#' - `column_names`: from `CsvReadOptions`
#'
#' @export
CsvReadOptions <- R6Class(
  classname = "CsvReadOptions",
  inherit = ArrowObject,
  active = list(
    # Column names held by the underlying C++ read options. The binding
    # takes no `value` argument, so it cannot be assigned to.
    column_names = function() {
      csv___ReadOptions__column_names(self)
    }
  )
)
CsvReadOptions$create <- function(use_threads = option_use_threads(),
                                  block_size = 1048576L,
                                  skip_rows = 0L,
                                  column_names = character(0),
                                  autogenerate_column_names = FALSE) {
  # Collect the arguments into a named list and pass it to the C++
  # initializer. The list names presumably have to match the fields the
  # initializer looks up; verify against csv.cpp before renaming.
  settings <- list(
    use_threads = use_threads,
    block_size = block_size,
    skip_rows = skip_rows,
    column_names = column_names,
    autogenerate_column_names = autogenerate_column_names
  )
  csv___ReadOptions__initialize(settings)
}
#' @rdname CsvReadOptions
#' @export
CsvWriteOptions <- R6Class(
  classname = "CsvWriteOptions",
  # No extra fields or methods; instances come from CsvWriteOptions$create().
  inherit = ArrowObject
)
CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L) {
  # `batch_size` must be one finite, positive whole number. It is accepted
  # as a double too, and converted to integer before it reaches C++.
  assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0)
  settings <- list(
    include_header = include_header,
    batch_size = as.integer(batch_size)
  )
  csv___WriteOptions__initialize(settings)
}
readr_to_csv_read_options <- function(skip, col_names, col_types) {
  # Map readr's `skip`/`col_names` onto CsvReadOptions.
  # `col_types` is accepted for signature compatibility but is not used here.
  if (identical(col_names, FALSE)) {
    # col_names = FALSE: Arrow generates "f0", "f1", ... itself.
    return(
      CsvReadOptions$create(skip_rows = skip, autogenerate_column_names = TRUE)
    )
  }
  # col_names = TRUE becomes a zero-length vector, which is the C++ signal to
  # parse the names from the first non-skipped row. Any other value is
  # passed through as the explicit column names.
  names_to_use <- if (isTRUE(col_names)) character(0) else col_names
  CsvReadOptions$create(skip_rows = skip, column_names = names_to_use)
}
#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
CsvParseOptions <- R6Class(
  classname = "CsvParseOptions",
  # No extra fields or methods; instances come from CsvParseOptions$create().
  inherit = ArrowObject
)
CsvParseOptions$create <- function(delimiter = ",",
                                   quoting = TRUE,
                                   quote_char = '"',
                                   double_quote = TRUE,
                                   escaping = FALSE,
                                   escape_char = '\\',
                                   newlines_in_values = FALSE,
                                   ignore_empty_lines = TRUE) {
  # Collect every parse setting into a named list for the C++ initializer.
  settings <- list(
    delimiter = delimiter,
    quoting = quoting,
    quote_char = quote_char,
    double_quote = double_quote,
    escaping = escaping,
    escape_char = escape_char,
    newlines_in_values = newlines_in_values,
    ignore_empty_lines = ignore_empty_lines
  )
  csv___ParseOptions__initialize(settings)
}
readr_to_csv_parse_options <- function(delim = ",",
                                       quote = '"',
                                       escape_double = TRUE,
                                       escape_backslash = FALSE,
                                       skip_empty_rows = TRUE) {
  # Translate readr-style argument names into an Arrow CsvParseOptions.
  #
  # Arrow's C++ CSV parser holds the delimiter and the quote character as a
  # single byte. A longer string (for example a multi-character readr
  # delimiter, or a non-ASCII character) would be silently cut down to its
  # first byte and mis-parse the file, so such input is rejected here.
  if (!is.character(delim) || length(delim) != 1L || is.na(delim) ||
      nchar(delim, type = "bytes") != 1L) {
    abort("`delim` must be a single one-byte character, e.g. \",\" or \"\\t\"")
  }
  # An empty `quote` is allowed: it turns quoting off (see `quoting` below).
  if (!is.character(quote) || length(quote) != 1L || is.na(quote) ||
      nchar(quote, type = "bytes") > 1L) {
    abort("`quote` must be a single one-byte character or an empty string")
  }
  CsvParseOptions$create(
    delimiter = delim,
    quoting = nzchar(quote),
    quote_char = quote,
    double_quote = escape_double,
    escaping = escape_backslash,
    escape_char = '\\',
    # NOTE(review): newlines_in_values follows escape_backslash; the reason
    # for linking the two is not visible here -- confirm before changing.
    newlines_in_values = escape_backslash,
    ignore_empty_lines = skip_empty_rows
  )
}
#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
TimestampParser <- R6Class(
  classname = "TimestampParser",
  inherit = ArrowObject,
  public = list(
    # The parser kind, as reported by the C++ object.
    kind = function() {
      TimestampParser__kind(self)
    },
    # The parser's format, as reported by the C++ object (presumably the
    # strptime string for strptime parsers -- confirm).
    format = function() {
      TimestampParser__format(self)
    }
  )
)
TimestampParser$create <- function(format = NULL) {
  # With a format string, build a strptime-style parser for it.
  if (!is.null(format)) {
    return(TimestampParser__MakeStrptime(format))
  }
  # With no format, fall back to the ISO-8601 parser.
  TimestampParser__MakeISO8601()
}
#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
CsvConvertOptions <- R6Class(
  classname = "CsvConvertOptions",
  # No extra fields or methods; instances come from CsvConvertOptions$create().
  inherit = ArrowObject
)
CsvConvertOptions$create <- function(check_utf8 = TRUE,
                                     null_values = c("", "NA"),
                                     true_values = c("T", "true", "TRUE"),
                                     false_values = c("F", "false", "FALSE"),
                                     strings_can_be_null = FALSE,
                                     col_types = NULL,
                                     auto_dict_encode = FALSE,
                                     auto_dict_max_cardinality = 50L,
                                     include_columns = character(),
                                     include_missing_columns = FALSE,
                                     timestamp_parsers = NULL) {
  # Only NULL (infer every type) or a Schema can reach the C++ layer; the
  # readr compact string form is resolved earlier by
  # readr_to_csv_convert_options().
  col_types_ok <- is.null(col_types) || inherits(col_types, "Schema")
  if (!col_types_ok) {
    abort(c(
      "Unsupported `col_types` specification.",
      i = "`col_types` must be NULL, or a <Schema>."
    ))
  }
  settings <- list(
    check_utf8 = check_utf8,
    null_values = null_values,
    strings_can_be_null = strings_can_be_null,
    col_types = col_types,
    true_values = true_values,
    false_values = false_values,
    auto_dict_encode = auto_dict_encode,
    auto_dict_max_cardinality = auto_dict_max_cardinality,
    include_columns = include_columns,
    include_missing_columns = include_missing_columns,
    timestamp_parsers = timestamp_parsers
  )
  csv___ConvertOptions__initialize(settings)
}
readr_to_csv_convert_options <- function(na,
                                         quoted_na,
                                         col_types = NULL,
                                         col_names = NULL,
                                         timestamp_parsers = NULL) {
  # Translate readr-style arguments into an Arrow CsvConvertOptions.
  #
  # `col_types` may be:
  #   * NULL: infer every column type;
  #   * an Arrow Schema: types for (some of) the columns, by name;
  #   * a single readr compact string such as "ci?d": one character per
  #     column. This form requires `col_names` with one name per character.
  # Columns typed null() are dropped by listing all *other* columns in
  # `include_columns`.
  include_columns <- character()
  if (is.character(col_types)) {
    if (length(col_types) != 1L) {
      abort("`col_types` is a character vector that is not of size 1")
    }
    n <- nchar(col_types)
    specs <- substring(col_types, seq_len(n), seq_len(n))
    if (!is_bare_character(col_names, n)) {
      abort("Compact specification for `col_types` requires `col_names`")
    }
    # NOTE(review): readr uses "T" for date-times and "t" for times; here
    # "T" maps to time32() and "t" to timestamp(). This is left unchanged
    # because swapping them would change how existing `col_types` strings
    # are read.
    col_types <- set_names(nm = col_names, map2(specs, col_names, ~ {
      switch(.x,
        "c" = utf8(),
        "i" = int32(),
        "n" = float64(),
        "d" = float64(),
        "l" = bool(),
        "f" = dictionary(),
        "D" = date32(),
        "T" = time32(),
        "t" = timestamp(),
        "_" = null(),
        "-" = null(),
        "?" = NULL,
        # Build the full message first: abort() takes only one message
        # argument. The pieces used to be passed separately, so `.x` became
        # the condition class and the rest of the text was lost.
        abort(paste0(
          "Unsupported compact specification: '", .x,
          "' for column '", .y, "'"
        ))
      )
    }))
    # "?" produced NULL; leaving those columns out of the schema lets Arrow
    # infer their types.
    col_types <- keep(col_types, ~ !is.null(.x))
    col_types <- schema(!!!col_types)
  }
  if (!is.null(col_types)) {
    assert_is(col_types, "Schema")
    # If any columns are null(), drop them
    # (by specifying the other columns in include_columns)
    # NOTE(review): when `col_names` is empty (names come from the file's
    # header), include_columns stays empty, so null() columns are not
    # dropped here.
    nulls <- map_lgl(col_types$fields, ~ .$type$Equals(null()))
    if (any(nulls)) {
      include_columns <- setdiff(col_names, names(col_types)[nulls])
    }
  }
  CsvConvertOptions$create(
    null_values = na,
    strings_can_be_null = quoted_na,
    col_types = col_types,
    timestamp_parsers = timestamp_parsers,
    include_columns = include_columns
  )
}
#' Write CSV file to disk
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream], or path in a file
#' system (`SubTreeFileSystem`)
#' @param include_header Whether to write an initial header line with column names
#' @param batch_size Maximum number of rows processed at a time. Default is 1024.
#'
#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream],
#' the stream will be left open.
#' @export
#' @examplesIf arrow_available()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write_csv_arrow(mtcars, tf)
#' @include arrow-package.R
write_csv_arrow <- function(x,
                            sink,
                            include_header = TRUE,
                            batch_size = 1024L) {
  # Build (and validate) the write options before touching the sink.
  write_options <- CsvWriteOptions$create(include_header, batch_size)

  # Hold on to the caller's object so it can be returned unchanged, even
  # when a data.frame is converted to a Table below.
  to_return <- x
  if (is.data.frame(x)) {
    x <- Table$create(x)
  }
  assert_that(is_writable_table(x))

  # Open paths/URIs here and close them on exit. An OutputStream supplied by
  # the caller is left open.
  if (!inherits(sink, "OutputStream")) {
    sink <- make_output_stream(sink)
    on.exit(sink$close())
  }

  if (inherits(x, "RecordBatch")) {
    csv___WriteCSV__RecordBatch(x, write_options, sink)
  } else if (inherits(x, "Table")) {
    csv___WriteCSV__Table(x, write_options, sink)
  }

  invisible(to_return)
}