DRILL-8393: Allow parameters to be passed to headers through SQL in WHERE clause (#2747)

diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 7d0ce10..a19017e 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -15,6 +15,7 @@
 {
   "type": "http",
   "cacheResults": true,
+  "enhanced": false,
   "connections": {},
   "timeout": 0,
   "proxyHost": null,
@@ -29,6 +30,7 @@
 
 * `type`:  This should be `http`
 * `cacheResults`:  Enable caching of the HTTP responses.  Defaults to `false`
+* `enhanced`:  Enable enhanced param syntax in Drill 1.22 and beyond. The enhanced mode allows the user to send the request header through the `WHERE` clause. Defaults to `false`
 * `timeout`:  Sets the response timeout in seconds. Defaults to `0` which is no timeout.
 * `connections`:  This field contains the details for individual connections. See the section *Configuring API Connections for Details*.
 
@@ -112,17 +114,18 @@
 ```json
 url: "https://api.sunrise-sunset.org/json",
 requireTail: false,
-params: ["lat", "lng", "date"]
+enhanced: true,
+params: ["tail.lat", "tail.lng", "tail.date"]
 ```
 
 SQL query:
 
 ```sql
 SELECT * FROM api.sunrise
-WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'
+WHERE `tail.lat` = 36.7201600 AND `tail.lng` = -4.4203400 AND `tail.date` = '2019-10-02'
 ```
 
-In this case, Drill appends the parameters to the URL, adding a question mark
+In this case, Drill appends the `tail` prefixed parameters to the URL, adding a question mark
 to separate the two.
 
 #### Method
@@ -138,11 +141,28 @@
 ```
 
 `postParameterLocation`:  If the API uses the `POST` method, you can send parameters in several different ways:
-* `query_string`:  Parameters from the query are pushed down to the query string.  Static parameters are pushed to the post body.
+* `query_string`:  Parameters from the query are pushed down to the query string.  Static parameters are pushed to the post body. This option is abandoned in enhanced mode.
 * `post_body`:  Both static and parameters from the query are pushed to the post body as key/value pairs
 * `json_body`:  Both static and parameters from the query are pushed to the post body as json.
 * `xml_body`:  Both static and parameters from the query are pushed to the post body as XML.
 
+```json
+url: "https://api.sunrise-sunset.org/json",
+requireTail: false,
+enhanced: true,
+postParameterLocation: "json_body",
+params: ["body.lat", "body.lng", "body.date"]
+```
+
+SQL query:
+
+```sql
+SELECT * FROM api.sunrise
+WHERE `body.lat` = 36.7201600 AND `body.lng` = -4.4203400 AND `body.date` = '2019-10-02'
+```
+
+In this case, Drill appends the `body` prefixed parameters to the post body as json.
+
 #### Headers
 
 `headers`: Often APIs will require custom headers as part of the authentication. This field allows
@@ -155,20 +175,43 @@
    }
 ```
 
-#### Query Parmeters as Filters
-
-* `params`: Allows you to map SQL `WHERE` clause conditions to query parameters.
+You can also pass the request headers through the where clause.
 
 ```json
 url: "https://api.sunrise-sunset.org/json",
-params: ["lat", "lng", "date"]
+requireTail: false,
+enhanced: true,
+postParameterLocation: "json_body",
+params: ["body.lat", "body.lng", "body.date", "header.header1"]
 ```
 
 SQL query:
 
 ```sql
 SELECT * FROM api.sunrise
-WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'
+WHERE `body.lat` = 36.7201600 AND `body.lng` = -4.4203400 AND `body.date` = '2019-10-02'
+AND `header.header1` = 'value1'
+```
+
+In this case, Drill appends the `header` prefixed parameters to the headers.
+
+#### Query Parmeters as Filters
+
+* `params`: Allows you to map SQL `WHERE` clause conditions to query parameters, request bodies, and
+request headers. The `tail` prefixed params maps to the query parameters at the end of the URL, the
+`body` prefixed params maps to the request bodies, and the `header` prefixed params maps to the
+request headers.
+
+```json
+url: "https://api.sunrise-sunset.org/json",
+params: ["tail.lat", "tail.lng", "tail.date"]
+```
+
+SQL query:
+
+```sql
+SELECT * FROM api.sunrise
+WHERE `tail.lat` = 36.7201600 AND `tail.lng` = -4.4203400 AND `tail.date` = '2019-10-02'
 ```
 
 HTTP parameters are untyped; Drill converts any value you provide into a string.
@@ -493,7 +536,7 @@
       "method": "GET",
       "dataPath": "results",
       "headers": null,
-      "params": [ "lat", "lng", "date" ],
+      "params": [ "tail.lat", "tail.lng", "tail.date" ],
       "authType": "none",
       "userName": null,
       "password": null,
@@ -508,7 +551,7 @@
 ```sql
 SELECT sunrise, sunset
 FROM   http.sunrise
-WHERE  `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = 'today'
+WHERE  `tail.lat` = 36.7201600 AND `tail.lng` = -4.4203400 AND `tail.date` = 'today'
 ```
 
 Which yields the same results as before.
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index 32efdbf..1d8f49b 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -49,8 +49,9 @@
   protected static final String CSV_INPUT_FORMAT = "csv";
   protected static final String XML_INPUT_FORMAT = "xml";
 
-  public static final String POST_BODY_POST_LOCATION = "post_body";
   public static final String QUERY_STRING_POST_LOCATION = "query_string";
+  public static final String POST_BODY_POST_LOCATION = "post_body";
+  public static final String XML_BODY_POST_LOCATION = "xml_body";
   public static final String JSON_BODY_POST_LOCATION = "json_body";
 
   @JsonProperty
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 85d8073..df7dff5 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -56,6 +56,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.regex.Pattern;
 
 public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
 
@@ -296,18 +297,30 @@
   protected void addFilters(Builder urlBuilder, List<String> params,
       Map<String, String> filters) {
 
-    // If the request is a POST query and the user selected to push the filters to either JSON body
-    // or the post body, do not add to the query string.
-    if (subScan.tableSpec().connectionConfig().getMethodType() == HttpApiConfig.HttpMethod.POST &&
-      (subScan.tableSpec().connectionConfig().getPostLocation() == PostLocation.POST_BODY ||
-        subScan.tableSpec().connectionConfig().getPostLocation() == PostLocation.JSON_BODY)
-    ) {
-      return;
-    }
-    for (String param : params) {
-      String value = filters.get(param);
-      if (value != null) {
-        urlBuilder.addQueryParameter(param, value);
+    final Pattern tailParamsKeyPattern = Pattern.compile("^tail\\..+$");
+    final HttpStoragePluginConfig config = subScan.tableSpec().config();
+    if (!config.enableEnhancedParamSyntax()) {
+      // If the request is a POST query and the user selected to push the filters to either JSON body
+      // or the post body, do not add to the query string.
+      if (subScan.tableSpec().connectionConfig().getMethodType() == HttpApiConfig.HttpMethod.GET ||
+          (subScan.tableSpec().connectionConfig().getMethodType() == HttpApiConfig.HttpMethod.POST
+              && subScan.tableSpec().connectionConfig().getPostLocation() == PostLocation.QUERY_STRING)
+      ) {
+        for (String param : params) {
+          String value = filters.get(param);
+          if (value != null) {
+            urlBuilder.addQueryParameter(param, value);
+          }
+        }
+      }
+    } else {
+      for (String param : params) {
+        if (tailParamsKeyPattern.matcher(param).find()){
+          String value = filters.get(param);
+          if (value != null) {
+            urlBuilder.addQueryParameter(param.substring(5), value);
+          }
+        }
       }
     }
   }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index 5c38430..7262295 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -54,6 +54,7 @@
   public final String proxyHost;
   public final int proxyPort;
   public final String proxyType;
+  public final boolean enableEnhancedParamSyntax;
   /**
    * Timeout in {@link TimeUnit#SECONDS}.
    */
@@ -62,6 +63,7 @@
 
   @JsonCreator
   public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults,
+                                 @JsonProperty("enhanced") Boolean enableEnhancedParamSyntax,
                                  @JsonProperty("connections") Map<String, HttpApiConfig> connections,
                                  @JsonProperty("timeout") Integer timeout,
                                  @JsonProperty("retryDelay") Integer retryDelay,
@@ -89,6 +91,7 @@
         AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER),
       oAuthConfig);
     this.cacheResults = cacheResults != null && cacheResults;
+    this.enableEnhancedParamSyntax = enableEnhancedParamSyntax != null && enableEnhancedParamSyntax;
     this.retryDelay = (retryDelay == null || retryDelay < 0) ? DEFAULT_RATE_LIMIT : retryDelay;
 
     this.connections = CaseInsensitiveMap.newHashMap();
@@ -121,6 +124,7 @@
   private HttpStoragePluginConfig(HttpStoragePluginConfig that, CredentialsProvider credentialsProvider) {
     super(credentialsProvider, credentialsProvider == null, that.authMode);
     this.cacheResults = that.cacheResults;
+    this.enableEnhancedParamSyntax = that.enableEnhancedParamSyntax;
     this.connections = that.connections;
     this.timeout = that.timeout;
     this.proxyHost = that.proxyHost;
@@ -140,6 +144,7 @@
       that.credentialsProvider == null);
 
     this.cacheResults = that.cacheResults;
+    this.enableEnhancedParamSyntax = that.enableEnhancedParamSyntax;
     this.connections = that.connections;
     this.timeout = that.timeout;
     this.proxyHost = that.proxyHost;
@@ -165,6 +170,7 @@
     Optional<UsernamePasswordWithProxyCredentials> creds = getUsernamePasswordCredentials();
     return new HttpStoragePluginConfig(
       cacheResults,
+      enableEnhancedParamSyntax,
       configFor(connectionName),
       timeout, retryDelay,
       username(),
@@ -196,6 +202,7 @@
     HttpStoragePluginConfig thatConfig = (HttpStoragePluginConfig) that;
     return Objects.equals(connections, thatConfig.connections) &&
       Objects.equals(cacheResults, thatConfig.cacheResults) &&
+      Objects.equals(enableEnhancedParamSyntax, thatConfig.enableEnhancedParamSyntax) &&
       Objects.equals(proxyHost, thatConfig.proxyHost) &&
       Objects.equals(retryDelay, thatConfig.retryDelay) &&
       Objects.equals(proxyPort, thatConfig.proxyPort) &&
@@ -210,6 +217,7 @@
     return new PlanStringBuilder(this)
       .field("connections", connections)
       .field("cacheResults", cacheResults)
+      .field("enhanced", enableEnhancedParamSyntax)
       .field("timeout", timeout)
       .field("retryDelay", retryDelay)
       .field("proxyHost", proxyHost)
@@ -223,13 +231,16 @@
 
   @Override
   public int hashCode() {
-    return Objects.hash(connections, cacheResults, timeout, retryDelay,
+    return Objects.hash(connections, cacheResults, enableEnhancedParamSyntax, timeout, retryDelay,
         proxyHost, proxyPort, proxyType, oAuthConfig, credentialsProvider, authMode);
   }
 
   @JsonProperty("cacheResults")
   public boolean cacheResults() { return cacheResults; }
 
+  @JsonProperty("enhanced")
+  public boolean enableEnhancedParamSyntax() { return enableEnhancedParamSyntax; }
+
   @JsonProperty("connections")
   public Map<String, HttpApiConfig> connections() { return connections; }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 6ba9918..d0a78e3 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -344,6 +344,8 @@
 
     Request.Builder requestBuilder = new Request.Builder()
       .url(url);
+    final Pattern bodyParamsKeyPattern = Pattern.compile("^body\\..+$");
+    final Pattern headerParamsKeyPattern = Pattern.compile("^header\\..+$");
 
     // The configuration does not allow for any other request types other than POST and GET.
     if (apiConfig.getMethodType() == HttpMethod.POST) {
@@ -354,11 +356,19 @@
       if (apiConfig.getPostLocation() == PostLocation.POST_BODY) {
         formBodyBuilder = buildPostBody(filters, apiConfig.postBody());
         requestBuilder.post(formBodyBuilder.build());
-      } else if (apiConfig.getPostLocation() == PostLocation.JSON_BODY) {
+      } else if (apiConfig.getPostLocation() == PostLocation.JSON_BODY
+          || (apiConfig.getPostLocation() == PostLocation.QUERY_STRING
+          && pluginConfig.enableEnhancedParamSyntax())) {
         // Add static parameters from postBody
         JSONObject json = buildJsonPostBody(apiConfig.postBody());
         // Now add filters
-        if (filters != null) {
+        if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
+          for (Map.Entry<String, String> filter : filters.entrySet()) {
+            if (bodyParamsKeyPattern.matcher(filter.getKey()).find()){
+              json.put(filter.getKey().substring(5), filter.getValue());
+            }
+          }
+        } else if (filters != null && !pluginConfig.enableEnhancedParamSyntax()) {
           for (Map.Entry<String, String> filter : filters.entrySet()) {
             json.put(filter.getKey(), filter.getValue());
           }
@@ -369,7 +379,15 @@
       } else if (apiConfig.getPostLocation() == PostLocation.XML_BODY) {
         StringBuilder xmlRequest = new StringBuilder();
         xmlRequest.append("<request>");
-        if (filters != null) {
+        if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
+          for (Map.Entry<String, String> filter : filters.entrySet()) {
+            if (bodyParamsKeyPattern.matcher(filter.getKey()).find()){
+              xmlRequest.append("<").append(filter.getKey().substring(5)).append(">");
+              xmlRequest.append(filter.getValue());
+              xmlRequest.append("</").append(filter.getKey().substring(5)).append(">");
+            }
+          }
+        } else if (filters != null && !pluginConfig.enableEnhancedParamSyntax()) {
           for (Map.Entry<String, String> filter : filters.entrySet()) {
             xmlRequest.append("<").append(filter.getKey()).append(">");
             xmlRequest.append(filter.getValue());
@@ -398,6 +416,14 @@
       }
     }
 
+    if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
+      for (Map.Entry<String, String> filter : filters.entrySet()) {
+        if (headerParamsKeyPattern.matcher(filter.getKey()).find()){
+          requestBuilder.addHeader(filter.getKey().substring(7), filter.getValue());
+        }
+      }
+    }
+
     // Build the request object
     Request request = requestBuilder.build();
     Response response = null;
@@ -657,10 +683,19 @@
   public FormBody.Builder buildPostBody(Map<String, String> filters, String postBody) {
     // Add static parameters
     FormBody.Builder builder = buildPostBody(postBody);
+    final Pattern bodyParamsKeyPattern = Pattern.compile("^body\\..+$");
 
     // Now add the filters
-    for (Map.Entry<String, String> filter : filters.entrySet()) {
-      builder.add(filter.getKey(), filter.getValue());
+    if (filters != null && pluginConfig.enableEnhancedParamSyntax()) {
+      for (Map.Entry<String, String> filter : filters.entrySet()) {
+        if (bodyParamsKeyPattern.matcher(filter.getKey()).find()){
+          builder.add(filter.getKey().substring(5), filter.getValue());
+        }
+      }
+    } else if (filters != null && !pluginConfig.enableEnhancedParamSyntax()) {
+      for (Map.Entry<String, String> filter : filters.entrySet()) {
+        builder.add(filter.getKey(), filter.getValue());
+      }
     }
     return builder;
   }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 32e31fc..dc1509d 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -94,6 +94,8 @@
     TEST_JSON_RESPONSE_WITH_DATATYPES = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response2.json"), Charsets.UTF_8).read();
 
     dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
+    makeEnhancedLiveConfig();
+    makeEnhancedMockConfig();
     makeLiveConfig();
     makeMockConfig();
   }
@@ -103,7 +105,7 @@
    * subject to the whims of the external site. Timeout is 10 seconds to allow
    * for real-world delays.
    */
-  private static void makeLiveConfig() {
+  private static void makeEnhancedLiveConfig() {
 
     HttpApiConfig sunriseConfig = HttpApiConfig.builder()
       .url("https://api.sunrise-sunset.org/json")
@@ -113,7 +115,7 @@
     HttpApiConfig sunriseWithParamsConfig = HttpApiConfig.builder()
       .url("https://api.sunrise-sunset.org/json")
       .method("GET")
-      .params(Arrays.asList("lat", "lng", "date"))
+      .params(Arrays.asList("tail.lat", "tail.lng", "tail.date"))
       .dataPath("results")
       .requireTail(false)
       .build();
@@ -140,7 +142,7 @@
       .method("post")
       .inputType("xml")
       .requireTail(false)
-      .params(Arrays.asList("type_of_data", "records_from", "max_records"))
+      .params(Arrays.asList("body.type_of_data", "body.records_from", "body.max_records","year"))
       .postParameterLocation("xml_body")
       .xmlOptions(nycXmlOptions)
       .build();
@@ -153,10 +155,10 @@
     configs.put("nyc", nycConfig);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-        new HttpStoragePluginConfig(false, configs, 10, 1000, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,
+        new HttpStoragePluginConfig(false, true, configs, 10, 1000, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,
           AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
-    cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
+    cluster.defineStoragePlugin("live2", mockStorageConfigWithWorkspace);
   }
 
   /**
@@ -164,7 +166,7 @@
    * testing. Timeout is short to allow for timeout testing. The mock server is
    * useful, but won't catch bugs related to real-world server glitches.
    */
-  private static void makeMockConfig() {
+  private static void makeEnhancedMockConfig() {
 
     Map<String, String> headers = new HashMap<>();
     headers.put("header1", "value1");
@@ -197,7 +199,7 @@
       .authType("basic")
       .userName("user")
       .password("pass")
-      .params(Arrays.asList("lat", "lng", "date"))
+      .params(Arrays.asList("tail.lat", "tail.lng", "tail.date"))
       .dataPath("results")
       .requireTail(false)
       .build();
@@ -214,7 +216,7 @@
       .method("POST")
       .headers(headers)
       .requireTail(false)
-      .params(Arrays.asList("lat", "lng", "date"))
+      .params(Arrays.asList("body.lat", "body.lng", "body.date"))
       .postBody("key1=value1\nkey2=value2")
       .postParameterLocation("post_body")
       .build();
@@ -224,7 +226,7 @@
       .method("POST")
       .headers(headers)
       .requireTail(false)
-      .params(Arrays.asList("lat", "lng", "date"))
+      .params(Arrays.asList("body.lat", "body.lng", "body.date"))
       .postParameterLocation("post_body")
       .build();
 
@@ -233,7 +235,7 @@
       .method("POST")
       .headers(headers)
       .requireTail(false)
-      .params(Arrays.asList("lat", "lng", "date"))
+      .params(Arrays.asList("body.lat", "body.lng", "body.date"))
       .postParameterLocation("json_body")
       .build();
 
@@ -242,7 +244,7 @@
       .method("POST")
       .headers(headers)
       .requireTail(false)
-      .params(Arrays.asList("lat", "lng", "date"))
+      .params(Arrays.asList("body.lat", "body.lng", "body.date", "header.header3"))
       .postParameterLocation("json_body")
       .postBody("key1=value1\nkey2=value2")
       .build();
@@ -350,7 +352,7 @@
       .url(makeUrl("http://localhost:%d/orgs/{org}/repos"))
       .method("GET")
       .headers(headers)
-      .params(Arrays.asList("lat", "lng", "date"))
+      .params(Arrays.asList("tail.lat", "tail.lng", "tail.date"))
       .dataPath("results")
       .requireTail(false)
       .build();
@@ -359,7 +361,7 @@
       .url(makeUrl("http://localhost:%d/orgs/{org}/repos"))
       .method("GET")
       .headers(headers)
-      .params(Arrays.asList("org", "lng", "date"))
+      .params(Arrays.asList("org", "tail.lng", "tail.date", "tail.org"))
       .dataPath("results")
       .requireTail(false)
       .build();
@@ -368,7 +370,7 @@
       .url(makeUrl("http://localhost:%d/orgs/{org}/repos?p1={p1}"))
       .method("GET")
       .headers(headers)
-      .params(Arrays.asList("p2", "p3"))
+      .params(Arrays.asList("tail.p2", "tail.p3"))
       .dataPath("results")
       .requireTail(false)
       .build();
@@ -405,11 +407,324 @@
     configs.put("malformedJson", mockJsonWithMalformedData);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-        new HttpStoragePluginConfig(false, configs, 2, 1000, "globaluser", "globalpass", "",
+        new HttpStoragePluginConfig(false, true, configs, 2, 1000, "globaluser", "globalpass", "",
           80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
           UsernamePasswordCredentials.USERNAME, "globaluser",
           UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("local2", mockStorageConfigWithWorkspace);
+  }
+
+  /**
+   * Create configs to test legacy request syntax.
+   */
+  private static void makeLiveConfig() {
+
+    HttpApiConfig sunriseConfig = HttpApiConfig.builder()
+        .url("https://api.sunrise-sunset.org/json")
+        .method("GET")
+        .build();
+
+    HttpApiConfig sunriseWithParamsConfig = HttpApiConfig.builder()
+        .url("https://api.sunrise-sunset.org/json")
+        .method("GET")
+        .params(Arrays.asList("lat", "lng", "date"))
+        .dataPath("results")
+        .requireTail(false)
+        .build();
+
+    HttpApiConfig stockConfig = HttpApiConfig.builder()
+        .url("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD" +
+            ".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4")
+        .method("get")
+        .build();
+
+    HttpApiConfig pokemonConfig = HttpApiConfig.builder()
+        .url("https://pokeapi.co/api/v2/pokemon/{pokemon_name}")
+        .method("get")
+        .inputType("json")
+        .requireTail(false)
+        .build();
+
+    HttpXmlOptions nycXmlOptions = HttpXmlOptions.builder()
+        .dataLevel(5)
+        .build();
+
+    HttpApiConfig nycConfig = HttpApiConfig.builder()
+        .url("https://www.checkbooknyc.com/api")
+        .method("post")
+        .inputType("xml")
+        .requireTail(false)
+        .params(Arrays.asList("type_of_data", "records_from", "max_records","year"))
+        .postParameterLocation("xml_body")
+        .xmlOptions(nycXmlOptions)
+        .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("stock", stockConfig);
+    configs.put("sunrise", sunriseConfig);
+    configs.put("sunrise2", sunriseWithParamsConfig);
+    configs.put("pokemon", pokemonConfig);
+    configs.put("nyc", nycConfig);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+        new HttpStoragePluginConfig(false, false, configs, 10, 1000, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,
+            AuthMode.SHARED_USER.name());
+    mockStorageConfigWithWorkspace.setEnabled(true);
+    cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
+  }
+
+  /**
+   * Create configs for an in-process mock server. Used for normal automated unit
+   * testing. Timeout is short to allow for timeout testing. The mock server is
+   * useful, but won't catch bugs related to real-world server glitches.
+   */
+  private static void makeMockConfig() {
+
+    Map<String, String> headers = new HashMap<>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    // Use the mock server with HTTP parameters passed as table name.
+    // The connection acts like a schema.
+    // Ignores the message body except for data.
+    HttpApiConfig mockSchema = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/json"))
+        .method("GET")
+        .headers(headers)
+        .authType("basic")
+        .credentialsProvider(new PlainCredentialsProvider(ImmutableMap.of(
+            UsernamePasswordCredentials.USERNAME, "user",
+            UsernamePasswordCredentials.PASSWORD, "pass")))
+        .dataPath("results")
+        .errorOn400(true)
+        .build();
+
+    // Use the mock server with the HTTP parameters passed as WHERE
+    // clause filters. The connection acts like a table.
+    // Ignores the message body except for data.
+    // This is the preferred approach, the base URL contains as much info as possible;
+    // all other parameters are specified in SQL. See README for an example.
+    HttpApiConfig mockTable = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/json"))
+        .method("GET")
+        .headers(headers)
+        .authType("basic")
+        .userName("user")
+        .password("pass")
+        .params(Arrays.asList("lat", "lng", "date"))
+        .dataPath("results")
+        .requireTail(false)
+        .build();
+
+    HttpApiConfig mockPostConfig = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/"))
+        .method("POST")
+        .headers(headers)
+        .postBody("key1=value1\nkey2=value2")
+        .build();
+
+    HttpApiConfig mockPostPushdownWithStaticParams = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/"))
+        .method("POST")
+        .headers(headers)
+        .requireTail(false)
+        .params(Arrays.asList("lat", "lng", "date"))
+        .postBody("key1=value1\nkey2=value2")
+        .postParameterLocation("post_body")
+        .build();
+
+    HttpApiConfig mockPostPushdown = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/"))
+        .method("POST")
+        .headers(headers)
+        .requireTail(false)
+        .params(Arrays.asList("lat", "lng", "date"))
+        .postParameterLocation("post_body")
+        .build();
+
+    HttpApiConfig mockJsonNullBodyPost = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/"))
+        .method("POST")
+        .headers(headers)
+        .requireTail(false)
+        .params(Arrays.asList("lat", "lng", "date"))
+        .postParameterLocation("json_body")
+        .build();
+
+    HttpApiConfig mockJsonPostConfig = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/"))
+        .method("POST")
+        .headers(headers)
+        .requireTail(false)
+        .params(Arrays.asList("lat", "lng", "date"))
+        .postParameterLocation("json_body")
+        .postBody("key1=value1\nkey2=value2")
+        .build();
+
+    HttpPaginatorConfig offsetPaginatorForJson = HttpPaginatorConfig.builder()
+        .limitParam("limit")
+        .offsetParam("offset")
+        .method("offset")
+        .pageSize(2)
+        .build();
+
+    HttpApiConfig mockJsonConfigWithPaginator = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/json"))
+        .method("get")
+        .headers(headers)
+        .requireTail(false)
+        .paginator(offsetPaginatorForJson)
+        .inputType("json")
+        .build();
+
+    HttpApiConfig mockJsonWithMalformedData = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/json"))
+        .method("get")
+        .requireTail(false)
+        .jsonOptions(new HttpJsonOptions.HttpJsonOptionsBuilder()
+            .skipMalformedRecords(true)
+            .build())
+        .inputType("json")
+        .build();
+
+
+    HttpApiConfig mockPostConfigWithoutPostBody = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/"))
+        .method("POST")
+        .authType("basic")
+        .headers(headers)
+        .build();
+
+    HttpApiConfig mockCsvConfig = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/csv"))
+        .method("GET")
+        .headers(headers)
+        .authType("basic")
+        .userName("user")
+        .password("pass")
+        .dataPath("results")
+        .inputType("csv")
+        .build();
+
+    HttpApiConfig mockCsvConfigWithPaginator = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/csv"))
+        .method("get")
+        .paginator(offsetPaginatorForJson)
+        .inputType("csv")
+        .requireTail(false)
+        .dataPath("results")
+        .build();
+
+    HttpXmlOptions xmlOptions = new HttpXmlOptions.HttpXmlOptionsBuilder()
+        .dataLevel(2)
+        .build();
+
+    TupleMetadata testSchema = new SchemaBuilder()
+        .add("attributes", MinorType.MAP)
+        .addNullable("COMMON", MinorType.VARCHAR)
+        .addNullable("BOTANICAL", MinorType.VARCHAR)
+        .addNullable("ZONE", MinorType.INT)
+        .addNullable("LIGHT", MinorType.VARCHAR)
+        .addNullable("PRICE", MinorType.VARCHAR)
+        .addNullable("AVAILABILITY", MinorType.VARCHAR)
+        .buildSchema();
+
+    HttpXmlOptions xmlOptionsWithSchhema = new HttpXmlOptions.HttpXmlOptionsBuilder()
+        .dataLevel(2)
+        .schema(testSchema)
+        .build();
+
+
+    HttpApiConfig mockXmlConfig = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/xml"))
+        .method("GET")
+        .headers(headers)
+        .authType("basic")
+        .userName("user")
+        .password("pass")
+        .dataPath("results")
+        .inputType("xml")
+        .xmlOptions(xmlOptions)
+        .build();
+
+    HttpApiConfig mockXmlConfigWithSchema = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/xml"))
+        .method("GET")
+        .headers(headers)
+        .authType("basic")
+        .userName("user")
+        .password("pass")
+        .dataPath("results")
+        .inputType("xml")
+        .xmlOptions(xmlOptionsWithSchhema)
+        .build();
+
+
+    HttpApiConfig mockGithubWithParam = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/orgs/{org}/repos"))
+        .method("GET")
+        .headers(headers)
+        .params(Arrays.asList("lat", "lng", "date"))
+        .dataPath("results")
+        .requireTail(false)
+        .build();
+
+    HttpApiConfig mockGithubWithDuplicateParam = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/orgs/{org}/repos"))
+        .method("GET")
+        .headers(headers)
+        .params(Arrays.asList("org", "lng", "date"))
+        .dataPath("results")
+        .requireTail(false)
+        .build();
+
+    HttpApiConfig mockGithubWithParamInQuery = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/orgs/{org}/repos?p1={p1}"))
+        .method("GET")
+        .headers(headers)
+        .params(Arrays.asList("p2", "p3"))
+        .dataPath("results")
+        .requireTail(false)
+        .build();
+
+    HttpApiConfig mockTableWithJsonOptions = HttpApiConfig.builder()
+        .url(makeUrl("http://localhost:%d/json"))
+        .method("GET")
+        .headers(headers)
+        .requireTail(false)
+        .jsonOptions(HttpJsonOptions.builder()
+            .allTextMode(true)
+            .build()
+        )
+        .build();
+
+    Map<String, HttpApiConfig> configs = new HashMap<>();
+    configs.put("csv_paginator", mockCsvConfigWithPaginator);
+    configs.put("json_paginator", mockJsonConfigWithPaginator);
+    configs.put("sunrise", mockSchema);
+    configs.put("mocktable", mockTable);
+    configs.put("mockpost", mockPostConfig);
+    configs.put("nullPost", mockPostConfigWithoutPostBody);
+    configs.put("mockJsonPost", mockJsonPostConfig);
+    configs.put("mockJsonNullBodyPost", mockJsonNullBodyPost);
+    configs.put("mockPostPushdown", mockPostPushdown);
+    configs.put("mockPostPushdownWithStaticParams", mockPostPushdownWithStaticParams);
+    configs.put("mockcsv", mockCsvConfig);
+    configs.put("mockxml", mockXmlConfig);
+    configs.put("mockxml_with_schema", mockXmlConfigWithSchema);
+    configs.put("github", mockGithubWithParam);
+    configs.put("github2", mockGithubWithDuplicateParam);
+    configs.put("github3", mockGithubWithParamInQuery);
+    configs.put("mockJsonAllText", mockTableWithJsonOptions);
+    configs.put("malformedJson", mockJsonWithMalformedData);
+
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace =
+        new HttpStoragePluginConfig(false, false, configs, 2, 1000, "globaluser", "globalpass", "",
+            80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
+            UsernamePasswordCredentials.USERNAME, "globaluser",
+            UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
+    mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
   }
 
@@ -430,6 +745,9 @@
         .addRow("live", "http") // For table-like connections
         .addRow("live.stock", "http")
         .addRow("live.sunrise", "http")
+        .addRow("live2", "http")
+        .addRow("live2.stock", "http")
+        .addRow("live2.sunrise", "http")
         .addRow("local", "http")
         .addRow("local.mockcsv", "http")
         .addRow("local.mockpost", "http")
@@ -437,6 +755,13 @@
         .addRow("local.mockxml_with_schema", "http")
         .addRow("local.nullpost", "http")
         .addRow("local.sunrise", "http")
+        .addRow("local2", "http")
+        .addRow("local2.mockcsv", "http")
+        .addRow("local2.mockpost", "http")
+        .addRow("local2.mockxml", "http")
+        .addRow("local2.mockxml_with_schema", "http")
+        .addRow("local2.nullpost", "http")
+        .addRow("local2.sunrise", "http")
         .build();
 
     RowSetUtilities.verify(expected, results);
@@ -501,6 +826,36 @@
     RowSetUtilities.verify(expected, results);
   }
 
+  @Test
+  @Ignore("Requires Remote Server")
+  public void simpleStarQueryWithEnhancedMode() throws Exception {
+    String sql = "SELECT * FROM live2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addMap("results")
+        .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .resumeSchema()
+        .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(mapValue("6:12:17 AM", "6:01:54 PM", "12:07:06 PM", "11:49:37",
+            "5:47:49 AM", "6:26:22 PM", "5:17:51 AM", "6:56:21 PM", "4:47:41 AM", "7:26:31 PM"), "OK")
+        .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
   /**
    * As above, but we return only the contents of {@code results}, and use
    * filter push-down for the arguments.
@@ -509,10 +864,10 @@
    */
   @Test
   @Ignore("Requires Remote Server")
-  public void wildcardQueryWithParams() throws Exception {
+  public void wildcardQueryWithEnhancedParamSyntax() throws Exception {
     String sql =
-        "SELECT * FROM live.sunrise2\n" +
-        "WHERE `lat`=36.7201600 AND `lng`=-4.4203400 AND `date`='2019-10-02'";
+        "SELECT * FROM live2.sunrise2\n" +
+        "WHERE `tail.lat`=36.7201600 AND `tail.lng`=-4.4203400 AND `tail.date`='2019-10-02'";
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
 
@@ -539,6 +894,36 @@
 
   @Test
   @Ignore("Requires Remote Server")
+  public void wildcardQueryWithParams() throws Exception {
+    String sql =
+        "SELECT * FROM live.sunrise2\n" +
+            "WHERE `lat`=36.7201600 AND `lng`=-4.4203400 AND `date`='2019-10-02'";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("6:12:17 AM", "6:01:54 PM", "12:07:06 PM", "11:49:37", "5:47:49 AM",
+            "6:26:22 PM", "5:17:51 AM", "6:56:21 PM", "4:47:41 AM", "7:26:31 PM")
+        .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  @Ignore("Requires Remote Server")
   public void simpleSpecificQuery() throws Exception {
     String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset\n" +
                  "FROM live.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
@@ -547,19 +932,37 @@
 
   @Test
   @Ignore("Requires Remote Server")
-  public void simpleSpecificQueryWithParams() throws Exception {
-    String sql =
-        "SELECT sunrise, sunset\n" +
-        "FROM live.sunrise2\n" +
-        "WHERE `lat`=36.7201600 AND `lng`=-4.4203400 AND `date`='2019-10-02'";
+  public void simpleSpecificQueryWithEnhancedMode() throws Exception {
+    String sql = "SELECT t1.results.sunrise AS sunrise, t1.results.sunset AS sunset\n" +
+        "FROM live2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
     doSimpleSpecificQuery(sql);
   }
 
   @Test
   @Ignore("Requires Remote Server")
-  public void simpleStarQueryWithXMLParams() throws Exception {
+  public void simpleSpecificQueryWithEnhancedParamSyntax() throws Exception {
+    String sql =
+        "SELECT sunrise, sunset\n" +
+        "FROM live2.sunrise2\n" +
+        "WHERE `tail.lat`=36.7201600 AND `tail.lng`=-4.4203400 AND `tail.date`='2019-10-02'";
+    doSimpleSpecificQuery(sql);
+  }
+
+  @Test
+  @Ignore("Requires Remote Server")
+  public void simpleSpecificQueryWithParams() throws Exception {
+    String sql =
+        "SELECT sunrise, sunset\n" +
+            "FROM live.sunrise2\n" +
+            "WHERE `lat`=36.7201600 AND `lng`=-4.4203400 AND `date`='2019-10-02'";
+    doSimpleSpecificQuery(sql);
+  }
+
+  @Test
+  @Ignore("Requires Remote Server")
+  public void simpleStarQueryWithXMLEnhancedParamSyntax() throws Exception {
     String sql = "SELECT year, department, expense_category, budget_code, budget_name, modified, adopted " +
-      "FROM live.nyc WHERE type_of_data='Budget' AND records_from=1 AND max_records=5 AND year IS NOT null";
+      "FROM live2.nyc WHERE `body.type_of_data`='Budget' AND `body.records_from`=1 AND `body.max_records`=5 AND year IS NOT null";
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
 
@@ -583,6 +986,33 @@
     RowSetUtilities.verify(expected, results);
   }
 
+  @Test
+  @Ignore("Requires Remote Server")
+  public void simpleStarQueryWithXMLParams() throws Exception {
+    String sql = "SELECT year, department, expense_category, budget_code, budget_name, modified, adopted " +
+        "FROM live.nyc WHERE `type_of_data`='Budget' AND `records_from`=1 AND `max_records`=5 AND year IS NOT null";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("year", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("department", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("expense_category", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("budget_code", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("budget_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("modified", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .add("adopted", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("2022", "MEDICAL ASSISTANCE - OTPS", "MEDICAL ASSISTANCE", "9564", "MMIS MEDICAL ASSISTANCE", "5972433142", "5584533142")
+        .addRow("2020", "MEDICAL ASSISTANCE - OTPS", "MEDICAL ASSISTANCE", "9564", "MMIS MEDICAL ASSISTANCE", "5819588142", "4953233142")
+        .addRow("2014", "MEDICAL ASSISTANCE - OTPS", "MEDICAL ASSISTANCE", "9564", "MMIS MEDICAL ASSISTANCE", "5708101276", "5231324567")
+        .addRow("2015", "MEDICAL ASSISTANCE - OTPS", "MEDICAL ASSISTANCE", "9564", "MMIS MEDICAL ASSISTANCE", "5663673673", "5312507361")
+        .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
 
   private void doSimpleSpecificQuery(String sql) throws Exception {
 
@@ -611,6 +1041,16 @@
   }
 
   @Test
+  @Ignore("Requires Remote Server")
+  public void liveTestWithURLParametersWithEnhancedMode() throws Exception {
+    String sql = "SELECT * FROM live2.pokemon WHERE pokemon_name = 'ditto'";
+    client.testBuilder()
+        .sqlQuery(sql)
+        .expectsNumRecords(1)
+        .go();
+  }
+
+  @Test
   public void simpleTestWithJsonConfig() {
     String sql = "SELECT * FROM local.mockJsonAllText";
 
@@ -636,6 +1076,31 @@
   }
 
   @Test
+  public void simpleTestWithJsonConfigWithEnhancedMode() {
+    String sql = "SELECT * FROM local2.mockJsonAllText";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE_WITH_DATATYPES));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("col_1", MinorType.VARCHAR, DataMode.OPTIONAL)
+          .add("col_2", MinorType.VARCHAR, DataMode.OPTIONAL)
+          .add("col_3", MinorType.VARCHAR, DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow("1.0", "2", "3.0")
+          .addRow("4.0", "5", "6.0")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  @Test
   public void simpleTestWithMalformedJson() {
     String sql = "SELECT * FROM local.malformedJson";
 
@@ -660,6 +1125,30 @@
   }
 
   @Test
+  public void simpleTestWithMalformedJsonWithEnhancedMode() {
+    String sql = "SELECT * FROM local2.malformedJson";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_MALFORMED_JSON_RESPONSE));
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addNullable("a", MinorType.BIGINT)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(1)
+          .addRow(5)
+          .addRow(6)
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    } catch (Exception e) {
+      fail();
+    }
+  }
+
+  @Test
   public void simpleTestWithMockServerWithURLParams() throws Exception {
     String sql = "SELECT _response_url FROM local.github\n" +
         "WHERE `org` = 'apache'";
@@ -686,6 +1175,32 @@
   }
 
   @Test
+  public void simpleTestWithMockServerWithURLParamsWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_url FROM local2.github\n" +
+        "WHERE `org` = 'apache'";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(makeUrl("http://localhost:%d/orgs/apache/repos"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void simpleTestWithMockServerWithURLParamsOfBooleanType() throws Exception {
     String sql = "SELECT _response_url FROM local.github\n" +
       "WHERE `org` = true";
@@ -712,6 +1227,32 @@
   }
 
   @Test
+  public void simpleTestWithMockServerWithURLParamsOfBooleanTypeWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_url FROM local2.github\n" +
+        "WHERE `org` = true";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(makeUrl("http://localhost:%d/orgs/true/repos"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void simpleTestWithMockServerWithURLParamsOfIntType() throws Exception {
     String sql = "SELECT _response_url FROM local.github\n" +
       "WHERE `org` = 1234";
@@ -738,6 +1279,32 @@
   }
 
   @Test
+  public void simpleTestWithMockServerWithURLParamsOfIntTypeWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_url FROM local2.github\n" +
+        "WHERE `org` = 1234";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(makeUrl("http://localhost:%d/orgs/1234/repos"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   @Ignore("Requires Remote Server")
   public void simpleTestWithUrlParamsInSubquery() throws Exception {
     String sql = "select pokemon_data.data.game_index AS game_index, pokemon_data.data.version.name AS name " +
@@ -763,9 +1330,34 @@
   }
 
   @Test
-  public void simpleTestWithMockServerWithDuplicateURLParams() throws Exception {
-    String sql = "SELECT _response_url FROM local.github2\n" +
-      "WHERE `org` = 'apache'";
+  @Ignore("Requires Remote Server")
+  public void simpleTestWithUrlParamsInSubqueryWithEnhancedMode() throws Exception {
+    String sql = "select pokemon_data.data.game_index AS game_index, pokemon_data.data.version.name AS name " +
+        "from (select flatten(game_indices) as data " +
+        "from live2.pokemon " +
+        "where pokemon_name='ditto' " +
+        ") as pokemon_data WHERE pokemon_data.data.game_index=76";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add("game_index", MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL)
+        .add("name", MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow(76, "red")
+        .addRow(76, "blue")
+        .addRow(76, "yellow")
+        .build();
+
+    RowSetUtilities.verify(expected, results);
+  }
+
+  @Test
+  public void simpleTestWithMockServerWithDuplicateURLParamsWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_url FROM local2.github2\n" +
+      "WHERE `org` = 'apache' and `tail.org` = 'apache'";
 
     try (MockWebServer server = startServer()) {
       server.enqueue(
@@ -789,9 +1381,35 @@
   }
 
   @Test
-  public void testUrlParamsInQueryString() throws Exception {
-    String sql = "SELECT _response_url FROM local.github3\n" +
-      "WHERE `org` = 'apache' AND p1='param1' AND p2='param2'";
+  public void simpleTestWithMockServerWithDuplicateURLParams() throws Exception {
+    String sql = "SELECT _response_url FROM local.github2\n" +
+        "WHERE `org` = 'apache'";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(makeUrl("http://localhost:%d/orgs/apache/repos?org=apache"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
+  public void testUrlParamsInQueryStringWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_url FROM local2.github3\n" +
+      "WHERE `org` = 'apache' AND p1='param1' AND `tail.p2`='param2'";
 
     try (MockWebServer server = startServer()) {
       server.enqueue(
@@ -814,6 +1432,32 @@
     }
   }
 
+  @Test
+  public void testUrlParamsInQueryString() throws Exception {
+    String sql = "SELECT _response_url FROM local.github3\n" +
+        "WHERE `org` = 'apache' AND p1='param1' AND `p2`='param2'";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(makeUrl("http://localhost:%d/orgs/apache/repos?p1=param1&p2=param2"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
   /**
    * When the user has configured an API connection with URL parameters,
    * it is mandatory that those parameters are included in the WHERE clause. Drill
@@ -838,6 +1482,23 @@
   }
 
   @Test
+  public void testUrlParamErrorWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_url FROM local2.github\n";
+
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+      run(sql);
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("API Query with URL Parameters must be populated."));
+    }
+  }
+
+  @Test
   public void testSerDeXML() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -854,6 +1515,22 @@
   }
 
   @Test
+  public void testSerDeXMLWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody(TEST_XML_RESPONSE)
+      );
+
+      String sql = "SELECT COUNT(*) FROM local2.mockxml.`xml?arg1=4` ";
+      String plan = queryBuilder().sql(sql).explainJson();
+      long cnt = queryBuilder().physical(plan).singletonLong();
+      assertEquals("Counts should match", 36L, cnt);
+    }
+  }
+
+  @Test
    public void testSerDeCSV() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -870,6 +1547,22 @@
   }
 
   @Test
+  public void testSerDeCSVWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody(TEST_CSV_RESPONSE)
+      );
+
+      String sql = "SELECT COUNT(*) FROM local2.mockcsv.`csv?arg1=4` ";
+      String plan = queryBuilder().sql(sql).explainJson();
+      long cnt = queryBuilder().physical(plan).singletonLong();
+      assertEquals("Counts should match", 2L, cnt);
+    }
+  }
+
+  @Test
   public void testSerDe() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -886,6 +1579,22 @@
   }
 
   @Test
+  public void testSerDeWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT COUNT(*) FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      String plan = queryBuilder().sql(sql).explainJson();
+      long cnt = queryBuilder().physical(plan).singletonLong();
+      assertEquals("Counts should match", 1L, cnt);
+    }
+  }
+
+  @Test
   public void testApiConfigRequiresTailSerDe() throws Exception {
     String sql = "SELECT * FROM local.mocktable";
 
@@ -897,15 +1606,39 @@
   }
 
   @Test
+  public void testApiConfigRequiresTailSerDeWithEnhancedMode() throws Exception {
+    String sql = "SELECT * FROM local2.mocktable";
+
+    queryBuilder()
+        .sql(sql)
+        .detailedPlanMatcher()
+        .include("requireTail=false")
+        .match();
+  }
+
+  @Test
   public void simpleTestWithMockServer() throws Exception {
     String sql = "SELECT * FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
     doSimpleTestWithMockServer(sql);
   }
 
   @Test
+  public void simpleTestWithMockServerWithEnhancedMode() throws Exception {
+    String sql = "SELECT * FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+    doSimpleTestWithMockServer(sql);
+  }
+
+  @Test
+  public void simpleTestWithMockServerWithEnhancedParamSyntax() throws Exception {
+    String sql = "SELECT * FROM local2.mocktable\n" +
+                 "WHERE `tail.lat` = 36.7201600 AND `tail.lng` = -4.4203400 AND `tail.date` = '2019-10-02'";
+    doSimpleTestWithMockServer(sql);
+  }
+
+  @Test
   public void simpleTestWithMockServerWithParams() throws Exception {
     String sql = "SELECT * FROM local.mocktable\n" +
-                 "WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'";
+        "WHERE `lat` = 36.7201600 AND `lng` = -4.4203400 AND `date` = '2019-10-02'";
     doSimpleTestWithMockServer(sql);
   }
 
@@ -939,6 +1672,35 @@
   }
 
   @Test
+  public void testCsvResponseWithEnhancedMode() throws Exception {
+    String sql = "SELECT * FROM local2.mockcsv.`csv?arg1=4`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE));
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("col1", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("col2", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("col3", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow("1", "2", "3")
+          .addRow("4", "5", "6")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      // Verify correct username/password from endpoint configuration
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertNotNull(recordedRequest.getHeader("Authorization"));
+      assertEquals("Basic dXNlcjpwYXNz", recordedRequest.getHeader("Authorization"));
+    }
+  }
+
+  @Test
   public void testXmlResponse() throws Exception {
     String sql = "SELECT * FROM local.mockxml.`?arg1=4` LIMIT 5";
     try (MockWebServer server = startServer()) {
@@ -970,6 +1732,37 @@
   }
 
   @Test
+  public void testXmlResponseWithEnhancedMode() throws Exception {
+    String sql = "SELECT * FROM local2.mockxml.`?arg1=4` LIMIT 5";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_RESPONSE));
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("attributes", MinorType.MAP)
+          .addNullable("COMMON", MinorType.VARCHAR)
+          .addNullable("BOTANICAL", MinorType.VARCHAR)
+          .addNullable("ZONE", MinorType.VARCHAR)
+          .addNullable("LIGHT", MinorType.VARCHAR)
+          .addNullable("PRICE", MinorType.VARCHAR)
+          .addNullable("AVAILABILITY", MinorType.VARCHAR)
+          .buildSchema();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapArray(), "Bloodroot", "Sanguinaria canadensis", "4", "Mostly Shady", "$2.44", "031599")
+          .addRow(mapArray(),"Columbine", "Aquilegia canadensis", "3", "Mostly Shady", "$9.37", "030699")
+          .addRow(mapArray(),"Marsh Marigold", "Caltha palustris", "4", "Mostly Sunny", "$6.81", "051799")
+          .addRow(mapArray(), "Cowslip", "Caltha palustris", "4", "Mostly Shady", "$9.90", "030699")
+          .addRow(mapArray(), "Dutchman's-Breeches", "Dicentra cucullaria", "3", "Mostly Shady", "$6.44", "012099")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void testXmlWithSchemaResponse() throws Exception {
     String sql = "SELECT * FROM local.mockxml_with_schema.`?arg1=4` LIMIT 5";
     try (MockWebServer server = startServer()) {
@@ -1001,6 +1794,37 @@
   }
 
   @Test
+  public void testXmlWithSchemaResponseWithEnhancedMode() throws Exception {
+    String sql = "SELECT * FROM local2.mockxml_with_schema.`?arg1=4` LIMIT 5";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_RESPONSE));
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("attributes", MinorType.MAP)
+          .addNullable("COMMON", MinorType.VARCHAR)
+          .addNullable("BOTANICAL", MinorType.VARCHAR)
+          .addNullable("ZONE", MinorType.INT)
+          .addNullable("LIGHT", MinorType.VARCHAR)
+          .addNullable("PRICE", MinorType.VARCHAR)
+          .addNullable("AVAILABILITY", MinorType.VARCHAR)
+          .buildSchema();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapArray(), "Bloodroot", "Sanguinaria canadensis", 4, "Mostly Shady", "$2.44", "031599")
+          .addRow(mapArray(),"Columbine", "Aquilegia canadensis", 3, "Mostly Shady", "$9.37", "030699")
+          .addRow(mapArray(),"Marsh Marigold", "Caltha palustris", 4, "Mostly Sunny", "$6.81", "051799")
+          .addRow(mapArray(), "Cowslip", "Caltha palustris", 4, "Mostly Shady", "$9.90", "030699")
+          .addRow(mapArray(), "Dutchman's-Breeches", "Dicentra cucullaria", 3, "Mostly Shady", "$6.44", "012099")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void testImplicitFieldsWithJSON() throws Exception {
     String sql = "SELECT _response_code, _response_message, _response_protocol, _response_url FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
     try (MockWebServer server = startServer()) {
@@ -1024,6 +1848,29 @@
   }
 
   @Test
+  public void testImplicitFieldsWithJSONWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_code, _response_message, _response_protocol, _response_url FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_protocol", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void testImplicitFieldsWithCSV() throws Exception {
     String sql = "SELECT _response_code, _response_message, _response_protocol, _response_url FROM local.mockcsv.`csv?arg1=4`";
     try (MockWebServer server = startServer()) {
@@ -1048,6 +1895,30 @@
   }
 
   @Test
+  public void testImplicitFieldsWithCSVWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_code, _response_message, _response_protocol, _response_url FROM local2.mockcsv.`csv?arg1=4`";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE));
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_protocol", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/csvcsv?arg1=4"))
+          .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/csvcsv?arg1=4"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void testImplicitFieldsWithXML() throws Exception {
     String sql = "SELECT _response_code, _response_message, _response_protocol, _response_url FROM local.mockxml.`?arg1=4` LIMIT 5";
     try (MockWebServer server = startServer()) {
@@ -1074,6 +1945,33 @@
     }
   }
 
+  @Test
+  public void testImplicitFieldsWithXMLWithEnhancedMode() throws Exception {
+    String sql = "SELECT _response_code, _response_message, _response_protocol, _response_url FROM local2.mockxml.`?arg1=4` LIMIT 5";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_XML_RESPONSE));
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_protocol", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+          .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+          .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+          .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+          .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
   private void doSimpleTestWithMockServer(String sql) throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1149,6 +2047,49 @@
   }
 
   @Test
+  public void testPostWithMockServerAndNullPostbodyWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local2.nullPost\n.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .resumeSchema()
+          .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      // Verify correct username/password from global configuration
+      assertNotNull(recordedRequest.getHeader("Authorization"));
+      assertEquals("Basic Z2xvYmFsdXNlcjpnbG9iYWxwYXNz", recordedRequest.getHeader("Authorization"));
+    }
+  }
+
+  @Test
   public void testPostWithMockServer() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1191,6 +2132,48 @@
   }
 
   @Test
+  public void testPostWithMockServerWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local2.mockPost.`json?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .resumeSchema()
+          .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      assertEquals(recordedRequest.getHeader("header1"), "value1");
+      assertEquals(recordedRequest.getHeader("header2"), "value2");
+    }
+  }
+
+  @Test
   public void specificTestWithMockServer() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1217,6 +2200,32 @@
   }
 
   @Test
