/*
 * Copyright 2004,2005 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
#include <savan_subs_mgr.h>
#include <axutil_log.h>
#include <axutil_hash.h>
#include <axutil_property.h>
#include <axutil_types.h>
#include <axutil_file_handler.h>
#include <platforms/axutil_platform_auto_sense.h>
#include <savan_constants.h>
#include <savan_util.h>
#include <savan_error.h>
#include <remote_registry.h>
#include <remote_registry_resource.h>

#define SYNAPSE_NAMESPACE "http://ws.apache.org/ns/synapse"
#define SYNAPSE_NS_PREFIX "syn"
#define ELEM_NAME_SUBSCRIPTION "subscription"
#define ELEM_NAME_ENDPOINT "endpoint"
#define ELEM_NAME_ADDRESS "address"
#define ATTR_NAME_URL "uri"
#define EPR_TYPE "application/vnd.epr"
#define TOPIC_INDEX_PARENT_PATH "/eventing/index"
#define SUBSCRIPTION_COLLECTION_NAME "system.subscriptions"
#define TOPIC_INDEX "/eventing/index/TopicIndex"

/**
 * Savan registry based subscription manager dependes on the WSO2 registry for subscription storation.
 * This use WSF/C registry cache client to communicate with the WSO2 registry. Registry cache
 * client reduce the overhead of each time fetching records from the registry which is very 
 * expensive by caching the records locally.
 */
/** 
 * @brief Savan Registry Storage Manager Struct Impl
 *   Savan Registry Storage Manager 
 */
typedef struct savan_registry_subs_mgr
{
    savan_subs_mgr_t subsmgr;
    axis2_char_t *reg_url;
    axis2_char_t *username;
    axis2_char_t *password;
    axis2_conf_t *conf;
    remote_registry_t *remote_registry;
} savan_registry_subs_mgr_t;

typedef AXIS2_DECLARE_DATA struct savan_registry_subs_mgr_args
{
    const axutil_env_t *env;
    void *data;
} savan_registry_subs_mgr_args_t;

#define SAVAN_INTF_TO_IMPL(subsmgr) ((savan_registry_subs_mgr_t *) subsmgr)

static savan_subscriber_t *savan_registry_subs_mgr_extract_subscriber(
        const axutil_env_t *env,
        remote_registry_resource_t *resource);

static axis2_char_t *savan_registry_subs_mgr_serialize_endpoint(
        const axutil_env_t *env,
        const savan_subscriber_t *subscriber);

static axis2_status_t
savan_registry_subs_mgr_init_resource(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env);

AXIS2_EXTERN void AXIS2_CALL
savan_registry_subs_mgr_free(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env);

AXIS2_EXTERN axis2_status_t AXIS2_CALL
savan_registry_subs_mgr_insert_subscriber(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    savan_subscriber_t *subscriber);

AXIS2_EXTERN axis2_status_t AXIS2_CALL
savan_registry_subs_mgr_update_subscriber(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    savan_subscriber_t *subscriber);

AXIS2_EXTERN axis2_status_t AXIS2_CALL
savan_registry_subs_mgr_remove_subscriber(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    const axis2_char_t *subscriber_id);

AXIS2_EXTERN savan_subscriber_t *AXIS2_CALL
savan_registry_subs_mgr_retrieve_subscriber(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    const axis2_char_t *subscriber_id);

AXIS2_EXTERN axutil_array_list_t * AXIS2_CALL
savan_registry_subs_mgr_retrieve_all_subscribers(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    const axis2_char_t *filter);

static const savan_subs_mgr_ops_t subs_mgr_ops = 
{
    savan_registry_subs_mgr_free,
    savan_registry_subs_mgr_insert_subscriber,
    savan_registry_subs_mgr_update_subscriber,
    savan_registry_subs_mgr_remove_subscriber,
    savan_registry_subs_mgr_retrieve_subscriber,
    savan_registry_subs_mgr_retrieve_all_subscribers,
    NULL
};

