DRILL-8287: Add Support for Keyset Based Pagination (#2633)


diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java
index 7f2f2f1..61115ed 100644
--- a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java
@@ -126,7 +126,7 @@
       .build();
   }
 
-  @JsonProperty("clientID")
+  @JsonIgnore
   public String getClientID() {
     if(getOAuthCredentials().isPresent()) {
       return getOAuthCredentials().get().getClientID();
@@ -135,7 +135,7 @@
     }
   }
 
-  @JsonProperty("clientSecret")
+  @JsonIgnore
   public String getClientSecret() {
     if (getOAuthCredentials().isPresent()) {
       return getOAuthCredentials().get().getClientSecret();
diff --git a/contrib/storage-http/Pagination.md b/contrib/storage-http/Pagination.md
index d2206b2..8855fb2 100644
--- a/contrib/storage-http/Pagination.md
+++ b/contrib/storage-http/Pagination.md
@@ -5,7 +5,7 @@
 To use a paginator, you simply have to configure the paginator in the connection for the particular API.  
 
 ## Words of Caution
-While extremely powerful, the auto-pagination feature has the potential to run afoul of APIs rate limits and even potentially DDOS an API. 
+While extremely powerful, the auto-pagination feature has the potential to run afoul of APIs rate limits and even potentially DDoS an API. Please use with extreme care.
 
 
 ## Offset Pagination
@@ -40,3 +40,33 @@
       }
 ```
 In either case, the `pageSize` parameter should be set to the maximum page size allowable by the API.  This will minimize the number of requests Drill is making.
+
+## Index / KeySet Pagination
+Index or KeySet pagination is when the API itself returns values to generate the next page. 
+
+Consider an API that returned data like this: 
+
+```json
+{
+  "companies": [
+    ...
+  ],
+  "has-more": true,
+  "index": 3849945478
+}
+
+```
+In this case, the `has-more` parameter is a boolean value which indicates whether or not there are more pages. The `index` parameter gets appended to the URL in question to generate the next page.
+
+`https://api.myapi.com/paged?properties=name&properties=website&offset=3856722038`
+
+There is a slight variant of this where the API will return the actual URL of the next page.
+
+There are three possible parameters:
+
+* `hasMoreParam`: This is the name of the boolean parameter which indicates whether the API has more pages.
+* `indexParam`:  The parameter name of the key or offset that will be used to generate the next page.
+* `nextPageParam`: The parameter name which returns a complete URL of the next page.
+
+
+** Note: Index / Keyset Pagination is only implemented for APIs that return JSON ** 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index a6ce240..9915948 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -20,6 +20,7 @@
 import com.typesafe.config.Config;
 import okhttp3.HttpUrl;
 import okhttp3.HttpUrl.Builder;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -36,6 +37,8 @@
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
 import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.apache.drill.exec.store.http.paginator.IndexPaginator;
 import org.apache.drill.exec.store.http.paginator.Paginator;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig;
 import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
@@ -48,6 +51,7 @@
 
 import java.io.File;
 import java.io.InputStream;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -64,15 +68,17 @@
   protected String baseUrl;
   private JsonLoader jsonLoader;
   private ResultSetLoader resultSetLoader;
-
+  private final List<SchemaPath> projectedColumns;
   protected ImplicitColumns implicitColumns;
-
+  private final Map<String, Object> paginationFields;
 
   public HttpBatchReader(HttpSubScan subScan) {
     this.subScan = subScan;
     this.maxRecords = subScan.maxRecords();
     this.baseUrl = subScan.tableSpec().connectionConfig().url();
     this.paginator = null;
+    this.projectedColumns = subScan.columns();
+    this.paginationFields = generatePaginationFieldMap();
   }
 
   public HttpBatchReader(HttpSubScan subScan, Paginator paginator) {
@@ -80,6 +86,8 @@
     this.maxRecords = subScan.maxRecords();
     this.paginator = paginator;
     this.baseUrl = paginator.next();
+    this.projectedColumns = subScan.columns();
+    this.paginationFields = generatePaginationFieldMap();
     logger.debug("Batch reader with URL: {}", this.baseUrl);
   }
 
@@ -130,6 +138,7 @@
           .maxRows(maxRecords)
           .dataPath(subScan.tableSpec().connectionConfig().dataPath())
           .errorContext(errorContext)
+          .listenerColumnMap(paginationFields)
           .fromStream(inStream);
 
       if (subScan.tableSpec().connectionConfig().jsonOptions() != null) {
@@ -208,6 +217,29 @@
     implicitColumns.getColumn(RESPONSE_CODE_FIELD).setValue(http.getResponseCode());
   }
 
+  protected Map<String, Object> generatePaginationFieldMap() {
+    if (paginator == null || paginator.getMode() != PaginatorMethod.INDEX) {
+      return null;
+    }
+
+    Map<String, Object> fieldMap = new HashMap<>();
+    IndexPaginator indexPaginator = (IndexPaginator) paginator;
+
+    // Initialize the pagination field map
+    if (StringUtils.isNotEmpty(indexPaginator.getIndexParam())) {
+      fieldMap.put(indexPaginator.getIndexParam(), null);
+    }
+
+    if (StringUtils.isNotEmpty(indexPaginator.getHasMoreParam())) {
+      fieldMap.put(indexPaginator.getHasMoreParam(), null);
+    }
+
+    if (StringUtils.isNotEmpty(indexPaginator.getNextPageParam())) {
+      fieldMap.put(indexPaginator.getNextPageParam(), null);
+    }
+    return fieldMap;
+  }
+
   protected boolean implicitColumnsAreProjected() {
     List<SchemaPath> columns = subScan.columns();
     for (SchemaPath path : columns) {
@@ -291,13 +323,64 @@
     return builder.build();
   }
 
+  protected void populateIndexPaginator() {
+    IndexPaginator indexPaginator = (IndexPaginator) paginator;
+    // There are two cases, however in both, there is a boolean parameter which indicates
+    // whether there are more pages to follow.  The first case is when the API provides
+    // a URL for the next page and the other is when the API provides an offset which is added
+    // to the next page.
+    //
+    // In ether case, we first need the boolean "has more" parameter so let's grab that.
+    if (paginationFields.get(indexPaginator.getHasMoreParam()) != null) {
+      Object hasMore = paginationFields.get(indexPaginator.getHasMoreParam());
+      boolean hasMoreValues;
+
+      if (hasMore instanceof Boolean) {
+        hasMoreValues = (Boolean) hasMore;
+      } else {
+        String hasMoreString = hasMore.toString();
+        // Attempt to convert to boolean
+        hasMoreValues = Boolean.parseBoolean(hasMoreString);
+      }
+      indexPaginator.setHasMoreValue(hasMoreValues);
+
+      // If there are no more values, notify the paginator
+      if (!hasMoreValues) {
+        paginator.notifyPartialPage();
+      }
+
+      // At this point we know that there are more pages to come.  Send the data to the paginator and
+      // use that to generate the next page.
+      if (StringUtils.isNotEmpty(indexPaginator.getIndexParam())) {
+        indexPaginator.setIndexValue(paginationFields.get(indexPaginator.getIndexParam()).toString());
+      }
+
+      if (StringUtils.isNotEmpty(indexPaginator.getNextPageParam())) {
+        indexPaginator.setNextPageValue(paginationFields.get(indexPaginator.getNextPageParam()).toString());
+      }
+    }
+  }
+
   @Override
   public boolean next() {
     boolean result = jsonLoader.readBatch();
 
-    // Allows limitless pagination.
-    if (paginator != null &&
+    // This code implements the index/keyset pagination.  This pagination method
+    // uses a value returned in the current result set as the starting point for the
+    // next page.  Some APIs will have a boolean parameter to indicate that there are
+    // additional pages.  Some APIs will also return a URL for the next page. In any event,
+    // it is necessary to grab these values from the returned data.
+    if (paginator != null && paginator.getMode() == PaginatorMethod.INDEX) {
+      // First check to see if the limit has been reached.  If so, mark the end of pagination.
+      if (maxRecords > 0 && (resultSetLoader.totalRowCount() > maxRecords)) {
+        // End Pagination
+        paginator.notifyPartialPage();
+      } else {
+        populateIndexPaginator();
+      }
+    } else if (paginator != null &&
       maxRecords < 0 && (resultSetLoader.totalRowCount()) < paginator.getPageSize()) {
+    // Allows limitless pagination. Does not apply for index pagination
       logger.debug("Partially filled page received, ending pagination");
       paginator.notifyPartialPage();
     }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index 3da54d0..e22a1b0 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -26,6 +26,7 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.expression.SchemaPath;
 
@@ -38,6 +39,7 @@
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.metastore.metadata.TableMetadata;
@@ -99,8 +101,8 @@
    */
   public HttpGroupScan(HttpGroupScan that, List<SchemaPath> columns) {
     super(that);
-    this.columns = columns;
     this.httpScanSpec = that.httpScanSpec;
+    this.columns = columns;
 
     // Oddly called later in planning, after earlier assigning columns,
     // to again assign columns. Retain filters, but compute new stats.
@@ -112,6 +114,41 @@
     this.maxRecords = that.maxRecords;
   }
 
+  private List<SchemaPath> addColumnsToSchemaPath(List<SchemaPath> columns) {
+    // This function handles the case when the pagination columns are not projected.
+    // This logic adds these columns to the projection list to ensure they are projected
+    // This is only relevant for index pagination.
+
+    // First check to see whether this is an index paginator or not, and all the fields are populated.
+    if (httpScanSpec.connectionConfig().paginator() != null &&
+      httpScanSpec.connectionConfig().paginator().getMethodType() != PaginatorMethod.INDEX &&
+      StringUtils.isEmpty(httpScanSpec.connectionConfig().dataPath())) {
+      return columns;
+    }
+
+    // Next, if the query is a star query, we don't need to modify anything.
+    if (columns == ALL_COLUMNS && StringUtils.isEmpty(httpScanSpec.connectionConfig().dataPath())) {
+      return columns;
+    }
+
+    HttpPaginatorConfig paginatorConfig = httpScanSpec.connectionConfig().paginator();
+
+    if (StringUtils.isNotEmpty(paginatorConfig.hasMoreParam())) {
+      columns.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.hasMoreParam())));
+    }
+
+    if (StringUtils.isNotEmpty(paginatorConfig.indexParam())) {
+      columns.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.indexParam())));
+    }
+
+    if (StringUtils.isNotEmpty(paginatorConfig.nextPageParam())) {
+      columns.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.nextPageParam())));
+    }
+
+    return columns;
+  }
+
+
   /**
    * Adds a filter to the scan.
    */
