| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "qpid/replication/ReplicatingEventListener.h" |
| #include "qpid/replication/constants.h" |
| #include "qpid/broker/Broker.h" |
| #include "qpid/broker/DeliverableMessage.h" |
| #include "qpid/broker/QueueEvents.h" |
| #include "qpid/framing/AMQFrame.h" |
| #include "qpid/framing/FrameHandler.h" |
| #include "qpid/framing/MessageTransferBody.h" |
| #include "qpid/log/Statement.h" |
| |
| namespace qpid { |
| namespace replication { |
| |
| using namespace qpid::broker; |
| using namespace qpid::framing; |
| using namespace qpid::replication::constants; |
| |
| void ReplicatingEventListener::handle(QueueEvents::Event event) |
| { |
| switch (event.type) { |
| case QueueEvents::ENQUEUE: |
| deliverEnqueueMessage(event.msg); |
| QPID_LOG(debug, "Queuing 'enqueue' event on " << event.msg.queue->getName() << " for replication"); |
| break; |
| case QueueEvents::DEQUEUE: |
| deliverDequeueMessage(event.msg); |
| QPID_LOG(debug, "Queuing 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position " |
| << event.msg.position << ")"); |
| break; |
| } |
| } |
| |
| namespace { |
| const std::string EMPTY; |
| } |
| |
| void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeued) |
| { |
| FieldTable headers; |
| headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName()); |
| headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE); |
| headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position); |
| boost::intrusive_ptr<Message> msg(createMessage(headers)); |
| DeliveryProperties* props = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); |
| props->setRoutingKey(dequeued.queue->getName()); |
| route(msg); |
| } |
| |
| void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued) |
| { |
| boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload)); |
| msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName()); |
| msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE); |
| msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position); |
| route(msg); |
| } |
| |
| void ReplicatingEventListener::route(boost::intrusive_ptr<qpid::broker::Message> msg) |
| { |
| try { |
| if (exchange) { |
| DeliverableMessage deliverable(msg); |
| exchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); |
| } else if (queue) { |
| queue->deliver(msg); |
| } else { |
| QPID_LOG(error, "Cannot route replication event, neither replication queue nor exchange configured"); |
| } |
| } catch (const std::exception& e) { |
| QPID_LOG(error, "Error enqueing replication event: " << e.what()); |
| } |
| } |
| |
| |
| boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers) |
| { |
| boost::intrusive_ptr<Message> msg(new Message()); |
| AMQFrame method((MessageTransferBody(ProtocolVersion(), EMPTY, 0, 0))); |
| AMQFrame header((AMQHeaderBody())); |
| header.setBof(false); |
| header.setEof(true); |
| header.setBos(true); |
| header.setEos(true); |
| msg->getFrames().append(method); |
| msg->getFrames().append(header); |
| MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); |
| props->setApplicationHeaders(headers); |
| return msg; |
| } |
| |
| struct AppendingHandler : FrameHandler |
| { |
| boost::intrusive_ptr<Message> msg; |
| |
| AppendingHandler(boost::intrusive_ptr<Message> m) : msg(m) {} |
| |
| void handle(AMQFrame& f) |
| { |
| msg->getFrames().append(f); |
| } |
| }; |
| |
| boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original) |
| { |
| boost::intrusive_ptr<Message> copy(new Message()); |
| AMQFrame method((MessageTransferBody(ProtocolVersion(), EMPTY, 0, 0))); |
| AppendingHandler handler(copy); |
| handler.handle(method); |
| |
| //To avoid modifying original headers, create new frame with |
| //cloned body: |
| AMQFrame header(*original->getFrames().getHeaders()); |
| header.setBof(false); |
| header.setEof(!original->getFrames().getContentSize());//if there is any content then the header is not the end of the frameset |
| header.setBos(true); |
| header.setEos(true); |
| handler.handle(header); |
| |
| original->sendContent(queue, handler, std::numeric_limits<int16_t>::max()); |
| return copy; |
| } |
| |
| Options* ReplicatingEventListener::getOptions() |
| { |
| return &options; |
| } |
| |
| void ReplicatingEventListener::initialize(Plugin::Target& target) |
| { |
| Broker* broker = dynamic_cast<broker::Broker*>(&target); |
| if (broker) { |
| broker->addFinalizer(boost::bind(&ReplicatingEventListener::shutdown, this)); |
| if (!options.exchange.empty()) { |
| if (!options.queue.empty()) { |
| QPID_LOG(warning, "Replication queue option ignored as replication exchange has been specified"); |
| } |
| try { |
| exchange = broker->getExchanges().declare(options.exchange, options.exchangeType).first; |
| } catch (const UnknownExchangeTypeException&) { |
| QPID_LOG(error, "Replication disabled due to invalid type: " << options.exchangeType); |
| } |
| } else if (!options.queue.empty()) { |
| if (options.createQueue) { |
| queue = broker->getQueues().declare(options.queue).first; |
| } else { |
| queue = broker->getQueues().find(options.queue); |
| } |
| if (queue) { |
| queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO); |
| } else { |
| QPID_LOG(error, "Replication queue named '" << options.queue << "' does not exist; replication plugin disabled."); |
| } |
| } |
| if (queue || exchange) { |
| QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1); |
| broker->getQueueEvents().registerListener(options.name, callback); |
| QPID_LOG(info, "Registered replicating queue event listener"); |
| } |
| } |
| } |
| |
| void ReplicatingEventListener::earlyInitialize(Target&) {} |
| void ReplicatingEventListener::shutdown() { queue.reset(); exchange.reset(); } |
| |
| ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"), |
| exchangeType("direct"), |
| name("replicator"), |
| createQueue(false) |
| { |
| addOptions() |
| ("replication-exchange-name", optValue(exchange, "EXCHANGE"), "Exchange to which events for other queues are routed") |
| ("replication-exchange-type", optValue(exchangeType, "direct|topic etc"), "Type of exchange to use") |
| ("replication-queue", optValue(queue, "QUEUE"), "Queue on which events for other queues are recorded") |
| ("replication-listener-name", optValue(name, "NAME"), "name by which to register the replicating event listener") |
| ("create-replication-queue", optValue(createQueue), "if set, the replication will be created if it does not exist"); |
| } |
| |
| static ReplicatingEventListener plugin; |
| |
| }} // namespace qpid::replication |