DRILL-7359: Add support for DICT type in RowSet Framework
closes #1870
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index c0638b5..f80bd80 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -813,7 +813,7 @@
default:
if (componentType.isStruct()) {
// for the case when nested type is struct, it should be placed into repeated map
- return MetadataUtils.newMapArray(name, childColumnMetadata.mapSchema());
+ return MetadataUtils.newMapArray(name, childColumnMetadata.tupleSchema());
} else {
// otherwise creates column metadata with repeated data mode
return new PrimitiveColumnMetadata(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
index 1bbff77..8975a0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -27,6 +27,7 @@
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.complex.DictVector;
/**
* Perform a schema projection for the case of an explicit list of
@@ -78,6 +79,18 @@
}
}
+ private void resolveDictValueColumn(ResolvedTuple outputTuple,
+ RequestedColumn inputCol, TupleMetadata readerSchema) {
+ int tableColIndex = readerSchema.index(DictVector.FIELD_VALUE_NAME);
+ if (tableColIndex == -1) {
+ resolveNullColumn(outputTuple, inputCol);
+ } else {
+ resolveTableColumn(outputTuple, inputCol,
+ readerSchema.metadata(tableColIndex),
+ tableColIndex);
+ }
+ }
+
private void resolveTableColumn(ResolvedTuple outputTuple,
RequestedColumn requestedCol,
ColumnMetadata column, int sourceIndex) {
@@ -88,7 +101,11 @@
// that x is a map.
if (requestedCol.isTuple()) {
- resolveMap(outputTuple, requestedCol, column, sourceIndex);
+ if (column.isDict()) {
+ resolveDict(outputTuple, requestedCol, column, sourceIndex);
+ } else {
+ resolveMap(outputTuple, requestedCol, column, sourceIndex);
+ }
}
// Is the requested column implied to be an array?
@@ -132,7 +149,7 @@
ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple,
column.schema(), sourceIndex);
resolveTuple(mapCol.members(), requestedCol.mapProjection(),
- column.mapSchema());
+ column.tupleSchema());
// If the projection is simple, then just project the map column
// as is. A projection is simple if all map columns from the table
@@ -158,6 +175,35 @@
}
}
+ private void resolveDict(ResolvedTuple outputTuple,
+ RequestedColumn requestedCol, ColumnMetadata column,
+ int sourceIndex) {
+
+ // If the actual column isn't a dict, then the request is invalid.
+
+ if (!column.isDict()) {
+ throw UserException
+ .validationError()
+ .message("Project list implies a dict column, but actual column is not a dict")
+ .addContext("Projected column:", requestedCol.fullName())
+ .addContext("Table column:", column.name())
+ .addContext("Type:", column.type().name())
+ .addContext(scanProj.context())
+ .build(logger);
+ }
+
+ ResolvedDictColumn dictColumn = new ResolvedDictColumn(outputTuple, column.schema(), sourceIndex);
+ resolveDictTuple(dictColumn.members(), requestedCol.mapProjection(), column.tupleSchema());
+
+ // The same simple-projection optimization as for Map
+ if (dictColumn.members().isSimpleProjection()) {
+ outputTuple.removeChild(dictColumn.members());
+ projectTableColumn(outputTuple, requestedCol, column, sourceIndex);
+ } else {
+ outputTuple.add(dictColumn);
+ }
+ }
+
private void resolveTuple(ResolvedTuple mapTuple,
RequestedTuple requestedTuple, TupleMetadata mapSchema) {
for (RequestedColumn col : requestedTuple.projections()) {
@@ -165,6 +211,13 @@
}
}
+ private void resolveDictTuple(ResolvedTuple mapTuple,
+ RequestedTuple requestedTuple, TupleMetadata mapSchema) {
+ for (RequestedColumn col : requestedTuple.projections()) {
+ resolveDictValueColumn(mapTuple, col, mapSchema);
+ }
+ }
+
private void resolveArray(ResolvedTuple outputTuple,
RequestedColumn requestedCol, ColumnMetadata column,
int sourceIndex) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
index 636e981..41c575b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
@@ -95,7 +95,7 @@
if (outputSchema != null) {
ColumnMetadata colSchema = outputSchema.metadata(mapName);
if (colSchema != null) {
- builder.setOutputSchema(colSchema.mapSchema());
+ builder.setOutputSchema(colSchema.tupleSchema());
}
}
return builder.build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedDictColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedDictColumn.java
new file mode 100644
index 0000000..cc75117
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedDictColumn.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedDict;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedSingleDict;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedDictArray;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class ResolvedDictColumn extends ResolvedColumn {
+
+ private final MaterializedField schema;
+ private final ResolvedTuple parent;
+ private final ResolvedDict members;
+
+ public ResolvedDictColumn(ResolvedTuple parent, String name) {
+ super(parent, -1);
+ schema = MaterializedField.create(name,
+ Types.required(MinorType.DICT));
+ this.parent = parent;
+ members = new ResolvedSingleDict(this);
+ parent.addChild(members);
+ }
+
+ public ResolvedDictColumn(ResolvedTuple parent,
+ MaterializedField schema, int sourceIndex) {
+ super(parent, sourceIndex);
+ this.schema = schema;
+ this.parent = parent;
+
+ assert schema.getType().getMinorType() == MinorType.DICT;
+ if (schema.getType().getMode() == DataMode.REPEATED) {
+ members = new ResolvedDictArray(this);
+ } else {
+ members = new ResolvedSingleDict(this);
+ }
+ parent.addChild(members);
+ }
+
+ public ResolvedTuple parent() {
+ return parent;
+ }
+
+ @Override
+ public String name() {
+ return schema.getName();
+ }
+
+ public ResolvedTuple members() {
+ return members;
+ }
+
+ @Override
+ public void project(ResolvedTuple dest) {
+ dest.addVector(members.buildVector());
+ }
+
+ @Override
+ public MaterializedField schema() {
+ return schema;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
index 8628873..5b8294c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
@@ -28,7 +28,9 @@
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
@@ -293,6 +295,136 @@
}
}
+ public static abstract class ResolvedDict extends ResolvedTuple {
+
+ protected final ResolvedDictColumn parentColumn;
+
+ public ResolvedDict(ResolvedDictColumn parentColumn) {
+ super(parentColumn.parent().nullBuilder == null
+ ? null : parentColumn.parent().nullBuilder.newChild(parentColumn.name()));
+ this.parentColumn = parentColumn;
+ }
+
+ public abstract ValueVector buildVector();
+
+ @Override
+ public String name() {
+ return parentColumn.name();
+ }
+ }
+
+ public static class ResolvedSingleDict extends ResolvedDict {
+
+ protected DictVector inputVector;
+ protected DictVector outputVector;
+
+ public ResolvedSingleDict(ResolvedDictColumn parentColumn) {
+ super(parentColumn);
+ }
+
+ @Override
+ public void addVector(ValueVector vector) {
+ outputVector.putChild(vector.getField().getName(), vector);
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ assert inputVector != null;
+ return inputVector.getChildByOrdinal(index);
+ }
+
+ @Override
+ public ValueVector buildVector() {
+ if (parentColumn.sourceIndex() != -1) {
+ ResolvedTuple parentTuple = parentColumn.parent();
+ inputVector = (DictVector) parentTuple.vector(parentColumn.sourceIndex());
+ }
+ MaterializedField colSchema = parentColumn.schema();
+ MaterializedField dictField = MaterializedField.create(colSchema.getName(), colSchema.getType());
+ outputVector = new DictVector(dictField, parentColumn.parent().allocator(), null);
+ buildColumns();
+ return outputVector;
+ }
+
+ @Override
+ public BufferAllocator allocator() {
+ return outputVector.getAllocator();
+ }
+
+ @Override
+ public String name() {
+ return parentColumn.name();
+ }
+
+ @Override
+ public void setRowCount(int rowCount) {
+ outputVector.getMutator().setValueCount(rowCount);
+ cascadeRowCount(rowCount);
+ }
+
+ @Override
+ public int innerCardinality(int outerCardinality) {
+ return outerCardinality;
+ }
+ }
+
+ public static class ResolvedDictArray extends ResolvedDict {
+
+ protected RepeatedDictVector inputVector;
+ protected RepeatedDictVector outputVector;
+
+ private int valueCount;
+
+ public ResolvedDictArray(ResolvedDictColumn parentColumn) {
+ super(parentColumn);
+ }
+
+ @Override
+ public void addVector(ValueVector vector) {
+ ((DictVector) outputVector.getDataVector()).putChild(vector.getField().getName(), vector);
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ assert inputVector != null;
+ return ((DictVector) inputVector.getDataVector()).getChildByOrdinal(index);
+ }
+
+ @Override
+ public RepeatedDictVector buildVector() {
+ if (parentColumn.sourceIndex() != -1) {
+ ResolvedTuple parentTuple = parentColumn.parent();
+ inputVector = (RepeatedDictVector) parentTuple.vector(parentColumn.sourceIndex());
+ }
+ MaterializedField colSchema = parentColumn.schema();
+ MaterializedField dictField = MaterializedField.create(colSchema.getName(), colSchema.getType());
+ outputVector = new RepeatedDictVector(dictField, parentColumn.parent().allocator(), null);
+ valueCount = inputVector.getOffsetVector().getAccessor().getValueCount();
+ buildColumns();
+ return outputVector;
+ }
+
+ @Override
+ public BufferAllocator allocator() {
+ return outputVector.getAllocator();
+ }
+
+ @Override
+ public String name() {
+ return parentColumn.name();
+ }
+
+ @Override
+ public void setRowCount(int rowCount) {
+ cascadeRowCount(valueCount);
+ }
+
+ @Override
+ public int innerCardinality(int outerCardinality) {
+ return valueCount;
+ }
+ }
+
protected final List<ResolvedColumn> members = new ArrayList<>();
protected final NullColumnBuilder nullBuilder;
protected List<ResolvedTuple> children;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java
index be3f3f6..ec12e0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/AbstractProjectionSet.java
@@ -73,7 +73,7 @@
}
protected TypeConverter childConverter(ColumnMetadata outputSchema) {
- TupleMetadata childSchema = outputSchema == null ? null : outputSchema.mapSchema();
+ TupleMetadata childSchema = outputSchema == null ? null : outputSchema.tupleSchema();
return typeConverter == null ? null :
typeConverter.childConverter(childSchema);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
index 016cc63..7fdec52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/EmptyProjectionSet.java
@@ -36,6 +36,11 @@
}
@Override
+ public ColumnReadProjection readDictProjection(ColumnMetadata col) {
+ return readProjection(col);
+ }
+
+ @Override
public void setErrorContext(CustomErrorContext errorContext) { }
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
index cfa82dd..daf2f1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ExplicitProjectionSet.java
@@ -21,11 +21,13 @@
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.resultSet.ProjectionSet;
import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
+import org.apache.drill.exec.physical.resultSet.project.RequestedColumnImpl;
import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.TupleProjectionType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.convert.ColumnConversionFactory;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,16 +53,20 @@
if (reqCol == null) {
return new UnprojectedReadColumn(col);
}
+
+ return getReadProjection(col, reqCol);
+ }
+
+ private ColumnReadProjection getReadProjection(ColumnMetadata col, RequestedColumn reqCol) {
ColumnMetadata outputSchema = outputSchema(col);
validateProjection(reqCol, outputSchema == null ? col : outputSchema);
- if (!col.isMap()) {
+ if (!col.isMap() && !col.isDict()) {
// Non-map column.
ColumnConversionFactory conv = conversion(col, outputSchema);
return new ProjectedReadColumn(col, reqCol, outputSchema, conv);
- }
- else {
+ } else {
// Maps are tuples. Create a tuple projection and wrap it in
// a column projection.
@@ -84,7 +90,11 @@
mapProjection = new ExplicitProjectionSet(reqCol.mapProjection(), childConverter);
}
- return new ProjectedMapColumn(col, reqCol, outputSchema, mapProjection);
+ if (col.isMap()) {
+ return new ProjectedMapColumn(col, reqCol, outputSchema, mapProjection);
+ } else {
+ return new ProjectedDictColumn(col, reqCol, outputSchema, mapProjection);
+ }
}
}
@@ -110,5 +120,21 @@
}
@Override
+ public ColumnReadProjection readDictProjection(ColumnMetadata col) {
+ // Unlike for a MAP, requestedProj contains the requested key value rather than a nested field's name,
+ // so the DICT's member columns are resolved explicitly here.
+
+ assert DictVector.fieldNames.contains(col.name());
+ if (col.name().equals(DictVector.FIELD_KEY_NAME)) {
+ // This field is considered not projected but its
+ // vector and writer will be instantiated later.
+ return new UnprojectedReadColumn(col);
+ }
+
+ RequestedColumn reqCol = new RequestedColumnImpl(requestedProj, col.name()); // this is the 'value' column
+ return getReadProjection(col, reqCol);
+ }
+
+ @Override
public boolean isEmpty() { return requestedProj.projections().isEmpty(); }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java
new file mode 100644
index 0000000..9f4eecf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/ProjectedDictColumn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project.projSet;
+
+import org.apache.drill.exec.physical.resultSet.ProjectionSet;
+import org.apache.drill.exec.physical.resultSet.project.ProjectionType;
+import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+
+public class ProjectedDictColumn extends ProjectedReadColumn {
+
+ private final ProjectionSet tupleProjection;
+
+ public ProjectedDictColumn(ColumnMetadata readSchema,
+ RequestedTuple.RequestedColumn requestedCol, ColumnMetadata outputSchema,
+ ProjectionSet tupleProjection) {
+ super(readSchema, requestedCol, outputSchema, null);
+ this.tupleProjection = tupleProjection;
+ }
+
+ @Override
+ public ProjectionSet mapProjection() {
+ return tupleProjection;
+ }
+
+ @Override
+ public ProjectionType projectionType() {
+ return super.projectionType().isArray() ? ProjectionType.DICT_ARRAY : ProjectionType.ARRAY;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
index 640a371..1da5a2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/WildcardProjectionSet.java
@@ -43,7 +43,7 @@
} else if (isSpecial(outputSchema)) {
return new UnprojectedReadColumn(col);
}
- if (col.isMap()) {
+ if (col.isMap() || col.isDict()) {
return new ProjectedMapColumn(col, null, outputSchema,
new WildcardProjectionSet(childConverter(outputSchema), isStrict));
@@ -53,6 +53,11 @@
}
}
+ @Override
+ public ColumnReadProjection readDictProjection(ColumnMetadata col) {
+ return readProjection(col);
+ }
+
// Wildcard means use whatever schema is provided by the reader,
// so the projection itself is non-empty even if the reader has no
// columns.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
index 298c0e6..208defd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ProjectionSet.java
@@ -102,5 +102,6 @@
void setErrorContext(CustomErrorContext errorContext);
ColumnReadProjection readProjection(ColumnMetadata col);
+ ColumnReadProjection readDictProjection(ColumnMetadata col);
boolean isEmpty();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
index bf1256d..54d3c0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
@@ -177,6 +177,8 @@
return buildSingleList(parent, colSchema);
} else if (colSchema.isVariant()) {
return buildVariant(parent, colSchema);
+ } else if (colSchema.isDict()) {
+ return buildDict(parent, colSchema);
} else {
return buildPrimitive(parent, colSchema);
}
@@ -208,9 +210,9 @@
private void expandMap(ObjectWriter colWriter, ColumnMetadata colSchema) {
if (colSchema.isArray()) {
- buildTuple(colWriter.array().tuple(), colSchema.mapSchema());
+ buildTuple(colWriter.array().tuple(), colSchema.tupleSchema());
} else {
- buildTuple(colWriter.tuple(), colSchema.mapSchema());
+ buildTuple(colWriter.tuple(), colSchema.tupleSchema());
}
}
@@ -280,7 +282,7 @@
/**
* We've just built a writer for column. If the column is structured
- * (AKA "complex", meaning a map or list or array), then we need to
+ * (AKA "complex", meaning a map, list, array or dict), then we need to
* build writer for the components of the column. We do that recursively
* here.
*
@@ -300,8 +302,24 @@
assert false;
} else if (colSchema.isVariant()) {
expandVariant(colWriter, colSchema);
+ } else if (colSchema.isDict()) {
+ expandDict(colWriter, colSchema);
// } else {
// Nothing to expand for primitives
}
}
+
+ private ObjectWriter buildDict(ParentShim parent, ColumnMetadata colSchema) {
+ final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty());
+ expandDict(colWriter, colSchema);
+ return colWriter;
+ }
+
+ private void expandDict(ObjectWriter colWriter, ColumnMetadata colSchema) {
+ if (colSchema.isArray()) {
+ buildTuple(colWriter.array().dict().tuple(), colSchema.tupleSchema());
+ } else {
+ buildTuple(colWriter.dict().tuple(), colSchema.tupleSchema());
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
index 57b07fa..3f8abce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
@@ -50,11 +50,14 @@
import org.apache.drill.exec.vector.accessor.writer.EmptyListShim;
import org.apache.drill.exec.vector.accessor.writer.ListWriterImpl;
import org.apache.drill.exec.vector.accessor.writer.MapWriter;
+import org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter;
import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter;
import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl;
import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.VariantObjectWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.ListVector;
import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
import org.apache.drill.exec.vector.complex.RepeatedListVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -99,8 +102,15 @@
*/
public ColumnState buildColumn(ContainerState parent, ColumnMetadata columnSchema) {
- ColumnReadProjection colProj = parent.projectionSet().readProjection(columnSchema);
+ ColumnReadProjection colProj;
+ if (parent instanceof TupleState.DictState) {
+ colProj = parent.projectionSet().readDictProjection(columnSchema);
+ } else {
+ colProj = parent.projectionSet().readProjection(columnSchema);
+ }
switch (colProj.providedSchema().structureType()) {
+ case DICT:
+ return buildDict(parent, colProj);
case TUPLE:
return buildMap(parent, colProj);
case VARIANT:
@@ -134,7 +144,7 @@
ColumnMetadata columnSchema = colProj.providedSchema();
ValueVector vector;
- if (!colProj.isProjected()) {
+ if (!colProj.isProjected() && !allowCreation(parent)) {
// Column is not projected. No materialized backing for the column.
@@ -180,6 +190,21 @@
}
/**
+ * Checks whether this is the special case where a vector, writer and column state should be
+ * created for a primitive field even though the field itself is not projected. This is
+ * needed when a {@code DICT}'s {@code value} is accessed by key, because the
+ * {@code DICT}'s {@code keys} field is not projected yet still must be initialized
+ * to ensure the dict vector is constructed properly ({@code DICT} must have both
+ * {@code keys} and {@code values} vectors as they are paired).
+ *
+ * @param parent container containing the primitive
+ * @return {@code true} if the parent is {@code DICT} and its {@code value} is accessed by key
+ */
+ private boolean allowCreation(ContainerState parent) {
+ return parent instanceof TupleState.DictState && !parent.projectionSet().isEmpty();
+ }
+
+ /**
* Build a new map (single or repeated) column. Except for maps nested inside
* of unions, no map vector is created
* here, instead we create a tuple state to hold the columns, and defer the
@@ -198,7 +223,7 @@
// calls.
assert columnSchema.isMap();
- assert columnSchema.mapSchema().size() == 0;
+ assert columnSchema.tupleSchema().isEmpty();
// Create the vector, vector state and writer.
@@ -223,7 +248,7 @@
// have content that varies from batch to batch. Only the leaf
// vectors can be cached.
- assert columnSchema.mapSchema().isEmpty();
+ assert columnSchema.tupleSchema().isEmpty();
vector = new MapVector(columnSchema.schema(), parent.loader().allocator(), null);
vectorState = new MapVectorState(vector, new NullVectorState());
}
@@ -256,7 +281,7 @@
// have content that varies from batch to batch. Only the leaf
// vectors can be cached.
- assert columnSchema.mapSchema().isEmpty();
+ assert columnSchema.tupleSchema().isEmpty();
mapVector = new RepeatedMapVector(mapColSchema.schema(),
parent.loader().allocator(), null);
offsetVector = mapVector.getOffsetVector();
@@ -534,4 +559,138 @@
return new RepeatedListColumnState(parent.loader(),
arrayWriter, vectorState, listState);
}
+
+ private ColumnState buildDict(ContainerState parent, ColumnReadProjection colProj) {
+ ColumnMetadata columnSchema = colProj.providedSchema();
+
+ // When dynamically adding columns, must add the (empty)
+ // dict by itself, then add columns to the dict via separate
+ // calls (the same way as is done for MAP).
+
+ assert columnSchema.isDict();
+ assert columnSchema.tupleSchema().isEmpty();
+
+ // Create the vector, vector state and writer.
+
+ if (columnSchema.isArray()) {
+ return buildDictArray(parent, colProj);
+ } else {
+ return buildSingleDict(parent, colProj);
+ }
+ }
+
+ private ColumnState buildDictArray(ContainerState parent, ColumnReadProjection colProj) {
+ ColumnMetadata columnSchema = colProj.providedSchema();
+
+ // Create the dict's offset vector.
+
+ RepeatedDictVector repeatedDictVector;
+ UInt4Vector offsetVector;
+ if (!colProj.isProjected()) {
+ repeatedDictVector = null;
+ offsetVector = null;
+ } else {
+
+ // Creating the dict vector will create its contained vectors if we
+ // give it a materialized field with children. So, instead pass a clone
+ // without children so we can add them.
+
+ final ColumnMetadata dictColMetadata = columnSchema.cloneEmpty();
+
+ // Don't get the dict vector from the vector cache. Dict vectors may
+ // have content that varies from batch to batch. Only the leaf
+ // vectors can be cached.
+
+ assert columnSchema.tupleSchema().isEmpty();
+ repeatedDictVector = new RepeatedDictVector(dictColMetadata.schema(),
+ parent.loader().allocator(), null);
+ offsetVector = repeatedDictVector.getOffsetVector();
+ }
+
+ // Create the writer using the offset vector
+
+ final AbstractObjectWriter writer = ObjectDictWriter.buildDictArray(
+ columnSchema, repeatedDictVector, new ArrayList<>());
+
+ // Wrap the offset vector in a vector state
+
+ VectorState offsetVectorState;
+ VectorState dictOffsetVectorState;
+ if (!colProj.isProjected()) {
+ offsetVectorState = new NullVectorState();
+ dictOffsetVectorState = new NullVectorState();
+ } else {
+ AbstractArrayWriter arrayWriter = (AbstractArrayWriter) writer.array();
+ offsetVectorState = new OffsetVectorState(
+ arrayWriter.offsetWriter(),
+ offsetVector,
+ writer.array().entry().events());
+ dictOffsetVectorState = new OffsetVectorState(
+ ((AbstractArrayWriter) arrayWriter.array()).offsetWriter(),
+ ((DictVector) repeatedDictVector.getDataVector()).getOffsetVector(),
+ writer.array().entry().dict().entry().events());
+ }
+ final VectorState mapVectorState =
+ new TupleState.DictArrayVectorState(repeatedDictVector, offsetVectorState, dictOffsetVectorState);
+
+ // Assemble it all into the column state.
+
+ final TupleState.DictArrayState dictArrayState = new TupleState.DictArrayState(parent.loader(),
+ parent.vectorCache().childCache(columnSchema.name()),
+ colProj.mapProjection());
+ return new TupleState.DictColumnState(
+ dictArrayState, writer, mapVectorState, parent.isVersioned());
+ }
+
+ private ColumnState buildSingleDict(ContainerState parent, ColumnReadProjection colProj) {
+ ColumnMetadata columnSchema = colProj.providedSchema();
+
+ // Create the dict's offset vector.
+
+ DictVector dictVector;
+ UInt4Vector offsetVector;
+ if (!colProj.isProjected()) {
+ dictVector = null;
+ offsetVector = null;
+ } else {
+
+ // Creating the dict vector will create its contained vectors if we
+ // give it a materialized field with children. So, instead pass a clone
+ // without children so we can add them.
+
+ final ColumnMetadata dictColMetadata = columnSchema.cloneEmpty();
+
+ // Don't get the dict vector from the vector cache. Dict vectors may
+ // have content that varies from batch to batch. Only the leaf
+ // vectors can be cached.
+
+ assert columnSchema.tupleSchema().isEmpty();
+ dictVector = new DictVector(dictColMetadata.schema(), parent.loader().allocator(), null);
+ offsetVector = dictVector.getOffsetVector();
+ }
+
+ // Create the writer using the offset vector
+
+ final AbstractObjectWriter writer = ObjectDictWriter.buildDict(columnSchema, dictVector, new ArrayList<>());
+
+ // Wrap the offset vector in a vector state
+
+ VectorState offsetVectorState;
+ if (!colProj.isProjected()) {
+ offsetVectorState = new NullVectorState();
+ } else {
+ offsetVectorState = new OffsetVectorState(
+ (((AbstractArrayWriter) writer.dict()).offsetWriter()),
+ offsetVector,
+ writer.dict().entry().events());
+ }
+ final VectorState mapVectorState = new TupleState.SingleDictVectorState(dictVector, offsetVectorState);
+
+ // Assemble it all into the column state.
+
+ final TupleState.SingleDictState dictArrayState = new TupleState.SingleDictState(parent.loader(), parent.vectorCache().childCache(columnSchema.name()),
+ colProj.mapProjection());
+ return new TupleState.DictColumnState(
+ dictArrayState, writer, mapVectorState, parent.isVersioned());
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
index 9d8f798..b2b4d4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
@@ -38,6 +38,8 @@
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
/**
* Represents the loader state for a tuple: a row or a map. This is "state" in
@@ -142,7 +144,7 @@
} else {
outputSchema = schema();
}
- mapState.bindOutputSchema(outputSchema.mapSchema());
+ mapState.bindOutputSchema(outputSchema.tupleSchema());
}
public MapState mapState() { return mapState; }
@@ -230,7 +232,9 @@
@Override
public void dump(HierarchicalFormatter format) {
- // TODO
+ format.startObject(this)
+ .attribute("field", mapVector != null ? mapVector.getField() : "null")
+ .endObject();
}
}
@@ -539,7 +543,7 @@
prevHarvestIndex = i;
}
- // If the column is a map, then we have to recurse into the map
+ // If the column is a map or a dict, then we have to recurse into it
// itself. If the map is inside a union, then the map's vectors
// already appear in the map vector, but we still must update the
// output schema.
@@ -547,6 +551,9 @@
if (colState.schema().isMap()) {
final MapState childMap = ((MapColumnState) colState).mapState();
childMap.updateOutput(curSchemaVersion);
+ } else if (colState.schema().isDict()) {
+ final DictState child = ((DictColumnState) colState).dictState();
+ child.updateOutput(curSchemaVersion);
}
}
}
@@ -567,4 +574,224 @@
.endArray()
.endObject();
}
+
+ public static class DictColumnState extends BaseContainerColumnState {
+ protected final DictState dictState;
+ protected boolean isVersioned;
+ protected final ColumnMetadata outputSchema;
+
+ public DictColumnState(DictState dictState,
+ AbstractObjectWriter writer,
+ VectorState vectorState,
+ boolean isVersioned) {
+ super(dictState.loader(), writer, vectorState);
+ this.dictState = dictState;
+ dictState.bindColumnState(this);
+ this.isVersioned = isVersioned;
+ if (isVersioned) {
+ outputSchema = schema().cloneEmpty();
+ } else {
+ outputSchema = schema();
+ }
+ dictState.bindOutputSchema(outputSchema.tupleSchema());
+ }
+
+ @Override
+ public void buildOutput(TupleState tupleState) {
+ outputIndex = tupleState.addOutputColumn(vector(), outputSchema());
+ }
+
+ public DictState dictState() {
+ return dictState;
+ }
+
+ @Override
+ public ContainerState container() {
+ return dictState;
+ }
+
+ @Override
+ public boolean isProjected() {
+ return dictState.hasProjections();
+ }
+
+ public boolean isVersioned() {
+ return isVersioned;
+ }
+
+ @Override
+ public ColumnMetadata outputSchema() { return outputSchema; }
+ }
+
+ public static abstract class DictState extends MapState {
+
+ public DictState(LoaderInternals events,
+ ResultVectorCache vectorCache,
+ ProjectionSet projectionSet) {
+ super(events, vectorCache, projectionSet);
+ }
+
+ public void bindColumnState(ColumnState colState) {
+ super.bindColumnState(colState);
+ writer().bindListener(this);
+ }
+
+ @Override
+ protected boolean isVersioned() {
+ return ((DictColumnState) parentColumn).isVersioned();
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.startObject(this)
+ .attribute("column", parentColumn.schema().name())
+ .attribute("cardinality", innerCardinality())
+ .endObject();
+ }
+ }
+
+ public static class SingleDictState extends DictState {
+
+ public SingleDictState(LoaderInternals events,
+ ResultVectorCache vectorCache,
+ ProjectionSet projectionSet) {
+ super(events, vectorCache, projectionSet);
+ }
+
+ @Override
+ public AbstractTupleWriter writer() {
+ return (AbstractTupleWriter) parentColumn.writer().dict().tuple();
+ }
+ }
+
+ public static class DictArrayState extends DictState {
+
+ public DictArrayState(LoaderInternals events,
+ ResultVectorCache vectorCache,
+ ProjectionSet projectionSet) {
+ super(events, vectorCache, projectionSet);
+ }
+
+ @Override
+ public int addOutputColumn(ValueVector vector, ColumnMetadata colSchema) {
+ final RepeatedDictVector repeatedDictVector = parentColumn.vector();
+ DictVector dictVector = (DictVector) repeatedDictVector.getDataVector();
+ if (isVersioned()) {
+ dictVector.putChild(colSchema.name(), vector);
+ }
+ final int index = outputSchema.addColumn(colSchema);
+ assert dictVector.size() == outputSchema.size();
+ assert dictVector.getField().getChildren().size() == outputSchema.size();
+ return index;
+ }
+
+ @Override
+ public AbstractTupleWriter writer() {
+ return (AbstractTupleWriter) parentColumn.writer().array().dict().tuple();
+ }
+ }
+
+ public static abstract class DictVectorState<T extends ValueVector> implements VectorState {
+
+ protected final T vector;
+ protected final VectorState offsets;
+
+ public DictVectorState(T vector, VectorState offsets) {
+ this.vector = vector;
+ this.offsets = offsets;
+ }
+
+ @Override
+ public T vector() {
+ return vector;
+ }
+
+ @Override
+ public int allocate(int cardinality) {
+ return offsets.allocate(cardinality);
+ }
+
+ @Override
+ public void rollover(int cardinality) {
+ offsets.rollover(cardinality);
+ }
+
+ @Override
+ public void harvestWithLookAhead() {
+ offsets.harvestWithLookAhead();
+ }
+
+ @Override
+ public void startBatchWithLookAhead() {
+ offsets.harvestWithLookAhead();
+ }
+
+ @Override
+ public void close() {
+ offsets.close();
+ }
+
+ public VectorState offsetVectorState() {
+ return offsets;
+ }
+
+ @Override
+ public boolean isProjected() {
+ return offsets.isProjected();
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.startObject(this)
+ .attribute("field", vector != null ? vector.getField() : "null")
+ .endObject();
+ }
+ }
+
+ public static class SingleDictVectorState extends DictVectorState<DictVector> {
+
+ public SingleDictVectorState(DictVector vector, VectorState offsets) {
+ super(vector, offsets);
+ }
+ }
+
+ public static class DictArrayVectorState extends DictVectorState<RepeatedDictVector> {
+
+ // offsets for the data vector
+ private final VectorState dictOffsets;
+
+ public DictArrayVectorState(RepeatedDictVector vector, VectorState offsets, VectorState dictOffsets) {
+ super(vector, offsets);
+ this.dictOffsets = dictOffsets;
+ }
+
+ @Override
+ public int allocate(int cardinality) {
+ return offsets.allocate(cardinality);
+ }
+
+ @Override
+ public void rollover(int cardinality) {
+ super.rollover(cardinality);
+ dictOffsets.rollover(cardinality);
+ }
+
+ @Override
+ public void harvestWithLookAhead() {
+ super.harvestWithLookAhead();
+ dictOffsets.harvestWithLookAhead();
+ }
+
+ @Override
+ public void startBatchWithLookAhead() {
+ super.startBatchWithLookAhead();
+ dictOffsets.harvestWithLookAhead();
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ dictOffsets.close();
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/MetadataProvider.java
index 49c50be..915b800 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/MetadataProvider.java
@@ -85,8 +85,9 @@
return new ArraySchemaCreator((RepeatedListColumnMetadata) colMetadata);
case VARIANT:
return new VariantSchemaCreator((VariantSchema) colMetadata.variantSchema());
+ case DICT:
case TUPLE:
- return new MetadataCreator((TupleSchema) colMetadata.mapSchema());
+ return new MetadataCreator((TupleSchema) colMetadata.tupleSchema());
default:
throw new UnsupportedOperationException();
}
@@ -187,8 +188,9 @@
return new ArraySchemaRetrieval(colMetadata);
case VARIANT:
return new VariantSchemaRetrieval((VariantSchema) colMetadata.variantSchema());
+ case DICT:
case TUPLE:
- return new MetadataRetrieval(colMetadata.mapSchema());
+ return new MetadataRetrieval(colMetadata.tupleSchema());
default:
throw new UnsupportedOperationException();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
index 533d6c1..b698aac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/hyper/HyperReaderBuilder.java
@@ -37,11 +37,13 @@
import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl;
+import org.apache.drill.exec.vector.accessor.reader.DictReaderImpl;
import org.apache.drill.exec.vector.accessor.reader.MapReader;
import org.apache.drill.exec.vector.accessor.reader.UnionReaderImpl;
import org.apache.drill.exec.vector.accessor.reader.VectorAccessor;
import org.apache.drill.exec.vector.accessor.reader.VectorAccessors;
import org.apache.drill.exec.vector.accessor.reader.VectorAccessors.BaseHyperVectorAccessor;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
/**
* Base reader builder for a hyper-batch. The semantics of hyper-batches are
@@ -151,6 +153,8 @@
protected AbstractObjectReader buildVectorReader(VectorAccessor va, ColumnMetadata metadata) {
switch(metadata.type()) {
+ case DICT:
+ return buildDict(va, metadata);
case MAP:
return buildMap(va, metadata.mode(), metadata);
case UNION:
@@ -162,6 +166,28 @@
}
}
+ private AbstractObjectReader buildDict(VectorAccessor va, ColumnMetadata metadata) {
+ boolean isArray = metadata.isArray();
+
+ ValueVector vector = va.vector();
+ VectorAccessor dictAccessor;
+ if (isArray) {
+ ValueVector dictVector = ((RepeatedValueVector) vector).getDataVector();
+ dictAccessor = new VectorAccessors.SingleVectorAccessor(dictVector);
+ } else {
+ dictAccessor = va;
+ }
+
+ List<AbstractObjectReader> readers = buildMapMembers(dictAccessor, metadata.tupleSchema());
+ AbstractObjectReader reader = DictReaderImpl.build(metadata, dictAccessor, readers);
+
+ if (!isArray) {
+ return reader;
+ }
+
+ return ArrayReaderImpl.buildTuple(metadata, va, reader);
+ }
+
private AbstractObjectReader buildMap(VectorAccessor va, DataMode mode, ColumnMetadata metadata) {
boolean isArray = mode == DataMode.REPEATED;
@@ -171,7 +197,7 @@
AbstractObjectReader mapReader = MapReader.build(
metadata,
isArray ? null : va,
- buildMapMembers(va, metadata.mapSchema()));
+ buildMapMembers(va, metadata.tupleSchema()));
// Single map
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseWriterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseWriterBuilder.java
index b503ecb..c8a0f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseWriterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseWriterBuilder.java
@@ -32,12 +32,15 @@
import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
import org.apache.drill.exec.vector.accessor.writer.ListWriterImpl;
import org.apache.drill.exec.vector.accessor.writer.MapWriter;
+import org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter;
import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter;
import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.VariantObjectWriter;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
import org.apache.drill.exec.vector.complex.RepeatedListVector;
import org.apache.drill.exec.vector.complex.UnionVector;
@@ -75,19 +78,32 @@
private AbstractObjectWriter buildVectorWriter(ValueVector vector, VectorDescrip descrip) {
final MajorType type = vector.getField().getType();
switch (type.getMinorType()) {
- case MAP:
- return MapWriter.buildMapWriter(descrip.metadata,
- (AbstractMapVector) vector,
- buildMap((AbstractMapVector) vector, descrip));
+ case DICT:
+ return buildDict(vector, descrip);
+ case MAP:
+ return MapWriter.buildMapWriter(descrip.metadata,
+ (AbstractMapVector) vector,
+ buildMap((AbstractMapVector) vector, descrip));
- case UNION:
- return buildUnion((UnionVector) vector, descrip);
+ case UNION:
+ return buildUnion((UnionVector) vector, descrip);
- case LIST:
- return buildList(vector, descrip);
+ case LIST:
+ return buildList(vector, descrip);
- default:
- return ColumnWriterFactory.buildColumnWriter(descrip.metadata, conversionFactory, vector);
+ default:
+ return ColumnWriterFactory.buildColumnWriter(descrip.metadata, conversionFactory, vector);
+ }
+ }
+
+ private AbstractObjectWriter buildDict(ValueVector vector, VectorDescrip descrip) {
+ if (vector.getField().getType().getMode() == DataMode.REPEATED) {
+ ValueVector dataVector = ((RepeatedDictVector) vector).getDataVector();
+ List<AbstractObjectWriter> writers = buildMap((AbstractMapVector) dataVector, descrip);
+ return ObjectDictWriter.buildDictArray(descrip.metadata, (RepeatedDictVector) vector, writers);
+ } else {
+ List<AbstractObjectWriter> writers = buildMap((AbstractMapVector) vector, descrip);
+ return ObjectDictWriter.buildDict(descrip.metadata, (DictVector) vector, writers);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BuildVectorsFromMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BuildVectorsFromMetadata.java
index 6bae853..ca2802e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BuildVectorsFromMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BuildVectorsFromMetadata.java
@@ -28,7 +28,9 @@
import org.apache.drill.exec.record.metadata.VariantMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
import org.apache.drill.exec.vector.complex.RepeatedListVector;
import org.apache.drill.exec.vector.complex.UnionVector;
@@ -68,6 +70,8 @@
private ValueVector buildVector(ColumnMetadata metadata) {
switch (metadata.structureType()) {
+ case DICT:
+ return buildDict(metadata);
case TUPLE:
return buildMap(metadata);
case VARIANT:
@@ -107,7 +111,7 @@
// without children so we can add the children as we add vectors.
final AbstractMapVector mapVector = (AbstractMapVector) TypeHelper.getNewVector(schema.emptySchema(), allocator, null);
- populateMap(mapVector, schema.mapSchema(), false);
+ populateMap(mapVector, schema.tupleSchema(), false);
return mapVector;
}
@@ -155,7 +159,7 @@
// for now.
populateMap((AbstractMapVector) childVector,
- variantSchema.member(MinorType.MAP).mapSchema(),
+ variantSchema.member(MinorType.MAP).tupleSchema(),
true);
break;
default:
@@ -181,4 +185,31 @@
populateUnion(vector.fullPromoteToUnion(), variantSchema);
}
}
+
+ private ValueVector buildDict(ColumnMetadata metadata) {
+ if (metadata.isArray()) {
+ return buildRepeatedDict(metadata);
+ } else {
+ return buildMap(metadata);
+ }
+ }
+
+ private RepeatedDictVector buildRepeatedDict(ColumnMetadata schema) {
+
+ // Creating the repeated dict vector will create its contained vectors if we
+ // give it a materialized field with children. So, instead pass a clone
+ // without children so we can add the children as we add vectors.
+
+ final RepeatedDictVector repeatedDictVector = (RepeatedDictVector) TypeHelper.getNewVector(schema.emptySchema(), allocator, null);
+ populateDict(repeatedDictVector, schema.tupleSchema());
+ return repeatedDictVector;
+ }
+
+ private void populateDict(RepeatedDictVector vector, TupleMetadata dictMetadata) {
+ for (int i = 0; i < dictMetadata.size(); i++) {
+ final ColumnMetadata childSchema = dictMetadata.metadata(i);
+ DictVector dataVector = (DictVector) vector.getDataVector();
+ dataVector.putChild(childSchema.name(), buildVector(childSchema));
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
index 0d330d6..5f0680b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
@@ -39,13 +39,16 @@
import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader;
import org.apache.drill.exec.vector.accessor.reader.AbstractScalarReader;
import org.apache.drill.exec.vector.accessor.reader.ArrayReaderImpl;
+import org.apache.drill.exec.vector.accessor.reader.DictReaderImpl;
import org.apache.drill.exec.vector.accessor.reader.MapReader;
import org.apache.drill.exec.vector.accessor.reader.UnionReaderImpl;
import org.apache.drill.exec.vector.accessor.reader.VectorAccessor;
import org.apache.drill.exec.vector.accessor.reader.VectorAccessors.SingleVectorAccessor;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.ListVector;
import org.apache.drill.exec.vector.complex.RepeatedListVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.apache.drill.exec.vector.complex.UnionVector;
/**
@@ -119,22 +122,48 @@
final MajorType type = va.type();
switch(type.getMinorType()) {
- case MAP:
- return buildMap((AbstractMapVector) vector, va, type.getMode(), descrip);
- case UNION:
- return buildUnion((UnionVector) vector, va, descrip);
- case LIST:
- return buildList(vector, va, descrip);
- case LATE:
+ case DICT:
+ return buildDict(vector, va, descrip);
+ case MAP:
+ return buildMap((AbstractMapVector) vector, va, type.getMode(), descrip);
+ case UNION:
+ return buildUnion((UnionVector) vector, va, descrip);
+ case LIST:
+ return buildList(vector, va, descrip);
+ case LATE:
- // Occurs for a list with no type: a list of nulls.
+ // Occurs for a list with no type: a list of nulls.
- return AbstractScalarReader.nullReader(descrip.metadata);
- default:
- return buildScalarReader(va, descrip.metadata);
+ return AbstractScalarReader.nullReader(descrip.metadata);
+ default:
+ return buildScalarReader(va, descrip.metadata);
}
}
+ private AbstractObjectReader buildDict(ValueVector vector, VectorAccessor va, VectorDescrip descrip) {
+
+ boolean isArray = descrip.metadata.isArray();
+
+ DictVector dictVector;
+ VectorAccessor dictAccessor;
+ if (isArray) {
+ dictVector = (DictVector) ((RepeatedValueVector) vector).getDataVector();
+ dictAccessor = new SingleVectorAccessor(dictVector);
+ } else {
+ dictVector = (DictVector) vector;
+ dictAccessor = va;
+ }
+
+ List<AbstractObjectReader> readers = buildMapMembers(dictVector, descrip.childProvider());
+ AbstractObjectReader reader = DictReaderImpl.build(descrip.metadata, dictAccessor, readers);
+
+ if (!isArray) {
+ return reader;
+ }
+
+ return ArrayReaderImpl.buildTuple(descrip.metadata, va, reader);
+ }
+
private AbstractObjectReader buildMap(AbstractMapVector vector, VectorAccessor va, DataMode mode, VectorDescrip descrip) {
final boolean isArray = mode == DataMode.REPEATED;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SingleSchemaInference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SingleSchemaInference.java
index 0ce4f49..8de2d68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SingleSchemaInference.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SingleSchemaInference.java
@@ -31,7 +31,9 @@
import org.apache.drill.exec.record.metadata.VariantSchema;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.ListVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
import org.apache.drill.exec.vector.complex.RepeatedListVector;
import org.apache.drill.exec.vector.complex.UnionVector;
@@ -67,6 +69,8 @@
private ColumnMetadata inferVector(ValueVector vector) {
final MaterializedField field = vector.getField();
switch (field.getType().getMinorType()) {
+ case DICT:
+ return MetadataUtils.newDict(field, inferDictSchema(vector));
case MAP:
return MetadataUtils.newMap(field, inferMapSchema((AbstractMapVector) vector));
case LIST:
@@ -83,6 +87,20 @@
}
}
+ private TupleSchema inferDictSchema(ValueVector vector) {
+ final List<ColumnMetadata> columns = new ArrayList<>();
+ DictVector dictVector;
+ if (vector.getField().getType().getMode() == DataMode.REPEATED) {
+ dictVector = (DictVector) ((RepeatedDictVector) vector).getDataVector();
+ } else {
+ dictVector = (DictVector) vector;
+ }
+ for (int i = 0; i < dictVector.size(); i++) {
+ columns.add(inferVector(dictVector.getChildByOrdinal(i)));
+ }
+ return MetadataUtils.fromColumns(columns);
+ }
+
private TupleSchema inferMapSchema(AbstractMapVector vector) {
final List<ColumnMetadata> columns = new ArrayList<>();
for (int i = 0; i < vector.size(); i++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/VectorAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/VectorAllocator.java
index 17b2fd5..ae403fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/VectorAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/VectorAllocator.java
@@ -29,6 +29,7 @@
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
/**
@@ -78,6 +79,12 @@
} else {
allocateMap((AbstractMapVector) vector, metadata, valueCount, mdProvider);
}
+ } else if (type.getMinorType() == MinorType.DICT) {
+ if (type.getMode() == DataMode.REPEATED) {
+ allocateDictArray((RepeatedDictVector) vector, metadata, valueCount, mdProvider);
+ } else {
+ allocateMap((AbstractMapVector) vector, metadata, valueCount, mdProvider);
+ }
} else {
allocatePrimitive(vector, metadata, valueCount);
}
@@ -98,9 +105,16 @@
allocateMap(vector, metadata, expectedValueCount, mdProvider);
}
+ private void allocateDictArray(RepeatedDictVector vector, ColumnMetadata metadata,
+ int valueCount, MetadataProvider mdProvider) {
+ vector.getOffsetVector().allocateNew(valueCount);
+ final int expectedValueCount = valueCount * metadata.expectedElementCount();
+ allocateMap((AbstractMapVector) vector.getDataVector(), metadata, expectedValueCount, mdProvider);
+ }
+
private void allocateMap(AbstractMapVector vector, ColumnMetadata metadata, int valueCount, MetadataProvider mdProvider) {
final MetadataProvider mapProvider = mdProvider.childProvider(metadata);
- final TupleMetadata mapSchema = metadata.mapSchema();
+ final TupleMetadata mapSchema = metadata.tupleSchema();
assert mapSchema != null;
int i = 0;
for (final ValueVector child : vector) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java
index 7db2887..363f51d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/ProjectionType.java
@@ -17,9 +17,9 @@
*/
package org.apache.drill.exec.physical.resultSet.project;
-import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
/**
* Specifies the type of projection obtained by parsing the
@@ -79,14 +79,22 @@
* Combination of array and map hints.
*/
- TUPLE_ARRAY; // x[0].y
+ TUPLE_ARRAY, // x[0].y
+
+ DICT, // x[0] or x['key'] (depends on key type)
+
+ DICT_ARRAY; // x[0][42] or x[0]['key'] (depends on key type)
public boolean isTuple() {
return this == ProjectionType.TUPLE || this == ProjectionType.TUPLE_ARRAY;
}
public boolean isArray() {
- return this == ProjectionType.ARRAY || this == ProjectionType.TUPLE_ARRAY;
+ return this == ProjectionType.ARRAY || this == ProjectionType.TUPLE_ARRAY || this == DICT_ARRAY;
+ }
+
+ public boolean isDict() {
+ return this == DICT || this == DICT_ARRAY;
}
/**
@@ -99,17 +107,12 @@
}
public static ProjectionType typeFor(MajorType majorType) {
+ boolean repeated = Types.isRepeated(majorType);
if (majorType.getMinorType() == MinorType.MAP) {
- if (majorType.getMode() == DataMode.REPEATED) {
- return TUPLE_ARRAY;
- } else {
- return TUPLE;
- }
- }
- if (majorType.getMode() == DataMode.REPEATED) {
- return ARRAY;
- }
- if (majorType.getMinorType() == MinorType.LIST) {
+ return repeated ? TUPLE_ARRAY : TUPLE;
+ } else if (majorType.getMinorType() == MinorType.DICT) {
+ return repeated ? DICT_ARRAY : DICT;
+ } else if (repeated || majorType.getMinorType() == MinorType.LIST) {
return ARRAY;
}
return SCALAR;
@@ -143,13 +146,17 @@
switch (this) {
case ARRAY:
- return readType == ARRAY || readType == TUPLE_ARRAY;
+ return readType == ARRAY || readType == TUPLE_ARRAY
+ || readType == DICT // the actual key type should be validated later
+ || readType == DICT_ARRAY;
case TUPLE_ARRAY:
- return readType == TUPLE_ARRAY;
+ return readType == TUPLE_ARRAY || readType == DICT_ARRAY;
case SCALAR:
return readType == SCALAR;
case TUPLE:
- return readType == TUPLE || readType == TUPLE_ARRAY;
+ return readType == TUPLE || readType == TUPLE_ARRAY || readType == DICT || readType == DICT_ARRAY;
+ case DICT:
+ return readType == DICT || readType == DICT_ARRAY;
case UNPROJECTED:
case GENERAL:
case WILDCARD:
@@ -169,6 +176,8 @@
return "tuple (a.x)";
case TUPLE_ARRAY:
return "tuple array (a[n].x)";
+ case DICT:
+ return "dict (a['key'])";
case WILDCARD:
return "wildcard (*)";
default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
index e0bc18f..55a252d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedColumnImpl.java
@@ -73,6 +73,11 @@
@Override
public boolean isTuple() { return type.isTuple(); }
+ @Override
+ public boolean isDict() {
+ return type.isDict();
+ }
+
public RequestedTuple asTuple() {
if (members == null) {
members = new RequestedTupleImpl(this);
@@ -177,6 +182,10 @@
return "map column";
case TUPLE_ARRAY:
return "repeated map";
+ case DICT:
+ return "dict column";
+ case DICT_ARRAY:
+ return "repeated dict column";
case WILDCARD:
return "wildcard";
default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
index d0455d0..d9b3e1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTuple.java
@@ -75,6 +75,7 @@
boolean isSimple();
boolean isArray();
boolean isTuple();
+ boolean isDict();
String fullName();
RequestedTuple mapProjection();
boolean nameEquals(String target);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 7eceb56..0a8620f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -212,7 +212,7 @@
? new ArrayList<>()
: new ArrayList<>(parentNames);
currentNames.add(columnMetadata.name());
- result.addAll(getColumnPaths(columnMetadata.mapSchema(), currentNames));
+ result.addAll(getColumnPaths(columnMetadata.tupleSchema(), currentNames));
} else {
result.add(Collections.singletonList(columnMetadata.name()));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
index 1d63738..a672d19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
@@ -339,7 +339,7 @@
// if column is a map / struct, recursively scan nested columns
if (column.isMap()) {
List<Records.Column> mapRecords =
- columns(schemaPath, table, column.mapSchema(), columnName, currentIndex, true);
+ columns(schemaPath, table, column.tupleSchema(), columnName, currentIndex, true);
records.addAll(mapRecords);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
index 0b6e0ee..0c02123 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java
@@ -136,6 +136,7 @@
}
ValueVector v;
+ MajorType finalType = null;
if (vector instanceof DictVector) {
v = ((DictVector) vector).getValues();
if (addToBreadCrumb) {
@@ -147,6 +148,7 @@
depth = 0;
builder.setDict(depth);
}
+ finalType = ((DictVector) vector).getLastPathType();
} else if (vector instanceof AbstractContainerVector) {
String fieldName = null;
if (seg.isNamed()) {
@@ -176,7 +178,7 @@
if(addToBreadCrumb) {
builder.intermediateType(v.getField().getType());
}
- builder.finalType(v.getField().getType());
+ builder.finalType(finalType != null ? finalType : v.getField().getType());
} else {
builder.finalType(v.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
index 3255df6..a3f428d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
@@ -132,7 +132,7 @@
PathSegment child = column.getChild();
if (child != null && child.isNamed()) {
String name = column.getNameSegment().getPath();
- ColumnMetadata childMetadata = columnMetadata.mapSchema().metadata(name);
+ ColumnMetadata childMetadata = columnMetadata.tupleSchema().metadata(name);
writeColumnToMapWriter(writer.map(name), child, childMetadata, allTextMode);
} else {
writeSingleOrArrayColumn(columnMetadata, writer, allTextMode);
@@ -181,9 +181,13 @@
private static void writeColumn(ColumnMetadata columnMetadata,
BaseWriter.MapWriter fieldWriter, boolean allTextMode) {
switch (columnMetadata.structureType()) {
+ case DICT:
+ writeSchemaColumns(
+ columnMetadata.tupleSchema(), fieldWriter.dict(columnMetadata.name()), allTextMode);
+ break;
case TUPLE:
writeSchemaColumns(
- columnMetadata.mapSchema(), fieldWriter.map(columnMetadata.name()), allTextMode);
+ columnMetadata.tupleSchema(), fieldWriter.map(columnMetadata.name()), allTextMode);
break;
case MULTI_ARRAY:
writeArrayColumn(columnMetadata.childSchema(), fieldWriter.list(columnMetadata.name()), allTextMode);
@@ -213,8 +217,11 @@
private static void writeArrayColumn(ColumnMetadata columnMetadata,
BaseWriter.ListWriter fieldWriter, boolean allTextMode) {
switch (columnMetadata.structureType()) {
+ case DICT:
+ writeSchemaColumns(columnMetadata.tupleSchema(), fieldWriter.dict(), allTextMode);
+ break;
case TUPLE:
- writeSchemaColumns(columnMetadata.mapSchema(), fieldWriter.map(), allTextMode);
+ writeSchemaColumns(columnMetadata.tupleSchema(), fieldWriter.map(), allTextMode);
break;
case MULTI_ARRAY:
writeArrayColumn(columnMetadata.childSchema(), fieldWriter.list(), allTextMode);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
index ed7ba0e..a59065d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TestProjectionSet.java
@@ -91,7 +91,7 @@
assertFalse(mCol.isProjected());
ColumnReadProjection bCol = mCol.mapProjection().readProjection(
- readSchema.metadata("m").mapSchema().metadata("b"));
+ readSchema.metadata("m").tupleSchema().metadata("b"));
assertFalse(bCol.isProjected());
}
@@ -135,7 +135,7 @@
assertTrue(mCol.isProjected());
ColumnReadProjection bCol = mCol.mapProjection().readProjection(
- readSchema.metadata("m").mapSchema().metadata("b"));
+ readSchema.metadata("m").tupleSchema().metadata("b"));
assertTrue(bCol.isProjected());
}
@@ -209,7 +209,7 @@
.add("h", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
- TupleMetadata mReadSchema = readSchema.metadata("m").mapSchema();
+ TupleMetadata mReadSchema = readSchema.metadata("m").tupleSchema();
mReadSchema.metadata("f").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
TupleMetadata outputSchema = new SchemaBuilder()
@@ -219,7 +219,7 @@
.add("g", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
- TupleMetadata mOutputSchema = outputSchema.metadata("m").mapSchema();
+ TupleMetadata mOutputSchema = outputSchema.metadata("m").tupleSchema();
mOutputSchema.metadata("g").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
TypeConverter converter = TypeConverter.builder()
@@ -315,8 +315,8 @@
.add("e", MinorType.INT)
.resumeSchema()
.buildSchema();
- TupleMetadata mReadSchema = readSchema.metadata("m").mapSchema();
- TupleMetadata m2ReadSchema = readSchema.metadata("m2").mapSchema();
+ TupleMetadata mReadSchema = readSchema.metadata("m").tupleSchema();
+ TupleMetadata m2ReadSchema = readSchema.metadata("m2").tupleSchema();
TupleMetadata outputSchema = new SchemaBuilder()
.addMap("m")
@@ -324,7 +324,7 @@
.resumeSchema()
.buildSchema();
outputSchema.setBooleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, true);
- TupleMetadata mOutputSchema = outputSchema.metadata("m").mapSchema();
+ TupleMetadata mOutputSchema = outputSchema.metadata("m").tupleSchema();
TypeConverter converter = TypeConverter.builder()
.providedSchema(outputSchema)
@@ -415,9 +415,9 @@
ColumnMetadata m1Schema = readSchema.metadata("m1");
ColumnMetadata m2Schema = readSchema.metadata("m2");
ColumnMetadata m3Schema = readSchema.metadata("m3");
- TupleMetadata m1ReadSchema = m1Schema.mapSchema();
- TupleMetadata m2ReadSchema = m2Schema.mapSchema();
- TupleMetadata m3ReadSchema = m3Schema.mapSchema();
+ TupleMetadata m1ReadSchema = m1Schema.tupleSchema();
+ TupleMetadata m2ReadSchema = m2Schema.tupleSchema();
+ TupleMetadata m3ReadSchema = m3Schema.tupleSchema();
// Project one member of map m1, all of m2, none of m3
@@ -476,7 +476,7 @@
.buildSchema();
ColumnMetadata m1Schema = readSchema.metadata("m1");
- TupleMetadata m1ReadSchema = m1Schema.mapSchema();
+ TupleMetadata m1ReadSchema = m1Schema.tupleSchema();
// Project one member of map1, all of map2, none of map3
@@ -515,7 +515,7 @@
.buildSchema();
ColumnMetadata mSchema = readSchema.metadata("m");
- TupleMetadata mReadSchema = mSchema.mapSchema();
+ TupleMetadata mReadSchema = mSchema.tupleSchema();
TupleMetadata outputSchema = new SchemaBuilder()
.addMap("m")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDictArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDictArray.java
new file mode 100644
index 0000000..dc62928
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDictArray.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.map;
+import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.validate.BatchValidator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test dict array support in the result set loader.
+ */
+@Category(RowSetTests.class)
+public class TestResultSetLoaderDictArray extends SubOperatorTest {
+
+ @Test
+ public void testBasics() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDictArray("d", MinorType.INT)
+ .value(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Verify structure and schema
+
+ TupleMetadata actualSchema = rootWriter.tupleSchema();
+ assertEquals(2, actualSchema.size());
+ assertTrue(actualSchema.metadata(1).isArray());
+ assertTrue(actualSchema.metadata(1).isDict());
+ assertEquals(2, actualSchema.metadata("d").tupleSchema().size());
+ assertEquals(2, actualSchema.column("d").getChildren().size());
+ DictWriter dictWriter = rootWriter.array("d").dict();
+ assertSame(actualSchema.metadata("d").tupleSchema(), dictWriter.schema().tupleSchema());
+
+ // Write a couple of rows with arrays.
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(10, objArray(
+ map(110, "d1.1", 111, "d1.2", 112, "d1.3"),
+ map(120, "d2.2"))
+ )
+ .addRow(20, objArray())
+ .addRow(30, objArray(
+ map(310, "d3.1", 311, "d3.2", 313, "d3.4", 317, "d3.9"),
+ map(320, "d4.2"),
+ map(332, "d5.1", 339, "d5.5", 337, "d5.6"))
+ );
+
+ // Verify the batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ RepeatedDictVector repeatedDictVector = (RepeatedDictVector) actual.container().getValueVector(1).getValueVector();
+ MaterializedField dictArrayField = repeatedDictVector.getField(); // RepeatedDictVector contains one child - DictVector
+ assertEquals(1, dictArrayField.getChildren().size());
+ DictVector dictVector = (DictVector) repeatedDictVector.getDataVector();
+ Iterator<MaterializedField> iter = dictVector.getField().getChildren().iterator();
+ assertTrue(dictWriter.keyWriter().schema().schema().isEquivalent(iter.next()));
+ assertTrue(dictWriter.valueWriter().scalar().schema().schema().isEquivalent(iter.next()));
+
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, objArray(
+ map(110, "d1.1", 111, "d1.2", 112, "d1.3"),
+ map(120, "d2.2"))
+ )
+ .addRow(20, objArray())
+ .addRow(30, objArray(
+ map(310, "d3.1", 311, "d3.2", 313, "d3.4", 317, "d3.9"),
+ map(320, "d4.2"),
+ map(332, "d5.1", 339, "d5.5", 337, "d5.6"))
+ )
+ .build();
+ RowSetUtilities.verify(expected, actual);
+
+ rsLoader.close();
+ }
+
+ @Test
+ public void testArrayValue() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDictArray("d", MinorType.INT)
+ .repeatedValue(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Write a couple of rows
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(10, objArray(
+ map(110, strArray("d1.1.1", "d1.1.2"), 111, strArray("d1.1.3", "d1.1.4"), 112, strArray("d1.1.5", "d1.1.6")),
+ map(120, strArray("d1.2.1", "d1.2.2"))))
+ .addRow(20, objArray())
+ .addRow(30, objArray(
+ map(310, strArray("d3.1.1", "d3.2.2"), 311, strArray("d3.1.3", "d3.2.4", "d3.1.5", "d3.1.6")),
+ map(320, strArray(), 321, strArray("d3.2.2")),
+ map(330, strArray("d3.3.1", "d1.2.2"))));
+
+ // Verify the batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, objArray(
+ map(110, strArray("d1.1.1", "d1.1.2"), 111, strArray("d1.1.3", "d1.1.4"), 112, strArray("d1.1.5", "d1.1.6")),
+ map(120, strArray("d1.2.1", "d1.2.2"))))
+ .addRow(20, objArray())
+ .addRow(30, objArray(
+ map(310, strArray("d3.1.1", "d3.2.2"), 311, strArray("d3.1.3", "d3.2.4", "d3.1.5", "d3.1.6")),
+ map(320, strArray(), 321, strArray("d3.2.2")),
+ map(330, strArray("d3.3.1", "d1.2.2"))))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+
+ rsLoader.close();
+ }
+
+ @Test
+ public void testScalarValue() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDictArray("d", MinorType.VARCHAR)
+ .value(MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Write a couple of rows
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(10, objArray(
+ map("a", 1, "b", 2, "d", 4),
+ map("a", 2, "c", 3, "d", 1, "e", 4)
+ ))
+ .addRow(20, objArray())
+ .addRow(30, objArray(
+ map("a", 2, "c", 4, "d", 5, "e", 6, "f", 11),
+ map("a", 1, "d", 6, "c", 3),
+ map("b", 2, "a", 3))
+ );
+
+ // Verify the batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, objArray(
+ map("a", 1, "b", 2, "d", 4),
+ map("a", 2, "c", 3, "d", 1, "e", 4)
+ ))
+ .addRow(20, objArray())
+ .addRow(30, objArray(
+ map("a", 2, "c", 4, "d", 5, "e", 6, "f", 11),
+ map("a", 1, "d", 6, "c", 3),
+ map("b", 2, "a", 3))
+ )
+ .build();
+ RowSetUtilities.verify(expected, actual);
+
+ rsLoader.close();
+ }
+
+ @Test
+ public void testDictValue() {
+ TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDictArray("d", MinorType.VARCHAR)
+ .dictValue()
+ .key(MinorType.VARCHAR)
+ .value(MinorType.VARCHAR)
+ .resumeDict()
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Write a couple of rows
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(10, objArray(
+ map("a", map("a", "a1", "b", "a2", "c", "a3"), "b", map("d", "a4"), "c", map()),
+ map("b", map("b", "a2"))
+ ))
+ .addRow(20, objArray())
+ .addRow(30, objArray(
+ map("a", map("a", "b1", "b", "b1")),
+ map("b", map("e", "b2"), "a", map("h", "b1", "g", "b3"), "c", map("a", "b4")),
+ map("b", map("a", "b3", "c", "c3"), "a", map())));
+
+ // Verify the batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, objArray(
+ map("a", map("a", "a1", "b", "a2", "c", "a3"), "b", map("d", "a4"), "c", map()),
+ map("b", map("b", "a2"))
+ ))
+ .addRow(20, objArray())
+ .addRow(30, objArray(
+ map("a", map("a", "b1", "b", "b1")),
+ map("b", map("e", "b2"), "a", map("h", "b1", "g", "b3"), "c", map("a", "b4")),
+ map("b", map("a", "b3", "c", "c3"), "a", map())))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+
+ rsLoader.close();
+ }
+
+ /**
+ * Test that memory is released if the loader is closed with an active
+ * batch (that is, before the batch is harvested.)
+ */
+ @Test
+ public void testCloseWithoutHarvest() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addDictArray("d", MinorType.INT)
+ .value(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ ArrayWriter arrayWriter = rootWriter.array("d");
+ DictWriter dictWriter = arrayWriter.dict();
+ rsLoader.startBatch();
+ for (int i = 0; i < 40; i++) {
+ rootWriter.start();
+ for (int j = 0; j < 3; j++) {
+ dictWriter.keyWriter().setInt(i);
+ dictWriter.valueWriter().scalar().setString("b-" + i);
+ arrayWriter.save();
+ }
+ rootWriter.save();
+ }
+
+ // Don't harvest the batch. Allocator will complain if the
+ // loader does not release memory.
+
+ rsLoader.close();
+ }
+
+ @Test
+ public void testKeyOverflow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addDictArray("d", MinorType.VARCHAR)
+ .value(MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte[] key = new byte[523];
+ Arrays.fill(key, (byte) 'X');
+
+ int arraySize = 3; // number of dicts in each row
+ int dictSize = 1; // number of entries in each dict
+
+ // Number of rows should be driven by vector size.
+ // Our row count should include the overflow row
+
+ ArrayWriter arrayDictWriter = rootWriter.array(0);
+ DictWriter dictWriter = arrayDictWriter.dict();
+ ScalarWriter keyWriter = dictWriter.keyWriter();
+ ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+ int expectedCount = ValueVector.MAX_BUFFER_SIZE / (key.length * dictSize * arraySize);
+ System.out.println("expectedCount: " + expectedCount);
+ {
+ int count = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ for (int i = 0; i < arraySize; i++) {
+ for (int j = 0; j < dictSize; j++) {
+ keyWriter.setBytes(key, key.length);
+ valueWriter.setInt(0); // acts as a placeholder, the actual value is not important
+ dictWriter.save(); // not necessary for scalars, just for completeness
+ }
+ arrayDictWriter.save();
+ }
+ rootWriter.save();
+ count++;
+ }
+
+ assertEquals(expectedCount + 1, count);
+ System.out.println("count: " + count);
+
+ // Loader's row count should include only "visible" rows
+
+ assertEquals(expectedCount, rootWriter.rowCount());
+
+ // Total count should include invisible and look-ahead rows.
+
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+ // Result should exclude the overflow row
+
+ VectorContainer container = rsLoader.harvest();
+ BatchValidator.validate(container);
+ RowSet result = fixture.wrap(container);
+ assertEquals(expectedCount, result.rowCount());
+ result.clear();
+ }
+
+ // Next batch should start with the overflow row
+
+ {
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ VectorContainer container = rsLoader.harvest();
+ BatchValidator.validate(container);
+ RowSet result = fixture.wrap(container);
+ assertEquals(1, result.rowCount());
+ result.clear();
+ }
+
+ rsLoader.close();
+ }
+
+ @Test
+ public void testValueOverflow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addDictArray("d", MinorType.INT)
+ .value(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte[] value = new byte[523];
+ Arrays.fill(value, (byte) 'X');
+
+ int arraySize = 2; // number of dicts in each row; array size is the same for every row to find expected row count easier
+ int dictSize = 4; // number of entries in each dict
+
+ // Number of rows should be driven by vector size.
+ // Our row count should include the overflow row
+
+ ArrayWriter arrayDictWriter = rootWriter.array(0);
+ DictWriter dictWriter = arrayDictWriter.dict();
+ ScalarWriter keyWriter = dictWriter.keyWriter();
+ ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+ int expectedCount = ValueVector.MAX_BUFFER_SIZE / (value.length * dictSize * arraySize);
+ {
+ int count = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ for (int i = 0; i < arraySize; i++) {
+ for (int j = 0; j < dictSize; j++) {
+ keyWriter.setInt(0); // acts as a placeholder, the actual value is not important
+ valueWriter.setBytes(value, value.length);
+ dictWriter.save(); // not necessary for scalars, just for completeness
+ }
+ arrayDictWriter.save();
+ }
+ rootWriter.save();
+ count++;
+ }
+
+ assertEquals(expectedCount + 1, count);
+
+ // Loader's row count should include only "visible" rows
+
+ assertEquals(expectedCount, rootWriter.rowCount());
+
+ // Total count should include invisible and look-ahead rows.
+
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+ // Result should exclude the overflow row
+
+ VectorContainer container = rsLoader.harvest();
+ BatchValidator.validate(container);
+ RowSet result = fixture.wrap(container);
+ assertEquals(expectedCount, result.rowCount());
+ result.clear();
+ }
+
+ // Next batch should start with the overflow row
+
+ {
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ VectorContainer container = rsLoader.harvest();
+ BatchValidator.validate(container);
+ RowSet result = fixture.wrap(container);
+ assertEquals(1, result.rowCount());
+ result.clear();
+ }
+
+ rsLoader.close();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDicts.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDicts.java
new file mode 100644
index 0000000..3d1af87
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderDicts.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.map;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.validate.BatchValidator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.DictColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+
+/**
+ * Test (non-array) dict support in the result set loader and related classes.
+ */
+@Category(RowSetTests.class)
+public class TestResultSetLoaderDicts extends SubOperatorTest {
+
+ @Test
+ public void testBasics() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDict("d", MinorType.INT)
+ .value(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertFalse(rsLoader.isProjectionEmpty());
+ final RowSetLoader rootWriter = rsLoader.writer();
+
+ // Verify structure and schema
+
+ assertEquals(4, rsLoader.schemaVersion());
+ final TupleMetadata actualSchema = rootWriter.tupleSchema();
+ assertEquals(2, actualSchema.size());
+ assertTrue(actualSchema.metadata(1).isDict());
+ assertEquals(2, actualSchema.metadata("d").tupleSchema().size());
+ assertEquals(2, actualSchema.column("d").getChildren().size());
+
+ rsLoader.startBatch();
+
+ // Write a row the way that clients will do.
+
+ final ScalarWriter aWriter = rootWriter.scalar("a");
+ final DictWriter dictWriter = rootWriter.dict("d");
+ final ScalarWriter keyWriter = dictWriter.keyWriter();
+ final ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+ rootWriter.start();
+ aWriter.setInt(10);
+
+ keyWriter.setInt(110);
+ valueWriter.setString("fred");
+ dictWriter.save();
+ keyWriter.setInt(111);
+ valueWriter.setString("george");
+ dictWriter.save();
+
+ rootWriter.save();
+
+ // Write another using the test-time conveniences
+
+ rootWriter.addRow(20, map(210, "barney", 211, "bart", 212, "jerry"));
+
+ // Harvest the batch
+
+ final RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(4, rsLoader.schemaVersion());
+ assertEquals(2, actual.rowCount());
+ final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
+ assertEquals(2, dictVector.getAccessor().getValueCount());
+
+ // Validate data
+
+ final SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, map(110, "fred", 111, "george"))
+ .addRow(20, map(210, "barney", 211, "bart", 212, "jerry"))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+ rsLoader.close();
+ }
+
+ /**
+ * Test adding a dict to a loader after writing the first row.
+ */
+ @Test
+ public void testDictAddition() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .buildSchema();
+ final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertEquals(1, rsLoader.schemaVersion());
+ final RowSetLoader rootWriter = rsLoader.writer();
+
+ // Start without the dict. Then add a dict after the first row.
+
+ rsLoader.startBatch();
+ rootWriter.addRow(10);
+
+ MaterializedField dictField = SchemaBuilder.columnSchema("d", MinorType.DICT, DataMode.REQUIRED);
+ dictField.addChild(SchemaBuilder.columnSchema(DictVector.FIELD_KEY_NAME, MinorType.VARCHAR, DataMode.REQUIRED));
+ dictField.addChild(SchemaBuilder.columnSchema(DictVector.FIELD_VALUE_NAME, MinorType.VARCHAR, DataMode.REQUIRED));
+ DictColumnMetadata dictMetadata = MetadataUtils.newDict(dictField);
+
+ final int dictIndex = rootWriter.addColumn(dictMetadata);
+ final DictWriter dictWriter = rootWriter.dict(dictIndex);
+
+ // Ensure metadata was added
+
+ final TupleMetadata actualSchema = rootWriter.tupleSchema();
+ assertTrue(actualSchema.metadata(1).isDict());
+ assertEquals(2, actualSchema.metadata("d").tupleSchema().size());
+ assertEquals(2, actualSchema.column("d").getChildren().size());
+ assertEquals(2, actualSchema.size());
+ assertEquals(2, dictWriter.schema().tupleSchema().size());
+
+ rootWriter.addRow(20, map("name", "fred", "lastname", "smith"))
+ .addRow(30, map("name", "barney", "lastname", "johnson"));
+
+ final RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(4, rsLoader.schemaVersion());
+ assertEquals(3, actual.rowCount());
+
+ final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
+ assertEquals(2, dictVector.getField().getChildren().size());
+
+ // Validate first batch
+
+ final TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDict("d", MinorType.VARCHAR)
+ .value(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, map())
+ .addRow(20, map("name", "fred", "lastname", "smith"))
+ .addRow(30, map("name", "barney", "lastname", "johnson"))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+ rsLoader.close();
+ }
+
+ /**
+ * Create dict with map value. Then, add columns to the map
+ * on the fly. Use required, variable-width columns since
+ * those require the most processing and are most likely to
+ * fail if anything is out of place.
+ */
+ @Test
+ public void testMapValueRequiredFields() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDict("d", MinorType.VARCHAR)
+ .mapValue()
+ .add("b", MinorType.VARCHAR)
+ .resumeDict()
+ .resumeSchema()
+ .buildSchema();
+ final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertEquals(5, rsLoader.schemaVersion());
+ final RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ rootWriter.addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")));
+
+ // Validate first batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(5, rsLoader.schemaVersion());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+
+ // Now add columns in the second batch.
+
+ rsLoader.startBatch();
+ rootWriter.addRow(20, map("a2", mapValue("c11"), "b2", mapValue("c12"), "c2", mapValue("c13")));
+
+ final DictWriter dictWriter = rootWriter.dict("d");
+ final TupleWriter nestedMapWriter = dictWriter.valueWriter().tuple();
+ nestedMapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.REQUIRED));
+
+ rootWriter.addRow(30, map("a3", mapValue("c21", "d21")));
+
+ // And another set while the write proceeds.
+
+ nestedMapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED));
+
+ rootWriter.addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")));
+
+ // Validate second batch
+
+ actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(7, rsLoader.schemaVersion());
+
+ final TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDict("d", MinorType.VARCHAR)
+ .mapValue()
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .resumeDict()
+ .resumeSchema()
+ .buildSchema();
+ expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(20, map("a2", mapValue("c11", "", ""), "b2", mapValue("c12", "", ""), "c2", mapValue("c13", "", "")))
+ .addRow(30, map("a3", mapValue("c21", "d21", "")))
+ .addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+ rsLoader.close();
+ }
+
+ /**
+ * Create dict with map value. Then, add columns to the map
+ * on the fly. Use nullable, variable-width columns since
+ * those require the most processing and are most likely to
+ * fail if anything is out of place.
+ */
+ @Test
+ public void testMapValueNullableFields() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDict("d", MinorType.VARCHAR)
+ .mapValue()
+ .addNullable("b", MinorType.VARCHAR)
+ .resumeDict()
+ .resumeSchema()
+ .buildSchema();
+ final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ assertEquals(5, rsLoader.schemaVersion());
+ final RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ rootWriter.addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")));
+
+ // Validate first batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(5, rsLoader.schemaVersion());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+
+ // Now add columns in the second batch.
+
+ rsLoader.startBatch();
+ rootWriter.addRow(20, map("a2", mapValue("c11"), "b2", mapValue("c12"), "c2", mapValue("c13")));
+
+ final DictWriter dictWriter = rootWriter.dict("d");
+ final TupleWriter nestedMapWriter = dictWriter.valueWriter().tuple();
+ nestedMapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+ rootWriter.addRow(30, map("a3", mapValue("c21", "d21")));
+
+ // And another set while the write proceeds.
+
+ nestedMapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL));
+
+ rootWriter.addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")));
+
+ // Validate second batch
+
+ actual = fixture.wrap(rsLoader.harvest());
+ assertEquals(7, rsLoader.schemaVersion());
+
+ final TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDict("d", MinorType.VARCHAR)
+ .mapValue()
+ .addNullable("b", MinorType.VARCHAR)
+ .addNullable("c", MinorType.VARCHAR)
+ .addNullable("d", MinorType.VARCHAR)
+ .resumeDict()
+ .resumeSchema()
+ .buildSchema();
+ expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(20, map("a2", mapValue("c11", null, null), "b2", mapValue("c12", null, null), "c2", mapValue("c13", null, null)))
+ .addRow(30, map("a3", mapValue("c21", "d21", null)))
+ .addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+ rsLoader.close();
+ }
+
+ @Test
+ public void testArrayValue() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDict("d", MinorType.INT)
+ .repeatedValue(MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ final RowSetLoader rootWriter = rsLoader.writer();
+
+ // Write some rows
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(10, map(
+ 1, intArray(110, 120, 130),
+ 2, intArray(111, 121)
+ ))
+ .addRow(20, map())
+ .addRow(30, map(
+ 1, intArray(),
+ 2, intArray(310, 320),
+ 3, intArray(311, 321, 331, 341),
+ 4, intArray(312, 322, 332)
+ ));
+
+ // Validate first batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, map(
+ 1, intArray(110, 120, 130),
+ 2, intArray(111, 121)
+ ))
+ .addRow(20, map())
+ .addRow(30, map(
+ 1, intArray(),
+ 2, intArray(310, 320),
+ 3, intArray(311, 321, 331, 341),
+ 4, intArray(312, 322, 332)
+ ))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+
+ // Add more rows in the second batch.
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(40, map(1, intArray(410, 420)))
+ .addRow(50, map(1, intArray(510), 2, intArray(511, 531)));
+
+ // Validate second batch. The new array should have been back-filled with
+ // empty offsets for the missing rows.
+
+ actual = fixture.wrap(rsLoader.harvest());
+ expected = fixture.rowSetBuilder(actual.schema())
+ .addRow(40, map(1, intArray(410, 420)))
+ .addRow(50, map(1, intArray(510), 2, intArray(511, 531)))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+ rsLoader.close();
+ }
+
+ @Test
+ public void testDictValue() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addDict("d", MinorType.INT)
+ .dictValue()
+ .key(MinorType.INT)
+ .nullableValue(MinorType.VARCHAR)
+ .resumeDict()
+ .resumeSchema()
+ .buildSchema();
+ final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setSchema(schema)
+ .build();
+ final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ final RowSetLoader rootWriter = rsLoader.writer();
+
+ // Write some rows
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(10, map(
+ 1, map(1, "a", 2, "b", 4, "c"),
+ 2, map(2, "a2", 1, "c2")
+ ))
+ .addRow(20, map())
+ .addRow(30, map(
+ 1, map(),
+ 2, map(1, "a3"),
+ 3, map(2, "b4", 4, "n4", 1, null),
+ 4, map(3, "m5", 1, "a5", 2, "c5", 8, "m5", 21, "h5")
+ ));
+
+ // Validate first batch
+
+ RowSet actual = fixture.wrap(rsLoader.harvest());
+ SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(10, map(
+ 1, map(1, "a", 2, "b", 4, "c"),
+ 2, map(2, "a2", 1, "c2")
+ ))
+ .addRow(20, map())
+ .addRow(30, map(
+ 1, map(),
+ 2, map(1, "a3"),
+ 3, map(2, "b4", 4, "n4", 1, null),
+ 4, map(3, "m5", 1, "a5", 2, "c5", 8, "m5", 21, "h5")
+ ))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+
+ // Add more rows in the second batch.
+
+ rsLoader.startBatch();
+ rootWriter
+ .addRow(40, map(
+ 1, map(1, "j6", 0, "k6"))
+ )
+ .addRow(50, map(
+ 1, map(2, "l7"),
+ 2, map(1, "o8", 5, "p8", 7, "u8")
+ ));
+
+ // Validate second batch. The new dict should have been back-filled with
+ // empty offsets for the missing rows.
+
+ actual = fixture.wrap(rsLoader.harvest());
+ expected = fixture.rowSetBuilder(actual.schema())
+ .addRow(40, map(
+ 1, map(1, "j6", 0, "k6"))
+ )
+ .addRow(50, map(
+ 1, map(2, "l7"),
+ 2, map(1, "o8", 5, "p8", 7, "u8")
+ ))
+ .build();
+
+ RowSetUtilities.verify(expected, actual);
+ rsLoader.close();
+ }
+
+ @Test
+ public void testKeyOverflow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addDict("d", MinorType.VARCHAR)
+ .value(MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte[] key = new byte[523];
+ Arrays.fill(key, (byte) 'X');
+
+ int dictSize = 4; // number of entries in each dict
+
+ // Number of rows should be driven by vector size.
+ // Our row count should include the overflow row
+
+ DictWriter dictWriter = rootWriter.dict(0);
+ ScalarWriter keyWriter = dictWriter.keyWriter();
+ ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+ int expectedCount = ValueVector.MAX_BUFFER_SIZE / (key.length * dictSize);
+ {
+ int count = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ for (int i = 0; i < dictSize; i++) {
+ keyWriter.setBytes(key, key.length);
+ valueWriter.setInt(0); // acts as a placeholder, the actual value is not important
+ dictWriter.save(); // not necessary for scalars, just for completeness
+ }
+ rootWriter.save();
+ count++;
+ }
+
+ assertEquals(expectedCount + 1, count);
+
+ // Loader's row count should include only "visible" rows
+
+ assertEquals(expectedCount, rootWriter.rowCount());
+
+ // Total count should include invisible and look-ahead rows.
+
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+ // Result should exclude the overflow row
+
+ VectorContainer container = rsLoader.harvest();
+ BatchValidator.validate(container);
+ RowSet result = fixture.wrap(container);
+ assertEquals(expectedCount, result.rowCount());
+ result.clear();
+ }
+
+ // Next batch should start with the overflow row
+
+ {
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ VectorContainer container = rsLoader.harvest();
+ BatchValidator.validate(container);
+ RowSet result = fixture.wrap(container);
+ assertEquals(1, result.rowCount());
+ result.clear();
+ }
+
+ rsLoader.close();
+ }
+
+ @Test
+ public void testValueOverflow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addDict("d", MinorType.INT)
+ .value(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ rsLoader.startBatch();
+ byte[] value = new byte[523];
+ Arrays.fill(value, (byte) 'X');
+
+ int dictSize = 4; // number of entries in each dict
+
+ // Number of rows should be driven by vector size.
+ // Our row count should include the overflow row
+
+ DictWriter dictWriter = rootWriter.dict(0);
+ ScalarWriter keyWriter = dictWriter.keyWriter();
+ ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+ int expectedCount = ValueVector.MAX_BUFFER_SIZE / (value.length * dictSize);
+ {
+ int count = 0;
+ while (! rootWriter.isFull()) {
+ rootWriter.start();
+ for (int i = 0; i < dictSize; i++) {
+ keyWriter.setInt(0); // acts as a placeholder, the actual value is not important
+ valueWriter.setBytes(value, value.length);
+ dictWriter.save(); // not necessary for scalars, just for completeness
+ }
+ rootWriter.save();
+ count++;
+ }
+
+ assertEquals(expectedCount + 1, count);
+
+ // Loader's row count should include only "visible" rows
+
+ assertEquals(expectedCount, rootWriter.rowCount());
+
+ // Total count should include invisible and look-ahead rows.
+
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+ // Result should exclude the overflow row
+
+ VectorContainer container = rsLoader.harvest();
+ BatchValidator.validate(container);
+ RowSet result = fixture.wrap(container);
+ assertEquals(expectedCount, result.rowCount());
+ result.clear();
+ }
+
+ // Next batch should start with the overflow row
+
+ {
+ rsLoader.startBatch();
+ assertEquals(1, rootWriter.rowCount());
+ assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+ VectorContainer container = rsLoader.harvest();
+ BatchValidator.validate(container);
+ RowSet result = fixture.wrap(container);
+ assertEquals(1, result.rowCount());
+ result.clear();
+ }
+
+ rsLoader.close();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMapArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMapArray.java
index 08529cd..bfb94f5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMapArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMapArray.java
@@ -85,11 +85,11 @@
assertEquals(2, actualSchema.size());
assertTrue(actualSchema.metadata(1).isArray());
assertTrue(actualSchema.metadata(1).isMap());
- assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+ assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
assertEquals(2, actualSchema.column("m").getChildren().size());
TupleWriter mapWriter = rootWriter.array("m").tuple();
- assertSame(actualSchema.metadata("m").mapSchema(), mapWriter.schema().mapSchema());
- assertSame(mapWriter.tupleSchema(), mapWriter.schema().mapSchema());
+ assertSame(actualSchema.metadata("m").tupleSchema(), mapWriter.schema().tupleSchema());
+ assertSame(mapWriter.tupleSchema(), mapWriter.schema().tupleSchema());
assertSame(mapWriter.tupleSchema().metadata(0), mapWriter.scalar(0).schema());
assertSame(mapWriter.tupleSchema().metadata(1), mapWriter.scalar(1).schema());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMaps.java
index 630796d..4030f1e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMaps.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderMaps.java
@@ -83,7 +83,7 @@
final TupleMetadata actualSchema = rootWriter.tupleSchema();
assertEquals(3, actualSchema.size());
assertTrue(actualSchema.metadata(1).isMap());
- assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+ assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
assertEquals(2, actualSchema.column("m").getChildren().size());
rsLoader.startBatch();
@@ -246,7 +246,7 @@
// Ensure metadata was added
assertTrue(mapWriter.tupleSchema().size() == 1);
- assertSame(mapWriter.tupleSchema(), mapWriter.schema().mapSchema());
+ assertSame(mapWriter.tupleSchema(), mapWriter.schema().tupleSchema());
assertSame(mapWriter.tupleSchema().metadata(colIndex), mapWriter.scalar(colIndex).schema());
rootWriter
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
index 84392cf..75f4880 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.resultSet.impl;
import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.map;
import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
import static org.junit.Assert.assertEquals;
@@ -42,6 +43,7 @@
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.DictWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -228,7 +230,7 @@
TupleWriter m1Writer = rootWriter.tuple("m1");
assertTrue(m1Md.isMap());
assertTrue(m1Writer.isProjected());
- assertEquals(2, m1Md.mapSchema().size());
+ assertEquals(2, m1Md.tupleSchema().size());
assertTrue(m1Writer.column("a").isProjected());
assertTrue(m1Writer.column("b").isProjected());
@@ -236,7 +238,7 @@
TupleWriter m2Writer = rootWriter.tuple("m2");
assertTrue(m2Md.isMap());
assertTrue(m2Writer.isProjected());
- assertEquals(2, m2Md.mapSchema().size());
+ assertEquals(2, m2Md.tupleSchema().size());
assertFalse(m2Writer.column("c").isProjected());
assertTrue(m2Writer.column("d").isProjected());
@@ -244,7 +246,7 @@
TupleWriter m3Writer = rootWriter.tuple("m3");
assertTrue(m3Md.isMap());
assertFalse(m3Writer.isProjected());
- assertEquals(2, m3Md.mapSchema().size());
+ assertEquals(2, m3Md.tupleSchema().size());
assertFalse(m3Writer.column("e").isProjected());
assertFalse(m3Writer.column("f").isProjected());
@@ -305,7 +307,7 @@
TupleWriter m1Writer = rootWriter.tuple("m1");
assertTrue(m1Md.isMap());
assertTrue(m1Writer.isProjected());
- assertEquals(2, m1Md.mapSchema().size());
+ assertEquals(2, m1Md.tupleSchema().size());
assertTrue(m1Writer.column("a").isProjected());
assertTrue(m1Writer.column("b").isProjected());
@@ -595,4 +597,102 @@
assertTrue(e.getErrorType() == ErrorType.VALIDATION);
}
}
+
+ @Test
+ public void testDictProjection() {
+
+ final String dictName1 = "d1";
+ final String dictName2 = "d2";
+
+ // There is no test for the case of obtaining a value by key as this is not as simple a projection
+ // as in the case of map - there is a need to find a value corresponding to a key
+ // (the functionality is currently present in DictReader) and final column schema should be
+ // changed from dict structure with `key` and `value` children to a simple `value`.
+ List<SchemaPath> selection = RowSetTestUtils.projectList(dictName1);
+ TupleMetadata schema = new SchemaBuilder()
+ .addDict(dictName1, MinorType.VARCHAR)
+ .value(MinorType.INT)
+ .resumeSchema()
+ .addDict(dictName2, MinorType.VARCHAR)
+ .value(MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setProjection(ProjectionSetFactory.build(selection))
+ .setSchema(schema)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+ RowSetLoader rootWriter = rsLoader.writer();
+
+ // Verify the projected columns
+
+ TupleMetadata actualSchema = rootWriter.tupleSchema();
+ ColumnMetadata dictMetadata1 = actualSchema.metadata(dictName1);
+ DictWriter dictWriter1 = rootWriter.dict(dictName1);
+ assertTrue(dictMetadata1.isDict());
+ assertTrue(dictWriter1.isProjected());
+ assertEquals(2, dictMetadata1.tupleSchema().size());
+ assertTrue(dictWriter1.keyWriter().isProjected());
+ assertTrue(dictWriter1.valueWriter().isProjected());
+
+ ColumnMetadata dictMetadata2 = actualSchema.metadata(dictName2);
+ DictWriter dictWriter2 = rootWriter.dict(dictName2);
+ assertTrue(dictMetadata2.isDict());
+ assertFalse(dictWriter2.isProjected());
+ assertEquals(2, dictMetadata2.tupleSchema().size());
+ assertFalse(dictWriter2.keyWriter().isProjected());
+ assertFalse(dictWriter2.valueWriter().isProjected());
+
+ // Write a couple of rows.
+
+ rsLoader.startBatch();
+ rootWriter.start();
+ rootWriter
+ .addRow(map( "a", 1, "b", 2), map( "c", 3, "d", 4))
+ .addRow(map("a", 11, "b", 12), map("c", 13, "d", 14));
+
+ // Verify. Only the projected columns appear in the result set.
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addDict(dictName1, MinorType.VARCHAR)
+ .value(MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(map( "a", 1, "b", 2))
+ .addRow(map("a", 11, "b", 12))
+ .build();
+ RowSetUtilities.verify(expected, fixture.wrap(rsLoader.harvest()));
+ rsLoader.close();
+ }
+
+ @Test
+ public void testDictStringKeyAccess() {
+ List<SchemaPath> selection = RowSetTestUtils.projectList("col.a"); // the same as col['a'], but number is expected in brackets ([])
+ TupleMetadata schema = new SchemaBuilder()
+ .addDict("col", MinorType.VARCHAR)
+ .value(MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setProjection(ProjectionSetFactory.build(selection))
+ .setSchema(schema)
+ .build();
+ new ResultSetLoaderImpl(fixture.allocator(), options); // no validation error
+ }
+
+ @Test
+ public void testDictNumericKeyAccess() {
+ List<SchemaPath> selection = RowSetTestUtils.projectList("col[0]");
+ TupleMetadata schema = new SchemaBuilder()
+ .addDict("col", MinorType.INT)
+ .value(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ ResultSetOptions options = new OptionBuilder()
+ .setProjection(ProjectionSetFactory.build(selection))
+ .setSchema(schema)
+ .build();
+ new ResultSetLoaderImpl(fixture.allocator(), options); // no validation error
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
index 59be129..c8c2aa0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestDummyWriter.java
@@ -28,11 +28,14 @@
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.DictWriter;
import org.apache.drill.exec.vector.accessor.ValueType;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory;
import org.apache.drill.exec.vector.accessor.writer.MapWriter;
+import org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.test.SubOperatorTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -137,7 +140,7 @@
// Create the writers
{
- TupleMetadata mapSchema = schema.metadata("m1").mapSchema();
+ TupleMetadata mapSchema = schema.metadata("m1").tupleSchema();
List<AbstractObjectWriter> members = new ArrayList<>();
members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("a"), null, null));
members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("b"), null, null));
@@ -145,7 +148,7 @@
}
{
- TupleMetadata mapSchema = schema.metadata("m2").mapSchema();
+ TupleMetadata mapSchema = schema.metadata("m2").tupleSchema();
List<AbstractObjectWriter> members = new ArrayList<>();
members.add(ColumnWriterFactory.buildColumnWriter(mapSchema.metadata("c"), null, null));
writers.add(MapWriter.buildMapWriter(schema.metadata("m2"), null, members));
@@ -185,4 +188,83 @@
rootWriter.saveRow();
rootWriter.endWrite();
}
+
+ @Test
+ public void testDummyDict() {
+
+ final String dictName = "d";
+ final String dictArrayName = "da";
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addDict(dictName, MinorType.INT)
+ .repeatedValue(MinorType.VARCHAR)
+ .resumeSchema()
+ .addDictArray(dictArrayName, MinorType.VARCHAR)
+ .value(MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+
+ List<AbstractObjectWriter> writers = new ArrayList<>();
+
+
+ final String keyFieldName = DictVector.FIELD_KEY_NAME;
+ final String valueFieldName = DictVector.FIELD_VALUE_NAME;
+
+ // Create key and value writers for dict
+
+ ColumnMetadata dictMetadata = schema.metadata(dictName);
+ TupleMetadata dictSchema = dictMetadata.tupleSchema();
+ List<AbstractObjectWriter> dictFields = new ArrayList<>();
+ dictFields.add(ColumnWriterFactory.buildColumnWriter(dictSchema.metadata(keyFieldName), null, null));
+ dictFields.add(ColumnWriterFactory.buildColumnWriter(dictSchema.metadata(valueFieldName), null, null));
+ writers.add(ObjectDictWriter.buildDict(dictMetadata, null, dictFields));
+
+ // Create key and value writers for dict array
+
+ ColumnMetadata dictArrayMetadata = schema.metadata(dictArrayName);
+ TupleMetadata dictArraySchema = dictArrayMetadata.tupleSchema();
+ List<AbstractObjectWriter> dictArrayFields = new ArrayList<>();
+ dictArrayFields.add(ColumnWriterFactory.buildColumnWriter(dictArraySchema.metadata(keyFieldName), null, null));
+ dictArrayFields.add(ColumnWriterFactory.buildColumnWriter(dictArraySchema.metadata(valueFieldName), null, null));
+ writers.add(ObjectDictWriter.buildDictArray(dictArrayMetadata, null, dictArrayFields));
+
+ AbstractTupleWriter rootWriter = new RootWriterFixture(schema, writers);
+
+ // Events are ignored.
+
+ rootWriter.startWrite();
+ rootWriter.startRow();
+
+ // Nothing is projected
+
+ DictWriter dictWriter = rootWriter.dict(dictName);
+ assertFalse(dictWriter.isProjected());
+ assertFalse(dictWriter.keyWriter().isProjected());
+ assertFalse(dictWriter.valueWriter().array().scalar().isProjected());
+
+ DictWriter dictWriter1 = rootWriter.array(dictArrayName).dict();
+ assertFalse(dictWriter1.isProjected());
+ assertFalse(dictWriter1.keyWriter().isProjected());
+ assertFalse(dictWriter1.valueWriter().scalar().isProjected());
+
+ // Dummy columns seem real.
+
+ rootWriter.dict(dictName).keyWriter().setInt(20);
+ rootWriter.dict(0).valueWriter().array().scalar().setString("foo");
+
+ // Dummy array dict seems real.
+
+ rootWriter.array(dictArrayName).dict().keyWriter().setString("foo");
+ rootWriter.array(dictArrayName).dict().valueWriter().scalar().setInt(30);
+ rootWriter.array(dictArrayName).save();
+ rootWriter.array(1).dict().keyWriter().setString("bar");
+ rootWriter.array(1).dict().valueWriter().scalar().setInt(40);
+ rootWriter.array(1).save();
+
+ // More ignored events.
+
+ rootWriter.restartRow();
+ rootWriter.saveRow();
+ rootWriter.endWrite();
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
index d128fcd..4057f33 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
@@ -80,7 +80,7 @@
final TupleMetadata actualSchema = rootWriter.tupleSchema();
assertEquals(3, actualSchema.size());
assertTrue(actualSchema.metadata(1).isMap());
- assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+ assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
assertEquals(2, actualSchema.column("m").getChildren().size());
// Write a row the way that clients will do.
@@ -230,11 +230,11 @@
assertEquals(2, actualSchema.size());
assertTrue(actualSchema.metadata(1).isArray());
assertTrue(actualSchema.metadata(1).isMap());
- assertEquals(2, actualSchema.metadata("m").mapSchema().size());
+ assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
assertEquals(2, actualSchema.column("m").getChildren().size());
TupleWriter mapWriter = rootWriter.array("m").tuple();
- assertSame(actualSchema.metadata("m").mapSchema(), mapWriter.schema().mapSchema());
- assertSame(mapWriter.tupleSchema(), mapWriter.schema().mapSchema());
+ assertSame(actualSchema.metadata("m").tupleSchema(), mapWriter.schema().tupleSchema());
+ assertSame(mapWriter.tupleSchema(), mapWriter.schema().tupleSchema());
assertSame(mapWriter.tupleSchema().metadata(0), mapWriter.scalar(0).schema());
assertSame(mapWriter.tupleSchema().metadata(1), mapWriter.scalar(1).schema());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
index 4f4d0fc..975811e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
@@ -18,7 +18,9 @@
package org.apache.drill.exec.physical.rowSet;
import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.map;
import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
@@ -26,7 +28,9 @@
import static org.junit.Assert.fail;
import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
import java.util.Arrays;
+import java.util.Map;
import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -38,13 +42,18 @@
import org.apache.drill.exec.vector.VectorOverflowException;
import org.apache.drill.exec.vector.accessor.ArrayReader;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.KeyAccessor;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ScalarReader;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleReader;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.DictReader;
+import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetUtilities;
@@ -584,6 +593,502 @@
RowSetUtilities.verify(expected, actual);
}
+ @Test
+ public void testDictStructure() {
+ final String dictName = "d";
+
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("id", MinorType.INT)
+ .addDict(dictName, MinorType.INT)
+      .value(MinorType.VARCHAR) // required varchar
+ .resumeSchema()
+ .buildSchema();
+ final ExtendableRowSet rowSet = fixture.rowSet(schema);
+ final RowSetWriter writer = rowSet.writer();
+
+ // Dict
+ // Pick out components and lightly test. (Assumes structure
+ // tested earlier is still valid, so no need to exhaustively
+ // test again.)
+
+ assertEquals(ObjectType.ARRAY, writer.column(dictName).type());
+ assertTrue(writer.column(dictName).schema().isDict());
+
+ final ScalarWriter idWriter = writer.column(0).scalar();
+ final DictWriter dictWriter = writer.column(1).dict();
+
+ assertEquals(ValueType.INTEGER, dictWriter.keyType());
+ assertEquals(ObjectType.SCALAR, dictWriter.valueType());
+
+ final ScalarWriter keyWriter = dictWriter.keyWriter();
+ final ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+
+ assertEquals(ValueType.INTEGER, keyWriter.valueType());
+ assertEquals(ValueType.STRING, valueWriter.valueType());
+
+ // Write data
+ idWriter.setInt(1);
+
+ keyWriter.setInt(11);
+ valueWriter.setString("a");
+ dictWriter.save(); // Advance to next entry position
+ keyWriter.setInt(12);
+ valueWriter.setString("b");
+ dictWriter.save();
+ writer.save();
+
+ idWriter.setInt(2);
+
+ keyWriter.setInt(21);
+ valueWriter.setString("c");
+ dictWriter.save();
+ writer.save();
+
+ idWriter.setInt(3);
+
+ keyWriter.setInt(31);
+ valueWriter.setString("d");
+ dictWriter.save();
+ keyWriter.setInt(32);
+ valueWriter.setString("e");
+ dictWriter.save();
+ writer.save();
+
+ // Finish the row set and get a reader.
+
+ final SingleRowSet actual = writer.done();
+ final RowSetReader reader = actual.reader();
+
+ // Verify reader structure
+
+ assertEquals(ObjectType.ARRAY, reader.column(dictName).type());
+
+ final DictReader dictReader = reader.dict(1);
+ assertEquals(ObjectType.ARRAY, dictReader.type());
+
+ assertEquals(ValueType.INTEGER, dictReader.keyColumnType());
+ assertEquals(ObjectType.SCALAR, dictReader.valueColumnType());
+
+ // Row 1: get value reader with its position set to entry corresponding to a key
+
+ assertTrue(reader.next());
+ assertFalse(dictReader.isNull()); // dict itself is not null
+
+ dictReader.getAsString();
+
+ final KeyAccessor keyAccessor = dictReader.keyAccessor();
+ final ScalarReader valueReader = dictReader.valueReader().scalar();
+
+ assertTrue(keyAccessor.find(12));
+ assertEquals("b", valueReader.getString());
+ assertTrue(keyAccessor.find(11));
+ assertEquals("a", valueReader.getString());
+
+ // compare entire dict
+ Map<Object, Object> map = map(11, "a", 12, "b");
+ assertEquals(map, dictReader.getObject());
+
+ // Row 2
+
+ assertTrue(reader.next());
+ assertFalse(keyAccessor.find(22)); // the dict does not contain an entry with the key
+ assertTrue(keyAccessor.find(21));
+ assertEquals("c", valueReader.getString());
+
+ map = map(21, "c");
+ assertEquals(map, dictReader.getObject());
+
+ // Row 3
+
+ assertTrue(reader.next());
+
+ assertTrue(keyAccessor.find(31));
+ assertEquals("d", valueReader.getString());
+ assertFalse(keyAccessor.find(33));
+ assertTrue(keyAccessor.find(32));
+ assertEquals("e", valueReader.getString());
+
+ map = map(31, "d", 32, "e");
+ assertEquals(map, dictReader.getObject());
+
+ assertFalse(reader.next());
+
+ // Verify that the dict accessor's value count was set.
+
+ final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
+ assertEquals(3, dictVector.getAccessor().getValueCount());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(1, map(11, "a", 12, "b"))
+ .addRow(2, map(21, "c"))
+ .addRow(3, map(31, "d", 32, "e"))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ @Test
+ public void testDictStructureMapValue() {
+ final String dictName = "d";
+ final int bScale = 1;
+
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("id", MinorType.INT)
+ .addDict(dictName, MinorType.INT)
+ .mapValue()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARDECIMAL, 8, bScale)
+ .resumeDict()
+ .resumeSchema()
+ .buildSchema();
+ final ExtendableRowSet rowSet = fixture.rowSet(schema);
+ final RowSetWriter writer = rowSet.writer();
+
+ // Dict with Map value
+
+ assertEquals(ObjectType.ARRAY, writer.column(dictName).type());
+
+ final ScalarWriter idWriter = writer.scalar(0);
+ final DictWriter dictWriter = writer.column(1).dict();
+
+ assertEquals(ValueType.INTEGER, dictWriter.keyType());
+ assertEquals(ObjectType.TUPLE, dictWriter.valueType());
+
+ final ScalarWriter keyWriter = dictWriter.keyWriter();
+ final TupleWriter valueWriter = dictWriter.valueWriter().tuple();
+
+ assertEquals(ValueType.INTEGER, keyWriter.valueType());
+
+ ScalarWriter aWriter = valueWriter.scalar("a");
+ ScalarWriter bWriter = valueWriter.scalar("b");
+ assertEquals(ValueType.INTEGER, aWriter.valueType());
+ assertEquals(ValueType.DECIMAL, bWriter.valueType());
+
+ // Write data
+
+ idWriter.setInt(1);
+
+ keyWriter.setInt(11);
+ aWriter.setInt(10);
+ bWriter.setDecimal(BigDecimal.valueOf(1));
+ dictWriter.save(); // advance to next entry position
+
+ keyWriter.setInt(12);
+ aWriter.setInt(11);
+ bWriter.setDecimal(BigDecimal.valueOf(2));
+ dictWriter.save();
+
+ writer.save();
+
+ idWriter.setInt(2);
+
+ keyWriter.setInt(21);
+ aWriter.setInt(20);
+ bWriter.setDecimal(BigDecimal.valueOf(3));
+ dictWriter.save();
+
+ writer.save();
+
+ idWriter.setInt(3);
+
+ keyWriter.setInt(31);
+ aWriter.setInt(30);
+ bWriter.setDecimal(BigDecimal.valueOf(4));
+ dictWriter.save();
+
+ keyWriter.setInt(32);
+ aWriter.setInt(31);
+ bWriter.setDecimal(BigDecimal.valueOf(5));
+ dictWriter.save();
+
+ keyWriter.setInt(33);
+ aWriter.setInt(32);
+ bWriter.setDecimal(BigDecimal.valueOf(6));
+ dictWriter.save();
+
+ writer.save();
+
+ // Finish the row set and get a reader.
+
+ final SingleRowSet actual = writer.done();
+ final RowSetReader reader = actual.reader();
+
+ // Verify reader structure
+
+ assertEquals(ObjectType.ARRAY, reader.column(dictName).type());
+
+ final DictReader dictReader = reader.dict(1);
+ assertEquals(ObjectType.ARRAY, dictReader.type());
+
+ assertEquals(ValueType.INTEGER, dictReader.keyColumnType());
+ assertEquals(ObjectType.TUPLE, dictReader.valueColumnType());
+
+ final KeyAccessor keyAccessor = dictReader.keyAccessor();
+ final TupleReader valueReader = dictReader.valueReader().tuple();
+
+ // Row 1: get value reader with its position set to entry corresponding to a key
+
+ assertTrue(reader.next());
+ assertFalse(dictReader.isNull()); // dict itself is not null
+
+ assertTrue(keyAccessor.find(12));
+ assertEquals(11, valueReader.scalar("a").getInt());
+ assertEquals(BigDecimal.valueOf(2.0), valueReader.scalar("b").getDecimal());
+
+    // MapReader#getObject() returns a List containing values for each column
+    // rather than a mapping of column name to its value, hence a List is expected for Dict's value.
+ Map<Object, Object> map = map(
+ 11, Arrays.asList(10, BigDecimal.valueOf(1.0)),
+ 12, Arrays.asList(11, BigDecimal.valueOf(2.0))
+ );
+ assertEquals(map, dictReader.getObject());
+
+ // Row 2
+
+ assertTrue(reader.next());
+ assertFalse(keyAccessor.find(222));
+ assertTrue(keyAccessor.find(21));
+ assertEquals(Arrays.asList(20, BigDecimal.valueOf(3.0)), valueReader.getObject());
+
+ map = map(21, Arrays.asList(20, BigDecimal.valueOf(3.0)));
+ assertEquals(map, dictReader.getObject());
+
+ // Row 3
+
+ assertTrue(reader.next());
+
+ assertTrue(keyAccessor.find(32));
+ assertFalse(valueReader.isNull());
+ assertEquals(31, valueReader.scalar("a").getInt());
+ assertEquals(BigDecimal.valueOf(5.0), valueReader.scalar("b").getDecimal());
+
+ assertTrue(keyAccessor.find(31));
+ assertEquals(30, valueReader.scalar("a").getInt());
+ assertEquals(BigDecimal.valueOf(4.0), valueReader.scalar("b").getDecimal());
+
+ assertFalse(keyAccessor.find(404));
+
+ map = map(
+ 31, Arrays.asList(30, BigDecimal.valueOf(4.0)),
+ 32, Arrays.asList(31, BigDecimal.valueOf(5.0)),
+ 33, Arrays.asList(32, BigDecimal.valueOf(6.0))
+ );
+ assertEquals(map, dictReader.getObject());
+
+ assertFalse(reader.next());
+
+ // Verify that the dict accessor's value count was set.
+
+ final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
+ assertEquals(3, dictVector.getAccessor().getValueCount());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(1, map(
+ 11, objArray(10, BigDecimal.valueOf(1.0)),
+ 12, objArray(11, BigDecimal.valueOf(2.0))
+ ))
+ .addRow(2, map(21, objArray(20, BigDecimal.valueOf(3.0))))
+ .addRow(3, map(
+ 31, objArray(30, BigDecimal.valueOf(4.0)),
+ 32, objArray(31, BigDecimal.valueOf(5.0)),
+ 33, objArray(32, BigDecimal.valueOf(6.0))
+ ))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ @Test
+ public void testRepeatedDictStructure() {
+ final String dictName = "d";
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("id", MinorType.INT)
+ .addDictArray(dictName, MinorType.FLOAT4)
+ .value(MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+ final ExtendableRowSet rowSet = fixture.rowSet(schema);
+ final RowSetWriter writer = rowSet.writer();
+
+ // Repeated dict
+
+ assertEquals(ObjectType.ARRAY, writer.column(dictName).type());
+
+ final ScalarWriter idWriter = writer.scalar(0);
+
+ final ArrayWriter dictArrayWriter = writer.column(1).array();
+ assertEquals(ObjectType.ARRAY, dictArrayWriter.entryType());
+
+ DictWriter dictWriter = dictArrayWriter.dict();
+
+ assertEquals(ValueType.DOUBLE, dictWriter.keyType());
+ assertEquals(ObjectType.SCALAR, dictWriter.valueType());
+
+ final ScalarWriter keyWriter = dictWriter.keyWriter();
+ final ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
+ assertEquals(ValueType.DOUBLE, keyWriter.valueType());
+ assertEquals(ValueType.STRING, valueWriter.valueType());
+
+ // Write data
+
+ idWriter.setInt(1);
+
+ keyWriter.setDouble(1);
+ valueWriter.setString("a");
+ dictWriter.save(); // advance to next entry position
+ keyWriter.setDouble(2);
+ valueWriter.setString("b");
+ dictWriter.save();
+ dictArrayWriter.save(); // advance to next array position
+
+ keyWriter.setDouble(3);
+ valueWriter.setString("c");
+ dictWriter.save();
+ dictArrayWriter.save();
+
+ writer.save(); // advance to next row
+
+ idWriter.setInt(2);
+
+ keyWriter.setDouble(11);
+ valueWriter.setString("d");
+ dictWriter.save();
+ keyWriter.setDouble(12);
+ valueWriter.setString("e");
+ dictWriter.save();
+ dictArrayWriter.save();
+
+ writer.save();
+
+ idWriter.setInt(3);
+
+ keyWriter.setDouble(21);
+ valueWriter.setString("f");
+ dictWriter.save();
+ keyWriter.setDouble(22);
+ valueWriter.setString("g");
+ dictWriter.save();
+ keyWriter.setDouble(23);
+ valueWriter.setString("h");
+ dictWriter.save();
+ dictArrayWriter.save();
+
+ keyWriter.setDouble(24);
+ valueWriter.setString("i");
+ dictWriter.save();
+ keyWriter.setDouble(25);
+ valueWriter.setString("j");
+ dictWriter.save();
+ keyWriter.setDouble(26.5);
+ valueWriter.setString("k");
+ dictWriter.save();
+ keyWriter.setDouble(27);
+ valueWriter.setString("l");
+ dictWriter.save();
+ keyWriter.setDouble(28);
+ valueWriter.setString("m");
+ dictWriter.save();
+ dictArrayWriter.save();
+
+ writer.save();
+
+ // Finish the row set and get a reader.
+
+ final SingleRowSet actual = writer.done();
+ final RowSetReader reader = actual.reader();
+
+ // Verify reader structure
+
+ assertEquals(ObjectType.ARRAY, reader.column(dictName).type());
+
+ final ArrayReader dictArrayReader = reader.array(1);
+ assertEquals(ObjectType.ARRAY, dictArrayReader.entryType());
+
+ final DictReader dictReader = dictArrayReader.entry().dict();
+ assertEquals(ValueType.DOUBLE, dictReader.keyColumnType());
+ assertEquals(ObjectType.SCALAR, dictReader.valueColumnType());
+
+ final KeyAccessor keyAccessor = dictReader.keyAccessor();
+ final ScalarReader valueReader = dictReader.valueReader().scalar();
+
+ // Row 1
+
+ assertTrue(reader.next());
+ assertFalse(dictArrayReader.isNull()); // array is not null
+
+ assertTrue(dictArrayReader.next());
+ assertFalse(dictArrayReader.isNull()); // first dict is not null
+
+ assertTrue(keyAccessor.find(2.0f));
+ assertEquals("b", valueReader.getObject());
+ assertTrue(keyAccessor.find(1.0f));
+ assertEquals("a", valueReader.getObject());
+ assertFalse(keyAccessor.find(1.1f)); // no entry for given key
+
+ assertTrue(dictArrayReader.next());
+
+ assertTrue(keyAccessor.find(3.0f));
+ assertEquals("c", valueReader.getObject());
+ assertFalse(keyAccessor.find(1.0f));
+
+ assertEquals(Arrays.asList(map(1.0, "a", 2.0, "b"), map(3.0, "c")), dictArrayReader.getObject());
+
+ // Row 2
+
+ assertTrue(reader.next());
+
+ assertTrue(dictArrayReader.next());
+
+ assertTrue(keyAccessor.find(11.0f));
+ assertEquals("d", valueReader.getString());
+ assertFalse(keyAccessor.find(1.0f));
+ assertTrue(keyAccessor.find(12.0f));
+ assertEquals("e", valueReader.getString());
+
+ // Row 3: use explicit positioning
+
+ assertTrue(reader.next());
+ dictArrayReader.setPosn(1);
+ assertTrue(keyAccessor.find(24.0f));
+ assertEquals("i", valueReader.getString());
+ assertTrue(keyAccessor.find(26.5f));
+ assertEquals("k", valueReader.getString());
+ assertTrue(keyAccessor.find(28.0f));
+ assertEquals("m", valueReader.getString());
+ assertFalse(keyAccessor.find(35.0f));
+ assertTrue(keyAccessor.find(27.0f));
+ assertEquals("l", valueReader.getString());
+
+ Map<Object, Object> element1 = map(24.0, "i", 25.0, "j", 26.5, "k", 27.0, "l", 28.0, "m");
+ assertEquals(element1, dictReader.getObject());
+
+ dictArrayReader.setPosn(0);
+ assertTrue(keyAccessor.find(23.0f));
+ assertEquals("h", valueReader.getObject());
+ assertTrue(keyAccessor.find(21.0f));
+ assertEquals("f", valueReader.getObject());
+ assertFalse(keyAccessor.find(23.05f));
+
+ Map<Object, Object> element0 = map(21.0, "f", 22.0, "g", 23.0, "h");
+ assertEquals(element0, dictReader.getObject());
+
+ assertEquals(Arrays.asList(element0, element1), dictArrayReader.getObject());
+
+ assertFalse(reader.next());
+
+ // Verify that the dict accessor's value count was set.
+
+ final RepeatedDictVector vector = (RepeatedDictVector) actual.container().getValueVector(1).getValueVector();
+ assertEquals(3, vector.getAccessor().getValueCount());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(schema)
+ .addRow(1, objArray(map(1.0f, "a", 2.0f, "b"), map(3.0f, "c")))
+ .addRow(2, objArray(singleObjArray(map(11.0f, "d", 12.0f, "e"))))
+ .addRow(3, objArray(
+ map(21.0f, "f", 22.0f, "g", 23.0f, "h"),
+ map(24.0f, "i", 25.0f, "j", 26.5f, "k", 27.0f, "l", 28.0f, "m")))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
/**
* Test an array of ints (as an example fixed-width type)
* at the top level of a schema.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
index 673050a..1996d05 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestSchemaBuilder.java
@@ -135,7 +135,7 @@
assertTrue(m.isMap());
assertEquals(DataMode.REQUIRED, m.mode());
- TupleMetadata mapSchema = m.mapSchema();
+ TupleMetadata mapSchema = m.tupleSchema();
assertNotNull(mapSchema);
assertEquals(4, mapSchema.size());
@@ -281,6 +281,40 @@
}
/**
+ * Tests creating a dict within a row.
+ * Also the basic dict add key and value columns methods.
+ */
+ @Test
+ public void testDictInRow() {
+ TupleMetadata schema = new SchemaBuilder()
+ .addDict("d", MinorType.VARCHAR)
+ .nullableValue(MinorType.FLOAT8)
+ .resumeSchema()
+ .buildSchema();
+
+ assertEquals(1, schema.size());
+
+ ColumnMetadata d = schema.metadata(0);
+ assertEquals("d", d.name());
+ assertTrue(d.isDict());
+ assertEquals(DataMode.REQUIRED, d.mode());
+
+ TupleMetadata dictSchema = d.tupleSchema();
+ assertNotNull(dictSchema);
+ assertEquals(2, dictSchema.size());
+
+ ColumnMetadata keyMetadata = dictSchema.metadata(0);
+ assertEquals("key", keyMetadata.name());
+ assertEquals(MinorType.VARCHAR, keyMetadata.type());
+ assertEquals(DataMode.REQUIRED, keyMetadata.mode());
+
+ ColumnMetadata valueMetadata = dictSchema.metadata(1);
+ assertEquals("value", valueMetadata.name());
+ assertEquals(MinorType.FLOAT8, valueMetadata.type());
+ assertEquals(DataMode.OPTIONAL, valueMetadata.mode());
+ }
+
+ /**
* Test methods to provide a width (precision) for VarChar
* columns. The schema builder does not provide shortcuts for
* VarChar in lists, unions or repeated lists because these
@@ -304,7 +338,7 @@
assertEquals(21, schema.metadata("a").precision());
assertEquals(22, schema.metadata("b").precision());
- TupleMetadata mapSchema = schema.metadata("m").mapSchema();
+ TupleMetadata mapSchema = schema.metadata("m").tupleSchema();
assertEquals(23, mapSchema.metadata("c").precision());
assertEquals(24, mapSchema.metadata("d").precision());
}
@@ -343,7 +377,7 @@
assertEquals(7, c.precision());
assertEquals(4, c.scale());
- ColumnMetadata d = schema.metadata("m").mapSchema().metadata("d");
+ ColumnMetadata d = schema.metadata("m").tupleSchema().metadata("d");
assertEquals(DataMode.OPTIONAL, d.mode());
assertEquals(8, d.precision());
assertEquals(1, d.scale());
@@ -395,13 +429,13 @@
assertEquals(38, g.precision());
assertEquals(4, g.scale());
- ColumnMetadata d = schema.metadata("m").mapSchema().metadata("d");
+ ColumnMetadata d = schema.metadata("m").tupleSchema().metadata("d");
assertEquals(MinorType.VARDECIMAL, d.type());
assertEquals(DataMode.OPTIONAL, d.mode());
assertEquals(8, d.precision());
assertEquals(1, d.scale());
- ColumnMetadata f = schema.metadata("m").mapSchema().metadata("f");
+ ColumnMetadata f = schema.metadata("m").tupleSchema().metadata("f");
assertEquals(MinorType.VARDECIMAL, f.type());
assertEquals(DataMode.REQUIRED, f.mode());
assertEquals(38, f.precision());
@@ -463,8 +497,8 @@
.resumeSchema()
.buildSchema();
- TupleMetadata m1Schema = schema.metadata("m1").mapSchema();
- TupleMetadata m2Schema = m1Schema.metadata("m2").mapSchema();
+ TupleMetadata m1Schema = schema.metadata("m1").tupleSchema();
+ TupleMetadata m2Schema = m1Schema.metadata("m2").tupleSchema();
ColumnMetadata a = m2Schema.metadata(0);
assertEquals("a", a.name());
@@ -490,7 +524,7 @@
.resumeSchema()
.buildSchema();
- TupleMetadata m1Schema = schema.metadata("m1").mapSchema();
+ TupleMetadata m1Schema = schema.metadata("m1").tupleSchema();
VariantMetadata uSchema = m1Schema.metadata("u").variantSchema();
assertTrue(uSchema.hasType(MinorType.INT));
@@ -516,7 +550,7 @@
.resumeSchema()
.buildSchema();
- TupleMetadata m1Schema = schema.metadata("m1").mapSchema();
+ TupleMetadata m1Schema = schema.metadata("m1").tupleSchema();
ColumnMetadata r = m1Schema.metadata(0);
assertEquals("r", r.name());
@@ -550,7 +584,7 @@
ColumnMetadata mapType = variant.member(MinorType.MAP);
assertNotNull(mapType);
- TupleMetadata mapSchema = mapType.mapSchema();
+ TupleMetadata mapSchema = mapType.tupleSchema();
assertEquals(2, mapSchema.size());
assertTrue(variant.hasType(MinorType.FLOAT8));
@@ -626,7 +660,7 @@
ColumnMetadata list = schema.metadata("x");
ColumnMetadata mapCol = list.childSchema();
assertTrue(mapCol.isMap());
- TupleMetadata mapSchema = mapCol.mapSchema();
+ TupleMetadata mapSchema = mapCol.tupleSchema();
ColumnMetadata a = mapSchema.metadata("a");
assertEquals(MinorType.INT, a.type());
@@ -715,7 +749,7 @@
assertTrue(columnMetadata.isNullable());
assertEquals("m1", columnMetadata.name());
- TupleMetadata schema = columnMetadata.mapSchema();
+ TupleMetadata schema = columnMetadata.tupleSchema();
ColumnMetadata col0 = schema.metadata(0);
assertEquals("b", col0.name());
@@ -727,7 +761,7 @@
assertTrue(col1.isMap());
assertFalse(col1.isNullable());
- ColumnMetadata child = col1.mapSchema().metadata(0);
+ ColumnMetadata child = col1.tupleSchema().metadata(0);
assertEquals("v", child.name());
assertEquals(MinorType.VARCHAR, child.type());
assertTrue(child.isNullable());
@@ -751,7 +785,7 @@
assertTrue(child.isArray());
assertTrue(child.isMap());
- TupleMetadata mapSchema = child.mapSchema();
+ TupleMetadata mapSchema = child.tupleSchema();
ColumnMetadata col0 = mapSchema.metadata(0);
assertEquals("v", col0.name());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
index 6502d83..d6237e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/TestTupleSchema.java
@@ -67,7 +67,7 @@
// Generic checks
assertEquals(ColumnMetadata.StructureType.PRIMITIVE, col.structureType());
- assertNull(col.mapSchema());
+ assertNull(col.tupleSchema());
assertTrue(field.isEquivalent(col.schema()));
assertEquals(field.getName(), col.name());
assertEquals(field.getType().getMinorType(), col.type());
@@ -160,7 +160,7 @@
ColumnMetadata col = MetadataUtils.fromField(field);
assertEquals(ColumnMetadata.StructureType.PRIMITIVE, col.structureType());
- assertNull(col.mapSchema());
+ assertNull(col.tupleSchema());
assertFalse(col.isNullable());
assertFalse(col.isArray());
assertTrue(col.isVariableWidth());
@@ -271,9 +271,9 @@
ColumnMetadata col = MetadataUtils.fromField(field);
assertTrue(col instanceof MapColumnMetadata);
- assertNotNull(col.mapSchema());
- assertEquals(0, col.mapSchema().size());
- assertSame(col, col.mapSchema().parent());
+ assertNotNull(col.tupleSchema());
+ assertEquals(0, col.tupleSchema().size());
+ assertSame(col, col.tupleSchema().parent());
MapColumnMetadata mapCol = (MapColumnMetadata) col;
assertNull(mapCol.parentTuple());
@@ -301,8 +301,8 @@
ColumnMetadata col = MetadataUtils.fromField(field);
assertTrue(col instanceof MapColumnMetadata);
- assertNotNull(col.mapSchema());
- assertEquals(0, col.mapSchema().size());
+ assertNotNull(col.tupleSchema());
+ assertEquals(0, col.tupleSchema().size());
assertFalse(col.isNullable());
assertTrue(col.isArray());
@@ -548,15 +548,15 @@
MaterializedField fieldA = SchemaBuilder.columnSchema("a", MinorType.MAP, DataMode.REQUIRED);
ColumnMetadata colA = root.add(fieldA);
- TupleMetadata mapA = colA.mapSchema();
+ TupleMetadata mapA = colA.tupleSchema();
MaterializedField fieldB = SchemaBuilder.columnSchema("b.x", MinorType.MAP, DataMode.REQUIRED);
ColumnMetadata colB = mapA.add(fieldB);
- TupleMetadata mapB = colB.mapSchema();
+ TupleMetadata mapB = colB.tupleSchema();
MaterializedField fieldC = SchemaBuilder.columnSchema("c.y", MinorType.MAP, DataMode.REQUIRED);
ColumnMetadata colC = mapB.add(fieldC);
- TupleMetadata mapC = colC.mapSchema();
+ TupleMetadata mapC = colC.tupleSchema();
MaterializedField fieldD = SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED);
ColumnMetadata colD = mapC.add(fieldD);
@@ -602,7 +602,7 @@
// Copying should be deep.
TupleMetadata root2 = root.copy();
- assertEquals(2, root2.metadata(0).mapSchema().metadata(0).mapSchema().metadata(0).mapSchema().size());
+ assertEquals(2, root2.metadata(0).tupleSchema().metadata(0).tupleSchema().metadata(0).tupleSchema().size());
assert(root.isEquivalent(root2));
// Generate a materialized field and compare.
@@ -640,11 +640,11 @@
// Get the parts.
- TupleMetadata mapA = colA.mapSchema();
+ TupleMetadata mapA = colA.tupleSchema();
ColumnMetadata colB = mapA.metadata("b.x");
- TupleMetadata mapB = colB.mapSchema();
+ TupleMetadata mapB = colB.tupleSchema();
ColumnMetadata colC = mapB.metadata("c.y");
- TupleMetadata mapC = colC.mapSchema();
+ TupleMetadata mapC = colC.tupleSchema();
ColumnMetadata colD = mapC.metadata("d");
ColumnMetadata colE = mapC.metadata("e");
@@ -800,7 +800,7 @@
assertTrue(union.hasType(MinorType.LIST));
ColumnMetadata mapCol = union.member(MinorType.MAP);
- TupleMetadata mapSchema = mapCol.mapSchema();
+ TupleMetadata mapSchema = mapCol.tupleSchema();
assertEquals(2, mapSchema.size());
ColumnMetadata listCol = union.member(MinorType.LIST);
@@ -933,4 +933,58 @@
schema.removeProperty("missing_prop");
assertEquals(1, schema.properties().size());
}
+
+ @Test
+ public void testDictColumn() {
+ MaterializedField field = SchemaBuilder.columnSchema("d", MinorType.DICT, DataMode.REQUIRED);
+ ColumnMetadata col = MetadataUtils.fromField(field);
+
+ assertTrue(col instanceof DictColumnMetadata);
+ assertNotNull(col.tupleSchema());
+ assertEquals(0, col.tupleSchema().size());
+ assertSame(col, col.tupleSchema().parent());
+
+ DictColumnMetadata dictCol = (DictColumnMetadata) col;
+ assertNull(dictCol.parentTuple());
+
+ assertEquals(ColumnMetadata.StructureType.DICT, col.structureType());
+ assertFalse(col.isNullable());
+ assertFalse(col.isArray());
+ assertFalse(col.isVariableWidth());
+ assertTrue(col.isDict());
+ assertFalse(col.isVariant());
+
+ assertEquals(0, col.expectedWidth());
+ col.setExpectedWidth(10);
+ assertEquals(0, col.expectedWidth());
+
+ assertEquals(1, col.expectedElementCount());
+ col.setExpectedElementCount(2);
+ assertEquals(1, col.expectedElementCount());
+ }
+
+ @Test
+ public void testRepeatedDictColumn() {
+ MaterializedField field = SchemaBuilder.columnSchema("da", MinorType.DICT, DataMode.REPEATED);
+ ColumnMetadata col = MetadataUtils.fromField(field);
+
+ assertTrue(col instanceof DictColumnMetadata);
+ assertNotNull(col.tupleSchema());
+ assertEquals(0, col.tupleSchema().size());
+
+ assertFalse(col.isNullable());
+ assertTrue(col.isArray());
+ assertFalse(col.isVariableWidth());
+ assertTrue(col.isDict());
+ assertFalse(col.isVariant());
+
+ assertEquals(0, col.expectedWidth());
+ col.setExpectedWidth(10);
+ assertEquals(0, col.expectedWidth());
+
+ assertEquals(ColumnMetadata.DEFAULT_ARRAY_SIZE, col.expectedElementCount());
+
+ col.setExpectedElementCount(2);
+ assertEquals(2, col.expectedElementCount());
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
index c17e20c..5a634fd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -766,6 +766,23 @@
}
@Test
+ public void testMapSchemaArrayValueGetByKeyElementByIndex() throws Exception {
+ String sql = "select map_array_value['key1'][3] element from dfs.`%s`";
+
+ TestBuilder testBuilder = testBuilder()
+ .sqlQuery(sql, generateMapSchema().getFileName())
+ .unOrdered()
+ .baselineColumns("element");
+
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ double val = (double) (i + 1) * 3;
+ testBuilder.baselineValues(val);
+ }
+
+ testBuilder.go();
+ }
+
+ @Test
public void testMapSchemaValueInFilter() throws Exception {
String sql = "select map_field['key1'] val from dfs.`%s` where map_field['key1'] < %d";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index f5b9cd7..54e8252 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -642,6 +642,17 @@
}
@Test
+ public void testDictArrayTypeOf() throws Exception {
+ String query = "select typeof(map_array) as type from cp.`store/parquet/complex/map/parquet/000000_0.parquet` limit 1";
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("type")
+ .baselineValuesForSingleColumn("ARRAY<DICT<BIGINT,INT>>")
+ .go();
+ }
+
+ @Test
public void testDictTypeOf() throws Exception {
String query = "select typeof(map_array[0]) as type from cp.`store/parquet/complex/map/parquet/000000_0.parquet` limit 1";
testBuilder()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index b1c196b..4b6d35a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -21,6 +21,8 @@
import static org.junit.Assert.assertTrue;
import java.math.BigDecimal;
+import java.util.LinkedHashMap;
+import java.util.Map;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -262,4 +264,21 @@
public static BigDecimal dec(String value) {
return new BigDecimal(value);
}
+
+ /**
+ * Convenience method to bootstrap a map object given key-value sequence.
+ *
+ * @param entry key-value sequence
+ * @return map containing key-value pairs from passed sequence
+ */
+ public static Map<Object, Object> map(Object... entry) {
+ assert entry.length % 2 == 0 : "Array length should be even.";
+
+ // LinkedHashMap is chosen to preserve entry order
+ Map<Object, Object> map = new LinkedHashMap<>();
+ for (int i = 0; i < entry.length; i += 2) {
+ map.put(entry[i], entry[i + 1]);
+ }
+ return map;
+ }
}
diff --git a/exec/vector/src/main/codegen/templates/KeyAccessors.java b/exec/vector/src/main/codegen/templates/KeyAccessors.java
new file mode 100644
index 0000000..39b3b33
--- /dev/null
+++ b/exec/vector/src/main/codegen/templates/KeyAccessors.java
@@ -0,0 +1,106 @@
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/accessor/KeyAccessors.java" />
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * This class is generated using Freemarker and the ${.template_name} template.
+ */
+public class KeyAccessors {
+
+ <#list [
+ {"name": "Boolean", "javaType": "boolean"},
+ {"name": "Integer", "javaType": "int"},
+ {"name": "Long", "javaType": "long"},
+ {"name": "Double", "javaType": "double"},
+ {"name": "String", "javaType": "String"},
+ {"name": "Bytes", "javaType": "byte[]"},
+ {"name": "Decimal", "javaType": "BigDecimal"},
+ {"name": "Period", "javaType": "Period"},
+ {"name": "Date", "javaType": "LocalDate"},
+ {"name": "Time", "javaType": "LocalTime"},
+ {"name": "Timestamp", "javaType": "Instant"}
+ ] as valueType>
+
+ public static class ${valueType.name}KeyAccessor extends AbstractKeyAccessor {
+
+ public ${valueType.name}KeyAccessor(DictReader dictReader, ScalarReader keyReader) {
+ super(dictReader, keyReader);
+ }
+
+ @Override
+ public boolean find(${valueType.javaType} key) {
+ dictReader.rewind();
+ while (dictReader.next()) {
+ <#if valueType.name == "Bytes">
+ if (Arrays.equals(key, keyReader.getBytes())) {
+ <#elseif valueType.name == "Integer">
+ if (key == keyReader.getInt()) {
+ <#elseif valueType.name == "Boolean" || valueType.name == "Long" || valueType.name == "Double">
+ if (key == keyReader.get${valueType.name}()) {
+ <#else>
+ if (key.equals(keyReader.get${valueType.name}())) { // key should not be null so it is safe
+ </#if>
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+ </#list>
+
+ public static KeyAccessor getAccessor(DictReader dictReader, ScalarReader keyReader) {
+ switch (keyReader.valueType()) {
+ case BOOLEAN:
+ return new BooleanKeyAccessor(dictReader, keyReader);
+ case BYTES:
+ return new BytesKeyAccessor(dictReader, keyReader);
+ case DECIMAL:
+ return new DecimalKeyAccessor(dictReader, keyReader);
+ case DOUBLE:
+ return new DoubleKeyAccessor(dictReader, keyReader);
+ case INTEGER:
+ return new IntegerKeyAccessor(dictReader, keyReader);
+ case LONG:
+ return new LongKeyAccessor(dictReader, keyReader);
+ case PERIOD:
+ return new PeriodKeyAccessor(dictReader, keyReader);
+ case STRING:
+ return new StringKeyAccessor(dictReader, keyReader);
+ case DATE:
+ return new DateKeyAccessor(dictReader, keyReader);
+ case TIME:
+ return new TimeKeyAccessor(dictReader, keyReader);
+ case TIMESTAMP:
+ return new TimestampKeyAccessor(dictReader, keyReader);
+ default:
+ throw new IllegalStateException("Unexpected key type: " + keyReader.valueType());
+ }
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index 8d7f9d0..99bcdb3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -153,7 +153,7 @@
}
@Override
- public TupleMetadata mapSchema() { return null; }
+ public TupleMetadata tupleSchema() { return null; }
@Override
public VariantMetadata variantSchema() { return null; }
@@ -279,9 +279,9 @@
buf.append(", variant: ")
.append(variantSchema().toString());
}
- if (mapSchema() != null) {
+ if (tupleSchema() != null) {
buf.append(", schema: ")
- .append(mapSchema().toString());
+ .append(tupleSchema().toString());
}
if (hasProperties()) {
buf.append(", properties: ")
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
index 1ae0007..6f79336 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractMapColumnMetadata.java
@@ -84,7 +84,7 @@
}
@Override
- public TupleMetadata mapSchema() {
+ public TupleMetadata tupleSchema() {
return schema;
}
@@ -123,7 +123,7 @@
}
builder.append(getStringType())
.append("<").append(
- mapSchema().toMetadataList().stream()
+ tupleSchema().toMetadataList().stream()
.map(ColumnMetadata::columnString)
.collect(Collectors.joining(", "))
)
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
index c8bc0fd..a56193e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/ColumnMetadata.java
@@ -118,7 +118,12 @@
* the code has evolved.
*/
- MULTI_ARRAY
+ MULTI_ARRAY,
+
+ /**
+ * Dict or repeated dict.
+ */
+ DICT
}
int DEFAULT_ARRAY_SIZE = 10;
@@ -131,7 +136,7 @@
* @return the tuple schema
*/
- TupleMetadata mapSchema();
+ TupleMetadata tupleSchema();
/**
* Schema for <tt>VARIANT</tt> columns.
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictBuilder.java
new file mode 100644
index 0000000..f5af894
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictBuilder.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.complex.DictVector;
+
+/**
+ * Internal structure for building a dict. Dict is an array of key-value pairs
+ * with defined types for key and value (key and value fields are defined within {@link TupleSchema}).
+ * Key can be {@link org.apache.drill.common.types.TypeProtos.DataMode#REQUIRED} primitive,
+ * while value can be primitive or complex.
+ * <p>Column is added to the parent container during creation
+ * and all <tt>resumeXXX</tt> methods return qualified parent container.</p>
+ *
+ * @see DictVector
+ */
+public class DictBuilder implements SchemaContainer {
+
+  /**
+   * Schema containing key and value fields' definition.
+   */
+  private final TupleSchema schema = new TupleSchema();
+  private final SchemaContainer parent;
+  private final String name;
+  private final TypeProtos.DataMode mode;
+
+  public DictBuilder(SchemaContainer parent, String name, TypeProtos.DataMode mode) {
+    this.parent = parent;
+    this.name = name;
+    this.mode = mode;
+  }
+
+  @Override
+  public void addColumn(ColumnMetadata column) {
+    // As dict does not support complex key, this method is used to
+    // nest a complex value only.
+    if (!DictVector.FIELD_VALUE_NAME.equals(column.name())) {
+      String message = String.format(
+          "Expected column with name '%s'. Found: '%s'.", DictVector.FIELD_VALUE_NAME, column.name());
+      throw new IllegalArgumentException(message);
+    }
+
+    if (isFieldSet(column.name())) {
+      String message = String.format("Field '%s' is already defined in dict.", column.name());
+      throw new IllegalArgumentException(message);
+    }
+
+    schema.addColumn(column);
+  }
+
+  /**
+   * Defines the key field with the given minor type and
+   * {@link org.apache.drill.common.types.TypeProtos.DataMode#REQUIRED} mode.
+   *
+   * @param type desired type for key
+   * @return {@code this} builder
+   */
+  public DictBuilder key(TypeProtos.MinorType type) {
+    TypeProtos.MajorType keyType = Types.withMode(type, TypeProtos.DataMode.REQUIRED);
+    return key(keyType);
+  }
+
+  /**
+   * Use this method to set types with width or scale and precision,
+   * e.g. {@link org.apache.drill.common.types.TypeProtos.MinorType#VARDECIMAL} with scale and precision or
+   * {@link org.apache.drill.common.types.TypeProtos.MinorType#VARCHAR} etc.
+   *
+   * @param type desired type for key
+   * @return {@code this} builder
+   * @throws IllegalStateException if key field is already set
+   * @throws IllegalArgumentException if {@code type} is not supported (either complex or nullable)
+   */
+  public DictBuilder key(TypeProtos.MajorType type) {
+    final String fieldName = DictVector.FIELD_KEY_NAME;
+    if (isFieldSet(fieldName)) {
+      throw new IllegalStateException(String.format("Field '%s' is already defined.", fieldName));
+    }
+
+    if (!isSupportedKeyType(type)) {
+      throw new IllegalArgumentException(
+          String.format("'%s' in dict should be non-nullable primitive. Found: %s", fieldName, type));
+    }
+
+    addField(fieldName, type);
+    return this;
+  }
+
+  /**
+   * Checks if the field identified by name was already set.
+   *
+   * @param name name of the field
+   * @return {@code true} if the schema contains field with the {@code name}; {@code false} otherwise.
+   */
+  private boolean isFieldSet(String name) {
+    return schema.index(name) != -1;
+  }
+
+  /** Defines a {@code REQUIRED} primitive value field. */
+  public DictBuilder value(TypeProtos.MinorType type) {
+    return value(type, TypeProtos.DataMode.REQUIRED);
+  }
+
+  /** Defines an {@code OPTIONAL} primitive value field. */
+  public DictBuilder nullableValue(TypeProtos.MinorType type) {
+    return value(type, TypeProtos.DataMode.OPTIONAL);
+  }
+
+  /** Defines a {@code REPEATED} primitive value field. */
+  public DictBuilder repeatedValue(TypeProtos.MinorType type) {
+    return value(type, TypeProtos.DataMode.REPEATED);
+  }
+
+  private DictBuilder value(TypeProtos.MinorType type, TypeProtos.DataMode mode) {
+    TypeProtos.MajorType valueType = Types.withMode(type, mode);
+    return value(valueType);
+  }
+
+  /**
+   * Define non-complex value type. For complex types use {@link #mapValue()}, {@link #mapArrayValue()} etc.
+   *
+   * @param type desired non-complex type for value.
+   * @return {@code this} builder
+   * @throws IllegalStateException if value is already set
+   * @throws IllegalArgumentException if {@code type} is either {@code MAP},
+   *         {@code LIST}, {@code DICT} or {@code UNION}.
+   * @see #mapValue() method to define value as {@code MAP}
+   * @see #mapArrayValue() method to define value as {@code REPEATED MAP}
+   * @see #listValue() method to define value as {@code LIST}
+   * @see #unionValue() method to define value as {@code UNION}
+   * @see #dictValue() method to define value as {@code DICT}
+   * @see #dictArrayValue() method to define value as {@code REPEATED DICT}
+   */
+  public DictBuilder value(TypeProtos.MajorType type) {
+    final String fieldName = DictVector.FIELD_VALUE_NAME;
+    if (isFieldSet(fieldName)) {
+      throw new IllegalStateException(String.format("Field '%s' is already defined.", fieldName));
+    }
+
+    if (Types.isComplex(type) || Types.isUnion(type)) {
+      // Arguments ordered to match the placeholders: the offending type first,
+      // then the quoted field name being defined.
+      String msg = String.format("Complex type found %s when defining '%s'. " +
+          "Use mapValue(), listValue() etc. in case of complex value type.", type, fieldName);
+      throw new IllegalArgumentException(msg);
+    }
+
+    addField(fieldName, type);
+    return this;
+  }
+
+  /**
+   * Adds field (either key or value) after validation to the schema.
+   * <p>Repeated fields are created from minor type and mode only; scale, precision
+   * and width are not carried over for them (this matches the original behavior,
+   * which discarded the configured builder on the repeated path).</p>
+   *
+   * @param name name of the field
+   * @param type type of the field
+   */
+  private void addField(String name, TypeProtos.MajorType type) {
+    if (Types.isRepeated(type)) {
+      schema.add(SchemaBuilder.columnSchema(name, type.getMinorType(), type.getMode()));
+      return;
+    }
+
+    ColumnBuilder builder = new ColumnBuilder(name, type.getMinorType())
+        .setMode(type.getMode());
+
+    if (type.hasScale()) {
+      builder.setPrecisionAndScale(type.getPrecision(), type.getScale());
+    } else if (type.hasPrecision()) {
+      builder.setPrecision(type.getPrecision());
+    }
+    if (type.hasWidth()) {
+      builder.setWidth(type.getWidth());
+    }
+
+    schema.add(builder.build());
+  }
+
+  /** Defines value as a {@code REQUIRED} map; nested into this dict on resume. */
+  public MapBuilder mapValue() {
+    return new MapBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.DataMode.REQUIRED);
+  }
+
+  /** Defines value as a {@code REPEATED} map. */
+  public MapBuilder mapArrayValue() {
+    return new MapBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.DataMode.REPEATED);
+  }
+
+  /** Defines value as a nested {@code DICT}. */
+  public DictBuilder dictValue() {
+    return new DictBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.DataMode.REQUIRED);
+  }
+
+  /** Defines value as a nested {@code REPEATED DICT}. */
+  public DictBuilder dictArrayValue() {
+    return new DictBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.DataMode.REPEATED);
+  }
+
+  /** Defines value as a {@code UNION}. */
+  public UnionBuilder unionValue() {
+    return new UnionBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.MinorType.UNION);
+  }
+
+  /** Defines value as a {@code LIST}. */
+  public UnionBuilder listValue() {
+    return new UnionBuilder(this, DictVector.FIELD_VALUE_NAME, TypeProtos.MinorType.LIST);
+  }
+
+  /** Defines value as a repeated list. */
+  public RepeatedListBuilder repeatedListValue() {
+    return new RepeatedListBuilder(this, DictVector.FIELD_VALUE_NAME);
+  }
+
+  /**
+   * Builds the dict column metadata from the accumulated schema.
+   *
+   * @return the dict column metadata
+   * @throws IllegalStateException if the key or the value field is missing
+   */
+  public DictColumnMetadata buildColumn() {
+    validateKeyValuePresent();
+    return new DictColumnMetadata(name, mode, schema);
+  }
+
+  // Both key and value fields must be defined before the column can be built.
+  private void validateKeyValuePresent() {
+    for (String fieldName : DictVector.fieldNames) {
+      ColumnMetadata columnMetadata = schema.metadata(fieldName);
+      if (columnMetadata == null) {
+        throw new IllegalStateException(String.format("Field %s is absent in DICT.", fieldName));
+      }
+    }
+  }
+
+  /** Adds the built column to the parent container, if any. */
+  public void build() {
+    if (parent != null) {
+      parent.addColumn(buildColumn());
+    }
+  }
+
+  public SchemaBuilder resumeSchema() {
+    build();
+    return (SchemaBuilder) parent;
+  }
+
+  public MapBuilder resumeMap() {
+    build();
+    return (MapBuilder) parent;
+  }
+
+  public RepeatedListBuilder resumeList() {
+    build();
+    return (RepeatedListBuilder) parent;
+  }
+
+  public UnionBuilder resumeUnion() {
+    build();
+    return (UnionBuilder) parent;
+  }
+
+  public DictBuilder resumeDict() {
+    build();
+    return (DictBuilder) parent;
+  }
+
+  // Dict keys must be non-nullable primitives: no MAP/LIST/DICT/UNION and
+  // REQUIRED mode only.
+  private boolean isSupportedKeyType(TypeProtos.MajorType type) {
+    return !Types.isComplex(type)
+        && !Types.isUnion(type)
+        && type.getMode() == TypeProtos.DataMode.REQUIRED;
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
index 967792e..50957e8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/DictColumnMetadata.java
@@ -36,10 +36,10 @@
* the children) of the materialized field provided.
*
* @param schema the schema to use
- * @param mapSchema parent schema
+ * @param tupleSchema parent schema
*/
- DictColumnMetadata(MaterializedField schema, TupleSchema mapSchema) {
- super(schema, mapSchema);
+ DictColumnMetadata(MaterializedField schema, TupleSchema tupleSchema) {
+ super(schema, tupleSchema);
}
public DictColumnMetadata(DictColumnMetadata from) {
@@ -69,4 +69,9 @@
protected String getStringType() {
return "MAP";
}
+
+ @Override
+ public StructureType structureType() {
+ return StructureType.DICT;
+ }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
index 6c8a2fe..6f10bb0 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MapBuilder.java
@@ -144,6 +144,22 @@
return tupleBuilder.addRepeatedList(this, name);
}
+ public DictBuilder addDict(String name, MinorType keyType) {
+ return tupleBuilder.addDict(this, name).key(keyType);
+ }
+
+ public DictBuilder addDict(String name, MajorType keyType) {
+ return tupleBuilder.addDict(this, name).key(keyType);
+ }
+
+ public DictBuilder addDictArray(String name, MinorType keyType) {
+ return tupleBuilder.addDictArray(this, name).key(keyType);
+ }
+
+ public DictBuilder addDictArray(String name, MajorType keyType) {
+ return tupleBuilder.addDictArray(this, name).key(keyType);
+ }
+
public MapColumnMetadata buildColumn() {
return new MapColumnMetadata(memberName, mode, tupleBuilder.schema());
}
@@ -176,4 +192,9 @@
build();
return (UnionBuilder) parent;
}
+
+ public DictBuilder resumeDict() {
+ build();
+ return (DictBuilder) parent;
+ }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
index 43b38c6..05936a3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
@@ -17,13 +17,16 @@
*/
package org.apache.drill.exec.record.metadata;
+import java.util.Collection;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.complex.DictVector;
public class MetadataUtils {
@@ -38,7 +41,7 @@
/**
* Create a column metadata object that holds the given
* {@link MaterializedField}. The type of the object will be either a
- * primitive or map column, depending on the field's type. The logic
+ * primitive, map or dict column, depending on the field's type. The logic
* here mimics the code as written, which is very messy in some places.
*
* @param field the materialized field to wrap
@@ -90,6 +93,8 @@
public static ColumnMetadata fromView(MaterializedField field) {
if (field.getType().getMinorType() == MinorType.MAP) {
return new MapColumnMetadata(field, null);
+ } else if (field.getType().getMinorType() == MinorType.DICT) {
+ return newDict(field);
} else {
return new PrimitiveColumnMetadata(field);
}
@@ -138,8 +143,25 @@
return new DictColumnMetadata(field, fromFields(field.getChildren()));
}
+ /**
+ * Creates DICT column metadata for the given field using the provided entry schema,
+ * after validating that the schema declares both the key and value children
+ * ({@code DictVector.FIELD_KEY_NAME} / {@code DictVector.FIELD_VALUE_NAME}).
+ *
+ * @param field materialized field supplying the column name and data mode
+ * @param schema tuple schema holding the dict's key and value columns
+ * @return the dict column metadata
+ * @throws IllegalStateException if either the key or the value field is absent
+ */
+ public static DictColumnMetadata newDict(MaterializedField field, TupleSchema schema) {
+ validateDictChildren(schema.toFieldList());
+ return new DictColumnMetadata(field.getName(), field.getDataMode(), schema);
+ }
+
+ private static void validateDictChildren(List<MaterializedField> entryFields) {
+ Collection<String> children = entryFields.stream()
+ .map(MaterializedField::getName)
+ .collect(Collectors.toList());
+ String message = "DICT does not contain %s.";
+ if (!children.contains(DictVector.FIELD_KEY_NAME)) {
+ throw new IllegalStateException(String.format(message, DictVector.FIELD_KEY_NAME));
+ } else if (!children.contains(DictVector.FIELD_VALUE_NAME)) {
+ throw new IllegalStateException(String.format(message, DictVector.FIELD_VALUE_NAME));
+ }
+ }
+
public static DictColumnMetadata newDict(String name, TupleMetadata schema) {
- return new DictColumnMetadata(name, DataMode.OPTIONAL, (TupleSchema) schema);
+ return new DictColumnMetadata(name, DataMode.REQUIRED, (TupleSchema) schema);
}
public static VariantColumnMetadata newVariant(MaterializedField field, VariantSchema schema) {
@@ -167,13 +189,13 @@
public static PrimitiveColumnMetadata newScalar(String name, MinorType type,
DataMode mode) {
- assert type != MinorType.MAP && type != MinorType.UNION && type != MinorType.LIST;
+ assert isScalar(type);
return new PrimitiveColumnMetadata(name, type, mode);
}
public static PrimitiveColumnMetadata newScalar(String name, MajorType type) {
MinorType minorType = type.getMinorType();
- assert minorType != MinorType.MAP && minorType != MinorType.UNION && minorType != MinorType.LIST;
+ assert isScalar(minorType);
return new PrimitiveColumnMetadata(name, type);
}
@@ -205,4 +227,10 @@
return new PrimitiveColumnMetadata(field);
}
+  /**
+   * Reports whether the given minor type is scalar, i.e. none of the
+   * complex/compound types (MAP, UNION, LIST, DICT).
+   */
+  private static boolean isScalar(MinorType type) {
+    return type != MinorType.MAP
+        && type != MinorType.UNION
+        && type != MinorType.LIST
+        && type != MinorType.DICT;
+  }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
index c02da5e..895e170 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
@@ -60,6 +60,12 @@
return new MapBuilder(this, name, DataMode.REPEATED);
}
+ public DictBuilder addDictArray() {
+ // Existing code uses the repeated list name as the name of
+ // the vector within the list.
+ return new DictBuilder(this, name, DataMode.REPEATED);
+ }
+
public RepeatedListBuilder addArray(MinorType type) {
// Existing code uses the repeated list name as the name of
// the vector within the list.
@@ -127,6 +133,11 @@
return (MapBuilder) parent;
}
+ public DictBuilder resumeDict() {
+ build();
+ return (DictBuilder) parent;
+ }
+
@Override
public void addColumn(ColumnMetadata column) {
assert child == null;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
index 96eb863..264f3ad 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/SchemaBuilder.java
@@ -182,6 +182,22 @@
return tupleBuilder.addMapArray(this, name);
}
+ public DictBuilder addDict(String name, MinorType keyType) {
+ return tupleBuilder.addDict(this, name).key(keyType);
+ }
+
+ public DictBuilder addDict(String name, MajorType keyType) {
+ return tupleBuilder.addDict(this, name).key(keyType);
+ }
+
+ public DictBuilder addDictArray(String name, MinorType keyType) {
+ return tupleBuilder.addDictArray(this, name).key(keyType);
+ }
+
+ public DictBuilder addDictArray(String name, MajorType keyType) {
+ return tupleBuilder.addDictArray(this, name).key(keyType);
+ }
+
public UnionBuilder addUnion(String name) {
return tupleBuilder.addUnion(this, name);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleBuilder.java
index 7ce2607..33514d5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleBuilder.java
@@ -132,6 +132,14 @@
return new MapBuilder(parent, name, DataMode.REPEATED);
}
+ public DictBuilder addDict(SchemaContainer parent, String name) {
+ return new DictBuilder(parent, name, DataMode.REQUIRED);
+ }
+
+ public DictBuilder addDictArray(SchemaContainer parent, String name) {
+ return new DictBuilder(parent, name, DataMode.REPEATED);
+ }
+
public UnionBuilder addUnion(SchemaContainer parent, String name) {
return new UnionBuilder(parent, name, MinorType.UNION);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
index cb43c64..9650d5e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
@@ -66,8 +66,9 @@
this.parentMap = parentMap;
}
- public TupleMetadata copy() {
- TupleMetadata tuple = new TupleSchema();
+ @Override
+ public TupleSchema copy() {
+ TupleSchema tuple = new TupleSchema();
for (ColumnMetadata md : this) {
tuple.addColumn(md.copy());
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
index eee9b3a..d09c810 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/UnionBuilder.java
@@ -83,6 +83,11 @@
return new RepeatedListBuilder(this, Types.typeKey(MinorType.LIST));
}
+ /**
+ * Adds a DICT member to this union/list variant.
+ * <p>NOTE(review): the dict is created with {@code OPTIONAL} mode here, whereas
+ * top-level dicts are built {@code REQUIRED} and dict arrays {@code REPEATED} —
+ * presumably variant members must be optional; confirm against variant handling.</p>
+ */
+ public DictBuilder addDict() {
+ checkType(MinorType.DICT);
+ return new DictBuilder(this, Types.typeKey(MinorType.DICT), DataMode.OPTIONAL);
+ }
+
public VariantColumnMetadata buildColumn() {
return new VariantColumnMetadata(name, type, union);
}
@@ -107,4 +112,9 @@
build();
return (UnionBuilder) parent;
}
-}
\ No newline at end of file
+
+ public DictBuilder resumeDict() {
+ build();
+ return (DictBuilder) parent;
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/AbstractKeyAccessor.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/AbstractKeyAccessor.java
new file mode 100644
index 0000000..97731ba
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/AbstractKeyAccessor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.Period;
+
+import java.math.BigDecimal;
+
+/**
+ * Base implementation of {@link KeyAccessor}: every {@code find} overload throws
+ * {@link UnsupportedOperationException}. Concrete per-type accessors (selected by
+ * key value type) extend this class and override only the overload matching
+ * their key type. Declared {@code abstract} since instantiating it directly
+ * would yield an accessor that supports no key type at all; the protected
+ * constructor likewise restricts creation to subclasses.
+ */
+public abstract class AbstractKeyAccessor implements KeyAccessor {
+
+  private static final String UNSUPPORTED_MESSAGE_TEMPLATE = "%s does not support a key of type %s.";
+
+  protected final DictReader dictReader;
+  protected final ScalarReader keyReader;
+
+  protected AbstractKeyAccessor(DictReader dictReader, ScalarReader keyReader) {
+    this.dictReader = dictReader;
+    this.keyReader = keyReader;
+  }
+
+  @Override
+  public boolean find(boolean key) {
+    throw unsupported("boolean");
+  }
+
+  @Override
+  public boolean find(int key) {
+    throw unsupported("int");
+  }
+
+  @Override
+  public boolean find(BigDecimal key) {
+    throw unsupported("BigDecimal");
+  }
+
+  @Override
+  public boolean find(double key) {
+    throw unsupported("double");
+  }
+
+  @Override
+  public boolean find(long key) {
+    throw unsupported("long");
+  }
+
+  @Override
+  public boolean find(String key) {
+    throw unsupported("String");
+  }
+
+  @Override
+  public boolean find(byte[] key) {
+    throw unsupported("byte[]");
+  }
+
+  @Override
+  public boolean find(Period key) {
+    throw unsupported("Period");
+  }
+
+  @Override
+  public boolean find(LocalDate key) {
+    throw unsupported("LocalDate");
+  }
+
+  @Override
+  public boolean find(LocalTime key) {
+    throw unsupported("LocalTime");
+  }
+
+  @Override
+  public boolean find(Instant key) {
+    throw unsupported("Instant");
+  }
+
+  // Builds (but does not throw) the exception so call sites read as 'throw unsupported(...)'.
+  private UnsupportedOperationException unsupported(String type) {
+    return new UnsupportedOperationException(String.format(
+        UNSUPPORTED_MESSAGE_TEMPLATE, this.getClass().getName(), type));
+  }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
index df0c6ad..297e615 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ArrayWriter.java
@@ -54,7 +54,7 @@
* without having to first retrieve the variant (although the indirect
* route is, of course, available.)
*
- * @see {@link ArrayReader}
+ * @see ArrayReader
*/
public interface ArrayWriter extends ColumnWriter {
@@ -87,6 +87,7 @@
TupleWriter tuple();
ArrayWriter array();
VariantWriter variant();
+ DictWriter dict();
/**
* When the array contains a tuple or an array, call <tt>save()</tt>
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictReader.java
new file mode 100644
index 0000000..0c2fb0c
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+/**
+ * Reader for a DICT column: an array of key-value entries. Extends
+ * {@link ArrayReader} iteration with key-based lookup via {@link #keyAccessor()}.
+ */
+public interface DictReader extends ArrayReader {
+
+ /**
+ * Returns the accessor used to look up a given key among the dict's entries.
+ * @return the key accessor
+ */
+ KeyAccessor keyAccessor();
+
+ /**
+ * Returns the reader for the value of the current entry.
+ * @return the value reader
+ */
+ ObjectReader valueReader();
+
+ /**
+ * Number of entries in the dict.
+ * @return the number of entries
+ */
+ @Override
+ int size();
+
+ /**
+ * Scalar value type of the dict's key column.
+ * @return the key value type
+ */
+ ValueType keyColumnType();
+
+ /**
+ * Object type of the dict's value column.
+ * @return the value object type
+ */
+ ObjectType valueColumnType();
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictWriter.java
new file mode 100644
index 0000000..0f085df
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/DictWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+/**
+ * Physically the writer is an array writer with special tuple writer as its element.
+ * Its {@link #type()} returns {@code ARRAY} and {@link #entryType()} returns {@code TUPLE}.
+ * To know whether it is a Dict logically, one may use {@code schema().isDict()}.
+ *
+ * @see org.apache.drill.exec.vector.accessor.writer.DictEntryWriter
+ * @see org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter
+ * @see org.apache.drill.exec.vector.complex.DictVector
+ */
+public interface DictWriter extends ArrayWriter {
+
+ /**
+ * Returns the scalar value type of the key field.
+ * @return type of the key
+ */
+ ValueType keyType();
+
+ /**
+ * Returns the object type of the value field.
+ * @return type of the value
+ */
+ ObjectType valueType();
+
+ /**
+ * Returns the writer associated with the key field.
+ * @return key writer
+ */
+ ScalarWriter keyWriter();
+
+ /**
+ * Returns the writer associated with the value field.
+ * @return value writer
+ */
+ ObjectWriter valueWriter();
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/KeyAccessor.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/KeyAccessor.java
new file mode 100644
index 0000000..d3dbed7
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/KeyAccessor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.Period;
+
+import java.math.BigDecimal;
+
+/**
+ * Looks up a key among a dict's entries. One {@code find} overload exists per
+ * supported key type; an implementation scans the entries and returns
+ * {@code true} when a matching key is found (leaving the dict reader positioned
+ * at that entry). Overloads for key types an accessor does not support throw
+ * {@link UnsupportedOperationException} (see {@code AbstractKeyAccessor}).
+ */
+public interface KeyAccessor {
+
+ boolean find(boolean key);
+ boolean find(int key);
+ boolean find(BigDecimal key);
+ boolean find(double key);
+ boolean find(long key);
+ boolean find(String key);
+ boolean find(byte[] key);
+ boolean find(Period key);
+ boolean find(LocalDate key);
+ boolean find(LocalTime key);
+ boolean find(Instant key);
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
index c9244b5..aff4b5b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectReader.java
@@ -34,6 +34,7 @@
TupleReader tuple();
ArrayReader array();
VariantReader variant();
+ DictReader dict();
/**
* Gets the reader as a generic type, for dynamic
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
index 5d8077c..ed3a888 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
@@ -48,6 +48,7 @@
TupleWriter tuple();
ArrayWriter array();
VariantWriter variant();
+ DictWriter dict();
/**
* Generic version of the above, for dynamic handling of
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleReader.java
index 16e90ea..e6ac2ce 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleReader.java
@@ -66,4 +66,6 @@
ArrayReader array(String colName);
VariantReader variant(int colIndex);
VariantReader variant(String colName);
+ DictReader dict(int colIndex);
+ DictReader dict(String colName);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
index 2f5c5df..de2a374 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
@@ -113,6 +113,10 @@
VariantWriter variant(String colName);
+ DictWriter dict(int colIndex);
+
+ DictWriter dict(String colName);
+
ObjectType type(int colIndex);
ObjectType type(String colName);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
index 126fbe8..7a6e6e2 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractObjectReader.java
@@ -20,6 +20,7 @@
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.ArrayReader;
import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.DictReader;
import org.apache.drill.exec.vector.accessor.ObjectReader;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ScalarReader;
@@ -46,6 +47,11 @@
throw new UnsupportedOperationException();
}
+ @Override
+ public DictReader dict() {
+ throw new UnsupportedOperationException();
+ }
+
public abstract ReaderEvents events();
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
index 77fae8f..0c699cd 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
@@ -23,6 +23,7 @@
import org.apache.drill.exec.vector.accessor.ArrayReader;
import org.apache.drill.exec.vector.accessor.ColumnReader;
import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
+import org.apache.drill.exec.vector.accessor.DictReader;
import org.apache.drill.exec.vector.accessor.ObjectReader;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ScalarReader;
@@ -38,11 +39,12 @@
public static class TupleObjectReader extends AbstractObjectReader {
- private final AbstractTupleReader tupleReader;
+ protected final AbstractTupleReader tupleReader;
public TupleObjectReader(AbstractTupleReader tupleReader) {
this.tupleReader = tupleReader;
}
+
@Override
public TupleReader tuple() {
return tupleReader;
@@ -65,10 +67,10 @@
public ColumnReader reader() { return tupleReader; }
}
- private final AbstractObjectReader readers[];
+ protected final AbstractObjectReader[] readers;
protected NullStateReader nullStateReader;
- protected AbstractTupleReader(AbstractObjectReader readers[]) {
+ protected AbstractTupleReader(AbstractObjectReader[] readers) {
this.readers = readers;
}
@@ -112,7 +114,8 @@
public ObjectReader column(String colName) {
int index = tupleSchema().index(colName);
if (index == -1) {
- return null; }
+ return null;
+ }
return readers[index];
}
@@ -167,6 +170,16 @@
}
@Override
+ public DictReader dict(int colIndex) {
+ return column(colIndex).dict();
+ }
+
+ @Override
+ public DictReader dict(String colName) {
+ return column(colName).dict();
+ }
+
+ @Override
public void reposition() {
for (AbstractObjectReader reader : readers) {
reader.events().reposition();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
index fbafa8c..474b8d7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
@@ -130,7 +130,6 @@
* Given a 0-based index relative to the current array, return an absolute offset
* vector location for the array value.
*
- * @param index 0-based index into the current array
* @return absolute offset vector location for the array value
*/
@@ -169,12 +168,12 @@
public int logicalIndex() { return position; }
}
+ protected final AbstractObjectReader elementReader;
+ protected ElementReaderIndex elementIndex;
+ protected NullStateReader nullStateReader;
private final ColumnMetadata schema;
private final VectorAccessor arrayAccessor;
private final OffsetVectorReader offsetReader;
- private final AbstractObjectReader elementReader;
- protected ElementReaderIndex elementIndex;
- protected NullStateReader nullStateReader;
public ArrayReaderImpl(ColumnMetadata schema, VectorAccessor va,
AbstractObjectReader elementReader) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictEntryReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictEntryReader.java
new file mode 100644
index 0000000..de9f193
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictEntryReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.reader;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
+
+/**
+ * Reader for a Dict entry. The entry is a special kind of tuple.
+ */
+public class DictEntryReader extends AbstractTupleReader {
+
+ private final ColumnMetadata schema;
+ private final VectorAccessor accessor;
+
+ protected DictEntryReader(ColumnMetadata schema, VectorAccessor accessor,
+ AbstractObjectReader[] readers) {
+ super(readers);
+ assert readers.length == 2;
+ this.schema = schema;
+ this.accessor = accessor;
+ }
+
+ public static TupleObjectReader build(ColumnMetadata schema,
+ VectorAccessor accessor,
+ AbstractObjectReader[] readers) {
+ DictEntryReader entryReader = new DictEntryReader(schema, accessor, readers);
+ entryReader.bindNullState(NullStateReaders.REQUIRED_STATE_READER);
+ return new TupleObjectReader(entryReader);
+ }
+
+ public static TupleObjectReader build(ColumnMetadata schema,
+ VectorAccessor accessor,
+ List<AbstractObjectReader> readers) {
+ AbstractObjectReader[] readerArray = new AbstractObjectReader[readers.size()];
+ return build(schema, accessor, readers.toArray(readerArray));
+ }
+
+ public AbstractObjectReader keyReader() {
+ return (AbstractObjectReader) column(0);
+ }
+
+ public AbstractObjectReader valueReader() {
+ return (AbstractObjectReader) column(1);
+ }
+
+ @Override
+ public void bindIndex(ColumnReaderIndex index) {
+ if (accessor != null) {
+ accessor.bind(index);
+ }
+ super.bindIndex(index);
+ }
+
+ @Override
+ public ColumnMetadata schema() {
+ return schema;
+ }
+
+ @Override
+ public TupleMetadata tupleSchema() {
+ return schema.tupleSchema();
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictReaderImpl.java
new file mode 100644
index 0000000..780e243
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/DictReaderImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.reader;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.KeyAccessor;
+import org.apache.drill.exec.vector.accessor.KeyAccessors;
+import org.apache.drill.exec.vector.accessor.ObjectReader;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ScalarReader;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.DictReader;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DictReaderImpl extends ArrayReaderImpl implements DictReader, ReaderEvents {
+
+ public static class DictObjectReader extends ArrayObjectReader {
+
+ public DictObjectReader(DictReaderImpl dictReader) {
+ super(dictReader);
+ }
+
+ @Override
+ public DictReader dict() {
+ return (DictReader) array();
+ }
+ }
+
+ private final ScalarReader keyReader;
+ private final AbstractObjectReader valueReader;
+ private final KeyAccessor keyAccessor;
+
+ public DictReaderImpl(ColumnMetadata metadata, VectorAccessor va, AbstractTupleReader.TupleObjectReader entryObjectReader) {
+ super(metadata, va, entryObjectReader);
+ DictEntryReader reader = (DictEntryReader) entryObjectReader.reader();
+ this.keyReader = reader.keyReader().scalar();
+ this.valueReader = reader.valueReader();
+ keyAccessor = KeyAccessors.getAccessor(this, keyReader);
+ }
+
+ public static DictObjectReader build(ColumnMetadata schema, VectorAccessor dictAccessor,
+ List<AbstractObjectReader> readers) {
+ AbstractTupleReader.TupleObjectReader entryReader = DictEntryReader.build(schema, dictAccessor, readers);
+ DictReaderImpl dictReader = new DictReaderImpl(schema, dictAccessor, entryReader);
+ dictReader.bindNullState(NullStateReaders.REQUIRED_STATE_READER);
+ return new DictObjectReader(dictReader);
+ }
+
+ @Override
+ public KeyAccessor keyAccessor() {
+ return keyAccessor;
+ }
+
+ @Override
+ public ObjectReader valueReader() {
+ return valueReader;
+ }
+
+ @Override
+ public ValueType keyColumnType() {
+ return keyReader.valueType();
+ }
+
+ @Override
+ public ObjectType valueColumnType() {
+ return valueReader.type();
+ }
+
+ @Override
+ public Map<Object, Object> getObject() {
+ rewind();
+ Map<Object, Object> map = new HashMap<>();
+ while (next()) {
+ map.put(keyReader.getObject(), valueReader.getObject());
+ }
+ return map;
+ }
+
+ @Override
+ public String getAsString() {
+ rewind();
+ StringBuilder buf = new StringBuilder();
+ buf.append("{");
+ boolean comma = false;
+ while (next()) {
+ if (comma) {
+ buf.append(", ");
+ }
+ buf.append(keyReader.getAsString())
+ .append(':')
+ .append(valueReader.getAsString());
+ comma = true;
+ }
+ buf.append("}");
+ return buf.toString();
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/MapReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/MapReader.java
index 7329391..d1cf58f 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/MapReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/MapReader.java
@@ -40,12 +40,12 @@
private final VectorAccessor mapAccessor;
- protected MapReader(ColumnMetadata schema, AbstractObjectReader readers[]) {
+ protected MapReader(ColumnMetadata schema, AbstractObjectReader[] readers) {
this(schema, null, readers);
}
protected MapReader(ColumnMetadata schema,
- VectorAccessor mapAccessor, AbstractObjectReader readers[]) {
+ VectorAccessor mapAccessor, AbstractObjectReader[] readers) {
super(readers);
this.schema = schema;
this.mapAccessor = mapAccessor;
@@ -53,7 +53,7 @@
public static TupleObjectReader build(ColumnMetadata schema,
VectorAccessor mapAccessor,
- AbstractObjectReader readers[]) {
+ AbstractObjectReader[] readers) {
MapReader mapReader = new MapReader(schema, mapAccessor, readers);
mapReader.bindNullState(NullStateReaders.REQUIRED_STATE_READER);
return new TupleObjectReader(mapReader);
@@ -62,7 +62,7 @@
public static AbstractObjectReader build(ColumnMetadata schema,
VectorAccessor mapAccessor,
List<AbstractObjectReader> readers) {
- AbstractObjectReader readerArray[] = new AbstractObjectReader[readers.size()];
+ AbstractObjectReader[] readerArray = new AbstractObjectReader[readers.size()];
return build(schema, mapAccessor, readers.toArray(readerArray));
}
@@ -78,5 +78,5 @@
public ColumnMetadata schema() { return schema; }
@Override
- public TupleMetadata tupleSchema() { return schema.mapSchema(); }
+ public TupleMetadata tupleSchema() { return schema.tupleSchema(); }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
index f1ea09a..0385091 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
@@ -25,6 +25,7 @@
public class NullStateReaders {
public static final RequiredStateReader REQUIRED_STATE_READER = new RequiredStateReader();
+ public static final NullStateReader NULL_STATE_READER = new AlwaysNullStateReader();
private NullStateReaders() { }
@@ -190,4 +191,15 @@
}
}
+ protected static class AlwaysNullStateReader implements NullStateReader {
+
+ @Override
+ public void bindIndex(ColumnReaderIndex rowIndex) {
+ }
+
+ @Override
+ public boolean isNull() {
+ return true;
+ }
+ }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
index 1dfc65a..a1720ba 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
@@ -81,10 +81,10 @@
private final VectorAccessor unionAccessor;
private final VectorAccessor typeAccessor;
private final UInt1ColumnReader typeReader;
- private final AbstractObjectReader variants[];
+ private final AbstractObjectReader[] variants;
protected NullStateReader nullStateReader;
- public UnionReaderImpl(ColumnMetadata schema, VectorAccessor va, AbstractObjectReader variants[]) {
+ public UnionReaderImpl(ColumnMetadata schema, VectorAccessor va, AbstractObjectReader[] variants) {
this.schema = schema;
this.unionAccessor = va;
typeReader = new UInt1ColumnReader();
@@ -115,6 +115,7 @@
NullStateReader nullReader;
MinorType type = MinorType.values()[i];
switch(type) {
+ case DICT:
case MAP:
case LIST:
nullReader = new NullStateReaders.ComplexMemberStateReader(typeReader, type);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index 18702ac..b836b25 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -24,6 +24,7 @@
import org.apache.drill.exec.vector.accessor.ColumnReader;
import org.apache.drill.exec.vector.accessor.ColumnWriter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.DictWriter;
import org.apache.drill.exec.vector.accessor.ObjectReader;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
@@ -99,7 +100,7 @@
public static class ArrayObjectWriter extends AbstractObjectWriter {
- private final AbstractArrayWriter arrayWriter;
+ protected final AbstractArrayWriter arrayWriter;
public ArrayObjectWriter(AbstractArrayWriter arrayWriter) {
this.arrayWriter = arrayWriter;
@@ -319,6 +320,11 @@
}
@Override
+ public DictWriter dict() {
+ return elementObjWriter.dict();
+ }
+
+ @Override
public int size() { return elementIndex.arraySize(); }
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
index d2b65ff..58ba6c2 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
@@ -21,6 +21,7 @@
import org.apache.drill.exec.vector.accessor.ArrayWriter;
import org.apache.drill.exec.vector.accessor.ColumnReader;
import org.apache.drill.exec.vector.accessor.ObjectReader;
+import org.apache.drill.exec.vector.accessor.DictWriter;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
@@ -61,6 +62,11 @@
}
@Override
+ public DictWriter dict() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public abstract WriterEvents events();
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 246b4c1..7b5309c 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -28,6 +28,7 @@
import org.apache.drill.exec.vector.accessor.ColumnReader;
import org.apache.drill.exec.vector.accessor.ColumnWriter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.DictWriter;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
@@ -109,7 +110,7 @@
public static class TupleObjectWriter extends AbstractObjectWriter {
- private final AbstractTupleWriter tupleWriter;
+ protected final AbstractTupleWriter tupleWriter;
public TupleObjectWriter(AbstractTupleWriter tupleWriter) {
this.tupleWriter = tupleWriter;
@@ -148,6 +149,35 @@
ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
}
+ /**
+ * Wrap the outer index to avoid incrementing the array index
+ * on the call to <tt>nextElement()</tt>. The increment
+ * is done at the tuple level, not the column level.
+ */
+
+ static class MemberWriterIndex implements ColumnWriterIndex {
+ private ColumnWriterIndex baseIndex;
+
+ MemberWriterIndex(ColumnWriterIndex baseIndex) {
+ this.baseIndex = baseIndex;
+ }
+
+ @Override public int rowStartIndex() { return baseIndex.rowStartIndex(); }
+ @Override public int vectorIndex() { return baseIndex.vectorIndex(); }
+ @Override public void nextElement() { }
+ @Override public void prevElement() { }
+ @Override public void rollover() { }
+
+ @Override public ColumnWriterIndex outerIndex() {
+ return baseIndex.outerIndex();
+ }
+
+ @Override
+ public String toString() {
+ return "[" + getClass().getSimpleName() + " baseIndex = " + baseIndex.toString() + "]";
+ }
+ }
+
protected static final Logger logger = LoggerFactory.getLogger(AbstractTupleWriter.class);
protected final TupleMetadata tupleSchema;
@@ -431,6 +461,16 @@
}
@Override
+ public DictWriter dict(int colIndex) {
+ return column(colIndex).dict();
+ }
+
+ @Override
+ public DictWriter dict(String colName) {
+ return column(colName).dict();
+ }
+
+ @Override
public ObjectType type(int colIndex) {
return column(colIndex).type();
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DictEntryWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DictEntryWriter.java
new file mode 100644
index 0000000..120b1ea
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/DictEntryWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.ColumnReader;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.complex.DictVector;
+
+/**
+ * Writer for a Dict entry. The entry is a special kind of tuple.
+ */
+public class DictEntryWriter extends AbstractTupleWriter {
+
+ public static class DictEntryObjectWriter extends TupleObjectWriter {
+
+ public DictEntryObjectWriter(DictEntryWriter entryWriter) {
+ super(entryWriter);
+ }
+
+ void setKeyValue(Object key, Object value) {
+ tupleWriter.set(0, key);
+ tupleWriter.set(1, value);
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.startObject(this)
+ .attribute("dictEntryWriter");
+ tupleWriter.dump(format);
+ format.endObject();
+ }
+ }
+
+ private static class DummyDictEntryWriter extends DictEntryWriter {
+
+ DummyDictEntryWriter(ColumnMetadata schema, List<AbstractObjectWriter> writers) {
+ super(schema, writers);
+ }
+
+ @Override
+ public boolean isProjected() {
+ return false;
+ }
+
+ @Override
+ public void copy(ColumnReader from) {
+ }
+ }
+
+ public static DictEntryObjectWriter buildDictEntryWriter(ColumnMetadata schema,
+ List<AbstractObjectWriter> keyValueWriters,
+ DictVector vector) {
+ DictEntryWriter dictEntryWriter;
+ if (vector != null) {
+ dictEntryWriter = new DictEntryWriter(schema, keyValueWriters);
+ } else {
+ dictEntryWriter = new DummyDictEntryWriter(schema, keyValueWriters);
+ }
+ return new DictEntryObjectWriter(dictEntryWriter);
+ }
+
+ private final ColumnMetadata dictColumnSchema;
+
+ public DictEntryWriter(ColumnMetadata schema, List<AbstractObjectWriter> writers) {
+ super(schema.tupleSchema(), writers);
+ dictColumnSchema = schema;
+ }
+
+ @Override
+ public void bindIndex(ColumnWriterIndex index) {
+
+ // Similar to a repeated map's index, the provided index is an array element
+ // index. Convert this to an index that will not increment the element
+ // index on each write so that a dict with key and value members won't
+ // increment the index for each member. Rather, the index must be
+ // incremented at the array level.
+
+ bindIndex(index, new MemberWriterIndex(index));
+ }
+
+ @Override
+ public boolean isProjected() {
+ return true;
+ }
+
+ @Override
+ public ColumnMetadata schema() {
+ return dictColumnSchema;
+ }
+
+ @Override
+ public void setObject(Object value) {
+ if (value instanceof Map.Entry) {
+ Map.Entry entry = (Map.Entry) value;
+ set(0, entry.getKey());
+ set(1, entry.getValue());
+ } else {
+ super.setObject(value);
+ }
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index 38d6a5a..3245323 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -36,41 +36,6 @@
public abstract class MapWriter extends AbstractTupleWriter {
/**
- * Wrap the outer index to avoid incrementing the array index
- * on the call to <tt>nextElement().</tt> For maps, the increment
- * is done at the map level, not the column level.
- */
-
- private static class MemberWriterIndex implements ColumnWriterIndex {
- private final ColumnWriterIndex baseIndex;
-
- private MemberWriterIndex(ColumnWriterIndex baseIndex) {
- this.baseIndex = baseIndex;
- }
-
- @Override public int rowStartIndex() { return baseIndex.rowStartIndex(); }
- @Override public int vectorIndex() { return baseIndex.vectorIndex(); }
- @Override public void nextElement() { }
- @Override public void prevElement() { }
- @Override public void rollover() { }
-
- @Override public ColumnWriterIndex outerIndex() {
- return baseIndex.outerIndex();
- }
-
- @Override
- public String toString() {
- return new StringBuilder()
- .append("[")
- .append(getClass().getSimpleName())
- .append(" baseIndex = ")
- .append(baseIndex.toString())
- .append("]")
- .toString();
- }
- }
-
- /**
* Writer for a single (non-array) map. Clients don't really "write" maps;
* rather, this writer is a holder for the columns within the map, and those
* columns are what is written.
@@ -172,7 +137,7 @@
protected final ColumnMetadata mapColumnSchema;
protected MapWriter(ColumnMetadata schema, List<AbstractObjectWriter> writers) {
- super(schema.mapSchema(), writers);
+ super(schema.tupleSchema(), writers);
mapColumnSchema = schema;
}
@@ -224,7 +189,7 @@
}
public static AbstractObjectWriter buildMapWriter(ColumnMetadata schema, AbstractMapVector vector) {
- assert schema.mapSchema().size() == 0;
+ assert schema.tupleSchema().size() == 0;
return buildMapWriter(schema, vector, new ArrayList<AbstractObjectWriter>());
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java
new file mode 100644
index 0000000..a845828
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ObjectDictWriter.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
+import org.apache.drill.exec.vector.accessor.writer.dummy.DummyDictWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.exec.vector.complex.RepeatedDictVector;
+
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The implementation represents the writer as an array writer
+ * with a special dict entry writer as its element writer.
+ */
+public class ObjectDictWriter extends ObjectArrayWriter implements DictWriter {
+
+ public static class DictObjectWriter extends AbstractArrayWriter.ArrayObjectWriter {
+
+ public DictObjectWriter(DictWriter dictWriter) {
+ super((AbstractArrayWriter) dictWriter);
+ }
+
+ @Override
+ public DictWriter dict() {
+ return (DictWriter) arrayWriter;
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.startObject(this)
+ .attribute("dictWriter");
+ arrayWriter.dump(format);
+ format.endObject();
+ }
+ }
+
+ public static ObjectDictWriter.DictObjectWriter buildDict(ColumnMetadata metadata, DictVector vector,
+ List<AbstractObjectWriter> keyValueWriters) {
+ DictEntryWriter.DictEntryObjectWriter entryObjectWriter =
+ DictEntryWriter.buildDictEntryWriter(metadata, keyValueWriters, vector);
+ DictWriter objectDictWriter;
+ if (vector != null) {
+ objectDictWriter = new ObjectDictWriter(metadata, vector.getOffsetVector(), entryObjectWriter);
+ } else {
+ objectDictWriter = new DummyDictWriter(metadata, entryObjectWriter);
+ }
+ return new ObjectDictWriter.DictObjectWriter(objectDictWriter);
+ }
+
+ public static ArrayObjectWriter buildDictArray(ColumnMetadata metadata, RepeatedDictVector vector,
+ List<AbstractObjectWriter> keyValueWriters) {
+ final DictVector dataVector;
+ if (vector != null) {
+ dataVector = (DictVector) vector.getDataVector();
+ } else {
+ dataVector = null;
+ }
+ ObjectDictWriter.DictObjectWriter dictWriter = buildDict(metadata, dataVector, keyValueWriters);
+ AbstractArrayWriter arrayWriter;
+ if (vector != null) {
+ arrayWriter = new ObjectArrayWriter(metadata, vector.getOffsetVector(), dictWriter);
+ } else {
+ arrayWriter = new DummyArrayWriter(metadata, dictWriter);
+ }
+ return new ArrayObjectWriter(arrayWriter);
+ }
+
+ public static final int FIELD_KEY_ORDINAL = 0;
+ public static final int FIELD_VALUE_ORDINAL = 1;
+
+ private final DictEntryWriter.DictEntryObjectWriter entryObjectWriter;
+
+ public ObjectDictWriter(ColumnMetadata schema, UInt4Vector offsetVector,
+ DictEntryWriter.DictEntryObjectWriter entryObjectWriter) {
+ super(schema, offsetVector, entryObjectWriter);
+ this.entryObjectWriter = entryObjectWriter;
+ }
+
+ @Override
+ public ValueType keyType() {
+ return tuple().scalar(FIELD_KEY_ORDINAL).valueType();
+ }
+
+ @Override
+ public ObjectType valueType() {
+ return tuple().type(FIELD_VALUE_ORDINAL);
+ }
+
+ @Override
+ public ScalarWriter keyWriter() {
+ return tuple().scalar(FIELD_KEY_ORDINAL);
+ }
+
+ @Override
+ public ObjectWriter valueWriter() {
+ return tuple().column(FIELD_VALUE_ORDINAL);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setObject(Object object) {
+ if (object == null) {
+ return;
+ }
+
+ if (object instanceof Map) {
+ Map<Object, Object> map = (Map<Object, Object>) object;
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ entryObjectWriter.setKeyValue(entry.getKey(), entry.getValue());
+ save();
+ }
+ } else {
+ int size = Array.getLength(object);
+ assert size % 2 == 0;
+ for (int i = 0; i < size; i += 2) {
+ Object key = Array.get(object, i);
+ Object value = Array.get(object, i + 1);
+ entryObjectWriter.setKeyValue(key, value);
+ save();
+ }
+ }
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyDictWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyDictWriter.java
new file mode 100644
index 0000000..7e38470
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyDictWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer.dummy;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.writer.DictEntryWriter;
+import org.apache.drill.exec.vector.accessor.writer.ObjectDictWriter;
+
+/**
+ * {@link DictWriter} facade over {@link DummyArrayWriter}: the metadata
+ * accessors below delegate to the underlying entry tuple, while all data
+ * writes inherit the base class's dummy behavior.
+ * NOTE(review): presumably used for dict columns that are not projected
+ * (the factory falls back to a dummy array writer in that case) —
+ * confirm against the writer-factory call site.
+ */
+public class DummyDictWriter extends DummyArrayWriter implements DictWriter {
+
+ public DummyDictWriter(ColumnMetadata schema, DictEntryWriter.DictEntryObjectWriter entryObjectWriter) {
+ super(schema, entryObjectWriter);
+ }
+
+ // Scalar type of the key column; ordinals are shared with the real
+ // writer implementation via ObjectDictWriter's constants.
+ @Override
+ public ValueType keyType() {
+ return tuple().scalar(ObjectDictWriter.FIELD_KEY_ORDINAL).valueType();
+ }
+
+ // Object type of the value column.
+ @Override
+ public ObjectType valueType() {
+ return tuple().type(ObjectDictWriter.FIELD_VALUE_ORDINAL);
+ }
+
+ // Writer for the key column.
+ @Override
+ public ScalarWriter keyWriter() {
+ return tuple().scalar(ObjectDictWriter.FIELD_KEY_ORDINAL);
+ }
+
+ // Writer for the value column.
+ @Override
+ public ObjectWriter valueWriter() {
+ return tuple().column(ObjectDictWriter.FIELD_VALUE_ORDINAL);
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/DictVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/DictVector.java
index 2fadebc..9c86946 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/DictVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/DictVector.java
@@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
@@ -44,6 +45,7 @@
* it may have 2 children only, named {@link #FIELD_KEY_NAME} and {@link #FIELD_VALUE_NAME}.
* The {@link #FIELD_KEY_NAME} can be of primitive type only and its values should not be {@code null},
* while the other, {@link #FIELD_VALUE_NAME}, field can be either of primitive or complex type.
+ * Value field can hold {@code null} values.
*
* <p>This vector has it's own {@link org.apache.drill.exec.vector.complex.reader.FieldReader} and
* {@link org.apache.drill.exec.vector.complex.writer.FieldWriter} to ensure data is read and written correctly.
@@ -255,7 +257,12 @@
@Override
MajorType getLastPathType() {
- return valueType;
+ if (Types.isRepeated(valueType)) {
+ return valueType;
+ }
+ return valueType.toBuilder()
+ .setMode(TypeProtos.DataMode.OPTIONAL)
+ .build();
}
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedDictReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedDictReaderImpl.java
index 7ebac29..22e2183 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedDictReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedDictReaderImpl.java
@@ -138,6 +138,6 @@
@Override
public String getTypeString() {
- return "ARRAY<" + reader.getTypeString() + '>';
+ return "ARRAY<" + reader().getTypeString() + '>';
}
}
diff --git a/exec/vector/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java b/exec/vector/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
index 0a6bb78..2e41986 100644
--- a/exec/vector/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
+++ b/exec/vector/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
@@ -240,7 +240,7 @@
assertTrue(map.isMap());
assertEquals(TypeProtos.DataMode.REQUIRED, map.mode());
- TupleMetadata mapSchema = map.mapSchema();
+ TupleMetadata mapSchema = map.tupleSchema();
assertFalse(mapSchema.metadata("m1").isNullable());
assertTrue(mapSchema.metadata("m2").isNullable());
}
@@ -259,7 +259,7 @@
ColumnMetadata mapArray = schema.metadata("ma");
assertTrue(mapArray.isArray());
assertTrue(mapArray.isMap());
- TupleMetadata mapSchema = mapArray.mapSchema();
+ TupleMetadata mapSchema = mapArray.tupleSchema();
assertFalse(mapSchema.metadata("m1").isNullable());
assertTrue(mapSchema.metadata("m2").isNullable());
}
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
index 3f3e40b..4643c31 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
@@ -50,7 +50,7 @@
while (!colPath.isLastPath() && colMetadata != null) {
if (colMetadata.isDict()) {
// get dict's value field metadata
- colMetadata = colMetadata.mapSchema().metadata(0).mapSchema().metadata(1);
+ colMetadata = colMetadata.tupleSchema().metadata(0).tupleSchema().metadata(1);
break;
}
if (!colMetadata.isMap()) {
@@ -58,7 +58,7 @@
break;
}
colPath = (PathSegment.NameSegment) colPath.getChild();
- colMetadata = colMetadata.mapSchema().metadata(colPath.getPath());
+ colMetadata = colMetadata.tupleSchema().metadata(colPath.getPath());
}
return colMetadata;
}
@@ -94,7 +94,7 @@
throw new DrillRuntimeException(String.format("Expected map or dict, but was %s", colMetadata.majorType()));
}
- schema = colMetadata.mapSchema();
+ schema = colMetadata.tupleSchema();
colPath = (PathSegment.NameSegment) colPath.getChild();
}