# blob: e3644213b73289c8605273d27bf0c71b45005860 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'test_help'
class TestIO < Test::Unit::TestCase
DATAFILE = 'tmp/test.rb.avro'
Schema = Avro::Schema
# "null" schema: runs the generic schema checks, then verifies that a JSON
# null field default is read back as nil.
def test_null
  schema_json = '"null"'
  check(schema_json)
  check_default(schema_json, "null", nil)
end
# "boolean" schema: runs the generic schema checks, then verifies both
# possible JSON defaults are read back as the matching Ruby booleans.
def test_boolean
  schema_json = '"boolean"'
  check(schema_json)
  { "true" => true, "false" => false }.each do |default_json, expected|
    check_default(schema_json, default_json, expected)
  end
end
# "string" schema: generic schema checks plus a string field default.
def test_string
  schema_json = '"string"'
  check(schema_json)
  check_default(schema_json, '"foo"', "foo")
end
# "bytes" schema: generic schema checks plus a field default given as a
# JSON string.
def test_bytes
  schema_json = '"bytes"'
  check(schema_json)
  check_default(schema_json, '"foo"', "foo")
end
# "int" schema: generic schema checks plus an integer field default.
def test_int
  schema_json = '"int"'
  check(schema_json)
  check_default(schema_json, "5", 5)
end
# "long" schema: generic schema checks plus an integer field default.
def test_long
  schema_json = '"long"'
  check(schema_json)
  check_default(schema_json, "9", 9)
end
# "float" schema: generic schema checks plus a fractional field default.
def test_float
  schema_json = '"float"'
  check(schema_json)
  check_default(schema_json, "1.2", 1.2)
end
# "double" schema: generic schema checks plus a fractional field default.
def test_double
  schema_json = '"double"'
  check(schema_json)
  check_default(schema_json, "1.2", 1.2)
end
# Array-of-long schema: generic schema checks plus an array field default
# that must come back as a Ruby Array.
def test_array
  schema_json = '{"type": "array", "items": "long"}'
  check(schema_json)
  expected_default = [1]
  check_default(schema_json, "[1]", expected_default)
end
# Map-of-long schema: generic schema checks plus a map field default that
# must come back as a Hash with string keys.
def test_map
  schema_json = '{"type": "map", "values": "long"}'
  check(schema_json)
  expected_default = { "a" => 1 }
  check_default(schema_json, '{"a": 1}', expected_default)
end
# Single-field record schema: generic schema checks plus a record-valued
# field default that must come back as a Hash with string keys.
def test_record
  schema_json = <<-JSON
{"type": "record",
"name": "Test",
"fields": [{"name": "f",
"type": "long"}]}
  JSON
  check(schema_json)
  expected_default = { "f" => 11 }
  check_default(schema_json, '{"f": 11}', expected_default)
end
# Record whose two long fields carry the timestamp-micros and
# timestamp-millis logical types; only the generic schema checks are run.
def test_record_with_logical_type
  schema_json = <<-JSON
{"type": "record",
"name": "Test",
"fields": [{"name": "ts",
"type": {"type": "long",
"logicalType": "timestamp-micros"}},
{"name": "ts2",
"type": {"type": "long",
"logicalType": "timestamp-millis"}}]}
  JSON
  check(schema_json)
end
# "error" schema (record-like, with a single string field): generic schema
# checks plus a Hash-valued field default.
def test_error
  schema_json = <<-JSON
{"type": "error",
"name": "TestError",
"fields": [{"name": "message",
"type": "string"}]}
  JSON
  check(schema_json)
  expected_default = { "message" => "boom" }
  check_default(schema_json, '{"message": "boom"}', expected_default)
end
# Two-symbol enum schema: generic schema checks plus a field default naming
# one of the symbols.
def test_enum
  schema_json = '{"type": "enum", "name": "Test","symbols": ["A", "B"]}'
  check(schema_json)
  check_default(schema_json, '"B"', "B")
