blob: 82ac3a9da8b5d86ee2c0242dfaae1e6cadf3dd2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using Avro.Generic;
using Avro.IO;
namespace Avro.File
{
/// <summary>
/// Stores in a file a sequence of data conforming to a schema. The schema is stored in the file
/// with the data. Each datum in a file is of the same schema. Data is written with a
/// <see cref="DatumWriter{T}" />. Data is grouped into blocks. A synchronization marker is
/// written between blocks, so that files may be split. Blocks may be compressed. Extensible
/// metadata is stored at the end of the file. Files may be appended to.
/// </summary>
/// <typeparam name="T">Type of datum to write to the file.</typeparam>
/// <seealso cref="IFileWriter&lt;T&gt;" />
public class DataFileWriter<T> : IFileWriter<T>
{
private Schema _schema;
private Codec _codec;
private Stream _stream;
private bool _leaveOpen;
private MemoryStream _blockStream;
private MemoryStream _compressedBlockStream;
private Encoder _encoder, _blockEncoder;
private DatumWriter<T> _writer;
private byte[] _syncData;
private bool _isOpen;
private bool _headerWritten;
private int _blockCount;
private int _syncInterval;
private IDictionary<string, byte[]> _metaData;
/// <summary>
/// Open a new writer instance to write
/// to a file path, using a Null codec
/// </summary>
/// <param name="writer">Datum writer to use.</param>
/// <param name="path">Path to the file.</param>
/// <returns>
/// A new file writer.
/// </returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, string path)
{
return OpenWriter(writer, new FileStream(path, FileMode.Create), Codec.CreateCodec(Codec.Type.Null));
}
/// <summary>
/// Open a new writer instance to write
/// to an output stream, using a Null codec
/// </summary>
/// <param name="writer">Datum writer to use.</param>
/// <param name="outStream">Stream to write to.</param>
/// <returns>
/// A new file writer.
/// </returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, Stream outStream)
{
return OpenWriter(writer, outStream, Codec.CreateCodec(Codec.Type.Null));
}
/// <summary>
/// Open a new writer instance to write
/// to an output stream, using a Null codec
/// </summary>
/// <param name="writer">Datum writer to use.</param>
/// <param name="outStream">Stream to write to.</param>
/// <param name="leaveOpen">Leave the stream open after disposing the object</param>
/// <returns>
/// A new file writer.
/// </returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, Stream outStream, bool leaveOpen)
{
return OpenWriter(writer, outStream, Codec.CreateCodec(Codec.Type.Null), leaveOpen);
}
/// <summary>
/// Open a new writer instance to write
/// to a file path with a specified codec
/// </summary>
/// <param name="writer">Datum writer to use.</param>
/// <param name="path">Path to the file.</param>
/// <param name="codec">Codec to use when writing.</param>
/// <returns>
/// A new file writer.
/// </returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, string path, Codec codec)
{
return OpenWriter(writer, new FileStream(path, FileMode.Create), codec);
}
/// <summary>
/// Open a new writer instance to write
/// to an output stream with a specified codec
/// </summary>
/// <param name="writer">Datum writer to use.</param>
/// <param name="outStream">Stream to write to.</param>
/// <param name="codec">Codec to use when writing.</param>
/// <returns>
/// A new file writer.
/// </returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, Stream outStream, Codec codec)
{
return new DataFileWriter<T>(writer).Create(writer.Schema, outStream, codec, false);
}
/// <summary>
/// Open a new writer instance to write
/// to an output stream with a specified codec
/// </summary>
/// <param name="writer">Datum writer to use.</param>
/// <param name="outStream">Stream to write to.</param>
/// <param name="codec">Codec to use when writing.</param>
/// <param name="leaveOpen">Leave the stream open after disposing the object</param>
/// <returns>
/// A new file writer.
/// </returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, Stream outStream, Codec codec, bool leaveOpen)
{
return new DataFileWriter<T>(writer).Create(writer.Schema, outStream, codec, leaveOpen);
}
/// <summary>
/// Open a new writer instance to append to a file path.
/// </summary>
/// <param name="writer">Datum writer to use.</param>
/// <param name="path">Path to the file.</param>
/// <returns>
/// A new file writer.
/// </returns>
public static IFileWriter<T> OpenAppendWriter(DatumWriter<T> writer, string path)
{
return new DataFileWriter<T>(writer).AppendTo(path);
}
/// <summary>
/// Open a new writer instance to append to an output stream.
/// Both in and out streams must point to the same file.
/// </summary>
/// <param name="writer">Datum writer to use.</param>
/// <param name="inStream">reading the existing file.</param>
/// <param name="outStream">stream to write to, positioned at the end of the existing file.</param>
/// <returns>
/// A new file writer.
/// </returns>
/// <exception cref="AvroRuntimeException">
/// {nameof(inStream)} must have Read access
/// or
/// {nameof(outStream)} must have Write access
/// </exception>
public static IFileWriter<T> OpenAppendWriter(DatumWriter<T> writer, Stream inStream, Stream outStream)
{
if (!inStream.CanRead)
{
throw new AvroRuntimeException($"{nameof(inStream)} must have Read access");
}
if (!outStream.CanWrite)
{
throw new AvroRuntimeException($"{nameof(outStream)} must have Write access");
}
return new DataFileWriter<T>(writer).AppendTo(inStream, outStream);
}
/// <summary>
/// Initializes a new instance of the <see cref="DataFileWriter{T}"/> class.
/// </summary>
/// <param name="writer">The writer.</param>
private DataFileWriter(DatumWriter<T> writer)
{
_writer = writer;
_syncInterval = DataFileConstants.DefaultSyncInterval;
}
/// <inheritdoc/>
public bool IsReservedMeta(string key)
{
return key.StartsWith(DataFileConstants.MetaDataReserved, StringComparison.Ordinal);
}
/// <inheritdoc/>
public void SetMeta(string key, byte[] value)
{
if (IsReservedMeta(key))
{
throw new AvroRuntimeException("Cannot set reserved meta key: " + key);
}
_metaData.Add(key, value);
}
/// <inheritdoc/>
public void SetMeta(string key, long value)
{
try
{
SetMeta(key, GetByteValue(value.ToString(CultureInfo.InvariantCulture)));
}
catch (Exception e)
{
throw new AvroRuntimeException(e.Message, e);
}
}
/// <inheritdoc/>
public void SetMeta(string key, string value)
{
try
{
SetMeta(key, GetByteValue(value));
}
catch (Exception e)
{
throw new AvroRuntimeException(e.Message, e);
}
}
/// <inheritdoc/>
public void SetSyncInterval(int syncInterval)
{
if (syncInterval < 32 || syncInterval > (1 << 30))
{
throw new AvroRuntimeException("Invalid sync interval value: " + syncInterval);
}
_syncInterval = syncInterval;
}
/// <inheritdoc/>
public void Append(T datum)
{
AssertOpen();
EnsureHeader();
long usedBuffer = _blockStream.Position;
try
{
_writer.Write(datum, _blockEncoder);
}
catch (Exception e)
{
_blockStream.Position = usedBuffer;
throw new AvroRuntimeException("Error appending datum to writer", e);
}
_blockCount++;
WriteIfBlockFull();
}
/// <summary>
/// Appends to file.
/// </summary>
/// <param name="path">The path.</param>
/// <returns>a file writer</returns>
private IFileWriter<T> AppendTo(string path)
{
using (var inStream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
var outStream = new FileStream(path, FileMode.Append);
return AppendTo(inStream, outStream);
}
// outStream does not need to be closed here. It will be closed by invoking Close()
// of this writer.
}
/// <summary>
/// Appends to stream.
/// </summary>
/// <param name="inStream">The in stream.</param>
/// <param name="outStream">The out stream.</param>
/// <returns></returns>
private IFileWriter<T> AppendTo(Stream inStream, Stream outStream)
{
using (var dataFileReader = DataFileReader<T>.OpenReader(inStream))
{
var header = dataFileReader.GetHeader();
_schema = header.Schema;
_syncData = header.SyncData;
_metaData = header.MetaData;
}
if (_metaData.TryGetValue(DataFileConstants.MetaDataCodec, out byte[] codecBytes))
{
string codec = System.Text.Encoding.UTF8.GetString(codecBytes);
_codec = Codec.CreateCodecFromString(codec);
}
else
{
_codec = Codec.CreateCodec(Codec.Type.Null);
}
_headerWritten = true;
_stream = outStream;
_stream.Seek(0, SeekOrigin.End);
Init();
return this;
}
/// <summary>
/// Ensures the header.
/// </summary>
private void EnsureHeader()
{
if (!_headerWritten)
{
WriteHeader();
_headerWritten = true;
}
}
/// <inheritdoc/>
public void Flush()
{
EnsureHeader();
SyncInternal();
}
/// <inheritdoc/>
public long Sync()
{
SyncInternal();
return _stream.Position;
}
/// <summary>
/// Synchronizes the internal.
/// </summary>
private void SyncInternal()
{
AssertOpen();
WriteBlock();
}
/// <inheritdoc/>
public void Close()
{
EnsureHeader();
Flush();
_stream.Flush();
if (!_leaveOpen)
_stream.Close();
_blockStream.Dispose();
_compressedBlockStream.Dispose();
_isOpen = false;
}
/// <summary>
/// Writes the header.
/// </summary>
private void WriteHeader()
{
_encoder.WriteFixed(DataFileConstants.Magic);
WriteMetaData();
WriteSyncData();
}
/// <summary>
/// Initializes this instance.
/// </summary>
private void Init()
{
_blockCount = 0;
_encoder = new BinaryEncoder(_stream);
_blockStream = new MemoryStream();
_blockEncoder = new BinaryEncoder(_blockStream);
_compressedBlockStream = new MemoryStream();
if (_codec == null)
_codec = Codec.CreateCodec(Codec.Type.Null);
_isOpen = true;
}
/// <summary>
/// Asserts the open.
/// </summary>
/// <exception cref="AvroRuntimeException">Cannot complete operation: avro file/stream not open</exception>
private void AssertOpen()
{
if (!_isOpen) throw new AvroRuntimeException("Cannot complete operation: avro file/stream not open");
}
private IFileWriter<T> Create(Schema schema, Stream outStream, Codec codec, bool leaveOpen)
{
_codec = codec;
_stream = outStream;
_metaData = new Dictionary<string, byte[]>();
_schema = schema;
_leaveOpen = leaveOpen;
Init();
return this;
}
/// <summary>
/// Writes the meta data.
/// </summary>
private void WriteMetaData()
{
// Add sync, code & schema to metadata
GenerateSyncData();
//SetMetaInternal(DataFileConstants.MetaDataSync, _syncData); - Avro 1.5.4 C
SetMetaInternal(DataFileConstants.MetaDataCodec, GetByteValue(_codec.GetName()));
SetMetaInternal(DataFileConstants.MetaDataSchema, GetByteValue(_schema.ToString()));
// write metadata
int size = _metaData.Count;
_encoder.WriteInt(size);
foreach (KeyValuePair<string, byte[]> metaPair in _metaData)
{
_encoder.WriteString(metaPair.Key);
_encoder.WriteBytes(metaPair.Value);
}
_encoder.WriteMapEnd();
}
/// <summary>
/// Writes if block full.
/// </summary>
private void WriteIfBlockFull()
{
if (BufferInUse() >= _syncInterval)
WriteBlock();
}
/// <summary>
/// Buffers the in use.
/// </summary>
/// <returns>
/// Position of block stream
/// </returns>
private long BufferInUse()
{
return _blockStream.Position;
}
/// <summary>
/// Writes the block.
/// </summary>
private void WriteBlock()
{
if (_blockCount > 0)
{
// write count
_encoder.WriteLong(_blockCount);
// write data
_codec.Compress(_blockStream, _compressedBlockStream);
_encoder.WriteBytes(_compressedBlockStream.GetBuffer(), 0, (int)_compressedBlockStream.Length);
// write sync marker
_encoder.WriteFixed(_syncData);
// reset / re-init block
_blockCount = 0;
_blockStream.SetLength(0);
}
}
/// <summary>
/// Writes the synchronize data.
/// </summary>
private void WriteSyncData()
{
_encoder.WriteFixed(_syncData);
}
/// <summary>
/// Generates the synchronize data.
/// </summary>
private void GenerateSyncData()
{
_syncData = new byte[16];
Random random = new Random();
random.NextBytes(_syncData);
}
/// <summary>
/// Sets the meta internal.
/// </summary>
/// <param name="key">The key.</param>
/// <param name="value">The value.</param>
private void SetMetaInternal(string key, byte[] value)
{
_metaData.Add(key, value);
}
/// <summary>
/// Gets the byte value.
/// </summary>
/// <param name="value">The value.</param>
/// <returns>byte array of string value</returns>
private byte[] GetByteValue(string value)
{
return System.Text.Encoding.UTF8.GetBytes(value);
}
/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Releases resources associated with this <see cref="DataFileWriter{T}"/>.
/// </summary>
/// <param name="disposing">
/// True if called from <see cref="Dispose()"/>; false otherwise.
/// </param>
protected virtual void Dispose(bool disposing)
{
Close();
if (disposing && !_leaveOpen)
_stream.Dispose();
}
}
}