AXIS2_EXTERN savan_subs_mgr_t * AXIS2_CALL
savan_subs_mgr_create(
    const axutil_env_t *env,
    axis2_conf_t *conf)
{
    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    axis2_status_t status = AXIS2_FAILURE;
    
    subsmgrimpl = AXIS2_MALLOC(env->allocator, sizeof(savan_registry_subs_mgr_t));
    if (!subsmgrimpl)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_STORAGE_MANAGER_CREATION_FAILED, AXIS2_FAILURE);
        return NULL;
    }

    memset ((void *) subsmgrimpl, 0, sizeof(savan_registry_subs_mgr_t));

    subsmgrimpl->remote_registry = NULL;
    subsmgrimpl->reg_url = axutil_strdup(env, savan_util_get_resource_connection_string(env, conf));
    subsmgrimpl->username = axutil_strdup(env, savan_util_get_resource_username(env, conf));
    subsmgrimpl->password = axutil_strdup(env, savan_util_get_resource_password(env, conf));
    subsmgrimpl->conf = conf;
    subsmgrimpl->subsmgr.ops = &subs_mgr_ops;

    status = savan_registry_subs_mgr_init_resource((savan_subs_mgr_t *) subsmgrimpl, env);
    if(status != AXIS2_SUCCESS)
    {
        savan_registry_subs_mgr_free((savan_subs_mgr_t *) subsmgrimpl, env);
        return NULL;
    }
    return (savan_subs_mgr_t *) subsmgrimpl;
}

AXIS2_EXTERN savan_subs_mgr_t * AXIS2_CALL
savan_subs_mgr_create_with_connection_info(
    const axutil_env_t *env,
    axis2_char_t *connection_string,
    axis2_char_t *username,
    axis2_char_t *password)
{
    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    axis2_status_t status = AXIS2_FAILURE;
    
    subsmgrimpl = AXIS2_MALLOC(env->allocator, sizeof(savan_registry_subs_mgr_t));
    if (!subsmgrimpl)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_STORAGE_MANAGER_CREATION_FAILED, AXIS2_FAILURE);
        return NULL;
    }

    memset ((void *) subsmgrimpl, 0, sizeof(savan_registry_subs_mgr_t));

    subsmgrimpl->remote_registry = NULL;
    subsmgrimpl->reg_url = axutil_strdup(env, connection_string);
    subsmgrimpl->username = axutil_strdup(env, username);
    subsmgrimpl->password = axutil_strdup(env, password);
    subsmgrimpl->conf = NULL;
    subsmgrimpl->subsmgr.ops = &subs_mgr_ops;

    status = savan_registry_subs_mgr_init_resource((savan_subs_mgr_t *) subsmgrimpl, env);
    if(status != AXIS2_SUCCESS)
    {
        savan_registry_subs_mgr_free((savan_subs_mgr_t *) subsmgrimpl, env);
        return NULL;
    }
    return (savan_subs_mgr_t *) subsmgrimpl;
}

AXIS2_EXTERN void AXIS2_CALL
savan_registry_subs_mgr_free(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env)
{
    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    subsmgrimpl = SAVAN_INTF_TO_IMPL(subsmgr);

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_registry_subs_mgr_free");

    if(subsmgrimpl->reg_url)
    {
        AXIS2_FREE(env->allocator, subsmgrimpl->reg_url);
        subsmgrimpl->reg_url = NULL;
    }
    
    if(subsmgrimpl->username)
    {
        AXIS2_FREE(env->allocator, subsmgrimpl->username);
        subsmgrimpl->username = NULL;
    }
    
    if(subsmgrimpl->password)
    {
        AXIS2_FREE(env->allocator, subsmgrimpl->password);
        subsmgrimpl->password = NULL;
    }

    subsmgrimpl->conf = NULL;

    if(subsmgrimpl)
    {
        AXIS2_FREE(env->allocator, subsmgrimpl);
        subsmgrimpl = NULL;
    }

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_registry_subs_mgr_free");
}

