/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog

import java.util

import org.apache.flink.table.api._
import org.apache.flink.table.descriptors.ConnectorDescriptor
import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService, TableSourceParserFactory}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.util.{Logging, TableProperties}
import org.apache.flink.util.Preconditions

import _root_.scala.collection.JavaConversions._

/**
 * Utility that converts a [[CatalogTable]] into a [[TableSource]], [[TableSink]]
 * or [[TableSourceParser]] by looking up a matching factory via [[TableFactoryService]].
 */
object ExternalTableUtil extends Logging {
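
  /**
   * Converts a Flink [[TableSchema]] into a [[RichTableSchema]], carrying over the
   * column names, types, nullability and the primary key.
   */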
  private def convertTableSchemaToRichTableSchema(
      tableSchema: TableSchema): RichTableSchema = {
    val colNames = tableSchema.getFieldNames
    val colTypes = tableSchema.getFieldTypes
    val colNullables = tableSchema.getFieldNullables
    val richTableSchema = new RichTableSchema(colNames, colTypes, colNullables)
    val primaryKeys = tableSchema.getPrimaryKeys
    richTableSchema.setPrimaryKey(primaryKeys: _*)
    // TODO unique keys of RichTableSchema
    // TODO indexes of RichTableSchema
    // TODO header fields of RichTableSchema
    richTableSchema
  }

  /**
   * Creates a [[TableSourceParser]] for the given [[CatalogTable]].
   *
   * @param name the name of the table
   * @param table the [[CatalogTable]] instance to convert
   * @param isStreaming whether we are in streaming mode
   * @return the created parser, or null if no matching parser factory is found
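   *
   * A minimal usage sketch; the table name and the `catalogTable` value are hypothetical:
   * {{{
   *   val parser = ExternalTableUtil.toParser("orders", catalogTable, isStreaming = true)
   *   // parser is null when no TableSourceParserFactory matches the connector type
   * }}}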
   */
  def toParser(
      name: String, table: CatalogTable, isStreaming: Boolean): TableSourceParser = {
    val tableProperties = generateTableProperties(name, table, isStreaming)
    try {
      val tableFactory = TableFactoryService.find(
        classOf[TableSourceParserFactory],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
      tableFactory.createParser(name, table.getRichTableSchema, tableProperties)
    } catch {
      // No TableSourceParserFactory was found for this table: a parser is optional.
      case _: AmbiguousTableFactoryException | _: NoMatchingTableFactoryException => null
    }
  }

  /**
   * Converts a [[CatalogTable]] instance to a [[TableSource]] instance.
   *
   * @param name the name of the table source
   * @param catalogTable the [[CatalogTable]] instance to convert
   * @param isStreaming whether a streaming source is expected
   * @return converted [[TableSource]] instance from the input catalog table
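   *
   * A minimal usage sketch; the table name and the `catalogTable` value are hypothetical:
   * {{{
   *   val source = ExternalTableUtil.toTableSource("orders", catalogTable, isStreaming = true)
   * }}}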
   */
  def toTableSource(
      name: String,
      catalogTable: CatalogTable,
      isStreaming: Boolean): TableSource = {
    val tableProperties = generateTableProperties(name, catalogTable, isStreaming)
    if (isStreaming) {
      val tableFactory = TableFactoryService.find(
        classOf[StreamTableSourceFactory[_]],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
      tableFactory.createStreamTableSource(tableProperties.toKeyLowerCase.toMap)
    } else {
      val tableFactory = TableFactoryService.find(
        classOf[BatchTableSourceFactory[_]],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
      tableFactory.createBatchTableSource(tableProperties.toKeyLowerCase.toMap)
    }
  }

  /**
   * Converts a [[CatalogTable]] instance to a [[TableSink]] instance.
   *
   * @param name name of the table
   * @param externalTable the [[CatalogTable]] instance to convert
   * @param isStreaming whether we are in streaming mode
   * @return converted [[TableSink]] instance from the input catalog table
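   *
   * A minimal usage sketch; the table name and the `catalogTable` value are hypothetical:
   * {{{
   *   val sink = ExternalTableUtil.toTableSink("orders", catalogTable, isStreaming = false)
   * }}}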
   */
  def toTableSink(
      name: String,
      externalTable: CatalogTable,
      isStreaming: Boolean): TableSink[_] = {
    val tableProperties: TableProperties = generateTableProperties(name, externalTable, isStreaming)
    if (isStreaming) {
      val tableFactory = TableFactoryService.find(
        classOf[StreamTableSinkFactory[_]],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
      tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap)
    } else {
      val tableFactory = TableFactoryService.find(
        classOf[BatchTableSinkFactory[_]],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
      tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap)
    }
  }
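
  /**
   * Builds the [[TableProperties]] used for factory matching: the catalog table's own
   * properties, the Blink environment type (stream or batch-exec), the connector type,
   * and internal entries for the table name and schema.
   */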
  def generateTableProperties(
      sqlTableName: String,
      externalTable: CatalogTable,
      isStream: Boolean): TableProperties = {
    val tableProperties = new TableProperties()
    tableProperties.addAll(externalTable.getProperties)
    val environmentValue = if (isStream) {
      TableProperties.BLINK_ENVIRONMENT_STREAM_VALUE
    } else {
      TableProperties.BLINK_ENVIRONMENT_BATCHEXEC_VALUE
    }
    tableProperties.setString(TableProperties.BLINK_ENVIRONMENT_TYPE_KEY, environmentValue)
    // We choose the table factory based on the connector type.
    tableProperties.setString(TableProperties.BLINK_CONNECTOR_TYPE_KEY, externalTable.getTableType)
    // Put in internal arguments: the table name and schema.
    tableProperties.putTableNameIntoProperties(sqlTableName)
    tableProperties.putSchemaIntoProperties(externalTable.getRichTableSchema)
    tableProperties
  }
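
  /**
   * Returns a copy of the given properties with Blink-internal keys removed, so that
   * only user-facing connector keys take part in factory matching.
   */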
  private def normalizeSupportedKeys(props: util.Map[String, String]): util.Map[String, String] = {
    val ret = new util.HashMap[String, String]()
    ret.putAll(props)
    TableProperties.INTERNAL_KEYS.foreach(ret.remove(_))
    ret
  }
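
  /** Wraps the normalized, lower-cased connector properties into a descriptor for factory lookup. */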
  private def getToolDescriptor(
      typeName: String,
      tableProperties: TableProperties): ToolConnectorDescriptor = {
    new ToolConnectorDescriptor(
      typeName,
      normalizeSupportedKeys(tableProperties.toKeyLowerCase.toMap))
  }
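
  /** Reads the connector type from the 'type' property, failing fast if it is absent. */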
  private def getStorageType(tableName: String, properties: TableProperties): String = {
    val typeName = properties.getString("type", null)
    Preconditions.checkState(typeName != null,
      "Property 'type' of table %s is missing", tableName)
    typeName
  }
}
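
/**
 * A [[ConnectorDescriptor]] that forwards a pre-built property map unchanged to the
 * factory matching machinery.
 */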
class ToolConnectorDescriptor(typeName: String, properties: util.Map[String, String])
  extends ConnectorDescriptor(typeName, 1, false) {

  override protected def toConnectorProperties: util.Map[String, String] = {
    properties
  }
}