end
# Enum schema that declares its own "default" symbol ("A"). The field-level
# default ("B") is the value expected for a missing field, not the enum's
# own default.
def test_enum_with_default
  schema_json = '{"type": "enum", "name": "Test", "symbols": ["A", "B"], "default": "A"}'
  check(schema_json)
  field_default_json = '"B"'
  check_default(schema_json, field_default_json, "B")
end
# Self-referential record: "Node" has an array field whose items are "Node".
# Only the generic schema checks are run.
def test_recursive
  schema_json = <<-JSON
{"type": "record",
"name": "Node",
"fields": [{"name": "label", "type": "string"},
{"name": "children",
"type": {"type": "array", "items": "Node"}}]}
  JSON
  check(schema_json)
end
# Union mixing primitives, a logical type and a record: generic schema
# checks. Separately, a ["double", "long"] union field with default 1.1 must
# read back as 1.1.
def test_union
  schema_json = <<-JSON
["string",
{"type": "int", "logicalType": "date"},
"null",
"long",
{"type": "record",
"name": "Cons",
"fields": [{"name": "car", "type": "string"},
{"name": "cdr", "type": "string"}]}]
  JSON
  check(schema_json)

  numeric_union_json = '["double", "long"]'
  check_default(numeric_union_json, "1.1", 1.1)
end
# Mutually recursive schema: record "Lisp" holds a union containing record
# "Cons", whose fields are again "Lisp". Only the generic checks are run.
def test_lisp
  schema_json = <<-JSON
{"type": "record",
"name": "Lisp",
"fields": [{"name": "value",
"type": ["null", "string",
{"type": "record",
"name": "Cons",
"fields": [{"name": "car", "type": "Lisp"},
{"name": "cdr", "type": "Lisp"}]}]}]}
  JSON
  check(schema_json)
end
# One-byte "fixed" schema: generic schema checks plus a one-character field
# default.
def test_fixed
  schema_json = '{"type": "fixed", "name": "Test", "size": 1}'
  check(schema_json)
  check_default(schema_json, '"a"', "a")
end
# A record datum may use Symbol or String keys when written; the datum read
# back is expected to use String keys only.
def test_record_variable_key_types
  datum = { sym: "foo", "str" => "bar" }
  expected = { "sym" => "foo", "str" => "bar" }
  schema = Schema.parse('{"type":"record", "name":"rec", "fields":[{"name":"sym", "type":"string"}, {"name":"str", "type":"string"}]}')

  writer, _encoder, _datum_writer = write_datum(datum, schema)
  actual = read_datum(writer, schema)

  # assert_equal takes (expected, actual); the order matters for the
  # failure message.
  assert_equal expected, actual
end
# Writing nil against a record schema must raise AvroTypeError.
def test_record_with_nil
  record_schema = Avro::Schema.parse('{"type":"record", "name":"rec", "fields":[{"type":"int", "name":"i"}]}')
  assert_raise(Avro::IO::AvroTypeError) { write_datum(nil, record_schema) }
end
# Writing nil against an array schema must raise AvroTypeError.
def test_array_with_nil
  array_schema = Avro::Schema.parse('{"type":"array", "items":"int"}')
  assert_raise(Avro::IO::AvroTypeError) { write_datum(nil, array_schema) }
end
# Writing nil against a map schema must raise AvroTypeError.
def test_map_with_nil
  map_schema = Avro::Schema.parse('{"type":"map", "values":"long"}')
  assert_raise(Avro::IO::AvroTypeError) { write_datum(nil, map_schema) }
end
# Parsing an enum that lists the same symbol twice must raise a
# SchemaParseError carrying the exact message below.
def test_enum_with_duplicate
  duplicate_enum_json = '{"type": "enum", "name": "Test","symbols" : ["AA", "AA"]}'
  expected_error = Avro::SchemaParseError.new('Duplicate symbol: ["AA", "AA"]')
  assert_raises(expected_error) { Avro::Schema.parse duplicate_enum_json }
