blob: 363c92533c3d729421120da66bd2b07aa820b7f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector.complex;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.DensityAwareVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.OversizedAllocationException;
import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
import io.netty.buffer.ArrowBuf;
/** Base class for Vectors that contain repeated values. */
public abstract class BaseRepeatedValueVector extends BaseValueVector implements RepeatedValueVector, BaseListVector {
/** Placeholder inner vector ({@link ZeroVector#INSTANCE}) used until a real data vector is created. */
public static final FieldVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
/** Initial value of {@link #defaultDataVectorName}, the name passed when the inner data vector is created. */
public static final String DATA_VECTOR_NAME = "$data$";
/** Width in bytes of a single entry in the offset buffer. */
public static final byte OFFSET_WIDTH = 4;
/** Buffer of int offsets; entry {@code i} is read at byte position {@code i * OFFSET_WIDTH}. */
protected ArrowBuf offsetBuffer;
/** Inner data vector; never null after construction (checked in the constructor). */
protected FieldVector vector;
/** Optional callback (may be null) run by {@code addOrGetVector} when a non-Null-typed data vector is created. */
protected final CallBack callBack;
/** Number of values recorded for this vector; updated by {@code setValueCount}, {@code clear} and {@code reset}. */
protected int valueCount;
/** Size in bytes used for the next offset buffer allocation. */
protected int offsetAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * OFFSET_WIDTH;
/** Name returned by {@link #getName()}. */
private final String name;
/** Name handed to {@code FieldType.createNewSingleVector} when the inner data vector is created. */
protected String defaultDataVectorName = DATA_VECTOR_NAME;
/**
 * Creates a vector whose inner data vector is the {@link #DEFAULT_DATA_VECTOR} placeholder.
 *
 * @param name name reported by {@link #getName()}
 * @param allocator allocator passed on to the four-argument constructor
 * @param callBack callback passed on to the four-argument constructor; may be null
 */
protected BaseRepeatedValueVector(String name, BufferAllocator allocator, CallBack callBack) {
this(name, allocator, DEFAULT_DATA_VECTOR, callBack);
}
/**
 * Creates a vector wrapping the supplied inner data vector. The offset buffer starts out as the
 * allocator's empty buffer and the value count starts at zero.
 *
 * @param name name reported by {@link #getName()}
 * @param allocator allocator handed to the superclass and used to obtain the initial empty offset buffer
 * @param vector inner data vector; must not be null
 * @param callBack callback stored for later use by {@code addOrGetVector}; may be null
 * @throws NullPointerException if {@code vector} is null
 */
protected BaseRepeatedValueVector(String name, BufferAllocator allocator, FieldVector vector, CallBack callBack) {
  super(allocator);
  // Buffer and data vector first, in the original order, so failures surface identically.
  this.offsetBuffer = allocator.getEmpty();
  this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null");
  // Plain bookkeeping fields; independent of each other.
  this.valueCount = 0;
  this.callBack = callBack;
  this.name = name;
}
/**
 * Returns the name this vector was constructed with.
 */
@Override
public String getName() {
return name;
}
/**
 * Allocates the offset buffer and then asks the inner data vector to allocate itself.
 *
 * <p>If either step fails (an exception is thrown, or the inner vector reports failure) this
 * vector is cleared and {@code false} is returned; no exception is propagated to the caller.
 *
 * @return {@code true} if both the offset buffer and the inner data vector were allocated,
 *         {@code false} otherwise
 */
@Override
public boolean allocateNewSafe() {
  boolean dataAlloc = false;
  try {
    allocateOffsetBuffer(offsetAllocationSizeInBytes);
    dataAlloc = vector.allocateNewSafe();
  } catch (Exception e) {
    // Failure is reported through the return value; the trace is kept so the cause is not lost.
    e.printStackTrace();
    return false;
  } finally {
    // Single cleanup point: dataAlloc is still false on every failure path (including the
    // exception path above), so partially allocated buffers are released exactly once here.
    if (!dataAlloc) {
      clear();
    }
  }
  return dataAlloc;
}
/**
 * Allocates a fresh, zero-filled offset buffer of the requested size and records that size as
 * the current offset allocation size.
 *
 * @param size requested size in bytes; must fit in an {@code int}
 * @throws OversizedAllocationException if {@code size} exceeds {@link Integer#MAX_VALUE}
 */
protected void allocateOffsetBuffer(final long size) {
  // Narrowing a long above Integer.MAX_VALUE would silently wrap to an unrelated size.
  if (size > Integer.MAX_VALUE) {
    throw new OversizedAllocationException(
        "Unable to allocate offset buffer of " + size + " bytes");
  }
  final int curSize = (int) size;
  offsetBuffer = allocator.buffer(curSize);
  offsetBuffer.readerIndex(0);
  offsetAllocationSizeInBytes = curSize;
  // Zero the whole buffer (its capacity, which is what the allocator actually handed back).
  offsetBuffer.setZero(0, offsetBuffer.capacity());
}
/**
 * Grows the offset buffer via {@link #reallocOffsetBuffer()} and then asks the inner data
 * vector to reallocate itself.
 */
@Override
public void reAlloc() {
reallocOffsetBuffer();
vector.reAlloc();
}
/**
 * Replaces the offset buffer with a larger one, preserving its contents.
 *
 * <p>The new size is the next power of two of twice the larger of the current buffer capacity
 * and the recorded offset allocation size. Existing bytes are copied, the remainder of the new
 * buffer is zero-filled, and one reference to the old buffer is released.
 *
 * @throws OversizedAllocationException if the computed size exceeds {@code MAX_ALLOCATION_SIZE}
 */
protected void reallocOffsetBuffer() {
  final int oldCapacity = offsetBuffer.capacity();

  // Grow from whichever is larger: the configured allocation size or what is actually held.
  final long growFrom = Math.max((long) offsetAllocationSizeInBytes, (long) oldCapacity);
  final long targetSize = BaseAllocator.nextPowerOfTwo(growFrom * 2L);
  assert targetSize >= 1;

  if (targetSize > MAX_ALLOCATION_SIZE) {
    throw new OversizedAllocationException("Unable to expand the buffer");
  }

  final ArrowBuf grown = allocator.buffer((int) targetSize);
  // Carry over the existing offsets, then zero everything beyond them.
  grown.setBytes(0, offsetBuffer, 0, oldCapacity);
  grown.setZero(oldCapacity, grown.capacity() - oldCapacity);

  offsetBuffer.getReferenceManager().release(1);
  offsetBuffer = grown;
  offsetAllocationSizeInBytes = (int) targetSize;
}
/**
 * Always fails: this class keeps its offsets in {@code offsetBuffer} rather than in an inner
 * offset vector.
 *
 * @throws UnsupportedOperationException always
 */
@Override
@Deprecated
public UInt4Vector getOffsetVector() {
throw new UnsupportedOperationException("There is no inner offset vector");
}
/**
 * Returns the inner data vector, which is {@link #DEFAULT_DATA_VECTOR} until a real one has
 * been created or supplied.
 */
@Override
public FieldVector getDataVector() {
return vector;
}
/**
 * Sets the sizes to use for the next allocation.
 *
 * <p>The offset buffer is sized for {@code numRecords + 1} entries. When the inner data vector
 * is a fixed-width or variable-width vector its capacity is set to
 * {@code numRecords * DEFAULT_REPEAT_PER_RECORD}; any other inner vector is given
 * {@code numRecords}.
 *
 * @param numRecords expected number of values in this vector
 * @throws OversizedAllocationException if the offset buffer size in bytes or the inner value
 *         capacity would not fit in an {@code int}
 */
@Override
public void setInitialCapacity(int numRecords) {
  // Do the arithmetic in long: the int products wrap around for large numRecords.
  final long offsetBytes = (numRecords + 1L) * OFFSET_WIDTH;
  final boolean scalarData =
      vector instanceof BaseFixedWidthVector || vector instanceof BaseVariableWidthVector;
  final long innerCapacity = scalarData
      ? (long) numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD
      : (long) numRecords;

  if (offsetBytes > Integer.MAX_VALUE || innerCapacity > Integer.MAX_VALUE) {
    throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
  }

  offsetAllocationSizeInBytes = (int) offsetBytes;
  vector.setInitialCapacity((int) innerCapacity);
}
/**
 * Density-aware variant of {@code setInitialCapacity(int)}.
 *
 * <p>Callers use this when they want to control, and be conservative about, the memory
 * allocated for the inner data vector. This is useful when working under memory constraints
 * with a fixed amount of memory reserved for a record batch: sizing the inner vector with the
 * default repeat multiplier can push the requirement beyond what was reserved and lead to OOM
 * or related problems, whereas a density lets each vector allocate only what is necessary.
 *
 * <p>The offset buffer is sized for {@code numRecords + 1} entries. The inner vector is sized
 * for {@code max((int) (numRecords * density), 1)} values; if it is a
 * {@link DensityAwareVector} the density is passed on to it as well.
 *
 * @param numRecords value count
 * @param density density of the list vector, i.e. the average size of the list per position.
 *                For example, a density of 10 implies each position holds a list of 10
 *                values. A density of 0.1 implies that out of 10 positions, 1 position has a
 *                list of size 1 and the remaining positions are null (no lists) or empty
 *                lists.
 * @throws OversizedAllocationException if {@code numRecords * density} reaches
 *         {@link Integer#MAX_VALUE}, or if the offset buffer size in bytes would not fit in an
 *         {@code int}
 */
@Override
public void setInitialCapacity(int numRecords, double density) {
  if ((numRecords * density) >= Integer.MAX_VALUE) {
    throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
  }

  // Computed in long: (numRecords + 1) * OFFSET_WIDTH wraps around as an int for large inputs.
  final long offsetBytes = (numRecords + 1L) * OFFSET_WIDTH;
  if (offsetBytes > Integer.MAX_VALUE) {
    throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
  }
  offsetAllocationSizeInBytes = (int) offsetBytes;

  // Always reserve room for at least one inner value.
  int innerValueCapacity = Math.max((int) (numRecords * density), 1);
  if (vector instanceof DensityAwareVector) {
    ((DensityAwareVector) vector).setInitialCapacity(innerValueCapacity, density);
  } else {
    vector.setInitialCapacity(innerValueCapacity);
  }
}
/**
 * Returns how many values this vector can currently hold.
 *
 * <p>The offset buffer can describe one fewer value than it has entries (never less than
 * zero). While the inner vector is still the {@link #DEFAULT_DATA_VECTOR} placeholder that
 * figure is returned as is; otherwise it is capped by the inner vector's own capacity.
 */
@Override
public int getValueCapacity() {
  final int slotsFromOffsets = Math.max(getOffsetBufferValueCapacity() - 1, 0);
  return vector == DEFAULT_DATA_VECTOR
      ? slotsFromOffsets
      : Math.min(vector.getValueCapacity(), slotsFromOffsets);
}
/**
 * Returns the number of {@code OFFSET_WIDTH}-byte entries that fit in the current offset
 * buffer (its capacity divided by {@code OFFSET_WIDTH}).
 */
protected int getOffsetBufferValueCapacity() {
return offsetBuffer.capacity() / OFFSET_WIDTH;
}
/**
 * Returns the size in bytes needed for the current value count: {@code valueCount + 1} offset
 * entries plus the inner vector's buffer size, or zero when the vector holds no values.
 */
@Override
public int getBufferSize() {
  return valueCount == 0
      ? 0
      : ((valueCount + 1) * OFFSET_WIDTH) + vector.getBufferSize();
}
/**
 * Returns the size in bytes needed to hold the first {@code valueCount} values.
 *
 * <p>The number of inner values is taken from offset entry {@code valueCount}; the result is
 * {@code valueCount + 1} offset entries plus the inner vector's size for that many values.
 *
 * @param valueCount number of values of this vector to size for
 * @return required size in bytes, or zero when {@code valueCount} is zero
 */
@Override
public int getBufferSizeFor(int valueCount) {
  if (valueCount != 0) {
    final int innerCount = offsetBuffer.getInt(valueCount * OFFSET_WIDTH);
    final int offsetBytes = (valueCount + 1) * OFFSET_WIDTH;
    return offsetBytes + vector.getBufferSizeFor(innerCount);
  }
  return 0;
}
/**
 * Returns an iterator over this vector's only child, the inner data vector.
 */
@Override
public Iterator<ValueVector> iterator() {
  final ValueVector onlyChild = getDataVector();
  return Collections.singleton(onlyChild).iterator();
}
/**
 * Releases the offset buffer, clears the inner data vector, resets the value count to zero and
 * then delegates to the superclass.
 */
@Override
public void clear() {
// releaseBuffer is inherited; its return value becomes the new offsetBuffer.
offsetBuffer = releaseBuffer(offsetBuffer);
vector.clear();
valueCount = 0;
super.clear();
}
/**
 * Zero-fills the offset buffer in place (the buffer itself is kept), resets the inner data
 * vector and sets the value count to zero.
 */
@Override
public void reset() {
offsetBuffer.setZero(0, offsetBuffer.capacity());
vector.reset();
valueCount = 0;
}
/**
 * Returns the buffers backing this vector: the offset buffer followed by the inner data
 * vector's buffers, or an empty array when {@link #getBufferSize()} is zero.
 *
 * @param clear when true, each returned buffer is retained once and this vector is then
 *              cleared
 * @return the backing buffers, offset buffer first
 */
@Override
public ArrowBuf[] getBuffers(boolean clear) {
  final ArrowBuf[] result;
  if (getBufferSize() == 0) {
    result = new ArrowBuf[0];
  } else {
    final ArrowBuf[] innerBuffers = vector.getBuffers(false);
    result = new ArrowBuf[innerBuffers.length + 1];
    result[0] = offsetBuffer;
    System.arraycopy(innerBuffers, 0, result, 1, innerBuffers.length);
  }

  if (clear) {
    // Retain before clearing so the returned buffers stay referenced after clear() runs.
    for (int i = 0; i < result.length; i++) {
      result[i].getReferenceManager().retain();
    }
    clear();
  }
  return result;
}
/**
 * Reports whether a real inner vector has replaced the default placeholder.
 *
 * @return 0 while the inner vector is still {@link #DEFAULT_DATA_VECTOR}, otherwise 1
 */
public int size() {
  if (vector == DEFAULT_DATA_VECTOR) {
    return 0;
  }
  return 1;
}
/**
 * Returns the inner data vector, creating it from {@code fieldType} first if the current one
 * is still a {@link ZeroVector}.
 *
 * <p>When a vector is created and a callback is present, the callback is run unless the
 * requested type is the Null type.
 *
 * @param fieldType type the inner data vector is expected (or created) to have
 * @param <T> vector type the caller expects back; the cast is unchecked
 * @return the inner data vector together with a flag telling whether this call created it
 * @throws SchemaChangeRuntimeException if the inner vector's type id differs from the
 *         requested one
 */
public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(FieldType fieldType) {
  final boolean created = vector instanceof ZeroVector;
  if (created) {
    // returned vector must have the same field
    vector = fieldType.createNewSingleVector(defaultDataVectorName, allocator, callBack);
    // not a schema change if changing from ZeroVector to ZeroVector
    if (callBack != null && fieldType.getType().getTypeID() != ArrowTypeID.Null) {
      callBack.doWork();
    }
  }

  final ArrowTypeID actualType = vector.getField().getType().getTypeID();
  final ArrowTypeID requestedType = fieldType.getType().getTypeID();
  if (actualType != requestedType) {
    throw new SchemaChangeRuntimeException(
        String.format("Inner vector type mismatch. Requested type: [%s], actual type: [%s]",
            requestedType, actualType));
  }

  return new AddOrGetResult<>((T) vector, created);
}
/**
 * Clears the current inner data vector and installs {@code v} in its place.
 *
 * @param v the new inner data vector; must not be null
 * @throws NullPointerException if {@code v} is null; the current data vector is left untouched
 */
protected void replaceDataVector(FieldVector v) {
  // Validate before clearing so a bad argument cannot leave this vector without a data vector,
  // matching the non-null guarantee the constructor establishes.
  Preconditions.checkNotNull(v, "data vector cannot be null");
  vector.clear();
  vector = v;
}
/**
 * Returns the value count last recorded by {@code setValueCount} (zero after {@code clear} or
 * {@code reset}).
 */
@Override
public int getValueCount() {
return valueCount;
}
/** Returns the value count of the inner data vector of this vector. */
public int getInnerValueCount() {
return vector.getValueCount();
}
/**
 * Returns the number of inner values belonging to the value at {@code index}, computed as the
 * difference between offset entries {@code index + 1} and {@code index}.
 *
 * @param index position in this vector
 */
public int getInnerValueCountAt(int index) {
  final int endOffset = offsetBuffer.getInt((index + 1) * OFFSET_WIDTH);
  final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH);
  return endOffset - startOffset;
}
/**
 * Returns whether the value at {@code index} is null. This implementation ignores
 * {@code index} and always returns false.
 */
