DRILL-7683: Add "message parsing" to new JSON loader
Adds the ability to parse "extra" JSON around the data payload,
as often needed for a REST API.
closes #2045
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index 7c9a753..b2a793d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -22,6 +22,9 @@
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionSet;
+
+import com.typesafe.config.Config;
/**
* Negotiates the table schema with the scanner framework and provides
@@ -100,6 +103,8 @@
public interface SchemaNegotiator {
OperatorContext context();
+ Config drillConfig();
+ OptionSet queryOptions();
/**
* Specify an advanced error context which allows the reader to
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index 8763dd4..64bac43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -21,8 +21,11 @@
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.vector.ValueVector;
+import com.typesafe.config.Config;
+
/**
* Implementation of the schema negotiation between scan operator and
* batch reader. Anticipates that the select list (and/or the list of
@@ -94,6 +97,16 @@
}
@Override
+ public Config drillConfig() {
+ return context().getFragmentContext().getConfig();
+ }
+
+ @Override
+ public OptionSet queryOptions() {
+ return context().getFragmentContext().getOptions();
+ }
+
+ @Override
public CustomErrorContext parentErrorContext() {
return framework.errorContext();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java
index 9d9afed..e4c5a0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java
@@ -56,13 +56,7 @@
* @throws RuntimeException for unexpected errors, most often due
* to code errors
*/
- boolean next();
-
- /**
- * Indicates that a batch is complete. Tells the loader to materialize
- * any deferred null fields. (See {@link TupleListener} for details.)
- */
- void endBatch();
+ boolean readBatch();
/**
* Releases resources held by this class including the input stream.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index 9fd64bd..ecfaf4b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
@@ -28,8 +29,12 @@
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.easy.json.parser.ErrorFactory;
import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
+import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser.JsonStructureParserBuilder;
+import org.apache.drill.exec.store.easy.json.parser.MessageParser;
+import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
import org.apache.drill.exec.store.easy.json.parser.ValueDef;
import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
@@ -97,7 +102,7 @@
* <li>Reports errors as {@link UserException} objects, complete with context
* information, rather than as generic Java exception as in the prior version.</li>
* <li>Moves parse options into a separate {@link JsonOptions} class.</li>
- * <li>Iteration protocol is simpler: simply call {@link #next()} until it returns
+ * <li>Iteration protocol is simpler: simply call {@link #readBatch()} until it returns
* {@code false}. Errors are reported out-of-band via an exception.</li>
* <li>The result set loader abstraction is perfectly happy with an empty schema.
* For this reason, this version (unlike the original) does not make up a dummy
@@ -128,6 +133,60 @@
public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
protected static final Logger logger = LoggerFactory.getLogger(JsonLoaderImpl.class);
+ public static class JsonLoaderBuilder {
+ private ResultSetLoader rsLoader;
+ private TupleMetadata providedSchema;
+ private JsonLoaderOptions options;
+ private CustomErrorContext errorContext;
+ private InputStream stream;
+ private Reader reader;
+ private MessageParser messageParser;
+
+ public JsonLoaderBuilder resultSetLoader(ResultSetLoader rsLoader) {
+ this.rsLoader = rsLoader;
+ return this;
+ }
+
+ public JsonLoaderBuilder providedSchema(TupleMetadata providedSchema) {
+ this.providedSchema = providedSchema;
+ return this;
+ }
+
+ public JsonLoaderBuilder standardOptions(OptionSet optionSet) {
+ this.options = new JsonLoaderOptions(optionSet);
+ return this;
+ }
+
+ public JsonLoaderBuilder options(JsonLoaderOptions options) {
+ this.options = options;
+ return this;
+ }
+
+ public JsonLoaderBuilder errorContext(CustomErrorContext errorContext) {
+ this.errorContext = errorContext;
+ return this;
+ }
+
+ public JsonLoaderBuilder fromStream(InputStream stream) {
+ this.stream = stream;
+ return this;
+ }
+
+ public JsonLoaderBuilder fromReader(Reader reader) {
+ this.reader = reader;
+ return this;
+ }
+
+ public JsonLoaderBuilder messageParser(MessageParser messageParser) {
+ this.messageParser = messageParser;
+ return this;
+ }
+
+ public JsonLoader build() {
+ return new JsonLoaderImpl(this);
+ }
+ }
+
interface NullTypeMarker {
void forceResolution();
}
@@ -146,30 +205,33 @@
* inference, and not JSON tokens have been seen which would hint
* at a type. Not needed when a schema is provided.
*/
-
// Using a simple list. Won't perform well if we have hundreds of
// null fields; but then we've never seen such a pathologically bad
- // case... Usually just one or two fields have deferred nulls.
+ // case. Usually just one or two fields have deferred nulls.
private final List<NullTypeMarker> nullStates = new ArrayList<>();
- public JsonLoaderImpl(ResultSetLoader rsLoader, TupleMetadata providedSchema,
- JsonLoaderOptions options, CustomErrorContext errorContext,
- InputStream stream) {
- this.rsLoader = rsLoader;
- this.options = options;
- this.errorContext = errorContext;
- this.rowListener = new TupleListener(this, rsLoader.writer(), providedSchema);
- this.parser = new JsonStructureParser(stream, options, rowListener, this);
+ private JsonLoaderImpl(JsonLoaderBuilder builder) {
+ this.rsLoader = builder.rsLoader;
+ this.options = builder.options;
+ this.errorContext = builder. errorContext;
+ this.rowListener = new TupleListener(this, rsLoader.writer(), builder.providedSchema);
+ this.parser = new JsonStructureParserBuilder()
+ .fromStream(builder.stream)
+ .fromReader(builder.reader)
+ .options(builder.options)
+ .rootListener(rowListener)
+ .errorFactory(this)
+ .messageParser(builder.messageParser)
+ .build();
}
public JsonLoaderOptions options() { return options; }
@Override // JsonLoader
- public boolean next() {
+ public boolean readBatch() {
if (eof) {
return false;
}
- rsLoader.startBatch();
RowSetLoader rowWriter = rsLoader.writer();
while (rowWriter.start()) {
if (parser.next()) {
@@ -179,6 +241,7 @@
break;
}
}
+ endBatch();
return rsLoader.hasRows();
}
@@ -209,8 +272,7 @@
* Bottom line: the user is responsible for not giving Drill
* ambiguous data that would require Drill to predict the future.
*/
- @Override // JsonLoader
- public void endBatch() {
+ private void endBatch() {
// Make a copy. Forcing resolution will remove the
// element from the original list.
@@ -237,8 +299,7 @@
@Override // ErrorFactory
public RuntimeException ioException(IOException e) {
throw buildError(
- UserException.dataReadError(e)
- .addContext(errorContext));
+ UserException.dataReadError(e));
}
@Override // ErrorFactory
@@ -325,6 +386,16 @@
.addContext("Array nesting", dims));
}
+ @Override
+ public RuntimeException messageParseError(MessageContextException e) {
+ return buildError(
+ UserException.validationError(e)
+ .message("Syntax error when parsing a JSON message")
+ .addContext(e.getMessage())
+ .addContext("Looking for field", e.nextElement)
+ .addContext("On token", e.token.name()));
+ }
+
protected UserException buildError(ColumnMetadata schema, UserException.Builder builder) {
return buildError(builder
.addContext("Column", schema.name())
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java
index 59a9faa..d982e11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.store.easy.json.loader;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.easy.json.parser.JsonStructureOptions;
/**
@@ -41,4 +43,11 @@
* </ul>
*/
public boolean classicArrayNulls;
+
+ public JsonLoaderOptions() { }
+
+ public JsonLoaderOptions(OptionSet options) {
+ super(options);
+ this.readNumbersAsDouble = options.getBoolean(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java
index b58763f..e3a208b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java
@@ -69,8 +69,14 @@
RuntimeException syntaxError(JsonToken token);
/**
- * Error recover is on, the structure parser tried to recover, but
+ * Error recovery is on, the structure parser tried to recover, but
* encountered too many other errors and gave up.
*/
RuntimeException unrecoverableError();
+
+ /**
+ * Parser is configured to find a message tag within the JSON
+ * and a syntax occurred when following the data path.
+ */
+ RuntimeException messageParseError(MessageParser.MessageContextException e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
index 78d2e67..6e072d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
@@ -17,6 +17,9 @@
*/
package org.apache.drill.exec.store.easy.json.parser;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionSet;
+
/**
* Input to the JSON structure parser which defines guidelines
* for low-level parsing as well as listeners for higher-level
@@ -51,4 +54,15 @@
* two or three valid records before it stabilizes.
*/
public boolean skipMalformedRecords;
+
+ public boolean enableEscapeAnyChar;
+
+ public JsonStructureOptions() { }
+
+ public JsonStructureOptions(OptionSet options) {
+ this.allTextMode = options.getBoolean(ExecConstants.JSON_ALL_TEXT_MODE);
+ this.allowNanInf = options.getBoolean(ExecConstants.JSON_READER_NAN_INF_NUMBERS);
+ this.skipMalformedRecords = options.getBoolean(ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG);
+ this.enableEscapeAnyChar = options.getBoolean(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index 14016a8..2b814ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -19,7 +19,10 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.Reader;
+import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
+import org.apache.drill.exec.store.easy.json.parser.RootParser.NestedRootArrayParser;
import org.apache.drill.exec.store.easy.json.parser.RootParser.RootArrayParser;
import org.apache.drill.exec.store.easy.json.parser.RootParser.RootObjectParser;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator.RecoverableJsonException;
@@ -59,6 +62,62 @@
public class JsonStructureParser {
protected static final Logger logger = LoggerFactory.getLogger(JsonStructureParser.class);
+ public static class JsonStructureParserBuilder {
+ private InputStream stream;
+ private Reader reader;
+ private JsonStructureOptions options;
+ private ObjectListener rootListener;
+ private ErrorFactory errorFactory;
+ private String dataPath;
+ private MessageParser messageParser;
+
+ public JsonStructureParserBuilder options(JsonStructureOptions options) {
+ this.options = options;
+ return this;
+ }
+
+ public JsonStructureParserBuilder rootListener(ObjectListener rootListener) {
+ this.rootListener = rootListener;
+ return this;
+ }
+
+ public JsonStructureParserBuilder errorFactory(ErrorFactory errorFactory) {
+ this.errorFactory = errorFactory;
+ return this;
+ }
+
+ public JsonStructureParserBuilder fromStream(InputStream stream) {
+ this.stream = stream;
+ return this;
+ }
+
+ public JsonStructureParserBuilder fromReader(Reader reader) {
+ this.reader = reader;
+ return this;
+ }
+
+ public JsonStructureParserBuilder messageParser(MessageParser messageParser) {
+ this.messageParser = messageParser;
+ return this;
+ }
+
+ public JsonStructureParserBuilder dataPath(String dataPath) {
+ this.dataPath = dataPath;
+ return this;
+ }
+
+ public JsonStructureParser build() {
+ if (dataPath != null) {
+ dataPath = dataPath.trim();
+ dataPath = dataPath.isEmpty() ? null : dataPath;
+ }
+ if (dataPath != null && messageParser == null) {
+ messageParser = new SimpleMessageParser(dataPath);
+ }
+ return new JsonStructureParser(this);
+ }
+ }
+
private final JsonParser parser;
private final JsonStructureOptions options;
private final ObjectListener rootListener;
@@ -78,31 +137,50 @@
* @param errorFactory factory for errors thrown for various
* conditions
*/
- public JsonStructureParser(InputStream stream, JsonStructureOptions options,
- ObjectListener rootListener, ErrorFactory errorFactory) {
- this.options = Preconditions.checkNotNull(options);
- this.rootListener = Preconditions.checkNotNull(rootListener);
- this.errorFactory = Preconditions.checkNotNull(errorFactory);
+ private JsonStructureParser(JsonStructureParserBuilder builder) {
+ this.options = Preconditions.checkNotNull(builder.options);
+ this.rootListener = Preconditions.checkNotNull(builder.rootListener);
+ this.errorFactory = Preconditions.checkNotNull(builder.errorFactory);
try {
ObjectMapper mapper = new ObjectMapper()
.configure(JsonParser.Feature.ALLOW_COMMENTS, true)
.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
- .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, options.allowNanInf);
+ .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, options.allowNanInf)
+ .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, options.enableEscapeAnyChar);
- parser = mapper.getFactory().createParser(stream);
+ if (builder.stream != null) {
+ parser = mapper.getFactory().createParser(builder.stream);
+ } else {
+ parser = mapper.getFactory().createParser(Preconditions.checkNotNull(builder.reader));
+ }
} catch (JsonParseException e) {
throw errorFactory().parseError("Failed to create the JSON parser", e);
} catch (IOException e) {
throw errorFactory().ioException(e);
}
tokenizer = new TokenIterator(parser, options, errorFactory());
- rootState = makeRootState();
+ if (builder.messageParser == null) {
+ rootState = makeRootState();
+ } else {
+ rootState = makeCustomRoot(builder.messageParser);
+ }
}
public JsonStructureOptions options() { return options; }
public ErrorFactory errorFactory() { return errorFactory; }
public ObjectListener rootListener() { return rootListener; }
+ private RootParser makeCustomRoot(MessageParser messageParser) {
+ try {
+ if (! messageParser.parsePrefix(tokenizer)) {
+ return null;
+ }
+ } catch (MessageContextException e) {
+ throw errorFactory.messageParseError(e);
+ }
+ return new NestedRootArrayParser(this, messageParser);
+ }
+
private RootParser makeRootState() {
JsonToken token = tokenizer.next();
if (token == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java
new file mode 100644
index 0000000..5b02f97
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.parser;
+
+import com.fasterxml.jackson.core.JsonToken;
+
+/**
+ * Optional custom parser for the portion of a JSON message that
+ * surrounds the data "payload". Can be used to extract status codes.
+ * See {@link SimpleMessageParser} to simply skip all fields but
+ * a given path.
+ */
+public interface MessageParser {
+
+ @SuppressWarnings("serial")
+ class MessageContextException extends Exception {
+ public final JsonToken token;
+ public final String nextElement;
+
+ public MessageContextException(JsonToken token, String nextElement, String descrip) {
+ super(descrip);
+ this.token = token;
+ this.nextElement = nextElement;
+ }
+ }
+ boolean parsePrefix(TokenIterator tokenizer) throws MessageContextException;
+ void parseSuffix(TokenIterator tokenizer) throws MessageContextException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
index f81fd3a..32460a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.easy.json.parser;
+import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,4 +119,37 @@
}
}
}
+
+ public static class NestedRootArrayParser extends RootParser {
+
+ private final MessageParser messageParser;
+
+ public NestedRootArrayParser(JsonStructureParser structParser, MessageParser messageParser) {
+ super(structParser);
+ this.messageParser = messageParser;
+ }
+
+ @Override
+ public boolean parseRoot(TokenIterator tokenizer) {
+ JsonToken token = tokenizer.next();
+ if (token == null) {
+ // Position: { ... EOF ^
+ // Saw EOF, but no closing ]. Warn and ignore.
+ // Note that the Jackson parser won't let us get here;
+ // it will have already thrown a syntax error.
+ logger.warn("Failed to close outer array. {}",
+ tokenizer.context());
+ return false;
+ } else if (token == JsonToken.END_ARRAY) {
+ try {
+ messageParser.parseSuffix(tokenizer);
+ } catch (MessageContextException e) {
+ throw errorFactory().messageParseError(e);
+ }
+ return false;
+ } else {
+ return parseRootObject(token, tokenizer);
+ }
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
new file mode 100644
index 0000000..956b910
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.parser;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import com.fasterxml.jackson.core.JsonToken;
+
+/**
+ * A message parser which accepts a path to the data encoded as a
+ * slash-separated string. Given the following JSON message:
+ *
+ * <pre><code:
+ * { status: {
+ * succeeded: true,
+ * runTimeMs: 123,
+ * }
+ * response: {
+ * rowCount: 10,
+ * rows: [
+ * { ... },
+ * { ... } ]
+ * },
+ * footer: "something interesting"
+ * }
+ * </code></pre>
+ *
+ * The path to the actual data would be {@code "response/rows"}.
+ * <p>
+ * The message parser will "free-wheel" over all objects not on the
+ * data path. Thus, this class will skip over the nested structure
+ * within the {@code status} member.
+ * <p>
+ * If the data path is not found then this class reports EOF of
+ * the whole data stream. It may have skipped over the actual payload
+ * if the path is mis-configured.
+ */
+public class SimpleMessageParser implements MessageParser {
+
+ private final String[] path;
+
+ public SimpleMessageParser(String dataPath) {
+ path = dataPath.split("/");
+ Preconditions.checkArgument(path.length > 0,
+ "Data path should not be empty.");
+ }
+
+ @Override
+ public boolean parsePrefix(TokenIterator tokenizer) throws MessageContextException {
+ JsonToken token = tokenizer.next();
+ if (token == null) {
+ return false;
+ }
+ if (token != JsonToken.START_OBJECT) {
+ throw new MessageContextException(token,
+ path[0], "Unexpected top-level array");
+ }
+ return parseToElement(tokenizer, 0);
+ }
+
+ private boolean parseToElement(TokenIterator tokenizer, int level) throws MessageContextException {
+ for (;;) {
+ JsonToken token = tokenizer.requireNext();
+ switch (token) {
+ case FIELD_NAME:
+ break;
+ case END_OBJECT:
+ return false;
+ default:
+ throw new MessageContextException(token,
+ path[0], "Unexpected token");
+ }
+
+ String fieldName = tokenizer.textValue();
+ if (fieldName.equals(path[level])) {
+ return parseInnerLevel(tokenizer, level);
+ } else {
+ skipElement(tokenizer);
+ }
+ }
+ }
+
+ private boolean parseInnerLevel(TokenIterator tokenizer, int level) throws MessageContextException {
+ JsonToken token = tokenizer.requireNext();
+ if (level == path.length - 1) {
+ switch (token) {
+ case VALUE_NULL:
+ return false;
+ case START_ARRAY:
+ return true;
+ default:
+ throw new MessageContextException(token,
+ path[level], "Expected JSON array for final path element");
+ }
+ }
+ if (token != JsonToken.START_OBJECT) {
+ throw new MessageParser.MessageContextException(token,
+ path[level], "Expected JSON object");
+ }
+ return parseToElement(tokenizer, level + 1);
+ }
+
+ private void skipElement(TokenIterator tokenizer) {
+ int level = 0;
+ do {
+ JsonToken token = tokenizer.requireNext();
+ switch (token) {
+ case START_OBJECT:
+ case START_ARRAY:
+ level++;
+ break;
+ case END_OBJECT:
+ case END_ARRAY:
+ level--;
+ break;
+ default:
+ break;
+ }
+ } while (level > 0);
+ }
+
+ @Override
+ public void parseSuffix(TokenIterator tokenizer) {
+ // No need to parse the unwanted tail elements.
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java
index d45b197..17cdc06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java
@@ -28,6 +28,7 @@
import org.apache.drill.exec.physical.resultSet.impl.ResultSetOptionBuilder;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
import org.apache.drill.test.SubOperatorTest;
public class BaseJsonLoaderTest extends SubOperatorTest {
@@ -43,7 +44,13 @@
public void open(InputStream is) {
rsLoader = new ResultSetLoaderImpl(fixture.allocator(), rsLoaderOptions.build());
- loader = new JsonLoaderImpl(rsLoader, providedSchema, jsonOptions, errorContext, is);
+ loader = new JsonLoaderBuilder()
+ .resultSetLoader(rsLoader)
+ .providedSchema(providedSchema)
+ .options(jsonOptions)
+ .errorContext(errorContext)
+ .fromStream(is)
+ .build();
}
public void open(String json) {
@@ -52,10 +59,10 @@
}
public RowSet next() {
- if (!loader.next()) {
+ rsLoader.startBatch();
+ if (!loader.readBatch()) {
return null;
}
- loader.endBatch();
return fixture.wrap(rsLoader.harvest());
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
index c77807d..8e75109 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
@@ -32,6 +32,7 @@
import java.util.Set;
import org.apache.commons.io.input.ReaderInputStream;
+import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser.JsonStructureParserBuilder;
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
import com.fasterxml.jackson.core.JsonParseException;
@@ -63,37 +64,42 @@
@Override
public RuntimeException parseError(String msg, JsonParseException e) {
- throw new JsonErrorFixture("parseError", msg, e);
+ return new JsonErrorFixture("parseError", msg, e);
}
@Override
public RuntimeException ioException(IOException e) {
- throw new JsonErrorFixture("ioException", "", e);
+ return new JsonErrorFixture("ioException", "", e);
}
@Override
public RuntimeException structureError(String msg) {
- throw new JsonErrorFixture("structureError", msg);
+ return new JsonErrorFixture("structureError", msg);
}
@Override
public RuntimeException syntaxError(JsonParseException e) {
- throw new JsonErrorFixture("syntaxError", "", e);
+ return new JsonErrorFixture("syntaxError", "", e);
}
@Override
public RuntimeException typeError(UnsupportedConversionError e) {
- throw new JsonErrorFixture("typeError", "", e);
+ return new JsonErrorFixture("typeError", "", e);
}
@Override
public RuntimeException syntaxError(JsonToken token) {
- throw new JsonErrorFixture("syntaxError", token.toString());
+ return new JsonErrorFixture("syntaxError", token.toString());
}
@Override
public RuntimeException unrecoverableError() {
- throw new JsonErrorFixture("unrecoverableError", "");
+ return new JsonErrorFixture("unrecoverableError", "");
+ }
+
+ @Override
+ public RuntimeException messageParseError(MessageParser.MessageContextException e) {
+ return new JsonErrorFixture("messageParseError", "Message parse error", e);
}
}
@@ -252,16 +258,25 @@
}
protected static class JsonParserFixture {
+ JsonStructureParserBuilder builder;
JsonStructureOptions options = new JsonStructureOptions();
JsonStructureParser parser;
ObjectListenerFixture rootObject = new ObjectListenerFixture();
ErrorFactory errorFactory = new ErrorFactoryFixture();
+ public JsonParserFixture() {
+ builder = new JsonStructureParserBuilder();
+ }
+
public void open(String json) {
InputStream inStream = new
ReaderInputStream(new StringReader(json));
- parser = new JsonStructureParser(inStream, options, rootObject,
- errorFactory);
+ builder
+ .fromStream(inStream)
+ .options(options)
+ .rootListener(rootObject)
+ .errorFactory(errorFactory);
+ parser = builder.build();
}
public boolean next() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java
new file mode 100644
index 0000000..4905d1f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.parser;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.JsonToken;
+
+public class TestJsonParserMessage extends BaseTestJsonParser {
+
+ /**
+ * Example message parser. A real parser would provide much better
+ * error messages for badly-formed JSON or error codes.
+ */
+ private static class MessageParserFixture implements MessageParser {
+
+ @Override
+ public boolean parsePrefix(TokenIterator tokenizer) {
+ assertEquals(JsonToken.START_OBJECT, tokenizer.requireNext());
+ assertEquals(JsonToken.FIELD_NAME, tokenizer.requireNext());
+ assertEquals(JsonToken.VALUE_STRING, tokenizer.requireNext());
+ if (!"ok".equals(tokenizer.stringValue())) {
+ return false;
+ }
+ assertEquals(JsonToken.FIELD_NAME, tokenizer.requireNext());
+ assertEquals(JsonToken.START_ARRAY, tokenizer.requireNext());
+ return true;
+ }
+
+ @Override
+ public void parseSuffix(TokenIterator tokenizer) {
+ assertEquals(JsonToken.END_OBJECT, tokenizer.requireNext());
+ }
+ }
+
+ /**
+ * Test the ability to wrap the data objects with a custom message
+ * structure, typical of a REST call.
+ */
+ @Test
+ public void testMessageParser() {
+ final String json =
+ "{ status: \"ok\", data: [{a: 0}, {a: 100}, {a: null}]}";
+ JsonParserFixture fixture = new JsonParserFixture();
+ fixture.builder.messageParser(new MessageParserFixture());
+ fixture.open(json);
+ assertTrue(fixture.next());
+ ValueListenerFixture a = fixture.field("a");
+ assertEquals(JsonType.INTEGER, a.valueDef.type());
+ assertEquals(2, fixture.read());
+ assertEquals(1, a.nullCount);
+ assertEquals(100L, a.value);
+ fixture.close();
+ }
+
+ /**
+ * Test the ability to cancel the data load if a message header
+ * indicates that there is no data.
+ */
+ @Test
+ public void testMessageParserEOF() {
+ final String json =
+ "{ status: \"fail\", data: [{a: 0}, {a: 100}, {a: null}]}";
+ JsonParserFixture fixture = new JsonParserFixture();
+ fixture.builder.messageParser(new MessageParserFixture());
+ fixture.open(json);
+ assertFalse(fixture.next());
+ fixture.close();
+ }
+
+ @Test
+ public void testDataPath() {
+ final String json =
+ "{ status: \"ok\", data: [{a: 0}, {a: 100}, {a: null}]}";
+ JsonParserFixture fixture = new JsonParserFixture();
+ fixture.builder.dataPath("data");
+ fixture.open(json);
+ assertTrue(fixture.next());
+ ValueListenerFixture a = fixture.field("a");
+ assertEquals(JsonType.INTEGER, a.valueDef.type());
+ assertEquals(2, fixture.read());
+ assertEquals(1, a.nullCount);
+ assertEquals(100L, a.value);
+ fixture.close();
+ }
+
+ @Test
+ public void testComplexDataPath() {
+ final String json =
+ "{ status: {result : \"ok\", runtime: 123},\n" +
+ " response: { rowCount: 1,\n" +
+ " data: [{a: 0}, {a: 100}, {a: null}]},\n" +
+ " footer: \"some stuff\"}";
+ JsonParserFixture fixture = new JsonParserFixture();
+ fixture.builder.dataPath("response/data");
+ fixture.open(json);
+ assertTrue(fixture.next());
+ ValueListenerFixture a = fixture.field("a");
+ assertEquals(JsonType.INTEGER, a.valueDef.type());
+ assertEquals(2, fixture.read());
+ assertEquals(1, a.nullCount);
+ assertEquals(100L, a.value);
+ fixture.close();
+ }
+
+ @Test
+ public void testDataPathNull() {
+ final String json =
+ "{ status: \"fail\", data: null}";
+ JsonParserFixture fixture = new JsonParserFixture();
+ fixture.builder.messageParser(new MessageParserFixture());
+ fixture.open(json);
+ assertFalse(fixture.next());
+ fixture.close();
+ }
+
+ @Test
+ public void testDataPathMissing() {
+ final String json =
+ "{ status: \"fail\"}";
+ JsonParserFixture fixture = new JsonParserFixture();
+ fixture.builder.messageParser(new MessageParserFixture());
+ fixture.open(json);
+ assertFalse(fixture.next());
+ fixture.close();
+ }
+
+ @Test
+ public void testDataPathErrorRoot() {
+ final String json = "\"Bogus!\"";
+ JsonParserFixture fixture = new JsonParserFixture();
+ fixture.builder.dataPath("data");
+ try {
+ fixture.open(json);
+ fail();
+ } catch (JsonErrorFixture e) {
+ assertTrue(e.errorType.equals("messageParseError"));
+ assertTrue(e.getCause() instanceof MessageParser.MessageContextException);
+ }
+ fixture.close();
+ }
+
+ @Test
+ public void testDataPathErrorLeaf() {
+ final String json =
+ "{ status: \"bogus\", data: { notValid: \"must be array\"}}";
+ JsonParserFixture fixture = new JsonParserFixture();
+ fixture.builder.dataPath("data");
+ try {
+ fixture.open(json);
+ fail();
+ } catch (JsonErrorFixture e) {
+ assertTrue(e.errorType.equals("messageParseError"));
+ assertTrue(e.getCause() instanceof MessageParser.MessageContextException);
+ }
+ fixture.close();
+ }
+}