| <?php |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /** |
| * Classes handling reading and writing from and to AvroIO objects |
| * @package Avro |
| */ |
| |
/**
 * Exception type thrown by the Avro object container file classes in this
 * file (AvroDataIO, AvroDataIOReader and AvroDataIOWriter).
 * @package Avro
 */
class AvroDataIOException extends AvroException
{
}
| |
/**
 * Constants and static factory helpers shared by the Avro object container
 * file reader (AvroDataIOReader) and writer (AvroDataIOWriter).
 * @package Avro
 */
class AvroDataIO
{
    /**
     * @var int used in file header
     */
    const VERSION = 1;

    /**
     * @var int count of bytes in synchronization marker
     */
    const SYNC_SIZE = 16;

    /**
     * @var int threshold, in bytes of buffered (uncompressed) data, at which
     *          the writer flushes its buffer as a block; arbitrarily set to
     *          4000 * SYNC_SIZE
     * @todo make this value configurable
     */
    const SYNC_INTERVAL = 64000;

    /**
     * @var string map key for datafile metadata codec value
     */
    const METADATA_CODEC_ATTR = 'avro.codec';

    /**
     * @var string map key for datafile metadata schema value
     */
    const METADATA_SCHEMA_ATTR = 'avro.schema';

    /**
     * @var string JSON for datafile metadata schema
     */
    const METADATA_SCHEMA_JSON = '{"type":"map","values":"bytes"}';

    /**
     * @var string codec value for NULL codec
     */
    const NULL_CODEC = 'null';

    /**
     * @var string codec value for deflate codec
     */
    const DEFLATE_CODEC = 'deflate';

    /**
     * @var string codec value for snappy codec
     */
    const SNAPPY_CODEC = 'snappy';

    /**
     * @var string codec value for zstandard codec
     */
    const ZSTANDARD_CODEC = 'zstandard';

    /**
     * @var string codec value for bzip2 codec
     */
    const BZIP2_CODEC = 'bzip2';

    /**
     * Every codec constant declared above must be listed here; a codec that
     * is missing is rejected by is_valid_codec().
     *
     * @var array array of valid codec names
     */
    private static $valid_codecs = [
        self::NULL_CODEC,
        self::DEFLATE_CODEC,
        self::SNAPPY_CODEC,
        self::ZSTANDARD_CODEC,
        self::BZIP2_CODEC,
    ];

    /**
     * @var AvroSchema cached version of metadata schema object
     */
    private static $metadata_schema;

    /**
     * @returns string the initial "magic" segment of an Avro container file
     *                 header: the bytes 'Obj' followed by the VERSION byte.
     */
    public static function magic()
    {
        return 'Obj' . pack('c', self::VERSION);
    }

    /**
     * @returns int count of bytes in the initial "magic" segment of the
     *              Avro container file header
     */
    public static function magic_size()
    {
        return strlen(self::magic());
    }

    /**
     * Parses METADATA_SCHEMA_JSON on first use and caches the result.
     *
     * @returns AvroSchema object of Avro container file metadata.
     */
    public static function metadata_schema()
    {
        if (is_null(self::$metadata_schema)) {
            self::$metadata_schema = AvroSchema::parse(self::METADATA_SCHEMA_JSON);
        }
        return self::$metadata_schema;
    }

    /**
     * Opens the file at $file_path for reading or writing Avro data.
     *
     * @param string $file_path file_path of file to open
     * @param string $mode one of AvroFile::READ_MODE or AvroFile::WRITE_MODE
     * @param string $schema_json JSON of the schema; required in write mode,
     *                            optional in read mode (passed to the datum
     *                            reader when given)
     * @param string $codec compression codec (only used in write mode)
     * @returns AvroDataIOWriter|AvroDataIOReader a writer in write mode,
     *                                            a reader in read mode
     *
     * @throws AvroDataIOException if no schema is provided in write mode
     *                             or if an invalid $mode is given.
     */
    public static function open_file($file_path, $mode=AvroFile::READ_MODE,
                                     $schema_json=null, $codec=self::NULL_CODEC)
    {
        $schema = is_null($schema_json) ? null : AvroSchema::parse($schema_json);

        switch ($mode) {
            case AvroFile::WRITE_MODE:
                // Check the schema before the file is opened for writing.
                if (is_null($schema)) {
                    throw new AvroDataIOException('Writing an Avro file requires a schema.');
                }
                $file = new AvroFile($file_path, AvroFile::WRITE_MODE);
                return self::open_writer($file, $schema, $codec);

            case AvroFile::READ_MODE:
                $file = new AvroFile($file_path, AvroFile::READ_MODE);
                return self::open_reader($file, $schema);

            default:
                throw new AvroDataIOException(
                    sprintf("Only modes '%s' and '%s' allowed. You gave '%s'.",
                            AvroFile::READ_MODE, AvroFile::WRITE_MODE, $mode));
        }
    }

