blob: 956914e29d507b44a691eb50be83e4a50e380c28 [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
file:test_stream_insert.py
测试insert [streaming] select 复杂query
"""
import os
import sys
import pytest
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.append(file_dir)
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(file_dir)
from data import schema as DATA
import palo_config
import palo_client
import util
table_test = "test_query_qa.test"
table_base = "test_query_qa.baseall"
table_big = "test_query_qa.bigtable"
config = palo_config.config
def setup_module():
"""
init config
"""
global client
client = palo_client.PaloClient(config.fe_host, config.fe_query_port, user=config.fe_user,
password=config.fe_password)
client.init()
def check(table_name, select_line):
"""
check if result of mysql and palo is same
1. select
2. view
3. check
"""
insert_sql = 'insert into %s %s' % (table_name, select_line)
ret = client.execute(insert_sql)
assert ret == ()
check_sql = 'select * from %s'
insert_result = client.execute(check_sql % table_name)
orgin_restule = client.execute(select_line)
# util.check(insert_result, orgin_restule, force_order=True)
delete_sql = 'delete from %s partition %s where k1 is not null'
def test_insert_join():
"""
{
"title": "test_stream_insert.test_insert_join",
"describe": "complicate query about join",
"tag": "system,p1"
}
"""
"""complicate query about join"""
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
keys_desc=DATA.baseall_duplicate_key,
distribution_info=DATA.baseall_distribution_info,
set_null=True)
assert ret
line = 'select a.k1 ak1, a.k2 ak2, a.k3 ak3, a.k4 ak4, a.k5 ak5, a.k6 ak6, \
b.k10 bk10, b.k11 bk11, b.k7 bk7, b.k8 bk8, b.k9 bk9 \
from {test} a join {big} b on a.k1 = b.k1 join {base} c on c.k2 = b.k2 \
where a.k2 > 0 and b.k1 > 0 order by a.k1, a.k2'.format(test=table_test,
big=table_big, base=table_base)
check(table_name, line)
line = 'select a.k1 ak1, a.k2 ak2, a.k3 ak3, a.k4 ak4, a.k5 ak5, a.k6 ak6, \
b.k10 bk10, b.k11 bk11, b.k7 bk7, b.k8 bk8, b.k9 bk9 \
from {test} a join {big} b on a.k1 = b.k1 join {base} c on c.k2 = b.k2 \
having a.k2 > 0 and b.k1 > 0 order by a.k1, a.k2, a.k3, \
a.k4'.format(test=table_test, big=table_big, base=table_base)
check(table_name, line)
line = 'select a.k1 ak1, a.k2 ak2, a.k3 ak3, a.k4 ak4, a.k5 ak5, a.k6 ak6, \
b.k10 bk10, b.k11 bk11, c.k7 ck7, c.k8 ck8, c.k9 ck9 \
from {test} a left outer join {big} b on a.k1 = b.k1 left outer join {base} c \
on a.k2 = c.k2 where a.k2 > 0 and a.k1 > 0 order by \
a.k1, a.k2, a.k3, a.k4'.format(test=table_test, big=table_big, base=table_base)
check(table_name, line)
line = 'select a.k1 ak1, a.k2 ak2, a.k3 ak3, a.k4 ak4, a.k5 ak5, a.k6 ak6, \
c.k10 ck10, c.k11 ck11, c.k7 ck7, c.k8 ck8, c.k9 ck9 \
from {test} a left outer join {big} b on a.k1 = b.k1 left outer join \
{base} c on a.k2 = c.k2 having a.k2 > 0 and b.k1 > 0 and c.k3 != 0 order by \
a.k1, a.k2, a.k3, a.k4'.format(test=table_test, big=table_big, base=table_base)
check(table_name, line)
line = 'select a.k1 ak1, a.k2 ak2, a.k3 ak3, a.k4 ak4, a.k5 ak5, a.k6 ak6, \
c.k10 ck10, c.k11 ck11, c.k7 ck7, c.k8 ck8, c.k9 ck9 \
from {test} a join (select * from {test} where k2 > 0 and k1 = 1) c \
on a.k1 = c.k1 and a.k2 = c.k2 where c.k2 > 10 order by a.k1, a.k2, a.k3, \
a.k4'.format(test=table_test, big=table_big, base=table_base)
check(table_name, line)
line = 'select a.k1 ak1, a.k2 ak2, a.k3 ak3, a.k4 ak4, a.k5 ak5, a.k6 ak6, \
c.k10 ck10, c.k11 ck11, c.k7 ck7, c.k8 ck8, c.k9 ck9 \
from {test} a right outer join (select * from {test} where k2 > 0 and k1 = 1) c \
on a.k1 = c.k1 and a.k2 = c.k2 having c.k2 > 10 \
order by a.k1, a.k2, a.k3, a.k4'.format(test=table_test, big=table_big,
base=table_base)
check(table_name, line)
line = 'select c.k1 ak1, c.k2 ak2, c.k3 ak3, c.k4 ak4, a.k5 ak5, a.k6 ak6, a.k10 ak10, \
a.k11 ak11, a.k7 ak7, a.k8 ak8, a.k9 ak9 from {test} a \
join (select 1 k1, 2 k2, 3 k3, 4 k4 union select k1, k2, k3, k4 from {base}) \
c on a.k1 = c.k1 where a.k2 = c.k2 or c.k2 = 2 \
order by a.k1, a.k2, a.k3, a.k4, c.k1'.format(test=table_test,
big=table_big, base=table_base)
check(table_name, line)
line = 'select c.k1 ak1, c.k2 ak2, c.k3 ak3, c.k4 ak4, a.k5 ak5, a.k6 ak6, a.k10 ak10, a.k11 ak11, \
a.k7 ak7, a.k8 ak8, a.k9 ak9 \
from {test} a left join (select 1 k1, 2 k2, 3 k3, 4 k4 union select k1, k2, k3, k4 from \
{base}) c on a.k1 = c.k1 having a.k2 = c.k2 or c.k2 = 2 order by \
a.k1, a.k2, a.k3, a.k4, c.k1'.format(test=table_test, big=table_big, base=table_base)
check(table_name, line)
# schame not fix
line = 'select a.k2 ak2, c.k2 ck2, count(*) from {test} a left join (select 1 k1, 2 k2, 3 k3, 4 k4 \
union select k1, k2, k3, k4 from {base}) c on a.k1 = c.k1 group by a.k2, c.k2 \
having a.k2 = c.k2 or c.k2 = 2 order by ak2, ck2'
# check(line)
line = 'select sum(a.k2) s1, sum(b.k2) s2 from test a left outer join bigtable b on a.k1 = b.k1\
left outer join baseall c on a.k2 = c.k2 where a.k2 > 0 and a.k1 > 0 group by a.k1 \
order by s1, s2'
# check(line)
client.clean(database_name)
def test_insert_subquery():
"""
{
"title": "complicate query about subquery",
"describe": "describe",
"tag": "system,p1"
}
"""
"""complicate query about subquery"""
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
keys_desc=DATA.baseall_duplicate_key,
distribution_info=DATA.baseall_distribution_info,
set_null=True)
assert ret
line = 'select * from (select * from {test} where k1 % 3 = 0 and k2 % 5 = 0 and k3 % 10 = 0) c \
where k1 in (select k1 from {base}) order by k1, k2, k3, k4'.format(test=table_test,
base=table_base)
check(table_name, line)
line = 'select * from (select * from {test} where k1 % 3 = 0 and k2 % 5 = 0 and k3 % 10 = 0 \
union select * from {test} where k1 % 3 = 1 and k2 % 5 = 1 and k3 % 10 = 1) c \
where k1 > 0 order by k1, k2, k3, k4'.format(test=table_test)
check(table_name, line)
line = 'select * from {base} where k1 in (select r1 from (select k6, k3, \
sum(k1) over(partition by k6 order by k3) s1, \
count(k2) over(partition by k6 order by k4) c1, \
rank() over(partition by k6 order by k3) r1 from {base} where k3 > 0) a) \
order by k1'.format(test=table_test, base=table_base)
check(table_name, line)
line = 'select * from (select * from {test} where k1 % 3 = 0 and k2 % 5 = 0 and k3 % 10 = 0) c \
where k1 in (select k1 from {base}) having k1 % 3 = 0 \
order by k1, k2, k3, k4'.format(test=table_test, base=table_base)
check(table_name, line)
line = 'select * from (select min(k1), k2, max(k3), max(k4), sum(k5), max(k6), max(k10), \
min(k11), min(k7), sum(k8), sum(k9) from {base} group by k2) a \
order by k2'.format(base=table_base)
check(table_name, line)
line = 'select * from {test} where k3 in (select max(k3) from {base} group by k2) \
order by k1'.format(test=table_test, base=table_base)
check(table_name, line)
line = 'select * from {test} where k3 in (select max(k3) from {base} group by k2) \
having k1 % 3 = 0 order by k1'.format(test=table_test, base=table_base)
check(table_name, line)
client.clean(database_name)
def test_insert_union():
"""
{
"title": "complicate query about union",
"describe": "complicate query about union",
"tag": "system,p1"
}
"""
"""complicate query about union"""
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
keys_desc=DATA.baseall_duplicate_key,
distribution_info=DATA.baseall_distribution_info,
set_null=True)
line = 'select * from {base} union select 0, null, null, null, null, null, null, null, null, \
null, null'.format(base=table_base)
check(table_name, line)
line = 'select * from {base} union all select * from {base}'.format(base=table_base)
check(table_name, line)
line = 'select * from {base} union select 1, null, null, null, null, null, null, null, null, \
null, null order by k1, k2, k3, k4, k5, k6, k7, k8, k9 \
limit 10'.format(base=table_base)
check(table_name, line)
client.clean(database_name)
def test_insert_win():
"""
{
"title": "complicate query about win function",
"describe": "complicate query about win function",
"tag": "system,p1"
}
"""
"""complicate query about win function"""
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
sql = 'CREATE TABLE %s ( k1 char(5) NULL, k2 int NULL, k3 tinyint NULL, k4 int NULL) \
DUPLICATE KEY(k1,k2,k3,k4) DISTRIBUTED BY HASH(k1) BUCKETS 5' % table_name
ret = client.execute(sql)
assert ret == ()
line = 'select a.k6, a.k3, b.k1, sum(b.k1) over (partition by a.k6 order by a.k3) w_sum \
from {base} a join {big} b on a.k1 = b.k1 + 5 \
order by a.k6, a.k3, a.k1, w_sum'.format(big=table_big, base=table_base)
check(table_name, line)
line = 'select a.k6, a.k3, b.k1, sum(b.k1) over (partition by a.k6 order by a.k3) w_sum \
from {base} a left join {big} b on a.k1 = b.k1 + 5 \
order by 1, 2, 3, 4'.format(big=table_big, base=table_base)
check(table_name, line)
line = 'select k6, k3, k1, sum(k1) over (partition by a.k6 order by k3) w_sum\
from (select "oh" as k6, k3, k1 from {base} union select k6, k3, k1 from {base}) a \
order by k6, k3, k1, w_sum'.format(base=table_base)
check(table_name, line)
line = 'select k6, k3, k10, first_value(k10) over (partition by k6 order by k3) w_first \
from (select * from {base}) a order by 1, 2, 3, 4'.format(base=table_base)
check(table_name, line)
line = 'select k6, k3, k10, first_value(k10) over (partition by k6 order by k3) w_first \
from {base} having k3 > 0 and k3 < 10000 order by 1, 2, 3, 4'.format(base=table_base)
check(table_name, line)
line = 'select k6, k3, k10, first_value(k10) over (partition by k6 order by k3) w_frist \
from {base} where k3 > 0 and k3 < 10000 order by 1, 2, 3, 4'.format(base=table_base)
check(table_name, line)
line = 'select k6, k3, k10, first_value(k10) over (partition by k6 order by k3) w_first \
from {base} having k6 = "false" order by 1, 2, 3, 4'.format(base=table_base)
check(table_name, line)
line = 'select * from (select a.k6, a.k3, b.k1, sum(b.k1) over \
(partition by a.k6 order by a.k3) w_sum from {base} a join {big} b \
on a.k1 = b.k1 + 5) w order by k6,k3,k1,w_sum'.format(big=table_big, base=table_base)
check(table_name, line)
line = 'select * from (select a.k6, a.k3, b.k1, sum(b.k1) over (partition by a.k6 \
order by a.k3) w_sum from {base} a left join {big} b on a.k1 = b.k1 + 5) w \
order by k6, k3, k1, w_sum'.format(big=table_big, base=table_base)
check(table_name, line)
line = 'select * from (select k6, k3, k1, sum(k1) over (partition by a.k6 order by k3) w_sum \
from (select "oh" as k6, k3, k1 from {base} union select k6, k3, k1 from {base}) a) w \
order by k6, k3, k1, w_sum'.format(base=table_base)
check(table_name, line)
line = 'select * from (select k6, k3, k10, first_value(k10) over (partition by k6 order by k3) \
w_first from (select * from {base}) a) w \
order by k6, k3, k10, w_first'.format(base=table_base)
check(table_name, line)
line = 'select * from (select k6, k3, k10, last_value(k10) over (partition by k6 order by k3) \
w_last from {base} having k3 > 0) w \
order by k6, k3, k10, w_last'.format(base=table_base)
check(table_name, line)
client.clean(database_name)
def test_insert_group():
"""
{
"title": "test_stream_insert.test_insert_group",
"describe": "complicate query about group",
"tag": "system,p1"
}
"""
"""complicate query about group"""
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
sql = 'CREATE TABLE %s ( k1 int NULL, k2 decimal(20, 7) NULL) \
DUPLICATE KEY(k1,k2) DISTRIBUTED BY HASH(k1) BUCKETS 5' % table_name
client.execute(sql)
line = 'select a.k2 ak2, b.k1 bk1 from (select k2, avg(k1) k1 from {base} group by k2) a join \
(select k2, count(k1) k1 from {test} group by k2) b on a.k2 = b.k2 \
order by ak2'.format(test=table_test, base=table_base)
check(table_name, line)
line = 'select a.k2 ak2, b.k1 bk1 from (select k2, avg(k1) k1 from {base} group by k2) a \
left join (select k2, count(k1) k1 from {test} group by k2) b on a.k2 = b.k2 \
order by ak2'.format(test=table_test, base=table_base)
check(table_name, line)
line = 'select * from ((select k2, avg(k1) k1 from {base} group by k2) union \
(select k2, count(k1) k1 from {test} group by k2) )b \
order by k2'.format(test=table_test, base=table_base)
check(table_name, line)
line = 'select k2, count(k1) from ((select k2, avg(k1) k1 from {base} group by k2) \
union all (select k2, count(k1) k1 from {test} group by k2) )b group by k2 \
having k2 > 0 order by k2'.format(test=table_test, base=table_base)
check(table_name, line)
line = 'select * from (select k1, count(k2) k2 from {test} group by k1) a \
order by k1'.format(test=table_test)
check(table_name, line)
line = 'select a.k1 ak1, max(a.k2) maxk2 from {test} a join {base} b on \
a.k1 = b.k1 group by a.k1, b.k1 order by 1, 2'.format(base=table_base,
test=table_test)
check(table_name, line)
line = 'select k1, count(k2) k2 from ((select k1, k2 from {test}) union all \
(select k1, k2 from {base})) a group by k1 order by k1'.format(base=table_base,
test=table_test)
check(table_name, line)
line = 'select k1, count(k2) k2 from {test} where k2 in (select k2 from {base} group by k2) \
group by k1 order by k1'.format(base=table_base, test=table_test)
check(table_name, line)
line = 'select wj, max(k1) m from (select k1, count(k2) wj from {test} group by k1) a \
where wj > 0 group by wj order by 1, 2'.format(test=table_test)
check(table_name, line)
client.clean(database_name)
def teardown_module():
"""tearDown"""
print("End")
if __name__ == '__main__':
setup_module()
test_insert_join()