DRILL-7454: Convert Avro to EVF
1. Replaced the old Avro format implementation with EVF.
2. Updated and added Avro tests; improved their performance.
3. Code refactoring.
closes #1951
diff --git a/common/src/test/java/org/apache/drill/test/BaseTest.java b/common/src/test/java/org/apache/drill/test/BaseTest.java
index adf0d5e..6256b5e 100644
--- a/common/src/test/java/org/apache/drill/test/BaseTest.java
+++ b/common/src/test/java/org/apache/drill/test/BaseTest.java
@@ -34,7 +34,7 @@
*/
ProtobufPatcher.patch();
/*
- * Some libraries, such as Hadoop or HBase, depend on incompatible versions of Guava.
+ * Some libraries, such as Hadoop, HBase, and Iceberg, depend on incompatible versions of Guava.
 * This code adds back some methods so that the libraries can work with a single Guava version.
*/
GuavaPatcher.patch();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedReader.java
index a9df2e0..b033685 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedReader.java
@@ -19,8 +19,8 @@
/**
* Extended version of a record reader which uses a size-aware batch mutator.
- * Use this for all new readers. Replaces the original {@link RecordReader}
- * interface.
+ * Use this for all new readers. Replaces the original
+ * {@link org.apache.drill.exec.store.RecordReader} interface.
* <p>
* This interface is used to create readers that work with the projection
* mechanism to provide services for handling projection, setting up the result
@@ -33,12 +33,12 @@
* <ul>
* <li>Constructor: allocate no resources. Obtain a reference to a reader-specific
* schema and projection manager.</li>
- * <li>{@link open()}: Use the provided {@link SchemaNegotiator} to configure the
+ * <li>{@link #open(SchemaNegotiator)}: Use the provided {@link SchemaNegotiator} to configure the
* scanner framework for this reader by specifying a schema (if known), desired
* row counts and other configuration options. Call {@link SchemaNegotiator#build()}
- * to obtain a {@link ResultSetLoader} to use to capture the rows that the reader
- * reads.</li>
- * <li>{@link next()}: called for each batch. The batch is written using the
+ * to obtain a {@link org.apache.drill.exec.physical.resultSet.RowSetLoader}
+ * to use to capture the rows that the reader reads.</li>
+ * <li>{@link #next()}: called for each batch. The batch is written using the
* result set loader obtained above. The scanner framework handles details of
* tracking version changes, handling overflow, limiting record counts, and
* so on. Return <tt>true</tt> to indicate a batch is available, <tt>false</tt>
@@ -48,7 +48,7 @@
 * <tt>next()</tt> returns <tt>false</tt>.</li>
* <p>
* If an error occurs, the reader can throw a {@link RuntimeException}
- * from any method. A {@link UserException} is preferred to provide
+ * from any method. A <tt>UserException</tt> is preferred to provide
* detailed information about the source of the problem.
*/
@@ -59,7 +59,7 @@
* to <tt>next()</tt>. Allocate resources here, not in the constructor.
* Example: open files, allocate buffers, etc.
*
- * @param schemaNegotiator mechanism to negotiate select and table
+ * @param negotiator mechanism to negotiate select and table
* schemas, then create the row set reader used to load data into
* value vectors
*
@@ -93,7 +93,7 @@
* <tt>next()</tt> should be called again, <tt>false</tt> to indicate
* that EOF was reached
*
- * @throws RutimeException (<tt>UserException</tt> preferred) if an
+ * @throws RuntimeException (<tt>UserException</tt> preferred) if an
* error occurs that should fail the query.
*/
@@ -106,7 +106,7 @@
* <tt>open()</tt> returns normally; will not be called if <tt>open()</tt>
* throws an exception.
*
- * @throws RutimeException (<tt>UserException</tt> preferred) if an
+ * @throws RuntimeException (<tt>UserException</tt> preferred) if an
* error occurs that should fail the query.
*/
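
For orientation, the lifecycle described in the javadoc above reduces to roughly the following shape. This is a minimal sketch, not part of this change; the schema, column name, and EOF condition are illustrative assumptions.

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
    import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
    import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
    import org.apache.drill.exec.physical.resultSet.RowSetLoader;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    public class ExampleReader implements ManagedReader<SchemaNegotiator> {
      private ResultSetLoader loader;

      @Override
      public boolean open(SchemaNegotiator negotiator) {
        // Declare the table schema up front (if known), then build the loader.
        TupleMetadata schema = new SchemaBuilder()
            .add("id", MinorType.BIGINT)
            .buildSchema();
        negotiator.setTableSchema(schema, true);
        loader = negotiator.build();
        return true; // the reader has data to offer
      }

      @Override
      public boolean next() {
        RowSetLoader rowWriter = loader.writer();
        while (!rowWriter.isFull()) { // the framework caps the batch size
          // if (inputExhausted) { return false; } // EOF: no more batches
          rowWriter.start();
          // ... write column values through rowWriter ...
          rowWriter.save();
        }
        return true; // another batch is available
      }

      @Override
      public void close() {
        // Release files, buffers, and other resources allocated in open().
      }
    }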
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index 9acc830..edc6acf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -232,7 +232,7 @@
* per batch for this scan. Readers can adjust this, but the adjustment is capped
* at the value specified here
*
- * @param scanBatchSize maximum records per batch
+ * @param batchRecordLimit maximum records per batch
*/
public void setBatchRecordLimit(int batchRecordLimit) {
@@ -250,7 +250,7 @@
* nullable int. This type is used for all missing columns. (Readers
* that need per-column control need a different mechanism.)
*
- * @param nullType
+ * @param nullType the type to use for null columns
*/
public void setNullType(MajorType nullType) {
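
These two options are normally set through the scan framework builder before the framework is constructed. A hedged fragment, mirroring the call made in frameworkBuilder() of the new Avro plugin below and assuming setBatchRecordLimit lives on the same builder:

    // Fragment: configure the scan before building the framework.
    FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
    builder.setBatchRecordLimit(4096); // cap records per batch; reader adjustments are capped here
    builder.setNullType(Types.optional(TypeProtos.MinorType.VARCHAR)); // type for missing columns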
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
index 5f0680b..3220efa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/SimpleReaderBuilder.java
@@ -31,7 +31,6 @@
import org.apache.drill.exec.physical.resultSet.model.ReaderBuilder;
import org.apache.drill.exec.physical.resultSet.model.ReaderIndex;
import org.apache.drill.exec.physical.rowSet.IndirectRowIndex;
-import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetReaderImpl;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -57,7 +56,8 @@
* <p>
* Derived classes handle the details of the various kinds of readers.
* Today there is a single subclass that builds (test-time)
- * {@link RowSet} objects. The idea, however, is that we may eventually
+ * {@link org.apache.drill.exec.physical.rowSet.RowSet} objects.
+ * The idea, however, is that we may eventually
* want to create a "result set reader" for use in internal operators,
* in parallel to the "result set loader". The result set reader would
* handle a stream of incoming batches. The extant RowSet class handles
@@ -200,7 +200,7 @@
private AbstractObjectReader buildUnion(UnionVector vector, VectorAccessor unionAccessor, VectorDescrip descrip) {
final MetadataProvider provider = descrip.childProvider();
- final AbstractObjectReader variants[] = new AbstractObjectReader[MinorType.values().length];
+ final AbstractObjectReader[] variants = new AbstractObjectReader[MinorType.values().length];
int i = 0;
for (final MinorType type : vector.getField().getType().getSubTypeList()) {
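
The test-time path mentioned in the javadoc above reads a materialized row set back through the reader this class builds. A minimal fragment, assuming the usual RowSet test utilities (rowSet and the "id" column are illustrative):

    RowSetReader reader = rowSet.reader();
    while (reader.next()) { // advance to the next row
      long id = reader.scalar("id").getLong();
      // ... assert on the value ...
    }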
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
index 9e0d783..3e91466 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/project/RequestedTupleImpl.java
@@ -274,14 +274,12 @@
member.projectAllElements();
return;
- } else if (member.hasIndex(index)) {
- throw UserException
- .validationError()
- .message("Duplicate array index in project list: %s[%d]",
- member.fullName(), index)
- .build(logger);
}
- member.addIndex(index);
+
+ // Allow duplicate indexes. Example: z[0], z[0]['orange']
+ if (!member.hasIndex(index)) {
+ member.addIndex(index);
+ }
 // Drill's SQL parser does not support map arrays: a[0].c
// But, the SchemaPath does support them, so no harm in
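
The rewritten branch above now tolerates duplicate array indexes (e.g. z[0], z[0]['orange']) instead of raising a validation error. A self-contained illustration of the collapsing behavior in plain Java, independent of the Drill API:

    // Duplicate indexes collapse to one entry, mirroring the hasIndex()/addIndex() guard.
    Set<Integer> indexes = new HashSet<>();
    for (int index : new int[] {0, 0, 2}) { // as if projecting z[0], z[0]['orange'], z[2]
      indexes.add(index);
    }
    // indexes now holds {0, 2}; the repeated 0 causes no error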
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExtendableRelDataType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExtendableRelDataType.java
deleted file mode 100644
index f41ad2a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExtendableRelDataType.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.logical;
-
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.drill.exec.planner.types.ExtendableRelDataTypeHolder;
-import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl;
-
-/**
- * RelDataType for non-dynamic table structure which
- * may be extended by adding partitions or implicit columns.
- */
-public class ExtendableRelDataType extends RelDataTypeDrillImpl {
-
- public ExtendableRelDataType(ExtendableRelDataTypeHolder holder, RelDataTypeFactory typeFactory) {
- super(holder, typeFactory);
- }
-
- @Override
- protected void generateTypeString(StringBuilder sb, boolean withDetail) {
- sb.append("(ExtendableRelDataType").append(getFieldNames()).append(")");
- }
-
- @Override
- public boolean isDynamicStruct() {
- return false;
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
index 9ad751d..08dcc47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/DrillValidator.java
@@ -18,8 +18,6 @@
package org.apache.drill.exec.planner.sql.conversion;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -29,25 +27,19 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.validate.SelectScope;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.Static;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
class DrillValidator extends SqlValidatorImpl {
- private final OptionManager sessionOptions;
-
DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader,
- RelDataTypeFactory typeFactory, SqlConformance conformance, OptionManager sessionOptions) {
+ RelDataTypeFactory typeFactory, SqlConformance conformance) {
super(opTab, catalogReader, typeFactory, conformance);
- this.sessionOptions = sessionOptions;
}
@Override
@@ -79,31 +71,6 @@
return SqlValidatorUtil.getAlias(node, ordinal);
}
- /**
- * Checks that specified expression is not implicit column and
- * adds it to a select list, ensuring that its alias does not
- * clash with any existing expressions on the list.
- * <p>
- * This method may be used when {@link RelDataType#isDynamicStruct}
- * method returns false. Each column from table row type except
- * the implicit is added into specified list, aliases and fieldList.
- * In the opposite case when {@link RelDataType#isDynamicStruct}
- * returns true, only dynamic star is added into specified
- * list, aliases and fieldList.
- */
- @Override
- protected void addToSelectList(List<SqlNode> list,
- Set<String> aliases,
- List<Map.Entry<String, RelDataType>> fieldList,
- SqlNode exp,
- SelectScope scope,
- final boolean includeSystemVars) {
- if (!ColumnExplorer.initImplicitFileColumns(sessionOptions)
- .containsKey(SqlValidatorUtil.getAlias(exp, -1))) {
- super.addToSelectList(list, aliases, fieldList, exp, scope, includeSystemVars);
- }
- }
-
@Override
protected void inferUnknownTypes(RelDataType inferredType, SqlValidatorScope scope, SqlNode node) {
// calls validateQuery() for SqlSelect to be sure that temporary table name will be changed
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
index 522a8fb..03ed970 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/conversion/SqlConverter.java
@@ -63,6 +63,8 @@
import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.util.Utilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class responsible for managing:
@@ -73,7 +75,7 @@
 * </ul>
*/
public class SqlConverter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SqlConverter.class);
+ private static final Logger logger = LoggerFactory.getLogger(SqlConverter.class);
private final JavaTypeFactory typeFactory;
private final SqlParser.Config parserConfig;
@@ -137,7 +139,7 @@
this::getDefaultSchema);
this.opTab = new ChainedSqlOperatorTable(Arrays.asList(context.getDrillOperatorTable(), catalog));
this.costFactory = (settings.useDefaultCosting()) ? null : new DrillCostBase.DrillCostFactory();
- this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance(), session.getOptions());
+ this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance());
validator.setIdentifierExpansion(true);
cluster = null;
}
@@ -158,7 +160,7 @@
this.catalog = catalog;
this.opTab = parent.opTab;
this.planner = parent.planner;
- this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance(), parent.session.getOptions());
+ this.validator = new DrillValidator(opTab, catalog, typeFactory, parserConfig.conformance());
this.temporarySchema = parent.temporarySchema;
this.session = parent.session;
this.drillConfig = parent.drillConfig;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/ExtendableRelDataTypeHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/ExtendableRelDataTypeHolder.java
deleted file mode 100644
index ce024e2..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/types/ExtendableRelDataTypeHolder.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.types;
-
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-import java.util.List;
-
-/**
- * Holder for list of RelDataTypeField which may be expanded by implicit columns.
- */
-public class ExtendableRelDataTypeHolder extends AbstractRelDataTypeHolder {
- private final List<String> implicitColumnNames;
-
- public ExtendableRelDataTypeHolder(List<RelDataTypeField> fields, List<String> implicitColumnNames) {
- super(fields);
- this.implicitColumnNames = implicitColumnNames;
- }
-
- /**
- * Returns RelDataTypeField field with specified name.
- * If field is implicit and absent in the fields list, it will be added.
- *
- * @param typeFactory RelDataTypeFactory which will be used
- * for the creation of RelDataType for new fields.
- * @param fieldName name of the field.
- * @return RelDataTypeField field
- */
- public RelDataTypeField getField(RelDataTypeFactory typeFactory, String fieldName) {
-
- /* First check if this field name exists in our field list */
- for (RelDataTypeField f : fields) {
- if (fieldName.equalsIgnoreCase(f.getName())) {
- return f;
- }
- }
- RelDataTypeField newField = null;
-
- if (isImplicitField(fieldName)) {
- // This implicit field does not exist in our field list, add it
- newField = new RelDataTypeFieldImpl(
- fieldName,
- fields.size(),
- typeFactory.createTypeWithNullability(
- typeFactory.createSqlType(SqlTypeName.VARCHAR), true));
- fields.add(newField);
- }
- return newField;
- }
-
- /**
- * Checks that specified field is implicit.
- *
- * @param fieldName name of the field which should be checked
- * @return {@code true} if specified filed is implicit
- */
- private boolean isImplicitField(String fieldName) {
- for (String implicitColumn : implicitColumnNames) {
- if (implicitColumn.equalsIgnoreCase(fieldName)) {
- return true;
- }
- }
- return false;
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
new file mode 100644
index 0000000..6213015
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.FsInput;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.stream.IntStream;
+
+public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+
+ private static final Logger logger = LoggerFactory.getLogger(AvroBatchReader.class);
+
+ private Path filePath;
+ private long endPosition;
+ private DataFileReader<GenericRecord> reader;
+ private ResultSetLoader loader;
+ private List<ColumnConverter> converters;
+ // re-use container instance
+ private GenericRecord record = null;
+
+ @Override
+ public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
+ FileSplit split = negotiator.split();
+ filePath = split.getPath();
+
+    // Avro files are splittable; define the reading start / end positions
+ long startPosition = split.getStart();
+ endPosition = startPosition + split.getLength();
+
+ logger.debug("Processing Avro file: {}, start position: {}, end position: {}",
+ filePath, startPosition, endPosition);
+
+ reader = prepareReader(split, negotiator.fileSystem(),
+ negotiator.userName(), negotiator.context().getFragmentContext().getQueryUserName());
+
+ logger.debug("Avro file schema: {}", reader.getSchema());
+ TupleMetadata schema = AvroSchemaUtil.convert(reader.getSchema());
+ logger.debug("Avro file converted schema: {}", schema);
+ negotiator.setTableSchema(schema, true);
+ loader = negotiator.build();
+ converters = ColumnConvertersUtil.initConverters(schema, loader.writer());
+
+ return true;
+ }
+
+ @Override
+ public boolean next() {
+ RowSetLoader rowWriter = loader.writer();
+ while (!rowWriter.isFull()) {
+ if (!nextLine(rowWriter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (reader == null) {
+ return;
+ }
+
+ try {
+ reader.close();
+ } catch (IOException e) {
+ logger.warn("Error closing Avro reader: {}", e.getMessage(), e);
+ } finally {
+ reader = null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ long currentPosition = -1L;
+ try {
+ if (reader != null) {
+ currentPosition = reader.tell();
+ }
+ } catch (IOException e) {
+ logger.trace("Unable to obtain Avro reader position: {}", e.getMessage(), e);
+ }
+ return "AvroBatchReader[File=" + filePath
+ + ", Position=" + currentPosition
+ + "]";
+ }
+
+ /**
+   * Initializes the Avro data reader based on the given file system and file path.
+   * Moves the reader to the sync point from which to start reading the data.
+ *
+ * @param fileSplit file split
+ * @param fs file system
+   * @param opUserName name of the user to impersonate while reading the data
+ * @param queryUserName name of the user who issues the query
+ * @return Avro file reader
+ */
+ private DataFileReader<GenericRecord> prepareReader(FileSplit fileSplit, FileSystem fs, String opUserName, String queryUserName) {
+ try {
+ UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+ DataFileReader<GenericRecord> reader = ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericRecord>>) () ->
+ new DataFileReader<>(new FsInput(fileSplit.getPath(), fs.getConf()), new GenericDatumReader<GenericRecord>()));
+
+ // move to sync point from where to read the file
+ reader.sync(fileSplit.getStart());
+ return reader;
+ } catch (IOException | InterruptedException e) {
+ throw UserException.dataReadError(e)
+ .message("Error preparing Avro reader")
+ .addContext("Reader", this)
+ .build(logger);
+ }
+ }
+
+ private boolean nextLine(RowSetLoader rowWriter) {
+ try {
+ if (!reader.hasNext() || reader.pastSync(endPosition)) {
+ return false;
+ }
+ record = reader.next(record);
+ } catch (IOException e) {
+ throw UserException.dataReadError(e)
+ .addContext("Reader", this)
+ .build(logger);
+ }
+
+ Schema schema = record.getSchema();
+
+ if (Schema.Type.RECORD != schema.getType()) {
+ throw UserException.dataReadError()
+ .message("Root object must be record type. Found: %s", schema.getType())
+ .addContext("Reader", this)
+ .build(logger);
+ }
+
+ rowWriter.start();
+ IntStream.range(0, rowWriter.size())
+ .forEach(i -> converters.get(i).convert(record.get(i)));
+ rowWriter.save();
+
+ return true;
+ }
+}
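
The reader above leans on Avro's sync-marker API to honor block-split boundaries. A standalone sketch of that pattern using only public Avro classes (the path and split bounds are illustrative; run inside a method that throws IOException):

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.FsInput;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    long start = 0L;
    long end = 128L * 1024 * 1024; // split bounds would come from the FileSplit
    DataFileReader<GenericRecord> reader = new DataFileReader<>(
        new FsInput(new Path("/data/part-0.avro"), new Configuration()),
        new GenericDatumReader<>());
    reader.sync(start); // jump to the first sync point at or after the split start
    GenericRecord record = null;
    while (reader.hasNext() && !reader.pastSync(end)) {
      record = reader.next(record); // reuse the record instance, as AvroBatchReader does
      // ... convert record fields into Drill vectors ...
    }
    reader.close();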
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
deleted file mode 100644
index 10d90d4..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericContainer;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.ExtendableRelDataType;
-import org.apache.drill.exec.planner.types.ExtendableRelDataTypeHolder;
-import org.apache.drill.exec.store.ColumnExplorer;
-import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
-public class AvroDrillTable extends DrillTable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroDrillTable.class);
-
- private final DataFileReader<GenericContainer> reader;
- private final SchemaConfig schemaConfig;
- private ExtendableRelDataTypeHolder holder;
-
- public AvroDrillTable(String storageEngineName,
- FileSystemPlugin plugin,
- SchemaConfig schemaConfig,
- FormatSelection selection) {
- super(storageEngineName, plugin, schemaConfig.getUserName(), selection);
- List<Path> asFiles = selection.getAsFiles();
- Path path = asFiles.get(0);
- this.schemaConfig = schemaConfig;
- try {
- reader = new DataFileReader<>(new FsInput(path, plugin.getFsConf()), new GenericDatumReader<GenericContainer>());
- } catch (IOException e) {
- throw UserException.dataReadError(e).build(logger);
- }
- }
-
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- // ExtendableRelDataTypeHolder is reused to preserve previously added implicit columns
- if (holder == null) {
- List<RelDataType> typeList = Lists.newArrayList();
- List<String> fieldNameList = Lists.newArrayList();
-
- // adds partition columns to RowType since they always present in star queries
- List<String> partitions =
- ColumnExplorer.getPartitionColumnNames(((FormatSelection) getSelection()).getSelection(), schemaConfig);
- for (String partitionName : partitions) {
- fieldNameList.add(partitionName);
- typeList.add(typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true));
- }
-
- // adds non-partition table columns to RowType
- Schema schema = reader.getSchema();
- for (Field field : schema.getFields()) {
- fieldNameList.add(field.name());
- typeList.add(getNullableRelDataTypeFromAvroType(typeFactory, field.schema()));
- }
-
- holder = new ExtendableRelDataTypeHolder(
- typeFactory.createStructType(typeList, fieldNameList).getFieldList(),
- ColumnExplorer.getImplicitColumnsNames(schemaConfig));
- }
-
- return new ExtendableRelDataType(holder, typeFactory);
- }
-
- private RelDataType getNullableRelDataTypeFromAvroType(
- RelDataTypeFactory typeFactory, Schema fieldSchema) {
- LogicalType logicalType = fieldSchema.getLogicalType();
- String logicalTypeName = logicalType != null ? logicalType.getName() : "";
- RelDataType relDataType = null;
- switch (fieldSchema.getType()) {
- case ARRAY:
- RelDataType eleType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getElementType());
- relDataType = typeFactory.createArrayType(eleType, -1);
- break;
- case BOOLEAN:
- relDataType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
- break;
- case BYTES:
- switch (logicalTypeName) {
- case "decimal":
- relDataType = typeFactory.createSqlType(SqlTypeName.DECIMAL);
- break;
- default:
- relDataType = typeFactory.createSqlType(SqlTypeName.BINARY);
- }
- break;
- case DOUBLE:
- relDataType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
- break;
- case FIXED:
- switch (logicalTypeName) {
- case "decimal":
- relDataType = typeFactory.createSqlType(SqlTypeName.DECIMAL);
- break;
- default:
- logger.error("{} type not supported", fieldSchema.getType());
- throw UserException.unsupportedError().message("FIXED type not supported yet").build(logger);
- }
- break;
- case FLOAT:
- relDataType = typeFactory.createSqlType(SqlTypeName.FLOAT);
- break;
- case INT:
- switch (logicalTypeName) {
- case "date":
- relDataType = typeFactory.createSqlType(SqlTypeName.DATE);
- break;
- case "time-millis":
- relDataType = typeFactory.createSqlType(SqlTypeName.TIME);
- break;
- default:
- relDataType = typeFactory.createSqlType(SqlTypeName.INTEGER);
- }
- break;
- case LONG:
- switch (logicalTypeName) {
- case "date":
- relDataType = typeFactory.createSqlType(SqlTypeName.DATE);
- break;
- case "time-micros":
- relDataType = typeFactory.createSqlType(SqlTypeName.TIME);
- break;
- case "timestamp-millis":
- case "timestamp-micros":
- relDataType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
- break;
- default:
- relDataType = typeFactory.createSqlType(SqlTypeName.BIGINT);
- }
- break;
- case MAP:
- RelDataType valueType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getValueType());
- RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
- relDataType = typeFactory.createMapType(keyType, valueType);
- break;
- case NULL:
- relDataType = typeFactory.createSqlType(SqlTypeName.NULL);
- break;
- case RECORD:
-// List<String> fieldNameList = Lists.newArrayList();
-// List<RelDataType> fieldRelDataTypeList = Lists.newArrayList();
-// for(Field field : fieldSchema.getFields()) {
-// fieldNameList.add(field.name());
-// fieldRelDataTypeList.add(getNullableRelDataTypeFromAvroType(typeFactory, field.schema()));
-// }
-// relDataType = typeFactory.createStructType(fieldRelDataTypeList, fieldNameList);
-
- //TODO This has to be mapped to struct type but because of calcite issue,
- //for now mapping it to map type.
- keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
- valueType = typeFactory.createSqlType(SqlTypeName.ANY);
- relDataType = typeFactory.createMapType(keyType, valueType);
- break;
- case ENUM:
- case STRING:
- relDataType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
- break;
- case UNION:
- RelDataType optinalType = getNullableRelDataTypeFromAvroType(typeFactory, fieldSchema.getTypes().get(1));
- relDataType = typeFactory.createTypeWithNullability(optinalType, true);
- break;
- }
- return relDataType;
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
index 2551ba5..6e8a9a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatConfig.java
@@ -17,23 +17,38 @@
*/
package org.apache.drill.exec.store.avro;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.logical.FormatPluginConfig;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
/**
* Format plugin config for Avro data files.
*/
-@JsonTypeName("avro")
+@JsonTypeName(AvroFormatPlugin.DEFAULT_NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class AvroFormatConfig implements FormatPluginConfig {
+ public List<String> extensions = Collections.singletonList("avro");
+
@Override
public int hashCode() {
- return 101; // XXX - WHAT IS THIS SUPPOSED TO BE?
+ return Objects.hash(extensions);
}
@Override
- public boolean equals(Object obj) {
- return obj instanceof AvroFormatConfig;
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AvroFormatConfig that = (AvroFormatConfig) o;
+ return Objects.equals(extensions, that.extensions);
}
}
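
With the value-based equals() and hashCode() above, two configs compare equal exactly when their extensions match, which matters wherever format configs are compared, e.g. during plugin registration. A quick sketch (assertions for illustration only):

    AvroFormatConfig a = new AvroFormatConfig();
    AvroFormatConfig b = new AvroFormatConfig();
    assert a.equals(b) && a.hashCode() == b.hashCode(); // same default extensions
    b.extensions = Collections.singletonList("avr"); // hypothetical alternative extension
    assert !a.equals(b); // identity now follows the configured extensions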
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index dc2046a..952fb1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -17,117 +17,62 @@
*/
package org.apache.drill.exec.store.avro;
-import java.io.IOException;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
-import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.FormatMatcher;
-import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.conf.Configuration;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
/**
* Format plugin for Avro data files.
*/
public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
- private final AvroFormatMatcher matcher;
+ public static final String DEFAULT_NAME = "avro";
- public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storagePluginConfig) {
- this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
+ public AvroFormatPlugin(String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ StoragePluginConfig storageConfig,
+ AvroFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
}
- public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
- super(name, context, fsConf, config, formatPluginConfig, true, false, true, false, Lists.newArrayList("avro"), "avro");
- this.matcher = new AvroFormatMatcher(this);
+ private static EasyFormatConfig easyConfig(Configuration fsConf, AvroFormatConfig formatConfig) {
+ EasyFormatConfig config = new EasyFormatConfig();
+ config.readable = true;
+ config.writable = false;
+ config.blockSplittable = true;
+ config.compressible = false;
+ config.supportsProjectPushdown = true;
+ config.extensions = formatConfig.extensions;
+ config.fsConf = fsConf;
+ config.defaultName = DEFAULT_NAME;
+ config.readerOperatorType = CoreOperatorType.AVRO_SUB_SCAN_VALUE;
+ config.useEnhancedScan = true;
+ return config;
}
@Override
- public boolean supportsPushDown() {
- return true;
+ protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+ FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
+ builder.setReaderFactory(new AvroReaderFactory());
+ initScanBuilder(builder, scan);
+ builder.setNullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ return builder;
}
- @Override
- public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) throws ExecutionSetupException {
- return new AvroRecordReader(context, fileWork.getPath(), fileWork.getStart(), fileWork.getLength(), dfs, columns,
- userName);
- }
-
- @Override
- public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- @Override
- public int getReaderOperatorType() {
- return CoreOperatorType.AVRO_SUB_SCAN_VALUE;
- }
-
- @Override
- public int getWriterOperatorType() {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- @Override
- public FormatMatcher getMatcher() {
- return this.matcher;
- }
-
- @Override
- public boolean supportsStatistics() {
- return false;
- }
-
- @Override
- public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- @Override
- public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- private static class AvroFormatMatcher extends BasicFormatMatcher {
-
- public AvroFormatMatcher(AvroFormatPlugin plugin) {
- super(plugin, ImmutableList.of(Pattern.compile(".*\\.avro$")), ImmutableList.<MagicString>of());
- }
+ private static class AvroReaderFactory extends FileScanFramework.FileReaderFactory {
@Override
- public DrillTable isReadable(DrillFileSystem fs,
- FileSelection selection, FileSystemPlugin fsPlugin,
- String storageEngineName, SchemaConfig schemaConfig) throws IOException {
- if (isFileReadable(fs, selection.getFirstPath(fs))) {
- return new AvroDrillTable(storageEngineName, fsPlugin, schemaConfig,
- new FormatSelection(plugin.getConfig(), selection));
- }
- return null;
+ public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
+ return new AvroBatchReader();
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
deleted file mode 100644
index 34a035c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericContainer;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.util.Utf8;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.vector.complex.DictVector;
-import org.apache.drill.exec.vector.complex.fn.FieldSelection;
-import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-
-import io.netty.buffer.DrillBuf;
-import org.joda.time.DateTimeConstants;
-
-/**
- * A RecordReader implementation for Avro data files.
- *
- * @see org.apache.drill.exec.store.RecordReader
- */
-public class AvroRecordReader extends AbstractRecordReader {
-
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroRecordReader.class);
-
- private final Path hadoop;
- private final long start;
- private final long end;
- private final FieldSelection fieldSelection;
- private final OptionManager optionManager;
- private DrillBuf buffer;
- private VectorContainerWriter writer;
-
- private DataFileReader<GenericContainer> reader = null;
- private FileSystem fs;
-
- private final String opUserName;
- private final String queryUserName;
-
- private static final int DEFAULT_BATCH_SIZE = 4096;
-
-
- public AvroRecordReader(final FragmentContext fragmentContext,
- final Path inputPath,
- final long start,
- final long length,
- final FileSystem fileSystem,
- final List<SchemaPath> projectedColumns,
- final String userName) {
- hadoop = inputPath;
- this.start = start;
- this.end = start + length;
- buffer = fragmentContext.getManagedBuffer();
- this.fs = fileSystem;
- this.opUserName = userName;
- this.queryUserName = fragmentContext.getQueryUserName();
- setColumns(projectedColumns);
- this.fieldSelection = FieldSelection.getFieldSelection(projectedColumns);
- optionManager = fragmentContext.getOptions();
- }
-
- private DataFileReader<GenericContainer> getReader(final Path hadoop, final FileSystem fs) throws ExecutionSetupException {
- try {
- final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
- return ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericContainer>>) () ->
- new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>()));
- } catch (IOException | InterruptedException e) {
- throw new ExecutionSetupException(
- String.format("Error in creating avro reader for file: %s", hadoop), e);
- }
- }
-
- @Override
- public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
- writer = new VectorContainerWriter(output);
-
- try {
- reader = getReader(hadoop, fs);
- logger.debug("Processing file : {}, start position : {}, end position : {} ", hadoop, start, end);
- reader.sync(this.start);
- } catch (IOException e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- @Override
- public int next() {
- final Stopwatch watch = Stopwatch.createStarted();
-
- if (reader == null) {
- throw new IllegalStateException("Avro reader is not open.");
- }
- if (!reader.hasNext()) {
- return 0;
- }
-
- int recordCount = 0;
- writer.allocate();
- writer.reset();
-
- try {
- for (GenericContainer container = null;
- recordCount < DEFAULT_BATCH_SIZE && reader.hasNext() && !reader.pastSync(end);
- recordCount++) {
- writer.setPosition(recordCount);
- container = reader.next(container);
- processRecord(container, container.getSchema());
- }
-
- writer.setValueCount(recordCount);
-
- } catch (IOException e) {
- throw new DrillRuntimeException(e);
- }
-
- logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
- return recordCount;
- }
-
- private void processRecord(final GenericContainer container, final Schema schema) {
-
- final Schema.Type type = schema.getType();
-
- switch (type) {
- case RECORD:
- process(container, schema, null, new MapOrListWriterImpl(writer.rootAsMap()), fieldSelection);
- break;
- default:
- throw new DrillRuntimeException("Root object must be record type. Found: " + type);
- }
- }
-
- private void process(final Object value, final Schema schema, final String fieldName, MapOrListWriterImpl writer, FieldSelection fieldSelection) {
- if (value == null) {
- return;
- }
- final Schema.Type type = schema.getType();
-
- switch (type) {
- case RECORD:
- // list field of MapOrListWriter will be non null when we want to store array of maps/records.
- MapOrListWriterImpl _writer = writer;
-
- for (final Schema.Field field : schema.getFields()) {
- if (field.schema().getType() == Schema.Type.RECORD ||
- (field.schema().getType() == Schema.Type.UNION &&
- field.schema().getTypes().get(0).getType() == Schema.Type.NULL &&
- field.schema().getTypes().get(1).getType() == Schema.Type.RECORD)) {
- _writer = (MapOrListWriterImpl) writer.map(field.name());
- }
-
- process(((GenericRecord) value).get(field.name()), field.schema(), field.name(), _writer, fieldSelection.getChild(field.name()));
-
- }
- break;
- case ARRAY:
- assert fieldName != null;
- final GenericArray<?> array = (GenericArray<?>) value;
- Schema elementSchema = array.getSchema().getElementType();
- Type elementType = elementSchema.getType();
- if (elementType == Schema.Type.RECORD) {
- writer = (MapOrListWriterImpl) writer.list(fieldName).listoftmap(fieldName);
- } else if (elementType == Schema.Type.MAP) {
- writer = (MapOrListWriterImpl) writer.list(fieldName);
- writer.listOfDict();
- } else {
- writer = (MapOrListWriterImpl) writer.list(fieldName);
- }
- for (final Object o : array) {
- writer.start();
- process(o, elementSchema, fieldName, writer, fieldSelection.getChild(fieldName));
- writer.end();
- }
- break;
- case UNION:
- // currently supporting only nullable union (optional fields) like ["null", "some-type"].
- if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
- throw new UnsupportedOperationException("Avro union type must be of the format : [\"null\", \"some-type\"]");
- }
- process(value, schema.getTypes().get(1), fieldName, writer, fieldSelection);
- break;
- case MAP:
- @SuppressWarnings("unchecked")
- Map<Object, Object> map = (Map<Object, Object>) value;
- // key type in Avro MAP is assumed to be string
- Schema keySchema = Schema.create(Type.STRING);
- Schema valueSchema = schema.getValueType();
-
- writer = (MapOrListWriterImpl) writer.dict(fieldName);
- BaseWriter.DictWriter dictWriter = (BaseWriter.DictWriter) writer.map;
-
- dictWriter.start();
- for (Entry<Object, Object> entry : map.entrySet()) {
- dictWriter.startKeyValuePair();
- processPrimitive(entry.getKey(), keySchema, DictVector.FIELD_KEY_NAME, writer);
- process(entry.getValue(), valueSchema, DictVector.FIELD_VALUE_NAME, writer, FieldSelection.ALL_VALID);
- dictWriter.endKeyValuePair();
- }
- dictWriter.end();
- break;
- case FIXED:
- case ENUM: // Enum symbols are strings
- case NULL: // Treat null type as a primitive
- default:
- assert fieldName != null;
-
- if (writer.isMapWriter()) {
- if (fieldSelection.isNeverValid()) {
- break;
- }
- }
-
- processPrimitive(value, schema, fieldName, writer);
- break;
- }
-
- }
-
- private void processPrimitive(final Object value, final Schema schema, final String fieldName,
- final MapOrListWriterImpl writer) {
- LogicalType logicalType = schema.getLogicalType();
- String logicalTypeName = logicalType != null ? logicalType.getName() : "";
-
- if (value == null) {
- return;
- }
-
- switch (schema.getType()) {
- case STRING:
- byte[] binary;
- final int length;
- if (value instanceof Utf8) {
- binary = ((Utf8) value).getBytes();
- length = ((Utf8) value).getByteLength();
- } else {
- binary = value.toString().getBytes(Charsets.UTF_8);
- length = binary.length;
- }
- ensure(length);
- buffer.setBytes(0, binary);
- writer.varChar(fieldName).writeVarChar(0, length, buffer);
- break;
- case INT:
- switch (logicalTypeName) {
- case "date":
- writer.date(fieldName).writeDate((int) value * (long) DateTimeConstants.MILLIS_PER_DAY);
- break;
- case "time-millis":
- writer.time(fieldName).writeTime((Integer) value);
- break;
- default:
- writer.integer(fieldName).writeInt((Integer) value);
- }
- break;
- case LONG:
- switch (logicalTypeName) {
- case "date":
- writer.date(fieldName).writeDate((Long) value);
- break;
- case "time-micros":
- writer.time(fieldName).writeTime((int) ((long) value / 1000));
- break;
- case "timestamp-millis":
- writer.timeStamp(fieldName).writeTimeStamp((Long) value);
- break;
- case "timestamp-micros":
- writer.timeStamp(fieldName).writeTimeStamp((long) value / 1000);
- break;
- default:
- writer.bigInt(fieldName).writeBigInt((Long) value);
- }
- break;
- case FLOAT:
- writer.float4(fieldName).writeFloat4((Float) value);
- break;
- case DOUBLE:
- writer.float8(fieldName).writeFloat8((Double) value);
- break;
- case BOOLEAN:
- writer.bit(fieldName).writeBit((Boolean) value ? 1 : 0);
- break;
- case BYTES:
- final ByteBuffer buf = (ByteBuffer) value;
- length = buf.remaining();
- ensure(length);
- buffer.setBytes(0, buf);
- switch (logicalTypeName) {
- case "decimal":
- ParquetReaderUtility.checkDecimalTypeEnabled(optionManager);
- LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
- writer.varDecimal(fieldName, decimalType.getPrecision(), decimalType.getScale())
- .writeVarDecimal(0, length, buffer, decimalType.getPrecision(), decimalType.getScale());
- break;
- default:
- writer.binary(fieldName).writeVarBinary(0, length, buffer);
- }
- break;
- case NULL:
- // Nothing to do for null type
- break;
- case ENUM:
- final String symbol = value.toString();
- final byte[] b;
- try {
- b = symbol.getBytes(Charsets.UTF_8.name());
- } catch (UnsupportedEncodingException e) {
- throw new DrillRuntimeException("Unable to read enum value for field: " + fieldName, e);
- }
- ensure(b.length);
- buffer.setBytes(0, b);
- writer.varChar(fieldName).writeVarChar(0, b.length, buffer);
- break;
- case FIXED:
- GenericFixed genericFixed = (GenericFixed) value;
- switch (logicalTypeName) {
- case "decimal":
- ParquetReaderUtility.checkDecimalTypeEnabled(optionManager);
- LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
- writer.varDecimal(fieldName, decimalType.getPrecision(), decimalType.getScale())
- .writeVarDecimal(new BigDecimal(new BigInteger(genericFixed.bytes()), decimalType.getScale()));
- break;
- default:
- throw new UnsupportedOperationException("Unimplemented type: " + schema.getType().toString());
- }
- break;
- default:
- throw new DrillRuntimeException("Unhandled Avro type: " + schema.getType().toString());
- }
- }
-
- private boolean selected(SchemaPath field) {
- if (isStarQuery()) {
- return true;
- }
- for (final SchemaPath sp : getColumns()) {
- if (sp.contains(field)) {
- return true;
- }
- }
- return false;
- }
-
- private void ensure(final int length) {
- buffer = buffer.reallocIfNeeded(length);
- }
-
- @Override
- public void close() {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- logger.warn("Error closing Avro reader", e);
- } finally {
- reader = null;
- }
- }
- }
-
- @Override
- public String toString() {
- long currentPosition = -1L;
- try {
- if (reader != null) {
- currentPosition = reader.tell();
- }
- } catch (IOException e) {
- logger.trace("Unable to obtain reader position.", e);
- }
- return "AvroRecordReader[File=" + hadoop
- + ", Position=" + currentPosition
- + "]";
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSchemaUtil.java
new file mode 100644
index 0000000..53f08d9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroSchemaUtil.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.DictBuilder;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.RepeatedListBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class that provides methods to interact with Avro schema.
+ */
+public class AvroSchemaUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(AvroSchemaUtil.class);
+
+ public static final String AVRO_LOGICAL_TYPE_PROPERTY = "avro_logical_type";
+
+ public static final String DECIMAL_LOGICAL_TYPE = "decimal";
+ public static final String TIMESTAMP_MICROS_LOGICAL_TYPE = "timestamp-micros";
+ public static final String TIMESTAMP_MILLIS_LOGICAL_TYPE = "timestamp-millis";
+ public static final String DATE_LOGICAL_TYPE = "date";
+ public static final String TIME_MICROS_LOGICAL_TYPE = "time-micros";
+ public static final String TIME_MILLIS_LOGICAL_TYPE = "time-millis";
+ public static final String DURATION_LOGICAL_TYPE = "duration";
+
+ /**
+ * Converts Avro schema into Drill metadata description of the schema.
+ *
+ * @param schema Avro schema
+ * @return metadata description of the schema
+ * @throws UserException if schema contains unsupported types
+ */
+ public static TupleMetadata convert(Schema schema) {
+ return SchemaConverter.INSTANCE.convert(schema);
+ }
+
+ /**
+   * Avro represents a nullable type as a union of null and another schema: ["null", "some-type"].
+   * This method extracts the non-nullable schema from the given union schema.
+ *
+ * @param schema Avro schema
+ * @param columnName column name
+ * @return non-nullable Avro schema
+ * @throws UserException if given schema is not a union or represents complex union
+ */
+ public static Schema extractSchemaFromNullable(Schema schema, String columnName) {
+ if (!schema.isUnion()) {
+ throw UserException.validationError()
+ .message("Expected union type, but received: %s", schema.getType())
+ .addContext("Column", columnName)
+ .build(logger);
+ }
+ List<Schema> unionSchemas = schema.getTypes();
+
+ // exclude all schemas with null type
+ List<Schema> nonNullSchemas = unionSchemas.stream()
+ .filter(unionSchema -> !Schema.Type.NULL.equals(unionSchema.getType()))
+ .collect(Collectors.toList());
+
+    // if the original schema has two elements and only one non-null schema, this is a simple nullable type
+ if (unionSchemas.size() == 2 && nonNullSchemas.size() == 1) {
+ return nonNullSchemas.get(0);
+ } else {
+ return throwUnsupportedErrorForType("complex union", columnName);
+ }
+ }
+
+ private static <T> T throwUnsupportedErrorForType(String type, String columnName) {
+ throw UserException.unsupportedError()
+ .message("'%s' type is not supported", type)
+ .addContext("Column", columnName)
+ .build(logger);
+ }
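
A minimal usage sketch of the nullable-union contract above; it assumes only that
Avro's SchemaBuilder and the AvroSchemaUtil class from this patch are on the
classpath, and the column name is illustrative:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.drill.exec.store.avro.AvroSchemaUtil;

    public class NullableUnionSketch {
      public static void main(String[] args) {
        // ["null", "long"] is Avro's encoding of a nullable long
        Schema nullableLong = SchemaBuilder.unionOf().nullType().and().longType().endUnion();
        // extract the non-null branch; a union with several non-null branches
        // would instead fail with the "complex union" UserException
        Schema nonNull = AvroSchemaUtil.extractSchemaFromNullable(nullableLong, "c_long");
        System.out.println(nonNull.getType()); // LONG
      }
    }
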
+
+ /**
+   * Converts an Avro schema into a Drill metadata description of the schema.
+   * Holds no state and is therefore thread-safe.
+ */
+ private static class SchemaConverter {
+
+ private static final SchemaConverter INSTANCE = new SchemaConverter();
+
+ TupleMetadata convert(Schema schema) {
+ /*
+      Avro allows referencing types by name; sometimes the reference points to the type still under construction.
+ For example, a linked-list of 64-bit values:
+ {
+ "type": "record",
+ "name": "LongList",
+ "fields" : [
+ {"name": "value", "type": "long"}, // each element has a long
+ {"name": "next", "type": ["null", "LongList"]} // optional next element
+ ]
+ }
+
+      Since we cannot build a record type which is not yet constructed, when such a situation is detected,
+      the record type is set to a Drill Map without columns; the columns will be detected when reading the actual data.
+
+      `typeNamesUnderConstruction` holds the names of record types under construction so that references
+      to not-yet-constructed types can be detected.
+ */
+ Set<String> typeNamesUnderConstruction = new HashSet<>();
+ TupleSchema tupleSchema = new TupleSchema();
+
+ // add current record type to the set of types under construction
+ typeNamesUnderConstruction.add(schema.getFullName());
+
+ List<Schema.Field> fields = schema.getFields();
+ fields.stream()
+ .map(field -> convert(field, typeNamesUnderConstruction))
+ .forEach(tupleSchema::add);
+ return tupleSchema;
+ }
+
+ private ColumnMetadata convert(Schema.Field field, Set<String> typeNamesUnderConstruction) {
+ Schema fieldSchema = field.schema();
+ return defineColumn(field.name(), fieldSchema, TypeProtos.DataMode.REQUIRED, typeNamesUnderConstruction);
+ }
+
+ private ColumnMetadata defineColumn(String name, Schema fieldSchema,
+ TypeProtos.DataMode mode,
+ Set<String> typeNamesUnderConstruction) {
+ String logicalTypeName = getLogicalTypeName(fieldSchema);
+ switch (fieldSchema.getType()) {
+ case INT:
+ switch (logicalTypeName) {
+ case DATE_LOGICAL_TYPE:
+ return initField(name, TypeProtos.MinorType.DATE, mode);
+ case TIME_MILLIS_LOGICAL_TYPE:
+ return initField(name, TypeProtos.MinorType.TIME, mode);
+ default:
+ return initField(name, TypeProtos.MinorType.INT, mode);
+ }
+ case LONG:
+ switch (logicalTypeName) {
+ case TIMESTAMP_MICROS_LOGICAL_TYPE:
+ case TIMESTAMP_MILLIS_LOGICAL_TYPE:
+ ColumnMetadata timestampColumn = initField(name, TypeProtos.MinorType.TIMESTAMP, mode);
+ // add avro logical type property to know how to convert timestamp value
+ timestampColumn.setProperty(AVRO_LOGICAL_TYPE_PROPERTY, logicalTypeName);
+ return timestampColumn;
+ case TIME_MICROS_LOGICAL_TYPE:
+ return initField(name, TypeProtos.MinorType.TIME, mode);
+ default:
+ return initField(name, TypeProtos.MinorType.BIGINT, mode);
+ }
+ case FLOAT:
+ return initField(name, TypeProtos.MinorType.FLOAT4, mode);
+ case DOUBLE:
+ return initField(name, TypeProtos.MinorType.FLOAT8, mode);
+ case FIXED:
+ if (DURATION_LOGICAL_TYPE.equals(logicalTypeName)) {
+ return initField(name, TypeProtos.MinorType.INTERVAL, mode);
+ }
+ // fall through
+ case BYTES:
+ if (DECIMAL_LOGICAL_TYPE.equals(logicalTypeName)) {
+ LogicalTypes.Decimal decimalLogicalType = (LogicalTypes.Decimal) fieldSchema.getLogicalType();
+ TypeProtos.MajorType majorType = Types.withPrecisionAndScale(TypeProtos.MinorType.VARDECIMAL,
+ mode, decimalLogicalType.getPrecision(), decimalLogicalType.getScale());
+ return initField(name, majorType);
+ } else {
+ return initField(name, TypeProtos.MinorType.VARBINARY, mode);
+ }
+ case BOOLEAN:
+ return initField(name, TypeProtos.MinorType.BIT, mode);
+ case ENUM:
+ case STRING:
+ return initField(name, TypeProtos.MinorType.VARCHAR, mode);
+ case NULL:
+ return initField(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+ case UNION:
+ Schema schema = extractSchemaFromNullable(fieldSchema, name);
+ TypeProtos.DataMode nullableMode =
+ TypeProtos.DataMode.REPEATED == mode ? TypeProtos.DataMode.REPEATED : TypeProtos.DataMode.OPTIONAL;
+ return defineColumn(name, schema, nullableMode, typeNamesUnderConstruction);
+ case RECORD:
+ MapBuilder recordBuilder = new MapBuilder(null, name, mode);
+ String typeName = fieldSchema.getFullName();
+          // if the type name is not under construction, proceed with adding columns
+ // otherwise leave type as Drill Map without columns
+ if (typeNamesUnderConstruction.add(typeName)) {
+ fieldSchema.getFields().stream()
+ .map(field -> convert(field, typeNamesUnderConstruction))
+ .forEach(recordBuilder::addColumn);
+ // remove record type name since it was constructed
+ typeNamesUnderConstruction.remove(typeName);
+ }
+ return recordBuilder.buildColumn();
+ case ARRAY:
+ Schema elementSchema = fieldSchema.getElementType();
+ boolean hasNestedArray = elementSchema.isUnion()
+ ? Schema.Type.ARRAY == extractSchemaFromNullable(elementSchema, name).getType()
+ : Schema.Type.ARRAY == elementSchema.getType();
+
+ if (hasNestedArray) {
+ RepeatedListBuilder builder = new RepeatedListBuilder(null, name);
+ builder.addColumn(defineColumn(name, elementSchema, TypeProtos.DataMode.REQUIRED, typeNamesUnderConstruction));
+ return builder.buildColumn();
+ }
+ return defineColumn(name, elementSchema, TypeProtos.DataMode.REPEATED, typeNamesUnderConstruction);
+ case MAP:
+ DictBuilder dictBuilder = new DictBuilder(null, name, mode);
+          // per the Avro specification, map keys are always strings (varchar in Drill)
+ dictBuilder.key(TypeProtos.MinorType.VARCHAR);
+ Schema valueSchema = fieldSchema.getValueType();
+ ColumnMetadata valueColumn = defineColumn(DictVector.FIELD_VALUE_NAME, valueSchema,
+ TypeProtos.DataMode.REQUIRED, typeNamesUnderConstruction);
+ dictBuilder.addColumn(valueColumn);
+ return dictBuilder.buildColumn();
+ default:
+ return throwUnsupportedErrorForType(fieldSchema.getType().getName(), name);
+ }
+ }
+
+ /**
+     * Identifies the logical type name for the column, if present.
+     * A column schema can have its logical type set as an object or through a property.
+ *
+ * @param schema Avro column schema
+ * @return logical type name if present, empty string otherwise
+ */
+ private String getLogicalTypeName(Schema schema) {
+ String name = schema.getLogicalType() != null
+ ? schema.getLogicalType().getName()
+ : schema.getProp(LogicalType.LOGICAL_TYPE_PROP);
+ return name == null ? "" : name;
+ }
+
+ private ColumnMetadata initField(String name, TypeProtos.MinorType minorType, TypeProtos.DataMode mode) {
+ TypeProtos.MajorType majorType = Types.withMode(minorType, mode);
+ return initField(name, majorType);
+ }
+
+ private ColumnMetadata initField(String name, TypeProtos.MajorType majorType) {
+ MaterializedField materializedField = MaterializedField.create(name, majorType);
+ return MetadataUtils.fromField(materializedField);
+ }
+ }
+}
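
To make the handling of under-construction types concrete, a sketch (again assuming
the classes from this patch are on the classpath) that converts the self-referencing
LongList schema from the comment in SchemaConverter.convert():

    import org.apache.avro.Schema;
    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.exec.store.avro.AvroSchemaUtil;

    public class RecursiveSchemaSketch {
      public static void main(String[] args) {
        // the LongList schema from the SchemaConverter comment, parsed verbatim
        Schema longList = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"LongList\",\"fields\":["
            + "{\"name\":\"value\",\"type\":\"long\"},"
            + "{\"name\":\"next\",\"type\":[\"null\",\"LongList\"]}]}");
        TupleMetadata metadata = AvroSchemaUtil.convert(longList);
        // "next" references a type still under construction, so it becomes a
        // Drill Map with no declared columns; they are discovered from the data
        System.out.println(metadata.metadata("next").isMap());              // true
        System.out.println(metadata.metadata("next").tupleSchema().size()); // 0
      }
    }
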
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
deleted file mode 100644
index aee5a1e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroTypeHelper.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.Types;
-
-/**
- * Utility class for working with Avro data types.
- */
-public final class AvroTypeHelper {
-
- // XXX - Decide what to do about Avro's NULL type
- /*
- public static final MajorType MAJOR_TYPE_NULL_OPTIONAL = Types.optional(MinorType.NULL);
- public static final MajorType MAJOR_TYPE_NULL_REQUIRED = Types.required(MinorType.NULL);
- public static final MajorType MAJOR_TYPE_NULL_REPEATED = Types.repeated(MinorType.NULL);
- */
- public static final MajorType MAJOR_TYPE_BOOL_OPTIONAL = Types.optional(MinorType.UINT1);
- public static final MajorType MAJOR_TYPE_BOOL_REQUIRED = Types.required(MinorType.UINT1);
- public static final MajorType MAJOR_TYPE_BOOL_REPEATED = Types.repeated(MinorType.UINT1);
- public static final MajorType MAJOR_TYPE_INT_OPTIONAL = Types.optional(MinorType.INT);
- public static final MajorType MAJOR_TYPE_INT_REQUIRED = Types.required(MinorType.INT);
- public static final MajorType MAJOR_TYPE_INT_REPEATED = Types.repeated(MinorType.INT);
- public static final MajorType MAJOR_TYPE_BIGINT_OPTIONAL = Types.optional(MinorType.BIGINT);
- public static final MajorType MAJOR_TYPE_BIGINT_REQUIRED = Types.required(MinorType.BIGINT);
- public static final MajorType MAJOR_TYPE_BIGINT_REPEATED = Types.repeated(MinorType.BIGINT);
- public static final MajorType MAJOR_TYPE_FLOAT4_OPTIONAL = Types.optional(MinorType.FLOAT4);
- public static final MajorType MAJOR_TYPE_FLOAT4_REQUIRED = Types.required(MinorType.FLOAT4);
- public static final MajorType MAJOR_TYPE_FLOAT4_REPEATED = Types.repeated(MinorType.FLOAT4);
- public static final MajorType MAJOR_TYPE_FLOAT8_OPTIONAL = Types.optional(MinorType.FLOAT8);
- public static final MajorType MAJOR_TYPE_FLOAT8_REQUIRED = Types.required(MinorType.FLOAT8);
- public static final MajorType MAJOR_TYPE_FLOAT8_REPEATED = Types.repeated(MinorType.FLOAT8);
- public static final MajorType MAJOR_TYPE_VARBINARY_OPTIONAL = Types.optional(MinorType.VARBINARY);
- public static final MajorType MAJOR_TYPE_VARBINARY_REQUIRED = Types.required(MinorType.VARBINARY);
- public static final MajorType MAJOR_TYPE_VARBINARY_REPEATED = Types.repeated(MinorType.VARBINARY);
- public static final MajorType MAJOR_TYPE_VARCHAR_OPTIONAL = Types.optional(MinorType.VARCHAR);
- public static final MajorType MAJOR_TYPE_VARCHAR_REQUIRED = Types.required(MinorType.VARCHAR);
- public static final MajorType MAJOR_TYPE_VARCHAR_REPEATED = Types.repeated(MinorType.VARCHAR);
-
-
- private static final String UNSUPPORTED = "Unsupported type: %s [%s]";
-
- private AvroTypeHelper() { }
-
- /**
- * Maintains a mapping between Avro types and Drill types. Given an Avro data
- * type, this method will return the corresponding Drill field major type.
- *
- * @param field Avro field
- * @return Major type or null if no corresponding type
- */
- public static MajorType getFieldMajorType(final Field field, final DataMode mode) {
- return getFieldMajorType(field.schema().getType(), mode);
- }
-
- /**
- * Maintains a mapping between Avro types and Drill types. Given an Avro data
- * type, this method will return the corresponding Drill field major type.
- *
- * @param type Avro type
- * @param mode Data mode
- * @return Drill major type or null if no corresponding type
- */
- public static MajorType getFieldMajorType(final Type type, final DataMode mode) {
-
- switch (type) {
- case MAP:
- case RECORD:
- case ENUM:
- case UNION:
- throw new UnsupportedOperationException("Complex types are unimplemented");
- case NULL:
- /*
- switch (mode) {
- case OPTIONAL:
- return MAJOR_TYPE_NULL_OPTIONAL;
- case REQUIRED:
- return MAJOR_TYPE_NULL_REQUIRED;
- case REPEATED:
- return MAJOR_TYPE_NULL_REPEATED;
- }
- break;
- */
- throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
- case ARRAY:
- break;
- case BOOLEAN:
- switch (mode) {
- case OPTIONAL:
- return MAJOR_TYPE_BOOL_OPTIONAL;
- case REQUIRED:
- return MAJOR_TYPE_BOOL_REQUIRED;
- case REPEATED:
- return MAJOR_TYPE_BOOL_REPEATED;
- }
- break;
- case INT:
- switch (mode) {
- case OPTIONAL:
- return MAJOR_TYPE_INT_OPTIONAL;
- case REQUIRED:
- return MAJOR_TYPE_INT_REQUIRED;
- case REPEATED:
- return MAJOR_TYPE_INT_REPEATED;
- }
- break;
- case LONG:
- switch (mode) {
- case OPTIONAL:
- return MAJOR_TYPE_BIGINT_OPTIONAL;
- case REQUIRED:
- return MAJOR_TYPE_BIGINT_REQUIRED;
- case REPEATED:
- return MAJOR_TYPE_BIGINT_REPEATED;
- }
- break;
- case FLOAT:
- switch (mode) {
- case OPTIONAL:
- return MAJOR_TYPE_FLOAT4_OPTIONAL;
- case REQUIRED:
- return MAJOR_TYPE_FLOAT4_REQUIRED;
- case REPEATED:
- return MAJOR_TYPE_FLOAT4_REPEATED;
- }
- break;
- case DOUBLE:
- switch (mode) {
- case OPTIONAL:
- return MAJOR_TYPE_FLOAT8_OPTIONAL;
- case REQUIRED:
- return MAJOR_TYPE_FLOAT8_REQUIRED;
- case REPEATED:
- return MAJOR_TYPE_FLOAT8_REPEATED;
- }
- break;
- case BYTES:
- switch (mode) {
- case OPTIONAL:
- return MAJOR_TYPE_VARBINARY_OPTIONAL;
- case REQUIRED:
- return MAJOR_TYPE_VARBINARY_REQUIRED;
- case REPEATED:
- return MAJOR_TYPE_VARBINARY_REPEATED;
- }
- break;
- case STRING:
- switch (mode) {
- case OPTIONAL:
- return MAJOR_TYPE_VARCHAR_OPTIONAL;
- case REQUIRED:
- return MAJOR_TYPE_VARCHAR_REQUIRED;
- case REPEATED:
- return MAJOR_TYPE_VARCHAR_REPEATED;
- }
- break;
- default:
- throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
- }
-
- throw new UnsupportedOperationException(String.format(UNSUPPORTED, type.getName(), mode.name()));
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverter.java
new file mode 100644
index 0000000..1a63f7b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverter.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.joda.time.DateTimeConstants;
+import org.joda.time.Period;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+/**
+ * Converts and sets given value into the specific column writer.
+ */
+public interface ColumnConverter {
+
+ void convert(Object value);
+
+ /**
+   * Does nothing; used when a column is not projected, to avoid unnecessary
+   * column value conversions and writes.
+ */
+ class DummyColumnConverter implements ColumnConverter {
+
+ public static final DummyColumnConverter INSTANCE = new DummyColumnConverter();
+
+ @Override
+ public void convert(Object value) {
+ // do nothing
+ }
+ }
+
+ /**
+   * Converts and writes scalar values using the provided {@link #valueConverter},
+   * whose implementation depends on the scalar value type.
+ */
+ class ScalarColumnConverter implements ColumnConverter {
+
+ private final Consumer<Object> valueConverter;
+
+ public ScalarColumnConverter(Consumer<Object> valueConverter) {
+ this.valueConverter = valueConverter;
+ }
+
+ public static ScalarColumnConverter init(ScalarWriter writer) {
+ ColumnMetadata columnMetadata = writer.schema();
+ switch (columnMetadata.type()) {
+ case VARCHAR:
+ return new ScalarColumnConverter(value -> {
+ byte[] binary;
+ int length;
+ if (value instanceof Utf8) {
+ Utf8 utf8 = (Utf8) value;
+ binary = utf8.getBytes();
+ length = utf8.getByteLength();
+ } else {
+ binary = value.toString().getBytes(Charsets.UTF_8);
+ length = binary.length;
+ }
+ writer.setBytes(binary, length);
+ });
+ case VARBINARY:
+ return new ScalarColumnConverter(value -> {
+ if (value instanceof ByteBuffer) {
+ ByteBuffer buf = (ByteBuffer) value;
+ writer.setBytes(buf.array(), buf.remaining());
+ } else {
+ byte[] bytes = ((GenericFixed) value).bytes();
+ writer.setBytes(bytes, bytes.length);
+ }
+ });
+ case VARDECIMAL:
+ return new ScalarColumnConverter(value -> {
+ BigInteger bigInteger;
+ if (value instanceof ByteBuffer) {
+ ByteBuffer decBuf = (ByteBuffer) value;
+ bigInteger = new BigInteger(decBuf.array());
+ } else {
+ GenericFixed genericFixed = (GenericFixed) value;
+ bigInteger = new BigInteger(genericFixed.bytes());
+ }
+ BigDecimal decimalValue = new BigDecimal(bigInteger, writer.schema().scale());
+ writer.setDecimal(decimalValue);
+ });
+ case TIMESTAMP:
+ return new ScalarColumnConverter(value -> {
+ String avroLogicalType = writer.schema().property(AvroSchemaUtil.AVRO_LOGICAL_TYPE_PROPERTY);
+ if (AvroSchemaUtil.TIMESTAMP_MILLIS_LOGICAL_TYPE.equals(avroLogicalType)) {
+ writer.setLong((long) value);
+ } else {
+ writer.setLong((long) value / 1000);
+ }
+ });
+ case DATE:
+ return new ScalarColumnConverter(value -> writer.setLong((int) value * (long) DateTimeConstants.MILLIS_PER_DAY));
+ case TIME:
+ return new ScalarColumnConverter(value -> {
+ if (value instanceof Long) {
+ writer.setInt((int) ((long) value / 1000));
+ } else {
+ writer.setInt((int) value);
+ }
+ });
+ case INTERVAL:
+ return new ScalarColumnConverter(value -> {
+ GenericFixed genericFixed = (GenericFixed) value;
+ IntBuffer intBuf = ByteBuffer.wrap(genericFixed.bytes())
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .asIntBuffer();
+
+          Period period = Period.months(intBuf.get(0))
+            .withDays(intBuf.get(1))
+            .withMillis(intBuf.get(2));
+
+ writer.setPeriod(period);
+ });
+ default:
+ return new ScalarColumnConverter(writer::setObject);
+ }
+ }
+
+ @Override
+ public void convert(Object value) {
+ if (value == null) {
+ return;
+ }
+
+ valueConverter.accept(value);
+ }
+ }
+
+ /**
+   * Converts array values using {@link #valueConverter}
+   * and writes them into {@link #arrayWriter}.
+ */
+ class ArrayColumnConverter implements ColumnConverter {
+
+ private final ArrayWriter arrayWriter;
+ private final ColumnConverter valueConverter;
+
+ public ArrayColumnConverter(ArrayWriter arrayWriter, ColumnConverter valueConverter) {
+ this.arrayWriter = arrayWriter;
+ this.valueConverter = valueConverter;
+ }
+
+ @Override
+ public void convert(Object value) {
+ if (value == null || !arrayWriter.isProjected()) {
+ return;
+ }
+
+ GenericArray<?> array = (GenericArray<?>) value;
+ array.forEach(arrayValue -> {
+ valueConverter.convert(arrayValue);
+ arrayWriter.save();
+ });
+ }
+ }
+
+ /**
+   * Converts and writes all map children using the provided {@link #converters}.
+   * If {@link #converters} is empty, generates the converters based on
+   * the {@link GenericRecord} schema.
+ */
+ class MapColumnConverter implements ColumnConverter {
+
+ private final TupleWriter tupleWriter;
+ private final List<ColumnConverter> converters;
+
+ public MapColumnConverter(TupleWriter tupleWriter, List<ColumnConverter> converters) {
+ this.tupleWriter = tupleWriter;
+ this.converters = new ArrayList<>(converters);
+ }
+
+ @Override
+ public void convert(Object value) {
+ if (value == null) {
+ return;
+ }
+
+ GenericRecord genericRecord = (GenericRecord) value;
+
+ if (converters.isEmpty()) {
+        // fill in the tuple schema lazily; this happens when it contains recursive named record types
+ TupleMetadata metadata = AvroSchemaUtil.convert(genericRecord.getSchema());
+ metadata.toMetadataList().forEach(tupleWriter::addColumn);
+
+ IntStream.range(0, metadata.size())
+ .mapToObj(i -> ColumnConvertersUtil.getConverter(metadata.metadata(i), tupleWriter.column(i)))
+ .forEach(converters::add);
+ }
+
+ IntStream.range(0, converters.size())
+ .forEach(i -> converters.get(i).convert(genericRecord.get(i)));
+ }
+ }
+
+ /**
+ * Converts and writes dict values using provided key / value converters.
+ */
+ class DictColumnConverter implements ColumnConverter {
+
+ private final DictWriter dictWriter;
+ private final ColumnConverter keyConverter;
+ private final ColumnConverter valueConverter;
+
+ public DictColumnConverter(DictWriter dictWriter, ColumnConverter keyConverter, ColumnConverter valueConverter) {
+ this.dictWriter = dictWriter;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ }
+
+ @Override
+ public void convert(Object value) {
+ if (value == null) {
+ return;
+ }
+
+ @SuppressWarnings("unchecked") Map<Object, Object> map = (Map<Object, Object>) value;
+ map.forEach((key, val) -> {
+ keyConverter.convert(key);
+ valueConverter.convert(val);
+ dictWriter.save();
+ });
+ }
+ }
+}
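
The INTERVAL branch of ScalarColumnConverter.init above decodes Avro's "duration"
logical type: a FIXED(12) value holding three little-endian unsigned ints for months,
days and milliseconds. A standalone sketch of that decoding, using only the JDK and
Joda-Time (the encoded values here are illustrative):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.IntBuffer;
    import org.joda.time.Period;

    public class DurationDecodeSketch {
      public static void main(String[] args) {
        // 1 month, 2 days, 3000 ms, laid out the way Avro stores a duration
        byte[] fixed12 = ByteBuffer.allocate(12)
            .order(ByteOrder.LITTLE_ENDIAN)
            .putInt(1).putInt(2).putInt(3000)
            .array();
        IntBuffer intBuf = ByteBuffer.wrap(fixed12)
            .order(ByteOrder.LITTLE_ENDIAN)
            .asIntBuffer();
        Period period = Period.months(intBuf.get(0))
            .withDays(intBuf.get(1))
            .withMillis(intBuf.get(2));
        System.out.println(period); // P1M2DT3S
      }
    }
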
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConvertersUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConvertersUtil.java
new file mode 100644
index 0000000..99308fd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConvertersUtil.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.avro.ColumnConverter.ArrayColumnConverter;
+import org.apache.drill.exec.store.avro.ColumnConverter.DictColumnConverter;
+import org.apache.drill.exec.store.avro.ColumnConverter.DummyColumnConverter;
+import org.apache.drill.exec.store.avro.ColumnConverter.MapColumnConverter;
+import org.apache.drill.exec.store.avro.ColumnConverter.ScalarColumnConverter;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.DictWriter;
+import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.DictVector;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class ColumnConvertersUtil {
+
+ /**
+   * Generates a list of column converters based on column type, given the
+   * converted Avro schema and the current row writer.
+ *
+ * @param schema converted Avro schema
+ * @param rowWriter current row writer
+ * @return list of column converters
+ */
+ public static List<ColumnConverter> initConverters(TupleMetadata schema, RowSetLoader rowWriter) {
+ return IntStream.range(0, schema.size())
+ .mapToObj(i -> getConverter(schema.metadata(i), rowWriter.column(i)))
+ .collect(Collectors.toList());
+ }
+
+ /**
+   * Based on the column type, creates the corresponding column converter,
+   * which holds the conversion logic and the writer to set the converted data into.
+   * For columns which are not projected, {@link DummyColumnConverter} is used.
+ *
+ * @param metadata column metadata
+ * @param writer column writer
+ * @return column converter
+ */
+ public static ColumnConverter getConverter(ColumnMetadata metadata, ObjectWriter writer) {
+ if (!writer.isProjected()) {
+ return DummyColumnConverter.INSTANCE;
+ }
+
+ if (metadata.isArray()) {
+ return getArrayConverter(metadata, writer.array());
+ }
+
+ if (metadata.isMap()) {
+ return getMapConverter(metadata.tupleSchema(), writer.tuple());
+ }
+
+ if (metadata.isDict()) {
+ return getDictConverter(metadata.tupleSchema(), writer.dict());
+ }
+
+ return getScalarConverter(writer.scalar());
+ }
+
+ private static ColumnConverter getArrayConverter(ColumnMetadata metadata, ArrayWriter arrayWriter) {
+ ObjectWriter valueWriter = arrayWriter.entry();
+ ColumnConverter valueConverter;
+ if (metadata.isMap()) {
+ valueConverter = getMapConverter(metadata.tupleSchema(), valueWriter.tuple());
+ } else if (metadata.isDict()) {
+ valueConverter = getDictConverter(metadata.tupleSchema(), valueWriter.dict());
+ } else if (metadata.isMultiList()) {
+ valueConverter = getConverter(metadata.childSchema(), valueWriter);
+ } else {
+ valueConverter = getScalarConverter(valueWriter.scalar());
+ }
+ return new ArrayColumnConverter(arrayWriter, valueConverter);
+ }
+
+ private static ColumnConverter getMapConverter(TupleMetadata metadata, TupleWriter tupleWriter) {
+ List<ColumnConverter> converters = IntStream.range(0, metadata.size())
+ .mapToObj(i -> getConverter(metadata.metadata(i), tupleWriter.column(i)))
+ .collect(Collectors.toList());
+ return new MapColumnConverter(tupleWriter, converters);
+ }
+
+ private static ColumnConverter getDictConverter(TupleMetadata metadata, DictWriter dictWriter) {
+ ColumnConverter keyConverter = getScalarConverter(dictWriter.keyWriter());
+ ColumnConverter valueConverter = getConverter(metadata.metadata(DictVector.FIELD_VALUE_NAME), dictWriter.valueWriter());
+ return new DictColumnConverter(dictWriter, keyConverter, valueConverter);
+ }
+
+ private static ColumnConverter getScalarConverter(ScalarWriter scalarWriter) {
+ return ScalarColumnConverter.init(scalarWriter);
+ }
+}
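
A hedged sketch of the call pattern these utilities serve; the actual reader lives
elsewhere in this patch, so the method name and batch-copy loop below are purely
illustrative:

    import java.util.List;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.drill.exec.physical.resultSet.RowSetLoader;
    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.exec.store.avro.ColumnConverter;
    import org.apache.drill.exec.store.avro.ColumnConvertersUtil;

    public class ConverterLoopSketch {
      // copies Avro records into the current batch until it fills up;
      // returns true if more records remain in the file
      static boolean copyBatch(DataFileReader<GenericRecord> avroReader,
                               TupleMetadata schema, RowSetLoader rowWriter) {
        List<ColumnConverter> converters = ColumnConvertersUtil.initConverters(schema, rowWriter);
        while (!rowWriter.isFull() && avroReader.hasNext()) {
          GenericRecord record = avroReader.next();
          rowWriter.start();
          for (int i = 0; i < converters.size(); i++) {
            // the i-th converter feeds the i-th column writer
            converters.get(i).convert(record.get(i));
          }
          rowWriter.save();
        }
        return avroReader.hasNext();
      }
    }
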
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 75e363f..8c0428d 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -50,7 +50,8 @@
"type" : "pcapng"
},
"avro" : {
- "type" : "avro"
+ "type" : "avro",
+ "extensions" : [ "avro" ]
},
"sequencefile" : {
"type" : "sequencefile",
@@ -163,4 +164,4 @@
"enabled" : true
}
}
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
index 718b6e9..ce03232 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/TestImpersonationQueries.java
@@ -17,11 +17,11 @@
*/
package org.apache.drill.exec.impersonation;
+import org.apache.drill.exec.store.avro.AvroDataGenerator;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.categories.SecurityTest;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.util.DrillFileUtils;
-import org.apache.drill.exec.store.avro.AvroTestUtil;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
import org.apache.drill.categories.SlowTest;
import org.apache.hadoop.fs.FileSystem;
@@ -168,7 +168,8 @@
fs.setOwner(dfsFile, user, group);
fs.setPermission(dfsFile, new FsPermission((short) 0700));
- localFile = new Path(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath());
+ AvroDataGenerator avroDataGenerator = new AvroDataGenerator(dirTestWatcher);
+ localFile = new Path(avroDataGenerator.generateSimplePrimitiveSchema_NoNullValues().getFilePath());
dfsFile = new Path(getUserHome(user), "simple.avro");
fs.copyFromLocalFile(localFile, dfsFile);
fs.setOwner(dfsFile, user, group);
@@ -283,12 +284,9 @@
createView(org1Users[0], org1Groups[0], "simple_avro_view",
String.format("SELECT h_boolean, e_double FROM %s.`%s` t", MINI_DFS_STORAGE_PLUGIN_NAME,
new Path(getUserHome(org1Users[0]), "simple.avro")));
- try {
- updateClient(org1Users[1]);
- test("SELECT h_boolean FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
- } catch (UserRemoteException e) {
- assertNull("This test should pass.", e);
- }
+
+ updateClient(org1Users[1]);
+ test("SELECT h_boolean FROM %s.%s.%s", MINI_DFS_STORAGE_PLUGIN_NAME, "tmp", "simple_avro_view");
}
@Test // DRILL-7250
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
index c82870d..b4700a6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/project/TestProjectedTuple.java
@@ -25,6 +25,7 @@
import static org.junit.Assert.fail;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.drill.categories.RowSetTests;
@@ -76,7 +77,7 @@
// Empty list means nothing is projected
- RequestedTuple projSet = RequestedTupleImpl.parse(new ArrayList<SchemaPath>());
+ RequestedTuple projSet = RequestedTupleImpl.parse(Collections.emptyList());
assertEquals(TupleProjectionType.NONE, projSet.type());
assertTrue(projSet instanceof ImpliedTupleRequest);
List<RequestedColumn> cols = projSet.projections();
@@ -297,7 +298,7 @@
RequestedColumn wildcard = cols.get(0);
assertEquals(ProjectionType.WILDCARD, wildcard.type());
assertEquals(SchemaPath.DYNAMIC_STAR, wildcard.name());
- assertTrue(! wildcard.isSimple());
+ assertFalse(wildcard.isSimple());
assertTrue(wildcard.isWildcard());
assertNull(wildcard.mapProjection());
assertNull(wildcard.indexes());
@@ -326,7 +327,7 @@
assertTrue(a.isArray());
assertFalse(a.isSimple());
assertFalse(a.isTuple());
- boolean indexes[] = a.indexes();
+ boolean[] indexes = a.indexes();
assertNotNull(indexes);
assertEquals(4, indexes.length);
assertFalse(indexes[0]);
@@ -335,15 +336,29 @@
assertTrue(indexes[3]);
}
+ /**
+ * Duplicate array entries are allowed to handle the
+ * use case of a[1], a[1].z. Each element is reported once;
+ * the project operator will create copies as needed.
+ */
@Test
public void testArrayDups() {
- try {
- RequestedTupleImpl.parse(
- RowSetTestUtils.projectList("a[1]", "a[3]", "a[1]"));
- fail();
- } catch (UserException e) {
- // Expected
- }
+ RequestedTuple projSet = RequestedTupleImpl.parse(
+ RowSetTestUtils.projectList("a[1]", "a[3]", "a[1]", "a[3].z"));
+
+ List<RequestedColumn> cols = projSet.projections();
+ assertEquals(1, cols.size());
+
+ RequestedColumn a = cols.get(0);
+ assertEquals("a", a.name());
+ assertTrue(a.isArray());
+ boolean[] indexes = a.indexes();
+ assertNotNull(indexes);
+ assertEquals(4, indexes.length);
+ assertFalse(indexes[0]);
+ assertTrue(indexes[1]);
+ assertFalse(indexes[2]);
+ assertTrue(indexes[3]);
}
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
index af29f06..6da29f8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
@@ -790,13 +790,13 @@
}
@Test // DRILL-6944
- public void testMapTypeFullyQualifiedInNewlyCreatedView() throws Exception {
+ public void testMapTypeTreatedAsAnyInNewlyCreatedView() throws Exception {
try {
test("CREATE VIEW dfs.tmp.`mapf_view` AS SELECT `mapf` FROM dfs.`avro/map_string_to_long.avro`");
testPlanWithAttributesMatchingPatterns("SELECT * FROM dfs.tmp.`mapf_view`", new String[]{
- "Screen : rowType = RecordType\\(\\(VARCHAR\\(65535\\), BIGINT\\) MAP mapf\\)",
- "Project\\(mapf=\\[\\$0\\]\\) : rowType = RecordType\\(\\(VARCHAR\\(65535\\), BIGINT\\) MAP mapf\\)",
- "Scan.*avro/map_string_to_long.avro.*rowType = RecordType\\(\\(VARCHAR\\(65535\\), BIGINT\\) MAP mapf\\)"
+ "Screen : rowType = RecordType\\(ANY mapf\\)",
+ "Project\\(mapf=\\[\\$0\\]\\) : rowType = RecordType\\(ANY mapf\\)",
+ "Scan.*avro/map_string_to_long.avro.*rowType = RecordType\\(ANY mapf\\)"
}, null);
} finally {
test("DROP VIEW IF EXISTS dfs.tmp.`mapf_view`");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
index b4c8b14..4aca1a8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
@@ -19,7 +19,7 @@
import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.avro.AvroTestUtil;
+import org.apache.drill.exec.store.avro.AvroDataGenerator;
import org.junit.Test;
import java.nio.file.Paths;
@@ -63,8 +63,8 @@
@Test
public void testAvro() throws Exception {
- AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(5);
- String file = testSetup.getFileName();
+ AvroDataGenerator dataGenerator = new AvroDataGenerator(dirTestWatcher);
+ String file = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(5).getFileName();
testPhysicalPlanSubmission(
String.format("select * from dfs.`%s`", file),
String.format("select * from table(dfs.`%s`(type=>'avro'))", file)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroDataGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroDataGenerator.java
new file mode 100644
index 0000000..b737092
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroDataGenerator.java
@@ -0,0 +1,853 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.Text;
+import org.apache.drill.test.BaseDirTestWatcher;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Utilities for generating Avro test data.
+ */
+public class AvroDataGenerator {
+
+ public static final int RECORD_COUNT = 50;
+ public static int ARRAY_SIZE = 4;
+
+ private final BaseDirTestWatcher dirTestWatcher;
+
+ public AvroDataGenerator(BaseDirTestWatcher dirTestWatcher) {
+ this.dirTestWatcher = dirTestWatcher;
+ }
+
+ /**
+   * Writes records to an Avro file while simultaneously
+   * constructing a corresponding list of records in the format accepted
+   * by the Drill test builder to describe the expected results.
+ */
+ public static class AvroTestRecordWriter implements Closeable {
+
+ private final List<Map<String, Object>> expectedRecords;
+ private final Schema schema;
+ private final DataFileWriter<GenericData.Record> writer;
+ private final String filePath;
+ private final String fileName;
+
+ private GenericData.Record currentAvroRecord;
+ private Map<String, Object> currentExpectedRecord;
+
+ public AvroTestRecordWriter(Schema schema, File file) {
+ writer = new DataFileWriter<>(new GenericDatumWriter<>(schema));
+ try {
+ writer.create(schema, file);
+ } catch (IOException e) {
+ throw new RuntimeException("Error creating file in Avro test setup.", e);
+ }
+ this.schema = schema;
+ currentExpectedRecord = new TreeMap<>();
+ expectedRecords = new ArrayList<>();
+ filePath = file.getAbsolutePath();
+ fileName = file.getName();
+ }
+
+ public void startRecord() {
+ currentAvroRecord = new GenericData.Record(schema);
+ currentExpectedRecord = new TreeMap<>();
+ }
+
+ public void put(String key, Object value) {
+ currentAvroRecord.put(key, value);
+ // convert binary values into byte[], the format they will be given
+ // in the Drill result set in the test framework
+ currentExpectedRecord.put("`" + key + "`", convertAvroValToDrill(value, true));
+ }
+
+    // TODO - fix the test wrapper to remove the need for this hack
+    // that makes the root behave differently than nested fields for String vs. Text
+ private Object convertAvroValToDrill(Object value, boolean root) {
+ if (value instanceof ByteBuffer) {
+ ByteBuffer bb = ((ByteBuffer) value);
+ byte[] drillVal = new byte[((ByteBuffer) value).remaining()];
+ bb.get(drillVal);
+ bb.position(0);
+ value = drillVal;
+ } else if (!root && value instanceof CharSequence) {
+ value = new Text(value.toString());
+ } else if (value instanceof GenericData.Array) {
+ GenericData.Array<?> array = ((GenericData.Array<?>) value);
+ JsonStringArrayList<Object> drillList = new JsonStringArrayList<>();
+ for (Object o : array) {
+ drillList.add(convertAvroValToDrill(o, false));
+ }
+ value = drillList;
+ } else if (value instanceof GenericData.EnumSymbol) {
+ value = value.toString();
+ } else if (value instanceof GenericData.Record) {
+ GenericData.Record rec = ((GenericData.Record) value);
+ JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>();
+ for (Schema.Field field : rec.getSchema().getFields()) {
+ Object val = rec.get(field.name());
+ newRecord.put(field.name(), convertAvroValToDrill(val, false));
+ }
+ value = newRecord;
+ }
+ return value;
+ }
+
+ public void endRecord() throws IOException {
+ writer.append(currentAvroRecord);
+ expectedRecords.add(currentExpectedRecord);
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public List<Map<String, Object>> getExpectedRecords() {
+ return expectedRecords;
+ }
+ }
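
Every generator below follows the same write-and-mirror pattern; distilled into a
sketch (the field name and value are illustrative, and the schema and file are
assumed to be prepared as in the generators below):

    import java.io.File;
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.drill.exec.store.avro.AvroDataGenerator;

    public class RecordWriterSketch {
      static List<Map<String, Object>> writeOne(Schema schema, File file) throws IOException {
        try (AvroDataGenerator.AvroTestRecordWriter writer =
                 new AvroDataGenerator.AvroTestRecordWriter(schema, file)) {
          writer.startRecord();
          writer.put("a_string", "hello"); // written to Avro and mirrored into expected records
          writer.endRecord();
          return writer.getExpectedRecords(); // baseline rows for the Drill test builder
        }
      }
    }
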
+
+
+ public AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues() throws Exception {
+ return generateSimplePrimitiveSchema_NoNullValues(RECORD_COUNT);
+ }
+
+ public AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords) throws Exception {
+ return generateSimplePrimitiveSchema_NoNullValues(numRecords, "");
+ }
+
+ /**
+   * Generates an Avro table with the specified number of rows.
+   *
+   * @param numRecords number of rows in the table
+   * @param tablePath table path
+ * @return AvroTestRecordWriter instance
+ */
+ public AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords, String tablePath) throws Exception {
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_long").type().longType().noDefault()
+ .name("d_float").type().floatType().noDefault()
+ .name("e_double").type().doubleType().noDefault()
+ .name("f_bytes").type().bytesType().noDefault()
+ .name("g_null").type().nullType().noDefault()
+ .name("h_boolean").type().booleanType().noDefault()
+ .endRecord();
+
+ File file = File.createTempFile("avro-simple-primitive-no-nulls-test", ".avro",
+ dirTestWatcher.makeRootSubDir(Paths.get(tablePath)));
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ ByteBuffer bb = ByteBuffer.allocate(2);
+ bb.put(0, (byte) 'a');
+ for (int i = 0; i < numRecords; i++) {
+ bb.put(1, (byte) ('0' + (i % 10)));
+ bb.position(0);
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+ record.put("c_long", (long) i);
+ record.put("d_float", (float) i);
+ record.put("e_double", (double) i);
+ record.put("f_bytes", bb);
+ record.put("g_null", null);
+ record.put("h_boolean", (i % 2 == 0));
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateUnionSchema_WithNullValues() throws Exception {
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_long").type().longType().noDefault()
+ .name("d_float").type().floatType().noDefault()
+ .name("e_double").type().doubleType().noDefault()
+ .name("f_bytes").type().bytesType().noDefault()
+ .name("g_null").type().nullType().noDefault()
+ .name("h_boolean").type().booleanType().noDefault()
+ .name("i_union").type().optional().doubleType()
+ .endRecord();
+
+ File file = File.createTempFile("avro-simple-primitive-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ ByteBuffer bb = ByteBuffer.allocate(1);
+ bb.put(0, (byte) 1);
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+ record.put("c_long", (long) i);
+ record.put("d_float", (float) i);
+ record.put("e_double", (double) i);
+ record.put("f_bytes", bb);
+ record.put("g_null", null);
+ record.put("h_boolean", (i % 2 == 0));
+ record.put("i_union", (i % 2 == 0 ? (double) i : null));
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateUnionSchema_WithNonNullValues() throws Exception {
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("i_union").type().unionOf().doubleType().and().longType().endUnion().noDefault()
+ .endRecord();
+
+ File file = File.createTempFile("avro-complex-union-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("i_union", (i % 2 == 0 ? (double) i : (long) i));
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateSimpleEnumSchema_NoNullValues() throws Exception {
+ String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" };
+
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_enum").type().enumeration("my_enum").symbols(symbols).noDefault()
+ .endRecord();
+
+ File file = File.createTempFile("avro-primitive-with-enum-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ Schema enumSchema = schema.getField("b_enum").schema();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumSchema, symbols[(i + symbols.length) % symbols.length]);
+ record.put("b_enum", symbol);
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateSimpleArraySchema_NoNullValues() throws Exception {
+ File file = File.createTempFile("avro-simple-array-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_string_array").type().array().items().stringType().noDefault()
+ .name("d_int_array").type().array().items().intType().noDefault()
+ .name("e_float_array").type().array().items().floatType().noDefault()
+ .endRecord();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+ {
+ GenericArray<String> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("c_string_array").schema());
+ for (int j = 0; j < ARRAY_SIZE; j++) {
+ array.add(j, "c_string_array_" + i + "_" + j);
+ }
+ record.put("c_string_array", array);
+ }
+ {
+ GenericArray<Integer> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("d_int_array").schema());
+ for (int j = 0; j < ARRAY_SIZE; j++) {
+ array.add(j, i * j);
+ }
+ record.put("d_int_array", array);
+ }
+ {
+ GenericArray<Float> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("e_float_array").schema());
+ for (int j = 0; j < ARRAY_SIZE; j++) {
+ array.add(j, (float) (i * j));
+ }
+ record.put("e_float_array", array);
+ }
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateSimpleNestedSchema_NoNullValues() throws Exception {
+ File file = File.createTempFile("avro-simple-nested-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_record").type().record("my_record_1")
+ .namespace("foo.blah.org")
+ .fields()
+ .name("nested_1_string").type().stringType().noDefault()
+ .name("nested_1_int").type().intType().noDefault()
+ .endRecord()
+ .noDefault()
+ .endRecord();
+
+ Schema nestedSchema = schema.getField("c_record").schema();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+
+ GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+ nestedRecord.put("nested_1_string", "nested_1_string_" + i);
+ nestedRecord.put("nested_1_int", i * i);
+
+ record.put("c_record", nestedRecord);
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateUnionNestedArraySchema_withNullValues() throws Exception {
+ File file = File.createTempFile("avro-nested-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_array").type().optional().array().items().record("my_record_1")
+ .namespace("foo.blah.org").fields()
+ .name("nested_1_string").type().optional().stringType()
+ .name("nested_1_int").type().optional().intType()
+ .endRecord()
+ .endRecord();
+
+ Schema nestedSchema = schema.getField("c_array").schema();
+ Schema arraySchema = nestedSchema.getTypes().get(1);
+ Schema itemSchema = arraySchema.getElementType();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+
+ if (i % 2 == 0) {
+ GenericArray<GenericRecord> array = new GenericData.Array<>(1, arraySchema);
+ GenericRecord nestedRecord = new GenericData.Record(itemSchema);
+ nestedRecord.put("nested_1_string", "nested_1_string_" + i);
+ nestedRecord.put("nested_1_int", i * i);
+ array.add(nestedRecord);
+ record.put("c_array", array);
+ }
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateNestedArraySchema() throws IOException {
+ return generateNestedArraySchema(RECORD_COUNT, ARRAY_SIZE);
+ }
+
+ public AvroTestRecordWriter generateNestedArraySchema(int numRecords, int numArrayItems) throws IOException {
+ File file = File.createTempFile("avro-nested-array-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest").namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_int").type().intType().noDefault()
+ .name("b_array").type().array().items()
+ .record("my_record_1").namespace("foo.blah.org")
+ .fields()
+ .name("nested_1_int").type().optional().intType()
+ .endRecord()
+ .arrayDefault(Collections.emptyList())
+ .endRecord();
+
+ Schema arraySchema = schema.getField("b_array").schema();
+ Schema itemSchema = arraySchema.getElementType();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < numRecords; i++) {
+ record.startRecord();
+ record.put("a_int", i);
+        GenericArray<GenericRecord> array = new GenericData.Array<>(numArrayItems, arraySchema);
+
+ for (int j = 0; j < numArrayItems; j++) {
+ GenericRecord nestedRecord = new GenericData.Record(itemSchema);
+ nestedRecord.put("nested_1_int", j);
+ array.add(nestedRecord);
+ }
+ record.put("b_array", array);
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateMapSchema_withNullValues() throws Exception {
+ File file = File.createTempFile("avro-map-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
+ .endRecord();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+
+ if (i % 2 == 0) {
+ Map<String, String> strMap = new HashMap<>();
+ strMap.put("key1", "nested_1_string_" + i);
+ strMap.put("key2", "nested_1_string_" + (i + 1 ));
+ record.put("c_map", strMap);
+ }
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateMapSchemaComplex_withNullValues() throws Exception {
+ File file = File.createTempFile("avro-map-complex-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
+ .name("d_map").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
+ .endRecord();
+
+ Schema arrayMapSchema = schema.getField("d_map").schema();
+ Schema arrayItemSchema = arrayMapSchema.getTypes().get(1).getValueType();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+
+ if (i % 2 == 0) {
+ Map<String, String> c_map = new HashMap<>();
+ c_map.put("key1", "nested_1_string_" + i);
+ c_map.put("key2", "nested_1_string_" + (i + 1 ));
+ record.put("c_map", c_map);
+ } else {
+ Map<String, GenericArray<Double>> d_map = new HashMap<>();
+ GenericArray<Double> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
+ for (int j = 0; j < ARRAY_SIZE; j++) {
+ array.add((double)j);
+ }
+ d_map.put("key1", array);
+ d_map.put("key2", array);
+
+ record.put("d_map", d_map);
+ }
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateUnionNestedSchema_withNullValues() throws Exception {
+ File file = File.createTempFile("avro-nested-with-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_record").type().optional().record("my_record_1")
+ .namespace("foo.blah.org").fields()
+ .name("nested_1_string").type().optional().stringType()
+ .name("nested_1_int").type().optional().intType()
+ .endRecord()
+ .endRecord();
+
+ Schema nestedSchema = schema.getField("c_record").schema();
+ Schema optionalSchema = nestedSchema.getTypes().get(1);
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+
+ if (i % 2 == 0) {
+ GenericRecord nestedRecord = new GenericData.Record(optionalSchema);
+ nestedRecord.put("nested_1_string", "nested_1_string_" + i);
+ nestedRecord.put("nested_1_int", i * i);
+ record.put("c_record", nestedRecord);
+ }
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateDoubleNestedSchema_NoNullValues() throws Exception {
+ File file = File.createTempFile("avro-double-nested-no-nulls-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringType().noDefault()
+ .name("b_int").type().intType().noDefault()
+ .name("c_record").type().record("my_record_1")
+ .namespace("foo.blah.org")
+ .fields()
+ .name("nested_1_string").type().stringType().noDefault()
+ .name("nested_1_int").type().intType().noDefault()
+ .name("nested_1_record").type().record("my_double_nested_record_1")
+ .namespace("foo.blah.org.rot")
+ .fields()
+ .name("double_nested_1_string").type().stringType().noDefault()
+ .name("double_nested_1_int").type().intType().noDefault()
+ .endRecord()
+ .noDefault()
+ .endRecord()
+ .noDefault()
+ .endRecord();
+
+ Schema nestedSchema = schema.getField("c_record").schema();
+ Schema doubleNestedSchema = nestedSchema.getField("nested_1_record").schema();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_int", i);
+
+ GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
+ nestedRecord.put("nested_1_string", "nested_1_string_" + i);
+ nestedRecord.put("nested_1_int", i * i);
+
+ GenericRecord doubleNestedRecord = new GenericData.Record(doubleNestedSchema);
+ doubleNestedRecord.put("double_nested_1_string", "double_nested_1_string_" + i + "_" + i);
+ doubleNestedRecord.put("double_nested_1_int", i * i * i);
+
+ nestedRecord.put("nested_1_record", doubleNestedRecord);
+ record.put("c_record", nestedRecord);
+
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public String generateLinkedList(int numRows) throws Exception {
+ File file = File.createTempFile("avro-linked-list-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("LongList")
+ .namespace("org.apache.drill.exec.store.avro")
+ .aliases("LinkedLongs")
+ .fields()
+ .name("value").type().optional().longType()
+ .name("next").type().optional().type("LongList")
+ .endRecord();
+
+ try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+ writer.create(schema, file);
+ GenericRecord previousRecord = null;
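+      // Chain records through the recursive "next" field; each record is
+      // appended once the following value is in place, and the final record
+      // closes the list with an empty "next".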
+ for (int i = 0; i < numRows; i++) {
+ GenericRecord record = (GenericRecord) (previousRecord == null ? new GenericData.Record(schema) : previousRecord.get("next"));
+ record.put("value", (long) i);
+ if (previousRecord != null) {
+ writer.append(previousRecord);
+ }
+ GenericRecord nextRecord = new GenericData.Record(record.getSchema());
+ record.put("next", nextRecord);
+ previousRecord = record;
+ }
+ writer.append(previousRecord);
+ }
+ return file.getName();
+ }
+
+ public AvroTestRecordWriter generateStringAndUtf8Data() throws Exception {
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("a_string").type().stringBuilder().prop("avro.java.string", "String").endString().noDefault()
+ .name("b_utf8").type().stringType().noDefault()
+ .endRecord();
+
+ File file = File.createTempFile("avro-string-utf8-test", ".avro", dirTestWatcher.getRootDir());
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+ record.put("a_string", "a_" + i);
+ record.put("b_utf8", "b_" + i);
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public AvroTestRecordWriter generateMapSchema() throws Exception {
+ File file = File.createTempFile("avro-map-test", ".avro", dirTestWatcher.getRootDir());
+ Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
+ .namespace("org.apache.drill.exec.store.avro")
+ .fields()
+ .name("map_field").type().optional().map().values(Schema.create(Type.LONG))
+ .name("map_array").type().optional().array().items(Schema.createMap(Schema.create(Type.INT)))
+ .name("map_array_value").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
+ .endRecord();
+
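+    // Extract the non-null branches from the optional unions: the array of
+    // int maps and the array of doubles used when populating records.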
+    Schema mapArraySchema = schema.getField("map_array").schema();
+    Schema arrayItemSchema = mapArraySchema.getTypes().get(1);
+    Schema doubleArraySchema = schema.getField("map_array_value").schema().getTypes().get(1).getValueType();
+
+ try (AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file)) {
+ for (int i = 0; i < RECORD_COUNT; i++) {
+ record.startRecord();
+
+ // Create map with long values
+ Map<String, Long> map = new HashMap<>();
+ map.put("key1", (long) i);
+ map.put("key2", (long) i + 1);
+ record.put("map_field", map);
+
+        // Create a list of maps with int values
+ GenericArray<Map<String, Integer>> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
+ for (int j = 0; j < ARRAY_SIZE; j++) {
+ Map<String, Integer> mapInt = new HashMap<>();
+ mapInt.put("key1", (i + 1) * (j + 50));
+ mapInt.put("key2", (i + 1) * (j + 100));
+ array.add(mapInt);
+ }
+ record.put("map_array", array);
+
+        // Create a map with array values
+        Map<String, GenericArray<Double>> mapArrayValue = new HashMap<>();
+        GenericArray<Double> doubleArray = new GenericData.Array<>(ARRAY_SIZE, doubleArraySchema);
+ for (int j = 0; j < ARRAY_SIZE; j++) {
+ doubleArray.add((double) (i + 1) * j);
+ }
+ mapArrayValue.put("key1", doubleArray);
+ mapArrayValue.put("key2", doubleArray);
+ record.put("map_array_value", mapArrayValue);
+
+ record.endRecord();
+ }
+ return record;
+ }
+ }
+
+ public String generateDecimalData(int numRecords) throws Exception {
+ File file = File.createTempFile("avro-decimal-test", ".avro", dirTestWatcher.getRootDir());
+ Schema decBytes = LogicalTypes.decimal(10, 2)
+ .addToSchema(SchemaBuilder.builder().bytesType());
+
+ Schema decFixed = LogicalTypes.decimal(5, 2)
+ .addToSchema(SchemaBuilder.builder().fixed("dec_fixed").size(5));
+
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_dec_pos_bytes").type(decBytes).noDefault()
+ .name("col_dec_neg_bytes").type(decBytes).noDefault()
+ .name("col_dec_pos_fixed").type(decFixed).noDefault()
+ .name("col_dec_neg_fixed").type(decFixed).noDefault()
+ .endRecord();
+
+ try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+ writer.create(schema, file);
+ for (int i = 0; i < numRecords; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+
+ ByteBuffer posBytes = ByteBuffer.wrap(BigInteger.valueOf(100 + i).toByteArray());
+ record.put("col_dec_pos_bytes", posBytes);
+
+ ByteBuffer negBytes = ByteBuffer.wrap(BigInteger.valueOf(-200 + i).toByteArray());
+ record.put("col_dec_neg_bytes", negBytes);
+
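+        // Fixed-width decimals hold the unscaled value as big-endian two's
+        // complement, sign-extended to the fixed size: pad with 0x00 for
+        // non-negative values and 0xFF for negative ones.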
+ byte[] posFixedBytes = new byte[5];
+ byte[] posValueBytes = BigInteger.valueOf(300 + i).toByteArray();
+ int posDiff = posFixedBytes.length - posValueBytes.length;
+ assert posDiff > -1;
+ System.arraycopy(posValueBytes, 0, posFixedBytes, posDiff, posValueBytes.length);
+ Arrays.fill(posFixedBytes, 0, posDiff, (byte) 0);
+
+ GenericData.Fixed posFixed = new GenericData.Fixed(decFixed, posFixedBytes);
+ record.put("col_dec_pos_fixed", posFixed);
+
+ byte[] negFixedBytes = new byte[5];
+ byte[] negValueBytes = BigInteger.valueOf(-400 + i).toByteArray();
+ int negDiff = negFixedBytes.length - negValueBytes.length;
+ assert negDiff > -1;
+ System.arraycopy(negValueBytes, 0, negFixedBytes, negDiff, negValueBytes.length);
+ Arrays.fill(negFixedBytes, 0, negDiff, (byte) -1);
+
+ GenericData.Fixed negFixed = new GenericData.Fixed(decFixed, negFixedBytes);
+ record.put("col_dec_neg_fixed", negFixed);
+
+ writer.append(record);
+ }
+ }
+ return file.getName();
+ }
+
+ public String generateDateTimeData(LocalDateTime dateTime) throws Exception {
+ File file = File.createTempFile("avro-date-time-test", ".avro", dirTestWatcher.getRootDir());
+
+ Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType());
+ Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+ Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ Schema timeMillis = LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+ Schema timeMicros = LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_timestamp_millis").type(timestampMillis).noDefault()
+ .name("col_timestamp_micros").type(timestampMicros).noDefault()
+ .name("col_date").type(date).noDefault()
+ .name("col_time_millis").type(timeMillis).noDefault()
+ .name("col_time_micros").type(timeMicros).noDefault()
+ .endRecord();
+
+ try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+ writer.create(schema, file);
+ GenericRecord record = new GenericData.Record(schema);
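+      // Avro logical types encode date/time values as plain numbers:
+      // timestamps as epoch millis/micros, date as epoch days, and times as
+      // millis or micros since midnight.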
+ long timestampMillisValue = dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
+ record.put("col_timestamp_millis", timestampMillisValue);
+ record.put("col_timestamp_micros", timestampMillisValue * 1000);
+ record.put("col_date", dateTime.toLocalDate().toEpochDay());
+ long startOfDayMillis = dateTime.toLocalDate().atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
+ long timeMillisValue = timestampMillisValue - startOfDayMillis;
+ record.put("col_time_millis", timeMillisValue);
+ record.put("col_time_micros", timeMillisValue * 1000);
+ writer.append(record);
+ }
+ return file.getName();
+ }
+
+ public String generateDuration(int numRows) throws Exception {
+ File file = File.createTempFile("avro-duration-test", ".avro", dirTestWatcher.getRootDir());
+
+ Schema durationSchema = new LogicalType("duration")
+ .addToSchema(SchemaBuilder.builder().fixed("duration_fixed").size(12));
+
+ Schema schema = SchemaBuilder.record("record")
+ .fields()
+ .name("col_duration").type(durationSchema).noDefault()
+ .endRecord();
+
+ try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+ writer.create(schema, file);
+ for (int i = 0; i < numRows; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+
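+        // The "duration" logical type is a 12-byte fixed holding three
+        // little-endian unsigned 32-bit ints: months, days and milliseconds.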
+ ByteBuffer bb = ByteBuffer.allocate(12);
+ bb.order(ByteOrder.LITTLE_ENDIAN);
+        bb.putInt(10 + i); // months
+ bb.putInt(100 + i); // days
+ bb.putInt(1000 + i); // milliseconds
+
+ GenericData.Fixed fixed = new GenericData.Fixed(durationSchema, bb.array());
+ record.put("col_duration", fixed);
+ writer.append(record);
+ }
+ }
+ return file.getName();
+ }
+
+ public String generateMultiDimensionalArray(int numRecords, int arraySize) throws Exception {
+ File file = File.createTempFile("avro-multi-dimensional-array-test", ".avro", dirTestWatcher.getRootDir());
+
+ String colTwoDimsName = "col_array_two_dims";
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name(colTwoDimsName).type()
+ .array().items()
+ .array().items()
+ .stringType()
+ .noDefault()
+ .endRecord();
+
+ try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+ writer.create(schema, file);
+
+ for (int i = 0; i < numRecords; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ Schema twoDimsSchema = schema.getField(colTwoDimsName).schema();
+ GenericArray<GenericArray<String>> arrayTwoDims = new GenericData.Array<>(numRecords, twoDimsSchema);
+ for (int a = 0; a < arraySize; a++) {
+ GenericArray<String> nestedArray = new GenericData.Array<>(2, twoDimsSchema.getElementType());
+ nestedArray.add(String.format("val_%s_%s_0", i, a));
+ nestedArray.add(String.format("val_%s_%s_1", i, a));
+ arrayTwoDims.add(nestedArray);
+ }
+ record.put(colTwoDimsName, arrayTwoDims);
+ writer.append(record);
+ }
+ }
+
+ return file.getName();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
index 5a634fd..27fd926 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java
@@ -17,243 +17,123 @@
*/
package org.apache.drill.exec.store.avro;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.ARRAY_SIZE;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.RECORD_COUNT;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateDoubleNestedSchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateLinkedList;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateMapSchema;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateMapSchemaComplex_withNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateMapSchema_withNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateNestedArraySchema;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateSimpleArraySchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateSimpleEnumSchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateSimpleNestedSchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateStringAndUtf8Data;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateUnionNestedArraySchema_withNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateUnionNestedSchema_withNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateUnionSchema_WithNonNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.generateUnionSchema_WithNullValues;
-import static org.apache.drill.exec.store.avro.AvroTestUtil.write;
-import static org.apache.drill.test.TestBuilder.listOf;
-import static org.apache.drill.test.TestBuilder.mapOfObject;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.TestBuilder;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Period;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.File;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.avro.specific.TestRecordWithLogicalTypes;
-import org.apache.commons.io.FileUtils;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.expr.fn.impl.DateUtility;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.util.JsonStringHashMap;
-import org.apache.drill.exec.work.ExecErrorConstants;
-import org.apache.drill.test.BaseTestQuery;
-import org.apache.drill.test.TestBuilder;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+import static org.apache.drill.test.TestBuilder.mapOfObject;
/**
* Unit tests for Avro record reader.
*/
-public class AvroFormatTest extends BaseTestQuery {
+public class AvroFormatTest extends ClusterTest {
private static String mapTableName;
+ private static AvroDataGenerator dataGenerator;
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
@BeforeClass
- public static void init() throws Exception {
+ public static void setup() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ startCluster(builder);
+ dataGenerator = new AvroDataGenerator(dirTestWatcher);
// Create temporary table containing map and map array
- mapTableName = generateMapSchema().getFileName();
- }
-
- // XXX
- // 1. Need to test nested field names with same name as top-level names for conflict.
- // 2. Avro supports recursive types? Can we test this?
-
- @Test
- public void testBatchCutoff() throws Exception {
- final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimplePrimitiveSchema_NoNullValues(5000);
- final String file = testSetup.getFileName();
- final String sql = "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null from dfs.`%s`";
- test(sql, file);
- testBuilder()
- .sqlQuery(sql, file)
- .unOrdered()
- .expectsNumBatches(2)
- .baselineRecords(testSetup.getExpectedRecords())
- .go();
- }
-
- /**
- * Previously a bug in the Avro table metadata would cause wrong results
- * for some queries on varchar types, as a length was not provided during metadata
- * population. In some cases casts were being added with the default length
- * of 1 and truncating values.
- *
- * @throws Exception
- */
- @Test
- public void testFiltersOnVarchar() throws Exception {
- final String file = generateSimplePrimitiveSchema_NoNullValues(5000).getFileName();
- final String sql = "select a_string from dfs.`%s` where a_string = 'a_1'";
- testBuilder()
- .sqlQuery(sql, file)
- .unOrdered()
- .baselineColumns("a_string")
- .baselineValues("a_1")
- .go();
-
- final String sql2 = "select a_string from dfs.`%s` where a_string IN ('a_1')";
- testBuilder()
- .sqlQuery(sql2, file)
- .unOrdered()
- .baselineColumns("a_string")
- .baselineValues("a_1")
- .go();
- }
-
- @Test
- public void testFiltersOnVarBinary() throws Exception {
- final String file = generateSimplePrimitiveSchema_NoNullValues(5000).getFileName();
- final String sql = "select f_bytes from dfs.`%s` where f_bytes = BINARY_STRING('\\x61\\x31')";
- TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, file)
- .unOrdered()
- .baselineColumns("f_bytes");
-
- for (int i = 0; i < 500; i++) {
- testBuilder.baselineValues(new byte[] {'a', '1'});
- }
- testBuilder.go();
-
- final String sql2 = "select f_bytes from dfs.`%s` where f_bytes IN (BINARY_STRING('\\x61\\x31'))";
- testBuilder = testBuilder()
- .sqlQuery(sql2, file)
- .unOrdered()
- .baselineColumns("f_bytes");
-
- for (int i = 0; i < 500; i++) {
- testBuilder.baselineValues(new byte[] {'a', '1'});
- }
- testBuilder.go();
+ mapTableName = dataGenerator.generateMapSchema().getFileName();
}
@Test
public void testSimplePrimitiveSchema_NoNullValues() throws Exception {
- final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimplePrimitiveSchema_NoNullValues();
- final String file = testSetup.getFileName();
- final String sql = "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null from dfs.`%s`";
- test(sql, file);
+ AvroDataGenerator.AvroTestRecordWriter testSetup = dataGenerator.generateSimplePrimitiveSchema_NoNullValues();
+ String file = testSetup.getFileName();
+ String sql = "select a_string, b_int, c_long, d_float, e_double, f_bytes, h_boolean, g_null from dfs.`%s`";
testBuilder()
- .sqlQuery(sql, file)
- .unOrdered()
- .baselineRecords(testSetup.getExpectedRecords())
- .go();
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineRecords(testSetup.getExpectedRecords())
+ .go();
}
@Test
public void testSimplePrimitiveSchema_StarQuery() throws Exception {
- simpleAvroTestHelper(generateSimplePrimitiveSchema_NoNullValues(), "select * from dfs.`%s`");
- }
-
- private List<Map<String, Object>> project(
- List<Map<String,Object>> incomingRecords,
- List<String> projectCols) {
- List<Map<String,Object>> output = Lists.newArrayList();
- for (Map<String, Object> incomingRecord : incomingRecords) {
- final JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>();
- for (String s : incomingRecord.keySet()) {
- if (projectCols.contains(s)) {
- newRecord.put(s, incomingRecord.get(s));
- }
- }
- output.add(newRecord);
- }
- return output;
+ simpleAvroTestHelper(dataGenerator.generateSimplePrimitiveSchema_NoNullValues());
}
@Test
public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception {
- final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimplePrimitiveSchema_NoNullValues();
- final String file = testSetup.getFileName();
- List<String> projectList = Lists.newArrayList("`h_boolean`", "`e_double`");
+ AvroDataGenerator.AvroTestRecordWriter testSetup = dataGenerator.generateSimplePrimitiveSchema_NoNullValues();
+ String file = testSetup.getFileName();
+ List<String> projectList = Arrays.asList("`h_boolean`", "`e_double`");
testBuilder()
- .sqlQuery("select h_boolean, e_double from dfs.`%s`", file)
- .unOrdered()
- .baselineRecords(project(testSetup.getExpectedRecords(), projectList))
- .go();
- }
-
- @Test
- public void testSimplePrimitiveSchema_NoColumnsExistInTheSchema() throws Exception {
- final String file = generateSimplePrimitiveSchema_NoNullValues().getFileName();
- try {
- test("select h_dummy1, e_dummy2 from dfs.`%s`", file);
- Assert.fail("Test should fail as h_dummy1 and e_dummy2 does not exist.");
- } catch(UserException ue) {
- Assert.assertTrue("Test should fail as h_dummy1 and e_dummy2 does not exist.",
- ue.getMessage().contains("Column 'h_dummy1' not found in any table"));
- }
- }
-
- @Test
- public void testSimplePrimitiveSchema_OneExistAndOneDoesNotExistInTheSchema() throws Exception {
- final String file = generateSimplePrimitiveSchema_NoNullValues().getFileName();
- try {
- test("select h_boolean, e_dummy2 from dfs.`%s`", file);
- Assert.fail("Test should fail as e_dummy2 does not exist.");
- } catch(UserException ue) {
- Assert.assertTrue("Test should fail as e_dummy2 does not exist.", true);
- }
+ .sqlQuery("select h_boolean, e_double from dfs.`%s`", file)
+ .unOrdered()
+ .baselineRecords(project(testSetup.getExpectedRecords(), projectList))
+ .go();
}
@Test
public void testImplicitColumnsWithStar() throws Exception {
- AvroTestUtil.AvroTestRecordWriter testWriter = generateSimplePrimitiveSchema_NoNullValues(1);
- final String file = testWriter.getFileName();
+ AvroDataGenerator.AvroTestRecordWriter testWriter = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(1);
// removes "." and ".." from the path
- String tablePath = new File(testWriter.getFilePath()).getCanonicalPath();
+ String tablePathString = new File(testWriter.getFilePath()).getCanonicalPath();
+ Path tablePath = new Path(tablePathString);
List<Map<String, Object>> expectedRecords = testWriter.getExpectedRecords();
- expectedRecords.get(0).put("`filename`", file);
+ expectedRecords.get(0).put("`filename`", tablePath.getName());
expectedRecords.get(0).put("`suffix`", "avro");
- expectedRecords.get(0).put("`fqn`", tablePath);
- expectedRecords.get(0).put("`filepath`", new File(tablePath).getParent());
+ expectedRecords.get(0).put("`fqn`", tablePath.toUri().getPath());
+ expectedRecords.get(0).put("`filepath`", tablePath.getParent().toUri().getPath());
try {
testBuilder()
- .sqlQuery("select filename, *, suffix, fqn, filepath from dfs.`%s`", file)
- .unOrdered()
- .baselineRecords(expectedRecords)
- .go();
+ .sqlQuery("select filename, *, suffix, fqn, filepath from dfs.`%s`", tablePath.getName())
+ .unOrdered()
+ .baselineRecords(expectedRecords)
+ .go();
} finally {
- FileUtils.deleteQuietly(new File(tablePath));
+ FileUtils.deleteQuietly(new File(tablePath.toUri().getPath()));
}
}
@Test
public void testImplicitColumnAlone() throws Exception {
- AvroTestUtil.AvroTestRecordWriter testWriter = generateSimplePrimitiveSchema_NoNullValues(1);
- final String file = testWriter.getFileName();
+ AvroDataGenerator.AvroTestRecordWriter testWriter = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(1);
+ String file = testWriter.getFileName();
// removes "." and ".." from the path
String tablePath = new File(testWriter.getFilePath()).getCanonicalPath();
try {
testBuilder()
- .sqlQuery("select filename from dfs.`%s`", file)
- .unOrdered()
- .baselineColumns("filename")
- .baselineValues(file)
- .go();
+ .sqlQuery("select filename from dfs.`%s`", file)
+ .unOrdered()
+ .baselineColumns("filename")
+ .baselineValues(file)
+ .go();
} finally {
FileUtils.deleteQuietly(new File(tablePath));
}
@@ -261,18 +141,17 @@
@Test
public void testImplicitColumnInWhereClause() throws Exception {
- AvroTestUtil.AvroTestRecordWriter testWriter = generateSimplePrimitiveSchema_NoNullValues(1);
- final String file = testWriter.getFileName();
+ AvroDataGenerator.AvroTestRecordWriter testWriter = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(1);
+ String file = testWriter.getFileName();
// removes "." and ".." from the path
String tablePath = new File(testWriter.getFilePath()).getCanonicalPath();
- List<Map<String, Object>> expectedRecords = testWriter.getExpectedRecords();
try {
testBuilder()
- .sqlQuery("select * from dfs.`%1$s` where filename = '%1$s'", file)
- .unOrdered()
- .baselineRecords(expectedRecords)
- .go();
+ .sqlQuery("select * from dfs.`%1$s` where filename = '%1$s'", file)
+ .unOrdered()
+ .baselineRecords(testWriter.getExpectedRecords())
+ .go();
} finally {
FileUtils.deleteQuietly(new File(tablePath));
}
@@ -280,302 +159,234 @@
@Test
public void testPartitionColumn() throws Exception {
- setSessionOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL, "directory");
+ client.alterSession(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL, "directory");
String file = "avroTable";
String partitionColumn = "2018";
- AvroTestUtil.AvroTestRecordWriter testWriter =
- generateSimplePrimitiveSchema_NoNullValues(1, FileUtils.getFile(file, partitionColumn).getPath());
+ String tablePath = FileUtils.getFile(file, partitionColumn).getPath();
+ AvroDataGenerator.AvroTestRecordWriter testWriter = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(1, tablePath);
try {
testBuilder()
- .sqlQuery("select directory0 from dfs.`%s`", file)
- .unOrdered()
- .baselineColumns("directory0")
- .baselineValues(partitionColumn)
- .go();
- } finally {
- FileUtils.deleteQuietly(new File(testWriter.getFilePath()));
- resetSessionOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
- }
- }
-
- @Test
- public void testSelectAllWithPartitionColumn() throws Exception {
- String file = "avroTable";
- String partitionColumn = "2018";
- AvroTestUtil.AvroTestRecordWriter testWriter =
- generateSimplePrimitiveSchema_NoNullValues(1, FileUtils.getFile(file, partitionColumn).getPath());
- List<Map<String, Object>> expectedRecords = testWriter.getExpectedRecords();
- expectedRecords.get(0).put("`dir0`", partitionColumn);
- try {
- testBuilder()
- .sqlQuery("select * from dfs.`%s`", file)
- .unOrdered()
- .baselineRecords(expectedRecords)
- .go();
- } finally {
- FileUtils.deleteQuietly(new File(testWriter.getFilePath()));
- }
- }
-
- @Test
- public void testAvroTableWithLogicalTypesDecimal() throws Exception {
- ExecTest.mockUtcDateTimeZone();
- LocalDate date = DateUtility.parseLocalDate("2018-02-03");
- LocalTime time = DateUtility.parseLocalTime("19:25:03.0");
- LocalDateTime timestamp = DateUtility.parseLocalDateTime("2018-02-03 19:25:03.0");
-
- // Avro uses joda package
- org.joda.time.DateTime jodaDateTime = org.joda.time.DateTime.parse("2018-02-03T19:25:03");
- BigDecimal bigDecimal = new BigDecimal("123.45");
-
- TestRecordWithLogicalTypes record = new TestRecordWithLogicalTypes(
- true,
- 34,
- 35L,
- 3.14F,
- 3019.34,
- "abc",
- jodaDateTime.toLocalDate(),
- jodaDateTime.toLocalTime(),
- jodaDateTime,
- bigDecimal
- );
-
- File data = write(TestRecordWithLogicalTypes.getClassSchema(), record);
-
- final String query = "select * from dfs.`%s`";
-
- testBuilder()
- .sqlQuery(query, data.getName())
+ .sqlQuery("select directory0 from dfs.`%s`", file)
.unOrdered()
- .baselineColumns("b", "i32", "i64", "f32", "f64", "s", "d", "t", "ts", "dec")
- .baselineValues(true, 34, 35L, 3.14F, 3019.34, "abc", date, time, timestamp, bigDecimal)
+ .baselineColumns("directory0")
+ .baselineValues(partitionColumn)
.go();
- }
-
- @Test
- public void testAvroWithDisabledDecimalType() throws Exception {
- TestRecordWithLogicalTypes record = new TestRecordWithLogicalTypes(
- true,
- 34,
- 35L,
- 3.14F,
- 3019.34,
- "abc",
- org.joda.time.LocalDate.now(),
- org.joda.time.LocalTime.now(),
- org.joda.time.DateTime.now(),
- new BigDecimal("123.45")
- );
-
- File data = write(TestRecordWithLogicalTypes.getClassSchema(), record);
- final String query = String.format("select * from dfs.`%s`", data.getName());
-
- try {
- alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, false);
- errorMsgTestHelper(query, ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG);
} finally {
- resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+ client.resetSession(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ FileUtils.deleteQuietly(new File(testWriter.getFilePath()));
}
}
@Test
public void testSimpleArraySchema_NoNullValues() throws Exception {
- final String file = generateSimpleArraySchema_NoNullValues().getFileName();
- final String sql = "select a_string, c_string_array[0], e_float_array[2] from dfs.`%s`";
- test(sql, file);
+ String file = dataGenerator.generateSimpleArraySchema_NoNullValues().getFileName();
+ String sql = "select a_string, c_string_array[0] as csa, e_float_array[2] as efa " +
+ "from dfs.`%s` where a_string in ('a_0', 'a_15')";
+
+ testBuilder()
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("a_string", "csa", "efa")
+ .baselineValues("a_0", "c_string_array_0_0", 0.0F)
+ .baselineValues("a_15", "c_string_array_15_0", 30.0F)
+ .go();
}
@Test
public void testSimpleArraySchema_StarQuery() throws Exception {
- simpleAvroTestHelper(generateSimpleArraySchema_NoNullValues(), "select * from dfs.`%s`");
+ simpleAvroTestHelper(dataGenerator.generateSimpleArraySchema_NoNullValues());
}
@Test
public void testDoubleNestedSchema_NoNullValues_NotAllColumnsProjected() throws Exception {
- final String file = generateDoubleNestedSchema_NoNullValues().getFileName();
- final String sql = "select t.c_record.nested_1_int, t.c_record.nested_1_record.double_nested_1_int from dfs.`%s` t";
- test(sql, file);
+ String file = dataGenerator.generateDoubleNestedSchema_NoNullValues().getFileName();
+ String sql = "select a_string, t.c_record.nested_1_int as ni, " +
+ "t.c_record.nested_1_record.double_nested_1_int as dni from dfs.`%s` t " +
+ "where a_string in ('a_3', 'a_11')";
+
+ testBuilder()
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("a_string", "ni", "dni")
+ .baselineValues("a_3", 9, 27)
+ .baselineValues("a_11", 121, 1331)
+ .go();
}
@Test
public void testSimpleNestedSchema_NoNullValues() throws Exception {
- final String file = generateSimpleNestedSchema_NoNullValues().getFileName();
- final String sql = "select a_string, b_int, t.c_record.nested_1_string, t.c_record.nested_1_int from dfs.`%s` t";
- test(sql, file);
+ String file = dataGenerator.generateSimpleNestedSchema_NoNullValues().getFileName();
+ String sql = "select a_string, b_int, t.c_record.nested_1_string as ns," +
+ " t.c_record.nested_1_int as ni from dfs.`%s` t where b_int in (6, 19)";
+
+ testBuilder()
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("a_string", "b_int", "ns", "ni")
+ .baselineValues("a_6", 6, "nested_1_string_6", 36)
+ .baselineValues("a_19", 19, "nested_1_string_19", 361)
+ .go();
}
@Test
public void testSimpleNestedSchema_StarQuery() throws Exception {
- final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimpleNestedSchema_NoNullValues();
- final String file = testSetup.getFileName();
- testBuilder()
- .sqlQuery("select * from dfs.`%s`", file)
- .unOrdered()
- .baselineRecords(testSetup.getExpectedRecords())
- .go();
+ simpleAvroTestHelper(dataGenerator.generateSimpleNestedSchema_NoNullValues());
}
@Test
public void testDoubleNestedSchema_NoNullValues() throws Exception {
- final String file = generateDoubleNestedSchema_NoNullValues().getFileName();
- final String sql = "select a_string, b_int, t.c_record.nested_1_string, t.c_record.nested_1_int, " +
- "t.c_record.nested_1_record.double_nested_1_string, " +
- "t.c_record.nested_1_record.double_nested_1_int " +
- "from dfs.`%s` t";
- test(sql, file);
+ String file = dataGenerator.generateDoubleNestedSchema_NoNullValues().getFileName();
+ String sql = "select a_string, b_int, t.c_record.nested_1_string as ns, t.c_record.nested_1_int as ni, " +
+ "t.c_record.nested_1_record.double_nested_1_string as dns, t.c_record.nested_1_record.double_nested_1_int as dni " +
+ "from dfs.`%s` t where b_int in (2, 14)";
- final String sql2 = "select t.c_record.nested_1_string from dfs.`%s` t limit 1";
- TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql2, file)
- .unOrdered()
- .baselineColumns("EXPR$0");
- for (int i = 0; i < 1; i++) {
- testBuilder.baselineValues("nested_1_string_" + i);
- }
- testBuilder.go();
+ testBuilder()
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("a_string", "b_int", "ns", "ni", "dns", "dni")
+ .baselineValues("a_2", 2, "nested_1_string_2", 4, "double_nested_1_string_2_2", 8)
+ .baselineValues("a_14", 14, "nested_1_string_14", 196, "double_nested_1_string_14_14", 2744)
+ .go();
}
@Test
public void testDoubleNestedSchema_StarQuery() throws Exception {
- simpleAvroTestHelper(generateDoubleNestedSchema_NoNullValues(), "select * from dfs.`%s`");
- }
-
- private static void simpleAvroTestHelper(AvroTestUtil.AvroTestRecordWriter testSetup, final String sql) throws Exception {
- testBuilder()
- .sqlQuery(sql, testSetup.getFileName())
- .unOrdered()
- .baselineRecords(testSetup.getExpectedRecords())
- .go();
+ simpleAvroTestHelper(dataGenerator.generateDoubleNestedSchema_NoNullValues());
}
@Test
public void testSimpleEnumSchema_NoNullValues() throws Exception {
- final AvroTestUtil.AvroTestRecordWriter testSetup = generateSimpleEnumSchema_NoNullValues();
- final String file = testSetup.getFileName();
- final String sql = "select a_string, b_enum from dfs.`%s`";
- List<String> projectList = Lists.newArrayList("`a_string`", "`b_enum`");
+ AvroDataGenerator.AvroTestRecordWriter testSetup = dataGenerator.generateSimpleEnumSchema_NoNullValues();
+ String file = testSetup.getFileName();
+ String sql = "select a_string, b_enum from dfs.`%s`";
+ List<String> projectList = Arrays.asList("`a_string`", "`b_enum`");
testBuilder()
- .sqlQuery(sql, file)
- .unOrdered()
- .baselineRecords(project(testSetup.getExpectedRecords(), projectList))
- .go();
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineRecords(project(testSetup.getExpectedRecords(), projectList))
+ .go();
}
@Test
public void testSimpleEnumSchema_StarQuery() throws Exception {
- simpleAvroTestHelper(generateSimpleEnumSchema_NoNullValues(), "select * from dfs.`%s`");
+ simpleAvroTestHelper(dataGenerator.generateSimpleEnumSchema_NoNullValues());
}
@Test
public void testSimpleUnionSchema_StarQuery() throws Exception {
- simpleAvroTestHelper(generateUnionSchema_WithNullValues(), "select * from dfs.`%s`");
+ simpleAvroTestHelper(dataGenerator.generateUnionSchema_WithNullValues());
}
@Test
public void testShouldFailSimpleUnionNonNullSchema_StarQuery() throws Exception {
- final String file = generateUnionSchema_WithNonNullValues().getFileName();
- try {
- test("select * from dfs.`%s`", file);
- Assert.fail("Test should fail as union is only supported for optional fields");
- } catch(UserRemoteException e) {
- String message = e.getMessage();
- Assert.assertTrue(message.contains("Avro union type must be of the format : [\"null\", \"some-type\"]"));
- }
+ String file = dataGenerator.generateUnionSchema_WithNonNullValues().getFileName();
+
+ thrown.expect(UserRemoteException.class);
+ thrown.expectMessage("UNSUPPORTED_OPERATION ERROR");
+ run("select * from dfs.`%s`", file);
}
@Test
public void testNestedUnionSchema_withNullValues() throws Exception {
- final String file = generateUnionNestedSchema_withNullValues().getFileName();
- final String sql = "select t.c_record.nested_1_string,t.c_record.nested_1_int from dfs.`%s` t";
- test(sql, file);
+ String file = dataGenerator.generateUnionNestedSchema_withNullValues().getFileName();
+ String sql = "select a_string, t.c_record.nested_1_string as ns, " +
+ "t.c_record.nested_1_int as ni from dfs.`%s` t where a_string in ('a_0', 'a_1')";
+
+ testBuilder()
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("a_string", "ns", "ni")
+ .baselineValues("a_0", "nested_1_string_0", 0)
+ .baselineValues("a_1", null, null)
+ .go();
}
- // DRILL-4574"></a>
+ // DRILL-4574
@Test
public void testFlattenPrimitiveArray() throws Exception {
- final String file = generateSimpleArraySchema_NoNullValues().getFileName();
- final String sql = "select a_string, flatten(c_string_array) as array_item from dfs.`%s` t";
+ String file = dataGenerator.generateSimpleArraySchema_NoNullValues().getFileName();
+ String sql = "select a_string, flatten(c_string_array) as array_item from dfs.`%s` t";
TestBuilder testBuilder = testBuilder()
.sqlQuery(sql, file)
.unOrdered()
.baselineColumns("a_string", "array_item");
- for (int i = 0; i < RECORD_COUNT; i++) {
-
- for (int j = 0; j < ARRAY_SIZE; j++) {
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
+ for (int j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
testBuilder.baselineValues("a_" + i, "c_string_array_" + i + "_" + j);
}
}
-
testBuilder.go();
}
- private TestBuilder nestedArrayQueryTestBuilder(String file) {
- final String sql = "select rec_nr, array_item['nested_1_int'] as array_item_nested_int from "
- + "(select a_int as rec_nr, flatten(t.b_array) as array_item from dfs.`%s` t) a";
-
- TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, file)
- .unOrdered()
- .baselineColumns("rec_nr", "array_item_nested_int");
-
- return testBuilder;
- }
-
//DRILL-4574
@Test
public void testFlattenComplexArray() throws Exception {
- final String file = generateNestedArraySchema().getFileName();
+ String file = dataGenerator.generateNestedArraySchema().getFileName();
TestBuilder testBuilder = nestedArrayQueryTestBuilder(file);
- for (int i = 0; i < RECORD_COUNT; i++) {
- for (int j = 0; j < ARRAY_SIZE; j++) {
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
+ for (int j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
testBuilder.baselineValues(i, j);
}
}
testBuilder.go();
-
}
//DRILL-4574
@Test
public void testFlattenEmptyComplexArrayMustYieldNoResults() throws Exception {
- final String file = generateNestedArraySchema(RECORD_COUNT, 0).getFilePath();
+ String file = dataGenerator.generateNestedArraySchema(AvroDataGenerator.RECORD_COUNT, 0).getFilePath();
TestBuilder testBuilder = nestedArrayQueryTestBuilder(file);
testBuilder.expectsEmptyResultSet();
}
@Test
public void testNestedUnionArraySchema_withNullValues() throws Exception {
- final String file = generateUnionNestedArraySchema_withNullValues().getFileName();
- final String sql = "select t.c_array[0].nested_1_string,t.c_array[0].nested_1_int from dfs.`%s` t";
- test(sql, file);
+ String file = dataGenerator.generateUnionNestedArraySchema_withNullValues().getFileName();
+ String sql = "select a_string, t.c_array[0].nested_1_string as ns, " +
+ "t.c_array[0].nested_1_int as ni from dfs.`%s` t where a_string in ('a_2', 'a_3')";
+
+ testBuilder()
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("a_string", "ns", "ni")
+ .baselineValues("a_2", "nested_1_string_2", 4)
+ .baselineValues("a_3", null, null)
+ .go();
}
@Test
public void testMapSchema_withNullValues() throws Exception {
- final String file = generateMapSchema_withNullValues().getFileName();
- final String sql = "select c_map['key1'],c_map['key2'] from dfs.`%s`";
- test(sql, file);
+ String file = dataGenerator.generateMapSchema_withNullValues().getFileName();
+ String sql = "select a_string, c_map['key1'] as k1, c_map['key2'] as k2 " +
+ "from dfs.`%s` where a_string in ('a_4', 'a_5')";
+
+ testBuilder()
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("a_string", "k1", "k2")
+ .baselineValues("a_4", "nested_1_string_4", "nested_1_string_5")
+ .baselineValues("a_5", null, null)
+ .go();
}
@Test
public void testMapSchemaComplex_withNullValues() throws Exception {
- final String file = generateMapSchemaComplex_withNullValues().getFileName();
- final String sql = "select d_map['key1'] nested_key1, d_map['key2'] nested_key2 from dfs.`%s`";
+ String file = dataGenerator.generateMapSchemaComplex_withNullValues().getFileName();
+ String sql = "select d_map['key1'] nested_key1, d_map['key2'] nested_key2 from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, file)
- .unOrdered()
- .baselineColumns("nested_key1", "nested_key2");
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("nested_key1", "nested_key2");
- final List<Object> expectedList = Lists.newArrayList();
- for (int i = 0; i < ARRAY_SIZE; i++) {
- expectedList.add((double)i);
+ List<Object> expectedList = new ArrayList<>();
+ for (int i = 0; i < AvroDataGenerator.ARRAY_SIZE; i++) {
+ expectedList.add((double) i);
}
- final List<Object> emptyList = listOf();
- for (int i = 0; i < RECORD_COUNT; i += 2) {
+ List<Object> emptyList = listOf();
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i += 2) {
testBuilder.baselineValues(expectedList, expectedList);
testBuilder.baselineValues(emptyList, emptyList);
}
@@ -584,26 +395,39 @@
@Test
public void testStringAndUtf8Data() throws Exception {
- simpleAvroTestHelper(generateStringAndUtf8Data(), "select * from dfs.`%s`");
+ simpleAvroTestHelper(dataGenerator.generateStringAndUtf8Data());
}
@Test
public void testLinkedList() throws Exception {
- final String file = generateLinkedList();
- final String sql = "select * from dfs.`%s`";
- test(sql, file);
+ int numRows = 5;
+ String file = dataGenerator.generateLinkedList(numRows);
+
+ TestBuilder testBuilder = testBuilder()
+ .sqlQuery("select * from dfs.`%s` t", file)
+ .unOrdered()
+ .baselineColumns("value", "next");
+
+ for (long i = 0; i < numRows; i++) {
+ if (i == numRows - 1) { // last row
+ testBuilder.baselineValues(i, mapOf("next", new JsonStringHashMap<>()));
+ continue;
+ }
+ testBuilder.baselineValues(i, mapOf("value", i + 1, "next", new JsonStringHashMap<>()));
+ }
+ testBuilder.go();
}
@Test
public void testCountStar() throws Exception {
- final String file = generateStringAndUtf8Data().getFileName();
- final String sql = "select count(*) as row_count from dfs.`%s`";
+ String file = dataGenerator.generateStringAndUtf8Data().getFileName();
+ String sql = "select count(*) as row_count from dfs.`%s`";
testBuilder()
- .sqlQuery(sql, file)
- .ordered()
- .baselineColumns("row_count")
- .baselineValues((long) RECORD_COUNT)
- .go();
+ .sqlQuery(sql, file)
+ .ordered()
+ .baselineColumns("row_count")
+ .baselineValues((long) AvroDataGenerator.RECORD_COUNT)
+ .go();
}
@Test
@@ -611,11 +435,11 @@
String sql = "select map_field from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, mapTableName)
- .unOrdered()
- .baselineColumns("map_field");
+ .sqlQuery(sql, mapTableName)
+ .unOrdered()
+ .baselineColumns("map_field");
- for (long i = 0; i < RECORD_COUNT; i++) {
+ for (long i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
testBuilder.baselineValues(mapOfObject("key1", i, "key2", i + 1));
}
testBuilder.go();
@@ -623,14 +447,14 @@
@Test
public void testMapSchemaGetByKey() throws Exception {
- String sql = "select map_field['key1'] val1, map_field['key2'] val2 from dfs.`%s`";
+ String sql = "select map_field['key1'] val1, map_field['key2'] val2 from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, mapTableName)
- .unOrdered()
- .baselineColumns("val1", "val2");
+ .sqlQuery(sql, mapTableName)
+ .unOrdered()
+ .baselineColumns("val1", "val2");
- for (long i = 0; i < RECORD_COUNT; i++) {
+ for (long i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
testBuilder.baselineValues(i, i + 1);
}
testBuilder.go();
@@ -641,12 +465,12 @@
String sql = "select map_field['notExists'] as map_field from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, mapTableName)
- .unOrdered()
- .baselineColumns("map_field");
+ .sqlQuery(sql, mapTableName)
+ .unOrdered()
+ .baselineColumns("map_field");
Object[] nullValue = new Object[] {null};
- for (long i = 0; i < RECORD_COUNT; i++) {
+ for (long i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
testBuilder.baselineValues(nullValue);
}
testBuilder.go();
@@ -657,11 +481,11 @@
String sql = "select t.map_field.key1 val1, t.map_field.key2 val2 from dfs.`%s` t";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, mapTableName)
- .unOrdered()
- .baselineColumns("val1", "val2");
+ .sqlQuery(sql, mapTableName)
+ .unOrdered()
+ .baselineColumns("val1", "val2");
- for (long i = 0; i < RECORD_COUNT; i++) {
+ for (long i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
testBuilder.baselineValues(i, i + 1);
}
testBuilder.go();
@@ -672,14 +496,14 @@
String sql = "select map_array from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, mapTableName)
- .unOrdered()
- .baselineColumns("map_array");
+ .sqlQuery(sql, mapTableName)
+ .unOrdered()
+ .baselineColumns("map_array");
- for (int i = 0; i < RECORD_COUNT; i++) {
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
List<Object> array = listOf();
- for (int j = 0; j < ARRAY_SIZE; j++) {
+ for (int j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
array.add(mapOfObject(
"key1", (i + 1) * (j + 50),
"key2", (i + 1) * (j + 100)
@@ -696,11 +520,11 @@
String sql = "select map_array[%d] element from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, elementIndex, mapTableName)
- .unOrdered()
- .baselineColumns("element");
+ .sqlQuery(sql, elementIndex, mapTableName)
+ .unOrdered()
+ .baselineColumns("element");
- for (int i = 0; i < RECORD_COUNT; i++) {
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
testBuilder.baselineValues(mapOfObject(
"key1", (i + 1) * (elementIndex + 50),
"key2", (i + 1) * (elementIndex + 100)
@@ -715,11 +539,11 @@
String sql = "select map_array[%d]['key2'] val from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, elementIndex, mapTableName)
- .unOrdered()
- .baselineColumns("val");
+ .sqlQuery(sql, elementIndex, mapTableName)
+ .unOrdered()
+ .baselineColumns("val");
- for (int i = 0; i < RECORD_COUNT; i++) {
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
testBuilder.baselineValues((i + 1) * (elementIndex + 100));
}
testBuilder.go();
@@ -730,13 +554,13 @@
String sql = "select map_array_value from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, mapTableName)
- .unOrdered()
- .baselineColumns("map_array_value");
+ .sqlQuery(sql, mapTableName)
+ .unOrdered()
+ .baselineColumns("map_array_value");
- for (int i = 0; i < RECORD_COUNT; i++) {
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
List<Object> doubleArray = listOf();
- for (double j = 0; j < ARRAY_SIZE; j++) {
+ for (double j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
doubleArray.add((double) (i + 1) * j);
}
testBuilder.baselineValues(mapOfObject("key1", doubleArray, "key2", doubleArray));
@@ -750,13 +574,13 @@
String sql = "select map_array_value['key1'] element from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, generateMapSchema().getFileName())
- .unOrdered()
- .baselineColumns("element");
+ .sqlQuery(sql, dataGenerator.generateMapSchema().getFileName())
+ .unOrdered()
+ .baselineColumns("element");
- for (int i = 0; i < RECORD_COUNT; i++) {
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
List<Object> doubleArray = listOf();
- for (double j = 0; j < ARRAY_SIZE; j++) {
+ for (double j = 0; j < AvroDataGenerator.ARRAY_SIZE; j++) {
doubleArray.add((double) (i + 1) * j);
}
testBuilder.baselineValues(doubleArray);
@@ -770,11 +594,11 @@
String sql = "select map_array_value['key1'][3] element from dfs.`%s`";
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, generateMapSchema().getFileName())
- .unOrdered()
- .baselineColumns("element");
+ .sqlQuery(sql, mapTableName)
+ .unOrdered()
+ .baselineColumns("element");
- for (int i = 0; i < RECORD_COUNT; i++) {
+ for (int i = 0; i < AvroDataGenerator.RECORD_COUNT; i++) {
double val = (double) (i + 1) * 3;
testBuilder.baselineValues(val);
}
@@ -786,11 +610,11 @@
public void testMapSchemaValueInFilter() throws Exception {
String sql = "select map_field['key1'] val from dfs.`%s` where map_field['key1'] < %d";
- long filterValue = RECORD_COUNT / 10;
+ long filterValue = AvroDataGenerator.RECORD_COUNT / 10;
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, mapTableName, filterValue)
- .unOrdered()
- .baselineColumns("val");
+ .sqlQuery(sql, mapTableName, filterValue)
+ .unOrdered()
+ .baselineColumns("val");
for (long i = 0; i < filterValue; i++) {
testBuilder.baselineValues(i);
@@ -803,16 +627,134 @@
String sql = "select map_array[%d]['key2'] val from dfs.`%s` where map_array[%d]['key2'] > %d";
int elementIndex = 1;
- int startRecord = 5001;
- int filterValue = 5002 * (elementIndex + 100);
+ int startRecord = 30;
+ int filterValue = startRecord * (elementIndex + 100);
TestBuilder testBuilder = testBuilder()
- .sqlQuery(sql, elementIndex, mapTableName, elementIndex, filterValue)
- .unOrdered()
- .baselineColumns("val");
+ .sqlQuery(sql, elementIndex, mapTableName, elementIndex, filterValue)
+ .unOrdered()
+ .baselineColumns("val");
- for (int i = startRecord + 1; i < RECORD_COUNT; i++) {
+ for (int i = startRecord; i < AvroDataGenerator.RECORD_COUNT; i++) {
testBuilder.baselineValues((i + 1) * (elementIndex + 100));
}
testBuilder.go();
}
+
+ @Test
+ public void testDecimal() throws Exception {
+ int numRows = 5;
+ String fileName = dataGenerator.generateDecimalData(numRows);
+
+ TestBuilder testBuilder = testBuilder()
+ .sqlQuery("select * from dfs.`%s`", fileName)
+ .unOrdered()
+ .baselineColumns("col_dec_pos_bytes", "col_dec_neg_bytes", "col_dec_pos_fixed", "col_dec_neg_fixed");
+
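+    // Baselines mirror the generator's unscaled values; scale 2 comes from
+    // the decimal logical type declarations.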
+ for (int i = 0; i < numRows; i++) {
+ testBuilder.baselineValues(
+ new BigDecimal(BigInteger.valueOf(100 + i), 2),
+ new BigDecimal(BigInteger.valueOf(-200 + i), 2),
+ new BigDecimal(BigInteger.valueOf(300 + i), 2),
+ new BigDecimal(BigInteger.valueOf(-400 + i), 2));
+ }
+ testBuilder.go();
+ }
+
+ @Test
+ public void testDateTime() throws Exception {
+ LocalDateTime dateTime = LocalDateTime.now(ZoneId.of("UTC")).withNano(0);
+ LocalDate localDate = dateTime.toLocalDate();
+ LocalTime localTime = dateTime.toLocalTime();
+
+ String fileName = dataGenerator.generateDateTimeData(dateTime);
+
+ testBuilder()
+ .sqlQuery("select * from dfs.`%s`", fileName)
+ .unOrdered()
+ .baselineColumns("col_timestamp_millis", "col_timestamp_micros", "col_date", "col_time_millis", "col_time_micros")
+ .baselineValues(dateTime, dateTime, localDate, localTime, localTime)
+ .go();
+ }
+
+ @Test
+ public void testDuration() throws Exception {
+ int numRows = 5;
+ String fileName = dataGenerator.generateDuration(numRows);
+
+ TestBuilder testBuilder = testBuilder()
+ .sqlQuery("select * from dfs.`%s`", fileName)
+ .unOrdered()
+ .baselineColumns("col_duration");
+
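+    // Drill reads Avro durations back as Joda Periods carrying the same
+    // months/days/millis values the generator wrote.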
+ for (int i = 0; i < numRows; i++) {
+ testBuilder.baselineValues(Period.months(10 + i).withDays(100 + i).withMillis(1000 + i));
+ }
+ testBuilder.go();
+ }
+
+ @Test
+ public void testMultiDimensionalArray() throws Exception {
+ int numRecords = 5;
+ int arraySize = 3;
+ String fileName = dataGenerator.generateMultiDimensionalArray(numRecords, arraySize);
+
+ TestBuilder testBuilder = testBuilder()
+ .sqlQuery("select * from dfs.`%s`", fileName)
+ .unOrdered()
+ .baselineColumns("col_array_two_dims");
+
+ for (int i = 0; i < numRecords; i++) {
+ JsonStringArrayList<Object> nestedArray = new JsonStringArrayList<>();
+ for (int a = 0; a < arraySize; a++) {
+ nestedArray.add(listOf(String.format("val_%s_%s_0", i, a), String.format("val_%s_%s_1", i, a)));
+ }
+ testBuilder.baselineValues(nestedArray);
+ }
+ testBuilder.go();
+ }
+
+ @Test
+ public void testWithProvidedSchema() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from " +
+ "table(dfs.`%s`(schema=>'inline=(col_i int not null default `15`, a_string varchar)')) " +
+ "where a_string = 'a_0'",
+ dataGenerator.generateStringAndUtf8Data().getFileName())
+ .unOrdered()
+ .baselineColumns("col_i", "a_string", "b_utf8")
+ .baselineValues(15, "a_0", "b_0")
+ .go();
+ }
+
+ private void simpleAvroTestHelper(AvroDataGenerator.AvroTestRecordWriter testSetup) throws Exception {
+ testBuilder()
+ .sqlQuery("select * from dfs.`%s`", testSetup.getFileName())
+ .unOrdered()
+ .baselineRecords(testSetup.getExpectedRecords())
+ .go();
+ }
+
+ private List<Map<String, Object>> project(List<Map<String,Object>> incomingRecords, List<String> projectCols) {
+ List<Map<String,Object>> output = new ArrayList<>();
+ for (Map<String, Object> incomingRecord : incomingRecords) {
+ JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>();
+ for (String s : incomingRecord.keySet()) {
+ if (projectCols.contains(s)) {
+ newRecord.put(s, incomingRecord.get(s));
+ }
+ }
+ output.add(newRecord);
+ }
+ return output;
+ }
+
+ private TestBuilder nestedArrayQueryTestBuilder(String file) {
+ String sql = "select rec_nr, array_item['nested_1_int'] as array_item_nested_int from "
+ + "(select a_int as rec_nr, flatten(t.b_array) as array_item from dfs.`%s` t) a";
+
+ return testBuilder()
+ .sqlQuery(sql, file)
+ .unOrdered()
+ .baselineColumns("rec_nr", "array_item_nested_int");
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroSchemaUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroSchemaUtilTest.java
new file mode 100644
index 0000000..1e548fd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroSchemaUtilTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.avro;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.BaseTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AvroSchemaUtilTest extends BaseTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testExtractSchemaFromNullableNotUnion() {
+ Schema schema = SchemaBuilder.builder().stringType();
+
+ thrown.expect(UserException.class);
+ thrown.expectMessage("VALIDATION ERROR");
+
+ AvroSchemaUtil.extractSchemaFromNullable(schema, "s");
+ }
+
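+ // Unions other than the simple [null, type] form are not supported and should fail with a clear error.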
+ @Test
+ public void testExtractSchemaFromNullableComplexUnion() {
+ Schema schema = SchemaBuilder.unionOf()
+ .doubleType().and()
+ .longType().and()
+ .nullType()
+ .endUnion();
+
+ thrown.expect(UserException.class);
+ thrown.expectMessage("UNSUPPORTED_OPERATION ERROR");
+
+ AvroSchemaUtil.extractSchemaFromNullable(schema, "u");
+ }
+
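+ // The happy path: a [null, string] union unwraps to the plain string schema.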
+ @Test
+ public void testExtractSchemaFromNullable() {
+ Schema schema = SchemaBuilder.builder().nullable().stringType();
+ Schema actual = AvroSchemaUtil.extractSchemaFromNullable(schema, "s");
+
+ assertEquals(SchemaBuilder.builder().stringType(), actual);
+ }
+
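+ // Avro primitives map to Drill minor types; required fields become non-nullable columns, optional fields nullable ones.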
+ @Test
+ public void testConvertSimpleTypes() {
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .requiredString("col_string")
+ .requiredBytes("col_bytes")
+ .requiredBoolean("col_boolean")
+ .requiredInt("col_int")
+ .requiredLong("col_long")
+ .requiredFloat("col_float")
+ .requiredDouble("col_double")
+ .optionalString("col_opt_string")
+ .optionalBytes("col_opt_bytes")
+ .optionalBoolean("col_opt_boolean")
+ .optionalInt("col_opt_int")
+ .optionalLong("col_opt_long")
+ .optionalFloat("col_opt_float")
+ .optionalDouble("col_opt_double")
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .add("col_string", TypeProtos.MinorType.VARCHAR)
+ .add("col_bytes", TypeProtos.MinorType.VARBINARY)
+ .add("col_boolean", TypeProtos.MinorType.BIT)
+ .add("col_int", TypeProtos.MinorType.INT)
+ .add("col_long", TypeProtos.MinorType.BIGINT)
+ .add("col_float", TypeProtos.MinorType.FLOAT4)
+ .add("col_double", TypeProtos.MinorType.FLOAT8)
+ .addNullable("col_opt_string", TypeProtos.MinorType.VARCHAR)
+ .addNullable("col_opt_bytes", TypeProtos.MinorType.VARBINARY)
+ .addNullable("col_opt_boolean", TypeProtos.MinorType.BIT)
+ .addNullable("col_opt_int", TypeProtos.MinorType.INT)
+ .addNullable("col_opt_long", TypeProtos.MinorType.BIGINT)
+ .addNullable("col_opt_float", TypeProtos.MinorType.FLOAT4)
+ .addNullable("col_opt_double", TypeProtos.MinorType.FLOAT8)
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
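+ // The decimal logical type, whether backed by bytes or fixed, converts to VARDECIMAL with precision and scale preserved.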
+ @Test
+ public void testConvertDecimal() {
+ Schema decBytes = LogicalTypes.decimal(10, 2)
+ .addToSchema(SchemaBuilder.builder().bytesType());
+
+ Schema decFixed = LogicalTypes.decimal(5, 2)
+ .addToSchema(SchemaBuilder.builder().fixed("dec_fixed").size(5));
+
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_dec_bytes").type(decBytes).noDefault()
+ .name("col_dec_fixed").type(decFixed).noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .add("col_dec_bytes", TypeProtos.MinorType.VARDECIMAL, 10, 2)
+ .add("col_dec_fixed", TypeProtos.MinorType.VARDECIMAL, 5, 2)
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
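+ // Date/time logical types: both millis and micros variants collapse to TIMESTAMP/TIME, date maps to DATE, duration to INTERVAL.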
+ @Test
+ public void testConvertDateTime() {
+ Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType());
+ Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+ Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ Schema timeMillis = LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+ Schema timeMicros = LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+ Schema duration = new LogicalType("duration")
+ .addToSchema(SchemaBuilder.builder().fixed("duration_fixed").size(12));
+
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_timestamp_millis").type(timestampMillis).noDefault()
+ .name("col_timestamp_micros").type(timestampMicros).noDefault()
+ .name("col_date").type(date).noDefault()
+ .name("col_time_millis").type(timeMillis).noDefault()
+ .name("col_time_micros").type(timeMicros).noDefault()
+ .name("col_duration").type(duration).noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .add("col_timestamp_millis", TypeProtos.MinorType.TIMESTAMP)
+ .add("col_timestamp_micros", TypeProtos.MinorType.TIMESTAMP)
+ .add("col_date", TypeProtos.MinorType.DATE)
+ .add("col_time_millis", TypeProtos.MinorType.TIME)
+ .add("col_time_micros", TypeProtos.MinorType.TIME)
+ .add("col_duration", TypeProtos.MinorType.INTERVAL)
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
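+ // The Avro null type has no direct Drill counterpart and converts to a nullable VARCHAR.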
+ @Test
+ public void testConvertNullType() {
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_null").type().nullType().noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .addNullable("col_null", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
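+ // Enum symbols are represented as VARCHAR.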
+ @Test
+ public void testConvertEnum() {
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_enum").type().enumeration("letters").symbols("A", "B", "C").noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .add("col_enum", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
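+ // Fixed-length binary converts to VARBINARY.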
+ @Test
+ public void testConvertFixed() {
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_fixed").type().fixed("md5").size(16).noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .add("col_fixed", TypeProtos.MinorType.VARBINARY)
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
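+ // Arrays convert to repeated types: nested arrays gain an extra dimension, record items become repeated maps, Avro map items repeated dicts.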
+ @Test
+ public void testConvertArray() {
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_array").type().array().items().booleanType().noDefault()
+ .name("col_opt_array").type().array().items().nullable().longType().noDefault()
+ .name("col_nested_array").type().array().items()
+ .array().items()
+ .stringType()
+ .noDefault()
+ .name("col_array_map").type().array().items()
+ .record("arr_rec")
+ .fields()
+ .optionalString("s")
+ .requiredLong("i")
+ .endRecord()
+ .noDefault()
+ .name("col_array_dict").type().array().items()
+ .map().values().intType().noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .addArray("col_array", TypeProtos.MinorType.BIT)
+ .addArray("col_opt_array", TypeProtos.MinorType.BIGINT)
+ .addArray("col_nested_array", TypeProtos.MinorType.VARCHAR, 2)
+ .addMapArray("col_array_map")
+ .addNullable("s", TypeProtos.MinorType.VARCHAR)
+ .add("i", TypeProtos.MinorType.BIGINT)
+ .resumeSchema()
+ .addDictArray("col_array_dict", TypeProtos.MinorType.VARCHAR)
+ .value(TypeProtos.MinorType.INT)
+ .resumeSchema()
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
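+ // Avro records convert to Drill maps, including records nested inside records alongside dict and array members.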
+ @Test
+ public void testConvertMap() {
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_map_simple").type().record("map_simple_rec")
+ .fields()
+ .optionalInt("i")
+ .requiredString("s")
+ .endRecord()
+ .noDefault()
+ .name("col_map_complex").type().record("map_complex_rec")
+ .fields()
+ .name("col_nested_map").type().record("map_nested_rec")
+ .fields()
+ .optionalBoolean("nest_b")
+ .requiredDouble("nest_d")
+ .endRecord()
+ .noDefault()
+ .name("col_nested_dict").type().map().values().stringType().noDefault()
+ .name("col_nested_array").type().array().items().booleanType().noDefault()
+ .endRecord()
+ .noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .addMap("col_map_simple")
+ .addNullable("i", TypeProtos.MinorType.INT)
+ .add("s", TypeProtos.MinorType.VARCHAR)
+ .resumeSchema()
+ .addMap("col_map_complex")
+ .addMap("col_nested_map")
+ .addNullable("nest_b", TypeProtos.MinorType.BIT)
+ .add("nest_d", TypeProtos.MinorType.FLOAT8)
+ .resumeMap()
+ .addDict("col_nested_dict", TypeProtos.MinorType.VARCHAR)
+ .value(TypeProtos.MinorType.VARCHAR)
+ .resumeMap()
+ .addArray("col_nested_array", TypeProtos.MinorType.BIT)
+ .resumeSchema()
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
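+ // Avro maps convert to Drill dicts with VARCHAR keys; values may be nullable, repeated, nested dicts or maps.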
+ @Test
+ public void testConvertDict() {
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .name("col_dict").type().map().values().stringType().noDefault()
+ .name("col_opt_dict").type().map().values().nullable().intType().noDefault()
+ .name("col_dict_val_array").type().map().values().array().items().stringType().noDefault()
+ .name("col_dict_val_dict").type().map().values().map().values().intType().noDefault()
+ .name("col_dict_val_map").type().map().values()
+ .record("dict_val")
+ .fields()
+ .optionalInt("i")
+ .requiredString("s")
+ .endRecord().noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .addDict("col_dict", TypeProtos.MinorType.VARCHAR).value(TypeProtos.MinorType.VARCHAR).resumeSchema()
+ .addDict("col_opt_dict", TypeProtos.MinorType.VARCHAR).nullableValue(TypeProtos.MinorType.INT).resumeSchema()
+ .addDict("col_dict_val_array", TypeProtos.MinorType.VARCHAR)
+ .repeatedValue(TypeProtos.MinorType.VARCHAR).resumeSchema()
+ .addDict("col_dict_val_dict", TypeProtos.MinorType.VARCHAR)
+ .dictValue()
+ .key(TypeProtos.MinorType.VARCHAR)
+ .value(TypeProtos.MinorType.INT)
+ .resumeDict()
+ .resumeSchema()
+ .addDict("col_dict_val_map", TypeProtos.MinorType.VARCHAR)
+ .mapValue()
+ .addNullable("i", TypeProtos.MinorType.INT)
+ .add("s", TypeProtos.MinorType.VARCHAR)
+ .resumeDict()
+ .resumeSchema()
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
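+ // A union of more than one non-null type cannot be converted and should raise an unsupported-operation error.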
+ @Test
+ public void testConvertComplexUnion() {
+ Schema schema = SchemaBuilder.record("rec")
+ .fields()
+ .optionalString("s")
+ .name("u").type().unionOf()
+ .stringType().and().longType().and().nullType().endUnion().noDefault()
+ .endRecord();
+
+ thrown.expect(UserException.class);
+ thrown.expectMessage("UNSUPPORTED_OPERATION ERROR");
+
+ AvroSchemaUtil.convert(schema);
+ }
+
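+ // Self-referencing named types: the recursive reference is cut off and rendered as an empty nested map instead of recursing forever.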
+ @Test
+ public void testConvertWithNamedTypes() {
+ Schema schema = SchemaBuilder.record("MixedList")
+ .fields()
+ .name("rec_l").type().record("LongList")
+ .fields()
+ .requiredLong("l")
+ .name("list_l_1").type("LongList").noDefault()
+ .name("list_l_2").type("LongList").noDefault()
+ .endRecord()
+ .noDefault()
+ .name("rec_s").type().record("StringList")
+ .fields()
+ .requiredString("s")
+ .name("list_s_1").type("StringList").noDefault()
+ .name("list_s_2").type("StringList").noDefault()
+ .endRecord()
+ .noDefault()
+ .name("rec_m").type().record("rec_m")
+ .fields()
+ .name("list_l").type("LongList").noDefault()
+ .name("list_s").type("StringList").noDefault()
+ .name("a").type().array().items().type("MixedList").noDefault()
+ .endRecord()
+ .noDefault()
+ .name("mixed").type("MixedList").noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .addMap("rec_l")
+ .add("l", TypeProtos.MinorType.BIGINT)
+ .addMap("list_l_1")
+ .resumeMap()
+ .addMap("list_l_2")
+ .resumeMap()
+ .resumeSchema()
+ .addMap("rec_s")
+ .add("s", TypeProtos.MinorType.VARCHAR)
+ .addMap("list_s_1")
+ .resumeMap()
+ .addMap("list_s_2")
+ .resumeMap()
+ .resumeSchema()
+ .addMap("rec_m")
+ .addMap("list_l")
+ .add("l", TypeProtos.MinorType.BIGINT)
+ .addMap("list_l_1")
+ .resumeMap()
+ .addMap("list_l_2")
+ .resumeMap()
+ .resumeMap()
+ .addMap("list_s")
+ .add("s", TypeProtos.MinorType.VARCHAR)
+ .addMap("list_s_1")
+ .resumeMap()
+ .addMap("list_s_2")
+ .resumeMap()
+ .resumeMap()
+ .addMapArray("a")
+ .resumeMap()
+ .resumeSchema()
+ .addMap("mixed")
+ .resumeSchema()
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+
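+ // Two records may share the simple name 'rec' as long as their namespaces differ; both convert without a clash.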
+ @Test
+ public void testConvertWithNamespaces() {
+ Schema schema = SchemaBuilder.record("rec").namespace("n1")
+ .fields()
+ .requiredString("s")
+ .name("m").type().record("rec").namespace("n2")
+ .fields()
+ .requiredLong("l")
+ .endRecord()
+ .noDefault()
+ .endRecord();
+
+ TupleMetadata tupleMetadata = new org.apache.drill.exec.record.metadata.SchemaBuilder()
+ .add("s", TypeProtos.MinorType.VARCHAR)
+ .addMap("m")
+ .add("l", TypeProtos.MinorType.BIGINT)
+ .resumeSchema()
+ .buildSchema();
+
+ assertTrue(tupleMetadata.isEquivalent(AvroSchemaUtil.convert(schema)));
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
deleted file mode 100644
index 6f3d19d..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.avro;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.exec.util.JsonStringHashMap;
-import org.apache.drill.exec.util.Text;
-import org.apache.drill.test.BaseTestQuery;
-
-/**
- * Utilities for generating Avro test data.
- */
-public class AvroTestUtil {
-
- public static final int RECORD_COUNT = 10_000;
- public static int ARRAY_SIZE = 20;
-
- /**
- * Class to write records to an Avro file while simultaneously
- * constructing a corresponding list of records in the format taken in
- * by the Drill test builder to describe expected results.
- */
- public static class AvroTestRecordWriter implements Closeable {
- private final List<Map<String, Object>> expectedRecords;
- GenericData.Record currentAvroRecord;
- TreeMap<String, Object> currentExpectedRecord;
- private Schema schema;
- private final DataFileWriter<GenericData.Record> writer;
- private final String filePath;
- private final String fileName;
-
- private AvroTestRecordWriter(Schema schema, File file) {
- writer = new DataFileWriter<GenericData.Record>(new GenericDatumWriter<GenericData.Record>(schema));
- try {
- writer.create(schema, file);
- } catch (IOException e) {
- throw new RuntimeException("Error creating file in Avro test setup.", e);
- }
- this.schema = schema;
- currentExpectedRecord = new TreeMap<>();
- expectedRecords = new ArrayList<>();
- filePath = file.getAbsolutePath();
- fileName = file.getName();
- }
-
- public void startRecord() {
- currentAvroRecord = new GenericData.Record(schema);
- currentExpectedRecord = new TreeMap<>();
- }
-
- public void put(String key, Object value) {
- currentAvroRecord.put(key, value);
- // convert binary values into byte[], the format they will be given
- // in the Drill result set in the test framework
- currentExpectedRecord.put("`" + key + "`", convertAvroValToDrill(value, true));
- }
-
- // TODO - fix this the test wrapper to prevent the need for this hack
- // to make the root behave differently than nested fields for String vs. Text
- private Object convertAvroValToDrill(Object value, boolean root) {
- if (value instanceof ByteBuffer) {
- ByteBuffer bb = ((ByteBuffer)value);
- byte[] drillVal = new byte[((ByteBuffer)value).remaining()];
- bb.get(drillVal);
- bb.position(0);
- value = drillVal;
- } else if (!root && value instanceof CharSequence) {
- value = new Text(value.toString());
- } else if (value instanceof GenericData.Array) {
- GenericData.Array array = ((GenericData.Array) value);
- final JsonStringArrayList<Object> drillList = new JsonStringArrayList<>();
- for (Object o : array) {
- drillList.add(convertAvroValToDrill(o, false));
- }
- value = drillList;
- } else if (value instanceof GenericData.EnumSymbol) {
- value = value.toString();
- } else if (value instanceof GenericData.Record) {
- GenericData.Record rec = ((GenericData.Record) value);
- final JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>();
- for (Schema.Field field : rec.getSchema().getFields()) {
- Object val = rec.get(field.name());
- newRecord.put(field.name(), convertAvroValToDrill(val, false));
- }
- value = newRecord;
- }
- return value;
- }
-
- public void endRecord() throws IOException {
- writer.append(currentAvroRecord);
- expectedRecords.add(currentExpectedRecord);
- }
-
- @Override
- public void close() throws IOException {
- writer.close();
- }
-
- public String getFilePath() {
- return filePath;
- }
-
- public String getFileName() {
- return fileName;
- }
-
- public List<Map<String, Object>>getExpectedRecords() {
- return expectedRecords;
- }
- }
-
-
- public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues() throws Exception {
- return generateSimplePrimitiveSchema_NoNullValues(RECORD_COUNT);
- }
-
- public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords) throws Exception {
- return generateSimplePrimitiveSchema_NoNullValues(numRecords, "");
- }
-
- /**
- * Generates Avro table with specified rows number in specified path.
- *
- * @param numRecords rows number in the table
- * @param tablePath table path
- * @return AvroTestRecordWriter instance
- */
- public static AvroTestRecordWriter generateSimplePrimitiveSchema_NoNullValues(int numRecords, String tablePath)
- throws Exception {
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_long").type().longType().noDefault()
- .name("d_float").type().floatType().noDefault()
- .name("e_double").type().doubleType().noDefault()
- .name("f_bytes").type().bytesType().noDefault()
- .name("g_null").type().nullType().noDefault()
- .name("h_boolean").type().booleanType().noDefault()
- .endRecord();
-
- final File file = File.createTempFile("avro-primitive-test", ".avro",
- BaseTestQuery.dirTestWatcher.makeRootSubDir(Paths.get(tablePath)));
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-
- try {
- ByteBuffer bb = ByteBuffer.allocate(2);
- bb.put(0, (byte) 'a');
-
- for (int i = 0; i < numRecords; i++) {
- bb.put(1, (byte) ('0' + (i % 10)));
- bb.position(0);
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
- record.put("c_long", (long) i);
- record.put("d_float", (float) i);
- record.put("e_double", (double) i);
- record.put("f_bytes", bb);
- record.put("g_null", null);
- record.put("h_boolean", (i % 2 == 0));
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateUnionSchema_WithNullValues() throws Exception {
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_long").type().longType().noDefault()
- .name("d_float").type().floatType().noDefault()
- .name("e_double").type().doubleType().noDefault()
- .name("f_bytes").type().bytesType().noDefault()
- .name("g_null").type().nullType().noDefault()
- .name("h_boolean").type().booleanType().noDefault()
- .name("i_union").type().optional().doubleType()
- .endRecord();
-
- final File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
-
- try {
- ByteBuffer bb = ByteBuffer.allocate(1);
- bb.put(0, (byte) 1);
-
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
- record.put("c_long", (long) i);
- record.put("d_float", (float) i);
- record.put("e_double", (double) i);
- record.put("f_bytes", bb);
- record.put("g_null", null);
- record.put("h_boolean", (i % 2 == 0));
- record.put("i_union", (i % 2 == 0 ? (double) i : null));
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateUnionSchema_WithNonNullValues() throws Exception {
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_long").type().longType().noDefault()
- .name("d_float").type().floatType().noDefault()
- .name("e_double").type().doubleType().noDefault()
- .name("f_bytes").type().bytesType().noDefault()
- .name("g_null").type().nullType().noDefault()
- .name("h_boolean").type().booleanType().noDefault()
- .name("i_union").type().unionOf().doubleType().and().longType().endUnion().noDefault()
- .endRecord();
-
- final File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
-
- ByteBuffer bb = ByteBuffer.allocate(1);
- bb.put(0, (byte) 1);
-
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
- record.put("c_long", (long) i);
- record.put("d_float", (float) i);
- record.put("e_double", (double) i);
- record.put("f_bytes", bb);
- record.put("g_null", null);
- record.put("h_boolean", (i % 2 == 0));
- record.put("i_union", (i % 2 == 0 ? (double) i : (long) i));
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateSimpleEnumSchema_NoNullValues() throws Exception {
- final String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" };
-
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_enum").type().enumeration("my_enum").symbols(symbols).noDefault()
- .endRecord();
-
- final File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema enumSchema = schema.getField("b_enum").schema();
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
-
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- final GenericData.EnumSymbol symbol =
- new GenericData.EnumSymbol(enumSchema, symbols[(i + symbols.length) % symbols.length]);
- record.put("b_enum", symbol);
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateSimpleArraySchema_NoNullValues() throws Exception {
- final File file = File.createTempFile("avro-array-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_string_array").type().array().items().stringType().noDefault()
- .name("d_int_array").type().array().items().intType().noDefault()
- .name("e_float_array").type().array().items().floatType().noDefault()
- .endRecord();
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
- {
- GenericArray<String> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("c_string_array").schema());
- for (int j = 0; j < ARRAY_SIZE; j++) {
- array.add(j, "c_string_array_" + i + "_" + j);
- }
- record.put("c_string_array", array);
- }
- {
- GenericArray<Integer> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("d_int_array").schema());
- for (int j = 0; j < ARRAY_SIZE; j++) {
- array.add(j, i * j);
- }
- record.put("d_int_array", array);
- }
- {
- GenericArray<Float> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("e_float_array").schema());
- for (int j = 0; j < ARRAY_SIZE; j++) {
- array.add(j, (float) (i * j));
- }
- record.put("e_float_array", array);
- }
- record.endRecord();
- }
-
- } finally {
- record.close();
- }
- return record;
- }
-
- public static AvroTestRecordWriter generateSimpleNestedSchema_NoNullValues() throws Exception {
- final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_record").type().record("my_record_1")
- .namespace("foo.blah.org")
- .fields()
- .name("nested_1_string").type().stringType().noDefault()
- .name("nested_1_int").type().intType().noDefault()
- .endRecord()
- .noDefault()
- .endRecord();
-
- final Schema nestedSchema = schema.getField("c_record").schema();
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
-
- final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
- nestedRecord.put("nested_1_string", "nested_1_string_" + i);
- nestedRecord.put("nested_1_int", i * i);
-
- record.put("c_record", nestedRecord);
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateUnionNestedArraySchema_withNullValues() throws Exception {
- final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_array").type().optional().array().items().record("my_record_1")
- .namespace("foo.blah.org").fields()
- .name("nested_1_string").type().optional().stringType()
- .name("nested_1_int").type().optional().intType()
- .endRecord()
- .endRecord();
-
- final Schema nestedSchema = schema.getField("c_array").schema();
- final Schema arraySchema = nestedSchema.getTypes().get(1);
- final Schema itemSchema = arraySchema.getElementType();
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
-
- if (i % 2 == 0) {
- GenericArray<GenericRecord> array = new GenericData.Array<>(1, arraySchema);
- final GenericRecord nestedRecord = new GenericData.Record(itemSchema);
- nestedRecord.put("nested_1_string", "nested_1_string_" + i);
- nestedRecord.put("nested_1_int", i * i);
- array.add(nestedRecord);
- record.put("c_array", array);
- }
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateNestedArraySchema() throws IOException {
- return generateNestedArraySchema(RECORD_COUNT, ARRAY_SIZE);
- }
-
- public static AvroTestRecordWriter generateNestedArraySchema(int numRecords, int numArrayItems) throws IOException {
- final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest").namespace("org.apache.drill.exec.store.avro")
- .fields().name("a_int").type().intType().noDefault().name("b_array").type().array().items()
- .record("my_record_1").namespace("foo.blah.org").fields().name("nested_1_int").type().optional().intType()
- .endRecord().arrayDefault(Collections.emptyList()).endRecord();
-
- final Schema arraySchema = schema.getField("b_array").schema();
- final Schema itemSchema = arraySchema.getElementType();
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < numRecords; i++) {
- record.startRecord();
- record.put("a_int", i);
- GenericArray<GenericRecord> array = new GenericData.Array<>(ARRAY_SIZE, arraySchema);
-
- for (int j = 0; j < numArrayItems; j++) {
- final GenericRecord nestedRecord = new GenericData.Record(itemSchema);
- nestedRecord.put("nested_1_int", j);
- array.add(nestedRecord);
- }
- record.put("b_array", array);
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateMapSchema_withNullValues() throws Exception {
- final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
- .endRecord();
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
-
- if (i % 2 == 0) {
- Map<String, String> strMap = new HashMap<>();
- strMap.put("key1", "nested_1_string_" + i);
- strMap.put("key2", "nested_1_string_" + (i + 1 ));
- record.put("c_map", strMap);
- }
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateMapSchemaComplex_withNullValues() throws Exception {
- final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_map").type().optional().map().values(Schema.create(Type.STRING))
- .name("d_map").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
- .endRecord();
-
- final Schema arrayMapSchema = schema.getField("d_map").schema();
- final Schema arrayItemSchema = arrayMapSchema.getTypes().get(1).getValueType();
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
-
- if (i % 2 == 0) {
- Map<String, String> c_map = new HashMap<>();
- c_map.put("key1", "nested_1_string_" + i);
- c_map.put("key2", "nested_1_string_" + (i + 1 ));
- record.put("c_map", c_map);
- } else {
- Map<String, GenericArray<Double>> d_map = new HashMap<>();
- GenericArray<Double> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
- for (int j = 0; j < ARRAY_SIZE; j++) {
- array.add((double)j);
- }
- d_map.put("key1", array);
- d_map.put("key2", array);
-
- record.put("d_map", d_map);
- }
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateUnionNestedSchema_withNullValues() throws Exception {
- final File file = File.createTempFile("avro-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_record").type().optional().record("my_record_1")
- .namespace("foo.blah.org").fields()
- .name("nested_1_string").type().optional().stringType()
- .name("nested_1_int").type().optional().intType()
- .endRecord()
- .endRecord();
-
- final Schema nestedSchema = schema.getField("c_record").schema();
- final Schema optionalSchema = nestedSchema.getTypes().get(1);
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
-
- if (i % 2 == 0) {
- final GenericRecord nestedRecord = new GenericData.Record(optionalSchema);
- nestedRecord.put("nested_1_string", "nested_1_string_" + i);
- nestedRecord.put("nested_1_int", i * i);
- record.put("c_record", nestedRecord);
- }
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static AvroTestRecordWriter generateDoubleNestedSchema_NoNullValues() throws Exception {
- final File file = File.createTempFile("avro-double-nested-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringType().noDefault()
- .name("b_int").type().intType().noDefault()
- .name("c_record").type().record("my_record_1")
- .namespace("foo.blah.org")
- .fields()
- .name("nested_1_string").type().stringType().noDefault()
- .name("nested_1_int").type().intType().noDefault()
- .name("nested_1_record").type().record("my_double_nested_record_1")
- .namespace("foo.blah.org.rot")
- .fields()
- .name("double_nested_1_string").type().stringType().noDefault()
- .name("double_nested_1_int").type().intType().noDefault()
- .endRecord()
- .noDefault()
- .endRecord()
- .noDefault()
- .endRecord();
-
- final Schema nestedSchema = schema.getField("c_record").schema();
- final Schema doubleNestedSchema = nestedSchema.getField("nested_1_record").schema();
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_int", i);
-
- final GenericRecord nestedRecord = new GenericData.Record(nestedSchema);
- nestedRecord.put("nested_1_string", "nested_1_string_" + i);
- nestedRecord.put("nested_1_int", i * i);
-
- final GenericRecord doubleNestedRecord = new GenericData.Record(doubleNestedSchema);
- doubleNestedRecord.put("double_nested_1_string", "double_nested_1_string_" + i + "_" + i);
- doubleNestedRecord.put("double_nested_1_int", i * i * i);
-
- nestedRecord.put("nested_1_record", doubleNestedRecord);
- record.put("c_record", nestedRecord);
-
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- public static String generateLinkedList() throws Exception {
- final File file = File.createTempFile("avro-linkedlist", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("LongList")
- .namespace("org.apache.drill.exec.store.avro")
- .aliases("LinkedLongs")
- .fields()
- .name("value").type().optional().longType()
- .name("next").type().optional().type("LongList")
- .endRecord();
-
- final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
- writer.create(schema, file);
- GenericRecord previousRecord = null;
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- GenericRecord record = (GenericRecord) (previousRecord == null ? new GenericData.Record(schema) : previousRecord.get("next"));
- record.put("value", (long) i);
- if (previousRecord != null) {
- writer.append(previousRecord);
- }
- GenericRecord nextRecord = new GenericData.Record(record.getSchema());
- record.put("next", nextRecord);
- previousRecord = record;
- }
- writer.append(previousRecord);
- } finally {
- writer.close();
- }
-
- return file.getName();
- }
-
- public static AvroTestRecordWriter generateStringAndUtf8Data() throws Exception {
-
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("a_string").type().stringBuilder().prop("avro.java.string", "String").endString().noDefault()
- .name("b_utf8").type().stringType().noDefault()
- .endRecord();
-
- final File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
-
- ByteBuffer bb = ByteBuffer.allocate(1);
- bb.put(0, (byte) 1);
-
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
- record.put("a_string", "a_" + i);
- record.put("b_utf8", "b_" + i);
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-
- /**
- * Creates Avro table with specified schema and specified data
- * @param schema table schema
- * @param data table data
- * @param <D> record type
- * @return file with newly created Avro table.
- * @throws IOException if an error is appeared during creation or filling the file.
- */
- public static <D extends SpecificRecord> File write(Schema schema, D... data) throws IOException {
- File file = File.createTempFile("avro-primitive-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
-
- DatumWriter writer = SpecificData.get().createDatumWriter(schema);
-
- try (DataFileWriter<D> fileWriter = new DataFileWriter<>(writer)) {
- fileWriter.create(schema, file);
- for (D datum : data) {
- fileWriter.append(datum);
- }
- }
-
- return file;
- }
-
- public static AvroTestRecordWriter generateMapSchema() throws Exception {
- final File file = File.createTempFile("avro-map-test", ".avro", BaseTestQuery.dirTestWatcher.getRootDir());
- final Schema schema = SchemaBuilder.record("AvroRecordReaderTest")
- .namespace("org.apache.drill.exec.store.avro")
- .fields()
- .name("map_field").type().optional().map().values(Schema.create(Type.LONG))
- .name("map_array").type().optional().array().items(Schema.createMap(Schema.create(Type.INT)))
- .name("map_array_value").type().optional().map().values(Schema.createArray(Schema.create(Type.DOUBLE)))
- .endRecord();
-
- final Schema mapArraySchema = schema.getField("map_array").schema();
- final Schema arrayItemSchema = mapArraySchema.getTypes().get(1);
-
- final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file);
- try {
- for (int i = 0; i < RECORD_COUNT; i++) {
- record.startRecord();
-
- // Create map with long values
- Map<String, Long> map = new HashMap<>();
- map.put("key1", (long) i);
- map.put("key2", (long) i + 1);
- record.put("map_field", map);
-
- // Create list of map with int values
- GenericArray<Map<String, Integer>> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
- for (int j = 0; j < ARRAY_SIZE; j++) {
- Map<String, Integer> mapInt = new HashMap<>();
- mapInt.put("key1", (i + 1) * (j + 50));
- mapInt.put("key2", (i + 1) * (j + 100));
- array.add(mapInt);
- }
- record.put("map_array", array);
-
- // create map with array value
- Map<String, GenericArray<Double>> mapArrayValue = new HashMap<>();
- GenericArray<Double> doubleArray = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema);
- for (int j = 0; j < ARRAY_SIZE; j++) {
- doubleArray.add((double) (i + 1) * j);
- }
- mapArrayValue.put("key1", doubleArray);
- mapArrayValue.put("key2", doubleArray);
- record.put("map_array_value", mapArrayValue);
-
- record.endRecord();
- }
- } finally {
- record.close();
- }
-
- return record;
- }
-}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
index 44a4847..b33ad22 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
@@ -87,7 +87,7 @@
* @param value a value that matches the primary setter above, or null
* to set the column to null
*
- * @See {@link ColumnWriter#setObject()} for the generic case
+ * @see ColumnWriter#setObject(Object) for the generic case
*/
void setValue(Object value);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractConvertFromString.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractConvertFromString.java
index bc190ac..b3cd3f7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractConvertFromString.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractConvertFromString.java
@@ -33,7 +33,7 @@
/**
* Property to control how the conversion handles blanks. Blanks are
- * zero-length text fields (after triming whitespace.)
+ * zero-length text fields (after trimming whitespace.)
*/
public static final String BLANK_ACTION_PROP = "blank-as";
@@ -251,7 +251,7 @@
}
@Override
- public void setBytes(byte bytes[], int length) {
+ public void setBytes(byte[] bytes, int length) {
setString(new String(bytes, 0, length, Charsets.UTF_8));
}
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
index 60810fb..3f92ab1 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
@@ -36,7 +36,7 @@
* perform the type conversion, such as overriding "setString" to convert
* from a string representation of a value to the actual format.
* <p>
- * The {@link #setObject()} method works here: the object is passed
+ * The {@link #setObject(Object)} method works here: the object is passed
* to this class's set methods, allowing, say, setting a string object
* for an int column in the case above.
*/
diff --git a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfig.java
index 3e32977..5674933 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfig.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfig.java
@@ -32,9 +32,7 @@
* live under one storage system. The storage systems themselves are described
* in {@see StoragePluginConfig}s.
*/
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface FormatPluginConfig {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatPluginConfig.class);
-
}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
index 315f7cf..90ffaa8 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/FormatPluginConfigBase.java
@@ -20,10 +20,12 @@
import java.util.Set;
import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public abstract class FormatPluginConfigBase implements FormatPluginConfig{
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatPluginConfigBase.class);
+public abstract class FormatPluginConfigBase implements FormatPluginConfig {
+ private static final Logger logger = LoggerFactory.getLogger(FormatPluginConfigBase.class);
/**
* scan for implementations of see <b>FormatPlugin</b>.
diff --git a/pom.xml b/pom.xml
index adc50dc..a7d87ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
<javassist.version>3.26.0-GA</javassist.version>
<msgpack.version>0.6.6</msgpack.version>
<reflections.version>0.9.10</reflections.version>
- <avro.version>1.9.0</avro.version>
+ <avro.version>1.9.1</avro.version>
<metrics.version>4.0.2</metrics.version>
<jetty.version>9.3.25.v20180904</jetty.version>
<jersey.version>2.25.1</jersey.version>
@@ -110,6 +110,7 @@
<joda.version>2.10.5</joda.version>
<javax.el.version>3.0.0</javax.el.version>
<surefire.version>3.0.0-M4</surefire.version>
+ <commons.compress>1.19</commons.compress>
</properties>
<scm>
@@ -1834,6 +1835,11 @@
<artifactId>javax.el</artifactId>
<version>${javax.el.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>${commons.compress}</version>
+ </dependency>
</dependencies>
</dependencyManagement>