end
# Pairs of [integer value, expected encoding], where the encoding is written
# as space-separated lowercase hex bytes (the format produced by
# avro_hexlify). Shared by the int/long encoding and skip tests.
# The byte patterns follow Avro's variable-length zig-zag coding for
# int/long (see the Avro specification).
# Frozen so that one test cannot accidentally alter the table for another.
BINARY_INT_ENCODINGS = [
  [0, '00'],
  [-1, '01'],
  [1, '02'],
  [-2, '03'],
  [2, '04'],
  [-64, '7f'],
  [64, '80 01'],
  [8192, '80 80 01'],
  [-8193, '81 80 01'],
].freeze
# Reads one variable-length value from +reader+ a byte at a time and returns
# its bytes as space-separated hex (e.g. "80 01").
#
# Reading stops after the first byte whose high bit (0x80) is clear; every
# byte read, including that last one, is part of the result.
def avro_hexlify(reader)
  hex_bytes = []
  loop do
    octet = reader.read(1)
    hex_bytes << hexlify(octet)
    # A clear continuation bit marks the final byte of the value.
    break if (octet.unpack('C').first & 0x80).zero?
  end
  hex_bytes.join(' ')
end
# Hex-encodes the bytes of +msg+ (high nibble first, lowercase).
#
# Note: the result is the one-element Array produced by String#unpack, not a
# bare String; avro_hexlify relies on Array#join flattening it.
def hexlify(msg)
  hex_format = 'H*'
  msg.unpack(hex_format)
end
# Every value in BINARY_INT_ENCODINGS, written with the "int" schema, must
# produce exactly the listed hex bytes.
def test_binary_int_encoding
  BINARY_INT_ENCODINGS.each do |value, hex_encoding|
    # Encode the datum into an in-memory buffer.
    buffer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(buffer)
    Avro::IO::DatumWriter.new(Avro::Schema.parse('"int"')).write(value, encoder)

    # Rewind and compare the raw bytes with the expectation.
    buffer.seek(0)
    assert_equal hex_encoding, avro_hexlify(buffer)
  end
end
# Every value in BINARY_INT_ENCODINGS, written with the "long" schema, must
# produce exactly the listed hex bytes.
def test_binary_long_encoding
  BINARY_INT_ENCODINGS.each do |value, hex_encoding|
    # Encode the datum into an in-memory buffer.
    buffer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(buffer)
    Avro::IO::DatumWriter.new(Avro::Schema.parse('"long"')).write(value, encoder)

    # Rewind, read the bytes back and compare them as hex.
    buffer.seek(0)
    assert_equal hex_encoding, avro_hexlify(buffer)
  end
end
# The byte 0xC3 tagged ISO-8859-1 and the bytes C3 83 tagged UTF-8 must both
# be written by the "string" schema as the same three bytes: 04 C3 83.
def test_utf8_string_encoding
  inputs = [
    "\xC3".force_encoding('ISO-8859-1'),
    "\xC3\x83".force_encoding('UTF-8')
  ]
  inputs.each do |value|
    output = ''.force_encoding('BINARY')
    encoder = Avro::IO::BinaryEncoder.new(StringIO.new(output))
    string_writer = Avro::IO::DatumWriter.new(Avro::Schema.parse('"string"'))
    string_writer.write(value, encoder)
    assert_equal "\x04\xc3\x83".force_encoding('BINARY'), output
  end
