DRILL-7377: Nested schemas for dynamic EVF columns
The Result Set Loader (part of EVF) allows adding columns up-front
before reading rows (so-called "early schema.") Such schemas allow
nested columns (maps with members, repeated lists with a type, etc.)
The Result Set Loader also allows adding columns dynamically
while loading data (so-called "late schema".) Previously, the code
assumed that columns would be added top-down: first the map, then
the map's contents, etc.
Charles found a need to allow adding a nested column (a repeated
list with a declared list type.)
This patch revises the code to use the same mechanism in both the
early- and late-schema cases, allowing adding nested columns at
any time.
Testing: Added a new unit test case for the repeated list late
schema with content case.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
index e0d9e27..bf1256d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/BuildFromSchema.java
@@ -32,6 +32,21 @@
* <p>
* Recursion is much easier if we can go bottom-up. But, writers
* require top-down construction.
+ * <p>
+ * This particular class builds a column and all its contents.
+ * For example, given a map, which contains a repeated list which
+ * contains a repeated INT, this class first builds the map,
+ * then adds the repeated list, then adds the INT array. To do
+ * so, it will create a copy of the structured metadata.
+ * <p>
+ * A drawback of this approach is that the metadata objects used
+ * in the "parent" writers will be copies of, not the same as, those
+ * in the schema from which we are building the writers. At present,
+ * this is not an issue, but it is something to be aware of as uses
+ * become more sophisticated.
+ * <p>
+ * This class contrasts with the @{link ColumnBuilder} class which
+ * builds the structure within a single vector and writer.
*/
public class BuildFromSchema {
@@ -49,6 +64,11 @@
ObjectWriter add(ColumnMetadata colSchema);
}
+ /**
+ * Shim used for adding a column to a tuple directly.
+ * This method will recursively invoke this builder
+ * to expand any nested content.
+ */
private static class TupleShim implements ParentShim {
private final TupleWriter writer;
@@ -63,6 +83,24 @@
}
}
+ /**
+ * Shim used when implementing the add of a column to
+ * a tuple in the result set loader. Directly calls the
+ * internal method to add a column to the "tuple state."
+ */
+ private static class TupleStateShim implements ParentShim {
+ private final TupleState state;
+
+ public TupleStateShim(TupleState state) {
+ this.state = state;
+ }
+
+ @Override
+ public ObjectWriter add(ColumnMetadata colSchema) {
+ return state.addColumn(colSchema).writer();
+ }
+ }
+
private static class UnionShim implements ParentShim {
private final VariantWriter writer;
@@ -89,6 +127,12 @@
}
}
+ private static BuildFromSchema instance = new BuildFromSchema();
+
+ private BuildFromSchema() { }
+
+ public static BuildFromSchema instance() { return instance; }
+
/**
* When creating a schema up front, provide the schema of the desired tuple,
* then build vectors and writers to match. Allows up-front schema definition
@@ -105,17 +149,36 @@
}
}
- private void buildColumn(ParentShim parent, ColumnMetadata colSchema) {
+ /**
+ * Build a column recursively. Called internally when adding a column
+ * via the addColumn() method on the tuple writer.
+ */
+
+ public ObjectWriter buildColumn(TupleState state, ColumnMetadata colSchema) {
+ return buildColumn(new TupleStateShim(state), colSchema);
+ }
+
+ /**
+ * Build the column writer, and any nested content, returning the built
+ * column writer as a generic object writer.
+ *
+ * @param parent the shim that implements the logic to add a column
+ * to a tuple, list, repeated list, or union.
+ * @param colSchema the schema of the column to add
+ * @return the object writer for the added column
+ */
+
+ private ObjectWriter buildColumn(ParentShim parent, ColumnMetadata colSchema) {
if (colSchema.isMultiList()) {
- buildRepeatedList(parent, colSchema);
+ return buildRepeatedList(parent, colSchema);
} else if (colSchema.isMap()) {
- buildMap(parent, colSchema);
+ return buildMap(parent, colSchema);
} else if (isSingleList(colSchema)) {
- buildSingleList(parent, colSchema);
+ return buildSingleList(parent, colSchema);
} else if (colSchema.isVariant()) {
- buildVariant(parent, colSchema);
+ return buildVariant(parent, colSchema);
} else {
- buildPrimitive(parent, colSchema);
+ return buildPrimitive(parent, colSchema);
}
}
@@ -133,13 +196,14 @@
return colSchema.isVariant() && colSchema.isArray() && colSchema.variantSchema().isSingleType();
}
- private void buildPrimitive(ParentShim parent, ColumnMetadata colSchema) {
- parent.add(colSchema);
+ private ObjectWriter buildPrimitive(ParentShim parent, ColumnMetadata colSchema) {
+ return parent.add(colSchema);
}
- private void buildMap(ParentShim parent, ColumnMetadata colSchema) {
+ private ObjectWriter buildMap(ParentShim parent, ColumnMetadata colSchema) {
final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty());
expandMap(colWriter, colSchema);
+ return colWriter;
}
private void expandMap(ObjectWriter colWriter, ColumnMetadata colSchema) {
@@ -162,9 +226,10 @@
* @param colSchema the schema of the variant (LIST or UNION) column
*/
- private void buildVariant(ParentShim parent, ColumnMetadata colSchema) {
+ private ObjectWriter buildVariant(ParentShim parent, ColumnMetadata colSchema) {
final ObjectWriter colWriter = parent.add(colSchema.cloneEmpty());
expandVariant(colWriter, colSchema);
+ return colWriter;
}
private void expandVariant(ObjectWriter colWriter, ColumnMetadata colSchema) {
@@ -182,13 +247,14 @@
}
}
- private void buildSingleList(ParentShim parent, ColumnMetadata colSchema) {
+ private ObjectWriter buildSingleList(ParentShim parent, ColumnMetadata colSchema) {
final ColumnMetadata seed = colSchema.cloneEmpty();
final ColumnMetadata subtype = colSchema.variantSchema().listSubtype();
seed.variantSchema().addType(subtype.cloneEmpty());
seed.variantSchema().becomeSimple();
final ObjectWriter listWriter = parent.add(seed);
expandColumn(listWriter, subtype);
+ return listWriter;
}
/**
@@ -200,14 +266,16 @@
* @param colSchema schema definition of the array
*/
- private void buildRepeatedList(ParentShim parent, ColumnMetadata colSchema) {
+ private ObjectWriter buildRepeatedList(ParentShim parent, ColumnMetadata colSchema) {
final ColumnMetadata seed = colSchema.cloneEmpty();
- final RepeatedListWriter listWriter = (RepeatedListWriter) parent.add(seed).array();
+ final ObjectWriter objWriter = parent.add(seed);
+ final RepeatedListWriter listWriter = (RepeatedListWriter) objWriter.array();
final ColumnMetadata elements = colSchema.childSchema();
if (elements != null) {
final RepeatedListShim listShim = new RepeatedListShim(listWriter);
buildColumn(listShim, elements);
}
+ return objWriter;
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
index 466214e..57b07fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
@@ -78,7 +78,13 @@
* projected (not in the list), then creates a dummy writer. Issues an error if
* the column is projected, but the implied projection type is incompatible with
* the actual type. (Such as trying to project an INT as x[0].)
+ * <p>
+ * This class builds the internal structure of a vector. If building a "container"
+ * vector (map, list, repeated list or union), this class expects the container
+ * to be added empty, then the members to be added one by one. See
+ * {@link BuildFromSchema} for the class that builds up a compound structure.
*/
+
public class ColumnBuilder {
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 2cfdeec..4d70065 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -310,7 +310,7 @@
// won't be if known up front.
logger.debug("Schema: " + options.schema.toString());
- new BuildFromSchema().buildTuple(rootWriter, options.schema);
+ BuildFromSchema.instance().buildTuple(rootWriter, options.schema);
}
// If we want to project nothing, then we do, in fact, have a
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
index 0c6c6af..9d8f798 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/TupleState.java
@@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.List;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.resultSet.ProjectionSet;
import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
import org.apache.drill.exec.physical.resultSet.impl.ColumnState.BaseContainerColumnState;
@@ -490,19 +489,7 @@
@Override
public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) {
-
- // Verify name is not a (possibly case insensitive) duplicate.
-
- final TupleMetadata tupleSchema = schema();
- final String colName = columnSchema.name();
- if (tupleSchema.column(colName) != null) {
- throw UserException
- .validationError()
- .message("Duplicate column name: ", colName)
- .build(ResultSetLoaderImpl.logger);
- }
-
- return addColumn(columnSchema).writer();
+ return BuildFromSchema.instance().buildColumn(this, columnSchema);
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java
index 880b2d8..2010cc3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderRepeatedList.java
@@ -17,22 +17,17 @@
*/
package org.apache.drill.exec.physical.resultSet.impl;
-import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
-import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
-import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
import java.util.Arrays;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.ColumnMetadata.StructureType;
@@ -48,14 +43,19 @@
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.ValueType;
import org.apache.drill.exec.vector.accessor.writer.RepeatedListWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleObjArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* Tests repeated list support. Repeated lists add another layer of dimensionality
@@ -173,6 +173,9 @@
public void test2DLateSchemaIncremental() {
final TupleMetadata schema = new SchemaBuilder()
.add("id", MinorType.INT)
+ .addRepeatedList("list1")
+ .addArray(MinorType.VARCHAR)
+ .resumeSchema()
.addRepeatedList("list2")
.addArray(MinorType.VARCHAR)
.resumeSchema()
@@ -199,7 +202,7 @@
// Sanity check of writer structure
assertEquals(2, writer.size());
- final ObjectWriter listObj = writer.column("list2");
+ final ObjectWriter listObj = writer.column("list1");
assertEquals(ObjectType.ARRAY, listObj.type());
final ArrayWriter listWriter = listObj.array();
@@ -221,7 +224,7 @@
// Define the inner type.
final RepeatedListWriter listWriterImpl = (RepeatedListWriter) listWriter;
- listWriterImpl.defineElement(MaterializedField.create("list2", Types.repeated(MinorType.VARCHAR)));
+ listWriterImpl.defineElement(MaterializedField.create("list1", Types.repeated(MinorType.VARCHAR)));
// Sanity check of completed structure
@@ -234,17 +237,45 @@
// Write values
writer
- .addRow(5, objArray(strArray("a", "b"), strArray("c", "d")));
+ .addRow(5, objArray(strArray("a1", "b1"), strArray("c1", "d1")));
+
+ // Add the second list, with a complete type
+
+ writer.addColumn(schema.metadata(2));
+
+ // Sanity check of writer structure
+
+ assertEquals(3, writer.size());
+ final ObjectWriter list2Obj = writer.column("list2");
+ assertEquals(ObjectType.ARRAY, list2Obj.type());
+ final ArrayWriter list2Writer = list2Obj.array();
+ assertEquals(ObjectType.ARRAY, list2Writer.entryType());
+ final ArrayWriter inner2Writer = list2Writer.array();
+ assertEquals(ObjectType.SCALAR, inner2Writer.entryType());
+ final ScalarWriter str2Writer = inner2Writer.scalar();
+ assertEquals(ValueType.STRING, str2Writer.valueType());
+
+ // Write values
+
+ writer
+ .addRow(6,
+ objArray(strArray("a2", "b2"), strArray("c2", "d2")),
+ objArray(strArray("w2", "x2"), strArray("y2", "z2")));
+
+ // Add the second list, with a complete type
// Verify the values.
// (Relies on the row set level repeated list tests having passed.)
final RowSet expected = fixture.rowSetBuilder(schema)
- .addRow(1, objArray())
- .addRow(2, objArray())
- .addRow(3, objArray())
- .addRow(4, objArray(objArray(), null))
- .addRow(5, objArray(strArray("a", "b"), strArray("c", "d")))
+ .addRow(1, objArray(), objArray())
+ .addRow(2, objArray(), objArray())
+ .addRow(3, objArray(), objArray())
+ .addRow(4, objArray(objArray(), null), objArray())
+ .addRow(5, objArray(strArray("a1", "b1"), strArray("c1", "d1")), objArray())
+ .addRow(6,
+ objArray(strArray("a2", "b2"), strArray("c2", "d2")),
+ objArray(strArray("w2", "x2"), strArray("y2", "z2")))
.build();
RowSetUtilities.verify(expected, fixture.wrap(rsLoader.harvest()));
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index 659d9da..948b8c7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -32,6 +33,8 @@
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.drill.exec.vector.accessor.VariantWriter;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation for a writer for a tuple (a row or a map.) Provides access to each
@@ -143,6 +146,8 @@
ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
}
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractTupleWriter.class);
+
protected final TupleMetadata tupleSchema;
protected final List<AbstractObjectWriter> writers;
protected ColumnWriterIndex vectorIndex;
@@ -205,20 +210,29 @@
@Override
public int addColumn(ColumnMetadata column) {
- if (listener == null) {
- throw new UnsupportedOperationException("addColumn");
- }
+ verifyAddColumn(column.name());
final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, column);
return addColumnWriter(colWriter);
}
@Override
public int addColumn(MaterializedField field) {
+ verifyAddColumn(field.getName());
+ final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field);
+ return addColumnWriter(colWriter);
+ }
+
+ private void verifyAddColumn(String colName) {
if (listener == null) {
throw new UnsupportedOperationException("addColumn");
}
- final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field);
- return addColumnWriter(colWriter);
+
+ if (tupleSchema().column(colName) != null) {
+ throw UserException
+ .validationError()
+ .message("Duplicate column name: ", colName)
+ .build(logger);
+ }
}
@Override
@@ -429,6 +443,8 @@
this.listener = listener;
}
+ public TupleWriterListener listener() { return listener; }
+
@Override
public void bindListener(ColumnWriterListener listener) { }