blob: 8bd7b25798e251b4c4a9263c8bc0d8f3c5af39c1 [file] [log] [blame]
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# test_flink_uber_jar.sh tests the Python FlinkRunner class.
set -e
set -v
SAVE_MAIN_SESSION=0
while [[ $# -gt 0 ]]
do
key="$1"
case $key in
--flink_job_server_jar)
FLINK_JOB_SERVER_JAR="$2"
shift # past argument
shift # past value
;;
--flink_mini_cluster_jar)
FLINK_MINI_CLUSTER_JAR="$2"
shift # past argument
shift # past value
;;
--env_dir)
ENV_DIR="$2"
shift # past argument
shift # past value
;;
--python_root_dir)
PYTHON_ROOT_DIR="$2"
shift # past argument
shift # past value
;;
--python_version)
PYTHON_VERSION="$2"
shift # past argument
shift # past value
;;
--python_container_image)
PYTHON_CONTAINER_IMAGE="$2"
shift # past argument
shift # past value
;;
--save_main_session)
SAVE_MAIN_SESSION=1
shift # past value
;;
*) # unknown option
echo "Unknown option: $1"
exit 1
;;
esac
done
# Go to the root of the repository
cd "$(git rev-parse --show-toplevel)"
# Verify docker command exists
command -v docker
docker -v
# Verify container has already been built
docker images --format "{{.Repository}}:{{.Tag}}" | grep "$PYTHON_CONTAINER_IMAGE"
# Set up Python environment
virtualenv -p "python$PYTHON_VERSION" "$ENV_DIR"
. $ENV_DIR/bin/activate
pip install --retries 10 -e "$PYTHON_ROOT_DIR"
# Hacky python script to find a free port. Note there is a small chance the chosen port could
# get taken before being claimed.
SOCKET_SCRIPT="
from __future__ import print_function
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 0))
print(s.getsockname()[1])
s.close()
"
FLINK_PORT=$(python -c "$SOCKET_SCRIPT")
echo "Starting Flink mini cluster listening on port $FLINK_PORT"
java -Dorg.slf4j.simpleLogger.defaultLogLevel=warn -jar "$FLINK_MINI_CLUSTER_JAR" --rest-port "$FLINK_PORT" --rest-bind-address localhost &
PIPELINE_PY="
import apache_beam as beam
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import Create
from apache_beam.transforms import Map
logging.basicConfig(level=logging.INFO)
pipeline_options = PipelineOptions()
pipeline = beam.Pipeline(options=pipeline_options)
if pipeline_options.view_as(SetupOptions).save_main_session:
# To test that our main session is getting plumbed through artifact staging
# correctly, create a global variable. If the main session is not plumbed
# through properly, global_var will be undefined and the pipeline will fail.
global_var = 1
pcoll = (pipeline
| Create([0, 1, 2])
| Map(lambda x: x + global_var))
else:
pcoll = (pipeline
| Create([0, 1, 2])
| Map(lambda x: x + 1))
assert_that(pcoll, equal_to([1, 2, 3]))
result = pipeline.run()
result.wait_until_finish()
"
if [[ "$SAVE_MAIN_SESSION" -eq 0 ]]; then
(python -c "$PIPELINE_PY" \
--runner FlinkRunner \
--flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
--parallelism 1 \
--environment_type DOCKER \
--environment_options "docker_container_image=$PYTHON_CONTAINER_IMAGE" \
--flink_master "localhost:$FLINK_PORT" \
--flink_submit_uber_jar \
) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
else
(python -c "$PIPELINE_PY" \
--runner FlinkRunner \
--flink_job_server_jar "$FLINK_JOB_SERVER_JAR" \
--parallelism 1 \
--environment_type DOCKER \
--environment_options "docker_container_image=$PYTHON_CONTAINER_IMAGE" \
--flink_master "localhost:$FLINK_PORT" \
--flink_submit_uber_jar \
--save_main_session
) || TEST_EXIT_CODE=$? # don't fail fast here; clean up before exiting
fi
kill %1 || echo "Failed to shut down Flink mini cluster"
rm -rf "$ENV_DIR"
if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then
echo ">>> SUCCESS"
else
echo ">>> FAILURE"
fi
exit $TEST_EXIT_CODE