/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <proton/messenger.h>
#include <proton/engine.h>
#include <proton/object.h>
#include <assert.h>
#ifndef __cplusplus
#include <stdbool.h>
#endif
#include <stdlib.h>
#include <string.h>
#include "core/util.h"
#include "store.h"

typedef struct pni_stream_t pni_stream_t;

struct pni_store_t {
  pni_stream_t *streams;
  pni_entry_t *store_head;
  pni_entry_t *store_tail;
  pn_hash_t *tracked;
  size_t size;
  unsigned window;
  pn_sequence_t lwm;
  pn_sequence_t hwm;
};

struct pni_stream_t {
  pni_store_t *store;
  pn_string_t *address;
  pni_entry_t *stream_head;
  pni_entry_t *stream_tail;
  pni_stream_t *next;
};

struct pni_entry_t {
  pni_stream_t *stream;
  pni_entry_t *stream_next;
  pni_entry_t *stream_prev;
  pni_entry_t *store_next;
  pni_entry_t *store_prev;
  pn_buffer_t *bytes;
  pn_delivery_t *delivery;
  void *context;
  pn_status_t status;
  pn_sequence_t id;
  bool free;
};

void pni_entry_finalize(void *object)
{
  pni_entry_t *entry = (pni_entry_t *) object;
  assert(entry->free);
  pn_delivery_t *d = entry->delivery;
  if (d) {
    pn_delivery_settle(d);
    pni_entry_set_delivery(entry, NULL);
  }
}

pni_store_t *pni_store()
{
  pni_store_t *store = (pni_store_t *) malloc(sizeof(pni_store_t));
  if (!store) return NULL;

  store->size = 0;
  store->streams = NULL;
  store->store_head = NULL;
  store->store_tail = NULL;
  store->window = 0;
  store->lwm = 0;
  store->hwm = 0;
  store->tracked = pn_hash(PN_OBJECT, 0, 0.75);

  return store;
}

size_t pni_store_size(pni_store_t *store)
{
  assert(store);
  return store->size;
}

pni_stream_t *pni_stream(pni_store_t *store, const char *address, bool create)
{
  assert(store);
  assert(address);

  pni_stream_t *prev = NULL;
  pni_stream_t *stream = store->streams;
  while (stream) {
    if (!strcmp(pn_string_get(stream->address), address)) {
      return stream;
    }
    prev = stream;
    stream = stream->next;
  }

  if (create) {
    stream = (pni_stream_t *) malloc(sizeof(pni_stream_t));
    if (stream != NULL) {
      stream->store = store;
      stream->address = pn_string(address);
      stream->stream_head = NULL;
      stream->stream_tail = NULL;
      stream->next = NULL;

      if (prev) {
        prev->next = stream;
      } else {
        store->streams = stream;
      }
    }
  }

  return stream;
}

pni_stream_t *pni_stream_head(pni_store_t *store)
{
  assert(store);
  return store->streams;
}

pni_stream_t *pni_stream_next(pni_stream_t *stream)
{
  assert(stream);
  return stream->next;
}

void pni_entry_free(pni_entry_t *entry)
{
  if (!entry) return;
  pni_stream_t *stream = entry->stream;
  pni_store_t *store = stream->store;
  LL_REMOVE(stream, stream, entry);
  LL_REMOVE(store, store, entry);
  entry->free = true;

  pn_buffer_free(entry->bytes);
  entry->bytes = NULL;
  pn_decref(entry);
  store->size--;
}

void pni_stream_free(pni_stream_t *stream)
{
  if (!stream) return;
  pni_entry_t *entry;
  while ((entry = LL_HEAD(stream, stream))) {
    pni_entry_free(entry);
  }
  pn_free(stream->address);
  stream->address = NULL;
  free(stream);
}

void pni_store_free(pni_store_t *store)
{
  if (!store) return;
  pn_free(store->tracked);
  pni_stream_t *stream = store->streams;
  while (stream) {
    pni_stream_t *next = stream->next;
    pni_stream_free(stream);
    stream = next;
  }
  free(store);
}

pni_stream_t *pni_stream_put(pni_store_t *store, const char *address)
{
  assert(store); assert(address);
  return pni_stream(store, address, true);
}

pni_stream_t *pni_stream_get(pni_store_t *store, const char *address)
{
  assert(store); assert(address);
  return pni_stream(store, address, false);
}

#define CID_pni_entry CID_pn_object
#define pni_entry_initialize NULL
#define pni_entry_hashcode NULL
#define pni_entry_compare NULL
#define pni_entry_inspect NULL

pni_entry_t *pni_store_put(pni_store_t *store, const char *address)
{
  assert(store);
  static const pn_class_t clazz = PN_CLASS(pni_entry);

  if (!address) address = "";
  pni_stream_t *stream = pni_stream_put(store, address);
  if (!stream) return NULL;
  pni_entry_t *entry = (pni_entry_t *) pn_class_new(&clazz, sizeof(pni_entry_t));
  if (!entry) return NULL;
  entry->stream = stream;
  entry->free = false;
  entry->stream_next = NULL;
  entry->stream_prev = NULL;
  entry->store_next = NULL;
  entry->store_prev = NULL;
  entry->delivery = NULL;
  entry->bytes = pn_buffer(64);
  entry->status = PN_STATUS_UNKNOWN;
  LL_ADD(stream, stream, entry);
  LL_ADD(store, store, entry);
  store->size++;
  return entry;
}