end
# The same two bytes (C3 83), whatever encoding the Ruby string is tagged
# with, must be written by the "bytes" schema unchanged after a 04 prefix.
def test_bytes_encoding
  inputs = [
    "\xC3\x83".force_encoding('BINARY'),
    "\xC3\x83".force_encoding('ISO-8859-1'),
    "\xC3\x83".force_encoding('UTF-8')
  ]
  inputs.each do |value|
    output = ''.force_encoding('BINARY')
    encoder = Avro::IO::BinaryEncoder.new(StringIO.new(output))
    bytes_writer = Avro::IO::DatumWriter.new(Avro::Schema.parse('"bytes"'))
    bytes_writer.write(value, encoder)
    assert_equal "\x04\xc3\x83".force_encoding('BINARY'), output
  end
end
# The same two bytes (C3 83), whatever encoding the Ruby string is tagged
# with, must be written by a size-2 "fixed" schema as exactly those two
# bytes, with nothing added.
def test_fixed_encoding
  inputs = [
    "\xC3\x83".force_encoding('BINARY'),
    "\xC3\x83".force_encoding('ISO-8859-1'),
    "\xC3\x83".force_encoding('UTF-8')
  ]
  inputs.each do |value|
    output = ''.force_encoding('BINARY')
    encoder = Avro::IO::BinaryEncoder.new(StringIO.new(output))
    schema = '{"type": "fixed", "name": "TwoBytes", "size": 2}'
    fixed_writer = Avro::IO::DatumWriter.new(Avro::Schema.parse(schema))
    fixed_writer.write(value, encoder)
    assert_equal "\xc3\x83".force_encoding('BINARY'), output
  end
end
# For each value in BINARY_INT_ENCODINGS: write it followed by a sentinel
# long, skip the first with BinaryDecoder#skip_long, and check that the next
# value read is the sentinel.
def test_skip_long
  BINARY_INT_ENCODINGS.each do |value_to_skip, _hex_encoding|
    value_to_read = 6253

    # Encode both values back to back into one buffer.
    writer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(writer)
    long_writer = Avro::IO::DatumWriter.new(Avro::Schema.parse('"long"'))
    [value_to_skip, value_to_read].each { |v| long_writer.write(v, encoder) }

    # Skip past the first value.
    decoder = Avro::IO::BinaryDecoder.new(StringIO.new(writer.string))
    decoder.skip_long

    # The decoder must now be positioned at the sentinel.
    long_reader = Avro::IO::DatumReader.new(Avro::Schema.parse('"long"'))
    assert_equal value_to_read, long_reader.read(decoder)
  end
end
# For each value in BINARY_INT_ENCODINGS: write it followed by a sentinel
# int, skip the first with BinaryDecoder#skip_int, and check that the next
# value read is the sentinel.
def test_skip_int
  BINARY_INT_ENCODINGS.each do |value_to_skip, _hex_encoding|
    value_to_read = 6253

    # Encode both values back to back into one buffer.
    writer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(writer)
    int_writer = Avro::IO::DatumWriter.new(Avro::Schema.parse('"int"'))
    [value_to_skip, value_to_read].each { |v| int_writer.write(v, encoder) }

    # Skip past the first value.
    decoder = Avro::IO::BinaryDecoder.new(StringIO.new(writer.string))
    decoder.skip_int

    # The decoder must now be positioned at the sentinel.
    int_reader = Avro::IO::DatumReader.new(Avro::Schema.parse('"int"'))
    assert_equal value_to_read, int_reader.read(decoder)
  end
end
# For a value from each branch of an ["int", "string", "null"] union: write
# it followed by a sentinel, skip the first with DatumReader#skip_data, and
# check that the next value read is the sentinel.
def test_skip_union
  skippable_values = ["hello", -1, 32, nil]
  skippable_values.each do |value_to_skip|
    value_to_read = 6253
    schema = Avro::Schema.parse('["int", "string", "null"]')

    # Encode both values back to back into one buffer.
    writer = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(writer)
    union_writer = Avro::IO::DatumWriter.new(schema)
    [value_to_skip, value_to_read].each { |v| union_writer.write(v, encoder) }

    # Skip the first value, then read the second.
    decoder = Avro::IO::BinaryDecoder.new(StringIO.new(writer.string))
    union_reader = Avro::IO::DatumReader.new(schema)
    union_reader.skip_data(schema, decoder)
    assert_equal value_to_read, union_reader.read(decoder)
  end
