blob: 0dc4dd0ee4587f0bdbe4a45da5b63bdc536feabd [file] [log] [blame]
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache\Avro\DataFile;
use Apache\Avro\AvroException;
use Apache\Avro\AvroIO;
use Apache\Avro\AvroUtil;
use Apache\Avro\Datum\AvroIOBinaryDecoder;
use Apache\Avro\Datum\AvroIODatumReader;
use Apache\Avro\IO\AvroStringIO;
use Apache\Avro\Schema\AvroSchema;
/**
*
* Reads Avro data from an AvroIO source using an AvroSchema.
* @package Avro
*/
class AvroDataIOReader
{
    /**
     * @var string sync marker read from the container header; the bytes found
     *             at each block boundary are compared against it
     */
    public $sync_marker;

    /**
     * @var array object container metadata
     */
    public $metadata;

    /**
     * @var AvroIO source the container is read from
     */
    private $io;

    /**
     * @var AvroIOBinaryDecoder decoder reading directly from $io
     */
    private $decoder;

    /**
     * @var AvroIODatumReader
     */
    private $datum_reader;

    /**
     * @var int count of items left to read in the current block
     */
    private $block_count;

    /**
     * @var string|null compression codec named in the header metadata,
     *                  null when the header does not name one
     */
    private $codec;

