blob: 6f6e77233e4255f4a44328eb77f89dde651cd4e2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <proton/messenger.h>
#include <proton/engine.h>
#include <proton/object.h>
#include <assert.h>
#ifndef __cplusplus
#include <stdbool.h>
#endif
#include <stdlib.h>
#include <string.h>
#include "core/util.h"
#include "store.h"
/* Forward declaration so the store can refer to streams before they are defined. */
typedef struct pni_stream_t pni_stream_t;
/*
 * A store of entries, grouped into per-address streams, plus the bookkeeping
 * used to track entries by sequence id.
 */
struct pni_store_t {
/* First stream in the singly linked list of streams (chained via ->next). */
pni_stream_t *streams;
/* Head and tail of the store-wide entry list (used via the LL_* macros). */
pni_entry_t *store_head;
pni_entry_t *store_tail;
/* Hash from sequence id to entry, filled in by pni_entry_track(). */
pn_hash_t *tracked;
/* Number of entries currently in the store. */
size_t size;
/* Limit on hwm - lwm enforced by pni_entry_track(); not enforced when >= INT32_MAX. */
unsigned window;
/* Low water mark: lowest sequence id that may still be tracked. */
pn_sequence_t lwm;
/* High water mark: the next sequence id that pni_entry_track() will hand out. */
pn_sequence_t hwm;
};
/* The entries of a store that share one address. */
struct pni_stream_t {
/* The store this stream belongs to. */
pni_store_t *store;
/* Address used to look the stream up (compared with strcmp in pni_stream()). */
pn_string_t *address;
/* Head and tail of this stream's entry list (used via the LL_* macros). */
pni_entry_t *stream_head;
pni_entry_t *stream_tail;
/* Next stream in the store's list, or NULL for the last one. */
pni_stream_t *next;
};
/* A single entry; it is linked into both its stream's list and the store-wide list. */
struct pni_entry_t {
/* The stream (and through it the store) this entry belongs to. */
pni_stream_t *stream;
/* Links for the per-stream list. */
pni_entry_t *stream_next;
pni_entry_t *stream_prev;
/* Links for the store-wide list. */
pni_entry_t *store_next;
pni_entry_t *store_prev;
/* Buffer created in pni_store_put() and released in pni_entry_free(). */
pn_buffer_t *bytes;
/* Delivery currently associated with the entry, or NULL. */
pn_delivery_t *delivery;
/* Opaque pointer stored and returned by pni_entry_{set,get}_context(). */
void *context;
/* Last known status; refreshed from the delivery by pni_entry_updated(). */
pn_status_t status;
/* Sequence id assigned by pni_entry_track(). */
pn_sequence_t id;
/* Set by pni_entry_free(); the finalizer asserts it is true. */
bool free;
};
/*
 * Finalize hook for entry objects (presumably wired in through
 * PN_CLASS(pni_entry) -- confirm against the PN_CLASS definition).
 *
 * The entry must already have been released with pni_entry_free(); this is
 * asserted. A delivery that is still attached is settled and then detached.
 */
void pni_entry_finalize(void *object)
{
  pni_entry_t *self = (pni_entry_t *) object;
  assert(self->free);

  pn_delivery_t *attached = self->delivery;
  if (attached == NULL) {
    return;
  }
  pn_delivery_settle(attached);
  pni_entry_set_delivery(self, NULL);
}
/*
 * Allocate and initialise an empty store.
 *
 * Returns the new store, or NULL if either the store itself or its tracking
 * hash could not be allocated. Release the result with pni_store_free().
 */
