blob: c8997877c5633dea2c8ad3fe2e6d639e4b025261 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Core.h"
#include "MessageHandler.h"
#include "BrokerContext.h"
#include "QueueContext.h"
#include "EventHandler.h"
#include "PrettyId.h"
#include "Group.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Thread.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace cluster {
using namespace broker;
using namespace framing;
MessageHandler::MessageHandler(Group& g, Core& c) :
HandlerBase(g.getEventHandler()),
broker(c.getBroker()),
core(c),
messageBuilders(g.getMessageBuilders()),
messageHolder(g.getMessageHolder())
{}
bool MessageHandler::handle(const framing::AMQFrame& frame) {
assert(frame.getBody());
const AMQBody& body = *frame.getBody();
if (framing::invoke(*this, body).wasHandled()) return true;
// Test for message frame
if (body.type() == HEADER_BODY || body.type() == CONTENT_BODY ||
(body.getMethod() && body.getMethod()->isA<MessageTransferBody>()))
{
boost::shared_ptr<broker::Queue> queue;
boost::intrusive_ptr<broker::Message> message;
if (sender() == self())
messageHolder.check(frame, queue, message);
else
messageBuilders.handle(sender(), frame, queue, message);
if (message) {
BrokerContext::ScopedSuppressReplication ssr;
queue->deliver(message);
if (sender() == self()) // Async completion
message->getIngressCompletion().finishCompleter();
}
return true;
}
return false;
}
boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
const std::string& q, const char* msg)
{
boost::shared_ptr<Queue> queue = broker.getQueues().find(q);
if (!queue) throw Exception(QPID_MSG(msg << ": unknown queue " << q));
return queue;
}
void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
// We only need to build message from other brokers, our own messages
// are held by the MessageHolder.
if (sender() != self()) {
boost::shared_ptr<Queue> queue = findQueue(q, "cluster enqueue");
messageBuilders.announce(sender(), channel, queue);
}
}
void MessageHandler::acquire(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
QPID_LOG(trace, "cluster message " << q << "[" << position
<< "] acquired by " << PrettyId(sender(), self()));
// Note acquires from other members. My own acquires were executed in
// the broker thread
if (sender() != self()) {
boost::shared_ptr<Queue> queue = findQueue(q, "cluster acquire");
QueueContext::get(*queue)->acquire(position);
}
}
void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
QPID_LOG(trace, "cluster message " << q << "[" << position
<< "] dequeued by " << PrettyId(sender(), self()));
// FIXME aconway 2010-10-28: for local dequeues, we should
// complete the ack that initiated the dequeue at this point, see
// BrokerContext::dequeue
// My own dequeues were processed in the broker thread before multicasting.
if (sender() != self()) {
boost::shared_ptr<Queue> queue = findQueue(q, "cluster dequeue");
QueueContext::get(*queue)->dequeue(position);
}
}
void MessageHandler::requeue(const std::string& q, uint32_t position, bool redelivered) {
if (sender() != self()) {
boost::shared_ptr<Queue> queue = findQueue(q, "cluster requeue");
QueueContext::get(*queue)->requeue(position, redelivered);
}
}
}} // namespace qpid::cluster