| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| import unittest |
| |
| from pyflink.datastream import TimeCharacteristic |
| from pyflink.table import expressions as expr |
| from pyflink.table.types import DataTypes |
| from pyflink.table.udf import udaf, udf, AggregateFunction |
| from pyflink.testing import source_sink_utils |
| from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \ |
| PyFlinkBlinkStreamTableTestCase |
| |
| |
class BatchPandasUDAFITTests(PyFlinkBlinkBatchTableTestCase):
    """Integration tests for pandas (vectorized) UDAFs on the blink batch planner.

    Covers result-type validation, keyed and global group aggregations,
    tumble/slide group windows and SQL over windows, including mixing pandas
    UDAFs with general and pandas scalar UDFs.
    """

    def test_check_result_type(self):
        """udaf() must reject composite result types (ROW, MAP) for pandas UDAFs."""
        def pandas_udaf():
            pass

        with self.assertRaises(
                TypeError,
                msg="Invalid returnType: Pandas UDAF doesn't support DataType type ROW currently"):
            udaf(pandas_udaf, result_type=DataTypes.ROW(), func_type="pandas")

        with self.assertRaises(
                TypeError,
                msg="Invalid returnType: Pandas UDAF doesn't support DataType type MAP currently"):
            udaf(pandas_udaf, result_type=DataTypes.MAP(DataTypes.INT(), DataTypes.INT()),
                 func_type="pandas")

    def test_group_aggregate_function(self):
        """Keyed aggregation where UDAF inputs are produced by scalar UDFs.

        Chains a general (row-based) UDF and a pandas UDF into pandas UDAFs
        inside a single group-by select.
        """
        t = self.t_env.from_elements(
            [(1, 2, 3), (3, 2, 3), (2, 1, 3), (1, 5, 4), (1, 8, 6), (2, 3, 4)],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT())]))

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c'],
            [DataTypes.TINYINT(), DataTypes.FLOAT(), DataTypes.INT()])
        self.t_env.register_table_sink("Results", table_sink)
        # general udf
        add = udf(lambda a: a + 1, result_type=DataTypes.INT())
        # pandas udf
        substract = udf(lambda a: a - 1, result_type=DataTypes.INT(), func_type="pandas")
        max_udaf = udaf(lambda a: a.max(), result_type=DataTypes.INT(), func_type="pandas")
        t.group_by("a") \
            .select(t.a, mean_udaf(add(t.b)), max_udaf(substract(t.c))) \
            .execute_insert("Results") \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["1,6.0,5", "2,3.0,3", "3,3.0,2"])

    def test_group_aggregate_without_keys(self):
        """Global (non-keyed) aggregation with a multi-argument pandas UDAF."""
        t = self.t_env.from_elements(
            [(1, 2, 3), (3, 2, 3), (2, 1, 3), (1, 5, 4), (1, 8, 6), (2, 3, 4)],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT())]))

        table_sink = source_sink_utils.TestAppendSink(
            ['a'],
            [DataTypes.INT()])
        min_add = udaf(lambda a, b, c: a.min() + b.min() + c.min(),
                       result_type=DataTypes.INT(), func_type="pandas")
        self.t_env.register_table_sink("Results", table_sink)
        t.select(min_add(t.a, t.b, t.c)) \
            .execute_insert("Results") \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["5"])

    def test_group_aggregate_with_aux_group(self):
        """Aggregation over auxiliary grouping keys derived in a prior select.

        Also enables Python metrics so that MaxAdd's counter registration in
        open() is exercised.
        """
        t = self.t_env.from_elements(
            [(1, 2, 3), (3, 2, 3), (2, 1, 3), (1, 5, 4), (1, 8, 6), (2, 3, 4)],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT())]))

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c', 'd'],
            [DataTypes.TINYINT(), DataTypes.INT(), DataTypes.FLOAT(), DataTypes.INT()])
        self.t_env.register_table_sink("Results", table_sink)
        self.t_env.get_config().get_configuration().set_string('python.metric.enabled', 'true')
        self.t_env.register_function("max_add", udaf(MaxAdd(),
                                                     result_type=DataTypes.INT(),
                                                     func_type="pandas"))
        self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
        t.group_by("a") \
            .select("a, a + 1 as b, a + 2 as c") \
            .group_by("a, b") \
            .select("a, b, mean_udaf(b), max_add(b, c, 1)") \
            .execute_insert("Results") \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["1,2,2.0,6", "2,3,3.0,8", "3,4,4.0,10"])

    def test_tumble_group_window_aggregate_function(self):
        """Pandas UDAF over a 1-hour tumbling event-time window."""
        import datetime
        from pyflink.table.window import Tumble
        t = self.t_env.from_elements(
            [
                (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (3, 2, 4, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (2, 1, 2, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 40, 0, 0)),
                (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0)),
                (2, 3, 6, datetime.datetime(2018, 3, 11, 3, 30, 0, 0))
            ],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT()),
                 DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c'],
            [
                DataTypes.TIMESTAMP(3),
                DataTypes.TIMESTAMP(3),
                DataTypes.FLOAT()
            ])
        self.t_env.register_table_sink("Results", table_sink)
        self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
        tumble_window = Tumble.over(expr.lit(1).hours) \
            .on(expr.col("rowtime")) \
            .alias("w")
        t.window(tumble_window) \
            .group_by("w") \
            .select("w.start, w.end, mean_udaf(b)") \
            .execute_insert("Results") \
            .wait()

        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.2",
                            "2018-03-11 04:00:00.0,2018-03-11 05:00:00.0,8.0"])

    def test_slide_group_window_aggregate_function(self):
        """Pandas UDAFs over a 1-hour sliding window (30-minute slide), keyed by a."""
        import datetime
        from pyflink.table.window import Slide
        t = self.t_env.from_elements(
            [
                (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (3, 2, 4, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (2, 1, 2, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 40, 0, 0)),
                (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0)),
                (2, 3, 6, datetime.datetime(2018, 3, 11, 3, 30, 0, 0))
            ],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT()),
                 DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c', 'd', 'e'],
            [
                DataTypes.TINYINT(),
                DataTypes.TIMESTAMP(3),
                DataTypes.TIMESTAMP(3),
                DataTypes.FLOAT(),
                DataTypes.INT()
            ])
        self.t_env.register_table_sink("Results", table_sink)
        self.t_env.register_function("max_add", udaf(MaxAdd(),
                                                     result_type=DataTypes.INT(),
                                                     func_type="pandas"))
        self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
        slide_window = Slide.over(expr.lit(1).hours) \
            .every(expr.lit(30).minutes) \
            .on(expr.col("rowtime")) \
            .alias("w")
        t.window(slide_window) \
            .group_by("a, w") \
            .select("a, w.start, w.end, mean_udaf(b), max_add(b, c, 1)") \
            .execute_insert("Results") \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["1,2018-03-11 02:30:00.0,2018-03-11 03:30:00.0,2.0,6",
                            "1,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.5,7",
                            "1,2018-03-11 03:30:00.0,2018-03-11 04:30:00.0,5.5,14",
                            "1,2018-03-11 04:00:00.0,2018-03-11 05:00:00.0,8.0,14",
                            "2,2018-03-11 02:30:00.0,2018-03-11 03:30:00.0,1.0,4",
                            "2,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.0,10",
                            "2,2018-03-11 03:30:00.0,2018-03-11 04:30:00.0,3.0,10",
                            "3,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.0,7",
                            "3,2018-03-11 02:30:00.0,2018-03-11 03:30:00.0,2.0,7"])

    def test_over_window_aggregate_function(self):
        """Pandas UDAFs over SQL over windows.

        Exercises ROWS and RANGE frames with bounded/unbounded preceding and
        following boundaries in a single INSERT INTO ... SELECT statement.
        """
        import datetime
        t = self.t_env.from_elements(
            [
                (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (3, 2, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (2, 1, 2, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0)),
                (2, 3, 6, datetime.datetime(2018, 3, 11, 3, 30, 0, 0))
            ],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT()),
                 DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'],
            [DataTypes.TINYINT(), DataTypes.FLOAT(), DataTypes.INT(), DataTypes.FLOAT(),
             DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(),
             DataTypes.FLOAT(), DataTypes.FLOAT()])
        self.t_env.register_table_sink("Results", table_sink)
        self.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
        self.t_env.register_function("max_add", udaf(MaxAdd(),
                                                     result_type=DataTypes.INT(),
                                                     func_type="pandas"))
        self.t_env.register_table("T", t)
        self.t_env.execute_sql("""
            insert into Results
            select a,
             mean_udaf(b)
             over (PARTITION BY a ORDER BY rowtime
             ROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING),
             max_add(b, c)
             over (PARTITION BY a ORDER BY rowtime
             ROWS BETWEEN UNBOUNDED preceding AND 0 FOLLOWING),
             mean_udaf(b)
             over (PARTITION BY a ORDER BY rowtime
             ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING),
             mean_udaf(c)
             over (PARTITION BY a ORDER BY rowtime
             ROWS BETWEEN 1 PRECEDING AND 0 FOLLOWING),
             mean_udaf(c)
             over (PARTITION BY a ORDER BY rowtime
             RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
             mean_udaf(b)
             over (PARTITION BY a ORDER BY rowtime
             RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
             mean_udaf(b)
             over (PARTITION BY a ORDER BY rowtime
             RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND UNBOUNDED FOLLOWING),
             mean_udaf(c)
             over (PARTITION BY a ORDER BY rowtime
             RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND UNBOUNDED FOLLOWING),
             mean_udaf(c)
             over (PARTITION BY a ORDER BY rowtime
             RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND CURRENT ROW)
            from T
        """).wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["1,4.3333335,5,4.3333335,3.0,3.0,2.5,4.3333335,3.0,2.0",
                            "1,4.3333335,13,5.5,3.0,3.0,4.3333335,8.0,5.0,5.0",
                            "1,4.3333335,6,4.3333335,2.0,3.0,2.5,4.3333335,3.0,2.0",
                            "2,2.0,9,2.0,4.0,4.0,2.0,2.0,4.0,4.0",
                            "2,2.0,3,2.0,2.0,4.0,1.0,2.0,4.0,2.0",
                            "3,2.0,3,2.0,1.0,1.0,2.0,2.0,1.0,1.0"])
