blob: 1850eb255a4802e2feec440f845618608b29ed6c [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#' Read a CSV or other delimited file with Arrow
#'
#' These functions uses the Arrow C++ CSV reader to read into a `tibble`.
#' Arrow C++ options have been mapped to argument names that follow those of
#' `readr::read_delim()`, and `col_select` was inspired by `vroom::vroom()`.
#'
#' `read_csv_arrow()` and `read_tsv_arrow()` are wrappers around
#' `read_delim_arrow()` that specify a delimeter. `read_csv2_arrow()` uses `;` for
#' the delimeter and `,` for the decimal point.
#'
#' Note that not all `readr` options are currently implemented here. Please file
#' an issue if you encounter one that `arrow` should support.
#'
#' If you need to control Arrow-specific reader parameters that don't have an
#' equivalent in `readr::read_csv()`, you can either provide them in the
#' `parse_options`, `convert_options`, or `read_options` arguments, or you can
#' use [CsvTableReader] directly for lower-level access.
#'
#' @section Specifying column types and names:
#'
#' By default, the CSV reader will infer the column names and data types from the file, but there
#' are a few ways you can specify them directly.
#'
#' One way is to provide an Arrow [Schema] in the `schema` argument,
#' which is an ordered map of column name to type.
#' When provided, it satisfies both the `col_names` and `col_types` arguments.
#' This is good if you know all of this information up front.
#'
#' You can also pass a `Schema` to the `col_types` argument. If you do this,
#' column names will still be inferred from the file unless you also specify
#' `col_names`. In either case, the column names in the `Schema` must match the
#' data's column names, whether they are explicitly provided or inferred. That
#' said, this `Schema` does not have to reference all columns: those omitted
#' will have their types inferred.
#'
#' Alternatively, you can declare column types by providing the compact string representation
#' that `readr` uses to the `col_types` argument. This means you provide a
#' single string, one character per column, where the characters map to Arrow
#' types analogously to the `readr` type mapping:
#'
#' * "c": [utf8()]
#' * "i": [int32()]
#' * "n": [float64()]
#' * "d": [float64()]
#' * "l": [bool()]
#' * "f": [dictionary()]
#' * "D": [date32()]
#' * "T": [`timestamp(unit = "ns")`][timestamp()]
#' * "t": [time32()] (The `unit` arg is set to the default value `"ms"`)
#' * "_": [null()]
#' * "-": [null()]
#' * "?": infer the type from the data
#'
#' If you use the compact string representation for `col_types`, you must also
#' specify `col_names`.
#'
#' Regardless of how types are specified, all columns with a `null()` type will
#' be dropped.
#'
#' Note that if you are specifying column names, whether by `schema` or
#' `col_names`, and the CSV file has a header row that would otherwise be used
#' to idenfity column names, you'll need to add `skip = 1` to skip that row.
#'
#' @param file A character file name or URI, literal data (either a single string or a [raw] vector),
#' an Arrow input stream, or a `FileSystem` with path (`SubTreeFileSystem`).
#'
#' If a file name, a memory-mapped Arrow [InputStream] will be opened and
#' closed when finished; compression will be detected from the file extension
#' and handled automatically. If an input stream is provided, it will be left
#' open.
#'
#' To be recognised as literal data, the input must be wrapped with `I()`.
#' @param delim Single character used to separate fields within a record.
#' @param quote Single character used to quote strings.
#' @param escape_double Does the file escape quotes by doubling them?
#' i.e. If this option is `TRUE`, the value `""""` represents
#' a single quote, `\"`.
#' @param escape_backslash Does the file use backslashes to escape special
#' characters? This is more general than `escape_double` as backslashes
#' can be used to escape the delimiter character, the quote character, or
#' to add special characters like `\\n`.
#' @param schema [Schema] that describes the table. If provided, it will be
#' used to satisfy both `col_names` and `col_types`.
#' @param col_names If `TRUE`, the first row of the input will be used as the
#' column names and will not be included in the data frame. If `FALSE`, column
#' names will be generated by Arrow, starting with "f0", "f1", ..., "fN".
#' Alternatively, you can specify a character vector of column names.
#' @param col_types A compact string representation of the column types,
#' an Arrow [Schema], or `NULL` (the default) to infer types from the data.
#' @param col_select A character vector of column names to keep, as in the
#' "select" argument to `data.table::fread()`, or a
#' [tidy selection specification][tidyselect::eval_select()]
#' of columns, as used in `dplyr::select()`.
#' @param na A character vector of strings to interpret as missing values.
#' @param quoted_na Should missing values inside quotes be treated as missing
#' values (the default) or strings. (Note that this is different from the
#' the Arrow C++ default for the corresponding convert option,
#' `strings_can_be_null`.)
#' @param skip_empty_rows Should blank rows be ignored altogether? If
#' `TRUE`, blank rows will not be represented at all. If `FALSE`, they will be
#' filled with missings.
#' @param skip Number of lines to skip before reading data.
#' @param timestamp_parsers User-defined timestamp parsers. If more than one
#' parser is specified, the CSV conversion logic will try parsing values
#' starting from the beginning of this vector. Possible values are:
#' - `NULL`: the default, which uses the ISO-8601 parser
#' - a character vector of [strptime][base::strptime()] parse strings
#' - a list of [TimestampParser] objects
#' @param parse_options see [CSV parsing options][csv_parse_options()].
#' If given, this overrides any
#' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
#' @param convert_options see [CSV conversion options][csv_convert_options()]
#' @param read_options see [CSV reading options][csv_read_options()]
#' @param as_data_frame Should the function return a `tibble` (default) or
#' an Arrow [Table]?
#' @param decimal_point Character to use for decimal point in floating point numbers.
#'
#' @return A `tibble`, or a Table if `as_data_frame = FALSE`.
#' @export
#' @examples
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write.csv(mtcars, file = tf)
#' df <- read_csv_arrow(tf)
#' dim(df)
#' # Can select columns
#' df <- read_csv_arrow(tf, col_select = starts_with("d"))
#'
#' # Specifying column types and names
#' write.csv(data.frame(x = c(1, 3), y = c(2, 4)), file = tf, row.names = FALSE)
#' read_csv_arrow(tf, schema = schema(x = int32(), y = utf8()), skip = 1)
#' read_csv_arrow(tf, col_types = schema(y = utf8()))
#' read_csv_arrow(tf, col_types = "ic", col_names = c("x", "y"), skip = 1)
#'
#' # Note that if a timestamp column contains time zones,
#' # the string "T" `col_types` specification won't work.
#' # To parse timestamps with time zones, provide a [Schema] to `col_types`
#' # and specify the time zone in the type object:
#' tf <- tempfile()
#' write.csv(data.frame(x = "1970-01-01T12:00:00+12:00"), file = tf, row.names = FALSE)
#' read_csv_arrow(
#' tf,
#' col_types = schema(x = timestamp(unit = "us", timezone = "UTC"))
#' )
#'
#' # Read directly from strings with `I()`
#' read_csv_arrow(I("x,y\n1,2\n3,4"))
#' read_delim_arrow(I(c("x y", "1 2", "3 4")), delim = " ")
read_delim_arrow <- function(file,
delim = ",",
quote = '"',
escape_double = TRUE,
escape_backslash = FALSE,
schema = NULL,
col_names = TRUE,
col_types = NULL,
col_select = NULL,
na = c("", "NA"),
quoted_na = TRUE,
skip_empty_rows = TRUE,
skip = 0L,
parse_options = NULL,
convert_options = NULL,
read_options = NULL,
as_data_frame = TRUE,
timestamp_parsers = NULL,
decimal_point = ".") {
if (inherits(schema, "Schema")) {
col_names <- names(schema)
col_types <- schema
}
if (is.null(parse_options)) {
parse_options <- readr_to_csv_parse_options(
delim,
quote,
escape_double,
escape_backslash,
skip_empty_rows
)
}
if (is.null(read_options)) {
read_options <- readr_to_csv_read_options(skip, col_names)
}
if (is.null(convert_options)) {
convert_options <- readr_to_csv_convert_options(
na = na,
quoted_na = quoted_na,
decimal_point = decimal_point,
col_types = col_types,
col_names = read_options$column_names,
timestamp_parsers = timestamp_parsers
)
}
if (inherits(file, "AsIs")) {
if (is.raw(file)) {
# If a raw vector is wrapped by `I()`, we need to unclass the `AsIs` class to read the raw vector.
file <- unclass(file)
} else {
file <- charToRaw(paste(file, collapse = "\n"))
}
}
if (!inherits(file, "InputStream")) {
compression <- detect_compression(file)
file <- make_readable_file(file, random_access = FALSE)
if (compression != "uncompressed") {
# TODO: accept compression and compression_level as args
file <- CompressedInputStream$create(file, compression)
}
on.exit(file$close())
}
reader <- CsvTableReader$create(
file,
read_options = read_options,
parse_options = parse_options,
convert_options = convert_options
)
tryCatch(
tab <- reader$Read(),
# n = 4 because we want the error to show up as being from read_delim_arrow()
# and not augment_io_error_msg()
error = function(e, call = caller_env(n = 4)) {
augment_io_error_msg(e, call, schema = schema)
}
)
# TODO: move this into convert_options using include_columns
col_select <- enquo(col_select)
if (!quo_is_null(col_select)) {
sim_df <- as.data.frame(tab$schema)
tab <- tab[eval_select(col_select, sim_df)]
}
if (isTRUE(as_data_frame)) {
tab <- collect.ArrowTabular(tab)
}
tab
}
#' @rdname read_delim_arrow
#' @export
read_csv_arrow <- function(file,
quote = '"',
escape_double = TRUE,
escape_backslash = FALSE,
schema = NULL,
col_names = TRUE,
col_types = NULL,
col_select = NULL,
na = c("", "NA"),
quoted_na = TRUE,
skip_empty_rows = TRUE,
skip = 0L,
parse_options = NULL,
convert_options = NULL,
read_options = NULL,
as_data_frame = TRUE,
timestamp_parsers = NULL) {
mc <- match.call()
mc$delim <- ","
mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
eval.parent(mc)
}
#' @rdname read_delim_arrow
#' @export
read_csv2_arrow <- function(file,
quote = '"',
escape_double = TRUE,
escape_backslash = FALSE,
schema = NULL,
col_names = TRUE,
col_types = NULL,
col_select = NULL,
na = c("", "NA"),
quoted_na = TRUE,
skip_empty_rows = TRUE,
skip = 0L,
parse_options = NULL,
convert_options = NULL,
read_options = NULL,
as_data_frame = TRUE,
timestamp_parsers = NULL) {
mc <- match.call()
mc$delim <- ";"
mc$decimal_point <- ","
mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
eval.parent(mc)
}
#' @rdname read_delim_arrow
#' @export
read_tsv_arrow <- function(file,
quote = '"',
escape_double = TRUE,
escape_backslash = FALSE,
schema = NULL,
col_names = TRUE,
col_types = NULL,
col_select = NULL,
na = c("", "NA"),
quoted_na = TRUE,
skip_empty_rows = TRUE,
skip = 0L,
parse_options = NULL,
convert_options = NULL,
read_options = NULL,
as_data_frame = TRUE,
timestamp_parsers = NULL) {
mc <- match.call()
mc$delim <- "\t"
mc[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
eval.parent(mc)
}
#' @title Arrow CSV and JSON table reader classes
#' @rdname CsvTableReader
#' @name CsvTableReader
#' @docType class
#' @usage NULL
#' @format NULL
#' @description `CsvTableReader` and `JsonTableReader` wrap the Arrow C++ CSV
#' and JSON table readers. See their usage in [read_csv_arrow()] and
#' [read_json_arrow()], respectively.
#'
#' @section Factory:
#'
#' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods
#' take the following arguments:
#'
#' - `file` An Arrow [InputStream]
#' - `convert_options` (CSV only), `parse_options`, `read_options`: see
#' [CsvReadOptions]
#' - `...` additional parameters.
#'
#' @section Methods:
#'
#' - `$Read()`: returns an Arrow Table.
#'
#' @include arrow-object.R
#' @export
CsvTableReader <- R6Class("CsvTableReader",
inherit = ArrowObject,
public = list(
Read = function() csv___TableReader__Read(self)
)
)
CsvTableReader$create <- function(file,
read_options = csv_read_options(),
parse_options = csv_parse_options(),
convert_options = csv_convert_options(),
...) {
assert_is(file, "InputStream")
if (is.list(read_options)) {
read_options <- do.call(csv_read_options, read_options)
}
if (is.list(parse_options)) {
parse_options <- do.call(csv_parse_options, parse_options)
}
if (is.list(convert_options)) {
convert_options <- do.call(csv_convert_options, convert_options)
}
if (!(tolower(read_options$encoding) %in% c("utf-8", "utf8"))) {
file <- MakeReencodeInputStream(file, read_options$encoding)
}
csv___TableReader__Make(file, read_options, parse_options, convert_options)
}
#' CSV Reading Options
#'
#' @param use_threads Whether to use the global CPU thread pool
#' @param block_size Block size we request from the IO layer; also determines
#' the size of chunks when use_threads is `TRUE`.
#' @param skip_rows Number of lines to skip before reading data (default 0).
#' @param column_names Character vector to supply column names. If length-0
#' (the default), the first non-skipped row will be parsed to generate column
#' names, unless `autogenerate_column_names` is `TRUE`.
#' @param autogenerate_column_names Logical: generate column names instead of
#' using the first non-skipped row (the default)? If `TRUE`, column names will
#' be "f0", "f1", ..., "fN".
#' @param encoding The file encoding. (default `"UTF-8"`)
#' @param skip_rows_after_names Number of lines to skip after the column names (default 0).
#' This number can be larger than the number of rows in one block, and empty rows are counted.
#' The order of application is as follows:
#' - `skip_rows` is applied (if non-zero);
#' - column names are read (unless `column_names` is set);
#' - `skip_rows_after_names` is applied (if non-zero).
#'
#' @examplesIf arrow_with_dataset()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' writeLines("my file has a non-data header\nx\n1\n2", tf)
#' read_csv_arrow(tf, read_options = csv_read_options(skip_rows = 1))
#' open_csv_dataset(tf, read_options = csv_read_options(skip_rows = 1))
#' @export
csv_read_options <- function(use_threads = option_use_threads(),
block_size = 1048576L,
skip_rows = 0L,
column_names = character(0),
autogenerate_column_names = FALSE,
encoding = "UTF-8",
skip_rows_after_names = 0L) {
assert_that(is.string(encoding))
options <- csv___ReadOptions__initialize(
list(
use_threads = use_threads,
block_size = block_size,
skip_rows = skip_rows,
skip_rows_after_names = skip_rows_after_names,
column_names = column_names,
autogenerate_column_names = autogenerate_column_names
)
)
options$encoding <- encoding
options
}
#' @title File reader options
#' @rdname CsvReadOptions
#' @name CsvReadOptions
#' @docType class
#' @usage NULL
#' @format NULL
#' @description `CsvReadOptions`, `CsvParseOptions`, `CsvConvertOptions`,
#' `JsonReadOptions`, `JsonParseOptions`, and `TimestampParser` are containers for various
#' file reading options. See their usage in [read_csv_arrow()] and
#' [read_json_arrow()], respectively.
#'
#' @section Factory:
#'
#' The `CsvReadOptions$create()` and `JsonReadOptions$create()` factory methods
#' take the following arguments:
#'
#' - `use_threads` Whether to use the global CPU thread pool
#' - `block_size` Block size we request from the IO layer; also determines
#' the size of chunks when use_threads is `TRUE`. NB: if `FALSE`, JSON input
#' must end with an empty line.
#'
#' `CsvReadOptions$create()` further accepts these additional arguments:
#'
#' - `skip_rows` Number of lines to skip before reading data (default 0).
#' - `column_names` Character vector to supply column names. If length-0
#' (the default), the first non-skipped row will be parsed to generate column
#' names, unless `autogenerate_column_names` is `TRUE`.
#' - `autogenerate_column_names` Logical: generate column names instead of
#' using the first non-skipped row (the default)? If `TRUE`, column names will
#' be "f0", "f1", ..., "fN".
#' - `encoding` The file encoding. (default `"UTF-8"`)
#' - `skip_rows_after_names` Number of lines to skip after the column names (default 0).
#' This number can be larger than the number of rows in one block, and empty rows are counted.
#' The order of application is as follows:
#' - `skip_rows` is applied (if non-zero);
#' - column names are read (unless `column_names` is set);
#' - `skip_rows_after_names` is applied (if non-zero).
#'
#' `CsvParseOptions$create()` takes the following arguments:
#'
#' - `delimiter` Field delimiting character (default `","`)
#' - `quoting` Logical: are strings quoted? (default `TRUE`)
#' - `quote_char` Quoting character, if `quoting` is `TRUE` (default `'"'`)
#' - `double_quote` Logical: are quotes inside values double-quoted? (default `TRUE`)
#' - `escaping` Logical: whether escaping is used (default `FALSE`)
#' - `escape_char` Escaping character, if `escaping` is `TRUE` (default `"\\"`)
#' - `newlines_in_values` Logical: are values allowed to contain CR (`0x0d`)
#' and LF (`0x0a`) characters? (default `FALSE`)
#' - `ignore_empty_lines` Logical: should empty lines be ignored (default) or
#' generate a row of missing values (if `FALSE`)?
#'
#' `JsonParseOptions$create()` accepts only the `newlines_in_values` argument.
#'
#' `CsvConvertOptions$create()` takes the following arguments:
#'
#' - `check_utf8` Logical: check UTF8 validity of string columns? (default `TRUE`)
#' - `null_values` character vector of recognized spellings for null values.
#' Analogous to the `na.strings` argument to
#' [`read.csv()`][utils::read.csv()] or `na` in [readr::read_csv()].
#' - `strings_can_be_null` Logical: can string / binary columns have
#' null values? Similar to the `quoted_na` argument to [readr::read_csv()].
#' (default `FALSE`)
#' - `true_values` character vector of recognized spellings for `TRUE` values
#' - `false_values` character vector of recognized spellings for `FALSE` values
#' - `col_types` A `Schema` or `NULL` to infer types
#' - `auto_dict_encode` Logical: Whether to try to automatically
#' dictionary-encode string / binary data (think `stringsAsFactors`). Default `FALSE`.
#' This setting is ignored for non-inferred columns (those in `col_types`).
#' - `auto_dict_max_cardinality` If `auto_dict_encode`, string/binary columns
#' are dictionary-encoded up to this number of unique values (default 50),
#' after which it switches to regular encoding.
#' - `include_columns` If non-empty, indicates the names of columns from the
#' CSV file that should be actually read and converted (in the vector's order).
#' - `include_missing_columns` Logical: if `include_columns` is provided, should
#' columns named in it but not found in the data be included as a column of
#' type `null()`? The default (`FALSE`) means that the reader will instead
#' raise an error.
#' - `timestamp_parsers` User-defined timestamp parsers. If more than one
#' parser is specified, the CSV conversion logic will try parsing values
#' starting from the beginning of this vector. Possible values are
#' (a) `NULL`, the default, which uses the ISO-8601 parser;
#' (b) a character vector of [strptime][base::strptime()] parse strings; or
#' (c) a list of [TimestampParser] objects.
#' - `decimal_point` Character to use for decimal point in floating point numbers. Default: "."
#'
#' `TimestampParser$create()` takes an optional `format` string argument.
#' See [`strptime()`][base::strptime()] for example syntax.
#' The default is to use an ISO-8601 format parser.
#'
#' The `CsvWriteOptions$create()` factory method takes the following arguments:
#' - `include_header` Whether to write an initial header line with column names
#' - `batch_size` Maximum number of rows processed at a time. Default is 1024.
#' - `null_string` The string to be written for null values. Must not contain
#' quotation marks. Default is an empty string (`""`).
#' - `eol` The end of line character to use for ending rows.
#' - `delimiter` Field delimiter
#' - `quoting_style` Quoting style: "Needed" (Only enclose values in quotes which need them, because their CSV
#' rendering can contain quotes itself (e.g. strings or binary values)), "AllValid" (Enclose all valid values in
#' quotes), or "None" (Do not enclose any values in quotes).
#'
#' @section Active bindings:
#'
#' - `column_names`: from `CsvReadOptions`
#'
#' @export
CsvReadOptions <- R6Class("CsvReadOptions",
inherit = ArrowObject,
public = list(
encoding = NULL,
print = function(...) {
cat("CsvReadOptions\n")
for (attr in c(
"column_names", "block_size", "skip_rows", "autogenerate_column_names",
"use_threads", "skip_rows_after_names", "encoding"
)) {
cat(sprintf("%s: %s\n", attr, self[[attr]]))
}
invisible(self)
}
),
active = list(
column_names = function() csv___ReadOptions__column_names(self),
block_size = function() csv___ReadOptions__block_size(self),
skip_rows = function() csv___ReadOptions__skip_rows(self),
autogenerate_column_names = function() csv___ReadOptions__autogenerate_column_names(self),
use_threads = function() csv___ReadOptions__use_threads(self),
skip_rows_after_names = function() csv___ReadOptions__skip_rows_after_names(self)
)
)
CsvReadOptions$create <- csv_read_options
readr_to_csv_write_options <- function(col_names = TRUE,
batch_size = 1024L,
delim = ",",
na = "",
eol = "\n",
quote = c("needed", "all", "none")) {
quoting_style_arrow_opts <- c("Needed", "AllValid", "None")
quote <- match(match.arg(quote), c("needed", "all", "none"))
quote <- quoting_style_arrow_opts[quote]
csv_write_options(
include_header = col_names,
batch_size = batch_size,
delimiter = delim,
null_string = na,
eol = eol,
quoting_style = quote
)
}
#' CSV Writing Options
#'
#' @param include_header Whether to write an initial header line with column names
#' @param batch_size Maximum number of rows processed at a time.
#' @param null_string The string to be written for null values. Must not contain quotation marks.
#' @param delimiter Field delimiter
#' @param eol The end of line character to use for ending rows
#' @param quoting_style How to handle quotes. "Needed" (Only enclose values in quotes which need them, because their CSV
#' rendering can contain quotes itself (e.g. strings or binary values)), "AllValid" (Enclose all valid values in
#' quotes), or "None" (Do not enclose any values in quotes).
#'
#' @examples
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write_csv_arrow(airquality, tf, write_options = csv_write_options(null_string = "-99"))
#' @export
csv_write_options <- function(include_header = TRUE,
batch_size = 1024L,
null_string = "",
delimiter = ",",
eol = "\n",
quoting_style = c("Needed", "AllValid", "None")) {
quoting_style <- match.arg(quoting_style)
quoting_style_opts <- c("Needed", "AllValid", "None")
quoting_style <- match(quoting_style, quoting_style_opts) - 1L
assert_that(is.logical(include_header))
assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0)
assert_that(is.character(delimiter))
assert_that(is.character(null_string))
assert_that(!is.na(null_string))
assert_that(length(null_string) == 1)
assert_that(!grepl('"', null_string), msg = "na argument must not contain quote characters.")
assert_that(is.character(eol))
csv___WriteOptions__initialize(
list(
include_header = include_header,
batch_size = as.integer(batch_size),
delimiter = delimiter,
null_string = as.character(null_string),
eol = eol,
quoting_style = quoting_style
)
)
}
#' @rdname CsvReadOptions
#' @export
CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject)
CsvWriteOptions$create <- csv_write_options
readr_to_csv_read_options <- function(skip = 0, col_names = TRUE) {
if (isTRUE(col_names)) {
# C++ default to parse is 0-length string array
col_names <- character(0)
}
if (identical(col_names, FALSE)) {
csv_read_options(skip_rows = skip, autogenerate_column_names = TRUE)
} else {
csv_read_options(skip_rows = skip, column_names = col_names)
}
}
#' CSV Parsing Options
#'
#' @param delimiter Field delimiting character
#' @param quoting Logical: are strings quoted?
#' @param quote_char Quoting character, if `quoting` is `TRUE`
#' @param double_quote Logical: are quotes inside values double-quoted?
#' @param escaping Logical: whether escaping is used
#' @param escape_char Escaping character, if `escaping` is `TRUE`
#' @param newlines_in_values Logical: are values allowed to contain CR (`0x0d`)
#' and LF (`0x0a`) characters?
#' @param ignore_empty_lines Logical: should empty lines be ignored (default) or
#' generate a row of missing values (if `FALSE`)?
#' @examplesIf arrow_with_dataset()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' writeLines("x\n1\n\n2", tf)
#' read_csv_arrow(tf, parse_options = csv_parse_options(ignore_empty_lines = FALSE))
#' open_csv_dataset(tf, parse_options = csv_parse_options(ignore_empty_lines = FALSE))
#' @export
csv_parse_options <- function(delimiter = ",",
quoting = TRUE,
quote_char = '"',
double_quote = TRUE,
escaping = FALSE,
escape_char = "\\",
newlines_in_values = FALSE,
ignore_empty_lines = TRUE) {
csv___ParseOptions__initialize(
list(
delimiter = delimiter,
quoting = quoting,
quote_char = quote_char,
double_quote = double_quote,
escaping = escaping,
escape_char = escape_char,
newlines_in_values = newlines_in_values,
ignore_empty_lines = ignore_empty_lines
)
)
}
#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
CsvParseOptions <- R6Class("CsvParseOptions", inherit = ArrowObject)
CsvParseOptions$create <- csv_parse_options
readr_to_csv_parse_options <- function(delim = ",",
quote = '"',
escape_double = TRUE,
escape_backslash = FALSE,
skip_empty_rows = TRUE) {
# This function translates from the readr argument list to the arrow arg names
# TODO: validate inputs
csv_parse_options(
delimiter = delim,
quoting = nzchar(quote),
quote_char = quote,
double_quote = escape_double,
escaping = escape_backslash,
escape_char = "\\",
newlines_in_values = escape_backslash,
ignore_empty_lines = skip_empty_rows
)
}
#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
TimestampParser <- R6Class("TimestampParser",
inherit = ArrowObject,
public = list(
kind = function() TimestampParser__kind(self),
format = function() TimestampParser__format(self)
)
)
TimestampParser$create <- function(format = NULL) {
if (is.null(format)) {
TimestampParser__MakeISO8601()
} else {
TimestampParser__MakeStrptime(format)
}
}
#' CSV Convert Options
#'
#' @param check_utf8 Logical: check UTF8 validity of string columns?
#' @param null_values Character vector of recognized spellings for null values.
#' Analogous to the `na.strings` argument to
#' [`read.csv()`][utils::read.csv()] or `na` in [readr::read_csv()].
#' @param strings_can_be_null Logical: can string / binary columns have
#' null values? Similar to the `quoted_na` argument to [readr::read_csv()]
#' @param true_values Character vector of recognized spellings for `TRUE` values
#' @param false_values Character vector of recognized spellings for `FALSE` values
#' @param col_types A `Schema` or `NULL` to infer types
#' @param auto_dict_encode Logical: Whether to try to automatically
#' dictionary-encode string / binary data (think `stringsAsFactors`).
#' This setting is ignored for non-inferred columns (those in `col_types`).
#' @param auto_dict_max_cardinality If `auto_dict_encode`, string/binary columns
#' are dictionary-encoded up to this number of unique values (default 50),
#' after which it switches to regular encoding.
#' @param include_columns If non-empty, indicates the names of columns from the
#' CSV file that should be actually read and converted (in the vector's order).
#' @param include_missing_columns Logical: if `include_columns` is provided, should
#' columns named in it but not found in the data be included as a column of
#' type `null()`? The default (`FALSE`) means that the reader will instead
#' raise an error.
#' @param timestamp_parsers User-defined timestamp parsers. If more than one
#' parser is specified, the CSV conversion logic will try parsing values
#' starting from the beginning of this vector. Possible values are
#' (a) `NULL`, the default, which uses the ISO-8601 parser;
#' (b) a character vector of [strptime][base::strptime()] parse strings; or
#' (c) a list of [TimestampParser] objects.
#' @param decimal_point Character to use for decimal point in floating point numbers.
#'
#' @examplesIf arrow_with_dataset()
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' writeLines("x\n1\nNULL\n2\nNA", tf)
#' read_csv_arrow(tf, convert_options = csv_convert_options(null_values = c("", "NA", "NULL")))
#' open_csv_dataset(tf, convert_options = csv_convert_options(null_values = c("", "NA", "NULL")))
#' @export
csv_convert_options <- function(check_utf8 = TRUE,
null_values = c("", "NA"),
true_values = c("T", "true", "TRUE"),
false_values = c("F", "false", "FALSE"),
strings_can_be_null = FALSE,
col_types = NULL,
auto_dict_encode = FALSE,
auto_dict_max_cardinality = 50L,
include_columns = character(),
include_missing_columns = FALSE,
timestamp_parsers = NULL,
decimal_point = ".") {
if (!is.null(col_types) && !inherits(col_types, "Schema")) {
abort(c(
"Unsupported `col_types` specification.",
i = "`col_types` must be NULL, or a <Schema>."
))
}
csv___ConvertOptions__initialize(
list(
check_utf8 = check_utf8,
null_values = null_values,
strings_can_be_null = strings_can_be_null,
col_types = col_types,
true_values = true_values,
false_values = false_values,
auto_dict_encode = auto_dict_encode,
auto_dict_max_cardinality = auto_dict_max_cardinality,
include_columns = include_columns,
include_missing_columns = include_missing_columns,
timestamp_parsers = timestamp_parsers,
decimal_point = decimal_point
)
)
}
#' @rdname CsvReadOptions
#' @usage NULL
#' @format NULL
#' @docType class
#' @export
CsvConvertOptions <- R6Class("CsvConvertOptions", inherit = ArrowObject)
CsvConvertOptions$create <- csv_convert_options
readr_to_csv_convert_options <- function(na,
quoted_na,
decimal_point,
col_types = NULL,
col_names = NULL,
timestamp_parsers = NULL) {
include_columns <- character()
if (is.character(col_types)) {
if (length(col_types) != 1L) {
abort("`col_types` is a character vector that is not of size 1")
}
n <- nchar(col_types)
specs <- substring(col_types, seq_len(n), seq_len(n))
if (!is_bare_character(col_names, n)) {
abort("Compact specification for `col_types` requires `col_names`")
}
col_types <- set_names(nm = col_names, map2(specs, col_names, ~ {
switch(.x,
"c" = utf8(),
"i" = int32(),
"n" = float64(),
"d" = float64(),
"l" = bool(),
"f" = dictionary(),
"D" = date32(),
"T" = timestamp(unit = "ns"),
"t" = time32(),
"_" = null(),
"-" = null(),
"?" = NULL,
abort("Unsupported compact specification: '", .x, "' for column '", .y, "'")
)
}))
# To "guess" types, omit them from col_types
col_types <- keep(col_types, ~ !is.null(.x))
col_types <- schema(col_types)
}
if (!is.null(col_types)) {
assert_is(col_types, "Schema")
# If any columns are null(), drop them
# (by specifying the other columns in include_columns)
nulls <- map_lgl(col_types$fields, ~ .$type$Equals(null()))
if (any(nulls)) {
include_columns <- setdiff(col_names, names(col_types)[nulls])
}
}
csv_convert_options(
null_values = na,
strings_can_be_null = quoted_na,
col_types = col_types,
timestamp_parsers = timestamp_parsers,
include_columns = include_columns,
decimal_point = decimal_point
)
}
#' Write CSV file to disk
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream], or path in a file
#' system (`SubTreeFileSystem`)
#' @param file file name. Specify this or `sink`, not both.
#' @param include_header Whether to write an initial header line with column names
#' @param col_names identical to `include_header`. Specify this or
#' `include_headers`, not both.
#' @param batch_size Maximum number of rows processed at a time. Default is 1024.
#' @param na value to write for NA values. Must not contain quote marks. Default
#' is `""`.
#' @param write_options see [CSV write options][csv_write_options]
#' @param ... additional parameters
#'
#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream],
#' the stream will be left open.
#' @export
#' @examples
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write_csv_arrow(mtcars, tf)
#' @include arrow-object.R
write_csv_arrow <- function(x,
sink,
file = NULL,
include_header = TRUE,
col_names = NULL,
batch_size = 1024L,
na = "",
write_options = NULL,
...) {
unsupported_passed_args <- names(list(...))
if (length(unsupported_passed_args)) {
stop(
"The following ",
ngettext(length(unsupported_passed_args), "argument is ", "arguments are "),
"not yet supported in Arrow: ",
oxford_paste(unsupported_passed_args),
call. = FALSE
)
}
if (!missing(file) && !missing(sink)) {
stop(
"You have supplied both \"file\" and \"sink\" arguments. Please ",
"supply only one of them.",
call. = FALSE
)
}
if (missing(sink) && !missing(file)) {
sink <- file
}
if (!missing(col_names) && !missing(include_header)) {
stop(
"You have supplied both \"col_names\" and \"include_header\" ",
"arguments. Please supply only one of them.",
call. = FALSE
)
}
if (missing(include_header) && !missing(col_names)) {
include_header <- col_names
}
if (is.null(write_options)) {
write_options <- readr_to_csv_write_options(
col_names = include_header,
batch_size = batch_size,
na = na
)
}
x_out <- x
if (!inherits(x, "ArrowTabular")) {
tryCatch(
x <- as_record_batch_reader(x),
error = function(e) {
if (grepl("Input data frame columns must be named", conditionMessage(e))) {
abort(conditionMessage(e), parent = NA)
} else {
abort(
paste0(
"x must be an object of class 'data.frame', 'RecordBatch', ",
"'Dataset', 'Table', or 'RecordBatchReader' not '", class(x)[1], "'."
),
parent = NA
)
}
}
)
}
if (!inherits(sink, "OutputStream")) {
compression <- detect_compression(sink)
sink <- make_output_stream(sink)
if (compression != "uncompressed") {
# TODO: accept compression and compression_level as args
sink <- CompressedOutputStream$create(sink, codec = compression)
}
on.exit(sink$close())
}
if (inherits(x, "RecordBatch")) {
csv___WriteCSV__RecordBatch(x, write_options, sink)
} else if (inherits(x, "Table")) {
csv___WriteCSV__Table(x, write_options, sink)
} else if (inherits(x, c("RecordBatchReader"))) {
csv___WriteCSV__RecordBatchReader(x, write_options, sink)
}
invisible(x_out)
}