blob: 1bc9a15d920758a01f44b3b25c121ad00bd2d2d8 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <windows.h>
#include <msclr\lock.h>
#include <oletx2xa.h>
#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
#include "qpid/client/MessageListener.h"
#include "qpid/framing/FrameSet.h"
#include "AmqpConnection.h"
#include "AmqpSession.h"
#include "QpidMarshal.h"
#include "QpidException.h"
#include "DtxResourceManager.h"
#include "XaTransaction.h"
namespace Apache {
namespace Qpid {
namespace Interop {
using namespace System;
using namespace System::Runtime::InteropServices;
using namespace msclr;
using namespace qpid::client;
using namespace std;
// Note on locks: Use thisLock for fast counting and idle/busy
// notifications. Use the "sessions" list to serialize session
// creation/reaping and overall tear down.
AmqpConnection::AmqpConnection(String^ server, int port) :
connectionp(NULL),
busyCount(0),
disposed(false)
{
initialize (server, port, false, false, nullptr, nullptr);
}
AmqpConnection::AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) :
connectionp(NULL),
busyCount(0),
disposed(false)
{
initialize (server, port, ssl, saslPlain, username, password);
}
void AmqpConnection::initialize(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password)
{
if (server == nullptr)
throw gcnew ArgumentNullException("AMQP server");
if (saslPlain) {
if (username == nullptr)
throw gcnew ArgumentNullException("username");
if (username == nullptr)
throw gcnew ArgumentNullException("password");
}
bool success = false;
System::Exception^ openException = nullptr;
sessions = gcnew Collections::Generic::List<AmqpSession^>();
thisLock = gcnew Object();
try {
connectionp = new Connection;
if (ssl || saslPlain) {
ConnectionSettings proposedSettings;
proposedSettings.host = QpidMarshal::ToNative(server);
proposedSettings.port = port;
if (ssl)
proposedSettings.protocol = "ssl";
if (saslPlain) {
proposedSettings.username = QpidMarshal::ToNative(username);
proposedSettings.password = QpidMarshal::ToNative(password);
proposedSettings.mechanism = "PLAIN";
}
connectionp->open (proposedSettings);
}
else {
connectionp->open (QpidMarshal::ToNative(server), port);
}
// TODO: registerFailureCallback for failover
success = true;
const ConnectionSettings& settings = connectionp->getNegotiatedSettings();
this->maxFrameSize = settings.maxFrameSize;
this->host = server;
this->port = port;
this->ssl = ssl;
this->saslPlain = saslPlain;
this->username = username;
this->password = password;
this->isOpen = true;
} catch (const qpid::Exception& error) {
String^ errmsg = gcnew String(error.what());
openException = gcnew QpidException(errmsg);
} finally {
if (!success) {
Cleanup();
if (openException == nullptr) {
openException = gcnew QpidException ("unknown connection failure");
}
throw openException;
}
}
}
AmqpConnection^ AmqpConnection::Clone() {
if (disposed)
throw gcnew ObjectDisposedException("AmqpConnection.Clone");
return gcnew AmqpConnection (this->host, this->port, this->ssl, this->saslPlain, this->username, this->password);
}
void AmqpConnection::Cleanup()
{
{
lock l(sessions);
if (disposed)
return;
disposed = true;
}
try {
// let the child sessions clean up
for each(AmqpSession^ s in sessions) {
s->ConnectionClosed();
}
}
finally
{
if (connectionp != NULL) {
isOpen = false;
connectionp->close();
delete connectionp;
connectionp = NULL;
}
}
}
AmqpConnection::~AmqpConnection()
{
Cleanup();
}
AmqpConnection::!AmqpConnection()
{
Cleanup();
}
void AmqpConnection::Close()
{
// Simulate Dispose()...
Cleanup();
GC::SuppressFinalize(this);
}
AmqpSession^ AmqpConnection::CreateSession()
{
lock l(sessions);
if (disposed) {
throw gcnew ObjectDisposedException("AmqpConnection");
}
AmqpSession^ session = gcnew AmqpSession(this, connectionp);
sessions->Add(session);
return session;
}
// called whenever a child session becomes newly busy (a first reader or writer since last idle)
void AmqpConnection::NotifyBusy()
{
bool changed = false;
{
lock l(thisLock);
if (busyCount++ == 0)
changed = true;
}
}
// called whenever a child session becomes newly idle (a last reader or writer has closed)
// The connection is idle when none of its child sessions are busy
void AmqpConnection::NotifyIdle()
{
bool connectionIdle = false;
{
lock l(thisLock);
if (--busyCount == 0)
connectionIdle = true;
}
if (connectionIdle) {
OnConnectionIdle(this, System::EventArgs::Empty);
}
}
void HexAppend(StringBuilder^ sb, String^ s) {
if (s->Length > 0) {
array<unsigned char>^ bytes = Encoding::UTF8->GetBytes(s);
for each (unsigned char b in bytes) {
sb->Append(String::Format("{0:x2}", b));
}
}
sb->Append(".");
}
// Note: any change to this format has to be reflected in the DTC plugin's xa_open()
// for now: "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password"
// This extended info is needed so that the DTC can make a separate connection to the broker
// for recovery.
String^ AmqpConnection::DataSourceName::get() {
if (dataSourceName == nullptr) {
StringBuilder^ sb = gcnew StringBuilder();
sb->Append("QPIDdsnV2.");
sb->Append(this->port);
sb->Append(".");
HexAppend(sb, this->host);
sb->Append(System::Diagnostics::Process::GetCurrentProcess()->Id);
sb->Append("-");
sb->Append(AppDomain::CurrentDomain->Id);
sb->Append(".");
if (this->ssl)
sb->Append("T");
else
sb->Append("F");
sb->Append(".");
if (this->saslPlain) {
sb->Append("P.");
HexAppend(sb, this->username);
HexAppend(sb, this->password);
}
else {
// SASL anonymous
sb->Append("A.");
}
dataSourceName = sb->ToString();
}
return dataSourceName;
}
}}} // namespace Apache::Qpid::Interop