blob: 3dd38c552a42fe7563ff70bce4ef03b99dd9633f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <axis2_amqp_util.h>
#include <axis2_amqp_defines.h>
#include <axis2_amqp_receiver.h>
/*
 * Operation table shared by every AMQP receiver instance;
 * axis2_amqp_receiver_create() stores its address in receiver.ops.
 *
 * The initializer is positional, so the entries must stay in the member order
 * of axis2_transport_receiver_ops_t (declared outside this file).
 */
static const axis2_transport_receiver_ops_t amqp_receiver_ops =
{
axis2_amqp_receiver_init,
axis2_amqp_receiver_start,
axis2_amqp_receiver_get_reply_to_epr,
axis2_amqp_receiver_get_conf_ctx,
axis2_amqp_receiver_is_running,
axis2_amqp_receiver_stop,
axis2_amqp_receiver_free
};
/**
 * Creates an AMQP transport receiver.
 *
 * Allocates the receiver resource pack and attaches the AMQP operation table.
 * When a repository path is supplied, a private configuration context is built
 * from it and the Qpid broker IP and port are stored in that context as
 * properties. When repo is NULL no configuration context is created here; the
 * broker settings are stored later by axis2_amqp_receiver_init().
 *
 * @param env              environment; its allocator and error are used
 * @param repo             repository path, or NULL
 * @param qpid_broker_ip   broker IP, or NULL to use AXIS2_QPID_DEFAULT_BROKER_IP
 * @param qpid_broker_port broker port, or AXIS2_QPID_NULL_CONF_INT to use
 *                         AXIS2_QPID_DEFAULT_BROKER_PORT
 * @return pointer to the transport receiver embedded in the resource pack, or
 *         NULL if an allocation fails or axis2_build_conf_ctx() returns NULL
 */
AXIS2_EXTERN axis2_transport_receiver_t* AXIS2_CALL
axis2_amqp_receiver_create(
    const axutil_env_t* env,
    const axis2_char_t* repo,
    const axis2_char_t* qpid_broker_ip,
    int qpid_broker_port)
{
    axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL;

    AXIS2_ENV_CHECK(env, NULL);

    receiver_resource_pack = (axis2_amqp_receiver_resource_pack_t*)AXIS2_MALLOC(env->allocator,
        sizeof(axis2_amqp_receiver_resource_pack_t));
    if(!receiver_resource_pack)
    {
        AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
        return NULL;
    }

    /* Initialise every field up front so axis2_amqp_receiver_free() can be
     * called safely from the error paths below. */
    receiver_resource_pack->receiver.ops = &amqp_receiver_ops;
    receiver_resource_pack->qpid_receiver = NULL;
    receiver_resource_pack->conf_ctx = NULL;
    receiver_resource_pack->conf_ctx_private = NULL;

    if(repo)
    {
        /**
         * 1. We first create a private conf ctx which is owned by this receiver.
         *    Only this private conf ctx is ever freed by the receiver. The
         *    receiver_resource_pack->conf_ctx pointer must never be freed,
         *    because it may be owned by some other object and freeing it could
         *    lead to a double free.
         *
         * 2. The Qpid broker IP and port are set in conf_ctx at two different
         *    places. If the repo is specified, they are set here. Otherwise,
         *    they are set in axis2_amqp_receiver_init().
         */
        axutil_property_t* property = NULL;
        const axis2_char_t* broker_ip = NULL;
        int* broker_port = NULL;

        receiver_resource_pack->conf_ctx_private = axis2_build_conf_ctx(env, repo);
        if(!receiver_resource_pack->conf_ctx_private)
        {
            axis2_amqp_receiver_free((axis2_transport_receiver_t *)receiver_resource_pack, env);
            return NULL;
        }

        /* Allocate the port storage only once the conf ctx exists, so the
         * failure path above has nothing extra to release. */
        broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
        if(!broker_port)
        {
            AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
            axis2_amqp_receiver_free((axis2_transport_receiver_t *)receiver_resource_pack, env);
            return NULL;
        }

        /* Set broker IP */
        broker_ip = qpid_broker_ip ? qpid_broker_ip : AXIS2_QPID_DEFAULT_BROKER_IP;
        property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0,
            (void*)broker_ip);
        axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx_private, env,
            AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property);

        /* Set broker port */
        *broker_port = (qpid_broker_port != AXIS2_QPID_NULL_CONF_INT) ? qpid_broker_port
            : AXIS2_QPID_DEFAULT_BROKER_PORT;
        property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0,
            (void*)broker_port);
        axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx_private, env,
            AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT, property);

        /* The private conf ctx is also the one handed out to callers. */
        receiver_resource_pack->conf_ctx = receiver_resource_pack->conf_ctx_private;
    }

    return &(receiver_resource_pack->receiver);
}
/**
 * Initialises the receiver with an externally supplied configuration context.
 *
 * Stores conf_ctx in the receiver (the private conf ctx pointer is left
 * untouched) and sets the Qpid broker IP and port on it as properties. Both
 * values are read from the transport-in description; when a value is not
 * configured, AXIS2_QPID_DEFAULT_BROKER_IP / AXIS2_QPID_DEFAULT_BROKER_PORT is
 * used instead.
 *
 * @param receiver transport receiver created by axis2_amqp_receiver_create()
 * @param env      environment; its allocator and error are used
 * @param conf_ctx configuration context to attach; not owned by the receiver
 * @param in_desc  transport-in description the broker settings are read from
 * @return AXIS2_SUCCESS, or AXIS2_FAILURE (with AXIS2_ERROR_NO_MEMORY set) if
 *         the storage for the port value cannot be allocated
 */
