/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::error::CoreError;
use crate::error::Result;
use crate::metadata::meta_field::MetaField;
use apache_avro::schema::Schema as AvroSchema;
use apache_avro::types::Value as AvroValue;
use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use once_cell::sync::Lazy;
use serde_json::Value as JsonValue;
use std::fs;
use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
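/// Lazily loaded JSON representation of the `HoodieDeleteRecord` Avro schema,
/// read once from the bundled `schemas/HoodieDeleteRecord.avsc` file.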
static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy<Result<JsonValue>> = Lazy::new(|| {
let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("schemas")
.join("HoodieDeleteRecord.avsc");
let content = fs::read_to_string(schema_path)
.map_err(|e| CoreError::Schema(format!("Failed to read schema file: {}", e)))?;
serde_json::from_str(&content)
.map_err(|e| CoreError::Schema(format!("Failed to parse schema to JSON: {}", e)))
});
// TODO: further improve perf by caching this whole function's result with once_cell, keyed on table config,
// OR make the Avro-Arrow conversion work for multiple union types
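/// Builds the Avro schema for a single delete record by narrowing the
/// `orderingVal` union in the bundled `HoodieDeleteRecord` schema down to
/// `null` plus the concrete type carried by the given value.
///
/// A minimal usage sketch (mirroring the unit test below; the field values are
/// illustrative):
///
/// ```ignore
/// use apache_avro::types::Value as AvroValue;
///
/// let delete_record = AvroValue::Record(vec![
///     ("recordKey".to_string(), AvroValue::String("key1".to_string())),
///     ("partitionPath".to_string(), AvroValue::String("path1".to_string())),
///     // Union position 1 selects `int` as the ordering value type.
///     ("orderingVal".to_string(), AvroValue::Union(1, Box::new(AvroValue::Int(42)))),
/// ]);
/// let schema = avro_schema_for_delete_record(&delete_record)?;
/// ```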
pub fn avro_schema_for_delete_record(delete_record_value: &AvroValue) -> Result<AvroSchema> {
let fields = match delete_record_value {
AvroValue::Record(fields) => fields,
_ => {
return Err(CoreError::Schema(
"Expected a record for delete record schema".to_string(),
))
}
};
// orderingVal is always at field position 2; guard against malformed records
// before indexing into the fields
if fields.len() < 3 {
return Err(CoreError::Schema(
"Expected 3 fields (recordKey, partitionPath, orderingVal) in delete record".to_string(),
));
}
// Extract the ordering value type position from the union
let ordering_val = &fields[2].1;
let ordering_val_type_pos = match ordering_val {
AvroValue::Union(type_pos, _) => {
if *type_pos == 0 {
return Err(CoreError::Schema(
"Ordering value type position must not be 0 (null) in delete log block"
.to_string(),
));
}
*type_pos
}
_ => {
return Err(CoreError::Schema(
"Expected a union for ordering value in delete record schema".to_string(),
))
}
};
let json = DELETE_RECORD_AVRO_SCHEMA_IN_JSON
.as_ref()
.map_err(|e| CoreError::Schema(e.to_string()))?;
let mut json = json.clone();
// Modify the orderingVal type array to keep only null and the selected type
{
let type_array = json
.get_mut("fields")
.and_then(|v| v.as_array_mut())
.and_then(|fields| fields.get_mut(2)) // orderingVal is always at position 2
.and_then(|field| field.get_mut("type"))
.and_then(|v| v.as_array_mut())
.ok_or_else(|| {
CoreError::Schema("Could not access orderingVal type array in schema".to_string())
})?;
// Validate bounds
if ordering_val_type_pos as usize >= type_array.len() {
return Err(CoreError::Schema(format!(
"Type position {} is out of bounds (max: {})",
ordering_val_type_pos,
type_array.len().saturating_sub(1) // avoid underflow when the type array is empty
)));
}
// Keep only "null" (index 0) and the type at the specified position
let null_type = type_array[0].clone();
let selected_type = type_array[ordering_val_type_pos as usize].clone();
*type_array = vec![null_type, selected_type];
}
// Parse and return the modified schema
AvroSchema::parse(&json).map_err(CoreError::AvroError)
}
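/// Lazily parsed Avro schema for `HoodieDeleteRecordList`, read once from the
/// bundled `schemas/HoodieDeleteRecordList.avsc` file.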
static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy<Result<AvroSchema>> = Lazy::new(|| {
let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("schemas")
.join("HoodieDeleteRecordList.avsc");
let mut file = File::open(&schema_path)
.map_err(|e| CoreError::Schema(format!("Failed to open schema file: {}", e)))?;
AvroSchema::parse_reader(&mut file).map_err(CoreError::AvroError)
});
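/// Returns a reference to the cached `HoodieDeleteRecordList` Avro schema.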
pub fn avro_schema_for_delete_record_list() -> Result<&'static AvroSchema> {
DELETE_RECORD_LIST_AVRO_SCHEMA
.as_ref()
.map_err(|e| CoreError::Schema(e.to_string()))
}
/// Transforms a [`RecordBatch`] of delete records (`recordKey`, `partitionPath`,
/// `orderingVal`) into a new [`RecordBatch`] that prepends the `_hoodie_commit_time`
/// meta column and renames `orderingVal` to the given ordering field.
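///
/// A minimal sketch of the expected call (mirroring the unit tests; `"seq"` is
/// an illustrative ordering field name, and `batch` must carry `recordKey`,
/// `partitionPath`, and `orderingVal` at positions 0, 1, and 2):
///
/// ```ignore
/// let transformed = transform_delete_record_batch(&batch, "20240101000000", "seq")?;
/// assert_eq!(transformed.num_columns(), 4); // commit time + the three original columns
/// ```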
pub fn transform_delete_record_batch(
batch: &RecordBatch,
commit_time: &str,
ordering_field: &str,
) -> Result<RecordBatch> {
let num_rows = batch.num_rows();
// Create the new _hoodie_commit_time column
let commit_time_array = Arc::new(StringArray::from(vec![commit_time.to_string(); num_rows]));
// Guard against malformed input before indexing columns by position
if batch.num_columns() < 3 {
return Err(CoreError::Schema(
"Expected 3 columns (recordKey, partitionPath, orderingVal) in delete record batch".to_string(),
));
}
// Get the original column data directly by position
let record_key_array = batch.column(0).clone(); // recordKey at pos 0
let partition_path_array = batch.column(1).clone(); // partitionPath at pos 1
let ordering_val_array = batch.column(2).clone(); // orderingVal at pos 2
// Create new columns vector with the new order
let new_columns = vec![
commit_time_array,
record_key_array,
partition_path_array,
ordering_val_array,
];
let new_fields = vec![
Arc::new(Field::new(
MetaField::CommitTime.as_ref(),
DataType::Utf8,
true,
)),
Arc::new(Field::new(
MetaField::RecordKey.as_ref(),
DataType::Utf8,
true,
)),
Arc::new(Field::new(
MetaField::PartitionPath.as_ref(),
DataType::Utf8,
true,
)),
Arc::new(Field::new(
ordering_field,
batch.schema().field(2).data_type().clone(),
batch.schema().field(2).is_nullable(),
)),
];
let new_schema = SchemaRef::from(Schema::new(new_fields));
RecordBatch::try_new(new_schema, new_columns).map_err(CoreError::ArrowError)
}
#[cfg(test)]
mod tests {
use super::*;
use apache_avro::schema::{DecimalSchema, RecordField, RecordSchema};
use arrow_array::{Array, Int64Array};
fn validate_delete_fields(
fields: &[RecordField],
minimized_ordering_field_type: Option<AvroSchema>,
) {
assert_eq!(fields.len(), 3);
assert_eq!(fields[0].name, "recordKey");
assert_eq!(fields[1].name, "partitionPath");
assert_eq!(fields[2].name, "orderingVal");
let record_key_field = &fields[0];
match record_key_field.clone().schema {
AvroSchema::Union(union) => {
assert_eq!(union.variants().len(), 2);
assert_eq!(union.variants()[0], AvroSchema::Null);
assert_eq!(union.variants()[1], AvroSchema::String);
}
_ => panic!("Expected a Union schema for recordKey"),
}
let partition_path_field = &fields[1];
match partition_path_field.clone().schema {
AvroSchema::Union(union) => {
assert_eq!(union.variants().len(), 2);
assert_eq!(union.variants()[0], AvroSchema::Null);
assert_eq!(union.variants()[1], AvroSchema::String);
}
_ => panic!("Expected a Union schema for partitionPath"),
}
let ordering_field = &fields[2];
match ordering_field.clone().schema {
AvroSchema::Union(union) => {
if let Some(minimized_type) = minimized_ordering_field_type {
assert_eq!(union.variants().len(), 2);
assert_eq!(union.variants()[0], AvroSchema::Null);
assert_eq!(union.variants()[1], minimized_type);
} else {
assert_eq!(union.variants().len(), 13);
assert_eq!(union.variants()[0], AvroSchema::Null);
assert_eq!(union.variants()[1], AvroSchema::Int);
assert_eq!(union.variants()[2], AvroSchema::Long);
assert_eq!(union.variants()[3], AvroSchema::Float);
assert_eq!(union.variants()[4], AvroSchema::Double);
assert_eq!(union.variants()[5], AvroSchema::Bytes);
assert_eq!(union.variants()[6], AvroSchema::String);
assert_eq!(
union.variants()[7],
AvroSchema::Decimal(DecimalSchema {
precision: 30,
scale: 15,
inner: Box::new(AvroSchema::Bytes)
})
);
assert_eq!(union.variants()[8], AvroSchema::Date);
assert_eq!(union.variants()[9], AvroSchema::TimeMillis);
assert_eq!(union.variants()[10], AvroSchema::TimeMicros);
assert_eq!(union.variants()[11], AvroSchema::TimestampMillis);
assert_eq!(union.variants()[12], AvroSchema::TimestampMicros);
}
}
_ => panic!("Expected a Union schema for orderingVal"),
}
}
#[test]
fn test_schema_for_delete_record() {
let delete_record_value = AvroValue::Record(vec![
(
"recordKey".to_string(),
AvroValue::String("key1".to_string()),
),
(
"partitionPath".to_string(),
AvroValue::String("path1".to_string()),
),
(
"orderingVal".to_string(),
AvroValue::Union(1, Box::new(AvroValue::Int(42))),
),
]);
let schema = avro_schema_for_delete_record(&delete_record_value).unwrap();
let schema_name = schema.name().unwrap().clone();
assert_eq!(schema_name.namespace.unwrap(), "org.apache.hudi.avro.model");
assert_eq!(schema_name.name, "HoodieDeleteRecord");
match schema {
AvroSchema::Record(RecordSchema { fields, .. }) => {
validate_delete_fields(&fields, Some(AvroSchema::Int));
}
_ => panic!("Expected a Record schema"),
}
}
#[test]
fn test_schema_for_delete_record_list() {
let schema = avro_schema_for_delete_record_list().unwrap();
let schema_name = schema.name().unwrap().clone();
assert_eq!(schema_name.namespace.unwrap(), "org.apache.hudi.avro.model");
assert_eq!(schema_name.name, "HoodieDeleteRecordList");
match schema {
AvroSchema::Record(RecordSchema { fields, .. }) => {
assert_eq!(fields.len(), 1);
assert_eq!(fields[0].name, "deleteRecordList");
let delete_record_list_field = &fields[0];
match delete_record_list_field.clone().schema {
AvroSchema::Array(array_schema) => {
let item_schema_name = array_schema.items.name().unwrap().clone();
assert_eq!(
item_schema_name.namespace.unwrap(),
"org.apache.hudi.avro.model"
);
assert_eq!(item_schema_name.name, "HoodieDeleteRecord");
let item_schema = array_schema.items;
match item_schema.as_ref() {
AvroSchema::Record(RecordSchema { fields, .. }) => {
validate_delete_fields(fields, None);
}
_ => panic!("Expected a Record schema for items in deleteRecordList"),
}
}
_ => panic!("Expected an Array schema for deleteRecordList"),
}
}
_ => panic!("Expected a Record schema at the top level"),
}
}
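// Helper function to create the schema shared by the test record batches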
fn create_test_delete_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("recordKey", DataType::Utf8, false),
Field::new("partitionPath", DataType::Utf8, true),
Field::new("orderingVal", DataType::Int64, false),
]))
}
// Helper function to create a test RecordBatch
fn create_test_delete_record_batch() -> RecordBatch {
let schema = create_test_delete_schema();
let record_keys = Arc::new(StringArray::from(vec!["key1", "key2", "key3"]));
let partition_paths = Arc::new(StringArray::from(vec![
Some("path1"),
Some("path2"),
Some("path3"),
]));
let ordering_vals = Arc::new(Int64Array::from(vec![100, 200, 300]));
RecordBatch::try_new(schema, vec![record_keys, partition_paths, ordering_vals]).unwrap()
}
// Helper function to create empty RecordBatch
fn create_empty_delete_record_batch() -> RecordBatch {
let schema = create_test_delete_schema();
let record_keys = Arc::new(StringArray::from(Vec::<&str>::new()));
let partition_paths = Arc::new(StringArray::from(Vec::<Option<&str>>::new()));
let ordering_vals = Arc::new(Int64Array::from(Vec::<i64>::new()));
RecordBatch::try_new(schema, vec![record_keys, partition_paths, ordering_vals]).unwrap()
}
#[test]
fn test_transform_delete_record_batch_basic() {
let batch = create_test_delete_record_batch();
let commit_time = "20240101000000";
let ordering_field = "sequenceNumber";
let result = transform_delete_record_batch(&batch, commit_time, ordering_field).unwrap();
// Check number of rows preserved
assert_eq!(result.num_rows(), 3);
// Check number of columns (should be 4: commit time + recordKey + partitionPath + renamed ordering field)
assert_eq!(result.num_columns(), 4);
// Check schema field names
let schema = result.schema();
assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
assert_eq!(schema.field(3).name(), ordering_field);
// Check commit_time column values
let commit_time_array = result
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(commit_time_array.len(), 3);
assert_eq!(commit_time_array.value(0), commit_time);
assert_eq!(commit_time_array.value(1), commit_time);
assert_eq!(commit_time_array.value(2), commit_time);
// Check record key values preserved
let record_key_array = result
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(record_key_array.value(0), "key1");
assert_eq!(record_key_array.value(1), "key2");
assert_eq!(record_key_array.value(2), "key3");
// Check partition path values preserved (column is nullable; all values are non-null here)
let partition_path_array = result
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(partition_path_array.value(0), "path1");
assert_eq!(partition_path_array.value(1), "path2");
assert_eq!(partition_path_array.value(2), "path3");
// Check ordering value column values preserved
let ordering_val_array = result
.column(3)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(ordering_val_array.value(0), 100);
assert_eq!(ordering_val_array.value(1), 200);
assert_eq!(ordering_val_array.value(2), 300);
}
#[test]
fn test_transform_delete_record_batch_empty() {
let batch = create_empty_delete_record_batch();
let commit_time = "20240101000000";
let ordering_field = "sequenceNumber";
let result = transform_delete_record_batch(&batch, commit_time, ordering_field).unwrap();
// Check empty batch handling
assert_eq!(result.num_rows(), 0);
assert_eq!(result.num_columns(), 4);
// Check schema is still correct
let schema = result.schema();
assert_eq!(schema.field(0).name(), MetaField::CommitTime.as_ref());
assert_eq!(schema.field(1).name(), MetaField::RecordKey.as_ref());
assert_eq!(schema.field(2).name(), MetaField::PartitionPath.as_ref());
assert_eq!(schema.field(3).name(), "sequenceNumber");
// Check all arrays are empty
for i in 0..4 {
assert_eq!(result.column(i).len(), 0);
}
}
#[test]
fn test_commit_time_values() {
let batch = create_test_delete_record_batch();
let commit_times = ["20240101000000", "20231225123045", "20240630235959"];
for commit_time in commit_times {
let result = transform_delete_record_batch(&batch, commit_time, "seq").unwrap();
let commit_time_array = result
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
// All rows should have the same commit time
for i in 0..result.num_rows() {
assert_eq!(commit_time_array.value(i), commit_time);
}
}
}
}