| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one | |
| * or more contributor license agreements. See the NOTICE file | |
| * distributed with this work for additional information | |
| * regarding copyright ownership. The ASF licenses this file | |
| * to you under the Apache License, Version 2.0 (the | |
| * "License"); you may not use this file except in compliance | |
| * with the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, | |
| * software distributed under the License is distributed on an | |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
| * KIND, either express or implied. See the License for the | |
| * specific language governing permissions and limitations | |
| * under the License. | |
| */ | |
| #include <windows.h> | |
| #include <msclr\lock.h> | |
| #include <transact.h> | |
| #include <xolehlp.h> | |
| #include <txdtc.h> | |
| #include <oletx2xa.h> | |
| #include <iostream> | |
| #include <fstream> | |
| #include "qpid/client/AsyncSession.h" | |
| #include "qpid/client/SubscriptionManager.h" | |
| #include "qpid/client/Connection.h" | |
| #include "qpid/client/Message.h" | |
| #include "qpid/client/MessageListener.h" | |
| #include "qpid/framing/FrameSet.h" | |
| #include "AmqpConnection.h" | |
| #include "AmqpSession.h" | |
| #include "DtxResourceManager.h" | |
| #include "XaTransaction.h" | |
| #include "QpidException.h" | |
| #include "QpidMarshal.h" | |
| namespace Apache { | |
| namespace Qpid { | |
| namespace Interop { | |
| using namespace System; | |
| using namespace System::Runtime::InteropServices; | |
| using namespace System::Transactions; | |
| using namespace msclr; | |
| /* | |
| * There is one DtxResourceManager per broker and per application process. | |
| * | |
| * Each RM manages a collection of active XaTransaction objects. Participating AmqpSessions enlist | |
| * (or re-enlist) with an XaTransaction indexed by the corresponding System.Transaction object. The | |
| * RM maintains its own AmqpSession for sending 2PC commnds (dtxPrepare, dtxCommit etc.). The | |
| * XaTransaction object works through the lifecycle of the Transaction, including prompting the | |
| * enlisted sessions to send their delimiting dtxEnd commands. | |
| * | |
| * A separate DtcPlugin.cpp file provides the recovery logic when needed in a library named | |
| * qpidxarm.dll. The MSDTC maintans recovery info in its log and tracks when there may be | |
| * transactions in doubt. See the documentation for IDtcToXaHelperSinglePipe. | |
| * | |
| * To enable transaction support: | |
| * DTC requires a registry key to find the plugin | |
| * [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\MSDTC\XADLL] qpidxarm.dll -> [path to qpidxarm.dll] | |
| * DTC needs to be configured for XA | |
| * cmdprompt -> dcomcnfg -> Component services -> My Computer -> DTC -> Local DTC -> right click properties -> Security -> Enable XA Transactions | |
| * | |
| */ | |
| // TODO: provide shutdown mechanism, perhaps callback from Connection Idle for enlisted connections. | |
| // But note that a new RM registration with the DTC is very expensive. | |
| DtxResourceManager::DtxResourceManager(AmqpConnection^ appConnection) { | |
| dtcComp = NULL; | |
| xaHelperp = NULL; | |
| rmCookie = 0; | |
| doubtCount = 0; | |
| tmDown = false; | |
| AmqpConnection^ clonedCon = appConnection->Clone(); | |
| dtxControlSession = clonedCon->CreateSession(); | |
| dataSourceName = clonedCon->DataSourceName; | |
| transactionMap = gcnew Collections::Generic::Dictionary<Transaction^, XaTransaction^>(); | |
| HRESULT hr; | |
| try { | |
| // instead of pinning this instance, just use tmp stack variables for small stuff | |
| IUnknown* tmp = NULL; | |
| // request the default DTC | |
| hr = DtcGetTransactionManager(NULL, NULL, IID_IUnknown, 0, 0, 0, (void **)&tmp); | |
| if (hr != S_OK) | |
| throw gcnew QpidException("connection failure to DTC service"); | |
| dtcComp = tmp; | |
| IDtcToXaHelperSinglePipe *tmp2 = NULL; | |
| hr = ((IUnknown *)dtcComp)->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void**) &tmp2); | |
| if (hr != S_OK) | |
| throw gcnew QpidException("DTC XA unavailable"); | |
| xaHelperp = tmp2; | |
| std::string native_dsn = QpidMarshal::ToNative(dataSourceName); | |
| DWORD tmp3; | |
| // This call doesn't return until the DTC has opened and closed a connection to the broker | |
| // and written a recovery entry in its log. | |
| hr = ((IDtcToXaHelperSinglePipe *) xaHelperp)->XARMCreate(const_cast<char *>(native_dsn.c_str()), "qpidxarm.dll", &tmp3); | |
| if (hr != S_OK) { | |
| switch (hr) { | |
| case E_FAIL: | |
| throw gcnew QpidException("Resource Manager DLL configuration error"); | |
| case E_INVALIDARG: | |
| throw gcnew QpidException("Resource Manager internal error"); | |
| case E_OUTOFMEMORY: | |
| throw gcnew QpidException("Resource Manager out of memory"); | |
| case E_UNEXPECTED: | |
| throw gcnew QpidException("Resource Manager internal failure"); | |
| case XACT_E_TMNOTAVAILABLE: | |
| case XACT_E_CONNECTION_DOWN: | |
| throw gcnew QpidException("MSDTC unavailable"); | |
| default: | |
| throw gcnew QpidException("Resource Manager Registration failed"); | |
| } | |
| } | |
| rmCookie = tmp3; | |
| } | |
| finally { | |
| if (rmCookie == 0) { | |
| // undo partial construction | |
| Cleanup(); | |
| } | |
| } | |
| } | |
| DtxResourceManager::!DtxResourceManager() { | |
| Cleanup(); | |
| } | |
| DtxResourceManager::~DtxResourceManager() { | |
| GC::SuppressFinalize(this); | |
| Cleanup(); | |
| } | |
| // Called when the DTC COM proxy sends TMDOWN to a pending XaTransaction | |
| // called once for each outstanding tx | |
| void DtxResourceManager::TmDown() { | |
| // this block is the only place where both locks are held | |
| lock l1(transactionMap); | |
| lock l2(resourceManagerMap); | |
| if (tmDown) | |
| return; | |
| tmDown = true; | |
| resourceManagerMap->Remove(this->dataSourceName); | |
| // defer cleanup until last TmDown notification received | |
| } | |
| void DtxResourceManager::Cleanup() { | |
| for each (Collections::Generic::KeyValuePair<Transaction^, XaTransaction^> kvp in transactionMap) { | |
| XaTransaction^ xaTr = kvp.Value; | |
| xaTr->ChildFinalize(); | |
| } | |
| try { | |
| if (rmCookie != 0) { | |
| // implies no recovery needed | |
| bool cleanSession = (doubtCount == 0) && (transactionMap->Count == 0); | |
| ((IDtcToXaHelperSinglePipe *)xaHelperp)->ReleaseRMCookie(rmCookie, cleanSession); | |
| rmCookie = 0; | |
| } | |
| if (xaHelperp != NULL) { | |
| ((IDtcToXaHelperSinglePipe *) xaHelperp)->Release(); | |
| xaHelperp = NULL; | |
| } | |
| if (dtcComp != NULL) { | |
| ((IUnknown *) dtcComp)->Release(); | |
| dtcComp = NULL; | |
| } | |
| if (dtxControlSession != nullptr) { | |
| dtxControlSession->Connection->Close(); | |
| } | |
| } | |
| catch (Exception^) {} | |
| } | |
| XaTransaction^ DtxResourceManager::GetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) { | |
| // find or create the RM instance associated with the session's broker | |
| AmqpConnection^ connection = appSession->Connection; | |
| DtxResourceManager^ instance = connection->CachedResourceManager; | |
| // try cached rm first | |
| if (instance != nullptr) { | |
| XaTransaction^ xaTx = instance->InternalGetXaTransaction(appSession, transaction); | |
| if (xaTx != nullptr) | |
| return xaTx; | |
| else { | |
| // cached version no longer available, force new rm creation | |
| connection->CachedResourceManager = nullptr; | |
| } | |
| } | |
| lock l(resourceManagerMap); | |
| String^ dsn = connection->DataSourceName; | |
| if (!resourceManagerMap->TryGetValue(dsn, instance)) { | |
| instance = gcnew DtxResourceManager(connection->Clone()); | |
| resourceManagerMap->Add(dsn, instance); | |
| connection->CachedResourceManager = instance; | |
| } | |
| l.release(); | |
| return instance->InternalGetXaTransaction(appSession, transaction); | |
| } | |
| XaTransaction^ DtxResourceManager::InternalGetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) { | |
| // find or create the tx proxy instance associated with the DTC transaction | |
| lock l(transactionMap); | |
| if (tmDown) | |
| return nullptr; | |
| XaTransaction^ xaTransaction = nullptr; | |
| if (!transactionMap->TryGetValue(transaction, xaTransaction)) { | |
| xaTransaction = gcnew XaTransaction(transaction, (IDtcToXaHelperSinglePipe *) xaHelperp, rmCookie, this); | |
| transactionMap->Add(transaction, xaTransaction); | |
| } | |
| return xaTransaction; | |
| } | |
| void DtxResourceManager::Complete(Transaction ^tx) { | |
| lock l(transactionMap); | |
| transactionMap->Remove(tx); | |
| if (tmDown && (transactionMap->Count == 0)) { | |
| // no more activity on this instance | |
| GC::SuppressFinalize(this); | |
| Cleanup(); | |
| } | |
| } | |
| void DtxResourceManager::IncrementDoubt() { | |
| Interlocked::Increment(doubtCount); | |
| } | |
| void DtxResourceManager::DecrementDoubt() { | |
| Interlocked::Decrement(doubtCount); | |
| } | |
| #ifdef QPID_RECOVERY_TEST_HOOK | |
| void DtxResourceManager::ForceRecovery(Transaction ^tx) { | |
| lock l(resourceManagerMap); | |
| for each (Collections::Generic::KeyValuePair<System::String^, DtxResourceManager^> kvp in resourceManagerMap) { | |
| Collections::Generic::Dictionary<Transaction^, XaTransaction^>^ txmap = kvp.Value->transactionMap; | |
| XaTransaction^ xaTransaction = nullptr; | |
| lock l2(txmap); | |
| if (txmap->TryGetValue(tx, xaTransaction)) { | |
| xaTransaction->ForceRecovery(); | |
| } | |
| } | |
| } | |
| #endif | |
| }}} // namespace Apache::Qpid::Interop |