blob: be091f86d84a3fff10183c8da51c9e828e6b5701 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "ProtocolFactory.h"
#include "AsynchIOHandler.h"
#include "AsynchIO.h"
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <memory>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
Socket listener;
const uint16_t listeningPort;
std::auto_ptr<AsynchAcceptor> acceptor;
public:
AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, int16_t port,
ConnectionCodec::Factory*,
boost::function2<void, int, std::string> failed);
uint16_t getPort() const;
std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
bool isClient);
};
// Static instance to initialise plugin
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
}
void initialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
if (opts.requireEncrypted) {
QPID_LOG(info, "Not accepting unencrypted connections on TCP");
} else {
ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
opts.tcpNoDelay));
QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
broker->registerProtocolFactory("tcp", protocol);
}
}
}
} tcpPlugin;
AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
if (tcpNoDelay) {
s.setTcpNoDelay(tcpNoDelay);
QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
}
if (isClient)
async->setClient();
AsynchIO* aio = AsynchIO::create
(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
boost::bind(&AsynchIOHandler::disconnect, async, _1),
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, 4);
aio->start(poller);
}
uint16_t AsynchIOProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
std::string AsynchIOProtocolFactory::getHost() const {
return listener.getSockname();
}
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
acceptor.reset(
new AsynchAcceptor(listener,
boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
acceptor->start(poller);
}
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
// Note that the following logic does not cause a memory leak.
// The allocated Socket is freed either by the AsynchConnector
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
Socket* socket = new Socket();
AsynchConnector::create (*socket,
poller,
host,
port,
boost::bind(&AsynchIOProtocolFactory::established,
this, poller, _1, fact, true),
failed);
}
}} // namespace qpid::sys