blob: 4df9b14a971d58cc3ea2fe17c7b1e5e2d06510c9 [file] [log] [blame]
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
// seabed includes
#include "seabed/ms.h"
#include "seabed/fs.h"
#include "seabed/trace.h"
#include "seabed/thread.h"
#include "javaobjectinterfacetm.h"
// tm includes
#include "dtm/tm_util.h"
//#include "tmtx.h"
#include "tmlib.h"
#include "tmlogging.h"
//extern int HbaseTM_initiate_stall(int where); Shouldn't need these here.
//extern HashMapArray* HbaseTM_process_request_regions_info();
//==== For the JNI call to RMInterface.cleartransaction - begin
#include <iostream>
#include "jni.h"
// Out-of-class definition of TMLIB's static JNI class handle; it starts out
// as 0 (no class reference held).
jclass TMLIB::javaClass_ = 0;
// ==============================================================
// === HBaseTM return codes
// === HBasetoTxnError() translates these into DTM (FE*) errors.
// ==============================================================
enum HBTM_RetCode {
    RET_OK          = 0,   // success
    RET_NOTX        = 1,   // mapped to FENOTRANSID
    RET_READONLY    = 2,   // read-only reply, treated as success
    RET_ADD_PARAM   = 3,   // mapped to FEBOUNDSERR
    RET_EXCEPTION   = 4,   // mapped to FETRANSEXCEPTION
    RET_HASCONFLICT = 5,   // mapped to FEHASCONFLICT
    RET_IOEXCEPTION = 6,   // mapped to FETRANSIOEXCEPTION
    RET_NOCOMMITEX  = 7,   // mapped to FEABORTEDTRANSID
    RET_LAST        = 8    // one past the last real code
};
// Global state of the TM library.
// gp_trans_thr is thread-local: each thread has its own
// TMLIB_ThreadTxn_Object pointer, which the API entry points in this file
// allocate on first use while it is still NULL.
__thread TMLIB_ThreadTxn_Object *gp_trans_thr;
// The TMLIB instance shared by the functions in this file.
TMLIB gv_tmlib;
// NOTE(review): not referenced in the part of the file visible here; the
// functions below query gv_tmlib.is_initialized() instead - confirm usage.
bool gv_tmlib_initialized = false;
// --------------------------------------------------------------------
// Helper methods
// --------------------------------------------------------------------
// Signal blocked by tm_rtsigblock_proc(): SIGRTMAX - 4, i.e. signal 60 when
// SIGRTMAX is 64. It should be blocked as soon as possible upon starting.
#define SQ_LIO_SIGNAL_REQUEST_REPLY (SIGRTMAX - 4)
// Adds SQ_LIO_SIGNAL_REQUEST_REPLY to the calling thread's blocked-signal
// mask. Returns 0; aborts the process if the mask cannot be changed.
static int tm_rtsigblock_proc() {
    sigset_t lv_blocked;

    // A set holding only the request/reply signal
    sigemptyset(&lv_blocked);
    sigaddset(&lv_blocked, SQ_LIO_SIGNAL_REQUEST_REPLY);

    if (pthread_sigmask(SIG_BLOCK, &lv_blocked, NULL) != 0)
        abort();

    //fprintf(stderr,"Blocked signal %d.\n",SQ_LIO_SIGNAL_REQUEST_REPLY);
    fflush(stderr);
    return 0;
}
//----------------------------------------------------------------------------
// HBasetoTxnError
// Purpose - map an HBase-trx return code (HBTM_RetCode) to a DTM error
// Params  - pv_HBerr  HBase-trx return code
// Returns - the matching FE* error. RET_READONLY is reported as FEOK.
//           Any code without a mapping is reported on stdout and returned
//           as FETRANSERRUNKNOWN.
//----------------------------------------------------------------------------
short HBasetoTxnError(short pv_HBerr)
{
    switch (pv_HBerr)
    {
    case RET_OK:          return FEOK;
    case RET_NOTX:        return FENOTRANSID;
    case RET_READONLY:    return FEOK; //Read-only reply is ok
    case RET_ADD_PARAM:   return FEBOUNDSERR;
    case RET_EXCEPTION:   return FETRANSEXCEPTION;
    case RET_HASCONFLICT: return FEHASCONFLICT;
    case RET_IOEXCEPTION: return FETRANSIOEXCEPTION;
    case RET_NOCOMMITEX:  return FEABORTEDTRANSID;
    default:
        // The period belongs before the newline; it used to be emitted at
        // the start of the following output line.
        printf("Unknown error %d encountered, returning FETRANSERRUNKNOWN.\n", pv_HBerr);
        return FETRANSERRUNKNOWN;
    }
}
// -------------------------------------------------------------------
// tmlib_trace_enabled
// Purpose - report whether tracing is active at level pv_level.
//           The first call reads the TMLIB_TRACE* environment settings
//           and, when TMLIB_TRACE is on, opens the trace file
//           (TMLIB_TRACE_FILE, or "tmlib_trace" when that is not set).
// Returns - true when the configured detail level is >= pv_level.
//           The detail level stays 0 when tracing is off.
// -------------------------------------------------------------------
bool tmlib_trace_enabled(int pv_level)
{
    static bool sv_setup_done = false;
    static bool sv_tracing    = false;
    static int  sv_detail     = 0;

    if (!sv_setup_done)
    {
        bool lv_unique_file = false;

        ms_getenv_bool("TMLIB_TRACE", &sv_tracing);
        if (sv_tracing)
        {
            sv_detail = 1; // default
            ms_getenv_int("TMLIB_TRACE_DETAIL", &sv_detail); //set the detail
            ms_getenv_bool("TMLIB_TRACE_UNIQUE", &lv_unique_file);

            // Fall back to the default file name when none is configured
            const char *lp_name = ms_getenv_str ("TMLIB_TRACE_FILE");
            if (lp_name == NULL)
                lp_name = "tmlib_trace";
            trace_init((char *) lp_name, lv_unique_file, NULL, false);

            trace_printf("TMLIB_TRACE : Process Initialize, trace level = %d\n", sv_detail);
        }
        sv_setup_done = true;
    }

    return (sv_detail >= pv_level);
}
// ---------------------------------------------------------------
// tmlib_zero_transid
// Purpose - tell whether a transid is all zeros
// Params  - pp_transid  transid to inspect, may be NULL
// Returns - true when pp_transid is NULL or id[0..3] are all 0,
//           false otherwise
// --------------------------------------------------------------
bool tmlib_zero_transid(TM_Transid_Type *pp_transid)
{
    // null transid
    if (pp_transid == NULL)
        return true;

    for (int lv_idx = 0; lv_idx < 4; lv_idx++)
    {
        if (pp_transid->id[lv_idx] != 0)
            return false;
    }
    return true;
}
// ----------------------------------------------------------------
// tmlib_init_req_hdr
// Purpose - fill in the header of a TM Library request message:
//           dialect, request type, and the current message version
//           for both the request version and the minimum
//           interpretation version.
// Params  - req_type  request type to store in the header
//           pp_req    request whose header is initialized (not
//                     checked for NULL)
// Returns - always 0
// ----------------------------------------------------------------
int tmlib_init_req_hdr(short req_type, Tm_Req_Msg_Type *pp_req)
{
    // what kind of message this is
    pp_req->iv_msg_hdr.rr_type.request_type = req_type;
    pp_req->iv_msg_hdr.dialect_type         = DIALECT_TM_SQ;

    // version information
    pp_req->iv_msg_hdr.miv_err.minimum_interpretation_version = TM_SQ_MSG_VERSION_CURRENT;
    pp_req->iv_msg_hdr.version.request_version                = TM_SQ_MSG_VERSION_CURRENT;

    return 0;
}
// ------------------------------------------------------------
// tmlib_set_transid_from_ms
// Purpose - need this for now to convert from seabed transid
//           to ours
// Params  - pp_transid   (out) receives the converted transid
//           pv_transid2  seabed transid whose id[0..3] are copied
// ------------------------------------------------------------
void tmlib_set_transid_from_ms ( TM_Transid *pp_transid, MS_Mon_Transid_Type pv_transid2)
{
    TM_Transid_Type lv_converted;

    for (int lv_idx = 0; lv_idx < 4; lv_idx++)
        lv_converted.id[lv_idx] = pv_transid2.id[lv_idx];

    *pp_transid = lv_converted;
}
// ------------------------------------------------------------
// tmlib_set_transid_startid_from_ms
// Purpose - need this for now to convert a seabed transid and
//           start id to ours
// Params  - pp_transid   (out) receives the converted transid
//           pv_transid2  seabed transid whose id[0..3] are copied
//           pv_startid   (out) receives pv_startid2
//           pv_startid2  seabed start id
// ------------------------------------------------------------
void tmlib_set_transid_startid_from_ms ( TM_Transid *pp_transid, MS_Mon_Transid_Type pv_transid2, TM_Transseq_Type *pv_startid, MS_Mon_Transseq_Type pv_startid2)
{
    TM_Transid_Type lv_converted;

    for (int lv_idx = 0; lv_idx < 4; lv_idx++)
        lv_converted.id[lv_idx] = pv_transid2.id[lv_idx];

    *pp_transid = lv_converted;
    *pv_startid = pv_startid2;
}
// ------------------------------------------------------------
// tmlib_set_ms_from_transid
// Purpose - copy id[0..3] of our transid into a seabed transid
// Params  - pp_transid   transid to copy from
//           pp_transid2  (out) seabed transid to fill in
// ------------------------------------------------------------
void tmlib_set_ms_from_transid(TM_Transid_Type pp_transid, MS_Mon_Transid_Type *pp_transid2)
{
    for (int lv_idx = 0; lv_idx < 4; lv_idx++)
        pp_transid2->id[lv_idx] = pp_transid.id[lv_idx];
}
// ------------------------------------------------------------
// tmlib_set_ms_from_transid_startid
// Purpose - copy id[0..3] of our transid and the start id into
//           their seabed counterparts
// Params  - pp_transid   transid to copy from
//           pp_transid2  (out) seabed transid to fill in
//           pv_startid   start id to copy from
//           pv_startid2  (out) seabed start id to fill in
// ------------------------------------------------------------
void tmlib_set_ms_from_transid_startid(TM_Transid_Type pp_transid, MS_Mon_Transid_Type *pp_transid2, TM_Transseq_Type pv_startid, MS_Mon_Transseq_Type *pv_startid2)
{
    for (int lv_idx = 0; lv_idx < 4; lv_idx++)
        pp_transid2->id[lv_idx] = pp_transid.id[lv_idx];

    *pv_startid2 = pv_startid;
}
// ------------------------------------------------------------
// tmlib_check_active_tx
// -- check if there is an active transaction
// ------------------------------------------------------------
short tmlib_check_active_tx ( )
{
if (!gv_tmlib.is_initialized())
gv_tmlib.initialize();
if (gp_trans_thr->get_current() == NULL)
{
TMlibTrace(("TMLIB_TRACE : tmlib_check_active_tx returning FENOTRANSID\n"), 3);
return FENOTRANSID;
}
return FEOK;
}
// -----------------------------------------------------------
// tmlib_check_miss_param
// -- check if the param is valid (i.e. not NULL)
// Initializes the library on first use.
// Params  - pp_param  pointer to validate
// Returns - FEOK when pp_param is not NULL, FEMISSPARM when it is
// ----------------------------------------------------------
short tmlib_check_miss_param( void * pp_param)
{
    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    if (pp_param != NULL)
        return FEOK;

    TMlibTrace(("TMLIB_TRACE : tmlib_check_miss_param returning FEMISSPARM\n"), 1);
    return FEMISSPARM;
}
// ------------------------------------------------------------
// tmlib_check_outstanding_ios
// -- check if there are outstanding_ios
// -----------------------------------------------------------
short tmlib_check_outstanding_ios()
{
if (!gv_tmlib.is_initialized())
gv_tmlib.initialize();
// make sure there is space for pp_status
if (gp_trans_thr->get_current_ios())
{
TMlibTrace(("TMLIB_TRACE : tmlib_check_outstanding_ios returning "
"FETRANSNOWAITOUT. %d outstanding ios.\n", gp_trans_thr->get_current_ios()), 1);
return FETRANSNOWAITOUT;
}
return FEOK;
}
// ----------------------------------------------------------------
// tmlib_send_suspend
// Purpose - build a TM_MSG_TYPE_SUSPENDTRANSACTION request for
//           pv_transid and send it to the node taken from the
//           transid (pv_transid.get_node())
// Params  - pv_transid     transaction to suspend
//           pv_coord_role  stored in the request's iv_coord_role
//           pv_pid         stored in the request's iv_pid
// Returns - the send_tm() error if the send failed, otherwise the
//           error carried in the response header, otherwise FEOK
// ----------------------------------------------------------------
short tmlib_send_suspend(TM_Transid pv_transid, bool pv_coord_role, int pv_pid)
{
    Tm_Req_Msg_Type lv_request;
    Tm_Rsp_Msg_Type lv_reply;

    TMlibTrace(("TMLIB_TRACE : tmlib_send_suspend ENTRY\n"), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // Fill in the request
    tmlib_init_req_hdr(TM_MSG_TYPE_SUSPENDTRANSACTION, &lv_request);
    pv_transid.set_external_data_type (&lv_request.u.iv_suspend_trans.iv_transid);
    lv_request.u.iv_suspend_trans.iv_coord_role = pv_coord_role;
    lv_request.u.iv_suspend_trans.iv_pid = pv_pid;
    lv_request.u.iv_suspend_trans.iv_nid = gv_tmlib.iv_my_nid;

    short lv_send_error = gv_tmlib.send_tm(&lv_request, &lv_reply, pv_transid.get_node());

    TMlibTrace(("TMLIB_TRACE : tmlib_send_suspend (seq num %d) EXIT returning %d\n",
                pv_transid.get_seq_num(), lv_send_error), 2);

    // A failed send takes precedence over whatever is in the reply
    if (lv_send_error)
        return lv_send_error;

    if (lv_reply.iv_msg_hdr.miv_err.error)
        return lv_reply.iv_msg_hdr.miv_err.error;

    return FEOK;
}
// ----------------------------------------------------------------
// tmlib_callback
// Purpose - callback registered with Seabed upon startup, used
//           for File System Propagation
// Params  - pv_fun          which propagation step Seabed requests
//           pv_transid      transid supplied by Seabed (not used by
//                           TMLIB_FUN_GET_TX)
//           pp_transid_out  (out) receives the current transid for
//                           TMLIB_FUN_GET_TX when a transaction is
//                           active; untouched otherwise
// Returns - FEOK on success;
//           FEMISSPARM   GET_TX with an active tx but no output pointer;
//           FEINVTRANSID REINSTATE_TX with a different tx already
//                        current, or with no current tx afterwards;
//           the add_or_update() result for REG_TX of a tx that is
//           not the current one.
//           Aborts the process when pv_fun is not a known function.
// ---------------------------------------------------------------
int tmlib_callback (MS_Mon_Tmlib_Fun_Type pv_fun,
                    MS_Mon_Transid_Type pv_transid,
                    MS_Mon_Transid_Type *pp_transid_out)
{
    char la_buf[DTM_STRING_BUF_SIZE];
    int lv_return = FEOK;
    TM_Transid lv_transid;

    TMlibTrace(("TMLIB_TRACE : tmlib_callback ENTRY with function %d \n",
                pv_fun), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    tmlib_set_transid_from_ms (&lv_transid, pv_transid);

    switch (pv_fun)
    {
    // called in the client to get the current transaction
    case TMLIB_FUN_GET_TX:
    {
        TMlibTrace(("TMLIB_TRACE : tmlib_callback, FUN_GET_TX\n"), 3);
        // active transaction
        if (gp_trans_thr->get_current() != NULL)
        {
            if (pp_transid_out == NULL)
                lv_return = FEMISSPARM;
            else
            {
                gp_trans_thr->increase_current_ios();
                // otherwise, return the active transaction to propagate
                tmlib_set_ms_from_transid(
                    gp_trans_thr->get_current()->getTransid()->get_data(), pp_transid_out);
                TMlibTrace(("TMLIB_TRACE : tmlib_callback, FUN_GET_TX, returning seq num %d\n",
                            gp_trans_thr->get_current()->getTransid()->get_seq_num()), 3);
            }
        }
        break;
    }
    // called in the server to register a propagated transaction
    case TMLIB_FUN_REG_TX:
    {
        // if the tx is already active, increase the depth as we
        // cannot delete the transaction before the final reply.
        // in a nowait env, we can have more than 1 request come
        // in for the same tx
        TMlibTrace(("TMLIB_TRACE : tmlib_callback, TMLIB_FUN_REG_TX, seq num %d\n", lv_transid.get_seq_num()), 3);
        if ((gp_trans_thr->get_current()!= NULL) && (gp_trans_thr->get_current()->equal(lv_transid)))
        {
            gp_trans_thr->increase_current_depth();
        }
        else
        {
            // if its not active, add it if need be
            lv_return = gv_tmlib.add_or_update(lv_transid);
        }
        break;
    }
    // called in the server to clear the propagated transaction
    case TMLIB_FUN_CLEAR_TX:
    {
        // make sure we are clearing the proper transaction
        TMlibTrace(("TMLIB_TRACE : tmlib_callback, TMLIB_FUN_CLEAR_TX, seq num %d\n",
                    lv_transid.get_seq_num()), 3);
        // transaction may have been aborted and was cleared out
        if (gp_trans_thr->get_current())
        {
            // If the transaction being cleared doesn't match the current transaction then
            // we assume that Seabed is well behaved, but that the application has managed
            // to set the current transaction since the reinstate callback! So we re-do
            // the reinstate, clear it and then set the current transaction back to what
            // it was when we were called.
            if (!(gp_trans_thr->get_current()->equal(lv_transid)))
            {
                // bounded write: la_buf has a fixed size
                snprintf(la_buf, sizeof(la_buf), "Warning: File System transaction to be cleared does not "
                         "match current transaction %d, assuming it changed since the reinstate.\n",
                         gp_trans_thr->get_current()->getTransid()->get_seq_num());
                tm_log_write(DTM_LIB_TRANS_INVALID_ID, SQ_LOG_WARNING, la_buf);
                TMlibTrace(("TMLIB_TRACE : tmlib_callback, TMLIB_FUN_CLEAR_TX, %s\n",
                            la_buf), 1);
                TM_Transaction *lp_saveTrans = gp_trans_thr->get_current();
                gv_tmlib.reinstate_tx (&lv_transid);
                gv_tmlib.clear_entry(lv_transid, true /*server*/, false);
                gp_trans_thr->set_current(lp_saveTrans);
            }
            else
                gv_tmlib.clear_entry (lv_transid, true /*server*/, false);
        }
        break;
    }
    // called in the client to reinstate a transaction that left the process
    case TMLIB_FUN_REINSTATE_TX:
    {
        TMlibTrace(("TMLIB_TRACE : tmlib_callback, TMLIB_FUN_REINSTATE_TX, seq num %d\n",
                    lv_transid.get_seq_num()), 3);
        // if we have an active tx, it better be the same!
        if (gp_trans_thr->get_current())
        {
            if (!(gp_trans_thr->get_current()->equal(lv_transid)))
                lv_return = FEINVTRANSID;
        }
        else
            gv_tmlib.reinstate_tx (&lv_transid);
        /* We have to allow this for aborts with outstanding I/Os.
        if (!(gp_trans_thr->get_current()))
        {
            sprintf(la_buf, "Transaction reinstatement failed.\n");
            tm_log_write(DTM_LIB_INVALID_TRANS, SQ_LOG_CRIT, la_buf);
            abort();
        } */
        // one propagated I/O has come back, provided a tx is current
        if (gp_trans_thr->get_current())
            gp_trans_thr->decrease_current_ios();
        else
            lv_return = FEINVTRANSID;
        break;
    }
    default:
    {
        // bounded write: la_buf has a fixed size
        snprintf(la_buf, sizeof(la_buf), "Seabed software fault - bad input pv_fun %d\n", pv_fun);
        TMlibTrace(("TMLIB_TRACE : tmlib_callback failed %s", la_buf), 1);
        tm_log_write(DTM_SEA_SOFT_FAULT, SQ_LOG_CRIT, la_buf);
        abort(); // seabed software fault - bad input
        break;
    }
    }

    TMlibTrace(("TMLIB_TRACE : tmlib_callback EXIT with error %d\n",
                lv_return), 2);
    return lv_return;
}
// ----------------------------------------------------------------
// tmlib_callback2
// Purpose - callback registered with Seabed upon startup, used
//           for File System Propagation (transId and StartId)
// Params  - pv_fun          which propagation step Seabed requests
//           pv_transid      transid supplied by Seabed (not used by
//                           TMLIB_FUN_GET_TX)
//           pp_transid_out  (out) current transid, GET_TX only
//           pv_startid      start id supplied by Seabed
//           pp_startid_out  (out) this thread's start id, GET_TX only
// Returns - FEOK on success;
//           FEMISSPARM   GET_TX with an active tx but a NULL output
//                        pointer;
//           FEINVTRANSID REINSTATE_TX with a different tx already
//                        current, or with no current tx afterwards;
//           the add_or_update() result for REG_TX of a tx that is
//           not the current one.
//           Aborts the process when pv_fun is not a known function.
// ---------------------------------------------------------------
int tmlib_callback2 (MS_Mon_Tmlib_Fun_Type pv_fun,
                     MS_Mon_Transid_Type pv_transid,
                     MS_Mon_Transid_Type *pp_transid_out,
                     MS_Mon_Transseq_Type pv_startid,
                     MS_Mon_Transseq_Type *pp_startid_out)
{
    char la_buf[DTM_STRING_BUF_SIZE];
    int lv_return = FEOK;
    TM_Transid lv_transid;
    TM_Transseq_Type lv_startid;

    TMlibTrace(("TMLIB_TRACE : tmlib_callback2 ENTRY with function %d \n",
                pv_fun), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    tmlib_set_transid_startid_from_ms (&lv_transid, pv_transid, &lv_startid, pv_startid);

    switch (pv_fun)
    {
    // called in the client to get the current transaction
    case TMLIB_FUN_GET_TX:
    {
        TMlibTrace(("TMLIB_TRACE : tmlib_callback2, FUN_GET_TX\n"), 3);
        // active transaction
        if (gp_trans_thr->get_current() != NULL)
        {
            if ((pp_transid_out == NULL) || (pp_startid_out == NULL))
                lv_return = FEMISSPARM;
            else
            {
                gp_trans_thr->increase_current_ios();
                // otherwise, return the active transaction to propagate
                tmlib_set_ms_from_transid_startid(
                    gp_trans_thr->get_current()->getTransid()->get_data(), pp_transid_out,
                    gp_trans_thr->get_startid(), pp_startid_out);
                TMlibTrace(("TMLIB_TRACE : tmlib_callback2, FUN_GET_TX, returning seq num %d and startid %ld\n",
                            gp_trans_thr->get_current()->getTransid()->get_seq_num(), gp_trans_thr->get_startid()), 3);
            }
        }
        break;
    }
    // called in the server to register a propagated transaction
    case TMLIB_FUN_REG_TX:
    {
        // if the tx is already active, increase the depth as we
        // cannot delete the transaction before the final reply.
        // in a nowait env, we can have more than 1 request come
        // in for the same tx
        TMlibTrace(("TMLIB_TRACE : tmlib_callback2, TMLIB_FUN_REG_TX, seq num %d\n", lv_transid.get_seq_num()), 3);
        if ((gp_trans_thr->get_current()!= NULL) && (gp_trans_thr->get_current()->equal(lv_transid)))
        {
            gp_trans_thr->increase_current_depth();
        }
        else
        {
            // if its not active, add it if need be
            lv_return = gv_tmlib.add_or_update(lv_transid, lv_startid);
        }
        break;
    }
    // called in the server to clear the propagated transaction
    case TMLIB_FUN_CLEAR_TX:
    {
        // make sure we are clearing the proper transaction
        TMlibTrace(("TMLIB_TRACE : tmlib_callback2, TMLIB_FUN_CLEAR_TX, seq num %d\n",
                    lv_transid.get_seq_num()), 3);
        // transaction may have been aborted and was cleared out
        if (gp_trans_thr->get_current())
        {
            // If the transaction being cleared doesn't match the current transaction then
            // we assume that Seabed is well behaved, but that the application has managed
            // to set the current transaction since the reinstate callback! So we re-do
            // the reinstate, clear it and then set the current transaction back to what
            // it was when we were called.
            if (!(gp_trans_thr->get_current()->equal(lv_transid)))
            {
                // bounded write: la_buf has a fixed size
                snprintf(la_buf, sizeof(la_buf), "Warning: File System transaction to be cleared does not "
                         "match current transaction %d, assuming it changed since the reinstate.\n",
                         gp_trans_thr->get_current()->getTransid()->get_seq_num());
                tm_log_write(DTM_LIB_TRANS_INVALID_ID, SQ_LOG_WARNING, la_buf);
                TMlibTrace(("TMLIB_TRACE : tmlib_callback2, TMLIB_FUN_CLEAR_TX, %s\n",
                            la_buf), 1);
                // remember both the current tx and the thread's start id so
                // they can be put back after the clear
                TM_Transaction *lp_saveTrans = gp_trans_thr->get_current();
                TM_Transseq_Type lv_saveStartId = gp_trans_thr->get_startid();
                gv_tmlib.reinstate_tx (&lv_transid);
                gv_tmlib.clear_entry(lv_transid, true /*server*/, false);
                gp_trans_thr->set_current(lp_saveTrans);
                gp_trans_thr->set_startid(lv_saveStartId);
            }
            else
                gv_tmlib.clear_entry (lv_transid, true /*server*/, false);
        }
        break;
    }
    // called in the client to reinstate a transaction that left the process
    case TMLIB_FUN_REINSTATE_TX:
    {
        TMlibTrace(("TMLIB_TRACE : tmlib_callback2, TMLIB_FUN_REINSTATE_TX, seq num %d\n",
                    lv_transid.get_seq_num()), 3);
        // if we have an active tx, it better be the same!
        if (gp_trans_thr->get_current())
        {
            if (!(gp_trans_thr->get_current()->equal(lv_transid)))
                lv_return = FEINVTRANSID;
        }
        else{
            gv_tmlib.reinstate_tx (&lv_transid);
            gp_trans_thr->set_startid(lv_startid);
        }
        /* We have to allow this for aborts with outstanding I/Os.
        if (!(gp_trans_thr->get_current()))
        {
            sprintf(la_buf, "Transaction reinstatement failed.\n");
            tm_log_write(DTM_LIB_INVALID_TRANS, SQ_LOG_CRIT, la_buf);
            abort();
        } */
        // one propagated I/O has come back, provided a tx is current
        if (gp_trans_thr->get_current())
            gp_trans_thr->decrease_current_ios();
        else
            lv_return = FEINVTRANSID;
        break;
    }
    default:
    {
        // bounded write: la_buf has a fixed size
        snprintf(la_buf, sizeof(la_buf), "Seabed software fault - bad input pv_fun %d\n", pv_fun);
        TMlibTrace(("TMLIB_TRACE : tmlib_callback2 failed %s", la_buf), 1);
        tm_log_write(DTM_SEA_SOFT_FAULT, SQ_LOG_CRIT, la_buf);
        abort(); // seabed software fault - bad input
        break;
    }
    }

    TMlibTrace(("TMLIB_TRACE : tmlib_callback2 EXIT with error %d\n",
                lv_return), 2);
    return lv_return;
}
// TOPL REGISTERTRANSACTION
short REGISTERREGION(long transid, long startid, int pv_port, char *pa_hostname, int pv_hostname_length, long pv_startcode, char *pa_regionInfo, int pv_regionInfo_length)
{
short lv_error = FEOK;
TM_Transaction *lp_trans = NULL;
TM_Transid lv_transid((TM_Native_Type) transid);
TM_Transseq_Type lv_startid((TM_Transseq_Type) startid);
// instantiate a gp_trans_thr object for this thread if needed.
TMlibTrace(("TMLIB_TRACE : REGISTERREGION ENTRY: transid: %ld, txid: (%d,%d), startId: %ld, port: %d, hostname %s, length: %d, startcode: %ld, regionInfo: %s, length: %d.\n",
transid, lv_transid.get_node(), lv_transid.get_seq_num(), startid, pv_port, pa_hostname, pv_hostname_length, pv_startcode, pa_regionInfo, pv_regionInfo_length), 2);
if (gp_trans_thr == NULL){
TMlibTrace(("REGISTERREGION gp_trans_thr is null\n"), 2);
gp_trans_thr = new TMLIB_ThreadTxn_Object();
gp_trans_thr->set_startid(lv_startid);
}
TM_Transaction *lp_currTrans = gp_trans_thr->get_current();
// Check if the passed-in transid is known to the thread
// if so, make that Transid current
if (lp_currTrans == NULL) {
lp_currTrans = gp_trans_thr->get_trans (lv_transid.get_native_type());
if (lp_currTrans != NULL)
gp_trans_thr->set_current(lp_currTrans);
}
// Check if the thread's current transid matches the passed-in transid
// If not, check if the passed-in transid is known to the thread
// if so, make that Transid current
else if (! lp_currTrans->equal(lv_transid)) {
lp_currTrans = gp_trans_thr->get_trans (lv_transid.get_native_type());
if (lp_currTrans != NULL)
gp_trans_thr->set_current(lp_currTrans);
}
TM_Transseq_Type lv_savedStartId = gp_trans_thr->get_startid();
TMlibTrace(("REGISTERREGION lv_savedStartId is %ld. Startid is %ld \n", (long) lv_savedStartId, startid), 2);
if (lv_savedStartId != lv_startid){
TMlibTrace(("REGISTERREGION setting lv_savedStartId to %ld. \n", startid), 2);
lv_savedStartId = lv_startid;
gp_trans_thr->set_startid(lv_startid);
}
if (lp_currTrans != NULL)
{
TMlibTrace(("TMLIB_TRACE : REGISTERREGION using current transid (%d,%d) and startId %ld.\n",
lv_transid.get_node(), lv_transid.get_seq_num(),lv_savedStartId ), 1);
lv_error = lp_currTrans->register_region(lv_savedStartId, pv_port, pa_hostname, pv_hostname_length, pv_startcode, pa_regionInfo, pv_regionInfo_length);
}
// Create a temp TM_transaction object to pass the trans id to REGION SERVER
else {
lp_trans = new TM_Transaction();
lp_trans->setTransid(lv_transid);
lp_trans->setTag(gv_tmlib.new_tag());
TMlibTrace(("TMLIB_TRACE : REGISTERREGION using transid (%d,%d) and startId (%ld) passed to REGISTERREGION.\n",
lv_transid.get_node(), lv_transid.get_seq_num(),lv_startid), 1);
lv_error = lp_trans->register_region(lv_startid, pv_port, pa_hostname, pv_hostname_length, pv_startcode, pa_regionInfo, pv_regionInfo_length);
delete lp_trans;
}
TMlibTrace(("TMLIB_TRACE : REGISTERREGION EXIT: txid: (%d,%d), returning %d\n",
lv_transid.get_node(), lv_transid.get_seq_num(), lv_error), 2);
return lv_error;
} //REGISTERREGION
// -------------------------------------------------------------------
// CREATETABLE
//
// Purpose: send CREATETABLE message to the TM
// Params: pa_tabledesc, pv_tabledesc_length, pv_tblname, transid
// -------------------------------------------------------------------
short CREATETABLE(char *pa_tbldesc, int pv_tbldesc_length, char *pv_tblname, char** pv_keys, int pv_numsplits, int pv_keylen, long transid ,
char* &pv_err_str, int &pv_err_len)
{
TM_Transid lv_transid((TM_Native_Type) transid);
short lv_error = FEOK;
TMlibTrace(("TMLIB_TRACE : CREATETABLE ENTRY: txid: (%d,%d), tablename: %s, numsplits: %d, keylen %d\n",
lv_transid.get_node(), lv_transid.get_seq_num(), pv_tblname, pv_numsplits, pv_keylen), 2);
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
TM_Transaction *lp_trans = gp_trans_thr->get_current();
lv_error = lp_trans->create_table(pa_tbldesc, pv_tbldesc_length,
pv_tblname, pv_keys, pv_numsplits, pv_keylen,
pv_err_str, pv_err_len);
TMlibTrace(("TMLIB_TRACE : CREATETABLE EXIT: txid: (%d,%d), returning %d\n",
lv_transid.get_node(), lv_transid.get_seq_num(), lv_error), 2);
return lv_error;
}
// -------------------------------------------------------------------
// REGTRUNCATEONABORT
//
// Purpose: send REGTRUNCATEONABORT message to the TM
// Params: pa_tabledesc, pv_tabledesc_length, pv_tblname, transid
// -------------------------------------------------------------------
short REGTRUNCATEONABORT(char *pv_tblname, int pv_tblname_len, long pv_transid,
char* &pv_err_str, int &pv_err_len)
{
short lv_error = FEOK;
TM_Transid lv_transid((TM_Native_Type) pv_transid);
TMlibTrace(("TMLIB_TRACE : REGTRUNCATEONABORT ENTRY: txid: (%d,%d), tablename: %s\n",
lv_transid.get_node(), lv_transid.get_seq_num(), pv_tblname), 2);
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
TM_Transaction *lp_trans = gp_trans_thr->get_current();
lv_error = lp_trans->reg_truncateonabort(pv_tblname, pv_tblname_len,
pv_err_str, pv_err_len);
TMlibTrace(("TMLIB_TRACE : REGTRUNCATEONABORT EXIT: txid: (%d,%d), tablename: %s, returning %d\n",
lv_transid.get_node(), lv_transid.get_seq_num(), pv_tblname, lv_error), 2);
return lv_error;
}
short ALTERTABLE(char *pv_tblname, int pv_tblname_len, char ** pv_tbloptions,
int pv_tbloptslen, int pv_tbloptscnt, long pv_transid,
char* &pv_err_str, int &pv_err_len)
{
short lv_error = FEOK;
TM_Transid lv_transid((TM_Native_Type) pv_transid);
TMlibTrace(("TMLIB_TRACE : ALTERTABLE ENTRY: txid: (%d,%d), tablename: %s\n",
lv_transid.get_node(), lv_transid.get_seq_num(), pv_tblname), 2);
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
TM_Transaction *lp_trans = gp_trans_thr->get_current();
lv_error = lp_trans->alter_table(pv_tblname, pv_tblname_len, pv_tbloptions,
pv_tbloptslen, pv_tbloptscnt, pv_err_str, pv_err_len);
TMlibTrace(("TMLIB_TRACE : ALTERTABLE EXIT: txid: (%d,%d), tablename: %s, returning %d\n",
lv_transid.get_node(), lv_transid.get_seq_num(), pv_tblname, lv_error), 2);
return lv_error;
}
// -------------------------------------------------------------------
// DROPTABLE
//
// Purpose: send DROPTABLE message to TM
// Params: pv_tablename, transid
// -------------------------------------------------------------------
short DROPTABLE(char *pv_tblname, int pv_tblname_len, long transid,
char* &pv_err_str, int &pv_err_len)
{
short lv_error = FEOK;
TM_Transid lv_transid((TM_Native_Type) transid);
TMlibTrace(("TMLIB_TRACE : DROPTABLE ENTRY: txid: (%d,%d), tablename: %s\n",
lv_transid.get_node(), lv_transid.get_seq_num(), pv_tblname), 2);
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
TM_Transaction *lp_trans = gp_trans_thr->get_current();
lv_error = lp_trans->drop_table(pv_tblname, pv_tblname_len, pv_err_str,
pv_err_len);
TMlibTrace(("TMLIB_TRACE : DROPTABLE EXIT: txid: (%d,%d), tablename: %s, returning %d\n",
lv_transid.get_node(), lv_transid.get_seq_num(), pv_tblname, lv_error), 2);
return lv_error;
}
// -------------------------------------------------------------------
// HBASETM_REQUESTREGIONINFO
//
// Purpose: send a TM_MSG_TYPE_REQUESTREGIONINFO request to the TM on
//          this node (gv_tmlib.iv_my_nid) and copy the returned
//          region entries to the caller
// Params:  pa_trans - (out) receives *pp_count entries; the caller
//                     must provide room for them (not checked here)
//          pp_count - (out) number of entries returned; set to 0 when
//                     the send fails
// Returns: FEMISSPARM when pp_count is NULL, the send_tm() error when
//          the send fails, otherwise the error carried in the
//          response header
// -------------------------------------------------------------------
short HBASETM_REQUESTREGIONINFO(TM_HBASEREGIONINFO pa_trans[], short *pp_count)
{
    short lv_error = FEOK;
    Tm_Req_Msg_Type lv_req;
    Tm_Rsp_Msg_Type lv_rsp;

    TMlibTrace(("TMLIB_TRACE : REQUESTREGIONINFO entry\n"), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    lv_error = tmlib_check_miss_param (pp_count);
    if (lv_error)
        return lv_error;

    tmlib_init_req_hdr(TM_MSG_TYPE_REQUESTREGIONINFO, &lv_req);
    lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, gv_tmlib.iv_my_nid);
    if (lv_error)
    {
        *pp_count = 0;
        TMlibTrace(("TMLIB_TRACE : HBASETM_REQUESTREGIONINFO EXIT with error %d\n", lv_error), 1);
        return lv_error;
    }

    // Hand the entries back to the caller; they are copied even when the
    // response header reports an error, which is returned below.
    *pp_count = lv_rsp.u.iv_hbaseregion_info.iv_count;
    for(int i=0; i < *pp_count; i++)
        memcpy((void *) &pa_trans[i], &lv_rsp.u.iv_hbaseregion_info.iv_trans[i], (sizeof(TM_HBASEREGIONINFO)));

    lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : REQUESTREGIONINFO exit\n"), 2);
    return lv_error;
}//HBASETM_REQUESTREGIONINFO
// -------------------------------------------------------------------
// ABORTTRANSACTION
//
// Purpose: send ABORTTRANSACTION message to the TM
// Params : none
//-------------------------------------------------------------------
short ABORTTRANSACTION()
{
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
TM_Transaction *lp_trans = gp_trans_thr->get_current();
if (lp_trans == NULL)
{
TMlibTrace(("TMLIB_TRACE : ABORTTRANSACTION returning with error %d\n",
FENOTRANSID), 1);
return FENOTRANSID;
}
short lv_error = lp_trans->abort();
// cleanup for legacy API
if ((lv_error == FEINVTRANSID) ||
(lv_error == FENOTRANSID) ||
(lv_error == FEOK) ||
(lv_error == FEABORTEDTRANSID) ||
(lv_error == FEENDEDTRANSID))
{
// abort removes the tx from the list and deletes the
// enlistment object. We simply need to delete the trans
gp_trans_thr->set_current(NULL);
TM_Native_Type lv_native_type_txid = lp_trans->getTransid()->get_native_type();
gv_tmlib.cleanupTransactionLocal(lv_native_type_txid);
delete lp_trans;
}
return lv_error;
}
//------------------------------------------------------------------------
// BEGINTRANSACTION
//
// Purpose : begin a transaction with default settings. This is BEGINTX
//           with a timeout value of 0 (default auto-abort timeout) and
//           TM_TT_NOFLAGS.
// Params  : pp_tag, pointer to a tag (out), defines the tx among
//           others for this process (and for now - all processes)
// Returns : whatever BEGINTX returns
// ---------------------------------------------------------------------
short BEGINTRANSACTION(int *pp_tag)
{
    const int lv_default_timeout = 0;   // 0 => default auto-abort timeout
    short lv_error = BEGINTX(pp_tag, lv_default_timeout, TM_TT_NOFLAGS);
    return lv_error;
}
//------------------------------------------------------------------------
// BEGINTX
//
// Purpose : send BEGINTRANSACTION message to TM
// Params : pp_tag, pointer to a tag (out), defines the tx among
// others for this process (and for now - all processes)
// pv_timeout contains the optional auto-abort time in seconds.
// 0 => transaction uses default auto-abort.
// -1 => transaction never times out.
// pv_type_flags contains the transaction type
// flags for this transaction. Supported flags:
// TM_TT_NOFLAGS - No flags specified.
// TM_TT_NO_UNDO - Do not undo transaction on rollback.
// TM_TT_FORCE_CONSISTENCY - Overrides NO_UNDO - The
// audit process will not set NO_UNDO for this transaction.
// ---------------------------------------------------------------------
short BEGINTX(int *pp_tag, int pv_timeout, int64 pv_type_flags)
{
TM_Transaction *lp_trans = NULL;
short lv_error = FEOK;
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
if (tmlib_check_miss_param(pp_tag) != FEOK)
{
TMlibTrace(("TMLIB_TRACE : BEGINTX returning with error %d\n",
FEMISSPARM), 1);
return FEMISSPARM;
}
if (pv_timeout < -1)
{
TMlibTrace(("TMLIB_TRACE : BEGINTX returning with error %d\n",
FEBADPARMVALUE), 1);
return FEBADPARMVALUE;
}
lp_trans = new TM_Transaction(pv_timeout, pv_type_flags);
if (lp_trans == NULL)
{
TMlibTrace(("TMLIB_TRACE : BEGINTX returning with error %d\n",
FENOBUFSPACE), 1);
return FENOBUFSPACE;
}
lv_error = lp_trans->get_error();
if (!lv_error)
*pp_tag = (int) lp_trans->getTag();
return lv_error;
} //BEGINTX
//--------------------------------------------------------------------
//DEALLOCATE_ERR
//
//Purpose : Called subsequent to ENDTRANSACTION_ERR to release the
//          error string it handed back
//Params  : errStr - (in/out) string to release; reset to NULL.
//          A NULL errStr is accepted.
//--------------------------------------------------------------------
void DEALLOCATE_ERR(char *&errStr)
{
    // deleting a NULL pointer is a no-op, so no guard is required
    delete errStr;
    errStr = NULL;
}
//--------------------------------------------------------------------
//ENDTRANSACTION
//
//Purpose : end the current transaction, discarding any error string
//          produced by ENDTRANSACTION_ERR
//Params  : none
//Returns : the ENDTRANSACTION_ERR result
//--------------------------------------------------------------------
short ENDTRANSACTION()
{
    char *lp_err_text = NULL;
    int lv_err_text_len = 0;

    const short lv_result = ENDTRANSACTION_ERR(lp_err_text, lv_err_text_len);

    // the caller of this variant has no use for the text
    DEALLOCATE_ERR(lp_err_text);
    return lv_result;
}
// --------------------------------------------------------------------
// ENDTRANSACTION
//
// Purpose : end the current transaction
// Params : none
// --------------------------------------------------------------------
short ENDTRANSACTION_ERR(char *&errStr, int &errlen)
{
short lv_error = FEOK;
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
TM_Transaction *lp_trans = gp_trans_thr->get_current();
if (lp_trans == NULL)
{
TMlibTrace(("TMLIB_TRACE : ENDTRANSACTION returning with error %d\n",
FENOTRANSID), 1);
return FENOTRANSID;
}
TMlibTrace(("TMLIB_TRACE : ENDTRANSACTION ENTRY: txid: %d\n", lp_trans->getTransid()->get_seq_num()), 1);
lv_error = lp_trans->end(errStr, errlen);
TMlibTrace(("TMLIB_TRACE : ENDTRANSACTION EXIT: txid: %d\n", lp_trans->getTransid()->get_seq_num()), 1);
// cleanup for legacy API
if ((lv_error == FEINVTRANSID) ||
(lv_error == FENOTRANSID) ||
(lv_error == FEOK) ||
(lv_error == FEABORTEDTRANSID) ||
(lv_error == FEENDEDTRANSID) ||
(lv_error == FELOCKED) ||
(lv_error == FEHASCONFLICT))
{
// end removes the tx from the list and deletes the
// enlistment object. We simply need to delete the trans
gp_trans_thr->set_current(NULL);
TM_Native_Type lv_native_type_txid = lp_trans->getTransid()->get_native_type();
gv_tmlib.cleanupTransactionLocal(lv_native_type_txid);
delete lp_trans;
}
return lv_error;
}
// --------------------------------------------------------------
// GETTRANSID
//
// Purpose : send GETTRANSID message to TM
// Params : pp_transid - pointer to a transid (out)
// --------------------------------------------------------------
short GETTRANSID(short* pp_transid)
{
TM_Native_Type *lp_transid_to_return = NULL;
TM_Transid *lp_transid = NULL;
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
short lv_error = tmlib_check_active_tx ();
if (lv_error)
{
TMlibTrace(("TMLIB_TRACE : GETTRANSID returning with error %d\n",
lv_error), 2);
return lv_error;
}
lv_error = tmlib_check_miss_param (pp_transid);
if (lv_error)
{
TMlibTrace(("TMLIB_TRACE : GETTRANSID returning with error %d\n",
lv_error), 1);
return lv_error;
}
lp_transid_to_return = (TM_Native_Type *)pp_transid;
lp_transid = gp_trans_thr->get_current()->getTransid();
if (lp_transid == NULL)
{
TMlibTrace(("TMLIB_TRACE : GETTRANSID failed, aborting\n"), 1);
abort();
}
*lp_transid_to_return = lp_transid->get_native_type();
return FEOK;
}
// --------------------------------------------------------------
// GETTRANSINFO
//
// Purpose : Return the native transid and the type flags of this
//           thread's current transaction.
// Params  : pp_transid    - out, storage that receives a
//                           TM_Native_Type value
//           pp_type_flags - out, receives the transaction type flags
// Returns : FEOK on success;
//           the error from tmlib_check_active_tx() or
//           tmlib_check_miss_param() when a check fails;
//           FENOTRANSID when there is no current transaction or it
//           has no transid. Outputs are only written on FEOK.
// --------------------------------------------------------------
short GETTRANSINFO(short *pp_transid, int64 *pp_type_flags)
{
    TM_Native_Type *lp_transid_native = (TM_Native_Type *) pp_transid;
    TM_Transid     *lp_transid = NULL;

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    TM_Transaction *lp_trans = gp_trans_thr->get_current();
    short lv_error = FEOK;

    TMlibTrace(("TMLIB_TRACE : GETTRANSINFO ENTRY\n"), 2);

    lv_error = tmlib_check_active_tx ();
    if (lv_error)
    {
        TMlibTrace(("TMLIB_TRACE : GETTRANSINFO returning with error %d\n",
                    lv_error), 1);
        return lv_error;
    }

    // Both output parameters are required.
    lv_error = tmlib_check_miss_param (pp_transid);
    if (lv_error == FEOK)
        lv_error = tmlib_check_miss_param(pp_type_flags);
    if (lv_error)
    {
        TMlibTrace(("TMLIB_TRACE : GETTRANSINFO returning with error %d\n",
                    lv_error), 1);
        return lv_error;
    }

    if (lp_trans == NULL)
        lv_error = FENOTRANSID;
    else
    {
        lp_transid = lp_trans->getTransid();
        if (lp_transid)
        {
            // memcpy rather than a typed store: the caller's buffer is
            // declared as short*.
            TM_Native_Type lv_native = lp_transid->get_native_type();
            memcpy(lp_transid_native, &lv_native, sizeof(TM_Native_Type));
            *pp_type_flags = lp_trans->getTypeFlags();
        }
        else
            lv_error = FENOTRANSID;
    }

    TMlibTrace(("TMLIB_TRACE : GETTRANSINFO EXIT, error %d\n", lv_error), 2);

    // Report the computed status; previously FEOK was returned even when
    // no transid was available and the outputs were left unset.
    return lv_error;
}
// -----------------------------------------------------------------
// JOINTRANSACTION
//
// Purpose - join the transaction identified by pv_transid
// Params  - pv_transid, native transid to join (must be non-zero)
// Returns - FEINVTRANSID     when pv_transid is 0
//           FEALREADYJOINED  when pv_transid is already the current
//                            transaction of this thread
//           FEOK             when a transaction already known to this
//                            thread was made current
//           otherwise the error state of the newly created
//           TM_Transaction (FEOK on success)
// -----------------------------------------------------------------
short JOINTRANSACTION(int64 pv_transid)
{
    TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION ENTRY\n"), 2);

    if (pv_transid == 0)
    {
        TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION returning with error %d, empty transid supplied.\n",
                    FEINVTRANSID), 1);
        return FEINVTRANSID;
    }

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    TM_Transid lv_join_id;
    lv_join_id = pv_transid;

    // Joining the transaction that is already current is rejected.
    TM_Transaction *lp_active = gp_trans_thr->get_current();
    if ((lp_active != NULL)
        && (lp_active->getTransid()->get_native_type() == lv_join_id.get_native_type()))
    {
        TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION ID (%d,%d) returning with error %d\n",
                    lv_join_id.get_node(), lv_join_id.get_seq_num(), FEALREADYJOINED), 1);
        return FEALREADYJOINED;
    }

    TM_Transaction *lp_known = gp_trans_thr->get_trans (lv_join_id.get_native_type());
    if (lp_known != NULL)
    {
        // 12/22/2010 Removed restriction on joining a transaction we have
        // already joined (implicit or explicit) in a server: a known
        // transaction is simply made current again.
        gp_trans_thr->set_current(lp_known);
        gp_trans_thr->set_current_suspended(false);
        TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION ID (%d,%d) EXIT FEOK\n",
                    lv_join_id.get_node(), lv_join_id.get_seq_num()), 2);
        return FEOK;
    }

    // Not known to this thread yet: create a transaction object for it.
    TM_Transaction *lp_created = new TM_Transaction (lv_join_id, false); // implicit join and add
    if (lp_created == NULL)
    {
        TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION ID (%d,%d) returning with error %d\n",
                    lv_join_id.get_node(), lv_join_id.get_seq_num(), FENOBUFSPACE), 1);
        return FENOBUFSPACE;
    }

    // A failed join leaves nothing worth keeping.
    short lv_rc = lp_created->get_error();
    if (lv_rc)
        delete lp_created;

    TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION ID (%d,%d) EXIT, error %d\n",
                lv_join_id.get_node(), lv_join_id.get_seq_num(), lv_rc), 2);
    return lv_rc;
}
// -----------------------------------------------------------------
// RESUMETRANSACTION
//
// Purpose - resume transaction specified by pv_tag.
// Note that the tag is process local and can not be used in
// RESUMETRANSACTION calls outside the beginner.
// Params - pv_tag, either a valid tag or 0
// -----------------------------------------------------------------
short RESUMETRANSACTION(int pv_tag)
{
unsigned int lv_tag = (unsigned int) pv_tag;
TMlibTrace(("TMLIB_TRACE : RESUMETRANSACTION ENTRY, tag %d\n",
lv_tag), 2);
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
if ((pv_tag == -1) || (pv_tag == 0))
{
gp_trans_thr->set_current(NULL);
TMlibTrace(("TMLIB_TRACE : RESUMETRANSACTION EXIT\n"), 2);
return FEOK;
}
TM_Transaction *lp_trans = gp_trans_thr->get_trans(lv_tag);
if (lp_trans)
gp_trans_thr->set_current(lp_trans);
else
{
TMlibTrace(("TMLIB_TRACE : RESUMETRANSACTION EXIT with error %d\n",
FEINVTRANSID), 2);
return FEINVTRANSID;
}
TMlibTrace(("TMLIB_TRACE : RESUMETRANSACTION EXIT\n"), 2);
return FEOK;
}
// ------------------------------------------------------------------
// STATUSTRANSACTION
//
// Purpose : send STATUSTRANSACTION message to TM
// Params : pp_status - out param for status, possible values are
// ACTIVE, PREPARED, COMMITTED, ABORTING, ABORTED, HUNG
// -----------------------------------------------------------------
short STATUSTRANSACTION(short *pp_status, int64 pv_transid)
{
short lv_return = 0;
TMlibTrace(("TMLIB_TRACE : STATUSTRANSACTION ENTRY\n"), 2);
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
if (!gv_tmlib.is_initialized())
gv_tmlib.initialize();
// we don't know about this tx, which is ok, just send it on to the DTM
if (pv_transid != 0)
{
short lv_error = FEOK;
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TM_Transid lv_transid ((TM_Native_Type)pv_transid);
tmlib_init_req_hdr(TM_MSG_TYPE_STATUSTRANSACTION, &lv_req);
lv_transid.set_external_data_type(&lv_req.u.iv_status_trans.iv_transid);
lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, lv_transid.get_node());
if (lv_error)
{
TMlibTrace(("TMLIB_TRACE : STATUSTRANSACTION EXIT with error %d\n", lv_error), 1);
return lv_error;
}
*pp_status = lv_rsp.u.iv_status_trans.iv_status;
lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
TMlibTrace(("TMLIB_TRACE : STATUSTRANSACTION EXIT with error %d\n", lv_error), 2);
return lv_error;
}
else if (gp_trans_thr->get_current() == NULL)
lv_return = FENOTRANSID;
else
lv_return = gp_trans_thr->get_current()->status(pp_status);
TMlibTrace(("TMLIB_TRACE : STATUSTRANSACTION EXIT with error %d\n", lv_return), 2);
return lv_return;
}
// -----------------------------------------------------------------
// LISTTRANSACTION
//
// Purpose : request the transaction list from the TM on pv_node
// Params  : pa_trans - out, array filled with one entry per listed
//                      transaction
//           pp_count - out, number of entries written (0 when the
//                      send fails)
//           pv_node  - node whose TM is queried
// Returns : parameter-check error, send error, or the error carried
//           in the reply header
// -----------------------------------------------------------------
short LISTTRANSACTION(TM_LIST_TRANS pa_trans[], short *pp_count, int pv_node)
{
    Tm_Req_Msg_Type lv_request;
    Tm_Rsp_Msg_Type lv_reply;

    TMlibTrace(("TMLIB_TRACE : LISTTRANSACTION ENTRY\n"), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    // Both output parameters are mandatory.
    short lv_rc = tmlib_check_miss_param (pa_trans);
    if (lv_rc)
        return lv_rc;
    lv_rc = tmlib_check_miss_param (pp_count);
    if (lv_rc)
        return lv_rc;

    tmlib_init_req_hdr(TM_MSG_TYPE_LISTTRANSACTION, &lv_request);
    lv_rc = gv_tmlib.send_tm(&lv_request, &lv_reply, pv_node);
    if (lv_rc)
    {
        *pp_count = 0;
        TMlibTrace(("TMLIB_TRACE : LISTTRANSACTION EXIT with error %d\n", lv_rc), 1);
        return lv_rc;
    }

    // NOTE(review): the capacity of pa_trans is not known here; the
    // caller's array must be able to hold every entry of the reply.
    *pp_count = lv_reply.u.iv_list_trans.iv_count;
    int lv_idx = 0;
    while (lv_idx < *pp_count)
    {
        memcpy ((void *) &pa_trans[lv_idx],
                (void *) &lv_reply.u.iv_list_trans.iv_trans[lv_idx],
                sizeof(TM_LIST_TRANS));
        lv_idx++;
    }

    lv_rc = lv_reply.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : LISTTRANSACTION EXIT with error %d\n", lv_rc), 2);
    return lv_rc;
}
// -----------------------------------------------------------------
// TMSTATS
//
// -----------------------------------------------------------------
short TMSTATS(int pv_node, TM_TMSTATS *pp_tmstats, bool pv_reset)
{
short lv_error = FEOK;
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
if (!gv_tmlib.is_initialized())
gv_tmlib.initialize();
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
lv_error = tmlib_check_miss_param (pp_tmstats);
if (lv_error)
return lv_error;
tmlib_init_req_hdr(TM_MSG_TYPE_TMSTATS, &lv_req);
lv_req.u.iv_tmstats.iv_reset = pv_reset;
lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, pv_node);
if (!lv_error)
{
memcpy (pp_tmstats, &lv_rsp.u.iv_tmstats, sizeof (Tmstats_Rsp_Type));
lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
}
return lv_error;
}
// -----------------------------------------------------------------
// DTM_GETNEXTSEQNUMBLOCK
// Retrieves the next block of transaction sequence
// numbers.
// -----------------------------------------------------------------
short DTM_GETNEXTSEQNUMBLOCK(unsigned int &pp_seqNum_start, unsigned int &pp_seqNum_count)
{
short lv_error = FEOK;
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
if (!gv_tmlib.is_initialized())
gv_tmlib.initialize();
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
tmlib_init_req_hdr(TM_MSG_TYPE_GETNEXTSEQNUMBLOCK, &lv_req);
lv_req.u.iv_GetNextSeqNum.iv_block_size = gv_tmlib.seqNum_blockSize();
lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, gv_tmlib.iv_my_nid);
if (!lv_error)
{
pp_seqNum_start = lv_rsp.u.iv_GetNextSeqNum.iv_seqNumBlock_start;
pp_seqNum_count = lv_rsp.u.iv_GetNextSeqNum.iv_seqNumBlock_count;
lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
}
return lv_error;
}
// -------------------------------------------------------------------
// SUSPENDTRANSACTION
//
// Purpose - Suspend current transaction
// Params - pp_transid, out param of transid
// ------------------------------------------------------------------
short SUSPENDTRANSACTION(short *pp_transid)
{
short lv_error = FEOK;
TM_Transid lv_transid;
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
TM_Transaction *lp_trans = gp_trans_thr->get_current();
if (lp_trans == NULL)
{
TMlibTrace(("TMLIB_TRACE : SUSPENDTRANSACTION returning error %d\n", FENOTRANSID), 1);
return FENOTRANSID;
}
// they did not join and hence cannot suspend
if (gp_trans_thr->get_current_propagated() == true)
{
TMlibTrace(("TMLIB_TRACE : SUSPENDTRANSACTION returning error %d\n", FETXSUSPENDREJECTED), 1);
return FETXSUSPENDREJECTED;
}
lv_error = lp_trans->suspend(&lv_transid);
// copy the transid if there was no error
if (!lv_error)
memcpy (pp_transid, lv_transid.get_data_address(), sizeof (TM_Native_Type));
// if there no error, OR if there WAS an error and we are not the ender, then
// get rid of the context as long as we don't have outstanding I/Os.
if (((!lv_error) || ((lv_error) && (!lp_trans->isEnder()))) &&
((gp_trans_thr->get_current() != NULL) && (gp_trans_thr->get_current_ios() == 0)))
{
gp_trans_thr->set_current(NULL);
if(!lp_trans->isEnder()) {
TM_Native_Type lv_native_type_txid = lp_trans->getTransid()->get_native_type();
gv_tmlib.cleanupTransactionLocal(lv_native_type_txid);
delete lp_trans;
}
}
return lv_error;
}
// TMF_GETTXHANDLE_
// Purpose : Fetch the current transaction handle via GETTRANSID_EXT.
// Params  : pp_handle - out, passed to GETTRANSID_EXT as a
//                       TM_Transid_Type pointer
// Returns : FEINVALIDTXHANDLE when GETTRANSID_EXT reports FEINVTRANSID,
//           otherwise the GETTRANSID_EXT result unchanged.
short TMF_GETTXHANDLE_(short *pp_handle)
{
    TMlibTrace(("TMLIB_TRACE : TMF_GETTXHANDLE_ ENTRY\n"), 2);

    const short lv_rc = GETTRANSID_EXT((TM_Transid_Type *)pp_handle);

    // Every result except FEINVTRANSID is passed straight through.
    if (lv_rc != FEINVTRANSID)
    {
        TMlibTrace(("TMLIB_TRACE : TMF_GETTXHANDLE_ EXIT with error %d\n", lv_rc), 2);
        return lv_rc;
    }

    // Translate the transid error into the handle-oriented error code.
    TMlibTrace(("TMLIB_TRACE : TMF_GETTXHANDLE_ EXIT with error %d\n", FEINVALIDTXHANDLE), 1);
    return FEINVALIDTXHANDLE;
}
//----------------------------------------------------------------------------
// TMF_SETTXHANDLE_
// Purpose : Emulates TMF API TMF_SETTXHANDLE_.
// if pp_handle == NULL (zero) or points to a zero value, then this API clears
// the current transaction.
//----------------------------------------------------------------------------
short TMF_SETTXHANDLE_(short *pp_handle)
{
TM_Transaction *lp_trans_old = NULL;
short lv_error = FEOK;
TM_Transid lv_tx;
TM_Transid_Type *lp_tx_type = NULL;
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
if (pp_handle) {
TMlibTrace(("TMLIB_TRACE : TMF_SETTXHANDLE_ ENTRY, handle " PFLL "\n", (int64) *pp_handle), 2);
}
else {
TMlibTrace(("TMLIB_TRACE : TMF_SETTXHANDLE_ ENTRY, null handle.\n"), 2);
}
if (pp_handle)
{
lv_error = tmlib_check_miss_param (pp_handle);
if (lv_error)
{
TMlibTrace(("TMLIB_TRACE : TMF_SETTXHANDLE_ EXIT with error %d\n", lv_error), 1);
return lv_error;
}
lp_tx_type = (TM_Transid_Type *)pp_handle;
}
lp_trans_old = gp_trans_thr->get_current();
gp_trans_thr->set_current(NULL); // null transid
if (pp_handle && !tmlib_zero_transid(lp_tx_type))
{
lv_tx = *lp_tx_type;
if (! gv_tmlib.reinstate_tx (&lv_tx, true))
{
gp_trans_thr->set_current(lp_trans_old);
TMlibTrace(("TMLIB_TRACE : TMF_SETTXHANDLE_ EXIT with error %d\n", FEINVALIDTXHANDLE), 1);
lv_error = FEINVALIDTXHANDLE;
}
}
TMlibTrace(("TMLIB_TRACE :TM_SETTXHANDLE_ EXIT returning %d.\n", lv_error), 2);
return lv_error;
}
// ------------------------------------------------------------------
//
// ------------------------------------------------------------------
short TMF_DOOMTRANSACTION_()
{
TM_Transaction *lp_trans = NULL;
short lv_error = FEOK;
TMlibTrace(("TMLIB_TRACE : TMF_DOOMTRANSACTION_ ENTRY\n"), 2);
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
lp_trans = gp_trans_thr->get_current();
if (lp_trans == NULL)
{
TMlibTrace(("TMLIB_TRACE : TMF_DOOMTRANSACTION_ EXIT with error %d\n", FENOTRANSID), 1);
return FENOTRANSID;
}
lv_error = lp_trans->abort(true);
// this is the expected error, so return FEOK
if (lv_error == FEABORTEDTRANSID)
{
TMlibTrace(("TMLIB_TRACE : TMF_DOOMTRANSACTION_ EXIT with error %d\n", FEOK), 2);
lv_error = FEOK;
}
else
TMlibTrace(("TMLIB_TRACE : TMF_DOOMTRANSACTION_ EXIT with error %d\n", lv_error), 2);
return lv_error;
}
// -----------------------------------------------------------------
// DTM_STATUSSYSTEM
// Purpose - Return TM system information obtained from the TM on
//           this process's node.
// Params  - pp_status, out, receives sizeof(TM_STATUSSYS) bytes
// Returns - FEOK if successful, otherwise the parameter-check error,
//           the send error, or the error carried in the reply header
// -----------------------------------------------------------------
short DTM_STATUSSYSTEM(TM_STATUSSYS *pp_status)
{
    Tm_Req_Msg_Type lv_request;
    Tm_Rsp_Msg_Type lv_reply;
    memset(&lv_reply, 0, sizeof(lv_reply));

    TMlibTrace(("TMLIB_TRACE : DTM_STATUSSYS ENTRY"), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    short lv_rc = tmlib_check_miss_param (pp_status);
    if (lv_rc)
        return lv_rc;

    tmlib_init_req_hdr(TM_MSG_TYPE_CALLSTATUSSYSTEM, &lv_request);
    lv_rc = gv_tmlib.send_tm(&lv_request, &lv_reply, gv_tmlib.iv_my_nid);
    if (lv_rc)
    {
        TMlibTrace(("TMLIB_TRACE : DTM_STATUSSYS EXIT with error %d\n", lv_rc), 1);
        return lv_rc;
    }

    // NOTE(review): the copy starts at the beginning of the whole reply
    // message (which also holds iv_msg_hdr), whereas sibling functions
    // copy from a member under lv_rsp.u -- confirm this is intended.
    memcpy(pp_status, &lv_reply,
           (sizeof(TM_STATUSSYS)));

    lv_rc = lv_reply.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : DTM_STATUSSYS EXIT with error %d\n", lv_rc), 2);
    return lv_rc;
} // DTM_STATUSSYSTEM
// ------------------------------------------------------------------
// DTM_ATTACHRM
// Purpose : Intruct a TM to attach a newly restarted RM
// ------------------------------------------------------------------
short DTM_ATTACHRM(short pv_node, char *pp_rmname)
{
short lv_error = FEOK;
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
memset(&lv_rsp, 0, sizeof(lv_rsp));
TMlibTrace(("TMLIB_TRACE : DTM_ATTACHRM ENTRY, node %d\n", pv_node), 2);
if (!gv_tmlib.is_initialized())
gv_tmlib.initialize();
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
lv_error = tmlib_check_miss_param (pp_rmname);
if (lv_error)
return lv_error;
tmlib_init_req_hdr(TM_MSG_TYPE_ATTACHRM, &lv_req);
strcpy(lv_req.u.iv_attachrm.ia_rmname, pp_rmname);
lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, pv_node);
if (lv_error)
{
TMlibTrace(("TMLIB_TRACE : DTM_ATTACHRM EXIT with error %d\n", lv_error), 1);
return lv_error;
}
lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
TMlibTrace(("TMLIB_TRACE : DTM_ATTACHRM EXIT with error %d\n", lv_error), 2);
return lv_error;
}
// -----------------------------------------------------------------
// DTM_STATUSTM
// Purpose - Return status information for a specific TM.
// Returns FEOK if successful
// FENOTFOUND if the node specified was not found.
// -----------------------------------------------------------------
short DTM_STATUSTM(short pv_node, TMSTATUS *pp_tmstatus)
{
short lv_error = FEOK;
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
memset(&lv_rsp, 0, sizeof(lv_rsp));
TMlibTrace(("TMLIB_TRACE : DTM_STATUSTM ENTRY, node %d\n", pv_node), 2);
if (!gv_tmlib.is_initialized())
gv_tmlib.initialize();
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
lv_error = tmlib_check_miss_param (pp_tmstatus);
if (lv_error)
return lv_error;
tmlib_init_req_hdr(TM_MSG_TYPE_STATUSTM, &lv_req);
lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, pv_node);
if (lv_error)
{
TMlibTrace(("TMLIB_TRACE : DTM_STATUSTM EXIT with error %d\n", lv_error), 1);
return lv_error;
}
memcpy(pp_tmstatus, &lv_rsp.u.iv_statustm.iv_status,
(sizeof(lv_rsp.u.iv_statustm.iv_status) +
(lv_rsp.u.iv_statustm.iv_status.iv_rm_count * sizeof(RM_INFO))));
lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
TMlibTrace(("TMLIB_TRACE : DTM_STATUSTM EXIT with error %d\n", lv_error), 2);
return lv_error;
} // DTM_STATUSTM
// -----------------------------------------------------------------
// DTM_STATUSTRANSACTION
// Purpose - Provides status information for a specified transaction
// Params  - pv_transid, transid to query (0 is not accepted)
//           pp_trans, out, receives the transaction status structure
// Returns FEOK if successful
//         FENOTRANSID Trans ID not known to TM; also returned when
//                     pv_transid is 0 and there is no current
//                     transaction
//         FEINVTRANSID Invalid Trans ID; also returned when
//                      pv_transid is 0 and a current transaction exists
//         FEBADPARMVALUE One of the required parameters is invalid
// -----------------------------------------------------------------
short DTM_STATUSTRANSACTION(int64 pv_transid, TM_STATUS_TRANS *pp_trans)
{
    short lv_rc = FEOK;
    Tm_Req_Msg_Type lv_request;
    Tm_Rsp_Msg_Type lv_reply;
    memset(&lv_reply, 0, sizeof(lv_reply));

    TMlibTrace(("TMLIB_TRACE : DTM_STATUSTRANSACTION ENTRY, transid " PFLL "\n", pv_transid), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    if (pv_transid == 0)
    {
        // A zero transid is never looked up; the error only depends on
        // whether this thread has a current transaction.
        lv_rc = (gp_trans_thr->get_current() == NULL) ? FENOTRANSID : FEINVTRANSID;
    }
    else
    {
        TM_Transid lv_query_id ((TM_Native_Type)pv_transid);

        lv_rc = tmlib_check_miss_param (pp_trans);
        if (lv_rc)
        {
            TMlibTrace(("TMLIB_TRACE : DTM_STATUSTRANSMGMT EXIT with error %d\n", lv_rc), 1);
            return lv_rc;
        }

        tmlib_init_req_hdr(TM_MSG_TYPE_STATUSTRANSMGMT, &lv_request);
        // place the transid in the request
        lv_query_id.set_external_data_type(&lv_request.u.iv_status_transm.iv_transid);

        lv_rc = gv_tmlib.send_tm(&lv_request, &lv_reply, lv_query_id.get_node());
        if (lv_rc)
        {
            TMlibTrace(("TMLIB_TRACE : DTM_STATUSTRANSMGMT EXIT with error %d\n", lv_rc), 1);
            return lv_rc;
        }

        memcpy(pp_trans, &lv_reply.u.iv_status_transm.iv_status_trans,
               (sizeof(lv_reply.u.iv_status_transm.iv_status_trans)));
        lv_rc = lv_reply.iv_msg_hdr.miv_err.error;
    }

    TMlibTrace(("TMLIB_TRACE : DTM_STATUSTRANSACTION EXIT with error %d\n", lv_rc), 2);
    return lv_rc;
} // DTM_STATUSTRANSACTION
// -----------------------------------------------------------------
// DTM_STATUSALLTRANS
// Purpose - Provides status information for all transactions known
//           to the TM on pv_node
// Params  - pa_trans, out, array filled with one entry per
//           transaction in the reply
//           pp_count, out, number of entries written (0 when the
//           send fails)
//           pv_node, node whose TM is queried
// Returns FEOK if successful, otherwise the parameter-check error,
//         the send error, or the error carried in the reply header
// -----------------------------------------------------------------
short DTM_STATUSALLTRANS(TM_STATUS_ALL_TRANS pa_trans[], short *pp_count, int pv_node)
{
    short lv_error = FEOK;
    Tm_Req_Msg_Type lv_req;
    Tm_Rsp_Msg_Type lv_rsp;

    TMlibTrace(("TMLIB_TRACE : DTM_STATUSALLTRANS ENTRY\n"),2);

    if(!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    // Both output parameters are mandatory.
    lv_error = tmlib_check_miss_param (pa_trans);
    if(lv_error)
        return lv_error;
    lv_error = tmlib_check_miss_param (pp_count);
    if(lv_error)
        return lv_error;

    tmlib_init_req_hdr(TM_MSG_TYPE_STATUSALLTRANSMGT, &lv_req);
    lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, pv_node);
    if(lv_error)
    {
        *pp_count = 0;
        // This is an exit path; the message previously read "ENTRY%d".
        TMlibTrace(("TMLIB_TRACE : DTM_STATUSALLTRANS EXIT with error %d\n", lv_error),1);
        return lv_error;
    }

    // NOTE(review): the capacity of pa_trans is not known here; the
    // caller's array must be able to hold every entry of the reply.
    *pp_count = lv_rsp.u.iv_status_alltrans.iv_count;
    for (int i=0; i<*pp_count; i++)
        memcpy((void *) &pa_trans[i], (void *) &lv_rsp.u.iv_status_alltrans.iv_trans[i], sizeof(TM_STATUS_ALL_TRANS));

    lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : DTM_STATUSALLTRANS EXIT with error %d\n", lv_error),2);
    return lv_error;
}
// -----------------------------------------------------------------
// DTM_GETTRANSINFO
// Purpose - Provides full transaction ID information for a native
//           (int64) transid by delegating to DTM_GETTRANSINFO_EXT.
// Params  - pv_transid, transid to query (0 is not accepted);
//           the remaining pointers are output parameters passed
//           through to DTM_GETTRANSINFO_EXT unchanged
// Returns FEOK if successful
//         FENOTRANSID Trans ID not known to TM; also returned when
//                     pv_transid is 0 and there is no current
//                     transaction
//         FEINVTRANSID Invalid Trans ID; also returned when
//                      pv_transid is 0 and a current transaction exists
//         FEBADPARMVALUE One of the required parameters is invalid
// -----------------------------------------------------------------
short DTM_GETTRANSINFO(int64 pv_transid,
                       int32 *pp_seq_num,
                       int32 *pp_node,
                       int16 *pp_incarnation_num,
                       int16 *pp_tx_flags,
                       TM_TT_Flags *pp_tt_flags,
                       int16 *pp_version,
                       int16 *pp_checksum,
                       int64 *pp_timestamp)
{
    short lv_rc = FEOK;

    TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSINFO ENTRY, transid " PFLL "\n", pv_transid), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    if (pv_transid == 0)
    {
        // A zero transid is never looked up; the error only depends on
        // whether this thread has a current transaction.
        lv_rc = (gp_trans_thr->get_current() == NULL) ? FENOTRANSID : FEINVTRANSID;
    }
    else
    {
        // Convert to the extended transid representation and delegate.
        TM_Transid      lv_query_id ((TM_Native_Type)pv_transid);
        TM_Transid_Type lv_query_ext = lv_query_id.get_data();
        lv_rc = DTM_GETTRANSINFO_EXT(lv_query_ext,
                                     pp_seq_num,
                                     pp_node,
                                     pp_incarnation_num,
                                     pp_tx_flags,
                                     pp_tt_flags,
                                     pp_version,
                                     pp_checksum,
                                     pp_timestamp);
    }

    TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSINFO EXIT with error %d\n", lv_rc), 2);
    return lv_rc;
} // DTM_GETTRANSINFO
// -----------------------------------------------------------------
// DTM_GETTRANSINFO_EXT
// Purpose - Provides full transaction ID information for a transid
//           given in TM_Transid_Type form.
// Params  - pv_transid, transid to query; a transid whose node and
//           sequence number are both 0 is not looked up.
//           Each remaining pointer is an optional output: it is
//           filled from the reply when non-NULL and skipped when NULL.
// Returns FEOK if successful
//         FENOTRANSID Trans ID not known to TM; also returned for a
//                     zero transid when there is no current
//                     transaction
//         FEINVTRANSID Invalid Trans ID; also returned for a zero
//                      transid when a current transaction exists
//         FEBADPARMVALUE One of the required parameters is invalid
// -----------------------------------------------------------------
short DTM_GETTRANSINFO_EXT(TM_Transid_Type pv_transid,
                           int32 *pp_seq_num,
                           int32 *pp_node,
                           int16 *pp_incarnation_num,
                           int16 *pp_tx_flags,
                           TM_TT_Flags *pp_tt_flags,
                           int16 *pp_version,
                           int16 *pp_checksum,
                           int64 *pp_timestamp)
{
    short lv_rc = FEOK;
    Tm_Req_Msg_Type lv_request;
    Tm_Rsp_Msg_Type lv_reply;
    TM_Transid      lv_query_id;
    memset(&lv_reply, 0, sizeof(lv_reply));

    TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSINFO_EXT ENTRY\n"), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    lv_query_id = pv_transid;

    const bool lv_has_id = (lv_query_id.get_node()!=0) || (lv_query_id.get_seq_num()!=0);
    if (!lv_has_id)
    {
        // Nothing to look up; the error only depends on whether this
        // thread has a current transaction.
        lv_rc = (gp_trans_thr->get_current() == NULL) ? FENOTRANSID : FEINVTRANSID;
        TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSINFO_EXT EXIT with error %d\n", lv_rc), 2);
        return lv_rc;
    }

    tmlib_init_req_hdr(TM_MSG_TYPE_GETTRANSINFO, &lv_request);
    // place the transid in the request
    lv_query_id.set_external_data_type(&lv_request.u.iv_status_transm.iv_transid);

    lv_rc = gv_tmlib.send_tm(&lv_request, &lv_reply, lv_query_id.get_node());
    if (lv_rc)
    {
        TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSINFO_EXT EXIT with error %d\n", lv_rc), 1);
        return lv_rc;
    }

    // Hand back only the fields the caller asked for.
    if (pp_seq_num)
        memcpy(pp_seq_num, &lv_reply.u.iv_gettransinfo.iv_seqnum,
               (sizeof(lv_reply.u.iv_gettransinfo.iv_seqnum)));
    if (pp_node)
        memcpy(pp_node, &lv_reply.u.iv_gettransinfo.iv_node,
               (sizeof(lv_reply.u.iv_gettransinfo.iv_node)));
    if (pp_incarnation_num)
        memcpy(pp_incarnation_num, &lv_reply.u.iv_gettransinfo.iv_incarnation_num,
               (sizeof(lv_reply.u.iv_gettransinfo.iv_incarnation_num)));
    if (pp_tx_flags)
        memcpy(pp_tx_flags, &lv_reply.u.iv_gettransinfo.iv_tx_flags,
               (sizeof(lv_reply.u.iv_gettransinfo.iv_tx_flags)));
    if (pp_tt_flags)
        memcpy(pp_tt_flags, &lv_reply.u.iv_gettransinfo.iv_tt_flags,
               (sizeof(lv_reply.u.iv_gettransinfo.iv_tt_flags)));
    if (pp_version)
        memcpy(pp_version, &lv_reply.u.iv_gettransinfo.iv_version,
               (sizeof(lv_reply.u.iv_gettransinfo.iv_version)));
    if (pp_checksum)
        memcpy(pp_checksum, &lv_reply.u.iv_gettransinfo.iv_checksum,
               (sizeof(lv_reply.u.iv_gettransinfo.iv_checksum)));
    if (pp_timestamp)
        memcpy(pp_timestamp, &lv_reply.u.iv_gettransinfo.iv_timestamp,
               (sizeof(lv_reply.u.iv_gettransinfo.iv_timestamp)));

    lv_rc = lv_reply.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSINFO_EXT EXIT with error %d\n", lv_rc), 2);
    return lv_rc;
} // DTM_GETTRANSINFO_EXT
// -----------------------------------------------------------------
// DTM_GETTRANSIDSTR(int64 pv_transid, char *pp_transidstr);
// Purpose - Obtain transid information in string format
//           "(node, seq_num, incarnation_num)"
// Params  - pv_transid, native transid to format
//           pp_transidstr, out, caller-supplied buffer; written only
//           when the lookup succeeds
// Returns FEOK if successful, otherwise the DTM_GETTRANSINFO error
// -----------------------------------------------------------------
short DTM_GETTRANSIDSTR(int64 pv_transid, char *pp_transidstr)
{
    int32 lv_seq = 0;
    int32 lv_nid = 0;
    int16 lv_incarnation = 0;

    TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSIDSTR ENTRY with transid: " PFLL "\n", pv_transid), 2);

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    // Only the three components that appear in the string are requested.
    const short lv_rc = DTM_GETTRANSINFO(pv_transid, &lv_seq, &lv_nid,
                                         &lv_incarnation, NULL,
                                         NULL, NULL, NULL,
                                         NULL);
    if (lv_rc == 0)
    {
        // NOTE(review): the size of the caller's buffer is not known
        // here; the formatted text is written without a bound.
        sprintf(pp_transidstr, "(%d, %d, %d)", lv_nid, lv_seq, lv_incarnation);
    }

    TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSIDSTR EXIT with error %d\n", lv_rc), 2);
    return lv_rc;
}
// -----------------------------------------------------------------
// DTM_GETTRANSIDSTR_EXT(TM_Transid_Type pv_transid, char *pp_transidstr);
// Purpose - Obtain transid information in string format uses input transid
//           format TM_Transid_Type
// Params  - pv_transid, transid to format
//           pp_transidstr, out, caller-supplied buffer; left untouched
//           when the packed legacy transid is 0
// Returns FEOK if successful (including the untouched case), otherwise
//         the DTM_GETTRANSIDSTR error
// -----------------------------------------------------------------
short DTM_GETTRANSIDSTR_EXT(TM_Transid_Type pv_transid, char *pp_transidstr)
{
    short      lv_rc = FEOK;
    TM_Transid lv_source_id;

    // Overlay used to view the legacy (seq num, node) pair as one
    // int64 value.
    union {
        TM_Txid_legacy iv_legacy_txid;
        int64          iv_txid;
    } lv_packed;

    TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSIDSTR_EXT ENTRY\n"), 2);

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    lv_source_id = pv_transid;
    lv_packed.iv_legacy_txid.iv_seq_num = lv_source_id.get_seq_num();
    lv_packed.iv_legacy_txid.iv_node = lv_source_id.get_node();

    // A zero transid is not formatted.
    if (lv_packed.iv_txid != 0)
        lv_rc = DTM_GETTRANSIDSTR(lv_packed.iv_txid, pp_transidstr);

    TMlibTrace(("TMLIB_TRACE : DTM_GETTRANSIDSTR_EXT EXIT with error %d\n", lv_rc), 2);
    return lv_rc;
}
// -----------------------------------------------------------------
// DTM_ENABLETRANSACTIONS
// Purpose - Request DTM enable transaction processing.
// Returns FEOK if successful
//
// -----------------------------------------------------------------
short DTM_ENABLETRANSACTIONS()
{
short lv_error = FEOK;
int32 lv_leadTM = -1;
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
memset(&lv_rsp, 0, sizeof(lv_rsp));
TMlibTrace(("TMLIB_TRACE : DTM_ENABLETRANSACTIONS ENTRY.\n"), 2);
if (!gv_tmlib.is_initialized())
gv_tmlib.initialize();
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
// First get the Lead TM node number.
tmlib_init_req_hdr(TM_MSG_TYPE_LEADTM, &lv_req);
lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, gv_tmlib.iv_my_nid);
if (lv_error)
{
TMlibTrace(("TMLIB_TRACE : DTM_ENABLETRANSACTIONS EXIT - Get LeadTM returned error %d\n", lv_error), 1);
return lv_error;
}
lv_leadTM = lv_rsp.u.iv_leadtm.iv_node;
// Now we can send the enable txns request to the Lead TM
tmlib_init_req_hdr(TM_MSG_TYPE_ENABLETRANS, &lv_req);
lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, lv_leadTM);
if (lv_error)
{
TMlibTrace(("TMLIB_TRACE : DTM_ENABLETRANSACTIONS EXIT returned error %d\n", lv_error), 1);
return lv_error;
}
lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
TMlibTrace(("TMLIB_TRACE : DTM_ENABLETRANSACTIONS EXIT with error %d\n", lv_error), 2);
return lv_error;
} // DTM_ENABLETRANSACTIONS
// -----------------------------------------------------------------
// DTM_DISABLETRANSACTIONS
// Purpose - Request DTM disable transaction processing.
// pv_shutdown_level is defined in tm.h. This is the shutdown level
// requested; it must be TM_DISABLE_NORMAL,
// TM_DISABLE_SHUTDOWN_NORMAL or TM_DISABLE_SHUTDOWN_IMMEDIATE.
// Returns FEOK if successful
//         FEBADPARMVALUE for any other shutdown level
//         FEDEVDOWN The TM that enabletrans was sent to was not or
//                   no longer is the Lead TM.
// -----------------------------------------------------------------
short DTM_DISABLETRANSACTIONS(int32 pv_shutdown_level)
{
    Tm_Req_Msg_Type lv_request;
    Tm_Rsp_Msg_Type lv_reply;
    memset(&lv_reply, 0, sizeof(lv_reply));

    TMlibTrace(("TMLIB_TRACE : DTM_DISABLETRANSACTIONS ENTRY shutdown level %d\n", pv_shutdown_level), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    // Only the three known shutdown levels are accepted.
    const bool lv_level_ok = (pv_shutdown_level == TM_DISABLE_NORMAL)          ||
                             (pv_shutdown_level == TM_DISABLE_SHUTDOWN_NORMAL) ||
                             (pv_shutdown_level == TM_DISABLE_SHUTDOWN_IMMEDIATE);
    if (!lv_level_ok)
    {
        TMlibTrace(("TMLIB_TRACE : DTM_DISABLETRANSACTIONS EXIT - Bad shutdown level specified %d\n", pv_shutdown_level), 1);
        return FEBADPARMVALUE;
    }

    // First get the Lead TM node number.
    tmlib_init_req_hdr(TM_MSG_TYPE_LEADTM, &lv_request);
    short lv_rc = gv_tmlib.send_tm(&lv_request, &lv_reply, gv_tmlib.iv_my_nid);
    if (lv_rc)
    {
        TMlibTrace(("TMLIB_TRACE : DTM_DISABLETRANSACTIONS EXIT - Get LeadTM returned error %d\n", lv_rc), 1);
        return lv_rc;
    }
    int32 lv_lead_node = lv_reply.u.iv_leadtm.iv_node;

    // Now we can send the disable txns request to the Lead TM
    tmlib_init_req_hdr(TM_MSG_TYPE_DISABLETRANS, &lv_request);
    lv_request.u.iv_disabletrans.iv_shutdown_level = pv_shutdown_level;
    lv_rc = gv_tmlib.send_tm(&lv_request, &lv_reply, lv_lead_node);
    if (lv_rc)
    {
        TMlibTrace(("TMLIB_TRACE : DTM_DISABLETRANSACTIONS EXIT returned error %d\n", lv_rc), 1);
        return lv_rc;
    }

    lv_rc = lv_reply.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : DTM_DISABLETRANSACTIONS EXIT with reply error %d\n", lv_rc), 2);
    return lv_rc;
} // DTM_DISABLETRANSACTIONS
// -----------------------------------------------------------------
// DTM_DRAINTRANSACTIONS
// Purpose - Request a TM specified by pv_node to disable new
// transactions and allow active transactions to complete or abort
// if pv_immediate is set. pv_immediate defaults to false.
// Returns FEOK if successful
// FEDEVDOWN The TM specified by pv_node is not up.
// -----------------------------------------------------------------
short DTM_DRAINTRANSACTIONS(int32 pv_node, bool pv_immediate=false)
{
    // Sends a drain request to the TM on pv_node.  Returns the send_tm()
    // error if the message could not be delivered, otherwise the error
    // carried in the TM's reply header.
    Tm_Req_Msg_Type lv_request;
    Tm_Rsp_Msg_Type lv_reply;
    memset(&lv_reply, 0, sizeof(lv_reply));

    TMlibTrace(("TMLIB_TRACE : DTM_DRAINTRANSACTIONS ENTRY node %d, immediate=%d.\n", pv_node, pv_immediate), 2);

    // Lazily bring up the library and the per-thread transaction object.
    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    tmlib_init_req_hdr(TM_MSG_TYPE_DRAINTRANS, &lv_request);
    lv_request.u.iv_draintrans.iv_immediate = pv_immediate;

    short lv_status = gv_tmlib.send_tm(&lv_request, &lv_reply, pv_node);
    if (lv_status)
    {
        TMlibTrace(("TMLIB_TRACE : DTM_DRAINTRANSACTIONS EXIT returned error %d\n", lv_status), 1);
        return lv_status;
    }

    lv_status = lv_reply.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : DTM_DRAINTRANSACTIONS EXIT with reply error %d\n", lv_status), 2);
    return lv_status;
} // DTM_DRAINTRANSACTIONS
// -----------------------------------------------------------------
// DTM_QUIESCE
// Purpose - Send a Quiesce request to the TM specified by pv_node.
// This is for internal testing of NodeQuiesce only!!!!
// Returns FEOK if successful
// FEDEVDOWN The TM specified by pv_node is not up.
// -----------------------------------------------------------------
short DTM_QUIESCE(int32 pv_node)
{
    // Sends a TM_MSG_TYPE_QUIESCE request with iv_stop == false to the TM
    // on pv_node.  Returns the send_tm() error if delivery failed,
    // otherwise the error carried in the TM's reply header.
    Tm_Req_Msg_Type lv_request;
    Tm_Rsp_Msg_Type lv_reply;
    memset(&lv_reply, 0, sizeof(lv_reply));

    TMlibTrace(("TMLIB_TRACE : DTM_QUIESCE ENTRY node %d.\n", pv_node), 2);

    // Lazily bring up the library and the per-thread transaction object.
    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    tmlib_init_req_hdr(TM_MSG_TYPE_QUIESCE, &lv_request);
    lv_request.u.iv_quiesce.iv_stop = false;

    short lv_status = gv_tmlib.send_tm(&lv_request, &lv_reply, pv_node);
    if (lv_status)
    {
        TMlibTrace(("TMLIB_TRACE : DTM_QUIESCE EXIT returned error %d\n", lv_status), 1);
        return lv_status;
    }

    lv_status = lv_reply.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : DTM_QUIESCE EXIT with reply error %d\n", lv_status), 2);
    return lv_status;
} // DTM_QUIESCE
// -----------------------------------------------------------------
// DTM_UNQUIESCE
// Purpose - Send an Un-Quiesce request to the TM specified by pv_node.
// This is for internal testing of NodeQuiesce only!!!!
// Returns FEOK if successful
// FEDEVDOWN The TM specified by pv_node is not up.
// -----------------------------------------------------------------
short DTM_UNQUIESCE(int32 pv_node)
{
    // Sends a TM_MSG_TYPE_QUIESCE request with iv_stop == true (i.e. stop
    // quiescing) to the TM on pv_node.  Returns the send_tm() error if
    // delivery failed, otherwise the error carried in the TM's reply header.
    //
    // The trace text previously said DTM_QUIESCE (copy/paste), which made
    // quiesce and un-quiesce calls indistinguishable in trace output.
    short lv_error = FEOK;
    Tm_Req_Msg_Type lv_req;
    Tm_Rsp_Msg_Type lv_rsp;
    memset(&lv_rsp, 0, sizeof(lv_rsp));

    TMlibTrace(("TMLIB_TRACE : DTM_UNQUIESCE ENTRY node %d.\n", pv_node), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    tmlib_init_req_hdr(TM_MSG_TYPE_QUIESCE, &lv_req);
    lv_req.u.iv_quiesce.iv_stop = true;

    lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, pv_node);
    if (lv_error)
    {
        TMlibTrace(("TMLIB_TRACE : DTM_UNQUIESCE EXIT returned error %d\n", lv_error), 1);
        return lv_error;
    }

    lv_error = lv_rsp.iv_msg_hdr.miv_err.error;
    TMlibTrace(("TMLIB_TRACE : DTM_UNQUIESCE EXIT with reply error %d\n", lv_error), 2);
    return lv_error;
} // DTM_UNQUIESCE
// ------------------------------------------------------------------
// EXTENDED API - same as their counterparts
// ------------------------------------------------------------------
short GETTRANSID_EXT (TM_Transid_Type *pp_transid)
{
    // Copies the transid of this thread's current transaction into
    // *pp_transid.  Returns FENOTRANSID when the thread has no current
    // transaction or that transaction has no transid, FEOK otherwise.
    TMlibTrace(("TMLIB_TRACE : GETTRANSID_EXT ENTRY\n"), 2);

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    TM_Transaction *lp_current = gp_trans_thr->get_current();
    if (lp_current == NULL)
    {
        TMlibTrace(("TMLIB_TRACE : GETTRANSID_EXT EXIT with error %d\n", FENOTRANSID), 2);
        return FENOTRANSID;
    }

    TM_Transid *lp_id = lp_current->getTransid();
    if (lp_id == NULL)
    {
        TMlibTrace(("TMLIB_TRACE : GETTRANSID_EXT EXIT with error %d\n", FENOTRANSID), 2);
        return FENOTRANSID;
    }

    memcpy (pp_transid, lp_id->get_data_address(), sizeof (TM_Transid_Type));
    TMlibTrace(("TMLIB_TRACE : GETTRANSID_EXT EXIT\n"), 2);
    return FEOK;
}
short GETTRANSINFO_EXT (TM_Transid_Type *pp_transid, int64 *pp_type_flags)
{
    // Returns the transid and type flags of this thread's current
    // transaction through pp_transid / pp_type_flags.
    //
    // Returns: the error from tmlib_check_active_tx() or
    //          tmlib_check_miss_param() if either check fails,
    //          FENOTRANSID if there is no current transaction or it has no
    //          transid, FEOK on success.
    //
    // Fix: the function used to trace lv_error on exit but then return FEOK
    // unconditionally, so a FENOTRANSID outcome was reported as success while
    // the output parameters were left unwritten.

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    TM_Transaction *lp_trans = gp_trans_thr->get_current();
    TM_Transid *lp_transid = NULL;
    short lv_error = FEOK;

    TMlibTrace(("TMLIB_TRACE : GETTRANSINFO_EXT ENTRY\n"), 2);

    lv_error = tmlib_check_active_tx ();
    if (lv_error)
    {
        TMlibTrace(("TMLIB_TRACE : GETTRANSINFO_EXT returning with error %d\n",
                    lv_error), 1);
        return lv_error;
    }

    // Both output parameters are mandatory.
    lv_error = tmlib_check_miss_param(pp_transid);
    if (lv_error == FEOK)
        lv_error = tmlib_check_miss_param(pp_type_flags);
    if (lv_error != FEOK)
    {
        TMlibTrace(("TMLIB_TRACE : GETTRANSINFO_EXT returning with error %d\n",
                    lv_error), 1);
        return lv_error;
    }

    if (lp_trans == NULL)
        lv_error = FENOTRANSID;
    else
    {
        lp_transid = lp_trans->getTransid();
        if (lp_transid)
        {
            memcpy (pp_transid, lp_transid->get_data_address(), sizeof (TM_Transid_Type));
            *pp_type_flags = lp_trans->getTypeFlags();
        }
        else
            lv_error = FENOTRANSID;
    }

    TMlibTrace(("TMLIB_TRACE : GETTRANSINFO_EXT EXIT, error %d\n", lv_error), 2);
    return lv_error;
} //GETTRANSINFO_EXT
short SUSPENDTRANSACTION_EXT (TM_Transid_Type *pp_transid)
{
    // Suspends this thread's current transaction and hands its transid back
    // through *pp_transid.  Returns FENOTRANSID when there is no current
    // transaction, FETXSUSPENDREJECTED when the current transaction is marked
    // as propagated, otherwise the result of TM_Transaction::suspend().

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    TM_Transaction *lp_current = gp_trans_thr->get_current();
    TM_Transid lv_suspended_id;
    short lv_status = FEOK;

    TMlibTrace(("TMLIB_TRACE : SUSPENDTRANSACTION_EXT ENTRY\n"), 2);

    if (lp_current == NULL)
    {
        TMlibTrace(("TMLIB_TRACE : SUSPENDTRANSACTION_EXT EXIT with error %d\n", FENOTRANSID), 1);
        return FENOTRANSID;
    }

    // A propagated transaction was not joined by this thread, so it cannot
    // be suspended from here.
    if (gp_trans_thr->get_current_propagated() == true)
    {
        TMlibTrace(("TMLIB_TRACE : SUSPENDTRANSACTION_EXT EXIT with error %d\n",
                    FETXSUSPENDREJECTED), 1);
        return FETXSUSPENDREJECTED;
    }

    lv_status = lp_current->suspend(&lv_suspended_id);
    if (lv_status == 0)
    {
        memcpy (pp_transid, lv_suspended_id.get_data_address(), sizeof (TM_Transid_Type));
        gp_trans_thr->set_current(NULL);
        // Only non-ender transaction objects are released here.
        if (!lp_current->isEnder())
            delete lp_current;
    }

    TMlibTrace(("TMLIB_TRACE : SUSPENDTRANSACTION_EXT EXIT with error %d\n", lv_status), 2);
    return lv_status;
}
short JOINTRANSACTION_EXT (TM_Transid_Type *pp_transid)
{
    // Makes the transaction identified by *pp_transid the thread's current
    // transaction.
    // Returns FEINVTRANSID for a NULL transid, FEALREADYJOINED when the
    // transaction is already current or is known to this thread as a
    // non-ender, FENOBUFSPACE if a new transaction object could not be
    // allocated, otherwise the error state of the (new or reused)
    // transaction object.

    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    TM_Transid lv_join_id;

    if (pp_transid == NULL)
        return FEINVTRANSID;

    TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT ENTRY\n"), 2);
    lv_join_id = *pp_transid;

    // Joining the transaction that is already current is rejected.
    TM_Transaction *lp_current = gp_trans_thr->get_current();
    if ((lp_current != NULL) && (lp_current->equal(lv_join_id)))
    {
        TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT EXIT with error %d\n", FEALREADYJOINED), 1);
        return FEALREADYJOINED;
    }

    TM_Transaction *lp_known = gp_trans_thr->get_trans (lv_join_id.get_native_type());
    if (lp_known == NULL)
    {
        // Unknown to this thread: constructing the object performs the
        // implicit join and add.
        TM_Transaction *lp_joined = new TM_Transaction (lv_join_id, false);
        if (lp_joined == NULL)
        {
            TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT EXIT with error %d\n", FENOBUFSPACE), 1);
            return FENOBUFSPACE;
        }
        short lv_status = lp_joined->get_error();
        if (lv_status)
            delete lp_joined;
        TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT EXIT with error %d\n", lv_status), 2);
        return lv_status;
    }

    // Known transaction: only the ender (beginner) may re-join it.
    if (!lp_known->isEnder())
    {
        TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT EXIT with error %d\n", FEALREADYJOINED), 1);
        return FEALREADYJOINED;
    }

    gp_trans_thr->set_current(lp_known); // beginner, no need to increment anything
    gp_trans_thr->set_current_suspended(false);
    TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT EXIT with error %d\n", FEOK), 2);
    return FEOK;
}
// ---------------------------------------------------------------
// TEST_TX_COUNT
//
// Purpose - internal testing
// ---------------------------------------------------------------
short TEST_TX_COUNT()
{
short lv_error;
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TEST_TC_COUNT ENTRY \n"), 2);
if (!gv_tmlib.is_initialized())
{
gv_tmlib.initialize();
}
// instantiate a gp_trans_thr object for this thread if needed.
if (gp_trans_thr == NULL)
gp_trans_thr = new TMLIB_ThreadTxn_Object();
tmlib_init_req_hdr(TM_MSG_TYPE_TEST_TX_COUNT, &lv_req);
lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, gv_tmlib.iv_my_nid);
if (lv_error)
return lv_error;
int lv_count = lv_rsp.u.iv_count.iv_count;
TMlibTrace(("TMLIB_TRACE : TEST_TX_COUNT is %d, for node %d EXIT\n",
lv_count, gv_tmlib.iv_my_nid), 2);
return (short) lv_count;
}
//------------------------------------------------------------------------
// TMWAIT
//
// Purpose : Wait for the TM to start and be ready to process begintransactions
// Params : none.
// ---------------------------------------------------------------------
int16 TMWAIT()
{
    // Opens the local TM and sends it a TM_MSG_TYPE_WAIT_TMUP request.
    //
    // Returns FETMFNOTRUNNING if the local TM cannot be opened, the send_tm()
    // error if the request could not be delivered, otherwise the error
    // carried in the TM's reply header.
    Tm_Req_Msg_Type lv_req;
    Tm_Rsp_Msg_Type lv_rsp;
    int16 lv_error = FEOK;

    // Zero the reply: the exit trace below reads the reply header even when
    // send_tm() failed and never filled it in, which previously meant
    // reading indeterminate stack memory.
    memset(&lv_rsp, 0, sizeof(lv_rsp));

    TMlibTrace(("TMLIB_TRACE : TMWAIT ENTRY\n"), 2);

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    if (!gv_tmlib.open_tm(gv_tmlib.iv_my_nid))
    {
        TMlibTrace(("TMLIB_TRACE : TMWAIT returning FETMFNOTRUNNING\n"), 1);
        return FETMFNOTRUNNING;
    }

    tmlib_init_req_hdr(TM_MSG_TYPE_WAIT_TMUP, &lv_req);
    lv_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, gv_tmlib.iv_my_nid);

    TMlibTrace(("TMLIB_TRACE : TMWAIT EXIT, feerror=%d, error=%d\n",
                lv_error, lv_rsp.iv_msg_hdr.miv_err.error), 2);

    // A transport failure takes precedence over the reply's own error.
    if (lv_error != FEOK)
        return lv_error;
    else
        return lv_rsp.iv_msg_hdr.miv_err.error;
}
//------------------------------------------------------------------------
// TMCLIENTEXIT
//
// Purpose : To close all the TM opens from the clients before exiting
// Params : none.
// ---------------------------------------------------------------------
int16 TMCLIENTEXIT()
{
    // Thin wrapper: the result of TMLIB::close_tm() is returned as-is.
    const int16 lv_close_status = gv_tmlib.close_tm();
    return lv_close_status;
}
// -------------------------------------------------------------------
// TMLIB methods
// -------------------------------------------------------------------
// ------------------------------------------------------------------
// TMLIB
// Purpose - Register callback with seabed!
// Also need to get any configuration values.
// ------------------------------------------------------------------
TMLIB::TMLIB() : JavaObjectInterfaceTM()
{
    // Constructor: registers the seabed callback, clears the per-node TM
    // tables and loads the configuration defaults, each of which may be
    // overridden by an environment variable.
    tm_rtsigblock_proc();
    iv_initialized = false;

    // Only the tmlib2 callback variant is registered.
    msg_mon_trans_register_tmlib2 (tmlib_callback2);

    // Start with every TM slot closed and not enlisted.
    for (int lv_node = 0; lv_node < MAX_NODES; ++lv_node)
    {
        memset(&ia_tm_phandle[lv_node].iv_phandle, 0, sizeof(SB_Phandle_Type));
        ia_tm_phandle[lv_node].iv_open = 0;
        ia_tm_phandle[lv_node].iv_pid = 0;
        ia_is_enlisted[lv_node] = false;
    }

    iv_next_nid = 0;
    iv_next_tag = 1;

    iv_tm_pid = 0;
    iv_node_count = 0;
    iv_my_pid = 0;
    iv_my_nid = 0;

    iv_txn_distribute = DIST_NOT_SET;

    // Each setting: install the default, then let the environment override.
    localBegin(false);
    ms_getenv_bool("DTM_LOCAL_TRANSACTIONS", &iv_localBegin);

    seqNum_blockSize(1000);
    ms_getenv_int("DTM_LOCAL_BLOCKSIZE", &iv_seqNum_blockSize);

    enableCleanupRMInterface(true);
    ms_getenv_bool("TMLIB_ENABLE_CLEANUP", &iv_enableCleanupRMInterface);

    ip_seqNum = new CtmSeqNum();

    // Java class names looked up later by setupJNI().
    strcpy(rminterface_classname,"org/apache/hadoop/hbase/client/transactional/RMInterface");
    strcpy(hbasetxclient_classname,"org/trafodion/dtm/HBaseTxClient");
} //TMLIB::TMLIB
// -----------------------------------------------------------------
// add_or_update
// Purpose - get a transaction into our system.
// The new transaction is the current transaction after the call.
// If the TM Library already has this transaction in its list of
// active transactions, then we increase the depth after making it
// current. This can happen when a server receives multiple
// awaitiox completions for the same transaction without replying
// to the first awaitiox (receive depth > 1 on NSK).
// -----------------------------------------------------------------
short TMLIB::add_or_update (TM_Transid pv_transid, bool pv_can_end,
                            int pv_tag)
{
    // Makes pv_transid the thread's current transaction.  A transaction not
    // yet known to this thread is created and marked propagated; a known
    // one has its depth increased.  Returns FENOBUFSPACE if allocation
    // failed, the new object's error if construction failed, else FEOK.
    (void) pv_can_end; // unused; kept for interface compatibility
    (void) pv_tag;     // unused; kept for interface compatibility

    TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update ENTRY\n"), 2);

    TM_Transaction *lp_txn = gp_trans_thr->get_trans(pv_transid.get_native_type());
    const bool lv_created = (lp_txn == NULL);

    if (lv_created)
    {
        // First sighting of this transaction on this thread.
        TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update - adding new transaction " PFLL "\n",
                    pv_transid.get_native_type()), 3);

        lp_txn = new TM_Transaction(pv_transid, true /*fs server*/);
        if (lp_txn == NULL)
        {
            TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update EXIT with error %d\n", FENOBUFSPACE), 1);
            return FENOBUFSPACE;
        }

        short lv_ctor_error = lp_txn->get_error();
        if (lv_ctor_error)
        {
            TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update - new transaction failed with error %d\n",
                        lv_ctor_error), 1);
            delete lp_txn;
            TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update EXIT with error %d\n", lv_ctor_error), 2);
            return lv_ctor_error;
        }
    }
    else
    {
        TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update - found existing transaction " PFLL "\n",
                    pv_transid.get_native_type()), 3);
    }

    // lp_txn is non-NULL here on every path that did not return above.
    gp_trans_thr->set_current(lp_txn);
    if (lv_created)
        gp_trans_thr->set_current_propagated(true);
    else
        gp_trans_thr->increase_current_depth();

    TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update EXIT\n"), 2);
    return FEOK;
}
// -----------------------------------------------------------------
// add_or_update
// Purpose - get a transaction and startid into our system.
// The new transaction is the current transaction after the call.
// If the TM Library already has this transaction in its list of
// active transactions, then we increase the depth after making it
// current. This can happen when a server receives multiple
// awaitiox completions for the same transaction without replying
// to the first awaitiox (receive depth > 1 on NSK).
// -----------------------------------------------------------------
short TMLIB::add_or_update (TM_Transid pv_transid, TM_Transseq_Type pv_startid,
                            bool pv_can_end, int pv_tag)
{
    // Same as the transid-only overload, but a newly created transaction
    // also records pv_startid on the thread object.  Returns FENOBUFSPACE if
    // allocation failed, the new object's error if construction failed,
    // else FEOK.
    (void) pv_can_end; // unused; kept for interface compatibility
    (void) pv_tag;     // unused; kept for interface compatibility

    TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update ENTRY\n"), 2);

    TM_Transaction *lp_txn = gp_trans_thr->get_trans(pv_transid.get_native_type());
    const bool lv_created = (lp_txn == NULL);

    if (lv_created)
    {
        // First sighting of this transaction on this thread.
        TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update - adding new transaction " PFLL "\n",
                    pv_transid.get_native_type()), 3);

        lp_txn = new TM_Transaction(pv_transid, true /*fs server*/);
        if (lp_txn == NULL)
        {
            TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update EXIT with error %d\n", FENOBUFSPACE), 1);
            return FENOBUFSPACE;
        }

        short lv_ctor_error = lp_txn->get_error();
        if (lv_ctor_error)
        {
            TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update - new transaction failed with error %d\n",
                        lv_ctor_error), 1);
            delete lp_txn;
            TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update EXIT with error %d\n", lv_ctor_error), 2);
            return lv_ctor_error;
        }
    }
    else
    {
        TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update - found existing transaction " PFLL "\n",
                    pv_transid.get_native_type()), 3);
    }

    // lp_txn is non-NULL here on every path that did not return above.
    gp_trans_thr->set_current(lp_txn);
    if (lv_created)
    {
        // The start id is only recorded when the transaction is new.
        gp_trans_thr->set_startid(pv_startid);
        TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update - setting startid %ld for transaction " PFLL "\n",
                    gp_trans_thr->get_startid(), pv_transid.get_native_type()), 3);
        gp_trans_thr->set_current_propagated(true);
    }
    else
    {
        gp_trans_thr->increase_current_depth();
    }

    TMlibTrace(("TMLIB_TRACE : TMLIB::add_or_update EXIT\n"), 2);
    return FEOK;
}
// ------------------------------------------------------------------------
// clear_entry
// Purpose : clear an entry out of our system and suspend if instructed to
// ------------------------------------------------------------------------
bool TMLIB::clear_entry (TM_Transid pv_transid, bool pv_server,
                         /*bool pv_suspend,*/ bool pv_force)
{
    // Releases this thread's hold on a transaction.
    //
    // pv_force == false: decrease the current transaction's depth; once it
    //   reaches zero the entry is done and, when pv_server is set, the
    //   current transaction is deleted.
    // pv_force == true (only set on tx propagation): the thread's start id
    //   and current transaction are always reset; the entry counts as done
    //   only if the transaction is known here and this thread is not its
    //   ender.
    //
    // Returns true when the entry is done (the traces call this delisting).
    //
    // Changes: the exit trace used to say "TMLIB::add_or_update EXIT", and
    // the forced path reset the thread state in two separate places; both
    // resets are now expressed once, with identical effect.
    bool lv_done = false;
    int lv_node = pv_transid.get_node();

    TMlibTrace(("TMLIB_TRACE : TMLIB::clear_entry ENTRY\n"), 2);

    if (!pv_force)
    {
        int lv_depth = gp_trans_thr->decrease_current_depth();
        TMlibTrace(("TMLIB_TRACE : TMLIB::clear_entry - decreased depth to %d\n", lv_depth), 3);

        if (lv_depth <= 0)
        {
            TMlibTrace(("TMLIB_TRACE : TMLIB::clear_entry - delisting from node %d\n", lv_node), 3);
            // we ARE a server and our count is now zero. Get rid of it
            if (pv_server)
                gp_trans_thr->delete_current(); // only do for server
            lv_done = true;
        }
    }
    else
    {
        TM_Transaction *lp_trans = gp_trans_thr->get_trans (pv_transid.get_native_type());

        // we don't ever want to delist the owner
        if ((lp_trans) && (!lp_trans->isEnder()))
        {
            TMlibTrace(("TMLIB_TRACE : TMLIB::clear_entry - delisting from node %d\n", lv_node), 3);
            lv_done = true;
        }

        // A forced clear always detaches the thread from its current
        // transaction, whether or not the entry was delisted.
        gp_trans_thr->set_startid(-1);
        gp_trans_thr->set_current(NULL);
    }

    TMlibTrace(("TMLIB_TRACE : TMLIB::clear_entry EXIT\n"), 2);
    return lv_done;
}
// -----------------------------------------------------------------
// reinstate_tx
// Purpose : get a transaction and return results
// -----------------------------------------------------------------
bool TMLIB::reinstate_tx(TM_Transid *pv_transid, bool pv_settx)
{
    // Makes a transaction already known to this thread current again.
    // Returns false when the transaction is unknown, or when pv_settx is set
    // and the enlisted entry is flagged as suspended; true otherwise.  The
    // transaction is made current even in the suspended case.
    TMlibTrace(("TMLIB_TRACE : TMLIB::reinstate_tx\n"), 2);

    TM_Transaction *lp_known = gp_trans_thr->get_trans (pv_transid->get_native_type());
    if (lp_known == NULL)
        return false;

    gp_trans_thr->set_current(lp_known);

    // Without pv_settx there is nothing more to check.
    if (!pv_settx)
        return true;

    // special case for beginner
    TMLIB_EnlistedTxn_Object* lp_entry = gp_trans_thr->get_enlisted(pv_transid->get_native_type());
    const bool lv_suspended = (lp_entry) && (lp_entry->suspended_tx() == true);
    return !lv_suspended;
}
// --------------------------------------------------------------------
// table get methods
// --------------------------------------------------------------------
bool TMLIB::phandle_get(TPT_PTR(pp_phandle), int pv_node)
{
    // Copies the stored process handle for pv_node into *pp_phandle when
    // that TM slot is marked open.  Returns whether the slot is open;
    // *pp_phandle is left untouched when it is not.
    // NOTE(review): pv_node is used as an array index without a range check.
    if (!ia_tm_phandle[pv_node].iv_open)
        return false;

    *pp_phandle = ia_tm_phandle[pv_node].iv_phandle;
    return true;
}
// -----------------------------------------------------------------------
// table set methods
// ----------------------------------------------------------------------
void TMLIB::phandle_set (TPT_PTR(pp_phandle), int pv_node)
{
    // Records *pp_phandle as the open TM handle for pv_node and extracts the
    // TM's pid from it.
    // NOTE(review): pv_node is used as an array index without a range check.
    ia_tm_phandle[pv_node].iv_open = true;
    ia_tm_phandle[pv_node].iv_phandle = *pp_phandle;

    // Only the pid output is requested; every other output is NULL/0.
    XPROCESSHANDLE_DECOMPOSE_(pp_phandle,
                              NULL,                            // node - already known
                              &ia_tm_phandle[pv_node].iv_pid,  // pid
                              NULL,
                              NULL,
                              0,
                              NULL,
                              NULL,
                              0,
                              NULL,
                              NULL);

    TMlibTrace(("TMLIB_TRACE : phandle_set, received tm pid of %d for node %d\n",
                ia_tm_phandle[pv_node].iv_pid, pv_node), 3);
}
// -------------------------------------------------------------------
// TMLIB::initialize
// Purpose : one-time library set-up: learn this process's nid/pid,
// open the TM on the local node, read the node count and mark the
// library as initialized.
// -------------------------------------------------------------------
void TMLIB::initialize()
{
// Fetch this process's node id and process id from the monitor.
msg_mon_get_process_info(NULL, &iv_my_nid,
&iv_my_pid);
// pv_startup == true stops open_tm() from calling back into initialize().
// The result is ignored here; callers re-attempt the open via send_tm().
open_tm(iv_my_nid, true);
TMlibTrace(("TMLIB_TRACE : TMLIB::initialize : my nid,pid (%d, %d)\n",
iv_my_nid, iv_my_pid ), 1);
//TODO: switch the following call to msg_mon_get_node_info2 when available.
// This call has been changed so that the node count includes spare nodes, so
// will give the wrong value for iv_node_count.
msg_mon_get_node_info(&iv_node_count, MAX_NODES, NULL);
// From here on is_initialized() reports true.
is_initialized(true);
// We don't use gv_tmlib_initialized but set it here just to keep things aligned.
gv_tmlib_initialized = true;
}
// -------------------------------------------------------------------
// TMLIB::initJNI
// Initialize JNI interface
// Only used on demand - if you do this in TMLIB::initialize
// it gets called when it may not be used and conflicts with udrserv.
// -------------------------------------------------------------------
void TMLIB::initJNI()
{
int lv_err = 0;
static bool ls_initialized = false;
//sleep(30);
if (ls_initialized)
return;
short lv_result = setupJNI();
if (lv_result) {
fprintf(stderr, "setupJNI returned error %d in TMLIB::initJNI. Exiting.\n", lv_result);
fflush(stderr);
abort();
}
if (localBegin()) {
lv_err = initConnection(iv_my_nid);
if (lv_err)
{
TMlibTrace(("TMLIB_TRACE : TMLIB::initJNI: initConnection failed with error %d.\n", lv_err), 1);
printf("TMLIB::initConnection failed with error %d.\n", lv_err);
//tm_log_event(DTM_HBASE_INIT_FAILED, SQ_LOG_CRIT, "DTM_HBASE_INIT_FAILED", lv_error);
abort();
}
else
TMlibTrace(("TMLIB_TRACE : TMLIB::initJNI: initConnection succeeded.\n"), 1);
}
ls_initialized = true;
} //initJNI
// -------------------------------------------------------------------
// open_tm
// Purpose : open a TM on the given node
// -------------------------------------------------------------------
bool TMLIB::open_tm(int pv_node, bool pv_startup)
{
char lv_buffer[8];
int lv_error = 0;
TM_Transid_Type lv_null_transid;
int lv_oid;
TPT_DECL (lv_phandle);
int lv_retry = 0;
// this mutex is for starting up and threads racing to
// open the tm. This will stay locked and hang all transactional
// threads up until the tm is open.
// strategy, we may need to redo this part a bit
if (!is_initialized() && !pv_startup)
initialize();
// get the phandle
if (phandle_get(&lv_phandle, pv_node) == true)
{
TMlibTrace(("TMLIB_TRACE : open_tm, TM for node %d already open\n",
pv_node), 3);
return true;
}
TMlibTrace(("TMLIB_TRACE : open_tm (node %d) ENTRY\n", pv_node), 2);
sprintf (lv_buffer, "$tm%d", pv_node);
while (lv_retry < 10)
{
lv_error = msg_mon_open_process(lv_buffer,
&lv_phandle,
&lv_oid);
if (!lv_error)
{
// set phandle under mutex
phandle_set(&lv_phandle, pv_node);
lv_null_transid.id[0] = lv_null_transid.id[1] =
lv_null_transid.id[2] = lv_null_transid.id[3] = 0;
msg_mon_trans_enlist (pv_node, ia_tm_phandle[pv_node].iv_pid,
lv_null_transid);
TMlibTrace(("TMLIB_TRACE : open_tm EXIT, successfully opened TM (%d,%d)\n",
pv_node, ia_tm_phandle[pv_node].iv_pid), 2);
return true;
}
else
{
TMlibTrace(("TMLIB_TRACE : open_tm failed to open tm %s, attempt %d, error %d.\n",
lv_buffer, lv_retry, lv_error), 2);
}
lv_retry++;
}
TMlibTrace(("TMLIB_TRACE : open_tm EXIT, failed to open TM for node %d, no more attempts\n",
pv_node), 1);
// could not open tm
return false;
}
// ---------------------------------------------------------
// send_tm
// Purpose - send message to the given TM
// ---------------------------------------------------------
short TMLIB::send_tm(Tm_Req_Msg_Type *pp_req, Tm_Rsp_Msg_Type *pp_rsp,
                     int pv_node)
{
    // Sends *pp_req to the TM on pv_node and waits for the reply in *pp_rsp.
    //
    // Returns FETMFNOTRUNNING if the TM cannot be opened or the path is
    // down (FEPATHDOWN is mapped), otherwise the BMSG_LINK_/BMSG_BREAK_
    // result (FEOK on success).  On FESERVICEDISABLED the calling thread is
    // put to sleep with an infinite timeout.
    //
    // Fix: the link retry counter was incremented both in the loop body and
    // in the loop condition, so FENOLCB retries stopped at roughly half of
    // lc_maxLinkRetries.  The counter now advances once per link attempt and
    // at most lc_maxLinkRetries attempts are made.  As before, the first
    // retry is immediate and later retries are preceded by lc_linkPause.
    ushort lv_req_len = sizeof (Tm_Req_Msg_Type);
    int lv_rsp_len = sizeof (Tm_Rsp_Msg_Type);
    int lv_msgid;
    TPT_DECL( lv_phandle);
    short la_results[6];
    short lv_ret = FEOK;
    int32 lv_linkAttempts = 0;
    const int32 lc_maxLinkRetries = 100;
    const int32 lc_linkPause = 3000; // 3 seconds, in msec

    TMlibTrace(("TMLIB_TRACE : send_tm (node %d) ENTRY\n", pv_node), 2);

    if (!gv_tmlib.open_tm(pv_node))
    {
        TMlibTrace(("TMLIB_TRACE : returning FETMFNOTRUNNING\n"), 1);
        return FETMFNOTRUNNING;
    }

    // open_tm() succeeded, so the handle for this node is cached.
    gv_tmlib.phandle_get(&lv_phandle, pv_node);

    for (;;)
    {
        lv_ret = BMSG_LINK_(&lv_phandle,         // phandle
                            &lv_msgid,           // msgid
                            NULL,                // reqctrl
                            0,                   // reqctrlsize
                            NULL,                // replyctrl
                            0,                   // replyctrlmax
                            (char *) pp_req,     // reqdata
                            lv_req_len,          // reqdatasize
                            (char *) pp_rsp,     // replydata
                            lv_rsp_len,          // replydatamax
                            0,                   // linkertag
                            TMLIB_LINK_PRIORITY, // pri
                            0,                   // xmitclass
                            0);                  // linkopts
        lv_linkAttempts++;

        // Only FENOLCB is retried, and only up to the configured cap.
        if (lv_ret != FENOLCB || lv_linkAttempts >= lc_maxLinkRetries)
            break;

        // The first retry is immediate; later ones back off.
        if (lv_linkAttempts > 1)
            SB_Thread::Sthr::sleep(lc_linkPause); // in msec
    }

    if (lv_ret)
    {
        TMlibTrace(("TMLIB_TRACE : send_tm , BMSG_LINK error is %d\n", lv_ret), 3);
    }
    else
    {
        lv_ret = BMSG_BREAK_(lv_msgid, la_results, &lv_phandle);
        if (lv_ret)
        {
            TMlibTrace(("TMLIB_TRACE : send_tm , BMSG_BREAK error is %d\n", lv_ret), 3);
        }
    }

    switch (lv_ret)
    {
    case FEPATHDOWN:
        lv_ret = FETMFNOTRUNNING;
        break;
    case FESERVICEDISABLED:
        // This is returned during fail-safe by the TM to indicate that the node
        // is about to go down. No further transactional requests will be processed
        // by this node and any DTM clients should migrate to another node.
        // In M5 HA SPR we will simply hang here to block the client.
        SB_Thread::Sthr::sleep(-1); // in msec - forever!
        break;
    default:
        break;
    }

    TMlibTrace(("TMLIB_TRACE : send_tm EXIT returning error %d.\n", lv_ret), 2);
    return lv_ret;
}
// ---------------------------------------------------------
// send_tm_link
// Purpose - send message to the given TM
// ---------------------------------------------------------
short TMLIB::send_tm_link(char *pp_req, int buffer_size, Tm_Rsp_Msg_Type *pp_rsp,
int pv_node)
{
//ushort lv_req_len = sizeof (Tm_Req_Msg_Type);
int lv_rsp_len = sizeof (Tm_Rsp_Msg_Type);
int lv_msgid;
TPT_DECL( lv_phandle);
short la_results[6];
short lv_ret = FEOK;
int32 lv_linkRetries = 0;
const int32 lc_maxLinkRetries = 100;
const int32 lc_linkPause = 3000; // 3 second
TMlibTrace(("TMLIB_TRACE : send_tm (node %d) ENTRY\n", pv_node), 2);
if (!gv_tmlib.open_tm(pv_node))
{
TMlibTrace(("TMLIB_TRACE : returning FETMFNOTRUNNING\n"), 1);
return FETMFNOTRUNNING;
}
// get phandle and (FOR NOW) abort. if we get into this
// method, we should have already opened and stored the tm
gv_tmlib.phandle_get(&lv_phandle, pv_node);
do {
lv_ret = BMSG_LINK_(&lv_phandle, // phandle
&lv_msgid, // msgid
NULL, // reqctrl
0, // reqctrlsize
NULL, // replyctrl
0, // replyctrlmax
pp_req, // reqdata
buffer_size, // reqdatasize
(char *) pp_rsp, // replydata
lv_rsp_len, // replydatamax
0, // linkertag
TMLIB_LINK_PRIORITY, // pri
0, // xmitclass
0); // linkopts
lv_linkRetries++;
if ((lv_ret == FENOLCB) && (lv_linkRetries <= lc_maxLinkRetries) &&
(lv_linkRetries > 1))
SB_Thread::Sthr::sleep(lc_linkPause); // in msec
} while (lv_ret == FENOLCB && ++lv_linkRetries <= lc_maxLinkRetries);
if (lv_ret)
{
TMlibTrace(("TMLIB_TRACE : send_tm , BMSG_LINK error is %d\n", lv_ret), 3);
}
else
{
lv_ret = BMSG_BREAK_(lv_msgid, la_results, &lv_phandle);
if (lv_ret)
{
TMlibTrace(("TMLIB_TRACE : send_tm , BMSG_BREAK error is %d\n", lv_ret), 3);
}
}
switch (lv_ret)
{
case FEPATHDOWN:
lv_ret = FETMFNOTRUNNING;
break;
case FESERVICEDISABLED:
SB_Thread::Sthr::sleep(-1); // in msec - forever!
}
TMlibTrace(("TMLIB_TRACE : send_tm EXIT returning error %d.\n", lv_ret), 2);
return lv_ret;
}
// ---------------------------------------------------------
// beginner_nid
// Purpose - return the nid of the TM process to be used
// to begin the next transaction.
// If iv_txn_distribute is NODE_LOCAL_BEGINS then this is
// always the node of the calling process.
// However, if it is CLUSTER_WIDE_BEGINS then the TMLib
// allocated the next nid in the cluster.
// ---------------------------------------------------------
int TMLIB::beginner_nid()
{
    // Picks the node whose TM should begin the next transaction: the
    // caller's own node by default, or the next node in a round-robin when
    // the distribution mode is CLUSTER_WIDE_BEGINS.
    // NOTE(review): the mode is written through `this` but read through
    // gv_tmlib below; these agree only if this object is gv_tmlib - confirm.
    int lv_chosen_nid = gv_tmlib.iv_my_nid;

    // Resolve the distribution mode once, on first use.
    if (iv_txn_distribute == DIST_NOT_SET)
    {
        int lv_env_mode = 0;
        ms_getenv_int("TMLIB_TXN_DISTRIBUTION", &lv_env_mode);

        if (lv_env_mode <= 0)
            iv_txn_distribute = DEFAULT_TRANSACTION_DISTRIBUTION;
        else
            iv_txn_distribute = (TRANSACTION_DISTRIBUTION) lv_env_mode;

        TMlibTrace(("TMLIB_TRACE : TMLIB::beginner_nid : TMLIB_TXN_DISTRIBUTION set to %d\n",
                    iv_txn_distribute), 2);
    }

    if (gv_tmlib.iv_txn_distribute == CLUSTER_WIDE_BEGINS)
    {
        // Simple round-robin: hand out the next nid, then advance and wrap.
        lv_chosen_nid = gv_tmlib.iv_next_nid;
        if (++gv_tmlib.iv_next_nid >= gv_tmlib.iv_node_count)
            gv_tmlib.iv_next_nid = 0;
    }

    TMlibTrace(("TMLIB_TRACE : TMLIB::beginner_nid : Beginning transaction on node %d\n",
                lv_chosen_nid), 2);
    return lv_chosen_nid;
} //beginner_nid
// ---------------------------------------------------------
// new_tag
// Purpose - allocate a new tag for a begintransaction call.
// ---------------------------------------------------------
unsigned int TMLIB::new_tag()
{
    // Hands out the next tag.  Tag 0 is never issued, and the counter wraps
    // back to 1 when it reaches MAX_TXN_TAGS.
    const bool lv_must_wrap = (iv_next_tag == 0U) || (iv_next_tag == MAX_TXN_TAGS);
    if (lv_must_wrap)
        iv_next_tag = 1U;

    const unsigned int lv_issued = iv_next_tag;
    iv_next_tag++;

    TMlibTrace(("TMLIB_TRACE : TMLIB::new_tag : Allocating tag %d, next tag %d.\n",
                lv_issued, iv_next_tag), 3);
    return lv_issued;
}
short TMLIB::setupJNI()
{
    // Fills in the Java method table, initializes the HBaseTxClient Java
    // object through JavaObjectInterfaceTM::init() and, when RMInterface
    // cleanup is enabled, resolves the static
    // RMInterface.clearTransactionStates method.  Any failure aborts the
    // process, so a normal return always carries JOI_OK.

    // Method names and JNI signatures, indexed by the JM_* enum.
    TMLibJavaMethods_[JM_CTOR ].jm_name = "<init>";
    TMLibJavaMethods_[JM_CTOR ].jm_signature = "()V";

    TMLibJavaMethods_[JM_INIT1 ].jm_name = "init";
    TMLibJavaMethods_[JM_INIT1 ].jm_signature = "(S)Z";

    TMLibJavaMethods_[JM_ABORT ].jm_name = "abortTransaction";
    TMLibJavaMethods_[JM_ABORT ].jm_signature = "(J)S";

    TMLibJavaMethods_[JM_TRYCOMMIT ].jm_name = "tryCommit";
    TMLibJavaMethods_[JM_TRYCOMMIT ].jm_signature = "(J)S";

    TMLibJavaMethods_[JM_CLEARTRANSACTIONSTATES].jm_name = "clearTransactionStates";
    TMLibJavaMethods_[JM_CLEARTRANSACTIONSTATES].jm_signature = "(J)V";

    short ret = JavaObjectInterfaceTM::init(hbasetxclient_classname, javaClass_,
                                            (JavaMethodInit*)&TMLibJavaMethods_, JM_LAST_HBASETXCLIENT, false);
    if (ret != JOI_OK)
    {
        fprintf(stderr,"JavaObjectInterfaceTM::init returned error %d. Aborting.\n",ret);
        fflush(stderr);
        abort();
    }

    // The RMInterface lookup is only needed when cleanup is enabled.
    if (!enableCleanupRMInterface())
        return ret;

    // Setup call to RMInterface.clearTransactionStates
    iv_RMInterface_class = _tlp_jenv->FindClass(rminterface_classname);
    if (iv_RMInterface_class == NULL)
    {
        fprintf(stderr,"FindClass for class name %s failed. Aborting.\n",rminterface_classname);
        fflush(stderr);
        abort();
    }

    // NOTE(review): the returned method id is not checked for NULL.
    TMLibJavaMethods_[JM_CLEARTRANSACTIONSTATES].methodID =
        _tlp_jenv->GetStaticMethodID(iv_RMInterface_class,
                                     TMLibJavaMethods_[JM_CLEARTRANSACTIONSTATES].jm_name.data(),
                                     TMLibJavaMethods_[JM_CLEARTRANSACTIONSTATES].jm_signature.data());
    return ret;
} //setupJNI
///////////////////////////////////////////////
// JNI Methods //
///////////////////////////////////////////////
short TMLIB::initConnection(short pv_nid)
{
jshort jdtmid = pv_nid;
//sleep(30);
_tlp_jenv->CallBooleanMethod(javaObj_, TMLibJavaMethods_[JM_INIT1].methodID, jdtmid);
if (getExceptionDetails(NULL)) {
tm_log_write(DTM_TM_JNI_ERROR, SQ_LOG_ERR, (char *)"TMLIB::initConnection()", (char *)_tlp_error_msg->c_str(), -1);
return RET_EXCEPTION;
}
// Ignore result and return JOI_OK
return JOI_OK;
}
void TMLIB::cleanupTransactionLocal(long transactionID)
{
initJNI();
if (enableCleanupRMInterface() == false)
return;
jlong jlv_transid = transactionID;
JOI_RetCode lv_joi_retcode = JOI_OK;
lv_joi_retcode = JavaObjectInterfaceTM::initJVM();
if (lv_joi_retcode != JOI_OK) {
fprintf(stderr, "JavaObjectInterfaceTM::initJVM returned: %d\n", lv_joi_retcode);
fflush(stderr);
abort();
}
_tlp_jenv->CallStaticVoidMethod(iv_RMInterface_class, TMLibJavaMethods_[JM_CLEARTRANSACTIONSTATES].methodID, jlv_transid);
if (getExceptionDetails(NULL)) {
tm_log_write(DTM_TM_JNI_ERROR, SQ_LOG_ERR, (char *)"TMLIB::cleanupTransactionLocal()", (char *)_tlp_error_msg->c_str(), -1);
fprintf(stderr, "clearTransactionStates raised an exception!\n");
fflush(stderr);
abort();
}
return;
} //cleanupTransactionLocal
short TMLIB::endTransactionLocal(long transactionID)
{
jlong jlv_transid = transactionID;
JOI_RetCode lv_joi_retcode = JOI_OK;
lv_joi_retcode = JavaObjectInterfaceTM::initJVM();
if (lv_joi_retcode != JOI_OK) {
fprintf(stderr,"Local commit failed. JavaObjectInterfaceTM::initJVM returned: %d\n", lv_joi_retcode);
fflush(stderr);
abort();
}
jshort jresult = _tlp_jenv->CallShortMethod(javaObj_, TMLibJavaMethods_[JM_TRYCOMMIT].methodID, jlv_transid);
if (getExceptionDetails(NULL)) {
tm_log_write(DTM_TM_JNI_ERROR, SQ_LOG_ERR, (char *)"TMLIB::endTransaction()", (char *)_tlp_error_msg->c_str(), -1);
return RET_EXCEPTION;
}
// RET_NOTX means the transaction wasn't found by the HBase client code (trx). This is ok here, it
// simply means the transaction hasn't been seen by the HBase client code, so no work was done on it.
if (jresult == RET_NOTX)
{
// printf("TMLIB::endTransactionLocal returning RET_NOTX(1) - empty txn.\n");
return RET_OK;
}
return jresult;
} //endTransactionLocal
short TMLIB::abortTransactionLocal(long transactionID)
{
jlong jlv_transid = transactionID;
JOI_RetCode lv_joi_retcode = JOI_OK;
lv_joi_retcode = JavaObjectInterfaceTM::initJVM();
if (lv_joi_retcode != JOI_OK) {
fprintf(stderr,"Local abort failed. JavaObjectInterfaceTM::initJVM returned: %d\n", lv_joi_retcode);
fflush(stderr);
abort();
}
jshort jresult = _tlp_jenv->CallShortMethod(javaObj_, TMLibJavaMethods_[JM_ABORT].methodID, jlv_transid);
if (getExceptionDetails(NULL)) {
tm_log_write(DTM_TM_JNI_ERROR, SQ_LOG_ERR, (char *)"TMLIB::abortTransaction()", (char *)_tlp_error_msg->c_str(), -1);
return RET_EXCEPTION;
}
// RET_NOTX means the transaction wasn't found by the HBase client code (trx). This is ok here, it
// simply means the transaction hasn't been seen by the HBase client code, so no work was done on it.
if (jresult == RET_NOTX)
{
return RET_OK;
}
return jresult;
} //abortTransactionLocal
// For each index in [0, iv_node_count), fetches a process handle with
// phandle_get() and, when that succeeds, passes it to
// msg_mon_close_process(). Does nothing if gv_tmlib is not initialized.
//
// Always returns true; the result of msg_mon_close_process() is not checked.
bool TMLIB::close_tm()
{
    TPT_DECL (lv_phandle);

    if (gv_tmlib.is_initialized()) {
        int lv_nid = 0;
        while (lv_nid < iv_node_count) {
            if (phandle_get(&lv_phandle, lv_nid) == true) {
                msg_mon_close_process(&lv_phandle);
            }
            ++lv_nid;
        }
    }

    return true;
}
//----------------------------------------------------------------------------
// DTM_LOCALTRANSACTION
// Purpose: Returns true if local transactions are
//          supported (the value of gv_tmlib.localBegin()), otherwise false.
//          Also returns the current transid through the out-parameters.
// Params:  pp_node   - out: node of the current transid, or -1 on error.
//          pp_seqnum - out: sequence number of the current transid, or -1
//                      on error.
// Notes:   Creates the per-thread TMLIB_ThreadTxn_Object and initializes
//          gv_tmlib on first use. Returns false with both outputs set to -1
//          when tmlib_check_active_tx() reports an error. Aborts the process
//          if the current transaction has no transid.
//----------------------------------------------------------------------------
bool DTM_LOCALTRANSACTION(int32 *pp_node, int32 *pp_seqnum)
{
    // instantiate a gp_trans_thr object for this thread if needed.
    if (gp_trans_thr == NULL)
        gp_trans_thr = new TMLIB_ThreadTxn_Object();

    if (!gv_tmlib.is_initialized())
        gv_tmlib.initialize();

    bool lv_local = gv_tmlib.localBegin();

    short lv_error = tmlib_check_active_tx();
    if (lv_error) {
        TMlibTrace(("TMLIB_TRACE : DTM_LOCALTRANSACTION returning with error %d\n",
                    lv_error), 2);
        *pp_node = *pp_seqnum = -1;
        return false;
    }

    TM_Transid *lp_transid = gp_trans_thr->get_current()->getTransid();
    if (lp_transid == NULL) {
        TMlibTrace(("TMLIB_TRACE : DTM_LOCALTRANSACTION failed, aborting\n"), 1);
        abort();
    }

    *pp_node = lp_transid->get_node();
    *pp_seqnum = lp_transid->get_seq_num();
    return lv_local;
}