| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| #' Read a CSV or other delimited file with Arrow |
| #' |
| #' These functions use the Arrow C++ CSV reader to read into a `tibble`. |
| #' Arrow C++ options have been mapped to argument names that follow those of |
| #' `readr::read_delim()`, and `col_select` was inspired by `vroom::vroom()`. |
| #' |
| #' `read_csv_arrow()` and `read_tsv_arrow()` are wrappers around |
| #' `read_delim_arrow()` that specify a delimiter. `read_csv2_arrow()` uses `;` for |
| #' the delimiter and `,` for the decimal point. |
| #' |
| #' Note that not all `readr` options are currently implemented here. Please file |
| #' an issue if you encounter one that `arrow` should support. |
| #' |
| #' If you need to control Arrow-specific reader parameters that don't have an |
| #' equivalent in `readr::read_csv()`, you can either provide them in the |
| #' `parse_options`, `convert_options`, or `read_options` arguments, or you can |
| #' use [CsvTableReader] directly for lower-level access. |
| #' |
| #' @section Specifying column types and names: |
| #' |
| #' By default, the CSV reader will infer the column names and data types from the file, but there |
| #' are a few ways you can specify them directly. |
| #' |
| #' One way is to provide an Arrow [Schema] in the `schema` argument, |
| #' which is an ordered map of column name to type. |
| #' When provided, it satisfies both the `col_names` and `col_types` arguments. |
| #' This is good if you know all of this information up front. |
| #' |
| #' You can also pass a `Schema` to the `col_types` argument. If you do this, |
| #' column names will still be inferred from the file unless you also specify |
| #' `col_names`. In either case, the column names in the `Schema` must match the |
| #' data's column names, whether they are explicitly provided or inferred. That |
| #' said, this `Schema` does not have to reference all columns: those omitted |
| #' will have their types inferred. |
| #' |
| #' Alternatively, you can declare column types by providing the compact string representation |
| #' that `readr` uses to the `col_types` argument. This means you provide a |
| #' single string, one character per column, where the characters map to Arrow |
| #' types analogously to the `readr` type mapping: |
| #' |
| #' * "c": [utf8()] |
| #' * "i": [int32()] |
| #' * "n": [float64()] |
| #' * "d": [float64()] |
| #' * "l": [bool()] |
| #' * "f": [dictionary()] |
| #' * "D": [date32()] |
| #' * "T": [`timestamp(unit = "ns")`][timestamp()] |
| #' * "t": [time32()] (The `unit` arg is set to the default value `"ms"`) |
| #' * "_": [null()] |
| #' * "-": [null()] |
| #' * "?": infer the type from the data |
| #' |
| #' If you use the compact string representation for `col_types`, you must also |
| #' specify `col_names`. |
| #' |
| #' Regardless of how types are specified, all columns with a `null()` type will |
| #' be dropped. |
| #' |
| #' Note that if you are specifying column names, whether by `schema` or |
| #' `col_names`, and the CSV file has a header row that would otherwise be used |
| #' to identify column names, you'll need to add `skip = 1` to skip that row. |
| #' |
| #' @param file A character file name or URI, literal data (either a single string or a [raw] vector), |
| #' an Arrow input stream, or a `FileSystem` with path (`SubTreeFileSystem`). |
| #' |
| #' If a file name, a memory-mapped Arrow [InputStream] will be opened and |
| #' closed when finished; compression will be detected from the file extension |
| #' and handled automatically. If an input stream is provided, it will be left |
| #' open. |
| #' |
| #' To be recognised as literal data, the input must be wrapped with `I()`. |
| #' @param delim Single character used to separate fields within a record. |
| #' @param quote Single character used to quote strings. |
| #' @param escape_double Does the file escape quotes by doubling them? |
| #' i.e. If this option is `TRUE`, the value `""""` represents |
| #' a single quote, `\"`. |
| #' @param escape_backslash Does the file use backslashes to escape special |
| #' characters? This is more general than `escape_double` as backslashes |
| #' can be used to escape the delimiter character, the quote character, or |
| #' to add special characters like `\\n`. |
| #' @param schema [Schema] that describes the table. If provided, it will be |
| #' used to satisfy both `col_names` and `col_types`. |
| #' @param col_names If `TRUE`, the first row of the input will be used as the |
| #' column names and will not be included in the data frame. If `FALSE`, column |
| #' names will be generated by Arrow, starting with "f0", "f1", ..., "fN". |
| #' Alternatively, you can specify a character vector of column names. |
| #' @param col_types A compact string representation of the column types, |
| #' an Arrow [Schema], or `NULL` (the default) to infer types from the data. |
| #' @param col_select A character vector of column names to keep, as in the |
| #' "select" argument to `data.table::fread()`, or a |
| #' [tidy selection specification][tidyselect::eval_select()] |
| #' of columns, as used in `dplyr::select()`. |
| #' @param na A character vector of strings to interpret as missing values. |
| #' @param quoted_na Should missing values inside quotes be treated as missing |
| #' values (the default) or strings? (Note that this is different from the |
| #' Arrow C++ default for the corresponding convert option, |
| #' `strings_can_be_null`.) |
| #' @param skip_empty_rows Should blank rows be ignored altogether? If |
| #' `TRUE`, blank rows will not be represented at all. If `FALSE`, they will be |
| #' filled with missings. |
| #' @param skip Number of lines to skip before reading data. |
| #' @param timestamp_parsers User-defined timestamp parsers. If more than one |
| #' parser is specified, the CSV conversion logic will try parsing values |
| #' starting from the beginning of this vector. Possible values are: |
| #' - `NULL`: the default, which uses the ISO-8601 parser |
| #' - a character vector of [strptime][base::strptime()] parse strings |
| #' - a list of [TimestampParser] objects |
| #' @param parse_options see [CSV parsing options][csv_parse_options()]. |
| #' If given, this overrides any |
| #' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.). |
| #' @param convert_options see [CSV conversion options][csv_convert_options()] |
| #' @param read_options see [CSV reading options][csv_read_options()] |
| #' @param as_data_frame Should the function return a `tibble` (default) or |
| #' an Arrow [Table]? |
| #' @param decimal_point Character to use for decimal point in floating point numbers. |
| #' |
| #' @return A `tibble`, or a Table if `as_data_frame = FALSE`. |
| #' @export |
| #' @examples |
| #' tf <- tempfile() |
| #' on.exit(unlink(tf)) |
| #' write.csv(mtcars, file = tf) |
| #' df <- read_csv_arrow(tf) |
| #' dim(df) |
| #' # Can select columns |
| #' df <- read_csv_arrow(tf, col_select = starts_with("d")) |
| #' |
| #' # Specifying column types and names |
| #' write.csv(data.frame(x = c(1, 3), y = c(2, 4)), file = tf, row.names = FALSE) |
| #' read_csv_arrow(tf, schema = schema(x = int32(), y = utf8()), skip = 1) |
| #' read_csv_arrow(tf, col_types = schema(y = utf8())) |
| #' read_csv_arrow(tf, col_types = "ic", col_names = c("x", "y"), skip = 1) |
| #' |
| #' # Note that if a timestamp column contains time zones, |
| #' # the string "T" `col_types` specification won't work. |
| #' # To parse timestamps with time zones, provide a [Schema] to `col_types` |
| #' # and specify the time zone in the type object: |
| #' tf <- tempfile() |
| #' write.csv(data.frame(x = "1970-01-01T12:00:00+12:00"), file = tf, row.names = FALSE) |
| #' read_csv_arrow( |
| #' tf, |
| #' col_types = schema(x = timestamp(unit = "us", timezone = "UTC")) |
| #' ) |
| #' |
| #' # Read directly from strings with `I()` |
| #' read_csv_arrow(I("x,y\n1,2\n3,4")) |
| #' read_delim_arrow(I(c("x y", "1 2", "3 4")), delim = " ") |
read_delim_arrow <- function(file,
                             delim = ",",
                             quote = '"',
                             escape_double = TRUE,
                             escape_backslash = FALSE,
                             schema = NULL,
                             col_names = TRUE,
                             col_types = NULL,
                             col_select = NULL,
                             na = c("", "NA"),
                             quoted_na = TRUE,
                             skip_empty_rows = TRUE,
                             skip = 0L,
                             parse_options = NULL,
                             convert_options = NULL,
                             read_options = NULL,
                             as_data_frame = TRUE,
                             timestamp_parsers = NULL,
                             decimal_point = ".") {
  # A Schema given as `schema` supplies both the column names and the types.
  if (inherits(schema, "Schema")) {
    col_names <- names(schema)
    col_types <- schema
  }

  # The readr-style arguments are only translated into Arrow option objects
  # when the corresponding `*_options` argument was not supplied.
  if (is.null(parse_options)) {
    parse_options <- readr_to_csv_parse_options(
      delim,
      quote,
      escape_double,
      escape_backslash,
      skip_empty_rows
    )
  }
  if (is.null(read_options)) {
    read_options <- readr_to_csv_read_options(skip, col_names)
  }
  if (is.null(convert_options)) {
    # Column names are taken from `read_options` so that names supplied through
    # a user-provided `read_options` are used here as well.
    convert_options <- readr_to_csv_convert_options(
      na = na,
      quoted_na = quoted_na,
      decimal_point = decimal_point,
      col_types = col_types,
      col_names = read_options$column_names,
      timestamp_parsers = timestamp_parsers
    )
  }

  # Literal data wrapped in `I()` is turned into a raw vector.
  if (inherits(file, "AsIs")) {
    if (is.raw(file)) {
      # If a raw vector is wrapped by `I()`, we need to unclass the `AsIs` class to read the raw vector.
      file <- unclass(file)
    } else {
      # Character input: each element is one line of the literal data.
      file <- charToRaw(paste(file, collapse = "\n"))
    }
  }

  # Anything that is not already an InputStream is opened here and therefore
  # also closed here; streams passed in by the caller are left open.
  if (!inherits(file, "InputStream")) {
    compression <- detect_compression(file)
    file <- make_readable_file(file, random_access = FALSE)
    # Register the cleanup as soon as the stream exists, so that it is also
    # closed when wrapping it in a CompressedInputStream fails. `file` is
    # looked up when the handler runs, so on success it is the (outer)
    # compressed stream that gets closed, exactly as before.
    on.exit(file$close(), add = TRUE)
    if (compression != "uncompressed") {
      # TODO: accept compression and compression_level as args
      file <- CompressedInputStream$create(file, compression)
    }
  }

  reader <- CsvTableReader$create(
    file,
    read_options = read_options,
    parse_options = parse_options,
    convert_options = convert_options
  )

  tryCatch(
    tab <- reader$Read(),
    # n = 4 because we want the error to show up as being from read_delim_arrow()
    # and not augment_io_error_msg()
    error = function(e, call = caller_env(n = 4)) {
      augment_io_error_msg(e, call, schema = schema)
    }
  )

  # TODO: move this into convert_options using include_columns
  # `col_select` is captured unevaluated so tidyselect helpers can be resolved
  # against the columns of the table that was just read.
  col_select <- enquo(col_select)
  if (!quo_is_null(col_select)) {
    sim_df <- as.data.frame(tab$schema)
    tab <- tab[eval_select(col_select, sim_df)]
  }

  if (isTRUE(as_data_frame)) {
    tab <- collect.ArrowTabular(tab)
  }

  tab
}
| |
| #' @rdname read_delim_arrow |
| #' @export |
read_csv_arrow <- function(file,
                           quote = '"',
                           escape_double = TRUE,
                           escape_backslash = FALSE,
                           schema = NULL,
                           col_names = TRUE,
                           col_types = NULL,
                           col_select = NULL,
                           na = c("", "NA"),
                           quoted_na = TRUE,
                           skip_empty_rows = TRUE,
                           skip = 0L,
                           parse_options = NULL,
                           convert_options = NULL,
                           read_options = NULL,
                           as_data_frame = TRUE,
                           timestamp_parsers = NULL) {
  # Forward the call as the user wrote it (arguments still unevaluated) to
  # read_delim_arrow(), with the delimiter fixed to a comma.
  delim_call <- match.call()
  delim_call[["delim"]] <- ","
  delim_call[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
  # Evaluate in the caller's frame so the forwarded arguments resolve there.
  eval.parent(delim_call)
}
| |
| #' @rdname read_delim_arrow |
| #' @export |
read_csv2_arrow <- function(file,
                            quote = '"',
                            escape_double = TRUE,
                            escape_backslash = FALSE,
                            schema = NULL,
                            col_names = TRUE,
                            col_types = NULL,
                            col_select = NULL,
                            na = c("", "NA"),
                            quoted_na = TRUE,
                            skip_empty_rows = TRUE,
                            skip = 0L,
                            parse_options = NULL,
                            convert_options = NULL,
                            read_options = NULL,
                            as_data_frame = TRUE,
                            timestamp_parsers = NULL) {
  # Forward the call as the user wrote it (arguments still unevaluated) to
  # read_delim_arrow(), using `;` as the delimiter and `,` as decimal point.
  delim_call <- match.call()
  delim_call[["delim"]] <- ";"
  delim_call[["decimal_point"]] <- ","
  delim_call[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
  # Evaluate in the caller's frame so the forwarded arguments resolve there.
  eval.parent(delim_call)
}
| |
| #' @rdname read_delim_arrow |
| #' @export |
read_tsv_arrow <- function(file,
                           quote = '"',
                           escape_double = TRUE,
                           escape_backslash = FALSE,
                           schema = NULL,
                           col_names = TRUE,
                           col_types = NULL,
                           col_select = NULL,
                           na = c("", "NA"),
                           quoted_na = TRUE,
                           skip_empty_rows = TRUE,
                           skip = 0L,
                           parse_options = NULL,
                           convert_options = NULL,
                           read_options = NULL,
                           as_data_frame = TRUE,
                           timestamp_parsers = NULL) {
  # Forward the call as the user wrote it (arguments still unevaluated) to
  # read_delim_arrow(), with the delimiter fixed to a tab.
  delim_call <- match.call()
  delim_call[["delim"]] <- "\t"
  delim_call[[1]] <- get("read_delim_arrow", envir = asNamespace("arrow"))
  # Evaluate in the caller's frame so the forwarded arguments resolve there.
  eval.parent(delim_call)
}
| |
| #' @title Arrow CSV and JSON table reader classes |
| #' @rdname CsvTableReader |
| #' @name CsvTableReader |
| #' @docType class |
| #' @usage NULL |
| #' @format NULL |
| #' @description `CsvTableReader` and `JsonTableReader` wrap the Arrow C++ CSV |
| #' and JSON table readers. See their usage in [read_csv_arrow()] and |
| #' [read_json_arrow()], respectively. |
| #' |
| #' @section Factory: |
| #' |
| #' The `CsvTableReader$create()` and `JsonTableReader$create()` factory methods |
| #' take the following arguments: |
| #' |
| #' - `file` An Arrow [InputStream] |
| #' - `convert_options` (CSV only), `parse_options`, `read_options`: see |
| #' [CsvReadOptions] |
| #' - `...` additional parameters. |
| #' |
| #' @section Methods: |
| #' |
| #' - `$Read()`: returns an Arrow Table. |
| #' |
| #' @include arrow-object.R |
| #' @export |
CsvTableReader <- R6Class(
  "CsvTableReader",
  inherit = ArrowObject,
  public = list(
    # Read the input through the C++ table reader; returns an Arrow Table.
    Read = function() {
      csv___TableReader__Read(self)
    }
  )
)
CsvTableReader$create <- function(file,
                                  read_options = csv_read_options(),
                                  parse_options = csv_parse_options(),
                                  convert_options = csv_convert_options(),
                                  ...) {
  assert_is(file, "InputStream")

  # Each options argument may be given as a plain list of arguments, in which
  # case it is expanded through the matching csv_*_options() constructor.
  if (is.list(read_options)) {
    read_options <- do.call(csv_read_options, read_options)
  }

  if (is.list(parse_options)) {
    parse_options <- do.call(csv_parse_options, parse_options)
  }

  if (is.list(convert_options)) {
    convert_options <- do.call(csv_convert_options, convert_options)
  }

  # `encoding` is an R-level field of CsvReadOptions whose class default is
  # NULL; csv_read_options() fills it in. Treat an unset encoding as UTF-8
  # instead of failing on a zero-length `if` condition.
  encoding <- read_options$encoding
  needs_reencoding <- !is.null(encoding) &&
    !(tolower(encoding) %in% c("utf-8", "utf8"))
  if (needs_reencoding) {
    # Wrap the stream so the reader receives re-encoded input.
    file <- MakeReencodeInputStream(file, encoding)
  }

  csv___TableReader__Make(file, read_options, parse_options, convert_options)
}
| |
| #' CSV Reading Options |
| #' |
| #' @param use_threads Whether to use the global CPU thread pool |
| #' @param block_size Block size we request from the IO layer; also determines |
| #' the size of chunks when use_threads is `TRUE`. |
| #' @param skip_rows Number of lines to skip before reading data (default 0). |
| #' @param column_names Character vector to supply column names. If length-0 |
| #' (the default), the first non-skipped row will be parsed to generate column |
| #' names, unless `autogenerate_column_names` is `TRUE`. |
| #' @param autogenerate_column_names Logical: generate column names instead of |
| #' using the first non-skipped row (the default)? If `TRUE`, column names will |
| #' be "f0", "f1", ..., "fN". |
| #' @param encoding The file encoding. (default `"UTF-8"`) |
| #' @param skip_rows_after_names Number of lines to skip after the column names (default 0). |
| #' This number can be larger than the number of rows in one block, and empty rows are counted. |
| #' The order of application is as follows: |
| #' - `skip_rows` is applied (if non-zero); |
| #' - column names are read (unless `column_names` is set); |
| #' - `skip_rows_after_names` is applied (if non-zero). |
| #' |
| #' @examplesIf arrow_with_dataset() |
| #' tf <- tempfile() |
| #' on.exit(unlink(tf)) |
| #' writeLines("my file has a non-data header\nx\n1\n2", tf) |
| #' read_csv_arrow(tf, read_options = csv_read_options(skip_rows = 1)) |
| #' open_csv_dataset(tf, read_options = csv_read_options(skip_rows = 1)) |
| #' @export |
csv_read_options <- function(use_threads = option_use_threads(),
                             block_size = 1048576L,
                             skip_rows = 0L,
                             column_names = character(0),
                             autogenerate_column_names = FALSE,
                             encoding = "UTF-8",
                             skip_rows_after_names = 0L) {
  assert_that(is.string(encoding))

  # Everything except `encoding` is handed to the C++ initializer.
  cpp_args <- list(
    use_threads = use_threads,
    block_size = block_size,
    skip_rows = skip_rows,
    skip_rows_after_names = skip_rows_after_names,
    column_names = column_names,
    autogenerate_column_names = autogenerate_column_names
  )
  read_opts <- csv___ReadOptions__initialize(cpp_args)

  # The encoding is stored on the R object afterwards.
  read_opts$encoding <- encoding

  read_opts
}
| |
| #' @title File reader options |
| #' @rdname CsvReadOptions |
| #' @name CsvReadOptions |
| #' @docType class |
| #' @usage NULL |
| #' @format NULL |
| #' @description `CsvReadOptions`, `CsvParseOptions`, `CsvConvertOptions`, |
| #' `JsonReadOptions`, `JsonParseOptions`, and `TimestampParser` are containers for various |
| #' file reading options. See their usage in [read_csv_arrow()] and |
| #' [read_json_arrow()], respectively. |
| #' |
| #' @section Factory: |
| #' |
| #' The `CsvReadOptions$create()` and `JsonReadOptions$create()` factory methods |
| #' take the following arguments: |
| #' |
| #' - `use_threads` Whether to use the global CPU thread pool |
| #' - `block_size` Block size we request from the IO layer; also determines |
| #' the size of chunks when use_threads is `TRUE`. NB: if `FALSE`, JSON input |
| #' must end with an empty line. |
| #' |
| #' `CsvReadOptions$create()` further accepts these additional arguments: |
| #' |
| #' - `skip_rows` Number of lines to skip before reading data (default 0). |
| #' - `column_names` Character vector to supply column names. If length-0 |
| #' (the default), the first non-skipped row will be parsed to generate column |
| #' names, unless `autogenerate_column_names` is `TRUE`. |
| #' - `autogenerate_column_names` Logical: generate column names instead of |
| #' using the first non-skipped row (the default)? If `TRUE`, column names will |
| #' be "f0", "f1", ..., "fN". |
| #' - `encoding` The file encoding. (default `"UTF-8"`) |
| #' - `skip_rows_after_names` Number of lines to skip after the column names (default 0). |
| #' This number can be larger than the number of rows in one block, and empty rows are counted. |
| #' The order of application is as follows: |
| #' - `skip_rows` is applied (if non-zero); |
| #' - column names are read (unless `column_names` is set); |
| #' - `skip_rows_after_names` is applied (if non-zero). |
| #' |
| #' `CsvParseOptions$create()` takes the following arguments: |
| #' |
| #' - `delimiter` Field delimiting character (default `","`) |
| #' - `quoting` Logical: are strings quoted? (default `TRUE`) |
| #' - `quote_char` Quoting character, if `quoting` is `TRUE` (default `'"'`) |
| #' - `double_quote` Logical: are quotes inside values double-quoted? (default `TRUE`) |
| #' - `escaping` Logical: whether escaping is used (default `FALSE`) |
| #' - `escape_char` Escaping character, if `escaping` is `TRUE` (default `"\\"`) |
| #' - `newlines_in_values` Logical: are values allowed to contain CR (`0x0d`) |
| #' and LF (`0x0a`) characters? (default `FALSE`) |
| #' - `ignore_empty_lines` Logical: should empty lines be ignored (default) or |
| #' generate a row of missing values (if `FALSE`)? |
| #' |
| #' `JsonParseOptions$create()` accepts only the `newlines_in_values` argument. |
| #' |
| #' `CsvConvertOptions$create()` takes the following arguments: |
| #' |
| #' - `check_utf8` Logical: check UTF8 validity of string columns? (default `TRUE`) |
| #' - `null_values` character vector of recognized spellings for null values. |
| #' Analogous to the `na.strings` argument to |
| #' [`read.csv()`][utils::read.csv()] or `na` in [readr::read_csv()]. |
| #' - `strings_can_be_null` Logical: can string / binary columns have |
| #' null values? Similar to the `quoted_na` argument to [readr::read_csv()]. |
| #' (default `FALSE`) |
| #' - `true_values` character vector of recognized spellings for `TRUE` values |
| #' - `false_values` character vector of recognized spellings for `FALSE` values |
| #' - `col_types` A `Schema` or `NULL` to infer types |
| #' - `auto_dict_encode` Logical: Whether to try to automatically |
| #' dictionary-encode string / binary data (think `stringsAsFactors`). Default `FALSE`. |
| #' This setting is ignored for non-inferred columns (those in `col_types`). |
| #' - `auto_dict_max_cardinality` If `auto_dict_encode`, string/binary columns |
| #' are dictionary-encoded up to this number of unique values (default 50), |
| #' after which it switches to regular encoding. |
| #' - `include_columns` If non-empty, indicates the names of columns from the |
| #' CSV file that should be actually read and converted (in the vector's order). |
| #' - `include_missing_columns` Logical: if `include_columns` is provided, should |
| #' columns named in it but not found in the data be included as a column of |
| #' type `null()`? The default (`FALSE`) means that the reader will instead |
| #' raise an error. |
| #' - `timestamp_parsers` User-defined timestamp parsers. If more than one |
| #' parser is specified, the CSV conversion logic will try parsing values |
| #' starting from the beginning of this vector. Possible values are |
| #' (a) `NULL`, the default, which uses the ISO-8601 parser; |
| #' (b) a character vector of [strptime][base::strptime()] parse strings; or |
| #' (c) a list of [TimestampParser] objects. |
| #' - `decimal_point` Character to use for decimal point in floating point numbers. Default: "." |
| #' |
| #' `TimestampParser$create()` takes an optional `format` string argument. |
| #' See [`strptime()`][base::strptime()] for example syntax. |
| #' The default is to use an ISO-8601 format parser. |
| #' |
| #' The `CsvWriteOptions$create()` factory method takes the following arguments: |
| #' - `include_header` Whether to write an initial header line with column names |
| #' - `batch_size` Maximum number of rows processed at a time. Default is 1024. |
| #' - `null_string` The string to be written for null values. Must not contain |
| #' quotation marks. Default is an empty string (`""`). |
| #' - `eol` The end of line character to use for ending rows. |
| #' - `delimiter` Field delimiter |
| #' - `quoting_style` Quoting style: "Needed" (Only enclose values in quotes which need them, because their CSV |
| #' rendering can contain quotes itself (e.g. strings or binary values)), "AllValid" (Enclose all valid values in |
| #' quotes), or "None" (Do not enclose any values in quotes). |
| #' |
| #' @section Active bindings: |
| #' |
| #' - `column_names`: from `CsvReadOptions` |
| #' |
| #' @export |
CsvReadOptions <- R6Class("CsvReadOptions",
  inherit = ArrowObject,
  public = list(
    # File encoding; NULL until set (csv_read_options() assigns it).
    encoding = NULL,
    # Print the class name followed by one "name: value" line per option.
    # Returns the object invisibly.
    print = function(...) {
      cat("CsvReadOptions\n")
      option_names <- c(
        "column_names", "block_size", "skip_rows", "autogenerate_column_names",
        "use_threads", "skip_rows_after_names", "encoding"
      )
      for (attr in option_names) {
        value <- self[[attr]]
        # Zero-length values (e.g. the default empty `column_names`, or an
        # unset `encoding`) produce no line, as before.
        if (length(value) == 0) {
          next
        }
        # sprintf() is vectorised: passing several column names directly would
        # emit one "column_names: x" line per name, joined by cat()'s space
        # separator. Collapse multi-valued options onto a single line instead.
        cat(sprintf("%s: %s\n", attr, paste(value, collapse = ", ")))
      }
      invisible(self)
    }
  ),
  # Read-only views onto the values held by the underlying C++ object.
  active = list(
    column_names = function() csv___ReadOptions__column_names(self),
    block_size = function() csv___ReadOptions__block_size(self),
    skip_rows = function() csv___ReadOptions__skip_rows(self),
    autogenerate_column_names = function() csv___ReadOptions__autogenerate_column_names(self),
    use_threads = function() csv___ReadOptions__use_threads(self),
    skip_rows_after_names = function() csv___ReadOptions__skip_rows_after_names(self)
  )
)

# Factory: CsvReadOptions$create() is the same function as csv_read_options().
CsvReadOptions$create <- csv_read_options
| |
readr_to_csv_write_options <- function(col_names = TRUE,
                                       batch_size = 1024L,
                                       delim = ",",
                                       na = "",
                                       eol = "\n",
                                       quote = c("needed", "all", "none")) {
  # Translate the readr-style `quote` choice into Arrow's quoting style name.
  readr_style <- match.arg(quote)
  arrow_style <- switch(readr_style,
    needed = "Needed",
    all = "AllValid",
    none = "None"
  )

  # The remaining arguments are passed on under their Arrow names.
  csv_write_options(
    include_header = col_names,
    batch_size = batch_size,
    delimiter = delim,
    null_string = na,
    eol = eol,
    quoting_style = arrow_style
  )
}
| |
| #' CSV Writing Options |
| #' |
| #' @param include_header Whether to write an initial header line with column names |
| #' @param batch_size Maximum number of rows processed at a time. |
| #' @param null_string The string to be written for null values. Must not contain quotation marks. |
| #' @param delimiter Field delimiter |
| #' @param eol The end of line character to use for ending rows |
| #' @param quoting_style How to handle quotes. "Needed" (Only enclose values in quotes which need them, because their CSV |
| #' rendering can contain quotes itself (e.g. strings or binary values)), "AllValid" (Enclose all valid values in |
| #' quotes), or "None" (Do not enclose any values in quotes). |
| #' |
| #' @examples |
| #' tf <- tempfile() |
| #' on.exit(unlink(tf)) |
| #' write_csv_arrow(airquality, tf, write_options = csv_write_options(null_string = "-99")) |
| #' @export |
csv_write_options <- function(include_header = TRUE,
                              batch_size = 1024L,
                              null_string = "",
                              delimiter = ",",
                              eol = "\n",
                              quoting_style = c("Needed", "AllValid", "None")) {
  # Resolve the quoting style to its zero-based position among the known
  # styles; that integer is what gets handed to the C++ initializer.
  chosen_style <- match.arg(quoting_style)
  style_index <- match(chosen_style, c("Needed", "AllValid", "None")) - 1L

  # Argument validation, in the same order as the individual checks fail.
  assert_that(is.logical(include_header))
  assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0)
  assert_that(is.character(delimiter))
  assert_that(
    is.character(null_string),
    !is.na(null_string),
    length(null_string) == 1
  )
  assert_that(!grepl('"', null_string), msg = "na argument must not contain quote characters.")
  assert_that(is.character(eol))

  write_args <- list(
    include_header = include_header,
    batch_size = as.integer(batch_size),
    delimiter = delimiter,
    null_string = as.character(null_string),
    eol = eol,
    quoting_style = style_index
  )
  csv___WriteOptions__initialize(write_args)
}
| |
| #' @rdname CsvReadOptions |
| #' @export |
CsvWriteOptions <- R6Class(
  "CsvWriteOptions",
  inherit = ArrowObject
)
# Factory: CsvWriteOptions$create() is the same function as csv_write_options().
CsvWriteOptions$create <- csv_write_options
| |
readr_to_csv_read_options <- function(skip = 0, col_names = TRUE) {
  # `col_names = FALSE`: ask Arrow to generate the column names.
  if (identical(col_names, FALSE)) {
    return(csv_read_options(skip_rows = skip, autogenerate_column_names = TRUE))
  }

  # `col_names = TRUE` maps to a 0-length character vector, which is the C++
  # default meaning "parse the names from the file". Anything else is passed
  # through as the explicit column names.
  header_names <- if (isTRUE(col_names)) character(0) else col_names
  csv_read_options(skip_rows = skip, column_names = header_names)
}
| |
| #' CSV Parsing Options |
| #' |
| #' @param delimiter Field delimiting character |
| #' @param quoting Logical: are strings quoted? |
| #' @param quote_char Quoting character, if `quoting` is `TRUE` |
| #' @param double_quote Logical: are quotes inside values double-quoted? |
| #' @param escaping Logical: whether escaping is used |
| #' @param escape_char Escaping character, if `escaping` is `TRUE` |
| #' @param newlines_in_values Logical: are values allowed to contain CR (`0x0d`) |
| #' and LF (`0x0a`) characters? |
| #' @param ignore_empty_lines Logical: should empty lines be ignored (default) or |
| #' generate a row of missing values (if `FALSE`)? |
| #' @examplesIf arrow_with_dataset() |
| #' tf <- tempfile() |
| #' on.exit(unlink(tf)) |
| #' writeLines("x\n1\n\n2", tf) |
| #' read_csv_arrow(tf, parse_options = csv_parse_options(ignore_empty_lines = FALSE)) |
| #' open_csv_dataset(tf, parse_options = csv_parse_options(ignore_empty_lines = FALSE)) |
| #' @export |
csv_parse_options <- function(delimiter = ",",
                              quoting = TRUE,
                              quote_char = '"',
                              double_quote = TRUE,
                              escaping = FALSE,
                              escape_char = "\\",
                              newlines_in_values = FALSE,
                              ignore_empty_lines = TRUE) {
  # Bundle the arguments, unchanged, into the named list that the C++
  # initializer receives.
  parse_args <- list(
    delimiter = delimiter,
    quoting = quoting,
    quote_char = quote_char,
    double_quote = double_quote,
    escaping = escaping,
    escape_char = escape_char,
    newlines_in_values = newlines_in_values,
    ignore_empty_lines = ignore_empty_lines
  )
  csv___ParseOptions__initialize(parse_args)
}
| |
| #' @rdname CsvReadOptions |
| #' @usage NULL |
| #' @format NULL |
| #' @docType class |
| #' @export |
CsvParseOptions <- R6Class(
  "CsvParseOptions",
  inherit = ArrowObject
)
# Factory: CsvParseOptions$create() is the same function as csv_parse_options().
CsvParseOptions$create <- csv_parse_options
| |
readr_to_csv_parse_options <- function(delim = ",",
                                       quote = '"',
                                       escape_double = TRUE,
                                       escape_backslash = FALSE,
                                       skip_empty_rows = TRUE) {
  # Maps the readr argument names onto the Arrow parse option names.
  # TODO: validate inputs

  # An empty `quote` string switches quoting off.
  quoting_enabled <- nzchar(quote)

  csv_parse_options(
    delimiter = delim,
    quoting = quoting_enabled,
    quote_char = quote,
    double_quote = escape_double,
    # `escape_backslash` drives both `escaping` and `newlines_in_values`;
    # the escape character itself is always a backslash.
    escaping = escape_backslash,
    escape_char = "\\",
    newlines_in_values = escape_backslash,
    ignore_empty_lines = skip_empty_rows
  )
}
| |
| #' @rdname CsvReadOptions |
| #' @usage NULL |
| #' @format NULL |
| #' @docType class |
| #' @export |
TimestampParser <- R6Class(
  "TimestampParser",
  inherit = ArrowObject,
  public = list(
    # Both accessors delegate to the C++ bindings of the same name.
    kind = function() {
      TimestampParser__kind(self)
    },
    format = function() {
      TimestampParser__format(self)
    }
  )
)
TimestampParser$create <- function(format = NULL) {
  # A format string selects a strptime-based parser ...
  if (!is.null(format)) {
    return(TimestampParser__MakeStrptime(format))
  }
  # ... otherwise fall back to the ISO-8601 parser.
  TimestampParser__MakeISO8601()
}
| |
| |
| #' CSV Convert Options |
| #' |
| #' @param check_utf8 Logical: check UTF8 validity of string columns? |
| #' @param null_values Character vector of recognized spellings for null values. |
| #' Analogous to the `na.strings` argument to |
| #' [`read.csv()`][utils::read.csv()] or `na` in [readr::read_csv()]. |
| #' @param strings_can_be_null Logical: can string / binary columns have |
| #' null values? Similar to the `quoted_na` argument to [readr::read_csv()] |
| #' @param true_values Character vector of recognized spellings for `TRUE` values |
| #' @param false_values Character vector of recognized spellings for `FALSE` values |
| #' @param col_types A `Schema` or `NULL` to infer types |
| #' @param auto_dict_encode Logical: Whether to try to automatically |
| #' dictionary-encode string / binary data (think `stringsAsFactors`). |
| #' This setting is ignored for non-inferred columns (those in `col_types`). |
| #' @param auto_dict_max_cardinality If `auto_dict_encode`, string/binary columns |
| #' are dictionary-encoded up to this number of unique values (default 50), |
| #' after which it switches to regular encoding. |
| #' @param include_columns If non-empty, indicates the names of columns from the |
| #' CSV file that should be actually read and converted (in the vector's order). |
| #' @param include_missing_columns Logical: if `include_columns` is provided, should |
| #' columns named in it but not found in the data be included as a column of |
| #' type `null()`? The default (`FALSE`) means that the reader will instead |
| #' raise an error. |
| #' @param timestamp_parsers User-defined timestamp parsers. If more than one |
| #' parser is specified, the CSV conversion logic will try parsing values |
| #' starting from the beginning of this vector. Possible values are |
| #' (a) `NULL`, the default, which uses the ISO-8601 parser; |
| #' (b) a character vector of [strptime][base::strptime()] parse strings; or |
| #' (c) a list of [TimestampParser] objects. |
| #' @param decimal_point Character to use for decimal point in floating point numbers. |
| #' |
| #' @examplesIf arrow_with_dataset() |
| #' tf <- tempfile() |
| #' on.exit(unlink(tf)) |
| #' writeLines("x\n1\nNULL\n2\nNA", tf) |
| #' read_csv_arrow(tf, convert_options = csv_convert_options(null_values = c("", "NA", "NULL"))) |
| #' open_csv_dataset(tf, convert_options = csv_convert_options(null_values = c("", "NA", "NULL"))) |
| #' @export |
csv_convert_options <- function(check_utf8 = TRUE,
                                null_values = c("", "NA"),
                                true_values = c("T", "true", "TRUE"),
                                false_values = c("F", "false", "FALSE"),
                                strings_can_be_null = FALSE,
                                col_types = NULL,
                                auto_dict_encode = FALSE,
                                auto_dict_max_cardinality = 50L,
                                include_columns = character(),
                                include_missing_columns = FALSE,
                                timestamp_parsers = NULL,
                                decimal_point = ".") {
  # `col_types` may only be left as NULL (infer everything) or given as a
  # Schema; any other kind of value is rejected up front.
  col_types_ok <- is.null(col_types) || inherits(col_types, "Schema")
  if (!col_types_ok) {
    abort(c(
      "Unsupported `col_types` specification.",
      i = "`col_types` must be NULL, or a <Schema>."
    ))
  }

  # Collect every setting into one named list; this list is the single
  # argument handed to the ConvertOptions initializer.
  settings <- list(
    check_utf8 = check_utf8,
    null_values = null_values,
    strings_can_be_null = strings_can_be_null,
    col_types = col_types,
    true_values = true_values,
    false_values = false_values,
    auto_dict_encode = auto_dict_encode,
    auto_dict_max_cardinality = auto_dict_max_cardinality,
    include_columns = include_columns,
    include_missing_columns = include_missing_columns,
    timestamp_parsers = timestamp_parsers,
    decimal_point = decimal_point
  )

  csv___ConvertOptions__initialize(settings)
}
| |
| #' @rdname CsvReadOptions |
| #' @usage NULL |
| #' @format NULL |
| #' @docType class |
| #' @export |
# R6 class for CSV convert options; it declares no members of its own and
# inherits from ArrowObject.
CsvConvertOptions <- R6Class(
  "CsvConvertOptions",
  inherit = ArrowObject
)
# `CsvConvertOptions$create()` is the same function as `csv_convert_options()`.
CsvConvertOptions$create <- csv_convert_options
| |
# Translate readr-style reader arguments into Arrow CSV convert options.
#
# Arguments:
#   na                 passed on as `null_values`.
#   quoted_na          passed on as `strings_can_be_null`.
#   decimal_point      passed on unchanged.
#   col_types          NULL, a Schema, or a single string holding a compact
#                      specification with one character per column:
#                        c = utf8, i = int32, n / d = float64, l = bool,
#                        f = dictionary, D = date32, T = timestamp (ns),
#                        t = time32, _ / - = null (column is dropped),
#                        ? = no type given (column is left out of the schema).
#   col_names          column names; for a compact specification this must be
#                      a character vector with one name per spec character.
#   timestamp_parsers  passed on unchanged.
#
# Returns the result of `csv_convert_options()`.
#
# Errors (via `abort()`) when a character `col_types` is not of length 1, when
# a compact specification is given without matching `col_names`, or when the
# specification contains a character that is not listed above.
readr_to_csv_convert_options <- function(na,
                                         quoted_na,
                                         decimal_point,
                                         col_types = NULL,
                                         col_names = NULL,
                                         timestamp_parsers = NULL) {
  include_columns <- character()

  if (is.character(col_types)) {
    if (length(col_types) != 1L) {
      abort("`col_types` is a character vector that is not of size 1")
    }
    # Split the compact specification into one single-character code per column
    n <- nchar(col_types)
    specs <- substring(col_types, seq_len(n), seq_len(n))
    if (!is_bare_character(col_names, n)) {
      abort("Compact specification for `col_types` requires `col_names`")
    }

    col_types <- set_names(nm = col_names, map2(specs, col_names, ~ {
      switch(.x,
        "c" = utf8(),
        "i" = int32(),
        "n" = float64(),
        "d" = float64(),
        "l" = bool(),
        "f" = dictionary(),
        "D" = date32(),
        "T" = timestamp(unit = "ns"),
        "t" = time32(),
        "_" = null(),
        "-" = null(),
        "?" = NULL,
        # The message has to be assembled into ONE string: only the first
        # positional argument of abort() is the message, so passing the
        # fragments separately would not produce the intended text.
        abort(
          paste0(
            "Unsupported compact specification: '", .x,
            "' for column '", .y, "'"
          )
        )
      )
    }))
    # To "guess" types, omit them from col_types
    col_types <- keep(col_types, ~ !is.null(.x))
    col_types <- schema(col_types)
  }

  if (!is.null(col_types)) {
    assert_is(col_types, "Schema")
    # If any columns are null(), drop them
    # (by specifying the other columns in include_columns)
    nulls <- map_lgl(col_types$fields, ~ .$type$Equals(null()))
    if (any(nulls)) {
      include_columns <- setdiff(col_names, names(col_types)[nulls])
    }
  }

  csv_convert_options(
    null_values = na,
    strings_can_be_null = quoted_na,
    col_types = col_types,
    timestamp_parsers = timestamp_parsers,
    include_columns = include_columns,
    decimal_point = decimal_point
  )
}
| |
#' Write CSV file to disk
#'
#' @param x `data.frame`, [RecordBatch], or [Table]
#' @param sink A string file path, URI, or [OutputStream], or path in a file
#' system (`SubTreeFileSystem`)
#' @param file file name. Specify this or `sink`, not both.
#' @param include_header Whether to write an initial header line with column names
#' @param col_names identical to `include_header`. Specify this or
#' `include_header`, not both.
#' @param batch_size Maximum number of rows processed at a time. Default is 1024.
#' @param na value to write for NA values. Must not contain quote marks. Default
#' is `""`.
#' @param write_options see [CSV write options][csv_write_options]
#' @param ... additional parameters
#'
#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream],
#' the stream will be left open.
#' @export
#' @examples
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write_csv_arrow(mtcars, tf)
#' @include arrow-object.R
write_csv_arrow <- function(x,
                            sink,
                            file = NULL,
                            include_header = TRUE,
                            col_names = NULL,
                            batch_size = 1024L,
                            na = "",
                            write_options = NULL,
                            ...) {
  # Anything captured by `...` is an argument this writer does not support
  unsupported_passed_args <- names(list(...))

  if (length(unsupported_passed_args) > 0) {
    stop(
      "The following ",
      ngettext(length(unsupported_passed_args), "argument is ", "arguments are "),
      "not yet supported in Arrow: ",
      oxford_paste(unsupported_passed_args),
      call. = FALSE
    )
  }

  # `file` is an alias for `sink`; only one of the two may be supplied
  if (!missing(file) && !missing(sink)) {
    stop(
      "You have supplied both \"file\" and \"sink\" arguments. Please ",
      "supply only one of them.",
      call. = FALSE
    )
  }

  if (missing(sink) && !missing(file)) {
    sink <- file
  }

  # `col_names` is an alias for `include_header`; only one may be supplied
  if (!missing(col_names) && !missing(include_header)) {
    stop(
      "You have supplied both \"col_names\" and \"include_header\" ",
      "arguments. Please supply only one of them.",
      call. = FALSE
    )
  }

  if (missing(include_header) && !missing(col_names)) {
    include_header <- col_names
  }

  # `include_header`, `batch_size` and `na` are only consulted when no
  # explicit `write_options` object was given
  if (is.null(write_options)) {
    write_options <- readr_to_csv_write_options(
      col_names = include_header,
      batch_size = batch_size,
      na = na
    )
  }

  # Keep the caller's original object so it can be returned unchanged
  x_out <- x
  if (!inherits(x, "ArrowTabular")) {
    tryCatch(
      x <- as_record_batch_reader(x),
      error = function(e) {
        if (grepl("Input data frame columns must be named", conditionMessage(e))) {
          abort(conditionMessage(e), parent = NA)
        } else {
          abort(
            paste0(
              "x must be an object of class 'data.frame', 'RecordBatch', ",
              "'Dataset', 'Table', or 'RecordBatchReader' not '", class(x)[1], "'."
            ),
            parent = NA
          )
        }
      }
    )
  }

  if (!inherits(sink, "OutputStream")) {
    # Compression is detected from the sink as supplied, before it is
    # replaced by a stream object
    compression <- detect_compression(sink)
    sink <- make_output_stream(sink)
    # Register cleanup as soon as the stream exists. The handler looks up
    # `sink` when the function exits, so it closes the compressed wrapper when
    # one was created below, and still closes the plain stream if creating
    # the wrapper fails.
    on.exit(sink$close(), add = TRUE)
    if (compression != "uncompressed") {
      # TODO: accept compression and compression_level as args
      sink <- CompressedOutputStream$create(sink, codec = compression)
    }
  }

  if (inherits(x, "RecordBatch")) {
    csv___WriteCSV__RecordBatch(x, write_options, sink)
  } else if (inherits(x, "Table")) {
    csv___WriteCSV__Table(x, write_options, sink)
  } else if (inherits(x, c("RecordBatchReader"))) {
    csv___WriteCSV__RecordBatchReader(x, write_options, sink)
  }

  invisible(x_out)
}