| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Logic for parsing and interacting with schemas in Avro format. |
| use crate::{error::Error, types, util::MapHelper, AvroResult}; |
| use digest::Digest; |
| use lazy_static::lazy_static; |
| use regex::Regex; |
| use serde::{ |
| ser::{SerializeMap, SerializeSeq}, |
| Deserialize, Serialize, Serializer, |
| }; |
| use serde_json::{Map, Value}; |
| use std::{ |
| borrow::Cow, |
| collections::{BTreeMap, HashMap, HashSet}, |
| convert::{TryFrom, TryInto}, |
| fmt, |
| hash::Hash, |
| str::FromStr, |
| }; |
| use strum_macros::{EnumDiscriminants, EnumString}; |
| |
| lazy_static! { |
| static ref ENUM_SYMBOL_NAME_R: Regex = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap(); |
| |
| // An optional namespace (with optional dots) followed by a name without any dots in it. |
| static ref SCHEMA_NAME_R: Regex = |
| Regex::new(r"^((?P<namespace>[A-Za-z_][A-Za-z0-9_\.]*)*\.)?(?P<name>[A-Za-z_][A-Za-z0-9_]*)$").unwrap(); |
| } |
| |
/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro Schema Fingerprint documentation](https://avro.apache.org/docs/current/spec.html#schema_fingerprints)
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SchemaFingerprint {
    /// Raw digest bytes of the fingerprint.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Formats the fingerprint as lowercase hexadecimal, two digits per byte
    /// (an empty fingerprint formats as the empty string).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte straight into the formatter instead of building a
        // `Vec<String>` and joining it.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
| |
/// Represents any valid Avro schema.
///
/// More information about Avro schemas can be found in the
/// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas)
///
/// The `EnumDiscriminants` derive also generates `SchemaKind`, a payload-free enum with one
/// variant per `Schema` variant (deriving `Hash`, `Ord` and `PartialOrd`), used wherever only
/// the kind of a schema matters (e.g. as a `BTreeMap` key in `UnionSchema`).
#[derive(Clone, Debug, EnumDiscriminants)]
#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
pub enum Schema {
    /// A `null` Avro schema.
    Null,
    /// A `boolean` Avro schema.
    Boolean,
    /// An `int` Avro schema.
    Int,
    /// A `long` Avro schema.
    Long,
    /// A `float` Avro schema.
    Float,
    /// A `double` Avro schema.
    Double,
    /// A `bytes` Avro schema.
    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
    Bytes,
    /// A `string` Avro schema.
    /// `String` represents a unicode character sequence.
    String,
    /// An `array` Avro schema. Avro arrays are required to have the same type for each element.
    /// This variant holds the `Schema` for the array element type.
    Array(Box<Schema>),
    /// A `map` Avro schema.
    /// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
    /// `Map` keys are assumed to be `string`.
    Map(Box<Schema>),
    /// A `union` Avro schema; its variants are held by the `UnionSchema`.
    Union(UnionSchema),
    /// A `record` Avro schema.
    ///
    /// The `lookup` table maps field names to their position in the `Vec`
    /// of `fields`.
    Record {
        name: Name,
        aliases: Aliases,
        doc: Documentation,
        fields: Vec<RecordField>,
        lookup: BTreeMap<String, usize>,
    },
    /// An `enum` Avro schema.
    Enum {
        name: Name,
        aliases: Aliases,
        doc: Documentation,
        symbols: Vec<String>,
    },
    /// A `fixed` Avro schema.
    Fixed {
        name: Name,
        aliases: Aliases,
        doc: Documentation,
        size: usize,
    },
    /// Logical type which represents `Decimal` values. The underlying type is serialized and
    /// deserialized as `Schema::Bytes` or `Schema::Fixed`.
    ///
    /// `scale` defaults to 0 and is an integer greater than or equal to 0 and `precision` is an
    /// integer greater than 0. The parser also rejects a `scale` larger than `precision`.
    Decimal {
        precision: DecimalMetadata,
        scale: DecimalMetadata,
        inner: Box<Schema>,
    },
    /// A universally unique identifier, annotating a `string`.
    Uuid,
    /// Logical type which represents the number of days since the unix epoch.
    /// Serialization format is `Schema::Int`.
    Date,
    /// The time of day in number of milliseconds after midnight with no reference to any
    /// calendar, time zone or date in particular. Annotates an `int`.
    TimeMillis,
    /// The time of day in number of microseconds after midnight with no reference to any
    /// calendar, time zone or date in particular. Annotates a `long`.
    TimeMicros,
    /// An instant in time represented as the number of milliseconds after the UNIX epoch.
    /// Annotates a `long`.
    TimestampMillis,
    /// An instant in time represented as the number of microseconds after the UNIX epoch.
    /// Annotates a `long`.
    TimestampMicros,
    /// An amount of time defined by a number of months, days and milliseconds.
    /// Annotates a `fixed`.
    Duration,
    /// A reference, by name, to a named schema (`record`, `enum` or `fixed`) defined elsewhere.
    /// The parser emits it for already-parsed or currently-resolving (e.g. recursive) types;
    /// it is resolved by looking up the fully qualified name.
    Ref {
        name: Name,
    },
}
| |
| impl PartialEq for Schema { |
| /// Assess equality of two `Schema` based on [Parsing Canonical Form]. |
| /// |
| /// [Parsing Canonical Form]: |
| /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas |
| fn eq(&self, other: &Self) -> bool { |
| self.canonical_form() == other.canonical_form() |
| } |
| } |
| |
| impl SchemaKind { |
| pub fn is_primitive(self) -> bool { |
| matches!( |
| self, |
| SchemaKind::Null |
| | SchemaKind::Boolean |
| | SchemaKind::Int |
| | SchemaKind::Long |
| | SchemaKind::Double |
| | SchemaKind::Float |
| | SchemaKind::Bytes |
| | SchemaKind::String, |
| ) |
| } |
| |
| pub fn is_named(self) -> bool { |
| matches!( |
| self, |
| SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed | SchemaKind::Ref |
| ) |
| } |
| } |
| |
| impl From<&types::Value> for SchemaKind { |
| fn from(value: &types::Value) -> Self { |
| use crate::types::Value; |
| match value { |
| Value::Null => Self::Null, |
| Value::Boolean(_) => Self::Boolean, |
| Value::Int(_) => Self::Int, |
| Value::Long(_) => Self::Long, |
| Value::Float(_) => Self::Float, |
| Value::Double(_) => Self::Double, |
| Value::Bytes(_) => Self::Bytes, |
| Value::String(_) => Self::String, |
| Value::Array(_) => Self::Array, |
| Value::Map(_) => Self::Map, |
| Value::Union(_, _) => Self::Union, |
| Value::Record(_) => Self::Record, |
| Value::Enum(_, _) => Self::Enum, |
| Value::Fixed(_, _) => Self::Fixed, |
| Value::Decimal { .. } => Self::Decimal, |
| Value::Uuid(_) => Self::Uuid, |
| Value::Date(_) => Self::Date, |
| Value::TimeMillis(_) => Self::TimeMillis, |
| Value::TimeMicros(_) => Self::TimeMicros, |
| Value::TimestampMillis(_) => Self::TimestampMillis, |
| Value::TimestampMicros(_) => Self::TimestampMicros, |
| Value::Duration { .. } => Self::Duration, |
| } |
| } |
| } |
| |
/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these `Schema`s have a `fullname` composed of two parts:
/// * a name
/// * a namespace
///
/// `aliases` can also be defined, to facilitate schema evolution.
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Name {
    /// The simple name. When built through `Name::new` it contains no dots.
    pub name: String,
    /// The optional namespace; `None` means the name is not qualified.
    pub namespace: Namespace,
}
| |
/// Represents documentation for complex Avro schemas (`None` when no `doc` is given).
pub type Documentation = Option<String>;
/// Represents the aliases for Named Schema (`None` when no aliases are given).
pub type Aliases = Option<Vec<Alias>>;
/// Represents Schema lookup within a schema env: owned schemas keyed by `Name`.
pub(crate) type Names = HashMap<Name, Schema>;
/// Represents Schema lookup within a schema: borrowed schemas keyed by `Name`.
pub(crate) type NamesRef<'a> = HashMap<Name, &'a Schema>;
/// Represents the namespace for Named Schema (`None` when unqualified).
pub type Namespace = Option<String>;
| |
| impl Name { |
| /// Create a new `Name`. |
| /// Parses the optional `namespace` from the `name` string. |
| /// `aliases` will not be defined. |
| pub fn new(name: &str) -> AvroResult<Self> { |
| let (name, namespace) = Name::get_name_and_namespace(name)?; |
| Ok(Self { name, namespace }) |
| } |
| |
| fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> { |
| let caps = SCHEMA_NAME_R |
| .captures(name) |
| .ok_or_else(|| Error::InvalidSchemaName(name.to_string(), SCHEMA_NAME_R.as_str()))?; |
| Ok(( |
| caps["name"].to_string(), |
| caps.name("namespace").map(|s| s.as_str().to_string()), |
| )) |
| } |
| |
| /// Parse a `serde_json::Value` into a `Name`. |
| pub(crate) fn parse(complex: &Map<String, Value>) -> AvroResult<Self> { |
| let (name, namespace_from_name) = complex |
| .name() |
| .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap()) |
| .ok_or(Error::GetNameField)?; |
| // FIXME Reading name from the type is wrong ! The name there is just a metadata (AVRO-3430) |
| let type_name = match complex.get("type") { |
| Some(Value::Object(complex_type)) => complex_type.name().or(None), |
| _ => None, |
| }; |
| |
| Ok(Self { |
| name: type_name.unwrap_or(name), |
| namespace: namespace_from_name.or_else(|| complex.string("namespace")), |
| }) |
| } |
| |
| /// Return the `fullname` of this `Name` |
| /// |
| /// More information about fullnames can be found in the |
| /// [Avro specification](https://avro.apache.org/docs/current/spec.html#names) |
| pub fn fullname(&self, default_namespace: Namespace) -> String { |
| if self.name.contains('.') { |
| self.name.clone() |
| } else { |
| let namespace = self.namespace.clone().or(default_namespace); |
| |
| match namespace { |
| Some(ref namespace) => format!("{}.{}", namespace, self.name), |
| None => self.name.clone(), |
| } |
| } |
| } |
| |
| /// Return the fully qualified name needed for indexing or searching for the schema within a schema/schema env context. Puts the enclosing namespace into the name's namespace for clarity in schema/schema env parsing |
| /// ```ignore |
| /// use apache_avro::schema::Name; |
| /// |
| /// assert_eq!( |
| /// Name::new("some_name").unwrap().fully_qualified_name(&Some("some_namespace".into())), |
| /// Name::new("some_namespace.some_name").unwrap() |
| /// ); |
| /// assert_eq!( |
| /// Name::new("some_namespace.some_name").unwrap().fully_qualified_name(&Some("other_namespace".into())), |
| /// Name::new("some_namespace.some_name").unwrap() |
| /// ); |
| /// ``` |
| pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name { |
| Name { |
| name: self.name.clone(), |
| namespace: self |
| .namespace |
| .clone() |
| .or_else(|| enclosing_namespace.clone()), |
| } |
| } |
| } |
| |
| impl From<&str> for Name { |
| fn from(name: &str) -> Self { |
| Name::new(name).unwrap() |
| } |
| } |
| |
| impl fmt::Display for Name { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.write_str(&self.fullname(None)[..]) |
| } |
| } |
| |
| impl<'de> Deserialize<'de> for Name { |
| fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> |
| where |
| D: serde::de::Deserializer<'de>, |
| { |
| serde_json::Value::deserialize(deserializer).and_then(|value| { |
| use serde::de::Error; |
| if let Value::Object(json) = value { |
| Name::parse(&json).map_err(D::Error::custom) |
| } else { |
| Err(D::Error::custom(format!( |
| "Expected a JSON object: {:?}", |
| value |
| ))) |
| } |
| }) |
| } |
| } |
| |
/// Newtype pattern for `Name` to better control the `serde_json::Value` representation.
/// Aliases are serialized as an array of plain strings in the JSON representation
/// (each alias serializes as its fullname string).
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Alias(Name);
| |
| impl Alias { |
| pub fn new(name: &str) -> AvroResult<Self> { |
| Name::new(name).map(Self) |
| } |
| |
| pub fn name(&self) -> String { |
| self.0.name.clone() |
| } |
| |
| pub fn namespace(&self) -> Namespace { |
| self.0.namespace.clone() |
| } |
| |
| pub fn fullname(&self, default_namespace: Namespace) -> String { |
| self.0.fullname(default_namespace) |
| } |
| |
| pub fn fully_qualified_name(&self, default_namespace: &Namespace) -> Name { |
| self.0.fully_qualified_name(default_namespace) |
| } |
| } |
| |
| impl From<&str> for Alias { |
| fn from(name: &str) -> Self { |
| Alias::new(name).unwrap() |
| } |
| } |
| |
| impl Serialize for Alias { |
| fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> |
| where |
| S: Serializer, |
| { |
| serializer.serialize_str(&self.fullname(None)) |
| } |
| } |
| |
/// A borrowed schema together with a table of every named schema it defines.
pub(crate) struct ResolvedSchema<'s> {
    // Fully qualified name -> borrowed named schema (record/enum/fixed) found in `root_schema`.
    names_ref: NamesRef<'s>,
    // The schema the table was built from.
    root_schema: &'s Schema,
}
| |
| impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> { |
| type Error = Error; |
| |
| fn try_from(schema: &'s Schema) -> AvroResult<Self> { |
| let names = HashMap::new(); |
| let mut rs = ResolvedSchema { |
| names_ref: names, |
| root_schema: schema, |
| }; |
| Self::from_internal(rs.root_schema, &mut rs.names_ref, &None)?; |
| Ok(rs) |
| } |
| } |
| |
| impl<'s> ResolvedSchema<'s> { |
| pub(crate) fn get_root_schema(&self) -> &'s Schema { |
| self.root_schema |
| } |
| pub(crate) fn get_names(&self) -> &NamesRef<'s> { |
| &self.names_ref |
| } |
| |
| fn from_internal( |
| schema: &'s Schema, |
| names_ref: &mut NamesRef<'s>, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<()> { |
| match schema { |
| Schema::Array(schema) | Schema::Map(schema) => { |
| Self::from_internal(schema, names_ref, enclosing_namespace) |
| } |
| Schema::Union(UnionSchema { schemas, .. }) => { |
| for schema in schemas { |
| Self::from_internal(schema, names_ref, enclosing_namespace)? |
| } |
| Ok(()) |
| } |
| Schema::Enum { name, .. } | Schema::Fixed { name, .. } => { |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| if names_ref |
| .insert(fully_qualified_name.clone(), schema) |
| .is_some() |
| { |
| Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) |
| } else { |
| Ok(()) |
| } |
| } |
| Schema::Record { name, fields, .. } => { |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| if names_ref |
| .insert(fully_qualified_name.clone(), schema) |
| .is_some() |
| { |
| Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) |
| } else { |
| let record_namespace = fully_qualified_name.namespace; |
| for field in fields { |
| Self::from_internal(&field.schema, names_ref, &record_namespace)? |
| } |
| Ok(()) |
| } |
| } |
| Schema::Ref { name } => { |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| names_ref |
| .get(&fully_qualified_name) |
| .map(|_| ()) |
| .ok_or(Error::SchemaResolutionError(fully_qualified_name)) |
| } |
| _ => Ok(()), |
| } |
| } |
| } |
| |
/// An owned schema together with owned copies of every named schema it defines.
pub(crate) struct ResolvedOwnedSchema {
    // Fully qualified name -> cloned named schema (record/enum/fixed) found in `root_schema`.
    names: Names,
    // The schema the table was built from.
    root_schema: Schema,
}
| |
| impl TryFrom<Schema> for ResolvedOwnedSchema { |
| type Error = Error; |
| |
| fn try_from(schema: Schema) -> AvroResult<Self> { |
| let names = HashMap::new(); |
| let mut rs = ResolvedOwnedSchema { |
| names, |
| root_schema: schema, |
| }; |
| Self::from_internal(&rs.root_schema, &mut rs.names, &None)?; |
| Ok(rs) |
| } |
| } |
| |
| impl ResolvedOwnedSchema { |
| pub(crate) fn get_root_schema(&self) -> &Schema { |
| &self.root_schema |
| } |
| pub(crate) fn get_names(&self) -> &Names { |
| &self.names |
| } |
| |
| fn from_internal( |
| schema: &Schema, |
| names: &mut Names, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<()> { |
| match schema { |
| Schema::Array(schema) | Schema::Map(schema) => { |
| Self::from_internal(schema, names, enclosing_namespace) |
| } |
| Schema::Union(UnionSchema { schemas, .. }) => { |
| for schema in schemas { |
| Self::from_internal(schema, names, enclosing_namespace)? |
| } |
| Ok(()) |
| } |
| Schema::Enum { name, .. } | Schema::Fixed { name, .. } => { |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| if names |
| .insert(fully_qualified_name.clone(), schema.clone()) |
| .is_some() |
| { |
| Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) |
| } else { |
| Ok(()) |
| } |
| } |
| Schema::Record { name, fields, .. } => { |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| if names |
| .insert(fully_qualified_name.clone(), schema.clone()) |
| .is_some() |
| { |
| Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) |
| } else { |
| let record_namespace = fully_qualified_name.namespace; |
| for field in fields { |
| Self::from_internal(&field.schema, names, &record_namespace)? |
| } |
| Ok(()) |
| } |
| } |
| Schema::Ref { name } => { |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| names |
| .get(&fully_qualified_name) |
| .map(|_| ()) |
| .ok_or(Error::SchemaResolutionError(fully_qualified_name)) |
| } |
| _ => Ok(()), |
| } |
| } |
| } |
| |
/// Represents a `field` in a `record` Avro schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// Default value of the field, kept as the raw JSON from the schema definition.
    /// This value will be used when reading Avro datum if schema resolution
    /// is enabled.
    pub default: Option<Value>,
    /// Schema of the field.
    pub schema: Schema,
    /// Order of the field.
    ///
    /// **NOTE** This currently has no effect.
    pub order: RecordFieldOrder,
    /// Position of the field in the list of `fields` of its parent `Schema`.
    pub position: usize,
}
| |
/// Represents any valid order for a `field` in a `record` Avro schema.
///
/// Parsed from the kebab-case strings `"ascending"`, `"descending"` and `"ignore"`; when a
/// field's `order` is missing or unrecognised, `RecordField::parse` falls back to `Ascending`.
#[derive(Clone, Debug, PartialEq, EnumString)]
#[strum(serialize_all = "kebab_case")]
pub enum RecordFieldOrder {
    Ascending,
    Descending,
    Ignore,
}
| |
| impl RecordField { |
| /// Parse a `serde_json::Value` into a `RecordField`. |
| fn parse( |
| field: &Map<String, Value>, |
| position: usize, |
| parser: &mut Parser, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Self> { |
| let name = field.name().ok_or(Error::GetNameFieldFromRecord)?; |
| |
| // TODO: "type" = "<record name>" |
| let schema = parser.parse_complex(field, enclosing_namespace)?; |
| |
| let default = field.get("default").cloned(); |
| |
| let order = field |
| .get("order") |
| .and_then(|order| order.as_str()) |
| .and_then(|order| RecordFieldOrder::from_str(order).ok()) |
| .unwrap_or(RecordFieldOrder::Ascending); |
| |
| Ok(RecordField { |
| name, |
| doc: field.doc(), |
| default, |
| schema, |
| order, |
| position, |
| }) |
| } |
| } |
| |
/// The variants of an Avro `union`, plus an index for looking up unnamed variants by kind.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    // The union's member schemas, in declaration order.
    pub(crate) schemas: Vec<Schema>,
    // Used to ensure uniqueness of schema inputs, and provide constant time finding of the
    // schema index given a value.
    // **NOTE** that this approach does not work for named types, and will have to be modified
    // to support that. A simple solution is to also keep a mapping of the names used.
    variant_index: BTreeMap<SchemaKind, usize>,
}
| |
| impl UnionSchema { |
| /// Creates a new UnionSchema from a vector of schemas. |
| pub fn new(schemas: Vec<Schema>) -> AvroResult<Self> { |
| let mut vindex = BTreeMap::new(); |
| for (i, schema) in schemas.iter().enumerate() { |
| if let Schema::Union(_) = schema { |
| return Err(Error::GetNestedUnion); |
| } |
| let kind = SchemaKind::from(schema); |
| if !kind.is_named() && vindex.insert(kind, i).is_some() { |
| return Err(Error::GetUnionDuplicate); |
| } |
| } |
| Ok(UnionSchema { |
| schemas, |
| variant_index: vindex, |
| }) |
| } |
| |
| /// Returns a slice to all variants of this schema. |
| pub fn variants(&self) -> &[Schema] { |
| &self.schemas |
| } |
| |
| /// Returns true if the first variant of this `UnionSchema` is `Null`. |
| pub fn is_nullable(&self) -> bool { |
| !self.schemas.is_empty() && self.schemas[0] == Schema::Null |
| } |
| |
| /// Optionally returns a reference to the schema matched by this value, as well as its position |
| /// within this union. |
| pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> { |
| let schema_kind = SchemaKind::from(value); |
| if let Some(&i) = self.variant_index.get(&schema_kind) { |
| // fast path |
| Some((i, &self.schemas[i])) |
| } else { |
| // slow path (required for matching logical or named types) |
| self.schemas |
| .iter() |
| .enumerate() |
| .find(|(_, schema)| value.validate(schema)) |
| } |
| } |
| } |
| |
| // No need to compare variant_index, it is derivative of schemas. |
| impl PartialEq for UnionSchema { |
| fn eq(&self, other: &UnionSchema) -> bool { |
| self.schemas.eq(&other.schemas) |
| } |
| } |
| |
// Integer type used for the `precision` and `scale` of a `Schema::Decimal`.
type DecimalMetadata = usize;
/// Number of significant digits of a decimal.
pub(crate) type Precision = DecimalMetadata;
/// Number of digits after the decimal point of a decimal.
pub(crate) type Scale = DecimalMetadata;
| |
| fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> { |
| Ok(if value.is_u64() { |
| let num = value |
| .as_u64() |
| .ok_or_else(|| Error::GetU64FromJson(value.clone()))?; |
| num.try_into() |
| .map_err(|e| Error::ConvertU64ToUsize(e, num))? |
| } else if value.is_i64() { |
| let num = value |
| .as_i64() |
| .ok_or_else(|| Error::GetI64FromJson(value.clone()))?; |
| num.try_into() |
| .map_err(|e| Error::ConvertI64ToUsize(e, num))? |
| } else { |
| return Err(Error::GetPrecisionOrScaleFromJson(value.clone())); |
| }) |
| } |
| |
/// Stateful parser turning JSON Avro schema definitions into `Schema` values.
#[derive(Default)]
struct Parser {
    // Named JSON schemas not yet parsed, keyed by `Name`; entries are removed as they are
    // parsed, either by `parse_list` or on demand when another schema references them.
    input_schemas: HashMap<Name, Value>,
    // A map of name -> Schema::Ref
    // Used to resolve cyclic references, i.e. when a
    // field's type is a reference to its record's type
    resolving_schemas: Names,
    // Names of the `input_schemas` in the order the caller supplied them; `parse_list`
    // returns its results in this order.
    input_order: Vec<Name>,
    // A map of name -> fully parsed Schema
    // Used to avoid parsing the same schema twice
    parsed_schemas: Names,
}
| |
| impl Schema { |
| /// Converts `self` into its [Parsing Canonical Form]. |
| /// |
| /// [Parsing Canonical Form]: |
| /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas |
| pub fn canonical_form(&self) -> String { |
| let json = serde_json::to_value(self) |
| .unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {0}", e)); |
| parsing_canonical_form(&json) |
| } |
| |
| /// Generate [fingerprint] of Schema's [Parsing Canonical Form]. |
| /// |
| /// [Parsing Canonical Form]: |
| /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas |
| /// [fingerprint]: |
| /// https://avro.apache.org/docs/current/spec.html#schema_fingerprints |
| pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint { |
| let mut d = D::new(); |
| d.update(self.canonical_form()); |
| SchemaFingerprint { |
| bytes: d.finalize().to_vec(), |
| } |
| } |
| |
| /// Create a `Schema` from a string representing a JSON Avro schema. |
| pub fn parse_str(input: &str) -> Result<Schema, Error> { |
| let mut parser = Parser::default(); |
| parser.parse_str(input) |
| } |
| |
| /// Create a array of `Schema`'s from a list of named JSON Avro schemas (Record, Enum, and |
| /// Fixed). |
| /// |
| /// It is allowed that the schemas have cross-dependencies; these will be resolved |
| /// during parsing. |
| /// |
| /// If two of the input schemas have the same fullname, an Error will be returned. |
| pub fn parse_list(input: &[&str]) -> Result<Vec<Schema>, Error> { |
| let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(input.len()); |
| let mut input_order: Vec<Name> = Vec::with_capacity(input.len()); |
| for js in input { |
| let schema: Value = serde_json::from_str(js).map_err(Error::ParseSchemaJson)?; |
| if let Value::Object(inner) = &schema { |
| let name = Name::parse(inner)?; |
| let previous_value = input_schemas.insert(name.clone(), schema); |
| if previous_value.is_some() { |
| return Err(Error::NameCollision(name.fullname(None))); |
| } |
| input_order.push(name); |
| } else { |
| return Err(Error::GetNameField); |
| } |
| } |
| let mut parser = Parser { |
| input_schemas, |
| resolving_schemas: HashMap::default(), |
| input_order, |
| parsed_schemas: HashMap::with_capacity(input.len()), |
| }; |
| parser.parse_list() |
| } |
| |
| pub fn parse(value: &Value) -> AvroResult<Schema> { |
| let mut parser = Parser::default(); |
| parser.parse(value, &None) |
| } |
| } |
| |
| impl Parser { |
| /// Create a `Schema` from a string representing a JSON Avro schema. |
| fn parse_str(&mut self, input: &str) -> Result<Schema, Error> { |
| let value = serde_json::from_str(input).map_err(Error::ParseSchemaJson)?; |
| self.parse(&value, &None) |
| } |
| |
| /// Create an array of `Schema`'s from an iterator of JSON Avro schemas. It is allowed that |
| /// the schemas have cross-dependencies; these will be resolved during parsing. |
| fn parse_list(&mut self) -> Result<Vec<Schema>, Error> { |
| while !self.input_schemas.is_empty() { |
| let next_name = self |
| .input_schemas |
| .keys() |
| .next() |
| .expect("Input schemas unexpectedly empty") |
| .to_owned(); |
| let (name, value) = self |
| .input_schemas |
| .remove_entry(&next_name) |
| .expect("Key unexpectedly missing"); |
| let parsed = self.parse(&value, &None)?; |
| self.parsed_schemas |
| .insert(get_schema_type_name(name, value), parsed); |
| } |
| |
| let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len()); |
| for name in self.input_order.drain(0..) { |
| let parsed = self |
| .parsed_schemas |
| .remove(&name) |
| .expect("One of the input schemas was unexpectedly not parsed"); |
| parsed_schemas.push(parsed); |
| } |
| Ok(parsed_schemas) |
| } |
| |
| /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro |
| /// schema. |
| fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> { |
| match *value { |
| Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace), |
| Value::Object(ref data) => self.parse_complex(data, enclosing_namespace), |
| Value::Array(ref data) => self.parse_union(data, enclosing_namespace), |
| _ => Err(Error::ParseSchemaFromValidJson), |
| } |
| } |
| |
| /// Parse a `serde_json::Value` representing an Avro type whose Schema is known into a |
| /// `Schema`. A Schema for a `serde_json::Value` is known if it is primitive or has |
| /// been parsed previously by the parsed and stored in its map of parsed_schemas. |
| fn parse_known_schema( |
| &mut self, |
| name: &str, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| match name { |
| "null" => Ok(Schema::Null), |
| "boolean" => Ok(Schema::Boolean), |
| "int" => Ok(Schema::Int), |
| "long" => Ok(Schema::Long), |
| "double" => Ok(Schema::Double), |
| "float" => Ok(Schema::Float), |
| "bytes" => Ok(Schema::Bytes), |
| "string" => Ok(Schema::String), |
| _ => self.fetch_schema_ref(name, enclosing_namespace), |
| } |
| } |
| |
| /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`. |
| /// If a parsed schema is not found, it checks if a currently resolving |
| /// schema with that name exists. |
| /// If a resolving schema is not found, it checks if a json with that name exists |
| /// in `input_schemas` and then parses it (removing it from `input_schemas`) |
| /// and adds the parsed schema to `parsed_schemas`. |
| /// |
| /// This method allows schemas definitions that depend on other types to |
| /// parse their dependencies (or look them up if already parsed). |
| fn fetch_schema_ref( |
| &mut self, |
| name: &str, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| fn get_schema_ref(parsed: &Schema) -> Schema { |
| match &parsed { |
| Schema::Record { ref name, .. } |
| | Schema::Enum { ref name, .. } |
| | Schema::Fixed { ref name, .. } => Schema::Ref { name: name.clone() }, |
| _ => parsed.clone(), |
| } |
| } |
| |
| let name = Name::new(name)?; |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| |
| if self.parsed_schemas.get(&fully_qualified_name).is_some() { |
| return Ok(Schema::Ref { name }); |
| } |
| if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) { |
| return Ok(resolving_schema.clone()); |
| } |
| |
| let value = self |
| .input_schemas |
| .remove(&fully_qualified_name) |
| // TODO make a better descriptive error message here that conveys that a named schema cannot be found |
| .ok_or_else(|| Error::ParsePrimitive(fully_qualified_name.fullname(None)))?; |
| |
| // parsing a full schema from inside another schema. Other full schema will not inherit namespace |
| let parsed = self.parse(&value, &None)?; |
| self.parsed_schemas |
| .insert(get_schema_type_name(name, value), parsed.clone()); |
| |
| Ok(get_schema_ref(&parsed)) |
| } |
| |
| fn parse_precision_and_scale( |
| complex: &Map<String, Value>, |
| ) -> Result<(Precision, Scale), Error> { |
| fn get_decimal_integer( |
| complex: &Map<String, Value>, |
| key: &'static str, |
| ) -> Result<DecimalMetadata, Error> { |
| match complex.get(key) { |
| Some(&Value::Number(ref value)) => parse_json_integer_for_decimal(value), |
| None => { |
| if key == "scale" { |
| Ok(0) |
| } else { |
| Err(Error::GetDecimalMetadataFromJson(key)) |
| } |
| } |
| Some(value) => Err(Error::GetDecimalMetadataValueFromJson { |
| key: key.into(), |
| value: value.clone(), |
| }), |
| } |
| } |
| let precision = get_decimal_integer(complex, "precision")?; |
| let scale = get_decimal_integer(complex, "scale")?; |
| |
| if precision < 1 { |
| return Err(Error::DecimalPrecisionMuBePositive { precision }); |
| } |
| |
| if precision < scale { |
| Err(Error::DecimalPrecisionLessThanScale { precision, scale }) |
| } else { |
| Ok((precision, scale)) |
| } |
| } |
| |
| /// Parse a `serde_json::Value` representing a complex Avro type into a |
| /// `Schema`. |
| /// |
| /// Avro supports "recursive" definition of types. |
| /// e.g: {"type": {"type": "string"}} |
| fn parse_complex( |
| &mut self, |
| complex: &Map<String, Value>, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| fn logical_verify_type( |
| complex: &Map<String, Value>, |
| kinds: &[SchemaKind], |
| parser: &mut Parser, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| match complex.get("type") { |
| Some(value) => { |
| let ty = parser.parse(value, enclosing_namespace)?; |
| |
| if kinds |
| .iter() |
| .any(|&kind| SchemaKind::from(ty.clone()) == kind) |
| { |
| Ok(ty) |
| } else { |
| match get_type_rec(value.clone()) { |
| Ok(v) => Err(Error::GetLogicalTypeVariant(v)), |
| Err(err) => Err(err), |
| } |
| } |
| } |
| None => Err(Error::GetLogicalTypeField), |
| } |
| } |
| |
| fn get_type_rec(json_value: Value) -> AvroResult<Value> { |
| match json_value { |
| typ @ Value::String(_) => Ok(typ), |
| Value::Object(ref complex) => match complex.get("type") { |
| Some(v) => get_type_rec(v.clone()), |
| None => Err(Error::GetComplexTypeField), |
| }, |
| _ => Err(Error::GetComplexTypeField), |
| } |
| } |
| |
| // checks whether the logicalType is supported by the type |
| fn try_logical_type( |
| logical_type: &str, |
| complex: &Map<String, Value>, |
| kinds: &[SchemaKind], |
| ok_schema: Schema, |
| parser: &mut Parser, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| match logical_verify_type(complex, kinds, parser, enclosing_namespace) { |
| // type and logicalType match! |
| Ok(_) => Ok(ok_schema), |
| // the logicalType is not expected for this type! |
| Err(Error::GetLogicalTypeVariant(json_value)) => match json_value { |
| Value::String(_) => match parser.parse(&json_value, enclosing_namespace) { |
| Ok(schema) => { |
| warn!( |
| "Ignoring invalid logical type '{}' for schema of type: {:?}!", |
| logical_type, schema |
| ); |
| Ok(schema) |
| } |
| Err(parse_err) => Err(parse_err), |
| }, |
| _ => Err(Error::GetLogicalTypeVariant(json_value)), |
| }, |
| err => err, |
| } |
| } |
| |
| match complex.get("logicalType") { |
| Some(&Value::String(ref t)) => match t.as_str() { |
| "decimal" => { |
| let inner = Box::new(logical_verify_type( |
| complex, |
| &[SchemaKind::Fixed, SchemaKind::Bytes], |
| self, |
| enclosing_namespace, |
| )?); |
| |
| let (precision, scale) = Self::parse_precision_and_scale(complex)?; |
| |
| return Ok(Schema::Decimal { |
| precision, |
| scale, |
| inner, |
| }); |
| } |
| "uuid" => { |
| logical_verify_type(complex, &[SchemaKind::String], self, enclosing_namespace)?; |
| return Ok(Schema::Uuid); |
| } |
| "date" => { |
| return try_logical_type( |
| "date", |
| complex, |
| &[SchemaKind::Int], |
| Schema::Date, |
| self, |
| enclosing_namespace, |
| ); |
| } |
| "time-millis" => { |
| return try_logical_type( |
| "time-millis", |
| complex, |
| &[SchemaKind::Int], |
| Schema::TimeMillis, |
| self, |
| enclosing_namespace, |
| ); |
| } |
| "time-micros" => { |
| return try_logical_type( |
| "time-micros", |
| complex, |
| &[SchemaKind::Long], |
| Schema::TimeMicros, |
| self, |
| enclosing_namespace, |
| ); |
| } |
| "timestamp-millis" => { |
| return try_logical_type( |
| "timestamp-millis", |
| complex, |
| &[SchemaKind::Long], |
| Schema::TimestampMillis, |
| self, |
| enclosing_namespace, |
| ); |
| } |
| "timestamp-micros" => { |
| return try_logical_type( |
| "timestamp-micros", |
| complex, |
| &[SchemaKind::Long], |
| Schema::TimestampMicros, |
| self, |
| enclosing_namespace, |
| ); |
| } |
| "duration" => { |
| logical_verify_type(complex, &[SchemaKind::Fixed], self, enclosing_namespace)?; |
| return Ok(Schema::Duration); |
| } |
| // In this case, of an unknown logical type, we just pass through to the underlying |
| // type. |
| _ => {} |
| }, |
| // The spec says to ignore invalid logical types and just continue through to the |
| // underlying type - It is unclear whether that applies to this case or not, where the |
| // `logicalType` is not a string. |
| Some(_) => return Err(Error::GetLogicalTypeFieldType), |
| _ => {} |
| } |
| match complex.get("type") { |
| Some(&Value::String(ref t)) => match t.as_str() { |
| "record" => self.parse_record(complex, enclosing_namespace), |
| "enum" => self.parse_enum(complex, enclosing_namespace), |
| "array" => self.parse_array(complex, enclosing_namespace), |
| "map" => self.parse_map(complex, enclosing_namespace), |
| "fixed" => self.parse_fixed(complex, enclosing_namespace), |
| other => self.parse_known_schema(other, enclosing_namespace), |
| }, |
| Some(&Value::Object(ref data)) => self.parse_complex(data, enclosing_namespace), |
| Some(&Value::Array(ref variants)) => self.parse_union(variants, enclosing_namespace), |
| Some(unknown) => Err(Error::GetComplexType(unknown.clone())), |
| None => Err(Error::GetComplexTypeField), |
| } |
| } |
| |
| fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) { |
| let resolving_schema = Schema::Ref { name: name.clone() }; |
| self.resolving_schemas |
| .insert(name.clone(), resolving_schema.clone()); |
| |
| let namespace = &name.namespace; |
| |
| if let Some(ref aliases) = aliases { |
| aliases.iter().for_each(|alias| { |
| let alias_fullname = alias.fully_qualified_name(namespace); |
| self.resolving_schemas |
| .insert(alias_fullname, resolving_schema.clone()); |
| }); |
| } |
| } |
| |
| fn register_parsed_schema( |
| &mut self, |
| fully_qualified_name: &Name, |
| schema: &Schema, |
| aliases: &Aliases, |
| ) { |
| // FIXME, this should be globally aware, so if there is something overwriting something |
| // else then there is an ambiguous schema definition. An appropriate error should be thrown |
| self.parsed_schemas |
| .insert(fully_qualified_name.clone(), schema.clone()); |
| self.resolving_schemas.remove(fully_qualified_name); |
| |
| let namespace = &fully_qualified_name.namespace; |
| |
| if let Some(ref aliases) = aliases { |
| aliases.iter().for_each(|alias| { |
| let alias_fullname = alias.fully_qualified_name(namespace); |
| self.resolving_schemas.remove(&alias_fullname); |
| self.parsed_schemas.insert(alias_fullname, schema.clone()); |
| }); |
| } |
| } |
| |
| /// Returns already parsed schema or a schema that is currently being resolved. |
| fn get_already_seen_schema( |
| &self, |
| complex: &Map<String, Value>, |
| enclosing_namespace: &Namespace, |
| ) -> Option<&Schema> { |
| match complex.get("type") { |
| Some(Value::String(ref typ)) => { |
| let name = Name::new(typ.as_str()) |
| .unwrap() |
| .fully_qualified_name(enclosing_namespace); |
| self.resolving_schemas |
| .get(&name) |
| .or_else(|| self.parsed_schemas.get(&name)) |
| } |
| _ => None, |
| } |
| } |
| |
| /// Parse a `serde_json::Value` representing a Avro record type into a |
| /// `Schema`. |
| fn parse_record( |
| &mut self, |
| complex: &Map<String, Value>, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| let fields_opt = complex.get("fields"); |
| |
| if fields_opt.is_none() { |
| if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) { |
| return Ok(seen.clone()); |
| } |
| } |
| |
| let name = Name::parse(complex)?; |
| let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace); |
| |
| let mut lookup = BTreeMap::new(); |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| self.register_resolving_schema(&fully_qualified_name, &aliases); |
| |
| let fields: Vec<RecordField> = fields_opt |
| .and_then(|fields| fields.as_array()) |
| .ok_or(Error::GetRecordFieldsJson) |
| .and_then(|fields| { |
| fields |
| .iter() |
| .filter_map(|field| field.as_object()) |
| .enumerate() |
| .map(|(position, field)| { |
| RecordField::parse(field, position, self, &fully_qualified_name.namespace) |
| }) |
| .collect::<Result<_, _>>() |
| })?; |
| |
| for field in &fields { |
| lookup.insert(field.name.clone(), field.position); |
| } |
| |
| let schema = Schema::Record { |
| name, |
| aliases: aliases.clone(), |
| doc: complex.doc(), |
| fields, |
| lookup, |
| }; |
| |
| self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); |
| Ok(schema) |
| } |
| |
| /// Parse a `serde_json::Value` representing a Avro enum type into a |
| /// `Schema`. |
| fn parse_enum( |
| &mut self, |
| complex: &Map<String, Value>, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| let symbols_opt = complex.get("symbols"); |
| |
| if symbols_opt.is_none() { |
| if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) { |
| return Ok(seen.clone()); |
| } |
| } |
| |
| let name = Name::parse(complex)?; |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace); |
| |
| let symbols: Vec<String> = symbols_opt |
| .and_then(|v| v.as_array()) |
| .ok_or(Error::GetEnumSymbolsField) |
| .and_then(|symbols| { |
| symbols |
| .iter() |
| .map(|symbol| symbol.as_str().map(|s| s.to_string())) |
| .collect::<Option<_>>() |
| .ok_or(Error::GetEnumSymbols) |
| })?; |
| |
| let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len()); |
| for symbol in symbols.iter() { |
| // Ensure enum symbol names match [A-Za-z_][A-Za-z0-9_]* |
| if !ENUM_SYMBOL_NAME_R.is_match(symbol) { |
| return Err(Error::EnumSymbolName(symbol.to_string())); |
| } |
| |
| // Ensure there are no duplicate symbols |
| if existing_symbols.contains(&symbol) { |
| return Err(Error::EnumSymbolDuplicate(symbol.to_string())); |
| } |
| |
| existing_symbols.insert(symbol); |
| } |
| |
| let schema = Schema::Enum { |
| name, |
| aliases: aliases.clone(), |
| doc: complex.doc(), |
| symbols, |
| }; |
| |
| self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); |
| |
| Ok(schema) |
| } |
| |
| /// Parse a `serde_json::Value` representing a Avro array type into a |
| /// `Schema`. |
| fn parse_array( |
| &mut self, |
| complex: &Map<String, Value>, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| complex |
| .get("items") |
| .ok_or(Error::GetArrayItemsField) |
| .and_then(|items| self.parse(items, enclosing_namespace)) |
| .map(|schema| Schema::Array(Box::new(schema))) |
| } |
| |
| /// Parse a `serde_json::Value` representing a Avro map type into a |
| /// `Schema`. |
| fn parse_map( |
| &mut self, |
| complex: &Map<String, Value>, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| complex |
| .get("values") |
| .ok_or(Error::GetMapValuesField) |
| .and_then(|items| self.parse(items, enclosing_namespace)) |
| .map(|schema| Schema::Map(Box::new(schema))) |
| } |
| |
| /// Parse a `serde_json::Value` representing a Avro union type into a |
| /// `Schema`. |
| fn parse_union( |
| &mut self, |
| items: &[Value], |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| items |
| .iter() |
| .map(|v| self.parse(v, enclosing_namespace)) |
| .collect::<Result<Vec<_>, _>>() |
| .and_then(|schemas| Ok(Schema::Union(UnionSchema::new(schemas)?))) |
| } |
| |
| /// Parse a `serde_json::Value` representing a Avro fixed type into a |
| /// `Schema`. |
| fn parse_fixed( |
| &mut self, |
| complex: &Map<String, Value>, |
| enclosing_namespace: &Namespace, |
| ) -> AvroResult<Schema> { |
| let size_opt = complex.get("size"); |
| if size_opt.is_none() { |
| if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) { |
| return Ok(seen.clone()); |
| } |
| } |
| |
| let doc = complex.get("doc").and_then(|v| match &v { |
| Value::String(ref docstr) => Some(docstr.clone()), |
| _ => None, |
| }); |
| |
| let size = size_opt |
| .and_then(|v| v.as_i64()) |
| .ok_or(Error::GetFixedSizeField)?; |
| |
| let name = Name::parse(complex)?; |
| let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); |
| let aliases = fix_aliases_namespace(complex.aliases(), &name.namespace); |
| |
| let schema = Schema::Fixed { |
| name, |
| aliases: aliases.clone(), |
| doc, |
| size: size as usize, |
| }; |
| |
| self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); |
| |
| Ok(schema) |
| } |
| } |
| |
| // A type alias may be specified either as a fully namespace-qualified, or relative |
| // to the namespace of the name it is an alias for. For example, if a type named "a.b" |
| // has aliases of "c" and "x.y", then the fully qualified names of its aliases are "a.c" |
| // and "x.y". |
| // https://avro.apache.org/docs/current/spec.html#Aliases |
| fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases { |
| aliases.map(|aliases| { |
| aliases |
| .iter() |
| .map(|alias| { |
| if alias.find('.').is_none() { |
| match namespace { |
| Some(ref ns) => format!("{}.{}", ns, alias), |
| None => alias.clone(), |
| } |
| } else { |
| alias.clone() |
| } |
| }) |
| .map(|alias| Alias::new(alias.as_str()).unwrap()) |
| .collect() |
| }) |
| } |
| |
| fn get_schema_type_name(name: Name, value: Value) -> Name { |
| match value.get("type") { |
| Some(Value::Object(complex_type)) => match complex_type.name() { |
| Some(name) => Name::new(name.as_str()).unwrap(), |
| _ => name, |
| }, |
| _ => name, |
| } |
| } |
| |
| impl Serialize for Schema { |
| fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> |
| where |
| S: Serializer, |
| { |
| match *self { |
| Schema::Ref { ref name } => serializer.serialize_str(&name.fullname(None)), |
| Schema::Null => serializer.serialize_str("null"), |
| Schema::Boolean => serializer.serialize_str("boolean"), |
| Schema::Int => serializer.serialize_str("int"), |
| Schema::Long => serializer.serialize_str("long"), |
| Schema::Float => serializer.serialize_str("float"), |
| Schema::Double => serializer.serialize_str("double"), |
| Schema::Bytes => serializer.serialize_str("bytes"), |
| Schema::String => serializer.serialize_str("string"), |
| Schema::Array(ref inner) => { |
| let mut map = serializer.serialize_map(Some(2))?; |
| map.serialize_entry("type", "array")?; |
| map.serialize_entry("items", &*inner.clone())?; |
| map.end() |
| } |
| Schema::Map(ref inner) => { |
| let mut map = serializer.serialize_map(Some(2))?; |
| map.serialize_entry("type", "map")?; |
| map.serialize_entry("values", &*inner.clone())?; |
| map.end() |
| } |
| Schema::Union(ref inner) => { |
| let variants = inner.variants(); |
| let mut seq = serializer.serialize_seq(Some(variants.len()))?; |
| for v in variants { |
| seq.serialize_element(v)?; |
| } |
| seq.end() |
| } |
| Schema::Record { |
| ref name, |
| ref aliases, |
| ref doc, |
| ref fields, |
| .. |
| } => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "record")?; |
| if let Some(ref n) = name.namespace { |
| map.serialize_entry("namespace", n)?; |
| } |
| map.serialize_entry("name", &name.name)?; |
| if let Some(ref docstr) = doc { |
| map.serialize_entry("doc", docstr)?; |
| } |
| if let Some(ref aliases) = aliases { |
| map.serialize_entry("aliases", aliases)?; |
| } |
| map.serialize_entry("fields", fields)?; |
| map.end() |
| } |
| Schema::Enum { |
| ref name, |
| ref symbols, |
| ref aliases, |
| .. |
| } => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "enum")?; |
| if let Some(ref n) = name.namespace { |
| map.serialize_entry("namespace", n)?; |
| } |
| map.serialize_entry("name", &name.name)?; |
| map.serialize_entry("symbols", symbols)?; |
| |
| if let Some(ref aliases) = aliases { |
| map.serialize_entry("aliases", aliases)?; |
| } |
| map.end() |
| } |
| Schema::Fixed { |
| ref name, |
| ref doc, |
| ref size, |
| ref aliases, |
| .. |
| } => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "fixed")?; |
| if let Some(ref n) = name.namespace { |
| map.serialize_entry("namespace", n)?; |
| } |
| map.serialize_entry("name", &name.name)?; |
| if let Some(ref docstr) = doc { |
| map.serialize_entry("doc", docstr)?; |
| } |
| map.serialize_entry("size", size)?; |
| |
| if let Some(ref aliases) = aliases { |
| map.serialize_entry("aliases", aliases)?; |
| } |
| map.end() |
| } |
| Schema::Decimal { |
| ref scale, |
| ref precision, |
| ref inner, |
| } => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", &*inner.clone())?; |
| map.serialize_entry("logicalType", "decimal")?; |
| map.serialize_entry("scale", scale)?; |
| map.serialize_entry("precision", precision)?; |
| map.end() |
| } |
| Schema::Uuid => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "string")?; |
| map.serialize_entry("logicalType", "uuid")?; |
| map.end() |
| } |
| Schema::Date => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "int")?; |
| map.serialize_entry("logicalType", "date")?; |
| map.end() |
| } |
| Schema::TimeMillis => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "int")?; |
| map.serialize_entry("logicalType", "time-millis")?; |
| map.end() |
| } |
| Schema::TimeMicros => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "long")?; |
| map.serialize_entry("logicalType", "time-micros")?; |
| map.end() |
| } |
| Schema::TimestampMillis => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "long")?; |
| map.serialize_entry("logicalType", "timestamp-millis")?; |
| map.end() |
| } |
| Schema::TimestampMicros => { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("type", "long")?; |
| map.serialize_entry("logicalType", "timestamp-micros")?; |
| map.end() |
| } |
| Schema::Duration => { |
| let mut map = serializer.serialize_map(None)?; |
| |
| // the Avro doesn't indicate what the name of the underlying fixed type of a |
| // duration should be or typically is. |
| let inner = Schema::Fixed { |
| name: Name::new("duration").unwrap(), |
| aliases: None, |
| doc: None, |
| size: 12, |
| }; |
| map.serialize_entry("type", &inner)?; |
| map.serialize_entry("logicalType", "duration")?; |
| map.end() |
| } |
| } |
| } |
| } |
| |
| impl Serialize for RecordField { |
| fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> |
| where |
| S: Serializer, |
| { |
| let mut map = serializer.serialize_map(None)?; |
| map.serialize_entry("name", &self.name)?; |
| map.serialize_entry("type", &self.schema)?; |
| |
| if let Some(ref default) = self.default { |
| map.serialize_entry("default", default)?; |
| } |
| |
| map.end() |
| } |
| } |
| |
| /// Parses a **valid** avro schema into the Parsing Canonical Form. |
| /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas |
| fn parsing_canonical_form(schema: &serde_json::Value) -> String { |
| match schema { |
| serde_json::Value::Object(map) => pcf_map(map), |
| serde_json::Value::String(s) => pcf_string(s), |
| serde_json::Value::Array(v) => pcf_array(v), |
| json => panic!( |
| "got invalid JSON value for canonical form of schema: {0}", |
| json |
| ), |
| } |
| } |
| |
/// Renders a JSON object (a complex schema or a record field) in Parsing Canonical Form.
///
/// For this single object, it applies these rules:
/// - [PRIMITIVE]: `{"type": "int"}` collapses to `"int"`.
/// - [STRIP]: only keys listed in `RESERVED_FIELDS` are kept, minus `default`, `doc` and
///   `aliases`. `namespace` is not listed, so it is dropped as well.
/// - [FULLNAMES]: applied to the object's own `name`.
/// - [INTEGERS]: applied to `size`, `precision` and `scale`.
/// - [ORDER]: keys are sorted by `field_ordering_position`.
///
/// Every other kept value is canonicalised recursively through `parsing_canonical_form`.
///
/// NOTE(review): `name` is qualified only with this object's own `namespace`. A namespace
/// inherited from an enclosing schema, and type names referenced as plain strings, are
/// not qualified here. Confirm this is acceptable for the [FULLNAMES] rule.
///
/// # Panics
///
/// Assumes a valid schema. Panics if `name` is not a string, or if `size`, `precision`
/// or `scale` is neither an `i64`-representable integer nor a string holding one.
fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
    // Look for the namespace variant up front.
    let ns = schema.get("namespace").and_then(|v| v.as_str());
    // (key, already-rendered "key":value) pairs; the key is kept for sorting.
    let mut fields = Vec::new();
    for (k, v) in schema {
        // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
        if schema.len() == 1 && k == "type" {
            // Invariant: function is only callable from a valid schema, so this is acceptable.
            if let serde_json::Value::String(s) = v {
                return pcf_string(s);
            }
        }

        // Strip out unused fields ([STRIP] rule)
        if field_ordering_position(k).is_none() || k == "default" || k == "doc" || k == "aliases" {
            continue;
        }

        // Fully qualify the name, if it isn't already ([FULLNAMES] rule).
        if k == "name" {
            // Invariant: Only valid schemas. Must be a string.
            let name = v.as_str().unwrap();
            let n = match ns {
                Some(namespace) if !name.contains('.') => {
                    Cow::Owned(format!("{}.{}", namespace, name))
                }
                _ => Cow::Borrowed(name),
            };

            fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
            continue;
        }

        // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
        if k == "size" || k == "precision" || k == "scale" {
            let i = match v.as_str() {
                Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
                None => v.as_i64().unwrap(),
            };
            fields.push((k, format!("{}:{}", pcf_string(k), i)));
            continue;
        }

        // For anything else, recursively process the result.
        fields.push((
            k,
            format!("{}:{}", pcf_string(k), parsing_canonical_form(v)),
        ));
    }

    // Sort the fields by their canonical ordering ([ORDER] rule).
    // Keys of a JSON object are unique, so an unstable sort is deterministic here, and the
    // `unwrap` cannot fail because unlisted keys were stripped above.
    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
    let inter = fields
        .into_iter()
        .map(|(_, v)| v)
        .collect::<Vec<_>>()
        .join(",");
    format!("{{{}}}", inter)
}
| |
| fn pcf_array(arr: &[serde_json::Value]) -> String { |
| let inter = arr |
| .iter() |
| .map(parsing_canonical_form) |
| .collect::<Vec<String>>() |
| .join(","); |
| format!("[{}]", inter) |
| } |
| |
/// Renders `s` as a JSON string literal for the Parsing Canonical Form.
///
/// Characters that JSON requires to be escaped are escaped: `"`, `\` and the control
/// characters U+0000..=U+001F. Everything else, including non-ASCII text, is written
/// verbatim as UTF-8, as the [STRINGS] rule asks.
///
/// Previously the text was wrapped in quotes unescaped, so a value such as an arbitrary
/// `logicalType` containing `"` produced an invalid canonical form.
fn pcf_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\u{08}' => out.push_str("\\b"),
            '\u{0C}' => out.push_str("\\f"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}