AXIS2_EXTERN axis2_status_t AXIS2_CALL
axis2_amqp_receiver_init(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env,
    axis2_conf_ctx_t* conf_ctx,
    axis2_transport_in_desc_t* in_desc)
{
    axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL;
    axutil_property_t* property = NULL;
    const axis2_char_t* broker_ip = NULL;
    int* broker_port = NULL;

    AXIS2_ENV_CHECK(env, AXIS2_FAILURE);

    /* Allocate before touching the receiver so a failure leaves it unchanged. */
    broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int));
    if(!broker_port)
    {
        AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
        return AXIS2_FAILURE;
    }

    receiver_resource_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver);
    receiver_resource_pack->conf_ctx = conf_ctx;

    /* Set broker IP */
    broker_ip = axis2_amqp_util_get_in_desc_conf_value_string(in_desc, env,
        AXIS2_AMQP_CONF_QPID_BROKER_IP);
    if(!broker_ip)
    {
        broker_ip = AXIS2_QPID_DEFAULT_BROKER_IP;
    }
    property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0,
        (void*)broker_ip);
    axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env,
        AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property);

    /* Set broker port */
    *broker_port = axis2_amqp_util_get_in_desc_conf_value_int(in_desc, env,
        AXIS2_AMQP_CONF_QPID_BROKER_PORT);
    if(*broker_port == AXIS2_QPID_NULL_CONF_INT)
    {
        *broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT;
    }
    property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0,
        (void*)broker_port);
    axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env,
        AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT, property);

    return AXIS2_SUCCESS;
}
/**
 * Starts the receiver.
 *
 * Creates a Qpid receiver for the receiver's current configuration context,
 * records it in the resource pack and starts it.
 *
 * @param receiver transport receiver to start
 * @param env      environment
 * @return the status reported by axis2_qpid_receiver_start(), or AXIS2_FAILURE
 *         if the Qpid receiver could not be created
 */
AXIS2_EXTERN axis2_status_t AXIS2_CALL
axis2_amqp_receiver_start(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    axis2_amqp_receiver_resource_pack_t* amqp_pack = NULL;
    axis2_qpid_receiver_resource_pack_t* qpid_pack = NULL;

    AXIS2_ENV_CHECK(env, AXIS2_FAILURE);

    amqp_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver);

    /* Create Qpid Receiver */
    qpid_pack = axis2_qpid_receiver_create(env, amqp_pack->conf_ctx);
    if(!qpid_pack)
    {
        return AXIS2_FAILURE;
    }

    /* Remember the Qpid receiver so axis2_amqp_receiver_free() can release it. */
    amqp_pack->qpid_receiver = qpid_pack;

    return axis2_qpid_receiver_start(qpid_pack, env);
}
/**
 * Returns the reply-to endpoint reference for a service.
 *
 * Stub: no endpoint reference is produced for the AMQP transport; every
 * argument is ignored.
 *
 * @param receiver transport receiver (unused)
 * @param env      environment (unused)
 * @param svc_name service name (unused)
 * @return always NULL
 */
AXIS2_EXTERN axis2_endpoint_ref_t* AXIS2_CALL
axis2_amqp_receiver_get_reply_to_epr(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env,
    const axis2_char_t* svc_name)
{
    axis2_endpoint_ref_t* reply_to_epr = NULL;

    (void)receiver;
    (void)env;
    (void)svc_name;

    return reply_to_epr;
}
/**
 * Returns the configuration context currently attached to the receiver.
 *
 * @param receiver transport receiver
 * @param env      environment
 * @return the receiver's conf_ctx field; may be NULL if none has been set
 */
