| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| |
| from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic |
| from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings |
| from pyflink.table.udf import udf |
| |
| |
# Province names looked up by position: the numeric ``provinceId`` of a
# payment message is used directly as an index into this tuple, so the
# order of the entries is significant (index 0 is "Beijing").
provinces = (
    "Beijing",
    "Shanghai",
    "Hangzhou",
    "Shenzhen",
    "Jiangxi",
    "Chongqing",
    "Xizang",
)
| |
| |
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def province_id_to_name(id):
    """Translate a numeric province id into the province's name.

    Used as a scalar Python UDF over the ``provinceId`` column, which the
    ``payment_msg`` source table declares as ``INT``; the declared input type
    therefore matches that column rather than ``STRING`` (the value is used
    as a tuple index and must be an integer).

    :param id: zero-based position in ``provinces``; ``None`` when the source
        column is null. (Name kept as ``id`` for interface compatibility even
        though it shadows the builtin.)
    :return: the province name, or ``None`` when ``id`` is null or does not
        correspond to a known province.
    """
    # Guard explicitly: a negative id would otherwise index from the end of
    # the tuple and silently yield the wrong province, and an id past the end
    # would raise IndexError and fail the whole streaming job.
    if id is None or not 0 <= id < len(provinces):
        return None
    return provinces[id]
| |
| |
def log_processing():
    """Sum payment amounts per province from Kafka and upsert them into Elasticsearch.

    Creates a streaming table environment, declares the Kafka-backed source
    table ``payment_msg`` and the Elasticsearch sink table ``es_sink`` via
    DDL, registers the ``province_id_to_name`` UDF, and submits an
    INSERT job that groups payments by province name and sums ``payAmount``.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    # NOTE(review): presumably lets the Python UDF worker use Flink managed
    # memory (per the option name); confirm against Flink's Python config docs.
    t_env.get_config().get_configuration().set_boolean(
        "python.fn-execution.memory.managed", True)

    # Source: JSON payment messages read from the 'payment_msg' Kafka topic,
    # starting from the latest offset.
    create_kafka_source_ddl = """
        CREATE TABLE payment_msg(
            createTime VARCHAR,
            orderId BIGINT,
            payAmount DOUBLE,
            payPlatform INT,
            provinceId INT
        ) WITH (
            'connector.type' = 'kafka',
            'connector.version' = 'universal',
            'connector.topic' = 'payment_msg',
            'connector.properties.bootstrap.servers' = 'kafka:9092',
            'connector.properties.group.id' = 'test_3',
            'connector.startup-mode' = 'latest-offset',
            'format.type' = 'json'
        )
    """

    # Sink: Elasticsearch index written in upsert mode, keyed by province.
    # Flink does not own the external data, so the only primary-key mode it
    # supports is NOT ENFORCED; a bare PRIMARY KEY defaults to ENFORCED and is
    # rejected by DDL validation.
    create_es_sink_ddl = """
        CREATE TABLE es_sink (
            province VARCHAR PRIMARY KEY NOT ENFORCED,
            pay_amount DOUBLE
        ) with (
            'connector.type' = 'elasticsearch',
            'connector.version' = '7',
            'connector.hosts' = 'http://elasticsearch:9200',
            'connector.index' = 'platform_pay_amount_1',
            'connector.document-type' = 'payment',
            'update-mode' = 'upsert',
            'connector.flush-on-checkpoint' = 'true',
            'connector.key-delimiter' = '$',
            'connector.key-null-literal' = 'n/a',
            'connector.bulk-flush.max-size' = '42mb',
            'connector.bulk-flush.max-actions' = '32',
            'connector.bulk-flush.interval' = '1000',
            'connector.bulk-flush.backoff.delay' = '1000',
            'format.type' = 'json'
        )
    """

    t_env.execute_sql(create_kafka_source_ddl)
    t_env.execute_sql(create_es_sink_ddl)
    t_env.register_function('province_id_to_name', province_id_to_name)

    # Continuous aggregation: the per-province running total is re-emitted as
    # new payments arrive, which is why the sink is declared as an upsert sink.
    # NOTE(review): execute_insert submits the job and returns without waiting
    # for it to finish; when run outside `flink run`, consider waiting on the
    # returned TableResult.
    t_env.from_path("payment_msg") \
        .select("province_id_to_name(provinceId) as province, payAmount") \
        .group_by("province") \
        .select("province, sum(payAmount) as pay_amount") \
        .execute_insert("es_sink")
| |
| |
if __name__ == "__main__":
    # Launch the payment aggregation pipeline only when this file is executed
    # as a script, never as a side effect of importing it.
    log_processing()
| |