blob: aec0586318b77bb25162e2ec6db530885237018e [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* resourceenforcer_queue.c
* Thread-safe queue implementation for resource enforcement.
*/
#include "resourceenforcer/resourceenforcer_queue.h"
#include <pthread.h>
#include <sys/time.h>
#include <time.h>
#include <errno.h>
#include "postgres.h"
#include "cdb/cdbvars.h"
/*
 * queue_create
 *	Allocate and initialise an empty queue.
 *
 * The queue mutex is initialised with type PTHREAD_MUTEX_ERRORCHECK and the
 * condition variable with default attributes.
 *
 * Returns the new queue on success.  On failure (out of memory or any
 * pthread initialisation error) the failure is logged, everything that was
 * set up so far is released, and NULL is returned.
 */
queue *queue_create(void)
{
	pthread_mutexattr_t m_atts;
	int rc;

	queue *q = malloc(sizeof(*q));
	if (q == NULL)
	{
		write_log("Function queue_create out of memory");
		return NULL;
	}

	q->head = NULL;
	q->tail = NULL;
	q->size = 0;

	/*
	 * pthread functions report failure through their return value and do
	 * not set errno, so the return code is what gets logged.
	 */
	rc = pthread_mutexattr_init(&m_atts);
	if (rc != 0)
	{
		write_log("Function pthread_mutexattr_init failed with error %d",
				  rc);
		free(q);
		return NULL;
	}

	rc = pthread_mutexattr_settype(&m_atts, PTHREAD_MUTEX_ERRORCHECK);
	if (rc != 0)
	{
		write_log("Function pthread_mutexattr_settype failed with error %d",
				  rc);
		pthread_mutexattr_destroy(&m_atts);
		free(q);
		return NULL;
	}

	rc = pthread_mutex_init(&(q->queue_mutex), &m_atts);

	/* The attribute object is not needed once the mutex init was attempted. */
	pthread_mutexattr_destroy(&m_atts);

	if (rc != 0)
	{
		write_log("Function pthread_mutex_init failed with error %d",
				  rc);
		free(q);
		return NULL;
	}

	rc = pthread_cond_init(&(q->queue_cond), NULL);
	if (rc != 0)
	{
		write_log("Function pthread_cond_init failed with error %d",
				  rc);
		/* Undo the successful mutex initialisation before bailing out. */
		pthread_mutex_destroy(&(q->queue_mutex));
		free(q);
		return NULL;
	}

	return q;
}
/*
 * queue_destroy
 *	Release a queue together with every node still linked into it.
 *
 * Each remaining node's payload is released with free(), then the node
 * itself, then the queue's condition variable and mutex, and finally the
 * queue structure.
 *
 * The queue memory is freed on return, so the caller must make sure no
 * other thread is still using (or waiting on) the queue.
 */
void queue_destroy(queue *q)
{
	Assert(q != NULL);

	pthread_mutex_lock(&(q->queue_mutex));

	qnode *cur = q->head;
	while (cur != NULL)
	{
		/* Remember the successor before the current node is freed. */
		qnode *next = cur->next;

		free(cur->data);
		free(cur);
		cur = next;
	}
	q->head = NULL;
	q->tail = NULL;
	q->size = 0;

	pthread_mutex_unlock(&(q->queue_mutex));

	/*
	 * Release the synchronisation objects before freeing the memory that
	 * holds them; the mutex must be unlocked when it is destroyed.
	 */
	pthread_cond_destroy(&(q->queue_cond));
	pthread_mutex_destroy(&(q->queue_mutex));

	free(q);
}
/*
 * enqueue
 *	Append a payload to the tail of the queue and wake one waiter.
 *
 * q:    queue to append to; must not be NULL.
 * data: payload pointer stored in the new node; must not be NULL.
 *
 * Returns 0 on success, or -1 (after logging) if the node cannot be
 * allocated or the queue mutex cannot be locked.  On failure the queue is
 * left unchanged and data is not stored.
 */
int enqueue(queue *q, void *data)
{
	Assert(q);
	Assert(data);

	/* Build the node before taking the lock to keep the critical section short. */
	qnode *node = malloc(sizeof(*node));
	if (node == NULL)
	{
		write_log("Function enqueue out of memory");
		return -1;
	}
	node->prev = NULL;
	node->next = NULL;
	node->data = data;

	int rc = pthread_mutex_lock(&(q->queue_mutex));
	if (rc != 0)
	{
		/* Never touch the list without holding the mutex. */
		write_log("Function pthread_mutex_lock failed with error %d", rc);
		free(node);
		return -1;
	}

	if (q->size == 0)
	{
		q->head = node;
	}
	else
	{
		q->tail->next = node;
		node->prev = q->tail;
	}
	q->tail = node;
	q->size++;

	/*
	 * Signal on every insertion, not only on the empty -> non-empty
	 * transition: if several consumers are waiting and several items are
	 * enqueued before any of them runs, each item needs its own wake-up.
	 */
	pthread_cond_signal(&(q->queue_cond));

	pthread_mutex_unlock(&(q->queue_mutex));
	return 0;
}
/*
 * dequeue
 *	Remove the node at the head of the queue and return its payload,
 *	waiting for an item to arrive if the queue is empty.
 *
 * q:          queue to read from; must not be NULL.
 * timeout_ms: maximum time to wait, in milliseconds (10^-3 second).
 *             Negative values are treated as 0.
 *
 * Returns the payload pointer that was passed to enqueue(); the node that
 * carried it is freed here, the payload itself is not.  Returns NULL if the
 * queue is still empty when the timeout expires, or if locking or waiting
 * fails (the failure is logged).
 */
void *dequeue(queue *q, int timeout_ms)
{
	const int64 ns_per_sec = 1000000000;
	const int64 ns_per_ms = 1000000;
	const int64 ns_per_us = 1000;

	Assert(q);

	struct timeval tv;
	struct timespec ts;
	int res = 0;

	/* A negative timeout would yield a deadline with a negative tv_nsec. */
	int64 wait_ms = (timeout_ms < 0) ? 0 : timeout_ms;

	/*
	 * Compute the absolute deadline.  All arithmetic is done in 64 bits so
	 * that timeouts of several seconds cannot overflow, and tv_nsec is
	 * normalised into [0, 10^9) as pthread_cond_timedwait requires.
	 */
	gettimeofday(&tv, NULL);
	int64 total_ns = (int64) tv.tv_usec * ns_per_us + wait_ms * ns_per_ms;
	ts.tv_sec = tv.tv_sec + (time_t) (total_ns / ns_per_sec);
	ts.tv_nsec = (long) (total_ns % ns_per_sec);

	res = pthread_mutex_lock(&(q->queue_mutex));
	if (res != 0)
	{
		/* Never touch the list without holding the mutex. */
		write_log("Function pthread_mutex_lock failed with error %d", res);
		return NULL;
	}

	while (q->size == 0)
	{
		res = pthread_cond_timedwait(&(q->queue_cond), &(q->queue_mutex), &ts);
		if (res == 0)
		{
			/* Woken up (possibly spuriously): re-test the predicate. */
			continue;
		}

		if (res != ETIMEDOUT)
		{
			write_log("Function pthread_cond_timedwait with error %d",
					  res);
			pthread_mutex_unlock(&(q->queue_mutex));
			return NULL;
		}

		/*
		 * Deadline reached.  The mutex is held again at this point, so
		 * leave the loop and check one last time whether an item arrived.
		 */
		break;
	}

	if (q->size == 0)
	{
		pthread_mutex_unlock(&(q->queue_mutex));
		return NULL;
	}

	/* Unlink the head node. */
	qnode *node = q->head;
	if (NULL == node->next)
	{
		q->head = NULL;
		q->tail = NULL;
	}
	else
	{
		q->head = node->next;
		node->next->prev = NULL;
	}
	q->size--;

	pthread_mutex_unlock(&(q->queue_mutex));

	/* The node is private to this thread now; hand the payload to the caller. */
	void *data = node->data;
	free(node);
	return data;
}
/*
 * queue_size
 *	Return the element count recorded in the queue.
 *
 * The count is read while holding the queue mutex; q must not be NULL.
 */
int queue_size(queue *q)
{
	int count;

	Assert(q);

	pthread_mutex_lock(&(q->queue_mutex));
	count = q->size;
	pthread_mutex_unlock(&(q->queue_mutex));

	return count;
}