blob: f337f9c4a60e02f2bbf0947cf53be6d913d051d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector.complex;
import io.netty.buffer.ArrowBuf;
import java.util.Iterator;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorDescriptor;
import org.apache.arrow.vector.complex.impl.NullReader;
import org.apache.arrow.vector.complex.impl.RepeatedListReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.ComplexHolder;
import org.apache.arrow.vector.holders.RepeatedListHolder;
import org.apache.arrow.vector.types.MaterializedField;
import org.apache.arrow.vector.types.Types.DataMode;
import org.apache.arrow.vector.types.Types.MajorType;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.TransferPair;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class RepeatedListVector extends AbstractContainerVector
implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
public final static MajorType TYPE = new MajorType(MinorType.LIST, DataMode.REPEATED);
private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
final DelegateRepeatedVector delegate;
protected static class DelegateRepeatedVector extends BaseRepeatedValueVector {
private final RepeatedListAccessor accessor = new RepeatedListAccessor();
private final RepeatedListMutator mutator = new RepeatedListMutator();
private final EmptyValuePopulator emptyPopulator;
private transient DelegateTransferPair ephPair;
public class RepeatedListAccessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
@Override
public Object getObject(int index) {
final List<Object> list = new JsonStringArrayList<>();
final int start = offsets.getAccessor().get(index);
final int until = offsets.getAccessor().get(index+1);
for (int i = start; i < until; i++) {
list.add(vector.getAccessor().getObject(i));
}
return list;
}
public void get(int index, RepeatedListHolder holder) {
assert index <= getValueCapacity();
holder.start = getOffsetVector().getAccessor().get(index);
holder.end = getOffsetVector().getAccessor().get(index+1);
}
public void get(int index, ComplexHolder holder) {
final FieldReader reader = getReader();
reader.setPosition(index);
holder.reader = reader;
}
public void get(int index, int arrayIndex, ComplexHolder holder) {
final RepeatedListHolder listHolder = new RepeatedListHolder();
get(index, listHolder);
int offset = listHolder.start + arrayIndex;
if (offset >= listHolder.end) {
holder.reader = NullReader.INSTANCE;
} else {
FieldReader r = getDataVector().getReader();
r.setPosition(offset);
holder.reader = r;
}
}
}
public class RepeatedListMutator extends BaseRepeatedValueVector.BaseRepeatedMutator {
public int add(int index) {
final int curEnd = getOffsetVector().getAccessor().get(index+1);
getOffsetVector().getMutator().setSafe(index + 1, curEnd + 1);
return curEnd;
}
@Override
public void startNewValue(int index) {
emptyPopulator.populate(index+1);
super.startNewValue(index);
}
@Override
public void setValueCount(int valueCount) {
emptyPopulator.populate(valueCount);
super.setValueCount(valueCount);
}
}
public class DelegateTransferPair implements TransferPair {
private final DelegateRepeatedVector target;
private final TransferPair[] children;
public DelegateTransferPair(DelegateRepeatedVector target) {
this.target = Preconditions.checkNotNull(target);
if (target.getDataVector() == DEFAULT_DATA_VECTOR) {
target.addOrGetVector(VectorDescriptor.create(getDataVector().getField()));
target.getDataVector().allocateNew();
}
this.children = new TransferPair[] {
getOffsetVector().makeTransferPair(target.getOffsetVector()),
getDataVector().makeTransferPair(target.getDataVector())
};
}
@Override
public void transfer() {
for (TransferPair child:children) {
child.transfer();
}
}
@Override
public ValueVector getTo() {
return target;
}
@Override
public void splitAndTransfer(int startIndex, int length) {
target.allocateNew();
for (int i = 0; i < length; i++) {
copyValueSafe(startIndex + i, i);
}
}
@Override
public void copyValueSafe(int srcIndex, int destIndex) {
final RepeatedListHolder holder = new RepeatedListHolder();
getAccessor().get(srcIndex, holder);
target.emptyPopulator.populate(destIndex+1);
final TransferPair vectorTransfer = children[1];
int newIndex = target.getOffsetVector().getAccessor().get(destIndex);
//todo: make this a bulk copy.
for (int i = holder.start; i < holder.end; i++, newIndex++) {
vectorTransfer.copyValueSafe(i, newIndex);
}
target.getOffsetVector().getMutator().setSafe(destIndex + 1, newIndex);
}
}
public DelegateRepeatedVector(String path, BufferAllocator allocator) {
this(MaterializedField.create(path, TYPE), allocator);
}
public DelegateRepeatedVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
emptyPopulator = new EmptyValuePopulator(getOffsetVector());
}
@Override
public void allocateNew() throws OutOfMemoryException {
if (!allocateNewSafe()) {
throw new OutOfMemoryException();
}
}
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return makeTransferPair(new DelegateRepeatedVector(ref, allocator));
}
@Override
public TransferPair makeTransferPair(ValueVector target) {
return new DelegateTransferPair(DelegateRepeatedVector.class.cast(target));
}
@Override
public RepeatedListAccessor getAccessor() {
return accessor;
}
@Override
public RepeatedListMutator getMutator() {
return mutator;
}
@Override
public FieldReader getReader() {
throw new UnsupportedOperationException();
}
public void copyFromSafe(int fromIndex, int thisIndex, DelegateRepeatedVector from) {
if(ephPair == null || ephPair.target != from) {
ephPair = DelegateTransferPair.class.cast(from.makeTransferPair(this));
}
ephPair.copyValueSafe(fromIndex, thisIndex);
}
}
protected class RepeatedListTransferPair implements TransferPair {
private final TransferPair delegate;
public RepeatedListTransferPair(TransferPair delegate) {
this.delegate = delegate;
}
public void transfer() {
delegate.transfer();
}
@Override
public void splitAndTransfer(int startIndex, int length) {
delegate.splitAndTransfer(startIndex, length);
}
@Override
public ValueVector getTo() {
final DelegateRepeatedVector delegateVector = DelegateRepeatedVector.class.cast(delegate.getTo());
return new RepeatedListVector(getField(), allocator, callBack, delegateVector);
}
@Override
public void copyValueSafe(int from, int to) {
delegate.copyValueSafe(from, to);
}
}
public RepeatedListVector(String path, BufferAllocator allocator, CallBack callBack) {
this(MaterializedField.create(path, TYPE), allocator, callBack);
}
public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
this(field, allocator, callBack, new DelegateRepeatedVector(field, allocator));
}
protected RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack, DelegateRepeatedVector delegate) {
super(field, allocator, callBack);
this.delegate = Preconditions.checkNotNull(delegate);
final List<MaterializedField> children = Lists.newArrayList(field.getChildren());
final int childSize = children.size();
assert childSize < 3;
final boolean hasChild = childSize > 0;
if (hasChild) {
// the last field is data field
final MaterializedField child = children.get(childSize-1);
addOrGetVector(VectorDescriptor.create(child));
}
}
@Override
public RepeatedListReaderImpl getReader() {
return reader;
}
@Override
public DelegateRepeatedVector.RepeatedListAccessor getAccessor() {
return delegate.getAccessor();
}
@Override
public DelegateRepeatedVector.RepeatedListMutator getMutator() {
return delegate.getMutator();
}
@Override
public UInt4Vector getOffsetVector() {
return delegate.getOffsetVector();
}
@Override
public ValueVector getDataVector() {
return delegate.getDataVector();
}
@Override
public void allocateNew() throws OutOfMemoryException {
delegate.allocateNew();
}
@Override
public boolean allocateNewSafe() {
return delegate.allocateNewSafe();
}
@Override
public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
final AddOrGetResult<T> result = delegate.addOrGetVector(descriptor);
if (result.isCreated() && callBack != null) {
callBack.doWork();
}
return result;
}
@Override
public int size() {
return delegate.size();
}
@Override
public int getBufferSize() {
return delegate.getBufferSize();
}
@Override
public int getBufferSizeFor(final int valueCount) {
return delegate.getBufferSizeFor(valueCount);
}
@Override
public void close() {
delegate.close();
}
@Override
public void clear() {
delegate.clear();
}
@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
return new RepeatedListTransferPair(delegate.getTransferPair(allocator));
}
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return new RepeatedListTransferPair(delegate.getTransferPair(ref, allocator));
}
@Override
public TransferPair makeTransferPair(ValueVector to) {
final RepeatedListVector target = RepeatedListVector.class.cast(to);
return new RepeatedListTransferPair(delegate.makeTransferPair(target.delegate));
}
@Override
public int getValueCapacity() {
return delegate.getValueCapacity();
}
@Override
public ArrowBuf[] getBuffers(boolean clear) {
return delegate.getBuffers(clear);
}
// @Override
// public void load(SerializedField metadata, DrillBuf buf) {
// delegate.load(metadata, buf);
// }
// @Override
// public SerializedField getMetadata() {
// return delegate.getMetadata();
// }
@Override
public Iterator<ValueVector> iterator() {
return delegate.iterator();
}
@Override
public void setInitialCapacity(int numRecords) {
delegate.setInitialCapacity(numRecords);
}
/**
* @deprecated
* prefer using {@link #addOrGetVector(org.apache.arrow.vector.VectorDescriptor)} instead.
*/
@Override
public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
final AddOrGetResult<T> result = addOrGetVector(VectorDescriptor.create(type));
return result.getVector();
}
@Override
public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
if (name != null) {
return null;
}
return typeify(delegate.getDataVector(), clazz);
}
@Override
public void allocateNew(int valueCount, int innerValueCount) {
clear();
getOffsetVector().allocateNew(valueCount + 1);
getMutator().reset();
}
@Override
public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
if (name != null) {
return null;
}
return new VectorWithOrdinal(delegate.getDataVector(), 0);
}
public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
delegate.copyFromSafe(fromIndex, thisIndex, from.delegate);
}
}