################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
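"""
Tests for compiling INSERT pipelines into JSON plans via SQL and the Table API,
and for round-tripping compiled plans through files and JSON strings with
PlanReference.
"""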
import os.path
import re
from pathlib import Path

from pyflink.table import Schema, DataTypes, TableDescriptor, PlanReference
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkTestCase

# Directory holding the expected compiled-plan JSON files checked in next to this test.
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
JSON_PLAN_DIR = os.path.join(THIS_DIR, "jsonplan")


def _replace_flink_version(plan: str) -> str:
    """
    Ignore the value of the Flink version in the compiled plan, since it changes
    with every release.
    """
    return re.sub(r'"flinkVersion"\s*:\s*"[\w.-]*"', r'"flinkVersion" : ""', plan)


def _replace_exec_node_id(plan: str) -> str:
    """
    Ignore the "id", "source", and "target" exec node ids, as these increment
    between test runs.
    """
    # Normalize all three id fields to 0 so plans from different runs compare equal.
    plan = re.sub(r'"id"\s*:\s*\d+', r'"id" : 0', plan)
    plan = re.sub(r'"source"\s*:\s*\d+', r'"source" : 0', plan)
    plan = re.sub(r'"target"\s*:\s*\d+', r'"target" : 0', plan)
    return plan
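# Taken together, the two helpers above turn a fragment such as
#   {"flinkVersion" : "1.19", "id" : 12, ...}
# into
#   {"flinkVersion" : "", "id" : 0, ...}
# so that compiled plans can be compared stably across versions and runs.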


class CompiledPlanTest(PyFlinkStreamTableTestCase, PyFlinkTestCase):

    def test_compile_plan_sql(self):
        src = """
            CREATE TABLE MyTable (a BIGINT, b INT, c VARCHAR)
            WITH ('connector' = 'datagen', 'number-of-rows' = '5')
        """
        self.t_env.execute_sql(src)

        sink = """
            CREATE TABLE MySink (a BIGINT, b INT, c VARCHAR)
            WITH ('connector' = 'blackhole')
        """
        self.t_env.execute_sql(sink)

        compiled_plan = self.t_env.compile_plan_sql("INSERT INTO MySink SELECT * FROM MyTable")

        # Compare against the expected plan after masking the Flink version and
        # the exec node ids, which vary between builds and runs.
        with open(os.path.join(JSON_PLAN_DIR, "testGetJsonPlan.out")) as file:
            expected = file.read()
        self.maxDiff = None
        self.assertEqual(
            _replace_exec_node_id(_replace_flink_version(compiled_plan.as_json_string())),
            expected,
        )

    def test_write_load_compiled_plan(self):
        schema = Schema.new_builder().column("f0", DataTypes.STRING()).build()
        table = self.t_env.from_descriptor(
            TableDescriptor.for_connector("datagen")
            .option("number-of-rows", "10")
            .schema(schema)
            .build()
        )
        self.t_env.create_temporary_table(
            "RegisteredSink",
            TableDescriptor.for_connector("blackhole").schema(schema).build(),
        )
        table_pipeline = table.insert_into("RegisteredSink")
        compiled_plan = table_pipeline.compile_plan()

        # Round-trip the plan through a file on disk.
        plan_path = Path(self.tempdir) / "plan.out"
        compiled_plan.write_to_file(plan_path)
        plan_reference_from_file = PlanReference.from_file(plan_path)
        compiled_plan_from_file = self.t_env.load_plan(plan_reference_from_file)
        self.assertEqual(compiled_plan.as_json_string(), compiled_plan_from_file.as_json_string())

        # Round-trip the plan through an in-memory JSON string.
        with open(plan_path) as file:
            content = file.read()
        plan_reference_from_string = PlanReference.from_json_string(content)
        compiled_plan_from_string = self.t_env.load_plan(plan_reference_from_string)
        self.assertEqual(
            compiled_plan.as_json_string(), compiled_plan_from_string.as_json_string()
        )


if __name__ == "__main__":
    import unittest

    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)