pni_store_t *pni_store()
{
  pni_store_t *store = (pni_store_t *) malloc(sizeof(pni_store_t));
  if (!store) return NULL;
  store->size = 0;
  store->streams = NULL;
  store->store_head = NULL;
  store->store_tail = NULL;
  store->window = 0;
  store->lwm = 0;
  store->hwm = 0;
  store->tracked = pn_hash(PN_OBJECT, 0, 0.75);
  if (!store->tracked) {
    /* Every tracking operation dereferences the hash, so a store without
     * one is unusable: fail cleanly instead of returning it. */
    free(store);
    return NULL;
  }
  return store;
}
/* Return the number of entries currently held by the store. */
size_t pni_store_size(pni_store_t *store)
{
  assert(store);
  const size_t count = store->size;
  return count;
}
/*
 * Look up the stream for an address, optionally creating it.
 *
 * store   - store to search; must not be NULL.
 * address - address to match exactly (strcmp); must not be NULL.
 * create  - when true and no stream matches, a new empty stream is appended
 *           to the end of the store's stream list.
 *
 * Returns the matching or newly created stream. Returns NULL when nothing
 * matches and create is false, or when allocating a new stream fails.
 */
pni_stream_t *pni_stream(pni_store_t *store, const char *address, bool create)
{
  assert(store);
  assert(address);

  /* Walk the list, remembering the last node so a new stream can be appended. */
  pni_stream_t *prev = NULL;
  pni_stream_t *stream = store->streams;
  while (stream) {
    if (!strcmp(pn_string_get(stream->address), address)) {
      return stream;
    }
    prev = stream;
    stream = stream->next;
  }

  if (!create) return NULL;

  stream = (pni_stream_t *) malloc(sizeof(pni_stream_t));
  if (!stream) return NULL;

  stream->address = pn_string(address);
  if (!stream->address) {
    /* Never link a stream without an address: later lookups pass the
     * address of every listed stream to strcmp. */
    free(stream);
    return NULL;
  }
  stream->store = store;
  stream->stream_head = NULL;
  stream->stream_tail = NULL;
  stream->next = NULL;

  if (prev) {
    prev->next = stream;
  } else {
    store->streams = stream;
  }
  return stream;
}
/* Return the first stream of the store, or NULL when it has none. */
pni_stream_t *pni_stream_head(pni_store_t *store)
{
  assert(store);
  pni_stream_t *first = store->streams;
  return first;
}
/* Return the stream that follows the given one, or NULL at the end of the list. */
pni_stream_t *pni_stream_next(pni_stream_t *stream)
{
  assert(stream);
  pni_stream_t *following = stream->next;
  return following;
}
/*
 * Remove an entry from its stream and store and drop the store's reference.
 *
 * The entry is unlinked from both lists, marked as free, its byte buffer is
 * released, one reference is dropped and the store's entry count is
 * decremented. A NULL entry is ignored.
 */
void pni_entry_free(pni_entry_t *entry)
{
  if (entry == NULL) {
    return;
  }

  pni_stream_t *stream = entry->stream;
  pni_store_t *store = stream->store;

  /* Unlink from the per-stream list, then from the store-wide list. */
  LL_REMOVE(stream, stream, entry);
  LL_REMOVE(store, store, entry);

  /* Mark as released before dropping the reference: the finalizer asserts this. */
  entry->free = true;

  pn_buffer_free(entry->bytes);
  entry->bytes = NULL;

  pn_decref(entry);
  store->size -= 1;
}
/*
 * Free every entry of a stream, then the stream's address and the stream
 * itself. The stream is not unlinked from its store's stream list here.
 * A NULL stream is ignored.
 */
void pni_stream_free(pni_stream_t *stream)
{
  if (stream == NULL) {
    return;
  }

  /* pni_entry_free() unlinks the head, so re-read the head each iteration. */
  for (pni_entry_t *head = LL_HEAD(stream, stream);
       head != NULL;
       head = LL_HEAD(stream, stream)) {
    pni_entry_free(head);
  }

  pn_free(stream->address);
  stream->address = NULL;
  free(stream);
}
/*
 * Release a store: the tracking hash first, then every stream (and with it
 * every entry), and finally the store itself. A NULL store is ignored.
 */
