// blob: 261943ebec6fa826d7b4327d9c95a29b47f22b8a [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ignite.internal.schema.row;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.BitSet;
import java.util.UUID;
import org.apache.ignite.internal.schema.AssemblyException;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRow.RowFlags;
import org.apache.ignite.internal.schema.BitmaskNativeType;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.Columns;
import org.apache.ignite.internal.schema.DecimalNativeType;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.schema.NativeTypeSpec;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.NumberNativeType;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.TemporalNativeType;
import static org.apache.ignite.internal.schema.BinaryRow.RowFlags.KEY_FLAGS_OFFSET;
import static org.apache.ignite.internal.schema.BinaryRow.RowFlags.VAL_FLAGS_OFFSET;
/**
 * Utility class to build rows using column appending pattern. The external user of this class must consult
 * with the schema and provide the columns in strict internal column sort order during the row construction.
 * <p>
 * Additionally, the user of this class should pre-calculate the resulting row size when possible to avoid
 * unnecessary data copies and allow some size optimizations to be applied.
 * <p>
 * Natively supported temporal types are encoded automatically, preserving sort order, before writing.
 *
 * @see #utf8EncodedLength(CharSequence)
 * @see TemporalTypesHelper
 */
public class RowAssembler {
/** Row schema; provides the key and value column sets. */
private final SchemaDescriptor schema;
/** Length in bytes of the varlen table of the value chunk (computed once in the constructor). */
private final int valVartblLen;
/** Target byte buffer to write to. */
private final ExpandableByteBuf buf;
/** Columns of the chunk that is currently being written (key columns first, then value columns). */
private Columns curCols;
/** Index within {@link #curCols} of the current column (the column is not written yet). */
private int curCol;
/** Current offset for the next column to be appended. */
private int curOff;
/** Index of the current varlen table entry. Incremented each time non-null varlen column is appended. */
private int curVartblEntry;
/** Base offset of the current chunk. */
private int baseOff;
/** Offset of the null-map for current chunk. */
private int nullMapOff;
/** Offset of the varlen table for current chunk. */
private int varTblOff;
/** Offset of data for current chunk. */
private int dataOff;
/** Hash code accumulated from the values appended to the key chunk. */
private int keyHash;
/** Row flags; collected while chunks are initialized and finished. */
private short flags;
/** Charset encoder for strings. Initialized lazily. */
private CharsetEncoder strEncoder;
* @param entries Number of non-null varlen columns.
* @return Total size of the varlen table.
private static int varTableChunkLength(int entries, int entrySize) {
return entries <= 1 ? 0 : Short.BYTES + (entries - 1) * entrySize;
* Calculates encoded string length.
* @param seq Char sequence.
* @return Encoded string length.
* @implNote This implementation is not tolerant to malformed char sequences.
public static int utf8EncodedLength(CharSequence seq) {
int cnt = 0;
for (int i = 0, len = seq.length(); i < len; i++) {
char ch = seq.charAt(i);
if (ch <= 0x7F)
else if (ch <= 0x7FF)
cnt += 2;
else if (Character.isHighSurrogate(ch)) {
cnt += 4;
cnt += 3;
return cnt;
* Helper method.
* @param rowAsm Writes column value to assembler.
* @param col Column.
* @param val Value.
public static void writeValue(RowAssembler rowAsm, Column col, Object val) {
if (val == null) {
switch (col.type().spec()) {
case INT8: {
case INT16: {
case INT32: {
case INT64: {
case FLOAT: {
case DOUBLE: {
case UUID: {
case STRING: {
case BYTES: {
case BITMASK: {
case NUMBER: {
case DECIMAL: {
throw new IllegalStateException("Unexpected value: " + col.type());
* Calculates byte size for BigInteger value.
public static int sizeInBytes(BigInteger val) {
return val.bitLength() / 8 + 1;
* Calculates byte size for BigDecimal value.
public static int sizeInBytes(BigDecimal val) {
return sizeInBytes(val.unscaledValue());
/**
 * Creates RowAssembler for chunks of unknown size.
 * <p>
 * RowAssembler will use adaptive buffer size and omit some optimizations for small key/value chunks.
 *
 * @param schema Row schema.
 * @param nonNullVarlenKeyCols Number of non-null varlen columns in key chunk.
 * @param nonNullVarlenValCols Number of non-null varlen columns in value chunk.
 */
public RowAssembler(
    SchemaDescriptor schema,
    int nonNullVarlenKeyCols,
    int nonNullVarlenValCols
) {
    // Zero stands for "unknown" varlen payload size for both chunks.
    this(schema, 0, nonNullVarlenKeyCols, 0, nonNullVarlenValCols);
}
/**
 * Creates RowAssembler for chunks with estimated sizes.
 * <p>
 * RowAssembler will apply optimizations based on chunks sizes estimations.
 *
 * @param schema Row schema.
 * @param keyVarlenSize Key payload size. Estimated upper-bound or zero if unknown.
 * @param keyVarlenCols Number of non-null varlen columns in key chunk.
 * @param valVarlenSize Value data size. Estimated upper-bound or zero if unknown.
 * @param valVarlenCols Number of non-null varlen columns in value chunk.
 */
public RowAssembler(
    SchemaDescriptor schema,
    int keyVarlenSize,
    int keyVarlenCols,
    int valVarlenSize,
    int valVarlenCols
) {
    this.schema = schema;

    curCols = schema.keyColumns();
    curCol = 0;
    keyHash = 0;
    strEncoder = null;

    int keyVartblLen = varTableChunkLength(keyVarlenCols, Integer.BYTES);
    valVartblLen = varTableChunkLength(valVarlenCols, Integer.BYTES);

    // Writing starts with the key chunk.
    initChunk(BinaryRow.KEY_CHUNK_OFFSET, curCols.nullMapSize(), keyVartblLen);

    final Columns valCols = schema.valueColumns();

    // Initial buffer size: header, both chunk length fields, varlen payload estimates,
    // varlen tables, maximal fixed-size parts and null-maps of both chunks.
    int size = BinaryRow.HEADER_SIZE + 2 * BinaryRow.CHUNK_LEN_FLD_SIZE +
        keyVarlenSize + valVarlenSize +
        keyVartblLen + valVartblLen +
        curCols.fixsizeMaxLen() + valCols.fixsizeMaxLen() +
        curCols.nullMapSize() + valCols.nullMapSize();

    buf = new ExpandableByteBuf(size);
    buf.putShort(0, (short)schema.version());
}
* Appends {@code null} value for the current column to the chunk.
* @return {@code this} for chaining.
public RowAssembler appendNull() {
if (!curCols.column(curCol).nullable())
throw new IllegalArgumentException("Failed to set column (null was passed, but column is not nullable): " + curCols.column(curCol));
if (isKeyChunk())
keyHash *= 31;
return this;
* Appends byte value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendByte(byte val) {
buf.put(curOff, val);
if (isKeyChunk())
keyHash = 31 * keyHash + Byte.hashCode(val);
return this;
* Appends short value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendShort(short val) {
buf.putShort(curOff, val);
if (isKeyChunk())
keyHash = 31 * keyHash + Short.hashCode(val);
return this;
* Appends int value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendInt(int val) {
buf.putInt(curOff, val);
if (isKeyChunk())
keyHash = 31 * keyHash + Integer.hashCode(val);
return this;
* Appends long value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendLong(long val) {
buf.putLong(curOff, val);
if (isKeyChunk())
keyHash = 31 * keyHash + Long.hashCode(val);
return this;
* Appends float value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendFloat(float val) {
buf.putFloat(curOff, val);
if (isKeyChunk())
keyHash = 31 * keyHash + Float.hashCode(val);
return this;
* Appends double value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendDouble(double val) {
buf.putDouble(curOff, val);
if (isKeyChunk())
keyHash = 31 * keyHash + Double.hashCode(val);
return this;
* Appends BigInteger value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendNumber(BigInteger val) {
Column col = curCols.column(curCol);
NumberNativeType type = (NumberNativeType)col.type();
//0 is a magic number for "unlimited precision"
if (type.precision() > 0 && new BigDecimal(val).precision() > type.precision())
throw new IllegalArgumentException("Failed to set number value for column '" + + "' " +
"(max precision exceeds allocated precision) " +
"[number=" + val + ", max precision=" + type.precision() + "]");
byte[] bytes = val.toByteArray();
buf.putBytes(curOff, bytes);
if (isKeyChunk())
keyHash = 31 * keyHash + Arrays.hashCode(bytes);
writeVarlenOffset(curVartblEntry, curOff - dataOff);
return this;
* Appends BigDecimal value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendDecimal(BigDecimal val) {
Column col = curCols.column(curCol);
DecimalNativeType type = (DecimalNativeType)col.type();
val = val.setScale(type.scale(), RoundingMode.HALF_UP);
if (val.precision() > type.precision())
throw new IllegalArgumentException("Failed to set decimal value for column '" + + "' " +
"(max precision exceeds allocated precision)" +
" [decimal=" + val + ", max precision=" + type.precision() + "]");
byte[] bytes = val.unscaledValue().toByteArray();
buf.putBytes(curOff, bytes);
if (isKeyChunk())
keyHash = 31 * keyHash + Arrays.hashCode(bytes);
writeVarlenOffset(curVartblEntry, curOff - dataOff);
return this;
* Appends UUID value for the current column to the chunk.
* @param uuid Column value.
* @return {@code this} for chaining.
public RowAssembler appendUuid(UUID uuid) {
buf.putLong(curOff, uuid.getLeastSignificantBits());
buf.putLong(curOff + 8, uuid.getMostSignificantBits());
if (isKeyChunk())
keyHash = 31 * keyHash + uuid.hashCode();
return this;
* Appends String value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendString(String val) {
try {
int written = buf.putString(curOff, val, encoder());
writeVarlenOffset(curVartblEntry, curOff - dataOff);
if (isKeyChunk())
keyHash = 31 * keyHash + val.hashCode();
return this;
catch (CharacterCodingException e) {
throw new AssemblyException("Failed to encode string", e);
* Appends byte[] value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendBytes(byte[] val) {
buf.putBytes(curOff, val);
if (isKeyChunk())
keyHash = 31 * keyHash + Arrays.hashCode(val);
writeVarlenOffset(curVartblEntry, curOff - dataOff);
return this;
* Appends BitSet value for the current column to the chunk.
* @param bitSet Column value.
* @return {@code this} for chaining.
public RowAssembler appendBitmask(BitSet bitSet) {
Column col = curCols.column(curCol);
BitmaskNativeType maskType = (BitmaskNativeType)col.type();
if (bitSet.length() > maskType.bits())
throw new IllegalArgumentException("Failed to set bitmask for column '" + + "' " +
"(mask size exceeds allocated size) [mask=" + bitSet + ", maxSize=" + maskType.bits() + "]");
byte[] arr = bitSet.toByteArray();
buf.putBytes(curOff, arr);
for (int i = 0; i < maskType.sizeInBytes() - arr.length; i++)
buf.put(curOff + arr.length + i, (byte)0);
if (isKeyChunk())
keyHash = 31 * keyHash + bitSet.hashCode();
return this;
* Appends LocalDate value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendDate(LocalDate val) {
int date = TemporalTypesHelper.encodeDate(val);
writeDate(curOff, date);
if (isKeyChunk())
keyHash += 31 * keyHash + val.hashCode();
return this;
* Appends LocalTime value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendTime(LocalTime val) {
TemporalNativeType type = (TemporalNativeType)curCols.column(curCol).type();
writeTime(buf, curOff, val, type);
if (isKeyChunk())
keyHash += 31 * keyHash + val.hashCode();
return this;
* Appends LocalDateTime value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendDateTime(LocalDateTime val) {
TemporalNativeType type = (TemporalNativeType)curCols.column(curCol).type();
int date = TemporalTypesHelper.encodeDate(val.toLocalDate());
writeDate(curOff, date);
writeTime(buf, curOff + 3, val.toLocalTime(), type);
if (isKeyChunk())
keyHash += 31 * keyHash + val.hashCode();
return this;
* Appends Instant value for the current column to the chunk.
* @param val Column value.
* @return {@code this} for chaining.
public RowAssembler appendTimestamp(Instant val) {
TemporalNativeType type = (TemporalNativeType)curCols.column(curCol).type();
long seconds = val.getEpochSecond();
int nanos = TemporalTypesHelper.normalizeNanos(val.getNano(), type.precision());
buf.putLong(curOff, seconds);
if (type.precision() != 0) // Write only meaningful bytes.
buf.putInt(curOff + 8, nanos);
if (isKeyChunk()) {
keyHash += 31 * keyHash + Long.hashCode(seconds);
keyHash += 31 * keyHash + Integer.hashCode(nanos);
return this;
* @return Serialized row.
public BinaryRow build() {
return new ByteBufferRow(buf.unwrap());
* @return Row bytes.
public byte[] toBytes() {
return buf.toArray();
* Finish building row.
private void flush() {
if (schema.keyColumns() == curCols)
throw new AssemblyException("Key column missed: colIdx=" + curCol);
else {
if (curCol == 0) {
flags |= RowFlags.NO_VALUE_FLAG;
else if (schema.valueColumns().length() != curCol)
throw new AssemblyException("Value column missed: colIdx=" + curCol);
buf.putShort(BinaryRow.FLAGS_FIELD_OFFSET, flags);
buf.putInt(BinaryRow.KEY_HASH_FIELD_OFFSET, keyHash);
* @return UTF-8 string encoder.
private CharsetEncoder encoder() {
if (strEncoder == null)
strEncoder = StandardCharsets.UTF_8.newEncoder();
return strEncoder;
* Writes the given offset to the varlen table entry with the given index.
* @param entryIdx Vartable entry index.
* @param off Offset to write.
private void writeVarlenOffset(int entryIdx, int off) {
if (entryIdx == 0)
return; // Omit offset for very first varlen.
buf.putInt(varTblOff + Short.BYTES + (entryIdx - 1) * Integer.BYTES, off);
* Writes date.
* @param off Offset.
* @param date Compacted date.
private void writeDate(int off, int date) {
buf.putShort(off, (short)(date >>> 8));
buf.put(off + 2, (byte)(date & 0xFF));
* Writes time.
* @param buf
* @param off Offset.
* @param val Time.
* @param type Native type.
static void writeTime(ExpandableByteBuf buf, int off, LocalTime val, TemporalNativeType type) {
long time = TemporalTypesHelper.encodeTime(type, val);
if (type.precision() > 3) {
time = ((time >>> 32) << TemporalTypesHelper.NANOSECOND_PART_LEN) | (time & TemporalTypesHelper.NANOSECOND_PART_MASK);
buf.putInt(off, (int)(time >>> 16));
buf.putShort(off + 4, (short)(time & 0xFFFF_FFFFL));
else {
time = ((time >>> 32) << TemporalTypesHelper.MILLISECOND_PART_LEN) | (time & TemporalTypesHelper.MILLISECOND_PART_MASK);
buf.putInt(off, (int)time);
* Checks that the type being appended matches the column type.
* @param type Type spec that is attempted to be appended.
private void checkType(NativeTypeSpec type) {
Column col = curCols.column(curCol);
if (col.type().spec() != type)
throw new IllegalArgumentException("Failed to set column (int was passed, but column is of different " +
"type): " + col);
* Checks that the type being appended matches the column type.
* @param type Type that is attempted to be appended.
private void checkType(NativeType type) {
* Sets null flag in the null-map for the given column.
* @param colIdx Column index.
private void setNull(int colIdx) {
assert nullMapOff < varTblOff : "Null-map is omitted.";
int byteInMap = colIdx >> 3; // Equivalent expression for: colIidx / 8
int bitInByte = colIdx & 7; // Equivalent expression for: colIdx % 8
buf.ensureCapacity(nullMapOff + byteInMap + 1);
buf.put(nullMapOff + byteInMap, (byte)((Byte.toUnsignedInt(buf.get(nullMapOff + byteInMap))) | (1 << bitInByte)));
* Shifts current column indexes as necessary, also
* switch to value chunk writer when moving from key to value columns.
private void shiftColumn(int size) {
curOff += size;
if (curCol == curCols.length())
* Write chunk meta.
private void finishChunk() {
if (curVartblEntry > 1) {
assert varTblOff < dataOff : "Illegal writing of varlen when 'omit vartable' flag is set for a chunk.";
assert varTblOff + varTableChunkLength(curVartblEntry, Integer.BYTES) == dataOff : "Vartable overlow: size=" + curVartblEntry;
final VarTableFormat format = VarTableFormat.format(curOff - dataOff);
curOff -= format.compactVarTable(buf, varTblOff, curVartblEntry - 1);
flags |= format.formatFlags() << (isKeyChunk() ? KEY_FLAGS_OFFSET : VAL_FLAGS_OFFSET);
// Write sizes.
final int chunkLen = curOff - baseOff;
buf.putInt(baseOff, chunkLen);
if (schema.keyColumns() == curCols)
switchToValuChunk(BinaryRow.HEADER_SIZE + chunkLen);
* @param baseOff Chunk base offset.
private void switchToValuChunk(int baseOff) {
// Switch key->value columns.
curCols = schema.valueColumns();
curCol = 0;
// Create value chunk writer.
initChunk(baseOff, curCols.nullMapSize(), valVartblLen);
* Init chunk offsets and flags.
* @param baseOff Chunk base offset.
* @param nullMapLen Null-map length in bytes.
* @param vartblLen Vartable length in bytes.
private void initChunk(int baseOff, int nullMapLen, int vartblLen) {
this.baseOff = baseOff;
nullMapOff = baseOff + BinaryRow.CHUNK_LEN_FLD_SIZE;
varTblOff = nullMapOff + nullMapLen;
dataOff = varTblOff + vartblLen;
curOff = dataOff;
curVartblEntry = 0;
int flags = 0;
if (nullMapLen == 0)
flags |= VarTableFormat.OMIT_NULL_MAP_FLAG;
if (vartblLen == 0)
flags |= VarTableFormat.OMIT_VARTBL_FLAG;
this.flags |= flags << (isKeyChunk() ? KEY_FLAGS_OFFSET : VAL_FLAGS_OFFSET);
* @return {@code true} if current chunk is a key chunk, {@code false} otherwise.
private boolean isKeyChunk() {
return baseOff == BinaryRow.KEY_CHUNK_OFFSET;