@@ -344,6 +381,18 @@
     return (getHttpConfig().params() != null) || SimpleHttp.hasURLParameters(getHttpConfig().getHttpUrl());
   }
 
+  private String cleanUpColumnName(String columnName) {
+    if (! columnName.startsWith("`")) {
+      columnName = "`" + columnName;
+    }
+
+    if (! columnName.endsWith("`")) {
+      columnName = columnName + "`";
+    }
+
+    return columnName;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
index 9c1a5b9..ad474f9 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
@@ -33,7 +33,7 @@
 
 
 @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-@JsonDeserialize(builder = HttpPaginatorConfig.HttpPaginatorBuilder.class)
+@JsonDeserialize(builder = HttpPaginatorConfig.HttpPaginatorConfigBuilder.class)
 public class HttpPaginatorConfig {
 
   private static final Logger logger = LoggerFactory.getLogger(HttpPaginatorConfig.class);
@@ -61,6 +61,16 @@
   @JsonProperty
   private final String method;
 
+  // For index/keyset pagination
+  @JsonProperty
+  private final String hasMoreParam;
+
+  @JsonProperty
+  private final String indexParam;
+
+  @JsonProperty
+  private final String nextPageParam;
+
   public HttpPaginatorConfig(HttpPaginatorConfigBuilder builder) {
     this.limitParam = builder.limitParam;
     this.offsetParam = builder.offsetParam;
@@ -68,7 +78,72 @@
     this.pageSizeParam = builder.pageSizeParam;
     this.pageSize = builder.pageSize;
     this.maxRecords = builder.maxRecords;
-    this.method = builder.method;
+    this.hasMoreParam = builder.hasMoreParam;
+    this.indexParam = builder.indexParam;
+    this.nextPageParam = builder.nextPageParam;
+
+    this.method = StringUtils.isEmpty(builder.method)
+      ? PaginatorMethod.OFFSET.toString() : builder.method.trim().toUpperCase();
+
+    PaginatorMethod paginatorMethod = PaginatorMethod.valueOf(this.method);
+
+    /*
+     * For pagination to function key fields must be defined.  This block validates the required fields for
+     * each type of paginator.
+     */
+    switch (paginatorMethod) {
+      case OFFSET:
+        if (StringUtils.isEmpty(this.limitParam) || StringUtils.isEmpty(this.offsetParam)) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For OFFSET pagination, limitField and offsetField must be defined.")
+            .build(logger);
+        } else if (this.pageSize <= 0) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For OFFSET pagination, maxPageSize must be defined and greater than zero.")
+            .build(logger);
+        }
+        break;
+      case PAGE:
+        if (StringUtils.isEmpty(this.pageParam) || StringUtils.isEmpty(this.pageSizeParam)) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For PAGE pagination, pageField and pageSizeField must be defined.")
+            .build(logger);
+        } else if (this.pageSize <= 0) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For PAGE pagination, maxPageSize must be defined and greater than zero.")
+            .build(logger);
+        }
+        break;
+      case INDEX:
+        // Either the nextPageParam OR the indexParam must be populated
+        if (StringUtils.isEmpty(this.hasMoreParam)) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For INDEX pagination, the hasMoreParam must be defined.")
+            .build(logger);
+        } else if ((StringUtils.isEmpty(this.nextPageParam) && StringUtils.isNotEmpty(this.indexParam)) &&
+          (StringUtils.isNotEmpty(this.nextPageParam) && StringUtils.isEmpty(this.indexParam))) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For INDEX pagination, the nextPageParam or indexParam must be defined.")
+            .build(logger);
+        } else if (StringUtils.isEmpty(this.nextPageParam) && StringUtils.isEmpty(this.indexParam)) {
+          throw UserException
+            .validationError()
+            .message("Invalid paginator configuration.  For INDEX pagination, the nextPageParam or indexParam must be defined.")
+            .build(logger);
+        }
+        break;
+      default:
+        throw UserException
+          .validationError()
+          .message("Invalid paginator method: %s.  Drill supports 'OFFSET', 'INDEX' and 'PAGE'", method)
+          .build(logger);
+    }
   }
 
   public static HttpPaginatorConfigBuilder builder() {
@@ -103,6 +178,17 @@
     return this.method;
   }
 
