blob: 7e9a1219aea88b5fb6962b7db0127a90e86bde7b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Core.h"
#include "MessageHandler.h"
#include "BrokerContext.h"
#include "EventHandler.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/Buffer.h"
#include "qpid/sys/Thread.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace cluster {
using namespace broker;
MessageHandler::MessageHandler(EventHandler& e) :
HandlerBase(e),
broker(e.getCore().getBroker())
{}
bool MessageHandler::invoke(const framing::AMQBody& body) {
return framing::invoke(*this, body).wasHandled();
}
void MessageHandler::routing(RoutingId routingId, const std::string& message) {
if (sender() == self()) return; // Already in getCore().getRoutingMap()
boost::intrusive_ptr<Message> msg = new Message;
// FIXME aconway 2010-10-28: decode message in bounded-size buffers.
framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
msg->decodeHeader(buf);
msg->decodeContent(buf);
memberMap[sender()].routingMap[routingId] = msg;
}
boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
const std::string& q, const char* msg)
{
boost::shared_ptr<Queue> queue = broker.getQueues().find(q);
if (!queue) throw Exception(QPID_MSG(msg << ": unknown queue " << q));
return queue;
}
void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
boost::intrusive_ptr<Message> msg;
if (sender() == self())
msg = eventHandler.getCore().getRoutingMap().get(routingId);
else
msg = memberMap[sender()].routingMap[routingId];
if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
<< " failed: unknown message"));
BrokerContext::ScopedSuppressReplication ssr;
queue->deliver(msg);
}
void MessageHandler::routed(RoutingId routingId) {
if (sender() == self())
eventHandler.getCore().getRoutingMap().erase(routingId);
else
memberMap[sender()].routingMap.erase(routingId);
}
void MessageHandler::acquire(const std::string& q, uint32_t position) {
// Note acquires from other members. My own acquires were executed in
// the connection thread
if (sender() != self()) {
// FIXME aconway 2010-10-28: need to store acquired messages on QueueContext
// by broker for possible re-queuing if a broker leaves.
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
QueuedMessage qm;
BrokerContext::ScopedSuppressReplication ssr;
bool ok = queue->acquireMessageAt(position, qm);
(void)ok; // Avoid unused variable warnings.
assert(ok); // FIXME aconway 2011-08-04: failing this assertion.
assert(qm.position.getValue() == position);
assert(qm.payload);
}
}
void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) {
if (sender() == self()) {
// FIXME aconway 2010-10-28: we should complete the ack that initiated
// the dequeue at this point, see BrokerContext::dequeue
return;
}
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
BrokerContext::ScopedSuppressReplication ssr;
// FIXME aconway 2011-05-12: Remove the acquired message from QueueContext.
// Do we need to call this? Review with gsim.
// QueuedMessage qm;
// Get qm from QueueContext?
// queue->dequeue(0, qm);
}
void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) {
// FIXME aconway 2011-05-24:
}
}} // namespace qpid::cluster