| |
| |
class StreamPandasUDAFITTests(PyFlinkBlinkStreamTableTestCase):
    """Integration tests for pandas UDAFs on the blink stream planner.

    Each test writes a small CSV source file, reads it through a filesystem
    connector, applies group/over windows with pandas UDAFs and checks the
    rows collected by a test append sink.
    """

    def test_sliding_group_window_over_time(self):
        """Sliding event-time window (1h over, 30min every) with mean_udaf."""
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2,2018-03-11 03:10:00',
            '3,3,2,2018-03-11 03:10:00',
            '2,2,1,2018-03-11 03:10:00',
            '1,1,3,2018-03-11 03:40:00',
            '1,1,8,2018-03-11 04:20:00',
            '2,2,3,2018-03-11 03:30:00'
        ]
        source_path = tmp_dir + '/test_sliding_group_window_over_time.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        from pyflink.table.window import Slide
        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        self.t_env.register_function("mean_udaf", mean_udaf)

        # Watermark lags rowtime by 60 minutes so all test rows are emitted
        # before the windows fire.
        source_table = """
            create table source_table(
                a TINYINT,
                b SMALLINT,
                c SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '%s',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """ % source_path
        self.t_env.execute_sql(source_table)
        t = self.t_env.from_path("source_table")

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c', 'd'],
            [
                DataTypes.TINYINT(),
                DataTypes.TIMESTAMP(3),
                DataTypes.TIMESTAMP(3),
                DataTypes.FLOAT()])
        self.t_env.register_table_sink("Results", table_sink)
        t.window(Slide.over("1.hours").every("30.minutes").on("rowtime").alias("w")) \
            .group_by("a, b, w") \
            .select("a, w.start, w.end, mean_udaf(c) as b") \
            .execute_insert("Results") \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["1,2018-03-11 02:30:00.0,2018-03-11 03:30:00.0,2.0",
                            "1,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.5",
                            "1,2018-03-11 03:30:00.0,2018-03-11 04:30:00.0,5.5",
                            "1,2018-03-11 04:00:00.0,2018-03-11 05:00:00.0,8.0",
                            "2,2018-03-11 02:30:00.0,2018-03-11 03:30:00.0,1.0",
                            "2,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.0",
                            "2,2018-03-11 03:30:00.0,2018-03-11 04:30:00.0,3.0",
                            "3,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2.0",
                            "3,2018-03-11 02:30:00.0,2018-03-11 03:30:00.0,2.0"])
        os.remove(source_path)

    def test_sliding_group_window_over_count(self):
        """Sliding count window (2 rows, slide 1) over processing time."""
        # parallelism 1 keeps the processing-time row order deterministic
        self.env.set_parallelism(1)
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2,2018-03-11 03:10:00',
            '3,3,2,2018-03-11 03:10:00',
            '2,2,1,2018-03-11 03:10:00',
            '1,1,3,2018-03-11 03:40:00',
            '1,1,8,2018-03-11 04:20:00',
            '2,2,3,2018-03-11 03:30:00',
            '3,3,3,2018-03-11 03:30:00'
        ]
        source_path = tmp_dir + '/test_sliding_group_window_over_count.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        from pyflink.table.window import Slide
        from pyflink.datastream import TimeCharacteristic
        self.env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
        self.t_env.register_function("mean_udaf", mean_udaf)

        source_table = """
            create table source_table(
                a TINYINT,
                b SMALLINT,
                c SMALLINT,
                protime as PROCTIME()
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '%s',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """ % source_path
        self.t_env.execute_sql(source_table)
        t = self.t_env.from_path("source_table")

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'd'],
            [
                DataTypes.TINYINT(),
                DataTypes.FLOAT()])
        self.t_env.register_table_sink("Results", table_sink)
        t.window(Slide.over("2.rows").every("1.rows").on("protime").alias("w")) \
            .group_by("a, b, w") \
            .select("a, mean_udaf(c) as b") \
            .execute_insert("Results") \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["1,2.5", "1,5.5", "2,2.0", "3,2.5"])
        os.remove(source_path)

    def test_tumbling_group_window_over_time(self):
        """Tumbling 1-hour event-time window; also selects w.rowtime."""
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2,2018-03-11 03:10:00',
            '3,3,2,2018-03-11 03:10:00',
            '2,2,1,2018-03-11 03:10:00',
            '1,1,3,2018-03-11 03:40:00',
            '1,1,8,2018-03-11 04:20:00',
            '2,2,3,2018-03-11 03:30:00'
        ]
        source_path = tmp_dir + '/test_tumbling_group_window_over_time.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        from pyflink.table.window import Tumble
        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        self.t_env.register_function("mean_udaf", mean_udaf)

        source_table = """
            create table source_table(
                a TINYINT,
                b SMALLINT,
                c SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '%s',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """ % source_path
        self.t_env.execute_sql(source_table)
        t = self.t_env.from_path("source_table")

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c', 'd', 'e'],
            [
                DataTypes.TINYINT(),
                DataTypes.TIMESTAMP(3),
                DataTypes.TIMESTAMP(3),
                DataTypes.TIMESTAMP(3),
                DataTypes.FLOAT()])
        self.t_env.register_table_sink("Results", table_sink)
        t.window(Tumble.over("1.hours").on("rowtime").alias("w")) \
            .group_by("a, b, w") \
            .select("a, w.start, w.end, w.rowtime, mean_udaf(c) as b") \
            .execute_insert("Results") \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, [
            "1,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.5",
            "1,2018-03-11 04:00:00.0,2018-03-11 05:00:00.0,2018-03-11 04:59:59.999,8.0",
            "2,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.0",
            "3,2018-03-11 03:00:00.0,2018-03-11 04:00:00.0,2018-03-11 03:59:59.999,2.0",
        ])
        os.remove(source_path)

    def test_tumbling_group_window_over_count(self):
        """Tumbling count window (2 rows) over processing time."""
        # parallelism 1 keeps the processing-time row order deterministic
        self.env.set_parallelism(1)
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2,2018-03-11 03:10:00',
            '3,3,2,2018-03-11 03:10:00',
            '2,2,1,2018-03-11 03:10:00',
            '1,1,3,2018-03-11 03:40:00',
            '1,1,8,2018-03-11 04:20:00',
            '2,2,3,2018-03-11 03:30:00',
            '3,3,3,2018-03-11 03:30:00',
            '1,1,4,2018-03-11 04:20:00',
        ]
        source_path = tmp_dir + '/test_group_window_aggregate_function_over_count.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        from pyflink.table.window import Tumble
        from pyflink.datastream import TimeCharacteristic
        self.env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
        self.t_env.register_function("mean_udaf", mean_udaf)

        source_table = """
            create table source_table(
                a TINYINT,
                b SMALLINT,
                c SMALLINT,
                protime as PROCTIME()
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '%s',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """ % source_path
        self.t_env.execute_sql(source_table)
        t = self.t_env.from_path("source_table")

        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'd'],
            [
                DataTypes.TINYINT(),
                DataTypes.FLOAT()])
        self.t_env.register_table_sink("Results", table_sink)
        t.window(Tumble.over("2.rows").on("protime").alias("w")) \
            .group_by("a, b, w") \
            .select("a, mean_udaf(c) as b") \
            .execute_insert("Results") \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["1,2.5", "1,6.0", "2,2.0", "3,2.5"])
        os.remove(source_path)

    def test_row_time_over_range_window_aggregate_function(self):
        """RANGE over window (20min preceding to current row) on event time."""
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2013-01-01 03:10:00',
            '3,2,2013-01-01 03:10:00',
            '2,1,2013-01-01 03:10:00',
            '1,5,2013-01-01 03:10:00',
            '1,8,2013-01-01 04:20:00',
            '2,3,2013-01-01 03:30:00'
        ]
        source_path = tmp_dir + '/test_over_range_window_aggregate_function.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')
        max_add_min_udaf = udaf(lambda a: a.max() + a.min(),
                                result_type=DataTypes.SMALLINT(),
                                func_type='pandas')
        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        self.t_env.register_function("mean_udaf", mean_udaf)
        self.t_env.register_function("max_add_min_udaf", max_add_min_udaf)
        source_table = """
            create table source_table(
                a TINYINT,
                b SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '%s',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """ % source_path
        self.t_env.execute_sql(source_table)
        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c'],
            [
                DataTypes.TINYINT(),
                DataTypes.FLOAT(),
                DataTypes.SMALLINT()])
        self.t_env.register_table_sink("Results", table_sink)
        self.t_env.execute_sql("""
            insert into Results
            select a,
             mean_udaf(b)
             over (PARTITION BY a ORDER BY rowtime
             RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND CURRENT ROW),
             max_add_min_udaf(b)
             over (PARTITION BY a ORDER BY rowtime
             RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND CURRENT ROW)
            from source_table
        """).wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["1,3.0,6", "1,3.0,6", "1,8.0,16", "2,1.0,2", "2,2.0,4", "3,2.0,4"])
        os.remove(source_path)

    def test_row_time_over_rows_window_aggregate_function(self):
        """ROWS over window (1 preceding to current row) on event time."""
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2013-01-01 03:10:00',
            '3,2,2013-01-01 03:10:00',
            '2,1,2013-01-01 03:10:00',
            '1,5,2013-01-01 03:10:00',
            '1,8,2013-01-01 04:20:00',
            '2,3,2013-01-01 03:30:00'
        ]
        source_path = tmp_dir + '/test_over_rows_window_aggregate_function.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        max_add_min_udaf = udaf(lambda a: a.max() + a.min(),
                                result_type=DataTypes.SMALLINT(),
                                func_type='pandas')
        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        self.t_env.register_function("mean_udaf", mean_udaf)
        self.t_env.register_function("max_add_min_udaf", max_add_min_udaf)
        source_table = """
            create table source_table(
                a TINYINT,
                b SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '%s',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """ % source_path
        self.t_env.execute_sql(source_table)
        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c'],
            [
                DataTypes.TINYINT(),
                DataTypes.FLOAT(),
                DataTypes.SMALLINT()])
        self.t_env.register_table_sink("Results", table_sink)
        self.t_env.execute_sql("""
            insert into Results
            select a,
             mean_udaf(b)
             over (PARTITION BY a ORDER BY rowtime
             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
             max_add_min_udaf(b)
             over (PARTITION BY a ORDER BY rowtime
             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
            from source_table
        """).wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["1,1.0,2", "1,3.0,6", "1,6.5,13", "2,1.0,2", "2,2.0,4", "3,2.0,4"])
        os.remove(source_path)

    def test_proc_time_over_rows_window_aggregate_function(self):
        """ROWS over window (1 preceding to current row) on processing time."""
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2013-01-01 03:10:00',
            '3,2,2013-01-01 03:10:00',
            '2,1,2013-01-01 03:10:00',
            '1,5,2013-01-01 03:10:00',
            '1,8,2013-01-01 04:20:00',
            '2,3,2013-01-01 03:30:00'
        ]
        source_path = tmp_dir + '/test_over_rows_window_aggregate_function.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        max_add_min_udaf = udaf(lambda a: a.max() + a.min(),
                                result_type=DataTypes.SMALLINT(),
                                func_type='pandas')
        # parallelism 1 keeps the processing-time row order deterministic
        self.env.set_parallelism(1)
        self.env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
        self.t_env.register_function("mean_udaf", mean_udaf)
        self.t_env.register_function("max_add_min_udaf", max_add_min_udaf)
        source_table = """
            create table source_table(
                a TINYINT,
                b SMALLINT,
                proctime as PROCTIME()
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '%s',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """ % source_path
        self.t_env.execute_sql(source_table)
        table_sink = source_sink_utils.TestAppendSink(
            ['a', 'b', 'c'],
            [
                DataTypes.TINYINT(),
                DataTypes.FLOAT(),
                DataTypes.SMALLINT()])
        self.t_env.register_table_sink("Results", table_sink)
        self.t_env.execute_sql("""
            insert into Results
            select a,
             mean_udaf(b)
             over (PARTITION BY a ORDER BY proctime
             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
             max_add_min_udaf(b)
             over (PARTITION BY a ORDER BY proctime
             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
            from source_table
        """).wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["1,1.0,2", "1,3.0,6", "1,6.5,13", "2,1.0,2", "2,2.0,4", "3,2.0,4"])
        os.remove(source_path)