+  public String hasMoreParam() {
+    return hasMoreParam;
+  }
+
+  public String nextPageParam() {
+    return nextPageParam;
+  }
+  public String indexParam() {
+    return indexParam;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -118,13 +204,16 @@
       && Objects.equals(offsetParam, that.offsetParam)
       && Objects.equals(pageParam, that.pageParam)
       && Objects.equals(pageSizeParam, that.pageSizeParam)
-      && Objects.equals(method, that.method);
+      && Objects.equals(method, that.method)
+      && Objects.equals(hasMoreParam, that.hasMoreParam)
+      && Objects.equals(indexParam, that.indexParam)
+      && Objects.equals(nextPageParam, that.nextPageParam);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(limitParam, offsetParam, pageParam, pageSizeParam,
-      pageSize, maxRecords, method);
+      pageSize, maxRecords, method, nextPageParam, indexParam, hasMoreParam);
   }
 
   @Override
@@ -137,64 +226,16 @@
       .field("pageSize", pageSize)
       .field("maxRecords", maxRecords)
       .field("method", method)
+      .field("indexParam", indexParam)
+      .field("hasMoreParam", hasMoreParam)
+      .field("nextPageParam", nextPageParam)
       .toString();
   }
 
   public enum PaginatorMethod {
     OFFSET,
-    PAGE
-  }
-
-  private HttpPaginatorConfig(HttpPaginatorConfig.HttpPaginatorBuilder builder) {
-    this.limitParam = builder.limitParam;
-    this.offsetParam = builder.offsetParam;
-    this.pageSize = builder.pageSize;
-    this.pageParam = builder.pageParam;
-    this.pageSizeParam = builder.pageSizeParam;
-    this.maxRecords = builder.maxRecords;
-
-    this.method = StringUtils.isEmpty(builder.method)
-      ? PaginatorMethod.OFFSET.toString() : builder.method.trim().toUpperCase();
-
-    PaginatorMethod paginatorMethod = PaginatorMethod.valueOf(this.method);
-
-    /*
-    * For pagination to function key fields must be defined.  This block validates the required fields for
-    * each type of paginator.
-     */
-    switch (paginatorMethod) {
-      case OFFSET:
-        if (StringUtils.isEmpty(this.limitParam) || StringUtils.isEmpty(this.offsetParam)) {
-          throw UserException
-            .validationError()
-            .message("Invalid paginator configuration.  For OFFSET pagination, limitField and offsetField must be defined.")
-            .build(logger);
-        } else if (this.pageSize <= 0) {
-          throw UserException
-            .validationError()
-            .message("Invalid paginator configuration.  For OFFSET pagination, maxPageSize must be defined and greater than zero.")
-            .build(logger);
-        }
-        break;
-      case PAGE:
-        if (StringUtils.isEmpty(this.pageParam) || StringUtils.isEmpty(this.pageSizeParam)) {
-          throw UserException
-            .validationError()
-            .message("Invalid paginator configuration.  For PAGE pagination, pageField and pageSizeField must be defined.")
-            .build(logger);
-        } else if (this.pageSize <= 0) {
-          throw UserException
-            .validationError()
-            .message("Invalid paginator configuration.  For PAGE pagination, maxPageSize must be defined and greater than zero.")
-            .build(logger);
-        }
-        break;
-      default:
-        throw UserException
-          .validationError()
-          .message("Invalid paginator method: %s.  Drill supports 'OFFSET' and 'PAGE'", method)
-          .build(logger);
-    }
+    PAGE,
+    INDEX
   }
 
   @JsonIgnore
@@ -203,103 +244,32 @@
   }
 
   @JsonPOJOBuilder(withPrefix = "")
-  public static class HttpPaginatorBuilder {
-    public String limitParam;
-
-    public String offsetParam;
-
-    public int maxRecords;
-
-    public int pageSize;
-
-    public String pageParam;
-
-    public String pageSizeParam;
-
-    public String method;
-
-    public HttpPaginatorConfig build() {
-      return new HttpPaginatorConfig(this);
-    }
-
-    public String limitParam() {
-      return this.limitParam;
-    }
-
-    public String offsetParam() {
-      return this.offsetParam;
-    }
-
-    public int maxRecords() {
-      return this.maxRecords;
-    }
-
-    public int pageSize() {
-      return this.pageSize;
-    }
-
-    public String pageParam() {
-      return this.pageParam;
-    }
-
-    public String pageSizeParam() {
-      return this.pageSizeParam;
-    }
-
-    public String method() {
-      return this.method;
-    }
-
-    public HttpPaginatorBuilder limitParam(String limitParam) {
-      this.limitParam = limitParam;
-      return this;
-    }
-
-    public HttpPaginatorBuilder offsetParam(String offsetParam) {
-      this.offsetParam = offsetParam;
-      return this;
-    }
-
-    public HttpPaginatorBuilder maxRecords(int maxRecords) {
-      this.maxRecords = maxRecords;
-      return this;
-    }
-
-    public HttpPaginatorBuilder pageSize(int pageSize) {
-      this.pageSize = pageSize;
-      return this;
-    }
-
-    public HttpPaginatorBuilder pageParam(String pageParam) {
-      this.pageParam = pageParam;
-      return this;
-    }
-
-    public HttpPaginatorBuilder pageSizeParam(String pageSizeParam) {
-      this.pageSizeParam = pageSizeParam;
-      return this;
-    }
-
-    public HttpPaginatorBuilder method(String method) {
-      this.method = method;
-      return this;
-    }
-  }
-
   public static class HttpPaginatorConfigBuilder {
     private String limitParam;
-
     private String offsetParam;
-
     private String pageParam;
-
     private String pageSizeParam;
-
     private int pageSize;
-
     private int maxRecords;
-
     private String method;
+    private String hasMoreParam;
+    private String indexParam;
+    private String nextPageParam;
+
+    public HttpPaginatorConfigBuilder hasMoreParam(String hasMoreParam) {
+      this.hasMoreParam = hasMoreParam;
+      return this;
+    }
+
+    public HttpPaginatorConfigBuilder indexParam(String indexParam) {
+      this.indexParam = indexParam;
+      return this;
+    }
+
+    public HttpPaginatorConfigBuilder nextPageParam(String nextPageParam) {
+      this.nextPageParam = nextPageParam;
+      return this;
+    }
 
     public HttpPaginatorConfigBuilder limitParam(String limitParam) {
       this.limitParam = limitParam;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 57797b3..edc7dee 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -34,6 +34,7 @@
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.apache.drill.exec.store.http.paginator.IndexPaginator;
 import org.apache.drill.exec.store.http.paginator.OffsetPaginator;
 import org.apache.drill.exec.store.http.paginator.PagePaginator;
 import org.apache.drill.exec.store.http.paginator.Paginator;
@@ -136,6 +137,12 @@
           paginatorConfig.pageSize(),
           paginatorConfig.pageParam(),
           paginatorConfig.pageSizeParam());
+      } else if (paginatorConfig.getMethodType() == PaginatorMethod.INDEX) {
+        paginator = new IndexPaginator(urlBuilder,
+          subScan.maxRecords(),
+          0, paginatorConfig.hasMoreParam(),
+          paginatorConfig.indexParam(),
+          paginatorConfig.nextPageParam());
       }
       return paginator;
     }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/IndexPaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/IndexPaginator.java
