// blob: 686414e71cadf0b2af0026beae90eba1262db6d0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector.complex;
import io.netty.buffer.ArrowBuf;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.AllocationHelper;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorDescriptor;
import org.apache.arrow.vector.complex.impl.NullReader;
import org.apache.arrow.vector.complex.impl.RepeatedMapReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.ComplexHolder;
import org.apache.arrow.vector.holders.RepeatedMapHolder;
import org.apache.arrow.vector.types.MaterializedField;
import org.apache.arrow.vector.types.Types.DataMode;
import org.apache.arrow.vector.types.Types.MajorType;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.TransferPair;
import org.apache.commons.lang3.ArrayUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
public class RepeatedMapVector extends AbstractMapVector
implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
/** Major type of this vector: minor type MAP combined with the REPEATED data mode. */
public final static MajorType TYPE = new MajorType(MinorType.MAP, DataMode.REPEATED);
// Entry i is the start of record i inside the child vectors and entry i + 1 is its (exclusive) end.
final UInt4Vector offsets; // offsets to start of each record (considering record indices are 0-indexed)
// Reader bound to this vector instance; returned by getReader() and reused by the accessor.
private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
// Single accessor instance returned by getAccessor().
private final RepeatedMapAccessor accessor = new RepeatedMapAccessor();
// Single mutator instance returned by getMutator().
private final Mutator mutator = new Mutator();
// Built over the offsets vector; populate(n) is called before offsets are written.
// NOTE(review): presumably back-fills offsets of skipped records - confirm against EmptyValuePopulator.
private final EmptyValuePopulator emptyPopulator;
public RepeatedMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
super(field, allocator, callBack);
this.offsets = new UInt4Vector(BaseRepeatedValueVector.OFFSETS_FIELD, allocator);
this.emptyPopulator = new EmptyValuePopulator(offsets);
}
/**
 * @return the offsets vector backing this repeated map
 */
@Override
public UInt4Vector getOffsetVector() {
  return this.offsets;
}
/**
 * Not supported by this vector.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public ValueVector getDataVector() {
  throw new UnsupportedOperationException();
}
/**
 * Not supported by this vector.
 *
 * @param descriptor ignored
 * @throws UnsupportedOperationException always
 */
@Override
public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
  throw new UnsupportedOperationException();
}
/**
 * Sets the initial capacity of the offsets vector and of every child vector.
 *
 * @param numRecords number of top-level records to plan for
 */
@Override
public void setInitialCapacity(int numRecords) {
  // One more offset than records: the last entry marks the end of the final record.
  offsets.setInitialCapacity(numRecords + 1);
  final int childCapacity = numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD;
  final Iterator<ValueVector> children = ((Iterable<ValueVector>) this).iterator();
  while (children.hasNext()) {
    children.next().setInitialCapacity(childCapacity);
  }
}
/**
 * @return the reader instance bound to this vector
 */
@Override
public RepeatedMapReaderImpl getReader() {
  return this.reader;
}
/**
 * Clears this vector and allocates fresh buffers for the offsets and all children.
 * If allocation runs out of memory, everything is cleared again and the exception is rethrown.
 *
 * @param groupCount      number of top-level records
 * @param innerValueCount number of inner values, forwarded to the child allocation helper
 */
@Override
public void allocateNew(int groupCount, int innerValueCount) {
  clear();
  final int offsetCount = groupCount + 1;
  try {
    offsets.allocateNew(offsetCount);
    for (final ValueVector child : getChildren()) {
      AllocationHelper.allocatePrecomputedChildCount(child, groupCount, 50, innerValueCount);
    }
  } catch (OutOfMemoryException outOfMemory) {
    // Release whatever was allocated before the failure.
    clear();
    throw outOfMemory;
  }
  offsets.zeroVector();
  mutator.reset();
}
/**
 * @return an iterator over the names of the child fields
 */
public Iterator<String> fieldNameIterator() {
  final Iterator<String> names = getChildFieldNames().iterator();
  return names;
}
/**
 * @return the primitive vectors reported by the superclass, with the offsets vector appended
 */
@Override
public List<ValueVector> getPrimitiveVectors() {
  final List<ValueVector> vectors = super.getPrimitiveVectors();
  vectors.add(this.offsets);
  return vectors;
}
/**
 * @return 0 when the vector holds no values; otherwise the buffer size of the offsets
 *         vector plus the buffer sizes of all children, narrowed to {@code int}
 */
