blob: e46505575f9f9d68ec55881fb1df9a610454f5a2 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'openssl'
module Avro
module DataFile
# Container-format version; packed into the last byte of MAGIC.
VERSION = 1
# Magic block expected at the start of every data file: the ASCII bytes "Obj"
# followed by VERSION packed as a single char.
MAGIC = "Obj" + [VERSION].pack('c')
# Compare the magic as raw bytes; guarded for Rubies without String#force_encoding.
MAGIC.force_encoding('BINARY') if MAGIC.respond_to?(:force_encoding)
# Number of bytes the Reader pulls from the file to check the magic block.
MAGIC_SIZE = MAGIC.respond_to?(:bytesize) ? MAGIC.bytesize : MAGIC.size
# Length in bytes of the sync marker that follows the header and each block.
SYNC_SIZE = 16
# Once the Writer's in-memory buffer reaches this many bytes, the block is
# written out to the file.
SYNC_INTERVAL = 4000 * SYNC_SIZE
# Schema used to encode and decode the header metadata map.
META_SCHEMA = Schema.parse('{"type": "map", "values": "bytes"}')
VALID_ENCODINGS = ['binary'] # not used yet
# Raised for data-file level problems (bad mode, missing schema, bad magic,
# unknown codec).
class DataFileError < AvroError; end
# Opens the Avro data file at +file_path+.
#
# file_path - path of the file to open.
# mode      - 'r' (default) to read, 'w' to write.
# schema    - schema source passed to Avro::Schema.parse; required for 'w',
#             optional readers schema for 'r'.
# codec     - codec name, class or instance handed to the Writer ('w' only).
#
# When a block is given the reader/writer is yielded and closed afterwards,
# even if the block raises. Without a block the caller owns the returned
# object and must close it.
#
# Returns the Reader or Writer.
# Raises DataFileError for an unknown mode or when writing without a schema.
def self.open(file_path, mode='r', schema=nil, codec=nil)
  schema = Avro::Schema.parse(schema) if schema
  case mode
  when 'w'
    unless schema
      raise DataFileError, "Writing an Avro file requires a schema."
    end
    file = File.open(file_path, 'wb')
    io = open_writer(file, schema, codec)
  when 'r'
    file = File.open(file_path, 'rb')
    io = open_reader(file, schema)
  else
    raise DataFileError, "Only modes 'r' and 'w' allowed. You gave #{mode.inspect}."
  end
  yield io if block_given?
  io
ensure
  if io
    io.close if block_given?
  elsif file && !file.closed?
    # Building the reader/writer failed (e.g. the file is not an Avro data
    # file), so nobody else holds the handle: close it here instead of
    # leaking the descriptor.
    file.close
  end
end
class << self
  # Registry of known codecs, keyed by codec name (String). nil until the
  # first codec has been registered.
  attr_reader :codecs
end
# Adds a codec to the registry under its codec_name (converted to a String).
#
# codec - a codec instance, or a codec class; a class that does not itself
#         respond to codec_name is instantiated with no arguments.
#
# Returns the registered codec instance.
def self.register_codec(codec)
  @codecs ||= {}
  needs_instance = !codec.respond_to?(:codec_name) && codec.is_a?(Class)
  instance = needs_instance ? codec.new : codec
  @codecs[instance.codec_name.to_s] = instance
end
# Resolves +codec+ to a codec instance.
#
# codec - nil (treated as 'null'), an object responding to both compress and
#         decompress (returned unchanged), a Class (instantiated with no
#         arguments), or a registered codec name given as String or Symbol.
#
# Returns the codec instance.
# Raises DataFileError when the name is not registered.
def self.get_codec(codec)
  codec ||= 'null'

  # Already a codec instance.
  return codec if codec.respond_to?(:compress) && codec.respond_to?(:decompress)
  # A codec class.
  return codec.new if codec.is_a?(Class)

  # Otherwise a codec name.
  name = codec.to_s
  return @codecs[name] if @codecs.key?(name)

  raise DataFileError, "Unknown codec: #{codec.inspect}"
