blob: a40da24eb8b89abf6e8b4434e354a9f4c0843134 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/sys/ProtocolFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslSocket.h"
#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/Poller.h"
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
class Timer;
using namespace qpid::sys::ssl;
struct SslServerOptions : ssl::SslOptions
{
uint16_t port;
bool clientAuth;
bool nodict;
bool multiplex;
SslServerOptions() : port(5671),
clientAuth(false),
nodict(false),
multiplex(false)
{
addOptions()
("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
("ssl-require-client-authentication", optValue(clientAuth),
"Forces clients to authenticate in order to establish an SSL connection")
("ssl-sasl-no-dict", optValue(nodict),
"Disables SASL mechanisms that are vulnerable to passive dictionary-based password attacks");
}
};
class SslProtocolFactory : public ProtocolFactory {
boost::ptr_vector<Socket> listeners;
boost::ptr_vector<AsynchAcceptor> acceptors;
Timer& brokerTimer;
uint32_t maxNegotiateTime;
uint16_t listeningPort;
const bool tcpNoDelay;
bool nodict;
public:
SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
Timer& timer);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
private:
void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
SslServerOptions options;
bool nssInitialized;
Options* getOptions() { return &options; }
SslPlugin() : nssInitialized(false) {}
~SslPlugin() { if (nssInitialized) ssl::shutdownNSS(); }
void earlyInitialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && !options.certDbPath.empty()) {
const broker::Broker::Options& opts = broker->getOptions();
if (opts.port == options.port && // AMQP & AMQPS ports are the same
opts.port != 0) {
// The presence of this option is used to signal to the TCP
// plugin not to start listening on the shared port. The actual
// value cannot be configured through the command line or config
// file (other than by setting the ports to the same value)
// because we are only adding it after option parsing.
options.multiplex = true;
options.addOptions()("ssl-multiplex", optValue(options.multiplex), "Allow SSL and non-SSL connections on the same port");
}
}
}
void initialize(Target& target) {
QPID_LOG(trace, "Initialising SSL plugin");
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
if (options.certDbPath.empty()) {
QPID_LOG(notice, "SSL plugin not enabled, you must set --ssl-cert-db to enable it.");
} else {
try {
ssl::initNSS(options, true);
nssInitialized = true;
const broker::Broker::Options& opts = broker->getOptions();
ProtocolFactory::shared_ptr protocol(
static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer())));
if (protocol->getPort()!=0 ) {
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" : "SSL") <<
" connections on TCP/TCP6 port " <<
protocol->getPort());
}
broker->registerProtocolFactory("ssl", protocol);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
}
}
}
}
} sslPlugin;
namespace {
// Expand list of Interfaces and addresses to a list of addresses
std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
std::vector<std::string> addresses;
// If there are no specific interfaces listed use a single "" to listen on every interface
if (interfaces.empty()) {
addresses.push_back("");
return addresses;
}
for (unsigned i = 0; i < interfaces.size(); ++i) {
const std::string& interface = interfaces[i];
if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
// We don't have an interface of that name -
// Check for IPv6 ('[' ']') brackets and remove them
// then pass to be looked up directly
if (interface[0]=='[' && interface[interface.size()-1]==']') {
addresses.push_back(interface.substr(1, interface.size()-2));
} else {
addresses.push_back(interface);
}
}
}
return addresses;
}
}
SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
Timer& timer) :
brokerTimer(timer),
maxNegotiateTime(opts.maxNegotiateTime),
tcpNoDelay(opts.tcpNoDelay),
nodict(options.nodict)
{
std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
if (addresses.empty()) {
// We specified some interfaces, but couldn't find addresses for them
QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening");
listeningPort = 0;
}
for (unsigned i = 0; i<addresses.size(); ++i) {
QPID_LOG(debug, "Using interface: " << addresses[i]);
SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
// We must have at least one resolved address
QPID_LOG(info, "Listening to: " << sa.asString())
Socket* s = options.multiplex ?
new SslMuxSocket(options.certName, options.clientAuth) :
new SslSocket(options.certName, options.clientAuth);
uint16_t lport = s->listen(sa, opts.connectionBacklog);
QPID_LOG(debug, "Listened to: " << lport);
listeners.push_back(s);
listeningPort = lport;
// Try any other resolved addresses
while (sa.nextAddress()) {
// Hack to ensure that all listening connections are on the same port
sa.setAddrInfoPort(listeningPort);
QPID_LOG(info, "Listening to: " << sa.asString())
Socket* s = options.multiplex ?
new SslMuxSocket(options.certName, options.clientAuth) :
new SslSocket(options.certName, options.clientAuth);
uint16_t lport = s->listen(sa, opts.connectionBacklog);
QPID_LOG(debug, "Listened to: " << lport);
listeners.push_back(s);
}
}
}
void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f) {
AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
establishedCommon(async, poller, s);
}
void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, const std::string& name) {
AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
establishedCommon(async, poller, s);
}
void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
AsynchIO* aio = AsynchIO::create(
s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
boost::bind(&AsynchIOHandler::disconnect, async, _1),
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
uint16_t SslProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
void SslProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
for (unsigned i = 0; i<listeners.size(); ++i) {
acceptors.push_back(
AsynchAcceptor::create(listeners[i],
boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
acceptors[i].start(poller);
}
}
void SslProtocolFactory::connectFailed(
const Socket& s, int ec, const std::string& emsg,
ConnectFailedCallback failedCb)
{
failedCb(ec, emsg);
s.close();
delete &s;
}
void SslProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
// Note that the following logic does not cause a memory leak.
// The allocated Socket is freed either by the SslConnector
// upon connection failure or by the SslIoHandle upon connection
// shutdown. The allocated SslConnector frees itself when it
// is no longer needed.
Socket* socket = new qpid::sys::ssl::SslSocket();
try {
AsynchConnector* c = AsynchConnector::create(
*socket,
host,
port,
boost::bind(&SslProtocolFactory::establishedOutgoing,
this, poller, _1, fact, name),
boost::bind(&SslProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
} catch (std::exception&) {
// TODO: Design question - should we do the error callback and also throw?
int errCode = socket->getError();
connectFailed(*socket, errCode, strError(errCode), failed);
throw;
}
}
}} // namespace qpid::sys