DRILL-6096: Provide mechanism to configure text writer

1. The format plugin configuration allows specifying the line and field delimiters, and the quote and escape characters.
2. System / session options allow specifying whether the writer should add headers and force quotes (a usage sketch follows).
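
For illustration, a minimal sketch of driving both layers over Drill's JDBC driver (the connection URL and target table are assumptions):

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.Statement;

  public class TextWriterOptionsDemo {
    public static void main(String[] args) throws Exception {
      // assumes a drillbit running locally with the drill-jdbc driver on the classpath
      try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
           Statement st = conn.createStatement()) {
        st.execute("ALTER SESSION SET `store.format` = 'csv'");
        st.execute("ALTER SESSION SET `store.text.writer.add_header` = false");
        st.execute("ALTER SESSION SET `store.text.writer.force_quotes` = true");
        // writes dfs.tmp.`quoted_csv` without a header row and with every field quoted
        st.execute("CREATE TABLE dfs.tmp.`quoted_csv` AS SELECT 'A,B' AS col1 FROM (VALUES(1))");
      }
    }
  }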

closes #1873
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index a9ac685..9f1ace3 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -89,7 +89,7 @@
     <dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
-      <version>1.3.0</version>
+      <version>2.8.3</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index 1bf1b09..017fda4 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -43,7 +43,7 @@
 
 /**
  * Abstract implementation of RecordWriter interface which exposes interface:
- *    {@link #writeHeader(List)}
+ *    {@link #startNewSchema(BatchSchema)}
  *    {@link #addField(int,String)}
  * to output the data in string format instead of implementing addField for each type holder.
  *
@@ -60,13 +60,7 @@
 
   @Override
   public void updateSchema(VectorAccessible batch) throws IOException {
-    BatchSchema schema = batch.getSchema();
-    List<String> columnNames = Lists.newArrayList();
-    for (int i=0; i < schema.getFieldCount(); i++) {
-      columnNames.add(schema.getColumn(i).getName());
-    }
-
-    startNewSchema(columnNames);
+    startNewSchema(batch.getSchema());
   }
 
   @Override
@@ -160,6 +154,6 @@
   public void cleanup() throws IOException {
   }
 
-  public abstract void startNewSchema(List<String> columnNames) throws IOException;
+  public abstract void startNewSchema(BatchSchema schema) throws IOException;
   public abstract void addField(int fieldId, String value) throws IOException;
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 066d04d..20668b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -431,6 +431,13 @@
   public static final DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator("store.text.estimated_row_size_bytes", 1, Long.MAX_VALUE,
       new OptionDescription("Estimate of the row size in a delimited text file, such as csv. The closer to actual, the better the query plan. Used for all csv files in the system/session where the value is set. Impacts the decision to plan a broadcast join or not."));
 
+  public static final String TEXT_WRITER_ADD_HEADER = "store.text.writer.add_header";
+  public static final BooleanValidator TEXT_WRITER_ADD_HEADER_VALIDATOR = new BooleanValidator(TEXT_WRITER_ADD_HEADER,
+    new OptionDescription("Enables the TEXT writer to write header in newly created file. Default is true. (Drill 1.17+)"));
+
+  public static final String TEXT_WRITER_FORCE_QUOTES = "store.text.writer.force_quotes";
+  public static final BooleanValidator TEXT_WRITER_FORCE_QUOTES_VALIDATOR = new BooleanValidator(TEXT_WRITER_FORCE_QUOTES,
+    new OptionDescription("Enables the TEXT writer to enclose in quotes all fields. Default is false. (Drill 1.17+)"));
 
   /**
    * Json writer option for writing `NaN` and `Infinity` tokens as numbers (not enclosed with double quotes)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index ceb0848..ffd69e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -24,8 +24,10 @@
 import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
 import org.apache.drill.exec.physical.resultSet.project.RequestedColumnImpl;
 import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.store.easy.text.reader.TextReader;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Parses the `columns` array. Doing so is surprisingly complex.
@@ -68,7 +70,8 @@
  */
 
 public class ColumnsArrayParser implements ScanProjectionParser {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnsArrayParser.class);
+
+  private static final Logger logger = LoggerFactory.getLogger(ColumnsArrayParser.class);
 
   // Config
 
@@ -151,13 +154,13 @@
 
     if (inCol.isArray()) {
       int maxIndex = inCol.maxIndex();
-      if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
+      if (maxIndex > TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) {
         throw UserException
           .validationError()
           .message("`columns`[%d] index out of bounds, max supported size is %d",
-              maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS)
+              maxIndex, TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
           .addContext("Column:", inCol.name())
-          .addContext("Maximum index:", TextReader.MAXIMUM_NUMBER_COLUMNS)
+          .addContext("Maximum index:", TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
           .addContext("Actual index:", maxIndex)
           .addContext(builder.context())
           .build(logger);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 9438870..897a7c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -186,6 +186,8 @@
       new OptionDefinition(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR),
       new OptionDefinition(ExecConstants.ENABLE_UNION_TYPE),
       new OptionDefinition(ExecConstants.TEXT_ESTIMATED_ROW_SIZE),
+      new OptionDefinition(ExecConstants.TEXT_WRITER_ADD_HEADER_VALIDATOR),
+      new OptionDefinition(ExecConstants.TEXT_WRITER_FORCE_QUOTES_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_EXTENDED_TYPES),
       new OptionDefinition(ExecConstants.JSON_WRITER_UGLIFY),
       new OptionDefinition(ExecConstants.JSON_WRITER_SKIPNULLFIELDS),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 289d26c..c090f98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.store.easy.text;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.exceptions.ChildErrorContext;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -31,9 +31,9 @@
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
@@ -58,14 +58,13 @@
 import org.apache.drill.exec.store.easy.text.writer.TextRecordWriter;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.drill.exec.vector.accessor.convert.AbstractConvertFromString;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Text format plugin for CSV and other delimited text formats.
@@ -83,6 +82,12 @@
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
   private final static String PLUGIN_NAME = "text";
 
+  public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
+
+  public static final int MAX_CHARS_PER_COLUMN = Character.MAX_VALUE;
+
+  public static final char NULL_CHAR = '\0';
+
   // Provided schema table properties unique to this plugin. If specified
   // in the provided schema, they override the corresponding property in
   // the plugin config. Names here match the field names in the format config.
@@ -103,7 +108,7 @@
   @JsonInclude(Include.NON_DEFAULT)
   public static class TextFormatConfig implements FormatPluginConfig {
 
-    public List<String> extensions = ImmutableList.of();
+    public List<String> extensions = Collections.emptyList();
     public String lineDelimiter = "\n";
     public char fieldDelimiter = '\n';
     public char quote = '"';
@@ -125,11 +130,6 @@
     @JsonIgnore
     public boolean isHeaderExtractionEnabled() { return extractHeader; }
 
-    @JsonIgnore
-    public String getFieldDelimiterAsString(){
-      return new String(new char[]{fieldDelimiter});
-    }
-
     @Deprecated
     @JsonProperty("delimiter")
     public void setFieldDelimiter(char delimiter){
@@ -312,14 +312,25 @@
 
   @Override
   public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException {
-    final Map<String, String> options = new HashMap<>();
+    Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
+
+    TextFormatConfig config = getConfig();
+    List<String> extensions = config.getExtensions();
+    options.put("extension", extensions == null || extensions.isEmpty() ? null : extensions.get(0));
+
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
-    options.put("separator", getConfig().getFieldDelimiterAsString());
-    options.put("extension", getConfig().getExtensions().get(0));
+
+    options.put("addHeader", Boolean.toString(context.getOptions().getBoolean(ExecConstants.TEXT_WRITER_ADD_HEADER)));
+    options.put("forceQuotes", Boolean.toString(context.getOptions().getBoolean(ExecConstants.TEXT_WRITER_FORCE_QUOTES)));
+
+    options.put("lineSeparator", config.getLineDelimiter());
+    options.put("fieldDelimiter", String.valueOf(config.getFieldDelimiter()));
+    options.put("quote", String.valueOf(config.getQuote()));
+    options.put("escape", String.valueOf(config.getEscape()));
 
     RecordWriter recordWriter = new TextRecordWriter(
         context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
index 6e74d36..d4edaf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
@@ -20,7 +20,7 @@
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
-public abstract class BaseFieldOutput extends TextOutput {
+public abstract class BaseFieldOutput implements TextOutput {
 
   /**
    * Width of the per-field data buffer. Fields can be larger.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
index 932096a..7fe6bd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
@@ -19,6 +19,7 @@
 
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
 /**
@@ -31,20 +32,18 @@
   /**
    * We initialize and add the varchar vector for each incoming field in this
    * constructor.
-   * @param outputMutator  Used to create/modify schema
-   * @param fieldNames Incoming field names
-   * @param columns  List of columns selected in the query
-   * @param isStarQuery  boolean to indicate if all fields are selected or not
+   *
+   * @param writer row set writer
    */
-  public FieldVarCharOutput(RowSetLoader writer) {
+  FieldVarCharOutput(RowSetLoader writer) {
     super(writer,
-        TextReader.MAXIMUM_NUMBER_COLUMNS,
+        TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS,
         makeMask(writer));
   }
 
   private static boolean[] makeMask(RowSetLoader writer) {
     final TupleMetadata schema = writer.tupleSchema();
-    final boolean projectionMask[] = new boolean[schema.size()];
+    final boolean[] projectionMask = new boolean[schema.size()];
     for (int i = 0; i < schema.size(); i++) {
       projectionMask[i] = writer.column(i).isProjected();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
index 2fb0ffc..c561171 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
@@ -17,6 +17,12 @@
  */
 package org.apache.drill.exec.store.easy.text.reader;
 
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -24,11 +30,6 @@
 import java.util.List;
 import java.util.Set;
 
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-
 /**
  * Text output that implements a header reader/parser.
  * The caller parses out the characters of each header;
@@ -45,8 +46,9 @@
 // and read a single row, there is no good reason to try to use
 // value vectors and direct memory for this task.
 
-public class HeaderBuilder extends TextOutput {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class);
+public class HeaderBuilder implements TextOutput {
+
+  private static final Logger logger = LoggerFactory.getLogger(HeaderBuilder.class);
 
   /**
    * Maximum Drill symbol length, as enforced for headers.
@@ -71,9 +73,9 @@
 
   public static final String ANONYMOUS_COLUMN_PREFIX = "column_";
 
-  public final Path filePath;
-  public final List<String> headers = new ArrayList<>();
-  public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
+  private final Path filePath;
+  private final List<String> headers = new ArrayList<>();
+  private final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
 
   public HeaderBuilder(Path filePath) {
     this.filePath = filePath;
@@ -214,7 +216,7 @@
 
     // Force headers to be unique.
 
-    final Set<String> idents = new HashSet<String>();
+    final Set<String> idents = new HashSet<>();
     for (int i = 0; i < headers.size(); i++) {
       String header = headers.get(i);
       String key = header.toLowerCase();
@@ -254,7 +256,7 @@
     // Just return the headers: any needed checks were done in
     // finishRecord()
 
-    final String array[] = new String[headers.size()];
+    final String[] array = new String[headers.size()];
     return headers.toArray(array);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
index 7d8894f..fdf3e53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
@@ -19,8 +19,11 @@
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class is responsible for generating record batches for text file inputs. We generate
@@ -28,23 +31,23 @@
  * value within the vector containing all the fields in the record as individual array elements.
  */
 public class RepeatedVarCharOutput extends BaseFieldOutput {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class);
+
+  private static final Logger logger = LoggerFactory.getLogger(BaseFieldOutput.class);
 
   private final ScalarWriter columnWriter;
-  private final ArrayWriter arrayWriter;
 
   /**
    * Provide the row set loader (which must have just one repeated Varchar
    * column) and an optional array projection mask.
-   * @param projectionMask
-   * @param tupleLoader
+   *
+   * @param loader row set loader
+   * @param projectionMask array projection mask
    */
-
   public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) {
     super(loader,
         maxField(loader, projectionMask),
         projectionMask);
-    arrayWriter = writer.array(0);
+    ArrayWriter arrayWriter = writer.array(0);
     columnWriter = arrayWriter.scalar();
   }
 
@@ -61,7 +64,7 @@
     // possible fields.
 
     if (projectionMask == null) {
-      return TextReader.MAXIMUM_NUMBER_COLUMNS;
+      return TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS;
     }
 
     // Else, this is a SELECT columns[x], columns[y], ... query.
@@ -110,11 +113,11 @@
       // this only if all fields are selected; the same query will succeed if
       // the user does a COUNT(*) or SELECT columns[x], columns[y], ...
 
-      if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
+      if (currentFieldIndex > TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) {
         throw UserException
           .unsupportedError()
           .message("Text file contains too many fields")
-          .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS)
+          .addContext("Limit", TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
           .build(logger);
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
index d9fa973..0ca2cfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
@@ -74,12 +74,12 @@
   /**
    * The current position in the buffer.
    */
-  public int bufferPtr;
+  private int bufferPtr;
 
   /**
    * The quantity of valid data in the buffer.
    */
-  public int length = -1;
+  private int length = -1;
 
   private boolean endFound = false;
 
@@ -91,7 +91,7 @@
    * {@link TextParsingSettings#getNormalizedNewLine()}) that is used to replace any
    * lineSeparator sequence found in the input.
    */
-  public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+  TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
     this.lineSeparator = settings.getNewLineDelimiter();
     byte normalizedLineSeparator = settings.getNormalizedNewLine();
     Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
@@ -156,9 +156,8 @@
    * May get an incomplete string since we don't support stream rewind.  Returns empty string for now.
    *
    * @return String of last few bytes.
-   * @throws IOException for input file read errors
    */
-  public String getStringSinceMarkForError() throws IOException {
+  public String getStringSinceMarkForError() {
     return " ";
   }
 
@@ -359,10 +358,6 @@
     return charCount + bufferPtr;
   }
 
-  public long getLineCount() {
-    return lineCount;
-  }
-
   public void close() throws IOException{
     input.close();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
index 71d4731..50e2d83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
@@ -17,63 +17,58 @@
  */
 package org.apache.drill.exec.store.easy.text.reader;
 
-
 /**
- * Base class for producing output record batches while dealing with
+ * Interface for producing output record batches while dealing with
  * text files. Defines the interface called from text parsers to create
  * the corresponding value vectors (record batch).
  */
+interface TextOutput {
 
-abstract class TextOutput {
-
-  public abstract void startRecord();
+  /**
+   * Start processing a new record.
+   */
+  void startRecord();
 
   /**
    * Start processing a new field within a record.
-   * @param index  index within the record
+   *
+   * @param index index within the record
    */
-  public abstract void startField(int index);
+  void startField(int index);
 
   /**
    * End processing a field within a record.
-   * @return  true if engine should continue processing record.  false if rest of record can be skipped.
+   *
+   * @return true if the engine should continue processing the record, false if the rest of the record can be skipped.
    */
-  public abstract boolean endField();
+  boolean endField();
 
   /**
    * Shortcut that lets the output know that we are closing ending a field with no data.
+   *
+   * @return true if the engine should continue processing the record, false if the rest of the record can be skipped.
    */
-  public abstract boolean endEmptyField();
+  boolean endEmptyField();
 
   /**
-   * Add the provided data but drop any whitespace.
-   * @param data character to append
+   * Appends a byte to the output character data buffer.
+   *
+   * @param data current byte read
    */
-  public void appendIgnoringWhitespace(byte data) {
-    if (TextReader.isWhite(data)) {
-      // noop
-    } else {
-      append(data);
-    }
-  }
-
-  /**
-   * Appends a byte to the output character data buffer
-   * @param data  current byte read
-   */
-  public abstract void append(byte data);
+  void append(byte data);
 
   /**
    * Completes the processing of a given record. Also completes the processing of the
    * last field being read.
    */
-  public abstract void finishRecord();
+  void finishRecord();
 
   /**
-   *  Return the total number of records (across batches) processed
+   * Return the total number of records (across batches) processed
+   *
+   * @return record count
    */
-  public abstract long getRecordCount();
+  long getRecordCount();
 
   /**
    * Indicates if the current batch is full and reading for this batch
@@ -83,6 +78,5 @@
    * the batch to be sent downstream, false if the reader may continue to
    * add rows to the current batch
    */
-
-  public abstract boolean isFull();
+  boolean isFull();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
index 3f6dbeb..5b3f3d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
@@ -17,101 +17,130 @@
  */
 package org.apache.drill.exec.store.easy.text.reader;
 
-import java.io.IOException;
-
 import com.univocity.parsers.common.ParsingContext;
+import com.univocity.parsers.common.record.Record;
+import com.univocity.parsers.common.record.RecordMetaData;
+
+import java.util.Collections;
+import java.util.Map;
 
 class TextParsingContext implements ParsingContext {
 
   private final TextInput input;
   private final TextOutput output;
-  protected boolean stopped;
 
-  private int[] extractedIndexes;
+  private boolean stopped;
 
-  public TextParsingContext(TextInput input, TextOutput output) {
+  TextParsingContext(TextInput input, TextOutput output) {
     this.input = input;
     this.output = output;
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  public boolean isFull() {
+    return output.isFull();
+  }
+
+  public void stop(boolean stopped) {
+    this.stopped = stopped;
+  }
+
   @Override
   public void stop() {
     stopped = true;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public boolean isStopped() {
     return stopped;
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  @Override
+  public int errorContentLength() {
+    return -1;
+  }
+
+  @Override
+  public Record toRecord(String[] row) {
+    return null;
+  }
+
+  @Override
+  public RecordMetaData recordMetaData() {
+    return null;
+  }
+
   @Override
   public long currentLine() {
     return input.lineCount();
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public long currentChar() {
     return input.charCount();
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  @Override
+  public void skipLines(long lines) {
+  }
+
+  @Override
+  public String[] parsedHeaders() {
+    return new String[0];
+  }
+
   @Override
   public int currentColumn() {
     return -1;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public String[] headers() {
-    return new String[]{};
+    return new String[0];
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  @Override
+  public String[] selectedHeaders() {
+    return new String[0];
+  }
+
   @Override
   public int[] extractedFieldIndexes() {
-    return extractedIndexes;
+    return new int[0];
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public long currentRecord() {
     return output.getRecordCount();
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public String currentParsedContent() {
-    try {
-      return input.getStringSinceMarkForError();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    return input.getStringSinceMarkForError();
   }
 
   @Override
-  public void skipLines(int lines) {
+  public int currentParsedContentLength() {
+    return input.getStringSinceMarkForError().toCharArray().length;
+  }
+
+  @Override
+  public String fieldContentOnError() {
+    return null;
+  }
+
+  @Override
+  public Map<Long, String> comments() {
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public String lastComment() {
+    return null;
+  }
+
+  @Override
+  public char[] lineSeparator() {
+    return new char[0];
   }
 
   @Override
@@ -119,8 +148,14 @@
     return false;
   }
 
-  public boolean isFull() {
-    return output.isFull();
+  @Override
+  public int indexOf(String header) {
+    return -1;
+  }
+
+  @Override
+  public int indexOf(Enum<?> header) {
+    return -1;
   }
 }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
index a91a8ab..acc1cda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
@@ -33,7 +33,7 @@
   private final byte delimiter;
   private final byte comment;
 
-  private final long maxCharsPerColumn = Character.MAX_VALUE;
+  private final long maxCharsPerColumn = TextFormatPlugin.MAX_CHARS_PER_COLUMN;
   private final byte normalizedNewLine = b('\n');
   private final byte[] newLineDelimiter;
   private final boolean ignoreLeadingWhitespaces = false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
index 0ce856e..96d62f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
@@ -17,13 +17,14 @@
  */
 package org.apache.drill.exec.store.easy.text.reader;
 
-import java.io.IOException;
-
-import org.apache.drill.common.exceptions.UserException;
-
 import com.univocity.parsers.common.TextParsingException;
-
 import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 /*******************************************************************************
  * Portions Copyright 2014 uniVocity Software Pty Ltd
@@ -34,11 +35,10 @@
  * DrillBuf support.
  */
 public final class TextReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
 
-  private static final byte NULL_BYTE = (byte) '\0';
+  private static final Logger logger = LoggerFactory.getLogger(TextReader.class);
 
-  public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
+  private static final byte NULL_BYTE = (byte) TextFormatPlugin.NULL_CHAR;
 
   private final TextParsingContext context;
 
@@ -102,7 +102,7 @@
    * any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed
    * we have an additional check to make sure its not negative
    */
-  static final boolean isWhite(byte b){
+  static boolean isWhite(byte b){
     return b <= ' ' && b > -1;
   }
 
@@ -252,12 +252,15 @@
           prev = NULL_BYTE;
         } else {
           prev = ch;
+          // read the next char, taking into account that it can be a new line indicator,
+          // to ensure that a custom new line is replaced with the normalized one
+          ch = input.nextChar();
+          continue;
         }
       } else {
         if (prev == quoteEscape) {
           output.append(prev);
-        }
-        else if (prev == quote) { // unescaped quote detected
+        } else if (prev == quote) { // unescaped quote detected
           if (parseUnescapedQuotes) {
             output.append(prev);
             break;
@@ -326,7 +329,7 @@
    * @return true if more rows can be read, false if not
    * @throws IOException for input file read errors
    */
-  private final boolean parseField() throws IOException {
+  private boolean parseField() throws IOException {
 
     output.startField(fieldIndex++);
 
@@ -375,7 +378,7 @@
    * @throws IOException for input file read errors
    */
   public final void start() throws IOException {
-    context.stopped = false;
+    context.stop(false);
     input.start();
   }
 
@@ -386,7 +389,7 @@
    */
   public final boolean parseNext() throws IOException {
     try {
-      while (! context.stopped) {
+      while (!context.isStopped()) {
         ch = input.nextChar();
         if (ch == comment) {
           input.skipLines(1);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
index a114fd4..fa1d62f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
@@ -17,44 +17,49 @@
  */
 package org.apache.drill.exec.store.easy.text.writer;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Map;
-
+import com.univocity.parsers.csv.CsvFormat;
+import com.univocity.parsers.csv.CsvWriter;
+import com.univocity.parsers.csv.CsvWriterSettings;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 public class TextRecordWriter extends StringOutputRecordWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextRecordWriter.class);
+
+  private static final Logger logger = LoggerFactory.getLogger(TextRecordWriter.class);
 
   private final StorageStrategy storageStrategy;
+  private final Configuration fsConf;
 
+  private FileSystem fs;
   private Path cleanUpLocation;
-
   private String location;
   private String prefix;
-
-  private String fieldDelimiter;
   private String extension;
+  // indicates the index of the file created by this writer: 0_0_{fileNumberIndex}.csv (e.g. 0_0_0.csv)
+  private int fileNumberIndex;
 
-  private int index;
-  private PrintStream stream = null;
-  private FileSystem fs = null;
+  private CsvWriterSettings writerSettings;
+  private CsvWriter writer;
 
-  // Record write status
-  private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
-  private StringBuilder currentRecord; // contains the current record separated by field delimiter
-
-  private Configuration fsConf;
+  // record write status: true from the startRecord() call until endRecord() is called
+  private boolean fRecordStarted = false;
 
   public TextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) {
     super(allocator);
@@ -66,24 +71,38 @@
   public void init(Map<String, String> writerOptions) throws IOException {
     this.location = writerOptions.get("location");
     this.prefix = writerOptions.get("prefix");
-    this.fieldDelimiter = writerOptions.get("separator");
-    this.extension = writerOptions.get("extension");
 
     this.fs = FileSystem.get(fsConf);
+    String extension = writerOptions.get("extension");
+    this.extension = extension == null ? "" : "." + extension;
+    this.fileNumberIndex = 0;
 
-    this.currentRecord = new StringBuilder();
-    this.index = 0;
+    CsvWriterSettings writerSettings = new CsvWriterSettings();
+    writerSettings.setMaxColumns(TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS);
+    writerSettings.setMaxCharsPerColumn(TextFormatPlugin.MAX_CHARS_PER_COLUMN);
+    writerSettings.setHeaderWritingEnabled(Boolean.parseBoolean(writerOptions.get("addHeader")));
+    writerSettings.setQuoteAllFields(Boolean.parseBoolean(writerOptions.get("forceQuotes")));
+    CsvFormat format = writerSettings.getFormat();
+    format.setLineSeparator(writerOptions.get("lineSeparator"));
+    format.setDelimiter(writerOptions.get("fieldDelimiter"));
+    format.setQuote(writerOptions.get("quote").charAt(0));
+    format.setQuoteEscape(writerOptions.get("escape").charAt(0));
+    format.setCharToEscapeQuoteEscaping(TextFormatPlugin.NULL_CHAR); // do not escape "escape" char
+
+    this.writerSettings = writerSettings;
+
+    logger.trace("Text writer settings: {}", this.writerSettings);
   }
 
   @Override
-  public void startNewSchema(List<String> columnNames) throws IOException {
+  public void startNewSchema(BatchSchema schema) throws IOException {
     // wrap up the current file
     cleanup();
 
     // open a new file for writing data with new schema
-    Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+    Path fileName = new Path(location, String.format("%s_%s%s", prefix, fileNumberIndex, extension));
     try {
-      // drill text writer does not support partitions, so only one file can be created
+      // Drill text writer does not support partitions, so only one file can be created
       // and thus only one location should be deleted in case of abort
       // to ensure that our writer was the first to create output file,
       // we create empty output file first and fail if file exists
@@ -93,21 +112,26 @@
       // we need to re-apply file permission
       DataOutputStream fos = fs.create(fileName);
       storageStrategy.applyToFile(fs, fileName);
+      logger.debug("Created file: {}.", fileName);
 
-      stream = new PrintStream(fos);
-      logger.debug("Created file: {}", fileName);
-    } catch (IOException ex) {
-      logger.error("Unable to create file: " + fileName, ex);
-      throw ex;
+      // increment file number index
+      fileNumberIndex++;
+
+      this.writer = new CsvWriter(fos, writerSettings);
+    } catch (IOException e) {
+      throw new IOException(String.format("Unable to create file: %s.", fileName), e);
     }
-    index++;
 
-    stream.println(Joiner.on(fieldDelimiter).join(columnNames));
+    if (writerSettings.isHeaderWritingEnabled()) {
+      writer.writeHeaders(StreamSupport.stream(schema.spliterator(), false)
+        .map(MaterializedField::getName)
+        .collect(Collectors.toList()));
+    }
   }
 
   @Override
-  public void addField(int fieldId, String value) throws IOException {
-    currentRecord.append(value + fieldDelimiter);
+  public void addField(int fieldId, String value) {
+    writer.addValue(value);
   }
 
   @Override
@@ -115,7 +139,6 @@
     if (fRecordStarted) {
       throw new IOException("Previous record is not written completely");
     }
-
     fRecordStarted = true;
   }
 
@@ -125,13 +148,7 @@
       throw new IOException("No record is in writing");
     }
 
-    // remove the extra delimiter at the end
-    currentRecord.deleteCharAt(currentRecord.length()-fieldDelimiter.length());
-
-    stream.println(currentRecord.toString());
-
-    // reset current record status
-    currentRecord.delete(0, currentRecord.length());
+    writer.writeValuesToRow();
     fRecordStarted = false;
   }
 
@@ -164,11 +181,16 @@
 
   @Override
   public void cleanup() throws IOException {
-    super.cleanup();
-    if (stream != null) {
-      stream.close();
-      stream = null;
-      logger.debug("closing file");
+    fRecordStarted = false;
+    if (writer != null) {
+      try {
+        writer.close();
+        writer = null;
+        logger.debug("Closed text writer for file: {}.", cleanUpLocation);
+      } catch (IllegalStateException e) {
+        throw new IOException(String.format("Unable to close text writer for file %s: %s",
+          cleanUpLocation, e.getMessage()), e);
+      }
     }
   }
 
@@ -180,5 +202,4 @@
           cleanUpLocation.toUri().getPath(), fs.getUri());
     }
   }
-
 }
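
For reference, a self-contained sketch of how the univocity 2.8.3 CsvWriter behaves with the settings configured in init() above (class and column names are illustrative):

  import com.univocity.parsers.csv.CsvFormat;
  import com.univocity.parsers.csv.CsvWriter;
  import com.univocity.parsers.csv.CsvWriterSettings;

  import java.io.StringWriter;

  public class CsvWriterSettingsDemo {
    public static void main(String[] args) {
      CsvWriterSettings settings = new CsvWriterSettings();
      settings.setHeaderWritingEnabled(true);     // store.text.writer.add_header
      settings.setQuoteAllFields(false);          // store.text.writer.force_quotes

      CsvFormat format = settings.getFormat();
      format.setLineSeparator("\n");              // plugin config: lineDelimiter
      format.setDelimiter(',');                   // plugin config: fieldDelimiter
      format.setQuote('"');                       // plugin config: quote
      format.setQuoteEscape('"');                 // plugin config: escape
      format.setCharToEscapeQuoteEscaping('\0');  // do not escape the "escape" char

      StringWriter out = new StringWriter();
      CsvWriter writer = new CsvWriter(out, settings);
      writer.writeHeaders("id", "name");          // mirrors startNewSchema()
      writer.addValue("1");                       // mirrors addField()
      writer.addValue("A,B");                     // delimiter inside the value triggers quoting on demand
      writer.writeValuesToRow();                  // mirrors endRecord()
      writer.close();

      System.out.print(out);                      // id,name / 1,"A,B"
    }
  }
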
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7dfafd3..b276389 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -660,6 +660,8 @@
     store.table.use_schema_file: false,
     store.partition.hash_distribute: false,
     store.text.estimated_row_size_bytes: 100.0,
+    store.text.writer.add_header: true,
+    store.text.writer.force_quotes: false,
     store.kafka.all_text_mode: false,
     store.kafka.read_numbers_as_double: false,
     store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
new file mode 100644
index 0000000..6ad38bc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.writer;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import static org.junit.Assert.assertEquals;
+
+public class TestTextWriter extends ClusterTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static final List<String> tablesToDrop = new ArrayList<>();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+
+    Map<String, FormatPluginConfig> formats = new HashMap<>();
+
+    TextFormatConfig csv = new TextFormatConfig();
+    csv.extensions = Collections.singletonList("csv");
+    csv.lineDelimiter = "\n";
+    csv.fieldDelimiter = ',';
+    csv.quote = '"';
+    csv.escape = '"';
+    csv.extractHeader = true;
+    formats.put("csv", csv);
+
+    TextFormatConfig tsv = new TextFormatConfig();
+    tsv.extensions = Collections.singletonList("tsv");
+    tsv.lineDelimiter = "\n";
+    tsv.fieldDelimiter = '\t';
+    tsv.quote = '"';
+    tsv.escape = '"';
+    tsv.extractHeader = true;
+    formats.put("tsv", tsv);
+
+    TextFormatConfig custom = new TextFormatConfig();
+    custom.extensions = Collections.singletonList("custom");
+    custom.lineDelimiter = "!";
+    custom.fieldDelimiter = '_';
+    custom.quote = '$';
+    custom.escape = '^';
+    custom.extractHeader = true;
+    formats.put("custom", custom);
+
+    cluster.defineFormats("dfs", formats);
+  }
+
+  @After
+  public void cleanUp() {
+    client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+    client.resetSession(ExecConstants.TEXT_WRITER_ADD_HEADER);
+    client.resetSession(ExecConstants.TEXT_WRITER_FORCE_QUOTES);
+
+    tablesToDrop.forEach(
+      table -> client.runSqlSilently(String.format("drop table if exists %s", table)));
+  }
+
+  @Test
+  public void testWithHeaders() throws Exception {
+    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+
+    String tableName = "csv_with_headers_table";
+    String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+    tablesToDrop.add(fullTableName);
+
+    queryBuilder().sql("create table %s as select 'a' as col1, 'b' as col2 from (values(1))", fullTableName).run();
+
+    Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+    List<String> lines = Files.readAllLines(path);
+    assertEquals(Arrays.asList("col1,col2", "a,b"), lines);
+  }
+
+  @Test
+  public void testWithoutHeaders() throws Exception {
+    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+    client.alterSession(ExecConstants.TEXT_WRITER_ADD_HEADER, false);
+
+    String tableName = "csv_without_headers_table";
+    String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+    tablesToDrop.add(fullTableName);
+
+    queryBuilder().sql("create table %s as select 'a' as col1, 'b' as col2 from (values(1))", fullTableName).run();
+
+    Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+    List<String> lines = Files.readAllLines(path);
+    assertEquals(Collections.singletonList("a,b"), lines);
+  }
+
+  @Test
+  public void testNoQuotes() throws Exception {
+    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+
+    String tableName = "csv_no_quotes_table";
+    String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+    tablesToDrop.add(fullTableName);
+
+    queryBuilder().sql("create table %s as " +
+      "select 1 as id, 'Bob' as name, 'A B C' as desc from (values(1))", fullTableName).run();
+
+    testBuilder()
+      .sqlQuery("select * from %s", fullTableName)
+      .unOrdered()
+      .baselineColumns("id", "name", "desc")
+      .baselineValues("1", "Bob", "A B C")
+      .go();
+
+    Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+    List<String> lines = Files.readAllLines(path);
+    assertEquals(Arrays.asList("id,name,desc", "1,Bob,A B C"), lines);
+  }
+
+  @Test
+  public void testQuotesOnDemand() throws Exception {
+    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+
+    String tableName = "csv_quotes_on_demand_table";
+    String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+    tablesToDrop.add(fullTableName);
+
+    queryBuilder().sql("create table %s as " +
+      "select 1 as id, 'Bob\nSmith' as name, 'A,B,C' as desc from (values(1))", fullTableName).run();
+
+    testBuilder()
+      .sqlQuery("select * from %s", fullTableName)
+      .unOrdered()
+      .baselineColumns("id", "name", "desc")
+      .baselineValues("1", "Bob\nSmith", "A,B,C")
+      .go();
+
+    Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+    List<String> lines = Files.readAllLines(path);
+    assertEquals(Arrays.asList("id,name,desc", "1,\"Bob", "Smith\",\"A,B,C\""), lines);
+  }
+
+  @Test
+  public void testForceQuotes() throws Exception {
+    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+    client.alterSession(ExecConstants.TEXT_WRITER_FORCE_QUOTES, true);
+
+    String tableName = "csv_force_quotes_table";
+    String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+    tablesToDrop.add(fullTableName);
+
+    queryBuilder().sql("create table %s as " +
+      "select 1 as id, 'Bob' as name, 'A,B,C' as desc from (values(1))", fullTableName).run();
+
+    testBuilder()
+      .sqlQuery("select * from %s", fullTableName)
+      .unOrdered()
+      .baselineColumns("id", "name", "desc")
+      .baselineValues("1", "Bob", "A,B,C")
+      .go();
+
+    Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+    List<String> lines = Files.readAllLines(path);
+    assertEquals(Arrays.asList("\"id\",\"name\",\"desc\"", "\"1\",\"Bob\",\"A,B,C\""), lines);
+  }
+
+  @Test
+  public void testTsv() throws Exception {
+    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "tsv");
+
+    String tableName = "tsv_table";
+    String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+    tablesToDrop.add(fullTableName);
+
+    queryBuilder().sql("create table %s as " +
+      "select 1 as id, 'Bob\tSmith' as name, 'A\"B\"C' as desc from (values(1))", fullTableName).run();
+
+    testBuilder()
+      .sqlQuery("select * from %s", fullTableName)
+      .unOrdered()
+      .baselineColumns("id", "name", "desc")
+      .baselineValues("1", "Bob\tSmith", "A\"B\"C")
+      .go();
+
+    Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.tsv");
+    List<String> lines = Files.readAllLines(path);
+    assertEquals(Arrays.asList("id\tname\tdesc", "1\t\"Bob\tSmith\"\tA\"B\"C"), lines);
+  }
+
+  @Test
+  public void testCustomFormat() throws Exception {
+    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "custom");
+
+    String tableName = "custom_format_table";
+    String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+    tablesToDrop.add(fullTableName);
+
+    queryBuilder().sql("create table %s as " +
+      "select 1 as `id_`, 'Bob$Smith' as name, 'A^B!C' as desc from (values(1))", fullTableName).run();
+
+    testBuilder()
+      .sqlQuery("select * from %s", fullTableName)
+      .unOrdered()
+      .baselineColumns("id_", "name", "desc")
+      .baselineValues("1", "Bob$Smith", "A^B!C")
+      .go();
+
+    Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.custom");
+    List<String> lines = Files.readAllLines(path);
+    assertEquals(Collections.singletonList("$id_$_name_desc!1_Bob$Smith_$A^B!C$!"), lines);
+  }
+
+  @Test
+  public void testLineDelimiterLengthLimit() throws Exception {
+    TextFormatConfig incorrect = new TextFormatConfig();
+    incorrect.lineDelimiter = "end";
+    cluster.defineFormat("dfs", "incorrect", incorrect);
+
+    client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "incorrect");
+
+    String tableName = "incorrect_line_delimiter_table";
+    String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+    tablesToDrop.add(fullTableName);
+
+    // univocity-parsers allows only 1- or 2-character line separators
+    thrown.expect(UserException.class);
+    thrown.expectMessage("Invalid line separator");
+
+    queryBuilder().sql("create table %s as select 1 as id from (values(1))", fullTableName).run();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
index 67ca4c1..a22d11c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
@@ -17,24 +17,25 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant;
 
-import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import org.apache.drill.TestSelectWithOption;
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
 
-import org.apache.drill.TestSelectWithOption;
-import org.apache.drill.categories.RowSetTests;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
-import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test table properties with the compliant text reader. The
@@ -49,7 +50,7 @@
  * using that schema rather than using the "columns" array
  * column.
  *
- * @see {@link TestSelectWithOption} for similar tests using table
+ * @see TestSelectWithOption for similar tests using table
  * properties within SQL
  */
 
@@ -89,9 +90,9 @@
         .build();
   }
 
-  public static String SELECT_ALL = "SELECT * FROM %s";
+  private static final String SELECT_ALL = "SELECT * FROM %s";
 
-  private static String noHeaders[] = {
+  private static final String[] noHeaders = {
       "10,fred",
       "20,wilma"
   };
@@ -122,7 +123,7 @@
     }
   }
 
-  private static String extraCols[] = {
+  private static final String[] extraCols = {
       "10,fred,23.45",
       "20,wilma,1234.56,vip"
   };
@@ -148,7 +149,7 @@
     }
   }
 
-  private static String skipHeaders[] = {
+  private static final String[] skipHeaders = {
       "ignore,me",
       "10,fred",
       "20,wilma"
@@ -180,7 +181,7 @@
     }
   }
 
-  private static String withHeaders[] = {
+  private static final String[] withHeaders = {
       "id, name",
       "10,fred",
       "20,wilma"
@@ -220,7 +221,7 @@
     }
   }
 
-  private static String barDelim[] = {
+  private static final String[] barDelim = {
       "10|fred",
       "20|wilma"
   };
@@ -241,7 +242,7 @@
     }
   }
 
-  private static String customCommentChar[] = {
+  private static final String[] customCommentChar = {
       "@Comment",
       "#10,fred",
       "#20,wilma"
@@ -273,7 +274,7 @@
     }
   }
 
-  private static String noCommentChar[] = {
+  private static final String[] noCommentChar = {
       "#10,fred",
       "#20,wilma"
   };
@@ -301,7 +302,7 @@
     }
   }
 
-  private static String quotesData[] = {
+  private static final String[] quotesData = {
     "1,@foo@",
     "2,@foo~@bar@",
 
@@ -342,7 +343,7 @@
     }
   }
 
-  private static String doubleQuotesData[] = {
+  private static final String[] doubleQuotesData = {
       "1,@foo@",
       "2,@foo@@bar@",
     };
@@ -380,7 +381,38 @@
     }
   }
 
-  private static String specialCharsData[] = {
+  private static final String[] quotesAndCustomNewLineData = {
+    "1,@foo@!2,@foo@@bar@!",
+  };
+
+  @Test
+  public void testQuotesAndCustomNewLine() throws Exception {
+    try {
+      enableSchemaSupport();
+      String tablePath = buildTable("quotesAndCustomNewLine", quotesAndCustomNewLineData);
+      String sql = "create schema () " +
+        "for table " + tablePath + " PROPERTIES ('" +
+        TextFormatPlugin.HAS_HEADERS_PROP + "'='false', '" +
+        TextFormatPlugin.SKIP_FIRST_LINE_PROP + "'='false', '" +
+        TextFormatPlugin.LINE_DELIM_PROP + "'='!', '" +
+        TextFormatPlugin.QUOTE_PROP + "'='@', '" +
+        TextFormatPlugin.QUOTE_ESCAPE_PROP + "'='@')";
+      run(sql);
+      RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+      TupleMetadata expectedSchema = new SchemaBuilder()
+        .addArray("columns", MinorType.VARCHAR)
+        .buildSchema();
+      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .addSingleCol(strArray("1", "foo"))
+        .addSingleCol(strArray("2", "foo@bar"))
+        .build();
+      RowSetUtilities.verify(expected, actual);
+    } finally {
+      resetSchemaSupport();
+    }
+  }
+
+  private static final String[] specialCharsData = {
       "10\u0001'fred'",
       "20\u0001'wilma'"
     };
@@ -428,7 +460,7 @@
       enableSchemaSupport();
       String tableName = "newline";
       File rootDir = new File(testDir, tableName);
-      rootDir.mkdir();
+      assertTrue(rootDir.mkdir());
       try(PrintWriter out = new PrintWriter(new FileWriter(new File(rootDir, ROOT_FILE)))) {
         out.print("1,fred\r2,wilma\r");
       }
@@ -453,7 +485,7 @@
     }
   }
 
-  private static String messyQuotesData[] = {
+  private static final String[] messyQuotesData = {
       "first\"field\"here,another \"field",
       "end quote\",another\"",
       "many\"\"\"\",more\"\"",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index a26cf2f..91584dd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -17,6 +17,35 @@
  */
 package org.apache.drill.test;
 
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
+import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.apache.drill.test.DrillTestWrapper.TestServices;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -33,35 +62,6 @@
 import java.util.Optional;
 import java.util.Properties;
 
-import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.test.DrillTestWrapper.TestServices;
-import org.apache.drill.common.config.DrillProperties;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ZookeeperHelper;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.RootAllocatorFactory;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.StoragePluginRegistryImpl;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.WorkspaceConfig;
-import org.apache.drill.exec.store.mock.MockStorageEngine;
-import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
-import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
-import org.apache.drill.exec.util.StoragePluginTestUtils;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
-
 import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
 import static org.apache.drill.exec.util.StoragePluginTestUtils.ROOT_SCHEMA;
 import static org.apache.drill.exec.util.StoragePluginTestUtils.TMP_SCHEMA;
@@ -416,6 +416,10 @@
       }
     }
     zkHelper = null;
+
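+    // Re-throw any exception captured above so that failures during
+    // shutdown are not silently swallowed.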
+    if (ex != null) {
+      throw ex;
+    }
   }
 
   /**
@@ -487,7 +491,7 @@
     }
   }
 
-  public static void defineWorkspace(Drillbit drillbit, String pluginName,
+  private void defineWorkspace(Drillbit drillbit, String pluginName,
       String schemaName, String path, String defaultFormat, FormatPluginConfig format)
       throws ExecutionSetupException {
     final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
@@ -500,19 +504,55 @@
       .ifPresent(newWorkspaces::putAll);
     newWorkspaces.put(schemaName, newTmpWSConfig);
 
-    Map<String, FormatPluginConfig> newFormats = new HashMap<>(pluginConfig.getFormats());
+    Map<String, FormatPluginConfig> newFormats = new HashMap<>();
     Optional.ofNullable(pluginConfig.getFormats())
       .ifPresent(newFormats::putAll);
     Optional.ofNullable(format)
       .ifPresent(f -> newFormats.put(defaultFormat, f));
 
-    FileSystemConfig newPluginConfig = new FileSystemConfig(
-        pluginConfig.getConnection(),
-        pluginConfig.getConfig(),
-        newWorkspaces,
-        newFormats);
-    newPluginConfig.setEnabled(pluginConfig.isEnabled());
+    updatePlugin(pluginRegistry, pluginName, pluginConfig, newWorkspaces, newFormats);
+  }
 
+  public void defineFormat(String pluginName, String name, FormatPluginConfig config) {
+    defineFormats(pluginName, ImmutableMap.of(name, config));
+  }
+
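+  /**
+   * Defines (or replaces) the given format plugin configs in the named
+   * storage plugin on every drillbit. Any {@link ExecutionSetupException}
+   * is rethrown as an unchecked {@link IllegalStateException}.
+   */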
+  public void defineFormats(String pluginName, Map<String, FormatPluginConfig> formats) {
+    for (Drillbit bit : drillbits()) {
+      try {
+        defineFormats(bit, pluginName, formats);
+      } catch (ExecutionSetupException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
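+  // Merges the new formats into the plugin's existing format map and pushes
+  // the updated config to the drillbit's storage plugin registry.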
+  private void defineFormats(Drillbit drillbit,
+                             String pluginName,
+                             Map<String, FormatPluginConfig> formats) throws ExecutionSetupException {
+    StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
+    FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(pluginName);
+    FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
+
+    Map<String, FormatPluginConfig> newFormats = new HashMap<>();
+    Optional.ofNullable(pluginConfig.getFormats())
+      .ifPresent(newFormats::putAll);
+    newFormats.putAll(formats);
+
+    updatePlugin(pluginRegistry, pluginName, pluginConfig, null, newFormats);
+  }
+
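+  // Rebuilds the file system plugin config, falling back to the existing
+  // workspaces / formats when the corresponding argument is null, and
+  // re-registers it while preserving the enabled flag.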
+  private void updatePlugin(StoragePluginRegistry pluginRegistry,
+                            String pluginName,
+                            FileSystemConfig pluginConfig,
+                            Map<String, WorkspaceConfig> newWorkspaces,
+                            Map<String, FormatPluginConfig> newFormats) throws ExecutionSetupException {
+    FileSystemConfig newPluginConfig = new FileSystemConfig(
+      pluginConfig.getConnection(),
+      pluginConfig.getConfig(),
+      newWorkspaces == null ? pluginConfig.getWorkspaces() : newWorkspaces,
+      newFormats == null ? pluginConfig.getFormats() : newFormats);
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
 
     pluginRegistry.createOrUpdate(pluginName, newPluginConfig, true);
   }
@@ -580,8 +620,8 @@
    * Return a cluster fixture built with standard options. This is a short-cut
    * for simple tests that don't need special setup.
    *
+   * @param dirTestWatcher watcher that manages the temporary directories used by the test
    * @return a cluster fixture with standard options
-   * @throws Exception if something goes wrong
    */
   public static ClusterFixture standardCluster(BaseDirTestWatcher dirTestWatcher) {
     return builder(dirTestWatcher).build();