@Override
public int getBufferSize() {
  long total = 0L;
  if (getAccessor().getValueCount() != 0) {
    total += offsets.getBufferSize();
    for (final ValueVector child : (Iterable<ValueVector>) this) {
      total += child.getBufferSize();
    }
  }
  return (int) total;
}
/**
 * Computes the buffer size needed to hold the given number of top-level records.
 *
 * <p>The offsets buffer is included so the result is consistent with
 * {@link #getBufferSize()}, which also counts it. Holding {@code valueCount} records
 * takes {@code valueCount + 1} offsets, because each record is delimited by two
 * consecutive offset entries.
 *
 * <p>NOTE(review): children are sized with the top-level count, as before; confirm whether
 * the inner value count would be the more accurate input.
 *
 * @param valueCount number of top-level records
 * @return 0 when {@code valueCount} is 0; otherwise the summed size narrowed to {@code int}
 */
@Override
public int getBufferSizeFor(final int valueCount) {
  if (valueCount == 0) {
    return 0;
  }
  long bufferSize = offsets.getBufferSizeFor(valueCount + 1);
  for (final ValueVector v : (Iterable<ValueVector>) this) {
    bufferSize += v.getBufferSizeFor(valueCount);
  }
  return (int) bufferSize;
}
/**
 * Closes the offsets vector first and then delegates to the superclass.
 */
@Override
public void close() {
  this.offsets.close();
  super.close();
}
/**
 * Creates a transfer pair whose target is a new repeated map vector named after this
 * vector's field path.
 *
 * @param allocator allocator for the target vector
 * @return the transfer pair
 */
@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
  final String path = getField().getPath();
  return new RepeatedMapTransferPair(this, path, allocator);
}
/**
 * Creates a transfer pair from this vector into an existing target.
 *
 * @param to target vector; cast to {@link RepeatedMapVector}
 * @return the transfer pair
 */
@Override
public TransferPair makeTransferPair(ValueVector to) {
  final RepeatedMapVector target = (RepeatedMapVector) to;
  return new RepeatedMapTransferPair(this, target);
}
/**
 * Builds a copier that copies single entries of this vector into the given map vector.
 *
 * @param to destination map vector
 * @return a new copier bound to this vector and {@code to}
 */
MapSingleCopier makeSingularCopier(MapVector to) {
  final MapSingleCopier copier = new MapSingleCopier(this, to);
  return copier;
}
/**
 * Copies individual inner entries of a {@link RepeatedMapVector} into a {@link MapVector},
 * one child vector at a time.
 */
protected static class MapSingleCopier {
  // One transfer pair per non-null child of the source; never contains null entries.
  private final TransferPair[] pairs;
  public final RepeatedMapVector from;

  /**
   * Creates the matching children in {@code to} and wires one transfer pair per child.
   *
   * @param from source repeated map vector
   * @param to   destination map vector; children missing from it are added and allocated
   */
  public MapSingleCopier(RepeatedMapVector from, MapVector to) {
    this.from = from;
    final TransferPair[] collected = new TransferPair[from.size()];
    int count = 0;
    for (final String child : from.getChildFieldNames()) {
      final int preSize = to.size();
      final ValueVector vector = from.getChild(child);
      if (vector == null) {
        continue;
      }
      final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
      // Only a child that was just added to the destination needs fresh buffers.
      if (to.size() != preSize) {
        newVector.allocateNew();
      }
      collected[count++] = vector.makeTransferPair(newVector);
    }
    // Skipped children would leave trailing null slots that blow up in copySafe; trim them.
    this.pairs = count == collected.length ? collected : Arrays.copyOf(collected, count);
  }

  /**
   * Copies one entry of every child vector.
   *
   * @param fromSubIndex index into the source child vectors
   * @param toIndex      index into the destination child vectors
   */
  public void copySafe(int fromSubIndex, int toIndex) {
    for (final TransferPair p : pairs) {
      p.copyValueSafe(fromSubIndex, toIndex);
    }
  }
}
/**
 * Creates a transfer pair whose target is a new single (non-repeated) map vector.
 *
 * @param reference name used for the target vector's field
 * @param allocator allocator for the target vector
 * @return the transfer pair
 */
