blob: 742639323b7e8ed36121f068d75ef45c764c2411 [file] [log] [blame]
#ifndef _sys_ProducerConsumer_h
#define _sys_ProducerConsumer_h
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <boost/noncopyable.hpp>
#include "Exception.h"
#include "sys/Monitor.h"
namespace qpid {
namespace sys {
/**
* Producer-consumer synchronisation.
*
* Producers increase the number of available items, consumers reduce it.
* Consumers wait till an item is available. Waiting threads can be
* woken for shutdown using stop().
*
* Note: Currently implements unbounded producer-consumer, i.e. no limit
* to available items, producers never block. Can be extended to support
* bounded PC if required.
*
* TODO aconway 2007-02-13: add a usage example, taken from the tests.
*/
class ProducerConsumer
{
  public:
    /**
     * Construct a ProducerConsumer.
     *
     *@param init_items presumably the number of items available
     * initially (defaults to 0); the constructor is defined out of
     * line, so confirm against the implementation.
     */
    ProducerConsumer(size_t init_items=0);

    /** Calls stop() so that waiting threads are woken before destruction. */
    ~ProducerConsumer() { stop(); }

    /**
     * Wake any threads waiting for ProducerLock or ConsumerLock.
     *@post No threads are waiting in Producer or Consumer locks.
     */
    void stop();

    /**
     * True if queue is stopped.
     *
     * Const so it can be queried through a const reference, like the
     * other accessors. Note that this inline body reads the flag
     * directly and does not itself acquire the monitor.
     */
    bool isStopped() const { return stopped; }

    /** Number of items available for consumers */
    size_t available() const;

    /** Number of consumers waiting for items */
    size_t consumers() const;

    /** True if available() == 0 */
    bool empty() const { return available() == 0; }

    /**
     * Base class for producer and consumer locks.
     *
     * Not copyable. The constructor and destructor are protected, so
     * only the derived ProducerLock and ConsumerLock can be created
     * and destroyed directly.
     */
    class Lock : private boost::noncopyable {
      public:
        /**
         * You must call isOk() after creating a lock to verify its state.
         *
         *@return true means the lock succeeded. You MUST call either
         * confirm() or cancel() before the lock goes out of scope.
         *
         * false means the lock failed - timed out or the
         * ProducerConsumer is stopped. You should not do anything in
         * the scope of the lock.
         */
        bool isOk() const;

        /**
         * Confirm that an item was produced/consumed.
         *@pre isOk()
         */
        void confirm();

        /**
         * Cancel the lock to indicate nothing was produced/consumed.
         * Note that locks are not actually released until destroyed.
         *
         *@pre isOk()
         */
        void cancel();

        /** True if this lock experienced a timeout */
        bool isTimedOut() const { return status == TIMEOUT; }

        /** True if the owning ProducerConsumer has been stopped */
        bool isStopped() const { return pc.isStopped(); }

        /** The ProducerConsumer this lock was created on. */
        ProducerConsumer& pc;

      protected:
        /** Lock status */
        enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT };

        /** Create a lock on p; only callable from derived lock types. */
        Lock(ProducerConsumer& p);

        /** Protected and non-virtual: a Lock is never destroyed via a base pointer. */
        ~Lock();

        /**
         * Verify the lock is usable.
         * NOTE(review): presumably enforces the isOk() precondition of
         * confirm()/cancel() - defined out of line, confirm there.
         */
        void checkOk() const;

        /**
         * Scoped mutex lock that lives as long as this object.
         * Declared after pc; keep this order, since members are
         * initialized in declaration order.
         */
        Mutex::ScopedLock lock;

        /** Current state of this lock, one of Status. */
        Status status;
    };

    /** Lock for code that produces items. */
    struct ProducerLock : public Lock {
        /**
         * Acquire locks to produce an item.
         *@post If isOk() the calling thread has exclusive access
         * to produce an item.
         */
        ProducerLock(ProducerConsumer& p);

        /** Release locks, signal waiting consumers if confirm() was called. */
        ~ProducerLock();
    };

    /** Lock for code that consumes items */
    struct ConsumerLock : public Lock {
        /**
         * Wait for an item to consume and acquire locks.
         *
         *@post If isOk() there is at least one item available and the
         * calling thread has exclusive access to consume it.
         */
        ConsumerLock(ProducerConsumer& p);

        /**
         * Wait up to timeout to acquire lock.
         *@post If isOk() caller has a consumer lock.
         * If isTimedOut() there was a timeout.
         * If neither then we were stopped.
         */
        ConsumerLock(ProducerConsumer& p, const Time& timeout);

        /** Release locks */
        ~ConsumerLock();
    };

  private:
    /** Mutable so that const member functions are able to use it. */
    mutable Monitor monitor;

    /** Item count; presumably the value reported by available(). */
    size_t items;

    /** Waiter count; presumably the value reported by consumers(). */
    size_t waiters;

    /** Flag returned by isStopped(). */
    bool stopped;

    // The lock types need access to the private state above. The class-key
    // in each friend declaration matches the type's own declaration
    // (class Lock, struct ProducerLock/ConsumerLock) to avoid
    // mismatched-tag warnings.
    friend class Lock;
    friend struct ProducerLock;
    friend struct ConsumerLock;
};
}} // namespace qpid::sys
#endif /*!_sys_ProducerConsumer_h*/