blob: eb56d45ecf5d37b76fb04c51edcaf30c1a91c789 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import java.util
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableSchema, ValidationException}
import org.apache.flink.table.descriptors.CsvValidator._
import org.apache.flink.table.utils.TypeStringUtils
import scala.collection.mutable
import scala.collection.JavaConverters._
/**
* Format descriptor for comma-separated values (CSV).
*/
class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
private var fieldDelim: Option[String] = None
private var lineDelim: Option[String] = None
private val schema: mutable.LinkedHashMap[String, String] =
mutable.LinkedHashMap[String, String]()
private var quoteCharacter: Option[Character] = None
private var commentPrefix: Option[String] = None
private var isIgnoreFirstLine: Option[Boolean] = None
private var lenient: Option[Boolean] = None
/**
* Sets the field delimiter, "," by default.
*
* @param delim the field delimiter
*/
def fieldDelimiter(delim: String): Csv = {
this.fieldDelim = Some(delim)
this
}
/**
* Sets the line delimiter, "\n" by default.
*
* @param delim the line delimiter
*/
def lineDelimiter(delim: String): Csv = {
this.lineDelim = Some(delim)
this
}
/**
* Sets the format schema with field names and the types. Required.
* The table schema must not contain nested fields.
*
* This method overwrites existing fields added with [[field()]].
*
* @param schema the table schema
*/
def schema(schema: TableSchema): Csv = {
this.schema.clear()
schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
field(n, t)
}
this
}
/**
* Adds a format field with the field name and the type information. Required.
* This method can be called multiple times. The call order of this method defines
* also the order of the fields in the format.
*
* @param fieldName the field name
* @param fieldType the type information of the field
*/
def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
this
}
/**
* Adds a format field with the field name and the type string. Required.
* This method can be called multiple times. The call order of this method defines
* also the order of the fields in the format.
*
* @param fieldName the field name
* @param fieldType the type string of the field
*/
def field(fieldName: String, fieldType: String): Csv = {
if (schema.contains(fieldName)) {
throw new ValidationException(s"Duplicate field name $fieldName.")
}
schema += (fieldName -> fieldType)
this
}
/**
* Sets a quote character for String values, null by default.
*
* @param quote the quote character
*/
def quoteCharacter(quote: Character): Csv = {
this.quoteCharacter = Option(quote)
this
}
/**
* Sets a prefix to indicate comments, null by default.
*
* @param prefix the prefix to indicate comments
*/
def commentPrefix(prefix: String): Csv = {
this.commentPrefix = Option(prefix)
this
}
/**
* Ignore the first line. Not skip the first line by default.
*/
def ignoreFirstLine(): Csv = {
this.isIgnoreFirstLine = Some(true)
this
}
/**
* Skip records with parse error instead to fail. Throw an exception by default.
*/
def ignoreParseErrors(): Csv = {
this.lenient = Some(true)
this
}
override protected def toFormatProperties: util.Map[String, String] = {
val properties = new DescriptorProperties()
fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
val subKeys = util.Arrays.asList(
DescriptorProperties.TABLE_SCHEMA_NAME,
DescriptorProperties.TABLE_SCHEMA_TYPE)
val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava
properties.putIndexedFixedProperties(
FORMAT_FIELDS,
subKeys,
subValues)
quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))
properties.asMap()
}
}
/**
* Format descriptor for comma-separated values (CSV).
*/
object Csv {
/**
* Format descriptor for comma-separated values (CSV).
*
* @deprecated Use `new Csv()`.
*/
@deprecated
def apply(): Csv = new Csv()
}