DRILL-7799: REST Queries Fail if Caching is Enabled
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 1b322dd..83058e2 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -40,10 +40,13 @@
public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
private final HttpSubScan subScan;
+ private final int maxRecords;
private JsonLoader jsonLoader;
+ private int recordCount;
public HttpBatchReader(HttpSubScan subScan) {
this.subScan = subScan;
+ this.maxRecords = subScan.maxRecords();
}
@Override
@@ -141,6 +144,12 @@
@Override
public boolean next() {
+ recordCount++;
+
+ // Stop after the limit has been reached
+ if (maxRecords >= 1 && recordCount > maxRecords) {
+ return false;
+ }
return jsonLoader.readBatch();
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index f7db13b..831f6bd 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -44,6 +44,7 @@
public class HttpCSVBatchReader extends HttpBatchReader {
private final HttpSubScan subScan;
private final CsvParserSettings csvSettings;
+ private final int maxRecords;
private CsvParser csvReader;
private List<StringColumnWriter> columnWriters;
private String[] firstRow;
@@ -55,6 +56,7 @@
public HttpCSVBatchReader(HttpSubScan subScan) {
super(subScan);
this.subScan = subScan;
+ this.maxRecords = subScan.maxRecords();
this.csvSettings = new CsvParserSettings();
csvSettings.setLineSeparatorDetectionEnabled(true);
@@ -102,6 +104,10 @@
@Override
public boolean next() {
while (!rowWriter.isFull()) {
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
if (!processRow()) {
return false;
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index ff0f7d6..4f2d9e1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -47,6 +47,7 @@
private final Map<String, String> filters;
private final ScanStats scanStats;
private final double filterSelectivity;
+ private final int maxRecords;
// Used only in planner, not serialized
private int hashCode;
@@ -61,6 +62,7 @@
this.filters = null;
this.filterSelectivity = 0.0;
this.scanStats = computeScanStats();
+ this.maxRecords = -1;
}
/**
@@ -72,6 +74,7 @@
this.columns = that.columns;
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
+ this.maxRecords = that.maxRecords;
// Calcite makes many copies in the later stage of planning
// without changing anything. Retain the previous stats.
@@ -92,6 +95,7 @@
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.scanStats = computeScanStats();
+ this.maxRecords = that.maxRecords;
}
/**
@@ -107,9 +111,26 @@
this.filters = filters;
this.filterSelectivity = filterSelectivity;
this.scanStats = computeScanStats();
+ this.maxRecords = that.maxRecords;
}
/**
+ * Adds a limit to the scan.
+ */
+ public HttpGroupScan(HttpGroupScan that, int maxRecords) {
+ super(that);
+ this.columns = that.columns;
+ this.httpScanSpec = that.httpScanSpec;
+
+    // Carry over the existing filter pushdown state unchanged; this
+    // constructor only applies a limit.
+ this.filters = that.filters;
+ this.filterSelectivity = that.filterSelectivity;
+ this.scanStats = computeScanStats();
+ this.maxRecords = maxRecords;
+ }
+
+
+ /**
* Deserialize a group scan. Not called in normal operation. Probably used
* only if Drill executes a logical plan.
*/
@@ -118,7 +139,8 @@
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec,
@JsonProperty("filters") Map<String, String> filters,
- @JsonProperty("filterSelectivity") double selectivity
+ @JsonProperty("filterSelectivity") double selectivity,
+ @JsonProperty("maxRecords") int maxRecords
) {
super("no-user");
this.columns = columns;
@@ -126,6 +148,7 @@
this.filters = filters;
this.filterSelectivity = selectivity;
this.scanStats = computeScanStats();
+ this.maxRecords = maxRecords;
}
@JsonProperty("columns")
@@ -161,7 +184,7 @@
@Override
public SubScan getSpecificScan(int minorFragmentId) {
- return new HttpSubScan(httpScanSpec, columns, filters);
+ return new HttpSubScan(httpScanSpec, columns, filters, maxRecords);
}
@Override
@@ -175,6 +198,10 @@
return toString();
}
+ @JsonProperty("maxRecords")
+ public int maxRecords() { return maxRecords; }
+
+
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
@@ -226,11 +253,25 @@
}
@Override
+ public boolean supportsLimitPushdown() {
+ return true;
+ }
+
+ @Override
+ public GroupScan applyLimit(int maxRecords) {
+ if (maxRecords == this.maxRecords) {
+ return null;
+ }
+ return new HttpGroupScan(this, maxRecords);
+ }
+
+ @Override
public String toString() {
return new PlanStringBuilder(this)
.field("scan spec", httpScanSpec)
.field("columns", columns)
.field("filters", filters)
+ .field("maxRecords", maxRecords)
.toString();
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index 1706c7c..d818824 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -41,16 +41,20 @@
private final HttpScanSpec tableSpec;
private final List<SchemaPath> columns;
private final Map<String, String> filters;
+ private final int maxRecords;
@JsonCreator
public HttpSubScan(
@JsonProperty("tableSpec") HttpScanSpec tableSpec,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("filters") Map<String, String> filters) {
+ @JsonProperty("filters") Map<String, String> filters,
+ @JsonProperty("maxRecords") int maxRecords
+ ) {
super("user-if-needed");
this.tableSpec = tableSpec;
this.columns = columns;
this.filters = filters;
+ this.maxRecords = maxRecords;
}
@JsonProperty("tableSpec")
@@ -68,6 +72,11 @@
return filters;
}
+ @JsonProperty("maxRecords")
+ public int maxRecords() {
+ return maxRecords;
+ }
+
@Override
public <T, X, E extends Throwable> T accept(
PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
@@ -76,7 +85,7 @@
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- return new HttpSubScan(tableSpec, columns, filters);
+ return new HttpSubScan(tableSpec, columns, filters, maxRecords);
}
@Override
@@ -96,6 +105,7 @@
.field("tableSpec", tableSpec)
.field("columns", columns)
.field("filters", filters)
+ .field("maxRecords", maxRecords)
.toString();
}
@@ -115,6 +125,7 @@
HttpSubScan other = (HttpSubScan) obj;
return Objects.equals(tableSpec, other.tableSpec)
&& Objects.equals(columns, other.columns)
- && Objects.equals(filters, other.filters);
+ && Objects.equals(filters, other.filters)
+ && Objects.equals(maxRecords, other.maxRecords);
}
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 8918e74..5cb9991 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -206,14 +206,18 @@
private void setupCache(Builder builder) {
int cacheSize = 10 * 1024 * 1024; // TODO Add cache size in MB to config
File cacheDirectory = new File(tempDir, "http-cache");
- if (!cacheDirectory.mkdirs()) {
- throw UserException.dataWriteError()
- .message("Could not create the HTTP cache directory")
- .addContext("Path", cacheDirectory.getAbsolutePath())
- .addContext("Please check the temp directory or disable HTTP caching.")
- .addContext(errorContext)
- .build(logger);
+ if (!cacheDirectory.exists()) {
+ if (!cacheDirectory.mkdirs()) {
+ throw UserException
+ .dataWriteError()
+ .message("Could not create the HTTP cache directory")
+ .addContext("Path", cacheDirectory.getAbsolutePath())
+ .addContext("Please check the temp directory or disable HTTP caching.")
+ .addContext(errorContext)
+ .build(logger);
+ }
}
+
try {
Cache cache = new Cache(cacheDirectory, cacheSize);
logger.debug("Caching HTTP Query Results at: {}", cacheDirectory);
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 4c20e99..442aa5f 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -452,6 +452,17 @@
}
@Test
+ public void testLimitPushdown() throws Exception {
+ String sql = "SELECT sunrise, sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1 LIMIT 5";
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("Limit", "maxRecords=5")
+ .match();
+ }
+
+ @Test
public void testSlowResponse() throws Exception {
try (MockWebServer server = startServer()) {