blob: de68c18da85132d350952b90c481c9857386206c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Logic handling reading from Avro format at user level.
use crate::{
decode::{decode, decode_internal},
from_value,
rabin::Rabin,
schema::{AvroSchema, ResolvedOwnedSchema, Schema},
types::Value,
util, AvroResult, Codec, Error,
};
use serde::de::DeserializeOwned;
use serde_json::from_slice;
use std::{
collections::HashMap,
convert::TryFrom,
io::{ErrorKind, Read},
marker::PhantomData,
str::FromStr,
};
// Internal Block reader.
#[derive(Debug, Clone)]
struct Block<R> {
reader: R,
// Internal buffering to reduce allocation.
buf: Vec<u8>,
buf_idx: usize,
// Number of elements expected to exist within this block.
message_count: usize,
marker: [u8; 16],
codec: Codec,
writer_schema: Schema,
user_metadata: HashMap<String, Vec<u8>>,
}
impl<R: Read> Block<R> {
fn new(reader: R) -> AvroResult<Block<R>> {
let mut block = Block {
reader,
codec: Codec::Null,
writer_schema: Schema::Null,
buf: vec![],
buf_idx: 0,
message_count: 0,
marker: [0; 16],
user_metadata: Default::default(),
};
block.read_header()?;
Ok(block)
}
/// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
/// its content.
fn read_header(&mut self) -> AvroResult<()> {
let meta_schema = Schema::Map(Box::new(Schema::Bytes));
let mut buf = [0u8; 4];
self.reader
.read_exact(&mut buf)
.map_err(Error::ReadHeader)?;
if buf != [b'O', b'b', b'j', 1u8] {
return Err(Error::HeaderMagic);
}
if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
self.read_writer_schema(&metadata)?;
self.codec = read_codec(&metadata)?;
for (key, value) in metadata {
if key == "avro.schema" || key == "avro.codec" {
// already processed
} else if key.starts_with("avro.") {
warn!("Ignoring unknown metadata key: {}", key);
} else {
self.read_user_metadata(key, value);
}
}
} else {
return Err(Error::GetHeaderMetadata);
}
self.reader
.read_exact(&mut self.marker)
.map_err(Error::ReadMarker)
}
fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
// The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
// invalid bytes.
//
// The are two cases to handle here:
//
// 1. `n > self.buf.len()`:
// In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
// 2. `n < self.buf.len()`:
// We need to resize to ensure that the buffer len is safe to read `n` elements.
//
// TODO: Figure out a way to avoid having to truncate for the second case.
self.buf.resize(util::safe_len(n)?, 0);
self.reader
.read_exact(&mut self.buf)
.map_err(Error::ReadIntoBuf)?;
self.buf_idx = 0;
Ok(())
}
/// Try to read a data block, also performing schema resolution for the objects contained in
/// the block. The objects are stored in an internal buffer to the `Reader`.
fn read_block_next(&mut self) -> AvroResult<()> {
assert!(self.is_empty(), "Expected self to be empty!");
match util::read_long(&mut self.reader) {
Ok(block_len) => {
self.message_count = block_len as usize;
let block_bytes = util::read_long(&mut self.reader)?;
self.fill_buf(block_bytes as usize)?;
let mut marker = [0u8; 16];
self.reader
.read_exact(&mut marker)
.map_err(Error::ReadBlockMarker)?;
if marker != self.marker {
return Err(Error::GetBlockMarker);
}
// NOTE (JAB): This doesn't fit this Reader pattern very well.
// `self.buf` is a growable buffer that is reused as the reader is iterated.
// For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
// and replace `buf` with the new one, instead of reusing the same buffer.
// We can address this by using some "limited read" type to decode directly
// into the buffer. But this is fine, for now.
self.codec.decompress(&mut self.buf)
}
Err(Error::ReadVariableIntegerBytes(io_err)) => {
if let ErrorKind::UnexpectedEof = io_err.kind() {
// to not return any error in case we only finished to read cleanly from the stream
Ok(())
} else {
Err(Error::ReadVariableIntegerBytes(io_err))
}
}
Err(e) => Err(e),
}
}
fn len(&self) -> usize {
self.message_count
}
fn is_empty(&self) -> bool {
self.len() == 0
}
fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
if self.is_empty() {
self.read_block_next()?;
if self.is_empty() {
return Ok(None);
}
}
let mut block_bytes = &self.buf[self.buf_idx..];
let b_original = block_bytes.len();
let item = from_avro_datum(&self.writer_schema, &mut block_bytes, read_schema)?;
if b_original == block_bytes.len() {
// from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
return Err(Error::ReadBlock);
}
self.buf_idx += b_original - block_bytes.len();
self.message_count -= 1;
Ok(Some(item))
}
fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
let json = metadata
.get("avro.schema")
.and_then(|bytes| {
if let Value::Bytes(ref bytes) = *bytes {
from_slice(bytes.as_ref()).ok()
} else {
None
}
})
.ok_or(Error::GetAvroSchemaFromMap)?;
self.writer_schema = Schema::parse(&json)?;
Ok(())
}
fn read_user_metadata(&mut self, key: String, value: Value) {
match value {
Value::Bytes(ref vec) => {
self.user_metadata.insert(key, vec.clone());
}
wrong => {
warn!(
"User metadata values must be Value::Bytes, found {:?}",
wrong
);
}
}
}
}
fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
let result = metadata
.get("avro.codec")
.map(|codec| {
if let Value::Bytes(ref bytes) = *codec {
match std::str::from_utf8(bytes.as_ref()) {
Ok(utf8) => Ok(utf8),
Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
}
} else {
Err(Error::BadCodecMetadata)
}
})
.map(|codec_res| match codec_res {
Ok(codec) => match Codec::from_str(codec) {
Ok(codec) => Ok(codec),
Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
},
Err(err) => Err(err),
});
match result {
Some(res) => res,
None => Ok(Codec::Null),
}
}
/// Main interface for reading Avro formatted values.
///
/// To be used as an iterator:
///
/// ```no_run
/// # use apache_avro::Reader;
/// # use std::io::Cursor;
/// # let input = Cursor::new(Vec::<u8>::new());
/// for value in Reader::new(input).unwrap() {
/// match value {
/// Ok(v) => println!("{:?}", v),
/// Err(e) => println!("Error: {}", e),
/// };
/// }
/// ```
pub struct Reader<'a, R> {
block: Block<R>,
reader_schema: Option<&'a Schema>,
errored: bool,
should_resolve_schema: bool,
}
impl<'a, R: Read> Reader<'a, R> {
/// Creates a `Reader` given something implementing the `io::Read` trait to read from.
/// No reader `Schema` will be set.
///
/// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
let block = Block::new(reader)?;
let reader = Reader {
block,
reader_schema: None,
errored: false,
should_resolve_schema: false,
};
Ok(reader)
}
/// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
/// to read from.
///
/// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
let block = Block::new(reader)?;
let mut reader = Reader {
block,
reader_schema: Some(schema),
errored: false,
should_resolve_schema: false,
};
// Check if the reader and writer schemas disagree.
reader.should_resolve_schema = reader.writer_schema() != schema;
Ok(reader)
}
/// Get a reference to the writer `Schema`.
#[inline]
pub fn writer_schema(&self) -> &Schema {
&self.block.writer_schema
}
/// Get a reference to the optional reader `Schema`.
#[inline]
pub fn reader_schema(&self) -> Option<&Schema> {
self.reader_schema
}
/// Get a reference to the user metadata
#[inline]
pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
&self.block.user_metadata
}
#[inline]
fn read_next(&mut self) -> AvroResult<Option<Value>> {
let read_schema = if self.should_resolve_schema {
self.reader_schema
} else {
None
};
self.block.read_next(read_schema)
}
}
impl<'a, R: Read> Iterator for Reader<'a, R> {
type Item = AvroResult<Value>;
fn next(&mut self) -> Option<Self::Item> {
// to prevent keep on reading after the first error occurs
if self.errored {
return None;
};
match self.read_next() {
Ok(opt) => opt.map(Ok),
Err(e) => {
self.errored = true;
Some(Err(e))
}
}
}
}
/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
/// to read from.
///
/// In case a reader `Schema` is provided, schema resolution will also be performed.
///
/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
/// you are doing, instead.
pub fn from_avro_datum<R: Read>(
writer_schema: &Schema,
reader: &mut R,
reader_schema: Option<&Schema>,
) -> AvroResult<Value> {
let value = decode(writer_schema, reader)?;
match reader_schema {
Some(schema) => value.resolve(schema),
None => Ok(value),
}
}
pub struct GenericSingleObjectReader {
write_schema: ResolvedOwnedSchema,
expected_header: [u8; 10],
}
impl GenericSingleObjectReader {
pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
let fingerprint = schema.fingerprint::<Rabin>();
let expected_header = [
0xC3,
0x01,
fingerprint.bytes[0],
fingerprint.bytes[1],
fingerprint.bytes[2],
fingerprint.bytes[3],
fingerprint.bytes[4],
fingerprint.bytes[5],
fingerprint.bytes[6],
fingerprint.bytes[7],
];
Ok(GenericSingleObjectReader {
write_schema: ResolvedOwnedSchema::try_from(schema)?,
expected_header,
})
}
pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
let mut header: [u8; 10] = [0; 10];
match reader.read(&mut header) {
Ok(size) => {
if size == 10 && self.expected_header == header {
decode_internal(
self.write_schema.get_root_schema(),
self.write_schema.get_names(),
&None,
reader,
)
} else {
Err(Error::SingleObjectHeaderMismatch(
self.expected_header,
header,
))
}
}
Err(io_error) => Err(Error::ReadHeader(io_error)),
}
}
}
pub struct SpecificSingleObjectReader<T>
where
T: AvroSchema,
{
inner: GenericSingleObjectReader,
_model: PhantomData<T>,
}
impl<T> SpecificSingleObjectReader<T>
where
T: AvroSchema,
{
pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
Ok(SpecificSingleObjectReader {
inner: GenericSingleObjectReader::new(T::get_schema())?,
_model: PhantomData,
})
}
}
impl<T> SpecificSingleObjectReader<T>
where
T: AvroSchema + From<Value>,
{
pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
self.inner.read_value(reader).map(|v| v.into())
}
}
impl<T> SpecificSingleObjectReader<T>
where
T: AvroSchema + DeserializeOwned,
{
pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
from_value::<T>(&self.inner.read_value(reader)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{encode::encode, from_value, types::Record, Reader};
use pretty_assertions::assert_eq;
use serde::Deserialize;
use std::io::Cursor;
const SCHEMA: &str = r#"
{
"type": "record",
"name": "test",
"fields": [
{
"name": "a",
"type": "long",
"default": 42
},
{
"name": "b",
"type": "string"
}
]
}
"#;
const UNION_SCHEMA: &str = r#"["null", "long"]"#;
const ENCODED: &[u8] = &[
79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
];
#[test]
fn test_from_avro_datum() {
let schema = Schema::parse_str(SCHEMA).unwrap();
let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
let mut record = Record::new(&schema).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
let expected = record.into();
assert_eq!(
from_avro_datum(&schema, &mut encoded, None).unwrap(),
expected
);
}
#[test]
fn test_from_avro_datum_with_union_to_struct() {
const TEST_RECORD_SCHEMA_3240: &str = r#"
{
"type": "record",
"name": "test",
"fields": [
{
"name": "a",
"type": "long",
"default": 42
},
{
"name": "b",
"type": "string"
},
{
"name": "a_nullable_array",
"type": ["null", {"type": "array", "items": {"type": "string"}}],
"default": null
},
{
"name": "a_nullable_boolean",
"type": ["null", {"type": "boolean"}],
"default": null
},
{
"name": "a_nullable_string",
"type": ["null", {"type": "string"}],
"default": null
}
]
}
"#;
#[derive(Default, Debug, Deserialize, PartialEq)]
struct TestRecord3240 {
a: i64,
b: String,
a_nullable_array: Option<Vec<String>>,
// we are missing the 'a_nullable_boolean' field to simulate missing keys
// a_nullable_boolean: Option<bool>,
a_nullable_string: Option<String>,
}
let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240).unwrap();
let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
let expected_record: TestRecord3240 = TestRecord3240 {
a: 27i64,
b: String::from("foo"),
a_nullable_array: None,
a_nullable_string: None,
};
let avro_datum = from_avro_datum(&schema, &mut encoded, None).unwrap();
let parsed_record: TestRecord3240 = match &avro_datum {
Value::Record(_) => from_value::<TestRecord3240>(&avro_datum).unwrap(),
unexpected => panic!(
"could not map avro data to struct, found unexpected: {:?}",
unexpected
),
};
assert_eq!(parsed_record, expected_record);
}
#[test]
fn test_null_union() {
let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
let mut encoded: &'static [u8] = &[2, 0];
assert_eq!(
from_avro_datum(&schema, &mut encoded, None).unwrap(),
Value::Union(1, Box::new(Value::Long(0)))
);
}
#[test]
fn test_reader_iterator() {
let schema = Schema::parse_str(SCHEMA).unwrap();
let reader = Reader::with_schema(&schema, ENCODED).unwrap();
let mut record1 = Record::new(&schema).unwrap();
record1.put("a", 27i64);
record1.put("b", "foo");
let mut record2 = Record::new(&schema).unwrap();
record2.put("a", 42i64);
record2.put("b", "bar");
let expected = vec![record1.into(), record2.into()];
for (i, value) in reader.enumerate() {
assert_eq!(value.unwrap(), expected[i]);
}
}
#[test]
fn test_reader_invalid_header() {
let schema = Schema::parse_str(SCHEMA).unwrap();
let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
}
#[test]
fn test_reader_invalid_block() {
let schema = Schema::parse_str(SCHEMA).unwrap();
let invalid = ENCODED
.iter()
.copied()
.rev()
.skip(19)
.collect::<Vec<u8>>()
.into_iter()
.rev()
.collect::<Vec<u8>>();
let reader = Reader::with_schema(&schema, &invalid[..]).unwrap();
for value in reader {
assert!(value.is_err());
}
}
#[test]
fn test_reader_empty_buffer() {
let empty = Cursor::new(Vec::new());
assert!(Reader::new(empty).is_err());
}
#[test]
fn test_reader_only_header() {
let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
let reader = Reader::new(&invalid[..]).unwrap();
for value in reader {
assert!(value.is_err());
}
}
#[test]
fn test_avro_3405_read_user_metadata_success() {
use crate::writer::Writer;
let schema = Schema::parse_str(SCHEMA).unwrap();
let mut writer = Writer::new(&schema, Vec::new());
let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
user_meta_data.insert(
"stringKey".to_string(),
"stringValue".to_string().into_bytes(),
);
user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
for (k, v) in user_meta_data.iter() {
writer.add_user_metadata(k.to_string(), v).unwrap();
}
let mut record = Record::new(&schema).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
writer.append(record.clone()).unwrap();
writer.append(record.clone()).unwrap();
writer.flush().unwrap();
let result = writer.into_inner().unwrap();
let reader = Reader::new(&result[..]).unwrap();
assert_eq!(reader.user_metadata(), &user_meta_data);
}
#[derive(Deserialize, Clone, PartialEq, Debug)]
struct TestSingleObjectReader {
a: i64,
b: f64,
c: Vec<String>,
}
impl AvroSchema for TestSingleObjectReader {
fn get_schema() -> Schema {
let schema = r#"
{
"type":"record",
"name":"TestSingleObjectWrtierSerialize",
"fields":[
{
"name":"a",
"type":"long"
},
{
"name":"b",
"type":"double"
},
{
"name":"c",
"type":{
"type":"array",
"items":"string"
}
}
]
}
"#;
Schema::parse_str(schema).unwrap()
}
}
impl From<Value> for TestSingleObjectReader {
fn from(obj: Value) -> TestSingleObjectReader {
if let Value::Record(fields) = obj {
let mut a = None;
let mut b = None;
let mut c = vec![];
for (field_name, v) in fields {
match (field_name.as_str(), v) {
("a", Value::Long(i)) => a = Some(i),
("b", Value::Double(d)) => b = Some(d),
("c", Value::Array(v)) => {
for inner_val in v {
if let Value::String(s) = inner_val {
c.push(s);
}
}
}
(key, value) => panic!("Unexpected pair: {:?} -> {:?}", key, value),
}
}
TestSingleObjectReader {
a: a.unwrap(),
b: b.unwrap(),
c,
}
} else {
panic!("Expected a Value::Record but was {:?}", obj)
}
}
}
impl From<TestSingleObjectReader> for Value {
fn from(obj: TestSingleObjectReader) -> Value {
Value::Record(vec![
("a".into(), obj.a.into()),
("b".into(), obj.b.into()),
(
"c".into(),
Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
),
])
}
}
#[test]
fn test_avro_3507_single_object_reader() {
let obj = TestSingleObjectReader {
a: 42,
b: 3.33,
c: vec!["cat".into(), "dog".into()],
};
let mut to_read = Vec::<u8>::new();
to_read.extend_from_slice(&[0xC3, 0x01]);
to_read.extend_from_slice(
&TestSingleObjectReader::get_schema()
.fingerprint::<Rabin>()
.bytes[..],
);
encode(
&obj.clone().into(),
&TestSingleObjectReader::get_schema(),
&mut to_read,
)
.expect("Encode should succeed");
let mut to_read = &to_read[..];
let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
.expect("Schema should resolve");
let val = generic_reader
.read_value(&mut to_read)
.expect("Should read");
let expected_value: Value = obj.into();
assert_eq!(expected_value, val);
}
#[test]
fn test_avro_3507_reader_parity() {
let obj = TestSingleObjectReader {
a: 42,
b: 3.33,
c: vec!["cat".into(), "dog".into()],
};
let mut to_read = Vec::<u8>::new();
to_read.extend_from_slice(&[0xC3, 0x01]);
to_read.extend_from_slice(
&TestSingleObjectReader::get_schema()
.fingerprint::<Rabin>()
.bytes[..],
);
encode(
&obj.clone().into(),
&TestSingleObjectReader::get_schema(),
&mut to_read,
)
.expect("Encode should succeed");
let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
.expect("Schema should resolve");
let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
.expect("schema should resolve");
let mut to_read1 = &to_read[..];
let mut to_read2 = &to_read[..];
let mut to_read3 = &to_read[..];
let val = generic_reader
.read_value(&mut to_read1)
.expect("Should read");
let read_obj1 = specific_reader
.read_from_value(&mut to_read2)
.expect("Should read from value");
let read_obj2 = specific_reader
.read(&mut to_read3)
.expect("Should read from deserilize");
let expected_value: Value = obj.clone().into();
assert_eq!(obj, read_obj1);
assert_eq!(obj, read_obj2);
assert_eq!(val, expected_value)
}
#[cfg(not(feature = "snappy"))]
#[test]
fn test_avro_3549_read_not_enabled_codec() {
let snappy_compressed_avro = vec![
79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
];
if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
} else {
panic!("Expected an error in the reading of the codec!");
}
}
}