################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
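"""Integration tests for vectorized (Pandas) Python UDAFs, covering group
aggregation, group windows and over windows in both batch and stream mode."""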
import uuid
from pyflink.common import Row
from pyflink.table.expressions import col, call, lit, row_interval
from pyflink.table.types import DataTypes
from pyflink.table.udf import udaf, udf, AggregateFunction
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase, \
    PyFlinkStreamTableTestCase


def generate_random_table_name():
    return "Table{0}".format(str(uuid.uuid1()).replace("-", "_"))
class BatchPandasUDAFITTests(PyFlinkBatchTableTestCase):

    @classmethod
    def setUpClass(cls):
        super(BatchPandasUDAFITTests, cls).setUpClass()
        cls.t_env.create_temporary_system_function("max_add", udaf(MaxAdd(),
                                                                   result_type=DataTypes.INT(),
                                                                   func_type="pandas"))
        cls.t_env.create_temporary_system_function("mean_udaf", mean_udaf)

    def test_pandas_udaf_in_sql(self):
        sql = f"""
            CREATE TEMPORARY FUNCTION pymean AS
            '{BatchPandasUDAFITTests.__module__}.mean_str_udaf'
            LANGUAGE PYTHON
        """
        self.t_env.execute_sql(sql)
        self.assert_equals(
            list(self.t_env.execute_sql("SELECT pymean(1)").collect()),
            [Row(1)])

    def test_check_result_type(self):
        def pandas_udaf():
            pass

        with self.assertRaises(
                TypeError,
                msg="Invalid returnType: Pandas UDAF doesn't support DataType type MAP currently"):
            udaf(pandas_udaf, result_type=DataTypes.MAP(DataTypes.INT(), DataTypes.INT()),
                 func_type="pandas")

    def test_group_aggregate_function(self):
        t = self.t_env.from_elements(
            [(1, 2, 3), (3, 2, 3), (2, 1, 3), (1, 5, 4), (1, 8, 6), (2, 3, 4)],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT())]))
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(
                a TINYINT,
                b FLOAT,
                c ROW<a INT, b INT>,
                d STRING
            ) WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        # general udf
        add = udf(lambda a: a + 1, result_type=DataTypes.INT())
        # pandas udf
        subtract = udf(lambda a: a - 1, result_type=DataTypes.INT(), func_type="pandas")
        max_udaf = udaf(lambda a: (a.max(), a.min()),
                        result_type=DataTypes.ROW(
                            [DataTypes.FIELD("a", DataTypes.INT()),
                             DataTypes.FIELD("b", DataTypes.INT())]),
                        func_type="pandas")

        @udaf(result_type=DataTypes.STRING(), func_type="pandas")
        def multiply_udaf(a, b):
            return len(a) * b[0]

        t.group_by(t.a) \
            .select(t.a, mean_udaf(add(t.b)), max_udaf(subtract(t.c)), multiply_udaf(t.b, 'abc')) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(
            actual,
            ["+I[1, 6.0, +I[5, 2], abcabcabc]",
             "+I[2, 3.0, +I[3, 2], abcabc]",
             "+I[3, 3.0, +I[2, 2], abc]"])

    def test_group_aggregate_without_keys(self):
        t = self.t_env.from_elements(
            [(1, 2, 3), (3, 2, 3), (2, 1, 3), (1, 5, 4), (1, 8, 6), (2, 3, 4)],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT())]))
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(a INT) WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        min_add = udaf(lambda a, b, c: a.min() + b.min() + c.min(),
                       result_type=DataTypes.INT(), func_type="pandas")
        t.select(min_add(t.a, t.b, t.c)) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["+I[5]"])

    def test_group_aggregate_with_aux_group(self):
        t = self.t_env.from_elements(
            [(1, 2, 3), (3, 2, 3), (2, 1, 3), (1, 5, 4), (1, 8, 6), (2, 3, 4)],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT())]))
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(a TINYINT, b INT, c FLOAT, d INT)
            WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        self.t_env.get_config().set('python.metric.enabled', 'true')
        t.group_by(t.a) \
            .select(t.a, (t.a + 1).alias("b"), (t.a + 2).alias("c")) \
            .group_by(t.a, t.b) \
            .select(t.a, t.b, mean_udaf(t.b), call("max_add", t.b, t.c, 1)) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["+I[1, 2, 2.0, 6]", "+I[2, 3, 3.0, 8]", "+I[3, 4, 4.0, 10]"])
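
    # The group-window tests below read event-time rows from a temporary CSV
    # source with a watermark declared on the rowtime column.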
    def test_tumble_group_window_aggregate_function(self):
        from pyflink.table.window import Tumble

        # create source file path
        data = [
            '1,2,3,2018-03-11 03:10:00',
            '3,2,4,2018-03-11 03:10:00',
            '2,1,2,2018-03-11 03:10:00',
            '1,3,1,2018-03-11 03:40:00',
            '1,8,5,2018-03-11 04:20:00',
            '2,3,6,2018-03-11 03:30:00'
        ]
        source_path = self.tempdir + '/test_tumble_group_window_aggregate_function.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "EventTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                c INT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        t = self.t_env.from_path(source_table)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(
                a TIMESTAMP(3),
                b TIMESTAMP(3),
                c FLOAT
            ) WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        tumble_window = Tumble.over(lit(1).hours) \
            .on(col("rowtime")) \
            .alias("w")
        t.window(tumble_window) \
            .group_by(col("w")) \
            .select(col("w").start, col("w").end, mean_udaf(t.b)) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["+I[2018-03-11T03:00, 2018-03-11T04:00, 2.2]",
                            "+I[2018-03-11T04:00, 2018-03-11T05:00, 8.0]"])

    def test_slide_group_window_aggregate_function(self):
        from pyflink.table.window import Slide

        # create source file path
        data = [
            '1,2,3,2018-03-11 03:10:00',
            '3,2,4,2018-03-11 03:10:00',
            '2,1,2,2018-03-11 03:10:00',
            '1,3,1,2018-03-11 03:40:00',
            '1,8,5,2018-03-11 04:20:00',
            '2,3,6,2018-03-11 03:30:00'
        ]
        source_path = self.tempdir + '/test_slide_group_window_aggregate_function.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "EventTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                c INT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        t = self.t_env.from_path(source_table)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(
                a TINYINT,
                b TIMESTAMP(3),
                c TIMESTAMP(3),
                d FLOAT,
                e INT
            ) WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        slide_window = Slide.over(lit(1).hours) \
            .every(lit(30).minutes) \
            .on(col("rowtime")) \
            .alias("w")
        t.window(slide_window) \
            .group_by(t.a, col("w")) \
            .select(t.a,
                    col("w").start,
                    col("w").end,
                    mean_udaf(t.b),
                    call("max_add", t.b, t.c, 1)) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["+I[1, 2018-03-11T02:30, 2018-03-11T03:30, 2.0, 6]",
                            "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2.5, 7]",
                            "+I[1, 2018-03-11T03:30, 2018-03-11T04:30, 5.5, 14]",
                            "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 8.0, 14]",
                            "+I[2, 2018-03-11T02:30, 2018-03-11T03:30, 1.0, 4]",
                            "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2.0, 10]",
                            "+I[2, 2018-03-11T03:30, 2018-03-11T04:30, 3.0, 10]",
                            "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2.0, 7]",
                            "+I[3, 2018-03-11T02:30, 2018-03-11T03:30, 2.0, 7]"])

    def test_over_window_aggregate_function(self):
        import datetime

        t = self.t_env.from_elements(
            [
                (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (3, 2, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (2, 1, 2, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
                (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0)),
                (2, 3, 6, datetime.datetime(2018, 3, 11, 3, 30, 0, 0))
            ],
            DataTypes.ROW(
                [DataTypes.FIELD("a", DataTypes.TINYINT()),
                 DataTypes.FIELD("b", DataTypes.SMALLINT()),
                 DataTypes.FIELD("c", DataTypes.INT()),
                 DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(
                a TINYINT,
                b FLOAT,
                c INT,
                d FLOAT,
                e FLOAT,
                f FLOAT,
                g FLOAT,
                h FLOAT,
                i FLOAT,
                j FLOAT
            ) WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        self.t_env.create_temporary_view("T_test_over_window_aggregate_function", t)
        self.t_env.execute_sql(f"""
            insert into {sink_table}
            select a,
                   mean_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
                   max_add(b, c)
                       over (PARTITION BY a ORDER BY rowtime
                           ROWS BETWEEN UNBOUNDED PRECEDING AND 0 FOLLOWING),
                   mean_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING),
                   mean_udaf(c)
                       over (PARTITION BY a ORDER BY rowtime
                           ROWS BETWEEN 1 PRECEDING AND 0 FOLLOWING),
                   mean_udaf(c)
                       over (PARTITION BY a ORDER BY rowtime
                           RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
                   mean_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
                   mean_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND UNBOUNDED FOLLOWING),
                   mean_udaf(c)
                       over (PARTITION BY a ORDER BY rowtime
                           RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND UNBOUNDED FOLLOWING),
                   mean_udaf(c)
                       over (PARTITION BY a ORDER BY rowtime
                           RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND CURRENT ROW)
            from T_test_over_window_aggregate_function
        """).wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["+I[1, 4.3333335, 5, 4.3333335, 3.0, 3.0, 2.5, 4.3333335, 3.0, 2.0]",
                            "+I[1, 4.3333335, 13, 5.5, 3.0, 3.0, 4.3333335, 8.0, 5.0, 5.0]",
                            "+I[1, 4.3333335, 6, 4.3333335, 2.0, 3.0, 2.5, 4.3333335, 3.0, 2.0]",
                            "+I[2, 2.0, 9, 2.0, 4.0, 4.0, 2.0, 2.0, 4.0, 4.0]",
                            "+I[2, 2.0, 3, 2.0, 2.0, 4.0, 1.0, 2.0, 4.0, 2.0]",
                            "+I[3, 2.0, 3, 2.0, 1.0, 1.0, 2.0, 2.0, 1.0, 1.0]"])
class StreamPandasUDAFITTests(PyFlinkStreamTableTestCase):

    @classmethod
    def setUpClass(cls):
        super(StreamPandasUDAFITTests, cls).setUpClass()
        cls.t_env.create_temporary_system_function("mean_udaf", mean_udaf)
        max_add_min_udaf = udaf(lambda a: a.max() + a.min(),
                                result_type='SMALLINT',
                                func_type='pandas')
        cls.t_env.create_temporary_system_function("max_add_min_udaf", max_add_min_udaf)

    def test_sliding_group_window_over_time(self):
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2,2018-03-11 03:10:00',
            '3,3,2,2018-03-11 03:10:00',
            '2,2,1,2018-03-11 03:10:00',
            '1,1,3,2018-03-11 03:40:00',
            '1,1,8,2018-03-11 04:20:00',
            '2,2,3,2018-03-11 03:30:00'
        ]
        source_path = tmp_dir + '/test_sliding_group_window_over_time.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        from pyflink.table.window import Slide
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "EventTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                c SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        t = self.t_env.from_path(source_table)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d FLOAT)
            WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        t.window(Slide.over(lit(1).hours)
                 .every(lit(30).minutes)
                 .on(col("rowtime"))
                 .alias("w")) \
            .group_by(t.a, t.b, col("w")) \
            .select(t.a, col("w").start, col("w").end, mean_udaf(t.c).alias("b")) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["+I[1, 2018-03-11T02:30, 2018-03-11T03:30, 2.0]",
                            "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2.5]",
                            "+I[1, 2018-03-11T03:30, 2018-03-11T04:30, 5.5]",
                            "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 8.0]",
                            "+I[2, 2018-03-11T02:30, 2018-03-11T03:30, 1.0]",
                            "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2.0]",
                            "+I[2, 2018-03-11T03:30, 2018-03-11T04:30, 3.0]",
                            "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2.0]",
                            "+I[3, 2018-03-11T02:30, 2018-03-11T03:30, 2.0]"])
        os.remove(source_path)

    def test_sliding_group_window_over_count(self):
        self.t_env.get_config().set("parallelism.default", "1")
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2,2018-03-11 03:10:00',
            '3,3,2,2018-03-11 03:10:00',
            '2,2,1,2018-03-11 03:10:00',
            '1,1,3,2018-03-11 03:40:00',
            '1,1,8,2018-03-11 04:20:00',
            '2,2,3,2018-03-11 03:30:00',
            '3,3,3,2018-03-11 03:30:00'
        ]
        source_path = tmp_dir + '/test_sliding_group_window_over_count.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        from pyflink.table.window import Slide
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "ProcessingTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                c SMALLINT,
                protime as PROCTIME()
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        t = self.t_env.from_path(source_table)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(a TINYINT, d FLOAT) WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        t.window(Slide.over(row_interval(2))
                 .every(row_interval(1))
                 .on(t.protime)
                 .alias("w")) \
            .group_by(t.a, t.b, col("w")) \
            .select(t.a, mean_udaf(t.c).alias("b")) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["+I[1, 2.5]", "+I[1, 5.5]", "+I[2, 2.0]", "+I[3, 2.5]"])
        os.remove(source_path)

    def test_tumbling_group_window_over_time(self):
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2,2018-03-11 03:10:00',
            '3,3,2,2018-03-11 03:10:00',
            '2,2,1,2018-03-11 03:10:00',
            '1,1,3,2018-03-11 03:40:00',
            '1,1,8,2018-03-11 04:20:00',
            '2,2,3,2018-03-11 03:30:00'
        ]
        source_path = tmp_dir + '/test_tumbling_group_window_over_time.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        from pyflink.table.window import Tumble
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "EventTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                c SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        t = self.t_env.from_path(source_table)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(
                a TINYINT, b TIMESTAMP(3), c TIMESTAMP(3), d TIMESTAMP(3), e FLOAT)
            WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        t.window(Tumble.over(lit(1).hours).on(t.rowtime).alias("w")) \
            .group_by(t.a, t.b, col("w")) \
            .select(t.a,
                    col("w").start,
                    col("w").end,
                    col("w").rowtime,
                    mean_udaf(t.c).alias("b")) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, [
            "+I[1, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.5]",
            "+I[1, 2018-03-11T04:00, 2018-03-11T05:00, 2018-03-11T04:59:59.999, 8.0]",
            "+I[2, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.0]",
            "+I[3, 2018-03-11T03:00, 2018-03-11T04:00, 2018-03-11T03:59:59.999, 2.0]",
        ])
        os.remove(source_path)

    def test_tumbling_group_window_over_count(self):
        self.t_env.get_config().set("parallelism.default", "1")
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2,2018-03-11 03:10:00',
            '3,3,2,2018-03-11 03:10:00',
            '2,2,1,2018-03-11 03:10:00',
            '1,1,3,2018-03-11 03:40:00',
            '1,1,8,2018-03-11 04:20:00',
            '2,2,3,2018-03-11 03:30:00',
            '3,3,3,2018-03-11 03:30:00',
            '1,1,4,2018-03-11 04:20:00',
        ]
        source_path = tmp_dir + '/test_group_window_aggregate_function_over_count.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        from pyflink.table.window import Tumble
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "ProcessingTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                c SMALLINT,
                protime as PROCTIME()
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        t = self.t_env.from_path(source_table)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(a TINYINT, d FLOAT) WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        t.window(Tumble.over(row_interval(2)).on(t.protime).alias("w")) \
            .group_by(t.a, t.b, col("w")) \
            .select(t.a, mean_udaf(t.c).alias("b")) \
            .execute_insert(sink_table) \
            .wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual, ["+I[1, 2.5]", "+I[1, 6.0]", "+I[2, 2.0]", "+I[3, 2.5]"])
        os.remove(source_path)

    def test_row_time_over_range_window_aggregate_function(self):
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2013-01-01 03:10:00',
            '3,2,2013-01-01 03:10:00',
            '2,1,2013-01-01 03:10:00',
            '1,5,2013-01-01 03:10:00',
            '1,8,2013-01-01 04:20:00',
            '2,3,2013-01-01 03:30:00'
        ]
        source_path = tmp_dir + '/test_over_range_window_aggregate_function.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "EventTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(a TINYINT, b FLOAT, c SMALLINT)
            WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        self.t_env.execute_sql(f"""
            insert into {sink_table}
            select a,
                   mean_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND CURRENT ROW),
                   max_add_min_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           RANGE BETWEEN INTERVAL '20' MINUTE PRECEDING AND CURRENT ROW)
            from {source_table}
        """).wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["+I[1, 3.0, 6]",
                            "+I[1, 3.0, 6]",
                            "+I[1, 8.0, 16]",
                            "+I[2, 1.0, 2]",
                            "+I[2, 2.0, 4]",
                            "+I[3, 2.0, 4]"])
        os.remove(source_path)

    def test_row_time_over_rows_window_aggregate_function(self):
        # create source file path
        import tempfile
        import os
        tmp_dir = tempfile.gettempdir()
        data = [
            '1,1,2013-01-01 03:10:00',
            '3,2,2013-01-01 03:10:00',
            '2,1,2013-01-01 03:10:00',
            '1,5,2013-01-01 03:10:00',
            '1,8,2013-01-01 04:20:00',
            '2,3,2013-01-01 03:30:00'
        ]
        source_path = tmp_dir + '/test_over_rows_window_aggregate_function.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "EventTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(a TINYINT, b FLOAT, c SMALLINT)
            WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        self.t_env.execute_sql(f"""
            insert into {sink_table}
            select a,
                   mean_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
                   max_add_min_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
            from {source_table}
        """).wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["+I[1, 1.0, 2]",
                            "+I[1, 3.0, 6]",
                            "+I[1, 6.5, 13]",
                            "+I[2, 1.0, 2]",
                            "+I[2, 2.0, 4]",
                            "+I[3, 2.0, 4]"])
        os.remove(source_path)

    def test_proc_time_over_rows_window_aggregate_function(self):
        # create source file path
        data = [
            '1,1,2013-01-01 03:10:00',
            '3,2,2013-01-01 03:10:00',
            '2,1,2013-01-01 03:10:00',
            '1,5,2013-01-01 03:10:00',
            '1,8,2013-01-01 04:20:00',
            '2,3,2013-01-01 03:30:00'
        ]
        source_path = self.tempdir + '/test_proc_time_over_rows_window_aggregate_function.csv'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')
        self.t_env.get_config().set("parallelism.default", "1")
        self.t_env.get_config().set(
            "pipeline.time-characteristic", "ProcessingTime")
        source_table = generate_random_table_name()
        source_table_ddl = f"""
            create table {source_table}(
                a TINYINT,
                b SMALLINT,
                proctime as PROCTIME()
            ) with(
                'connector.type' = 'filesystem',
                'format.type' = 'csv',
                'connector.path' = '{source_path}',
                'format.ignore-first-line' = 'false',
                'format.field-delimiter' = ','
            )
        """
        self.t_env.execute_sql(source_table_ddl)
        sink_table = generate_random_table_name()
        sink_table_ddl = f"""
            CREATE TABLE {sink_table}(a TINYINT, b FLOAT, c SMALLINT)
            WITH ('connector'='test-sink')
        """
        self.t_env.execute_sql(sink_table_ddl)
        self.t_env.execute_sql(f"""
            insert into {sink_table}
            select a,
                   mean_udaf(b)
                       over (PARTITION BY a ORDER BY proctime
                           ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
                   max_add_min_udaf(b)
                       over (PARTITION BY a ORDER BY proctime
                           ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
            from {source_table}
        """).wait()
        actual = source_sink_utils.results()
        self.assert_equals(actual,
                           ["+I[1, 1.0, 2]",
                            "+I[1, 3.0, 6]",
                            "+I[1, 6.5, 13]",
                            "+I[2, 1.0, 2]",
                            "+I[2, 2.0, 4]",
                            "+I[3, 2.0, 4]"])
    def test_execute_over_aggregate_from_json_plan(self):
        # create source file path
        tmp_dir = self.tempdir
        data = [
            '1,1,2013-01-01 03:10:00',
            '3,2,2013-01-01 03:10:00',
            '2,1,2013-01-01 03:10:00',
            '1,5,2013-01-01 03:10:00',
            '1,8,2013-01-01 04:20:00',
            '2,3,2013-01-01 03:30:00'
        ]
        source_path = tmp_dir + '/test_execute_over_aggregate_from_json_plan.csv'
        sink_path = tmp_dir + '/test_execute_over_aggregate_from_json_plan'
        with open(source_path, 'w') as fd:
            for ele in data:
                fd.write(ele + '\n')

        source_table = generate_random_table_name()
        source_table_ddl = f"""
            CREATE TABLE {source_table} (
                a TINYINT,
                b SMALLINT,
                rowtime TIMESTAMP(3),
                WATERMARK FOR rowtime AS rowtime - INTERVAL '60' MINUTE
            ) WITH (
                'connector' = 'filesystem',
                'path' = '{source_path}',
                'format' = 'csv'
            )
        """
        self.t_env.execute_sql(source_table_ddl)

        sink_table = generate_random_table_name()
        self.t_env.execute_sql(f"""
            CREATE TABLE {sink_table} (
                a TINYINT,
                b FLOAT,
                c SMALLINT
            ) WITH (
                'connector' = 'filesystem',
                'path' = '{sink_path}',
                'format' = 'csv'
            )
        """)

        self.t_env.get_config().set(
            "pipeline.time-characteristic", "EventTime")
        json_plan = self.t_env._j_tenv.compilePlanSql(f"""
            insert into {sink_table}
            select a,
                   mean_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           ROWS BETWEEN 1 PRECEDING AND CURRENT ROW),
                   max_add_min_udaf(b)
                       over (PARTITION BY a ORDER BY rowtime
                           ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
            from {source_table}
        """)
        from py4j.java_gateway import get_method
        get_method(json_plan.execute(), "await")()

        import glob
        lines = [line.strip() for file in glob.glob(sink_path + '/*') for line in open(file, 'r')]
        lines.sort()
        self.assertEqual(lines,
                         ['1,1.0,2', '1,3.0,6', '1,6.5,13', '2,1.0,2', '2,2.0,4', '3,2.0,4'])
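

# Module-level Pandas UDAFs used by the tests above. With func_type="pandas",
# each input is passed in as a pandas.Series and the function returns a scalar.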
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
    return v.mean()


@udaf(input_types=['FLOAT'], result_type='FLOAT', func_type="pandas")
def mean_str_udaf(v):
    return v.mean()
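

# MaxAdd is a class-based AggregateFunction registered as the Pandas UDAF
# "max_add" in BatchPandasUDAFITTests.setUpClass, so accumulate() receives
# pandas.Series arguments; open() and get_value() also exercise the metrics API.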
class MaxAdd(AggregateFunction):

    def __init__(self):
        self.counter = None
        self.counter_sum = 0

    def open(self, function_context):
        mg = function_context.get_metric_group()
        self.counter = mg.add_group("key", "value").counter("my_counter")
        self.counter_sum = 0

    def get_value(self, accumulator):
        # counter
        self.counter.inc(10)
        self.counter_sum += 10
        return accumulator[0]

    def create_accumulator(self):
        return []

    def accumulate(self, accumulator, *args):
        result = 0
        for arg in args:
            result += arg.max()
        accumulator.append(result)


if __name__ == '__main__':
    import unittest

    try:
        import xmlrunner

        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)