DRILL-7603 and DRILL-7604: Add schema, options to REST query

Update and revision of work originally done by dobesv.

DRILL-7603: Allow default schema to be set for HTTP queries
DRILL-7604: Allow session options to be set in HTTP queries

Merges the above two. Separates running a REST query from the
JSON representation. Allows setting all option types from
a string (as required by DRILL-7604).

Added default schema to query profile query editor.

Made the two query editors a bit more similar visually,
but see DRILL-7697 for more work needed.

Added a utility to run a server for UI teseting without
a full build.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 027ebb5..a0d14a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -472,7 +472,8 @@
   public static final String JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG = "store.json.reader.print_skipped_invalid_record_number";
   public static final BooleanValidator JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR = new BooleanValidator(JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG,
       new OptionDescription("Enables Drill to log the bad records that the JSON record reader skips when reading JSON files. Default is false. (Drill 1.9+)"));
-  public static final DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator("store.text.estimated_row_size_bytes", 1, Long.MAX_VALUE,
+  public static final String TEXT_ESTIMATED_ROW_SIZE_KEY = "store.text.estimated_row_size_bytes";
+  public static final DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator(TEXT_ESTIMATED_ROW_SIZE_KEY, 1, Long.MAX_VALUE,
       new OptionDescription("Estimate of the row size in a delimited text file, such as csv. The closer to actual, the better the query plan. Used for all csv files in the system/session where the value is set. Impacts the decision to plan a broadcast join or not."));
 
   public static final String TEXT_WRITER_ADD_HEADER = "store.text.writer.add_header";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
index ee786c0..6b8ea80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
@@ -19,6 +19,8 @@
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.server.options.OptionValue.Kind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 
@@ -28,7 +30,7 @@
  */
 
 public abstract class BaseOptionManager implements OptionManager {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOptionManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(BaseOptionManager.class);
 
   /**
    * Gets the current option value given a validator.
@@ -142,33 +144,14 @@
     final OptionValue.AccessibleScopes type = definition.getMetaData().getAccessibleScopes();
     final OptionValue.OptionScope scope = getScope();
     checkOptionPermissions(name, type, scope);
-    final OptionValue optionValue = OptionValue.create(type, name, value, scope);
+    final OptionValue optionValue = OptionValue.create(type, name, value, scope, validator.getKind());
     validator.validate(optionValue, metaData, this);
     setLocalOptionHelper(optionValue);
   }
 
   @Override
   public void setLocalOption(final OptionValue.Kind kind, final String name, final String valueStr) {
-    Object value;
-
-    switch (kind) {
-      case LONG:
-        value = Long.valueOf(valueStr);
-        break;
-      case DOUBLE:
-        value = Double.valueOf(valueStr);
-        break;
-      case STRING:
-        value = valueStr;
-        break;
-      case BOOLEAN:
-        value = Boolean.valueOf(valueStr);
-        break;
-      default:
-        throw new IllegalArgumentException(String.format("Unsupported kind %s", kind));
-    }
-
-    setLocalOption(name, value);
+    setLocalOption(name, valueStr);
   }
 
   private static void checkOptionPermissions(String name, OptionValue.AccessibleScopes type, OptionValue.OptionScope scope) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
index e7bb476..be90f44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FallbackOptionManager.java
@@ -22,13 +22,15 @@
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
 
 /**
- * An {@link OptionManager} which allows for falling back onto another {@link OptionManager} when retrieving options.
+ * An {@link OptionManager} which allows for falling back onto another
+ * {@link OptionManager} when retrieving options.
  * <p/>
- * {@link FragmentOptionManager} and {@link SessionOptionManager} use {@link SystemOptionManager} as the fall back
- * manager. {@link QueryOptionManager} uses {@link SessionOptionManager} as the fall back manager.
+ * {@link FragmentOptionManager} and {@link SessionOptionManager} use
+ * {@link SystemOptionManager} as the fall back manager.
+ * {@link QueryOptionManager} uses {@link SessionOptionManager} as the fall back
+ * manager.
  */
 public abstract class FallbackOptionManager extends BaseOptionManager {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FallbackOptionManager.class);
 
   protected final OptionManager fallback;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
index c3cd63d..78dab55 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/InMemoryOptionManager.java
@@ -20,12 +20,13 @@
 import java.util.Map;
 
 /**
- * This is an {@link OptionManager} that holds options in memory rather than in a persistent store. Options stored in
- * {@link SessionOptionManager}, {@link QueryOptionManager}, and {@link FragmentOptionManager} are held in memory
- * (see {@link #options}) whereas the {@link SystemOptionManager} stores options in a persistent store.
+ * This is an {@link OptionManager} that holds options in memory rather than in
+ * a persistent store. Options stored in {@link SessionOptionManager},
+ * {@link QueryOptionManager}, and {@link FragmentOptionManager} are held in
+ * memory (see {@link #options}) whereas the {@link SystemOptionManager} stores
+ * options in a persistent store.
  */
 public abstract class InMemoryOptionManager extends FallbackOptionManager {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryOptionManager.class);
 
   protected final Map<String, OptionValue> options;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
index 61d430d..e74f46a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -22,27 +22,38 @@
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.EnumSet;
 
 /**
  * <p>
- * An {@link OptionValue option value} is used internally by an {@link OptionManager} to store a run-time setting. This setting,
- * for example, could affect a query in an execution stage. Instances of this class are JSON serializable and can be stored
- * in a {@link PersistentStore persistent store} (see {@link SystemOptionManager#options}), or
- * in memory (see {@link InMemoryOptionManager#options}).
+ * An {@link OptionValue option value} is used internally by an
+ * {@link OptionManager} to store a run-time setting. This setting, for example,
+ * could affect a query in an execution stage. Instances of this class are JSON
+ * serializable and can be stored in a {@link PersistentStore persistent store}
+ * (see {@link SystemOptionManager#options}), or in memory (see
+ * {@link InMemoryOptionManager#options}).
  * </p>
  * <p>
- * {@link AccessibleScopes} defines the scopes at which the option can be set. If it can be set at System level or Session level or so on.
- * Whereas {@link OptionScope} defines the scope at which the option is being set. If the option is being set at the BOOT time
- * the scope of the option is BOOT. If it is set at SYSTEM level the scope is SYSTEM. Although they look similar there
- * is a fine level which differentiates both of them which is at which level of hierarchy they can be set and
- * at what at level of hierarchy they were actually set.
+ * {@link AccessibleScopes} defines the scopes at which the option can be set.
+ * If it can be set at System level or Session level or so on. Whereas
+ * {@link OptionScope} defines the scope at which the option is being set. If
+ * the option is being set at the BOOT time the scope of the option is BOOT. If
+ * it is set at SYSTEM level the scope is SYSTEM. Although they look similar
+ * there is a fine level which differentiates both of them which is at which
+ * level of hierarchy they can be set and at what at level of hierarchy they
+ * were actually set.
  * </p>
  */
 @JsonInclude(Include.NON_NULL)
 public class OptionValue implements Comparable<OptionValue> {
+  private static final Logger logger = LoggerFactory.getLogger(OptionValue.class);
+
   public static final String JSON_KIND = "kind";
   public static final String JSON_ACCESSIBLE_SCOPES = "accessibleScopes";
   public static final String JSON_NAME = "name";
@@ -114,21 +125,29 @@
 
   public static OptionValue create(Kind kind, AccessibleScopes accessibleScopes,
                                    String name, String val, OptionScope scope) {
-    switch (kind) {
-      case BOOLEAN:
-        return create(accessibleScopes, name, Boolean.valueOf(val), scope);
-      case STRING:
-        return create(accessibleScopes, name, val, scope);
-      case DOUBLE:
-        return create(accessibleScopes, name, Double.parseDouble(val), scope);
-      case LONG:
-        return create(accessibleScopes, name, Long.parseLong(val), scope);
-      default:
-        throw new IllegalArgumentException(String.format("Unsupported kind %s", kind));
+    try {
+      switch (kind) {
+        case BOOLEAN:
+          return create(accessibleScopes, name, Boolean.valueOf(val), scope);
+        case STRING:
+          return create(accessibleScopes, name, val, scope);
+        case DOUBLE:
+          return create(accessibleScopes, name, Double.parseDouble(val), scope);
+        case LONG:
+          return create(accessibleScopes, name, Long.parseLong(val), scope);
+        default:
+          throw new IllegalArgumentException(String.format("Unsupported kind %s", kind));
+      }
+    } catch (NumberFormatException e) {
+      throw UserException.validationError(e)
+        .message("'%s' is not a valid value option '%s' of type %s",
+            val.toString(), name, kind.name())
+        .build(logger);
     }
   }
 
-  public static OptionValue create(AccessibleScopes type, String name, Object val, OptionScope scope) {
+  public static OptionValue create(AccessibleScopes type, String name, Object val, OptionScope scope, Kind kind) {
+    Preconditions.checkArgument(val != null);
     if (val instanceof Boolean) {
       return create(type, name, ((Boolean) val).booleanValue(), scope);
     } else if (val instanceof Long) {
@@ -136,7 +155,7 @@
     } else if (val instanceof Integer) {
       return create(type, name, ((Integer) val).longValue(), scope);
     } else if (val instanceof String) {
-      return create(type, name, (String) val, scope);
+      return fromString(type, name, (String) val, scope, kind);
     } else if (val instanceof Double) {
       return create(type, name, ((Double) val).doubleValue(), scope);
     } else if (val instanceof Float) {
@@ -146,6 +165,41 @@
     throw new IllegalArgumentException(String.format("Unsupported type %s", val.getClass()));
   }
 
+  private static OptionValue fromString(AccessibleScopes type, String name,
+      String val, OptionScope scope, Kind kind) {
+    Preconditions.checkArgument(val != null);
+    val = val.trim();
+    try {
+      switch (kind) {
+        case BOOLEAN: {
+
+          // Strict enforcement of true or false, in any case
+          val = val.toLowerCase();
+          if (!val.equals("true") && !val.equals("false")) {
+            throw UserException.validationError()
+              .message("'%s' is not a valid value for option '%s' of type %s",
+                  val, name, kind.name())
+              .build(logger);
+          }
+          return create(type, name, Boolean.parseBoolean(val.toLowerCase()), scope);
+        }
+        case DOUBLE:
+          return create(type, name, Double.parseDouble(val), scope);
+        case LONG:
+          return create(type, name, Long.parseLong(val), scope);
+        case STRING:
+          return create(type, name, val, scope);
+        default:
+          throw new IllegalStateException(kind.name());
+      }
+    } catch (NumberFormatException e) {
+      throw UserException.validationError(e)
+        .message("'%s' is not a valid value for option '%s' of type %s",
+            val, name, kind.name())
+        .build(logger);
+    }
+  }
+
   @JsonCreator
   private OptionValue(@JsonProperty(JSON_KIND) Kind kind,
                       @JsonProperty(JSON_ACCESSIBLE_SCOPES) AccessibleScopes accessibleScopes,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index f7da53c..d719567 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -38,7 +38,6 @@
  * in the reset query itself.
  */
 public class SessionOptionManager extends InMemoryOptionManager {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
 
   private final UserSession session;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index 1910737..a912719 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -58,6 +58,8 @@
 import org.apache.drill.exec.work.foreman.rm.ThrottledResourceManager;
 import org.apache.http.client.methods.HttpPost;
 import org.glassfish.jersey.server.mvc.Viewable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 
@@ -66,7 +68,7 @@
 @Path("/")
 @PermitAll
 public class DrillRoot {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRoot.class);
+  private static final Logger logger = LoggerFactory.getLogger(DrillRoot.class);
 
   @Inject
   UserAuthEnabled authEnabled;
@@ -234,7 +236,7 @@
         endpoint1.getUserPort() == endpoint2.getUserPort();
   }
 
-  private Response setResponse(Map entity) {
+  private Response setResponse(Map<String, ?> entity) {
     return Response.ok()
             .entity(entity)
             .header("Access-Control-Allow-Origin", "*")
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
index e1fc811..6e6005e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
@@ -17,15 +17,15 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import org.apache.drill.shaded.guava.com.google.common.base.CharMatcher;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
+import org.apache.drill.exec.server.rest.QueryWrapper.RestQueryBuilder;
+import org.apache.drill.exec.server.rest.RestQueryRunner.QueryResult;
 import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
-import org.apache.drill.exec.server.rest.QueryWrapper.QueryResult;
 import org.apache.drill.exec.work.WorkManager;
 import org.glassfish.jersey.server.mvc.Viewable;
 import org.slf4j.Logger;
@@ -34,15 +34,18 @@
 import javax.annotation.security.RolesAllowed;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.BadRequestException;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.FormParam;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.Form;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.SecurityContext;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -90,7 +93,7 @@
   public QueryResult submitQueryJSON(QueryWrapper query) throws Exception {
     try {
       // Run the query
-      return query.run(work, webUserConnection);
+      return new RestQueryRunner(query, work, webUserConnection).run();
     } finally {
       // no-op for authenticated user
       webUserConnection.cleanupSession();
@@ -105,12 +108,20 @@
                               @FormParam("queryType") String queryType,
                               @FormParam("autoLimit") String autoLimit,
                               @FormParam("userName") String userName,
-                              @FormParam("defaultSchema") String defaultSchema) throws Exception {
+                              @FormParam("defaultSchema") String defaultSchema,
+                              Form form) throws Exception {
     try {
-      // Apply options from the form fields, if provided
-      final String trimmedQueryString = CharMatcher.is(';').trimTrailingFrom(query.trim());
-      final QueryResult result = submitQueryJSON(new QueryWrapper(trimmedQueryString, queryType, autoLimit, userName, defaultSchema));
-      List<Integer> rowsPerPageValues = work.getContext().getConfig().getIntList(ExecConstants.HTTP_WEB_CLIENT_RESULTSET_ROWS_PER_PAGE_VALUES);
+      final QueryResult result = submitQueryJSON(
+          new RestQueryBuilder()
+            .query(query)
+            .queryType(queryType)
+            .rowLimit(autoLimit)
+            .userName(userName)
+            .defaultSchema(defaultSchema)
+            .sessionOptions(readOptionsFromForm(form))
+            .build());
+      List<Integer> rowsPerPageValues = work.getContext().getConfig().getIntList(
+          ExecConstants.HTTP_WEB_CLIENT_RESULTSET_ROWS_PER_PAGE_VALUES);
       Collections.sort(rowsPerPageValues);
       final String rowsPerPageValuesAsStr = Joiner.on(",").join(rowsPerPageValues);
       return ViewableWithPermissions.create(authEnabled.get(), "/rest/query/result.ftl", sc, new TabularResult(result, rowsPerPageValuesAsStr));
@@ -121,6 +132,27 @@
   }
 
   /**
+   * Convert the form to a map. The form allows multiple values per key;
+   * discard the entry if empty, throw an error if more than one value.
+   */
+  private Map<String, String> readOptionsFromForm(Form form) {
+    Map<String, String> options = new HashMap<>();
+    for (Map.Entry<String, List<String>> pair : form.asMap().entrySet()) {
+      List<String> values = pair.getValue();
+       if (values.isEmpty()) {
+        continue;
+      }
+      if (values.size() > 1) {
+        throw new BadRequestException(String.format(
+            "Multiple values given for option '%s'", pair.getKey()));
+      }
+
+      options.put(pair.getKey(), values.get(0));
+    }
+    return options;
+  }
+
+  /**
    * Model class for Query page
    */
   public static class QueryPage {
@@ -225,5 +257,4 @@
       return autoLimitedRowCount;
     }
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 0a7bcd7..94fcea9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -17,46 +17,38 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.proto.UserProtos.QueryResultsMode;
-import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.user.InboundImpersonationManager;
-import org.apache.drill.exec.server.options.SessionOptionManager;
-import org.apache.drill.exec.store.SchemaTreeProvider;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.work.WorkManager;
-import org.apache.parquet.Strings;
+import java.util.Map;
 
 import javax.xml.bind.annotation.XmlRootElement;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.shaded.guava.com.google.common.base.CharMatcher;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.parquet.Strings;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 @XmlRootElement
 public class QueryWrapper {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWrapper.class);
 
   private final String query;
   private final String queryType;
-  private final int autoLimitRowCount;
-  private final String userName;
-  private final String defaultSchema;
+  final int autoLimitRowCount;
+  final String userName;
+  final String defaultSchema;
+  final Map<String, String> options;
 
-  private static MemoryMXBean memMXBean = ManagementFactory.getMemoryMXBean();
+  protected QueryWrapper(String query, String queryType,
+      int rowCountLimit, String userName, String defaultSchema,
+      Map<String, String> options) {
+    this.query = query;
+    this.queryType = queryType.toUpperCase();
+    this.autoLimitRowCount = rowCountLimit;
+    this.userName = userName;
+    this.defaultSchema = defaultSchema;
+    this.options = options;
+  }
 
   @JsonCreator
   public QueryWrapper(
@@ -64,152 +56,98 @@
     @JsonProperty("queryType") String queryType,
     @JsonProperty("autoLimit") String autoLimit,
     @JsonProperty("userName") String userName,
-    @JsonProperty("defaultSchema") String defaultSchema) {
-    this.query = query;
-    this.queryType = queryType.toUpperCase();
-    this.autoLimitRowCount = autoLimit != null && autoLimit.matches("[0-9]+") ? Integer.valueOf(autoLimit) : 0;
-    this.userName = userName;
-    this.defaultSchema = defaultSchema;
+    @JsonProperty("defaultSchema") String defaultSchema,
+    @JsonProperty("options") Map<String, String> options) {
+    this(query, queryType, mapCount(autoLimit), userName, defaultSchema, options);
   }
 
-  public String getQuery() {
-    return query;
-  }
-
-  public String getQueryType() {
-    return queryType;
-  }
-
-  public QueryType getType() {
-    return QueryType.valueOf(queryType);
-  }
-
-  public QueryResult run(final WorkManager workManager, final WebUserConnection webUserConnection) throws Exception {
-    final RunQuery runQuery = RunQuery.newBuilder().setType(getType())
-        .setPlan(getQuery())
-        .setResultsMode(QueryResultsMode.STREAM_FULL)
-        .setAutolimitRowcount(autoLimitRowCount)
-        .build();
-
-    applyUserName(workManager, webUserConnection);
-
-    int defaultMaxRows = webUserConnection.getSession().getOptions().getOption(ExecConstants.QUERY_MAX_ROWS).num_val.intValue();
-    if (!Strings.isNullOrEmpty(defaultSchema)) {
-      SessionOptionManager options = webUserConnection.getSession().getOptions();
-      SchemaTreeProvider schemaTreeProvider = new SchemaTreeProvider(workManager.getContext());
-      SchemaPlus rootSchema = schemaTreeProvider.createRootSchema(options);
-      webUserConnection.getSession().setDefaultSchemaPath(defaultSchema, rootSchema);
+  private static int mapCount(String rowLimit) {
+    if (Strings.isNullOrEmpty(rowLimit)) {
+      return 0;
     }
-
-    int maxRows;
-    if (autoLimitRowCount > 0 && defaultMaxRows > 0) {
-      maxRows = Math.min(autoLimitRowCount, defaultMaxRows);
-    } else {
-      maxRows = Math.max(autoLimitRowCount, defaultMaxRows);
-    }
-    webUserConnection.setAutoLimitRowCount(maxRows);
-
-    // Heap usage threshold/trigger to provide resiliency on web server for queries submitted via HTTP
-    double memoryFailureThreshold = workManager.getContext().getConfig().getDouble(ExecConstants.HTTP_MEMORY_HEAP_FAILURE_THRESHOLD);
-
-    // Submit user query to Drillbit work queue.
-    final QueryId queryId = workManager.getUserWorker().submitWork(webUserConnection, runQuery);
-
-    boolean isComplete = false;
-    boolean nearlyOutOfHeapSpace = false;
-    float usagePercent = getHeapUsage();
-
-    // Wait until the query execution is complete or there is error submitting the query
-    logger.debug("Wait until the query execution is complete or there is error submitting the query");
-    do {
-      try {
-        isComplete = webUserConnection.await(TimeUnit.SECONDS.toMillis(1)); //periodically timeout 1 sec to check heap
-      } catch (InterruptedException e) {}
-      usagePercent = getHeapUsage();
-      if (memoryFailureThreshold > 0 && usagePercent > memoryFailureThreshold) {
-        nearlyOutOfHeapSpace = true;
-      }
-    } while (!isComplete && !nearlyOutOfHeapSpace);
-
-    //Fail if nearly out of heap space
-    if (nearlyOutOfHeapSpace) {
-      UserException almostOutOfHeapException = UserException.resourceError()
-          .message("There is not enough heap memory to run this query using the web interface. ")
-          .addContext("Please try a query with fewer columns or with a filter or limit condition to limit the data returned. ")
-          .addContext("You can also try an ODBC/JDBC client. ")
-          .build(logger);
-      //Add event
-      workManager.getBee().getForemanForQueryId(queryId)
-        .addToEventQueue(QueryState.FAILED, almostOutOfHeapException);
-      //Return NearlyOutOfHeap exception
-      throw almostOutOfHeapException;
-    }
-
-    logger.trace("Query {} is completed ", queryId);
-
-    if (webUserConnection.getError() != null) {
-      throw new UserRemoteException(webUserConnection.getError());
-    }
-
-    // Return the QueryResult.
-    return new QueryResult(queryId, webUserConnection, webUserConnection.results);
-  }
-
-  private void applyUserName(WorkManager workManager, WebUserConnection webUserConnection) {
-    SessionOptionManager options = webUserConnection.getSession().getOptions();
-    DrillConfig config = workManager.getContext().getConfig();
-    if (!Strings.isNullOrEmpty(userName)) {
-      if (!config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)) {
-        throw UserException.permissionError().message("User impersonation is not enabled").build(logger);
-      }
-      InboundImpersonationManager inboundImpersonationManager = new InboundImpersonationManager();
-      boolean isAdmin = !config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED) ||
-        ImpersonationUtil.hasAdminPrivileges(webUserConnection.getSession().getCredentials().getUserName(),
-          ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(options),
-          ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(options));
-      if (isAdmin) {
-        // Admin user can impersonate any user they want to (when authentication is disabled, all users are admin)
-        webUserConnection.getSession().replaceUserCredentials(
-          inboundImpersonationManager,
-          UserBitShared.UserCredentials.newBuilder().setUserName(userName).build());
-      } else {
-        // Check configured impersonation rules to see if this user is allowed to impersonate the given user
-        inboundImpersonationManager.replaceUserOnSession(userName, webUserConnection.getSession());
-      }
+    try {
+      return Integer.parseInt(rowLimit.trim());
+    } catch (NumberFormatException e) {
+      return 0;
     }
   }
 
-  //Detect possible excess heap
-  private float getHeapUsage() {
-    return (float) memMXBean.getHeapMemoryUsage().getUsed() / memMXBean.getHeapMemoryUsage().getMax();
-  }
+  public String getQuery() { return query; }
 
-  public static class QueryResult {
-    private final String queryId;
-    public final Collection<String> columns;
-    public final List<Map<String, String>> rows;
-    public final List<String> metadata;
-    public final String queryState;
-    public final int attemptedAutoLimit;
+  public String getQueryType() { return queryType; }
 
-    //DRILL-6847:  Modified the constructor so that the method has access to all the properties in webUserConnection
-    public QueryResult(QueryId queryId, WebUserConnection webUserConnection, List<Map<String, String>> rows) {
-        this.queryId = QueryIdHelper.getQueryId(queryId);
-        this.columns = webUserConnection.columns;
-        this.metadata = webUserConnection.metadata;
-        this.queryState = webUserConnection.getQueryState();
-        this.rows = rows;
-        this.attemptedAutoLimit = webUserConnection.getAutoLimitRowCount();
-      }
+  public String getUserName() { return userName; }
 
-    public String getQueryId() {
-      return queryId;
-    }
-  }
+  public int getAutoLimitRowCount() { return autoLimitRowCount; }
+
+  public String getDefaultSchema() { return defaultSchema; }
+
+  public Map<String, String> getOptions() { return options; }
 
   @Override
   public String toString() {
-    return "QueryRequest [queryType=" + queryType + ", query=" + query + "]";
+    return new PlanStringBuilder(this)
+        .field("query", query)
+        .field("query type", queryType)
+        .field("user name", userName)
+        .field("default schema", defaultSchema)
+        .field("row limit", autoLimitRowCount)
+        .toString();
   }
 
+  public static final class RestQueryBuilder {
+
+    private String query;
+    private String queryType = "SQL";
+    private int rowLimit;
+    private String userName;
+    private String defaultSchema;
+    private Map<String, String> options;
+
+    public RestQueryBuilder query(String query) {
+      this.query = query;
+      return this;
+    }
+
+    public RestQueryBuilder queryType(String queryType) {
+      this.queryType = queryType;
+      return this;
+    }
+
+    public RestQueryBuilder rowLimit(int rowLimit) {
+      this.rowLimit = rowLimit;
+      return this;
+    }
+
+    public RestQueryBuilder rowLimit(String rowLimit) {
+      this.rowLimit = mapCount(rowLimit);
+      return this;
+    }
+
+    public RestQueryBuilder userName(String userName) {
+      this.userName = userName;
+      return this;
+    }
+
+    public RestQueryBuilder defaultSchema(String defaultSchema) {
+      this.defaultSchema = defaultSchema;
+      return this;
+    }
+
+    /**
+     * Optional session option values encoded as strings.
+     */
+    public RestQueryBuilder sessionOptions(Map<String, String> options) {
+      this.options = options;
+      return this;
+    }
+
+    public QueryWrapper build() {
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(query));
+      query = CharMatcher.is(';').trimTrailingFrom(query.trim());
+      Preconditions.checkArgument(!query.isEmpty());
+      return new QueryWrapper(query, queryType, rowLimit, userName,
+          defaultSchema, options);
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java
new file mode 100644
index 0000000..e944386
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.QueryResultsMode;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.user.InboundImpersonationManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
+import org.apache.drill.exec.store.SchemaTreeProvider;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.work.WorkManager;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RestQueryRunner {
+  private static final Logger logger = LoggerFactory.getLogger(QueryWrapper.class);
+  private static final MemoryMXBean memMXBean = ManagementFactory.getMemoryMXBean();
+
+  private final QueryWrapper query;
+  private final WorkManager workManager;
+  private final WebUserConnection webUserConnection;
+  private final SessionOptionManager options;
+
+  public RestQueryRunner(final QueryWrapper query, final WorkManager workManager, final WebUserConnection webUserConnection) {
+    this.query = query;
+    this.workManager = workManager;
+    this.webUserConnection = webUserConnection;
+    this.options = webUserConnection.getSession().getOptions();
+  }
+
+  public RestQueryRunner.QueryResult run() throws Exception {
+    applyUserName();
+    applyOptions();
+    applyDefaultSchema();
+    int maxRows = applyRowLimit();
+    return submitQuery(maxRows);
+  }
+
+  private void applyUserName() {
+    String userName = query.getUserName();
+    if (!Strings.isNullOrEmpty(userName)) {
+      DrillConfig config = workManager.getContext().getConfig();
+      if (!config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)) {
+        throw UserException.permissionError()
+          .message("User impersonation is not enabled")
+          .build(logger);
+      }
+      InboundImpersonationManager inboundImpersonationManager = new InboundImpersonationManager();
+      boolean isAdmin = !config.getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED) ||
+        ImpersonationUtil.hasAdminPrivileges(
+            webUserConnection.getSession().getCredentials().getUserName(),
+            ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(options),
+            ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(options));
+      if (isAdmin) {
+        // Admin user can impersonate any user they want to (when authentication is disabled, all users are admin)
+        webUserConnection.getSession().replaceUserCredentials(
+          inboundImpersonationManager,
+          UserBitShared.UserCredentials.newBuilder().setUserName(userName).build());
+      } else {
+        // Check configured impersonation rules to see if this user is allowed to impersonate the given user
+        inboundImpersonationManager.replaceUserOnSession(userName, webUserConnection.getSession());
+      }
+    }
+  }
+
+  private void applyOptions() {
+    Map<String, String> options = query.getOptions();
+    if (options != null) {
+      SessionOptionManager sessionOptionManager = webUserConnection.getSession().getOptions();
+      for (Map.Entry<String, String> entry : options.entrySet()) {
+        sessionOptionManager.setLocalOption(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  private void applyDefaultSchema() throws ValidationException {
+    String defaultSchema = query.getDefaultSchema();
+    if (!Strings.isNullOrEmpty(defaultSchema)) {
+      SessionOptionManager options = webUserConnection.getSession().getOptions();
+      @SuppressWarnings("resource")
+      SchemaTreeProvider schemaTreeProvider = new SchemaTreeProvider(workManager.getContext());
+      SchemaPlus rootSchema = schemaTreeProvider.createRootSchema(options);
+      webUserConnection.getSession().setDefaultSchemaPath(defaultSchema, rootSchema);
+    }
+  }
+
+  private int applyRowLimit() {
+    int defaultMaxRows = webUserConnection.getSession().getOptions().getInt(ExecConstants.QUERY_MAX_ROWS);
+    int maxRows;
+    int limit = query.getAutoLimitRowCount();
+    if (limit > 0 && defaultMaxRows > 0) {
+      maxRows = Math.min(limit, defaultMaxRows);
+    } else {
+      maxRows = Math.max(limit, defaultMaxRows);
+    }
+    webUserConnection.setAutoLimitRowCount(maxRows);
+    return maxRows;
+  }
+
+  public RestQueryRunner.QueryResult submitQuery(int maxRows) {
+    final RunQuery runQuery = RunQuery.newBuilder()
+        .setType(QueryType.valueOf(query.getQueryType()))
+        .setPlan(query.getQuery())
+        .setResultsMode(QueryResultsMode.STREAM_FULL)
+        .setAutolimitRowcount(maxRows)
+        .build();
+
+    // Heap usage threshold/trigger to provide resiliency on web server for queries submitted via HTTP
+    double memoryFailureThreshold = workManager.getContext().getConfig().getDouble(ExecConstants.HTTP_MEMORY_HEAP_FAILURE_THRESHOLD);
+
+    // Submit user query to Drillbit work queue.
+    final QueryId queryId = workManager.getUserWorker().submitWork(webUserConnection, runQuery);
+
+    boolean isComplete = false;
+    boolean nearlyOutOfHeapSpace = false;
+    float usagePercent = getHeapUsage();
+
+    // Wait until the query execution is complete or there is error submitting the query
+    logger.debug("Wait until the query execution is complete or there is error submitting the query");
+    do {
+      try {
+        isComplete = webUserConnection.await(TimeUnit.SECONDS.toMillis(1)); //periodically timeout 1 sec to check heap
+      } catch (InterruptedException e) {}
+      usagePercent = getHeapUsage();
+      if (memoryFailureThreshold > 0 && usagePercent > memoryFailureThreshold) {
+        nearlyOutOfHeapSpace = true;
+      }
+    } while (!isComplete && !nearlyOutOfHeapSpace);
+
+    //Fail if nearly out of heap space
+    if (nearlyOutOfHeapSpace) {
+      UserException almostOutOfHeapException = UserException.resourceError()
+          .message("There is not enough heap memory to run this query using the web interface. ")
+          .addContext("Please try a query with fewer columns or with a filter or limit condition to limit the data returned. ")
+          .addContext("You can also try an ODBC/JDBC client. ")
+          .build(logger);
+      //Add event
+      workManager.getBee().getForemanForQueryId(queryId)
+        .addToEventQueue(QueryState.FAILED, almostOutOfHeapException);
+      //Return NearlyOutOfHeap exception
+      throw almostOutOfHeapException;
+    }
+
+    logger.trace("Query {} is completed ", queryId);
+
+    if (webUserConnection.getError() != null) {
+      throw new UserRemoteException(webUserConnection.getError());
+    }
+
+    // Return the QueryResult.
+    return new QueryResult(queryId, webUserConnection, webUserConnection.results);
+  }
+
+  //Detect possible excess heap
+  private float getHeapUsage() {
+    return (float) memMXBean.getHeapMemoryUsage().getUsed() / memMXBean.getHeapMemoryUsage().getMax();
+  }
+
+  public static class QueryResult {
+    private final String queryId;
+    public final Collection<String> columns;
+    public final List<Map<String, String>> rows;
+    public final List<String> metadata;
+    public final String queryState;
+    public final int attemptedAutoLimit;
+
+    //DRILL-6847:  Modified the constructor so that the method has access to all the properties in webUserConnection
+    public QueryResult(QueryId queryId, WebUserConnection webUserConnection, List<Map<String, String>> rows) {
+        this.queryId = QueryIdHelper.getQueryId(queryId);
+        this.columns = webUserConnection.columns;
+        this.metadata = webUserConnection.metadata;
+        this.queryState = webUserConnection.getQueryState();
+        this.rows = rows;
+        this.attemptedAutoLimit = webUserConnection.getAutoLimitRowCount();
+      }
+
+    public String getQueryId() {
+      return queryId;
+    }
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 17d97d7..6819457 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -63,6 +63,8 @@
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.DispatcherType;
 import javax.servlet.http.HttpSession;
@@ -88,6 +90,8 @@
  * Wrapper class around jetty based web server.
  */
 public class WebServer implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(WebServer.class);
+
   private static final String ACE_MODE_SQL_TEMPLATE_JS = "ace.mode-sql.template.js";
   private static final String ACE_MODE_SQL_JS = "mode-sql.js";
   private static final String DRILL_FUNCTIONS_PLACEHOLDER = "__DRILL_FUNCTIONS__";
@@ -95,7 +99,6 @@
   private static final String STATUS_THREADS_PATH = "/status/threads";
   private static final String STATUS_METRICS_PATH = "/status/metrics";
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebServer.class);
   private static final String OPTIONS_DESCRIBE_JS = "options.describe.js";
   private static final String OPTIONS_DESCRIBE_TEMPLATE_JS = "options.describe.template.js";
 
@@ -145,7 +148,6 @@
       return;
     }
 
-
     final QueuedThreadPool threadPool = new QueuedThreadPool(2, 2);
     embeddedJetty = new Server(threadPool);
 
@@ -200,18 +202,20 @@
     servletContextHandler.addServlet(new ServletHolder(new ThreadDumpServlet()), STATUS_THREADS_PATH);
 
     final ServletHolder staticHolder = new ServletHolder("static", DefaultServlet.class);
+
     // Get resource URL for Drill static assets, based on where Drill icon is located
     String drillIconResourcePath =
-        Resource.newClassPathResource(BASE_STATIC_PATH + DRILL_ICON_RESOURCE_RELATIVE_PATH).getURL().toString();
+        Resource.newClassPathResource(BASE_STATIC_PATH + DRILL_ICON_RESOURCE_RELATIVE_PATH).getURI().toString();
     staticHolder.setInitParameter("resourceBase",
         drillIconResourcePath.substring(0, drillIconResourcePath.length() - DRILL_ICON_RESOURCE_RELATIVE_PATH.length()));
     staticHolder.setInitParameter("dirAllowed", "false");
     staticHolder.setInitParameter("pathInfoOnly", "true");
     servletContextHandler.addServlet(staticHolder, "/static/*");
 
-    //Add Local path resource (This will allow access to dynamically created files like JavaScript)
+    // Add Local path resource (This will allow access to dynamically created files like JavaScript)
     final ServletHolder dynamicHolder = new ServletHolder("dynamic", DefaultServlet.class);
-    //Skip if unable to get a temp directory (e.g. during Unit tests)
+
+    // Skip if unable to get a temp directory (e.g. during Unit tests)
     if (getOrCreateTmpJavaScriptDir() != null) {
       dynamicHolder.setInitParameter("resourceBase", getOrCreateTmpJavaScriptDir().getAbsolutePath());
       dynamicHolder.setInitParameter("dirAllowed", "true");
@@ -220,7 +224,7 @@
     }
 
     if (authEnabled) {
-      //DrillSecurityHandler is used to support SPNEGO and FORM authentication together
+      // DrillSecurityHandler is used to support SPNEGO and FORM authentication together
       servletContextHandler.setSecurityHandler(new DrillHttpSecurityHandlerProvider(config, workManager.getContext()));
       servletContextHandler.setSessionHandler(createSessionHandler(servletContextHandler.getSecurityHandler()));
     }
@@ -247,10 +251,11 @@
       }
     }
 
-    //Allow for Other Drillbits to make REST calls
+    // Allow for Other Drillbits to make REST calls
     FilterHolder filterHolder = new FilterHolder(CrossOriginFilter.class);
     filterHolder.setInitParameter("allowedOrigins", "*");
-    //Allowing CORS for metrics only
+
+    // Allowing CORS for metrics only
     servletContextHandler.addFilter(filterHolder, STATUS_METRICS_PATH, null);
 
     FilterHolder responseHeadersSettingFilter = new FilterHolder(ResponseHeadersSettingFilter.class);
@@ -403,7 +408,6 @@
     return tmpJavaScriptDir;
   }
 
-
   /**
    * Generate Options Description JavaScript to serve http://drillhost/options ACE library search features
    * @throws IOException when unable to generate functions JS file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index e14e696..1cdc359 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -58,13 +58,15 @@
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.glassfish.jersey.server.mvc.Viewable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 @Path("/")
 @RolesAllowed(DrillUserPrincipal.AUTHENTICATED_ROLE)
 public class ProfileResources {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
+  private static final Logger logger = LoggerFactory.getLogger(ProfileResources.class);
 
   @Inject
   UserAuthEnabled authEnabled;
@@ -208,9 +210,9 @@
 
   @XmlRootElement
   public class QProfiles {
-    private List<ProfileInfo> runningQueries;
-    private List<ProfileInfo> finishedQueries;
-    private List<String> errors;
+    private final List<ProfileInfo> runningQueries;
+    private final List<ProfileInfo> finishedQueries;
+    private final List<String> errors;
 
     public QProfiles(List<ProfileInfo> runningQueries, List<ProfileInfo> finishedQueries, List<String> errors) {
       this.runningQueries = runningQueries;
@@ -368,7 +370,6 @@
     .build(logger);
   }
 
-
   @GET
   @Path("/profiles/{queryid}.json")
   @Produces(MediaType.APPLICATION_JSON)
@@ -440,4 +441,3 @@
     }
   }
 }
-
diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
index 704c2dd..99b27b9 100644
--- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
@@ -87,7 +87,7 @@
     //Injects Estimated Rows
     function injectEstimatedRows() {
       Object.keys(opRowCountMap).forEach(key => {
-        var tgtElem = $("td.estRowsAnchor[key='" + key + "']"); 
+        var tgtElem = $("td.estRowsAnchor[key='" + key + "']");
         var status = tgtElem.append("<div class='estRows' title='Estimated'>(" + opRowCountMap[key] + ")</div>");
       });
     }
@@ -164,18 +164,16 @@
     <div id="query-edit" class="tab-pane">
       <p>
 
-        <#if model.isOnlyImpersonationEnabled()>
+       <#-- DRILL-7697: merge with copy in query.ftl -->
+       <form role="form" id="queryForm" action="/query" method="POST">
+          <#if model.isOnlyImpersonationEnabled()>
+            <div class="form-group">
+              <label for="userName">User Name</label>
+              <input type="text" size="30" name="userName" id="userName" placeholder="User Name" value="${model.getProfile().user}">
+            </div>
+          </#if>
           <div class="form-group">
-            <label for="userName">User Name</label>
-            <input type="text" size="30" name="userName" id="userName" placeholder="User Name" value="${model.getProfile().user}">
-          </div>
-        </#if>
-
-        <form role="form" id="queryForm" action="/query" method="POST">
-          <div id="query-editor" class="form-group">${model.getProfile().query}</div>
-          <input class="form-control" id="query" name="query" type="hidden" value="${model.getProfile().query}"/>
-          <div style="padding:5px"><b>Hint: </b>Use <div id="keyboardHint" style="display:inline-block; font-style:italic"></div> to submit</div>
-          <div class="form-group">
+            <label for="queryType">Query type:&nbsp;&nbsp;</label>
             <div class="radio-inline">
               <label>
                 <input type="radio" name="queryType" id="sql" value="SQL" checked>
@@ -185,22 +183,39 @@
             <div class="radio-inline">
               <label>
                  <input type="radio" name="queryType" id="physical" value="PHYSICAL">
-                     PHYSICAL
+                     Physical
               </label>
             </div>
             <div class="radio-inline">
               <label>
                 <input type="radio" name="queryType" id="logical" value="LOGICAL">
-                    LOGICAL
+                    Logical
               </label>
             </div>
+
+            <div class="form-group">
+              <div style="display: inline-block"><label for="query">Query</label></div>
+              <div style="display: inline-block; float:right; padding-right:5%"><b>Hint: </b>Use
+                <div id="keyboardHint" style="display:inline-block; font-style:italic"></div> to submit</div>
+              <div id="query-editor">${model.getProfile().query}</div>
+              <input class="form-control" id="query" name="query" type="hidden" value="${model.getProfile().query}"/>
             </div>
+
             <div>
-              <button class="btn btn-default" type="button" id="rerunButton" onclick="<#if model.isOnlyImpersonationEnabled()>doSubmitQueryWithUserName()<#else>doSubmitQueryWithAutoLimit()</#if>">
-            Re-run query
+              <button class="btn btn-primary" type="button" id="rerunButton" onclick="<#if model.isOnlyImpersonationEnabled()>doSubmitQueryWithUserName()<#else>doSubmitQueryWithAutoLimit()</#if>">
+                Re-run query
               </button>
-              <input type="checkbox" name="forceLimit" value="limit" <#if model.hasAutoLimit()>checked</#if>> Limit results to <input type="text" id="autoLimit" name="autoLimit" min="0" value="<#if model.hasAutoLimit()>${model.getAutoLimit()?c}<#else>${model.getDefaultAutoLimit()?c}</#if>" size="6" pattern="[0-9]*"> rows <span class="glyphicon glyphicon-info-sign" title="Limits the number of records retrieved in the query. Ignored if query has a limit already" style="cursor:pointer"></span>
-            </div>
+              &nbsp;&nbsp;&nbsp;
+              <input type="checkbox" name="forceLimit" value="limit" <#if model.hasAutoLimit()>checked</#if>>
+                Limit results to <input type="text" id="autoLimit" name="autoLimit" min="0"
+                  value="<#if model.hasAutoLimit()>${model.getAutoLimit()?c}<#else>${model.getDefaultAutoLimit()?c}</#if>" size="6" pattern="[0-9]*">
+                  rows <span class="glyphicon glyphicon-info-sign" title="Limits the number of records retrieved in the query.
+                  Ignored if query has a limit already" style="cursor:pointer"></span>
+              &nbsp;&nbsp;&nbsp;
+              Default schema: <input type="text" size="10" name="defaultSchema" id="defaultSchema">
+                <span class="glyphicon glyphicon-info-sign" title="Set the default schema used to find table names
+                and for SHOW FILES and SHOW TABLES." style="cursor:pointer"></span>
+           </div>
             <input type="hidden" name="csrfToken" value="${model.getCsrfToken()}">
           </form>
       </p>
@@ -620,7 +635,7 @@
     document.getElementById('queryForm')
             .addEventListener('keydown', function(e) {
       if (!(e.keyCode == 13 && (e.metaKey || e.ctrlKey))) return;
-      if (e.target.form) 
+      if (e.target.form)
         <#if model.isOnlyImpersonationEnabled()>doSubmitQueryWithUserName()<#else>doSubmitQueryWithAutoLimit()</#if>;
     });
 
diff --git a/exec/java-exec/src/main/resources/rest/query/query.ftl b/exec/java-exec/src/main/resources/rest/query/query.ftl
index 1034674..d124e95 100644
--- a/exec/java-exec/src/main/resources/rest/query/query.ftl
+++ b/exec/java-exec/src/main/resources/rest/query/query.ftl
@@ -43,6 +43,7 @@
 
 <#include "*/runningQuery.ftl">
 
+  <#-- DRILL-7697: merge with copy in profile.ftl -->
   <form role="form" id="queryForm" action="/query" method="POST">
     <#if model.isOnlyImpersonationEnabled()>
       <div class="form-group">
@@ -51,49 +52,55 @@
       </div>
     </#if>
     <div class="form-group">
-      <label for="queryType">Query Type</label>
-      <div class="radio">
+      <label for="queryType">Query type:&nbsp;&nbsp;</label>
+      <div class="radio-inline">
         <label>
           <input type="radio" name="queryType" id="sql" value="SQL" checked>
           SQL
         </label>
       </div>
-      <div class="radio">
+      <div class="radio-inline">
         <label>
           <input type="radio" name="queryType" id="physical" value="PHYSICAL">
-          PHYSICAL
+          Physical
         </label>
       </div>
-      <div class="radio">
+      <div class="radio-inline">
         <label>
           <input type="radio" name="queryType" id="logical" value="LOGICAL">
-          LOGICAL
+          Logical
         </label>
       </div>
     </div>
+
     <div class="form-group">
       <div style="display: inline-block"><label for="query">Query</label></div>
-      <div style="display: inline-block; float:right; padding-right:5%"><b>Hint: </b>Use <div id="keyboardHint" style="display:inline-block; font-style:italic"></div> to submit</div>
+      <div style="display: inline-block; float:right; padding-right:5%"><b>Hint: </b>Use
+        <div id="keyboardHint" style="display:inline-block; font-style:italic"></div> to submit</div>
       <div id="query-editor-format"></div>
       <input class="form-control" type="hidden" id="query" name="query" autofocus/>
     </div>
 
-    <button class="btn btn-default" type="button" onclick="<#if model.isOnlyImpersonationEnabled()>doSubmitQueryWithUserName()<#else>doSubmitQueryWithAutoLimit()</#if>">
+    <button class="btn btn-primary" type="button" onclick="<#if model.isOnlyImpersonationEnabled()>doSubmitQueryWithUserName()<#else>doSubmitQueryWithAutoLimit()</#if>">
       Submit
     </button>
-    <input type="checkbox" name="forceLimit" value="limit" <#if model.isAutoLimitEnabled()>checked</#if>> Limit results to <input type="text" id="autoLimit" name="autoLimit" min="0" value="${model.getDefaultRowsAutoLimited()?c}" size="6" pattern="[0-9]*"> rows <span class="glyphicon glyphicon-info-sign" title="Limits the number of records retrieved in the query. Ignored if query has a limit already" style="cursor:pointer"></span>
-    <label for="defaultSchema">
-      Default Schema
-      <input type="text" name="defaultSchema" id="defaultSchema" list="enabledPlugins" placeholder="-- default schema --">
-      <datalist id="enabledPlugins">
-        <#list model.getEnabledPlugins() as pluginModel>
-          <#if pluginModel.getPlugin()?? && pluginModel.getPlugin().enabled() == true>
-            <option value="${pluginModel.getPlugin().getName()}">
-          </#if>
-        </#list>
-      </datalist>
-    </label>
-    <span class="glyphicon glyphicon-info-sign" title="Set the default schema used to find table names, and for SHOW FILES and SHOW TABLES" style="cursor:pointer"></span>
+    &nbsp;&nbsp;&nbsp;
+    <input type="checkbox" name="forceLimit" value="limit" <#if model.isAutoLimitEnabled()>checked</#if>>
+      Limit results to <input type="text" id="autoLimit" name="autoLimit" min="0" value="${model.getDefaultRowsAutoLimited()?c}" size="6" pattern="[0-9]*">
+      rows <span class="glyphicon glyphicon-info-sign" title="Limits the number of records retrieved in the query.
+      Ignored if query has a LIMIT clause." style="cursor:pointer"></span>
+    &nbsp;&nbsp;&nbsp;
+    Default schema:
+    <input type="text" name="defaultSchema" id="defaultSchema" list="enabledPlugins" placeholder="schema">
+    <datalist id="enabledPlugins">
+      <#list model.getEnabledPlugins() as pluginModel>
+        <#if pluginModel.getPlugin()?? && pluginModel.getPlugin().enabled() == true>
+          <option value="${pluginModel.getPlugin().getName()}">
+        </#if>
+      </#list>
+    </datalist>
+     <span class="glyphicon glyphicon-info-sign" title="Set the default schema used to find table names
+      and for SHOW FILES and SHOW TABLES." style="cursor:pointer"></span>
     <input type="hidden" name="csrfToken" value="${model.getCsrfToken()}">
   </form>
 
@@ -179,7 +186,7 @@
     document.getElementById('queryForm')
             .addEventListener('keydown', function(e) {
       if (!(e.keyCode == 13 && (e.metaKey || e.ctrlKey))) return;
-      if (e.target.form) //Submit [Wrapped] Query 
+      if (e.target.form) //Submit [Wrapped] Query
         <#if model.isOnlyImpersonationEnabled()>doSubmitQueryWithUserName()<#else>doSubmitQueryWithAutoLimit()</#if>;
     });
   </script>
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/OptionValueTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/OptionValueTest.java
index 56f4ba6..8c30723 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/OptionValueTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/options/OptionValueTest.java
@@ -17,11 +17,15 @@
  */
 package org.apache.drill.exec.server.options;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.test.BaseTest;
-import org.junit.Assert;
 import org.junit.Test;
 
 public class OptionValueTest extends BaseTest {
+
   @Test
   public void createBooleanKindTest() {
     final OptionValue createdValue = OptionValue.create(
@@ -31,7 +35,7 @@
     final OptionValue expectedValue = OptionValue.create(
       OptionValue.AccessibleScopes.ALL, "myOption", true, OptionValue.OptionScope.SYSTEM);
 
-    Assert.assertEquals(expectedValue, createdValue);
+    assertEquals(expectedValue, createdValue);
   }
 
   @Test
@@ -43,7 +47,16 @@
     final OptionValue expectedValue = OptionValue.create(
       OptionValue.AccessibleScopes.ALL, "myOption", 1.5, OptionValue.OptionScope.SYSTEM);
 
-    Assert.assertEquals(expectedValue, createdValue);
+    assertEquals(expectedValue, createdValue);
+
+    try {
+      OptionValue.create(
+          OptionValue.Kind.DOUBLE, OptionValue.AccessibleScopes.ALL,
+          "myOption", "bogus", OptionValue.OptionScope.SYSTEM);
+      fail();
+    } catch (UserException e) {
+      // Expected
+    }
   }
 
   @Test
@@ -55,7 +68,16 @@
     final OptionValue expectedValue = OptionValue.create(
       OptionValue.AccessibleScopes.ALL, "myOption", 3000l, OptionValue.OptionScope.SYSTEM);
 
-    Assert.assertEquals(expectedValue, createdValue);
+    assertEquals(expectedValue, createdValue);
+
+    try {
+      OptionValue.create(
+          OptionValue.Kind.LONG, OptionValue.AccessibleScopes.ALL,
+          "myOption", "bogus", OptionValue.OptionScope.SYSTEM);
+      fail();
+    } catch (UserException e) {
+      // Expected
+    }
   }
 
   @Test
@@ -67,6 +89,6 @@
     final OptionValue expectedValue = OptionValue.create(
       OptionValue.AccessibleScopes.ALL, "myOption", "wabalubawubdub", OptionValue.OptionScope.SYSTEM);
 
-    Assert.assertEquals(expectedValue, createdValue);
+    assertEquals(expectedValue, createdValue);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java
new file mode 100644
index 0000000..1e2617b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+
+/**
+ * Quick-and-dirty tool to run the Web UI for debugging without having
+ * to wait or a full build to run using {@code drillbit.sh}.
+ */
+public class InteractiveUI extends ClusterTest {
+
+  public static void main(String[] args) {
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder();
+    builder.configBuilder().put(ExecConstants.HTTP_ENABLE, true);
+    try {
+      startCluster(builder);
+      for (;;) {
+        Thread.sleep(1000);
+      }
+    } catch (Exception e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
index 2915d28..0e9e48d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
@@ -20,24 +20,32 @@
 import io.netty.channel.DefaultChannelPromise;
 import io.netty.channel.local.LocalAddress;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.server.rest.QueryWrapper.RestQueryBuilder;
+import org.apache.drill.exec.server.rest.RestQueryRunner.QueryResult;
 import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.test.ClusterTest;
 
 public class RestServerTest extends ClusterTest {
-  protected QueryWrapper.QueryResult runQuery(String sql) throws Exception {
-    return runQuery(new QueryWrapper(sql, "SQL", null, null, null));
+
+  protected QueryResult runQuery(String sql) throws Exception {
+    return runQuery(new RestQueryBuilder().query(sql).build());
   }
 
-  protected QueryWrapper.QueryResult runQueryWithUsername(String sql, String userName) throws Exception {
-    return runQuery(new QueryWrapper(sql, "SQL", null, userName, null));
+  protected QueryResult runQueryWithUsername(String sql, String userName) throws Exception {
+    return runQuery(
+        new RestQueryBuilder()
+          .query(sql)
+          .userName(userName)
+          .build());
   }
 
-  protected QueryWrapper.QueryResult runQuery(QueryWrapper q) throws Exception {
+  protected QueryResult runQuery(QueryWrapper q) throws Exception {
     SystemOptionManager systemOptions = cluster.drillbit().getContext().getOptionManager();
     DrillUserPrincipal principal = new DrillUserPrincipal.AnonDrillUserPrincipal();
     WebSessionResources webSessionResources = new WebSessionResources(
@@ -49,11 +57,10 @@
         .build(),
       new DefaultChannelPromise(null));
     WebUserConnection connection = new WebUserConnection.AnonWebUserConnection(webSessionResources);
-    return q.run(cluster.drillbit().getManager(), connection);
+    return new RestQueryRunner(q, cluster.drillbit().getManager(), connection).run();
   }
 
-
-  protected UserBitShared.QueryProfile getQueryProfile(QueryWrapper.QueryResult result) {
+  protected QueryProfile getQueryProfile(QueryResult result) {
     String queryId = result.getQueryId();
     WorkManager workManager = cluster.drillbit().getManager();
     Foreman f = workManager.getBee().getForemanForQueryId(QueryIdHelper.getQueryIdFromString(queryId));
@@ -65,5 +72,4 @@
     }
     return workManager.getContext().getProfileStoreContext().getCompletedProfileStore().get(queryId);
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryWrapper.java
index aaa7931..4c13310 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryWrapper.java
@@ -17,18 +17,23 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.test.ClusterFixture;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.rest.QueryWrapper.RestQueryBuilder;
+import org.apache.drill.exec.server.rest.RestQueryRunner.QueryResult;
+import org.apache.drill.test.ClusterFixture;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 public class TestQueryWrapper extends RestServerTest {
 
   @BeforeClass
@@ -40,7 +45,7 @@
 
   @Test
   public void testShowSchemas() throws Exception {
-    QueryWrapper.QueryResult result = runQuery("SHOW SCHEMAS");
+    QueryResult result = runQuery("SHOW SCHEMAS");
     assertEquals("COMPLETED", result.queryState);
     assertNotEquals(0, result.rows.size());
     assertEquals(1, result.columns.size());
@@ -50,8 +55,10 @@
   @Test
   public void testImpersonationDisabled() throws Exception {
     try {
-      QueryWrapper q = new QueryWrapper("SHOW SCHEMAS", "SQL", null, "alfred", null);
-      runQuery(q);
+      runQuery(new RestQueryBuilder()
+          .query("SHOW SCHEMAS")
+          .userName("alfred")
+          .build());
       fail("Should have thrown exception");
     } catch (UserException e) {
       assertThat(e.getMessage(), containsString("User impersonation is not enabled"));
@@ -60,9 +67,83 @@
 
   @Test
   public void testSpecifyDefaultSchema() throws Exception {
-    QueryWrapper.QueryResult result = runQuery(new QueryWrapper("SHOW FILES", "SQL", null, null, "dfs.tmp"));
+    QueryResult result = runQuery(
+        new RestQueryBuilder()
+          .query("SHOW FILES")
+          .defaultSchema("dfs.tmp")
+          .build());
     // SHOW FILES will fail if default schema is not provided
     assertEquals("COMPLETED", result.queryState);
   }
 
+  protected QueryResult runQueryWithOption(String sql, String name, String value) throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(name, value);
+    return runQuery(
+        new RestQueryBuilder()
+        .query(sql)
+        .sessionOptions(options)
+        .build());
+  }
+
+  @Test
+  public void testOptionWithQuery() throws Exception {
+    runOptionTest(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY, "true");   // Boolean
+    runOptionTest(ExecConstants.QUERY_MAX_ROWS, "10");                // Long
+    runOptionTest(ExecConstants.TEXT_ESTIMATED_ROW_SIZE_KEY, "10.5"); // Double
+    runOptionTest(ExecConstants.OUTPUT_FORMAT_OPTION, "json");        // String
+  }
+
+  public void runOptionTest(String option, String value) throws Exception {
+    String query = String.format("SELECT val FROM sys.options WHERE `name`='%s'", option);
+    String origValue = client.queryBuilder()
+        .sql(query)
+        .singletonString();
+    assertNotEquals(origValue, value,
+        "Not a valid test: new value is the same as the current value");
+    QueryResult result = runQueryWithOption(query, option, value);
+    assertEquals(1, result.rows.size());
+    assertEquals(1, result.columns.size());
+    assertEquals(value, result.rows.get(0).get("val"));
+  }
+
+  @Test
+  public void testInvalidOptionName() throws Exception {
+    try {
+      runQueryWithOption("SHOW SCHEMAS", "xxx", "s");
+      fail("Expected exception to be thrown");
+    } catch (Exception e) {
+      assertThat(e.getMessage(), containsString("The option 'xxx' does not exist."));
+    }
+  }
+
+  @Test
+  public void testInvalidBooleanOption() {
+    try {
+      runQueryWithOption("SHOW SCHEMAS", ExecConstants.ENABLE_VERBOSE_ERRORS_KEY, "not a boolean");
+      fail("Expected exception to be thrown");
+    } catch (Exception e) {
+      assertThat(e.getMessage(), containsString("not a valid value"));
+    }
+  }
+
+  @Test
+  public void testInvalidLongOption() {
+    try {
+      runQueryWithOption("SHOW SCHEMAS", ExecConstants.QUERY_MAX_ROWS, "bogus");
+      fail("Expected exception to be thrown");
+    } catch (Exception e) {
+      assertThat(e.getMessage(), containsString("not a valid value"));
+    }
+  }
+
+  @Test
+  public void testInvalidDoubleOption() {
+    try {
+      runQueryWithOption("SHOW SCHEMAS", ExecConstants.TEXT_ESTIMATED_ROW_SIZE_KEY, "bogus");
+      fail("Expected exception to be thrown");
+    } catch (Exception e) {
+      assertThat(e.getMessage(), containsString("not a valid value"));
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryWrapperImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryWrapperImpersonation.java
index fef8f1a..188a3fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryWrapperImpersonation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryWrapperImpersonation.java
@@ -19,6 +19,7 @@
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.rest.RestQueryRunner.QueryResult;
 import org.apache.drill.test.ClusterFixture;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -27,6 +28,7 @@
 import static org.junit.Assert.assertNotNull;
 
 public final class TestQueryWrapperImpersonation extends RestServerTest {
+
   @BeforeClass
   public static void setupServer() throws Exception {
     startCluster(ClusterFixture.bareBuilder(dirTestWatcher)
@@ -36,7 +38,8 @@
 
   @Test
   public void testImpersonation() throws Exception {
-    QueryWrapper.QueryResult result = runQueryWithUsername("SELECT CATALOG_NAME, SCHEMA_NAME FROM information_schema.SCHEMATA", "alfred");
+    QueryResult result = runQueryWithUsername(
+        "SELECT CATALOG_NAME, SCHEMA_NAME FROM information_schema.SCHEMATA", "alfred");
     UserBitShared.QueryProfile queryProfile = getQueryProfile(result);
     assertNotNull(queryProfile);
     assertEquals("alfred", queryProfile.getUser());
@@ -44,10 +47,10 @@
 
   @Test
   public void testImpersonationEnabledButUserNameNotProvided() throws Exception {
-    QueryWrapper.QueryResult result = runQueryWithUsername("SELECT CATALOG_NAME, SCHEMA_NAME FROM information_schema.SCHEMATA", null);
+    QueryResult result = runQueryWithUsername(
+        "SELECT CATALOG_NAME, SCHEMA_NAME FROM information_schema.SCHEMATA", null);
     UserBitShared.QueryProfile queryProfile = getQueryProfile(result);
     assertNotNull(queryProfile);
     assertEquals("anonymous", queryProfile.getUser());
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index c8b64a4..1ba053c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -248,6 +248,11 @@
   }
 
   private void configureStoragePlugins(Drillbit bit) throws Exception {
+
+    // Skip plugins if not running in test mode.
+    if (builder.dirTestWatcher == null) {
+      return;
+    }
     // Create the dfs name space
     builder.dirTestWatcher.newDfsTestTmpDir();
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
index 9bc5312..5aa9ff6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
@@ -30,6 +30,15 @@
  * Build a Drillbit and client with the options provided. The simplest
  * builder starts an embedded Drillbit, with the "dfs" name space,
  * a max width (parallelization) of 2.
+ * <p>
+ * Designed primarily for unit tests: the builders provide control
+ * over all aspects of the Drillbit or cluster. Can also be used to
+ * create an embedded Drillbit, use the zero-argument
+ * constructor which will omit creating set of test-only directories
+ * and will skip creating the test-only storage plugins and other
+ * configuration. In this mode, you should configure the builder
+ * to read from a config file, or specify all the non-default
+ * config options needed.
  */
 public class ClusterFixtureBuilder {
 
@@ -58,6 +67,10 @@
   protected Properties clientProps;
   protected final BaseDirTestWatcher dirTestWatcher;
 
+  public ClusterFixtureBuilder() {
+    this.dirTestWatcher = null;
+  }
+
   public ClusterFixtureBuilder(BaseDirTestWatcher dirTestWatcher) {
     this.dirTestWatcher = Preconditions.checkNotNull(dirTestWatcher);
   }