blob: aa8240e23d28e782a6c9c05c87f2b2be5f2e6432 [file] [log] [blame]
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/time.h>
#include "seabed/ms.h"
#include "seabed/pctl.h"
#include "seabed/pevents.h"
#include "seabed/trace.h"
#include "dtm/tm_util.h"
#include "dtm/xa.h"
#include "rm.h"
#include "tmrm.h"
#include "xatmglob.h"
#include "tmlibmsg.h"
#include "tmmap.h"
#include "tmdeque.h"
#include "tmlogging.h"
#include "tmtimer.h"
#include "tmpool.h"
#include "xatmmsg.h"
#include "tmregistry.h"
#include "xatmlib.h"
#include "xatmapi.h"
// Externals
// Global functions
// -------------------------------------------------------------------
// XIDtotransid
// Purpose - Extract the transaction id from an XID.
// ** NOTE THIS FUNCTION DEPENDS ON XID AND TRANSID FORMATS! **
// This is only done for branches associated with a TSE where the
// XID.data contains the transid. The transid is then used
// to maintain a list of Seabed msgids per transaction to allow
// requests to be cancelled when complete_all completes to avoid
// replies left over from previous interactions confusing completions.
// To ensure we only get TSE specific XIDs, the XID.formatID is checked
// to make sure it is correct (FORMAT_ID = 403).
// Any XIDs of different formats will return 0.
// -------------------------------------------------------------------
int64 XIDtotransid(XID *pp_xid)
{
union u_
{
int64 i;
char c[8];
} lv_transid;
if (pp_xid->formatID == FORMAT_ID)
{
memcpy(&lv_transid.c, pp_xid->data, 8);
return lv_transid.i;
}
else
return 0;
}
// -------------------------------------------------------------------
// XIDtoa
// Purpose - Convert an xid to a character string for tracing.
// Note: Callers to this procedure must be careful to use the returned
// string before a thread switch occurs as it could cause another
// call to XIDtoa and overwrite the string.
// -------------------------------------------------------------------
char * XIDtoa(XID *pp_xid)
{
static char lv_xid[11+11+11+XIDDATASIZE+1];
union u_
{
int32 i;
char c[4];
} lv_seqNum;
memcpy(&lv_seqNum.c, pp_xid->data, 4);
sprintf((char *) &lv_xid, "%d:%d:%d:ID %d",
pp_xid->formatID,
pp_xid->gtrid_length,
pp_xid->bqual_length,
lv_seqNum.i);
return (char *) &lv_xid;
}
// CxaTM_TM Methods
// TM Default Constructor
CxaTM_TM::CxaTM_TM()
{
// Mutex attributes: Recursive = true, ErrorCheck=false
ip_mutex = new TM_Mutex(true, false);
iv_tm_stats = false;
lock();
iv_initialized = false;
my_nid(-1); // Indicates that the node hasn't been set yet.
iv_traceMask = XATM_TraceOff;
iv_RMmsgMax = MAX_NUM_RMMESSAGES;
iv_RMmsgSteadyLow = STEADYSTATE_LOW_RMMESSAGES;
iv_RMmsgSteadyHigh = STEADYSTATE_HIGH_RMMESSAGES;
iv_lastMonError = 0;
iv_RMmsgTotal = 0;
iv_RMmsgPoolThresholdEventCounter = 0;
// Initialize RMMessagePool
ip_RMMessagePool = new CTmPool<CxaTM_RMMessage>(gv_xaTM.tm_stats(), MAX_NUM_RMMESSAGES,
STEADYSTATE_LOW_RMMESSAGES, STEADYSTATE_HIGH_RMMESSAGES);
#ifndef XARM_BUILD_
ip_tmTimer = NULL;
#endif
iv_next_msgNum = 1;
unlock();
}
// TM Destructor
CxaTM_TM::~CxaTM_TM()
{
delete ip_mutex;
delete ip_RMMessagePool;
XATrace(XATM_TraceExit, ("XATM: CxaTM_TM::~CxaTM_TM Exit.\n"));
}
// TM deleteRM
// Remove RM from rmList, cleanup and delete the RM object.
void CxaTM_TM::deleteRM(CxaTM_RM *pp_RM)
{
XATrace(XATM_TraceDetail, ("XATM: CxaTM_TM::deleteRM ENTRY.\n"));
lock();
ia_rmList.remove(pp_RM->getRmid());
delete pp_RM;
unlock();
XATrace(XATM_TraceDetail, ("XATM: CxaTM_TM::deleteRM EXIT.\n"));
}
// TM lock semaphore
void CxaTM_TM::lock()
{
XATrace(XATM_TraceLock, ("XATM: CxaTM_TM::lock, count %d, owner %ld\n",
ip_mutex->lock_count(), ip_mutex->lock_owner()));
int lv_error = ip_mutex->lock();
if (lv_error)
{
XATrace(XATM_TraceError, ("XATM: CxaTM_TM::lock returned error %d.\n", lv_error));
abort();
}
}
// TM new RM
// Lookup the rmList for this rmid. If found, we assume
// this is a reopen and return the RM, otherwise
// instantiate a new CxaTM_RM object and return that.
// Return codes:
// XA_OK Success, RM object returned
// XA_RETRY Success, RM found in rmList (assume reopen)
// XAER_RMERR Failure, rmid already in use
int CxaTM_TM::newRM(int pv_rmid, CxaTM_RM **ppp_RM)
{
int lv_xaError = XA_OK;
XATrace(XATM_TraceExit,("XATM: CxaTM_TM::newRM ENTRY rmid (%d)\n", pv_rmid));
CxaTM_RM * lp_RM = (CxaTM_RM *) ia_rmList.get(pv_rmid);
// if we found the rmid in the rmList then return XA_RETRY to indicate
// we are reusing it. Assume it's a re-open.
if (lp_RM)
{
lv_xaError = XA_RETRY;
*ppp_RM = lp_RM;
}
else
{
// Instantiate and initialize the RM object
*ppp_RM = new CxaTM_RM(pv_rmid);
// Insert RM into the rmList.
lock();
ia_rmList.put(pv_rmid, *ppp_RM);
unlock();
}
XATrace((lv_xaError?XATM_TraceExitError:XATM_TraceExit),
("XATM: CxaTM_TM::newRM EXIT rmid(%d) returning %s.\n",
pv_rmid, XAtoa(lv_xaError)));
return lv_xaError;
} //CxaTM_TM::newRM
// CxaTM_TM::setAndGetNid
// Get the node number
// The first time this is called it will retrieve the value from
// the Monitor.
inline int CxaTM_TM::setAndGetNid()
{
lock();
if (my_nid() == -1)
{
msg_mon_get_process_info(NULL, &iv_my_nid, &iv_my_pid);
}
unlock();
return my_nid();
} //setAndGetNid
// CxaTM_TM::initialize
// Initialize the CxaTM_TM object
//
int CxaTM_TM::initialize(XATM_TraceMask pv_traceMask, bool pv_tm_stats, CTmTimer *pp_tmTimer)
{
char la_value[9];
bool lv_success = false;
int lv_error = 0;
setxaTrace(pv_traceMask);
lock();
gv_xaTM.setxaTrace(pv_traceMask);
iv_tm_stats = pv_tm_stats;
//initialize pool limits
int32 lv_max_num_RMmsgs=0;
int32 lv_ss_low_RMmsgs=0;
int32 lv_ss_high_RMmsgs=0;
if (pp_tmTimer != NULL)
{
ip_tmTimer = pp_tmTimer;
gv_startTime = ip_tmTimer->startTime();
// Configure RMMessagePool
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_MAX_NUM_RMMESSAGES,
la_value);
lv_max_num_RMmsgs = ((lv_error == 0)?atoi(la_value):-1);
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_STEADYSTATE_LOW_RMMESSAGES,
la_value);
lv_ss_low_RMmsgs = ((lv_error == 0)?atoi(la_value):-1);
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_STEADYSTATE_HIGH_RMMESSAGES,
la_value);
lv_ss_high_RMmsgs = ((lv_error == 0)?atoi(la_value):-1);
}
else
{
ip_tmTimer = NULL;
gv_startTime = Ctimeval::now();
lv_max_num_RMmsgs=1024;
lv_ss_low_RMmsgs=1;
lv_ss_high_RMmsgs=1024;
}
lv_success = ip_RMMessagePool->setConfig(gv_xaTM.tm_stats(), lv_max_num_RMmsgs,
lv_ss_low_RMmsgs, lv_ss_high_RMmsgs);
if (lv_success)
{
XATrace (XATM_TraceAPIError, ("RMMessage pool parameters set: "
"Max %d, steady state low %d, steady state high %d.\n",
lv_max_num_RMmsgs, lv_ss_low_RMmsgs, lv_ss_high_RMmsgs));
}
else
{
XATrace (XATM_TraceAPIError, ("Attempt to set RMMessage pool parameters failed: "
"Max %d, steady state low %d, steady state high %d.\n",
lv_max_num_RMmsgs, lv_ss_low_RMmsgs, lv_ss_high_RMmsgs));
}
iv_initialized = true;
unlock();
XATrace((lv_success?XATM_TraceExit:XATM_TraceExitError),
("XATM: CxaTM_TM::initialize EXIT returning %d.\n", lv_error));
return lv_error;
}
// CxaTM_TM::initializePhandle
// Initialize a phandle to zeros.
inline void CxaTM_TM::initalizePhandle(SB_Phandle_Type *pp_phandle)
{
memset(pp_phandle, 0, sizeof (SB_Phandle_Type));
}
// CxaTM_TM::setPhandle
// Set a (destination) phandle to a supplied (source) phandle.
inline void CxaTM_TM::setPhandle(SB_Phandle_Type *pp_destPhandle,
SB_Phandle_Type *pp_sourcePhandle)
{
memcpy(pp_destPhandle, pp_sourcePhandle, sizeof (SB_Phandle_Type));
}
// CxaTM_TM::mapMonErr_To_xaErr
// Map an error code returned by the Monitor into an XA error code.
inline int CxaTM_TM::mapMonErr_To_xaErr(int32 pv_error)
{
int lv_xaerror = XA_OK;
if (pv_error)
{
gv_xaTM.lastMonError(pv_error);
lv_xaerror = XAER_PROTO;
}
// To do: expand error handling
return lv_xaerror;
}
// TM unlock semaphore
void CxaTM_TM::unlock()
{
XATrace(XATM_TraceLock, ("XATM: CxaTM_TM::unlock, count %d, owner %ld\n",
ip_mutex->lock_count(), ip_mutex->lock_owner()));
int lv_error = ip_mutex->unlock();
if (lv_error)
{
XATrace(XATM_TraceError, ("XATM: CxaTM_TM::unlock returned error %d.\n", lv_error));
abort();
}
}
// TM xaTrace
// Returns the value of iv_trace.
bool CxaTM_TM::xaTrace(XATM_TraceMask pv_traceMask)
{
return ((pv_traceMask & iv_traceMask)? true: false);
}
// TM setxaTrace
// Sets the value of iv_traceMask.
// Note that because this is a mask it is concatenated to the mask unless
// set to 0. Ie:
// If pv_traceMask == 0, set iv_traceMask = 0
// If pv_traceMask > 0, iv_traceMask |= pv_traceMask; (bit-wise OR)
void CxaTM_TM::setxaTrace(XATM_TraceMask pv_traceMask)
{
XATM_TraceMask iv_OldMask = iv_traceMask;
lock();
if (pv_traceMask == XATM_TraceOff)
iv_traceMask = XATM_TraceOff;
else
iv_traceMask = (XATM_TraceMask) (iv_traceMask | pv_traceMask);
gv_XATM_traceMask = iv_traceMask;
unlock();
// Don't use XATrace here as we always want to write the trace record.
if (iv_OldMask != XATM_TraceOff && pv_traceMask == XATM_TraceOff)
trace_printf("XATM:Tracing off.\n");
else
if (pv_traceMask != XATM_TraceOff)
trace_printf("XATM: Tracing on, Mask=0x%x.\n", iv_traceMask);
}
// ------------------------------------------------------------------
// RM Message object related helper functions:
// ------------------------------------------------------------------
// new_RMmsg
// Purpose : Allocate a new RM message object.
// This could be taken from the freeList or
// instantiate a new CTM_RMMessage object, depending on the
// number of current RM Message objects and configuration of the
// RMMessagePool.
// Returns a pointer to the instantiated message if successful or
// NULL if it fails to allocate a message object.
// ------------------------------------------------------------------
CxaTM_RMMessage * CxaTM_TM::new_RMmsg()
{
CxaTM_RMMessage * lp_msg = NULL;
bool lv_reused = false;
XATrace(XATM_TraceRMMsg, ("CxaTM_TM::new_RMmsg : ENTRY.\n"));
lock();
int64 lv_next_msgNum = gv_xaTM.next_msgNum();
XATrace(XATM_TraceRMMsg, ("CxaTM_TM::new_RMmsg : Calling CTmPool<CxaTM_RMMessage>::newElement "
"next index is " PFLL ", poolInUseCount=" PFLL ", freeCount=" PFLL ".\n",
lv_next_msgNum, ip_RMMessagePool->get_inUseList()->size(),
ip_RMMessagePool->get_freeList()->size()));
lp_msg = ip_RMMessagePool->newElement(lv_next_msgNum, &lv_reused, true);
//, (void *) &CxaTM_RMMessage::constructPoolElement);
if (lp_msg == NULL)
{
tm_log_event(DTM_XATM_MAX_RM_MSG, SQ_LOG_CRIT,"DTM_XATM_MAX_RM_MSG",
-1, /*error_code*/
-1, /*rmid*/
setAndGetNid(), /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
ip_RMMessagePool->get_maxPoolSize(), /*pool_size*/
ip_RMMessagePool->totalElements() /*pool_elems*/);
XATrace (XATM_TraceError, ("CxaTM_TM::new_RMmsg : RM Message pool max elements hit. "
"RMMessage object not allocated by CTmPool<CxaTM_RMMessage>::newElement. "
"Current pool size=%d, Pool maxSize=%d\n",
ip_RMMessagePool->get_maxPoolSize(), ip_RMMessagePool->totalElements()));
}
else
{
if (!lv_reused)
gv_xaTM.inc_next_msgNum();
XATrace(XATM_TraceRMMsg, ("CxaTM_TM:new_RMmsg : CTmPool<CxaTM_RMMessage>::newElement returned "
"reused %d, RMMessage object %p, msgNum " PFLL ", poolInUseCount=" PFLL ", freeCount=" PFLL ".\n",
lv_reused, (void *) lp_msg, lp_msg->msgNum(), ip_RMMessagePool->get_inUseList()->size(),
ip_RMMessagePool->get_freeList()->size()));
}
unlock();
XATrace(XATM_TraceRMMsg, ("CxaTM_TM::new_RMmsg : EXIT returning %p.\n", lp_msg));
return lp_msg;
} //new_RMmsg
// ------------------------------------------------------------------
// release_RMmsg
// Purpose : Release an RMmsg object.
// Return to the RMMessagePool.
// NOTE: Best practice is for the caller should remove the RM message
// from the RMs RM message list prior to calling release_RMmsg.
// ------------------------------------------------------------------
void CxaTM_TM::release_RMmsg(CxaTM_RMMessage *pp_msg)
{
XATrace(XATM_TraceRMMsg, ("CxaTM_TM::release_RMmsg : ENTRY msg object %p"
", rmid %d, msgNum " PFLL ", msgid %d.\n",
(void *) pp_msg, pp_msg->RM()->getRmid(),
pp_msg->msgNum(), pp_msg->msgid()));
lock();
// Check the RM objects message list and remove this message if it's still there
pp_msg->RM()->msgList()->remove(pp_msg->msgid());
XATrace(XATM_TraceRMMsg, ("CxaTM_TM::releaseRMmsg : Calling CTmPool<CxaTM_RMMessage>::deleteElement "
"index " PFLL ", poolInUseCount=" PFLL ", freeCount=" PFLL ".\n", pp_msg->msgNum(),
ip_RMMessagePool->get_inUseList()->size(),
ip_RMMessagePool->get_freeList()->size()));
ip_RMMessagePool->deleteElement(pp_msg->msgNum());
unlock();
XATrace(XATM_TraceRMMsg, ("CxaTM_TM::release_RMmsg : EXIT, poolInUseCount=" PFLL ", freeCount=" PFLL " .\n",
ip_RMMessagePool->get_inUseList()->size(),
ip_RMMessagePool->get_freeList()->size()));
} //CxaTM_TM::release_RMmsg
// ------------------------------------------------------------------
// CxaTM_TM::insert_txnMsgList
// Purpose : Insert the message into the transaction msgList
// Note: this method assumes the XID contains a transid
// ------------------------------------------------------------------
void CxaTM_TM::insert_txnMsgList(XID *pp_xid, CxaTM_RMMessage * pp_msg)
{
XATrace(XATM_TraceExit, ("CxaTM_TM::insert_txnMsgList : Entry. xid=%s, msgNum=" PFLL ", msgid=%d\n",
XIDtoa(pp_xid), pp_msg->msgNum(), pp_msg->msgid()));
lock();
pp_msg->xid(pp_xid);
gv_xaTM.txnMsgList()->put(XIDtotransid(pp_xid), pp_msg);
unlock();
} //CxaTM_TM::insert_txnMsgList
// ------------------------------------------------------------------
// CxaTM_TM::cancelAll
// Purpose : cancel all outstanding requests associated with the
// transaction pv_transid supplied and remove from the transaction & RM msgLists.
// pv_count is the number of messages outstanding after complete_all.
// pv_numMsgs is the number of messages sent.
// ------------------------------------------------------------------
void CxaTM_TM::cancelAll(int32 pv_count, int32 pv_numMsgs, int64 pv_transid)
{
int32 lv_index = 0;
bool lv_locked = false;
XATrace(XATM_TraceExit, ("CxaTM_TM::cancelAll : ENTRY msgs to be cancelled %d, outstanding msgs %d, "
"transid " PFLLX "\n", pv_count, pv_numMsgs, pv_transid));
if (pv_count > 0)
{
lock();
lv_locked = true;
CxaTM_RMMessage *lp_msg = (CxaTM_RMMessage *) ia_txnMsgList.get_first(pv_transid);
while (lv_index++ < pv_count && lp_msg != NULL)
{
// We check the transid on the RMMessage object to make sure it still applies to
// this transaction and hasn't been released and reused.
// Also need to make sure the message is still valid and in the list before we abandon!
int64 lv_transid = XIDtotransid(lp_msg->xid());
if (pv_transid == lv_transid && lp_msg->msgid() && lp_msg->RM()
&& lp_msg->RM()->msgList()->get(lp_msg->msgid()))
{
#ifndef XARM_BUILD_
if (lp_msg->EIDState() != EID_STATE_INUSE)
{
tm_log_event(DTM_XATM_MSG_INTEGRITY_FAILED1, SQ_LOG_CRIT, "DTM_XATM_MSG_INTEGRITY_FAILED1",
-1, lp_msg->RM()->getRmid(), -1, -1, lp_msg->msgid(),
-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, NULL, gv_xaTM.setAndGetNid());
XATrace(XATM_TraceError, ("CxaTM_TM::cancelAll: ERROR Msg object EID not in use!\n"));
abort();
}
#endif
XATrace(XATM_TraceDetail, ("CxaTM_TM::cancelAll : ID transid " PFLLX ": cancelling "
"message: msgNum " PFLL ", msgid %d\n", pv_transid, lp_msg->msgNum(), lp_msg->msgid()));
lp_msg->abandon();
//lp_msg->cancel(true /*releaseMsg*/);
}
lp_msg = (CxaTM_RMMessage *) ia_txnMsgList.get_next(pv_transid);
}
ia_txnMsgList.get_end();
}
//Remove all entries for this seqNum.
int lv_countRemoved = ia_txnMsgList.remove_all(pv_transid);
if (lv_locked)
unlock();
XATrace(XATM_TraceExit, ("CxaTM_TM::cancelAll : EXIT transid " PFLLX ", removed %d "
"elements from txnMsgList.\n", pv_transid, lv_countRemoved));
} //CxaTM_TM::cancelAll
// ------------------------------------------------------------------
// CxaTM_TM::check_msgIntegrity
// Purpose : Lookup pp_msg in the txnMsgList. This is used to
// match replies to requests to check the transaction thread is
// receiving the right messages.
// NOTE: This routine asserts that the sending thread also completes
// requests to subordinate RM.
// ------------------------------------------------------------------
bool CxaTM_TM::check_msgIntegrity(int64 pv_transid, CxaTM_RMMessage * pp_msg)
{
bool lv_found = false;
long int lv_my_threadId = SB_Thread::Sthr::self_id();
XATrace(XATM_TraceExit, ("CxaTM_TM::check_msgIntegrity : ENTRY transid " PFLLX ", msgid %d\n",
pv_transid, pp_msg->msgid()));
// Ignore if the transid wasn't supplied
if (pv_transid == 0)
return true;
lock();
CxaTM_RMMessage *lp_txnMsgList_msg = (CxaTM_RMMessage *) ia_txnMsgList.get_first(pv_transid);
while (!lv_found && lp_txnMsgList_msg != NULL)
{
if (pp_msg == lp_txnMsgList_msg)
{
lv_found = true;
// Double check thread
if (lp_txnMsgList_msg->threadId() != lv_my_threadId)
{
tm_log_event(DTM_XATM_MSG_INTEGRITY_FAILED2, SQ_LOG_CRIT, "DTM_XATM_MSG_INTEGRITY_FAILED2",
-1, pp_msg->RM()->getRmid(), -1, -1, pp_msg->msgid(),
-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, NULL, gv_xaTM.setAndGetNid());
XATrace(XATM_TraceError, ("CxaTM_TM::check_msgIntegrity: ERROR "
"completion thread %ld does not match sender %ld! ID "
"transid " PFLLX ", msgid %d\n",
lv_my_threadId, lp_txnMsgList_msg->threadId(),
pv_transid, pp_msg->msgid()));
abort();
}
}
else
lp_txnMsgList_msg = (CxaTM_RMMessage *) ia_txnMsgList.get_next(pv_transid);
}
ia_txnMsgList.get_end();
unlock();
if (!lv_found)
{
tm_log_event(DTM_XATM_MSG_INTEGRITY_FAILED3, SQ_LOG_CRIT, "DTM_XATM_MSG_INTEGRITY_FAILED3",
-1, pp_msg->RM()->getRmid(), -1, -1, pp_msg->msgid(),
-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, NULL, gv_xaTM.setAndGetNid());
XATrace(XATM_TraceError, ("CxaTM_TM::check_msgIntegrity: ERROR "
"message returned by Seabed not found in txnMsgList! ID "
"transid " PFLLX ", msgid %d\n",
pv_transid, pp_msg->msgid()));
abort();
}
XATrace(XATM_TraceExit, ("CxaTM_TM::check_msgIntegrity : EXIT transid " PFLLX ", msgid %d "
"returning %d.\n", pv_transid, pp_msg->msgid(), lv_found));
return lv_found;
} //CxaTM_TM::check_msgIntegrity
//------------------------------
// CxaTM_RM Methods
//------------------------------
// Utility methods
// RM Default Constructor
CxaTM_RM::CxaTM_RM()
{
// Mutex attributes: Recursive = true, ErrorCheck=false
ip_mutex = new TM_Mutex(true, false);
lock();
iv_rmid = -1;
// Set the flag to say we haven't completed an xa_recover call
iv_recoverEnd = false;
iv_maxSendRetries = MAX_TSE_SEND_RETRIES;
memset(iv_xarmName, 0, RMNAMESZ);
memset(ia_xarmInfo, 0, MAXINFOSIZE);
memset(&iv_phandle, 0, sizeof(SB_Phandle_Type));
memset(iv_tmName, 0, MAXPROCESSNAME);
xarmFlags(TMNOFLAGS);
iv_totalRetries = 0;
unlock();
}
// RM Constructor
// This should always be used when instantiating an RM object.
CxaTM_RM::CxaTM_RM(int pv_rmid)
{
// Mutex attributes: Recursive = true, ErrorCheck=false
ip_mutex = new TM_Mutex(true, false);
lock();
iv_rmid = pv_rmid;
// Set the flag to say we haven't completed an xa_recover call
iv_recoverEnd = false;
iv_maxSendRetries = MAX_TSE_SEND_RETRIES;
memset(iv_xarmName, 0, RMNAMESZ);
memset(ia_xarmInfo, 0, MAXINFOSIZE);
memset(iv_tmName, 0, MAXPROCESSNAME);
memset(&iv_phandle, 0, sizeof(SB_Phandle_Type));
xarmFlags(TMNOFLAGS);
iv_totalRetries = 0;
unlock();
}
// CxaTM_RM Destructor
CxaTM_RM::~CxaTM_RM()
{
lock();
// Close the RM process now.
msg_mon_close_process(getRmPhandle());
// Cleanup any messages in the RMs Outstanding Messages list.
cleanup_msgList();
unlock();
delete ip_mutex;
XATrace(XATM_TraceExit, ("XATM: CxaTM_RM::~CxaTM_RM EXIT.\n"));
}
//------------------------------------------------------------------------
// cleanup_msgList
// Purpose : Release all messages on the RMs outstanding message list.
//------------------------------------------------------------------------
void CxaTM_RM::cleanup_msgList()
{
CxaTM_RMMessage *lp_cur_msg;
CxaTM_RMMessage *lp_next_msg;
XATrace(XATM_TraceExit, ("XATM: CxaTM_RM::cleanup_msgList ENTRY.\n"));
lock();
lp_cur_msg = (CxaTM_RMMessage *) iv_msgList.get_first();
while (lp_cur_msg != NULL)
{
lp_next_msg = (CxaTM_RMMessage *) iv_msgList.get_next();
gv_xaTM.release_RMmsg(lp_cur_msg);
lp_cur_msg = lp_next_msg;
}
iv_msgList.get_end();
unlock();
XATrace(XATM_TraceExit, ("XATM: CxaTM_RM::cleanup_msgList EXIT.\n"));
}
int CxaTM_RM::getRmid()
{
return iv_rmid;
}
//------------------------------------------------------------------------
// inc_totalRetries
// Purpose : Increment the totalRetries.
// Returns true if totalRetries is a multiple of TM_RM_DOWN_LOGEVENT_INTERVAL.
//------------------------------------------------------------------------
bool CxaTM_RM::inc_totalRetries()
{
bool lv_rtn = false;
lock();
iv_totalRetries++;
if (iv_totalRetries % TM_RM_DOWN_LOGEVENT_INTERVAL == 0)
lv_rtn = true;
unlock();
return lv_rtn;
}
// CxaTM_RM::lock
// Lock the RM semaphore
// Now using recursive semaphores
void CxaTM_RM::lock()
{
XATrace(XATM_TraceLock, ("XATM: CxaTM_RM::lock rmid %d: count %d, owner %ld acquirer %ld\n",
iv_rmid, ip_mutex->lock_count(), ip_mutex->lock_owner(), SB_Thread::Sthr::self_id()));
int lv_error = ip_mutex->lock();
if (lv_error)
{
XATrace(XATM_TraceError, ("XATM: CxaTM_RM::lock returned error %d.\n", lv_error));
abort();
}
else
{
XATrace(XATM_TraceLock, ("XATM: CxaTM_RM::lock rmid %d: acquired.\n", iv_rmid));
}
}
// --------------------------------------------------------------
// CxaTM_RM::send_rm
// Purpose - send a message to the RM
// Caller must lock CxaTM_RM object.
// --------------------------------------------------------------
int CxaTM_RM::send_rm(CxaTM_RMMessage * pp_msg)
{
XATrace(XATM_TraceExit,("XATM: CxaTM_RM::send_rm ENTRY.\n"));
lock();
int lv_error = pp_msg->send_rm();
if (!lv_error)
{
// Add message to RMs outstanding RM message list.
iv_msgList.put(pp_msg->msgid(), pp_msg);
XATrace(XATM_TraceDetail, ("XATM: CxaTM_RM::send_rm added message(%d) to "
"RM outstanding message list.\n", pp_msg->msgid()));
}
unlock();
return lv_error;
} //CxaTM_RM::send_rm
// --------------------------------------------------------------------
// CxaTM_RM::getRmPhandle
// Get the phandle of RM.
// --------------------------------------------------------------------
SB_Phandle_Type * CxaTM_RM::getRmPhandle()
{
return (SB_Phandle_Type *) &iv_phandle;
}
// --------------------------------------------------------------------
// CxaTM_RM::recoverSend
// Send an xa_recover message to the RM
// --------------------------------------------------------------------
int CxaTM_RM::recoverSend(int64 pv_count, int64 pv_flags, int pv_node, bool pv_dead_node, int pv_index)
{
int lv_xaError = XA_OK;
XATrace(XATM_TraceExit,("XATM: CxaTM_RM::recoverSend ENTRY.\n"));
lock();
CxaTM_RMMessage * lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_RECOVER,
RM_MsgSize(lp_msg->Req()->u.iv_recover),
RM_MsgSize(lp_msg->Rsp()->u.iv_recover));
lp_msg->Req()->u.iv_recover.iv_rmid = getRmid();
lp_msg->Req()->u.iv_recover.iv_flags = pv_flags;
lp_msg->Req()->u.iv_recover.iv_count = MIN(pv_count, MAX_RECOVER_XIDS);
lp_msg->Req()->u.iv_recover.iv_recovery_index = pv_index;
if (pv_node != -1)
{
lp_msg->Req()->u.iv_recover.iv_dtm_death = pv_dead_node;
lp_msg->Req()->u.iv_recover.iv_dtm_node = pv_node;
}
else
{
lp_msg->Req()->u.iv_recover.iv_dtm_death = false;
lp_msg->Req()->u.iv_recover.iv_dtm_node = -1;
}
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lv_xaError && lp_msg)
gv_xaTM.release_RMmsg(lp_msg);
unlock();
XATrace((lv_xaError?XATM_TraceExitError:XATM_TraceExit),
("XATM: CxaTM_RM::recoverSend EXIT returning %s.\n",
XAtoa(lv_xaError)));
return lv_xaError;
} //recoverSend
// --------------------------------------------------------------------
// CxaTM_RM::setRmPhandle
// Set the phandle of RM specified by pp_name.
// --------------------------------------------------------------------
int CxaTM_RM::setRmPhandle(char *pp_name)
{
SB_Phandle_Type la_rm_phandle;
int32 lv_error = 0;
int32 lv_oid;
int lv_xaError = XA_OK;
CxaTM_TM::initalizePhandle((SB_Phandle_Type *) &la_rm_phandle);
lv_error = msg_mon_open_process(pp_name,
&la_rm_phandle,
&lv_oid);
if (lv_error)
lv_xaError = CxaTM_TM::mapMonErr_To_xaErr(lv_error);
if (lv_xaError == XA_OK)
{
lock();
CxaTM_TM::setPhandle((SB_Phandle_Type *) &iv_phandle, (SB_Phandle_Type *) &la_rm_phandle);
unlock();
}
else
XATrace(XATM_TraceError,
("XATM: CxaTM_RM::setRmPhandle returning %s. msg_mon_open_process returned error %d.\n",
XAtoa(lv_xaError),
lv_error));
return lv_xaError;
} //CxaTM_RM::setRmPhandle
// CxaTM_RM::unlock
// Unlock the RM semaphore
void CxaTM_RM::unlock()
{
XATrace(XATM_TraceLock, ("XATM: CxaTM_RM::unlock rmid %d: count %d, owner %ld\n",
iv_rmid, ip_mutex->lock_count(), ip_mutex->lock_owner()));
int lv_error = ip_mutex->unlock();
if (lv_error)
{
XATrace(XATM_TraceError, ("XATM: CxaTM_RM::unlock returned error %d.\n", lv_error));
abort();
}
}
//-------------------------------------------------------------------
// CxaTM_RM::validateRMname
// Purpose : Extract and validate the RM name from xa_info passed
// in a standard XARM xa_open call.
// Checks the pp_info is formatted correctly and
// extracts the rm name which is returned in
// pp_rmName.
// Format must be "RM=<name>, FLAGS=<flags>"
//-------------------------------------------------------------------
short CxaTM_RM::validateRMname(const char * pp_info, char * pp_rmName)
{
char *tmp_p = (char *) pp_info;
char *eq_char = NULL;
char var[MAXVARNAME];
char value[RMNAMESZ];
int64 my_flags = -1;
char la_buf[DTM_STRING_BUF_SIZE];
while (tmp_p != NULL)
{
//make sure it is of the form a=b
if ((eq_char = strchr(tmp_p,'=')) == NULL)
{
XATrace(XATM_TraceError, ("XATM: CxaTM_RM::validateRMname: malformed "
"openinfo string %s. Returning XAER_INVAL",tmp_p));
return XAER_INVAL;
}
int len = eq_char - tmp_p;
strncpy(var,tmp_p,MAXVARNAME);
var[len] = '\0';
strncpy(value, eq_char + 1, RMNAMESZ - 1);
if (!strcmp(var,"RM"))
{
/*
* Save the RM name. Make sure that it is less than
* 31 characters in length and should contain only
* valid characters: [a-zA-Z0-9$_-^@&]+. Last character
* must be null.
*/
strncpy(pp_rmName,value,RMNAMESZ);
XATrace(XATM_TraceDetail, ("XATM: CxaTM_RM::validateRMname: xa_open: RM = %s\n",
pp_rmName));
}
else if (!strcmp(var,"FLAGS"))
{
// No flags supported at this stage
my_flags = atoi(value);
/* Not sure what values we will permit at this stage.
if (my_flags & ~(ENABLE_AUDIT|ENABLE_STATS|ENABLE_ASYNC_IMPORT))
{
XATrace(XATM_TraceError, ("XATM: CxaTM_RM::validateRMname: xa_open: "
"Unknown flags (0x%08X) specified.\n",
my_flags & ~(ENABLE_AUDIT|ENABLE_STATS|ENABLE_ASYNC_IMPORT)));
return XAER_INVAL;
} */
XATrace(XATM_TraceDetail, ("XATM: CxaTM_RM::validateRMname: xa_open: FLAGS = 0x" PFLLX "\n",
my_flags));
}
else
{
sprintf(la_buf, "XATM: xa_open: malformed openinfo string %s.\n", tmp_p);
tm_log_write(DTM_XATM_XA_OPEN_FAILED, SQ_LOG_WARNING, la_buf);
XATrace (XATM_TraceError, ("XATM: CxaTM_RM::validateRMname: %s", la_buf));
return XAER_INVAL;
}
tmp_p = strtok(NULL,":");
} // while
xarmName(pp_rmName);
xarmFlags(my_flags);
return XA_OK;
} // CxaTM_RM::validateRMname
// xa Implementation methods
//-------------------------------------------------------------------
// CxaTM_RM::close
// Send the xa_close message and close the RM.
//-------------------------------------------------------------------
int CxaTM_RM::close(char *info, int64 pv_flags)
{
int lv_xaError = XA_OK;
lock();
CxaTM_RMMessage * lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_CLOSE,
RM_MsgSize(lp_msg->Req()->u.iv_close),
RM_MsgSize(lp_msg->Rsp()->u.iv_close));
lp_msg->Req()->u.iv_close.iv_rmid = getRmid();
sprintf(lp_msg->Req()->u.iv_close.iv_info, info);
lp_msg->Req()->u.iv_close.iv_flags = pv_flags;
lp_msg->Req()->u.iv_start.iv_nid = gv_xaTM.my_nid();
lp_msg->Req()->u.iv_start.iv_pid = gv_xaTM.my_pid();
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lv_xaError && lp_msg)
gv_xaTM.release_RMmsg(lp_msg);
unlock();
if (lp_msg && (pv_flags & TMASYNC) && (lv_xaError == XA_OK))
return lp_msg->msgid();
else
return lv_xaError;
} // CxaTM_RM::close
//-------------------------------------------------------------------
// CxaTM_RM::commit
// Send xa_commit message to RM.
//-------------------------------------------------------------------
int CxaTM_RM::commit(XID *pp_xid, int64 pv_flags)
{
int lv_xaError = XA_OK;
lock();
CxaTM_RMMessage * lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_COMMIT,
RM_MsgSize(lp_msg->Req()->u.iv_commit),
RM_MsgSize(lp_msg->Rsp()->u.iv_commit));
lp_msg->Req()->u.iv_commit.iv_xid = *pp_xid;
lp_msg->Req()->u.iv_commit.iv_rmid = getRmid();
lp_msg->Req()->u.iv_commit.iv_flags = pv_flags;
lp_msg->Req()->u.iv_start.iv_nid = gv_xaTM.my_nid();
lp_msg->Req()->u.iv_start.iv_pid = gv_xaTM.my_pid();
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lp_msg)
{
if (lv_xaError)
gv_xaTM.release_RMmsg(lp_msg);
else
gv_xaTM.insert_txnMsgList(pp_xid, lp_msg);
}
unlock();
if (lp_msg && (pv_flags & TMASYNC) && (lv_xaError == XA_OK))
return lp_msg->msgid();
else
return lv_xaError;
} //CxaTM_RM::commit
//-------------------------------------------------------------------
// CxaTM_RM::end
// Send an xa_end message to RM.
//-------------------------------------------------------------------
int CxaTM_RM::end(XID *pp_xid, int64 pv_flags)
{
int lv_xaError = XA_OK;
lock();
CxaTM_RMMessage * lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_END,
RM_MsgSize(lp_msg->Req()->u.iv_end),
RM_MsgSize(lp_msg->Rsp()->u.iv_end));
lp_msg->Req()->u.iv_end.iv_xid = *pp_xid;
lp_msg->Req()->u.iv_end.iv_rmid = getRmid();
lp_msg->Req()->u.iv_end.iv_flags = pv_flags;
lp_msg->Req()->u.iv_start.iv_nid = gv_xaTM.my_nid();
lp_msg->Req()->u.iv_start.iv_pid = gv_xaTM.my_pid();
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lp_msg)
{
if (lv_xaError)
gv_xaTM.release_RMmsg(lp_msg);
else
gv_xaTM.insert_txnMsgList(pp_xid, lp_msg);
}
unlock();
if (lp_msg && (pv_flags & TMASYNC) && (lv_xaError == XA_OK))
return lp_msg->msgid();
else
return lv_xaError;
} // CxaTM_RM::end
//-------------------------------------------------------------------
// CxaTM_RM::forget
// Send an xa_forget message to RM.
//-------------------------------------------------------------------
int CxaTM_RM::forget(XID *pp_xid, int64 pv_flags)
{
int lv_xaError = XA_OK;
lock();
CxaTM_RMMessage * lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_FORGET,
RM_MsgSize(lp_msg->Req()->u.iv_forget),
RM_MsgSize(lp_msg->Rsp()->u.iv_forget));
lp_msg->Req()->u.iv_forget.iv_xid = *pp_xid;
lp_msg->Req()->u.iv_forget.iv_rmid = getRmid();
lp_msg->Req()->u.iv_forget.iv_flags = pv_flags;
lp_msg->Req()->u.iv_start.iv_nid = gv_xaTM.my_nid();
lp_msg->Req()->u.iv_start.iv_pid = gv_xaTM.my_pid();
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lp_msg)
{
if (lv_xaError)
gv_xaTM.release_RMmsg(lp_msg);
else
gv_xaTM.insert_txnMsgList(pp_xid, lp_msg);
}
unlock();
if (lp_msg && (pv_flags & TMASYNC) && (lv_xaError == XA_OK))
return lp_msg->msgid();
else
return lv_xaError;
} // CxaTM_RM::forget
//-------------------------------------------------------------------
// CxaTM_RM::open
// Open the RM.
// Send xa_open message to RM.
//-------------------------------------------------------------------
int CxaTM_RM::open(char *pp_info, int64 pv_flags)
{
int lv_xaError = XA_OK;
xarmFlags(pv_flags);
char lv_xarmName[RMNAMESZ];
RM_Open_struct *lp_info = NULL;
CxaTM_RMMessage *lp_msg = NULL;
lock();
if (tm_XARM_generic_library())
{
// for standard XARM, this will contain the RM name in the form:
// "RM=<rm name>" We want to extract the <rm name> and save it.
lv_xaError = validateRMname(pp_info, (char *) &lv_xarmName);
xarmInfo(pp_info);
// For now we always go to the TM in our node.
sprintf (iv_tmName, "$tm%d", gv_xaTM.setAndGetNid());
lv_xaError = setRmPhandle(iv_tmName);
XATrace(XATM_TraceExit,("XATM: CxaTM_RM::open XARM xa_open ENTRY. "
"RM type %s, rmid %d (%s), associated TM %s, XA error %d.\n",
xarmName(), getRmid(), xarmInfo(), iv_tmName, lv_xaError));
char lv_generic_name[MAXPROCESSNAME + RMNAMESZ + MAXINFOSIZE + 10];
sprintf((char *) &lv_generic_name, "TM_%s:RM_%s:%s", iv_tmName, xarmName(), xarmInfo());
tm_log_event(DTM_XATM_OPEN_GENERIC, SQ_LOG_INFO, "DTM_XATM_OPEN_GENERIC",
-1,getRmid(),gv_xaTM.setAndGetNid(),-1,-1,lv_xaError,-1,-1,-1,-1,
-1,-1,-1,-1,-1,-1,(char *) &lv_generic_name);
}
else
{
lp_info = (RM_Open_struct *) pp_info;
lv_xaError = setRmPhandle((char *) &lp_info->process_name);
xarmName(tm_switch->name); // For DTM - T SE XA interface we use the generic RM name for now TODO
xarmInfo(pp_info);
XATrace(XATM_TraceError,("XATM: CxaTM_RM::open TSE ENTRY nid %d, rmid %d (%s) XA error %d.\n",
gv_xaTM.setAndGetNid(), getRmid(), getRMnameDetail(), lv_xaError));
tm_log_event(DTM_XATM_OPEN_TSE, SQ_LOG_INFO, "DTM_XATM_OPEN_TSE",
-1,getRmid(),gv_xaTM.setAndGetNid(),-1,-1,lv_xaError,-1,-1,-1,-1,
-1,-1,-1,-1,-1,-1,getRMnameDetail());
}
if (lv_xaError == XA_OK)
{
// Send xa_open message
lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_OPEN,
RM_MsgSize(lp_msg->Req()->u.iv_open),
RM_MsgSize(lp_msg->Rsp()->u.iv_open));
lp_msg->Req()->u.iv_open.iv_tm_nid = gv_xaTM.setAndGetNid();
lp_msg->Req()->u.iv_open.iv_incarnation_num = (lp_info)?lp_info->incarnation_num:0;
lp_msg->Req()->u.iv_open.iv_seq_num_minimum = (lp_info)?lp_info->seq_num_block_start:0;
lp_msg->Req()->u.iv_open.iv_rmid = getRmid();
strcpy(lp_msg->Req()->u.iv_open.iv_info, pp_info);
lp_msg->Req()->u.iv_start.iv_nid = gv_xaTM.my_nid();
lp_msg->Req()->u.iv_start.iv_pid = gv_xaTM.my_pid();
lp_msg->Rsp()->u.iv_open.iv_ax_reg = 1;
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lv_xaError && lp_msg)
gv_xaTM.release_RMmsg(lp_msg);
}
unlock();
XATrace((lv_xaError?XATM_TraceExitError:XATM_TraceExit),
("XATM: CxaTM_RM::open EXIT returning %s, nid(%d), rmid(%d).\n",
XAtoa(lv_xaError), gv_xaTM.setAndGetNid(),
getRmid()));
if (lp_msg && (pv_flags & TMASYNC) && (lv_xaError == XA_OK))
return lp_msg->msgid();
else
return lv_xaError;
} //CxaTM_RM::open
//-------------------------------------------------------------------
// CxaTM_RM::prepare
// Send xa_prepare message to RM.
//-------------------------------------------------------------------
int CxaTM_RM::prepare(XID *pp_xid, int64 pv_flags)
{
int lv_xaError = XA_OK;
lock();
CxaTM_RMMessage * lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_PREPARE,
RM_MsgSize(lp_msg->Req()->u.iv_prepare),
RM_MsgSize(lp_msg->Rsp()->u.iv_prepare));
lp_msg->Req()->u.iv_prepare.iv_xid = *pp_xid;
lp_msg->Req()->u.iv_prepare.iv_rmid = getRmid();
lp_msg->Req()->u.iv_prepare.iv_flags = pv_flags;
lp_msg->Req()->u.iv_start.iv_nid = gv_xaTM.my_nid();
lp_msg->Req()->u.iv_start.iv_pid = gv_xaTM.my_pid();
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lp_msg)
{
if (lv_xaError)
gv_xaTM.release_RMmsg(lp_msg);
else
gv_xaTM.insert_txnMsgList(pp_xid, lp_msg);
}
unlock();
if (lp_msg && (pv_flags & TMASYNC) && (lv_xaError == XA_OK))
return lp_msg->msgid();
else
return lv_xaError;
} //CxaTM_RM::prepare
//-------------------------------------------------------------------
// CxaTM_RM::start
// Send xa_start message to RM.
//-------------------------------------------------------------------
int CxaTM_RM::start(XID *pp_xid, int64 pv_flags)
{
int lv_xaError = XA_OK;
lock();
CxaTM_RMMessage * lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_START,
RM_MsgSize(lp_msg->Req()->u.iv_start),
RM_MsgSize(lp_msg->Rsp()->u.iv_start));
lp_msg->Req()->u.iv_start.iv_xid = *pp_xid;
lp_msg->Req()->u.iv_start.iv_rmid = getRmid();
lp_msg->Req()->u.iv_start.iv_flags = pv_flags;
lp_msg->Req()->u.iv_start.iv_nid = gv_xaTM.my_nid();
lp_msg->Req()->u.iv_start.iv_pid = gv_xaTM.my_pid();
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lp_msg)
{
if (lv_xaError)
gv_xaTM.release_RMmsg(lp_msg);
else
gv_xaTM.insert_txnMsgList(pp_xid, lp_msg);
}
unlock();
if (lp_msg && (pv_flags & TMASYNC) && (lv_xaError == XA_OK))
return lp_msg->msgid();
else
return lv_xaError;
} //CxaTM_RM::start
//-------------------------------------------------------------------
// CxaTM_RM::rollback
// Send xa_rollback message to RM.
//-------------------------------------------------------------------
int CxaTM_RM::rollback(XID *pp_xid, int64 pv_flags)
{
int lv_xaError = XA_OK;
lock();
CxaTM_RMMessage * lp_msg = gv_xaTM.new_RMmsg();
if (lp_msg == NULL)
{
lv_xaError = XAER_RMFAIL;
}
else
{
lp_msg->initialize(this, TM_DP2_SQ_XA_ROLLBACK,
RM_MsgSize(lp_msg->Req()->u.iv_rollback),
RM_MsgSize(lp_msg->Rsp()->u.iv_rollback));
lp_msg->Req()->u.iv_rollback.iv_xid = *pp_xid;
lp_msg->Req()->u.iv_rollback.iv_rmid = getRmid();
lp_msg->Req()->u.iv_rollback.iv_flags = pv_flags;
lp_msg->Req()->u.iv_start.iv_nid = gv_xaTM.my_nid();
lp_msg->Req()->u.iv_start.iv_pid = gv_xaTM.my_pid();
lv_xaError = send_rm(lp_msg);
}
// release the message object back to the pool if there was an error
if (lp_msg)
{
if (lv_xaError)
gv_xaTM.release_RMmsg(lp_msg);
else
gv_xaTM.insert_txnMsgList(pp_xid, lp_msg);
}
unlock();
if (lp_msg && (pv_flags & TMASYNC) && (lv_xaError == XA_OK))
return lp_msg->msgid();
else
return lv_xaError;
} //CxaTM_RM::rollback