+  public void specificTestWithMockServerWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT sunrise, sunset FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .buildSchema();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow("6:13:58 AM", "5:59:55 PM")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void testLimitPushdown() throws Exception {
     String sql = "SELECT sunrise, sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1 LIMIT 5";
 
@@ -1228,17 +2237,39 @@
   }
 
   @Test
-  public void testLimitPushdownWithFilter() throws Exception {
-    String sql = "SELECT sunrise, sunset FROM live.sunrise2 WHERE `date`='2019-10-02' LIMIT 5";
+  public void testLimitPushdownWithEnhancedMode() throws Exception {
+    String sql = "SELECT sunrise, sunset FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1 LIMIT 5";
+
+    queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("Limit", "maxRecords=5")
+        .match();
+  }
+
+  @Test
+  public void testLimitPushdownWithFilterWithEnhancedMode() throws Exception {
+    String sql = "SELECT sunrise, sunset FROM live2.sunrise2 WHERE `tail.date`='2019-10-02' LIMIT 5";
 
     queryBuilder()
       .sql(sql)
       .planMatcher()
-      .include("Limit", "maxRecords=5", "filters=\\{date=2019-10-02\\}")
+      .include("Limit", "maxRecords=5", "filters=\\{tail.date=2019-10-02\\}")
       .match();
   }
 
   @Test
+  public void testLimitPushdownWithFilter() throws Exception {
+    String sql = "SELECT sunrise, sunset FROM live.sunrise2 WHERE `date`='2019-10-02' LIMIT 5";
+
+    queryBuilder()
+        .sql(sql)
+        .planMatcher()
+        .include("Limit", "maxRecords=5", "filters=\\{date=2019-10-02\\}")
+        .match();
+  }
+
+  @Test
   public void testSlowResponse() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1261,6 +2292,28 @@
   }
 
   @Test
