Merge branch 'master' of https://github.com/apache/drill into DRILL-7863
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 3876e20..5b7ebbc 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -58,6 +58,7 @@
<module>storage-jdbc</module>
<module>storage-kafka</module>
<module>storage-kudu</module>
+ <module>storage-phoenix</module>
<module>storage-opentsdb</module>
<module>storage-splunk</module>
<module>storage-http</module>
diff --git a/contrib/storage-phoenix/pom.xml b/contrib/storage-phoenix/pom.xml
new file mode 100644
index 0000000..2244bfc
--- /dev/null
+++ b/contrib/storage-phoenix/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-contrib-parent</artifactId>
+ <version>1.19.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>drill-storage-phoenix</artifactId>
+ <name>Drill : Contrib : Storage : Phoenix</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
new file mode 100644
index 0000000..26870b5
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java
@@ -0,0 +1,144 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> {
+
+ private static final Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class);
+
+ private final PhoenixSubScan subScan;
+ private PhoenixResultSet resultSet;
+
+ private ColumnDefn[] columns;
+ private int count = 1;
+
+ public PhoenixBatchReader(PhoenixSubScan subScan) {
+ this.subScan = subScan;
+ }
+
+ @Override
+ public boolean open(SchemaNegotiator negotiator) {
+ negotiator.tableSchema(defineMetadata(), true);
+// negotiator.batchSize(3);
+ resultSet = new PhoenixResultSet(negotiator.build());
+ bindColumns(resultSet.getWriter());
+ return true;
+ }
+
+ String abc = "Currently, the Apache Drill build process is known to work on Linux, Windows and OSX.";
+
+ @Override
+ public boolean next() {
+ if (count > 3) {
+ return false;
+ }
+// byte[] value = new byte[512];
+// Arrays.fill(value, (byte) String.valueOf(count).charAt(0));
+ while(!resultSet.getWriter().isFull()) {
+ resultSet.getWriter().start();
+ for (int i = 0; i < columns.length; i++) {
+ columns[i].load(count + "\t" + abc);
+ }
+ resultSet.getWriter().save();
+ }
+ count++;
+ return true;
+ }
+
+ @Override
+ public void close() {
+ int count = resultSet.getWriter().loader().batchCount();
+ logger.info("phoenix fetch batch size: {}", count);
+ }
+
+ private TupleMetadata defineMetadata() {
+ List<String> cols = new ArrayList<String>(Arrays.asList("a", "b", "c"));
+ columns = new ColumnDefn[cols.size()];
+ SchemaBuilder builder = new SchemaBuilder();
+ for (int i = 0; i < cols.size(); i++) {
+ columns[i] = makeColumn(cols.get(i), i);
+ columns[i].define(builder);
+ }
+ return builder.buildSchema();
+ }
+
+ private ColumnDefn makeColumn(String name, int index) {
+ return new VarCharDefn(name, index);
+ }
+
+ private void bindColumns(RowSetLoader loader) {
+ for (int i = 0; i < columns.length; i++) {
+ columns[i].bind(loader);
+ }
+ }
+
+ public abstract static class ColumnDefn {
+
+ final String name;
+ int index;
+ ScalarWriter writer;
+
+ public String getName() {
+ return name;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public ColumnDefn(String name, int index) {
+ this.name = name;
+ this.index = index;
+ }
+
+ public void bind(RowSetLoader loader) {
+ writer = loader.scalar(getName());
+ }
+
+ public abstract void define(SchemaBuilder builder);
+
+ public abstract void load(String value);
+
+ public abstract void load(byte[] value);
+
+ public abstract void load(int index, String value);
+ }
+
+ public static class VarCharDefn extends ColumnDefn {
+
+ public VarCharDefn(String name, int index) {
+ super(name, index);
+ }
+
+ @Override
+ public void define(SchemaBuilder builder) {
+ builder.addNullable(getName(), MinorType.VARCHAR);
+ }
+
+ @Override
+ public void load(String value) {
+ writer.setString(value);
+ }
+
+ @Override
+ public void load(byte[] value) {
+ writer.setBytes(value, value.length);
+ }
+
+ @Deprecated
+ @Override
+ public void load(int index, String value) { }
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
new file mode 100644
index 0000000..21b5b01
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("phoenix-scan")
+public class PhoenixGroupScan extends AbstractGroupScan {
+
+ private final List<SchemaPath> columns;
+ private final PhoenixScanSpec scanSpec;
+ private final ScanStats scanStats;
+
+ private int hashCode;
+
+ public PhoenixGroupScan(PhoenixScanSpec scanSpec) {
+ super("no-user");
+ this.scanSpec = scanSpec;
+ this.columns = ALL_COLUMNS;
+ this.scanStats = computeScanStats();
+ }
+
+ public PhoenixGroupScan(PhoenixGroupScan groupScan) {
+ super(groupScan);
+ this.scanSpec = groupScan.scanSpec;
+ this.columns = groupScan.columns;
+ this.scanStats = groupScan.scanStats;
+ }
+
+ public PhoenixGroupScan(PhoenixGroupScan groupScan, List<SchemaPath> columns) {
+ super(groupScan);
+ this.scanSpec = groupScan.scanSpec;
+ this.columns = columns;
+ this.scanStats = computeScanStats();
+ }
+
+ @JsonCreator
+ public PhoenixGroupScan(
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("scanSpec") PhoenixScanSpec scanSpec) {
+ super("no-user");
+ this.columns = columns;
+ this.scanSpec = scanSpec;
+ this.scanStats = computeScanStats();
+ }
+
+ @Override
+ @JsonProperty("columns")
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ @JsonProperty("scanSpec")
+ public PhoenixScanSpec getScanSpec() {
+ return scanSpec;
+ }
+
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException { }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+ return new PhoenixSubScan(scanSpec, columns);
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return 1;
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ return new PhoenixGroupScan(columns, scanSpec);
+ }
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ return new PhoenixGroupScan(this, columns);
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ return scanStats;
+ }
+
+ @Override
+ public int hashCode() {
+ if(hashCode == 0) {
+ hashCode = Objects.hash(scanSpec, columns);
+ }
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(this == obj) {
+ return true;
+ }
+ if(obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ PhoenixGroupScan groupScan = (PhoenixGroupScan) obj;
+ return Objects.equals(scanSpec, groupScan.getScanSpec()) && Objects.equals(columns, groupScan.getColumns());
+ }
+
+ private ScanStats computeScanStats() {
+ int estRowCount = 10_000;
+ double cpuRatio = 1.0;
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, cpuRatio, 0);
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java
new file mode 100644
index 0000000..51a0625
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixResultSet.java
@@ -0,0 +1,19 @@
+package org.apache.drill.exec.store.phoenix;
+
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+
+public class PhoenixResultSet {
+
+ private ResultSetLoader loader;
+ private RowSetLoader writer;
+
+ public PhoenixResultSet(ResultSetLoader loader) {
+ this.loader = loader;
+ this.writer = loader.writer();
+ }
+
+ public RowSetLoader getWriter() {
+ return writer;
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java
new file mode 100644
index 0000000..cd51365
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java
@@ -0,0 +1,79 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+
+public class PhoenixScanBatchCreator implements BatchCreator<PhoenixSubScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, PhoenixSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+ try {
+ ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+ return builder.buildScanOperator(context, subScan);
+ } catch (UserException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ private ScanFrameworkBuilder createBuilder(OptionManager options, PhoenixSubScan subScan) {
+ ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+ builder.projection(subScan.getColumns());
+ builder.setUserName(subScan.getUserName());
+
+ builder.errorContext(new ChildErrorContext(builder.errorContext()) {
+
+ @Override
+ public void addContext(UserException.Builder builder) {
+ builder.addContext("tableName", subScan.getScanSpec().getTableName());
+ }
+
+ });
+
+ ReaderFactory readerFactory = new PhoenixReaderFactory(subScan);
+ builder.setReaderFactory(readerFactory);
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+
+ return builder;
+ }
+
+ private static class PhoenixReaderFactory implements ReaderFactory {
+
+ private final PhoenixSubScan subScan;
+ private int count;
+
+ public PhoenixReaderFactory(PhoenixSubScan subScan) {
+ this.subScan = subScan;
+ }
+
+ @Override
+ public void bind(ManagedScanFramework framework) { }
+
+ @Override
+ public ManagedReader<? extends SchemaNegotiator> next() {
+ String tableName = subScan.getScanSpec().getTableName();
+ if (count++ == 0) {
+ if (tableName.startsWith("my")) {
+ return new PhoenixBatchReader(subScan);
+ }
+ }
+ return null;
+ }
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java
new file mode 100644
index 0000000..dfe0a1e
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanSpec.java
@@ -0,0 +1,19 @@
+package org.apache.drill.exec.store.phoenix;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("phoenix-scan-spec")
+public class PhoenixScanSpec {
+
+ private final String tableName;
+
+ @JsonCreator
+ public PhoenixScanSpec(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
new file mode 100644
index 0000000..b32ad13
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSchemaFactory.java
@@ -0,0 +1,73 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
+public class PhoenixSchemaFactory extends AbstractSchemaFactory {
+
+ public static final String MY_TABLE = "myTable";
+
+ private final PhoenixStoragePlugin plugin;
+
+ public PhoenixSchemaFactory(PhoenixStoragePlugin plugin) {
+ super(plugin.getName());
+ this.plugin = plugin;
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ PhoenixSchema schema = new PhoenixSchema(plugin);
+ parent.add(getName(), schema);
+ }
+
+ protected static class PhoenixSchema extends AbstractSchema {
+
+ private final Map<String, DynamicDrillTable> activeTables = CaseInsensitiveMap.newHashMap();
+ private final PhoenixStoragePlugin plugin;
+
+ public PhoenixSchema(PhoenixStoragePlugin plugin) {
+ super(Collections.emptyList(), plugin.getName());
+ this.plugin = plugin;
+ }
+
+ @Override
+ public Table getTable(String name) {
+ DynamicDrillTable table = activeTables.get(name);
+ if (table != null) {
+ return table;
+ }
+ if (MY_TABLE.contentEquals(name)) {
+ return registerTable(name,
+ new DynamicDrillTable(plugin, plugin.getName(),
+ new PhoenixScanSpec(name)));
+ }
+ return null;
+ }
+
+ private DynamicDrillTable registerTable(String name, DynamicDrillTable table) {
+ activeTables.put(name, table);
+ return table;
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return Sets.newHashSet(MY_TABLE);
+ }
+
+ @Override
+ public String getTypeName() {
+ return PhoenixStoragePluginConfig.NAME;
+ }
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
new file mode 100644
index 0000000..7a7f58c
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java
@@ -0,0 +1,46 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.io.IOException;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+public class PhoenixStoragePlugin extends AbstractStoragePlugin {
+
+ private final PhoenixStoragePluginConfig config;
+ private final PhoenixSchemaFactory schemaFactory;
+
+ public PhoenixStoragePlugin(PhoenixStoragePluginConfig config, DrillbitContext context, String name) {
+ super(context, name);
+ this.config = config;
+ this.schemaFactory = new PhoenixSchemaFactory(this);
+ }
+
+ @Override
+ public StoragePluginConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ schemaFactory.registerSchemas(schemaConfig, parent);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ PhoenixScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<PhoenixScanSpec>() {});
+ return new PhoenixGroupScan(scanSpec);
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
new file mode 100644
index 0000000..e9f8a86
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java
@@ -0,0 +1,97 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.Objects;
+import java.util.Properties;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(PhoenixStoragePluginConfig.NAME)
+public class PhoenixStoragePluginConfig extends StoragePluginConfig {
+
+ public static final String NAME = "phoenix";
+
+ public String driverName = "org.apache.phoenix.queryserver.client.Driver";
+ public String host;
+ public int port = 8765;
+ public String username;
+ public String password;
+ public Properties props;
+
+ @JsonCreator
+ public PhoenixStoragePluginConfig(
+ @JsonProperty("driverName") String driverName,
+ @JsonProperty("host") String host,
+ @JsonProperty("port") int port,
+ @JsonProperty("username") String username,
+ @JsonProperty("password") String password,
+ @JsonProperty("props") Properties props) {
+ this.driverName = driverName;
+ this.host = host;
+ this.port = port;
+ this.username = username;
+ this.password = password;
+ this.props = props;
+ }
+
+ @JsonProperty("driverName")
+ public String getDriverName() {
+ return driverName;
+ }
+
+ @JsonProperty("host")
+ public String getHost() {
+ return host;
+ }
+
+ @JsonProperty("port")
+ public int getPort() {
+ return port;
+ }
+
+ @JsonProperty("username")
+ public String getUsername() {
+ return username;
+ }
+
+ @JsonProperty("password")
+ public String getPassword() {
+ return password;
+ }
+
+ @JsonProperty("props")
+ public Properties getProps() {
+ return props;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || !(o instanceof PhoenixStoragePluginConfig) ) {
+ return false;
+ }
+ return Objects.equals(this.host, ((PhoenixStoragePluginConfig)o).getHost()) &&
+ Objects.equals(this.port, ((PhoenixStoragePluginConfig)o).getPort());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(host, port);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(PhoenixStoragePluginConfig.NAME)
+ .field("driverName", driverName)
+ .field("host", host)
+ .field("port", port)
+ .field("username", username)
+ .toString();
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
new file mode 100644
index 0000000..390c958
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixSubScan.java
@@ -0,0 +1,67 @@
+package org.apache.drill.exec.store.phoenix;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("phoenix-sub-scan")
+public class PhoenixSubScan extends AbstractBase implements SubScan {
+
+ private final List<SchemaPath> columns;
+ private final PhoenixScanSpec scanSpec;
+
+ public PhoenixSubScan(
+ @JsonProperty("scanSpec") PhoenixScanSpec scanSpec,
+ @JsonProperty("columns") List<SchemaPath> columns) {
+ super("user-if-needed");
+ this.scanSpec = scanSpec;
+ this.columns = columns;
+ }
+
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ public PhoenixScanSpec getScanSpec() {
+ return scanSpec;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ return new PhoenixSubScan(scanSpec, columns);
+ }
+
+ @Override
+ public String getOperatorType() {
+ return "PHOENIX";
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return ImmutableSet.<PhysicalOperator>of().iterator();
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("columns", columns)
+ .field("scanSpec", scanSpec)
+ .toString();
+ }
+}
diff --git a/contrib/storage-phoenix/src/main/resources/drill-module.conf b/contrib/storage-phoenix/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..3d19b1a
--- /dev/null
+++ b/contrib/storage-phoenix/src/main/resources/drill-module.conf
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill: {
+ classpath.scanning: {
+ packages += "org.apache.drill.exec.store.phoenix"
+ }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java
new file mode 100644
index 0000000..59fd962
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestPhoenixPlugin.java
@@ -0,0 +1,51 @@
+package org.apache.drill.exec.store.phoenix;
+
+import static org.junit.Assert.fail;
+
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryRowSetIterator;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestPhoenixPlugin extends ClusterTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestPhoenixPlugin.class);
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher);
+ startCluster(builder);
+
+ StoragePluginRegistry registry = cluster.drillbit().getContext().getStorage();
+ PhoenixStoragePluginConfig config = new PhoenixStoragePluginConfig(null, "gwssi-app0102", 8765, "guest", "gwssi123", null);
+ config.setEnabled(true);
+ registry.put(PhoenixStoragePluginConfig.NAME, config);
+ }
+
+ public void test() {
+ fail("Not yet implemented");
+ }
+
+ public void testSchema() throws Exception {
+ String sql = "select * from phoenix.myTable";
+ queryBuilder().sql(sql).run();
+ }
+
+ @Test
+ public void testScan() throws Exception {
+ String sql = "select * from phoenix.myTable";
+ QueryRowSetIterator iterator = queryBuilder().sql(sql).rowSetIterator();
+ int count = 0;
+ for (RowSet rowset : iterator) {
+ rowset.print();
+ rowset.clear();
+ count++;
+ }
+ logger.info("phoenix fetch batch size : {}", count + 1);
+ }
+}
diff --git a/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java
new file mode 100644
index 0000000..ebefa99
--- /dev/null
+++ b/contrib/storage-phoenix/src/test/java/org/apache/drill/exec/store/phoenix/TestRowSet.java
@@ -0,0 +1,42 @@
+package org.apache.drill.exec.store.phoenix;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.physical.rowSet.RowSetWriter;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
+public class TestRowSet extends SubOperatorTest {
+
+ @Test
+ public void testRowSet() {
+ final TupleMetadata schema = new SchemaBuilder()
+ .add("id", MinorType.INT)
+ .add("name",MinorType.VARCHAR)
+ .buildSchema();
+
+ final RowSet rowSet = new RowSetBuilder(fixture.allocator(), schema)
+ .addRow(1, "luocong")
+ .addRow(2, "sunny")
+ .build();
+
+ rowSet.print();
+ rowSet.clear();
+
+ DirectRowSet directRowSet = DirectRowSet.fromSchema(fixture.allocator(), schema);
+ RowSetWriter writer = directRowSet.writer();
+ writer.scalar("id").setInt(1);
+ writer.scalar("name").setString("luocong");
+ writer.scalar("id").setInt(2);
+ writer.scalar("name").setString("sunny");
+ writer.save();
+ SingleRowSet record = writer.done();
+ record.print();
+ record.clear();
+ }
+}
diff --git a/exec/java-exec/src/test/resources/logback-test.xml b/exec/java-exec/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..bd748ee
--- /dev/null
+++ b/exec/java-exec/src/test/resources/logback-test.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<configuration>
+
+ <!-- Uncomment the lines below (and <appender-ref ref="SOCKET"/> inside <root>)
+ to be able to use Lilith for viewing log events -->
+ <!--
+ <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+ <Compressing>true</Compressing>
+ <ReconnectionDelay>10000</ReconnectionDelay>
+ <IncludeCallerData>true</IncludeCallerData>
+ <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
+ </appender>
+
+ <logger name="org.apache.drill" additivity="false">
+ <level value="debug"/>
+ <appender-ref ref="SOCKET"/>
+ </logger>
+
+ <logger name="query.logger" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="SOCKET"/>
+ </logger>
+ -->
+
+ <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root>
+ <level value="INFO"/>
+ <!-- Uncomment the next line (and the lines above) to be able to use Lilith for viewing log events -->
+ <!-- <appender-ref ref="SOCKET"/>-->
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</configuration>