blob: d6cd3e20e7189fb1e0162bd7de4df946da584a5a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/FedOps.h"
#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/types/Variant.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/framing/Uuid.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <iostream>
using qpid::framing::FieldTable;
using qpid::framing::Uuid;
using qpid::framing::Buffer;
using qpid::framing::AMQFrame;
using qpid::framing::AMQContentBody;
using qpid::framing::AMQHeaderBody;
using qpid::framing::MessageProperties;
using qpid::framing::MessageTransferBody;
using qpid::types::Variant;
using qpid::management::ManagementAgent;
using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace {
const std::string QPID_REPLICATE("qpid.replicate");
const std::string NONE("none");
const uint8_t EXPLICIT_ACK(0); // msg.accept required to be sent
const uint8_t IMPLIED_ACK(1); // msg.accept assumed, not sent
}
namespace qpid {
namespace broker {
void Bridge::PushHandler::handle(framing::AMQFrame& frame)
{
conn->received(frame);
}
Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
CancellationListener l, const _qmf::ArgsLinkBridge& _args,
InitializeCallback init, const std::string& _queueName, const string& ae) :
link(_link), channel(_id), args(_args),
listener(l), name(_name),
queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
: _queueName),
altEx(ae), persistenceId(0),
conn(0), initialize(init), detached(false),
useExistingQueue(!_queueName.empty()),
sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
{
// If both acks (i_sync) and limited credit is configured, then we'd
// better be able to sync before running out of credit or we
// may stall (note: i_credit==0 means "unlimited")
if (args.i_credit && args.i_sync && args.i_sync > args.i_credit)
throw Exception("The credit value must be greater than configured sync (ack) interval.");
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge
(agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync,
args.i_credit));
mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest);
}
Bridge::~Bridge()
{
mgmtObject->resourceDestroy();
}
void Bridge::create(amqp_0_10::Connection& c)
{
detached = false; // Reset detached in case we are recovering.
conn = &c;
SessionHandler& sessionHandler = c.getChannel(channel);
sessionHandler.setErrorListener(shared_from_this());
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
pushHandler.reset(new PushHandler(&c));
channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get()));
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
session->attach(sessionName, false);
session->commandPoint(0,0);
} else {
sessionHandler.attachAs(sessionName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
if (initialize) {
initialize(*this, sessionHandler); // custom subscription initializer supplied
} else {
// will a temp queue be created for this bridge?
const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue;
// UI convention: user specifies 0 for infinite credit
const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit;
// use explicit acks only for non-temp queues, useless for temp queues since they are
// destroyed when the session drops (can't resend unacked msgs)
const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK;
// configure command.sync frequency
FieldTable options;
uint32_t freq = 0;
if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs
freq = uint32_t(args.i_sync);
} else if (credit && credit != LinkRegistry::INFINITE_CREDIT) {
// force occasional sync to keep from stalling due to lack of credit
freq = (credit + 1)/2;
}
if (freq)
options.setInt("qpid.sync_frequency", freq);
// create a subscription on the remote
if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, credit); // message credit
peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit
QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest);
} else {
if (!useExistingQueue) {
FieldTable queueSettings;
if (args.i_tag.size()) {
queueSettings.setString("qpid.trace.id", args.i_tag);
} else {
const string& peerTag = c.getFederationPeerTag();
if (peerTag.size())
queueSettings.setString("qpid.trace.id", peerTag);
}
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
} else {
const string& localTag = link->getBroker()->getFederationTag();
if (localTag.size())
queueSettings.setString("qpid.trace.exclude", localTag);
}
bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
bool exclusive = true; // only exclusive if the queue is owned by the bridge
bool autoDelete = exclusive && !durable;//auto delete transient queues?
peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
}
if (!args.i_dynamic)
peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, credit);
peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT);
if (args.i_dynamic) {
Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
exchange->registerDynamicBridge(this);
QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src);
} else {
QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
}
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
}
void Bridge::cancel(amqp_0_10::Connection& c)
{
// If &c != conn then we have failed over so the old connection is closed.
if (&c == conn && resetProxy()) {
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(sessionName);
}
QPID_LOG(debug, "Cancelled bridge " << name);
}
/** Notify the bridge that the connection has closed */
void Bridge::closed()
{
if (args.i_dynamic) {
Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src);
if (exchange.get()) exchange->removeDynamicBridge(this);
}
QPID_LOG(debug, "Closed bridge " << name);
}
/** Shut down the bridge */
void Bridge::close()
{
listener(this); // ask the LinkRegistry to destroy us
}
void Bridge::setPersistenceId(uint64_t pId) const
{
persistenceId = pId;
}
const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2");
const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge");
bool Bridge::isEncodedBridge(const std::string& key)
{
return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1;
}
Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
{
string kind;
buffer.getShortString(kind);
string host;
uint16_t port;
string src;
string dest;
string key;
string id;
string excludes;
string name;
Link::shared_ptr link;
if (kind == ENCODED_IDENTIFIER_V1) {
/** previous versions identified the bridge by host:port, not by name, and
* transport wasn't provided. Try to find a link using those paramters.
*/
buffer.getShortString(host);
port = buffer.getShort();
link = links.getLink(host, port);
if (!link) {
QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port);
return Bridge::shared_ptr();
}
} else {
string linkName;
buffer.getShortString(name);
buffer.getShortString(linkName);
link = links.getLink(linkName);
if (!link) {
QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'");
return Bridge::shared_ptr();
}
}
bool durable(buffer.getOctet());
buffer.getShortString(src);
buffer.getShortString(dest);
buffer.getShortString(key);
bool is_queue(buffer.getOctet());
bool is_local(buffer.getOctet());
buffer.getShortString(id);
buffer.getShortString(excludes);
bool dynamic(buffer.getOctet());
uint16_t sync = buffer.getShort();
uint32_t credit = buffer.getLong();
if (kind == ENCODED_IDENTIFIER_V1) {
/** previous versions did not provide a name for the bridge, so create one
*/
name = createName(link->getName(), src, dest, key);
}
return links.declare(name, *link, durable, src, dest, key, is_queue,
is_local, id, excludes, dynamic, sync, credit).first;
}
void Bridge::encode(Buffer& buffer) const
{
buffer.putShortString(ENCODED_IDENTIFIER);
buffer.putShortString(name);
buffer.putShortString(link->getName());
buffer.putOctet(args.i_durable ? 1 : 0);
buffer.putShortString(args.i_src);
buffer.putShortString(args.i_dest);
buffer.putShortString(args.i_key);
buffer.putOctet(args.i_srcIsQueue ? 1 : 0);
buffer.putOctet(args.i_srcIsLocal ? 1 : 0);
buffer.putShortString(args.i_tag);
buffer.putShortString(args.i_excludes);
buffer.putOctet(args.i_dynamic ? 1 : 0);
buffer.putShort(args.i_sync);
buffer.putLong(args.i_credit);
}
uint32_t Bridge::encodedSize() const
{
return ENCODED_IDENTIFIER.size() + 1 // +1 byte length
+ name.size() + 1
+ link->getName().size() + 1
+ 1 // durable
+ args.i_src.size() + 1
+ args.i_dest.size() + 1
+ args.i_key.size() + 1
+ 1 // srcIsQueue
+ 1 // srcIsLocal
+ args.i_tag.size() + 1
+ args.i_excludes.size() + 1
+ 1 // dynamic
+ 2 // sync
+ 4; // credit
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const
{
return mgmtObject;
}
management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
management::Args& /*args*/,
string&)
{
if (methodId == _qmf::Bridge::METHOD_CLOSE) {
//notify that we are closed
QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'");
close();
return management::Manageable::STATUS_OK;
} else {
return management::Manageable::STATUS_UNKNOWN_METHOD;
}
}
void Bridge::propagateBinding(const string& key, const string& tagList,
const string& op, const string& origin,
qpid::framing::FieldTable* extra_args)
{
const string& localTag = link->getBroker()->getFederationTag();
const string& peerTag = conn->getFederationPeerTag();
if (tagList.find(peerTag) == tagList.npos) {
FieldTable bindArgs;
if (extra_args) {
for (qpid::framing::FieldTable::ValueMap::iterator i=extra_args->begin(); i != extra_args->end(); ++i) {
bindArgs.insert((*i));
}
}
string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag);
bindArgs.setString(QPID_REPLICATE, NONE);
bindArgs.setString(qpidFedOp, op);
bindArgs.setString(qpidFedTags, newTagList);
if (origin.empty())
bindArgs.setString(qpidFedOrigin, localTag);
else
bindArgs.setString(qpidFedOrigin, origin);
conn->requestIOProcessing(
weakCallback<Bridge>(
boost::bind(&Bridge::ioThreadPropagateBinding, _1,
queueName, args.i_src, key, bindArgs),
this));
}
}
void Bridge::sendReorigin()
{
FieldTable bindArgs;
bindArgs.setString(qpidFedOp, fedOpReorigin);
bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
conn->requestIOProcessing(
weakCallback<Bridge>(
boost::bind(&Bridge::ioThreadPropagateBinding, _1,
queueName, args.i_src, args.i_key, bindArgs),
this));
}
bool Bridge::resetProxy()
{
SessionHandler& sessionHandler = conn->getChannel(channel);
if (!sessionHandler.getSession()) peer.reset();
else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
return peer.get();
}
void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, FieldTable args)
{
if (resetProxy()) {
peer->getExchange().bind(queue, exchange, key, args);
} else {
// link's periodic maintenance visit will attempt to recover
}
}
bool Bridge::containsLocalTag(const string& tagList) const
{
const string& localTag = link->getBroker()->getFederationTag();
return (tagList.find(localTag) != tagList.npos);
}
const string& Bridge::getLocalTag() const
{
return link->getBroker()->getFederationTag();
}
// SessionHandler::ErrorListener methods.
void Bridge::connectionException(
framing::connection::CloseCode code, const std::string& msg)
{
if (errorListener) errorListener->connectionException(code, msg);
}
void Bridge::channelException(
framing::session::DetachCode code, const std::string& msg)
{
if (errorListener) errorListener->channelException(code, msg);
}
void Bridge::executionException(
framing::execution::ErrorCode code, const std::string& msg)
{
if (errorListener) errorListener->executionException(code, msg);
}
void Bridge::incomingExecutionException(
framing::execution::ErrorCode code, const std::string& msg)
{
if (errorListener) errorListener->incomingExecutionException(code, msg);
}
void Bridge::detach() {
detached = true;
if (errorListener) errorListener->detach();
}
std::string Bridge::createName(const std::string& linkName,
const std::string& src,
const std::string& dest,
const std::string& key)
{
std::stringstream keystream;
keystream << linkName << "!" << src << "!" << dest << "!" << key;
return keystream.str();
}
}}