public boolean isNull(int index) {
return false;
}
/**
 * Returns whether the value at {@code index} is empty. This implementation ignores
 * {@code index} and always returns false.
 */
public boolean isEmpty(int index) {
return false;
}
/**
 * Starts a new repeated value at {@code index}.
 *
 * <p>The new value begins where the previous one ended: offset entry {@code index} is copied
 * into entry {@code index + 1}, and the value count is set to {@code index + 1}. The offset
 * buffer is grown first if it cannot hold entry {@code index + 1}.
 *
 * @param index position of the value being started
 * @return the offset into the inner data vector at which the new value starts
 */
public int startNewValue(int index) {
  // Entry index + 1 is written below, so the buffer needs at least index + 2 entries.
  // Growing only while index >= capacity would leave index == capacity - 1 writing one
  // entry past the end of the buffer.
  while (index >= getOffsetBufferValueCapacity() - 1) {
    reallocOffsetBuffer();
  }
  int offset = offsetBuffer.getInt(index * OFFSET_WIDTH);
  offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, offset);
  setValueCount(index + 1);
  return offset;
}
/**
 * Sets the number of values in this vector and propagates the matching value count to the
 * inner data vector.
 *
 * <p>For a non-zero count the inner value count is read from offset entry {@code valueCount};
 * the offset buffer is grown first if it does not yet contain that entry. A count of zero sets
 * the inner value count to zero without touching the offset buffer.
 *
 * @param valueCount number of values in this vector
 */
public void setValueCount(int valueCount) {
  this.valueCount = valueCount;
  if (valueCount != 0) {
    // Entry valueCount is read below, so the buffer needs at least valueCount + 1 entries.
    // Growing only while valueCount > capacity would leave valueCount == capacity reading one
    // entry past the end of the buffer.
    while (valueCount >= getOffsetBufferValueCapacity()) {
      reallocOffsetBuffer();
    }
  }
  final int childValueCount = valueCount == 0 ? 0 :
      offsetBuffer.getInt(valueCount * OFFSET_WIDTH);
  vector.setValueCount(childValueCount);
}
}