DRILL-7799: REST Queries Fail if Caching is Enabled
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 1b322dd..83058e2 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -40,10 +40,13 @@
 
 public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
   private final HttpSubScan subScan;
+  private final int maxRecords;
   private JsonLoader jsonLoader;
+  private int recordCount;
 
   public HttpBatchReader(HttpSubScan subScan) {
     this.subScan = subScan;
+    this.maxRecords = subScan.maxRecords();
   }
 
   @Override
@@ -141,6 +144,12 @@
 
   @Override
   public boolean next() {
+    recordCount++;
+
+    // Stop after the limit has been reached
+    if (maxRecords >= 1 && recordCount > maxRecords) {
+      return false;
+    }
     return jsonLoader.readBatch();
   }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index f7db13b..831f6bd 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -44,6 +44,7 @@
 public class HttpCSVBatchReader extends HttpBatchReader {
   private final HttpSubScan subScan;
   private final CsvParserSettings csvSettings;
+  private final int maxRecords;
   private CsvParser csvReader;
   private List<StringColumnWriter> columnWriters;
   private String[] firstRow;
@@ -55,6 +56,7 @@
   public HttpCSVBatchReader(HttpSubScan subScan) {
     super(subScan);
     this.subScan = subScan;
+    this.maxRecords = subScan.maxRecords();
 
     this.csvSettings = new CsvParserSettings();
     csvSettings.setLineSeparatorDetectionEnabled(true);
@@ -102,6 +104,10 @@
   @Override
   public boolean next() {
     while (!rowWriter.isFull()) {
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      }
+
       if (!processRow()) {
         return false;
       }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index ff0f7d6..4f2d9e1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -47,6 +47,7 @@
   private final Map<String, String> filters;
   private final ScanStats scanStats;
   private final double filterSelectivity;
+  private final int maxRecords;
 
   // Used only in planner, not serialized
   private int hashCode;
@@ -61,6 +62,7 @@
     this.filters = null;
     this.filterSelectivity = 0.0;
     this.scanStats = computeScanStats();
+    this.maxRecords = -1;
   }
 
   /**
@@ -72,6 +74,7 @@
     this.columns = that.columns;
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
 
     // Calcite makes many copies in the later stage of planning
     // without changing anything. Retain the previous stats.
@@ -92,6 +95,7 @@
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
     this.scanStats = computeScanStats();
+    this.maxRecords = that.maxRecords;
   }
 
   /**
@@ -107,9 +111,26 @@
     this.filters = filters;
     this.filterSelectivity = filterSelectivity;
     this.scanStats = computeScanStats();
+    this.maxRecords = that.maxRecords;
   }
 
   /**
+   * Adds a limit to the scan.
+   */
+  public HttpGroupScan(HttpGroupScan that, int maxRecords) {
+    super(that);
+    this.columns = that.columns;
+    this.httpScanSpec = that.httpScanSpec;
+
+    // Applies a filter.
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.scanStats = computeScanStats();
+    this.maxRecords = maxRecords;
+  }
+
+
+  /**
    * Deserialize a group scan. Not called in normal operation. Probably used
    * only if Drill executes a logical plan.
    */
@@ -118,7 +139,8 @@
     @JsonProperty("columns") List<SchemaPath> columns,
     @JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec,
     @JsonProperty("filters") Map<String, String> filters,
-    @JsonProperty("filterSelectivity") double selectivity
+    @JsonProperty("filterSelectivity") double selectivity,
+    @JsonProperty("maxRecords") int maxRecords
   ) {
     super("no-user");
     this.columns = columns;
@@ -126,6 +148,7 @@
     this.filters = filters;
     this.filterSelectivity = selectivity;
     this.scanStats = computeScanStats();
+    this.maxRecords = maxRecords;
   }
 
   @JsonProperty("columns")
@@ -161,7 +184,7 @@
 
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
-    return new HttpSubScan(httpScanSpec, columns, filters);
+    return new HttpSubScan(httpScanSpec, columns, filters, maxRecords);
   }
 
   @Override
