blob: fa12fcf0710cf863dd47d361eb13c610bc006e7b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
package Avro::BinaryDecoder;
use strict;
use warnings;
use Config;
use Encode();
use Error::Simple;
use Avro::Schema;
our $VERSION = '++MODULE_VERSION++';
our $complement = ~0x7F;
unless ($Config{use64bitint}) {
require Math::BigInt;
$complement = Math::BigInt->new("0b" . ("1" x 57) . ("0" x 7));
}
=head2 decode(%param)
Resolve the given writer and reader_schema to decode the data provided by the
reader.
=over 4
=item * writer_schema
The schema that was used to encode the data provided by the C<reader>
=item * reader_schema
The schema we want to use to decode the data.
=item * reader
An object implementing a straightforward interface. C<read($buf, $nbytes)> and
C<seek($nbytes, $whence)> are expected. Typically a IO::String object or a
IO::File object. It is expected that this calls will block the decoder, if not
enough data is available for read.
=back
=cut
sub decode {
my $class = shift;
my %param = @_;
my ($writer_schema, $reader_schema, $reader)
= @param{qw/writer_schema reader_schema reader/};
my $type = Avro::Schema->match(
writer => $writer_schema,
reader => $reader_schema,
) or throw Avro::Schema::Error::Mismatch;
my $meth = "decode_$type";
return $class->$meth($writer_schema, $reader_schema, $reader);
}
sub skip {
my $class = shift;
my ($schema, $reader) = @_;
my $type = ref $schema ? $schema->type : $schema;
my $meth = "skip_$type";
return $class->$meth($schema, $reader);
}
sub decode_null { undef }
sub skip_boolean { &decode_boolean }
sub decode_boolean {
my $class = shift;
my $reader = pop;
$reader->read(my $bool, 1);
return unpack 'C', $bool;
}
sub skip_int { &decode_int }
sub decode_int {
my $class = shift;
my $reader = pop;
return zigzag(unsigned_varint($reader));
}
sub skip_long { &decode_long };
sub decode_long {
my $class = shift;
return decode_int($class, @_);
}
sub skip_float { &decode_float }
sub decode_float {
my $class = shift;
my $reader = pop;
$reader->read(my $buf, 4);
return unpack "f<", $buf;
}
sub skip_double { &decode_double }
sub decode_double {
my $class = shift;
my $reader = pop;
$reader->read(my $buf, 8);
return unpack "d<", $buf,
}
sub skip_bytes {
my $class = shift;
my $reader = pop;
my $size = decode_long($class, undef, undef, $reader);
$reader->seek($size, 0);
return;
}
sub decode_bytes {
my $class = shift;
my $reader = pop;
my $size = decode_long($class, undef, undef, $reader);
$reader->read(my $buf, $size);
return $buf;
}
sub skip_string { &skip_bytes }
sub decode_string {
my $class = shift;
my $reader = pop;
my $bytes = decode_bytes($class, undef, undef, $reader);
return Encode::decode_utf8($bytes);
}
sub skip_record {
my $class = shift;
my ($schema, $reader) = @_;
for my $field (@{ $schema->fields }){
skip($class, $field->{type}, $reader);
}
}
## 1.3.2 A record is encoded by encoding the values of its fields in the order
## that they are declared. In other words, a record is encoded as just the
## concatenation of the encodings of its fields. Field values are encoded per
## their schema.
sub decode_record {
my $class = shift;
my ($writer_schema, $reader_schema, $reader) = @_;
my $record;
my %extra_fields = %{ $reader_schema->fields_as_hash };
for my $field (@{ $writer_schema->fields }) {
my $name = $field->{name};
my $w_field_schema = $field->{type};
my $r_field_schema = delete $extra_fields{$name};
## 1.3.2 if the writer's record contains a field with a name not
## present in the reader's record, the writer's value for that field
## is ignored.
if (! $r_field_schema) {
$class->skip($w_field_schema, $reader);
next;
}
my $data = $class->decode(
writer_schema => $w_field_schema,
reader_schema => $r_field_schema->{type},
reader => $reader,
);
$record->{ $name } = $data;
}
for my $name (keys %extra_fields) {
## 1.3.2. if the reader's record schema has a field with no default
## value, and writer's schema does not have a field with the same
## name, an error is signalled.
unless (exists $extra_fields{$name}->{default}) {
throw Avro::Schema::Error::Mismatch(
"cannot resolve without default"
);
}
## 1.3.2 ... else the default value is used
$record->{ $name } = $extra_fields{$name}->{default};
}
return $record;
}
sub skip_enum { &skip_int }
## 1.3.2 An enum is encoded by a int, representing the zero-based position of
## the symbol in the schema.
sub decode_enum {
my $class = shift;
my ($writer_schema, $reader_schema, $reader) = @_;
my $index = decode_int($class, @_);
my $w_data = $writer_schema->symbols->[$index];
## 1.3.2 if the writer's symbol is not present in the reader's enum,
## then an error is signalled.
throw Avro::Schema::Error::Mismatch("enum unknown")
unless $reader_schema->is_data_valid($w_data);
return $w_data;
}
sub skip_block {
my $class = shift;
my ($reader, $block_content) = @_;
my $block_count = decode_long($class, undef, undef, $reader);
while ($block_count) {
if ($block_count < 0) {
$reader->seek($block_count, 0);
next;
}
else {
for (1..$block_count) {
$block_content->();
}
}
$block_count = decode_long($class, undef, undef, $reader);
}
}
sub skip_array {
my $class = shift;
my ($schema, $reader) = @_;
skip_block($reader, sub { $class->skip($schema->items, $reader) });
}
## 1.3.2 Arrays are encoded as a series of blocks. Each block consists of a
## long count value, followed by that many array items. A block with count zero
## indicates the end of the array. Each item is encoded per the array's item
## schema.
## If a block's count is negative, its absolute value is used, and the count is
## followed immediately by a long block size
sub decode_array {
my $class = shift;
my ($writer_schema, $reader_schema, $reader) = @_;
my $block_count = decode_long($class, @_);
my @array;
my $writer_items = $writer_schema->items;
my $reader_items = $reader_schema->items;
while ($block_count) {
my $block_size;
if ($block_count < 0) {
$block_count = -$block_count;
$block_size = decode_long($class, @_);
## XXX we can skip with $reader_schema?
}
for (1..$block_count) {
push @array, $class->decode(
writer_schema => $writer_items,
reader_schema => $reader_items,
reader => $reader,
);
}
$block_count = decode_long($class, @_);
}
return \@array;
}
sub skip_map {
my $class = shift;
my ($schema, $reader) = @_;
skip_block($reader, sub {
skip_string($class, $reader);
$class->skip($schema->values, $reader);
});
}
## 1.3.2 Maps are encoded as a series of blocks. Each block consists of a long
## count value, followed by that many key/value pairs. A block with count zero
## indicates the end of the map. Each item is encoded per the map's value
## schema.
##
## If a block's count is negative, its absolute value is used, and the count is
## followed immediately by a long block size indicating the number of bytes in
## the block. This block size permits fast skipping through data, e.g., when
## projecting a record to a subset of its fields.
sub decode_map {
my $class = shift;
my ($writer_schema, $reader_schema, $reader) = @_;
my %hash;
my $block_count = decode_long($class, @_);
my $writer_values = $writer_schema->values;
my $reader_values = $reader_schema->values;
while ($block_count) {
my $block_size;
if ($block_count < 0) {
$block_count = -$block_count;
$block_size = decode_long($class, @_);
## XXX we can skip with $reader_schema?
}
for (1..$block_count) {
my $key = decode_string($class, @_);
unless (defined $key && length $key) {
throw Avro::Schema::Error::Parse("key of map is invalid");
}
$hash{$key} = $class->decode(
writer_schema => $writer_values,
reader_schema => $reader_values,
reader => $reader,
);
}
$block_count = decode_long($class, @_);
}
return \%hash;
}
sub skip_union {
my $class = shift;
my ($schema, $reader) = @_;
my $idx = decode_long($class, undef, undef, $reader);
my $union_schema = $schema->schemas->[$idx]
or throw Avro::Schema::Error::Parse("union union member");
$class->skip($union_schema, $reader);
}
## 1.3.2 A union is encoded by first writing an int value indicating the
## zero-based position within the union of the schema of its value. The value
## is then encoded per the indicated schema within the union.
sub decode_union {
my $class = shift;
my ($writer_schema, $reader_schema, $reader) = @_;
my $idx = decode_long($class, @_);
my $union_schema = $writer_schema->schemas->[$idx];
## XXX TODO: schema resolution
# The first schema in the reader's union that matches the selected writer's
# union schema is recursively resolved against it. if none match, an error
# is signalled.
return $class->decode(
reader_schema => $union_schema,
writer_schema => $union_schema,
reader => $reader,
);
}
sub skip_fixed {
my $class = shift;
my ($schema, $reader) = @_;
$reader->seek($schema->size, 0);
}
## 1.3.2 Fixed instances are encoded using the number of bytes declared in the
## schema.
sub decode_fixed {
my $class = shift;
my ($writer_schema, $reader_schema, $reader) = @_;
$reader->read(my $buf, $writer_schema->size);
return $buf;
}
sub zigzag {
my $int = shift;
if (1 & $int) {
## odd values are encoded negative ints
return -( 1 + ($int >> 1) );
}
## even values are positive natural left shifted one bit
else {
return $int >> 1;
}
}
sub unsigned_varint {
my $reader = shift;
my $int = 0;
my $more;
my $shift = 0;
do {
$reader->read(my $buf, 1);
my $byte = ord $buf;
my $value = $byte & 0x7F;
$int |= $value << $shift;
$shift += 7;
$more = $byte & 0x80;
} until (! $more);
return $int;
}
1;