# blob: 751edbb847c115777db9db04cabd7b4e29dc8d3e [file] [log] [blame]
#!/usr/bin/env python3
# -*- mode: python -*-
# -*- coding: utf-8 -*-
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import itertools
import os
import tempfile
import unittest
import avro.codecs
import avro.datafile
import avro.io
import avro.schema
# Codec names reported by the avro library; the codec-parameterized tests
# iterate over every one of them.
CODECS_TO_VALIDATE = avro.codecs.Codecs.supported_codec_names()

# Raw fixtures: (schema as JSON text, a datum written with that schema).
_SCHEMA_JSON_AND_DATUM = (
    ('"null"', None),
    ('"boolean"', True),
    ('"string"', 'adsfasdf09809dsf-=adsf'),
    ('"bytes"', b'12345abcd'),
    ('"int"', 1234),
    ('"long"', 1234),
    ('"float"', 1234.0),
    ('"double"', 1234.0),
    ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
    ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
    ('{"type": "array", "items": "long"}', [1, 3, 2]),
    ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
    ('["string", "null", "long"]', None),
    ("""\
{"type": "record",
"name": "Test",
"fields": [{"name": "f", "type": "long"}]}
""", {'f': 5}),
    ("""\
{"type": "record",
"name": "Lisp",
"fields": [{"name": "value",
"type": ["null", "string",
{"type": "record",
"name": "Cons",
"fields": [{"name": "car", "type": "Lisp"},
{"name": "cdr", "type": "Lisp"}]}]}]}
""", {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
)

# Parsed fixtures: (schema object from avro.schema.parse, datum), in the same
# order as the raw table above.
TEST_PAIRS = tuple(
    (avro.schema.parse(schema_json), sample)
    for schema_json, sample in _SCHEMA_JSON_AND_DATUM
)
@contextlib.contextmanager
def writer(path, schema=None, codec=avro.datafile.NULL_CODEC, mode='wb'):
    """Open ``path`` and yield an ``avro.datafile.DataFileWriter`` over it.

    Args:
        path: Filesystem path of the datafile to open.
        schema: Schema handed to ``DataFileWriter``. Defaults to ``None``.
        codec: Codec handed to ``DataFileWriter``. Defaults to
            ``avro.datafile.NULL_CODEC``.
        mode: Mode string passed to ``open``. Defaults to ``'wb'``.

    Yields:
        The ``DataFileWriter``; it is exited when the caller's ``with``
        block ends.
    """
    # The file gets its own ``with`` clause so the handle is closed even when
    # constructing the DataFileWriter raises; previously such a failure left
    # the freshly opened file dangling.
    with open(path, mode) as raw, \
            avro.datafile.DataFileWriter(raw, avro.io.DatumWriter(), schema, codec) as dfw:
        yield dfw
@contextlib.contextmanager
def reader(path, mode='rb'):
    """Open ``path`` and yield an ``avro.datafile.DataFileReader`` over it.

    Args:
        path: Filesystem path of the datafile to open.
        mode: Mode string passed to ``open``. Defaults to ``'rb'``.

    Yields:
        The ``DataFileReader``; it is exited when the caller's ``with``
        block ends.
    """
    # The file gets its own ``with`` clause so the handle is closed even when
    # constructing the DataFileReader raises; previously such a failure left
    # the freshly opened file dangling.
    with open(path, mode) as raw, \
            avro.datafile.DataFileReader(raw, avro.io.DatumReader()) as dfr:
        yield dfr
class TestDataFile(unittest.TestCase):
    """Write/read tests for datafiles, driven by the module-level fixtures.

    Each test iterates over ``TEST_PAIRS`` (and, where a codec matters, over
    ``CODECS_TO_VALIDATE``). Every combination runs inside ``subTest`` so a
    failure names the offending codec/schema and the remaining combinations
    still get exercised.
    """
    # Paths of temporary files created by the current test; reset in setUp.
    files = None

    def setUp(self):
        """Initialize tempfiles collection."""
        self.files = []

    def tempfile(self):
        """Generate a tempfile and register it for cleanup.

        Returns:
            The path of a newly created, empty ``.avro`` file.
        """
        # delete=False keeps the (now closed) file on disk so that it can be
        # reopened by path; tearDown removes it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.avro') as f:
            pass
        self.files.append(f.name)
        return f.name

    def tearDown(self):
        """Clean up temporary files."""
        for f in self.files:
            os.unlink(f)

    def test_append(self):
        '''A datafile can be written to, appended to, and read from.'''
        for codec in CODECS_TO_VALIDATE:
            for schema, datum in TEST_PAIRS:
                with self.subTest(codec=codec, schema=str(schema)):
                    # write data in binary to file once
                    path = self.tempfile()
                    with writer(path, schema, codec) as dfw:
                        dfw.append(datum)

                    # open file, write, and close nine times
                    for _ in range(9):
                        with writer(path, mode='ab+') as dfw:
                            dfw.append(datum)

                    # read data in binary from file
                    with reader(path) as dfr:
                        data = list(itertools.islice(dfr, 10))
                        # nothing may remain after the ten expected records
                        self.assertRaises(StopIteration, next, dfr)
                    self.assertEqual(len(data), 10)
                    self.assertEqual(data, [datum] * 10)

    def test_round_trip(self):
        '''A datafile can be written to and read from.'''
        for codec in CODECS_TO_VALIDATE:
            for schema, datum in TEST_PAIRS:
                with self.subTest(codec=codec, schema=str(schema)):
                    # write data in binary to file 10 times
                    path = self.tempfile()
                    with writer(path, schema, codec) as dfw:
                        for _ in range(10):
                            dfw.append(datum)

                    # read data in binary from file
                    with reader(path) as dfr:
                        data = list(itertools.islice(dfr, 10))
                        # nothing may remain after the ten expected records
                        self.assertRaises(StopIteration, next, dfr)
                    self.assertEqual(len(data), 10)
                    self.assertEqual(data, [datum] * 10)

    def test_context_manager(self):
        '''A datafile closes its buffer object when it exits a with block.'''
        path = self.tempfile()
        for schema, _ in TEST_PAIRS:
            with self.subTest(schema=str(schema)):
                with writer(path, schema) as dfw:
                    self.assertFalse(dfw.writer.closed)
                self.assertTrue(dfw.writer.closed)

                with reader(path) as dfr:
                    self.assertFalse(dfr.reader.closed)
                self.assertTrue(dfr.reader.closed)

    def test_metadata(self):
        '''Metadata can be written to a datafile, and read from it later.'''
        path = self.tempfile()
        for schema, _ in TEST_PAIRS:
            with self.subTest(schema=str(schema)):
                with writer(path, schema) as dfw:
                    dfw.set_meta('test.string', b'foo')
                    dfw.set_meta('test.number', b'1')
                with reader(path) as dfr:
                    self.assertEqual(b'foo', dfr.get_meta('test.string'))
                    self.assertEqual(b'1', dfr.get_meta('test.number'))

    def test_empty_datafile(self):
        """A reader should not fail to read a file consisting of a single empty block."""
        path = self.tempfile()
        for schema, _ in TEST_PAIRS:
            with self.subTest(schema=str(schema)):
                with writer(path, schema) as dfw:
                    dfw.flush()
                    # Write an empty block
                    dfw.encoder.write_long(0)
                    dfw.encoder.write_long(0)
                    dfw.writer.write(dfw.sync_marker)

                with reader(path) as dfr:
                    self.assertRaises(StopIteration, next, dfr)