# blob: 248caa4027220592d3854c05c5598d640727cb1a
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'test_help'
class TestIO < Test::Unit::TestCase
  # Round-trip tests for Avro schema parsing, default values, binary
  # encoding/decoding, skipping, schema promotion and the data-file container.

  # Scratch container file written and re-read by check_datafile.
  # NOTE(review): assumes a writable tmp/ directory exists relative to the
  # working directory -- TODO confirm the test harness creates it.
  DATAFILE = 'tmp/test.rb.avro'
  Schema = Avro::Schema

  # [value, expected encoding] pairs for int/long. The expected bytes are
  # written as space-separated lowercase hex; the table shows the zig-zag
  # varint layout (-1 => 01, 1 => 02, 64 => 80 01).
  BINARY_INT_ENCODINGS = [
    [0, '00'],
    [-1, '01'],
    [1, '02'],
    [-2, '03'],
    [2, '04'],
    [-64, '7f'],
    [64, '80 01'],
    [8192, '80 80 01'],
    [-8193, '81 80 01'],
  ].freeze

  def test_null
    check_default('"null"', "null", nil)
  end

  def test_boolean
    check_default('"boolean"', "true", true)
    check_default('"boolean"', "false", false)
  end

  def test_string
    check_default('"string"', '"foo"', "foo")
  end

  def test_bytes
    check_default('"bytes"', '"foo"', "foo")
  end

  def test_int
    check_default('"int"', "5", 5)
  end

  def test_long
    check_default('"long"', "9", 9)
  end

  def test_float
    check_default('"float"', "1.2", 1.2)
  end

  def test_double
    check_default('"double"', "1.2", 1.2)
  end

  def test_array
    array_schema = '{"type": "array", "items": "long"}'
    check_default(array_schema, "[1]", [1])
  end

  def test_map
    map_schema = '{"type": "map", "values": "long"}'
    check_default(map_schema, '{"a": 1}', {"a" => 1})
  end

  def test_record
    record_schema = <<-EOS
      {"type": "record",
       "name": "Test",
       "fields": [{"name": "f",
                   "type": "long"}]}
    EOS
    check_default(record_schema, '{"f": 11}', {"f" => 11})
  end

  def test_enum
    enum_schema = '{"type": "enum", "name": "Test","symbols": ["A", "B"]}'
    check_default(enum_schema, '"B"', "B")
  end

  def test_recursive
    recursive_schema = <<-EOS
      {"type": "record",
       "name": "Node",
       "fields": [{"name": "label", "type": "string"},
                  {"name": "children",
                   "type": {"type": "array", "items": "Node"}}]}
    EOS
    check(recursive_schema)
  end

  def test_union
    union_schema = <<-EOS
      ["string",
       "null",
       "long",
       {"type": "record",
        "name": "Cons",
        "fields": [{"name": "car", "type": "string"},
                   {"name": "cdr", "type": "string"}]}]
    EOS
    check(union_schema)
    check_default('["double", "long"]', "1.1", 1.1)
  end

  def test_lisp
    lisp_schema = <<-EOS
      {"type": "record",
       "name": "Lisp",
       "fields": [{"name": "value",
                   "type": ["null", "string",
                            {"type": "record",
                             "name": "Cons",
                             "fields": [{"name": "car", "type": "Lisp"},
                                        {"name": "cdr", "type": "Lisp"}]}]}]}
    EOS
    check(lisp_schema)
  end

  def test_fixed
    fixed_schema = '{"type": "fixed", "name": "Test", "size": 1}'
    check_default(fixed_schema, '"a"', "a")
  end

  def test_enum_with_duplicate
    str = '{"type": "enum", "name": "Test","symbols" : ["AA", "AA"]}'
    assert_raises(Avro::SchemaParseError) do
      Avro::Schema.parse str
    end
  end

  # Reads one variable-length integer from +reader+ and returns its bytes as
  # space-separated two-digit lowercase hex (e.g. "80 01"). Bytes are consumed
  # while the high (continuation) bit 0x80 is set.
  #
  # Raises EOFError if the input ends before the final byte.
  def avro_hexlify(reader)
    bytes = []
    loop do
      byte = reader.read(1)
      raise EOFError, 'truncated varint' if byte.nil?
      bytes << hexlify(byte)
      # unpack('C') yields the byte as an Integer on every Ruby version;
      # String#[] returns a String on 1.9+ and cannot be masked with &.
      break if (byte.unpack('C').first & 0x80).zero?
    end
    bytes.join(' ')
  end

  # Returns +msg+ as lowercase hex, two digits per byte ("\x80\x01" => "8001").
  def hexlify(msg)
    msg.unpack('H*').first
  end

  def test_binary_int_encoding
    check_binary_int_encodings('"int"')
  end

  def test_binary_long_encoding
    check_binary_int_encodings('"long"')
  end

  def test_skip_long
    check_skip_varint('"long"') { |decoder| decoder.skip_long }
  end

  def test_skip_int
    check_skip_varint('"int"') { |decoder| decoder.skip_int }
  end

  def test_skip_union
    schema = Avro::Schema.parse('["int", "string", "null"]')
    value_to_read = 6253
    ["hello", -1, 32, nil].each do |value_to_skip|
      writer = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new(writer)
      datum_writer = Avro::IO::DatumWriter.new(schema)
      datum_writer.write(value_to_skip, encoder)
      datum_writer.write(value_to_read, encoder)

      decoder = Avro::IO::BinaryDecoder.new(StringIO.new(writer.string))
      datum_reader = Avro::IO::DatumReader.new(schema)
      # Skip the first datum; the second must then decode intact.
      datum_reader.skip_data(schema, decoder)
      assert_equal value_to_read, datum_reader.read(decoder)
    end
  end

  # Writes 219 with each numeric schema and reads it back with every schema
  # that follows it in the list, asserting the value survives the promotion.
  def test_schema_promotion
    promotable_schemas = ['"int"', '"long"', '"float"', '"double"']
    datum_to_write = 219
    promotable_schemas.each_with_index do |ws, i|
      writers_schema = Avro::Schema.parse(ws)
      promotable_schemas[(i + 1)..-1].each do |rs|
        readers_schema = Avro::Schema.parse(rs)
        writer, _encoder, _datum_writer = write_datum(datum_to_write, writers_schema)
        datum_read = read_datum(writer, writers_schema, readers_schema)
        assert_equal datum_to_write, datum_read, "promoting #{ws} to #{rs}"
      end
    end
  end

  private

  # Asserts that +schema_json+ round-trips (see #check). It then checks that
  # +default_json+, used as the default of a record field of that type,
  # decodes to +default_value+.
  #
  # The record is read from empty input. The writer's schema has no fields
  # and the reader's schema has field "f", so "f" can only come from the
  # default.
  def check_default(schema_json, default_json, default_value)
    check(schema_json)
    actual_schema = '{"type": "record", "name": "Foo", "fields": []}'
    actual = Avro::Schema.parse(actual_schema)
    expected_schema = <<-EOS
      {"type": "record",
       "name": "Foo",
       "fields": [{"name": "f", "type": #{schema_json}, "default": #{default_json}}]}
    EOS
    expected = Avro::Schema.parse(expected_schema)
    reader = Avro::IO::DatumReader.new(actual, expected)
    record = reader.read(Avro::IO::BinaryDecoder.new(StringIO.new))
    assert_equal default_value, record["f"]
  end

  # Full round-trip check for a schema given as JSON text:
  # * parse -> to_s -> re-parse yields the same JSON structure,
  # * parsing twice yields == schemas,
  # * #hash terminates (recursive schemas must not recurse forever),
  # * random data survives binary encode/decode,
  # * random data survives a data-file write and read.
  def check(str)
    schema = Avro::Schema.parse str
    parsed_string = schema.to_s
    # The comparison is on parsed JSON, so object key order is irrelevant.
    # Array order (fields, symbols, union branches) must be preserved.
    assert_equal(Yajl.load(str), Yajl.load(parsed_string))
    # Schema#== must treat two parses of the same text as equal.
    assert_equal(schema, Avro::Schema.parse(str))
    # Only checks that #hash returns instead of recursing forever.
    schema.hash
    randomdata = RandomData.new(schema)
    9.times { checkser(schema, randomdata) }
    check_datafile(schema)
  end

  # Takes the next datum from +randomdata+, validates it against +schm+, and
  # encodes it to an in-memory buffer. Then decodes it and asserts equality.
  def checkser(schm, randomdata)
    datum = randomdata.next
    assert validate(schm, datum)
    w = Avro::IO::DatumWriter.new(schm)
    writer = StringIO.new "", "w"
    w.write(datum, Avro::IO::BinaryEncoder.new(writer))
    r = datum_reader(schm)
    reader = StringIO.new(writer.string)
    ob = r.read(Avro::IO::BinaryDecoder.new(reader))
    assert_equal(datum, ob) # FIXME check on assertdata conditional
  end

  # Writes +count+ seeded random data to DATAFILE, then reads them back.
  # Asserts they match a freshly seeded sequence and that exactly +count+ were
  # read. Reader and writer are closed even if an assertion fails.
  def check_datafile(schm)
    seed = 0
    count = 10

    random_data = RandomData.new(schm, seed)
    dw = Avro::DataFile::Writer.new(File.open(DATAFILE, 'wb'), datum_writer(schm), schm)
    begin
      count.times { dw << random_data.next }
    ensure
      dw.close
    end

    # Same seed => same sequence, so it can be compared datum by datum.
    random_data = RandomData.new(schm, seed)
    # Binary read-only mode: the container is binary and is never written here.
    dr = Avro::DataFile::Reader.new(File.open(DATAFILE, 'rb'), datum_reader(schm))
    read_count = 0
    begin
      dr.each do |data|
        read_count += 1
        # FIXME assertdata conditional
        assert_equal(random_data.next, data)
      end
    ensure
      dr.close
    end
    # Counting avoids the nil + 1 crash an empty file would cause.
    assert_equal count, read_count
  end

  # Encodes every value in BINARY_INT_ENCODINGS with the schema parsed from
  # +schema_json+ and asserts the bytes match the expected hex.
  def check_binary_int_encodings(schema_json)
    datum_writer = Avro::IO::DatumWriter.new(Avro::Schema.parse(schema_json))
    BINARY_INT_ENCODINGS.each do |value, hex_encoding|
      buffer = StringIO.new
      datum_writer.write(value, Avro::IO::BinaryEncoder.new(buffer))
      buffer.rewind
      assert_equal hex_encoding, avro_hexlify(buffer)
    end
  end

  # For every value in BINARY_INT_ENCODINGS, writes that value followed by
  # 6253 using the schema parsed from +schema_json+. Yields the decoder so the
  # caller can skip the first value, then asserts the next read returns 6253.
  def check_skip_varint(schema_json)
    schema = Avro::Schema.parse(schema_json)
    value_to_read = 6253
    BINARY_INT_ENCODINGS.each do |value_to_skip, _hex_encoding|
      writer = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new(writer)
      datum_writer = Avro::IO::DatumWriter.new(schema)
      datum_writer.write(value_to_skip, encoder)
      datum_writer.write(value_to_read, encoder)

      decoder = Avro::IO::BinaryDecoder.new(StringIO.new(writer.string))
      yield decoder
      assert_equal value_to_read, Avro::IO::DatumReader.new(schema).read(decoder)
    end
  end

  def validate(schm, datum)
    Avro::Schema.validate(schm, datum)
  end

  def datum_writer(schm)
    Avro::IO::DatumWriter.new(schm)
  end

  def datum_reader(schm)
    Avro::IO::DatumReader.new(schm)
  end

  # Encodes +datum+ with +writers_schema+ into a fresh StringIO.
  # Returns [buffer, encoder, datum_writer].
  def write_datum(datum, writers_schema)
    writer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(writer)
    datum_writer = Avro::IO::DatumWriter.new(writers_schema)
    datum_writer.write(datum, encoder)
    [writer, encoder, datum_writer]
  end

  # Decodes one datum from the contents of +buffer+. It was written with
  # +writers_schema+ and is resolved against +readers_schema+ when one is
  # given.
  def read_datum(buffer, writers_schema, readers_schema=nil)
    reader = StringIO.new(buffer.string)
    decoder = Avro::IO::BinaryDecoder.new(reader)
    datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
    datum_reader.read(decoder)
  end
end