blob: 483588ab4bb674be0448de31b1d6a3d35e692807 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
context("RecordBatch.*(Reader|Writer)")
test_that("RecordBatchStreamReader / Writer", {
tbl <- tibble::tibble(
x = 1:10,
y = letters[1:10]
)
batch <- record_batch(tbl)
tab <- Table$create(tbl)
sink <- BufferOutputStream$create()
expect_equal(sink$tell(), 0)
writer <- RecordBatchStreamWriter$create(sink, batch$schema)
expect_r6_class(writer, "RecordBatchWriter")
writer$write(batch)
writer$write(tab)
writer$write(tbl)
expect_true(sink$tell() > 0)
writer$close()
buf <- sink$finish()
expect_r6_class(buf, "Buffer")
reader <- RecordBatchStreamReader$create(buf)
expect_r6_class(reader, "RecordBatchStreamReader")
batch1 <- reader$read_next_batch()
expect_r6_class(batch1, "RecordBatch")
expect_equal(batch, batch1)
batch2 <- reader$read_next_batch()
expect_r6_class(batch2, "RecordBatch")
expect_equal(batch, batch2)
batch3 <- reader$read_next_batch()
expect_r6_class(batch3, "RecordBatch")
expect_equal(batch, batch3)
expect_null(reader$read_next_batch())
})
test_that("RecordBatchFileReader / Writer", {
sink <- BufferOutputStream$create()
writer <- RecordBatchFileWriter$create(sink, batch$schema)
expect_r6_class(writer, "RecordBatchWriter")
writer$write(batch)
writer$write(tab)
writer$write(tbl)
writer$close()
buf <- sink$finish()
expect_r6_class(buf, "Buffer")
reader <- RecordBatchFileReader$create(buf)
expect_r6_class(reader, "RecordBatchFileReader")
batch1 <- reader$get_batch(0)
expect_r6_class(batch1, "RecordBatch")
expect_equal(batch, batch1)
expect_equal(reader$num_record_batches, 3)
})
test_that("StreamReader read_table", {
sink <- BufferOutputStream$create()
writer <- RecordBatchStreamWriter$create(sink, batch$schema)
expect_r6_class(writer, "RecordBatchWriter")
writer$write(batch)
writer$write(tab)
writer$write(tbl)
writer$close()
buf <- sink$finish()
reader <- RecordBatchStreamReader$create(buf)
out <- reader$read_table()
expect_identical(dim(out), c(30L, 2L))
})
test_that("FileReader read_table", {
sink <- BufferOutputStream$create()
writer <- RecordBatchFileWriter$create(sink, batch$schema)
expect_r6_class(writer, "RecordBatchWriter")
writer$write(batch)
writer$write(tab)
writer$write(tbl)
writer$close()
buf <- sink$finish()
reader <- RecordBatchFileReader$create(buf)
out <- reader$read_table()
expect_identical(dim(out), c(30L, 2L))
})
test_that("MetadataFormat", {
expect_identical(get_ipc_metadata_version(5), 4L)
expect_identical(get_ipc_metadata_version("V4"), 3L)
expect_identical(get_ipc_metadata_version(NULL), 4L)
Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = 1)
expect_identical(get_ipc_metadata_version(NULL), 3L)
Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = "")
expect_identical(get_ipc_metadata_version(NULL), 4L)
Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = 1)
expect_identical(get_ipc_metadata_version(NULL), 3L)
Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = "")
expect_error(
get_ipc_metadata_version(99),
"99 is not a valid IPC MetadataVersion"
)
expect_error(
get_ipc_metadata_version("45"),
'"45" is not a valid IPC MetadataVersion'
)
})
test_that("reader with 0 batches", {
# IPC stream containing only a schema (ARROW-10642)
sink <- BufferOutputStream$create()
writer <- RecordBatchStreamWriter$create(sink, schema(a = int32()))
writer$close()
buf <- sink$finish()
reader <- RecordBatchStreamReader$create(buf)
tab <- reader$read_table()
expect_r6_class(tab, "Table")
expect_identical(dim(tab), c(0L, 1L))
})