| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| module Avro |
| module IO |
| # Raised when datum is not an example of schema |
| class AvroTypeError < AvroError |
| def initialize(expected_schema, datum) |
| super("The datum #{datum.inspect} is not an example of schema #{expected_schema}") |
| end |
| end |
| |
| # Raised when writer's and reader's schema do not match |
| class SchemaMatchException < AvroError |
| def initialize(writers_schema, readers_schema) |
| super("Writer's schema #{writers_schema} and Reader's schema " + |
| "#{readers_schema} do not match.") |
| end |
| end |
| |
| # FIXME(jmhodges) move validate to this module? |
| |
| class BinaryDecoder |
| # Read leaf values |
| |
| # reader is an object on which we can call read, seek and tell. |
| attr_reader :reader |
| def initialize(reader) |
| @reader = reader |
| end |
| |
| def byte! |
| @reader.read(1)[0] |
| end |
| |
| def read_null |
| # null is written as zero byte's |
| nil |
| end |
| |
| def read_boolean |
| byte! == 1 |
| end |
| |
| def read_int; read_long; end |
| |
| def read_long |
| # int and long values are written using variable-length, |
| # zig-zag coding. |
| b = byte! |
| n = b & 0x7F |
| shift = 7 |
| while (b & 0x80) != 0 |
| b = byte! |
| n |= (b & 0x7F) << shift |
| shift += 7 |
| end |
| (n >> 1) ^ -(n & 1) |
| end |
| |
| def read_float |
| # A float is written as 4 bytes. |
| # The float is converted into a 32-bit integer using a method |
| # equivalent to Java's floatToIntBits and then encoded in |
| # little-endian format. |
| @reader.read(4).unpack('e')[0] |
| end |
| |
| def read_double |
| # A double is written as 8 bytes. |
| # The double is converted into a 64-bit integer using a method |
| # equivalent to Java's doubleToLongBits and then encoded in |
| # little-endian format. |
| @reader.read(8).unpack('E')[0] |
| end |
| |
| def read_bytes |
| # Bytes are encoded as a long followed by that many bytes of |
| # data. |
| read(read_long) |
| end |
| |
| def read_string |
| # A string is encoded as a long followed by that many bytes of |
| # UTF-8 encoded character data. |
| # FIXME utf-8 encode this in 1.9 |
| read_bytes |
| end |
| |
| def read(len) |
| # Read n bytes |
| @reader.read(len) |
| end |
| |
| def skip_null |
| nil |
| end |
| |
| def skip_boolean |
| skip(1) |
| end |
| |
| def skip_int |
| skip_long |
| end |
| |
| def skip_long |
| b = byte! |
| while (b & 0x80) != 0 |
| b = byte! |
| end |
| end |
| |
| def skip_float |
| skip(4) |
| end |
| |
| def skip_double |
| skip(8) |
| end |
| |
| def skip_bytes |
| skip(read_long) |
| end |
| |
| def skip_string |
| skip_bytes |
| end |
| |
| def skip(n) |
| reader.seek(reader.tell() + n) |
| end |
| end |
| |
| # Write leaf values |
| class BinaryEncoder |
| attr_reader :writer |
| |
| def initialize(writer) |
| @writer = writer |
| end |
| |
| # null is written as zero bytes |
| def write_null(datum) |
| nil |
| end |
| |
| # a boolean is written as a single byte |
| # whose value is either 0 (false) or 1 (true). |
| def write_boolean(datum) |
| on_disk = datum ? 1.chr : 0.chr |
| writer.write(on_disk) |
| end |
| |
| # int and long values are written using variable-length, |
| # zig-zag coding. |
| def write_int(n) |
| write_long(n) |
| end |
| |
| # int and long values are written using variable-length, |
| # zig-zag coding. |
| def write_long(n) |
| foo = n |
| n = (n << 1) ^ (n >> 63) |
| while (n & ~0x7F) != 0 |
| @writer.write(((n & 0x7f) | 0x80).chr) |
| n >>= 7 |
| end |
| @writer.write(n.chr) |
| end |
| |
| # A float is written as 4 bytes. |
| # The float is converted into a 32-bit integer using a method |
| # equivalent to Java's floatToIntBits and then encoded in |
| # little-endian format. |
| def write_float(datum) |
| @writer.write([datum].pack('e')) |
| end |
| |
| # A double is written as 8 bytes. |
| # The double is converted into a 64-bit integer using a method |
| # equivalent to Java's doubleToLongBits and then encoded in |
| # little-endian format. |
| def write_double(datum) |
| @writer.write([datum].pack('E')) |
| end |
| |
| # Bytes are encoded as a long followed by that many bytes of data. |
| def write_bytes(datum) |
| write_long(datum.size) |
| @writer.write(datum) |
| end |
| |
| # A string is encoded as a long followed by that many bytes of |
| # UTF-8 encoded character data |
| def write_string(datum) |
| # FIXME utf-8 encode this in 1.9 |
| write_bytes(datum) |
| end |
| |
| # Write an arbritary datum. |
| def write(datum) |
| writer.write(datum) |
| end |
| end |
| |
| class DatumReader |
| def self.check_props(schema_one, schema_two, prop_list) |
| prop_list.all? do |prop| |
| schema_one.send(prop) == schema_two.send(prop) |
| end |
| end |
| |
| def self.match_schemas(writers_schema, readers_schema) |
| w_type = writers_schema.type |
| r_type = readers_schema.type |
| |
| # This conditional is begging for some OO love. |
| if w_type == 'union' || r_type == 'union' |
| return true |
| end |
| |
| if w_type == r_type |
| if Schema::PRIMITIVE_TYPES.include?(w_type) && |
| Schema::PRIMITIVE_TYPES.include?(r_type) |
| return true |
| end |
| |
| case r_type |
| when 'record' |
| return check_props(writers_schema, readers_schema, [:fullname]) |
| when 'error' |
| return check_props(writers_scheam, readers_schema, [:fullname]) |
| when 'request' |
| return true |
| when 'fixed' |
| return check_props(writers_schema, readers_schema, [:fullname, :size]) |
| when 'enum' |
| return check_props(writers_schema, readers_schema, [:fullname]) |
| when 'map' |
| return check_props(writers_schema.values, readers_schema.values, [:type]) |
| when 'array' |
| return check_props(writers_schema.items, readers_schema.items, [:type]) |
| end |
| end |
| |
| # Handle schema promotion |
| if w_type == 'int' && ['long', 'float', 'double'].include?(r_type) |
| return true |
| elsif w_type == 'long' && ['float', 'double'].include?(r_type) |
| return true |
| elsif w_type == 'float' && r_type == 'double' |
| return true |
| end |
| |
| return false |
| end |
| |
| attr_accessor :writers_schema, :readers_schema |
| |
| def initialize(writers_schema=nil, readers_schema=nil) |
| @writers_schema = writers_schema |
| @readers_schema = readers_schema |
| end |
| |
| def read(decoder) |
| self.readers_schema = writers_schema unless readers_schema |
| read_data(writers_schema, readers_schema, decoder) |
| end |
| |
| def read_data(writers_schema, readers_schema, decoder) |
| # schema matching |
| unless self.class.match_schemas(writers_schema, readers_schema) |
| raise SchemaMatchException.new(writers_schema, readers_schema) |
| end |
| |
| # schema resolution: reader's schema is a union, writer's |
| # schema is not |
| if writers_schema.type != 'union' && readers_schema.type == 'union' |
| rs = readers_schema.schemas.find{|s| |
| self.class.match_schemas(writers_schema, s) |
| } |
| return read_data(writers_schema, rs, decoder) if rs |
| raise SchemaMatchException.new(writers_schema, readers_schema) |
| end |
| |
| # function dispatch for reading data based on type of writer's |
| # schema |
| case writers_schema.type |
| when 'null'; decoder.read_null |
| when 'boolean'; decoder.read_boolean |
| when 'string'; decoder.read_string |
| when 'int'; decoder.read_int |
| when 'long'; decoder.read_long |
| when 'float'; decoder.read_float |
| when 'double'; decoder.read_double |
| when 'bytes'; decoder.read_bytes |
| when 'fixed'; read_fixed(writers_schema, readers_schema, decoder) |
| when 'enum'; read_enum(writers_schema, readers_schema, decoder) |
| when 'array'; read_array(writers_schema, readers_schema, decoder) |
| when 'map'; read_map(writers_schema, readers_schema, decoder) |
| when 'union'; read_union(writers_schema, readers_schema, decoder) |
| when 'record', 'errors', 'request'; read_record(writers_schema, readers_schema, decoder) |
| else |
| raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}" |
| end |
| end |
| |
| def read_fixed(writers_schema, readers_schema, decoder) |
| decoder.read(writers_schema.size) |
| end |
| |
| def read_enum(writers_schema, readers_schema, decoder) |
| index_of_symbol = decoder.read_int |
| read_symbol = writers_schema.symbols[index_of_symbol] |
| |
| # TODO(jmhodges): figure out what unset means for resolution |
| # schema resolution |
| unless readers_schema.symbols.include?(read_symbol) |
| # 'unset' here |
| end |
| |
| read_symbol |
| end |
| |
| def read_array(writers_schema, readers_schema, decoder) |
| read_items = [] |
| block_count = decoder.read_long |
| while block_count != 0 |
| if block_count < 0 |
| block_count = -block_count |
| block_size = decoder.read_long |
| end |
| block_count.times do |
| read_items << read_data(writers_schema.items, |
| readers_schema.items, |
| decoder) |
| end |
| block_count = decoder.read_long |
| end |
| |
| read_items |
| end |
| |
| def read_map(writers_schema, readers_schema, decoder) |
| read_items = {} |
| block_count = decoder.read_long |
| while block_count != 0 |
| if block_count < 0 |
| block_count = -block_count |
| block_size = decoder.read_long |
| end |
| block_count.times do |
| key = decoder.read_string |
| read_items[key] = read_data(writers_schema.values, |
| readers_schema.values, |
| decoder) |
| end |
| block_count = decoder.read_long |
| end |
| |
| read_items |
| end |
| |
| def read_union(writers_schema, readers_schema, decoder) |
| index_of_schema = decoder.read_long |
| selected_writers_schema = writers_schema.schemas[index_of_schema] |
| |
| read_data(selected_writers_schema, readers_schema, decoder) |
| end |
| |
| def read_record(writers_schema, readers_schema, decoder) |
| readers_fields_hash = readers_schema.fields_hash |
| read_record = {} |
| writers_schema.fields.each do |field| |
| if readers_field = readers_fields_hash[field.name] |
| field_val = read_data(field.type, readers_field.type, decoder) |
| read_record[field.name] = field_val |
| else |
| skip_data(field.type, decoder) |
| end |
| end |
| |
| # fill in the default values |
| if readers_fields_hash.size > read_record.size |
| writers_fields_hash = writers_schema.fields_hash |
| readers_fields_hash.each do |field_name, field| |
| unless writers_fields_hash.has_key? field_name |
| if !field.default.nil? |
| field_val = read_default_value(field.type, field.default) |
| read_record[field.name] = field_val |
| else |
| # FIXME(jmhodges) another 'unset' here |
| end |
| end |
| end |
| end |
| |
| read_record |
| end |
| |
| def read_default_value(field_schema, default_value) |
| # Basically a JSON Decoder? |
| case field_schema.type |
| when 'null' |
| return nil |
| when 'boolean' |
| return default_value |
| when 'int', 'long' |
| return Integer(default_value) |
| when 'float', 'double' |
| return Float(default_value) |
| when 'enum', 'fixed', 'string', 'bytes' |
| return default_value |
| when 'array' |
| read_array = [] |
| default_value.each do |json_val| |
| item_val = read_default_value(field_schema.items, json_val) |
| read_array << item_val |
| end |
| return read_array |
| when 'map' |
| read_map = {} |
| default_value.each do |key, json_val| |
| map_val = read_default_value(field_schema.values, json_val) |
| read_map[key] = map_val |
| end |
| return read_map |
| when 'union' |
| return read_default_value(field_schema.schemas[0], default_value) |
| when 'record' |
| read_record = {} |
| field_schema.fields.each do |field| |
| json_val = default_value[field.name] |
| json_val = field.default unless json_val |
| field_val = read_default_value(field.type, json_val) |
| read_record[field.name] = field_val |
| end |
| return read_record |
| else |
| fail_msg = "Unknown type: #{field_schema.type}" |
| raise AvroError(fail_msg) |
| end |
| end |
| |
| def skip_data(writers_schema, decoder) |
| case writers_schema.type |
| when 'null' |
| decoder.skip_null |
| when 'boolean' |
| decoder.skip_boolean |
| when 'string' |
| decoder.skip_string |
| when 'int' |
| decoder.skip_int |
| when 'long' |
| decoder.skip_long |
| when 'float' |
| decoder.skip_float |
| when 'double' |
| decoder.skip_double |
| when 'bytes' |
| decoder.skip_bytes |
| when 'fixed' |
| skip_fixed(writers_schema, decoder) |
| when 'enum' |
| skip_enum(writers_schema, decoder) |
| when 'array' |
| skip_array(writers_schema, decoder) |
| when 'map' |
| skip_map(writers_schema, decoder) |
| when 'union' |
| skip_union(writers_schema, decoder) |
| when 'record', 'error', 'request' |
| skip_record(writers_schema, decoder) |
| else |
| raise AvroError, "Unknown schema type: #{schm.type}" |
| end |
| end |
| |
| def skip_fixed(writers_schema, decoder) |
| decoder.skip(writers_schema.size) |
| end |
| |
| def skip_enum(writers_schema, decoder) |
| decoder.skip_int |
| end |
| |
| def skip_union(writers_schema, decoder) |
| index = decoder.read_long |
| skip_data(writers_schema.schemas[index], decoder) |
| end |
| |
| def skip_array(writers_schema, decoder) |
| skip_blocks(decoder) { skip_data(writers_schema.items, decoder) } |
| end |
| |
| def skip_map(writers_schema, decoder) |
| skip_blocks(decoder) { |
| decoder.skip_string |
| skip_data(writers_schema.values, decoder) |
| } |
| end |
| |
| def skip_record(writers_schema, decoder) |
| writers_schema.fields.each{|f| skip_data(f.type, decoder) } |
| end |
| |
| private |
| def skip_blocks(decoder, &blk) |
| block_count = decoder.read_long |
| while block_count != 0 |
| if block_count < 0 |
| decoder.skip(decoder.read_long) |
| else |
| block_count.times &blk |
| end |
| block_count = decoder.read_long |
| end |
| end |
| end # DatumReader |
| |
| # DatumWriter for generic ruby objects |
| class DatumWriter |
| attr_accessor :writers_schema |
| def initialize(writers_schema=nil) |
| @writers_schema = writers_schema |
| end |
| |
| def write(datum, encoder) |
| write_data(writers_schema, datum, encoder) |
| end |
| |
| def write_data(writers_schema, datum, encoder) |
| unless Schema.validate(writers_schema, datum) |
| raise AvroTypeError.new(writers_schema, datum) |
| end |
| |
| # function dispatch to write datum |
| case writers_schema.type |
| when 'null'; encoder.write_null(datum) |
| when 'boolean'; encoder.write_boolean(datum) |
| when 'string'; encoder.write_string(datum) |
| when 'int'; encoder.write_int(datum) |
| when 'long'; encoder.write_long(datum) |
| when 'float'; encoder.write_float(datum) |
| when 'double'; encoder.write_double(datum) |
| when 'bytes'; encoder.write_bytes(datum) |
| when 'fixed'; write_fixed(writers_schema, datum, encoder) |
| when 'enum'; write_enum(writers_schema, datum, encoder) |
| when 'array'; write_array(writers_schema, datum, encoder) |
| when 'map'; write_map(writers_schema, datum, encoder) |
| when 'union'; write_union(writers_schema, datum, encoder) |
| when 'record', 'errors', 'request'; write_record(writers_schema, datum, encoder) |
| else |
| raise AvroError.new("Unknown type: #{writers_schema.type}") |
| end |
| end |
| |
| def write_fixed(writers_schema, datum, encoder) |
| encoder.write(datum) |
| end |
| |
| def write_enum(writers_schema, datum, encoder) |
| index_of_datum = writers_schema.symbols.index(datum) |
| encoder.write_int(index_of_datum) |
| end |
| |
| def write_array(writers_schema, datum, encoder) |
| if datum.size > 0 |
| encoder.write_long(datum.size) |
| datum.each do |item| |
| write_data(writers_schema.items, item, encoder) |
| end |
| end |
| encoder.write_long(0) |
| end |
| |
| def write_map(writers_schema, datum, encoder) |
| if datum.size > 0 |
| encoder.write_long(datum.size) |
| datum.each do |k,v| |
| encoder.write_string(k) |
| write_data(writers_schema.values, v, encoder) |
| end |
| end |
| encoder.write_long(0) |
| end |
| |
| def write_union(writers_schema, datum, encoder) |
| index_of_schema = -1 |
| found = writers_schema.schemas. |
| find{|e| index_of_schema += 1; found = Schema.validate(e, datum) } |
| unless found # Because find_index doesn't exist in 1.8.6 |
| raise AvroTypeError.new(writers_schema, datum) |
| end |
| encoder.write_long(index_of_schema) |
| write_data(writers_schema.schemas[index_of_schema], datum, encoder) |
| end |
| |
| def write_record(writers_schema, datum, encoder) |
| writers_schema.fields.each do |field| |
| write_data(field.type, datum[field.name], encoder) |
| end |
| end |
| end # DatumWriter |
| end |
| end |