blob: 98284d27a09cc07038cf17bbe5de543cced3c53d [file]
/*-------------------------------------------------------------------------
*
* resgroup.c
* GPDB resource group management code.
*
* Portions Copyright (c) 2006-2010, Greenplum inc.
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/backend/utils/resgroup/resgroup.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include "libpq-fe.h"
#include "access/genam.h"
#include "access/table.h"
#include "access/xact.h"
#include "tcop/tcopprot.h"
#include "catalog/catalog.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_resgroup.h"
#include "catalog/pg_resgroupcapability.h"
#include "cdb/cdbgang.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/memquota.h"
#include "commands/resgroupcmds.h"
#include "commands/tablespace.h"
#include "common/hashfn.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "postmaster/autovacuum.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
#include "storage/pg_shmem.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/cgroup.h"
#include "utils/backend_status.h"
#include "utils/resgroup.h"
#include "utils/resource_manager.h"
#include "utils/session_state.h"
#include "utils/vmem_tracker.h"
#include "utils/cgroup-ops-v1.h"
#include "utils/cgroup-ops-dummy.h"
#include "utils/cgroup-ops-v2.h"
#include "utils/cgroup_io_limit.h"
#include "access/xact.h"
#define InvalidSlotId (-1)
#define RESGROUP_MAX_SLOTS (MaxConnections)
/*
* GUC variables.
*/
int gp_resgroup_memory_policy = RESMANAGER_MEMORY_POLICY_NONE;
bool gp_log_resgroup_memory = false;
int gp_resgroup_memory_query_fixed_mem;
int gp_resgroup_memory_policy_auto_fixed_mem;
bool gp_resgroup_print_operator_memory_limits = false;
bool gp_resgroup_debug_wait_queue = true;
int gp_resource_group_queuing_timeout = 0;
int gp_resource_group_move_timeout = 30000;
/*
* Data structures
*/
typedef struct ResGroupInfo ResGroupInfo;
typedef struct ResGroupHashEntry ResGroupHashEntry;
typedef struct ResGroupProcData ResGroupProcData;
typedef struct ResGroupSlotData ResGroupSlotData;
typedef struct ResGroupData ResGroupData;
typedef struct ResGroupControl ResGroupControl;
/*
* Resource group info.
*
* This records the group and groupId for a transaction.
* When group->groupId != groupId, it means the group
* has been dropped.
*/
struct ResGroupInfo
{
ResGroupData *group;
Oid groupId;
};
struct ResGroupHashEntry
{
Oid groupId;
int index;
};
/*
* Per proc resource group information.
*
* Config snapshot and runtime accounting information in current proc.
*/
struct ResGroupProcData
{
Oid groupId;
ResGroupData *group;
ResGroupSlotData *slot;
ResGroupCaps caps;
};
/*
* Per slot resource group information.
*
* Resource group have 'concurrency' number of slots.
* Each transaction acquires a slot on master before running.
* The information shared by QE processes on each segments are stored
* in this structure.
*/
struct ResGroupSlotData
{
Oid groupId;
ResGroupData *group; /* pointer to the group */
int nProcs; /* number of procs in this slot */
ResGroupSlotData *next;
ResGroupCaps caps;
};
/*
* Resource group information.
*/
struct ResGroupData
{
Oid groupId; /* ID for this group */
volatile int nRunning; /* number of running trans */
volatile int nRunningBypassed; /* number of running trans in bypass mode */
int64 totalExecuted; /* total number of executed trans */
int64 totalQueued; /* total number of queued trans */
int64 totalQueuedTimeMs; /* total queue time, in milliseconds */
dclist_head waitProcs; /* list of PGPROC objects waiting on this group */
bool lockedForDrop; /* true if resource group is dropped but not committed yet */
ResGroupCaps caps; /* capabilities of this group */
};
struct ResGroupControl
{
int segmentsOnMaster;
ResGroupSlotData *slots; /* slot pool shared by all resource groups */
ResGroupSlotData *freeSlot; /* head of the free list */
HTAB *htbl;
/*
* The hash table for resource groups in shared memory should only be populated
* once, so we add a flag here to implement this requirement.
*/
bool loaded;
int nGroups;
ResGroupData groups[1];
};
bool gp_resource_group_enable_cgroup_cpuset = false;
CGroupOpsRoutine *cgroupOpsRoutine = NULL;
CGroupSystemInfo *cgroupSystemInfo = NULL;
/* hooks */
resgroup_assign_hook_type resgroup_assign_hook = NULL;
/* static variables */
static ResGroupControl *pResGroupControl = NULL;
static ResGroupProcData __self =
{
InvalidOid,
};
static ResGroupProcData *self = &__self;
/* If we are waiting on a group, this points to the associated group */
static ResGroupData *groupAwaited = NULL;
static TimestampTz groupWaitStart;
static TimestampTz groupWaitEnd;
/* the resource group self is running in bypass mode */
static ResGroupData *bypassedGroup = NULL;
/* a fake slot used in bypass mode */
static ResGroupSlotData bypassedSlot;
/* static functions */
static void wakeupSlots(ResGroupData *group, bool grant);
static ResGroupData *groupHashNew(Oid groupId);
static ResGroupData *groupHashFind(Oid groupId, bool raise);
static ResGroupData *groupHashRemove(Oid groupId);
static void waitOnGroup(ResGroupData *group, bool isMoveQuery);
static ResGroupData *createGroup(Oid groupId, const ResGroupCaps *caps);
static void removeGroup(Oid groupId);
static void AtProcExit_ResGroup(int code, Datum arg);
static void groupWaitCancel(bool isMoveQuery);
static void initSlot(ResGroupSlotData *slot, ResGroupData *group);
static void selfAttachResGroup(ResGroupData *group, ResGroupSlotData *slot);
static void selfDetachResGroup(ResGroupData *group, ResGroupSlotData *slot);
static bool slotpoolInit(void);
static ResGroupSlotData *slotpoolAllocSlot(void);
static void slotpoolFreeSlot(ResGroupSlotData *slot);
static ResGroupSlotData *groupGetSlot(ResGroupData *group);
static void groupPutSlot(ResGroupData *group, ResGroupSlotData *slot);
static Oid decideResGroupId(void);
static void decideResGroup(ResGroupInfo *pGroupInfo);
static bool groupIncBypassedRef(ResGroupInfo *pGroupInfo);
static void groupDecBypassedRef(ResGroupData *group);
static ResGroupSlotData *groupAcquireSlot(ResGroupInfo *pGroupInfo, bool isMoveQuery);
static void groupReleaseSlot(ResGroupData *group, ResGroupSlotData *slot, bool isMoveQuery);
static void addTotalQueueDuration(ResGroupData *group);
static void selfValidateResGroupInfo(void);
static bool selfIsAssigned(void);
static void selfSetGroup(ResGroupData *group);
static void selfUnsetGroup(void);
static void selfSetSlot(ResGroupSlotData *slot);
static void selfUnsetSlot(void);
static bool procIsWaiting(const PGPROC *proc);
static void procWakeup(PGPROC *proc);
static int slotGetId(const ResGroupSlotData *slot);
static void groupWaitQueueValidate(const ResGroupData *group);
static void groupWaitProcValidate(PGPROC *proc, dclist_head *head);
static void groupWaitQueuePush(ResGroupData *group, PGPROC *proc);
static PGPROC *groupWaitQueuePop(ResGroupData *group);
static void groupWaitQueueErase(ResGroupData *group, PGPROC *proc);
static bool groupWaitQueueIsEmpty(const ResGroupData *group);
static bool checkBypassWalker(Node *node, void *context);
static bool shouldBypassSelectQuery(Node *node);
static bool shouldBypassQuery(const char *query_string);
static void lockResGroupForDrop(ResGroupData *group);
static void unlockResGroupForDrop(ResGroupData *group);
static bool groupIsDropped(ResGroupInfo *pGroupInfo);
static void resgroupDumpGroup(StringInfo str, ResGroupData *group);
static void resgroupDumpWaitQueue(StringInfo str, dclist_head *queue);
static void resgroupDumpCaps(StringInfo str, ResGroupCap *caps);
static void resgroupDumpSlots(StringInfo str);
static void resgroupDumpFreeSlots(StringInfo str);
static void sessionSetSlot(ResGroupSlotData *slot);
static void sessionResetSlot(ResGroupSlotData *slot);
static ResGroupSlotData *sessionGetSlot(void);
static void cpusetOperation(char *cpuset1,
const char *cpuset2,
int len,
bool sub);
#ifdef USE_ASSERT_CHECKING
static bool selfHasGroup(void);
static bool selfHasSlot(void);
static void slotValidate(const ResGroupSlotData *slot);
static bool slotIsInFreelist(const ResGroupSlotData *slot);
static bool slotIsInUse(const ResGroupSlotData *slot);
static bool groupIsNotDropped(const ResGroupData *group);
static bool groupWaitQueueFind(ResGroupData *group, const PGPROC *proc);
#endif /* USE_ASSERT_CHECKING */
static bool is_pure_catalog_plan(PlannedStmt *stmt);
static bool can_bypass_based_on_plan_cost(PlannedStmt *stmt);
static bool can_bypass_direct_dispatch_plan(PlannedStmt *stmt);
/*
* Estimate size the resource group structures will need in
* shared memory.
*/
Size
ResGroupShmemSize(void)
{
Size size = 0;
/* The hash of groups. */
size = hash_estimate_size(MaxResourceGroups, sizeof(ResGroupHashEntry));
/* The control structure. */
size = add_size(size, sizeof(ResGroupControl) - sizeof(ResGroupData));
/* The control structure. */
size = add_size(size, mul_size(MaxResourceGroups, sizeof(ResGroupData)));
/* The slot pool. */
size = add_size(size, mul_size(RESGROUP_MAX_SLOTS, sizeof(ResGroupSlotData)));
/* Add a safety margin */
size = add_size(size, size / 10);
return size;
}
/*
* Initialize the global ResGroupControl struct of resource groups.
*/
void
ResGroupControlInit(void)
{
int i;
bool found;
HASHCTL info;
int hash_flags;
int size;
size = sizeof(*pResGroupControl) - sizeof(ResGroupData);
size += mul_size(MaxResourceGroups, sizeof(ResGroupData));
pResGroupControl = ShmemInitStruct("global resource group control",
size, &found);
if (found)
return;
if (pResGroupControl == NULL)
goto error_out;
/* Set key and entry sizes of hash table */
MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(ResGroupHashEntry);
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
LOG_RESGROUP_DEBUG(LOG, "creating hash table for %d resource groups", MaxResourceGroups);
pResGroupControl->htbl = ShmemInitHash("Resource Group Hash Table",
MaxResourceGroups,
MaxResourceGroups,
&info, hash_flags);
if (!pResGroupControl->htbl)
goto error_out;
/*
* No need to acquire LWLock here, since this is expected to be called by
* postmaster only
*/
pResGroupControl->loaded = false;
pResGroupControl->nGroups = MaxResourceGroups;
for (i = 0; i < MaxResourceGroups; i++)
pResGroupControl->groups[i].groupId = InvalidOid;
if (!slotpoolInit())
goto error_out;
return;
error_out:
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for resource group control")));
}
/*
* Allocate a resource group entry from a hash table
*/
void
AllocResGroupEntry(Oid groupId, const ResGroupCaps *caps)
{
ResGroupData *group;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
group = createGroup(groupId, caps);
Assert(group != NULL);
LWLockRelease(ResGroupLock);
}
void
initCgroup(void)
{
#ifdef __linux__
if (Gp_resource_manager_policy == RESOURCE_MANAGER_POLICY_GROUP)
{
cgroupOpsRoutine = get_group_routine_v1();
cgroupSystemInfo = get_cgroup_sysinfo_v1();
}
else
{
cgroupOpsRoutine = get_group_routine_v2();
cgroupSystemInfo = get_cgroup_sysinfo_v2();
}
#else
cgroupOpsRoutine = get_cgroup_routine_dummy();
cgroupSystemInfo = get_cgroup_sysinfo_dummy();
#endif
bool probe_result = cgroupOpsRoutine->probecgroup();
if (!probe_result)
elog(ERROR, "The control group is not well configured, please check your "
"system configuration.");
cgroupOpsRoutine->checkcgroup();
cgroupOpsRoutine->initcgroup();
}
/*
* Load the resource groups in shared memory. Note this
* can only be done after enough setup has been done. This uses
* heap_open etc which in turn requires shared memory to be set up.
*/
void
InitResGroups(void)
{
HeapTuple tuple;
SysScanDesc sscan;
int numGroups;
CdbComponentDatabaseInfo *qdinfo;
ResGroupCaps caps;
Relation relResGroup;
Relation relResGroupCapability;
char cpuset[MaxCpuSetLength] = {0};
int defaultCore = -1;
Bitmapset *bmsUnused = NULL;
on_shmem_exit(AtProcExit_ResGroup, 0);
/*
* On master and segments, the first backend does the initialization.
*/
if (pResGroupControl->loaded)
return;
if (Gp_role == GP_ROLE_DISPATCH && pResGroupControl->segmentsOnMaster == 0)
{
Assert(IS_QUERY_DISPATCHER());
qdinfo = cdbcomponent_getComponentInfo(MASTER_CONTENT_ID);
pResGroupControl->segmentsOnMaster = qdinfo->hostPrimaryCount;
Assert(pResGroupControl->segmentsOnMaster > 0);
}
/*
* The resgroup shared mem initialization must be serialized. Only the first session
* should do the init.
* Serialization is done by LW_EXCLUSIVE ResGroupLock. However, we must obtain all DB
* locks before obtaining LWlock to prevent deadlock.
*/
relResGroup = table_open(ResGroupRelationId, AccessShareLock);
relResGroupCapability = table_open(ResGroupCapabilityRelationId, AccessShareLock);
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
if (pResGroupControl->loaded)
goto exit;
if (gp_resource_group_enable_cgroup_cpuset)
{
/* Get cpuset from cpuset/gpdb, and transform it into bitset */
cgroupOpsRoutine->getcpuset(CGROUP_ROOT_ID, cpuset, MaxCpuSetLength);
bmsUnused = CpusetToBitset(cpuset, MaxCpuSetLength);
/* get the minimum core number, in case of the zero core is not exist */
defaultCore = bms_next_member(bmsUnused, -1);
Assert(defaultCore >= 0);
}
numGroups = 0;
sscan = systable_beginscan(relResGroup, InvalidOid, false, NULL, 0, NULL);
while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
{
Oid groupId = ((Form_pg_resgroup) GETSTRUCT(tuple))->oid;
ResGroupData *group;
Bitmapset *bmsCurrent;
GetResGroupCapabilities(relResGroupCapability, groupId, &caps);
group = createGroup(groupId, &caps);
Assert(group != NULL);
cgroupOpsRoutine->createcgroup(groupId);
if (caps.io_limit != NIL)
{
cgroupOpsRoutine->setio(groupId, caps.io_limit);
cgroupOpsRoutine->freeio(caps.io_limit);
}
if (CpusetIsEmpty(caps.cpuset))
{
cgroupOpsRoutine->setcpulimit(groupId, caps.cpuMaxPercent);
cgroupOpsRoutine->setcpuweight(groupId, caps.cpuWeight);
}
else
{
char *cpuset2;
cpuset2 = getCpuSetByRole(caps.cpuset);
bmsCurrent = CpusetToBitset(cpuset2, MaxCpuSetLength);
Bitmapset *bmsCommon = bms_intersect(bmsCurrent, bmsUnused);
Bitmapset *bmsMissing = bms_difference(bmsCurrent, bmsCommon);
/*
* Do not call EnsureCpusetIsAvailable() here as resource group is
* not activated yet
*/
if (!gp_resource_group_enable_cgroup_cpuset)
{
ereport(WARNING,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cgroup is not properly configured to use the cpuset2 feature"),
errhint("Extra cgroup configurations are required to enable this feature, "
"please refer to the Cloudberry Documentations for details")));
}
Assert(caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
if (bms_is_empty(bmsMissing))
{
/*
* write cpus to corresponding file
* if all the cores are available
*/
cpuset2= getCpuSetByRole(caps.cpuset);
cgroupOpsRoutine->setcpuset(groupId, cpuset2);
bmsUnused = bms_del_members(bmsUnused, bmsCurrent);
}
else
{
char cpusetMissing[MaxCpuSetLength] = {0};
/*
* if some of the cores are unavailable, just set defaultCore
* to this group and send a warning message, so the system
* can startup, then DBA can fix it
*/
snprintf(cpuset2, MaxCpuSetLength, "%d", defaultCore);
cgroupOpsRoutine->setcpuset(groupId, cpuset2);
BitsetToCpuset(bmsMissing, cpusetMissing, MaxCpuSetLength);
ereport(WARNING,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cpu cores %s are unavailable on the system "
"in resource group %s",
cpusetMissing, GetResGroupNameForId(groupId)),
errhint("using core %d for this resource group, "
"please adjust the settings and restart",
defaultCore)));
}
}
numGroups++;
Assert(numGroups <= MaxResourceGroups);
}
systable_endscan(sscan);
if (gp_resource_group_enable_cgroup_cpuset)
{
/*
* set default cpuset
*/
if (bms_is_empty(bmsUnused))
{
/* no unused core, assign default core to default group */
snprintf(cpuset, MaxCpuSetLength, "%d", defaultCore);
}
else
{
/* assign all unused cores to default group */
BitsetToCpuset(bmsUnused, cpuset, MaxCpuSetLength);
}
Assert(cpuset[0]);
Assert(!CpusetIsEmpty(cpuset));
cgroupOpsRoutine->setcpuset(DEFAULT_CPUSET_GROUP_ID, cpuset);
}
pResGroupControl->loaded = true;
LOG_RESGROUP_DEBUG(LOG, "initialized %d resource groups", numGroups);
exit:
LWLockRelease(ResGroupLock);
/*
* release lock here to guarantee we have no lock held when acquiring
* resource group slot
*/
table_close(relResGroup, AccessShareLock);
table_close(relResGroupCapability, AccessShareLock);
}
/*
* Check resource group status when DROP RESOURCE GROUP
*
* Errors out if there're running transactions, otherwise lock the resource group.
* New transactions will be queued if the resource group is locked.
*/
void
ResGroupCheckForDrop(Oid groupId, char *name)
{
ResGroupData *group;
if (Gp_role != GP_ROLE_DISPATCH)
return;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
group = groupHashFind(groupId, true);
if (group->nRunning + group->nRunningBypassed > 0)
{
int nQuery = group->nRunning + group->nRunningBypassed + group->waitProcs.count;
Assert(name != NULL);
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop resource group \"%s\"", name),
errhint(" The resource group is currently managing %d query(ies) and cannot be dropped.\n"
"\tTerminate the queries first or try dropping the group later.\n"
"\tThe view pg_stat_activity tracks the queries managed by resource groups.", nQuery)));
}
lockResGroupForDrop(group);
LWLockRelease(ResGroupLock);
}
/*
* Drop resource group call back function
*
* Wake up the backends in the wait queue when DROP RESOURCE GROUP finishes.
* Unlock the resource group if the transaction is aborted.
* Remove the resource group entry in shared memory if the transaction is committed.
*
* This function is called in the callback function of DROP RESOURCE GROUP.
*/
void
ResGroupDropFinish(const ResourceGroupCallbackContext *callbackCtx,
bool isCommit)
{
ResGroupData *group;
volatile int savedInterruptHoldoffCount;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
PG_TRY();
{
savedInterruptHoldoffCount = InterruptHoldoffCount;
group = groupHashFind(callbackCtx->groupid, true);
if (Gp_role == GP_ROLE_DISPATCH)
{
wakeupSlots(group, false);
unlockResGroupForDrop(group);
}
if (isCommit)
{
removeGroup(callbackCtx->groupid);
if (!CpusetIsEmpty(group->caps.cpuset))
{
if (gp_resource_group_enable_cgroup_cpuset)
{
/* reset default group, add cpu cores to it */
char cpuset[MaxCpuSetLength];
cgroupOpsRoutine->getcpuset(DEFAULT_CPUSET_GROUP_ID,
cpuset, MaxCpuSetLength);
CpusetUnion(cpuset, group->caps.cpuset, MaxCpuSetLength);
cgroupOpsRoutine->setcpuset(DEFAULT_CPUSET_GROUP_ID, cpuset);
}
}
cgroupOpsRoutine->destroycgroup(callbackCtx->groupid, true);
}
}
PG_CATCH();
{
InterruptHoldoffCount = savedInterruptHoldoffCount;
if (elog_demote(WARNING))
{
EmitErrorReport();
FlushErrorState();
}
else
{
elog(LOG, "unable to demote error");
}
}
PG_END_TRY();
LWLockRelease(ResGroupLock);
}
/*
* Remove the resource group entry in shared memory if the transaction is aborted.
*
* This function is called in the callback function of CREATE RESOURCE GROUP.
*/
void
ResGroupCreateOnAbort(const ResourceGroupCallbackContext *callbackCtx)
{
volatile int savedInterruptHoldoffCount;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
PG_TRY();
{
savedInterruptHoldoffCount = InterruptHoldoffCount;
removeGroup(callbackCtx->groupid);
/* remove the os dependent part for this resource group */
cgroupOpsRoutine->destroycgroup(callbackCtx->groupid, true);
if (!CpusetIsEmpty(callbackCtx->caps.cpuset) &&
gp_resource_group_enable_cgroup_cpuset)
{
/* return cpu cores to default group */
char defaultGroupCpuset[MaxCpuSetLength];
cgroupOpsRoutine->getcpuset(DEFAULT_CPUSET_GROUP_ID,
defaultGroupCpuset,
MaxCpuSetLength);
CpusetUnion(defaultGroupCpuset,
callbackCtx->caps.cpuset,
MaxCpuSetLength);
cgroupOpsRoutine->setcpuset(DEFAULT_CPUSET_GROUP_ID, defaultGroupCpuset);
}
}
PG_CATCH();
{
InterruptHoldoffCount = savedInterruptHoldoffCount;
if (elog_demote(WARNING))
{
EmitErrorReport();
FlushErrorState();
}
else
{
elog(LOG, "unable to demote error");
}
}
PG_END_TRY();
LWLockRelease(ResGroupLock);
}
/*
* Apply the new resgroup caps.
*/
void
ResGroupAlterOnCommit(const ResourceGroupCallbackContext *callbackCtx)
{
ResGroupData *group;
volatile int savedInterruptHoldoffCount;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
PG_TRY();
{
savedInterruptHoldoffCount = InterruptHoldoffCount;
group = groupHashFind(callbackCtx->groupid, true);
group->caps = callbackCtx->caps;
if (callbackCtx->limittype == RESGROUP_LIMIT_TYPE_CPU)
{
cgroupOpsRoutine->setcpulimit(callbackCtx->groupid,
callbackCtx->caps.cpuMaxPercent);
/* We should set cpuset to the default value */
char *cpuset = (char *) palloc(MaxCpuSetLength);
sprintf(cpuset, "0-%d", cgroupSystemInfo->ncores-1);
cgroupOpsRoutine->setcpuset(callbackCtx->groupid, cpuset);
}
else if (callbackCtx->limittype == RESGROUP_LIMIT_TYPE_CPU_SHARES)
{
cgroupOpsRoutine->setcpuweight(callbackCtx->groupid,
callbackCtx->caps.cpuWeight);
}
else if (callbackCtx->limittype == RESGROUP_LIMIT_TYPE_CPUSET)
{
if (gp_resource_group_enable_cgroup_cpuset)
{
char *cpuset = getCpuSetByRole(callbackCtx->caps.cpuset);
cgroupOpsRoutine->setcpuset(callbackCtx->groupid,
cpuset);
}
}
else if (callbackCtx->limittype == RESGROUP_LIMIT_TYPE_CONCURRENCY)
{
wakeupSlots(group, true);
}
else if (callbackCtx->limittype == RESGROUP_LIMIT_TYPE_IO_LIMIT)
{
/*
* When alter io_limit to -1 , the caps.io_limit will be nil.
* There are no errors in io_limit string when caps.io_limit is nil.
* When alter io_limit, caps.io_limit is nil means this resource group's io_limit should be clear.
*/
cgroupOpsRoutine->cleario(callbackCtx->groupid);
cgroupOpsRoutine->setio(callbackCtx->groupid, callbackCtx->caps.io_limit);
}
/* reset default group if cpuset has changed */
if (strcmp(callbackCtx->oldCaps.cpuset, callbackCtx->caps.cpuset) &&
gp_resource_group_enable_cgroup_cpuset)
{
char defaultCpusetGroup[MaxCpuSetLength];
/* get current default group value */
cgroupOpsRoutine->getcpuset(DEFAULT_CPUSET_GROUP_ID,
defaultCpusetGroup,
MaxCpuSetLength);
/* Add old value to default group
* sub new value from default group */
char *cpuset= getCpuSetByRole(callbackCtx->caps.cpuset);
char *oldcpuset = getCpuSetByRole(callbackCtx->oldCaps.cpuset);
CpusetUnion(defaultCpusetGroup,
oldcpuset,
MaxCpuSetLength);
CpusetDifference(defaultCpusetGroup,
cpuset,
MaxCpuSetLength);
cgroupOpsRoutine->setcpuset(DEFAULT_CPUSET_GROUP_ID, defaultCpusetGroup);
}
}
PG_CATCH();
{
InterruptHoldoffCount = savedInterruptHoldoffCount;
if (elog_demote(WARNING))
{
EmitErrorReport();
FlushErrorState();
}
else
{
elog(LOG, "unable to demote error");
}
}
PG_END_TRY();
LWLockRelease(ResGroupLock);
}
bool
ResGroupIsAssigned(void)
{
return selfIsAssigned();
}
/*
* Get resource group id of my proc.
*
* Returns InvalidOid in any of below cases:
* - resource group is not enabled;
* - resource group is not activated (initialized);
* - my proc is not running inside a transaction;
* - my proc is not assigned a resource group yet;
*
* Otherwise a valid resource group id is returned.
*
* This function is not dead code although there is no consumer in the gpdb
* code tree. Some extensions require this to get the internal resource group
* information.
*/
Oid
GetMyResGroupId(void)
{
return self->groupId;
}
/*
* Retrieve statistic information of type from resource group
*/
Datum
ResGroupGetStat(Oid groupId, ResGroupStatType type)
{
ResGroupData *group;
Interval *interval;
Datum result;
Assert(IsResGroupActivated());
LWLockAcquire(ResGroupLock, LW_SHARED);
group = groupHashFind(groupId, true);
switch (type)
{
case RES_GROUP_STAT_NRUNNING:
result = Int32GetDatum(group->nRunning + group->nRunningBypassed);
break;
case RES_GROUP_STAT_NQUEUEING:
result = Int32GetDatum(group->waitProcs.count);
break;
case RES_GROUP_STAT_TOTAL_EXECUTED:
result = Int64GetDatum(group->totalExecuted);
break;
case RES_GROUP_STAT_TOTAL_QUEUED:
result = Int64GetDatum(group->totalQueued);
break;
case RES_GROUP_STAT_TOTAL_QUEUE_TIME:
/*
* Turn milliseconds in totalQueuedTimeMs into an Interval.
*
* Note: we only use the 'time' field. The user can call
* justify_interval() if she wants.
*/
interval = (Interval *) palloc(sizeof(Interval));
interval->time = group->totalQueuedTimeMs * 1000;
interval->day = 0;
interval->month = 0;
result = IntervalPGetDatum(interval);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("invalid stat type %d", type)));
}
LWLockRelease(ResGroupLock);
return result;
}
/*
* Get the number of primary segments on this host
*/
int
ResGroupGetHostPrimaryCount()
{
return (Gp_role == GP_ROLE_EXECUTE ? host_primary_segment_count : pResGroupControl->segmentsOnMaster);
}
/*
* removeGroup -- remove resource group from share memory and
* reclaim the group's memory back to MEM POOL.
*/
static void
removeGroup(Oid groupId)
{
ResGroupData *group;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(OidIsValid(groupId));
group = groupHashRemove(groupId);
group->groupId = InvalidOid;
}
/*
* createGroup -- initialize the elements for a resource group.
*
* Notes:
* It is expected that the appropriate lightweight lock is held before
* calling this - unless we are the startup process.
*/
static ResGroupData *
createGroup(Oid groupId, const ResGroupCaps *caps)
{
ResGroupData *group;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(OidIsValid(groupId));
group = groupHashNew(groupId);
Assert(group != NULL);
group->groupId = groupId;
group->caps = *caps;
/* remove local pointers */
group->caps.io_limit = NIL;
group->nRunning = 0;
group->nRunningBypassed = 0;
dclist_init(&group->waitProcs);
group->totalExecuted = 0;
group->totalQueued = 0;
group->totalQueuedTimeMs = 0;
group->lockedForDrop = false;
return group;
}
/*
* Attach a process (QD or QE) to a slot.
*/
static void
selfAttachResGroup(ResGroupData *group, ResGroupSlotData *slot)
{
selfSetGroup(group);
selfSetSlot(slot);
pg_atomic_add_fetch_u32((pg_atomic_uint32*) &slot->nProcs, 1);
}
/*
* Detach a process (QD or QE) from a slot.
*/
static void
selfDetachResGroup(ResGroupData *group, ResGroupSlotData *slot)
{
pg_atomic_sub_fetch_u32((pg_atomic_uint32*) &slot->nProcs, 1);
selfUnsetSlot();
selfUnsetGroup();
}
/*
* Initialize the members of a slot
*/
static void
initSlot(ResGroupSlotData *slot, ResGroupData *group)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(!slotIsInUse(slot));
Assert(group->groupId != InvalidOid);
slot->group = group;
slot->groupId = group->groupId;
slot->caps = group->caps;
}
/*
* Alloc and initialize slot pool
*/
static bool
slotpoolInit(void)
{
ResGroupSlotData *slot;
ResGroupSlotData *next;
int numSlots;
int memSize;
int i;
numSlots = RESGROUP_MAX_SLOTS;
memSize = mul_size(numSlots, sizeof(ResGroupSlotData));
pResGroupControl->slots = ShmemAlloc(memSize);
if (!pResGroupControl->slots)
return false;
MemSet(pResGroupControl->slots, 0, memSize);
/* push all the slots into the list */
next = NULL;
for (i = numSlots - 1; i >= 0; i--)
{
slot = &pResGroupControl->slots[i];
slot->group = NULL;
slot->groupId = InvalidOid;
slot->next = next;
next = slot;
}
pResGroupControl->freeSlot = next;
return true;
}
/*
* Alloc a slot from shared slot pool
*/
static ResGroupSlotData *
slotpoolAllocSlot(void)
{
ResGroupSlotData *slot;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(pResGroupControl->freeSlot != NULL);
slot = pResGroupControl->freeSlot;
pResGroupControl->freeSlot = slot->next;
slot->next = NULL;
return slot;
}
/*
* Free a slot back to shared slot pool
*/
static void
slotpoolFreeSlot(ResGroupSlotData *slot)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(slotIsInUse(slot));
Assert(slot->nProcs == 0);
slot->group = NULL;
slot->groupId = InvalidOid;
slot->next = pResGroupControl->freeSlot;
pResGroupControl->freeSlot = slot;
}
/*
* Get a slot.
*
* A slot can be got with this function the concurrency limit is not reached.
*
* On success the nRunning is increased and the slot's groupId is also set
* accordingly, the slot id is returned.
*
* On failure nothing is changed and InvalidSlotId is returned.
*/
static ResGroupSlotData *
groupGetSlot(ResGroupData *group)
{
ResGroupSlotData *slot;
ResGroupCaps *caps;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(Gp_role == GP_ROLE_DISPATCH);
Assert(groupIsNotDropped(group));
caps = &group->caps;
/* First check if the concurrency limit is reached */
if (group->nRunning >= caps->concurrency)
return NULL;
/* Now actually get a free slot */
slot = slotpoolAllocSlot();
Assert(!slotIsInUse(slot));
initSlot(slot, group);
group->nRunning++;
return slot;
}
/*
* Put back the slot assigned to self.
*
* This will release a slot, its nRunning will be decreased.
*/
static void
groupPutSlot(ResGroupData *group, ResGroupSlotData *slot)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(slotIsInUse(slot));
/* Return the slot back to free list */
slotpoolFreeSlot(slot);
group->nRunning--;
}
/*
* Pick a resource group for the current transaction.
*/
static Oid
decideResGroupId(void)
{
Oid groupId = InvalidOid;
if (resgroup_assign_hook)
groupId = resgroup_assign_hook();
if (groupId == InvalidOid)
groupId = GetResGroupIdForRole(GetUserId());
return groupId;
}
/*
* Decide the proper resource group for current role.
*
* An exception is thrown if current role is invalid.
*/
static void
decideResGroup(ResGroupInfo *pGroupInfo)
{
ResGroupData *group;
Oid groupId;
Assert(pResGroupControl != NULL);
Assert(pResGroupControl->segmentsOnMaster > 0);
Assert(Gp_role == GP_ROLE_DISPATCH);
/* always find out the up-to-date resgroup id */
groupId = decideResGroupId();
LWLockAcquire(ResGroupLock, LW_SHARED);
group = groupHashFind(groupId, false);
if (!group)
{
groupId = superuser() ? ADMINRESGROUP_OID : DEFAULTRESGROUP_OID;
group = groupHashFind(groupId, false);
}
Assert(group != NULL);
LWLockRelease(ResGroupLock);
pGroupInfo->group = group;
pGroupInfo->groupId = groupId;
}
/*
* Increase the bypassed ref count
*
* Return true if the operation is done, or false if the group is dropped.
*/
static bool
groupIncBypassedRef(ResGroupInfo *pGroupInfo)
{
ResGroupData *group = pGroupInfo->group;
bool result = false;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
/* Has the group been dropped? */
if (groupIsDropped(pGroupInfo))
goto end;
/* Is the group locked for drop? */
if (group->lockedForDrop)
goto end;
result = true;
pg_atomic_add_fetch_u32((pg_atomic_uint32 *) &group->nRunningBypassed, 1);
end:
LWLockRelease(ResGroupLock);
return result;
}
/*
* Decrease the bypassed ref count
*/
static void
groupDecBypassedRef(ResGroupData *group)
{
pg_atomic_sub_fetch_u32((pg_atomic_uint32 *) &group->nRunningBypassed, 1);
}
/*
* Acquire a resource group slot
*
* Call this function at the start of the transaction.
* This function set current resource group in MyResGroupSharedInfo,
* and current slot in MyProc->resSlot.
*/
static ResGroupSlotData *
groupAcquireSlot(ResGroupInfo *pGroupInfo, bool isMoveQuery)
{
ResGroupSlotData *slot;
ResGroupData *group;
Assert(!selfIsAssigned() || isMoveQuery);
group = pGroupInfo->group;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
/* Has the group been dropped? */
if (groupIsDropped(pGroupInfo))
{
LWLockRelease(ResGroupLock);
return NULL;
}
/* acquire a slot */
if (!group->lockedForDrop)
{
/* try to get a slot directly */
slot = groupGetSlot(group);
if (slot != NULL)
{
/* got one, lucky */
group->totalExecuted++;
LWLockRelease(ResGroupLock);
pgstat_report_resgroup(group->groupId);
return slot;
}
}
/*
* Add into group wait queue (if not waiting yet).
*/
Assert(!proc_exit_inprogress);
groupWaitQueuePush(group, MyProc);
if (!group->lockedForDrop)
group->totalQueued++;
LWLockRelease(ResGroupLock);
/*
* wait on the queue
* slot will be assigned by the proc wakes me up
* if i am waken up by DROP RESOURCE GROUP statement, the
* resSlot will be NULL.
*/
waitOnGroup(group, isMoveQuery);
if (MyProc->resSlot == NULL)
return NULL;
/*
* The waking process has granted us a valid slot.
* Update the statistic information of the resource group.
*/
slot = (ResGroupSlotData *) MyProc->resSlot;
MyProc->resSlot = NULL;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
addTotalQueueDuration(group);
group->totalExecuted++;
LWLockRelease(ResGroupLock);
pgstat_report_resgroup(group->groupId);
return slot;
}
/*
* Attempt to wake up pending slots in the group.
*
* - grant indicates whether to grant the proc a slot;
* - release indicates whether to wake up the proc with the LWLock
* temporarily released;
*
* When grant is true we'll give up once no slot can be get,
* e.g. due to lack of free slot.
*
* When grant is false all the pending procs will be woken up.
*/
static void
wakeupSlots(ResGroupData *group, bool grant)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
while (!groupWaitQueueIsEmpty(group))
{
PGPROC *waitProc;
ResGroupSlotData *slot = NULL;
if (grant)
{
/* try to get a slot for that proc */
slot = groupGetSlot(group);
if (slot == NULL)
/* if can't get one then give up */
break;
}
/* wake up one process in the wait queue */
waitProc = groupWaitQueuePop(group);
waitProc->resSlot = slot;
procWakeup(waitProc);
}
}
/* Update the total queued time of this group */
static void
addTotalQueueDuration(ResGroupData *group)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
if (group == NULL)
return;
group->totalQueuedTimeMs += (groupWaitEnd - groupWaitStart);
}
/*
* Release the resource group slot
*
* Call this function at the end of the transaction.
*/
static void
groupReleaseSlot(ResGroupData *group, ResGroupSlotData *slot, bool isMoveQuery)
{
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(!selfIsAssigned() || isMoveQuery);
groupPutSlot(group, slot);
/*
* We should wake up other pending queries on master nodes.
*/
if (IS_QUERY_DISPATCHER())
/*
* My slot is put back, then how many queuing queries should I wake up?
* Maybe zero, maybe one, maybe more, depends on how the resgroup's
* configuration were changed during our execution.
*/
wakeupSlots(group, true);
}
/*
* Serialize the resource group information that need to dispatch to segment.
*/
void
SerializeResGroupInfo(StringInfo str)
{
unsigned int cpuset_len;
int32 itmp;
ResGroupCaps empty_caps;
ResGroupCaps *caps;
if (selfIsAssigned())
caps = &self->caps;
else
{
ClearResGroupCaps(&empty_caps);
caps = &empty_caps;
}
itmp = htonl(self->groupId);
appendBinaryStringInfo(str, (char *) &itmp, sizeof(int32));
itmp = htonl(caps->concurrency);
appendBinaryStringInfo(str, (char *) &itmp, sizeof(int32));
itmp = htonl(caps->cpuMaxPercent);
appendBinaryStringInfo(str, (char *) &itmp, sizeof(int32));
itmp = htonl(caps->cpuWeight);
appendBinaryStringInfo(str, (char *) &itmp, sizeof(int32));
itmp = htonl(caps->memory_quota);
appendBinaryStringInfo(str, (char *) &itmp, sizeof(int32));
itmp = htonl(caps->min_cost);
appendBinaryStringInfo(str, (char *) &itmp, sizeof(int32));
cpuset_len = strlen(caps->cpuset);
itmp = htonl(cpuset_len);
appendBinaryStringInfo(str, (char *) &itmp, sizeof(int32));
appendBinaryStringInfo(str, caps->cpuset, cpuset_len);
itmp = htonl(bypassedSlot.groupId);
appendBinaryStringInfo(str, (char *) &itmp, sizeof(itmp));
}
/*
* Deserialize the resource group information dispatched by QD.
*/
void
DeserializeResGroupInfo(struct ResGroupCaps *capsOut,
Oid *groupId,
const char *buf,
int len)
{
int32 itmp;
unsigned int cpuset_len;
const char *ptr = buf;
Assert(len > 0);
ClearResGroupCaps(capsOut);
memcpy(&itmp, ptr, sizeof(int32)); ptr += sizeof(int32);
*groupId = ntohl(itmp);
memcpy(&itmp, ptr, sizeof(int32)); ptr += sizeof(int32);
capsOut->concurrency = ntohl(itmp);
memcpy(&itmp, ptr, sizeof(int32)); ptr += sizeof(int32);
capsOut->cpuMaxPercent = ntohl(itmp);
memcpy(&itmp, ptr, sizeof(int32)); ptr += sizeof(int32);
capsOut->cpuWeight = ntohl(itmp);
memcpy(&itmp, ptr, sizeof(int32)); ptr += sizeof(int32);
capsOut->memory_quota = ntohl(itmp);
memcpy(&itmp, ptr, sizeof(int32)); ptr += sizeof(int32);
capsOut->min_cost = ntohl(itmp);
memcpy(&itmp, ptr, sizeof(int32)); ptr += sizeof(int32);
cpuset_len = ntohl(itmp);
if (cpuset_len >= sizeof(capsOut->cpuset))
elog(ERROR, "malformed serialized resource group info");
memcpy(capsOut->cpuset, ptr, cpuset_len); ptr += cpuset_len;
capsOut->cpuset[cpuset_len] = '\0';
memcpy(&itmp, ptr, sizeof(int32)); ptr += sizeof(int32);
bypassedSlot.groupId = ntohl(itmp);
Assert(len == ptr - buf);
}
/*
* Check whether resource group should be assigned on master.
*/
bool
ShouldAssignResGroupOnMaster(void)
{
/*
* Bypass resource group when it's waiting for a resource group slot. e.g.
* MyProc was interrupted by SIGTERM while waiting for resource group slot.
* Some callbacks - RemoveTempRelationsCallback for example - open new
* transactions on proc exit. It can cause a double add of MyProc to the
* waiting queue (and its corruption).
*
* Also bypass resource group when it's exiting.
* Also bypass resource group when it's vacuum worker process.
*/
return IsResGroupActivated() &&
IsNormalProcessingMode() &&
Gp_role == GP_ROLE_DISPATCH &&
!proc_exit_inprogress &&
!procIsWaiting(MyProc) &&
!IsAutoVacuumWorkerProcess();
}
/*
* Check whether resource group should be un-assigned.
* This will be called on both master and segments.
*/
bool
ShouldUnassignResGroup(void)
{
return IsResGroupActivated() &&
IsNormalProcessingMode() &&
(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE);
}
/*
* On master, QD is assigned to a resource group at the beginning of a transaction.
* It will first acquire a slot from the resource group, and then, it will get the
* current capability snapshot, update the memory usage information, and add to
* the corresponding cgroup.
*/
void
AssignResGroupOnMaster(void)
{
ResGroupSlotData *slot;
ResGroupInfo groupInfo;
Assert(Gp_role == GP_ROLE_DISPATCH);
/*
* if query should be bypassed, do not assign a
* resource group, leave self unassigned
*/
if (shouldBypassQuery(debug_query_string))
{
/*
* If it's the first query in the connection (make sure tab completion
* is not triggered otherwise it will run some implicit query before
* you execute the SHOW command).
*
* Also need to increase a bypassed ref count to prevent the group
* being dropped concurrently.
*/
do {
decideResGroup(&groupInfo);
} while (!groupIncBypassedRef(&groupInfo));
/* Record which resgroup we are running in */
bypassedGroup = groupInfo.group;
/* Update pg_stat_activity statistics */
bypassedGroup->totalExecuted++;
pgstat_report_resgroup(bypassedGroup->groupId);
/* Initialize the fake slot */
bypassedSlot.group = groupInfo.group;
bypassedSlot.groupId = groupInfo.groupId;
/* Add into cgroup */
cgroupOpsRoutine->attachcgroup(bypassedGroup->groupId, MyProcPid,
bypassedGroup->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
return;
}
PG_TRY();
{
do {
decideResGroup(&groupInfo);
/* Acquire slot */
slot = groupAcquireSlot(&groupInfo, false);
} while (slot == NULL);
/* Set resource group slot for current session */
sessionSetSlot(slot);
selfAttachResGroup(groupInfo.group, slot);
/* Init self */
self->caps = slot->caps;
/* Don't error out before this line in this function */
SIMPLE_FAULT_INJECTOR("resgroup_assigned_on_master");
/* Add into cgroup */
cgroupOpsRoutine->attachcgroup(self->groupId, MyProcPid,
self->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
}
PG_CATCH();
{
UnassignResGroup();
PG_RE_THROW();
}
PG_END_TRY();
}
/*
* Detach from a resource group at the end of the transaction.
*/
void
UnassignResGroup(void)
{
ResGroupData *group;
ResGroupSlotData *slot;
if (bypassedGroup)
{
/* bypass mode ref count is only maintained on qd */
if (Gp_role == GP_ROLE_DISPATCH)
groupDecBypassedRef(bypassedGroup);
/* Reset the fake slot */
bypassedSlot.group = NULL;
bypassedSlot.groupId = InvalidOid;
bypassedGroup = NULL;
/* Update pg_stat_activity statistics */
pgstat_report_resgroup(InvalidOid);
return;
}
if (Gp_role == GP_ROLE_EXECUTE && IS_QUERY_DISPATCHER())
SIMPLE_FAULT_INJECTOR("unassign_resgroup_start_entrydb");
if (!selfIsAssigned())
return;
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
group = self->group;
slot = self->slot;
/* Sub proc memory accounting info from group and slot */
selfDetachResGroup(group, slot);
/* Release the slot if no reference. */
if (slot->nProcs == 0)
{
groupReleaseSlot(group, slot, false);
/*
* Reset resource group slot for current session. Note MySessionState
* could be reset as NULL in shmem_exit() before.
*/
sessionResetSlot(slot);
}
LWLockRelease(ResGroupLock);
if (Gp_role == GP_ROLE_DISPATCH)
SIMPLE_FAULT_INJECTOR("unassign_resgroup_end_qd");
pgstat_report_resgroup(InvalidOid);
}
/*
* QEs are not assigned/unassigned to a resource group on segments for each
* transaction, instead, they switch resource group when a new resource group
* id or slot id is dispatched.
*/
void
SwitchResGroupOnSegment(const char *buf, int len)
{
Oid newGroupId;
ResGroupCaps caps;
ResGroupData *group;
ResGroupSlotData *slot;
DeserializeResGroupInfo(&caps, &newGroupId, buf, len);
/*
* QD will dispatch the resgroup id via bypassedSlot.groupId
* in bypass mode.
*/
if (bypassedSlot.groupId != InvalidOid)
{
/* Are we already running in bypass mode? */
if (bypassedGroup != NULL)
{
Assert(bypassedGroup->groupId == bypassedSlot.groupId);
return;
}
/* Find out the resgroup by id */
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
bypassedGroup = groupHashFind(bypassedSlot.groupId, true);
LWLockRelease(ResGroupLock);
Assert(bypassedGroup != NULL);
/* Add into cgroup */
cgroupOpsRoutine->attachcgroup(bypassedGroup->groupId, MyProcPid,
caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
return;
}
if (newGroupId == InvalidOid)
{
UnassignResGroup();
return;
}
/*
* The working case: pg_resgroup_move_query command was interrupted, but
* at the time target (dispatcher) process already got control over slot.
* If we'll wait until the end of current target process command and then
* will dispatch something on segments in the same transaction, then
* newGroupId will not be equal to current segment's one. We want to move
* out of inconsistent state.
*/
if (newGroupId != self->groupId)
UnassignResGroup();
if (self->groupId != InvalidOid)
{
/* it's not the first dispatch in the same transaction */
Assert(self->groupId == newGroupId);
Assert(self->caps.concurrency == caps.concurrency);
Assert(self->caps.cpuMaxPercent == caps.cpuMaxPercent);
Assert(!strcmp(self->caps.cpuset, caps.cpuset));
return;
}
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
group = groupHashFind(newGroupId, true);
Assert(group != NULL);
/* Init self */
Assert(host_primary_segment_count > 0);
Assert(caps.concurrency > 0);
self->caps = caps;
/* Init slot */
slot = sessionGetSlot();
if (slot != NULL)
{
Assert(slotIsInUse(slot));
Assert(slot->groupId == newGroupId);
}
else
{
/* This is the first QE of this session, allocate a slot from slot pool */
slot = slotpoolAllocSlot();
Assert(!slotIsInUse(slot));
sessionSetSlot(slot);
initSlot(slot, group);
group->nRunning++;
}
selfAttachResGroup(group, slot);
LWLockRelease(ResGroupLock);
/* finally, we can say we are in a valid resgroup */
Assert(selfIsAssigned());
/* Add into cgroup */
cgroupOpsRoutine->attachcgroup(self->groupId, MyProcPid,
self->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
}
/*
* Wait on the queue of resource group
*/
static void
waitOnGroup(ResGroupData *group, bool isMoveQuery)
{
int64 timeout = -1;
int64 curTime;
const char *old_status;
char *new_status = NULL;
int len;
PGPROC *proc = MyProc;
const char *queueStr = " queuing";
Assert(!LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(!selfIsAssigned() || isMoveQuery);
/* set ps status to waiting */
if (update_process_title)
{
old_status = get_real_act_ps_display(&len);
new_status = (char *) palloc(len + strlen(queueStr) + 1);
memcpy(new_status, old_status, len);
strcpy(new_status + len, queueStr);
set_ps_display(new_status);
/* truncate off " queuing" */
new_status[len] = '\0';
}
/*
* The low bits of 'wait_event_info' argument to WaitLatch are
* not enough to store a full Oid, so we set groupId out-of-band,
* via the backend entry.
*/
pgstat_report_resgroup(group->groupId);
/*
* Mark that we are waiting on resource group
*
* This is used for interrupt cleanup, similar to lockAwaited in ProcSleep
*/
groupAwaited = group;
groupWaitStart = GetCurrentTimestamp();
/*
* Make sure we have released all locks before going to sleep, to eliminate
* deadlock situations
*/
PG_TRY();
{
for (;;)
{
ResetLatch(&proc->procLatch);
CHECK_FOR_INTERRUPTS();
if (!procIsWaiting(proc))
break;
if (gp_resource_group_queuing_timeout > 0)
{
curTime = GetCurrentTimestamp();
timeout = gp_resource_group_queuing_timeout - (curTime - groupWaitStart) / 1000;
if (timeout < 0)
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("canceling statement due to resource group waiting timeout")));
WaitLatch(&proc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
(long) timeout, PG_WAIT_RESOURCE_GROUP);
}
else
{
WaitLatch(&proc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
PG_WAIT_RESOURCE_GROUP);
}
}
}
PG_CATCH();
{
/* reset ps status */
if (update_process_title)
{
set_ps_display(new_status);
pfree(new_status);
}
groupWaitCancel(isMoveQuery);
PG_RE_THROW();
}
PG_END_TRY();
groupAwaited = NULL;
groupWaitEnd = GetCurrentTimestamp();
/* reset ps status */
if (update_process_title)
{
set_ps_display(new_status);
pfree(new_status);
}
}
/*
* groupHashNew -- return a new (empty) group object to initialize.
*
* Notes
* The resource group lightweight lock (ResGroupLock) *must* be held for
* this operation.
*/
static ResGroupData *
groupHashNew(Oid groupId)
{
int i;
bool found;
ResGroupHashEntry *entry;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(groupId != InvalidOid);
for (i = 0; i < pResGroupControl->nGroups; i++)
{
if (pResGroupControl->groups[i].groupId == InvalidOid)
break;
}
Assert(i < pResGroupControl->nGroups);
entry = (ResGroupHashEntry *)
hash_search(pResGroupControl->htbl, (void *) &groupId, HASH_ENTER, &found);
/* caller should test that the group does not exist already */
Assert(!found);
entry->index = i;
return &pResGroupControl->groups[i];
}
/*
* groupHashFind -- return the group for a given oid.
*
* If the group cannot be found, then NULL is returned if 'raise' is false,
* otherwise an exception is thrown.
*
* Notes
* The resource group lightweight lock (ResGroupLock) *must* be held for
* this operation.
*/
static ResGroupData *
groupHashFind(Oid groupId, bool raise)
{
bool found;
ResGroupHashEntry *entry;
Assert(LWLockHeldByMe(ResGroupLock));
entry = (ResGroupHashEntry *)
hash_search(pResGroupControl->htbl, (void *) &groupId, HASH_FIND, &found);
if (!found)
{
ereport(raise ? ERROR : LOG,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("cannot find resource group with Oid %u in shared memory",
groupId)));
return NULL;
}
Assert(entry->index < pResGroupControl->nGroups);
return &pResGroupControl->groups[entry->index];
}
/*
* groupHashRemove -- remove the group for a given oid.
*
* If the group cannot be found then an exception is thrown.
*
* Notes
* The resource group lightweight lock (ResGroupLock) *must* be held for
* this operation.
*/
static ResGroupData *
groupHashRemove(Oid groupId)
{
bool found;
ResGroupHashEntry *entry;
ResGroupData *group;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
entry = (ResGroupHashEntry*)hash_search(pResGroupControl->htbl,
(void *) &groupId,
HASH_REMOVE,
&found);
if (!found)
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("cannot find resource group with Oid %u in shared memory to remove",
groupId)));
group = &pResGroupControl->groups[entry->index];
return group;
}
/* Process exit without waiting for slot or received SIGTERM */
static void
AtProcExit_ResGroup(int code, Datum arg)
{
groupWaitCancel(false);
}
/*
* Handle the interrupt cases when waiting on the queue
*
* The proc may wait on the queue for a slot, or wait for the
* DROP transaction to finish. In the first case, at the same time
* we get interrupted (SIGINT or SIGTERM), we could have been
* granted a slot or not. In the second case, there's no running
* transaction in the group. If the DROP transaction is finished
* (commit or abort) at the same time as we get interrupted,
* MyProc should have been removed from the wait queue, and the
* ResGroupData entry may have been removed if the DROP is committed.
*/
static void
groupWaitCancel(bool isMoveQuery)
{
ResGroupData *group;
ResGroupSlotData *slot;
/* Nothing to do if we weren't waiting on a group */
if (groupAwaited == NULL)
return;
pgstat_report_wait_end();
groupWaitEnd = GetCurrentTimestamp();
Assert(!selfIsAssigned() || isMoveQuery);
group = groupAwaited;
/* We are sure to be interrupted in the for loop of waitOnGroup now */
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
AssertImply(procIsWaiting(MyProc),
groupWaitQueueFind(group, MyProc));
if (procIsWaiting(MyProc))
{
/*
* Still waiting on the queue when get interrupted, remove
* myself from the queue
*/
Assert(!groupWaitQueueIsEmpty(group));
groupWaitQueueErase(group, MyProc);
addTotalQueueDuration(group);
}
else if (MyProc->resSlot != NULL)
{
/* Woken up by a slot holder */
Assert(!procIsWaiting(MyProc));
/* First complete the slot's transfer from MyProc to self */
slot = MyProc->resSlot;
MyProc->resSlot = NULL;
/*
* Similar as groupReleaseSlot(), how many pending queries to
* wake up depends on how many slots we can get.
*/
groupReleaseSlot(group, slot, false);
/*
* Reset resource group slot for current session. Note MySessionState
* could be reset as NULL in shmem_exit() before.
*/
sessionResetSlot(slot);
group->totalExecuted++;
addTotalQueueDuration(group);
}
else
{
/*
* The transaction of DROP RESOURCE GROUP is finished,
* groupAcquireSlot will do the retry.
*
* The resource group pointed by self->group may have
* already been removed by here.
*/
Assert(!procIsWaiting(MyProc));
}
LWLockRelease(ResGroupLock);
groupAwaited = NULL;
}
/*
* Validate the consistency of the resgroup information in self.
*
* This function checks the consistency of (group & groupId).
*/
static void
selfValidateResGroupInfo(void)
{
AssertImply(self->groupId != InvalidOid,
self->group != NULL);
}
/*
* Check whether self is assigned.
*
* This is mostly equal to (selfHasSlot() && selfHasGroup()),
* however this function requires the slot and group to be in
* a consistent status, they must both be set or unset,
* so calling this function during the assign/unassign/switch process
* might cause an error, use with caution.
*
* Even selfIsAssigned() is true it doesn't mean the assign/switch
* process is completely done, for example the memory accounting
* information might not been updated yet.
*
* This function doesn't check whether the assigned resgroup
* is valid or dropped.
*/
static bool
selfIsAssigned(void)
{
selfValidateResGroupInfo();
AssertImply(self->group == NULL,
self->slot == NULL);
AssertImply(self->group != NULL,
self->slot != NULL);
return self->groupId != InvalidOid;
}
#ifdef USE_ASSERT_CHECKING
/*
* Check whether self has been set a slot.
*
* We don't check whether a resgroup is set or not.
*/
static bool
selfHasSlot(void)
{
return self->slot != NULL;
}
/*
* Check whether self has been set a resgroup.
*
* Consistency will be checked on the groupId and group pointer.
*
* We don't check whether the resgroup is valid or dropped.
*
* We don't check whether a slot is set or not.
*/
static bool
selfHasGroup(void)
{
AssertImply(self->groupId != InvalidOid,
self->group != NULL);
return self->groupId != InvalidOid;
}
#endif /* USE_ASSERT_CHECKING */
/*
* Set both the groupId and the group pointer in self.
*
* The group must not be dropped.
*
* Some over limitations are put to force the caller understand
* what it's doing and what it wants:
* - self must has not been set a resgroup;
*/
static void
selfSetGroup(ResGroupData *group)
{
Assert(!selfIsAssigned());
Assert(groupIsNotDropped(group));
self->group = group;
self->groupId = group->groupId;
}
/*
* Unset both the groupId and the resgroup pointer in self.
*
* Some over limitations are put to force the caller understand
* what it's doing and what it wants:
* - self must has been set a resgroup;
*/
static void
selfUnsetGroup(void)
{
Assert(selfHasGroup());
Assert(!selfHasSlot());
self->groupId = InvalidOid;
self->group = NULL;
}
/*
* Set the slot pointer in self.
*
* Some over limitations are put to force the caller understand
* what it's doing and what it wants:
* - self must has been set a resgroup;
* - self must has not been set a slot before set;
*/
static void
selfSetSlot(ResGroupSlotData *slot)
{
Assert(selfHasGroup());
Assert(!selfHasSlot());
Assert(slotIsInUse(slot));
self->slot = slot;
}
/*
* Unset the slot pointer in self.
*
* Some over limitations are put to force the caller understand
* what it's doing and what it wants:
* - self must has been set a resgroup;
* - self must has been set a slot before unset;
*/
static void
selfUnsetSlot(void)
{
Assert(selfHasGroup());
Assert(selfHasSlot());
self->slot = NULL;
}
/*
* Check whether proc is in some resgroup's wait queue.
*
* The LWLock is not required.
*
* This function does not check whether proc is in a specific resgroup's
* wait queue. To make this check use groupWaitQueueFind().
*/
static bool
procIsWaiting(const PGPROC *proc)
{
/*------
* The typical asm instructions fow below C operation can be like this:
* ( gcc 4.8.5-11, x86_64-redhat-linux, -O0 )
*
* mov -0x8(%rbp),%rax ; load proc
* mov 0x8(%rax),%rax ; load proc->links.next
* cmp $0,%rax ; compare with NULL
* setne %al ; store the result
*
* The operation is atomic, so a lock is not required here.
*------
*/
return proc->links.next != NULL;
}
/*
* Notify a proc it's woken up.
*/
static void
procWakeup(PGPROC *proc)
{
Assert(!procIsWaiting(proc));
SetLatch(&proc->procLatch);
}
#ifdef USE_ASSERT_CHECKING
/*
* Validate a slot's attributes.
*/
static void
slotValidate(const ResGroupSlotData *slot)
{
Assert(slot != NULL);
/* further checks whether the slot is freed or idle */
if (slot->groupId == InvalidOid)
{
Assert(slot->nProcs == 0);
}
else
{
Assert(!slotIsInFreelist(slot));
/*
* Entrydb process can have different self and session slots at the
* time of moving to another group.
*/
AssertImply(Gp_role == GP_ROLE_EXECUTE && !IS_QUERY_DISPATCHER(),
slot == sessionGetSlot());
}
}
/*
* A slot is in use if it has a valid groupId.
*/
static bool
slotIsInUse(const ResGroupSlotData *slot)
{
slotValidate(slot);
return slot->groupId != InvalidOid;
}
static bool
slotIsInFreelist(const ResGroupSlotData *slot)
{
ResGroupSlotData *current;
current = pResGroupControl->freeSlot;
for ( ; current != NULL; current = current->next)
{
if (current == slot)
return true;
}
return false;
}
#endif /* USE_ASSERT_CHECKING */
/*
* Get the slot id of the given slot.
*
* Return InvalidSlotId if slot is NULL.
*/
static int
slotGetId(const ResGroupSlotData *slot)
{
int slotId;
if (slot == NULL)
return InvalidSlotId;
slotId = slot - pResGroupControl->slots;
Assert(slotId >= 0);
Assert(slotId < RESGROUP_MAX_SLOTS);
return slotId;
}
static void
lockResGroupForDrop(ResGroupData *group)
{
if (group->lockedForDrop)
return;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(Gp_role == GP_ROLE_DISPATCH);
Assert(group->nRunning == 0);
Assert(group->nRunningBypassed == 0);
group->lockedForDrop = true;
}
static void
unlockResGroupForDrop(ResGroupData *group)
{
if (!group->lockedForDrop)
return;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(Gp_role == GP_ROLE_DISPATCH);
Assert(group->nRunning == 0);
Assert(group->nRunningBypassed == 0);
group->lockedForDrop = false;
}
#ifdef USE_ASSERT_CHECKING
/*
* Check whether a resgroup is dropped.
*
* A dropped resgroup has groupId == InvalidOid,
* however there is also the case that the resgroup is first dropped
* then the shm struct is reused by another newly created resgroup,
* in such a case the groupId is not InvalidOid but the original
* resgroup does is dropped.
*
* So this function is not always reliable, use with caution.
*/
static bool
groupIsNotDropped(const ResGroupData *group)
{
return group
&& group->groupId != InvalidOid;
}
#endif /* USE_ASSERT_CHECKING */
/*
* Validate the consistency of the resgroup wait queue.
*/
static void
groupWaitQueueValidate(const ResGroupData *group)
{
const dclist_head *waitQueue;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
waitQueue = &group->waitProcs;
if (gp_resgroup_debug_wait_queue)
{
if (waitQueue->count == 0)
{
if (waitQueue->dlist.head.next != &waitQueue->dlist.head ||
waitQueue->dlist.head.prev != &waitQueue->dlist.head)
elog(PANIC, "resource group wait queue is corrupted");
}
else
{
PGPROC *nextProc = dclist_container(PGPROC, links, waitQueue->dlist.head.next);
PGPROC *prevProc = dclist_container(PGPROC, links, waitQueue->dlist.head.prev);
if (!nextProc->mppIsWriter ||
!prevProc->mppIsWriter ||
nextProc->links.prev != &waitQueue->dlist.head ||
prevProc->links.next != &waitQueue->dlist.head)
elog(PANIC, "resource group wait queue is corrupted");
}
return;
}
AssertImply(waitQueue->count == 0,
waitQueue->dlist.head.next == &waitQueue->dlist.head &&
waitQueue->dlist.head.prev == &waitQueue->dlist.head);
}
static void
groupWaitProcValidate(PGPROC *proc, dclist_head *head)
{
PGPROC *nextProc = (PGPROC *)proc->links.next;
PGPROC *prevProc = (PGPROC *)proc->links.prev;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
if (!gp_resgroup_debug_wait_queue)
return;
if (!proc->mppIsWriter ||
((dlist_node *)nextProc != &head->dlist.head && !nextProc->mppIsWriter) ||
((dlist_node *)prevProc != &head->dlist.head && !prevProc->mppIsWriter) ||
nextProc->links.prev != &proc->links ||
prevProc->links.next != &proc->links)
elog(PANIC, "resource group wait queue is corrupted");
return;
}
/*
* Push a proc to the resgroup wait queue.
*/
static void
groupWaitQueuePush(ResGroupData *group, PGPROC *proc)
{
dclist_head *waitQueue;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(!procIsWaiting(proc));
Assert(proc->resSlot == NULL);
groupWaitQueueValidate(group);
waitQueue = &group->waitProcs;
dclist_push_tail(waitQueue, &proc->links);
groupWaitProcValidate(proc, waitQueue);
Assert(groupWaitQueueFind(group, proc));
}
/*
* Pop the top proc from the resgroup wait queue and return it.
*/
static PGPROC *
groupWaitQueuePop(ResGroupData *group)
{
dclist_head *waitQueue;
PGPROC *proc;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(!groupWaitQueueIsEmpty(group));
groupWaitQueueValidate(group);
waitQueue = &group->waitProcs;
proc = dclist_head_element(PGPROC, links, waitQueue);
groupWaitProcValidate(proc, waitQueue);
Assert(groupWaitQueueFind(group, proc));
Assert(proc->resSlot == NULL);
dclist_delete_from_thoroughly(waitQueue, &proc->links);
return proc;
}
/*
* Erase proc from the resgroup wait queue.
*/
static void
groupWaitQueueErase(ResGroupData *group, PGPROC *proc)
{
dclist_head *waitQueue;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(!groupWaitQueueIsEmpty(group));
Assert(groupWaitQueueFind(group, proc));
Assert(proc->resSlot == NULL);
groupWaitQueueValidate(group);
waitQueue = &group->waitProcs;
groupWaitProcValidate(proc, waitQueue);
dclist_delete_from_thoroughly(waitQueue, &proc->links);
}
/*
* Check whether the resgroup wait queue is empty.
*/
static bool
groupWaitQueueIsEmpty(const ResGroupData *group)
{
const dclist_head *waitQueue;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
groupWaitQueueValidate(group);
waitQueue = &group->waitProcs;
return waitQueue->count == 0;
}
#ifdef USE_ASSERT_CHECKING
/*
* Find proc in group's wait queue.
*
* Return true if found or false if not found.
*
* This functions is expensive so should only be used in debugging logic,
* in most cases procIsWaiting() shall be used.
*/
static bool
groupWaitQueueFind(ResGroupData *group, const PGPROC *proc)
{
dclist_head *waitQueue;
Size offset;
dlist_iter proc_iter;
Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
groupWaitQueueValidate(group);
waitQueue = &group->waitProcs;
offset = offsetof(PGPROC, links);
dclist_foreach(proc_iter, waitQueue)
{
PGPROC *queued_proc = dlist_container(PGPROC, links, proc_iter.cur);
if (queued_proc == proc)
{
Assert(procIsWaiting(proc));
return true;
}
}
return false;
}
#endif/* USE_ASSERT_CHECKING */
/*
* Walk through the raw expression tree, if there is a RangeVar without
* `pg_catalog` prefix, terminate the process immediately to save the cpu
* resource.
*/
static bool
checkBypassWalker(Node *node, void *context)
{
bool *bypass = context;
if (node == NULL)
return false;
if (IsA(node, RangeVar))
{
RangeVar *from = (RangeVar *) node;
if (from->schemaname == NULL ||
strcmp(from->schemaname, "pg_catalog") != 0)
{
*bypass = false;
return true;
}
else
{
/*
* Make sure there is at least one RangeVar
*/
*bypass = true;
}
}
return raw_expression_tree_walker(node, checkBypassWalker, context);
}
static bool
shouldBypassSelectQuery(Node *node)
{
bool catalog_bypass = false;
if (gp_resource_group_bypass_catalog_query)
raw_expression_tree_walker(node, checkBypassWalker, &catalog_bypass);
return catalog_bypass;
}
/*
* Parse the query and check if this query should
* bypass the management of resource group.
*
* Currently, only SET/RESET/SHOW command and SELECT with only catalog tables
* can be bypassed
*/
static bool
shouldBypassQuery(const char *query_string)
{
MemoryContext oldcontext = NULL;
MemoryContext tmpcontext = NULL;
List *parsetree_list;
ListCell *parsetree_item;
Node *parsetree;
bool bypass;
if (gp_resource_group_bypass)
return true;
if (!query_string)
return false;
/*
* Switch to appropriate context for constructing parsetrees.
*
* It is possible that MessageContext is NULL, for example in a bgworker:
*
* debug_query_string = "select 1";
* StartTransactionCommand();
*
* This is not the recommended order of setting debug_query_string, but we
* should not put a constraint on the order by resource group anyway.
*/
if (MessageContext)
oldcontext = MemoryContextSwitchTo(MessageContext);
else
{
/* Create a temp memory context to prevent memory leaks */
tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
"resgroup temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldcontext = MemoryContextSwitchTo(tmpcontext);
}
parsetree_list = pg_parse_query(query_string);
MemoryContextSwitchTo(oldcontext);
if (parsetree_list == NULL)
return false;
/* Only bypass SET/RESET/SHOW command and SELECT with only catalog tables
* for now */
bypass = true;
foreach(parsetree_item, parsetree_list)
{
parsetree = (Node *) lfirst(parsetree_item);
if (nodeTag(parsetree) == T_RawStmt)
parsetree = ((RawStmt *)parsetree)->stmt;
if (IsA(parsetree, SelectStmt))
{
if (!shouldBypassSelectQuery(parsetree))
{
bypass = false;
break;
}
}
else if (nodeTag(parsetree) != T_VariableSetStmt &&
nodeTag(parsetree) != T_VariableShowStmt)
{
bypass = false;
break;
}
}
list_free_deep(parsetree_list);
if (tmpcontext)
MemoryContextDelete(tmpcontext);
return bypass;
}
/*
* Check whether the resource group has been dropped.
*/
static bool
groupIsDropped(ResGroupInfo *pGroupInfo)
{
Assert(pGroupInfo != NULL);
Assert(pGroupInfo->group != NULL);
return pGroupInfo->group->groupId != pGroupInfo->groupId;
}
/*
* Debug helper functions
*/
void
ResGroupDumpInfo(StringInfo str)
{
int i;
if (!IsResGroupEnabled())
return;
appendStringInfo(str, "{\"segid\":%d,", GpIdentity.segindex);
/* dump fields in pResGroupControl. */
appendStringInfo(str, "\"segmentsOnMaster\":%d,", pResGroupControl->segmentsOnMaster);
appendStringInfo(str, "\"loaded\":%s,", pResGroupControl->loaded ? "true" : "false");
/* dump each group */
appendStringInfo(str, "\"groups\":[");
for (i = 0; i < pResGroupControl->nGroups; i++)
{
resgroupDumpGroup(str, &pResGroupControl->groups[i]);
if (i < pResGroupControl->nGroups - 1)
appendStringInfo(str, ",");
}
appendStringInfo(str, "],");
/* dump slots */
resgroupDumpSlots(str);
appendStringInfo(str, ",");
/* dump freeslot links */
resgroupDumpFreeSlots(str);
appendStringInfo(str, "}");
}
static void
resgroupDumpGroup(StringInfo str, ResGroupData *group)
{
appendStringInfo(str, "{");
appendStringInfo(str, "\"group_id\":%u,", group->groupId);
appendStringInfo(str, "\"nRunning\":%d,", group->nRunning);
appendStringInfo(str, "\"nRunningBypassed\":%d,", group->nRunningBypassed);
appendStringInfo(str, "\"locked_for_drop\":%d,", group->lockedForDrop);
resgroupDumpWaitQueue(str, &group->waitProcs);
resgroupDumpCaps(str, (ResGroupCap*)(&group->caps));
appendStringInfo(str, "}");
}
static void
resgroupDumpWaitQueue(StringInfo str, dclist_head *queue)
{
dlist_iter iter;
bool first = true;
appendStringInfo(str, "\"wait_queue\":{");
appendStringInfo(str, "\"wait_queue_size\":%d,", queue->count);
appendStringInfo(str, "\"wait_queue_content\":[");
dclist_foreach(iter, queue)
{
PGPROC *proc = dlist_container(PGPROC, links, iter.cur);
if (!first)
appendStringInfo(str, ",");
first = false;
appendStringInfo(str, "{");
appendStringInfo(str, "\"pid\":%d,", proc->pid);
appendStringInfo(str, "\"resWaiting\":%s,",
procIsWaiting(proc) ? "true" : "false");
appendStringInfo(str, "\"resSlot\":%d", slotGetId(proc->resSlot));
appendStringInfo(str, "}");
}
appendStringInfo(str, "]},");
}
static void
resgroupDumpCaps(StringInfo str, ResGroupCap *caps)
{
int i;
appendStringInfo(str, "\"caps\":[");
for (i = 1; i < RESGROUP_LIMIT_TYPE_COUNT; i++)
{
appendStringInfo(str, "{\"%d\":%d}", i, caps[i]);
if (i < RESGROUP_LIMIT_TYPE_COUNT - 1)
appendStringInfo(str, ",");
}
appendStringInfo(str, "]");
}
static void
resgroupDumpSlots(StringInfo str)
{
int i;
ResGroupSlotData* slot;
appendStringInfo(str, "\"slots\":[");
for (i = 0; i < RESGROUP_MAX_SLOTS; i++)
{
slot = &(pResGroupControl->slots[i]);
appendStringInfo(str, "{");
appendStringInfo(str, "\"slotId\":%d,", i);
appendStringInfo(str, "\"groupId\":%u,", slot->groupId);
appendStringInfo(str, "\"nProcs\":%d,", slot->nProcs);
appendStringInfo(str, "\"next\":%d,", slotGetId(slot->next));
resgroupDumpCaps(str, (ResGroupCap*)(&slot->caps));
appendStringInfo(str, "}");
if (i < RESGROUP_MAX_SLOTS - 1)
appendStringInfo(str, ",");
}
appendStringInfo(str, "]");
}
static void
resgroupDumpFreeSlots(StringInfo str)
{
ResGroupSlotData* head;
head = pResGroupControl->freeSlot;
appendStringInfo(str, "\"free_slot_list\":{");
appendStringInfo(str, "\"head\":%d", slotGetId(head));
appendStringInfo(str, "}");
}
/*
* Set resource group slot for current session.
*/
static void
sessionSetSlot(ResGroupSlotData *slot)
{
Assert(slot != NULL);
/*
* Previously, we had an assertion, that MySessionState->resGroupSlot
* should be NULL here. There is a case, when we want to move processes
* from one group to another. We got assertion error on main process,
* if entrydb process not called UnassignResGroup() yet (and vice versa).
* Next call to UnassignResGroup() (by main or entrydb process) will free
* slot and it's OK, but here we want to set new slot to session, so we
* changed assertion.
*/
AssertImply((Gp_role == GP_ROLE_EXECUTE && !IS_QUERY_DISPATCHER()),
MySessionState->resGroupSlot == NULL);
/*
* SessionStateLock is required since runaway detector will traverse
* the current session array and check corresponding resGroupSlot with
* shared lock on SessionStateLock.
*/
LWLockAcquire(SessionStateLock, LW_EXCLUSIVE);
MySessionState->resGroupSlot = (void *) slot;
LWLockRelease(SessionStateLock);
}
/*
* Reset resource group slot for current session to NULL, check we resetting
* correct slot
*/
static void
sessionResetSlot(ResGroupSlotData *slot)
{
/*
* SessionStateLock is required since runaway detector will traverse
* the current session array and check corresponding resGroupSlot with
* shared lock on SessionStateLock.
*/
if (MySessionState != NULL)
{
LWLockAcquire(SessionStateLock, LW_EXCLUSIVE);
/* If the slot is ours, set resGroupSlot to NULL. */
if (MySessionState->resGroupSlot == slot)
MySessionState->resGroupSlot = NULL;
LWLockRelease(SessionStateLock);
}
}
/*
* Get resource group slot of current session.
*/
static ResGroupSlotData *
sessionGetSlot(void)
{
if (MySessionState == NULL)
return NULL;
else
return (ResGroupSlotData *) MySessionState->resGroupSlot;
}
/*
* Parse cpuset to bitset
* If cpuset is "1,3-5", Bitmapset 1,3,4,5 are set.
*/
Bitmapset *
CpusetToBitset(const char *cpuset, int len)
{
int pos = 0, num1 = 0, num2 = 0;
enum Status
{
Initial,
Begin,
Number,
Interval,
Number2
};
enum Status s = Initial;
Bitmapset *bms = NULL;
if (cpuset == NULL || len <= 0)
return bms;
while (pos < len && cpuset[pos])
{
char c = cpuset[pos++];
if (c == ',')
{
if (s == Initial || s == Begin)
{
continue;
}
else if (s == Interval)
{
goto error_logic;
}
else if (s == Number)
{
bms = bms_union(bms, bms_make_singleton(num1));
num1 = 0;
s = Begin;
}
else if (s == Number2)
{
if (num1 > num2)
{
goto error_logic;
}
for (int i = num1; i <= num2; ++i)
{
bms = bms_union(bms, bms_make_singleton(i));
}
num1 = num2 = 0;
s = Begin;
}
}
else if (c == '-')
{
if (s != Number)
{
goto error_logic;
}
s = Interval;
}
else if (isdigit(c))
{
if (s == Initial || s == Begin)
{
s = Number;
}
else if (s == Interval)
{
s = Number2;
}
if (s == Number)
{
num1 = num1 * 10 + (c - '0');
}
else if (s == Number2)
{
num2 = num2 * 10 + (c - '0');
}
}
else if (c == '\n')
{
break;
}
else
{
goto error_logic;
}
}
if (s == Number)
{
bms = bms_union(bms, bms_make_singleton(num1));
}
else if (s == Number2)
{
if (num1 > num2)
{
goto error_logic;
}
for (int i = num1; i <= num2; ++i)
{
bms = bms_union(bms, bms_make_singleton(i));
}
}
else if (s == Initial || s == Interval)
{
goto error_logic;
}
return bms;
error_logic:
return NULL;
}
/*
* Check the value of cpuset is empty or not
*/
bool CpusetIsEmpty(const char *cpuset)
{
return strcmp(cpuset, DefaultCpuset) == 0;
}
/*
* Set cpuset value to default value -1.
*/
void SetCpusetEmpty(char *cpuset, int cpusetSize)
{
strlcpy(cpuset, DefaultCpuset, cpusetSize);
}
/*
* Transform non-empty bitset to cpuset.
*
* This function does not check the cpu cores are available or not.
*/
void
BitsetToCpuset(const Bitmapset *bms,
char *cpuset,
int cpusetSize)
{
int len = 0;
int lastContinuousBit = -1;
int intervalStart = -1;
int num;
char buffer[32] = {0};
Assert(!bms_is_empty(bms));
cpuset[0] = '\0';
num = -1;
while ((num = bms_next_member(bms, num)) >= 0)
{
if (lastContinuousBit == -1)
{
intervalStart = lastContinuousBit = num;
}
else
{
if (num != lastContinuousBit + 1)
{
if (intervalStart == lastContinuousBit)
{
snprintf(buffer, sizeof(buffer), "%d,", intervalStart);
}
else
{
snprintf(buffer, sizeof(buffer), "%d-%d,", intervalStart, lastContinuousBit);
}
if (len + strlen(buffer) >= cpusetSize)
{
Assert(cpuset[0]);
return ;
}
strcpy(cpuset + len, buffer);
len += strlen(buffer);
intervalStart = lastContinuousBit = num;
}
else
{
lastContinuousBit = num;
}
}
}
if (intervalStart != -1)
{
if (intervalStart == lastContinuousBit)
{
snprintf(buffer, sizeof(buffer), "%d", intervalStart);
}
else
{
snprintf(buffer, sizeof(buffer), "%d-%d", intervalStart, lastContinuousBit);
}
if (len + strlen(buffer) >= cpusetSize)
{
Assert(cpuset[0]);
return ;
}
strcpy(cpuset + len, buffer);
len += strlen(buffer);
}
else
{
/* bms is non-empty, so it should never reach here */
pg_unreachable();
}
}
/*
* calculate the result of cpuset1 plus/minus cpuset2 and save in place
* if sub is true, the operation is minus
* if sub is false, the operation is plus
*/
void
cpusetOperation(char *cpuset1, const char *cpuset2,
int len, bool sub)
{
char cpuset[MaxCpuSetLength] = {0};
int defaultCore = -1;
Bitmapset *bms1 = CpusetToBitset(cpuset1, len);
Bitmapset *bms2 = CpusetToBitset(cpuset2, len);
if (sub)
{
bms1 = bms_del_members(bms1, bms2);
}
else
{
bms1 = bms_add_members(bms1, bms2);
}
if (!bms_is_empty(bms1))
{
BitsetToCpuset(bms1, cpuset1, len);
}
else
{
/* Get cpuset from cpuset/gpdb, and transform it into bitset */
cgroupOpsRoutine->getcpuset(CGROUP_ROOT_ID, cpuset, MaxCpuSetLength);
Bitmapset *bmsDefault = CpusetToBitset(cpuset, MaxCpuSetLength);
/* get the minimum core number, in case of the zero core is not exist */
defaultCore = bms_next_member(bmsDefault, -1);
Assert(defaultCore >= 0);
snprintf(cpuset1, MaxCpuSetLength, "%d", defaultCore);
}
}
/*
* union cpuset2 to cpuset1
*/
void
CpusetUnion(char *cpuset1, const char *cpuset2, int len)
{
cpusetOperation(cpuset1, cpuset2, len, false);
}
/*
* subtract cpuset2 from cpuset1
*/
void
CpusetDifference(char *cpuset1, const char *cpuset2, int len)
{
cpusetOperation(cpuset1, cpuset2, len, true);
}
/*
* ensure that cpuset is available.
*/
bool
EnsureCpusetIsAvailable(int elevel)
{
if (!IsResGroupActivated())
{
ereport(elevel,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("resource group must be enabled to use cpuset feature")));
return false;
}
if (!gp_resource_group_enable_cgroup_cpuset)
{
ereport(elevel,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cgroup is not properly configured to use the cpuset feature"),
errhint("Extra cgroup configurations are required to enable this feature, "
"please refer to the Cloudberry Documentations for details")));
return false;
}
return true;
}
/*
* Return group id for a session
*/
Oid
SessionGetResGroupId(SessionState *session)
{
ResGroupSlotData *sessionSlot = (ResGroupSlotData *)session->resGroupSlot;
if (sessionSlot)
return sessionSlot->groupId;
else
return InvalidOid;
}
/*
* move a proc to a resource group
*/
void
HandleMoveResourceGroup(void)
{
ResGroupSlotData *slot;
ResGroupData *group;
ResGroupData *oldGroup;
Oid groupId;
pid_t callerPid;
Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE);
/* transaction has finished */
if (!IsTransactionState() || !selfIsAssigned())
{
if (Gp_role == GP_ROLE_DISPATCH)
{
SpinLockAcquire(&MyProc->movetoMutex);
/*
* setting movetoGroupId to InvalidOid alone without setting
* movetoResSlot to NULL means target process tried to handle, but
* can't do anything with a command
*/
MyProc->movetoGroupId = InvalidOid;
callerPid = MyProc->movetoCallerPid;
SpinLockRelease(&MyProc->movetoMutex);
/* notify initiator, current command is irrelevant */
if (callerPid != InvalidPid)
ResGroupMoveNotifyInitiator(callerPid);
}
return;
}
if (Gp_role == GP_ROLE_DISPATCH)
{
SIMPLE_FAULT_INJECTOR("resource_group_move_handler_before_qd_control");
SpinLockAcquire(&MyProc->movetoMutex);
slot = (ResGroupSlotData *) MyProc->movetoResSlot;
groupId = MyProc->movetoGroupId;
callerPid = MyProc->movetoCallerPid;
/* set to NULL to mark we got slot control */
MyProc->movetoResSlot = NULL;
/* set to InvalidOid to mark we handling the command */
MyProc->movetoGroupId = InvalidOid;
/*
* Don't clean movetoCallerPid. It guards us from another initiators,
* which may overwrite moveto* params.
*/
SpinLockRelease(&MyProc->movetoMutex);
if (!slot)
{
/* moving command is irrelevant */
return;
}
/*
* starting from this point, all control over slot should be done
* here, from target process
*/
Assert(groupId != InvalidOid);
SIMPLE_FAULT_INJECTOR("resource_group_move_handler_after_qd_control");
ResGroupMoveNotifyInitiator(callerPid);
/* unassign the old resource group and release the old slot */
UnassignResGroup();
sessionSetSlot(slot);
/* Add proc memory accounting info into group and slot */
group = slot->group;
selfAttachResGroup(group, slot);
/* Init self */
self->caps = slot->caps;
/*
* You may say it's ugly to notify entrydb process here, but not in
* initiator process, but we want to be sure slot was actually
* assigned to session using sessionSetSlot(). We can't do much inside
* one spinlock. Especially, we can't work with multiple LWLocks
* inside of it. So, to keep the solution simple and plain, we decided
* to signal entrydb process here, inside of target process handler.
*/
(void) ResGroupMoveSignalTarget(MyProc->mppSessionId,
NULL, groupId, true);
/*
* Add into cgroup. On any exception slot will be freed by the end of
* transaction.
*/
cgroupOpsRoutine->attachcgroup(self->groupId, MyProcPid,
self->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
pgstat_report_resgroup(self->groupId);
}
/*
* Move entrydb process. This is very similar to moving of target process,
* but without setting session level slot, which was already set by
* target.
*/
else if (Gp_role == GP_ROLE_EXECUTE && IS_QUERY_DISPATCHER())
{
SpinLockAcquire(&MyProc->movetoMutex);
groupId = MyProc->movetoGroupId;
MyProc->movetoGroupId = InvalidOid;
SpinLockRelease(&MyProc->movetoMutex);
/*
* The right session-level slot was set by the dispatcher's part of
* handler (above).
*/
slot = sessionGetSlot();
Assert(slot != NULL);
Assert(slot->groupId == groupId);
group = slot->group;
/*
* But before we'll attach new slot to current entrydb process, we
* need to unassign all from 'self'.
*/
UnassignResGroup();
/* And now, attach it and increment all counters we need. */
selfAttachResGroup(group, slot);
self->caps = group->caps;
/* finally we can say we are in a valid resgroup */
Assert(selfIsAssigned());
/* Add into cgroup */
cgroupOpsRoutine->attachcgroup(self->groupId, MyProcPid,
self->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
}
/*
* Move segment's executor. Use simple manual counters manipulation. We
* can't call same complex designed for coordinator functions like above.
*/
else if (Gp_role == GP_ROLE_EXECUTE && !IS_QUERY_DISPATCHER())
{
SpinLockAcquire(&MyProc->movetoMutex);
groupId = MyProc->movetoGroupId;
MyProc->movetoGroupId = InvalidOid;
SpinLockRelease(&MyProc->movetoMutex);
slot = sessionGetSlot();
Assert(slot != NULL);
selfUnsetSlot();
selfUnsetGroup();
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
group = groupHashFind(groupId, true);
oldGroup = slot->group;
Assert(group != NULL);
Assert(oldGroup != NULL);
/*
* move the slot memory to the new group, only do it once if there're
* more than once slice.
*/
if (slot->groupId != groupId)
{
oldGroup->nRunning--;
slot->groupId = groupId;
slot->group = group;
slot->caps = group->caps;
group->nRunning++;
}
LWLockRelease(ResGroupLock);
selfSetGroup(group);
selfSetSlot(slot);
self->caps = group->caps;
/* finally we can say we are in a valid resgroup */
Assert(selfIsAssigned());
/* Add into cgroup */
cgroupOpsRoutine->attachcgroup(self->groupId, MyProcPid,
self->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
}
}
/*
* Try to give away all slot control to target process.
*/
static void
resGroupGiveSlotAway(int sessionId, ResGroupSlotData ** slot, Oid groupId)
{
long timeout;
int64 curTime;
int64 waitStart;
int latchRes;
bool clean = false;
bool res = false;
SIMPLE_FAULT_INJECTOR("resource_group_give_away_begin");
if (!ResGroupMoveSignalTarget(sessionId, *slot, groupId, false))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
(errmsg("cannot send signal to process"))));
waitStart = GetCurrentTimestamp();
for (;;)
{
/*
* In an infinite loop, call ResGroupMoveCheckTargetReady() to check whether
* all target processes have received signal of RG move.
* If we have hit gp_resource_group_move_timeout, try to cancel the move
* operation (no matter was target handled signal) and clean remained stuffs.
*/
curTime = GetCurrentTimestamp();
timeout = gp_resource_group_move_timeout - (curTime - waitStart) / 1000;
if (timeout > 0)
{
PG_TRY();
{
SIMPLE_FAULT_INJECTOR("resource_group_give_away_wait_latch");
/*
* do check here to clean all target's moveto* params in case
* of interruption or any exception
*/
CHECK_FOR_INTERRUPTS();
latchRes = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
timeout,
PG_WAIT_RESOURCE_GROUP);
if (latchRes & WL_POSTMASTER_DEATH)
elog(ERROR,
"got WL_POSTMASTER_DEATH waiting on latch; exiting...");
}
PG_CATCH();
{
clean = true;
ResGroupMoveCheckTargetReady(sessionId, &clean, &res);
if (res)
{
/*
* clean slot variable, because we don't need to touch it
* in current process as control is on the target side
*/
*slot = NULL;
ereport(WARNING,
(errmsg("got exception, but slot control is on the target process side"),
errhint("QEs weren't moved. They'll be moved by the next command dispatched in the target transaction, if any.")));
}
PG_RE_THROW();
}
PG_END_TRY();
}
else
latchRes = WL_TIMEOUT;
SIMPLE_FAULT_INJECTOR("resource_group_give_away_after_latch");
clean = (latchRes & WL_TIMEOUT);
ResGroupMoveCheckTargetReady(sessionId, &clean, &res);
if (clean)
break;
ResetLatch(&MyProc->procLatch);
}
if (!res)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("target process failed to move to a new group"))));
}
void
ResGroupMoveQuery(int sessionId, Oid groupId, const char *groupName)
{
ResGroupInfo groupInfo;
ResGroupData *group;
ResGroupSlotData *slot;
char *cmd;
Assert(pResGroupControl != NULL);
Assert(pResGroupControl->segmentsOnMaster > 0);
Assert(Gp_role == GP_ROLE_DISPATCH);
LWLockAcquire(ResGroupLock, LW_SHARED);
group = groupHashFind(groupId, false);
LWLockRelease(ResGroupLock);
if (!group)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
(errmsg("invalid resource group id: %u", groupId))));
}
groupInfo.group = group;
groupInfo.groupId = groupId;
slot = groupAcquireSlot(&groupInfo, true);
if (slot == NULL)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
(errmsg("cannot get slot in resource group %u", groupId))));
PG_TRY();
{
resGroupGiveSlotAway(sessionId, &slot, groupId);
}
PG_CATCH();
{
/*
* There can be exceptional situations, when slot is already on the
* target side. Release slot only if available.
*/
if (slot)
{
LWLockAcquire(ResGroupLock, LW_EXCLUSIVE);
groupReleaseSlot(group, slot, true);
LWLockRelease(ResGroupLock);
}
PG_RE_THROW();
}
PG_END_TRY();
/*
* starting from this point, all slot control should be done from target
* process, so we don't need to release it here if something will go wrong
*/
cmd = psprintf("SELECT pg_resgroup_move_query(%d, %s)",
sessionId,
quote_literal_cstr(groupName));
CdbDispatchCommand(cmd, 0, NULL);
}
/*
* get resource group id by session id
*/
Oid
ResGroupGetGroupIdBySessionId(int sessionId)
{
Oid groupId = InvalidOid;
SessionState *curSessionState;
LWLockAcquire(SessionStateLock, LW_SHARED);
curSessionState = AllSessionStateEntries->usedList;
while (curSessionState != NULL)
{
if (curSessionState->sessionId == sessionId)
{
ResGroupSlotData *slot = (ResGroupSlotData *)curSessionState->resGroupSlot;
if (slot != NULL)
groupId = slot->groupId;
break;
}
curSessionState = curSessionState->next;
}
LWLockRelease(SessionStateLock);
return groupId;
}
/*
* In resource group mode, how much memory should a query take in bytes.
*/
uint64
ResourceGroupGetQueryMemoryLimit(void)
{
ResGroupCaps *caps;
int64 resgLimit = -1;
uint64 queryMem = -1;
uint64 stateMem = (uint64) statement_mem * 1024L;
Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY);
/* for bypass query,use statement_mem as the query mem. */
if (bypassedGroup)
return stateMem;
if (gp_resgroup_memory_query_fixed_mem > 0)
return (uint64) gp_resgroup_memory_query_fixed_mem * 1024L;
Assert(selfIsAssigned());
LWLockAcquire(ResGroupLock, LW_SHARED);
caps = &self->group->caps;
resgLimit = caps->memory_quota;
AssertImply(resgLimit < 0, resgLimit == -1);
if (resgLimit == -1)
{
LWLockRelease(ResGroupLock);
return stateMem;
}
queryMem = (uint64)(resgLimit *1024L *1024L / caps->concurrency);
LWLockRelease(ResGroupLock);
/*
* If user requests more than statement_mem, grant that.
*/
return Max(queryMem, stateMem);
}
/*
* After getting the plan of a query, it must be inside
* a transaction which means it must already hold a resgroup
* slot. For some cases, we can unassign to save a concurrency
* slot and other resources (just like bypass):
* - only happen on QD
* - for explicit transaction block (begin; end), don't do it
* because for following SQLs it will not try to enter resgroup
* - pure catalog query or very simple query (no rangetable and
* no function)
* - if the total cost is smaller than resgroup configured mincost
*/
void
check_and_unassign_from_resgroup(PlannedStmt* stmt)
{
bool inFunction;
ResGroupInfo groupInfo;
SIMPLE_FAULT_INJECTOR("check_and_unassign_from_resgroup_entry");
if (Gp_role != GP_ROLE_DISPATCH ||
!IsNormalProcessingMode() ||
!IsResGroupActivated() ||
bypassedGroup != NULL)
return;
/*
* Don't need to consider the sql commands inside the UDF, they will also
* be bypassed or use the same resgroup as the outer query.
*/
inFunction = already_under_executor_run() || utility_nested();
if (IsInTransactionBlock(!inFunction))
return;
/*
* If none of the bypass(unassign) rule satisfy, return directly
*/
if (!can_bypass_based_on_plan_cost(stmt) &&
!(gp_resource_group_bypass_direct_dispatch && can_bypass_direct_dispatch_plan(stmt)) &&
!(gp_resource_group_bypass_catalog_query && is_pure_catalog_plan(stmt)))
return;
/* Unassign from resgroup and bypass */
UnassignResGroup();
do {
decideResGroup(&groupInfo);
} while (!groupIncBypassedRef(&groupInfo));
bypassedGroup = groupInfo.group;
pgstat_report_resgroup(bypassedGroup->groupId);
bypassedSlot.group = groupInfo.group;
bypassedSlot.groupId = groupInfo.groupId;
cgroupOpsRoutine->attachcgroup(bypassedGroup->groupId, MyProcPid,
bypassedGroup->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);
}
/*
* return ture if there is a resource group which io_limit contains tblspcid.
* if errout is true, print warning, message. */
bool
checkTablespaceInIOlimit(Oid tblspcid, bool errout)
{
Relation rel_resgroup_caps;
SysScanDesc sscan;
HeapTuple tuple;
bool contain = false;
bool print_header = false;
StringInfo log = makeStringInfo();
rel_resgroup_caps = table_open(ResGroupCapabilityRelationId, AccessShareLock);
/* get io limit string from catalog */
sscan = systable_beginscan(rel_resgroup_caps, InvalidOid, false,
NULL, 0, NULL);
while (HeapTupleIsValid(tuple = systable_getnext(sscan)))
{
Oid id;
bool isNULL;
Datum id_datum;
Datum type_datum;
Datum value_datum;
ResGroupLimitType type;
char *io_limit_str;
List *limit_list;
ListCell *cell;
type_datum = heap_getattr(tuple, Anum_pg_resgroupcapability_reslimittype,
rel_resgroup_caps->rd_att, &isNULL);
type = (ResGroupLimitType) DatumGetInt16(type_datum);
if (type != RESGROUP_LIMIT_TYPE_IO_LIMIT)
continue;
id_datum = heap_getattr(tuple, Anum_pg_resgroupcapability_resgroupid,
rel_resgroup_caps->rd_att, &isNULL);
id = DatumGetObjectId(id_datum);
value_datum = heap_getattr(tuple, Anum_pg_resgroupcapability_value,
rel_resgroup_caps->rd_att, &isNULL);
io_limit_str = TextDatumGetCString(value_datum);
if (strcmp(io_limit_str, DefaultIOLimit) == 0)
continue;
limit_list = cgroupOpsRoutine->parseio(io_limit_str);
foreach(cell, limit_list)
{
TblSpcIOLimit *limit = (TblSpcIOLimit *) lfirst(cell);
if (limit->tablespace_oid == tblspcid)
{
contain = true;
if (!errout)
break;
if (!print_header)
{
print_header = true;
appendStringInfo(log, "io limit: following resource groups depend on tablespace %s:", get_tablespace_name(tblspcid));
}
appendStringInfo(log, " %s", GetResGroupNameForId(id));
}
}
cgroupOpsRoutine->freeio(limit_list);
if (contain && !errout)
break;
}
systable_endscan(sscan);
table_close(rel_resgroup_caps, AccessShareLock);
if (contain && errout)
ereport(ERROR, (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("%s", log->data),
errhint("you can remove those resource groups or remove tablespace %s from io_limit of those resource groups.", get_tablespace_name(tblspcid))));
pfree(log->data);
pfree(log);
return contain;
}
/*
* Given a planned statement, check if it is pure catalog query or a very simple query.
* Return true only when:
* - there must be only one slice
* - there is no FuncExpr in target list
* - range table cannot contain FUNCTION or TABLEFUNC
* - range table must be catalog if it is RTE_RELATION
*/
static bool
is_pure_catalog_plan(PlannedStmt *stmt)
{
ListCell *rtable;
List *func_tag;
/* For catalog SQL, we only consider SELECT stmt. */
if (stmt->commandType != CMD_SELECT)
return false;
if (stmt->numSlices != 1)
return false;
if (stmt->planTree->targetlist != NIL)
{
int pos;
func_tag = list_make1_int(T_FuncExpr);
pos = find_nodes((Node *) (stmt->planTree->targetlist), func_tag);
list_free(func_tag);
if (pos >= 0)
return false;
}
foreach(rtable, stmt->rtable)
{
RangeTblEntry *rte = (RangeTblEntry *) lfirst(rtable);
if (rte->rtekind == RTE_FUNCTION ||
rte->rtekind == RTE_TABLEFUNC ||
rte->rtekind == RTE_TABLEFUNCTION)
return false;
if (rte->rtekind != RTE_RELATION)
continue;
if (rte->relkind == RELKIND_MATVIEW)
return false;
if (rte->relkind == RELKIND_VIEW)
continue;
if(!IsCatalogRelationOid(rte->relid))
return false;
}
return true;
}
static bool
can_bypass_based_on_plan_cost(PlannedStmt *stmt)
{
ResGroupCaps *caps = &self->group->caps;
int min_cost;
min_cost = (int) pg_atomic_read_u32((pg_atomic_uint32 *) &caps->min_cost);
return stmt->planTree->total_cost < min_cost;
}
/*
* Insert|Delete|Update: bypass those with numSlice = 1
* and the slice is direct dispatch.
*
* Select: since there is motion to gather to QD, bypass
* those with numSlice = 2, and the 1st slice in QD and
* the 2nd slice is direct dispatch.
*/
static bool
can_bypass_direct_dispatch_plan(PlannedStmt *stmt)
{
if (stmt->commandType == CMD_SELECT)
{
return (stmt->numSlices == 2 &&
stmt->slices[1].directDispatch.isDirectDispatch);
}
else if (stmt->commandType == CMD_UPDATE ||
stmt->commandType == CMD_INSERT ||
stmt->commandType == CMD_DELETE)
return stmt->numSlices == 1 && stmt->slices[0].directDispatch.isDirectDispatch;
else
return false;
}