| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Logic handling writing in Avro format at user level. |
| use crate::{ |
| encode::{encode, encode_internal, encode_to_vec}, |
| rabin::Rabin, |
| schema::{AvroSchema, ResolvedOwnedSchema, ResolvedSchema, Schema}, |
| ser::Serializer, |
| types::Value, |
| AvroResult, Codec, Error, |
| }; |
| use serde::Serialize; |
| use std::{collections::HashMap, convert::TryFrom, io::Write, marker::PhantomData}; |
| |
| const DEFAULT_BLOCK_SIZE: usize = 16000; |
| const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01"; |
| |
| /// Main interface for writing Avro formatted values. |
/// Main interface for writing Avro formatted values.
#[derive(typed_builder::TypedBuilder)]
pub struct Writer<'a, W> {
    /// Schema that every appended value must conform to.
    schema: &'a Schema,
    /// Destination implementing `io::Write` that the container file is written to.
    writer: W,
    /// Resolved view of `schema` used for validation/encoding. Populated eagerly
    /// by the constructors; `None` until first append when built via `builder()`.
    #[builder(default, setter(skip))]
    resolved_schema: Option<ResolvedSchema<'a>>,
    /// Compression codec applied to each data block on flush.
    #[builder(default = Codec::Null)]
    codec: Codec,
    /// Buffered-byte threshold that triggers writing out a data block.
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    /// In-memory buffer of encoded values awaiting the next flush.
    /// The default expression may reference `block_size` because `typed_builder`
    /// evaluates field defaults in declaration order.
    #[builder(default = Vec::with_capacity(block_size), setter(skip))]
    buffer: Vec<u8>,
    /// Serde serializer reused across `append_ser` calls.
    #[builder(default, setter(skip))]
    serializer: Serializer,
    /// Number of values encoded into `buffer` since the last flush.
    #[builder(default = 0, setter(skip))]
    num_values: usize,
    /// 16-byte synchronization marker written after every data block.
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    /// Whether the object container header has already been written.
    #[builder(default = false, setter(skip))]
    has_header: bool,
    /// User-supplied metadata for the file header; keys must not start with "avro.".
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}
| |
| impl<'a, W: Write> Writer<'a, W> { |
| /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write |
| /// to. |
| /// No compression `Codec` will be used. |
| pub fn new(schema: &'a Schema, writer: W) -> Self { |
| Writer::with_codec(schema, writer, Codec::Null) |
| } |
| |
| /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the |
| /// `io::Write` trait to write to. |
| pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self { |
| let mut w = Self::builder() |
| .schema(schema) |
| .writer(writer) |
| .codec(codec) |
| .build(); |
| w.resolved_schema = ResolvedSchema::try_from(schema).ok(); |
| w |
| } |
| |
| /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the |
| /// `io::Write` trait to write to. |
| /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must |
| /// be provided in `schemata`. |
| pub fn with_schemata( |
| schema: &'a Schema, |
| schemata: Vec<&'a Schema>, |
| writer: W, |
| codec: Codec, |
| ) -> Self { |
| let mut w = Self::builder() |
| .schema(schema) |
| .writer(writer) |
| .codec(codec) |
| .build(); |
| w.resolved_schema = ResolvedSchema::try_from(schemata).ok(); |
| w |
| } |
| |
| /// Creates a `Writer` that will append values to already populated |
| /// `std::io::Write` using the provided `marker` |
| /// No compression `Codec` will be used. |
| pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self { |
| Writer::append_to_with_codec(schema, writer, Codec::Null, marker) |
| } |
| |
| /// Creates a `Writer` that will append values to already populated |
| /// `std::io::Write` using the provided `marker` |
| pub fn append_to_with_codec( |
| schema: &'a Schema, |
| writer: W, |
| codec: Codec, |
| marker: [u8; 16], |
| ) -> Self { |
| let mut w = Self::builder() |
| .schema(schema) |
| .writer(writer) |
| .codec(codec) |
| .marker(marker) |
| .build(); |
| w.has_header = true; |
| w.resolved_schema = ResolvedSchema::try_from(schema).ok(); |
| w |
| } |
| |
| /// Creates a `Writer` that will append values to already populated |
| /// `std::io::Write` using the provided `marker` |
| pub fn append_to_with_codec_schemata( |
| schema: &'a Schema, |
| schemata: Vec<&'a Schema>, |
| writer: W, |
| codec: Codec, |
| marker: [u8; 16], |
| ) -> Self { |
| let mut w = Self::builder() |
| .schema(schema) |
| .writer(writer) |
| .codec(codec) |
| .marker(marker) |
| .build(); |
| w.has_header = true; |
| w.resolved_schema = ResolvedSchema::try_from(schemata).ok(); |
| w |
| } |
| |
| /// Get a reference to the `Schema` associated to a `Writer`. |
| pub fn schema(&self) -> &'a Schema { |
| self.schema |
| } |
| |
| /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing |
| /// schema validation. |
| /// |
| /// Return the number of bytes written (it might be 0, see below). |
| /// |
| /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on |
| /// internal buffering for performance reasons. If you want to be sure the value has been |
| /// written, then call [`flush`](struct.Writer.html#method.flush). |
| pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> { |
| let n = self.maybe_write_header()?; |
| |
| let avro = value.into(); |
| self.append_value_ref(&avro).map(|m| m + n) |
| } |
| |
| /// Append a compatible value to a `Writer`, also performing schema validation. |
| /// |
| /// Return the number of bytes written (it might be 0, see below). |
| /// |
| /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on |
| /// internal buffering for performance reasons. If you want to be sure the value has been |
| /// written, then call [`flush`](struct.Writer.html#method.flush). |
| pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> { |
| let n = self.maybe_write_header()?; |
| |
| // Lazy init for users using the builder pattern with error throwing |
| match self.resolved_schema { |
| Some(ref rs) => { |
| write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?; |
| self.num_values += 1; |
| |
| if self.buffer.len() >= self.block_size { |
| return self.flush().map(|b| b + n); |
| } |
| |
| Ok(n) |
| } |
| None => { |
| let rs = ResolvedSchema::try_from(self.schema)?; |
| self.resolved_schema = Some(rs); |
| self.append_value_ref(value) |
| } |
| } |
| } |
| |
| /// Append anything implementing the `Serialize` trait to a `Writer` for |
| /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema |
| /// validation. |
| /// |
| /// Return the number of bytes written. |
| /// |
| /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on |
| /// internal buffering for performance reasons. If you want to be sure the value has been |
| /// written, then call [`flush`](struct.Writer.html#method.flush). |
| pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> { |
| let avro_value = value.serialize(&mut self.serializer)?; |
| self.append(avro_value) |
| } |
| |
| /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro` |
| /// trait), also performing schema validation. |
| /// |
| /// Return the number of bytes written. |
| /// |
| /// **NOTE** This function forces the written data to be flushed (an implicit |
| /// call to [`flush`](struct.Writer.html#method.flush) is performed). |
| pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize> |
| where |
| I: IntoIterator<Item = T>, |
| { |
| /* |
| https://github.com/rust-lang/rfcs/issues/811 :( |
| let mut stream = values |
| .filter_map(|value| value.serialize(&mut self.serializer).ok()) |
| .map(|value| value.encode(self.schema)) |
| .collect::<Option<Vec<_>>>() |
| .ok_or_else(|| err_msg("value does not match given schema"))? |
| .into_iter() |
| .fold(Vec::new(), |mut acc, stream| { |
| num_values += 1; |
| acc.extend(stream); acc |
| }); |
| */ |
| |
| let mut num_bytes = 0; |
| for value in values { |
| num_bytes += self.append(value)?; |
| } |
| num_bytes += self.flush()?; |
| |
| Ok(num_bytes) |
| } |
| |
| /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for |
| /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema |
| /// validation. |
| /// |
| /// Return the number of bytes written. |
| /// |
| /// **NOTE** This function forces the written data to be flushed (an implicit |
| /// call to [`flush`](struct.Writer.html#method.flush) is performed). |
| pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize> |
| where |
| I: IntoIterator<Item = T>, |
| { |
| /* |
| https://github.com/rust-lang/rfcs/issues/811 :( |
| let mut stream = values |
| .filter_map(|value| value.serialize(&mut self.serializer).ok()) |
| .map(|value| value.encode(self.schema)) |
| .collect::<Option<Vec<_>>>() |
| .ok_or_else(|| err_msg("value does not match given schema"))? |
| .into_iter() |
| .fold(Vec::new(), |mut acc, stream| { |
| num_values += 1; |
| acc.extend(stream); acc |
| }); |
| */ |
| |
| let mut num_bytes = 0; |
| for value in values { |
| num_bytes += self.append_ser(value)?; |
| } |
| num_bytes += self.flush()?; |
| |
| Ok(num_bytes) |
| } |
| |
| /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema |
| /// validation on each value appended. |
| /// |
| /// Return the number of bytes written. |
| /// |
| /// **NOTE** This function forces the written data to be flushed (an implicit |
| /// call to [`flush`](struct.Writer.html#method.flush) is performed). |
| pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> { |
| let mut num_bytes = 0; |
| for value in values { |
| num_bytes += self.append_value_ref(value)?; |
| } |
| num_bytes += self.flush()?; |
| |
| Ok(num_bytes) |
| } |
| |
| /// Flush the content appended to a `Writer`. Call this function to make sure all the content |
| /// has been written before releasing the `Writer`. |
| /// |
| /// Return the number of bytes written. |
| pub fn flush(&mut self) -> AvroResult<usize> { |
| if self.num_values == 0 { |
| return Ok(0); |
| } |
| |
| self.codec.compress(&mut self.buffer)?; |
| |
| let num_values = self.num_values; |
| let stream_len = self.buffer.len(); |
| |
| let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)? |
| + self.append_raw(&stream_len.into(), &Schema::Long)? |
| + self |
| .writer |
| .write(self.buffer.as_ref()) |
| .map_err(Error::WriteBytes)? |
| + self.append_marker()?; |
| |
| self.buffer.clear(); |
| self.num_values = 0; |
| |
| Ok(num_bytes) |
| } |
| |
| /// Return what the `Writer` is writing to, consuming the `Writer` itself. |
| /// |
| /// **NOTE** This function forces the written data to be flushed (an implicit |
| /// call to [`flush`](struct.Writer.html#method.flush) is performed). |
| pub fn into_inner(mut self) -> AvroResult<W> { |
| self.flush()?; |
| Ok(self.writer) |
| } |
| |
| /// Generate and append synchronization marker to the payload. |
| fn append_marker(&mut self) -> AvroResult<usize> { |
| // using .writer.write directly to avoid mutable borrow of self |
| // with ref borrowing of self.marker |
| self.writer.write(&self.marker).map_err(Error::WriteMarker) |
| } |
| |
| /// Append a raw Avro Value to the payload avoiding to encode it again. |
| fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> { |
| self.append_bytes(encode_to_vec(value, schema)?.as_ref()) |
| } |
| |
| /// Append pure bytes to the payload. |
| fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> { |
| self.writer.write(bytes).map_err(Error::WriteBytes) |
| } |
| |
| /// Adds custom metadata to the file. |
| /// This method could be used only before adding the first record to the writer. |
| pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> { |
| if !self.has_header { |
| if key.starts_with("avro.") { |
| return Err(Error::InvalidMetadataKey(key)); |
| } |
| self.user_metadata |
| .insert(key, Value::Bytes(value.as_ref().to_vec())); |
| Ok(()) |
| } else { |
| Err(Error::FileHeaderAlreadyWritten) |
| } |
| } |
| |
| /// Create an Avro header based on schema, codec and sync marker. |
| fn header(&self) -> Result<Vec<u8>, Error> { |
| let schema_bytes = serde_json::to_string(self.schema) |
| .map_err(Error::ConvertJsonToString)? |
| .into_bytes(); |
| |
| let mut metadata = HashMap::with_capacity(2); |
| metadata.insert("avro.schema", Value::Bytes(schema_bytes)); |
| metadata.insert("avro.codec", self.codec.into()); |
| |
| for (k, v) in &self.user_metadata { |
| metadata.insert(k.as_str(), v.clone()); |
| } |
| |
| let mut header = Vec::new(); |
| header.extend_from_slice(AVRO_OBJECT_HEADER); |
| encode( |
| &metadata.into(), |
| &Schema::Map(Box::new(Schema::Bytes)), |
| &mut header, |
| )?; |
| header.extend_from_slice(&self.marker); |
| |
| Ok(header) |
| } |
| |
| fn maybe_write_header(&mut self) -> AvroResult<usize> { |
| if !self.has_header { |
| let header = self.header()?; |
| let n = self.append_bytes(header.as_ref())?; |
| self.has_header = true; |
| Ok(n) |
| } else { |
| Ok(0) |
| } |
| } |
| } |
| |
| /// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing |
| /// schema validation. |
| /// |
| /// This is an internal function which gets the bytes buffer where to write as parameter instead of |
| /// creating a new one like `to_avro_datum`. |
| fn write_avro_datum<T: Into<Value>>( |
| schema: &Schema, |
| value: T, |
| buffer: &mut Vec<u8>, |
| ) -> Result<(), Error> { |
| let avro = value.into(); |
| if !avro.validate(schema) { |
| return Err(Error::Validation); |
| } |
| encode(&avro, schema, buffer)?; |
| Ok(()) |
| } |
| |
| fn write_avro_datum_schemata<T: Into<Value>>( |
| schema: &Schema, |
| schemata: Vec<&Schema>, |
| value: T, |
| buffer: &mut Vec<u8>, |
| ) -> AvroResult<()> { |
| let avro = value.into(); |
| let rs = ResolvedSchema::try_from(schemata)?; |
| let names = rs.get_names(); |
| let enclosing_namespace = schema.namespace(); |
| if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) { |
| return Err(Error::Validation); |
| } |
| encode_internal(&avro, schema, names, &enclosing_namespace, buffer) |
| } |
| |
| /// Writer that encodes messages according to the single object encoding v1 spec |
| /// Uses an API similar to the current File Writer |
| /// Writes all object bytes at once, and drains internal buffer |
pub struct GenericSingleObjectWriter {
    // Holds the 10-byte single-object header (magic + fingerprint) followed,
    // transiently, by one encoded value; truncated back to the header after each write.
    buffer: Vec<u8>,
    // Owned resolution of the schema used to validate and encode values.
    resolved: ResolvedOwnedSchema,
}
| |
| impl GenericSingleObjectWriter { |
| pub fn new_with_capacity( |
| schema: &Schema, |
| initial_buffer_cap: usize, |
| ) -> AvroResult<GenericSingleObjectWriter> { |
| let fingerprint = schema.fingerprint::<Rabin>(); |
| let mut buffer = Vec::with_capacity(initial_buffer_cap); |
| let header = [ |
| 0xC3, |
| 0x01, |
| fingerprint.bytes[0], |
| fingerprint.bytes[1], |
| fingerprint.bytes[2], |
| fingerprint.bytes[3], |
| fingerprint.bytes[4], |
| fingerprint.bytes[5], |
| fingerprint.bytes[6], |
| fingerprint.bytes[7], |
| ]; |
| buffer.extend_from_slice(&header); |
| |
| Ok(GenericSingleObjectWriter { |
| buffer, |
| resolved: ResolvedOwnedSchema::try_from(schema.clone())?, |
| }) |
| } |
| |
| /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header |
| pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> { |
| if self.buffer.len() != 10 { |
| Err(Error::IllegalSingleObjectWriterState) |
| } else { |
| write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?; |
| writer.write_all(&self.buffer).map_err(Error::WriteBytes)?; |
| let len = self.buffer.len(); |
| self.buffer.truncate(10); |
| Ok(len) |
| } |
| } |
| |
| /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header |
| pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> { |
| self.write_value_ref(&v, writer) |
| } |
| } |
| |
| /// Writer that encodes messages according to the single object encoding v1 spec |
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    // Underlying generic writer initialized from `T`'s schema.
    inner: GenericSingleObjectWriter,
    // Marks the model type without storing a value of it.
    _model: PhantomData<T>,
}
| |
| impl<T> SpecificSingleObjectWriter<T> |
| where |
| T: AvroSchema, |
| { |
| pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> { |
| let schema = T::get_schema(); |
| Ok(SpecificSingleObjectWriter { |
| inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?, |
| _model: PhantomData, |
| }) |
| } |
| } |
| |
| impl<T> SpecificSingleObjectWriter<T> |
| where |
| T: AvroSchema + Into<Value>, |
| { |
| /// Write the `Into<Value>` to the provided Write object. Returns a result with the number |
| /// of bytes written including the header |
| pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> { |
| let v: Value = data.into(); |
| self.inner.write_value_ref(&v, writer) |
| } |
| } |
| |
| impl<T> SpecificSingleObjectWriter<T> |
| where |
| T: AvroSchema + Serialize, |
| { |
| /// Write the referenced Serialize object to the provided Write object. Returns a result with |
| /// the number of bytes written including the header |
| pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> { |
| let mut serializer = Serializer::default(); |
| let v = data.serialize(&mut serializer)?; |
| self.inner.write_value_ref(&v, writer) |
| } |
| |
| /// Write the Serialize object to the provided Write object. Returns a result with the number |
| /// of bytes written including the header |
| pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> { |
| self.write_ref(&data, writer) |
| } |
| } |
| |
| fn write_value_ref_resolved( |
| schema: &Schema, |
| resolved_schema: &ResolvedSchema, |
| value: &Value, |
| buffer: &mut Vec<u8>, |
| ) -> AvroResult<()> { |
| match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) { |
| Some(err) => Err(Error::ValidationWithReason(err)), |
| None => encode_internal( |
| value, |
| schema, |
| resolved_schema.get_names(), |
| &schema.namespace(), |
| buffer, |
| ), |
| } |
| } |
| |
| fn write_value_ref_owned_resolved( |
| resolved_schema: &ResolvedOwnedSchema, |
| value: &Value, |
| buffer: &mut Vec<u8>, |
| ) -> AvroResult<()> { |
| let root_schema = resolved_schema.get_root_schema(); |
| if let Some(err) = value.validate_internal( |
| root_schema, |
| resolved_schema.get_names(), |
| &root_schema.namespace(), |
| ) { |
| return Err(Error::ValidationWithReason(err)); |
| } |
| encode_internal( |
| value, |
| root_schema, |
| resolved_schema.get_names(), |
| &root_schema.namespace(), |
| buffer, |
| )?; |
| Ok(()) |
| } |
| |
| /// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also |
| /// performing schema validation. |
| /// |
| /// **NOTE** This function has a quite small niche of usage and does NOT generate headers and sync |
| /// markers; use [`Writer`](struct.Writer.html) to be fully Avro-compatible if you don't know what |
| /// you are doing, instead. |
| pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> { |
| let mut buffer = Vec::new(); |
| write_avro_datum(schema, value, &mut buffer)?; |
| Ok(buffer) |
| } |
| |
| /// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also |
| /// performing schema validation. |
| /// If the provided `schema` is incomplete then its dependencies must be |
| /// provided in `schemata` |
| pub fn to_avro_datum_schemata<T: Into<Value>>( |
| schema: &Schema, |
| schemata: Vec<&Schema>, |
| value: T, |
| ) -> AvroResult<Vec<u8>> { |
| let mut buffer = Vec::new(); |
| write_avro_datum_schemata(schema, schemata, value, &mut buffer)?; |
| Ok(buffer) |
| } |
| |
| #[cfg(not(target_arch = "wasm32"))] |
| fn generate_sync_marker() -> [u8; 16] { |
| let mut marker = [0_u8; 16]; |
| std::iter::repeat_with(rand::random) |
| .take(16) |
| .enumerate() |
| .for_each(|(i, n)| marker[i] = n); |
| marker |
| } |
| |
#[cfg(target_arch = "wasm32")]
/// Produce a fresh random 16-byte synchronization marker (wasm build).
/// `quad_rand::rand()` yields a `u32`, so the marker is filled 4 bytes at a time.
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::{ |
| decimal::Decimal, |
| duration::{Days, Duration, Millis, Months}, |
| schema::{DecimalSchema, FixedSchema, Name}, |
| types::Record, |
| util::zig_i64, |
| }; |
| use pretty_assertions::assert_eq; |
| use serde_derive::{Deserialize, Serialize}; |
| |
| use apache_avro_test_helper::TestResult; |
| |
| const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len(); |
| |
| const SCHEMA: &str = r#" |
| { |
| "type": "record", |
| "name": "test", |
| "fields": [ |
| { |
| "name": "a", |
| "type": "long", |
| "default": 42 |
| }, |
| { |
| "name": "b", |
| "type": "string" |
| } |
| ] |
| } |
| "#; |
| const UNION_SCHEMA: &str = r#"["null", "long"]"#; |
| |
| #[test] |
| fn test_to_avro_datum() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let mut record = Record::new(&schema).unwrap(); |
| record.put("a", 27i64); |
| record.put("b", "foo"); |
| |
| let mut expected = Vec::new(); |
| zig_i64(27, &mut expected); |
| zig_i64(3, &mut expected); |
| expected.extend(vec![b'f', b'o', b'o'].into_iter()); |
| |
| assert_eq!(to_avro_datum(&schema, record)?, expected); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_union_not_null() -> TestResult { |
| let schema = Schema::parse_str(UNION_SCHEMA)?; |
| let union = Value::Union(1, Box::new(Value::Long(3))); |
| |
| let mut expected = Vec::new(); |
| zig_i64(1, &mut expected); |
| zig_i64(3, &mut expected); |
| |
| assert_eq!(to_avro_datum(&schema, union)?, expected); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_union_null() -> TestResult { |
| let schema = Schema::parse_str(UNION_SCHEMA)?; |
| let union = Value::Union(0, Box::new(Value::Null)); |
| |
| let mut expected = Vec::new(); |
| zig_i64(0, &mut expected); |
| |
| assert_eq!(to_avro_datum(&schema, union)?, expected); |
| |
| Ok(()) |
| } |
| |
| fn logical_type_test<T: Into<Value> + Clone>( |
| schema_str: &'static str, |
| |
| expected_schema: &Schema, |
| value: Value, |
| |
| raw_schema: &Schema, |
| raw_value: T, |
| ) -> TestResult { |
| let schema = Schema::parse_str(schema_str)?; |
| assert_eq!(&schema, expected_schema); |
| // The serialized format should be the same as the schema. |
| let ser = to_avro_datum(&schema, value.clone())?; |
| let raw_ser = to_avro_datum(raw_schema, raw_value)?; |
| assert_eq!(ser, raw_ser); |
| |
| // Should deserialize from the schema into the logical type. |
| let mut r = ser.as_slice(); |
| let de = crate::from_avro_datum(&schema, &mut r, None)?; |
| assert_eq!(de, value); |
| Ok(()) |
| } |
| |
| #[test] |
| fn date() -> TestResult { |
| logical_type_test( |
| r#"{"type": "int", "logicalType": "date"}"#, |
| &Schema::Date, |
| Value::Date(1_i32), |
| &Schema::Int, |
| 1_i32, |
| ) |
| } |
| |
| #[test] |
| fn time_millis() -> TestResult { |
| logical_type_test( |
| r#"{"type": "int", "logicalType": "time-millis"}"#, |
| &Schema::TimeMillis, |
| Value::TimeMillis(1_i32), |
| &Schema::Int, |
| 1_i32, |
| ) |
| } |
| |
| #[test] |
| fn time_micros() -> TestResult { |
| logical_type_test( |
| r#"{"type": "long", "logicalType": "time-micros"}"#, |
| &Schema::TimeMicros, |
| Value::TimeMicros(1_i64), |
| &Schema::Long, |
| 1_i64, |
| ) |
| } |
| |
| #[test] |
| fn timestamp_millis() -> TestResult { |
| logical_type_test( |
| r#"{"type": "long", "logicalType": "timestamp-millis"}"#, |
| &Schema::TimestampMillis, |
| Value::TimestampMillis(1_i64), |
| &Schema::Long, |
| 1_i64, |
| ) |
| } |
| |
| #[test] |
| fn timestamp_micros() -> TestResult { |
| logical_type_test( |
| r#"{"type": "long", "logicalType": "timestamp-micros"}"#, |
| &Schema::TimestampMicros, |
| Value::TimestampMicros(1_i64), |
| &Schema::Long, |
| 1_i64, |
| ) |
| } |
| |
| #[test] |
| fn decimal_fixed() -> TestResult { |
| let size = 30; |
| let inner = Schema::Fixed(FixedSchema { |
| name: Name::new("decimal")?, |
| aliases: None, |
| doc: None, |
| size, |
| attributes: Default::default(), |
| }); |
| let value = vec![0u8; size]; |
| logical_type_test( |
| r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#, |
| &Schema::Decimal(DecimalSchema { |
| precision: 20, |
| scale: 5, |
| inner: Box::new(inner.clone()), |
| }), |
| Value::Decimal(Decimal::from(value.clone())), |
| &inner, |
| Value::Fixed(size, value), |
| ) |
| } |
| |
| #[test] |
| fn decimal_bytes() -> TestResult { |
| let inner = Schema::Bytes; |
| let value = vec![0u8; 10]; |
| logical_type_test( |
| r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#, |
| &Schema::Decimal(DecimalSchema { |
| precision: 4, |
| scale: 3, |
| inner: Box::new(inner.clone()), |
| }), |
| Value::Decimal(Decimal::from(value.clone())), |
| &inner, |
| value, |
| ) |
| } |
| |
| #[test] |
| fn duration() -> TestResult { |
| let inner = Schema::Fixed(FixedSchema { |
| name: Name::new("duration")?, |
| aliases: None, |
| doc: None, |
| size: 12, |
| attributes: Default::default(), |
| }); |
| let value = Value::Duration(Duration::new( |
| Months::new(256), |
| Days::new(512), |
| Millis::new(1024), |
| )); |
| logical_type_test( |
| r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#, |
| &Schema::Duration, |
| value, |
| &inner, |
| Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]), |
| ) |
| } |
| |
| #[test] |
| fn test_writer_append() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let mut writer = Writer::new(&schema, Vec::new()); |
| |
| let mut record = Record::new(&schema).unwrap(); |
| record.put("a", 27i64); |
| record.put("b", "foo"); |
| |
| let n1 = writer.append(record.clone())?; |
| let n2 = writer.append(record.clone())?; |
| let n3 = writer.flush()?; |
| let result = writer.into_inner()?; |
| |
| assert_eq!(n1 + n2 + n3, result.len()); |
| |
| let mut data = Vec::new(); |
| zig_i64(27, &mut data); |
| zig_i64(3, &mut data); |
| data.extend(b"foo"); |
| data.extend(data.clone()); |
| |
| // starts with magic |
| assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER); |
| // ends with data and sync marker |
| let last_data_byte = result.len() - 16; |
| assert_eq!( |
| &result[last_data_byte - data.len()..last_data_byte], |
| data.as_slice() |
| ); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_writer_extend() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let mut writer = Writer::new(&schema, Vec::new()); |
| |
| let mut record = Record::new(&schema).unwrap(); |
| record.put("a", 27i64); |
| record.put("b", "foo"); |
| let record_copy = record.clone(); |
| let records = vec![record, record_copy]; |
| |
| let n1 = writer.extend(records.into_iter())?; |
| let n2 = writer.flush()?; |
| let result = writer.into_inner()?; |
| |
| assert_eq!(n1 + n2, result.len()); |
| |
| let mut data = Vec::new(); |
| zig_i64(27, &mut data); |
| zig_i64(3, &mut data); |
| data.extend(b"foo"); |
| data.extend(data.clone()); |
| |
| // starts with magic |
| assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER); |
| // ends with data and sync marker |
| let last_data_byte = result.len() - 16; |
| assert_eq!( |
| &result[last_data_byte - data.len()..last_data_byte], |
| data.as_slice() |
| ); |
| |
| Ok(()) |
| } |
| |
| #[derive(Debug, Clone, Deserialize, Serialize)] |
| struct TestSerdeSerialize { |
| a: i64, |
| b: String, |
| } |
| |
| #[test] |
| fn test_writer_append_ser() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let mut writer = Writer::new(&schema, Vec::new()); |
| |
| let record = TestSerdeSerialize { |
| a: 27, |
| b: "foo".to_owned(), |
| }; |
| |
| let n1 = writer.append_ser(record)?; |
| let n2 = writer.flush()?; |
| let result = writer.into_inner()?; |
| |
| assert_eq!(n1 + n2, result.len()); |
| |
| let mut data = Vec::new(); |
| zig_i64(27, &mut data); |
| zig_i64(3, &mut data); |
| data.extend(b"foo"); |
| |
| // starts with magic |
| assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER); |
| // ends with data and sync marker |
| let last_data_byte = result.len() - 16; |
| assert_eq!( |
| &result[last_data_byte - data.len()..last_data_byte], |
| data.as_slice() |
| ); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_writer_extend_ser() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let mut writer = Writer::new(&schema, Vec::new()); |
| |
| let record = TestSerdeSerialize { |
| a: 27, |
| b: "foo".to_owned(), |
| }; |
| let record_copy = record.clone(); |
| let records = vec![record, record_copy]; |
| |
| let n1 = writer.extend_ser(records.into_iter())?; |
| let n2 = writer.flush()?; |
| let result = writer.into_inner()?; |
| |
| assert_eq!(n1 + n2, result.len()); |
| |
| let mut data = Vec::new(); |
| zig_i64(27, &mut data); |
| zig_i64(3, &mut data); |
| data.extend(b"foo"); |
| data.extend(data.clone()); |
| |
| // starts with magic |
| assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER); |
| // ends with data and sync marker |
| let last_data_byte = result.len() - 16; |
| assert_eq!( |
| &result[last_data_byte - data.len()..last_data_byte], |
| data.as_slice() |
| ); |
| |
| Ok(()) |
| } |
| |
| fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> { |
| Writer::with_codec(schema, Vec::new(), Codec::Deflate) |
| } |
| |
| fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> { |
| Writer::builder() |
| .writer(Vec::new()) |
| .schema(schema) |
| .codec(Codec::Deflate) |
| .block_size(100) |
| .build() |
| } |
| |
| fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult { |
| let mut record = Record::new(schema).unwrap(); |
| record.put("a", 27i64); |
| record.put("b", "foo"); |
| |
| let n1 = writer.append(record.clone())?; |
| let n2 = writer.append(record.clone())?; |
| let n3 = writer.flush()?; |
| let result = writer.into_inner()?; |
| |
| assert_eq!(n1 + n2 + n3, result.len()); |
| |
| let mut data = Vec::new(); |
| zig_i64(27, &mut data); |
| zig_i64(3, &mut data); |
| data.extend(b"foo"); |
| data.extend(data.clone()); |
| Codec::Deflate.compress(&mut data)?; |
| |
| // starts with magic |
| assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER); |
| // ends with data and sync marker |
| let last_data_byte = result.len() - 16; |
| assert_eq!( |
| &result[last_data_byte - data.len()..last_data_byte], |
| data.as_slice() |
| ); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_writer_with_codec() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let writer = make_writer_with_codec(&schema); |
| check_writer(writer, &schema) |
| } |
| |
| #[test] |
| fn test_writer_with_builder() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let writer = make_writer_with_builder(&schema); |
| check_writer(writer, &schema) |
| } |
| |
| #[test] |
| fn test_logical_writer() -> TestResult { |
| const LOGICAL_TYPE_SCHEMA: &str = r#" |
| { |
| "type": "record", |
| "name": "logical_type_test", |
| "fields": [ |
| { |
| "name": "a", |
| "type": [ |
| "null", |
| { |
| "type": "long", |
| "logicalType": "timestamp-micros" |
| } |
| ] |
| } |
| ] |
| } |
| "#; |
| let codec = Codec::Deflate; |
| let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?; |
| let mut writer = Writer::builder() |
| .schema(&schema) |
| .codec(codec) |
| .writer(Vec::new()) |
| .build(); |
| |
| let mut record1 = Record::new(&schema).unwrap(); |
| record1.put( |
| "a", |
| Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))), |
| ); |
| |
| let mut record2 = Record::new(&schema).unwrap(); |
| record2.put("a", Value::Union(0, Box::new(Value::Null))); |
| |
| let n1 = writer.append(record1)?; |
| let n2 = writer.append(record2)?; |
| let n3 = writer.flush()?; |
| let result = writer.into_inner()?; |
| |
| assert_eq!(n1 + n2 + n3, result.len()); |
| |
| let mut data = Vec::new(); |
| // byte indicating not null |
| zig_i64(1, &mut data); |
| zig_i64(1234, &mut data); |
| |
| // byte indicating null |
| zig_i64(0, &mut data); |
| codec.compress(&mut data)?; |
| |
| // starts with magic |
| assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER); |
| // ends with data and sync marker |
| let last_data_byte = result.len() - 16; |
| assert_eq!( |
| &result[last_data_byte - data.len()..last_data_byte], |
| data.as_slice() |
| ); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_avro_3405_writer_add_metadata_success() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let mut writer = Writer::new(&schema, Vec::new()); |
| |
| writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?; |
| writer.add_user_metadata("strKey".to_string(), "strValue")?; |
| writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?; |
| writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?; |
| |
| let mut record = Record::new(&schema).unwrap(); |
| record.put("a", 27i64); |
| record.put("b", "foo"); |
| |
| writer.append(record.clone())?; |
| writer.append(record.clone())?; |
| writer.flush()?; |
| let result = writer.into_inner()?; |
| |
| assert_eq!(result.len(), 260); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_avro_3405_writer_add_metadata_failure() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let mut writer = Writer::new(&schema, Vec::new()); |
| |
| let mut record = Record::new(&schema).unwrap(); |
| record.put("a", 27i64); |
| record.put("b", "foo"); |
| writer.append(record.clone())?; |
| |
| match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) { |
| Err(e @ Error::FileHeaderAlreadyWritten) => { |
| assert_eq!(e.to_string(), "The file metadata is already flushed.") |
| } |
| Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"), |
| Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"), |
| } |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| let mut writer = Writer::new(&schema, Vec::new()); |
| |
| let key = "avro.stringKey".to_string(); |
| match writer.add_user_metadata(key.clone(), "value") { |
| Err(ref e @ Error::InvalidMetadataKey(_)) => { |
| assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}.")) |
| } |
| Err(e) => panic!( |
| "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}" |
| ), |
| Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"), |
| } |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult { |
| let schema = Schema::parse_str(SCHEMA)?; |
| |
| let mut user_meta_data: HashMap<String, Value> = HashMap::new(); |
| user_meta_data.insert( |
| "stringKey".to_string(), |
| Value::String("stringValue".to_string()), |
| ); |
| user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec())); |
| user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3])); |
| |
| let writer: Writer<'_, Vec<u8>> = Writer::builder() |
| .writer(Vec::new()) |
| .schema(&schema) |
| .user_metadata(user_meta_data.clone()) |
| .build(); |
| |
| assert_eq!(writer.user_metadata, user_meta_data); |
| |
| Ok(()) |
| } |
| |
    /// Plain data fixture for the single-object writer tests; its writer
    /// schema comes from this type's `AvroSchema` impl, and its `Value`
    /// conversion from its `From` impl.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        // "long" field in the schema
        a: i64,
        // "double" field in the schema
        b: f64,
        // "array of string" field in the schema
        c: Vec<String>,
    }
| |
    impl AvroSchema for TestSingleObjectWriter {
        /// Returns the writer schema matching `TestSingleObjectWriter`'s fields.
        ///
        /// NOTE(review): the record name "TestSingleObjectWrtierSerialize"
        /// contains a typo ("Wrtier"); it is deliberately left untouched here
        /// because the name is part of the schema and changing it would alter
        /// the Rabin fingerprint asserted elsewhere in these tests.
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            // The literal above is fixed and valid, so unwrap cannot fire in practice.
            Schema::parse_str(schema).unwrap()
        }
    }
| |
| impl From<TestSingleObjectWriter> for Value { |
| fn from(obj: TestSingleObjectWriter) -> Value { |
| Value::Record(vec![ |
| ("a".into(), obj.a.into()), |
| ("b".into(), obj.b.into()), |
| ( |
| "c".into(), |
| Value::Array(obj.c.into_iter().map(|s| s.into()).collect()), |
| ), |
| ]) |
| } |
| } |
| |
| #[test] |
| fn test_single_object_writer() -> TestResult { |
| let mut buf: Vec<u8> = Vec::new(); |
| let obj = TestSingleObjectWriter { |
| a: 300, |
| b: 34.555, |
| c: vec!["cat".into(), "dog".into()], |
| }; |
| let mut writer = GenericSingleObjectWriter::new_with_capacity( |
| &TestSingleObjectWriter::get_schema(), |
| 1024, |
| ) |
| .expect("Should resolve schema"); |
| let value = obj.into(); |
| let written_bytes = writer |
| .write_value_ref(&value, &mut buf) |
| .expect("Error serializing properly"); |
| |
| assert!(buf.len() > 10, "no bytes written"); |
| assert_eq!(buf.len(), written_bytes); |
| assert_eq!(buf[0], 0xC3); |
| assert_eq!(buf[1], 0x01); |
| assert_eq!( |
| &buf[2..10], |
| &TestSingleObjectWriter::get_schema() |
| .fingerprint::<Rabin>() |
| .bytes[..] |
| ); |
| let mut msg_binary = Vec::new(); |
| encode( |
| &value, |
| &TestSingleObjectWriter::get_schema(), |
| &mut msg_binary, |
| ) |
| .expect("encode should have failed by here as a dependency of any writing"); |
| assert_eq!(&buf[10..], &msg_binary[..]); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_writer_parity() -> TestResult { |
| let obj1 = TestSingleObjectWriter { |
| a: 300, |
| b: 34.555, |
| c: vec!["cat".into(), "dog".into()], |
| }; |
| |
| let mut buf1: Vec<u8> = Vec::new(); |
| let mut buf2: Vec<u8> = Vec::new(); |
| let mut buf3: Vec<u8> = Vec::new(); |
| |
| let mut generic_writer = GenericSingleObjectWriter::new_with_capacity( |
| &TestSingleObjectWriter::get_schema(), |
| 1024, |
| ) |
| .expect("Should resolve schema"); |
| let mut specific_writer = |
| SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024) |
| .expect("Resolved should pass"); |
| specific_writer |
| .write(obj1.clone(), &mut buf1) |
| .expect("Serialization expected"); |
| specific_writer |
| .write_value(obj1.clone(), &mut buf2) |
| .expect("Serialization expected"); |
| generic_writer |
| .write_value(obj1.into(), &mut buf3) |
| .expect("Serialization expected"); |
| assert_eq!(buf1, buf2); |
| assert_eq!(buf1, buf3); |
| |
| Ok(()) |
| } |
| } |