blob: c9cc19db71d554743d845308e20c9e7f68a493cc [file] [log] [blame]
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/kafka-common.sh

setup_kafka_dist
start_kafka_cluster

# Modify the configuration to have enough slots for the test job; keep a
# backup so test_cleanup can restore the original distribution afterwards.
# (Paths quoted so a $FLINK_DIR containing spaces does not word-split.)
cp "$FLINK_DIR/conf/flink-conf.yaml" "$FLINK_DIR/conf/flink-conf.yaml.bak"
sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 3/" "$FLINK_DIR/conf/flink-conf.yaml"

start_cluster
#######################################
# Cleanup handler registered on INT and EXIT: tears down the Kafka cluster
# and restores the Flink configuration that was modified during setup.
# Globals: FLINK_DIR (read)
#######################################
function test_cleanup {
  # don't call ourselves again for another signal interruption
  # (NB: 'exit 1' instead of the non-portable 'exit -1')
  trap "exit 1" INT
  # don't call ourselves again for normal exit
  trap "" EXIT

  stop_kafka_cluster

  # revert our modifications to the Flink distribution
  mv -f "$FLINK_DIR/conf/flink-conf.yaml.bak" "$FLINK_DIR/conf/flink-conf.yaml"
}
trap test_cleanup INT
trap test_cleanup EXIT

# create the required topics (1 replica, 1 partition each)
create_kafka_topic 1 1 test-input
create_kafka_topic 1 1 test-output

# run the Flink job (detached mode); a short partition-discovery interval is
# set so the job picks up the partition added later in this test quickly
"$FLINK_DIR/bin/flink" run -d "$FLINK_DIR/examples/streaming/Kafka010Example.jar" \
  --input-topic test-input --output-topic test-output \
  --prefix=PREFIX \
  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
  --flink.partition-discovery.interval-millis 1000
#######################################
# Compares expected vs. actual output of the Flink job for one key.
# Arguments: $1 - expected output, with literal "\n" as the line separator
#            $2 - actual output (already containing real newlines)
# Exits:     1 on mismatch
#######################################
function verify_output {
  # Expand "\n" escapes via printf '%b' with the data passed as an argument,
  # not as the (unquoted) format string — the original `printf $1` word-split
  # the value and interpreted it as a format.
  local expected
  expected=$(printf '%b' "$1")

  if [[ "$2" != "$expected" ]]; then
    echo "Output from Flink program does not match expected output."
    echo -e "EXPECTED FOR KEY: --$expected--"
    echo -e "ACTUAL: --$2--"
    exit 1
  fi
}
echo "Sending messages to Kafka topic [test-input] ..."
# push the initial batch of test records (animal,count,timestamp) into Kafka
send_messages_to_kafka "elephant,5,45218\nsquirrel,12,46213\nbee,3,51348\nsquirrel,22,52444\nbee,10,53412\nelephant,9,54867" test-input

echo "Verifying messages from Kafka topic [test-output] ..."

# consume the job's output once per key, keeping only that animal's records
ELEPHANT_MSGS=$(read_messages_from_kafka 6 test-output elephant_consumer | grep elephant)
SQUIRREL_MSGS=$(read_messages_from_kafka 6 test-output squirrel_consumer | grep squirrel)
BEE_MSGS=$(read_messages_from_kafka 6 test-output bee_consumer | grep bee)

# check all keys; make sure we have actual newlines in the string, not "\n"
verify_output "elephant,5,45218\nelephant,14,54867" "$ELEPHANT_MSGS"
verify_output "squirrel,12,46213\nsquirrel,34,52444" "$SQUIRREL_MSGS"
verify_output "bee,3,51348\nbee,13,53412" "$BEE_MSGS"
# now, we add a new partition to the topic
echo "Repartitioning Kafka topic [test-input] ..."
modify_num_partitions test-input 2

if [[ "$(get_num_partitions test-input)" -ne 2 ]]; then
  echo "Failed adding a partition to test-input topic."
  exit 1
fi

# send some more messages to Kafka
echo "Sending more messages to Kafka topic [test-input] ..."
send_messages_to_kafka "elephant,13,64213\ngiraffe,9,65555\nbee,5,65647\nsquirrel,18,66413" test-input

# verify that our assumption that the new partition actually has written messages is correct
if [[ "$(get_partition_end_offset test-input 1)" -eq 0 ]]; then
  echo "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified."
  exit 1
fi
# all new messages should have been consumed, and has produced correct output
echo "Verifying messages from Kafka topic [test-output] ..."

# read the post-repartition output for each key, including the new giraffe key
ELEPHANT_MSGS=$(read_messages_from_kafka 4 test-output elephant_consumer | grep elephant)
SQUIRREL_MSGS=$(read_messages_from_kafka 4 test-output squirrel_consumer | grep squirrel)
BEE_MSGS=$(read_messages_from_kafka 4 test-output bee_consumer | grep bee)
GIRAFFE_MSGS=$(read_messages_from_kafka 10 test-output giraffe_consumer | grep giraffe)

verify_output "elephant,27,64213" "$ELEPHANT_MSGS"
verify_output "squirrel,52,66413" "$SQUIRREL_MSGS"
verify_output "bee,18,65647" "$BEE_MSGS"
verify_output "giraffe,9,65555" "$GIRAFFE_MSGS"