// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Arrow IPC File and Stream Readers
//!
//! The `FileReader` and `StreamReader` have similar interfaces;
//! however, the `FileReader` expects a reader that supports `Seek`ing.
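//!
//! A minimal usage sketch for reading a file (the path is hypothetical):
//!
//! ```no_run
//! use std::fs::File;
//! use arrow::ipc::reader::FileReader;
//!
//! let file = File::open("data.arrow_file").unwrap(); // hypothetical file
//! let reader = FileReader::try_new(file).unwrap();
//! for batch in reader {
//!     // each item is a `Result<RecordBatch>`
//!     println!("read a batch with {} rows", batch.unwrap().num_rows());
//! }
//! ```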
use std::collections::HashMap;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use crate::array::*;
use crate::buffer::Buffer;
use crate::compute::cast;
use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef};
use crate::error::{ArrowError, Result};
use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use ipc::CONTINUATION_MARKER;
use DataType::*;
/// Read a buffer based on offset and length
fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
let start_offset = buf.offset() as usize;
let end_offset = start_offset + buf.length() as usize;
let buf_data = &a_data[start_offset..end_offset];
Buffer::from(&buf_data)
}
/// Coordinates reading arrays based on data types.
///
/// Notes:
/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
/// We thus:
/// - check if the bit width of non-64-bit numbers is 64, and
/// - read the buffer as 64-bit (signed integer or float), and
/// - cast the 64-bit array to the appropriate data type
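///
/// As a sketch of the widening behavior using public APIs (the values here are
/// hypothetical; this function itself is private):
///
/// ```ignore
/// // the IPC buffer may hold 64-bit integers even though the field is Int32
/// let wide = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
/// // this cast is infallible for values that fit the target type
/// let narrow = cast(&wide, &DataType::Int32).unwrap();
/// ```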
fn create_array(
nodes: &[ipc::FieldNode],
data_type: &DataType,
data: &[u8],
buffers: &[ipc::Buffer],
dictionaries: &[Option<ArrayRef>],
mut node_index: usize,
mut buffer_index: usize,
) -> (ArrayRef, usize, usize) {
let array = match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
let array = create_primitive_array(
&nodes[node_index],
data_type,
buffers[buffer_index..buffer_index + 3]
.iter()
.map(|buf| read_buffer(buf, data))
.collect(),
);
node_index += 1;
buffer_index += 3;
array
}
FixedSizeBinary(_) => {
let array = create_primitive_array(
&nodes[node_index],
data_type,
buffers[buffer_index..buffer_index + 2]
.iter()
.map(|buf| read_buffer(buf, data))
.collect(),
);
node_index += 1;
buffer_index += 2;
array
}
List(ref list_field) | LargeList(ref list_field) => {
let list_node = &nodes[node_index];
let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
.iter()
.map(|buf| read_buffer(buf, data))
.collect();
node_index += 1;
buffer_index += 2;
let triple = create_array(
nodes,
list_field.data_type(),
data,
buffers,
dictionaries,
node_index,
buffer_index,
);
node_index = triple.1;
buffer_index = triple.2;
create_list_array(list_node, data_type, &list_buffers[..], triple.0)
}
FixedSizeList(ref list_field, _) => {
let list_node = &nodes[node_index];
let list_buffers: Vec<Buffer> = buffers[buffer_index..=buffer_index]
.iter()
.map(|buf| read_buffer(buf, data))
.collect();
node_index += 1;
buffer_index += 1;
let triple = create_array(
nodes,
list_field.data_type(),
data,
buffers,
dictionaries,
node_index,
buffer_index,
);
node_index = triple.1;
buffer_index = triple.2;
create_list_array(list_node, data_type, &list_buffers[..], triple.0)
}
Struct(struct_fields) => {
let struct_node = &nodes[node_index];
let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
node_index += 1;
buffer_index += 1;
// read the arrays for each field
let mut struct_arrays = vec![];
// TODO investigate whether just knowing the number of buffers could
// still work
for struct_field in struct_fields {
let triple = create_array(
nodes,
struct_field.data_type(),
data,
buffers,
dictionaries,
node_index,
buffer_index,
);
node_index = triple.1;
buffer_index = triple.2;
struct_arrays.push((struct_field.clone(), triple.0));
}
let null_count = struct_node.null_count() as usize;
let struct_array = if null_count > 0 {
// create struct array from fields, arrays and null data
StructArray::from((struct_arrays, null_buffer))
} else {
StructArray::from(struct_arrays)
};
Arc::new(struct_array)
}
    // Create a dictionary array from the index buffers and the
    // dictionary values read earlier from the dictionary batches
Dictionary(_, _) => {
let index_node = &nodes[node_index];
let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
.iter()
.map(|buf| read_buffer(buf, data))
.collect();
let value_array = dictionaries[node_index].clone().unwrap();
node_index += 1;
buffer_index += 2;
create_dictionary_array(
index_node,
data_type,
&index_buffers[..],
value_array,
)
}
Null => {
let length = nodes[node_index].length() as usize;
let data = ArrayData::builder(data_type.clone())
.len(length)
.offset(0)
.build();
node_index += 1;
            // a Null array has no buffers, so the buffer index does not advance
make_array(data)
}
_ => {
let array = create_primitive_array(
&nodes[node_index],
data_type,
buffers[buffer_index..buffer_index + 2]
.iter()
.map(|buf| read_buffer(buf, data))
.collect(),
);
node_index += 1;
buffer_index += 2;
array
}
};
(array, node_index, buffer_index)
}
/// Reads the correct number of buffers based on data type and null_count, and creates a
/// primitive array ref
fn create_primitive_array(
field_node: &ipc::FieldNode,
data_type: &DataType,
buffers: Vec<Buffer>,
) -> ArrayRef {
let length = field_node.length() as usize;
let null_count = field_node.null_count() as usize;
let array_data = match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
            // read 3 buffers: null buffer, offsets, and values
let mut builder = ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..3].to_vec())
.offset(0);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
builder.build()
}
FixedSizeBinary(_) => {
            // read 2 buffers: null buffer and values
let mut builder = ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..2].to_vec())
.offset(0);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
builder.build()
}
Int8
| Int16
| Int32
| UInt8
| UInt16
| UInt32
| Time32(_)
| Date32
| Interval(IntervalUnit::YearMonth) => {
if buffers[1].len() / 8 == length && length != 1 {
// interpret as a signed i64, and cast appropriately
let mut builder = ArrayData::builder(DataType::Int64)
.len(length)
.buffers(buffers[1..].to_vec())
.offset(0);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
let values = Arc::new(Int64Array::from(builder.build())) as ArrayRef;
// this cast is infallible, the unwrap is safe
let casted = cast(&values, data_type).unwrap();
casted.data().clone()
} else {
let mut builder = ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..].to_vec())
.offset(0);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
builder.build()
}
}
Float32 => {
if buffers[1].len() / 8 == length && length != 1 {
// interpret as a f64, and cast appropriately
let mut builder = ArrayData::builder(DataType::Float64)
.len(length)
.buffers(buffers[1..].to_vec())
.offset(0);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
let values = Arc::new(Float64Array::from(builder.build())) as ArrayRef;
// this cast is infallible, the unwrap is safe
let casted = cast(&values, data_type).unwrap();
casted.data().clone()
} else {
let mut builder = ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..].to_vec())
.offset(0);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
builder.build()
}
}
Boolean
| Int64
| UInt64
| Float64
| Time64(_)
| Timestamp(_, _)
| Date64
| Duration(_)
| Interval(IntervalUnit::DayTime) => {
let mut builder = ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..].to_vec())
.offset(0);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
builder.build()
}
Decimal(_, _) => {
            // read 2 buffers: null buffer and values
let mut builder = ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..2].to_vec())
.offset(0);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
builder.build()
}
        t => panic!("Data type {:?} is either unsupported or not primitive", t),
};
make_array(array_data)
}
/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_list_array(
field_node: &ipc::FieldNode,
data_type: &DataType,
buffers: &[Buffer],
child_array: ArrayRef,
) -> ArrayRef {
    // `List` and `LargeList` have an offsets buffer after the null buffer,
    // while `FixedSizeList` carries no buffers beyond the null buffer.
    let value_buffers = match *data_type {
        DataType::List(_) | DataType::LargeList(_) => buffers[1..2].to_vec(),
        DataType::FixedSizeList(_, _) => vec![],
        _ => panic!("Cannot create list array from {:?}", data_type),
    };
    let null_count = field_node.null_count() as usize;
    let mut builder = ArrayData::builder(data_type.clone())
        .len(field_node.length() as usize)
        .buffers(value_buffers)
        .offset(0)
        .child_data(vec![child_array.data().clone()]);
    if null_count > 0 {
        builder = builder.null_bit_buffer(buffers[0].clone())
    }
    make_array(builder.build())
}
/// Reads the correct number of buffers based on data type and null_count, and creates a
/// dictionary array ref
fn create_dictionary_array(
field_node: &ipc::FieldNode,
data_type: &DataType,
buffers: &[Buffer],
value_array: ArrayRef,
) -> ArrayRef {
if let DataType::Dictionary(_, _) = *data_type {
let null_count = field_node.null_count() as usize;
let mut builder = ArrayData::builder(data_type.clone())
.len(field_node.length() as usize)
.buffers(buffers[1..2].to_vec())
.offset(0)
.child_data(vec![value_array.data().clone()]);
if null_count > 0 {
builder = builder.null_bit_buffer(buffers[0].clone())
}
make_array(builder.build())
} else {
unreachable!("Cannot create dictionary array from {:?}", data_type)
}
}
/// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
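///
/// A sketch of how this is typically driven from a decoded IPC message, mirroring
/// the readers below (`meta_buffer`, `body_buf`, `schema`, and `dictionaries` are
/// assumed to be in scope):
///
/// ```ignore
/// let message = ipc::root_as_message(&meta_buffer[..]).unwrap();
/// let batch = message.header_as_record_batch().unwrap();
/// let record_batch = read_record_batch(&body_buf, batch, schema.clone(), &dictionaries)?;
/// ```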
pub fn read_record_batch(
buf: &[u8],
batch: ipc::RecordBatch,
schema: SchemaRef,
dictionaries: &[Option<ArrayRef>],
) -> Result<RecordBatch> {
let buffers = batch.buffers().ok_or_else(|| {
ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
})?;
let field_nodes = batch.nodes().ok_or_else(|| {
ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;
    // keep track of buffer and node index; the functions that create arrays advance these
let mut buffer_index = 0;
let mut node_index = 0;
let mut arrays = vec![];
// keep track of index as lists require more than one node
for field in schema.fields() {
let triple = create_array(
field_nodes,
field.data_type(),
&buf,
buffers,
dictionaries,
node_index,
buffer_index,
);
node_index = triple.1;
buffer_index = triple.2;
arrays.push(triple.0);
}
RecordBatch::try_new(schema, arrays)
}
/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries_by_field` with the resulting dictionary
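///
/// A sketch of typical use while scanning messages, mirroring the readers below
/// (`message`, `body_buf`, `schema`, and `dictionaries_by_field` are assumed to be
/// in scope):
///
/// ```ignore
/// let batch = message.header_as_dictionary_batch().unwrap();
/// read_dictionary(&body_buf, batch, &schema, &mut dictionaries_by_field)?;
/// ```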
pub fn read_dictionary(
buf: &[u8],
batch: ipc::DictionaryBatch,
schema: &Schema,
dictionaries_by_field: &mut [Option<ArrayRef>],
) -> Result<()> {
if batch.isDelta() {
return Err(ArrowError::IoError(
"delta dictionary batches not supported".to_string(),
));
}
let id = batch.id();
let fields_using_this_dictionary = schema.fields_with_dict_id(id);
let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
ArrowError::InvalidArgumentError("dictionary id not found in schema".to_string())
})?;
// As the dictionary batch does not contain the type of the
// values array, we need to retrieve this from the schema.
// Get an array representing this dictionary's values.
let dictionary_values: ArrayRef = match first_field.data_type() {
DataType::Dictionary(_, ref value_type) => {
// Make a fake schema for the dictionary batch.
let schema = Schema {
fields: vec![Field::new("", value_type.as_ref().clone(), false)],
metadata: HashMap::new(),
};
// Read a single column
let record_batch = read_record_batch(
&buf,
batch.data().unwrap(),
Arc::new(schema),
&dictionaries_by_field,
)?;
Some(record_batch.column(0).clone())
}
_ => None,
}
.ok_or_else(|| {
        ArrowError::InvalidArgumentError(
            "field with this dictionary id is not a dictionary type".to_string(),
        )
})?;
    // For all fields with this dictionary id, update the dictionaries vector
    // in the reader. Note that a dictionary batch may be shared between many fields.
    // We do not currently record the isOrdered field; this could become a
    // general attribute of arrays.
for (i, field) in schema.fields().iter().enumerate() {
if field.dict_id() == Some(id) {
// Add (possibly multiple) array refs to the dictionaries array.
dictionaries_by_field[i] = Some(dictionary_values.clone());
}
}
Ok(())
}
/// Arrow File reader
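///
/// A minimal usage sketch (the file path is hypothetical):
///
/// ```no_run
/// use std::fs::File;
/// use arrow::ipc::reader::FileReader;
///
/// let file = File::open("batches.arrow_file").unwrap(); // hypothetical file
/// let mut reader = FileReader::try_new(file).unwrap();
/// // the underlying reader supports `Seek`, so batches can be read at random
/// reader.set_index(0).unwrap();
/// let batch = reader.next().unwrap().unwrap();
/// println!("first batch has {} rows", batch.num_rows());
/// ```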
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
reader: BufReader<R>,
/// The schema that is read from the file header
schema: SchemaRef,
/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
blocks: Vec<ipc::Block>,
/// A counter to keep track of the current block that should be read
current_block: usize,
/// The total number of blocks, which may contain record batches and other types
total_blocks: usize,
/// Optional dictionaries for each schema field.
///
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_field: Vec<Option<ArrayRef>>,
/// Metadata version
metadata_version: ipc::MetadataVersion,
}
impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
/// Returns errors if the file does not meet the Arrow Format header and footer
/// requirements
pub fn try_new(reader: R) -> Result<Self> {
let mut reader = BufReader::new(reader);
// check if header and footer contain correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::IoError(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::IoError(
"Arrow file does not contain correct footer".to_string(),
));
}
// read footer length
let mut footer_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::End(-10))?;
reader.read_exact(&mut footer_size)?;
let footer_len = i32::from_le_bytes(footer_size);
// read footer
let mut footer_data = vec![0; footer_len as usize];
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;
let footer = ipc::root_as_footer(&footer_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
})?;
let blocks = footer.recordBatches().ok_or_else(|| {
ArrowError::IoError(
"Unable to get record batches from IPC Footer".to_string(),
)
})?;
let total_blocks = blocks.len();
let ipc_schema = footer.schema().unwrap();
let schema = ipc::convert::fb_to_schema(ipc_schema);
// Create an array of optional dictionary value arrays, one per field.
let mut dictionaries_by_field = vec![None; schema.fields().len()];
for block in footer.dictionaries().unwrap() {
            // read the message length at the block's offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
            let meta_len = if message_size == CONTINUATION_MARKER {
                // the continuation marker precedes the actual metadata length
                reader.read_exact(&mut message_size)?;
                i32::from_le_bytes(message_size)
            } else {
                i32::from_le_bytes(message_size)
            };
            let mut block_data = vec![0; meta_len as usize];
reader.read_exact(&mut block_data)?;
let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
})?;
match message.header_type() {
ipc::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();
// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
reader.read_exact(&mut buf)?;
read_dictionary(&buf, batch, &schema, &mut dictionaries_by_field)?;
}
t => {
return Err(ArrowError::IoError(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
};
}
Ok(Self {
reader,
schema: Arc::new(schema),
blocks: blocks.to_vec(),
current_block: 0,
total_blocks,
dictionaries_by_field,
metadata_version: footer.version(),
})
}
/// Return the number of batches in the file
pub fn num_batches(&self) -> usize {
self.total_blocks
}
/// Return the schema of the file
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
/// Read a specific record batch
///
/// Sets the current block to the index, allowing random reads
pub fn set_index(&mut self, index: usize) -> Result<()> {
if index >= self.total_blocks {
Err(ArrowError::IoError(format!(
"Cannot set batch to index {} from {} total batches",
index, self.total_blocks
)))
} else {
self.current_block = index;
Ok(())
}
}
fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
let block = self.blocks[self.current_block];
self.current_block += 1;
// read length
self.reader.seek(SeekFrom::Start(block.offset() as u64))?;
let mut meta_buf = [0; 4];
self.reader.read_exact(&mut meta_buf)?;
if meta_buf == CONTINUATION_MARKER {
            // continuation marker encountered, read the metadata length next
self.reader.read_exact(&mut meta_buf)?;
}
let meta_len = i32::from_le_bytes(meta_buf);
let mut block_data = vec![0; meta_len as usize];
self.reader.read_exact(&mut block_data)?;
let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
})?;
// some old test data's footer metadata is not set, so we account for that
if self.metadata_version != ipc::MetadataVersion::V1
&& message.version() != self.metadata_version
{
return Err(ArrowError::IoError(
"Could not read IPC message as metadata versions mismatch".to_string(),
));
}
match message.header_type() {
ipc::MessageHeader::Schema => Err(ArrowError::IoError(
"Not expecting a schema when messages are read".to_string(),
)),
ipc::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IoError(
"Unable to read IPC message as record batch".to_string(),
)
})?;
// read the block that makes up the record batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
self.reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
self.reader.read_exact(&mut buf)?;
read_record_batch(
&buf,
batch,
self.schema(),
&self.dictionaries_by_field,
).map(Some)
}
            ipc::MessageHeader::NONE => Ok(None),
t => Err(ArrowError::IoError(format!(
"Reading types other than record batches not yet supported, unable to read {:?}", t
))),
}
}
}
impl<R: Read + Seek> Iterator for FileReader<R> {
type Item = Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
// get current block
if self.current_block < self.total_blocks {
self.maybe_next().transpose()
} else {
None
}
}
}
impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
/// Arrow Stream reader
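///
/// A minimal usage sketch (the file path is hypothetical):
///
/// ```no_run
/// use std::fs::File;
/// use arrow::ipc::reader::StreamReader;
///
/// let file = File::open("batches.stream").unwrap(); // hypothetical file
/// let mut reader = StreamReader::try_new(file).unwrap();
/// while let Some(batch) = reader.next() {
///     println!("read a batch with {} rows", batch.unwrap().num_rows());
/// }
/// assert!(reader.is_finished());
/// ```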
pub struct StreamReader<R: Read> {
/// Buffered stream reader
reader: BufReader<R>,
/// The schema that is read from the stream's first message
schema: SchemaRef,
/// Optional dictionaries for each schema field.
///
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_field: Vec<Option<ArrayRef>>,
/// An indicator of whether the stream is complete.
///
/// This value is set to `true` the first time the reader's `next()` returns `None`.
finished: bool,
}
impl<R: Read> StreamReader<R> {
/// Try to create a new stream reader
///
    /// The first message in the stream is the schema; the reader will fail if it does
    /// not encounter a schema.
    /// To check if the reader is done, use `is_finished`.
pub fn try_new(reader: R) -> Result<Self> {
let mut reader = BufReader::new(reader);
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
reader.read_exact(&mut meta_size)?;
let meta_len = {
// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_size == CONTINUATION_MARKER {
reader.read_exact(&mut meta_size)?;
}
i32::from_le_bytes(meta_size)
};
let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;
let message = ipc::root_as_message(meta_buffer.as_slice()).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
})?;
// message header is a Schema, so read it
let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| {
ArrowError::IoError("Unable to read IPC message as schema".to_string())
})?;
let schema = ipc::convert::fb_to_schema(ipc_schema);
// Create an array of optional dictionary value arrays, one per field.
let dictionaries_by_field = vec![None; schema.fields().len()];
Ok(Self {
reader,
schema: Arc::new(schema),
finished: false,
dictionaries_by_field,
})
}
/// Return the schema of the stream
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
/// Check if the stream is finished
pub fn is_finished(&self) -> bool {
self.finished
}
fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
if self.finished {
return Ok(None);
}
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
match self.reader.read_exact(&mut meta_size) {
Ok(()) => (),
Err(e) => {
return if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    // Handle EOF without the "0xFFFFFFFF 0x00000000"
                    // end-of-stream marker, which is valid according to:
                    // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
self.finished = true;
Ok(None)
} else {
Err(ArrowError::from(e))
};
}
}
let meta_len = {
// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_size == CONTINUATION_MARKER {
self.reader.read_exact(&mut meta_size)?;
}
i32::from_le_bytes(meta_size)
};
if meta_len == 0 {
// the stream has ended, mark the reader as finished
self.finished = true;
return Ok(None);
}
let mut meta_buffer = vec![0; meta_len as usize];
self.reader.read_exact(&mut meta_buffer)?;
        let message = ipc::root_as_message(meta_buffer.as_slice()).map_err(|err| {
            ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
        })?;
match message.header_type() {
ipc::MessageHeader::Schema => Err(ArrowError::IoError(
"Not expecting a schema when messages are read".to_string(),
)),
ipc::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IoError(
"Unable to read IPC message as record batch".to_string(),
)
})?;
// read the block that makes up the record batch into a buffer
let mut buf = vec![0; message.bodyLength() as usize];
self.reader.read_exact(&mut buf)?;
read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field).map(Some)
}
ipc::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
ArrowError::IoError(
"Unable to read IPC message as dictionary batch".to_string(),
)
})?;
// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; message.bodyLength() as usize];
self.reader.read_exact(&mut buf)?;
read_dictionary(
&buf, batch, &self.schema, &mut self.dictionaries_by_field
)?;
// read the next message until we encounter a RecordBatch
self.maybe_next()
}
            ipc::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::IoError(format!(
                "Reading types other than record batches not yet supported, unable to read {:?}",
                t
            ))),
}
}
}
impl<R: Read> Iterator for StreamReader<R> {
type Item = Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
self.maybe_next().transpose()
}
}
impl<R: Read> RecordBatchReader for StreamReader<R> {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
use flate2::read::GzDecoder;
use crate::util::integration_util::*;
#[test]
fn read_generated_files_014() {
let testdata = crate::util::test_util::arrow_test_data();
let version = "0.14.1";
        // the test is repetitive, so we read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
"generated_decimal",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, path
))
.unwrap();
let mut reader = FileReader::try_new(file).unwrap();
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader));
});
}
#[test]
#[should_panic(expected = "Big Endian is not supported for Decimal!")]
fn read_decimal_be_file_should_panic() {
let testdata = crate::util::test_util::arrow_test_data();
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file",
testdata
))
.unwrap();
FileReader::try_new(file).unwrap();
}
#[test]
fn read_generated_be_files_should_work() {
// complementary to the previous test
let testdata = crate::util::test_util::arrow_test_data();
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_nested",
"generated_null_trivial",
"generated_null",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/{}.arrow_file",
testdata, path
))
.unwrap();
FileReader::try_new(file).unwrap();
});
}
#[test]
fn read_generated_streams_014() {
let testdata = crate::util::test_util::arrow_test_data();
let version = "0.14.1";
        // the test is repetitive, so we read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
"generated_decimal",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.stream",
testdata, version, path
))
.unwrap();
let mut reader = StreamReader::try_new(file).unwrap();
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader));
            // there must be no more batches
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
});
}
#[test]
fn read_generated_files_100() {
let testdata = crate::util::test_util::arrow_test_data();
let version = "1.0.0-littleendian";
        // the test is repetitive, so we read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_nested",
"generated_null_trivial",
"generated_null",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, path
))
.unwrap();
let mut reader = FileReader::try_new(file).unwrap();
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader));
});
}
#[test]
fn read_generated_streams_100() {
let testdata = crate::util::test_util::arrow_test_data();
let version = "1.0.0-littleendian";
        // the test is repetitive, so we read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_nested",
"generated_null_trivial",
"generated_null",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.stream",
testdata, version, path
))
.unwrap();
let mut reader = StreamReader::try_new(file).unwrap();
// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader));
            // there must be no more batches
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
});
}
#[test]
fn test_arrow_single_float_row() {
let schema = Schema::new(vec![
Field::new("a", DataType::Float32, false),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Int32, false),
]);
let arrays = vec![
Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
Arc::new(Int32Array::from(vec![2])) as ArrayRef,
Arc::new(Int32Array::from(vec![1])) as ArrayRef,
];
let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
// create stream writer
let file = File::create("target/debug/testdata/float.stream").unwrap();
let mut stream_writer =
crate::ipc::writer::StreamWriter::try_new(file, &schema).unwrap();
stream_writer.write(&batch).unwrap();
stream_writer.finish().unwrap();
// read stream back
let file = File::open("target/debug/testdata/float.stream").unwrap();
let reader = StreamReader::try_new(file).unwrap();
reader.for_each(|batch| {
let batch = batch.unwrap();
assert!(
batch
.column(0)
.as_any()
.downcast_ref::<Float32Array>()
.unwrap()
.value(0)
!= 0.0
);
assert!(
batch
.column(1)
.as_any()
.downcast_ref::<Float32Array>()
.unwrap()
.value(0)
!= 0.0
);
})
}
/// Read gzipped JSON file
fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
let testdata = crate::util::test_util::arrow_test_data();
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.json.gz",
testdata, version, path
))
.unwrap();
let mut gz = GzDecoder::new(&file);
let mut s = String::new();
gz.read_to_string(&mut s).unwrap();
// convert to Arrow JSON
let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
arrow_json
}
}