/* $Id$
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * etch_queue.c
 */
| #include "apr_network_io.h" | |
| #include "etch_queue.h" | |
| #include "etch_global.h" | |
| #include "etchthread.h" | |
| #include "etchlog.h" | |
| int destroy_etch_queue(etch_queue*); | |
| void etchqueue_clear(etch_queue*); | |
| int etchqueue_iterable_first(etch_iterator*); | |
| int etchqueue_iterable_next(etch_iterator*); | |
| int etchqueue_iterable_has_next(etch_iterator*); | |
| /** | |
| * new_queue() | |
| * etch_queue constructor | |
| */ | |
| etch_queue* new_queue(const int initialsize) | |
| { | |
| etch_queue* newq = NULL; | |
| etch_apr_queue_t* aprq = NULL; | |
| apr_pool_t* newsubpool = NULL; | |
| int result = -1, qsize = initialsize > 0? initialsize: ETCH_DEFQSIZE; | |
| do | |
| { result = apr_pool_create(&newsubpool, etch_apr_mempool); | |
| if (0 != result) break; | |
| result = etch_apr_queue_create (&aprq, qsize, newsubpool); | |
| if (0 != result) break; | |
| newq = (etch_queue*) new_object(sizeof(etch_queue), | |
| ETCHTYPEB_ETCHQUEUE, CLASSID_ETCHQUEUE); | |
| newq->aprq = aprq; | |
| newq->subpool = newsubpool; | |
| newq->qcapacity = initialsize; | |
| newq->destroy = destroy_etch_queue; | |
| /* assume content to be etch objects owned by the queue */ | |
| newq->is_readonly = FALSE; | |
| newq->content_type = ETCHQUEUE_CONTENT_OBJECT; | |
| new_iterable(&newq->iterable, NULL, etchqueue_iterable_first, | |
| etchqueue_iterable_next, etchqueue_iterable_has_next); | |
| result = 0; | |
| } while(0); | |
| if (0 != result) | |
| { | |
| if (newsubpool) apr_pool_destroy(newsubpool); | |
| } | |
| return newq; | |
| } | |
| /* | |
| * destroy_etch_queue() | |
| * etch_queue destructor | |
| */ | |
| int destroy_etch_queue(etch_queue* queue) | |
| { | |
| if (NULL == queue) return -1; | |
| if (-1 == etch_apr_queue_trylock(queue->aprq)) | |
| return -1; | |
| else etch_apr_queue_unlock(queue->aprq); | |
| if (queue->refcount > 0 && --queue->refcount > 0) return -1; | |
| if (!is_etchobj_static_content(queue)) | |
| { | |
| /* free etch-managed memory */ | |
| etchqueue_clear(queue); | |
| /* free APR-managed memory - APR queue sets a callback such that | |
| * destroying the memory pool destroys the APR queue object */ | |
| if (queue->subpool) | |
| apr_pool_destroy(queue->subpool); | |
| queue->subpool = NULL; | |
| queue->aprq = NULL; | |
| } | |
| return destroy_objectex((objmask*)queue); | |
| } | |
| /* | |
| * etchqueue_clear() | |
| * frees queue content memory. | |
| * this ensures the underlying queue is closed, so it is only invoked prior to | |
| * destruction of the queue in order to free any remaining queued etch objects. | |
| * all other memory associated with the queue, other than the shell object, | |
| * is managed by APR. | |
| */ | |
| void etchqueue_clear(etch_queue* thisx) | |
| { | |
| if (NULL == thisx || NULL == thisx->aprq || thisx->is_readonly) return; | |
| etchqueue_close(thisx, TRUE); /* close queue and notify all waiters */ | |
| do | |
| { void* qobj = NULL; | |
| queuecallback callback = thisx->freehook; | |
| int i = 0, is_freehandled = 0, result = 0; | |
| const int is_obj_content = thisx->content_type == ETCHQUEUE_CONTENT_OBJECT; | |
| while(1) | |
| { | |
| result = etchqueue_get_withwait (thisx, ETCHQUEUE_CLEARING_CLOSED_QUEUE, &qobj); | |
| if (0 != result) break; | |
| is_freehandled = callback? callback(i, qobj): FALSE; | |
| if (is_freehandled); | |
| else | |
| if (is_obj_content) | |
| ((objmask*)qobj)->destroy(qobj); | |
| else etch_free(qobj); | |
| } | |
| } while(0); | |
| } | |
| /** | |
| * etchqueue_size() | |
| * @return current queue depth | |
| * not thread-safe, caller should hold the mailbox queue lock | |
| */ | |
| int etchqueue_size(etch_queue* thisx) | |
| { | |
| return thisx && thisx->aprq? etch_apr_queue_size(thisx->aprq): 0; | |
| } | |
| /** | |
| * etchqueue_is_closed() | |
| * not thread-safe, caller should hold the mailbox queue lock | |
| */ | |
| int etchqueue_is_closed(etch_queue* thisx) | |
| { | |
| return thisx && thisx->aprq? thisx->aprq->terminated != 0: TRUE; | |
| } | |
| /** | |
| * etchqueue_is_full() | |
| * not thread-safe, caller should hold the mailbox queue lock | |
| */ | |
| int etchqueue_is_full(etch_queue* thisx) | |
| { | |
| return thisx && thisx->aprq? thisx->aprq->nelts == thisx->aprq->bounds: TRUE; | |
| } | |
| /** | |
| * etchqueue_close() | |
| * underlying implementation ensures operation atomicity | |
| */ | |
| int etchqueue_close(etch_queue* thisx, const int is_needlock) | |
| { | |
| int result = 0; | |
| if (!etchqueue_is_closed(thisx)) | |
| { | |
| if (is_needlock) | |
| result = etch_apr_queue_term(thisx->aprq); | |
| else | |
| result = etch_apr_queue_unsafeclose(thisx->aprq); | |
| result = result == 0? 0: -1; | |
| } | |
| return result; | |
| } | |
| /** | |
| * etchqueue_put() | |
| * push specified item onto the queue. if the queue is full, | |
| * wait indefinitely until space becomes available. | |
| * @return 0 success, -1 error. | |
| */ | |
| int etchqueue_put(etch_queue* thisx, void* item) | |
| { | |
| const int result = etchqueue_put_withwait(thisx, ETCH_INFWAIT, item); | |
| return result; | |
| } | |
| /** | |
| * etchqueue_put_withwait() | |
| * push specified item onto the queue. if the queue is full, | |
| * waitms specified time interval until space becomes available. | |
| * @param waitms wait interval in milliseconds. specify ETCH_INFWAIT to wait | |
| * forever, ETCH_NOWAIT to not wait. | |
| * @return 0 success, -1 unsuccessful. | |
| * | |
| */ | |
| int etchqueue_put_withwait(etch_queue* thisx, const int waitms, void* item) | |
| { | |
| int result = 0, aprresult = 0; | |
| const int64 wait_usec = waitms < 0? -1: waitms * 1000; | |
| if (NULL == item || etchqueue_is_closed(thisx)) return -1; | |
| /* | |
| * etch_apr_queue_push() takes care of notifying threads waiting on a | |
| * queue slot to become available, if any. | |
| */ | |
| aprresult = etch_apr_queue_push (thisx->aprq, wait_usec, item); | |
| switch(aprresult) | |
| { case APR_SUCCESS: break; | |
| case APR_EAGAIN: /* timed out and queue still full */ | |
| case APR_TIMEUP: | |
| result = ETCH_QUEUE_OPERATION_TIMEOUT; | |
| break; | |
| case APR_EINTR: /* wait interrupted and queue still full */ | |
| result = ETCH_QUEUE_OPERATION_CANCELED; | |
| break; | |
| case APR_EOF: /* queue closed */ | |
| default: /* some error */ | |
| result = -1; | |
| } | |
| return result; | |
| } | |
| /** | |
| * etchqueue_get() | |
| * pop specified item off the queue and return it. if the queue is empty, | |
| * wait indefinitely for an item to arrive. popped item is returned in | |
| * out parameter. | |
| * @return 0 success, -1 error. | |
| */ | |
| int etchqueue_get(etch_queue* thisx, void** itemout) | |
| { | |
| const int result = etchqueue_get_withwait(thisx, ETCH_INFWAIT, itemout); | |
| return result; | |
| } | |
| /** | |
| * etchqueue_get_withwait() | |
| * pop specified item off the queue and return it. if the queue is empty, | |
| * wait specified time interval for an item to arrive. popped item is | |
| * returned in out parameter. | |
| * @param waitms wait interval in milliseconds. specify ETCH_INFWAIT to wait | |
| * forever, ETCH_NOWAIT to not wait, ETCHQUEUE_CLEARING_CLOSED_QUEUE to do a | |
| * get with no wait regardless of whether queue has been closed. | |
| * @return 0 success, -1 unsuccessful. | |
| * | |
| */ | |
| int etchqueue_get_withwait(etch_queue* thisx, const int waitms, void** itemout) | |
| { | |
| int result = 0, aprresult = 0; | |
| const int64 wait_usec = waitms < 0? waitms: waitms * 1000; | |
| if (etchqueue_is_closed(thisx)) | |
| if (waitms != ETCHQUEUE_CLEARING_CLOSED_QUEUE) | |
| return -1; | |
| /* | |
| * etch_apr_queue_pop() takes care of notifying waiters if necessary | |
| */ | |
| aprresult = etch_apr_queue_pop (thisx->aprq, wait_usec, itemout); | |
| switch(aprresult) | |
| { case APR_SUCCESS: break; | |
| case APR_EAGAIN: /* timed out and queue still full */ | |
| case APR_TIMEUP: | |
| result = ETCH_QUEUE_OPERATION_TIMEOUT; break; | |
| case APR_EINTR: /* wait interrupted and queue still empty */ | |
| result = ETCH_QUEUE_OPERATION_CANCELED; break; | |
| case APR_EOF: /* queue empty */ | |
| result = ETCH_QUEUE_EOF; break; | |
| default: /* some error */ | |
| result = -1; | |
| } | |
| return result; | |
| } | |
| /** | |
| * etchqueue_try_put() | |
| * @return 0 success, -1 unsuccessful. | |
| * | |
| */ | |
| int etchqueue_try_put(etch_queue* thisx, void* item) | |
| { | |
| const int result = etch_apr_queue_trypush(thisx->aprq, item); | |
| return result == 0? 0: -1; | |
| } | |
| /** | |
| * etchqueue_try_get() | |
| * @return 0 success, -1 unsuccessful. | |
| */ | |
| int etchqueue_try_get(etch_queue* thisx, void** itemout) | |
| { | |
| const int result = etch_apr_queue_trypop(thisx->aprq, itemout); | |
| return result == 0? 0: -1; | |
| } | |
| /** | |
| * etchqueue_notify_all() | |
| * wake up all waiters, | |
| * @return 0 success, -1 unsuccessful. | |
| */ | |
| int etchqueue_notify_all(etch_queue* thisx) | |
| { | |
| const int aprresult = etch_apr_queue_interrupt_all(thisx->aprq); | |
| return aprresult == 0? 0: -1; | |
| } | |
| /** | |
| * acquire the lock guarding the queue | |
| */ | |
| int etchqueue_lock(etch_queue* thisx) | |
| { | |
| return 0 == etch_apr_queue_lock(thisx->aprq)? 0: -1; | |
| } | |
| /** | |
| * release the lock guarding the queue | |
| */ | |
| int etchqueue_unlock(etch_queue* thisx) | |
| { | |
| return 0 == etch_apr_queue_unlock(thisx->aprq)? 0: -1; | |
| } | |
| /** | |
| * acquire the lock guarding the queue if currently availbale | |
| */ | |
| int etchqueue_trylock(etch_queue* thisx) | |
| { | |
| return 0 == etch_apr_queue_trylock(thisx->aprq)? 0: -1; | |
| } | |
/* - - - - - - - - - -
 * i_iterable
 * - - - - - - - - - -
 */
