blob: 82c920cf47c06ee5f6dfe6ec572ed13e78997dd8 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "SubscriptionImpl.h"
#include "MessageImpl.h"
#include "CompletionImpl.h"
#include "SubscriptionManager.h"
#include "SubscriptionSettings.h"
#include "HandlePrivate.h"
#include "PrivateImplPrivate.h"
namespace qpid {
namespace client {
using sys::Mutex;
using framing::MessageAcquireResult;
SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
: manager(m), name(n), queue(q), settings(s), listener(l)
{}
void SubscriptionImpl::subscribe()
{
async(manager.getSession()).messageSubscribe(
arg::queue=queue,
arg::destination=name,
arg::acceptMode=settings.acceptMode,
arg::acquireMode=settings.acquireMode,
arg::exclusive=settings.exclusive);
setFlowControl(settings.flowControl);
}
std::string SubscriptionImpl::getName() const { return name; }
std::string SubscriptionImpl::getQueue() const { return queue; }
const SubscriptionSettings& SubscriptionImpl::getSettings() const {
Mutex::ScopedLock l(lock);
return settings;
}
void SubscriptionImpl::setFlowControl(const FlowControl& f) {
Mutex::ScopedLock l(lock);
AsyncSession s=manager.getSession();
if (&settings.flowControl != &f) settings.flowControl = f;
s.messageSetFlowMode(name, f.window);
s.messageFlow(name, CREDIT_UNIT_MESSAGE, f.messages);
s.messageFlow(name, CREDIT_UNIT_BYTE, f.bytes);
s.sync();
}
void SubscriptionImpl::grantCredit(framing::message::CreditUnit unit, uint32_t value) {
async(manager.getSession()).messageFlow(name, unit, value);
}
void SubscriptionImpl::setAutoAck(size_t n) {
Mutex::ScopedLock l(lock);
settings.autoAck = n;
}
SequenceSet SubscriptionImpl::getUnacquired() const { Mutex::ScopedLock l(lock); return unacquired; }
SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock); return unaccepted; }
void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
MessageAcquireResult result = manager.getSession().messageAcquire(messageIds);
unacquired.remove(result.getTransfers());
if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
unaccepted.add(result.getTransfers());
}
void SubscriptionImpl::accept(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
manager.getSession().messageAccept(messageIds);
unaccepted.remove(messageIds);
switch (settings.completionMode) {
case COMPLETE_ON_ACCEPT:
manager.getSession().markCompleted(messageIds, true);
break;
case COMPLETE_ON_DELIVERY:
manager.getSession().sendCompletion();
break;
default://do nothing
break;
}
}
void SubscriptionImpl::release(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
manager.getSession().messageRelease(messageIds);
if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
unaccepted.remove(messageIds);
}
Session SubscriptionImpl::getSession() const { return manager.getSession(); }
SubscriptionManager& SubscriptionImpl::getSubscriptionManager() const { return manager; }
void SubscriptionImpl::cancel() { manager.cancel(name); }
void SubscriptionImpl::received(Message& m) {
Mutex::ScopedLock l(lock);
if (privateImplGetPtr(m)->getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
unacquired.add(m.getId());
else if (privateImplGetPtr(m)->getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
unaccepted.add(m.getId());
if (listener) {
Mutex::ScopedUnlock u(lock);
listener->received(m);
}
if (settings.completionMode == COMPLETE_ON_DELIVERY) {
manager.getSession().markCompleted(m.getId(), false, false);
}
if (settings.autoAck) {
if (unaccepted.size() >= settings.autoAck) {
async(manager.getSession()).messageAccept(unaccepted);
switch (settings.completionMode) {
case COMPLETE_ON_ACCEPT:
manager.getSession().markCompleted(unaccepted, true);
break;
case COMPLETE_ON_DELIVERY:
manager.getSession().sendCompletion();
break;
default://do nothing
break;
}
unaccepted.clear();
}
}
}
Demux::QueuePtr SubscriptionImpl::divert()
{
demuxRule = std::auto_ptr<ScopedDivert>(new ScopedDivert(name, manager.getSession().getExecution().getDemux()));
return demuxRule->getQueue();
}
void SubscriptionImpl::cancelDiversion() {
demuxRule.reset();
}
}} // namespace qpid::client