| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| |
| #include <sys/types.h> |
| #include <stdlib.h> |
| #include <string.h> |
| |
| #include "seabed/ms.h" |
| #include "rm.h" |
| #include "tmtx.h" |
| #include "xatmmsg.h" |
| #include "xatmlib.h" |
| #include "seabed/trace.h" |
| #include "tmlogging.h" |
| #include "tmxatxn.h" |
| #include "tminfo.h" |
| #include "tmxidmap.h" |
| |
| extern XID_MAP *gp_xarmXIDList; |
| |
| // ----------------------------------------------------------------- |
| // helper methods |
| // ----------------------------------------------------------------- |
| |
| // ------------------------------------- |
| // CTmXaTxn Methods |
| // ------------------------------------- |
| |
| // ------------------------------------- |
| // CTmXaTxn constructor |
| // Purpose : calls initialize |
| // ------------------------------------- |
| CTmXaTxn::CTmXaTxn(int32 pv_nid, int64 pv_flags, int32 pv_trace_level, |
| int32 pv_seq, int32 pv_pid, int32 pv_rm_wait_time) |
| { |
| //EID(EID_TM_TX_INFO); |
| // Initialize iv_trace_level and iv_lock_count here because they're used in calls |
| // to lock(). |
| iv_trace_level = pv_trace_level; |
| // Mutex attributes: Recursive = true, ErrorCheck=false |
| ip_mutex = new TM_Mutex(true, false); |
| iv_ender_nid = pv_nid; //TMs nid is the default for the ender nid. |
| ip_timeoutEvent = NULL; |
| ip_hung_event = NULL; |
| ip_Thread = NULL; |
| ip_Qlock_owner = NULL; |
| ip_currRequest = NULL; |
| |
| memset(&iv_transid, 0, sizeof(TM_Txid_Internal)); |
| iv_tag = 0; |
| iv_tx_state = TM_TX_STATE_NOTX; |
| iv_ender_pid = 0; |
| iv_num_active_partic.set_val(0); |
| iv_prepared_rms = 0; |
| iv_cleanup_sem = 0; |
| iv_rm_wait_time = 0; |
| iv_in_use = false; |
| iv_incremented = false; |
| iv_mark_for_rollback = false; |
| iv_tm_aborted = false; |
| iv_tse_aborted = false; |
| iv_appl_aborted = false; |
| iv_heur_aborted = false; |
| iv_read_only = false; |
| iv_use_ext_transid = false; |
| iv_written_in_cp = false; |
| iv_wrote_trans_state = false; |
| iv_recovering = false; |
| iv_transactionBusy = false; |
| iv_timeout = 0; |
| |
| ip_branches = new CTmTxBranches(); |
| |
| // Initialize separately from TM_Info::new_tx as we don't have the creator nid,pid when called by |
| // the tmPool constructor. |
| // this->initialize(pv_nid, pv_flags, pv_trace_level, pv_seq, pv_pid, pv_rm_wait_time); |
| } |
| |
| // ------------------------------------- |
| // CTmXaTxn destructor |
| // Purpose : |
| // ------------------------------------- |
| CTmXaTxn::~CTmXaTxn() |
| { |
| TMTrace (2, ("CTmXaTxn::~CTmXaTxn : ENTRY.\n")); |
| } |
| |
| |
| //---------------------------------------------------------------------------- |
| // CTmXaTxn::constructPoolElement |
| // Purpose : Callback for CTmPool elements. |
| // This method is called to construct a CTmXaTxn object by CTmPool::newElement. |
| //---------------------------------------------------------------------------- |
| CTmXaTxn * CTmXaTxn::constructPoolElement(int64 pv_id) |
| { |
| CTmTxKey k(pv_id); |
| |
| TMTrace (2, ("CTmXaTxn::constructPoolElement : ENTRY Instantiating new transaction object, ID (%d,%d).\n", |
| k.node(), k.seqnum())); |
| |
| CTmXaTxn *lp_tx = new CTmXaTxn(k.node(), 0, gv_tm_info.iv_trace_level, |
| k.seqnum(), 0, gv_tm_info.rm_wait_time()); |
| if (!lp_tx) |
| { |
| tm_log_event(DTM_LOGIC_ERROR_TX_OBJ, SQ_LOG_CRIT, "DTM_LOGIC_ERROR_TX_OBJ"); |
| TMTrace (1, ("CTmXaTxn::constructPoolElement : Failed to instantiate transaction object ID (%d,%d)\n", |
| k.node(), k.seqnum())); |
| abort(); |
| } |
| |
| TMTrace (2, ("CTmXaTxn::constructPoolElement : EXIT transaction object %p, ID (%d,%d) instantiated.\n", |
| (void *) lp_tx, k.node(), k.seqnum())); |
| return lp_tx; |
| } //CTmXaTxn::constructPoolElement |
| |
| |
| // --------------------------------------------------------- |
| // cleanup |
| // Purpose : Prepare transaction object for reuse. |
| // This is called cleanPoolElement. |
| // --------------------------------------------------------- |
| void CTmXaTxn::cleanup() |
| { |
| TMTrace (2, ("CTmXaTxn::cleanup : ENTRY transaction object %p, ID (%d,%d).\n", |
| (void *) this, node(), seqnum())); |
| lock(); |
| iv_txnType = TM_TX_TYPE_XARM; |
| iv_txnObj.ip_xaTxn = this; |
| iv_rmid = -1; |
| memset(&iv_XID, 0, sizeof(iv_XID)); |
| iv_tx_state = TM_TX_STATE_NOTX; |
| iv_in_use = false; |
| |
| if (ip_timeoutEvent != NULL) |
| { |
| gv_xaTM.tmTimer()->cancelEvent(ip_timeoutEvent); |
| ip_timeoutEvent = NULL; |
| } |
| if (num_active_partic()) |
| cleanup_app_partic(); |
| unlock(); |
| } |
| |
| |
| // ------------------------------------- |
| // initialize |
| // Purpose : Initialize this instance |
| // ------------------------------------- |
| void CTmXaTxn::initialize(int32 pv_nid, int64 pv_flags, int32 pv_trace_level, |
| int32 pv_seq, int32 pv_creator_nid, int32 pv_creator_pid, |
| int32 pv_rm_wait_time) |
| { |
| TMTrace(2, ("CTmXaTxn::initialize : ENTRY ID (%d,%d).\n",pv_nid, pv_seq)); |
| |
| // transid setup |
| lock(); |
| iv_txnType = TM_TX_TYPE_XARM; |
| iv_txnObj.ip_xaTxn = this; |
| iv_rmid = -1; |
| memset(&iv_XID, 0, sizeof(iv_XID)); |
| |
| CTmTxBase::initialize(pv_nid, pv_flags, pv_trace_level, pv_seq, pv_creator_nid, |
| pv_creator_pid, pv_rm_wait_time); |
| unlock(); |
| } // initialize |
| |
| |
| |
| // --------------------------------------------------------------- |
| // Transactional State Machine and helper |
| // --------------------------------------------------------------- |
| |
| TM_TX_STATE CTmXaTxn::state_change_abort_helper(CTmTxMessage * pp_msg) |
| { |
| int32 lv_error = XA_OK; |
| TM_TX_STATE lv_nextState = tx_state(); |
| |
| if (iv_wrote_trans_state == false) |
| { |
| gv_tm_info.write_trans_state(&iv_transid, TM_TX_STATE_ABORTED, abort_flags(), true); |
| iv_wrote_trans_state = true; |
| } |
| lv_error = branches()->rollback_branches(this, TT_flags(), pp_msg, (iv_tm_aborted | iv_tse_aborted)); |
| |
| switch (lv_error) |
| { |
| case XA_OK: |
| { |
| // Set the state to aborted now to allow SQL to continue |
| lv_nextState = TM_TX_STATE_ABORTED; |
| break; |
| } |
| case XA_HEURHAZ: |
| case XA_HEURCOM: |
| case XA_HEURRB: |
| case XA_HEURMIX: |
| { |
| // Got heuristic results from some RMs. |
| // Send xa_forget to those RMs. |
| // We set iv_partic to false for all branches that completed ok. |
| lv_nextState = TM_TX_STATE_FORGETTING_HEUR; |
| break; |
| } |
| case XAER_RMFAIL: |
| { |
| // At least one RM responded XAER_RMFAIL, failed to respond, or we |
| // encountered an unrecoverable Seabed error. |
| /*bug 1823 1/13/11: Commented out event as this event is written every time |
| we redrive rollback and can cause problems when a TSE is down. |
| tm_log_event(DTM_TMTX_TX_HUNGABORTED, SQ_LOG_WARNING, "DTM_TMTX_TX_HUNGABORTED", |
| -1,-1,-1,seqnum());*/ |
| TMTrace (1, ("CTmXaTxn::state_change_abort_helper - At least one RM failed to " |
| " respond or connection lost. " |
| "Transaction %d in aborted state placed in hungAborted state.\n", |
| seqnum())); |
| lv_nextState = TM_TX_STATE_HUNGABORTED; |
| gv_tm_info.inc_tx_hung_count(); |
| //reset_transactionBusy(); |
| // Set up retry |
| addHungTimerEvent(TM_MSG_TXINTERNAL_REDRIVEROLLBACK); |
| break; |
| } |
| default: // some unexpected error |
| { |
| tm_log_event(DTM_TMTX_TX_HUNGABORTED_ERROR, SQ_LOG_WARNING, "DTM_TMTX_TX_HUNGABORTED_ERROR", |
| lv_error); |
| TMTrace (1, ("CTmXaTxn::state_change_abort_helper - Unexpected XA error %d " |
| " returned by xa_rollback for aborted branch. " |
| "Transaction %d placed in hungAborted state.\n", |
| lv_error, seqnum())); |
| lv_nextState = TM_TX_STATE_HUNGABORTED; |
| gv_tm_info.inc_tx_hung_count(); |
| //reset_transactionBusy(); |
| // Set up retry |
| addHungTimerEvent(TM_MSG_TXINTERNAL_REDRIVEROLLBACK); |
| break; |
| } |
| } // switch (lv_error) |
| |
| return lv_nextState; |
| } //CTmXaTxn::state_change_abort_helper |
| |
| |
| // state_change_commit_helper |
| // Purpose : Process xa_commit. |
| TM_TX_STATE CTmXaTxn::state_change_commit_helper (CTmTxMessage * pp_msg, bool pv_read_only) |
| { |
| int32 lv_error = XA_OK; |
| TM_TX_STATE lv_nextState = tx_state(); |
| |
| if (pv_read_only) |
| { |
| if (pp_msg->replyPending()) |
| pp_msg->reply(lv_error); |
| // Don't write committed state record for read-only transactions |
| // TM_Info::write_trans_state (&iv_transid, TM_TX_STATE_COMMITTED, abort_flags, true); |
| // iv_wrote_trans_state = true; |
| TMTrace(3, ("state_change_commit_helper, read only, skipping commit record\n")); |
| lv_nextState = TM_TX_STATE_FORGETTING; |
| return lv_nextState; |
| } |
| |
| if (iv_wrote_trans_state == false) |
| { |
| gv_tm_info.write_trans_state(&iv_transid, TM_TX_STATE_COMMITTED, abort_flags(), true); |
| iv_wrote_trans_state = true; |
| } |
| lv_error = branches()->commit_branches(this, TT_flags(), pp_msg); |
| switch (lv_error) |
| { |
| case XA_OK: |
| { |
| lv_nextState = TM_TX_STATE_FORGETTING; |
| break; |
| } |
| case XA_HEURHAZ: |
| case XA_HEURCOM: |
| case XA_HEURRB: |
| case XA_HEURMIX: |
| { |
| // Got heuristic results from some RMs. |
| // Send xa_forget to those RMs. |
| // We set iv_partic to false for all branches that completed |
| // ok. |
| lv_nextState = TM_TX_STATE_FORGETTING_HEUR; |
| TMTrace (1, ("CTmXaTxn::state_change_commit_helper: Commit completed heuristically with XAER %d for " |
| "transaction ID (%d,%d).\n", |
| lv_error, node(), seqnum())); |
| break; |
| } |
| case XAER_RMFAIL: |
| { |
| // At least one RM responded XAER_RMFAIL, failed to respond, or we |
| // encountered an unrecoverable Seabed error. |
| tm_log_event(DTM_TMTX_TX_HUNGCOMMITTED, SQ_LOG_WARNING, "DTM_TMTX_TX_HUNGCOMMITTED", |
| -1,-1,-1,seqnum()); |
| TMTrace (1, ("CTmXaTxn::state_change_commit_helper - At least one RM failed to " |
| " respond or connection lost. " |
| "Transaction ID (%d,%d) in committed state placed in hungCommitted state.\n", |
| node(), seqnum())); |
| lv_nextState = TM_TX_STATE_HUNGCOMMITTED; |
| gv_tm_info.inc_tx_hung_count(); |
| reset_transactionBusy(); |
| // Set up retry |
| addHungTimerEvent(TM_MSG_TXINTERNAL_REDRIVECOMMIT); |
| break; |
| } |
| default: // some unexpected error |
| { |
| tm_log_event(DTM_TMTX_TX_HUNGCOMMITTED_ERROR, SQ_LOG_WARNING, "DTM_TMTX_TX_HUNGCOMMITTED_ERROR", lv_error); |
| TMTrace (1, ("CTmXaTxn::state_change_commit_helper - Unexpected XA error %d " |
| " returned by xa_commit for committed branch. " |
| "Transaction %d placed in hungCommitted state.\n", |
| lv_error, seqnum())); |
| lv_nextState = TM_TX_STATE_HUNGCOMMITTED; |
| gv_tm_info.inc_tx_hung_count(); |
| reset_transactionBusy(); |
| // Set up retry |
| addHungTimerEvent(TM_MSG_TXINTERNAL_REDRIVECOMMIT); |
| break; |
| } |
| } // switch |
| |
| return lv_nextState; |
| } //state_change_commit_helper |
| |
| |
| // state_change_prepare_helper |
| // Purpose : performs the work associated with xa_prepare |
| // Returns an XA error |
| TM_TX_STATE CTmXaTxn::state_change_prepare_helper(CTmTxMessage * pp_msg) |
| { |
| TM_TX_STATE lv_nextState = tx_state(); |
| int32 lv_error = mapErr(branches()->prepare_branches(this, TT_flags(), pp_msg)); |
| if (iv_mark_for_rollback == true) |
| { |
| pp_msg->responseError(FEABORTEDTRANSID); |
| lv_nextState = TM_TX_STATE_ABORTING; |
| iv_tse_aborted = true; |
| gv_tm_info.inc_tm_initiated_aborts(); |
| return lv_nextState; |
| } |
| |
| // need to save error! |
| switch (lv_error) |
| { |
| case XA_OK: |
| lv_nextState = TM_TX_STATE_PREPARED; |
| #ifdef DEBUG_MODE |
| bool lv_assert; |
| lv_assert = false; |
| ms_getenv_bool("TM_TEST_AFTER_PREPARE_ASSERT", &lv_assert); |
| if (lv_assert) |
| { |
| tm_log_event(TM_TEST_XA_OK_ASSERT, SQ_LOG_CRIT, "TM_TEST_XA_OK_ASSERT"); |
| TMTrace (1, ("CTmXaTxn::state_change_prepare_helper - TM test assert after XA_OK state\n")); |
| abort (); |
| } |
| #endif |
| break; |
| case XAER_RMFAIL: |
| lv_nextState = state_change_abort_set(pp_msg, XAER_RMFAIL); |
| break; |
| default: |
| // All other errors |
| lv_nextState = state_change_abort_set(pp_msg, XAER_RMFAIL); |
| break; |
| } |
| return lv_nextState; |
| } // state_change_prepare_helper |
| |
| TM_TX_STATE CTmXaTxn::state_change_abort_set(CTmTxMessage * pp_msg, short pv_error) |
| { |
| TM_TX_STATE lv_nextState = TM_TX_STATE_ABORTING; |
| |
| iv_tm_aborted = true; |
| gv_tm_info.inc_tm_initiated_aborts(); |
| // Reply now as we know the outcome and don't want to hold up the client |
| if (pp_msg->replyPending()) |
| pp_msg->reply(pv_error); |
| |
| return lv_nextState; |
| } |
| |
| |
| // state_change_forget_helper |
| // Purpose : Process xa_forget. |
| TM_TX_STATE CTmXaTxn::state_change_forget_helper (CTmTxMessage * pp_msg) |
| { |
| int32 lv_error = XA_OK; |
| TM_TX_STATE lv_nextState = tx_state(); |
| |
| if (iv_wrote_trans_state == false) |
| { |
| gv_tm_info.write_trans_state(&iv_transid, TM_TX_STATE_FORGOTTEN, abort_flags(), true); |
| iv_wrote_trans_state = true; |
| } |
| lv_error = branches()->forget_heur_branches(this, TT_flags()); |
| switch (lv_error) |
| { |
| case XA_OK: |
| { |
| lv_nextState = TM_TX_STATE_FORGOTTEN_HEUR; |
| break; |
| } |
| case XAER_RMFAIL: |
| { |
| // At least one RM responded XAER_RMFAIL, failed to respond, or we |
| // encountered an unrecoverable Seabed error. |
| tm_log_event(DTM_TMTX_TX_HUNGCOMMITTED, SQ_LOG_WARNING, "DTM_TMTX_TX_HUNGCOMMITTED", |
| -1,-1,-1,seqnum()); |
| TMTrace (1, ("CTmXaTxn::state_change_forget_helper - At least one RM failed to " |
| " respond or connection lost. " |
| "Transaction ID (%d,%d) in forget state placed in hungCommitted state.\n", |
| node(), seqnum())); |
| lv_nextState = TM_TX_STATE_HUNGCOMMITTED; |
| gv_tm_info.inc_tx_hung_count(); |
| reset_transactionBusy(); |
| // Set up retry |
| addHungTimerEvent(TM_MSG_TXINTERNAL_REDRIVECOMMIT); |
| break; |
| } |
| default: // some unexpected error |
| { |
| tm_log_event(DTM_TMTX_TX_HUNGCOMMITTED_ERROR, SQ_LOG_WARNING, "DTM_TMTX_TX_HUNGCOMMITTED_ERROR", lv_error); |
| TMTrace (1, ("CTmXaTxn::state_change_forget_helper - Unexpected XA error %d " |
| " returned by xa_forget for forgetting branch. " |
| "Transaction %d placed in hungCommitted state.\n", |
| lv_error, seqnum())); |
| lv_nextState = TM_TX_STATE_HUNGCOMMITTED; |
| gv_tm_info.inc_tx_hung_count(); |
| reset_transactionBusy(); |
| // Set up retry |
| addHungTimerEvent(TM_MSG_TXINTERNAL_REDRIVECOMMIT); |
| break; |
| } |
| } // switch |
| |
| return lv_nextState; |
| } //state_change_forget_helper |
| |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::state_change |
| // Purpose : This is the Finite State Machine implementation |
| // for XARM transactions. |
| // --------------------------------------------------------------------------- |
| int32 CTmXaTxn::state_change (TX_EVENT pv_event, CTmTxMessage *pp_msg) |
| { |
| bool lv_continue = true; |
| int32 lv_error = XA_OK; |
| |
| TMTrace (2, ("CTmXaTxn::state_change ENTRY, XA Txn ID (%d,%d) rmid " PFLL ", " |
| "XID %s, current state %d, new event %d\n", |
| node(), seqnum(), rmid(), XIDtoa(xid()), (int) tx_state(), (int) pv_event)); |
| while (lv_continue) |
| { |
| switch (pv_event) |
| { |
| case TX_BEGIN: |
| { |
| switch (iv_tx_state) |
| { |
| case TM_TX_STATE_NOTX: |
| case TM_TX_STATE_BEGINNING: |
| // Not a lot to do here - we assume the TSEs will enlist via ax_reg. |
| TM_TEST_PAUSE(iv_tx_state); |
| tx_state(TM_TX_STATE_ACTIVE); |
| lv_continue = false; |
| break; |
| default: |
| lv_error = XAER_PROTO; |
| tm_log_event(TM_TMTX_BRCH_STRT_UKN_STATE, SQ_LOG_CRIT, "TM_TMTX_BRCH_STRT_UKN_STATE", |
| -1,rmid(),node(),seqnum()); |
| TMTrace(1, ("CTmXaTxn::state_change - Branch event %d invalid for XA Txn ID (%d,%d) in state %d.\n", |
| pv_event, node(), seqnum(), tx_state())); |
| lv_continue = false; |
| } |
| break; |
| } |
| case TX_SUSPEND: //TODO Implement XA suspend/resume/join |
| case TX_JOIN: |
| case TX_END: |
| { |
| switch (iv_tx_state) |
| { |
| case TM_TX_STATE_ACTIVE: |
| TM_TEST_PAUSE(iv_tx_state); |
| tx_state(TM_TX_STATE_IDLE); |
| lv_continue = false; |
| break; |
| default: |
| lv_error = XAER_PROTO; |
| tm_log_event(TM_TMTX_BRCH_STRT_UKN_STATE, SQ_LOG_CRIT, "TM_TMTX_BRCH_STRT_UKN_STATE", |
| -1,rmid(),node(),seqnum()); |
| TMTrace(1, ("CTmXaTxn::state_change - Branch event %d invalid for XA Txn ID (%d,%d) in state %d.\n", |
| pv_event, node(), seqnum(), tx_state())); |
| lv_continue = false; |
| } |
| break; |
| } |
| case TX_PREPARE: |
| { |
| switch (iv_tx_state) |
| { |
| case TM_TX_STATE_ACTIVE: //Accept for now - shouldn't be legal |
| case TM_TX_STATE_IDLE: |
| TM_TEST_PAUSE(iv_tx_state); |
| // Send out prepares |
| tx_state(TM_TX_STATE_PREPARING); |
| tx_state(state_change_prepare_helper(pp_msg)); |
| if (tx_state() == TM_TX_STATE_PREPARED) |
| lv_continue = false; |
| break; |
| default: |
| lv_error = XAER_PROTO; |
| tm_log_event(TM_TMTX_BRCH_STRT_UKN_STATE, SQ_LOG_CRIT, "TM_TMTX_BRCH_STRT_UKN_STATE", |
| -1,rmid(),node(),seqnum()); |
| TMTrace(1, ("CTmXaTxn::state_change - Branch event %d invalid for XA Txn ID (%d,%d) in state %d.\n", |
| pv_event, node(), seqnum(), tx_state())); |
| lv_continue = false; |
| } |
| break; |
| } |
| case TX_COMMIT: |
| { |
| switch (iv_tx_state) |
| { |
| case TM_TX_STATE_PREPARED: |
| case TM_TX_STATE_HUNGCOMMITTED: |
| TM_TEST_PAUSE(iv_tx_state); |
| // Send out commits |
| tx_state(TM_TX_STATE_COMMITTING); |
| tx_state(state_change_commit_helper(pp_msg)); |
| if (tx_state() == TM_TX_STATE_FORGETTING || |
| tx_state() == TM_TX_STATE_FORGETTING_HEUR || |
| tx_state() == TM_TX_STATE_HUNGCOMMITTED || |
| tx_state() == TM_TX_STATE_HUNGABORTED) |
| lv_continue = false; |
| break; |
| default: |
| lv_error = XAER_PROTO; |
| tm_log_event(TM_TMTX_BRCH_STRT_UKN_STATE, SQ_LOG_CRIT, "TM_TMTX_BRCH_STRT_UKN_STATE", |
| -1,rmid(),node(),seqnum()); |
| TMTrace(1, ("CTmXaTxn::state_change - Branch event %d invalid for XA Txn ID (%d,%d) in state %d.\n", |
| pv_event, node(), seqnum(), tx_state())); |
| lv_continue = false; |
| } |
| break; |
| } |
| case TX_ROLLBACK: |
| { |
| switch (iv_tx_state) |
| { |
| case TM_TX_STATE_NOTX: |
| case TM_TX_STATE_BEGINNING: |
| case TM_TX_STATE_ACTIVE: |
| case TM_TX_STATE_IDLE: |
| case TM_TX_STATE_PREPARING: |
| case TM_TX_STATE_PREPARED: |
| case TM_TX_STATE_ABORTING: |
| case TM_TX_STATE_HUNGABORTED: |
| // Send out rollbacks |
| tx_state(TM_TX_STATE_ABORTING_PART2); |
| tx_state(state_change_abort_helper(pp_msg)); |
| lv_continue = false; |
| break; |
| default: |
| lv_error = XAER_PROTO; |
| tm_log_event(TM_TMTX_BRCH_STRT_UKN_STATE, SQ_LOG_CRIT, "TM_TMTX_BRCH_STRT_UKN_STATE", |
| -1,rmid(),node(),seqnum()); |
| TMTrace(1, ("CTmXaTxn::state_change - Branch event %d invalid for XA Txn ID (%d,%d) in state %d.\n", |
| pv_event, node(), seqnum(), tx_state())); |
| lv_continue = false; |
| } |
| break; |
| } |
| case TX_FORGET: |
| { |
| switch (iv_tx_state) |
| { |
| case TM_TX_STATE_FORGETTING: |
| case TM_TX_STATE_ABORTED: |
| // Normal commit path |
| pv_event = TX_TERMINATE; |
| lv_continue = true; |
| break; |
| case TM_TX_STATE_COMMITTING: |
| case TM_TX_STATE_COMMITTED: |
| case TM_TX_STATE_ABORTING: |
| case TM_TX_STATE_ABORTING_PART2: |
| // Send out forget |
| tx_state(TM_TX_STATE_FORGETTING_HEUR); |
| tx_state(state_change_forget_helper(pp_msg)); |
| lv_continue = false; |
| break; |
| default: |
| lv_error = XAER_PROTO; |
| tm_log_event(TM_TMTX_BRCH_STRT_UKN_STATE, SQ_LOG_CRIT, "TM_TMTX_BRCH_STRT_UKN_STATE", |
| -1,rmid(),node(),seqnum()); |
| TMTrace(1, ("CTmXaTxn::state_change - Branch event %d invalid for XA Txn ID (%d,%d) in state %d.\n", |
| pv_event, node(), seqnum(), tx_state())); |
| lv_continue = false; |
| } |
| break; |
| } |
| case TX_TERMINATE: |
| { |
| tx_state(TM_TX_STATE_TERMINATING); |
| lv_continue = false; |
| break; |
| } |
| |
| // we get this when we simply need to resent the commit |
| // at recovery time |
| case TX_REDRIVE_COMMIT: |
| { |
| TM_TEST_PAUSE(iv_tx_state); |
| // we've already written the trans state record if were are here |
| tx_state(TM_TX_STATE_COMMITTED); |
| pv_event = TX_COMMIT; |
| break; |
| } // case TX_REDRIVE_COMMIT |
| case TX_REDRIVE_ROLLBACK: |
| { |
| TM_TEST_PAUSE(iv_tx_state); |
| // we've already written the trans state record if were are here |
| tx_state(TM_TX_STATE_ABORTING_PART2); |
| pv_event = TX_ROLLBACK; |
| break; |
| } |
| default : |
| tm_log_event(TM_TMTX_INVALID_STATE, SQ_LOG_CRIT, "TM_TMTX_INVALID_STATE", |
| -1,rmid(),node(),seqnum()); |
| TMTrace (1, ("CTmXaTxn::state_change - tx state %d: " |
| "Invalid transaction state. Transaction seq num %d\n",pv_event, seqnum())); |
| abort(); |
| |
| }; //switch on event |
| } // while continue |
| |
| TM_TEST_PAUSE_NEXT(); |
| TMTrace (2, ("CTmXaTxn::state_change EXIT : txn ID %d, state %d, returning error %d.\n", |
| seqnum(), iv_tx_state, lv_error)); |
| return lv_error; |
| } // state_change |
| |
| |
| // -------------------------------------------------------------- |
| // schedule_abort |
| // This executes under the main thread, not under the transaction |
| // thread. If the transaction has already reached Phase 2, an |
| // error is returned. Otherwise a rollback will take place. |
| // If the transaction thread is not already processing an |
| // ENDTRANSACTION or ABORTTRANSACTION, then an abort event will |
| // be queued to the transaction thread. If the transaction thread |
| // is already processing a request, the transaction is marked for |
| // rollback and the already executing transaction thread will |
| // take care of the rollback. |
| // Return codes: |
| // FEOK Successful. |
| // FEENDEDTRANSID Transaction is already in Phase 2. |
| // -------------------------------------------------------------- |
| int32 CTmXaTxn::schedule_abort() |
| { |
| int32 lv_error = FEOK; |
| |
| TMTrace (2, ("CTmXaTxn::schedule_abort ENTRY, Txn ID (%d,%d), state %d.\n", |
| node(), seqnum(), iv_tx_state)); |
| |
| if ((tx_state() != TM_TX_STATE_NOTX) && // beginning |
| (tx_state() != TM_TX_STATE_BEGINNING) && // beginning part 2 |
| (tx_state() != TM_TX_STATE_ACTIVE) && // active |
| (tx_state() != TM_TX_STATE_PREPARING)) // about to commit |
| lv_error = FEENDEDTRANSID; |
| else |
| { |
| internal_abortTrans(true); |
| } |
| |
| TMTrace (2, ("CTmXaTxn::schedule_abort, EXIT, ID %d, error %d, state %d, %s.\n", |
| seqnum(), lv_error, tx_state(), ((iv_mark_for_rollback)?"rollback deferred":"rolled back"))); |
| |
| return lv_error; |
| } //schedule_abort |
| |
| |
| // ------------------------------------------------------------- |
| // schedule_eventQ |
| // ------------------------------------------------------------- |
| void CTmXaTxn::schedule_eventQ() |
| { |
| short lv_request_type = TM_MSG_TYPE_NULL; |
| |
| TMTrace (2, ("CTmXaTxn::schedule_eventQ ENTER ID %d, state %d.\n", |
| iv_tag, iv_tx_state)); |
| |
| switch (iv_tx_state) |
| { |
| case TM_TX_STATE_BEGINNING: |
| { |
| lv_request_type = TM_MSG_TXINTERNAL_BEGINCOMPLETE; |
| break; |
| } |
| case TM_TX_STATE_COMMITTED: |
| { |
| lv_request_type = TM_MSG_TXINTERNAL_ENDCOMPLETE; |
| break; |
| } |
| case TM_TX_STATE_ABORTED: |
| { |
| lv_request_type = TM_MSG_TXINTERNAL_ABORTCOMPLETE; |
| break; |
| } |
| case TM_TX_STATE_FORGOTTEN: |
| { |
| lv_request_type = TM_MSG_TXINTERNAL_ENDFORGET; |
| break; |
| } |
| default: |
| { |
| tm_log_event(TM_TMTX_INVALID_STATE, SQ_LOG_CRIT, "TM_TMTX_INVALID_STATE", |
| -1,-1,-1,seqnum()); |
| TMTrace (1, ("CTmXaTxn::schedule_eventQ - tx state %d: " |
| "Invalid transaction state. Transaction seq num %d\n",iv_tx_state, seqnum())); |
| abort(); |
| } |
| }; |
| |
| // We use the saved request here to drive the event. |
| // This is because this is a continuance of the existing |
| // request and we want this to be used for any reply as |
| // it contains the msgid of the orignal request (eg |
| // ENDTRANSACTION request). |
| if (!ip_currRequest) |
| { |
| tm_log_event(DTM_TMTX_INVALID_POINTER, SQ_LOG_CRIT, "DTM_TMTX_INVALID_POINTER", |
| -1,-1,-1,seqnum()); |
| TMTrace (1, ("CTmXaTxn::schedule_eventQ - PROGRAMMING BUG!: " |
| " ip_currRequest was null when we received a sync " |
| "completion for transaction %d.\n", seqnum())); |
| abort(); |
| } |
| else |
| { |
| ip_currRequest->requestType(lv_request_type); |
| eventQ_push(ip_currRequest); |
| } |
| |
| TMTrace (2, ("CTmXaTxn::schedule_eventQ EXIT ID %d, request_type %d.\n", |
| iv_tag, lv_request_type)); |
| } |
| |
| |
| // -------------------------------------------------------------- |
| // process_eventQ |
| // Purpose - process all events on the event queue for this |
| // transaction. |
| // For multithreaded TMs this is the main processing loop for |
| // the transaction. |
| // -------------------------------------------------------------- |
| void CTmXaTxn::process_eventQ() |
| { |
| bool lv_exit = false; |
| CTmEvent *lp_event = NULL; |
| |
| TMTrace (2, ("CTmXaTxn::process_eventQ : ENTRY ID %d, thread %s.\n", |
| iv_tag, ((gv_tm_info.multithreaded())?ip_Thread->get_name():"SINGLE THREAD"))); |
| |
| // Pop/wait for a transaction event |
| CTmTxMessage *lp_msg = eventQ_pop(); |
| lp_msg->validate(); |
| |
| while (!lv_exit && lp_msg->request()) |
| { |
| switch (lp_msg->requestType()) |
| { |
| case TM_DP2_SQ_XA_START: |
| req_xa_notImplemented(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_END: |
| lv_exit = req_xa_end(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_COMMIT: |
| lv_exit = req_xa_commit(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_PREPARE: |
| lv_exit = req_xa_prepare(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_ROLLBACK: |
| lv_exit = req_xa_rollback(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_OPEN: |
| req_xa_notImplemented(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_CLOSE: |
| req_xa_notImplemented(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_RECOVER: |
| req_xa_notImplemented(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_FORGET: |
| lv_exit = req_xa_forget(lp_msg); |
| break; |
| case TM_DP2_SQ_XA_COMPLETE: |
| req_xa_notImplemented(lp_msg); |
| break; |
| case TM_DP2_SQ_AX_REG: |
| req_xa_notImplemented(lp_msg); |
| break; |
| case TM_DP2_SQ_AX_UNREG: |
| req_xa_notImplemented(lp_msg); |
| break; |
| |
| case TM_MSG_TYPE_BEGINTRANSACTION: |
| TMTrace (1, ("CTmXaTxn::process_eventQ : ERROR - BEGINTRANSACTION for XARM subordinate branch.\n")); |
| req_xa_notImplemented(lp_msg); |
| break; |
| case TM_MSG_TYPE_ENDTRANSACTION: |
| lv_exit = req_end(lp_msg); |
| break; |
| case TM_MSG_TYPE_ABORTTRANSACTION: |
| case TM_MSG_TYPE_TSE_DOOMTX: |
| case TM_MSG_TYPE_DOOMTX: |
| lv_exit = req_abort(lp_msg); |
| break; |
| case TM_MSG_TYPE_STATUSTRANSACTION: |
| // These aren't queued, so we should never hit this |
| lv_exit = req_status(lp_msg); |
| break; |
| case TM_MSG_TYPE_AX_REG: |
| lv_exit = req_ax_reg(lp_msg); |
| break; |
| case TM_MSG_TYPE_JOINTRANSACTION: |
| // These aren't queued, so we should never hit this |
| lv_exit = req_join(lp_msg); |
| break; |
| case TM_MSG_TYPE_SUSPENDTRANSACTION: |
| // These aren't queued, so we should never hit this |
| lv_exit = req_suspend(lp_msg); |
| break; |
| case TM_MSG_TXINTERNAL_ROLLBACK: |
| { |
| lv_exit = rollback_txn(lp_msg); |
| if ((gv_tm_info.mode() == TM_NONSYNC_MODE) && (!lv_exit)) |
| { |
| if (transactionBusy()) |
| req_abort_complete(lp_msg); |
| lv_exit = req_forget(lp_msg); |
| } |
| break; |
| } |
| case TM_MSG_TXINTERNAL_REDRIVEROLLBACK: |
| lv_exit = redriverollback_txn(lp_msg); |
| break; |
| case TM_MSG_TXINTERNAL_REDRIVECOMMIT: |
| lv_exit = redrivecommit_txn(lp_msg); |
| break; |
| case TM_MSG_TXINTERNAL_ABORTCOMPLETE: |
| lv_exit = req_abort_complete(lp_msg); |
| break; |
| case TM_MSG_TXINTERNAL_ENDCOMPLETE: |
| lv_exit = req_end_complete(lp_msg); |
| break; |
| case TM_MSG_TXINTERNAL_ENDFORGET: |
| lv_exit = req_forget(lp_msg); |
| break; |
| case TM_MSG_TXINTERNAL_BEGINCOMPLETE: |
| req_xa_notImplemented(lp_msg); |
| break; |
| case TM_MSG_TXTHREAD_TERMINATE: |
| // Need to re-queue the Terminate to the thread event queue |
| lp_event = new CTmEvent(lp_msg); |
| ip_Thread->eventQ_push(lp_event); |
| // Intentional fall through. |
| case TM_MSG_TXTHREAD_RELEASE: |
| lv_exit = true; |
| break; |
| case TM_MSG_TXINTERNAL_REDRIVESYNC: |
| default: |
| // EMS message here, DTM_INVALID_MESSAGE_TYPE |
| tm_log_event(DTM_INVALID_MESSAGE_TYPE, SQ_LOG_CRIT, "DTM_INVALID_MESSAGE_TYPE", |
| -1, /*error_code*/ |
| -1, /*rmid*/ |
| gv_tm_info.nid(), /*dtmid*/ |
| -1, /*seq_num*/ |
| -1, /*msgid*/ |
| -1, /*xa_error*/ |
| -1, /*pool_size*/ |
| -1, /*pool_elems*/ |
| -1, /*msg_retries*/ |
| -1, /*pool_high*/ |
| -1, /*pool_low*/ |
| -1, /*pool_max*/ |
| -1, /*tx_state*/ |
| -1, /*data */ |
| -1, /*data1*/ |
| -1,/*data2 */ |
| NULL,/*string1*/ |
| -1, /*node*/ |
| -1, /*msgid2*/ |
| -1, /*offset*/ |
| lp_msg->requestType()); |
| |
| //tm_log_event(DTM_INVALID_MESSAGE_TYPE, SQ_LOG_CRIT , "DTM_INVALID_MESSAGE_TYPE"); |
| TMTrace (1, ("CTmXaTxn::process_eventQ - DTM%d txThread received UNKNOWN message type : %d\n", |
| gv_tm_info.nid(), lp_msg->requestType())); |
| abort (); // bogus type |
| } // switch |
| |
| lock(); |
| delete lp_msg; |
| unlock(); |
| // Multithreaded only: |
| // Worker threads release the transaction after every request |
| // unless there are more requests to process for this transaction. |
| if (gv_tm_info.threadModel() == worker && iv_eventQ.empty()) |
| { |
| lv_exit = true; |
| /* Not currently using the release event |
| CTmEvent *lp_event = new CTmEvent(TM_MSG_TXTHREAD_RELEASE, ip_Thread); |
| ip_Thread->eventQ_push(lp_event); */ |
| } |
| if (!lv_exit) |
| { |
| // We treat thread events with higher priority, so first |
| // check to see if there are any. If not, then check (wait) for |
| // a transaction event. |
| if (ip_Thread->eventQ()->empty()) |
| { |
| TMTrace (3, ("CTmXaTxn::process_eventQ : Thread %s thread event queue empty, " |
| "checking transaction Q.\n", ip_Thread->get_name())); |
| |
| lp_msg = eventQ_pop(); |
| } |
| else |
| { |
| TMTrace (3, ("CTmXaTxn::process_eventQ : Thread %s thread event detected, exiting proc.\n", |
| ip_Thread->get_name())); |
| // Drop out of transaction processing loop and |
| // handle the thread event |
| lv_exit = true; |
| } |
| } |
| } //while more events to process |
| |
| TMTrace (2, ("CTmXaTxn::process_eventQ : EXIT teminateThread=%d.\n", |
| lv_exit)); |
| } //process_eventQ |
| |
| |
| // -------------------------------------------------------------- |
| // DTM Event Specific Methods: |
| // -------------------------------------------------------------- |
| // -------------------------------------------------------------- |
| // Purpose - Txn Thread specific processing for BEGINTRANSACTION |
| // NOT SUPPORTED FOR XARM TRANSACTIONS. |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::req_begin(CTmTxMessage * pp_msg) |
| { |
| //short lv_error = FEOK; |
| bool lv_terminate = false; |
| |
| TMTrace (2, ("CTmXaTxn::req_begin : Not implemented!\n")); |
| abort(); |
| |
| return lv_terminate; |
| } //req_end |
| |
| |
| // -------------------------------------------------------------- |
| // Purpose - Txn Thread specific processing for ENDTRANSACTION. |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::req_end(CTmTxMessage * pp_msg) |
| { |
| //short lv_error = FEOK; |
| bool lv_terminate = false; |
| |
| TMTrace (2, ("CTmXaTxn::req_end : Not implemented!\n")); |
| abort(); |
| |
| return lv_terminate; |
| } //req_end |
| |
| |
| // -------------------------------------------------------------- |
| // req_abort |
| // Purpose - Txn Thread specific processing for ABORTTRANSACTION |
| // and for TSE_DOOMTX. |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::req_abort(CTmTxMessage * pp_msg) |
| { |
| bool lv_terminateThread = false; |
| //int32 lv_error = FEOK; |
| int32 lv_nid = pp_msg->request()->u.iv_abort_trans.iv_nid, |
| lv_pid = pp_msg->request()->u.iv_abort_trans.iv_pid; |
| //int32 lv_rmid = gv_RMs.TSE()->return_rmid(lv_nid, lv_pid); |
| |
| TMTrace (2, ("CTmXaTxn::req_abort : ENTRY Txn ID (%d,%d), originating process (nid, pid): " |
| "(%d, %d).\n", node(), seqnum(), lv_nid, lv_pid)); |
| |
| |
| |
| TMTrace (2, ("CTmXaTxn::req_abort : ID %d, Not Implemented!\n", seqnum())); |
| abort(); |
| return lv_terminateThread; |
| } //req_abort |
| |
| |
| // -------------------------------------------------------------- |
| // req_abort_complete |
| // Purpose - Txn Thread specific processing for ABORTTRANSACTION. |
| // phase 2 (the real work). |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::req_abort_complete(CTmTxMessage * pp_msg) |
| { |
| TMTrace (2, ("CTmXaTxn::req_abort_complete : Txn ID (%d,%d), ENTRY\n", node(), seqnum())); |
| pp_msg->validate(); |
| |
| |
| if (tx_state() != TM_TX_STATE_FORGETTING && |
| tx_state() != TM_TX_STATE_FORGOTTEN) |
| state_change(TX_ROLLBACK, pp_msg); |
| else |
| TMTrace (3, ("CTmXaTxn:req_abort_complete : tx state is FORGETTING or " |
| "FORGOTTEN so nothing to do here, exiting.\n")); |
| |
| if (tx_state() != TM_TX_STATE_HUNGABORTED) |
| stats()->txnAbort()->stop(); |
| |
| pp_msg->validate(); |
| |
| // If we haven't replied yet, do so now - before we forget the outcome! |
| if (pp_msg->replyPending()) |
| pp_msg->reply(); // reply with whatever error has been set |
| |
| TMTrace (2, ("CTmXaTxn::req_abort_complete : ID %d, EXIT\n", seqnum())); |
| |
| return false; //Don't terminate transaction, still need to do forget |
| |
| } |
| |
| |
| // -------------------------------------------------------------- |
| // req_end_complete |
| // Purpose - Txn Thread specific processing for ENDTRANSACTION. |
| // phase 2 (the real work). |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::req_end_complete(CTmTxMessage * pp_msg) |
| { |
| TMTrace (2, ("CTmXaTxn::req_end_complete : ID %d, ENTRY\n", seqnum())); |
| |
| if (isAborting()) |
| { |
| pp_msg->responseError(FEABORTEDTRANSID); |
| state_change(TX_ROLLBACK, pp_msg); |
| } |
| else |
| state_change(TX_COMMIT, pp_msg); |
| |
| if (tx_state() != TM_TX_STATE_HUNGCOMMITTED) |
| stats()->txnCommit()->stop(); |
| |
| TMTrace (2, ("CTmXaTxn::req_end_complete : ID %d, EXIT\n", seqnum())); |
| |
| return false; //Don't terminate transaction, still need to do forget |
| } |
| |
| |
| // -------------------------------------------------------------- |
| // rollback_txn |
| // Purpose - Txn Thread specific processing for abort processing. |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::rollback_txn(CTmTxMessage * pp_msg) |
| { |
| |
| TMTrace (2, ("CTmXaTxn::rollback_txn : ID %d, ENTRY\n", |
| seqnum())); |
| pp_msg->validate(); |
| |
| if ((tx_state() != TM_TX_STATE_BEGINNING) && |
| (tx_state() != TM_TX_STATE_ACTIVE) && |
| (tx_state() != TM_TX_STATE_ABORTING) && |
| (tx_state() != TM_TX_STATE_ABORTING_PART2) && |
| (tx_state() != TM_TX_STATE_ABORTED) && |
| (tx_state() != TM_TX_STATE_HUNGABORTED) && |
| (tx_state() != TM_TX_STATE_FORGETTING) && |
| (tx_state() != TM_TX_STATE_FORGOTTEN)) |
| { |
| // EMS Message DTM_ABORT_FAILED |
| tm_log_event(DTM_ABORT_FAILED, SQ_LOG_WARNING, "DTM_ABORT_FAILED", |
| -1, /*error_code*/ |
| -1, /*rmid*/ |
| gv_tm_info.nid(), /*dtmid*/ |
| seqnum(), /*seq_num*/ |
| -1, /*msgid*/ |
| -1, /*xa_error*/ |
| -1, /*pool_size*/ |
| -1, /*pool_elems*/ |
| -1, /*msg_retries*/ |
| -1, /*pool_high*/ |
| -1, /*pool_low*/ |
| -1, /*pool_max*/ |
| tx_state()); /*tx_state*/ |
| |
| TMTrace (1, ("CTmXaTxn::rollback_txn : DTM%d txThread could not abort " |
| " transaction %d, state %d incorrect.\n", |
| gv_tm_info.nid(), seqnum(), (short) tx_state())); |
| |
| // If this rollback was caused by a call to ABORTTRANSACTION, reply now |
| if (pp_msg->replyPending()) |
| { |
| if (isAborting()) |
| pp_msg->reply(FEABORTEDTRANSID); |
| else |
| pp_msg->reply(FEENDEDTRANSID); |
| } |
| |
| bool lv_txnBusy = reset_transactionBusy(); |
| if (!lv_txnBusy) |
| { |
| inc_cleanup(); |
| return false; // Allow thread to process outstanding requests |
| } |
| else |
| return true; // Terminate txn thread |
| } |
| |
| // we don't care about joiners here. The TX has to go, so we don't even check |
| |
| state_change(TX_ROLLBACK, pp_msg); |
| |
| TMTrace (2, ("CTmXaTxn::rollback_txn, EXIT\n")); |
| |
| return false; //Don't terminate txn thread, still need to process the |
| //TM_MSG_TXINTERNAL_ABORTCOMPLETE. |
| } //rollback_txn |
| |
| |
| // -------------------------------------------------------------- |
| // redriverollback_txn |
| // Purpose - Txn Thread specific processing to redrive a rollback. |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::redriverollback_txn(CTmTxMessage * pp_msg) |
| { |
| bool lv_terminate = false; |
| |
| TMTrace (2, ("CTmXaTxn::redriverollback_txn : ID %d, ENTRY\n", seqnum())); |
| |
| if (iv_tx_state == TM_TX_STATE_HUNGABORTED) |
| gv_tm_info.dec_tx_hung_count(); |
| |
| // If we have a hung event timer then remove it now. |
| if (ip_hung_event) |
| { |
| gv_xaTM.tmTimer()->cancelEvent(ip_hung_event); |
| ip_hung_event = NULL; |
| } |
| |
| // We've already driven phase 1, redrive phase 2. |
| state_change(TX_REDRIVE_ROLLBACK, pp_msg); |
| |
| |
| if (tx_state() != TM_TX_STATE_HUNGABORTED) |
| stats()->txnAbort()->stop(); |
| |
| if (gv_tm_info.mode() == TM_NONSYNC_MODE) |
| lv_terminate = req_forget(pp_msg); |
| |
| TMTrace (2, ("CTmXaTxn::redriverollback_txn, EXIT returning %s\n", (lv_terminate)?"TRUE":"FALSE")); |
| return lv_terminate; |
| } //redriverollback_txn |
| |
| |
| // -------------------------------------------------------------- |
| // redrivecommit_txn |
| // Purpose - Txn Thread specific processing to redrive a commit. |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::redrivecommit_txn(CTmTxMessage * pp_msg) |
| { |
| bool lv_terminate = false; |
| |
| TMTrace (2, ("CTmXaTxn::redrivecommit_txn : ID %d, ENTRY, current txn state %d\n", |
| seqnum(), tx_state())); |
| |
| if (iv_tx_state == TM_TX_STATE_HUNGCOMMITTED) |
| gv_tm_info.dec_tx_hung_count(); |
| |
| // If we have a hung event timer then remove it now. |
| if (ip_hung_event) |
| { |
| gv_xaTM.tmTimer()->cancelEvent(ip_hung_event); |
| ip_hung_event = NULL; |
| } |
| |
| // We've already driven phase 1, redrive phase 2. |
| state_change(TX_REDRIVE_COMMIT, pp_msg); |
| |
| if (tx_state() != TM_TX_STATE_HUNGCOMMITTED) |
| stats()->txnCommit()->stop(); |
| |
| if (gv_tm_info.mode() == TM_NONSYNC_MODE) |
| lv_terminate = req_forget(pp_msg); |
| |
| TMTrace (2, ("CTmXaTxn::redrivecommit_txn, EXIT returning %s\n", (lv_terminate)?"TRUE":"FALSE")); |
| return lv_terminate; |
| } //redrivecommit_txn |
| |
| |
| // -------------------------------------------------------------- |
| // req_forget |
| // Purpose - Txn Thread specific processing for ENDTRANSACTION, |
| // ABORTTRANSACTION and DOOMTRANSACTION. |
| // Forget the transaction. |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::req_forget(CTmTxMessage * pp_msg) |
| { |
| bool lv_terminate = true; // Normal processing is for forget to |
| // drive thread disassociation for |
| // transaction threads. |
| TMTrace (2, ("CTmXaTxn::req_forget : ID %d, ENTRY\n", seqnum())); |
| |
| // If the transaction is in a hung state, we don't want to forget it. |
| // We will wait for the commit/rollback to be retried, or for operator |
| // intervention. |
| if (tx_state() == TM_TX_STATE_HUNGCOMMITTED || |
| tx_state() == TM_TX_STATE_HUNGABORTED) |
| { |
| TMTrace (1, ("CTmXaTxn::req_forget : Ignoring transaction ID %d in hungAborted state.\n", |
| seqnum())); |
| lv_terminate = false; // Don't cleanup transaction object yet |
| } |
| else |
| { |
| if (tx_state() == TM_TX_STATE_FORGETTING_HEUR || |
| tx_state() == TM_TX_STATE_FORGOTTEN_HEUR) |
| { |
| TMTrace (1, ("CTmXaTxn::req_forget : Transaction ID %d in heuristicForgetting state, sending xa_forget.\n", |
| seqnum())); |
| state_change(TX_FORGET, pp_msg); |
| } |
| |
| |
| gv_tm_info.remove_tx_from_oldest_list(this); |
| // Gather up statistics |
| stats()->txnTotal()->stop(); |
| gv_tm_info.stats()->addTxnCounters(stats()); |
| } |
| |
| // We want to reset the transactionBusy flag and cleanup the pendingQ |
| // even if this is a hung transaction. |
| bool lv_txnBusy = reset_transactionBusy(true /*cleanup pendingQ*/); |
| |
| // Cleanup now if we are terminating the transaction and we've finished |
| // processing transaction requests. |
| if (lv_terminate && !lv_txnBusy) |
| { |
| inc_cleanup(); |
| } |
| else |
| TMTrace (3, ("CTmXaTxn::req_forget : ID %d, terminate=%d, txnBusy=%d, " |
| "pending requests, not cleaning up transaction.\n", |
| seqnum(), lv_terminate, lv_txnBusy)); |
| |
| |
| TMTrace (2, ("CTmXaTxn::req_forget : ID %d, txnBusy %d, terminate %d EXIT\n", |
| seqnum(), lv_txnBusy, lv_terminate)); |
| return lv_terminate; //Finished with the transaction, disassociate and cleanup |
| } // req_forget |
| |
| |
| // -------------------------------------------------------------- |
| // doom_txn |
| // This executes under the main thread, not under the transaction |
| // thread. If the transaction has already reached Phase 2, an |
| // error is returned. Otherwise a the transaction is marked as |
| // rollback required (iv_mark_for_rollback = true). This will |
| // cause a rollback to occur once the ABORTTRANSACTION or |
| // ENDTRANSACTION arrives for the transaction. |
| // Return codes: |
| // FEOK Successful. |
| // FEENDEDTRANSID Transaction is already in Phase 2. |
| // -------------------------------------------------------------- |
| short CTmXaTxn::doom_txn() |
| { |
| short lv_error = FEOK; |
| |
| TMTrace (2, ("CTmXaTxn::doom_txn ENTRY, ID %d, state %d.\n", |
| seqnum(), iv_tx_state)); |
| |
| if ((tx_state() != TM_TX_STATE_NOTX) && // beginning |
| (tx_state() != TM_TX_STATE_BEGINNING) && // beginning part 2 |
| (tx_state() != TM_TX_STATE_ACTIVE) && // active |
| (tx_state() != TM_TX_STATE_PREPARING)) // about to commit |
| lv_error = FENOTRANSID; |
| else |
| { |
| iv_mark_for_rollback = true; |
| iv_appl_aborted = true; |
| } |
| |
| TMTrace (2, ("CTmXaTxn::doom_txn, EXIT, ID %d, error %d, state %d, %s.\n", |
| seqnum(), lv_error, tx_state(), ((iv_mark_for_rollback)?"rollback deferred":"rolled back"))); |
| |
| return lv_error; |
| } //doom_txn |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::req_xa_join |
| // Purpose: Support the TMJOIN flag in xa_start |
| // --------------------------------------------------------------------------- |
| bool CTmXaTxn::req_xa_join(CTmTxMessage *pp_msg) |
| { |
| bool lv_terminateThread = false; |
| int32 lv_rmid = pp_msg->request()->u.iv_start.iv_rmid; |
| XID *lp_xid = &pp_msg->request()->u.iv_start.iv_xid; |
| int64 lv_flags = pp_msg->request()->u.iv_start.iv_flags; |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_join ENTRY, XA Txn ID (%d,%d), state %d, rmid %d, XID %s, flags " PFLL ".\n", |
| node(), seqnum(), iv_tx_state, lv_rmid, XIDtoa(lp_xid), lv_flags)); |
| |
| // Call join in-line in main thread |
| req_join(pp_msg); |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_join EXIT.\n")); |
| return lv_terminateThread; |
| } // req_xa_join |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::req_xa_resume |
| // Purpose: Support the TMRESUME flag in xa_start |
| // --------------------------------------------------------------------------- |
| bool CTmXaTxn::req_xa_resume(CTmTxMessage *pp_msg) |
| { |
| bool lv_terminateThread = false; |
| int32 lv_rmid = pp_msg->request()->u.iv_start.iv_rmid; |
| XID *lp_xid = &pp_msg->request()->u.iv_start.iv_xid; |
| int64 lv_flags = pp_msg->request()->u.iv_start.iv_flags; |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_resume ENTRY, XA Txn ID (%d,%d), rmid %d, XID %s, flags " PFLL ",state %d.\n", |
| node(), seqnum(), lv_rmid, XIDtoa(lp_xid), lv_flags, iv_tx_state)); |
| |
| // Call join in-line in main thread |
| req_join(pp_msg); //TODO Treated as identical to join for prototype |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_resume EXIT.\n")); |
| return lv_terminateThread; |
| } // req_xa_resume |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::req_xa_end |
| // Purpose: Support xa_end |
| // --------------------------------------------------------------------------- |
| bool CTmXaTxn::req_xa_end(CTmTxMessage *pp_msg) |
| { |
| bool lv_terminateThread = false; |
| int32 lv_rmid = pp_msg->request()->u.iv_end.iv_rmid; |
| XID *lp_xid = &pp_msg->request()->u.iv_end.iv_xid; |
| int64 lv_flags = pp_msg->request()->u.iv_end.iv_flags; |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_end ENTRY, XA Txn ID (%d,%d), state %d, rmid %d, XID %s, flags " PFLL ".\n", |
| node(), seqnum(), iv_tx_state, lv_rmid, XIDtoa(lp_xid), lv_flags)); |
| |
| switch (lv_flags) |
| { |
| case TMMIGRATE: |
| req_xa_notImplemented(pp_msg); |
| break; |
| case TMSUSPEND: |
| req_suspend(pp_msg); |
| break; |
| case TMNOFLAGS: |
| case TMSUCCESS: |
| pp_msg->reply(state_change(TX_END, pp_msg)); |
| break; |
| case TMFAIL: |
| mark_for_rollback(); |
| pp_msg->reply(state_change(TX_END, pp_msg)); |
| break; |
| case TMASYNC: |
| req_xa_notImplemented(pp_msg); |
| break; |
| default: |
| TMTrace(2, ("CTmXaTxn::req_xa_end flag unknown.\n")); |
| req_xa_notImplemented(pp_msg); |
| } |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_end EXIT.\n")); |
| return lv_terminateThread; |
| } // req_xa_end |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::req_xa_prepare |
| // Purpose: Support xa_prepare |
| // --------------------------------------------------------------------------- |
| bool CTmXaTxn::req_xa_prepare(CTmTxMessage *pp_msg) |
| { |
| bool lv_terminateThread = false; |
| int32 lv_rmid = pp_msg->request()->u.iv_prepare.iv_rmid; |
| XID *lp_xid = &pp_msg->request()->u.iv_prepare.iv_xid; |
| int64 lv_flags = pp_msg->request()->u.iv_prepare.iv_flags; |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_prepare ENTRY, Txn ID (%d,%d), state %d, rmid %d, XID %s, flags " PFLL ".\n", |
| node(), seqnum(), iv_tx_state, lv_rmid, XIDtoa(lp_xid), lv_flags)); |
| |
| state_change(TX_PREPARE, pp_msg); |
| pp_msg->reply(XA_OK); |
| lv_terminateThread = releaseTxnObj(false); |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_prepare EXIT.\n")); |
| return lv_terminateThread; |
| } // req_xa_prepare |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::req_xa_commit |
| // Purpose: Support xa_commit |
| // --------------------------------------------------------------------------- |
| bool CTmXaTxn::req_xa_commit(CTmTxMessage *pp_msg) |
| { |
| bool lv_terminateThread = false; |
| int32 lv_rmid = pp_msg->request()->u.iv_commit.iv_rmid; |
| XID *lp_xid = &pp_msg->request()->u.iv_commit.iv_xid; |
| int64 lv_flags = pp_msg->request()->u.iv_commit.iv_flags; |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_commit ENTRY, Txn ID (%d,%d), state %d, rmid %d, XID %s, flags " PFLL ".\n", |
| node(), seqnum(), iv_tx_state, lv_rmid, XIDtoa(lp_xid), lv_flags)); |
| |
| state_change(TX_COMMIT, pp_msg); |
| pp_msg->reply(XA_OK); |
| |
| switch (tx_state()) |
| { |
| case TM_TX_STATE_FORGETTING: |
| case TM_TX_STATE_COMMITTED: |
| state_change(TX_FORGET, pp_msg); |
| gp_xarmXIDList->remove(xid()); |
| break; |
| case TM_TX_STATE_FORGETTING_HEUR: //Need an xa_forget to cleanup |
| default: |
| ; |
| } |
| |
| lv_terminateThread = releaseTxnObj(true); |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_commit EXIT.\n")); |
| return lv_terminateThread; |
| } // req_xa_commit |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::req_xa_rollback |
| // Purpose: Support xa_rollback |
| // --------------------------------------------------------------------------- |
| bool CTmXaTxn::req_xa_rollback(CTmTxMessage *pp_msg) |
| { |
| bool lv_terminateThread = true; |
| int32 lv_rmid = pp_msg->request()->u.iv_rollback.iv_rmid; |
| XID *lp_xid = &pp_msg->request()->u.iv_rollback.iv_xid; |
| int64 lv_flags = pp_msg->request()->u.iv_rollback.iv_flags; |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_rollback ENTRY, Txn ID (%d,%d), state %d, rmid %d, XID %s, flags " PFLL ".\n", |
| node(), seqnum(), iv_tx_state, lv_rmid, XIDtoa(lp_xid), lv_flags)); |
| |
| state_change(TX_ROLLBACK, pp_msg); |
| pp_msg->reply(XA_OK); |
| |
| switch (tx_state()) |
| { |
| case TM_TX_STATE_ABORTED: |
| state_change(TX_FORGET, pp_msg); |
| gp_xarmXIDList->remove(xid()); |
| break; |
| case TM_TX_STATE_FORGETTING_HEUR: //Need an xa_forget to cleanup |
| default: |
| ; |
| } |
| |
| lv_terminateThread = releaseTxnObj(true); |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_rollback EXIT.\n")); |
| return lv_terminateThread; |
| } // req_xa_rollback |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::req_xa_forget |
| // Purpose: Support xa_forget |
| // --------------------------------------------------------------------------- |
| bool CTmXaTxn::req_xa_forget(CTmTxMessage *pp_msg) |
| { |
| bool lv_terminateThread = false; |
| int32 lv_rmid = pp_msg->request()->u.iv_forget.iv_rmid; |
| XID *lp_xid = &pp_msg->request()->u.iv_forget.iv_xid; |
| int64 lv_flags = pp_msg->request()->u.iv_forget.iv_flags; |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_forget ENTRY, Txn ID (%d,%d), state %d, rmid %d, XID %s, flags " PFLL ".\n", |
| node(), seqnum(), iv_tx_state, lv_rmid, XIDtoa(lp_xid), lv_flags)); |
| |
| state_change(TX_FORGET, pp_msg); |
| pp_msg->reply(XA_OK); |
| |
| switch (tx_state()) |
| { |
| case TM_TX_STATE_FORGOTTEN_HEUR: |
| gp_xarmXIDList->remove(xid()); |
| break; |
| default: |
| ; |
| } |
| |
| lv_terminateThread = releaseTxnObj(true); |
| |
| TMTrace(2, ("CTmXaTxn::req_xa_forget EXIT.\n")); |
| return lv_terminateThread; |
| } // req_xa_forget |
| |
| |
| // -------------------------------------------------------------- |
| // req_xa_start |
| // Purpose - Txn specific processing for xa_start. |
| // xa_start with no flags is like a BEGINTRANSACTION |
| // -------------------------------------------------------------- |
| bool CTmXaTxn::req_xa_start(CTmTxMessage * pp_msg) |
| { |
| rmid(pp_msg->request()->u.iv_start.iv_rmid); |
| xid(&pp_msg->request()->u.iv_start.iv_xid); |
| |
| TMTrace (2, ("CTmXaTxn::req_xa_start ENTRY: XA Txn ID (%d,%d), tx state %d, rmid " PFLL ", XID %s\n", |
| node(), seqnum(), tx_state(), rmid(), XIDtoa(xid()))); |
| add_app_partic(pp_msg->request()->u.iv_start.iv_nid, pp_msg->request()->u.iv_start.iv_pid); |
| state_change(TX_BEGIN, pp_msg); |
| |
| pp_msg->reply(XA_OK); |
| |
| return false; |
| } //req_xa_start |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::req_xa_notImplemented |
| // Purpose: Methods not implemented. |
| // --------------------------------------------------------------------------- |
| void CTmXaTxn::req_xa_notImplemented(CTmTxMessage *pp_msg) |
| { |
| TMTrace(2, ("Dialect %d, request type %d: This method is not implemented!!\n", |
| pp_msg->request()->iv_msg_hdr.dialect_type, pp_msg->requestType())); |
| abort(); |
| } // req_xa_notImplemented |
| |
| |
| // --------------------------------------------------------------------------- |
| // CTmXaTxn::mapErr |
| // Purpose: Maps TM error codes to XA ones. |
| // --------------------------------------------------------------------------- |
| int CTmXaTxn::mapErr(short pv_tmError) |
| { |
| int lv_xaError = XA_OK; |
| |
| switch (pv_tmError) |
| { |
| case FEOK: |
| lv_xaError = XA_OK; |
| break; |
| case FEENDEDTRANSID: |
| case FETXNOTSUSPENDED: |
| lv_xaError = XAER_RMERR; |
| break; |
| case FEINVTRANSID : |
| lv_xaError = XAER_DUPID; |
| break; |
| case FEABORTEDTRANSID: |
| lv_xaError = XAER_RMERR; |
| break; |
| default: |
| lv_xaError = XAER_RMFAIL; |
| } |
| return lv_xaError; |
| } // mapErr |
| |
| |
| //---------------------------------------------------------------------------- |
| // CTmXaTxn::releaseTxnObj |
| // Purpose: cleanup and release the transaction object. |
| //---------------------------------------------------------------------------- |
| bool CTmXaTxn::releaseTxnObj(bool pv_terminate) |
| { |
| bool lv_terminate = pv_terminate; |
| TMTrace (1, ("CTmXaTxn::releaseTxnObj ENTRY: XA Txn ID (%d,%d), txn state %d, terminate? %d.\n", |
| node(), seqnum(), tx_state(), lv_terminate)); |
| |
| // We want to reset the transactionBusy flag and cleanup the pendingQ |
| // even if this is a hung transaction. |
| bool lv_txnBusy = reset_transactionBusy(true /*cleanup pendingQ*/); |
| |
| // Cleanup now if we are terminating the transaction and we've finished |
| // processing transaction requests. |
| if (lv_terminate && !lv_txnBusy) |
| { |
| inc_cleanup(); |
| } |
| else |
| TMTrace (3, ("CTmXaTxn::releaseTxnObj : XA Txn ID (%d,%d), terminate=%d, txnBusy=%d, " |
| "pending requests, not cleaning up transaction.\n", |
| node(), seqnum(), lv_terminate, lv_txnBusy)); |
| |
| TMTrace (1, ("CTmXaTxn::releaseTxnObj EXIT: XA Txn ID (%d,%d), txn state %d, terminate? %d.\n", |
| node(), seqnum(), tx_state(), lv_terminate)); |
| return lv_terminate; |
| } //releaseTxnObj |
| |
| //---------------------------------------------------------------------------- |
| // CTmXaTxn::xid_eq |
| // Purpose: Compare an XID to the XID associated with |
| // this XA transaction. |
| //---------------------------------------------------------------------------- |
| bool CTmXaTxn::xid_eq(XID *pp_xid) |
| { |
| if (pp_xid->formatID == xid()->formatID && |
| pp_xid->bqual_length == xid()->bqual_length && |
| pp_xid->gtrid_length == xid()->gtrid_length && |
| !strcmp(pp_xid->data, xid()->data)) |
| return true; |
| else |
| return false; |
| } //xid_eq |