AXIS2_EXTERN axis2_status_t AXIS2_CALL
savan_registry_subs_mgr_insert_subscriber(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    savan_subscriber_t *subscriber)
{
    axis2_char_t *subscription_id = NULL;
    axis2_char_t *expires = NULL;
    axis2_char_t *filter = NULL;
    axis2_char_t *id = NULL;
    axis2_char_t *path = NULL; 
    remote_registry_resource_t *res = NULL;
    axutil_hash_t *properties = NULL;
    char *content = NULL;
    
    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    subsmgrimpl = SAVAN_INTF_TO_IMPL(subsmgr);

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Entry:savan_registry_subs_mgr_insert_subscriber");
	
    subscription_id = savan_subscriber_get_id(subscriber, env);
    filter = savan_subscriber_get_filter(subscriber, env);
    path = axutil_strcat(env, AXIS2_PATH_SEP_STR, filter, AXIS2_PATH_SEP_STR, 
            SUBSCRIPTION_COLLECTION_NAME, AXIS2_PATH_SEP_STR, subscription_id, NULL);
    id = axutil_strcat(env, subsmgrimpl->reg_url, AXIS2_PATH_SEP_STR, filter, 
            AXIS2_PATH_SEP_STR, SUBSCRIPTION_COLLECTION_NAME, AXIS2_PATH_SEP_STR, subscription_id, 
            NULL);

    res = remote_registry_resource_create(env);
    content = savan_registry_subs_mgr_serialize_endpoint(env, subscriber);
    remote_registry_resource_set_content(res, env, content);
    remote_registry_resource_set_content_len(res, env, axutil_strlen(content));
    remote_registry_resource_set_media_type(res, env, EPR_TYPE);
    remote_registry_resource_set_description(res, env, "");

    properties = axutil_hash_make(env);
    if(properties)
    {
        axis2_char_t *endto = NULL;
        axis2_endpoint_ref_t *endto_epr = NULL;
        axis2_char_t *filter_dialect = NULL;


        expires = savan_subscriber_get_expires(subscriber, env);
        if(expires)
        {
            axutil_hash_set(properties, axutil_strdup(env, "expires"), AXIS2_HASH_KEY_STRING, 
                    axutil_strdup(env, expires));
        }
        else
        {
            axutil_hash_set(properties, axutil_strdup(env, "expires"), AXIS2_HASH_KEY_STRING, 
                    axutil_strdup(env, "*"));
        }

        axutil_hash_set(properties, axutil_strdup(env, "staticFlag"), AXIS2_HASH_KEY_STRING, 
                axutil_strdup(env, "false"));
        axutil_hash_set(properties, axutil_strdup(env, "filterValue"), AXIS2_HASH_KEY_STRING, 
                axutil_strdup(env, filter));

        endto_epr = savan_subscriber_get_end_to(subscriber, env);
        if(endto_epr)
        {
            endto = (axis2_char_t *) axis2_endpoint_ref_get_address(endto_epr, env);
        }
        if(endto)
        {
            axutil_hash_set(properties, axutil_strdup(env, "subManagerURI"), AXIS2_HASH_KEY_STRING, 
                axutil_strdup(env, endto));
        }

        filter_dialect = savan_subscriber_get_filter_dialect(subscriber, env);
        if(filter_dialect)
        {
            axutil_hash_set(properties, axutil_strdup(env, "filterDialect"), AXIS2_HASH_KEY_STRING, 
                    axutil_strdup(env, filter_dialect));
        }

        remote_registry_resource_set_properties(res, env, properties);
    }

    remote_registry_put(subsmgrimpl->remote_registry, env, path, res);
    if(id)
    {
        AXIS2_FREE(env->allocator, id);
    }
    if(path)
    {
        AXIS2_FREE(env->allocator, path);
        path = NULL;
    }
    if(res)
    {
        remote_registry_resource_free(res, env);
        res = NULL;
    }

    res = remote_registry_get(subsmgrimpl->remote_registry, env, TOPIC_INDEX, NULL);
    if(!res)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_SUBSCRIBER_INSERT_ERROR, AXIS2_FAILURE);
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Could not retrive resource TopicIndex");
        return AXIS2_FAILURE;
    }
    id = axutil_strcat(env, subsmgrimpl->reg_url, TOPIC_INDEX, NULL);
    properties = remote_registry_resource_get_properties(res, env);
    if(properties)
    {
        path = axutil_strcat(env, AXIS2_PATH_SEP_STR, filter, AXIS2_PATH_SEP_STR, 
                SUBSCRIPTION_COLLECTION_NAME, NULL);
        axutil_hash_set(properties, subscription_id, AXIS2_HASH_KEY_STRING, path);
        remote_registry_resource_set_properties(res, env, properties);
    }

    remote_registry_resource_set_content(res, env, NULL);
    remote_registry_resource_set_content_len(res, env, 0);
    remote_registry_put(subsmgrimpl->remote_registry, env, TOPIC_INDEX, res);

    if(id)
    {
        AXIS2_FREE(env->allocator, id);
    }
    if(res)
    {
        remote_registry_resource_free(res, env);
        res = NULL;
    }

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Exit:savan_registry_subs_mgr_insert_subscriber");
    return AXIS2_SUCCESS;
}