    /**
     * @returns array array of valid codecs
     */
    public static function valid_codecs()
    {
        return self::$valid_codecs;
    }

    /**
     * @param string $codec
     * @returns boolean true if $codec is a valid codec value and false otherwise
     */
    public static function is_valid_codec($codec)
    {
        // Strict comparison: codec names are strings, and a loose in_array()
        // would let non-string values (e.g. integer 0 on PHP < 8) match.
        return in_array($codec, self::valid_codecs(), true);
    }

    /**
     * @param AvroIO $io
     * @param AvroSchema $schema
     * @param string $codec
     * @returns AvroDataIOWriter
     */
    protected static function open_writer($io, $schema, $codec=self::NULL_CODEC)
    {
        $writer = new AvroIODatumWriter($schema);
        return new AvroDataIOWriter($io, $writer, $schema, $codec);
    }

    /**
     * @param AvroIO $io
     * @param AvroSchema $schema passed as the second constructor argument of
     *                           AvroIODatumReader; may be null
     * @returns AvroDataIOReader
     */
    protected static function open_reader($io, $schema)
    {
        $reader = new AvroIODatumReader(null, $schema);
        return new AvroDataIOReader($io, $reader);
    }
}
| |
/**
 * Reads Avro data from an AvroIO source holding an Avro object container.
 * @package Avro
 */
class AvroDataIOReader
{
    /**
     * @var AvroIO source being read
     */
    private $io;

    /**
     * @var AvroIOBinaryDecoder decoder bound directly to $io
     */
    private $decoder;

    /**
     * @var AvroIODatumReader
     */
    private $datum_reader;

    /**
     * @var string sync marker read from the container header
     */
    public $sync_marker;

    /**
     * @var array object container metadata
     */
    public $metadata;

    /**
     * @var int count of items remaining in the current block
     */
    private $block_count;

    /**
     * @var string compression codec named in the container metadata
     *             (whatever AvroUtil::array_value() yields when absent)
     */
    private $codec;

