/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <axis2_amqp_util.h>
#include <axis2_amqp_defines.h>
#include <axis2_amqp_receiver.h>

static const axis2_transport_receiver_ops_t amqp_receiver_ops =
{
    axis2_amqp_receiver_init,
    axis2_amqp_receiver_start,
    axis2_amqp_receiver_get_reply_to_epr,
    axis2_amqp_receiver_get_conf_ctx,
    axis2_amqp_receiver_is_running,
    axis2_amqp_receiver_stop,
    axis2_amqp_receiver_free
};

AXIS2_EXTERN axis2_transport_receiver_t* AXIS2_CALL
axis2_amqp_receiver_create(
    const axutil_env_t* env,
    const axis2_char_t* repo,
    const axis2_char_t* qpid_broker_ip,
    int qpid_broker_port)
{
    AXIS2_ENV_CHECK(env, NULL);

    axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL;

    receiver_resource_pack = (axis2_amqp_receiver_resource_pack_t*)AXIS2_MALLOC(env->allocator,
        sizeof(axis2_amqp_receiver_resource_pack_t));

    if(!receiver_resource_pack)
    {
        AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
        return NULL;
    }

    receiver_resource_pack->receiver.ops = &amqp_receiver_ops;
    receiver_resource_pack->qpid_receiver = NULL;
    receiver_resource_pack->conf_ctx = NULL;
    receiver_resource_pack->conf_ctx_private = NULL;

    if(repo)
    {
        /**
         * 1. We first create a private conf ctx which is owned by this server
         * 	  we only free this private conf context. We should never free the
         *    receiver_impl->conf_ctx because it may be owned by any other object which
         *    may lead to double free.
         *
         * 2. The Qpid broker IP and port are set in conf_ctx at two different places.
         * 	  If the repo is specified, they are set here. Otherwise, they are set
         * 	  in axis2_amqp_receiver_init method.
         */
        axutil_property_t* property = NULL;
        const axis2_char_t* broker_ip = NULL;
        int* broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
        *broker_port = AXIS2_QPID_NULL_CONF_INT;

        receiver_resource_pack->conf_ctx_private = axis2_build_conf_ctx(env, repo);
        if(!receiver_resource_pack->conf_ctx_private)
        {
            axis2_amqp_receiver_free((axis2_transport_receiver_t *)receiver_resource_pack, env);
            return NULL;
        }

        /* Set broker IP */
        broker_ip = qpid_broker_ip ? qpid_broker_ip : AXIS2_QPID_DEFAULT_BROKER_IP;
        property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0,
            (void*)broker_ip);
        axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx_private, env,
            AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property);

        /* Set broker port */
        *broker_port = (qpid_broker_port != AXIS2_QPID_NULL_CONF_INT) ? qpid_broker_port
            : AXIS2_QPID_DEFAULT_BROKER_PORT;
        property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0,
            (void*)broker_port);
        axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx_private, env,
            AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT, property);

        receiver_resource_pack->conf_ctx = receiver_resource_pack->conf_ctx_private;
    }

    return &(receiver_resource_pack->receiver);
}

AXIS2_EXTERN axis2_status_t AXIS2_CALL
axis2_amqp_receiver_init(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env,
    axis2_conf_ctx_t* conf_ctx,
    axis2_transport_in_desc_t* in_desc)
{
    axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL;
    axutil_property_t* property = NULL;
    const axis2_char_t* broker_ip = NULL;
    int* broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
    *broker_port = AXIS2_QPID_NULL_CONF_INT;

    AXIS2_ENV_CHECK(env, AXIS2_FAILURE);

    receiver_resource_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver);
    receiver_resource_pack->conf_ctx = conf_ctx;

    /* Set broker IP */
    broker_ip = axis2_amqp_util_get_in_desc_conf_value_string(in_desc, env,
        AXIS2_AMQP_CONF_QPID_BROKER_IP);
    if(!broker_ip)
    {
        broker_ip = AXIS2_QPID_DEFAULT_BROKER_IP;
    }
    property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0,
        (void*)broker_ip);
    axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env,
        AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property);

    /* Set broker port */
    *broker_port = axis2_amqp_util_get_in_desc_conf_value_int(in_desc, env,
        AXIS2_AMQP_CONF_QPID_BROKER_PORT);
    if(*broker_port == AXIS2_QPID_NULL_CONF_INT)
    {
        *broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
    }
    property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0,
        (void*)broker_port);
    axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env,
        AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT, property);

    return AXIS2_SUCCESS;
}

