blob: 2523c51fad1f274ac11d4655baefaaaac12bd647 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.data;
import com.google.common.primitives.Ints;
import org.apache.druid.common.utils.ByteUtils;
import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/**
*/
public class VSizeColumnarInts implements ColumnarInts, Comparable<VSizeColumnarInts>, WritableSupplier<ColumnarInts>
{
public static final byte VERSION = 0x0;
private static final MetaSerdeHelper<VSizeColumnarInts> META_SERDE_HELPER = MetaSerdeHelper
.firstWriteByte((VSizeColumnarInts x) -> VERSION)
.writeByte(x -> ByteUtils.checkedCast(x.numBytes))
.writeInt(x -> x.buffer.remaining());
public static VSizeColumnarInts fromArray(int[] array)
{
return fromArray(array, Ints.max(array));
}
public static VSizeColumnarInts fromArray(int[] array, int maxValue)
{
return fromIndexedInts(new ArrayBasedIndexedInts(array), maxValue);
}
public static VSizeColumnarInts fromIndexedInts(IndexedInts ints, int maxValue)
{
int numBytes = getNumBytesForMax(maxValue);
final ByteBuffer buffer = ByteBuffer.allocate((ints.size() * numBytes) + (4 - numBytes));
writeToBuffer(buffer, ints, numBytes, maxValue);
return new VSizeColumnarInts(buffer.asReadOnlyBuffer(), numBytes);
}
private static void writeToBuffer(ByteBuffer buffer, IndexedInts ints, int numBytes, int maxValue)
{
ByteBuffer helperBuffer = ByteBuffer.allocate(Integer.BYTES);
for (int i = 0, size = ints.size(); i < size; i++) {
int val = ints.get(i);
if (val < 0) {
throw new IAE("integer values must be positive, got[%d], i[%d]", val, i);
}
if (val > maxValue) {
throw new IAE("val[%d] > maxValue[%d], please don't lie about maxValue. i[%d]", val, maxValue, i);
}
helperBuffer.putInt(0, val);
buffer.put(helperBuffer.array(), Integer.BYTES - numBytes, numBytes);
}
buffer.position(0);
}
public static byte getNumBytesForMax(int maxValue)
{
if (maxValue < 0) {
throw new IAE("maxValue[%s] must be positive", maxValue);
}
if (maxValue <= 0xFF) {
return 1;
} else if (maxValue <= 0xFFFF) {
return 2;
} else if (maxValue <= 0xFFFFFF) {
return 3;
}
return 4;
}
private final ByteBuffer buffer;
private final int numBytes;
private final int bitsToShift;
private final int size;
public VSizeColumnarInts(ByteBuffer buffer, int numBytes)
{
this.buffer = buffer;
this.numBytes = numBytes;
bitsToShift = 32 - (numBytes << 3); // numBytes * 8
int numBufferBytes = 4 - numBytes;
size = (buffer.remaining() - numBufferBytes) / numBytes;
}
@Override
public int size()
{
return size;
}
@Override
public int get(int index)
{
return buffer.getInt(buffer.position() + (index * numBytes)) >>> bitsToShift;
}
public int getNumBytesNoPadding()
{
return buffer.remaining() - (Integer.BYTES - numBytes);
}
public void writeBytesNoPaddingTo(HeapByteBufferWriteOutBytes out)
{
ByteBuffer toWrite = buffer.slice();
toWrite.limit(toWrite.limit() - (Integer.BYTES - numBytes));
out.write(toWrite);
}
@Override
public int compareTo(VSizeColumnarInts o)
{
int retVal = Ints.compare(numBytes, o.numBytes);
if (retVal == 0) {
retVal = buffer.compareTo(o.buffer);
}
return retVal;
}
public int getNumBytes()
{
return numBytes;
}
@Override
public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + buffer.remaining();
}
@Override
public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
META_SERDE_HELPER.writeTo(channel, this);
Channels.writeFully(channel, buffer.asReadOnlyBuffer());
}
@Override
public ColumnarInts get()
{
return this;
}
public static VSizeColumnarInts readFromByteBuffer(ByteBuffer buffer)
{
byte versionFromBuffer = buffer.get();
if (VERSION == versionFromBuffer) {
int numBytes = buffer.get();
int size = buffer.getInt();
ByteBuffer bufferToUse = buffer.asReadOnlyBuffer();
bufferToUse.limit(bufferToUse.position() + size);
buffer.position(bufferToUse.limit());
return new VSizeColumnarInts(
bufferToUse,
numBytes
);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
@Override
public void close()
{
// Do nothing
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("buffer", buffer);
}
}