| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| module Avro |
| module IO |
| # Raised when datum is not an example of schema |
| class AvroTypeError < AvroError |
| def initialize(expected_schema, datum) |
| super("The datum #{datum.inspect} is not an example of schema #{expected_schema}") |
| end |
| end |
| |
| # Raised when writer's and reader's schema do not match |
| class SchemaMatchException < AvroError |
| def initialize(writers_schema, readers_schema) |
| super("Writer's schema #{writers_schema} and Reader's schema " + |
| "#{readers_schema} do not match.") |
| end |
| end |
| |
| # FIXME(jmhodges) move validate to this module? |
| |
| class BinaryDecoder |
| # Read leaf values |
| |
| # reader is an object on which we can call read, seek and tell. |
| attr_reader :reader |
| def initialize(reader) |
| @reader = reader |
| end |
| |
| def byte! |
| @reader.readbyte |
| end |
| |
| def read_null |
| # null is written as zero byte's |
| nil |
| end |
| |
| def read_boolean |
| byte! == 1 |
| end |
| |
| def read_int; read_long; end |
| |
| def read_long |
| # int and long values are written using variable-length, |
| # zig-zag coding. |
| b = byte! |
| n = b & 0x7F |
| shift = 7 |
| while (b & 0x80) != 0 |
| b = byte! |
| n |= (b & 0x7F) << shift |
| shift += 7 |
| end |
| (n >> 1) ^ -(n & 1) |
| end |
| |
| def read_float |
| # A float is written as 4 bytes. |
| # The float is converted into a 32-bit integer using a method |
| # equivalent to Java's floatToIntBits and then encoded in |
| # little-endian format. |
| read_and_unpack(4, 'e'.freeze) |
| end |
| |
| def read_double |
| # A double is written as 8 bytes. |
| # The double is converted into a 64-bit integer using a method |
| # equivalent to Java's doubleToLongBits and then encoded in |
| # little-endian format. |
| read_and_unpack(8, 'E'.freeze) |
| end |
| |
| def read_bytes |
| # Bytes are encoded as a long followed by that many bytes of |
| # data. |
| read(read_long) |
| end |
| |
| def read_string |
| # A string is encoded as a long followed by that many bytes of |
| # UTF-8 encoded character data. |
| read_bytes.tap do |string| |
| string.force_encoding('UTF-8'.freeze) if string.respond_to? :force_encoding |
| end |
| end |
| |
| def read(len) |
| # Read n bytes |
| @reader.read(len) |
| end |
| |
| def skip_null |
| nil |
| end |
| |
| def skip_boolean |
| skip(1) |
| end |
| |
| def skip_int |
| skip_long |
| end |
| |
| def skip_long |
| b = byte! |
| while (b & 0x80) != 0 |
| b = byte! |
| end |
| end |
| |
| def skip_float |
| skip(4) |
| end |
| |
| def skip_double |
| skip(8) |
| end |
| |
| def skip_bytes |
| skip(read_long) |
| end |
| |
| def skip_string |
| skip_bytes |
| end |
| |
| def skip(n) |
| reader.seek(reader.tell() + n) |
| end |
| |
| private |
| |
| # Optimize unpacking strings when `unpack1` is available (ruby >= 2.4) |
| if String.instance_methods.include?(:unpack1) |
| |
| def read_and_unpack(byte_count, format) |
| @reader.read(byte_count).unpack1(format) |
| end |
| |
| else |
| |
| def read_and_unpack(byte_count, format) |
| @reader.read(byte_count).unpack(format)[0] |
| end |
| |
| end |
| end |
| |
| # Write leaf values |
| class BinaryEncoder |
| attr_reader :writer |
| |
| def initialize(writer) |
| @writer = writer |
| end |
| |
| # null is written as zero bytes |
| def write_null(_datum) |
| nil |
| end |
| |
| # a boolean is written as a single byte |
| # whose value is either 0 (false) or 1 (true). |
| def write_boolean(datum) |
| on_disk = datum ? 1.chr : 0.chr |
| writer.write(on_disk) |
| end |
| |
| # int and long values are written using variable-length, |
| # zig-zag coding. |
| def write_int(n) |
| write_long(n) |
| end |
| |
| # int and long values are written using variable-length, |
| # zig-zag coding. |
| def write_long(n) |
| n = (n << 1) ^ (n >> 63) |
| while (n & ~0x7F) != 0 |
| @writer.write(((n & 0x7f) | 0x80).chr) |
| n >>= 7 |
| end |
| @writer.write(n.chr) |
| end |
| |
| # A float is written as 4 bytes. |
| # The float is converted into a 32-bit integer using a method |
| # equivalent to Java's floatToIntBits and then encoded in |
| # little-endian format. |
| def write_float(datum) |
| @writer.write([datum].pack('e'.freeze)) |
| end |
| |
| # A double is written as 8 bytes. |
| # The double is converted into a 64-bit integer using a method |
| # equivalent to Java's doubleToLongBits and then encoded in |
| # little-endian format. |
| def write_double(datum) |
| @writer.write([datum].pack('E'.freeze)) |
| end |
| |
| # Bytes are encoded as a long followed by that many bytes of data. |
| def write_bytes(datum) |
| write_long(datum.bytesize) |
| @writer.write(datum) |
| end |
| |
| # A string is encoded as a long followed by that many bytes of |
| # UTF-8 encoded character data |
| def write_string(datum) |
| datum = datum.encode('utf-8'.freeze) if datum.respond_to? :encode |
| write_bytes(datum) |
| end |
| |
| # Write an arbritary datum. |
| def write(datum) |
| writer.write(datum) |
| end |
| end |
| |
    # Reads data written under a writer's schema and returns it as Ruby
    # objects conforming to the (possibly different) reader's schema,
    # applying Avro schema-resolution rules along the way.
    class DatumReader
      # True when data written with writers_schema can be read using
      # readers_schema (delegates to Avro::SchemaCompatibility).
      def self.match_schemas(writers_schema, readers_schema)
        Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema)
      end

      attr_accessor :writers_schema, :readers_schema

      def initialize(writers_schema=nil, readers_schema=nil)
        @writers_schema = writers_schema
        @readers_schema = readers_schema
      end

      # Reads a single datum from decoder. When no readers_schema was
      # supplied, the writers_schema is used for both sides (no resolution).
      def read(decoder)
        self.readers_schema = writers_schema unless readers_schema
        read_data(writers_schema, readers_schema, decoder)
      end

      # Core recursive read: verifies the schemas are compatible, resolves a
      # non-union writer against a union reader, dispatches on the writer's
      # type, and passes the decoded value through the reader schema's
      # logical-type adapter before returning it.
      #
      # Raises SchemaMatchException when the schemas cannot be resolved.
      def read_data(writers_schema, readers_schema, decoder)
        # schema matching
        unless self.class.match_schemas(writers_schema, readers_schema)
          raise SchemaMatchException.new(writers_schema, readers_schema)
        end

        # schema resolution: reader's schema is a union, writer's
        # schema is not -- recurse with the first matching union branch
        if writers_schema.type_sym != :union && readers_schema.type_sym == :union
          rs = readers_schema.schemas.find{|s|
            self.class.match_schemas(writers_schema, s)
          }
          return read_data(writers_schema, rs, decoder) if rs
          raise SchemaMatchException.new(writers_schema, readers_schema)
        end

        # function dispatch for reading data based on type of writer's
        # schema
        datum = case writers_schema.type_sym
                when :null; decoder.read_null
                when :boolean; decoder.read_boolean
                when :string; decoder.read_string
                when :int; decoder.read_int
                when :long; decoder.read_long
                when :float; decoder.read_float
                when :double; decoder.read_double
                when :bytes; decoder.read_bytes
                when :fixed; read_fixed(writers_schema, readers_schema, decoder)
                when :enum; read_enum(writers_schema, readers_schema, decoder)
                when :array; read_array(writers_schema, readers_schema, decoder)
                when :map; read_map(writers_schema, readers_schema, decoder)
                when :union; read_union(writers_schema, readers_schema, decoder)
                when :record, :error, :request; read_record(writers_schema, readers_schema, decoder)
                else
                  raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
                end

        readers_schema.type_adapter.decode(datum)
      end

      # fixed: exactly writers_schema.size raw bytes.
      def read_fixed(writers_schema, _readers_schema, decoder)
        decoder.read(writers_schema.size)
      end

      # enum: an int index into the writer's symbol list. When the reader
      # does not know the writer's symbol and declares a default, the
      # default symbol is returned instead.
      def read_enum(writers_schema, readers_schema, decoder)
        index_of_symbol = decoder.read_int
        read_symbol = writers_schema.symbols[index_of_symbol]

        if !readers_schema.symbols.include?(read_symbol) && readers_schema.default
          read_symbol = readers_schema.default
        end

        # This implementation deviates from the spec by always returning
        # a symbol.
        read_symbol
      end

      # array: a series of blocks, each a long item count followed by that
      # many items. A negative count means count.abs items preceded by the
      # block's byte size (read and discarded here); a zero count terminates
      # the array.
      def read_array(writers_schema, readers_schema, decoder)
        read_items = []
        block_count = decoder.read_long
        while block_count != 0
          if block_count < 0
            block_count = -block_count
            _block_size = decoder.read_long
          end
          block_count.times do
            read_items << read_data(writers_schema.items,
                                    readers_schema.items,
                                    decoder)
          end
          block_count = decoder.read_long
        end

        read_items
      end

      # map: same block structure as arrays, but each item is a string key
      # followed by a value of the schema's value type.
      def read_map(writers_schema, readers_schema, decoder)
        read_items = {}
        block_count = decoder.read_long
        while block_count != 0
          if block_count < 0
            block_count = -block_count
            _block_size = decoder.read_long
          end
          block_count.times do
            key = decoder.read_string
            read_items[key] = read_data(writers_schema.values,
                                        readers_schema.values,
                                        decoder)
          end
          block_count = decoder.read_long
        end

        read_items
      end

      # union: a long selecting the writer's branch, then the datum encoded
      # per that branch; the branch is resolved against the full reader
      # schema by the recursive read_data call.
      def read_union(writers_schema, readers_schema, decoder)
        index_of_schema = decoder.read_long
        selected_writers_schema = writers_schema.schemas[index_of_schema]

        read_data(selected_writers_schema, readers_schema, decoder)
      end

      # record: field values in the writer's declared order. Writer fields
      # unknown to the reader are skipped (matching by name, then by a
      # reader-declared alias); reader fields absent from the writer are
      # filled from their defaults, or an AvroError is raised when a missing
      # field has no default.
      def read_record(writers_schema, readers_schema, decoder)
        readers_fields_hash = readers_schema.fields_hash
        read_record = {}
        writers_schema.fields.each do |field|
          readers_field = readers_fields_hash[field.name]
          if readers_field
            field_val = read_data(field.type, readers_field.type, decoder)
            read_record[field.name] = field_val
          elsif readers_schema.fields_by_alias.key?(field.name)
            readers_field = readers_schema.fields_by_alias[field.name]
            field_val = read_data(field.type, readers_field.type, decoder)
            read_record[readers_field.name] = field_val
          else
            skip_data(field.type, decoder)
          end
        end

        # fill in the default values
        readers_fields_hash.each do |field_name, field|
          next if read_record.key?(field_name)

          if field.default?
            field_val = read_default_value(field.type, field.default)
            read_record[field.name] = field_val
          else
            raise AvroError, "Missing data for #{field.type} with no default"
          end
        end

        read_record
      end

      # Materializes a field default (stored as a JSON-typed value) as a
      # Ruby value of field_schema's type. Union defaults are interpreted
      # per the first branch, as the Avro spec requires.
      def read_default_value(field_schema, default_value)
        # Basically a JSON Decoder?
        case field_schema.type_sym
        when :null
          return nil
        when :boolean
          return default_value
        when :int, :long
          return Integer(default_value)
        when :float, :double
          return Float(default_value)
        when :enum, :fixed, :string, :bytes
          return default_value
        when :array
          read_array = []
          default_value.each do |json_val|
            item_val = read_default_value(field_schema.items, json_val)
            read_array << item_val
          end
          return read_array
        when :map
          read_map = {}
          default_value.each do |key, json_val|
            map_val = read_default_value(field_schema.values, json_val)
            read_map[key] = map_val
          end
          return read_map
        when :union
          return read_default_value(field_schema.schemas[0], default_value)
        when :record, :error
          read_record = {}
          field_schema.fields.each do |field|
            json_val = default_value[field.name]
            json_val = field.default unless json_val
            field_val = read_default_value(field.type, json_val)
            read_record[field.name] = field_val
          end
          return read_record
        else
          fail_msg = "Unknown type: #{field_schema.type}"
          raise AvroError, fail_msg
        end
      end

      # Skips over one datum of writers_schema's type without building any
      # Ruby objects (used for writer fields the reader does not want).
      def skip_data(writers_schema, decoder)
        case writers_schema.type_sym
        when :null
          decoder.skip_null
        when :boolean
          decoder.skip_boolean
        when :string
          decoder.skip_string
        when :int
          decoder.skip_int
        when :long
          decoder.skip_long
        when :float
          decoder.skip_float
        when :double
          decoder.skip_double
        when :bytes
          decoder.skip_bytes
        when :fixed
          skip_fixed(writers_schema, decoder)
        when :enum
          skip_enum(writers_schema, decoder)
        when :array
          skip_array(writers_schema, decoder)
        when :map
          skip_map(writers_schema, decoder)
        when :union
          skip_union(writers_schema, decoder)
        when :record, :error, :request
          skip_record(writers_schema, decoder)
        else
          raise AvroError, "Unknown schema type: #{writers_schema.type}"
        end
      end

      # fixed occupies exactly writers_schema.size bytes.
      def skip_fixed(writers_schema, decoder)
        decoder.skip(writers_schema.size)
      end

      # enum is a single int index.
      def skip_enum(_writers_schema, decoder)
        decoder.skip_int
      end

      # union: read the branch index, then skip one datum of that branch.
      def skip_union(writers_schema, decoder)
        index = decoder.read_long
        skip_data(writers_schema.schemas[index], decoder)
      end

      def skip_array(writers_schema, decoder)
        skip_blocks(decoder) { skip_data(writers_schema.items, decoder) }
      end

      def skip_map(writers_schema, decoder)
        skip_blocks(decoder) {
          decoder.skip_string
          skip_data(writers_schema.values, decoder)
        }
      end

      def skip_record(writers_schema, decoder)
        writers_schema.fields.each{|f| skip_data(f.type, decoder) }
      end

      private
      # Walks the block structure shared by arrays and maps, invoking blk
      # once per item. A negative block count carries the block's byte size,
      # which lets the whole block be skipped with a single seek.
      def skip_blocks(decoder, &blk)
        block_count = decoder.read_long
        while block_count != 0
          if block_count < 0
            decoder.skip(decoder.read_long)
          else
            block_count.times(&blk)
          end
          block_count = decoder.read_long
        end
      end
    end # DatumReader
