blob: f071895fd35fe7c407618aae4c84ce515333c358 [file] [log] [blame]
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Classes handling reading and writing from and to AvroIO objects
* @package Avro
*/
/**
* Raised when something unkind happens with respect to AvroDataIO.
* @package Avro
*/
class AvroDataIOException extends AvroException {}
/**
* @package Avro
*/
class AvroDataIO
{
/**
* @var int used in file header
*/
const VERSION = 1;
/**
* @var int count of bytes in synchronization marker
*/
const SYNC_SIZE = 16;
/**
* @var int count of items per block, arbitrarily set to 4000 * SYNC_SIZE
* @todo make this value configurable
*/
const SYNC_INTERVAL = 64000;
/**
* @var string map key for datafile metadata codec value
*/
const METADATA_CODEC_ATTR = 'avro.codec';
/**
* @var string map key for datafile metadata schema value
*/
const METADATA_SCHEMA_ATTR = 'avro.schema';
/**
* @var string JSON for datafile metadata schema
*/
const METADATA_SCHEMA_JSON = '{"type":"map","values":"bytes"}';
/**
* @var string codec value for NULL codec
*/
const NULL_CODEC = 'null';
/**
* @var string codec value for deflate codec
*/
const DEFLATE_CODEC = 'deflate';
const SNAPPY_CODEC = 'snappy';
const ZSTANDARD_CODEC = 'zstandard';
const BZIP2_CODEC = 'bzip2';
/**
* @var array array of valid codec names
*/
private static $valid_codecs = [self::NULL_CODEC, self::DEFLATE_CODEC, self::SNAPPY_CODEC, self::ZSTANDARD_CODEC];
/**
* @var AvroSchema cached version of metadata schema object
*/
private static $metadata_schema;
/**
* @returns the initial "magic" segment of an Avro container file header.
*/
public static function magic() { return ('Obj' . pack('c', self::VERSION)); }
/**
* @returns int count of bytes in the initial "magic" segment of the
* Avro container file header
*/
public static function magic_size() { return strlen(self::magic()); }
/**
* @returns AvroSchema object of Avro container file metadata.
*/
public static function metadata_schema()
{
if (is_null(self::$metadata_schema))
self::$metadata_schema = AvroSchema::parse(self::METADATA_SCHEMA_JSON);
return self::$metadata_schema;
}
/**
* @param string $file_path file_path of file to open
* @param string $mode one of AvroFile::READ_MODE or AvroFile::WRITE_MODE
* @param string $schema_json JSON of writer's schema
* @param string $codec compression codec
* @returns AvroDataIOWriter instance of AvroDataIOWriter
*
* @throws AvroDataIOException if $writers_schema is not provided
* or if an invalid $mode is given.
*/
public static function open_file($file_path, $mode=AvroFile::READ_MODE,
$schema_json=null, $codec=self::NULL_CODEC)
{
$schema = !is_null($schema_json)
? AvroSchema::parse($schema_json) : null;
$io = false;
switch ($mode)
{
case AvroFile::WRITE_MODE:
if (is_null($schema))
throw new AvroDataIOException('Writing an Avro file requires a schema.');
$file = new AvroFile($file_path, AvroFile::WRITE_MODE);
$io = self::open_writer($file, $schema, $codec);
break;
case AvroFile::READ_MODE:
$file = new AvroFile($file_path, AvroFile::READ_MODE);
$io = self::open_reader($file, $schema);
break;
default:
throw new AvroDataIOException(
sprintf("Only modes '%s' and '%s' allowed. You gave '%s'.",
AvroFile::READ_MODE, AvroFile::WRITE_MODE, $mode));
}
return $io;
}
/**
* @returns array array of valid codecs
*/
public static function valid_codecs()
{
return self::$valid_codecs;
}
/**
* @param string $codec
* @returns boolean true if $codec is a valid codec value and false otherwise
*/
public static function is_valid_codec($codec)
{
return in_array($codec, self::valid_codecs());
}
/**
* @param AvroIO $io
* @param AvroSchema $schema
* @param string $codec
* @returns AvroDataIOWriter
*/
protected static function open_writer($io, $schema, $codec=self::NULL_CODEC)
{
$writer = new AvroIODatumWriter($schema);
return new AvroDataIOWriter($io, $writer, $schema, $codec);
}
/**
* @param AvroIO $io
* @param AvroSchema $schema
* @returns AvroDataIOReader
*/
protected static function open_reader($io, $schema)
{
$reader = new AvroIODatumReader(null, $schema);
return new AvroDataIOReader($io, $reader);
}
}
/**
*
* Reads Avro data from an AvroIO source using an AvroSchema.
* @package Avro
*/
class AvroDataIOReader
{
/**
* @var AvroIO
*/
private $io;
/**
* @var AvroIOBinaryDecoder
*/
private $decoder;
/**
* @var AvroIODatumReader
*/
private $datum_reader;
/**
* @var string
*/
public $sync_marker;
/**
* @var array object container metadata
*/
public $metadata;
/**
* @var int count of items in block
*/
private $block_count;
/**
* @var compression codec
*/
private $codec;
/**
* @param AvroIO $io source from which to read
* @param AvroIODatumReader $datum_reader reader that understands
* the data schema
* @throws AvroDataIOException if $io is not an instance of AvroIO
* or the codec specified in the header
* is not supported
* @uses read_header()
*/
public function __construct($io, $datum_reader)
{
if (!($io instanceof AvroIO))
throw new AvroDataIOException('io must be instance of AvroIO');
$this->io = $io;
$this->decoder = new AvroIOBinaryDecoder($this->io);
$this->datum_reader = $datum_reader;
$this->read_header();
$codec = AvroUtil::array_value($this->metadata,
AvroDataIO::METADATA_CODEC_ATTR);
if ($codec && !AvroDataIO::is_valid_codec($codec))
throw new AvroDataIOException(sprintf('Unknown codec: %s', $codec));
$this->codec = $codec;
$this->block_count = 0;
// FIXME: Seems unsanitary to set writers_schema here.
// Can't constructor take it as an argument?
$this->datum_reader->set_writers_schema(
AvroSchema::parse($this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR]));
}
/**
* Reads header of object container
* @throws AvroDataIOException if the file is not an Avro data file.
*/
private function read_header()
{
$this->seek(0, AvroIO::SEEK_SET);
$magic = $this->read(AvroDataIO::magic_size());
if (strlen($magic) < AvroDataIO::magic_size())
throw new AvroDataIOException(
'Not an Avro data file: shorter than the Avro magic block');
if (AvroDataIO::magic() != $magic)
throw new AvroDataIOException(
sprintf('Not an Avro data file: %s does not match %s',
$magic, AvroDataIO::magic()));
$this->metadata = $this->datum_reader->read_data(AvroDataIO::metadata_schema(),
AvroDataIO::metadata_schema(),
$this->decoder);
$this->sync_marker = $this->read(AvroDataIO::SYNC_SIZE);
}
/**
* @internal Would be nice to implement data() as an iterator, I think
* @returns array of data from object container.
*/
public function data()
{
$data = array();
while (true)
{
if (0 == $this->block_count)
{
if ($this->is_eof())
break;
if ($this->skip_sync())
if ($this->is_eof())
break;
$length = $this->read_block_header();
$decoder = $this->decoder;
if ($this->codec == AvroDataIO::DEFLATE_CODEC) {
$compressed = $decoder->read($length);
$datum = gzinflate($compressed);
$decoder = new AvroIOBinaryDecoder(new AvroStringIO($datum));
} elseif ($this->codec === AvroDataIO::ZSTANDARD_CODEC) {
if (!extension_loaded('zstd')) {
throw new AvroException('Please install ext-zstd to use zstandard compression.');
}
$compressed = $decoder->read($length);
$datum = zstd_uncompress($compressed);
$decoder = new AvroIOBinaryDecoder(new AvroStringIO($datum));
} elseif ($this->codec === AvroDataIO::SNAPPY_CODEC) {
if (!extension_loaded('snappy')) {
throw new AvroException('Please install ext-snappy to use snappy compression.');
}
$compressed = $decoder->read($length);
$crc32 = unpack('N', substr($compressed, -4))[1];
$datum = snappy_uncompress(substr($compressed, 0, -4));
if ($crc32 === crc32($datum)) {
$decoder = new AvroIOBinaryDecoder(new AvroStringIO($datum));
} else {
$decoder = new AvroIOBinaryDecoder(new AvroStringIO(snappy_uncompress($datum)));
}
} elseif ($this->codec === AvroDataIO::BZIP2_CODEC) {
if (!extension_loaded('bz2')) {
throw new AvroException('Please install ext-bz2 to use bzip2 compression.');
}
$compressed = $decoder->read($length);
$datum = bzdecompress($compressed);
$decoder = new AvroIOBinaryDecoder(new AvroStringIO($datum));
}
}
$data[] = $this->datum_reader->read($decoder);
$this->block_count -= 1;
}
return $data;
}
/**
* Closes this writer (and its AvroIO object.)
* @uses AvroIO::close()
*/
public function close() { return $this->io->close(); }
/**
* @uses AvroIO::seek()
*/
private function seek($offset, $whence)
{
return $this->io->seek($offset, $whence);
}
/**
* @uses AvroIO::read()
*/
private function read($len) { return $this->io->read($len); }
/**
* @uses AvroIO::is_eof()
*/
private function is_eof() { return $this->io->is_eof(); }
private function skip_sync()
{
$proposed_sync_marker = $this->read(AvroDataIO::SYNC_SIZE);
if ($proposed_sync_marker != $this->sync_marker)
{
$this->seek(-AvroDataIO::SYNC_SIZE, AvroIO::SEEK_CUR);
return false;
}
return true;
}
/**
* Reads the block header (which includes the count of items in the block
* and the length in bytes of the block)
* @returns int length in bytes of the block.
*/
private function read_block_header()
{
$this->block_count = $this->decoder->read_long();
return $this->decoder->read_long();
}
}
/**
* Writes Avro data to an AvroIO source using an AvroSchema
* @package Avro
*/
class AvroDataIOWriter
{
/**
* @returns string a new, unique sync marker.
*/
private static function generate_sync_marker()
{
// From https://php.net/manual/en/function.mt-rand.php comments
return pack('S8',
mt_rand(0, 0xffff), mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff) | 0x4000,
mt_rand(0, 0xffff) | 0x8000,
mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff));
}
/**
* @var AvroIO object container where data is written
*/
private $io;
/**
* @var AvroIOBinaryEncoder encoder for object container
*/
private $encoder;
/**
* @var AvroDatumWriter
*/
private $datum_writer;
/**
* @var AvroStringIO buffer for writing
*/
private $buffer;
/**
* @var AvroIOBinaryEncoder encoder for buffer
*/
private $buffer_encoder; // AvroIOBinaryEncoder
/**
* @var int count of items written to block
*/
private $block_count;
/**
* @var array map of object container metadata
*/
private $metadata;
/**
* @var compression codec
*/
private $codec;
/**
* @param AvroIO $io
* @param AvroIODatumWriter $datum_writer
* @param AvroSchema $writers_schema
* @param string $codec
*/
public function __construct($io, $datum_writer, $writers_schema=null, $codec=AvroDataIO::NULL_CODEC)
{
if (!($io instanceof AvroIO))
throw new AvroDataIOException('io must be instance of AvroIO');
$this->io = $io;
$this->encoder = new AvroIOBinaryEncoder($this->io);
$this->datum_writer = $datum_writer;
$this->buffer = new AvroStringIO();
$this->buffer_encoder = new AvroIOBinaryEncoder($this->buffer);
$this->block_count = 0;
$this->metadata = array();
if ($writers_schema)
{
if (!AvroDataIO::is_valid_codec($codec))
throw new AvroDataIOException(
sprintf('codec %s is not supported', $codec));
$this->sync_marker = self::generate_sync_marker();
$this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = $this->codec = $codec;
$this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = (string) $writers_schema;
$this->write_header();
}
else
{
$dfr = new AvroDataIOReader($this->io, new AvroIODatumReader());
$this->sync_marker = $dfr->sync_marker;
$this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = $this->codec
= $dfr->metadata[AvroDataIO::METADATA_CODEC_ATTR];
$schema_from_file = $dfr->metadata[AvroDataIO::METADATA_SCHEMA_ATTR];
$this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = $schema_from_file;
$this->datum_writer->writers_schema = AvroSchema::parse($schema_from_file);
$this->seek(0, SEEK_END);
}
}
/**
* @param mixed $datum
*/
public function append($datum)
{
$this->datum_writer->write($datum, $this->buffer_encoder);
$this->block_count++;
if ($this->buffer->length() >= AvroDataIO::SYNC_INTERVAL)
$this->write_block();
}
/**
* Flushes buffer to AvroIO object container and closes it.
* @return mixed value of $io->close()
* @see AvroIO::close()
*/
public function close()
{
$this->flush();
return $this->io->close();
}
/**
* Flushes biffer to AvroIO object container.
* @returns mixed value of $io->flush()
* @see AvroIO::flush()
*/
private function flush()
{
$this->write_block();
return $this->io->flush();
}
/**
* Writes a block of data to the AvroIO object container.
*/
private function write_block()
{
if ($this->block_count > 0)
{
$this->encoder->write_long($this->block_count);
$to_write = (string) $this->buffer;
if ($this->codec === AvroDataIO::DEFLATE_CODEC) {
$to_write = gzdeflate($to_write);
} elseif ($this->codec === AvroDataIO::ZSTANDARD_CODEC) {
if (!extension_loaded('zstd')) {
throw new AvroException('Please install ext-zstd to use zstandard compression.');
}
$to_write = zstd_compress($to_write);
} elseif ($this->codec === AvroDataIO::SNAPPY_CODEC) {
if (!extension_loaded('snappy')) {
throw new AvroException('Please install ext-snappy to use snappy compression.');
}
$crc32 = crc32($to_write);
$compressed = snappy_compress($to_write);
$to_write = pack('a*N', $compressed, $crc32);
} elseif ($this->codec === AvroDataIO::BZIP2_CODEC) {
if (!extension_loaded('bz2')) {
throw new AvroException('Please install ext-bz2 to use bzip2 compression.');
}
$to_write = bzcompress($to_write);
}
$this->encoder->write_long(strlen($to_write));
$this->write($to_write);
$this->write($this->sync_marker);
$this->buffer->truncate();
$this->block_count = 0;
}
}
/**
* Writes the header of the AvroIO object container
*/
private function write_header()
{
$this->write(AvroDataIO::magic());
$this->datum_writer->write_data(AvroDataIO::metadata_schema(),
$this->metadata, $this->encoder);
$this->write($this->sync_marker);
}
/**
* @param string $bytes
* @uses AvroIO::write()
*/
private function write($bytes) { return $this->io->write($bytes); }
/**
* @param int $offset
* @param int $whence
* @uses AvroIO::seek()
*/
private function seek($offset, $whence)
{
return $this->io->seek($offset, $whence);
}
}