Add support for logical types (#111)

* Added some missing serialization fields for the `Schema::Record`
variant.

* Implemented parsing canonical form of schemas.

* Remove redundant `pcf_float` function. Move `use` to module scope
instead of function scope.

* Added full union support

* Verify union has unique types. O(1) implementation of finding (usize,
    &Schema) for a given value in a union.

* Improved `PartialEq` performance. Implemented schema resolution for the
case where the writer is a union and the reader is not.

* Added some more testing for full union support

* WIP: Partial support for the `logicalTypes` described in the spec

* Fix complex parsing and error handling

* Implement Duration, Decimal and UUID

* Fix some validation assumptions

* Encode decimals optionally as bigint

* Add optional safe-uuid and safe-decimal features

* Fill out tests

* Clean up types.rs

* Fix decimal encoding

* Abstract over uuid

* Use crate

* Impl From for decimal

* Tighten up the public API

* Add test for timestamp micros

* Add de test for scalar logical types

* Formatting

* Docs

* More formatting

* No need for Eq on Schema

* Split logical scalar types out into separate modules

* Make safe features the only implementation

* Fix up merge issues

* Tiny binding cleanup

* Add ToAvro impl for uuid, decimal and duration

* Remove dead code

* use num_bigint::BigInt

* Expose duration field types in lib.rs

* Add logical type example to README.md and src/lib.rs

* Print out the records in the logical types example

* Use the exact same code path for encoding underlying value when encoding decimals

* Expand match in decimal decoding to provide a better error message

* Fix size in examples

* Remove incorrect apostrophe

* Add tests for negative decimal values and larger than necessary fixed size bytes

* Make sure the test hits endianness

* Make sign extension fallible

Co-authored-by: Jason Boatman <jason.boatman@multiscalehn.com>
11 files changed
tree: dea9a364e2134a3de796b6d1e2b736530d0ecaf3
  1. benches/
  2. examples/
  3. scripts/
  4. src/
  5. tests/
  6. .activate.sh
  7. .deactivate.sh
  8. .gitignore
  9. .pre-commit-config.yaml
  10. .requirements-precommit.txt
  11. .travis.yml
  12. Cargo.toml
  13. CHANGELOG.md
  14. LICENSE
  15. Makefile
  16. README.md
README.md

avro-rs

Latest Version Build Status MIT licensed

Documentation

A library for working with Apache Avro in Rust.

Please check our documentation for examples, tutorials and API reference.

We also support:

Example

Add to your Cargo.toml:

[dependencies]
avro-rs = "0.6"
failure = "0.1.5"
serde = { version = "1.0", features = ["derive"] }

Then try to write and read in Avro format like below:

use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record};
use failure::Error;
use serde::{Serialize, Deserialize};

#[derive(Debug, Deserialize, Serialize)]
struct Test {
    a: i64,
    b: String,
}

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");

    writer.append(record)?;

    let test = Test {
        a: 27,
        b: "foo".to_owned(),
    };

    writer.append_ser(test)?;

    writer.flush()?;

    let input = writer.into_inner();
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", from_value::<Test>(&record?));
    }
    Ok(())
}

Calculate Avro schema fingerprint

This library supports calculating the following fingerprints:

  • SHA-256
  • MD5

Note: Rabin fingerprinting is NOT SUPPORTED yet.

An example of fingerprinting for the supported fingerprints:

use avro_rs::Schema;
use failure::Error;
use md5::Md5;
use sha2::Sha256;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;
    let schema = Schema::parse_str(raw_schema)?;
    println!("{}", schema.fingerprint::<Sha256>());
    println!("{}", schema.fingerprint::<Md5>());
    Ok(())
}

Ill-formed data

In order to ease decoding, the Binary Encoding specification of Avro data requires some fields to have their length encoded alongside the data.

If encoded data passed to a Reader has been ill-formed, it can happen that the bytes meant to contain the length of data are bogus and could result in extravagant memory allocation.

To shield users from ill-formed data, avro-rs sets a limit (default: 512MB) to any allocation it will perform when decoding data.

If you expect some of your data fields to be larger than this limit, be sure to make use of the max_allocation_bytes function before reading any data (we leverage Rust's std::sync::Once mechanism to initialize this value, if any call to decode is made before a call to max_allocation_bytes, the limit will be 512MB throughout the lifetime of the program).

use avro_rs::max_allocation_bytes;


fn main() {
    max_allocation_bytes(2 * 1024 * 1024 * 1024);  // 2GB

    // ... happily decode large data
}

Logical types

use avro_rs::{
    types::Record, types::Value, Codec, Days, Decimal, Duration, Millis, Months, Reader, Schema,
    Writer,
};
use num_bigint::ToBigInt;
use failure::Error;

fn main() -> Result<(), Error> {
    let raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "decimal_fixed",
          "type": {
            "type": "fixed",
            "size": 2,
            "name": "decimal"
          },
          "logicalType": "decimal",
          "precision": 4,
          "scale": 2
        },
        {
          "name": "decimal_var",
          "type": "bytes",
          "logicalType": "decimal",
          "precision": 10,
          "scale": 3
        },
        {
          "name": "uuid",
          "type": "string",
          "logicalType": "uuid"
        },
        {
          "name": "date",
          "type": "int",
          "logicalType": "date"
        },
        {
          "name": "time_millis",
          "type": "int",
          "logicalType": "time-millis"
        },
        {
          "name": "time_micros",
          "type": "long",
          "logicalType": "time-micros"
        },
        {
          "name": "timestamp_millis",
          "type": "long",
          "logicalType": "timestamp-millis"
        },
        {
          "name": "timestamp_micros",
          "type": "long",
          "logicalType": "timestamp-micros"
        },
        {
          "name": "duration",
          "type": {
            "type": "fixed",
            "size": 12,
            "name": "duration"
          },
          "logicalType": "duration"
        }
      ]
    }
    "#;

    let schema = Schema::parse_str(raw_schema)?;

    println!("{:?}", schema);

    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("decimal_fixed", Decimal::from(9936.to_bigint().unwrap().to_signed_bytes_be()));
    record.put("decimal_var", Decimal::from((-32442.to_bigint().unwrap()).to_signed_bytes_be()));
    record.put("uuid", uuid::Uuid::new_v4());
    record.put("date", Value::Date(1));
    record.put("time_millis", Value::TimeMillis(2));
    record.put("time_micros", Value::TimeMicros(3));
    record.put("timestamp_millis", Value::TimestampMillis(4));
    record.put("timestamp_micros", Value::TimestampMicros(5));
    record.put("duration", Duration::new(Months::new(6), Days::new(7), Millis::new(8)));

    writer.append(record)?;
    writer.flush()?;

    let input = writer.into_inner();
    let reader = Reader::with_schema(&schema, &input[..])?;

    for record in reader {
        println!("{:?}", record?);
    }
    Ok(())
}

License

This project is licensed under MIT License. Please note that this is not an official project maintained by Apache Avro.

Contributing

Everyone is encouraged to contribute! You can contribute by forking the GitHub repo and making a pull request or opening an issue. All contributions will be licensed under MIT License.