blob: a81b62b4a3938ea075d86f5ea0aa7edf2b96b41e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.accord.serializers;
import java.nio.ByteBuffer;
import java.util.Arrays;
import com.google.common.primitives.Ints;
import accord.api.Key;
import accord.local.CommandsForKey;
import accord.local.CommandsForKey.TxnInfo;
import accord.local.CommandsForKey.InternalStatus;
import accord.local.CommandsForKey.Unmanaged;
import accord.local.Node;
import accord.primitives.Routable.Domain;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.utils.vint.VIntCoding;
import static accord.local.CommandsForKey.NO_PENDING_UNMANAGED;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.primitives.Txn.Kind.Read;
import static accord.primitives.Txn.Kind.Write;
import static accord.utils.ArrayBuffers.cachedInts;
import static accord.utils.ArrayBuffers.cachedTxnIds;
import static org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer.TxnIdFlags.EXTENDED;
import static org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer.TxnIdFlags.EXTENDED_BITS;
import static org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer.TxnIdFlags.RAW;
import static org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer.TxnIdFlags.RAW_BITS;
import static org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer.TxnIdFlags.STANDARD;
import static org.apache.cassandra.utils.ByteBufferUtil.readLeastSignificantBytes;
import static org.apache.cassandra.utils.ByteBufferUtil.writeLeastSignificantBytes;
import static org.apache.cassandra.utils.ByteBufferUtil.writeMostSignificantBytes;
public class CommandsForKeySerializer
{
// Low bits of the two-byte preamble flags written by toBytesWithoutKey and tested by fromBytes.
// Set when at least one command has a non-empty missing collection.
private static final int HAS_MISSING_DEPS_HEADER_BIT = 1 << 0;
// Set when at least one command has an executeAt that is not its own TxnId.
private static final int HAS_EXECUTE_AT_HEADER_BIT = 1 << 1;
// Set when at least one TxnId cannot use the single-bit read/write kind encoding.
private static final int HAS_NON_STANDARD_FLAGS = 1 << 2;
/**
* We read/write a fixed number of initial bytes for each command, with an initial flexible number of flag bits
* and the remainder interpreted as the HLC/epoch/node.
*
* The preamble encodes:
* vint32: number of commands
* vint32: number of unique node Ids
* [unique node ids]
* two flag bytes:
* bit 0 is set if there are any missing ids;
* bit 1 is set if there are any executeAt specified
* bit 2 is set if there are any queries present besides reads/writes
* bits 3-4 number of header bytes to read for each command
* bits 5-6: level 0 extra hlc bytes to read
* bits 7-8: level 1 extra hlc bytes to read (+ 1 + level 0)
* bits 9-10: level 2 extra hlc bytes to read (+ 1 + level 1)
* bits 11-12: level 3 extra hlc bytes to read (+ 1 + level 2)
*
* In order, for each command, we consume:
* 3 bits for the InternalStatus of the command
* 1 optional bit: if the status encodes an executeAt, indicating if the executeAt is not the TxnId
* 1 optional bit: if the status encodes any dependencies and there are non-zero missing ids, indicating if there are any missing for this command
* 1 or 2 bits for the kind of the TxnId: 0=key read, 1=key write, 2=exclusive sync point,3=read 16 bits
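* 1 bit encoding if the epoch has changed
* 1 optional bit: if the prior bit is set, indicating the HLC delta is negative (in which case -1 - delta is stored)
* 2 optional bits: if the epoch has changed, indicating how many bits should be read for the epoch increment: 0=none (increment by 1); 1=4, 2=8, 3=32
* 4 optional bits: if prior bits=01, epoch delta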
* N node id bits (where 2^N unique node ids in the CFK)
* 2 bits indicating how many more payload bytes should be read, with mapping written in header
* all remaining bits are interpreted as a delta from the prior HLC
*
* if txnId kind flag is 3, read an additional 2 bytes for TxnId flag
* if epoch increment flag is 2 or 3, read additional 1 or 4 bytes for epoch delta
* if executeAt is expected, read vint32 for epoch, vint32 for delta from txnId hlc, and ceil(N/8) bytes for node id
*
* After writing all transactions, we then write out the missing txnid collections. This is written at the end
* so that on deserialization we have already read all of the TxnId. This also permits more efficient serialization,
* as we can encode a single bit stream with the optimal number of bits.
* TODO (desired): we could prefix this collection with the subset of TxnId that are actually missing from any other
* deps, so as to shrink this collection much further.
*/
// TODO (expected): offer filtering option that does not need to reconstruct objects/info, reusing prior encoding decisions
// TODO (expected): accept new redundantBefore on load to avoid deserializing stale data
// TODO (desired): determine timestamp resolution as a factor of 10
// Serializes the TxnInfo entries and Unmanaged entries of the given CommandsForKey into a newly
// allocated ByteBuffer, using the layout described in the format comment above. Nothing identifying
// the key is written. A CommandsForKey with no commands is encoded as a single zero byte, which
// fromBytes reads back as a command count of zero.
//
// The method first scans the commands to collect the distinct node ids and the bit widths needed,
// then computes the exact output size, and finally writes the data; the closing checkState verifies
// that the computed size matched what was written. The returned buffer is flipped, ready to read.
public static ByteBuffer toBytesWithoutKey(CommandsForKey cfk)
{
    int commandCount = cfk.size();
    if (commandCount == 0)
        return ByteBuffer.allocate(1);

    // scratch buffer from the shared cache: only the first nodeIdCount entries are ever meaningful
    int[] nodeIds = cachedInts().getInts(Math.min(64, commandCount));
    try
    {
        // first compute the unique Node Ids and some basic characteristics of the data, such as
        // whether we have any missing transactions to encode, any executeAt that are not equal to their TxnId
        // and whether there are any non-standard flag bits to encode
        boolean hasNonStandardFlags = false;
        int nodeIdCount = 0, missingIdCount = 0, executeAtCount = 0, bitsPerExecuteAtFlags = 0;
        int bitsPerExecuteAtEpochDelta = 0, bitsPerExecuteAtHlcDelta = 1; // to permit us to use full 64 bits and encode in 5 bits we force at least one hlc bit
        {
            for (int i = 0 ; i < commandCount ; ++i)
            {
                // each iteration may append two ids (the TxnId's node and the executeAt's node)
                if (nodeIdCount + 2 > nodeIds.length)
                {
                    // only the filled prefix is compacted, so recycled buffer contents are never treated as ids
                    nodeIdCount = sortAndDeduplicate(nodeIds, nodeIdCount);
                    // grow if compaction freed little, or if there is still no room for two more ids
                    if (nodeIdCount > nodeIds.length/2 || nodeIdCount + 2 > nodeIds.length)
                        nodeIds = cachedInts().resize(nodeIds, nodeIds.length, Math.max(nodeIdCount + 2, nodeIds.length * 2));
                }
                TxnInfo txn = cfk.get(i);
                hasNonStandardFlags |= txnIdFlags(txn) != STANDARD;
                nodeIds[nodeIdCount++] = txn.node.id;
                missingIdCount += txn.missing().length;
                if (txn.executeAt == txn)
                    continue;
                nodeIds[nodeIdCount++] = txn.executeAt.node.id;
                bitsPerExecuteAtEpochDelta = Math.max(bitsPerExecuteAtEpochDelta, numberOfBitsToRepresent(txn.executeAt.epoch() - txn.epoch()));
                bitsPerExecuteAtHlcDelta = Math.max(bitsPerExecuteAtHlcDelta, numberOfBitsToRepresent(txn.executeAt.hlc() - txn.hlc()));
                bitsPerExecuteAtFlags = Math.max(bitsPerExecuteAtFlags, numberOfBitsToRepresent(txn.executeAt.flags()));
                executeAtCount += 1;
            }
            nodeIdCount = sortAndDeduplicate(nodeIds, nodeIdCount);
            Invariants.checkState(nodeIdCount > 0);
        }
        // the flag width is stored in 4 bits, with 15 standing for 16 (fromBytes maps 15 -> 16), so a
        // width of exactly 15 must be widened here to keep the written stream in step with the reader
        if (bitsPerExecuteAtFlags == 15)
            bitsPerExecuteAtFlags = 16;

        // We can now use this information to calculate the fixed header size, compute the amount
        // of additional space we'll need to store the TxnId and its basic info
        int bitsPerNodeId = numberOfBitsToRepresent(nodeIdCount);
        // 3 status bits + 1 kind bit + 1 epoch-changed bit + 2 hlc-length bits, plus the node id index
        int minHeaderBits = 7 + bitsPerNodeId + (hasNonStandardFlags ? 1 : 0);
        int infoHeaderBits = (executeAtCount > 0 ? 1 : 0) + (missingIdCount > 0 ? 1 : 0);
        int maxHeaderBits = minHeaderBits;
        int totalBytes = 0;

        long prevEpoch = cfk.get(0).epoch();
        long prevHlc = cfk.get(0).hlc();
        // bytesHistogram[n] counts the commands whose header + hlc delta bits need n bytes
        int[] bytesHistogram = cachedInts().getInts(12);
        Arrays.fill(bytesHistogram, 0);
        for (int i = 0 ; i < commandCount ; ++i)
        {
            int headerBits = minHeaderBits;
            int payloadBits = 0;

            TxnId txnId = cfk.txnId(i);
            {
                long epoch = txnId.epoch();
                Invariants.checkState(epoch >= prevEpoch);
                long epochDelta = epoch - prevEpoch;
                long hlc = txnId.hlc();
                long hlcDelta = hlc - prevHlc;

                if (epochDelta > 0)
                {
                    // a sign bit is only available when the epoch changes
                    if (hlcDelta < 0)
                        hlcDelta = -1 - hlcDelta;

                    // 1 hlc sign bit + 2 epoch-delta-size bits
                    headerBits += 3;
                    if (epochDelta > 1)
                    {
                        if (epochDelta <= 0xf) headerBits += 4;
                        else if (epochDelta <= 0xff) totalBytes += 1;
                        else { totalBytes += 4; Invariants.checkState(epochDelta <= 0xffffffffL); }
                    }
                }

                payloadBits += numberOfBitsToRepresent(hlcDelta);
                prevEpoch = epoch;
                prevHlc = hlc;
            }

            if (hasNonStandardFlags && txnIdFlags(txnId) == RAW)
                totalBytes += 2;

            TxnInfo info = cfk.get(i);
            if (info.status.hasInfo)
                headerBits += infoHeaderBits;
            maxHeaderBits = Math.max(headerBits, maxHeaderBits);
            int basicBytes = (headerBits + payloadBits + 7)/8;
            bytesHistogram[basicBytes]++;
        }

        int minBasicBytes = -1, maxBasicBytes = 0;
        for (int i = 0 ; i < bytesHistogram.length ; ++i)
        {
            if (bytesHistogram[i] == 0) continue;
            if (minBasicBytes == -1) minBasicBytes = i;
            maxBasicBytes = i;
        }
        // convert to a cumulative histogram so that percentiles can be binary searched
        for (int i = minBasicBytes + 1 ; i <= maxBasicBytes ; ++i)
            bytesHistogram[i] += bytesHistogram[i-1];

        int flags = (missingIdCount > 0 ? HAS_MISSING_DEPS_HEADER_BIT : 0)
                    | (executeAtCount > 0 ? HAS_EXECUTE_AT_HEADER_BIT : 0)
                    | (hasNonStandardFlags ? HAS_NON_STANDARD_FLAGS : 0);

        int headerBytes = (maxHeaderBits+7)/8;
        flags |= Invariants.checkArgument(headerBytes - 1, headerBytes <= 4) << 3;

        int hlcBytesLookup;
        {   // 2bits per size, first value may be zero and remainder may be increments of 1-4;
            // only need to be able to encode a distribution of approx. 8 bytes at most, so
            // pick lowest number we need first, then next lowest as 25th %ile while ensuring value of 1-4;
            // then pick highest number we need, ensuring at least 2 greater than second (leaving room for third)
            // then pick third number as 75th %ile, but at least 1 less than highest, and one more than second
            // finally, ensure third then second are distributed so that there is no more than a gap of 4 between them and the next
            int l0 = Math.max(0, Math.min(3, minBasicBytes - headerBytes));
            int l1 = Arrays.binarySearch(bytesHistogram, minBasicBytes, maxBasicBytes, commandCount/4);
            l1 = Math.max(l0+1, Math.min(l0+4, (l1 < 0 ? -1 - l1 : l1) - headerBytes));
            int l3 = Math.max(l1+2, maxBasicBytes - headerBytes);
            int l2 = Arrays.binarySearch(bytesHistogram, minBasicBytes, maxBasicBytes,(3*commandCount)/4);
            l2 = Math.max(l1+1, Math.min(l3-1, (l2 < 0 ? -1 -l2 : l2) - headerBytes));
            while (l3-l2 > 4) ++l2;
            while (l2-l1 > 4) ++l1;
            hlcBytesLookup = setHlcBytes(l0, l1, l2, l3);
            flags |= (l0 | ((l1-(1+l0))<<2) | ((l2-(1+l1))<<4) | ((l3-(1+l2))<<6)) << 5;
        }
        int hlcFlagLookup = hlcBytesLookupToHlcFlagLookup(hlcBytesLookup);

        // each command occupies headerBytes plus the smallest configured hlc length that fits its remaining bits
        totalBytes += bytesHistogram[minBasicBytes] * (headerBytes + getHlcBytes(hlcBytesLookup, getHlcFlag(hlcFlagLookup, minBasicBytes - headerBytes)));
        for (int i = minBasicBytes + 1 ; i <= maxBasicBytes ; ++i)
            totalBytes += (bytesHistogram[i] - bytesHistogram[i-1]) * (headerBytes + getHlcBytes(hlcBytesLookup, getHlcFlag(hlcFlagLookup, i - headerBytes)));
        totalBytes += TypeSizes.sizeofUnsignedVInt(commandCount);
        totalBytes += TypeSizes.sizeofUnsignedVInt(nodeIdCount);
        totalBytes += TypeSizes.sizeofUnsignedVInt(nodeIds[0]);
        for (int i = 1 ; i < nodeIdCount ; ++i)
            totalBytes += TypeSizes.sizeofUnsignedVInt(nodeIds[i] - nodeIds[i-1]);
        totalBytes += 2; // the two preamble flag bytes
        cachedInts().forceDiscard(bytesHistogram);

        prevEpoch = cfk.get(0).epoch();
        prevHlc = cfk.get(0).hlc();
        // account for encoding redundantBefore
        totalBytes += TypeSizes.sizeofUnsignedVInt(prevEpoch);
        totalBytes += TypeSizes.sizeofUnsignedVInt(prevHlc);

        if (missingIdCount + executeAtCount > 0)
        {
            // account for encoding missing id stream
            int missingIdBits = 1 + numberOfBitsToRepresent(commandCount);
            int executeAtBits = bitsPerNodeId
                                + bitsPerExecuteAtEpochDelta
                                + bitsPerExecuteAtHlcDelta
                                + bitsPerExecuteAtFlags;
            totalBytes += (missingIdBits * missingIdCount + executeAtBits * executeAtCount + 7)/8;
            if (executeAtCount > 0)
                totalBytes += 2; // the short describing the executeAt bit widths
        }

        // count unmanaged bytes
        int unmanagedPendingCommitCount = 0;
        for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
        {
            Unmanaged unmanaged = cfk.getUnmanaged(i);
            if (unmanaged.pending == Unmanaged.Pending.COMMIT)
                ++unmanagedPendingCommitCount;
            totalBytes += CommandSerializers.txnId.serializedSize();
            // TODO (desired): this could be more efficient, e.g. referencing one of the TxnInfo indexes for timestamp
            totalBytes += CommandSerializers.timestamp.serializedSize();
        }
        totalBytes += TypeSizes.sizeofUnsignedVInt(unmanagedPendingCommitCount);
        totalBytes += TypeSizes.sizeofUnsignedVInt(cfk.unmanagedCount() - unmanagedPendingCommitCount);

        ByteBuffer out = ByteBuffer.allocate(totalBytes);
        VIntCoding.writeUnsignedVInt32(commandCount, out);
        VIntCoding.writeUnsignedVInt32(nodeIdCount, out);
        VIntCoding.writeUnsignedVInt32(nodeIds[0], out);
        for (int i = 1 ; i < nodeIdCount ; ++i) // TODO (desired): can encode more efficiently as a stream of N bit integers
            VIntCoding.writeUnsignedVInt32(nodeIds[i] - nodeIds[i-1], out);
        out.putShort((short)flags);

        VIntCoding.writeUnsignedVInt(prevEpoch, out);
        VIntCoding.writeUnsignedVInt(prevHlc, out);

        int executeAtMask = executeAtCount > 0 ? 1 : 0;
        int missingDepsMask = missingIdCount > 0 ? 1 : 0;
        int flagsIncrement = hasNonStandardFlags ? 2 : 1;
        // TODO (desired): check this loop compiles correctly to only branch on epoch case, for binarySearch and flushing
        for (int i = 0 ; i < commandCount ; ++i)
        {
            TxnId txnId = cfk.txnId(i);
            TxnInfo info = cfk.get(i);
            InternalStatus status = info.status;

            // header fields are packed from the least significant bit upwards, starting with the status
            long bits = status.ordinal();
            int bitIndex = 3;

            int statusHasInfo = status.hasInfo ? 1 : 0;
            long hasExecuteAt = info.executeAt != null & info.executeAt != txnId ? 1 : 0;
            bits |= hasExecuteAt << bitIndex;
            bitIndex += statusHasInfo & executeAtMask;

            // matches the condition under which the trailing stream contains entries for this command
            long hasMissingIds = info.missing().length > 0 ? 1 : 0;
            bits |= hasMissingIds << bitIndex;
            bitIndex += statusHasInfo & missingDepsMask;

            long flagBits = txnIdFlagsBits(txnId);
            boolean writeFullFlags = flagBits == RAW_BITS;
            bits |= flagBits << bitIndex;
            bitIndex += flagsIncrement;

            long hlcBits;
            int extraEpochDeltaBytes = 0;
            {
                long epoch = txnId.epoch();
                long delta = epoch - prevEpoch;
                long hlc = txnId.hlc();
                hlcBits = hlc - prevHlc;
                if (delta == 0)
                {
                    bitIndex++;
                }
                else
                {
                    bits |= 1L << bitIndex++;
                    if (hlcBits < 0)
                    {
                        hlcBits = -1 - hlcBits;
                        bits |= 1L << bitIndex;
                    }
                    bitIndex++;
                    if (delta > 1)
                    {
                        if (delta <= 0xf)
                        {
                            // size flag 1: the delta follows inline in the next 4 header bits
                            bits |= 1L << bitIndex;
                            bits |= delta << (bitIndex + 2);
                            bitIndex += 4;
                        }
                        else
                        {
                            // size flag 2 or 3: the delta is written as 1 or 4 bytes after the hlc bytes
                            bits |= (delta <= 0xff ? 2L : 3L) << bitIndex;
                            extraEpochDeltaBytes = Ints.checkedCast(delta);
                        }
                    }
                    bitIndex += 2;
                }
                prevEpoch = epoch;
                prevHlc = hlc;
            }

            bits |= ((long)Arrays.binarySearch(nodeIds, 0, nodeIdCount, txnId.node.id)) << bitIndex;
            bitIndex += bitsPerNodeId;

            // the low hlc bits fill the rest of the header; whatever remains is written as extra bytes
            bits |= hlcBits << (bitIndex + 2);
            hlcBits >>>= 8*headerBytes - (bitIndex + 2);
            int hlcFlag = getHlcFlag(hlcFlagLookup, (7 + numberOfBitsToRepresent(hlcBits))/8);
            bits |= ((long)hlcFlag) << bitIndex;

            writeLeastSignificantBytes(bits, headerBytes, out);
            writeLeastSignificantBytes(hlcBits, getHlcBytes(hlcBytesLookup, hlcFlag), out);

            if (writeFullFlags)
                out.putShort((short)txnId.flags());

            if (extraEpochDeltaBytes > 0)
            {
                if (extraEpochDeltaBytes <= 0xff) out.put((byte)extraEpochDeltaBytes);
                else out.putInt(extraEpochDeltaBytes);
            }
        }

        VIntCoding.writeUnsignedVInt32(unmanagedPendingCommitCount, out);
        VIntCoding.writeUnsignedVInt32(cfk.unmanagedCount() - unmanagedPendingCommitCount, out);
        // the pending state is not written per entry: entries pending COMMIT are expected first, then those pending APPLY
        Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
        {
            int offset = 0;
            for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
            {
                Unmanaged unmanaged = cfk.getUnmanaged(i);
                Invariants.checkState(unmanaged.pending == pending);

                offset += CommandSerializers.txnId.serialize(unmanaged.txnId, out, ByteBufferAccessor.instance, offset);
                offset += CommandSerializers.timestamp.serialize(unmanaged.waitingUntil, out, ByteBufferAccessor.instance, offset);
                if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY;
            }
            // the serializers above were given offsets, so advance the buffer position explicitly
            out.position(out.position() + offset);
        }

        if ((executeAtCount | missingIdCount) > 0)
        {
            int bitsPerCommandId = numberOfBitsToRepresent(commandCount);
            int bitsPerMissingId = 1 + bitsPerCommandId;
            int bitsPerExecuteAt = bitsPerExecuteAtEpochDelta + bitsPerExecuteAtHlcDelta + bitsPerExecuteAtFlags + bitsPerNodeId;
            Invariants.checkState(bitsPerExecuteAtEpochDelta < 64);
            Invariants.checkState(bitsPerExecuteAtHlcDelta <= 64);
            Invariants.checkState(bitsPerExecuteAtFlags <= 16);
            if (executeAtMask > 0) // we encode both 15 and 16 bits for flag length as 15 to fit in a short
                out.putShort((short) ((bitsPerExecuteAtEpochDelta << 10) | ((bitsPerExecuteAtHlcDelta-1) << 4) | (Math.min(15, bitsPerExecuteAtFlags))));
            long buffer = 0L;
            int bufferCount = 0;

            for (int i = 0 ; i < commandCount ; ++i)
            {
                TxnInfo txn = cfk.get(i);
                if (txn.executeAt != txn)
                {
                    Timestamp executeAt = txn.executeAt;
                    int nodeIdx = Arrays.binarySearch(nodeIds, 0, nodeIdCount, executeAt.node.id);
                    if (bitsPerExecuteAt <= 64)
                    {
                        // everything fits in one long: epoch delta lowest, then hlc delta, flags and node index
                        Invariants.checkState(executeAt.epoch() >= txn.epoch());
                        long executeAtBits = executeAt.epoch() - txn.epoch();
                        int offset = bitsPerExecuteAtEpochDelta;
                        executeAtBits |= (executeAt.hlc() - txn.hlc()) << offset ;
                        offset += bitsPerExecuteAtHlcDelta;
                        executeAtBits |= ((long)executeAt.flags()) << offset;
                        offset += bitsPerExecuteAtFlags;
                        executeAtBits |= ((long)nodeIdx) << offset;
                        buffer = flushBits(buffer, bufferCount, executeAtBits, bitsPerExecuteAt, out);
                        bufferCount = (bufferCount + bitsPerExecuteAt) & 63;
                    }
                    else
                    {
                        buffer = flushBits(buffer, bufferCount, executeAt.epoch() - txn.epoch(), bitsPerExecuteAtEpochDelta, out);
                        bufferCount = (bufferCount + bitsPerExecuteAtEpochDelta) & 63;
                        buffer = flushBits(buffer, bufferCount, executeAt.hlc() - txn.hlc(), bitsPerExecuteAtHlcDelta, out);
                        bufferCount = (bufferCount + bitsPerExecuteAtHlcDelta) & 63;
                        buffer = flushBits(buffer, bufferCount, executeAt.flags(), bitsPerExecuteAtFlags, out);
                        bufferCount = (bufferCount + bitsPerExecuteAtFlags) & 63;
                        buffer = flushBits(buffer, bufferCount, nodeIdx, bitsPerNodeId, out);
                        bufferCount = (bufferCount + bitsPerNodeId) & 63;
                    }
                }

                TxnId[] missing = txn.missing();
                if (missing.length > 0)
                {
                    int j = 0;
                    while (j < missing.length - 1)
                    {
                        int missingId = cfk.indexOf(missing[j++]);
                        buffer = flushBits(buffer, bufferCount, missingId, bitsPerMissingId, out);
                        bufferCount = (bufferCount + bitsPerMissingId) & 63;
                    }
                    // the final entry of each collection carries an extra high bit as terminator
                    int missingId = cfk.indexOf(missing[missing.length - 1]);
                    missingId |= 1L << bitsPerCommandId;
                    buffer = flushBits(buffer, bufferCount, missingId, bitsPerMissingId, out);
                    bufferCount = (bufferCount + bitsPerMissingId) & 63;
                }
            }

            // flush the bits still pending in the buffer, rounded up to whole bytes
            writeMostSignificantBytes(buffer, (bufferCount + 7)/8, out);
        }

        // the size computed up front must match exactly what was written
        Invariants.checkState(!out.hasRemaining());
        out.flip();
        return out;
    }
    finally
    {
        cachedInts().forceDiscard(nodeIds);
    }
}

// Sorts the first usedSize entries of buffer, removes duplicates among them in place and returns the
// number of distinct values, which end up in ascending order at the front of the buffer. Entries at
// or beyond usedSize are neither read nor modified.
private static int sortAndDeduplicate(int[] buffer, int usedSize)
{
    Arrays.sort(buffer, 0, usedSize);
    int count = 0;
    int j = 0;
    while (j < usedSize)
    {
        int prev;
        buffer[count++] = prev = buffer[j];
        while (++j < usedSize && buffer[j] == prev) {}
    }
    return count;
}
// Appends the low addCount bits of add to a 64-bit accumulator that is filled from its most
// significant bit downwards and already holds bufferCount bits. When the accumulator fills up it is
// written to out as one long and the bits that did not fit start the next accumulator. Returns the
// new accumulator contents; the caller is responsible for tracking the new bit count.
private static long flushBits(long buffer, int bufferCount, long add, int addCount, ByteBuffer out)
{
    // add must not have any bits set above addCount
    Invariants.checkArgument(addCount == 64 || 0 == (add & (-1L << addCount)));
    int filled = bufferCount + addCount;
    if (filled >= 64)
    {
        int overflow = filled - 64;
        out.putLong(buffer | (add >>> overflow));
        if (overflow == 0)
            return 0;
        // left-align the bits that spilled over
        return add << (64 - overflow);
    }
    return buffer | (add << (64 - filled));
}
// Rebuilds a CommandsForKey for the given key from bytes in the layout produced by toBytesWithoutKey.
// Returns null if the buffer has nothing remaining, and an empty CommandsForKey if the encoded
// command count is zero. The input buffer is duplicated first, so its position is left untouched.
//
// Decoding happens in two stages: the per-command headers yield the TxnIds plus placeholder TxnInfo
// recording whether an executeAt and/or missing ids follow; once the unmanaged entries have been
// read, the trailing bit stream supplies those executeAt and missing ids.
public static CommandsForKey fromBytes(Key key, ByteBuffer in)
{
    if (!in.hasRemaining())
        return null;

    in = in.duplicate();
    int commandCount = VIntCoding.readUnsignedVInt32(in);
    if (commandCount == 0)
        return new CommandsForKey(key);

    TxnId[] txnIds = cachedTxnIds().get(commandCount);
    TxnInfo[] txns = new TxnInfo[commandCount];
    int nodeIdCount = VIntCoding.readUnsignedVInt32(in);
    int bitsPerNodeId = numberOfBitsToRepresent(nodeIdCount);
    long nodeIdMask = (1L << bitsPerNodeId) - 1;
    Node.Id[] nodeIds = new Node.Id[nodeIdCount]; // TODO (expected): use a shared reusable scratch buffer
    {
        // node ids are stored ascending, each as a delta from its predecessor
        int prev = VIntCoding.readUnsignedVInt32(in);
        nodeIds[0] = new Node.Id(prev);
        for (int i = 1 ; i < nodeIdCount ; ++i)
            nodeIds[i] = new Node.Id(prev += VIntCoding.readUnsignedVInt32(in));
    }

    int missingDepsMasks, executeAtMasks, txnIdFlagsMask;
    int headerByteCount, hlcBytesLookup;
    {
        int flags = in.getShort();
        missingDepsMasks = 0 != (flags & HAS_MISSING_DEPS_HEADER_BIT) ? 1 : 0;
        executeAtMasks = 0 != (flags & HAS_EXECUTE_AT_HEADER_BIT) ? 1 : 0;
        // the kind occupies two header bits when non-standard flags are present, otherwise one
        txnIdFlagsMask = 0 != (flags & HAS_NON_STANDARD_FLAGS) ? 3 : 1;
        headerByteCount = 1 + ((flags >>> 3) & 0x3);
        hlcBytesLookup = setHlcByteDeltas((flags >>> 5) & 0x3, (flags >>> 7) & 0x3, (flags >>> 9) & 0x3, (flags >>> 11) & 0x3);
    }

    long prevEpoch = VIntCoding.readUnsignedVInt(in);
    long prevHlc = VIntCoding.readUnsignedVInt(in);

    for (int i = 0 ; i < commandCount ; ++i)
    {
        long header = readLeastSignificantBytes(headerByteCount, in);
        header |= 1L << (8 * headerByteCount); // marker so we know where to shift-left most-significant bytes to
        InternalStatus status = InternalStatus.get((int) (header & 0x7));
        header >>>= 3;
        int executeAtInfoOffset, missingDepsInfoOffset;
        {
            // the info bits are only present for statuses with info, and only if enabled in the preamble
            int infoMask = status.hasInfo ? 1 : 0;
            int executeAtMask = infoMask & executeAtMasks, missingDepsMask = infoMask & missingDepsMasks;
            executeAtInfoOffset = ((int)header & executeAtMask) << 1;
            header >>>= executeAtMask;
            missingDepsInfoOffset = (int)header & missingDepsMask;
            header >>>= missingDepsMask;
        }

        // a null kind means the full flags follow as a short after the hlc bytes
        Txn.Kind kind = TXN_ID_FLAG_BITS_KIND_LOOKUP[((int)header & txnIdFlagsMask)];
        header >>>= Integer.bitCount(txnIdFlagsMask);

        boolean hlcIsNegative = false;
        long epoch = prevEpoch;
        int readEpochBytes = 0;
        {
            boolean hasEpochDelta = (header & 1) == 1;
            header >>>= 1;
            if (hasEpochDelta)
            {
                hlcIsNegative = (header & 1) == 1;
                header >>>= 1;

                int epochFlag = ((int)header & 0x3);
                header >>>= 2;
                switch (epochFlag)
                {
                    default: throw new AssertionError("Unexpected value not 0-3");
                    case 0: ++epoch; break;
                    case 1: epoch += (header & 0xf); header >>>= 4; break;
                    case 2: readEpochBytes = 1; break;
                    case 3: readEpochBytes = 4; break;
                }
            }
        }

        Node.Id node = nodeIds[(int)(header & nodeIdMask)];
        header >>>= bitsPerNodeId;

        int readHlcBytes = getHlcBytes(hlcBytesLookup, (int)(header & 0x3));
        header >>>= 2;

        long hlc = header;
        {
            // strip the marker bit; its position tells us how many hlc bits the header contributed
            long highestBit = Long.highestOneBit(hlc);
            hlc ^= highestBit;
            int hlcShift = Long.numberOfTrailingZeros(highestBit);
            hlc |= readLeastSignificantBytes(readHlcBytes, in) << hlcShift;
        }
        if (hlcIsNegative)
            hlc = -1-hlc;
        hlc += prevHlc;

        // the flags were written as an unsigned 16 bit value, so do not let the short sign-extend
        int flags = kind != null ? 0 : (in.getShort() & 0xffff);
        if (readEpochBytes > 0)
            epoch += readEpochBytes == 1 ? (in.get() & 0xff) : in.getInt();

        TxnId txnId = kind != null ? new TxnId(epoch, hlc, kind, Domain.Key, node)
                                   : TxnId.fromValues(epoch, hlc, flags, node);

        txnIds[i] = txnId;
        // placeholder: a null executeAt and/or null missing marks what must be read from the trailing stream
        txns[i] = DECODE_INFOS[(executeAtInfoOffset | missingDepsInfoOffset)*STATUS_COUNT + status.ordinal()];

        prevEpoch = epoch;
        prevHlc = hlc;
    }

    int unmanagedPendingCommitCount = VIntCoding.readUnsignedVInt32(in);
    int unmanagedCount = unmanagedPendingCommitCount + VIntCoding.readUnsignedVInt32(in);
    Unmanaged[] unmanageds;
    if (unmanagedCount == 0)
    {
        unmanageds = NO_PENDING_UNMANAGED;
    }
    else
    {
        unmanageds = new Unmanaged[unmanagedCount];
        // the first unmanagedPendingCommitCount entries are pending COMMIT, the remainder pending APPLY
        Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
        int offset = 0;
        for (int i = 0 ; i < unmanagedCount ; ++i)
        {
            TxnId txnId = CommandSerializers.txnId.deserialize(in, ByteBufferAccessor.instance, offset);
            offset += CommandSerializers.txnId.serializedSize();
            Timestamp waitingUntil = CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance, offset);
            offset += CommandSerializers.timestamp.serializedSize();
            unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil);
            if (--unmanagedPendingCommitCount == 0) pending = Unmanaged.Pending.APPLY;
        }
        // the deserializers above were given offsets, so advance the buffer position explicitly
        in.position(in.position() + offset);
    }

    if (executeAtMasks + missingDepsMasks > 0)
    {
        TxnId[] missingIdBuffer = cachedTxnIds().get(8);
        int missingIdCount = 0, maxIdBufferCount = 0;
        int bitsPerTxnId = numberOfBitsToRepresent(commandCount);
        int txnIdMask = (1 << bitsPerTxnId) - 1;
        int bitsPerMissingId = bitsPerTxnId + 1;

        int decodeBits = executeAtMasks > 0 ? in.getShort() & 0xffff : 0;
        int bitsPerEpochDelta = decodeBits >>> 10;
        int bitsPerHlcDelta = 1 + ((decodeBits >>> 4) & 0x3f);
        int bitsPerFlags = decodeBits & 0xf;
        if (bitsPerFlags == 15) bitsPerFlags = 16;
        int bitsPerExecuteAt = bitsPerEpochDelta + bitsPerHlcDelta + bitsPerFlags + bitsPerNodeId;

        long epochDeltaMask = bitsPerEpochDelta == 0 ? 0 : (-1L >>> (64 - bitsPerEpochDelta));
        long hlcDeltaMask = (-1L >>> (64 - bitsPerHlcDelta));
        long flagsMask = bitsPerFlags == 0 ? 0 : (-1L >>> (64 - bitsPerFlags));

        final BitReader reader = new BitReader();

        for (int i = 0 ; i < commandCount ; ++i)
        {
            TxnId txnId = txnIds[i];
            TxnInfo placeholder = txns[i];
            Timestamp executeAt;
            if (placeholder.executeAt == null)
            {
                long epoch, hlc;
                int flags;
                Node.Id id;
                if (bitsPerExecuteAt <= 64)
                {
                    // the whole executeAt was packed into a single value: unpack lowest field first
                    long executeAtBits = reader.read(bitsPerExecuteAt, in);
                    epoch = txnId.epoch() + (executeAtBits & epochDeltaMask);
                    executeAtBits >>>= bitsPerEpochDelta;
                    hlc = txnId.hlc() + (executeAtBits & hlcDeltaMask);
                    executeAtBits >>>= bitsPerHlcDelta;
                    flags = (int)(executeAtBits & flagsMask);
                    executeAtBits >>>= bitsPerFlags;
                    id = nodeIds[(int)(executeAtBits & nodeIdMask)];
                }
                else
                {
                    epoch = txnId.epoch() + reader.read(bitsPerEpochDelta, in);
                    hlc = txnId.hlc() + reader.read(bitsPerHlcDelta, in);
                    flags = (int) reader.read(bitsPerFlags, in);
                    id = nodeIds[(int)(reader.read(bitsPerNodeId, in))];
                }
                executeAt = Timestamp.fromValues(epoch, hlc, flags, id);
            }
            else
            {
                executeAt = txnId;
            }

            TxnId[] missing = placeholder.missing();
            if (missing == null)
            {
                int prev = -1;
                while (true)
                {
                    if (missingIdCount == missingIdBuffer.length)
                        missingIdBuffer = cachedTxnIds().resize(missingIdBuffer, missingIdCount, missingIdCount * 2);

                    int next = (int) reader.read(bitsPerMissingId, in);
                    Invariants.checkState(next > prev);
                    missingIdBuffer[missingIdCount++] = txnIds[next & txnIdMask];
                    // the terminator bit sits above every valid index, making the value >= commandCount
                    if (next >= commandCount)
                        break; // finished this array
                    prev = next;
                }

                missing = Arrays.copyOf(missingIdBuffer, missingIdCount);
                // remember the largest number of slots used by any command, so the whole used range is reported on discard
                maxIdBufferCount = Math.max(maxIdBufferCount, missingIdCount);
                missingIdCount = 0;
            }

            txns[i] = TxnInfo.create(txnId, placeholder.status, executeAt, missing);
        }

        cachedTxnIds().forceDiscard(missingIdBuffer, maxIdBufferCount);
    }
    else
    {
        // no trailing stream: every command executes at its own TxnId and has no missing ids
        for (int i = 0 ; i < commandCount ; ++i)
            txns[i] = TxnInfo.create(txnIds[i], txns[i].status, txnIds[i]);
    }
    cachedTxnIds().forceDiscard(txnIds, commandCount);

    return CommandsForKey.SerializerSupport.create(key, txns, unmanageds);
}
// Extracts the 4-bit entry at position index from a lookup packed by setHlcBytes.
private static int getHlcBytes(int lookup, int index)
{
    int shift = index << 2; // four bits per entry
    int shifted = lookup >>> shift;
    return shifted & 0xf;
}
// Packs four values into one int, one per 4-bit slot, with value1 in the lowest slot.
// Values are not masked, so each is expected to fit in four bits.
private static int setHlcBytes(int value1, int value2, int value3, int value4)
{
    int packed = value1;
    packed |= value2 << 4;
    packed |= value3 << 8;
    packed |= value4 << 12;
    return packed;
}
// Builds the packed hlc byte-count lookup from its delta-encoded form: value1 is taken as is, and
// each later value is an increment that is added, together with 1, to the preceding resolved entry.
private static int setHlcByteDeltas(int value1, int value2, int value3, int value4)
{
    int level0 = value1;
    int level1 = level0 + 1 + value2;
    int level2 = level1 + 1 + value3;
    int level3 = level2 + 1 + value4;
    return setHlcBytes(level0, level1, level2, level3);
}
// Returns the 2-bit flag stored for the given number of hlc bytes in a lookup built by
// hlcBytesLookupToHlcFlagLookup (two bits per byte count, byte count 0 in the lowest bits).
//
// A negative byte count is treated as zero. Callers compute the argument by subtracting the header
// byte count from a total, which can go negative when everything already fits in the header; an
// unclamped negative value would turn into an unrelated shift distance, because Java only uses the
// low five bits of an int shift count.
private static int getHlcFlag(int flagsLookup, int bytes)
{
    int slot = Math.max(0, bytes);
    return (flagsLookup >>> (slot * 2)) & 0x3;
}
// Inverts a packed byte-count lookup (level -> byte count) into a flag lookup (byte count -> level),
// stored as two bits per byte count. Every byte count up to and including a level's value that was
// not already claimed by a lower level maps to that level, i.e. to the smallest level that can hold it.
private static int hlcBytesLookupToHlcFlagLookup(int bytesLookup)
{
    int result = 0;
    int nextByteCount = 0; // carries over between levels
    int level = 0;
    while (level < 4)
    {
        int maxByteCount = getHlcBytes(bytesLookup, level);
        for ( ; nextByteCount <= maxByteCount ; ++nextByteCount)
            result |= level << (2 * nextByteCount);
        ++level;
    }
    return result;
}
// Sorts the entire array and removes duplicates in place. Returns the number of distinct values,
// which are left in ascending order at the front of the array; later entries are left as they
// were after sorting.
private static int compact(int[] buffer)
{
    Arrays.sort(buffer);
    int unique = 0;
    for (int read = 0 ; read < buffer.length ; ++read)
    {
        // keep a value only if it differs from the last one kept
        if (unique == 0 || buffer[read] != buffer[unique - 1])
            buffer[unique++] = buffer[read];
    }
    return unique;
}
// Returns the position of the highest set bit of value, counting from 1: zero for 0, and 64 for
// any negative value.
private static int numberOfBitsToRepresent(long value)
{
    int leadingZeros = Long.numberOfLeadingZeros(value);
    return Long.SIZE - leadingZeros;
}
// Returns the position of the highest set bit of value, counting from 1: zero for 0, and 32 for
// any negative value.
private static int numberOfBitsToRepresent(int value)
{
    int leadingZeros = Integer.numberOfLeadingZeros(value);
    return Integer.SIZE - leadingZeros;
}
// Reads values of arbitrary bit width (0 to 64) from a ByteBuffer, most significant bit first.
// Unconsumed bits are kept left-aligned in bitBuffer, with bitCount recording how many are valid;
// all bits below the valid ones must be zero, because read() ORs them into its result.
static final class BitReader
{
    private long bitBuffer;
    private int bitCount;

    // Returns the next readCount bits as the low bits of a long, refilling from in as required:
    // eight bytes at a time while at least eight remain, otherwise byte by byte. A readCount of
    // zero consumes nothing and returns 0.
    long read(int readCount, ByteBuffer in)
    {
        // a shift distance of 64 is treated as 0 by Java, so a zero-width read must not reach the shift below
        if (readCount == 0)
            return 0;

        long result = bitBuffer >>> (64 - readCount);
        int remaining = bitCount - readCount;
        if (remaining >= 0)
        {
            // everything requested was already buffered (readCount <= bitCount <= 63 here)
            bitBuffer <<= readCount;
            bitCount = remaining;
        }
        else if (in.remaining() >= 8)
        {
            // take what was buffered, then complete the value from the top of the next long
            readCount -= bitCount;
            bitBuffer = in.getLong();
            bitCount = 64 - readCount;
            result |= (bitBuffer >>> bitCount);
            // when the whole long was consumed a shift by 64 would leave it intact, so clear explicitly
            bitBuffer = readCount == 64 ? 0 : bitBuffer << readCount;
        }
        else
        {
            // fewer than eight bytes remain: complete the value one byte at a time
            readCount -= bitCount;
            while (readCount > 8)
            {
                long next = in.get() & 0xff;
                readCount -= 8;
                result |= next << readCount;
            }
            long next = in.get() & 0xff;
            bitCount = 8 - readCount;
            result |= next >>> bitCount;
            // keep only the unread low bits of the final byte, left-aligned; none are left when bitCount is 0
            bitBuffer = bitCount == 0 ? 0 : next << (64 - bitCount);
        }
        return result;
    }
}
// How the kind/flags of a TxnId are encoded in a command header. STANDARD ids are written as a
// read/write bit; EXTENDED_BITS and RAW_BITS are the 2-bit codes written for the other two cases.
enum TxnIdFlags
{
    STANDARD,
    EXTENDED,
    RAW;

    static final int EXTENDED_BITS = 2;
    static final int RAW_BITS = 3;
}
// Classifies how a TxnId has to be encoded. Ids whose flags exceed Timestamp.IDENTITY_FLAGS, or
// whose domain is not Key, are always RAW; otherwise the kind decides.
private static TxnIdFlags txnIdFlags(TxnId txnId)
{
    boolean compactEncodable = txnId.flags() <= Timestamp.IDENTITY_FLAGS && txnId.domain() == Domain.Key;
    if (!compactEncodable)
        return RAW;

    switch (txnId.kind())
    {
        case Read:
        case Write:
            return STANDARD;
        case ExclusiveSyncPoint:
            return EXTENDED;
        case SyncPoint:
        case LocalOnly:
        case EphemeralRead:
            return RAW;
        default:
            throw new AssertionError("Unhandled Kind: " + txnId.kind());
    }
}
// Returns the kind code written into a command header for this TxnId: 0 for a STANDARD read,
// 1 for any other STANDARD id, EXTENDED_BITS for EXTENDED and RAW_BITS for RAW.
private static long txnIdFlagsBits(TxnId txnId)
{
    TxnIdFlags encoding = txnIdFlags(txnId);
    switch (encoding)
    {
        case STANDARD:
            if (txnId.kind() == Read)
                return 0;
            return 1;
        case EXTENDED:
            return EXTENDED_BITS;
        case RAW:
            return RAW_BITS;
        default:
            throw new AssertionError("Unhandled TxnIdFlag: " + encoding);
    }
}
// Kind for each 2-bit kind code read from a command header; the null entry is the code for which
// the TxnId flags are read separately.
private static final Txn.Kind[] TXN_ID_FLAG_BITS_KIND_LOOKUP = { Read, Write, ExclusiveSyncPoint, null };
private static final int STATUS_COUNT = InternalStatus.values().length;
// Placeholder TxnInfo used while decoding, laid out as four consecutive groups of STATUS_COUNT
// entries indexed by status ordinal. The group number has bit 0 set when missing ids still have to
// be read (missing is null) and bit 1 set when an executeAt still has to be read (executeAt is null).
private static final TxnInfo[] DECODE_INFOS = new TxnInfo[4 * STATUS_COUNT];
static
{
    InternalStatus[] statuses = InternalStatus.values();
    for (int ordinal = 0 ; ordinal < statuses.length ; ++ordinal)
    {
        InternalStatus status = statuses[ordinal];
        int base = ordinal;
        DECODE_INFOS[base] = TxnInfo.createMock(TxnId.NONE, status, TxnId.NONE, CommandsForKey.NO_TXNIDS);
        base += STATUS_COUNT;
        DECODE_INFOS[base] = TxnInfo.createMock(TxnId.NONE, status, TxnId.NONE, null);
        base += STATUS_COUNT;
        DECODE_INFOS[base] = TxnInfo.createMock(TxnId.NONE, status, null, CommandsForKey.NO_TXNIDS);
        base += STATUS_COUNT;
        DECODE_INFOS[base] = TxnInfo.createMock(TxnId.NONE, status, null, null);
    }
}
}