DRILL-7683: Add "message parsing" to new JSON loader

Adds the ability to parse "extra" JSON around the data payload,
as often needed for a REST API.

closes #2045
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index 7c9a753..b2a793d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -22,6 +22,9 @@
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionSet;
+
+import com.typesafe.config.Config;
 
 /**
  * Negotiates the table schema with the scanner framework and provides
@@ -100,6 +103,8 @@
 public interface SchemaNegotiator {
 
   OperatorContext context();
+  Config drillConfig();
+  OptionSet queryOptions();
 
   /**
    * Specify an advanced error context which allows the reader to
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index 8763dd4..64bac43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -21,8 +21,11 @@
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.vector.ValueVector;
 
+import com.typesafe.config.Config;
+
 /**
  * Implementation of the schema negotiation between scan operator and
  * batch reader. Anticipates that the select list (and/or the list of
@@ -94,6 +97,16 @@
   }
 
   @Override
+  public Config drillConfig() {
+    return context().getFragmentContext().getConfig();
+  }
+
+  @Override
+  public OptionSet queryOptions() {
+    return context().getFragmentContext().getOptions();
+  }
+
+  @Override
   public CustomErrorContext parentErrorContext() {
     return framework.errorContext();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java
index 9d9afed..e4c5a0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoader.java
@@ -56,13 +56,7 @@
    * @throws RuntimeException for unexpected errors, most often due
    * to code errors
    */
-  boolean next();
-
-  /**
-   * Indicates that a batch is complete. Tells the loader to materialize
-   * any deferred null fields. (See {@link TupleListener} for details.)
-   */
-  void endBatch();
+  boolean readBatch();
 
   /**
    * Releases resources held by this class including the input stream.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index 9fd64bd..ecfaf4b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Reader;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -28,8 +29,12 @@
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.easy.json.parser.ErrorFactory;
 import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
+import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser.JsonStructureParserBuilder;
+import org.apache.drill.exec.store.easy.json.parser.MessageParser;
+import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
 import org.apache.drill.exec.store.easy.json.parser.ValueDef;
 import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
 import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
@@ -97,7 +102,7 @@
  * <li>Reports errors as {@link UserException} objects, complete with context
  * information, rather than as generic Java exception as in the prior version.</li>
  * <li>Moves parse options into a separate {@link JsonOptions} class.</li>
- * <li>Iteration protocol is simpler: simply call {@link #next()} until it returns
+ * <li>Iteration protocol is simpler: simply call {@link #readBatch()} until it returns
  * {@code false}. Errors are reported out-of-band via an exception.</li>
  * <li>The result set loader abstraction is perfectly happy with an empty schema.
  * For this reason, this version (unlike the original) does not make up a dummy
@@ -128,6 +133,60 @@
 public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
   protected static final Logger logger = LoggerFactory.getLogger(JsonLoaderImpl.class);
 
+  public static class JsonLoaderBuilder {
+    private ResultSetLoader rsLoader;
+    private TupleMetadata providedSchema;
+    private JsonLoaderOptions options;
+    private CustomErrorContext errorContext;
+    private InputStream stream;
+    private Reader reader;
+    private MessageParser messageParser;
+
+    public JsonLoaderBuilder resultSetLoader(ResultSetLoader rsLoader) {
+      this.rsLoader = rsLoader;
+      return this;
+    }
+
+    public JsonLoaderBuilder providedSchema(TupleMetadata providedSchema) {
+      this.providedSchema = providedSchema;
+      return this;
+    }
+
+    public JsonLoaderBuilder standardOptions(OptionSet optionSet) {
+      this.options = new JsonLoaderOptions(optionSet);
+      return this;
+    }
+
+    public JsonLoaderBuilder options(JsonLoaderOptions options) {
+      this.options = options;
+      return this;
+    }
+
+    public JsonLoaderBuilder errorContext(CustomErrorContext errorContext) {
+      this.errorContext = errorContext;
+      return this;
+    }
+
+    public JsonLoaderBuilder fromStream(InputStream stream) {
+      this.stream = stream;
+      return this;
+    }
+
+    public JsonLoaderBuilder fromReader(Reader reader) {
+      this.reader = reader;
+      return this;
+    }
+
+    public JsonLoaderBuilder messageParser(MessageParser messageParser) {
+      this.messageParser = messageParser;
+      return this;
+    }
+
+    public JsonLoader build() {
+      return new JsonLoaderImpl(this);
+    }
+  }
+
   interface NullTypeMarker {
     void forceResolution();
   }
@@ -146,30 +205,33 @@
    * inference, and not JSON tokens have been seen which would hint
    * at a type. Not needed when a schema is provided.
    */
