blob: 22b601e688847a311c5463215818c8efe60ecfd3 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <sstream>
#include <QpidError.h>
#include "LFProcessor.h"
#include "APRBase.h"
#include "APRPool.h"
#include "LFSessionContext.h"
using namespace qpid::sys;
using qpid::QpidError;
// TODO aconway 2006-10-12: stopped is read outside locks.
//
LFProcessor::LFProcessor(int _workers, int _size, int _timeout) :
size(_size),
timeout(_timeout),
signalledCount(0),
current(0),
count(0),
workerCount(_workers),
hasLeader(false),
workers(new Thread[_workers]),
stopped(false)
{
pool = APRPool::get();
CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
}
LFProcessor::~LFProcessor(){
if (!stopped) stop();
delete[] workers;
CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
APRPool::free(pool);
}
void LFProcessor::start(){
for(int i = 0; i < workerCount; i++){
workers[i] = Thread(this);
}
}
void LFProcessor::add(const apr_pollfd_t* const fd){
CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
Monitor::ScopedLock l(countLock);
sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
count++;
}
void LFProcessor::remove(const apr_pollfd_t* const fd){
CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
Monitor::ScopedLock l(countLock);
sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
count--;
}
void LFProcessor::reactivate(const apr_pollfd_t* const fd){
CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
}
void LFProcessor::deactivate(const apr_pollfd_t* const fd){
CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
}
void LFProcessor::update(const apr_pollfd_t* const fd){
CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
}
bool LFProcessor::full(){
Mutex::ScopedLock locker(countLock);
return count == size;
}
bool LFProcessor::empty(){
Mutex::ScopedLock locker(countLock);
return count == 0;
}
void LFProcessor::poll() {
apr_status_t status = APR_EGENERAL;
do{
current = 0;
if(!stopped){
status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
}
}while(status != APR_SUCCESS && !stopped);
}
void LFProcessor::run(){
try{
while(!stopped){
const apr_pollfd_t* event = 0;
LFSessionContext* session = 0;
{
Monitor::ScopedLock l(leadLock);
waitToLead();
event = getNextEvent();
if(!event) return;
session = reinterpret_cast<LFSessionContext*>(
event->client_data);
session->startProcessing();
relinquishLead();
}
//process event:
if(event->rtnevents & APR_POLLIN) session->read();
if(event->rtnevents & APR_POLLOUT) session->write();
if(session->isClosed()){
session->handleClose();
Monitor::ScopedLock l(countLock);
sessions.erase(find(sessions.begin(),sessions.end(), session));
count--;
}else{
session->stopProcessing();
}
}
}catch(std::exception e){
std::cout << e.what() << std::endl;
}
}
void LFProcessor::waitToLead(){
while(hasLeader && !stopped) leadLock.wait();
hasLeader = !stopped;
}
void LFProcessor::relinquishLead(){
hasLeader = false;
leadLock.notify();
}
const apr_pollfd_t* LFProcessor::getNextEvent(){
while(true){
if(stopped){
return 0;
}else if(current < signalledCount){
//use result of previous poll if one is available
return signalledFDs + (current++);
}else{
//else poll to get new events
poll();
}
}
}
void LFProcessor::stop(){
stopped = true;
{
Monitor::ScopedLock l(leadLock);
leadLock.notifyAll();
}
for(int i = 0; i < workerCount; i++){
workers[i].join();
}
for(iterator i = sessions.begin(); i < sessions.end(); i++){
(*i)->shutdown();
}
}