| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| from typing import Union, List |
| |
| from pyflink.java_gateway import get_gateway |
| from pyflink.table import Expression |
| from pyflink.table.expression import _get_java_expression |
| from pyflink.table.types import DataType, _to_java_data_type |
| from pyflink.util.java_utils import to_jarray |
| |
| __all__ = ['Schema'] |
| |
| |
| class Schema(object): |
| """ |
| Schema of a table or view. |
| |
| A schema represents the schema part of a {@code CREATE TABLE (schema) WITH (options)} DDL |
| statement in SQL. It defines columns of different kind, constraints, time attributes, and |
| watermark strategies. It is possible to reference objects (such as functions or types) across |
| different catalogs. |
| |
| This class is used in the API and catalogs to define an unresolved schema that will be |
| translated to ResolvedSchema. Some methods of this class perform basic validation, however, the |
| main validation happens during the resolution. Thus, an unresolved schema can be incomplete and |
| might be enriched or merged with a different schema at a later stage. |
| |
| Since an instance of this class is unresolved, it should not be directly persisted. The str() |
| shows only a summary of the contained objects. |
| """ |
| |
| def __init__(self, j_schema): |
| self._j_schema = j_schema |
| |
| @staticmethod |
| def new_builder() -> 'Schema.Builder': |
| gateway = get_gateway() |
| j_builder = gateway.jvm.Schema.newBuilder() |
| return Schema.Builder(j_builder) |
| |
| def __str__(self): |
| return self._j_schema.toString() |
| |
| def __eq__(self, other): |
| return self.__class__ == other.__class__ and self._j_schema.equals(other._j_schema) |
| |
| def __hash__(self): |
| return self._j_schema.hashCode() |
| |
| class Builder(object): |
| """ |
| A builder for constructing an immutable but still unresolved Schema. |
| """ |
| |
| def __init__(self, j_builder): |
| self._j_builder = j_builder |
| |
| def from_schema(self, unresolved_schema: 'Schema') -> 'Schema.Builder': |
| """ |
| Adopts all members from the given unresolved schema. |
| """ |
| self._j_builder.fromSchema(unresolved_schema._j_schema) |
| return self |
| |
| def from_row_data_type(self, data_type: DataType) -> 'Schema.Builder': |
| """ |
| Adopts all fields of the given row as physical columns of the schema. |
| """ |
| self._j_builder.fromRowDataType(_to_java_data_type(data_type)) |
| return self |
| |
| def from_fields(self, |
| field_names: List[str], |
| field_data_types: List[DataType]) -> 'Schema.Builder': |
| """ |
| Adopts the given field names and field data types as physical columns of the schema. |
| """ |
| gateway = get_gateway() |
| j_field_names = to_jarray(gateway.jvm.String, field_names) |
| j_field_data_types = to_jarray( |
| gateway.jvm.AbstractDataType, |
| [_to_java_data_type(field_data_type) for field_data_type in field_data_types]) |
| self._j_builder.fromFields(j_field_names, j_field_data_types) |
| return self |
| |
| def column(self, |
| column_name: str, |
| data_type: Union[str, DataType]) -> 'Schema.Builder': |
| """ |
| Declares a physical column that is appended to this schema. |
| |
| Physical columns are regular columns known from databases. They define the names, the |
| types, and the order of fields in the physical data. Thus, physical columns represent |
| the payload that is read from and written to an external system. Connectors and formats |
| use these columns (in the defined order) to configure themselves. Other kinds of columns |
| can be declared between physical columns but will not influence the final physical |
| schema. |
| |
| :param column_name: Column name |
| :param data_type: Data type of the column |
| """ |
| if isinstance(data_type, str): |
| self._j_builder.column(column_name, data_type) |
| else: |
| self._j_builder.column(column_name, _to_java_data_type(data_type)) |
| return self |
| |
| def column_by_expression(self, |
| column_name: str, |
| expr: Union[str, Expression]) -> 'Schema.Builder': |
| """ |
| Declares a computed column that is appended to this schema. |
| |
| Computed columns are virtual columns that are generated by evaluating an expression |
| that can reference other columns declared in the same table. Both physical columns and |
| metadata columns can be accessed. The column itself is not physically stored within the |
| table. The column’s data type is derived automatically from the given expression and |
| does not have to be declared manually. |
| |
| Computed columns are commonly used for defining time attributes. For example, the |
| computed column can be used if the original field is not TIMESTAMP(3) type or is nested |
| in a JSON string. |
| |
| Example: |
| :: |
| |
| >>> Schema.new_builder(). |
| ... column_by_expression("ts", "orig_ts - INTERVAL '60' MINUTE"). |
| ... column_by_metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp") |
| |
| :param column_name: Column name |
| :param expr: Computation of the column |
| """ |
| self._j_builder.columnByExpression(column_name, _get_java_expression(expr)) |
| return self |
| |
| def column_by_metadata(self, |
| column_name: str, |
| data_type: Union[DataType, str], |
| metadata_key: str = None, |
| is_virtual: bool = False) -> 'Schema.Builder': |
| """ |
| Declares a metadata column that is appended to this schema. |
| |
| Metadata columns allow to access connector and/or format specific fields for every row |
| of a table. For example, a metadata column can be used to read and write the timestamp |
| from and to Kafka records for time-based operations. The connector and format |
| documentation lists the available metadata fields for every component. |
| |
| Every metadata field is identified by a string-based key and has a documented data |
| type. The metadata key can be omitted if the column name should be used as the |
| identifying metadata key. For convenience, the runtime will perform an explicit cast if |
| the data type of the column differs from the data type of the metadata field. Of course, |
| this requires that the two data types are compatible. |
| |
| By default, a metadata column can be used for both reading and writing. However, in |
| many cases an external system provides more read-only metadata fields than writable |
| fields. Therefore, it is possible to exclude metadata columns from persisting by setting |
| the {@code is_virtual} flag to {@code true}. |
| |
| :param column_name: Column name |
| :param data_type: Data type of the column |
| :param metadata_key: Identifying metadata key, if null the column name will be used as |
| metadata key |
| :param is_virtual: Whether the column should be persisted or not |
| """ |
| if isinstance(data_type, DataType): |
| self._j_builder.columnByMetadata( |
| column_name, |
| _to_java_data_type(data_type), |
| metadata_key, is_virtual) |
| else: |
| self._j_builder.columnByMetadata( |
| column_name, |
| data_type, |
| metadata_key, |
| is_virtual) |
| return self |
| |
| def watermark(self, |
| column_name: str, |
| watermark_expr: Union[str, Expression]) -> 'Schema.Builder': |
| """ |
| Declares that the given column should serve as an event-time (i.e. rowtime) attribute |
| and specifies a corresponding watermark strategy as an expression. |
| |
| The column must be of type {@code TIMESTAMP(3)} or {@code TIMESTAMP_LTZ(3)} and be a |
| top-level column in the schema. It may be a computed column. |
| |
| The watermark generation expression is evaluated by the framework for every record |
| during runtime. The framework will periodically emit the largest generated watermark. If |
| the current watermark is still identical to the previous one, or is null, or the value |
| of the returned watermark is smaller than that of the last emitted one, then no new |
| watermark will be emitted. A watermark is emitted in an interval defined by the |
| configuration. |
| |
| Any scalar expression can be used for declaring a watermark strategy for |
| in-memory/temporary tables. However, currently, only SQL expressions can be persisted in |
| a catalog. The expression's return data type must be {@code TIMESTAMP(3)}. User-defined |
| functions (also defined in different catalogs) are supported. |
| |
| Example: |
| :: |
| |
| >>> Schema.new_builder().watermark("ts", "ts - INTERVAL '5' SECOND") |
| |
| :param column_name: The column name used as a rowtime attribute |
| :param watermark_expr: The expression used for watermark generation |
| """ |
| self._j_builder.watermark(column_name, _get_java_expression(watermark_expr)) |
| return self |
| |
| def primary_key(self, *column_names: str) -> 'Schema.Builder': |
| """ |
| Declares a primary key constraint for a set of given columns. Primary key uniquely |
| identify a row in a table. Neither of columns in a primary can be nullable. The primary |
| key is informational only. It will not be enforced. It can be used for optimizations. It |
| is the data owner's responsibility to ensure uniqueness of the data. |
| |
| The primary key will be assigned a generated name in the format {@code PK_col1_col2}. |
| |
| :param column_names: Columns that form a unique primary key |
| """ |
| gateway = get_gateway() |
| self._j_builder.primaryKey(to_jarray(gateway.jvm.java.lang.String, column_names)) |
| return self |
| |
| def primary_key_named(self, |
| constraint_name: str, |
| *column_names: str) -> 'Schema.Builder': |
| """ |
| Declares a primary key constraint for a set of given columns. Primary key uniquely |
| identify a row in a table. Neither of columns in a primary can be nullable. The primary |
| key is informational only. It will not be enforced. It can be used for optimizations. It |
| is the data owner's responsibility to ensure uniqueness of the data. |
| |
| :param constraint_name: Name for the primary key, can be used to reference the |
| constraint |
| :param column_names: Columns that form a unique primary key |
| """ |
| gateway = get_gateway() |
| self._j_builder.primaryKeyNamed( |
| constraint_name, |
| to_jarray(gateway.jvm.java.lang.String, column_names)) |
| return self |
| |
| def build(self) -> 'Schema': |
| """ |
| Returns an instance of an unresolved Schema. |
| """ |
| return Schema(self._j_builder.build()) |