+  public void testSlowResponseWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+              .setBodyDelay(6, TimeUnit.SECONDS)
+      );
+
+      String sql = "SELECT sunrise AS sunrise, sunset AS sunset FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1";
+
+      try {
+        client.queryBuilder().sql(sql).rowSet();
+        fail();
+      } catch (Exception e) {
+        assertTrue("Not timeout exception, " + e,
+            e.getMessage().contains("DATA_READ ERROR: timeout") || e.getMessage().contains("DATA_READ ERROR: Read timed out"));
+      }
+    }
+  }
+
+  @Test
   public void testZeroByteResponse() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1277,6 +2330,22 @@
   }
 
   @Test
+  public void testZeroByteResponseWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody("")
+      );
+
+      String sql = "SELECT * FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertNull(results);
+    }
+  }
+
+  @Test
   public void testZeroByteResponseFromCSV() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1292,6 +2361,22 @@
     }
   }
 
+  @Test
+  public void testZeroByteResponseFromCSVWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody("")
+      );
+
+      String sql = "SELECT * FROM local2.mockcsv.`csv?arg1=4`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertNull(results);
+    }
+  }
+
   // The connection expects a response object of the form
   // { results: { ... } }, but there is no such object, which
   // is treated as a null (no data, no schema) result set.
