/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories.csv;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.RichTableSchema;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.factories.BatchCompatibleTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.BatchCompatibleStreamTableSink;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.csv.CsvTableSink;
import org.apache.flink.table.sinks.csv.RetractCsvTableSink;
import org.apache.flink.table.sinks.csv.UpsertCsvTableSink;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.csv.CsvTableSource;
import org.apache.flink.table.util.TableProperties;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.commons.lang3.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;

import scala.Option;
import scala.Some;

import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;

/**
 * A table factory for creating CSV table sources and sinks from descriptor properties.
 * It is matched against tables declared with the connector type "CSV" and supports
 * the property keys defined in {@link CsvOptions}.
 */
public class CsvTableFactory implements
StreamTableSourceFactory<BaseRow>,
BatchTableSourceFactory<BaseRow>,
StreamTableSinkFactory<Object>,
BatchCompatibleTableSinkFactory<Object> {
private static final Logger LOG = LoggerFactory.getLogger(CsvTableFactory.class);
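
/**
 * Creates a CSV sink for use as a batch-compatible (bounded) table sink.
 * Delegates to {@link #createCsvTableSink} with {@code isStreaming = false}.
 */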
@Override
public BatchCompatibleStreamTableSink<Object> createBatchCompatibleTableSink(Map<String, String> properties) {
return (BatchCompatibleStreamTableSink<Object>) createCsvTableSink(properties, false);
}
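
/**
 * Creates a CSV {@link BatchTableSource} from the given descriptor properties.
 */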
@Override
public BatchTableSource<BaseRow> createBatchTableSource(Map<String, String> properties) {
return createCsvTableSource(properties);
}
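
/**
 * Creates a CSV sink for streaming execution. Delegates to
 * {@link #createCsvTableSink} with {@code isStreaming = true}.
 */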
@Override
public StreamTableSink<Object> createStreamTableSink(Map<String, String> properties) {
return (StreamTableSink<Object>) createCsvTableSink(properties, true);
}
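
/**
 * Creates a CSV {@link StreamTableSource} from the given descriptor properties.
 */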
@Override
public StreamTableSource<BaseRow> createStreamTableSource(Map<String, String> properties) {
return createCsvTableSource(properties);
}
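
/**
 * Returns the context properties this factory is matched against: a "CSV" connector
 * type with property version 1.
 */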
@Override
public Map<String, String> requiredContext() {
// Ideally the connector type would be "filesystem" with a CSV format. "CSV" is used
// as the connector type here to distinguish this factory from other CSV factories;
// these factories should eventually be merged.
Map<String, String> context = new HashMap<>();
context.put(CONNECTOR_TYPE, "CSV");
context.put(CONNECTOR_PROPERTY_VERSION, "1");
return context;
}
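
/**
 * Returns the list of property keys this factory understands, as defined in
 * {@link CsvOptions#SUPPORTED_KEYS}.
 */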
@Override
public List<String> supportedProperties() {
return CsvOptions.SUPPORTED_KEYS;
}
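
/**
 * Builds a {@link CsvTableSource} from the descriptor properties: resolves the path,
 * delimiters, charset, time zone and schema, and forwards primary/unique keys and
 * indexes to the source builder.
 */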
private CsvTableSource createCsvTableSource(Map<String, String> props) {
TableProperties properties = new TableProperties();
properties.putProperties(props);
RichTableSchema schema = properties.readSchemaFromProperties(null);
final String path = properties.getString(CsvOptions.PATH);
if (StringUtils.isNullOrWhitespaceOnly(path)) {
throw new IllegalArgumentException(CsvOptions.PARAMS_HELP_MSG);
}
final String fieldDelim = getJavaEscapedDelim(properties.getString(CsvOptions.OPTIONAL_FIELD_DELIM));
final String lineDelim = getJavaEscapedDelim(properties.getString(CsvOptions.OPTIONAL_LINE_DELIM));
final String charset = properties.getString(CsvOptions.OPTIONAL_CHARSET);
final boolean emptyColumnAsNull = properties.getBoolean(CsvOptions.EMPTY_COLUMN_AS_NULL);
final String timeZone = properties.getString(CsvOptions.OPTIONAL_TIME_ZONE);
final TimeZone tz = (timeZone == null) ? TimeZone.getTimeZone("UTC") : TimeZone.getTimeZone(timeZone);
final boolean enumerateNestedFiles = properties.getBoolean(CsvOptions.OPTIONAL_ENUMERATE_NESTED_FILES);
CsvTableSource.Builder builder = CsvTableSource.builder()
.path(path)
.fieldDelimiter(fieldDelim)
.lineDelimiter(lineDelim)
.charset(charset)
.fields(schema.getColumnNames(), schema.getColumnTypes(), schema.getNullables())
.timezone(tz)
.setNestedFileEnumerate(enumerateNestedFiles);
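// Collect the primary key, declared unique keys and unique indexes as unique-key sets;
// non-unique indexes are kept separately and passed to the builder as index keys.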
Set<Set<String>> uniqueKeys = new HashSet<>();
Set<Set<String>> normalIndexes = new HashSet<>();
if (!schema.getPrimaryKeys().isEmpty()) {
uniqueKeys.add(new HashSet<>(schema.getPrimaryKeys()));
}
for (List<String> uniqueKey : schema.getUniqueKeys()) {
uniqueKeys.add(new HashSet<>(uniqueKey));
}
for (RichTableSchema.Index index : schema.getIndexes()) {
if (index.unique) {
uniqueKeys.add(new HashSet<>(index.keyList));
} else {
normalIndexes.add(new HashSet<>(index.keyList));
}
}
if (!uniqueKeys.isEmpty()) {
builder.uniqueKeys(uniqueKeys);
}
if (!normalIndexes.isEmpty()) {
builder.indexKeys(normalIndexes);
}
if (emptyColumnAsNull) {
builder.enableEmptyColumnAsNull();
}
final String quoteCharacter = getJavaEscapedDelim(properties.getString(CsvOptions.OPTIONAL_QUOTE_CHARACTER));
if (quoteCharacter != null) {
Preconditions.checkArgument(
quoteCharacter.length() == 1,
"quote character should be a single character, " + quoteCharacter + " found.");
builder.quoteCharacter(quoteCharacter.charAt(0));
}
final boolean firstLineAsHeader = properties.getBoolean(CsvOptions.OPTIONAL_FIRST_LINE_AS_HEADER);
if (firstLineAsHeader) {
builder.ignoreFirstLine();
}
final String commentsPrefix = properties.getString(CsvOptions.OPTIONAL_COMMENTS_PREFIX);
if (commentsPrefix != null) {
builder.commentPrefix(commentsPrefix);
}
return builder.build();
}
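
/**
 * Builds a CSV table sink from the descriptor properties. The update mode selects the
 * concrete implementation: "append" -> {@link CsvTableSink},
 * "retract" -> {@link RetractCsvTableSink}, "upsert" -> {@link UpsertCsvTableSink}.
 */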
private TableSink createCsvTableSink(Map<String, String> props, boolean isStreaming) {
TableProperties properties = new TableProperties();
properties.putProperties(props);
RichTableSchema schema = properties.readSchemaFromProperties(null);
final String path = properties.getString(CsvOptions.PATH);
if (StringUtils.isNullOrWhitespaceOnly(path)) {
throw new IllegalArgumentException(CsvOptions.PARAMS_HELP_MSG);
}
final boolean writeModeFlag = properties.getBoolean(CsvOptions.OPTIONAL_OVER_RIDE_MODE);
final FileSystem.WriteMode writeMode =
writeModeFlag ? FileSystem.WriteMode.OVERWRITE : FileSystem.WriteMode.NO_OVERWRITE;
final String fieldDelim = getJavaEscapedDelim(properties.getString(CsvOptions.OPTIONAL_FIELD_DELIM));
final String lineDelim = getJavaEscapedDelim(properties.getString(CsvOptions.OPTIONAL_LINE_DELIM));
final String quoteCharacter = getJavaEscapedDelim(properties.getString(CsvOptions.OPTIONAL_QUOTE_CHARACTER));
if (quoteCharacter != null) {
Preconditions.checkArgument(
quoteCharacter.length() == 1,
"quote character should be a single character, " + quoteCharacter + " found.");
}
final int parallelism = properties.getInteger(CsvOptions.PARALLELISM, -1);
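// A parallelism of -1 means the option was not set; Option.apply(null) yields None,
// so no explicit file count is passed to the sink.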
Option numFiles = parallelism == -1 ? Option.apply(null) : new Some(parallelism);
final String timeZone = properties.getString(CsvOptions.OPTIONAL_TIME_ZONE);
final TimeZone tz = (timeZone == null) ? TimeZone.getTimeZone("UTC") : TimeZone.getTimeZone(timeZone);
final String updateMode = properties.getString(CsvOptions.OPTIONAL_UPDATE_MODE);
switch (updateMode.toLowerCase()) {
case "append":
return
new CsvTableSink(
path,
Option.apply(fieldDelim),
Option.apply(lineDelim),
Option.apply(quoteCharacter),
numFiles,
Option.apply(writeMode),
Option.empty(),
Option.apply(tz)
).configure(schema.getColumnNames(), schema.getColumnTypes());
case "retract":
return
new RetractCsvTableSink(
path,
Option.apply(fieldDelim),
Option.apply(lineDelim),
Option.apply(quoteCharacter),
numFiles,
Option.apply(writeMode),
Option.empty(),
Option.apply(tz)
).configure(schema.getColumnNames(), schema.getColumnTypes());
case "upsert":
return new UpsertCsvTableSink(
path,
Option.apply(fieldDelim),
Option.apply(lineDelim),
Option.apply(quoteCharacter),
numFiles,
Option.apply(writeMode),
Option.empty(),
Option.apply(tz)
).configure(schema.getColumnNames(), schema.getColumnTypes());
default:
throw new RuntimeException("Unsupported updateMode: " + updateMode + " for CSV sink.");
}
}
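
/**
 * Unescapes Java escape sequences (e.g. "\\t", "\\n") in a delimiter or quote string
 * read from the properties, logging when the value changes.
 */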
public static String getJavaEscapedDelim(String fieldDelim) {
String unescapedFieldDelim = StringEscapeUtils.unescapeJava(fieldDelim);
if (fieldDelim != null && !fieldDelim.equals(unescapedFieldDelim)) {
LOG.info("Field delimiter unescaped from {} to {}.", fieldDelim, unescapedFieldDelim);
}
return unescapedFieldDelim;
}
}