end
class << self
  # Builds a DataFile::Writer on +file+ whose datum writer uses +schema+.
  def open_writer(file, schema, codec=nil)
    datum_writer = Avro::IO::DatumWriter.new(schema)
    Avro::DataFile::Writer.new(file, datum_writer, schema, codec)
  end

  # Builds a DataFile::Reader on +file+; +schema+ is passed as the second
  # argument of the datum reader (the first is left nil).
  def open_reader(file, schema)
    datum_reader = Avro::IO::DatumReader.new(nil, schema)
    Avro::DataFile::Reader.new(file, datum_reader)
  end

  # Internal helpers for DataFile.open only.
  private :open_writer, :open_reader
end
# Writes Avro data files: a header (magic, metadata map, sync marker)
# followed by blocks of encoded datums, each terminated by the sync marker.
class Writer
  # Returns SYNC_SIZE random bytes to use as a file's sync marker.
  def self.generate_sync_marker
    OpenSSL::Random.random_bytes(SYNC_SIZE)
  end

  attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta, :codec
  attr_accessor :block_count

  # writer         - IO-like object the file is written to.
  # datum_writer   - object used to encode each datum and the header metadata.
  # writers_schema - schema for a new file. If it is not present, presume
  #                  we're appending: sync marker, codec and schema are read
  #                  from the existing file through +writer+.
  # codec          - codec name, class or instance (new files only).
  # meta           - metadata hash; 'avro.codec' and 'avro.schema' are set on it.
  def initialize(writer, datum_writer, writers_schema=nil, codec=nil, meta={})
    @writer = writer
    @encoder = IO::BinaryEncoder.new(@writer)
    @datum_writer = datum_writer
    @meta = meta
    # Datums are buffered in memory and written to the file one block at a time.
    @buffer_writer = StringIO.new('', 'w')
    @buffer_writer.set_encoding('BINARY') if @buffer_writer.respond_to?(:set_encoding)
    @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
    @block_count = 0

    if writers_schema
      @sync_marker = Writer.generate_sync_marker
      @codec = DataFile.get_codec(codec)
      @meta['avro.codec'] = @codec.codec_name.to_s
      @meta['avro.schema'] = writers_schema.to_s
      datum_writer.writers_schema = writers_schema
      write_header
    else
      # open writer for reading to collect metadata
      dfr = Reader.new(writer, Avro::IO::DatumReader.new)

      # FIXME(jmhodges): collect arbitrary metadata
      # collect metadata
      @sync_marker = dfr.sync_marker
      @meta['avro.codec'] = dfr.meta['avro.codec']
      @codec = DataFile.get_codec(meta['avro.codec'])

      # get schema used to write existing file
      schema_from_file = dfr.meta['avro.schema']
      @meta['avro.schema'] = schema_from_file
      datum_writer.writers_schema = Schema.parse(schema_from_file)

      # seek to the end of the file (whence 2) and prepare for writing
      writer.seek(0,2)
    end
  end

  # Append a datum to the file
  def <<(datum)
    datum_writer.write(datum, buffer_encoder)
    self.block_count += 1

    # if the data to write is larger than the sync interval, write
    # the block
    if buffer_writer.tell >= SYNC_INTERVAL
      write_block
    end
  end

  # Return the current position as a value that may be passed to
  # DataFileReader.seek(long). Forces the end of the current block,
  # emitting a synchronization marker.
  def sync
    write_block
    writer.tell
  end

  # Flush the current state of the file, including metadata
  def flush
    write_block
    writer.flush
  end

  # Flushes any buffered block and closes the underlying writer. The writer
  # is closed even when flushing raises, so a failed final write does not
  # leak the file handle; the flush error still propagates to the caller.
  # Returns nil.
  def close
    begin
      flush
    ensure
      writer.close
    end
    nil
  end

  private

  # Writes the file header: magic, metadata map, sync marker.
  def write_header
    # write magic
    writer.write(MAGIC)

    # write metadata
    datum_writer.write_data(META_SCHEMA, meta, encoder)

    # write sync marker
    writer.write(sync_marker)
  end

  # Writes the buffered datums as one block and resets the buffer.
  # Does nothing when no datum has been appended since the last block.
  # TODO(jmhodges): make a schema for blocks and use datum_writer
  # TODO(jmhodges): do we really need the number of items in the block?
  def write_block
    if block_count > 0
      # write number of items in block and block size in bytes
      encoder.write_long(block_count)
      to_write = codec.compress(buffer_writer.string)
      encoder.write_long(to_write.respond_to?(:bytesize) ? to_write.bytesize : to_write.size)

      # write block contents
      writer.write(to_write)

      # write sync marker
      writer.write(sync_marker)

      # reset buffer
      buffer_writer.truncate(0)
      buffer_writer.rewind
      self.block_count = 0
    end
  end