AXIS2_EXTERN axis2_status_t AXIS2_CALL
axis2_amqp_receiver_start(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    AXIS2_ENV_CHECK(env, AXIS2_FAILURE);

    axis2_status_t status = AXIS2_FAILURE;

    axis2_amqp_receiver_resource_pack_t* amqp_receiver_resource_pack = NULL;
    axis2_qpid_receiver_resource_pack_t* qpid_receiver_resource_pack = NULL;

    amqp_receiver_resource_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver);

    /* Create Qpid Receiver */
    qpid_receiver_resource_pack = axis2_qpid_receiver_create(env,
        amqp_receiver_resource_pack->conf_ctx);

    if(qpid_receiver_resource_pack)
    {
        amqp_receiver_resource_pack->qpid_receiver = qpid_receiver_resource_pack;

        status = axis2_qpid_receiver_start(qpid_receiver_resource_pack, env);
    }

    return status;
}

AXIS2_EXTERN axis2_endpoint_ref_t* AXIS2_CALL
axis2_amqp_receiver_get_reply_to_epr(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env,
    const axis2_char_t* svc_name)
{
    return NULL;
}

AXIS2_EXTERN axis2_conf_ctx_t* AXIS2_CALL
axis2_amqp_receiver_get_conf_ctx(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    AXIS2_ENV_CHECK(env, NULL);

    return AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver)->conf_ctx;
}

AXIS2_EXTERN axis2_bool_t AXIS2_CALL
axis2_amqp_receiver_is_running(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    return AXIS2_TRUE;
}

AXIS2_EXTERN axis2_status_t AXIS2_CALL
axis2_amqp_receiver_stop(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    return AXIS2_SUCCESS;
}

AXIS2_EXTERN void AXIS2_CALL
axis2_amqp_receiver_free(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    AXIS2_ENV_CHECK(env, void);

    axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL;
    receiver_resource_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver);

    if(receiver_resource_pack->qpid_receiver)
    {
        axis2_qpid_receiver_free(receiver_resource_pack->qpid_receiver, env);
        receiver_resource_pack->qpid_receiver = NULL;
    }

    if(receiver_resource_pack->conf_ctx_private)
    {
        axis2_conf_ctx_free(receiver_resource_pack->conf_ctx_private, env);
        receiver_resource_pack->conf_ctx_private = NULL;
    }

    receiver_resource_pack->conf_ctx = NULL; /* Do not free this. It may be owned by some other object */

    AXIS2_FREE(env->allocator, receiver_resource_pack);
}

/* Library Exports */

AXIS2_EXPORT int
#ifndef AXIS2_STATIC_DEPLOY
axis2_get_instance(
#else
    axis2_amqp_receiver_get_instance(
#endif
    struct axis2_transport_receiver** inst,
    const axutil_env_t* env)
{
    int status = AXIS2_SUCCESS;

    *inst = axis2_amqp_receiver_create(env, NULL, NULL, AXIS2_QPID_NULL_CONF_INT);
    if(!(*inst))
    {
        status = AXIS2_FAILURE;
    }

    return status;
}

AXIS2_EXPORT int
#ifndef AXIS2_STATIC_DEPLOY
axis2_remove_instance(
#else
    axis2_amqp_receiver_remove_instance(
#endif
    axis2_transport_receiver_t* inst,
    const axutil_env_t* env)
{
    if(inst)
    {
        axis2_transport_receiver_free(inst, env);
    }

    return AXIS2_SUCCESS;
}
