-- blob: 3f830d2688c820835cde2008216a4ecf14f402ed [file] [log] [blame]
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- JsonTable: input table for the first pipeline stage, decoded with the json format.
-- Connector, topic and bootstrap servers are $-placeholders that are expected to be
-- substituted before the script is executed.
CREATE TABLE JsonTable (
    -- Event-time attribute, millisecond precision.
    rowtime TIMESTAMP(3),
    `user`  STRING,
    -- Nested record holding the event type and its message text.
    event   ROW<`type` STRING, message STRING>,
    -- The watermark trails rowtime by 2 seconds to tolerate out-of-order records.
    WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
) WITH (
    'connector' = '$KAFKA_IDENTIFIER',
    'format' = 'json',
    'topic' = '$TOPIC_JSON_NAME',
    'properties.bootstrap.servers' = '$KAFKA_BOOTSTRAP_SERVERS',
    -- Start reading from the earliest available offset.
    'scan.startup.mode' = 'earliest-offset'
);
-- AvroTable: intermediate table encoded with the avro format.
-- It is written by the windowed aggregation over JsonTable and read back
-- by the statement that fills CsvTable.
CREATE TABLE AvroTable (
    -- Window start rendered as text by the producing query.
    event_timestamp STRING,
    `user`          STRING,
    message         STRING,
    -- Number of rows that shared the same user, message and window.
    duplicate_count BIGINT
) WITH (
    'connector' = '$KAFKA_IDENTIFIER',
    'format' = 'avro',
    'topic' = '$TOPIC_AVRO_NAME',
    'properties.bootstrap.servers' = '$KAFKA_BOOTSTRAP_SERVERS',
    -- Start reading from the earliest available offset.
    'scan.startup.mode' = 'earliest-offset'
);
-- CsvTable: final sink, written as csv files through the filesystem connector.
-- The output location is the $RESULT placeholder.
CREATE TABLE CsvTable (
    -- The first four columns mirror the AvroTable schema.
    event_timestamp STRING,
    `user`          STRING,
    message         STRING,
    duplicate_count BIGINT,
    -- Extra column filled with a constant expression by the inserting query.
    constant        STRING
) WITH (
    'connector' = 'filesystem',
    'path' = '$RESULT',
    'format' = 'csv',
    -- Write field values without a quote character.
    'csv.disable-quote-character' = 'true',
    -- Roll over part files and check the rolling policy every 2 seconds.
    'sink.rolling-policy.rollover-interval' = '2s',
    'sink.rolling-policy.check-interval' = '2s'
);
-- Register the user-defined function RegReplace, backed by the class named below.
-- NOTE(review): presumably performs a regex based string replacement, judging by
-- the class name and by how the queries in this script call it - confirm.
CREATE FUNCTION RegReplace
    AS 'org.apache.flink.table.toolbox.StringRegexReplaceFunction';
-- Stage 1: read from Kafka Json, window aggregation, and write into Kafka Avro.
-- Rows without a user are dropped. The remaining rows are counted per user and
-- message inside 1-hour tumbling windows on rowtime.
INSERT INTO AvroTable
SELECT
    -- Window start, cast to text to match the STRING sink column.
    CAST(TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS VARCHAR) AS event_timestamp,
    `user`,
    -- RegReplace is applied to the grouped message with ' is ' and ' was ' as arguments.
    RegReplace(event.message, ' is ', ' was ') AS message,
    COUNT(*) AS duplicate_count
FROM JsonTable
WHERE `user` IS NOT NULL
GROUP BY
    `user`,
    event.message,
    TUMBLE(rowtime, INTERVAL '1' HOUR);
-- Stage 2: read from Kafka Avro, and write into Filesystem Csv.
-- Every AvroTable column is forwarded unchanged and one extra column is appended.
INSERT INTO CsvTable
SELECT
    AvroTable.*,
    -- All arguments are literals, so this call can be evaluated once at planning time.
    -- NOTE(review): the literal text suggests this exists to exercise constant folding - confirm.
    RegReplace('Test constant folding.', 'Test', 'Success') AS constant
FROM AvroTable;