blob: 6555c2ddd368ac0a770262ac672b55d0d668ecbe [file] [log] [blame]
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
KAFKA_VERSION="$1"
CONFLUENT_VERSION="$2"
CONFLUENT_MAJOR_VERSION="$3"
KAFKA_SQL_VERSION="$4"
source "$(dirname "$0")"/kafka-common.sh $1 $2 $3
function create_kafka_json_source {
topicName="$1"
create_kafka_topic 1 1 $topicName
# put JSON data into Kafka
echo "Sending messages to Kafka..."
send_messages_to_kafka '{"timestamp": "2018-03-12T08:00:00Z", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12T08:10:00Z", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12T09:00:00Z", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12T09:10:00Z", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12T09:20:00Z", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12T09:30:00Z", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12T09:30:00Z", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' $topicName
send_messages_to_kafka '{"timestamp": "2018-03-12T10:40:00Z", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' $topicName
}
function get_kafka_json_source_schema {
topicName="$1"
tableName="$2"
cat << EOF
CREATE TABLE $tableName (
\`timestamp\` TIMESTAMP_LTZ(3),
\`user\` STRING,
\`event\` ROW<type STRING, message STRING>,
\`rowtime\` as \`timestamp\`,
WATERMARK FOR \`rowtime\` AS \`rowtime\` - INTERVAL '2' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '$topicName',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
EOF
}