end
# Read files written by DataFileWriter.
#
# The header (magic, metadata, sync marker) is read on construction; #each
# then yields every datum, block by block. Enumerable is mixed in on top of
# #each.
class Reader
  include ::Enumerable

  # The reader and binary decoder for the raw file stream
  attr_reader :reader, :decoder

  # The binary decoder for the contents of a block (after codec decompression)
  attr_reader :block_decoder

  attr_reader :datum_reader, :sync_marker, :meta, :file_length, :codec
  attr_accessor :block_count # records remaining in current block

  # reader       - seekable IO-like object positioned anywhere in the file.
  # datum_reader - object used to decode the metadata and each datum; its
  #                writers_schema is set from the file's 'avro.schema' entry.
  #
  # Raises DataFileError when the stream does not start with the Avro magic.
  def initialize(reader, datum_reader)
    @reader = reader
    @decoder = IO::BinaryDecoder.new(reader)
    @datum_reader = datum_reader

    # read the header: magic, meta, sync
    read_header

    @codec = DataFile.get_codec(meta['avro.codec'])

    # get ready to read
    @block_count = 0
    datum_reader.writers_schema = Schema.parse meta['avro.schema']
  end

  # Iterates through each datum in this file, yielding them in file order.
  # Returns an Enumerator when called without a block.
  def each
    return enum_for(:each) unless block_given?

    loop do
      # Load the next block that actually holds records. Looping here (rather
      # than loading a single block) skips blocks whose record count is zero
      # instead of trying to decode a datum from their empty payload.
      while block_count == 0
        return if eof?
        # Consume the sync marker that terminates the previous block; the
        # first block directly follows the header, so there is none to skip.
        skip_sync
        return if eof?
        read_block_header
      end

      datum = datum_reader.read(block_decoder)
      self.block_count -= 1
      yield(datum)
    end
  end

  def eof?; reader.eof?; end

  def close
    reader.close
  end

  private

  # Reads and validates the magic block, then loads the metadata map and the
  # file's sync marker.
  def read_header
    # seek to the beginning of the file to get magic block
    reader.seek(0, 0)

    # check magic number; read returns nil on an empty stream, which must be
    # reported as "too short" rather than crash on nil.size
    magic_in_file = reader.read(MAGIC_SIZE)
    if magic_in_file.nil? || magic_in_file.size < MAGIC_SIZE
      msg = 'Not an Avro data file: shorter than the Avro magic block'
      raise DataFileError, msg
    elsif magic_in_file != MAGIC
      msg = "Not an Avro data file: #{magic_in_file.inspect} doesn't match #{MAGIC.inspect}"
      raise DataFileError, msg
    end

    # read metadata
    @meta = datum_reader.read_data(META_SCHEMA,
                                   META_SCHEMA,
                                   decoder)

    # read sync marker
    @sync_marker = reader.read(SYNC_SIZE)
  end

  # Reads the record count and byte length of the next block, then
  # decompresses its payload into a fresh block decoder.
  def read_block_header
    self.block_count = decoder.read_long
    block_bytes = decoder.read_long
    data = codec.decompress(reader.read(block_bytes))
    @block_decoder = IO::BinaryDecoder.new(StringIO.new(data))
  end

  # read the length of the sync marker; if it matches the sync
  # marker, return true. Otherwise, seek back to where we started
  # and return false
  def skip_sync
    proposed_sync_marker = reader.read(SYNC_SIZE)
    return true if proposed_sync_marker == sync_marker

    # Rewind by what was actually consumed: near the end of the stream the
    # read can return fewer than SYNC_SIZE bytes (or nil).
    reader.seek(-proposed_sync_marker.size, 1) if proposed_sync_marker
    false
  end
