################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Optional
from pyflink.java_gateway import get_gateway
from pyflink.table import ExplainDetail
from pyflink.table.catalog import ObjectIdentifier
from pyflink.table.compiled_plan import CompiledPlan
from pyflink.table.table_result import TableResult
from pyflink.util.api_stability_decorators import PublicEvolving
from pyflink.util.java_utils import to_j_explain_detail_arr

__all__ = ["TablePipeline"]


@PublicEvolving()
class TablePipeline(object):
"""
Describes a complete pipeline from one or more source tables to a sink table.
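
    Example (an illustrative sketch; assumes the pipeline is created via a sink-insertion
    call such as ``Table.insert_into``, which is defined outside this module):
    ::

        >>> table = table_env.from_path("source_table")
        >>> pipeline = table.insert_into("sink_table")
        >>> pipeline.execute().wait()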
"""

    def __init__(self, j_table_pipeline, t_env):
self._j_table_pipeline = j_table_pipeline
self._t_env = t_env

    def __str__(self) -> str:
return self._j_table_pipeline.toString()

    def execute(self) -> TableResult:
"""
Executes the table pipeline.
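
        Example (an illustrative sketch; assumes ``pipeline`` is a :class:`TablePipeline`
        obtained beforehand, e.g. from a sink-insertion call on a
        :class:`~pyflink.table.Table`):
        ::

            >>> pipeline.execute().wait()
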
.. versionadded:: 2.1.0
"""
self._t_env._before_execute()
return TableResult(self._j_table_pipeline.execute())

    def explain(self, *extra_details: ExplainDetail) -> str:
"""
Returns the AST and the execution plan of the table pipeline.
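
        Example (an illustrative sketch; assumes ``pipeline`` is an existing
        :class:`TablePipeline`):
        ::

            >>> print(pipeline.explain(ExplainDetail.CHANGELOG_MODE))
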
:param extra_details: The extra explain details which the explain result should include,
e.g. estimated cost, changelog mode for streaming
:return: AST and execution plans

        .. versionadded:: 2.1.0
"""
gateway = get_gateway()
j_extra_details = to_j_explain_detail_arr(extra_details)
return self._j_table_pipeline.explain(
gateway.jvm.org.apache.flink.table.api.ExplainFormat.TEXT, j_extra_details
)

    def print_explain(self, *extra_details: ExplainDetail):
"""
Like :func:`~pyflink.table.TablePipeline.explain`, but prints the result to the client
console.

        .. versionadded:: 2.1.0
"""
print(self.explain(*extra_details))

    def get_sink_identifier(self) -> Optional[ObjectIdentifier]:
"""
Returns the sink table's :class:`~pyflink.table.catalog.ObjectIdentifier`, if any.
        The result is ``None`` for anonymous sink tables that have not been registered before.
Registering sink tables can be done via
:func:`~pyflink.table.TableEnvironment.create_temporary_table` using a
:class:`~pyflink.table.TableDescriptor`.
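
        Example (an illustrative sketch; assumes ``pipeline`` targets a sink previously
        registered under the name ``sink_table``):
        ::

            >>> identifier = pipeline.get_sink_identifier()
            >>> if identifier is not None:
            ...     print(identifier)
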
.. versionadded:: 2.1.0
"""
optional_result = self._j_table_pipeline.getSinkIdentifier()
return (
ObjectIdentifier(j_object_identifier=optional_result.get())
if optional_result.isPresent()
else None
)

    def compile_plan(self) -> CompiledPlan:
"""
Compiles this :class:`TablePipeline` into a :class:`~pyflink.table.CompiledPlan` that can
be executed as one job.

        Instances of :class:`~pyflink.table.CompiledPlan` can be persisted and reloaded across
        Flink versions. They describe static pipelines to ensure backwards compatibility and
        enable stateful streaming job upgrades. See :class:`~pyflink.table.CompiledPlan` and
        the website documentation for more information.
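
        Example (an illustrative sketch; assumes ``pipeline`` is an existing
        :class:`TablePipeline`; persisting and executing the resulting plan is done through
        the :class:`~pyflink.table.CompiledPlan` API):
        ::

            >>> compiled_plan = pipeline.compile_plan()
            >>> # the plan can then be stored and executed via the CompiledPlan API
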
.. note::
The compiled plan feature is experimental in batch mode.

        :raises TableException: if any of the statements is invalid or if the plan cannot be
                                persisted.

        .. versionadded:: 2.1.0
"""
return CompiledPlan(j_compiled_plan=self._j_table_pipeline.compilePlan(), t_env=self._t_env)