| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! # JSON Writer |
| //! |
| //! This JSON writer converts Arrow [`RecordBatch`]es into arrays of |
| //! JSON objects or JSON formatted byte streams. |
| //! |
| //! ## Writing JSON Objects |
| //! |
| //! To serialize [`RecordBatch`]es into array of |
| //! [JSON](https://docs.serde.rs/serde_json/) objects, use |
| //! [`record_batches_to_json_rows`]: |
| //! |
| //! ``` |
| //! use std::sync::Arc; |
| //! |
| //! use arrow::array::Int32Array; |
| //! use arrow::datatypes::{DataType, Field, Schema}; |
| //! use arrow::json; |
| //! use arrow::record_batch::RecordBatch; |
| //! |
| //! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); |
| //! let a = Int32Array::from(vec![1, 2, 3]); |
| //! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); |
| //! |
| //! let json_rows = json::writer::record_batches_to_json_rows(&[batch]); |
| //! assert_eq!( |
| //! serde_json::Value::Object(json_rows[1].clone()), |
| //! serde_json::json!({"a": 2}), |
| //! ); |
| //! ``` |
| //! |
| //! ## Writing JSON formatted byte streams |
| //! |
| //! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use |
| //! [`LineDelimitedWriter`]: |
| //! |
| //! ``` |
| //! use std::sync::Arc; |
| //! |
| //! use arrow::array::Int32Array; |
| //! use arrow::datatypes::{DataType, Field, Schema}; |
| //! use arrow::json; |
| //! use arrow::record_batch::RecordBatch; |
| //! |
| //! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); |
| //! let a = Int32Array::from(vec![1, 2, 3]); |
| //! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); |
| //! |
| //! // Write the record batch out as JSON |
| //! let buf = Vec::new(); |
| //! let mut writer = json::LineDelimitedWriter::new(buf); |
| //! writer.write_batches(&vec![batch]).unwrap(); |
| //! writer.finish().unwrap(); |
| //! |
| //! // Get the underlying buffer back, |
| //! let buf = writer.into_inner(); |
| //! assert_eq!(r#"{"a":1} |
| //! {"a":2} |
| //! {"a":3} |
| //!"#, String::from_utf8(buf).unwrap()) |
| //! ``` |
| //! |
| //! To serialize [`RecordBatch`]es into a well formed JSON array, use |
| //! [`ArrayWriter`]: |
| //! |
| //! ``` |
| //! use std::sync::Arc; |
| //! |
| //! use arrow::array::Int32Array; |
| //! use arrow::datatypes::{DataType, Field, Schema}; |
| //! use arrow::json; |
| //! use arrow::record_batch::RecordBatch; |
| //! |
| //! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); |
| //! let a = Int32Array::from(vec![1, 2, 3]); |
| //! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); |
| //! |
| //! // Write the record batch out as a JSON array |
| //! let buf = Vec::new(); |
| //! let mut writer = json::ArrayWriter::new(buf); |
| //! writer.write_batches(&vec![batch]).unwrap(); |
| //! writer.finish().unwrap(); |
| //! |
| //! // Get the underlying buffer back, |
| //! let buf = writer.into_inner(); |
| //! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap()) |
| //! ``` |
| |
| use std::iter; |
| use std::{fmt::Debug, io::Write}; |
| |
| use serde_json::map::Map as JsonMap; |
| use serde_json::Value; |
| |
| use crate::array::*; |
| use crate::datatypes::*; |
| use crate::error::Result; |
| use crate::record_batch::RecordBatch; |
| |
| fn primitive_array_to_json<T: ArrowPrimitiveType>(array: &ArrayRef) -> Vec<Value> { |
| as_primitive_array::<T>(array) |
| .iter() |
| .map(|maybe_value| match maybe_value { |
| Some(v) => v.into_json_value().unwrap_or(Value::Null), |
| None => Value::Null, |
| }) |
| .collect() |
| } |
| |
| fn struct_array_to_jsonmap_array( |
| array: &StructArray, |
| row_count: usize, |
| ) -> Vec<JsonMap<String, Value>> { |
| let inner_col_names = array.column_names(); |
| |
| let mut inner_objs = iter::repeat(JsonMap::new()) |
| .take(row_count) |
| .collect::<Vec<JsonMap<String, Value>>>(); |
| |
| array |
| .columns() |
| .iter() |
| .enumerate() |
| .for_each(|(j, struct_col)| { |
| set_column_for_json_rows( |
| &mut inner_objs, |
| row_count, |
| struct_col, |
| inner_col_names[j], |
| ); |
| }); |
| |
| inner_objs |
| } |
| |
| /// Converts an arrow [`ArrayRef`] into a `Vec` of Serde JSON [`serde_json::Value`]'s |
| pub fn array_to_json_array(array: &ArrayRef) -> Vec<Value> { |
| match array.data_type() { |
| DataType::Null => iter::repeat(Value::Null).take(array.len()).collect(), |
| DataType::Boolean => as_boolean_array(array) |
| .iter() |
| .map(|maybe_value| match maybe_value { |
| Some(v) => v.into(), |
| None => Value::Null, |
| }) |
| .collect(), |
| |
| DataType::Utf8 => as_string_array(array) |
| .iter() |
| .map(|maybe_value| match maybe_value { |
| Some(v) => v.into(), |
| None => Value::Null, |
| }) |
| .collect(), |
| DataType::Int8 => primitive_array_to_json::<Int8Type>(array), |
| DataType::Int16 => primitive_array_to_json::<Int16Type>(array), |
| DataType::Int32 => primitive_array_to_json::<Int32Type>(array), |
| DataType::Int64 => primitive_array_to_json::<Int64Type>(array), |
| DataType::UInt8 => primitive_array_to_json::<UInt8Type>(array), |
| DataType::UInt16 => primitive_array_to_json::<UInt16Type>(array), |
| DataType::UInt32 => primitive_array_to_json::<UInt32Type>(array), |
| DataType::UInt64 => primitive_array_to_json::<UInt64Type>(array), |
| DataType::Float32 => primitive_array_to_json::<Float32Type>(array), |
| DataType::Float64 => primitive_array_to_json::<Float64Type>(array), |
| DataType::List(_) => as_list_array(array) |
| .iter() |
| .map(|maybe_value| match maybe_value { |
| Some(v) => Value::Array(array_to_json_array(&v)), |
| None => Value::Null, |
| }) |
| .collect(), |
| DataType::LargeList(_) => as_large_list_array(array) |
| .iter() |
| .map(|maybe_value| match maybe_value { |
| Some(v) => Value::Array(array_to_json_array(&v)), |
| None => Value::Null, |
| }) |
| .collect(), |
| DataType::Struct(_) => { |
| let jsonmaps = |
| struct_array_to_jsonmap_array(as_struct_array(array), array.len()); |
| jsonmaps.into_iter().map(Value::Object).collect() |
| } |
| _ => { |
| panic!( |
| "Unsupported datatype for array conversion: {:#?}", |
| array.data_type() |
| ); |
| } |
| } |
| } |
| |
| macro_rules! set_column_by_array_type { |
| ($cast_fn:ident, $col_name:ident, $rows:ident, $array:ident, $row_count:ident) => { |
| let arr = $cast_fn($array); |
| $rows.iter_mut().zip(arr.iter()).take($row_count).for_each( |
| |(row, maybe_value)| { |
| if let Some(v) = maybe_value { |
| row.insert($col_name.to_string(), v.into()); |
| } |
| }, |
| ); |
| }; |
| } |
| |
| macro_rules! set_temporal_column_by_array_type { |
| ($array_type:ident, $col_name:ident, $rows:ident, $array:ident, $row_count:ident, $cast_fn:ident) => { |
| let arr = $array.as_any().downcast_ref::<$array_type>().unwrap(); |
| |
| $rows |
| .iter_mut() |
| .enumerate() |
| .take($row_count) |
| .for_each(|(i, row)| { |
| if !arr.is_null(i) { |
| if let Some(v) = arr.$cast_fn(i) { |
| row.insert($col_name.to_string(), v.to_string().into()); |
| } |
| } |
| }); |
| }; |
| } |
| |
| fn set_column_by_primitive_type<T: ArrowPrimitiveType>( |
| rows: &mut [JsonMap<String, Value>], |
| row_count: usize, |
| array: &ArrayRef, |
| col_name: &str, |
| ) { |
| let primitive_arr = as_primitive_array::<T>(array); |
| |
| rows.iter_mut() |
| .zip(primitive_arr.iter()) |
| .take(row_count) |
| .for_each(|(row, maybe_value)| { |
| // when value is null, we simply skip setting the key |
| if let Some(j) = maybe_value.and_then(|v| v.into_json_value()) { |
| row.insert(col_name.to_string(), j); |
| } |
| }); |
| } |
| |
| fn set_column_for_json_rows( |
| rows: &mut [JsonMap<String, Value>], |
| row_count: usize, |
| array: &ArrayRef, |
| col_name: &str, |
| ) { |
| match array.data_type() { |
| DataType::Int8 => { |
| set_column_by_primitive_type::<Int8Type>(rows, row_count, array, col_name) |
| } |
| DataType::Int16 => { |
| set_column_by_primitive_type::<Int16Type>(rows, row_count, array, col_name) |
| } |
| DataType::Int32 => { |
| set_column_by_primitive_type::<Int32Type>(rows, row_count, array, col_name) |
| } |
| DataType::Int64 => { |
| set_column_by_primitive_type::<Int64Type>(rows, row_count, array, col_name) |
| } |
| DataType::UInt8 => { |
| set_column_by_primitive_type::<UInt8Type>(rows, row_count, array, col_name) |
| } |
| DataType::UInt16 => { |
| set_column_by_primitive_type::<UInt16Type>(rows, row_count, array, col_name) |
| } |
| DataType::UInt32 => { |
| set_column_by_primitive_type::<UInt32Type>(rows, row_count, array, col_name) |
| } |
| DataType::UInt64 => { |
| set_column_by_primitive_type::<UInt64Type>(rows, row_count, array, col_name) |
| } |
| DataType::Float32 => { |
| set_column_by_primitive_type::<Float32Type>(rows, row_count, array, col_name) |
| } |
| DataType::Float64 => { |
| set_column_by_primitive_type::<Float64Type>(rows, row_count, array, col_name) |
| } |
| DataType::Null => { |
| // when value is null, we simply skip setting the key |
| } |
| DataType::Boolean => { |
| set_column_by_array_type!(as_boolean_array, col_name, rows, array, row_count); |
| } |
| DataType::Utf8 => { |
| set_column_by_array_type!(as_string_array, col_name, rows, array, row_count); |
| } |
| DataType::Date32 => { |
| set_temporal_column_by_array_type!( |
| Date32Array, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_date |
| ); |
| } |
| DataType::Date64 => { |
| set_temporal_column_by_array_type!( |
| Date64Array, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_date |
| ); |
| } |
| DataType::Timestamp(TimeUnit::Second, _) => { |
| set_temporal_column_by_array_type!( |
| TimestampSecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_datetime |
| ); |
| } |
| DataType::Timestamp(TimeUnit::Millisecond, _) => { |
| set_temporal_column_by_array_type!( |
| TimestampMillisecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_datetime |
| ); |
| } |
| DataType::Timestamp(TimeUnit::Microsecond, _) => { |
| set_temporal_column_by_array_type!( |
| TimestampMicrosecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_datetime |
| ); |
| } |
| DataType::Timestamp(TimeUnit::Nanosecond, _) => { |
| set_temporal_column_by_array_type!( |
| TimestampNanosecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_datetime |
| ); |
| } |
| DataType::Time32(TimeUnit::Second) => { |
| set_temporal_column_by_array_type!( |
| Time32SecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_time |
| ); |
| } |
| DataType::Time32(TimeUnit::Millisecond) => { |
| set_temporal_column_by_array_type!( |
| Time32MillisecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_time |
| ); |
| } |
| DataType::Time64(TimeUnit::Microsecond) => { |
| set_temporal_column_by_array_type!( |
| Time64MicrosecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_time |
| ); |
| } |
| DataType::Time64(TimeUnit::Nanosecond) => { |
| set_temporal_column_by_array_type!( |
| Time64NanosecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_time |
| ); |
| } |
| DataType::Duration(TimeUnit::Second) => { |
| set_temporal_column_by_array_type!( |
| DurationSecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_duration |
| ); |
| } |
| DataType::Duration(TimeUnit::Millisecond) => { |
| set_temporal_column_by_array_type!( |
| DurationMillisecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_duration |
| ); |
| } |
| DataType::Duration(TimeUnit::Microsecond) => { |
| set_temporal_column_by_array_type!( |
| DurationMicrosecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_duration |
| ); |
| } |
| DataType::Duration(TimeUnit::Nanosecond) => { |
| set_temporal_column_by_array_type!( |
| DurationNanosecondArray, |
| col_name, |
| rows, |
| array, |
| row_count, |
| value_as_duration |
| ); |
| } |
| DataType::Struct(_) => { |
| let inner_objs = |
| struct_array_to_jsonmap_array(as_struct_array(array), row_count); |
| rows.iter_mut() |
| .take(row_count) |
| .zip(inner_objs.into_iter()) |
| .for_each(|(row, obj)| { |
| row.insert(col_name.to_string(), Value::Object(obj)); |
| }); |
| } |
| DataType::List(_) => { |
| let listarr = as_list_array(array); |
| rows.iter_mut() |
| .zip(listarr.iter()) |
| .take(row_count) |
| .for_each(|(row, maybe_value)| { |
| if let Some(v) = maybe_value { |
| row.insert( |
| col_name.to_string(), |
| Value::Array(array_to_json_array(&v)), |
| ); |
| } |
| }); |
| } |
| DataType::LargeList(_) => { |
| let listarr = as_large_list_array(array); |
| rows.iter_mut() |
| .zip(listarr.iter()) |
| .take(row_count) |
| .for_each(|(row, maybe_value)| { |
| if let Some(v) = maybe_value { |
| row.insert( |
| col_name.to_string(), |
| Value::Array(array_to_json_array(&v)), |
| ); |
| } |
| }); |
| } |
| DataType::Dictionary(_, value_type) => { |
| let slice = array.slice(0, row_count); |
| let hydrated = crate::compute::kernels::cast::cast(&slice, &value_type) |
| .expect("cannot cast dictionary to underlying values"); |
| set_column_for_json_rows(rows, row_count, &hydrated, col_name) |
| } |
| _ => { |
| panic!("Unsupported datatype: {:#?}", array.data_type()); |
| } |
| } |
| } |
| |
| /// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON |
| /// [`JsonMap`]s (objects) |
| pub fn record_batches_to_json_rows( |
| batches: &[RecordBatch], |
| ) -> Vec<JsonMap<String, Value>> { |
| let mut rows: Vec<JsonMap<String, Value>> = iter::repeat(JsonMap::new()) |
| .take(batches.iter().map(|b| b.num_rows()).sum()) |
| .collect(); |
| |
| if !rows.is_empty() { |
| let schema = batches[0].schema(); |
| let mut base = 0; |
| batches.iter().for_each(|batch| { |
| let row_count = batch.num_rows(); |
| batch.columns().iter().enumerate().for_each(|(j, col)| { |
| let col_name = schema.field(j).name(); |
| set_column_for_json_rows(&mut rows[base..], row_count, col, col_name); |
| }); |
| base += row_count; |
| }); |
| } |
| |
| rows |
| } |
| |
| /// This trait defines how to format a sequence of JSON objects to a |
| /// byte stream. |
| pub trait JsonFormat: Debug + Default { |
| #[inline] |
| /// write any bytes needed at the start of the file to the writer |
| fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<()> { |
| Ok(()) |
| } |
| |
| #[inline] |
| /// write any bytes needed for the start of each row |
| fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> { |
| Ok(()) |
| } |
| |
| #[inline] |
| /// write any bytes needed for the end of each row |
| fn end_row<W: Write>(&self, _writer: &mut W) -> Result<()> { |
| Ok(()) |
| } |
| |
| /// write any bytes needed for the start of each row |
| fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<()> { |
| Ok(()) |
| } |
| } |
| |
| /// Produces JSON output with one record per line. For example |
| /// |
| /// ```json |
| /// {"foo":1} |
| /// {"bar":1} |
| /// |
| /// ``` |
| #[derive(Debug, Default)] |
| pub struct LineDelimited {} |
| |
| impl JsonFormat for LineDelimited { |
| fn end_row<W: Write>(&self, writer: &mut W) -> Result<()> { |
| writer.write_all(b"\n")?; |
| Ok(()) |
| } |
| } |
| |
| /// Produces JSON output as a single JSON array. For example |
| /// |
| /// ```json |
| /// [{"foo":1},{"bar":1}] |
| /// ``` |
| #[derive(Debug, Default)] |
| pub struct JsonArray {} |
| |
| impl JsonFormat for JsonArray { |
| fn start_stream<W: Write>(&self, writer: &mut W) -> Result<()> { |
| writer.write_all(b"[")?; |
| Ok(()) |
| } |
| |
| fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<()> { |
| if !is_first_row { |
| writer.write_all(b",")?; |
| } |
| Ok(()) |
| } |
| |
| fn end_stream<W: Write>(&self, writer: &mut W) -> Result<()> { |
| writer.write_all(b"]")?; |
| Ok(()) |
| } |
| } |
| |
| /// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON objects |
| pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>; |
| |
| /// A JSON writer which serializes [`RecordBatch`]es to JSON arrays |
| pub type ArrayWriter<W> = Writer<W, JsonArray>; |
| |
| /// A JSON writer which serializes [`RecordBatch`]es to a stream of |
| /// `u8` encoded JSON objects. See the module level documentation for |
| /// detailed usage and examples. The specific format of the stream is |
| /// controlled by the [`JsonFormat`] type parameter. |
| #[derive(Debug)] |
| pub struct Writer<W, F> |
| where |
| W: Write, |
| F: JsonFormat, |
| { |
| /// Underlying writer to use to write bytes |
| writer: W, |
| |
| /// Has the writer output any records yet? |
| started: bool, |
| |
| /// Is the writer finished? |
| finished: bool, |
| |
| /// Determines how the byte stream is formatted |
| format: F, |
| } |
| |
| impl<W, F> Writer<W, F> |
| where |
| W: Write, |
| F: JsonFormat, |
| { |
| /// Construct a new writer |
| pub fn new(writer: W) -> Self { |
| Self { |
| writer, |
| started: false, |
| finished: false, |
| format: F::default(), |
| } |
| } |
| |
| /// Write a single JSON row to the output writer |
| pub fn write_row(&mut self, row: &Value) -> Result<()> { |
| let is_first_row = !self.started; |
| if !self.started { |
| self.format.start_stream(&mut self.writer)?; |
| self.started = true; |
| } |
| |
| self.format.start_row(&mut self.writer, is_first_row)?; |
| self.writer.write_all(&serde_json::to_vec(row)?)?; |
| self.format.end_row(&mut self.writer)?; |
| Ok(()) |
| } |
| |
| /// Convert the [`RecordBatch`] into JSON rows, and write them to the output |
| pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> { |
| for row in record_batches_to_json_rows(batches) { |
| self.write_row(&Value::Object(row))?; |
| } |
| Ok(()) |
| } |
| |
| /// Finishes the output stream. This function must be called after |
| /// all record batches have been produced. (e.g. producing the final `']'` if writing |
| /// arrays. |
| pub fn finish(&mut self) -> Result<()> { |
| if self.started && !self.finished { |
| self.format.end_stream(&mut self.writer)?; |
| self.finished = true; |
| } |
| Ok(()) |
| } |
| |
| /// Unwraps this `Writer<W>`, returning the underlying writer |
| pub fn into_inner(self) -> W { |
| self.writer |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use std::convert::TryFrom; |
| use std::fs::{read_to_string, File}; |
| use std::sync::Arc; |
| |
| use serde_json::json; |
| |
| use crate::buffer::*; |
| use crate::json::reader::*; |
| |
| use super::*; |
| |
| #[test] |
| fn write_simple_rows() { |
| let schema = Schema::new(vec![ |
| Field::new("c1", DataType::Int32, true), |
| Field::new("c2", DataType::Utf8, true), |
| ]); |
| |
| let a = Int32Array::from(vec![Some(1), Some(2), Some(3), None, Some(5)]); |
| let b = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); |
| |
| let batch = |
| RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"c1":1,"c2":"a"} |
| {"c1":2,"c2":"b"} |
| {"c1":3,"c2":"c"} |
| {"c2":"d"} |
| {"c1":5} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_dictionary() { |
| let schema = Schema::new(vec![ |
| Field::new( |
| "c1", |
| DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), |
| true, |
| ), |
| Field::new( |
| "c2", |
| DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), |
| true, |
| ), |
| ]); |
| |
| let a: DictionaryArray<Int32Type> = vec![ |
| Some("cupcakes"), |
| Some("foo"), |
| Some("foo"), |
| None, |
| Some("cupcakes"), |
| ] |
| .into_iter() |
| .collect(); |
| let b: DictionaryArray<Int8Type> = |
| vec![Some("sdsd"), Some("sdsd"), None, Some("sd"), Some("sdsd")] |
| .into_iter() |
| .collect(); |
| |
| let batch = |
| RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"c1":"cupcakes","c2":"sdsd"} |
| {"c1":"foo","c2":"sdsd"} |
| {"c1":"foo"} |
| {"c2":"sd"} |
| {"c1":"cupcakes","c2":"sdsd"} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_timestamps() { |
| let ts_string = "2018-11-13T17:11:10.011375885995"; |
| let ts_nanos = ts_string |
| .parse::<chrono::NaiveDateTime>() |
| .unwrap() |
| .timestamp_nanos(); |
| let ts_micros = ts_nanos / 1000; |
| let ts_millis = ts_micros / 1000; |
| let ts_secs = ts_millis / 1000; |
| |
| let arr_nanos = |
| TimestampNanosecondArray::from_opt_vec(vec![Some(ts_nanos), None], None); |
| let arr_micros = |
| TimestampMicrosecondArray::from_opt_vec(vec![Some(ts_micros), None], None); |
| let arr_millis = |
| TimestampMillisecondArray::from_opt_vec(vec![Some(ts_millis), None], None); |
| let arr_secs = |
| TimestampSecondArray::from_opt_vec(vec![Some(ts_secs), None], None); |
| let arr_names = StringArray::from(vec![Some("a"), Some("b")]); |
| |
| let schema = Schema::new(vec![ |
| Field::new("nanos", arr_nanos.data_type().clone(), false), |
| Field::new("micros", arr_micros.data_type().clone(), false), |
| Field::new("millis", arr_millis.data_type().clone(), false), |
| Field::new("secs", arr_secs.data_type().clone(), false), |
| Field::new("name", arr_names.data_type().clone(), false), |
| ]); |
| let schema = Arc::new(schema); |
| |
| let batch = RecordBatch::try_new( |
| schema, |
| vec![ |
| Arc::new(arr_nanos), |
| Arc::new(arr_micros), |
| Arc::new(arr_millis), |
| Arc::new(arr_secs), |
| Arc::new(arr_names), |
| ], |
| ) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"nanos":"2018-11-13 17:11:10.011375885","micros":"2018-11-13 17:11:10.011375","millis":"2018-11-13 17:11:10.011","secs":"2018-11-13 17:11:10","name":"a"} |
| {"name":"b"} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_dates() { |
| let ts_string = "2018-11-13T17:11:10.011375885995"; |
| let ts_millis = ts_string |
| .parse::<chrono::NaiveDateTime>() |
| .unwrap() |
| .timestamp_millis(); |
| |
| let arr_date32 = Date32Array::from(vec![ |
| Some(i32::try_from(ts_millis / 1000 / (60 * 60 * 24)).unwrap()), |
| None, |
| ]); |
| let arr_date64 = Date64Array::from(vec![Some(ts_millis), None]); |
| let arr_names = StringArray::from(vec![Some("a"), Some("b")]); |
| |
| let schema = Schema::new(vec![ |
| Field::new("date32", arr_date32.data_type().clone(), false), |
| Field::new("date64", arr_date64.data_type().clone(), false), |
| Field::new("name", arr_names.data_type().clone(), false), |
| ]); |
| let schema = Arc::new(schema); |
| |
| let batch = RecordBatch::try_new( |
| schema, |
| vec![ |
| Arc::new(arr_date32), |
| Arc::new(arr_date64), |
| Arc::new(arr_names), |
| ], |
| ) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"date32":"2018-11-13","date64":"2018-11-13","name":"a"} |
| {"name":"b"} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_times() { |
| let arr_time32sec = Time32SecondArray::from(vec![Some(120), None]); |
| let arr_time32msec = Time32MillisecondArray::from(vec![Some(120), None]); |
| let arr_time64usec = Time64MicrosecondArray::from(vec![Some(120), None]); |
| let arr_time64nsec = Time64NanosecondArray::from(vec![Some(120), None]); |
| let arr_names = StringArray::from(vec![Some("a"), Some("b")]); |
| |
| let schema = Schema::new(vec![ |
| Field::new("time32sec", arr_time32sec.data_type().clone(), false), |
| Field::new("time32msec", arr_time32msec.data_type().clone(), false), |
| Field::new("time64usec", arr_time64usec.data_type().clone(), false), |
| Field::new("time64nsec", arr_time64nsec.data_type().clone(), false), |
| Field::new("name", arr_names.data_type().clone(), false), |
| ]); |
| let schema = Arc::new(schema); |
| |
| let batch = RecordBatch::try_new( |
| schema, |
| vec![ |
| Arc::new(arr_time32sec), |
| Arc::new(arr_time32msec), |
| Arc::new(arr_time64usec), |
| Arc::new(arr_time64nsec), |
| Arc::new(arr_names), |
| ], |
| ) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"time32sec":"00:02:00","time32msec":"00:00:00.120","time64usec":"00:00:00.000120","time64nsec":"00:00:00.000000120","name":"a"} |
| {"name":"b"} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_durations() { |
| let arr_durationsec = DurationSecondArray::from(vec![Some(120), None]); |
| let arr_durationmsec = DurationMillisecondArray::from(vec![Some(120), None]); |
| let arr_durationusec = DurationMicrosecondArray::from(vec![Some(120), None]); |
| let arr_durationnsec = DurationNanosecondArray::from(vec![Some(120), None]); |
| let arr_names = StringArray::from(vec![Some("a"), Some("b")]); |
| |
| let schema = Schema::new(vec![ |
| Field::new("duration_sec", arr_durationsec.data_type().clone(), false), |
| Field::new("duration_msec", arr_durationmsec.data_type().clone(), false), |
| Field::new("duration_usec", arr_durationusec.data_type().clone(), false), |
| Field::new("duration_nsec", arr_durationnsec.data_type().clone(), false), |
| Field::new("name", arr_names.data_type().clone(), false), |
| ]); |
| let schema = Arc::new(schema); |
| |
| let batch = RecordBatch::try_new( |
| schema, |
| vec![ |
| Arc::new(arr_durationsec), |
| Arc::new(arr_durationmsec), |
| Arc::new(arr_durationusec), |
| Arc::new(arr_durationnsec), |
| Arc::new(arr_names), |
| ], |
| ) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"duration_sec":"PT120S","duration_msec":"PT0.120S","duration_usec":"PT0.000120S","duration_nsec":"PT0.000000120S","name":"a"} |
| {"name":"b"} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_nested_structs() { |
| let schema = Schema::new(vec![ |
| Field::new( |
| "c1", |
| DataType::Struct(vec![ |
| Field::new("c11", DataType::Int32, false), |
| Field::new( |
| "c12", |
| DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)]), |
| false, |
| ), |
| ]), |
| false, |
| ), |
| Field::new("c2", DataType::Utf8, false), |
| ]); |
| |
| let c1 = StructArray::from(vec![ |
| ( |
| Field::new("c11", DataType::Int32, false), |
| Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef, |
| ), |
| ( |
| Field::new( |
| "c12", |
| DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)]), |
| false, |
| ), |
| Arc::new(StructArray::from(vec![( |
| Field::new("c121", DataType::Utf8, false), |
| Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) |
| as ArrayRef, |
| )])) as ArrayRef, |
| ), |
| ]); |
| let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]); |
| |
| let batch = |
| RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"} |
| {"c1":{"c12":{"c121":"f"}},"c2":"b"} |
| {"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_struct_with_list_field() { |
| let field_c1 = Field::new( |
| "c1", |
| DataType::List(Box::new(Field::new("c_list", DataType::Utf8, false))), |
| false, |
| ); |
| let field_c2 = Field::new("c2", DataType::Int32, false); |
| let schema = Schema::new(vec![field_c1.clone(), field_c2]); |
| |
| let a_values = StringArray::from(vec!["a", "a1", "b", "c", "d", "e"]); |
| // list column rows: ["a", "a1"], ["b"], ["c"], ["d"], ["e"] |
| let a_value_offsets = Buffer::from(&[0, 2, 3, 4, 5, 6].to_byte_slice()); |
| let a_list_data = ArrayData::builder(field_c1.data_type().clone()) |
| .len(5) |
| .add_buffer(a_value_offsets) |
| .add_child_data(a_values.data().clone()) |
| .null_bit_buffer(Buffer::from(vec![0b00011111])) |
| .build(); |
| let a = ListArray::from(a_list_data); |
| |
| let b = Int32Array::from(vec![1, 2, 3, 4, 5]); |
| |
| let batch = |
| RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"c1":["a","a1"],"c2":1} |
| {"c1":["b"],"c2":2} |
| {"c1":["c"],"c2":3} |
| {"c1":["d"],"c2":4} |
| {"c1":["e"],"c2":5} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_nested_list() { |
| let list_inner_type = Field::new( |
| "a", |
| DataType::List(Box::new(Field::new("b", DataType::Int32, false))), |
| false, |
| ); |
| let field_c1 = Field::new( |
| "c1", |
| DataType::List(Box::new(list_inner_type.clone())), |
| false, |
| ); |
| let field_c2 = Field::new("c2", DataType::Utf8, false); |
| let schema = Schema::new(vec![field_c1.clone(), field_c2]); |
| |
| // list column rows: [[1, 2], [3]], [], [[4, 5, 6]] |
| let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); |
| |
| let a_value_offsets = Buffer::from(&[0, 2, 3, 6].to_byte_slice()); |
| // Construct a list array from the above two |
| let a_list_data = ArrayData::builder(list_inner_type.data_type().clone()) |
| .len(3) |
| .add_buffer(a_value_offsets) |
| .null_bit_buffer(Buffer::from(vec![0b00000111])) |
| .add_child_data(a_values.data().clone()) |
| .build(); |
| |
| let c1_value_offsets = Buffer::from(&[0, 2, 2, 3].to_byte_slice()); |
| let c1_list_data = ArrayData::builder(field_c1.data_type().clone()) |
| .len(3) |
| .add_buffer(c1_value_offsets) |
| .add_child_data(a_list_data) |
| .build(); |
| |
| let c1 = ListArray::from(c1_list_data); |
| let c2 = StringArray::from(vec![Some("foo"), Some("bar"), None]); |
| |
| let batch = |
| RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"c1":[[1,2],[3]],"c2":"foo"} |
| {"c1":[],"c2":"bar"} |
| {"c1":[[4,5,6]]} |
| "# |
| ); |
| } |
| |
| #[test] |
| fn write_list_of_struct() { |
| let field_c1 = Field::new( |
| "c1", |
| DataType::List(Box::new(Field::new( |
| "s", |
| DataType::Struct(vec![ |
| Field::new("c11", DataType::Int32, false), |
| Field::new( |
| "c12", |
| DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)]), |
| false, |
| ), |
| ]), |
| false, |
| ))), |
| true, |
| ); |
| let field_c2 = Field::new("c2", DataType::Int32, false); |
| let schema = Schema::new(vec![field_c1.clone(), field_c2]); |
| |
| let struct_values = StructArray::from(vec![ |
| ( |
| Field::new("c11", DataType::Int32, false), |
| Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef, |
| ), |
| ( |
| Field::new( |
| "c12", |
| DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)]), |
| false, |
| ), |
| Arc::new(StructArray::from(vec![( |
| Field::new("c121", DataType::Utf8, false), |
| Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) |
| as ArrayRef, |
| )])) as ArrayRef, |
| ), |
| ]); |
| |
| // list column rows (c1): |
| // [{"c11": 1, "c12": {"c121": "e"}}, {"c12": {"c121": "f"}}], |
| // null, |
| // [{"c11": 5, "c12": {"c121": "g"}}] |
| let c1_value_offsets = Buffer::from(&[0, 2, 2, 3].to_byte_slice()); |
| let c1_list_data = ArrayData::builder(field_c1.data_type().clone()) |
| .len(3) |
| .add_buffer(c1_value_offsets) |
| .add_child_data(struct_values.data().clone()) |
| .null_bit_buffer(Buffer::from(vec![0b00000101])) |
| .build(); |
| let c1 = ListArray::from(c1_list_data); |
| |
| let c2 = Int32Array::from(vec![1, 2, 3]); |
| |
| let batch = |
| RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]) |
| .unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| assert_eq!( |
| String::from_utf8(buf).unwrap(), |
| r#"{"c1":[{"c11":1,"c12":{"c121":"e"}},{"c12":{"c121":"f"}}],"c2":1} |
| {"c2":2} |
| {"c1":[{"c11":5,"c12":{"c121":"g"}}],"c2":3} |
| "# |
| ); |
| } |
| |
| fn test_write_for_file(test_file: &str) { |
| let builder = ReaderBuilder::new() |
| .infer_schema(None) |
| .with_batch_size(1024); |
| let mut reader: Reader<File> = builder |
| .build::<File>(File::open(test_file).unwrap()) |
| .unwrap(); |
| let batch = reader.next().unwrap().unwrap(); |
| |
| let mut buf = Vec::new(); |
| { |
| let mut writer = LineDelimitedWriter::new(&mut buf); |
| writer.write_batches(&[batch]).unwrap(); |
| } |
| |
| let result = String::from_utf8(buf).unwrap(); |
| let expected = read_to_string(test_file).unwrap(); |
| for (r, e) in result.lines().zip(expected.lines()) { |
| let mut expected_json = serde_json::from_str::<Value>(e).unwrap(); |
| // remove null value from object to make comparision consistent: |
| if let Value::Object(obj) = expected_json { |
| expected_json = Value::Object( |
| obj.into_iter().filter(|(_, v)| *v != Value::Null).collect(), |
| ); |
| } |
| assert_eq!(serde_json::from_str::<Value>(r).unwrap(), expected_json,); |
| } |
| } |
| |
| #[test] |
| fn write_basic_rows() { |
| test_write_for_file("test/data/basic.json"); |
| } |
| |
| #[test] |
| fn write_arrays() { |
| test_write_for_file("test/data/arrays.json"); |
| } |
| |
| #[test] |
| fn write_basic_nulls() { |
| test_write_for_file("test/data/basic_nulls.json"); |
| } |
| |
| #[test] |
| fn json_writer_empty() { |
| let mut writer = ArrayWriter::new(vec![] as Vec<u8>); |
| writer.finish().unwrap(); |
| assert_eq!(String::from_utf8(writer.into_inner()).unwrap(), ""); |
| } |
| #[test] |
| fn json_writer_one_row() { |
| let mut writer = ArrayWriter::new(vec![] as Vec<u8>); |
| let v = json!({ "an": "object" }); |
| writer.write_row(&v).unwrap(); |
| writer.finish().unwrap(); |
| assert_eq!( |
| String::from_utf8(writer.into_inner()).unwrap(), |
| r#"[{"an":"object"}]"# |
| ); |
| } |
| |
| #[test] |
| fn json_writer_two_rows() { |
| let mut writer = ArrayWriter::new(vec![] as Vec<u8>); |
| let v = json!({ "an": "object" }); |
| writer.write_row(&v).unwrap(); |
| let v = json!({ "another": "object" }); |
| writer.write_row(&v).unwrap(); |
| writer.finish().unwrap(); |
| assert_eq!( |
| String::from_utf8(writer.into_inner()).unwrap(), |
| r#"[{"an":"object"},{"another":"object"}]"# |
| ); |
| } |
| } |