| ################################################################################ | 
 | #  Licensed to the Apache Software Foundation (ASF) under one | 
 | #  or more contributor license agreements.  See the NOTICE file | 
 | #  distributed with this work for additional information | 
 | #  regarding copyright ownership.  The ASF licenses this file | 
 | #  to you under the Apache License, Version 2.0 (the | 
 | #  "License"); you may not use this file except in compliance | 
 | #  with the License.  You may obtain a copy of the License at | 
 | # | 
 | #      http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | #  Unless required by applicable law or agreed to in writing, software | 
 | #  distributed under the License is distributed on an "AS IS" BASIS, | 
 | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | #  See the License for the specific language governing permissions and | 
 | # limitations under the License. | 
 | ################################################################################ | 
 | from typing import Dict, Union, List, Optional | 
 |  | 
 | from pyflink.common.config_options import ConfigOption | 
 | from pyflink.java_gateway import get_gateway | 
 | from pyflink.table.schema import Schema | 
 | from pyflink.util.java_utils import to_jarray | 
 |  | 
# Public names exported by a wildcard import of this module.
__all__ = ['TableDescriptor', 'FormatDescriptor']
 |  | 
 |  | 
 | class TableDescriptor(object): | 
 |     """ | 
 |     Describes a CatalogTable representing a source or sink. | 
 |  | 
 |     TableDescriptor is a template for creating a CatalogTable instance. It closely resembles the | 
 |     "CREATE TABLE" SQL DDL statement, containing schema, connector options, and other | 
 |     characteristics. Since tables in Flink are typically backed by external systems, the | 
 |     descriptor describes how a connector (and possibly its format) are configured. | 
 |  | 
 |     This can be used to register a table in the Table API, see :func:`create_temporary_table` in | 
 |     TableEnvironment. | 
 |     """ | 
 |  | 
 |     def __init__(self, j_table_descriptor): | 
 |         self._j_table_descriptor = j_table_descriptor | 
 |  | 
 |     @staticmethod | 
 |     def for_connector(connector: str) -> 'TableDescriptor.Builder': | 
 |         """ | 
 |         Creates a new :class:`~pyflink.table.TableDescriptor.Builder` for a table using the given | 
 |         connector. | 
 |  | 
 |         :param connector: The factory identifier for the connector. | 
 |         """ | 
 |         gateway = get_gateway() | 
 |         j_builder = gateway.jvm.TableDescriptor.forConnector(connector) | 
 |         return TableDescriptor.Builder(j_builder) | 
 |  | 
 |     def get_schema(self) -> Optional[Schema]: | 
 |         j_schema = self._j_table_descriptor.getSchema() | 
 |         if j_schema.isPresent(): | 
 |             return Schema(j_schema.get()) | 
 |         else: | 
 |             return None | 
 |  | 
 |     def get_options(self) -> Dict[str, str]: | 
 |         return self._j_table_descriptor.getOptions() | 
 |  | 
 |     def get_partition_keys(self) -> List[str]: | 
 |         return self._j_table_descriptor.getPartitionKeys() | 
 |  | 
 |     def get_comment(self) -> Optional[str]: | 
 |         j_comment = self._j_table_descriptor.getComment() | 
 |         if j_comment.isPresent(): | 
 |             return j_comment.get() | 
 |         else: | 
 |             return None | 
 |  | 
 |     def __str__(self): | 
 |         return self._j_table_descriptor.toString() | 
 |  | 
 |     def __eq__(self, other): | 
 |         return (self.__class__ == other.__class__ and | 
 |                 self._j_table_descriptor.equals(other._j_table_descriptor)) | 
 |  | 
 |     def __hash__(self): | 
 |         return self._j_table_descriptor.hashCode() | 
 |  | 
 |     class Builder(object): | 
 |         """ | 
 |         Builder for TableDescriptor. | 
 |         """ | 
 |  | 
 |         def __init__(self, j_builder): | 
 |             self._j_builder = j_builder | 
 |  | 
 |         def schema(self, schema: Schema) -> 'TableDescriptor.Builder': | 
 |             """ | 
 |             Define the schema of the TableDescriptor. | 
 |             """ | 
 |             self._j_builder.schema(schema._j_schema) | 
 |             return self | 
 |  | 
 |         def option(self, key: Union[str, ConfigOption], value) -> 'TableDescriptor.Builder': | 
 |             """ | 
 |             Sets the given option on the table. | 
 |  | 
 |             Option keys must be fully specified. When defining options for a Format, use | 
 |             format(FormatDescriptor) instead. | 
 |  | 
 |             Example: | 
 |             :: | 
 |  | 
 |                 >>> TableDescriptor.for_connector("kafka") \ | 
 |                 ...     .option("scan.startup.mode", "latest-offset") \ | 
 |                 ...     .build() | 
 |  | 
 |             """ | 
 |             if isinstance(key, str): | 
 |                 self._j_builder.option(key, value) | 
 |             else: | 
 |                 self._j_builder.option(key._j_config_option, value) | 
 |             return self | 
 |  | 
 |         def format(self, | 
 |                    format: Union[str, 'FormatDescriptor'], | 
 |                    format_option: ConfigOption[str] = None) -> 'TableDescriptor.Builder': | 
 |             """ | 
 |             Defines the format to be used for this table. | 
 |  | 
 |             Note that not every connector requires a format to be specified, while others may use | 
 |             multiple formats. | 
 |  | 
 |             Example: | 
 |             :: | 
 |  | 
 |                 >>> TableDescriptor.for_connector("kafka") \ | 
 |                 ...     .format(FormatDescriptor.for_format("json") | 
 |                 ...                 .option("ignore-parse-errors", "true") | 
 |                 ...                 .build()) | 
 |  | 
 |                 will result in the options: | 
 |  | 
 |                     'format' = 'json' | 
 |                     'json.ignore-parse-errors' = 'true' | 
 |  | 
 |             """ | 
 |             if format_option is None: | 
 |                 if isinstance(format, str): | 
 |                     self._j_builder.format(format) | 
 |                 else: | 
 |                     self._j_builder.format(format._j_format_descriptor) | 
 |             else: | 
 |                 if isinstance(format, str): | 
 |                     self._j_builder.format(format_option._j_config_option, format) | 
 |                 else: | 
 |                     self._j_builder.format( | 
 |                         format_option._j_config_option, format._j_format_descriptor) | 
 |             return self | 
 |  | 
 |         def partitioned_by(self, *partition_keys: str) -> 'TableDescriptor.Builder': | 
 |             """ | 
 |             Define which columns this table is partitioned by. | 
 |             """ | 
 |             gateway = get_gateway() | 
 |             self._j_builder.partitionedBy(to_jarray(gateway.jvm.java.lang.String, partition_keys)) | 
 |             return self | 
 |  | 
 |         def comment(self, comment: str) -> 'TableDescriptor.Builder': | 
 |             """ | 
 |             Define the comment for this table. | 
 |             """ | 
 |             self._j_builder.comment(comment) | 
 |             return self | 
 |  | 
 |         def build(self) -> 'TableDescriptor': | 
 |             """ | 
 |             Returns an immutable instance of :class:`~pyflink.table.TableDescriptor`. | 
 |             """ | 
 |             return TableDescriptor(self._j_builder.build()) | 
 |  | 
 |  | 
 | class FormatDescriptor(object): | 
 |     """ | 
 |     Describes a Format and its options for use with :class:`~pyflink.table.TableDescriptor`. | 
 |  | 
 |     Formats are responsible for encoding and decoding data in table connectors. Note that not | 
 |     every connector has a format, while others may have multiple formats (e.g. the Kafka connector | 
 |     has separate formats for keys and values). Common formats are "json", "csv", "avro", etc. | 
 |     """ | 
 |  | 
 |     def __init__(self, j_format_descriptor): | 
 |         self._j_format_descriptor = j_format_descriptor | 
 |  | 
 |     @staticmethod | 
 |     def for_format(format: str) -> 'FormatDescriptor.Builder': | 
 |         """ | 
 |         Creates a new :class:`~pyflink.table.FormatDescriptor.Builder` describing a format with the | 
 |         given format identifier. | 
 |  | 
 |         :param format: The factory identifier for the format. | 
 |         """ | 
 |         gateway = get_gateway() | 
 |         j_builder = gateway.jvm.FormatDescriptor.forFormat(format) | 
 |         return FormatDescriptor.Builder(j_builder) | 
 |  | 
 |     def get_format(self) -> str: | 
 |         return self._j_format_descriptor.getFormat() | 
 |  | 
 |     def get_options(self) -> Dict[str, str]: | 
 |         return self._j_format_descriptor.getOptions() | 
 |  | 
 |     def __str__(self): | 
 |         return self._j_format_descriptor.toString() | 
 |  | 
 |     def __eq__(self, other): | 
 |         return (self.__class__ == other.__class__ and | 
 |                 self._j_format_descriptor.equals(other._j_format_descriptor)) | 
 |  | 
 |     def __hash__(self): | 
 |         return self._j_format_descriptor.hashCode() | 
 |  | 
 |     class Builder(object): | 
 |         """ | 
 |         Builder for FormatDescriptor. | 
 |         """ | 
 |  | 
 |         def __init__(self, j_builder): | 
 |             self._j_builder = j_builder | 
 |  | 
 |         def option(self, key: Union[str, ConfigOption], value) -> 'FormatDescriptor.Builder': | 
 |             """ | 
 |             Sets the given option on the format. | 
 |  | 
 |             Note that format options must not be prefixed with the format identifier itself here. | 
 |  | 
 |             Example: | 
 |             :: | 
 |  | 
 |                 >>> FormatDescriptor.for_format("json") \ | 
 |                 ...     .option("ignore-parse-errors", "true") \ | 
 |                 ...     .build() | 
 |  | 
 |                 will automatically be converted into its prefixed form: | 
 |  | 
 |                     'format' = 'json' | 
 |                     'json.ignore-parse-errors' = 'true' | 
 |  | 
 |             """ | 
 |             if isinstance(key, str): | 
 |                 self._j_builder.option(key, value) | 
 |             else: | 
 |                 self._j_builder.option(key._j_config_option, value) | 
 |             return self | 
 |  | 
 |         def build(self) -> 'FormatDescriptor': | 
 |             """ | 
 |             Returns an immutable instance of :class:`~pyflink.table.FormatDescriptor`. | 
 |             """ | 
 |             return FormatDescriptor(self._j_builder.build()) |