| # frozen_string_literal: true |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
require 'openssl'
require 'zlib'
| |
| module Avro |
| module DataFile |
| VERSION = 1 |
| MAGIC = "Obj" + [VERSION].pack('c') |
| MAGIC.force_encoding('BINARY') if MAGIC.respond_to?(:force_encoding) |
| MAGIC_SIZE = MAGIC.respond_to?(:bytesize) ? MAGIC.bytesize : MAGIC.size |
| SYNC_SIZE = 16 |
| SYNC_INTERVAL = 4000 * SYNC_SIZE |
| META_SCHEMA = Schema.parse('{"type": "map", "values": "bytes"}') |
| VALID_ENCODINGS = ['binary'].freeze # not used yet |
| |
| class DataFileError < AvroError; end |
| |
| def self.open(file_path, mode='r', schema=nil, codec=nil) |
| schema = Avro::Schema.parse(schema) if schema |
| case mode |
| when 'w' |
| unless schema |
| raise DataFileError, "Writing an Avro file requires a schema." |
| end |
| io = open_writer(File.open(file_path, 'wb'), schema, codec) |
| when 'r' |
| io = open_reader(File.open(file_path, 'rb'), schema) |
| else |
| raise DataFileError, "Only modes 'r' and 'w' allowed. You gave #{mode.inspect}." |
| end |
| |
| yield io if block_given? |
| io |
| ensure |
| io.close if block_given? && io |
| end |
| |
| def self.codecs |
| @codecs |
| end |
| |
| def self.register_codec(codec) |
| @codecs ||= {} |
| codec = codec.new if !codec.respond_to?(:codec_name) && codec.is_a?(Class) |
| @codecs[codec.codec_name.to_s] = codec |
| end |
| |
| def self.get_codec(codec) |
| codec ||= 'null' |
| if codec.respond_to?(:compress) && codec.respond_to?(:decompress) |
| codec # it's a codec instance |
| elsif codec.is_a?(Class) |
| codec.new # it's a codec class |
| elsif @codecs.include?(codec.to_s) |
| @codecs[codec.to_s] # it's a string or symbol (codec name) |
| else |
| raise DataFileError, "Unknown codec: #{codec.inspect}" |
| end |
| end |
| |
| class << self |
| private |
| def open_writer(file, schema, codec=nil) |
| writer = Avro::IO::DatumWriter.new(schema) |
| Avro::DataFile::Writer.new(file, writer, schema, codec) |
| end |
| |
| def open_reader(file, schema) |
| reader = Avro::IO::DatumReader.new(nil, schema) |
| Avro::DataFile::Reader.new(file, reader) |
| end |
| end |
| |
| class Writer |
| def self.generate_sync_marker |
| OpenSSL::Random.random_bytes(16) |
| end |
| |
| attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta, :codec |
| attr_accessor :block_count |
| |
| def initialize(writer, datum_writer, writers_schema=nil, codec=nil, meta={}) |
| # If writers_schema is not present, presume we're appending |
| @writer = writer |
| @encoder = IO::BinaryEncoder.new(@writer) |
| @datum_writer = datum_writer |
| @meta = meta |
| @buffer_writer = StringIO.new(+'', 'w') |
| @buffer_writer.set_encoding('BINARY') if @buffer_writer.respond_to?(:set_encoding) |
| @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer) |
| @block_count = 0 |
| |
| if writers_schema |
| @sync_marker = Writer.generate_sync_marker |
| @codec = DataFile.get_codec(codec) |
| @meta['avro.codec'] = @codec.codec_name.to_s |
| @meta['avro.schema'] = writers_schema.to_s |
| datum_writer.writers_schema = writers_schema |
| write_header |
| else |
| # open writer for reading to collect metadata |
| dfr = Reader.new(writer, Avro::IO::DatumReader.new) |
| |
| # FIXME(jmhodges): collect arbitrary metadata |
| # collect metadata |
| @sync_marker = dfr.sync_marker |
| @meta['avro.codec'] = dfr.meta['avro.codec'] |
| @codec = DataFile.get_codec(meta['avro.codec']) |
| |
| # get schema used to write existing file |
| schema_from_file = dfr.meta['avro.schema'] |
| @meta['avro.schema'] = schema_from_file |
| datum_writer.writers_schema = Schema.parse(schema_from_file) |
| |
| # seek to the end of the file and prepare for writing |
| writer.seek(0,2) |
| end |
| end |
| |
| # Append a datum to the file |
| def <<(datum) |
| datum_writer.write(datum, buffer_encoder) |
| self.block_count += 1 |
| |
| # if the data to write is larger than the sync interval, write |
| # the block |
| if buffer_writer.tell >= SYNC_INTERVAL |
| write_block |
| end |
| end |
| |
| # Return the current position as a value that may be passed to |
| # DataFileReader.seek(long). Forces the end of the current block, |
| # emitting a synchronization marker. |
| def sync |
| write_block |
| writer.tell |
| end |
| |
| # Flush the current state of the file, including metadata |
| def flush |
| write_block |
| writer.flush |
| end |
| |
| def close |
| flush |
| writer.close |
| end |
| |
| private |
| |
| def write_header |
| # write magic |
| writer.write(MAGIC) |
| |
| # write metadata |
| datum_writer.write_data(META_SCHEMA, meta, encoder) |
| |
| # write sync marker |
| writer.write(sync_marker) |
| end |
| |
| # TODO(jmhodges): make a schema for blocks and use datum_writer |
| # TODO(jmhodges): do we really need the number of items in the block? |
| def write_block |
| if block_count > 0 |
| # write number of items in block and block size in bytes |
| encoder.write_long(block_count) |
| to_write = codec.compress(buffer_writer.string) |
| encoder.write_long(to_write.respond_to?(:bytesize) ? to_write.bytesize : to_write.size) |
| |
| # write block contents |
| writer.write(to_write) |
| |
| # write sync marker |
| writer.write(sync_marker) |
| |
| # reset buffer |
| buffer_writer.truncate(0) |
| buffer_writer.rewind |
| self.block_count = 0 |
| end |
| end |
| end |
| |
| # Read files written by DataFileWriter |
| class Reader |
| include ::Enumerable |
| |
| # The reader and binary decoder for the raw file stream |
| attr_reader :reader, :decoder |
| |
| # The binary decoder for the contents of a block (after codec decompression) |
| attr_reader :block_decoder |
| |
| attr_reader :datum_reader, :sync_marker, :meta, :file_length, :codec |
| attr_accessor :block_count # records remaining in current block |
| |
| def initialize(reader, datum_reader) |
| @reader = reader |
| @decoder = IO::BinaryDecoder.new(reader) |
| @datum_reader = datum_reader |
| |
| # read the header: magic, meta, sync |
| read_header |
| |
| @codec = DataFile.get_codec(meta['avro.codec']) |
| |
| # get ready to read |
| @block_count = 0 |
| datum_reader.writers_schema = Schema.parse meta['avro.schema'] |
| end |
| |
| # Iterates through each datum in this file |
| # TODO(jmhodges): handle block of length zero |
| def each |
| loop do |
| if block_count == 0 |
| case |
| when eof?; break |
| when skip_sync |
| break if eof? |
| read_block_header |
| else |
| read_block_header |
| end |
| end |
| |
| datum = datum_reader.read(block_decoder) |
| self.block_count -= 1 |
| yield(datum) |
| end |
| end |
| |
| def eof?; reader.eof?; end |
| |
| def close |
| reader.close |
| end |
| |
| private |
| def read_header |
| # seek to the beginning of the file to get magic block |
| reader.seek(0, 0) |
| |
| # check magic number |
| magic_in_file = reader.read(MAGIC_SIZE) |
| if magic_in_file.size < MAGIC_SIZE |
| msg = 'Not an Avro data file: shorter than the Avro magic block' |
| raise DataFileError, msg |
| elsif magic_in_file != MAGIC |
| msg = "Not an Avro data file: #{magic_in_file.inspect} doesn't match #{MAGIC.inspect}" |
| raise DataFileError, msg |
| end |
| |
| # read metadata |
| @meta = datum_reader.read_data(META_SCHEMA, |
| META_SCHEMA, |
| decoder) |
| # read sync marker |
| @sync_marker = reader.read(SYNC_SIZE) |
| end |
| |
| def read_block_header |
| self.block_count = decoder.read_long |
| block_bytes = decoder.read_long |
| data = codec.decompress(reader.read(block_bytes)) |
| @block_decoder = IO::BinaryDecoder.new(StringIO.new(data)) |
| end |
| |
| # read the length of the sync marker; if it matches the sync |
| # marker, return true. Otherwise, seek back to where we started |
| # and return false |
| def skip_sync |
| proposed_sync_marker = reader.read(SYNC_SIZE) |
| if proposed_sync_marker != sync_marker |
| reader.seek(-SYNC_SIZE, 1) |
| false |
| else |
| true |
| end |
| end |
| end |
| |
| |
| class NullCodec |
| def codec_name; 'null'; end |
| def decompress(data); data; end |
| def compress(data); data; end |
| end |
| |
| class DeflateCodec |
| attr_reader :level |
| |
| def initialize(level=Zlib::DEFAULT_COMPRESSION) |
| @level = level |
| end |
| |
| def codec_name; 'deflate'; end |
| |
| def decompress(compressed) |
| # Passing a negative number to Inflate puts it into "raw" RFC1951 mode |
| # (without the RFC1950 header & checksum). See the docs for |
| # inflateInit2 in https://www.zlib.net/manual.html |
| zstream = Zlib::Inflate.new(-Zlib::MAX_WBITS) |
| data = zstream.inflate(compressed) |
| data << zstream.finish |
| ensure |
| zstream.close |
| end |
| |
| def compress(data) |
| zstream = Zlib::Deflate.new(level, -Zlib::MAX_WBITS) |
| compressed = zstream.deflate(data) |
| compressed << zstream.finish |
| ensure |
| zstream.close |
| end |
| end |
| |
| class SnappyCodec |
| def codec_name; 'snappy'; end |
| |
| def decompress(data) |
| load_snappy! |
| crc32 = data.slice(-4..-1).unpack('N').first |
| uncompressed = Snappy.inflate(data.slice(0..-5)) |
| |
| if crc32 == Zlib.crc32(uncompressed) |
| uncompressed |
| else |
| # older versions of avro-ruby didn't write the checksum, so if it |
| # doesn't match this must assume that it wasn't there and return |
| # the entire payload uncompressed. |
| Snappy.inflate(data) |
| end |
| rescue Snappy::Error |
| # older versions of avro-ruby didn't write the checksum, so removing |
| # the last 4 bytes may cause Snappy to fail. recover by assuming the |
| # payload is from an older file and uncompress the entire buffer. |
| Snappy.inflate(data) |
| end |
| |
| def compress(data) |
| load_snappy! |
| crc32 = Zlib.crc32(data) |
| compressed = Snappy.deflate(data) |
| [compressed, crc32].pack('a*N') |
| end |
| |
| private |
| |
| def load_snappy! |
| require 'snappy' unless defined?(Snappy) |
| rescue LoadError |
| raise LoadError, "Snappy compression is not available, please install the `snappy` gem." |
| end |
| end |
| |
| class ZstandardCodec |
| def codec_name; 'zstandard'; end |
| |
| def decompress(data) |
| load_zstandard! |
| Zstd.decompress(data) |
| end |
| |
| def compress(data) |
| load_zstandard! |
| Zstd.compress(data) |
| end |
| |
| private |
| |
| def load_zstandard! |
| require 'zstd-ruby' unless defined?(Zstd) |
| rescue LoadError |
| raise LoadError, "Zstandard compression is not available, please install the `zstd-ruby` gem." |
| end |
| end |
| |
| DataFile.register_codec NullCodec |
| DataFile.register_codec DeflateCodec |
| DataFile.register_codec SnappyCodec |
| DataFile.register_codec ZstandardCodec |
| |
| # TODO this constant won't be updated if you register another codec. |
| # Deprecated in favor of Avro::DataFile::codecs |
| VALID_CODECS = DataFile.codecs.keys |
| end |
| end |