| /* | |
| * etchqueue_iterable_first() | |
| * i_iterable first() implementation | |
| */ | |
| int etchqueue_iterable_first(etch_iterator* iter) | |
| { | |
| etch_queue* etchq = NULL; | |
| etch_apr_queue_t* aprq = NULL;; | |
| if (!iter || !iter->collection) return -1; | |
| etchq = iter->collection; | |
| if (!etchqueue_size(etchq)) return -1; | |
| iter->current_value = etchq->aprq->data[0]; | |
| iter->ordinal = iter->current_value? 1: 0; | |
| return iter->ordinal? 0: -1; | |
| } | |
| /* | |
| * etchqueue_iterable_next() | |
| * i_iterable next() implementation | |
| * functions as first() if there is no current position. | |
| */ | |
| int etchqueue_iterable_next(etch_iterator* iter) | |
| { | |
| etch_queue* etchq = iter? iter->collection: NULL; | |
| const int count = etchqueue_size(etchq); | |
| if (!count || !iter->ordinal) return -1; | |
| iter->current_value = etchq->aprq->data[iter->ordinal]; | |
| iter->ordinal = iter->current_value? ++iter->ordinal: 0; | |
| return iter->ordinal? 0: -1; | |
| } | |
| /* | |
| * etchqueue_iterable_has_next() | |
| * i_iterable has_next() implementation. | |
| */ | |
| int etchqueue_iterable_has_next(etch_iterator* iter) | |
| { | |
| etch_queue* etchq = iter? iter->collection: NULL; | |
| const int count = etchqueue_size(etchq); | |
| return count && iter->ordinal && (iter->ordinal <= count); | |
| } |