blob: c60632025cb5f542b7ae04e22bdddac3906b2d31 [file] [log] [blame]
#ifndef _broker_Link_h
#define _broker_Link_h
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include "qpid/Url.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/PersistableConfig.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/Link.h"
#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
class TimerTask;
}
namespace broker {
class LinkRegistry;
class Broker;
class LinkExchange;
namespace amqp_0_10 {
class Connection;
}
class Link : public PersistableConfig, public management::Manageable,
public boost::enable_shared_from_this<Link>
{
private:
mutable sys::Mutex lock;
const std::string name;
LinkRegistry* links;
// these remain constant across failover - used to identify this link
const std::string configuredTransport;
const std::string configuredHost;
const uint16_t configuredPort;
// these reflect the current address of remote - will change during failover
std::string host;
uint16_t port;
std::string transport;
bool durable;
std::string authMechanism;
std::string username;
std::string password;
mutable uint64_t persistenceId;
qmf::org::apache::qpid::broker::Link::shared_ptr mgmtObject;
Broker* broker;
int state;
uint32_t visitCount;
uint32_t currentInterval;
Url url; // URL can contain many addresses.
size_t reconnectNext; // Index for next re-connect attempt
typedef std::vector<Bridge::shared_ptr> Bridges;
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
Bridges cancellations; // Bridges pending cancellation
framing::ChannelId nextFreeChannel;
RangeSet<framing::ChannelId> freeChannels;
amqp_0_10::Connection* connection;
management::ManagementAgent* agent;
boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
bool failover; // Do we subscribe to a failover exchange?
uint failoverChannel;
std::string failoverSession;
static const int STATE_WAITING = 1;
static const int STATE_CONNECTING = 2;
static const int STATE_OPERATIONAL = 3;
static const int STATE_FAILED = 4;
static const int STATE_CLOSED = 5;
static const int STATE_CLOSING = 6; // Waiting for outstanding connect to complete first
static const uint32_t MAX_INTERVAL = 32;
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
void reconnectLH(const Address&); //called by LinkRegistry
// connection management (called by LinkRegistry)
void established(amqp_0_10::Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
void notifyConnectionForced(const std::string text);
void closeConnection(const std::string& reason);
friend class LinkRegistry; // to call established, opened, closed
public:
typedef boost::shared_ptr<Link> shared_ptr;
typedef boost::function<void(Link*)> DestroyedListener;
Link(const std::string& name,
LinkRegistry* links,
const std::string& host,
uint16_t port,
const std::string& transport,
DestroyedListener l,
bool durable,
const std::string& authMechanism,
const std::string& username,
const std::string& password,
Broker* broker,
management::Manageable* parent = 0,
bool failover=true);
virtual ~Link();
/** these return the *configured* transport/host/port, which does not change over the
lifetime of the Link */
std::string getHost() const { return configuredHost; }
uint16_t getPort() const { return configuredPort; }
std::string getTransport() const { return configuredTransport; }
/** returns the current address of the remote, which may be different from the
configured transport/host/port due to failover. Returns true if connection is
active */
QPID_BROKER_EXTERN bool getRemoteAddress(qpid::Address& addr) const;
bool isDurable() { return durable; }
void maintenanceVisit ();
QPID_BROKER_EXTERN framing::ChannelId nextChannel(); // allocate channel from link free pool
QPID_BROKER_EXTERN void returnChannel(framing::ChannelId); // return channel to link free pool
void add(Bridge::shared_ptr);
void cancel(Bridge::shared_ptr);
QPID_BROKER_EXTERN void setUrl(const Url&); // Set URL for reconnection.
// Close the link.
QPID_BROKER_EXTERN void close();
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
std::string getPassword() { return password; }
Broker* getBroker() { return broker; }
bool isConnecting() const { return state == STATE_CONNECTING; }
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
const std::string& getName() const;
static const std::string ENCODED_IDENTIFIER;
static const std::string ENCODED_IDENTIFIER_V1;
static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
static bool isEncodedLink(const std::string& key);
// Manageable entry points
management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
// manage the exchange owned by this link
static const std::string exchangeTypeName;
static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);
/** create a name for a link (if none supplied by user config) */
static std::string createName(const std::string& transport,
const std::string& host,
uint16_t port);
/** The current connction for this link. Note returns 0 if the link is not
* presently connected.
*/
amqp_0_10::Connection* getConnection() { return connection; }
};
}
}
#endif /*!_broker_Link.cpp_h*/