| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use serde::{Deserialize, Serialize}; |
| use std::collections::HashMap; |
| |
/// Metadata key under which the JSON-encoded Avro [`Schema`] is stored
///
/// `"avro.schema"` is the key the Avro object container file format uses for
/// the writer schema in its header metadata.
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
| |
| /// Either a [`PrimitiveType`] or a reference to a previously defined named type |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#names> |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| #[serde(untagged)] |
| pub enum TypeName<'a> { |
| Primitive(PrimitiveType), |
| Ref(&'a str), |
| } |
| |
| /// A primitive type |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types> |
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] |
| #[serde(rename_all = "camelCase")] |
| pub enum PrimitiveType { |
| Null, |
| Boolean, |
| Int, |
| Long, |
| Float, |
| Double, |
| Bytes, |
| String, |
| } |
| |
| /// Additional attributes within a [`Schema`] |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#schema-declaration> |
| #[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize, Serialize)] |
| #[serde(rename_all = "camelCase")] |
| pub struct Attributes<'a> { |
| /// A logical type name |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#logical-types> |
| #[serde(default)] |
| pub logical_type: Option<&'a str>, |
| |
| /// Additional JSON attributes |
| #[serde(flatten)] |
| pub additional: HashMap<&'a str, serde_json::Value>, |
| } |
| |
| impl<'a> Attributes<'a> { |
| /// Returns the field metadata for this [`Attributes`] |
| pub(crate) fn field_metadata(&self) -> HashMap<String, String> { |
| self.additional |
| .iter() |
| .map(|(k, v)| (k.to_string(), v.to_string())) |
| .collect() |
| } |
| } |
| |
| /// A type definition that is not a variant of [`ComplexType`] |
| #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] |
| #[serde(rename_all = "camelCase")] |
| pub struct Type<'a> { |
| #[serde(borrow)] |
| pub r#type: TypeName<'a>, |
| #[serde(flatten)] |
| pub attributes: Attributes<'a>, |
| } |
| |
| /// An Avro schema |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| #[serde(untagged)] |
| pub enum Schema<'a> { |
| #[serde(borrow)] |
| TypeName(TypeName<'a>), |
| #[serde(borrow)] |
| Union(Vec<Schema<'a>>), |
| #[serde(borrow)] |
| Complex(ComplexType<'a>), |
| #[serde(borrow)] |
| Type(Type<'a>), |
| } |
| |
| /// A complex type |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#complex-types> |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| #[serde(tag = "type", rename_all = "camelCase")] |
| pub enum ComplexType<'a> { |
| #[serde(borrow)] |
| Record(Record<'a>), |
| #[serde(borrow)] |
| Enum(Enum<'a>), |
| #[serde(borrow)] |
| Array(Array<'a>), |
| #[serde(borrow)] |
| Map(Map<'a>), |
| #[serde(borrow)] |
| Fixed(Fixed<'a>), |
| } |
| |
| /// A record |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#schema-record> |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| pub struct Record<'a> { |
| #[serde(borrow)] |
| pub name: &'a str, |
| #[serde(borrow, default)] |
| pub namespace: Option<&'a str>, |
| #[serde(borrow, default)] |
| pub doc: Option<&'a str>, |
| #[serde(borrow, default)] |
| pub aliases: Vec<&'a str>, |
| #[serde(borrow)] |
| pub fields: Vec<Field<'a>>, |
| #[serde(flatten)] |
| pub attributes: Attributes<'a>, |
| } |
| |
| /// A field within a [`Record`] |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| pub struct Field<'a> { |
| #[serde(borrow)] |
| pub name: &'a str, |
| #[serde(borrow, default)] |
| pub doc: Option<&'a str>, |
| #[serde(borrow)] |
| pub r#type: Schema<'a>, |
| #[serde(borrow, default)] |
| pub default: Option<&'a str>, |
| } |
| |
| /// An enumeration |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#enums> |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| pub struct Enum<'a> { |
| #[serde(borrow)] |
| pub name: &'a str, |
| #[serde(borrow, default)] |
| pub namespace: Option<&'a str>, |
| #[serde(borrow, default)] |
| pub doc: Option<&'a str>, |
| #[serde(borrow, default)] |
| pub aliases: Vec<&'a str>, |
| #[serde(borrow)] |
| pub symbols: Vec<&'a str>, |
| #[serde(borrow, default)] |
| pub default: Option<&'a str>, |
| #[serde(flatten)] |
| pub attributes: Attributes<'a>, |
| } |
| |
| /// An array |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#arrays> |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| pub struct Array<'a> { |
| #[serde(borrow)] |
| pub items: Box<Schema<'a>>, |
| #[serde(flatten)] |
| pub attributes: Attributes<'a>, |
| } |
| |
| /// A map |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#maps> |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| pub struct Map<'a> { |
| #[serde(borrow)] |
| pub values: Box<Schema<'a>>, |
| #[serde(flatten)] |
| pub attributes: Attributes<'a>, |
| } |
| |
| /// A fixed length binary array |
| /// |
| /// <https://avro.apache.org/docs/1.11.1/specification/#fixed> |
| #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| pub struct Fixed<'a> { |
| #[serde(borrow)] |
| pub name: &'a str, |
| #[serde(borrow, default)] |
| pub namespace: Option<&'a str>, |
| #[serde(borrow, default)] |
| pub aliases: Vec<&'a str>, |
| pub size: usize, |
| #[serde(flatten)] |
| pub attributes: Attributes<'a>, |
| } |
| |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::AvroField;
    use arrow_schema::{DataType, Fields, TimeUnit};
    use serde_json::json;

    /// Deserializes a range of schema shapes and checks the resulting model,
    /// plus the conversion to an [`AvroField`] where that is expected to
    /// succeed or fail.
    #[test]
    fn test_deserialize() {
        // A bare primitive type name
        let t: Schema = serde_json::from_str("\"string\"").unwrap();
        assert_eq!(
            t,
            Schema::TypeName(TypeName::Primitive(PrimitiveType::String))
        );

        // A union of primitive type names
        let t: Schema = serde_json::from_str("[\"int\", \"null\"]").unwrap();
        assert_eq!(
            t,
            Schema::Union(vec![
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
            ])
        );

        // A primitive annotated with a logical type
        let t: Type = serde_json::from_str(
            r#"{
                   "type":"long",
                   "logicalType":"timestamp-micros"
                }"#,
        )
        .unwrap();

        let timestamp = Type {
            r#type: TypeName::Primitive(PrimitiveType::Long),
            attributes: Attributes {
                logical_type: Some("timestamp-micros"),
                additional: Default::default(),
            },
        };

        assert_eq!(t, timestamp);

        // A fixed type whose unknown keys land in `Attributes::additional`
        let t: ComplexType = serde_json::from_str(
            r#"{
                   "type":"fixed",
                   "name":"fixed",
                   "namespace":"topLevelRecord.value",
                   "size":11,
                   "logicalType":"decimal",
                   "precision":25,
                   "scale":2
                }"#,
        )
        .unwrap();

        let decimal = ComplexType::Fixed(Fixed {
            name: "fixed",
            namespace: Some("topLevelRecord.value"),
            aliases: vec![],
            size: 11,
            attributes: Attributes {
                logical_type: Some("decimal"),
                additional: vec![("precision", json!(25)), ("scale", json!(2))]
                    .into_iter()
                    .collect(),
            },
        });

        assert_eq!(t, decimal);

        // A record containing a nullable decimal field
        let schema: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"value",
                     "type":[
                        {
                           "type":"fixed",
                           "name":"fixed",
                           "namespace":"topLevelRecord.value",
                           "size":11,
                           "logicalType":"decimal",
                           "precision":25,
                           "scale":2
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![Field {
                    name: "value",
                    doc: None,
                    r#type: Schema::Union(vec![
                        Schema::Complex(decimal),
                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                    ]),
                    default: None,
                }],
                attributes: Default::default(),
            }))
        );

        // A record that refers to itself by name
        let schema: Schema = serde_json::from_str(
            r#"{
                  "type": "record",
                  "name": "LongList",
                  "aliases": ["LinkedLongs"],
                  "fields" : [
                    {"name": "value", "type": "long"},
                    {"name": "next", "type": ["null", "LongList"]}
                  ]
                }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "LongList",
                namespace: None,
                doc: None,
                aliases: vec!["LinkedLongs"],
                fields: vec![
                    Field {
                        name: "value",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
                        default: None,
                    },
                    Field {
                        name: "next",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Ref("LongList")),
                        ]),
                        default: None,
                    }
                ],
                attributes: Attributes::default(),
            }))
        );

        // Recursive schema are not supported
        let err = AvroField::try_from(&schema).unwrap_err().to_string();
        assert_eq!(err, "Parser error: Failed to resolve .LongList");

        // A record with a nullable int and a nullable timestamp
        let schema: Schema = serde_json::from_str(
            r#"{
               "type":"record",
               "name":"topLevelRecord",
               "fields":[
                  {
                     "name":"id",
                     "type":[
                        "int",
                        "null"
                     ]
                  },
                  {
                     "name":"timestamp_col",
                     "type":[
                        {
                           "type":"long",
                           "logicalType":"timestamp-micros"
                        },
                        "null"
                     ]
                  }
               ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "topLevelRecord",
                namespace: None,
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "id",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "timestamp_col",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::Type(timestamp),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );

        // The same record converts to a non-nullable Arrow struct whose
        // children are nullable
        let codec = AvroField::try_from(&schema).unwrap();
        assert_eq!(
            codec.field(),
            arrow_schema::Field::new(
                "topLevelRecord",
                DataType::Struct(Fields::from(vec![
                    arrow_schema::Field::new("id", DataType::Int32, true),
                    arrow_schema::Field::new(
                        "timestamp_col",
                        DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                        true
                    ),
                ])),
                false
            )
        );

        // A namespaced record with an inline fixed type, a later reference to
        // that type by name, and a nullable map
        let schema: Schema = serde_json::from_str(
            r#"{
                  "type": "record",
                  "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
                  "fields": [
                    {"name": "clientHash", "type": {"type": "fixed", "name": "MD5", "size": 16}},
                    {"name": "clientProtocol", "type": ["null", "string"]},
                    {"name": "serverHash", "type": "MD5"},
                    {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
                  ]
            }"#,
        )
        .unwrap();

        assert_eq!(
            schema,
            Schema::Complex(ComplexType::Record(Record {
                name: "HandshakeRequest",
                namespace: Some("org.apache.avro.ipc"),
                doc: None,
                aliases: vec![],
                fields: vec![
                    Field {
                        name: "clientHash",
                        doc: None,
                        r#type: Schema::Complex(ComplexType::Fixed(Fixed {
                            name: "MD5",
                            namespace: None,
                            aliases: vec![],
                            size: 16,
                            attributes: Default::default(),
                        })),
                        default: None,
                    },
                    Field {
                        name: "clientProtocol",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
                        ]),
                        default: None,
                    },
                    Field {
                        name: "serverHash",
                        doc: None,
                        r#type: Schema::TypeName(TypeName::Ref("MD5")),
                        default: None,
                    },
                    Field {
                        name: "meta",
                        doc: None,
                        r#type: Schema::Union(vec![
                            Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
                            Schema::Complex(ComplexType::Map(Map {
                                values: Box::new(Schema::TypeName(TypeName::Primitive(
                                    PrimitiveType::Bytes
                                ))),
                                attributes: Default::default(),
                            })),
                        ]),
                        default: None,
                    }
                ],
                attributes: Default::default(),
            }))
        );
    }
}