blob: f2416c1151ed742573c2c5e49def15a1a2931b74 [file] [log] [blame]
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Strict mode: ERR traps are inherited by functions (errtrace), the script
# aborts on a failing command (errexit) or an unset variable (nounset), and a
# pipeline fails when any of its stages fails (pipefail).
set -o errtrace -o errexit -o nounset -o pipefail

# Scale argument handed to the TPC-H data generator further down.
SCALE='0.01'

# Load common.sh from the directory this script was invoked from.
. "$(dirname "$0")/common.sh"
################################################################################
# Generate test data
################################################################################
printf '%s\n' "Generating test data..."

TARGET_DIR="${END_TO_END_DIR}/flink-tpch-test/target"
TPCH_DATA_DIR="${END_TO_END_DIR}/test-scripts/test-data/tpch"

# Run the generator with the scale and TARGET_DIR as its two arguments
# (presumably the output directory: later steps read table/, query/ and
# expected/ beneath it). The classpath wildcard stays inside quotes so the
# shell passes "lib/*" through to the JVM unexpanded.
java \
  -cp "${TARGET_DIR}/TpchTestProgram.jar:${TARGET_DIR}/lib/*" \
  org.apache.flink.table.tpch.TpchDataGenerator \
  "${SCALE}" \
  "${TARGET_DIR}"
################################################################################
# Prepare Flink
################################################################################
printf '%s\n' "Preparing Flink..."

# Set the managed-memory fraction before the cluster is started.
# NOTE(review): the value keeps its trailing "f" exactly as before; confirm
# the configuration parser accepts that form.
set_config_key 'taskmanager.memory.managed.fraction' '0.55f'
start_cluster
################################################################################
# Run SQL statements
################################################################################
# Directories and files used by the query loop below.
TABLE_DIR="$TARGET_DIR/table"
ORIGIN_QUERY_DIR="$TARGET_DIR/query"
MODIFIED_QUERY_DIR="$TPCH_DATA_DIR/modified-query"
EXPECTED_DIR="$TARGET_DIR/expected"
RESULT_DIR="$TEST_DATA_DIR/result"
SQL_CONF="$TEST_DATA_DIR/sql-client-session.conf"
mkdir "$RESULT_DIR"

# Read the shared source definitions once and replace the literal
# "$TABLE_DIR" placeholder with the real table directory.
SOURCES_YAML=$(cat "$TPCH_DATA_DIR/source.yaml")
SOURCES_YAML=${SOURCES_YAML//\$TABLE_DIR/"$TABLE_DIR"}

for i in {1..22}; do
  echo "Running query #$i..."

  # First line in sink yaml is ignored
  SINK_YAML=$(tail -n +2 "$TPCH_DATA_DIR/sink/q${i}.yaml")
  SINK_YAML=${SINK_YAML//\$RESULT_DIR/"$RESULT_DIR"}

  # Write the session environment for this query. The four settings must be
  # indented so that they are children of "execution:"; at column 0 they
  # would be unrelated top-level keys and "execution:" would be empty.
  # The terminating EOF has to stay at column 0.
  cat > "$SQL_CONF" << EOF
${SOURCES_YAML}
${SINK_YAML}
execution:
  planner: blink
  type: batch
  result-mode: table
  parallelism: 2
EOF

  # A query file in the modified-query directory takes precedence over the
  # one in the original query directory.
  QUERY_FILE="$ORIGIN_QUERY_DIR/q$i.sql"
  if [[ -e "$MODIFIED_QUERY_DIR/q$i.sql" ]]; then
    QUERY_FILE="$MODIFIED_QUERY_DIR/q$i.sql"
  fi
  SQL_STATEMENT="INSERT INTO q$i $(cat "$QUERY_FILE")"

  # Capture the client's stdout first. With errexit and pipefail a failing
  # client or a missing "Job ID:" line would otherwise end the script inside
  # the assignment without any hint about what went wrong.
  if ! CLIENT_OUTPUT=$("$FLINK_DIR/bin/sql-client.sh" \
    --environment "$SQL_CONF" \
    --update "$SQL_STATEMENT"); then
    echo "SQL client failed while submitting query #$i:" >&2
    printf '%s\n' "$CLIENT_OUTPUT" >&2
    exit 1
  fi

  # The job id is the last space-separated token of the "Job ID:" line.
  # "|| true" keeps a non-matching grep from aborting before the explicit
  # check below can report the problem.
  JOB_ID=$(grep "Job ID:" <<< "$CLIENT_OUTPUT" | sed 's/.* //g') || true
  if [[ -z "$JOB_ID" ]]; then
    echo "No job id found in the SQL client output for query #$i:" >&2
    printf '%s\n' "$CLIENT_OUTPUT" >&2
    exit 1
  fi

  wait_job_terminal_state "$JOB_ID" "FINISHED"

  # Compare the expected CSV with the CSV produced for this query.
  java -cp "$TARGET_DIR/TpchTestProgram.jar" org.apache.flink.table.tpch.TpchResultComparator "$EXPECTED_DIR/q$i.csv" "$RESULT_DIR/q$i.csv"
done