DRILL-8358: Storage plugin for querying other Apache Drill clusters (#2709)
diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index 3af6cc7..baa99b7 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -20,6 +20,7 @@
import com.github.pjfanning.xlsx.StreamingReader;
import com.github.pjfanning.xlsx.impl.StreamingWorkbook;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -381,6 +382,9 @@
// Remove leading and trailing whitespace
tempColumnName = tempColumnName.trim();
+ if (StringUtils.isEmpty(tempColumnName)) {
+ tempColumnName = MISSING_FIELD_NAME_HEADER + (colPosition + 1);
+ }
tempColumnName = deconflictColumnNames(tempColumnName);
makeColumn(builder, tempColumnName, MinorType.FLOAT8);
excelFieldNames.add(colPosition, tempColumnName);
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index 9fe9f8e..960fcbe 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -710,6 +710,7 @@
RowSet results = client.queryBuilder().sql(sql).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("field_1", MinorType.FLOAT8)
.addNullable("original_id", MinorType.VARCHAR)
.addNullable("original_nameFirst", MinorType.VARCHAR)
.addNullable("original_nameLast", MinorType.VARCHAR)
@@ -722,14 +723,14 @@
.buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("XXXX00000001", "James", "Kushner", null, null, 10235, "US", LocalDate.parse("1957-04-18").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
- .addRow("XXXX00000002", "Steve", "Hecht", null, null, 11213, "US", LocalDate.parse("1982-08-10").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
- .addRow("XXXX00000003", "Ethan", "Stein", null, null, 10028, "US", LocalDate.parse("1991-04-11").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
- .addRow("XXXX00000004", "Mohammed", "Fatima", null, "Baltimore", 21202, "US", LocalDate.parse("1990-05-15").atStartOfDay().toInstant(ZoneOffset.UTC), "MD")
- .addRow("XXXX00000005", "Yakov", "Borodin", null, "Teaneck", 7666, "US", LocalDate.parse("1986-12-20").atStartOfDay().toInstant(ZoneOffset.UTC), "NJ")
- .addRow("XXXX00000006", "Akhil", "Chavda", null, null, null, "US", null, null)
- .addRow("XXXX00000007", "Mark", "Rahman", null, "Ellicott City", 21043, null, LocalDate.parse("1974-06-13").atStartOfDay().toInstant(ZoneOffset.UTC), "MD")
- .addRow("XXXX00000008", "Henry", "Smith", "xxxx@gmail.com", null, null, null, null, null)
+ .addRow(0F, "XXXX00000001", "James", "Kushner", null, null, 10235, "US", LocalDate.parse("1957-04-18").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
+ .addRow(1F, "XXXX00000002", "Steve", "Hecht", null, null, 11213, "US", LocalDate.parse("1982-08-10").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
+ .addRow(2F, "XXXX00000003", "Ethan", "Stein", null, null, 10028, "US", LocalDate.parse("1991-04-11").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
+ .addRow(3F, "XXXX00000004", "Mohammed", "Fatima", null, "Baltimore", 21202, "US", LocalDate.parse("1990-05-15").atStartOfDay().toInstant(ZoneOffset.UTC), "MD")
+ .addRow(4F, "XXXX00000005", "Yakov", "Borodin", null, "Teaneck", 7666, "US", LocalDate.parse("1986-12-20").atStartOfDay().toInstant(ZoneOffset.UTC), "NJ")
+ .addRow(5F, "XXXX00000006", "Akhil", "Chavda", null, null, null, "US", null, null)
+ .addRow(6F, "XXXX00000007", "Mark", "Rahman", null, "Ellicott City", 21043, null, LocalDate.parse("1974-06-13").atStartOfDay().toInstant(ZoneOffset.UTC), "MD")
+ .addRow(7F, "XXXX00000008", "Henry", "Smith", "xxxx@gmail.com", null, null, null, null, null)
.build();
new RowSetComparison(expected).verifyAndClearAll(results);
diff --git a/contrib/pom.xml b/contrib/pom.xml
index e728da9..476bc7f 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -58,6 +58,7 @@
<module>format-pcapng</module>
<module>format-iceberg</module>
<module>format-deltalake</module>
+ <module>storage-drill</module>
<module>storage-phoenix</module>
<module>storage-googlesheets</module>
<module>storage-hive</module>
diff --git a/contrib/storage-drill/README.md b/contrib/storage-drill/README.md
new file mode 100644
index 0000000..0f4491b
--- /dev/null
+++ b/contrib/storage-drill/README.md
@@ -0,0 +1,36 @@
+# Apache Drill storage plugin
+
+This storage plugin allows Drill to query other Drill clusters.
+Unlike the JDBC driver, this plugin performs no extra conversions of the input data and passes it
+as is to the operators that need it. Like the JDBC driver, it fetches the next data batch only when
+it is ready to process it, which avoids memory issues.
+
+## Supported optimizations and features
+
+Drill storage plugin supports push-down of all operators it has.
+It determines which parts of the query plan could be pushed down and converts them to SQL queries
+submitted to the underlying Drill cluster.
+
+## Configuration
+
+Drill storage plugin has the following configuration properties:
+
+- `type` - storage plugin type, should be `'drill'`
+- `connection` - JDBC connection string used to connect to the underlying Drill cluster. Please refer to the
+  [Using the JDBC Driver](https://drill.apache.org/docs/using-the-jdbc-driver/#using-the-jdbc-url-for-a-random-drillbit-connection) page for more details.
+- `properties` - connection properties. Please refer to the [Using the JDBC Driver](https://drill.apache.org/docs/using-the-jdbc-driver/#using-the-jdbc-url-for-a-random-drillbit-connection) page for more details.
+- `credentialsProvider` - credentials provider
+
+### Storage config example:
+
+```json
+{
+ "storage":{
+ "drill" : {
+ "type":"drill",
+ "connection":"jdbc:drill:drillbit=localhost:31010",
+ "enabled": false
+ }
+ }
+}
+```
diff --git a/contrib/storage-drill/pom.xml b/contrib/storage-drill/pom.xml
new file mode 100644
index 0000000..f3b88bb
--- /dev/null
+++ b/contrib/storage-drill/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-storage</artifactId>
+
+ <name>Drill : Contrib : Storage : Drill</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Test dependency -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib.data</groupId>
+ <artifactId>tpch-sample-data</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>logback.log.dir</name>
+ <value>${project.build.directory}/surefire-reports</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillGroupScan.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillGroupScan.java
new file mode 100644
index 0000000..48c60ec
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillGroupScan.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import java.util.List;
+
+public class DrillGroupScan extends AbstractGroupScan {
+ private static final double ROWS = 1e6;
+
+ private final DrillStoragePluginConfig pluginConfig;
+ private final DrillScanSpec scanSpec;
+
+ @JsonCreator
+ public DrillGroupScan(
+ @JsonProperty("userName") String userName,
+ @JsonProperty("pluginConfig") DrillStoragePluginConfig pluginConfig,
+ @JsonProperty("scanSpec") DrillScanSpec scanSpec) {
+ super(userName);
+ this.pluginConfig = pluginConfig;
+ this.scanSpec = scanSpec;
+ }
+
+ public DrillGroupScan(DrillGroupScan that) {
+ super(that);
+ this.pluginConfig = that.pluginConfig;
+ this.scanSpec = that.scanSpec;
+ }
+
+ @JsonProperty("pluginConfig")
+ public DrillStoragePluginConfig getPluginConfig() {
+ return pluginConfig;
+ }
+
+ @JsonProperty("scanSpec")
+ public DrillScanSpec getScanSpec() {
+ return scanSpec;
+ }
+
+ @Override
+ public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) {
+ return new DrillSubScan(userName, pluginConfig, scanSpec.getQuery());
+ }
+
+ @JsonIgnore
+ public SqlDialect getDialect() {
+ return new SqlDialect(SqlDialect.EMPTY_CONTEXT
+ .withIdentifierQuoteString(pluginConfig.getIdentifierQuoteString())
+ .withConformance(SqlConverter.DRILL_CONFORMANCE)
+ .withUnquotedCasing(Casing.UNCHANGED)
+ .withQuotedCasing(Casing.UNCHANGED));
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return 1;
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ return new DrillGroupScan(this);
+ }
+
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ return new DrillGroupScan(this);
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ return new ScanStats(
+ ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT,
+ (long) Math.max(ROWS, 1),
+ 1,
+ 1);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("scanSpec", scanSpec)
+ .toString();
+ }
+
+ @Override
+ @JsonIgnore
+ public List<SchemaPath> getColumns() {
+ return super.getColumns();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DrillGroupScan that = (DrillGroupScan) o;
+
+ return new EqualsBuilder()
+ .append(getPluginConfig(), that.getPluginConfig())
+ .append(getScanSpec(), that.getScanSpec())
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(getPluginConfig())
+ .append(getScanSpec())
+ .toHashCode();
+ }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillRecordReader.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillRecordReader.java
new file mode 100644
index 0000000..d680657
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillRecordReader.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.user.BlockingResultsListener;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLTimeoutException;
+import java.util.Iterator;
+import java.util.Optional;
+
+public class DrillRecordReader implements CloseableRecordBatch {
+ private static final Logger logger = LoggerFactory.getLogger(DrillRecordReader.class);
+
+ private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(DrillRecordReader.class);
+
+ private final DrillClient drillClient;
+ private final RecordBatchLoader batchLoader;
+ private final FragmentContext context;
+
+ private final BlockingResultsListener resultsListener;
+ private final UserBitShared.QueryId id;
+ private BatchSchema schema;
+ private boolean first = true;
+ private final OperatorContext oContext;
+
+  /** Runs the sub scan's query on the remote Drill cluster and waits for its first message. */
+  public DrillRecordReader(ExecutorFragmentContext context, DrillSubScan config)
+      throws OutOfMemoryException, ExecutionSetupException {
+    this.context = context;
+    this.oContext = context.newOperatorContext(config);
+    this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
+    String userName = Optional.ofNullable(config.getUserName()).orElse(ImpersonationUtil.getProcessUserName());
+    this.drillClient = config.getPluginConfig().getDrillClient(userName, oContext.getAllocator());
+    long queryTimeout = drillClient.getConfig().getLong(ExecConstants.JDBC_QUERY_TIMEOUT);
+    int throttlingThreshold = drillClient.getConfig().getInt(ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD);
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    this.resultsListener = new BlockingResultsListener(() -> stopwatch, () -> queryTimeout, throttlingThreshold);
+    this.drillClient.runQuery(QueryType.SQL, config.getQuery(), resultsListener);
+    try {
+      resultsListener.awaitFirstMessage();
+    } catch (InterruptedException | SQLTimeoutException e) {
+      throw new ExecutionSetupException(e);
+    }
+    // Read the query id only after the first message: the listener receives it asynchronously
+    // from the server, so it may still be unset immediately after runQuery() returns.
+    this.id = resultsListener.getQueryId();
+  }
+
+ @Override
+ public FragmentContext getContext() {
+ return context;
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return batchLoader.getRecordCount();
+ }
+
+ @Override
+ public void cancel() {
+ drillClient.cancelQuery(id);
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return batchLoader.iterator();
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return batchLoader.getValueVectorId(path);
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return batchLoader.getValueAccessorById(clazz, ids);
+ }
+
+ private QueryDataBatch getNextBatch() {
+ try {
+ injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
+ return resultsListener.getNext();
+ } catch(InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up
+ // on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
+ return null;
+ } catch (Exception e) {
+ throw UserException.dataReadError(e)
+ .addContext("Failure when reading incoming batch")
+ .build(logger);
+ }
+ }
+
+ @Override
+ public RecordBatch.IterOutcome next() {
+ batchLoader.resetRecordCount();
+ oContext.getStats().startProcessing();
+ try {
+ QueryDataBatch batch;
+ try {
+ oContext.getStats().startWait();
+ batch = getNextBatch();
+
+ // skip over empty batches. we do this since these are basically control messages.
+ while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
+ && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
+ batch = getNextBatch();
+ }
+ } finally {
+ oContext.getStats().stopWait();
+ }
+
+ first = false;
+
+ if (batch == null) {
+        // No batch was returned: there is nothing more to read from the remote query
+        // (or the wait was interrupted), so zero the loader and report IterOutcome.NONE.
+ IterOutcome lastOutcome = IterOutcome.NONE;
+ batchLoader.zero();
+ context.getExecutorState().checkContinue();
+ return lastOutcome;
+ }
+
+ if (context.getAllocator().isOverLimit()) {
+ context.requestMemory(this);
+ if (context.getAllocator().isOverLimit()) {
+ throw new OutOfMemoryException("Allocator over limit");
+ }
+ }
+
+ UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
+ boolean schemaChanged = batchLoader.load(rbd, batch.getData());
+
+ batch.release();
+ if (schemaChanged) {
+ this.schema = batchLoader.getSchema();
+ oContext.getStats().batchReceived(0, rbd.getRecordCount(), true);
+ return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ oContext.getStats().batchReceived(0, rbd.getRecordCount(), false);
+ return RecordBatch.IterOutcome.OK;
+ }
+ } finally {
+ oContext.getStats().stopProcessing();
+ }
+ }
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ return batchLoader.getWritableBatch();
+ }
+
+ @Override
+ public void close() {
+ logger.debug("Closing {}", getClass().getCanonicalName());
+ batchLoader.clear();
+ resultsListener.close();
+ drillClient.close();
+ }
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ throw new UnsupportedOperationException(
+ String.format("You should not call getOutgoingContainer() for class %s",
+ getClass().getCanonicalName()));
+ }
+
+ @Override
+ public VectorContainer getContainer() {
+ return batchLoader.getContainer();
+ }
+
+ @Override
+ public void dump() {
+ logger.error("DrillRecordReader[batchLoader={}, schema={}]", batchLoader, schema);
+ }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanBatchCreator.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanBatchCreator.java
new file mode 100644
index 0000000..6837831
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanBatchCreator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class DrillScanBatchCreator implements BatchCreator<DrillSubScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DrillSubScan subScan,
+ List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ return new DrillRecordReader(context, subScan);
+ }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanSpec.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanSpec.java
new file mode 100644
index 0000000..f50602a
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanSpec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.planner.logical.DrillTableSelection;
+
+import java.util.List;
+
+public class DrillScanSpec implements DrillTableSelection {
+ private List<String> schemaPath;
+ private String collectionName;
+
+ private String query;
+
+ @JsonCreator
+ public DrillScanSpec(@JsonProperty("schemaPath") List<String> schemaPath,
+ @JsonProperty("collectionName") String collectionName) {
+ this.schemaPath = schemaPath;
+ this.collectionName = collectionName;
+ }
+
+ public DrillScanSpec(String query) {
+ this.query = query;
+ }
+
+ public List<String> getSchemaPath() {
+ return this.schemaPath;
+ }
+
+ public String getCollectionName() {
+ return this.collectionName;
+ }
+
+ public String getQuery() {
+ return this.query;
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("schemaPath", schemaPath)
+ .field("collectionName", collectionName)
+ .field("query", query)
+ .toString();
+ }
+
+ @Override
+ public String digest() {
+ return toString();
+ }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePlugin.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePlugin.java
new file mode 100644
index 0000000..b4c77ef
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePlugin.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.drill.plugin.plan.DrillPluginImplementor;
+import org.apache.drill.exec.store.drill.plugin.schema.DrillSchemaFactory;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DrillStoragePlugin extends AbstractStoragePlugin {
+
+ private final DrillStoragePluginConfig drillConfig;
+ private final DrillSchemaFactory schemaFactory;
+ private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+ private final Map<String, DrillClient> userClients;
+
+ public DrillStoragePlugin(
+ DrillStoragePluginConfig drillConfig,
+ DrillbitContext context,
+ String name) {
+ super(context, name);
+ this.drillConfig = drillConfig;
+ this.schemaFactory = new DrillSchemaFactory(this, name);
+ this.storagePluginRulesSupplier = storagePluginRulesSupplier(name);
+
+ assert drillConfig.getConnection().startsWith(DrillStoragePluginConfig.CONNECTION_STRING_PREFIX);
+
+ this.userClients = new ConcurrentHashMap<>();
+ }
+
+ private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+ Convention convention = new Convention.Impl("DRILL." + name, PluginRel.class);
+ return StoragePluginRulesSupplier.builder()
+ .rulesProvider(new PluginRulesProviderImpl(convention, DrillPluginImplementor::new))
+ .supportsProjectPushdown(true)
+ .supportsSortPushdown(true)
+ .supportsAggregatePushdown(true)
+ .supportsFilterPushdown(true)
+ .supportsLimitPushdown(true)
+ .supportsUnionPushdown(true)
+ .supportsJoinPushdown(true)
+ .convention(convention)
+ .build();
+ }
+
+ @Override
+ public DrillStoragePluginConfig getConfig() {
+ return drillConfig;
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+ schemaFactory.registerSchemas(schemaConfig, parent);
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ DrillScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<DrillScanSpec>() {
+ });
+ return new DrillGroupScan(userName, drillConfig, scanSpec);
+ }
+
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return storagePluginRulesSupplier.getOptimizerRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
+ }
+
+ public Convention convention() {
+ return storagePluginRulesSupplier.convention();
+ }
+
+ public DrillClient getClient(String userName) {
+ userClients.computeIfAbsent(userName, this::createClient);
+ // recompute drill client for the case of closed connection
+ return userClients.computeIfPresent(userName, (name, value) -> {
+ if (!value.connectionIsActive()) {
+ AutoCloseables.closeSilently(value);
+ return createClient(name);
+ }
+ return value;
+ });
+ }
+
+ private DrillClient createClient(String userName) {
+ return drillConfig.getDrillClient(userName, null);
+ }
+
+ @Override
+ public void close() {
+ AutoCloseables.closeSilently(userClients.values().toArray(new AutoCloseable[0]));
+ }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePluginConfig.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePluginConfig.java
new file mode 100644
index 0000000..3dfb722
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePluginConfig.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.calcite.avatica.ConnectStringParser;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+/** Configuration of the storage plugin that queries another Drill cluster through a {@code jdbc:drill:} URL. */
+@JsonTypeName(DrillStoragePluginConfig.NAME)
+public class DrillStoragePluginConfig extends StoragePluginConfig {
+  private static final Logger logger = LoggerFactory.getLogger(DrillStoragePluginConfig.class);
+
+  public static final String NAME = "drill";
+  public static final String CONNECTION_STRING_PREFIX = "jdbc:drill:";
+  private static final String DEFAULT_QUOTING_IDENTIFIER = "`";
+
+  private final String connection;
+  private final Properties properties;
+
+  @JsonCreator
+  public DrillStoragePluginConfig(
+      @JsonProperty("connection") String connection,
+      @JsonProperty("properties") Properties properties,
+      @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
+      @JsonProperty("authMode") String authMode) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null,
+        AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
+    this.connection = connection;
+    this.properties = Optional.ofNullable(properties).orElse(new Properties());
+  }
+
+  private DrillStoragePluginConfig(DrillStoragePluginConfig that, CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null, that.authMode);
+    this.connection = that.connection;
+    this.properties = that.properties;
+  }
+
+  @JsonProperty("connection")
+  public String getConnection() {
+    return connection;
+  }
+
+  @JsonProperty("properties")
+  public Properties getProperties() {
+    return properties;
+  }
+
+  private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
+    return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
+  }
+
+  /** Returns the identifier quote string set in the properties or the connection URL; back-tick by default. */
+  @JsonIgnore
+  public String getIdentifierQuoteString() {
+    return connectionProperties().getProperty(DrillProperties.QUOTING_IDENTIFIERS, DEFAULT_QUOTING_IDENTIFIER);
+  }
+
+  @Override
+  public DrillStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new DrillStoragePluginConfig(this, credentialsProvider);
+  }
+
+  /** Returns a private copy of the configured properties merged with the options of the connection URL. */
+  private Properties connectionProperties() {
+    // ConnectStringParser.parse() writes into the Properties it receives, so it must never get the serialized field
+    Properties props = (Properties) properties.clone();
+    try {
+      return ConnectStringParser.parse(connection.substring(CONNECTION_STRING_PREFIX.length()), props);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(UserCredentials userCredentials) {
+    UsernamePasswordCredentials.Builder builder = new UsernamePasswordCredentials.Builder()
+        .setCredentialsProvider(credentialsProvider);
+    switch (authMode) {
+      case SHARED_USER:
+        return builder.build();
+      case USER_TRANSLATION:
+        Preconditions.checkNotNull(userCredentials, "A drill query user is required for user translation auth mode.");
+        return builder.setQueryUser(userCredentials.getUserName()).build();
+      default:
+        throw UserException.validationError()
+            .message("This storage plugin does not support auth mode: %s", authMode).build(logger);
+    }
+  }
+
+  private Map<String, String> getCredentials(UserCredentials userCredentials) {
+    return getUsernamePasswordCredentials(userCredentials)
+        .<Map<String, String>>map(creds -> ImmutableMap.of(DrillProperties.USER, creds.getUsername(),
+            DrillProperties.PASSWORD, creds.getPassword()))
+        .orElse(Collections.emptyMap());
+  }
+
+  /** Opens a client for {@code userName}; a client whose connection attempt fails is closed before rethrowing. */
+  @JsonIgnore
+  public DrillClient getDrillClient(String userName, BufferAllocator allocator) {
+    Properties props = connectionProperties();
+    props.putAll(getCredentials(UserCredentials.newBuilder().setUserName(userName).build()));
+    boolean isDirect = props.getProperty(DrillProperties.DRILLBIT_CONNECTION) != null;
+    DrillClient client = new DrillClient(DrillConfig.forClient(), null, allocator, isDirect);
+    try {
+      client.connect(props.getProperty(DrillProperties.ZOOKEEPER_CONNECTION), props);
+      return client;
+    } catch (RpcException | RuntimeException e) {
+      client.close(); // do not leak the resources of a client that never connected
+      throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DrillStoragePluginConfig that = (DrillStoragePluginConfig) o;
+    return Objects.equals(connection, that.connection) && Objects.equals(properties, that.properties)
+        && Objects.equals(credentialsProvider, that.credentialsProvider) && authMode == that.authMode;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(connection, properties, credentialsProvider, authMode); // same fields as equals()
+  }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillSubScan.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillSubScan.java
new file mode 100644
index 0000000..bd70616
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillSubScan.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/** Sub-scan of the Drill storage plugin: carries the query text and the plugin configuration. */
+@JsonTypeName("drill-read")
+public class DrillSubScan extends AbstractBase implements SubScan {
+
+  public static final String OPERATOR_TYPE = "DRILL_SUB_SCAN";
+
+  private final String query;
+
+  @JsonProperty
+  private final DrillStoragePluginConfig pluginConfig;
+
+  /** Deserialization entry point; {@code pluginConfig} must be a {@link DrillStoragePluginConfig}. */
+  @JsonCreator
+  public DrillSubScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("pluginConfig") StoragePluginConfig pluginConfig,
+      @JsonProperty("query") String query) {
+    this(userName, (DrillStoragePluginConfig) pluginConfig, query);
+  }
+
+  /** Creates a sub-scan from an already typed plugin configuration. */
+  public DrillSubScan(String userName, DrillStoragePluginConfig storagePluginConfig, String query) {
+    super(userName);
+    this.pluginConfig = storagePluginConfig;
+    this.query = query;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  /** Returns an equivalent sub-scan; this operator takes no inputs, so {@code children} must be empty. */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DrillSubScan(getUserName(), pluginConfig, query);
+  }
+
+  public DrillStoragePluginConfig getPluginConfig() {
+    return pluginConfig;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  @Override
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+
+  /** Always empty: the sub-scan has no child operators. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Collections.emptyIterator();
+  }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/package-info.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/package-info.java
new file mode 100644
index 0000000..a210cc6
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Drill storage plugin.
+ * <p>
+ * Enables querying Drill as a data store.
+ */
+package org.apache.drill.exec.store.drill.plugin;
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/plan/DrillPluginImplementor.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/plan/DrillPluginImplementor.java
new file mode 100644
index 0000000..7f1cb16
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/plan/DrillPluginImplementor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin.plan;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.jdbc.JdbcImplementor;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.drill.plugin.DrillGroupScan;
+import org.apache.drill.exec.store.drill.plugin.DrillScanSpec;
+import org.apache.drill.exec.store.drill.plugin.DrillStoragePlugin;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** Turns a plan subtree owned by the Drill plugin into one SQL query stored in the group scan's scan spec. */
+public class DrillPluginImplementor extends AbstractPluginImplementor {
+  private DrillGroupScan groupScan;
+  private boolean isRoot = true;
+
+  @Override
+  protected Class<? extends StoragePlugin> supportedPlugin() {
+    return DrillStoragePlugin.class;
+  }
+
+  @Override
+  public void implement(StoragePluginTableScan scan) {
+    groupScan = (DrillGroupScan) scan.getGroupScan();
+    // still the root here means no other node was processed above this scan
+    if (isRoot) {
+      completeProcessing(scan);
+    }
+  }
+
+  /** Replaces the group scan with one whose scan spec holds the SQL generated for {@code root}. */
+  private void completeProcessing(RelNode root) {
+    String sql = buildQuery(root);
+    groupScan = new DrillGroupScan(groupScan.getUserName(), groupScan.getPluginConfig(), new DrillScanSpec(sql));
+  }
+
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    process(aggregate);
+  }
+
+  /** Visits the inputs as non-root nodes; only the outermost node then finishes the query. */
+  private void process(RelNode relNode) throws IOException {
+    boolean outermost = isRoot;
+    isRoot = false;
+    for (RelNode input : relNode.getInputs()) {
+      visitChild(input);
+    }
+    if (outermost) {
+      completeProcessing(relNode);
+    }
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    process(filter);
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    process(limit);
+  }
+
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    process(project);
+  }
+
+  @Override
+  public void implement(PluginSortRel sort) throws IOException {
+    process(sort);
+  }
+
+  @Override
+  public void implement(PluginUnionRel union) throws IOException {
+    process(union);
+  }
+
+  @Override
+  public void implement(PluginJoinRel join) throws IOException {
+    process(join);
+  }
+
+  @Override
+  public boolean canImplement(Aggregate aggregate) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(DrillLimitRelBase limit) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Project project) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Sort sort) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Union union) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(TableScan scan) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Join scan) {
+    return true;
+  }
+
+  @Override
+  protected boolean hasPluginGroupScan(RelNode node) {
+    return findGroupScan(node) instanceof DrillGroupScan;
+  }
+
+  @Override
+  public GroupScan getPhysicalOperator() {
+    return groupScan;
+  }
+
+  /** Renders {@code node} and everything below it as SQL text in the dialect of the group scan. */
+  public String buildQuery(RelNode node) {
+    SqlDialect sqlDialect = groupScan.getDialect();
+    JavaTypeFactory typeFactory = (JavaTypeFactory) node.getCluster().getTypeFactory();
+    JdbcImplementor.Result converted = new DrillRelToSqlConverter(sqlDialect, typeFactory).visitRoot(node);
+    return converted.asStatement().toSqlString(sqlDialect).getSql();
+  }
+
+  /** Rel-to-SQL converter with extra handling for the plugin's limit node and for table scans. */
+  public static class DrillRelToSqlConverter extends JdbcImplementor {
+
+    public DrillRelToSqlConverter(SqlDialect dialect, JavaTypeFactory typeFactory) {
+      super(dialect, typeFactory);
+    }
+
+    // never called directly, hence "unused" - presumably found through Calcite's reflective visit dispatch
+    @SuppressWarnings("unused")
+    public Result visit(PluginLimitRel e) {
+      Builder sqlBuilder = visitInput(e, 0, Clause.OFFSET, Clause.FETCH).builder(e);
+      Optional.ofNullable(e.getFetch()).ifPresent(fetch -> sqlBuilder.setFetch(sqlBuilder.context.toSql(null, fetch)));
+      Optional.ofNullable(e.getOffset()).ifPresent(offset -> sqlBuilder.setOffset(sqlBuilder.context.toSql(null, offset)));
+      return sqlBuilder.result();
+    }
+
+    @Override
+    public Result visit(TableScan scan) {
+      // the first name segment is dropped (presumably this plugin's own name, unknown to the remote cluster)
+      List<String> name = scan.getTable().getQualifiedName();
+      SqlIdentifier identifier = new SqlIdentifier(name.subList(1, name.size()), SqlParserPos.ZERO);
+      return result(identifier, ImmutableList.of(Clause.FROM), scan, null);
+    }
+  }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillPluginSchema.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillPluginSchema.java
new file mode 100644
index 0000000..0fde3d4
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillPluginSchema.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.drill.plugin.DrillScanSpec;
+import org.apache.drill.exec.store.drill.plugin.DrillStoragePlugin;
+import org.apache.drill.exec.store.drill.plugin.DrillStoragePluginConfig;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.drill.exec.util.ImpersonationUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/** Schema tree built from the schemas that the remote Drill client reports for one user. */
+public class DrillPluginSchema extends AbstractSchema {
+
+  private final DrillStoragePlugin plugin;
+  private final Map<String, DrillPluginSchema> schemaMap = new HashMap<>();
+  private final Map<String, DrillTable> drillTables = new HashMap<>();
+  private final String userName;
+
+  public DrillPluginSchema(DrillStoragePlugin plugin, String name, String userName) {
+    super(Collections.emptyList(), name);
+    this.plugin = plugin;
+    this.userName = Optional.ofNullable(userName).orElse(ImpersonationUtil.getProcessUserName());
+    getSchemasList().stream()
+        .map(UserProtos.SchemaMetadata::getSchemaName)
+        .map(schemaName -> schemaName.toLowerCase(java.util.Locale.ROOT)) // independent of the JVM default locale
+        .map(SchemaPath::parseFromString)
+        .forEach(this::addSubSchema);
+  }
+
+  private DrillPluginSchema(DrillStoragePlugin plugin, List<String> parentSchemaPath, String name, String userName) {
+    super(parentSchemaPath, name);
+    this.plugin = plugin;
+    this.userName = userName;
+  }
+
+  /** Registers each segment of {@code path} as a nested schema, e.g. {@code dfs.tmp} becomes dfs -> tmp. */
+  private void addSubSchema(SchemaPath path) {
+    DrillPluginSchema current = this;
+    while (true) {
+      DrillPluginSchema parent = current;
+      current = parent.schemaMap.computeIfAbsent(path.getRootSegmentPath(), // reuse nodes of earlier paths
+          name -> new DrillPluginSchema(plugin, parent.getSchemaPath(), name, userName));
+      if (path.isLeaf()) {
+        return;
+      }
+      path = new SchemaPath(path.getRootSegment().getChild().getNameSegment());
+    }
+  }
+
+  private List<UserProtos.SchemaMetadata> getSchemasList() {
+    try {
+      return plugin.getClient(userName).getSchemas(null, null).get().getSchemasList();
+    } catch (InterruptedException | ExecutionException e) {
+      throw metadataFailure(e);
+    }
+  }
+
+  private static DrillRuntimeException metadataFailure(Exception e) {
+    if (e instanceof InterruptedException) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller's thread
+    }
+    return new DrillRuntimeException(e);
+  }
+
+  @Override
+  public AbstractSchema getSubSchema(String name) {
+    return schemaMap.get(name);
+  }
+
+  void setHolder(SchemaPlus plusOfThis) {
+    getSubSchemaNames().forEach(s -> plusOfThis.add(s, getSubSchema(s)));
+  }
+
+  @Override
+  public boolean showInInformationSchema() {
+    return true;
+  }
+
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return schemaMap.keySet();
+  }
+
+  @Override
+  public Table getTable(String name) {
+    return drillTables.computeIfAbsent(name, key ->
+        new PluginDrillTable(plugin, getName(), null, new DrillScanSpec(getSchemaPath(), key), plugin.convention()));
+  }
+
+  @Override
+  public String getTypeName() {
+    return DrillStoragePluginConfig.NAME;
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    List<String> schemaPaths = getSchemaPath();
+    String schemaPath = SchemaPath.getCompoundPath(
+        schemaPaths.subList(1, schemaPaths.size()).toArray(new String[0])).getAsUnescapedPath();
+    UserProtos.LikeFilter schemaNameFilter = UserProtos.LikeFilter.newBuilder().setPattern(schemaPath).build();
+    try {
+      return plugin.getClient(userName).getTables(null, schemaNameFilter, null, null).get().getTablesList().stream()
+          .map(UserProtos.TableMetadata::getTableName).collect(Collectors.toSet());
+    } catch (InterruptedException | ExecutionException e) {
+      throw metadataFailure(e);
+    }
+  }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillSchemaFactory.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillSchemaFactory.java
new file mode 100644
index 0000000..ee68ec3
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillSchemaFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.drill.plugin.DrillStoragePlugin;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Registers the Drill plugin schema; schemas are cached per (schema name, user name) pair. */
+public class DrillSchemaFactory extends AbstractSchemaFactory {
+
+  private final LoadingCache<Pair<String, String>, DrillPluginSchema> databases;
+  private final DrillStoragePlugin plugin;
+
+  public DrillSchemaFactory(DrillStoragePlugin plugin, String schemaName) {
+    super(schemaName);
+    this.plugin = plugin;
+    this.databases = CacheBuilder.newBuilder()
+        .expireAfterAccess(1, TimeUnit.MINUTES)
+        .build(new DatabaseLoader());
+  }
+
+  /** Adds the cached (or newly loaded) schema of the querying user to {@code parent}. */
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    String userName = Optional.ofNullable(schemaConfig.getUserName()).orElse(ImpersonationUtil.getProcessUserName());
+    try {
+      DrillPluginSchema schema = databases.get(Pair.of(getName(), userName));
+      schema.setHolder(parent.add(getName(), schema));
+    } catch (ExecutionException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  /** Cache loader: the key holds the schema name (left) and the user name (right). */
+  private class DatabaseLoader extends CacheLoader<Pair<String, String>, DrillPluginSchema> {
+    @Override
+    public DrillPluginSchema load(Pair<String, String> key) {
+      return new DrillPluginSchema(plugin, key.getKey(), key.getValue());
+    }
+  }
+}
diff --git a/contrib/storage-drill/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-drill/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..b1cb94d
--- /dev/null
+++ b/contrib/storage-drill/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+ "storage":{
+ "drill" : {
+ "type":"drill",
+ "connection":"jdbc:drill:drillbit=localhost:31010",
+ "enabled": false
+ }
+ }
+}
diff --git a/contrib/storage-drill/src/main/resources/drill-module.conf b/contrib/storage-drill/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..33539df
--- /dev/null
+++ b/contrib/storage-drill/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.drill.plugin"
+}
diff --git a/contrib/storage-drill/src/test/java/org/apache/drill/exec/store/drill/plugin/DrillPluginQueriesTest.java b/contrib/storage-drill/src/test/java/org/apache/drill/exec/store/drill/plugin/DrillPluginQueriesTest.java
new file mode 100644
index 0000000..2bef819
--- /dev/null
+++ b/contrib/storage-drill/src/test/java/org/apache/drill/exec/store/drill/plugin/DrillPluginQueriesTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class DrillPluginQueriesTest extends ClusterTest {
+
+ private static final String TABLE_NAME = "dfs.tmp.test_table";
+
+ private static ClusterFixture drill;
+ private static ClientFixture drillClient;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ initPlugin();
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ AutoCloseables.close(drill, drillClient);
+ }
+
+  // Starts the cluster under test plus a second cluster that both plugin configs connect to.
+  private static void initPlugin() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    drill = ClusterFixture.builder(dirTestWatcher).build();
+
+    String connection = "jdbc:drill:drillbit=localhost:" + drill.drillbit().getUserPort();
+    DrillStoragePluginConfig config = new DrillStoragePluginConfig(connection, new Properties(), null, null);
+    config.setEnabled(true);
+    cluster.defineStoragePlugin("drill", config);
+    cluster.defineStoragePlugin("drill2", config);
+
+    // Create the table that the tests read back through the plugin.
+    drillClient = drill.clientFixture();
+    String ctas = "create table %s as select * from cp.`tpch/nation.parquet`";
+    drillClient.queryBuilder().sql(ctas, TABLE_NAME).run();
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    // Round trip: execute the JSON physical plan produced by EXPLAIN and count the rows.
+    String jsonPlan = queryBuilder().sql("select * from drill.%s", TABLE_NAME).explainJson();
+    assertEquals(25, queryBuilder().physical(jsonPlan).run().recordCount());
+  }
+
+  @Test
+  public void testShowDatabases() throws Exception {
+    // The dfs.tmp schema is expected to be listed as drill.dfs.tmp.
+    testBuilder()
+      .sqlQuery("show databases where SCHEMA_NAME='drill.dfs.tmp'")
+      .unOrdered()
+      .baselineColumns("SCHEMA_NAME").baselineValues("drill.dfs.tmp")
+      .go();
+  }
+
+ @Test
+ public void testShowTables() throws Exception {
+ testBuilder()
+ .sqlQuery("show tables IN drill.INFORMATION_SCHEMA")
+ .unOrdered()
+ .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
+ .baselineValues("drill.information_schema", "VIEWS")
+ .baselineValues("drill.information_schema", "CATALOGS")
+ .baselineValues("drill.information_schema", "COLUMNS")
+ .baselineValues("drill.information_schema", "PARTITIONS")
+ .baselineValues("drill.information_schema", "FILES")
+ .baselineValues("drill.information_schema", "SCHEMATA")
+ .baselineValues("drill.information_schema", "TABLES")
+ .go();
+ }
+
+  @Test
+  public void testProjectPushDown() throws Exception {
+    String sql = "select n_nationkey, n_regionkey, n_name from drill.%s";
+
+    // The pushed-down query must name the three columns and must not contain a star.
+    queryBuilder()
+      .sql(sql, TABLE_NAME)
+      .planMatcher()
+      .include("query=\"SELECT `n_nationkey`, `n_regionkey`, `n_name`")
+      .exclude("\\*")
+      .match();
+
+    RowSet actual = queryBuilder().sql(sql, TABLE_NAME).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("n_nationkey", TypeProtos.MinorType.INT)
+      .add("n_regionkey", TypeProtos.MinorType.INT)
+      .add("n_name", TypeProtos.MinorType.VARCHAR)
+      .build();
+
+    // Expected result: the 25 nation rows.
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(0, 0, "ALGERIA")
+      .addRow(1, 1, "ARGENTINA")
+      .addRow(2, 1, "BRAZIL")
+      .addRow(3, 1, "CANADA")
+      .addRow(4, 4, "EGYPT")
+      .addRow(5, 0, "ETHIOPIA")
+      .addRow(6, 3, "FRANCE")
+      .addRow(7, 3, "GERMANY")
+      .addRow(8, 2, "INDIA")
+      .addRow(9, 2, "INDONESIA")
+      .addRow(10, 4, "IRAN")
+      .addRow(11, 4, "IRAQ")
+      .addRow(12, 2, "JAPAN")
+      .addRow(13, 4, "JORDAN")
+      .addRow(14, 0, "KENYA")
+      .addRow(15, 0, "MOROCCO")
+      .addRow(16, 0, "MOZAMBIQUE")
+      .addRow(17, 1, "PERU")
+      .addRow(18, 2, "CHINA")
+      .addRow(19, 3, "ROMANIA")
+      .addRow(20, 4, "SAUDI ARABIA")
+      .addRow(21, 2, "VIETNAM")
+      .addRow(22, 3, "RUSSIA")
+      .addRow(23, 3, "UNITED KINGDOM")
+      .addRow(24, 1, "UNITED STATES")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(actual);
+  }
+
+  @Test
+  public void testFilterPushDown() throws Exception {
+    String sql = "select n_name, n_nationkey from drill.%s where n_nationkey = 0";
+
+    // The plan must contain a WHERE clause and no Filter operator.
+    queryBuilder().sql(sql, TABLE_NAME)
+      .planMatcher()
+      .include("WHERE")
+      .exclude("Filter")
+      .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(sql, TABLE_NAME)
+      .baselineColumns("n_name", "n_nationkey").baselineValues("ALGERIA", 0)
+      .go();
+  }
+
+  @Test
+  public void testFilterPushDownWithJoin() throws Exception {
+    String sql = "select * from drill.%s e\n" +
+      "join drill.%s s on e.n_nationkey = s.n_nationkey where e.n_name = 'BRAZIL'";
+    // n_comment of the BRAZIL row; the baseline contains it once per join side.
+    String brazilComment = "y alongside of the pending deposits. carefully special " +
+      "packages are about the ironic forges. slyly special ";
+
+    // Both join inputs go through the same plugin; the plan must mention INNER JOIN.
+    queryBuilder().sql(sql, TABLE_NAME, TABLE_NAME)
+      .planMatcher().include("INNER JOIN")
+      .match();
+
+    testBuilder()
+      .ordered()
+      .sqlQuery(sql, TABLE_NAME, TABLE_NAME)
+      .baselineColumns("n_nationkey", "n_name", "n_regionkey", "n_comment",
+        "n_nationkey0", "n_name0", "n_regionkey0", "n_comment0")
+      .baselineValues(2, "BRAZIL", 1, brazilComment, 2, "BRAZIL", 1, brazilComment)
+      .go();
+  }
+
+  @Test
+  public void testJoinDifferentDrillPlugins() throws Exception {
+    String query = "select * from drill.%s e\n" +
+      "join drill2.cp.`tpch/nation.parquet` s on e.n_nationkey = s.n_nationkey where e.n_name = 'BRAZIL'";
+
+    // Different plugin instances on each side: expect a HashJoin and no pushed-down INNER JOIN.
+    queryBuilder().sql(query, TABLE_NAME)
+      .planMatcher()
+      .include("HashJoin")
+      .exclude("INNER JOIN")
+      .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(query, TABLE_NAME)
+      .baselineColumns("n_nationkey", "n_name", "n_regionkey", "n_comment", "n_nationkey0",
+        "n_name0", "n_regionkey0", "n_comment0")
+      .baselineValues(2, "BRAZIL", 1, "y alongside of the pending deposits. carefully special " +
+        "packages are about the ironic forges. slyly special ", 2, "BRAZIL", 1, "y alongside of " +
+        "the pending deposits. carefully special packages are about the ironic forges. slyly special ")
+      .go();
+  }
+
+  @Test
+  public void testAggregationPushDown() throws Exception {
+    String sql = "select count(*) c from drill.%s";
+
+    // COUNT(*) is expected inside the pushed-down query text.
+    queryBuilder().sql(sql, TABLE_NAME)
+      .planMatcher()
+      .include("query=\"SELECT COUNT\\(\\*\\)")
+      .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(sql, TABLE_NAME)
+      .baselineColumns("c").baselineValues(25L)
+      .go();
+  }
+
+  @Test
+  public void testLimitPushDown() throws Exception {
+    String sql = "select n_name from drill.%s FETCH NEXT 1 ROWS ONLY";
+
+    // NOTE(review): no ORDER BY here, so the ALGERIA baseline relies on scan order - confirm.
+    queryBuilder().sql(sql, TABLE_NAME)
+      .planMatcher()
+      .include("FETCH NEXT 1 ROWS ONLY")
+      .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(sql, TABLE_NAME)
+      .baselineColumns("n_name").baselineValues("ALGERIA")
+      .go();
+  }
+
+  @Test
+  public void testLimitWithSortPushDown() throws Exception {
+    String sql = "select n_nationkey from drill.%s order by n_name limit 3";
+
+    // Both the sort and the limit are expected in the plan text.
+    queryBuilder().sql(sql, TABLE_NAME)
+      .planMatcher()
+      .include("ORDER BY `n_name`", "FETCH NEXT 3 ROWS ONLY")
+      .match();
+
+    testBuilder().unOrdered()
+      .sqlQuery(sql, TABLE_NAME)
+      .baselineColumns("n_nationkey")
+      .baselineValues(0)
+      .baselineValues(1)
+      .baselineValues(2)
+      .go();
+  }
+
+  @Test
+  public void testAggregationWithGroupByPushDown() throws Exception {
+    String sql = "select sum(n_nationkey) s from drill.%s group by n_regionkey";
+
+    // Both the SUM and the GROUP BY are expected in the plan text.
+    queryBuilder().sql(sql, TABLE_NAME)
+      .planMatcher()
+      .include("query=\"SELECT SUM\\(`n_nationkey`\\)", "GROUP BY `n_regionkey`")
+      .match();
+
+    testBuilder().unOrdered()
+      .sqlQuery(sql, TABLE_NAME)
+      .baselineColumns("s")
+      .baselineValues(47L)
+      .baselineValues(50L)
+      .baselineValues(58L)
+      .baselineValues(68L)
+      .baselineValues(77L)
+      .go();
+  }
+
+  @Test
+  public void testUnionAllPushDown() throws Exception {
+    // NOTE(review): col1/col2 are not among the n_* columns the other tests read - confirm intended.
+    String branch = "select col1, col2 from drill.%s";
+    String sql = branch + " union all " + branch;
+
+    queryBuilder().sql(sql, TABLE_NAME, TABLE_NAME)
+      .planMatcher()
+      .include("UNION ALL")
+      .match();
+
+    // Two branches over the same table, hence twice its 25 rows.
+    long rows = queryBuilder()
+      .sql(sql, TABLE_NAME, TABLE_NAME)
+      .run().recordCount();
+
+    assertEquals(50L, rows);
+  }
+}
diff --git a/distribution/pom.xml b/distribution/pom.xml
index f0fb953..eff3b81 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -489,6 +489,11 @@
<artifactId>drill-deltalake-format</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-storage</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 7708be4..36c7133 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -70,6 +70,7 @@
<include>org.apache.drill.contrib:drill-druid-storage:jar</include>
<include>org.apache.drill.contrib:drill-iceberg-format:jar</include>
<include>org.apache.drill.contrib:drill-deltalake-format:jar</include>
+ <include>org.apache.drill.contrib:drill-storage:jar</include>
</includes>
<outputDirectory>jars</outputDirectory>
<useProjectArtifact>false</useProjectArtifact>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index ded472e..740f899 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -308,6 +308,8 @@
/** Size of JDBC batch queue (in batches) above which throttling begins. */
public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
"drill.jdbc.batch_queue_throttling_threshold";
+ public static final String JDBC_QUERY_TIMEOUT =
+ "drill.jdbc.query_timeout";
// Thread pool size for scan threads. Used by the Parquet scan.
public static final String SCAN_THREADPOOL_SIZE = "drill.exec.scan.threadpool_size";
// The size of the thread pool used by a scan to decode the data. Used by Parquet
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
index 55a7955..bbd992f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.planner.physical;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+
import java.util.Collections;
import java.util.Iterator;
@@ -34,4 +36,9 @@
default Iterator<Prel> iterator() {
return Collections.emptyIterator();
}
+
+ @Override
+ default <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitLeaf(this, value);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
index 8d91628..03a72a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
@@ -193,7 +193,8 @@
// Require prefix rename : there exists other expression, in addition to a star column.
if (!prefixedForStar // not set yet.
&& StarColumnHelper.containsStarColumn(prel.getRowType())
- && prel.getRowType().getFieldNames().size() > 1) {
+ && prel.getRowType().getFieldNames().size() > 1
+ && !(prel instanceof LeafPrel)) {
prefixedForStar = true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
index 29a75aa..03908a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
@@ -20,6 +20,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
@@ -74,19 +75,20 @@
@Override
public Prel visitScreen(ScreenPrel prel, Void value) {
- // insert project under screen only if we don't have writer underneath
- if (containsWriter(prel)) {
+ // insert project under screen only if we don't have writer underneath or dynamic star is projected
+ if (containsWriter(prel)
+ || prel.getRowType().getFieldList().stream().allMatch(RelDataTypeField::isDynamicStar)) {
return prel;
}
Prel newChild = ((Prel) prel.getInput()).accept(this, value);
- return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType)));
+ return prel.copy(prel.getTraitSet(), Collections.singletonList(addTopProjectPrel(newChild, validatedRowType)));
}
@Override
public Prel visitWriter(WriterPrel prel, Void value) {
Prel newChild = ((Prel) prel.getInput()).accept(this, value);
- return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType)));
+ return prel.copy(prel.getTraitSet(), Collections.singletonList(addTopProjectPrel(newChild, validatedRowType)));
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/BlockingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/BlockingResultsListener.java
new file mode 100644
index 0000000..976904b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/BlockingResultsListener.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLTimeoutException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class BlockingResultsListener implements UserResultsListener {
+  private static final Logger logger = LoggerFactory.getLogger(BlockingResultsListener.class);
+  /** Source of the per-listener {@code instanceId} values. */
+  private static final AtomicInteger NEXT_INSTANCE_ID = new AtomicInteger(1);
+  /** Identifies this listener in log messages. */
+  private final int instanceId;
+  /** Queue size above which {@code dataArrived} starts throttling the connection. */
+  private final int batchQueueThrottlingThreshold;
+  /** Query ID most recently reported through {@code queryIdArrived}. */
+  private volatile UserBitShared.QueryId queryId;
+  /** Count of batches received so far; used in log messages. */
+  private int lastReceivedBatchNumber;
+  /** Count of batches handed out by {@code getNext}; used in log messages. */
+  private int lastDequeuedBatchNumber;
+  /** Failure reported through {@code submissionFailed}; rethrown by {@code getNext}. */
+  private volatile UserException executionFailureException;
+  /** Set once the query completed, its submission failed, or the listener was closed. */
+  private volatile boolean completed;
+
+  /**
+   * Whether throttling of incoming data is active.
+   */
+  private final AtomicBoolean throttled = new AtomicBoolean(false);
+  /** Connection handle stored when throttling starts; reset to {@code null} when it stops. */
+  private volatile ConnectionThrottle throttle;
+  /** Set by {@code close()}; batches arriving afterwards are released immediately. */
+  private volatile boolean closed;
+  /** Counted down by an accepted data batch, by query completion and by {@code close()}. */
+  private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
+  /** Batches received but not yet handed out by {@code getNext}. */
+  private final LinkedBlockingDeque<QueryDataBatch> batchQueue =
+      Queues.newLinkedBlockingDeque();
+  /** Supplies the stopwatch against which the timeout is measured. */
+  private final Supplier<Stopwatch> elapsedTimer;
+  /** Supplies the query timeout in milliseconds; values {@code <= 0} disable the timeout. */
+  private final Supplier<Long> timeoutInMilliseconds;
+
+ public BlockingResultsListener(Supplier<Stopwatch> elapsedTimer, Supplier<Long> timeoutInMilliseconds,
+ int batchQueueThrottlingThreshold) {
+ this.elapsedTimer = elapsedTimer;
+ this.timeoutInMilliseconds = timeoutInMilliseconds;
+ this.instanceId = NEXT_INSTANCE_ID.getAndIncrement();
+ this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
+ logger.debug("[#{}] Query listener created.", instanceId);
+ }
+
+  /**
+   * Turns throttling on unless it is already on, by disabling auto-read on the connection.
+   *
+   * @param throttle the connection handle to throttle
+   * @return {@code true} if this call switched throttling on, {@code false} if it was already on
+   */
+  private boolean startThrottlingIfNot(ConnectionThrottle throttle) {
+    if (!throttled.compareAndSet(false, true)) {
+      return false;
+    }
+    this.throttle = throttle;
+    throttle.setAutoRead(false);
+    return true;
+  }
+
+  /**
+   * Turns throttling off if it is on, re-enabling auto-read on the stored connection handle.
+   *
+   * @return {@code true} if this call switched throttling off, {@code false} if it was not on
+   */
+  private boolean stopThrottlingIfSo() {
+    if (!throttled.compareAndSet(true, false)) {
+      return false;
+    }
+    throttle.setAutoRead(true);
+    throttle = null;
+    return true;
+  }
+
+ public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException {
+ //Check if a non-zero timeout has been set
+ if (timeoutInMilliseconds.get() > 0) {
+ //Identifying remaining in milliseconds to maintain a granularity close to integer value of
+ // timeout
+ long timeToTimeout =
+ timeoutInMilliseconds.get() - elapsedTimer.get().elapsed(TimeUnit.MILLISECONDS);
+ if (timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
+ throw new SQLTimeoutException("Query timed out in "+ TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds.get()) + " seconds");
+ }
+ } else {
+ firstMessageReceived.await();
+ }
+ }
+
+ private void releaseIfFirst() {
+ firstMessageReceived.countDown();
+ }
+
+ @Override
+ public void queryIdArrived(UserBitShared.QueryId queryId) {
+ logger.debug("[#{}] Received query ID: {}.",
+ instanceId, QueryIdHelper.getQueryId(queryId));
+ this.queryId = queryId;
+ }
+
+ @Override
+ public void submissionFailed(UserException ex) {
+ logger.debug("Received query failure: {} {}", instanceId, ex);
+ this.executionFailureException = ex;
+ this.completed = true;
+ close();
+ logger.info("[#{}] Query failed: ", instanceId, ex);
+ }
+
+  @Override
+  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+    lastReceivedBatchNumber++;
+    logger.debug("[#{}] Received query data batch #{}: {}.",
+        instanceId, lastReceivedBatchNumber, result);
+
+    // Once closed, the batch is not queued: release it right away.
+    if (closed) {
+      result.release();
+      completed = true;
+      return;
+    }
+
+    // Still open: queue the batch for getNext().
+    batchQueue.add(result);
+
+    // Ask for throttling once the backlog exceeds the threshold; the debug line is
+    // only written when this call actually switched throttling on.
+    final boolean overThreshold = batchQueue.size() > batchQueueThrottlingThreshold;
+    if (overThreshold && startThrottlingIfNot(throttle)) {
+      logger.debug("[#{}] Throttling started at queue size {}.", instanceId, batchQueue.size());
+    }
+
+    // Wake up a caller blocked in awaitFirstMessage().
+    releaseIfFirst();
+  }
+
+  @Override
+  public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
+    logger.debug("[#{}] Received query completion: {}.", instanceId, state);
+    completed = true; // set before releasing the latch so a woken waiter already sees completion
+    releaseIfFirst();
+  }
+
+ public UserBitShared.QueryId getQueryId() {
+ return queryId;
+ }
+
+  /**
+   * Gets the next batch of query results, polling the queue in 50 ms steps until one is available.
+   * @return the next batch, or {@code null} after last batch has been returned
+   * @throws UserException if the query failed
+   * @throws InterruptedException if waiting on the queue was interrupted
+   * @throws SQLTimeoutException if a positive timeout has elapsed and no batch is queued
+   */
+  public QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException {
+    while (true) {
+      if (executionFailureException != null) {
+        logger.debug("[#{}] Dequeued query failure exception: {}.",
+            instanceId, executionFailureException);
+        throw executionFailureException;
+      }
+      if (completed && batchQueue.isEmpty()) {
+        return null;
+      } else {
+        QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
+        if (qdb != null) {
+          lastDequeuedBatchNumber++;
+          logger.debug("[#{}] Dequeued query data batch #{}: {}.",
+              instanceId, lastDequeuedBatchNumber, qdb);
+
+          // Unthrottle server if queue size has dropped enough below threshold:
+          if (batchQueue.size() < batchQueueThrottlingThreshold / 2
+              || batchQueue.size() == 0 // (in case threshold < 2)
+          ) {
+            if (stopThrottlingIfSo()) {
+              logger.debug("[#{}] Throttling stopped at queue size {}.",
+                  instanceId, batchQueue.size());
+            }
+          }
+          return qdb;
+        }
+
+        // No batch within this poll: fail if a positive timeout has elapsed, otherwise loop again.
+        if (timeoutInMilliseconds.get() > 0 && elapsedTimer.get().elapsed(TimeUnit.MILLISECONDS) >= timeoutInMilliseconds.get()) {
+          throw new SQLTimeoutException("Query timed out in "+ TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds.get()) + " seconds");
+        }
+      }
+    }
+  }
+
+  /** Closes the listener: stops throttling, releases queued batches and unblocks any waiter. */
+  public void close() {
+    logger.debug("[#{}] Query listener closing.", instanceId);
+    closed = true;
+    if (stopThrottlingIfSo()) {
+      logger.debug("[#{}] Throttling stopped at close() (at queue size {}).", instanceId, batchQueue.size());
+    }
+    // Drain without any timeout handling; we're closing the cursor.
+    QueryDataBatch qdb;
+    while ((qdb = batchQueue.poll()) != null) {
+      if (qdb.getData() != null) {
+        qdb.getData().release();
+      }
+    }
+    // close() may run while the main thread is still blocked waiting for the first
+    // result. Mark completion first, then release the latch, so the woken thread
+    // already observes the completed state.
+    completed = true;
+    releaseIfFirst();
+  }
+
+ public boolean isCompleted() {
+ return completed;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
index 2d6e5c8..f22c69a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
@@ -31,7 +31,6 @@
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.physical.LeafPrel;
import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
-import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
@@ -105,11 +104,6 @@
}
@Override
- public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
- return logicalVisitor.visitLeaf(this, value);
- }
-
- @Override
public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
return BatchSchema.SelectionVectorMode.DEFAULT;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
index cc96426..1d4fff8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
@@ -25,21 +25,18 @@
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.LeafPrel;
import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
-import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.store.SubsetRemover;
import org.apache.drill.exec.store.plan.PluginImplementor;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
/**
* Represents a plugin-specific plan once children nodes have been pushed down into group scan.
*/
-public class PluginPrel extends AbstractRelNode implements Prel {
+public class PluginPrel extends AbstractRelNode implements LeafPrel {
private final GroupScan groupScan;
private final RelDataType rowType;
@@ -62,11 +59,6 @@
}
@Override
- public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
- return logicalVisitor.visitPrel(this, value);
- }
-
- @Override
public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
return BatchSchema.SelectionVectorMode.DEFAULT;
}
@@ -82,11 +74,6 @@
}
@Override
- public Iterator<Prel> iterator() {
- return Collections.emptyIterator();
- }
-
- @Override
public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw).item("groupScan", groupScan);
}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 5468f26..4dc45ec 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -507,7 +507,8 @@
}
drill.jdbc: {
- batch_queue_throttling_threshold: 100
+ batch_queue_throttling_threshold: 100,
+ query_timeout: 600000
}
# The following are defaults for system and session options.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
index 891b4a6..d27bec1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
@@ -193,7 +193,7 @@
SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext);
PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer);
parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints);
- assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 30));
+ assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 20));
}
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java
deleted file mode 100644
index 7087a8c..0000000
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.jdbc;
-
-import java.sql.SQLTimeoutException;
-
-/**
- * Indicates that an operation timed out. This is not an error; you can
- * retry the operation.
- */
-public class SqlTimeoutException extends SQLTimeoutException {
- private static final long serialVersionUID = 2017_04_03L;
-
- SqlTimeoutException() {
- // SQLException(reason, SQLState, vendorCode)
- // REVIEW mb 19-Jul-05 Is there a standard SQLState?
- super("timeout", null, 0);
- }
-
- public SqlTimeoutException(long timeoutValueInSeconds) {
- super("Query timed out in "+ timeoutValueInSeconds + " seconds");
- }
-}
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
index 65d94db..f627b3f 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
@@ -20,15 +20,12 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.sql.SQLException;
-import java.sql.SQLTimeoutException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.drill.exec.rpc.user.BlockingResultsListener;
import org.apache.drill.jdbc.DrillStatement;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.calcite.avatica.AvaticaStatement;
@@ -40,249 +37,18 @@
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
import org.apache.drill.jdbc.SchemaChangeListener;
-import org.apache.drill.jdbc.SqlTimeoutException;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
public class DrillCursor implements Cursor {
- ////////////////////////////////////////
- // ResultsListener:
- static class ResultsListener implements UserResultsListener {
- private static final Logger logger = LoggerFactory.getLogger(ResultsListener.class);
-
- private static volatile int nextInstanceId = 1;
-
- /** (Just for logging.) */
- private final int instanceId;
-
- private final int batchQueueThrottlingThreshold;
-
- /** (Just for logging.) */
- private volatile QueryId queryId;
-
- /** (Just for logging.) */
- private int lastReceivedBatchNumber;
- /** (Just for logging.) */
- private int lastDequeuedBatchNumber;
-
- private volatile UserException executionFailureException;
-
- // TODO: Revisit "completed". Determine and document exactly what it
- // means. Some uses imply that it means that incoming messages indicate
- // that the _query_ has _terminated_ (not necessarily _completing_
- // normally), while some uses imply that it's some other state of the
- // ResultListener. Some uses seem redundant.)
- volatile boolean completed;
-
- /** Whether throttling of incoming data is active. */
- private final AtomicBoolean throttled = new AtomicBoolean(false);
- private volatile ConnectionThrottle throttle;
-
- private volatile boolean closed;
-
- private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
-
- final LinkedBlockingDeque<QueryDataBatch> batchQueue =
- Queues.newLinkedBlockingDeque();
-
- private final DrillCursor parent;
- Stopwatch elapsedTimer;
-
- /**
- * ...
- * @param parent
- * reference to DrillCursor
- * @param batchQueueThrottlingThreshold
- * queue size threshold for throttling server
- */
- ResultsListener(DrillCursor parent, int batchQueueThrottlingThreshold) {
- this.parent = parent;
- instanceId = nextInstanceId++;
- this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
- logger.debug("[#{}] Query listener created.", instanceId);
- }
-
- /**
- * Starts throttling if not currently throttling.
- * @param throttle the "throttlable" object to throttle
- * @return true if actually started (wasn't throttling already)
- */
- private boolean startThrottlingIfNot(ConnectionThrottle throttle) {
- final boolean started = throttled.compareAndSet(false, true);
- if (started) {
- this.throttle = throttle;
- throttle.setAutoRead(false);
- }
- return started;
- }
-
- /**
- * Stops throttling if currently throttling.
- * @return true if actually stopped (was throttling)
- */
- private boolean stopThrottlingIfSo() {
- final boolean stopped = throttled.compareAndSet(true, false);
- if (stopped) {
- throttle.setAutoRead(true);
- throttle = null;
- }
- return stopped;
- }
-
- public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException {
- //Check if a non-zero timeout has been set
- if (parent.timeoutInMilliseconds > 0) {
- //Identifying remaining in milliseconds to maintain a granularity close to integer value of timeout
- long timeToTimeout = parent.timeoutInMilliseconds - parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS);
- if (timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
- throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
- }
- } else {
- firstMessageReceived.await();
- }
- }
-
- private void releaseIfFirst() {
- firstMessageReceived.countDown();
- }
-
- @Override
- public void queryIdArrived(QueryId queryId) {
- logger.debug("[#{}] Received query ID: {}.",
- instanceId, QueryIdHelper.getQueryId(queryId));
- this.queryId = queryId;
- }
-
- @Override
- public void submissionFailed(UserException ex) {
- logger.debug("Received query failure: {} {}", instanceId, ex);
- this.executionFailureException = ex;
- completed = true;
- close();
- logger.info("[#{}] Query failed: ", instanceId, ex);
- }
-
- @Override
- public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
- lastReceivedBatchNumber++;
- logger.debug("[#{}] Received query data batch #{}: {}.",
- instanceId, lastReceivedBatchNumber, result);
-
- // If we're in a closed state, just release the message.
- if (closed) {
- result.release();
- // TODO: Revisit member completed: Is ResultListener really completed
- // after only one data batch after being closed?
- completed = true;
- return;
- }
-
- // We're active; let's add to the queue.
- batchQueue.add(result);
-
- // Throttle server if queue size has exceed threshold.
- if (batchQueue.size() > batchQueueThrottlingThreshold) {
- if (startThrottlingIfNot(throttle)) {
- logger.debug("[#{}] Throttling started at queue size {}.",
- instanceId, batchQueue.size());
- }
- }
-
- releaseIfFirst();
- }
-
- @Override
- public void queryCompleted(QueryState state) {
- logger.debug("[#{}] Received query completion: {}.", instanceId, state);
- releaseIfFirst();
- completed = true;
- }
-
- QueryId getQueryId() {
- return queryId;
- }
-
- /**
- * Gets the next batch of query results from the queue.
- * @return the next batch, or {@code null} after last batch has been returned
- * @throws UserException
- * if the query failed
- * @throws InterruptedException
- * if waiting on the queue was interrupted
- */
- QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException {
- while (true) {
- if (executionFailureException != null) {
- logger.debug("[#{}] Dequeued query failure exception: {}.",
- instanceId, executionFailureException);
- throw executionFailureException;
- }
- if (completed && batchQueue.isEmpty()) {
- return null;
- } else {
- QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
- if (qdb != null) {
- lastDequeuedBatchNumber++;
- logger.debug("[#{}] Dequeued query data batch #{}: {}.",
- instanceId, lastDequeuedBatchNumber, qdb);
-
- // Unthrottle server if queue size has dropped enough below threshold:
- if (batchQueue.size() < batchQueueThrottlingThreshold / 2
- || batchQueue.size() == 0 // (in case threshold < 2)
- ) {
- if (stopThrottlingIfSo()) {
- logger.debug("[#{}] Throttling stopped at queue size {}.",
- instanceId, batchQueue.size());
- }
- }
- return qdb;
- }
-
- // Check and throw SQLTimeoutException
- if (parent.timeoutInMilliseconds > 0 && parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS) >= parent.timeoutInMilliseconds) {
- throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
- }
- }
- }
- }
-
- void close() {
- logger.debug("[#{}] Query listener closing.", instanceId);
- closed = true;
- if (stopThrottlingIfSo()) {
- logger.debug("[#{}] Throttling stopped at close() (at queue size {}).",
- instanceId, batchQueue.size());
- }
- while (!batchQueue.isEmpty()) {
- // Don't bother with query timeout, we're closing the cursor
- QueryDataBatch qdb = batchQueue.poll();
- if (qdb != null && qdb.getData() != null) {
- qdb.getData().release();
- }
- }
- // Close may be called before the first result is received and therefore
- // when the main thread is blocked waiting for the result. In that case
- // we want to unblock the main thread.
- firstMessageReceived.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere?
- completed = true;
- }
- }
-
private static final Logger logger = getLogger(DrillCursor.class);
/** JDBC-specified string for unknown catalog, schema, and table names. */
@@ -295,7 +61,7 @@
/** Holds current batch of records (none before first load). */
private final RecordBatchLoader currentBatchHolder;
- private final ResultsListener resultsListener;
+ private final BlockingResultsListener resultsListener;
private SchemaChangeListener changeListener;
private final DrillAccessorList accessors = new DrillAccessorList();
@@ -355,7 +121,9 @@
final int batchQueueThrottlingThreshold =
client.getConfig().getInt(
ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD);
- resultsListener = new ResultsListener(this, batchQueueThrottlingThreshold);
+ resultsListener = new BlockingResultsListener(this::getElapsedTimer,
+ this::getTimeoutInMilliseconds,
+ batchQueueThrottlingThreshold);
currentBatchHolder = new RecordBatchLoader(client.getAllocator());
// Set Query Timeout
@@ -395,7 +163,7 @@
}
synchronized void cleanup() {
- if (resultsListener.getQueryId() != null && ! resultsListener.completed) {
+ if (resultsListener.getQueryId() != null && ! resultsListener.isCompleted()) {
connection.getClient().cancelQuery(resultsListener.getQueryId());
}
resultsListener.close();
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 0ce1e79..86dac42 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -30,6 +30,7 @@
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTimeoutException;
import java.sql.SQLType;
import java.sql.SQLXML;
import java.sql.Time;
@@ -48,7 +49,6 @@
import org.apache.drill.jdbc.AlreadyClosedSqlException;
import org.apache.drill.jdbc.DrillResultSet;
import org.apache.drill.jdbc.ExecutionCanceledSqlException;
-import org.apache.drill.jdbc.SqlTimeoutException;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
@@ -100,7 +100,7 @@
if (elapsedTimer != null) {
//The timer has already been started by the DrillCursor at this point
if (elapsedTimer.elapsed(TimeUnit.MILLISECONDS) > this.queryTimeoutInMilliseconds) {
- throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(this.queryTimeoutInMilliseconds));
+ throw new SQLTimeoutException("Query timed out in "+ TimeUnit.MILLISECONDS.toSeconds(queryTimeoutInMilliseconds) + " seconds");
}
}
}
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
index f8ce2e2..43fe2df 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
@@ -347,7 +347,7 @@
* Test setting timeout for a query that actually times out because of lack of timely server response
*/
@Ignore ( "Pause Injection appears broken for PreparedStatement" )
- @Test ( expected = SqlTimeoutException.class )
+ @Test ( expected = SQLTimeoutException.class )
public void testServerTriggeredQueryTimeout() throws Exception {
//Setting to a very low value (2sec)
int timeoutDuration = 2;
@@ -383,7 +383,7 @@
}
} catch (SQLTimeoutException sqlEx) {
logger.info("SQLTimeoutException thrown: {}", sqlEx.getMessage());
- throw (SqlTimeoutException) sqlEx;
+ throw sqlEx;
} finally {
//Pause briefly to wait for server to unblock
try {
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
index de15b05..2da66be 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
@@ -173,7 +173,7 @@
/**
* Test setting timeout for a query that actually times out because of lack of timely server response
*/
- @Test ( expected = SqlTimeoutException.class )
+ @Test ( expected = SQLTimeoutException.class )
public void testServerTriggeredQueryTimeout() throws Exception {
// Setting to a very low value (2sec)
int timeoutDuration = 2;
@@ -209,7 +209,7 @@
}
} catch (SQLTimeoutException sqlEx) {
logger.info("SQLTimeoutException thrown: {}", sqlEx.getMessage());
- throw (SqlTimeoutException) sqlEx;
+ throw sqlEx;
} finally {
// Pause briefly to wait for server to unblock
try {