| |
/// Attribute names that may survive into the Parsing Canonical Form, listed in canonical
/// output order ([ORDER] rule).
///
/// Attributes absent from this list are always stripped. `doc`, `aliases` and `default`
/// are listed, but `pcf_map` strips them explicitly.
const RESERVED_FIELDS: &[&str] = &[
    "name",
    "type",
    "fields",
    "symbols",
    "items",
    "values",
    "size",
    "logicalType",
    "order",
    "doc",
    "aliases",
    "default",
    "precision",
    "scale",
];

/// Returns the 1-based canonical position of `field` in `RESERVED_FIELDS`.
///
/// Returns `None` when `field` is not reserved. The result is used both to filter and to
/// order attributes.
fn field_ordering_position(field: &str) -> Option<usize> {
    RESERVED_FIELDS
        .iter()
        .zip(1..)
        .find_map(|(&candidate, rank)| if candidate == field { Some(rank) } else { None })
}
| |
/// Trait for types that serve as an Avro data model. A derive implementation is available
/// through the `derive` feature. Do not implement this trait directly!
/// Instead, implement `apache_avro::schema::derive::AvroSchemaComponent`. With the `derive`
/// feature enabled, a blanket implementation then provides this trait. That implementation
/// starts from an empty set of named schemas and no enclosing namespace.
pub trait AvroSchema {
    /// Returns the Avro schema describing `Self`.
    fn get_schema() -> Schema;
}
| |
#[cfg(feature = "derive")]
pub mod derive {
    use super::*;