AXIS2_EXTERN axis2_status_t AXIS2_CALL
savan_registry_subs_mgr_update_subscriber(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    savan_subscriber_t *subscriber)
{
    axis2_char_t *expires = NULL;
    axis2_char_t *filter = NULL;
    axis2_char_t *subscriber_id = NULL;
    axis2_char_t *path = NULL; 
    remote_registry_resource_t *res = NULL;
    remote_registry_resource_t *index_res = NULL;
    axutil_hash_t *properties = NULL;
    axis2_char_t *val = NULL;
 
    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    subsmgrimpl = SAVAN_INTF_TO_IMPL(subsmgr);

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Entry:savan_registry_subs_mgr_update_subscriber");

    index_res = remote_registry_get(subsmgrimpl->remote_registry, env, TOPIC_INDEX, NULL);
    if(!index_res)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_SUBSCRIBER_INSERT_ERROR, AXIS2_FAILURE);
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Could not retrive resource TopicIndex");
        return AXIS2_FAILURE;
    }

    subscriber_id = savan_subscriber_get_id(subscriber, env);
    filter = savan_subscriber_get_filter(subscriber, env);
    val = remote_registry_resource_get_property(index_res, env, subscriber_id);
    if(val)
    {
        path = axutil_strcat(env, val, AXIS2_PATH_SEP_STR, subscriber_id, NULL);
    }

    res = remote_registry_get(subsmgrimpl->remote_registry, env, path, NULL);
    if(!res)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_SUBSCRIBER_UPDATE_ERROR, AXIS2_FAILURE);
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                "[savan] Could not retrive resource subscriber path:%s in registry", path);
        return AXIS2_FAILURE;
    }

    remote_registry_resource_set_description(res, env, "");
    properties = remote_registry_resource_get_properties(res, env);
    if(properties)
    {
        axis2_char_t *endto = NULL;
        axis2_endpoint_ref_t *endto_epr = NULL;
        axis2_char_t *filter_dialect = NULL;

        expires = savan_subscriber_get_expires(subscriber, env);
        if(expires)
        {
            axutil_hash_set(properties, axutil_strdup(env, "expires"), AXIS2_HASH_KEY_STRING, 
                    axutil_strdup(env, expires));
        }

        axutil_hash_set(properties, axutil_strdup(env, "staticFlag"), AXIS2_HASH_KEY_STRING, 
                axutil_strdup(env, "false"));
        axutil_hash_set(properties, axutil_strdup(env, "filterValue"), AXIS2_HASH_KEY_STRING, 
                axutil_strdup(env, filter));

        endto_epr = savan_subscriber_get_end_to(subscriber, env);
        if(endto_epr)
        {
            endto = (axis2_char_t *) axis2_endpoint_ref_get_address(endto_epr, env);
        }
        if(endto)
        {
            axutil_hash_set(properties, axutil_strdup(env, "subManagerURI"), AXIS2_HASH_KEY_STRING, 
                axutil_strdup(env, endto));
        }

        filter_dialect = savan_subscriber_get_filter_dialect(subscriber, env);
        if(filter_dialect)
        {
            axutil_hash_set(properties, axutil_strdup(env, "filterDialect"), AXIS2_HASH_KEY_STRING, 
                    axutil_strdup(env, filter_dialect));
        }

        remote_registry_resource_set_properties(res, env, properties);
    }

    remote_registry_put(subsmgrimpl->remote_registry, env, path, res);
    
    if(path)
    {
        AXIS2_FREE(env->allocator, path);
        path = NULL;
    }
    if(res)
    {
        remote_registry_resource_free(res, env);
        res = NULL;
    }

    savan_subscriber_set_renew_status(subscriber, env, AXIS2_TRUE);
    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Exit:savan_registry_subs_mgr_update_subscriber");
    return AXIS2_SUCCESS;
}

