blob: 6722ca4ce172133bc6316668c707c6594a789989 [file] [log] [blame]
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
#include <stdlib.h>
#include "seabed/thread.h"
#include "tmtxthread.h"
#include "tmlogging.h"
#include "tmtx.h"
#include "tminfo.h"
#include "seabed/trace.h"
#include "hbasetm.h"
// Globals
//----------------------------------------------------------------------------
// CTxThread::CTxThread
// Purpose : Constructor.
// Passes the thread entry function, thread number and thread name through to
// the CTmThread base class and starts with iv_id set to 0.  The body then
// calls delete_exit(false) and set_daemon(true), and finally reset() so the
// object begins with no transaction and in the idle state.
//   pv_fun  - thread entry function handed to CTmThread.
//   pv_num  - thread number handed to CTmThread.
//   pp_name - thread name handed to CTmThread (also traced on entry).
//----------------------------------------------------------------------------
CTxThread::CTxThread(SB_Thread::Sthr::Function pv_fun, int64 pv_num, const char *pp_name)
    : CTmThread(pv_fun, pv_num, pp_name),
      iv_id(0)
{
    TMTrace(2, ("CTxThread::CTxThread: ENTRY thread object %p(%s)\n",
                (void *) this, pp_name));

    //EID(EID_CTxThread);

    delete_exit(false);
    set_daemon(true);

    // Start out with no associated transaction and an idle state.
    reset();
}
//----------------------------------------------------------------------------
// CTxThread::~CTxThread
// Purpose : Destructor.
// Clears the transaction pointer and asks the global CHbaseTM object to
// detach the thread.  A non-zero return from detachThread() is traced, logged
// as DTM_HBASE_DETACHTHREAD_FAILED and then the process is aborted.
//----------------------------------------------------------------------------
CTxThread::~CTxThread()
{
    TMTrace(2, ("CTxThread::~CTxThread : ENTRY.\n"));

    ip_txn = NULL;

    const int lv_detachError = gv_HbaseTM.detachThread();
    if (lv_detachError == 0)
    {
        // Detach succeeded, nothing more to do.
        return;
    }

    // Detach failed: report it and give up.
    TMTrace(1,("CTxThread::~CTxThread: CHbaseTM::detachThread failed with error %d.\n", lv_detachError));
    tm_log_event(DTM_HBASE_DETACHTHREAD_FAILED, SQ_LOG_CRIT, "DTM_HBASE_DETACHTHREAD_FAILED", lv_detachError);
    abort();
}
//----------------------------------------------------------------------------
// CTxThread::reset
// Purpose : Return this thread object to its initial values: no transaction
// pointer and state TM_TX_TH_STATE_IDLE.
// Note that no lock is taken here.
//----------------------------------------------------------------------------
void CTxThread::reset()
{
    ip_txn   = NULL;                    // no transaction attached
    iv_state = TM_TX_TH_STATE_IDLE;     // idle again
} //CTxThread::reset
//----------------------------------------------------------------------------
// CTxThread::constructPoolElement
// Purpose : Callback for CTmPool elements.
// This method is called to construct a CTxThread object by CTmPool::newElement.
// Builds the thread name "workTh<pv_threadNum>", instantiates a CTxThread
// that runs txThread_main, and starts it.
//   pv_threadNum - thread number; used both in the thread name and as the
//                  number passed to the CTxThread constructor.
// Returns the newly created (and already started) thread object.
// Aborts the process if the thread object could not be instantiated.
//----------------------------------------------------------------------------
CTxThread * CTxThread::constructPoolElement(int64 pv_threadNum)
{
    // "workTh" (6 chars) + the widest signed 64-bit decimal (20 chars incl.
    // sign) + terminating NUL needs 27 bytes, so a 20 byte buffer could be
    // overrun.  Use a buffer that always fits and bound the write anyway.
    // NOTE(review): assumes PFLL is the printf conversion for a 64-bit
    // integer, as its use with an int64 argument here implies - confirm.
    char lv_name[32];
    snprintf(lv_name, sizeof(lv_name), "workTh" PFLL "", pv_threadNum);

    TMTrace (2, ("CTxThread::constructPoolElement : ENTRY Instantiating new txn thread object %s.\n",
                 lv_name));

    // NOTE(review): lv_name lives on this stack frame; presumably the
    // CTmThread base copies the name rather than keeping the pointer - confirm.
    CTxThread *lp_Thread = new CTxThread(txThread_main, pv_threadNum, lv_name);

    if (lp_Thread)
    {
        lp_Thread->start();
    }
    else
    {
        // Defensive: only reachable if operator new can return NULL in this build.
        tm_log_event(DTM_LOGIC_ERROR, SQ_LOG_CRIT, "DTM_LOGIC_ERROR",
                     -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,lv_name);
        TMTrace (1, ("CTxThread::constructPoolElement : Failed to instantiate transaction thread object.\n"));
        abort();
    }

    TMTrace (2, ("CTxThread::constructPoolElement : EXIT Thread object %p(%s) instantiated.\n",
                 (void *) lp_Thread, lv_name));
    return lp_Thread;
} //CTxThread::constructPoolElement
//----------------------------------------------------------------------------
// CTxThread::cleanPoolElement
// Purpose : Callback for CTmPool elements.
// Invoked to scrub a CTxThread object that CTmPool::newElement took from the
// freeList.  The object is reset() and its thread number is handed back so it
// can be reused as the key when the thread object is added to the
// CTmPool<CTxThread> inUseList.
//----------------------------------------------------------------------------
int64 CTxThread::cleanPoolElement()
{
    // Drop any stale transaction pointer / state before reuse.
    reset();

    const int64 lv_reusedThreadNum = threadNum();
    return lv_reusedThreadNum;
} //CTxThread::cleanPoolElement
//----------------------------------------------------------------------------
// CTxThread::associate
// Purpose : Associate a transaction object with this thread.
// Under this object's lock()/unlock() the transaction pointer is stored, the
// transaction is pointed back at this thread, its threadPending flag is
// cleared and the thread state becomes TM_TX_TH_STATE_ACTIVE.
// Afterwards every entry for the same transaction is erased from the
// disassociated transaction queue held by gv_tm_info.
//   pp_txn - transaction to associate; dereferenced without a NULL check.
//----------------------------------------------------------------------------
void CTxThread::associate(TM_TX_Info * pp_txn)
{
    TMTrace(2, ("CTxThread::associate ENTRY, txn tag=%d\n", pp_txn->tag()));

    // Bind the transaction and this thread to each other.
    lock();
    ip_txn = pp_txn;
    ip_txn->thread(this);
    ip_txn->threadPending(false);
    iv_state = TM_TX_TH_STATE_ACTIVE;
    unlock();

    // Now that an association exists, purge this transaction from the
    // disassociatedTxn list.  It may appear there more than once.
    int lv_removed = 0;
    gv_tm_info.txdisassociatedQ()->lock();
    for (TM_TX_Info *lp_entry = (TM_TX_Info *) gv_tm_info.txdisassociatedQ()->get_firstFIFO();
         lp_entry != NULL;
         lp_entry = (TM_TX_Info *) gv_tm_info.txdisassociatedQ()->get_nextFIFO())
    {
        if (lp_entry != pp_txn)
        {
            continue;   // some other transaction, leave it queued
        }

        lv_removed++;
        TMTrace(3, ("CTxThread::associate : Txn ID (%d,%d) found on disassociated "
                    "transaction list, deleting it (entry %d).\n",
                    pp_txn->node(), pp_txn->seqnum(), lv_removed));
        gv_tm_info.txdisassociatedQ()->erase(); //delete txn from list
    }
    gv_tm_info.txdisassociatedQ()->unlock();

    TMTrace(2, ("CTxThread::associate EXIT. Removed %d entries from disassociatedTxnQ\n", lv_removed));
}
//----------------------------------------------------------------------------
// CTxThread::disassociate
// Purpose : Disassociate the transaction object from this thread.
// Returns true if the transaction was successfully disassociated from the
// thread.
// false if the transaction still has queued events outstanding. This
// will only happen when using worker threads.
//----------------------------------------------------------------------------
bool CTxThread::disassociate()
{
bool lv_cleanedup = false;
TM_TX_Info *lp_disassociatedTxn = NULL;
TMTrace(2, ("CTxThread::disassociate ENTRY, ID %d, Thread %s(%ld)\n",
ip_txn->seqnum(), get_name(), get_id()));
if(ip_txn == NULL) {
TMTrace (1, ("CTxThread::disassociate : ERROR! ip_txn is NULL\n"));
return false;
}
// Grab the transactions Qmutex here to make sure no one can queue new requests against
// this transaction until we've finished disassociating.
ip_txn->Qlock();
// Now that we've locked the event queues, check to see if they're empty
// This can happen when a request arrives and is queued between reset_transactionBusy
// completing and disassociate being called.
// Note this only applies to worker threads. Transaction threads will be finished when
// we disassociate.
if (!ip_txn->eventQ()->empty() || !ip_txn->PendingRequestQ()->empty())
{
ip_txn->Qunlock();
if (ip_txn->eventQ()->empty())
{
// Check the PendingRequest queue
CTmTxMessage *lp_msg = (CTmTxMessage *) ip_txn->PendingRequestQ()->pop_end();
if (lp_msg)
{
TMTrace (3, ("CTxThread::disassociate : ID %d"
" request popped off PendingRequestQ for msgid(%d).\n",
ip_txn->seqnum(), lp_msg->msgid()));
ip_txn->queueToTransaction(ip_txn->transid(), lp_msg);
}
else
{
tm_log_event(DTM_LOGIC_ERROR_NO_ENTRY, SQ_LOG_CRIT, "DTM_LOGIC_ERROR_NO_ENTRY",
-1,-1,-1,ip_txn->seqnum());
TMTrace (1, ("CTxThread::disassociate : LOGIC ERROR! ID %d "
"Expected to find an entry in eventQ or PendingRequestQ "
"for txn.\n", ip_txn->seqnum()));
abort();
}
}
TMTrace(2, ("CTxThread::disassociate EXIT returning false(events queued, not disassociated)\n"));
return false;
}
lock();
// Save the transaction object pointer so that we can unlock later.
if(ip_txn != NULL)
lp_disassociatedTxn = ip_txn;
// This would indicate that we are aborting a transaction on
// behalf of another TM. We do not have an internal rep for
// this, so don't try to delete it
if (ip_txn != NULL && ip_txn->dec_cleanup() && ip_txn->tag() != -1)
{
ip_txn->Qunlock();
gv_tm_info.cleanup(ip_txn);
// Can't use ip_txn after cleanup because it may have been deleted!
ip_txn = NULL;
lv_cleanedup = true;
}
else
if (gv_tm_info.iv_trace_level >= 3)
{
//There's a possibility that ip_txn could be NULL here, so check
//for that before accessing fields within it.
if (ip_txn == NULL)
trace_printf("CTxThread::disassociate not cleaning up. Null Tx\n");
else
trace_printf("CTxThread::disassociate not cleaning up. Tx cleanup flag %d.\n",
ip_txn->list_cleanup());
}
// For worker threads we need to clear the thread pointer from the transaction object
// each time we disassociate.
if (gv_tm_info.threadModel() == worker && ip_txn != NULL && ip_txn->thread())
ip_txn->thread(NULL);
ip_txn = NULL;
iv_state = TM_TX_TH_STATE_IDLE;
unlock();
// Release the transaction event queues.
if (lv_cleanedup == false)
lp_disassociatedTxn->Qunlock();
TMTrace(2, ("CTxThread::disassociate EXIT returning true(disassociated)\n"));
return true;
} //disassociate
// --------------------------------------------------------------
// eventQ_push
// Purpose - queue a new event for this thread via eventQ()->push()
// and then signal the event queue condition variable.
// The header of this file describes these pushes as FIFO order.
// An event whose request() is NULL is treated as fatal: it is
// logged (DTM_THRD_INVALID_MSG_TYPE), traced, and the process
// aborts.
// --------------------------------------------------------------
void CTxThread::eventQ_push(CTmEvent *pp_event)
{
    Tm_Req_Msg_Type *lp_request = pp_event->request();

    if (!lp_request)
    {
        tm_log_event(DTM_THRD_INVALID_MSG_TYPE, SQ_LOG_CRIT, "DTM_THRD_INVALID_MSG_TYPE");
        TMTrace(1, ("CTxThread::eventQ_push - Request to be queued is NULL.\n"));
        abort ();
    }

    eventQ()->push(pp_event);

    TMTrace(3, ("CTxThread::eventQ_push : signaling thread, event %d.\n",
                lp_request->iv_msg_hdr.rr_type.request_type));

    // Signal the event queue condition variable.
    eventQ_CV()->signal(true /*lock*/);
} //CTxThread::eventQ_push
// --------------------------------------------------------------
// eventQ_push_top
// Purpose - queue an event via eventQ()->push_back() so that it is
// handled ahead of everything else already queued.  Intended for
// TM internal messages that must be processed first, such as an
// Initialize.
// An event whose request() is NULL is treated as fatal: it is
// logged (DTM_THRD_INVALID_MSG_TYPE), traced, and the process
// aborts.
// --------------------------------------------------------------
void CTxThread::eventQ_push_top(CTmEvent *pp_event)
{
    Tm_Req_Msg_Type *lp_request = pp_event->request();

    if (!lp_request)
    {
        tm_log_event(DTM_THRD_INVALID_MSG_TYPE, SQ_LOG_CRIT, "DTM_THRD_INVALID_MSG_TYPE");
        TMTrace(1, ("CTxThread::eventQ_push_top - Request to be queued is NULL.\n"));
        abort ();
    }

    eventQ()->push_back(pp_event);

    TMTrace(3, ("CTxThread::eventQ_push_top : signaling thread, event %d.\n",
                lp_request->iv_msg_hdr.rr_type.request_type));

    // Signal the event queue condition variable.
    eventQ_CV()->signal(true /*lock*/);
} //CTxThread::eventQ_push_top
// --------------------------------------------------------------
// eventQ_pop
// Purpose - take the next event off the end of the queue with
// eventQ()->pop_end().  While the queue is empty the caller
// waits on the event queue condition variable, re-checking the
// queue after every wakeup.
// The caller owns the returned event and must dispose of it
// once finished with it!
// --------------------------------------------------------------
CTmEvent * CTxThread::eventQ_pop()
{
    // Wait until there is something to hand out.
    for (;;)
    {
        if (!eventQ()->empty())
        {
            break;
        }
        TMTrace(3, ("CTxThread::eventQ_pop event Q empty, waiting for signal.\n"));
        eventQ_CV()->wait(true /*lock*/);
    }

    CTmEvent *lp_next = (CTmEvent *) eventQ()->pop_end();

    TMTrace(3, ("CTxThread::eventQ_pop Thread %s returning event %d.\n",
                get_name(), lp_next->request()->iv_msg_hdr.rr_type.request_type));

    return lp_next;
} //CTxThread::eventQ_pop
//----------------------------------------------------------------------------
// txThread_main
// Purpose : Main for transaction threads
// Looks up its own CTxThread object by thread name, records its thread id,
// calls gv_HbaseTM.initJVM() and then loops popping events from the thread's
// event queue until told to exit:
//   TM_MSG_TXTHREAD_INITIALIZE - associate the transaction carried in the
//       request, then alternate process_eventQ() / disassociate() until
//       disassociate() succeeds.  Receiving this while not idle terminates
//       the thread.
//   TM_MSG_TXTHREAD_RELEASE    - unexpected; logged and otherwise ignored.
//   TM_MSG_TXTHREAD_TERMINATE  - remove any associated transaction and exit.
//   anything else              - logged, thread terminates.
// After each event the thread is offered back to the pool through
// gv_tm_info.release_thread(), whose result also decides whether to exit.
//   arg - unused.
// Always returns NULL.  Aborts the process if the thread object cannot be
// found or initJVM() fails.
//----------------------------------------------------------------------------
void * txThread_main(void *arg)
{
    char       la_buf[DTM_STRING_BUF_SIZE];
    char      *lp_threadname = SB_Thread::Sthr::self_name();
    CTmEvent  *lp_event = NULL;
    CTxThread *lp_thread = NULL;
    bool       lv_exit = false;
    bool       lv_disassociated = false;

    (void) arg;   // unused; avoids the self-assignment previously used to silence warnings

    TMTrace(2, ("txThread_main : ENTRY for thread %s.\n", lp_threadname));

    //Waiting for the threadpool to be unlocked to continue
    gv_tm_info.threadPool()->lock();
    gv_tm_info.threadPool()->unlock();

    lp_thread = gv_tm_info.get_thread(lp_threadname);
    if (lp_thread == NULL)
    {
        tm_log_event(DTM_THRD_INVALID_THREAD, SQ_LOG_CRIT, "DTM_THRD_INVALID_THREAD");
        TMTrace(1, ("txThread_main : DEATH, couldn't find my thread %s!\n", lp_threadname));
        fflush(stdout);
        abort ();
    }

    lp_thread->set_id(SB_Thread::Sthr::self_id());
    lp_thread->state(TM_TX_TH_STATE_IDLE);

    // Attach the thread to the JVM for HBase TM Library.
    int lv_error = gv_HbaseTM.initJVM();
    if (lv_error)
    {
        TMTrace(1,("txThread_main: CHbaseTM::attachThread failed with error %d.\n", lv_error));
        tm_log_event(DTM_HBASE_ATTACHTHREAD_FAILED, SQ_LOG_CRIT, "DTM_HBASE_ATTACHTHREAD_FAILED", lv_error);
        abort();
    }
    else
    {
        // Trace prefix names this function; it previously claimed to come
        // from the CTxThread constructor.
        TMTrace(1,("txThread_main: CHbaseTM::attachThread succeeded for thread " PFLL "(%s).\n", lp_thread->threadNum(), lp_threadname));
    }

    while (!lv_exit)
    {
        lp_event = lp_thread->eventQ_pop();

        switch (lp_event->requestType())
        {
        case TM_MSG_TXTHREAD_INITIALIZE:
        {
            if (lp_thread->state() == TM_TX_TH_STATE_IDLE)
            {
                TM_TX_Info *lp_tx = (TM_TX_Info *) lp_event->request()->u.iv_init_txthread.ip_txObject;
                TMTrace(3, ("txThread_main : Initialize Thread Obj %p(%s), Txn Obj %p, ID %d\n",
                            (void *) lp_thread, lp_thread->get_name(),
                            (void *) lp_tx, lp_tx->seqnum()));
                lp_thread->associate(lp_tx);

                // disassociate() returns false while the transaction still
                // has queued work, so keep draining until it lets go.
                do
                {
                    lp_tx->process_eventQ();
                    lv_disassociated = lp_thread->disassociate();
                } while (!lv_disassociated);
            }
            else
            {
                lv_exit = true;
                // EMS DTM_TXTHREAD_BAD_STATE
                // Bounded write: the thread name length is not known here.
                snprintf(la_buf, sizeof(la_buf),
                         "Txn Thread %s main received Initialize event but not in "
                         "Idle state, terminating.\n", lp_threadname);
                TMTrace(3, ("txThread_main : %s", la_buf));
                tm_log_event(DTM_TXTHREAD_BAD_STATE, SQ_LOG_CRIT, "DTM_TXTHREAD_BAD_STATE",
                             -1, /*error_code*/
                             -1, /*rmid*/
                             -1, /*dtmid*/
                             -1, /*seq_num*/
                             -1, /*msgid*/
                             -1, /*xa_error*/
                             -1, /*pool_size*/
                             -1, /*pool_elems*/
                             -1, /*msg_retries*/
                             -1, /*pool_high*/
                             -1, /*pool_low*/
                             -1, /*pool_max*/
                             -1, /*tx_state*/
                             -1, /*data */
                             -1, /*data1*/
                             -1,/*data2 */
                             lp_threadname);
            }
            break;
        } //case INITIALIZE

        case TM_MSG_TXTHREAD_RELEASE:
            // Nothing to do here as we aren't using this right now.
            // If we receive a release here while in idle state, we just ignore it.
            // EMS DTM_TXTHREAD_BAD_EVENT_RELEASE
            snprintf(la_buf, sizeof(la_buf),
                     "Txn Thread %s main received an unexpected Release event "
                     "in idle state.\n", lp_threadname);
            TMTrace(3, ("txThread_main : %s", la_buf));
            tm_log_event(DTM_TXTHREAD_BAD_EVENT_RELEASE, SQ_LOG_CRIT,
                         "DTM_TXTHREAD_BAD_EVENT_RELEASE",
                         -1, /*error_code*/
                         -1, /*rmid*/
                         -1, /*dtmid*/
                         -1, /*seq_num*/
                         -1, /*msgid*/
                         -1, /*xa_error*/
                         -1, /*pool_size*/
                         -1, /*pool_elems*/
                         -1, /*msg_retries*/
                         -1, /*pool_high*/
                         -1, /*pool_low*/
                         -1, /*pool_max*/
                         -1, /*tx_state*/
                         -1, /*data */
                         -1, /*data1*/
                         -1,/*data2 */
                         lp_threadname);
            break;

        case TM_MSG_TXTHREAD_TERMINATE:
            TMTrace(3, ("txThread_main : Thread %s processing terminate request.\n",
                        lp_threadname));
            lv_exit = true;
            // If there was an associated transaction, delete it now
            if (lp_thread->transaction() != NULL)
                gv_tm_info.remove_tx(lp_thread->transaction());
            break;

        default:
            lv_exit = true;
            // EMS DTM_TXTHREAD_BAD_EVENT
            TMTrace(3, ("txThread_main : Txn Thread %s main received an unexpected event %d, "
                        "terminating.\n", lp_threadname, lp_event->requestType()));
            tm_log_event(DTM_TXTHREAD_BAD_EVENT, SQ_LOG_CRIT,
                         "DTM_TXTHREAD_BAD_EVENT",
                         -1, /*error_code*/
                         -1, /*rmid*/
                         -1, /*dtmid*/
                         -1, /*seq_num*/
                         -1, /*msgid*/
                         -1, /*xa_error*/
                         -1, /*pool_size*/
                         -1, /*pool_elems*/
                         -1, /*msg_retries*/
                         -1, /*pool_high*/
                         -1, /*pool_low*/
                         -1, /*pool_max*/
                         -1, /*tx_state*/
                         lp_event->requestType(), /*data */
                         -1, /*data1*/
                         -1,/*data2 */
                         lp_threadname);
            break;
        } //switch

        //Now that we're done with the event, dispose of it.
        delete lp_event;
        lp_event = NULL;

        // Check to see if we have pending work for any transactions not
        // currently associated with a thread.  If there are no transaction
        // objects with queued events waiting for a thread, then release this
        // thread back to the pool.
        if (lv_exit == false)
            lv_exit = gv_tm_info.release_thread(lp_thread);
    } //while

    gv_tm_info.threadPool()->dec_totalElements();

    TMTrace(2, ("txThread_main : EXIT for thread %p(%s). Thread terminating. %d txn "
                "threads still active.\n",
                (void *) lp_thread, lp_threadname, gv_tm_info.threadPool()->totalElements()));
    return NULL;
} //txThread_main