Merge branch 'master' of https://github.com/apache/drill into DRILL-7863
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
index 680281e..b779c9a 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
@@ -53,7 +53,6 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
-import org.junit.runners.Suite.SuiteClasses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,9 +66,15 @@
import java.util.concurrent.atomic.AtomicInteger;
@RunWith(Suite.class)
-@SuiteClasses({TestMongoFilterPushDown.class, TestMongoProjectPushDown.class,
- TestMongoQueries.class, TestMongoChunkAssignment.class,
- TestMongoStoragePluginUsesCredentialsStore.class})
+@Suite.SuiteClasses({
+ TestMongoFilterPushDown.class,
+ TestMongoProjectPushDown.class,
+ TestMongoQueries.class,
+ TestMongoChunkAssignment.class,
+ TestMongoStoragePluginUsesCredentialsStore.class,
+ TestMongoDrillIssue.class
+})
+
@Category({SlowTest.class, MongoStorageTest.class})
public class MongoTestSuite extends BaseTest implements MongoTestConstants {
diff --git a/contrib/udfs/README.md b/contrib/udfs/README.md
index 3ef761c..ae65e1d 100644
--- a/contrib/udfs/README.md
+++ b/contrib/udfs/README.md
@@ -272,3 +272,40 @@
SELECT parse_user_agent( `user_agent`, 'AgentName` ) as AgentName ...
```
which will just return the requested field. If the user agent string is empty, all fields will have the value of `Hacker`.
+
+## Map Schema Function
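+This function allows you to drill down into the schema of maps. The REST API and JDBC interfaces only report the type of a complex column (for example `MAP` or `LIST`); they do not expose
+the schema of the fields inside a map. The function `getMapSchema(<MAP>)` (also available as `get_map_schema`) returns a `MAP` whose keys are the field names and whose values are their data types. Repeated (array) fields are reported with a `REPEATED_` prefix.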
+
+### Example Usage
+
+Given the JSON data shown after the query output, the following query returns the schema of the `record` map:
+```bash
+apache drill> SELECT getMapSchema(record) AS schema FROM dfs.test.`schema_test.json`;
++----------------------------------------------------------------------------------+
+| schema |
++----------------------------------------------------------------------------------+
+| {"int_field":"BIGINT","double_field":"FLOAT8","string_field":"VARCHAR","int_list":"REPEATED_BIGINT","double_list":"REPEATED_FLOAT8","map":"MAP"} |
++----------------------------------------------------------------------------------+
+1 row selected (0.298 seconds)
+```
+
+```json
+{
+ "record" : {
+ "int_field": 1,
+ "double_field": 2.0,
+ "string_field": "My string",
+ "int_list": [1,2,3],
+ "double_list": [1.0,2.0,3.0],
+ "map": {
+ "nested_int_field" : 5,
+ "nested_double_field": 5.0,
+ "nested_string_field": "5.0"
+ }
+ },
+ "single_field": 10
+}
+```
+
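+The function returns an empty map if its argument is `null`. It is not recursive: a nested map is reported only as `MAP` (see the `map` field in the example above).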
diff --git a/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaFunctions.java b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaFunctions.java
new file mode 100644
index 0000000..50fef82
--- /dev/null
+++ b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaFunctions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.udfs;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+
+import javax.inject.Inject;
+
+/**
+ * Drill functions that describe the structure of complex values.
+ */
+public class ComplexSchemaFunctions {
+
+  /**
+   * Returns a map from each top-level field name of the input map to the name of its
+   * data type, e.g. {@code BIGINT} or {@code REPEATED_FLOAT8}. The function is not
+   * recursive: a nested map is reported only as {@code MAP}.
+   * <p>
+   * Callable in SQL as {@code getMapSchema} or {@code get_map_schema}.
+   */
+  @FunctionTemplate(names = {"get_map_schema", "getMapSchema"},
+      scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls = NullHandling.INTERNAL)
+  public static class GetMapSchemaFunction implements DrillSimpleFunc {
+
+    /** The value whose schema is reported. */
+    @Param
+    FieldReader reader;
+
+    /** Receives the resulting map of field names to type names. */
+    @Output
+    BaseWriter.ComplexWriter outWriter;
+
+    /** Scratch buffer handed to the helper for staging type-name strings. */
+    @Inject
+    DrillBuf outBuffer;
+
+    @Override
+    public void setup() {
+      // No setup is required.
+    }
+
+    @Override
+    public void eval() {
+      if (!reader.isSet()) {
+        // Unset (null) input: emit an empty map rather than describing anything.
+        org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter emptyMapWriter = outWriter.rootAsMap();
+        emptyMapWriter.start();
+        emptyMapWriter.end();
+      } else {
+        org.apache.drill.exec.udfs.ComplexSchemaUtils.getFields(reader, outWriter, outBuffer);
+      }
+    }
+  }
+}
diff --git a/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaUtils.java b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaUtils.java
new file mode 100644
index 0000000..7f027aa
--- /dev/null
+++ b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.udfs;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Helper methods for the schema-inspection functions in {@link ComplexSchemaFunctions}.
+ */
+public class ComplexSchemaUtils {
+
+  /**
+   * Writes a map describing the top-level fields of {@code reader}. Each key is a field
+   * name and each value is that field's minor type name (e.g. {@code BIGINT}), prefixed
+   * with {@code REPEATED_} when the field's data mode is {@code REPEATED}. Nested maps
+   * are reported as {@code MAP}; their contents are not described. If {@code reader}
+   * is not a {@code MAP}, an empty map is written.
+   *
+   * @param reader    reader for the value whose schema is described
+   * @param outWriter writer that receives the resulting map
+   * @param buffer    scratch buffer used to stage each type name before it is written
+   */
+  public static void getFields(FieldReader reader, BaseWriter.ComplexWriter outWriter, DrillBuf buffer) {
+    BaseWriter.MapWriter queryMapWriter = outWriter.rootAsMap();
+    queryMapWriter.start();
+
+    // Only MAP values have named children to describe; for any other type the map is
+    // left empty. Either way it is started and ended exactly once.
+    if (reader.getType().getMinorType() == MinorType.MAP) {
+      DrillBuf scratch = buffer;
+      VarCharHolder rowHolder = new VarCharHolder();
+      Iterator<String> fieldIterator = reader.iterator();
+
+      while (fieldIterator.hasNext()) {
+        String fieldName = fieldIterator.next();
+        FieldReader fieldReader = reader.reader(fieldName);
+        String dataType = fieldReader.getType().getMinorType().toString();
+
+        DataMode dataMode = fieldReader.getType().getMode();
+        if (dataMode == DataMode.REPEATED) {
+          dataType = dataMode + "_" + dataType;
+        }
+
+        byte[] rowStringBytes = dataType.getBytes(StandardCharsets.UTF_8);
+        // reallocIfNeeded may return a different buffer, so always write to the one it returns.
+        // NOTE(review): a reallocated buffer is not handed back to the caller.
+        scratch = scratch.reallocIfNeeded(rowStringBytes.length);
+        scratch.setBytes(0, rowStringBytes);
+
+        rowHolder.start = 0;
+        rowHolder.end = rowStringBytes.length;
+        rowHolder.buffer = scratch;
+
+        queryMapWriter.varChar(fieldName).write(rowHolder);
+      }
+    }
+
+    queryMapWriter.end();
+  }
+}
diff --git a/contrib/udfs/src/test/java/org/apache/drill/exec/udfs/TestComplexSchemaFunctions.java b/contrib/udfs/src/test/java/org/apache/drill/exec/udfs/TestComplexSchemaFunctions.java
new file mode 100644
index 0000000..bf453ef
--- /dev/null
+++ b/contrib/udfs/src/test/java/org/apache/drill/exec/udfs/TestComplexSchemaFunctions.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.udfs;
+
+import org.apache.drill.categories.SqlFunctionTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@code getMapSchema} function, using {@code json/nestedSchema.json}.
+ */
+@Category({UnlikelyTest.class, SqlFunctionTest.class})
+public class TestComplexSchemaFunctions extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+  }
+
+  /** Reports the type of every top-level field of {@code record}, including repeated fields. */
+  @Test
+  public void testMapSchemaFunction() throws RpcException {
+    String sql = "SELECT getMapSchema(record) AS schema FROM cp.`json/nestedSchema.json`";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+    assertEquals(1, results.rowCount());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("schema")
+          .addNullable("int_field", MinorType.VARCHAR)
+          .addNullable("double_field", MinorType.VARCHAR)
+          .addNullable("string_field", MinorType.VARCHAR)
+          .addNullable("boolean_field", MinorType.VARCHAR)
+          .addNullable("int_list", MinorType.VARCHAR)
+          .addNullable("double_list", MinorType.VARCHAR)
+          .addNullable("boolean_list", MinorType.VARCHAR)
+          .addNullable("map", MinorType.VARCHAR)
+          .addNullable("repeated_map", MinorType.VARCHAR)
+        .resumeSchema()
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow((Object) strArray("BIGINT", "FLOAT8", "VARCHAR", "BIT", "REPEATED_BIGINT", "REPEATED_FLOAT8", "REPEATED_BIT", "MAP", "REPEATED_MAP"))
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** Describes a nested map when it is passed to the function directly. */
+  @Test
+  public void testMapSchemaFunctionWithInnerMap() throws RpcException {
+    String sql = "SELECT getMapSchema(t1.record.map) AS schema FROM cp.`json/nestedSchema.json` AS t1";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+    assertEquals(1, results.rowCount());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("schema")
+          .addNullable("nested_int_field", MinorType.VARCHAR)
+          .addNullable("nested_double_field", MinorType.VARCHAR)
+          .addNullable("nested_string_field", MinorType.VARCHAR)
+        .resumeSchema()
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow((Object) strArray("BIGINT", "FLOAT8", "VARCHAR"))
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  /** A null argument yields an empty map. */
+  @Test
+  public void testMapSchemaFunctionWithNull() throws RpcException {
+    String sql = "SELECT getMapSchema(null) AS schema FROM cp.`json/nestedSchema.json` AS t1";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+    assertEquals(1, results.rowCount());
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("schema", MinorType.MAP)
+        .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+        .addRow((Object) mapArray())
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+}
diff --git a/contrib/udfs/src/test/resources/json/nestedSchema.json b/contrib/udfs/src/test/resources/json/nestedSchema.json
new file mode 100644
index 0000000..22abcc2
--- /dev/null
+++ b/contrib/udfs/src/test/resources/json/nestedSchema.json
@@ -0,0 +1,21 @@
+{
+ "record" : {
+ "int_field": 1,
+ "double_field": 2.0,
+ "string_field": "My string",
+ "boolean_field": true,
+ "int_list": [1,2,3],
+ "double_list": [1.0,2.0,3.0],
+ "boolean_list": [true, false, true],
+ "map": {
+ "nested_int_field" : 5,
+ "nested_double_field": 5.0,
+ "nested_string_field": "5.0"
+ },
+ "repeated_map": [
+ { "a" : 1 },
+ { "b" : "abc" }
+ ]
+ },
+ "single_field": 10
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index a5330dd..c36ba67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -777,7 +777,7 @@
}
public B limit(int maxRecords) {
- source.maxRecords = maxRecords;
+ this.maxRecords = maxRecords; // record the limit on this builder instead of writing it into `source`
return self();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
index e046563..9e2c0e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
@@ -111,10 +111,10 @@
final RelNode newLimit;
if (projectRel != null) {
- final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of((RelNode) newScanRel));
+ final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of(newScanRel));
newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of(newProject));
} else {
- newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of((RelNode) newScanRel));
+ newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of(newScanRel));
}
call.transformTo(newLimit);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 85709b0..0a8e66a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -246,8 +246,9 @@
return message("Success");
} catch (PluginEncodingException e) {
logger.warn("Error in JSON mapping: {}", storagePluginConfig, e);
- return message("Invalid JSON");
+ return message("Invalid JSON: " + e.getMessage());
} catch (PluginException e) {
+ logger.error("Error while saving plugin", e);
return message(e.getMessage());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 390a32c..38a489e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -49,6 +49,8 @@
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
/**
* Plugin registry. Caches plugin instances which correspond to configurations
@@ -510,6 +512,8 @@
return context.mapper().reader()
.forType(StoragePluginConfig.class)
.readValue(json);
+ } catch (InvalidTypeIdException | UnrecognizedPropertyException e) {
+ throw new PluginEncodingException(e.getMessage(), e);
} catch (IOException e) {
throw new PluginEncodingException("Failure when decoding plugin JSON", e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 928ebac..580cc6c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.drill.common.PlanStringBuilder;
@@ -42,6 +43,7 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.metastore.store.FileTableMetadataProviderBuilder;
+import org.apache.drill.metastore.metadata.FileMetadata;
import org.apache.drill.metastore.metadata.LocationProvider;
import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -87,7 +89,6 @@
private List<CompleteFileWork> chunks;
private List<EndpointAffinity> endpointAffinities;
private final Path selectionRoot;
- private final int maxRecords;
@JsonCreator
public EasyGroupScan(
@@ -177,7 +178,6 @@
mappings = that.mappings;
partitionDepth = that.partitionDepth;
metadataProvider = that.metadataProvider;
- maxRecords = getMaxRecords();
}
@JsonIgnore
@@ -407,9 +407,13 @@
newScan.files = files;
newScan.matchAllMetadata = matchAllMetadata;
newScan.nonInterestingColumnsMetadata = nonInterestingColumnsMetadata;
+ newScan.maxRecords = maxRecords;
- newScan.fileSet = newScan.getFilesMetadata().keySet();
- newScan.selection = FileSelection.create(null, new ArrayList<>(newScan.fileSet), newScan.selectionRoot);
+ Map<Path, FileMetadata> filesMetadata = newScan.getFilesMetadata();
+ if (MapUtils.isNotEmpty(filesMetadata)) {
+ newScan.fileSet = filesMetadata.keySet();
+ newScan.selection = FileSelection.create(null, new ArrayList<>(newScan.fileSet), newScan.selectionRoot);
+ }
try {
newScan.initFromSelection(newScan.selection, newScan.formatPlugin);
} catch (IOException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index c13846d..2923bd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -538,6 +538,7 @@
newScan.rowGroups = rowGroups;
newScan.matchAllMetadata = matchAllMetadata;
newScan.nonInterestingColumnsMetadata = nonInterestingColumnsMetadata;
+ newScan.maxRecords = maxRecords;
// since builder is used when pruning happens, entries and fileSet should be expanded
if (!newScan.getFilesMetadata().isEmpty()) {
newScan.entries = newScan.getFilesMetadata().keySet().stream()