# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
to_arrow <- function(x) {
  UseMethod("to_arrow")
}
to_arrow.RecordBatch <- function(x) x
to_arrow.Table <- function(x) x
# splice the data frame as arguments of Table$create()
# see ?rlang::list2()
to_arrow.data.frame <- function(x) Table$create(!!!x)
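
# Illustrative equivalence (a sketch, not from the original file): for a data
# frame, to_arrow() splices the columns into Table$create(), so
#   to_arrow(data.frame(a = 1:3, b = c("x", "y", "z")))
# is expected to produce the same Table as
#   Table$create(a = 1:3, b = c("x", "y", "z"))
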
#' Write Arrow formatted data
#'
#' @param x an [arrow::Table][Table], an [arrow::RecordBatch][RecordBatch] or a data.frame
#'
#' @param sink where to serialize to
#'
#' - An [arrow::RecordBatchWriter][RecordBatchWriter]: `x` is written with the
#'   writer's `$write()` method and the stream is left open. This uses the
#'   streaming format or the binary file format depending on the type of the writer.
#'
#' - A string file path: `x` is serialized with
#'   an [arrow::RecordBatchFileWriter][RecordBatchFileWriter], i.e.
#'   using the binary file format.
#'
#' - A raw vector: typically of length zero (its contents are ignored; it is only
#'   used for dispatch). `x` is serialized using the streaming format, i.e. with
#'   a [arrow::RecordBatchStreamWriter][RecordBatchStreamWriter], and the bytes
#'   are returned as a raw vector.
#'
#' @param ... extra parameters, currently ignored
#'
#' `write_arrow()` is a convenience function; the classes
#' [arrow::RecordBatchFileWriter][RecordBatchFileWriter] and
#' [arrow::RecordBatchStreamWriter][RecordBatchStreamWriter] can be used directly
#' for more flexibility.
#'
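#' @examples
#' \dontrun{
#' # A minimal sketch, not from the original file: the object name and the
#' # temporary path below are illustrative assumptions.
#' tab <- Table$create(x = 1:10, y = letters[1:10])
#'
#' # Binary file format, via RecordBatchFileWriter
#' write_arrow(tab, tempfile(fileext = ".arrow"))
#'
#' # Streaming format, returned as a raw vector
#' bytes <- write_arrow(tab, raw())
#' }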
#' @export
write_arrow <- function(x, sink, ...) {
  UseMethod("write_arrow", sink)
}

#' @export
write_arrow.RecordBatchWriter <- function(x, sink, ...) {
  sink$write(x)
}
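
# Illustrative sketch (not from the original file; the stream, writer, and batch
# objects below are assumed): because the stream is left open, several objects
# can be written to the same writer before the caller closes it.
#   writer <- RecordBatchStreamWriter$create(some_output_stream, batch1$schema)
#   write_arrow(batch1, writer)
#   write_arrow(batch2, writer)
#   writer$close()
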
#' @export
write_arrow.character <- function(x, sink, ...) {
  assert_that(length(sink) == 1L)
  x <- to_arrow(x)
  file_stream <- FileOutputStream$create(sink)
  # Close the stream if creating the writer below fails
  on.exit(file_stream$close())
  file_writer <- RecordBatchFileWriter$create(file_stream, x$schema)
  on.exit({
    # Reset the exit handler to close both connections, writer first (LIFO)
    file_writer$close()
    file_stream$close()
  })
  # On R >= 3.5 this could instead be written as:
  # on.exit(file_writer$close(), add = TRUE, after = FALSE)
  write_arrow(x, file_writer, ...)
}
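
# Illustrative call (a sketch, not from the original file; the path below is a
# hypothetical example):
#   write_arrow(mtcars, "mtcars.arrow")
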
#' @export
write_arrow.raw <- function(x, sink, ...) {
  x <- to_arrow(x)
  schema <- x$schema

  # First pass: write to a mock stream just to measure how many bytes we need
  mock_stream <- MockOutputStream$create()
  writer <- RecordBatchStreamWriter$create(mock_stream, schema)
  writer$write(x)
  writer$close()
  n <- mock_stream$GetExtentBytesWritten()

  # Second pass: now that we know the size, write into a buffer backed by an
  # R raw vector of exactly that length
  bytes <- raw(n)
  buffer_writer <- FixedSizeBufferWriter$create(buffer(bytes))
  writer <- RecordBatchStreamWriter$create(buffer_writer, schema)
  writer$write(x)
  writer$close()

  bytes
}
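
# Illustrative round trip (a sketch, not from the original file; it assumes a
# read_arrow() counterpart in this version of the package that can read the
# streaming format back from a raw vector):
#   bytes <- write_arrow(data.frame(x = 1:5), raw())
#   tab <- read_arrow(bytes)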