public TransferPair getTransferPairToSingleMap(String reference, BufferAllocator allocator) {
  final TransferPair pair = new SingleMapTransferPair(this, reference, allocator);
  return pair;
}
/**
 * Creates a transfer pair whose target is a new repeated map vector with the given name.
 *
 * @param ref       name used for the target vector's field
 * @param allocator allocator for the target vector
 * @return the transfer pair
 */
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
  final TransferPair pair = new RepeatedMapTransferPair(this, ref, allocator);
  return pair;
}
/**
 * Allocates the offsets vector and then the children via the superclass.
 * Whenever the overall allocation does not succeed (including when an exception escapes),
 * all buffers of this vector are cleared.
 *
 * @return {@code true} only if both the offsets and the superclass allocation succeeded
 */
@Override
public boolean allocateNewSafe() {
  final boolean offsetsAllocated;
  boolean allocated = false;
  try {
    offsetsAllocated = offsets.allocateNewSafe();
    // The children are only attempted once the offsets are in place.
    allocated = offsetsAllocated && super.allocateNewSafe();
  } finally {
    if (!allocated) {
      clear();
    }
  }
  if (!offsetsAllocated) {
    return false;
  }
  offsets.zeroVector();
  return allocated;
}
/**
 * Transfer pair that moves or copies the children of a {@link RepeatedMapVector} into a
 * single (non-repeated) {@link MapVector}.
 */
protected static class SingleMapTransferPair implements TransferPair {
  // One transfer pair per non-null child of the source; never contains null entries.
  private final TransferPair[] pairs;
  private final RepeatedMapVector from;
  private final MapVector to;
  private static final MajorType MAP_TYPE = new MajorType(MinorType.MAP, DataMode.REQUIRED);

  /**
   * Creates a pair whose target is a new map vector; its children are not pre-allocated.
   *
   * @param from      source vector
   * @param path      name for the new target's field
   * @param allocator allocator for the new target
   */
  public SingleMapTransferPair(RepeatedMapVector from, String path, BufferAllocator allocator) {
    this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), allocator, from.callBack), false);
  }

  /**
   * Creates a pair into an existing target; children newly added to it are allocated.
   *
   * @param from source vector
   * @param to   target vector
   */
  public SingleMapTransferPair(RepeatedMapVector from, MapVector to) {
    this(from, to, true);
  }

  /**
   * @param from     source vector
   * @param to       target vector; children missing from it are added
   * @param allocate whether children newly added to {@code to} get buffers allocated
   */
  public SingleMapTransferPair(RepeatedMapVector from, MapVector to, boolean allocate) {
    this.from = from;
    this.to = to;
    final TransferPair[] collected = new TransferPair[from.size()];
    int count = 0;
    for (final String child : from.getChildFieldNames()) {
      final int preSize = to.size();
      final ValueVector vector = from.getChild(child);
      if (vector == null) {
        continue;
      }
      final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
      if (allocate && to.size() != preSize) {
        newVector.allocateNew();
      }
      collected[count++] = vector.makeTransferPair(newVector);
    }
    // Skipped children would leave trailing null slots that blow up when iterated; trim them.
    this.pairs = count == collected.length ? collected : Arrays.copyOf(collected, count);
  }

  /**
   * Transfers every child, sets the target's value count to the source's value count and
   * clears the source.
   */
  @Override
  public void transfer() {
    for (final TransferPair p : pairs) {
      p.transfer();
    }
    to.getMutator().setValueCount(from.getAccessor().getValueCount());
    from.clear();
  }

  /** @return the target map vector */
  @Override
  public ValueVector getTo() {
    return to;
  }

  /**
   * Copies one entry of every child.
   *
   * @param from index into the source child vectors
   * @param to   index into the target child vectors
   */
  @Override
  public void copyValueSafe(int from, int to) {
    for (final TransferPair p : pairs) {
      p.copyValueSafe(from, to);
    }
  }

  /**
   * Splits the given range out of every child and sets the target's value count to
   * {@code length}.
   *
   * @param startIndex first index of the range in the child vectors
   * @param length     number of entries in the range
   */
  @Override
  public void splitAndTransfer(int startIndex, int length) {
    for (final TransferPair p : pairs) {
      p.splitAndTransfer(startIndex, length);
    }
    to.getMutator().setValueCount(length);
  }
}
/**
 * Transfer pair between two {@link RepeatedMapVector}s: handles the offsets vector itself
 * and delegates child data to one transfer pair per child.
 */
private static class RepeatedMapTransferPair implements TransferPair {
  // One transfer pair per non-null child of the source; never contains null entries.
  private final TransferPair[] pairs;
  private final RepeatedMapVector to;
  private final RepeatedMapVector from;

