blob: d476f4b4a2fa47cb14875519d02d6f04e89b66ff [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
package Avro::BinaryEncoder;
use strict;
use warnings;
use Config;
use Encode();
use Error::Simple;
use Regexp::Common qw(number);
our $VERSION = '++MODULE_VERSION++';
our $max64;
our $complement = ~0x7F;
if ($Config{use64bitint}) {
$max64 = 9223372036854775807;
}
else {
require Math::BigInt;
$complement = Math::BigInt->new("0b" . ("1" x 57) . ("0" x 7));
$max64 = Math::BigInt->new("0b0" . ("1" x 63));
}
=head2 encode(%param)
Encodes the given C<data> according to the given C<schema>, and pass it
to the C<emit_cb>
Params are:
=over 4
=item * data
The data to encode (can be any perl data structure, but it should match
schema)
=item * schema
The schema to use to encode C<data>
=item * emit_cb($byte_ref)
The callback that will be invoked with the a reference to the encoded data
in parameters.
=back
=cut
sub encode {
my $class = shift;
my %param = @_;
my ($schema, $data, $cb) = @param{qw/schema data emit_cb/};
## a schema can also be just a string
my $type = ref $schema ? $schema->type : $schema;
## might want to profile and optimize this
my $meth = "encode_$type";
$class->$meth($schema, $data, $cb);
return;
}
sub encode_null {
$_[3]->(\'');
}
sub encode_boolean {
my $class = shift;
my ($schema, $data, $cb) = @_;
$cb->( $data ? \"\x1" : \"\x0" );
}
sub encode_int {
my $class = shift;
my ($schema, $data, $cb) = @_;
if ($data !~ /^$RE{num}{int}$/) {
throw Avro::BinaryEncoder::Error("cannot convert '$data' to integer");
}
if (abs($data) > 0x7fffffff) {
throw Avro::BinaryEncoder::Error("int ($data) should be <= 32bits");
}
my $enc = unsigned_varint(zigzag($data));
$cb->(\$enc);
}
sub encode_long {
my $class = shift;
my ($schema, $data, $cb) = @_;
if ($data !~ /^$RE{num}{int}$/) {
throw Avro::BinaryEncoder::Error("cannot convert '$data' to long integer");
}
if (abs($data) > $max64) {
throw Avro::BinaryEncoder::Error("int ($data) should be <= 64bits");
}
my $enc = unsigned_varint(zigzag($data));
$cb->(\$enc);
}
sub encode_float {
my $class = shift;
my ($schema, $data, $cb) = @_;
my $enc = pack "f<", $data;
$cb->(\$enc);
}
sub encode_double {
my $class = shift;
my ($schema, $data, $cb) = @_;
my $enc = pack "d<", $data;
$cb->(\$enc);
}
sub encode_bytes {
my $class = shift;
my ($schema, $data, $cb) = @_;
encode_long($class, undef, bytes::length($data), $cb);
$cb->(\$data);
}
sub encode_string {
my $class = shift;
my ($schema, $data, $cb) = @_;
my $bytes = Encode::encode_utf8($data);
encode_long($class, undef, bytes::length($bytes), $cb);
$cb->(\$bytes);
}
## 1.3.2 A record is encoded by encoding the values of its fields in the order
## that they are declared. In other words, a record is encoded as just the
## concatenation of the encodings of its fields. Field values are encoded per
## their schema.
sub encode_record {
my $class = shift;
my ($schema, $data, $cb) = @_;
for my $field (@{ $schema->fields }) {
$class->encode(
schema => $field->{type},
data => $data->{ $field->{name} },
emit_cb => $cb,
);
}
}
## 1.3.2 An enum is encoded by a int, representing the zero-based position of
## the symbol in the schema.
sub encode_enum {
my $class = shift;
my ($schema, $data, $cb) = @_;
my $symbols = $schema->symbols_as_hash;
my $pos = $symbols->{ $data };
throw Avro::BinaryEncoder::Error("Cannot find enum $data")
unless defined $pos;
$class->encode_int(undef, $pos, $cb);
}
## 1.3.2 Arrays are encoded as a series of blocks. Each block consists of a
## long count value, followed by that many array items. A block with count zero
## indicates the end of the array. Each item is encoded per the array's item
## schema.
## If a block's count is negative, its absolute value is used, and the count is
## followed immediately by a long block size
## maybe here it would be worth configuring what a typical block size should be
sub encode_array {
my $class = shift;
my ($schema, $data, $cb) = @_;
## FIXME: multiple blocks
if (@$data) {
$class->encode_long(undef, scalar @$data, $cb);
for (@$data) {
$class->encode(
schema => $schema->items,
data => $_,
emit_cb => $cb,
);
}
}
## end of the only block
$class->encode_long(undef, 0, $cb);
}
## 1.3.2 Maps are encoded as a series of blocks. Each block consists of a long
## count value, followed by that many key/value pairs. A block with count zero
## indicates the end of the map. Each item is encoded per the map's value
## schema.
##
## (TODO)
## If a block's count is negative, its absolute value is used, and the count is
## followed immediately by a long block size indicating the number of bytes in
## the block. This block size permits fast skipping through data, e.g., when
## projecting a record to a subset of its fields.
sub encode_map {
my $class = shift;
my ($schema, $data, $cb) = @_;
my @keys = keys %$data;
if (@keys) {
$class->encode_long(undef, scalar @keys, $cb);
for (@keys) {
## the key
$class->encode_string(undef, $_, $cb);
## the value
$class->encode(
schema => $schema->values,
data => $data->{$_},
emit_cb => $cb,
);
}
}
## end of the only block
$class->encode_long(undef, 0, $cb);
}
## 1.3.2 A union is encoded by first writing an int value indicating the
## zero-based position within the union of the schema of its value. The value
## is then encoded per the indicated schema within the union.
sub encode_union {
my $class = shift;
my ($schema, $data, $cb) = @_;
my $idx = 0;
my $elected_schema;
for my $inner_schema (@{$schema->schemas}) {
if ($inner_schema->is_data_valid($data)) {
$elected_schema = $inner_schema;
last;
}
$idx++;
}
unless ($elected_schema) {
throw Avro::BinaryEncoder::Error("union cannot validate the data");
}
$class->encode_long(undef, $idx, $cb);
$class->encode(
schema => $elected_schema,
data => $data,
emit_cb => $cb,
);
}
## 1.3.2 Fixed instances are encoded using the number of bytes declared in the
## schema.
sub encode_fixed {
my $class = shift;
my ($schema, $data, $cb) = @_;
if (bytes::length $data != $schema->size) {
my $s1 = bytes::length $data;
my $s2 = $schema->size;
throw Avro::BinaryEncoder::Error("Fixed size doesn't match $s1!=$s2");
}
$cb->(\$data);
}
sub zigzag {
use warnings FATAL => 'numeric';
if ( $_[0] >= 0 ) {
return $_[0] << 1;
}
return (($_[0] << 1) ^ -1) | 0x1;
}
sub unsigned_varint {
my @bytes;
while ($_[0] & $complement) { # mask with continuation bit
push @bytes, ($_[0] & 0x7F) | 0x80; # out and set continuation bit
$_[0] >>= 7; # next please
}
push @bytes, $_[0]; # last byte
return pack "C*", @bytes;
}
package Avro::BinaryEncoder::Error;
use parent 'Error::Simple';
1;