    /**
     * Reads the container header from $io, validates the codec it names and
     * hands the writer's schema found in the metadata to $datum_reader.
     *
     * @param AvroIO $io source from which to read
     * @param AvroIODatumReader $datum_reader reader that understands
     *                                        the data schema
     * @throws AvroDataIOException if $io is not an instance of AvroIO
     *                             or the codec specified in the header
     *                             is not supported
     * @uses read_header()
     */
    public function __construct($io, $datum_reader)
    {
        if (!($io instanceof AvroIO)) {
            throw new AvroDataIOException('io must be instance of AvroIO');
        }

        $this->io = $io;
        $this->decoder = new AvroIOBinaryDecoder($this->io);
        $this->datum_reader = $datum_reader;
        $this->read_header();

        $header_codec = AvroUtil::array_value($this->metadata,
                                              AvroDataIO::METADATA_CODEC_ATTR);
        if ($header_codec && !AvroDataIO::is_valid_codec($header_codec)) {
            throw new AvroDataIOException(sprintf('Unknown codec: %s', $header_codec));
        }
        $this->codec = $header_codec;
        $this->block_count = 0;

        // FIXME: Seems unsanitary to set writers_schema here.
        // Can't constructor take it as an argument?
        $writers_schema = AvroSchema::parse(
            $this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR]);
        $this->datum_reader->set_writers_schema($writers_schema);
    }

    /**
     * Reads the container header from the start of $io: magic bytes,
     * metadata map, then the sync marker.
     *
     * @throws AvroDataIOException if the file is not an Avro data file.
     */
    private function read_header()
    {
        $this->seek(0, AvroIO::SEEK_SET);

        $expected_size = AvroDataIO::magic_size();
        $magic = $this->read($expected_size);

        if (strlen($magic) < $expected_size) {
            throw new AvroDataIOException(
                'Not an Avro data file: shorter than the Avro magic block');
        }

        if (AvroDataIO::magic() != $magic) {
            throw new AvroDataIOException(
                sprintf('Not an Avro data file: %s does not match %s',
                        $magic, AvroDataIO::magic()));
        }

        // The metadata map is both written and read with the same schema.
        $this->metadata = $this->datum_reader->read_data(
            AvroDataIO::metadata_schema(),
            AvroDataIO::metadata_schema(),
            $this->decoder);
        $this->sync_marker = $this->read(AvroDataIO::SYNC_SIZE);
    }

    /**
     * Reads every remaining datum from the container, block by block.
     *
     * @internal Would be nice to implement data() as an iterator, I think
     * @returns array of data from object container.
     */
    public function data()
    {
        $records = [];
        for (;;) {
            if (0 == $this->block_count) {
                // Current block exhausted: stop at end of input, otherwise
                // step over a sync marker (if present) and start a new block.
                if ($this->is_eof()) {
                    break;
                }
                if ($this->skip_sync() && $this->is_eof()) {
                    break;
                }

                $length = $this->read_block_header();
                $decoder = $this->block_decoder($length);
            }
            $records[] = $this->datum_reader->read($decoder);
            $this->block_count -= 1;
        }
        return $records;
    }

    /**
     * Produces the decoder from which the items of the block just announced
     * by read_block_header() are to be read.
     *
     * For the compressed codecs the block's $length bytes are read from the
     * source, decompressed, and wrapped in a new in-memory decoder. For any
     * other codec value the source decoder itself is returned and nothing is
     * consumed here.
     *
     * @param int $length length in bytes of the block
     * @returns AvroIOBinaryDecoder
     * @throws AvroException if the PHP extension needed by the codec
     *                       is not loaded
     */
    private function block_decoder($length)
    {
        if ($this->codec == AvroDataIO::DEFLATE_CODEC) {
            $datum = gzinflate($this->decoder->read($length));
        } elseif ($this->codec === AvroDataIO::ZSTANDARD_CODEC) {
            if (!extension_loaded('zstd')) {
                throw new AvroException('Please install ext-zstd to use zstandard compression.');
            }
            $datum = zstd_uncompress($this->decoder->read($length));
        } elseif ($this->codec === AvroDataIO::SNAPPY_CODEC) {
            if (!extension_loaded('snappy')) {
                throw new AvroException('Please install ext-snappy to use snappy compression.');
            }
            // The last four bytes of a snappy block are a big-endian CRC32.
            $compressed = $this->decoder->read($length);
            $crc32 = unpack('N', substr($compressed, -4))[1];
            $datum = snappy_uncompress(substr($compressed, 0, -4));
            if ($crc32 !== crc32($datum)) {
                // NOTE(review): on a checksum mismatch the payload is run
                // through snappy_uncompress() a second time instead of being
                // rejected -- confirm this is intended.
                $datum = snappy_uncompress($datum);
            }
        } elseif ($this->codec === AvroDataIO::BZIP2_CODEC) {
            if (!extension_loaded('bz2')) {
                throw new AvroException('Please install ext-bz2 to use bzip2 compression.');
            }
            $datum = bzdecompress($this->decoder->read($length));
        } else {
            return $this->decoder;
        }

        return new AvroIOBinaryDecoder(new AvroStringIO($datum));
    }

    /**
     * Closes this reader's AvroIO object.
     * @returns mixed value of $io->close()
     * @uses AvroIO::close()
     */
    public function close()
    {
        return $this->io->close();
    }

    /**
     * @param int $offset
     * @param int $whence
     * @uses AvroIO::seek()
     */
    private function seek($offset, $whence)
    {
        return $this->io->seek($offset, $whence);
    }

    /**
     * @param int $len count of bytes to read
     * @uses AvroIO::read()
     */
    private function read($len)
    {
        return $this->io->read($len);
    }

    /**
     * @uses AvroIO::is_eof()
     */
    private function is_eof()
    {
        return $this->io->is_eof();
    }

    /**
     * Reads SYNC_SIZE bytes and compares them with the header's sync marker.
     * When they differ, the read position is moved back by SYNC_SIZE bytes.
     *
     * @returns boolean true if a sync marker was consumed, false otherwise
     */
    private function skip_sync()
    {
        $candidate = $this->read(AvroDataIO::SYNC_SIZE);
        if ($candidate == $this->sync_marker) {
            return true;
        }

        $this->seek(-AvroDataIO::SYNC_SIZE, AvroIO::SEEK_CUR);
        return false;
    }

    /**
     * Reads the block header (which includes the count of items in the block
     * and the length in bytes of the block). The item count is stored in
     * $block_count.
     *
     * @returns int length in bytes of the block.
     */
    private function read_block_header()
    {
        $this->block_count = $this->decoder->read_long();
        $byte_length = $this->decoder->read_long();
        return $byte_length;
    }
}
| |
/**
 * Writes Avro data to an AvroIO object container using an AvroSchema.
 * @package Avro
 */