  /**
   * Creates a pair whose target is a new repeated map vector; its children are not
   * pre-allocated.
   *
   * @param from      source vector
   * @param path      name for the new target's field
   * @param allocator allocator for the new target
   */
  public RepeatedMapTransferPair(RepeatedMapVector from, String path, BufferAllocator allocator) {
    this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false);
  }

  /**
   * Creates a pair into an existing target; children newly added to it are allocated.
   *
   * @param from source vector
   * @param to   target vector
   */
  public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to) {
    this(from, to, true);
  }

  /**
   * @param from     source vector
   * @param to       target vector; children missing from it are added
   * @param allocate whether children newly added to {@code to} get buffers allocated
   */
  public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to, boolean allocate) {
    this.from = from;
    this.to = to;
    // Drop the target's cached pair used by copyFromSafe.
    this.to.ephPair = null;
    final TransferPair[] collected = new TransferPair[from.size()];
    int count = 0;
    for (final String child : from.getChildFieldNames()) {
      final int preSize = to.size();
      final ValueVector vector = from.getChild(child);
      if (vector == null) {
        continue;
      }
      final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
      // Honor the allocate flag (previously ignored), as SingleMapTransferPair does.
      if (allocate && to.size() != preSize) {
        newVector.allocateNew();
      }
      collected[count++] = vector.makeTransferPair(newVector);
    }
    // Skipped children would leave trailing null slots that blow up when iterated; trim them.
    this.pairs = count == collected.length ? collected : Arrays.copyOf(collected, count);
  }

  /**
   * Transfers the offsets and every child to the target, then clears the source.
   */
  @Override
  public void transfer() {
    from.offsets.transferTo(to.offsets);
    for (final TransferPair p : pairs) {
      p.transfer();
    }
    from.clear();
  }

  /** @return the target repeated map vector */
  @Override
  public ValueVector getTo() {
    return to;
  }

  /**
   * Copies one top-level record: all inner entries of source record {@code srcIndex} are
   * appended to the children of the target starting at the offset stored for
   * {@code destIndex}, and the end offset of {@code destIndex} is updated.
   *
   * @param srcIndex  record index in the source
   * @param destIndex record index in the target
   */
  @Override
  public void copyValueSafe(int srcIndex, int destIndex) {
    final RepeatedMapHolder holder = new RepeatedMapHolder();
    from.getAccessor().get(srcIndex, holder);
    to.emptyPopulator.populate(destIndex + 1);
    int newIndex = to.offsets.getAccessor().get(destIndex);
    //todo: make these bulk copies
    for (int i = holder.start; i < holder.end; i++, newIndex++) {
      for (final TransferPair p : pairs) {
        p.copyValueSafe(i, newIndex);
      }
    }
    // newIndex is now one past the last copied inner entry, i.e. the record's end offset.
    to.offsets.getMutator().setSafe(destIndex + 1, newIndex);
  }

  /**
   * Moves {@code groups} consecutive records starting at {@code groupStart} into the target.
   * The target's offsets are rebuilt relative to the first copied inner entry, then each
   * child splits out the matching inner range.
   *
   * @param groupStart index of the first record to move
   * @param groups     number of records to move
   */
  @Override
  public void splitAndTransfer(final int groupStart, final int groups) {
    final UInt4Vector.Accessor a = from.offsets.getAccessor();
    final UInt4Vector.Mutator m = to.offsets.getMutator();
    final int startPos = a.get(groupStart);
    final int endPos = a.get(groupStart + groups);
    final int valuesToCopy = endPos - startPos;
    to.offsets.clear();
    to.offsets.allocateNew(groups + 1);
    int normalizedPos;
    for (int i = 0; i < groups + 1; i++) {
      // Shift so that the first copied record starts at offset 0 in the target.
      normalizedPos = a.get(groupStart + i) - startPos;
      m.set(i, normalizedPos);
    }
    m.setValueCount(groups + 1);
    to.emptyPopulator.populate(groups);
    for (final TransferPair p : pairs) {
      p.splitAndTransfer(startPos, valuesToCopy);
    }
  }
}
// Cached transfer pair of the most recent copyFromSafe source; reset to null whenever a new
// RepeatedMapTransferPair targeting this vector is constructed.
private transient RepeatedMapTransferPair ephPair;

