/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.record;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.UnionVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;

/**
 * Utility class for dealing with changing schemas: merging batch schemas and
 * coercing record batches to match a merged schema.
 */
public class SchemaUtil {

  /**
   * Merges the given schemas into one. The merged schema contains the union of all
   * columns. If columns share the same schema path but have conflicting types, the
   * merged schema represents that column as a {@code UNION} of all observed types.
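   * <p>
   * A minimal usage sketch; the {@code left} and {@code right} batches here are
   * hypothetical stand-ins for two incoming batches:
   * <pre>{@code
   * BatchSchema merged = SchemaUtil.mergeSchemas(left.getSchema(), right.getSchema());
   * // A column seen as INT in one schema and VARCHAR in the other comes back as an
   * // OPTIONAL UNION whose subtypes are INT and VARCHAR.
   * }</pre>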
   * @param schemas the schemas to merge
   * @return the merged schema, with every column OPTIONAL and the selection vector
   *         mode taken from the first schema
*/
public static BatchSchema mergeSchemas(BatchSchema... schemas) {
    // Collect the set of minor types observed for each column path across all schemas.
    Map<SchemaPath, Set<MinorType>> typeSetMap = Maps.newLinkedHashMap();
for (BatchSchema s : schemas) {
for (MaterializedField field : s) {
SchemaPath path = SchemaPath.getSimplePath(field.getName());
        Set<MinorType> currentTypes = typeSetMap.computeIfAbsent(path, k -> Sets.newHashSet());
MinorType newType = field.getType().getMinorType();
if (newType == MinorType.MAP || newType == MinorType.LIST) {
throw new RuntimeException("Schema change not currently supported for schemas with complex types");
}
if (newType == MinorType.UNION) {
currentTypes.addAll(field.getType().getSubTypeList());
} else {
currentTypes.add(newType);
}
}
}
    // Build the merged field list: columns with multiple observed types become
    // OPTIONAL UNION fields; single-type columns simply become OPTIONAL.
    List<MaterializedField> fields = Lists.newArrayList();
    for (Map.Entry<SchemaPath, Set<MinorType>> entry : typeSetMap.entrySet()) {
      SchemaPath path = entry.getKey();
      Set<MinorType> types = entry.getValue();
if (types.size() > 1) {
MajorType.Builder builder = MajorType.newBuilder().setMinorType(MinorType.UNION).setMode(DataMode.OPTIONAL);
for (MinorType t : types) {
builder.addSubType(t);
}
MaterializedField field = MaterializedField.create(path.getLastSegment().getNameSegment().getPath(), builder.build());
fields.add(field);
} else {
MaterializedField field = MaterializedField.create(path.getLastSegment().getNameSegment().getPath(),
Types.optional(types.iterator().next()));
fields.add(field);
}
}
    SchemaBuilder schemaBuilder = new SchemaBuilder();
    return schemaBuilder.addFields(fields)
        .setSelectionVectorMode(schemas[0].getSelectionVectorMode())
        .build();
}

  /**
   * Copies the given vector into one that matches the target field. If the minor types
   * already match, the buffers are transferred directly (widening UNION subtypes as
   * needed); otherwise the target must be a UNION, and the transferred vector becomes
   * the union's first subtype. A null input yields an empty vector of the target type
   * sized to {@code recordCount}.
   */
  private static ValueVector coerceVector(ValueVector v, VectorContainer c, MaterializedField field,
      int recordCount, BufferAllocator allocator) {
    if (v != null) {
      int valueCount = v.getAccessor().getValueCount();
      // Transfer the buffers into a new vector owned by the target allocator.
      TransferPair tp = v.getTransferPair(allocator);
      tp.transfer();
      if (v.getField().getType().getMinorType() == field.getType().getMinorType()) {
        // Same minor type: reuse the transferred vector, adding any UNION subtypes
        // declared by the target field.
        if (field.getType().getMinorType() == MinorType.UNION) {
UnionVector u = (UnionVector) tp.getTo();
for (MinorType t : field.getType().getSubTypeList()) {
u.addSubType(t);
}
}
return tp.getTo();
      } else {
        // Different minor type: the target must be a UNION; the transferred vector
        // becomes the union's first subtype.
        ValueVector newVector = TypeHelper.getNewVector(field, allocator);
        Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION,
            "Can only convert vector to Union vector");
UnionVector u = (UnionVector) newVector;
u.setFirstType(tp.getTo(), valueCount);
return u;
}
    } else {
      // No incoming vector for this field: create an empty one of the target type.
      v = TypeHelper.getNewVector(field, allocator);
v.allocateNew();
v.getMutator().setValueCount(recordCount);
return v;
}
}

  /**
   * Creates a copy of a record batch, converting fields as necessary to coerce the
   * batch into the provided schema.
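   * <p>
   * A minimal usage sketch; {@code incoming}, {@code schemaA} and {@code schemaB} are
   * hypothetical stand-ins for an upstream batch and the schemas seen so far:
   * <pre>{@code
   * BatchSchema target = SchemaUtil.mergeSchemas(schemaA, schemaB);
   * VectorContainer coerced = SchemaUtil.coerceContainer(incoming, target, context);
   * }</pre>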
   * @param in the batch to copy
   * @param toSchema the schema to coerce the copy into
   * @param context the operator context whose allocator backs the new vectors
   * @return a new container holding the coerced copy of the batch
*/
public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema, OperatorContext context) {
return coerceContainer(in, toSchema, context.getAllocator());
}

  public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema,
      BufferAllocator allocator) {
int recordCount = in.getRecordCount();
    boolean isHyper = false;
    // Index the incoming vectors by field name. Hyper batches carry an array of
    // vectors per field; regular batches carry a single vector.
    Map<String, Object> vectorMap = Maps.newHashMap();
for (VectorWrapper<?> w : in) {
if (w.isHyper()) {
isHyper = true;
final ValueVector[] vvs = w.getValueVectors();
vectorMap.put(vvs[0].getField().getName(), vvs);
} else {
assert !isHyper;
final ValueVector v = w.getValueVector();
vectorMap.put(v.getField().getName(), v);
}
}
    VectorContainer c = new VectorContainer(allocator);
    // Emit one (possibly coerced) vector per field in the target schema, consuming
    // the matching incoming vectors as we go.
    for (MaterializedField field : toSchema) {
if (isHyper) {
final ValueVector[] vvs = (ValueVector[]) vectorMap.remove(field.getName());
final ValueVector[] vvsOut;
if (vvs == null) {
vvsOut = new ValueVector[1];
vvsOut[0] = coerceVector(null, c, field, recordCount, allocator);
} else {
vvsOut = new ValueVector[vvs.length];
for (int i = 0; i < vvs.length; ++i) {
vvsOut[i] = coerceVector(vvs[i], c, field, recordCount, allocator);
}
}
c.add(vvsOut);
} else {
final ValueVector v = (ValueVector) vectorMap.remove(field.getName());
c.add(coerceVector(v, c, field, recordCount, allocator));
}
}
c.buildSchema(in.getSchema().getSelectionVectorMode());
c.setRecordCount(recordCount);
    Preconditions.checkState(vectorMap.isEmpty(), "Leftover vector from incoming batch");
return c;
}

  /**
   * Converts a {@link BatchSchema} to the equivalent {@link TupleMetadata} representation.
   *
   * @param batchSchema the schema to convert
   * @return the same columns expressed as a tuple schema
   */
  public static TupleMetadata fromBatchSchema(BatchSchema batchSchema) {
TupleSchema tuple = new TupleSchema();
for (MaterializedField field : batchSchema) {
tuple.add(MetadataUtils.fromView(field));
}
return tuple;
}

  /**
   * Returns the list of {@link SchemaPath} for the fields of the specified schema.
*
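   * <p>
   * A brief sketch; the {@code schema} variable is a hypothetical tuple schema with a
   * top-level column {@code a} and a map {@code m} containing a column {@code b}:
   * <pre>{@code
   * List<SchemaPath> paths = SchemaUtil.getSchemaPaths(schema);
   * // paths contains `a` and `m`.`b`
   * }</pre>
   *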
* @param schema the source of fields to return
* @return list of {@link SchemaPath}
*/
public static List<SchemaPath> getSchemaPaths(TupleMetadata schema) {
return SchemaUtil.getColumnPaths(schema, null).stream()
.map(stringList -> SchemaPath.getCompoundPath(stringList.toArray(new String[0])))
.collect(Collectors.toList());
}

  /**
   * Recursively collects the name components of each column, descending into maps
   * so nested columns are returned with their full path from the top level.
   */
  private static List<List<String>> getColumnPaths(TupleMetadata schema, List<String> parentNames) {
List<List<String>> result = new ArrayList<>();
for (ColumnMetadata columnMetadata : schema) {
if (columnMetadata.isMap()) {
List<String> currentNames = parentNames == null
? new ArrayList<>()
: new ArrayList<>(parentNames);
currentNames.add(columnMetadata.name());
result.addAll(getColumnPaths(columnMetadata.tupleSchema(), currentNames));
      } else if (parentNames == null) {
        // Top-level primitive column: the path is just the column name.
        result.add(Collections.singletonList(columnMetadata.name()));
      } else {
        // Nested primitive column: prefix the name with its enclosing map names so
        // the full path is preserved.
        List<String> names = new ArrayList<>(parentNames);
        names.add(columnMetadata.name());
        result.add(names);
      }
}
return result;
}
}