blob: 89a2bf516d7952053195eeaa1c699a51bd3884bd [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/replication/ReplicationExchange.h"
#include "qpid/replication/constants.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
namespace qpid {
namespace replication {
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::replication::constants;
/// Key in the exchange's argument table under which the replication event
/// sequence number is stored (read in the constructor, written on encode).
const std::string SEQUENCE_VALUE = "qpid.replication-event.sequence";
/**
 * Builds a replication exchange.
 *
 * The sequence counter is seeded from the SEQUENCE_VALUE entry of the
 * exchange's `args` member (populated by the Exchange base constructor from
 * `_args`), and that value is immediately written back to `args`.
 *
 * @param name     exchange name, forwarded to Exchange
 * @param durable  durability flag, forwarded to Exchange
 * @param _args    exchange arguments, forwarded to Exchange
 * @param qr       registry used to look up replication target queues
 * @param parent   management parent, forwarded to Exchange
 * @param broker   owning broker, forwarded to Exchange
 */
ReplicationExchange::ReplicationExchange(const std::string& name,
                                         bool durable,
                                         const FieldTable& _args,
                                         QueueRegistry& qr,
                                         Manageable* parent,
                                         Broker* broker)
    : Exchange(name, durable, _args, parent, broker),
      queues(qr),
      sequence(args.getAsInt64(SEQUENCE_VALUE)),
      init(false)
{
    // Store the sequence value back under its key in the argument table.
    args.setInt64(SEQUENCE_VALUE, sequence);

    // The management object is optional; only tag it when present.
    if (mgmtExchange != 0) {
        mgmtExchange->set_type(typeName);
    }
}
/// @return the exchange type name (ReplicationExchange::typeName).
std::string ReplicationExchange::getType() const
{
    return typeName;
}
/**
 * Dispatches a replication event message to the matching handler.
 *
 * Receive statistics are updated first (when a management object exists).
 * A message without headers is logged and counted as dropped. Otherwise the
 * REPLICATION_EVENT_TYPE header selects the handler, after duplicate
 * detection via isDuplicate().
 *
 * Note: a message whose event type is zero, or which is detected as a
 * duplicate, returns without updating the drop counters.
 *
 * @param msg   the message being routed
 * @param args  message headers; may be null
 * @throws IllegalArgumentException if the event type is non-zero but is
 *         neither ENQUEUE nor DEQUEUE
 */
void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args)
{
    if (mgmtExchange != 0) {
        mgmtExchange->inc_msgReceives();
        mgmtExchange->inc_byteReceives(msg.contentSize());
    }

    // No headers at all: nothing to act on, so record the drop and leave.
    if (!args) {
        QPID_LOG(warning, "Dropping unexpected message with no headers");
        if (mgmtExchange != 0) {
            mgmtExchange->inc_msgDrops();
            mgmtExchange->inc_byteDrops(msg.contentSize());
        }
        return;
    }

    int eventType = args->getAsInt(REPLICATION_EVENT_TYPE);

    // Short-circuit keeps isDuplicate() from running (and advancing the
    // sequence) for messages that carry no event type.
    if (!eventType || isDuplicate(args)) {
        return;
    }

    if (eventType == ENQUEUE) {
        handleEnqueueEvent(args, msg);
    } else if (eventType == DEQUEUE) {
        handleDequeueEvent(args, msg);
    } else {
        throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
    }
}
void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg)
{
std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
Queue::shared_ptr queue = queues.find(queueName);
if (queue) {
SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION));
// note that queue will ++ before enqueue.
if (queue->getPosition() > --seqno1) // test queue.pos < seqnumber
{
QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue");
mgmtExchange->inc_msgDrops();
mgmtExchange->inc_byteDrops(msg.contentSize());
} else {
queue->setPosition(seqno1);
msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE);
msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO);
msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE);
msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION);
msg.deliverTo(queue);
QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
if (mgmtExchange != 0) {
mgmtExchange->inc_msgRoutes();
mgmtExchange->inc_byteRoutes( msg.contentSize());
}
}
} else {
QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist");
if (mgmtExchange != 0) {
mgmtExchange->inc_msgDrops();
mgmtExchange->inc_byteDrops(msg.contentSize());
}
}
}
void ReplicationExchange::handleDequeueEvent(const FieldTable* args, Deliverable& msg)
{
std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
Queue::shared_ptr queue = queues.find(queueName);
if (queue) {
SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
QueuedMessage dequeued;
if (queue->acquireMessageAt(position, dequeued)) {
queue->dequeue(0, dequeued);
QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
if (mgmtExchange != 0) {
mgmtExchange->inc_msgRoutes();
mgmtExchange->inc_byteRoutes(msg.contentSize());
}
} else {
QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
if (mgmtExchange != 0) {
mgmtExchange->inc_msgDrops();
mgmtExchange->inc_byteDrops(msg.contentSize());
}
}
} else {
QPID_LOG(error, "Cannot process replicated 'dequeue' event. Queue " << queueName << " does not exist");
if (mgmtExchange != 0) {
mgmtExchange->inc_msgDrops();
mgmtExchange->inc_byteDrops(msg.contentSize());
}
}
}
/**
 * Decides whether a replication event has already been seen, based on its
 * REPLICATION_EVENT_SEQNO header, and tracks the latest sequence number.
 *
 * - No sequence header: never a duplicate; tracked state is untouched.
 * - First sequenced event seen by this instance: accepted unconditionally.
 * - Sequence greater than the last one: accepted; a jump of more than one
 *   is logged as a gap.
 * - Otherwise: reported as a duplicate.
 *
 * @param args  message headers; must not be null
 * @return true if the event is a duplicate and should be ignored
 */
