ARROW-12267: [Rust] Implement support for timestamps in JSON writer
# Rationale
I want to write timestamps to JSON (e.g. https://github.com/influxdata/influxdb_iox/pull/1104) and when I did so I got a panic :)
When I tried to write a`TimestampNanosecondArray` I got the following error:
```
thread 'influxdb_ioxd::http::tests::test_query_json' panicked at 'Unsupported datatype: Timestamp(
Nanosecond,
None,
)', /Users/alamb/.cargo/git/checkouts/arrow-3a9cfebb6b7b2bdc/3e825a7/rust/arrow/src/json/writer.rs:326:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
# Changes
Add support for `DataType::Timestamp`, and tests for same
I did not add support for the other date / time / duration types, but instead filed a ticket to add such support: https://issues.apache.org/jira/browse/ARROW-12317
Closes #9968 from alamb/ARROW-12267/json_writer
Authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
diff --git a/rust/arrow/src/json/writer.rs b/rust/arrow/src/json/writer.rs
index 0716b56..c872b72 100644
--- a/rust/arrow/src/json/writer.rs
+++ b/rust/arrow/src/json/writer.rs
@@ -219,6 +219,24 @@
};
}
+macro_rules! set_timestamp_column_by_array_type {
+ ($array_type:ident, $col_name:ident, $rows:ident, $array:ident, $row_count:ident) => {
+ let arr = $array.as_any().downcast_ref::<$array_type>().unwrap();
+
+ $rows
+ .iter_mut()
+ .enumerate()
+ .take($row_count)
+ .for_each(|(i, row)| {
+ if !arr.is_null(i) {
+ if let Some(v) = arr.value_as_datetime(i) {
+ row.insert($col_name.to_string(), v.to_string().into());
+ }
+ }
+ });
+ };
+}
+
fn set_column_by_primitive_type<T: ArrowPrimitiveType>(
rows: &mut [JsonMap<String, Value>],
row_count: usize,
@@ -284,6 +302,42 @@
DataType::Utf8 => {
set_column_by_array_type!(as_string_array, col_name, rows, array, row_count);
}
+ DataType::Timestamp(TimeUnit::Second, _) => {
+ set_timestamp_column_by_array_type!(
+ TimestampSecondArray,
+ col_name,
+ rows,
+ array,
+ row_count
+ );
+ }
+ DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ set_timestamp_column_by_array_type!(
+ TimestampMillisecondArray,
+ col_name,
+ rows,
+ array,
+ row_count
+ );
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ set_timestamp_column_by_array_type!(
+ TimestampMicrosecondArray,
+ col_name,
+ rows,
+ array,
+ row_count
+ );
+ }
+ DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+ set_timestamp_column_by_array_type!(
+ TimestampNanosecondArray,
+ col_name,
+ rows,
+ array,
+ row_count
+ );
+ }
DataType::Struct(_) => {
let inner_objs =
struct_array_to_jsonmap_array(as_struct_array(array), row_count);
@@ -551,6 +605,62 @@
}
#[test]
+ fn write_timestamps() {
+ let ts_string = "2018-11-13T17:11:10.011375885995";
+ let ts_nanos = ts_string
+ .parse::<chrono::NaiveDateTime>()
+ .unwrap()
+ .timestamp_nanos();
+ let ts_micros = ts_nanos / 1000;
+ let ts_millis = ts_micros / 1000;
+ let ts_secs = ts_millis / 1000;
+
+ let arr_nanos =
+ TimestampNanosecondArray::from_opt_vec(vec![Some(ts_nanos), None], None);
+ let arr_micros =
+ TimestampMicrosecondArray::from_opt_vec(vec![Some(ts_micros), None], None);
+ let arr_millis =
+ TimestampMillisecondArray::from_opt_vec(vec![Some(ts_millis), None], None);
+ let arr_secs =
+ TimestampSecondArray::from_opt_vec(vec![Some(ts_secs), None], None);
+ let arr_names = StringArray::from(vec![Some("a"), Some("b")]);
+
+ let schema = Schema::new(vec![
+ Field::new("nanos", arr_nanos.data_type().clone(), false),
+ Field::new("micros", arr_micros.data_type().clone(), false),
+ Field::new("millis", arr_millis.data_type().clone(), false),
+ Field::new("secs", arr_secs.data_type().clone(), false),
+ Field::new("name", arr_names.data_type().clone(), false),
+ ]);
+ let schema = Arc::new(schema);
+
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arr_nanos),
+ Arc::new(arr_micros),
+ Arc::new(arr_millis),
+ Arc::new(arr_secs),
+ Arc::new(arr_names),
+ ],
+ )
+ .unwrap();
+
+ let mut buf = Vec::new();
+ {
+ let mut writer = LineDelimitedWriter::new(&mut buf);
+ writer.write_batches(&[batch]).unwrap();
+ }
+
+ assert_eq!(
+ String::from_utf8(buf).unwrap(),
+ r#"{"nanos":"2018-11-13 17:11:10.011375885","micros":"2018-11-13 17:11:10.011375","millis":"2018-11-13 17:11:10.011","secs":"2018-11-13 17:11:10","name":"a"}
+{"name":"b"}
+"#
+ );
+ }
+
+ #[test]
fn write_nested_structs() {
let schema = Schema::new(vec![
Field::new(
@@ -831,7 +941,6 @@
writer.finish().unwrap();
assert_eq!(String::from_utf8(writer.into_inner()).unwrap(), "");
}
-
#[test]
fn json_writer_one_row() {
let mut writer = ArrayWriter::new(vec![] as Vec<u8>);