blob: 233fdce3504af31a9109c3213b492af4289986ae [file] [log] [blame]
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <sys/types.h>
#include "seabed/ms.h"
#include "seabed/trace.h"
#include "tmlogging.h"
#include "tmtx.h"
#include "tmrmhbase.h"
#include "tminfo.h"
#include "hbasetm.h"
#include "tmtxbranches.h"
// ----------------------------------------------------------------
// CTmTxBranches::num_rm_failed
// Purpose : Count and return the number of RMs in failed
// state. Currently returning 0.
// Periodically the TM will attempt to reopen any failed
// RMs. This is done by the Timer thread.
// ----------------------------------------------------------------
int32 CTmTxBranches::num_rm_failed(CTmTxBase *pp_txn)
{
int32 lv_count = 0;
TMTrace (2, ("CTmTxBranches::num_rm_failed ENTRY.\n"));
lv_count = iv_TSEBranches.num_rm_failed(pp_txn);
lv_count += iv_HBASEBranches.num_rm_failed(pp_txn);
TMTrace (2, ("CTmTxBranches::num_rm_failed EXIT Returned %d failed RMs.\n", lv_count));
return lv_count;
} //CTmTxBranches::num_rm_failed
// ----------------------------------------------------------------
// CTmTxBranches::num_rm_partic
// Purpose : Returns the number of participating branches.
// ----------------------------------------------------------------
int32 CTmTxBranches::num_rm_partic(CTmTxBase *pp_txn)
{
int32 lv_count = 0;
lv_count = iv_TSEBranches.num_rm_partic(pp_txn);
lv_count += iv_HBASEBranches.num_rm_partic(pp_txn);
TMTrace (2, ("CTmTxBranches::num_rm_partic EXIT Returned %d participating RMs.\n", lv_count));
return lv_count;
} //CTmTxBranches::num_rm_partic
// ----------------------------------------------------------------
// CTmTxBranches::num_rms_unresolved
// Purpose : Returns the number of RMs/branches which have the
// iv_partic flag set but don't have the iv_resolved flag set.
// This is used to identify late TSE checkins (ax_reg requests)
// while processing phase 1 & 2.
// ----------------------------------------------------------------
int32 CTmTxBranches::num_rms_unresolved(CTmTxBase *pp_txn)
{
int32 lv_count = 0;
lv_count = iv_TSEBranches.num_rms_unresolved(pp_txn);
lv_count += iv_HBASEBranches.num_rms_unresolved(pp_txn);
TMTrace (2, ("CTmTxBranches::num_rms_unresolved EXIT Returned %d unresolved RMs.\n", lv_count));
return lv_count;
} //CTmTxBranches::num_rms_unresolved
// ----------------------------------------------------------------
// CTmTxBranches::reset_resolved
// Purpose : Reset all branches resolved flags. This is used to
// reset the flags after prepare phase to all commit_branches() to
// find any ax_reg requests which arrive during phase 2.
// ----------------------------------------------------------------
void CTmTxBranches::reset_resolved(CTmTxBase *pp_txn)
{
iv_TSEBranches.reset_resolved(pp_txn);
iv_HBASEBranches.reset_resolved(pp_txn);
TMTrace (2, ("CTmTxBranches::reset_resolved EXIT.\n"));
} //CTmTxBranches::reset_resolved
// ----------------------------------------------------------------
// CTmTxBranches::init_rms
// Purpose : Initialize branches
// ----------------------------------------------------------------
void CTmTxBranches::init_rms(CTmTxBase *pp_txn, bool pv_partic)
{
TMTrace (2, ("CTmTxBranches::init_rms ENTRY Txn ID (%d,%d), partic? %d.\n",
pp_txn->node(), pp_txn->seqnum(), pv_partic));
iv_TSEBranches.init_rms(pp_txn, pv_partic);
iv_HBASEBranches.init_rms(pp_txn, pv_partic);
TMTrace (2, ("CTmTxBranches::init_rms EXIT.\n"));
} //CTmTxBranches::init_rms
// --------------------------------------------------------------
// Branch stuff below
// --------------------------------------------------------------
// ---------------------------------------------------------------------------
// rollback_branches
// Purpose : Pass the rollback request to HBase TM Library
// ---------------------------------------------------------------------------
int32 CTmTxBranches::rollback_branches (CTmTxBase *pp_txn,
int64 pv_flags,
CTmTxMessage * pp_msg,
bool pv_error_condition)
{
int32 lv_err = FEOK, lv_err1 = RET_OK;
TMTrace (2, ("CTmTxBranches::rollback_branches, Txn ID (%d,%d), ENTRY, flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
iv_HBASEBranches.rollback_branches(pp_txn, pv_flags, pp_msg, pv_error_condition);
iv_TSEBranches.rollback_branches(pp_txn, pv_flags, pp_msg, pv_error_condition);
// For situations where the only branches hare in HBase, we can reply to the application which called abort now and
// then wait for the aborts to complete. If we had other branches, then we may have already replied!
if (pp_msg->replyPending())
pp_msg->reply();
// Now wait for the HBase library to complete it's work.
lv_err1 = completeRequest_branches(pp_txn);
lv_err = (lv_err1!=RET_OK)?XAER_RMERR:XA_OK;
TMTrace (2, ("CTmTxBranches::rollback_branches, Txn ID (%d,%d), EXIT, "
"UnResolved branches %d, return error %d, HBase client error %d.\n",
pp_txn->node(), pp_txn->seqnum(), num_rms_unresolved(pp_txn), lv_err, lv_err1));
return lv_err;
} //rollback_branches
// ---------------------------------------------------------------------------
// commit_branches
// Purpose : Send out commit (phase 2) to HBase TM Library.
// ---------------------------------------------------------------------------
int32 CTmTxBranches::commit_branches (CTmTxBase *pp_txn,
int64 pv_flags, CTmTxMessage * pp_msg)
{
int32 lv_err = XA_OK, lv_err1 = RET_OK, lv_err2 = XA_OK;
TMTrace (2, ("CTmTxBranches::commit_branches, Txn ID (%d,%d), ENTRY, flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
lv_err1 = iv_HBASEBranches.commit_branches(pp_txn, pv_flags, pp_msg);
lv_err2 = iv_TSEBranches.commit_branches(pp_txn, pv_flags, pp_msg);
lv_err = (lv_err1!=RET_OK)?XAER_RMERR:((lv_err2!=XA_OK)?lv_err2:XA_OK);
TMTrace (2, ("CTmTxBranches::commit_branches, Txn ID (%d,%d), EXIT, HBase branch returned %d, "
"TSE Branch returned %d, Composite result %d. UnResolved branches %d.\n",
pp_txn->node(), pp_txn->seqnum(), lv_err1, lv_err2, lv_err, num_rms_unresolved(pp_txn)));
return lv_err;
} // commit_branches
// ---------------------------------------------------------------------------
// completeRequest_branches
// Purpose : Wait for Phase 2 to complete in the HBase TM Library.
// This isn't required for other branch types at this time because
// they reply early directly within the branch code.
// ---------------------------------------------------------------------------
int32 CTmTxBranches::completeRequest_branches (CTmTxBase *pp_txn)
{
int32 lv_err = XA_OK, lv_err1 = RET_OK;
TMTrace (2, ("CTmTxBranches::completeRequest_branches, Txn ID (%d,%d), ENTRY\n",
pp_txn->node(), pp_txn->seqnum()));
lv_err1 = iv_HBASEBranches.completeRequest_branches(pp_txn);
lv_err = (lv_err1!=RET_OK)?XAER_RMERR:XA_OK;
TMTrace (2, ("CTmTxBranches::completeRequest_branches, Txn ID (%d,%d), EXIT, HBase branch returned %d.\n",
pp_txn->node(), pp_txn->seqnum(), lv_err1));
return lv_err;
} // completeRequest_branches
// ---------------------------------------------------------------------------
// end_branches
// Purpose - Doesn't really do anything for HBase TM Library
// ---------------------------------------------------------------------------
int32 CTmTxBranches::end_branches (CTmTxBase *pp_txn,
int64 pv_flags)
{
int32 lv_err = FEOK;
TMTrace (2, ("CTmTxBranches::end_branches, Txn ID (%d,%d), ENTRY, flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
iv_HBASEBranches.end_branches(pp_txn, pv_flags);
lv_err = iv_TSEBranches.end_branches(pp_txn, pv_flags);
TMTrace (2, ("CTmTxBranches::end_branches, Txn ID (%d,%d), EXIT, UnResolved branches %d.\n",
pp_txn->node(), pp_txn->seqnum(), num_rms_unresolved(pp_txn)));
return lv_err;
} //end_branches
// ---------------------------------------------------------------
// forget_heur_branches
// Purpose : Heuristic forget
// --------------------------------------------------------------
int32 CTmTxBranches::forget_heur_branches (CTmTxBase *pp_txn,
int64 pv_flags)
{
int32 lv_error = FEOK;
TMTrace (2, ("CTmTxBranches::forget_heur_branches ENTRY : ID (%d,%d), flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
tm_log_event(DTM_TMTX_FORGET_HEURISTIC, SQ_LOG_WARNING, "DTM_TMTX_FORGET_HEURISTIC",
-1,-1,pp_txn->node(), pp_txn->seqnum());
gv_tm_info.write_trans_state (pp_txn->transid(), TM_TX_STATE_FORGOTTEN_HEUR, pp_txn->abort_flags(), false);
pp_txn->wrote_trans_state(true);
if (pp_txn->tx_state() != TM_TX_STATE_FORGOTTEN_HEUR)
{
lv_error = forget_branches (pp_txn, pv_flags);
switch (lv_error)
{
case XA_OK:
pp_txn->tx_state(TM_TX_STATE_FORGOTTEN_HEUR);
break;
case XAER_RMFAIL:
//TODO recovery case, what to do here?
abort();
default:
tm_log_event(DTM_TMTX_INVALID_BRANCH, SQ_LOG_CRIT, "DTM_TMTX_INVALID_BRANCH");
TMTrace (1, ("CTmTxBranches::forget_heur_branches - Invalid branch state\n"));
abort ();
break;
}
}
return lv_error;
} // forget_heur_branches
// ---------------------------------------------------------------
// forget_branches
// Purpose : all RMs have responded, so forget this tx.
// Not passed to HBase TM Library.
// --------------------------------------------------------------
int32 CTmTxBranches::forget_branches (CTmTxBase *pp_txn,
int64 pv_flags)
{
int32 lv_err = FEOK;
TMTrace (2, ("CTmTxBranches::forget_branches, Txn ID (%d,%d), ENTRY, flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
iv_TSEBranches.forget_branches(pp_txn, pv_flags);
iv_HBASEBranches.forget_branches(pp_txn, pv_flags);
TMTrace (2, ("CTmTxBranches::forget_branches, Txn ID (%d,%d), EXIT, UnResolved branches %d.\n",
pp_txn->node(), pp_txn->seqnum(), num_rms_unresolved(pp_txn)));
return lv_err;
} //forget_branches
// ------------------------------------------------------------
// prepare_branches
// Purpose : Send prepare to HBase TM Library
// Only return read-only if all subordinate branch
// types return read-only. Other than that, we
// always return an error response from non-HBase
// branches to maintain compatibility with
// older code.
// HBase branches return
// Ok ReadOnly Error1
// +----------------------------------
// RM Ok | Ok Ok Error1
// branches RO | Ok RO Error1
// return Error2 | Error2 Error2 Error2
// ------------------------------------------------------------
int32 CTmTxBranches::prepare_branches (CTmTxBase *pp_txn,
int64 pv_flags,
CTmTxMessage *pp_msg)
{
int32 lv_err = XA_OK, lv_err1 = RET_OK, lv_err2 = XA_OK;
TMTrace (2, ("CTmTxBranches::prepare_branches, Txn ID (%d,%d), ENTRY, flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
lv_err1 = iv_HBASEBranches.prepare_branches(pp_txn, pv_flags, pp_msg);
lv_err2 = iv_TSEBranches.prepare_branches(pp_txn, pv_flags, pp_msg);
// The transaction is only read-only if both branch types return read-only.
if (lv_err1 == RET_READONLY &&
(lv_err2 == XA_RDONLY || lv_err2 == XAER_NOTA))
// Right now we return XA_OK here and the transaction state
// machine works out whether there are any participants left
// in the transaction and decides whether the transaction
// is read-only on that basis.
// lv_err = XA_RDONLY;
lv_err = XA_OK;
else
{
// If either of the branch types returned read-only then clear because both
// must be read-only.
if (lv_err1 == RET_READONLY)
lv_err1 = RET_OK;
if (lv_err2 == XA_RDONLY || lv_err2 == XAER_NOTA)
lv_err2 = XA_OK;
//lv_err = (lv_err1!=RET_OK)?XAER_RMERR:((lv_err2!=XA_OK)?lv_err2:XA_OK);
lv_err = (lv_err2!=RET_OK)?XAER_RMERR:((lv_err1!=XA_OK)?lv_err1:XA_OK);
}
// If an error occurred then make sure we drive rollback
if (lv_err != XA_OK && lv_err != XA_RDONLY)
pp_txn->mark_for_rollback(true);
TMTrace (2, ("CTmTxBranches::prepare_branches, Txn ID (%d,%d), EXIT, HBase branch returned %d, "
"TSE Branch returned %d. Result %d, UnResolved branches %d.\n",
pp_txn->node(), pp_txn->seqnum(), lv_err1, lv_err2, lv_err, num_rms_unresolved(pp_txn)));
return lv_err;
} //prepare_branches
//------------------------------------------------------------------------------
// start_branches
// Purpose - Handles start request for all branches
// Library.
//------------------------------------------------------------------------------
int32 CTmTxBranches::start_branches (CTmTxBase *pp_txn, int64 pv_flags, CTmTxMessage * pp_msg)
{
int32 lv_err = FEOK;
TMTrace (2, ("CTmTxBranches::start_branches, Txn ID (%d,%d), ENTRY, flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
iv_TSEBranches.start_branches(pp_txn, pv_flags, pp_msg);
iv_HBASEBranches.start_branches(pp_txn, pv_flags, pp_msg);
TMTrace (2, ("CTmTxBranches::start_branches, Txn ID (%d,%d), EXIT, UnResolved branches %d.\n",
pp_txn->node(), pp_txn->seqnum(), num_rms_unresolved(pp_txn)));
return lv_err;
} // start_branches
//------------------------------------------------------------------------------
// registerRegion
// Purpose - Handles start request for all branches
// Library.
//------------------------------------------------------------------------------
int32 CTmTxBranches::registerRegion (CTmTxBase *pp_txn, int64 pv_flags, CTmTxMessage * pp_msg)
{
int32 lv_err = FEOK;
TMTrace (2, ("CTmTxBranches::registerRegion, Txn ID (%d,%d), ENTRY, flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
//iv_TSEBranches.registerRegion(pp_txn, pv_flags, pp_msg);
lv_err = iv_HBASEBranches.registerRegion(pp_txn, pv_flags, pp_msg);
TMTrace (2, ("CTmTxBranches::registerRegion, Txn ID (%d,%d), EXIT, UnResolved branches %d.\n",
pp_txn->node(), pp_txn->seqnum(), num_rms_unresolved(pp_txn)));
return lv_err;
} // registerRegion
//------------------------------------------------------------------------------
// ddlOperation
// Purpose - Handles ddlOperation requests
// -----------------------------------------------------------------------------
int32 CTmTxBranches::ddlOperation(CTmTxBase *pp_txn, int64 pv_flags, CTmTxMessage * pp_msg)
{
int32 lv_err = FEOK;
TMTrace (2, ("CTmTxBranches::ddlOperation, Txn ID (%d,%d), ENTRY, flags " PFLL "\n",
pp_txn->node(), pp_txn->seqnum(), pv_flags));
lv_err = iv_HBASEBranches.hb_ddl_operation(pp_txn, pv_flags, pp_msg);
TMTrace (2, ("CTmTxBranches::ddlOperation, Txn ID (%d,%d), EXIT, UnResolved branches %d.\n",
pp_txn->node(), pp_txn->seqnum(), num_rms_unresolved(pp_txn)));
return lv_err;
}
//------------------------------------------------------------------------------
// shutdown_branches
// Purpose - shutdown all branches.
//------------------------------------------------------------------------------
int32 CTmTxBranches::shutdown_branches (bool pv_leadTM, bool pv_clean)
{
int32 lv_error = FEOK, lv_err1 = RET_OK, lv_err2 = XA_OK;
TMTrace (2, ("CTmTxBranches::shutdown_branches, ENTRY, lead TM %d, clean? %d.\n",
pv_leadTM, pv_clean));
lv_err1 = TSE()->shutdown_branches(pv_leadTM, pv_clean);
lv_err2 = HBASE()->shutdown_branches(pv_leadTM, pv_clean);
lv_error = (lv_err1!=RET_OK)?lv_err1:((lv_err2!=XA_OK)?XAER_RMERR:FEOK);
TMTrace (2, ("CTmTxBranches::shutdown_branches EXIT with error %d.\n", lv_error));
return lv_error;
} // shutdown_branches