blob: c201c766d4782d437ee2cb406b3ddc0f0f3f0ac9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.vector.complex;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.netty.buffer.DrillBuf;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.holders.ComplexHolder;
import org.apache.drill.exec.expr.holders.RepeatedListHolder;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.util.JsonStringArrayList;
import org.apache.drill.exec.vector.AddOrGetResult;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VectorDescriptor;
import org.apache.drill.exec.vector.complex.impl.NullReader;
import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
public class RepeatedListVector extends AbstractContainerVector
implements RepeatedValueVector {
protected static class DelegateRepeatedVector extends BaseRepeatedValueVector {
private final RepeatedListAccessor accessor = new RepeatedListAccessor();
private final RepeatedListMutator mutator = new RepeatedListMutator();
private final EmptyValuePopulator emptyPopulator;
private transient DelegateTransferPair ephPair;
public class RepeatedListAccessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
@Override
public Object getObject(int index) {
final List<Object> list = new JsonStringArrayList<>();
final int start = offsets.getAccessor().get(index);
final int until = offsets.getAccessor().get(index+1);
for (int i = start; i < until; i++) {
list.add(vector.getAccessor().getObject(i));
}
return list;
}
public void get(int index, RepeatedListHolder holder) {
assert index <= getValueCapacity();
holder.start = getOffsetVector().getAccessor().get(index);
holder.end = getOffsetVector().getAccessor().get(index+1);
}
public void get(int index, ComplexHolder holder) {
final FieldReader reader = getReader();
reader.setPosition(index);
holder.reader = reader;
}
public void get(int index, int arrayIndex, ComplexHolder holder) {
final RepeatedListHolder listHolder = new RepeatedListHolder();
get(index, listHolder);
int offset = listHolder.start + arrayIndex;
if (offset >= listHolder.end) {
holder.reader = NullReader.INSTANCE;
} else {
FieldReader r = getDataVector().getReader();
r.setPosition(offset);
holder.reader = r;
}
}
}
public class RepeatedListMutator extends BaseRepeatedValueVector.BaseRepeatedMutator {
public int add(int index) {
final int curEnd = getOffsetVector().getAccessor().get(index+1);
getOffsetVector().getMutator().setSafe(index + 1, curEnd + 1);
return curEnd;
}
@Override
public void startNewValue(int index) {
emptyPopulator.populate(index+1);
super.startNewValue(index);
}
@Override
public void setValueCount(int valueCount) {
emptyPopulator.populate(valueCount);
super.setValueCount(valueCount);
}
}
public class DelegateTransferPair extends BaseRepeatedValueVectorTransferPair<DelegateRepeatedVector> {
public DelegateTransferPair(DelegateRepeatedVector target) {
super(target);
}
@Override
public void copyValueSafe(int srcIndex, int destIndex) {
final RepeatedListHolder holder = new RepeatedListHolder();
getAccessor().get(srcIndex, holder);
target.emptyPopulator.populate(destIndex+1);
copyValueSafe(destIndex, holder.start, holder.end);
}
}
public DelegateRepeatedVector(String path, BufferAllocator allocator) {
this(MaterializedField.create(path, TYPE), allocator);
}
public DelegateRepeatedVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
emptyPopulator = new EmptyValuePopulator(getOffsetVector());
}
@Override
public void allocateNew() throws OutOfMemoryException {
if (!allocateNewSafe()) {
throw new OutOfMemoryException();
}
}
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return makeTransferPair(new DelegateRepeatedVector(ref, allocator));
}
@Override
public TransferPair makeTransferPair(ValueVector target) {
return new DelegateTransferPair(DelegateRepeatedVector.class.cast(target));
}
@Override
public RepeatedListAccessor getAccessor() { return accessor; }
@Override
public RepeatedListMutator getMutator() { return mutator; }
@Override
public FieldReader getReader() {
throw new UnsupportedOperationException();
}
public void copyFromSafe(int fromIndex, int thisIndex, DelegateRepeatedVector from) {
if (ephPair == null || ephPair.target != from) {
ephPair = DelegateTransferPair.class.cast(from.makeTransferPair(this));
}
ephPair.copyValueSafe(fromIndex, thisIndex);
}
@Override
public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
copyFromSafe(fromIndex, toIndex, (DelegateRepeatedVector) from);
}
}
protected class RepeatedListTransferPair implements TransferPair {
private final TransferPair delegate;
public RepeatedListTransferPair(TransferPair delegate) {
this.delegate = delegate;
}
@Override
public void transfer() {
delegate.transfer();
}
@Override
public void splitAndTransfer(int startIndex, int length) {
delegate.splitAndTransfer(startIndex, length);
}
@Override
public ValueVector getTo() {
final DelegateRepeatedVector delegateVector = DelegateRepeatedVector.class.cast(delegate.getTo());
return new RepeatedListVector(getField(), allocator, callBack, delegateVector);
}
@Override
public void copyValueSafe(int from, int to) {
delegate.copyValueSafe(from, to);
}
}
public RepeatedListVector(String path, BufferAllocator allocator, CallBack callBack) {
this(MaterializedField.create(path, TYPE), allocator, callBack);
}
public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
this(field, allocator, callBack, new DelegateRepeatedVector(field, allocator));
}
public final static MajorType TYPE = Types.repeated(MinorType.LIST);
private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
private final DelegateRepeatedVector delegate;
protected RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack, DelegateRepeatedVector delegate) {
super(field, allocator, callBack);
this.delegate = Preconditions.checkNotNull(delegate);
final List<MaterializedField> children = Lists.newArrayList(field.getChildren());
final int childSize = children.size();
assert childSize < 3;
final boolean hasChild = childSize > 0;
if (hasChild) {
// the last field is data field
final MaterializedField child = children.get(childSize-1);
addOrGetVector(VectorDescriptor.create(child));
}
}
@Override
public RepeatedListReaderImpl getReader() { return reader; }
@Override
public DelegateRepeatedVector.RepeatedListAccessor getAccessor() {
return delegate.getAccessor();
}
@Override
public DelegateRepeatedVector.RepeatedListMutator getMutator() {
return delegate.getMutator();
}
@Override
public UInt4Vector getOffsetVector() {
return delegate.getOffsetVector();
}
@Override
public ValueVector getDataVector() {
return delegate.getDataVector();
}
@Override
public void allocateNew() throws OutOfMemoryException {
delegate.allocateNew();
}
@Override
public boolean allocateNewSafe() {
return delegate.allocateNewSafe();
}
@Override
public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
final AddOrGetResult<T> result = delegate.addOrGetVector(descriptor);
if (result.isCreated() && callBack != null) {
callBack.doWork();
}
this.field = delegate.getField();
return result;
}
public void setChildVector(ValueVector childVector) {
delegate.setChildVector(childVector);
}
@Override
public int size() {
return delegate.size();
}
@Override
public int getBufferSize() {
return delegate.getBufferSize();
}
@Override
public int getAllocatedSize() {
return delegate.getAllocatedSize();
}
@Override
public int getBufferSizeFor(final int valueCount) {
return delegate.getBufferSizeFor(valueCount);
}
@Override
public void close() {
delegate.close();
}
@Override
public void clear() {
delegate.clear();
}
@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
return new RepeatedListTransferPair(delegate.getTransferPair(allocator));
}
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return new RepeatedListTransferPair(delegate.getTransferPair(ref, allocator));
}
@Override
public TransferPair makeTransferPair(ValueVector to) {
final RepeatedListVector target = RepeatedListVector.class.cast(to);
return new RepeatedListTransferPair(delegate.makeTransferPair(target.delegate));
}
@Override
public int getValueCapacity() {
return delegate.getValueCapacity();
}
@Override
public DrillBuf[] getBuffers(boolean clear) {
return delegate.getBuffers(clear);
}
@Override
public void load(SerializedField metadata, DrillBuf buf) {
delegate.load(metadata, buf);
}
@Override
public SerializedField getMetadata() {
return delegate.getMetadata();
}
@Override
public Iterator<ValueVector> iterator() {
return delegate.iterator();
}
@Override
public void setInitialCapacity(int numRecords) {
delegate.setInitialCapacity(numRecords);
}
/**
* @deprecated
* prefer using {@link #addOrGetVector(org.apache.drill.exec.vector.VectorDescriptor)} instead.
*/
@Deprecated
@Override
public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
final AddOrGetResult<T> result = addOrGetVector(VectorDescriptor.create(type));
return result.getVector();
}
@Override
public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
if (name != null) {
return null;
}
return typeify(delegate.getDataVector(), clazz);
}
public void allocateNew(int valueCount, int innerValueCount) {
clear();
getOffsetVector().allocateNew(valueCount + 1);
getOffsetVector().getMutator().setSafe(0, 0);
getMutator().reset();
}
public void allocateOffsetsNew(int groupCount) {
getOffsetVector().allocateNew(groupCount + 1);
getOffsetVector().zeroVector();
}
@Override
public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
if (name != null) {
return null;
}
return new VectorWithOrdinal(delegate.getDataVector(), 0);
}
public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
delegate.copyFromSafe(fromIndex, thisIndex, from.delegate);
}
@Override
public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
copyFromSafe(fromIndex, toIndex, (RepeatedListVector) from);
}
@Override
public void collectLedgers(Set<BufferLedger> ledgers) {
delegate.collectLedgers(ledgers);
}
@Override
public int getPayloadByteCount(int valueCount) {
if (valueCount == 0) {
return 0;
}
return delegate.getPayloadByteCount(valueCount);
}
@Override
public void exchange(ValueVector other) {
delegate.exchange(((RepeatedListVector) other).delegate);
}
@Override
public void toNullable(ValueVector nullableVector) {
throw new UnsupportedOperationException();
}
}