DRILL-8287: Add Support for Keyset Based Pagination (#2633)
diff --git a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java
index 7f2f2f1..61115ed 100644
--- a/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java
+++ b/contrib/storage-googlesheets/src/main/java/org/apache/drill/exec/store/googlesheets/GoogleSheetsStoragePluginConfig.java
@@ -126,7 +126,7 @@
.build();
}
- @JsonProperty("clientID")
+ @JsonIgnore
public String getClientID() {
if(getOAuthCredentials().isPresent()) {
return getOAuthCredentials().get().getClientID();
@@ -135,7 +135,7 @@
}
}
- @JsonProperty("clientSecret")
+ @JsonIgnore
public String getClientSecret() {
if (getOAuthCredentials().isPresent()) {
return getOAuthCredentials().get().getClientSecret();
diff --git a/contrib/storage-http/Pagination.md b/contrib/storage-http/Pagination.md
index d2206b2..8855fb2 100644
--- a/contrib/storage-http/Pagination.md
+++ b/contrib/storage-http/Pagination.md
@@ -5,7 +5,7 @@
To use a paginator, you simply have to configure the paginator in the connection for the particular API.
## Words of Caution
-While extremely powerful, the auto-pagination feature has the potential to run afoul of APIs rate limits and even potentially DDOS an API.
+While extremely powerful, the auto-pagination feature has the potential to run afoul of an API's rate limits and even potentially DDoS an API. Please use with extreme care.
## Offset Pagination
@@ -40,3 +40,33 @@
}
```
In either case, the `pageSize` parameter should be set to the maximum page size allowable by the API. This will minimize the number of requests Drill is making.
+
+## Index / KeySet Pagination
+Index or KeySet pagination is a method in which the API response itself contains the values needed to request the next page.
+
+Consider an API that returned data like this:
+
+```json
+{
+ "companies": [
+ ...
+ ],
+ "has-more": true,
+ "index": 3849945478
+}
+
+```
+In this case, the `has-more` parameter is a boolean value which indicates whether or not there are more pages. The `index` parameter gets appended to the URL in question to generate the next page.
+
+`https://api.myapi.com/paged?properties=name&properties=website&index=3849945478`
+
+There is a slight variant of this where the API will return the actual URL of the next page.
+
+There are three possible parameters:
+
+* `hasMoreParam`: This is the name of the boolean parameter which indicates whether the API has more pages.
+* `indexParam`: The parameter name of the key or offset that will be used to generate the next page.
+* `nextPageParam`: The parameter name which returns a complete URL of the next page.
+
+
+**Note:** Index / Keyset pagination is only implemented for APIs that return JSON.
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index a6ce240..9915948 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -20,6 +20,7 @@
import com.typesafe.config.Config;
import okhttp3.HttpUrl;
import okhttp3.HttpUrl.Builder;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -36,6 +37,8 @@
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.apache.drill.exec.store.http.paginator.IndexPaginator;
import org.apache.drill.exec.store.http.paginator.Paginator;
import org.apache.drill.exec.store.http.util.HttpProxyConfig;
import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
@@ -48,6 +51,7 @@
import java.io.File;
import java.io.InputStream;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -64,15 +68,17 @@
protected String baseUrl;
private JsonLoader jsonLoader;
private ResultSetLoader resultSetLoader;
-
+ private final List<SchemaPath> projectedColumns;
protected ImplicitColumns implicitColumns;
-
+ private final Map<String, Object> paginationFields;
public HttpBatchReader(HttpSubScan subScan) {
this.subScan = subScan;
this.maxRecords = subScan.maxRecords();
this.baseUrl = subScan.tableSpec().connectionConfig().url();
this.paginator = null;
+ this.projectedColumns = subScan.columns();
+ this.paginationFields = generatePaginationFieldMap();
}
public HttpBatchReader(HttpSubScan subScan, Paginator paginator) {
@@ -80,6 +86,8 @@
this.maxRecords = subScan.maxRecords();
this.paginator = paginator;
this.baseUrl = paginator.next();
+ this.projectedColumns = subScan.columns();
+ this.paginationFields = generatePaginationFieldMap();
logger.debug("Batch reader with URL: {}", this.baseUrl);
}
@@ -130,6 +138,7 @@
.maxRows(maxRecords)
.dataPath(subScan.tableSpec().connectionConfig().dataPath())
.errorContext(errorContext)
+ .listenerColumnMap(paginationFields)
.fromStream(inStream);
if (subScan.tableSpec().connectionConfig().jsonOptions() != null) {
@@ -208,6 +217,29 @@
implicitColumns.getColumn(RESPONSE_CODE_FIELD).setValue(http.getResponseCode());
}
+ protected Map<String, Object> generatePaginationFieldMap() {
+ if (paginator == null || paginator.getMode() != PaginatorMethod.INDEX) {
+ return null;
+ }
+
+ Map<String, Object> fieldMap = new HashMap<>();
+ IndexPaginator indexPaginator = (IndexPaginator) paginator;
+
+ // Initialize the pagination field map
+ if (StringUtils.isNotEmpty(indexPaginator.getIndexParam())) {
+ fieldMap.put(indexPaginator.getIndexParam(), null);
+ }
+
+ if (StringUtils.isNotEmpty(indexPaginator.getHasMoreParam())) {
+ fieldMap.put(indexPaginator.getHasMoreParam(), null);
+ }
+
+ if (StringUtils.isNotEmpty(indexPaginator.getNextPageParam())) {
+ fieldMap.put(indexPaginator.getNextPageParam(), null);
+ }
+ return fieldMap;
+ }
+
protected boolean implicitColumnsAreProjected() {
List<SchemaPath> columns = subScan.columns();
for (SchemaPath path : columns) {
@@ -291,13 +323,64 @@
return builder.build();
}
+ protected void populateIndexPaginator() {
+ IndexPaginator indexPaginator = (IndexPaginator) paginator;
+ // There are two cases, however in both, there is a boolean parameter which indicates
+ // whether there are more pages to follow. The first case is when the API provides
+ // a URL for the next page and the other is when the API provides an offset which is added
+ // to the next page.
+ //
+ // In either case, we first need the boolean "has more" parameter so let's grab that.
+ if (paginationFields.get(indexPaginator.getHasMoreParam()) != null) {
+ Object hasMore = paginationFields.get(indexPaginator.getHasMoreParam());
+ boolean hasMoreValues;
+
+ if (hasMore instanceof Boolean) {
+ hasMoreValues = (Boolean) hasMore;
+ } else {
+ String hasMoreString = hasMore.toString();
+ // Attempt to convert to boolean
+ hasMoreValues = Boolean.parseBoolean(hasMoreString);
+ }
+ indexPaginator.setHasMoreValue(hasMoreValues);
+
+ // If there are no more values, notify the paginator
+ if (!hasMoreValues) {
+ paginator.notifyPartialPage();
+ }
+
+ // At this point we know that there are more pages to come. Send the data to the paginator and
+ // use that to generate the next page.
+ if (StringUtils.isNotEmpty(indexPaginator.getIndexParam())) {
+ indexPaginator.setIndexValue(paginationFields.get(indexPaginator.getIndexParam()).toString());
+ }
+
+ if (StringUtils.isNotEmpty(indexPaginator.getNextPageParam())) {
+ indexPaginator.setNextPageValue(paginationFields.get(indexPaginator.getNextPageParam()).toString());
+ }
+ }
+ }
+
@Override
public boolean next() {
boolean result = jsonLoader.readBatch();
- // Allows limitless pagination.
- if (paginator != null &&
+ // This code implements the index/keyset pagination. This pagination method
+ // uses a value returned in the current result set as the starting point for the
+ // next page. Some APIs will have a boolean parameter to indicate that there are
+ // additional pages. Some APIs will also return a URL for the next page. In any event,
+ // it is necessary to grab these values from the returned data.
+ if (paginator != null && paginator.getMode() == PaginatorMethod.INDEX) {
+ // First check to see if the limit has been reached. If so, mark the end of pagination.
+ if (maxRecords > 0 && (resultSetLoader.totalRowCount() > maxRecords)) {
+ // End Pagination
+ paginator.notifyPartialPage();
+ } else {
+ populateIndexPaginator();
+ }
+ } else if (paginator != null &&
maxRecords < 0 && (resultSetLoader.totalRowCount()) < paginator.getPageSize()) {
+ // Allows limitless pagination. Does not apply for index pagination
logger.debug("Partially filled page received, ending pagination");
paginator.notifyPartialPage();
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index 3da54d0..e22a1b0 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -26,6 +26,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
@@ -38,6 +39,7 @@
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.metastore.metadata.TableMetadata;
@@ -99,8 +101,8 @@
*/
public HttpGroupScan(HttpGroupScan that, List<SchemaPath> columns) {
super(that);
- this.columns = columns;
this.httpScanSpec = that.httpScanSpec;
+ this.columns = columns;
// Oddly called later in planning, after earlier assigning columns,
// to again assign columns. Retain filters, but compute new stats.
@@ -112,6 +114,41 @@
this.maxRecords = that.maxRecords;
}
+ private List<SchemaPath> addColumnsToSchemaPath(List<SchemaPath> columns) {
+ // This function handles the case when the pagination columns are not projected.
+ // This logic adds these columns to the projection list to ensure they are projected
+ // This is only relevant for index pagination.
+
+ // First check to see whether this is an index paginator or not, and all the fields are populated.
+ if (httpScanSpec.connectionConfig().paginator() != null &&
+ httpScanSpec.connectionConfig().paginator().getMethodType() != PaginatorMethod.INDEX &&
+ StringUtils.isEmpty(httpScanSpec.connectionConfig().dataPath())) {
+ return columns;
+ }
+
+ // Next, if the query is a star query, we don't need to modify anything.
+ if (columns == ALL_COLUMNS && StringUtils.isEmpty(httpScanSpec.connectionConfig().dataPath())) {
+ return columns;
+ }
+
+ HttpPaginatorConfig paginatorConfig = httpScanSpec.connectionConfig().paginator();
+
+ if (StringUtils.isNotEmpty(paginatorConfig.hasMoreParam())) {
+ columns.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.hasMoreParam())));
+ }
+
+ if (StringUtils.isNotEmpty(paginatorConfig.indexParam())) {
+ columns.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.indexParam())));
+ }
+
+ if (StringUtils.isNotEmpty(paginatorConfig.nextPageParam())) {
+ columns.add(SchemaPath.parseFromString(cleanUpColumnName(paginatorConfig.nextPageParam())));
+ }
+
+ return columns;
+ }
+
+
/**
* Adds a filter to the scan.
*/
@@ -344,6 +381,18 @@
return (getHttpConfig().params() != null) || SimpleHttp.hasURLParameters(getHttpConfig().getHttpUrl());
}
+ private String cleanUpColumnName(String columnName) {
+ if (! columnName.startsWith("`")) {
+ columnName = "`" + columnName;
+ }
+
+ if (! columnName.endsWith("`")) {
+ columnName = columnName + "`";
+ }
+
+ return columnName;
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
index 9c1a5b9..ad474f9 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
@@ -33,7 +33,7 @@
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
-@JsonDeserialize(builder = HttpPaginatorConfig.HttpPaginatorBuilder.class)
+@JsonDeserialize(builder = HttpPaginatorConfig.HttpPaginatorConfigBuilder.class)
public class HttpPaginatorConfig {
private static final Logger logger = LoggerFactory.getLogger(HttpPaginatorConfig.class);
@@ -61,6 +61,16 @@
@JsonProperty
private final String method;
+ // For index/keyset pagination
+ @JsonProperty
+ private final String hasMoreParam;
+
+ @JsonProperty
+ private final String indexParam;
+
+ @JsonProperty
+ private final String nextPageParam;
+
public HttpPaginatorConfig(HttpPaginatorConfigBuilder builder) {
this.limitParam = builder.limitParam;
this.offsetParam = builder.offsetParam;
@@ -68,7 +78,72 @@
this.pageSizeParam = builder.pageSizeParam;
this.pageSize = builder.pageSize;
this.maxRecords = builder.maxRecords;
- this.method = builder.method;
+ this.hasMoreParam = builder.hasMoreParam;
+ this.indexParam = builder.indexParam;
+ this.nextPageParam = builder.nextPageParam;
+
+ this.method = StringUtils.isEmpty(builder.method)
+ ? PaginatorMethod.OFFSET.toString() : builder.method.trim().toUpperCase();
+
+ PaginatorMethod paginatorMethod = PaginatorMethod.valueOf(this.method);
+
+ /*
+ * For pagination to function, key fields must be defined. This block validates the required fields for
+ * each type of paginator.
+ */
+ switch (paginatorMethod) {
+ case OFFSET:
+ if (StringUtils.isEmpty(this.limitParam) || StringUtils.isEmpty(this.offsetParam)) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For OFFSET pagination, limitField and offsetField must be defined.")
+ .build(logger);
+ } else if (this.pageSize <= 0) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For OFFSET pagination, maxPageSize must be defined and greater than zero.")
+ .build(logger);
+ }
+ break;
+ case PAGE:
+ if (StringUtils.isEmpty(this.pageParam) || StringUtils.isEmpty(this.pageSizeParam)) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For PAGE pagination, pageField and pageSizeField must be defined.")
+ .build(logger);
+ } else if (this.pageSize <= 0) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For PAGE pagination, maxPageSize must be defined and greater than zero.")
+ .build(logger);
+ }
+ break;
+ case INDEX:
+ // Either the nextPageParam OR the indexParam must be populated
+ if (StringUtils.isEmpty(this.hasMoreParam)) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For INDEX pagination, the hasMoreParam must be defined.")
+ .build(logger);
+ } else if ((StringUtils.isEmpty(this.nextPageParam) && StringUtils.isNotEmpty(this.indexParam)) &&
+ (StringUtils.isNotEmpty(this.nextPageParam) && StringUtils.isEmpty(this.indexParam))) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For INDEX pagination, the nextPageParam or indexParam must be defined.")
+ .build(logger);
+ } else if (StringUtils.isEmpty(this.nextPageParam) && StringUtils.isEmpty(this.indexParam)) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For INDEX pagination, the nextPageParam or indexParam must be defined.")
+ .build(logger);
+ }
+ break;
+ default:
+ throw UserException
+ .validationError()
+ .message("Invalid paginator method: %s. Drill supports 'OFFSET', 'INDEX' and 'PAGE'", method)
+ .build(logger);
+ }
}
public static HttpPaginatorConfigBuilder builder() {
@@ -103,6 +178,17 @@
return this.method;
}
+ public String hasMoreParam() {
+ return hasMoreParam;
+ }
+
+ public String nextPageParam() {
+ return nextPageParam;
+ }
+ public String indexParam() {
+ return indexParam;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -118,13 +204,16 @@
&& Objects.equals(offsetParam, that.offsetParam)
&& Objects.equals(pageParam, that.pageParam)
&& Objects.equals(pageSizeParam, that.pageSizeParam)
- && Objects.equals(method, that.method);
+ && Objects.equals(method, that.method)
+ && Objects.equals(hasMoreParam, that.hasMoreParam)
+ && Objects.equals(indexParam, that.indexParam)
+ && Objects.equals(nextPageParam, that.nextPageParam);
}
@Override
public int hashCode() {
return Objects.hash(limitParam, offsetParam, pageParam, pageSizeParam,
- pageSize, maxRecords, method);
+ pageSize, maxRecords, method, nextPageParam, indexParam, hasMoreParam);
}
@Override
@@ -137,64 +226,16 @@
.field("pageSize", pageSize)
.field("maxRecords", maxRecords)
.field("method", method)
+ .field("indexParam", indexParam)
+ .field("hasMoreParam", hasMoreParam)
+ .field("nextPageParam", nextPageParam)
.toString();
}
public enum PaginatorMethod {
OFFSET,
- PAGE
- }
-
- private HttpPaginatorConfig(HttpPaginatorConfig.HttpPaginatorBuilder builder) {
- this.limitParam = builder.limitParam;
- this.offsetParam = builder.offsetParam;
- this.pageSize = builder.pageSize;
- this.pageParam = builder.pageParam;
- this.pageSizeParam = builder.pageSizeParam;
- this.maxRecords = builder.maxRecords;
-
- this.method = StringUtils.isEmpty(builder.method)
- ? PaginatorMethod.OFFSET.toString() : builder.method.trim().toUpperCase();
-
- PaginatorMethod paginatorMethod = PaginatorMethod.valueOf(this.method);
-
- /*
- * For pagination to function key fields must be defined. This block validates the required fields for
- * each type of paginator.
- */
- switch (paginatorMethod) {
- case OFFSET:
- if (StringUtils.isEmpty(this.limitParam) || StringUtils.isEmpty(this.offsetParam)) {
- throw UserException
- .validationError()
- .message("Invalid paginator configuration. For OFFSET pagination, limitField and offsetField must be defined.")
- .build(logger);
- } else if (this.pageSize <= 0) {
- throw UserException
- .validationError()
- .message("Invalid paginator configuration. For OFFSET pagination, maxPageSize must be defined and greater than zero.")
- .build(logger);
- }
- break;
- case PAGE:
- if (StringUtils.isEmpty(this.pageParam) || StringUtils.isEmpty(this.pageSizeParam)) {
- throw UserException
- .validationError()
- .message("Invalid paginator configuration. For PAGE pagination, pageField and pageSizeField must be defined.")
- .build(logger);
- } else if (this.pageSize <= 0) {
- throw UserException
- .validationError()
- .message("Invalid paginator configuration. For PAGE pagination, maxPageSize must be defined and greater than zero.")
- .build(logger);
- }
- break;
- default:
- throw UserException
- .validationError()
- .message("Invalid paginator method: %s. Drill supports 'OFFSET' and 'PAGE'", method)
- .build(logger);
- }
+ PAGE,
+ INDEX
}
@JsonIgnore
@@ -203,103 +244,32 @@
}
@JsonPOJOBuilder(withPrefix = "")
- public static class HttpPaginatorBuilder {
- public String limitParam;
-
- public String offsetParam;
-
- public int maxRecords;
-
- public int pageSize;
-
- public String pageParam;
-
- public String pageSizeParam;
-
- public String method;
-
- public HttpPaginatorConfig build() {
- return new HttpPaginatorConfig(this);
- }
-
- public String limitParam() {
- return this.limitParam;
- }
-
- public String offsetParam() {
- return this.offsetParam;
- }
-
- public int maxRecords() {
- return this.maxRecords;
- }
-
- public int pageSize() {
- return this.pageSize;
- }
-
- public String pageParam() {
- return this.pageParam;
- }
-
- public String pageSizeParam() {
- return this.pageSizeParam;
- }
-
- public String method() {
- return this.method;
- }
-
- public HttpPaginatorBuilder limitParam(String limitParam) {
- this.limitParam = limitParam;
- return this;
- }
-
- public HttpPaginatorBuilder offsetParam(String offsetParam) {
- this.offsetParam = offsetParam;
- return this;
- }
-
- public HttpPaginatorBuilder maxRecords(int maxRecords) {
- this.maxRecords = maxRecords;
- return this;
- }
-
- public HttpPaginatorBuilder pageSize(int pageSize) {
- this.pageSize = pageSize;
- return this;
- }
-
- public HttpPaginatorBuilder pageParam(String pageParam) {
- this.pageParam = pageParam;
- return this;
- }
-
- public HttpPaginatorBuilder pageSizeParam(String pageSizeParam) {
- this.pageSizeParam = pageSizeParam;
- return this;
- }
-
- public HttpPaginatorBuilder method(String method) {
- this.method = method;
- return this;
- }
- }
-
public static class HttpPaginatorConfigBuilder {
private String limitParam;
-
private String offsetParam;
-
private String pageParam;
-
private String pageSizeParam;
-
private int pageSize;
-
private int maxRecords;
-
private String method;
+ private String hasMoreParam;
+ private String indexParam;
+ private String nextPageParam;
+
+ public HttpPaginatorConfigBuilder hasMoreParam(String hasMoreParam) {
+ this.hasMoreParam = hasMoreParam;
+ return this;
+ }
+
+ public HttpPaginatorConfigBuilder indexParam(String indexParam) {
+ this.indexParam = indexParam;
+ return this;
+ }
+
+ public HttpPaginatorConfigBuilder nextPageParam(String nextPageParam) {
+ this.nextPageParam = nextPageParam;
+ return this;
+ }
public HttpPaginatorConfigBuilder limitParam(String limitParam) {
this.limitParam = limitParam;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 57797b3..edc7dee 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -34,6 +34,7 @@
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.apache.drill.exec.store.http.paginator.IndexPaginator;
import org.apache.drill.exec.store.http.paginator.OffsetPaginator;
import org.apache.drill.exec.store.http.paginator.PagePaginator;
import org.apache.drill.exec.store.http.paginator.Paginator;
@@ -136,6 +137,12 @@
paginatorConfig.pageSize(),
paginatorConfig.pageParam(),
paginatorConfig.pageSizeParam());
+ } else if (paginatorConfig.getMethodType() == PaginatorMethod.INDEX) {
+ paginator = new IndexPaginator(urlBuilder,
+ subScan.maxRecords(),
+ 0, paginatorConfig.hasMoreParam(),
+ paginatorConfig.indexParam(),
+ paginatorConfig.nextPageParam());
}
return paginator;
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/IndexPaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/IndexPaginator.java
new file mode 100644
index 0000000..22aa531
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/IndexPaginator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.paginator;
+
+import okhttp3.HttpUrl.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.NoSuchElementException;
+
+public class IndexPaginator extends Paginator {
+
+ private final String hasMoreParam;
+ private final String indexParam;
+ private final String nextPageParam;
+
+ private String indexValue;
+ private Boolean hasMoreValue;
+ private String nextPageValue;
+ private int pageCount;
+
+ public IndexPaginator(Builder builder, int pageSize, int limit, String hasMoreParam, String indexParam, String nextPageParam) {
+ super(builder, PaginatorMethod.INDEX, pageSize, limit);
+ this.hasMoreParam = hasMoreParam;
+ this.indexParam = indexParam;
+ this.nextPageParam = nextPageParam;
+ pageCount = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !partialPageReceived;
+
+ }
+
+ public String getIndexParam() {
+ return this.indexParam;
+ }
+
+ public String getHasMoreParam() {
+ return this.hasMoreParam;
+ }
+
+ public String getNextPageParam() {
+ return this.nextPageParam;
+ }
+
+ public void setHasMoreValue(Boolean hasMoreValue) {
+ this.hasMoreValue = hasMoreValue;
+ }
+
+ public void setIndexValue(String indexValue) {
+ if (StringUtils.isNumeric(indexValue)) {
+
+ }
+ this.indexValue = indexValue;
+ }
+
+ public void setNextPageValue(String nextPageValue) {
+ this.nextPageValue = nextPageValue;
+ }
+
+ public boolean isFirstPage() {
+ return pageCount < 1;
+ }
+
+ @Override
+ public String next() {
+ // If the paginator has never been run before, just return the base URL.
+ if (pageCount == 0) {
+ pageCount++;
+ return builder.build().url().toString();
+ }
+
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ if (StringUtils.isNotEmpty(nextPageValue)) {
+ // TODO figure this out...
+ } else if (StringUtils.isNotEmpty(indexValue)) {
+ try {
+ indexValue = URLEncoder.encode(indexValue, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ // Do nothing.
+ }
+ builder.removeAllEncodedQueryParameters(indexParam);
+ builder.addQueryParameter(indexParam, indexValue);
+ }
+ pageCount++;
+ return builder.build().url().toString();
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java
index 22a0233..eed3c23 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/OffsetPaginator.java
@@ -20,6 +20,7 @@
import okhttp3.HttpUrl.Builder;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@
* @param offsetParam The field name which corresponds to the offset field from the API
*/
public OffsetPaginator(Builder builder, int limit, int pageSize, String limitParam, String offsetParam) {
- super(builder, paginationMode.OFFSET, pageSize, limit);
+ super(builder, PaginatorMethod.OFFSET, pageSize, limit);
this.limit = limit;
this.limitParam = limitParam;
this.offsetParam = offsetParam;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
index 16746c1..b640d3a 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
@@ -20,6 +20,7 @@
import okhttp3.HttpUrl.Builder;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@
* @param pageSizeParam The API Query parameter which specifies how many results per page
*/
public PagePaginator(Builder builder, int limit, int pageSize, String pageParam, String pageSizeParam) {
- super(builder, paginationMode.PAGE, pageSize, limit);
+ super(builder, PaginatorMethod.PAGE, pageSize, limit);
this.limit = limit;
this.pageParam = pageParam;
this.pageSizeParam = pageSizeParam;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java
index eb1f1e7..f7489a7 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/Paginator.java
@@ -19,6 +19,7 @@
package org.apache.drill.exec.store.http.paginator;
import okhttp3.HttpUrl.Builder;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,17 +42,12 @@
private static final int MAX_ATTEMPTS = 100;
protected final int pageSize;
- public enum paginationMode {
- OFFSET,
- PAGE
- }
-
- protected final paginationMode MODE;
+ protected final PaginatorMethod MODE;
protected final int limit;
protected boolean partialPageReceived;
protected Builder builder;
- public Paginator(Builder builder, paginationMode mode, int pageSize, int limit) {
+ public Paginator(Builder builder, PaginatorMethod mode, int pageSize, int limit) {
this.MODE = mode;
this.builder = builder;
this.pageSize = pageSize;
@@ -78,4 +74,8 @@
}
public int getPageSize() { return pageSize; }
+
+ public PaginatorMethod getMode() {
+ return MODE;
+ }
}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index b8c5972..328a447 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -60,6 +60,12 @@
private static String TEST_JSON_PAGE1;
private static String TEST_JSON_PAGE2;
private static String TEST_JSON_PAGE3;
+
+ private static String TEST_JSON_INDEX_PAGE1;
+ private static String TEST_JSON_INDEX_PAGE2;
+ private static String TEST_JSON_INDEX_PAGE3;
+ private static String TEST_JSON_INDEX_PAGE4;
+
private static String TEST_XML_PAGE1;
private static String TEST_XML_PAGE2;
private static String TEST_XML_PAGE3;
@@ -77,6 +83,12 @@
TEST_JSON_PAGE2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p2.json"), Charsets.UTF_8).read();
TEST_JSON_PAGE3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/p3.json"), Charsets.UTF_8).read();
+ TEST_JSON_INDEX_PAGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/index_response1.json"), Charsets.UTF_8).read();
+ TEST_JSON_INDEX_PAGE2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/index_response2.json"), Charsets.UTF_8).read();
+
+ TEST_JSON_INDEX_PAGE3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/index_response3.json"), Charsets.UTF_8).read();
+ TEST_JSON_INDEX_PAGE4 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/index_response4.json"), Charsets.UTF_8).read();
+
TEST_XML_PAGE1 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_1.xml"), Charsets.UTF_8).read();
TEST_XML_PAGE2 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_2.xml"), Charsets.UTF_8).read();
TEST_XML_PAGE3 = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/response_3.xml"), Charsets.UTF_8).read();
@@ -140,6 +152,32 @@
.pageSize(2)
.build();
+ HttpPaginatorConfig indexPaginator = HttpPaginatorConfig.builder()
+ .indexParam("offset")
+ .hasMoreParam("has-more")
+ .method("index")
+ .build();
+
+ HttpApiConfig mockJsonConfigWithKeyset = HttpApiConfig.builder()
+ .url("http://localhost:8092/json")
+ .method("get")
+ .headers(headers)
+ .requireTail(false)
+ .paginator(indexPaginator)
+ .inputType("json")
+ .build();
+
+ HttpApiConfig mockJsonConfigWithKeysetAndDataPath = HttpApiConfig.builder()
+ .url("http://localhost:8092/json")
+ .method("get")
+ .headers(headers)
+ .requireTail(false)
+ .dataPath("companies")
+ .paginator(indexPaginator)
+ .inputType("json")
+ .build();
+
+
HttpApiConfig mockJsonConfigWithPaginator = HttpApiConfig.builder()
.url("http://localhost:8092/json")
.method("get")
@@ -192,6 +230,8 @@
Map<String, HttpApiConfig> configs = new HashMap<>();
configs.put("csv_paginator", mockCsvConfigWithPaginator);
+ configs.put("json_index", mockJsonConfigWithKeyset);
+ configs.put("json_index_datapath", mockJsonConfigWithKeysetAndDataPath);
configs.put("json_paginator", mockJsonConfigWithPaginator);
configs.put("xml_paginator", mockXmlConfigWithPaginator);
configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
@@ -244,6 +284,72 @@
}
@Test
+ public void simpleJSONIndexQuery() throws Exception {
+ String sql = "SELECT * FROM `local`.`json_index` LIMIT 4";
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE1));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE2));
+
+ List<QueryDataBatch> results = client.queryBuilder()
+ .sql(sql)
+ .results();
+
+ int count = 0;
+ for(QueryDataBatch b : results){
+ count += b.getHeader().getRowCount();
+ b.release();
+ }
+ assertEquals(2, results.size());
+ assertEquals(2, count);
+ }
+ }
+
+ @Test
+ public void simpleJSONIndexQueryWithProjectedColumns() throws Exception {
+ String sql = "SELECT companies FROM `local`.`json_index` LIMIT 4";
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE1));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE2));
+
+ List<QueryDataBatch> results = client.queryBuilder()
+ .sql(sql)
+ .results();
+
+ int count = 0;
+ for(QueryDataBatch b : results){
+ count += b.getHeader().getRowCount();
+ b.release();
+ }
+ assertEquals(2, results.size());
+ assertEquals(2, count);
+ }
+ }
+
+ @Test
+ public void simpleJSONIndexQueryAndDataPath() throws Exception {
+ String sql = "SELECT * FROM `local`.`json_index_datapath` LIMIT 4";
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE3));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_INDEX_PAGE4));
+
+ List<QueryDataBatch> results = client.queryBuilder()
+ .sql(sql)
+ .results();
+
+ int count = 0;
+ for(QueryDataBatch b : results){
+ count += b.getHeader().getRowCount();
+ b.release();
+ }
+ assertEquals(2, results.size());
+ assertEquals(4, count);
+ }
+ }
+
+ @Test
public void simpleJSONPaginatorQueryWithoutLimit() throws Exception {
String sql = "SELECT * FROM `local`.`json_paginator`";
try (MockWebServer server = startServer()) {
diff --git a/contrib/storage-http/src/test/resources/data/index_response1.json b/contrib/storage-http/src/test/resources/data/index_response1.json
new file mode 100644
index 0000000..708018b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/index_response1.json
@@ -0,0 +1,96 @@
+{
+ "companies": [
+ {
+ "portalId": 62515,
+ "additionalDomains": [
+
+ ],
+ "properties": {
+ "website": {
+ "sourceId": null,
+ "timestamp": 1457513066540,
+ "versions": [
+ {
+ "timestamp": 1457513066540,
+ "sourceVid": [
+
+ ],
+ "name": "website",
+ "value": "example.com",
+ "source": "COMPANIES"
+ }
+ ],
+ "value": "example.com",
+ "source": "COMPANIES"
+ },
+ "name": {
+ "sourceId": "name",
+ "timestamp": 1464484587592,
+ "versions": [
+ {
+ "name": "name",
+ "sourceId": "name",
+ "timestamp": 1464484587592,
+ "value": "Example Company",
+ "source": "BIDEN",
+ "sourceVid": [
+
+ ]
+ }
+ ],
+ "value": "Example Company",
+ "source": "BIDEN"
+ }
+ },
+ "isDeleted": false,
+ "companyId": 115200636
+ },
+ {
+ "portalId": 62515,
+ "additionalDomains": [
+
+ ],
+ "properties": {
+ "website": {
+ "sourceId": null,
+ "timestamp": 1457535205549,
+ "versions": [
+ {
+ "timestamp": 1457535205549,
+ "sourceVid": [
+
+ ],
+ "name": "website",
+ "value": "test.com",
+ "source": "COMPANIES"
+ }
+ ],
+ "value": "test.com",
+ "source": "COMPANIES"
+ },
+ "name": {
+ "sourceId": "name",
+ "timestamp": 1468832771769,
+ "versions": [
+ {
+ "name": "name",
+ "sourceId": "name",
+ "timestamp": 1468832771769,
+ "value": "Test Company",
+ "source": "BIDEN",
+ "sourceVid": [
+
+ ]
+ }
+ ],
+ "value": "Test Company",
+ "source": "BIDEN"
+ }
+ },
+ "isDeleted": false,
+ "companyId": 115279791
+ }
+ ],
+ "has-more": true,
+ "offset": 3849945478
+}
diff --git a/contrib/storage-http/src/test/resources/data/index_response2.json b/contrib/storage-http/src/test/resources/data/index_response2.json
new file mode 100644
index 0000000..c13acaa
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/index_response2.json
@@ -0,0 +1,96 @@
+{
+ "companies": [
+ {
+ "portalId": 62515,
+ "additionalDomains": [
+
+ ],
+ "properties": {
+ "website": {
+ "sourceId": null,
+ "timestamp": 1457513066540,
+ "versions": [
+ {
+ "timestamp": 1457513066540,
+ "sourceVid": [
+
+ ],
+ "name": "website",
+ "value": "example.com",
+ "source": "COMPANIES"
+ }
+ ],
+ "value": "example.com",
+ "source": "COMPANIES"
+ },
+ "name": {
+ "sourceId": "name",
+ "timestamp": 1464484587592,
+ "versions": [
+ {
+ "name": "name",
+ "sourceId": "name",
+ "timestamp": 1464484587592,
+ "value": "Example Company",
+ "source": "BIDEN",
+ "sourceVid": [
+
+ ]
+ }
+ ],
+ "value": "Example Company",
+ "source": "BIDEN"
+ }
+ },
+ "isDeleted": false,
+ "companyId": 115200636
+ },
+ {
+ "portalId": 62515,
+ "additionalDomains": [
+
+ ],
+ "properties": {
+ "website": {
+ "sourceId": null,
+ "timestamp": 1457535205549,
+ "versions": [
+ {
+ "timestamp": 1457535205549,
+ "sourceVid": [
+
+ ],
+ "name": "website",
+ "value": "test.com",
+ "source": "COMPANIES"
+ }
+ ],
+ "value": "test.com",
+ "source": "COMPANIES"
+ },
+ "name": {
+ "sourceId": "name",
+ "timestamp": 1468832771769,
+ "versions": [
+ {
+ "name": "name",
+ "sourceId": "name",
+ "timestamp": 1468832771769,
+ "value": "Test Company",
+ "source": "BIDEN",
+ "sourceVid": [
+
+ ]
+ }
+ ],
+ "value": "Test Company",
+ "source": "BIDEN"
+ }
+ },
+ "isDeleted": false,
+ "companyId": 115279791
+ }
+ ],
+ "has-more": false,
+ "offset": 115279791
+}
diff --git a/contrib/storage-http/src/test/resources/data/index_response3.json b/contrib/storage-http/src/test/resources/data/index_response3.json
new file mode 100644
index 0000000..6377343
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/index_response3.json
@@ -0,0 +1,14 @@
+{
+ "has-more": true,
+ "offset": 115279791,
+ "companies": [
+ {
+ "portalId": 62515,
+ "companyId": 115200636
+ },
+ {
+ "portalId": 62515,
+ "companyId": 115279791
+ }
+ ]
+}
diff --git a/contrib/storage-http/src/test/resources/data/index_response4.json b/contrib/storage-http/src/test/resources/data/index_response4.json
new file mode 100644
index 0000000..319858b
--- /dev/null
+++ b/contrib/storage-http/src/test/resources/data/index_response4.json
@@ -0,0 +1,14 @@
+{
+ "has-more": false,
+ "offset": 115279791,
+ "companies": [
+ {
+ "portalId": 62515,
+ "companyId": 115200636
+ },
+ {
+ "portalId": 62515,
+ "companyId": 115279791
+ }
+ ]
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
index b52b116..b3f9f28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
@@ -17,7 +17,9 @@
*/
package org.apache.drill.exec.physical.impl.scan;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.store.RecordReader;
/**
* Extended version of a record reader used by the revised
@@ -78,7 +80,7 @@
* don't worry about it.
* <p>
* If an error occurs, the reader can throw a {@link RuntimeException}
- * from any method. A {@link UserException} is preferred to provide
+ * from any method. A {@link org.apache.drill.common.exceptions.UserException} is preferred to provide
* detailed information about the source of the problem.
*/
public interface RowBatchReader {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index 7e9d9d5..5ac3a59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import io.netty.buffer.DrillBuf;
import org.apache.commons.io.IOUtils;
@@ -153,6 +154,7 @@
private MessageParser messageParser;
private ImplicitColumns implicitFields;
private int maxRows;
+ private Map<String, Object> listenerColumnMap;
public JsonLoaderBuilder resultSetLoader(ResultSetLoader rsLoader) {
this.rsLoader = rsLoader;
@@ -164,6 +166,11 @@
return this;
}
+ public JsonLoaderBuilder listenerColumnMap(Map<String,Object> listenerColumnMap) {
+ this.listenerColumnMap = listenerColumnMap;
+ return this;
+ }
+
public JsonLoaderBuilder standardOptions(OptionSet optionSet) {
this.options = new JsonLoaderOptions(optionSet);
return this;
@@ -246,6 +253,7 @@
private final JsonStructureParser parser;
private final FieldFactory fieldFactory;
private final ImplicitColumns implicitFields;
+ private final Map<String, Object> listenerColumnMap;
private final int maxRows;
private boolean eof;
@@ -268,6 +276,7 @@
this.implicitFields = builder.implicitFields;
this.maxRows = builder.maxRows;
this.fieldFactory = buildFieldFactory(builder);
+ this.listenerColumnMap = builder.listenerColumnMap;
this.parser = buildParser(builder);
}
@@ -279,6 +288,7 @@
.parserFactory(parser ->
new TupleParser(parser, JsonLoaderImpl.this, rsLoader.writer(), builder.providedSchema))
.errorFactory(this)
+ .listenerColumnMap(listenerColumnMap)
.messageParser(builder.messageParser)
.dataPath(builder.dataPath)
.build();
@@ -298,6 +308,9 @@
public JsonLoaderOptions options() { return options; }
public JsonStructureParser parser() { return parser; }
public FieldFactory fieldFactory() { return fieldFactory; }
+ public Map<String, Object> listenerColumnMap() {
+ return listenerColumnMap;
+ }
@Override // JsonLoader
public boolean readBatch() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
index 100ddc3..bf65bb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/TupleParser.java
@@ -127,10 +127,19 @@
@Override
public ElementParser onField(String key, TokenIterator tokenizer) {
- if (!tupleWriter.isProjected(key)) {
- return fieldFactory().ignoredFieldParser();
- } else {
+ if (projectField(key)) {
return fieldParserFor(key, tokenizer);
+ } else {
+ return fieldFactory().ignoredFieldParser();
+ }
+ }
+
+ private boolean projectField(String key) {
+ // This method makes sure that fields necessary for column listeners are read.
+ if (tupleWriter.isProjected(key)) {
+ return true;
+ } else {
+ return loader.listenerColumnMap() != null && loader.listenerColumnMap().containsKey(key);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
index 59aecf9..60ccbf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
@@ -43,6 +43,20 @@
}
}
+ public JsonToken parseAndReturnToken(TokenIterator tokenizer) {
+ JsonToken token = tokenizer.requireNext();
+ switch (token) {
+ case START_ARRAY:
+ case START_OBJECT:
+ parseTail(tokenizer);
+ break;
+
+ default:
+ break;
+ }
+ return token;
+ }
+
private void parseTail(TokenIterator tokenizer) {
// Parse (field: value)* }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index 98163fd..03fa51b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -21,6 +21,7 @@
import java.io.InputStream;
import java.io.Reader;
import java.util.Arrays;
+import java.util.Map;
import java.util.function.Function;
import org.apache.commons.collections4.IterableUtils;
@@ -83,6 +84,7 @@
private ErrorFactory errorFactory;
private String dataPath;
private MessageParser messageParser;
+ private Map<String, Object> listenerColumnMap;
public JsonStructureParserBuilder options(JsonStructureOptions options) {
this.options = options;
@@ -125,13 +127,18 @@
return this;
}
+ public JsonStructureParserBuilder listenerColumnMap(Map<String, Object> listenerColumnMap) {
+ this.listenerColumnMap = listenerColumnMap;
+ return this;
+ }
+
public JsonStructureParser build() {
if (dataPath != null) {
dataPath = dataPath.trim();
dataPath = dataPath.isEmpty() ? null : dataPath;
}
if (dataPath != null && messageParser == null) {
- messageParser = new SimpleMessageParser(dataPath);
+ messageParser = new SimpleMessageParser(dataPath, listenerColumnMap);
}
return new JsonStructureParser(this);
}
@@ -142,6 +149,7 @@
private final TokenIterator tokenizer;
private final RootParser rootState;
private final FieldParserFactory fieldFactory;
+ private final Map<String, Object> listenerColumnMap;
private int errorRecoveryCount;
/**
@@ -167,6 +175,8 @@
fieldFactory = new FieldParserFactory(this,
Preconditions.checkNotNull(builder.parserFactory));
+ this.listenerColumnMap = builder.listenerColumnMap;
+
// Parse to the start of the data object(s), and create a root
// state to parse objects and watch for the end of data.
// The root state parses one object on each next() call.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
index f100a8f..3db8a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
@@ -21,6 +21,8 @@
import com.fasterxml.jackson.core.JsonToken;
+import java.util.Map;
+
/**
* A message parser which accepts a path to the data encoded as a
* slash-separated string. Given the following JSON message:
@@ -66,11 +68,13 @@
public class SimpleMessageParser implements MessageParser {
private final String[] path;
+ private final Map<String, Object> listenerColumnMap;
- public SimpleMessageParser(String dataPath) {
+ public SimpleMessageParser(String dataPath, Map<String, Object> listenerColumnMap) {
path = dataPath.split("/");
Preconditions.checkArgument(path.length > 0,
"Data path should not be empty.");
+ this.listenerColumnMap = listenerColumnMap;
}
@Override
@@ -102,6 +106,8 @@
String fieldName = tokenizer.textValue();
if (fieldName.equals(path[level])) {
return parseInnerLevel(tokenizer, level);
+ } else if (listenerColumnMap != null && listenerColumnMap.containsKey(fieldName)) {
+ skipElementButRetainValue(tokenizer, fieldName);
} else {
skipElement(tokenizer);
}
@@ -129,6 +135,44 @@
return parseToElement(tokenizer, level + 1);
}
+ /**
+ * This function is called when a storage plugin needs to retrieve values which have been read. This logic
+ * enables use of the data path in these situations. Normally, when the datapath is defined, the JSON reader
+ * will "free-wheel" over unprojected columns or columns outside of the datapath. However, in this case, often
+   * the values which are being read are outside the dataPath. This logic offers a way to capture these values
+ * without creating a ValueVector for them.
+ *
+ * @param tokenizer A {@link TokenIterator} of the parsed JSON data.
+ * @param fieldName A {@link String} of the column listener field name.
+ */
+ private void skipElementButRetainValue(TokenIterator tokenizer, String fieldName) {
+ JsonToken token = ((DummyValueParser) DummyValueParser.INSTANCE).parseAndReturnToken(tokenizer);
+ String value;
+ switch (token) {
+    case VALUE_NULL:
+      value = null; break;
+ case VALUE_TRUE:
+ value = Boolean.TRUE.toString();
+ break;
+ case VALUE_FALSE:
+ value = Boolean.FALSE.toString();
+ break;
+ case VALUE_NUMBER_INT:
+ value = Long.toString(tokenizer.longValue());
+ break;
+ case VALUE_NUMBER_FLOAT:
+ value = Double.toString(tokenizer.doubleValue());
+ break;
+ case VALUE_STRING:
+ value = tokenizer.stringValue();
+ break;
+ default:
+ throw tokenizer.invalidValue(token);
+ }
+
+ listenerColumnMap.put(fieldName, value);
+ }
+
private void skipElement(TokenIterator tokenizer) {
DummyValueParser.INSTANCE.parse(tokenizer);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BigIntListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BigIntListener.java
index bc2ad0c..671f062 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BigIntListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BigIntListener.java
@@ -62,6 +62,7 @@
default:
throw tokenizer.invalidValue(token);
}
+ addValueToListenerMap(writer.schema().name(), value);
writer.setLong(value);
}
@@ -71,6 +72,7 @@
setNull();
} else {
try {
+ addValueToListenerMap(writer.schema().name(), value);
writer.setLong(Long.parseLong(value));
} catch (NumberFormatException e) {
throw loader.dataConversionError(schema(), "string", value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BooleanListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BooleanListener.java
index 3c933f3..089723e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BooleanListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/BooleanListener.java
@@ -61,6 +61,7 @@
// errors.
throw tokenizer.invalidValue(token);
}
+ addValueToListenerMap(writer.schema().name(), value);
writer.setBoolean(value);
}
@@ -69,6 +70,7 @@
if (value.isEmpty()) {
setNull();
} else {
+ addValueToListenerMap(writer.schema().name(), value);
writer.setBoolean(Boolean.parseBoolean(value.trim()));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
index 3d22145..23ce22c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DateValueListener.java
@@ -46,6 +46,7 @@
setNull();
break;
case VALUE_NUMBER_INT:
+ addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
writer.setLong(tokenizer.longValue());
break;
case VALUE_STRING:
@@ -63,6 +64,7 @@
LocalDate localDate = LocalDate.parse(tokenizer.stringValue(), dateTimeFormatter);
writer.setLong(Duration.between(TimestampValueListener.LOCAL_EPOCH,
localDate.atStartOfDay()).toMillis());
+ addValueToListenerMap(writer.schema().name(), Duration.between(TimestampValueListener.LOCAL_EPOCH, localDate.atStartOfDay()).toMillis());
} catch (Exception e) {
throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DecimalValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DecimalValueListener.java
index 76d51bf..dfac321 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DecimalValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DecimalValueListener.java
@@ -41,6 +41,7 @@
case VALUE_NUMBER_FLOAT:
case VALUE_STRING:
try {
+ addValueToListenerMap(writer.schema().name(), new BigDecimal(tokenizer.textValue()));
writer.setDecimal(new BigDecimal(tokenizer.textValue()));
} catch (NumberFormatException e) {
throw loader.dataConversionError(schema(), "DECIMAL", tokenizer.textValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DoubleListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DoubleListener.java
index 0a48fe3..9ddac1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DoubleListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/DoubleListener.java
@@ -61,6 +61,7 @@
// errors.
throw tokenizer.invalidValue(token);
}
+    addValueToListenerMap(writer.schema().name(), value);
writer.setDouble(value);
}
@@ -70,6 +71,7 @@
setNull();
} else {
try {
+ addValueToListenerMap(writer.schema().name(), value);
writer.setDouble(Double.parseDouble(value));
} catch (NumberFormatException e) {
throw loader.dataConversionError(schema(), "string", value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/IntervalValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/IntervalValueListener.java
index ec1018b..2b8662e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/IntervalValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/IntervalValueListener.java
@@ -45,6 +45,7 @@
break;
case VALUE_STRING:
try {
+ addValueToListenerMap(writer.schema().name(), FORMATTER.parsePeriod(tokenizer.stringValue()));
writer.setPeriod(FORMATTER.parsePeriod(tokenizer.stringValue()));
} catch (Exception e) {
throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/ScalarListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/ScalarListener.java
index b64cab0..1e8e9b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/ScalarListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/ScalarListener.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.easy.json.values;
+import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
@@ -27,6 +28,8 @@
import com.fasterxml.jackson.core.JsonToken;
+import java.util.Map;
+
/**
* Base class for scalar field listeners
*/
@@ -76,4 +79,30 @@
protected UserException typeConversionError(String jsonType) {
return loader.typeConversionError(schema(), jsonType);
}
+
+ /**
+ * Adds a field's most recent value to the column listener map.
+ * This data is only stored if the listener column map is defined, and has keys.
+ * @param key The key of the listener field
+   * @param value The value to be retained
+ */
+ protected void addValueToListenerMap(String key, String value) {
+ Map<String,Object> listenerColumnMap = loader.listenerColumnMap();
+
+ if (listenerColumnMap == null || listenerColumnMap.isEmpty()) {
+ return;
+ } else if (listenerColumnMap.containsKey(key) && StringUtils.isNotEmpty(value)) {
+ listenerColumnMap.put(key, value);
+ }
+ }
+
+ protected void addValueToListenerMap(String key, Object value) {
+ Map<String, Object> listenerColumnMap = loader.listenerColumnMap();
+
+ if (listenerColumnMap == null || listenerColumnMap.isEmpty()) {
+ return;
+ } else if (listenerColumnMap.containsKey(key) && value != null) {
+ listenerColumnMap.put(key, value);
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictBigIntValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictBigIntValueListener.java
index 9143243..d56ea43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictBigIntValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictBigIntValueListener.java
@@ -41,10 +41,12 @@
break;
case VALUE_NUMBER_INT:
writer.setLong(tokenizer.longValue());
+ addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
break;
case VALUE_STRING:
try {
writer.setLong(Long.parseLong(tokenizer.stringValue()));
+ addValueToListenerMap(writer.schema().name(), Long.parseLong(tokenizer.stringValue()));
} catch (NumberFormatException e) {
throw loader.dataConversionError(schema(), "string", tokenizer.stringValue());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictDoubleValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictDoubleValueListener.java
index 6b69637..b0ed580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictDoubleValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictDoubleValueListener.java
@@ -52,6 +52,7 @@
default:
throw tokenizer.invalidValue(token);
}
+ addValueToListenerMap(writer.schema().name(), value);
writer.setDouble(value);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictIntValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictIntValueListener.java
index 895dbf2..85bbea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictIntValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictIntValueListener.java
@@ -40,11 +40,13 @@
setNull();
break;
case VALUE_NUMBER_INT:
+ addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
writer.setInt((int) tokenizer.longValue());
break;
case VALUE_STRING:
try {
writer.setInt(Integer.parseInt(tokenizer.stringValue()));
+ addValueToListenerMap(writer.schema().name(), Integer.parseInt(tokenizer.stringValue()));
} catch (NumberFormatException e) {
throw loader.dataConversionError(schema(), "string", tokenizer.stringValue());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictStringValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictStringValueListener.java
index 6d491e7..e73ab68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictStringValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/StrictStringValueListener.java
@@ -36,6 +36,7 @@
setNull();
break;
case VALUE_STRING:
+ addValueToListenerMap(writer.schema().name(), tokenizer.stringValue());
writer.setString(tokenizer.stringValue());
break;
default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimeValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimeValueListener.java
index 6cc791f..24ddd0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimeValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimeValueListener.java
@@ -48,11 +48,13 @@
setNull();
break;
case VALUE_NUMBER_INT:
+ addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
writer.setInt((int) tokenizer.longValue());
break;
case VALUE_STRING:
try {
LocalTime localTime = LocalTime.parse(tokenizer.stringValue(), TIME_FORMAT);
+ addValueToListenerMap(writer.schema().name(), localTime);
writer.setTime(localTime);
} catch (Exception e) {
throw loader.dataConversionError(schema(), "string", tokenizer.stringValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimestampValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimestampValueListener.java
index 56a998e..e8fa6b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimestampValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/TimestampValueListener.java
@@ -46,11 +46,13 @@
setNull();
return;
case VALUE_NUMBER_INT:
+ addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
writer.setLong(tokenizer.longValue());
break;
case VALUE_STRING:
try {
LocalDateTime localDT = LocalDateTime.parse(tokenizer.stringValue());
+ addValueToListenerMap(writer.schema().name(), Duration.between(LOCAL_EPOCH, localDT).toMillis());
writer.setLong(Duration.between(LOCAL_EPOCH, localDT).toMillis());
} catch (Exception e) {
throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcDateValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcDateValueListener.java
index ab45ef3..9577d0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcDateValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcDateValueListener.java
@@ -48,6 +48,7 @@
setNull();
break;
case VALUE_NUMBER_INT:
+ addValueToListenerMap(writer.schema().name(), tokenizer.longValue());
writer.setLong(tokenizer.longValue());
break;
case VALUE_STRING:
@@ -61,6 +62,7 @@
// is different locally than UTC. A mess.
LocalDate localDate = LocalDate.parse(tokenizer.stringValue(), DateUtility.isoFormatDate);
ZonedDateTime utc = localDate.atStartOfDay(ZoneOffset.UTC);
+ addValueToListenerMap(writer.schema().name(), utc.toEpochSecond() * 1000);
writer.setLong(utc.toEpochSecond() * 1000);
} catch (Exception e) {
throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcTimestampValueListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcTimestampValueListener.java
index 3d4b917..2dd36db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcTimestampValueListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/UtcTimestampValueListener.java
@@ -69,6 +69,7 @@
default:
throw tokenizer.invalidValue(token);
}
+ addValueToListenerMap(writer.schema().name(), instant.toEpochMilli() + LOCAL_ZONE_ID.getRules().getOffset(instant).getTotalSeconds() * 1000L);
writer.setLong(instant.toEpochMilli() + LOCAL_ZONE_ID.getRules().getOffset(instant).getTotalSeconds() * 1000L);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/VarCharListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/VarCharListener.java
index f0b88e2..44c01bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/VarCharListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/values/VarCharListener.java
@@ -63,6 +63,7 @@
default:
throw tokenizer.invalidValue(token);
}
+ addValueToListenerMap(writer.schema().name(), value);
writer.setString(value);
}