blob: 12f3c21e0a11746edca3222aac8d897e4a920981 [file] [log] [blame]
#ifndef QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H
#define QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/messaging/ConnectionOptions.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/Monitor.h"
#include "qpid/types/Variant.h"
#include "qpid/messaging/amqp/TransportContext.h"
struct pn_connection_t;
struct pn_link_t;
struct pn_session_t;
struct pn_transport_t;
namespace qpid {
namespace framing {
class ProtocolVersion;
}
namespace sys {
class SecurityLayer;
struct SecuritySettings;
}
namespace messaging {
class Duration;
class Message;
namespace amqp {
class DriverImpl;
class ReceiverContext;
class Sasl;
class SessionContext;
class SenderContext;
class Transport;
/**
*
*/
class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messaging::ConnectionOptions, public TransportContext
{
public:
ConnectionContext(const std::string& url, const qpid::types::Variant::Map& options);
~ConnectionContext();
void open();
bool isOpen() const;
void close();
boost::shared_ptr<SessionContext> newSession(bool transactional, const std::string& name);
boost::shared_ptr<SessionContext> getSession(const std::string& name) const;
void endSession(boost::shared_ptr<SessionContext>);
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative);
void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject);
void sync(boost::shared_ptr<SessionContext> ssn);
boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout);
void setOption(const std::string& name, const qpid::types::Variant& value);
std::string getAuthenticatedUsername();
void setCapacity(boost::shared_ptr<SenderContext>, uint32_t);
uint32_t getCapacity(boost::shared_ptr<SenderContext>);
uint32_t getUnsettled(boost::shared_ptr<SenderContext>);
void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t);
uint32_t getCapacity(boost::shared_ptr<ReceiverContext>);
uint32_t getAvailable(boost::shared_ptr<ReceiverContext>);
uint32_t getUnsettled(boost::shared_ptr<ReceiverContext>);
void activateOutput();
qpid::sys::Codec& getCodec();
const qpid::messaging::ConnectionOptions* getOptions();
//ConnectionCodec interface:
std::size_t decode(const char* buffer, std::size_t size);
std::size_t encode(char* buffer, std::size_t size);
bool canEncode();
void closed();
bool isClosed() const;
framing::ProtocolVersion getVersion() const;
//additionally, Transport needs:
void opened();//signal successful connection
void reconnect(const std::string& url);
void reconnect();
std::string getUrl() const;
const qpid::sys::SecuritySettings* getTransportSecuritySettings();
void initSecurityLayer(qpid::sys::SecurityLayer&);
void trace(const char*) const;
private:
typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap;
class CodecAdapter : public qpid::sys::Codec
{
public:
CodecAdapter(ConnectionContext&);
std::size_t decode(const char* buffer, std::size_t size);
std::size_t encode(char* buffer, std::size_t size);
bool canEncode();
private:
ConnectionContext& context;
};
Url fullUrl; // Combined URL of all known addresses.
Url currentUrl; // URL of currently connected address.
boost::shared_ptr<DriverImpl> driver;
boost::shared_ptr<Transport> transport;
pn_transport_t* engine;
pn_connection_t* connection;
SessionMap sessions;
mutable qpid::sys::Monitor lock;
bool writeHeader;
bool readHeader;
bool haveOutput;
std::string id;
enum {
DISCONNECTED,
CONNECTING,
CONNECTED
} state;
std::auto_ptr<Sasl> sasl;
CodecAdapter codecAdapter;
bool notifyOnWrite;
void check();
bool checkDisconnected();
void waitNoReconnect();
void wait();
void waitUntil(qpid::sys::AbsTime until);
void wait(boost::shared_ptr<SessionContext>);
void waitUntil(boost::shared_ptr<SessionContext>, qpid::sys::AbsTime until);
void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>, qpid::sys::AbsTime until);
void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>, qpid::sys::AbsTime until);
void checkClosed(boost::shared_ptr<SessionContext>);
void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
void wakeupDriver();
void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
void autoconnect();
bool tryConnectUrl(const qpid::Url& url);
bool tryOpenAddr(const qpid::Address& address);
bool tryConnectAddr(const qpid::Address& address);
void reconnect(const Url& url);
void reset();
bool restartSessions();
void restartSession(boost::shared_ptr<SessionContext>);
std::size_t decodePlain(const char* buffer, std::size_t size);
std::size_t encodePlain(char* buffer, std::size_t size);
bool canEncodePlain();
std::size_t readProtocolHeader(const char* buffer, std::size_t size);
std::size_t writeProtocolHeader(char* buffer, std::size_t size);
std::string getError();
bool useSasl();
void setProperties();
};
}}} // namespace qpid::messaging::amqp
#endif /*!QPID_MESSAGING_AMQP_CONNECTIONCONTEXT_H*/