end
# A value written with one numeric schema must be readable with every
# schema listed after it in the int -> long -> float -> double chain.
#
# Each writer/reader pair is asserted on its own, with the pair named in the
# failure message, so a failure identifies the promotion that broke.
def test_schema_promotion
  promotable_schemas = ['"int"', '"long"', '"float"', '"double"']
  datum_to_write = 219

  promotable_schemas.each_with_index do |ws, i|
    writers_schema = Avro::Schema.parse(ws)

    # Only schemas later in the list are valid promotion targets.
    promotable_schemas[(i + 1)..-1].each do |rs|
      readers_schema = Avro::Schema.parse(rs)
      writer, _enc, _dw = write_datum(datum_to_write, writers_schema)
      datum_read = read_datum(writer, writers_schema, readers_schema)
      assert_equal(datum_to_write, datum_read, "promoting #{ws} to #{rs}")
    end
  end
end
# "string" and "bytes" must be readable as each other: a value written with
# either schema is read back unchanged using the other as reader's schema.
#
# Each direction is asserted on its own, with the pair named in the failure
# message, so a failure identifies the direction that broke.
def test_interchangeable_schemas
  interchangeable_schemas = ['"string"', '"bytes"']
  datum_to_write = 'foo'

  # permutation(2) yields every ordered (writer, reader) pair of distinct
  # schemas, i.e. both directions.
  interchangeable_schemas.permutation(2).each do |ws, rs|
    writers_schema = Avro::Schema.parse(ws)
    readers_schema = Avro::Schema.parse(rs)
    writer, * = write_datum(datum_to_write, writers_schema)
    datum_read = read_datum(writer, writers_schema, readers_schema)
    assert_equal(datum_to_write, datum_read, "writing #{ws}, reading as #{rs}")
  end
end
# The writer's enum has symbol "C", which the reader's enum lacks and for
# which the reader declares no default. The value read back is expected to
# be the writer's symbol.
def test_unknown_enum_symbol
  writers_schema = Avro::Schema.parse(<<-SCHEMA)
{
"type": "enum",
"name": "test",
"symbols": ["B", "C"]
}
  SCHEMA
  readers_schema = Avro::Schema.parse(<<-SCHEMA)
{
"type": "enum",
"name": "test",
"symbols": ["A", "B"]
}
  SCHEMA
  datum_to_write = "C"
  writer, * = write_datum(datum_to_write, writers_schema)
  datum_read = read_datum(writer, writers_schema, readers_schema)
  # Ruby implementation did not follow the spec and returns the writer's symbol here
  # (expected value first, so a failure message labels the values correctly)
  assert_equal(datum_to_write, datum_read)
end
# The writer's enum has symbol "C", which the reader's enum lacks; the
# reader declares an enum-level default, and that default ("UNKNOWN") is the
# value expected to be read back.
def test_unknown_enum_symbol_with_enum_default
  writers_schema = Avro::Schema.parse(<<-SCHEMA)
{
"type": "enum",
"name": "test",
"symbols": ["B", "C"]
}
  SCHEMA
  readers_schema = Avro::Schema.parse(<<-SCHEMA)
{
"type": "enum",
"name": "test",
"symbols": ["A", "B", "UNKNOWN"],
"default": "UNKNOWN"
}
  SCHEMA
  datum_to_write = "C"
  writer, * = write_datum(datum_to_write, writers_schema)
  datum_read = read_datum(writer, writers_schema, readers_schema)
  # Expected value first, so a failure message labels the values correctly.
  assert_equal("UNKNOWN", datum_read)
