| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.table.descriptors |
| |
| import java.util |
| |
| import org.apache.flink.table.api.{TableEnvironment, ValidationException} |
| import org.apache.flink.table.factories.TableFactoryUtil |
| |
| /** |
| * Common class for table's created with [[TableEnvironment.connect(ConnectorDescriptor)]]. |
| */ |
| abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]]( |
| private val tableEnv: TableEnvironment, |
| private val connectorDescriptor: ConnectorDescriptor) |
| extends TableDescriptor |
| with SchematicDescriptor[D] |
| with RegistrableDescriptor { this: D => |
| |
| private var formatDescriptor: Option[FormatDescriptor] = None |
| private var schemaDescriptor: Option[Schema] = None |
| |
| /** |
| * Searches for the specified table source, configures it accordingly, and registers it as |
| * a table under the given name. |
| * |
| * @param name table name to be registered in the table environment |
| */ |
| override def registerTableSource(name: String): Unit = { |
| val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this) |
| tableEnv.registerTableSource(name, tableSource) |
| } |
| |
| /** |
| * Searches for the specified table sink, configures it accordingly, and registers it as |
| * a table under the given name. |
| * |
| * @param name table name to be registered in the table environment |
| */ |
| override def registerTableSink(name: String): Unit = { |
| val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this) |
| tableEnv.registerTableSink(name, tableSink) |
| } |
| |
| /** |
| * Searches for the specified table source and sink, configures them accordingly, and registers |
| * them as a table under the given name. |
| * |
| * @param name table name to be registered in the table environment |
| */ |
| override def registerTableSourceAndSink(name: String): Unit = { |
| registerTableSource(name) |
| registerTableSink(name) |
| } |
| |
| /** |
| * Specifies the format that defines how to read data from a connector. |
| */ |
| override def withFormat(format: FormatDescriptor): D = { |
| formatDescriptor = Some(format) |
| this |
| } |
| |
| /** |
| * Specifies the resulting table schema. |
| */ |
| override def withSchema(schema: Schema): D = { |
| schemaDescriptor = Some(schema) |
| this |
| } |
| |
| // ---------------------------------------------------------------------------------------------- |
| |
| /** |
| * Converts this descriptor into a set of properties. |
| */ |
| override def toProperties: util.Map[String, String] = { |
| val properties = new DescriptorProperties() |
| |
| // this performs only basic validation |
| // more validation can only happen within a factory |
| if (connectorDescriptor.isFormatNeeded && formatDescriptor.isEmpty) { |
| throw new ValidationException( |
| s"The connector '$connectorDescriptor' requires a format description.") |
| } else if (!connectorDescriptor.isFormatNeeded && formatDescriptor.isDefined) { |
| throw new ValidationException( |
| s"The connector '$connectorDescriptor' does not require a format description " + |
| s"but '${formatDescriptor.get}' found.") |
| } |
| |
| properties.putProperties(connectorDescriptor.toProperties) |
| formatDescriptor.foreach(d => properties.putProperties(d.toProperties)) |
| schemaDescriptor.foreach(d => properties.putProperties(d.toProperties)) |
| |
| properties.asMap() |
| } |
| } |