AXIS2_EXTERN axis2_conf_ctx_t* AXIS2_CALL
axis2_amqp_receiver_get_conf_ctx(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    axis2_amqp_receiver_resource_pack_t* pack = NULL;

    AXIS2_ENV_CHECK(env, NULL);

    pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver);

    return pack->conf_ctx;
}
/**
 * Reports whether the receiver is running.
 *
 * Stub: the receiver state is not inspected; both arguments are ignored.
 *
 * @param receiver transport receiver (unused)
 * @param env      environment (unused)
 * @return always AXIS2_TRUE
 */
AXIS2_EXTERN axis2_bool_t AXIS2_CALL
axis2_amqp_receiver_is_running(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    axis2_bool_t running = AXIS2_TRUE;

    (void)receiver;
    (void)env;

    return running;
}
/**
 * Stops the receiver.
 *
 * Stub: nothing is actually stopped here; both arguments are ignored.
 *
 * @param receiver transport receiver (unused)
 * @param env      environment (unused)
 * @return always AXIS2_SUCCESS
 */
AXIS2_EXTERN axis2_status_t AXIS2_CALL
axis2_amqp_receiver_stop(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    axis2_status_t status = AXIS2_SUCCESS;

    (void)receiver;
    (void)env;

    return status;
}
/**
 * Releases a receiver created by axis2_amqp_receiver_create().
 *
 * Frees the Qpid receiver (if one was created by axis2_amqp_receiver_start()),
 * the private configuration context (if one was built from a repository path)
 * and finally the resource pack itself. The conf_ctx pointer is only cleared,
 * never freed, because it may be owned by some other object.
 *
 * @param receiver transport receiver to free; NULL is accepted and ignored
 * @param env      environment; its allocator is used
 */
AXIS2_EXTERN void AXIS2_CALL
axis2_amqp_receiver_free(
    axis2_transport_receiver_t* receiver,
    const axutil_env_t* env)
{
    axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL;

    AXIS2_ENV_CHECK(env, void);

    /* Freeing nothing is a no-op rather than a NULL dereference. */
    if(!receiver)
    {
        return;
    }

    receiver_resource_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver);

    if(receiver_resource_pack->qpid_receiver)
    {
        axis2_qpid_receiver_free(receiver_resource_pack->qpid_receiver, env);
        receiver_resource_pack->qpid_receiver = NULL;
    }

    if(receiver_resource_pack->conf_ctx_private)
    {
        axis2_conf_ctx_free(receiver_resource_pack->conf_ctx_private, env);
        receiver_resource_pack->conf_ctx_private = NULL;
    }

    receiver_resource_pack->conf_ctx = NULL; /* Do not free this. It may be owned by some other object */

    AXIS2_FREE(env->allocator, receiver_resource_pack);
}
/* Library Exports */
/**
 * Library entry point that creates an AMQP receiver instance.
 *
 * Exported as axis2_get_instance unless AXIS2_STATIC_DEPLOY is defined, in
 * which case it is exported as axis2_amqp_receiver_get_instance. The receiver
 * is created without a repository path, broker IP or broker port.
 *
 * @param inst receives the new receiver, or NULL if creation failed
 * @param env  environment
 * @return AXIS2_SUCCESS if a receiver was created, AXIS2_FAILURE otherwise
 */
AXIS2_EXPORT int
#ifndef AXIS2_STATIC_DEPLOY
axis2_get_instance(
#else
axis2_amqp_receiver_get_instance(
#endif
    struct axis2_transport_receiver** inst,
    const axutil_env_t* env)
{
    *inst = axis2_amqp_receiver_create(env, NULL, NULL, AXIS2_QPID_NULL_CONF_INT);

    return (*inst) ? AXIS2_SUCCESS : AXIS2_FAILURE;
}
/**
 * Library entry point that destroys a receiver instance.
 *
 * Exported as axis2_remove_instance unless AXIS2_STATIC_DEPLOY is defined, in
 * which case it is exported as axis2_amqp_receiver_remove_instance.
 *
 * @param inst receiver to release through axis2_transport_receiver_free();
 *             NULL is accepted and ignored
 * @param env  environment
 * @return always AXIS2_SUCCESS
 */
AXIS2_EXPORT int
#ifndef AXIS2_STATIC_DEPLOY
axis2_remove_instance(
#else
axis2_amqp_receiver_remove_instance(
#endif
    axis2_transport_receiver_t* inst,
    const axutil_env_t* env)
{
    if(!inst)
    {
        return AXIS2_SUCCESS;
    }

    axis2_transport_receiver_free(inst, env);

    return AXIS2_SUCCESS;
}