################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
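
# Static lookup table mapping a numeric province id (0-6) to a province name.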
provinces = ("Beijing", "Shanghai", "Hangzhou", "Shenzhen", "Jiangxi", "Chongqing", "Xizang")
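

# Scalar Python UDF that resolves a payment record's numeric province id to a
# human-readable province name via the lookup table above.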
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def province_id_to_name(id):
    return provinces[id]
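

# Assembles and runs the pipeline: Kafka JSON source -> per-province
# aggregation -> Elasticsearch upsert sink.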
def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
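    # Run the Python UDF workers inside Flink managed memory so their memory
    # consumption is bounded by the TaskManager's memory configuration.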
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
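
    # Source table over the 'payment_msg' Kafka topic, decoded as JSON via the
    # legacy connector descriptor properties. Each record is assumed to look
    # like {"createTime": "...", "orderId": 1, "payAmount": 9.9,
    # "payPlatform": 0, "provinceId": 2} (illustrative values only, inferred
    # from the schema below).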
    create_kafka_source_ddl = """
        CREATE TABLE payment_msg(
            createTime VARCHAR,
            orderId BIGINT,
            payAmount DOUBLE,
            payPlatform INT,
            provinceId INT
        ) WITH (
            'connector.type' = 'kafka',
            'connector.version' = 'universal',
            'connector.topic' = 'payment_msg',
            'connector.properties.bootstrap.servers' = 'kafka:9092',
            'connector.properties.group.id' = 'test_3',
            'connector.startup-mode' = 'latest-offset',
            'format.type' = 'json'
        )
    """
    create_es_sink_ddl = """
        CREATE TABLE es_sink (
            province VARCHAR PRIMARY KEY,
            pay_amount DOUBLE
        ) WITH (
            'connector.type' = 'elasticsearch',
            'connector.version' = '7',
            'connector.hosts' = 'http://elasticsearch:9200',
            'connector.index' = 'platform_pay_amount_1',
            'connector.document-type' = 'payment',
            'update-mode' = 'upsert',
            'connector.flush-on-checkpoint' = 'true',
            'connector.key-delimiter' = '$',
            'connector.key-null-literal' = 'n/a',
            'connector.bulk-flush.max-size' = '42mb',
            'connector.bulk-flush.max-actions' = '32',
            'connector.bulk-flush.interval' = '1000',
            'connector.bulk-flush.backoff.delay' = '1000',
            'format.type' = 'json'
        )
    """
    t_env.execute_sql(create_kafka_source_ddl)
    t_env.execute_sql(create_es_sink_ddl)
    t_env.register_function('province_id_to_name', province_id_to_name)
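
    # Continuous query: translate each payment's province id to a name, then
    # maintain a running sum of payAmount per province; results are emitted as
    # upserts into es_sink whenever an aggregate changes.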
    t_env.from_path("payment_msg") \
        .select("province_id_to_name(provinceId) as province, payAmount") \
        .group_by("province") \
        .select("province, sum(payAmount) as pay_amount") \
        .execute_insert("es_sink")


if __name__ == '__main__':
    log_processing()
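
# To submit the job (assumptions: this file is saved as
# payment_msg_processing.py and the Flink CLI with PyFlink support is on PATH):
#   flink run -py payment_msg_processing.py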