new file mode 100644
index 0000000..22aa531
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/IndexPaginator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.paginator;
+
+import okhttp3.HttpUrl.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.NoSuchElementException;
+
+public class IndexPaginator extends Paginator {
+
+  private final String hasMoreParam;
+  private final String indexParam;
+  private final String nextPageParam;
+
+  private String indexValue;
+  private Boolean hasMoreValue;
+  private String nextPageValue;
+  private int pageCount;
+
+  public IndexPaginator(Builder builder, int pageSize, int limit, String hasMoreParam, String indexParam, String nextPageParam) {
+    super(builder, PaginatorMethod.INDEX, pageSize, limit);
+    this.hasMoreParam = hasMoreParam;
+    this.indexParam = indexParam;
+    this.nextPageParam = nextPageParam;
+    pageCount = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !partialPageReceived;
+
+  }
+
+  public String getIndexParam() {
+    return this.indexParam;
+  }
+
+  public String getHasMoreParam() {
+    return this.hasMoreParam;
+  }
+
+  public String getNextPageParam() {
+    return this.nextPageParam;
+  }
+
+  public void setHasMoreValue(Boolean hasMoreValue) {
+    this.hasMoreValue = hasMoreValue;
+  }
+
+  public void setIndexValue(String indexValue) {
+    if (StringUtils.isNumeric(indexValue)) {
+
+    }
+    this.indexValue = indexValue;
+  }
+
+  public void setNextPageValue(String nextPageValue) {
+    this.nextPageValue = nextPageValue;
+  }
+
+  public boolean isFirstPage() {
+    return pageCount < 1;
+  }
+
+  @Override
+  public String next() {
+    // If the paginator has never been run before, just return the base URL.
+    if (pageCount == 0) {
+      pageCount++;
+      return builder.build().url().toString();
+    }
+
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    if (StringUtils.isNotEmpty(nextPageValue)) {
+      // TODO figure this out...
+    } else if (StringUtils.isNotEmpty(indexValue)) {
+      try {
+        indexValue = URLEncoder.encode(indexValue, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        // Do nothing.
+      }
+      builder.removeAllEncodedQueryParameters(indexParam);
+      builder.addQueryParameter(indexParam, indexValue);
+    }
+    pageCount++;
+    return builder.build().url().toString();
+  }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java
index 22a0233..eed3c23 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java
@@ -20,6 +20,7 @@
 
 import okhttp3.HttpUrl.Builder;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,7 @@
    * @param offsetParam The field name which corresponds to the offset field from the API
    */
   public OffsetPaginator(Builder builder, int limit, int pageSize, String limitParam, String offsetParam) {
-    super(builder, paginationMode.OFFSET, pageSize, limit);
+    super(builder, PaginatorMethod.OFFSET, pageSize, limit);
     this.limit = limit;
     this.limitParam = limitParam;
     this.offsetParam = offsetParam;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
index 16746c1..b640d3a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
@@ -20,6 +20,7 @@
 
 import okhttp3.HttpUrl.Builder;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +45,7 @@
    * @param pageSizeParam The API Query parameter which specifies how many results per page
    */
   public PagePaginator(Builder builder, int limit, int pageSize, String pageParam, String pageSizeParam) {
-    super(builder, paginationMode.PAGE, pageSize, limit);
+    super(builder, PaginatorMethod.PAGE, pageSize, limit);
     this.limit = limit;
     this.pageParam = pageParam;
     this.pageSizeParam = pageSizeParam;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java
index eb1f1e7..f7489a7 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java
@@ -19,6 +19,7 @@
 package org.apache.drill.exec.store.http.paginator;
 
 import okhttp3.HttpUrl.Builder;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,17 +42,12 @@
   private static final int MAX_ATTEMPTS = 100;
   protected final int pageSize;
 
-  public enum paginationMode {
-    OFFSET,
-    PAGE
-  }
-
-  protected final paginationMode MODE;
+  protected final PaginatorMethod MODE;
   protected final int limit;
   protected boolean partialPageReceived;
   protected Builder builder;
 
-  public Paginator(Builder builder, paginationMode mode, int pageSize, int limit) {
+  public Paginator(Builder builder, PaginatorMethod mode, int pageSize, int limit) {
     this.MODE = mode;
     this.builder = builder;
     this.pageSize = pageSize;
@@ -78,4 +74,8 @@
   }
 
   public int getPageSize() { return pageSize; }
+
+  public PaginatorMethod getMode() {
+    return MODE;
+  }
 }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index b8c5972..328a447 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -60,6 +60,12 @@
   private static String TEST_JSON_PAGE1;
   private static String TEST_JSON_PAGE2;
   private static String TEST_JSON_PAGE3;
+
+  private static String TEST_JSON_INDEX_PAGE1;
+  private static String TEST_JSON_INDEX_PAGE2;
+  private static String TEST_JSON_INDEX_PAGE3;
+  private static String TEST_JSON_INDEX_PAGE4;
+
   private static String TEST_XML_PAGE1;
   private static String TEST_XML_PAGE2;
   private static String TEST_XML_PAGE3;
@@ -77,6 +83,12 @@
     TEST_JSON_PAGE2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p2.json"), Charsets.UTF_8).read();
     TEST_JSON_PAGE3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p3.json"), Charsets.UTF_8).read();
 
+    TEST_JSON_INDEX_PAGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/index_response1.json"), Charsets.UTF_8).read();
+    TEST_JSON_INDEX_PAGE2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/index_response2.json"), Charsets.UTF_8).read();
+
+    TEST_JSON_INDEX_PAGE3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/index_response3.json"), Charsets.UTF_8).read();
+    TEST_JSON_INDEX_PAGE4 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/index_response4.json"), Charsets.UTF_8).read();
+
     TEST_XML_PAGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_1.xml"), Charsets.UTF_8).read();
     TEST_XML_PAGE2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_2.xml"), Charsets.UTF_8).read();
     TEST_XML_PAGE3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_3.xml"), Charsets.UTF_8).read();
@@ -140,6 +152,32 @@
       .pageSize(2)
       .build();
 
+    HttpPaginatorConfig indexPaginator = HttpPaginatorConfig.builder()
+      .indexParam("offset")
+      .hasMoreParam("has-more")
+      .method("index")
+      .build();
+
+    HttpApiConfig mockJsonConfigWithKeyset = HttpApiConfig.builder()
+      .url("http://localhost:8092/json")
+      .method("get")
+      .headers(headers)
+      .requireTail(false)
+      .paginator(indexPaginator)
+      .inputType("json")
+      .build();
+
+    HttpApiConfig mockJsonConfigWithKeysetAndDataPath = HttpApiConfig.builder()
+      .url("http://localhost:8092/json")
+      .method("get")
+      .headers(headers)
+      .requireTail(false)
+      .dataPath("companies")
+      .paginator(indexPaginator)
+      .inputType("json")
+      .build();
+
+
     HttpApiConfig mockJsonConfigWithPaginator = HttpApiConfig.builder()
       .url("http://localhost:8092/json")
       .method("get")
@@ -192,6 +230,8 @@
 
     Map<String, HttpApiConfig> configs = new HashMap<>();
     configs.put("csv_paginator", mockCsvConfigWithPaginator);
+    configs.put("json_index", mockJsonConfigWithKeyset);
+    configs.put("json_index_datapath", mockJsonConfigWithKeysetAndDataPath);
     configs.put("json_paginator", mockJsonConfigWithPaginator);
     configs.put("xml_paginator", mockXmlConfigWithPaginator);
     configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
@@ -244,6 +284,72 @@
   }
 
   @Test
+  public void simpleJSONIndexQuery() throws Exception {
+    String sql = "SELECT * FROM `local`.`json_index` LIMIT 4";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE2));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for(QueryDataBatch b : results){
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(2, results.size());
+      assertEquals(2, count);
+    }
+  }
+
+  @Test
+  public void simpleJSONIndexQueryWithProjectedColumns() throws Exception {
+    String sql = "SELECT companies FROM `local`.`json_index` LIMIT 4";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE1));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE2));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for(QueryDataBatch b : results){
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(2, results.size());
+      assertEquals(2, count);
+    }
+  }
+
+  @Test
+  public void simpleJSONIndexQueryAndDataPath() throws Exception {
+    String sql = "SELECT * FROM `local`.`json_index_datapath` LIMIT 4";
+    try (MockWebServer server = startServer()) {
+
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE3));
+      server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE4));
+
+      List<QueryDataBatch> results = client.queryBuilder()
+        .sql(sql)
+        .results();
+
+      int count = 0;
+      for(QueryDataBatch b : results){
+        count += b.getHeader().getRowCount();
+        b.release();
+      }
+      assertEquals(2, results.size());
+      assertEquals(4, count);
+    }
+  }
+
+  @Test
   public void simpleJSONPaginatorQueryWithoutLimit() throws Exception {
     String sql = "SELECT * FROM `local`.`json_paginator`";
     try (MockWebServer server = startServer()) {
diff --git a/contrib/storage-http/src/test/resources/data/index_response1.json b/contrib/storage-http/src/test/resources/data/index_response1.json
new file mode 100644
index 0000000..708018b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/index_response1.json
@@ -0,0 +1,96 @@
+{
+  "companies": [
+    {
+      "portalId": 62515,
+      "additionalDomains": [
+
+      ],
+      "properties": {
+        "website": {
+          "sourceId": null,
+          "timestamp": 1457513066540,
+          "versions": [
+            {
+              "timestamp": 1457513066540,
+              "sourceVid": [
+
+              ],
+              "name": "website",
+              "value": "example.com",
+              "source": "COMPANIES"
+            }
+          ],
+          "value": "example.com",
+          "source": "COMPANIES"
+        },
+        "name": {
+          "sourceId": "name",
+          "timestamp": 1464484587592,
+          "versions": [
+            {
+              "name": "name",
+              "sourceId": "name",
+              "timestamp": 1464484587592,
+              "value": "Example Company",
+              "source": "BIDEN",
+              "sourceVid": [
+
+              ]
+            }
+          ],
+          "value": "Example Company",
+          "source": "BIDEN"
+        }
+      },
+      "isDeleted": false,
+      "companyId": 115200636
+    },
+    {
+      "portalId": 62515,
+      "additionalDomains": [
+
+      ],
+      "properties": {
+        "website": {
+          "sourceId": null,
+          "timestamp": 1457535205549,
+          "versions": [
+            {
+              "timestamp": 1457535205549,
+              "sourceVid": [
+
+              ],
+              "name": "website",
+              "value": "test.com",
+              "source": "COMPANIES"
+            }
+          ],
+          "value": "test.com",
+          "source": "COMPANIES"
+        },
+        "name": {
+          "sourceId": "name",
+          "timestamp": 1468832771769,
+          "versions": [
+            {
+              "name": "name",
+              "sourceId": "name",
+              "timestamp": 1468832771769,
+              "value": "Test Company",
+              "source": "BIDEN",
+              "sourceVid": [
+
+              ]
+            }
+          ],
+          "value": "Test Company",
+          "source": "BIDEN"
+        }
+      },
+      "isDeleted": false,
+      "companyId": 115279791
+    }
+  ],
+  "has-more": true,
+  "offset": 3849945478
+}
diff --git a/contrib/storage-http/src/test/resources/data/index_response2.json b/contrib/storage-http/src/test/resources/data/index_response2.json
new file mode 100644
index 0000000..c13acaa
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/index_response2.json
@@ -0,0 +1,96 @@
+{
+  "companies": [
+    {
+      "portalId": 62515,
+      "additionalDomains": [
+
+      ],
+      "properties": {
+        "website": {
+          "sourceId": null,
+          "timestamp": 1457513066540,
+          "versions": [
+            {
+              "timestamp": 1457513066540,
+              "sourceVid": [
+
+              ],
+              "name": "website",
+              "value": "example.com",
+              "source": "COMPANIES"
+            }
+          ],
+          "value": "example.com",
+          "source": "COMPANIES"
+        },
+        "name": {
+          "sourceId": "name",
+          "timestamp": 1464484587592,
+          "versions": [
+            {
+              "name": "name",
+              "sourceId": "name",
+              "timestamp": 1464484587592,
+              "value": "Example Company",
+              "source": "BIDEN",
+              "sourceVid": [
+
+              ]
+            }
+          ],
+          "value": "Example Company",
+          "source": "BIDEN"
+        }
+      },
+      "isDeleted": false,
+      "companyId": 115200636
+    },
+    {
+      "portalId": 62515,
+      "additionalDomains": [
+
+      ],
+      "properties": {
+        "website": {
+          "sourceId": null,
+          "timestamp": 1457535205549,
+          "versions": [
+            {
+              "timestamp": 1457535205549,
+              "sourceVid": [
+
+              ],
+              "name": "website",
+              "value": "test.com",
+              "source": "COMPANIES"
+            }
+          ],
+          "value": "test.com",
+          "source": "COMPANIES"
+        },
+        "name": {
+          "sourceId": "name",
+          "timestamp": 1468832771769,
+          "versions": [
+            {
+              "name": "name",
+              "sourceId": "name",
+              "timestamp": 1468832771769,
+              "value": "Test Company",
+              "source": "BIDEN",
+              "sourceVid": [
+
+              ]
+            }
+          ],
+          "value": "Test Company",
+          "source": "BIDEN"
+        }
+      },
+      "isDeleted": false,
+      "companyId": 115279791
+    }
+  ],
+  "has-more": false,
+  "offset": 115279791
+}
diff --git a/contrib/storage-http/src/test/resources/data/index_response3.json b/contrib/storage-http/src/test/resources/data/index_response3.json
new file mode 100644
index 0000000..6377343
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/index_response3.json
@@ -0,0 +1,14 @@
+{
+  "has-more": true,
+  "offset": 115279791,
+  "companies": [
+    {
+      "portalId": 62515,
+      "companyId": 115200636
+    },
+    {
+      "portalId": 62515,
+      "companyId": 115279791
+    }
+  ]
+}
diff --git a/contrib/storage-http/src/test/resources/data/index_response4.json b/contrib/storage-http/src/test/resources/data/index_response4.json
new file mode 100644
index 0000000..319858b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/index_response4.json
@@ -0,0 +1,14 @@
+{
+  "has-more": false,
+  "offset": 115279791,
+  "companies": [
+    {
+      "portalId": 62515,
+      "companyId": 115200636
+    },
+    {
+      "portalId": 62515,
+      "companyId": 115279791
+    }
+  ]
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
index b52b116..b3f9f28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.scan;
 
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.store.RecordReader;
 
 /**
  * Extended version of a record reader used by the revised
@@ -78,7 +80,7 @@
  * don't worry about it.
  * <p>
  * If an error occurs, the reader can throw a {@link RuntimeException}
- * from any method. A {@link UserException} is preferred to provide
+ * from any method. A {@link org.apache.drill.common.exceptions.UserException} is preferred to provide
  * detailed information about the source of the problem.
  */
 public interface RowBatchReader {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index 7e9d9d5..5ac3a59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import io.netty.buffer.DrillBuf;
 import org.apache.commons.io.IOUtils;
@@ -153,6 +154,7 @@
     private MessageParser messageParser;
     private ImplicitColumns implicitFields;
     private int maxRows;
+    private Map<String, Object> listenerColumnMap;
 
     public JsonLoaderBuilder resultSetLoader(ResultSetLoader rsLoader) {
       this.rsLoader = rsLoader;
@@ -164,6 +166,11 @@
       return this;
     }
 
+    public JsonLoaderBuilder listenerColumnMap(Map<String,Object> listenerColumnMap) {
+      this.listenerColumnMap = listenerColumnMap;
+      return this;
+    }
+
     public JsonLoaderBuilder standardOptions(OptionSet optionSet) {
       this.options = new JsonLoaderOptions(optionSet);
       return this;
@@ -246,6 +253,7 @@
   private final JsonStructureParser parser;
   private final FieldFactory fieldFactory;
   private final ImplicitColumns implicitFields;
+  private final Map<String, Object> listenerColumnMap;
   private final int maxRows;
   private boolean eof;
 
@@ -268,6 +276,7 @@
     this.implicitFields = builder.implicitFields;
     this.maxRows = builder.maxRows;
     this.fieldFactory = buildFieldFactory(builder);
+    this.listenerColumnMap = builder.listenerColumnMap;
     this.parser = buildParser(builder);
   }
 
@@ -279,6 +288,7 @@
             .parserFactory(parser ->
                 new TupleParser(parser, JsonLoaderImpl.this, rsLoader.writer(), builder.providedSchema))
             .errorFactory(this)
+            .listenerColumnMap(listenerColumnMap)
             .messageParser(builder.messageParser)
             .dataPath(builder.dataPath)
             .build();
@@ -298,6 +308,9 @@
   public JsonLoaderOptions options() { return options; }
   public JsonStructureParser parser() { return parser; }
   public FieldFactory fieldFactory() { return fieldFactory; }
+  public Map<String, Object> listenerColumnMap() {
+    return listenerColumnMap;
+  }
 
   @Override // JsonLoader
   public boolean readBatch() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
index 100ddc3..bf65bb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
@@ -127,10 +127,19 @@
 
   @Override
   public ElementParser onField(String key, TokenIterator tokenizer) {
-    if (!tupleWriter.isProjected(key)) {
-      return fieldFactory().ignoredFieldParser();
-    } else {
+    if (projectField(key)) {
       return fieldParserFor(key, tokenizer);
+    } else {
+      return fieldFactory().ignoredFieldParser();
+    }
+  }
+
+  private boolean projectField(String key) {
+    // This method makes sure that fields necessary for column listeners are read.
+    if (tupleWriter.isProjected(key)) {
+      return true;
+    } else {
+      return loader.listenerColumnMap() != null && loader.listenerColumnMap().containsKey(key);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
index 59aecf9..60ccbf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
@@ -43,6 +43,20 @@
     }
   }
 
+  public JsonToken parseAndReturnToken(TokenIterator tokenizer) {
+    JsonToken token = tokenizer.requireNext();
+    switch (token) {
+      case START_ARRAY:
+      case START_OBJECT:
+        parseTail(tokenizer);
+        break;
+
+      default:
+        break;
+    }
+    return token;
+  }
+
   private void parseTail(TokenIterator tokenizer) {
 
     // Parse (field: value)* }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index 98163fd..03fa51b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -21,6 +21,7 @@
 import java.io.InputStream;
 import java.io.Reader;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.function.Function;
 
 import org.apache.commons.collections4.IterableUtils;
@@ -83,6 +84,7 @@
     private ErrorFactory errorFactory;
     private String dataPath;
     private MessageParser messageParser;
+    private Map<String, Object> listenerColumnMap;
 
     public JsonStructureParserBuilder options(JsonStructureOptions options) {
       this.options = options;
@@ -125,13 +127,18 @@
       return this;
     }
 
+    public JsonStructureParserBuilder listenerColumnMap(Map<String, Object> listenerColumnMap) {
+      this.listenerColumnMap = listenerColumnMap;
+      return this;
+    }
+
     public JsonStructureParser build() {
       if (dataPath != null) {
         dataPath = dataPath.trim();
         dataPath = dataPath.isEmpty() ? null : dataPath;
       }
       if (dataPath != null && messageParser == null) {
-        messageParser = new SimpleMessageParser(dataPath);
+        messageParser = new SimpleMessageParser(dataPath, listenerColumnMap);
       }
       return new JsonStructureParser(this);
     }
@@ -142,6 +149,7 @@
   private final TokenIterator tokenizer;
   private final RootParser rootState;
   private final FieldParserFactory fieldFactory;
+  private final Map<String, Object> listenerColumnMap;
   private int errorRecoveryCount;
 
   /**
@@ -167,6 +175,8 @@
     fieldFactory = new FieldParserFactory(this,
         Preconditions.checkNotNull(builder.parserFactory));
 
+    this.listenerColumnMap = builder.listenerColumnMap;
+
     // Parse to the start of the data object(s), and create a root
     // state to parse objects and watch for the end of data.
     // The root state parses one object on each next() call.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
index f100a8f..3db8a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
@@ -21,6 +21,8 @@
 
 import com.fasterxml.jackson.core.JsonToken;
 
+import java.util.Map;
+
 /**
  * A message parser which accepts a path to the data encoded as a
  * slash-separated string. Given the following JSON message:
@@ -66,11 +68,13 @@
 public class SimpleMessageParser implements MessageParser {
 
   private final String[] path;
+  private final Map<String, Object> listenerColumnMap;
 
-  public SimpleMessageParser(String dataPath) {
+  public SimpleMessageParser(String dataPath, Map<String, Object> listenerColumnMap) {
     path = dataPath.split("/");
     Preconditions.checkArgument(path.length > 0,
         "Data path should not be empty.");
+    this.listenerColumnMap = listenerColumnMap;
   }
 
   @Override
@@ -102,6 +106,8 @@
       String fieldName = tokenizer.textValue();
       if (fieldName.equals(path[level])) {
         return parseInnerLevel(tokenizer, level);
+      } else if (listenerColumnMap != null && listenerColumnMap.containsKey(fieldName)) {
+        skipElementButRetainValue(tokenizer, fieldName);
       } else {
         skipElement(tokenizer);
       }
@@ -129,6 +135,44 @@
     return parseToElement(tokenizer, level + 1);
   }
 
+  /**
+   * This function is called when a storage plugin needs to retrieve values which have been read.  This logic
+   * enables use of the data path in these situations.  Normally, when the datapath is defined, the JSON reader
+   * will "free-wheel" over unprojected columns or columns outside of the datapath.  However, in this case, often
+   * the values which are being read, are outside the dataPath.  This logic offers a way to capture these values
+   * without creating a ValueVector for them.
+   *
+   * @param tokenizer A {@link TokenIterator} of the parsed JSON data.
+   * @param fieldName A {@link String} of the column listener field name.
+   */
+  private void skipElementButRetainValue(TokenIterator tokenizer, String fieldName) {
+    JsonToken token = ((DummyValueParser) DummyValueParser.INSTANCE).parseAndReturnToken(tokenizer);
+    String value;
+    switch (token) {
+      case VALUE_NULL:
+        value = null;
+      case VALUE_TRUE:
+        value = Boolean.TRUE.toString();
+        break;
+      case VALUE_FALSE:
+        value = Boolean.FALSE.toString();
+        break;
+      case VALUE_NUMBER_INT:
+        value = Long.toString(tokenizer.longValue());
+        break;
+      case VALUE_NUMBER_FLOAT:
+        value = Double.toString(tokenizer.doubleValue());
+        break;
+      case VALUE_STRING:
+        value = tokenizer.stringValue();
+        break;
+      default:
+        throw tokenizer.invalidValue(token);
+    }
+
+    listenerColumnMap.put(fieldName, value);
+  }
+
   private void skipElement(TokenIterator tokenizer) {
     DummyValueParser.INSTANCE.parse(tokenizer);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BigIntListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BigIntListener.java
index bc2ad0c..671f062 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BigIntListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BigIntListener.java
@@ -62,6 +62,7 @@
       default:
         throw tokenizer.invalidValue(token);
     }
+    addValueToListenerMap(writer.schema().name(), value);
     writer.setLong(value);
   }
 
@@ -71,6 +72,7 @@
       setNull();
     } else {
       try {
+        addValueToListenerMap(writer.schema().name(), value);
         writer.setLong(Long.parseLong(value));
       } catch (NumberFormatException e) {
         throw loader.dataConversionError(schema(), "string", value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BooleanListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BooleanListener.java
index 3c933f3..089723e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BooleanListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BooleanListener.java
@@ -61,6 +61,7 @@
         // errors.
         throw tokenizer.invalidValue(token);
     }
+    addValueToListenerMap(writer.schema().name(), value);
     writer.setBoolean(value);
   }
 
@@ -69,6 +70,7 @@
     if (value.isEmpty()) {
       setNull();
     } else {
+      addValueToListenerMap(writer.schema().name(), value);
       writer.setBoolean(Boolean.parseBoolean(value.trim()));
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
index 3d22145..23ce22c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
@@ -46,6 +46,7 @@
         setNull();
         break;
       case VALUE_NUMBER_INT:
+        addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
         writer.setLong(tokenizer.longValue());
         break;
       case VALUE_STRING:
@@ -63,6 +64,7 @@
           LocalDate localDate = LocalDate.parse(tokenizer.stringValue(), dateTimeFormatter);
           writer.setLong(Duration.between(TimestampValueListener.LOCAL_EPOCH,
               localDate.atStartOfDay()).toMillis());
+          addValueToListenerMap(writer.schema().name(), Duration.between(TimestampValueListener.LOCAL_EPOCH, localDate.atStartOfDay()).toMillis());
         } catch (Exception e) {
           throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DecimalValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DecimalValueListener.java
index 76d51bf..dfac321 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DecimalValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DecimalValueListener.java
@@ -41,6 +41,7 @@
       case VALUE_NUMBER_FLOAT:
       case VALUE_STRING:
         try {
+          addValueToListenerMap(writer.schema().name(), new BigDecimal(tokenizer.textValue()));
           writer.setDecimal(new BigDecimal(tokenizer.textValue()));
         } catch (NumberFormatException e) {
           throw loader.dataConversionError(schema(), "DECIMAL", tokenizer.textValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DoubleListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DoubleListener.java
index 0a48fe3..9ddac1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DoubleListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DoubleListener.java
@@ -61,6 +61,7 @@
         // errors.
         throw tokenizer.invalidValue(token);
     }
+    addValueToListenerMap(writer.schema().name(), tokenizer.textValue());
     writer.setDouble(value);
   }
 
@@ -70,6 +71,7 @@
       setNull();
     } else {
       try {
+        addValueToListenerMap(writer.schema().name(), value);
         writer.setDouble(Double.parseDouble(value));
       } catch (NumberFormatException e) {
         throw loader.dataConversionError(schema(), "string", value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/IntervalValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/IntervalValueListener.java
index ec1018b..2b8662e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/IntervalValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/IntervalValueListener.java
@@ -45,6 +45,7 @@
         break;
       case VALUE_STRING:
         try {
+          addValueToListenerMap(writer.schema().name(), FORMATTER.parsePeriod(tokenizer.stringValue()));
           writer.setPeriod(FORMATTER.parsePeriod(tokenizer.stringValue()));
         } catch (Exception e) {
           throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/ScalarListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/ScalarListener.java
index b64cab0..1e8e9b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/ScalarListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/ScalarListener.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.easy.json.values;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
@@ -27,6 +28,8 @@
 
 import com.fasterxml.jackson.core.JsonToken;
 
+import java.util.Map;
+
 /**
  * Base class for scalar field listeners
  */
@@ -76,4 +79,30 @@
   protected UserException typeConversionError(String jsonType) {
     return loader.typeConversionError(schema(), jsonType);
   }
+
+  /**
+   * Adds a field's most recent value to the column listener map.
+   * This data is only stored if the listener column map is defined, and has keys.
+   * @param key The key of the listener field
+   * @param value The value of to be retained
+   */
+  protected void addValueToListenerMap(String key, String value) {
+    Map<String,Object> listenerColumnMap = loader.listenerColumnMap();
+
+    if (listenerColumnMap == null || listenerColumnMap.isEmpty()) {
+      return;
+    } else if (listenerColumnMap.containsKey(key) && StringUtils.isNotEmpty(value)) {
+      listenerColumnMap.put(key, value);
+    }
+  }
+
+  protected void addValueToListenerMap(String key, Object value) {
+    Map<String, Object> listenerColumnMap = loader.listenerColumnMap();
+
+    if (listenerColumnMap == null || listenerColumnMap.isEmpty()) {
+      return;
+    } else if (listenerColumnMap.containsKey(key) && value != null) {
+      listenerColumnMap.put(key, value);
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictBigIntValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictBigIntValueListener.java
index 9143243..d56ea43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictBigIntValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictBigIntValueListener.java
@@ -41,10 +41,12 @@
         break;
       case VALUE_NUMBER_INT:
         writer.setLong(tokenizer.longValue());
+        addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
         break;
       case VALUE_STRING:
         try {
           writer.setLong(Long.parseLong(tokenizer.stringValue()));
+          addValueToListenerMap(writer.schema().name(), Long.parseLong(tokenizer.stringValue()));
         } catch (NumberFormatException e) {
           throw loader.dataConversionError(schema(), "string", tokenizer.stringValue());
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictDoubleValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictDoubleValueListener.java
index 6b69637..b0ed580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictDoubleValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictDoubleValueListener.java
@@ -52,6 +52,7 @@
       default:
         throw tokenizer.invalidValue(token);
     }
+    addValueToListenerMap(writer.schema().name(), value);
     writer.setDouble(value);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictIntValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictIntValueListener.java
index 895dbf2..85bbea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictIntValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictIntValueListener.java
@@ -40,11 +40,13 @@
         setNull();
         break;
       case VALUE_NUMBER_INT:
+        addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
         writer.setInt((int) tokenizer.longValue());
         break;
       case VALUE_STRING:
         try {
           writer.setInt(Integer.parseInt(tokenizer.stringValue()));
+          addValueToListenerMap(writer.schema().name(), Integer.parseInt(tokenizer.stringValue()));
         } catch (NumberFormatException e) {
           throw loader.dataConversionError(schema(), "string", tokenizer.stringValue());
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictStringValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictStringValueListener.java
index 6d491e7..e73ab68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictStringValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictStringValueListener.java
@@ -36,6 +36,7 @@
         setNull();
         break;
       case VALUE_STRING:
+        addValueToListenerMap(writer.schema().name(), tokenizer.stringValue());
         writer.setString(tokenizer.stringValue());
         break;
       default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimeValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimeValueListener.java
index 6cc791f..24ddd0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimeValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimeValueListener.java
@@ -48,11 +48,13 @@
         setNull();
         break;
       case VALUE_NUMBER_INT:
+        addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
         writer.setInt((int) tokenizer.longValue());
         break;
       case VALUE_STRING:
         try {
           LocalTime localTime = LocalTime.parse(tokenizer.stringValue(), TIME_FORMAT);
+          addValueToListenerMap(writer.schema().name(), localTime);
           writer.setTime(localTime);
         } catch (Exception e) {
           throw loader.dataConversionError(schema(), "string", tokenizer.stringValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimestampValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimestampValueListener.java
index 56a998e..e8fa6b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimestampValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimestampValueListener.java
@@ -46,11 +46,13 @@
         setNull();
         return;
       case VALUE_NUMBER_INT:
+        addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
         writer.setLong(tokenizer.longValue());
         break;
       case VALUE_STRING:
         try {
           LocalDateTime localDT = LocalDateTime.parse(tokenizer.stringValue());
+          addValueToListenerMap(writer.schema().name(), Duration.between(LOCAL_EPOCH, localDT).toMillis());
           writer.setLong(Duration.between(LOCAL_EPOCH, localDT).toMillis());
         } catch (Exception e) {
           throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcDateValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcDateValueListener.java
index ab45ef3..9577d0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcDateValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcDateValueListener.java
@@ -48,6 +48,7 @@
         setNull();
         break;
       case VALUE_NUMBER_INT:
+        addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
         writer.setLong(tokenizer.longValue());
         break;
       case VALUE_STRING:
@@ -61,6 +62,7 @@
           // is different locally than UTC. A mess.
           LocalDate localDate = LocalDate.parse(tokenizer.stringValue(), DateUtility.isoFormatDate);
           ZonedDateTime utc = localDate.atStartOfDay(ZoneOffset.UTC);
+          addValueToListenerMap(writer.schema().name(), utc.toEpochSecond() * 1000);
           writer.setLong(utc.toEpochSecond() * 1000);
         } catch (Exception e) {
           throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcTimestampValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcTimestampValueListener.java
index 3d4b917..2dd36db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcTimestampValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcTimestampValueListener.java
@@ -69,6 +69,7 @@
       default:
         throw tokenizer.invalidValue(token);
     }
+    addValueToListenerMap(writer.schema().name(), instant.toEpochMilli() + LOCAL_ZONE_ID.getRules().getOffset(instant).getTotalSeconds() * 1000L);
     writer.setLong(instant.toEpochMilli() + LOCAL_ZONE_ID.getRules().getOffset(instant).getTotalSeconds() * 1000L);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/VarCharListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/VarCharListener.java
index f0b88e2..44c01bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/VarCharListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/VarCharListener.java
@@ -63,6 +63,7 @@
       default:
         throw tokenizer.invalidValue(token);
     }
+    addValueToListenerMap(writer.schema().name(), value);
     writer.setString(value);
   }