DRILL-7868: Restore correct master branch state

Reverts 54a0ec6225ff275844ab055cfc8af1db2c429904 and 76d5654b34c4c42b078662e55a693f60d12b35ac
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 5b7ebbc..3876e20 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -58,7 +58,6 @@
     <module>storage-jdbc</module>
     <module>storage-kafka</module>
     <module>storage-kudu</module>
-    <module>storage-phoenix</module>
     <module>storage-opentsdb</module>
     <module>storage-splunk</module>
     <module>storage-http</module>
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
index 680281e..b779c9a 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuite.java
@@ -53,7 +53,6 @@
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
-import org.junit.runners.Suite.SuiteClasses;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,9 +66,15 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(Suite.class)
-@SuiteClasses({TestMongoFilterPushDown.class, TestMongoProjectPushDown.class,
-    TestMongoQueries.class, TestMongoChunkAssignment.class,
-    TestMongoStoragePluginUsesCredentialsStore.class})
+@Suite.SuiteClasses({
+  TestMongoFilterPushDown.class,
+  TestMongoProjectPushDown.class,
+  TestMongoQueries.class,
+  TestMongoChunkAssignment.class,
+  TestMongoStoragePluginUsesCredentialsStore.class,
+  TestMongoDrillIssue.class
+})
+
 @Category({SlowTest.class, MongoStorageTest.class})
 public class MongoTestSuite extends BaseTest implements MongoTestConstants {
 
diff --git a/contrib/storage-phoenix/pom.xml b/contrib/storage-phoenix/pom.xml
deleted file mode 100644
index 2244bfc..0000000
--- a/contrib/storage-phoenix/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.drill.contrib</groupId>
-    <artifactId>drill-contrib-parent</artifactId>
-    <version>1.19.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>drill-storage-phoenix</artifactId>
-  <name>Drill : Contrib : Storage : Phoenix</name>
-  
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.drill.exec</groupId>
-      <artifactId>drill-java-exec</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.drill.exec</groupId>
-      <artifactId>drill-java-exec</artifactId>
-      <classifier>tests</classifier>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.drill</groupId>
-      <artifactId>drill-common</artifactId>
-      <classifier>tests</classifier>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>
\ No newline at end of file
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
deleted file mode 100644
index 26870b5..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
-import org.apache.drill.exec.physical.resultSet.RowSetLoader;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
-
-  private static final Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class);
-
-  private final PhoenixSubScan subScan;
-  private PhoenixResultSet resultSet;
-
-  private ColumnDefn[] columns;
-  private int count = 1;
-
-  public PhoenixBatchReader(PhoenixSubScan subScan) {
-    this.subScan = subScan;
-  }
-
-  @Override
-  public boolean open(SchemaNegotiator negotiator) {
-    negotiator.tableSchema(defineMetadata(), true);
-//    negotiator.batchSize(3);
-    resultSet = new PhoenixResultSet(negotiator.build());
-    bindColumns(resultSet.getWriter());
-    return true;
-  }
-
-  String abc = "Currently, the Apache Drill build process is known to work on Linux, Windows and OSX.";
-
-  @Override
-  public boolean next() {
-    if (count > 3) {
-      return false;
-    }
-//    byte[] value = new byte[512];
-//    Arrays.fill(value, (byte) String.valueOf(count).charAt(0));
-    while(!resultSet.getWriter().isFull()) {
-      resultSet.getWriter().start();
-      for (int i = 0; i < columns.length; i++) {
-        columns[i].load(count + "\t" + abc);
-      }
-      resultSet.getWriter().save();
-    }
-    count++;
-    return true;
-  }
-
-  @Override
-  public void close() {
-    int count = resultSet.getWriter().loader().batchCount();
-    logger.info("phoenix fetch batch size: {}", count);
-  }
-
-  private TupleMetadata defineMetadata() {
-    List<String> cols = new ArrayList<String>(Arrays.asList("a", "b", "c"));
-    columns = new ColumnDefn[cols.size()];
-    SchemaBuilder builder = new SchemaBuilder();
-    for (int i = 0; i < cols.size(); i++) {
-      columns[i] = makeColumn(cols.get(i), i);
-      columns[i].define(builder);
-    }
-    return builder.buildSchema();
-  }
-
-  private ColumnDefn makeColumn(String name, int index) {
-    return new VarCharDefn(name, index);
-  }
-
-  private void bindColumns(RowSetLoader loader) {
-    for (int i = 0; i < columns.length; i++) {
-      columns[i].bind(loader);
-    }
-  }
-
-  public abstract static class ColumnDefn {
-
-    final String name;
-    int index;
-    ScalarWriter writer;
-
-    public String getName() {
-      return name;
-    }
-
-    public int getIndex() {
-      return index;
-    }
-
-    public ColumnDefn(String name, int index) {
-      this.name = name;
-      this.index = index;
-    }
-
-    public void bind(RowSetLoader loader) {
-      writer = loader.scalar(getName());
-    }
-
-    public abstract void define(SchemaBuilder builder);
-
-    public abstract void load(String value);
-
-    public abstract void load(byte[] value);
-
-    public abstract void load(int index, String value);
-  }
-
-  public static class VarCharDefn extends ColumnDefn {
-
-    public VarCharDefn(String name, int index) {
-      super(name, index);
-    }
-
-    @Override
-    public void define(SchemaBuilder builder) {
-      builder.addNullable(getName(), MinorType.VARCHAR);
-    }
-
-    @Override
-    public void load(String value) {
-      writer.setString(value);
-    }
-
-    @Override
-    public void load(byte[] value) {
-      writer.setBytes(value, value.length);
-    }
-
-    @Deprecated
-    @Override
-    public void load(int index, String value) {  }
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
deleted file mode 100644
index 21b5b01..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("phoenix-scan")
-public class PhoenixGroupScan extends AbstractGroupScan {
-
-  private final List<SchemaPath> columns;
-  private final PhoenixScanSpec scanSpec;
-  private final ScanStats scanStats;
-
-  private int hashCode;
-
-  public PhoenixGroupScan(PhoenixScanSpec scanSpec) {
-    super("no-user");
-    this.scanSpec = scanSpec;
-    this.columns = ALL_COLUMNS;
-    this.scanStats = computeScanStats();
-  }
-
-  public PhoenixGroupScan(PhoenixGroupScan groupScan) {
-    super(groupScan);
-    this.scanSpec = groupScan.scanSpec;
-    this.columns = groupScan.columns;
-    this.scanStats = groupScan.scanStats;
-  }
-
-  public PhoenixGroupScan(PhoenixGroupScan groupScan, List<SchemaPath> columns) {
-    super(groupScan);
-    this.scanSpec = groupScan.scanSpec;
-    this.columns = columns;
-    this.scanStats = computeScanStats();
-  }
-
-  @JsonCreator
-  public PhoenixGroupScan(
-      @JsonProperty("columns") List<SchemaPath> columns,
-      @JsonProperty("scanSpec") PhoenixScanSpec scanSpec) {
-    super("no-user");
-    this.columns = columns;
-    this.scanSpec = scanSpec;
-    this.scanStats = computeScanStats();
-  }
-
-  @Override
-  @JsonProperty("columns")
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
-
-  @JsonProperty("scanSpec")
-  public PhoenixScanSpec getScanSpec() {
-    return scanSpec;
-  }
-
-  @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {  }
-
-  @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
-    return new PhoenixSubScan(scanSpec, columns);
-  }
-
-  @Override
-  public int getMaxParallelizationWidth() {
-    return 1;
-  }
-
-  @Override
-  public String getDigest() {
-    return toString();
-  }
-
-  @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
-    return new PhoenixGroupScan(columns, scanSpec);
-  }
-
-  @Override
-  public GroupScan clone(List<SchemaPath> columns) {
-    return new PhoenixGroupScan(this, columns);
-  }
-
-  @Override
-  public ScanStats getScanStats() {
-    return scanStats;
-  }
-
-  @Override
-  public int hashCode() {
-    if(hashCode == 0) {
-      hashCode = Objects.hash(scanSpec, columns);
-    }
-    return hashCode;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if(this == obj) {
-      return true;
-    }
-    if(obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    PhoenixGroupScan groupScan = (PhoenixGroupScan) obj;
-    return Objects.equals(scanSpec, groupScan.getScanSpec()) && Objects.equals(columns, groupScan.getColumns());
-  }
-
-  private ScanStats computeScanStats() {
-    int estRowCount = 10_000;
-    double cpuRatio = 1.0;
-    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuRatio, 0);
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java
deleted file mode 100644
index 51a0625..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
-import org.apache.drill.exec.physical.resultSet.RowSetLoader;
-
-public class PhoenixResultSet {
-
-  private ResultSetLoader loader;
-  private RowSetLoader writer;
-
-  public PhoenixResultSet(ResultSetLoader loader) {
-    this.loader = loader;
-    this.writer = loader.writer();
-  }
-
-  public RowSetLoader getWriter() {
-    return writer;
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java
deleted file mode 100644
index cd51365..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ChildErrorContext;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
-import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
-import org.apache.drill.exec.record.CloseableRecordBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.server.options.OptionManager;
-
-public class PhoenixScanBatchCreator implements BatchCreator<PhoenixSubScan> {
-
-  @Override
-  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, PhoenixSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
-    try {
-      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
-      return builder.buildScanOperator(context, subScan);
-    } catch (UserException e) {
-      throw e;
-    } catch (Throwable e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  private ScanFrameworkBuilder createBuilder(OptionManager options, PhoenixSubScan subScan) {
-    ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
-    builder.projection(subScan.getColumns());
-    builder.setUserName(subScan.getUserName());
-
-    builder.errorContext(new ChildErrorContext(builder.errorContext()) {
-
-      @Override
-      public void addContext(UserException.Builder builder) {
-        builder.addContext("tableName", subScan.getScanSpec().getTableName());
-      }
-
-    });
-
-    ReaderFactory readerFactory = new PhoenixReaderFactory(subScan);
-    builder.setReaderFactory(readerFactory);
-    builder.nullType(Types.optional(MinorType.VARCHAR));
-
-    return builder;
-  }
-
-  private static class PhoenixReaderFactory implements ReaderFactory {
-
-    private final PhoenixSubScan subScan;
-    private int count;
-
-    public PhoenixReaderFactory(PhoenixSubScan subScan) {
-      this.subScan = subScan;
-    }
-
-    @Override
-    public void bind(ManagedScanFramework framework) {  }
-
-    @Override
-    public ManagedReader<? extends SchemaNegotiator> next() {
-      String tableName = subScan.getScanSpec().getTableName();
-      if (count++ == 0) {
-        if (tableName.startsWith("my")) {
-          return new PhoenixBatchReader(subScan);
-        }
-      }
-      return null;
-    }
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java
deleted file mode 100644
index dfe0a1e..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("phoenix-scan-spec")
-public class PhoenixScanSpec {
-
-  private final String tableName;
-
-  @JsonCreator
-  public PhoenixScanSpec(String tableName) {
-    this.tableName = tableName;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
deleted file mode 100644
index b32ad13..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.exec.planner.logical.DynamicDrillTable;
-import org.apache.drill.exec.store.AbstractSchema;
-import org.apache.drill.exec.store.AbstractSchemaFactory;
-import org.apache.drill.exec.store.SchemaConfig;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-
-public class PhoenixSchemaFactory extends AbstractSchemaFactory {
-
-  public static final String MY_TABLE = "myTable";
-
-  private final PhoenixStoragePlugin plugin;
-
-  public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
-    super(plugin.getName());
-    this.plugin = plugin;
-  }
-
-  @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    PhoenixSchema schema = new PhoenixSchema(plugin);
-    parent.add(getName(), schema);
-  }
-
-  protected static class PhoenixSchema extends AbstractSchema {
-
-    private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
-    private final PhoenixStoragePlugin plugin;
-
-    public PhoenixSchema(PhoenixStoragePlugin plugin) {
-      super(Collections.emptyList(), plugin.getName());
-      this.plugin = plugin;
-    }
-
-    @Override
-    public Table getTable(String name) {
-      DynamicDrillTable table = activeTables.get(name);
-      if (table != null) {
-        return table;
-      }
-      if (MY_TABLE.contentEquals(name)) {
-        return registerTable(name,
-            new DynamicDrillTable(plugin, plugin.getName(),
-                new PhoenixScanSpec(name)));
-      }
-      return null;
-    }
-
-    private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
-      activeTables.put(name, table);
-      return table;
-    }
-
-    @Override
-    public Set<String> getTableNames() {
-      return Sets.newHashSet(MY_TABLE);
-    }
-
-    @Override
-    public String getTypeName() {
-      return PhoenixStoragePluginConfig.NAME;
-    }
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
deleted file mode 100644
index 7a7f58c..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import java.io.IOException;
-
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.drill.common.JSONOptions;
-import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractStoragePlugin;
-import org.apache.drill.exec.store.SchemaConfig;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-
-public class PhoenixStoragePlugin extends AbstractStoragePlugin {
-
-  private final PhoenixStoragePluginConfig config;
-  private final PhoenixSchemaFactory schemaFactory;
-
-  public PhoenixStoragePlugin(PhoenixStoragePluginConfig config, DrillbitContext context, String name) {
-    super(context, name);
-    this.config = config;
-    this.schemaFactory = new PhoenixSchemaFactory(this);
-  }
-
-  @Override
-  public StoragePluginConfig getConfig() {
-    return config;
-  }
-
-  @Override
-  public boolean supportsRead() {
-    return true;
-  }
-
-  @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
-    schemaFactory.registerSchemas(schemaConfig, parent);
-  }
-
-  @Override
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
-      PhoenixScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<PhoenixScanSpec>() {});
-      return new PhoenixGroupScan(scanSpec);
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
deleted file mode 100644
index e9f8a86..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import java.util.Objects;
-import java.util.Properties;
-
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.StoragePluginConfig;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName(PhoenixStoragePluginConfig.NAME)
-public class PhoenixStoragePluginConfig extends StoragePluginConfig {
-
-  public static final String NAME = "phoenix";
-
-  public String driverName = "org.apache.phoenix.queryserver.client.Driver";
-  public String host;
-  public int port = 8765;
-  public String username;
-  public String password;
-  public Properties props;
-
-  @JsonCreator
-  public PhoenixStoragePluginConfig(
-      @JsonProperty("driverName") String driverName,
-      @JsonProperty("host") String host,
-      @JsonProperty("port") int port,
-      @JsonProperty("username") String username,
-      @JsonProperty("password") String password,
-      @JsonProperty("props") Properties props) {
-    this.driverName = driverName;
-    this.host = host;
-    this.port = port;
-    this.username = username;
-    this.password = password;
-    this.props = props;
-  }
-
-  @JsonProperty("driverName")
-  public String getDriverName() {
-    return driverName;
-  }
-
-  @JsonProperty("host")
-  public String getHost() {
-    return host;
-  }
-
-  @JsonProperty("port")
-  public int getPort() {
-    return port;
-  }
-
-  @JsonProperty("username")
-  public String getUsername() {
-    return username;
-  }
-
-  @JsonProperty("password")
-  public String getPassword() {
-    return password;
-  }
-
-  @JsonProperty("props")
-  public Properties getProps() {
-    return props;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (o == null || !(o instanceof PhoenixStoragePluginConfig) ) {
-      return false;
-    }
-    return Objects.equals(this.host, ((PhoenixStoragePluginConfig)o).getHost()) &&
-        Objects.equals(this.port, ((PhoenixStoragePluginConfig)o).getPort());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(host, port);
-  }
-
-  @Override
-  public String toString() {
-    return new PlanStringBuilder(PhoenixStoragePluginConfig.NAME)
-        .field("driverName", driverName)
-        .field("host", host)
-        .field("port", port)
-        .field("username", username)
-        .toString();
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
deleted file mode 100644
index 390c958..0000000
--- a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("phoenix-sub-scan")
-public class PhoenixSubScan extends AbstractBase implements SubScan {
-
-  private final List<SchemaPath> columns;
-  private final PhoenixScanSpec scanSpec;
-
-  public PhoenixSubScan(
-      @JsonProperty("scanSpec") PhoenixScanSpec scanSpec,
-      @JsonProperty("columns") List<SchemaPath> columns) {
-    super("user-if-needed");
-    this.scanSpec = scanSpec;
-    this.columns = columns;
-  }
-
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
-
-  public PhoenixScanSpec getScanSpec() {
-    return scanSpec;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return physicalVisitor.visitSubScan(this, value);
-  }
-
-  @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
-    return new PhoenixSubScan(scanSpec, columns);
-  }
-
-  @Override
-  public String getOperatorType() {
-    return "PHOENIX";
-  }
-
-  @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return ImmutableSet.<PhysicalOperator>of().iterator();
-  }
-
-  @Override
-  public String toString() {
-    return new PlanStringBuilder(this)
-      .field("columns", columns)
-      .field("scanSpec", scanSpec)
-      .toString();
-  }
-}
diff --git a/contrib/storage-phoenix/src/main/resources/drill-module.conf b/contrib/storage-phoenix/src/main/resources/drill-module.conf
deleted file mode 100644
index 3d19b1a..0000000
--- a/contrib/storage-phoenix/src/main/resources/drill-module.conf
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#  This file tells Drill to consider this module when class path scanning.
-#  This file can also include any supplementary configuration information.
-#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
-
-drill: {
-  classpath.scanning: {
-    packages += "org.apache.drill.exec.store.phoenix"
-  }
-}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java
deleted file mode 100644
index 59fd962..0000000
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import static org.junit.Assert.fail;
-
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.test.ClusterFixtureBuilder;
-import org.apache.drill.test.ClusterTest;
-import org.apache.drill.test.QueryRowSetIterator;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestPhoenixPlugin extends ClusterTest {
-
-  private static final Logger logger = LoggerFactory.getLogger(TestPhoenixPlugin.class);
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher);
-    startCluster(builder);
-
-    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
-    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, "gwssi-app0102", 8765, "guest", "gwssi123", null);
-    config.setEnabled(true);
-    registry.put(PhoenixStoragePluginConfig.NAME, config);
-  }
-
-  public void test() {
-    fail("Not yet implemented");
-  }
-
-  public void testSchema() throws Exception {
-    String sql = "select * from phoenix.myTable";
-    queryBuilder().sql(sql).run();
-  }
-
-  @Test
-  public void testScan() throws Exception {
-    String sql = "select * from phoenix.myTable";
-    QueryRowSetIterator iterator = queryBuilder().sql(sql).rowSetIterator();
-    int count = 0;
-    for (RowSet rowset : iterator) {
-      rowset.print();
-      rowset.clear();
-      count++;
-    }
-    logger.info("phoenix fetch batch size : {}", count + 1);
-  }
-}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java
deleted file mode 100644
index ebefa99..0000000
--- a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.drill.exec.store.phoenix;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.rowSet.DirectRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
-import org.apache.drill.exec.physical.rowSet.RowSetWriter;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.test.SubOperatorTest;
-import org.junit.Test;
-
-public class TestRowSet extends SubOperatorTest {
-
-  @Test
-  public void testRowSet() {
-    final TupleMetadata schema = new SchemaBuilder()
-        .add("id", MinorType.INT)
-        .add("name",MinorType.VARCHAR)
-        .buildSchema();
-
-    final RowSet rowSet = new RowSetBuilder(fixture.allocator(), schema)
-        .addRow(1, "luocong")
-        .addRow(2, "sunny")
-        .build();
-
-    rowSet.print();
-    rowSet.clear();
-
-    DirectRowSet directRowSet = DirectRowSet.fromSchema(fixture.allocator(), schema);
-    RowSetWriter writer = directRowSet.writer();
-    writer.scalar("id").setInt(1);
-    writer.scalar("name").setString("luocong");
-    writer.scalar("id").setInt(2);
-    writer.scalar("name").setString("sunny");
-    writer.save();
-    SingleRowSet record = writer.done();
-    record.print();
-    record.clear();
-  }
-}
diff --git a/contrib/udfs/README.md b/contrib/udfs/README.md
index 3ef761c..ae65e1d 100644
--- a/contrib/udfs/README.md
+++ b/contrib/udfs/README.md
@@ -272,3 +272,40 @@
 SELECT parse_user_agent( `user_agent`, 'AgentName` ) as AgentName ...
 ```
 which will just return the requested field. If the user agent string is empty, all fields will have the value of `Hacker`.  
+
+## Map Schema Function
+This function allows you to drill down into the schema of maps. The REST API and JDBC interfaces report only `MAP` or `LIST` as the type of a map column, so it is not possible
+to see the schema of the inner map through them. The function `getMapSchema(<MAP>)` returns a `MAP` of the field names and their data types.
+
+### Example Usage
+
+Given the JSON record shown below, the following query returns its schema:
+```bash
+apache drill> SELECT getMapSchema(record) AS schema FROM dfs.test.`schema_test.json`;
++----------------------------------------------------------------------------------+
+|                                      schema                                      |
++----------------------------------------------------------------------------------+
+| {"int_field":"BIGINT","double_field":"FLOAT8","string_field":"VARCHAR","int_list":"REPEATED_BIGINT","double_list":"REPEATED_FLOAT8","map":"MAP"} |
++----------------------------------------------------------------------------------+
+1 row selected (0.298 seconds)
+```
+
+```json
+{
+  "record" : {
+    "int_field": 1,
+    "double_field": 2.0,
+    "string_field": "My string",
+    "int_list": [1,2,3],
+    "double_list": [1.0,2.0,3.0],
+    "map": {
+      "nested_int_field" : 5,
+      "nested_double_field": 5.0,
+      "nested_string_field": "5.0"
+    }
+  },
+  "single_field": 10
+}
+```
+
+The function returns an empty map if the field passed to it is `null`.
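+
+The function can also be applied to a nested map directly (see `testMapSchemaFunctionWithInnerMap` in the accompanying tests). A minimal sketch, assuming the same `schema_test.json` file as above:
+
+```bash
+apache drill> SELECT getMapSchema(t1.record.map) AS schema FROM dfs.test.`schema_test.json` AS t1;
+```
+
+Per the accompanying test, this yields `BIGINT`, `FLOAT8` and `VARCHAR` for the three nested fields.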
diff --git a/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaFunctions.java b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaFunctions.java
new file mode 100644
index 0000000..50fef82
--- /dev/null
+++ b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaFunctions.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.udfs;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+
+import javax.inject.Inject;
+
+public class ComplexSchemaFunctions {
+
+  /**
+   * This function exists to help the user understand the inner schemata of maps.
+   * It is NOT recursive (yet).
+   */
+  @FunctionTemplate(names = {"get_map_schema", "getMapSchema"},
+    scope = FunctionTemplate.FunctionScope.SIMPLE,
+    nulls = NullHandling.INTERNAL)
+  public static class GetMapSchemaFunction implements DrillSimpleFunc {
+
+    @Param
+    FieldReader reader;
+
+    @Output
+    BaseWriter.ComplexWriter outWriter;
+
+    @Inject
+    DrillBuf outBuffer;
+
+    @Override
+    public void setup() {
+      // Nothing to see here...
+    }
+
+    @Override
+    public void eval() {
+      if (reader.isSet()) {
+        org.apache.drill.exec.udfs.ComplexSchemaUtils.getFields(reader, outWriter, outBuffer);
+      } else {
+        org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter queryMapWriter = outWriter.rootAsMap();
+        // Return empty map
+        queryMapWriter.start();
+        queryMapWriter.end();
+      }
+    }
+  }
+}
diff --git a/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaUtils.java b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaUtils.java
new file mode 100644
index 0000000..7f027aa
--- /dev/null
+++ b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/ComplexSchemaUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.udfs;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+
+import java.util.Iterator;
+
+public class ComplexSchemaUtils {
+
+  public static void getFields(FieldReader reader, BaseWriter.ComplexWriter outWriter, DrillBuf buffer) {
+
+    BaseWriter.MapWriter queryMapWriter = outWriter.rootAsMap();
+
+    if (reader.getType().getMinorType() != MinorType.MAP) {
+      // If the field is not a map, write an empty map and stop
+      queryMapWriter.start();
+      queryMapWriter.end();
+      return;
+    }
+
+    Iterator<String> fieldIterator = reader.iterator();
+    queryMapWriter.start();
+
+    while (fieldIterator.hasNext()) {
+      String fieldName = fieldIterator.next();
+      FieldReader fieldReader = reader.reader(fieldName);
+      String dataType = fieldReader.getType().getMinorType().toString();
+
+      DataMode dataMode = fieldReader.getType().getMode();
+      if (dataMode == DataMode.REPEATED) {
+        dataType = dataMode + "_" + dataType;
+      }
+
+      VarCharHolder rowHolder = new VarCharHolder();
+      byte[] rowStringBytes = dataType.getBytes();
+      buffer.reallocIfNeeded(rowStringBytes.length);
+      buffer.setBytes(0, rowStringBytes);
+
+      rowHolder.start = 0;
+      rowHolder.end = rowStringBytes.length;
+      rowHolder.buffer = buffer;
+
+      queryMapWriter.varChar(fieldName).write(rowHolder);
+    }
+    queryMapWriter.end();
+  }
+}
diff --git a/contrib/udfs/src/test/java/org/apache/drill/exec/udfs/TestComplexSchemaFunctions.java b/contrib/udfs/src/test/java/org/apache/drill/exec/udfs/TestComplexSchemaFunctions.java
new file mode 100644
index 0000000..bf453ef
--- /dev/null
+++ b/contrib/udfs/src/test/java/org/apache/drill/exec/udfs/TestComplexSchemaFunctions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.udfs;
+
+import org.apache.drill.categories.SqlFunctionTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+
+@Category({UnlikelyTest.class, SqlFunctionTest.class})
+public class TestComplexSchemaFunctions extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+  }
+
+  @Test
+  public void testMapSchemaFunction() throws RpcException {
+    String sql = "SELECT getMapSchema(record) AS schema FROM cp.`json/nestedSchema.json`";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+    assertEquals(results.rowCount(), 1);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addMap("schema")
+          .addNullable("int_field", MinorType.VARCHAR)
+          .addNullable("double_field", MinorType.VARCHAR)
+          .addNullable("string_field", MinorType.VARCHAR)
+          .addNullable("boolean_field", MinorType.VARCHAR)
+          .addNullable("int_list", MinorType.VARCHAR)
+          .addNullable("double_list", MinorType.VARCHAR)
+          .addNullable("boolean_list", MinorType.VARCHAR)
+          .addNullable("map", MinorType.VARCHAR)
+          .addNullable("repeated_map", MinorType.VARCHAR)
+        .resumeSchema()
+      .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+      .addRow((Object)strArray("BIGINT", "FLOAT8", "VARCHAR", "BIT", "REPEATED_BIGINT", "REPEATED_FLOAT8", "REPEATED_BIT", "MAP", "REPEATED_MAP"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testMapSchemaFunctionWithInnerMap() throws RpcException {
+    String sql = "SELECT getMapSchema(t1.record.map) AS schema FROM cp.`json/nestedSchema.json` AS t1";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+    assertEquals(results.rowCount(), 1);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .addMap("schema")
+          .addNullable("nested_int_field", MinorType.VARCHAR)
+          .addNullable("nested_double_field", MinorType.VARCHAR)
+          .addNullable("nested_string_field", MinorType.VARCHAR)
+        .resumeSchema()
+      .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+      .addRow((Object)strArray("BIGINT", "FLOAT8", "VARCHAR"))
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testMapSchemaFunctionWithNull() throws RpcException {
+    String sql = "SELECT getMapSchema(null) AS schema FROM cp.`json/nestedSchema.json` AS t1";
+
+    QueryBuilder q = client.queryBuilder().sql(sql);
+    RowSet results = q.rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("schema", MinorType.MAP)
+      .build();
+
+    RowSet expected = client.rowSetBuilder(expectedSchema)
+      .addRow((Object) mapArray())
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+}
diff --git a/contrib/udfs/src/test/resources/json/nestedSchema.json b/contrib/udfs/src/test/resources/json/nestedSchema.json
new file mode 100644
index 0000000..22abcc2
--- /dev/null
+++ b/contrib/udfs/src/test/resources/json/nestedSchema.json
@@ -0,0 +1,21 @@
+{
+  "record" : {
+    "int_field": 1,
+    "double_field": 2.0,
+    "string_field": "My string",
+    "boolean_field": true,
+    "int_list": [1,2,3],
+    "double_list": [1.0,2.0,3.0],
+    "boolean_list": [true, false, true],
+    "map": {
+      "nested_int_field" : 5,
+      "nested_double_field": 5.0,
+      "nested_string_field": "5.0"
+    },
+    "repeated_map": [
+      { "a" : 1 },
+      { "b" : "abc" }
+    ]
+  },
+  "single_field": 10
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index a5330dd..c36ba67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -777,7 +777,7 @@
     }
 
     public B limit(int maxRecords) {
-      source.maxRecords = maxRecords;
+      this.maxRecords = maxRecords;
       return self();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
index e046563..9e2c0e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
@@ -111,10 +111,10 @@
 
       final RelNode newLimit;
       if (projectRel != null) {
-        final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of((RelNode) newScanRel));
+        final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of(newScanRel));
         newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of(newProject));
       } else {
-        newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of((RelNode) newScanRel));
+        newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of(newScanRel));
       }
 
       call.transformTo(newLimit);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 85709b0..0a8e66a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -246,8 +246,9 @@
       return message("Success");
     } catch (PluginEncodingException e) {
       logger.warn("Error in JSON mapping: {}", storagePluginConfig, e);
-      return message("Invalid JSON");
+      return message("Invalid JSON: " + e.getMessage());
     } catch (PluginException e) {
+      logger.error("Error while saving plugin", e);
       return message(e.getMessage());
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index 390a32c..38a489e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -49,6 +49,8 @@
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
 
 /**
  * Plugin registry. Caches plugin instances which correspond to configurations
@@ -510,6 +512,8 @@
       return context.mapper().reader()
           .forType(StoragePluginConfig.class)
           .readValue(json);
+    } catch (InvalidTypeIdException | UnrecognizedPropertyException e) {
+      throw new PluginEncodingException(e.getMessage(), e);
     } catch (IOException e) {
       throw new PluginEncodingException("Failure when decoding plugin JSON", e);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 928ebac..580cc6c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.collections.MapUtils;
 import org.apache.drill.common.PlanStringBuilder;
@@ -42,6 +43,7 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.metastore.store.FileTableMetadataProviderBuilder;
+import org.apache.drill.metastore.metadata.FileMetadata;
 import org.apache.drill.metastore.metadata.LocationProvider;
 import org.apache.drill.metastore.metadata.TableMetadataProvider;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -87,7 +89,6 @@
   private List<CompleteFileWork> chunks;
   private List<EndpointAffinity> endpointAffinities;
   private final Path selectionRoot;
-  private final int maxRecords;
 
   @JsonCreator
   public EasyGroupScan(
@@ -177,7 +178,6 @@
     mappings = that.mappings;
     partitionDepth = that.partitionDepth;
     metadataProvider = that.metadataProvider;
-    maxRecords = getMaxRecords();
   }
 
   @JsonIgnore
@@ -407,9 +407,13 @@
       newScan.files = files;
       newScan.matchAllMetadata = matchAllMetadata;
       newScan.nonInterestingColumnsMetadata = nonInterestingColumnsMetadata;
+      newScan.maxRecords = maxRecords;
 
-      newScan.fileSet = newScan.getFilesMetadata().keySet();
-      newScan.selection = FileSelection.create(null, new ArrayList<>(newScan.fileSet), newScan.selectionRoot);
+      Map<Path, FileMetadata> filesMetadata = newScan.getFilesMetadata();
+      if (MapUtils.isNotEmpty(filesMetadata)) {
+        newScan.fileSet = filesMetadata.keySet();
+        newScan.selection = FileSelection.create(null, new ArrayList<>(newScan.fileSet), newScan.selectionRoot);
+      }
       try {
         newScan.initFromSelection(newScan.selection, newScan.formatPlugin);
       } catch (IOException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index c13846d..2923bd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -538,6 +538,7 @@
       newScan.rowGroups = rowGroups;
       newScan.matchAllMetadata = matchAllMetadata;
       newScan.nonInterestingColumnsMetadata = nonInterestingColumnsMetadata;
+      newScan.maxRecords = maxRecords;
       // since builder is used when pruning happens, entries and fileSet should be expanded
       if (!newScan.getFilesMetadata().isEmpty()) {
         newScan.entries = newScan.getFilesMetadata().keySet().stream()
diff --git a/exec/java-exec/src/test/resources/logback-test.xml b/exec/java-exec/src/test/resources/logback-test.xml
deleted file mode 100644
index bd748ee..0000000
--- a/exec/java-exec/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<configuration>
-
-  <!-- Uncomment the lines below (and <appender-ref ref="SOCKET"/> inside <root>)
-       to be able to use Lilith for viewing log events -->
-  <!--
-  <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
-    <Compressing>true</Compressing>
-    <ReconnectionDelay>10000</ReconnectionDelay>
-    <IncludeCallerData>true</IncludeCallerData>
-    <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
-  </appender>
-
-  <logger name="org.apache.drill" additivity="false">
-    <level value="debug"/>
-    <appender-ref ref="SOCKET"/>
-  </logger>
-
-  <logger name="query.logger" additivity="false">
-    <level value="info"/>
-    <appender-ref ref="SOCKET"/>
-  </logger>
-  -->
-  
-  <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
-
-  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-    <!-- encoders are assigned the type
-         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
-    <encoder>
-      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
-    </encoder>
-  </appender>
-
-  <root>
-    <level value="INFO"/>
-    <!-- Uncomment the next line (and the lines above) to be able to use Lilith for viewing log events -->
-    <!-- <appender-ref ref="SOCKET"/>-->
-    <appender-ref ref="STDOUT"/>
-  </root>
-
-</configuration>