QPID-7895: Update to linearstore to make the flush timer more intelligent and only run when there is flushable content in the write buffers
diff --git a/.gitignore b/.gitignore
index 63a3f41..b7c3e17 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,4 +16,4 @@
# specific language governing permissions and limitations
# under the License.
#
-
+/build/
diff --git a/src/qpid/linearstore/JournalImpl.cpp b/src/qpid/linearstore/JournalImpl.cpp
index 5539f39..d7186cb 100644
--- a/src/qpid/linearstore/JournalImpl.cpp
+++ b/src/qpid/linearstore/JournalImpl.cpp
@@ -32,13 +32,63 @@
InactivityFireEvent::InactivityFireEvent(JournalImpl* p,
const ::qpid::sys::Duration timeout):
- ::qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
+ ::qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p),
+ _state(NOT_ADDED)
+{}
+
+bool InactivityFireEvent::addToTimer() {
+ ::qpid::sys::Mutex::ScopedLock sl(_ifeStateLock);
+ if (_state == NOT_ADDED) {
+ _state = RUNNING;
+ return true;
+ }
+ return false;
+}
+
+bool InactivityFireEvent::resetIfNotRunning() {
+ ::qpid::sys::Mutex::ScopedLock sl(_ifeStateLock);
+ switch (_state) {
+ case NOT_ADDED: THROW_STORE_FULL_EXCEPTION("Called InactivityFireEvent::resetIfNotRunning() before being added to timer");
+ case FIRED :
+ setupNextFire();
+ _state = RUNNING;
+ return true;
+ case FLUSHED:
+ restart();
+ _state = RUNNING;
+ break;
+ default:; // ignore
+ }
+ return false;
+}
+
+void InactivityFireEvent::flushed() {
+ ::qpid::sys::Mutex::ScopedLock sl(_ifeStateLock);
+ if (_state == RUNNING) {
+ _state = FLUSHED;
+ }
+}
void InactivityFireEvent::fire() {
- ::qpid::sys::Mutex::ScopedLock sl(_ife_lock);
- if (_parent) {
- _parent->flushFire();
+ {
+ ::qpid::sys::Mutex::ScopedLock sl2(_ifeStateLock);
+ if (_state != RUNNING) {
+ return;
+ }
+ _state = FIRED;
}
+ {
+ ::qpid::sys::Mutex::ScopedLock sl(_ifeParentLock);
+ if (_parent) {
+ _parent->flushFire();
+ }
+ }
+}
+
+void InactivityFireEvent::cancel() {
+ ::qpid::sys::TimerTask::cancel();
+ ::qpid::sys::Mutex::ScopedLock sl(_ifeParentLock);
+ _parent = 0;
}
GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p,
@@ -65,16 +115,10 @@
timer(timer_),
_journalLogRef(journalLogRef),
getEventsTimerSetFlag(false),
- writeActivityFlag(false),
- flushTriggeredFlag(true),
deleteCallback(onDelete)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
- {
- timer.start();
- timer.add(inactivityFireEventPtr);
- }
initManagement(a);
@@ -383,9 +427,7 @@
void
JournalImpl::stop(bool block_till_aio_cmpl)
{
- InactivityFireEvent* ifep = dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get());
- assert(ifep); // dynamic_cast can return null if the cast fails
- ifep->cancel();
+ inactivityFireEventPtr->cancel();
jcntl::stop(block_till_aio_cmpl);
if (_mgmtObject.get() != 0) {
@@ -402,6 +444,7 @@
::qpid::sys::Mutex::ScopedLock sl(_getf_lock);
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
}
+ inactivityFireEventPtr->flushed();
return res;
}
@@ -417,19 +460,7 @@
void
JournalImpl::flushFire()
{
- if (writeActivityFlag) {
- writeActivityFlag = false;
- flushTriggeredFlag = false;
- } else {
- if (!flushTriggeredFlag) {
- flush(false);
- flushTriggeredFlag = true;
- }
- }
- inactivityFireEventPtr->setupNextFire();
- {
- timer.add(inactivityFireEventPtr);
- }
+ flush(false);
}
void
@@ -473,7 +504,14 @@
void
JournalImpl::handleIoResult(const ::qpid::linearstore::journal::iores r)
{
- writeActivityFlag = true;
+ if (inactivityFireEventPtr->addToTimer()) {
+ timer.start();
+ timer.add(inactivityFireEventPtr);
+ } else {
+ if (inactivityFireEventPtr->resetIfNotRunning()) {
+ timer.add(inactivityFireEventPtr);
+ }
+ }
switch (r)
{
case ::qpid::linearstore::journal::RHM_IORES_SUCCESS:
diff --git a/src/qpid/linearstore/JournalImpl.h b/src/qpid/linearstore/JournalImpl.h
index 7c9d28e..0b89088 100644
--- a/src/qpid/linearstore/JournalImpl.h
+++ b/src/qpid/linearstore/JournalImpl.h
@@ -47,14 +47,19 @@
class InactivityFireEvent : public ::qpid::sys::TimerTask
{
JournalImpl* _parent;
- ::qpid::sys::Mutex _ife_lock;
+ ::qpid::sys::Mutex _ifeParentLock;
+ ::qpid::sys::Mutex _ifeStateLock;
+ enum {NOT_ADDED=0, RUNNING, FIRED, FLUSHED} _state;
public:
InactivityFireEvent(JournalImpl* p,
const ::qpid::sys::Duration timeout);
virtual ~InactivityFireEvent() {}
+ bool addToTimer();
+ bool resetIfNotRunning();
+ void flushed();
void fire();
- inline void cancel() { ::qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+ void cancel();
};
class GetEventsFireEvent : public ::qpid::sys::TimerTask
@@ -81,13 +86,11 @@
::qpid::sys::Timer& timer;
JournalLogImpl& _journalLogRef;
bool getEventsTimerSetFlag;
- boost::intrusive_ptr< ::qpid::sys::TimerTask> getEventsFireEventsPtr;
+ boost::intrusive_ptr<GetEventsFireEvent> getEventsFireEventsPtr;
::qpid::sys::Mutex _getf_lock;
::qpid::sys::Mutex _read_lock;
- bool writeActivityFlag;
- bool flushTriggeredFlag;
- boost::intrusive_ptr< ::qpid::sys::TimerTask> inactivityFireEventPtr;
+ boost::intrusive_ptr<InactivityFireEvent> inactivityFireEventPtr;
::qpid::management::ManagementAgent* _agent;
::qmf::org::apache::qpid::linearstore::Journal::shared_ptr _mgmtObject;