void pni_store_free(pni_store_t *store)
{
  if (store == NULL) {
    return;
  }

  pn_free(store->tracked);

  /* Fetch the successor before freeing the current stream. */
  for (pni_stream_t *cur = store->streams, *following = NULL;
       cur != NULL;
       cur = following) {
    following = cur->next;
    pni_stream_free(cur);
  }

  free(store);
}
/* Return the stream for the address, creating it when it does not exist yet. */
pni_stream_t *pni_stream_put(pni_store_t *store, const char *address)
{
  assert(store);
  assert(address);
  pni_stream_t *found_or_created = pni_stream(store, address, true);
  return found_or_created;
}
/* Return the existing stream for the address, or NULL when there is none. */
pni_stream_t *pni_stream_get(pni_store_t *store, const char *address)
{
  assert(store);
  assert(address);
  pni_stream_t *found = pni_stream(store, address, false);
  return found;
}
/*
 * Names for the entry class built with PN_CLASS(pni_entry) in
 * pni_store_put(). Presumably PN_CLASS picks these up by the pni_entry
 * prefix -- confirm against its definition. The class id is aliased to
 * CID_pn_object and the hooks below are defined as NULL; pni_entry_finalize
 * is the real function defined in this file.
 */
#define CID_pni_entry CID_pn_object
#define pni_entry_initialize NULL
#define pni_entry_hashcode NULL
#define pni_entry_compare NULL
#define pni_entry_inspect NULL
/*
 * Create a new entry and append it to the stream for the given address and
 * to the store-wide entry list.
 *
 * store   - target store; must not be NULL.
 * address - stream address; NULL is treated as the empty string "".
 *
 * Returns the new entry with status PN_STATUS_UNKNOWN, no delivery, a NULL
 * context and id 0, or NULL if any allocation fails. A stream created for
 * the address is left in place even when the entry itself cannot be
 * allocated.
 */
pni_entry_t *pni_store_put(pni_store_t *store, const char *address)
{
  assert(store);
  static const pn_class_t clazz = PN_CLASS(pni_entry);
  if (!address) address = "";
  pni_stream_t *stream = pni_stream_put(store, address);
  if (!stream) return NULL;
  pni_entry_t *entry = (pni_entry_t *) pn_class_new(&clazz, sizeof(pni_entry_t));
  if (!entry) return NULL;
  entry->stream = stream;
  entry->free = false;
  entry->stream_next = NULL;
  entry->stream_prev = NULL;
  entry->store_next = NULL;
  entry->store_prev = NULL;
  entry->delivery = NULL;
  /* Give every field a defined value so the context and id getters never
   * read indeterminate memory on an entry that has not been set up yet. */
  entry->context = NULL;
  entry->id = 0;
  entry->status = PN_STATUS_UNKNOWN;
  entry->bytes = pn_buffer(64);
  if (!entry->bytes) {
    /* Not linked into any list yet: mark as released (the finalizer asserts
     * this) and drop the only reference. */
    entry->free = true;
    pn_decref(entry);
    return NULL;
  }
  LL_ADD(stream, stream, entry);
  LL_ADD(store, store, entry);
  store->size++;
  return entry;
}
/*
 * Return the first entry for an address, or the first entry of the whole
 * store when address is NULL. Returns NULL when no stream exists for the
 * address, or when the chosen list is empty.
 */
pni_entry_t *pni_store_get(pni_store_t *store, const char *address)
{
  assert(store);

  if (address == NULL) {
    return LL_HEAD(store, store);
  }

  pni_stream_t *stream = pni_stream_get(store, address);
  if (stream == NULL) {
    return NULL;
  }
  return LL_HEAD(stream, stream);
}
/* Return the entry's byte buffer (NULL once the entry has been freed). */
pn_buffer_t *pni_entry_bytes(pni_entry_t *entry)
{
  assert(entry);
  pn_buffer_t *buffer = entry->bytes;
  return buffer;
}
/* Return the status currently recorded on the entry. */
pn_status_t pni_entry_get_status(pni_entry_t *entry)
{
  assert(entry);
  pn_status_t current = entry->status;
  return current;
}
/* Overwrite the status recorded on the entry. */
void pni_entry_set_status(pni_entry_t *entry, pn_status_t status)
{
  assert(entry);
  pni_entry_t *target = entry;
  target->status = status;
}
/* Return the delivery associated with the entry, or NULL when there is none. */
pn_delivery_t *pni_entry_get_delivery(pni_entry_t *entry)
{
  assert(entry);
  pn_delivery_t *attached = entry->delivery;
  return attached;
}
/*
 * Associate a delivery with the entry, or detach it when delivery is NULL.
 *
 * The context of the previously attached delivery (if any) is cleared, the
 * new delivery's context is pointed at the entry, and the entry's status is
 * refreshed through pni_entry_updated().
 */
