Merge branch 'master' of https://github.com/apache/drill into DRILL-7863
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 3876e20..5b7ebbc 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -58,6 +58,7 @@
     <module>storage-jdbc</module>
     <module>storage-kafka</module>
     <module>storage-kudu</module>
+    <module>storage-phoenix</module>
     <module>storage-opentsdb</module>
     <module>storage-splunk</module>
     <module>storage-http</module>
diff --git a/contrib/storage-phoenix/pom.xml b/contrib/storage-phoenix/pom.xml
new file mode 100644
index 0000000..2244bfc
--- /dev/null
+++ b/contrib/storage-phoenix/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.drill.contrib</groupId>
+    <artifactId>drill-contrib-parent</artifactId>
+    <version>1.19.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>drill-storage-phoenix</artifactId>
+  <name>Drill : Contrib : Storage : Phoenix</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
new file mode 100644
index 0000000..26870b5
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
@@ -0,0 +1,144 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
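+/**
+ * Prototype batch reader for the Phoenix plugin. It does not talk to Phoenix
+ * yet: it emits three batches of mock VARCHAR rows through the managed scan
+ * framework so the plugin skeleton can be exercised end to end.
+ */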
+public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class);
+
+  private final PhoenixSubScan subScan;
+  private PhoenixResultSet resultSet;
+
+  // Sample text used to pad the mock rows generated by this prototype reader.
+  private static final String SAMPLE_VALUE =
+      "Currently, the Apache Drill build process is known to work on Linux, Windows and OSX.";
+
+  private ColumnDefn[] columns;
+  private int count = 1;
+
+  public PhoenixBatchReader(PhoenixSubScan subScan) {
+    this.subScan = subScan;
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), true);
+    resultSet = new PhoenixResultSet(negotiator.build());
+    bindColumns(resultSet.getWriter());
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    if (count > 3) {
+      return false;
+    }
+    // Fill the current batch with mock rows until the writer reports it is full.
+    while (!resultSet.getWriter().isFull()) {
+      resultSet.getWriter().start();
+      for (ColumnDefn column : columns) {
+        column.load(count + "\t" + SAMPLE_VALUE);
+      }
+      resultSet.getWriter().save();
+    }
+    count++;
+    return true;
+  }
+
+  @Override
+  public void close() {
+    int batches = resultSet.getWriter().loader().batchCount();
+    logger.info("phoenix fetched {} batches", batches);
+  }
+
+  private TupleMetadata defineMetadata() {
+    List<String> cols = Arrays.asList("a", "b", "c");
+    columns = new ColumnDefn[cols.size()];
+    SchemaBuilder builder = new SchemaBuilder();
+    for (int i = 0; i < cols.size(); i++) {
+      columns[i] = makeColumn(cols.get(i), i);
+      columns[i].define(builder);
+    }
+    return builder.buildSchema();
+  }
+
+  private ColumnDefn makeColumn(String name, int index) {
+    return new VarCharDefn(name, index);
+  }
+
+  private void bindColumns(RowSetLoader loader) {
+    for (int i = 0; i < columns.length; i++) {
+      columns[i].bind(loader);
+    }
+  }
+
+  public abstract static class ColumnDefn {
+
+    final String name;
+    final int index;
+    ScalarWriter writer;
+
+    public ColumnDefn(String name, int index) {
+      this.name = name;
+      this.index = index;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public int getIndex() {
+      return index;
+    }
+
+    public void bind(RowSetLoader loader) {
+      writer = loader.scalar(getName());
+    }
+
+    public abstract void define(SchemaBuilder builder);
+
+    public abstract void load(String value);
+
+    public abstract void load(byte[] value);
+  }
+
+  public static class VarCharDefn extends ColumnDefn {
+
+    public VarCharDefn(String name, int index) {
+      super(name, index);
+    }
+
+    @Override
+    public void define(SchemaBuilder builder) {
+      builder.addNullable(getName(), MinorType.VARCHAR);
+    }
+
+    @Override
+    public void load(String value) {
+      writer.setString(value);
+    }
+
+    @Override
+    public void load(byte[] value) {
+      writer.setBytes(value, value.length);
+    }
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
new file mode 100644
index 0000000..21b5b01
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
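+/**
+ * Logical description of a Phoenix scan. The prototype runs in a single
+ * minor fragment, so the parallelization width is fixed at 1 and the scan
+ * stats use a placeholder row count.
+ */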
+@JsonTypeName("phoenix-scan")
+public class PhoenixGroupScan extends AbstractGroupScan {
+
+  private final List<SchemaPath> columns;
+  private final PhoenixScanSpec scanSpec;
+  private final ScanStats scanStats;
+
+  private int hashCode;
+
+  public PhoenixGroupScan(PhoenixScanSpec scanSpec) {
+    super("no-user");
+    this.scanSpec = scanSpec;
+    this.columns = ALL_COLUMNS;
+    this.scanStats = computeScanStats();
+  }
+
+  public PhoenixGroupScan(PhoenixGroupScan groupScan) {
+    super(groupScan);
+    this.scanSpec = groupScan.scanSpec;
+    this.columns = groupScan.columns;
+    this.scanStats = groupScan.scanStats;
+  }
+
+  public PhoenixGroupScan(PhoenixGroupScan groupScan, List<SchemaPath> columns) {
+    super(groupScan);
+    this.scanSpec = groupScan.scanSpec;
+    this.columns = columns;
+    this.scanStats = computeScanStats();
+  }
+
+  @JsonCreator
+  public PhoenixGroupScan(
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("scanSpec") PhoenixScanSpec scanSpec) {
+    super("no-user");
+    this.columns = columns;
+    this.scanSpec = scanSpec;
+    this.scanStats = computeScanStats();
+  }
+
+  @Override
+  @JsonProperty("columns")
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty("scanSpec")
+  public PhoenixScanSpec getScanSpec() {
+    return scanSpec;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+    return new PhoenixSubScan(scanSpec, columns);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    return new PhoenixGroupScan(columns, scanSpec);
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return new PhoenixGroupScan(this, columns);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    return scanStats;
+  }
+
+  @Override
+  public int hashCode() {
+    // Cache the hash; the scan is immutable once constructed.
+    if (hashCode == 0) {
+      hashCode = Objects.hash(scanSpec, columns);
+    }
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PhoenixGroupScan groupScan = (PhoenixGroupScan) obj;
+    return Objects.equals(scanSpec, groupScan.scanSpec)
+        && Objects.equals(columns, groupScan.columns);
+  }
+
+  private ScanStats computeScanStats() {
+    int estRowCount = 10_000;
+    double cpuRatio = 1.0;
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuRatio, 0);
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java
new file mode 100644
index 0000000..51a0625
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java
@@ -0,0 +1,19 @@
+package org.apache.drill.exec.store.phoenix;
+
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
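+/**
+ * Thin wrapper pairing a {@link ResultSetLoader} with its row writer; it
+ * stands in for a real Phoenix JDBC result set in this prototype.
+ */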
+public class PhoenixResultSet {
+
+  private final ResultSetLoader loader;
+  private final RowSetLoader writer;
+
+  public PhoenixResultSet(ResultSetLoader loader) {
+    this.loader = loader;
+    this.writer = loader.writer();
+  }
+
+  public RowSetLoader getWriter() {
+    return writer;
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java
new file mode 100644
index 0000000..cd51365
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java
@@ -0,0 +1,79 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+
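+/**
+ * Builds the scan operator for a Phoenix sub-scan on top of Drill's managed
+ * scan framework. The reader factory hands out a single
+ * {@link PhoenixBatchReader} for table names starting with "my".
+ */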
+public class PhoenixScanBatchCreator implements BatchCreator<PhoenixSubScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, PhoenixSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    try {
+      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      throw e;
+    } catch (Throwable e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private ScanFrameworkBuilder createBuilder(OptionManager options, PhoenixSubScan subScan) {
+    ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+    builder.projection(subScan.getColumns());
+    builder.setUserName(subScan.getUserName());
+
+    builder.errorContext(new ChildErrorContext(builder.errorContext()) {
+
+      @Override
+      public void addContext(UserException.Builder builder) {
+        builder.addContext("tableName", subScan.getScanSpec().getTableName());
+      }
+
+    });
+
+    ReaderFactory readerFactory = new PhoenixReaderFactory(subScan);
+    builder.setReaderFactory(readerFactory);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+
+    return builder;
+  }
+
+  private static class PhoenixReaderFactory implements ReaderFactory {
+
+    private final PhoenixSubScan subScan;
+    private int count;
+
+    public PhoenixReaderFactory(PhoenixSubScan subScan) {
+      this.subScan = subScan;
+    }
+
+    @Override
+    public void bind(ManagedScanFramework framework) {  }
+
+    @Override
+    public ManagedReader<? extends SchemaNegotiator> next() {
+      String tableName = subScan.getScanSpec().getTableName();
+      // The framework calls next() until it returns null; serve one reader
+      // for the prototype's single mock table.
+      if (count++ == 0 && tableName.startsWith("my")) {
+        return new PhoenixBatchReader(subScan);
+      }
+      return null;
+    }
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java
new file mode 100644
index 0000000..dfe0a1e
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java
@@ -0,0 +1,19 @@
+package org.apache.drill.exec.store.phoenix;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
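+/**
+ * Scan specification for the Phoenix plugin; for now it carries only the
+ * table name.
+ */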
+@JsonTypeName("phoenix-scan-spec")
+public class PhoenixScanSpec {
+
+  private final String tableName;
+
+  @JsonCreator
+  public PhoenixScanSpec(@JsonProperty("tableName") String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
new file mode 100644
index 0000000..b32ad13
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
@@ -0,0 +1,73 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
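+/**
+ * Registers the Phoenix schema tree. The prototype exposes a single
+ * hard-coded table, {@code myTable}.
+ */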
+public class PhoenixSchemaFactory extends AbstractSchemaFactory {
+
+  public static final String MY_TABLE = "myTable";
+
+  private final PhoenixStoragePlugin plugin;
+
+  public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
+    super(plugin.getName());
+    this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    PhoenixSchema schema = new PhoenixSchema(plugin);
+    parent.add(getName(), schema);
+  }
+
+  protected static class PhoenixSchema extends AbstractSchema {
+
+    private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
+    private final PhoenixStoragePlugin plugin;
+
+    public PhoenixSchema(PhoenixStoragePlugin plugin) {
+      super(Collections.emptyList(), plugin.getName());
+      this.plugin = plugin;
+    }
+
+    @Override
+    public Table getTable(String name) {
+      DynamicDrillTable table = activeTables.get(name);
+      if (table != null) {
+        return table;
+      }
+      // Match case-insensitively, consistent with the CaseInsensitiveMap above.
+      if (MY_TABLE.equalsIgnoreCase(name)) {
+        return registerTable(name,
+            new DynamicDrillTable(plugin, plugin.getName(),
+                new PhoenixScanSpec(name)));
+      }
+      return null;
+    }
+
+    private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+      activeTables.put(name, table);
+      return table;
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return Sets.newHashSet(MY_TABLE);
+    }
+
+    @Override
+    public String getTypeName() {
+      return PhoenixStoragePluginConfig.NAME;
+    }
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
new file mode 100644
index 0000000..7a7f58c
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
@@ -0,0 +1,46 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.io.IOException;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
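+/**
+ * Storage plugin entry point for Phoenix. The plugin is read-only in this
+ * prototype; schema registration is delegated to {@link PhoenixSchemaFactory}.
+ */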
+public class PhoenixStoragePlugin extends AbstractStoragePlugin {
+
+  private final PhoenixStoragePluginConfig config;
+  private final PhoenixSchemaFactory schemaFactory;
+
+  public PhoenixStoragePlugin(PhoenixStoragePluginConfig config, DrillbitContext context, String name) {
+    super(context, name);
+    this.config = config;
+    this.schemaFactory = new PhoenixSchemaFactory(this);
+  }
+
+  @Override
+  public StoragePluginConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    PhoenixScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(),
+        new TypeReference<PhoenixScanSpec>() {});
+    return new PhoenixGroupScan(scanSpec);
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
new file mode 100644
index 0000000..e9f8a86
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
@@ -0,0 +1,97 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.Objects;
+import java.util.Properties;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
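+/**
+ * Configuration for the Phoenix plugin, aimed at the Phoenix Query Server
+ * thin client: {@code host} and {@code port} identify the PQS endpoint.
+ * Equality is defined by host and port only.
+ */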
+@JsonTypeName(PhoenixStoragePluginConfig.NAME)
+public class PhoenixStoragePluginConfig extends StoragePluginConfig {
+
+  public static final String NAME = "phoenix";
+
+  public static final String THIN_DRIVER_CLASS = "org.apache.phoenix.queryserver.client.Driver";
+  public static final int DEFAULT_PORT = 8765;
+
+  private final String driverName;
+  private final String host;
+  private final int port;
+  private final String username;
+  private final String password;
+  private final Properties props;
+
+  @JsonCreator
+  public PhoenixStoragePluginConfig(
+      @JsonProperty("driverName") String driverName,
+      @JsonProperty("host") String host,
+      @JsonProperty("port") int port,
+      @JsonProperty("username") String username,
+      @JsonProperty("password") String password,
+      @JsonProperty("props") Properties props) {
+    // Apply defaults here: field initializers would be overwritten by the
+    // constructor, so omitted JSON values must be handled explicitly.
+    this.driverName = driverName == null ? THIN_DRIVER_CLASS : driverName;
+    this.host = host;
+    this.port = port == 0 ? DEFAULT_PORT : port;
+    this.username = username;
+    this.password = password;
+    this.props = props;
+  }
+
+  @JsonProperty("driverName")
+  public String getDriverName() {
+    return driverName;
+  }
+
+  @JsonProperty("host")
+  public String getHost() {
+    return host;
+  }
+
+  @JsonProperty("port")
+  public int getPort() {
+    return port;
+  }
+
+  @JsonProperty("username")
+  public String getUsername() {
+    return username;
+  }
+
+  @JsonProperty("password")
+  public String getPassword() {
+    return password;
+  }
+
+  @JsonProperty("props")
+  public Properties getProps() {
+    return props;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof PhoenixStoragePluginConfig)) {
+      return false;
+    }
+    // Identity of a Phoenix connection is determined by host and port.
+    PhoenixStoragePluginConfig config = (PhoenixStoragePluginConfig) o;
+    return Objects.equals(host, config.host) && port == config.port;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(host, port);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(PhoenixStoragePluginConfig.NAME)
+        .field("driverName", driverName)
+        .field("host", host)
+        .field("port", port)
+        .field("username", username)
+        .toString();
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
new file mode 100644
index 0000000..390c958
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
@@ -0,0 +1,67 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
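+/**
+ * Per-minor-fragment slice of a Phoenix group scan. It is a leaf operator,
+ * so its child iterator is always empty.
+ */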
+@JsonTypeName("phoenix-sub-scan")
+public class PhoenixSubScan extends AbstractBase implements SubScan {
+
+  private final List<SchemaPath> columns;
+  private final PhoenixScanSpec scanSpec;
+
+  @JsonCreator
+  public PhoenixSubScan(
+      @JsonProperty("scanSpec") PhoenixScanSpec scanSpec,
+      @JsonProperty("columns") List<SchemaPath> columns) {
+    super("user-if-needed");
+    this.scanSpec = scanSpec;
+    this.columns = columns;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  public PhoenixScanSpec getScanSpec() {
+    return scanSpec;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    return new PhoenixSubScan(scanSpec, columns);
+  }
+
+  @Override
+  public String getOperatorType() {
+    return "PHOENIX";
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return ImmutableSet.<PhysicalOperator>of().iterator();
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("columns", columns)
+      .field("scanSpec", scanSpec)
+      .toString();
+  }
+}
diff --git a/contrib/storage-phoenix/src/main/resources/drill-module.conf b/contrib/storage-phoenix/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..3d19b1a
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/resources/drill-module.conf
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when class path scanning.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill: {
+  classpath.scanning: {
+    packages += "org.apache.drill.exec.store.phoenix"
+  }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java
new file mode 100644
index 0000000..59fd962
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java
@@ -0,0 +1,51 @@
+package org.apache.drill.exec.store.phoenix;
+
+import static org.junit.Assert.fail;
+
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryRowSetIterator;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
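+/**
+ * Smoke tests for the Phoenix plugin prototype. Setup registers a plugin
+ * config pointing at a Phoenix Query Server endpoint, but the scan itself
+ * currently returns mock rows from {@link PhoenixBatchReader}.
+ */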
+public class TestPhoenixPlugin extends ClusterTest {
+
+  private static final Logger logger = LoggerFactory.getLogger(TestPhoenixPlugin.class);
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher);
+    startCluster(builder);
+
+    StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+    PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, "gwssi-app0102", 8765, "guest", "gwssi123", null);
+    config.setEnabled(true);
+    registry.put(PhoenixStoragePluginConfig.NAME, config);
+  }
+
+  @Test
+  public void testSchema() throws Exception {
+    String sql = "select * from phoenix.myTable";
+    queryBuilder().sql(sql).run();
+  }
+
+  @Test
+  public void testScan() throws Exception {
+    String sql = "select * from phoenix.myTable";
+    QueryRowSetIterator iterator = queryBuilder().sql(sql).rowSetIterator();
+    int count = 0;
+    for (RowSet rowset : iterator) {
+      rowset.print();
+      rowset.clear();
+      count++;
+    }
+    logger.info("phoenix fetched {} batches", count);
+  }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java
new file mode 100644
index 0000000..ebefa99
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java
@@ -0,0 +1,42 @@
+package org.apache.drill.exec.store.phoenix;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.physical.rowSet.RowSetWriter;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
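+/**
+ * Sanity checks of Drill's RowSet tooling, independent of Phoenix: one row
+ * set built with {@link RowSetBuilder} and one written row by row through a
+ * {@link RowSetWriter}.
+ */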
+public class TestRowSet extends SubOperatorTest {
+
+  @Test
+  public void testRowSet() {
+    final TupleMetadata schema = new SchemaBuilder()
+        .add("id", MinorType.INT)
+        .add("name",MinorType.VARCHAR)
+        .buildSchema();
+
+    final RowSet rowSet = new RowSetBuilder(fixture.allocator(), schema)
+        .addRow(1, "luocong")
+        .addRow(2, "sunny")
+        .build();
+
+    rowSet.print();
+    rowSet.clear();
+
+    DirectRowSet directRowSet = DirectRowSet.fromSchema(fixture.allocator(), schema);
+    RowSetWriter writer = directRowSet.writer();
+    writer.scalar("id").setInt(1);
+    writer.scalar("name").setString("luocong");
+    writer.scalar("id").setInt(2);
+    writer.scalar("name").setString("sunny");
+    writer.save();
+    SingleRowSet record = writer.done();
+    record.print();
+    record.clear();
+  }
+}
diff --git a/exec/java-exec/src/test/resources/logback-test.xml b/exec/java-exec/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..bd748ee
--- /dev/null
+++ b/exec/java-exec/src/test/resources/logback-test.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<configuration>
+
+  <!-- Uncomment the lines below (and <appender-ref ref="SOCKET"/> inside <root>)
+       to be able to use Lilith for viewing log events -->
+  <!--
+  <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+    <Compressing>true</Compressing>
+    <ReconnectionDelay>10000</ReconnectionDelay>
+    <IncludeCallerData>true</IncludeCallerData>
+    <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
+  </appender>
+
+  <logger name="org.apache.drill" additivity="false">
+    <level value="debug"/>
+    <appender-ref ref="SOCKET"/>
+  </logger>
+
+  <logger name="query.logger" additivity="false">
+    <level value="info"/>
+    <appender-ref ref="SOCKET"/>
+  </logger>
+  -->
+
+  <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root>
+    <level value="INFO"/>
+    <!-- Uncomment the next line (and the lines above) to be able to use Lilith for viewing log events -->
+    <!-- <appender-ref ref="SOCKET"/>-->
+    <appender-ref ref="STDOUT"/>
+  </root>
+
+</configuration>