blob: 20be5fb8806977b89c1ca6440304d20e051c9b2d [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import shutil
import sys
import unittest
import uuid
from pyflink.pyflink_gateway_server import on_windows
from pyflink.table import DataTypes
from pyflink.table import expressions as expr
from pyflink.table.udf import udf
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import (PyFlinkBlinkStreamTableTestCase,
PyFlinkBlinkBatchTableTestCase,
PyFlinkStreamTableTestCase,
PyFlinkBatchTableTestCase)
class DependencyTests(object):
def test_add_python_file(self):
python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
os.mkdir(python_file_dir)
python_file_path = os.path.join(python_file_dir, "test_dependency_manage_lib.py")
with open(python_file_path, 'w') as f:
f.write("def add_two(a):\n raise Exception('This function should not be called!')")
self.t_env.add_python_file(python_file_path)
python_file_dir_with_higher_priority = os.path.join(
self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
os.mkdir(python_file_dir_with_higher_priority)
python_file_path_higher_priority = os.path.join(python_file_dir_with_higher_priority,
"test_dependency_manage_lib.py")
with open(python_file_path_higher_priority, 'w') as f:
f.write("def add_two(a):\n return a + 2")
self.t_env.add_python_file(python_file_path_higher_priority)
def plus_two(i):
from test_dependency_manage_lib import add_two
return add_two(i)
self.t_env.create_temporary_system_function(
"add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call("add_two", t.a), t.a).execute_insert("Results").wait()
actual = source_sink_utils.results()
self.assert_equals(actual, ["3,1", "4,2", "5,3"])
class FlinkStreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
pass
class FlinkBatchDependencyTests(PyFlinkBatchTableTestCase):
def test_add_python_file(self):
python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
os.mkdir(python_file_dir)
python_file_path = os.path.join(python_file_dir, "test_dependency_manage_lib.py")
with open(python_file_path, 'w') as f:
f.write("def add_two(a):\n return a + 2")
self.t_env.add_python_file(python_file_path)
def plus_two(i):
from test_dependency_manage_lib import add_two
return add_two(i)
self.t_env.create_temporary_system_function(
"add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t = t.select(expr.call('add_two', t.a), t.a)
result = self.collect(t)
self.assertEqual(result, ["3,1", "4,2", "5,3"])
class BlinkBatchDependencyTests(DependencyTests, PyFlinkBlinkBatchTableTestCase):
pass
class BlinkStreamDependencyTests(DependencyTests, PyFlinkBlinkStreamTableTestCase):
def test_set_requirements_without_cached_directory(self):
requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
with open(requirements_txt_path, 'w') as f:
f.write("cloudpickle==1.2.2")
self.t_env.set_python_requirements(requirements_txt_path)
def check_requirements(i):
import cloudpickle
assert os.path.abspath(cloudpickle.__file__).startswith(
os.environ['_PYTHON_REQUIREMENTS_INSTALL_DIR'])
return i
self.t_env.create_temporary_system_function("check_requirements",
udf(check_requirements, DataTypes.BIGINT(),
DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call('check_requirements', t.a), t.a).execute_insert("Results").wait()
actual = source_sink_utils.results()
self.assert_equals(actual, ["1,1", "2,2", "3,3"])
def test_set_requirements_with_cached_directory(self):
tmp_dir = self.tempdir
requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
with open(requirements_txt_path, 'w') as f:
f.write("python-package1==0.0.0")
requirements_dir_path = os.path.join(tmp_dir, "requirements_dir_" + str(uuid.uuid4()))
os.mkdir(requirements_dir_path)
package_file_name = "python-package1-0.0.0.tar.gz"
with open(os.path.join(requirements_dir_path, package_file_name), 'wb') as f:
import base64
# This base64 data is encoded from a python package file which includes a
# "python_package1" module. The module contains a "plus(a, b)" function.
# The base64 can be recomputed by following code:
# base64.b64encode(open("python-package1-0.0.0.tar.gz", "rb").read()).decode("utf-8")
f.write(base64.b64decode(
"H4sICNefrV0C/2Rpc3QvcHl0aG9uLXBhY2thZ2UxLTAuMC4wLnRhcgDtmVtv2jAYhnPtX2H1CrRCY+ckI"
"XEx7axuUA11u5imyICTRc1JiVnHfv1MKKWjYxwKEdPehws7xkmUfH5f+3PyqfqWpa1cjG5EKFnLbOvfhX"
"FQTI3nOPPSdavS5Pa8nGMwy3Esi3ke9wyTObbnGNQxamBSKlFQavzUryG8ldG6frpbEGx4yNmDLMp/hPy"
"P8b+6fNN613vdP1z8XdteG3+ug/17/F3Hcw1qIv5H54NUYiyUaH2SRRllaYeytkl6IpEdujI2yH2XapCQ"
"wSRJRDHt0OveZa//uUfeZonUvUO5bHo+0ZcoVo9bMhFRvGx9H41kWj447aUsR0WUq+pui8arWKggK5Jli"
"wGOo/95q79ovXi6/nfyf246Dof/n078fT9KI+X77Xx6BP83bX4Xf5NxT7dz7toO/L8OxjKgeTwpG+KcDp"
"sdQjWFVJMipYI+o0MCk4X/t2UYtqI0yPabCHb3f861XcD/Ty/+Y5nLdCzT0dSPo/SmbKsf6un+b7KV+Ls"
"W4/D/OoC9w/930P9eGwM75//csrD+Q/6P/P/k9D/oX3988Wqw1bS/tf6tR+s/m3EG/ddBqXO9XKf15C8p"
"P9k4HZBtBgzZaVW5vrfKcj+W32W82ygEB9D/Xu9+4/qfP9L/rBv0X1v87yONKRX61/qfzwqjIDzIPTbv/"
"7or3/88i0H/tfBFW7s/s/avRInQH06ieEy7tDrQeYHUdRN7wP+n/vf62LOH/pld7f9xz7a5Pfufedy0oP"
"86iJI8KxStAq6yLC4JWdbbVbWRikR2z1ZGytk5vauW3QdnBFE6XqwmykazCesAAAAAAAAAAAAAAAAAAAA"
"AAAAAAAAAAAAAAOBw/AJw5CHBAFAAAA=="))
self.t_env.set_python_requirements(requirements_txt_path, requirements_dir_path)
def add_one(i):
from python_package1 import plus
return plus(i, 1)
self.t_env.create_temporary_system_function("add_one",
udf(add_one, DataTypes.BIGINT(),
DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call('add_one', t.a), t.a).execute_insert("Results").wait()
actual = source_sink_utils.results()
self.assert_equals(actual, ["2,1", "3,2", "4,3"])
def test_add_python_archive(self):
tmp_dir = self.tempdir
archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
os.mkdir(archive_dir_path)
with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
f.write("2")
archive_file_path = \
shutil.make_archive(os.path.dirname(archive_dir_path), 'zip', archive_dir_path)
self.t_env.add_python_archive(archive_file_path, "data")
def add_from_file(i):
with open("data/data.txt", 'r') as f:
return i + int(f.read())
self.t_env.create_temporary_system_function("add_from_file",
udf(add_from_file, DataTypes.BIGINT(),
DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call('add_from_file', t.a), t.a).execute_insert("Results").wait()
actual = source_sink_utils.results()
self.assert_equals(actual, ["3,1", "4,2", "5,3"])
@unittest.skipIf(on_windows(), "Symbolic link is not supported on Windows, skipping.")
def test_set_environment(self):
python_exec = sys.executable
tmp_dir = self.tempdir
python_exec_link_path = os.path.join(tmp_dir, "py_exec")
os.symlink(python_exec, python_exec_link_path)
self.t_env.get_config().set_python_executable(python_exec_link_path)
def check_python_exec(i):
import os
assert os.environ["python"] == python_exec_link_path
return i
self.t_env.create_temporary_system_function("check_python_exec",
udf(check_python_exec, DataTypes.BIGINT(),
DataTypes.BIGINT()))
def check_pyflink_gateway_disabled(i):
try:
from pyflink.java_gateway import get_gateway
get_gateway()
except Exception as e:
assert str(e).startswith("It's launching the PythonGatewayServer during Python UDF"
" execution which is unexpected.")
else:
raise Exception("The gateway server is not disabled!")
return i
self.t_env.create_temporary_system_function(
"check_pyflink_gateway_disabled",
udf(check_pyflink_gateway_disabled, DataTypes.BIGINT(),
DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(
expr.call('check_python_exec', t.a),
expr.call('check_pyflink_gateway_disabled', t.a)) \
.execute_insert("Results").wait()
actual = source_sink_utils.results()
self.assert_equals(actual, ["1,1", "2,2", "3,3"])
if __name__ == "__main__":
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)