end
# Pass-through codec: block data is stored exactly as given.
class NullCodec
  # Name stored in the file metadata for this codec.
  def codec_name
    'null'
  end

  # Returns +data+ unchanged.
  def decompress(data)
    data
  end

  # Returns +data+ unchanged.
  def compress(data)
    data
  end
end
# Codec that stores blocks as raw deflate streams (no zlib header/checksum).
class DeflateCodec
  # Compression level handed to Zlib::Deflate.
  attr_reader :level

  # level - zlib compression level (defaults to Zlib::DEFAULT_COMPRESSION).
  def initialize(level=Zlib::DEFAULT_COMPRESSION)
    @level = level
  end

  def codec_name; 'deflate'; end

  # Inflates a raw deflate stream and returns the original bytes.
  # Zlib errors for malformed input propagate to the caller.
  def decompress(compressed)
    # Passing a negative number to Inflate puts it into "raw" RFC1951 mode
    # (without the RFC1950 header & checksum). See the docs for
    # inflateInit2 in http://www.zlib.net/manual.html
    #
    # The stream is created before the begin/ensure so that a failure to
    # create it is reported as-is, rather than being replaced by a
    # NoMethodError from calling close on nil.
    zstream = Zlib::Inflate.new(-Zlib::MAX_WBITS)
    begin
      data = zstream.inflate(compressed)
      data << zstream.finish
    ensure
      zstream.close
    end
  end

  # Deflates +data+ at the configured level into a raw deflate stream.
  # An invalid level surfaces as the Zlib error raised by Zlib::Deflate.new.
  def compress(data)
    zstream = Zlib::Deflate.new(level, -Zlib::MAX_WBITS)
    begin
      compressed = zstream.deflate(data)
      compressed << zstream.finish
    ensure
      zstream.close
    end
  end
end
# Codec that stores blocks as Snappy-compressed data followed by a 4-byte
# big-endian CRC32 of the uncompressed data. The `snappy` gem is loaded
# lazily on first use.
class SnappyCodec
  def codec_name; 'snappy'; end

  # Decompresses a block, verifying the trailing CRC32 when present.
  # Raises LoadError when the `snappy` gem is not installed.
  def decompress(data)
    # Load the gem before entering the begin/rescue below: the rescue clause
    # names Snappy::Error, and resolving that constant while handling the
    # LoadError would replace the helpful message with a NameError.
    load_snappy!

    begin
      trailer = data.slice(-4..-1)
      # Fewer than four bytes cannot carry a checksum, so the payload must
      # come from an older writer: inflate it whole.
      return Snappy.inflate(data) unless trailer

      crc32 = trailer.unpack('N').first
      uncompressed = Snappy.inflate(data.slice(0..-5))

      if crc32 == Zlib.crc32(uncompressed)
        uncompressed
      else
        # older versions of avro-ruby didn't write the checksum, so if it
        # doesn't match this must assume that it wasn't there and return
        # the entire payload uncompressed.
        Snappy.inflate(data)
      end
    rescue Snappy::Error
      # older versions of avro-ruby didn't write the checksum, so removing
      # the last 4 bytes may cause Snappy to fail. recover by assuming the
      # payload is from an older file and uncompress the entire buffer.
      Snappy.inflate(data)
    end
  end

  # Compresses +data+ and appends the CRC32 of the uncompressed bytes.
  # Raises LoadError when the `snappy` gem is not installed.
  def compress(data)
    load_snappy!
    crc32 = Zlib.crc32(data)
    compressed = Snappy.deflate(data)
    [compressed, crc32].pack('a*N')
  end

  private

  # Requires the `snappy` gem on demand, turning a missing gem into a
  # LoadError with an installation hint.
  def load_snappy!
    require 'snappy' unless defined?(Snappy)
  rescue LoadError
    raise LoadError, "Snappy compression is not available, please install the `snappy` gem."
  end
end
# Register the codecs defined in this file so that they can be looked up by
# name through DataFile.get_codec.
DataFile.register_codec NullCodec
DataFile.register_codec DeflateCodec
DataFile.register_codec SnappyCodec
# TODO: this constant is a snapshot of the registry taken at load time; it
# won't be updated if you register another codec afterwards.
# Deprecated in favor of Avro::DataFile.codecs
VALID_CODECS = DataFile.codecs.keys
end
end