    /// Trait for types that serve as fully defined components inside an Avro data model. Derive
    /// implementation available through `derive` feature. This is what is implemented by
    /// the `derive(AvroSchema)` macro.
    ///
    /// # Implementation guide
    ///
    /// ### Simple implementation
    /// To construct a non-named simple schema, the input arguments can be ignored. The
    /// general form of the implementation then looks like
    /// ```ignore
    /// impl AvroSchemaComponent for AType {
    ///     fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
    ///         Schema::?
    ///     }
    /// }
    /// ```
    /// ### Passthrough implementation
    /// To construct a schema for a type that acts as an "inner" type, such as a smart pointer,
    /// simply pass the arguments through to the inner type
    /// ```ignore
    /// impl AvroSchemaComponent for PassthroughType {
    ///     fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
    ///         InnerType::get_schema_in_ctxt(named_schemas, enclosing_namespace)
    ///     }
    /// }
    /// ```
    /// ### Complex implementation
    /// Implementing this for a named schema requires a general form, to avoid creating
    /// invalid schemas or infinite loops.
    /// ```ignore
    /// impl AvroSchemaComponent for ComplexType {
    ///     fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace) -> Schema {
    ///         // Create the fully qualified name for your type given the enclosing namespace
    ///         let name = apache_avro::schema::Name::new("MyName")
    ///             .expect("Unable to parse schema name")
    ///             .fully_qualified_name(enclosing_namespace);
    ///         let enclosing_namespace = &name.namespace;
    ///         // Check, if your name is already defined, and if so, return a ref to that name
    ///         if named_schemas.contains_key(&name) {
    ///             apache_avro::schema::Schema::Ref{name: name.clone()}
    ///         } else {
    ///             named_schemas.insert(name.clone(), apache_avro::schema::Schema::Ref{name: name.clone()});
    ///             // YOUR SCHEMA DEFINITION HERE with the name equivalent to "MyName".
    ///             // For non-simple sub types delegate to their implementation of AvroSchemaComponent
    ///         }
    ///     }
    /// }
    /// ```
    pub trait AvroSchemaComponent {
        /// Returns the schema for `Self`.
        ///
        /// `named_schemas` holds the named schemas defined so far (see the guide above).
        /// `enclosing_namespace` is the namespace that relative names resolve against.
        fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
            -> Schema;
    }