AXIS2_EXTERN axis2_status_t AXIS2_CALL
savan_registry_subs_mgr_remove_subscriber(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    const axis2_char_t *subscriber_id)
{
    remote_registry_resource_t *res = NULL;
    axis2_char_t *val;
    axis2_status_t status = AXIS2_FAILURE;
    axis2_char_t *path = NULL;

    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    subsmgrimpl = SAVAN_INTF_TO_IMPL(subsmgr);

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Entry:savan_registry_subs_mgr_remove_subscriber");

    res = remote_registry_get(subsmgrimpl->remote_registry, env, TOPIC_INDEX, NULL);
    if(!res)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_SUBSCRIBER_UPDATE_ERROR, AXIS2_FAILURE);
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                "[savan] Could not retrive resource TopicIndex");
        return AXIS2_FAILURE;
    }

    val = remote_registry_resource_get_property(res, env, subscriber_id);
    if(val)
    {
        path = axutil_strcat(env, val, AXIS2_PATH_SEP_STR, subscriber_id, NULL);
        status = remote_registry_delete(subsmgrimpl->remote_registry, env, path);
    }
    
    remote_registry_resource_free(res, env);
    if(AXIS2_SUCCESS != status)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_SUBSCRIBER_REMOVE_ERROR, AXIS2_FAILURE);
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                "[savan] Could not remove subscriber for id %s", subscriber_id);
        return AXIS2_FAILURE;
    }

    res = remote_registry_get(subsmgrimpl->remote_registry, env, TOPIC_INDEX, NULL);
    if(!res)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_SUBSCRIBER_UPDATE_ERROR, AXIS2_FAILURE);
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                "[savan] Could not retrive resource TopicIndex");
        return AXIS2_FAILURE;
    }

    remote_registry_resource_remove_property(res, env, subscriber_id);
    remote_registry_put(subsmgrimpl->remote_registry, env, TOPIC_INDEX, res);
    remote_registry_resource_free(res, env);

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Exit:savan_registry_subs_mgr_remove_subscriber");
    return AXIS2_SUCCESS;
}