void pni_entry_set_delivery(pni_entry_t *entry, pn_delivery_t *delivery)
{
  assert(entry);

  pn_delivery_t *previous = entry->delivery;
  if (previous != NULL) {
    pn_delivery_set_context(previous, NULL);
  }

  entry->delivery = delivery;
  if (delivery != NULL) {
    pn_delivery_set_context(delivery, entry);
  }

  pni_entry_updated(entry);
}
/* Store an opaque caller-supplied pointer on the entry. */
void pni_entry_set_context(pni_entry_t *entry, void *context)
{
  assert(entry);
  pni_entry_t *target = entry;
  target->context = context;
}
/* Return the pointer last stored with pni_entry_set_context(). */
void *pni_entry_get_context(pni_entry_t *entry)
{
  assert(entry);
  void *stored = entry->context;
  return stored;
}
static pn_status_t disp2status(uint64_t disp)
{
if (!disp) return PN_STATUS_PENDING;
switch (disp) {
case PN_RECEIVED:
return PN_STATUS_PENDING;
case PN_ACCEPTED:
return PN_STATUS_ACCEPTED;
case PN_REJECTED:
return PN_STATUS_REJECTED;
case PN_RELEASED:
return PN_STATUS_RELEASED;
case PN_MODIFIED:
return PN_STATUS_MODIFIED;
default:
assert(0);
}
return (pn_status_t) 0;
}
/*
 * Refresh the entry's status from its delivery. Without a delivery the
 * status is left unchanged.
 *
 * Precedence: a non-zero remote state wins; otherwise, for a settled
 * delivery, the local state is used (PN_STATUS_SETTLED when it is zero);
 * otherwise the entry is PN_STATUS_PENDING.
 */
void pni_entry_updated(pni_entry_t *entry)
{
  assert(entry);

  pn_delivery_t *delivery = entry->delivery;
  if (delivery == NULL) {
    return;
  }

  if (pn_delivery_remote_state(delivery)) {
    entry->status = disp2status(pn_delivery_remote_state(delivery));
    return;
  }

  if (!pn_delivery_settled(delivery)) {
    entry->status = PN_STATUS_PENDING;
    return;
  }

  uint64_t local = pn_delivery_local_state(delivery);
  if (local) {
    entry->status = disp2status(local);
  } else {
    entry->status = PN_STATUS_SETTLED;
  }
}
/* Return the sequence id recorded on the entry (assigned by pni_entry_track()). */
pn_sequence_t pni_entry_id(pni_entry_t *entry)
{
  assert(entry);
  pn_sequence_t sequence = entry->id;
  return sequence;
}
/* Look up a tracked entry by sequence id, returning whatever the tracking hash holds for it. */
pni_entry_t *pni_store_entry(pni_store_t *store, pn_sequence_t id)
{
  assert(store);
  void *found = pn_hash_get(store->tracked, id);
  return (pni_entry_t *) found;
}
/*
 * Report whether a sequence id lies inside the store's tracked range, i.e.
 * it passes both the check against the low water mark and the check against
 * the high water mark.
 *
 * NOTE(review): the exact bounds depend on how pn_sequence_t is defined
 * (not visible here); the two comparisons are kept exactly as written.
 */