@@ -175,6 +198,10 @@
     return toString();
   }
 
+  @JsonProperty("maxRecords")
+  public int maxRecords() { return maxRecords; }
+
+
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
@@ -226,11 +253,25 @@
   }
 
   @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecords == this.maxRecords) {
+      return null;
+    }
+    return new HttpGroupScan(this, maxRecords);
+  }
+
+  @Override
   public String toString() {
     return new PlanStringBuilder(this)
       .field("scan spec", httpScanSpec)
       .field("columns", columns)
       .field("filters", filters)
+      .field("maxRecords", maxRecords)
       .toString();
   }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index 1706c7c..d818824 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -41,16 +41,20 @@
   private final HttpScanSpec tableSpec;
   private final List<SchemaPath> columns;
   private final Map<String, String> filters;
+  private final int maxRecords;
 
   @JsonCreator
   public HttpSubScan(
     @JsonProperty("tableSpec") HttpScanSpec tableSpec,
     @JsonProperty("columns") List<SchemaPath> columns,
-    @JsonProperty("filters") Map<String, String> filters) {
+    @JsonProperty("filters") Map<String, String> filters,
+    @JsonProperty("maxRecords") int maxRecords
+    ) {
     super("user-if-needed");
     this.tableSpec = tableSpec;
     this.columns = columns;
     this.filters = filters;
+    this.maxRecords = maxRecords;
   }
 
   @JsonProperty("tableSpec")
@@ -68,6 +72,11 @@
     return filters;
   }
 
+  @JsonProperty("maxRecords")
+  public int maxRecords() {
+    return maxRecords;
+  }
+
  @Override
   public <T, X, E extends Throwable> T accept(
    PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
@@ -76,7 +85,7 @@
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new HttpSubScan(tableSpec, columns, filters);
+    return new HttpSubScan(tableSpec, columns, filters, maxRecords);
   }
 
   @Override
@@ -96,6 +105,7 @@
       .field("tableSpec", tableSpec)
       .field("columns", columns)
       .field("filters", filters)
+      .field("maxRecords", maxRecords)
       .toString();
   }
 
@@ -115,6 +125,7 @@
     HttpSubScan other = (HttpSubScan) obj;
     return Objects.equals(tableSpec, other.tableSpec)
       && Objects.equals(columns, other.columns)
-      && Objects.equals(filters, other.filters);
+      && Objects.equals(filters, other.filters)
+      && Objects.equals(maxRecords, other.maxRecords);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 8918e74..5cb9991 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -206,14 +206,18 @@
   private void setupCache(Builder builder) {

     int cacheSize = 10 * 1024 * 1024;   // TODO Add cache size in MB to config

     File cacheDirectory = new File(tempDir, "http-cache");

-    if (!cacheDirectory.mkdirs()) {

-      throw UserException.dataWriteError()

-        .message("Could not create the HTTP cache directory")

-        .addContext("Path", cacheDirectory.getAbsolutePath())

-        .addContext("Please check the temp directory or disable HTTP caching.")

-        .addContext(errorContext)

-        .build(logger);

+    if (!cacheDirectory.exists()) {

+      if (!cacheDirectory.mkdirs()) {

+        throw UserException

+          .dataWriteError()

+          .message("Could not create the HTTP cache directory")

+          .addContext("Path", cacheDirectory.getAbsolutePath())

+          .addContext("Please check the temp directory or disable HTTP caching.")

+          .addContext(errorContext)

+          .build(logger);

+      }

     }

+

     try {

       Cache cache = new Cache(cacheDirectory, cacheSize);

       logger.debug("Caching HTTP Query Results at: {}", cacheDirectory);

diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 4c20e99..442aa5f 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -452,6 +452,17 @@
   }
 
   @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "SELECT sunrise, sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1 LIMIT 5";
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=5")
+      .match();
+  }
+
+  @Test
   public void testSlowResponse() throws Exception {
     try (MockWebServer server = startServer()) {