| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.vector.complex; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import io.netty.buffer.DrillBuf; |
| |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.drill.common.types.TypeProtos.MajorType; |
| import org.apache.drill.common.types.TypeProtos.MinorType; |
| import org.apache.drill.common.types.Types; |
| import org.apache.drill.exec.expr.holders.ComplexHolder; |
| import org.apache.drill.exec.expr.holders.RepeatedListHolder; |
| import org.apache.drill.exec.memory.BufferAllocator; |
| import org.apache.drill.exec.memory.AllocationManager.BufferLedger; |
| import org.apache.drill.exec.exception.OutOfMemoryException; |
| import org.apache.drill.exec.proto.UserBitShared.SerializedField; |
| import org.apache.drill.exec.record.MaterializedField; |
| import org.apache.drill.exec.record.TransferPair; |
| import org.apache.drill.exec.util.JsonStringArrayList; |
| import org.apache.drill.exec.vector.AddOrGetResult; |
| import org.apache.drill.exec.util.CallBack; |
| import org.apache.drill.exec.vector.UInt4Vector; |
| import org.apache.drill.exec.vector.ValueVector; |
| import org.apache.drill.exec.vector.VectorDescriptor; |
| import org.apache.drill.exec.vector.complex.impl.NullReader; |
| import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl; |
| import org.apache.drill.exec.vector.complex.reader.FieldReader; |
| |
| public class RepeatedListVector extends AbstractContainerVector |
| implements RepeatedValueVector { |
| |
| protected static class DelegateRepeatedVector extends BaseRepeatedValueVector { |
| |
| private final RepeatedListAccessor accessor = new RepeatedListAccessor(); |
| private final RepeatedListMutator mutator = new RepeatedListMutator(); |
| private final EmptyValuePopulator emptyPopulator; |
| private transient DelegateTransferPair ephPair; |
| |
| public class RepeatedListAccessor extends BaseRepeatedValueVector.BaseRepeatedAccessor { |
| |
| @Override |
| public Object getObject(int index) { |
| final List<Object> list = new JsonStringArrayList<>(); |
| final int start = offsets.getAccessor().get(index); |
| final int until = offsets.getAccessor().get(index+1); |
| for (int i = start; i < until; i++) { |
| list.add(vector.getAccessor().getObject(i)); |
| } |
| return list; |
| } |
| |
| public void get(int index, RepeatedListHolder holder) { |
| assert index <= getValueCapacity(); |
| holder.start = getOffsetVector().getAccessor().get(index); |
| holder.end = getOffsetVector().getAccessor().get(index+1); |
| } |
| |
| public void get(int index, ComplexHolder holder) { |
| final FieldReader reader = getReader(); |
| reader.setPosition(index); |
| holder.reader = reader; |
| } |
| |
| public void get(int index, int arrayIndex, ComplexHolder holder) { |
| final RepeatedListHolder listHolder = new RepeatedListHolder(); |
| get(index, listHolder); |
| int offset = listHolder.start + arrayIndex; |
| if (offset >= listHolder.end) { |
| holder.reader = NullReader.INSTANCE; |
| } else { |
| FieldReader r = getDataVector().getReader(); |
| r.setPosition(offset); |
| holder.reader = r; |
| } |
| } |
| } |
| |
| public class RepeatedListMutator extends BaseRepeatedValueVector.BaseRepeatedMutator { |
| |
| public int add(int index) { |
| final int curEnd = getOffsetVector().getAccessor().get(index+1); |
| getOffsetVector().getMutator().setSafe(index + 1, curEnd + 1); |
| return curEnd; |
| } |
| |
| @Override |
| public void startNewValue(int index) { |
| emptyPopulator.populate(index+1); |
| super.startNewValue(index); |
| } |
| |
| @Override |
| public void setValueCount(int valueCount) { |
| emptyPopulator.populate(valueCount); |
| super.setValueCount(valueCount); |
| } |
| } |
| |
| public class DelegateTransferPair extends BaseRepeatedValueVectorTransferPair<DelegateRepeatedVector> { |
| |
| public DelegateTransferPair(DelegateRepeatedVector target) { |
| super(target); |
| } |
| |
| @Override |
| public void copyValueSafe(int srcIndex, int destIndex) { |
| final RepeatedListHolder holder = new RepeatedListHolder(); |
| getAccessor().get(srcIndex, holder); |
| target.emptyPopulator.populate(destIndex+1); |
| copyValueSafe(destIndex, holder.start, holder.end); |
| } |
| } |
| |
| public DelegateRepeatedVector(String path, BufferAllocator allocator) { |
| this(MaterializedField.create(path, TYPE), allocator); |
| } |
| |
| public DelegateRepeatedVector(MaterializedField field, BufferAllocator allocator) { |
| super(field, allocator); |
| emptyPopulator = new EmptyValuePopulator(getOffsetVector()); |
| } |
| |
| @Override |
| public void allocateNew() throws OutOfMemoryException { |
| if (!allocateNewSafe()) { |
| throw new OutOfMemoryException(); |
| } |
| } |
| |
| @Override |
| public TransferPair getTransferPair(String ref, BufferAllocator allocator) { |
| return makeTransferPair(new DelegateRepeatedVector(ref, allocator)); |
| } |
| |
| @Override |
| public TransferPair makeTransferPair(ValueVector target) { |
| return new DelegateTransferPair(DelegateRepeatedVector.class.cast(target)); |
| } |
| |
| @Override |
| public RepeatedListAccessor getAccessor() { return accessor; } |
| |
| @Override |
| public RepeatedListMutator getMutator() { return mutator; } |
| |
| @Override |
| public FieldReader getReader() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| public void copyFromSafe(int fromIndex, int thisIndex, DelegateRepeatedVector from) { |
| if (ephPair == null || ephPair.target != from) { |
| ephPair = DelegateTransferPair.class.cast(from.makeTransferPair(this)); |
| } |
| ephPair.copyValueSafe(fromIndex, thisIndex); |
| } |
| |
| @Override |
| public void copyEntry(int toIndex, ValueVector from, int fromIndex) { |
| copyFromSafe(fromIndex, toIndex, (DelegateRepeatedVector) from); |
| } |
| } |
| |
| protected class RepeatedListTransferPair implements TransferPair { |
| private final TransferPair delegate; |
| |
| public RepeatedListTransferPair(TransferPair delegate) { |
| this.delegate = delegate; |
| } |
| |
| @Override |
| public void transfer() { |
| delegate.transfer(); |
| } |
| |
| @Override |
| public void splitAndTransfer(int startIndex, int length) { |
| delegate.splitAndTransfer(startIndex, length); |
| } |
| |
| @Override |
| public ValueVector getTo() { |
| final DelegateRepeatedVector delegateVector = DelegateRepeatedVector.class.cast(delegate.getTo()); |
| return new RepeatedListVector(getField(), allocator, callBack, delegateVector); |
| } |
| |
| @Override |
| public void copyValueSafe(int from, int to) { |
| delegate.copyValueSafe(from, to); |
| } |
| } |
| |
| public RepeatedListVector(String path, BufferAllocator allocator, CallBack callBack) { |
| this(MaterializedField.create(path, TYPE), allocator, callBack); |
| } |
| |
| public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) { |
| this(field, allocator, callBack, new DelegateRepeatedVector(field, allocator)); |
| } |
| |
| public final static MajorType TYPE = Types.repeated(MinorType.LIST); |
| private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this); |
| private final DelegateRepeatedVector delegate; |
| |
| protected RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack, DelegateRepeatedVector delegate) { |
| super(field, allocator, callBack); |
| this.delegate = Preconditions.checkNotNull(delegate); |
| |
| final List<MaterializedField> children = Lists.newArrayList(field.getChildren()); |
| final int childSize = children.size(); |
| assert childSize < 3; |
| final boolean hasChild = childSize > 0; |
| if (hasChild) { |
| // the last field is data field |
| final MaterializedField child = children.get(childSize-1); |
| addOrGetVector(VectorDescriptor.create(child)); |
| } |
| } |
| |
| @Override |
| public RepeatedListReaderImpl getReader() { return reader; } |
| |
| @Override |
| public DelegateRepeatedVector.RepeatedListAccessor getAccessor() { |
| return delegate.getAccessor(); |
| } |
| |
| @Override |
| public DelegateRepeatedVector.RepeatedListMutator getMutator() { |
| return delegate.getMutator(); |
| } |
| |
| @Override |
| public UInt4Vector getOffsetVector() { |
| return delegate.getOffsetVector(); |
| } |
| |
| @Override |
| public ValueVector getDataVector() { |
| return delegate.getDataVector(); |
| } |
| |
| @Override |
| public void allocateNew() throws OutOfMemoryException { |
| delegate.allocateNew(); |
| } |
| |
| @Override |
| public boolean allocateNewSafe() { |
| return delegate.allocateNewSafe(); |
| } |
| |
| @Override |
| public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) { |
| final AddOrGetResult<T> result = delegate.addOrGetVector(descriptor); |
| if (result.isCreated() && callBack != null) { |
| callBack.doWork(); |
| } |
| this.field = delegate.getField(); |
| return result; |
| } |
| |
| public void setChildVector(ValueVector childVector) { |
| delegate.setChildVector(childVector); |
| } |
| |
| @Override |
| public int size() { |
| return delegate.size(); |
| } |
| |
| @Override |
| public int getBufferSize() { |
| return delegate.getBufferSize(); |
| } |
| |
| @Override |
| public int getAllocatedSize() { |
| return delegate.getAllocatedSize(); |
| } |
| |
| @Override |
| public int getBufferSizeFor(final int valueCount) { |
| return delegate.getBufferSizeFor(valueCount); |
| } |
| |
| @Override |
| public void close() { |
| delegate.close(); |
| } |
| |
| @Override |
| public void clear() { |
| delegate.clear(); |
| } |
| |
| @Override |
| public TransferPair getTransferPair(BufferAllocator allocator) { |
| return new RepeatedListTransferPair(delegate.getTransferPair(allocator)); |
| } |
| |
| @Override |
| public TransferPair getTransferPair(String ref, BufferAllocator allocator) { |
| return new RepeatedListTransferPair(delegate.getTransferPair(ref, allocator)); |
| } |
| |
| @Override |
| public TransferPair makeTransferPair(ValueVector to) { |
| final RepeatedListVector target = RepeatedListVector.class.cast(to); |
| return new RepeatedListTransferPair(delegate.makeTransferPair(target.delegate)); |
| } |
| |
| @Override |
| public int getValueCapacity() { |
| return delegate.getValueCapacity(); |
| } |
| |
| @Override |
| public DrillBuf[] getBuffers(boolean clear) { |
| return delegate.getBuffers(clear); |
| } |
| |
| @Override |
| public void load(SerializedField metadata, DrillBuf buf) { |
| delegate.load(metadata, buf); |
| } |
| |
| @Override |
| public SerializedField getMetadata() { |
| return delegate.getMetadata(); |
| } |
| |
| @Override |
| public Iterator<ValueVector> iterator() { |
| return delegate.iterator(); |
| } |
| |
| @Override |
| public void setInitialCapacity(int numRecords) { |
| delegate.setInitialCapacity(numRecords); |
| } |
| |
| /** |
| * @deprecated |
| * prefer using {@link #addOrGetVector(org.apache.drill.exec.vector.VectorDescriptor)} instead. |
| */ |
| @Deprecated |
| @Override |
| public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { |
| final AddOrGetResult<T> result = addOrGetVector(VectorDescriptor.create(type)); |
| return result.getVector(); |
| } |
| |
| @Override |
| public <T extends ValueVector> T getChild(String name, Class<T> clazz) { |
| if (name != null) { |
| return null; |
| } |
| return typeify(delegate.getDataVector(), clazz); |
| } |
| |
| public void allocateNew(int valueCount, int innerValueCount) { |
| clear(); |
| getOffsetVector().allocateNew(valueCount + 1); |
| getOffsetVector().getMutator().setSafe(0, 0); |
| getMutator().reset(); |
| } |
| |
| public void allocateOffsetsNew(int groupCount) { |
| getOffsetVector().allocateNew(groupCount + 1); |
| getOffsetVector().zeroVector(); |
| } |
| |
| @Override |
| public VectorWithOrdinal getChildVectorWithOrdinal(String name) { |
| if (name != null) { |
| return null; |
| } |
| return new VectorWithOrdinal(delegate.getDataVector(), 0); |
| } |
| |
| public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) { |
| delegate.copyFromSafe(fromIndex, thisIndex, from.delegate); |
| } |
| |
| @Override |
| public void copyEntry(int toIndex, ValueVector from, int fromIndex) { |
| copyFromSafe(fromIndex, toIndex, (RepeatedListVector) from); |
| } |
| |
| @Override |
| public void collectLedgers(Set<BufferLedger> ledgers) { |
| delegate.collectLedgers(ledgers); |
| } |
| |
| @Override |
| public int getPayloadByteCount(int valueCount) { |
| if (valueCount == 0) { |
| return 0; |
| } |
| return delegate.getPayloadByteCount(valueCount); |
| } |
| |
| @Override |
| public void exchange(ValueVector other) { |
| delegate.exchange(((RepeatedListVector) other).delegate); |
| } |
| |
| @Override |
| public void toNullable(ValueVector nullableVector) { |
| throw new UnsupportedOperationException(); |
| } |
| } |