blob: 23743316ff0ac166144a2103432d6a4cae2868ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <windows.h>
#include <msclr\lock.h>
#include <transact.h>
#include <xolehlp.h>
#include <txdtc.h>
#include <oletx2xa.h>
#include <iostream>
#include <fstream>
#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/Xid.h"
#include "QpidException.h"
#include "AmqpConnection.h"
#include "AmqpSession.h"
#include "DtxResourceManager.h"
#include "XaTransaction.h"
namespace Apache {
namespace Qpid {
namespace Interop {
using namespace System;
using namespace System::Runtime::InteropServices;
using namespace System::Transactions;
using namespace msclr;
using namespace qpid::framing::dtx;
// ------------------------------------------------------------------------
// Start of a pure native code section
#pragma unmanaged
// ------------------------------------------------------------------------
// This is the native COM object the DTC expects to talk to for coordination.
// There is exactly one native instance of this for each managed XaTransaction object.
class DtcCallbackHandler : public ITransactionResourceAsync
{
private:
long useCount;
DtcCallbackFp managedCallback;
public:
ITransactionEnlistmentAsync *txHandle;
DtcCallbackHandler(DtcCallbackFp cbp) : managedCallback(cbp), useCount(0) {}
~DtcCallbackHandler() {}
virtual HRESULT __stdcall PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase);
virtual HRESULT __stdcall CommitRequest(DWORD grfrm, XACTUOW *unused);
virtual HRESULT __stdcall AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3);
virtual HRESULT __stdcall TMDown();
virtual HRESULT __stdcall DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject);
virtual ULONG __stdcall DtcCallbackHandler::AddRef();
virtual ULONG __stdcall DtcCallbackHandler::Release();
void __stdcall AbortRequestDone();
};
HRESULT DtcCallbackHandler::PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase)
{
if (singlePhase) {
return managedCallback(DTC_SINGLE_PHASE) ? S_OK : E_FAIL;
}
return managedCallback(DTC_PREPARE) ? S_OK : E_FAIL;
}
HRESULT DtcCallbackHandler::CommitRequest(DWORD grfrm, XACTUOW *unused)
{
return managedCallback(DTC_COMMIT) ? S_OK : E_FAIL;
}
HRESULT DtcCallbackHandler::AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3)
{
return managedCallback(DTC_ABORT) ? S_OK : E_FAIL;
}
HRESULT DtcCallbackHandler::TMDown()
{
return managedCallback(DTC_TMDOWN) ? S_OK : E_FAIL;
}
HRESULT DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject)
{
*ppvObject = NULL;
if ((riid == IID_IUnknown) || (riid == IID_IResourceManagerSink))
*ppvObject = this;
else
return ResultFromScode(E_NOINTERFACE);
this->AddRef();
return S_OK;
}
ULONG DtcCallbackHandler::AddRef()
{
return InterlockedIncrement(&useCount);
}
ULONG DtcCallbackHandler::Release()
{
long uc = InterlockedDecrement(&useCount);
if (uc)
return uc;
delete this;
return 0;
}
// ------------------------------------------------------------------------
// End of pure native code section
#pragma managed
// ------------------------------------------------------------------------
#ifdef QPID_RECOVERY_TEST_HOOK
void XaTransaction::ForceRecovery() {
debugFailMode = true;
}
#endif
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
XaTransaction::XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *xaHelperp, DWORD rmCookie, DtxResourceManager^ rm) {
bool success = false;
xidp = NULL;
commandCompletionp = NULL;
firstDtxStartCompletionp = NULL;
nativeHandler = NULL;
resourceManager = rm;
controlSession = rm->DtxControlSession;
active = true;
preparing = false;
systemTransaction = t;
IntPtr comTxp = IntPtr::Zero;
completionHandle = gcnew ManualResetEvent(false);
try {
enlistedSessions = gcnew Collections::Generic::List<AmqpSession^>();
// take a System.Transactions.Transaction and obtain
// the corresponding DTC COM object.
IDtcTransaction^ dtcTransaction = TransactionInterop::GetDtcTransaction(t);
comTxp = Marshal::GetIUnknownForObject(dtcTransaction);
XID winXid;
HRESULT hr = xaHelperp->ConvertTridToXID((DWORD *)comTxp.ToPointer(), rmCookie, &winXid);
if (hr != S_OK)
throw gcnew QpidException("get XA XID");
// Convert the X/Open format to the internal Qpid format
xidp = new qpid::framing::Xid();
xidp->setFormat((uint32_t) winXid.formatID);
int bqualPos = 0;
if (winXid.gtrid_length > 0) {
xidp->setGlobalId(std::string(winXid.data, winXid.gtrid_length));
bqualPos = winXid.gtrid_length;
}
if (winXid.bqual_length > 0) {
xidp->setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length));
}
// create the callback chain: DTC proxy -> DtcCallbackHandler -> this
inboundDelegate = gcnew DtcCallbackDelegate(this, &XaTransaction::DtcCallback);
IntPtr ip = Marshal::GetFunctionPointerForDelegate(inboundDelegate);
nativeHandler = new DtcCallbackHandler(static_cast<DtcCallbackFp>(ip.ToPointer()));
// add myself for later smart pointer destruction
nativeHandler->AddRef();
hr = xaHelperp->EnlistWithRM(rmCookie, (ITransaction *)comTxp.ToPointer(), nativeHandler, &(nativeHandler->txHandle));
if (hr != S_OK)
throw gcnew QpidException("Enlist");
success = true;
}
finally {
if (!success)
Cleanup();
if (comTxp != IntPtr::Zero)
((IUnknown *) comTxp.ToPointer())->Release();
}
}
void XaTransaction::Cleanup() {
if (firstDtxStartCompletionp != NULL) {
try {
firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
}
catch (...) {
// TODO: log it?
}
firstDtxStartCompletionp = NULL;
}
if (nativeHandler != NULL) {
nativeHandler->Release();
nativeHandler = NULL;
}
if (xidp != NULL) {
delete xidp;
xidp = NULL;
}
}
XaTransaction^ XaTransaction::Enlist (AmqpSession ^session) {
lock l(enlistedSessions);
if (!active)
throw gcnew QpidException("transaction enlistment internal error");
if (!enlistedSessions->Contains(session)) {
enlistedSessions->Add(session);
if (firstEnlistedSession == nullptr) {
firstEnlistedSession = session;
IntPtr intptr = session->DtxStart((IntPtr) xidp, false, false);
firstDtxStartCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
}
else {
// the broker must see the dtxStart as a join operation, and it must arrive
// at the broker after the first dtx start
if (firstDtxStartCompletionp != NULL)
firstDtxStartCompletionp->wait();
session->DtxStart((IntPtr) xidp, true, false);
}
}
else {
// already started once, so resume is true
session->DtxStart((IntPtr) xidp, false, true);
}
return this;
}
void XaTransaction::SessionClosing(AmqpSession^ session) {
lock l(enlistedSessions);
if (!enlistedSessions->Contains(session))
return;
enlistedSessions->Remove(session);
if (!active) {
// Phase0Flush already done on all sessions
l.release();
return;
}
IntPtr completion = session->BeginPhase0Flush(this);
session->EndPhase0Flush(this, completion);
if (session == firstEnlistedSession) {
// if we just completed the dtxEnd, we know the dtxStart completed before that
if (firstDtxStartCompletionp != NULL) {
firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
firstDtxStartCompletionp = NULL;
}
}
}
void XaTransaction::Phase0Flush() {
// let each session delimit their transactional work with an AMQP dtx.end protocol frame
lock l(enlistedSessions);
if (!active)
return;
active = false; // no more enlistments
int scount = enlistedSessions->Count;
if (scount > 0) {
array<IntPtr> ^completions = gcnew array<IntPtr>(scount);
for (int i = 0; i < scount; i++) {
// TODO: skip phase0 flush for rollback case
completions[i] = enlistedSessions[i]->BeginPhase0Flush(this);
}
for (int i = 0; i < scount; i++) {
// without each session.sync(), session commands are queued up in the right order,
// but on their separate outbound channels, and destined for receipt at separate Broker inbound
// channels. It is not clear how to be sure Phase 0 dtx.End is processed in the
// correct order before commit on the broker without the sync.
enlistedSessions[i]->EndPhase0Flush(this, completions[i]);
}
}
// since all dtxEnds have completed, we know all starts have too
if (firstDtxStartCompletionp != NULL) {
try {
firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
}
catch (...) {
// TODO: log it?
}
firstDtxStartCompletionp = NULL;
}
}
bool XaTransaction::DtcCallback (DtcCallbackType callback) {
// called by the DTC proxy thread. Be brief and don't block (but Phase0Flush?)
if (AppDomain::CurrentDomain->IsFinalizingForUnload())
return false;
IntPtr intptr = IntPtr::Zero;
currentCommand = callback;
try {
switch (callback) {
case DTC_PREPARE:
Phase0Flush();
try {
intptr = controlSession->DtxPrepare((IntPtr) xidp);
preparing = true;
resourceManager->IncrementDoubt();
}
catch (System::Exception^ ) {
// intptr remains nullptr
}
commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
break;
case DTC_COMMIT:
#ifdef QPID_RECOVERY_TEST_HOOK
if (debugFailMode){ return; }
#endif
// no phase 0 required. always preceded by a prepare
try {
intptr = controlSession->DtxCommit((IntPtr) xidp, false);
}
catch (System::Exception^ ) {
// intptr remains nullptr
}
commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
break;
case DTC_ABORT:
Phase0Flush();
#ifdef QPID_RECOVERY_TEST_HOOK
if (debugFailMode){ return; }
#endif
try {
intptr = controlSession->DtxRollback((IntPtr) xidp);
}
catch (System::Exception^ ) {
// intptr remains nullptr
}
commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
break;
case DTC_SINGLE_PHASE:
Phase0Flush();
try {
intptr = controlSession->DtxCommit((IntPtr) xidp, true);
}
catch (System::Exception^ ) {
// intptr remains nullptr
}
commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
break;
case DTC_TMDOWN:
commandCompletionp = NULL;
ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
break;
}
return true;
}
catch (System::Exception^ e) {
// TODO: log it
Console::WriteLine("Unexpected DtcCallback exception: {0}", e->ToString());
}
catch (...) {
// TODO: log it
}
return false;
}
// this handles the case where the application regains control for
// a new transaction before we are notified (abort/rollback
// optimization in DTC).
void XaTransaction::NotifyPhase0() {
if (active)
Phase0Flush();
}
void XaTransaction::AsyncCompleter(Object ^unused) {
bool success = false;
if (commandCompletionp != NULL) {
try {
// waits for the AMQP broker's response and returns the decoded content
XaResult& xaResult = commandCompletionp->get();
if (xaResult.hasStatus()) {
if (xaResult.getStatus() == XaStatus::XA_STATUS_XA_OK) {
success = true;
}
}
}
catch (...) {
// TODO: log it?
}
try {
controlSession->ReleaseCompletion((IntPtr) commandCompletionp);
}
catch (...) {
// TODO: log it?
}
commandCompletionp = NULL;
}
ITransactionEnlistmentAsync *dtcTxHandle = nativeHandler->txHandle;
HRESULT hr = success ? S_OK : E_FAIL;
switch (currentCommand) {
case DTC_PREPARE:
dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
break;
case DTC_COMMIT:
dtcTxHandle->CommitRequestDone(hr);
if (success)
resourceManager->DecrementDoubt();
Complete();
break;
case DTC_ABORT:
dtcTxHandle->AbortRequestDone(hr);
if (success) {
if (preparing) {
preparing = false;
resourceManager->DecrementDoubt();
}
}
Complete();
break;
case DTC_SINGLE_PHASE:
if (success)
hr = XACT_S_SINGLEPHASE;
dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
Complete();
break;
case DTC_TMDOWN:
// Stop the RM from accepting new enlistments
resourceManager->TmDown();
Complete();
break;
}
}
void XaTransaction::Complete() {
Cleanup();
resourceManager->Complete(systemTransaction);
completionHandle->Set();
}
void XaTransaction::WaitForCompletion() {
completionHandle->WaitOne();
}
/*
void XaTransaction::WaitForFlush() {
isFlushedHandle->WaitOne();
}
*/
// called from DtxResourceManager Finalize
void XaTransaction::ChildFinalize() {
lock l(enlistedSessions);
Phase0Flush();
Cleanup();
}
}}} // namespace Apache::Qpid::Interop