AXIS2_EXTERN savan_subscriber_t *AXIS2_CALL
savan_registry_subs_mgr_retrieve_subscriber(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    const axis2_char_t *subscriber_id)
{
    remote_registry_resource_t *res = NULL;
    remote_registry_resource_t *root_res = NULL;
    axis2_char_t *val;
    axis2_char_t *path = NULL;
    savan_subscriber_t *subscriber = NULL;

    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    subsmgrimpl = SAVAN_INTF_TO_IMPL(subsmgr);


    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Entry:savan_registry_subs_mgr_retrieve_subscriber");

    root_res = remote_registry_get(subsmgrimpl->remote_registry, env, TOPIC_INDEX, NULL);
    if(!root_res)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_SUBSCRIBER_UPDATE_ERROR, AXIS2_FAILURE);
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                "[savan] Could not retrive resource TopicIndex");
        return AXIS2_FAILURE;
    }

    val = remote_registry_resource_get_property(root_res, env, subscriber_id);
    if(val)
    {
        path = axutil_strcat(env, val, AXIS2_PATH_SEP_STR, subscriber_id, NULL);
        res = remote_registry_get(subsmgrimpl->remote_registry, env, path, NULL);
    }

    if(res)
    {
        subscriber = savan_registry_subs_mgr_extract_subscriber(env, res);
        if(subscriber)
        {
            savan_subscriber_set_id(subscriber, env, subscriber_id);
        }
    }

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Exit:savan_registry_subs_mgr_retrieve_subscriber");
    return subscriber;
}

AXIS2_EXTERN axutil_array_list_t * AXIS2_CALL
savan_registry_subs_mgr_retrieve_all_subscribers(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env,
    const axis2_char_t *filter)
{
    remote_registry_resource_t *root_res = NULL;
    axis2_char_t *path = NULL;
    axutil_array_list_t *data_list = NULL;

    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    subsmgrimpl = SAVAN_INTF_TO_IMPL(subsmgr);
 
    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Entry:savan_registry_subs_mgr_retrieve_all_subscribers");

    /* Get subscribers for the filter from registry. Eg. /weather/4/system.subscriptions */
    if(filter)
    {
        root_res = remote_registry_get(subsmgrimpl->remote_registry, env, 
                (axis2_char_t *) filter, NULL);
        if(root_res)
        {
            axutil_array_list_t *child_entries = NULL;
            int i = 0;

            child_entries = remote_registry_resource_get_entries(root_res, env);
            if(child_entries)
            {
                data_list = axutil_array_list_create(env, 0);

                /* load the child entries recursively */
                for(i = 0; i < axutil_array_list_size(child_entries, env); i ++)
                {
                    remote_registry_resource_t *res = NULL;

                    res = (remote_registry_resource_t*)axutil_array_list_get(child_entries, env, i);
                    if(res)
                    {
                        savan_subscriber_t *subscriber = NULL;

                        subscriber = savan_registry_subs_mgr_extract_subscriber(env, res);
                        axutil_array_list_add(data_list, env, subscriber);
                    }
                }
            }
        }
    }
    else /* Get subscribers for the topic root from registry(/eventing/index/TopicIndex);*/
    {
        axutil_hash_t *properties = NULL;

        root_res = remote_registry_get(subsmgrimpl->remote_registry, env, TOPIC_INDEX, NULL);
        if(!root_res)
        {
            AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_SUBSCRIBER_UPDATE_ERROR, AXIS2_FAILURE);
            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                    "[savan] Could not retrive resource TopicIndex");
            return AXIS2_FAILURE;
        }
        properties = remote_registry_resource_get_properties(root_res, env);
        if(properties)
        {
            savan_subscriber_t *subscriber = NULL;
            axutil_hash_index_t *hi;
            void *val;
            void *key;

            data_list = axutil_array_list_create(env, 0);

            for (hi = axutil_hash_first(properties, env); hi; hi = axutil_hash_next(env, hi)) 
            {
                remote_registry_resource_t *res = NULL;
                axis2_char_t *subscriber_id = NULL;

                axutil_hash_this(hi, (const void**)&key, NULL, &val);
                subscriber_id = (axis2_char_t *) key;
                path = axutil_strcat(env, val, AXIS2_PATH_SEP_STR, key, NULL);
                res = remote_registry_get(subsmgrimpl->remote_registry, env, path, NULL);
                if(res)
                {
                    if(res)
                    {
                        subscriber = savan_registry_subs_mgr_extract_subscriber(env, res);
                        savan_subscriber_set_id(subscriber, env, subscriber_id);
                        axutil_array_list_add(data_list, env, subscriber);
                    }
                }
            }
        }
    }

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Exit:savan_registry_subs_mgr_retrieve_all_subscribers");
    return data_list;
}

