blob: 39e444373aaecb01e50d2bd1f77334c259965f7f [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
///////////////////////////////////////////////////////////////////////////////
#include <iostream>
using namespace std;
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include "lnode.h"
#include "pnode.h"
#include "ptpclient.h"
#include "monitor.h"
#include "monlogging.h"
#include "montrace.h"
#include "meas.h"
extern CMonitor *Monitor;
extern CNode *MyNode;
extern CNodeContainer *Nodes;
extern bool IsRealCluster;
extern CMeas Meas;
extern int MyPNID;
#define MON2MON_IO_RETRIES 3
// Construct the point-to-point client.
//
// Reads the mandatory MON2MON_COMM_PORT environment variable (the base port
// used to reach peer monitors) and aborts the process if it is missing.
// Allocates a MAX_NODES-entry socket table indexed by physical node id, with
// every entry initialised to -1 ("no connection").
CPtpClient::CPtpClient (void)
          : ptpCommPort_(0)
          , ptpClusterSocks_(NULL)
          , seqNum_(0)
{
    const char method_name[] = "CPtpClient::CPtpClient";
    TRACE_ENTRY;

    // Host and "host:" prefix start empty; they are filled in later.
    ptpHost_[0] = '\0';
    ptpPortBase_[0] = '\0';

    // Virtual cluster: every monitor runs on this host.
    if ( !IsRealCluster )
    {
        SetLocalHost();
    }

    const char *commPortEnv = getenv( "MON2MON_COMM_PORT" );
    if ( commPortEnv == NULL )
    {
        // Without a base port no peer monitor can be reached: fatal.
        char buf[MON_STRING_BUF_SIZE];
        snprintf( buf, sizeof(buf)
                , "[%s@%d] MON2MON_COMM_PORT environment variable is not set!\n"
                , method_name, __LINE__ );
        mon_log_write( PTPCLIENT_PTPCLIENT_1, SQ_LOG_CRIT, buf );
        abort();
    }
    ptpCommPort_ = atoi( commPortEnv );

    // One cached socket per physical node; -1 marks "not connected".
    ptpClusterSocks_ = new int[MAX_NODES];
    for (int *slot = ptpClusterSocks_; slot != ptpClusterSocks_ + MAX_NODES; ++slot)
    {
        *slot = -1;
    }

    TRACE_EXIT;
}
// Destroy the point-to-point client.
//
// Closes every cached peer-monitor socket that is still open (previously the
// descriptors leaked when only the table was freed), then releases the
// socket table allocated by the constructor.
CPtpClient::~CPtpClient (void)
{
    const char method_name[] = "CPtpClient::~CPtpClient";
    TRACE_ENTRY;

    if (ptpClusterSocks_ != NULL)
    {
        for (int pnid = 0; pnid < MAX_NODES; ++pnid)
        {
            SockClose( pnid );
        }
        delete [] ptpClusterSocks_;
        ptpClusterSocks_ = NULL;
    }

    TRACE_EXIT;
}
// Ensure a client socket to the monitor on physical node `pnid` exists.
//
// ptpPort: "host:port" string passed to Monitor->MkCltSock().
// Returns 0 when a socket is already cached or was created successfully; on
// failure returns the negative value produced by MkCltSock() and leaves the
// cache entry untouched.
int CPtpClient::InitializePtpClient( int pnid, char * ptpPort )
{
    const char method_name[] = "CPtpClient::InitializePtpClient";
    TRACE_ENTRY;

    int err = 0;
    const bool alreadyConnected = (ptpClusterSocks_[pnid] != -1);

    if (!alreadyConnected)
    {
        const int newSock = Monitor->MkCltSock( ptpPort );
        if (newSock >= 0)
        {
            // Cache the connection for later sends to this node.
            ptpClusterSocks_[pnid] = newSock;
            if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
            {
                trace_printf( "%s@%d - connected to monitor node=%d(%s), sock=%d, "
                              "ptpClusterSocks_[%d]=%d\n"
                            , method_name, __LINE__
                            , pnid, ptpPort, newSock
                            , pnid, ptpClusterSocks_[pnid] );
            }
        }
        else
        {
            err = newSock;
            if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
            {
                trace_printf( "%s@%d - MkCltSock failed with error %d\n"
                            , method_name, __LINE__, err );
            }
        }
    }

    TRACE_EXIT;
    return err;
}
bool CPtpClient::IsTargetRemote( int targetNid )
{
const char method_name[] = "CPtpClient::IsTargetRemote";
TRACE_ENTRY;
CLNode *targetLNode = Nodes->GetLNode( targetNid );
CNode *targetNode = targetLNode->GetNode();
bool rs = (targetNode && targetNode->GetPNid() == MyPNID) ? false : true ;
TRACE_EXIT;
return(rs);
}
// Forward a unique-string registration (nid, id, stringValue) to the monitor
// on targetNid.
//
// Returns 0 without sending when targetNid is served by this monitor;
// otherwise returns the result of SendToMon().
int CPtpClient::ProcessAddUniqStr( int nid
                                 , int id
                                 , const char *stringValue
                                 , int targetNid
                                 , const char *targetNodeName )
{
    const char method_name[] = "CPtpClient::ProcessAddUniqStr";
    TRACE_ENTRY;

    if (IsTargetRemote( targetNid ) == false)
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_UniqStr request to "
                          "local nid=%d\n"
                        , method_name, __LINE__, targetNid );
        }
        return(0);
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
    {
        trace_printf( "%s@%d - Sending InternalType_UniqStr request to %s, "
                      "targetNid=%d\n"
                    , method_name, __LINE__, targetNodeName, targetNid );
    }

    struct internal_msg_def msg;
    memset(&msg, 0, sizeof(msg));
    msg.type = InternalType_UniqStr;
    msg.u.uniqstr.nid = nid;
    msg.u.uniqstr.id = id;

    // The string (including its NUL) is carried inline starting at valueData.
    const int valueLen = strlen(stringValue) + 1;
    memcpy( &msg.u.uniqstr.valueData, stringValue, valueLen );

    // Wire size: message header + fixed uniqstr part + inline string bytes.
    ptpMsgInfo_t myInfo;
    myInfo.pnid = MyPNID;
    myInfo.size = offsetof(struct internal_msg_def, u)
                + sizeof(msg.u.uniqstr)
                + valueLen;

    if (trace_settings & TRACE_PROCESS_DETAIL)
    {
        trace_printf( "%s@%d - size_=%d, forwarding unique string [%d, %d] (%s)\n"
                    , method_name, __LINE__
                    , myInfo.size
                    , msg.u.uniqstr.nid
                    , msg.u.uniqstr.id
                    , &msg.u.uniqstr.valueData );
    }

    const int result = SendToMon( "process-add-unique-string"
                                , &msg, myInfo, targetNid, targetNodeName );
    TRACE_EXIT;
    return result;
}
// Send a clone of `process` to the monitor serving the process's parent.
//
// Returns 0 without sending when the process has no parent (parent nid -1 or
// no logical node for it) or when the parent is served by this monitor.
// Otherwise returns the result of SendToMon().
//
// Side effect: the process's prior pid is copied into the message and then
// reset to 0.
int CPtpClient::ProcessClone( CProcess *process )
{
    const char method_name[] = "CPtpClient::ProcessClone";
    TRACE_ENTRY;

    CLNode *parentLNode = NULL;
    if (process->GetParentNid() != -1)
    {
        parentLNode = Nodes->GetLNode( process->GetParentNid() );
    }

    if (parentLNode == NULL)
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_Clone request to parentNid=%d"
                          ", process=%s (%d:%d:%d)\n"
                        , method_name, __LINE__
                        , process->GetParentNid()
                        , process->GetName()
                        , process->GetNid()
                        , process->GetPid()
                        , process->GetVerifier() );
        }
        return(0);
    }

    if (!IsTargetRemote( process->GetParentNid() ))
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_Clone request to "
                          "local nid=%d\n"
                        , method_name, __LINE__
                        , process->GetParentNid() );
        }
        return(0);
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
    {
        trace_printf( "%s@%d - Sending InternalType_Clone request to %s, parentNid=%d"
                      ", process=%s (%d:%d:%d)\n"
                    , method_name, __LINE__
                    , parentLNode->GetNode()->GetName()
                    , process->GetParentNid()
                    , process->GetName()
                    , process->GetNid()
                    , process->GetPid()
                    , process->GetVerifier() );
    }

    struct internal_msg_def msg;
    memset(&msg, 0, sizeof(msg));
    msg.type = InternalType_Clone;
    msg.u.clone.backup = process->IsBackup();
    msg.u.clone.unhooked = process->IsUnhooked();
    msg.u.clone.event_messages = process->IsEventMessages();
    msg.u.clone.system_messages = process->IsSystemMessages();
    msg.u.clone.nid = process->GetNid();
    msg.u.clone.verifier = process->GetVerifier();
    msg.u.clone.type = process->GetType();
    msg.u.clone.priority = process->GetPriority();
    msg.u.clone.parent_nid = process->GetParentNid();
    msg.u.clone.parent_pid = process->GetParentPid();
    msg.u.clone.parent_verifier = process->GetParentVerifier();
    msg.u.clone.os_pid = process->GetPid();
    msg.u.clone.persistent = process->IsPersistent();
    msg.u.clone.persistent_retries = process->GetPersistentRetries();
    msg.u.clone.origPNidNs= -1;
    msg.u.clone.argc = process->argc();
    msg.u.clone.creation_time = process->GetCreationTime();
    msg.u.clone.pathStrId = process->pathStrId();
    msg.u.clone.ldpathStrId = process->ldPathStrId();
    msg.u.clone.programStrId = process->programStrId();
    msg.u.clone.prior_pid = process->GetPriorPid ();
    process->SetPriorPid ( 0 );

    // Variable-length data is packed back to back starting at stringData in
    // the order: name, port, infile, outfile, argv.  Each length recorded in
    // the fixed part includes the terminating NUL for the C strings.
    int nameLen = strlen(process->GetName()) + 1;
    int portLen = strlen(process->GetPort()) + 1;
    int infileLen = strlen(process->infile()) + 1;
    int outfileLen = strlen(process->outfile()) + 1;
    int argvLen = process->userArgvLen();

    char *namePtr    = &msg.u.clone.stringData;
    char *portPtr    = namePtr + nameLen;
    char *infilePtr  = portPtr + portLen;
    char *outfilePtr = infilePtr + infileLen;
    char *argvPtr    = outfilePtr + outfileLen;

    msg.u.clone.nameLen = nameLen;
    memcpy( namePtr, process->GetName(), nameLen );

    msg.u.clone.portLen = portLen;
    memcpy( portPtr, process->GetPort(), portLen );

    msg.u.clone.infileLen = infileLen;
    memcpy( infilePtr, process->infile(), infileLen );

    msg.u.clone.outfileLen = outfileLen;
    memcpy( outfilePtr, process->outfile(), outfileLen );

    msg.u.clone.argvLen = argvLen;
    memcpy( argvPtr, process->userArgv(), argvLen );

    ptpMsgInfo_t myInfo;
    myInfo.pnid = MyPNID;
    myInfo.size = offsetof(struct internal_msg_def, u);
    myInfo.size += sizeof(msg.u.clone);
    myInfo.size += nameLen ;
    myInfo.size += portLen ;
    myInfo.size += infileLen ;
    myInfo.size += outfileLen ;
    myInfo.size += argvLen ;

    if (trace_settings & TRACE_PROCESS_DETAIL)
    {
        // Use the recorded field pointers so every string is read from its
        // own packed position (port is included in the offsets).
        trace_printf( "%s@%d - size_=%d, programStrId=(%d,%d), "
                      "pathStrId=(%d,%d), ldPathStrId=(%d,%d), "
                      "name=%s, strlen(name)=%d, "
                      "port=%s, strlen(port)=%d, "
                      "infile=%s, strlen(infile)=%d, "
                      "outfile=%s, strlen(outfile)=%d, "
                      "argc=%d, strlen(total argv)=%d, args=[%.*s]\n"
                    , method_name, __LINE__
                    , myInfo.size
                    , msg.u.clone.programStrId.nid
                    , msg.u.clone.programStrId.id
                    , msg.u.clone.pathStrId.nid
                    , msg.u.clone.pathStrId.id
                    , msg.u.clone.ldpathStrId.nid
                    , msg.u.clone.ldpathStrId.id
                    , namePtr
                    , nameLen
                    , portPtr
                    , portLen
                    , infilePtr
                    , infileLen
                    , outfilePtr
                    , outfileLen
                    , msg.u.clone.argc
                    , argvLen
                    , argvLen
                    , argvPtr );
    }

    int error = SendToMon( "process-clone"
                         , &msg
                         , myInfo
                         , process->GetParentNid()
                         , parentLNode->GetNode()->GetName());
    TRACE_EXIT;
    return error;
}
// Notify the monitor on targetNid that `process` is exiting.
//
// Returns 0 without sending when targetNid is served by this monitor;
// otherwise returns the result of SendToMon().
int CPtpClient::ProcessExit( CProcess *process
                           , int targetNid
                           , const char *targetNodeName )
{
    const char method_name[] = "CPtpClient::ProcessExit";
    TRACE_ENTRY;

    const bool isRemote = IsTargetRemote( targetNid );
    if (!isRemote)
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_Exit request to "
                          "local nid=%d\n"
                        , method_name, __LINE__, targetNid );
        }
        return(0);
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
    {
        trace_printf( "%s@%d - Sending InternalType_Exit request to %s, targetNid=%d"
                      ", process=%s (%d,%d:%d) is exiting\n"
                    , method_name, __LINE__
                    , targetNodeName, targetNid
                    , process->GetName(), process->GetNid()
                    , process->GetPid(), process->GetVerifier() );
    }

    struct internal_msg_def msg;
    memset(&msg, 0, sizeof(msg));
    msg.type = InternalType_Exit;

    // Identify the exiting process by (nid, pid, verifier) plus its name.
    msg.u.exit.nid      = process->GetNid();
    msg.u.exit.pid      = process->GetPid();
    msg.u.exit.verifier = process->GetVerifier();
    strcpy(msg.u.exit.name, process->GetName());
    msg.u.exit.abended  = process->IsAbended();

    ptpMsgInfo_t myInfo;
    myInfo.pnid = MyPNID;
    myInfo.size = offsetof(struct internal_msg_def, u) + sizeof(msg.u.exit);

    if (trace_settings & TRACE_PROCESS_DETAIL)
    {
        trace_printf( "%s@%d - size_=%d, process %s (%d,%d:%d) "
                      "abended=%d\n"
                    , method_name, __LINE__
                    , myInfo.size
                    , msg.u.exit.name
                    , msg.u.exit.nid
                    , msg.u.exit.pid
                    , msg.u.exit.verifier
                    , msg.u.exit.abended );
    }

    const int result = SendToMon( "process-exit", &msg, myInfo
                                , targetNid, targetNodeName );
    TRACE_EXIT;
    return result;
}
// Report the startup result of `process` back to the monitor serving its
// parent.
//
// The "is there a parent / is it remote" checks use process->GetParentNid(),
// while the message is addressed to the `parentNid` argument.
// NOTE(review): callers presumably pass the same value; confirm.
// Returns 0 without sending when there is no parent logical node or the parent
// is local; otherwise returns the result of SendToMon().
int CPtpClient::ProcessInit( CProcess *process
                           , void *tag
                           , int result
                           , int parentNid )
{
    const char method_name[] = "CPtpClient::ProcessInit";
    TRACE_ENTRY;

    const int processParentNid = process->GetParentNid();
    CLNode *parentLNode =
        (processParentNid != -1) ? Nodes->GetLNode( processParentNid ) : NULL;

    if (parentLNode == NULL)
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_ProcessInit request to parentNid=%d"
                          ", process=%s (%d,%d:%d)\n"
                        , method_name, __LINE__
                        , process->GetParentNid()
                        , process->GetName()
                        , process->GetNid()
                        , process->GetPid()
                        , process->GetVerifier() );
        }
        return(0);
    }

    if (!IsTargetRemote( process->GetParentNid() ))
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_ProcessInit request to "
                          "local nid=%d\n"
                        , method_name, __LINE__
                        , process->GetParentNid() );
        }
        return(0);
    }

    const char *parentNodeName = parentLNode->GetNode()->GetName();

    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
    {
        trace_printf( "%s@%d - Sending InternalType_ProcessInit to parent node %s, parentNid=%d"
                      ", for process %s (%d,%d:%d), result=%d, tag=%p\n"
                    , method_name, __LINE__
                    , parentNodeName
                    , parentNid
                    , process->GetName()
                    , process->GetNid()
                    , process->GetPid()
                    , process->GetVerifier()
                    , result
                    , tag );
    }

    struct internal_msg_def msg;
    memset(&msg, 0, sizeof(msg));
    msg.type = InternalType_ProcessInit;
    msg.u.processInit.nid      = process->GetNid();
    msg.u.processInit.pid      = process->GetPid();
    msg.u.processInit.verifier = process->GetVerifier();
    strcpy (msg.u.processInit.name, process->GetName());
    msg.u.processInit.state    = process->GetState();
    msg.u.processInit.result   = result;
    msg.u.processInit.tag      = tag;
    msg.u.processInit.origNid  = process->GetParentNid();

    ptpMsgInfo_t myInfo;
    myInfo.pnid = MyPNID;
    myInfo.size = offsetof(struct internal_msg_def, u) + sizeof(msg.u.processInit);

    const int sendResult = SendToMon( "process-init", &msg, myInfo
                                    , parentNid, parentNodeName );
    TRACE_EXIT;
    return sendResult;
}
// Ask the monitor on targetNid to kill `process`.
//
// abort: stored in the message as persistent_abort.
// Returns 0 without sending when targetNid is served by this monitor;
// otherwise returns the result of SendToMon().
int CPtpClient::ProcessKill( CProcess *process
                           , bool abort
                           , int targetNid
                           , const char *targetNodeName )
{
    const char method_name[] = "CPtpClient::ProcessKill";
    TRACE_ENTRY;

    if (!IsTargetRemote( targetNid ))
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_Kill request to "
                          "local nid=%d\n"
                        , method_name, __LINE__
                        , targetNid );
        }
        return(0);
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
    {
        trace_printf( "%s@%d - Sending InternalType_Kill request to %s, targetNid=%d"
                      ", killing process (%d,%d:%d)\n"
                    , method_name, __LINE__
                    , targetNodeName
                    , targetNid
                    , process->GetNid()
                    , process->GetPid()
                    , process->GetVerifier() );
    }

    struct internal_msg_def msg;
    memset(&msg, 0, sizeof(msg));
    msg.type = InternalType_Kill;
    msg.u.kill.nid = process->GetNid();
    msg.u.kill.pid = process->GetPid();
    msg.u.kill.verifier = process->GetVerifier();
    msg.u.kill.persistent_abort = abort;

    // Wire size must describe the kill payload actually populated above
    // (previously sizeof(msg.u.exit) was used by mistake).
    ptpMsgInfo_t myInfo;
    myInfo.pnid = MyPNID;
    myInfo.size = offsetof(struct internal_msg_def, u);
    myInfo.size += sizeof(msg.u.kill);

    if (trace_settings & TRACE_PROCESS_DETAIL)
    {
        trace_printf( "%s@%d - size_=%d, process (%d,%d:%d) "
                      "persistent_abort=%d\n"
                    , method_name, __LINE__
                    , myInfo.size
                    , msg.u.kill.nid
                    , msg.u.kill.pid
                    , msg.u.kill.verifier
                    , msg.u.kill.persistent_abort );
    }

    int error = SendToMon( "process-kill"
                         , &msg
                         , myInfo
                         , targetNid
                         , targetNodeName);
    TRACE_EXIT;
    return error;
}
// Ask the monitor on targetNid to create `process`.
//
// The message carries the process attributes plus, packed back to back after
// the fixed part, its name, infile, outfile and user argv.
// Returns 0 without sending when targetNid is served by this monitor;
// otherwise returns the result of SendToMon().
int CPtpClient::ProcessNew( CProcess *process
                          , int targetNid
                          , const char *targetNodeName )
{
    const char method_name[] = "CPtpClient::ProcessNew";
    TRACE_ENTRY;

    if (IsTargetRemote( targetNid ) == false)
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_Process request to "
                          "local nid=%d\n"
                        , method_name, __LINE__, targetNid );
        }
        return(0);
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
    {
        trace_printf( "%s@%d - Sending InternalType_Process request to %s, targetNid=%d"
                      ", program=%s, parent=(%d,%d:%d)\n"
                    , method_name, __LINE__
                    , targetNodeName, targetNid
                    , process->program()
                    , process->GetParentNid()
                    , process->GetParentPid()
                    , process->GetParentVerifier() );
    }

    struct internal_msg_def msg;
    memset(&msg, 0, sizeof(msg));
    msg.type = InternalType_Process;

    // Fixed attributes.
    msg.u.process.nid                  = process->GetNid();
    msg.u.process.pid                  = process->GetPid();
    msg.u.process.type                 = process->GetType();
    msg.u.process.priority             = process->GetPriority();
    msg.u.process.backup               = process->IsBackup();
    msg.u.process.unhooked             = process->IsUnhooked();
    msg.u.process.tag                  = process;
    msg.u.process.parent_nid           = process->GetParentNid();
    msg.u.process.parent_pid           = process->GetParentPid();
    msg.u.process.parent_verifier      = process->GetParentVerifier();
    msg.u.process.pair_parent_nid      = process->GetPairParentNid();
    msg.u.process.pair_parent_pid      = process->GetPairParentPid();
    msg.u.process.pair_parent_verifier = process->GetPairParentVerifier();
    msg.u.process.pathStrId            = process->pathStrId();
    msg.u.process.ldpathStrId          = process->ldPathStrId();
    msg.u.process.programStrId         = process->programStrId();
    msg.u.process.argc                 = process->argc();

    // Variable-length section: name, infile, outfile, argv in that order.
    const int nameLen    = strlen(process->GetName()) + 1;
    const int infileLen  = strlen(process->infile()) + 1;
    const int outfileLen = strlen(process->outfile()) + 1;
    const int argvLen    = process->userArgvLen();

    char *nameAt    = &msg.u.process.stringData;
    char *infileAt  = nameAt + nameLen;
    char *outfileAt = infileAt + infileLen;
    char *argvAt    = outfileAt + outfileLen;

    msg.u.process.nameLen    = nameLen;
    msg.u.process.infileLen  = infileLen;
    msg.u.process.outfileLen = outfileLen;
    msg.u.process.argvLen    = argvLen;

    memcpy( nameAt,    process->GetName(),  nameLen );
    memcpy( infileAt,  process->infile(),   infileLen );
    memcpy( outfileAt, process->outfile(),  outfileLen );
    memcpy( argvAt,    process->userArgv(), argvLen );

    ptpMsgInfo_t myInfo;
    myInfo.pnid = MyPNID;
    myInfo.size = offsetof(struct internal_msg_def, u);
    myInfo.size += sizeof(msg.u.process);
    myInfo.size += nameLen + infileLen + outfileLen + argvLen;

    if (trace_settings & TRACE_PROCESS_DETAIL)
    {
        trace_printf( "%s@%d - size_=%d, programStrId=(%d,%d), "
                      "pathStrId=(%d,%d), ldPathStrId=(%d,%d), "
                      "name=%s, strlen(name)=%d, "
                      "infile=%s, strlen(infile)=%d, "
                      "outfile=%s, strlen(outfile)=%d, "
                      "argc=%d, strlen(total argv)=%d, args=[%.*s]\n"
                    , method_name, __LINE__
                    , myInfo.size
                    , msg.u.process.programStrId.nid
                    , msg.u.process.programStrId.id
                    , msg.u.process.pathStrId.nid
                    , msg.u.process.pathStrId.id
                    , msg.u.process.ldpathStrId.nid
                    , msg.u.process.ldpathStrId.id
                    , nameAt, nameLen
                    , infileAt, infileLen
                    , outfileAt, outfileLen
                    , msg.u.process.argc
                    , argvLen
                    , argvLen, argvAt );
    }

    const int result = SendToMon( "process-new", &msg, myInfo
                                , targetNid, targetNodeName );
    TRACE_EXIT;
    return result;
}
// Register (canceled == false) or cancel (canceled == true) death-notice
// interest of process (nid, pid, verifier) in targetProcess, under
// transaction transId, on the monitor serving targetNid.
//
// Returns 0 without sending when targetNid is served by this monitor;
// otherwise returns the result of SendToMon().
int CPtpClient::ProcessNotify( int nid
                             , int pid
                             , Verifier_t verifier
                             , _TM_Txid_External transId
                             , bool canceled
                             , CProcess *targetProcess
                             , int targetNid
                             , const char *targetNodeName )
{
    const char method_name[] = "CPtpClient::ProcessNotify";
    TRACE_ENTRY;

    if (IsTargetRemote( targetNid ) == false)
    {
        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
        {
            trace_printf( "%s@%d - Not Sending InternalType_Notify request to "
                          "local nid=%d\n"
                        , method_name, __LINE__, targetNid );
        }
        return(0);
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
    {
        trace_printf( "%s@%d - Sending InternalType_Notify request to %s"
                      ", nid=%d, canceled=%d\n"
                    , method_name, __LINE__, targetNodeName, targetNid, canceled );
    }

    struct internal_msg_def msg;
    memset(&msg, 0, sizeof(msg));
    msg.type = InternalType_Notify;

    // Interested process.
    msg.u.notify.nid      = nid;
    msg.u.notify.pid      = pid;
    msg.u.notify.verifier = verifier;
    msg.u.notify.canceled = canceled;

    // Process being watched.
    msg.u.notify.target_nid      = targetProcess->GetNid();
    msg.u.notify.target_pid      = targetProcess->GetPid();
    msg.u.notify.target_verifier = targetProcess->GetVerifier();
    msg.u.notify.trans_id        = transId;

    if ((trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) && canceled)
    {
        trace_printf( "%s@%d - Process (%d, %d:%d) deleting death "
                      "notice interest for %s (%d, %d:%d), "
                      "trans_id=%lld.%lld.%lld.%lld\n"
                    , method_name, __LINE__
                    , msg.u.notify.nid, msg.u.notify.pid, msg.u.notify.verifier
                    , targetProcess->GetName()
                    , msg.u.notify.target_nid, msg.u.notify.target_pid
                    , msg.u.notify.target_verifier
                    , msg.u.notify.trans_id.txid[0], msg.u.notify.trans_id.txid[1]
                    , msg.u.notify.trans_id.txid[2], msg.u.notify.trans_id.txid[3] );
    }
    else if ((trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) && !canceled)
    {
        trace_printf( "%s@%d - Process (%d, %d:%d) registering interest "
                      "in death of process %s (%d, %d:%d), "
                      "trans_id=%lld.%lld.%lld.%lld\n"
                    , method_name, __LINE__
                    , msg.u.notify.nid, msg.u.notify.pid, msg.u.notify.verifier
                    , targetProcess->GetName()
                    , msg.u.notify.target_nid, msg.u.notify.target_pid
                    , msg.u.notify.target_verifier
                    , msg.u.notify.trans_id.txid[0], msg.u.notify.trans_id.txid[1]
                    , msg.u.notify.trans_id.txid[2], msg.u.notify.trans_id.txid[3] );
    }

    ptpMsgInfo_t myInfo;
    myInfo.pnid = MyPNID;
    myInfo.size = offsetof(struct internal_msg_def, u) + sizeof(msg.u.notify);

    const int result = SendToMon( "process-notify", &msg, myInfo
                                , targetNid, targetNodeName );
    TRACE_EXIT;
    return result;
}
int CPtpClient::ProcessStdInReq( int nid
, int pid
, StdinReqType type
, int supplierNid
, int supplierPid )
{
const char method_name[] = "CPtpClient::ProcessStdInReq";
TRACE_ENTRY;
if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
{
trace_printf( "%s@%d - Sending InternalType_StdinReq request type =%d "
"from (%d,%d), for supplier (%d,%d)\n"
, method_name, __LINE__
, type
, nid
, pid
, supplierNid
, supplierPid );
}
CLNode *lnode = Nodes->GetLNode( supplierNid );
if (lnode == NULL)
{
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
, "[%s], Can't find supplier node nid=%d "
"for stdin data request.\n"
, method_name
, supplierNid );
mon_log_write(PTPCLIENT_STDINREQ_1, SQ_LOG_ERR, buf);
TRACE_EXIT;
return -1;
}
CProcess *process = lnode->GetProcessL( supplierPid );
if (process == NULL)
{
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
, "[%s], Can't find process nid=%d, "
"pid=%d for stdin data request.\n"
, method_name
, supplierNid
, supplierPid );
mon_log_write(PTPCLIENT_STDINREQ_2, SQ_LOG_ERR, buf);
TRACE_EXIT;
return -1;
}
struct internal_msg_def msg;
memset(&msg, 0, sizeof(msg));
msg.type = InternalType_StdinReq;
msg.u.stdin_req.nid = nid;
msg.u.stdin_req.pid = pid;
msg.u.stdin_req.reqType = type;
msg.u.stdin_req.supplier_nid = supplierNid;
msg.u.stdin_req.supplier_pid = supplierPid;
ptpMsgInfo_t myInfo;
myInfo.pnid = MyPNID;
myInfo.size = offsetof(struct internal_msg_def, u);
myInfo.size += sizeof(msg.u.stdin_req);
if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
{
trace_printf( "%s@%d - size_=%d, type =%d "
"from (%d,%d), for supplier (%d,%d)\n"
, method_name, __LINE__
, myInfo.size
, msg.u.stdin_req.reqType
, msg.u.stdin_req.nid
, msg.u.stdin_req.pid
, msg.u.stdin_req.supplier_nid
, msg.u.stdin_req.supplier_pid );
}
int error = SendToMon( "process-stdin"
, &msg
, myInfo
, process->GetNid()
, lnode->GetNode()->GetName());
TRACE_EXIT;
return error;
}
// Forward `count` bytes of stdio data of kind `type` for process (nid, pid)
// to the monitor hosting that process.
//
// Returns -1 (after logging) when the process's logical node or process
// cannot be found, or when `count` is negative or larger than the message's
// fixed iodata.data buffer. Otherwise returns the result of SendToMon().
int CPtpClient::ProcessStdIoData( int nid
                                , int pid
                                , StdIoType type
                                , ssize_t count
                                , char *data )
{
    const char method_name[] = "CPtpClient::ProcessStdIoData";
    TRACE_ENTRY;

    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
    {
        trace_printf( "%s@%d - Sending InternalType_IoData request type =%d "
                      "to (%d,%d), count=%ld\n"
                    , method_name, __LINE__
                    , type
                    , nid
                    , pid
                    , count );
    }

    CLNode *lnode = Nodes->GetLNode( nid );
    if (lnode == NULL)
    {
        char buf[MON_STRING_BUF_SIZE];
        snprintf( buf, sizeof(buf)
                , "[%s], Can't find supplier node nid=%d "
                  "for stdin data request.\n"
                , method_name
                , nid );
        mon_log_write(PTPCLIENT_STDIODATA_1, SQ_LOG_ERR, buf);
        TRACE_EXIT;
        return -1;
    }

    CProcess *process = lnode->GetProcessL( pid );
    if (process == NULL)
    {
        char buf[MON_STRING_BUF_SIZE];
        snprintf( buf, sizeof(buf)
                , "[%s], Can't find process nid=%d, "
                  "pid=%d for stdin data request.\n"
                , method_name
                , nid
                , pid );
        mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf);
        TRACE_EXIT;
        return -1;
    }

    struct internal_msg_def msg;

    // The payload is copied into the fixed-size iodata.data array of a
    // stack-resident message; reject sizes that would overrun it.
    if ( count < 0 || (size_t) count > sizeof(msg.u.iodata.data) )
    {
        char buf[MON_STRING_BUF_SIZE];
        snprintf( buf, sizeof(buf)
                , "[%s], Invalid stdio data count=%ld for process nid=%d, "
                  "pid=%d.\n"
                , method_name
                , (long) count
                , nid
                , pid );
        // NOTE(review): reuses the PTPCLIENT_STDIODATA_2 event id; consider
        // allocating a dedicated one.
        mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf);
        TRACE_EXIT;
        return -1;
    }

    memset(&msg, 0, sizeof(msg));
    msg.type = InternalType_IoData;
    msg.u.iodata.nid = nid ;
    msg.u.iodata.pid = pid ;
    msg.u.iodata.ioType = type ;
    msg.u.iodata.length = count;
    memcpy(&msg.u.iodata.data, data, count);

    ptpMsgInfo_t myInfo;
    myInfo.pnid = MyPNID;
    myInfo.size = offsetof(struct internal_msg_def, u);
    myInfo.size += sizeof(msg.u.iodata);

    if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
    {
        // The payload is raw bytes and is not NUL-terminated when it fills
        // the buffer, so print it with an explicit length.
        trace_printf( "%s@%d - size_=%d, type =%d "
                      "to (%d,%d), count=%d\n(%.*s)"
                    , method_name, __LINE__
                    , myInfo.size
                    , msg.u.iodata.ioType
                    , msg.u.iodata.nid
                    , msg.u.iodata.pid
                    , msg.u.iodata.length
                    , msg.u.iodata.length ? (int) msg.u.iodata.length : 1
                    , msg.u.iodata.length ? msg.u.iodata.data : "\n" );
    }

    int error = SendToMon( "process-stdio-data"
                         , &msg
                         , myInfo
                         , process->GetNid()
                         , lnode->GetNode()->GetName());
    TRACE_EXIT;
    return error;
}
int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg
, ptpMsgInfo_t &myInfo
, int targetNid, const char *hostName)
{
const char method_name[] = "CPtpClient::SendToMon";
TRACE_ENTRY;
char ptpHost[MAX_PROCESSOR_NAME];
char ptpPort[MAX_PROCESSOR_NAME];
int error = 0;
int tempPort = ptpCommPort_;
int pnid = 0;
int sendSock = -1;
int retryCount = 0;
CNode *node = NULL;
CLNode *lnode = NULL;
ptpHost[0] = '\0';
lnode = Nodes->GetLNode( targetNid );
node = lnode->GetNode();
pnid = node->GetPNid();
// For virtual env
if (!IsRealCluster)
{
tempPort += targetNid;
strcat( ptpHost, ptpHost_ );
}
else
{
strcat( ptpHost, hostName );
}
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d - reqType=%s, hostName=%s, targetNid=%d, "
"ptpHost=%s, tempPort=%d, ptpCommPort_=%d\n"
, method_name, __LINE__
, reqType
, hostName
, targetNid
, ptpHost
, tempPort
, ptpCommPort_ );
}
memset( &ptpPort, 0, MAX_PROCESSOR_NAME );
memset( &ptpPortBase_, 0, MAX_PROCESSOR_NAME+100 );
sprintf( ptpPortBase_,"%s:", ptpHost );
sprintf( ptpPort,"%s%d", ptpPortBase_, tempPort );
retryIO:
if (ptpClusterSocks_[pnid] == -1)
{
error = InitializePtpClient( pnid, ptpPort );
if (error < 0)
{
ptpClusterSocks_[pnid] = -1;
TRACE_EXIT;
return error;
}
}
sendSock = ptpClusterSocks_[pnid];
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d - sending %s REQ to Monitor=%s, sock=%d\n"
, method_name, __LINE__
, reqType
, ptpPort
, sendSock );
}
error = SockSend((char *) &myInfo, sizeof(ptpMsgInfo_t), sendSock);
if (error)
{
int err = error;
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
, "[%s], unable to send %s request size %ld to "
"node %s, error: %d(%s)\n"
, method_name, reqType, sizeof(ptpMsgInfo_t), ptpHost, err, strerror(err) );
mon_log_write(PTPCLIENT_SENDTOMON_1, SQ_LOG_ERR, buf);
}
else
{
error = SockSend((char *) msg, myInfo.size, sendSock);
if (error)
{
int err = error;
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
, "[%s], unable to send %s request to "
"node %s, error: %d(%s)\n"
, method_name, reqType, ptpHost, err, strerror(err) );
mon_log_write(PTPCLIENT_SENDTOMON_2, SQ_LOG_ERR, buf);
}
}
if (error)
{
SockClose( pnid );
if ( retryCount < MON2MON_IO_RETRIES )
{
retryCount++;
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d - retrying IO (%d) to node %s\n"
, method_name, __LINE__
, retryCount
, ptpHost );
}
goto retryIO;
}
}
TRACE_EXIT;
return error;
}
// Close the cached socket to physical node `pnid`, if one is open, and mark
// the slot as unconnected (-1).
void CPtpClient::SockClose( int pnid )
{
    const char method_name[] = "CPtpClient::SockClose";
    TRACE_ENTRY;

    int &cachedSock = ptpClusterSocks_[pnid];
    if (cachedSock == -1)
    {
        // Nothing open for this node.
    }
    else
    {
        close( cachedSock );
        cachedSock = -1;
    }

    TRACE_EXIT;
}
// Store this machine's host name in ptpHost_ (used as the peer host in a
// virtual cluster).
//
// gethostname() does not guarantee NUL termination when the name is
// truncated, and its buffer contents are unspecified on failure, so the
// result is always terminated here. Assumes ptpHost_ holds at least
// MAX_PROCESSOR_NAME bytes, as the original call already did.
void CPtpClient::SetLocalHost( void )
{
    if ( gethostname( ptpHost_, MAX_PROCESSOR_NAME ) != 0 )
    {
        ptpHost_[0] = '\0';
    }
    ptpHost_[MAX_PROCESSOR_NAME - 1] = '\0';
}
// Receive exactly `size` bytes from sockFd into buf.
//
// Loops over short reads and retries on EINTR.
// Returns:
// - 0 on success (including size <= 0, where nothing is read);
// - ENODATA if the peer closed the connection first;
// - otherwise the errno value of the failing recv().
int CPtpClient::SockReceive(char *buf, int size, int sockFd)
{
    const char method_name[] = "CPtpClient::SockReceive";
    TRACE_ENTRY;

    int error = 0;
    int received = 0;

    while ( received < size )
    {
        int sizeCount = size - received;
        int readCount = (int) recv( sockFd
                                  , buf + received
                                  , sizeCount
                                  , 0 );
        // Capture errno before tracing can overwrite it.
        int recvErrno = errno;

        if ( readCount > 0 ) Meas.addSockPtpRcvdBytes( readCount );

        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "%s@%d - Count read %d = recv(%d)\n"
                        , method_name, __LINE__
                        , readCount
                        , sizeCount );
        }

        if ( readCount > 0 )
        {   // Got data
            received += readCount;
        }
        else if ( readCount == 0 )
        {   // EOF before the full message arrived
            error = ENODATA;
            break;
        }
        else if ( recvErrno == EINTR )
        {   // Interrupted: try again
            continue;
        }
        else
        {   // Got an error
            error = recvErrno;
            break;
        }
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n"
                    , method_name, __LINE__
                    , received
                    , error, strerror(error) );
    }

    TRACE_EXIT;
    return error;
}
// Send exactly `size` bytes from buf on sockFd.
//
// send() on a stream socket may accept fewer bytes than requested, so the
// loop advances through the buffer and sends only the remaining bytes.
// (Previously a short send re-sent from the start of buf with the full size,
// duplicating data on the wire.) EINTR is retried.
// Returns:
// - 0 on success (including size <= 0, where nothing is sent);
// - otherwise the errno value of the failing send(), or EIO if send()
//   returned 0 for a non-empty request.
int CPtpClient::SockSend(char *buf, int size, int sockFd)
{
    const char method_name[] = "CPtpClient::SockSend";
    TRACE_ENTRY;

    int error = 0;
    int sent = 0;

    while ( sent < size )
    {
        int sendCount = (int) send( sockFd
                                  , buf + sent
                                  , size - sent
                                  , 0 );
        // Capture errno before tracing/metrics can overwrite it.
        int sendErrno = errno;

        if ( sendCount > 0 ) Meas.addSockPtpSentBytes( sendCount );

        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "%s@%d - send(), sendCount=%d\n"
                        , method_name, __LINE__
                        , sendCount );
        }

        if ( sendCount > 0 )
        {   // Sent (part of) the remaining data
            sent += sendCount;
        }
        else if ( sendCount < 0 && sendErrno == EINTR )
        {   // Interrupted before anything was sent: retry
            continue;
        }
        else
        {   // Hard error; a 0-byte result would otherwise loop forever
            error = (sendCount < 0) ? sendErrno : EIO;
            break;
        }
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n"
                    , method_name, __LINE__
                    , sent
                    , error, strerror(error) );
    }

    TRACE_EXIT;
    return error;
}