/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* etch_queue_apr.c
* based on apr_queue, modified by Cisco as follows:
* timeouts were added on the push and pop waits, and functions were modified
* to release their lock at a single exit point.
*/
#include "apr.h"
#include <stdio.h>
#include <stdlib.h>
//#include "apu.h"
#include "apr_portable.h"
#include "apr_thread_mutex.h"
#include "apr_thread_cond.h"
#include "apr_errno.h"
#include "etch_queue_apr.h" /* Cisco */
/* #define QUEUE_DEBUG */
#ifdef QUEUE_DEBUG
static void Q_DBG(char *msg, etch_apr_queue_t *q) {
    fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n",
            (size_t) apr_os_thread_current(), /* cast - Cisco */
            q->nelts, q->in, q->out,
            msg
            );
    fflush(stderr); /* Cisco; flush the stream written to above */
}
#else
#define Q_DBG(x,y)
#endif
/**
* Detects when the etch_apr_queue_t is full. This utility macro is expected
* to be called from within critical sections, and is not thread safe.
*/
#define etch_apr_queue_full(queue) ((queue)->nelts == (queue)->bounds)
/**
* Detects when the etch_apr_queue_t is empty. This utility macro is expected
* to be called from within critical sections, and is not thread safe.
*/
#define etch_apr_queue_empty(queue) ((queue)->nelts == 0)
/**
* Callback routine that is called to destroy this
* etch_apr_queue_t when its pool is destroyed.
*/
static apr_status_t etch_apr_queue_destroy(void *data)
{
etch_apr_queue_t *queue = data;
apr_thread_cond_destroy(queue->not_empty);
apr_thread_cond_destroy(queue->not_full);
apr_thread_mutex_destroy(queue->one_big_mutex);
return APR_SUCCESS;
}
/**
* Initialize the etch_apr_queue_t.
*/
apr_status_t etch_apr_queue_create(etch_apr_queue_t **q,
unsigned int queue_capacity,
apr_pool_t *a)
{
apr_status_t rv;
etch_apr_queue_t *queue;
queue = apr_palloc(a, sizeof(etch_apr_queue_t));
*q = queue;
/* nested doesn't work ;( */
rv = apr_thread_mutex_create(&queue->one_big_mutex,
APR_THREAD_MUTEX_UNNESTED,
a);
if (rv != APR_SUCCESS)
return rv;
rv = apr_thread_cond_create(&queue->not_empty, a);
if (rv != APR_SUCCESS)
return rv;
rv = apr_thread_cond_create(&queue->not_full, a);
if (rv != APR_SUCCESS)
return rv;
/* Set all the data in the queue to NULL */
queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
queue->bounds = queue_capacity;
queue->nelts = 0;
queue->in = 0;
queue->out = 0;
queue->terminated = 0;
queue->full_waiters = 0;
queue->empty_waiters = 0;
apr_pool_cleanup_register(a, queue, etch_apr_queue_destroy, apr_pool_cleanup_null);
return APR_SUCCESS;
}
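/*
* Usage sketch (illustrative only, not part of the build): creating a bounded
* queue from an APR pool. The capacity of 16 and the function name are
* assumptions for the example; the queue's memory is reclaimed by the cleanup
* registered on the pool, so no explicit destroy call is needed.
*/
#if 0
static apr_status_t example_create(apr_pool_t *pool, etch_apr_queue_t **q_out)
{
    return etch_apr_queue_create(q_out, 16, pool);
}
#endif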
/**
* Push new data onto the queue. Blocks if the queue is full. Once
* the push operation has completed, it signals other threads waiting
* in etch_apr_queue_pop() that they may continue consuming elements.
* @param timeout added by Cisco; the wait now uses apr_thread_cond_timedwait().
* Interval of time to wait: zero means wait forever, -1 means do not wait,
* otherwise the wait time in *microseconds*.
* @return APR_SUCCESS, APR_EAGAIN, APR_EOF, APR_EINTR, APR_TIMEUP,
* or some APR error
*/
apr_status_t etch_apr_queue_push(etch_apr_queue_t *queue,
apr_interval_time_t timeout,
void *data)
{
apr_status_t rv;
if (queue->terminated)
rv = APR_EOF; /* no more elements ever again */
else
if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex)))
{
do
{ if (etch_apr_queue_full(queue))
{
if (!queue->terminated)
{
if (-1 == timeout)
{ rv = APR_EAGAIN; /* asked to not wait */
break;
}
queue->full_waiters++;
if (0 == timeout)
rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
else
rv = apr_thread_cond_timedwait(queue->not_full, queue->one_big_mutex, timeout);
queue->full_waiters--;
if (rv != APR_SUCCESS)
break;
}
/* If we wake up and it's still full, then we were interrupted */
if (etch_apr_queue_full(queue))
{
Q_DBG("queue full (intr)", queue);
rv = queue->terminated? APR_EOF: APR_EINTR;
break;
}
}
queue->data[queue->in] = data;
queue->in = (queue->in + 1) % queue->bounds;
queue->nelts++;
if (queue->empty_waiters)
{
Q_DBG("sig !empty", queue);
rv = apr_thread_cond_signal(queue->not_empty);
}
} while(0);
apr_thread_mutex_unlock(queue->one_big_mutex);
}
return rv;
}
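/*
* Usage sketch (illustrative only, not part of the build) for the Cisco push
* timeout semantics above: -1 returns APR_EAGAIN immediately when the queue is
* full, a positive value waits up to that many microseconds before APR_TIMEUP,
* and 0 blocks until space is available. The item pointer, function name, and
* the 50-millisecond figure are assumptions for the example.
*/
#if 0
static apr_status_t example_push(etch_apr_queue_t *q, void *item)
{
    apr_status_t rv;

    rv = etch_apr_queue_push(q, -1, item);            /* try once, no wait  */
    if (APR_EAGAIN == rv)
        rv = etch_apr_queue_push(q, 50 * 1000, item); /* wait up to 50 ms   */
    if (APR_TIMEUP == rv)
        rv = etch_apr_queue_push(q, 0, item);         /* last resort: block */
    return rv;                                        /* APR_EOF if closed  */
}
#endif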
/**
* Push new data onto the queue. If the queue is full, returns APR_EAGAIN
* without blocking. If the push completes successfully, it signals other
* threads waiting in etch_apr_queue_pop() that they may continue consuming
* elements.
*/
apr_status_t etch_apr_queue_trypush(etch_apr_queue_t *queue, void *data)
{
apr_status_t rv;
if (queue->terminated)
rv = APR_EOF;
else
if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex)))
{
if (etch_apr_queue_full(queue))
rv = APR_EAGAIN;
else
{ queue->data[queue->in] = data;
queue->in = (queue->in + 1) % queue->bounds;
queue->nelts++;
if (queue->empty_waiters)
{
Q_DBG("sig !empty", queue);
rv = apr_thread_cond_signal(queue->not_empty);
}
}
apr_thread_mutex_unlock(queue->one_big_mutex);
}
return rv;
}
/**
* Returns the number of elements currently in the queue. Not thread safe.
*/
unsigned int etch_apr_queue_size(etch_apr_queue_t *queue) {
return queue->nelts;
}
/**
* Retrieves the next item from the queue. If there are no
* items available, it will block until one becomes available.
* Once retrieved, the item is placed into the address specified by
* 'data'.
* @param timeout added by Cisco; the wait now uses apr_thread_cond_timedwait().
* Interval of time to wait: zero means wait forever, -1 means do not wait,
* -2 means do not wait and ignore the queue's closed indicator,
* otherwise the blocking time in *microseconds*.
* @return APR_SUCCESS, APR_EAGAIN, APR_EOF, APR_EINTR, APR_TIMEUP,
* or some APR error
*/
apr_status_t etch_apr_queue_pop(etch_apr_queue_t *queue,
apr_interval_time_t timeout,
void **data)
{
apr_status_t rv;
if (queue->terminated) /* Cisco back door to clear closed queue */
{ if (timeout != ETCHQUEUE_CLEARING_CLOSED_QUEUE)
return APR_EOF; /* no more elements ever again */
}
if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex)))
{
do
{ /* Keep waiting until we wake up and find that the queue is not empty. */
if (etch_apr_queue_empty(queue))
{
if (-1 == timeout)
{ rv = APR_EAGAIN; /* asked to not wait */
break;
}
if (!queue->terminated)
{
queue->empty_waiters++;
if (0 == timeout)
rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
else
rv = apr_thread_cond_timedwait(queue->not_empty, queue->one_big_mutex, timeout);
queue->empty_waiters--;
if (rv != APR_SUCCESS) /* rv will be APR_TIMEUP if timed out */
break;
}
/* If we wake up and it's still empty, then we were interrupted */
if (etch_apr_queue_empty(queue))
{
Q_DBG("queue empty (intr)", queue);
rv = queue->terminated? APR_EOF: APR_EINTR;
break;
}
}
*data = queue->data[queue->out];
queue->nelts--;
queue->out = (queue->out + 1) % queue->bounds;
if (queue->full_waiters)
{
Q_DBG("signal !full", queue);
rv = apr_thread_cond_signal(queue->not_full);
}
} while(0);
apr_thread_mutex_unlock(queue->one_big_mutex);
}
return rv;
}
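/*
* Usage sketch (illustrative only, not part of the build) for the pop timeouts
* documented above: a consumer loop that waits up to one second per attempt
* and stops when the queue is terminated. The handle_item() callback and the
* function name are assumptions for the example.
*/
#if 0
extern void handle_item(void *item);    /* hypothetical application callback */

static void example_consumer(etch_apr_queue_t *q)
{
    void *item = NULL;
    apr_status_t rv;

    for (;;)
    {
        rv = etch_apr_queue_pop(q, 1000 * 1000, &item); /* wait up to 1 second */
        if (APR_SUCCESS == rv)
            handle_item(item);
        else if (APR_EOF == rv)
            break;                  /* queue terminated, no more items ever */
        /* APR_TIMEUP or APR_EINTR: simply try again */
    }
}
#endif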
/**
* Retrieves the next item from the queue. If there are no
* items available, return APR_EAGAIN. Once retrieved,
* the item is placed into the address specified by 'data'.
*/
apr_status_t etch_apr_queue_trypop(etch_apr_queue_t *queue, void **data)
{
apr_status_t rv;
if (queue->terminated)
rv = APR_EOF; /* no more elements ever again */
else
if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex)))
{
if (etch_apr_queue_empty(queue))
rv = APR_EAGAIN;
else
{ *data = queue->data[queue->out];
queue->nelts--;
queue->out = (queue->out + 1) % queue->bounds;
if (queue->full_waiters)
{
Q_DBG("signal !full", queue);
rv = apr_thread_cond_signal(queue->not_full);
}
}
apr_thread_mutex_unlock(queue->one_big_mutex);
}
return rv;
}
apr_status_t etch_apr_queue_interrupt_all(etch_apr_queue_t *queue)
{
apr_status_t rv;
Q_DBG("intr all", queue);
if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex)))
{
apr_thread_cond_broadcast(queue->not_empty);
apr_thread_cond_broadcast(queue->not_full);
apr_thread_mutex_unlock(queue->one_big_mutex);
}
return rv;
}
/**
* etch_apr_queue_unsafe_interrupt_all()
* added by Cisco to operate when the lock is already held, since the queue
* lock is not nestable.
*/
apr_status_t etch_apr_queue_unsafe_interrupt_all(etch_apr_queue_t *queue)
{
Q_DBG("intr all", queue);
apr_thread_cond_broadcast(queue->not_empty);
apr_thread_cond_broadcast(queue->not_full);
return APR_SUCCESS;
}
apr_status_t etch_apr_queue_term(etch_apr_queue_t *queue)
{
apr_status_t rv;
if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex)))
{
/* we must hold one_big_mutex when setting this... otherwise,
* we could end up setting it and waking everybody up just after a
* would-be popper checks it but right before they block
*/
queue->terminated = 1;
apr_thread_mutex_unlock(queue->one_big_mutex);
}
return etch_apr_queue_interrupt_all(queue);
}
/**
* added by Cisco to close the queue when the lock is already held
*/
apr_status_t etch_apr_queue_unsafeclose(etch_apr_queue_t *queue)
{
queue->terminated = 1;
return etch_apr_queue_unsafe_interrupt_all(queue);
}
/**
* added by Cisco to access lock externally
*/
apr_status_t etch_apr_queue_lock(etch_apr_queue_t *queue)
{
return apr_thread_mutex_lock(queue->one_big_mutex);
}
/**
* added by Cisco to access lock externally
*/
apr_status_t etch_apr_queue_unlock(etch_apr_queue_t *queue)
{
return apr_thread_mutex_unlock(queue->one_big_mutex);
}
/**
* etch_apr_queue_trylock()
* added by Cisco to access lock externally
*/
int etch_apr_queue_trylock(etch_apr_queue_t *queue)
{
return apr_thread_mutex_trylock(queue->one_big_mutex);
}
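/*
* Usage sketch (illustrative only, not part of the build) for the external
* lock helpers above. Because the queue mutex is not nestable, code that
* already holds it via etch_apr_queue_lock() must use the "unsafe" variants
* rather than etch_apr_queue_term() or etch_apr_queue_interrupt_all(). The
* function name is an assumption for the example.
*/
#if 0
static void example_close_while_locked(etch_apr_queue_t *q)
{
    if (APR_SUCCESS == etch_apr_queue_lock(q))
    {
        /* ... inspect or drain the queue while holding the lock ... */
        etch_apr_queue_unsafeclose(q); /* sets terminated and wakes all waiters */
        etch_apr_queue_unlock(q);
    }
}
#endif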