// blob: e67ba6eb996c0b14d9527f5c6e7693f389e9b040 [file] [log] [blame]
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// tmtxbase is the base class for transaction objects. This includes
// both transactions originating in and owned by DTM and XARM
// subordinate transaction branches.
// TM_TX_Info and CTmXaTxn are both derived from this class.
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
#include "seabed/ms.h"
#include "rm.h"
#include "xatmmsg.h"
#include "xatmlib.h"
#include "seabed/trace.h"
#include "tmlogging.h"
#include "tmtxbase.h"
#include "tminfo.h"
// -------------------------------------
// CTmTxBase Methods
// -------------------------------------
// -------------------------------------
// CTmTxBase constructor
// Purpose : calls initialize
// -------------------------------------
CTmTxBase::CTmTxBase(int32 pv_nid, int64 pv_flags, int32 pv_trace_level,
                     int32 pv_seq, int32 pv_pid, int32 pv_rm_wait_time)
   //: CTmPoolElement()
{
   //EID(EID_CTmTxBase);

   // Puts the object into an empty, not-in-use state. Only pv_nid and
   // pv_trace_level are consumed here; the remaining arguments are applied
   // later by initialize(), because the creator nid,pid are not known when
   // the tmPool constructor builds the element.

   // The trace level and the mutex are set up first.
   // Mutex attributes: Recursive = true, ErrorCheck=false
   iv_trace_level = pv_trace_level;
   ip_mutex = new TM_Mutex(true, false);

   // Transaction identity and state: nothing assigned yet.
   memset(&iv_transid, 0, sizeof(TM_Txid_Internal));
   iv_tag = 0;
   iv_tx_state = TM_TX_STATE_NOTX;
   iv_timeout = 0;

   // The TM's own nid is the default ender nid.
   iv_ender_nid = pv_nid;
   iv_ender_pid = 0;

   // No thread, request, event or branch object is attached yet.
   ip_Thread = NULL;
   iv_threadPending = false;
   ip_Qlock_owner = NULL;
   ip_currRequest = NULL;
   ip_timeoutEvent = NULL;
   ip_hung_event = NULL;
   ip_branches = NULL;

   // Counters.
   iv_num_active_partic.set_val(0);
   iv_prepared_rms = 0;
   iv_cleanup_sem = 0;
   iv_rm_wait_time = 0;

   // Status flags all start cleared.
   iv_in_use = false;
   iv_incremented = false;
   iv_transactionBusy = false;
   iv_recovering = false;
   iv_read_only = false;
   iv_use_ext_transid = false;
   iv_mark_for_rollback = false;
   iv_tm_aborted = false;
   iv_tse_aborted = false;
   iv_appl_aborted = false;
   iv_heur_aborted = false;
   iv_written_in_cp = false;
   iv_wrote_trans_state = false;

   // Initialize separately from TM_Info::new_tx as we don't have the creator nid,pid when called by
   // the tmPool constructor.
   // this->initialize(pv_nid, pv_flags, pv_trace_level, pv_seq, pv_pid, pv_rm_wait_time);
}
// -------------------------------------
// CTmTxBase destructor
// Purpose :
// -------------------------------------
CTmTxBase::~CTmTxBase()
{
   // Only traces the destruction.
   // NOTE(review): ip_mutex is allocated in the constructor but is not
   // released here - confirm whether another owner frees it.
   TMTrace(2, ("CTmTxBase::~CTmTxBase : ENTRY.\n"));
}
//----------------------------------------------------------------------------
// CTmTxBase::constructPoolElement
// Purpose : Callback for CTmPool elements.
// This method must be called by the implementation class!
// Because the function is static we must work out here
// what the derived class is and call its constructPoolElement
// function directly.
//----------------------------------------------------------------------------
CTmTxBase * CTmTxBase::constructPoolElement(int64 pv_id)
{
   // The base class cannot build a pool element itself: after tracing the
   // request it aborts. The statements after abort() are unreachable and are
   // kept only to mirror the intended shape of a real implementation.
   CTmTxBase *lp_element = NULL;
   CTmTxKey lv_key(pv_id);

   TMTrace (2, ("CTmTxBase::constructPoolElement ENTRY: Constructing transaction "
                "pool element for Txn ID (%d,%d).\n",
                lv_key.node(), lv_key.seqnum()));
   abort();

   /* Disabled sketch of dispatching on the transaction type:
   switch (iv_txnType)
   {
   case TM_TX_TYPE_DTM:
      lp_element = (CTmTxBase *) ::TM_TX_Info.constructPoolElement(pv_id);
      break;
   case TM_TX_TYPE_XARM:
      lp_element = (CTmTxBase *) ::CTmXaTxn.constructPoolElement(pv_id);
      break;
   default:
      TMTrace (2, ("CTmTxBase::constructPoolElement ENTRY: Constructing transaction "
                   "pool element of type %d for Txn ID (%d,%d).\n",
                   iv_txnType, lv_key.node(), lv_key.seqnum()));
      abort();
   } */

   TMTrace (2, ("CTmTxBase::constructPoolElement EXIT: Transaction "
                "pool element %p constructed.\n", lp_element));
   return lp_element;
} //constructPoolElement
//----------------------------------------------------------------------------
// CTmTxBase::cleanPoolElement
// Purpose : Callback for CTmPool elements.
// This method is called to clean a CTmTxBase object when CTmPool::newElement
// allocated it from the freeList.
// Returns the transaction key id (built from the node and sequence number),
// which is reused to add the transaction object to the
// CTmPool<CTmTxBase> inUseList.
//----------------------------------------------------------------------------
int64 CTmTxBase::cleanPoolElement()
{
   // Build the key from the current node/sequence number before cleanup()
   // runs, then hand its id back to the caller.
   CTmTxKey lv_key(node(), seqnum());
   cleanup();
   return lv_key.id();
} //CTmTxBase::cleanPoolElement
// ---------------------------------------------------------
// cleanup
// ---------------------------------------------------------
void CTmTxBase::cleanup()
{
   // The base class has no cleanup of its own: trace and abort.
   TMTrace(2, ("CTmTxBase::cleanup BASE CLASS!\n"));
   abort();
}
// ---------------------------------------------------------
// get_rm
// Purpose : Get the address of the ia_TSEBranches entry for the
// specified rmid.
// Note that this is a private function, and protected by
// the caller.
// ---------------------------------------------------------
RM_Info_TSEBranch * CTmTxBase::get_rm(int32 pv_rmid)
{
   // Look up the TSE branch slot for this rmid and pass back whatever the
   // lookup yields.
   RM_Info_TSEBranch *lp_slot = branches()->TSE()->return_slot(pv_rmid);
   return lp_slot;
}
// -------------------------------------
// initialize
// Purpose : Initialize this instance
// -------------------------------------
void CTmTxBase::initialize(int32 pv_nid, int64 pv_flags, int32 pv_trace_level,
                           int32 pv_seq, int32 pv_creator_nid, int32 pv_creator_pid,
                           int32 pv_rm_wait_time)
{
   TMTrace(2, ("CTmTxBase::initialize : ENTRY ID (%d,%d).\n",pv_nid, pv_seq));

   // Everything below runs under the transaction lock.
   lock();

   // Default to a standard DTM owned transaction.
   iv_txnType = TM_TX_TYPE_DTM;
   iv_incremented = false;

   // No thread is attached to a freshly initialized transaction.
   ip_Thread = NULL;
   iv_threadPending = false;

   // Transid setup. The flags are narrowed to a short.
   iv_transid.iv_seq_num = pv_seq;
   iv_transid.iv_tx_flags = static_cast<short>(pv_flags); //HERE
   iv_transid.iv_node = pv_nid;
   iv_transid.iv_version = 1;
   iv_transid.iv_check_sum = 403;
   iv_transid.iv_incarnation_num = gv_tm_info.incarnation_num();
   iv_transid.iv_tt_flags.Application = 0;
   iv_transid.iv_tt_flags.Reserved[0] = 0;
   iv_transid.iv_tt_flags.Reserved[1] = 0;
   iv_transid.iv_tt_flags.Predefined = 0;
   iv_transid.iv_timestamp = SB_Thread::Sthr::time();

   // Timeout starts at the TM-wide default.
   iv_timeout = gv_tm_info.timeout();
   ip_timeoutEvent = NULL;

   // Clear the abort/state-tracking flags.
   iv_written_in_cp = false;
   iv_heur_aborted = false;
   iv_appl_aborted = false;
   iv_tse_aborted = false;
   iv_tm_aborted = false;
   iv_mark_for_rollback = false;
   iv_wrote_trans_state = false;
   iv_recovering = false;
   iv_read_only = false;

   // The object is now in use; its tag is the sequence number.
   iv_in_use = true;
   iv_prepared_rms = 0;
   iv_trace_level = pv_trace_level;
   iv_tx_state = TM_TX_STATE_NOTX;
   iv_tag = pv_seq;

   // The creator is the initial ender and the first active participant.
   iv_num_active_partic.set_val(1);
   iv_ender_nid = pv_creator_nid;
   iv_ender_pid = pv_creator_pid;

   ip_currRequest = NULL;
   iv_transactionBusy = false;
   iv_cleanup_sem = 0;
   iv_rm_wait_time = pv_rm_wait_time;
   ip_hung_event = NULL;

   add_app_partic(iv_ender_pid, iv_ender_nid);

   // Statistics follow the TM-wide collection settings and start from zero.
   iv_stats.initialize(gv_tm_info.stats()->collectStats(),
                       gv_tm_info.stats()->collectInterval());
   iv_stats.clearCounters();

   unlock();
}
// -----------------------------------------------------------------
// safe_initialize_slot
// purpose : this method provides a safe version of initialize_slot, to
// be called by external users. It acquires the lock, calls initialize_slot,
// releases the lock, and returns the value to the caller.
// -----------------------------------------------------------------
int CTmTxBase::safe_initialize_slot (int32 pv_rmid)
{
   // Wrap initialize_slot() in the transaction lock for external callers.
   lock();
   const int lv_slot = initialize_slot(pv_rmid);
   unlock();
   return lv_slot;
}
// -----------------------------------------------------------------
// initialize_slot
// purpose : this method is used to set an RM's partic flag to true
// so that it can partic in the transaction.
// Currently only called by register_branches (ax_reg), so we assume
// that the RM is up and set its flags appropriately.
// register_branches or other caller MUST have already acquired the lock.
// -----------------------------------------------------------------
int CTmTxBase::initialize_slot (int32 pv_rmid)
{
   // Delegates to the TSE branch list, passing this transaction's transid.
   // No locking is done here.
   const int lv_result = branches()->TSE()->set_partic_and_transid(iv_transid, pv_rmid);
   return lv_result;
}
// ----------------------------------------------------------------
// initialize_tx_rms
// Purpose : initialize the group of standard RMs for any
// given transaction. This is done at the start
// of every transaction
// ----------------------------------------------------------------
void CTmTxBase::initialize_tx_rms(bool pv_partic_true)
{
   lock();

   // Without a branch object there is nothing to initialize: log a critical
   // event and abort.
   if (NULL == branches())
   {
      TMTrace(1, ("CTmTxBase::initialize_tx_rms ERROR : ip_branches not instantiated!\n"));
      tm_log_event(DTM_TXN_INIT_RMS_NO_BRANCH_OBJ, SQ_LOG_CRIT, "DTM_TXN_INIT_RMS_NO_BRANCH_OBJ",
                   -1,-1,node(),seqnum());
      abort();
   }

   branches()->init_rms(this, pv_partic_true);

   unlock();
}
// --------------------------------------------------------------
// sync_write
// Purpose - prepare and send sync for state transition
// -------------------------------------------------------------
void CTmTxBase::sync_write (int32 pv_nid,
                            int32 pv_pid, TM_TX_STATE pv_state)
{
   TM_SYNC_TYPE lv_syncType;

   TMTrace (2, ("CTmTxBase::sync_write : ENTRY. TxnId %d, Current state %d, new state %d.\n",
                iv_tag, iv_tx_state, pv_state));

   // The new state is recorded even when no sync is sent for it.
   iv_tx_state = pv_state;

   // Map the new state onto the kind of sync to send.
   switch (pv_state)
   {
   case TM_TX_STATE_BEGINNING:
      lv_syncType = TM_BEGIN_SYNC;
      break;

   case TM_TX_STATE_COMMITTED:
   case TM_TX_STATE_ABORTING:
   case TM_TX_STATE_ABORTING_PART2:
   case TM_TX_STATE_ABORTED:
      lv_syncType = TM_END_SYNC;
      break;

   case TM_TX_STATE_FORGOTTEN:
      lv_syncType = TM_FORGET_SYNC;
      break;

   default:
      // nothing to do for now
      return;
   } //switch

   init_and_send_tx_sync_data( lv_syncType, pv_state,
                               (TM_Transid_Type*)&iv_transid,
                               pv_nid, pv_pid);

   TMTrace (2, ("CTmTxBase::sync_write : EXIT. TxnId %d, type %d.\n",
                iv_tag, lv_syncType));
}
// --------------------------------------------------------------
// eventQ
// Purpose - retrieve the iv_eventQ to allow direct deque method
// calls against it.
// --------------------------------------------------------------
TM_DEQUE * CTmTxBase::eventQ()
{
   // Expose the transaction's event queue by address.
   TM_DEQUE *lp_queue = &iv_eventQ;
   return lp_queue;
}
// --------------------------------------------------------------
// eventQ_push
// Purpose - push a new event to the transactions event queue.
// These are pushed in FIFO order.
// pv_threadAssociated was added to ensure that when a thread
// is associated with a new transaction, we don't get a race
// condition with the transaction being picked up by a
// disassociating thread before the newly instantiated/allocated
// thread gets a chance to process the THREAD_INITIATE event.
// --------------------------------------------------------------
void CTmTxBase::eventQ_push(CTmTxMessage * pp_msg, bool pv_threadAssociated)
{
   TMTrace(3, ("CTmTxBase::eventQ_push Entry: Txn ID (%d,%d), msg %d, thdAssoc %d, Thread Obj %s(%p) tid %ld, ThreadPending %d, Txn Obj %p\n",
               node(), seqnum(), (pp_msg)?pp_msg->msgid():-1, pv_threadAssociated,
               (ip_Thread)?ip_Thread->get_name():"MAIN", (void *) ip_Thread,
               (ip_Thread)?ip_Thread->get_id():-1,
               threadPending(), (void *) this));

   // A NULL request is a logic error.
   if (pp_msg == NULL)
   {
      tm_log_event(TM_TMTX_INVALID_MSG_TYPE, SQ_LOG_CRIT, "TM_TMTX_INVALID_MSG_TYPE");
      TMTrace (1, ("CTmTxBase::eventQ_push - Request to be queued is NULL.\n"));
      abort ();
   }

   // for worker threads we need to grab a new thread for every event, unless
   // the transaction object already has an associated thread
   if (!(pv_threadAssociated || ip_Thread || threadPending()))
   {
      TMTrace (3, ("CTmTxBase::eventQ_push : Instantiating new thread to service request.\n"));
      // Mark the thread as pending before asking for one; undo the mark if
      // no thread could be obtained.
      threadPending(true);
      CTxThread *lp_newThread = gv_tm_info.new_thread(this);
      if (lp_newThread == NULL)
      {
         threadPending(false);
      }
      else
      {
         pv_threadAssociated = true;
      }
   }

   iv_eventQ.push(pp_msg);

   if (ip_Thread || threadPending())
   {
      // A thread is attached or on its way: wake it.
      TMTrace (3, ("CTmTxBase::eventQ_push : signalling Txn ID (%d,%d) thread %s, tid %ld, event %d.\n",
                   node(), seqnum(),
                   (ip_Thread)?ip_Thread->get_name():"MAIN",
                   (ip_Thread)?ip_Thread->get_id():-1, pp_msg->requestType()));
      iv_CV.signal(true /*lock*/);
   }
   else if (pv_threadAssociated)
   {
      // The caller said a thread is associated, so just leave the event queued.
      TMTrace (3, ("CTmTxBase::eventQ_push : "
                   "Txn ID (%d,%d), event %d queued but thread not signalled.\n",
                   node(), seqnum(), pp_msg->requestType()));
   }
   else
   {
      TMTrace (3, ("CTmTxBase::eventQ_push : no thread associated with "
                   "ID %d, event %d queued and txn added to disassociated Q.\n",
                   seqnum(), pp_msg->requestType()));
      // Put this transaction object on the txdisassociated queue.
      // This allows an idle thread to pick up the work.
      // TODO: Should check here to make sure that the transaction object
      // isn't already on the queue or we could end up with duplicates.
      gv_tm_info.txdisassociatedQ()->push(this);
   }
}
// --------------------------------------------------------------
// eventQ_push_top
// Purpose - push an event to the top of the queue.
// This is used for TM internal messages where they must be
// processed before anything else like an Initialize.
// This has no effect if the TM is singlethreaded.
// --------------------------------------------------------------
void CTmTxBase::eventQ_push_top(CTmTxMessage * pp_msg)
{
   TMTrace(3, ("CTmTxBase::eventQ_push_top : Thread Obj %p(%s), lpid %ld, lname %s, Txn Obj %p, ID %d\n",
               (void *) ip_Thread, (ip_Thread)?ip_Thread->get_name():"MAIN",
               SB_Thread::Sthr::self_id(), SB_Thread::Sthr::self_name(),
               (void *) this, seqnum()));

   // A NULL request is a logic error.
   if (pp_msg == NULL)
   {
      tm_log_event(TM_TMTX_INVALID_MSG_TYPE, SQ_LOG_CRIT, "TM_TMTX_INVALID_MSG_TYPE");
      TMTrace (1, ("CTmTxBase::eventQ_push_top - Request to be queued is NULL.\n"));
      abort ();
   }

   // eventQ_pop() takes events with pop_end(); this one is added with
   // push_back().
   iv_eventQ.push_back(pp_msg);

   if (!ip_Thread)
   {
      // Nobody to wake: the event simply stays queued.
      TMTrace (3, ("CTmTxBase::eventQ_push_top : no thread associated with "
                   "transaction ID %d, event %d queued.\n",
                   iv_tag, pp_msg->requestType()));
      return;
   }

   TMTrace (3, ("CTmTxBase::eventQ_push_top : signaling transaction ID %d thread %s, event %d.\n",
                iv_tag, ip_Thread->get_name(), pp_msg->requestType()));
   iv_CV.signal(true /*lock*/);
}
// --------------------------------------------------------------
// eventQ_pop
// Purpose - pop an event from the end of the queue. Events are
// always processed in FIFO order.
// pp_msg must be allocated prior to calling eventQ_pop. calling
// the CTmTxMessage::reply method against it will delete the request
// so no need to clean this up.
// pv_timeout is only implemented as:
// TX_EVENTQ_WAITFOREVER (-1): wait for ever, the default, or
// TX_EVENTQ_NOWAIT (-2): don't wait.
// --------------------------------------------------------------
CTmTxMessage * CTmTxBase::eventQ_pop(int pv_timeout)
{
   // Takes the next event off this transaction's queue.
   //
   // pv_timeout: TX_EVENTQ_NOWAIT returns NULL immediately when the queue is
   //             empty; any other value blocks on the condition variable
   //             until an event is available.
   // Returns the popped message, or NULL for NOWAIT on an empty queue.
   // Aborts the process if the calling thread is not the thread attached to
   // this transaction.
   char la_buf[1024];
   CTmTxMessage * lp_msg = NULL;
   CTmTxBase *lp_txn = this;
   long lv_thrd = SB_Thread::Sthr::self_id();
   // ip_Thread may be NULL here, so it must not be dereferenced until the
   // check below has run. -1 stands in for "no thread", as in the traces.
   long lv_txnThrd = (ip_Thread) ? ip_Thread->get_id() : -1;

   TMTrace (2, ("CTmTxBase::eventQ_pop ENTRY timeout(%d) Thread %ld.\n", pv_timeout, lv_thrd));

   if (!ip_Thread || ip_Thread->get_id() != SB_Thread::Sthr::self_id() || lv_thrd != SB_Thread::Sthr::self_id() || lv_txnThrd != lv_thrd)
   {
      // snprintf bounds the write to la_buf regardless of name lengths.
      snprintf(la_buf, sizeof(la_buf), "TxnThread %p, %s(%ld), Executing Thread %s(%ld), Txn Obj %p",
               (void *) ip_Thread,
               ((ip_Thread)?ip_Thread->get_name():"MAIN"),
               ((ip_Thread)?ip_Thread->get_id():-1),
               SB_Thread::Sthr::self_name(), SB_Thread::Sthr::self_id(),
               (void *) this);
      tm_log_event(DTM_LOGIC_ERROR_THR_MISMATCH, SQ_LOG_CRIT, "DTM_LOGIC_ERROR_THR_MISMATCH",
                   -1,-1,node(),seqnum(),-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,la_buf);
      TMTrace (1, ("CTmTxBase::eventQ_pop : Txn ID (%d,%d) Thread Mismatch! %s\n", node(), seqnum(), la_buf));
      fflush(stdout);
      abort();
   }

   if (pv_timeout == TX_EVENTQ_NOWAIT && iv_eventQ.empty())
   {
      TMTrace (2, ("CTmTxBase::eventQ_pop EXIT nowait and queue is empty.\n"));
      return NULL;
   }

   // Re-test the queue after every wake-up.
   while (iv_eventQ.empty())
   {
      TMTrace (3, ("CTmTxBase::eventQ_pop transaction ID %d thread %s event Q "
                   "empty, waiting for signal.\n",
                   iv_tag, ((ip_Thread)?ip_Thread->get_name():"NO THREAD")));
      iv_CV.wait(true /*lock*/);
   }

   lp_msg = (CTmTxMessage *) iv_eventQ.pop_end();

   // If we've somehow picked up an event for a different thread abort now!
   // ip_Thread is re-checked for NULL here for the same reason as above.
   if (lp_txn != this || !ip_Thread || ip_Thread->get_id() != SB_Thread::Sthr::self_id() || lv_thrd != SB_Thread::Sthr::self_id())
   {
      snprintf(la_buf, sizeof(la_buf), "(2) TxnThread %p, %s(%ld), Executing Thread %s(%ld), Txn Obj %p",
               (void *) ip_Thread,
               ((ip_Thread)?ip_Thread->get_name():"MAIN"),
               ((ip_Thread)?ip_Thread->get_id():-1),
               SB_Thread::Sthr::self_name(), SB_Thread::Sthr::self_id(),
               (void *) this);
      tm_log_event(DTM_LOGIC_ERROR_THR_MISMATCH, SQ_LOG_CRIT, "DTM_LOGIC_ERROR_THR_MISMATCH",
                   -1,-1,node(),seqnum(),-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,la_buf);
      TMTrace (1, ("CTmTxBase::eventQ_pop : Txn ID (%d,%d) Thread Mismatch! %s\n", node(), seqnum(), la_buf));
      fflush(stdout);
      abort();
   }

   TMTrace (2, ("CTmTxBase::eventQ_pop EXIT transaction ID %d thread %s, "
                "msgid %d returning event %d.\n",
                iv_tag, ((ip_Thread)?ip_Thread->get_name():"NO THREAD"),
                ((lp_msg)?lp_msg->msgid():0),
                ((lp_msg)?lp_msg->requestType():0)));
   return lp_msg;
}
// --------------------------------------------------------------
// Event Specific Methods:
// --------------------------------------------------------------
// req_begin
// --------------------------------------------------------------
bool CTmTxBase::req_begin(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces and aborts. The return is never reached.
   TMTrace (2, ("CTmTxBase::req_begin : BASE CLASS!\n"));
   abort();
   return false;
} //req_begin
// --------------------------------------------------------------
// req_end
// --------------------------------------------------------------
bool CTmTxBase::req_end(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces and aborts. The return is never reached.
   TMTrace (2, ("CTmTxBase::req_end : BASE CLASS!\n"));
   abort();
   return false;
} //req_end
// --------------------------------------------------------------
// req_abort
// --------------------------------------------------------------
bool CTmTxBase::req_abort(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces the transaction id and aborts. The return is
   // never reached.
   TMTrace (2, ("CTmTxBase::req_abort : BASE CLASS! Txn ID (%d,%d).\n", node(), seqnum()));
   abort();
   return false;
} //req_abort
// --------------------------------------------------------------
// rollback_txn
// --------------------------------------------------------------
bool CTmTxBase::rollback_txn(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces and aborts.
   TMTrace (2, ("CTmTxBase::rollback_txn : ID %d, BASE CLASS!\n",
                seqnum()));
   abort();
   // Don't terminate txn thread, still need to process the
   // TM_MSG_TXINTERNAL_ABORTCOMPLETE.
   return false;
} //rollback_txn
// --------------------------------------------------------------
// redrivecommit_txn
// --------------------------------------------------------------
bool CTmTxBase::redrivecommit_txn(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces the current state and aborts. The return is
   // never reached.
   TMTrace (2, ("CTmTxBase::redrivecommit_txn BASE CLASS!: ID %d, current txn state %d\n",
                seqnum(), tx_state()));
   abort();
   return false;
} //redrivecommit_txn
// --------------------------------------------------------------
// req_abort_complete
// --------------------------------------------------------------
bool CTmTxBase::req_abort_complete(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces and aborts.
   TMTrace (2, ("CTmTxBase::req_abort_complete BASE CLASS!: ID %d\n", seqnum()));
   abort();
   // Don't terminate transaction, still need to do forget
   return false;
}
// --------------------------------------------------------------
// req_end_complete
// --------------------------------------------------------------
bool CTmTxBase::req_end_complete(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces and aborts.
   TMTrace (2, ("CTmTxBase::req_end_complete BASE CLASS!: ID %d\n", seqnum()));
   abort();
   // Don't terminate transaction, still need to do forget
   return false;
}
// --------------------------------------------------------------
// req_forget
// --------------------------------------------------------------
bool CTmTxBase::req_forget(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces and aborts.
   TMTrace (2, ("CTmTxBase::req_forget BASE CLASS!: ID %d\n", seqnum()));
   abort();
   // Normal processing is for forget to drive thread disassociation for
   // transaction threads: finished with the transaction, disassociate and
   // cleanup.
   return true;
} // req_forget
// --------------------------------------------------------------
// req_begin_complete
// --------------------------------------------------------------
bool CTmTxBase::req_begin_complete(CTmTxMessage * pp_msg)
{
   // Base-class stub: traces the message's response error and aborts.
   TMTrace (2, ("CTmTxBase::req_begin_complete BASE CLASS!: ID %d, error=%d\n",
                seqnum(), pp_msg->responseError()));
   abort();
   // Leave transaction thread alive
   return false;
}
// --------------------------------------------------------------
// Event Specific Methods implemented in base class
// --------------------------------------------------------------
// req_join
// Purpose - Txn specific processing for JOINTRANSACTION.
// --------------------------------------------------------------
bool CTmTxBase::req_join(CTmTxMessage * pp_msg)
{
   // Handles a JOINTRANSACTION request.
   //
   // pp_msg: the join request; its response error and reply are filled in here.
   // Always returns false.
   //
   // A join is accepted only while the transaction is in NOTX, BEGINNING or
   // ACTIVE state. A joiner asking for the coordinator role is accepted only
   // when there is currently no ender (iv_ender_pid == -1); otherwise
   // FETXNOTSUSPENDED is returned.
   TMTrace(2, ("CTmTxBase::req_join : ENTRY: XA Txn ID (%d,%d).\n",
               node(), seqnum()));

   if (tx_state() != TM_TX_STATE_NOTX &&
       tx_state() != TM_TX_STATE_BEGINNING &&
       tx_state() != TM_TX_STATE_ACTIVE)
   {
      TMTrace (1, ("CTmTxBase::req_join : FEENDEDTRANSID\n"));
      pp_msg->responseError(mapErr(FEENDEDTRANSID));
   }
   else
   {
      // Transaction is either BEGINNING or ACTIVE, so can be joined.
      // Note that a transaction in NOTX state has not yet reached BEGINNING.
      TMTrace (3, ("CTmTxBase::req_join : active participants %d\n",
                   num_active_partic()));
      pp_msg->responseError(FEOK);

      if (pp_msg->request()->u.iv_join_trans.iv_coord_role == true)
      {
         if (iv_ender_pid == -1)
         {
            // The joiner becomes the new ender and an active participant.
            iv_ender_pid = pp_msg->request()->u.iv_join_trans.iv_pid;
            inc_active_partic();
            add_app_partic(iv_ender_pid, pp_msg->request()->u.iv_join_trans.iv_nid);
         }
         else
         {
            pp_msg->responseError(mapErr(FETXNOTSUSPENDED));
         }
      }
      else
      {
         inc_active_partic();
         add_app_partic(pp_msg->request()->u.iv_join_trans.iv_pid,
                        pp_msg->request()->u.iv_join_trans.iv_nid);
      }
   }

   // NOTE(review): iv_tm_pid is filled with the TM's nid - confirm intended.
   pp_msg->response()->u.iv_join_trans.iv_tm_pid = gv_tm_info.nid();

   // Reply with the error recorded above. Passing FEOK to reply() here would
   // report success for the FEENDEDTRANSID and FETXNOTSUSPENDED cases;
   // req_suspend uses the same set-error-then-reply() pattern.
   pp_msg->reply();

   TMTrace (2, ("CTmTxBase::req_join : EXIT, replying error %d\n",
                pp_msg->responseError()));
   // Return code doesn't matter, we run from the main thread
   return false;
} //req_join
// --------------------------------------------------------------
// req_status
// Purpose - Txn specific processing for STATUSTRANSACTION.
// We return a status of Aborted if the transaction is in the
// process of aborting because a TMF_DOOMTRANSACTION_ will
// leave the transaction marked for rollback once an ENDTRANSACTION
// or ABORTTRANSACTION is received. SQL use STATUSTRANSACTION
// to determine whether the transaction was doomed.
// --------------------------------------------------------------
bool CTmTxBase::req_status(CTmTxMessage * pp_msg)
{
   TMTrace (2, ("CTmTxBase::req_status : ENTRY\n"));

   if (tx_state() == TM_TX_STATE_BEGINNING ||
       tx_state() == TM_TX_STATE_NOTX)
   {
      // For transactions that are beginning we want to report them as active
      pp_msg->response()->u.iv_status_trans.iv_status = TM_TX_STATE_ACTIVE;
   }
   else if (isAborting() && tx_state() != TM_TX_STATE_ABORTING &&
            tx_state() != TM_TX_STATE_ABORTING_PART2 &&
            tx_state() != TM_TX_STATE_ACTIVE)
   {
      // Flagged as aborting, but not in either aborting state and not
      // active: report aborted.
      pp_msg->response()->u.iv_status_trans.iv_status = TM_TX_STATE_ABORTED;
   }
   else
   {
      // Otherwise report the state as it is.
      pp_msg->response()->u.iv_status_trans.iv_status = (short) tx_state();
   }

   pp_msg->reply(mapErr(FEOK));

   TMTrace (2, ("CTmTxBase::req_status : EXIT, state %d\n", tx_state()));
   // Return code doesn't matter, we run from the main thread
   return false;
} //req_status
// --------------------------------------------------------------
// req_suspend
// Purpose - Txn specific processing for SUSPENDTRANSACTION.
// --------------------------------------------------------------
bool CTmTxBase::req_suspend(CTmTxMessage * pp_msg)
{
   TMTrace (2, ("CTmTxBase::req_suspend : ENTRY\n"));

   if (tx_state() == TM_TX_STATE_NOTX)
   {
      // Nothing to suspend: there is no transaction yet.
      TMTrace (1, ("CTmTxBase::req_suspend : FENOTRANSID\n"));
      pp_msg->responseError(mapErr(FENOTRANSID));
   }
   else if (!remove_app_partic(pp_msg->request()->u.iv_suspend_trans.iv_pid,
                               pp_msg->request()->u.iv_suspend_trans.iv_nid))
   {
      // The requester was not on the participant list.
      TMTrace (3, ("CTmTxBase::req_suspend : not an active participant.\n"));
      pp_msg->responseError(mapErr(FENOTRANSID));
   }
   else
   {
      dec_active_partic();
      TMTrace (3, ("CTmTxBase::req_suspend : active participants %d\n",
                   num_active_partic()));

      // When the ender suspends without the coordinator role, the ender slot
      // is vacated.
      const bool lv_enderLeft =
         (pp_msg->request()->u.iv_suspend_trans.iv_coord_role == false)
         && (pp_msg->request()->u.iv_suspend_trans.iv_pid == iv_ender_pid);
      if (lv_enderLeft)
      {
         iv_ender_pid = -1; // someone better call join with coord = true
      }
      pp_msg->responseError(mapErr(FEOK));
   }

   pp_msg->reply();

   TMTrace (2, ("CTmTxBase::req_suspend : EXIT\n"));
   // Return code doesn't matter, we run from the main thread
   return false;
} //req_suspend
// --------------------------------------------------------------
// req_ax_reg
// Purpose - Txn Thread specific processing for ax_reg.
// --------------------------------------------------------------
bool CTmTxBase::req_ax_reg(CTmTxMessage * pp_msg)
{
   TMTrace (2, ("CTmTxBase::req_ax_reg : ID (%d,%d) ENTRY for rmid %d\n",
                node(), seqnum(), pp_msg->request()->u.iv_ax_reg.iv_rmid));

   // Register the branch, then complete the response with the TM's
   // incarnation number and the lead TM's nid before replying.
   short lv_regError = register_branch(pp_msg->request()->u.iv_ax_reg.iv_rmid, pp_msg);

   pp_msg->response()->u.iv_ax_reg.iv_TM_incarnation_num = gv_tm_info.incarnation_num();
   pp_msg->response()->u.iv_ax_reg.iv_LeadTM_nid = gv_tm_info.lead_tm_nid();
   pp_msg->reply(mapErr(lv_regError));

   // Don't reset here - we're now called in the main thread rather than queued to the
   // transaction thread
   // We're finished so allow other work to commence
   //reset_transactionBusy();

   TMTrace (2, ("CTmTxBase::req_ax_reg : ID %d, EXIT\n", seqnum()));

   // ax_reg never causes the thread to terminate
   return false;
} //req_ax_reg
// --------------------------------------------------------------
// req_registerRegion
// Purpose - Txn Thread specific processing for registerRegion.
// This is only received in SeaTrans for HBase-trx participants.
// --------------------------------------------------------------
bool CTmTxBase::req_registerRegion(CTmTxMessage * pp_msg)
{
   TMTrace (2, ("CTmTxBase::req_registerRegion : ID (%d,%d) ENTRY.\n",
                node(), seqnum()));

   short lv_regError = branches()->registerRegion(this, 0, pp_msg);

   // Reply here if we haven't yet done so. The pending check and the reply
   // are made under the transaction lock.
   lock();
   if (pp_msg->replyPending())
   {
      pp_msg->reply(lv_regError);
   }
   unlock();

   reset_transactionBusy();

   TMTrace (2, ("CTmTxBase::req_registerRegion : EXIT error %d, Txn ID (%d,%d).\n",
                lv_regError, node(), seqnum()));

   // Never terminate the thread if there's no more work to do
   return false;
} //req_registerRegion
//------------------------------------------------------------------------------
// register_branch
// Purpose - Process received ax_reg from TSE
// If pp_msg == NULL, we are registering a branch internally (e.g. because we
// just received a doomtxn from a TSE which was not participating).
// Returns error code.
//-----------------------------------------------------------------------------
short CTmTxBase::register_branch(int32 pv_rmid, CTmTxMessage * pp_msg = NULL)
{
   // Registers the RM identified by pv_rmid as a participant in this
   // transaction.
   //
   // pv_rmid: id of the RM whose TSE branch slot is to be marked participating.
   // pp_msg : the ax_reg request, or NULL for an internal registration. When
   //          non-NULL its response (xid, flags, error) is filled in here; the
   //          caller sends the reply.
   // Returns FEOK on success, or FEINVTRANSID when the rmid is unknown, the
   // branch is already participating, or the transaction state no longer
   // admits participants and no new TT flags were supplied.
   //
   // Locking: takes the branch lock, plus the transaction lock when the
   // mutex's lock count is 0. Both are released on every return path.
   short lv_error = FEOK;
   bool lv_txnActive = false,
        lv_TTflagsChanged = false,
        lv_IownLock = false,
        lv_partic = false;
   RM_Info_TSEBranch *lp_rm_branch = NULL;

   TMTrace (2, ("CTmTxBase::register_branch ENTER, Txn ID (%d,%d), rmid %d, Tx state %d.\n",
                node(), seqnum(), pv_rmid, tx_state()));

   // The transaction object may have the lock, but we
   // still want to accept the ax_reg here because failure
   // to do so could hang the TSE which won't respond to
   // requests like xa_rollback until it gets the reply to
   // ax_reg!
   branches()->branch_lock();
   if (ip_mutex->lock_count() == 0)
   {
      lv_IownLock = true;
      lock();
   }

   // Get the branch
   // This had better work or our pv_rmid is bad.
   lp_rm_branch = branches()->TSE()->return_slot(pv_rmid);
   if (lp_rm_branch == NULL)
   {
      tm_log_event(DTM_TMTX_INVALID_RMID, SQ_LOG_WARNING, "DTM_TMTX_INVALID_RMID",
                   FEINVTRANSID,pv_rmid,node(),seqnum());
      TMTrace(1, ("CTmTxBase::register_branch - Unable to find RM %d to "
                  "register branch for txn ID (%d,%d), transaction rejected with FEINVTRANSID(75).\n",
                  pv_rmid, node(), seqnum()));
      // Reject the register request. Record the error on the message and
      // release the locks taken above in the same order as the normal exit,
      // so a bad rmid cannot leave this transaction locked.
      if (pp_msg)
         pp_msg->responseError(FEINVTRANSID);
      branches()->branch_unlock();
      if (lv_IownLock)
         unlock();
      return FEINVTRANSID;
   }

   // Allow branches to participate in transactions in the following states:
   if (iv_tx_state == TM_TX_STATE_ACTIVE ||
       iv_tx_state == TM_TX_STATE_IDLE ||
       iv_tx_state == TM_TX_STATE_ABORTING ||
       iv_tx_state == TM_TX_STATE_COMMITTING ||
       iv_tx_state == TM_TX_STATE_PREPARING ||
       iv_tx_state == TM_TX_STATE_PREPARED ||
       iv_tx_state == TM_TX_STATE_HUNGABORTED ||
       iv_tx_state == TM_TX_STATE_ABORTING_PART2)
   {
      lv_txnActive = true;
      lv_partic = lp_rm_branch->add_partic(iv_transid);
      if (lv_partic)
      {
         if (iv_tx_state != TM_TX_STATE_ACTIVE &&
             iv_tx_state != TM_TX_STATE_IDLE) {
            TMTrace(1, ("CTmTxBase::register_branch - adding late participant rmid %d to Txn ID (%d,%d), Tx state %d.\n",
                        pv_rmid, node(), seqnum(), tx_state()));
         }
         else {
            TMTrace(1, ("CTmTxBase::register_branch - adding participant rmid %d to Txn ID (%d,%d), Tx state %d.\n",
                        pv_rmid, node(), seqnum(), tx_state()));
         }
         // Issue a warning if this was a doomtxn
         if (!pp_msg)
         {
            tm_log_event(DTM_TMTX_DOOMTXN_ADDED_PARTIC, SQ_LOG_WARNING, "DTM_TMTX_DOOMTXN_ADDED_PARTIC",
                         -1, pv_rmid, node(), seqnum(),-1,-1,-1,-1,-1,-1,-1,-1,tx_state());
            // NOTE(review): this trace text says "already participating" even
            // though the participant was just added - left unchanged.
            TMTrace(3, ("CTmTxBase::register_branch - RM %d already participating in Txn "
                        "ID (%d,%d), state %d.\n",
                        pv_rmid, node(), seqnum(), tx_state()));
         }
         branches()->TSE()->inc_num_rm_partic();
      }
      else //add_partic() returns false if the branch is already participating.
      {
         // If we're already resolved this branch, return an error
         lv_error = FEINVTRANSID;
         // Only log an event if this register request came from an ax_reg. Doomtxn
         // doesn't pass in a message pointer.
         if (pp_msg)
            tm_log_event(DTM_TMTX_ALREADY_PARTIC, SQ_LOG_WARNING, "DTM_TMTX_ALREADY_PARTIC",
                         lv_error, pv_rmid, node(), seqnum(),-1,-1,-1,-1,-1,-1,-1,-1,tx_state());
         TMTrace(3, ("CTmTxBase::register_branch - RM %d already participating in Txn "
                     "ID (%d,%d), state %d, returning error %d.\n",
                     pv_rmid, node(), seqnum(), tx_state(), lv_error));
      }
      TMTrace(3, ("CTmTxBase::register_branch - RM %d ID (%d,%d) participating: %d, resolved: %d\n",
                  pv_rmid, node(), seqnum(),lv_partic, lp_rm_branch->resolved() ));
   }

   if (pp_msg)
   {
      memcpy(&pp_msg->response()->u.iv_ax_reg.iv_xid, &lp_rm_branch->iv_xid, sizeof(XID));
      // We need to add in any flags set by the TSE that we don't know about.
      // If the TSE set new flags then we need to write a trans state record
      // now so that we know the new flag settings in the case of a failure.
      int64 lv_TTflags = TT_flags() | pp_msg->request()->u.iv_ax_reg.iv_flags;
      if (lv_TTflags != TT_flags())
      {
         lv_TTflagsChanged = true;
         TMTrace(3, ("CTmTxBase::register_branch : Txn ID (%d,%d) Detected "
                     "different TT flags from TSE RM %d, TM "
                     PFLLX ", TSE " PFLLX ", combined " PFLLX ".\n",
                     node(), seqnum(), pv_rmid, TT_flags(),
                     pp_msg->request()->u.iv_ax_reg.iv_flags, lv_TTflags));
         // "0x" plus up to 16 hex digits plus the terminator needs 19 bytes;
         // snprintf keeps the write bounded in any case.
         char lv_TTflagsStr[32];
         snprintf(lv_TTflagsStr, sizeof(lv_TTflagsStr), "0x" PFLLX, lv_TTflags);
         tm_log_event(DTM_TMTX_REG_TT_FLAGS_CHANGED, SQ_LOG_INFO, "DTM_TMTX_REG_TT_FLAGS_CHANGED",
                      -1,pv_rmid,node(),seqnum(),-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,lv_TTflagsStr);
         // Record that a NO UNDO occured.
         // NOTE(review): this tests the flags held before the merge below, not
         // the combined value - confirm that is intended.
         if (TT_flags() & TM_TT_NO_UNDO)
         {
            tm_log_event(DTM_NOUNDO, SQ_LOG_WARNING, "DTM_NOUNDO",
                         -1,pv_rmid,node(),seqnum(),-1,-1,-1,-1,-1,-1,-1,-1,tx_state());
            TMTrace(1, ("CTmTxBase::register_branch : WARNING - Txn ID (%d,%d), state %d - RM %d set "
                        "NO UNDO. This will leave the database in an inconsistent state.\n",
                        node(), seqnum(), tx_state(), pv_rmid));
         }
         TT_flags(lv_TTflags);
         gv_tm_info.write_trans_state (&iv_transid, iv_tx_state, abort_flags(), false);
      }
      pp_msg->response()->u.iv_ax_reg.iv_flags = TT_flags();
   }

   // Too late to participate, and no new flags to record either.
   if (!lv_txnActive && !lv_TTflagsChanged)
   {
      tm_log_event(DTM_TMTX_REG_ERROR_TOOLATE, SQ_LOG_WARNING, "DTM_TMTX_REG_ERROR_TOOLATE",
                   -1,pv_rmid,node(),seqnum(),-1,-1,-1,-1,-1,-1,-1,-1,tx_state(),
                   lp_rm_branch->resolved(),lp_rm_branch->partic());
      TMTrace(1, ("CTmTxBase::register_branch - ERROR RM %d is too late to participate in "
                  "Txn ID (%d,%d), state %d, partic %d, resolved %d.\n",
                  pv_rmid, node(), seqnum(), tx_state(),
                  lp_rm_branch->partic(), lp_rm_branch->resolved()));
      lv_error = FEINVTRANSID;
   }

   if (pp_msg)
      pp_msg->responseError(lv_error);

   branches()->branch_unlock();
   if (lv_IownLock)
      unlock();

   TMTrace(2, ("register_branch EXIT, error %d\n", lv_error));
   return lv_error;
} // CTmTxBase::register_branch
// ----------------------------------------------------------------
// internal_abortTrans
// Purpose - Queue a rollback request for this transaction.
// This is currently only used during takeover, shutdown and
// process death.
// ----------------------------------------------------------------
int32 CTmTxBase::internal_abortTrans(bool pv_takeover_or_shutdown)
{
    // Build the internal rollback request and stamp it with the reason flag
    // supplied by the caller before handing it to the transaction.
    CTmTxMessage * const lp_rollbackReq = new CTmTxMessage(TM_MSG_TXINTERNAL_ROLLBACK);
    lp_rollbackReq->request()->u.iv_rollback_internal.iv_takeover_or_shutdown =
        pv_takeover_or_shutdown;

    // The outcome of queueToTransaction() is not examined here; FEOK is
    // returned unconditionally.
    queueToTransaction(&iv_transid, lp_rollbackReq);

    TMTrace (2, ("CTmTxBase::internal_abortTrans : EXIT\n"));
    return FEOK;
} //internal_abortTrans
// ----------------------------------------------------------------
// redrive_rollback
// Purpose - Queue a redrive rollback request for this transaction.
// This is currently only used during a takeover.
// ----------------------------------------------------------------
int32 CTmTxBase::redrive_rollback()
{
    // Allocate a REDRIVEROLLBACK request and queue it against this
    // transaction. The queueing result is ignored; FEOK is always returned.
    queueToTransaction(&iv_transid,
                       new CTmTxMessage(TM_MSG_TXINTERNAL_REDRIVEROLLBACK));

    TMTrace (2, ("CTmTxBase::redrive_rollback : EXIT\n"));
    return FEOK;
} //redrive_rollback
// ----------------------------------------------------------------
// redrive_commit
// Purpose - Queue a redrive commit request for this transaction.
// This is currently only used during a takeover.
// ----------------------------------------------------------------
int32 CTmTxBase::redrive_commit()
{
    // Allocate a REDRIVECOMMIT request and queue it against this
    // transaction. The queueing result is ignored; FEOK is always returned.
    queueToTransaction(&iv_transid,
                       new CTmTxMessage(TM_MSG_TXINTERNAL_REDRIVECOMMIT));

    TMTrace (2, ("CTmTxBase::redrive_commit : EXIT\n"));
    return FEOK;
} //redrive_commit
void CTmTxBase::schedule_redrive_sync()
{
CTmTxMessage * lp_msg = new CTmTxMessage(TM_MSG_TXINTERNAL_REDRIVESYNC);
eventQ_push(lp_msg);
}
//-----------------------------------------------------------------------------
// decrement & test cleanup semaphore
// Only return true if the semaphore is 1 to make sure
// that only one caller will do the actual cleanup.
// The idea is that every thread working on a specific transaction calls
// inc_cleanup() when they want the txn object cleaned up on exit.
// The cleanup routine executes when the thread disassociates, calls
// dec_cleanup() which will only return true when the iv_cleanup_sem count
// is 1, so only one thread will cleanup the txn object.
// Note that we lock the Qmutex when incrementing the cleanup
// semaphore to ensure no one tries to write to the eventQ or PendingRequestQ
// while we're setting cleanup.
//-----------------------------------------------------------------------------
bool CTmTxBase::dec_cleanup()
{
    lock();
    // Only the caller that observes a count of exactly 1 is told to clean up.
    const bool lv_doCleanup = (iv_cleanup_sem == 1);
    // The count is never driven below zero.
    if (iv_cleanup_sem > 0)
        --iv_cleanup_sem;
    unlock();

    TMTrace (2, ("CTmTxBase::dec_cleanup: semaphore = %d, returning %d\n", iv_cleanup_sem, lv_doCleanup));
    return lv_doCleanup;
}
// increment cleanup semaphore
void CTmTxBase::inc_cleanup()
{
    // Take the transaction lock first, then the Q lock, so the semaphore is
    // bumped while both are held; release in the reverse order.
    lock();
    Qlock();
    ++iv_cleanup_sem;
    Qunlock();
    unlock();

    TMTrace (2, ("CTmTxBase::inc_cleanup: semaphore = %d\n", iv_cleanup_sem));
}
//-----------------------------------------------------------------------------
// set_transactionBusy
// The following 2 methods allow setting and resetting the
// iv_transactionBusy within the same lock to ensure
// it's consistent and another thread doesn't interfere.
// Qlock() must be called before calling set_transactionBusy and Qunlock()
// after any outstanding request is queued to eventQ or PendingRequestQ.
// The return value indicates whether set_transactionBusy succeeded in setting
// the value.
// We also set the iv_currRequest here. It may be needed later to handle
// a request which spans waits.
// Returns true if iv_transactionBusy==false, ie if the transaction object
// is available.
// false if iv_transactionBusy==true - the transaction object is busy.
//-----------------------------------------------------------------------------
bool CTmTxBase::set_transactionBusy(CTmTxMessage * pp_msg)
{
    TMTrace (2, ("CTmTxBase::set_transactionBusy : ENTRY ID (%d,%d), iv_transactionBusy=%d.\n",
                 node(), seqnum(), iv_transactionBusy));

    // We only claim the transaction when nobody else currently has it.
    const bool lv_claimed = !iv_transactionBusy;

    if (lv_claimed)
    {
        iv_transactionBusy = true;
        // Remember the request: multi-part requests such as ENDTRANSACTION
        // need it again to drive events after a wakeup.
        currRequest(pp_msg);
    }
    else
    {
        TMTrace (3, ("CTmTxBase::set_transactionBusy : WARNING: Possible "
                     "programming bug. iv_transactionBusy already set on call "
                     "to set_transactionBusy.\n"));

        // Busy with neither a thread nor a current request is treated as
        // an unrecoverable inconsistency.
        const bool lv_orphaned = (thread() == NULL) && (currRequest() == NULL);
        if (lv_orphaned)
        {
            TMTrace (3, ("CTmTxBase::set_transactionBusy : BUG! Transaction "
                         "ID (%d,%d) busy but has no associated thread and current request!\n",
                         node(), seqnum()));
            abort();
        }
    }

    TMTrace (2, ("CTmTxBase::set_transactionBusy : EXIT ID (%d,%d), returning "
                 "transactionBusy=%d.\n", node(), seqnum(), lv_claimed));
    return lv_claimed;
} //set_transactionBusy
//-----------------------------------------------------------------------------
// reset_transactionBusy
// Resets the transactionBusy flag. This method also checks the
// PendingRequest queue and pops the top entry if there is one.
// Note that we also delete the iv_currRequest message object here,
// so callers had better have replied to the client prior to calling
// reset_transactionBusy!
// This routine locks the Qmutex to serialize changes to the transactionBusy
// flag and writes/reads to the PendingRequestQ. It does not lock the transaction
// object. To ensure we don't mess up,
// pv_cleanupPending input.
// True if cleaning up the transaction object. This happens in forget().
// In this case we cleanup the PendingRequestQ, replying to any that have
// outstanding replies.
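// Returns the value of iv_transactionBusy once the reset has been attempted:
// true  - the transaction object is still busy (iv_transactionBusy was left
//         set, e.g. because a request popped off the PendingRequestQ became
//         the current request).
// false - the transaction object is now free, or was already free on entry.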
//-----------------------------------------------------------------------------
bool CTmTxBase::reset_transactionBusy(bool pv_cleanupPending)
{
    // Value of iv_transactionBusy captured while the Q lock is still held.
    // It is used for the EXIT trace and the return value, so that both
    // report the state this call produced rather than whatever the flag
    // holds after the lock has been released.
    bool lv_stillBusy = false;

    TMTrace (2, ("CTmTxBase::reset_transactionBusy : ENTRY, ID (%d,%d), "
                 "cleanupPending=%d, iv_transactionBusy=%d.\n",
                 node(), seqnum(), pv_cleanupPending, iv_transactionBusy));

    Qlock();
    if (iv_transactionBusy)
    {
        // Clean up the old message, we've finished with it
        delete_currRequest();

        // Check the PendingRequest queue
        CTmTxMessage *lp_msg = (CTmTxMessage *) iv_PendingRequestQ.pop_end();
        while (lp_msg)
        {
            TMTrace (3, ("CTmTxBase::reset_transactionBusy : API "
                         "request popped off PendingRequestQ for msgid(%d).\n",
                         lp_msg->msgid()));

            if (pv_cleanupPending)
            {
                // Cleaning up: answer every outstanding request with an
                // error that reflects whether the transaction is aborting,
                // discard it, and carry on with the next one.
                if (lp_msg->replyPending())
                {
                    if (isAborting())
                        lp_msg->reply(mapErr(FEABORTEDTRANSID));
                    else
                        lp_msg->reply(mapErr(FEENDEDTRANSID));
                }
                delete lp_msg;
                lp_msg = (CTmTxMessage *) iv_PendingRequestQ.pop_end();
            }
            else
            {
                // Normal case: promote the first pending request onto the
                // event queue and make it the current request, then stop.
                eventQ_push(lp_msg);
                currRequest(lp_msg);
                lp_msg = NULL; // Break out of while
            }
        } //while

        // Reset the transactionBusy flag now if there's nothing more to process
        if (currRequest() == NULL)
        {
            iv_transactionBusy = false;
            TMTrace (3, ("CTmTxBase::reset_transactionBusy: No requests outstanding, "
                         "iv_transactionBusy=false.\n"));
        }
    }
    lv_stillBusy = iv_transactionBusy;
    Qunlock();

    TMTrace (2, ("CTmTxBase::reset_transactionBusy : EXIT ID (%d,%d), returning "
                 "transactionBusy=%d.\n", node(), seqnum(), lv_stillBusy));
    return lv_stillBusy;
} //reset_transactionBusy
// ---------------------------------------------------------
// txn_object_good
// Purpose : Check that the transaction object is allocated
// to this transaction. There is a window in which the
// transaction might have been cleaned up and the transaction
// object returned to the free list.
//
// Note that this function will delete pp_msg if there is
// an error, so it must not be used after the call if false
// is returned
// --------------------------------------------------------
bool CTmTxBase::txn_object_good(TM_Txid_Internal *pp_transid, CTmTxMessage * pp_msg)
{
    // The object is stale if it is being cleaned up, is not in use, or now
    // carries a different sequence number from the one the caller expects.
    const bool lv_stale = cleaning_up()
                          || !iv_in_use
                          || iv_transid.iv_seq_num != pp_transid->iv_seq_num;

    if (lv_stale)
    {
        TMTrace(1, ("CTmTxBase::txn_object_good txn %d is old! Replying "
                    "FEINVTRANSID to late request %d & deleting msg object.\n",
                    pp_transid->iv_seq_num, pp_msg->requestType()));
        // Answer the late request and dispose of the message; the caller
        // must not touch pp_msg after a false return.
        pp_msg->reply(mapErr(FEINVTRANSID));
        delete pp_msg;
        return false;
    }

    TMTrace(2, ("CTmTxBase::txn_object_good txn object is good :-)\n"));
    return true;
}
// --------------------------------------------------------------
// CTmTxBase::reply_to_queuedRequests
// Purpose : reply to all requests queued on the iv_PendingRequestQ
// with the error provided.
// --------------------------------------------------------------
void CTmTxBase::reply_to_queuedRequests(short pv_error)
{
    CTmTxMessage * lp_msg;

    TMTrace (2, ("CTmTxBase::reply_to_queuedRequests ENTRY\n"));
    lock();

    // Step 1: drain the event queue. Each message that still has a reply
    // pending is answered with pv_error; every message is deleted.
    lp_msg = (CTmTxMessage *) eventQ_pop(TX_EVENTQ_NOWAIT);
    while (lp_msg)
    {
        if (lp_msg->replyPending())
        {
            TMTrace (3, ("CTmTxBase::reply_to_queuedRequests replying to queued "
                         "transaction event %d, msgid %d\n",
                         lp_msg->requestType(), lp_msg->msgid()));
            lp_msg->reply(mapErr(pv_error));
        }
        else
        {
            TMTrace (3, ("CTmTxBase::reply_to_queuedRequests queued transaction "
                         "event %d, msgid %d ignored, no reply pending.\n",
                         lp_msg->requestType(), lp_msg->msgid()));
        }

        // If this message matches the current Request then clear it as we've
        // just processed the message (and are about to delete it).
        if (lp_msg == ip_currRequest)
            ip_currRequest = 0;

        delete lp_msg;
        lp_msg = (CTmTxMessage *) eventQ_pop(TX_EVENTQ_NOWAIT);
    }

    // Step 2: drain the pending requests for this transaction. These can't
    // be handled because we're in the process of forgetting the transaction.
    // As with the event queue above, only messages that still have a reply
    // pending are answered; every message is deleted.
    lp_msg = (CTmTxMessage *) iv_PendingRequestQ.pop_end();
    while (lp_msg)
    {
        if (lp_msg->replyPending())
        {
            TMTrace (3, ("CTmTxBase::reply_to_queuedRequests replying to request %d\n",
                         lp_msg->requestType()));
            lp_msg->reply(mapErr(pv_error));
        }
        delete lp_msg;
        lp_msg = (CTmTxMessage *) iv_PendingRequestQ.pop_end();
    }

    // Step 3: finally, answer and release the current request if it is
    // still waiting for a reply.
    if (ip_currRequest && ip_currRequest->replyPending())
    {
        TMTrace (3, ("CTmTxBase::reply_to_queuedRequests replying to queued event %d thread %s, txn tag %d.\n",
                     ip_currRequest->requestType(),
                     ((ip_Thread)?ip_Thread->get_name():"NO THREAD"),
                     iv_tag));
        ip_currRequest->reply(mapErr(pv_error));
        delete_currRequest();
    }

    unlock();
    TMTrace (2, ("CTmTxBase::reply_to_queuedRequests EXIT\n"));
} //reply_to_queuedRequests
// ---------------------------------------------------------------------------
// CTmTxBase::add_app_partic
// Purpose : add the process to the list of processes participating in this
// transaction.
// pv_pid pid of participating process
// pv_nid nid of participating process - not of this TM!!!
// Returns true if successfully added.
// false indicates we didn't get the memory for the pid, or it's
// already in the list!
// ---------------------------------------------------------------------------
bool CTmTxBase::add_app_partic (int32 pv_pid, int32 pv_nid)
{
    // Key used to look the process up in the participant list.
    CTmTxKey lv_lookup(pv_nid, pv_pid);
    CTmTxKey *lp_existing = (CTmTxKey *) iv_app_partic_list.get(lv_lookup.id());

    // Already registered: warn and report that nothing was added.
    if (lp_existing != NULL)
    {
        TMTrace (1, ("CTmTxBase::add_app_partic Warning: nid %d, pid %d found in "
                     "app_partic_list - ignored.\n", pv_nid, pv_pid));
        return false;
    }

    CTmTxKey *lp_entry = new CTmTxKey(lv_lookup.id());
    if (lp_entry == NULL)
    {
        // No memory for the new entry: log it and bring the process down.
        TMTrace(1, ("CTmTxBase::add_app_partic (%d,%d) no memory to add int32 pid!, aborting.\n",
                    pv_nid, pv_pid));
        tm_log_event(DTM_TM_ADD_APP_PARTIC_NOMEM, SQ_LOG_CRIT, "DTM_TM_ADD_APP_PARTIC_NOMEM",
                     -1,-1,pv_nid,pv_pid);
        abort();
        return false;
    }

    // The list stores the heap-allocated key under its own id.
    iv_app_partic_list.put(lv_lookup.id(), lp_entry);
    TMTrace (3, ("CTmTxBase::add_app_partic added process (%d,%d) : %p\n",
                 pv_nid, pv_pid, (void *)lp_entry));
    return true;
} //add_app_partic
// ---------------------------------------------------------------------------
// CTmTxBase::remove_app_partic
// Purpose : take the process out of the participant list and delete its key.
// pv_pid pid of participating process
// pv_nid nid of participating process
// Returns true if an entry was removed, false if none was found.
// ---------------------------------------------------------------------------
bool CTmTxBase::remove_app_partic (int32 pv_pid, int32 pv_nid)
{
    CTmTxKey lv_lookup(pv_nid, pv_pid);
    CTmTxKey *lp_removed = (CTmTxKey *) iv_app_partic_list.remove(lv_lookup.id());

    // Nothing was stored under this key.
    if (lp_removed == NULL)
        return false;

    TMTrace (3, ("CTmTxBase::remove_app_partic removing process (%d,%d) : %p\n",
                 pv_nid, pv_pid, (void *)lp_removed));
    delete lp_removed;
    return true;
}
// erase_app_partic is similar to remove_app_partic but can be called by
// code which reads through the map using get_first, ...
// Pass in the map element pointer. Note that in this case the contents
// of the map pointer is the key.
bool CTmTxBase::erase_app_partic(CTmTxKey *pp_key)
{
    // A key whose id evaluates to zero/false is left alone.
    if (!pp_key->id())
        return false;

    TMTrace (3, ("CTmTxBase::erase_app_partic removing process (%d,%d) : %p\n",
                 pp_key->node(), pp_key->pid(), (void *) pp_key));

    // Drop the map entry first, then free the key object itself.
    iv_app_partic_list.erase(pp_key->id());
    delete pp_key;
    return true;
}
// ---------------------------------------------------------------------------
// CTmTxBase::is_app_partic
// Purpose : report whether the process is in the participant list.
// pv_pid pid of the process
// pv_nid nid of the process
// Returns true if an entry exists for (pv_nid, pv_pid), false otherwise.
// ---------------------------------------------------------------------------
bool CTmTxBase::is_app_partic (int32 pv_pid, int32 pv_nid)
{
    CTmTxKey lv_lookup(pv_nid, pv_pid);
    CTmTxKey *lp_found = (CTmTxKey *)iv_app_partic_list.get(lv_lookup.id());
    return (lp_found != NULL);
}
// ---------------------------------------------------------------------------
// CTmTxBase::cleanup_app_partic
// Purpose : walk iv_app_partic_list and erase (and delete) the entries found,
// visiting at most num_active_partic() of them. A trace warning is written
// if the number visited differs from num_active_partic().
// ---------------------------------------------------------------------------
void CTmTxBase::cleanup_app_partic()
{
// go through our participation list and make sure there are no
// outstanding participants (this is fine as it could have been
// a unilateral abort or shutdown). Just get rid of it.
int32 lv_count = 0;
CTmTxKey *lp_key = NULL;
CTmTxKey *lp_last_key = NULL;
lp_key = (CTmTxKey *) iv_app_partic_list.get_first();
while (lv_count < num_active_partic() && lp_key)
{
lv_count++;
lp_last_key = lp_key;
// Step to the next entry before erasing: erase_app_partic() deletes the
// key object we are currently positioned on.
lp_key = (CTmTxKey *) iv_app_partic_list.get_next();
erase_app_partic(lp_last_key);
}
// Presumably ends the get_first()/get_next() walk started above - confirm
// against the list implementation.
iv_app_partic_list.get_end();
if (lv_count != num_active_partic())
TMTrace(1, ("CTmTxBase::cleanup_app_partic Warning: actual participants "
"%d does not match stored count %d\n",
lv_count, num_active_partic()));
// NOTE(review): the result of this call is discarded, so as written it has
// no visible effect here. If the intent was to reset the stored participant
// count, that is not what this line does - confirm.
num_active_partic();
}
//----------------------------------------------------------------------------
// PendingRequestQ_push
// Purpose : Push a message onto the iv_PendingRequestQ. This method also
// checks whether the CTmTxBase object is associated with a thread. If it
// is not, then we add the transaction to the txdisassociatedQ to make sure
// it is woken when a thread becomes available.
//----------------------------------------------------------------------------
void CTmTxBase::PendingRequestQ_push(CTmTxMessage *pp_msg)
{
    TMTrace (2, ("CTmTxBase::PendingRequestQ_push ENTRY. ID %d, msgid %d.\n",
                 tag(), pp_msg->msgid()));

    PendingRequestQ()->push(pp_msg);

    // With no thread attached, nothing would notice the new request, so the
    // transaction object is also placed on the txdisassociatedQ to be picked
    // up when a thread becomes available.
    const bool lv_noThread = (thread() == NULL);
    if (lv_noThread)
    {
        gv_tm_info.txdisassociatedQ()->push(this);
        if (iv_trace_level >= 3)
            trace_printf ("CTmTxBase::PendingRequestQ_push ID %d, msgid %d. Txn object "
                          "added to txdisassociatedQ. " PFLL " entries on txdisassociatedQ.\n",
                          tag(), pp_msg->msgid(), gv_tm_info.txdisassociatedQ()->size());
    }

    TMTrace (2, ("CTmTxBase::PendingRequestQ_push EXIT. ID %d, msgid %d. "
                 "" PFLL " entries on PendingRequest queue.\n",
                 tag(), pp_msg->msgid(), PendingRequestQ()->size()));
} //CTmTxBase::PendingRequestQ_push
// ----------------------------------------------------------------------------
// CTmTxBase::queueToTransaction
// Purpose : Queues the request against the transaction object.
// Check that we aren't already processing a request which alters the transaction state
// (Begin, Abort, or End). If we are, then put this request on the PendingRequestQ.
// If the transaction is not processing a state altering request, then queue the request
// directly to the transactions event queue.
// Returns true if it successfully queued the message against the transaction
// object. Otherwise it returns false.
// ----------------------------------------------------------------------------
bool CTmTxBase::queueToTransaction(TM_Txid_Internal *pp_transid, CTmTxMessage *pp_msg)
{
    TMTrace (2, ("CTmTxBase::queueToTransaction : ENTRY\n"));

    // There is no need to lock the transaction object here since we're only
    // writing to the event queue; the Q lock is sufficient.
    Qlock();

    // txn_object_good() replies to and deletes pp_msg when it returns false,
    // so the message is only touched again on the success path.
    const bool lv_queued = txn_object_good(pp_transid, pp_msg);
    if (lv_queued)
    {
        TMTrace (3, ("CTmTxBase::queueToTransaction txn ID (%d,%d), "
                     "Request %d queued directly to txn object event queue for msgid(%d), entries in eventQ %ld.\n",
                     node(), seqnum(), pp_msg->requestType(), pp_msg->msgid(), iv_eventQ.size()));
        // The second argument tells eventQ_push whether a thread is attached.
        eventQ_push(pp_msg, (ip_Thread != NULL));
    }

    Qunlock();

    TMTrace (2, ("CTmTxBase::queueToTransaction : EXIT returning %d, entries in eventQ %ld\n",
                 lv_queued, iv_eventQ.size()));
    return lv_queued;
} // CTmTxBase::queueToTransaction
// ----------------------------------------------------------------------------
// CTmTxBase::currRequest
// Purpose : set the current request.
// ----------------------------------------------------------------------------
void CTmTxBase::currRequest(CTmTxMessage * pp_msg)
{
    // Normal case: no request is current, so simply record the new one.
    if (ip_currRequest == NULL)
    {
        ip_currRequest = pp_msg;
        return;
    }

    // A request is already current. Overwriting it is treated as an invalid
    // state: log both message ids, trace, and abort the process.
    tm_log_event(DTM_TMTX_INVALID_STATE_CURR_REQ, SQ_LOG_CRIT, "DTM_TMTX_INVALID_STATE_CURR_REQ",
                 -1, /*error_code*/
                 -1, /*rmid*/
                 -1, /*dtmid*/
                 iv_transid.iv_seq_num, /*seq_num*/
                 ip_currRequest->msgid(), /*msgid*/
                 -1, /*xa_error*/
                 -1, /*pool_size*/
                 -1, /*pool_elems*/
                 -1, /*msg_retries*/
                 -1, /*pool_high*/
                 -1, /*pool_low*/
                 -1, /*pool_max*/
                 -1, /*tx_state*/
                 -1, /*data */
                 -1, /*data1*/
                 -1,/*data2 */
                 NULL,/*string1*/
                 -1, /* node =*/
                 pp_msg->msgid() /*msgid2*/
                 );
    TMTrace (1, ("CTmTxBase::currRequest - Failed to set currRequest for Transaction ID %d, current "
                 "msgid %d, new msgid %d\n",
                 iv_transid.iv_seq_num, ip_currRequest->msgid(), pp_msg->msgid()));
    abort();
} // CTmTxBase::currRequest
// ----------------------------------------------------------------------------
// CTmTxBase::addTimerEvent
// Purpose : Wrapper to simplify the addition of timer events to the timer
// threads event queue by transaction. Events will always be queued back
// against the calling transaction objects event queue.
// pv_type is a message type
// pv_delayInterval is the interval the timer thread will wait before
// posting the event back to the transaction in msec.
// ----------------------------------------------------------------------------
CTmTimerEvent * CTmTxBase::addTimerEvent(TM_MSG_TYPE pv_type, int pv_delayInterval)
{
    TMTrace (2, ("CTmTxBase::addTimerEvent : ENTRY, msg type %d, delay %d\n",
                 pv_type, pv_delayInterval));

    // The event is created against this transaction and its current thread,
    // then handed to the TM timer's event queue. The pointer is returned to
    // the caller as well.
    CTmTimerEvent * const lp_newEvent =
        new CTmTimerEvent(pv_type, thread(), this, pv_delayInterval);
    gv_tm_info.tmTimer()->eventQ_push((CTmEvent *) lp_newEvent);

    return lp_newEvent;
} // CTmTxBase::addTimerEvent
// ----------------------------------------------------------------------------
// CTmTxBase::setAbortTimeout
// Purpose : Set the abort timeout for this transaction. This involves
// creating a new timer event for the timeout.
// pv_timeout is the timeout in seconds.
// -1 = wait forever.
// 0 = use default (gv_tm_info.timeout())
// ----------------------------------------------------------------------------
void CTmTxBase::setAbortTimeout(int32 pv_timeout)
{
    // 0 selects the TM-wide default; any other value is used as supplied.
    const int32 lv_timeout = (pv_timeout != 0) ? pv_timeout : gv_tm_info.timeout();

    TMTrace (2, ("CTmTxBase::setAbortTimeout : ENTRY, timeout %d\n",
                 lv_timeout));

    // Replace, rather than stack, any timeout event already outstanding.
    // NOTE(review): the cancel goes through gv_xaTM.tmTimer() whereas
    // addTimerEvent() queues through gv_tm_info.tmTimer() - confirm both
    // refer to the same timer.
    if (ip_timeoutEvent != NULL)
    {
        gv_xaTM.tmTimer()->cancelEvent(ip_timeoutEvent);
        // Forget the cancelled event so that a "wait forever" (-1) timeout
        // does not leave this member pointing at it, which would make a
        // later call cancel the same event a second time.
        ip_timeoutEvent = NULL;
    }

    // -1 means wait forever: no timer event is created. Otherwise the
    // timeout in seconds is converted to milliseconds for the timer.
    if (lv_timeout != -1)
        ip_timeoutEvent = addTimerEvent(TM_MSG_TXINTERNAL_ROLLBACK, lv_timeout * 1000);
} // CTmTxBase::setAbortTimeout
// ----------------------------------------------------------------------------
// CTmTxBase::addHungTimerEvent
// Purpose : Add a hung timer event for this transaction. Only one of these
// is allowed per transaction at any time.
// pv_type should be TM_MSG_TXINTERNAL_REDRIVEROLLBACK or TM_MSG_TXINTERNAL_REDRIVECOMMIT.
// ----------------------------------------------------------------------------
void CTmTxBase::addHungTimerEvent(TM_MSG_TYPE pv_type)
{
    TMTrace (2, ("CTmTxBase::addHungTimerEvent : ENTRY ID %d, msg type %d\n",
                 seqnum(), pv_type));

    // Only one hung timer event may exist per transaction; if one is already
    // recorded there is nothing to do.
    if (ip_hung_event != 0)
        return;

    const int lv_retryInterval = gv_tm_info.trans_hung_retry_interval();
    ip_hung_event = addTimerEvent(pv_type, lv_retryInterval);
} //addHungTimerEvent
// ----------------------------------------------------------------------------
// CTmTxBase::hung_redrive
// Redrive a hung transaction.
// This routine walks through the global RM info table
// ----------------------------------------------------------------------------
void CTmTxBase::hung_redrive()
{
//RM_Info_TSEBranch *lp_rmInfo; // Txn copy of RM info.
TMTrace (2, ("CTmTxBase::hung_redrive : ENTRY ID %d\n", seqnum()));
TM_MSG_TYPE lv_type;
switch (tx_state())
{
case TM_TX_STATE_HUNGABORTED:
lv_type = TM_MSG_TXINTERNAL_REDRIVEROLLBACK;
break;
case TM_TX_STATE_HUNGCOMMITTED:
lv_type = TM_MSG_TXINTERNAL_REDRIVECOMMIT;
break;
default:
// Ignore, not hung.
return;
} // switch state
/*Commented out as I don't think we mark the RMs as failed over or recovering
in the ia_TSEBranches table.
for (int lv_index = 0; ((lv_index <= ia_TSEBranches.return_highest_index_used()) &&
(lv_index < lv_count)); lv_index++)
{
lp_rmInfo = ia_TSEBranches.get_slot(lv_index);
if (lp_rmInfo->state() == TSEBranch_FAILOVER)
{
lp_rmInfo->state(TSEBranch_UP);
// Set the RM's 'partic' flag to true
safe_initialize_slot(lp_rmInfo->rmid());
inc_prepared_rms();
}
} // for each rm
*/
CTmTxMessage * lp_msg = new CTmTxMessage(lv_type);
queueToTransaction(&iv_transid, lp_msg);
TMTrace (2, ("CTmTxBase::hung_redrive : EXIT\n"));
} //CTmTxBase::hung_redrive
// ------------------------------------------------------------
// CTmTxBase::associate_tx
// Instantiate a transaction thread if required and associate
// it with this transaction.
// Queue the specified event to this transaction object.
// Returns true if a thread has been associated with this
// transaction.
// false if no thread was associated.
// ------------------------------------------------------------
bool CTmTxBase::associate_tx(CTmTxMessage *pp_msg)
{
    // pp_msg is only read for this ENTRY trace; nothing is queued here.
    TMTrace (2, ("CTmTxBase::associate_tx : ENTRY Txn ID (%d,%d) request %d.\n",
                 node(), seqnum(), pp_msg->requestType()));

    bool lv_gotThread = false;

    // Under the transaction thread model a thread object is instantiated
    // and associated with the transaction for its whole life. For any other
    // model no thread is associated here.
    if (gv_tm_info.threadModel() == transaction)
    {
        TMTrace(3, ("CTmTxBase::associate_tx : Instantiating new thread.\n"));
        lv_gotThread = (gv_tm_info.new_thread(this) != NULL);
    }

    TMTrace(2, ("CTmTxBase::associate_tx : EXIT returning threadAssociated %d.\n",
                lv_gotThread));
    return lv_gotThread;
} //associate_tx
// ------------------------------------------------------------
// CTmTxBase::recover_tx
// This is used during recovery to instantiate and associate
// a thread with this transaction, and queue a request against
// it.
// ------------------------------------------------------------
void CTmTxBase::recover_tx(TM_MSG_TYPE pv_type)
{
    TMTrace (2, ("CTmTxBase::recover_tx : ENTRY Txn ID (%d,%d) request %d.\n",
                 node(), seqnum(), pv_type));

    CTmTxMessage * lp_recoveryReq = new CTmTxMessage(pv_type);
    const bool lv_hasThread = associate_tx(lp_recoveryReq);

    stats()->txnTotal()->start();

    // Set the appropriate state before queuing the request. Only the two
    // redrive request types are accepted; anything else is fatal.
    if (pv_type == TM_MSG_TXINTERNAL_REDRIVEROLLBACK)
    {
        stats()->txnAbort()->start();
        tx_state(TM_TX_STATE_ABORTING);
    }
    else if (pv_type == TM_MSG_TXINTERNAL_REDRIVECOMMIT)
    {
        stats()->txnCommit()->start();
        tx_state(TM_TX_STATE_COMMITTING);
    }
    else
    {
        // Should never get here
        TMTrace(1, ("CTmTxBase::recover_tx : PROGRAMMING ERROR!\n"));
        tm_log_event(DTM_RECOV_BAD_TYPE, SQ_LOG_CRIT, "DTM_RECOV_BAD_TYPE");
        abort();
    }

    // Setting the transactionBusy must succeed here because no one
    // else knows about the transaction yet!
    Qlock();
    const bool lv_claimed = set_transactionBusy(lp_recoveryReq);
    Qunlock(); //No need to wait here, we will only queue against eventQ

    if (!lv_claimed)
    {
        TMTrace(1, ("CTmTxBase::recover_tx : ERROR transaction collision - txn ID (%d,%d) busy!\n",
                    node(), seqnum()));
        tm_log_event(DTM_RECOV_TRANSACTION_COLLISION, SQ_LOG_CRIT, "DTM_RECOV_TRANSACTION_COLLISION",
                     -1, -1, node(), seqnum());
        abort();
    }

    // Queue the recovery request against the transaction object
    eventQ_push(lp_recoveryReq, lv_hasThread);

    TMTrace(2, ("CTmTxBase::recover_tx : EXIT.\n"));
} //recover_tx
// --------------------------------------------------------------
// process_eventQ
// --------------------------------------------------------------
void CTmTxBase::process_eventQ()
{
    // Base-class implementation: it only traces and then aborts the process.
    // The thread name is looked up only when a thread is actually attached,
    // so the trace itself cannot dereference a NULL ip_Thread before the
    // abort is reached.
    TMTrace (2, ("CTmTxBase::process_eventQ : BASE CLASS! ID %d, thread %s.\n",
                 iv_tag,
                 ((gv_tm_info.multithreaded())
                      ? ((ip_Thread) ? ip_Thread->get_name() : "NO THREAD")
                      : "SINGLE THREAD")));
    abort();
}
// -------------------------------------------------------------
// schedule_eventQ
// -------------------------------------------------------------
void CTmTxBase::schedule_eventQ()
{
    // Base-class implementation: trace the tag and state, then abort.
    TMTrace (2, ("CTmTxBase::schedule_eventQ BASE CLASS! ID %d, state %d.\n",
                 iv_tag,
                 iv_tx_state));
    abort();
}
// --------------------------------------------------------------
// schedule_abort
// --------------------------------------------------------------
int32 CTmTxBase::schedule_abort()
{
    // Base-class implementation: trace the transaction id and state, then
    // abort. The return statement only satisfies the signature.
    TMTrace (2, ("CTmTxBase::schedule_abort BASE CLASS!, Txn ID (%d,%d), state %d.\n",
                 node(), seqnum(), iv_tx_state));
    abort();
    return FEOK;
}
// --------------------------------------------------------------
// doom_txn
// --------------------------------------------------------------
short CTmTxBase::doom_txn()
{
    // Base-class implementation: trace the sequence number and state, then
    // abort. The return statement only satisfies the signature.
    TMTrace (2, ("CTmTxBase::doom_txn BASE CLASS!, ID %d, state %d.\n",
                 seqnum(), iv_tx_state));
    abort();
    return FEOK;
}
// --------------------------------------------------------------
// redriverollback_txn
// --------------------------------------------------------------
bool CTmTxBase::redriverollback_txn(CTmTxMessage * pp_msg)
{
    // Base-class implementation: pp_msg is not used. Trace the sequence
    // number, then abort. The return statement only satisfies the signature.
    TMTrace (2, ("CTmTxBase::redriverollback_txn BASE CLASS!: ID %d.\n", seqnum()));
    abort();
    return false;
}