#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Strict mode: -E lets ERR traps be inherited by functions, -e aborts on an
# unhandled non-zero status, -u rejects unset variables, and pipefail makes a
# pipeline fail when any of its stages fails.
set -Eeuo pipefail
# Load helper scripts from the directory this script lives in.
source "$(dirname "$0")"/common.sh
# NOTE(review): the three positional arguments look like the Kafka and
# Confluent versions to use — confirm against kafka-common.sh.
source "$(dirname "$0")"/kafka-common.sh 0.10.2.0 3.2.0 3.2
#######################################
# Compare the output of the Flink job against the expected message.
# Spaces, tabs and newlines are removed from the actual output before the
# comparison; the expected value is compared as given.
# Arguments:
#   $1 - expected message
#   $2 - actual output to check
# Outputs:
#   On mismatch, prints the expected and actual values to stdout.
# Returns:
#   Returns normally on match; exits the shell with status 1 on mismatch.
#######################################
function verify_output {
  # Declare separately so a failing command substitution is not masked by
  # the exit status of 'local'.
  local expected result
  # Fixed format string: '%' or backslashes in the message must not be
  # interpreted as printf directives.
  expected=$(printf '%s' "$1")
  # Strip whitespace without letting the shell glob-expand the payload.
  result=$(printf '%s' "$2" | tr -d ' \t\n')
  if [[ "$result" != "$expected" ]]; then
    echo "Output from Flink program does not match expected output."
    # printf '%s' keeps backslashes in the data literal (echo -e would not).
    printf '%s\n' "EXPECTED FOR KEY: --$expected--"
    printf '%s\n' "ACTUAL: --$result--"
    exit 1
  fi
}
# Signal/exit handler: stops the schema registry and then the Kafka cluster.
test_cleanup() {
  # A further interrupt while tearing down should simply exit rather than
  # re-enter this handler.
  trap 'exit -1' INT
  # Ignore EXIT from here on so that leaving the script does not run the
  # teardown a second time.
  trap '' EXIT

  stop_confluent_schema_registry
  stop_kafka_cluster
}
# Run the teardown handler on interrupt as well as on every exit path.
trap test_cleanup INT EXIT

# Prepare and start the services the test talks to; these helpers are not
# defined in this file.
setup_kafka_dist
setup_confluent_dist
start_kafka_cluster
start_confluent_schema_registry
# Jar of the Flink job under test, resolved relative to END_TO_END_DIR (which
# is not set in this file).
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-confluent-schema-registry/target/TestAvroConsumerConfluent.jar
# JSON records that are sent to the input topic and later used as the expected
# values when verifying the output topic.
INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'
INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'
INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'
# Avro schema of the records above; handed to send_messages_to_kafka_avro
# together with each message.
USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
# Register the User schema under the subject "users-value" in the schema
# registry. The URL is quoted so it always reaches curl as a single argument.
curl -X POST \
  "${SCHEMA_REGISTRY_URL}/subjects/users-value/versions" \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/vnd.schemaregistry.v1+json' \
  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'

echo "Sending messages to Kafka topic [test-avro-input] ..."
# The payloads are quoted: USER_SCHEMA contains '[' ... ']', which the shell
# would otherwise treat as a glob pattern when expanding the variable.
send_messages_to_kafka_avro "$INPUT_MESSAGE_1" test-avro-input "$USER_SCHEMA"
send_messages_to_kafka_avro "$INPUT_MESSAGE_2" test-avro-input "$USER_SCHEMA"
send_messages_to_kafka_avro "$INPUT_MESSAGE_3" test-avro-input "$USER_SCHEMA"
start_cluster
create_kafka_topic 1 1 test-avro-out

# Read Avro message from [test-avro-input], check the schema and send message to [test-avro-out]
# Paths and the registry URL are quoted so that values containing whitespace
# are still passed as single arguments.
"$FLINK_DIR"/bin/flink run -d "$TEST_PROGRAM_JAR" \
  --input-topic test-avro-input --output-topic test-avro-out \
  --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
  --schema-registry-url "${SCHEMA_REGISTRY_URL}"
# Read messages from Kafka topic [test-avro-out], keeping only the line for
# each expected record.
# '|| true': with errexit and pipefail active, a grep that finds nothing would
# abort the script right here, before verify_output can report the mismatch.
# An empty value is instead passed on and rejected by verify_output below.
KEY_1_MSGS=$(read_messages_from_kafka 3 test-avro-out Alyssa_consumer | grep Alyssa) || true
KEY_2_MSGS=$(read_messages_from_kafka 3 test-avro-out Charlie_consumer | grep Charlie) || true
KEY_3_MSGS=$(read_messages_from_kafka 3 test-avro-out Ben_consumer | grep Ben) || true

## Verifying AVRO output with actual message
verify_output "$INPUT_MESSAGE_1" "$KEY_1_MSGS"
verify_output "$INPUT_MESSAGE_2" "$KEY_2_MSGS"
verify_output "$INPUT_MESSAGE_3" "$KEY_3_MSGS"