blob: ae83f4579fe0f45f2e939c8220bdea4b4a2a2b97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "envswitch.h"
#include "dynrm.h"
#include "resourcemanager/resourcemanager.h"
#include "communication/rmcomm_AsyncComm.h"
#include "communication/rmcomm_MessageHandler.h"
#include "communication/rmcomm_MessageServer.h"
#include "communication/rmcomm_RMSEG2RM.h"
#include "resourceenforcer/resourceenforcer.h"
#include "cdb/cdbtmpdir.h"
#include "storage/pmsignal.h" /* PostmasterIsAlive */
int ResManagerMainSegment2ndPhase(void)
{
int res = FUNC_RETURN_OK;
elog(DEBUG5, "HAWQ RM SEG is triggered.");
/* Register message handlers */
registerMessageHandler(REQUEST_QE_MOVETOCGROUP, handleQEMoveToCGroup);
registerMessageHandler(REQUEST_QE_MOVEOUTCGROUP, handleQEMoveOutCGroup);
registerMessageHandler(REQUEST_QE_SETWEIGHTCGROUP, handleQESetWeightCGroup);
registerMessageHandler(REQUEST_RM_INCREASE_MEMORY_QUOTA, handleRMIncreaseMemoryQuota);
registerMessageHandler(REQUEST_RM_DECREASE_MEMORY_QUOTA, handleRMDecreaseMemoryQuota);
registerMessageHandler(REQUEST_RM_RUALIVE, handleRMSEGRequestRUAlive);
/**************************************************************************
* New socket facility poll based server.
**************************************************************************/
res = initializeSocketServer_RMSEG();
if ( res != FUNC_RETURN_OK ) {
elog(FATAL, "Fail to initialize socket server.");
}
/*
* Resource enforcement: initialize, cleanup, and rebuild CGroup hash table
* when segment resource manager starts/restarts
*/
initCGroupThreads();
InitFileAccess();
/*
* Notify postmaster that HAWQ RM is ready. Ignore the possible problem that
* the parent process quits. HAWQ RM will automatically detect if its parent
* dies, then HAWQ RM should exit normally.
*/
kill(DRMGlobalInstance->ParentPID, SIGUSR2);
elog(LOG, "HAWQ RM SEG process works now.");
/* Start request handler to provide services. */
res = MainHandlerLoop_RMSEG();
elog(RMLOG, "HAWQ RM SEG server goes into exit phase.");
return res;
}
int initializeSocketServer_RMSEG(void)
{
int res = FUNC_RETURN_OK;
int netres = 0;
char *allip = "0.0.0.0";
pgsocket RMListenSocket[HAWQRM_SERVER_PORT_COUNT];
for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
RMListenSocket[i] = PGINVALID_SOCKET;
}
/* Listen normal socket addresses. */
netres = StreamServerPort( AF_UNSPEC,
allip,
rm_segment_port,
NULL,
RMListenSocket,
HAWQRM_SERVER_PORT_COUNT);
/* If there are multiple segments in one host, which is common in old imp.
* We can not make all segments work. So, if HAWQ RM SEG fails to start
* socket server by listening the port, we accept this case and make it
* silent. HAWQ RM will not recognize this segment and will not assign
* tasks. */
if ( netres != STATUS_OK ) {
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog( LOG, "cannot create socket server. HostName=%s, Port=%d",
allip,
rm_segment_port);
return res;
}
/* Initialize array for polling all file descriptors. */
initializeAsyncComm();
int validfdcount = 0;
AsyncCommBuffer newbuffer = NULL;
for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
if (RMListenSocket[i] != PGINVALID_SOCKET) {
netres = registerFileDesc(RMListenSocket[i],
ASYNCCOMM_READ,
&AsyncCommBufferHandlersMsgServer,
NULL,
&newbuffer);
if ( netres != FUNC_RETURN_OK ) {
res = REQUESTHANDLER_FAIL_START_SOCKET_SERVER;
elog(LOG, "Resource manager cannot track socket server.");
break;
}
validfdcount++;
InitHandler_Message(newbuffer);
}
}
if ( res != FUNC_RETURN_OK ) {
for ( int i = 0 ; i < HAWQRM_SERVER_PORT_COUNT ; ++i ) {
if ( RMListenSocket[i] != PGINVALID_SOCKET ) close(RMListenSocket[i]);
}
return res;
}
elog(DEBUG5, "HAWQ RM SEG :: Starts accepting resource request. "
"Listening normal socket port %s:%d. "
"Total listened %d FDs.",
allip,
rm_segment_port,
validfdcount);
return res;
}
int MainHandlerLoop_RMSEG(void)
{
int res = FUNC_RETURN_OK;
uint64_t curtime = 0;
int errorcode = FUNC_RETURN_OK;
char errorbuf[1024];
DRMGlobalInstance->ResourceManagerStartTime = gettime_microsec();
while( DRMGlobalInstance->ResManagerMainKeepRun ) {
if (!PostmasterIsAlive(true)) {
DRMGlobalInstance->ResManagerMainKeepRun = false;
elog(LOG, "Postmaster is not alive, resource manager exits");
break;
}
/* PART1. Handle socket server inputs. */
res = processAllCommFileDescs();
if ( res != FUNC_RETURN_OK ) {
/*
* The possible error here is the failure of poll(), we won't keep
* running HAWQ RM any longer, graceful quit is requested.
*/
DRMGlobalInstance->ResManagerMainKeepRun = false;
elog(LOG, "System error cause resource manager not possible to track "
"network communications.");
}
/* PART2. Handle all BE submitted requests. */
processSubmittedRequests();
if ( curtime - DRMGlobalInstance->TmpDirLastCheckTime >
1000000LL * rm_segment_tmpdir_detect_interval )
{
checkAndBuildFailedTmpDirList();
DRMGlobalInstance->TmpDirLastCheckTime = gettime_microsec();
}
/* PART3. Fresh local host info and send IMAlive message to resource
* manager server. */
curtime = gettime_microsec();
if ( DRMGlobalInstance->LocalHostStat == NULL ||
curtime - DRMGlobalInstance->LocalHostLastUpdateTime >
1000000LL * rm_segment_config_refresh_interval )
{
refreshLocalHostInstance();
checkLocalPostmasterStatus();
}
if ( DRMGlobalInstance->SendIMAlive )
{
if (DRMGlobalInstance->LocalHostStat != NULL &&
curtime - DRMGlobalInstance->HeartBeatLastSentTime >
1000000LL * rm_segment_heartbeat_interval )
{
sendIMAlive(&errorcode, errorbuf, sizeof(errorbuf));
DRMGlobalInstance->HeartBeatLastSentTime = gettime_microsec();
}
}
/* PART4. Send responses back to the clients. */
sendResponseToClients();
/* PART5. Resource enforcement work thread quit */
if (g_enforcement_thread_quited) {
elog(ERROR, "Resource enforcement thread quited");
}
}
elog(RMLOG, "Resource manager main event handler exits.");
return res;
}
/*
* Check if this temporary directory is OK to read or write.
* If not, it's probably due to disk error.
*/
bool CheckTmpDirAvailable(char *path)
{
FILE *tmp = NULL;
bool ret = true;
char* fname = NULL;
char* testfile = "/checktmpdir.log";
/* open a file to check if
* this temporary directory is OK.
*/
fname = palloc0(strlen(path) + strlen(testfile) + 1);
strncpy(fname, path, strlen(path));
strncpy(fname + strlen(path), testfile, strlen(testfile));
tmp = fopen(fname, "w");
if (tmp == NULL)
{
elog(LOG, "Can't open file:%s when check temporary directory: %s",
fname,
strerror(errno));
ret = false;
}
pfree(fname);
if (tmp != NULL)
fclose(tmp);
return ret;
}
/*
* Check the status of each temporary directory,
* and build a list of failed temporary directories.
*/
void checkAndBuildFailedTmpDirList(void)
{
uint64_t starttime = gettime_microsec();
destroyTmpDirList(DRMGlobalInstance->LocalHostFailedTmpDirList);
DRMGlobalInstance->LocalHostFailedTmpDirList = NULL;
DQUEUE_LOOP_BEGIN(&DRMGlobalInstance->LocalHostTempDirectories, iter, SimpStringPtr, value)
if (!CheckTmpDirAvailable(value->Str))
{
char *failedDir = pstrdup(value->Str);
DRMGlobalInstance->LocalHostFailedTmpDirList =
lappend(DRMGlobalInstance->LocalHostFailedTmpDirList, failedDir);
}
DQUEUE_LOOP_END
uint64_t endtime = gettime_microsec();
elog(LOG, "checkAndBuildFailedTmpDirList finished checking temporary "
"directory, which costs " UINT64_FORMAT " us",
endtime - starttime);
}
void switchIMAliveSendingTarget(void)
{
/* We switch to standby server only when it is correctly set. */
if (pg_strcasecmp(standby_addr_host, "none") != 0)
{
DRMGlobalInstance->SendToStandby = !DRMGlobalInstance->SendToStandby;
elog(LOG, "segment will send heart-beat to %s from now on",
DRMGlobalInstance->SendToStandby ? "standby" : "master");
}
}