blob: cc735a3e22ff491c82df5fb3bbb61d5058ddf85e [file] [log] [blame]
/** @file
FIFO queue
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@section details Details
ProtectedQueue implements a FIFO queue with the following functionality:
-# Multiple threads could be simultaneously trying to enqueue and
dequeue. Hence the queue needs to be protected with mutex.
-# In case the queue is empty, dequeue() sleeps for a specified amount
of time, or until a new element is inserted, whichever is earlier.
*/
#include "P_EventSystem.h"
// The protected queue is designed to delay signaling of threads
// until some amount of work has been completed on the current thread
// in order to prevent excess context switches.
//
// Defining EAGER_SIGNALLING disables this behavior and causes
// threads to be made runnable immediately.
//
// #define EAGER_SIGNALLING
extern ClassAllocator<Event> eventAllocator;
/**
  Push an event onto this queue's atomic list and, if the list was empty
  before the push, arrange for the event's owning thread to be signalled.

  When the caller is a regular EThread that has a deferred-signal array,
  the target thread is only recorded there (to be signalled later) rather
  than being signalled immediately.

  @param e event to enqueue; asserted not to be on the protected queue or
    the priority queue already.
  @param fast_signal when true, also invoke the target thread's
    signal_hook, if one is installed.
*/
void
ProtectedQueue::enqueue(Event *e, bool fast_signal)
{
  ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);

  // e->ethread is read before the push; presumably the event must not be
  // touched once another thread can see it on the list.
  EThread *target = e->ethread;
  e->in_the_prot_queue = 1;

  // A NULL result from the push is treated as "the list was empty"; only
  // that transition needs a wake-up.
  if (ink_atomiclist_push(&al, e) != NULL)
    return;

  // this_ethread() == 0 means the caller is not a regular EThread.
  EThread *self = this_ethread();
  if (self == target)
    return;

  if (!self || !self->ethreads_to_be_signalled) {
    // No deferred-signal array available: signal this queue right away.
    signal();
    if (fast_signal && target->signal_hook)
      target->signal_hook(target);
    return;
  }

#ifdef EAGER_SIGNALLING
  // Try to signal now and avoid deferred posting.
  if (target->EventQueueExternal.try_signal())
    return;
#endif

  if (fast_signal && target->signal_hook)
    target->signal_hook(target);

  // Record the target thread in the calling thread's list of threads to be
  // signalled. `pending` aliases the stored count so updates are visible to
  // flush_signals().
  int &pending = self->n_ethreads_to_be_signalled;
  EThread **slots = self->ethreads_to_be_signalled;

  if ((pending + 1) < eventProcessor.n_ethreads) {
    // Still room in the packed vector: append.
    slots[pending++] = target;
    return;
  }

  // We have run out of room in the packed vector.
  if ((pending + 1) == eventProcessor.n_ethreads) {
    // Convert to a direct map: move each ethread (slots[i]) into the
    // direct map location slots[slots[i]->id], following the chain of
    // displaced entries until one is already in place or a slot was empty.
    for (int i = 0; i < pending; i++) {
      EThread *moving = slots[i]; // put this ethread
      while (moving) {
        EThread *displaced = slots[moving->id]; // into this location
        if (displaced == moving)
          break;
        slots[moving->id] = moving;
        moving = displaced;
      }
      // Clear slot i if it was not overwritten by its rightful owner.
      if (slots[i] && slots[i]->id != i)
        slots[i] = 0;
    }
    // Bump the count past the threshold so later calls skip the conversion.
    pending++;
  }

  // The array is a direct map now: index by thread id.
  slots[target->id] = target;
}
/**
  Signal every thread recorded in thr's deferred-signal array, then reset
  the array and its count.

  @param thr thread whose pending signals are flushed; asserted to be the
    calling thread.
*/
void
flush_signals(EThread *thr)
{
  ink_assert(this_ethread() == thr);

  // Never scan more than n_ethreads slots, whatever the stored count says.
  int limit = thr->n_ethreads_to_be_signalled;
  if (limit > eventProcessor.n_ethreads)
    limit = eventProcessor.n_ethreads; // MAX

#ifdef EAGER_SIGNALLING
  // Since the lock is only there to prevent a race in ink_cond_timedwait
  // the lock is taken only for a short time, thus it is unlikely that
  // this pass has any effect.
  //
  // Try to signal as many threads as possible without blocking.
  for (int idx = 0; idx < limit; ++idx) {
    EThread *&slot = thr->ethreads_to_be_signalled[idx];
    if (slot && slot->EventQueueExternal.try_signal())
      slot = 0;
  }
#endif

  // Signal whatever is still recorded, run the thread's signal_hook if it
  // has one, and clear the slot.
  for (int idx = 0; idx < limit; ++idx) {
    EThread *&slot = thr->ethreads_to_be_signalled[idx];
    if (!slot)
      continue;
    slot->EventQueueExternal.signal();
    if (slot->signal_hook)
      slot->signal_hook(slot);
    slot = 0;
  }

  thr->n_ethreads_to_be_signalled = 0;
}
/**
  Move everything currently on the atomic list into localQueue, optionally
  waiting first if the atomic list is empty.

  @param cur_time unused.
  @param timeout converted to a timespec and handed to ink_cond_timedwait
    as the wait limit.
  @param sleep when true and the atomic list is empty, wait on
    might_have_data (under lock) until signalled or the timeout is reached.
*/
void
ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
{
  (void)cur_time;

  if (sleep) {
    ink_mutex_acquire(&lock);
    // Only wait when there is nothing to pick up.
    if (INK_ATOMICLIST_EMPTY(al)) {
      timespec ts = ink_hrtime_to_timespec(timeout);
      ink_cond_timedwait(&might_have_data, &lock, &ts);
    }
    ink_mutex_release(&lock);
  }

  // Take the whole atomic list in one shot, then invert the chain so that
  // the original enqueue order is preserved.
  SLL<Event, Event::Link_link> grabbed, ordered;
  grabbed.head = static_cast<Event *>(ink_atomiclist_popall(&al));
  for (Event *ev = grabbed.pop(); ev; ev = grabbed.pop())
    ordered.push(ev);

  // Hand live events to localQueue; cancelled ones are released instead.
  for (Event *ev = ordered.pop(); ev; ev = ordered.pop()) {
    if (ev->cancelled) {
      // Clear the event's mutex before returning it to the allocator.
      ev->mutex = NULL;
      eventAllocator.free(ev);
      continue;
    }
    localQueue.enqueue(ev);
  }
}