#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
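
# Exit on errors, unset variables, and failures anywhere in a pipeline.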
set -Eeuo pipefail
CURRENT_DIR="$(cd "$(dirname "$0")" && pwd -P)"
source "${CURRENT_DIR}/common.sh"
cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
echo "taskmanager.memory.process.size: 3172m" >> "${TEST_DATA_DIR}/conf/flink-conf.yaml"
export FLINK_CONF_DIR="${TEST_DATA_DIR}/conf"
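
# Bootstrap a Miniconda environment under flink-python/dev/.conda and use its
# interpreter as the client-side Python.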
FLINK_PYTHON_DIR="$(cd "${CURRENT_DIR}/../../flink-python" && pwd -P)"
CONDA_HOME="${FLINK_PYTHON_DIR}/dev/.conda"
"${FLINK_PYTHON_DIR}/dev/lint-python.sh" -s miniconda
PYTHON_EXEC="${CONDA_HOME}/bin/python"
source "${CONDA_HOME}/bin/activate"
cd "${FLINK_PYTHON_DIR}"
rm -rf dist
python setup.py sdist
pip install dist/*
cd dev
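
# Pack the conda environment into venv.zip so it can be shipped to the cluster
# with -pyarch; drop the package cache first to keep the archive small.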
conda install -y -q zip=3.0
rm -rf .conda/pkgs
zip -q -r "${TEST_DATA_DIR}/venv.zip" .conda
deactivate
cd "${CURRENT_DIR}"
start_cluster
on_exit stop_cluster
FLINK_PYTHON_TEST_DIR="$(cd "${CURRENT_DIR}/../flink-python-test" && pwd -P)"
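
# The test jobs declare scipy as a third-party requirement; it is installed
# into the cluster-side Python environment at runtime via -pyreq.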
REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt"
echo "scipy==1.4.1" > "${REQUIREMENTS_PATH}"
echo "Test submitting python job:\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.conda/bin/python" \
-py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py"
echo "Test blink stream python udf sql job:\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.conda/bin/python" \
"${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test blink batch python udf sql job:\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.conda/bin/python" \
-c org.apache.flink.python.tests.BlinkBatchPythonUdfSqlJob \
"${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test flink stream python udf sql job:\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.conda/bin/python" \
-c org.apache.flink.python.tests.FlinkStreamPythonUdfSqlJob \
"${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test flink batch python udf sql job:\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.conda/bin/python" \
-c org.apache.flink.python.tests.FlinkBatchPythonUdfSqlJob \
"${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test using python udf in sql client:\n"
SQL_CONF="${TEST_DATA_DIR}/sql-client-session.conf"
cat >> "${SQL_CONF}" << EOF
tables:
  - name: sink
    type: sink-table
    update-mode: append
    schema:
      - name: a
        type: BIGINT
    connector:
      type: filesystem
      path: "$TEST_DATA_DIR/sql-client-test.csv"
    format:
      type: csv
      fields:
        - name: a
          type: BIGINT

functions:
  - name: add_one
    from: python
    fully-qualified-name: add_one.add_one

configuration:
  python.client.executable: "$PYTHON_EXEC"
EOF
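
# Submit the INSERT statement through the embedded SQL client, extract the job
# ID from the client output, and wait for the job to reach FINISHED.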
SQL_STATEMENT="insert into sink select add_one(a) from (VALUES (1), (2), (3)) as source (a)"
JOB_ID=$("${FLINK_DIR}/bin/sql-client.sh" embedded \
    --environment "${SQL_CONF}" \
    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
    -pyreq "${REQUIREMENTS_PATH}" \
    -pyarch "${TEST_DATA_DIR}/venv.zip" \
    -pyexec "venv.zip/.conda/bin/python" \
    --update "$SQL_STATEMENT" | grep "Job ID:" | sed 's/.* //g')
wait_job_terminal_state "$JOB_ID" "FINISHED"
stop_cluster