#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script aims to test Materialized Table and Kubernetes integration.
source "$(dirname "$0")"/common_kubernetes.sh
CURRENT_DIR=$(cd "$(dirname "$0")" && pwd -P)
CLUSTER_ROLE_BINDING="flink-role-binding-default"
APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
FLINK_IMAGE_NAME="test_kubernetes_mt_application-1"
LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
IMAGE_BUILD_RETRIES=3
IMAGE_BUILD_BACKOFF=2
# copy test-filesystem jar & hadoop plugin
TEST_FILE_SYSTEM_JAR=$(ls ${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar)
cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
add_optional_plugin "s3-fs-hadoop"
# start kubernetes
start_kubernetes
kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit --serviceaccount=default:default --namespace=default
# build image
if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image ${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
echo "ERROR: Could not build image. Aborting..."
exit 1
fi
# set up the materialized table data dir on S3 (MinIO)
echo "[INFO] Start S3 env"
source "$(dirname "$0")"/common_s3_minio.sh
s3_setup hadoop
S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET"
MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}/test_materialized_table-$(uuidgen)"
echo "[INFO] Start SQL Gateway"
set_config_key "sql-gateway.endpoint.rest.address" "localhost"
start_sql_gateway
SQL_GATEWAY_REST_PORT=8083
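# The SQL Gateway REST endpoint listens on port 8083 by default. A quick way to
# probe that it is reachable before issuing requests (illustrative only, not part
# of the test flow; assumes the gateway exposes an /info endpoint):
#   curl -s "http://localhost:$SQL_GATEWAY_REST_PORT/info"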
function internal_cleanup {
kubectl delete deployment ${APPLICATION_CLUSTER_ID}
kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
}
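# Note: internal_cleanup is expected to be invoked by the cleanup trap installed in
# common_kubernetes.sh on test exit, so the application deployment and the cluster
# role binding do not have to be deleted explicitly at the end of this script.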
function open_session() {
local session_options=$1
if [ -z "$session_options" ]; then
session_options="{}"
fi
curl -s -X POST \
-H "Content-Type: application/json" \
-d "{\"properties\": $session_options}" \
"http://localhost:$SQL_GATEWAY_REST_PORT/sessions" | jq -r '.sessionHandle'
}
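# open_session POSTs the session properties and extracts the handle from the
# response, whose assumed shape is:
#   {"sessionHandle": "<uuid>"}
# Usage, as done further below:
#   session_handle=$(open_session "$session_options")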
function configure_session() {
local session_handle=$1
local statement=$2
response=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "{\"statement\": \"$statement\"}" \
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session)
if [ "$response" != "{}" ]; then
echo "Failed to configure session $session_handle with statement '$statement': $response"
exit 1
fi
echo "Configured session $session_handle with statement '$statement' successfully"
}
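# The configure-session endpoint is expected to reply with an empty JSON object
# ({}) on success, which is why any other response body above is treated as a
# failure.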
function execute_statement() {
local session_handle=$1
local statement=$2
local response=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "{\"statement\": \"$statement\"}" \
http://localhost:$SQL_GATEWAY_REST_PORT/v3/sessions/$session_handle/statements)
local operation_handle=$(echo $response | jq -r '.operationHandle')
if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
echo "Failed to execute statement: $statement, response: $response"
exit 1
fi
get_operation_result $session_handle $operation_handle
}
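# The statements endpoint is assumed to reply with an operation handle, e.g.:
#   {"operationHandle": "<uuid>"}
# The actual result rows/status are then collected by polling the operation's
# result pages via get_operation_result below.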
function get_operation_result() {
local session_handle=$1
local operation_handle=$2
local fields_array=()
local next_uri="v3/sessions/$session_handle/operations/$operation_handle/result/0"
while [ ! -z "$next_uri" ] && [ "$next_uri" != "null" ];
do
response=$(curl -s -X GET \
-H "Content-Type: application/json" \
http://localhost:$SQL_GATEWAY_REST_PORT/$next_uri)
result_type=$(echo $response | jq -r '.resultType')
result_kind=$(echo $response | jq -r '.resultKind')
next_uri=$(echo $response | jq -r '.nextResultUri')
errors=$(echo $response | jq -r '.errors')
if [ "$errors" != "null" ]; then
echo "fetch operation $operation_handle failed: $errors"
exit 1
fi
if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == "SUCCESS" ]; then
fields_array+=("SUCCESS")
break;
fi
if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == "SUCCESS_WITH_CONTENT" ]; then
new_fields=$(echo $response | jq -r '.results.data[].fields')
fields_array+=("$new_fields")
else
sleep 1
fi
done
echo "${fields_array[@]}"
}
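# Each result page is assumed to carry, besides the payload, the fields read by the
# jq filters above, roughly:
#   {"resultType": "NOT_READY|PAYLOAD|EOS",
#    "resultKind": "SUCCESS|SUCCESS_WITH_CONTENT",
#    "nextResultUri": "...", "results": {"data": [{"fields": [...]}]}}
# Polling stops once nextResultUri is null or a plain SUCCESS result is seen.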
function create_materialized_table_in_continuous_mode() {
local session_handle=$1
local table_name=$2
local operation_handle=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "{\"statement\": \"CREATE MATERIALIZED TABLE $table_name \
PARTITIONED BY (ds) \
with (\
'format' = 'json',\
'sink.rolling-policy.rollover-interval' = '10s',\
'sink.rolling-policy.check-interval' = '10s'\
)\
FRESHNESS = INTERVAL '10' SECOND \
AS SELECT \
DATE_FORMAT(\`timestamp\`, 'yyyy-MM-dd') AS ds, \
\`user\`, \
\`type\` \
FROM filesystem_source \/*+ options('source.monitor-interval' = '10s') *\/ \"}" \
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements | jq -r '.operationHandle')
get_operation_result $session_handle $operation_handle
}
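# With a FRESHNESS of 10 seconds and the default refresh-mode threshold, the
# planner is expected to choose continuous refresh mode, i.e. the CREATE statement
# above launches a long-running streaming job on the Kubernetes application cluster.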
function create_filesystem_source() {
local session_handle=$1
local table_name=$2
local path=$3
create_source_result=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "{\"statement\": \"CREATE TABLE $table_name (\
\`timestamp\` TIMESTAMP_LTZ(3),\
\`user\` STRING,\
\`type\` STRING\
) WITH (\
'format' = 'csv'\
)\"}" \
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements/)
echo $create_source_result
}
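# The source schema matches the columns the materialized table selects from it; a
# data file for this table would contain CSV rows of the (hypothetical) form:
#   "2024-01-01 00:00:00.000",Alice,INFO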
S3_ENDPOINT=${S3_ENDPOINT//localhost/$(get_host_machine_address)}
echo "[INFO] Create a new session"
session_options="{\"table.catalog-store.kind\": \"file\",
\"table.catalog-store.file.path\": \"$MATERIALIZED_TABLE_DATA_DIR/\",
\"execution.target\": \"kubernetes-application\",
\"kubernetes.cluster-id\": \"${APPLICATION_CLUSTER_ID}\",
\"kubernetes.container.image.ref\": \"${FLINK_IMAGE_NAME}\",
\"jobmanager.memory.process.size\": \"1088m\",
\"taskmanager.memory.process.size\": \"1000m\",
\"kubernetes.jobmanager.cpu\": \"0.5\",
\"kubernetes.taskmanager.cpu\": \"0.5\",
\"kubernetes.rest-service.exposed.type\": \"NodePort\",
\"s3.endpoint\": \"$S3_ENDPOINT\",
\"workflow-scheduler.type\": \"embedded\"}"
session_handle=$(open_session "$session_options")
echo "[INFO] Session Handle $session_handle"
# prepare catalog & database
echo "[INFO] Create catalog & database"
configure_session "$session_handle" "create catalog if not exists test_catalog with (\
'type' = 'test-filesystem',\
'default-database' = 'test_db',\
'path' = '$MATERIALIZED_TABLE_DATA_DIR/'\
)"
configure_session $session_handle "use catalog test_catalog"
configure_session $session_handle "create database if not exists test_db"
create_filesystem_source $session_handle "filesystem_source"
# 1. create materialized table in continuous mode
echo "[INFO] Create Materialized table in continuous mode"
create_materialized_table_in_continuous_mode $session_handle "my_materialized_table_in_continuous_mode"
echo "[INFO] Wait for deployment ${APPLICATION_CLUSTER_ID} to become Available"
kubectl wait --for=condition=Available --timeout=30s deploy/${APPLICATION_CLUSTER_ID} || exit 1
jm_pod_name=$(kubectl get pods --selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
echo "[INFO] Wait for the first checkpoint to finish"
wait_num_checkpoints $jm_pod_name 1
# 2. suspend & resume materialized table in continuous mode
echo "[INFO] Suspend materialized table"
configure_session $session_handle "set 'execution.checkpointing.savepoint-dir' = '$MATERIALIZED_TABLE_DATA_DIR/savepoint'"
execute_statement $session_handle "alter materialized table my_materialized_table_in_continuous_mode suspend"
kubectl wait --for=delete deployment/$APPLICATION_CLUSTER_ID
echo "[INFO] Resume materialized table"
execute_statement $session_handle "alter materialized table my_materialized_table_in_continuous_mode resume"
kubectl wait --for=condition=Available --timeout=30s deploy/${APPLICATION_CLUSTER_ID} || exit 1
jm_pod_name=$(kubectl get pods --selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
echo "[INFO] Wait for the resumed job to finish its first checkpoint"
wait_num_checkpoints $jm_pod_name 1
# 3. verify the resumed continuous job is restored from the savepoint
mkdir -p "$LOCAL_LOGS_PATH"
kubectl logs $jm_pod_name > $LOCAL_LOGS_PATH/jobmanager.log
grep -E "Starting job [A-Za-z0-9]+ from savepoint" $LOCAL_LOGS_PATH/jobmanager.log
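# The grep succeeds only if the JobManager logged a line of the form
# "Starting job <job-id> from savepoint ...", which confirms the resumed job was
# restored from the savepoint taken during SUSPEND rather than started from an
# empty state.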