bool ReplicationExchange::isDuplicate(const FieldTable* args)
{
    if (!args->get(REPLICATION_EVENT_SEQNO)) {
        return false;
    }

    SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO));

    if (init) {
        if (!(seqno > sequence)) {
            QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")");
            return true;
        }
        if (seqno - sequence > 1) {
            QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno);
        }
    }

    // Either the very first sequenced event, or a newer one: remember it.
    init = true;
    sequence = seqno;
    return false;
}
/**
 * Binding is not supported by this exchange type.
 *
 * @throws NotImplementedException always
 */
bool ReplicationExchange::bind(Queue::shared_ptr /*queue*/,
                               const std::string& /*routingKey*/,
                               const FieldTable* /*args*/)
{
    throw NotImplementedException("Replication exchange does not support bind operation");
}
/**
 * Unbinding is not supported by this exchange type.
 *
 * @throws NotImplementedException always
 */
bool ReplicationExchange::unbind(Queue::shared_ptr /*queue*/,
                                 const std::string& /*routingKey*/,
                                 const FieldTable* /*args*/)
{
    throw NotImplementedException("Replication exchange does not support unbind operation");
}
/**
 * Reports whether a binding exists. All arguments are ignored.
 *
 * @return false, unconditionally
 */
bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/,
                                  const string* const /*routingKey*/,
                                  const FieldTable* const /*args*/)
{
    return false;
}
/// Exchange type name returned by getType() and used to register the type.
const std::string ReplicationExchange::typeName = "replication";
/**
 * Encodes the exchange into the buffer.
 *
 * The current sequence number is first written into the argument table under
 * SEQUENCE_VALUE, then encoding is delegated to Exchange::encode().
 *
 * @param buffer  destination buffer
 */
void ReplicationExchange::encode(Buffer& buffer) const
{
    // Refresh the stored sequence before the base class encodes the exchange.
    args.setInt64(SEQUENCE_VALUE, sequence);
    Exchange::encode(buffer);
}
/**
 * Plugin that registers the "replication" exchange type with the broker's
 * exchange registry during early initialization.
 */
struct ReplicationExchangePlugin : public Plugin
{
    /// Broker the plugin was initialized against; 0 until earlyInitialize()
    /// is called with a Broker target.
    Broker* broker;

    ReplicationExchangePlugin();

    /// Registers the replication exchange factory if the target is a Broker.
    void earlyInitialize(Plugin::Target& target);

    /// Does nothing.
    void initialize(Plugin::Target& target);

    /// Factory function: creates a ReplicationExchange for the given broker.
    Exchange::shared_ptr create(const std::string& name,
                                bool durable,
                                const framing::FieldTable& args,
                                management::Manageable* parent,
                                qpid::broker::Broker* broker);
};
/// Starts with no broker attached; earlyInitialize() assigns it.
ReplicationExchangePlugin::ReplicationExchangePlugin()
    : broker(0)
{
}
/**
 * Factory for replication exchanges, registered with the exchange registry.
 *
 * The `broker` parameter (which shadows the member of the same name) supplies
 * the queue registry the new exchange resolves target queues against.
 *
 * @param name     exchange name
 * @param durable  durability flag
 * @param args     exchange arguments
 * @param parent   management parent
 * @param broker   broker owning the exchange; dereferenced, must not be null
 * @return the newly created exchange
 */
Exchange::shared_ptr ReplicationExchangePlugin::create(const std::string& name,
                                                       bool durable,
                                                       const framing::FieldTable& args,
                                                       management::Manageable* parent,
                                                       qpid::broker::Broker* broker)
{
    return Exchange::shared_ptr(
        new ReplicationExchange(name, durable, args, broker->getQueues(), parent, broker));
}
/**
 * Registers the replication exchange type when the plugin target is a Broker.
 *
 * For any other target the cast yields null and nothing is registered.
 *
 * @param target  the plugin target being initialized
 */
void ReplicationExchangePlugin::earlyInitialize(Plugin::Target& target)
{
    broker = dynamic_cast<broker::Broker*>(&target);
    if (!broker) {
        return;
    }

    // Bind create() to this plugin instance so the registry can call it as a
    // five-argument factory function.
    ExchangeRegistry::FactoryFunction f =
        boost::bind(&ReplicationExchangePlugin::create, this, _1, _2, _3, _4, _5);
    broker->getExchanges().registerType(ReplicationExchange::typeName, f);
    QPID_LOG(info, "Registered replication exchange");
}
/// Nothing to do here; registration happens in earlyInitialize().
void ReplicationExchangePlugin::initialize(Target&)
{
}
// File-local plugin instance, constructed during static initialization of
// this translation unit.
static ReplicationExchangePlugin exchangePlugin;
}} // namespace qpid::replication