DRILL-6096: Provide mechanism to configure text writer configuration
1. Usage of format plugin configuration allows specifying line and field delimiters, quote and escape characters.
2. Usage of system / session options allows specifying whether the writer should add headers or force quotes.
closes #1873
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index a9ac685..9f1ace3 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -89,7 +89,7 @@
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
- <version>1.3.0</version>
+ <version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index 1bf1b09..017fda4 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -43,7 +43,7 @@
/**
* Abstract implementation of RecordWriter interface which exposes interface:
- * {@link #writeHeader(List)}
+ * {@link #startNewSchema(BatchSchema)}
* {@link #addField(int,String)}
* to output the data in string format instead of implementing addField for each type holder.
*
@@ -60,13 +60,7 @@
@Override
public void updateSchema(VectorAccessible batch) throws IOException {
- BatchSchema schema = batch.getSchema();
- List<String> columnNames = Lists.newArrayList();
- for (int i=0; i < schema.getFieldCount(); i++) {
- columnNames.add(schema.getColumn(i).getName());
- }
-
- startNewSchema(columnNames);
+ startNewSchema(batch.getSchema());
}
@Override
@@ -160,6 +154,6 @@
public void cleanup() throws IOException {
}
- public abstract void startNewSchema(List<String> columnNames) throws IOException;
+ public abstract void startNewSchema(BatchSchema schema) throws IOException;
public abstract void addField(int fieldId, String value) throws IOException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 066d04d..20668b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -431,6 +431,13 @@
public static final DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator("store.text.estimated_row_size_bytes", 1, Long.MAX_VALUE,
new OptionDescription("Estimate of the row size in a delimited text file, such as csv. The closer to actual, the better the query plan. Used for all csv files in the system/session where the value is set. Impacts the decision to plan a broadcast join or not."));
+ public static final String TEXT_WRITER_ADD_HEADER = "store.text.writer.add_header";
+ public static final BooleanValidator TEXT_WRITER_ADD_HEADER_VALIDATOR = new BooleanValidator(TEXT_WRITER_ADD_HEADER,
+ new OptionDescription("Enables the TEXT writer to write a header in newly created files. Default is true. (Drill 1.17+)"));
+
+ public static final String TEXT_WRITER_FORCE_QUOTES = "store.text.writer.force_quotes";
+ public static final BooleanValidator TEXT_WRITER_FORCE_QUOTES_VALIDATOR = new BooleanValidator(TEXT_WRITER_FORCE_QUOTES,
+ new OptionDescription("Enables the TEXT writer to enclose all fields in quotes. Default is false. (Drill 1.17+)"));
/**
* Json writer option for writing `NaN` and `Infinity` tokens as numbers (not enclosed with double quotes)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index ceb0848..ffd69e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -24,8 +24,10 @@
import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
import org.apache.drill.exec.physical.resultSet.project.RequestedColumnImpl;
import org.apache.drill.exec.physical.resultSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.store.easy.text.reader.TextReader;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Parses the `columns` array. Doing so is surprisingly complex.
@@ -68,7 +70,8 @@
*/
public class ColumnsArrayParser implements ScanProjectionParser {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnsArrayParser.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(ColumnsArrayParser.class);
// Config
@@ -151,13 +154,13 @@
if (inCol.isArray()) {
int maxIndex = inCol.maxIndex();
- if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
+ if (maxIndex > TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) {
throw UserException
.validationError()
.message("`columns`[%d] index out of bounds, max supported size is %d",
- maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS)
+ maxIndex, TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
.addContext("Column:", inCol.name())
- .addContext("Maximum index:", TextReader.MAXIMUM_NUMBER_COLUMNS)
+ .addContext("Maximum index:", TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
.addContext("Actual index:", maxIndex)
.addContext(builder.context())
.build(logger);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 9438870..897a7c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -186,6 +186,8 @@
new OptionDefinition(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR),
new OptionDefinition(ExecConstants.ENABLE_UNION_TYPE),
new OptionDefinition(ExecConstants.TEXT_ESTIMATED_ROW_SIZE),
+ new OptionDefinition(ExecConstants.TEXT_WRITER_ADD_HEADER_VALIDATOR),
+ new OptionDefinition(ExecConstants.TEXT_WRITER_FORCE_QUOTES_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_EXTENDED_TYPES),
new OptionDefinition(ExecConstants.JSON_WRITER_UGLIFY),
new OptionDefinition(ExecConstants.JSON_WRITER_SKIPNULLFIELDS),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 289d26c..c090f98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -17,11 +17,11 @@
*/
package org.apache.drill.exec.store.easy.text;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
@@ -31,9 +31,9 @@
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
@@ -58,14 +58,13 @@
import org.apache.drill.exec.store.easy.text.writer.TextRecordWriter;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.drill.exec.vector.accessor.convert.AbstractConvertFromString;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* Text format plugin for CSV and other delimited text formats.
@@ -83,6 +82,12 @@
public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
private final static String PLUGIN_NAME = "text";
+ public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
+
+ public static final int MAX_CHARS_PER_COLUMN = Character.MAX_VALUE;
+
+ public static final char NULL_CHAR = '\0';
+
// Provided schema table properties unique to this plugin. If specified
// in the provided schema, they override the corresponding property in
// the plugin config. Names here match the field names in the format config.
@@ -103,7 +108,7 @@
@JsonInclude(Include.NON_DEFAULT)
public static class TextFormatConfig implements FormatPluginConfig {
- public List<String> extensions = ImmutableList.of();
+ public List<String> extensions = Collections.emptyList();
public String lineDelimiter = "\n";
public char fieldDelimiter = '\n';
public char quote = '"';
@@ -125,11 +130,6 @@
@JsonIgnore
public boolean isHeaderExtractionEnabled() { return extractHeader; }
- @JsonIgnore
- public String getFieldDelimiterAsString(){
- return new String(new char[]{fieldDelimiter});
- }
-
@Deprecated
@JsonProperty("delimiter")
public void setFieldDelimiter(char delimiter){
@@ -312,14 +312,25 @@
@Override
public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException {
- final Map<String, String> options = new HashMap<>();
+ Map<String, String> options = new HashMap<>();
options.put("location", writer.getLocation());
+
+ TextFormatConfig config = getConfig();
+ List<String> extensions = config.getExtensions();
+ options.put("extension", extensions == null || extensions.isEmpty() ? null : extensions.get(0));
+
FragmentHandle handle = context.getHandle();
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
options.put("prefix", fragmentId);
- options.put("separator", getConfig().getFieldDelimiterAsString());
- options.put("extension", getConfig().getExtensions().get(0));
+
+ options.put("addHeader", Boolean.toString(context.getOptions().getBoolean(ExecConstants.TEXT_WRITER_ADD_HEADER)));
+ options.put("forceQuotes", Boolean.toString(context.getOptions().getBoolean(ExecConstants.TEXT_WRITER_FORCE_QUOTES)));
+
+ options.put("lineSeparator", config.getLineDelimiter());
+ options.put("fieldDelimiter", String.valueOf(config.getFieldDelimiter()));
+ options.put("quote", String.valueOf(config.getQuote()));
+ options.put("escape", String.valueOf(config.getEscape()));
RecordWriter recordWriter = new TextRecordWriter(
context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
index 6e74d36..d4edaf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/BaseFieldOutput.java
@@ -20,7 +20,7 @@
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-public abstract class BaseFieldOutput extends TextOutput {
+public abstract class BaseFieldOutput implements TextOutput {
/**
* Width of the per-field data buffer. Fields can be larger.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
index 932096a..7fe6bd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
@@ -19,6 +19,7 @@
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
/**
@@ -31,20 +32,18 @@
/**
* We initialize and add the varchar vector for each incoming field in this
* constructor.
- * @param outputMutator Used to create/modify schema
- * @param fieldNames Incoming field names
- * @param columns List of columns selected in the query
- * @param isStarQuery boolean to indicate if all fields are selected or not
+ *
+ * @param writer row set writer
*/
- public FieldVarCharOutput(RowSetLoader writer) {
+ FieldVarCharOutput(RowSetLoader writer) {
super(writer,
- TextReader.MAXIMUM_NUMBER_COLUMNS,
+ TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS,
makeMask(writer));
}
private static boolean[] makeMask(RowSetLoader writer) {
final TupleMetadata schema = writer.tupleSchema();
- final boolean projectionMask[] = new boolean[schema.size()];
+ final boolean[] projectionMask = new boolean[schema.size()];
for (int i = 0; i < schema.size(); i++) {
projectionMask[i] = writer.column(i).isProjected();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
index 2fb0ffc..c561171 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/HeaderBuilder.java
@@ -17,6 +17,12 @@
*/
package org.apache.drill.exec.store.easy.text.reader;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -24,11 +30,6 @@
import java.util.List;
import java.util.Set;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-
/**
* Text output that implements a header reader/parser.
* The caller parses out the characters of each header;
@@ -45,8 +46,9 @@
// and read a single row, there is no good reason to try to use
// value vectors and direct memory for this task.
-public class HeaderBuilder extends TextOutput {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class);
+public class HeaderBuilder implements TextOutput {
+
+ private static final Logger logger = LoggerFactory.getLogger(HeaderBuilder.class);
/**
* Maximum Drill symbol length, as enforced for headers.
@@ -71,9 +73,9 @@
public static final String ANONYMOUS_COLUMN_PREFIX = "column_";
- public final Path filePath;
- public final List<String> headers = new ArrayList<>();
- public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
+ private final Path filePath;
+ private final List<String> headers = new ArrayList<>();
+ private final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
public HeaderBuilder(Path filePath) {
this.filePath = filePath;
@@ -214,7 +216,7 @@
// Force headers to be unique.
- final Set<String> idents = new HashSet<String>();
+ final Set<String> idents = new HashSet<>();
for (int i = 0; i < headers.size(); i++) {
String header = headers.get(i);
String key = header.toLowerCase();
@@ -254,7 +256,7 @@
// Just return the headers: any needed checks were done in
// finishRecord()
- final String array[] = new String[headers.size()];
+ final String[] array = new String[headers.size()];
return headers.toArray(array);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
index 7d8894f..fdf3e53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/RepeatedVarCharOutput.java
@@ -19,8 +19,11 @@
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class is responsible for generating record batches for text file inputs. We generate
@@ -28,23 +31,23 @@
* value within the vector containing all the fields in the record as individual array elements.
*/
public class RepeatedVarCharOutput extends BaseFieldOutput {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(BaseFieldOutput.class);
private final ScalarWriter columnWriter;
- private final ArrayWriter arrayWriter;
/**
* Provide the row set loader (which must have just one repeated Varchar
* column) and an optional array projection mask.
- * @param projectionMask
- * @param tupleLoader
+ *
+ * @param loader row set loader
+ * @param projectionMask array projection mask
*/
-
public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) {
super(loader,
maxField(loader, projectionMask),
projectionMask);
- arrayWriter = writer.array(0);
+ ArrayWriter arrayWriter = writer.array(0);
columnWriter = arrayWriter.scalar();
}
@@ -61,7 +64,7 @@
// possible fields.
if (projectionMask == null) {
- return TextReader.MAXIMUM_NUMBER_COLUMNS;
+ return TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS;
}
// Else, this is a SELECT columns[x], columns[y], ... query.
@@ -110,11 +113,11 @@
// this only if all fields are selected; the same query will succeed if
// the user does a COUNT(*) or SELECT columns[x], columns[y], ...
- if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
+ if (currentFieldIndex > TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) {
throw UserException
.unsupportedError()
.message("Text file contains too many fields")
- .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS)
+ .addContext("Limit", TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
.build(logger);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
index d9fa973..0ca2cfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextInput.java
@@ -74,12 +74,12 @@
/**
* The current position in the buffer.
*/
- public int bufferPtr;
+ private int bufferPtr;
/**
* The quantity of valid data in the buffer.
*/
- public int length = -1;
+ private int length = -1;
private boolean endFound = false;
@@ -91,7 +91,7 @@
* {@link TextParsingSettings#getNormalizedNewLine()}) that is used to replace any
* lineSeparator sequence found in the input.
*/
- public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+ TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
this.lineSeparator = settings.getNewLineDelimiter();
byte normalizedLineSeparator = settings.getNormalizedNewLine();
Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
@@ -156,9 +156,8 @@
* May get an incomplete string since we don't support stream rewind. Returns empty string for now.
*
* @return String of last few bytes.
- * @throws IOException for input file read errors
*/
- public String getStringSinceMarkForError() throws IOException {
+ public String getStringSinceMarkForError() {
return " ";
}
@@ -359,10 +358,6 @@
return charCount + bufferPtr;
}
- public long getLineCount() {
- return lineCount;
- }
-
public void close() throws IOException{
input.close();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
index 71d4731..50e2d83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextOutput.java
@@ -17,63 +17,58 @@
*/
package org.apache.drill.exec.store.easy.text.reader;
-
/**
- * Base class for producing output record batches while dealing with
+ * Interface for producing output record batches while dealing with
* text files. Defines the interface called from text parsers to create
* the corresponding value vectors (record batch).
*/
+interface TextOutput {
-abstract class TextOutput {
-
- public abstract void startRecord();
+ /**
+ * Start processing a new record.
+ */
+ void startRecord();
/**
* Start processing a new field within a record.
- * @param index index within the record
+ *
+ * @param index index within the record
*/
- public abstract void startField(int index);
+ void startField(int index);
/**
* End processing a field within a record.
- * @return true if engine should continue processing record. false if rest of record can be skipped.
+ *
+ * @return true if engine should continue processing record. false if rest of record can be skipped.
*/
- public abstract boolean endField();
+ boolean endField();
/**
* Shortcut that lets the output know that we are closing ending a field with no data.
+ *
* @return true if engine should continue processing record. false if rest of record can be skipped.
*/
- public abstract boolean endEmptyField();
+ boolean endEmptyField();
/**
- * Add the provided data but drop any whitespace.
- * @param data character to append
+ * Appends a byte to the output character data buffer.
+ *
+ * @param data current byte read
*/
- public void appendIgnoringWhitespace(byte data) {
- if (TextReader.isWhite(data)) {
- // noop
- } else {
- append(data);
- }
- }
-
- /**
- * Appends a byte to the output character data buffer
- * @param data current byte read
- */
- public abstract void append(byte data);
+ void append(byte data);
/**
* Completes the processing of a given record. Also completes the processing of the
* last field being read.
*/
- public abstract void finishRecord();
+ void finishRecord();
/**
- * Return the total number of records (across batches) processed
+ * Return the total number of records (across batches) processed
+ *
+ * @return record count
*/
- public abstract long getRecordCount();
+ long getRecordCount();
/**
* Indicates if the current batch is full and reading for this batch
@@ -83,6 +78,5 @@
* the batch to be sent downstream, false if the reader may continue to
* add rows to the current batch
*/
-
- public abstract boolean isFull();
+ boolean isFull();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
index 3f6dbeb..5b3f3d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingContext.java
@@ -17,101 +17,130 @@
*/
package org.apache.drill.exec.store.easy.text.reader;
-import java.io.IOException;
-
import com.univocity.parsers.common.ParsingContext;
+import com.univocity.parsers.common.record.Record;
+import com.univocity.parsers.common.record.RecordMetaData;
+
+import java.util.Collections;
+import java.util.Map;
class TextParsingContext implements ParsingContext {
private final TextInput input;
private final TextOutput output;
- protected boolean stopped;
- private int[] extractedIndexes;
+ private boolean stopped;
- public TextParsingContext(TextInput input, TextOutput output) {
+ TextParsingContext(TextInput input, TextOutput output) {
this.input = input;
this.output = output;
}
- /**
- * {@inheritDoc}
- */
+ public boolean isFull() {
+ return output.isFull();
+ }
+
+ public void stop(boolean stopped) {
+ this.stopped = stopped;
+ }
+
@Override
public void stop() {
stopped = true;
}
- /**
- * {@inheritDoc}
- */
@Override
public boolean isStopped() {
return stopped;
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public int errorContentLength() {
+ return -1;
+ }
+
+ @Override
+ public Record toRecord(String[] row) {
+ return null;
+ }
+
+ @Override
+ public RecordMetaData recordMetaData() {
+ return null;
+ }
+
@Override
public long currentLine() {
return input.lineCount();
}
- /**
- * {@inheritDoc}
- */
@Override
public long currentChar() {
return input.charCount();
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public void skipLines(long lines) {
+ }
+
+ @Override
+ public String[] parsedHeaders() {
+ return new String[0];
+ }
+
@Override
public int currentColumn() {
return -1;
}
- /**
- * {@inheritDoc}
- */
@Override
public String[] headers() {
- return new String[]{};
+ return new String[0];
}
- /**
- * {@inheritDoc}
- */
+ @Override
+ public String[] selectedHeaders() {
+ return new String[0];
+ }
+
@Override
public int[] extractedFieldIndexes() {
- return extractedIndexes;
+ return new int[0];
}
- /**
- * {@inheritDoc}
- */
@Override
public long currentRecord() {
return output.getRecordCount();
}
- /**
- * {@inheritDoc}
- */
@Override
public String currentParsedContent() {
- try {
- return input.getStringSinceMarkForError();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return input.getStringSinceMarkForError();
}
@Override
- public void skipLines(int lines) {
+ public int currentParsedContentLength() {
+ return input.getStringSinceMarkForError().toCharArray().length;
+ }
+
+ @Override
+ public String fieldContentOnError() {
+ return null;
+ }
+
+ @Override
+ public Map<Long, String> comments() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public String lastComment() {
+ return null;
+ }
+
+ @Override
+ public char[] lineSeparator() {
+ return new char[0];
}
@Override
@@ -119,8 +148,14 @@
return false;
}
- public boolean isFull() {
- return output.isFull();
+ @Override
+ public int indexOf(String header) {
+ return -1;
+ }
+
+ @Override
+ public int indexOf(Enum<?> header) {
+ return -1;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
index a91a8ab..acc1cda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
@@ -33,7 +33,7 @@
private final byte delimiter;
private final byte comment;
- private final long maxCharsPerColumn = Character.MAX_VALUE;
+ private final long maxCharsPerColumn = TextFormatPlugin.MAX_CHARS_PER_COLUMN;
private final byte normalizedNewLine = b('\n');
private final byte[] newLineDelimiter;
private final boolean ignoreLeadingWhitespaces = false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
index 0ce856e..96d62f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
@@ -17,13 +17,14 @@
*/
package org.apache.drill.exec.store.easy.text.reader;
-import java.io.IOException;
-
-import org.apache.drill.common.exceptions.UserException;
-
import com.univocity.parsers.common.TextParsingException;
-
import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
/*******************************************************************************
* Portions Copyright 2014 uniVocity Software Pty Ltd
@@ -34,11 +35,10 @@
* DrillBuf support.
*/
public final class TextReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
- private static final byte NULL_BYTE = (byte) '\0';
+ private static final Logger logger = LoggerFactory.getLogger(TextReader.class);
- public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
+ private static final byte NULL_BYTE = (byte) TextFormatPlugin.NULL_CHAR;
private final TextParsingContext context;
@@ -102,7 +102,7 @@
* any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed
* we have an additional check to make sure its not negative
*/
- static final boolean isWhite(byte b){
+ static boolean isWhite(byte b){
return b <= ' ' && b > -1;
}
@@ -252,12 +252,15 @@
prev = NULL_BYTE;
} else {
prev = ch;
+ // read next char taking into account it can be new line indicator
+ // to ensure that custom new line will be replaced with normalized one
+ ch = input.nextChar();
+ continue;
}
} else {
if (prev == quoteEscape) {
output.append(prev);
- }
- else if (prev == quote) { // unescaped quote detected
+ } else if (prev == quote) { // unescaped quote detected
if (parseUnescapedQuotes) {
output.append(prev);
break;
@@ -326,7 +329,7 @@
* @return true if more rows can be read, false if not
* @throws IOException for input file read errors
*/
- private final boolean parseField() throws IOException {
+ private boolean parseField() throws IOException {
output.startField(fieldIndex++);
@@ -375,7 +378,7 @@
* @throws IOException for input file read errors
*/
public final void start() throws IOException {
- context.stopped = false;
+ context.stop(false);
input.start();
}
@@ -386,7 +389,7 @@
*/
public final boolean parseNext() throws IOException {
try {
- while (! context.stopped) {
+ while (!context.isStopped()) {
ch = input.nextChar();
if (ch == comment) {
input.skipLines(1);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
index a114fd4..fa1d62f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/writer/TextRecordWriter.java
@@ -17,44 +17,49 @@
*/
package org.apache.drill.exec.store.easy.text.writer;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Map;
-
+import com.univocity.parsers.csv.CsvFormat;
+import com.univocity.parsers.csv.CsvWriter;
+import com.univocity.parsers.csv.CsvWriterSettings;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
public class TextRecordWriter extends StringOutputRecordWriter {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextRecordWriter.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(TextRecordWriter.class);
private final StorageStrategy storageStrategy;
+ private final Configuration fsConf;
+ private FileSystem fs;
private Path cleanUpLocation;
-
private String location;
private String prefix;
-
- private String fieldDelimiter;
private String extension;
+ // indicates the index of the file created by this writer: 0_0_{fileNumberIndex}.csv (e.g. 0_0_0.csv)
+ private int fileNumberIndex;
- private int index;
- private PrintStream stream = null;
- private FileSystem fs = null;
+ private CsvWriterSettings writerSettings;
+ private CsvWriter writer;
- // Record write status
- private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
- private StringBuilder currentRecord; // contains the current record separated by field delimiter
-
- private Configuration fsConf;
+ // record write status: true from the time startRecord() is called until endRecord() is called
+ private boolean fRecordStarted = false;
public TextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) {
super(allocator);
@@ -66,24 +71,38 @@
public void init(Map<String, String> writerOptions) throws IOException {
this.location = writerOptions.get("location");
this.prefix = writerOptions.get("prefix");
- this.fieldDelimiter = writerOptions.get("separator");
- this.extension = writerOptions.get("extension");
this.fs = FileSystem.get(fsConf);
+ String extension = writerOptions.get("extension");
+ this.extension = extension == null ? "" : "." + extension;
+ this.fileNumberIndex = 0;
- this.currentRecord = new StringBuilder();
- this.index = 0;
+ CsvWriterSettings writerSettings = new CsvWriterSettings();
+ writerSettings.setMaxColumns(TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS);
+ writerSettings.setMaxCharsPerColumn(TextFormatPlugin.MAX_CHARS_PER_COLUMN);
+ writerSettings.setHeaderWritingEnabled(Boolean.parseBoolean(writerOptions.get("addHeader")));
+ writerSettings.setQuoteAllFields(Boolean.parseBoolean(writerOptions.get("forceQuotes")));
+ CsvFormat format = writerSettings.getFormat();
+ format.setLineSeparator(writerOptions.get("lineSeparator"));
+ format.setDelimiter(writerOptions.get("fieldDelimiter"));
+ format.setQuote(writerOptions.get("quote").charAt(0));
+ format.setQuoteEscape(writerOptions.get("escape").charAt(0));
+ format.setCharToEscapeQuoteEscaping(TextFormatPlugin.NULL_CHAR); // do not escape "escape" char
+
+ this.writerSettings = writerSettings;
+
+ logger.trace("Text writer settings: {}", this.writerSettings);
}
@Override
- public void startNewSchema(List<String> columnNames) throws IOException {
+ public void startNewSchema(BatchSchema schema) throws IOException {
// wrap up the current file
cleanup();
// open a new file for writing data with new schema
- Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+ Path fileName = new Path(location, String.format("%s_%s%s", prefix, fileNumberIndex, extension));
try {
- // drill text writer does not support partitions, so only one file can be created
+ // Drill text writer does not support partitions, so only one file can be created
// and thus only one location should be deleted in case of abort
// to ensure that our writer was the first to create output file,
// we create empty output file first and fail if file exists
@@ -93,21 +112,26 @@
// we need to re-apply file permission
DataOutputStream fos = fs.create(fileName);
storageStrategy.applyToFile(fs, fileName);
+ logger.debug("Created file: {}.", fileName);
- stream = new PrintStream(fos);
- logger.debug("Created file: {}", fileName);
- } catch (IOException ex) {
- logger.error("Unable to create file: " + fileName, ex);
- throw ex;
+ // increment file number index
+ fileNumberIndex++;
+
+ this.writer = new CsvWriter(fos, writerSettings);
+ } catch (IOException e) {
+ throw new IOException(String.format("Unable to create file: %s.", fileName), e);
}
- index++;
- stream.println(Joiner.on(fieldDelimiter).join(columnNames));
+ if (writerSettings.isHeaderWritingEnabled()) {
+ writer.writeHeaders(StreamSupport.stream(schema.spliterator(), false)
+ .map(MaterializedField::getName)
+ .collect(Collectors.toList()));
+ }
}
@Override
- public void addField(int fieldId, String value) throws IOException {
- currentRecord.append(value + fieldDelimiter);
+ public void addField(int fieldId, String value) {
+ writer.addValue(value);
}
@Override
@@ -115,7 +139,6 @@
if (fRecordStarted) {
throw new IOException("Previous record is not written completely");
}
-
fRecordStarted = true;
}
@@ -125,13 +148,7 @@
throw new IOException("No record is in writing");
}
- // remove the extra delimiter at the end
- currentRecord.deleteCharAt(currentRecord.length()-fieldDelimiter.length());
-
- stream.println(currentRecord.toString());
-
- // reset current record status
- currentRecord.delete(0, currentRecord.length());
+ writer.writeValuesToRow();
fRecordStarted = false;
}
@@ -164,11 +181,16 @@
@Override
public void cleanup() throws IOException {
- super.cleanup();
- if (stream != null) {
- stream.close();
- stream = null;
- logger.debug("closing file");
+ fRecordStarted = false;
+ if (writer != null) {
+ try {
+ writer.close();
+ writer = null;
+ logger.debug("Closed text writer for file: {}.", cleanUpLocation);
+ } catch (IllegalStateException e) {
+ throw new IOException(String.format("Unable to close text writer for file %s: %s",
+ cleanUpLocation, e.getMessage()), e);
+ }
}
}
@@ -180,5 +202,4 @@
cleanUpLocation.toUri().getPath(), fs.getUri());
}
}
-
}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7dfafd3..b276389 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -660,6 +660,8 @@
store.table.use_schema_file: false,
store.partition.hash_distribute: false,
store.text.estimated_row_size_bytes: 100.0,
+ store.text.writer.add_header: true,
+ store.text.writer.force_quotes: false,
store.kafka.all_text_mode: false,
store.kafka.read_numbers_as_double: false,
store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
new file mode 100644
index 0000000..6ad38bc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.writer;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import static org.junit.Assert.assertEquals;
+
+public class TestTextWriter extends ClusterTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private static final List<String> tablesToDrop = new ArrayList<>();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ startCluster(builder);
+
+ Map<String, FormatPluginConfig> formats = new HashMap<>();
+
+ TextFormatConfig csv = new TextFormatConfig();
+ csv.extensions = Collections.singletonList("csv");
+ csv.lineDelimiter = "\n";
+ csv.fieldDelimiter = ',';
+ csv.quote = '"';
+ csv.escape = '"';
+ csv.extractHeader = true;
+ formats.put("csv", csv);
+
+ TextFormatConfig tsv = new TextFormatConfig();
+ tsv.extensions = Collections.singletonList("tsv");
+ tsv.lineDelimiter = "\n";
+ tsv.fieldDelimiter = '\t';
+ tsv.quote = '"';
+ tsv.escape = '"';
+ tsv.extractHeader = true;
+ formats.put("tsv", tsv);
+
+ TextFormatConfig custom = new TextFormatConfig();
+ custom.extensions = Collections.singletonList("custom");
+ custom.lineDelimiter = "!";
+ custom.fieldDelimiter = '_';
+ custom.quote = '$';
+ custom.escape = '^';
+ custom.extractHeader = true;
+ formats.put("custom", custom);
+
+ cluster.defineFormats("dfs", formats);
+ }
+
+ @After
+ public void cleanUp() {
+ client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+ client.resetSession(ExecConstants.TEXT_WRITER_ADD_HEADER);
+ client.resetSession(ExecConstants.TEXT_WRITER_FORCE_QUOTES);
+
+ tablesToDrop.forEach(
+ table -> client.runSqlSilently(String.format("drop table if exists %s", table)));
+ }
+
+ @Test
+ public void testWithHeaders() throws Exception {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+
+ String tableName = "csv_with_headers_table";
+ String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+ tablesToDrop.add(fullTableName);
+
+ queryBuilder().sql("create table %s as select 'a' as col1, 'b' as col2 from (values(1))", fullTableName).run();
+
+ Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+ List<String> lines = Files.readAllLines(path);
+ assertEquals(Arrays.asList("col1,col2", "a,b"), lines);
+ }
+
+ @Test
+ public void testWithoutHeaders() throws Exception {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+ client.alterSession(ExecConstants.TEXT_WRITER_ADD_HEADER, false);
+
+ String tableName = "csv_without_headers_table";
+ String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+ tablesToDrop.add(fullTableName);
+
+ queryBuilder().sql("create table %s as select 'a' as col1, 'b' as col2 from (values(1))", fullTableName).run();
+
+ Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+ List<String> lines = Files.readAllLines(path);
+ assertEquals(Collections.singletonList("a,b"), lines);
+ }
+
+ @Test
+ public void testNoQuotes() throws Exception {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+
+ String tableName = "csv_no_quotes_table";
+ String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+ tablesToDrop.add(fullTableName);
+
+ queryBuilder().sql("create table %s as " +
+ "select 1 as id, 'Bob' as name, 'A B C' as desc from (values(1))", fullTableName).run();
+
+ testBuilder()
+ .sqlQuery("select * from %s", fullTableName)
+ .unOrdered()
+ .baselineColumns("id", "name", "desc")
+ .baselineValues("1", "Bob", "A B C")
+ .go();
+
+ Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+ List<String> lines = Files.readAllLines(path);
+ assertEquals(Arrays.asList("id,name,desc", "1,Bob,A B C"), lines);
+ }
+
+ @Test
+ public void testQuotesOnDemand() throws Exception {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+
+ String tableName = "csv_quotes_on_demand_table";
+ String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+ tablesToDrop.add(fullTableName);
+
+ queryBuilder().sql("create table %s as " +
+ "select 1 as id, 'Bob\nSmith' as name, 'A,B,C' as desc from (values(1))", fullTableName).run();
+
+ testBuilder()
+ .sqlQuery("select * from %s", fullTableName)
+ .unOrdered()
+ .baselineColumns("id", "name", "desc")
+ .baselineValues("1", "Bob\nSmith", "A,B,C")
+ .go();
+
+ Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+ List<String> lines = Files.readAllLines(path);
+ assertEquals(Arrays.asList("id,name,desc", "1,\"Bob", "Smith\",\"A,B,C\""), lines);
+ }
+
+ @Test
+ public void testForceQuotes() throws Exception {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+ client.alterSession(ExecConstants.TEXT_WRITER_FORCE_QUOTES, true);
+
+ String tableName = "csv_force_quotes_table";
+ String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+ tablesToDrop.add(fullTableName);
+
+ queryBuilder().sql("create table %s as " +
+ "select 1 as id, 'Bob' as name, 'A,B,C' as desc from (values(1))", fullTableName).run();
+
+ testBuilder()
+ .sqlQuery("select * from %s", fullTableName)
+ .unOrdered()
+ .baselineColumns("id", "name", "desc")
+ .baselineValues("1", "Bob", "A,B,C")
+ .go();
+
+ Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.csv");
+ List<String> lines = Files.readAllLines(path);
+ assertEquals(Arrays.asList("\"id\",\"name\",\"desc\"", "\"1\",\"Bob\",\"A,B,C\""), lines);
+ }
+
+ @Test
+ public void testTsv() throws Exception {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "tsv");
+
+ String tableName = "tsv_table";
+ String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+ tablesToDrop.add(fullTableName);
+
+ queryBuilder().sql("create table %s as " +
+ "select 1 as id, 'Bob\tSmith' as name, 'A\"B\"C' as desc from (values(1))", fullTableName).run();
+
+ testBuilder()
+ .sqlQuery("select * from %s", fullTableName)
+ .unOrdered()
+ .baselineColumns("id", "name", "desc")
+ .baselineValues("1", "Bob\tSmith", "A\"B\"C")
+ .go();
+
+ Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.tsv");
+ List<String> lines = Files.readAllLines(path);
+ assertEquals(Arrays.asList("id\tname\tdesc", "1\t\"Bob\tSmith\"\tA\"B\"C"), lines);
+ }
+
+ @Test
+ public void testCustomFormat() throws Exception {
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "custom");
+
+ String tableName = "custom_format_table";
+ String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+ tablesToDrop.add(fullTableName);
+
+ queryBuilder().sql("create table %s as " +
+ "select 1 as `id_`, 'Bob$Smith' as name, 'A^B!C' as desc from (values(1))", fullTableName).run();
+
+ testBuilder()
+ .sqlQuery("select * from %s", fullTableName)
+ .unOrdered()
+ .baselineColumns("id_", "name", "desc")
+ .baselineValues("1", "Bob$Smith", "A^B!C")
+ .go();
+
+ Path path = Paths.get(dirTestWatcher.getDfsTestTmpDir().getAbsolutePath(), tableName, "0_0_0.custom");
+ List<String> lines = Files.readAllLines(path);
+ assertEquals(Collections.singletonList("$id_$_name_desc!1_Bob$Smith_$A^B!C$!"), lines);
+ }
+
+ @Test
+ public void testLineDelimiterLengthLimit() throws Exception {
+ TextFormatConfig incorrect = new TextFormatConfig();
+ incorrect.lineDelimiter = "end";
+ cluster.defineFormat("dfs", "incorrect", incorrect);
+
+ client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "incorrect");
+
+ String tableName = "incorrect_line_delimiter_table";
+ String fullTableName = String.format("dfs.tmp.`%s`", tableName);
+ tablesToDrop.add(fullTableName);
+
+ // univocity-parsers allows only 1- or 2-character line separators
+ thrown.expect(UserException.class);
+ thrown.expectMessage("Invalid line separator");
+
+ queryBuilder().sql("create table %s as select 1 as id from (values(1))", fullTableName).run();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
index 67ca4c1..a22d11c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
@@ -17,24 +17,25 @@
*/
package org.apache.drill.exec.store.easy.text.compliant;
-import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import org.apache.drill.TestSelectWithOption;
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
-import org.apache.drill.TestSelectWithOption;
-import org.apache.drill.categories.RowSetTests;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
-import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertTrue;
/**
* Test table properties with the compliant text reader. The
@@ -49,7 +50,7 @@
* using that schema rather than using the "columns" array
* column.
*
- * @see {@link TestSelectWithOption} for similar tests using table
+ * @see TestSelectWithOption for similar tests using table
* properties within SQL
*/
@@ -89,9 +90,9 @@
.build();
}
- public static String SELECT_ALL = "SELECT * FROM %s";
+ private static final String SELECT_ALL = "SELECT * FROM %s";
- private static String noHeaders[] = {
+ private static final String[] noHeaders = {
"10,fred",
"20,wilma"
};
@@ -122,7 +123,7 @@
}
}
- private static String extraCols[] = {
+ private static final String[] extraCols = {
"10,fred,23.45",
"20,wilma,1234.56,vip"
};
@@ -148,7 +149,7 @@
}
}
- private static String skipHeaders[] = {
+ private static final String[] skipHeaders = {
"ignore,me",
"10,fred",
"20,wilma"
@@ -180,7 +181,7 @@
}
}
- private static String withHeaders[] = {
+ private static final String[] withHeaders = {
"id, name",
"10,fred",
"20,wilma"
@@ -220,7 +221,7 @@
}
}
- private static String barDelim[] = {
+ private static final String[] barDelim = {
"10|fred",
"20|wilma"
};
@@ -241,7 +242,7 @@
}
}
- private static String customCommentChar[] = {
+ private static final String[] customCommentChar = {
"@Comment",
"#10,fred",
"#20,wilma"
@@ -273,7 +274,7 @@
}
}
- private static String noCommentChar[] = {
+ private static final String[] noCommentChar = {
"#10,fred",
"#20,wilma"
};
@@ -301,7 +302,7 @@
}
}
- private static String quotesData[] = {
+ private static final String[] quotesData = {
"1,@foo@",
"2,@foo~@bar@",
@@ -342,7 +343,7 @@
}
}
- private static String doubleQuotesData[] = {
+ private static final String[] doubleQuotesData = {
"1,@foo@",
"2,@foo@@bar@",
};
@@ -380,7 +381,38 @@
}
}
- private static String specialCharsData[] = {
+ private static final String[] quotesAndCustomNewLineData = {
+ "1,@foo@!2,@foo@@bar@!",
+ };
+
+ @Test
+ public void testQuotesAndCustomNewLine() throws Exception {
+ try {
+ enableSchemaSupport();
+ String tablePath = buildTable("quotesAndCustomNewLine", quotesAndCustomNewLineData);
+ String sql = "create schema () " +
+ "for table " + tablePath + " PROPERTIES ('" +
+ TextFormatPlugin.HAS_HEADERS_PROP + "'='false', '" +
+ TextFormatPlugin.SKIP_FIRST_LINE_PROP + "'='false', '" +
+ TextFormatPlugin.LINE_DELIM_PROP + "'='!', '" +
+ TextFormatPlugin.QUOTE_PROP + "'='@', '" +
+ TextFormatPlugin.QUOTE_ESCAPE_PROP + "'='@')";
+ run(sql);
+ RowSet actual = client.queryBuilder().sql(SELECT_ALL, tablePath).rowSet();
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addSingleCol(strArray("1", "foo"))
+ .addSingleCol(strArray("2", "foo@bar"))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetSchemaSupport();
+ }
+ }
+
+ private static final String[] specialCharsData = {
"10\u0001'fred'",
"20\u0001'wilma'"
};
@@ -428,7 +460,7 @@
enableSchemaSupport();
String tableName = "newline";
File rootDir = new File(testDir, tableName);
- rootDir.mkdir();
+ assertTrue(rootDir.mkdir());
try(PrintWriter out = new PrintWriter(new FileWriter(new File(rootDir, ROOT_FILE)))) {
out.print("1,fred\r2,wilma\r");
}
@@ -453,7 +485,7 @@
}
}
- private static String messyQuotesData[] = {
+ private static final String[] messyQuotesData = {
"first\"field\"here,another \"field",
"end quote\",another\"",
"many\"\"\"\",more\"\"",
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index a26cf2f..91584dd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -17,6 +17,35 @@
*/
package org.apache.drill.test;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
+import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.apache.drill.test.DrillTestWrapper.TestServices;
+
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -33,35 +62,6 @@
import java.util.Optional;
import java.util.Properties;
-import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.test.DrillTestWrapper.TestServices;
-import org.apache.drill.common.config.DrillProperties;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ZookeeperHelper;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.RootAllocatorFactory;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.StoragePluginRegistryImpl;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.dfs.WorkspaceConfig;
-import org.apache.drill.exec.store.mock.MockStorageEngine;
-import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
-import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
-import org.apache.drill.exec.util.StoragePluginTestUtils;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
-
import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
import static org.apache.drill.exec.util.StoragePluginTestUtils.ROOT_SCHEMA;
import static org.apache.drill.exec.util.StoragePluginTestUtils.TMP_SCHEMA;
@@ -416,6 +416,10 @@
}
}
zkHelper = null;
+
+ if (ex != null) {
+ throw ex;
+ }
}
/**
@@ -487,7 +491,7 @@
}
}
- public static void defineWorkspace(Drillbit drillbit, String pluginName,
+ private void defineWorkspace(Drillbit drillbit, String pluginName,
String schemaName, String path, String defaultFormat, FormatPluginConfig format)
throws ExecutionSetupException {
final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
@@ -500,19 +504,55 @@
.ifPresent(newWorkspaces::putAll);
newWorkspaces.put(schemaName, newTmpWSConfig);
- Map<String, FormatPluginConfig> newFormats = new HashMap<>(pluginConfig.getFormats());
+ Map<String, FormatPluginConfig> newFormats = new HashMap<>();
Optional.ofNullable(pluginConfig.getFormats())
.ifPresent(newFormats::putAll);
Optional.ofNullable(format)
.ifPresent(f -> newFormats.put(defaultFormat, f));
- FileSystemConfig newPluginConfig = new FileSystemConfig(
- pluginConfig.getConnection(),
- pluginConfig.getConfig(),
- newWorkspaces,
- newFormats);
- newPluginConfig.setEnabled(pluginConfig.isEnabled());
+ updatePlugin(pluginRegistry, pluginName, pluginConfig, newWorkspaces, newFormats);
+ }
+ public void defineFormat(String pluginName, String name, FormatPluginConfig config) {
+ defineFormats(pluginName, ImmutableMap.of(name, config));
+ }
+
+ public void defineFormats(String pluginName, Map<String, FormatPluginConfig> formats) {
+ for (Drillbit bit : drillbits()) {
+ try {
+ defineFormats(bit, pluginName, formats);
+ } catch (ExecutionSetupException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ private void defineFormats(Drillbit drillbit,
+ String pluginName,
+ Map<String, FormatPluginConfig> formats) throws ExecutionSetupException {
+ StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
+ FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(pluginName);
+ FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
+
+ Map<String, FormatPluginConfig> newFormats = new HashMap<>();
+ Optional.ofNullable(pluginConfig.getFormats())
+ .ifPresent(newFormats::putAll);
+ newFormats.putAll(formats);
+
+ updatePlugin(pluginRegistry, pluginName, pluginConfig, null, newFormats);
+ }
+
+ private void updatePlugin(StoragePluginRegistry pluginRegistry,
+ String pluginName,
+ FileSystemConfig pluginConfig,
+ Map<String, WorkspaceConfig> newWorkspaces,
+ Map<String, FormatPluginConfig> newFormats) throws ExecutionSetupException {
+ FileSystemConfig newPluginConfig = new FileSystemConfig(
+ pluginConfig.getConnection(),
+ pluginConfig.getConfig(),
+ newWorkspaces == null ? pluginConfig.getWorkspaces() : newWorkspaces,
+ newFormats == null ? pluginConfig.getFormats() : newFormats);
+ newPluginConfig.setEnabled(pluginConfig.isEnabled());
pluginRegistry.createOrUpdate(pluginName, newPluginConfig, true);
}
@@ -580,8 +620,8 @@
* Return a cluster fixture built with standard options. This is a short-cut
* for simple tests that don't need special setup.
*
+ * @param dirTestWatcher directory test watcher
* @return a cluster fixture with standard options
- * @throws Exception if something goes wrong
*/
public static ClusterFixture standardCluster(BaseDirTestWatcher dirTestWatcher) {
return builder(dirTestWatcher).build();