bool pni_store_tracking(pni_store_t *store, pn_sequence_t id)
{
  if (!(id - store->lwm < INT32_MAX)) {
    return false;
  }
  return store->hwm - id < INT32_MAX + 1u;
}
/*
 * Start tracking an entry: assign it the next sequence id (the store's high
 * water mark, which is then advanced) and register it in the tracking hash.
 *
 * When the store's window is below INT32_MAX, the low water mark is then
 * advanced, dropping tracked entries from the hash as it goes, until
 * hwm - lwm no longer exceeds the window.
 *
 * Returns the id assigned to the entry.
 */
pn_sequence_t pni_entry_track(pni_entry_t *entry)
{
  assert(entry);

  pni_store_t *store = entry->stream->store;
  entry->id = store->hwm++;
  pn_hash_put(store->tracked, entry->id, entry);

  /* A window of INT32_MAX or more disables trimming. */
  if (!(store->window < INT32_MAX)) {
    return entry->id;
  }

  for (; store->hwm - store->lwm > store->window; store->lwm++) {
    pni_entry_t *oldest = pni_store_entry(store, store->lwm);
    if (oldest) {
      pn_hash_del(store->tracked, store->lwm);
    }
  }

  return entry->id;
}
/*
 * Apply a disposition to one tracked entry, or to a range of them, and
 * optionally settle.
 *
 * store  - store to operate on; must not be NULL.
 * id     - sequence id of the (last) entry to update.
 * status - status to apply when match is false. Only PN_STATUS_ACCEPTED and
 *          PN_STATUS_REJECTED are turned into a delivery update; any other
 *          value leaves the local disposition untouched.
 * flags  - with PN_CUMULATIVE set, every entry from the low water mark up
 *          to id is processed; otherwise only id itself.
 * settle - when true, each processed entry's delivery (if any) is settled
 *          and the entry is removed from the tracking hash.
 * match  - when true, the local disposition is set to the delivery's remote
 *          state instead of being derived from status.
 *
 * The disposition is applied only to deliveries with no local state yet.
 * Afterwards the low water mark is moved past ids that are no longer
 * tracked. Always returns 0, including when id is outside the tracked range
 * (in which case nothing is done).
 */
int pni_store_update(pni_store_t *store, pn_sequence_t id, pn_status_t status,
                     int flags, bool settle, bool match)
{
  assert(store);
  if (!pni_store_tracking(store, id)) {
    return 0;
  }
  /* Keep the range start in the sequence type: it only ever holds a
   * sequence id and initialises the sequence-typed loop counter. */
  pn_sequence_t start;
  if (PN_CUMULATIVE & flags) {
    start = store->lwm;
  } else {
    start = id;
  }
  for (pn_sequence_t i = start; i <= id; i++) {
    pni_entry_t *e = pni_store_entry(store, i);
    if (e) {
      pn_delivery_t *d = e->delivery;
      if (d) {
        /* Do not overwrite a disposition that was already set locally. */
        if (!pn_delivery_local_state(d)) {
          if (match) {
            pn_delivery_update(d, pn_delivery_remote_state(d));
          } else {
            switch (status) {
            case PN_STATUS_ACCEPTED:
              pn_delivery_update(d, PN_ACCEPTED);
              break;
            case PN_STATUS_REJECTED:
              pn_delivery_update(d, PN_REJECTED);
              break;
            default:
              break;
            }
          }
          pni_entry_updated(e);
        }
      }
      if (settle) {
        if (d) {
          pn_delivery_settle(d);
        }
        pn_hash_del(store->tracked, e->id);
      }
    }
  }
  /* Advance the low water mark over ids that are no longer tracked. */
  while (store->hwm - store->lwm > 0 &&
         !pn_hash_get(store->tracked, store->lwm)) {
    store->lwm++;
  }
  return 0;
}
/* Return the store's tracking window, converted from the stored unsigned value to int. */
int pni_store_get_window(pni_store_t *store)
{
  assert(store);
  const unsigned stored = store->window;
  return (int) stored;
}
/*
 * Set the store's tracking window. The value is stored as unsigned, so a
 * negative argument becomes a very large window.
 */
void pni_store_set_window(pni_store_t *store, int window)
{
  assert(store);
  store->window = (unsigned) window;
}