DRILL-8179: Convert LTSV Format Plugin to EVF2 (#2725)

diff --git a/contrib/format-ltsv/README.md b/contrib/format-ltsv/README.md
index 05916b0..af71cf5 100644
--- a/contrib/format-ltsv/README.md
+++ b/contrib/format-ltsv/README.md
@@ -4,6 +4,18 @@
 
 For more information about LTSV, please see [LTSV (Labeled Tab-separated Values)](http://ltsv.org/).
 
+## Configuration
+There are several optional configuration parameters which you can use to modify how ltsv files are read.  In general, it is not necessary to change these from the defaults.  They are:
+
+* `parseMode`: Sets the error tolerance of the LTSV parser.  Possible values are `lenient` and `strict`.  Defaults to `lenient`.
+* `escapeCharacter`: Character to be used to escape control character.
+* `kvDelimiter`: Character to delimit key/value pairs.
+* `entryDelimiter`: Character to delimit entries.
+* `lineEnding`: Character to denote line endings.
+* `quoteChar`: Character to denote quoted strings.
+
+With the exception of `parseMode`, all fields accept a single character string.
+
 ## Example of Querying an LTSV File
 
 ### About the Data
@@ -36,3 +48,19 @@
 +-----------------------------+------------------+---------------+-----------------------+---------+-------+----------+-----------------+----------+----------+------------------+
 1 row selected (6.074 seconds)
 ```
+
+### Providing a Schema
+The LTSV reader does supports provided schema.  You can read about Drill's [provided schema functionality here](https://drill.apache.org/docs/plugin-configuration-basics/#specifying-the-schema-as-table-function-parameter)
+
+An example query would be:
+
+```sql
+SELECT * FROM table(cp.`simple.ltsv` (type=> 'ltsv', schema => 
+    'inline=(`referer` VARCHAR, 
+    `vhost`VARCHAR, `size` INT, 
+    `forwardedfor` VARCHAR, 
+    `reqtime` DOUBLE, 
+    `apptime` DOUBLE, 
+    `status` INT)'))
+```
+Only scalar types are supported in the LTSV reader.
diff --git a/contrib/format-ltsv/pom.xml b/contrib/format-ltsv/pom.xml
index 2a0d244..4a33ee5 100644
--- a/contrib/format-ltsv/pom.xml
+++ b/contrib/format-ltsv/pom.xml
@@ -36,6 +36,11 @@
       <artifactId>drill-java-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.github.lonely-lockley</groupId>
+      <artifactId>ltsv-parser</artifactId>
+      <version>1.1.0</version>
+    </dependency>
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java
new file mode 100644
index 0000000..f25e672
--- /dev/null
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVBatchReader.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import com.github.lolo.ltsv.LtsvParser;
+import com.github.lolo.ltsv.LtsvParser.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+
+public class LTSVBatchReader implements ManagedReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(LTSVBatchReader.class);
+  private final LTSVFormatPluginConfig config;
+  private final FileDescrip file;
+  private final CustomErrorContext errorContext;
+  private final LtsvParser ltsvParser;
+  private final RowSetLoader rowWriter;
+  private final FileSchemaNegotiator negotiator;
+  private InputStream fsStream;
+  private Iterator<Map<String, String>> rowIterator;
+
+
+  public LTSVBatchReader(LTSVFormatPluginConfig config, FileSchemaNegotiator negotiator) {
+    this.config = config;
+    this.negotiator = negotiator;
+    file = negotiator.file();
+    errorContext = negotiator.parentErrorContext();
+    ltsvParser = buildParser();
+
+    openFile();
+
+    // If there is a provided schema, import it
+    if (negotiator.providedSchema() != null) {
+      TupleMetadata schema = negotiator.providedSchema();
+      negotiator.tableSchema(schema, false);
+    }
+    ResultSetLoader loader = negotiator.build();
+    rowWriter = loader.writer();
+
+  }
+
+  private void openFile() {
+    try {
+      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+    } catch (IOException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Unable to open LTSV File %s", file.split().getPath() + " " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+    }
+    rowIterator = ltsvParser.parse(fsStream);
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processNextRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private LtsvParser buildParser() {
+    Builder builder = LtsvParser.builder();
+    builder.trimKeys();
+    builder.trimValues();
+    builder.skipNullValues();
+
+    if (config.getParseMode().contentEquals("strict")) {
+      builder.strict();
+    } else {
+      builder.lenient();
+    }
+
+    if (StringUtils.isNotEmpty(config.getEscapeCharacter())) {
+      builder.withEscapeChar(config.getEscapeCharacter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getKvDelimiter())) {
+      builder.withKvDelimiter(config.getKvDelimiter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getEntryDelimiter())) {
+      builder.withEntryDelimiter(config.getEntryDelimiter().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getLineEnding())) {
+      builder.withLineEnding(config.getLineEnding().charAt(0));
+    }
+
+    if (StringUtils.isNotEmpty(config.getQuoteChar())) {
+      builder.withQuoteChar(config.getQuoteChar().charAt(0));
+    }
+
+    return builder.build();
+  }
+
+  private boolean processNextRow() {
+    if (!rowIterator.hasNext()) {
+      return false;
+    }
+    // Start the row
+    String key;
+    String value;
+    int columnIndex;
+    ScalarWriter columnWriter;
+    Map<String, String> row = rowIterator.next();
+
+    // Skip empty lines
+    if (row.isEmpty()) {
+      return true;
+    }
+    rowWriter.start();
+    for (Map.Entry<String,String> field: row.entrySet()) {
+      key = field.getKey();
+      value = field.getValue();
+      columnIndex = getColumnIndex(key);
+      columnWriter = getColumnWriter(key);
+
+
+      if (negotiator.providedSchema() != null) {
+        // Check the type. LTSV will only read other data types if a schema is provided.
+        ColumnMetadata columnMetadata = rowWriter.tupleSchema().metadata(columnIndex);
+        MinorType dataType = columnMetadata.type();
+        LocalTime localTime;
+        LocalDate localDate;
+
+        switch (dataType) {
+          case BIT:
+            columnWriter.setBoolean(Boolean.parseBoolean(value));
+            break;
+          case INT:
+          case SMALLINT:
+          case TINYINT:
+            columnWriter.setInt(Integer.parseInt(value));
+            break;
+          case BIGINT:
+            columnWriter.setLong(Long.parseLong(value));
+            break;
+          case FLOAT8:
+          case FLOAT4:
+            columnWriter.setDouble(Double.parseDouble(value));
+            break;
+          case TIME:
+            columnMetadata = rowWriter.tupleSchema().metadata(key);
+            String dateFormat = columnMetadata.property("drill.format");
+
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              localTime = LocalTime.parse(value);
+            } else {
+              localTime = LocalTime.parse(value, DateTimeFormatter.ofPattern(dateFormat));
+            }
+            columnWriter.setTime(localTime);
+            break;
+          case DATE:
+            dateFormat = columnMetadata.property("drill.format");
+
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              localDate = LocalDate.parse(value);
+            } else {
+              localDate = LocalDate.parse(value, DateTimeFormatter.ofPattern(dateFormat));
+            }
+            columnWriter.setDate(localDate);
+            break;
+          case TIMESTAMP:
+            dateFormat = columnMetadata.property("drill.format");
+            Instant timestamp;
+            if (Strings.isNullOrEmpty(dateFormat)) {
+              timestamp = Instant.parse(value);
+            } else {
+              try {
+                SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat);
+                Date parsedDate = simpleDateFormat.parse(value);
+                timestamp = Instant.ofEpochMilli(parsedDate.getTime());
+              } catch (ParseException e) {
+                throw UserException.parseError(e)
+                    .message("Cannot parse " + value + " as a timestamp. You can specify a format string in the provided schema to correct this.")
+                    .addContext(errorContext)
+                    .build(logger);
+              }
+            }
+            columnWriter.setTimestamp(timestamp);
+            break;
+          default:
+            columnWriter.setString(value);
+        }
+      } else {
+        columnWriter.setString(value);
+      }
+    }
+    // Finish the row
+    rowWriter.save();
+    return true;
+  }
+
+  @Override
+  public void close() {
+    logger.debug("Closing input stream for LTSV reader.");
+    AutoCloseables.closeSilently(fsStream);
+  }
+
+  private int getColumnIndex(String fieldName) {
+    // Find the TupleWriter object
+    int index = rowWriter.tupleSchema().index(fieldName);
+
+    // Unknown columns are always strings.
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+      index = rowWriter.addColumn(colSchema);
+    }
+    return index;
+  }
+
+  private ScalarWriter getColumnWriter(String fieldName){
+    // Find the TupleWriter object
+    int index = getColumnIndex(fieldName);
+    return rowWriter.scalar(index);
+  }
+}
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
index c28b101..8b077db 100644
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
@@ -17,69 +17,63 @@
  */
 package org.apache.drill.exec.store.ltsv;
 
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 
-import java.util.List;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.hadoop.conf.Configuration;
+
 
 public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> {
 
-  private static final boolean IS_COMPRESSIBLE = true;
-
   private static final String DEFAULT_NAME = "ltsv";
 
-  public LTSVFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new LTSVFormatPluginConfig(null));
+  private static class LTSVReaderFactory extends FileReaderFactory {
+
+    private final LTSVFormatPluginConfig config;
+
+    public LTSVReaderFactory(LTSVFormatPluginConfig config) {
+      super();
+      this.config = config;
+    }
+
+    @Override
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+      return new LTSVBatchReader(config, negotiator);
+    }
   }
 
-  public LTSVFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, LTSVFormatPluginConfig formatPluginConfig) {
-    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
+  public LTSVFormatPlugin(String name, DrillbitContext context,
+                         Configuration fsConf, StoragePluginConfig storageConfig,
+                         LTSVFormatPluginConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
+  }
+
+  private static EasyFormatConfig easyConfig(Configuration fsConf, LTSVFormatPluginConfig pluginConfig) {
+    return EasyFormatConfig.builder()
+        .readable(true)
+        .writable(false)
+        .blockSplittable(true)
+        .compressible(true)
+        .supportsProjectPushdown(true)
+        .extensions(pluginConfig.getExtensions())
+        .fsConf(fsConf)
+        .defaultName(DEFAULT_NAME)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
+        .supportsLimitPushdown(true)
+        .build();
   }
 
   @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) {
-    return new LTSVRecordReader(context, fileWork.getPath(), dfs, columns);
-  }
-
-  @Override
-  public String getWriterOperatorType() {
-    throw new UnsupportedOperationException("Drill doesn't currently support writing to LTSV files.");
-  }
-
-  @Override
-  public boolean supportsPushDown() {
-    return true;
-  }
-
-  @Override
-  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) {
-    throw new UnsupportedOperationException("Drill doesn't currently support writing to LTSV files.");
-  }
-
-  @Override
-  public boolean supportsStatistics() {
-    return false;
-  }
-
-  @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
-    throw new UnsupportedOperationException("unimplemented");
-  }
-
-  @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
-    throw new UnsupportedOperationException("unimplemented");
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    builder.readerFactory(new LTSVReaderFactory(getConfig()));
   }
 }
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPluginConfig.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPluginConfig.java
index 11b0554..79c7f4b 100644
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPluginConfig.java
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPluginConfig.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
@@ -34,12 +35,58 @@
   private static final List<String> DEFAULT_EXTS = ImmutableList.of("ltsv");
 
   private final List<String> extensions;
+  private final String parseMode;
+  private final String escapeCharacter;
+  private final String kvDelimiter;
+  private final String entryDelimiter;
+  private final String lineEnding;
+  private final String quoteChar;
 
   @JsonCreator
-  public LTSVFormatPluginConfig(
-      @JsonProperty("extensions") List<String> extensions) {
-    this.extensions = extensions == null ?
-        DEFAULT_EXTS : ImmutableList.copyOf(extensions);
+  public LTSVFormatPluginConfig(@JsonProperty("extensions") List<String> extensions,
+                                @JsonProperty("parseMode") String parseMode,
+                                @JsonProperty("escapeCharacter") String escapeCharacter,
+                                @JsonProperty("kvDelimiter") String kvDelimiter,
+                                @JsonProperty("entryDelimiter") String entryDelimiter,
+                                @JsonProperty("lineEnding") String lineEnding,
+                                @JsonProperty("quoteChar") String quoteChar) {
+    this.extensions = extensions == null ? DEFAULT_EXTS : ImmutableList.copyOf(extensions);
+    this.escapeCharacter = escapeCharacter;
+    this.kvDelimiter = kvDelimiter;
+    this.parseMode = StringUtils.isEmpty(parseMode) ? "lenient" : parseMode;
+    this.entryDelimiter = entryDelimiter;
+    this.lineEnding = lineEnding;
+    this.quoteChar = quoteChar;
+  }
+
+  @JsonProperty("parseMode")
+  public String getParseMode() {
+    return parseMode;
+  }
+
+  @JsonProperty("escapeCharacter")
+  public String getEscapeCharacter() {
+    return escapeCharacter;
+  }
+
+  @JsonProperty("kvDelimiter")
+  public String getKvDelimiter() {
+    return kvDelimiter;
+  }
+
+  @JsonProperty("entryDelimiter")
+  public String getEntryDelimiter() {
+    return entryDelimiter;
+  }
+
+  @JsonProperty("lineEnding")
+  public String getLineEnding() {
+    return lineEnding;
+  }
+
+  @JsonProperty("quoteChar")
+  public String getQuoteChar() {
+    return quoteChar;
   }
 
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
@@ -49,7 +96,7 @@
 
   @Override
   public int hashCode() {
-    return Objects.hash(extensions);
+    return Objects.hash(extensions,parseMode, escapeCharacter, kvDelimiter, entryDelimiter, lineEnding, quoteChar);
   }
 
   @Override
@@ -60,13 +107,25 @@
       return false;
     }
     LTSVFormatPluginConfig that = (LTSVFormatPluginConfig) obj;
-    return Objects.equals(extensions, that.extensions);
+    return Objects.equals(extensions, that.extensions) &&
+        Objects.equals(parseMode, that.parseMode) &&
+        Objects.equals(escapeCharacter, that.escapeCharacter) &&
+        Objects.equals(entryDelimiter, that.entryDelimiter) &&
+        Objects.equals(lineEnding, that.lineEnding) &&
+        Objects.equals(quoteChar, that.quoteChar) &&
+        Objects.equals(kvDelimiter, that.kvDelimiter);
   }
 
   @Override
   public String toString() {
     return new PlanStringBuilder(this)
         .field("extensions", extensions)
+        .field("parseMode", parseMode)
+        .field("escapeCharacter", escapeCharacter)
+        .field("kvDelimiter", kvDelimiter)
+        .field("lineEnding", lineEnding)
+        .field("quoteChar", quoteChar)
+        .field("entryDelimiter", entryDelimiter)
         .toString();
   }
 }
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
deleted file mode 100644
index 619ceb1..0000000
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.ltsv;
-
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-public class LTSVRecordReader extends AbstractRecordReader {
-
-  private static final Logger logger = LoggerFactory.getLogger(LTSVRecordReader.class);
-
-  private static final int MAX_RECORDS_PER_BATCH = 8096;
-
-  private final String inputPath;
-
-  private final InputStream fsStream;
-
-  private final BufferedReader reader;
-
-  private DrillBuf buffer;
-
-  private VectorContainerWriter writer;
-
-  public LTSVRecordReader(FragmentContext fragmentContext, Path path, DrillFileSystem fileSystem,
-                          List<SchemaPath> columns) throws OutOfMemoryException {
-    this.inputPath = path.toUri().getPath();
-    try {
-      this.fsStream = fileSystem.openPossiblyCompressedStream(path);
-      this.reader = new BufferedReader(new InputStreamReader(fsStream, StandardCharsets.UTF_8));
-      this.buffer = fragmentContext.getManagedBuffer();
-      setColumns(columns);
-    } catch (IOException e) {
-      throw UserException.dataReadError(e)
-        .message(String.format("Failed to open input file: %s", inputPath))
-        .build(logger);
-    }
-  }
-
-  @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (!isStarQuery()) {
-      transformed.addAll(projected);
-    } else {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    }
-    return transformed;
-  }
-
-  public void setup(final OperatorContext context, final OutputMutator output) {
-    this.writer = new VectorContainerWriter(output);
-  }
-
-  public int next() {
-    this.writer.allocate();
-    this.writer.reset();
-
-    int recordCount = 0;
-
-    try {
-      BaseWriter.MapWriter map = this.writer.rootAsMap();
-      String line;
-
-      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
-        // Skip empty lines
-        if (line.trim().length() == 0) {
-          continue;
-        }
-
-        List<String[]> fields = new ArrayList<>();
-        for (String field : line.split("\t")) {
-          int index = field.indexOf(":");
-          if (index <= 0) {
-            throw new ParseException(String.format("Invalid LTSV format: %s\n%d:%s", inputPath, recordCount + 1, line), 0);
-          }
-
-          String fieldName = field.substring(0, index);
-          String fieldValue = field.substring(index + 1);
-          if (selectedColumn(fieldName)) {
-            fields.add(new String[]{fieldName, fieldValue});
-          }
-        }
-
-        if (fields.size() == 0) {
-          continue;
-        }
-
-        this.writer.setPosition(recordCount);
-        map.start();
-
-        for (String[] field : fields) {
-          byte[] bytes = field[1].getBytes(StandardCharsets.UTF_8);
-          this.buffer = this.buffer.reallocIfNeeded(bytes.length);
-          this.buffer.setBytes(0, bytes, 0, bytes.length);
-          map.varChar(field[0]).writeVarChar(0, bytes.length, buffer);
-        }
-
-        map.end();
-        recordCount++;
-      }
-
-      this.writer.setValueCount(recordCount);
-      return recordCount;
-
-    } catch (final Exception e) {
-      String msg = String.format("Failure while reading messages from %s. Record reader was at record: %d", inputPath, recordCount + 1);
-      throw UserException.dataReadError(e)
-        .message(msg)
-        .build(logger);
-    }
-  }
-
-  private boolean selectedColumn(String fieldName) {
-    for (SchemaPath col : getColumns()) {
-      if (col.equals(SchemaPath.STAR_COLUMN) || col.getRootSegment().getPath().equals(fieldName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void close() throws Exception {
-    AutoCloseables.close(reader, fsStream);
-  }
-}
diff --git a/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVQueries.java b/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVQueries.java
new file mode 100644
index 0000000..8565703
--- /dev/null
+++ b/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVQueries.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.ltsv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestLTSVQueries extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
+
+  @Test
+  public void testWildcard() throws Exception {
+    String sql = "SELECT * FROM cp.`simple.ltsv`";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("referer", MinorType.VARCHAR)
+        .addNullable("vhost", MinorType.VARCHAR)
+        .addNullable("size", MinorType.VARCHAR)
+        .addNullable("forwardedfor", MinorType.VARCHAR)
+        .addNullable("reqtime", MinorType.VARCHAR)
+        .addNullable("apptime", MinorType.VARCHAR)
+        .addNullable("host", MinorType.VARCHAR)
+        .addNullable("ua", MinorType.VARCHAR)
+        .addNullable("req", MinorType.VARCHAR)
+        .addNullable("status", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("-", "api.example.com", "4968", "-", "2.532", "2.532", "xxx.xxx.xxx.xxx", "Java/1.8.0_131", "GET /v1/xxx HTTP/1.1", "200")
+        .addRow("-", "api.example.com", "412", "-", "3.580", "3.580", "xxx.xxx.xxx.xxx", "Java/1.8.0_201", "GET /v1/yyy HTTP/1.1", "200")
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSelectColumns() throws Exception {
+    String sql = "SELECT ua, reqtime FROM cp.`simple.ltsv`";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("ua", MinorType.VARCHAR)
+        .addNullable("reqtime", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("Java/1.8.0_131", "2.532")
+        .addRow("Java/1.8.0_201", "3.580")
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testQueryWithConditions() throws Exception {
+    String sql = "SELECT * FROM cp.`simple.ltsv` WHERE reqtime > 3.0";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("referer", MinorType.VARCHAR)
+        .addNullable("vhost", MinorType.VARCHAR)
+        .addNullable("size", MinorType.VARCHAR)
+        .addNullable("forwardedfor", MinorType.VARCHAR)
+        .addNullable("reqtime", MinorType.VARCHAR)
+        .addNullable("apptime", MinorType.VARCHAR)
+        .addNullable("host", MinorType.VARCHAR)
+        .addNullable("ua", MinorType.VARCHAR)
+        .addNullable("req", MinorType.VARCHAR)
+        .addNullable("status", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("-", "api.example.com", "412", "-", "3.580", "3.580", "xxx.xxx.xxx.xxx", "Java/1.8.0_201", "GET /v1/yyy HTTP/1.1", "200")
+        .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "SELECT COUNT(*) as cnt FROM cp.`simple.ltsv` ";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals("Counts should match", 2L, cnt);
+  }
+
+  @Test
+  public void testSkipEmptyLines() throws Exception {
+    assertEquals(2, queryBuilder().sql("SELECT * FROM cp.`emptylines.ltsv`").run().recordCount());
+  }
+
+  @Test
+  public void testReadException() throws Exception {
+    try {
+      run("SELECT * FROM table(cp.`invalid.ltsv` (type => 'ltsv', parseMode => 'strict'))");
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("DATA_READ ERROR: Empty key detected at line [0] position [49]"));
+    }
+  }
+
+  @Test
+  public void testProvidedSchema() throws Exception {
+    String sql = "SELECT * FROM table(cp.`simple.ltsv` (type=> 'ltsv', schema => 'inline=(`referer` VARCHAR, `vhost` VARCHAR, `size` INT, `forwardedfor` VARCHAR, " +
+        "`reqtime` DOUBLE, `apptime` DOUBLE, `status` INT)'))";
+    RowSet results  = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .addNullable("referer", MinorType.VARCHAR)
+        .addNullable("vhost", MinorType.VARCHAR)
+        .addNullable("size", MinorType.INT)
+        .addNullable("forwardedfor", MinorType.VARCHAR)
+        .addNullable("reqtime", MinorType.FLOAT8)
+        .addNullable("apptime", MinorType.FLOAT8)
+        .addNullable("status", MinorType.INT)
+        .addNullable("host", MinorType.VARCHAR)
+        .addNullable("ua", MinorType.VARCHAR)
+        .addNullable("req", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addRow("-", "api.example.com", 4968, "-", 2.532, 2.532, 200, "xxx.xxx.xxx.xxx", "Java/1.8.0_131", "GET /v1/xxx HTTP/1.1")
+        .addRow("-", "api.example.com", 412, "-", 3.58, 3.58, 200, "xxx.xxx.xxx.xxx", "Java/1.8.0_201", "GET /v1/yyy HTTP/1.1")
+        .build();
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+}
diff --git a/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java b/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java
deleted file mode 100644
index 419bb6f..0000000
--- a/contrib/format-ltsv/src/test/java/org/apache/drill/exec/store/ltsv/TestLTSVRecordReader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.ltsv;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.test.ClusterFixture;
-import org.apache.drill.test.ClusterTest;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestLTSVRecordReader extends ClusterTest {
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    startCluster(ClusterFixture.builder(dirTestWatcher));
-  }
-
-  @Test
-  public void testWildcard() throws Exception {
-    testBuilder()
-      .sqlQuery("SELECT * FROM cp.`simple.ltsv`")
-      .unOrdered()
-      .baselineColumns("host", "forwardedfor", "req", "status", "size", "referer", "ua", "reqtime", "apptime", "vhost")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/xxx HTTP/1.1", "200", "4968", "-", "Java/1.8.0_131", "2.532", "2.532", "api.example.com")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/yyy HTTP/1.1", "200", "412", "-", "Java/1.8.0_201", "3.580", "3.580", "api.example.com")
-      .go();
-  }
-
-  @Test
-  public void testSelectColumns() throws Exception {
-    testBuilder()
-      .sqlQuery("SELECT ua, reqtime FROM cp.`simple.ltsv`")
-      .unOrdered()
-      .baselineColumns("ua", "reqtime")
-      .baselineValues("Java/1.8.0_131", "2.532")
-      .baselineValues("Java/1.8.0_201", "3.580")
-      .go();
-  }
-
-  @Test
-  public void testQueryWithConditions() throws Exception {
-    testBuilder()
-      .sqlQuery("SELECT * FROM cp.`simple.ltsv` WHERE reqtime > 3.0")
-      .unOrdered()
-      .baselineColumns("host", "forwardedfor", "req", "status", "size", "referer", "ua", "reqtime", "apptime", "vhost")
-      .baselineValues("xxx.xxx.xxx.xxx", "-", "GET /v1/yyy HTTP/1.1", "200", "412", "-", "Java/1.8.0_201", "3.580", "3.580", "api.example.com")
-      .go();
-  }
-
-  @Test
-  public void testSkipEmptyLines() throws Exception {
-    assertEquals(2, queryBuilder().sql("SELECT * FROM cp.`emptylines.ltsv`").run().recordCount());
-  }
-
-  @Test
-  public void testReadException() throws Exception {
-    try {
-      run("SELECT * FROM cp.`invalid.ltsv`");
-      fail();
-    } catch (UserException e) {
-      assertEquals(UserBitShared.DrillPBError.ErrorType.DATA_READ, e.getErrorType());
-      assertTrue(e.getMessage().contains("Failure while reading messages from /invalid.ltsv. Record reader was at record: 1"));
-    }
-  }
-}