static axis2_status_t
savan_registry_subs_mgr_init_resource(
    savan_subs_mgr_t *subsmgr,
    const axutil_env_t *env)
{
    remote_registry_resource_t *res = NULL;
    axis2_char_t *id = NULL;
    
    savan_registry_subs_mgr_t *subsmgrimpl = NULL;
    subsmgrimpl = SAVAN_INTF_TO_IMPL(subsmgr);

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_registry_subs_mgr_init_resource");

    subsmgrimpl->remote_registry = remote_registry_create(env, subsmgrimpl->reg_url, 
            subsmgrimpl->username, subsmgrimpl->password);
    if(!subsmgrimpl->remote_registry)
    {
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_DATABASE_CREATION_ERROR, AXIS2_FAILURE);
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                "[savan] Creating remote registry instance failed");
        return AXIS2_FAILURE;
    }

    res = remote_registry_get(subsmgrimpl->remote_registry, env, TOPIC_INDEX, NULL);
    if(!res)
    {
        res = remote_registry_resource_create(env);
        id = axutil_strcat(env, subsmgrimpl->reg_url, TOPIC_INDEX, NULL);
        remote_registry_resource_set_properties(res, env, axutil_hash_make(env));
        remote_registry_resource_set_description(res, env, "");
        remote_registry_put(subsmgrimpl->remote_registry, env, TOPIC_INDEX, res);
    }

    if(id)
    {
        AXIS2_FREE(env->allocator, id);
    }
    if(res)
    {
        remote_registry_resource_free(res, env);
    }

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_registry_subs_mgr_init_resource");

    return AXIS2_SUCCESS;
}

static axis2_char_t *savan_registry_subs_mgr_serialize_endpoint(
        const axutil_env_t *env,
        const savan_subscriber_t *subscriber)
{
    axiom_namespace_t *ns = NULL;
    axiom_node_t *subs_node = NULL;
    axiom_element_t *subs_elem = NULL;
    axiom_node_t *endpoint_node = NULL;
    axiom_element_t *endpoint_elem = NULL;
    axiom_node_t *addr_node = NULL;
    axiom_element_t *addr_elem = NULL;
    axiom_attribute_t *url_attr = NULL;
    axis2_char_t *notifyto = NULL;
    char *content = NULL;
    
    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Entry:savan_registry_subs_mgr_serialize_endpoint");
    
    /* Format of the message is as 
     * <subscription><syn:endpoint xmlns:syn="http://ws.apache.org/ns/synapse"><syn:address uri=
     * "http://localhost:9000/services/SimpleStockQuoteService" /></syn:endpoint></subscription>
     */
    ns = axiom_namespace_create (env, SYNAPSE_NAMESPACE, SYNAPSE_NS_PREFIX);
    if(subscriber)
    {
        axis2_endpoint_ref_t *notifyto_epr = NULL;

        notifyto_epr = savan_subscriber_get_notify_to((savan_subscriber_t *) subscriber, env);
        if(notifyto_epr)
        {
            notifyto = (axis2_char_t *) axis2_endpoint_ref_get_address(notifyto_epr, env);
        }
    }

    subs_elem = axiom_element_create(env, NULL, ELEM_NAME_SUBSCRIPTION, NULL, &subs_node);
    endpoint_elem = axiom_element_create(env, subs_node, ELEM_NAME_ENDPOINT, ns, &endpoint_node);
    addr_elem = axiom_element_create(env, endpoint_node, ELEM_NAME_ADDRESS, ns, &addr_node);
    url_attr = axiom_attribute_create(env, ATTR_NAME_URL, notifyto, NULL);
    axiom_element_add_attribute(addr_elem, env, url_attr, addr_node);

    content = (char *) axiom_node_to_string(subs_node, env);
    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
            "[savan] Exit:savan_registry_subs_mgr_serialize_endpoint");

    return content; 
}

