blob: f8189df0dde53f9badecda658ef845d6c963c18b [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <windows.h>
#include <msclr\lock.h>
#include "qpid/client/AsyncSession.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/Demux.h"
#include "qpid/client/SessionImpl.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "MessageBodyStream.h"
#include "AmqpMessage.h"
#include "AmqpSession.h"
#include "InputLink.h"
#include "QpidMarshal.h"
#include "QpidException.h"
namespace Apache {
namespace Qpid {
namespace Interop {
using namespace System;
using namespace System::Runtime::InteropServices;
using namespace System::Threading;
using namespace msclr;
using namespace qpid::client;
using namespace qpid::framing;
using namespace std;
using namespace Apache::Qpid::AmqpTypes;
// Scalability note: When using async methods, an async helper thread is created
// to block on the Demux BlockingQueue. This design should be revised in line
// with proposed changes to the native library to reduce the number of servicing
// threads for large numbers of subscriptions.
// synchronization is accomplished with locks, but also by ensuring that only one
// MessageWaiter (the one at the front of the line) is ever active.
// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
// thread (who deposits FrameSets into the local queue and is oblivious to the
// managed space locks).
// The following definition must match the "Frames" private typedef.
// TODO, make Qpid-cpp "Frames" definition visible.
typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
// Construct an InputLink on the given session.
//
// The constructor resolves sourceQueue into a QpidAddress, optionally declares and
// binds a temporary queue, and then subscribes a LocalQueue to the link's queue
// with zero initial message credit and manual completion.  Handles to the native
// subscription and to the Demux queue backing the LocalQueue are retained for later
// use and cleanup.
//
// session       managed session that owns this link
// sourceQueue   address string handed to QpidAddress::CreateAddress
// qpidSessionp  native session used for queue declaration/binding
// qpidSubsMgrp  native subscription manager used to subscribe
// exclusive     not referenced in this constructor body
// temporary     when true, an auto-delete exclusive queue is declared and bound
// filterKey     binding key for the temporary queue
// exchange      exchange the temporary queue is bound to
//
// If anything in the native setup fails, Cleanup() is run and a QpidException
// ("InputLink creation failure") is thrown from the finally block.
InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
                     qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp,
                     bool exclusive,
                     bool temporary, System::String^ filterKey, System::String^ exchange) :
    amqpSession(session),
    subscriptionp(NULL),
    localQueuep(NULL),
    queuePtrp(NULL),
    dequeuedFrameSetpp(NULL),
    disposed(false),
    finalizing(false)
{
    // managed bookkeeping: the waiter list doubles as the link-wide lock object
    waiters = gcnew Collections::Generic::List<MessageWaiter^>();
    linkLock = waiters;	// private and available
    subscriptionLock = gcnew Object();

    qpidAddress = QpidAddress::CreateAddress(sourceQueue, true);
    qpidAddress->ResolveLink(session);
    browsing = qpidAddress->Browsing;

    bool constructed = false;

    try {
        std::string nativeQueueName = QpidMarshal::ToNative(qpidAddress->LinkName);

        if (temporary) {
            qpidSessionp->queueDeclare(arg::queue=nativeQueueName, arg::durable=false, arg::autoDelete=true, arg::exclusive=true);
            qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange),
                                       arg::queue=nativeQueueName, arg::bindingKey=QpidMarshal::ToNative(filterKey));
            qpidSessionp->sync();
        }

        localQueuep = new LocalQueue;

        // start with no credit; AdjustCredit() grants it on demand
        SubscriptionSettings settings;
        settings.flowControl = FlowControl::messageCredit(0);
        settings.completionMode = CompletionMode::MANUAL_COMPLETION;

        if (!browsing) {
            settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED;
            settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT;
        }
        else {
            settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED;
            settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE;
        }

        Subscription nativeSubscription = qpidSubsMgrp->subscribe(*localQueuep, nativeQueueName, settings);
        // copy smart pointer for later IDisposable cleanup
        subscriptionp = new Subscription (nativeSubscription);

        // the roundabout way to obtain localQueuep->queue
        SessionBase_0_10Access sessionAccess(*qpidSessionp);
        boost::shared_ptr<SessionImpl> sessionImpl = sessionAccess.get();
        queuePtrp = new Demux::QueuePtr(sessionImpl->getDemux().get(nativeSubscription.getName()));

        constructed = true;
    } finally {
        if (!constructed) {
            // release whatever was acquired before reporting the failure
            Cleanup();
            throw gcnew QpidException ("InputLink creation failure");
        }
    }
}
// called with lock held
// Release the native objects owned by this link.  Expected to be called with the
// link lock held.
//
// The subscription is cancelled (unless running on behalf of the finalizer) and
// then freed, followed by the Demux queue handle, the LocalQueue (skipped when
// finalizing) and any frame set that was dequeued but never handed out.
void InputLink::ReleaseNative()
{
    if (subscriptionp != NULL) {
        // cancel involves talking to the Broker unless the connection is broken
        if (!finalizing) {
            // TODO: find boost time error on cleanup when in finalizer thread
            try {
                subscriptionp->cancel();
            }
            catch (const std::exception& error) {
                // TODO: log this properly
                std::cout << "shutdown error " << error.what() << std::endl;
            }
        }

        // free native mem (or smart pointers) that we own
        delete subscriptionp;
        subscriptionp = NULL;
    }

    if (queuePtrp != NULL) {
        delete queuePtrp;
        queuePtrp = NULL;
    }

    // TODO: find boost time error on cleanup when in finalizer thread
    if ((localQueuep != NULL) && !finalizing) {
        delete localQueuep;
        localQueuep = NULL;
    }

    if (dequeuedFrameSetpp != NULL) {
        delete dequeuedFrameSetpp;
        dequeuedFrameSetpp = NULL;
    }
}
// Tear the link down exactly once.  Shared by the destructor, the finalizer,
// Close() and the constructor's failure path.
//
// Under linkLock: marks the link disposed, wakes the async helper and any thread
// blocked on the Demux queue, then (additionally holding subscriptionLock) frees
// the native resources.  After the locks are dropped, the address is asked to clean
// up the link and the session is notified that the link has closed.
void InputLink::Cleanup()
{
{
lock l(linkLock);
// a second call is a no-op
if (disposed)
return;
disposed = true;
// if the asyncHelper exists and is idle, unblock it
if (asyncHelperWaitHandle != nullptr) {
asyncHelperWaitHandle->Set();
}
// wakeup anyone waiting for messages
if (queuePtrp != NULL)
(*queuePtrp)->close();
// wait for any sync operations on the subscription to complete before ReleaseNative
// (SyncCredit holds subscriptionLock around its setFlowControl call)
lock l2(subscriptionLock);
// NOTE(review): the empty try with the work in finally presumably protects
// ReleaseNative() from an asynchronous thread abort -- confirm intent.
try {}
finally
{
ReleaseNative();
}
}
// Now that subscription is torn down, we can execute pending delete on remote node
qpidAddress->CleanupLink(amqpSession);
amqpSession->NotifyClosed();
}
// Destructor (deterministic disposal): performs the full link teardown.
InputLink::~InputLink()
{
    this->Cleanup();
}
// Finalizer: flags that teardown is running on behalf of the finalizer, so that
// ReleaseNative() skips the subscription cancel and the LocalQueue delete, then
// runs the common teardown.
InputLink::!InputLink()
{
    this->finalizing = true;
    this->Cleanup();
}
// Explicit close: behaves like Dispose() -- tear the link down and tell the
// garbage collector that the finalizer no longer needs to run.
void InputLink::Close()
{
    this->Cleanup();
    GC::SuppressFinalize(this);
}
// call with lock held
// Returns true when a message can be handed out immediately: either a frame set
// has already been pulled off the queue and stashed, or the Demux queue (if it
// still exists) is non-empty.  Call with the link lock held.
bool InputLink::haveMessage()
{
    const bool stashed = (dequeuedFrameSetpp != NULL);
    return stashed || ((queuePtrp != NULL) && ((*queuePtrp)->size() > 0));
}
// Hand out the next locally available message, if any.
//
// Returns an IntPtr wrapping a heap-allocated QpidFrameSetPtr whose ownership
// passes to the caller, or a NULL IntPtr when the link is disposed, when nothing
// is queued, or when popping the queue fails.
IntPtr InputLink::nextLocalMessage()
{
    lock guard(linkLock);

    if (disposed)
        return (IntPtr) NULL;

    // A message already pulled off BlockingQueue?
    if (dequeuedFrameSetpp != NULL) {
        QpidFrameSetPtr* stashed = dequeuedFrameSetpp;
        dequeuedFrameSetpp = NULL;
        return (IntPtr) stashed;
    }

    if ((*queuePtrp)->empty())
        return (IntPtr) NULL;

    QpidFrameSetPtr* holder = new QpidFrameSetPtr;

    try {
        if ((*queuePtrp)->pop(*holder, qpid::sys::TIME_INFINITE)) {
            // ownership moves to the caller; clear holder so finally leaves it alone
            QpidFrameSetPtr* popped = holder;
            holder = NULL;
            return (IntPtr) popped;
        }
    } catch(const std::exception& error) {
        // should be no async tampering with queue since we hold the lock and have a
        // smart pointer ref to the native LocalQueue, even if the network connection fails...
        cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl;
        // TODO: log this
    }
    finally {
        // deleting a NULL pointer is a no-op
        delete holder;
    }

    return (IntPtr) NULL;
}
// Close the Demux queue so that a thread blocked in pop() is released.
// Does nothing once the link is disposed.  To be followed by resetQueue().
void InputLink::unblockWaiter()
{
    lock guard(linkLock);

    if (!disposed) {
        (*queuePtrp)->close();
    }
}
// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks
// a blocking thread without interfering with queue contents or the ability to push
// new incoming messages.
// Re-open the Demux queue if it is currently closed.  No-op on a disposed link.
void InputLink::resetQueue()
{
    lock guard(linkLock);

    // short-circuit keeps the queue untouched once disposed
    if (!disposed && (*queuePtrp)->isClosed()) {
        (*queuePtrp)->open();
    }
}
// returns true if there is a message to consume, i.e. nextLocalMessage() won't block
// Block until a message is available for this link.
//
// Returns true if there is a message to consume, i.e. nextLocalMessage() won't
// block; returns false if the link is already disposed on entry or if the wait is
// ended by an exception from the native queue (timeout or connection closed).
//
// The link lock is dropped while blocked in pop() so that other threads (including
// Cleanup) can make progress.  A message popped during that window is stashed in
// dequeuedFrameSetpp for the next nextLocalMessage() call.
bool InputLink::internalWaitForMessage()
{
    Demux::QueuePtr demuxQueuePtr;

    bool received = false;
    QpidFrameSetPtr* frameSetpp = NULL;

    try {
        lock l(linkLock);

        if (disposed)
            return false;
        if (haveMessage())
            return true;

        AdjustCredit();

        // get a scoped smart ptr ref to guard against async close or hangup
        demuxQueuePtr = *queuePtrp;
        frameSetpp = new QpidFrameSetPtr;

        l.release();
        // Async cleanup is now possible.  Only use demuxQueuePtr until lock reacquired.
        received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
        l.acquire();

        // Cleanup() may have run while the lock was released.  Once the link is
        // disposed nothing will ever free a newly stashed frame set (Cleanup is a
        // no-op the second time and nextLocalMessage returns NULL), so only stash
        // on a live link and otherwise let the finally block free it.
        if (received && !disposed) {
            dequeuedFrameSetpp = frameSetpp;
            frameSetpp = NULL;	// native will eventually be freed in Cleanup or MessageBodyStream
        }

        return true;
    } catch(const std::exception& ) {
        // timeout or connection closed
        return false;
    }
    finally {
        if (frameSetpp != NULL) {
            delete frameSetpp;
        }
    }

    return false;
}
// call with lock held
// Append a waiter to the queue of pending receivers.  Call with the link lock held.
//
// A waiter that lands at the head of the queue is activated immediately.  If the
// waiter is not already assigned, the async helper thread is created on first use
// and is signalled when this waiter is the only one queued.
void InputLink::addWaiter(MessageWaiter^ waiter)
{
    waiters->Add(waiter);

    // Only the waiter at the head of the queue is active: mark it ready to run.
    if (waiters->Count == 1) {
        waiter->Activate();
    }

    if (!waiter->Assigned) {
        if (asyncHelperWaitHandle == nullptr) {
            // lazily create the helper thread together with its wakeup event
            asyncHelperWaitHandle = gcnew ManualResetEvent(false);
            Thread^ helperThread = gcnew Thread(gcnew ThreadStart(this, &InputLink::asyncHelper));
            helperThread->Start();
        }

        if (waiters->Count == 1) {
            // wake up the asyncHelper
            asyncHelperWaitHandle->Set();
        }
    }
}
// Remove a waiter from the pending list and pass the turn to the next one.
//
// A waiter can be removed from anywhere in the list (e.g. if it timed out).  When a
// waiter remains at the head afterwards it is activated, with the link lock
// released first; the async helper is woken when that waiter is unassigned.
void InputLink::removeWaiter(MessageWaiter^ waiter)
{
    lock guard(linkLock);

    const int position = waiters->IndexOf(waiter);
    if (position < 0) {
        // unknown waiter
        // TODO: assert or log
        if (asyncHelperWaitHandle != nullptr) {
            // just in case.
            asyncHelperWaitHandle->Set();
        }
        return;
    }

    waiters->RemoveAt(position);

    if (waiter->TimedOut) {
        // may have to give back message if it arrives momentarily
        AdjustCredit();
    }

    if (waiters->Count == 0) {
        // nobody left: on a disposed link, wake the helper thread
        if (disposed && (asyncHelperWaitHandle != nullptr)) {
            asyncHelperWaitHandle->Set();
        }
        return;
    }

    // let the next waiter know it's his turn.
    MessageWaiter^ successor = waiters[0];

    // wakeup the asyncHelper thread to help out if necessary.
    if (!successor->Assigned) {
        asyncHelperWaitHandle->Set();
    }

    guard.release();
    successor->Activate();
}
// Body of the helper thread started by addWaiter().
//
// Loops until the link is disposed and no waiters remain.  On each pass it offers
// to run the waiter at the head of the list (with the link lock released while the
// waiter runs), and then sleeps on asyncHelperWaitHandle when there is nothing for
// it to do.
void InputLink::asyncHelper()
{
lock l(linkLock);
while (true) {
// exit: link torn down and nobody left to service
if (disposed && (waiters->Count == 0)) {
// clearing the handle makes addWaiter() create a fresh event and thread if needed
asyncHelperWaitHandle = nullptr;
return;
}
if (waiters->Count > 0) {
MessageWaiter^ waiter = waiters[0];
// run the head waiter without holding the link lock
l.release();
if (waiter->AcceptForWork()) {
waiter->Run();
}
l.acquire();
}
// sleep if more work may be coming or it is currently someone else's turn
if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) {
// wait for something to do
// Reset happens while the link lock is still held, before it is released for the wait
asyncHelperWaitHandle->Reset();
l.release();
asyncHelperWaitHandle->WaitOne();
l.acquire();
}
}
}
// Acquire and immediately release the link lock.  Used by the MessageWaiter
// timeout thread so that it does not run before the waiter is fully initialized.
void InputLink::sync()
{
    lock barrier(linkLock);
}
// Setter for the PrefetchLimit property.
//
// Stores the new limit, derives the low water mark (minWorkingCredit) from it and
// lets AdjustCredit() bring the outstanding credit in line.
void InputLink::PrefetchLimit::set(int value)
{
    lock guard(linkLock);

    prefetchLimit = value;

    // rough rule of thumb to keep the flow, but reduce chatter.
    // for small messages, the credit request is almost as expensive as the transfer itself.
    // experience may suggest a better heuristic or require a property for the low water mark
    const int lowWaterSlack = (prefetchLimit >= 3) ? (prefetchLimit / 3) : 0;
    minWorkingCredit = prefetchLimit - lowWaterSlack;

    AdjustCredit();
}
// call with lock held
// Bring the message credit granted to the broker in line with current demand.
// Call with the link lock held.
//
// The target is the larger of the number of waiters and the prefetch limit.
// Credit is only ever increased here; when the target has dropped to zero with
// prefetching off, a SyncCredit work item is queued on the thread pool instead.
void InputLink::AdjustCredit()
{
    if (creditSyncPending || disposed)
        return;

    const int waiterCount = waiters->Count;

    // low watermark check
    const bool aboveLowWater = (workingCredit >= minWorkingCredit) && (workingCredit >= waiterCount);
    if ((prefetchLimit != 0) && aboveLowWater)
        return;

    // should have enough for all waiters or to satisfy the prefetch window
    const int targetCredit = (waiterCount < prefetchLimit) ? prefetchLimit : waiterCount;

    if (targetCredit > workingCredit) {
        subscriptionp->grantMessageCredit(targetCredit - workingCredit);
        workingCredit = targetCredit;
        return;
    }

    if ((targetCredit < workingCredit) && (targetCredit == 0) && (prefetchLimit == 0)) {
        // hand the shrink-to-zero over to a thread pool thread
        creditSyncPending = true;
        ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
    }
    // TODO: also shrink credit when prefetchLimit != 0
}
// Thread pool work item queued by AdjustCredit() when outstanding credit must be
// withdrawn (no waiters and no prefetch).
//
// Steps: stop message flow for this subscription via the session, re-apply the
// subscription's flow control settings (under subscriptionLock, link lock released),
// wait for current waiters to drain locally queued messages, release any messages
// still queued back to the broker when prefetching is off, and finally recompute
// workingCredit from what is left locally.  creditSyncPending is always cleared on
// the way out; AdjustCredit() is re-run only when the function falls through the
// try block normally.
//
// unused  state argument required by the WaitCallback signature; not referenced
void InputLink::SyncCredit(Object ^unused)
{
lock l(linkLock);
try {
if (disposed)
return;
if (!amqpSession->MessageStop(subscriptionp->getName())) {
// connection closed
return;
}
l.release();
// use setFlowControl to re-enable credit flow on the broker.
// setFlowControl is a sync operation
{
// Cleanup() takes subscriptionLock before releasing native objects
lock l2(subscriptionLock);
if (subscriptionp != NULL) {
subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl);
}
}
l.acquire();
// the link may have been torn down while the lock was released
if (disposed)
return;
// let existing waiters use up any messages that arrived.
// local queue size can only decrease until more credit is issued
while (true) {
if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
l.release();
// a rare use case and not used in performance oriented code.
// optimization can wait until the qpid/messaging api is used
Thread::Sleep(10);
l.acquire();
if (disposed)
return;
}
else {
break;
}
}
// At this point, the lock is held and we are fully synced with the broker
// so we have a valid snapshot
if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
// can't be sure application will request a message again any time soon
QpidFrameSetPtr frameSetp;
while (!(*queuePtrp)->empty()) {
(*queuePtrp)->pop(frameSetp);
SequenceSet frameSetID(frameSetp->getId());
subscriptionp->release(frameSetID);
}
// don't touch dequeuedFrameSetpp.  It is spoken for: explicitly from a
// MessageWaiter about to get the nextLocalMessage(), or implicitly
// from a WaitForMessage().
}
// TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
// credit still in use = messages held locally (queued plus the stashed one, if any)
workingCredit = (*queuePtrp)->size();
if (dequeuedFrameSetpp != NULL) {
workingCredit++;
}
}
finally {
creditSyncPending = false;
}
AdjustCredit();
}
AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
{
QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
bool ownFrameSet = true;
bool haveProperties = false;
try {
MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp);
ownFrameSet = false; // stream releases on close/dispose
AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream);
AMQHeaderBody* headerBodyp = (*fspp)->getHeaders();
uint64_t contentSize = (*fspp)->getContentSize();
SequenceSet frameSetID((*fspp)->getId());
// target managed representation
AmqpProperties^ amqpProperties = gcnew AmqpProperties();
// source native representation
const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>();
const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>();
if (deliveryProperties) {
if (deliveryProperties->hasRoutingKey()) {
haveProperties = true;
amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str());
}
if (deliveryProperties->hasDeliveryMode()) {
if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT)
amqpProperties->Durable = true;
}
if (deliveryProperties->hasTtl()) {
long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond;
amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks));
}
}
if (messageProperties) {
if (messageProperties->hasReplyTo()) {
haveProperties = true;
const ReplyTo& rpto = messageProperties->getReplyTo();
String^ rk = nullptr;
String^ ex = nullptr;
if (rpto.hasRoutingKey()) {
rk = gcnew String(rpto.getRoutingKey().c_str());
}
if (rpto.hasExchange()) {
ex = gcnew String(rpto.getExchange().c_str());
}
amqpProperties->SetReplyTo(ex,rk);
}
if (messageProperties->hasContentType()) {
haveProperties = true;
amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str());
if (messageProperties->hasContentEncoding()) {
String^ enc = gcnew String(messageProperties->getContentEncoding().c_str());
if (!String::IsNullOrEmpty(enc)) {
// TODO: properly assemble 1.0 style to 0-10 for all cases
amqpProperties->ContentType += "; charset=" + enc;
}
}
}
if (messageProperties->hasCorrelationId()) {
haveProperties = true;
const std::string& ncid = messageProperties->getCorrelationId();
int len = ncid.size();
array<unsigned char>^ mcid = gcnew array<unsigned char>(len);
Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len);
amqpProperties->CorrelationId = mcid;
}
if (messageProperties->hasUserId()) {
haveProperties = true;
const std::string& nuid = messageProperties->getUserId();
int len = nuid.size();
array<unsigned char>^ muid = gcnew array<unsigned char>(len);
Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len);
amqpProperties->UserId = muid;
}
if (messageProperties->hasApplicationHeaders()) {
haveProperties = true;
const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders();
int count = fieldTable.count();
if (count > 0) {
haveProperties = true;
Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap =
gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count);
for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) {
qpid::framing::FieldValue::Data &data = i->second->getData();
// TODO: replace these generic int/string conversions with handler for each AMQP specific type:
// uint8_t dataType = i->second->getType();
// switch (dataType) { case TYPE_CODE_STR8: ... }
if (data.convertsToInt()) {
mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt()));
}
if (data.convertsToString()) {
std::string ns = data.getString();
String^ ms = gcnew String(ns.data(), 0, ns.size());
mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms));
}
}
amqpProperties->PropertyMap = mmap;
}
}
}
if (haveProperties) {
amqpMessage->Properties = amqpProperties;
}
// We have a message we can return to the caller.
// Tell the broker we got it.
// subscriptionp->accept(frameSetID) is a slow sync operation in the native API
// so do it within the AsyncSession directly
amqpSession->AcceptAndComplete(frameSetID, browsing);
workingCredit--;
// check if more messages need to be requested from broker
AdjustCredit();
return amqpMessage;
}
finally {
if (ownFrameSet)
delete (fspp);
}
}
// As for IInputChannel:
// if success, return true + amqpMessage
// elseif timeout, return false
// elseif closed/EOF, return true and amqpMessage = null
// else throw an Exception
// Synchronous receive with timeout.
//
// timeout      how long the waiter may wait for a message
// amqpMessage  [Out] the received message; nullptr on timeout and on closed/EOF
// return       true with a message on success, false on timeout, true with a
//              null message when the link has been closed
// throws       QpidException when the wait ended without a message on a link
//              that is not closed
bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage)
{
    // give the out parameter a defined value on every exit path (timeout included)
    amqpMessage = nullptr;

    lock l(linkLock);

    if (waiters->Count == 0) {
        // see if there is a message already available without blocking
        IntPtr fspp = nextLocalMessage();
        if (fspp.ToPointer() != NULL) {
            amqpMessage = createAmqpMessage(fspp);
            return true;
        }
    }

    MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr);
    addWaiter(waiter);

    // run the waiter on this thread without holding the link lock
    l.release();
    waiter->Run();
    l.acquire();

    if (waiter->TimedOut) {
        return false;
    }

    IntPtr waiterMsg = waiter->Message;
    if (waiterMsg.ToPointer() == NULL) {
        if (disposed) {
            // indicate normal EOF on channel
            return true;
        }
        // Neither timeout nor close: there is no frame set to convert.  Report the
        // failure instead of dereferencing a null native pointer in createAmqpMessage.
        throw gcnew QpidException ("InputLink receive failure: no message available");
    }

    amqpMessage = createAmqpMessage(waiterMsg);
    return true;
}
// Start an asynchronous receive.  The returned MessageWaiter serves as the
// IAsyncResult to pass to EndTryReceive().
IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
{
    //TODO: if haveMessage() complete synchronously

    lock guard(linkLock);

    // the waiter is created and queued while the link lock is held
    MessageWaiter^ pending = gcnew MessageWaiter(this, timeout, true, true, callback, state);
    addWaiter(pending);
    return pending;
}
// Complete an asynchronous receive started with BeginTryReceive().
//
// result       the IAsyncResult returned by BeginTryReceive (a MessageWaiter)
// amqpMessage  [Out] the received message; nullptr on timeout and on closed/EOF
// return       true with a message on success, false on timeout, true with a
//              null message when the link has been closed
// throws       the exception recorded by the waiter, if any; QpidException when
//              the wait ended without a message on a link that is not closed
bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage)
{
    amqpMessage = nullptr;

    // TODO: validate result

    MessageWaiter^ waiter = (MessageWaiter ^) result;

    // Wait without the link lock: other threads need it while the waiter is
    // being serviced (e.g. removeWaiter and internalWaitForMessage take it).
    waiter->WaitForCompletion();

    if (waiter->RunException != nullptr)
        throw waiter->RunException;

    if (waiter->TimedOut) {
        return false;
    }

    // createAmqpMessage() updates workingCredit and calls AdjustCredit(), which
    // must run with the link lock held; TryReceive() does the same under the lock.
    lock l(linkLock);

    IntPtr waiterMsg = waiter->Message;
    if (waiterMsg.ToPointer() == NULL) {
        if (disposed) {
            // indicate normal EOF on channel
            return true;
        }
        // Neither timeout nor close: there is no frame set to convert.  Report the
        // failure instead of dereferencing a null native pointer in createAmqpMessage.
        throw gcnew QpidException ("InputLink receive failure: no message available");
    }

    amqpMessage = createAmqpMessage(waiterMsg);
    return true;
}
// Wait until a message is available without consuming it.
//
// Returns false when the link is disposed or the wait timed out; otherwise reports
// whether a message is available once the wait has finished.
bool InputLink::WaitForMessage(TimeSpan timeout)
{
    lock guard(linkLock);

    if (disposed)
        return false;

    // see if there is a message already available without blocking
    if ((waiters->Count == 0) && haveMessage())
        return true;

    // Same as for TryReceive, except consuming = false
    MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr);
    addWaiter(waiter);

    // run the waiter on this thread without holding the link lock
    guard.release();
    waiter->Run();
    guard.acquire();

    return !waiter->TimedOut && haveMessage();
}
// Start an asynchronous wait for a message.  The returned MessageWaiter serves as
// the IAsyncResult to pass to EndWaitForMessage().
IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
{
    lock guard(linkLock);

    // Same as for BeginTryReceive, except consuming = false
    MessageWaiter^ pending = gcnew MessageWaiter(this, timeout, false, true, callback, state);
    addWaiter(pending);
    return pending;
}
// Complete an asynchronous wait started with BeginWaitForMessage().
//
// result  the IAsyncResult returned by BeginWaitForMessage (a MessageWaiter)
// return  false on timeout; otherwise whether a message is currently available
bool InputLink::EndWaitForMessage(IAsyncResult^ result)
{
    MessageWaiter^ waiter = (MessageWaiter ^) result;

    // wait without the link lock so the waiter can be serviced by other threads
    waiter->WaitForCompletion();

    if (waiter->TimedOut) {
        return false;
    }

    // haveMessage() must be called with the link lock held: it dereferences
    // queuePtrp, which Cleanup() may delete concurrently.
    lock l(linkLock);
    return haveMessage();
}
}}} // namespace Apache::Qpid::Interop