| |
| # DatumWriter for generic ruby objects |
| class DatumWriter |
| attr_accessor :writers_schema |
| def initialize(writers_schema=nil) |
| @writers_schema = writers_schema |
| end |
| |
| def write(datum, encoder) |
| write_data(writers_schema, datum, encoder) |
| end |
| |
| def write_data(writers_schema, logical_datum, encoder) |
| datum = writers_schema.type_adapter.encode(logical_datum) |
| |
| unless Schema.validate(writers_schema, datum, { recursive: false, encoded: true }) |
| raise AvroTypeError.new(writers_schema, datum) |
| end |
| |
| # function dispatch to write datum |
| case writers_schema.type_sym |
| when :null; encoder.write_null(datum) |
| when :boolean; encoder.write_boolean(datum) |
| when :string; encoder.write_string(datum) |
| when :int; encoder.write_int(datum) |
| when :long; encoder.write_long(datum) |
| when :float; encoder.write_float(datum) |
| when :double; encoder.write_double(datum) |
| when :bytes; encoder.write_bytes(datum) |
| when :fixed; write_fixed(writers_schema, datum, encoder) |
| when :enum; write_enum(writers_schema, datum, encoder) |
| when :array; write_array(writers_schema, datum, encoder) |
| when :map; write_map(writers_schema, datum, encoder) |
| when :union; write_union(writers_schema, datum, encoder) |
| when :record, :error, :request; write_record(writers_schema, datum, encoder) |
| else |
| raise AvroError.new("Unknown type: #{writers_schema.type}") |
| end |
| end |
| |
| def write_fixed(_writers_schema, datum, encoder) |
| encoder.write(datum) |
| end |
| |
| def write_enum(writers_schema, datum, encoder) |
| index_of_datum = writers_schema.symbols.index(datum) |
| encoder.write_int(index_of_datum) |
| end |
| |
| def write_array(writers_schema, datum, encoder) |
| raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Array) |
| if datum.size > 0 |
| encoder.write_long(datum.size) |
| datum.each do |item| |
| write_data(writers_schema.items, item, encoder) |
| end |
| end |
| encoder.write_long(0) |
| end |
| |
| def write_map(writers_schema, datum, encoder) |
| raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Hash) |
| if datum.size > 0 |
| encoder.write_long(datum.size) |
| datum.each do |k,v| |
| encoder.write_string(k) |
| write_data(writers_schema.values, v, encoder) |
| end |
| end |
| encoder.write_long(0) |
| end |
| |
| def write_union(writers_schema, datum, encoder) |
| index_of_schema = -1 |
| found = writers_schema.schemas. |
| find{|e| index_of_schema += 1; found = Schema.validate(e, datum) } |
| unless found # Because find_index doesn't exist in 1.8.6 |
| raise AvroTypeError.new(writers_schema, datum) |
| end |
| encoder.write_long(index_of_schema) |
| write_data(writers_schema.schemas[index_of_schema], datum, encoder) |
| end |
| |
| def write_record(writers_schema, datum, encoder) |
| raise AvroTypeError.new(writers_schema, datum) unless datum.is_a?(Hash) |
| writers_schema.fields.each do |field| |
| write_data(field.type, datum.key?(field.name) ? datum[field.name] : datum[field.name.to_sym], encoder) |
| end |
| end |
| end # DatumWriter |
| end |
| end |