@@ -1312,6 +2397,22 @@
   }
 
   @Test
+  public void testEmptyJSONObjectResponseWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody("{}")
+      );
+
+      String sql = "SELECT * FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertNull(results);
+    }
+  }
+
+  @Test
   public void testNullContent() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1327,6 +2428,22 @@
     }
   }
 
+  @Test
+  public void testNullContentWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody("{results: null}")
+      );
+
+      String sql = "SELECT * FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+      assertNull(results);
+    }
+  }
+
   // Note that, in this test, the response is not empty. Instead, the
   // response has a single row with no columns.
   @Test
@@ -1354,6 +2471,30 @@
   }
 
   @Test
+  public void testEmptyContentWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody("{results: {} }")
+      );
+
+      String sql = "SELECT * FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .buildSchema();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow()
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void testErrorResponse() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1379,6 +2520,31 @@
   }
 
   @Test
+  public void testErrorResponseWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(404)
+              .setBody("{}")
+      );
+
+      String sql = "SELECT * FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+
+      try {
+        client.queryBuilder().sql(sql).rowSet();
+        fail();
+      } catch (Exception e) {
+        String msg = e.getMessage();
+        assertTrue(msg.contains("DATA_READ ERROR: HTTP request failed"));
+        assertTrue(msg.contains("Response code: 404"));
+        assertTrue(msg.contains("Response message: Client Error"));
+        assertTrue(msg.contains("Connection: sunrise"));
+        assertTrue(msg.contains("Plugin: local"));
+      }
+    }
+  }
+
+  @Test
   public void testNoErrorOn404() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1407,6 +2573,34 @@
   }
 
   @Test