static savan_subscriber_t *savan_registry_subs_mgr_extract_subscriber(
        const axutil_env_t *env,
        remote_registry_resource_t *resource)
{
    savan_subscriber_t *subscriber = NULL;
    axis2_char_t *content = NULL;
    axutil_qname_t *qname = NULL;
    axiom_node_t *subs_node = NULL;
    axiom_element_t *subs_element = NULL;
    axiom_node_t *endpoint_node = NULL;
    axiom_element_t *endpoint_element = NULL;
    axiom_node_t *address_node = NULL;
    axiom_element_t *address_element = NULL;
    axis2_char_t *address = NULL;
    axutil_hash_t *properties = NULL;
    axis2_char_t *static_flag = NULL;
    axis2_char_t *subs_mgr_uri = NULL;
    axis2_char_t *filter = NULL;
    axis2_char_t *filter_dialect = NULL;
    axis2_char_t *expires = NULL;
    axis2_endpoint_ref_t *notifyto_epr = NULL;

    content = remote_registry_resource_get_content(resource, env);
    subs_node = axiom_node_create_from_buffer(env, content);
    subs_element = axiom_node_get_data_element(subs_node, env);
    if(!subs_element)
    {
        AXIS2_HANDLE_ERROR(env, AXIS2_ERROR_OM_ELEMENT_EXPECTED, AXIS2_FAILURE);
        return NULL;
    }

    qname = axutil_qname_create(env, ELEM_NAME_ENDPOINT, SYNAPSE_NAMESPACE, NULL);
    if(!qname)
    {
        AXIS2_HANDLE_ERROR(env, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
        return NULL;
    }

    endpoint_element = axiom_element_get_first_child_with_qname(subs_element, env, qname, 
            subs_node, &endpoint_node);

    if(qname)
    {
        axutil_qname_free(qname, env);
    }

    qname = axutil_qname_create(env, ELEM_NAME_ADDRESS, SYNAPSE_NAMESPACE, NULL);
    if(!qname)
    {
        AXIS2_HANDLE_ERROR(env, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
        return NULL;
    }

    address_element = axiom_element_get_first_child_with_qname(endpoint_element, env, qname, 
            endpoint_node, &address_node);

    if(qname)
    {
        axutil_qname_free(qname, env);
    }

    address = axiom_element_get_attribute_value_by_name(address_element, env, ATTR_NAME_URL);
    
    if(!address)
    {
        AXIS2_HANDLE_ERROR(env, AXIS2_ERROR_OM_ELEMENT_INVALID_STATE, AXIS2_FAILURE);
        return NULL; 
    }

    notifyto_epr = axis2_endpoint_ref_create(env, address);
    if(!notifyto_epr)
    {
        AXIS2_HANDLE_ERROR(env, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
        return NULL;
    }

    subscriber = savan_subscriber_create(env);
    if(!subscriber)
    {
        return NULL;
    }

    savan_subscriber_set_notify_to(subscriber, env, notifyto_epr);

    properties = remote_registry_resource_get_properties(resource, env);
    if(!properties)
    {
        savan_subscriber_free(subscriber, env);
        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_PARSING_SUBSCRIBER_NODE_FAILED, AXIS2_FAILURE);
        return NULL;
    }
    
    expires = remote_registry_resource_get_property(resource, env, "expires");
    if(expires)
    {
        savan_subscriber_set_expires(subscriber, env, expires);
    }

    static_flag = remote_registry_resource_get_property(resource, env, "staticFlag");
    filter = remote_registry_resource_get_property(resource, env, "filterValue");
    if(filter)
    {
        savan_subscriber_set_filter(subscriber, env, filter);
    }

    filter_dialect = remote_registry_resource_get_property(resource, env, "filterDialect");
    if(filter_dialect)
    {
        savan_subscriber_set_filter_dialect(subscriber, env, filter_dialect);
    }

    subs_mgr_uri = remote_registry_resource_get_property(resource, env, "subManagerURI");

    return subscriber;
}