-
   // Using a simple list. Won't perform well if we have hundreds of
   // null fields; but then we've never seen such a pathologically bad
-  // case... Usually just one or two fields have deferred nulls.
+  // case. Usually just one or two fields have deferred nulls.
   private final List<NullTypeMarker> nullStates = new ArrayList<>();
 
-  public JsonLoaderImpl(ResultSetLoader rsLoader, TupleMetadata providedSchema,
-      JsonLoaderOptions options, CustomErrorContext errorContext,
-      InputStream stream) {
-    this.rsLoader = rsLoader;
-    this.options = options;
-    this.errorContext = errorContext;
-    this.rowListener = new TupleListener(this, rsLoader.writer(), providedSchema);
-    this.parser = new JsonStructureParser(stream, options, rowListener, this);
+  private JsonLoaderImpl(JsonLoaderBuilder builder) {
+    this.rsLoader = builder.rsLoader;
+    this.options = builder.options;
+    this.errorContext = builder. errorContext;
+    this.rowListener = new TupleListener(this, rsLoader.writer(), builder.providedSchema);
+    this.parser = new JsonStructureParserBuilder()
+            .fromStream(builder.stream)
+            .fromReader(builder.reader)
+            .options(builder.options)
+            .rootListener(rowListener)
+            .errorFactory(this)
+            .messageParser(builder.messageParser)
+            .build();
   }
 
   public JsonLoaderOptions options() { return options; }
 
   @Override // JsonLoader
-  public boolean next() {
+  public boolean readBatch() {
     if (eof) {
       return false;
     }
-    rsLoader.startBatch();
     RowSetLoader rowWriter = rsLoader.writer();
     while (rowWriter.start()) {
       if (parser.next()) {
@@ -179,6 +241,7 @@
         break;
       }
     }
+    endBatch();
     return rsLoader.hasRows();
   }
 
@@ -209,8 +272,7 @@
    * Bottom line: the user is responsible for not giving Drill
    * ambiguous data that would require Drill to predict the future.
    */
-  @Override // JsonLoader
-  public void endBatch() {
+  private void endBatch() {
 
     // Make a copy. Forcing resolution will remove the
     // element from the original list.
@@ -237,8 +299,7 @@
   @Override // ErrorFactory
   public RuntimeException ioException(IOException e) {
     throw buildError(
-        UserException.dataReadError(e)
-          .addContext(errorContext));
+        UserException.dataReadError(e));
   }
 
   @Override // ErrorFactory
@@ -325,6 +386,16 @@
           .addContext("Array nesting", dims));
   }
 
+  @Override
+  public RuntimeException messageParseError(MessageContextException e) {
+    return buildError(
+        UserException.validationError(e)
+          .message("Syntax error when parsing a JSON message")
+          .addContext(e.getMessage())
+          .addContext("Looking for field", e.nextElement)
+          .addContext("On token", e.token.name()));
+  }
+
   protected UserException buildError(ColumnMetadata schema, UserException.Builder builder) {
     return buildError(builder
         .addContext("Column", schema.name())
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java
index 59a9faa..d982e11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderOptions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.easy.json.loader;
 
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.store.easy.json.parser.JsonStructureOptions;
 
 /**
@@ -41,4 +43,11 @@
    * </ul>
    */
   public boolean classicArrayNulls;
+
+  public JsonLoaderOptions() { }
+
+  public JsonLoaderOptions(OptionSet options) {
+    super(options);
+    this.readNumbersAsDouble = options.getBoolean(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java
index b58763f..e3a208b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ErrorFactory.java
@@ -69,8 +69,14 @@
   RuntimeException syntaxError(JsonToken token);
 
   /**
-   * Error recover is on, the structure parser tried to recover, but
+   * Error recovery is on, the structure parser tried to recover, but
    * encountered too many other errors and gave up.
    */
   RuntimeException unrecoverableError();
+
+  /**
+   * Parser is configured to find a message tag within the JSON
+   * and a syntax occurred when following the data path.
+   */
+  RuntimeException messageParseError(MessageParser.MessageContextException e);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
index 78d2e67..6e072d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.exec.store.easy.json.parser;
 
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionSet;
+
 /**
  * Input to the JSON structure parser which defines guidelines
  * for low-level parsing as well as listeners for higher-level
@@ -51,4 +54,15 @@
    * two or three valid records before it stabilizes.
    */
   public boolean skipMalformedRecords;
+
+  public boolean enableEscapeAnyChar;
+
+  public JsonStructureOptions() { }
+
+  public JsonStructureOptions(OptionSet options) {
+    this.allTextMode = options.getBoolean(ExecConstants.JSON_ALL_TEXT_MODE);
+    this.allowNanInf = options.getBoolean(ExecConstants.JSON_READER_NAN_INF_NUMBERS);
+    this.skipMalformedRecords = options.getBoolean(ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG);
+    this.enableEscapeAnyChar = options.getBoolean(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index 14016a8..2b814ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -19,7 +19,10 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Reader;
 
+import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
+import org.apache.drill.exec.store.easy.json.parser.RootParser.NestedRootArrayParser;
 import org.apache.drill.exec.store.easy.json.parser.RootParser.RootArrayParser;
 import org.apache.drill.exec.store.easy.json.parser.RootParser.RootObjectParser;
 import org.apache.drill.exec.store.easy.json.parser.TokenIterator.RecoverableJsonException;
@@ -59,6 +62,62 @@
 public class JsonStructureParser {
   protected static final Logger logger = LoggerFactory.getLogger(JsonStructureParser.class);
 
+  public static class JsonStructureParserBuilder {
+    private InputStream stream;
+    private Reader reader;
+    private JsonStructureOptions options;
+    private ObjectListener rootListener;
+    private ErrorFactory errorFactory;
+    private String dataPath;
+    private MessageParser messageParser;
+
+    public JsonStructureParserBuilder options(JsonStructureOptions options) {
+      this.options = options;
+      return this;
+    }
+
+    public JsonStructureParserBuilder rootListener(ObjectListener rootListener) {
+      this.rootListener = rootListener;
+      return this;
+    }
+
+    public JsonStructureParserBuilder errorFactory(ErrorFactory errorFactory) {
+      this.errorFactory = errorFactory;
+      return this;
+    }
+
+    public JsonStructureParserBuilder fromStream(InputStream stream) {
+      this.stream = stream;
+      return this;
+    }
+
+    public JsonStructureParserBuilder fromReader(Reader reader) {
+      this.reader = reader;
+      return this;
+    }
+
+    public JsonStructureParserBuilder messageParser(MessageParser messageParser) {
+      this.messageParser = messageParser;
+      return this;
+    }
+
+    public JsonStructureParserBuilder dataPath(String dataPath) {
+      this.dataPath = dataPath;
+      return this;
+    }
+
+    public JsonStructureParser build() {
+      if (dataPath != null) {
+        dataPath = dataPath.trim();
+        dataPath = dataPath.isEmpty() ? null : dataPath;
+      }
+      if (dataPath != null && messageParser == null) {
+        messageParser = new SimpleMessageParser(dataPath);
+      }
+      return new JsonStructureParser(this);
+    }
+  }
+
   private final JsonParser parser;
   private final JsonStructureOptions options;
   private final ObjectListener rootListener;
@@ -78,31 +137,50 @@
    * @param errorFactory factory for errors thrown for various
    * conditions
    */
-  public JsonStructureParser(InputStream stream, JsonStructureOptions options,
-      ObjectListener rootListener, ErrorFactory errorFactory) {
-    this.options = Preconditions.checkNotNull(options);
-    this.rootListener = Preconditions.checkNotNull(rootListener);
-    this.errorFactory = Preconditions.checkNotNull(errorFactory);
+  private JsonStructureParser(JsonStructureParserBuilder builder) {
+    this.options = Preconditions.checkNotNull(builder.options);
+    this.rootListener = Preconditions.checkNotNull(builder.rootListener);
+    this.errorFactory = Preconditions.checkNotNull(builder.errorFactory);
     try {
       ObjectMapper mapper = new ObjectMapper()
           .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
           .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
-          .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, options.allowNanInf);
+          .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, options.allowNanInf)
+          .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, options.enableEscapeAnyChar);
 
-      parser = mapper.getFactory().createParser(stream);
+      if (builder.stream != null) {
+        parser = mapper.getFactory().createParser(builder.stream);
+      } else {
+        parser = mapper.getFactory().createParser(Preconditions.checkNotNull(builder.reader));
+      }
     } catch (JsonParseException e) {
       throw errorFactory().parseError("Failed to create the JSON parser", e);
     } catch (IOException e) {
       throw errorFactory().ioException(e);
     }
     tokenizer = new TokenIterator(parser, options, errorFactory());
-    rootState = makeRootState();
+    if (builder.messageParser == null) {
+      rootState = makeRootState();
+    } else {
+      rootState = makeCustomRoot(builder.messageParser);
+    }
   }
 
   public JsonStructureOptions options() { return options; }
   public ErrorFactory errorFactory() { return errorFactory; }
   public ObjectListener rootListener() { return rootListener; }
 
+  private RootParser makeCustomRoot(MessageParser messageParser) {
+    try {
+      if (! messageParser.parsePrefix(tokenizer)) {
+        return null;
+      }
+    } catch (MessageContextException e) {
+      throw errorFactory.messageParseError(e);
+    }
+    return new NestedRootArrayParser(this, messageParser);
+  }
+
   private RootParser makeRootState() {
     JsonToken token = tokenizer.next();
     if (token == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java
new file mode 100644
index 0000000..5b02f97
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/MessageParser.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.parser;
+
+import com.fasterxml.jackson.core.JsonToken;
+
+/**
+ * Optional custom parser for the portion of a JSON message that
+ * surrounds the data "payload". Can be used to extract status codes.
+ * See {@link SimpleMessageParser} to simply skip all fields but
+ * a given path.
+ */
+public interface MessageParser {
+
+  @SuppressWarnings("serial")
+  class MessageContextException extends Exception {
+    public final JsonToken token;
+    public final String nextElement;
+
+    public MessageContextException(JsonToken token, String nextElement, String descrip) {
+      super(descrip);
+      this.token = token;
+      this.nextElement = nextElement;
+    }
+  }
+  boolean parsePrefix(TokenIterator tokenizer) throws MessageContextException;
+  void parseSuffix(TokenIterator tokenizer) throws MessageContextException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
index f81fd3a..32460a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/RootParser.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.easy.json.parser;
 
+import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,4 +119,37 @@
       }
     }
   }
+
+  public static class NestedRootArrayParser extends RootParser {
+
+    private final MessageParser messageParser;
+
+    public NestedRootArrayParser(JsonStructureParser structParser, MessageParser messageParser) {
+      super(structParser);
+      this.messageParser = messageParser;
+    }
+
+    @Override
+    public boolean parseRoot(TokenIterator tokenizer) {
+      JsonToken token = tokenizer.next();
+      if (token == null) {
+        // Position: { ... EOF ^
+        // Saw EOF, but no closing ]. Warn and ignore.
+        // Note that the Jackson parser won't let us get here;
+        // it will have already thrown a syntax error.
+        logger.warn("Failed to close outer array. {}",
+            tokenizer.context());
+        return false;
+      } else if (token == JsonToken.END_ARRAY) {
+        try {
+          messageParser.parseSuffix(tokenizer);
+        } catch (MessageContextException e) {
+          throw errorFactory().messageParseError(e);
+        }
+        return false;
+      } else {
+        return parseRootObject(token, tokenizer);
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
new file mode 100644
index 0000000..956b910
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.parser;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import com.fasterxml.jackson.core.JsonToken;
+
+/**
+ * A message parser which accepts a path to the data encoded as a
+ * slash-separated string. Given the following JSON message:
+ *
+ * <pre><code:
+ * { status: {
+ *     succeeded: true,
+ *     runTimeMs: 123,
+ *   }
+ *   response: {
+ *     rowCount: 10,
+ *     rows: [
+ *       { ... },
+ *       { ... } ]
+ *     },
+ *   footer: "something interesting"
+ *  }
+ * </code></pre>
+ *
+ * The path to the actual data would be {@code "response/rows"}.
+ * <p>
+ * The message parser will "free-wheel" over all objects not on the
+ * data path. Thus, this class will skip over the nested structure
+ * within the {@code status} member.
+ * <p>
+ * If the data path is not found then this class reports EOF of
+ * the whole data stream. It may have skipped over the actual payload
+ * if the path is mis-configured.
+ */
+public class SimpleMessageParser implements MessageParser {
+
+  private final String[] path;
+
+  public SimpleMessageParser(String dataPath) {
+    path = dataPath.split("/");
+    Preconditions.checkArgument(path.length > 0,
+        "Data path should not be empty.");
+  }
+
+  @Override
+  public boolean parsePrefix(TokenIterator tokenizer) throws MessageContextException {
+    JsonToken token = tokenizer.next();
+    if (token == null) {
+      return false;
+    }
+    if (token != JsonToken.START_OBJECT) {
+      throw new MessageContextException(token,
+          path[0], "Unexpected top-level array");
+    }
+    return parseToElement(tokenizer, 0);
+  }
+
+  private boolean parseToElement(TokenIterator tokenizer, int level) throws MessageContextException {
+    for (;;) {
+      JsonToken token = tokenizer.requireNext();
+      switch (token) {
+        case FIELD_NAME:
+          break;
+        case END_OBJECT:
+          return false;
+        default:
+          throw new MessageContextException(token,
+              path[0], "Unexpected token");
+      }
+
+      String fieldName = tokenizer.textValue();
+      if (fieldName.equals(path[level])) {
+        return parseInnerLevel(tokenizer, level);
+      } else {
+        skipElement(tokenizer);
+      }
+    }
+  }
+
+  private boolean parseInnerLevel(TokenIterator tokenizer, int level) throws MessageContextException {
+    JsonToken token = tokenizer.requireNext();
+    if (level == path.length - 1) {
+      switch (token) {
+      case VALUE_NULL:
+        return false;
+      case START_ARRAY:
+        return true;
+      default:
+        throw new MessageContextException(token,
+            path[level], "Expected JSON array for final path element");
+      }
+    }
+    if (token != JsonToken.START_OBJECT) {
+      throw new MessageParser.MessageContextException(token,
+          path[level], "Expected JSON object");
+    }
+    return parseToElement(tokenizer, level + 1);
+  }
+
+  private void skipElement(TokenIterator tokenizer) {
+    int level = 0;
+    do {
+      JsonToken token = tokenizer.requireNext();
+      switch (token) {
+        case START_OBJECT:
+        case START_ARRAY:
+          level++;
+          break;
+        case END_OBJECT:
+        case END_ARRAY:
+          level--;
+          break;
+        default:
+          break;
+      }
+    } while (level > 0);
+  }
+
+  @Override
+  public void parseSuffix(TokenIterator tokenizer) {
+    // No need to parse the unwanted tail elements.
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java
index d45b197..17cdc06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/loader/BaseJsonLoaderTest.java
@@ -28,6 +28,7 @@
 import org.apache.drill.exec.physical.resultSet.impl.ResultSetOptionBuilder;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
 import org.apache.drill.test.SubOperatorTest;
 
 public class BaseJsonLoaderTest extends SubOperatorTest {
@@ -43,7 +44,13 @@
 
     public void open(InputStream is) {
       rsLoader = new ResultSetLoaderImpl(fixture.allocator(), rsLoaderOptions.build());
-      loader = new JsonLoaderImpl(rsLoader, providedSchema, jsonOptions, errorContext, is);
+      loader = new JsonLoaderBuilder()
+          .resultSetLoader(rsLoader)
+          .providedSchema(providedSchema)
+          .options(jsonOptions)
+          .errorContext(errorContext)
+          .fromStream(is)
+          .build();
     }
 
     public void open(String json) {
@@ -52,10 +59,10 @@
     }
 
     public RowSet next() {
-      if (!loader.next()) {
+      rsLoader.startBatch();
+      if (!loader.readBatch()) {
         return null;
       }
-      loader.endBatch();
       return fixture.wrap(rsLoader.harvest());
     }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
index c77807d..8e75109 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/BaseTestJsonParser.java
@@ -32,6 +32,7 @@
 import java.util.Set;
 
 import org.apache.commons.io.input.ReaderInputStream;
+import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser.JsonStructureParserBuilder;
 import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 
 import com.fasterxml.jackson.core.JsonParseException;
@@ -63,37 +64,42 @@
 
     @Override
     public RuntimeException parseError(String msg, JsonParseException e) {
-      throw new JsonErrorFixture("parseError", msg, e);
+      return new JsonErrorFixture("parseError", msg, e);
     }
 
     @Override
     public RuntimeException ioException(IOException e) {
-      throw new JsonErrorFixture("ioException", "", e);
+      return new JsonErrorFixture("ioException", "", e);
     }
 
     @Override
     public RuntimeException structureError(String msg) {
-      throw new JsonErrorFixture("structureError", msg);
+      return new JsonErrorFixture("structureError", msg);
     }
 
     @Override
     public RuntimeException syntaxError(JsonParseException e) {
-      throw new JsonErrorFixture("syntaxError", "", e);
+      return new JsonErrorFixture("syntaxError", "", e);
     }
 
     @Override
     public RuntimeException typeError(UnsupportedConversionError e) {
-      throw new JsonErrorFixture("typeError", "", e);
+      return new JsonErrorFixture("typeError", "", e);
     }
 
     @Override
     public RuntimeException syntaxError(JsonToken token) {
-      throw new JsonErrorFixture("syntaxError", token.toString());
+      return new JsonErrorFixture("syntaxError", token.toString());
     }
 
     @Override
     public RuntimeException unrecoverableError() {
-      throw new JsonErrorFixture("unrecoverableError", "");
+      return new JsonErrorFixture("unrecoverableError", "");
+    }
+
+    @Override
+    public RuntimeException messageParseError(MessageParser.MessageContextException e) {
+      return new JsonErrorFixture("messageParseError", "Message parse error", e);
     }
   }
 
@@ -252,16 +258,25 @@
   }
 
   protected static class JsonParserFixture {
+    JsonStructureParserBuilder builder;
     JsonStructureOptions options = new JsonStructureOptions();
     JsonStructureParser parser;
     ObjectListenerFixture rootObject = new ObjectListenerFixture();
     ErrorFactory errorFactory = new ErrorFactoryFixture();
 
+    public JsonParserFixture() {
+      builder = new JsonStructureParserBuilder();
+    }
+
     public void open(String json) {
       InputStream inStream = new
           ReaderInputStream(new StringReader(json));
-      parser = new JsonStructureParser(inStream, options, rootObject,
-          errorFactory);
+      builder
+          .fromStream(inStream)
+          .options(options)
+          .rootListener(rootObject)
+          .errorFactory(errorFactory);
+      parser = builder.build();
     }
 
     public boolean next() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java
new file mode 100644
index 0000000..4905d1f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/json/parser/TestJsonParserMessage.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.parser;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.JsonToken;
+
+public class TestJsonParserMessage extends BaseTestJsonParser {
+
+  /**
+   * Example message parser. A real parser would provide much better
+   * error messages for badly-formed JSON or error codes.
+   */
+  private static class MessageParserFixture implements MessageParser {
+
+    @Override
+    public boolean parsePrefix(TokenIterator tokenizer) {
+      assertEquals(JsonToken.START_OBJECT, tokenizer.requireNext());
+      assertEquals(JsonToken.FIELD_NAME, tokenizer.requireNext());
+      assertEquals(JsonToken.VALUE_STRING, tokenizer.requireNext());
+      if (!"ok".equals(tokenizer.stringValue())) {
+        return false;
+      }
+      assertEquals(JsonToken.FIELD_NAME, tokenizer.requireNext());
+      assertEquals(JsonToken.START_ARRAY, tokenizer.requireNext());
+      return true;
+    }
+
+    @Override
+    public void parseSuffix(TokenIterator tokenizer) {
+      assertEquals(JsonToken.END_OBJECT, tokenizer.requireNext());
+    }
+  }
+
+  /**
+   * Test the ability to wrap the data objects with a custom message
+   * structure, typical of a REST call.
+   */
+  @Test
+  public void testMessageParser() {
+    final String json =
+        "{ status: \"ok\", data: [{a: 0}, {a: 100}, {a: null}]}";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.messageParser(new MessageParserFixture());
+    fixture.open(json);
+    assertTrue(fixture.next());
+    ValueListenerFixture a = fixture.field("a");
+    assertEquals(JsonType.INTEGER, a.valueDef.type());
+    assertEquals(2, fixture.read());
+    assertEquals(1, a.nullCount);
+    assertEquals(100L, a.value);
+    fixture.close();
+  }
+
+  /**
+   * Test the ability to cancel the data load if a message header
+   * indicates that there is no data.
+   */
+  @Test
+  public void testMessageParserEOF() {
+    final String json =
+        "{ status: \"fail\", data: [{a: 0}, {a: 100}, {a: null}]}";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.messageParser(new MessageParserFixture());
+    fixture.open(json);
+    assertFalse(fixture.next());
+    fixture.close();
+  }
+
+  @Test
+  public void testDataPath() {
+    final String json =
+        "{ status: \"ok\", data: [{a: 0}, {a: 100}, {a: null}]}";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.dataPath("data");
+    fixture.open(json);
+    assertTrue(fixture.next());
+    ValueListenerFixture a = fixture.field("a");
+    assertEquals(JsonType.INTEGER, a.valueDef.type());
+    assertEquals(2, fixture.read());
+    assertEquals(1, a.nullCount);
+    assertEquals(100L, a.value);
+    fixture.close();
+  }
+
+  @Test
+  public void testComplexDataPath() {
+    final String json =
+        "{ status: {result : \"ok\", runtime: 123},\n" +
+        "  response: { rowCount: 1,\n" +
+        "    data: [{a: 0}, {a: 100}, {a: null}]},\n" +
+        "  footer: \"some stuff\"}";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.dataPath("response/data");
+    fixture.open(json);
+    assertTrue(fixture.next());
+    ValueListenerFixture a = fixture.field("a");
+    assertEquals(JsonType.INTEGER, a.valueDef.type());
+    assertEquals(2, fixture.read());
+    assertEquals(1, a.nullCount);
+    assertEquals(100L, a.value);
+    fixture.close();
+  }
+
+  @Test
+  public void testDataPathNull() {
+    final String json =
+        "{ status: \"fail\", data: null}";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.messageParser(new MessageParserFixture());
+    fixture.open(json);
+    assertFalse(fixture.next());
+    fixture.close();
+  }
+
+  @Test
+  public void testDataPathMissing() {
+    final String json =
+        "{ status: \"fail\"}";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.messageParser(new MessageParserFixture());
+    fixture.open(json);
+    assertFalse(fixture.next());
+    fixture.close();
+  }
+
+  @Test
+  public void testDataPathErrorRoot() {
+    final String json = "\"Bogus!\"";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.dataPath("data");
+    try {
+      fixture.open(json);
+      fail();
+    } catch (JsonErrorFixture e) {
+      assertTrue(e.errorType.equals("messageParseError"));
+      assertTrue(e.getCause() instanceof MessageParser.MessageContextException);
+    }
+    fixture.close();
+  }
+
+  @Test
+  public void testDataPathErrorLeaf() {
+    final String json =
+        "{ status: \"bogus\", data: { notValid: \"must be array\"}}";
+    JsonParserFixture fixture = new JsonParserFixture();
+    fixture.builder.dataPath("data");
+    try {
+      fixture.open(json);
+      fail();
+    } catch (JsonErrorFixture e) {
+      assertTrue(e.errorType.equals("messageParseError"));
+      assertTrue(e.getCause() instanceof MessageParser.MessageContextException);
+    }
+    fixture.close();
+  }
+}