blob: 5078727cab30e01069ac1048afaa16a5a439444d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import java.util
import java.util.Optional
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
import org.apache.flink.table.descriptors.RowtimeValidator._
import org.apache.flink.table.descriptors.SchemaValidator._
import org.apache.flink.table.sources.RowtimeAttributeDescriptor
import org.apache.flink.table.util.JavaScalaConversionUtil.{toJava, toScala}
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
* Validator for [[Schema]].
*/
class SchemaValidator(
isStreamEnvironment: Boolean,
supportsSourceTimestamps: Boolean,
supportsSourceWatermarks: Boolean)
extends DescriptorValidator {
override def validate(properties: DescriptorProperties): Unit = {
val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
val types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE)
if (names.isEmpty && types.isEmpty) {
throw new ValidationException(
s"Could not find the required schema in property '$SCHEMA'.")
}
var proctimeFound = false
for (i <- 0 until Math.max(names.size, types.size)) {
properties
.validateString(s"$SCHEMA.$i.$SCHEMA_NAME", false, 1)
properties
.validateType(s"$SCHEMA.$i.$SCHEMA_TYPE", false, false)
properties
.validateString(s"$SCHEMA.$i.$SCHEMA_FROM", true, 1)
// either proctime or rowtime
val proctime = s"$SCHEMA.$i.$SCHEMA_PROCTIME"
val rowtime = s"$SCHEMA.$i.$ROWTIME"
if (properties.containsKey(proctime)) {
// check the environment
if (!isStreamEnvironment) {
throw new ValidationException(
s"Property '$proctime' is not allowed in a batch environment.")
}
// check for only one proctime attribute
else if (proctimeFound) {
throw new ValidationException("A proctime attribute must only be defined once.")
}
// check proctime
properties.validateBoolean(proctime, false)
proctimeFound = properties.getBoolean(proctime)
// no rowtime
properties.validatePrefixExclusion(rowtime)
} else if (properties.hasPrefix(rowtime)) {
// check rowtime
val rowtimeValidator = new RowtimeValidator(
supportsSourceTimestamps,
supportsSourceWatermarks,
s"$SCHEMA.$i.")
rowtimeValidator.validate(properties)
// no proctime
properties.validateExclusion(proctime)
}
}
}
}
object SchemaValidator {
/**
* Prefix for schema-related properties.
*/
val SCHEMA = "schema"
val SCHEMA_NAME = "name"
val SCHEMA_TYPE = "type"
val SCHEMA_PROCTIME = "proctime"
val SCHEMA_FROM = "from"
/**
* Returns keys for a
* [[org.apache.flink.table.factories.TableFormatFactory.supportedProperties()]] method that
* are accepted for schema derivation using [[deriveFormatFields(DescriptorProperties)]].
*/
def getSchemaDerivationKeys: util.List[String] = {
val keys = new util.ArrayList[String]()
// schema
keys.add(SCHEMA + ".#." + SCHEMA_TYPE)
keys.add(SCHEMA + ".#." + SCHEMA_NAME)
keys.add(SCHEMA + ".#." + SCHEMA_FROM)
// time attributes
keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME)
keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE)
keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM)
keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS)
keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED)
keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE)
keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS)
keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED)
keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY)
keys
}
// utilities
/**
* Finds the proctime attribute if defined.
*/
def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
for (i <- 0 until names.size) {
val isProctime = toScala(
properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
isProctime.foreach { isSet =>
if (isSet) {
return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
}
}
}
toJava(None)
}
/**
* Finds the rowtime attributes if defined.
*/
def deriveRowtimeAttributes(properties: DescriptorProperties)
: util.List[RowtimeAttributeDescriptor] = {
val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
// check for rowtime in every field
for (i <- 0 until names.size) {
RowtimeValidator
.getRowtimeComponents(properties, s"$SCHEMA.$i.")
.foreach { case (extractor, strategy) =>
// create descriptor
attributes += new RowtimeAttributeDescriptor(
properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
extractor,
strategy)
}
}
attributes.asJava
}
/**
* Derives the table schema for a table sink. A sink ignores a proctime attribute and
* needs to track the origin of a rowtime field.
*
* @deprecated This method combines two separate concepts of table schema and field mapping.
* This should be split into two methods once we have support for
* the corresponding interfaces (see FLINK-9870).
*/
@deprecated
def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = {
val builder = TableSchema.builder()
val schema = properties.getTableSchema(SCHEMA)
schema.getFieldNames.zip(schema.getFieldTypes).zipWithIndex.foreach { case ((n, t), i) =>
val isProctime = properties
.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
.orElse(false)
val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
val isRowtime = properties.containsKey(tsType)
if (!isProctime && !isRowtime) {
// check for a aliasing
val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
.orElse(n)
builder.field(fieldName, t)
}
// only use the rowtime attribute if it references a field
else if (isRowtime) {
properties.getString(tsType) match {
case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD =>
val field = properties.getString(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_FROM")
builder.field(field, t)
// other timestamp strategies require a reverse timestamp extractor to
// insert the timestamp into the output
case t@_ =>
throw new TableException(
s"Unsupported rowtime type '$t' for sink table schema. Currently " +
s"only '$ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD' is supported for table sinks.")
}
}
}
builder.build()
}
/**
* Finds a table source field mapping.
*
* @param properties The properties describing a schema.
* @param inputType The input type that a connector and/or format produces. This parameter
* can be used to resolve a rowtime field against an input field.
*/
def deriveFieldMapping(
properties: DescriptorProperties,
inputType: Optional[TypeInformation[_]])
: util.Map[String, String] = {
val mapping = mutable.Map[String, String]()
val schema = properties.getTableSchema(SCHEMA)
val columnNames = toScala(inputType) match {
case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq
case _ => Seq[String]()
}
// add all source fields first because rowtime might reference one of them
columnNames.foreach(name => mapping.put(name, name))
// add all schema fields first for implicit mappings
schema.getFieldNames.foreach { name =>
mapping.put(name, name)
}
val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
for (i <- 0 until names.size) {
val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
// add explicit mapping
case Some(source) =>
mapping.put(name, source)
// implicit mapping or time
case None =>
val isProctime = properties
.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
.orElse(false)
val isRowtime = properties
.containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
// remove proctime/rowtime from mapping
if (isProctime || isRowtime) {
mapping.remove(name)
}
// check for invalid fields
else if (!columnNames.contains(name)) {
throw new ValidationException(s"Could not map the schema field '$name' to a field " +
s"from source. Please specify the source field from which it can be derived.")
}
}
}
mapping.toMap.asJava
}
}