blob: 76b30357277c8f624272c3bb9cd3b9c2ccfa1630 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeConvertUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.TimeZone;
/**
* Base class of CsvOutputFormat, it serializes records to text. The output is
* structured by record delimiters and field delimiters as common in CSV files.
* Record delimiter separate records from each other ('\n' is common). Field
* delimiters separate fields within a record.
*/
@PublicEvolving
public abstract class AbstractCsvOutputFormat<T> extends FileOutputFormat<T> {
private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(AbstractCsvOutputFormat.class);
// --------------------------------------------------------------------------------------------
public static final String DEFAULT_LINE_DELIMITER = CsvInputFormat.DEFAULT_LINE_DELIMITER;
public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
// --------------------------------------------------------------------------------------------
private transient Writer wrt;
private String recordDelimiter;
private String fieldDelimiter;
private String quoteCharacter;
private String doubleQuotes;
private String charsetName;
private boolean allowNullValues;
private boolean quoteStrings = false;
private TimeZone timezone = TimeZone.getTimeZone("UTC");
private boolean outputFieldName = false;
private String[] fieldNames = null;
// --------------------------------------------------------------------------------------------
// Constructors and getters/setters for the configurable parameters
// --------------------------------------------------------------------------------------------
/**
* Creates an instance of AbstractCsvOutputFormat. Lines are separated by the newline character
* '\n', fields are separated by ',', no escape character.
*
* @param outputPath The path where the CSV file is written.
*/
public AbstractCsvOutputFormat(Path outputPath) {
this(outputPath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER);
}
/**
* Creates an instance of AbstractCsvOutputFormat. Lines are separated by the newline character
* '\n', fields by the given field delimiter, no escape character.
*
* @param outputPath The path where the CSV file is written.
* @param fieldDelimiter The delimiter that is used to separate fields in a tuple.
*/
public AbstractCsvOutputFormat(Path outputPath, String fieldDelimiter) {
this(outputPath, DEFAULT_LINE_DELIMITER, fieldDelimiter);
}
/**
* Creates an instance of AbstractCsvOutputFormat. Lines are separated by the given record
* delimiter, fields by the given field delimiter, no escape character.
*
* @param outputPath The path where the CSV file is written.
* @param recordDelimiter The delimiter that is used to separate the record.
* @param fieldDelimiter The delimiter that is used to separate fields in a record.
*/
public AbstractCsvOutputFormat(Path outputPath, String recordDelimiter, String fieldDelimiter) {
this(outputPath, recordDelimiter, fieldDelimiter, null);
}
/**
* Creates an instance of AbstractCsvOutputFormat.
*
* @param outputPath The path where the CSV file is written.
* @param recordDelimiter The delimiter that is used to separate the record.
* @param fieldDelimiter The delimiter that is used to separate fields in a record.
* @param quoteCharacter The quote character that is used to escape other characters.
*/
public AbstractCsvOutputFormat(
Path outputPath, String recordDelimiter, String fieldDelimiter, String quoteCharacter) {
super(outputPath);
setRecordDelimiter(recordDelimiter);
setFieldDelimiter(fieldDelimiter);
setQuoteCharacter(quoteCharacter);
this.allowNullValues = false;
}
/**
* Creates an instance of AbstractCsvOutputFormat.
*
* @param outputPath The path where the CSV file is written.
* @param recordDelimiter The delimiter that is used to separate the record.
* @param fieldDelimiter The delimiter that is used to separate fields in a record.
* @param quoteCharacter The quote character that is used to escape other characters.
*/
public AbstractCsvOutputFormat(
Path outputPath, String recordDelimiter, String fieldDelimiter, char quoteCharacter) {
super(outputPath);
setRecordDelimiter(recordDelimiter);
setFieldDelimiter(fieldDelimiter);
setQuoteCharacter(quoteCharacter);
this.allowNullValues = false;
}
public void setRecordDelimiter(String recordDelimiter) {
Preconditions.checkArgument(recordDelimiter != null, "RecordDelmiter shall not be null.");
this.recordDelimiter = recordDelimiter;
}
public void setFieldDelimiter(String fieldDelimiter) {
Preconditions.checkArgument(fieldDelimiter != null, "FieldDelimiter shall not be null.");
this.fieldDelimiter = fieldDelimiter;
}
public void setQuoteCharacter(String character) {
Preconditions.checkArgument(
character == null || character.length() == 1, "character should be a single character string");
if (character == null) {
this.quoteCharacter = null;
this.doubleQuotes = null;
} else {
this.quoteCharacter = character;
this.doubleQuotes = this.quoteCharacter + this.quoteCharacter;
}
}
public void setQuoteCharacter(char character) {
this.quoteCharacter = Character.toString(character);
this.doubleQuotes = this.quoteCharacter + this.quoteCharacter;
}
/**
* Configures the format to either allow null values (writing an empty field),
* or to throw an exception when encountering a null field.
*
* <p>by default, null values are disallowed.
*
* @param allowNulls Flag to indicate whether the output format should accept null values.
*/
public void setAllowNullValues(boolean allowNulls) {
this.allowNullValues = allowNulls;
}
/**
* Sets the charset with which the CSV strings are written to the file.
* If not specified, the output format uses the systems default character encoding.
*
* @param charsetName The name of charset to use for encoding the output.
*/
public void setCharsetName(String charsetName) {
this.charsetName = charsetName;
}
/**
* Configures whether the output format should quote string values. String values are fields
* of type {@link String} and {@link StringValue}, as well as
* all subclasses of the latter.
*
* <p>By default, strings are not quoted.
*
* @param quoteStrings Flag indicating whether string fields should be quoted.
*/
public void setQuoteStrings(boolean quoteStrings) {
this.quoteStrings = quoteStrings;
}
public TimeZone getTimezone() {
return timezone;
}
public void setTimezone(TimeZone timezone) {
this.timezone = timezone;
}
public void setOutputFieldName(boolean outputFieldName) {
this.outputFieldName = outputFieldName;
}
public void setFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
}
// --------------------------------------------------------------------------------------------
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
this.wrt = this.charsetName == null ? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) :
new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096), this.charsetName);
}
@Override
public void close() throws IOException {
if (wrt != null) {
this.wrt.flush();
this.wrt.close();
// Make sure we set the Writer to null after close. If the operators are
// chained together and closes multiple times, it will throw an IOException.
this.wrt = null;
}
super.close();
}
/**
* Get the specified field of the record.
*
* @param record The processing record.
* @param n The field position.
* @return The specified field of the record.
*/
protected abstract Object getSpecificField(T record, int n);
/**
* Get fields num of the record.
*
* @param record The processing record.
* @return Fields num of the record.
*/
protected abstract int getFieldsNum(T record);
@Override
public void writeRecord(T record) throws IOException {
if (outputFieldName) {
outputFieldNames();
}
outputRecord(record);
}
private void outputFieldNames() throws IOException {
outputFieldName = false;
if (null == fieldNames || fieldNames.length == 0) {
throw new RuntimeException("Field names are empty");
}
for (int i = 0; i < fieldNames.length; i++) {
if (i != 0) {
this.wrt.write(this.fieldDelimiter);
}
this.wrt.write(getEscapedString(fieldNames[i]));
}
// add the record delimiter
this.wrt.write(this.recordDelimiter);
}
private void outputRecord(T record) throws IOException {
int numFields = getFieldsNum(record);
for (int i = 0; i < numFields; i++) {
Object v = getSpecificField(record, i);
if (v != null) {
if (i != 0) {
this.wrt.write(this.fieldDelimiter);
}
if (quoteStrings && quoteCharacter != null &&
(v instanceof String || v instanceof StringValue)) {
String value = v.toString();
this.wrt.write(getQuotedEscapedString(value));
} else if (v instanceof Date) {
String date = TimeConvertUtils.unixDateToString(TimeConvertUtils.toInt((Date) v));
this.wrt.write(date);
} else if (v instanceof Time) {
String time = TimeConvertUtils.unixTimeToString(TimeConvertUtils.toInt((Time) v));
this.wrt.write(time);
} else if (v instanceof Timestamp) {
long val = ((Timestamp) v).getTime();
long offset = timezone.getOffset(val);
String timestamp =
TimeConvertUtils.unixTimestampToString(val + offset, 3);
this.wrt.write(timestamp);
} else if (v instanceof BigDecimal) {
String value = ((BigDecimal) v).toPlainString();
this.wrt.write(value);
} else if (v instanceof byte[]) {
String value = Arrays.toString((byte[]) v);
this.wrt.write(getEscapedString(value));
} else {
String value = v.toString();
this.wrt.write(getEscapedString(value));
}
} else {
if (this.allowNullValues) {
if (i != 0) {
this.wrt.write(this.fieldDelimiter);
}
} else {
throw new RuntimeException("Cannot write tuple with <null> value at position: " + i);
}
}
}
// add the record delimiter
this.wrt.write(this.recordDelimiter);
}
private String getEscapedString(String value) {
if (quoteCharacter == null) {
return value;
} else {
boolean quote = value.contains(quoteCharacter);
boolean hasDelim = value.contains(fieldDelimiter) || value.contains(recordDelimiter);
if (quote || hasDelim) {
return getQuotedEscapedString(value);
} else {
return value;
}
}
}
private String getQuotedEscapedString(String value) {
Preconditions.checkArgument(quoteCharacter != null, "Quote character can't be null");
value = value.replaceAll(quoteCharacter, doubleQuotes);
return quoteCharacter + value + quoteCharacter;
}
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
return this.getClass().getSimpleName() + " (path: " + this.getOutputFilePath() + ", delimiter: " + this.fieldDelimiter + ")";
}
}