blob: d6914257b1f7b6c3dd7bb008a03f946a54a0f790 [file] [log] [blame]
<?php
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache\Avro\DataFile;
use Apache\Avro\AvroException;
use Apache\Avro\AvroIO;
use Apache\Avro\Datum\AvroIOBinaryEncoder;
use Apache\Avro\Datum\AvroIODatumReader;
use Apache\Avro\Datum\AvroIODatumWriter;
use Apache\Avro\IO\AvroStringIO;
use Apache\Avro\Schema\AvroSchema;
/**
* Writes Avro data to an AvroIO source using an AvroSchema
* @package Avro
*/
class AvroDataIOWriter
{
/**
* @var AvroIO object container where data is written
*/
private $io;
/**
* @var AvroIOBinaryEncoder encoder for object container
*/
private $encoder;
/**
* @var AvroIODatumWriter
*/
private $datum_writer;
/**
* @var AvroStringIO buffer for writing
*/
private $buffer;
/**
* @var AvroIOBinaryEncoder encoder for buffer
*/
private $buffer_encoder;
/**
* @var int count of items written to block
*/
private $block_count; // AvroIOBinaryEncoder
/**
* @var array map of object container metadata
*/
private $metadata;
/**
* @var string compression codec
*/
private $codec;
/**
* @param AvroIO $io
* @param AvroIODatumWriter $datum_writer
* @param AvroSchema $writers_schema
* @param string $codec
*/
public function __construct($io, $datum_writer, $writers_schema = null, $codec = AvroDataIO::NULL_CODEC)
{
if (!($io instanceof AvroIO)) {
throw new AvroDataIOException('io must be instance of AvroIO');
}
$this->io = $io;
$this->encoder = new AvroIOBinaryEncoder($this->io);
$this->datum_writer = $datum_writer;
$this->buffer = new AvroStringIO();
$this->buffer_encoder = new AvroIOBinaryEncoder($this->buffer);
$this->block_count = 0;
$this->metadata = array();
if ($writers_schema) {
if (!AvroDataIO::isValidCodec($codec)) {
throw new AvroDataIOException(
sprintf('codec %s is not supported', $codec)
);
}
$this->sync_marker = self::generateSyncMarker();
$this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = $this->codec = $codec;
$this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = (string) $writers_schema;
$this->writeHeader();
} else {
$dfr = new AvroDataIOReader($this->io, new AvroIODatumReader());
$this->sync_marker = $dfr->sync_marker;
$this->metadata[AvroDataIO::METADATA_CODEC_ATTR] = $this->codec
= $dfr->metadata[AvroDataIO::METADATA_CODEC_ATTR];
$schema_from_file = $dfr->metadata[AvroDataIO::METADATA_SCHEMA_ATTR];
$this->metadata[AvroDataIO::METADATA_SCHEMA_ATTR] = $schema_from_file;
$this->datum_writer->writersSchema = AvroSchema::parse($schema_from_file);
$this->seek(0, SEEK_END);
}
}
/**
* @returns string a new, unique sync marker.
*/
private static function generateSyncMarker()
{
// From https://php.net/manual/en/function.mt-rand.php comments
return pack(
'S8',
random_int(0, 0xffff),
random_int(0, 0xffff),
random_int(0, 0xffff),
random_int(0, 0xffff) | 0x4000,
random_int(0, 0xffff) | 0x8000,
random_int(0, 0xffff),
random_int(0, 0xffff),
random_int(0, 0xffff)
);
}
/**
* Writes the header of the AvroIO object container
*/
private function writeHeader()
{
$this->write(AvroDataIO::magic());
$this->datum_writer->writeData(
AvroDataIO::metadataSchema(),
$this->metadata,
$this->encoder
);
$this->write($this->sync_marker);
}
/**
* @param string $bytes
* @uses AvroIO::write()
*/
private function write($bytes)
{
return $this->io->write($bytes);
}
/**
* @param int $offset
* @param int $whence
* @uses AvroIO::seek()
*/
private function seek($offset, $whence)
{
return $this->io->seek($offset, $whence);
}
/**
* @param mixed $datum
*/
public function append($datum)
{
$this->datum_writer->write($datum, $this->buffer_encoder);
$this->block_count++;
if ($this->buffer->length() >= AvroDataIO::SYNC_INTERVAL) {
$this->writeBlock();
}
}
/**
* Writes a block of data to the AvroIO object container.
*/
private function writeBlock()
{
if ($this->block_count > 0) {
$this->encoder->writeLong($this->block_count);
$to_write = (string) $this->buffer;
if ($this->codec === AvroDataIO::DEFLATE_CODEC) {
$to_write = gzdeflate($to_write);
} elseif ($this->codec === AvroDataIO::ZSTANDARD_CODEC) {
if (!extension_loaded('zstd')) {
throw new AvroException('Please install ext-zstd to use zstandard compression.');
}
$to_write = zstd_compress($to_write);
} elseif ($this->codec === AvroDataIO::SNAPPY_CODEC) {
if (!extension_loaded('snappy')) {
throw new AvroException('Please install ext-snappy to use snappy compression.');
}
$crc32 = crc32($to_write);
$compressed = snappy_compress($to_write);
$to_write = pack('a*N', $compressed, $crc32);
} elseif ($this->codec === AvroDataIO::BZIP2_CODEC) {
if (!extension_loaded('bz2')) {
throw new AvroException('Please install ext-bz2 to use bzip2 compression.');
}
$to_write = bzcompress($to_write);
}
$this->encoder->writeLong(strlen($to_write));
$this->write($to_write);
$this->write($this->sync_marker);
$this->buffer->truncate();
$this->block_count = 0;
}
}
/**
* Flushes buffer to AvroIO object container and closes it.
* @return mixed value of $io->close()
* @see AvroIO::close()
*/
public function close()
{
$this->flush();
return $this->io->close();
}
/**
* Flushes biffer to AvroIO object container.
* @returns mixed value of $io->flush()
* @see AvroIO::flush()
*/
private function flush()
{
$this->writeBlock();
return $this->io->flush();
}
}