blob: 3f6156f230eaed90bbc9a67fec7053e058e0adb8 [file] [log] [blame]
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "QpidError.h"
#include "ScopedIncrement.h"
#include "ProducerConsumer.h"
namespace qpid {
namespace sys {
// // ================ ProducerConsumer
ProducerConsumer::ProducerConsumer(size_t init_items)
: items(init_items), waiters(0), stopped(false)
{}
void ProducerConsumer::stop() {
Mutex::ScopedLock l(monitor);
stopped = true;
monitor.notifyAll();
// Wait for waiting consumers to wake up.
while (waiters > 0)
monitor.wait();
}
size_t ProducerConsumer::available() const {
Mutex::ScopedLock l(monitor);
return items;
}
size_t ProducerConsumer::consumers() const {
Mutex::ScopedLock l(monitor);
return waiters;
}
// ================ Lock
ProducerConsumer::Lock::Lock(ProducerConsumer& p)
: pc(p), lock(p.monitor), status(INCOMPLETE) {}
bool ProducerConsumer::Lock::isOk() const {
return !pc.isStopped() && status==INCOMPLETE;
}
void ProducerConsumer::Lock::checkOk() const {
assert(!pc.isStopped());
assert(status == INCOMPLETE);
}
ProducerConsumer::Lock::~Lock() {
assert(status != INCOMPLETE || pc.isStopped());
}
void ProducerConsumer::Lock::confirm() {
checkOk();
status = CONFIRMED;
}
void ProducerConsumer::Lock::cancel() {
checkOk();
status = CANCELLED;
}
// ================ ProducerLock
ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p)
{}
ProducerConsumer::ProducerLock::~ProducerLock() {
if (status == CONFIRMED) {
pc.items++;
pc.monitor.notify(); // Notify a consumer.
}
}
// ================ ConsumerLock
ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p)
{
if (isOk()) {
ScopedIncrement<size_t> inc(pc.waiters);
while (pc.items == 0 && !pc.stopped) {
pc.monitor.wait();
}
}
}
ProducerConsumer::ConsumerLock::ConsumerLock(
ProducerConsumer& p, const Time& timeout) : Lock(p)
{
if (isOk()) {
// Don't wait if timeout==0
if (timeout == 0) {
if (pc.items == 0)
status = TIMEOUT;
return;
}
else {
Time deadline = now() + timeout;
ScopedIncrement<size_t> inc(pc.waiters);
while (pc.items == 0 && !pc.stopped) {
if (!pc.monitor.wait(deadline)) {
status = TIMEOUT;
return;
}
}
}
}
}
ProducerConsumer::ConsumerLock::~ConsumerLock() {
if (pc.isStopped()) {
if (pc.waiters == 0)
pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s)
}
else if (status==CONFIRMED) {
pc.items--;
if (pc.items > 0)
pc.monitor.notify(); // Notify another consumer.
}
}
}} // namespace qpid::sys