| |
| |
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    """Pandas UDAF returning the mean of the input series as FLOAT."""
    return v.mean()
| |
| |
class MaxAdd(AggregateFunction):
    """Pandas UDAF that sums the max of each input column.

    accumulate() appends ``sum(max(col) for col in args)`` to the accumulator
    and get_value() returns the first accumulated value. open() registers a
    counter metric so tests can exercise the metric API.

    Note: this class previously also inherited from ``unittest.TestCase``,
    which served no purpose for a UDF and caused test runners to try to
    collect it as a test class; that base class has been removed.
    """

    def open(self, function_context):
        """Register a counter metric under the "key"/"value" metric group."""
        mg = function_context.get_metric_group()
        self.counter = mg.add_group("key", "value").counter("my_counter")
        self.counter_sum = 0

    def get_value(self, accumulator):
        """Return the first accumulated result, incrementing the counter metric."""
        # counter
        self.counter.inc(10)
        self.counter_sum += 10
        return accumulator[0]

    def create_accumulator(self):
        """Start with an empty list of per-batch results."""
        return []

    def accumulate(self, accumulator, *args):
        """Append the sum of each argument's max to the accumulator."""
        result = 0
        for arg in args:
            result += arg.max()
        accumulator.append(result)
| |
| |
if __name__ == '__main__':
    # ``unittest`` is already imported at the top of the file; no re-import
    # is needed here. Prefer the XML test runner (JUnit-style reports for CI)
    # when it is available, otherwise fall back to the default runner.
    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)