| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.segment.data; |
| |
| import com.google.common.primitives.Ints; |
| import org.apache.druid.collections.ResourceHolder; |
| import org.apache.druid.common.config.NullHandling; |
| import org.apache.druid.common.utils.SerializerUtils; |
| import org.apache.druid.io.Channels; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.guava.CloseQuietly; |
| import org.apache.druid.java.util.common.guava.Comparators; |
| import org.apache.druid.java.util.common.io.Closer; |
| import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; |
| import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; |
| import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; |
| import org.apache.druid.segment.serde.MetaSerdeHelper; |
| import org.apache.druid.segment.serde.Serializer; |
| import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes; |
| |
| import javax.annotation.Nullable; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.channels.WritableByteChannel; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| |
| /** |
| * A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. If input |
| * is sorted, supports binary search index lookups. If input is not sorted, only supports array-like index lookups. |
| * <p> |
| * V1 Storage Format: |
| * <p> |
| * byte 1: version (0x1) |
| * byte 2 == 0x1 =>; allowReverseLookup |
| * bytes 3-6 =>; numBytesUsed |
| * bytes 7-10 =>; numElements |
| * bytes 10-((numElements * 4) + 10): integers representing *end* offsets of byte serialized values |
| * bytes ((numElements * 4) + 10)-(numBytesUsed + 2): 4-byte integer representing length of value, followed by bytes |
| * for value. Length of value stored has no meaning, if next offset is strictly greater than the current offset, |
| * and if they are the same, -1 at this field means null, and 0 at this field means some object |
| * (potentially non-null - e. g. in the string case, that is serialized as an empty sequence of bytes). |
| * <p> |
| * V2 Storage Format |
| * Meta, header and value files are separate and header file stored in native endian byte order. |
| * Meta File: |
| * byte 1: version (0x2) |
| * byte 2 == 0x1 =>; allowReverseLookup |
| * bytes 3-6: numberOfElementsPerValueFile expressed as power of 2. That means all the value files contains same |
| * number of items except last value file and may have fewer elements. |
| * bytes 7-10 =>; numElements |
| * bytes 11-14 =>; columnNameLength |
| * bytes 15-columnNameLength =>; columnName |
| * <p> |
| * Header file name is identified as: StringUtils.format("%s_header", columnName) |
| * value files are identified as: StringUtils.format("%s_value_%d", columnName, fileNumber) |
| * number of value files == numElements/numberOfElementsPerValueFile |
| */ |
| public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer |
| { |
| static final byte VERSION_ONE = 0x1; |
| static final byte VERSION_TWO = 0x2; |
| static final byte REVERSE_LOOKUP_ALLOWED = 0x1; |
| static final byte REVERSE_LOOKUP_DISALLOWED = 0x0; |
| |
| static final int NULL_VALUE_SIZE_MARKER = -1; |
| |
| private static final MetaSerdeHelper<GenericIndexed> META_SERDE_HELPER = MetaSerdeHelper |
| .firstWriteByte((GenericIndexed x) -> VERSION_ONE) |
| .writeByte(x -> x.allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED) |
| .writeInt(x -> Ints.checkedCast(x.theBuffer.remaining() + (long) Integer.BYTES)) |
| .writeInt(x -> x.size); |
| |
| private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); |
| |
| /** |
| * An ObjectStrategy that returns a big-endian ByteBuffer pointing to the original data. |
| * |
| * The returned ByteBuffer is a fresh read-only instance, so it is OK for callers to modify its position, limit, etc. |
| * However, it does point to the original data, so callers must take care not to use it if the original data may |
| * have been freed. |
| */ |
| public static final ObjectStrategy<ByteBuffer> BYTE_BUFFER_STRATEGY = new ObjectStrategy<ByteBuffer>() |
| { |
| @Override |
| public Class<ByteBuffer> getClazz() |
| { |
| return ByteBuffer.class; |
| } |
| |
| @Override |
| public ByteBuffer fromByteBuffer(final ByteBuffer buffer, final int numBytes) |
| { |
| final ByteBuffer dup = buffer.asReadOnlyBuffer(); |
| dup.limit(buffer.position() + numBytes); |
| return dup; |
| } |
| |
| @Override |
| @Nullable |
| public byte[] toBytes(@Nullable ByteBuffer buf) |
| { |
| if (buf == null) { |
| return null; |
| } |
| |
| // This method doesn't have javadocs and I'm not sure if it is OK to modify the "val" argument. Copy defensively. |
| final ByteBuffer dup = buf.duplicate(); |
| final byte[] bytes = new byte[dup.remaining()]; |
| dup.get(bytes); |
| return bytes; |
| } |
| |
| @Override |
| public int compare(ByteBuffer o1, ByteBuffer o2) |
| { |
| return o1.compareTo(o2); |
| } |
| }; |
| |
| public static final ObjectStrategy<String> STRING_STRATEGY = new ObjectStrategy<String>() |
| { |
| @Override |
| public Class<String> getClazz() |
| { |
| return String.class; |
| } |
| |
| @Override |
| public String fromByteBuffer(final ByteBuffer buffer, final int numBytes) |
| { |
| return StringUtils.fromUtf8(buffer, numBytes); |
| } |
| |
| @Override |
| @Nullable |
| public byte[] toBytes(@Nullable String val) |
| { |
| return StringUtils.toUtf8Nullable(NullHandling.nullToEmptyIfNeeded(val)); |
| } |
| |
| @Override |
| public int compare(String o1, String o2) |
| { |
| return Comparators.<String>naturalNullsFirst().compare(o1, o2); |
| } |
| }; |
| |
| public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy) |
| { |
| byte versionFromBuffer = buffer.get(); |
| |
| if (VERSION_ONE == versionFromBuffer) { |
| return createGenericIndexedVersionOne(buffer, strategy); |
| } else if (VERSION_TWO == versionFromBuffer) { |
| throw new IAE( |
| "use read(ByteBuffer buffer, ObjectStrategy<T> strategy, SmooshedFileMapper fileMapper)" |
| + " to read version 2 indexed." |
| ); |
| } |
| throw new IAE("Unknown version[%d]", (int) versionFromBuffer); |
| } |
| |
| public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy, SmooshedFileMapper fileMapper) |
| { |
| byte versionFromBuffer = buffer.get(); |
| |
| if (VERSION_ONE == versionFromBuffer) { |
| return createGenericIndexedVersionOne(buffer, strategy); |
| } else if (VERSION_TWO == versionFromBuffer) { |
| return createGenericIndexedVersionTwo(buffer, strategy, fileMapper); |
| } |
| |
| throw new IAE("Unknown version [%s]", versionFromBuffer); |
| } |
| |
| public static <T> GenericIndexed<T> fromArray(T[] objects, ObjectStrategy<T> strategy) |
| { |
| return fromIterable(Arrays.asList(objects), strategy); |
| } |
| |
| static GenericIndexed<ResourceHolder<ByteBuffer>> ofCompressedByteBuffers( |
| Iterable<ByteBuffer> buffers, |
| CompressionStrategy compression, |
| int bufferSize, |
| ByteOrder order, |
| Closer closer |
| ) |
| { |
| return fromIterableVersionOne( |
| buffers, |
| GenericIndexedWriter.compressedByteBuffersWriteObjectStrategy(compression, bufferSize, closer), |
| false, |
| new DecompressingByteBufferObjectStrategy(order, compression) |
| ); |
| } |
| |
| public static <T> GenericIndexed<T> fromIterable(Iterable<T> objectsIterable, ObjectStrategy<T> strategy) |
| { |
| return fromIterableVersionOne(objectsIterable, strategy, true, strategy); |
| } |
| |
| static int getNumberOfFilesRequired(int bagSize, long numWritten) |
| { |
| int numberOfFilesRequired = (int) (numWritten / bagSize); |
| if ((numWritten % bagSize) != 0) { |
| numberOfFilesRequired += 1; |
| } |
| return numberOfFilesRequired; |
| } |
| |
| |
| private final boolean versionOne; |
| |
| private final ObjectStrategy<T> strategy; |
| private final boolean allowReverseLookup; |
| private final int size; |
| private final ByteBuffer headerBuffer; |
| |
| private final ByteBuffer firstValueBuffer; |
| |
| private final ByteBuffer[] valueBuffers; |
| private int logBaseTwoOfElementsPerValueFile; |
| private int relativeIndexMask; |
| |
| @Nullable |
| private final ByteBuffer theBuffer; |
| |
| /** |
| * Constructor for version one. |
| */ |
| GenericIndexed( |
| ByteBuffer buffer, |
| ObjectStrategy<T> strategy, |
| boolean allowReverseLookup |
| ) |
| { |
| this.versionOne = true; |
| |
| this.theBuffer = buffer; |
| this.strategy = strategy; |
| this.allowReverseLookup = allowReverseLookup; |
| size = theBuffer.getInt(); |
| |
| int indexOffset = theBuffer.position(); |
| int valuesOffset = theBuffer.position() + size * Integer.BYTES; |
| |
| buffer.position(valuesOffset); |
| // Ensure the value buffer's limit equals to capacity. |
| firstValueBuffer = buffer.slice(); |
| valueBuffers = new ByteBuffer[]{firstValueBuffer}; |
| buffer.position(indexOffset); |
| headerBuffer = buffer.slice(); |
| } |
| |
| |
| /** |
| * Constructor for version two. |
| */ |
| GenericIndexed( |
| ByteBuffer[] valueBuffs, |
| ByteBuffer headerBuff, |
| ObjectStrategy<T> strategy, |
| boolean allowReverseLookup, |
| int logBaseTwoOfElementsPerValueFile, |
| int numWritten |
| ) |
| { |
| this.versionOne = false; |
| |
| this.theBuffer = null; |
| this.strategy = strategy; |
| this.allowReverseLookup = allowReverseLookup; |
| this.valueBuffers = valueBuffs; |
| this.firstValueBuffer = valueBuffers[0]; |
| this.headerBuffer = headerBuff; |
| this.size = numWritten; |
| this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile; |
| this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1; |
| headerBuffer.order(ByteOrder.nativeOrder()); |
| } |
| |
| /** |
| * Checks if {@code index} a valid `element index` in GenericIndexed. |
| * Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message. |
| * <p> |
| * Used here to get existing behavior(same error message and exception) of V1 GenericIndexed. |
| * |
| * @param index index identifying an element of an GenericIndexed. |
| */ |
| private void checkIndex(int index) |
| { |
| if (index < 0) { |
| throw new IAE("Index[%s] < 0", index); |
| } |
| if (index >= size) { |
| throw new IAE("Index[%d] >= size[%d]", index, size); |
| } |
| } |
| |
| public Class<? extends T> getClazz() |
| { |
| return strategy.getClazz(); |
| } |
| |
| @Override |
| public int size() |
| { |
| return size; |
| } |
| |
| @Override |
| public T get(int index) |
| { |
| return versionOne ? getVersionOne(index) : getVersionTwo(index); |
| } |
| |
| /** |
| * Returns the index of "value" in this GenericIndexed object, or (-(insertion point) - 1) if the value is not |
| * present, in the manner of Arrays.binarySearch. This strengthens the contract of Indexed, which only guarantees |
| * that values-not-found will return some negative number. |
| * |
| * @param value value to search for |
| * |
| * @return index of value, or negative number equal to (-(insertion point) - 1). |
| */ |
| @Override |
| public int indexOf(@Nullable T value) |
| { |
| return indexOf(this, value); |
| } |
| |
| private int indexOf(Indexed<T> indexed, @Nullable T value) |
| { |
| if (!allowReverseLookup) { |
| throw new UnsupportedOperationException("Reverse lookup not allowed."); |
| } |
| |
| int minIndex = 0; |
| int maxIndex = size - 1; |
| while (minIndex <= maxIndex) { |
| int currIndex = (minIndex + maxIndex) >>> 1; |
| |
| T currValue = indexed.get(currIndex); |
| int comparison = strategy.compare(currValue, value); |
| if (comparison == 0) { |
| return currIndex; |
| } |
| |
| if (comparison < 0) { |
| minIndex = currIndex + 1; |
| } else { |
| maxIndex = currIndex - 1; |
| } |
| } |
| |
| return -(minIndex + 1); |
| } |
| |
| @Override |
| public Iterator<T> iterator() |
| { |
| return IndexedIterable.create(this).iterator(); |
| } |
| |
| @Override |
| public long getSerializedSize() |
| { |
| if (!versionOne) { |
| throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed."); |
| } |
| return getSerializedSizeVersionOne(); |
| } |
| |
| @Override |
| public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException |
| { |
| if (versionOne) { |
| writeToVersionOne(channel); |
| } else { |
| throw new UnsupportedOperationException( |
| "GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead."); |
| } |
| } |
| |
| /** |
| * Create a non-thread-safe Indexed, which may perform better than the underlying Indexed. |
| * |
| * @return a non-thread-safe Indexed |
| */ |
| public GenericIndexed<T>.BufferIndexed singleThreaded() |
| { |
| return versionOne ? singleThreadedVersionOne() : singleThreadedVersionTwo(); |
| } |
| |
| @Nullable |
| private T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset) |
| { |
| ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer(); |
| int size = endOffset - startOffset; |
| // When size is 0 and SQL compatibility is enabled also check for null marker before returning null. |
| // When SQL compatibility is not enabled return null for both null as well as empty string case. |
| if (size == 0 && (NullHandling.replaceWithDefault() |
| || copyValueBuffer.get(startOffset - Integer.BYTES) == NULL_VALUE_SIZE_MARKER)) { |
| return null; |
| } |
| copyValueBuffer.position(startOffset); |
| // fromByteBuffer must not modify the buffer limit |
| return strategy.fromByteBuffer(copyValueBuffer, size); |
| } |
| |
| @Override |
| public void inspectRuntimeShape(RuntimeShapeInspector inspector) |
| { |
| inspector.visit("versionOne", versionOne); |
| inspector.visit("headerBuffer", headerBuffer); |
| if (versionOne) { |
| inspector.visit("firstValueBuffer", firstValueBuffer); |
| } else { |
| // Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it |
| // are the same. |
| inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null); |
| } |
| inspector.visit("strategy", strategy); |
| } |
| |
| @Override |
| public String toString() |
| { |
| StringBuilder sb = new StringBuilder("GenericIndexed["); |
| if (size() > 0) { |
| for (int i = 0; i < size(); i++) { |
| T value = get(i); |
| sb.append(value).append(',').append(' '); |
| } |
| sb.setLength(sb.length() - 2); |
| } |
| sb.append(']'); |
| return sb.toString(); |
| } |
| |
| abstract class BufferIndexed implements Indexed<T> |
| { |
| int lastReadSize; |
| |
| @Override |
| public int size() |
| { |
| return size; |
| } |
| |
| @Nullable |
| T bufferedIndexedGet(ByteBuffer copyValueBuffer, int startOffset, int endOffset) |
| { |
| int size = endOffset - startOffset; |
| // When size is 0 and SQL compatibility is enabled also check for null marker before returning null. |
| // When SQL compatibility is not enabled return null for both null as well as empty string case. |
| if (size == 0 && (NullHandling.replaceWithDefault() |
| || copyValueBuffer.get(startOffset - Integer.BYTES) == NULL_VALUE_SIZE_MARKER)) { |
| return null; |
| } |
| lastReadSize = size; |
| |
| // ObjectStrategy.fromByteBuffer() is allowed to reset the limit of the buffer. So if the limit is changed, |
| // position() call in the next line could throw an exception, if the position is set beyond the new limit. clear() |
| // sets the limit to the maximum possible, the capacity. It is safe to reset the limit to capacity, because the |
| // value buffer(s) initial limit equals to capacity. |
| copyValueBuffer.clear(); |
| copyValueBuffer.position(startOffset); |
| return strategy.fromByteBuffer(copyValueBuffer, size); |
| } |
| |
| /** |
| * This method makes no guarantees with respect to thread safety |
| * |
| * @return the size in bytes of the last value read |
| */ |
| int getLastValueSize() |
| { |
| return lastReadSize; |
| } |
| |
| @Override |
| public int indexOf(@Nullable T value) |
| { |
| return GenericIndexed.this.indexOf(this, value); |
| } |
| |
| @Override |
| public Iterator<T> iterator() |
| { |
| return GenericIndexed.this.iterator(); |
| } |
| } |
| |
| @Override |
| public void close() |
| { |
| // nothing to close |
| } |
| |
| /////////////// |
| // VERSION ONE |
| /////////////// |
| |
| private static <T> GenericIndexed<T> createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy<T> strategy) |
| { |
| boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; |
| int size = byteBuffer.getInt(); |
| ByteBuffer bufferToUse = byteBuffer.asReadOnlyBuffer(); |
| bufferToUse.limit(bufferToUse.position() + size); |
| byteBuffer.position(bufferToUse.limit()); |
| |
| return new GenericIndexed<>( |
| bufferToUse, |
| strategy, |
| allowReverseLookup |
| ); |
| } |
| |
| private static <T, U> GenericIndexed<U> fromIterableVersionOne( |
| Iterable<T> objectsIterable, |
| ObjectStrategy<T> strategy, |
| boolean allowReverseLookup, |
| ObjectStrategy<U> resultObjectStrategy |
| ) |
| { |
| Iterator<T> objects = objectsIterable.iterator(); |
| if (!objects.hasNext()) { |
| final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0); |
| buffer.flip(); |
| return new GenericIndexed<>(buffer, resultObjectStrategy, allowReverseLookup); |
| } |
| |
| int count = 0; |
| |
| HeapByteBufferWriteOutBytes headerOut = new HeapByteBufferWriteOutBytes(); |
| HeapByteBufferWriteOutBytes valuesOut = new HeapByteBufferWriteOutBytes(); |
| try { |
| T prevVal = null; |
| do { |
| count++; |
| T next = objects.next(); |
| if (allowReverseLookup && prevVal != null && !(strategy.compare(prevVal, next) < 0)) { |
| allowReverseLookup = false; |
| } |
| |
| if (next != null) { |
| valuesOut.writeInt(0); |
| strategy.writeTo(next, valuesOut); |
| } else { |
| valuesOut.writeInt(NULL_VALUE_SIZE_MARKER); |
| } |
| |
| headerOut.writeInt(Ints.checkedCast(valuesOut.size())); |
| |
| if (prevVal instanceof Closeable) { |
| CloseQuietly.close((Closeable) prevVal); |
| } |
| prevVal = next; |
| } while (objects.hasNext()); |
| |
| if (prevVal instanceof Closeable) { |
| CloseQuietly.close((Closeable) prevVal); |
| } |
| } |
| catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| ByteBuffer theBuffer = ByteBuffer.allocate(Ints.checkedCast(Integer.BYTES + headerOut.size() + valuesOut.size())); |
| theBuffer.putInt(count); |
| headerOut.writeTo(theBuffer); |
| valuesOut.writeTo(theBuffer); |
| theBuffer.flip(); |
| |
| return new GenericIndexed<>(theBuffer.asReadOnlyBuffer(), resultObjectStrategy, allowReverseLookup); |
| } |
| |
| private long getSerializedSizeVersionOne() |
| { |
| return META_SERDE_HELPER.size(this) + (long) theBuffer.remaining(); |
| } |
| |
| @Nullable |
| private T getVersionOne(int index) |
| { |
| checkIndex(index); |
| |
| final int startOffset; |
| final int endOffset; |
| |
| if (index == 0) { |
| startOffset = Integer.BYTES; |
| endOffset = headerBuffer.getInt(0); |
| } else { |
| int headerPosition = (index - 1) * Integer.BYTES; |
| startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; |
| endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); |
| } |
| return copyBufferAndGet(firstValueBuffer, startOffset, endOffset); |
| } |
| |
| private BufferIndexed singleThreadedVersionOne() |
| { |
| final ByteBuffer copyBuffer = firstValueBuffer.asReadOnlyBuffer(); |
| return new BufferIndexed() |
| { |
| @Override |
| public T get(final int index) |
| { |
| checkIndex(index); |
| |
| final int startOffset; |
| final int endOffset; |
| |
| if (index == 0) { |
| startOffset = Integer.BYTES; |
| endOffset = headerBuffer.getInt(0); |
| } else { |
| int headerPosition = (index - 1) * Integer.BYTES; |
| startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; |
| endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); |
| } |
| return bufferedIndexedGet(copyBuffer, startOffset, endOffset); |
| } |
| |
| @Override |
| public void inspectRuntimeShape(RuntimeShapeInspector inspector) |
| { |
| inspector.visit("headerBuffer", headerBuffer); |
| inspector.visit("copyBuffer", copyBuffer); |
| inspector.visit("strategy", strategy); |
| } |
| }; |
| } |
| |
| private void writeToVersionOne(WritableByteChannel channel) throws IOException |
| { |
| META_SERDE_HELPER.writeTo(channel, this); |
| Channels.writeFully(channel, theBuffer.asReadOnlyBuffer()); |
| } |
| |
| |
| /////////////// |
| // VERSION TWO |
| /////////////// |
| |
| private static <T> GenericIndexed<T> createGenericIndexedVersionTwo( |
| ByteBuffer byteBuffer, |
| ObjectStrategy<T> strategy, |
| SmooshedFileMapper fileMapper |
| ) |
| { |
| if (fileMapper == null) { |
| throw new IAE("SmooshedFileMapper can not be null for version 2."); |
| } |
| boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; |
| int logBaseTwoOfElementsPerValueFile = byteBuffer.getInt(); |
| int numElements = byteBuffer.getInt(); |
| |
| try { |
| String columnName = SERIALIZER_UTILS.readString(byteBuffer); |
| int elementsPerValueFile = 1 << logBaseTwoOfElementsPerValueFile; |
| int numberOfFilesRequired = getNumberOfFilesRequired(elementsPerValueFile, numElements); |
| ByteBuffer[] valueBuffersToUse = new ByteBuffer[numberOfFilesRequired]; |
| for (int i = 0; i < numberOfFilesRequired; i++) { |
| // SmooshedFileMapper.mapFile() contract guarantees that the valueBuffer's limit equals to capacity. |
| ByteBuffer valueBuffer = fileMapper.mapFile(GenericIndexedWriter.generateValueFileName(columnName, i)); |
| valueBuffersToUse[i] = valueBuffer.asReadOnlyBuffer(); |
| } |
| ByteBuffer headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName)); |
| return new GenericIndexed<>( |
| valueBuffersToUse, |
| headerBuffer, |
| strategy, |
| allowReverseLookup, |
| logBaseTwoOfElementsPerValueFile, |
| numElements |
| ); |
| } |
| catch (IOException e) { |
| throw new RuntimeException("File mapping failed.", e); |
| } |
| } |
| |
| @Nullable |
| private T getVersionTwo(int index) |
| { |
| checkIndex(index); |
| |
| final int startOffset; |
| final int endOffset; |
| |
| int relativePositionOfIndex = index & relativeIndexMask; |
| if (relativePositionOfIndex == 0) { |
| int headerPosition = index * Integer.BYTES; |
| startOffset = Integer.BYTES; |
| endOffset = headerBuffer.getInt(headerPosition); |
| } else { |
| int headerPosition = (index - 1) * Integer.BYTES; |
| startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; |
| endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); |
| } |
| int fileNum = index >> logBaseTwoOfElementsPerValueFile; |
| return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset); |
| } |
| |
| private BufferIndexed singleThreadedVersionTwo() |
| { |
| final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length]; |
| for (int i = 0; i < valueBuffers.length; i++) { |
| copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer(); |
| } |
| |
| return new BufferIndexed() |
| { |
| @Override |
| public T get(final int index) |
| { |
| checkIndex(index); |
| |
| final int startOffset; |
| final int endOffset; |
| |
| int relativePositionOfIndex = index & relativeIndexMask; |
| if (relativePositionOfIndex == 0) { |
| int headerPosition = index * Integer.BYTES; |
| startOffset = 4; |
| endOffset = headerBuffer.getInt(headerPosition); |
| } else { |
| int headerPosition = (index - 1) * Integer.BYTES; |
| startOffset = headerBuffer.getInt(headerPosition) + Integer.BYTES; |
| endOffset = headerBuffer.getInt(headerPosition + Integer.BYTES); |
| } |
| int fileNum = index >> logBaseTwoOfElementsPerValueFile; |
| return bufferedIndexedGet(copyValueBuffers[fileNum], startOffset, endOffset); |
| } |
| |
| @Override |
| public void inspectRuntimeShape(RuntimeShapeInspector inspector) |
| { |
| inspector.visit("headerBuffer", headerBuffer); |
| // Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers |
| // in it are the same. |
| inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null); |
| inspector.visit("strategy", strategy); |
| } |
| }; |
| } |
| } |