AVRO-2919 Refactor Datafile Tests (#946)
- Ensure avro.test.test_datafile tests always close resources.
- Use tempfiles to ensure datafile tests can be run in parallel.
diff --git a/lang/py/avro/test/test_datafile.py b/lang/py/avro/test/test_datafile.py
index a823d58..751edbb 100644
--- a/lang/py/avro/test/test_datafile.py
+++ b/lang/py/avro/test/test_datafile.py
@@ -19,24 +19,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import, division, print_function
-
+import contextlib
+import itertools
import os
+import tempfile
import unittest
-from avro import datafile, io, schema
-from avro.codecs import Codecs
+import avro.codecs
+import avro.datafile
+import avro.io
+import avro.schema
-try:
- unicode
-except NameError:
- unicode = str
-
-
-SCHEMAS_TO_VALIDATE = (
+CODECS_TO_VALIDATE = avro.codecs.Codecs.supported_codec_names()
+TEST_PAIRS = tuple((avro.schema.parse(schema), datum) for schema, datum in (
('"null"', None),
('"boolean"', True),
- ('"string"', unicode('adsfasdf09809dsf-=adsf')),
+ ('"string"', 'adsfasdf09809dsf-=adsf'),
('"bytes"', b'12345abcd'),
('"int"', 1234),
('"long"', 1234),
@@ -45,9 +43,9 @@
('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
('{"type": "array", "items": "long"}', [1, 3, 2]),
- ('{"type": "map", "values": "long"}', {unicode('a'): 1,
- unicode('b'): 3,
- unicode('c'): 2}),
+ ('{"type": "map", "values": "long"}', {'a': 1,
+ 'b': 3,
+ 'c': 2}),
('["string", "null", "long"]', None),
("""\
{"type": "record",
@@ -63,162 +61,112 @@
"name": "Cons",
"fields": [{"name": "car", "type": "Lisp"},
{"name": "cdr", "type": "Lisp"}]}]}]}
- """, {'value': {'car': {'value': unicode('head')}, 'cdr': {'value': None}}}),
-)
+ """, {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
+))
-FILENAME = 'test_datafile.out'
-CODECS_TO_VALIDATE = Codecs.supported_codec_names()
+
+@contextlib.contextmanager
+def writer(path, schema=None, codec=avro.datafile.NULL_CODEC, mode='wb'):
+ with avro.datafile.DataFileWriter(open(path, mode), avro.io.DatumWriter(), schema, codec) as dfw:
+ yield dfw
+
+
+@contextlib.contextmanager
+def reader(path, mode='rb'):
+ with avro.datafile.DataFileReader(open(path, mode), avro.io.DatumReader()) as dfr:
+ yield dfr
class TestDataFile(unittest.TestCase):
- def test_round_trip(self):
- print('')
- print('TEST ROUND TRIP')
- print('===============')
- print('')
- correct = 0
- for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
- for codec in CODECS_TO_VALIDATE:
- print('')
- print('SCHEMA NUMBER %d' % (i + 1))
- print('================')
- print('')
- print('Schema: %s' % example_schema)
- print('Datum: %s' % datum)
- print('Codec: %s' % codec)
+ files = None
- # write data in binary to file 10 times
- writer = open(FILENAME, 'wb')
- datum_writer = io.DatumWriter()
- schema_object = schema.parse(example_schema)
- dfw = datafile.DataFileWriter(writer, datum_writer, schema_object, codec=codec)
- for i in range(10):
- dfw.append(datum)
- dfw.close()
+ def setUp(self):
+ """Initialize tempfiles collection."""
+ self.files = []
- # read data in binary from file
- reader = open(FILENAME, 'rb')
- datum_reader = io.DatumReader()
- dfr = datafile.DataFileReader(reader, datum_reader)
- round_trip_data = []
- for datum in dfr:
- round_trip_data.append(datum)
+ def tempfile(self):
+ """Generate a tempfile and register it for cleanup."""
+ with tempfile.NamedTemporaryFile(delete=False, suffix='.avro') as f:
+ pass
+ self.files.append(f.name)
+ return f.name
- print('Round Trip Data: %s' % round_trip_data)
- print('Round Trip Data Length: %d' % len(round_trip_data))
- is_correct = [datum] * 10 == round_trip_data
- if is_correct:
- correct += 1
- print('Correct Round Trip: %s' % is_correct)
- print('')
- os.remove(FILENAME)
- self.assertEqual(correct, len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))
+ def tearDown(self):
+ """Clean up temporary files."""
+ for f in self.files:
+ os.unlink(f)
def test_append(self):
- print('')
- print('TEST APPEND')
- print('===========')
- print('')
- correct = 0
- for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
- for codec in CODECS_TO_VALIDATE:
- print('')
- print('SCHEMA NUMBER %d' % (i + 1))
- print('================')
- print('')
- print('Schema: %s' % example_schema)
- print('Datum: %s' % datum)
- print('Codec: %s' % codec)
-
+ '''A datafile can be written to, appended to, and read from.'''
+ for codec in CODECS_TO_VALIDATE:
+ for schema, datum in TEST_PAIRS:
# write data in binary to file once
- writer = open(FILENAME, 'wb')
- datum_writer = io.DatumWriter()
- schema_object = schema.parse(example_schema)
- dfw = datafile.DataFileWriter(writer, datum_writer, schema_object, codec=codec)
- dfw.append(datum)
- dfw.close()
+ path = self.tempfile()
+ with writer(path, schema, codec) as dfw:
+ dfw.append(datum)
# open file, write, and close nine times
- for i in range(9):
- writer = open(FILENAME, 'ab+')
- dfw = datafile.DataFileWriter(writer, io.DatumWriter())
- dfw.append(datum)
- dfw.close()
+ for _ in range(9):
+ with writer(path, mode='ab+') as dfw:
+ dfw.append(datum)
# read data in binary from file
- reader = open(FILENAME, 'rb')
- datum_reader = io.DatumReader()
- dfr = datafile.DataFileReader(reader, datum_reader)
- appended_data = []
- for datum in dfr:
- appended_data.append(datum)
+ with reader(path) as dfr:
+ data = list(itertools.islice(dfr, 10))
+ self.assertRaises(StopIteration, next, dfr)
+ self.assertEqual(len(data), 10)
+ self.assertEqual(data, [datum] * 10)
- print('Appended Data: %s' % appended_data)
- print('Appended Data Length: %d' % len(appended_data))
- is_correct = [datum] * 10 == appended_data
- if is_correct:
- correct += 1
- print('Correct Appended: %s' % is_correct)
- print('')
- os.remove(FILENAME)
- self.assertEqual(correct, len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))
+ def test_round_trip(self):
+ '''A datafile can be written to and read from.'''
+ for codec in CODECS_TO_VALIDATE:
+ for schema, datum in TEST_PAIRS:
+ # write data in binary to file 10 times
+ path = self.tempfile()
+ with writer(path, schema, codec) as dfw:
+ for _ in range(10):
+ dfw.append(datum)
+
+ # read data in binary from file
+ with reader(path) as dfr:
+ data = list(itertools.islice(dfr, 10))
+ self.assertRaises(StopIteration, next, dfr)
+ self.assertEqual(len(data), 10)
+ self.assertEqual(data, [datum] * 10)
def test_context_manager(self):
- """Test the writer with a 'with' statement."""
- writer = open(FILENAME, 'wb')
- datum_writer = io.DatumWriter()
- sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
- schema_object = schema.parse(sample_schema)
- with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
- dfw.append(sample_datum)
- self.assertTrue(writer.closed)
+ '''A datafile closes its buffer object when it exits a with block.'''
+ path = self.tempfile()
+ for schema, _ in TEST_PAIRS:
+ with writer(path, schema) as dfw:
+ self.assertFalse(dfw.writer.closed)
+ self.assertTrue(dfw.writer.closed)
- # Test the reader with a 'with' statement.
- datums = []
- reader = open(FILENAME, 'rb')
- datum_reader = io.DatumReader()
- with datafile.DataFileReader(reader, datum_reader) as dfr:
- for datum in dfr:
- datums.append(datum)
- self.assertTrue(reader.closed)
+ with reader(path) as dfr:
+ self.assertFalse(dfr.reader.closed)
+ self.assertTrue(dfr.reader.closed)
def test_metadata(self):
- # Test the writer with a 'with' statement.
- writer = open(FILENAME, 'wb')
- datum_writer = io.DatumWriter()
- sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
- schema_object = schema.parse(sample_schema)
- with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
- dfw.set_meta('test.string', b'foo')
- dfw.set_meta('test.number', b'1')
- dfw.append(sample_datum)
- self.assertTrue(writer.closed)
-
- # Test the reader with a 'with' statement.
- datums = []
- reader = open(FILENAME, 'rb')
- datum_reader = io.DatumReader()
- with datafile.DataFileReader(reader, datum_reader) as dfr:
- self.assertEqual(b'foo', dfr.get_meta('test.string'))
- self.assertEqual(b'1', dfr.get_meta('test.number'))
- for datum in dfr:
- datums.append(datum)
- self.assertTrue(reader.closed)
+ '''Metadata can be written to a datafile, and read from it later.'''
+ path = self.tempfile()
+ for schema, _ in TEST_PAIRS:
+ with writer(path, schema) as dfw:
+ dfw.set_meta('test.string', b'foo')
+ dfw.set_meta('test.number', b'1')
+ with reader(path) as dfr:
+ self.assertEqual(b'foo', dfr.get_meta('test.string'))
+ self.assertEqual(b'1', dfr.get_meta('test.number'))
def test_empty_datafile(self):
"""A reader should not fail to read a file consisting of a single empty block."""
- sample_schema = schema.parse(SCHEMAS_TO_VALIDATE[1][0])
- with datafile.DataFileWriter(open(FILENAME, 'wb'), io.DatumWriter(),
- sample_schema) as dfw:
- dfw.flush()
- # Write an empty block
- dfw.encoder.write_long(0)
- dfw.encoder.write_long(0)
- dfw.writer.write(dfw.sync_marker)
+ path = self.tempfile()
+ for schema, _ in TEST_PAIRS:
+ with writer(path, schema) as dfw:
+ dfw.flush()
+ # Write an empty block
+ dfw.encoder.write_long(0)
+ dfw.encoder.write_long(0)
+ dfw.writer.write(dfw.sync_marker)
- with datafile.DataFileReader(open(FILENAME, 'rb'), io.DatumReader()) as dfr:
- self.assertEqual([], list(dfr))
-
-
-if __name__ == '__main__':
- unittest.main()
+ with reader(path) as dfr:
+ self.assertRaises(StopIteration, next, dfr)