blob: 5ff8a1fb50f34b52e209bab90c8377fe2aa10f57 [file] [log] [blame]
-- This file contains the Python (PL/Python) helper functions that other SQL
-- files use in their correctness tests, such as checking the CPU usage, the
-- CPUSET bitmap, and whether the Cgroup files exist.
--
-- In Cgroup v1 (Alpha), we will check the directory of
-- /sys/fs/cgroup/cpu/gpdb
-- /sys/fs/cgroup/cpuacct/gpdb
-- /sys/fs/cgroup/cpuset/gpdb
--
-- In Cgroup v2 (Beta), we will check the directory of
-- /sys/fs/cgroup/gpdb/*
--
-- When we run different tests, we should include the matching auxiliary tool
-- file in the schedule file.
-- Setup shared by the resource group tests: create the plpython3u language,
-- switch the cluster to the resource group manager, restart it, then query the
-- resource queue views and the default resource group configuration.
-- start_ignore
CREATE LANGUAGE plpython3u;
-- end_ignore
-- enable resource group and restart cluster.
-- start_ignore
! gpconfig -c gp_resource_manager -v group;
! gpconfig -c max_connections -v 250 -m 25;
! gpconfig -c runaway_detector_activation_percent -v 100;
! gpstop -rai;
-- end_ignore
-- after the restart we need a new connection to run the queries
-- (every statement below is issued through session 0)
0: SHOW gp_resource_manager;
-- resource queue statistics should not crash
-- (the cluster was just configured with gp_resource_manager=group, and the
-- resource queue views are still queried here)
0: SELECT * FROM pg_resqueue_status;
0: SELECT * FROM gp_toolkit.gp_resqueue_status;
0: SELECT * FROM gp_toolkit.gp_resq_priority_backend;
-- verify the default settings
0: SELECT * from gp_toolkit.gp_resgroup_config;
-- Verify that the cgroup v1 cpu controller files under /sys/fs/cgroup/cpu/gpdb
-- agree with the resource group GUCs and with the catalog:
--   * cpu.cfs_quota_us == cpu.cfs_period_us * ncores * gp_resource_group_cpu_limit
--   * cpu.shares       == 1024 * gp_resource_group_cpu_priority
--   * cpu.shares of each group == int(value * 1024 / 100), where value is the
--     pg_resgroupcapability entry with reslimittype=3 of that group
-- The groups checked are default_group, admin_group, system_group and the two
-- user groups rg1_cpu_test and rg2_cpu_test, which must exist before the call.
-- @return bool: true when every check holds, a failed check raises an error
0: CREATE OR REPLACE FUNCTION check_cgroup_configuration() RETURNS BOOL AS $$
    import os

    root = '/sys/fs/cgroup/'

    def get_cgroup_prop(prop):
        # read the first line of a cgroup control file as an integer, the
        # with-block closes the file even when the conversion fails
        fullpath = os.path.join(root, prop)
        with open(fullpath) as f:
            return int(f.readline())

    def show_guc(guc):
        # SHOW returns a single row whose only column is named after the GUC
        return plpy.execute('SHOW {}'.format(guc))[0][guc]

    # get top-level cgroup props
    cfs_quota_us = get_cgroup_prop('cpu/gpdb/cpu.cfs_quota_us')
    cfs_period_us = get_cgroup_prop('cpu/gpdb/cpu.cfs_period_us')
    shares = get_cgroup_prop('cpu/gpdb/cpu.shares')

    # get system props
    ncores = os.cpu_count()

    # get global gucs
    gp_resource_group_cpu_limit = float(show_guc('gp_resource_group_cpu_limit'))
    gp_resource_group_cpu_priority = int(show_guc('gp_resource_group_cpu_priority'))

    # cfs_quota_us := cfs_period_us * ncores * gp_resource_group_cpu_limit
    expected_quota = cfs_period_us * ncores * gp_resource_group_cpu_limit
    assert cfs_quota_us == expected_quota, (
        'cpu.cfs_quota_us is {} but {} was expected'.format(cfs_quota_us, expected_quota))

    # shares := 1024 * gp_resource_group_cpu_priority
    expected_shares = 1024 * gp_resource_group_cpu_priority
    assert shares == expected_shares, (
        'cpu.shares is {} but {} was expected'.format(shares, expected_shares))

    def check_group_shares(name):
        # the cgroup directory of a resource group is named after the group oid
        cpu_weight = int(plpy.execute('''
            SELECT value
              FROM pg_resgroupcapability c, pg_resgroup g
             WHERE c.resgroupid=g.oid
               AND reslimittype=3
               AND g.rsgname='{}'
        '''.format(name))[0]['value'])
        oid = int(plpy.execute('''
            SELECT oid FROM pg_resgroup WHERE rsgname='{}'
        '''.format(name))[0]['oid'])
        sub_shares = get_cgroup_prop('cpu/gpdb/{}/cpu.shares'.format(oid))
        expected_sub_shares = int(cpu_weight * 1024 / 100)
        assert sub_shares == expected_sub_shares, (
            'cpu.shares of group {} is {} but {} was expected'.format(
                name, sub_shares, expected_sub_shares))

    # check default groups
    check_group_shares('default_group')
    check_group_shares('admin_group')
    check_group_shares('system_group')

    # check user groups
    check_group_shares('rg1_cpu_test')
    check_group_shares('rg2_cpu_test')

    return True
$$ LANGUAGE plpython3u;
-- check whether the queries of a resource group run only on a given core set
-- @param grp: the name of the resource group the queries are running in
-- @param cpuset: the cpu cores the queries are allowed to run on, e.g. 0,1 or 0-3.
--                An empty string means the cores listed in
--                /sys/fs/cgroup/cpuset/gpdb/cpuset.cpus
-- @return bool: true when, in each of 1000 samples taken 10ms apart, every local
--               postgres process of a session in grp was on an allowed core.
--               The result is also true when no such process is found.
0: CREATE OR REPLACE FUNCTION check_cpuset(grp TEXT, cpuset TEXT) RETURNS BOOL AS $$
    import subprocess
    import time
    import re

    # capture only the digits of the conN tag in the process title, so that
    # they compare equal to the sess_id values read from pg_stat_activity
    pt = re.compile(r'con(\d+)')

    def check(expect_cpus, sess_ids):
        # use ps -eF to find all postgres processes that belong to one of the
        # given sessions and collect the PSR column (processor) of each of them
        procs = subprocess.check_output(['ps', '-eF']).decode().split('\n')
        head, proc_stats = procs[0], procs[1:]
        PSR = head.split().index('PSR')
        cpus = set()
        for proc_stat in proc_stats:
            if 'postgres' not in proc_stat:
                continue
            # a process is relevant when its session id is one of sess_ids
            if sess_ids.isdisjoint(pt.findall(proc_stat)):
                continue
            cpus.add(proc_stat.split()[PSR])
        return cpus.issubset(expect_cpus)

    def get_all_sess_ids_in_group(group_name):
        # quote_literal keeps the statement valid for names containing quotes
        sql = "select sess_id from pg_stat_activity where rsgname = %s" % plpy.quote_literal(group_name)
        result = plpy.execute(sql)
        return set(str(r['sess_id']) for r in result)

    def parse_cpu_list(conf):
        # expand a list such as 0-2,5 into the set of core ids as strings
        cores = set()
        for token in conf.split(","):
            token = token.strip()
            if not token:
                continue
            if '-' in token:
                interval = token.split("-")
                cores.update(str(num) for num in range(int(interval[0]), int(interval[1]) + 1))
            else:
                cores.add(token)
        return cores

    conf = cpuset
    if conf == '':
        # no explicit core set given, fall back to the cores of the gpdb cgroup
        with open("/sys/fs/cgroup/cpuset/gpdb/cpuset.cpus") as fd:
            conf = fd.readline().strip('\n')

    expect_cpu = parse_cpu_list(conf)
    sess_ids = get_all_sess_ids_in_group(grp)

    # sample repeatedly, a single snapshot could miss a process that is
    # scheduled on a wrong core only part of the time
    for i in range(1000):
        time.sleep(0.01)
        if not check(expect_cpu, sess_ids):
            return False

    return True
$$ LANGUAGE plpython3u;
-- create a resource group that contains all the cpu cores
-- @param grp: the name of the resource group to create. Its cpuset is the
--             content of /sys/fs/cgroup/cpuset/gpdb/cpuset.cpus
-- @return bool: true when, after the group was created, the cpuset.cpus file in
--               /sys/fs/cgroup/cpuset/gpdb/1 contains exactly "0".
--               An error is raised when psql fails to create the group.
0: CREATE OR REPLACE FUNCTION create_allcores_group(grp TEXT) RETURNS BOOL AS $$
    import subprocess

    def read_first_line(path):
        # the with-block closes the file even when reading fails
        with open(path) as fd:
            return fd.readline().strip('\n')

    all_cores = read_first_line("/sys/fs/cgroup/cpuset/gpdb/cpuset.cpus")
    sql = "create resource group " + grp + " with (" + "cpuset='" + all_cores + "')"

    # plpy SPI will always start a transaction, but res group cannot be created in a transaction.
    # stderr is captured as well and both streams are decoded to text, so that
    # the error message below shows what psql actually reported
    ret = subprocess.run(['psql', 'postgres', '-c', sql],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    if ret.returncode != 0:
        plpy.error('failed to create resource group.\n {} \n {}'.format(ret.stdout, ret.stderr))

    # with every core handed to the new group, directory 1 is expected to be
    # left with core 0 only
    return read_first_line("/sys/fs/cgroup/cpuset/gpdb/1/cpuset.cpus") == "0"
$$ LANGUAGE plpython3u;
-- Validate the cpuset.cpus files of the cpuset cgroups against the resource
-- group configuration.
-- @return bool: true when all of the following hold
--   * every group with a cpuset in gp_toolkit.gp_resgroup_config has the same
--     cores in its cgroup, and no two such groups share a core
--   * these groups together with cgroup 1 (the default group) cover exactly
--     the cores of the top-level gpdb cgroup
--   * cgroup 1 overlaps with these groups only when it holds a single core
0: CREATE OR REPLACE FUNCTION check_cpuset_rules() RETURNS BOOL AS $$
    def expand_cpu_list(text):
        # turn a list such as 0-2,5 into the set of core ids as strings,
        # an empty line gives the empty set
        text = text.strip('\n')
        if not text:
            return set()
        cores = set()
        for part in text.split(","):
            if '-' in part:
                bounds = part.split("-")
                cores.update(str(n) for n in range(int(bounds[0]), int(bounds[1]) + 1))
            else:
                cores.add(part)
        return cores

    def read_cgroup_cores(group):
        # group 0 stands for the top-level gpdb cgroup, any other id names a
        # sub directory below it
        group = str(group)
        if group == '0':
            path = "/sys/fs/cgroup/cpuset/gpdb/cpuset.cpus"
        else:
            path = "/sys/fs/cgroup/cpuset/gpdb/" + group + "/cpuset.cpus"
        fd = open(path)
        first_line = fd.readline()
        fd.close()
        return expand_cpu_list(first_line)

    configured = plpy.execute("select groupid,cpuset from gp_toolkit.gp_resgroup_config where cpuset != '-1'")
    claimed = set()

    # each configured group must match its cgroup and must not share a core
    # with a group seen earlier
    for row in configured:
        wanted = expand_cpu_list(row['cpuset'])
        actual = read_cgroup_cores(row['groupid'])
        if claimed & actual:
            return False
        claimed |= actual
        if wanted != actual:
            return False

    # the default group plus the configured groups must be the universal set
    default_cores = read_cgroup_cores(1)
    every_core = read_cgroup_cores(0)
    if (default_cores | claimed) != every_core:
        return False

    # an overlap is only accepted when the default group is down to one core,
    # which is the case when all the cores were allocated to resource groups
    if default_cores & claimed:
        if len(default_cores) != 1 or not default_cores.issubset(every_core):
            return False

    return True
$$ LANGUAGE plpython3u;
-- check whether the processes of a session are in the cpu cgroup of a group
-- @param pid: pid of a backend listed in pg_stat_activity, used to look up
--             the session id
-- @param groupname: the name of the resource group, looked up in
--                   gp_toolkit.gp_resgroup_config
-- @return bool: true when, on every host of gp_segment_configuration, each
--               postgres process of the session is listed in
--               /sys/fs/cgroup/cpu/gpdb/<groupid>/cgroup.procs. A host without
--               any process of the session counts as passing.
0: CREATE OR REPLACE FUNCTION is_session_in_group(pid integer, groupname text) RETURNS BOOL AS $$
    import subprocess

    sql = "select sess_id from pg_stat_activity where pid = '%d'" % pid
    result = plpy.execute(sql)
    session_id = result[0]['sess_id']

    # quote_literal keeps the statement valid for names containing quotes
    sql = "select groupid from gp_toolkit.gp_resgroup_config where groupname=%s" % plpy.quote_literal(groupname)
    result = plpy.execute(sql)
    groupid = result[0]['groupid']

    sql = "select hostname from gp_segment_configuration group by hostname"
    result = plpy.execute(sql)
    hosts = [row['hostname'] for row in result]

    def run_on_host(host, command):
        # run a shell command on the host through ssh and return its output
        # lines, a non-zero exit status raises CalledProcessError
        stdout = subprocess.run(["ssh", host, command], check=True, stdout=subprocess.PIPE).stdout
        return stdout.splitlines()

    def get_result(host):
        # grep -w matches conN as a whole word, so that looking for con5 does
        # not also pick up the processes of con50 or con512
        find_pids = "ps -ef | grep postgres | grep -w con{} | grep -v grep | awk '{{print $2}}'".format(session_id)
        session_pids = run_on_host(host, find_pids)

        path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid)
        cgroups_pids = run_on_host(host, "cat {}".format(path))

        return set(session_pids).issubset(set(cgroups_pids))

    # stop at the first host where a process of the session is outside the cgroup
    for host in hosts:
        if not get_result(host):
            return False
    return True
$$ LANGUAGE plpython3u;