+  public void testNoErrorOn404WithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(404)
+              .setBody("{}")
+      );
+
+      String sql = "SELECT _response_code, _response_message, _response_protocol, _response_url FROM local2.mocktable";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("_response_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_protocol", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("_response_url", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(404, "Client Error", "http/1.1", makeUrl("http://localhost:%d/json"))
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+    }
+  }
+
+  @Test
   public void testHeaders() throws Exception {
     try (MockWebServer server = startServer()) {
 
@@ -1446,6 +2640,45 @@
   }
 
   @Test
+  public void testHeadersWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(
+          new MockResponse().setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local2.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM",
+              "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest request = server.takeRequest();
+      assertEquals("value1", request.getHeader("header1"));
+      assertEquals("value2", request.getHeader("header2"));
+      assertEquals("Basic dXNlcjpwYXNz", request.getHeader("Authorization"));
+    }
+  }
+
+  @Test
   public void testJsonPostWithMockServer() throws Exception {
     try (MockWebServer server = startServer()) {
       server.enqueue(
@@ -1487,7 +2720,48 @@
   }
 
   @Test
-  public void testJsonPostWithFiltersAndMockServer() throws Exception {
+  public void testJsonPostWithMockServerWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local2.mockJsonPost";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .resumeSchema()
+          .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      String resultJsonString = recordedRequest.getBody().toString();
+      assertEquals("[text={\"key1\":\"value1\",\"key2\":\"value2\"}]", resultJsonString);
+    }
+  }
+
+  @Test
+  public void testJsonPostWithFiltersAndMockServerWithEnhancedMode() throws Exception {
     try (MockWebServer server = startServer()) {
       server.enqueue(
         new MockResponse()
@@ -1495,7 +2769,7 @@
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT * FROM local.mockJsonPost WHERE lat=36.7201600 AND lng=-4.4203400";
+      String sql = "SELECT * FROM local2.mockJsonPost WHERE `body.lat`=36.7201600 AND `body.lng`=-4.4203400";
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
       TupleMetadata expectedSchema = new SchemaBuilder()
@@ -1528,7 +2802,92 @@
   }
 
   @Test
-  public void testJsonPostWithFiltersAndNullPostBodyMockServer() throws Exception {
+  public void testJsonPostWithFiltersAndMockServer() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local.mockJsonPost WHERE `lat`=36.7201600 AND `lng`=-4.4203400";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .resumeSchema()
+          .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      String resultJsonString = recordedRequest.getBody().toString();
+      assertEquals("[size=71 text={\"key1\":\"value1\",\"key2\":\"value2\",\"lng\":\"-4.4203400\",\"lat\":\"36.72…]", resultJsonString);
+    }
+  }
+
+  @Test
+  public void testJsonPostAndHeadersWithFiltersAndMockServerWithEnhancedMode() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local2.mockJsonPost WHERE `body.lat`=36.7201600 AND `body.lng`=-4.4203400 AND `header.header3`='value3'";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .resumeSchema()
+          .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      assertEquals("value1", recordedRequest.getHeader("header1"));
+      assertEquals("value2", recordedRequest.getHeader("header2"));
+      assertEquals("value3", recordedRequest.getHeader("header3"));
+      String resultJsonString = recordedRequest.getBody().toString();
+      assertEquals("[size=71 text={\"key1\":\"value1\",\"key2\":\"value2\",\"lng\":\"-4.4203400\",\"lat\":\"36.72…]", resultJsonString);
+    }
+  }
+
+  @Test
+  public void testJsonPostWithFiltersAndNullPostBodyMockServerWithEnhancedMode() throws Exception {
     try (MockWebServer server = startServer()) {
       server.enqueue(
         new MockResponse()
@@ -1536,7 +2895,7 @@
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT * FROM local.mockJsonNullBodyPost WHERE lat=36.7201600 AND lng=-4.4203400";
+      String sql = "SELECT * FROM local2.mockJsonNullBodyPost WHERE `body.lat`=36.7201600 AND `body.lng`=-4.4203400";
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
       TupleMetadata expectedSchema = new SchemaBuilder()
@@ -1569,7 +2928,48 @@
   }
 
   @Test
-  public void testParamsInPostBody() throws Exception {
+  public void testJsonPostWithFiltersAndNullPostBodyMockServer() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local.mockJsonNullBodyPost WHERE `lat`=36.7201600 AND `lng`=-4.4203400";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .resumeSchema()
+          .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      String resultJsonString = recordedRequest.getBody().toString();
+      assertEquals("[text={\"lng\":\"-4.4203400\",\"lat\":\"36.7201600\"}]", resultJsonString);
+    }
+  }
+
+  @Test
+  public void testParamsInPostBodyWithEnhancedMode() throws Exception {
     try (MockWebServer server = startServer()) {
       server.enqueue(
         new MockResponse()
@@ -1577,7 +2977,7 @@
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT * FROM local.mockPostPushdown WHERE lat=36.7201600 AND lng=-4.4203400";
+      String sql = "SELECT * FROM local2.mockPostPushdown WHERE `body.lat`=36.7201600 AND `body.lng`=-4.4203400";
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
       TupleMetadata expectedSchema = new SchemaBuilder()
@@ -1610,7 +3010,48 @@
   }
 
   @Test
-  public void testParamsInPostBodyAndStaticParams() throws Exception {
+  public void testParamsInPostBody() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local.mockPostPushdown WHERE `lat`=36.7201600 AND `lng`=-4.4203400";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .resumeSchema()
+          .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      String resultJsonString = recordedRequest.getBody().toString();
+      assertEquals("[text=lng=-4.4203400&lat=36.7201600]", resultJsonString);
+    }
+  }
+
+  @Test
+  public void testParamsInPostBodyAndStaticParamsWithEnhancedMode() throws Exception {
     try (MockWebServer server = startServer()) {
       server.enqueue(
         new MockResponse()
@@ -1618,7 +3059,7 @@
           .setBody(TEST_JSON_RESPONSE)
       );
 
-      String sql = "SELECT * FROM local.mockPostPushdownWithStaticParams WHERE lat=36.7201600 AND lng=-4.4203400";
+      String sql = "SELECT * FROM local2.mockPostPushdownWithStaticParams WHERE `body.lat`=36.7201600 AND `body.lng`=-4.4203400";
       RowSet results = client.queryBuilder().sql(sql).rowSet();
 
       TupleMetadata expectedSchema = new SchemaBuilder()
@@ -1650,6 +3091,47 @@
     }
   }
 
+  @Test
+  public void testParamsInPostBodyAndStaticParams() throws Exception {
+    try (MockWebServer server = startServer()) {
+      server.enqueue(
+          new MockResponse()
+              .setResponseCode(200)
+              .setBody(TEST_JSON_RESPONSE)
+      );
+
+      String sql = "SELECT * FROM local.mockPostPushdownWithStaticParams WHERE `lat`=36.7201600 AND `lng`=-4.4203400";
+      RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+      TupleMetadata expectedSchema = new SchemaBuilder()
+          .addMap("results")
+          .add("sunrise", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("sunset", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("solar_noon", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("day_length", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("civil_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("nautical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .add("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .resumeSchema()
+          .add("status", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+          .build();
+
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+          .addRow(mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM", "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"), "OK")
+          .build();
+
+      RowSetUtilities.verify(expected, results);
+
+      RecordedRequest recordedRequest = server.takeRequest();
+      assertEquals("POST", recordedRequest.getMethod());
+      String resultJsonString = recordedRequest.getBody().toString();
+      assertEquals("[text=key1=value1&key2=value2&lng=-4.4203400&lat=36.7201600]", resultJsonString);
+    }
+  }
+
   /**
    * Helper function to start the MockHTTPServer
    * @return Started Mock server
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index e176d7c..9df72c5 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -117,7 +117,7 @@
     configs.put("basicJson", basicJson);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 200, 1000, "globaluser", "globalpass", "",
+      new HttpStoragePluginConfig(false, false, configs, 200, 1000, "globaluser", "globalpass", "",
         80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
         UsernamePasswordCredentials.USERNAME, "globaluser",
         UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFWithAliases.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFWithAliases.java
index 1bd574c..3b22571 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFWithAliases.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFWithAliases.java
@@ -116,7 +116,7 @@
     configs.put("basicJson", basicJson);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 200, 1000, "globaluser", "globalpass", "",
+      new HttpStoragePluginConfig(false, false, configs, 200, 1000, "globaluser", "globalpass", "",
         80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
         UsernamePasswordCredentials.USERNAME, "globaluser",
         UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
index 57f2e81..c4be7e1 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
@@ -111,7 +111,7 @@
 
     // Add storage plugin for test OAuth
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, TIMEOUT, 1000, null, null, "", 80, "", "", "",
+      new HttpStoragePluginConfig(false, false, configs, TIMEOUT, 1000, null, null, "", 80, "", "", "",
         oAuthConfig, credentialsProvider, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("localOauth", mockStorageConfigWithWorkspace);
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
index 2e818ae..7abb1f2 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
@@ -91,7 +91,7 @@
 
     // Add storage plugin for test OAuth
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, TIMEOUT, 1000, null, null, "", 80, "", "", "",
+      new HttpStoragePluginConfig(false, false, configs, TIMEOUT, 1000, null, null, "", 80, "", "", "",
         oAuthConfig, credentialsProvider, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("localOauth", mockStorageConfigWithWorkspace);
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index e5294a9..8363c13 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -148,7 +148,7 @@
     configs.put("github", githubConfig);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 10, 1000, null, null, "", 80, "", "", "", null,
+      new HttpStoragePluginConfig(false,true, configs, 10, 1000, null, null, "", 80, "", "", "", null,
         PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
@@ -346,7 +346,7 @@
     configs.put("customers", mockJsonConfigWithHeaderIndex);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 2,1000, null, null, "", 80, "", "", "", null,
+      new HttpStoragePluginConfig(false, false, configs, 2,1000, null, null, "", 80, "", "", "", null,
         PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
     cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
index 281320c..8c9a9b6 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
@@ -160,7 +160,7 @@
     configs.put("noSchema", noSchema);
 
     HttpStoragePluginConfig mockStorageConfigWithWorkspace =
-      new HttpStoragePluginConfig(false, configs, 2, 1000, "globaluser", "globalpass", "",
+      new HttpStoragePluginConfig(false, false, configs, 2, 1000, "globaluser", "globalpass", "",
         80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
         UsernamePasswordCredentials.USERNAME, "globaluser",
         UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
index 2252dae..b167a4b 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
@@ -133,11 +133,11 @@
 
     PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(TEST_USER_2, credentials);
 
-    HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, 1000, null, null, "", 80, "", "", "", null, credentialsProvider,
+    HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, false, configs, 2, 1000, null, null, "", 80, "", "", "", null, credentialsProvider,
       AuthMode.USER_TRANSLATION.name());
     mockStorageConfigWithWorkspace.setEnabled(true);
 
-    HttpStoragePluginConfig mockOAuthPlugin = new HttpStoragePluginConfig(false, configs, 2, 1000, null, null, "", 80, "", "", "", oAuthConfig, oauthCredentialProvider,
+    HttpStoragePluginConfig mockOAuthPlugin = new HttpStoragePluginConfig(false, false, configs, 2, 1000, null, null, "", 80, "", "", "", oAuthConfig, oauthCredentialProvider,
       AuthMode.USER_TRANSLATION.name());
     mockOAuthPlugin.setEnabled(true);