/**
 * Copies one record from another repeated map vector into this one, reusing the cached
 * transfer pair while the source stays the same instance.
 *
 * @param fromIndex record index in {@code from}
 * @param thisIndex record index in this vector
 * @param from      source vector
 */
public void copyFromSafe(int fromIndex, int thisIndex, RepeatedMapVector from) {
  final boolean cachedForSource = ephPair != null && ephPair.from == from;
  if (!cachedForSource) {
    ephPair = (RepeatedMapTransferPair) from.makeTransferPair(this);
  }
  ephPair.copyValueSafe(fromIndex, thisIndex);
}
/**
 * @return the capacity of the offsets vector minus one, but never less than 0
 */
@Override
public int getValueCapacity() {
  final int capacity = offsets.getValueCapacity() - 1;
  return capacity > 0 ? capacity : 0;
}
/**
 * @return the accessor instance of this vector
 */
@Override
public RepeatedMapAccessor getAccessor() {
  return this.accessor;
}
/**
 * Returns the offsets buffers followed by the buffers reported by the superclass.
 *
 * @param clear forwarded to both {@code getBuffers} calls
 * @return the combined buffer array
 * @throws IllegalArgumentException if this vector's buffer size does not equal the
 *         superclass buffer size plus the offsets buffer size
 */
@Override
public ArrowBuf[] getBuffers(boolean clear) {
  final int expectedBufferSize = getBufferSize();
  final int childrenBufferSize = super.getBufferSize();
  final int offsetsBufferSize = offsets.getBufferSize();
  Preconditions.checkArgument(expectedBufferSize == childrenBufferSize + offsetsBufferSize);
  final ArrowBuf[] offsetBuffers = offsets.getBuffers(clear);
  final ArrowBuf[] childBuffers = super.getBuffers(clear);
  return ArrayUtils.addAll(offsetBuffers, childBuffers);
}
// @Override
// public void load(SerializedField metadata, DrillBuf buffer) {
// final List<SerializedField> children = metadata.getChildList();
//
// final SerializedField offsetField = children.get(0);
// offsets.load(offsetField, buffer);
// int bufOffset = offsetField.getBufferLength();
//
// for (int i = 1; i < children.size(); i++) {
// final SerializedField child = children.get(i);
// final MaterializedField fieldDef = SerializedFieldHelper.create(child);
// ValueVector vector = getChild(fieldDef.getLastName());
// if (vector == null) {
// if we arrive here, we didn't have a matching vector.
// vector = BasicTypeHelper.getNewVector(fieldDef, allocator);
// putChild(fieldDef.getLastName(), vector);
// }
// final int vectorLength = child.getBufferLength();
// vector.load(child, buffer.slice(bufOffset, vectorLength));
// bufOffset += vectorLength;
// }
//
// assert bufOffset == buffer.capacity();
// }
//
//
// @Override
// public SerializedField getMetadata() {
// SerializedField.Builder builder = getField() //
// .getAsBuilder() //
// .setBufferLength(getBufferSize()) //
// while we don't need to actually read this on load, we need it to make sure we don't skip deserialization of this vector
// .setValueCount(accessor.getValueCount());
// builder.addChild(offsets.getMetadata());
// for (final ValueVector child : getChildren()) {
// builder.addChild(child.getMetadata());
// }
// return builder.build();
// }
/**
 * @return the mutator instance of this vector
 */
@Override
public Mutator getMutator() {
  return this.mutator;
}
/**
 * Read-side view of the enclosing vector: record boundaries come from the offsets vector,
 * field values from the child vectors.
 */
public class RepeatedMapAccessor implements RepeatedAccessor {

  /**
   * Materializes one record as a list of maps, one map per inner entry. Fields whose value
   * is {@code null} are left out of the map.
   *
   * @param index record index
   * @return list of field-name to value maps
   */
  @Override
  public Object getObject(int index) {
    final List<Object> rows = new JsonStringArrayList<>();
    final int end = offsets.getAccessor().get(index + 1);
    int position = offsets.getAccessor().get(index);
    while (position < end) {
      final Map<String, Object> row = Maps.newLinkedHashMap();
      for (final MaterializedField childField : getField().getChildren()) {
        // The offsets field is bookkeeping, not map content.
        if (childField.equals(BaseRepeatedValueVector.OFFSETS_FIELD)) {
          continue;
        }
        final String name = childField.getLastName();
        final Object value = getChild(name).getAccessor().getObject(position);
        if (value != null) {
          row.put(name, value);
        }
      }
      rows.add(row);
      position++;
    }
    return rows;
  }

