| #!/usr/bin/env python |
| |
| ## |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Read/Write Avro File Object Containers.""" |
| |
| from __future__ import absolute_import, division, print_function |
| |
| import io |
| import os |
| import random |
| import zlib |
| |
| import avro.io |
| import avro.schema |
| |
| try: |
| import snappy |
| has_snappy = True |
| except ImportError: |
| has_snappy = False |
| try: |
| import zstandard as zstd |
| has_zstandard = True |
| except ImportError: |
| has_zstandard = False |
| # |
| # Constants |
| # |
| |
| VERSION = 1 |
| MAGIC = 'Obj' + chr(VERSION) |
| MAGIC_SIZE = len(MAGIC) |
| SYNC_SIZE = 16 |
| SYNC_INTERVAL = 4000 * SYNC_SIZE # TODO(hammer): make configurable |
| META_SCHEMA = avro.schema.parse("""\ |
| {"type": "record", "name": "org.apache.avro.file.Header", |
| "fields" : [ |
| {"name": "magic", "type": {"type": "fixed", "name": "magic", "size": %d}}, |
| {"name": "meta", "type": {"type": "map", "values": "bytes"}}, |
| {"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]} |
| """ % (MAGIC_SIZE, SYNC_SIZE)) |
| VALID_CODECS = ['null', 'deflate'] |
| if has_snappy: |
| VALID_CODECS.append('snappy') |
| if has_zstandard: |
| VALID_CODECS.append('zstandard') |
| VALID_ENCODINGS = ['binary'] # not used yet |
| |
| CODEC_KEY = "avro.codec" |
| SCHEMA_KEY = "avro.schema" |
| |
| # |
| # Exceptions |
| # |
| |
| class DataFileException(avro.schema.AvroException): |
| """ |
| Raised when there's a problem reading or writing file object containers. |
| """ |
| def __init__(self, fail_msg): |
| avro.schema.AvroException.__init__(self, fail_msg) |
| |
| # |
| # Write Path |
| # |
| |
| class DataFileWriter(object): |
| @staticmethod |
| def generate_sync_marker(): |
| return generate_sixteen_random_bytes() |
| |
| # TODO(hammer): make 'encoder' a metadata property |
| def __init__(self, writer, datum_writer, writers_schema=None, codec='null'): |
| """ |
| If the schema is not present, presume we're appending. |
| |
| @param writer: File-like object to write into. |
| """ |
| self._writer = writer |
| self._encoder = avro.io.BinaryEncoder(writer) |
| self._datum_writer = datum_writer |
| self._buffer_writer = io.BytesIO() |
| self._buffer_encoder = avro.io.BinaryEncoder(self._buffer_writer) |
| self._block_count = 0 |
| self._meta = {} |
| self._header_written = False |
| |
| if writers_schema is not None: |
| if codec not in VALID_CODECS: |
| raise DataFileException("Unknown codec: %r" % codec) |
| self._sync_marker = DataFileWriter.generate_sync_marker() |
| self.set_meta('avro.codec', codec) |
| self.set_meta('avro.schema', str(writers_schema)) |
| self.datum_writer.writers_schema = writers_schema |
| else: |
| # open writer for reading to collect metadata |
| dfr = DataFileReader(writer, avro.io.DatumReader()) |
| |
| # TODO(hammer): collect arbitrary metadata |
| # collect metadata |
| self._sync_marker = dfr.sync_marker |
| self.set_meta('avro.codec', dfr.get_meta('avro.codec')) |
| |
| # get schema used to write existing file |
| schema_from_file = dfr.get_meta('avro.schema') |
| self.set_meta('avro.schema', schema_from_file) |
| self.datum_writer.writers_schema = avro.schema.parse(schema_from_file) |
| |
| # seek to the end of the file and prepare for writing |
| writer.seek(0, 2) |
| self._header_written = True |
| |
| # read-only properties |
| writer = property(lambda self: self._writer) |
| encoder = property(lambda self: self._encoder) |
| datum_writer = property(lambda self: self._datum_writer) |
| buffer_writer = property(lambda self: self._buffer_writer) |
| buffer_encoder = property(lambda self: self._buffer_encoder) |
| sync_marker = property(lambda self: self._sync_marker) |
| meta = property(lambda self: self._meta) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, type, value, traceback): |
| # Perform a close if there's no exception |
| if type is None: |
| self.close() |
| |
| # read/write properties |
| def set_block_count(self, new_val): |
| self._block_count = new_val |
| block_count = property(lambda self: self._block_count, set_block_count) |
| |
| # utility functions to read/write metadata entries |
| def get_meta(self, key): |
| return self._meta.get(key) |
| def set_meta(self, key, val): |
| self._meta[key] = val |
| |
| def _write_header(self): |
| header = {'magic': MAGIC, |
| 'meta': self.meta, |
| 'sync': self.sync_marker} |
| self.datum_writer.write_data(META_SCHEMA, header, self.encoder) |
| self._header_written = True |
| |
| # TODO(hammer): make a schema for blocks and use datum_writer |
| def _write_block(self): |
| if not self._header_written: |
| self._write_header() |
| |
| if self.block_count > 0: |
| # write number of items in block |
| self.encoder.write_long(self.block_count) |
| |
| # write block contents |
| uncompressed_data = self.buffer_writer.getvalue() |
| if self.get_meta(CODEC_KEY) == 'null': |
| compressed_data = uncompressed_data |
| compressed_data_length = len(compressed_data) |
| elif self.get_meta(CODEC_KEY) == 'deflate': |
| # The first two characters and last character are zlib |
| # wrappers around deflate data. |
| compressed_data = zlib.compress(uncompressed_data)[2:-1] |
| compressed_data_length = len(compressed_data) |
| elif self.get_meta(CODEC_KEY) == 'snappy': |
| compressed_data = snappy.compress(uncompressed_data) |
| compressed_data_length = len(compressed_data) + 4 # crc32 |
| elif self.get_meta(CODEC_KEY) == 'zstandard': |
| compressed_data = zstd.ZstdCompressor().compress(uncompressed_data) |
| compressed_data_length = len(compressed_data) |
| else: |
| fail_msg = '"%s" codec is not supported.' % self.get_meta(CODEC_KEY) |
| raise DataFileException(fail_msg) |
| |
| # Write length of block |
| self.encoder.write_long(compressed_data_length) |
| |
| # Write block |
| self.writer.write(compressed_data) |
| |
| # Write CRC32 checksum for Snappy |
| if self.get_meta(CODEC_KEY) == 'snappy': |
| self.encoder.write_crc32(uncompressed_data) |
| |
| # write sync marker |
| self.writer.write(self.sync_marker) |
| |
| # reset buffer |
| self.buffer_writer.truncate(0) |
| self.block_count = 0 |
| |
| def append(self, datum): |
| """Append a datum to the file.""" |
| self.datum_writer.write(datum, self.buffer_encoder) |
| self.block_count += 1 |
| |
| # if the data to write is larger than the sync interval, write the block |
| if self.buffer_writer.tell() >= SYNC_INTERVAL: |
| self._write_block() |
| |
| def sync(self): |
| """ |
| Return the current position as a value that may be passed to |
| DataFileReader.seek(long). Forces the end of the current block, |
| emitting a synchronization marker. |
| """ |
| self._write_block() |
| return self.writer.tell() |
| |
| def flush(self): |
| """Flush the current state of the file, including metadata.""" |
| self._write_block() |
| self.writer.flush() |
| |
| def close(self): |
| """Close the file.""" |
| self.flush() |
| self.writer.close() |
| |
| class DataFileReader(object): |
| """Read files written by DataFileWriter.""" |
| # TODO(hammer): allow user to specify expected schema? |
| # TODO(hammer): allow user to specify the encoder |
| def __init__(self, reader, datum_reader): |
| self._reader = reader |
| self._raw_decoder = avro.io.BinaryDecoder(reader) |
| self._datum_decoder = None # Maybe reset at every block. |
| self._datum_reader = datum_reader |
| |
| # read the header: magic, meta, sync |
| self._read_header() |
| |
| # ensure codec is valid |
| self.codec = self.get_meta('avro.codec') |
| if self.codec is None: |
| self.codec = "null" |
| if self.codec not in VALID_CODECS: |
| raise DataFileException('Unknown codec: %s.' % self.codec) |
| |
| # get file length |
| self._file_length = self.determine_file_length() |
| |
| # get ready to read |
| self._block_count = 0 |
| self.datum_reader.writers_schema = avro.schema.parse(self.get_meta(SCHEMA_KEY)) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, type, value, traceback): |
| # Perform a close if there's no exception |
| if type is None: |
| self.close() |
| |
| def __iter__(self): |
| return self |
| |
| # read-only properties |
| reader = property(lambda self: self._reader) |
| raw_decoder = property(lambda self: self._raw_decoder) |
| datum_decoder = property(lambda self: self._datum_decoder) |
| datum_reader = property(lambda self: self._datum_reader) |
| sync_marker = property(lambda self: self._sync_marker) |
| meta = property(lambda self: self._meta) |
| file_length = property(lambda self: self._file_length) |
| |
| # read/write properties |
| def set_block_count(self, new_val): |
| self._block_count = new_val |
| block_count = property(lambda self: self._block_count, set_block_count) |
| |
| # utility functions to read/write metadata entries |
| def get_meta(self, key): |
| return self._meta.get(key) |
| def set_meta(self, key, val): |
| self._meta[key] = val |
| |
| def determine_file_length(self): |
| """ |
| Get file length and leave file cursor where we found it. |
| """ |
| remember_pos = self.reader.tell() |
| self.reader.seek(0, 2) |
| file_length = self.reader.tell() |
| self.reader.seek(remember_pos) |
| return file_length |
| |
| def is_EOF(self): |
| return self.reader.tell() == self.file_length |
| |
| def _read_header(self): |
| # seek to the beginning of the file to get magic block |
| self.reader.seek(0, 0) |
| |
| # read header into a dict |
| header = self.datum_reader.read_data( |
| META_SCHEMA, META_SCHEMA, self.raw_decoder) |
| |
| # check magic number |
| if header.get('magic') != MAGIC: |
| fail_msg = "Not an Avro data file: %s doesn't match %s."\ |
| % (header.get('magic'), MAGIC) |
| raise avro.schema.AvroException(fail_msg) |
| |
| # set metadata |
| self._meta = header['meta'] |
| |
| # set sync marker |
| self._sync_marker = header['sync'] |
| |
| def _read_block_header(self): |
| self.block_count = self.raw_decoder.read_long() |
| if self.codec == "null": |
| # Skip a long; we don't need to use the length. |
| self.raw_decoder.skip_long() |
| self._datum_decoder = self._raw_decoder |
| elif self.codec == 'deflate': |
| # Compressed data is stored as (length, data), which |
| # corresponds to how the "bytes" type is encoded. |
| data = self.raw_decoder.read_bytes() |
| # -15 is the log of the window size; negative indicates |
| # "raw" (no zlib headers) decompression. See zlib.h. |
| uncompressed = zlib.decompress(data, -15) |
| self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed)) |
| elif self.codec == 'snappy': |
| # Compressed data includes a 4-byte CRC32 checksum |
| length = self.raw_decoder.read_long() |
| data = self.raw_decoder.read(length - 4) |
| uncompressed = snappy.decompress(data) |
| self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed)) |
| self.raw_decoder.check_crc32(uncompressed); |
| elif self.codec == 'zstandard': |
| length = self.raw_decoder.read_long() |
| data = self.raw_decoder.read(length) |
| uncompressed = bytearray() |
| dctx = zstd.ZstdDecompressor() |
| with dctx.stream_reader(io.BytesIO(data)) as reader: |
| while True: |
| chunk = reader.read(16384) |
| if not chunk: |
| break |
| uncompressed.extend(chunk) |
| self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed)) |
| else: |
| raise DataFileException("Unknown codec: %r" % self.codec) |
| |
| def _skip_sync(self): |
| """ |
| Read the length of the sync marker; if it matches the sync marker, |
| return True. Otherwise, seek back to where we started and return False. |
| """ |
| proposed_sync_marker = self.reader.read(SYNC_SIZE) |
| if proposed_sync_marker != self.sync_marker: |
| self.reader.seek(-SYNC_SIZE, 1) |
| return False |
| return True |
| |
| def next(self): |
| """Return the next datum in the file.""" |
| while self.block_count == 0: |
| if self.is_EOF() or (self._skip_sync() and self.is_EOF()): |
| raise StopIteration |
| self._read_block_header() |
| |
| datum = self.datum_reader.read(self.datum_decoder) |
| self.block_count -= 1 |
| return datum |
| |
| def close(self): |
| """Close this reader.""" |
| self.reader.close() |
| |
| def generate_sixteen_random_bytes(): |
| try: |
| return os.urandom(16) |
| except NotImplementedError: |
| return [chr(random.randrange(256)) for i in range(16)] |