blob: ce57070ac98d7ec8454c49a371ecdbc831a01c65 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog
import java.util
import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.factories.TableFactory
import org.apache.flink.table.plan.stats.TableStats
import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
import scala.collection.JavaConverters._
/**
* Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
* and/or sinks for both batch and stream environments.
*
* See also [[TableFactory]] for more information about how to target suitable factories.
*
* Use [[ExternalCatalogTableBuilder]] to integrate with the normalized descriptor-based API.
*
* @param isBatch Flag whether this external table is intended for batch environments.
* @param isStreaming Flag whether this external table is intended for streaming environments.
* @param isSource Flag whether this external table is declared as table source.
* @param isSink Flag whether this external table is declared as table sink.
* @param properties Properties that describe the table and should match with a [[TableFactory]].
*/
class ExternalCatalogTable(
private val isBatch: Boolean,
private val isStreaming: Boolean,
private val isSource: Boolean,
private val isSink: Boolean,
private val properties: java.util.Map[String, String])
extends TableDescriptor {
// ----------------------------------------------------------------------------------------------
// Legacy code
// ---------------------------------------------------------------------------------------------
/**
* Reads table statistics from the descriptors properties.
*
* @deprecated This method exists for backwards-compatibility only.
*/
@Deprecated
@deprecated
def getTableStats: Option[TableStats] = {
val normalizedProps = new DescriptorProperties()
normalizedProps.putProperties(normalizedProps)
val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
rowCount match {
case Some(cnt) =>
val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
Some(TableStats(cnt, columnStats.asJava))
case None =>
None
}
}
// ----------------------------------------------------------------------------------------------
// Getters
// ----------------------------------------------------------------------------------------------
/**
* Returns whether this external table is declared as table source.
*/
def isTableSource: Boolean = {
isSource
}
/**
* Returns whether this external table is declared as table sink.
*/
def isTableSink: Boolean = {
isSink
}
/**
* Returns whether this external table is intended for batch environments.
*/
def isBatchTable: Boolean = {
isBatch
}
/**
* Returns whether this external table is intended for stream environments.
*/
def isStreamTable: Boolean = {
isStreaming
}
// ----------------------------------------------------------------------------------------------
/**
* Converts this descriptor into a set of properties.
*/
override def toProperties: util.Map[String, String] = {
properties
}
}
object ExternalCatalogTable {
/**
* Creates a builder for creating an [[ExternalCatalogTable]].
*
* It takes [[Descriptor]]s which allow for declaring the communication to external
* systems in an implementation-agnostic way. The classpath is scanned for suitable table
* factories that match the desired configuration.
*
* Use the provided builder methods to configure the external catalog table accordingly.
*
* The following example shows how to read from a connector using a JSON format and
* declaring it as a table source:
*
* {{{
* ExternalCatalogTable(
* new ExternalSystemXYZ()
* .version("0.11"))
* .withFormat(
* new Json()
* .jsonSchema("{...}")
* .failOnMissingField(false))
* .withSchema(
* new Schema()
* .field("user-name", "VARCHAR").from("u_name")
* .field("count", "DECIMAL")
* .supportsStreaming()
* .asTableSource()
* }}}
*
* @param connectorDescriptor Connector descriptor describing the external system
* @return External catalog builder
*/
def builder(connectorDescriptor: ConnectorDescriptor): ExternalCatalogTableBuilder = {
new ExternalCatalogTableBuilder(connectorDescriptor)
}
}
/**
* Builder for an [[ExternalCatalogTable]].
*
* @param connectorDescriptor Connector descriptor describing the external system
*/
class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDescriptor)
extends TableDescriptor
with SchematicDescriptor[ExternalCatalogTableBuilder]
with StreamableDescriptor[ExternalCatalogTableBuilder] {
private var isBatch: Boolean = true
private var isStreaming: Boolean = true
private var formatDescriptor: Option[FormatDescriptor] = None
private var schemaDescriptor: Option[Schema] = None
private var statisticsDescriptor: Option[Statistics] = None
private var metadataDescriptor: Option[Metadata] = None
private var updateMode: Option[String] = None
/**
* Specifies the format that defines how to read data from a connector.
*/
override def withFormat(format: FormatDescriptor): ExternalCatalogTableBuilder = {
formatDescriptor = Some(format)
this
}
/**
* Specifies the resulting table schema.
*/
override def withSchema(schema: Schema): ExternalCatalogTableBuilder = {
schemaDescriptor = Some(schema)
this
}
/**
* Declares how to perform the conversion between a dynamic table and an external connector.
*
* In append mode, a dynamic table and an external connector only exchange INSERT messages.
*
* @see See also [[inRetractMode()]] and [[inUpsertMode()]].
*/
override def inAppendMode(): ExternalCatalogTableBuilder = {
updateMode = Some(UPDATE_MODE_VALUE_APPEND)
this
}
/**
* Declares how to perform the conversion between a dynamic table and an external connector.
*
* In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
*
* An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
* UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
* the updating (new) row.
*
* In this mode, a key must not be defined as opposed to upsert mode. However, every update
* consists of two messages which is less efficient.
*
* @see See also [[inAppendMode()]] and [[inUpsertMode()]].
*/
override def inRetractMode(): ExternalCatalogTableBuilder = {
updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
this
}
/**
* Declares how to perform the conversion between a dynamic table and an external connector.
*
* In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
*
* This mode requires a (possibly composite) unique key by which updates can be propagated. The
* external connector needs to be aware of the unique key attribute in order to apply messages
* correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as
* DELETE messages.
*
* The main difference to a retract stream is that UPDATE changes are encoded with a single
* message and are therefore more efficient.
*
* @see See also [[inAppendMode()]] and [[inRetractMode()]].
*/
override def inUpsertMode(): ExternalCatalogTableBuilder = {
updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
this
}
/**
* Specifies the statistics for this external table.
*/
def withStatistics(statistics: Statistics): ExternalCatalogTableBuilder = {
statisticsDescriptor = Some(statistics)
this
}
/**
* Specifies the metadata for this external table.
*/
def withMetadata(metadata: Metadata): ExternalCatalogTableBuilder = {
metadataDescriptor = Some(metadata)
this
}
/**
* Explicitly declares this external table for supporting only stream environments.
*/
def supportsStreaming(): ExternalCatalogTableBuilder = {
isBatch = false
isStreaming = true
this
}
/**
* Explicitly declares this external table for supporting only batch environments.
*/
def supportsBatch(): ExternalCatalogTableBuilder = {
isBatch = true
isStreaming = false
this
}
/**
* Explicitly declares this external table for supporting both batch and stream environments.
*/
def supportsBatchAndStreaming(): ExternalCatalogTableBuilder = {
isBatch = true
isStreaming = true
this
}
/**
* Declares this external table as a table source and returns the
* configured [[ExternalCatalogTable]].
*
* @return External catalog table
*/
def asTableSource(): ExternalCatalogTable = {
new ExternalCatalogTable(
isBatch,
isStreaming,
isSource = true,
isSink = false,
toProperties)
}
/**
* Declares this external table as a table sink and returns the
* configured [[ExternalCatalogTable]].
*
* @return External catalog table
*/
def asTableSink(): ExternalCatalogTable = {
new ExternalCatalogTable(
isBatch,
isStreaming,
isSource = false,
isSink = true,
toProperties)
}
/**
* Declares this external table as both a table source and sink. It returns the
* configured [[ExternalCatalogTable]].
*
* @return External catalog table
*/
def asTableSourceAndSink(): ExternalCatalogTable = {
new ExternalCatalogTable(
isBatch,
isStreaming,
isSource = true,
isSink = true,
toProperties)
}
// ----------------------------------------------------------------------------------------------
/**
* Converts this descriptor into a set of properties.
*/
override def toProperties: util.Map[String, String] = {
val properties = new DescriptorProperties()
properties.putProperties(connectorDescriptor.toProperties)
formatDescriptor.foreach(d => properties.putProperties(d.toProperties))
schemaDescriptor.foreach(d => properties.putProperties(d.toProperties))
statisticsDescriptor.foreach(d => properties.putProperties(d.toProperties))
metadataDescriptor.foreach(d => properties.putProperties(d.toProperties))
updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
properties.asMap()
}
}