DRILL-8204: Allow Provided Schema for HTTP Plugin in JSON Mode (#2526)

* Initial commit

* Map working

* WIP

* Added builder

* Lists in maps working

* Add documentation

* Cleaned up UT

* Final Revision

* Fix checkstyle

* Minor tweak

* removed extra test file

* Removed unused import

* Added inline schema support

* Addressed review comments

* Removed unused import

* Removed json string

* Final Revisions

* Fixed unit test
diff --git a/contrib/storage-http/JSON_Options.md b/contrib/storage-http/JSON_Options.md
new file mode 100644
index 0000000..3579068
--- /dev/null
+++ b/contrib/storage-http/JSON_Options.md
@@ -0,0 +1,79 @@
+# JSON Options and Configuration 
+
+Drill has a collection of JSON configuration options that control how Drill interprets JSON files.  These are set at the global level; however, the HTTP plugin
+allows you to set these options individually per connection, overriding the Drill defaults.  The options are:
+
+* `allowNanInf`:  Configures whether the connection accepts `NaN` and `Inf` values.
+* `allTextMode`:  By default, Drill attempts to infer data types from JSON data and may throw schema change exceptions if the data is malformed. If your data is
+  inconsistent, you can enable `allTextMode`; when true, Drill reads all JSON values as strings rather than trying to infer the data type.
+* `readNumbersAsDouble`:  By default, Drill attempts to distinguish between integers, floating point numbers and strings.  One challenge is that if the data is inconsistent, Drill may
+  throw schema change exceptions. In addition to `allTextMode`, you can make Drill less sensitive by setting `readNumbersAsDouble` to `true`, which causes Drill to read all
+  numeric fields in JSON data as the `double` data type rather than trying to distinguish between ints and doubles.
+* `enableEscapeAnyChar`:  Allows a user to escape any character with a backslash (`\`).
+* `skipMalformedRecords`:  Allows Drill to skip malformed records and recover without throwing exceptions.
+* `skipMalformedDocument`:  Allows Drill to skip entire malformed documents without throwing errors.
+
+All of these can be set by adding a `jsonOptions` object to your connection configuration, as shown below:
+
+```json
+"jsonOptions": {
+  "allTextMode": true,
+  "readNumbersAsDouble": true
+}
+```
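+
+These per-connection settings override Drill's global JSON options for queries against this connection only.  For comparison, a session-level sketch using the standard global options (shown here only for the equivalents of `allTextMode` and `readNumbersAsDouble`):
+
+```sql
+-- Session-level equivalents, which apply to every JSON source,
+-- not just this HTTP connection:
+ALTER SESSION SET `store.json.all_text_mode` = true;
+ALTER SESSION SET `store.json.read_numbers_as_double` = true;
+```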
+
+## Schema Provisioning
+One of the challenges of querying APIs is inconsistent data.  Drill allows you to provide a schema for individual endpoints.  You can do this in one of two ways:
+
+1. By providing a schema inline [See: Specifying Schema as Table Function Parameter](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter). An inline example appears after the configuration example below.
+2. By providing a schema in the configuration for the endpoint.
+
+Schema provisioning currently supports the complex types ARRAY and MAP at any nesting level.
+
+### Example Schema Provisioning:
+```json
+"jsonOptions": {
+  "schema": {
+    "columns": [
+      {
+        "name": "outer_map",
+        "type": "ARRAY<STRUCT<`bigint_col` BIGINT, `boolean_col` BOOLEAN, `date_col` DATE, `double_col` DOUBLE, `interval_col` INTERVAL, `int_col` BIGINT, `timestamp_col` TIMESTAMP, `time_col` TIME, `varchar_col` VARCHAR>>",
+        "mode": "REPEATED"
+      },
+      {
+        "name": "field1",
+        "type": "VARCHAR",
+        "mode": "OPTIONAL"
+      }
+    ]
+  }
+}
+```
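+
+The inline form passes the same schema information as a table function parameter at query time.  A minimal sketch, assuming a connection named `myEndpoint` in a plugin called `local` (both placeholder names):
+
+```sql
+SELECT * FROM table(`local`.`myEndpoint`
+  (schema => 'inline=(`field1` VARCHAR, `field2` VARCHAR)'))
+```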
+
+## Dealing With Inconsistent Schemas
+One of the major challenges of interacting with JSON data is an inconsistent schema.  Drill has a `UNION` data type, which is marked as experimental. At the time of
+writing, the HTTP plugin does not support `UNION`; however, supplying a schema can solve many of these issues.
+
+### JSON Mode
+Drill offers the option of reading JSON values as strings of raw JSON text. While this can complicate downstream analytics, it can also be a more memory-efficient way of reading data with
+an inconsistent schema.  Unfortunately, at the time of writing, JSON mode is only available with a provided schema.  However, future work will allow this mode to be enabled for
+any JSON data.
+
+#### Enabling JSON Mode:
+You can enable JSON mode by adding the `drill.json-mode` property with a value of `json` to a field in the schema, as shown below:
+
+```json
+"jsonOptions": {
+  "readNumbersAsDouble": true,
+  "schema": {
+    "type": "tuple_schema",
+    "columns": [
+      {
+        "name": "custom_fields",
+        "type": "ARRAY<STRUCT<`value` VARCHAR PROPERTIES { 'drill.json-mode' = 'json' }>>",
+        "mode": "REPEATED"
+      }
+    ]
+  }
+}
+```
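+
+Once JSON mode is enabled for a field, its values arrive as raw JSON text in a `VARCHAR` column rather than as typed maps, so objects of differing shapes no longer trigger schema change errors.  A hypothetical query against an endpoint configured as above (`local` and `myEndpoint` are placeholder names):
+
+```sql
+-- Each custom_fields value is returned as its raw JSON text.
+SELECT custom_fields FROM `local`.`myEndpoint`
+```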
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 7a808cf..0f188ba 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -250,29 +250,7 @@
 format plugin. 
 
 #### JSON Configuration
-Drill has a collection of JSON configuration options to allow you to configure how Drill interprets JSON files.  These are set at the global level, however the HTTP plugin 
-allows you to configure these options individually per connection and override the Drill defaults.  The options are:
-
-* `allowNanInf`:  Configures the connection to interpret `NaN` and `Inf` values
-* `allTextMode`:  By default, Drill attempts to infer data types from JSON data. If the data is malformed, Drill may throw schema change exceptions. If your data is 
-  inconsistent, you can enable `allTextMode` which when true, Drill will read all JSON values as strings, rather than try to infer the data type. 
-* `readNumbersAsDouble`:  By default Drill will attempt to interpret integers, floating point number types and strings.  One challenge is when data is consistent, Drill may 
-  throw schema change exceptions. In addition to `allTextMode`, you can make Drill less sensitive by setting the `readNumbersAsDouble` to `true` which causes Drill to read all 
-  numeric fields in JSON data as `double` data type rather than trying to distinguish between ints and doubles.
-* `enableEscapeAnyChar`:  Allows a user to escape any character with a \
-* `skipMalformedRecords`:  Allows Drill to skip malformed records and recover without throwing exceptions.
-* `skipMalformedDocument`:  Allows Drill to skip entire malformed documents without throwing errors.
-
-All of these can be set by adding the `jsonOptions` to your connection configuration as shown below:
-
-```json
-
-"jsonOptions": {
-  "allTextMode": true, 
-  "readNumbersAsDouble": true
-}
-
-```
+[Read the documentation for configuring JSON options, including schema provisioning.](JSON_Options.md)
 
 #### Authorization
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index b99ec57..a6ce240 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -29,7 +29,9 @@
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
@@ -111,7 +113,6 @@
       .build();
 
     // JSON loader setup
-
     resultSetLoader = negotiator.build();
     if (implicitColumnsAreProjected()) {
       implicitColumns = new ImplicitColumns(resultSetLoader.writer());
@@ -141,6 +142,11 @@
       } else {
         jsonBuilder.standardOptions(negotiator.queryOptions());
       }
+
+      TupleMetadata schema = getSchema(negotiator);
+      if (schema != null) {
+        jsonBuilder.providedSchema(schema);
+      }
+
       jsonLoader = jsonBuilder.build();
     } catch (Throwable t) {
 
@@ -153,6 +159,35 @@
     return true;
   }
 
+  /**
+   * This function obtains the correct schema for the {@link JsonLoader}.  There are four possibilities:
+   * 1.  The schema is provided in the configuration only.  In this case, that schema will be returned.
+   * 2.  The schema is provided in both the configuration and inline.  These two schemas will be merged together.
+   * 3.  The schema is provided inline in a query.  In this case, that schema will be returned.
+   * 4.  No schema is provided.  Function returns null.
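+   * For example, if the configuration schema defines {@code col_1} and an inline schema defines
+   * {@code col_2}, the merged schema is expected to contain both columns.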
+   * @param negotiator {@link SchemaNegotiator} The schema negotiator with all the connection information
+   * @return The built {@link TupleMetadata} of the provided schema, null if none provided.
+   */
+  private TupleMetadata getSchema(SchemaNegotiator negotiator) {
+    if (subScan.tableSpec().connectionConfig().jsonOptions() != null &&
+      subScan.tableSpec().connectionConfig().jsonOptions().schema() != null) {
+      TupleMetadata configuredSchema = subScan.tableSpec().connectionConfig().jsonOptions().schema();
+
+      // If it has a provided schema both inline and in the config, merge the two, otherwise, return the config schema
+      if (negotiator.hasProvidedSchema()) {
+        TupleMetadata inlineSchema = negotiator.providedSchema();
+        return FixedReceiver.Builder.mergeSchemas(configuredSchema, inlineSchema);
+      } else {
+        return configuredSchema;
+      }
+    } else if (negotiator.hasProvidedSchema()) {
+      return negotiator.providedSchema();
+    }
+    return null;
+  }
+
   protected void buildImplicitColumns() {
     // Add String fields
     for (String fieldName : STRING_METADATA_FIELDS) {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index 5eab823..3da54d0 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.http;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -28,6 +29,7 @@
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.expression.SchemaPath;
 
+import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -35,8 +37,11 @@
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.metadata.TableMetadata;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 
@@ -53,11 +58,12 @@
 
   // Used only in planner, not serialized
   private int hashCode;
+  private MetadataProviderManager metadataProviderManager;
 
   /**
    * Creates a new group scan from the storage plugin.
    */
-  public HttpGroupScan (HttpScanSpec scanSpec) {
+  public HttpGroupScan (HttpScanSpec scanSpec, MetadataProviderManager metadataProviderManager) {
     super(scanSpec.queryUserName());
     this.httpScanSpec = scanSpec;
     this.username = scanSpec.queryUserName();
@@ -66,6 +72,7 @@
     this.filterSelectivity = 0.0;
     this.scanStats = computeScanStats();
     this.maxRecords = -1;
+    this.metadataProviderManager = metadataProviderManager;
   }
 
   /**
@@ -79,6 +86,7 @@
     this.filterSelectivity = that.filterSelectivity;
     this.maxRecords = that.maxRecords;
     this.username = that.username;
+    this.metadataProviderManager = that.metadataProviderManager;
 
     // Calcite makes many copies in the later stage of planning
     // without changing anything. Retain the previous stats.
@@ -98,6 +106,7 @@
     // to again assign columns. Retain filters, but compute new stats.
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
+    this.metadataProviderManager = that.metadataProviderManager;
     this.scanStats = computeScanStats();
     this.username = that.username;
     this.maxRecords = that.maxRecords;
@@ -118,6 +127,7 @@
     this.filterSelectivity = filterSelectivity;
     this.scanStats = computeScanStats();
     this.maxRecords = that.maxRecords;
+    this.metadataProviderManager = that.metadataProviderManager;
   }
 
   /**
@@ -134,6 +144,7 @@
     this.filterSelectivity = that.filterSelectivity;
     this.scanStats = computeScanStats();
     this.maxRecords = maxRecords;
+    this.metadataProviderManager = that.metadataProviderManager;
   }
 
 
@@ -192,7 +203,7 @@
 
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
-    return new HttpSubScan(httpScanSpec, columns, filters, maxRecords);
+    return new HttpSubScan(httpScanSpec, columns, filters, maxRecords, getSchema());
   }
 
   @Override
@@ -216,6 +227,33 @@
     return new HttpGroupScan(this);
   }
 
+  public TupleMetadata getSchema() {
+    if (metadataProviderManager == null) {
+      return null;
+    }
+    try {
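+      // Any link in this chain may be null when no schema was provided; a
+      // NullPointerException here is treated the same as an unreadable schema.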
+      return metadataProviderManager.getSchemaProvider().read().getSchema();
+    } catch (IOException | NullPointerException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public TableMetadata getTableMetadata() {
+    if (getMetadataProvider() == null) {
+      return null;
+    }
+    return getMetadataProvider().getTableMetadata();
+  }
+
+  @Override
+  public TableMetadataProvider getMetadataProvider() {
+    if (metadataProviderManager == null) {
+      return null;
+    }
+    return metadataProviderManager.getTableMetadataProvider();
+  }
+
   @Override
   public ScanStats getScanStats() {
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpJsonOptions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpJsonOptions.java
index edd77f9..c8274e1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpJsonOptions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpJsonOptions.java
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
 import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
 
@@ -51,6 +52,8 @@
   @JsonProperty
   private final Boolean skipMalformedDocument;
 
+  @JsonProperty
+  private final TupleMetadata schema;
 
   HttpJsonOptions(HttpJsonOptionsBuilder builder) {
     this.allowNanInf = builder.allowNanInf;
@@ -59,6 +62,7 @@
     this.enableEscapeAnyChar = builder.enableEscapeAnyChar;
     this.skipMalformedRecords = builder.skipMalformedRecords;
     this.skipMalformedDocument = builder.skipMalformedDocument;
+    this.schema = builder.schema;
   }
 
   public static HttpJsonOptionsBuilder builder() {
@@ -80,12 +84,13 @@
     if (enableEscapeAnyChar != null) {
       options.enableEscapeAnyChar = enableEscapeAnyChar;
     }
-    if (skipMalformedDocument != null) {
-      options.skipMalformedDocument = skipMalformedDocument;
-    }
     if (skipMalformedRecords != null) {
       options.skipMalformedRecords = skipMalformedRecords;
     }
+    if (skipMalformedDocument != null) {
+      options.skipMalformedDocument = skipMalformedDocument;
+    }
+
     return options;
   }
 
@@ -119,6 +124,11 @@
     return this.skipMalformedDocument;
   }
 
+  @JsonProperty("schema")
+  public TupleMetadata schema() {
+    return this.schema;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -132,12 +142,14 @@
       && Objects.equals(allTextMode, that.allTextMode)
       && Objects.equals(readNumbersAsDouble, that.readNumbersAsDouble)
       && Objects.equals(enableEscapeAnyChar, that.enableEscapeAnyChar)
-      && Objects.equals(skipMalformedRecords, that.skipMalformedRecords);
+      && Objects.equals(skipMalformedDocument, that.skipMalformedDocument)
+      && Objects.equals(skipMalformedRecords, that.skipMalformedRecords)
+      && Objects.equals(schema, that.schema);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(allowNanInf, allTextMode, readNumbersAsDouble, enableEscapeAnyChar, skipMalformedRecords, skipMalformedDocument);
+    return Objects.hash(allowNanInf, allTextMode, readNumbersAsDouble, enableEscapeAnyChar, skipMalformedDocument, skipMalformedRecords, schema);
   }
 
   @Override
@@ -149,6 +161,7 @@
       .field("enableEscapeAnyChar", enableEscapeAnyChar)
       .field("skipMalformedRecords", skipMalformedRecords)
       .field("skipMalformedDocument", skipMalformedDocument)
+      .field("schema", schema)
       .toString();
   }
 
@@ -166,6 +179,8 @@
 
     private Boolean skipMalformedDocument;
 
+    private TupleMetadata schema;
+
     public HttpJsonOptionsBuilder allowNanInf(Boolean allowNanInf) {
       this.allowNanInf = allowNanInf;
       return this;
@@ -196,6 +211,11 @@
       return this;
     }
 
+    public HttpJsonOptionsBuilder schema(TupleMetadata schema) {
+      this.schema = schema;
+      return this;
+    }
+
     public HttpJsonOptions build() {
       return new HttpJsonOptions(this);
     }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 3bdee3f..57797b3 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -68,6 +68,7 @@
       HttpSubScan subScan) {
     ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
     builder.projection(subScan.columns());
+    builder.providedSchema(subScan.schema());
     builder.setUserName(subScan.getUserName());
 
     // Provide custom error context
@@ -102,7 +103,6 @@
 
       paginatorConfig = subScan.tableSpec().connectionConfig().paginator();
       if (paginatorConfig != null) {
-        // TODO Handle the case of no limit queries in pagination
         logger.debug("Creating paginator using config: {}", paginatorConfig);
 
         // Initialize the paginator and generate the base URLs
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
index 181c92e..6787c60 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePlugin.java
@@ -20,6 +20,8 @@
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.oauth.OAuthTokenProvider;
 import org.apache.drill.exec.oauth.PersistentTokenTable;
 import org.apache.drill.exec.oauth.TokenRegistry;
@@ -27,6 +29,7 @@
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -35,6 +38,7 @@
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
+import java.util.List;
 import java.util.Set;
 
 public class HttpStoragePlugin extends AbstractStoragePlugin {
@@ -82,9 +86,36 @@
   }
 
   @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+      options, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+      options, metadataProviderManager);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+                                           List<SchemaPath> columns) throws IOException {
+    return getPhysicalScan(userName, selection, columns, null, null);
+  }
+
+  @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options,
+                                           MetadataProviderManager metadataProviderManager) throws IOException {
     HttpScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<HttpScanSpec>() {});
-    return new HttpGroupScan(scanSpec);
+    return new HttpGroupScan(scanSpec, metadataProviderManager);
   }
 
   @Override
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index 7e7dc33..3136518 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -33,6 +33,7 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 @JsonTypeName("http-sub-scan")
 public class HttpSubScan extends AbstractBase implements SubScan {
@@ -43,19 +44,22 @@
   private final List<SchemaPath> columns;
   private final Map<String, String> filters;
   private final int maxRecords;
+  private final TupleMetadata schema;
 
   @JsonCreator
   public HttpSubScan(
     @JsonProperty("tableSpec") HttpScanSpec tableSpec,
     @JsonProperty("columns") List<SchemaPath> columns,
     @JsonProperty("filters") Map<String, String> filters,
-    @JsonProperty("maxRecords") int maxRecords
+    @JsonProperty("maxRecords") int maxRecords,
+    @JsonProperty("schema") TupleMetadata schema
     ) {
     super(tableSpec.queryUserName());
     this.tableSpec = tableSpec;
     this.columns = columns;
     this.filters = filters;
     this.maxRecords = maxRecords;
+    this.schema = schema;
   }
 
   @JsonProperty("tableSpec")
@@ -78,6 +82,11 @@
     return maxRecords;
   }
 
+  @JsonProperty("schema")
+  public TupleMetadata schema() {
+    return schema;
+  }
+
  @Override
   public <T, X, E extends Throwable> T accept(
    PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
@@ -86,7 +95,7 @@
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new HttpSubScan(tableSpec, columns, filters, maxRecords);
+    return new HttpSubScan(tableSpec, columns, filters, maxRecords, schema);
   }
 
   @Override
@@ -107,12 +116,13 @@
       .field("columns", columns)
       .field("filters", filters)
       .field("maxRecords", maxRecords)
+      .field("schema", schema)
       .toString();
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(tableSpec, columns, filters);
+    return Objects.hash(tableSpec, columns, filters, maxRecords, schema);
   }
 
   @Override
@@ -127,6 +137,7 @@
     return Objects.equals(tableSpec, other.tableSpec)
       && Objects.equals(columns, other.columns)
       && Objects.equals(filters, other.filters)
-      && Objects.equals(maxRecords, other.maxRecords);
+      && Objects.equals(maxRecords, other.maxRecords)
+      && Objects.equals(schema, other.schema);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 0b11030..936008a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -824,13 +824,6 @@
     return inputArguments;
   }
 
-/*
-  public static HttpStoragePluginConfig getPluginConfig(String name, DrillbitContext context) throws PluginException {
-    HttpStoragePlugin httpStoragePlugin = getStoragePlugin(context, name);
-    return httpStoragePlugin.getConfig();
-  }
-*/
-
   public static HttpApiConfig getEndpointConfig(String endpoint, HttpStoragePluginConfig pluginConfig) {
     HttpApiConfig endpointConfig = pluginConfig.getConnection(endpoint);
     if (endpointConfig == null) {
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
new file mode 100644
index 0000000..e703fb4
--- /dev/null
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.util.DrillFileUtils;
+
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+
+
+public class TestProvidedSchema extends ClusterTest {
+  private static final int MOCK_SERVER_PORT = 47777;
+
+  private static String TEST_JSON_PAGE1;
+  private static String TEST_SCHEMA_CHANGE1;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    TEST_JSON_PAGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p1.json"), Charsets.UTF_8).read();
+    TEST_SCHEMA_CHANGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/schema_change_1.json"), Charsets.UTF_8).read();
+
+    dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+    makeMockConfig(cluster);
+  }
+
+  public static void makeMockConfig(ClusterFixture cluster) {
+
+    TupleMetadata simpleSchema = new SchemaBuilder()
+      .addNullable("col_1", MinorType.FLOAT8)
+      .addNullable("col_2", MinorType.FLOAT8)
+      .addNullable("col_3", MinorType.FLOAT8)
+      .build();
+
+    HttpJsonOptions jsonOptions = new HttpJsonOptions.HttpJsonOptionsBuilder()
+      .schema(simpleSchema)
+      .build();
+
+    HttpApiConfig basicJson = HttpApiConfig.builder()
+      .url("http://localhost:47777/json")
+      .method("get")
+      .jsonOptions(jsonOptions)
+      .requireTail(false)
+      .inputType("json")
+      .build();
+
+    TupleMetadata mapSchema = new SchemaBuilder()
+      .addNullable("field1", MinorType.VARCHAR)
+      .addMap("field2")
+      .addNullable("nested_value1", MinorType.VARCHAR)
+      .addNullable("nested_value2", MinorType.VARCHAR)
+      .resumeSchema()
+      .buildSchema();
+
+    HttpJsonOptions jsonOptionsSchemaChange = new HttpJsonOptions.HttpJsonOptionsBuilder()
+      .schema(mapSchema)
+      .skipMalformedRecords(true)
+      .build();
+
+    HttpApiConfig schemaChange = HttpApiConfig.builder()
+      .url("http://localhost:47777/json")
+      .method("get")
+      .jsonOptions(jsonOptionsSchemaChange)
+      .requireTail(false)
+      .inputType("json")
+      .build();
+
+    TupleMetadata partialMapSchema = new SchemaBuilder()
+      .addNullable("field1", MinorType.VARCHAR)
+      .addMap("field2")
+      .addNullable("nested_value1", MinorType.VARCHAR)
+      .resumeSchema()
+      .buildSchema();
+
+    HttpApiConfig partialSchema = HttpApiConfig.builder()
+      .url("http://localhost:47777/json")
+      .method("get")
+      .jsonOptions(HttpJsonOptions.builder().schema(partialMapSchema).build())
+      .requireTail(false)
+      .inputType("json")
+      .build();
+
+    ColumnMetadata jsonColumn = MetadataUtils.newScalar("field2", MinorType.VARCHAR, DataMode.OPTIONAL);
+    jsonColumn.setProperty("drill.json-mode", "json");
+
+    TupleMetadata jsonModeSchema = new SchemaBuilder()
+      .addNullable("field1", MinorType.VARCHAR)
+      .add(jsonColumn)
+      .build();
+
+    HttpJsonOptions jsonModeOptions = HttpJsonOptions.builder()
+      .schema(jsonModeSchema)
+      .skipMalformedRecords(true)
+      .build();
+
+    HttpApiConfig jsonModeConfig = HttpApiConfig.builder()
+      .url("http://localhost:47777/json")
+      .method("get")
+      .jsonOptions(jsonModeOptions)
+      .requireTail(false)
+      .inputType("json")
+      .build();
+
+    HttpApiConfig noSchema = HttpApiConfig.builder()
+      .url("http://localhost:47777/json")
+      .method("get")
+      .requireTail(false)
+      .inputType("json")
+      .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("basicJson", basicJson);
+    configs.put("schemaChange", schemaChange);
+    configs.put("partialSchema", partialSchema);
+    configs.put("jsonMode", jsonModeConfig);
+    configs.put("noSchema", noSchema);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+      new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
+        80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
+        UsernamePasswordCredentials.USERNAME, "globaluser",
+        UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
+  }
+
+  @Test
+  public void testProvidedSchema() throws Exception {
+    String sql = "SELECT * FROM `local`.`basicJson`";
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("col_1", MinorType.FLOAT8)
+        .addNullable("col_2", MinorType.FLOAT8)
+        .addNullable("col_3", MinorType.FLOAT8)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(1.0, 2.0, 3.0)
+        .addRow(4.0, 5.0, 6.0)
+        .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
+  public void testSchemaChangeWithProvidedSchema() throws Exception {
+    String sql = "SELECT * FROM `local`.`schemaChange`";
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("field1", MinorType.VARCHAR)
+        .addMap("field2")
+          .addNullable("nested_value1", MinorType.VARCHAR)
+          .addNullable("nested_value2", MinorType.VARCHAR)
+        .resumeSchema()
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("value1", strArray(null, null))
+        .addRow("value3", strArray("nv1", "nv2"))
+        .addRow("value5", strArray("nv3", "nv4"))
+        .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
+  public void testPartialSchema() throws Exception {
+    String sql = "SELECT * FROM `local`.`partialSchema`";
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("field1", MinorType.VARCHAR)
+        .addMap("field2")
+        .addNullable("nested_value1", MinorType.VARCHAR)
+        .addNullable("nested_value2", MinorType.VARCHAR)
+        .resumeSchema()
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("value1", strArray(null, null))
+        .addRow("value3", strArray("nv1", "nv2"))
+        .addRow("value5", strArray("nv3", "nv4"))
+        .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
+  public void testInlineSchema() throws Exception {
+    String sql = "SELECT * FROM table(`local`.`noSchema` " +
+      "(schema => 'inline=(`field1` VARCHAR, `field2` VARCHAR properties {`drill.json-mode` = `json`})'" +
+      "))";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("field1", MinorType.VARCHAR)
+        .addNullable("field2", MinorType.VARCHAR)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("value1", "value2")
+        .addRow("value3", "{\"nested_value1\": nv1, \"nested_value2\": nv2}")
+        .addRow("value5", "{\"nested_value1\": nv3, \"nested_value2\": nv4}")
+        .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
+  public void testPartialJSONSchema() throws Exception {
+    String sql = "SELECT * FROM `local`.`partialSchema`";
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("field1", MinorType.VARCHAR)
+        .addMap("field2")
+        .addNullable("nested_value1", MinorType.VARCHAR)
+        .addNullable("nested_value2", MinorType.VARCHAR)
+        .resumeSchema()
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("value1", strArray(null, null))
+        .addRow("value3", strArray("nv1", "nv2"))
+        .addRow("value5", strArray("nv3", "nv4"))
+        .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
+  public void testJsonMode() throws Exception {
+    String sql = "SELECT * FROM `local`.`jsonMode`";
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_SCHEMA_CHANGE1));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("field1", MinorType.VARCHAR)
+        .addNullable("field2", MinorType.VARCHAR)
+        .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("value1", "value2")
+        .addRow("value3", "{\"nested_value1\": nv1, \"nested_value2\": nv2}")
+        .addRow("value5", "{\"nested_value1\": nv3, \"nested_value2\": nv4}")
+        .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+  /**
+   * Helper function to start the MockWebServer.
+   * @return Started Mock server
+   * @throws IOException If the server cannot start, throws IOException
+   */
+  public MockWebServer startServer() throws IOException {
+    MockWebServer server = new MockWebServer();
+    server.start(MOCK_SERVER_PORT);
+    return server;
+  }
+}
diff --git a/contrib/storage-http/src/test/resources/data/schema_change_1.json b/contrib/storage-http/src/test/resources/data/schema_change_1.json
new file mode 100644
index 0000000..bf6a571
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/schema_change_1.json
@@ -0,0 +1,16 @@
+{
+  "field1": "value1",
+  "field2": "value2"
+} {
+  "field1": "value3",
+  "field2": {
+    "nested_value1": "nv1",
+    "nested_value2": "nv2"
+  }
+} {
+  "field1": "value5",
+  "field2": {
+    "nested_value1": "nv3",
+    "nested_value2": "nv4"
+  }
+}
diff --git a/contrib/storage-http/src/test/resources/data/schema_change_2.json b/contrib/storage-http/src/test/resources/data/schema_change_2.json
new file mode 100644
index 0000000..1ed2d12
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/schema_change_2.json
@@ -0,0 +1,30 @@
+{
+  "result": {
+    "data": [
+      {
+        "workStopDuration": "00:14:29",
+        "driver": "UnknownDriverId",
+        "id": "b2"
+      },
+      {
+        "workDrivingDuration": "00:00:13",
+        "workStopDuration": "00:01:57.0630000",
+        "driver": "UnknownDriverId",
+        "id": "b4"
+      },
+      {
+        "workStopDuration": "00:00:00",
+        "device": {
+          "id": "b4"
+        },
+        "driver": {
+          "id": "b12",
+          "isDriver": true
+        },
+        "id": "bF"
+      }
+    ]
+  },
+  "jsonrpc": "2.0"
+}