end
# An array written with int items must read back unchanged when the
# reader's schema declares long items.
def test_array_schema_promotion
  writers_schema = Avro::Schema.parse('{"type":"array", "items":"int"}')
  readers_schema = Avro::Schema.parse('{"type":"array", "items":"long"}')
  datum_to_write = [1, 2]
  writer, * = write_datum(datum_to_write, writers_schema)
  datum_read = read_datum(writer, writers_schema, readers_schema)
  # Expected value first, so a failure message labels the values correctly.
  assert_equal(datum_to_write, datum_read)
end
# A map written with int values must read back unchanged when the reader's
# schema declares long values.
def test_map_schema_promotion
  writers_schema = Avro::Schema.parse('{"type":"map", "values":"int"}')
  readers_schema = Avro::Schema.parse('{"type":"map", "values":"long"}')
  datum_to_write = { 'foo' => 1, 'bar' => 2 }
  writer, * = write_datum(datum_to_write, writers_schema)
  datum_read = read_datum(writer, writers_schema, readers_schema)
  # Expected value first, so a failure message labels the values correctly.
  assert_equal(datum_to_write, datum_read)
end
# SnappyCodec#decompress must still accept an old-style block payload that
# carries no checksum.
#
# The fixture contains a single literal and no back-references, so its
# final nine bytes are the uncompressed payload itself.
def test_snappy_backward_compat
  old_snappy_bytes = "\x09\x20\x02\x06\x02\x0a\x67\x72\x65\x65\x6e"
  uncompressed_bytes = "\x02\x06\x02\x0a\x67\x72\x65\x65\x6e"
  codec = Avro::DataFile::SnappyCodec.new
  decompressed = codec.decompress(old_snappy_bytes)
  assert_equal(uncompressed_bytes, decompressed)
