blob: 81900e8474dcc08e61f70918fde9b1b4d292cab4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "envswitch.h"
#include "dynrm.h"
#include "time.h"
#include "resourcemanager/resourcemanager.h"
#include "communication/rmcomm_RMSEG2RM.h"
#include "communication/rmcomm_RM2RMSEG.h"
#include "communication/rmcomm_MessageHandler.h"
#include "communication/rmcomm_QE_RMSEG_Protocol.h"
#include "communication/rmcomm_RM_RMSEG_Protocol.h"
#include "utils/simplestring.h"
#include "utils/linkedlist.h"
#include "utils/palloc.h"
#include "gp-libpq-fe.h"
#include "gp-libpq-int.h"
#include "resourceenforcer/resourceenforcer_message.h"
/* UINT32_T.MAX_VALUE = 4294967295 */
#define MAX_DIGITS_OF_UINT32_T 10
/* Format of cgroup timestamp: YYYYMMDD_HHMMSS */
#define LENTH_CGROUP_TIMESTAMP 15
char *buildCGroupNameString(int64 masterStartTime, uint32 connId);
char *buildCGroupNameString(int64 masterStartTime, uint32 connId)
{
int tLength = strlen(timestamptz_to_str(masterStartTime));
char *tTime = (char *)rm_palloc0(PCONTEXT, tLength+1);
strcpy(tTime, timestamptz_to_str(masterStartTime));
char *sTime = (char *)rm_palloc0(PCONTEXT, LENTH_CGROUP_TIMESTAMP+1);
sprintf(sTime, "%.4s%.2s%.2s_%.2s%.2s%.2s",
tTime,
tTime + 5,
tTime + 8,
tTime + 11,
tTime + 14,
tTime + 17);
/* CGroup name follows the format: hawq-YYMMDD_HHMMSS-connCONNECTIONID */
char *cgroupName = (char*)rm_palloc0(PCONTEXT,
sizeof("hawq-") -1 + strlen(sTime) +
sizeof("-conn") -1 + MAX_DIGITS_OF_UINT32_T + 1);
sprintf(cgroupName, "hawq-%s-conn%d", sTime, connId);
rm_pfree(PCONTEXT, sTime);
rm_pfree(PCONTEXT, tTime);
return cgroupName;
}
/**
* Handle QE MoveToGroup function call.
*/
bool handleQEMoveToCGroup(void **arg)
{
ConnectionTrack conntrack = (ConnectionTrack)(*arg);
Assert(conntrack != NULL);
RPCResponseMoveToCGroupData response;
int segmentPid;
TimestampTz masterStartTime;
uint32_t connId;
int segId;
RPCRequestMoveToCGroup request = (RPCRequestMoveToCGroup)
(conntrack->MessageBuff.Buffer);
masterStartTime = request->MasterStartTime;
connId = request->ConnID;
segId = request->SegmentID;
segmentPid = request->ProcID;
elog(DEBUG1, "Resource enforcer moves QE to CGroup: "
"masterStartTime = %s, connId = %d, segId = %d, procId = %d",
timestamptz_to_str(masterStartTime),
connId, segmentPid, segmentPid);
response.Result = FUNC_RETURN_OK;
response.Reserved = 0;
buildResponseIntoConnTrack(conntrack,
(char *)&response,
sizeof(response),
conntrack->MessageMark1,
conntrack->MessageMark2,
RESPONSE_QE_MOVETOCGROUP);
conntrack->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
MEMORY_CONTEXT_SWITCH_BACK
/* Prepare cgroupName from masterStartTime, connId and segId information */
char *cgroupName = buildCGroupNameString(masterStartTime, connId);
/* Move To CGroup in thread */
ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)malloc(sizeof(ResourceEnforcementRequest));
task->type = MOVETO;
task->pid = segmentPid;
memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
enqueue(g_queue_cgroup, (void *)task);
rm_pfree(PCONTEXT, cgroupName);
return true;
}
/**
* Handle QE MoveOutGroup function call.
*/
bool handleQEMoveOutCGroup(void **arg)
{
ConnectionTrack conntrack = (ConnectionTrack)(*arg);
Assert(conntrack != NULL);
RPCResponseMoveOutCGroupData response;
int segmentPid;
TimestampTz masterStartTime;
uint32_t connId;
int segId;
RPCRequestMoveOutCGroup request = (RPCRequestMoveOutCGroup)
(conntrack->MessageBuff.Buffer);
masterStartTime = request->MasterStartTime;
connId = request->ConnID;
segId = request->SegmentID;
segmentPid = request->ProcID;
elog(DEBUG1, "Resource enforcer moves QE out from CGroup: "
"masterStartTime = %s, connId = %d, segId = %d, procId = %d",
timestamptz_to_str(masterStartTime),
connId, segmentPid, segmentPid);
response.Result = FUNC_RETURN_OK;
response.Reserved = 0;
buildResponseIntoConnTrack(conntrack,
(char *)&response,
sizeof(response),
conntrack->MessageMark1,
conntrack->MessageMark2,
RESPONSE_QE_MOVEOUTCGROUP);
conntrack->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
MEMORY_CONTEXT_SWITCH_BACK
/* Prepare cgroupName from masterStartTime, connId and segId information */
char *cgroupName = buildCGroupNameString(masterStartTime, connId);
/* Move Out CGroup in thread */
ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
malloc(sizeof(ResourceEnforcementRequest));
task->type = MOVEOUT;
task->pid = segmentPid;
memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
enqueue(g_queue_cgroup, (void *)task);
rm_pfree(PCONTEXT, cgroupName);
return true;
}
/**
* Handle QE SetWeightGroup function call.
*/
bool handleQESetWeightCGroup(void **arg)
{
ConnectionTrack conntrack = (ConnectionTrack)(*arg);
Assert(conntrack != NULL);
TimestampTz masterStartTime;
uint32_t connId;
int segId;
int segmentPid;
double weight;
RPCRequestSetWeightCGroup request = (RPCRequestSetWeightCGroup)
(conntrack->MessageBuff.Buffer);
masterStartTime = request->MasterStartTime;
connId = request->ConnID;
segId = request->SegmentID;
segmentPid = request->ProcID;
weight = request->Weight;
elog(DEBUG1, "Resource enforcer sets weight for QE in CGroup: "
"masterStartTime = %s, connId = %d, segId = %d, "
"procId = %d, weight = %lf",
timestamptz_to_str(masterStartTime),
connId, segmentPid, segmentPid, weight);
/* Prepare cgroupName from masterStartTime, connId and segId information */
char *cgroupName = buildCGroupNameString(masterStartTime, connId);
/* Build request instance and add request into the queue. */
ResourceEnforcementRequest *task = (ResourceEnforcementRequest *)
malloc(sizeof(ResourceEnforcementRequest));
if ( task == NULL ) {
elog(ERROR, "Resource enforcer fails to malloc "
"resource enforcement request instance");
}
task->type = SETWEIGHT;
task->pid = segmentPid;
memset(task->cgroup_name, 0, sizeof(task->cgroup_name));
strncpy(task->cgroup_name, cgroupName, strlen(cgroupName)+1);
task->query_resource.vcore = weight;
if (enqueue(g_queue_cgroup, (void *)task) == -1 ) {
elog(ERROR, "Resource enforcer fails to add "
"resource enforcement request into task queue");
}
RPCResponseSetWeightCGroupData response;
response.Result = FUNC_RETURN_OK;
response.Reserved = 0;
buildResponseIntoConnTrack(conntrack,
(char *)&response,
sizeof(response),
conntrack->MessageMark1,
conntrack->MessageMark2,
RESPONSE_QE_SETWEIGHTCGROUP);
conntrack->ResponseSent = false;
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
PCONTRACK->ConnToSend = lappend(PCONTRACK->ConnToSend, conntrack);
MEMORY_CONTEXT_SWITCH_BACK
rm_pfree(PCONTEXT, cgroupName);
return true;
}