  /** @return number of records: the offsets value count minus one, never below 0 */
  @Override
  public int getValueCount() {
    final int recordCount = offsets.getAccessor().getValueCount() - 1;
    return Math.max(recordCount, 0);
  }

  /** @return the offset stored after the last record, or 0 when there are no records */
  @Override
  public int getInnerValueCount() {
    final int valueCount = getValueCount();
    return valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount);
  }

  /**
   * @param index record index
   * @return number of inner entries of that record
   */
  @Override
  public int getInnerValueCountAt(int index) {
    final int end = offsets.getAccessor().get(index + 1);
    final int start = offsets.getAccessor().get(index);
    return end - start;
  }

  /** @return always {@code false} */
  @Override
  public boolean isEmpty(int index) {
    return false;
  }

  /** @return always {@code false} */
  @Override
  public boolean isNull(int index) {
    return false;
  }

  /**
   * Loads the start and end offsets of a record into the holder.
   *
   * @param index  record index
   * @param holder receives {@code start} and {@code end}
   */
  public void get(int index, RepeatedMapHolder holder) {
    assert index < getValueCapacity() :
        String.format("Attempted to access index %d when value capacity is %d",
            index, getValueCapacity());
    final UInt4Vector.Accessor boundaries = offsets.getAccessor();
    holder.start = boundaries.get(index);
    holder.end = boundaries.get(index + 1);
  }

  /**
   * Positions the vector's reader at a record and hands it out through the holder.
   *
   * @param index  record index
   * @param holder receives the reader
   */
  public void get(int index, ComplexHolder holder) {
    final FieldReader positioned = getReader();
    positioned.setPosition(index);
    holder.reader = positioned;
  }

  /**
   * Positions the vector's reader at one inner entry of a record. When {@code arrayIndex}
   * lies beyond the record's entries, the holder receives {@link NullReader#INSTANCE}.
   *
   * @param index      record index
   * @param arrayIndex index of the entry inside the record
   * @param holder     receives the reader
   */
  public void get(int index, int arrayIndex, ComplexHolder holder) {
    final RepeatedMapHolder bounds = new RepeatedMapHolder();
    get(index, bounds);
    if (bounds.start + arrayIndex < bounds.end) {
      reader.setSinglePosition(index, arrayIndex);
      holder.reader = reader;
    } else {
      holder.reader = NullReader.INSTANCE;
    }
  }
}
/**
 * Write-side view of the enclosing vector; maintains the offsets vector.
 */
public class Mutator implements RepeatedMutator {

  /**
   * Starts record {@code index} as empty: its end offset is set equal to its start offset.
   *
   * @param index record index
   */
  @Override
  public void startNewValue(int index) {
    final int endSlot = index + 1;
    emptyPopulator.populate(endSlot);
    final int start = offsets.getAccessor().get(index);
    offsets.getMutator().setSafe(endSlot, start);
  }

  /**
   * Sets the number of records; every child's value count becomes the offset stored at
   * position {@code topLevelValueCount}.
   *
   * @param topLevelValueCount number of records
   */
  @Override
  public void setValueCount(int topLevelValueCount) {
    emptyPopulator.populate(topLevelValueCount);
    final int offsetCount;
    if (topLevelValueCount == 0) {
      offsetCount = 0;
    } else {
      offsetCount = topLevelValueCount + 1;
    }
    offsets.getMutator().setValueCount(offsetCount);
    final int innerCount = offsets.getAccessor().get(topLevelValueCount);
    for (final ValueVector child : getChildren()) {
      child.getMutator().setValueCount(innerCount);
    }
  }

  /** Does nothing. */
  @Override
  public void reset() {}

  /** Does nothing. */
  @Override
  public void generateTestData(int values) {}

  /**
   * Grows record {@code index} by one inner entry.
   *
   * @param index record index
   * @return the previous end offset of the record, i.e. the position of the new entry
   */
  public int add(int index) {
    final int endSlot = index + 1;
    final int prevEnd = offsets.getAccessor().get(endSlot);
    offsets.getMutator().setSafe(endSlot, prevEnd + 1);
    return prevEnd;
  }
}
/**
 * Resets the mutator, then clears the offsets vector and every child vector.
 */
@Override
public void clear() {
  getMutator().reset();
  this.offsets.clear();
  for (final ValueVector child : getChildren()) {
    child.clear();
  }
}
}