end
private
# Asserts that reading fails with Avro::AvroError when the reader's schema
# has a field "f" (of type +schema_json+) with no default and the writer's
# schema has no fields at all.
#
# The decoder wraps an empty StringIO, so there is no data to read.
def check_no_default(schema_json)
  writers_schema = Avro::Schema.parse('{"type": "record", "name": "Foo", "fields": []}')
  readers_schema = Avro::Schema.parse(<<-JSON)
{"type": "record",
"name": "Foo",
"fields": [{"name": "f", "type": #{schema_json}}]}
  JSON
  reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
  empty_input = Avro::IO::BinaryDecoder.new(StringIO.new)

  assert_raise(Avro::AvroError) do
    value = reader.read(empty_input)
    assert_not_equal(value, :no_default) # should never return this
  end
end
# Asserts that a field missing from the writer's schema is filled in from
# the reader's default.
#
# The writer's schema is a record with no fields; the reader's schema adds a
# field "f" of type +schema_json+ with JSON default +default_json+. After
# reading from an empty input, record["f"] must equal +default_value+.
def check_default(schema_json, default_json, default_value)
  writers_schema = Avro::Schema.parse('{"type": "record", "name": "Foo", "fields": []}')
  readers_schema = Avro::Schema.parse(<<-JSON)
{"type": "record",
"name": "Foo",
"fields": [{"name": "f", "type": #{schema_json}, "default": #{default_json}}]}
  JSON
  reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
  empty_input = Avro::IO::BinaryDecoder.new(StringIO.new)

  record = reader.read(empty_input)
  assert_equal default_value, record["f"]
end
# Runs the full battery of generic checks against the schema JSON +str+:
# JSON round-trip, equality, hashing, random-data serialization, data-file
# round-trip, and the missing-default error path.
def check(str)
  schema = Avro::Schema.parse str

  # Serializing the parsed schema must yield JSON equivalent to the input.
  # NB: I don't think we should do this. Why enforce ordering?
  round_tripped = schema.to_s
  assert_equal(MultiJson.load(str),
               MultiJson.load(round_tripped))

  # Two parses of the same text must compare equal.
  assert_equal(schema, Avro::Schema.parse(str))

  # Hashing must terminate, even for recursive schemas.
  schema.hash

  # Serialize and deserialize several random datums in memory.
  randomdata = RandomData.new(schema)
  9.times do
    checkser(schema, randomdata)
  end

  # Round-trip random datums through a data file on disk.
  check_datafile(schema)

  # A reader field of this type without a default must raise AvroError.
  check_no_default(str)
end
# Takes the next datum from +randomdata+, checks it validates against
# +schm+, then writes it to an in-memory buffer and asserts that reading it
# back yields an equal value.
def checkser(schm, randomdata)
  datum = randomdata.next
  assert validate(schm, datum), 'datum is not valid for schema'

  # Serialize.
  serializer = Avro::IO::DatumWriter.new(schm)
  sink = StringIO.new('', 'w')
  serializer.write(datum, Avro::IO::BinaryEncoder.new(sink))

  # Deserialize from a fresh StringIO over the bytes just written.
  deserializer = datum_reader(schm)
  source = StringIO.new(sink.string)
  round_tripped = deserializer.read(Avro::IO::BinaryDecoder.new(source))

  assert_equal(datum, round_tripped) # FIXME check on assertdata conditional
end
# Writes ten random datums for +schm+ to DATAFILE with
# Avro::DataFile::Writer, then reads the file back and asserts that every
# record equals the datum produced by an identically seeded RandomData, and
# that exactly ten records were read.
def check_datafile(schm)
  seed = 0
  count = 10

  random_data = RandomData.new(schm, seed)
  f = File.open(DATAFILE, 'wb')
  dw = Avro::DataFile::Writer.new(f, datum_writer(schm), schm)
  begin
    count.times { dw << random_data.next }
  ensure
    # Close the writer even when a write raises.
    dw.close
  end

  # Re-seed so the generator replays the same sequence that was written.
  random_data = RandomData.new(schm, seed)

  f = File.open(DATAFILE, 'r+')
  dr = Avro::DataFile::Reader.new(f, datum_reader(schm))
  # Count records explicitly: an empty file then fails the final assertion
  # instead of raising NoMethodError on nil.
  records_read = 0
  begin
    dr.each do |data|
      # FIXME assertdata conditional
      assert_equal(random_data.next, data)
      records_read += 1
    end
  ensure
    # Close the reader even when an assertion above fails.
    dr.close
  end

  assert_equal count, records_read
end
# Returns the result of Avro's schema validation of +datum+ against +schm+.
# Uses the class-level Schema alias for Avro::Schema.
def validate(schm, datum)
  Schema.validate(schm, datum)
end
# Builds a new Avro::IO::DatumWriter for the schema +schm+.
def datum_writer(schm)
Avro::IO::DatumWriter.new(schm)
end
# Builds a new Avro::IO::DatumReader with +schm+ as its only schema argument.
def datum_reader(schm)
Avro::IO::DatumReader.new(schm)
end
# Binary-encodes +datum+ with +writers_schema+ into a fresh StringIO.
#
# Returns a three-element Array: [buffer, encoder, datum_writer]. Most
# callers use only the buffer.
def write_datum(datum, writers_schema)
  buffer = StringIO.new
  binary_encoder = Avro::IO::BinaryEncoder.new(buffer)
  schema_writer = Avro::IO::DatumWriter.new(writers_schema).tap do |w|
    w.write(datum, binary_encoder)
  end
  [buffer, binary_encoder, schema_writer]
end
# Decodes one datum from the bytes held by +buffer+ (anything responding to
# #string, such as the StringIO returned by write_datum).
#
# +readers_schema+ is passed through to DatumReader and defaults to nil.
# Reading starts from the beginning, on a fresh StringIO over buffer.string.
def read_datum(buffer, writers_schema, readers_schema=nil)
  decoder = Avro::IO::BinaryDecoder.new(StringIO.new(buffer.string))
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
end