class AvroDataIOWriter
{
    /**
     * @var AvroIO object container where data is written
     */
    private $io;

    /**
     * @var AvroIOBinaryEncoder encoder for object container
     */
    private $encoder;

    /**
     * @var AvroIODatumWriter
     */
    private $datum_writer;

    /**
     * @var AvroStringIO buffer for writing
     */
    private $buffer;

    /**
     * @var AvroIOBinaryEncoder encoder for buffer
     */
    private $buffer_encoder;

    /**
     * @var int count of items written to block
     */
    private $block_count;

    /**
     * @var array map of object container metadata
     */
    private $metadata;

    /**
     * @var string compression codec
     */
    private $codec;

    /**
     * Declared explicitly so that assigning it in the constructor does not
     * create a dynamic property (deprecated as of PHP 8.2). Kept public, the
     * visibility the formerly dynamic property had.
     *
     * @var string sync marker written after the header and after every block
     */
    public $sync_marker;

    /**
     * @returns string a new sync marker of 16 bytes (eight packed
     *                 16-bit values from mt_rand()).
     */
    private static function generate_sync_marker()
    {
        // From https://php.net/manual/en/function.mt-rand.php comments
        return pack('S8',
                    mt_rand(0, 0xffff), mt_rand(0, 0xffff),
                    mt_rand(0, 0xffff),
                    mt_rand(0, 0xffff) | 0x4000,
                    mt_rand(0, 0xffff) | 0x8000,
                    mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff));
    }

    /**
     * With a $writers_schema a new container is started: a header holding
     * the codec, the schema and a new sync marker is written to $io.
     * Without one, the existing header is read from $io (sync marker, codec
     * and schema are taken from it) and the position is moved to the end so
     * that new blocks are appended.
     *
     * @param AvroIO $io
     * @param AvroIODatumWriter $datum_writer
     * @param AvroSchema $writers_schema
     * @param string $codec only used when $writers_schema is given
     * @throws AvroDataIOException if $io is not an instance of AvroIO or
     *                             $codec is not a valid codec
     */
    public function __construct($io, $datum_writer, $writers_schema=null, $codec=AvroDataIO::NULL_CODEC)
    {
        if (!($io instanceof AvroIO)) {
            throw new AvroDataIOException('io must be instance of AvroIO');
        }

        $this->io = $io;
        $this->encoder = new AvroIOBinaryEncoder($this->io);
        $this->datum_writer = $datum_writer;
        $this->buffer = new AvroStringIO();
        $this->buffer_encoder = new AvroIOBinaryEncoder($this->buffer);
        $this->block_count = 0;
        $this->metadata = [];

        if ($writers_schema) {
            if (!AvroDataIO::is_valid_codec($codec)) {
                throw new AvroDataIOException(
                    sprintf('codec %s is not supported', $codec));
            }

            $this->sync_marker = self::generate_sync_marker();
            $this->codec = $codec;
            $this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = $codec;
            $this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = (string) $writers_schema;
            $this->write_header();
        } else {
            $dfr = new AvroDataIOReader($this->io, new AvroIODatumReader());
            $this->sync_marker = $dfr->sync_marker;

            // The codec entry may be missing from an existing header; fall
            // back to the null codec instead of reading an undefined index.
            $this->codec = isset($dfr->metadata[AvroDataIO::METADATA_CODEC_ATTR])
                ? $dfr->metadata[AvroDataIO::METADATA_CODEC_ATTR]
                : AvroDataIO::NULL_CODEC;
            $this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = $this->codec;

            $schema_from_file = $dfr->metadata[AvroDataIO::METADATA_SCHEMA_ATTR];
            $this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = $schema_from_file;
            $this->datum_writer->writers_schema = AvroSchema::parse($schema_from_file);
            $this->seek(0, SEEK_END);
        }
    }

    /**
     * Encodes $datum into the buffer and writes the buffer out as a block
     * once it holds at least AvroDataIO::SYNC_INTERVAL bytes.
     *
     * @param mixed $datum
     */
    public function append($datum)
    {
        $this->datum_writer->write($datum, $this->buffer_encoder);
        $this->block_count++;

        if ($this->buffer->length() >= AvroDataIO::SYNC_INTERVAL) {
            $this->write_block();
        }
    }

    /**
     * Flushes buffer to AvroIO object container and closes it.
     * @return mixed value of $io->close()
     * @see AvroIO::close()
     */
    public function close()
    {
        $this->flush();
        return $this->io->close();
    }

    /**
     * Flushes buffer to AvroIO object container.
     * @returns mixed value of $io->flush()
     * @see AvroIO::flush()
     */
    private function flush()
    {
        $this->write_block();
        return $this->io->flush();
    }

    /**
     * Writes the buffered items to the AvroIO object container as one block:
     * item count, payload length, payload, sync marker. Does nothing when no
     * items are buffered.
     *
     * @throws AvroException if the PHP extension needed by the codec
     *                       is not loaded
     */
    private function write_block()
    {
        if ($this->block_count <= 0) {
            return;
        }

        // Compress before anything is written, so that a missing extension
        // does not leave a partial block header in the container.
        $to_write = $this->compress_block((string) $this->buffer);

        $this->encoder->write_long($this->block_count);
        $this->encoder->write_long(strlen($to_write));
        $this->write($to_write);
        $this->write($this->sync_marker);
        $this->buffer->truncate();
        $this->block_count = 0;
    }

    /**
     * Applies this writer's codec to the serialized items of a block.
     *
     * @param string $bytes uncompressed block payload
     * @returns string payload as it is to be written; returned unchanged
     *                 for any codec not handled below
     * @throws AvroException if the PHP extension needed by the codec
     *                       is not loaded
     */
    private function compress_block($bytes)
    {
        if ($this->codec === AvroDataIO::DEFLATE_CODEC) {
            return gzdeflate($bytes);
        }

        if ($this->codec === AvroDataIO::ZSTANDARD_CODEC) {
            if (!extension_loaded('zstd')) {
                throw new AvroException('Please install ext-zstd to use zstandard compression.');
            }
            return zstd_compress($bytes);
        }

        if ($this->codec === AvroDataIO::SNAPPY_CODEC) {
            if (!extension_loaded('snappy')) {
                throw new AvroException('Please install ext-snappy to use snappy compression.');
            }
            // Compressed payload followed by the big-endian CRC32 of the
            // uncompressed data.
            $crc32 = crc32($bytes);
            $compressed = snappy_compress($bytes);
            return pack('a*N', $compressed, $crc32);
        }

        if ($this->codec === AvroDataIO::BZIP2_CODEC) {
            if (!extension_loaded('bz2')) {
                throw new AvroException('Please install ext-bz2 to use bzip2 compression.');
            }
            return bzcompress($bytes);
        }

        return $bytes;
    }

    /**
     * Writes the header of the AvroIO object container: magic bytes,
     * metadata map, sync marker.
     */
    private function write_header()
    {
        $this->write(AvroDataIO::magic());
        $this->datum_writer->write_data(AvroDataIO::metadata_schema(),
                                        $this->metadata, $this->encoder);
        $this->write($this->sync_marker);
    }

    /**
     * @param string $bytes
     * @uses AvroIO::write()
     */
    private function write($bytes)
    {
        return $this->io->write($bytes);
    }

    /**
     * @param int $offset
     * @param int $whence
     * @uses AvroIO::seek()
     */
    private function seek($offset, $whence)
    {
        return $this->io->seek($offset, $whence);
    }
}