// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
#include <assert.h>
#include <stdlib.h>
#include <string.h>
// TM includes
#include "tminfo.h"
#include "tmlogging.h"
#include "tmregistry.h"
#include "tmtxthread.h"
#include "tmrecov.h"
#include "tmtxbase.h"
#include "hbasetm.h"
// Seabed includes
#include "seabed/pctl.h"
#include "seabed/pevents.h"
#include "seabed/trace.h"
#include <cstdlib>
#include <ctime>
#include <cstdio>
#include <iostream>
#include <sstream>
using std::stringstream;
using std::string;
// for use only in this file
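// Holds the link tag, outstanding msgid, and target node for each BMSG_LINK_
// issued to a peer TM (see send_system_status below).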
typedef struct _tminfo_cpp_as_0
{
int32 iv_tag;
int32 iv_msgid;
int32 iv_nid;
} pid_msgid_struct;
// -------------------------------------------------------------
// TM_Info
// Purpose : Initialize the TM info.
// This is a single TM entity instance
// -------------------------------------------------------------
TM_Info::TM_Info()
:iv_stats(TM_STATS, TM_STATS_INTERVAL)
{
// Mutex attributes: Recursive = true, ErrorCheck=false
ip_mutex = new TM_Mutex(true, false);
iv_trace_level = 0;
iv_state = TM_STATE_INITIAL;
iv_lock_count = iv_cps_in_curr_file = 0;
iv_lock_owner = -1;
lock();
iv_all_rms_closed = true;
iv_counts.iv_tx_count = iv_counts.iv_abort_count =
iv_counts.iv_commit_count = iv_counts.iv_tm_initiated_aborts =
iv_counts.iv_tx_hung_count = iv_rm_wait_time = iv_stall_phase_2 = 0;
iv_run_mode = iv_incarnation_num = iv_lead_tm_nid = iv_sys_recov_state =
iv_sys_recov_lead_tm_nid = iv_stats_interval = iv_RMRetry_interval =
iv_TMRestartRetry_interval = iv_SeqNumBlockStart = iv_nextSeqNumBlockStart =
iv_counts.iv_current_tx_count = iv_counts.iv_current_tx_hung_count = 0;
iv_shutdown_coordination_started = iv_TSE_xa_start = false;
iv_threadModel = none;
iv_shutdown_level = MS_Mon_ShutdownLevel_Undefined;
iv_audit_seqno = 0;
iv_audit_mode = TM_BUFFER_AUDIT_MODE; // M10+ default.
iv_perf_stats = TM_PERF_STATS_OFF;
iv_num_active_txs = 0;
iv_timeout = TX_ABORT_TIMEOUT;
iv_allTMsOpen = false;
iv_lead_tm = iv_can_takeover = iv_leadTM_isolated = iv_lead_tm_takeover = false;
iv_nid = iv_pid = -1;
iv_cp_interval = -1;
iv_sync_otag = 0;
iv_sendingBroadcastSeqNum = 1;
iv_receivingBroadcastSeqNum = 0;
iv_write_cp = iv_initiate_cp = iv_trace_initialized = false;
iv_RMPartic = DEFAULT_RM_PARTIC;
iv_TSMode = DEFAULT_TS_MODE;
iv_TLOGperTM = DEFAULT_TLOG_PER_TM;
iv_SysRecovMode = TM_DEFAULT_SYSRECOVERY_MODE;
iv_maxRecoveringTxns = TM_DEFAULT_MAXRECOVERINGTXNS;
iv_AllRmParticRecov = DEFAULT_ALL_RM_PARTIC_RECOV;
iv_earlyCommitReply = TM_DEFAULT_EARLY_COMMIT_REPLY;
memset(iv_open_tms, 0, sizeof(Tm_phandle_info) * MAX_NODES);
restarting_tm(-1);
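// Reset per-node recovery and open-TM state for every possible node.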
for (int lv_idx = 0; lv_idx < MAX_NODES; lv_idx++)
{
iv_recovery[lv_idx].iv_node_being_recovered = -1;
iv_recovery[lv_idx].iv_list_built = false;
iv_recovery[lv_idx].iv_down_without_sync = false;
iv_open_tms[lv_idx].iv_in_use = false;
iv_open_tms[lv_idx].iv_recov_state = TM_FAIL_RECOV_STATE_INITIAL;
NodeRecov(NULL, lv_idx);
}
iv_tms_highest_index_used = 0;
for (int i=0; i<MAX_NODES; i++)
iv_syncDataList[i].clear();
ClusterRecov(NULL);
memset(&iv_TMUP_Wait_reply, 0, sizeof(Tm_Rsp_Msg_Type));
// Initialize transactionPool
ip_transactionPool = new CTmPool<CTmTxBase>(tm_stats(), MAX_NUM_TRANS,
STEADYSTATE_LOW_TRANS, STEADYSTATE_HIGH_TRANS);
iv_globalUniqueSeqNum = false; // By default use local TM seq number generation
iv_SeqNumInterval = TM_DEFAULT_SEQ_NUM_INTERVAL;
iv_nextSeqNum = 1;
// Initialize threadPool
ip_threadPool = new CTmPool<CTxThread>(tm_stats(), MAX_NUM_THREADS,
STEADYSTATE_LOW_THREADS, STEADYSTATE_HIGH_THREADS);
iv_txThreadNum = 1;
ip_tmAuditObj = NULL;
ip_tmTimer = NULL;
iv_trans_hung_retry_interval = TRANS_HUNG_RETRY_INTERVAL;
iv_timerDefaultWaitTime = TIMERTHREAD_WAIT;
iv_overrideAuditInconsistencyDuringRecovery = OVERRIDE_AUDIT_INCONSISTENCY;
iv_broadcast_rollbacks = TM_DEFAULT_BROADCAST_ROLLBACKS;
iv_stats.clearCounters();
// Default no pause
gv_pause_state = -1;
gv_pause_state_type = TX_PAUSE_STATE_TYPE_FIXED;
ms_getenv_int("TM_TEST_PAUSE_STATE", &gv_pause_state);
ms_getenv_int("TM_TEST_PAUSE_STATE_TYPE", (int *) &gv_pause_state_type);
unlock();
}
// ------------------------------------
// ~TM_Info
// Purpose : not much to do here
// ------------------------------------
TM_Info::~TM_Info()
{
TMTrace (2, ("TM_Info::~TM_Info ENTRY: lock count = %d.\n", iv_lock_count));
char la_tm_name[8];
sprintf(la_tm_name, "$tm%d", iv_nid);
while (ip_mutex->lock_count() > 0)
SB_Thread::Sthr::sleep(10); // 1/100th of a second
//if (iv_lock_count > 0)
// unlock();
if (iv_state != TM_STATE_INITIAL)
terminate_all_threads();
delete ip_transactionPool;
delete ip_threadPool;
delete ip_mutex;
TMTrace (2, ("TM_Info::~TM_Info EXIT\n"));
//12/8/2010 Removed exit here as exit() is calling this destructor
//exit(0);
}
// ------------------------------------------------------------------
// initialize
// Purpose : initialize the global TM structure. Includes things like
// setting the state, starting adp, etc...
// -------------------------------------------------------------------
void TM_Info::initialize()
{
char la_tm_name[8];
char la_value[9];
int32 lv_error = 0;
int32 lv_trace_detail = 0;
bool lv_unique = false;
bool lv_success = false;
//initialize pool limits
int32 lv_max_num_trans = 0;
int32 lv_ss_low_trans = 0;
int32 lv_ss_high_trans = 0;
int32 lv_max_num_threads = 0;
int32 lv_ss_low_threads = 0;
int32 lv_ss_high_threads = 0;
// TM Stats
bool lv_tm_stats = ((TM_STATS==0)?false:true);
int32 lv_tm_stats_interval = TM_STATS_INTERVAL;
sprintf(la_tm_name, "$tm%d", iv_nid);
lock();
// initialize the TM info
iv_state = TM_STATE_DOWN;
// initialize system recovery state and recovering
// lead TM nid.
iv_sys_recov_state = TM_SYS_RECOV_STATE_INIT;
iv_sys_recov_lead_tm_nid = -1;
iv_shutdown_level = MS_Mon_ShutdownLevel_Undefined;
// start up the adp
// iv_mat.initialize_adp();
iv_lead_tm = false;
iv_lead_tm_takeover = false;
// get stall directive from registry
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_STALL_PHASE_2, la_value);
if (lv_error == 0)
{
iv_stall_phase_2 = atoi (la_value);
if (iv_stall_phase_2 < 0)
iv_stall_phase_2 = 0; // Turn it off
}
else
iv_stall_phase_2 = 0; // Turn it off
// get interval from registry
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_RM_WAIT_TIME, la_value);
if (lv_error == 0)
{
iv_rm_wait_time = atoi (la_value);
if (iv_rm_wait_time <= 0)
iv_rm_wait_time = MAX_RM_WAIT_TIME;
}
else
iv_rm_wait_time = MAX_RM_WAIT_TIME;
// get trans hung retry from registry
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TRANS_HUNG_RETRY_INTERVAL, la_value);
if (lv_error == 0)
{
iv_trans_hung_retry_interval = atoi (la_value);
if (iv_trans_hung_retry_interval == 0)
iv_trans_hung_retry_interval = TRANS_HUNG_RETRY_INTERVAL;
}
else
iv_trans_hung_retry_interval = TRANS_HUNG_RETRY_INTERVAL;
// get default timer thread wait time from registry (msec)
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TIMERTHREAD_WAIT, la_value);
if (lv_error == 0)
{
iv_timerDefaultWaitTime = atoi (la_value);
if (iv_timerDefaultWaitTime == 0)
iv_timerDefaultWaitTime = TIMERTHREAD_WAIT;
}
else
iv_timerDefaultWaitTime = TIMERTHREAD_WAIT;
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_CP_INTERVAL, la_value);
if (lv_error == 0)
{
iv_cp_interval = atoi (la_value);
if (iv_cp_interval > 0)
iv_cp_interval *= 60000; // 60 (mins to secs) * 1000 (secs to msecs)
else
iv_cp_interval = TM_CP_DEFAULT * 60000;
}
else
{
iv_cp_interval = TM_CP_DEFAULT * 60000;
}
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_STATS_INTERVAL, la_value);
if (lv_error == 0)
{
iv_stats_interval = atoi (la_value);
if (iv_stats_interval > 0)
iv_stats_interval *= 60000; // 60 (mins to secs) * 1000 (secs to msecs)
else
iv_stats_interval = TM_STATS_DEFAULT * 60000;
}
else
{
iv_stats_interval = TM_STATS_DEFAULT * 60000;
}
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TM_RMRETRY_INTERVAL, la_value);
if (lv_error == 0)
{
iv_RMRetry_interval = atoi (la_value);
if (iv_RMRetry_interval > 0)
iv_RMRetry_interval *= 60000; // 60 (mins to secs) * 1000 (secs to msecs)
else
iv_RMRetry_interval = TM_RMRETRY_DEFAULT * 60000;
}
else
{
iv_RMRetry_interval = TM_RMRETRY_DEFAULT * 60000;
}
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TM_TMRESTARTRETRY_INTERVAL, la_value);
if (lv_error == 0)
{
iv_TMRestartRetry_interval = atoi (la_value);
if (iv_TMRestartRetry_interval > 0)
iv_TMRestartRetry_interval *= 1000; // * 1000 (secs to msecs)
else
iv_TMRestartRetry_interval = TM_TMRESTARTRETRY_DEFAULT * 1000; //secs
}
else
{
iv_TMRestartRetry_interval = TM_TMRESTARTRETRY_DEFAULT * 1000; //secs
}
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TX_ABORT_TIMEOUT, la_value);
if (lv_error == 0)
{
iv_timeout = atoi (la_value);
if (iv_timeout != -1 && iv_timeout <= 0)
iv_timeout = TX_ABORT_TIMEOUT; // Reset to default
}
else
{
iv_timeout = TX_ABORT_TIMEOUT; // Default
}
//initialize trace file
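// The TM_TRACE environment variable takes precedence; the DTM_TM_TRACE
// registry value is only consulted when no non-zero trace level comes from
// the environment.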
iv_trace_level = 0;
lv_unique = false;
ms_getenv_int("TM_TRACE", &iv_trace_level);
if (!iv_trace_level)
{
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TM_TRACE, la_value);
if (lv_error == 0)
{
iv_trace_level = atoi (la_value);
init_tracing (true, (char *) "tm_trace", iv_trace_level);
}
}
else //if (iv_trace_level)
{
ms_getenv_int("TM_TRACE_DETAIL", &lv_trace_detail);
if (lv_trace_detail)
iv_trace_level = lv_trace_detail;
ms_getenv_bool ("TM_TRACE_UNIQUE", &lv_unique);
const char *lp_file = ms_getenv_str("TM_TRACE_FILE");
if (lp_file != NULL)
{
char *lp_trace_file = (char*)lp_file;
init_tracing (lv_unique, lp_trace_file, iv_trace_level);
}
else
init_tracing (lv_unique, (char *) "tm_trace", iv_trace_level);
TMTrace (1, ("TM Tracing is on, trace level %d.\n", iv_trace_level));
}
// Get DTM_TM_STATS
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TM_STATS,
la_value);
if (lv_error == 0)
lv_tm_stats = ((atoi(la_value) == 0)?false:true);
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TM_STATS_INTERVAL,
la_value);
if (lv_error == 0)
lv_tm_stats_interval = atoi(la_value);
gv_tm_info.stats()->initialize(lv_tm_stats, lv_tm_stats_interval);
// Check for global unique sequence number generation
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_GLOBAL_UNIQUE_SEQ_NUM, la_value);
if (lv_error == 0)
{
int lv_globalUniqueSeqNum = atoi (la_value);
if (lv_globalUniqueSeqNum == 0)
iv_globalUniqueSeqNum = false;
}
if (iv_trace_level)
{
if (iv_globalUniqueSeqNum)
trace_printf("Monitor to be used for sequence number generation.\n");
else
trace_printf("TM will generate local sequence numbers.\n");
}
#ifdef MULTITHREADED_TM
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_THREAD_MODEL, la_value);
if (lv_error == 0)
{
int lv_threadModel = atoi (la_value);
if (lv_threadModel == 1)
iv_threadModel = worker;
else
iv_threadModel = transaction;
}
else // Use default
{
if (THREAD_MODEL == 1)
iv_threadModel = worker;
else
iv_threadModel = transaction;
}
if (iv_trace_level)
{
if (iv_threadModel == worker)
trace_printf("Multithreading enabled for TM using worker threads.\n");
else
trace_printf("Multithreading enabled for TM using transaction threads.\n");
}
#else
iv_threadModel = none;
TMTrace (1, ("Multithreading disabled for TM.\n"));
#endif
// Configure transactionPool
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_MAX_NUM_TRANS,
la_value);
lv_max_num_trans = ((lv_error == 0)?atoi(la_value):-1);
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_STEADYSTATE_LOW_TRANS,
la_value);
lv_ss_low_trans = ((lv_error == 0)?atoi(la_value):-1);
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_STEADYSTATE_HIGH_TRANS,
la_value);
lv_ss_high_trans = ((lv_error == 0)?atoi(la_value):-1);
lv_success = ip_transactionPool->setConfig(tm_stats(), lv_max_num_trans,
lv_ss_low_trans, lv_ss_high_trans);
if (lv_success)
{
TMTrace (1, ("Transaction pool parameters set: "
"Max %d, steady state low %d, steady state high %d.\n",
lv_max_num_trans, lv_ss_low_trans, lv_ss_high_trans));
}
else
{
TMTrace (1, ("Attempt to set tranasction pool parameters failed: "
"Max %d, steady state low %d, steady state high %d.\n",
lv_max_num_trans, lv_ss_low_trans, lv_ss_high_trans));
}
// Configure threadPool
ms_getenv_int ("DTM_MAX_NUM_THREADS", &lv_max_num_threads);
if (lv_max_num_threads)
{
TMTrace (1, ("Enabling DTM_MAX_NUM_THREADS from env variable\n"));
}
else
{
lv_max_num_threads = MAX_NUM_THREADS;
}
ms_getenv_int ("DTM_STEADYSTATE_LOW_THREADS", &lv_ss_low_threads);
if (lv_ss_low_threads)
{
TMTrace (1, ("Enabling DTM_STEADYSTATE_LOW_THREADS from env variable\n"));
}
else
{
lv_ss_low_threads = STEADYSTATE_LOW_THREADS;
}
ms_getenv_int ("DTM_STEADYSTATE_HIGH_THREADS", &lv_ss_high_threads);
if (lv_ss_high_threads)
{
TMTrace (1, ("Enabling DTM_STEADYSTATE_HIGH_THREADS from env variable\n"));
}
else
{
lv_ss_high_threads = STEADYSTATE_HIGH_THREADS;
}
lv_success = ip_threadPool->setConfig(tm_stats(), lv_max_num_threads,
lv_ss_low_threads, lv_ss_high_threads);
if (lv_success)
{
TMTrace (1, ("Thread pool parameters set: "
"Max %d, steady state low %d, steady state high %d.\n",
lv_max_num_threads, lv_ss_low_threads, lv_ss_high_threads));
}
else
{
TMTrace (1, ("Attempt to set thread pool parameters failed: "
"Max %d, steady state low %d, steady state high %d.\n",
lv_max_num_threads, lv_ss_low_threads, lv_ss_high_threads));
}
// get incarnation num
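// Read the previous incarnation number from the per-process registry entry,
// bump it by one (or start at 0 if none exists) and write it back. The new
// value is passed to the TSEs in xa_open (see init_and_recover_rms).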
char la_incarnation[9];
lv_error = tm_reg_get(MS_Mon_ConfigType_Process,
(char *) la_tm_name, (char *) DTM_INCARNATION_NUM, la_incarnation);
if (lv_error) // NOT FOUND
iv_incarnation_num = 0;
else
iv_incarnation_num = (short) (atoi (la_incarnation) + 1);
TMTrace (1, ("Current incarnation for %s is %d: with error : %d \n",
la_tm_name, iv_incarnation_num, lv_error));
sprintf(la_incarnation, "%d", iv_incarnation_num);
lv_error = tm_reg_set(MS_Mon_ConfigType_Process,
la_tm_name, (char *) DTM_INCARNATION_NUM, la_incarnation);
if (lv_error)
{
tm_log_event (DTM_TM_REGISTRY_SET_ERROR, SQ_LOG_CRIT, "DTM_TM_REGISTRY_SET_ERROR", lv_error);
TMTrace (1, ("Failed to write value into the registry. Error %d\n",lv_error));
abort ();
}
// get running mode
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_RUN_MODE, la_value);
if (lv_error == 0)
{
int lv_run_mode = atoi(la_value);
switch (lv_run_mode)
{
case TM_SYNC_MODE:
{
TMTrace (1, ("Setting RUNTIME mode to SYNC_MODE\n"));
iv_run_mode = TM_SYNC_MODE;
break;
}
case TM_NONSYNC_MODE:
default:
{
TMTrace (1, ("Setting RUNTIME mode to TM_NONSYNC_MODE\n"));
iv_run_mode = TM_NONSYNC_MODE;
break;
}
}
}
else
{
TMTrace (1, ("Setting RUNTIME mode to TM_NONSYNC_MODE\n"));
iv_run_mode = TM_NONSYNC_MODE;
}
int lv_use_tlog = 0;
ms_getenv_int ("TM_ENABLE_TLOG_WRITES", &lv_use_tlog);
if (lv_use_tlog)
TMTrace (1, ("Enabling TLOG use from env variable\n"));
use_tlog( lv_use_tlog != 0 );
int lv_audit_mode = 0;
ms_getenv_int ("DTM_AUDIT_MODE", &lv_audit_mode);
if (lv_audit_mode)
TMTrace (1, ("Setting AUDIT mode from env variable\n"));
if (!lv_audit_mode)
{
// get audit mode DTM_AUDIT_MODE
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_AUDIT_MODE, la_value);
if (lv_error == 0)
lv_audit_mode = atoi(la_value);
}
switch (lv_audit_mode)
{
case TM_NORMAL_AUDIT_MODE:
{
TMTrace (1, ("Setting AUDIT mode to BUFFERING OFF\n"));
iv_audit_mode = lv_audit_mode;
break;
}
case TM_BUFFER_AUDIT_MODE:
{
TMTrace (1, ("Setting AUDIT mode to BUFFERING ON\n"));
iv_audit_mode = lv_audit_mode;
break;
}
default:
{
TMTrace (1, ("Setting AUDIT mode to BUFFERING OFF as default\n"));
iv_audit_mode = TM_NORMAL_AUDIT_MODE;
break;
}
}
// get perf stats mode DTM_PERF_STATS
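// Note: perf stats are currently hard-coded off; the DTM_PERF_STATS registry
// value is not read here.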
iv_perf_stats = TM_PERF_STATS_OFF;
TMTrace (1, ("Setting PERF STATS mode to %d\n",iv_perf_stats ));
// Get the next sequence number block and set the next block range in
// the registry. SeqNumInterval determines the block size.
iv_nextSeqNum = setNextSeqNumBlock();
iv_TSE_xa_start = TSE_XA_START_DEFAULT;
ms_getenv_bool ("TM_TSE_XA_START", &iv_TSE_xa_start);
if (iv_trace_level)
{
if (iv_TSE_xa_start)
trace_printf("TM_TSE_XA_START on, but no longer used - remove!\n");
else
trace_printf("TM_TSE_XA_START off, but no longer used - remove!\n");
}
// RM Participation
// We can't distinguish between the DTM_RM_PARTIC environment
// variable being 0 and being missing. So you must omit both the environment
// variable and the registry value, or include only the registry value
// (omitting the environment variable), for the default to apply.
// A non-zero environment variable takes precedence, followed by the
// registry value, then the default.
bool lv_RMPartic = false;
ms_getenv_bool ("DTM_RM_PARTIC", &lv_RMPartic);
if (lv_RMPartic)
iv_RMPartic = lv_RMPartic;
else
{
// Check for registry value
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_RM_PARTIC, la_value);
if (lv_error == 0)
{
lv_RMPartic = atoi(la_value);
switch (lv_RMPartic)
{
case PARTIC_NONE:
iv_RMPartic = false;
break;
case PARTIC_ALL_RMS:
iv_RMPartic = true;
break;
default:
iv_RMPartic = (bool) DEFAULT_RM_PARTIC;
}
}
else
iv_RMPartic = lv_RMPartic;
}
if (iv_trace_level)
{
if (iv_RMPartic)
trace_printf("TM will use xa_start to register transactions with all TSEs.\n");
else
trace_printf("RMs will use ax_reg to register transaction participation.\n");
}
// Timestamp mode
TS_MODE lv_TSMode;
ms_getenv_bool ("DTM_TM_TS_MODE", (bool *) &iv_TSMode);
if (!iv_TSMode)
{
// Check for registry value
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TM_TS_MODE, la_value);
if (lv_error == 0)
{
lv_TSMode = (TS_MODE) atoi(la_value);
if (lv_TSMode)
iv_TSMode = lv_TSMode;
else
iv_TSMode = DEFAULT_TS_MODE;
}
else
iv_TSMode = DEFAULT_TS_MODE;
}
if (iv_perf_stats == TM_PERF_STATS_ON && iv_TSMode < TS_FAST)
{
iv_TSMode = TS_DETAIL;
if (iv_trace_level)
trace_printf("Warning Timestamp mode changed to %d because TM_PERF_STATS is on.\n", iv_TSMode);
}
if (iv_trace_level)
trace_printf("Timestamp mode %d.\n", iv_TSMode);
iv_overrideAuditInconsistencyDuringRecovery = OVERRIDE_AUDIT_INCONSISTENCY;
ms_getenv_bool ("DTM_OVERRIDE_AUDIT_INCONSISTENTCY", &iv_overrideAuditInconsistencyDuringRecovery);
if (iv_overrideAuditInconsistencyDuringRecovery)
{
tm_log_event (DTM_OVERRIDE_AUDIT_INCONSISTENCY_WARN, SQ_LOG_INFO,"DTM_OVERRIDE_AUDIT_INCONSISTENCY_WARN");
if (iv_trace_level)
trace_printf("DTM_OVERRIDE_AUDIT_INCONSISTENCY set. Only the first trans state record will be used on scan back.\n");
}
TM_BROADCAST_ROLLBACKS lv_broadcast_rollbacks = TM_DEFAULT_BROADCAST_ROLLBACKS;
ms_getenv_int (DTM_BROADCAST_ROLLBACKS, (int *) &lv_broadcast_rollbacks);
if (lv_broadcast_rollbacks == 0)
{
// Check for registry value
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_BROADCAST_ROLLBACKS, la_value);
if (lv_error == 0)
{
lv_broadcast_rollbacks = (TM_BROADCAST_ROLLBACKS) atoi(la_value);
switch (lv_broadcast_rollbacks)
{
case TM_BROADCAST_ROLLBACKS_NO:
case TM_BROADCAST_ROLLBACKS_YES:
case TM_BROADCAST_ROLLBACKS_DEBUG:
broadcast_rollbacks(lv_broadcast_rollbacks);
break;
default:
broadcast_rollbacks(TM_DEFAULT_BROADCAST_ROLLBACKS);
}
}
else
broadcast_rollbacks(lv_broadcast_rollbacks);
}
else
broadcast_rollbacks(lv_broadcast_rollbacks);
tm_log_event (DTM_BROADCAST_ROLLBACKS_INFO, SQ_LOG_INFO,"DTM_BROADCAST_ROLLBACKS_INFO",\
-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,iv_broadcast_rollbacks);
if (iv_trace_level)
trace_printf("DTM_BROADCAST_ROLLBACKS set to %d. All aborts will cause xa_rollback to be broadcast "
"to all TSEs regardless of branch participation.\n", iv_broadcast_rollbacks);
TM_SYSRECOVERY_MODE lv_sysrecovery_mode = iv_SysRecovMode;
ms_getenv_int("DTM_TM_SYSRECOVERY_MODE", (int *) &lv_sysrecovery_mode);
switch (lv_sysrecovery_mode)
{
case CLEAN_SHUTDOWN_OPTIMIZE:
case ALWAYS_SEND_XA_RECOVER:
TMTrace(1, ("Setting System Recovery mode to %d\n", lv_sysrecovery_mode));
iv_SysRecovMode = lv_sysrecovery_mode;
break;
default:
TMTrace(1, ("** Invalid System Recovery mode %d, default of %d used.\n",
lv_sysrecovery_mode, iv_SysRecovMode));
}
// Setting a TLOG per TM allows greater scale than the standard one TLOG/environment.
// We can't distinguish between the DTM_TLOG_PER_TM environment
// variable being 0 and being missing. So you must omit both the environment
// variable and the registry value, or include only the registry value
// (omitting the environment variable), for the default to apply.
// A non-zero environment variable takes precedence, followed by the
// registry value, then the default.
bool lv_TLOGperTM = false;
ms_getenv_bool ("DTM_TLOG_PER_TM", &lv_TLOGperTM);
if (lv_TLOGperTM)
iv_TLOGperTM = lv_TLOGperTM;
else
{
// Check for registry value
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TLOG_PER_TM, la_value);
if (lv_error == 0)
{
lv_TLOGperTM = atoi(la_value);
switch (lv_TLOGperTM)
{
case false:
iv_TLOGperTM = false;
break;
case true:
iv_TLOGperTM = true;
break;
default:
iv_TLOGperTM = (bool) DEFAULT_TLOG_PER_TM;
}
}
else
iv_TLOGperTM = lv_TLOGperTM;
}
if (iv_trace_level)
{
if (iv_TLOGperTM)
trace_printf("TM will configure one TLOG per TM process for audit.\n");
else
trace_printf("TM will use a single TLOG for all audit.\n");
}
iv_earlyCommitReply = TM_DEFAULT_EARLY_COMMIT_REPLY;
ms_getenv_bool ("DTM_EARLYCOMMITREPLY", &iv_earlyCommitReply);
if (iv_trace_level)
{
if (iv_earlyCommitReply)
trace_printf("TM will reply early to HBase commits.\n");
else
trace_printf("TM will wait for HBase commits to complete before replying.\n");
}
int lv_maxRecoveringTxns = 0;
ms_getenv_int ("DTM_MAXRECOVERINGTXNS", &lv_maxRecoveringTxns);
if (lv_maxRecoveringTxns)
{
TMTrace (1, ("Setting maximum recovering transactions from env variable "
"DTM_MAXRECOVERINGTXNS to %d, default was %d.\n",
lv_maxRecoveringTxns, iv_maxRecoveringTxns));
iv_maxRecoveringTxns = lv_maxRecoveringTxns;
}
int lv_rm_partic_recov = 0;
ms_getenv_int ("DTM_RM_PARTIC_RECOV", &lv_rm_partic_recov);
if (lv_rm_partic_recov)
TMTrace (1, ("Setting RM_PARTIC_RECOV from env variable\n"));
if (!lv_rm_partic_recov)
{
// get rm_partic_recov DTM_RM_PARTIC_RECOV registry
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_RM_PARTIC_RECOV, la_value);
if (lv_error == 0)
lv_rm_partic_recov = atoi(la_value);
}
switch (lv_rm_partic_recov)
{
case TM_PARTIC_RM_PARTIC_RECOV:
{
TMTrace (1, ("Setting ALL_RM_PARTIC_RECOV to false\n"));
iv_AllRmParticRecov = false;
break;
}
case TM_ALL_RM_PARTIC_RECOV:
{
TMTrace (1, ("Setting ALL_RM_PARTIC_RECOV to true\n"));
iv_AllRmParticRecov = true;
break;
}
default:
{
TMTrace (1, ("Setting RM_PARTIC_RECOV to default\n"));
iv_AllRmParticRecov = DEFAULT_ALL_RM_PARTIC_RECOV;
break;
}
}
// get SeaTrans multithread
bool lv_seatransSingleThreaded = false;
ms_getenv_bool("TM_SEATRANS_SINGLETHREAD", &lv_seatransSingleThreaded);
if (lv_seatransSingleThreaded == false)
{
cout << "SeaTrans multithread mode set." << endl;
multithreaded(true);
}
else
{
cout << "SeaTrans singlethread mode set." << endl;
multithreaded(false);
}
TMTrace (1, ("Setting SeaTrans Singlethread mode to %d.\n", !iv_multithreaded));
// Test pause state is used for testing only and requires the define
// debug_mode=1 to be set
#ifdef debug_mode
int lv_test_pause_state = -1;
ms_getenv_int ("DTM_TEST_PAUSE_STATE", &lv_test_pause_state);
if (lv_test_pause_state != -1)
TMTrace (1, ("Setting TEST_PAUSE_STATE to %d from env variable\n", lv_test_pause_state));
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *) DTM_TEST_PAUSE_STATE, la_value);
if (lv_error == 0)
lv_test_pause_state = atoi(la_value);
switch (lv_test_pause_state)
{
case -2:
{
TMTrace (1, ("Setting TEST_PAUSE_STATE to RANDOM.\n"));
gv_test_pause_type = TX_PAUSE_STATE_TYPE_RANDOM;
gv_test_pause = -1;
break;
}
case -1:
{
TMTrace (1, ("Setting TEST_PAUSE_STATE to off.\n"));
gv_test_pause_type = TX_PAUSE_STATE_TYPE_OFF;
gv_test_pause = -1;
break;
}
default:
{
TMTrace (1, ("Setting TEST_PAUSE_STATE to FIXED %d.\n", lv_test_pause_state));
gv_test_pause_type = TX_PAUSE_STATE_TYPE_FIXED;
gv_test_pause = lv_test_pause_state;
break;
}
}
#endif //debug_mode
unlock();
}
void TM_Info::set_xa_trace (char *pp_string)
{
char * lp_trace_string_end;
unsigned long lv_trace_mask;
lp_trace_string_end = pp_string + strlen (pp_string);
lv_trace_mask = strtoul (pp_string, &lp_trace_string_end, 16);
xaTM_setTrace(lv_trace_mask);
}
void TM_Info::set_trace (int32 pv_detail)
{
iv_trace_level = pv_detail;
if (!iv_trace_initialized)
init_tracing (true, (char *) "tm_trace", iv_trace_level); // use defaults
}
void TM_Info::init_tracing(bool pv_unique, const char *pp_trace_file, int32 pv_detail)
{
iv_trace_initialized = true;
if (pp_trace_file != NULL)
{
char *lp_trace_file = (char*)pp_trace_file;
trace_init(lp_trace_file, pv_unique, NULL, false);
}
else
trace_init ((char *) "tm_trace", pv_unique, NULL, false);
iv_trace_level = pv_detail;
gv_tm_trace_level = iv_trace_level;
TMTrace (1, ("TM Tracing is on, trace level %d.\n", iv_trace_level));
}
void TM_Info::send_system_status(TM_STATUSSYS *pp_system_status)
{
short la_results[6];
Tm_Sys_Status_Req_Type *lp_req = NULL;
Tm_Sys_Status_Rsp_Type *lp_rsp = NULL;
int32 lv_error = FEOK;
int32 lv_index = 0;
int32 lv_num_sent = 0;
pid_msgid_struct lv_pid_msgid[MAX_NODES];
int32 lv_reqLen = 0;
long lv_ret;
long lv_ret2;
int32 lv_rspLen = 0;
int lv_rsp_rcvd = 0;
BMS_SRE_LDONE lv_sre;
int32 lv_up = 0;
int32 lv_down = 0;
int32 lv_recovering = 0;
int32 lv_totaltms = 0;
int32 lv_activetxns = 0;
int32 lv_leadtm = 0;
TMTrace (2, ("TM_Info::send_system_status: ENTRY\n"));
//initialize lv_pid_msgid
for (int32 i = 0; i <= tms_highest_index_used(); i++)
{
lv_pid_msgid[i].iv_tag = 0;
lv_pid_msgid[i].iv_msgid = 0;
lv_pid_msgid[i].iv_nid = 0;
}
TMTrace (3, ("TM_Info::send_system_status Sending Stats request to other TMs.\n"));
lp_req = new Tm_Sys_Status_Req_Type [tms_highest_index_used() + 1];
lp_rsp = new Tm_Sys_Status_Rsp_Type [tms_highest_index_used() + 1];
for (int lv_idx = 0; lv_idx <= tms_highest_index_used(); lv_idx++)
{
//gather even if not in_use
if (lv_idx == iv_nid)
{
lv_pid_msgid[lv_idx].iv_tag = -1;
}
else
{
lv_pid_msgid[lv_idx].iv_tag = lv_idx + 1; // non zero
lp_req[lv_idx].iv_msg_hdr.rr_type.request_type = TM_MSG_TYPE_STATUSSYSTEM;
lp_req[lv_idx].iv_msg_hdr.version.request_version = TM_SQ_MSG_VERSION_CURRENT;
lv_pid_msgid[lv_idx].iv_nid = lv_idx;
lv_reqLen = sizeof (Tm_Sys_Status_Req_Type);
lv_rspLen = sizeof (Tm_Sys_Status_Rsp_Type);
lv_error = link(&(iv_open_tms[lv_idx].iv_phandle), // phandle,
&lv_pid_msgid[lv_idx].iv_msgid, // msgid
(char *) &lp_req[lv_idx], // reqdata
lv_reqLen, // reqdatasize
(char *) &lp_rsp[lv_idx], // replydata
lv_rspLen, // replydatamax
lv_pid_msgid[lv_idx].iv_tag, // linkertag
TM_TM_LINK_PRIORITY, // pri
BMSG_LINK_LDONEQ, // linkopts
TM_LINKRETRY_RETRIES); // retry count
if (lv_error != 0)
{
TMTrace (1, ("TM_Info::send_system_status BMSG_LINK_ failed with error %d. failure ignored.\n",lv_error));
}
else
lv_num_sent++;
}
} // for each tm
// LDONE LOOP
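// Wait for LDONE completions and drain the listen queue. Each completion is
// matched back to its TM by link tag, then BMSG_BREAK_ retrieves the reply.
// A failed break (or an unknown tag) counts that TM as down.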
while (lv_rsp_rcvd < lv_num_sent)
{
// wait for an LDONE wakeup
XWAIT(LDONE, -1);
do {
lv_error = 0;
// we've reached our message reply count, break
if (lv_rsp_rcvd >= lv_num_sent)
break;
lv_ret = BMSG_LISTEN_((short *)&lv_sre,
BLISTEN_ALLOW_LDONEM, 0);
if (lv_ret == BSRETYPE_LDONE)
{
lv_index = -1;
for (int32 lv_idx2 = 0; lv_idx2 <=tms_highest_index_used(); lv_idx2++)
{
if (lv_pid_msgid[lv_idx2].iv_tag == lv_sre.sre_linkTag)
{
lv_index = lv_idx2;
break;
}
}
if (lv_index == -1)
{
TMTrace (1, ("TM_Info::send_system_status - Link Tag %d not found\n", (int)lv_sre.sre_linkTag));
lv_error = FEDEVDOWN;
}
if (!lv_error)
{
lv_ret2 = BMSG_BREAK_(lv_pid_msgid[lv_index].iv_msgid,
la_results,
&(iv_open_tms[lv_pid_msgid[lv_index].iv_nid].iv_phandle));
if (lv_ret2 != 0)
{
TMTrace (1, ("TM_Info::send_system_status ERROR BMSG_BREAK_ returned %ld, index %d, msgid %d.\n",
lv_ret2, lv_index, lv_pid_msgid[lv_index].iv_msgid));
lv_error = FEDEVDOWN;
}
}
if (lv_error == FEDEVDOWN)
{
lv_down += 1;
lv_totaltms +=1;
TMTrace (1, ("TM_Info::send_system_status - TM respond error\n"));
}
else
{
lv_up += lp_rsp[lv_index].iv_status_system.iv_up;
lv_down += lp_rsp[lv_index].iv_status_system.iv_down;
lv_recovering += lp_rsp[lv_index].iv_status_system.iv_recovering;
lv_totaltms += lp_rsp[lv_index].iv_status_system.iv_totaltms;
lv_activetxns += lp_rsp[lv_index].iv_status_system.iv_activetxns;
if (lp_rsp[lv_index].iv_status_system.iv_leadtm == 1) {
lv_leadtm = lv_index;
}
}
lv_rsp_rcvd++;
}
} while (lv_ret == BSRETYPE_LDONE);
}// while (lv_rsp_rcvd < lv_num_sent)
delete []lp_rsp;
delete []lp_req;
if(state() == TM_STATE_UP) {
lv_up += 1;
}
else if (state() == TM_STATE_DOWN) {
lv_down += 1;
}
if(sys_recov_state() != TM_SYS_RECOV_STATE_END) {
lv_recovering += 1;
}
lv_totaltms += 1;
lv_activetxns += num_active_txs();
// If we're still in recovery we need to add any transactions
// still queued to recover.
if (ip_ClusterRecov)
lv_activetxns += ip_ClusterRecov->txnStateList()->size();
if(lead_tm()) {
lv_leadtm = nid();
}
pp_system_status->iv_up = lv_up;
pp_system_status->iv_down = lv_down;
pp_system_status->iv_recovering = lv_recovering;
pp_system_status->iv_totaltms = lv_totaltms;
pp_system_status->iv_activetxns = lv_activetxns;
pp_system_status->iv_leadtm = lv_leadtm;
TMTrace (2, ("TM_Info::send_system_status: EXIT\n"));
}
// ------------------------------------------------------------
// lock
// Purpose : locks access to the structure to enforce
// serialization.
// Now using recursive semaphores
// ------------------------------------------------------------
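// Because the mutex is created recursive (see the constructor), lock()/unlock()
// calls may nest within a thread, e.g. lock(); lock(); ... unlock(); unlock();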
void TM_Info::lock()
{
TMTrace(4, ("TM_Info::lock, count %d, owner %ld\n",
ip_mutex->lock_count(), ip_mutex->lock_owner()));
int lv_error = ip_mutex->lock();
if (lv_error)
{
TMTrace(1, ("TM_Info::lock returned error %d.\n", lv_error));
abort();
}
}
// ------------------------------------------------------------
// unlock
// Purpose : unlocks access to the structure to enforce
// serialization.
// ------------------------------------------------------------
void TM_Info::unlock()
{
iv_lock_count--;
if (iv_lock_count == 0)
iv_lock_owner= -1;
TMTrace(4, ("TM_Info::unlock, count %d, owner %ld\n",
ip_mutex->lock_count(), ip_mutex->lock_owner()));
int lv_error = ip_mutex->unlock();
if (lv_error)
{
TMTrace(1, ("TM_Info::unlock returned error %d.\n", lv_error));
abort();
}
}
// ---------------------------------------------------------
// cleanup tx
// Purpose : Clean up transaction related info when a
// transaction exits.
// ---------------------------------------------------------
void TM_Info::cleanup(void *pp_txn)
{
CTmTxBase *lp_txn = (CTmTxBase *) pp_txn;
TMTrace (2, ("TM_Info::cleanup : ENTRY, Txn ID (%d,%d).\n", lp_txn->node(), lp_txn->seqnum()));
// No need to check for additional queued requests here: nothing else could
// have been queued because we no longer queue begintxn requests, only end
// and abort requests.
// Reply to any queued application requests
//pp_txn->reply_to_queuedRequests(FEINVTRANSID);
lock();
if (lp_txn->in_use())
remove_tx(lp_txn);
unlock();
TMTrace (2, ("TM_Info::cleanup : EXIT, Active Txns=%d.\n", num_active_txs()));
}
// --------------------------------------------------------------
// TM_Info::init_slot
// Purpose : Initialize a slot in the array with RM indicated
// by pv_rmid
// --------------------------------------------------------------
int32 TM_Info::init_slot (MS_Mon_Process_Info_Type *pp_info, int32 pv_rmid, bool pv_is_ax_reg)
{
int32 lv_err = gv_RMs.TSE()->init(pp_info->pid, pp_info->nid, pp_info->process_name,
pv_rmid, pv_is_ax_reg, TSEBranch_UP);
return lv_err;
} //TM_Info::init_slot
// --------------------------------------------------------------------
// TM_Info::schedule_init_and_recover_rms
// Purpose : this is called at startup to schedule the opening of RMs.
// It does this by creating a timerEvent and queuing it to the timer
// thread for immediate processing.
// ---------------------------------------------------------------------
void TM_Info::schedule_init_and_recover_rms()
{
const int32 lc_delay = 0;
TMTrace (2, ("TM_Info::schedule_init_and_recover_rms : ENTRY, delay time: %dusec\n", lc_delay));
CTmTimerEvent *lp_timerEvent =
new CTmTimerEvent(TM_MSG_TXINTERNAL_INITIALIZE_RMS, lc_delay);
tmTimer()->eventQ_push((CTmEvent *) lp_timerEvent);
// We don't wait here - the startup will be driven forward by
// confirmation that the TM_SYS_RECOV_START_SYNC sync has
// been sent to all TMs (tm_originating_sync_commit()).
} //TM_Info::schedule_init_and_recover_rms
//----------------------------------------------------------------------------
// TM_Info::schedule_recover_system
// Purpose : Schedules system recovery to execute under the timer thread.
// This frees up the main thread to handle any requests which arrive during
// the recovery window.
//----------------------------------------------------------------------------
void TM_Info::schedule_recover_system()
{
const int32 lc_delay = 0;
TMTrace (2, ("TM_Info::schedule_recover_system : ENTRY, delay time: %dusec\n", lc_delay));
CTmTimerEvent *lp_timerEvent =
new CTmTimerEvent(TM_MSG_TXINTERNAL_SYSTEM_RECOVERY, lc_delay);
tmTimer()->eventQ_push((CTmEvent *) lp_timerEvent);
} //TM_Info::schedule_recover_system
// --------------------------------------------------------------------
// TM_Info::init_and_recover_rms
// Purpose : this is called upon startup. We call into the monitor
// to get all the DP2s in the system. Then we try to open
// them. This is done to drive recovery (if need be) as well
// get a handle to them for future communication.
//
// ---------------------------------------------------------------------
void TM_Info::init_and_recover_rms()
{
#define MAX_RM_PROCS (MAX_OPEN_RMS*2) //Allow for backups
TM_RM_Responses la_resp[MAX_RM_PROCS];
//char la_value[9];
int32 lv_count;
int32 lv_error;
int32 lv_index = 0;
MS_Mon_Process_Info_Type *lp_info;
int32 lv_msg_count = 0;
RM_Open_struct lv_open;
RMID lv_rmid;
int32 la_rmid[MAX_RM_PROCS];
int32 lv_rmindex = 0;
TMTrace (2, ("TM_Info::init_and_recover_rms : ENTRY.\n"));
if (!all_rms_closed()) {
// RMs open, no need to re-open
TMTrace (2, ("TM_Info::init_and_recover_rms : EXIT.\n"));
return;
}
lp_info = new MS_Mon_Process_Info_Type[MAX_RM_PROCS];
if (!lp_info)
{
TMTrace(1, ("TM_Info::init_and_recover_rms - Unable to allocate process array, aborting.\n"));
tm_log_event(DTM_OUT_OF_MEMORY, SQ_LOG_CRIT, "DTM_OUT_OF_MEMORY",
-1, -1, nid(), -1, -1, -1, MAX_RM_PROCS, -1, -1, -1, -1, -1, -1, -1, -1,
-1, "TM_Info::init_and_recover_rms");
abort();
}
// get all DP2s in the system and open them all
lv_error = msg_mon_get_process_info_type(MS_ProcessType_TSE,
&lv_count,
MAX_RM_PROCS,
lp_info);
switch (lv_error)
{
case FEOK:
break;
case FEBOUNDSERR:
TMTrace(1, ("TM_Info::init_and_recover_rms - Error 22 (FEBOUNDSERR) received from "
"msg_mon_get_process_info_type. DTM currently only supports %d TSEs.\n",
MAX_RM_PROCS));
// Intentional drop-through.
default:
TMTrace(1, ("TM_Info::init_and_recover_rms - Error %d, aborting.\n", lv_error));
tm_log_event(DTM_RM_OPEN_FAILED, SQ_LOG_CRIT, "DTM_RM_OPEN_FAILED",
lv_error /*error_code*/ );
abort();
}
TMTrace( 3, ("TM_Info::init_and_recover_rms : received type for %d rms.\n", lv_count));
for (lv_index = 0; ((lv_index < MAX_RM_PROCS) && (lv_index < lv_count)); lv_index++)
{
// we don't care about it if its a backup
if (lp_info[lv_index].backup == 1)
{
la_rmid[lv_index] = -1;
continue;
}
// Removed RMID registry value in M7.
// M8_TODO: Remove these lines and implement internal rmid algorithm similar
// to seq num with node and locally unique number.
/*lv_error = tm_reg_get(MS_Mon_ConfigType_Process,
lp_info[lv_index].process_name, (char *) "RMID", la_value);
lv_rmid=(lv_error) ? 0 : (atoi(la_value));
*/
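// Construct the rmid locally: pack this TM's node id and the TSE's index
// into the RMID union.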
lv_rmid.iv_rmid = 0;
//if (lv_rmid.iv_rmid == 0) //TODO
// msg_mon_get_tm_seq((int *) &lv_rmid.iv_rmid);
lv_rmid.s.iv_nid = nid();
lv_rmid.s.iv_num = lv_index;
la_rmid[lv_index] = lv_rmid.iv_rmid; // coordinate lv_info index with rmid
strcpy(lv_open.process_name, lp_info[lv_index].process_name);
lv_open.incarnation_num = incarnation_num();
lv_open.seq_num_block_start = SeqNumBlockStart();
lv_error = (*tm_switch).xa_open_entry ((char *)&lv_open, lv_rmid.iv_rmid, TMNOFLAGS);
if (lv_error == XA_OK)
{
lv_msg_count++;
}
else
{
// Handle RM error.
tm_log_event(DTM_RM_OPEN_FAILED, SQ_LOG_CRIT, "DTM_RM_OPEN_FAILED",
-1, /*error_code*/
lv_rmid.s.iv_num, /*rmid*/
-1, /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
-1, /*tx_state*/
-1, /*data */
-1, /*data1*/
-1,/*data2 */
lp_info[lv_index].process_name);
TMTrace(1, ("TM_Info::init_and_recover_rms - Failed to open RM %s, rmid %d\n",
lp_info[lv_index].process_name, lv_rmid.s.iv_num));
}
}
//Since we ignore backup TSEs, there could be gaps in the la_rmid array.
//So, we cannot just check lv_msg_count entries in the la_rmid array.
//Instead, we should go through 'lv_index-1' entries.
lv_rmindex = lv_index-1;
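// Wait for every outstanding xa_open reply (up to MAX_TMTIMER_WAIT_TIME).
// Any RM that fails to reply is fatal to TM startup.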
int32 lv_repliesOutstanding = complete_all(lv_msg_count, la_resp, MAX_TMTIMER_WAIT_TIME);
if (lv_repliesOutstanding > 0)
{
tm_log_event(DTM_RM_REPLY_FAILED, SQ_LOG_CRIT , "DTM_RM_REPLY_FAILED",
-1, /*error_code*/
-1, /*rmid*/
-1, /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
-1, /*tx_state*/
lv_repliesOutstanding); /*data */
TMTrace(1, ("TM_Info::init_and_recover_rms - %d RMs failed to reply to xa_open request.\n",
lv_repliesOutstanding));
abort();
}
// if there are no errors, then initialize the RM slot
for (int32 lv_idx = 0; lv_idx < lv_msg_count; lv_idx++)
{
if (!la_resp[lv_idx].iv_error)
{
// find right slot
int lv_i = 0;
for (; lv_i <= lv_rmindex; lv_i++)
if ((la_rmid[lv_i] != -1) &&
(la_rmid[lv_i] == la_resp[lv_idx].iv_rmid))
break;
if (lv_i > lv_rmindex)
{
tm_log_event(DTM_RM_NO_MATCH, SQ_LOG_CRIT, "DTM_RM_NO_MATCH",-1,la_resp[lv_idx].iv_rmid);
TMTrace(1, ("TM_Info::init_and_recover_rms - RM id %d did not match any of the RM slots.\n",
la_resp[lv_idx].iv_rmid));
abort ();
}
init_slot(&lp_info[lv_i], la_resp[lv_idx].iv_rmid, la_resp[lv_idx].iv_ax_reg);
}
else
{
tm_log_event(DTM_RM_OPEN_FAILED2, SQ_LOG_CRIT,"DTM_RM_OPEN_FAILED2",
la_resp[lv_idx].iv_error, la_resp[lv_idx].iv_rmid);
TMTrace(1, ("TM_Info::init_and_recover_rms - Failed to open RM rmid %d, error %d\n",
la_resp[lv_idx].iv_rmid, la_resp[lv_idx].iv_error));
}
}
all_rms_closed(false); // Finished opening RMs. This flag is used in Shutdown.
// once recovery is done, write 2 control points for a clean slate
if (lead_tm())
{
// If this is the Lead TM, proceed with system crash recovery.
// Once recovery is done, the recover_system function will
// write 2 control points for a clean slate.
if (sys_recov_state() != TM_SYS_RECOV_STATE_END)
{
// Send out the recovery start sync. The rest of system recovery will be
// driven by its completion.
ClusterRecov(new TM_Recov(gv_tm_info.rm_wait_time()));
lv_error = ClusterRecov()->initiate_start_sync();
}
}
else
{
// Mark the TM up now if it's pending.
tm_up();
if (restarting_tm() == nid())
{
gv_tm_info.set_sys_recov_status(TM_SYS_RECOV_STATE_END,lead_tm_nid());
msg_mon_tm_ready();
restarting_tm(-1);
}
}
delete[] lp_info;
TMTrace(2, ("TM_Info::init_and_recover_rms : EXIT.\n"));
} //TM_Info::init_and_recover_rms
// -------------------------------------------------------------------
// restart_tm_process
// Purpose : This method will be called by the lead TM when a node Up
// message is received by the monitor. It will restart the
// TM process immediately and send a sync to update the
// restarted process.
// -------------------------------------------------------------------
int32 TM_Info::restart_tm_process(int32 pv_nid)
{
char la_buf_tm_name[20];
char la_prog[MS_MON_MAX_PROCESS_PATH];
char la_out_file[128];
int lv_server_nid = pv_nid;
int lv_server_pid;
int32 lv_oid = 0;
int lv_error = 0;
TMTrace(2, ("TM_Info::restart_tm_process : ENTRY.\n"));
// Make sure we're the Lead TM
if (!lead_tm())
{
TMTrace(1, ("TM_Info::restart_tm_process: Only the Lead TM can restart $TM%d. "
"This TM is %d and the current Lead TM is %d, aborting self!\n",
pv_nid, nid(), lead_tm_nid()));
tm_log_event(DTM_RECOV_NOT_LEAD_TM, SQ_LOG_CRIT, "DTM_RECOV_LOST_LEAD_TM",
-1,-1,pv_nid,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,nid(),-1,-1,NULL,lead_tm_nid());
//msg_mon_shutdown(MS_Mon_ShutdownLevel_Abrupt);
abort();
}
strcpy(la_prog, "tm");
sprintf(la_out_file, "stdout_dtm_%d", pv_nid);
sprintf (la_buf_tm_name, "$tm%d", pv_nid);
lock();
lv_error = msg_mon_start_process2(
la_prog, /* prog */
la_buf_tm_name, /* name */
NULL, /* ret name */
0, /* argc */
NULL, /* argv */
&(iv_open_tms[pv_nid].iv_phandle),
1, /* open */
&lv_oid, /* oid */
MS_ProcessType_DTM, /* type */
0, /* priority */
0, /* debug */
0, /* backup */
&lv_server_nid, /* nid */
&lv_server_pid, /* pid */
NULL, /* infile */
(char *) &la_out_file, /* outfile */
true); /* unhooked */
if (lv_error == FEOK)
{
iv_open_tms[pv_nid].iv_in_use = 1;
if (pv_nid > iv_tms_highest_index_used)
iv_tms_highest_index_used = pv_nid;
restart_tm_process_helper(pv_nid);
}
if (lv_error || !can_takeover())
{
TMTrace(2, ("TM_Info::restart_tm_process: msg_mon_start_process2 for $TM%d failed with "
"error %d, canTakeover %d, scheduling retry to timer thread.\n",
pv_nid, lv_error, can_takeover()));
tm_log_event(DTM_TMRESTART_ERROR, SQ_LOG_ERR, "DTM_TMRESTART_ERROR", lv_error,
-1, pv_nid, -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, can_takeover());
}
else
{
iv_allTMsOpen = all_tms_recovered();
tm_fail_recov_state(pv_nid, TM_FAIL_RECOV_STATE_INITIAL);
gv_HbaseTM.nodeUp(pv_nid);
tm_log_event(DTM_TM_RESTARTED, SQ_LOG_INFO, "DTM_TM_RESTARTED",
-1,-1, pv_nid, -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,NULL, gv_tm_info.nid());
}
unlock();
dummy_link_to_refresh_phandle(pv_nid);
// SB_Thread::Sthr::sleep(100); // in msec
dummy_link_to_refresh_phandle(pv_nid); // The second one actually updates the phandle
TMTrace(2, ("TM_Info::restart_tm_process : EXIT.\n"));
return lv_error;
} // TM_Info::restart_tm_process
// -------------------------------------------------------------------
// recover_tm
// Purpose : This method will be called by the lead TM when a node down
// message is received by the monitor, or a DTM process death
// notice is received in the case of a logical node death.
// It attempts recovery if recovery has not yet completed
// successfully. The gv_tm_info.iv_tm_fail_recov_state will
// be INITIAL the first time we're called and will change to
// RUNNING once started and COMPLETE once recovery completes.
// This causes the TM to reject disableTrans(shutdown) requests
// during recovery.
// -------------------------------------------------------------------
int32 TM_Info::recover_tm(int32 pv_nid)
{
int lv_error = 0;
bool lv_recov_success = false;
TMTrace (2, ("TM_Info::recover_tm: ENTRY - starting TM on node %d\n",pv_nid));
// Make sure we're the Lead TM
if (!lead_tm())
{
TMTrace(1, ("TM_Info::recover_tm: Only the Lead TM can restart $TM%d. "
"This TM is %d and the current Lead TM is %d, aborting self!\n",
pv_nid, nid(), lead_tm_nid()));
tm_log_event(DTM_RECOV_NOT_LEAD_TM, SQ_LOG_CRIT, "DTM_RECOV_LOST_LEAD_TM",
-1,-1,pv_nid,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,nid(),-1,-1,NULL,lead_tm_nid());
//msg_mon_shutdown(MS_Mon_ShutdownLevel_Abrupt);
abort();
}
// At least one TM is down
iv_allTMsOpen = false;
// No other TMs will be doing work during system recovery, so we can
// safely restart those TMs without any recovery.
if (iv_sys_recov_state != TM_SYS_RECOV_STATE_END)
{
TMTrace(1, ("TM_Info::recover_tm: System recovery still active when $TM%d failed!\n", pv_nid));
tm_log_event(DTM_RECOV_LOST_TM_DURING_SYSRECOV, SQ_LOG_INFO, "DTM_RECOV_LOST_TM_DURING_SYSRECOV",
-1,-1,pv_nid);
tm_fail_recov_state(pv_nid, TM_FAIL_RECOV_STATE_COMPLETE);
}
// We only want to run the recovery the first time we enter recover_tm
if (tm_fail_recov_state(pv_nid) == TM_FAIL_RECOV_STATE_INITIAL)
{
tm_fail_recov_state(pv_nid, TM_FAIL_RECOV_STATE_RUNNING);
if (can_takeover())
set_recovery_start(pv_nid);
lv_recov_success = recover_failed_tm(pv_nid, MAX_TMTIMER_WAIT_TIME);
if (lv_recov_success)
{
TMTrace(2, ("TM_Info::recover_tm: Recovery completed for failed TM %d. "
"Enabling shutdown and restarting TM.\n",pv_nid));
tm_fail_recov_state(pv_nid, TM_FAIL_RECOV_STATE_COMPLETE);
}
else
{
TMTrace (2, ("TM_Info::recover_tm: Recovery not completed for failed TM %d "
" Exiting early.\n", pv_nid));
addTMRestartRetry(pv_nid, -1);
return FERETRY;
}
}
// If we are the new lead TM, need to see if any node recoveries are active and
// need to be redriven
if (gv_tm_info.lead_tm_takeover())
{
gv_tm_info.lead_tm_takeover(false);
for (int lv_idx = 0; lv_idx < MAX_NODES; lv_idx++)
{
if (gv_tm_info.node_being_recovered(lv_idx) != -1)
{
TMTrace(2, ("tm_process_node_down_msg - restarting recovery for node %d.\n", lv_idx));
lv_error = gv_tm_info.recover_tm(lv_idx);
}
}
}
iv_allTMsOpen = all_tms_recovered();
gv_tm_info.set_recovery_end(pv_nid);
tm_log_event(DTM_TM_RECOVERED, SQ_LOG_INFO, "DTM_TM_RECOVERED",
-1,-1, pv_nid, -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,NULL, gv_tm_info.nid());
if (gv_tm_info.mode() == TM_SYNC_MODE)
broadcast_sync_data (pv_nid);
TMTrace (2, ("TM_Info::recover_tm: EXIT, returning %d\n", lv_error));
return lv_error;
} //TM_Info::recover_tm
// ---------------------------------------------------------------------------
// TM_Info::recover_failed_tm
// Purpose : Attempt to recover a failed TM.
// ---------------------------------------------------------------------------
bool TM_Info::recover_failed_tm(int32 pv_nid, int32 pv_rm_wait_time)
{
bool lv_success = false;
int32 lv_error = FEOK;
TMTrace(1, ("TM_Info::recover_failed_tm ENTRY, Failed TM nid %d.\n", pv_nid));
// NON SYNC MODE
if (mode() == TM_NONSYNC_MODE)
{
// create the NodeRecov object if not yet created
if (!gv_tm_info.NodeRecov(pv_nid))
gv_tm_info.NodeRecov(new TM_Recov(pv_rm_wait_time), pv_nid);
// send recov messages to TSE
lv_error = gv_tm_info.NodeRecov(pv_nid)->recover_dtm_death(pv_nid);
if (lv_error != FEOK) // issue an EMS
{
tm_log_event(DTM_RECOVERY_FAILED1, SQ_LOG_CRIT, "DTM_RECOVERY_FAILED1",
lv_error, /*error_code*/
-1, /*rmid*/
pv_nid); /*dtmid*/ ;
TMTrace(1, ("TM_Info::recover_failed_tm : Failed to recover from the death of $TM%d. "
"Shutting down!\n", pv_nid));
// Recovery for dead TM failed, so can't continue
error_shutdown_abrupt(lv_error);
}
else
lv_success = true;
// delete lp_recov; never get rid of the node recovery object once created in lead TM
}
else
{
TM_MAP *lp_dataList = get_node_syncDataList(pv_nid);
do_take_over(lp_dataList);
lv_success = true;
}
TMTrace(1, ("TM_Info::recover_failed_tm EXIT, returning %d.\n", lv_success));
return lv_success;
} //TM_Info::recover_failed_tm
// ---------------------------------------------------------------------------
// TM_Info::error_shutdown_abrupt
// Purpose : Handle a recovery error by checking an env variable to decide
// whether to shutdown, loop-wait, or abort
// Parameters:
// pv_error is the file error which caused the shutdown
// Registry value DTM_ERROR_SHUTDOWN_MODE is used as follows:
// 1 Sleep in 1 minute intervals forever (used for debugging).
// 2 Dump core and don't drive shutdown (debugging).
// other Shutdown abrupt (normal use).
// ---------------------------------------------------------------------------
void TM_Info::error_shutdown_abrupt(int32 pv_error) {
int lv_counter = 0;
int32 lv_error = 0;
char la_value[9];
int lv_error_shutdown_mode = 0;
TMTrace(1, ("TM_Info::error_shutdown_abrupt ENTRY\n"));
ms_getenv_int ("DTM_ERROR_SHUTDOWN_MODE", &lv_error_shutdown_mode);
if(!lv_error_shutdown_mode) {
lv_error = tm_reg_get(MS_Mon_ConfigType_Cluster,
(char *) CLUSTER_GROUP, (char *)DTM_ERROR_SHUTDOWN_MODE, la_value);
if (lv_error == 0)
{
lv_error_shutdown_mode = atoi(la_value);
}
}
if(lv_error_shutdown_mode == 1) {
while (1) {
if(lv_counter == 0) {
tm_log_event(DTM_ERROR_SHUTDOWN_DEBUG, SQ_LOG_CRIT, "DTM_ERROR_SHUTDOWN_DEBUG",
pv_error, /*error_code*/
-1, /*rmid*/
-1); /*dtmid*/ ;
}
TMTrace(1, ("TM_Info::recover_failed_tm Looping where we would normally issue a shutdown Abrupt error %d: %d iterations", pv_error, lv_counter));
SB_Thread::Sthr::sleep(60000); // 60 seconds
lv_counter++;
}
}
else if(lv_error_shutdown_mode == 2) {
TMTrace(1, ("TM_Info::error_shutdown_abrupt Proceeding with abort. Error: %d, Shutdown mode: %d\n", pv_error, lv_error_shutdown_mode));
tm_log_event(DTM_ERROR_SHUTDOWN_DEBUG, SQ_LOG_CRIT, "DTM_ERROR_SHUTDOWN_DEBUG",
pv_error, /*error_code*/
-1, /*rmid*/
-1); /*dtmid*/ ;
abort();
}
else {
TMTrace(1, ("TM_Info::error_shutdown_abrupt Proceeding with shutdown. Error: %d, Shutdown mode: %d.\n", pv_error, lv_error_shutdown_mode));
tm_log_event(DTM_ERROR_SHUTDOWN_ABRUPT, SQ_LOG_CRIT, "DTM_ERROR_SHUTDOWN_ABRUPT",
pv_error, /*error_code*/
-1, /*rmid*/
-1); /*dtmid*/ ;
msg_mon_shutdown(MS_Mon_ShutdownLevel_Abrupt);
}
TMTrace(1, ("TM_Info::error_shutdown_abrupt EXIT\n"));
} //TM_Info::error_shutdown_abrupt
// ----------------------------------------------------
// TM_Info::take_over_abort
// Purpose - this is called during a takeover to abort
// transactions from the down TM. This will
// get called for transactions in the ACTIVE
// and ABORTED state
// ----------------------------------------------------
int32 TM_Info::take_over_abort(TM_Txid_Internal *pv_transid, bool pv_do_phase1,
TM_TX_STATE pv_state)
{
// import this foreign tx into this TM
TMTrace(2, ("TM_Info::take_over_abort ENTRY\n"));
CTmTxBase *lp_tx = (CTmTxBase *) import_tx(pv_transid, pv_state);
if (lp_tx == NULL)
{
// we just imported, this shouldn't happen
// generate EMS message , DTM_TAKEOVER_TX_FAILED
// DTM_DEATH here when it is available
tm_log_event(DTM_TAKEOVER_TX_FAILED, SQ_LOG_CRIT, "DTM_TAKEOVER_TX_FAILED",
-1, /*error_code*/
-1, /*rmid*/
nid()); /*dtmid*/
TMTrace(1, ("TM_Info::take_over_abort : Invalid transaction to take over in DTM%d\n",
nid()));
abort();
}
// phase 1 and phase 2 (when the tx is in ACTIVE state)
lp_tx->remove_app_partic(-1, pv_transid->iv_node); // stored as -1 above
if (pv_do_phase1)
{
lp_tx->internal_abortTrans(true /*takeover*/);
}
// phase 2 only (when the TX is in ABORTED state)
else
{
lp_tx->redrive_rollback();
}
TMTrace(2, ("TM_Info::take_over_abort EXIT\n"));
return 0;
} //TM_Info::take_over_abort
// -------------------------------------------------------
// TM_Info::take_over_commit
// Purpose : called during a takeover for transaction in
// the COMMITTED state
// -------------------------------------------------------
int32 TM_Info::take_over_commit(TM_Txid_Internal *pv_transid, TM_TX_STATE pv_state)
{
TMTrace(2, ("TM_Info::take_over_commit ENTRY\n"));
CTmTxBase *lp_tx = (CTmTxBase *) import_tx(pv_transid, pv_state);
if (lp_tx == NULL)
{
// we just imported, this shouldn't be NULL
// EMS message here, DTM_TAKEOVER_TX_FAILED
tm_log_event(DTM_TAKEOVER_TX_FAILED, SQ_LOG_CRIT, "DTM_TAKEOVER_TX_FAILED",
-1, /*error_code*/
-1, /*rmid*/
nid()); /*dtmid*/
TMTrace(1, ("TM_Info::take_over_commit : Invalid transaction to take over in DTM%d\n",
nid()));
abort();
}
lp_tx->redrive_commit();
TMTrace(2, ("TM_Info::take_over_commit EXIT\n"));
return 0;
} //TM_Info::take_over_commit
// --------------------------------------------------------
// TM_Info::do_take_over
// Purpose - the lead TM will call this when a NodeDown
// message is received.
// --------------------------------------------------------
bool TM_Info::do_take_over(TM_MAP *pp_dataList)
{
TMTrace(2, ("TM_Info::do_take_over ENTRY\n"));
// for each entry for the foreign TM, process depending on state
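// State mapping: ACTIVE/BEGINNING/ABORTING(_PART2) -> drive both abort phases;
// ABORTED -> redrive phase 2 of the abort only; COMMITTED -> redrive commit;
// FORGOTTEN -> nothing to do.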
Tm_Tx_Sync_Data *lp_data = (Tm_Tx_Sync_Data *) pp_dataList->get_first();
bool lv_return = true;
while (lp_data != NULL && lp_data->iv_is_valid == true && lv_return == true)
{
TM_Txid_Internal *lv_transid = (TM_Txid_Internal *)
&lp_data->iv_transid;
switch (lp_data->iv_state)
{
case TM_TX_STATE_ABORTED:
{
// redrive abort
take_over_abort(lv_transid, false, lp_data->iv_state);
break;
}
case TM_TX_STATE_ACTIVE:
case TM_TX_STATE_BEGINNING:
case TM_TX_STATE_ABORTING:
case TM_TX_STATE_ABORTING_PART2:
{
// drive abort
take_over_abort(lv_transid, true, lp_data->iv_state);
break;
}
case TM_TX_STATE_COMMITTED:
{
// redrive commit
take_over_commit(lv_transid, lp_data->iv_state);
break;
}
case TM_TX_STATE_FORGOTTEN:
{
break;
}
default:
{
// EMS message here, DTM_TAKEOVER_TX_FAILED
tm_log_event(DTM_TAKEOVER_TX_FAILED, SQ_LOG_CRIT, "DTM_TAKEOVER_TX_FAILED",
-1, /*error_code*/
-1, /*rmid*/
nid()); /*dtmid*/
TMTrace(1, ("TM_Info::do_take_over : Transaction in invalid state during takeover\n"));
lv_return = false;
break;
}
}; //switch on state
// Get next sync data element
lp_data = (Tm_Tx_Sync_Data *) pp_dataList->get_next();
} // while there is still sync data in the list
// Unlock the list
pp_dataList->get_end();
// Clear the list
pp_dataList->clear();
TMTrace(2, ("TM_Info::do_take_over EXIT.\n"));
return lv_return;
} //TM_Info::do_take_over
// -----------------------------------------------------------------------
// TM_Info::set_recovery_start
// Purpose : send out syncs for each TM we need to set RECOVERY_START for
// -----------------------------------------------------------------------
void TM_Info::set_recovery_start(int32 pv_nid)
{
int32 lv_error = FEOK;
int32 lv_node_count = 0;
int32 lv_lnode_count = 0;
MS_Mon_Node_Info_Entry_Type *lv_info;
TMTrace(2, ("TM_Info::set_recovery_start ENTRY, Node %d\n",pv_nid));
gv_tm_info.node_being_recovered (pv_nid, nid());
send_takeover_tm_sync(TM_RECOVERY_START, nid(), pv_nid);
lv_error = msg_mon_get_node_info2(&lv_node_count, MAX_NODES,
NULL, &lv_lnode_count, NULL, NULL, NULL);
if (lv_error != FEOK)
{
TMTrace(1, ("TM_Info::set_recovery_start Fatal Error: msg_mon_get_node_info "
"returned error %d.\n", lv_error));
tm_log_event(DTM_TM_GET_NODEINFO_FAILED, SQ_LOG_CRIT, "DTM_TM_GET_NODEINFO_FAILED",
lv_error, -1, pv_nid);
abort();
}
lv_info = new MS_Mon_Node_Info_Entry_Type[lv_node_count];
if (!lv_info)
{
TMTrace(1, ("TM_Info::set_recovery_start Fatal Error: out of memory, %d nodes.\n",
lv_node_count));
tm_log_event(DTM_TM_TAKEOVER_NOMEM, SQ_LOG_CRIT, "DTM_TM_TAKEOVER_NOMEM",
FENOBUFSPACE, -1, pv_nid, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, lv_node_count);
abort();
}
msg_mon_get_node_info2(&lv_node_count, lv_node_count, lv_info,
NULL, NULL, NULL, NULL);
// now go through any pending recoveries
// for (int32 lv_inx = 0; lv_inx < lv_lnode_count; lv_inx++)
// {
// Should not see spare nodes with new API
//if (lv_info[lv_inx].spare_node)
// continue;
// Skip down node
// if (lv_info[lv_inx].state != MS_Mon_State_Up)
// continue;
//lv_lnode_count++;
// if (gv_tm_info.node_being_recovered(lv_info[lv_inx].nid) == -1)
// continue;
// send sync here for start of sync
// send_takeover_tm_sync(TM_RECOVERY_START, nid(), lv_info[lv_inx].nid);
// }
delete [] lv_info;
TMTrace(2, ("TM_Info::set_recovery_start EXIT, %d nodes detected.\n", lv_lnode_count));
} //TM_Info::set_recovery_start
// ---------------------------------------------------------------------------
// TM_Info::set_recovery_end
// Purpose : sets recovery flag to RECOVERY_END for given TM
// If we are running in Non-sync mode, there's nothing to do here as we
// recovered the indoubt transactions associated with the down node at the
// beginning of restart_tm which is driven by nodeDown or DTM death notice.
// ---------------------------------------------------------------------------
void TM_Info::set_recovery_end(int32 pv_nid)
{
bool lv_success = true;
TMTrace(2, ("TM_Info::set_recovery_end ENTRY, Node %d\n",pv_nid));
gv_tm_info.node_being_recovered (pv_nid, -1);
TMTrace(3, ("TM_Info::set_recovery_end setting recovery_list_built to FALSE for Node %d\n",pv_nid));
gv_tm_info.recovery_list_built (pv_nid, false);
// NON SYNC MODE
if (mode() == TM_NONSYNC_MODE)
{
TMTrace(2, ("TM_Info::set_recovery_end (NONSYNC MODE)\n"));
// Nothing to do!
}
else
{
TM_MAP *lp_dataList = get_node_syncDataList(pv_nid);
lv_success = do_take_over(lp_dataList);
}
if (lv_success)
send_takeover_tm_sync(TM_RECOVERY_END, gv_tm_info.nid(), pv_nid);
else
{
tm_log_event(DTM_TM_TAKEOVER_FAILED, SQ_LOG_CRIT, "DTM_TM_TAKEOVER_FAILED",
-1, -1, pv_nid);
TMTrace(1, ("TM_Info::set_recovery_end error occcurred during recovery, Node %d\n",pv_nid));
}
TMTrace(2, ("TM_Info::set_recovery_end EXIT with success = %d\n", lv_success));
} //TM_Info::set_recovery_end
// -----------------------------------------------------------------------
// TM_Info::restart_tm_process_helper
// Purpose : send out RECOVERY_START syncs for any nodes still being
// recovered, then send the TM process restart sync and resend TM state
// information so the restarted TM is brought up to date
// -----------------------------------------------------------------------
void TM_Info::restart_tm_process_helper(int32 pv_nid)
{
int32 lv_error = FEOK;
int32 lv_node_count = 0;
int32 lv_lnode_count = 0;
MS_Mon_Node_Info_Entry_Type *lv_info;
TMTrace(2, ("TM_Info::restart_tm_process_helper ENTRY, Node %d\n",pv_nid));
lv_error = msg_mon_get_node_info2(&lv_node_count, MAX_NODES,
NULL, &lv_lnode_count, NULL, NULL, NULL);
if (lv_error != FEOK)
{
TMTrace(1, ("TM_Info::restart_tm_process_helper Fatal Error: msg_mon_get_node_info "
"returned error %d.\n", lv_error));
tm_log_event(DTM_TM_GET_NODEINFO_FAILED, SQ_LOG_CRIT, "DTM_TM_GET_NODEINFO_FAILED",
lv_error, -1, pv_nid);
abort();
}
lv_info = new MS_Mon_Node_Info_Entry_Type[lv_node_count];
if (!lv_info)
{
TMTrace(1, ("TM_Info::restart_tm_process_helper Fatal Error: out of memory, %d nodes.\n",
lv_node_count));
tm_log_event(DTM_TM_TAKEOVER_NOMEM, SQ_LOG_CRIT, "DTM_TM_TAKEOVER_NOMEM",
FENOBUFSPACE, -1, pv_nid, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, lv_node_count);
abort();
}
msg_mon_get_node_info2(&lv_node_count, lv_node_count, lv_info,
NULL, NULL, NULL, NULL);
// now go through any pending recoveries
for (int32 lv_inx = 0; lv_inx < lv_lnode_count; lv_inx++)
{
// Should not see spare nodes with new API
if (lv_info[lv_inx].state != MS_Mon_State_Up)
continue;
if (gv_tm_info.node_being_recovered(lv_info[lv_inx].nid) == -1)
continue;
// send sync here for start of sync
send_takeover_tm_sync(TM_RECOVERY_START, nid(), lv_info[lv_inx].nid);
}
delete [] lv_info;
// process has been restarted, set to up and recover rms
send_tm_process_restart_sync(nid(), pv_nid);
send_tm_state_information();
TMTrace(2, ("TM_Info::restart_tm_process_helper EXIT, %d nodes detected.\n", lv_lnode_count));
} //TM_Info::restart_tm_process_helper
// --------------------------------------------------------------------------------------
//
// send_tm_state_information
// Purpose : determine what state information we need to send for a node reintegration
//
// --------------------------------------------------------------------------------------
void TM_Info::send_tm_state_information()
{
TMTrace(2, ("TM_Info::send_tm_state_information ENTRY.\n"));
int lv_index = 0;
for (lv_index = 0; lv_index < MAX_NODES; lv_index++)
{
if ((iv_recovery[lv_index].iv_down_without_sync == true) ||
(iv_recovery[lv_index].iv_node_being_recovered != -1) ||
(iv_recovery[lv_index].iv_list_built == true))
{
send_state_resync (iv_nid, iv_recovery[lv_index].iv_down_without_sync,
iv_recovery[lv_index].iv_node_being_recovered,
iv_recovery[lv_index].iv_list_built, lv_index);
TMTrace(2, ("TM_Info::send_tm_state_information sent state for index %d.\n", lv_index));
}
}
TMTrace(2, ("TM_Info::send_tm_state_information EXIT.\n"));
}
// ----------------------------------------------------------------
// pack_sync_buffer
// Purpose : pack the sync data for a node into the supplied
// request buffer, starting at buffer index pv_startAt, for up
// to MAX_TRANS_PER_SYNC entries.
// ----------------------------------------------------------------
void TM_Info::pack_sync_buffer (Tm_Broadcast_Req_Type *pp_data,
int32 pv_node, int32 pv_startAt)
{
// Walk through the sync data for node pv_node moving it into the broadcast
// buffer for sending.
int32 lv_inx = pv_startAt;
Tm_Tx_Sync_Data *lp_syncData = (Tm_Tx_Sync_Data *) iv_syncDataList[pv_node].get_first();
while (lp_syncData != NULL && lv_inx < transactionPool()->get_maxPoolSize() && lv_inx < MAX_TRANS_PER_SYNC)
{
memcpy (&pp_data->iv_data[lv_inx], lp_syncData, sizeof (Tm_Tx_Sync_Data));
lv_inx++;
lp_syncData = (Tm_Tx_Sync_Data *) iv_syncDataList[pv_node].get_next();
}
iv_syncDataList[pv_node].get_end();
// Blank out those entries not in use
while (lv_inx < MAX_TRANS_PER_SYNC)
{
pp_data->iv_data[lv_inx++].iv_is_valid = false;
}
pp_data->iv_sys_recov_data.iv_sys_recov_state = iv_sys_recov_state;
pp_data->iv_sys_recov_data.iv_sys_recov_lead_tm_node = iv_sys_recov_lead_tm_nid;
}
// -------------------------------------------------------------
// unpack_sync_buffer
// Purpose : unpack entire sync data
// ------------------------------------------------------------
void TM_Info::unpack_sync_buffer (Tm_Broadcast_Req_Type *pp_data, int32 pv_node)
{
Tm_Tx_Sync_Data *lp_syncData;
TM_Txid_Internal *lp_transid;
int32 lv_inx = 0;
lock();
// If this is a new sequence number then clear out the sync data
// slot for ALL nodes before we start to unpack.
if (pp_data->iv_BroadcastSeqNum > iv_receivingBroadcastSeqNum)
{
iv_receivingBroadcastSeqNum = pp_data->iv_BroadcastSeqNum;
for (int32 lv_node = 0; lv_node < MAX_NODES; lv_node++)
iv_syncDataList[lv_node].clear();
}
// Move all the sync data for the node into the appropriate slot
while (lv_inx < MAX_TRANS_PER_SYNC && pp_data->iv_data[lv_inx].iv_is_valid == true)
{
lp_syncData = new Tm_Tx_Sync_Data;
memcpy(lp_syncData, &pp_data->iv_data[lv_inx], sizeof(Tm_Tx_Sync_Data));
lp_transid = (TM_Txid_Internal *) &(lp_syncData->iv_transid);
iv_syncDataList[pv_node].put(lp_transid->iv_seq_num, lp_syncData);
lv_inx++;
}
iv_sys_recov_state = pp_data->iv_sys_recov_data.iv_sys_recov_state;
iv_sys_recov_lead_tm_nid = pp_data->iv_sys_recov_data.iv_sys_recov_lead_tm_node;
unlock();
}
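// -----------------------------------------------------------------------
// Illustration only (not compiled): how pack_sync_buffer and
// unpack_sync_buffer pair up across a broadcast; the local names are
// invented for the sketch.
//    // Sender (see broadcast_sync_packet/broadcast_sync_data below):
//    Tm_Broadcast_Req_Type lv_req;
//    gv_tm_info.pack_sync_buffer(&lv_req, lv_node, lv_startAt);
//    // unused buffer slots are flagged iv_is_valid = false
//    // Receiver:
//    gv_tm_info.unpack_sync_buffer(&lv_req, lv_node);
//    // valid entries are keyed by transid sequence number in
//    // iv_syncDataList[lv_node]; a higher iv_BroadcastSeqNum clears all
//    // per-node lists first so data from an older broadcast can't linger.
// -----------------------------------------------------------------------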
// -------------------------------------------------------------
// write_trans_state
// Purpose : write a transaction state record
// -------------------------------------------------------------
void TM_Info::write_trans_state(TM_Txid_Internal *pp_transid, TM_TX_STATE pv_state,
int32 pv_abort_flags, bool pv_hurry)
{
int32 lv_write_cp = 0;
if (pp_transid == NULL)
{
tm_log_event(DTM_TM_INFO_INVALID_TRANSID, SQ_LOG_CRIT,"DTM_TM_INFO_INVALID_TRANSID");
TMTrace (1, ("TM_Info::write_trans_state - Attempt to write a transaction "
" state record with a NULL transid\n"));
abort ();
}
TMTrace (2, ("TM_Info::write_trans_state: ENTRY txn (%d,%d), state=%d, abortFlags=%d\n",
pp_transid->iv_node, pp_transid->iv_seq_num, pv_state, pv_abort_flags));
// TT - we do not want to write a transaction state record
// if this transaction is read only
if ((TTflagstoint64(pp_transid->iv_tt_flags) & TM_TT_READ_ONLY) == TM_TT_READ_ONLY)
return;
if (pv_state != TM_TX_STATE_NOTX)
{
if (iv_audit_mode == TM_NORMAL_AUDIT_MODE)
{
lv_write_cp = ip_tmAuditObj->write_trans_state ((TM_Transid_Type *) pp_transid,
pp_transid->iv_node, pv_state, pv_abort_flags);
}
else
{
char la_buf[REC_SIZE];
int64 lv_vsn = ip_tmAuditObj->prepare_trans_state_rec(la_buf, REC_SIZE, (TM_Transid_Type *) pp_transid,
pp_transid->iv_node, pv_state, pv_abort_flags);
ip_tmAuditObj->push_audit_rec(REC_SIZE, la_buf, lv_vsn, pv_hurry);
lv_write_cp = iv_initiate_cp; // set by audit thread
}
check_for_rollover(lv_write_cp);
}
TMTrace (2, ("TM_Info::write_trans_state: EXIT\n"));
} //TM_Info::write_trans_state
// ---------------------------------------------------------------
// TM_Info::check_for_rollover
// Purpose : This function handles an audit rollover or threshold.
// We note the rollover but there's not much to do here as CPs are
// issued only by the Lead TM, at 2-minute intervals.
// ---------------------------------------------------------------
void TM_Info::check_for_rollover(int32 pv_notification)
{
// if we need to write a cp - we don't want to overwrite what is there with false
switch (pv_notification)
{
// rolled over
case 1:
{
TMTrace (1, ("TM_Info::check_for_rollover: Received rollover request\n"));
iv_cps_in_curr_file = 0;
iv_write_cp = true;
iv_lastAuditRolloverTime = Ctimeval::now();
break;
}
// threshold
case 2:
{
if (iv_cps_in_curr_file < 2)
{
TMTrace (1, ("TM_Info::check_for_rollover: Wrote cp due to threshold\n"));
iv_write_cp = true;
iv_lastAuditThresholdTime = Ctimeval::now();
}
break;
}
// nothing to do
case 0:
default:
{
break;
}
}
if ((!iv_write_cp) && (pv_notification) && (iv_cps_in_curr_file < 2))
{
iv_cps_in_curr_file = 0; // rolled over
iv_write_cp = pv_notification;
}
} //TM_Info::check_for_rollover
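// -----------------------------------------------------------------------
// Notification codes passed to check_for_rollover (descriptive only):
//    0 - nothing to do
//    1 - the audit file rolled over: reset iv_cps_in_curr_file and request
//        a control point
//    2 - the audit threshold was reached: request a control point unless
//        two CPs were already written to the current file
// Typical caller pattern (see write_trans_state and write_control_point):
//    int32 lv_notification = ip_tmAuditObj->write_control_point(iv_nid);
//    check_for_rollover(lv_notification);
// -----------------------------------------------------------------------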
// ---------------------------------------------------------------
// write_all_trans_state
// Purpose : write all transaction state records that haven't been
// written in this control point
// Locks the CTmTxBase object to ensure no one changes the state
// while we're writing the trans state record to audit!
// ----------------------------------------------------------------
void TM_Info::write_all_trans_state()
{
TMTrace (2, ("TM_Info::write_all_trans_state: ENTRY, " PFLL " active txns.\n",
transactionPool()->get_inUseList()->size()));
CTmTxBase *lp_tx = transactionPool()->getFirst_inUseList();
while (lp_tx != NULL)
{
if (lp_tx->wrote_trans_state())
lp_tx->wrote_trans_state(false);
else
{
write_trans_state(lp_tx->transid(), lp_tx->tx_state(),
lp_tx->abort_flags(), false /*no hurry*/);
lp_tx->wrote_trans_state(false);
}
lp_tx = transactionPool()->getNext_inUseList();
}
transactionPool()->getEnd_inUseList();
TMTrace (2, ("TM_Info::write_all_trans_state: EXIT\n"));
}
// --------------------------------------------------------------
// TM_Info::write_control_point
// Purpose : [lead tm only] Send message out to all TMs to
// write their state records. Once they all respond,
// write the control point.
// --------------------------------------------------------------
void TM_Info::write_control_point(bool pv_cp_only, bool pv_startup)
{
short la_results[6];
Tm_Control_Point_Req_Type *lp_req = NULL;
Tm_Control_Point_Rsp_Type *lp_rsp = NULL;
int32 lv_error = 0;
int32 lv_index = 0;
int32 lv_num_sent = 0;
pid_msgid_struct lv_pid_msgid[MAX_NODES];
int32 lv_reqLen = 0;
long lv_ret;
long lv_ret2;
int32 lv_rspLen = 0;
int lv_rsp_rcvd = 0;
BMS_SRE_LDONE lv_sre;
TMTrace (2, ("TM_Info::write_control_point: ENTRY (Lead TM).\n"));
iv_write_cp = false;
iv_cps_in_curr_file++; // gets reset on rollover
//initialize lv_pid_msgid
for (int32 i = 0; i <= tms_highest_index_used(); i++)
{
lv_pid_msgid[i].iv_tag = 0;
lv_pid_msgid[i].iv_msgid = 0;
lv_pid_msgid[i].iv_nid = 0;
}
if ((!pv_cp_only) || (pv_startup))
{
TMTrace (3, ("TM_Info::write_control_point Sending CP request to other TMs.\n"));
lp_req = new Tm_Control_Point_Req_Type [tms_highest_index_used() + 1];
lp_rsp = new Tm_Control_Point_Rsp_Type [tms_highest_index_used() + 1];
for (int lv_idx = 0; lv_idx <= tms_highest_index_used(); lv_idx++)
{
if ((lv_idx == iv_nid) ||
(iv_open_tms[lv_idx].iv_in_use == false))
{
lv_pid_msgid[lv_idx].iv_tag = -1;
}
else
{
lv_pid_msgid[lv_idx].iv_tag = lv_idx + 1; // non zero
lp_req[lv_idx].iv_msg_hdr.rr_type.request_type = TM_MSG_TYPE_CP;
lp_req[lv_idx].iv_msg_hdr.version.request_version = TM_SQ_MSG_VERSION_CURRENT;
lv_pid_msgid[lv_idx].iv_nid = lv_idx;
lp_req[lv_idx].iv_sending_tm_nid = lead_tm_nid();
lp_req[lv_idx].iv_startup = pv_startup;
lv_reqLen = sizeof (Tm_Control_Point_Req_Type);
lv_rspLen = sizeof (Tm_Control_Point_Rsp_Type);
lv_error = link(&(iv_open_tms[lv_idx].iv_phandle), // phandle,
&lv_pid_msgid[lv_idx].iv_msgid, // msgid
(char *) &lp_req[lv_idx], // reqdata
lv_reqLen, // reqdatasize
(char *) &lp_rsp[lv_idx], // replydata
lv_rspLen, // replydatamax
lv_pid_msgid[lv_idx].iv_tag, // linkertag
TM_TM_LINK_PRIORITY, // pri
BMSG_LINK_LDONEQ, // linkopts
TM_LINKRETRY_RETRIES); // retry
if (lv_error != 0)
{
tm_log_event (DTM_TM_INFO_LINK_MSG_FAIL, SQ_LOG_CRIT,"DTM_TM_INFO_LINK_MSG_FAIL", lv_error);
TMTrace (1, ("TM_Info::write_control_point-BMSG_LINK_ failed with error %d\n",lv_error));
abort ();
}
else
lv_num_sent++;
}
}
// LDONE LOOP
while (lv_rsp_rcvd < lv_num_sent)
{
// wait for an LDONE wakeup
XWAIT(LDONE, -1);
do {
// we've reached our message reply count, break
if (lv_rsp_rcvd >= lv_num_sent)
{
TMTrace (1, ("TM_Info::write_control_point-reached our message reply count, break 2.\n"));
break;
}
lv_ret = BMSG_LISTEN_((short *)&lv_sre,
BLISTEN_ALLOW_LDONEM, 0);
if (lv_ret == BSRETYPE_LDONE)
{
lv_index = -1;
for (int32 lv_idx2 = 0; lv_idx2 <=tms_highest_index_used(); lv_idx2++)
{
if (lv_pid_msgid[lv_idx2].iv_tag == lv_sre.sre_linkTag)
{
lv_index = lv_idx2;
TMTrace (1, ("TM_Info::write_control_point-found a match %d.\n", lv_idx2));
break;
}
}
if (lv_index == -1)
{
tm_log_event (DTM_TM_INFO_NO_LTAG, SQ_LOG_CRIT,"DTM_TM_INFO_NO_LTAG");
TMTrace (1, ("TM_Info::write_control_point - Link Tag %d not found in lv_pid_msgid.\n",
(int)lv_sre.sre_linkTag));
abort ();
}
lv_ret2 = BMSG_BREAK_(lv_pid_msgid[lv_index].iv_msgid,
la_results,
&(iv_open_tms[lv_pid_msgid[lv_index].iv_nid].iv_phandle));
// We just ignore errors and keep going
if (lv_ret2 != 0)
{
tm_log_event (DTM_CP_TO_TM_FAILED, SQ_LOG_WARNING, "DTM_CP_TO_TM_FAILED",
lv_ret2,-1,nid(),-1,lv_pid_msgid[lv_index].iv_msgid,
-1,-1,-1,-1,-1,-1,-1,-1,iv_open_tms[lv_index].iv_in_use,
-1,-1,NULL,lv_index);
TMTrace (1, ("TM_Info::write_control_point ERROR BMSG_BREAK_ returned %ld, node %d, msgid %d.\n",
lv_ret2, lv_index, lv_pid_msgid[lv_index].iv_msgid));
}
if (!lv_ret2 && lp_rsp[lv_index].iv_error != 0)
{
tm_log_event(DTM_TM_INFO_CNTRL_PTRSP_FAIL, SQ_LOG_CRIT,
"DTM_TM_INFO_CNTRL_PTRSP_FAIL",
lp_rsp[lv_index].iv_error,-1,nid(),
-1,lv_pid_msgid[lv_index].iv_msgid,
-1,-1,-1,-1,-1,-1,-1,-1,lv_index);
// for now, let's make sure everything succeeded
TMTrace (1, ("TM_Info::write_control_point - TM control point response error %d from node %d "
", msgid %d, index %d.\n",
lp_rsp[lv_index].iv_error, nid(), lv_pid_msgid[lv_index].iv_msgid, lv_index));
}
lv_rsp_rcvd++;
}
} while (lv_ret == BSRETYPE_LDONE);
} // while (lv_rsp_rcvd < lv_num_sent)
/* Removed this check as it causes problems in 2 node configurations when a TM fails.
// Check we received a response from other TMs.
// If we didn't receive any then take the Lead TM down as it has become
// isolated - the others will appoint a new leader. We wait for one full
// control point cycle to ensure it's not just a momentary failure which
// we're in the process of recovering from.
// If we received some but not all the responses then we can (hopefully)
// assume we're still the lead TM. We expect to see node down messages,
// so don't do anything here.
if (tms_highest_index_used() > 0 && lv_rsp_rcvd == 0)
{
tm_log_event(DTM_TM_INFO_CNTRL_PT_ISOLATED, SQ_LOG_CRIT,
"DTM_TM_INFO_CNTRL_PT_ISOLATED");
// Abort if we're still isolated after 2 control points.
if (iv_leadTM_isolated)
{
TMTrace (1, ("TM_Info::write_control_point - TM control point detected "
"Lead TM isolation for second consecutive control point - assuming "
"Lead TM is really isolated and aborting.\n"));
abort();
}
else
{
TMTrace (1, ("TM_Info::write_control_point - TM control point detected "
"Lead TM isolation. Waiting for next control point to see "
"if I'm still isolated.\n"));
iv_leadTM_isolated = true;
}
}
else
iv_leadTM_isolated = false;
*/
iv_leadTM_isolated = false;
delete []lp_rsp;
delete []lp_req;
}
// everything is ok, let's write the control point
int32 lv_notification = ip_tmAuditObj->write_control_point(iv_nid);
check_for_rollover(lv_notification);
iv_write_cp = false;
TMTrace (2, ("TM_Info::write_control_point: EXIT\n"));
}
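// -----------------------------------------------------------------------
// Minimal sketch of the fan-out pattern used by write_control_point above
// (illustration only, not compiled; error handling and the per-TM loop are
// omitted, and the lv_/la_ names are local to the sketch):
//    lv_error = link(&lv_phandle, &lv_msgid, (char *) &lv_req, sizeof(lv_req),
//                    (char *) &lv_rsp, sizeof(lv_rsp), lv_tag,
//                    TM_TM_LINK_PRIORITY, BMSG_LINK_LDONEQ, TM_LINKRETRY_RETRIES);
//    XWAIT(LDONE, -1);                                  // wait for a completion
//    lv_ret = BMSG_LISTEN_((short *) &lv_sre, BLISTEN_ALLOW_LDONEM, 0);
//    if (lv_ret == BSRETYPE_LDONE)                      // match lv_sre.sre_linkTag
//       BMSG_BREAK_(lv_msgid, la_results, &lv_phandle); // retrieve the reply
// -----------------------------------------------------------------------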
// -----------------------------------------------------------------
// addControlPointEvent
// Purpose: Adds Control Point event to timer thread to be
// executed immediately
// -----------------------------------------------------------------
void TM_Info::addControlPointEvent()
{
CTmTxMessage *lp_msg;
TMTrace (3, ("TM_Info::addControlPointEvent : ENTRY\n"));
lp_msg = new CTmTxMessage(TM_MSG_TXINTERNAL_CONTROLPOINT);
addTimerEvent(lp_msg, 0);
delete lp_msg;
TMTrace (3, ("TM_Info::addControlPointEvent : EXIT\n"));
}
// -----------------------------------------------------------------
// write_rollover_control_point
// Purpose: Writes a control point due to audit file rollover
// If called by the lead TM, write the CP directly; otherwise
// send a message to the lead TM to write the control point.
// -----------------------------------------------------------------
int32 TM_Info::write_rollover_control_point()
{
Tm_RolloverCP_Req_Type *lp_req = NULL;
Tm_RolloverCP_Rsp_Type *lp_rsp = NULL;
int16 la_results[6];
int32 lv_error = 0;
int32 lv_msgid = 1;
int32 lv_oid=0;
TPT_DECL (lv_phandle);
char la_buffer[8];
int64 lv_audit_pos = ip_tmAuditObj->audit_position();
TMTrace (2, ("TM_Info::write_rollover_control_point: ENTRY\n"));
if(lead_tm()) {
if((lv_audit_pos > iv_audit_seqno) || ((lv_audit_pos == 1) && (iv_audit_seqno != 1))) {
iv_audit_seqno = lv_audit_pos;
addControlPointEvent();
}
}
else {
sprintf(la_buffer, "$tm%d", lead_tm_nid());
lv_error = msg_mon_open_process(la_buffer,
&lv_phandle,
&lv_oid);
if (lv_error)
{
tm_log_event (DTM_ROLLOVER_OPENPROC_FAILED, SQ_LOG_CRIT,"DTM_ROLLOVER_OPENPROC_FAILED",
lv_error, /*error_code*/
-1, /*rmid*/
-1, /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
-1, /*tx_state*/
-1, /*data */
-1, /*data1*/
-1, /*data2*/
NULL, /*string1*/
lead_tm_nid());
TMTrace (1, ("TM_Info::write_rollover_control_point: Error sending message err: %d \n", lv_error));
return lv_error;
}
lp_req = new Tm_RolloverCP_Req_Type();
lp_rsp = new Tm_RolloverCP_Rsp_Type();
lp_req->iv_msg_hdr.rr_type.request_type = TM_MSG_TYPE_ROLLOVER_CP;
lp_req->iv_msg_hdr.version.request_version = TM_SQ_MSG_VERSION_CURRENT;
lp_req->iv_nid = iv_nid;
lp_req->iv_sequence_no = ip_tmAuditObj->audit_position();
lv_error = link(&lv_phandle, //phandle
&lv_msgid, //msgId
(char *)lp_req, //reqdata
sizeof(Tm_Req_Msg_Type), //reqdatsize
(char *)lp_rsp, //replydata
sizeof(Tm_Rsp_Msg_Type), //replydatamax
0, //linkertag
TM_TM_LINK_PRIORITY, //pri
0, //linkopts
TM_LINKRETRY_RETRIES);
if(!lv_error) {
lv_error = BMSG_BREAK_(lv_msgid, la_results,
&lv_phandle);
}
else {
tm_log_event (DTM_ROLLOVER_LINK_TO_DTM_FAILED, SQ_LOG_CRIT,"DTM_ROLLOVER_LINK_TO_DTM_FAILED",
lv_error, /*error_code*/
-1, /*rmid*/
-1, /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
-1, /*tx_state*/
-1, /*data */
-1, /*data1*/
-1, /*data2*/
NULL, /*string1*/
iv_nid);
TMTrace (1, ("TM_Info::write_rollover_control_point: Error sending message err: %d \n", lv_error));
}
delete lp_req;
delete lp_rsp;
}
TMTrace (2, ("TM_Info::write_rollover_control_point, return code %d : EXIT\n", lv_error));
return lv_error;
}
// -----------------------------------------------------------------
// write_shutdown
// Purpose: Writes a shutdown record to indicate clean shutdown.
// The write is always forced.
// -----------------------------------------------------------------
void TM_Info::write_shutdown()
{
ip_tmAuditObj->write_shutdown(iv_nid, 0 /* for state */);
}
// -----------------------------------------------------------------
// start_backwards_scan
// Purpose : starts a backward scan of TLOG to build outstanding tx
// list for crash recovery
// -----------------------------------------------------------------
void TM_Info::start_backwards_scan()
{
ip_tmAuditObj->start_backwards_scan();
}
// -----------------------------------------------------------------
// end_backwards_scan
// Purpose : ends a backward scan of TLOG after the outstanding tx
// list for crash recovery has been built
// -----------------------------------------------------------------
void TM_Info::end_backwards_scan()
{
ip_tmAuditObj->end_backwards_scan();
}
// -----------------------------------------------------------------
// read_audit_rec
// Purpose : reads one audit record at the cursor from TLog.
// -----------------------------------------------------------------
Addr TM_Info::read_audit_rec()
{
return ip_tmAuditObj->read_audit_rec();
}
// -----------------------------------------------------------------
// release_audit_rec
// Purpose : releases the previously read audit record
// -----------------------------------------------------------------
void TM_Info::release_audit_rec()
{
ip_tmAuditObj->release_audit_rec();
}
// -----------------------------------------------------------------
// add_sync_data
// Purpose : insert a sync structure into the array; the tx was begun
// on another node by another TM
// -----------------------------------------------------------------
void TM_Info::add_sync_data (int32 pv_nid, Tm_Tx_Sync_Data *pp_data)
{
if (!pp_data)
return;
TM_Txid_Internal *lp_transid = (TM_Txid_Internal*)&(pp_data->iv_transid);
Tm_Tx_Sync_Data *lp_syncData = new Tm_Tx_Sync_Data;
memcpy(lp_syncData, pp_data, sizeof(Tm_Tx_Sync_Data));
lock();
iv_syncDataList[pv_nid].put(lp_transid->iv_seq_num, lp_syncData);
unlock();
lp_syncData->iv_is_valid = true;
}
// ------------------------------------------------------------------
// get_node_syncDataList
// Purpose : Return the sync data list for a given node. This is
// used at takeover time to walk through and deal with any
// outstanding txs
// ------------------------------------------------------------------
TM_MAP * TM_Info::get_node_syncDataList (int32 pv_nid)
{
return &iv_syncDataList[pv_nid];
}
// -------------------------------------------------------------
// get_sync_data
// Purpose : return the given sync data associated with this
// transaction.
// -------------------------------------------------------------
Tm_Tx_Sync_Data *TM_Info::get_sync_data (Tm_Tx_Sync_Data *pp_data)
{
if (!pp_data)
return NULL;
TM_Txid_Internal *lp_tx = (TM_Txid_Internal*) &pp_data->iv_transid;
return (Tm_Tx_Sync_Data *) iv_syncDataList[lp_tx->iv_node].get(lp_tx->iv_seq_num);
}
// ------------------------------------------------------------
// remove_sync_data
// Purpose : remove sync data from the sync data list.
// ------------------------------------------------------------
void TM_Info::remove_sync_data (Tm_Tx_Sync_Data *pp_data)
{
if (!pp_data)
return;
TM_Txid_Internal *lp_tx = (TM_Txid_Internal *) &pp_data->iv_transid;
lock();
iv_syncDataList[lp_tx->iv_node].remove(lp_tx->iv_seq_num);
unlock();
}
// ------------------------------------------------------------
// broadcast_sync_packet
// Send a single broadcast packet to a TM. This is used to
// break the Broadcast Sync down into multiple packets when
// the number of transactions exceeds the maximum that will
// fit in a broadcast buffer (Tm_Broadcast_Req_Type).
// ------------------------------------------------------------
int32 TM_Info::broadcast_sync_packet(TPT_PTR(pp_TMphandle),
int32 pv_node,
Tm_Broadcast_Req_Type *pp_req,
Tm_Broadcast_Rsp_Type *pp_rsp,
int32 pv_start)
{
char la_buf[DTM_STRING_BUF_SIZE];
int16 la_results[6];
int32 lv_error = FEOK;
int32 lv_msgid;
int32 lv_size = 0;
pack_sync_buffer (pp_req, pv_node, pv_start);
if (pv_node == (MAX_NODES-1)) // last one
pp_req->iv_state_up = true;
else
pp_req->iv_state_up = false;
pp_req->iv_BroadcastSeqNum = iv_sendingBroadcastSeqNum;
pp_req->iv_DataStartAddr = pv_start;
if (iv_trace_level >= 2)
{
trace_printf("TM_Info::broadcast_sync_packet ENTRY: Broadcast SeqNum %d, node %d, start at %d\n",
iv_sendingBroadcastSeqNum, pv_node, pv_start);
lv_size = sizeof (Tm_Broadcast_Req_Type);
trace_printf("DATA : %d\n", lv_size);
}
lv_error = link(pp_TMphandle, // phandle
&lv_msgid, // msgid
(char *) pp_req, // reqdata
sizeof (Tm_Broadcast_Req_Type), // reqdatasize
(char *) pp_rsp, // replydata
sizeof (Tm_Broadcast_Rsp_Type), // replydatamax
0, // linkertag
TM_BROADCAST_LINK_PRIORITY, // pri
0, // linkopts
TM_LINKRETRY_RETRIES);
if (!lv_error)
lv_error = BMSG_BREAK_(lv_msgid, la_results, pp_TMphandle);
if (lv_error)
{
// EMS message DTM_BCAST_MSG_TO_DTM_FAILED
sprintf(la_buf, "Broadcast to DTM%d failed with error %d, broadcast seqnum %d, Starting offset %d.\n",
pv_node, lv_error, iv_sendingBroadcastSeqNum, pv_start);
tm_log_event (DTM_BCAST_MSG_TO_DTM_FAILED, SQ_LOG_CRIT,"DTM_BCAST_MSG_TO_DTM_FAILED",
lv_error,-1,-1,iv_sendingBroadcastSeqNum,
lv_msgid, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
-1, /*tx_state*/
-1, /*data */
-1, /*data1*/
-1, /*data2*/
NULL, /*string1*/
pv_node, /*node*/
-1, /*msgid2*/
pv_start /*offset*/);
TMTrace (1, ("TM_Info::broadcast_sync_packet - %s", la_buf));
abort ();
// TODO: We need to handle this error here and not abort.
return TM_ERR;
}
TMTrace (2, ("TM_Info::broadcast_sync_packet: EXIT\n"));
return TM_NO_ERR;
}
// ------------------------------------------------------------
// broadcast_sync_data
// Purpose : Send a copy of the entire sync data table for
// each node to the respective TM in that node.
// ------------------------------------------------------------
int32 TM_Info::broadcast_sync_data (int32 pv_nid)
{
char la_buffer[8];
Tm_Broadcast_Req_Type *lp_req = new Tm_Broadcast_Req_Type();
Tm_Broadcast_Rsp_Type *lp_rsp = new Tm_Broadcast_Rsp_Type();
int32 lv_error = 0;
int32 lv_TMerror = TM_NO_ERR;
int32 lv_oid = 0;
TPT_DECL (lv_phandle);
lp_req->iv_msg_hdr.dialect_type = DIALECT_TM_SQ;
lp_req->iv_msg_hdr.version.request_version =
TM_SQ_MSG_VERSION_CURRENT;
lp_req->iv_msg_hdr.miv_err.minimum_interpretation_version =
TM_SQ_MSG_VERSION_CURRENT;
lp_req->iv_msg_hdr.rr_type.request_type = TM_MSG_TYPE_BROADCAST;
lp_rsp->iv_msg_hdr.rr_type.reply_type = TM_MSG_TYPE_BROADCAST_REPLY;
sprintf(la_buffer, "$tm%d", pv_nid);
lv_error = msg_mon_open_process(la_buffer,
&lv_phandle,
&lv_oid);
if (lv_error)
{
tm_log_event (DTM_CANNOT_OPEN_DTM, SQ_LOG_CRIT,"DTM_CANNOT_OPEN_DTM",
lv_error, /*error_code*/
-1, /*rmid*/
-1, /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
-1, /*tx_state*/
-1, /*data */
-1, /*data1*/
-1, /*data2*/
NULL, /*string1*/
pv_nid);
TMTrace (1, ("broadcast_sync_data : cannot open $TM%d\n", pv_nid));
abort ();
// TODO: We need to handle this error here and not abort.
return TM_ERR;
}
inc_broadcastSeqNum();
for (int lv_idx = 0; lv_idx < MAX_NODES; lv_idx++)
{
for (int lv_startAt = 0; lv_startAt < transactionPool()->get_maxPoolSize(); lv_startAt += MAX_TRANS_PER_SYNC)
{
lv_TMerror = broadcast_sync_packet(
&lv_phandle,
lv_idx,
lp_req, lp_rsp, lv_startAt);
if (lv_TMerror)
break;
}
} // for each node
delete lp_req;
delete lp_rsp;
return lv_TMerror;
}
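// -----------------------------------------------------------------------
// Worked example (values are illustrative only): with a transaction pool of
// 5000 entries and MAX_TRANS_PER_SYNC of 1024, broadcast_sync_data sends
// five packets per node slot, with lv_startAt taking the values 0, 1024,
// 2048, 3072 and 4096. Only the packet for the last node slot (MAX_NODES-1)
// sets iv_state_up, and every packet of the broadcast carries the same
// iv_BroadcastSeqNum so the receiver can recognize a new broadcast and
// discard stale sync data (see unpack_sync_buffer).
// -----------------------------------------------------------------------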
// ------------------------------------------------------------
// new_tx
// Purpose : Instantiate a new transaction object
// pv_creator_nid INPUT: nid of transaction beginner. This need not
// match the TM nid.
// pv_creator_pid INPUT: pid of transaction beginner. For imported
// (recovered) transactions this should be the pid of the tm.
// pv_node INPUT, optional, default=-1: Node id for this transaction.
// This should always be the node of the beginning TM.
// pv_seq_num INPUT, optional, default=-1: Sequence number of this
// transaction. Must be -1 for new transactions (begintxn).
// Returns NULL if we reach the maximum transactions configured
// Otherwise it returns the txn object pointer.
// ------------------------------------------------------------
void *TM_Info::new_tx(int32 pv_creator_nid, int32 pv_creator_pid, int32 pv_node, int32 pv_seqnum,
void * (*constructPoolElement)(int64))
{
CTmTxKey lv_txKey(0,0);
CTmTxBase *lp_tx = NULL;
bool lv_reused = false;
TMTrace (2, ("TM_Info::new_tx : ENTRY. Txn ID (%d,%d).\n",
pv_node, pv_seqnum));
lock();
if (pv_node != -1 && pv_seqnum != -1)
lv_txKey.set(pv_node, pv_seqnum);
else
// Allocate a new sequence number
lv_txKey.set(gv_tm_info.nid(), tm_new_seqNum());
TMTrace (3, ("TM_Info::new_tx : Calling CTmPool<CTmTxBase>::newElement "
"transid (%d,%d).\n", lv_txKey.node(), lv_txKey.seqnum()));
lp_tx = ip_transactionPool->newElement(lv_txKey.id(), &lv_reused,
false, /*force lv_txKey.id() as index*/
constructPoolElement);
TMTrace (3, ("TM_Info::new_tx : CTmPool<CTmTxBase>::newElement returned "
"reused %d, CTmTxBase object %p.\n", lv_reused, (void *) lp_tx));
if (lp_tx)
{
lp_tx->initialize(lv_txKey.node(), 0, iv_trace_level, lv_txKey.seqnum(),
pv_creator_nid, pv_creator_pid, iv_rm_wait_time);
// Add the tx object to the tx lists.
add_tx(lp_tx);
}
unlock();
TMTrace (2, ("TM_Info::new_tx : EXIT, ID (%d,%d) creator (%d,%d), CTmTxBase object %p.\n",
lv_txKey.node(), lv_txKey.seqnum(), pv_creator_nid, pv_creator_pid, (void *) lp_tx));
return (void *) lp_tx;
} //new_tx
// ------------------------------------------------------------------
// import_tx
// Purpose - import a transaction into this TM - used during takeover
// and recovery.
// Parameters:
// pv_state: Input, optional. Default TM_TX_STATE_NOTX
// pv_txnType: Input, optional. Default TM_TX_TYPE_DTM
// -------------------------------------------------------------------
void * TM_Info::import_tx (TM_Txid_Internal *pv_transid, TM_TX_STATE pv_state, TM_TX_TYPE pv_txnType)
{
CTmTxBase *lp_tx;
TMTrace(2, ("TM_Info::import_tx : ID (%d,%d) ENTRY\n", pv_transid->iv_node, pv_transid->iv_seq_num));
// we need to import this transaction into our system in order
// to properly drive commitment.
// Right now recovery for XARM is not fully implemented, so this will always be a DTM transaction (the default).
switch (pv_txnType)
{
case TM_TX_TYPE_DTM:
lp_tx = (TM_TX_Info *) new_tx(nid(), pid(), pv_transid->iv_node, pv_transid->iv_seq_num,
(void* (*)(long int)) &TM_TX_Info::constructPoolElement);
break;
case TM_TX_TYPE_XARM:
lp_tx = (CTmXaTxn *) new_tx(nid(), pid(), pv_transid->iv_node, pv_transid->iv_seq_num,
(void* (*)(long int)) &CTmXaTxn::constructPoolElement);
break;
default:
TMTrace (2, ("TM_Info::import_txt : ERROR Instantiating new Txn ID (%d,%d) of bad type %d.\n",
pv_transid->iv_node, pv_transid->iv_seq_num, pv_txnType));
tm_log_event(DTM_RECOV_FAIL_BAD_TXN_TYPE, SQ_LOG_CRIT, "DTM_RECOV_FAIL_BAD_TXN_TYPE",
-1,-1,pv_transid->iv_node,pv_transid->iv_seq_num,-1,-1,-1,-1,-1,-1,-1,-1,-1,pv_txnType);
abort();
}
// A NULL return indicates that we are already handling our maximum number
// of concurrent transactions.
if (lp_tx == NULL)
{
// Removing this event for now as we keep hitting it and it's just announcing that
// we've reached the maximum transactions allowed per node.
//tm_log_event(DTM_TX_MAX_EXCEEDED, SQ_LOG_WARNING, "DTM_TX_MAX_EXCEEDED",
// FETOOMANYTRANSBEGINS, /*error_code*/
// -1, /*rmid*/
// nid(), /*dtmid*/
// pv_transid->iv_seqnum, /*seq_num*/
// -1, /*msgid*/
// -1, /*xa_error*/
// transactionPool()->get_maxPoolSize(), /*pool_size*/
// transactionPool()->totalElements() /*pool_elems*/);
TMTrace(1, ("TM_Info::import_tx, FETOOMANYTRANSBEGINS\n"));
}
else
{
lp_tx->tx_state(pv_state);
// Start statistics counters TODO
//lp_tx->stats()->txnTotal()->start();
//lp_tx->stats()->txnBegin()->start();
// Don't start thread or queue any work against the transaction yet.
}
TMTrace(2, ("TM_Info::import_tx EXIT ID (%d,%d).\n", pv_transid->iv_node, pv_transid->iv_seq_num));
return (void *) lp_tx;
} //import_tx
// ------------------------------------------------------------
// add_tx
// Purpose : add a transaction to the TX list
// Semaphore locked by caller.
// ------------------------------------------------------------
int32 TM_Info::add_tx(CTmTxBase *pp_tx)
{
TMTrace (2, ("TM_Info::add_tx : ENTRY.\n"));
if (!pp_tx)
{
TMTrace (1, ("TM_Info::add_tx : NULL pp_tx object pointer passed in.\n"));
return TM_ERR;
}
pp_tx->in_use(true);
num_active_txs_inc();
gv_system_tx_count++;
pidKey lv_pidKey;
lv_pidKey.k.iv_pid = pp_tx->ender_pid();
lv_pidKey.k.iv_seqnum = pp_tx->seqnum();
CTmTxBase * lv_tx = (CTmTxBase *) iv_txPidList.get(lv_pidKey.intKey);
if (lv_tx)
{
TMTrace(1, ("TM_Info::add_tx : Warning: ID %d, pid %d found in iv_txPidList replaced.\n",
pp_tx->seqnum(), pp_tx->ender_pid()));
iv_txPidList.remove(lv_pidKey.intKey);
}
iv_txPidList.put(lv_pidKey.intKey, pp_tx);
iv_tx_start_list.push(pp_tx->transid());
TMTrace (3, ("TM_Info::add_tx : iv_tx_start_list size : " PFLL ".\n", iv_tx_start_list.size()));
TMTrace (2, ("TM_Info::add_tx : EXIT.\n"));
return TM_OK;
}
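// -----------------------------------------------------------------------
// Illustration only (not compiled): the pidKey union is assumed to pack the
// ender pid and the transaction sequence number into a single integer key
// (intKey) so the pair can be used directly as the TM_MAP key:
//    pidKey lv_pidKey;
//    lv_pidKey.k.iv_pid    = lp_tx->ender_pid();
//    lv_pidKey.k.iv_seqnum = lp_tx->seqnum();
//    CTmTxBase *lp_found = (CTmTxBase *) iv_txPidList.get(lv_pidKey.intKey);
// getFirst_tx_byPid/getNext_tx_byPid below rely on the same packing, which
// is why a pid alone does not uniquely identify an entry (see the NOTE there).
// -----------------------------------------------------------------------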
// ------------------------------------------------------------
// get_tx
// Purpose : find a tx in the TX list and return it
// ------------------------------------------------------------
void * TM_Info::get_tx(int32 pv_node, int32 pv_seq)
{
CTmTxKey k(pv_node, pv_seq);
return transactionPool()->get(k.id());
}
// ------------------------------------------------------------
// get_tx
// Purpose : find a tx in the TX list and return it
// ------------------------------------------------------------
void * TM_Info::get_tx(TM_Txid_Internal *pv_transid)
{
CTmTxKey k(pv_transid->iv_node, pv_transid->iv_seq_num);
return transactionPool()->get(k.id());
}
// ------------------------------------------------------------
// get_tx
// Purpose : find a tx in the TX list and return it
// ------------------------------------------------------------
void * TM_Info::get_tx(int64 pv_txnId)
{
CTmTxBase *lp_txBase = transactionPool()->get(pv_txnId);
void *lp_txn = (void *) lp_txBase;
return lp_txn;
}
void ** TM_Info::get_all_txs(int64 *pv_size)
{
return transactionPool()->get_inUseList()->return_all(pv_size);
}
// ------------------------------------------------------------
// getFirst_tx
// Purpose : find the first tx in the TX list and return it
// ------------------------------------------------------------
void * TM_Info::getFirst_tx()
{
return transactionPool()->getFirst_inUseList();
}
// ------------------------------------------------------------
// getNext_tx
// Purpose : find the next tx in the TX list and return it
// ------------------------------------------------------------
void * TM_Info::getNext_tx()
{
return transactionPool()->getNext_inUseList();
}
// ------------------------------------------------------------
// getEnd_tx
// Purpose : Unlock the map when we've finished. Must be called
// by any code which calls getFirst_tx.
// ------------------------------------------------------------
void TM_Info::getEnd_tx()
{
transactionPool()->getEnd_inUseList();
}
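// -----------------------------------------------------------------------
// Illustration only (not compiled): getFirst_tx/getNext_tx/getEnd_tx must be
// used as a set because getFirst_tx locks the in-use list and getEnd_tx
// releases it. A typical walk:
//    CTmTxBase *lp_tx = (CTmTxBase *) gv_tm_info.getFirst_tx();
//    while (lp_tx != NULL)
//    {
//       // ... examine lp_tx ...
//       lp_tx = (CTmTxBase *) gv_tm_info.getNext_tx();
//    }
//    gv_tm_info.getEnd_tx();   // always unlock, even if the loop exits early
// -----------------------------------------------------------------------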
// ------------------------------------------------------------
// getFirst_tx_byPid
// Purpose : get the first transaction object in the tx list
// with this pid.
// NOTE: This is currently not used and has issues because pid
// and seqNum are not sufficient to uniquely identify a
// transaction/process association!
// ------------------------------------------------------------
void *TM_Info::getFirst_tx_byPid(int32 pv_pid)
{
pidKey lv_pidKey;
lv_pidKey.k.iv_pid = pv_pid;
lv_pidKey.k.iv_seqnum = 0;
return (CTmTxBase *) iv_txPidList.get(lv_pidKey.intKey);
}
// ------------------------------------------------------------
// getNext_tx_byPid
// Purpose : get the next transaction object in the tx list
// with this pid.
// ------------------------------------------------------------
void *TM_Info::getNext_tx_byPid(int32 pv_pid)
{
CTmTxBase *lp_tx = (CTmTxBase *) iv_txPidList.get_next();
if (lp_tx != NULL && lp_tx->ender_pid() == pv_pid)
return lp_tx;
else
return NULL;
}
// ------------------------------------------------------------
// getEnd_tx_byPid
// Purpose : Unlock the iv_txPidList when finished. This method
// must be called after calling getFirst_tx_byPid.
// ------------------------------------------------------------
void TM_Info::getEnd_tx_byPid()
{
iv_txPidList.get_end();
}
// ------------------------------------------------------------
// remove_tx
// Purpose : return the transaction object to the transactionPool.
// Note that the caller must lock the TM_Info object prior to calling.
// ------------------------------------------------------------
void TM_Info::remove_tx (CTmTxBase * pp_tx)
{
CTmTxKey k(pp_tx->node(), pp_tx->seqnum());
TMTrace (2, ("TM_Info::remove_tx : ENTRY, tx object %p ID (%d,%d).\n",
(void *) pp_tx, pp_tx->node(), pp_tx->seqnum()));
pp_tx->cleanup();
//lock(); Must be locked by caller
TMTrace (2, ("TM_Info::remove_tx : Calling CTmPool<CTmTxBase>::deleteElement "
"index (%d, %d).\n", k.node(), k.seqnum()));
//If it was a recovery transaction from a node failure, we can finally decrement the counter now
if ((!gv_tm_info.ClusterRecov()) && pp_tx->recovering())
{
TMTrace (3, ("TM_Info::remove_tx : Finished recovery for transaction on node %d.\n", k.node()));
gv_tm_info.NodeRecov(k.node())->dec_txs_to_recover();
TMTrace (3, ("TM_Info::remove_tx : %d transactions left for recovery on node %d.\n", gv_tm_info.NodeRecov(k.node())->total_txs_to_recover(),
k.node()));
if (gv_tm_info.NodeRecov(k.node())->total_txs_to_recover() <= 0)
{
TMTrace (3, ("TM_Info::remove_tx : Finished recovery for node %d.\n", k.node()));
gv_tm_info.NodeRecov(k.node())->listBuilt (false); // reset since recovery is done
gv_tm_info.set_recovery_end(k.node());
tm_log_event(DTM_RECOVERY_COMPLETED, SQ_LOG_NOTICE, "DTM_RECOVERY_COMPLETED",-1,-1,k.node());
TMTrace (1, ("TM_Info::remove_tx : DTM Recovery completed for node %d.\n", k.node()));
}
}
pidKey lv_pidKey;
lv_pidKey.k.iv_pid = pp_tx->ender_pid();
lv_pidKey.k.iv_seqnum = pp_tx->seqnum();
iv_txPidList.remove(lv_pidKey.intKey);
ip_transactionPool->deleteElement(k.id());
num_active_txs_dec();
TMTrace (3, ("TM_Info::remove_tx : iv_tx_start_list size : " PFLL ".\n", iv_tx_start_list.size()));
//unlock();
TMTrace (2, ("TM_Info::remove_tx : EXIT.\n"));
}
// Methods to access sync tags. Sync Tags replace sync handles.
// ------------------------------------------------------------------
// add_sync_otag
// Purpose : Add a new sync tag to the list. This also allocates a
// new tag and returns it. Unlike sync handles which are returned by
// a call to msg_mon_issue_tmsync(), sync tags are allocated by the
// TM and passed to msg_mon_issue_tmsync(). This avoids a problem
// in multi-threaded TMs where a completion can arrive for a sync
// handle before the msg_mon_issue_tmsync() call completes and the
// handle is known to the TM.
// Note: This function calls lock(), which uses a recursive mutex and
// therefore allows reentrant calls by the same thread.
// ------------------------------------------------------------------
int32 TM_Info::add_sync_otag(Tm_Sync_Type_Transid *pp_data)
{
int32 lv_sync_otag;
TMTrace (2, ("TM_Info::add_sync_otag ENTRY ID %d\n",
pp_data->u.iv_seqnum));
// Allocate a new tag
// Note that we check to make sure the tag is not currently in use.
lock();
do
{
iv_sync_otag++;
if (iv_sync_otag > 1000000)
iv_sync_otag = 0;
lv_sync_otag = iv_sync_otag;
}
while (iv_synctags.get(lv_sync_otag) != NULL);
unlock();
pp_data->iv_sync_otag = lv_sync_otag;
iv_synctags.put(lv_sync_otag, pp_data);
TMTrace (2, ("TM_Info::add_sync_otag, tag %d, EXIT.\n", lv_sync_otag));
return lv_sync_otag;
}
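// -----------------------------------------------------------------------
// Illustration only (not compiled): the expected life cycle of a sync tag.
// The monitor call is referenced generically because its signature is not
// reproduced here; the lv_/lp_ names are local to the sketch.
//    Tm_Sync_Type_Transid *lp_data = new Tm_Sync_Type_Transid();
//    // ... fill in lp_data ...
//    int32 lv_tag = gv_tm_info.add_sync_otag(lp_data);  // allocate and register
//    // pass lv_tag to msg_mon_issue_tmsync(); on completion the handler
//    // retrieves the request by tag:
//    Tm_Sync_Type_Transid *lp_done = gv_tm_info.get_sync_otag(lv_tag);
//    // ... process the completion ...
//    gv_tm_info.remove_sync_otag(lv_tag);               // also deletes lp_data
// -----------------------------------------------------------------------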
// ------------------------------------------------------------------
// get_sync_otag
// Purpose : Retrieve the transaction from the iv_synctags map based
// on pv_tag.
// ------------------------------------------------------------------
Tm_Sync_Type_Transid *TM_Info::get_sync_otag(int32 pv_tag)
{
Tm_Sync_Type_Transid *lp_data = NULL;
TMTrace (2, ("TM_Info::get_sync_otag ENTRY with tag %d, size " PFLL ".\n",
pv_tag, iv_synctags.size()));
lp_data = (Tm_Sync_Type_Transid *) iv_synctags.get(pv_tag);
if (iv_trace_level >= 2)
{
if (lp_data != NULL)
trace_printf("TM_Info::get_sync_otag EXIT TxnId %d.\n",
lp_data->u.iv_seqnum);
else
trace_printf("TM_Info::get_sync_otag EXIT lp_data null.\n");
}
return lp_data;
}
// ------------------------------------------------------------------
// remove_sync_otag
// Purpose : Remove the tag from iv_synctags map.
// ------------------------------------------------------------------
void TM_Info::remove_sync_otag (int32 pv_tag)
{
TMTrace (2, ("TM_Info::remove_sync_otag with tag %d , ENTER.\n", pv_tag));
Tm_Sync_Type_Transid *lp_data = (Tm_Sync_Type_Transid *) iv_synctags.remove(pv_tag);
if (lp_data)
delete lp_data;
else
TMTrace (1, ("TM_Info::remove_sync_otag : WARNING tag not found in iv_synctags!\n"));
TMTrace (2, ("TM_Info::remove_sync_otag : EXIT.\n"));
}
// ------------------------------------------------------------------
// tm_test_verify
// for testing only. This returns true if the syncDataList was
// cleaned up, that is, it is empty.
// ------------------------------------------------------------------
bool TM_Info::tm_test_verify (int32 pv_nid)
{
if (iv_syncDataList[pv_nid].size() == 0)
return true;
else
return false;
}
// ------------------------------------------------------------------
// Transaction thread related helper functions:
// ------------------------------------------------------------------
// new_thread
// Purpose : Allocate a new thread object.
// ------------------------------------------------------------------
CTxThread * TM_Info::new_thread(CTmTxBase *pp_Txn)
{
CTxThread *lp_Thread = NULL;
bool lv_startThread = false;
lv_startThread = lv_startThread; // avoid unused-variable compiler warning
TMTrace (2, ("TM_Info::new_thread : ENTRY.\n"));
// Reject the call if no transaction object was supplied.
if (!pp_Txn)
{
TMTrace (1, ("TM_Info::new_thread : No transaction object specified on call.\n"));
return NULL;
}
TMTrace (2, ("TM_Info::new_thread : Calling CTmPool<CTxThread>::newElement "
"next index is " PFLL ".\n", iv_txThreadNum));
bool lv_reused = false;
lp_Thread = ip_threadPool->newElement(iv_txThreadNum, &lv_reused);
TMTrace (3, ("TM_Info::new_thread : CTmPool<CTxThread>::newElement returned "
"reused %d, thread %p.\n", lv_reused, (void *) lp_Thread));
if (lp_Thread == NULL)
{
TMTrace(1, ("TM_Info::new_thread : Maximum threads of %d in use. "
"Transaction suspended, waiting for thread to become available.\n",
ip_threadPool->get_maxPoolSize()));
/* Removing event as it's a pain
tm_log_event(DTM_MAX_THREADS, SQ_LOG_INFO, "DTM_MAX_THREADS",
-1, //error_code
-1, //rmid
nid(), //dtmid
pp_Txn->seqnum(), //seq_num
-1, //msgid
-1, //xa_error
ip_threadPool->get_maxPoolSize(), //pool_size
ip_threadPool->totalElements()); //pool_elems */
}
else
{
if (!lv_reused)
iv_txThreadNum++;
// Queue an Initialize event to the thread
// This is always placed at the top of the queue.
CTmEvent *lp_event = new CTmEvent(TM_MSG_TXTHREAD_INITIALIZE, lp_Thread);
lp_event->request()->u.iv_init_txthread.ip_txObject = pp_Txn;
lp_Thread->eventQ_push_top(lp_event);
}
TMTrace (2, ("TM_Info::new_thread : EXIT, thread object %p(%s).\n",
(void *) lp_Thread,
(lp_Thread?lp_Thread->get_name():"undefined")));
return lp_Thread;
} //new_thread
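// -----------------------------------------------------------------------
// Illustration only (not compiled): new_thread and release_thread are a
// pair. A transaction acquires a worker thread, which is primed with a
// TM_MSG_TXTHREAD_INITIALIZE event, and the worker hands itself back when
// it runs out of work:
//    CTxThread *lp_thread = gv_tm_info.new_thread(lp_tx);  // NULL if the pool
//                                                          // is exhausted
//    // ... the thread drains the transaction's event queue ...
//    bool lv_exit = gv_tm_info.release_thread(lp_thread);  // true => calling
//                                                          // thread should exit
// -----------------------------------------------------------------------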
// ------------------------------------------------------------------
// release_thread
// Purpose : Release a thread object back to the pool.
// The return value indicates whether the calling thread should exit.
// ------------------------------------------------------------------
bool TM_Info::release_thread(CTxThread * pp_thread)
{
CTmTxMessage *lp_msg;
bool lv_exit = false;
TMTrace (2, ("TM_Info::release_thread : ENTRY thread object %p(%s).\n",
(void *) pp_thread, pp_thread->get_name()));
// Signal thread to terminate
// This shouldn't happen as we shouldn't have an associated transaction
CTmTxBase * lp_tx = (CTmTxBase *) pp_thread->transaction();
if (lp_tx != NULL)
{
lp_msg = new CTmTxMessage(TM_MSG_TXTHREAD_RELEASE);
lp_tx->eventQ_push(lp_msg);
return false; // Drop out early to process the release.
}
if (!check_for_queued_requests(pp_thread))
{
TMTrace (2, ("TM_Info::release_thread : Calling CTmPool<CTxThread>::deleteElement "
"index " PFLL ".\n", pp_thread->threadNum()));
lv_exit = ip_threadPool->deleteElement(pp_thread->threadNum());
}
TMTrace (2, ("TM_Info::release_thread : EXIT, returning %d.\n", lv_exit));
return lv_exit;
} //TM_Info::release_thread
// ------------------------------------------------------------------
// get_thread
// Purpose : Get the thread with name pp_name.
// Walks through the ip_threadPool's inUseList looking for pp_name.
// ------------------------------------------------------------------
CTxThread * TM_Info::get_thread(char * pp_name)
{
TMTrace (2, ("TM_Info::get_thread : ENTRY, looking for thread %s.\n", pp_name));
CTxThread * lp_thread = ip_threadPool->getFirst_inUseList();
int lv_inx = 0;
while (lp_thread != NULL && strcmp(pp_name, lp_thread->get_name()) != 0)
{
TMTrace (4, ("TM_Info::get_thread : thread %d: name=%s.\n", lv_inx, lp_thread->get_name()));
lp_thread = ip_threadPool->getNext_inUseList();
lv_inx++;
}
// Must unlock the thread list once we've finished.
ip_threadPool->getEnd_inUseList();
TMTrace(2, ("TM_Info::get_thread : EXIT, thread %s found.\n",
((lp_thread == NULL)?"not":pp_name)));
return lp_thread;
} //TM_Info::get_thread
// -------------------------------------------------------------------
// TM_Info::open_other_tms
// Purpose : lead TM opens all other TMs
// -------------------------------------------------------------------
void TM_Info::open_other_tms()
{
int32 lv_count;
int32 lv_error;
int lv_oid;
MS_Mon_Process_Info_Type lv_info[MAX_NODES];
if (!iv_lead_tm)
return;
TMTrace (2, ("TM_Info::open_other_tms : ENTRY.\n"));
// get all TMs in the system and open them all
lv_error = msg_mon_get_process_info_type(MS_ProcessType_DTM,
&lv_count,
MAX_NODES,
lv_info);
if (lv_error != 0 && iv_trace_level)
trace_printf("TM_Info::open_other_tms : Error opening other TMs\n");
if (lv_error)
{
tm_log_event (DTM_CANNOT_OPEN_DTM, SQ_LOG_CRIT, "DTM_CANNOT_OPEN_DTM", lv_error);
abort();
}
lock();
for (int lv_idx = 0; lv_idx < lv_count; lv_idx++)
{
// we don't want to open ourselves
if ((lv_info[lv_idx].nid != iv_nid) &&
(lv_info[lv_idx].state != MS_Mon_State_Stopped))
{
TMTrace(3, ("TM_Info::open_other_tms : opening TM %s.\n",
lv_info[lv_idx].process_name));
iv_open_tms[lv_info[lv_idx].nid].iv_in_use = 1;
if (lv_info[lv_idx].nid > iv_tms_highest_index_used)
iv_tms_highest_index_used = lv_info[lv_idx].nid;
lv_error = msg_mon_open_process(lv_info[lv_idx].process_name,
&(iv_open_tms[lv_info[lv_idx].nid].iv_phandle),
&lv_oid);
if (lv_error)
{
TMTrace(1, ("TM_Info::open_other_tms : msg_mon_open_process error %d, trying to open %s.\n",
lv_error, lv_info[lv_idx].process_name));
tm_log_event(DTM_CANNOT_OPEN_DTM, SQ_LOG_WARNING, "DTM_CANNOT_OPEN_DTM", lv_error,
-1, lv_info[lv_idx].nid);
// This is generally caused by a window where the TM/node has failed since the
// msg_mon_get_process_info_type() call was made.
// Lead TM will retry open when a node_up notification arrives from the monitor.
iv_open_tms[lv_info[lv_idx].nid].iv_in_use = 0;
}
}
else
iv_open_tms[lv_info[lv_idx].nid].iv_in_use = 0;
}
iv_allTMsOpen = all_tms_recovered();
unlock();
TMTrace (2, ("TM_Info::open_other_tms : EXIT allTMsOpen=%d.\n", iv_allTMsOpen));
}
// -------------------------------------------------------------------
// open_restarted_tm
// Purpose : open the restarted TM by the lead TM
// This routine is ONLY called in response to a monitor TMRestarted message!
// -------------------------------------------------------------------
int32 TM_Info::open_restarted_tm(int32 pv_nid)
{
char la_buffer[20];
int lv_oid;
int lv_error;
if (!iv_lead_tm)
return (FEOK);
TMTrace (2, ("TM_Info::open_restarted_tm : ENTRY.\n"));
sprintf(la_buffer, "$tm%d", pv_nid);
lock();
lv_error = msg_mon_open_process(la_buffer,
&(iv_open_tms[pv_nid].iv_phandle),
&lv_oid);
if (lv_error)
{
TMTrace(1, ("TM_Info::open_restarted_tm : msg_mon_open_process error %d, trying to open %s.\n",
lv_error, la_buffer));
tm_log_event(DTM_CANNOT_OPEN_DTM, SQ_LOG_WARNING, "DTM_CANNOT_OPEN_DTM", lv_error,
-1, pv_nid);
}
if (lv_error == FEOK)
{
iv_open_tms[pv_nid].iv_in_use = 1;
if (pv_nid > iv_tms_highest_index_used)
iv_tms_highest_index_used = pv_nid;
restart_tm_process_helper(pv_nid);
}
if (lv_error != 0 && iv_trace_level)
trace_printf("TM_Info::open_restarted_tm : Error opening TM %d\n",pv_nid);
unlock();
dummy_link_to_refresh_phandle(pv_nid);
// SB_Thread::Sthr::sleep(100); // in msec
dummy_link_to_refresh_phandle(pv_nid); // The second one actually updates the phandle
TMTrace (2, ("TM_Info::open_restarted_tm : EXIT"));
return lv_error;
}
// -------------------------------------------------------------------
// get_opened_tm_phandle
// Purpose : return the phandle of another TM opened by the lead TM
// -------------------------------------------------------------------
SB_Phandle_Type *
TM_Info::get_opened_tm_phandle(int32 pv_index)
{
if (iv_open_tms[pv_index].iv_in_use)
return &(iv_open_tms[pv_index].iv_phandle);
else
return NULL;
}
//----------------------------------------------------------------------------
// TM_Info::tm_new_seqNum
// Purpose : This method allocates a new unique sequence number.
// If iv_globalUniqueSeqNum is true then a global unique sequence number is
// retrieved from the Monitor.
// If not, then TM local sequence numbering is used.
// Because sequence numbers are monotonically increasing, we can only get a
// collision when we wrap around.
// The caller must lock the TM_Info object prior to calling tm_new_seqNum.
//----------------------------------------------------------------------------
unsigned int TM_Info::tm_new_seqNum()
{
bool lv_noMoreSeqNums = false;
unsigned int lv_seqNum;
unsigned int lv_start = 0;
TMTrace (2, ("TM_Info::tm_new_seqNum: ENTRY\n"));
if (iv_nextSeqNum >= iv_nextSeqNumBlockStart)
lv_seqNum = setNextSeqNumBlock();
else
lv_seqNum = iv_nextSeqNum;
iv_nextSeqNum = lv_seqNum + 1;
// If we hit a sequence number that's in use, then we need to roll
// over it and look for the next unused one.
lv_start = lv_seqNum;
while (tm_active_seqNum(lv_seqNum))
{
// If we're here we got a collision!
// EMS DTM_SEQNUM_COLLISION
tm_log_event (DTM_SEQNUM_COLLISION, SQ_LOG_WARNING,"DTM_SEQNUM_COLLISION",
-1, /*error_code*/
-1, /*rmid*/
-1, /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
-1, /*tx_state*/
-1, /*data */
-1, /*data1*/
-1, /*data2*/
NULL, /*string1*/
-1, /*node*/
-1, /*msgid2*/
-1, /*offset*/
-1, /*tm_event_msg*/
lv_seqNum);
TMTrace (1, ("TM_Info::tm_new_seqNum: Sequence number %u already in use", lv_seqNum));
if (lv_seqNum >= iv_nextSeqNumBlockStart)
lv_seqNum = setNextSeqNumBlock();
else
lv_seqNum = iv_nextSeqNum;
iv_nextSeqNum++;
if (iv_nextSeqNum == lv_start)
{
lv_noMoreSeqNums = true;
break;
}
}
// If we wrap around and come back to the starting sequence number, there
// are no free slots - all sequence numbers are allocated!
if (lv_noMoreSeqNums)
{
// EMS DTM_NO_MORE_SEQNUMS
tm_log_event (DTM_NO_MORE_SEQNUMS, SQ_LOG_CRIT, "DTM_NO_MORE_SEQNUMS");
TMTrace (1, ("TM_Info::tm_new_seqNum: No more sequence numbers available!"));
abort ();
}
TMTrace (2, ("TM_Info::tm_new_seqNum EXIT: Allocating sequence number %u\n",
lv_seqNum));
return lv_seqNum;
} //tm_new_seqNum
//----------------------------------------------------------------------------
// TM_Info::tm_new_seqNumBlock
// Purpose: This method returns a block of sequence numbers.
// Because sequence numbers are monotonically increasing, we can only get a
// collision when we wrap around.
// The caller must lock the TM_Info object prior to calling tm_new_seqNumBlock.
//----------------------------------------------------------------------------
void TM_Info::tm_new_seqNumBlock(int pv_blockSize, unsigned int *pp_start, int *pp_count)
{
TMTrace (2, ("TM_Info::tm_new_seqNumBlock: ENTRY, block Size %d.\n", pv_blockSize));
*pp_start = getSeqNumBlock(pv_blockSize);
*pp_count = pv_blockSize;
TMTrace (2, ("TM_Info::tm_new_seqNumBlock: EXIT. Returned block starting at %d, for %d seqNums.\n",
*pp_start, *pp_count));
} //tm_new_seqNumBlock
//----------------------------------------------------------------------------
// TM_Info::tm_up
// Purpose : Set the tm state to TM_UP and inform any waiters
//----------------------------------------------------------------------------
void TM_Info::tm_up()
{
TMTrace (2, ("TM_Info::tm_up : ENTRY TM state %d, sys_recov_state %d, all_rms_closed %d.\n",
iv_state, iv_sys_recov_state, all_rms_closed()));
// We need to block new transactions while the RMs are still being opened.
if (all_rms_closed())
state(TM_STATE_WAITING_RM_OPEN);
else
state(TM_STATE_UP);
if (iv_sys_recov_state == TM_SYS_RECOV_STATE_END)
wake_TMUP_waiters(FEOK);
TMTrace (2, ("TM_Info::tm_up : EXIT\n"));
}
//----------------------------------------------------------------------------
// TM_Info::wake_TMUP_waiters
// Purpose : Wake up any applications which called TMWAIT.
//----------------------------------------------------------------------------
void TM_Info::wake_TMUP_waiters(short pv_error)
{
CTmTxMessage * lp_msg = (CTmTxMessage *) iv_TMUP_wait_list.pop();
while (lp_msg)
{
TMTrace (3, ("TM_Info::wake_TMUP_waiters : replying to wait_TMUP request msgid(%d), error(%d).\n",
lp_msg->msgid(), pv_error));
lp_msg->reply(pv_error);
delete lp_msg;
lp_msg = (CTmTxMessage *) iv_TMUP_wait_list.pop();
}
}
//----------------------------------------------------------------------------
// TM_Info::terminate_all_threads
// Purpose : queue a terminate request to all transaction threads.
// This should only be done when the TM is shutting down!
//----------------------------------------------------------------------------
void TM_Info::terminate_all_threads()
{
CTmEvent *lp_event;
CTxThread *lp_thread;
int32 lv_signalledThreads = 0;
TMTrace (2, ("TM_Info::terminate_all_threads : ENTRY.\n"));
// Terminate any threads still in the in-use list
lp_thread = ip_threadPool->getFirst_inUseList();
while (lp_thread != NULL)
{
lp_event = new CTmEvent(TM_MSG_TXTHREAD_TERMINATE, lp_thread);
lp_thread->eventQ_push(lp_event);
lv_signalledThreads++;
TMTrace (3, ("TM_Info::terminate_all_threads : Warning: signalling in-use "
"thread %s to terminate.\n",
lp_thread->get_name()));
lp_thread = ip_threadPool->getNext_inUseList();
}
ip_threadPool->getEnd_inUseList();
// Terminate all threads in the free list
lp_thread = ip_threadPool->getFirst_freeList();
while (lp_thread != NULL)
{
lp_event = new CTmEvent(TM_MSG_TXTHREAD_TERMINATE, lp_thread);
lp_thread->eventQ_push(lp_event);
lv_signalledThreads++;
TMTrace (3, ("TM_Info::terminate_all_threads : Signalling free thread %s "
"to terminate.\n", lp_thread->get_name()));
lp_thread = ip_threadPool->getNext_freeList();
}
ip_threadPool->getEnd_freeList();
// Now wait for all transaction threads to terminate
// The maximum time we wait for is 1 second, then just give up and exit.
int lv_waitCount = 0;
while (ip_threadPool->totalElements() > 0 && (lv_waitCount/100) < 1)
{
SB_Thread::Sthr::sleep(10); //1/100th of a second
lv_waitCount++;
}
stopTimerEvent();
stopAuditThread();
if (iv_trace_level >= 2)
{
if (ip_threadPool->totalElements() > 0)
trace_printf("TM_Info::terminate_all_threads : WARNING %d threads did "
"not terminate within 1 second, exiting anyway.\n",
ip_threadPool->totalElements());
else
trace_printf("TM_Info::terminate_all_threads : EXIT, all transaction "
"threads terminated.\n");
}
} //terminate_all_threads
//----------------------------------------------------------------------------
// TM_Info::setNextSeqNumBlock
// Purpose : Gets the iv_nextSeqNum based on the DTM_NEXT_SEQNUM_BLOCK registry
// value the first time it is called by the TM.
// On subsequent calls it increments the sequence number block.
// Finally it sets the next sequence number block in the registry by adding
// iv_SeqNumInterval to the next sequence number.
// This function handles sequence number and block wraparound.
// Returns the sequence number which begins the next block.
// Trafodion: Changed to use the next sequence number
// as base for next block rather than Registry value.
// The caller is expected to lock the TM_Info object.
//----------------------------------------------------------------------------
unsigned int TM_Info::setNextSeqNumBlock()
{
char la_seq_num[20];
char la_tm_name[8];
char *lp_stop;
int32 lv_error = 0;
static bool lv_firstTime = true;
unsigned int lv_startSeqNum = 1;
sprintf(la_tm_name, "$tm%d", iv_nid);
TMTrace (2, ("TM_Info::setNextSeqNumBlock : ENTRY.\n"));
if (lv_firstTime)
{
lv_firstTime = false;
iv_SeqNumInterval = TM_DEFAULT_SEQ_NUM_INTERVAL;
ms_getenv_int("DTM_SEQ_NUM_INTERVAL", &iv_SeqNumInterval);
lv_error = tm_reg_get(MS_Mon_ConfigType_Process,
(char *) la_tm_name, (char *) DTM_NEXT_SEQNUM_BLOCK,
la_seq_num);
if (lv_error == 0)
lv_startSeqNum = (unsigned int) strtoul((char *) &la_seq_num, &lp_stop, 10);
}
else
lv_startSeqNum = iv_nextSeqNum;
// Check for sequence number wraparound
if (lv_startSeqNum >= MAX_SEQNUM)
{
// EMS DTM_SEQNUM_WRAPAROUND
tm_log_event (DTM_SEQNUM_WRAPAROUND, SQ_LOG_WARNING, "DTM_SEQNUM_WRAPAROUND");
TMTrace (1, ("TM_Info::setNextSeqNumBlock: Sequence number wraparound\n"));
lv_startSeqNum = iv_SeqNumBlockStart = 1;
}
else
// set the sequence number block
iv_SeqNumBlockStart = lv_startSeqNum;
iv_nextSeqNumBlockStart = lv_startSeqNum + iv_SeqNumInterval;
// Check for overflow when computing the end of the next block.
// In this case we create a short final block capped at MAX_SEQNUM.
if (iv_nextSeqNumBlockStart < lv_startSeqNum)
{
TMTrace (1, ("TM_Info::setNextSeqNumBlock : Sequence number block "
"reached maximum sequence number and will wrap on next allocation!\n"));
iv_nextSeqNumBlockStart = MAX_SEQNUM;
}
sprintf(la_seq_num, "%u", iv_nextSeqNumBlockStart);
lv_error = tm_reg_set(MS_Mon_ConfigType_Process,
la_tm_name, (char *) DTM_NEXT_SEQNUM_BLOCK, la_seq_num);
if (lv_error)
{
tm_log_event(DTM_TM_REGISTRY_SET_ERROR, SQ_LOG_CRIT, "DTM_TM_REGISTRY_SET_ERROR", lv_error);
TMTrace (1, ("Failed to write the DTM next seqnum value into the registry. Error %d\n", lv_error));
abort ();
}
TMTrace (2, ("TM_Info::setNextSeqNumBlock : EXIT, current seqNum block: "
"%u - %u, returning seqNum %u.\n",
iv_SeqNumBlockStart, (iv_nextSeqNumBlockStart-1), lv_startSeqNum));
return lv_startSeqNum;
} //setNextSeqNumBlock
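// Illustrative sketch only (kept in comments; not compiled into the TM).
// Assuming a block interval of 1000 and a registry DTM_NEXT_SEQNUM_BLOCK value
// of 5001 (both numbers are examples, not configured defaults), the first call
// behaves roughly as follows:
//
//    gv_tm_info.lock();                      // caller must hold the TM_Info lock
//    unsigned int lv_seq = gv_tm_info.setNextSeqNumBlock();
//    // lv_seq == 5001; this TM's block is now 5001 - 6000 and the registry
//    // has been rewritten with 6001 for the next allocation.
//    gv_tm_info.unlock();
//
// Subsequent calls ignore the registry and use iv_nextSeqNum as the base for
// the next block, as the Trafodion note above describes.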
//----------------------------------------------------------------------------
// TM_Info::getSeqNumBlock
// Purpose : (Trafodion only)
// Returns the next sequence number.
// Allocates the next sequence number block to the caller.
// This is used by local transactions to allocate a block of
// sequence numbers to a client process.
// Added a TM Library specified block size which must be less
// than the iv_SeqNumInterval. This allows the Library to
// grab a block without always updating the registry value.
// The caller is expected to lock the TM_Info object.
//----------------------------------------------------------------------------
unsigned int TM_Info::getSeqNumBlock(int32 pv_blockSize)
{
char la_seq_num[20];
char la_tm_name[8];
int32 lv_error = 0;
unsigned int lv_startSeqNum = iv_nextSeqNum;
bool lv_updateRegistry = false;
sprintf(la_tm_name, "$tm%d", iv_nid);
TMTrace (2, ("TM_Info::getSeqNumBlock : ENTRY blockSize %d, nextSeqNum %u, startNextSeqNumBlock %u "
"seqNumBlockSize %u.\n", pv_blockSize, iv_nextSeqNum, iv_nextSeqNumBlockStart, iv_SeqNumInterval));
// Check for sequence number wraparound
// We need to check that there is more than a full block left as we're allocating the block to the client.
if (lv_startSeqNum >= MAX_SEQNUM - pv_blockSize)
{
// EMS DTM_SEQNUM_WRAPAROUND
tm_log_event (DTM_SEQNUM_WRAPAROUND, SQ_LOG_WARNING, "DTM_SEQNUM_WRAPAROUND");
TMTrace (1, ("TM_Info::getSeqNumBlock: Sequence number wraparound\n"));
lv_startSeqNum = iv_SeqNumBlockStart = 1;
lv_updateRegistry = true;
}
else
// set the sequence number block for the TM
lv_startSeqNum = iv_nextSeqNum;
iv_nextSeqNum = lv_startSeqNum + pv_blockSize;
// if we exceeded the TM's allocation, get the next block
if (iv_nextSeqNum >= iv_nextSeqNumBlockStart) {
iv_nextSeqNumBlockStart = iv_nextSeqNum + iv_SeqNumInterval;
// Check for sequence number wraparound
// We need to check that there is more than a full block left as we're allocating the block to the client.
if (iv_nextSeqNum >= MAX_SEQNUM - iv_SeqNumInterval)
{
// EMS DTM_SEQNUM_WRAPAROUND
tm_log_event (DTM_SEQNUM_WRAPAROUND, SQ_LOG_WARNING, "DTM_SEQNUM_WRAPAROUND");
TMTrace (1, ("TM_Info::getSeqNumBlock: Sequence number wraparound\n"));
iv_nextSeqNum = iv_SeqNumBlockStart = 1;
iv_nextSeqNumBlockStart = 1 + iv_SeqNumInterval;
lv_updateRegistry = true;
}
}
if (lv_updateRegistry) {
// Copy new sequence number block back to the registry
sprintf(la_seq_num, "%u", iv_nextSeqNumBlockStart);
lv_error = tm_reg_set(MS_Mon_ConfigType_Process,
la_tm_name, (char *) DTM_NEXT_SEQNUM_BLOCK, la_seq_num);
if (lv_error)
{
tm_log_event(DTM_TM_REGISTRY_SET_ERROR, SQ_LOG_CRIT, "DTM_TM_REGISTRY_SET_ERROR", lv_error);
TMTrace (1, ("Failed to write the DTM next seqnum value %u into the registry. Error %d\n",
iv_nextSeqNumBlockStart, lv_error));
abort ();
}
}
TMTrace (1, ("TM_Info::getSeqNumBlock : EXIT returning seqNum block %u - %u, "
"current TM seqNum block: %u - %u.\n",
lv_startSeqNum, (lv_startSeqNum + pv_blockSize-1),
iv_SeqNumBlockStart, (iv_nextSeqNumBlockStart-1)));
return lv_startSeqNum;
} //getSeqNumBlock
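// Illustrative sketch only (kept in comments; not compiled into the TM).
// Assuming iv_nextSeqNum is currently 5001, the TM's block runs to 6000 and
// iv_SeqNumInterval is 1000 (example values), two library-sized requests are
// served without touching the registry:
//
//    unsigned int lv_blk1 = gv_tm_info.getSeqNumBlock(100);
//    // lv_blk1 == 5001, iv_nextSeqNum advances to 5101
//    unsigned int lv_blk2 = gv_tm_info.getSeqNumBlock(100);
//    // lv_blk2 == 5101, iv_nextSeqNum advances to 5201
//
// Per the code above, the registry is only rewritten on the wraparound paths
// where lv_updateRegistry is set; spilling into a new TM block alone does not
// trigger a registry write.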
//----------------------------------------------------------------------------
// TM_Info::check_for_queued_requests
// Purpose : Check the iv_txdisassociatedQ for disassociated transactions and
// reuse the thread if one is found with a request queued.
// Returns true if one was found and the thread was reused.
// false if no disassociated txns with events queued were found.
// This method is called when releasing a thread back to the free list.
// There will no longer be an associated transaction, so we don't need to
// check it here!
//----------------------------------------------------------------------------
bool TM_Info::check_for_queued_requests(CTxThread *pp_thread)
{
CTmTxBase * lp_txn;
TM_DEQUE * lp_txn_eventQ;
TM_DEQUE * lp_txn_PendingRequestQ;
TMTrace (2, ("TM_Info::check_for_queued_requests : ENTRY.\n"));
iv_txdisassociatedQ.lock();
lp_txn = (CTmTxBase *) iv_txdisassociatedQ.get_firstFIFO();
if (lp_txn)
{
lp_txn_eventQ = lp_txn->eventQ();
lp_txn_PendingRequestQ = lp_txn->PendingRequestQ();
}
while (lp_txn && lp_txn_eventQ->empty() && lp_txn_PendingRequestQ->empty())
{
TMTrace (3, ("TM_Info::check_for_queued_requests : WARNING thread %s(%ld) found "
"transaction ID %d on disassociatedTxn "
"queue with no work to do!\n",
pp_thread->get_name(), pp_thread->get_id(), lp_txn->seqnum()));
// Get next
lp_txn = (CTmTxBase *) iv_txdisassociatedQ.get_nextFIFO();
if (lp_txn != NULL)
{
lp_txn_eventQ = lp_txn->eventQ();
lp_txn_PendingRequestQ = lp_txn->PendingRequestQ();
}
}
// If we found a disassociated transaction with outstanding events
// remove the element from the iv_txdisassociatedQ and associate
// with the thread we have just released.
// otherwise add the thread to the free list.
if (lp_txn)
{
TMTrace (3, ("CTmTxBase::check_for_queued_requests : Reusing the thread %s(%ld) for "
"disassociated transaction, ID %d.\n",
pp_thread->get_name(), pp_thread->get_id(), lp_txn->seqnum()));
iv_txdisassociatedQ.erase(); //delete the current entry
}
iv_txdisassociatedQ.unlock();
// If we have a transaction object pointer then there are outstanding events
// for this transaction, reuse this thread to process them.
if (lp_txn)
{
// For worker threads we queue an Initialize event against the thread so that it
// will process the outstanding events.
//if (iv_threadModel == worker)
//{
TMTrace (3, ("TM_Info::check_for_queued_requests : ID %d:"
" Thread %s(%ld), TxnObjThread %s(%ld) Worker thread queuing initialize "
" request to thread to process outstanding events.\n",
lp_txn->seqnum(),
pp_thread->get_name(), pp_thread->get_id(),
((lp_txn->thread()==NULL)?"none":lp_txn->thread()->get_name()),
((lp_txn->thread()==NULL)?-1:lp_txn->thread()->get_id())));
CTmEvent *lp_event = new CTmEvent(TM_MSG_TXTHREAD_INITIALIZE, pp_thread);
lp_event->request()->u.iv_init_txthread.ip_txObject = lp_txn;
TMTrace (3, ("TM_Info::check_for_queued_requests : ID %d:"
" Thread Initialize event queued to thread %s (%ld).\n",
lp_txn->seqnum(), pp_thread->get_name(),
pp_thread->get_id()));
pp_thread->eventQ_push_top(lp_event);
//}
// If we have a pending request but nothing in the eventQ and the transaction
// isn't busy then re-queue it to the transaction.
if (lp_txn_eventQ->empty() && !lp_txn->transactionBusy())
{
// Check the PendingRequest queue
CTmTxMessage *lp_msg = (CTmTxMessage *) lp_txn_PendingRequestQ->pop_end();
if (lp_msg)
{
TMTrace (3, ("TM_Info::check_for_queued_requests : ID %d"
" request popped off PendingRequestQ for msgid(%d).\n",
lp_txn->seqnum(), lp_msg->msgid()));
lp_txn->queueToTransaction(lp_txn->transid(), lp_msg);
}
else
{
TMTrace (3, ("TM_Info::check_for_queued_requests : LOGIC ERROR! ID %d "
"Didn't find anything in eventQ or PendingRequestQ "
"for txn.\n", lp_txn->seqnum()));
}
}
TMTrace(2, ("TM_Info::check_for_queued_requests : EXIT returning true - thread reused.\n"));
return true;
}
else
{
TMTrace (2, ("TM_Info::check_for_queued_requests : EXIT returning false - nothing outstanding to process, thread discarded.\n"));
return false;
}
} // TM_Info::check_for_queued_requests
// ----------------------------------------------------------------------------
// TM_Info::stopTimerEvent
// Purpose : Sends a stop event to the timer thread. This should be called
// only when the TM is exiting.
// ----------------------------------------------------------------------------
void TM_Info::stopTimerEvent()
{
TMTrace (2, ("TM_Info::stopTimerEvent : ENTRY.\n"));
CTmTimerEvent *lp_timerEvent = new CTmTimerEvent(TmTimerCmd_Stop, tmTimer());
if (tmTimer())
tmTimer()->eventQ_push((CTmEvent *) lp_timerEvent);
// Wait for Timer thread to exit.
int lv_waitCount = 0;
while (tmTimer() && tmTimer()->state() != TmTimer_Down)
{
SB_Thread::Sthr::sleep(10); //1/100th of a second
lv_waitCount++;
// Write an event every second we have to wait for the thread to exit
if (lv_waitCount % 100 == 0)
{
TMTrace(1, ("TM_Info::stopTimerEvent : Have waited %d sec for Timer "
"thread to exit.\n", (lv_waitCount/100)));
//tm_log_event(DTM_TIMER_TH_WAITING_FOR_EXIT, SQ_LOG_WARNING, "DTM_TIMER_TH_WAITING_FOR_EXIT");
}
}
//tmTimer()->stop(); //Stop the thread now
//SB_Thread::Sthr::sleep(10); //1/100th of a second to allow thread to stop.
delete tmTimer();
TMTrace (2, ("TM_Info::stopTimerEvent : EXIT.\n"));
} // TM_Info::stopTimerEvent
// ----------------------------------------------------------------------------
// TM_Info::stopAuditThread
// Purpose : Stops the audit thread. This should be called
// only when the TM is exiting. It must be called AFTER stopping the timer
// thread to ensure that we can still process any audit writes which were
// queued by the timer thread before it stopped.
//12/8/2010 Added this to clean up the audit thread during TM shutdown.
// ----------------------------------------------------------------------------
void TM_Info::stopAuditThread()
{
TMTrace (2, ("TM_Info::stopAuditThread : ENTRY.\n"));
tmAuditObj()->exitNow();
SB_Thread::Sthr::sleep(10); //1/100th of a second to allow thread to stop.
delete ip_tmAuditObj;
} // TM_Info::stopAuditThread
// ----------------------------------------------------------------------------
// TM_Info::CheckFailed_RMs
// Purpose : Tries to reopen any RMs in failed state. This is performed
// periodically from a RMRetry timer event, and when a node up notification is
// received.
// It is also invoked to reintegrate a TSE back into the system
// Note that TSEs currently don't support lock reinstatement, so we must mark
// them as TSEBranch_RECOVERING so that they can't process new transactions while we
// have indoubt transactions!
// This routine sets the RM state to
// TSEBranch_RECOVERING if the TSE crashed or
// TSEBranch_FAILOVER if a failover was detected.
// TSEBranch_RECOVERING will drive recovery of any indoubt
// (hung) transactions. For failover we don't want to send the
// TSE an xa_recover as it already has all the details. We do,
// however, want to redrive all hung transactions.
//
// Parameters:
// pp_rmname - name of the TSE to integrate, or NULL for the general case
// ----------------------------------------------------------------------------
void TM_Info::CheckFailed_RMs(char *pp_rmname)
{
#define MAX_RM_PROCS MAX_OPEN_RMS*2 //Allow for backups
int32 lv_count;
MS_Mon_Process_Info_Type lv_info[MAX_RM_PROCS];
int32 lv_index = 0;
char la_value[9];
TM_RM_Responses la_resp[MAX_RM_PROCS];
int32 lv_error = FEOK;
int32 lv_msg_count = 0;
RM_Open_struct lv_open;
int32 lv_rmid = 0;
int32 la_rmid[MAX_RM_PROCS];
int32 lv_rmindex = 0;
RM_Info_TSEBranch *lp_TSEBranchInfo;
TM_Recov *lp_recov;
TSEBranch_state lv_RM_state = TSEBranch_RECOVERING; // Assume recovering.
TSEBranch_state lv_RM_failover_or_recovery = TSEBranch_RECOVERING;
// Set to TSEBranch_FAILOVER if at least one TSE has failed over.
TMTrace (2, ("TM_Info::CheckFailed_RMs : ENTRY.\n"));
// get all DP2s in the system and open them all
msg_mon_get_process_info_type(MS_ProcessType_TSE,
&lv_count,
MAX_RM_PROCS,
lv_info);
TMTrace(3, ("TM_Info::CheckFailed_RMs : received type for %d rms.\n", lv_count));
// Walk through the list of current TSEs and work out which ones are new or
// marked as TSEBranch_FAILED||TSEBranch_DOWN. New and failed/down RMs are re-opened.
for (lv_index = 0; ((lv_index < MAX_RM_PROCS) && (lv_index < lv_count)); lv_index++)
{
// we don't care about it if it's a backup
if (lv_info[lv_index].backup == 1)
{
la_rmid[lv_index] = -1;
continue;
}
// if a particular TSE name was passed in,
// then only worry about that particular TSE and no others
if ((pp_rmname != NULL) && (strcasecmp(pp_rmname, lv_info[lv_index].process_name) != 0))
{
TMTrace(4, ("TM_Info::CheckFailed_RMs : skipping %s, looking for %s.\n",
lv_info[lv_index].process_name, pp_rmname));
continue;
}
else
TMTrace(4, ("TM_Info::CheckFailed_RMs : found %s.\n", pp_rmname));
lv_error = tm_reg_get(MS_Mon_ConfigType_Process,
lv_info[lv_index].process_name, (char *) "RMID", la_value);
lv_rmid = (lv_error) ? 0 : (atoi(la_value));
// Lookup is more efficient by rmid, so use this if possible
if (lv_rmid == 0)
{
lp_TSEBranchInfo = gv_RMs.TSE()->return_slot((char *) lv_info[lv_index].process_name);
if (lp_TSEBranchInfo != NULL)
lv_rmid = lp_TSEBranchInfo->rmid();
}
else
lp_TSEBranchInfo = gv_RMs.TSE()->return_slot(lv_rmid);
// If return_slot returned 0 then this is a new TSE
if (lp_TSEBranchInfo == NULL)
{
// If no RMID was configured, allocate one now
if (lv_rmid == 0)
//M8_TODO encapsulate and clean up rmid allocation
//msg_mon_get_tm_seq(&lv_rmid);
{
RMID llv_rmid;
llv_rmid.s.iv_nid = nid(); //TMs nid
llv_rmid.s.iv_num = lv_index;
lv_rmid = llv_rmid.iv_rmid;
}
tm_log_event(DTM_TM_INTEGRATING_TSE, SQ_LOG_WARNING, "DTM_TM_INTEGRATING_TSE", -1,
lv_rmid, iv_nid, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
lv_info[lv_index].pid, -1, -1, (char *) &lv_info[lv_index].process_name,
lv_info[lv_index].nid);
TMTrace(1, ("TM_Info::CheckFailed_RMs : Integrating new TSE %s (%d,%d), rmid %d into TM.\n",
lv_info[lv_index].process_name, lv_info[lv_index].nid, lv_info[lv_index].pid, lv_rmid));
// is_ax_reg is set to false here. We need the TSE response to set
// it correctly, so we do this later.
// The RM is left in TSEBranch_DOWN state until the open completes.
lv_error = gv_RMs.TSE()->init(lv_info[lv_index].pid, lv_info[lv_index].nid,
lv_info[lv_index].process_name,
lv_rmid, false);
}
else
// Ignore any RM which is already up.
if (lp_TSEBranchInfo->state() == TSEBranch_UP)
{
la_rmid[lv_index] = -1;
// Update nid & pid every time, it could have changed (failover).
lp_TSEBranchInfo->nid(lv_info[lv_index].nid);
lp_TSEBranchInfo->pid(lv_info[lv_index].pid);
continue;
}
else
{
tm_log_event(DTM_TM_INTEGRATING_TSE2, SQ_LOG_INFO, "DTM_TM_INTEGRATING_TSE2", -1,
lv_rmid, iv_nid, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
lv_info[lv_index].pid, -1, -1, (char *) &lv_info[lv_index].process_name,
lv_info[lv_index].nid);
TMTrace(1, ("TM_Info::CheckFailed_RMs : Reintegrating TSE %s (%d,%d), rmid %d into TM.\n",
lv_info[lv_index].process_name, lv_info[lv_index].nid, lv_info[lv_index].pid, lv_rmid));
lp_TSEBranchInfo->state(TSEBranch_RECOVERING);
lp_TSEBranchInfo->nid(lv_info[lv_index].nid);
lp_TSEBranchInfo->pid(lv_info[lv_index].pid);
}
// Now send open to the RM. Note the wait for completion is later.
la_rmid[lv_index] = lv_rmid; // coordinate lv_info index with rmid
strcpy(lv_open.process_name, lv_info[lv_index].process_name);
lv_open.incarnation_num = gv_tm_info.incarnation_num();
lv_open.seq_num_block_start = gv_tm_info.SeqNumBlockStart();
lv_error = (*tm_switch).xa_open_entry ((char *)&lv_open, lv_rmid, TMNOFLAGS);
switch (lv_error)
{
case XA_OK:
{
lv_msg_count++;
break;
}
default:
{
// Handle RM error.
tm_log_event(DTM_RM_OPEN_FAILED, SQ_LOG_CRIT, "DTM_RM_OPEN_FAILED", -1,
lv_rmid, iv_nid, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
lv_info[lv_index].pid, -1, -1, (char *) &lv_info[lv_index].process_name,
lv_info[lv_index].nid);
TMTrace(1, ("TM_Info::CheckFailed_RMs - Failed to open RM %s, rmid %d\n",
lv_info[lv_index].process_name, lv_rmid));
// Re-mark the RM as down since the open failed and we don't want to try to send a recovery request to it
if (lp_TSEBranchInfo != NULL)
lp_TSEBranchInfo->state(TSEBranch_DOWN);
}
} // switch xa_open error
} // for each new, down or failed RM
//Since we ignore backup TSEs, there could be gaps in the la_rmid array.
//So, we cannot just check lv_msg_count entries in the la_rmid array.
//Instead, we should go through 'lv_index-1' entries.
lv_rmindex = lv_index-1;
int32 lv_repliesOutstanding = complete_all(lv_msg_count, la_resp, MAX_TMTIMER_WAIT_TIME);
if (lv_repliesOutstanding > 0)
{
tm_log_event(DTM_RM_REPLY_FAILED, SQ_LOG_WARNING , "DTM_RM_REPLY_FAILED",
-1, /*error_code*/
-1, /*rmid*/
-1, /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
-1, /*tx_state*/
lv_repliesOutstanding); /*data */
TMTrace(1, ("TM_Info::CheckFailed_RMs - %d RMs failed to reply to xa_open request.\n",
lv_repliesOutstanding));
}
// if there are no errors, then initialize the RM slot
// Note that we process any responses even though we may have timed out on the XWAIT.
for (int32 lv_idx = 0; lv_idx < (lv_msg_count-lv_repliesOutstanding); lv_idx++)
{
switch (la_resp[lv_idx].iv_error)
{
case XA_RETRY:
{ // This indicates a failover! The new primary TSE replies XA_RETRY
// when it receives an xa_open request.
if(lv_index < MAX_RM_PROCS) {
tm_log_event(DTM_TM_TSE_FAILOVER, SQ_LOG_WARNING, "DTM_TM_TSE_FAILOVER", -1,
lv_rmid, iv_nid, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
lv_info[lv_index].pid, -1, -1, (char *) &lv_info[lv_index].process_name,
lv_info[lv_index].nid);
TMTrace(1, ("TM_Info::CheckFailed_RMs : TSE %s (%d,%d) failover, rmid %d.\n",
lv_info[lv_index].process_name, lv_info[lv_index].nid, lv_info[lv_index].pid, lv_rmid));
}
lv_RM_failover_or_recovery = lv_RM_state = TSEBranch_FAILOVER;
}
// Intentional drop through
case XA_OK:
{
// find right slot
int lv_i = 0;
for (; lv_i <= lv_rmindex; lv_i++)
if ((la_rmid[lv_i] != -1) &&
(la_rmid[lv_i] == la_resp[lv_idx].iv_rmid))
break;
// if we didn't find the rmid in la_rmid then something is very wrong - give up!
if (lv_i > lv_rmindex)
{
tm_log_event(DTM_RM_NO_MATCH, SQ_LOG_CRIT, "DTM_RM_NO_MATCH",-1,la_resp[lv_idx].iv_rmid);
TMTrace(1, ("TM_Info::CheckFailed_RMs - RM id %d did not match any of the RM slots.\n",
la_resp[lv_idx].iv_rmid));
abort ();
}
lv_error = gv_RMs.TSE()->reinit(lv_info[lv_i].pid, lv_info[lv_i].nid,
lv_info[lv_i].process_name,
la_resp[lv_idx].iv_rmid,
la_resp[lv_idx].iv_ax_reg,
false,
lv_RM_state);
break;
}
default:
{
tm_log_event(DTM_RM_OPEN_FAILED2, SQ_LOG_CRIT,"DTM_RM_OPEN_FAILED2",
la_resp[lv_idx].iv_error, la_resp[lv_idx].iv_rmid);
TMTrace(1, ("TM_Info::CheckFailed_RMs - Failed to open RM rmid %d, error %d\n",
la_resp[lv_idx].iv_rmid, la_resp[lv_idx].iv_error));
}
} // switch response error
} //for each xa_open response processed
// Oliver - CR 5283
// create the NodeRecov object if not yet created
if (!gv_tm_info.NodeRecov(iv_nid))
gv_tm_info.NodeRecov(new TM_Recov(MAX_TMTIMER_WAIT_TIME), iv_nid);
lp_recov = gv_tm_info.NodeRecov(iv_nid);
// We only want to recover those TSEs which we marked for recovery.
for (lv_index = 0;
((lv_index <= gv_RMs.TSE()->return_highest_index_used()) &&
(lv_index < lv_count));
lv_index++)
{
lp_TSEBranchInfo = gv_RMs.TSE()->get_slot(lv_index);
if (lp_TSEBranchInfo->state() == TSEBranch_RECOVERING)
{
lp_recov->recover_tse_restart(lp_TSEBranchInfo->rmid());
// Only allow the RM to come up if all txns were recovered.
if (lp_recov->total_txs_to_recover() == 0)
{
lp_TSEBranchInfo->up();
TMTrace(1, ("TM_Info::CheckFailed_RMs : All txns recovered and RM %d is now up.\n",
lp_TSEBranchInfo->rmid()));
}
else
{
lp_TSEBranchInfo->totalBranchesLeftToRecover(lp_recov->total_txs_to_recover());
TMTrace(1, ("TM_Info::CheckFailed_RMs : Not all txns recovered. RM %d "
"remains in recovering state.\n", lp_TSEBranchInfo->rmid()));
}
}
else
// Reset failover flag on RM
if (lp_TSEBranchInfo->state() == TSEBranch_FAILOVER)
lp_TSEBranchInfo->up();
if (lp_TSEBranchInfo->state() != TSEBranch_UP)
{
// Abort any active transactions which have this RM enlisted.
abort_active_txns(lp_TSEBranchInfo->rmid());
}
} //for each rm to be recovered
// Oliver - 5283 delete lp_recov;
/*M8 Commented out. We shouldn't be disabling transactions for the node when an RM fails!
// Set the TM state now that we've been through all the RMs. This toggles
// between UP and TX_DISABLED states.
if ((state() == TM_STATE_UP && lv_TMState == TM_STATE_TX_DISABLED) ||
(state() == TM_STATE_TX_DISABLED && lv_TMState == TM_STATE_UP))
state(lv_TMState);M8*/
// If we had a failover, force all hung transactions to retry commit/rollback now
if (lv_RM_failover_or_recovery == TSEBranch_FAILOVER)
{
CTmTxBase * lp_txn = (CTmTxBase *) getFirst_tx();
while (lp_txn)
{
lp_txn->hung_redrive();
lp_txn = (CTmTxBase *) getNext_tx();
}
getEnd_tx();
}
TMTrace(2, ("TM_Info::CheckFailed_RMs : EXIT.\n"));
} // TM_Info::CheckFailed_RMs
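// Illustrative sketch only (kept in comments; not compiled). When
// CheckFailed_RMs above integrates a new TSE that has no configured RMID, it
// packs the TM's node id and the TSE's index into a single rmid through the
// RMID union. The union is defined elsewhere; the layout below is an assumed
// shape for illustration only:
//
//    union RMID {
//       struct { int16 iv_nid; int16 iv_num; } s;   // assumed field widths
//       int32 iv_rmid;
//    };
//    RMID lv_id;
//    lv_id.s.iv_nid = 2;     // e.g. this TM runs on node 2
//    lv_id.s.iv_num = 5;     // e.g. the TSE sits at index 5 of the process list
//    int32 lv_rmid = lv_id.iv_rmid;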
void TM_Info::convert_tx_to_str(std::string &pp_str, TM_Txid_Internal &pp_tx, bool pv_empty)
{
char la_buf[1024];
TM_Transid_Type *pp_tx_ex = (TM_Transid_Type *)&pp_tx;
if (!pv_empty)
sprintf(la_buf, "%d.%d.%d." PFLL "." PFLL "." PFLL,
pp_tx.iv_seq_num, pp_tx.iv_node, pp_tx.iv_incarnation_num,
pp_tx_ex->id[1], pp_tx_ex->id[2], pp_tx_ex->id[3]);
/* sprintf(la_buf, "%d.%d.%d.%d.%d.%d.%d.%d.%d.%d." PFLL,
pp_tx.iv_seqnum, pp_tx.iv_node, pp_tx.iv_incarnation_num,
pp_tx.iv_tx_flags, pp_tx.iv_tt_flags.Application, pp_tx.iv_tt_flags.Reserved[0],
pp_tx.iv_tt_flags.Reserved[1], pp_tx.iv_tt_flags.Predefined,
pp_tx.iv_version, pp_tx.iv_check_sum, pp_tx.iv_timestamp);
*/
else
sprintf (la_buf, "0.0.0.0.0.0");
pp_str = la_buf;
}
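// Illustrative sketch only (kept in comments; not compiled).
// convert_tx_to_str above renders a transid as dot-separated fields:
//
//    std::string lv_str;
//    TM_Txid_Internal lv_tx;                        // assume populated elsewhere
//    gv_tm_info.convert_tx_to_str(lv_str, lv_tx, false);
//    // lv_str has the form "<seq_num>.<node>.<incarnation>.<id1>.<id2>.<id3>"
//    gv_tm_info.convert_tx_to_str(lv_str, lv_tx, true);
//    // lv_str == "0.0.0.0.0.0" when pv_empty is true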
// ----------------------------------------------------------------------------
// TM_Info::abort_active_txns
// This function aborts any transactions in active or beginning states for the
// specified RM.
// ----------------------------------------------------------------------------
void TM_Info::abort_active_txns(int32 pv_rmid)
{
RM_Info_TSEBranch * lp_TSEBranch;
TMTrace(2, ("TM_Info::abort_active_txns : ENTRY rmid %d.\n", pv_rmid));
// Look up the RM slot first and get the index as it's more efficient than
// using rmid lookup for every transaction.
int32 lv_rmidx = gv_RMs.TSE()->return_slot_index(pv_rmid);
if (lv_rmidx == -1)
{
tm_log_event(DTM_RM_NOT_FOUND, SQ_LOG_CRIT,"DTM_RM_NOT_FOUND", -1, pv_rmid);
TMTrace(1, ("TM_Info::abort_active_txns : Programming Bug!! Rmid %d "
"not found in global RM list.\n", pv_rmid));
abort();
}
int32 lv_count = 0;
CTmTxBase *lp_tx = (CTmTxBase*) getFirst_tx();
while (lp_tx != NULL)
{
if (lp_tx->tx_state() == TM_TX_STATE_NOTX ||
lp_tx->tx_state() == TM_TX_STATE_ACTIVE ||
lp_tx->tx_state() == TM_TX_STATE_BEGINNING)
{
lp_TSEBranch = lp_tx->TSEBranch(lv_rmidx);
if (lp_TSEBranch == NULL)
{
tm_log_event(DTM_RM_NOT_FOUND2, SQ_LOG_CRIT,"DTM_RM_NOT_FOUND2", -1, pv_rmid);
TMTrace(1, ("TM_Info::abort_active_txns : Programming Bug!! Rmid %d "
"not found in global TSE RM list(2).\n", pv_rmid));
abort();
}
if (lp_TSEBranch->partic() && lp_TSEBranch->in_use() && lp_TSEBranch->state() != TSEBranch_UP)
{
lp_tx->internal_abortTrans(true); // Abort the tx for shutdown.
lv_count++;
}
}
lp_tx = (CTmTxBase*) getNext_tx();
}
getEnd_tx();
TMTrace(2, ("TM_Info::abort_active_txns : EXIT %d Txns aborted.\n", lv_count));
} //TM_Info::abort_active_txns
// ----------------------------------------------------------------------------
// TM_Info::abort_all_active_txns
// This function aborts all transactions in active or beginning states.
// ----------------------------------------------------------------------------
void TM_Info::abort_all_active_txns()
{
TMTrace(2, ("TM_Info::abort_all_active_txns : ENTRY.\n"));
int32 lv_count = 0;
CTmTxBase *lp_tx = (CTmTxBase *) getFirst_tx();
while (lp_tx != NULL)
{
if (lp_tx->tx_state() == TM_TX_STATE_NOTX ||
lp_tx->tx_state() == TM_TX_STATE_ACTIVE ||
lp_tx->tx_state() == TM_TX_STATE_BEGINNING)
{
lp_tx->internal_abortTrans(true); // Abort the tx for shutdown.
lv_count++;
}
lp_tx = (CTmTxBase*) getNext_tx();
}
getEnd_tx();
tm_log_event(DTM_TM_SHUTDOWN_ABORT_TXNS, SQ_LOG_INFO,"DTM_TM_SHUTDOWN_ABORT_TXNS",
-1,-1,gv_tm_info.nid(),-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,lv_count);
TMTrace(2, ("TM_Info::abort_all_active_txns : ENTRY %d Txns aborted.\n", lv_count));
} //TM_Info::abort_all_active_txns
// ----------------------------------------------------------------------------
// TM_Info::state
// Sets the TM state to a new value.
// ----------------------------------------------------------------------------
void TM_Info::state (int32 pv_state)
{
char la_buf[DTM_STRING_BUF_SIZE];
int32 lv_old_state = iv_state;
char * lp_tmState = tmStatetoa(pv_state);
lock();
iv_state = pv_state;
unlock();
sprintf(la_buf, "Old state %d, new state %d", lv_old_state, iv_state);
tm_log_event(DTM_TM_STATE_CHANGE, SQ_LOG_INFO,"DTM_TM_STATE_CHANGE",
-1, /*error_code*/
-1, /*rmid*/
iv_nid, /*dtmid*/
-1, /*seq_num*/
-1, /*msgid*/
-1, /*xa_error*/
-1, /*pool_size*/
-1, /*pool_elems*/
-1, /*msg_retries*/
-1, /*pool_high*/
-1, /*pool_low*/
-1, /*pool_max*/
iv_state, /*tx_state - TM state in this case!*/
lv_old_state, /*data*/
-1, /*data1*/
-1, /*data2*/
lp_tmState /*string1*/);
TMTrace(1, ("TM_Info::state, TM state changed from %d(%s) to %d(%s).\n", lv_old_state, tmStatetoa(lv_old_state), iv_state, tmStatetoa(iv_state)));
} //TM_Info::state
// ----------------------------------------------------------------------------
// TM_Info::addTimerEvent
// Purpose : Wrapper to simplify the addition of timer events to the timer
// thread's event queue. Because there is no transaction associated with these
// events, they are executed by the timer thread. There must be a
// corresponding handler implementation, for example TM_Info::enableTrans.
// pp_msg is the message containing the request to be executed.
// pv_delayInterval is the interval, in msec, that the timer thread will wait
// before executing the request.
// ----------------------------------------------------------------------------
CTmTimerEvent * TM_Info::addTimerEvent(CTmTxMessage *pp_msg, int pv_delayInterval)
{
TMTrace (2, ("TM_Info::addTimerEvent : ENTRY, msg type %d, delay %d\n",
pp_msg->requestType(), pv_delayInterval));
CTmTimerEvent *lp_timerEvent = new CTmTimerEvent(pp_msg, pv_delayInterval);
tmTimer()->eventQ_push((CTmEvent *) lp_timerEvent);
return lp_timerEvent;
} // TM_Info::addTimerEvent
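// Illustrative sketch only (kept in comments; not compiled). A caller that
// wants the timer thread to execute a request after a delay would do
// something like the following; the 5000 msec delay is an arbitrary example:
//
//    CTmTxMessage *lp_msg = ...;                    // request to be scheduled
//    CTmTimerEvent *lp_ev = gv_tm_info.addTimerEvent(lp_msg, 5000);
//    // lp_ev can later be handed to tmTimer()->cancelEvent() if the request
//    // should not run after all (see cancelTMRestartEvent below for that
//    // pattern).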
// ----------------------------------------------------------------------------
// TM_Info::addTMRestartRetry
// Purpose : Add a TM Restart retry event. This event will schedule/retry the call
// to TM_Info::restart_tm. If it fails then the code also calls addTMRestartRetry
// to put the event back on the queue.
// There will be one of these on the timer list for each failed non-lead TM
// in the Lead TM only.
// pv_nid input: Node id of the TM to be restarted.
// pv_waitTime: Time interval the Timer thread waits for before driving the event.
// -1 = default (TM_Info::iv_TMRestartRetry_interval)
// 0 = no wait - execute immediately.
// > 0 = wait time in msec
// ----------------------------------------------------------------------------
CTmTimerEvent * TM_Info::addTMRestartRetry(int32 pv_nid, int32 pv_waitTime=-1)
{
int32 lv_wait = (pv_waitTime < 0)? iv_TMRestartRetry_interval: pv_waitTime;
TMTrace (2, ("TM_Info::addTMRestartRetry : ENTRY, $TM%d, wait %d.\n", pv_nid, lv_wait));
if (pv_nid < 0)
{
tm_log_event(DTM_TM_RESTART_RETRY_PROGERROR, SQ_LOG_CRIT, "DTM_TM_RESTART_RETRY_PROGERROR",
-1,-1,nid(),-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,NULL,pv_nid);
TMTrace(1, ("TM_Info::addTMRestartRetry : Programming ERROR : Bad node id %d\n",
pv_nid));
abort();
}
//Must never cancel a one-off event!
// We no longer want to cancel the event. Duplicates will get taken
// care of appropriately when popped off the queue
// if (iv_open_tms[pv_nid].ip_restartTimerEvent)
// cancelTMRestartEvent(pv_nid);
CTmTimerEvent *lp_timerEvent =
new CTmTimerEvent(TM_MSG_TXINTERNAL_TMRESTART_RETRY, lv_wait);
lp_timerEvent->request()->u.iv_tmrestart_internal.iv_nid = pv_nid;
tmTimer()->eventQ_push((CTmEvent *) lp_timerEvent);
// Save the event pointer for node restarts
iv_open_tms[pv_nid].ip_restartTimerEvent = lp_timerEvent;
return lp_timerEvent;
} //TM_Info::addTMRestartRetry
// --------------------------------------------------------------
// TM_Info::cancelTMRestartEvent
// Purpose : Cancel the TM Restart event.
// --------------------------------------------------------------
void TM_Info::cancelTMRestartEvent(int32 pv_nid)
{
TMTrace (2, ("TM_Info::cancelTMRestartEvent : ENTRY. Node %d, Event %p.\n",
pv_nid, (void *) iv_open_tms[pv_nid].ip_restartTimerEvent));
if (iv_open_tms[pv_nid].ip_restartTimerEvent != NULL)
tmTimer()->cancelEvent(iv_open_tms[pv_nid].ip_restartTimerEvent);
iv_open_tms[pv_nid].ip_restartTimerEvent = NULL;
} //TM_Info::cancelTMRestartEvent
// ----------------------------------------------------------------------------
// TM_Info::addTMRecoveryWait
// Purpose : Add a TM Recovery wait event. This event waits for the number of
// active transactions to reach 0 before continuing with recovery. If it is
// still greater than 0, the event is put back on the queue.
// pv_nid input: -1 is cluster recovery else node recovery
// pv_delay input: amount of time to delay
// ----------------------------------------------------------------------------
CTmTimerEvent * TM_Info::addTMRecoveryWait(int32 pv_nid, int32 pv_delay)
{
TMTrace (2, ("TM_Info::addTMRecoveryWait : ENTRY, node: %d delay time: %d\n", pv_nid, pv_delay));
CTmTimerEvent *lp_timerEvent =
new CTmTimerEvent(TM_MSG_TXINTERNAL_RECOVERY_WAIT, pv_delay);
lp_timerEvent->request()->u.iv_tmrecovery_internal.iv_nid = pv_nid;
tmTimer()->eventQ_push((CTmEvent *) lp_timerEvent);
// Do not need to store event info since multiple events for a given node are fine
return lp_timerEvent;
} //TM_Info::addTMRecoveryWait
// ----------------------------------------------------------------------------
// TM_Info::sendAllTMs
// Purpose : Sends the request out to all open TMs and waits for their replies.
// The caller must be the Lead TM.
// NOTE: This function adds 10000 (TM_TM_MSG_OFFSET) to the request type to
// make sure it can be distinguished by the receiving TM from an incoming client
// request.
// ----------------------------------------------------------------------------
int32 TM_Info::sendAllTMs(CTmTxMessage *pp_msg)
{
short la_results[6];
Tm_Req_Msg_Type *lp_req = NULL;
Tm_Rsp_Msg_Type *lp_rsp = NULL;
int32 lv_error = 0;
int32 lv_index = 0;
int32 lv_num_sent = 0;
pid_msgid_struct lv_pid_msgid[MAX_NODES];
int32 lv_reqLen = 0;
long lv_ret;
long lv_ret2;
int32 lv_rspLen = 0;
int lv_rsp_rcvd = 0;
BMS_SRE_LDONE lv_sre;
//initialize lv_pid_msgid
for (int32 i = 0; i <= tms_highest_index_used(); i++)
{
lv_pid_msgid[i].iv_tag = 0;
lv_pid_msgid[i].iv_msgid = 0;
lv_pid_msgid[i].iv_nid = 0;
}
TMTrace (2, ("TM_Info::sendAllTMs ENTRY. Sending request %d request to other TMs.\n", pp_msg->requestType()));
lp_req = new Tm_Req_Msg_Type [tms_highest_index_used() + 1];
lp_rsp = new Tm_Rsp_Msg_Type [tms_highest_index_used() + 1];
for (int lv_idx = 0; lv_idx <= tms_highest_index_used(); lv_idx++)
{
if ((lv_idx == iv_nid) ||
(iv_open_tms[lv_idx].iv_in_use == false))
{
lv_pid_msgid[lv_idx].iv_tag = -1;
}
else
{
lv_pid_msgid[lv_idx].iv_tag = lv_idx + 1; // non zero
memcpy (&lp_req[lv_idx], pp_msg->request(), sizeof(Tm_Req_Msg_Type));
lp_req[lv_idx].iv_msg_hdr.rr_type.request_type = pp_msg->requestType() + TM_TM_MSG_OFFSET;
lp_req[lv_idx].iv_msg_hdr.version.request_version = TM_SQ_MSG_VERSION_CURRENT;
lv_pid_msgid[lv_idx].iv_nid = lv_idx;
lv_reqLen = sizeof (Tm_Req_Msg_Type);
lv_rspLen = sizeof (Tm_Rsp_Msg_Type);
lv_error = link(&(iv_open_tms[lv_idx].iv_phandle), // phandle,
&lv_pid_msgid[lv_idx].iv_msgid, // msgid
(char *) &lp_req[lv_idx], // reqdata
lv_reqLen, // reqdatasize
(char *) &lp_rsp[lv_idx], // replydata
lv_rspLen, // replydatamax
lv_pid_msgid[lv_idx].iv_tag, // linkertag
TM_TM_LINK_PRIORITY, // pri
BMSG_LINK_LDONEQ, // linkopts
TM_LINKRETRY_WAITFOREVER);
if (lv_error != 0)
{
// sprintf(la_buf, "BMSG_LINK_ failed with error %d\n", lv_error);
// tm_log_write(DTM_TM_INFO_LINK_MSG_FAIL, SQ_LOG_CRIT, la_buf);
TMTrace (1, ("TM_Info::sendAllTMs-BMSG_LINK_ failed with error %d. failure ignored.\n",lv_error));
// Ignore errors here. We assume that the TM is down.
//abort ();
return lv_error;
}
else
lv_num_sent++;
}
}
// LDONE LOOP
while (lv_rsp_rcvd < lv_num_sent)
{
// wait for an LDONE wakeup
XWAIT(LDONE, -1);
do {
// we've reached our message reply count, break
if (lv_rsp_rcvd >= lv_num_sent)
break;
lv_ret = BMSG_LISTEN_((short *)&lv_sre,
BLISTEN_ALLOW_LDONEM, 0);
if (lv_ret == BSRETYPE_LDONE)
{
lv_index = -1;
for (int32 lv_idx2 = 0; lv_idx2 <=tms_highest_index_used(); lv_idx2++)
{
if (lv_pid_msgid[lv_idx2].iv_tag == lv_sre.sre_linkTag)
{
lv_index = lv_idx2;
break;
}
}
if (lv_index == -1)
{
tm_log_event(DTM_TM_INFO_NO_LTAG, SQ_LOG_WARNING, "DTM_TM_INFO_NO_LTAG");
TMTrace (1, ("TM_Info::sendAllTMs - Link Tag %d not found\n", (int)lv_sre.sre_linkTag));
lv_error = FEDEVDOWN;
}
if (!lv_error)
{
lv_ret2 = BMSG_BREAK_(lv_pid_msgid[lv_index].iv_msgid,
la_results,
&(iv_open_tms[lv_pid_msgid[lv_index].iv_nid].iv_phandle));
if (lv_ret2 != 0)
{
tm_log_event(DTM_TM_INFO_MSGBRK_FAIL2, SQ_LOG_WARNING, "DTM_TM_INFO_MSGBRK_FAIL2",
lv_ret2,-1,nid(),-1,lv_pid_msgid[lv_index].iv_msgid);
TMTrace (1, ("TM_Info::sendAllTMs ERROR BMSG_BREAK_ returned %ld, index %d, msgid %d.\n",
lv_ret2, lv_index, lv_pid_msgid[lv_index].iv_msgid));
lv_error = FEDEVDOWN;
}
}
if (lv_error || lp_rsp[lv_index].iv_msg_hdr.miv_err.error != 0)
{
if (lv_error)
tm_log_event(DTM_TM_INFO_SENDALL_FAIL, SQ_LOG_WARNING, "DTM_TM_INFO_SENDALL_FAIL", lv_error);
else
{
tm_log_event(DTM_TM_INFO_SENDALL_FAIL, SQ_LOG_WARNING, "DTM_TM_INFO_SENDALL_FAIL", lp_rsp[lv_index].iv_msg_hdr.miv_err.error);
TMTrace (1, ("TM_Info::sendAllTMs - Request %d responded with error %d, %d. Ignoring.\n",
pp_msg->requestType(), lv_error, lp_rsp[lv_index].iv_msg_hdr.miv_err.error));
lv_error = lp_rsp[lv_index].iv_msg_hdr.miv_err.error;
}
}
lv_rsp_rcvd++;
}
} while (lv_ret == BSRETYPE_LDONE);
} // while (lv_rsp_rcvd < lv_num_sent)
delete []lp_rsp;
delete []lp_req;
TMTrace (2, ("TM_Info::sendAllTMs : EXIT, %d.\n", lv_error));
return lv_error;
} //TM_Info::sendAllTMs
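// Illustrative sketch only (kept in comments; not compiled). The
// TM_TM_MSG_OFFSET adjustment above lets a receiving TM distinguish a
// TM-to-TM request from a client request by its request type:
//
//    // sender side (taken from the loop above):
//    lp_req[lv_idx].iv_msg_hdr.rr_type.request_type =
//                            pp_msg->requestType() + TM_TM_MSG_OFFSET;
//    // receiver side (assumed handling, shown only to illustrate the idea):
//    if (lv_reqType >= TM_TM_MSG_OFFSET)
//       lv_origType = lv_reqType - TM_TM_MSG_OFFSET;  // request came from a TM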
// ----------------------------------------------------------------------------
// TM_Info::set_txnsvc_ready
// Purpose : Set the SQ_TXNSVC_READY registry value.
// Note that this routine will NOT return if there is an error, but will
// instead abort().
// When SQ_TXNSVC_READY is "1" the transaction service is available.
// "0" the transaction service is unavailable.
// "2" transaction service is unavailable, shutdown phase 2.
// The value of 2 is used to tell the sqstop shell script that it can now
// commence phase 2 of shutdown.
// ----------------------------------------------------------------------------
void TM_Info::set_txnsvc_ready(int32 pv_flag)
{
int32 lv_error = FEOK;
TMTrace (2, ("TM_Info::set_txnsvc_ready : ENTRY, ready? %d.\n", pv_flag));
// set registry entry to indicate whether transaction service is ready.
switch (pv_flag)
{
case 0:
lv_error = tm_reg_set(MS_Mon_ConfigType_Cluster,
(char *) "CLUSTER", (char *) "SQ_TXNSVC_READY",
(char *) "0");
break;
case 1:
lv_error = tm_reg_set(MS_Mon_ConfigType_Cluster,
(char *) "CLUSTER", (char *) "SQ_TXNSVC_READY",
(char *) "1");
break;
case 2:
lv_error = tm_reg_set(MS_Mon_ConfigType_Cluster,
(char *) "CLUSTER", (char *) "SQ_TXNSVC_READY",
(char *) "2");
break;
default:
lv_error = FEINVALOP;
}
if (lv_error != FEOK)
{
tm_log_event(DTM_TM_REGISTRY_SET_ERROR, SQ_LOG_CRIT, "DTM_TM_REGISTRY_SET_ERROR", lv_error);
TMTrace(1, ("TM_Info::set_txnsvc_ready - Registry entry error %d. TM is not ready!\n", lv_error));
abort ();
}
TMTrace (2, ("TM_Info::set_txnsvc_ready : EXIT.\n"));
} //TM_Info::set_txnsvc_ready
// ----------------------------------------------------------------------------
// TM_Info::attachRm
// Purpose : attach an RM back into the system
// ----------------------------------------------------------------------------
int32 TM_Info::attachRm(CTmTxMessage * pp_msg)
{
int32 lv_error = FEOK;
char *lp_TSEBranchname = pp_msg->request()->u.iv_attachrm.ia_rmname;
TMTrace (2, ("TM_Info::attachRm for %s : ENTRY.\n", lp_TSEBranchname));
CheckFailed_RMs (lp_TSEBranchname);
TMTrace (2, ("TM_Info::attachRm : EXIT.\n"));
return lv_error;
}
// ----------------------------------------------------------------------------
// TM_Info::enableTrans
// Purpose : Executes an enabletransaction command within the Lead TM.
// Lead TM: Sends the enableTrans out to all open TMs and waits for their replies.
// non-Lead TMs: enable transactions if in the right state.
// Returns a FEDEVDOWN error if the TM is not in tx disabled state, or
// an error if sendAllTMs fails.
// enableTrans is called by both lead and non-lead TMs.
// ----------------------------------------------------------------------------
int32 TM_Info::enableTrans(CTmTxMessage * pp_msg)
{
int32 lv_error = FEOK;
TMTrace (2, ("TM_Info::enableTrans : ENTRY.\n"));
// You can only enable transactions when they have been disabled.
// If we're shutting down it's already too late to enable transactions
if (state() != TM_STATE_TX_DISABLED)
{
TMTrace (1, ("TM_Info::enableTrans: EXIT - TM not in TxDisabled state.\n"));
return FEDEVDOWN;
}
if (iv_lead_tm)
{
lv_error = sendAllTMs(pp_msg);
if (lv_error == FEOK)
{
tm_up();
set_txnsvc_ready(TXNSVC_UP);
}
else
{ // Currently we don't handle an error, just issue a warning and continue
tm_log_event(DTM_TM_ENABLETRANS_FAIL, SQ_LOG_WARNING, "DTM_TM_ENABLETRANS_FAIL", lv_error);
TMTrace(1, ("TM_Info::enableTrans - Error %d returned by sendAllTMs.\n", lv_error));
}
}
else // Non-lead TMs must reply to the lead TM now
{
pp_msg->reply(lv_error);
tm_up();
}
TMTrace (2, ("TM_Info::enableTrans : EXIT, error %d.\n", lv_error));
return lv_error;
} //TM_Info::enableTrans
// ----------------------------------------------------------------------------
// TM_Info::disableTrans
// Purpose : Executes a disabletransaction command within both the Lead TM
// and all other TMs (via sendAllTMs).
// Lead TM: Sends the disableTrans out to all open TMs and waits for their replies.
// non-Lead TMs: disable transactions if in the right state.
// Returns the error returned by sendAllTMs.
// disableTrans is called by both lead and non-lead TMs.
// ----------------------------------------------------------------------------
int32 TM_Info::disableTrans(CTmTxMessage * pp_msg)
{
int32 lv_error = FEOK;
TMTrace (2, ("TM_Info::disableTrans : ENTRY, reqType %d, shutdown level %d.\n",
pp_msg->requestType(), pp_msg->request()->u.iv_disabletrans.iv_shutdown_level));
// We should never get an abrupt!!
/*if (pp_msg->request()->u.iv_disabletrans.iv_shutdown_level == TM_DISABLE_SHUTDOWN_ABRUPT)
{
TMTrace (1, ("TM_Info::disableTrans : disableTrans ABRUPT, shutting down SQ!\n"));
msg_mon_shutdown(MS_Mon_ShutdownLevel_Abrupt);
return FEBADERR;
} */
if ((state() == TM_STATE_DOWN || state() == TM_STATE_WAITING_RM_OPEN ||
state() == TM_STATE_SHUTTING_DOWN || state() == TM_STATE_SHUTDOWN_FAILED ||
state() == TM_STATE_SHUTDOWN_COMPLETED || state() == TM_STATE_QUIESCE))
{
lv_error = FEBADSTATE;
tm_log_event(DTM_TM_DISABLETRANS_TOOLATE, SQ_LOG_WARNING, "DTM_TM_DISABLETRANS_TOOLATE",
lv_error, -1, -1, nid(), -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, state());
TMTrace (1, ("TM_Info::disableTrans: Error %d - Too late for disableTrans - TM state = %d.\n",
lv_error, state()));
}
else
{
if (pp_msg->request()->u.iv_disabletrans.iv_shutdown_level == TM_DISABLE_SHUTDOWN_NORMAL ||
pp_msg->request()->u.iv_disabletrans.iv_shutdown_level == TM_DISABLE_SHUTDOWN_IMMEDIATE)
{
state(TM_STATE_TX_DISABLED_SHUTDOWN_PHASE1);
TMTrace (3, ("TM_Info::disableTrans: Shutdown Phase 1 detected, level %d.\n",
pp_msg->request()->u.iv_disabletrans.iv_shutdown_level));
addShutdownPhase1WaitEvent(pp_msg);
// Only delete the message object here if we are not the lead TM. The lead TM will reply and delete
// it in tm_process_req_disabletrans when it's finished scheduling the request.
if (!iv_lead_tm)
delete pp_msg;
}
else
if (state() == TM_STATE_TX_DISABLED_SHUTDOWN_PHASE1)
{
lv_error = FEINVALIDSTATE;
TMTrace (1, ("TM_Info::disableTrans: Error %d - Already shutting down.\n", lv_error));
}
else
{
state(TM_STATE_TX_DISABLED);
// Lead TM waits here for all other TMs to complete disableTrans
if (iv_lead_tm)
{
lv_error = sendAllTMs(pp_msg);
if (lv_error == FEOK)
{
TMTrace (3, ("TM_Info::disableTrans: Disable, no shutdown.\n"));
set_txnsvc_ready(TXNSVC_DOWN);
}
else
{ // Currently we don't handle an error, just issue a warning and continue
tm_log_event(DTM_TM_DISABLETRANS_FAIL, SQ_LOG_WARNING, "DTM_TM_DISABLETRANS_FAIL", lv_error);
TMTrace(1, ("TM_Info::disableTrans - Error %d returned by sendAllTMs.\n", lv_error));
}
}
}
}
TMTrace(2, ("TM_Info::disableTrans : EXIT, error %d.\n", lv_error));
return lv_error;
} //TM_Info::disableTrans
// --------------------------------------------------------------
// TM_Info::addShutdownPhase1WaitEvent
// Purpose : Add a Shutdown Phase 1 Wait Event to wait for
// transactions to complete.
// All TMs call this.
// pp_msg is the original disableTrans message if this is the
// lead TM, or the message from the lead TM if non-lead. We copy the
// msgid and request into our event so that the timer thread can
// reply if this is a non-lead TM.
// --------------------------------------------------------------
void TM_Info::addShutdownPhase1WaitEvent(CTmTxMessage * pp_msg)
{
TMTrace (2, ("TM_Info::addShutdownPhase1WaitEvent : ENTRY, msgid %d.\n", pp_msg->msgid()));
CTmTxMessage *lp_msg = new CTmTxMessage(pp_msg->request(), pp_msg->msgid());
cancelShutdownPhase1WaitEvent();
// Add Shutdown Phase 1 Wait Timer event.
// This is always processed by the timer thread, so there is no need to specify a thread or transaction.
// Repeat it forever, with an interval of TM_SHUTDOWNP1WAIT_DEFAULT msec.
lp_msg->requestType(TM_MSG_TXINTERNAL_SHUTDOWNP1_WAIT);
tmTimer()->ShutdownP1_event(new CTmTimerEvent(lp_msg, TM_SHUTDOWNP1WAIT_DEFAULT, -1));
tmTimer()->eventQ_push((CTmEvent *) tmTimer()->ShutdownP1_event());
TMTrace (2, ("TM_Info::addShutdownPhase1WaitEvent : EXIT, new TmTxMessage 0x%p, msgid %d\n", (void *) lp_msg, lp_msg->msgid()));
} //TM_Info::addShutdownPhase1WaitEvent
// --------------------------------------------------------------
// TM_Info::cancelShutdownPhase1WaitEvent
// Purpose : Cancel the Shutdown Phase 1 Wait event.
// --------------------------------------------------------------
void TM_Info::cancelShutdownPhase1WaitEvent()
{
TMTrace (2, ("TM_Info::cancelShutdownPhase1WaitEvent : ENTRY. Event %p\n",
(void *) tmTimer()->ShutdownP1_event()));
// If we already have a shutdown event then we must have created a corresponding
// TmTxMessage object.
if (tmTimer()->ShutdownP1_event() != NULL)
tmTimer()->cancelEvent(tmTimer()->ShutdownP1_event());
tmTimer()->ShutdownP1_event(NULL);
} //TM_Info::cancelShutdownPhase1WaitEvent
// ---------------------------------------------------------------------------
// TM_Info::ShutdownPhase1Wait
// This is called as a recurring timer event to wait for transactions to
// complete or abort during the first part of shutdown. It is driven by a
// disabletransaction request with shutdown level normal or immediate, and
// executes within the Timer thread.
// ---------------------------------------------------------------------------
void TM_Info::ShutdownPhase1Wait(CTmTxMessage *pp_msg)
{
static bool lv_first = true;
static int32 lv_calls = 1;
int32 lv_error = FEOK;
int32 lv_activeTxns = transactionPool()->get_inUseList()->size();
TMTrace (2, ("TM_Info::ShutdownPhase1Wait : ENTRY msgid %d, msg 0x%p, first %d, call %d.\n",
pp_msg->msgid(), (void *) pp_msg, lv_first, lv_calls++));
if (lv_first)
{
lv_first = false;
// If this is a shutdown immediate then abort all transactions now.
if (pp_msg->request()->u.iv_disabletrans.iv_shutdown_level == TM_DISABLE_SHUTDOWN_IMMEDIATE)
{
abort_all_active_txns();
tm_log_event(DTM_DISABLE_TRANSACTIONS_IMMEDIATE, SQ_LOG_WARNING,
"DTM_DISABLE_TRANSACTIONS_IMMEDIATE",-1,-1,nid(),-1,-1,-1,-1,
-1,-1,-1,-1,-1,-1,num_active_txs());
}
else
tm_log_event(DTM_DISABLE_TRANSACTIONS_NORMAL, SQ_LOG_WARNING,
"DTM_DISABLE_TRANSACTIONS_NORMAL",-1,-1,nid(),-1,-1,-1,-1,
-1,-1,-1,-1,-1,-1,num_active_txs());
// Lead TM waits here for all other TMs to complete transaction processing
if (iv_lead_tm)
lv_error = sendAllTMs(pp_msg);
}
else
{
tm_log_event(DTM_TM_SHUTDOWNP1WAIT_RUNNING, SQ_LOG_WARNING,
"DTM_TM_SHUTDOWNP1WAIT_RUNNING",-1,-1,nid(),-1,-1,-1,-1,
-1,-1,-1,-1,-1,-1,num_active_txs(), lv_calls);
}
if (lv_error)
{
if (iv_lead_tm)
{
tm_log_event(DTM_TM_SHUTDOWNP1WAIT_ERR, SQ_LOG_CRIT, "DTM_TM_SHUTDOWNP1WAIT_ERR",
lv_error, -1, nid(), -1, -1, -1, -1, -1, -1, -1, -1, -1, state(), lv_activeTxns);
TMTrace (1, ("TM_Info::ShutdownPhase1Wait Lead TM error %d, there are still %d active txns waiting.\n",
lv_error, lv_activeTxns));
msg_mon_shutdown(MS_Mon_ShutdownLevel_Abrupt);
}
else
{
tm_log_event(DTM_TM_SHUTDOWNP1WAIT_NONLEAD_ERR, SQ_LOG_ERR, "DTM_TM_SHUTDOWNP1WAIT_NONLEAD_ERR",
lv_error, -1, nid(), -1, -1, -1, -1, -1, -1, -1, -1, -1, state(), lv_activeTxns);
TMTrace (1, ("TM_Info::ShutdownPhase1Wait non-lead TM error %d, there are still %d active txns waiting.\n",
lv_error, lv_activeTxns));
pp_msg->reply(lv_error);
}
}
else
{
lv_activeTxns = transactionPool()->get_inUseList()->size();
if (lv_activeTxns)
{
TMTrace (3, ("TM_Info::ShutdownPhase1Wait There are still %d active txns waiting.\n",
lv_activeTxns));
if (lv_calls == 1 || lv_calls % 10)
tm_log_event(DTM_TM_SHUTDOWNP1WAIT_TXNS, SQ_LOG_INFO, "DTM_TM_SHUTDOWNP1WAIT_TXNS",
-1, -1, nid(), -1, -1, -1, -1, -1, -1, -1, -1, -1, state(), lv_activeTxns);
}
else
{
if (iv_lead_tm)
set_txnsvc_ready(TXNSVC_SHUTDOWN);
else
pp_msg->reply(lv_error);
cancelShutdownPhase1WaitEvent();
}
}
TMTrace (2, ("TM_Info::ShutdownPhase1Wait : EXIT.\n"));
} //TM_Info::ShutdownPhase1Wait
// ---------------------------------------------------------------------------
// TM_Info::link
// Purpose : Centralized call to BMSG_LINK_.
// All linkers should use this function.
// This function will retry any retriable errors such as FENOLCB (30).
// Parameters are as for BMSG_LINK_, omitting the control and xmitclass
// parameters and adding transid and maxretries.
// pp_phandle input
// pp_msgid output
// pp_reqdata input
// pv_reqdatasize input
// pp_replydata input (where to write reply data to)
// pv_replydatamax input (max size of reply buffer)
// pv_linkertag input
// pv_pri input priority
// pv_linkopts input Link options
// pv_maxretries input Maximum retries. Default is -1 = forever.
// pp_transid input Transaction id. Default 0.
// Returns error from BMSG_LINK_ call.
// ---------------------------------------------------------------------------
short TM_Info::link(SB_Phandle_Type *pp_phandle,
int *pp_msgid,
// short *reqctrl, Unused
// int reqctrlsize,
// short *replyctrl,
// int replyctrlmax,
char *pp_reqdata,
int pv_reqdatasize,
char *pp_replydata,
int pv_replydatamax,
long pv_linkertag,
short pv_pri,
// short xmitclass, Unused
short pv_linkopts,
int32 pv_maxretries,
TM_Transid *pp_transid)
{
short lv_ret = 0;
int32 lv_retries = 0;
bool lv_exit = false;
TM_Transid lv_transid = *pp_transid;
TMTrace(2, ("TM_Info::link ENTRY : ID (%d,%d), linker tag %ld, linkopts %d.\n",
lv_transid.get_node(), lv_transid.get_seq_num(), pv_linkertag, pv_linkopts));
do {
lv_ret = BMSG_LINK_ (pp_phandle,
pp_msgid,
NULL, // reqctrl
0, // reqctrlsize
NULL, // replyctrl
0, // replyctrlmax
pp_reqdata,
pv_reqdatasize,
pp_replydata,
pv_replydatamax,
pv_linkertag,
pv_pri,
0, // xmitclass
pv_linkopts);
lv_retries++;
if (lv_ret == FENOLCB &&
//((pv_maxretries == -1 && (lv_retries % TM_LINKRETRY_RETRIES == 0)) ||
(pv_maxretries == -1 ||
(pv_maxretries > 0 && (lv_retries <= pv_maxretries))))
{ // Message Descriptor depletion. This means we ran out of MDs.
// This is retriable, and we want to slow down the TM to allow
// some of the outstanding requests to complete.
TMTrace(1, ("TM_Info::link BMSG_LINK_ error %d, "
"linker tag %ld, retires %d/%d - Pausing thread for %dms before retrying.\n",
lv_ret, pv_linkertag,
lv_retries, pv_maxretries,
TM_LINKRETRY_PAUSE));
tm_log_event(DTM_TM_LINK_PAUSED, SQ_LOG_WARNING, "DTM_TM_LINK_PAUSED",
lv_ret, -1, lv_transid.get_node(), lv_transid.get_seq_num(), -1, -1, -1, -1, lv_retries,
-1, -1, -1, -1, TM_LINKRETRY_PAUSE /*pause in ms*/, pv_linkertag);
SB_Thread::Sthr::sleep(TM_LINKRETRY_PAUSE); // in msec
}
if (lv_ret != FENOLCB)
lv_exit = true;
else
if (pv_maxretries > 0 && lv_retries >= pv_maxretries)
lv_exit = true;
} while (!lv_exit);
if (lv_ret)
{
TMTrace(2, ("TM_Info::link EXIT : returning error %d.\n", lv_ret));
}
else
{
TMTrace(2, ("TM_Info::link EXIT : returning msgid %d.\n", *pp_msgid));
}
return lv_ret;
} //TM_Info::link
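// Illustrative sketch only (kept in comments; not compiled). Callers in this
// file use TM_Info::link rather than BMSG_LINK_ directly so that FENOLCB (MD
// depletion) is retried in one place. A minimal call, mirroring sendAllTMs
// above (lv_phandle, lv_req, lv_rsp and lv_tag are placeholders):
//
//    int lv_msgid;
//    short lv_err = gv_tm_info.link(&lv_phandle, &lv_msgid,
//                                   (char *) &lv_req, sizeof(lv_req),
//                                   (char *) &lv_rsp, sizeof(lv_rsp),
//                                   lv_tag, TM_TM_LINK_PRIORITY,
//                                   BMSG_LINK_LDONEQ, TM_LINKRETRY_WAITFOREVER);
//    // On FENOLCB the call sleeps TM_LINKRETRY_PAUSE msec and retries; any
//    // other result (success or error) is returned to the caller immediately.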
// ---------------------------------------------------------------------------
// TM_Info::all_tms_recovered
// Purpose : Determine whether all of the TMs have been recovered. They don't
// need to be up, but we must have completed recovery.
// ---------------------------------------------------------------------------
bool TM_Info::all_tms_recovered()
{
bool lv_ret = true;
lock();
// Check to see if any TMs are still recovering or down and haven't started
// recovery yet. We exclude ourselves.
for (int i = 0; i <= iv_tms_highest_index_used; i++)
if (i != iv_nid &&
((iv_open_tms[i].iv_recov_state == TM_FAIL_RECOV_STATE_RUNNING) ||
(iv_open_tms[i].iv_in_use == false &&
iv_open_tms[i].iv_recov_state == TM_FAIL_RECOV_STATE_INITIAL &&
iv_recovery[i].iv_node_being_recovered != -1)))
{
lv_ret = false;
break;
}
unlock();
return lv_ret;
} //TM_Info::all_tms_recovered
// ----------------------------------------------------------------------------
// TM_Info::drainTrans
// Purpose : Executes a draintransaction command within any TM.
// Returns FEOK, or FEBADSTATE if the TM is already down or shutting down.
// drainTrans is called by both lead and non-lead TMs.
// ----------------------------------------------------------------------------
int32 TM_Info::drainTrans(CTmTxMessage * pp_msg)
{
int32 lv_error = FEOK;
TMTrace (2, ("TM_Info::drainTrans : ENTRY, immediate %d.\n",
pp_msg->request()->u.iv_draintrans.iv_immediate));
if (state() == TM_STATE_DOWN|| state() == TM_STATE_WAITING_RM_OPEN ||
state() == TM_STATE_SHUTTING_DOWN || state() == TM_STATE_SHUTDOWN_FAILED ||
state() == TM_STATE_SHUTDOWN_COMPLETED || state() == TM_STATE_QUIESCE)
{
lv_error = FEBADSTATE;
tm_log_event(DTM_TM_DRAINTRANS_TOOLATE, SQ_LOG_WARNING, "DTM_TM_DRAINTRANS_TOOLATE",
lv_error, -1, -1, nid(), -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, state());
TMTrace (1, ("TM_Info::drainTrans: Error %d - Too late for drainTrans - TM state = %d.\n",
lv_error, state()));
}
else
{
state(TM_STATE_DRAIN);
if (pp_msg->request()->u.iv_draintrans.iv_immediate)
abort_all_active_txns();
}
pp_msg->reply(lv_error);
TMTrace(2, ("TM_Info::drainTrans : EXIT, error %d.\n", lv_error));
return lv_error;
} //TM_Info::drainTrans
// ----------------------------------------------------------------------------
// TM_Info::set_sys_recov_status
// Purpose : Set the sys_recov_status. This is used to define the period of
// System Recovery in the Lead TM at startup.
// ----------------------------------------------------------------------------
void TM_Info::set_sys_recov_status(int32 pv_sys_recov_state, int32 pv_sys_recov_lead_tm_nid)
{
if (iv_trace_level >= 2)
trace_printf("TM_Info::set_sys_recov_status: ENTRY, recovery state %d, lead tm %d, TM state is %d\n",
pv_sys_recov_state, pv_sys_recov_lead_tm_nid, iv_state);
lock();
iv_sys_recov_state = pv_sys_recov_state;
iv_sys_recov_lead_tm_nid = pv_sys_recov_lead_tm_nid;
unlock();
if (iv_sys_recov_state == TM_SYS_RECOV_STATE_END && iv_state == TM_UP)
wake_TMUP_waiters(FEOK);
if (pv_sys_recov_state == TM_SYS_RECOV_STATE_END)
tm_log_event(DTM_RECOV_SYSRECOV_COMPLETE, SQ_LOG_NOTICE, "DTM_RECOV_SYSRECOV_COMPLETE",
-1,-1,gv_tm_info.nid(),-1,-1,-1,-1,-1,-1,-1,-1,-1,iv_state); //Uses Tx state for TM State!
if (iv_trace_level >= 2)
trace_printf("TM_Info::set_sys_recov_status: EXIT\n");
} //TM_Info::set_sys_recov_status
// ---------------------------------------------------------------------------
// TM_Info::txStatetoa
// Purpose : Returns a string describing the txn state
// ---------------------------------------------------------------------------
char * txStatetoa(int32 pv_state)
{
const int lc_maxStateLength = 48;
static char lv_txState[lc_maxStateLength];
char * lp_txState = (char *) &lv_txState;
memset(lp_txState, 0, lc_maxStateLength);
switch (pv_state)
{
case TM_TX_STATE_ACTIVE:
strcpy(lp_txState,"ACTIVE");
break;
case TM_TX_STATE_FORGOTTEN:
strcpy(lp_txState,"FORGOTTEN");
break;
case TM_TX_STATE_COMMITTED:
strcpy(lp_txState,"COMMITTED");
break;
case TM_TX_STATE_ABORTING:
strcpy(lp_txState,"ABORTING");
break;
case TM_TX_STATE_ABORTING_PART2:
strcpy(lp_txState,"ABORTING PT2");
break;
case TM_TX_STATE_ABORTED:
strcpy(lp_txState,"ABORTED");
break;
case TM_TX_STATE_HUNGABORTED:
strcpy(lp_txState,"HUNGABORTED");
break;
case TM_TX_STATE_HUNGCOMMITTED:
strcpy(lp_txState,"HUNGCOMMITTED");
break;
case TM_TX_STATE_COMMITTING:
strcpy(lp_txState,"COMMITTING");
break;
case TM_TX_STATE_PREPARING:
strcpy(lp_txState,"PREPARING");
break;
case TM_TX_STATE_FORGETTING:
strcpy(lp_txState,"FORGETTING");
break;
case TM_TX_STATE_FORGOTTEN_HEUR:
strcpy(lp_txState,"FORGOTTEN_HEUR");
break;
case TM_TX_STATE_FORGETTING_HEUR:
strcpy(lp_txState,"FORGETTING_HEUR");
break;
case TM_TX_STATE_BEGINNING:
strcpy(lp_txState,"BEGINNING");
break;
case TM_TX_STATE_NOTX:
strcpy(lp_txState,"INITIALIZE");
break;
default:
sprintf(lp_txState,"(Unknown Txn state %d)", pv_state);
break;
} //switch
return lp_txState;
} // txStatetoa
// ---------------------------------------------------------------------------
// TM_Info::tmStatetoa
// Purpose : Returns a string describing the TM state
// ---------------------------------------------------------------------------
char * TM_Info::tmStatetoa(int32 pv_state)
{
const int lc_maxStateLength = 48;
static char lv_tmState[lc_maxStateLength];
char * lp_tmState = (char *) &lv_tmState;
memset(lp_tmState, 0, lc_maxStateLength);
switch (pv_state)
{
case TM_STATE_INITIAL:
strcpy(lp_tmState,"INITIAL");
break;
case TM_STATE_UP:
strcpy(lp_tmState,"UP");
break;
case TM_STATE_DOWN:
strcpy(lp_tmState,"DOWN");
break;
case TM_STATE_SHUTTING_DOWN:
strcpy(lp_tmState,"SHUTTING DOWN");
break;
case TM_STATE_SHUTDOWN_FAILED:
strcpy(lp_tmState,"SHUTDOWN FAILED");
break;
case TM_STATE_SHUTDOWN_COMPLETED:
strcpy(lp_tmState,"SHUTDOWN COMPLETE");
break;
case TM_STATE_TX_DISABLED:
strcpy(lp_tmState,"TXNS DISABLED");
break;
case TM_STATE_TX_DISABLED_SHUTDOWN_PHASE1:
strcpy(lp_tmState,"TXNS DISABLED, SHUTDOWN PHASE 1");
break;
case TM_STATE_QUIESCE:
strcpy(lp_tmState,"QUIESCING");
break;
case TM_STATE_DRAIN:
strcpy(lp_tmState,"DRAINING");
break;
case TM_STATE_WAITING_RM_OPEN:
strcpy(lp_tmState,"WAITING FOR RM OPENS TO COMPLETE");
break;
default:
sprintf(lp_tmState,"(Unknown TM state %d)", pv_state);
break;
} //switch
return lp_tmState;
} // TM_Info::tmStatetoa
// ---------------------------------------------------------------------------
// TM_Info::dummy_link_to_refresh_phandle
// Purpose : On NodeUp or TmRestarted, the phandle held for the new TM will be stale.
// This procedure forces a refresh in Seabed and is ONLY called by the Lead DTM!
// ---------------------------------------------------------------------------
void TM_Info::dummy_link_to_refresh_phandle(int32 pv_nid)
{
short la_results[6];
Tm_Req_Msg_Type *lp_req = NULL;
Tm_Rsp_Msg_Type *lp_rsp = NULL;
int32 lv_error = FEOK;
int32 lv_index = 0;
int32 lv_num_sent = 0;
pid_msgid_struct lv_pid_msgid;
int32 lv_reqLen = 0;
long lv_ret;
long lv_ret2;
int32 lv_rspLen = 0;
int lv_rsp_rcvd = 0;
BMS_SRE_LDONE lv_sre;
TMTrace (2, ("TM_Info::dummy_link_to_refresh_phandle: ENTRY\n"));
//initialize lv_pid_msgid
lv_pid_msgid.iv_tag = 0;
lv_pid_msgid.iv_msgid = 0;
lv_pid_msgid.iv_nid = 0;
TMTrace (3, ("TM_Info::dummy_link_to_refresh_phandle sending Leadtm request to TM%d.\n",pv_nid));
lp_req = new Tm_Req_Msg_Type;
lp_rsp = new Tm_Rsp_Msg_Type;
//Send message to the TM
lv_pid_msgid.iv_tag = 1; // non zero
lp_req->iv_msg_hdr.dialect_type = DIALECT_TM_SQ;
lp_req->iv_msg_hdr.rr_type.request_type = TM_MSG_TYPE_LEADTM;
lp_req->iv_msg_hdr.version.request_version = TM_SQ_MSG_VERSION_CURRENT;
lv_pid_msgid.iv_nid = pv_nid;
lv_reqLen = sizeof (Tm_Req_Msg_Type);
lv_rspLen = sizeof (Tm_Rsp_Msg_Type);
lv_error = link(&(iv_open_tms[pv_nid].iv_phandle), // phandle,
&lv_pid_msgid.iv_msgid, // msgid
(char *) lp_req, // reqdata
lv_reqLen, // reqdatasize
(char *) lp_rsp, // replydata
lv_rspLen, // replydatamax
lv_pid_msgid.iv_tag, // linkertag
TM_TM_LINK_PRIORITY, // pri
BMSG_LINK_LDONEQ, // linkopts
TM_LINKRETRY_RETRIES); // retry count
if (lv_error != 0)
{
TMTrace (1, ("TM_Info::dummy_link_to_refresh_phandle BMSG_LINK_ failed with error %d. failure ignored.\n",lv_error));
}
else
lv_num_sent++;
// for one tm
// LDONE LOOP
while (lv_rsp_rcvd < lv_num_sent)
{
// wait for an LDONE wakeup
XWAIT(LDONE, -1);
do {
lv_error = 0;
// we've reached our message reply count, break
if (lv_rsp_rcvd >= lv_num_sent)
break;
lv_ret = BMSG_LISTEN_((short *)&lv_sre,
BLISTEN_ALLOW_LDONEM, 0);
if (lv_ret == BSRETYPE_LDONE)
{
lv_index = -1;
if (lv_pid_msgid.iv_tag == lv_sre.sre_linkTag)
{
lv_index = pv_nid;
}
if (lv_index == -1)
{
TMTrace (1, ("TM_Info::dummy_link_to_refresh_phandle - Link Tag %d not found\n", (int)lv_sre.sre_linkTag));
lv_error = FEDEVDOWN;
}
if (!lv_error)
{
lv_ret2 = BMSG_BREAK_(lv_pid_msgid.iv_msgid,
la_results,
&(iv_open_tms[pv_nid].iv_phandle));
if (lv_ret2 != 0)
{
TMTrace (1, ("TM_Info::dummy_link_to_refresh_phandle ERROR BMSG_BREAK_ returned %ld, index %d, msgid %d.\n",
lv_ret2, lv_index, lv_pid_msgid.iv_msgid));
lv_error = FEDEVDOWN;
}
}
if (lv_error == FEDEVDOWN)
{
TMTrace (1, ("TM_Info::dummy_link_to_refresh_phandle - TM respond error\n"));
}
lv_rsp_rcvd++;
}
} while (lv_ret == BSRETYPE_LDONE);
}// while (lv_rsp_rcvd < lv_num_sent)
TMTrace (2, ("TM_Info::dummy_link_to_refresh_phandle: EXIT\n"));
}