blob: 20f32a1b372ba273f3e6436f3c66b4f502ff4f46 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/amqp/Outgoing.h"
#include "qpid/broker/amqp/Exception.h"
#include "qpid/broker/amqp/Header.h"
#include "qpid/broker/amqp/Session.h"
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicKeyNode.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/Descriptor.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "config.h"
namespace qpid {
namespace broker {
namespace amqp {
Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
: ManagedOutgoingLink(broker, parent, source, target, name), session(parent) {}
void Outgoing::wakeup()
{
session.wakeup();
}
namespace {
bool requested_reliable(pn_link_t* link)
{
return pn_link_remote_snd_settle_mode(link) == PN_SND_UNSETTLED;
}
bool requested_unreliable(pn_link_t* link)
{
return pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED;
}
}
OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
: Outgoing(broker, session, source, target, pn_link_name(l)),
Consumer(pn_link_name(l), type, target),
exclusive(e),
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
current(0),
buffer(1024)/*used only for header at present*/,
//for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
cancelled(false),
trackingUndeliverableMessages(false)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
}
if (isControllingUser) queue->markInUse(true);
}
void OutgoingFromQueue::init()
{
queue->consume(shared_from_this(), exclusive);//may throw exception
}
bool OutgoingFromQueue::doWork()
{
QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link));
if (canDeliver()) {
try{
if (queue->dispatch(shared_from_this())) {
return true;
} else {
pn_link_drained(link);
QPID_LOG(trace, "No message available on " << queue->getName());
}
} catch (const qpid::framing::ResourceDeletedException& e) {
throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what());
}
} else {
QPID_LOG(trace, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link));
}
return false;
}
void OutgoingFromQueue::write(const char* data, size_t size)
{
pn_link_send(link, data, size);
}
void OutgoingFromQueue::handle(pn_delivery_t* delivery)
{
size_t i = Record::getIndex(pn_delivery_tag(delivery));
Record& r = deliveries[i];
if (r.delivery && pn_delivery_updated(delivery)) {
assert(r.delivery == delivery);
r.disposition = pn_delivery_remote_state(delivery);
std::pair<TxBuffer*,uint64_t> txn = session.getTransactionalState(delivery);
if (txn.first) {
r.disposition = txn.second;
}
if (!r.disposition && pn_delivery_settled(delivery)) {
//if peer has settled without setting state, assume accepted
r.disposition = PN_ACCEPTED;
}
if (r.disposition) {
switch (r.disposition) {
case PN_ACCEPTED:
if (preAcquires()) queue->dequeue(r.cursor, txn.first);
outgoingMessageAccepted();
break;
case PN_REJECTED:
if (preAcquires()) queue->reject(r.cursor);
outgoingMessageRejected();
break;
case PN_RELEASED:
if (preAcquires()) queue->release(r.cursor, false);//for PN_RELEASED, delivery count should not be incremented
outgoingMessageRejected();//TODO: not quite true...
break;
case PN_MODIFIED:
if (preAcquires()) {
//TODO: handle message-annotations
if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
if (!trackingUndeliverableMessages) {
// observe queue for changes to track undeliverable messages
queue->getObservers().add(
boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
trackingUndeliverableMessages = true;
}
undeliverableMessages.add(r.msg.getSequence());
}
queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
}
outgoingMessageRejected();//TODO: not quite true...
break;
default:
QPID_LOG(warning, "Unhandled disposition: " << r.disposition);
}
//TODO: only settle once any dequeue on store has completed
pn_delivery_settle(delivery);
r.reset();
}
}
}
bool OutgoingFromQueue::canDeliver()
{
return deliveries[current].delivery == 0 && pn_link_credit(link);
}
void OutgoingFromQueue::detached(bool closed)
{
QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
if (trackingUndeliverableMessages) {
// stop observation of the queue
queue->getObservers().remove(
boost::dynamic_pointer_cast<OutgoingFromQueue>(shared_from_this()));
}
queue->cancel(shared_from_this());
//TODO: release in a clearer order?
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
if (deliveries[i].msg) queue->release(deliveries[i].cursor, true);
}
if (exclusive) {
queue->releaseExclusiveOwnership(closed);
} else if (isControllingUser) {
queue->releaseFromUse(true);
}
cancelled = true;
}
OutgoingFromQueue::~OutgoingFromQueue()
{
if (!cancelled && isControllingUser) queue->releaseFromUse(true);
}
//Consumer interface:
bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
{
Record& r = deliveries[current++];
if (current >= deliveries.capacity()) current = 0;
r.cursor = cursor;
r.msg = msg;
r.delivery = pn_delivery(link, r.tag);
//write header
qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
encoder.writeHeader(Header(r.msg));
write(&buffer[0], encoder.getPosition());
Translation t(r.msg);
t.write(*this);
if (pn_link_advance(link)) {
if (unreliable) pn_delivery_settle(r.delivery);
outgoingMessageSent();
QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
} else {
QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
}
if (unreliable) {
if (preAcquires()) queue->dequeue(0, r.cursor);
r.reset();
}
QPID_LOG(debug, "Requested delivery of " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
return true;
}
void OutgoingFromQueue::notify()
{
QPID_LOG(trace, "Notification received for " << queue->getName());
out.activateOutput();
}
bool OutgoingFromQueue::accept(const qpid::broker::Message&)
{
return true;
}
void OutgoingFromQueue::setSubjectFilter(const std::string& f)
{
subjectFilter = f;
}
void OutgoingFromQueue::setSelectorFilter(const std::string& f)
{
selector.reset(new Selector(f));
}
namespace {
bool match(TokenIterator& filter, TokenIterator& target)
{
bool wild = false;
while (!filter.finished())
{
if (filter.match1('*')) {
if (target.finished()) return false;
//else move to next word in filter target
filter.next();
target.next();
} else if (filter.match1('#')) {
// i.e. filter word is '#' which can match a variable number of words in the target
filter.next();
if (filter.finished()) return true;
else if (target.finished()) return false;
wild = true;
} else {
//filter word needs to match target exactly
if (target.finished()) return false;
std::string word;
target.pop(word);
if (filter.match(word)) {
wild = false;
filter.next();
} else if (!wild) {
return false;
}
}
}
return target.finished();
}
bool match(const std::string& filter, const std::string& target)
{
TokenIterator lhs(filter);
TokenIterator rhs(target);
return match(lhs, rhs);
}
}
bool OutgoingFromQueue::filter(const qpid::broker::Message& m)
{
if (undeliverableMessages.contains(m.getSequence())) return false;
return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
&& (!selector || selector->filter(m));
}
void OutgoingFromQueue::cancel() {}
void OutgoingFromQueue::acknowledged(const qpid::broker::DeliveryRecord&) {}
qpid::broker::OwnershipToken* OutgoingFromQueue::getSession()
{
return 0;
}
OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0)
{
#ifdef NO_PROTON_DELIVERY_TAG_T
tag.start = tagData;
#else
tag.bytes = tagData;
#endif
tag.size = TAG_WIDTH;
}
void OutgoingFromQueue::Record::init(size_t i)
{
index = i;
qpid::framing::Buffer buffer(tagData, tag.size);
assert(index <= std::numeric_limits<uint32_t>::max());
buffer.putLong(index);
}
void OutgoingFromQueue::Record::reset()
{
cursor = QueueCursor();
msg = qpid::broker::Message();
delivery = 0;
disposition = 0;
}
size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t)
{
assert(t.size == TAG_WIDTH);
#ifdef NO_PROTON_DELIVERY_TAG_T
qpid::framing::Buffer buffer(const_cast<char*>(t.start)/*won't ever be written to*/, t.size);
#else
qpid::framing::Buffer buffer(const_cast<char*>(t.bytes)/*won't ever be written to*/, t.size);
#endif
return (size_t) buffer.getLong();
}
boost::shared_ptr<Queue> OutgoingFromQueue::getExclusiveSubscriptionQueue(Outgoing* o)
{
OutgoingFromQueue* s = dynamic_cast<OutgoingFromQueue*>(o);
if (s && s->exclusive) return s->queue;
else return boost::shared_ptr<Queue>();
}
void OutgoingFromQueue::dequeued(const qpid::broker::Message &m)
{
if (undeliverableMessages.contains(m.getSequence())) {
undeliverableMessages.remove(m.getSequence());
}
}
}}} // namespace qpid::broker::amqp