blob: cf572d769df5b5ef3b2015a23767ed8dbbbe907c [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Common code used in the integration test binaries
// The unused_crate_dependencies lint does not work well for crates defining additional examples/bin targets
#![allow(unused_crate_dependencies)]
#![warn(missing_docs)]
use serde_json::Value;
use arrow::array::{Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::error::{ArrowError, Result};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi_and_data_type};
use arrow::record_batch::RecordBatch;
use arrow::util::test_util::arrow_test_data;
use arrow_integration_test::*;
use std::collections::HashMap;
use std::ffi::{CStr, CString, c_char, c_int};
use std::fs::File;
use std::io::BufReader;
use std::iter::zip;
use std::ptr;
use std::sync::Arc;
/// The expected username for the basic auth integration test.
pub const AUTH_USERNAME: &str = "arrow";
/// The expected password for the basic auth integration test.
pub const AUTH_PASSWORD: &str = "flight";
/// Flight integration test scenarios exercised by the client test binary
pub mod flight_client_scenarios;
/// Flight integration test scenarios exercised by the server test binary
pub mod flight_server_scenarios;
/// An Arrow file in JSON format
pub struct ArrowFile {
    /// The schema of the file
    pub schema: Schema,
    // Dictionary batches from the file's "dictionaries" entry, keyed by
    // dictionary id, still in their JSON representation.
    // we can evolve this into a concrete Arrow type
    // this is temporarily not being read from
    dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
    // The raw parsed JSON document; record batches are decoded lazily from
    // its "batches" entry by read_batch / read_batches
    arrow_json: Value,
}
impl ArrowFile {
/// Read a single [RecordBatch] from the file
pub fn read_batch(&self, batch_num: usize) -> Result<RecordBatch> {
let b = self.arrow_json["batches"].get(batch_num).unwrap();
let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
}
/// Read all [RecordBatch]es from the file
pub fn read_batches(&self) -> Result<Vec<RecordBatch>> {
self.arrow_json["batches"]
.as_array()
.unwrap()
.iter()
.map(|b| {
let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
})
.collect()
}
}
/// Canonicalize the names of map fields in a schema
///
/// Every Map field is rewritten so its entries struct is named "entries",
/// its key field "key" (non-nullable), and its value field "value"; all
/// other fields (and the schema metadata) pass through unchanged.
///
/// # Panics
///
/// Panics if a Map field's child is not a Struct with exactly two fields.
pub fn canonicalize_schema(schema: &Schema) -> Schema {
    let canonical_fields: Fields = schema
        .fields()
        .iter()
        .map(|field| {
            // Non-map fields are passed through untouched
            let DataType::Map(entries, keys_sorted) = field.data_type() else {
                return field.clone();
            };
            match entries.data_type() {
                DataType::Struct(kv) if kv.len() == 2 => {
                    let key = Field::new("key", kv[0].data_type().clone(), false);
                    let value =
                        Field::new("value", kv[1].data_type().clone(), kv[1].is_nullable());
                    let entries_struct =
                        DataType::Struct(Fields::from([Arc::new(key), Arc::new(value)]));
                    let canonical_entries = Field::new("entries", entries_struct, false);
                    Arc::new(Field::new(
                        field.name().as_str(),
                        DataType::Map(Arc::new(canonical_entries), *keys_sorted),
                        field.is_nullable(),
                    ))
                }
                _ => panic!("The child field of Map type should be Struct type with 2 fields."),
            }
        })
        .collect();
    Schema::new(canonical_fields).with_metadata(schema.metadata().clone())
}
/// Read an Arrow file in JSON format
pub fn open_json_file(json_name: &str) -> Result<ArrowFile> {
let json_file = File::open(json_name)?;
let reader = BufReader::new(json_file);
let arrow_json: Value = serde_json::from_reader(reader).unwrap();
let schema = schema_from_json(&arrow_json["schema"])?;
// read dictionaries
let mut dictionaries = HashMap::new();
if let Some(dicts) = arrow_json.get("dictionaries") {
for d in dicts
.as_array()
.expect("Unable to get dictionaries as array")
{
let json_dict: ArrowJsonDictionaryBatch =
serde_json::from_value(d.clone()).expect("Unable to get dictionary from JSON");
// TODO: convert to a concrete Arrow type
dictionaries.insert(json_dict.id, json_dict);
}
}
Ok(ArrowFile {
schema,
dictionaries,
arrow_json,
})
}
/// Read gzipped JSON test file
///
/// For example given the input:
/// version = `0.17.1`
/// path = `generated_union`
///
/// Returns the contents of
/// `arrow-ipc-stream/integration/0.17.1/generated_union.json.gz`
///
/// # Panics
///
/// Panics if the file is missing, not valid gzip, or not valid Arrow JSON.
pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
    use flate2::read::GzDecoder;
    use std::io::Read;
    let gz_path = format!(
        "{}/arrow-ipc-stream/integration/{version}/{path}.json.gz",
        arrow_test_data()
    );
    // Decompress the whole file into a string before parsing
    let mut decoder = GzDecoder::new(File::open(gz_path).unwrap());
    let mut contents = String::new();
    decoder.read_to_string(&mut contents).unwrap();
    // convert to Arrow JSON
    serde_json::from_str(&contents).unwrap()
}
/// C Data Integration entrypoint to export the schema from a JSON file
///
/// `c_json_name` must be a valid NUL-terminated C string naming a JSON file
/// readable by [open_json_file]; `out` must point to memory valid for a write
/// of one [FFI_ArrowSchema].
fn cdata_integration_export_schema_from_json(
    c_json_name: *const c_char,
    out: *mut FFI_ArrowSchema,
) -> Result<()> {
    // SAFETY: the C caller of the exported wrapper guarantees `c_json_name`
    // is a valid NUL-terminated string pointer
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let f = open_json_file(json_name.to_str()?)?;
    let c_schema = FFI_ArrowSchema::try_from(&f.schema)?;
    // Move exported schema into output struct
    // SAFETY: the C caller guarantees `out` is valid for this write; `ptr::write`
    // does not drop any previous (possibly uninitialized) contents
    unsafe { ptr::write(out, c_schema) };
    Ok(())
}
/// C Data Integration entrypoint to export a batch from a JSON file
fn cdata_integration_export_batch_from_json(
c_json_name: *const c_char,
batch_num: c_int,
out: *mut FFI_ArrowArray,
) -> Result<()> {
let json_name = unsafe { CStr::from_ptr(c_json_name) };
let b = open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
let a = StructArray::from(b).into_data();
let c_array = FFI_ArrowArray::new(&a);
// Move exported array into output struct
unsafe { ptr::write(out, c_array) };
Ok(())
}
/// C Data Integration entrypoint: import a schema over the C Data Interface
/// and compare it against the schema read from a JSON file.
///
/// `c_json_name` must be a valid NUL-terminated C string; ownership of
/// `*c_schema` is taken and the schema is released when dropped.
fn cdata_integration_import_schema_and_compare_to_json(
    c_json_name: *const c_char,
    c_schema: *mut FFI_ArrowSchema,
) -> Result<()> {
    // SAFETY: the C caller of the exported wrapper guarantees `c_json_name`
    // is a valid NUL-terminated string pointer
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let json_schema = open_json_file(json_name.to_str()?)?.schema;
    // The source ArrowSchema will be released when this is dropped
    // SAFETY: the caller transfers ownership of the schema behind `c_schema`
    let imported_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema) };
    let imported_schema = Schema::try_from(&imported_schema)?;
    // compare schemas after canonicalizing map field names, which may
    // legitimately differ between Arrow implementations
    if canonicalize_schema(&json_schema) != canonicalize_schema(&imported_schema) {
        return Err(ArrowError::ComputeError(format!(
            "Schemas do not match.\n- JSON: {json_schema:?}\n- Imported: {imported_schema:?}",
        )));
    }
    Ok(())
}
/// Check that two record batches contain identical columns, returning an
/// error describing the kind of mismatch otherwise.
fn compare_batches(a: &RecordBatch, b: &RecordBatch) -> Result<()> {
    if a.num_columns() != b.num_columns() {
        return Err(ArrowError::InvalidArgumentError(
            "batches do not have the same number of columns".to_string(),
        ));
    }
    // Column-wise equality also covers row counts and values
    let columns_differ = zip(a.columns(), b.columns()).any(|(lhs, rhs)| lhs != rhs);
    if columns_differ {
        return Err(ArrowError::InvalidArgumentError(
            "batch columns are not the same".to_string(),
        ));
    }
    Ok(())
}
fn cdata_integration_import_batch_and_compare_to_json(
c_json_name: *const c_char,
batch_num: c_int,
c_array: *mut FFI_ArrowArray,
) -> Result<()> {
let json_name = unsafe { CStr::from_ptr(c_json_name) };
let json_batch =
open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
let schema = json_batch.schema();
let data_type_for_import = DataType::Struct(schema.fields.clone());
let imported_array = unsafe { FFI_ArrowArray::from_raw(c_array) };
let imported_array = unsafe { from_ffi_and_data_type(imported_array, data_type_for_import) }?;
imported_array.validate_full()?;
let imported_batch = RecordBatch::from(StructArray::from(imported_array));
compare_batches(&json_batch, &imported_batch)
}
/// Convert a `Result` into a C string: NULL on success, otherwise a heap
/// allocated error message that must be released with `arrow_rs_free_error`.
fn result_to_c_error<T, E: std::fmt::Display>(result: &std::result::Result<T, E>) -> *mut c_char {
    let Err(e) = result else {
        return ptr::null_mut();
    };
    CString::new(e.to_string()).unwrap().into_raw()
}
/// Release a const char* exported by result_to_c_error()
///
/// A NULL pointer is accepted and ignored.
///
/// # Safety
///
/// The pointer is assumed to have been obtained using CString::into_raw.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn arrow_rs_free_error(c_error: *mut c_char) {
    if c_error.is_null() {
        return;
    }
    // SAFETY: per this function's contract the pointer came from
    // CString::into_raw, so rebuilding the CString frees it exactly once
    let owned = unsafe { CString::from_raw(c_error) };
    drop(owned);
}
/// A C-ABI for exporting an Arrow schema from a JSON file
///
/// Returns NULL on success, otherwise an error string that must be released
/// with [arrow_rs_free_error].
#[unsafe(no_mangle)]
pub extern "C" fn arrow_rs_cdata_integration_export_schema_from_json(
    c_json_name: *const c_char,
    out: *mut FFI_ArrowSchema,
) -> *mut c_char {
    result_to_c_error(&cdata_integration_export_schema_from_json(c_json_name, out))
}
/// A C-ABI to compare an Arrow schema against a JSON file
///
/// Returns NULL on success, otherwise an error string that must be released
/// with [arrow_rs_free_error].
#[unsafe(no_mangle)]
pub extern "C" fn arrow_rs_cdata_integration_import_schema_and_compare_to_json(
    c_json_name: *const c_char,
    c_schema: *mut FFI_ArrowSchema,
) -> *mut c_char {
    result_to_c_error(&cdata_integration_import_schema_and_compare_to_json(
        c_json_name,
        c_schema,
    ))
}
/// A C-ABI for exporting a RecordBatch from a JSON file
///
/// Returns NULL on success, otherwise an error string that must be released
/// with [arrow_rs_free_error].
#[unsafe(no_mangle)]
pub extern "C" fn arrow_rs_cdata_integration_export_batch_from_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    out: *mut FFI_ArrowArray,
) -> *mut c_char {
    result_to_c_error(&cdata_integration_export_batch_from_json(
        c_json_name,
        batch_num,
        out,
    ))
}
/// A C-ABI to compare a RecordBatch against a JSON file
///
/// Returns NULL on success, otherwise an error string that must be released
/// with [arrow_rs_free_error].
#[unsafe(no_mangle)]
pub extern "C" fn arrow_rs_cdata_integration_import_batch_and_compare_to_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    c_array: *mut FFI_ArrowArray,
) -> *mut c_char {
    result_to_c_error(&cdata_integration_import_batch_and_compare_to_json(
        c_json_name,
        batch_num,
        c_array,
    ))
}