blob: dc8242e3ace1c55e8c1d611e13319caa9abb9157 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.vector.complex;
import io.netty.buffer.DrillBuf;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.drill.common.collections.MapWithOrdinal;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.expr.BasicTypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for MapVectors. Currently used by AbstractRepeatedMapVector and MapVector.
 *
 * <p>Child vectors are kept in a {@link MapWithOrdinal} keyed by the lower-cased field name, so
 * child lookups by name are case-insensitive. Keys are normalized with {@link Locale#ROOT} so the
 * mapping does not depend on the JVM default locale.
 */
public abstract class AbstractMapVector extends AbstractContainerVector {
  private static final Logger logger = LoggerFactory.getLogger(AbstractMapVector.class);

  // Maintains a map with key as (lower-cased) field name and value is the vector itself
  private final MapWithOrdinal<String, ValueVector> vectors = new MapWithOrdinal<>();

  /**
   * Creates the map vector and one child vector for every child of the given field, skipping the
   * child whose name equals {@code BaseRepeatedValueVector.OFFSETS_FIELD}'s name.
   *
   * @param field materialized field describing this vector; a clone of it is passed to the parent
   * @param allocator allocator handed to the parent and to every child vector created here
   * @param callBack callback handed to the parent and to every child vector created here
   */
  protected AbstractMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
    super(field.clone(), allocator, callBack);
    // create the hierarchy of the child vectors based on the materialized field
    for (MaterializedField child : field.getChildren()) {
      final String fieldName = child.getName();
      if (fieldName.equals(BaseRepeatedValueVector.OFFSETS_FIELD.getName())) {
        continue;
      }
      // putVector (not putChild) is used here: only the vector map is updated.
      putVector(fieldName, BasicTypeHelper.getNewVector(child, allocator, callBack));
    }
  }

  /**
   * Normalizes a field name into the key used by {@link #vectors}.
   *
   * <p>{@link Locale#ROOT} is used on purpose: the no-argument {@code toLowerCase()} depends on
   * the default locale and would, for example, map "I" to a dotless "ı" under a Turkish locale,
   * making "ID" and "id" resolve to different children.
   *
   * @param name field name, must not be null
   * @return locale-independent lower-cased name
   */
  private static String toKey(String name) {
    return name.toLowerCase(Locale.ROOT);
  }

  /**
   * Closes every child vector, empties the child map and then closes the parent.
   */
  @Override
  public void close() {
    for (final ValueVector valueVector : vectors.values()) {
      valueVector.close();
    }
    vectors.clear();
    super.close();
  }

  /**
   * Allocates memory for every child vector.
   *
   * @return {@code true} if every child allocation succeeded; {@code false} as soon as one child
   *         reports failure, in which case {@code clear()} is invoked on this vector first
   */
  @Override
  public boolean allocateNewSafe() {
    /* boolean to keep track if all the memory allocation were successful
     * Used in the case of composite vectors when we need to allocate multiple
     * buffers for multiple vectors. If one of the allocations failed we need to
     * clear all the memory that we allocated
     */
    boolean success = false;
    try {
      for (final ValueVector v : vectors.values()) {
        if (!v.allocateNewSafe()) {
          // the finally block below performs the cleanup before this return takes effect
          return false;
        }
      }
      success = true;
    } finally {
      // also reached when a child allocation throws
      if (!success) {
        clear();
      }
    }
    return true;
  }

  /**
   * Adds a new field with the given parameters or replaces the existing one and consequently returns the resultant
   * {@link org.apache.drill.exec.vector.ValueVector}.
   *
   * Execution takes place in the following order:
   * <ul>
   *   <li>
   *     if field is new, create and insert a new vector of desired type.
   *   </li>
   *   <li>
   *     if field exists and existing vector is of desired vector type, return the vector.
   *   </li>
   *   <li>
   *     if field exists and null filled, clear the existing vector; create and insert a new vector of desired type.
   *   </li>
   *   <li>
   *     otherwise, throw an {@link java.lang.IllegalStateException}
   *   </li>
   * </ul>
   *
   * Whenever a new vector is created and a callback is set, {@code callBack.doWork()} is invoked.
   *
   * @param name name of the field
   * @param type type of the field
   * @param clazz class of expected vector type
   * @param <T> class type of expected vector type
   * @throws java.lang.IllegalStateException raised if there is a hard schema change
   *
   * @return resultant {@link org.apache.drill.exec.vector.ValueVector}
   */
  @SuppressWarnings("unchecked")
  @Override
  public <T extends ValueVector> T addOrGet(String name, TypeProtos.MajorType type, Class<T> clazz) {
    final ValueVector existing = getChild(name);
    if (existing != null) {
      if (clazz.isInstance(existing)) {
        return (T) existing;
      }
      if (!nullFilled(existing)) {
        final String message = "Drill does not support schema change yet. Existing[%s] and desired[%s] vector types mismatch";
        throw new IllegalStateException(
            String.format(message, existing.getClass().getSimpleName(), clazz.getSimpleName()));
      }
      existing.clear();
      // Since it's removing old vector and adding new one based on new type, it should do same for Materialized field,
      // Otherwise there will be duplicate of same field with same name but different type.
      field.removeChild(existing.getField());
    }

    final T vector = (T) BasicTypeHelper.getNewVector(name, allocator, type, callBack);
    putChild(name, vector);
    if (callBack != null) {
      callBack.doWork();
    }
    return vector;
  }

  /**
   * Tells whether every value of the given vector is null.
   *
   * @param vector vector to inspect
   * @return {@code true} if the accessor reports null for every index below its value count
   *         (trivially {@code true} for a value count of zero)
   */
  private boolean nullFilled(ValueVector vector) {
    // the value count cannot change while scanning, so read it once
    final int valueCount = vector.getAccessor().getValueCount();
    for (int r = 0; r < valueCount; r++) {
      if (!vector.getAccessor().isNull(r)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Returns a {@link org.apache.drill.exec.vector.ValueVector} corresponding to the given ordinal identifier.
   */
  public ValueVector getChildByOrdinal(int id) {
    return vectors.getByOrdinal(id);
  }

  /**
   * Returns a {@link org.apache.drill.exec.vector.ValueVector} instance of subtype of {@code T} corresponding to the
   * given field name if exists or null. The name is matched case-insensitively.
   */
  @Override
  public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
    final ValueVector v = vectors.get(toKey(name));
    if (v == null) {
      return null;
    }
    return typeify(v, clazz);
  }

  /**
   * Inserts the vector with the given name if it does not exist else replaces it with the new value,
   * and adds the vector's field as a child of this vector's field.
   *
   * Note that this method does not enforce any vector type check nor throws a schema change exception.
   */
  public void putChild(String name, ValueVector vector) {
    putVector(name, vector);
    field.addChild(vector.getField());
  }

  /**
   * Inserts the input vector into the map if it does not exist, replaces if it exists already.
   * A debug message is logged when a different vector instance is replaced.
   *
   * @param name field name, must not be null
   * @param vector vector to be inserted, must not be null
   * @throws NullPointerException if {@code name} or {@code vector} is null
   */
  protected void putVector(String name, ValueVector vector) {
    final ValueVector old = vectors.put(
        toKey(Preconditions.checkNotNull(name, "field name cannot be null")),
        Preconditions.checkNotNull(vector, "vector cannot be null")
    );
    if (old != null && old != vector) {
      logger.debug("Field [{}] mutated from [{}] to [{}]", name, old.getClass().getSimpleName(),
          vector.getClass().getSimpleName());
    }
  }

  /**
   * Returns a sequence of underlying child vectors.
   */
  protected Collection<ValueVector> getChildren() {
    return vectors.values();
  }

  /**
   * Returns the number of underlying child vectors.
   */
  @Override
  public int size() {
    return vectors.size();
  }

  /**
   * Returns an iterator over the underlying child vectors.
   */
  @Override
  public Iterator<ValueVector> iterator() {
    return vectors.values().iterator();
  }

  /**
   * Returns a list of scalar child vectors recursing the entire vector hierarchy.
   * Children that are themselves {@link AbstractMapVector}s are expanded; every other child is
   * added as is.
   */
  public List<ValueVector> getPrimitiveVectors() {
    final List<ValueVector> primitiveVectors = Lists.newArrayList();
    for (final ValueVector v : vectors.values()) {
      if (v instanceof AbstractMapVector) {
        primitiveVectors.addAll(((AbstractMapVector) v).getPrimitiveVectors());
      } else {
        primitiveVectors.add(v);
      }
    }
    return primitiveVectors;
  }

  /**
   * Returns a vector with its corresponding ordinal mapping if field exists or null.
   * The name is matched case-insensitively.
   */
  @Override
  public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
    final int ordinal = vectors.getOrdinal(toKey(name));
    if (ordinal < 0) {
      return null;
    }
    return new VectorWithOrdinal(vectors.getByOrdinal(ordinal), ordinal);
  }

  /**
   * Collects the buffers of all child vectors into a single array.
   *
   * @param clear when {@code true}, every collected buffer is retained once ({@code retain(1)})
   *              and each child vector is cleared after its buffers were collected -- presumably
   *              so the returned buffers stay usable after the clear
   * @return the buffers of all children, in child iteration order
   */
  @Override
  public DrillBuf[] getBuffers(boolean clear) {
    final List<DrillBuf> buffers = Lists.newArrayList();
    for (final ValueVector vector : vectors.values()) {
      // children are always asked with clear=false; retaining and clearing is handled here
      for (final DrillBuf buf : vector.getBuffers(false)) {
        buffers.add(buf);
        if (clear) {
          buf.retain(1);
        }
      }
      if (clear) {
        vector.clear();
      }
    }
    return buffers.toArray(new DrillBuf[buffers.size()]);
  }

  /**
   * Returns the sum of {@code writerIndex()} over the buffers of all child vectors.
   */
  @Override
  public int getBufferSize() {
    int actualBufSize = 0;
    for (final ValueVector v : vectors.values()) {
      for (final DrillBuf buf : v.getBuffers(false)) {
        actualBufSize += buf.writerIndex();
      }
    }
    return actualBufSize;
  }

  /**
   * Returns the sum of the allocated sizes reported by all child vectors.
   */
  @Override
  public int getAllocatedSize() {
    int size = 0;
    for (final ValueVector v : vectors.values()) {
      size += v.getAllocatedSize();
    }
    return size;
  }

  /**
   * Asks every child vector to add its ledgers to the given set.
   *
   * @param ledgers set that receives the ledgers
   */
  @Override
  public void collectLedgers(Set<BufferLedger> ledgers) {
    for (final ValueVector v : vectors.values()) {
      v.collectLedgers(ledgers);
    }
  }

  /**
   * Returns the sum of the payload byte counts reported by all child vectors.
   *
   * @param valueCount number of values to account for
   * @return 0 when {@code valueCount} is 0, otherwise the sum over all children
   */
  @Override
  public int getPayloadByteCount(int valueCount) {
    if (valueCount == 0) {
      return 0;
    }
    int count = 0;
    for (final ValueVector v : vectors.values()) {
      count += v.getPayloadByteCount(valueCount);
    }
    return count;
  }

  /**
   * Exchanges the contents of each child vector with the child at the same ordinal of the other
   * map vector.
   *
   * @param other vector to exchange with; must be an {@link AbstractMapVector}
   * @throws IllegalStateException if the two maps have a different number of children
   * @throws ClassCastException if {@code other} is not an {@link AbstractMapVector}
   */
  @Override
  public void exchange(ValueVector other) {
    final AbstractMapVector otherMap = (AbstractMapVector) other;
    final int childCount = vectors.size();
    if (childCount != otherMap.vectors.size()) {
      throw new IllegalStateException("Maps have different column counts");
    }
    for (int i = 0; i < childCount; i++) {
      final ValueVector mine = vectors.getByOrdinal(i);
      final ValueVector theirs = otherMap.vectors.getByOrdinal(i);
      // only verified when assertions are enabled
      assert mine.getField().isEquivalent(theirs.getField());
      mine.exchange(theirs);
    }
  }
}