blob: e59491dd6ffa936776eda94c553eb218f75fb1ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal.Table.Serialization
{
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using Buffers;
using Proto.MsgPack;
/// <summary>
/// Generic record serializer.
/// Works for tuples and user objects, any differences are handled by the underlying <see cref="IRecordSerializerHandler{T}"/>.
/// </summary>
/// <typeparam name="T">Record type.</typeparam>
internal sealed class RecordSerializer<T>
{
/** Table. */
private readonly Table _table;
/** Serialization handler. */
private readonly IRecordSerializerHandler<T> _handler;
/// <summary>
/// Initializes a new instance of the <see cref="RecordSerializer{T}"/> class.
/// </summary>
/// <param name="table">Table.</param>
/// <param name="handler">Handler.</param>
public RecordSerializer(Table table, IRecordSerializerHandler<T> handler)
{
_table = table;
_handler = handler;
}
/// <summary>
/// Gets the handler.
/// </summary>
public IRecordSerializerHandler<T> Handler => _handler;
/// <summary>
/// Reads the value part.
/// </summary>
/// <param name="buf">Buffer.</param>
/// <param name="schema">Schema or null when there is no value.</param>
/// <returns>Resulting record with key and value parts.</returns>
public Option<T> ReadValue(PooledBuffer buf, Schema schema)
{
var r = buf.GetReader();
r.Skip(); // Skip schema version.
return r.TryReadNil()
? default
: Option.Some(_handler.Read(ref r, schema));
}
/// <summary>
/// Read multiple records.
/// </summary>
/// <param name="buf">Buffer.</param>
/// <param name="schema">Schema or null when there is no value.</param>
/// <param name="keyOnly">Key only mode.</param>
/// <param name="resultFactory">Result factory.</param>
/// <param name="addAction">Adds items to the result.</param>
/// <typeparam name="TRes">Result type.</typeparam>
/// <returns>List of records.</returns>
public TRes ReadMultiple<TRes>(
PooledBuffer buf,
Schema? schema,
bool keyOnly,
Func<int, TRes> resultFactory,
Action<TRes, T> addAction)
{
if (schema == null)
{
// Null schema means empty collection.
return resultFactory(0);
}
// Skip schema version.
var r = buf.GetReader();
r.Skip();
var count = r.ReadInt32();
var res = resultFactory(count);
for (var i = 0; i < count; i++)
{
addAction(res, _handler.Read(ref r, schema, keyOnly));
}
return res;
}
/// <summary>
/// Read multiple records, where some of them might be null.
/// </summary>
/// <param name="buf">Buffer.</param>
/// <param name="schema">Schema or null when there is no value.</param>
/// <param name="resultFactory">Result factory.</param>
/// <param name="addAction">Adds items to the result.</param>
/// <typeparam name="TRes">Result type.</typeparam>
/// <returns>List of records.</returns>
public TRes ReadMultipleNullable<TRes>(
PooledBuffer buf,
Schema? schema,
Func<int, TRes> resultFactory,
Action<TRes, Option<T>> addAction)
{
if (schema == null)
{
// Null schema means empty collection.
return resultFactory(0);
}
// Skip schema version.
var r = buf.GetReader();
r.Skip();
var count = r.ReadInt32();
var res = resultFactory(count);
for (var i = 0; i < count; i++)
{
var option = r.ReadBoolean()
? Option.Some(_handler.Read(ref r, schema))
: default;
addAction(res, option);
}
return res;
}
/// <summary>
/// Write record.
/// </summary>
/// <param name="buf">Buffer.</param>
/// <param name="txId">Transaction id.</param>
/// <param name="schema">Schema.</param>
/// <param name="rec">Record.</param>
/// <param name="keyOnly">Key only columns.</param>
/// <returns>Colocation hash.</returns>
public (int ColocationHash, int TxIdPos) Write(
PooledArrayBuffer buf,
long? txId,
Schema schema,
T rec,
bool keyOnly = false)
{
var w = buf.MessageWriter;
var colocationHash = WriteWithHeader(ref w, txId, schema, rec, keyOnly);
return colocationHash;
}
/// <summary>
/// Write two records.
/// </summary>
/// <param name="buf">Buffer.</param>
/// <param name="txId">Transaction id.</param>
/// <param name="schema">Schema.</param>
/// <param name="t">Record 1.</param>
/// <param name="t2">Record 2.</param>
/// <param name="keyOnly">Key only columns.</param>
/// <returns>First record hash.</returns>
public (int ColocationHash, int TxIdPos) WriteTwo(
PooledArrayBuffer buf,
long? txId,
Schema schema,
T t,
T t2,
bool keyOnly = false)
{
var w = buf.MessageWriter;
var firstHash = WriteWithHeader(ref w, txId, schema, t, keyOnly);
_handler.Write(ref w, schema, t2, keyOnly);
return firstHash;
}
/// <summary>
/// Write multiple records.
/// </summary>
/// <param name="buf">Buffer.</param>
/// <param name="txId">Transaction.</param>
/// <param name="schema">Schema.</param>
/// <param name="recs">Records.</param>
/// <param name="keyOnly">Key only columns.</param>
/// <returns>First record hash.</returns>
public (int ColocationHash, int TxIdPos) WriteMultiple(
PooledArrayBuffer buf,
long? txId,
Schema schema,
IEnumerator<T> recs,
bool keyOnly = false)
{
var w = buf.MessageWriter;
var txIdPos = WriteIdAndTx(ref w, txId);
w.Write(schema.Version);
var count = 0;
var firstHash = 0;
var countSpan = buf.GetSpan(5);
buf.Advance(5);
do
{
var rec = recs.Current;
if (rec == null)
{
throw new ArgumentException("Record collection can't contain null elements.");
}
var hash = _handler.Write(ref w, schema, rec, keyOnly, computeHash: count == 0);
if (count == 0)
{
firstHash = hash;
}
count++;
}
while (recs.MoveNext()); // First MoveNext is called outside to check for empty IEnumerable.
countSpan[0] = MsgPackCode.Int32;
BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
return (firstHash, txIdPos);
}
/// <summary>
/// Write record with header.
/// </summary>
/// <param name="w">Writer.</param>
/// <param name="txId">Transaction id.</param>
/// <param name="schema">Schema.</param>
/// <param name="rec">Record.</param>
/// <param name="keyOnly">Key only columns.</param>
/// <returns>Colocation hash.</returns>
private (int ColocationHash, int TxIdPos) WriteWithHeader(
ref MsgPackWriter w,
long? txId,
Schema schema,
T rec,
bool keyOnly = false)
{
var txIdPos = WriteIdAndTx(ref w, txId);
w.Write(schema.Version);
var colocationHash = _handler.Write(ref w, schema, rec, keyOnly, computeHash: true);
return (colocationHash, txIdPos);
}
/// <summary>
/// Writes table id and transaction id, if present.
/// </summary>
/// <param name="w">Writer.</param>
/// <param name="txId">Transaction id.</param>
private int WriteIdAndTx(ref MsgPackWriter w, long? txId)
{
w.Write(_table.Id);
var txIdPos = w.Position;
w.WriteTx(txId);
return txIdPos;
}
}
}