    impl<T> AvroSchema for T
    where
        T: AvroSchemaComponent,
    {
        fn get_schema() -> Schema {
            T::get_schema_in_ctxt(&mut HashMap::default(), &Option::None)
        }
    }

    /// Implements `AvroSchemaComponent` for a type whose schema is a fixed, non-named value.
    macro_rules! impl_schema(
        ($type:ty, $variant_constructor:expr) => (
            impl AvroSchemaComponent for $type {
                fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
                    $variant_constructor
                }
            }
        );
    );

    // `bool` was missing even though Avro has a `boolean` primitive.
    impl_schema!(bool, Schema::Boolean);
    impl_schema!(i8, Schema::Int);
    impl_schema!(i16, Schema::Int);
    impl_schema!(i32, Schema::Int);
    impl_schema!(i64, Schema::Long);
    impl_schema!(u8, Schema::Int);
    impl_schema!(u16, Schema::Int);
    // `u32` does not fit in Avro's 32-bit signed `int`.
    impl_schema!(u32, Schema::Long);
    impl_schema!(f32, Schema::Float);
    impl_schema!(f64, Schema::Double);
    impl_schema!(String, Schema::String);
    impl_schema!(uuid::Uuid, Schema::Uuid);
    impl_schema!(core::time::Duration, Schema::Duration);

    impl<T> AvroSchemaComponent for Vec<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::Array(Box::new(T::get_schema_in_ctxt(
                named_schemas,
                enclosing_namespace,
            )))
        }
    }

    impl<T> AvroSchemaComponent for Option<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            // `Option<T>` maps to the union ["null", T]. Build the variant index from the
            // stored vector instead of cloning T's schema into a temporary second vector.
            let schemas = vec![
                Schema::Null,
                T::get_schema_in_ctxt(named_schemas, enclosing_namespace),
            ];
            let variant_index = schemas
                .iter()
                .enumerate()
                .map(|(idx, s)| (SchemaKind::from(s), idx))
                .collect();
            Schema::Union(UnionSchema {
                schemas,
                variant_index,
            })
        }
    }

    impl<T> AvroSchemaComponent for Map<String, T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::Map(Box::new(T::get_schema_in_ctxt(
                named_schemas,
                enclosing_namespace,
            )))
        }
    }

    impl<T> AvroSchemaComponent for HashMap<String, T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::Map(Box::new(T::get_schema_in_ctxt(
                named_schemas,
                enclosing_namespace,
            )))
        }
    }

    // String-keyed ordered maps are Avro maps too.
    impl<T> AvroSchemaComponent for BTreeMap<String, T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            Schema::Map(Box::new(T::get_schema_in_ctxt(
                named_schemas,
                enclosing_namespace,
            )))
        }
    }

    impl<T> AvroSchemaComponent for Box<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }

    impl<T> AvroSchemaComponent for std::sync::Mutex<T>
    where
        T: AvroSchemaComponent,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }

    impl<T> AvroSchemaComponent for Cow<'_, T>
    where
        T: AvroSchemaComponent + Clone,
    {
        fn get_schema_in_ctxt(
            named_schemas: &mut Names,
            enclosing_namespace: &Namespace,
        ) -> Schema {
            T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
        }
    }
}
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use pretty_assertions::assert_eq; |
| |
| #[test] |
| fn test_invalid_schema() { |
| assert!(Schema::parse_str("invalid").is_err()); |
| } |
| |
| #[test] |
| fn test_primitive_schema() { |
| assert_eq!(Schema::Null, Schema::parse_str("\"null\"").unwrap()); |
| assert_eq!(Schema::Int, Schema::parse_str("\"int\"").unwrap()); |
| assert_eq!(Schema::Double, Schema::parse_str("\"double\"").unwrap()); |
| } |
| |
| #[test] |
| fn test_array_schema() { |
| let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#).unwrap(); |
| assert_eq!(Schema::Array(Box::new(Schema::String)), schema); |
| } |
| |
| #[test] |
| fn test_map_schema() { |
| let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#).unwrap(); |
| assert_eq!(Schema::Map(Box::new(Schema::Double)), schema); |
| } |
| |
| #[test] |
| fn test_union_schema() { |
| let schema = Schema::parse_str(r#"["null", "int"]"#).unwrap(); |
| assert_eq!( |
| Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()), |
| schema |
| ); |
| } |
| |
| #[test] |
| fn test_union_unsupported_schema() { |
| let schema = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#); |
| assert!(schema.is_err()); |
| } |
| |
| #[test] |
| fn test_multi_union_schema() { |
| let schema = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#); |
| assert!(schema.is_ok()); |
| let schema = schema.unwrap(); |
| assert_eq!(SchemaKind::from(&schema), SchemaKind::Union); |
| let union_schema = match schema { |
| Schema::Union(u) => u, |
| _ => unreachable!(), |
| }; |
| assert_eq!(union_schema.variants().len(), 5); |
| let mut variants = union_schema.variants().iter(); |
| assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Null); |
| assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Int); |
| assert_eq!( |
| SchemaKind::from(variants.next().unwrap()), |
| SchemaKind::Float |
| ); |
| assert_eq!( |
| SchemaKind::from(variants.next().unwrap()), |
| SchemaKind::String |
| ); |
| assert_eq!( |
| SchemaKind::from(variants.next().unwrap()), |
| SchemaKind::Bytes |
| ); |
| assert_eq!(variants.next(), None); |
| } |
| |
| // AVRO-3248 |
| #[test] |
| fn test_union_of_records() { |
| use std::iter::FromIterator; |
| |
| // A and B are the same except the name. |
| let schema_str_a = r#"{ |
| "name": "A", |
| "type": "record", |
| "fields": [ |
| {"name": "field_one", "type": "float"} |
| ] |
| }"#; |
| |
| let schema_str_b = r#"{ |
| "name": "B", |
| "type": "record", |
| "fields": [ |
| {"name": "field_one", "type": "float"} |
| ] |
| }"#; |
| |
| // we get Error::GetNameField if we put ["A", "B"] directly here. |
| let schema_str_c = r#"{ |
| "name": "C", |
| "type": "record", |
| "fields": [ |
| {"name": "field_one", "type": ["A", "B"]} |
| ] |
| }"#; |
| |
| let schema_c = Schema::parse_list(&[schema_str_a, schema_str_b, schema_str_c]) |
| .unwrap() |
| .last() |
| .unwrap() |
| .clone(); |
| |
| let schema_c_expected = Schema::Record { |
| name: Name::new("C").unwrap(), |
| aliases: None, |
| doc: None, |
| fields: vec![RecordField { |
| name: "field_one".to_string(), |
| doc: None, |
| default: None, |
| schema: Schema::Union( |
| UnionSchema::new(vec![ |
| Schema::Ref { |
| name: Name::new("A").unwrap(), |
| }, |
| Schema::Ref { |
| name: Name::new("B").unwrap(), |
| }, |
| ]) |
| .unwrap(), |
| ), |
| order: RecordFieldOrder::Ignore, |
| position: 0, |
| }], |
| lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]), |
| }; |
| |
| assert_eq!(schema_c, schema_c_expected); |
| } |
| |
| // AVRO-3584 : recursion in type definitions |
| #[test] |
| fn avro_3584_test_recursion_records() { |
| // A and B are the same except the name. |
| let schema_str_a = r#"{ |
| "name": "A", |
| "type": "record", |
| "fields": [ {"name": "field_one", "type": "B"} ] |
| }"#; |
| |
| let schema_str_b = r#"{ |
| "name": "B", |
| "type": "record", |
| "fields": [ {"name": "field_one", "type": "A"} ] |
| }"#; |
| |
| let list = Schema::parse_list(&[schema_str_a, schema_str_b]).unwrap(); |
| |
| let schema_a = list.first().unwrap().clone(); |
| |
| match schema_a { |
| Schema::Record { fields, .. } => { |
| let f1 = fields.get(0); |
| |
| let ref_schema = Schema::Ref { |
| name: Name::new("B").unwrap(), |
| }; |
| assert_eq!(ref_schema, f1.unwrap().schema); |
| } |
| _ => panic!("Expected a record schema!"), |
| } |
| } |
| |
| // AVRO-3248 |
| #[test] |
| fn test_nullable_record() { |
| use std::iter::FromIterator; |
| |
| let schema_str_a = r#"{ |
| "name": "A", |
| "type": "record", |
| "fields": [ |
| {"name": "field_one", "type": "float"} |
| ] |
| }"#; |
| |
| // we get Error::GetNameField if we put ["null", "B"] directly here. |
| let schema_str_option_a = r#"{ |
| "name": "OptionA", |
| "type": "record", |
| "fields": [ |
| {"name": "field_one", "type": ["null", "A"], "default": "null"} |
| ] |
| }"#; |
| |
| let schema_option_a = Schema::parse_list(&[schema_str_a, schema_str_option_a]) |
| .unwrap() |
| .last() |
| .unwrap() |
| .clone(); |
| |
| let schema_option_a_expected = Schema::Record { |
| name: Name::new("OptionA").unwrap(), |
| aliases: None, |
| doc: None, |
| fields: vec![RecordField { |
| name: "field_one".to_string(), |
| doc: None, |
| default: Some(Value::String("null".to_string())), |
| schema: Schema::Union( |
| UnionSchema::new(vec![ |
| Schema::Null, |
| Schema::Ref { |
| name: Name::new("A").unwrap(), |
| }, |
| ]) |
| .unwrap(), |
| ), |
| order: RecordFieldOrder::Ignore, |
| position: 0, |
| }], |
| lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]), |
| }; |
| |
| assert_eq!(schema_option_a, schema_option_a_expected); |
| } |
| |
#[test]
fn test_record_schema() {
    let raw = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();

    // Field "a" carries a numeric default; field "b" has none.
    let field_a = RecordField {
        name: "a".to_string(),
        doc: None,
        default: Some(Value::Number(42i64.into())),
        schema: Schema::Long,
        order: RecordFieldOrder::Ascending,
        position: 0,
    };
    let field_b = RecordField {
        name: "b".to_string(),
        doc: None,
        default: None,
        schema: Schema::String,
        order: RecordFieldOrder::Ascending,
        position: 1,
    };
    let lookup: BTreeMap<_, _> = vec![("a".to_owned(), 0), ("b".to_owned(), 1)]
        .into_iter()
        .collect();

    let expected = Schema::Record {
        name: Name::new("test").unwrap(),
        aliases: None,
        doc: None,
        fields: vec![field_a, field_b],
        lookup,
    };

    assert_eq!(parsed, expected);
}
| |
// AVRO-3302
#[test]
fn test_record_schema_with_currently_parsing_schema() {
    let raw = r#"
{
"type": "record",
"name": "test",
"fields": [{
"name": "recordField",
"type": {
"type": "record",
"name": "Node",
"fields": [
{"name": "label", "type": "string"},
{"name": "children", "type": {"type": "array", "items": "Node"}}
]
}
}]
}
"#;
    let schema = Schema::parse_str(raw).unwrap();

    // "children" names "Node" while Node itself is still being parsed, so the
    // array items are expected to be a Ref rather than an inlined record.
    let node = Schema::Record {
        name: Name::new("Node").unwrap(),
        aliases: None,
        doc: None,
        fields: vec![
            RecordField {
                name: "label".to_string(),
                doc: None,
                default: None,
                schema: Schema::String,
                order: RecordFieldOrder::Ascending,
                position: 0,
            },
            RecordField {
                name: "children".to_string(),
                doc: None,
                default: None,
                schema: Schema::Array(Box::new(Schema::Ref {
                    name: Name::new("Node").unwrap(),
                })),
                order: RecordFieldOrder::Ascending,
                position: 1,
            },
        ],
        lookup: vec![("children".to_owned(), 1), ("label".to_owned(), 0)]
            .into_iter()
            .collect(),
    };

    let expected = Schema::Record {
        name: Name::new("test").unwrap(),
        aliases: None,
        doc: None,
        fields: vec![RecordField {
            name: "recordField".to_string(),
            doc: None,
            default: None,
            schema: node,
            order: RecordFieldOrder::Ascending,
            position: 0,
        }],
        lookup: vec![("recordField".to_owned(), 0)].into_iter().collect(),
    };
    assert_eq!(schema, expected);

    let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"recordField","type":{"name":"Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"Node"}}]}}]}"#;
    assert_eq!(schema.canonical_form(), expected_canonical);
}
| |
// https://github.com/flavray/avro-rs/pull/99#issuecomment-1016948451
#[test]
fn test_parsing_of_recursive_type_enum() {
    let raw = r#"
{
"type": "record",
"name": "User",
"namespace": "office",
"fields": [
{
"name": "details",
"type": [
{
"type": "record",
"name": "Employee",
"fields": [
{
"name": "gender",
"type": {
"type": "enum",
"name": "Gender",
"symbols": [
"male",
"female"
]
},
"default": "female"
}
]
},
{
"type": "record",
"name": "Manager",
"fields": [
{
"name": "gender",
"type": "Gender"
}
]
}
]
}
]
}
"#;

    // "Manager.gender" reuses the "Gender" enum declared inside "Employee"; the
    // canonical form keeps it as a by-name reference instead of inlining it twice.
    let canonical = Schema::parse_str(raw).unwrap().canonical_form();
    assert_eq!(
        canonical,
        r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"Employee","type":"record","fields":[{"name":"gender","type":{"name":"Gender","type":"enum","symbols":["male","female"]}}]},{"name":"Manager","type":"record","fields":[{"name":"gender","type":"Gender"}]}]}]}"#
    );
}
| |
/// A named `fixed` type declared inside one union branch ("Employee") must be
/// referenceable by name from a sibling branch ("Manager"). The canonical form is
/// expected to inline it at first use and refer to it by name afterwards.
///
/// The field deliberately has no `"default"`. An earlier revision copy-pasted the
/// enum test's `"default": "female"`, which is not a valid default for a 16-byte
/// `fixed`. It was also irrelevant here, because defaults are not part of the
/// canonical form.
#[test]
fn test_parsing_of_recursive_type_fixed() {
    let schema = r#"
{
"type": "record",
"name": "User",
"namespace": "office",
"fields": [
{
"name": "details",
"type": [
{
"type": "record",
"name": "Employee",
"fields": [
{
"name": "id",
"type": {
"type": "fixed",
"name": "EmployeeId",
"size": 16
}
}
]
},
{
"type": "record",
"name": "Manager",
"fields": [
{
"name": "id",
"type": "EmployeeId"
}
]
}
]
}
]
}
"#;

    let schema = Schema::parse_str(schema).unwrap();
    let schema_str = schema.canonical_form();
    let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"Employee","type":"record","fields":[{"name":"id","type":{"name":"EmployeeId","type":"fixed","size":16}}]},{"name":"Manager","type":"record","fields":[{"name":"id","type":"EmployeeId"}]}]}]}"#;
    assert_eq!(schema_str, expected);
}
| |
// AVRO-3302: a self-reference through an alias ("LinkedLongs") resolves to the
// record's primary name ("LongList").
#[test]
fn test_record_schema_with_currently_parsing_schema_aliases() {
    let raw = r#"
{
"type": "record",
"name": "LongList",
"aliases": ["LinkedLongs"],
"fields" : [
{"name": "value", "type": "long"},
{"name": "next", "type": ["null", "LinkedLongs"]}
]
}
"#;
    let schema = Schema::parse_str(raw).unwrap();

    let long_list = Name {
        name: "LongList".to_owned(),
        namespace: None,
    };

    let value_field = RecordField {
        name: "value".to_string(),
        doc: None,
        default: None,
        schema: Schema::Long,
        order: RecordFieldOrder::Ascending,
        position: 0,
    };
    let next_field = RecordField {
        name: "next".to_string(),
        doc: None,
        default: None,
        schema: Schema::Union(
            UnionSchema::new(vec![
                Schema::Null,
                Schema::Ref {
                    name: long_list.clone(),
                },
            ])
            .unwrap(),
        ),
        order: RecordFieldOrder::Ascending,
        position: 1,
    };

    let expected = Schema::Record {
        name: long_list,
        aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
        doc: None,
        fields: vec![value_field, next_field],
        lookup: vec![("value".to_owned(), 0), ("next".to_owned(), 1)]
            .into_iter()
            .collect(),
    };
    assert_eq!(schema, expected);

    let expected_canonical = r#"{"name":"LongList","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":["null","LongList"]}]}"#;
    assert_eq!(schema.canonical_form(), expected_canonical);
}
| |
// AVRO-3370: a record literally named "record" can refer to itself by that name.
#[test]
fn test_record_schema_with_currently_parsing_schema_named_record() {
    let raw = r#"
{
"type" : "record",
"name" : "record",
"fields" : [
{ "name" : "value", "type" : "long" },
{ "name" : "next", "type" : "record" }
]
}
"#;
    let schema = Schema::parse_str(raw).unwrap();

    let record_name = Name {
        name: "record".to_owned(),
        namespace: None,
    };

    let fields = vec![
        RecordField {
            name: "value".to_string(),
            doc: None,
            default: None,
            schema: Schema::Long,
            order: RecordFieldOrder::Ascending,
            position: 0,
        },
        RecordField {
            name: "next".to_string(),
            doc: None,
            default: None,
            schema: Schema::Ref {
                name: record_name.clone(),
            },
            order: RecordFieldOrder::Ascending,
            position: 1,
        },
    ];

    let expected = Schema::Record {
        name: record_name,
        aliases: None,
        doc: None,
        fields,
        lookup: vec![("value".to_owned(), 0), ("next".to_owned(), 1)]
            .into_iter()
            .collect(),
    };
    assert_eq!(schema, expected);

    let expected_canonical = r#"{"name":"record","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":"record"}]}"#;
    assert_eq!(schema.canonical_form(), expected_canonical);
}
| |
// AVRO-3370: an enum literally named "enum" can be referenced by that name from a
// later field of the same record.
#[test]
fn test_record_schema_with_currently_parsing_schema_named_enum() {
    let raw = r#"
{
"type" : "record",
"name" : "record",
"fields" : [
{
"type" : "enum",
"name" : "enum",
"symbols": ["one", "two", "three"]
},
{ "name" : "next", "type" : "enum" }
]
}
"#;
    let schema = Schema::parse_str(raw).unwrap();

    // Both fields are expected to carry the full enum definition.
    let enum_schema = Schema::Enum {
        name: Name {
            name: "enum".to_owned(),
            namespace: None,
        },
        aliases: None,
        doc: None,
        symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
    };

    let expected = Schema::Record {
        name: Name {
            name: "record".to_owned(),
            namespace: None,
        },
        aliases: None,
        doc: None,
        fields: vec![
            RecordField {
                name: "enum".to_string(),
                doc: None,
                default: None,
                schema: enum_schema.clone(),
                order: RecordFieldOrder::Ascending,
                position: 0,
            },
            RecordField {
                name: "next".to_string(),
                doc: None,
                default: None,
                schema: enum_schema,
                order: RecordFieldOrder::Ascending,
                position: 1,
            },
        ],
        lookup: vec![("enum".to_owned(), 0), ("next".to_owned(), 1)]
            .into_iter()
            .collect(),
    };
    assert_eq!(schema, expected);

    let expected_canonical = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}}]}"#;
    assert_eq!(schema.canonical_form(), expected_canonical);
}
| |
// AVRO-3370: a fixed type literally named "fixed" can be referenced by that name
// from a later field of the same record.
#[test]
fn test_record_schema_with_currently_parsing_schema_named_fixed() {
    let raw = r#"
{
"type" : "record",
"name" : "record",
"fields" : [
{
"type" : "fixed",
"name" : "fixed",
"size": 456
},
{ "name" : "next", "type" : "fixed" }
]
}
"#;
    let schema = Schema::parse_str(raw).unwrap();

    // Both fields are expected to carry the full fixed definition.
    let fixed_schema = Schema::Fixed {
        name: Name {
            name: "fixed".to_owned(),
            namespace: None,
        },
        aliases: None,
        doc: None,
        size: 456,
    };

    let expected = Schema::Record {
        name: Name {
            name: "record".to_owned(),
            namespace: None,
        },
        aliases: None,
        doc: None,
        fields: vec![
            RecordField {
                name: "fixed".to_string(),
                doc: None,
                default: None,
                schema: fixed_schema.clone(),
                order: RecordFieldOrder::Ascending,
                position: 0,
            },
            RecordField {
                name: "next".to_string(),
                doc: None,
                default: None,
                schema: fixed_schema,
                order: RecordFieldOrder::Ascending,
                position: 1,
            },
        ],
        lookup: vec![("fixed".to_owned(), 0), ("next".to_owned(), 1)]
            .into_iter()
            .collect(),
    };
    assert_eq!(schema, expected);

    let expected_canonical = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":{"name":"fixed","type":"fixed","size":456}}]}"#;
    assert_eq!(schema.canonical_form(), expected_canonical);
}
| |
#[test]
fn test_enum_schema() {
    let parsed = Schema::parse_str(
        r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
    )
    .unwrap();

    // Symbols must be kept in declaration order.
    let symbols: Vec<String> = ["diamonds", "spades", "clubs", "hearts"]
        .iter()
        .map(|&s| s.to_owned())
        .collect();

    let expected = Schema::Enum {
        name: Name::new("Suit").unwrap(),
        aliases: None,
        doc: None,
        symbols,
    };

    assert_eq!(expected, parsed);
}
| |
#[test]
fn test_enum_schema_duplicate() {
    // "diamonds" appears twice, so parsing must be rejected.
    assert!(Schema::parse_str(
        r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "diamonds"]}"#,
    )
    .is_err());
}
| |
#[test]
fn test_enum_schema_name() {
    // The symbol "0000" is invalid: enum symbols must match [A-Za-z_][A-Za-z0-9_]*.
    let result = Schema::parse_str(
        r#"{"type": "enum", "name": "Enum", "symbols": ["0000", "variant"]}"#,
    );
    assert!(result.is_err());
}
| |
#[test]
fn test_fixed_schema() {
    let expected = Schema::Fixed {
        name: Name::new("test").unwrap(),
        aliases: None,
        doc: None,
        size: 16,
    };

    let parsed = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#).unwrap();

    assert_eq!(expected, parsed);
}
| |
#[test]
fn test_fixed_schema_with_documentation() {
    let raw = r#"{"type": "fixed", "name": "test", "size": 16, "doc": "FixedSchema documentation"}"#;
    let parsed = Schema::parse_str(raw).unwrap();

    // The "doc" attribute must be carried into the parsed schema.
    let expected = Schema::Fixed {
        name: Name::new("test").unwrap(),
        aliases: None,
        doc: Some("FixedSchema documentation".to_owned()),
        size: 16,
    };

    assert_eq!(expected, parsed);
}
| |
/// An enum declared without a "doc" attribute must parse with `doc == None`.
///
/// Any non-enum result fails the test. Previously a `_ => return` arm let the test
/// pass silently if the parser produced some other schema variant.
#[test]
fn test_no_documentation() {
    let schema =
        Schema::parse_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
            .unwrap();

    match schema {
        Schema::Enum { doc, .. } => assert!(doc.is_none()),
        other => panic!("Expected an enum schema, got {:?}", other),
    }
}
| |
#[test]
fn test_documentation() {
    let schema = Schema::parse_str(
        r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
    ).unwrap();

    // A non-enum schema yields `None`, which makes the `unwrap` below fail.
    let doc = if let Schema::Enum { doc, .. } = schema {
        doc
    } else {
        None
    };

    assert_eq!(doc.unwrap(), "Some documentation".to_owned());
}
| |
// Compile-time checks that `Schema` is `Send` (and, in the sibling test, `Sync`).
// The generic bounds do all the work, so these tests pass as soon as they compile.
#[test]
fn test_schema_is_send() {
    fn assert_send<T: Send>(_value: T) {}

    assert_send(Schema::Null);
}
| |
#[test]
fn test_schema_is_sync() {
    fn assert_sync<T: Sync>(_value: T) {}

    let null_schema = Schema::Null;
    // Check both a shared reference and the owned value.
    assert_sync(&null_schema);
    assert_sync(null_schema);
}
| |
#[test]
#[cfg_attr(miri, ignore)] // Sha256 uses inline assembly instructions, which miri does not support
fn test_schema_fingerprint() {
    use crate::rabin::Rabin;
    use md5::Md5;
    use sha2::Sha256;

    let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "long", "default": 42},
{"name": "b", "type": "string"},
{"name": "c", "type": "long", "logicalType": "timestamp-micros"}
]
}
"#;

    let schema = Schema::parse_str(raw_schema).unwrap();

    // Fingerprints are rendered through their `Display` impl.
    assert_eq!(
        schema.fingerprint::<Sha256>().to_string(),
        "abf662f831715ff78f88545a05a9262af75d6406b54e1a8a174ff1d2b75affc4"
    );
    assert_eq!(
        schema.fingerprint::<Md5>().to_string(),
        "6e21c350f71b1a34e9efe90970f1bc69"
    );
    assert_eq!(schema.fingerprint::<Rabin>().to_string(), "28cf0a67d9937bb3");
}
| |
#[test]
fn test_logical_types() {
    // (JSON, expected schema) pairs for logical types layered on primitives.
    let cases = vec![
        (r#"{"type": "int", "logicalType": "date"}"#, Schema::Date),
        (
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            Schema::TimestampMicros,
        ),
    ];

    for (raw, expected) in cases {
        assert_eq!(Schema::parse_str(raw).unwrap(), expected);
    }
}
| |
#[test]
fn test_nullable_logical_type() {
    // A logical type nested in a union keeps its logical form.
    let expected_union = UnionSchema::new(vec![Schema::Null, Schema::TimestampMicros]).unwrap();

    let parsed = Schema::parse_str(
        r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#,
    )
    .unwrap();

    assert_eq!(parsed, Schema::Union(expected_union));
}
| |
#[test]
fn record_field_order_from_str() {
    // `str::parse` dispatches to `RecordFieldOrder`'s `FromStr` implementation.
    let cases = vec![
        ("ascending", RecordFieldOrder::Ascending),
        ("descending", RecordFieldOrder::Descending),
        ("ignore", RecordFieldOrder::Ignore),
    ];
    for (text, expected) in cases {
        assert_eq!(text.parse::<RecordFieldOrder>().unwrap(), expected);
    }

    assert!("not an ordering".parse::<RecordFieldOrder>().is_err());
}
| |
// AVRO-3374: a record whose short name collides with a primitive ("int") keeps its
// namespace, so the canonical form must still say "ns.int".
#[test]
fn test_avro_3374_preserve_namespace_for_primitive() {
    let raw = r#"
{
"type" : "record",
"name" : "ns.int",
"fields" : [
{"name" : "value", "type" : "int"},
{"name" : "next", "type" : [ "null", "ns.int" ]}
]
}
"#;

    let canonical = Schema::parse_str(raw).unwrap().canonical_form();
    let expected = r#"{"name":"ns.int","type":"record","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","ns.int"]}]}"#;
    assert_eq!(canonical, expected);
}
| |
#[test]
fn test_avro_3433_preserve_schema_refs_in_json() {
    // "baz" refers to "test.foo" by name; the canonical form keeps that reference.
    let raw = r#"
{
"name": "test.test",
"type": "record",
"fields": [
{
"name": "bar",
"type": { "name": "test.foo", "type": "record", "fields": [{ "name": "id", "type": "long" }] }
},
{ "name": "baz", "type": "test.foo" }
]
}
"#;

    let parsed = Schema::parse_str(raw).unwrap();

    assert_eq!(
        parsed.canonical_form(),
        r#"{"name":"test.test","type":"record","fields":[{"name":"bar","type":{"name":"test.foo","type":"record","fields":[{"name":"id","type":"long"}]}},{"name":"baz","type":"test.foo"}]}"#
    );
}
| |
#[test]
fn test_read_namespace_from_name() {
    // A dotted name is split into namespace ("space") and short name ("name").
    let raw = r#"
{
"name": "space.name",
"type": "record",
"fields": [
{
"name": "num",
"type": "int"
}
]
}
"#;

    match Schema::parse_str(raw).unwrap() {
        Schema::Record { name, .. } => {
            assert_eq!(name.name, "name");
            assert_eq!(name.namespace.as_deref(), Some("space"));
        }
        _ => panic!("Expected a record schema!"),
    }
}
| |
#[test]
fn test_namespace_from_name_has_priority_over_from_field() {
    // The namespace embedded in the dotted name wins over the "namespace" attribute.
    let raw = r#"
{
"name": "space1.name",
"namespace": "space2",
"type": "record",
"fields": [
{
"name": "num",
"type": "int"
}
]
}
"#;

    match Schema::parse_str(raw).unwrap() {
        Schema::Record { name, .. } => {
            assert_eq!(name.namespace.as_deref(), Some("space1"));
        }
        _ => panic!("Expected a record schema!"),
    }
}
| |
#[test]
fn test_namespace_from_field() {
    // With an undotted name, the "namespace" attribute supplies the namespace.
    let raw = r#"
{
"name": "name",
"namespace": "space2",
"type": "record",
"fields": [
{
"name": "num",
"type": "int"
}
]
}
"#;

    match Schema::parse_str(raw).unwrap() {
        Schema::Record { name, .. } => {
            assert_eq!(name.namespace.as_deref(), Some("space2"));
        }
        _ => panic!("Expected a record schema!"),
    }
}
| |
/// A zero-length namespace (".name") is treated as no namespace at all.
#[test]
fn test_namespace_from_name_with_empty_value() {
    let Name { name, namespace } = Name::new(".name").unwrap();
    assert_eq!(name, "name");
    assert!(namespace.is_none());
}
| |
/// A name consisting only of whitespace is rejected.
#[test]
fn test_name_with_whitespace_value() {
    assert!(
        matches!(Name::new(" "), Err(Error::InvalidSchemaName(_, _))),
        "Expected an Error::InvalidSchemaName!"
    );
}
| |
/// A namespace followed by an empty short name ("space.") is rejected.
#[test]
fn test_name_with_no_name_part() {
    assert!(
        matches!(Name::new("space."), Err(Error::InvalidSchemaName(_, _))),
        "Expected an Error::InvalidSchemaName!"
    );
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"inner_record_name",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The inner record declares no namespace, so it inherits "space".
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "space.inner_record_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"inner_record_name",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "space.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The fully qualified reference resolves to the inner record's inherited name.
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "space.inner_record_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"enum",
"name":"inner_enum_name",
"symbols":["Extensive","Testing"]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_enum_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The inner enum declares no namespace, so it inherits "space".
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "space.inner_enum_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"enum",
"name":"inner_enum_name",
"symbols":["Extensive","Testing"]
}
]
},
{
"name": "outer_field_2",
"type" : "space.inner_enum_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The fully qualified reference resolves to the inner enum's inherited name.
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "space.inner_enum_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"fixed",
"name":"inner_fixed_name",
"size": 16
}
]
},
{
"name": "outer_field_2",
"type" : "inner_fixed_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The inner fixed declares no namespace, so it inherits "space".
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "space.inner_fixed_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"fixed",
"name":"inner_fixed_name",
"size": 16
}
]
},
{
"name": "outer_field_2",
"type" : "space.inner_fixed_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The fully qualified reference resolves to the inner fixed's inherited name.
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "space.inner_fixed_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_record_inner_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"inner_record_name",
"namespace":"inner_space",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_space.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The inner record's own "namespace" overrides the enclosing "space".
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "inner_space.inner_record_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"enum",
"name":"inner_enum_name",
"namespace": "inner_space",
"symbols":["Extensive","Testing"]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_space.inner_enum_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The inner enum's own "namespace" overrides the enclosing "space".
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "inner_space.inner_enum_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"fixed",
"name":"inner_fixed_name",
"namespace": "inner_space",
"size": 16
}
]
},
{
"name": "outer_field_2",
"type" : "inner_space.inner_fixed_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The inner fixed's own "namespace" overrides the enclosing "space".
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "inner_space.inner_fixed_name"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"middle_record_name",
"fields":[
{
"name":"middle_field_1",
"type":[
"null",
{
"type":"record",
"name":"inner_record_name",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "space.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // Neither nested record declares a namespace, so both inherit "space".
    let expected = [
        "space.record_name",
        "space.middle_record_name",
        "space.inner_record_name",
    ];
    let names = resolved.get_names();
    assert_eq!(names.len(), 3);
    for full_name in expected.iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"middle_record_name",
"namespace":"middle_namespace",
"fields":[
{
"name":"middle_field_1",
"type":[
"null",
{
"type":"record",
"name":"inner_record_name",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "middle_namespace.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // The inner record inherits the middle record's namespace, not the outermost one.
    let expected = [
        "space.record_name",
        "middle_namespace.middle_record_name",
        "middle_namespace.inner_record_name",
    ];
    let names = resolved.get_names();
    assert_eq!(names.len(), 3);
    for full_name in expected.iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": [
"null",
{
"type":"record",
"name":"middle_record_name",
"namespace":"middle_namespace",
"fields":[
{
"name":"middle_field_1",
"type":[
"null",
{
"type":"record",
"name":"inner_record_name",
"namespace":"inner_namespace",
"fields":[
{
"name":"inner_field_1",
"type":"double"
}
]
}
]
}
]
}
]
},
{
"name": "outer_field_2",
"type" : "inner_namespace.inner_record_name"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // Every level declares its own namespace, so each keeps it.
    let expected = [
        "space.record_name",
        "middle_namespace.middle_record_name",
        "inner_namespace.inner_record_name",
    ];
    let names = resolved.get_names();
    assert_eq!(names.len(), 3);
    for full_name in expected.iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_in_array_resolution_inherited_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": {
"type":"array",
"items":{
"type":"record",
"name":"in_array_record",
"fields": [
{
"name":"array_record_field",
"type":"string"
}
]
}
}
},
{
"name":"outer_field_2",
"type":"in_array_record"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // A record declared as array items still inherits the enclosing "space".
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "space.in_array_record"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3448_test_proper_in_map_resolution_inherited_namespace() {
    let raw = r#"
{
"name": "record_name",
"namespace": "space",
"type": "record",
"fields": [
{
"name": "outer_field_1",
"type": {
"type":"map",
"values":{
"type":"record",
"name":"in_map_record",
"fields": [
{
"name":"map_record_field",
"type":"string"
}
]
}
}
},
{
"name":"outer_field_2",
"type":"in_map_record"
}
]
}
"#;
    let parsed = Schema::parse_str(raw).unwrap();
    let resolved = ResolvedSchema::try_from(&parsed).expect("Schema didn't successfully parse");

    // A record declared as map values still inherits the enclosing "space".
    let names = resolved.get_names();
    assert_eq!(names.len(), 2);
    for full_name in ["space.record_name", "space.in_map_record"].iter() {
        assert!(names.contains_key(&Name::new(full_name).unwrap()));
    }
}
| |
#[test]
fn avro_3466_test_to_json_inner_enum_inner_namespace() {
    // An enum nested in a union declares its own namespace ("inner_space"), which
    // differs from the enclosing record's; the second field references it by full name.
    let schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"enum",
                        "name":"inner_enum_name",
                        "namespace": "inner_space",
                        "symbols":["Extensive","Testing"]
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_enum_name"
            }
        ]
    }
    "#;
    let schema = Schema::parse_str(schema).unwrap();
    let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");

    // confirm we have expected 2 full-names
    assert_eq!(rs.get_names().len(), 2);
    for s in &["space.record_name", "inner_space.inner_enum_name"] {
        assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
    }

    // Serialize the Schema back to a JSON string, re-parse it, and check the
    // result compares equal to the original schema.
    // `reparsed` is used below, so it must not carry a leading underscore
    // (that prefix signals an intentionally unused binding).
    let schema_str =
        serde_json::to_string(&schema).expect("serializing the schema to JSON failed");
    let reparsed =
        Schema::parse_str(&schema_str).expect("re-parsing the serialized schema failed");
    assert_eq!(schema, reparsed);
}
| |
#[test]
fn avro_3466_test_to_json_inner_fixed_inner_namespace() {
    // A fixed type nested in a union declares its own namespace ("inner_space"),
    // which differs from the enclosing record's; the second field references it
    // by full name.
    let schema = r#"
    {
        "name": "record_name",
        "namespace": "space",
        "type": "record",
        "fields": [
            {
                "name": "outer_field_1",
                "type": [
                    "null",
                    {
                        "type":"fixed",
                        "name":"inner_fixed_name",
                        "namespace": "inner_space",
                        "size":54
                    }
                ]
            },
            {
                "name": "outer_field_2",
                "type" : "inner_space.inner_fixed_name"
            }
        ]
    }
    "#;
    let schema = Schema::parse_str(schema).unwrap();
    let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");

    // confirm we have expected 2 full-names
    assert_eq!(rs.get_names().len(), 2);
    for s in &["space.record_name", "inner_space.inner_fixed_name"] {
        assert!(rs.get_names().contains_key(&Name::new(s).unwrap()));
    }

    // Serialize the Schema back to a JSON string, re-parse it, and check the
    // result compares equal to the original schema.
    // `reparsed` is used below, so it must not carry a leading underscore
    // (that prefix signals an intentionally unused binding).
    let schema_str =
        serde_json::to_string(&schema).expect("serializing the schema to JSON failed");
    let reparsed =
        Schema::parse_str(&schema_str).expect("re-parsing the serialized schema failed");
    assert_eq!(schema, reparsed);
}
| |
| fn assert_avro_3512_aliases(aliases: &Aliases) { |
| match aliases { |
| Some(aliases) => { |
| assert_eq!(aliases.len(), 3); |
| assert_eq!(aliases[0], Alias::new("space.b").unwrap()); |
| assert_eq!(aliases[1], Alias::new("x.y").unwrap()); |
| assert_eq!(aliases[2], Alias::new(".c").unwrap()); |
| } |
| None => { |
| panic!("'aliases' must be Some"); |
| } |
| } |
| } |
| |
#[test]
fn avro_3512_alias_with_null_namespace_record() {
    // The leading dot in ".c" marks an alias in the null namespace, while "b"
    // is relative to the record's "space" namespace.
    let schema_json = r#"
    {
        "type": "record",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "fields" : [
            {"name": "time", "type": "long"}
        ]
    }
    "#;
    let schema = Schema::parse_str(schema_json).unwrap();

    match schema {
        Schema::Record { ref aliases, .. } => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be a record: {:?}", schema),
    }
}
| |
#[test]
fn avro_3512_alias_with_null_namespace_enum() {
    // The leading dot in ".c" marks an alias in the null namespace, while "b"
    // is relative to the enum's "space" namespace.
    let schema_json = r#"
    {
        "type": "enum",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "symbols" : [
            "symbol1", "symbol2"
        ]
    }
    "#;
    let schema = Schema::parse_str(schema_json).unwrap();

    match schema {
        Schema::Enum { ref aliases, .. } => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be an enum: {:?}", schema),
    }
}
| |
#[test]
fn avro_3512_alias_with_null_namespace_fixed() {
    // The leading dot in ".c" marks an alias in the null namespace, while "b"
    // is relative to the fixed type's "space" namespace.
    let schema_json = r#"
    {
        "type": "fixed",
        "name": "a",
        "namespace": "space",
        "aliases": ["b", "x.y", ".c"],
        "size" : 12
    }
    "#;
    let schema = Schema::parse_str(schema_json).unwrap();

    match schema {
        Schema::Fixed { ref aliases, .. } => assert_avro_3512_aliases(aliases),
        _ => panic!("The Schema should be a fixed: {:?}", schema),
    }
}
| |
#[test]
fn avro_3518_serialize_aliases_record() {
    // Expected JSON after serialization: "b" is written fully qualified as
    // "space.b", and the null-namespace alias ".c" is written as plain "c".
    // Keys appear alphabetically; NOTE(review): presumably this relies on going
    // through `serde_json::Value` (sorted map by default) — confirm feature flags.
    let expected = r#"{"aliases":["space.b","x.y","c"],"fields":[{"name":"time","type":"long"}],"name":"a","namespace":"space","type":"record"}"#;

    let schema = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "a",
            "namespace": "space",
            "aliases": ["b", "x.y", ".c"],
            "fields" : [
                {"name": "time", "type": "long"}
            ]
        }
        "#,
    )
    .unwrap();

    let as_value = serde_json::to_value(&schema).unwrap();
    let serialized = serde_json::to_string(&as_value).unwrap();
    assert_eq!(expected, &serialized);

    let reparsed = Schema::parse_str(&serialized).unwrap();
    assert_eq!(schema, reparsed);
}
| |
#[test]
fn avro_3518_serialize_aliases_enum() {
    // Expected JSON after serialization: "b" is written fully qualified as
    // "space.b", and the null-namespace alias ".c" is written as plain "c".
    // Keys appear alphabetically; NOTE(review): presumably this relies on going
    // through `serde_json::Value` (sorted map by default) — confirm feature flags.
    let expected = r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","symbols":["symbol1","symbol2"],"type":"enum"}"#;

    let schema = Schema::parse_str(
        r#"
        {
            "type": "enum",
            "name": "a",
            "namespace": "space",
            "aliases": ["b", "x.y", ".c"],
            "symbols" : [
                "symbol1", "symbol2"
            ]
        }
        "#,
    )
    .unwrap();

    let as_value = serde_json::to_value(&schema).unwrap();
    let serialized = serde_json::to_string(&as_value).unwrap();
    assert_eq!(expected, &serialized);

    let reparsed = Schema::parse_str(&serialized).unwrap();
    assert_eq!(schema, reparsed);
}
| |
#[test]
fn avro_3518_serialize_aliases_fixed() {
    // Expected JSON after serialization: "b" is written fully qualified as
    // "space.b", and the null-namespace alias ".c" is written as plain "c".
    // Keys appear alphabetically; NOTE(review): presumably this relies on going
    // through `serde_json::Value` (sorted map by default) — confirm feature flags.
    let expected = r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","size":12,"type":"fixed"}"#;

    let schema = Schema::parse_str(
        r#"
        {
            "type": "fixed",
            "name": "a",
            "namespace": "space",
            "aliases": ["b", "x.y", ".c"],
            "size" : 12
        }
        "#,
    )
    .unwrap();

    let as_value = serde_json::to_value(&schema).unwrap();
    let serialized = serde_json::to_string(&as_value).unwrap();
    assert_eq!(expected, &serialized);

    let reparsed = Schema::parse_str(&serialized).unwrap();
    assert_eq!(schema, reparsed);
}
| |
#[test]
fn avro_3130_parse_anonymous_union_type() {
    // The field's type is a union whose second member is written as an object
    // carrying a "name" plus a nested "type"; the assertions below check it is
    // parsed as an array of longs.
    let schema_str = r#"
    {
        "type": "record",
        "name": "AccountEvent",
        "fields": [
            {"type":
                ["null",
                    { "name": "accountList",
                        "type": {
                            "type": "array",
                            "items": "long"
                        }
                    }
                ],
                "name":"NullableLongArray"
            }
        ]
    }
    "#;
    let schema = Schema::parse_str(schema_str).unwrap();

    // Peel the schema apart one level at a time, panicking as soon as a level
    // has the wrong shape.
    let (name, fields) = match schema {
        Schema::Record { name, fields, .. } => (name, fields),
        _ => panic!("Expected Schema::Record"),
    };
    assert_eq!(name, Name::new("AccountEvent").unwrap());

    let field = &fields[0];
    assert_eq!(&field.name, "NullableLongArray");

    let union = match field.schema {
        Schema::Union(ref union) => union,
        _ => panic!("Expected Schema::Union"),
    };
    assert_eq!(union.schemas[0], Schema::Null);

    let array_schema = match union.schemas[1] {
        Schema::Array(ref array_schema) => array_schema,
        _ => panic!("Expected Schema::Array"),
    };
    assert!(
        matches!(**array_schema, Schema::Long),
        "Expected a Schema::Array of type Long"
    );
}
| } |