blob: 19c1476254ca37eaf28c8916377c4b2e533fa581 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.dataformat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.api.types.GenericType;
import org.apache.flink.table.dataformat.util.BinaryRowUtil;
import org.apache.flink.table.dataformat.util.MultiSegUtil;
import org.apache.flink.table.util.hash.Murmur32;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import static org.apache.flink.table.dataformat.BinaryRow.calculateBitSetWidthInBytes;
import static org.apache.flink.table.dataformat.BinaryRow.getBinaryStringFromSeg;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Its memory storage structure and BinaryRow exactly the same, the only different is it supports
* all bytes in variable MemorySegments.
*/
public final class NestedRow implements BaseRow {
private final int arity;
private final int nullBitsSizeInBytes;
private MemorySegment[] segments;
private int baseOffset;
private int sizeInBytes;
public NestedRow(int arity) {
checkArgument(arity >= 0);
this.arity = arity;
this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
}
private int getFieldOffset(int pos) {
return baseOffset + nullBitsSizeInBytes + pos * 8;
}
private void assertIndexIsValid(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < arity : "index (" + index + ") should < " + arity;
}
public int getBaseOffset() {
return baseOffset;
}
public int getSizeInBytes() {
return sizeInBytes;
}
public MemorySegment[] getSegments() {
return segments;
}
@Override
public int getArity() {
return arity;
}
@Override
public byte getHeader() {
return MultiSegUtil.getByte(segments, baseOffset);
}
@Override
public void setHeader(byte header) {
throw new UnsupportedOperationException();
}
public void pointTo(MemorySegment segment, int baseOffset, int sizeInBytes) {
pointTo(new MemorySegment[]{segment}, baseOffset, sizeInBytes);
}
public void pointTo(MemorySegment[] segments, int baseOffset, int sizeInBytes) {
this.segments = segments;
this.baseOffset = baseOffset;
this.sizeInBytes = sizeInBytes;
}
private void setNotNullAt(int i) {
assertIndexIsValid(i);
MultiSegUtil.bitUnSet(segments, baseOffset, i + 8);
}
/**
* See {@link BinaryRow#setNullAt(int)}.
*/
@Override
public void setNullAt(int i) {
assertIndexIsValid(i);
MultiSegUtil.bitSet(segments, baseOffset, i + 8);
MultiSegUtil.setLong(segments, getFieldOffset(i), 0);
}
@Override
public void setInt(int pos, int value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
MultiSegUtil.setInt(segments, getFieldOffset(pos), value);
}
@Override
public void setLong(int pos, long value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
MultiSegUtil.setLong(segments, getFieldOffset(pos), value);
}
@Override
public void setDouble(int pos, double value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
MultiSegUtil.setDouble(segments, getFieldOffset(pos), value);
}
@Override
public void setChar(int pos, char value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
MultiSegUtil.setChar(segments, getFieldOffset(pos), value);
}
@Override
public void setDecimal(int pos, Decimal value, int precision, int scale) {
assertIndexIsValid(pos);
if (Decimal.isCompact(precision)) {
// compact format
if (value == null) {
setNullAt(pos);
} else {
setLong(pos, value.toUnscaledLong());
}
} else {
int fieldOffset = getFieldOffset(pos);
int cursor = (int) (MultiSegUtil.getLong(segments, fieldOffset) >>> 32);
assert cursor > 0 : "invalid cursor " + cursor;
// zero-out the bytes
MultiSegUtil.setLong(segments, baseOffset + cursor, 0L);
MultiSegUtil.setLong(segments, baseOffset + cursor + 8, 0L);
if (value == null) {
setNullAt(pos);
// keep the offset for future update
MultiSegUtil.setLong(segments, fieldOffset, ((long) cursor) << 32);
} else {
byte[] bytes = value.toUnscaledBytes();
assert (bytes.length <= 16);
// Write the bytes to the variable length portion.
BinaryRowUtil.copyFromBytes(segments, baseOffset + cursor, bytes, 0, bytes.length);
setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
}
}
}
@Override
public void setBoolean(int pos, boolean value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
MultiSegUtil.setBoolean(segments, getFieldOffset(pos), value);
}
@Override
public void setShort(int pos, short value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
MultiSegUtil.setShort(segments, getFieldOffset(pos), value);
}
@Override
public void setByte(int pos, byte value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
MultiSegUtil.setByte(segments, getFieldOffset(pos), value);
}
@Override
public void setFloat(int pos, float value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
MultiSegUtil.setFloat(segments, getFieldOffset(pos), value);
}
@Override
public boolean isNullAt(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.bitGet(segments, baseOffset, pos + 8);
}
@Override
public boolean getBoolean(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.getBoolean(segments, getFieldOffset(pos));
}
@Override
public byte getByte(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.getByte(segments, getFieldOffset(pos));
}
@Override
public short getShort(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.getShort(segments, getFieldOffset(pos));
}
@Override
public int getInt(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.getInt(segments, getFieldOffset(pos));
}
@Override
public long getLong(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.getLong(segments, getFieldOffset(pos));
}
@Override
public float getFloat(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.getFloat(segments, getFieldOffset(pos));
}
@Override
public double getDouble(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.getDouble(segments, getFieldOffset(pos));
}
@Override
public char getChar(int pos) {
assertIndexIsValid(pos);
return MultiSegUtil.getChar(segments, getFieldOffset(pos));
}
@Override
public BinaryString getBinaryString(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndSize = MultiSegUtil.getLong(segments, fieldOffset);
BinaryString ret = new BinaryString();
getBinaryStringFromSeg(segments, baseOffset, fieldOffset, offsetAndSize, ret);
return ret;
}
@Override
public BinaryString getBinaryString(int pos, BinaryString reuse) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndSize = MultiSegUtil.getLong(segments, fieldOffset);
getBinaryStringFromSeg(segments, baseOffset, fieldOffset, offsetAndSize, reuse);
return reuse;
}
@Override
public Decimal getDecimal(int pos, int precision, int scale) {
assertIndexIsValid(pos);
if (Decimal.isCompact(precision)) {
long longVal = MultiSegUtil.getLong(segments, getFieldOffset(pos));
return Decimal.fromUnscaledLong(precision, scale, longVal);
}
int fieldOffset = getFieldOffset(pos);
final long offsetAndSize = MultiSegUtil.getLong(segments, fieldOffset);
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
byte[] bytes = BinaryRowUtil.copy(segments, baseOffset + offset, size);
return Decimal.fromUnscaledBytes(precision, scale, bytes);
}
@Override
public <T> T getGeneric(int pos, TypeSerializer<T> serializer) {
final long offsetAndSize = getLong(pos);
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
try {
return InstantiationUtil.deserializeFromByteArray(
serializer, BinaryRowUtil.copy(segments, baseOffset + offset, size));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <T> T getGeneric(int pos, GenericType<T> type) {
return getGeneric(pos, type.getSerializer());
}
@Override
public BaseRow getBaseRow(int ordinal, int numFields) {
final long offsetAndSize = getLong(ordinal);
return BinaryRow.getBaseRow(segments, baseOffset, offsetAndSize, numFields);
}
@Override
public BaseArray getBaseArray(int ordinal) {
final long offsetAndSize = getLong(ordinal);
return BinaryRow.getBinaryArray(segments, baseOffset, offsetAndSize);
}
@Override
public BaseMap getBaseMap(int ordinal) {
final long offsetAndSize = getLong(ordinal);
return BinaryRow.getBinaryMap(segments, baseOffset, offsetAndSize);
}
@Override
public byte[] getByteArray(int ordinal) {
int fieldOffset = getFieldOffset(ordinal);
final long offsetAndSize = MultiSegUtil.getLong(segments, fieldOffset);
return BinaryRow.getByteArray(segments, baseOffset, fieldOffset, offsetAndSize);
}
public NestedRow copy() {
return copy(new NestedRow(arity));
}
public NestedRow copy(BaseRow reuse) {
return copyInternal((NestedRow) reuse);
}
private NestedRow copyInternal(NestedRow reuse) {
byte[] bytes = BinaryRowUtil.copy(segments, baseOffset, sizeInBytes);
reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
return reuse;
}
@Override
public boolean equals(Object o) {
return equalsFrom(o, 0);
}
@Override
public boolean equalsWithoutHeader(BaseRow o) {
throw new UnsupportedOperationException();
}
private boolean equalsFrom(Object o, int startIndex) {
if (o != null && o instanceof NestedRow) {
NestedRow other = (NestedRow) o;
return sizeInBytes == other.sizeInBytes && BinaryRowUtil.equals(
segments,
baseOffset + startIndex,
other.segments,
other.baseOffset + startIndex,
sizeInBytes - startIndex);
} else {
return false;
}
}
@Override
public int hashCode() {
if (segments.length == 1) {
return Murmur32.hashBytesByWords(segments[0], baseOffset, sizeInBytes, 42);
} else {
return hashSlow();
}
}
private int hashSlow() {
byte[] bytes = new byte[sizeInBytes];
BinaryRowUtil.copySlow(segments, baseOffset, bytes, sizeInBytes);
return Murmur32.hashBytesByWords(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes, 42);
}
}