pni_entry_t *pni_store_get(pni_store_t *store, const char *address)
{
  assert(store);
  if (address) {
    pni_stream_t *stream = pni_stream_get(store, address);
    if (!stream) return NULL;
    return LL_HEAD(stream, stream);
  } else {
    return LL_HEAD(store, store);
  }
}

pn_buffer_t *pni_entry_bytes(pni_entry_t *entry)
{
  assert(entry);
  return entry->bytes;
}

pn_status_t pni_entry_get_status(pni_entry_t *entry)
{
  assert(entry);
  return entry->status;
}

void pni_entry_set_status(pni_entry_t *entry, pn_status_t status)
{
  assert(entry);
  entry->status = status;
}

pn_delivery_t *pni_entry_get_delivery(pni_entry_t *entry)
{
  assert(entry);
  return entry->delivery;
}

void pni_entry_set_delivery(pni_entry_t *entry, pn_delivery_t *delivery)
{
  assert(entry);
  if (entry->delivery) {
    pn_delivery_set_context(entry->delivery, NULL);
  }
  entry->delivery = delivery;
  if (delivery) {
    pn_delivery_set_context(delivery, entry);
  }
  pni_entry_updated(entry);
}

void pni_entry_set_context(pni_entry_t *entry, void *context)
{
  assert(entry);
  entry->context = context;
}

void *pni_entry_get_context(pni_entry_t *entry)
{
  assert(entry);
  return entry->context;
}

static pn_status_t disp2status(uint64_t disp)
{
  if (!disp) return PN_STATUS_PENDING;

  switch (disp) {
  case PN_RECEIVED:
    return PN_STATUS_PENDING;
  case PN_ACCEPTED:
    return PN_STATUS_ACCEPTED;
  case PN_REJECTED:
    return PN_STATUS_REJECTED;
  case PN_RELEASED:
    return PN_STATUS_RELEASED;
  case PN_MODIFIED:
    return PN_STATUS_MODIFIED;
  default:
    assert(0);
  }

  return (pn_status_t) 0;
}


void pni_entry_updated(pni_entry_t *entry)
{
  assert(entry);
  pn_delivery_t *d = entry->delivery;
  if (d) {
    if (pn_delivery_remote_state(d)) {
      entry->status = disp2status(pn_delivery_remote_state(d));
    } else if (pn_delivery_settled(d)) {
      uint64_t disp = pn_delivery_local_state(d);
      if (disp) {
        entry->status = disp2status(disp);
      } else {
        entry->status = PN_STATUS_SETTLED;
      }
    } else {
      entry->status = PN_STATUS_PENDING;
    }
  }
}

pn_sequence_t pni_entry_id(pni_entry_t *entry)
{
  assert(entry);
  return entry->id;
}

pni_entry_t *pni_store_entry(pni_store_t *store, pn_sequence_t id)
{
  assert(store);
  return (pni_entry_t *) pn_hash_get(store->tracked, id);
}

bool pni_store_tracking(pni_store_t *store, pn_sequence_t id)
{
  return (id - store->lwm < INT32_MAX) && (store->hwm - id < INT32_MAX + 1u);
}

pn_sequence_t pni_entry_track(pni_entry_t *entry)
{
  assert(entry);

  pni_store_t *store = entry->stream->store;
  entry->id = store->hwm++;
  pn_hash_put(store->tracked, entry->id, entry);

  if (store->window < INT32_MAX) {
    while (store->hwm - store->lwm > store->window) {
      pni_entry_t *e = pni_store_entry(store, store->lwm);
      if (e) {
        pn_hash_del(store->tracked, store->lwm);
      }
      store->lwm++;
    }
  }

  return entry->id;
}

int pni_store_update(pni_store_t *store, pn_sequence_t id, pn_status_t status,
                     int flags, bool settle, bool match)
{
  assert(store);

  if (!pni_store_tracking(store, id)) {
    return 0;
  }

  size_t start;
  if (PN_CUMULATIVE & flags) {
    start = store->lwm;
  } else {
    start = id;
  }

  for (pn_sequence_t i = start; i <= id; i++) {
    pni_entry_t *e = pni_store_entry(store, i);
    if (e) {
      pn_delivery_t *d = e->delivery;
      if (d) {
        if (!pn_delivery_local_state(d)) {
          if (match) {
            pn_delivery_update(d, pn_delivery_remote_state(d));
          } else {
            switch (status) {
            case PN_STATUS_ACCEPTED:
              pn_delivery_update(d, PN_ACCEPTED);
              break;
            case PN_STATUS_REJECTED:
              pn_delivery_update(d, PN_REJECTED);
              break;
            default:
              break;
            }
          }

          pni_entry_updated(e);
        }
      }
      if (settle) {
        if (d) {
          pn_delivery_settle(d);
        }
        pn_hash_del(store->tracked, e->id);
      }
    }
  }

  while (store->hwm - store->lwm > 0 &&
         !pn_hash_get(store->tracked, store->lwm)) {
    store->lwm++;
  }

  return 0;
}

int pni_store_get_window(pni_store_t *store)
{
  assert(store);
  return store->window;
}

void pni_store_set_window(pni_store_t *store, int window)
{
  assert(store);
  store->window = window;
}