    /**
     * @param AvroIO $io source from which to read
     * @param AvroIODatumReader $datum_reader reader that understands
     *                                        the data schema
     * @throws AvroDataIOException if $io is not an instance of AvroIO
     *                             or the codec specified in the header
     *                             is not supported
     * @uses readHeader()
     */
    public function __construct($io, $datum_reader)
    {
        if (!($io instanceof AvroIO)) {
            throw new AvroDataIOException('io must be instance of AvroIO');
        }

        $this->io = $io;
        $this->decoder = new AvroIOBinaryDecoder($this->io);
        $this->datum_reader = $datum_reader;
        $this->readHeader();

        $codec = $this->metadata[AvroDataIO::METADATA_CODEC_ATTR] ?? null;
        if ($codec && !AvroDataIO::isValidCodec($codec)) {
            throw new AvroDataIOException(sprintf('Unknown codec: %s', $codec));
        }
        $this->codec = $codec;

        $this->block_count = 0;

        // FIXME: Seems unsanitary to set writers_schema here.
        // Can't constructor take it as an argument?
        $this->datum_reader->setWritersSchema(
            AvroSchema::parse($this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR])
        );
    }

    /**
     * Reads header of object container: rewinds the source, checks the magic
     * bytes, then loads the metadata map and the sync marker.
     *
     * @throws AvroDataIOException if the file is not an Avro data file.
     */
    private function readHeader()
    {
        $this->seek(0, AvroIO::SEEK_SET);

        $magic = $this->read(AvroDataIO::magicSize());

        if (strlen($magic) < AvroDataIO::magicSize()) {
            throw new AvroDataIOException(
                'Not an Avro data file: shorter than the Avro magic block'
            );
        }

        // Strict comparison: the magic is a byte string and must match exactly.
        if (AvroDataIO::magic() !== $magic) {
            throw new AvroDataIOException(
                sprintf(
                    'Not an Avro data file: %s does not match %s',
                    $magic,
                    AvroDataIO::magic()
                )
            );
        }

        // The header metadata is decoded with the same schema as both the
        // writer's and the reader's schema.
        $this->metadata = $this->datum_reader->readData(
            AvroDataIO::metadataSchema(),
            AvroDataIO::metadataSchema(),
            $this->decoder
        );
        $this->sync_marker = $this->read(AvroDataIO::SYNC_SIZE);
    }

    /**
     * Moves the read position of the underlying source.
     *
     * @param int $offset
     * @param int $whence one of the AvroIO::SEEK_* constants
     * @return mixed whatever AvroIO::seek() returns
     * @uses AvroIO::seek()
     */
    private function seek($offset, $whence)
    {
        return $this->io->seek($offset, $whence);
    }

    /**
     * Reads raw bytes from the underlying source.
     *
     * @param int $len number of bytes requested
     * @return string the bytes read
     * @uses AvroIO::read()
     */
    private function read($len)
    {
        return $this->io->read($len);
    }

    /**
     * Reads every remaining datum of the object container.
     *
     * @internal Would be nice to implement data() as an iterator, I think
     * @return array of data from object container.
     * @throws AvroException if the PHP extension required by the codec
     *                       is not loaded
     * @throws AvroDataIOException if a compressed block cannot be decompressed
     */
    public function data()
    {
        $data = [];
        // Decoder the datums of the current block are read from. It is
        // replaced each time a new block is opened.
        $block_decoder = $this->decoder;

        while (true) {
            // Loose comparison kept deliberately: readLong() is defined
            // elsewhere and may not return a native int -- TODO confirm
            // before tightening.
            if (0 == $this->block_count) {
                if ($this->isEof()) {
                    break;
                }

                // A sync marker may be all that is left of the file.
                if ($this->skipSync() && $this->isEof()) {
                    break;
                }

                $length = $this->readBlockHeader();
                $block_decoder = $this->openBlock($length);
            }

            $data[] = $this->datum_reader->read($block_decoder);
            --$this->block_count;
        }

        return $data;
    }

    /**
     * Returns the decoder to use for the datums of the block whose header has
     * just been read.
     *
     * The block payload is always fetched through the source-backed decoder
     * ($this->decoder). Fetching it through the decoder of the previous block
     * would read from that block's already consumed in-memory buffer, which
     * breaks every compressed file holding more than one block.
     *
     * @param int $length length in bytes of the block payload
     * @return AvroIOBinaryDecoder $this->decoder when the codec needs no
     *                             decompression, otherwise a decoder over the
     *                             decompressed payload
     * @throws AvroException if the PHP extension required by the codec
     *                       is not loaded
     * @throws AvroDataIOException if the payload cannot be decompressed
     */
    private function openBlock($length)
    {
        $codec = $this->codec;

        if ($codec === AvroDataIO::DEFLATE_CODEC) {
            self::requireExtension('zlib', 'deflate');
            $datum = gzinflate($this->decoder->read($length));
        } elseif ($codec === AvroDataIO::ZSTANDARD_CODEC) {
            self::requireExtension('zstd', 'zstandard');
            $datum = zstd_uncompress($this->decoder->read($length));
        } elseif ($codec === AvroDataIO::SNAPPY_CODEC) {
            self::requireExtension('snappy', 'snappy');
            $datum = $this->snappyUncompress($this->decoder->read($length));
        } elseif ($codec === AvroDataIO::BZIP2_CODEC) {
            self::requireExtension('bz2', 'bzip2');
            $datum = bzdecompress($this->decoder->read($length));
        } else {
            // No decompression step: datums are decoded straight from the source.
            return $this->decoder;
        }

        // The decompression functions signal failure with a non-string value
        // (false, or an integer error code for bzdecompress()).
        if (!is_string($datum)) {
            throw new AvroDataIOException(
                sprintf('Unable to decompress %s block', $codec)
            );
        }

        return new AvroIOBinaryDecoder(new AvroStringIO($datum));
    }

    /**
     * Decompresses a snappy block whose last four bytes hold a big-endian
     * CRC32 checksum.
     *
     * @param string $compressed block payload including the checksum trailer
     * @return string|false the uncompressed bytes, or false when the second
     *                      decompression pass fails
     * @throws AvroDataIOException if the payload is too short to hold the
     *                             checksum or cannot be decompressed
     */
    private function snappyUncompress($compressed)
    {
        if (strlen($compressed) < 4) {
            throw new AvroDataIOException(
                'Unable to decompress snappy block: missing CRC32 checksum'
            );
        }

        $expected_crc32 = unpack('N', substr($compressed, -4))[1];
        $datum = snappy_uncompress(substr($compressed, 0, -4));
        if (!is_string($datum)) {
            throw new AvroDataIOException('Unable to decompress snappy block');
        }

        if ($expected_crc32 === crc32($datum)) {
            return $datum;
        }

        // NOTE(review): on a checksum mismatch the original implementation
        // runs a second decompression pass over the output instead of
        // rejecting the block. That behavior is preserved here; the caller
        // rejects a failed second pass. Confirm whether the fallback is
        // still needed or whether a mismatch should simply be an error.
        return snappy_uncompress($datum);
    }

    /**
     * Ensures the PHP extension backing a codec is available.
     *
     * @param string $extension name of the PHP extension, without "ext-"
     * @param string $codec_name codec name used in the error message
     * @throws AvroException if the extension is not loaded
     */
    private static function requireExtension($extension, $codec_name)
    {
        if (!extension_loaded($extension)) {
            throw new AvroException(
                sprintf('Please install ext-%s to use %s compression.', $extension, $codec_name)
            );
        }
    }

    /**
     * Tells whether the underlying source is exhausted.
     *
     * @return bool
     * @uses AvroIO::isEof()
     */
    private function isEof()
    {
        return $this->io->isEof();
    }

    /**
     * Consumes the sync marker if it is next in the source; otherwise leaves
     * the read position where it was.
     *
     * @return bool true if the sync marker was found and skipped
     */
    private function skipSync()
    {
        $proposed_sync_marker = $this->read(AvroDataIO::SYNC_SIZE);

        // Sync markers are raw bytes: compare strictly so that PHP's
        // numeric-string comparison rules can never apply.
        if ($proposed_sync_marker !== $this->sync_marker) {
            // Rewind by what was actually read, which is less than
            // SYNC_SIZE when the source ends early.
            $this->seek(-strlen($proposed_sync_marker), AvroIO::SEEK_CUR);
            return false;
        }

        return true;
    }

    /**
     * Reads the block header (which includes the count of items in the block
     * and the length in bytes of the block)
     *
     * @return int length in bytes of the block.
     */
    private function readBlockHeader()
    {
        $this->block_count = $this->decoder->readLong();
        return $this->decoder->readLong();
    }

    /**
     * Closes this reader (and its AvroIO object.)
     *
     * @return mixed whatever AvroIO::close() returns
     * @uses AvroIO::close()
     */
    public function close()
    {
        return $this->io->close();
    }
}