DRILL-8204: Allow Provided Schema for HTTP Plugin in JSON Mode (#2526)
* Initial commit
* Map working
* WIP
* Added builder
* Lists in maps working
* Add documentation
* Cleaned up UT
* Final Revision
* Fix checkstyle
* Minor tweak
* removed extra test file
* Removed unused import
* Added inline schema support
* Addressed review comments
* Removed unused import
* Removed json string
* Final Revisions
* Fixed unit test
diff --git a/contrib/storage-http/JSON_Options.md b/contrib/storage-http/JSON_Options.md
new file mode 100644
index 0000000..3579068
--- /dev/null
+++ b/contrib/storage-http/JSON_Options.md
@@ -0,0 +1,79 @@
+# JSON Options and Configuration
+
+Drill has a collection of JSON configuration options that control how Drill interprets JSON files. These are set at the global level; however, the HTTP plugin
+allows you to configure these options individually per connection and override the Drill defaults. The options are:
+
+* `allowNanInf`: Configures the connection to interpret `NaN` and `Inf` values as numeric values.
+* `allTextMode`: By default, Drill attempts to infer data types from JSON data. If the data is malformed, Drill may throw schema change exceptions. If your data is
+  inconsistent, you can enable `allTextMode`, which causes Drill to read all JSON values as strings rather than trying to infer the data type.
+* `readNumbersAsDouble`: By default, Drill attempts to distinguish between integer and floating point number types. One challenge is that when data is inconsistent, Drill may
+  throw schema change exceptions. In addition to `allTextMode`, you can make Drill less sensitive by setting `readNumbersAsDouble` to `true`, which causes Drill to read all
+  numeric fields in JSON data as the `double` data type rather than trying to distinguish between ints and doubles.
+* `enableEscapeAnyChar`: Allows a user to escape any character with a backslash (`\`).
+* `skipMalformedRecords`: Allows Drill to skip malformed records and recover without throwing exceptions.
+* `skipMalformedDocument`: Allows Drill to skip entire malformed documents without throwing errors.
+
+All of these can be set by adding a `jsonOptions` block to your connection configuration, as shown below:
+
+```json
+"jsonOptions": {
+  "allTextMode": true,
+  "readNumbersAsDouble": true
+}
+```
+
+## Schema Provisioning
+One of the challenges of querying APIs is inconsistent data. Drill allows you to provide a schema for individual endpoints. You can do this in one of two ways:
+
+1. By providing a schema inline. [See: Specifying Schema as Table Function Parameter](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter)
+2. By providing a schema in the configuration for the endpoint.
+
+The schema provisioning currently supports complex types of Arrays and Maps at any nesting level.
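+The first option can be exercised directly in a query via a table function. A sketch, in which the plugin name `local` and endpoint name `myEndpoint` are hypothetical:
+
+```sql
+SELECT *
+FROM table(`local`.`myEndpoint`
+  (schema => 'inline=(`field1` VARCHAR, `field2` VARCHAR)'))
+```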
+
+### Example Schema Provisioning:
+```json
+"jsonOptions": {
+ "schema": {
+ "columns":[
+ {
+ "name":"outer_map",
+ "type":"ARRAY<STRUCT<`bigint_col` BIGINT, `boolean_col` BOOLEAN, `date_col` DATE, `double_col` DOUBLE, `interval_col` INTERVAL, `int_col` BIGINT, `timestamp_col` TIMESTAMP, `time_col` TIME, `varchar_col` VARCHAR>>","mode":"REPEATED"
+ }, {
+ "name":"field1",
+ "type":"VARCHAR",
+ "mode":"OPTIONAL"
+    }
+    ]
+ }
+}
+```
+
+## Dealing With Inconsistent Schemas
+One of the major challenges of interacting with JSON data is an inconsistent schema. Drill has a `UNION` data type, which is marked as experimental. At the time of
+writing, the HTTP plugin does not support `UNION`; however, supplying a schema can solve many of these issues.
+
+### JSON Mode
+Drill offers the option of reading all JSON values as strings. While this can complicate downstream analytics, it can also be a more memory-efficient way of reading data with an
+inconsistent schema. Unfortunately, at the time of writing, JSON mode is only available with a provided schema; however, future work will allow this mode to be enabled for
+any JSON data.
+
+#### Enabling JSON Mode:
+You can enable JSON mode simply by adding the `drill.json-mode` property with a value of `json` to a field, as shown below:
+
+```json
+"jsonOptions": {
+ "readNumbersAsDouble": true,
+ "schema": {
+ "type": "tuple_schema",
+ "columns": [
+ {
+ "name": "custom_fields",
+ "type": "ARRAY<STRUCT<`value` VARCHAR PROPERTIES { 'drill.json-mode' = 'json' }>>",
+ "mode": "REPEATED"
+ }
+ ]
+ }
+}
+```
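+With this property set, each `value` field is returned as a `VARCHAR` containing the raw JSON text, so rows whose values have differing structures no longer trigger schema changes. A usage sketch (the plugin name `local` and endpoint name `myEndpoint` are hypothetical):
+
+```sql
+SELECT custom_fields
+FROM `local`.`myEndpoint`
+```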
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 7a808cf..0f188ba 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -250,29 +250,7 @@
format plugin.
#### JSON Configuration
-Drill has a collection of JSON configuration options to allow you to configure how Drill interprets JSON files. These are set at the global level, however the HTTP plugin
-allows you to configure these options individually per connection and override the Drill defaults. The options are:
-
-* `allowNanInf`: Configures the connection to interpret `NaN` and `Inf` values
-* `allTextMode`: By default, Drill attempts to infer data types from JSON data. If the data is malformed, Drill may throw schema change exceptions. If your data is
- inconsistent, you can enable `allTextMode` which when true, Drill will read all JSON values as strings, rather than try to infer the data type.
-* `readNumbersAsDouble`: By default Drill will attempt to interpret integers, floating point number types and strings. One challenge is when data is consistent, Drill may
- throw schema change exceptions. In addition to `allTextMode`, you can make Drill less sensitive by setting the `readNumbersAsDouble` to `true` which causes Drill to read all
- numeric fields in JSON data as `double` data type rather than trying to distinguish between ints and doubles.
-* `enableEscapeAnyChar`: Allows a user to escape any character with a \
-* `skipMalformedRecords`: Allows Drill to skip malformed records and recover without throwing exceptions.
-* `skipMalformedDocument`: Allows Drill to skip entire malformed documents without throwing errors.
-
-All of these can be set by adding the `jsonOptions` to your connection configuration as shown below:
-
-```json
-
-"jsonOptions": {
- "allTextMode": true,
- "readNumbersAsDouble": true
-}
-
-```
+[Read the documentation for configuring JSON options, including schema provisioning.](JSON_Options.md)
#### Authorization
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index b99ec57..a6ce240 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -29,7 +29,9 @@
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
@@ -111,7 +113,6 @@
.build();
// JSON loader setup
-
resultSetLoader = negotiator.build();
if (implicitColumnsAreProjected()) {
implicitColumns = new ImplicitColumns(resultSetLoader.writer());
@@ -141,6 +142,11 @@
} else {
jsonBuilder.standardOptions(negotiator.queryOptions());
}
+
+ if (getSchema(negotiator) != null) {
+ jsonBuilder.providedSchema(getSchema(negotiator));
+ }
+
jsonLoader = jsonBuilder.build();
} catch (Throwable t) {
@@ -153,6 +159,35 @@
return true;
}
+ /**
+ * This function obtains the correct schema for the {@link JsonLoader}. There are four possibilities:
+ * 1. The schema is provided in the configuration only. In this case, that schema will be returned.
+ * 2. The schema is provided in both the configuration and inline. These two schemas will be merged together.
+ * 3. The schema is provided inline in a query. In this case, that schema will be returned.
+ * 4. No schema is provided. Function returns null.
+ * @param negotiator {@link SchemaNegotiator} The schema negotiator with all the connection information
+ * @return The built {@link TupleMetadata} of the provided schema, null if none provided.
+ */
+ private TupleMetadata getSchema(SchemaNegotiator negotiator) {
+ if (subScan.tableSpec().connectionConfig().jsonOptions() != null &&
+ subScan.tableSpec().connectionConfig().jsonOptions().schema() != null) {
+ TupleMetadata configuredSchema = subScan.tableSpec().connectionConfig().jsonOptions().schema();
+
+ // If it has a provided schema both inline and in the config, merge the two, otherwise, return the config schema
+ if (negotiator.hasProvidedSchema()) {
+ TupleMetadata inlineSchema = negotiator.providedSchema();
+ return FixedReceiver.Builder.mergeSchemas(configuredSchema, inlineSchema);
+ } else {
+ return configuredSchema;
+ }
+ } else {
+ if (negotiator.hasProvidedSchema()) {
+ return negotiator.providedSchema();
+ }
+ }
+ return null;
+ }
+
protected void buildImplicitColumns() {
// Add String fields
for (String fieldName : STRING_METADATA_FIELDS) {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index 5eab823..3da54d0 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.http;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -28,6 +29,7 @@
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -35,8 +37,11 @@
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.metadata.TableMetadata;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -53,11 +58,12 @@
// Used only in planner, not serialized
private int hashCode;
+ private MetadataProviderManager metadataProviderManager;
/**
* Creates a new group scan from the storage plugin.
*/
- public HttpGroupScan (HttpScanSpec scanSpec) {
+ public HttpGroupScan (HttpScanSpec scanSpec, MetadataProviderManager metadataProviderManager) {
super(scanSpec.queryUserName());
this.httpScanSpec = scanSpec;
this.username = scanSpec.queryUserName();
@@ -66,6 +72,7 @@
this.filterSelectivity = 0.0;
this.scanStats = computeScanStats();
this.maxRecords = -1;
+ this.metadataProviderManager = metadataProviderManager;
}
/**
@@ -79,6 +86,7 @@
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
this.username = that.username;
+ this.metadataProviderManager = that.metadataProviderManager;
// Calcite makes many copies in the later stage of planning
// without changing anything. Retain the previous stats.
@@ -98,6 +106,7 @@
// to again assign columns. Retain filters, but compute new stats.
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
+ this.metadataProviderManager = that.metadataProviderManager;
this.scanStats = computeScanStats();
this.username = that.username;
this.maxRecords = that.maxRecords;
@@ -118,6 +127,7 @@
this.filterSelectivity = filterSelectivity;
this.scanStats = computeScanStats();
this.maxRecords = that.maxRecords;
+ this.metadataProviderManager = that.metadataProviderManager;
}
/**
@@ -134,6 +144,7 @@
this.filterSelectivity = that.filterSelectivity;
this.scanStats = computeScanStats();
this.maxRecords = maxRecords;
+ this.metadataProviderManager = that.metadataProviderManager;
}
@@ -192,7 +203,7 @@
@Override
public SubScan getSpecificScan(int minorFragmentId) {
- return new HttpSubScan(httpScanSpec, columns, filters, maxRecords);
+ return new HttpSubScan(httpScanSpec, columns, filters, maxRecords, getSchema());
}
@Override
@@ -216,6 +227,33 @@
return new HttpGroupScan(this);
}
+ public TupleMetadata getSchema() {
+ if (metadataProviderManager == null) {
+ return null;
+ }
+ try {
+ return metadataProviderManager.getSchemaProvider().read().getSchema();
+ } catch (IOException | NullPointerException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public TableMetadata getTableMetadata() {
+ if (getMetadataProvider() == null) {
+ return null;
+ }
+ return getMetadataProvider().getTableMetadata();
+ }
+
+ @Override
+ public TableMetadataProvider getMetadataProvider() {
+ if (metadataProviderManager == null) {
+ return null;
+ }
+ return metadataProviderManager.getTableMetadataProvider();
+ }
+
@Override
public ScanStats getScanStats() {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpJsonOptions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpJsonOptions.java
index edd77f9..c8274e1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpJsonOptions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpJsonOptions.java
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
@@ -51,6 +52,8 @@
@JsonProperty
private final Boolean skipMalformedDocument;
+ @JsonProperty
+ private final TupleMetadata schema;
HttpJsonOptions(HttpJsonOptionsBuilder builder) {
this.allowNanInf = builder.allowNanInf;
@@ -59,6 +62,7 @@
this.enableEscapeAnyChar = builder.enableEscapeAnyChar;
this.skipMalformedRecords = builder.skipMalformedRecords;
this.skipMalformedDocument = builder.skipMalformedDocument;
+ this.schema = builder.schema;
}
public static HttpJsonOptionsBuilder builder() {
@@ -80,12 +84,13 @@
if (enableEscapeAnyChar != null) {
options.enableEscapeAnyChar = enableEscapeAnyChar;
}
- if (skipMalformedDocument != null) {
- options.skipMalformedDocument = skipMalformedDocument;
- }
if (skipMalformedRecords != null) {
options.skipMalformedRecords = skipMalformedRecords;
}
+ if (skipMalformedDocument != null) {
+ options.skipMalformedDocument = skipMalformedDocument;
+ }
+
return options;
}
@@ -119,6 +124,11 @@
return this.skipMalformedDocument;
}
+ @JsonProperty("schema")
+ public TupleMetadata schema() {
+ return this.schema;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -132,12 +142,14 @@
&& Objects.equals(allTextMode, that.allTextMode)
&& Objects.equals(readNumbersAsDouble, that.readNumbersAsDouble)
&& Objects.equals(enableEscapeAnyChar, that.enableEscapeAnyChar)
- && Objects.equals(skipMalformedRecords, that.skipMalformedRecords);
+ && Objects.equals(skipMalformedDocument, that.skipMalformedDocument)
+ && Objects.equals(skipMalformedRecords, that.skipMalformedRecords)
+ && Objects.equals(schema, that.schema);
}
@Override
public int hashCode() {
- return Objects.hash(allowNanInf, allTextMode, readNumbersAsDouble, enableEscapeAnyChar, skipMalformedRecords, skipMalformedDocument);
+ return Objects.hash(allowNanInf, allTextMode, readNumbersAsDouble, enableEscapeAnyChar, skipMalformedDocument, skipMalformedRecords, schema);
}
@Override
@@ -149,6 +161,7 @@
.field("enableEscapeAnyChar", enableEscapeAnyChar)
.field("skipMalformedRecords", skipMalformedRecords)
.field("skipMalformedDocument", skipMalformedDocument)
+ .field("schema", schema)
.toString();
}
@@ -166,6 +179,8 @@
private Boolean skipMalformedDocument;
+ private TupleMetadata schema;
+
public HttpJsonOptionsBuilder allowNanInf(Boolean allowNanInf) {
this.allowNanInf = allowNanInf;
return this;
@@ -196,6 +211,11 @@
return this;
}
+ public HttpJsonOptionsBuilder schema(TupleMetadata schema) {
+ this.schema = schema;
+ return this;
+ }
+
public HttpJsonOptions build() {
return new HttpJsonOptions(this);
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 3bdee3f..57797b3 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -68,6 +68,7 @@
HttpSubScan subScan) {
ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
builder.projection(subScan.columns());
+ builder.providedSchema(subScan.schema());
builder.setUserName(subScan.getUserName());
// Provide custom error context
@@ -102,7 +103,6 @@
paginatorConfig = subScan.tableSpec().connectionConfig().paginator();
if (paginatorConfig != null) {
- // TODO Handle the case of no limit queries in pagination
logger.debug("Creating paginator using config: {}", paginatorConfig);
// Initialize the paginator and generate the base URLs
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
index 181c92e..6787c60 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
@@ -20,6 +20,8 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.oauth.OAuthTokenProvider;
import org.apache.drill.exec.oauth.PersistentTokenTable;
import org.apache.drill.exec.oauth.TokenRegistry;
@@ -27,6 +29,7 @@
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -35,6 +38,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
public class HttpStoragePlugin extends AbstractStoragePlugin {
@@ -82,9 +86,36 @@
}
@Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ SessionOptionManager options) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+ options, null);
+ }
+
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+ options, metadataProviderManager);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ List<SchemaPath> columns) throws IOException {
+ return getPhysicalScan(userName, selection, columns, null, null);
+ }
+
+ @Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, null);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options,
+ MetadataProviderManager metadataProviderManager) throws IOException {
HttpScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<HttpScanSpec>() {});
- return new HttpGroupScan(scanSpec);
+ return new HttpGroupScan(scanSpec, metadataProviderManager);
}
@Override
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index 7e7dc33..3136518 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -33,6 +33,7 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
@JsonTypeName("http-sub-scan")
public class HttpSubScan extends AbstractBase implements SubScan {
@@ -43,19 +44,22 @@
private final List<SchemaPath> columns;
private final Map<String, String> filters;
private final int maxRecords;
+ private final TupleMetadata schema;
@JsonCreator
public HttpSubScan(
@JsonProperty("tableSpec") HttpScanSpec tableSpec,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("filters") Map<String, String> filters,
- @JsonProperty("maxRecords") int maxRecords
+ @JsonProperty("maxRecords") int maxRecords,
+ @JsonProperty("schema") TupleMetadata schema
) {
super(tableSpec.queryUserName());
this.tableSpec = tableSpec;
this.columns = columns;
this.filters = filters;
this.maxRecords = maxRecords;
+ this.schema = schema;
}
@JsonProperty("tableSpec")
@@ -78,6 +82,11 @@
return maxRecords;
}
+ @JsonProperty("schema")
+ public TupleMetadata schema() {
+ return schema;
+ }
+
@Override
public <T, X, E extends Throwable> T accept(
PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
@@ -86,7 +95,7 @@
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- return new HttpSubScan(tableSpec, columns, filters, maxRecords);
+ return new HttpSubScan(tableSpec, columns, filters, maxRecords, schema);
}
@Override
@@ -107,12 +116,13 @@
.field("columns", columns)
.field("filters", filters)
.field("maxRecords", maxRecords)
+ .field("schema", schema)
.toString();
}
@Override
public int hashCode() {
- return Objects.hash(tableSpec, columns, filters);
+ return Objects.hash(tableSpec, columns, filters, maxRecords, schema);
}
@Override
@@ -127,6 +137,7 @@
return Objects.equals(tableSpec, other.tableSpec)
&& Objects.equals(columns, other.columns)
&& Objects.equals(filters, other.filters)
- && Objects.equals(maxRecords, other.maxRecords);
+ && Objects.equals(maxRecords, other.maxRecords)
+ && Objects.equals(schema, other.schema);
}
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 0b11030..936008a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -824,13 +824,6 @@
return inputArguments;
}
-/*
- public static HttpStoragePluginConfig getPluginConfig(String name, DrillbitContext context) throws PluginException {
- HttpStoragePlugin httpStoragePlugin = getStoragePlugin(context, name);
- return httpStoragePlugin.getConfig();
- }
-*/
-
public static HttpApiConfig getEndpointConfig(String endpoint, HttpStoragePluginConfig pluginConfig) {
HttpApiConfig endpointConfig = pluginConfig.getConnection(endpoint);
if (endpointConfig == null) {
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
new file mode 100644
index 0000000..e703fb4
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.util.DrillFileUtils;
+
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
+
+public class TestProvidedSchema extends ClusterTest {
+ private static final int MOCK_SERVER_PORT = 47777;
+
+ private static String TEST_JSON_PAGE1;
+ private static String TEST_SCHEMA_CHANGE1;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ TEST_JSON_PAGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p1.json"), Charsets.UTF_8).read();
+ TEST_SCHEMA_CHANGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/schema_change_1.json"), Charsets.UTF_8).read();
+
+ dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+ makeMockConfig(cluster);
+ }
+
+ public static void makeMockConfig(ClusterFixture cluster) {
+
+ TupleMetadata simpleSchema = new SchemaBuilder()
+ .addNullable("col_1", MinorType.FLOAT8)
+ .addNullable("col_2", MinorType.FLOAT8)
+ .addNullable("col_3", MinorType.FLOAT8)
+ .build();
+
+ HttpJsonOptions jsonOptions = new HttpJsonOptions.HttpJsonOptionsBuilder()
+ .schema(simpleSchema)
+ .build();
+
+ HttpApiConfig basicJson = HttpApiConfig.builder()
+ .url("http://localhost:47777/json")
+ .method("get")
+ .jsonOptions(jsonOptions)
+ .requireTail(false)
+ .inputType("json")
+ .build();
+
+ TupleMetadata mapSchema = new SchemaBuilder()
+ .addNullable("field1", MinorType.VARCHAR)
+ .addMap("field2")
+ .addNullable("nested_value1", MinorType.VARCHAR)
+ .addNullable("nested_value2", MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+
+ HttpJsonOptions jsonOptionsSchemaChange = new HttpJsonOptions.HttpJsonOptionsBuilder()
+ .schema(mapSchema)
+ .skipMalformedRecords(true)
+ .build();
+
+ HttpApiConfig schemaChange = HttpApiConfig.builder()
+ .url("http://localhost:47777/json")
+ .method("get")
+ .jsonOptions(jsonOptionsSchemaChange)
+ .requireTail(false)
+ .inputType("json")
+ .build();
+
+ TupleMetadata partialMapSchema = new SchemaBuilder()
+ .addNullable("field1", MinorType.VARCHAR)
+ .addMap("field2")
+ .addNullable("nested_value1", MinorType.VARCHAR)
+ .resumeSchema()
+ .buildSchema();
+
+
+ HttpApiConfig partialSchema = HttpApiConfig.builder()
+ .url("http://localhost:47777/json")
+ .method("get")
+ .jsonOptions(HttpJsonOptions.builder().schema(partialMapSchema).build())
+ .requireTail(false)
+ .inputType("json")
+ .build();
+
+ ColumnMetadata jsonColumn = MetadataUtils.newScalar("field2", MinorType.VARCHAR, DataMode.OPTIONAL);
+ jsonColumn.setProperty("drill.json-mode", "json");
+
+ TupleMetadata jsonModeSchema = new SchemaBuilder()
+ .addNullable("field1", MinorType.VARCHAR)
+ .add(jsonColumn)
+ .build();
+
+ HttpJsonOptions jsonModeOptions = HttpJsonOptions.builder()
+ .schema(jsonModeSchema)
+ .skipMalformedRecords(true)
+ .build();
+
+ HttpApiConfig jsonModeConfig = HttpApiConfig.builder()
+ .url("http://localhost:47777/json")
+ .method("get")
+ .jsonOptions(jsonModeOptions)
+ .requireTail(false)
+ .inputType("json")
+ .build();
+
+ HttpApiConfig noSchema = HttpApiConfig.builder()
+ .url("http://localhost:47777/json")
+ .method("get")
+ .requireTail(false)
+ .inputType("json")
+ .build();
+
+ Map<String, HttpApiConfig> configs = new HashMap<>();
+ configs.put("basicJson", basicJson);
+ configs.put("schemaChange", schemaChange);
+ configs.put("partialSchema", partialSchema);
+ configs.put("jsonMode", jsonModeConfig);
+ configs.put("noSchema", noSchema);
+
+ HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+ new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
+ 80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
+ UsernamePasswordCredentials.USERNAME, "globaluser",
+ UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
+ mockStorageConfigWithWorkspace.setEnabled(true);
+ cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
+ }
+
+ @Test
+ public void testProvidedSchema() throws Exception {
+ String sql = "SELECT * FROM `local`.`basicJson`";
+ try (MockWebServer server = startServer()) {
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("col_1", MinorType.FLOAT8)
+ .addNullable("col_2", MinorType.FLOAT8)
+ .addNullable("col_3", MinorType.FLOAT8)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1.0, 2.0, 3.0)
+ .addRow(4.0, 5.0, 6.0)
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
+ @Test
+ public void testSchemaChangeWithProvidedSchema() throws Exception {
+ String sql = "SELECT * FROM `local`.`schemaChange`";
+ try (MockWebServer server = startServer()) {
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("field1", MinorType.VARCHAR)
+ .addMap("field2")
+ .addNullable("nested_value1", MinorType.VARCHAR)
+ .addNullable("nested_value2", MinorType.VARCHAR)
+ .resumeSchema()
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("value1", strArray(null, null))
+ .addRow("value3", strArray("nv1", "nv2"))
+ .addRow("value5", strArray("nv3", "nv4"))
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
+ @Test
+ public void testPartialSchema() throws Exception {
+ String sql = "SELECT * FROM `local`.`partialSchema`";
+ try (MockWebServer server = startServer()) {
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("field1", MinorType.VARCHAR)
+ .addMap("field2")
+ .addNullable("nested_value1", MinorType.VARCHAR)
+ .addNullable("nested_value2", MinorType.VARCHAR)
+ .resumeSchema()
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("value1", strArray(null, null))
+ .addRow("value3", strArray("nv1", "nv2"))
+ .addRow("value5", strArray("nv3", "nv4"))
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
+ @Test
+ public void testInlineSchema() throws Exception {
+ String sql = "SELECT * FROM table(`local`.`noSchema` " +
+ "(schema => 'inline=(`field1` VARCHAR, `field2` VARCHAR properties {`drill.json-mode` = `json`})'" +
+ "))";
+
+ try (MockWebServer server = startServer()) {
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("field1", MinorType.VARCHAR)
+ .addNullable("field2", MinorType.VARCHAR)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("value1", "value2")
+ .addRow("value3", "{\"nested_value1\": nv1, \"nested_value2\": nv2}")
+ .addRow("value5", "{\"nested_value1\": nv3, \"nested_value2\": nv4}")
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
+ @Test
+ public void testPartialJSONSchema() throws Exception {
+ String sql = "SELECT * FROM `local`.`partialSchema`";
+ try (MockWebServer server = startServer()) {
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("field1", MinorType.VARCHAR)
+ .addMap("field2")
+ .addNullable("nested_value1", MinorType.VARCHAR)
+ .addNullable("nested_value2", MinorType.VARCHAR)
+ .resumeSchema()
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("value1", strArray(null, null))
+ .addRow("value3", strArray("nv1", "nv2"))
+ .addRow("value5", strArray("nv3", "nv4"))
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
+ @Test
+ public void testJsonMode() throws Exception {
+ String sql = "SELECT * FROM `local`.`jsonMode`";
+ try (MockWebServer server = startServer()) {
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("field1", MinorType.VARCHAR)
+ .addNullable("field2", MinorType.VARCHAR)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("value1", "value2")
+ .addRow("value3", "{\"nested_value1\": nv1, \"nested_value2\": nv2}")
+ .addRow("value5", "{\"nested_value1\": nv3, \"nested_value2\": nv4}")
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+ /**
+   * Helper method to start the MockWebServer.
+   * @return the started MockWebServer instance
+   * @throws IOException if the server cannot be started
+ */
+ public MockWebServer startServer() throws IOException {
+ MockWebServer server = new MockWebServer();
+ server.start(MOCK_SERVER_PORT);
+ return server;
+ }
+}
diff --git a/contrib/storage-http/src/test/resources/data/schema_change_1.json b/contrib/storage-http/src/test/resources/data/schema_change_1.json
new file mode 100644
index 0000000..bf6a571
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/schema_change_1.json
@@ -0,0 +1,16 @@
+{
+ "field1": "value1",
+ "field2": "value2"
+} {
+ "field1": "value3",
+ "field2": {
+ "nested_value1": "nv1",
+ "nested_value2": "nv2"
+ }
+} {
+ "field1": "value5",
+ "field2": {
+ "nested_value1": "nv3",
+ "nested_value2": "nv4"
+ }
+}
diff --git a/contrib/storage-http/src/test/resources/data/schema_change_2.json b/contrib/storage-http/src/test/resources/data/schema_change_2.json
new file mode 100644
index 0000000..1ed2d12
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/schema_change_2.json
@@ -0,0 +1,30 @@
+{
+ "result": {
+ "data": [
+ {
+ "workStopDuration": "00:14:29",
+ "driver": "UnknownDriverId",
+ "id": "b2"
+ },
+ {
+
+ "workDrivingDuration": "00:00:13",
+ "workStopDuration": "00:01:57.0630000",
+ "driver": "UnknownDriverId",
+ "id": "b4"
+ },
+ {
+ "workStopDuration": "00:00:00",
+ "device": {
+ "id": "b4"
+ },
+ "driver": {
+ "id": "b12",
+ "isDriver": true
+ },
+ "id": "bF"
+ }
+      ]
+ },
+ "jsonrpc": "2.0"
+}