blob: 548fbd18811c846b7ec64e3ce926732cd33fd13e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <iostream>
#include <boost/assert.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/ptr_container/ptr_deque.hpp>
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <sys/ConnectionOutputHandler.h>
#include <sys/ConnectionInputHandler.h>
#include <sys/ConnectionInputHandlerFactory.h>
#include <sys/Acceptor.h>
#include <sys/Socket.h>
#include <framing/Buffer.h>
#include <framing/AMQFrame.h>
#include <Exception.h>
#include "EventChannelConnection.h"
namespace qpid {
namespace sys {
using namespace qpid::framing;
using namespace std;
class EventChannelAcceptor : public Acceptor {
public:
EventChannelAcceptor(
int16_t port_, int backlog, int nThreads, bool trace_
);
int getPort() const;
void run(ConnectionInputHandlerFactory& factory);
void shutdown();
private:
void accept();
Mutex lock;
Socket listener;
const int port;
const bool isTrace;
bool isRunning;
boost::ptr_vector<EventChannelConnection> connections;
AcceptEvent acceptEvent;
ConnectionInputHandlerFactory* factory;
bool isShutdown;
EventChannelThreads::shared_ptr threads;
};
Acceptor::shared_ptr Acceptor::create(
int16_t port, int backlog, int threads, bool trace)
{
return Acceptor::shared_ptr(
new EventChannelAcceptor(port, backlog, threads, trace));
}
// Must define Acceptor virtual dtor.
Acceptor::~Acceptor() {}
EventChannelAcceptor::EventChannelAcceptor(
int16_t port_, int backlog, int nThreads, bool trace_
) : listener(Socket::createTcp()),
port(listener.listen(int(port_), backlog)),
isTrace(trace_),
isRunning(false),
acceptEvent(listener.fd(),
boost::bind(&EventChannelAcceptor::accept, this)),
factory(0),
isShutdown(false),
threads(EventChannelThreads::create(EventChannel::create(), nThreads))
{ }
int EventChannelAcceptor::getPort() const {
return port; // Immutable no need for lock.
}
void EventChannelAcceptor::run(ConnectionInputHandlerFactory& f) {
{
Mutex::ScopedLock l(lock);
if (!isRunning && !isShutdown) {
isRunning = true;
factory = &f;
threads->post(acceptEvent);
}
}
threads->join(); // Wait for shutdown.
}
void EventChannelAcceptor::shutdown() {
bool doShutdown = false;
{
Mutex::ScopedLock l(lock);
doShutdown = !isShutdown; // I'm the shutdown thread.
isShutdown = true;
}
if (doShutdown) {
::close(acceptEvent.getDescriptor());
threads->shutdown();
for_each(connections.begin(), connections.end(),
boost::bind(&EventChannelConnection::close, _1));
}
threads->join();
}
void EventChannelAcceptor::accept()
{
// No lock, we only post one accept event at a time.
if (isShutdown)
return;
if (acceptEvent.getException()) {
Exception::log(*acceptEvent.getException(),
"EventChannelAcceptor::accept");
shutdown();
return;
}
// TODO aconway 2006-11-29: Need to reap closed connections also.
int fd = acceptEvent.getAcceptedDesscriptor();
connections.push_back(
new EventChannelConnection(threads, *factory, fd, fd, isTrace));
threads->post(acceptEvent); // Keep accepting.
}
}} // namespace qpid::sys