| #ifndef _AsyncCompletion_ |
| #define _AsyncCompletion_ |
| |
| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include <boost/intrusive_ptr.hpp> |
| |
| #include "qpid/broker/BrokerImportExport.h" |
| #include "qpid/sys/AtomicValue.h" |
| #include "qpid/sys/Mutex.h" |
| #include "qpid/sys/Monitor.h" |
| |
| namespace qpid { |
| namespace broker { |
| |
| /** |
| * Class to implement asynchronous notification of completion. |
| * |
| * Use-case: An "initiator" needs to wait for a set of "completers" to |
| * finish a unit of work before an action can occur. This object |
| * tracks the progress of the set of completers, and allows the action |
| * to occur once all completers have signalled that they are done. |
| * |
| * The initiator and completers may be running in separate threads. |
| * |
| * The initiating thread is the thread that initiates the action, |
| * i.e. the connection read thread. |
| * |
| * A completing thread is any thread that contributes to completion, |
| * e.g. a store thread that does an async write. |
| * There may be zero or more completers. |
| * |
| * When the work is complete, a callback is invoked. The callback |
| * may be invoked in the Initiator thread, or one of the Completer |
| * threads. The callback is passed a flag indicating whether or not |
| * the callback is running under the context of the Initiator thread. |
| * |
| * Use model: |
| * 1) Initiator thread invokes begin() |
| * 2) After begin() has been invoked, zero or more Completers invoke |
| * startCompleter(). Completers may be running in the same or |
| * different thread as the Initiator, as long as they guarantee that |
| * startCompleter() is invoked at least once before the Initiator invokes end(). |
| * 3) Completers may invoke finishCompleter() at any time, even after the |
| * initiator has invoked end(). finishCompleter() may be called from any |
| * thread. |
| * 4) startCompleter()/finishCompleter() calls "nest": for each call to |
| * startCompleter(), a corresponding call to finishCompleter() must be made. |
| * Once the last finishCompleter() is called, the Completer must no longer |
| * reference the completion object. |
| * 5) The Initiator invokes end() at the point where it has finished |
| * dispatching work to the Completers, and is prepared for the callback |
| * handler to be invoked. Note: if there are no outstanding Completers |
| * pending when the Initiator invokes end(), the callback will be invoked |
| * directly, and the sync parameter will be set true. This indicates to the |
| * Initiator that the callback is executing in the context of the end() call, |
| * and the Initiator is free to optimize the handling of the completion, |
| * assuming no need for synchronization with Completer threads. |
| */ |
| |
| class AsyncCompletion |
| { |
| public: |
| |
| /** Supplied by the Initiator to the end() method, allows for a callback |
| * when all outstanding completers are done. If the callback cannot be |
| * made during the end() call, the clone() method must supply a copy of |
| * this callback object that persists after end() returns. The cloned |
| * callback object will be used by the last completer thread, and |
| * released when the callback returns. |
| */ |
| class Callback : public RefCounted |
| { |
| public: |
| virtual void completed(bool) = 0; |
| virtual boost::intrusive_ptr<Callback> clone() = 0; |
| }; |
| |
| private: |
| mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded; |
| mutable qpid::sys::Monitor callbackLock; |
| bool inCallback, active; |
| |
| void invokeCallback(bool sync) { |
| qpid::sys::Mutex::ScopedLock l(callbackLock); |
| if (active) { |
| if (callback.get()) { |
| inCallback = true; |
| { |
| qpid::sys::Mutex::ScopedUnlock ul(callbackLock); |
| callback->completed(sync); |
| } |
| inCallback = false; |
| callback = boost::intrusive_ptr<Callback>(); |
| callbackLock.notifyAll(); |
| } |
| active = false; |
| } |
| } |
| |
| protected: |
| /** Invoked when all completers have signalled that they have completed |
| * (via calls to finishCompleter()). bool == true if called via end() |
| */ |
| boost::intrusive_ptr<Callback> callback; |
| |
| public: |
| AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {}; |
| virtual ~AsyncCompletion() { cancel(); } |
| |
| |
| /** True when all outstanding operations have compeleted |
| */ |
| bool isDone() |
| { |
| return !active; |
| } |
| |
| /** Called to signal the start of an asynchronous operation. The operation |
| * is considered pending until finishCompleter() is called. |
| * E.g. called when initiating an async store operation. |
| */ |
| void startCompleter() { ++completionsNeeded; } |
| |
| /** Called by completer to signal that it has finished the operation started |
| * when startCompleter() was invoked. |
| * e.g. called when async write complete. |
| */ |
| void finishCompleter() |
| { |
| if (--completionsNeeded == 0) { |
| invokeCallback(false); |
| } |
| } |
| |
| /** called by initiator before any calls to startCompleter can be done. |
| */ |
| void begin() |
| { |
| ++completionsNeeded; |
| } |
| |
| /** called by initiator after all potential completers have called |
| * startCompleter(). |
| */ |
| void end(Callback& cb) |
| { |
| assert(completionsNeeded.get() > 0); // ensure begin() has been called! |
| // the following only "decrements" the count if it is 1. This means |
| // there are no more outstanding completers and we are done. |
| if (completionsNeeded.boolCompareAndSwap(1, 0)) { |
| // done! Complete immediately |
| cb.completed(true); |
| return; |
| } |
| |
| // the compare-and-swap did not succeed. This means there are |
| // outstanding completers pending (count > 1). Get a persistent |
| // Callback object to use when the last completer is done. |
| // Decrement after setting up the callback ensures that pending |
| // completers cannot touch the callback until it is ready. |
| callback = cb.clone(); |
| if (--completionsNeeded == 0) { |
| // note that a completer may have completed during the |
| // callback setup or decrement: |
| invokeCallback(true); |
| } |
| } |
| |
| /** may be called by Initiator to cancel the callback. Will wait for |
| * callback to complete if in progress. |
| */ |
| virtual void cancel() { |
| qpid::sys::Mutex::ScopedLock l(callbackLock); |
| while (inCallback) callbackLock.wait(); |
| callback = boost::intrusive_ptr<Callback>(); |
| active = false; |
| } |
| }; |
| |
| }} // qpid::broker:: |
| #endif /*!_AsyncCompletion_*/ |