blob: aca58a57d412bcdc6f759f356169fe113b9814ae [file]
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// dtm includes
#include "tmlib.h"
#include "tmlogging.h"
#include "tmtxkey.h"
// seabed includes
#include "seabed/trace.h"
extern TMLIB gv_tmlib;
// --------------------------------------------------------------------------
// TM_Transaction::TM_Transaction
// -- constructor with implicit begin
// Input parameters:
// abort_timeout - > 0 = Timeout in seconds until transaction will be aborted.
// -1 = wait forever
// 0 = system default.
// transactiontype_bits - Transaction Type bits. TBD.
// --------------------------------------------------------------------------
TM_Transaction::TM_Transaction(int abort_timeout, int64 transactiontype_bits)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::Constructor1 ENTRY\n"), 4);
iv_coordinator = true;
iv_last_error = 0;
iv_server = false;
iv_abort_timeout = abort_timeout;
iv_transactiontype = transactiontype_bits;
begin (abort_timeout, transactiontype_bits);
}
// --------------------------------------------------------------------------
// TM_Transaction::TM_Transaction
// -- constructor with implicit join
// --------------------------------------------------------------------------
TM_Transaction::TM_Transaction(TM_Transid pv_transid, bool pv_server_tx)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::Constructor2 ENTRY. transid "
"(%d,%d), server flag %d.\n", pv_transid.get_node(), pv_transid.get_seq_num(), pv_server_tx), 4);
iv_transid = pv_transid;
iv_coordinator = false;
iv_server = pv_server_tx;
iv_last_error = 0;
iv_abort_timeout = 0;
iv_transactiontype = 0;
join (iv_coordinator);
}
// --------------------------------------------------------------------------
// TM_Transaction::TM_Transaction
// -- Default constructor with no join or begin
// --------------------------------------------------------------------------
TM_Transaction::TM_Transaction()
:iv_server(false), iv_transid(0)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::Constructor3 Default ENTRY.\n"), 4);
iv_coordinator = false;
iv_last_error = 0;
iv_abort_timeout = 0;
iv_transactiontype = 0;
}
// --------------------------------------------------------------------------
// TM_Transaction::~TM_Transaction
// -- destructor
// --------------------------------------------------------------------------
TM_Transaction::~TM_Transaction()
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::Destructor ENTRY. transid "
"(%d,%d).\n", iv_transid.get_node(), iv_transid.get_seq_num()), 4);
}
// TOPL
short TM_Transaction::register_region(long startid, int port, char *hostName, int hostname_Length, long startcode, char *regionInfo, int regionInfo_Length)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
//printf ("TM_Transaction::register_region ENTRY with startid %ld \n", startid);
TMlibTrace(("TMLIB_TRACE : TM_Transaction::register_region ENTRY\n"), 2);
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
tmlib_init_req_hdr(TM_MSG_TYPE_REGISTERREGION, &lv_req);
lv_req.u.iv_register_region.iv_startid = startid;
lv_req.u.iv_register_region.iv_pid = gv_tmlib.iv_my_pid;
lv_req.u.iv_register_region.iv_nid = gv_tmlib.iv_my_nid;
iv_transid.set_external_data_type(&lv_req.u.iv_register_region.iv_transid);
lv_req.u.iv_register_region.iv_port = port;
memcpy (lv_req.u.iv_register_region.ia_hostname, hostName, hostname_Length);
lv_req.u.iv_register_region.iv_hostname_length = hostname_Length;
lv_req.u.iv_register_region.iv_startcode = startcode;
memcpy (lv_req.u.iv_register_region.ia_regioninfo2, regionInfo, regionInfo_Length);
lv_req.u.iv_register_region.iv_regioninfo_length = regionInfo_Length;
iv_last_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, iv_transid.get_node());
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::register_region returning error %d\n", iv_last_error), 1);
return iv_last_error;
}
// record tag in our param, if an error occurred, this
// will be -1, so we can do a blind copy here
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::register_region EXIT with error %d\n", iv_last_error), 2);
return iv_last_error;
}
short TM_Transaction::create_table(char* pa_tbldesc, int pv_tbldesc_len,
char* pa_tblname, char** pa_keys,
int pv_numsplits, int pv_keylen,
char* &pv_err_str, int &pv_err_len)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
int len = sizeof(Tm_Req_Msg_Type);
int len_aligned = 8*(((len + 7)/8));
int buffer_size = pv_numsplits*pv_keylen;
int total_buffer = len_aligned + buffer_size;
char *buffer = new char[total_buffer];
TMlibTrace(("TMLIB_TRACE : TM_Transaction::create_table ENTRY tablename: %s\n", pa_tblname), 1);
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
tmlib_init_req_hdr(TM_MSG_TYPE_DDLREQUEST, &lv_req);
iv_transid.set_external_data_type(&lv_req.u.iv_ddl_request.iv_transid);
memcpy(lv_req.u.iv_ddl_request.ddlreq, pa_tbldesc, pv_tbldesc_len);
lv_req.u.iv_ddl_request.ddlreq_len = pv_tbldesc_len;
lv_req.u.iv_ddl_request.crt_numsplits = pv_numsplits;
lv_req.u.iv_ddl_request.crt_keylen = pv_keylen;
lv_req.u.iv_ddl_request.ddlreq_type = TM_DDL_CREATE;
memcpy(buffer, (char*)&lv_req, len);
int i;
int index = len_aligned;
for(i=0; i<pv_numsplits; i++){
memcpy((buffer+index), pa_keys[i], pv_keylen);
index = index + pv_keylen;
}
iv_last_error = gv_tmlib.send_tm_link(buffer, total_buffer, &lv_rsp, iv_transid.get_node());
delete buffer;
if(iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::create_table returning error %d\n", iv_last_error), 1);
return iv_last_error;
}
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
if(iv_last_error)
{
int maxErrStrBufLen = sizeof(lv_rsp.u.iv_ddl_response.iv_err_str);
pv_err_len = lv_rsp.u.iv_ddl_response.iv_err_str_len < maxErrStrBufLen ? lv_rsp.u.iv_ddl_response.iv_err_str_len : maxErrStrBufLen;
pv_err_str = new char[pv_err_len];
memcpy(pv_err_str, lv_rsp.u.iv_ddl_response.iv_err_str, pv_err_len);
}
return iv_last_error;
}
short TM_Transaction::alter_table(char * pa_tblname, int pv_tblname_len,
char ** pv_tbloptions, int pv_tbloptslen, int pv_tbloptscnt,
char* &pv_err_str, int &pv_err_len)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::alter_table ENTRY tablename: %s\n", pa_tblname), 1);
int len = sizeof(Tm_Req_Msg_Type);
int len_aligned = 8*(((len + 7)/8));
int buffer_size = pv_tbloptslen*pv_tbloptscnt;
int total_buffer = len_aligned + buffer_size;
char *buffer = new char[total_buffer];
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
tmlib_init_req_hdr(TM_MSG_TYPE_DDLREQUEST, &lv_req);
iv_transid.set_external_data_type(&lv_req.u.iv_ddl_request.iv_transid);
memcpy(lv_req.u.iv_ddl_request.ddlreq, pa_tblname, pv_tblname_len);
lv_req.u.iv_ddl_request.ddlreq_len = pv_tblname_len;
lv_req.u.iv_ddl_request.alt_numopts = pv_tbloptscnt;
lv_req.u.iv_ddl_request.alt_optslen = pv_tbloptslen;
lv_req.u.iv_ddl_request.ddlreq_type = TM_DDL_ALTER;
memcpy(buffer, (char*)&lv_req, len);
int i;
int index = len_aligned;
for(i=0; i<pv_tbloptscnt; i++){
memcpy((buffer+index), pv_tbloptions[i], pv_tbloptslen);
index = index + pv_tbloptslen;
}
short lv_last_error = gv_tmlib.send_tm_link(buffer, total_buffer, &lv_rsp, iv_transid.get_node());
delete buffer;
if(lv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::alter_table returning error %d\n", iv_last_error), 1);
return lv_last_error;
}
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
if(iv_last_error)
{
int maxErrStrBufLen = sizeof(lv_rsp.u.iv_ddl_response.iv_err_str);
pv_err_len = lv_rsp.u.iv_ddl_response.iv_err_str_len < maxErrStrBufLen ? lv_rsp.u.iv_ddl_response.iv_err_str_len : maxErrStrBufLen;
pv_err_str = new char[pv_err_len];
memcpy(pv_err_str, lv_rsp.u.iv_ddl_response.iv_err_str, pv_err_len);
}
return iv_last_error;
}
short TM_Transaction::reg_truncateonabort(char* pa_tblname, int pv_tblname_len,
char* &pv_err_str, int &pv_err_len)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::reg_truncateonabort ENTRY tablename: %s\n", pa_tblname), 1);
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
tmlib_init_req_hdr(TM_MSG_TYPE_DDLREQUEST, &lv_req);
iv_transid.set_external_data_type(&lv_req.u.iv_ddl_request.iv_transid);
memcpy(lv_req.u.iv_ddl_request.ddlreq, pa_tblname, pv_tblname_len);
lv_req.u.iv_ddl_request.ddlreq_len = pv_tblname_len;
lv_req.u.iv_ddl_request.ddlreq_type = TM_DDL_TRUNCATE;
iv_last_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, iv_transid.get_node());
if(iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::reg_truncateonabort returning error %d\n", iv_last_error), 1);
return iv_last_error;
}
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
if(iv_last_error)
{
int maxErrStrBufLen = sizeof(lv_rsp.u.iv_ddl_response.iv_err_str);
pv_err_len = lv_rsp.u.iv_ddl_response.iv_err_str_len < maxErrStrBufLen ? lv_rsp.u.iv_ddl_response.iv_err_str_len : maxErrStrBufLen;
pv_err_str = new char[pv_err_len];
memcpy(pv_err_str, lv_rsp.u.iv_ddl_response.iv_err_str, pv_err_len);
}
return iv_last_error;
}
short TM_Transaction::drop_table(char* pa_tblname, int pv_tblname_len,
char* &pv_err_str, int &pv_err_len)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::drop_table ENTRY tablename: %s\n", pa_tblname), 1);
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
tmlib_init_req_hdr(TM_MSG_TYPE_DDLREQUEST, &lv_req);
iv_transid.set_external_data_type(&lv_req.u.iv_ddl_request.iv_transid);
memcpy(lv_req.u.iv_ddl_request.ddlreq, pa_tblname, pv_tblname_len);
lv_req.u.iv_ddl_request.ddlreq_len = pv_tblname_len;
lv_req.u.iv_ddl_request.ddlreq_type = TM_DDL_DROP;
iv_last_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, iv_transid.get_node());
if(iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::drop_table returning error %d\n", iv_last_error), 1);
return iv_last_error;
}
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
if(iv_last_error)
{
int maxErrStrBufLen = sizeof(lv_rsp.u.iv_ddl_response.iv_err_str);
pv_err_len = lv_rsp.u.iv_ddl_response.iv_err_str_len < maxErrStrBufLen ? lv_rsp.u.iv_ddl_response.iv_err_str_len : maxErrStrBufLen;
pv_err_str = new char[pv_err_len];
memcpy(pv_err_str, lv_rsp.u.iv_ddl_response.iv_err_str, pv_err_len);
}
return iv_last_error;
}
// --------------------------------------------------------------------------
// TM_Transaction::begin
// -- begin transaction, private method
// Added local transaction support for Trafodion
// --------------------------------------------------------------------------
short TM_Transaction::begin(int abort_timeout, int64 transactiontype_bits)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::begin ENTRY\n"), 2);
// Validate abort_timeout
if (abort_timeout != -1 && abort_timeout < 0)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::begin : abort_timeout not -1 or > 0\n"), 1);
iv_last_error = FEBOUNDSERR;
return iv_last_error;
}
// We don't validate transactiontype flags
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
// if there is already an active tx, switch it out
// allows user to do multiple BEGINS
if (gp_trans_thr->get_current() != NULL)
gp_trans_thr->get_current()->release();
// Bypass BEGINTRANSACTION call for Trafodion local transactions.
if (gv_tmlib.localBegin()) {
CTmTxKey *lp_txid = new CTmTxKey(gv_tmlib.iv_my_nid, gv_tmlib.seqNum()->nextSeqNum());
iv_transid = lp_txid->id();
iv_tag = lp_txid->seqnum();
gp_trans_thr->add_trans(this);
delete lp_txid;
//printf("Local begin bypassing begin txn (%d,%d)\n", lp_txid->node(), lp_txid->seqnum());
}
else { //Begin in TM
tmlib_init_req_hdr(TM_MSG_TYPE_BEGINTRANSACTION, &lv_req);
lv_req.u.iv_begin_trans.iv_pid = gv_tmlib.iv_my_pid;
lv_req.u.iv_begin_trans.iv_nid = gv_tmlib.iv_my_nid;
lv_req.u.iv_begin_trans.iv_abort_timeout = abort_timeout;
lv_req.u.iv_begin_trans.iv_transactiontype_bits = transactiontype_bits;
iv_last_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, gv_tmlib.beginner_nid());
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::begin returning error %d\n", iv_last_error), 1);
return iv_last_error;
}
// record tag in our param, if an error occurred, this
// will be -1, so we can do a blind copy here
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
if (!iv_last_error)
{
// record as the active tx
iv_transid = lv_rsp.u.iv_begin_trans.iv_transid;
iv_tag = gv_tmlib.new_tag();
gp_trans_thr->add_trans(this);
}
}
TMlibTrace(("TMLIB_TRACE : TM_Transaction::begin (seq num %d, tag %d) EXIT\n",
iv_transid.get_seq_num(), iv_tag), 2);
return iv_last_error;
}
// --------------------------------------------------------------------------
// TM_Transaction::end
// -- end transaction
// Trafodion: added support for local transactions.
// --------------------------------------------------------------------------
short TM_Transaction::end(char* &pv_err_str, int &pv_err_len)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::end (seq num %d) ENTRY\n", iv_transid.get_seq_num()), 2);
iv_last_error = tmlib_check_active_tx();
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::end returning error %d \n", iv_last_error), 1);
return iv_last_error;
}
iv_last_error = tmlib_check_outstanding_ios();
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::end returning error %d \n", iv_last_error), 1);
return iv_last_error;
}
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
if (gv_tmlib.localBegin()) {
//printf("Local begin ENDTRANSACTION %d\n", iv_transid.get_seq_num());
gv_tmlib.initJNI();
short lv_HBerror = gv_tmlib.endTransactionLocal((int64)iv_transid.get_native_type());
iv_last_error = HBasetoTxnError(lv_HBerror);
if (iv_last_error)
{
printf("TM_Transaction::end returning error (%d,%d)\n",iv_last_error, lv_HBerror);
TMlibTrace(("TMLIB_TRACE : TM_Transaction::end local txn returning error (%d, %d).\n", iv_last_error, lv_HBerror), 1);
return iv_last_error;
}
}
else {
tmlib_init_req_hdr(TM_MSG_TYPE_ENDTRANSACTION, &lv_req);
lv_req.u.iv_end_trans.iv_tag = iv_transid.get_seq_num();
lv_req.u.iv_end_trans.iv_pid = gv_tmlib.iv_my_pid;
lv_req.u.iv_end_trans.iv_nid = gv_tmlib.iv_my_nid;
iv_transid.set_external_data_type(&lv_req.u.iv_end_trans.iv_transid);
iv_last_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, iv_transid.get_node());
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::end returning error %d \n", iv_last_error), 1);
return iv_last_error;
}
// so far, FEJOINSOUSTANDING is the only retryable error
// We want to clean up for things like FEOK, FENOTRANSID, FEINVTRANSID
// if its FEDEVICEDOWNFORTMF, then it will get aborted as that means the
// owner TM is down.
// FEABORTEDTRANSID & FEENDEDTRANSID is ok,
if ((lv_rsp.iv_msg_hdr.miv_err.error == FEINVTRANSID) ||
(lv_rsp.iv_msg_hdr.miv_err.error == FENOTRANSID) ||
(lv_rsp.iv_msg_hdr.miv_err.error == FEOK) ||
(lv_rsp.iv_msg_hdr.miv_err.error == FEABORTEDTRANSID) ||
(lv_rsp.iv_msg_hdr.miv_err.error == FEENDEDTRANSID))
{
gv_tmlib.clear_entry (iv_transid , false, true);
gp_trans_thr->remove_trans (this);
}
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
if(iv_last_error)
{
int maxErrStrBufLen = sizeof(lv_rsp.u.iv_end_trans.iv_err_str);
pv_err_len = lv_rsp.u.iv_end_trans.iv_err_str_len < maxErrStrBufLen ? lv_rsp.u.iv_end_trans.iv_err_str_len : maxErrStrBufLen;
pv_err_str = new char[pv_err_len];
memcpy(pv_err_str, lv_rsp.u.iv_end_trans.iv_err_str, pv_err_len);
}
}
TMlibTrace(("TMLIB_TRACE : TM_Transaction::end (seq num %d) EXIT\n", iv_transid.get_seq_num()), 2);
return iv_last_error;
}
// --------------------------------------------------------------------------
// TM_Transaction::abort
// -- abort transaction
// --------------------------------------------------------------------------
short TM_Transaction::abort(bool pv_doom)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::abort (seq num %d) ENTRY\n", iv_transid.get_seq_num()), 2);
iv_last_error = tmlib_check_active_tx();
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::abort returning error %d \n", iv_last_error), 1);
return iv_last_error;
}
// SQL needs to be
// able to call ABORTTRANSACTION while having outstanding IOs.
/*
iv_last_error = tmlib_check_outstanding_ios();
if (iv_last_error)
return iv_last_error;
*/
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
if (gv_tmlib.localBegin()) {
//printf("Local begin ABORTTRANSACTION %d\n", iv_transid.get_seq_num());
gv_tmlib.initJNI();
short lv_HBerror = gv_tmlib.abortTransactionLocal((int64) iv_transid.get_native_type());
iv_last_error = HBasetoTxnError(lv_HBerror);
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::abort local txn returning error (%d, %d).\n", iv_last_error, lv_HBerror), 1);
return iv_last_error;
}
}
else {
if (pv_doom)
tmlib_init_req_hdr(TM_MSG_TYPE_DOOMTX, &lv_req);
else
tmlib_init_req_hdr(TM_MSG_TYPE_ABORTTRANSACTION, &lv_req);
lv_req.u.iv_abort_trans.iv_tag = iv_transid.get_seq_num();
lv_req.u.iv_abort_trans.iv_nid = gv_tmlib.iv_my_nid;
lv_req.u.iv_abort_trans.iv_pid = gv_tmlib.iv_my_pid;
iv_transid.set_external_data_type (&lv_req.u.iv_abort_trans.iv_transid);
iv_last_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, iv_transid.get_node());
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::abort returning error %d \n", iv_last_error), 1);
return iv_last_error;
}
// so far, FEJOINSOUSTANDING is the only retryable error
// We want to clean up for things like FEOK, FENOTRANSID, FEINVTRANSID
// if its FEDEVICEDOWNFORTMF, then it will get aborted as that means the
// owner TM is down.
// FEABORTEDTRANSID & FEENDEDTRANSID is ok,
if ((lv_rsp.iv_msg_hdr.miv_err.error == FEINVTRANSID) ||
(lv_rsp.iv_msg_hdr.miv_err.error == FENOTRANSID) ||
(lv_rsp.iv_msg_hdr.miv_err.error == FEOK) ||
(lv_rsp.iv_msg_hdr.miv_err.error == FEABORTEDTRANSID) ||
(lv_rsp.iv_msg_hdr.miv_err.error == FEENDEDTRANSID))
{
if (!pv_doom)
{
gv_tmlib.clear_entry(iv_transid, false, true);
gp_trans_thr->remove_trans (this);
}
}
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
}
TMlibTrace(("TMLIB_TRACE : TM_Transaction::abort (seq num %d) EXIT\n", iv_transid.get_seq_num()), 2);
return iv_last_error;
}
// --------------------------------------------------------------------------
// TM_Transaction::join
// -- join an existing transaction, private method
// --------------------------------------------------------------------------
short TM_Transaction::join(bool pv_coordinator_role)
{
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::join ENTRY\n"), 2);
if (!gv_tmlib_initialized)
gv_tmlib.initialize();
// if there is already an active tx, switch it out
// allows user to do multiple JOINS
if (gp_trans_thr->get_current() != NULL)
{
if (gp_trans_thr->get_current()->equal(iv_transid))
{
iv_last_error = FEALREADYJOINED;
TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT returning error %d for seq num %d\n",
iv_last_error, iv_transid.get_seq_num()), 1);
return iv_last_error;
}
TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT called with active tx\n"), 3);
RESUMETRANSACTION(0);
}
// no joins or enlists
if (iv_server)
{
iv_last_error = FEOK;
iv_tag = gv_tmlib.new_tag();
gp_trans_thr->add_trans(this);
TMlibTrace(("TMLIB_TRACE : TM_Transaction::join, server EXIT\n"), 2);
return iv_last_error;
}
// do a join
tmlib_init_req_hdr(TM_MSG_TYPE_JOINTRANSACTION, &lv_req);
iv_transid.set_external_data_type(&lv_req.u.iv_join_trans.iv_transid);
lv_req.u.iv_join_trans.iv_coord_role = pv_coordinator_role;
lv_req.u.iv_join_trans.iv_pid = gv_tmlib.iv_my_pid;
lv_req.u.iv_join_trans.iv_nid = gv_tmlib.iv_my_nid;
iv_last_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, iv_transid.get_node());
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : JOINTRANSACTION_EXT returning error %d for seq num %d\n",
iv_last_error, iv_transid.get_seq_num()), 1);
return iv_last_error;
}
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : Join returning error %d for txn seq num %d\n",
lv_rsp.iv_msg_hdr.miv_err.error, iv_transid.get_seq_num()), 1);
return iv_last_error;
}
iv_coordinator = pv_coordinator_role;
iv_tag = gv_tmlib.new_tag();
gp_trans_thr->add_trans(this);
if (!gv_tmlib.is_enlisted (iv_transid.get_node()))
{
TMlibTrace(("TMLIB_TRACE : TM_transaction::join, adding enlist for node %d\n", iv_transid.get_node()), 3);
TM_Transid_Type lv_null_transid;
lv_null_transid.id[0] = lv_null_transid.id[1] = lv_null_transid.id[2] =
lv_null_transid.id[3] = 0;
msg_mon_trans_enlist (iv_transid.get_node(), lv_rsp.u.iv_join_trans.iv_tm_pid,
lv_null_transid);
gv_tmlib.enlist(iv_transid.get_node());
}
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
TMlibTrace(("TMLIB_TRACE : TM_transaction::join (seq num %d) EXIT\n", iv_transid.get_seq_num()), 2);
return iv_last_error;
}
// --------------------------------------------------------------------------
// TM_Transaction::suspend
// -- suspend the transaction, will remove reference to it in the library
// --------------------------------------------------------------------------
short TM_Transaction::suspend(TM_Transid *pp_transid, bool pv_coordinator_role)
{
char la_buf[DTM_STRING_BUF_SIZE];
TM_Transid lp_ext_transid;
short lv_error = FEOK;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::suspend ENTRY \n"), 2);
iv_last_error = tmlib_check_miss_param(pp_transid);
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::suspend returning error %d for seq num %d \n",
iv_last_error, iv_transid.get_seq_num()), 1);
return iv_last_error;
}
iv_last_error = tmlib_check_active_tx();
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::suspend returning error %d for seq num %d \n",
iv_last_error, iv_transid.get_seq_num()), 1);
return iv_last_error;
}
iv_last_error = tmlib_check_outstanding_ios();
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::suspend returning error %d for seq num %d (IGNORED)\n",
iv_last_error, iv_transid.get_seq_num()), 1);
iv_last_error = FEOK; //12/1/10 Temp change to ignore error 81.
//return iv_last_error;
}
if (!gv_tmlib_initialized)
{
gv_tmlib.initialize();
}
// they did not join and hence cannot suspend
if (gp_trans_thr->get_current_propagated() == true)
{
iv_last_error = FETXSUSPENDREJECTED;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::suspend returning error %d for seq num %d \n",
iv_last_error, iv_transid.get_seq_num()), 1);
return iv_last_error;
}
memcpy (pp_transid, iv_transid.get_data_address(), sizeof (TM_Transid_Type));
if (pv_coordinator_role == false)
{
if (!isEnder())
{
sprintf(la_buf, "Transaction with coordinator role of false is marked as the coordinator.\n");
tm_log_write(DTM_TRANS_ENDER, SQ_LOG_CRIT, la_buf);
TMlibTrace(("TMLIB_TRACE : TM_Transaction::suspend : %s, seq num %d\n",
la_buf, iv_transid.get_seq_num()), 1);
abort();
}
iv_coordinator = pv_coordinator_role; // TODO - errors in suspend
}
if (isEnder())
{
gp_trans_thr->set_current_suspended(true);
gv_tmlib.clear_entry (iv_transid, false, true);
}
else
{
lv_error = tmlib_send_suspend(iv_transid, pv_coordinator_role,gv_tmlib.iv_my_pid);
if (gp_trans_thr->get_current_ios() == 0)
{
gv_tmlib.clear_entry (iv_transid, false, true);
gp_trans_thr->remove_trans (this);
}
else
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::suspend : seq num %d not removing txn "
"because there are still %d outstanding ios.\n",
iv_transid.get_seq_num(), gp_trans_thr->get_current_ios()), 3);
}
iv_last_error = lv_error;
}
TMlibTrace(("TMLIB_TRACE : TM_Transaction::suspend (seq num %d) returning %dEXIT\n",
iv_transid.get_seq_num(), iv_last_error), 2);
return iv_last_error;
}
// --------------------------------------------------------------------------
// TM_Transaction::resume
// -- resume previosly joined or begun transaction
// --------------------------------------------------------------------------
short TM_Transaction::resume()
{
char la_buf[DTM_STRING_BUF_SIZE];
TMlibTrace(("TMLIB_TRACE : TM_Transaction::resume ENTRY\n"), 2);
TM_Transaction *lp_trans = gp_trans_thr->get_trans (iv_transid.get_native_type());
if (lp_trans == NULL)
{
sprintf(la_buf, "Unable to find transaction to resume.\n");
tm_log_write(DTM_TRANS_BAD_TRANS_SEQ, SQ_LOG_CRIT, la_buf);
TMlibTrace(("TMLIB_TRACE : TM_Transaction::resume - %s \n", la_buf), 1);
abort();
}
gp_trans_thr->set_current (lp_trans);
iv_last_error = FEOK;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::resume EXIT\n"), 2);
return iv_last_error;
}
// --------------------------------------------------------------------------
// TM_Transaction::release
// -- release transaction (wont delete, it will just not be current anymore)
// --------------------------------------------------------------------------
TM_Transaction* TM_Transaction::release()
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::release ENTRY\n"), 2);
gp_trans_thr->set_current (NULL);
iv_last_error = FEOK;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::release EXIT\n"), 2);
return this;
}
// --------------------------------------------------------------------------
// TM_Transaction::status
// -- return status of transaction
// --------------------------------------------------------------------------
short TM_Transaction::status(short *pp_status)
{
TM_Transaction *lp_trans = NULL;
Tm_Req_Msg_Type lv_req;
Tm_Rsp_Msg_Type lv_rsp;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::status ENTRY\n"), 2);
iv_last_error = tmlib_check_miss_param (pp_status);
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::status returning error %d \n",
iv_last_error), 1);
return iv_last_error;
}
iv_last_error = tmlib_check_active_tx();
if (iv_last_error)
{
TMlibTrace(("TMLIB_TRACE : TM_Transaction::status returning error %d \n",
iv_last_error), 1);
return iv_last_error;
}
if (!gv_tmlib_initialized)
{
gv_tmlib.initialize();
}
// if the use passed in a transid, we must use that and make sure
// we go to the correct DTM
lp_trans = gp_trans_thr->get_trans (iv_transid.get_native_type());
if (lp_trans == NULL)
{
iv_last_error = FEINVTRANSID;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::status returning error %d\n", iv_last_error), 1);
return iv_last_error;
}
tmlib_init_req_hdr(TM_MSG_TYPE_STATUSTRANSACTION, &lv_req);
iv_transid.set_external_data_type(&lv_req.u.iv_status_trans.iv_transid);
iv_last_error = gv_tmlib.send_tm(&lv_req, &lv_rsp, iv_transid.get_node());
if (iv_last_error)
return iv_last_error;
*pp_status = lv_rsp.u.iv_status_trans.iv_status;
iv_last_error = lv_rsp.iv_msg_hdr.miv_err.error;
TMlibTrace(("TMLIB_TRACE : TM_Transaction::status EXIT\n"), 2);
return iv_last_error;
}
// --------------------------------------------------------------------------
// TM_Transaction::getTransid
// -- Return transid
// --------------------------------------------------------------------------
TM_Transid * TM_Transaction::getTransid()
{
TMlibTrace(("TMLIB_TRACE : GETTRANSID_EXT ENTRY/EXIT\n"), 2);
iv_last_error = FEOK;
return &iv_transid;
}
// --------------------------------------------------------------------------
// TM_Transaction::getCurrent
// Return a pointer to the current transaction object (not specifically this
// transaction!).
// --------------------------------------------------------------------------
TM_Transaction * TM_Transaction::getCurrent()
{
return gp_trans_thr->get_current();
}
// --------------------------------------------------------------------------
// TM_Transaction::getTypeFlags
// Returns the transaction type flags for this transaction.
// --------------------------------------------------------------------------
int64 TM_Transaction::getTypeFlags()
{
return iv_transid.get_type_flags();
}