#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
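# End-to-end test: run the batch WordCount example against different filesystems.
#
# Usage: test_batch_wordcount.sh [INPUT_TYPE]
#   INPUT_TYPE is one of: file (default), hadoop, hadoop_minio,
#   hadoop_with_provider, presto, presto_minio, dummy-fs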
source "$(dirname "$0")"/common.sh
INPUT_TYPE=${1:-file}
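# Expected checksum (MD5) of the WordCount output, verified by check_result_hash below.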
RESULT_HASH="72a690412be8928ba239c2da967328a5"
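# Unique per-run prefix so that concurrent runs against the same bucket do not collide.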
S3_PREFIX=temp/test_batch_wordcount-$(uuidgen)
OUTPUT_PATH="${TEST_DATA_DIR}/out/wc_out"
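# Command (as an argv array) that downloads the finished result from S3 into
# OUTPUT_PATH; it stays empty for local and MinIO-based runs, where the output
# is already on the local disk.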
fetch_complete_result=()
case $INPUT_TYPE in
(file)
ARGS="--input ${TEST_INFRA_DIR}/test-data/words --output ${OUTPUT_PATH}"
;;
(hadoop)
source "$(dirname "$0")"/common_s3.sh
s3_setup hadoop
ARGS="--input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'"
fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" "" false)
;;
(hadoop_minio)
source "$(dirname "$0")"/common_s3_minio.sh
s3_setup hadoop
ARGS="--input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
OUTPUT_PATH="$TEST_INFRA_DIR/$IT_CASE_S3_BUCKET/$S3_PREFIX"
;;
(hadoop_with_provider)
source "$(dirname "$0")"/common_s3.sh
s3_setup_with_provider hadoop "fs.s3a.aws.credentials.provider"
ARGS="--input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'"
fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" "" false)
;;
(presto)
source "$(dirname "$0")"/common_s3.sh
s3_setup presto
ARGS="--input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'"
fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" "" false)
;;
(presto_minio)
source "$(dirname "$0")"/common_s3_minio.sh
s3_setup presto
ARGS="--input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
OUTPUT_PATH="$TEST_INFRA_DIR/$IT_CASE_S3_BUCKET/$S3_PREFIX"
;;
(dummy-fs)
source "$(dirname "$0")"/common_dummy_fs.sh
dummy_fs_setup
ARGS="--input dummy://localhost/words --input anotherDummy://localhost/words --output ${OUTPUT_PATH}"
RESULT_HASH="0e5bd0a3dd7d5a7110aa85ff70adb54b"
;;
(*)
echo "Unknown input type $INPUT_TYPE"
exit 1
;;
esac
mkdir -p "$(dirname "$OUTPUT_PATH")"
start_cluster
# The test may run against different source types, but every source provides the
# same test data, so the expected checksum is the same for all input types.
# ${ARGS} is intentionally left unquoted so that it word-splits into separate arguments.
"${FLINK_DIR}/bin/flink" run -p 1 "${FLINK_DIR}/examples/batch/WordCount.jar" ${ARGS}
# Fetch the result from AWS S3 into OUTPUT_PATH. This is a no-op for the local
# filesystem and for the MinIO-based tests, where fetch_complete_result is empty.
# retry_times expects a command name, so the array expansion is wrapped in a function.
function fetch_it() {
    "${fetch_complete_result[@]}"
}
retry_times 10 5 fetch_it
check_result_hash "WordCount (${INPUT_TYPE})" "${OUTPUT_PATH}" "${RESULT_HASH}"