#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
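# Fail fast: -E propagates the ERR trap into functions, -e aborts on errors,
# -u treats unset variables as errors, and -o pipefail fails a pipeline if any stage fails.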
set -Eeuo pipefail
KAFKA_VERSION="3.2.3"
CONFLUENT_VERSION="7.2.2"
CONFLUENT_MAJOR_VERSION="7.2"
# Check the Confluent Platform <> Apache Kafka compatibility matrix when updating KAFKA_VERSION
KAFKA_SQL_VERSION="universal"
SQL_JARS_DIR=${END_TO_END_DIR}/flink-sql-client-test/target/sql-jars
KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka")
function create_data_stream_kafka_source {
    local topicName="test-python-data-stream-source"
    create_kafka_topic 1 1 "$topicName"

    echo "Sending messages to Kafka..."
    send_messages_to_kafka '{"f0": "a", "f1": 1}' "$topicName"
    send_messages_to_kafka '{"f0": "ab", "f1": 2}' "$topicName"
    send_messages_to_kafka '{"f0": "abc", "f1": 3}' "$topicName"
    send_messages_to_kafka '{"f0": "abcd", "f1": 4}' "$topicName"
    send_messages_to_kafka '{"f0": "abcde", "f1": 5}' "$topicName"
}
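# Note: create_data_stream_kafka_source is not invoked anywhere in this script at the moment;
# the Kafka-based PyFlink DataStream case it was written for is temporarily disabled below.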
# Sorts the newline-separated messages in $1 and echoes them as a single space-joined line.
function sort_msg {
    local arr=()
    local line
    while read -r line
    do
        arr+=("$line")
    done <<< "$1"
    IFS=$'\n' sorted=($(sort <<< "${arr[*]}")); unset IFS
    echo "${sorted[*]}"
}
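# Illustrative usage of sort_msg above (hypothetical input):
#   sort_msg $'b\na'   # prints "a b"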
CURRENT_DIR=$(cd "$(dirname "$0")" && pwd -P)
source "${CURRENT_DIR}"/common.sh
source "${CURRENT_DIR}"/kafka_sql_common.sh \
${KAFKA_VERSION} \
${CONFLUENT_VERSION} \
${CONFLUENT_MAJOR_VERSION} \
${KAFKA_SQL_VERSION}
function test_clean_up {
    stop_cluster

    # Note: 'data_stream_job.py' still uses the legacy Kafka source, so the Kafka-based
    # DataStream case is temporarily disabled until it is aligned with the upcoming
    # PyFlink and Flink 2.0 compatibility efforts.
    #
    # Since the Kafka cluster is never started, stopping it is skipped as well.
    # stop_kafka_cluster
}
on_exit test_clean_up
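# test_clean_up is registered via on_exit (a helper sourced from common.sh), so the Flink
# cluster is stopped even if one of the test steps below fails.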
cp -r "${FLINK_DIR}/conf" "${TEST_DATA_DIR}/conf"
# Standard YAML does not allow duplicate keys, so drop the existing entries before appending the overrides below.
sed -i -e "/^taskmanager.memory.task.off-heap.size: /d" "${TEST_DATA_DIR}/conf/config.yaml"
sed -i -e "/^taskmanager.memory.process.size: /d" "${TEST_DATA_DIR}/conf/config.yaml"
sed -i -e "/^taskmanager.numberOfTaskSlots: /d" "${TEST_DATA_DIR}/conf/config.yaml"
echo "taskmanager.memory.task.off-heap.size: 768m" >> "${TEST_DATA_DIR}/conf/config.yaml"
echo "taskmanager.memory.process.size: 3172m" >> "${TEST_DATA_DIR}/conf/config.yaml"
echo "taskmanager.numberOfTaskSlots: 5" >> "${TEST_DATA_DIR}/conf/config.yaml"
export FLINK_CONF_DIR="${TEST_DATA_DIR}/conf"
FLINK_PYTHON_DIR=$(cd "${CURRENT_DIR}/../../flink-python" && pwd -P)
UV_HOME="${FLINK_PYTHON_DIR}/dev/.uv"
"${FLINK_PYTHON_DIR}/dev/lint-python.sh" -s uv
PYTHON_EXEC="${UV_HOME}/bin/python"
source "${UV_HOME}/bin/activate"
cd "${FLINK_PYTHON_DIR}"
rm -rf dist
uv pip install --group dev
python setup.py sdist
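# apache-flink-libraries is built and installed first because the apache-flink package
# built above depends on it (an assumption based on PyFlink's packaging layout).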
pushd apache-flink-libraries
python setup.py sdist
uv pip install dist/*
popd
uv pip install dist/*
cd dev
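# Package the uv environment so it can be shipped to the cluster via -pyarch: the archive
# is extracted under its file name ("venv.zip"), which is why -pyexec below points at
# "venv.zip/.uv/bin/python".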
zip -q -r "${TEST_DATA_DIR}/venv.zip" .uv
deactivate
cd "${CURRENT_DIR}"
start_cluster
echo "Test PyFlink Table job:"
FLINK_PYTHON_TEST_DIR=$(cd "${CURRENT_DIR}/../flink-python-test" && pwd -P)
REQUIREMENTS_PATH="${TEST_DATA_DIR}/requirements.txt"
echo "pytest==8.3.5" > "${REQUIREMENTS_PATH}"
echo "Test submitting python job with 'pipeline.jars':\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-p 2 \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.uv/bin/python" \
-py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" \
pipeline.jars "file://${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test submitting python job with 'pipeline.classpaths':\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-p 2 \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.uv/bin/python" \
-py "${FLINK_PYTHON_TEST_DIR}/python/python_job.py" \
pipeline.classpaths "file://${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test stream python udf sql job:\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-p 2 \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.uv/bin/python" \
"${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test batch python udf sql job:\n"
PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-p 2 \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.uv/bin/python" \
-c org.apache.flink.python.tests.BatchPythonUdfSqlJob \
"${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
echo "Test using python udf in sql client:\n"
SUBMITTED_SQL=$TEST_DATA_DIR/submit-sql-client.sql
cat >> $SUBMITTED_SQL << EOF
CREATE TABLE sink (
    a BIGINT
) WITH (
    'connector' = 'filesystem',
    'path' = '$TEST_DATA_DIR/sql-client-test',
    'format' = 'csv'
);
CREATE FUNCTION add_one AS 'add_one.add_one' LANGUAGE PYTHON;
SET 'python.client.executable'='$PYTHON_EXEC';
INSERT INTO sink SELECT add_one(a) FROM (VALUES (1), (2), (3)) as source (a);
EOF
JOB_ID=$("$FLINK_DIR"/bin/sql-client.sh \
    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
    -pyreq "${REQUIREMENTS_PATH}" \
    -pyarch "${TEST_DATA_DIR}/venv.zip" \
    -pyexec "venv.zip/.uv/bin/python" \
    -f "$SUBMITTED_SQL" | grep "Job ID:" | sed 's/.* //g')
wait_job_terminal_state "$JOB_ID" "FINISHED"
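# Illustrative (commented-out) verification sketch, not part of the original test: the INSERT
# above applies add_one to the values 1, 2 and 3, so the filesystem sink should contain 2, 3
# and 4 once the job reaches FINISHED. The SINK_CONTENT variable is hypothetical.
# SINK_CONTENT=$(find "$TEST_DATA_DIR/sql-client-test" -type f -exec cat {} + | sort -n)
# echo "SQL client sink contents: ${SINK_CONTENT}"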
# Note: 'data_stream_job.py' still uses the legacy Kafka source, so the PyFlink DataStream
# case below is temporarily disabled until it is aligned with the upcoming PyFlink and
# Flink 2.0 compatibility efforts.
# echo "Test PyFlink DataStream job:"
#
# # Prepare Kafka for the DataStream job
# echo "Preparing Kafka..."
#
# # Setup Kafka distribution
# setup_kafka_dist
#
# # Start Kafka cluster
# start_kafka_cluster
#
# # End to end test for DataStream ProcessFunction with timer
# # Creating necessary Kafka topics for the test
# create_kafka_topic 1 1 timer-stream-source
# create_kafka_topic 1 1 timer-stream-sink
#
# # Preparing a set of payment messages in JSON format for the test
# PAYMENT_MSGS='{"createTime": 1603679413000, "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
# {"createTime": 1603679426000, "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
# {"createTime": 1603679427000, "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
# {"createTime": 1603679428000, "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
# {"createTime": 1603679491000, "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
# {"createTime": 1603679492000, "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
# {"createTime": 1603679521000, "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
# {"createTime": 1603679522000, "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
# function send_msg_to_kafka {
# while read line
# do
# send_messages_to_kafka "$line" "timer-stream-source"
# sleep 1
# done <<< "$1"
# }
# function read_msg_from_kafka {
# $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
# --max-messages $1 \
# --topic $2 \
# --consumer-property group.id=$3 --timeout-ms 90000 2> /dev/null
# }
# send_msg_to_kafka "${PAYMENT_MSGS[*]}"
# JOB_ID=$(${FLINK_DIR}/bin/flink run \
# -pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \
# -pyreq "${REQUIREMENTS_PATH}" \
# -pyarch "${TEST_DATA_DIR}/venv.zip" \
# -pyexec "venv.zip/.uv/bin/python" \
# -pyclientexec "venv.zip/.uv/bin/python" \
# -pym "data_stream_job" \
# -j "${KAFKA_SQL_JAR}")
# echo "${JOB_ID}"
# JOB_ID=`echo "${JOB_ID}" | sed 's/.* //g'`
# wait_job_running ${JOB_ID}
# echo "Reading kafka messages..."
# READ_MSG=$(read_msg_from_kafka 16 timer-stream-sink pyflink-e2e-test-timer)
# # The job is submitted via env.execute_async(), so cancel it once the results have been fetched.
# cancel_job "${JOB_ID}"
# EXPECTED_MSG='Current key: 1603679414, orderId: 1603679414, payAmount: 83685.44904332698, timestamp: 1603679413000
# Current key: 1603679427, orderId: 1603679427, payAmount: 30092.50657757042, timestamp: 1603679426000
# Current key: 1603679428, orderId: 1603679428, payAmount: 62644.01719293056, timestamp: 1603679427000
# Current key: 1603679429, orderId: 1603679429, payAmount: 6449.806795118451, timestamp: 1603679428000
# Current key: 1603679492, orderId: 1603679492, payAmount: 41108.36128417494, timestamp: 1603679491000
# Current key: 1603679493, orderId: 1603679493, payAmount: 64882.44233197067, timestamp: 1603679492000
# Current key: 1603679522, orderId: 1603679522, payAmount: 81648.80712644062, timestamp: 1603679521000
# Current key: 1603679523, orderId: 1603679523, payAmount: 81861.73063103345, timestamp: 1603679522000
# On timer timestamp: -9223372036854774308
# On timer timestamp: -9223372036854774308
# On timer timestamp: -9223372036854774308
# On timer timestamp: -9223372036854774308
# On timer timestamp: -9223372036854774308
# On timer timestamp: -9223372036854774308
# On timer timestamp: -9223372036854774308
# On timer timestamp: -9223372036854774308'
# EXPECTED_MSG=$(sort_msg "${EXPECTED_MSG[*]}")
# SORTED_READ_MSG=$(sort_msg "${READ_MSG[*]}")
# if [[ "${EXPECTED_MSG[*]}" != "${SORTED_READ_MSG[*]}" ]]; then
# echo "Output from Flink program does not match expected output."
# echo -e "EXPECTED Output: --${EXPECTED_MSG[*]}--"
# echo -e "ACTUAL: --${SORTED_READ_MSG[*]}--"
# exit 1
# fi
# Clean up the Python environment provisioned above.
"${FLINK_PYTHON_DIR}/dev/lint-python.sh" -r