| /* $Id$ | |
| * | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to you under the Apache License, Version | |
| * 2.0 (the "License"); you may not use this file except in compliance | |
| * with the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| /* | |
| * etch_queue_apr.c | |
| * based on apr_queue, modified by Cisco as follows: | |
| * timeouts were added on the push and pop waits, and functions were modified | |
| * to release their lock at a single exit point. | |
| */ | |
| #include "apr.h" | |
| #include <stdio.h> | |
| #include <stdlib.h> | |
| #include "apu.h" | |
| #include "apr_portable.h" | |
| #include "apr_thread_mutex.h" | |
| #include "apr_thread_cond.h" | |
| #include "apr_errno.h" | |
| #include "etch_queue_apr.h" /* Cisco */ | |
| /* #define QUEUE_DEBUG */ | |
| #ifdef QUEUE_DEBUG | |
| static void Q_DBG(char*msg, etch_apr_queue_t *q) { | |
| fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n", | |
| (size_t) apr_os_thread_current(), /* cast - Cisco */ | |
| q->nelts, q->in, q->out, | |
| msg | |
| ); | |
| fflush(stdout); /* Cisco */ | |
| } | |
| #else | |
| #define Q_DBG(x,y) | |
| #endif | |
| /** | |
| * Detects when the etch_apr_queue_t is full. This utility function is expected | |
| * to be called from within critical sections, and is not threadsafe. | |
| */ | |
| #define etch_apr_queue_full(queue) ((queue)->nelts == (queue)->bounds) | |
| /** | |
| * Detects when the etch_apr_queue_t is empty. This utility function is expected | |
| * to be called from within critical sections, and is not threadsafe. | |
| */ | |
| #define etch_apr_queue_empty(queue) ((queue)->nelts == 0) | |
| /** | |
| * Callback routine that is called to destroy this | |
| * etch_apr_queue_t when its pool is destroyed. | |
| */ | |
| static apr_status_t etch_apr_queue_destroy(void *data) | |
| { | |
| etch_apr_queue_t *queue = data; | |
| apr_thread_cond_destroy(queue->not_empty); | |
| apr_thread_cond_destroy(queue->not_full); | |
| apr_thread_mutex_destroy(queue->one_big_mutex); | |
| return APR_SUCCESS; | |
| } | |
| /** | |
| * Initialize the etch_apr_queue_t. | |
| */ | |
| apr_status_t etch_apr_queue_create(etch_apr_queue_t **q, | |
| unsigned int queue_capacity, | |
| apr_pool_t *a) | |
| { | |
| apr_status_t rv; | |
| etch_apr_queue_t *queue; | |
| queue = apr_palloc(a, sizeof(etch_apr_queue_t)); | |
| *q = queue; | |
| /* nested doesn't work ;( */ | |
| rv = apr_thread_mutex_create(&queue->one_big_mutex, | |
| APR_THREAD_MUTEX_UNNESTED, | |
| a); | |
| if (rv != APR_SUCCESS) | |
| return rv; | |
| rv = apr_thread_cond_create(&queue->not_empty, a); | |
| if (rv != APR_SUCCESS) | |
| return rv; | |
| rv = apr_thread_cond_create(&queue->not_full, a); | |
| if (rv != APR_SUCCESS) | |
| return rv; | |
| /* Set all the data in the queue to NULL */ | |
| queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*)); | |
| queue->bounds = queue_capacity; | |
| queue->nelts = 0; | |
| queue->in = 0; | |
| queue->out = 0; | |
| queue->terminated = 0; | |
| queue->full_waiters = 0; | |
| queue->empty_waiters = 0; | |
| apr_pool_cleanup_register(a, queue, etch_apr_queue_destroy, apr_pool_cleanup_null); | |
| return APR_SUCCESS; | |
| } | |
| /** | |
| * Push new data onto the queue. Blocks if the queue is full. Once | |
| * the push operation has completed, it signals other threads waiting | |
| * in apr_queue_pop() that they may continue consuming sockets. | |
| * @param timeout added by Cisco. now uses apr_thread_cond_timewait(). | |
| * interval of time to wait. zero means forever, negative indicates no wait, | |
| * otherwise wait time in *microseconds*. | |
| * @return APR_SUCCESS, APR_EAGAIN, APR_EOF, APR_EINTR, APR_TIMEUP, | |
| * or some APR error | |
| */ | |
| apr_status_t etch_apr_queue_push(etch_apr_queue_t *queue, | |
| apr_interval_time_t timeout, | |
| void *data) | |
| { | |
| apr_status_t rv; | |
| if (queue->terminated) | |
| rv = APR_EOF; /* no more elements ever again */ | |
| else | |
| if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex))) | |
| { | |
| do | |
| { if (etch_apr_queue_full(queue)) | |
| { | |
| if (!queue->terminated) | |
| { | |
| if (-1 == timeout) | |
| { rv = APR_EAGAIN; /* asked to not wait */ | |
| break; | |
| } | |
| queue->full_waiters++; | |
| if (0 == timeout) | |
| rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex); | |
| else | |
| rv = apr_thread_cond_timedwait(queue->not_full, queue->one_big_mutex, timeout); | |
| queue->full_waiters--; | |
| if (rv != APR_SUCCESS) | |
| break; | |
| } | |
| /* If we wake up and it's still empty, then we were interrupted */ | |
| if (etch_apr_queue_full(queue)) | |
| { | |
| Q_DBG("queue full (intr)", queue); | |
| rv = queue->terminated? APR_EOF: APR_EINTR; | |
| break; | |
| } | |
| } | |
| queue->data[queue->in] = data; | |
| queue->in = (queue->in + 1) % queue->bounds; | |
| queue->nelts++; | |
| if (queue->empty_waiters) | |
| { | |
| Q_DBG("sig !empty", queue); | |
| rv = apr_thread_cond_signal(queue->not_empty); | |
| } | |
| } while(0); | |
| apr_thread_mutex_unlock(queue->one_big_mutex); | |
| } | |
| return rv; | |
| } | |
| /** | |
| * Push new data onto the queue. Blocks if the queue is full. Once | |
| * the push operation has completed, it signals other threads waiting | |
| * in apr_queue_pop() that they may continue consuming sockets. | |
| */ | |
| apr_status_t etch_apr_queue_trypush(etch_apr_queue_t *queue, void *data) | |
| { | |
| apr_status_t rv; | |
| if (queue->terminated) | |
| rv = APR_EOF; | |
| else | |
| if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex))) | |
| { | |
| if (etch_apr_queue_full(queue)) | |
| rv = APR_EAGAIN; | |
| else | |
| { queue->data[queue->in] = data; | |
| queue->in = (queue->in + 1) % queue->bounds; | |
| queue->nelts++; | |
| if (queue->empty_waiters) | |
| { | |
| Q_DBG("sig !empty", queue); | |
| rv = apr_thread_cond_signal(queue->not_empty); | |
| } | |
| } | |
| apr_thread_mutex_unlock(queue->one_big_mutex); | |
| } | |
| return rv; | |
| } | |
| /** | |
| * not thread safe | |
| */ | |
| unsigned int etch_apr_queue_size(etch_apr_queue_t *queue) { | |
| return queue->nelts; | |
| } | |
| /** | |
| * Retrieves the next item from the queue. If there are no | |
| * items available, it will block until one becomes available. | |
| * Once retrieved, the item is placed into the address specified by | |
| * 'data'. | |
| * @param timeout added by Cisco. now uses apr_thread_cond_timewait(). | |
| * interval of time to wait. zero means forever, -1 means no wait, | |
| * -2 means don't wait and ignore queue closed indicator, | |
| * otherwise timeout is blocking time in microseconds. | |
| * @return APR_SUCCESS, APR_EAGAIN, APR_EOF, APR_EINTR, APR_TIMEUP, | |
| * or some APR error | |
| */ | |
| apr_status_t etch_apr_queue_pop(etch_apr_queue_t *queue, | |
| apr_interval_time_t timeout, | |
| void **data) | |
| { | |
| apr_status_t rv; | |
| if (queue->terminated) /* Cisco back door to clear closed queue */ | |
| { if (timeout != ETCHQUEUE_CLEARING_CLOSED_QUEUE) | |
| return APR_EOF; /* no more elements ever again */ | |
| } | |
| if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex))) | |
| { | |
| do | |
| { /* Keep waiting until we wake up and find that the queue is not empty. */ | |
| if (etch_apr_queue_empty(queue)) | |
| { | |
| if (-1 == timeout) | |
| { rv = APR_EAGAIN; /* asked to not wait */ | |
| break; | |
| } | |
| if (!queue->terminated) | |
| { | |
| queue->empty_waiters++; | |
| if (0 == timeout) | |
| rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); | |
| else | |
| rv = apr_thread_cond_timedwait(queue->not_empty, queue->one_big_mutex, timeout); | |
| queue->empty_waiters--; | |
| if (rv != APR_SUCCESS) /* rv will be APR_TIMEUP if timed out */ | |
| break; | |
| } | |
| /* If we wake up and it's still empty, then we were interrupted */ | |
| if (etch_apr_queue_empty(queue)) | |
| { | |
| Q_DBG("queue empty (intr)", queue); | |
| rv = queue->terminated? APR_EOF: APR_EINTR; | |
| break; | |
| } | |
| } | |
| *data = queue->data[queue->out]; | |
| queue->nelts--; | |
| queue->out = (queue->out + 1) % queue->bounds; | |
| if (queue->full_waiters) | |
| { | |
| Q_DBG("signal !full", queue); | |
| rv = apr_thread_cond_signal(queue->not_full); | |
| } | |
| } while(0); | |
| apr_thread_mutex_unlock(queue->one_big_mutex); | |
| } | |
| return rv; | |
| } | |
| /** | |
| * Retrieves the next item from the queue. If there are no | |
| * items available, return APR_EAGAIN. Once retrieved, | |
| * the item is placed into the address specified by 'data'. | |
| */ | |
| apr_status_t etch_apr_queue_trypop(etch_apr_queue_t *queue, void **data) | |
| { | |
| apr_status_t rv; | |
| if (queue->terminated) | |
| rv = APR_EOF; /* no more elements ever again */ | |
| else | |
| if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex))) | |
| { | |
| if (etch_apr_queue_empty(queue)) | |
| rv = APR_EAGAIN; | |
| else | |
| { *data = queue->data[queue->out]; | |
| queue->nelts--; | |
| queue->out = (queue->out + 1) % queue->bounds; | |
| if (queue->full_waiters) | |
| { | |
| Q_DBG("signal !full", queue); | |
| rv = apr_thread_cond_signal(queue->not_full); | |
| } | |
| } | |
| apr_thread_mutex_unlock(queue->one_big_mutex); | |
| } | |
| return rv; | |
| } | |
| apr_status_t etch_apr_queue_interrupt_all(etch_apr_queue_t *queue) | |
| { | |
| apr_status_t rv; | |
| Q_DBG("intr all", queue); | |
| if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex))) | |
| { | |
| apr_thread_cond_broadcast(queue->not_empty); | |
| apr_thread_cond_broadcast(queue->not_full); | |
| apr_thread_mutex_unlock(queue->one_big_mutex); | |
| } | |
| return rv; | |
| } | |
| /** | |
| * etch_apr_queue_unsafe_interrupt_all() | |
| * added by Cisco to opeate when lock already held since queue lock is not nestable. | |
| */ | |
| apr_status_t etch_apr_queue_unsafe_interrupt_all(etch_apr_queue_t *queue) | |
| { | |
| Q_DBG("intr all", queue); | |
| apr_thread_cond_broadcast(queue->not_empty); | |
| apr_thread_cond_broadcast(queue->not_full); | |
| return APR_SUCCESS; | |
| } | |
| apr_status_t etch_apr_queue_term(etch_apr_queue_t *queue) | |
| { | |
| apr_status_t rv; | |
| if (APR_SUCCESS == (rv = apr_thread_mutex_lock(queue->one_big_mutex))) | |
| { | |
| /* we must hold one_big_mutex when setting this... otherwise, | |
| * we could end up setting it and waking everybody up just after a | |
| * would-be popper checks it but right before they block | |
| */ | |
| queue->terminated = 1; | |
| apr_thread_mutex_unlock(queue->one_big_mutex); | |
| } | |
| return etch_apr_queue_interrupt_all(queue); | |
| } | |
| /** | |
| * added by Cisco to close queue when lock already held | |
| */ | |
| apr_status_t etch_apr_queue_unsafeclose(etch_apr_queue_t *queue) | |
| { | |
| queue->terminated = 1; | |
| return etch_apr_queue_unsafe_interrupt_all(queue); | |
| } | |
| /** | |
| * added by Cisco to access lock externally | |
| */ | |
| apr_status_t etch_apr_queue_lock(etch_apr_queue_t *queue) | |
| { | |
| return apr_thread_mutex_lock(queue->one_big_mutex); | |
| } | |
| /** | |
| * added by Cisco to access lock externally | |
| */ | |
| apr_status_t etch_apr_queue_unlock(etch_apr_queue_t *queue) | |
| { | |
| return apr_thread_mutex_unlock(queue->one_big_mutex); | |
| } | |
| /** | |
| * etch_apr_queue_trylock() | |
| * added by Cisco to access lock externally | |
| */ | |
| int etch_apr_queue_trylock(etch_apr_queue_t *queue) | |
| { | |
| return apr_thread_mutex_trylock(queue->one_big_mutex); | |
| } |