blob: 673ad16d38d6583266b0e91e17b67aecd2268ada [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <savan_publisher_mod.h>
#include <axutil_log.h>
#include <axutil_hash.h>
#include <axutil_property.h>
#include <axutil_types.h>
#include <axutil_file_handler.h>
#include <platforms/axutil_platform_auto_sense.h>
#include <savan_constants.h>
#include <savan_util.h>
#include <savan_error.h>
#include <libxslt/xsltutils.h>
#include <axiom_soap.h>
#include <axiom_soap_const.h>
#include <axiom_soap_envelope.h>
#include <axiom_element.h>
#include <axiom_node.h>
#include <esb_sender.h>
#include <esb_runtime.h>
/**
*
*/
/**
* @brief Savan XPath Publisher Struct Impl
* Savan XPath Publisher
*/
typedef struct savan_esb_publisher_mod
{
savan_publisher_mod_t publishermod;
axis2_conf_t *conf;
} savan_esb_publisher_mod_t;
#define SAVAN_INTF_TO_IMPL(publishermod) ((savan_esb_publisher_mod_t *) publishermod)
static axis2_status_t
savan_esb_publisher_mod_publish_to_subscriber(
savan_publisher_mod_t *publishermod,
const axutil_env_t *env,
void *msg_ctx,
savan_subscriber_t *subscriber);
AXIS2_EXTERN void AXIS2_CALL
savan_esb_publisher_mod_free(
savan_publisher_mod_t *publishermod,
const axutil_env_t *env);
AXIS2_EXTERN void AXIS2_CALL
savan_esb_publisher_mod_publish(
savan_publisher_mod_t *publishermod,
const axutil_env_t *env,
void *msg_ctx,
savan_storage_mgr_t *storage_mgr);
static const savan_publisher_mod_ops_t savan_publisher_mod_ops =
{
savan_esb_publisher_mod_free,
savan_esb_publisher_mod_publish
};
AXIS2_EXTERN savan_publisher_mod_t * AXIS2_CALL
savan_publisher_mod_create(
const axutil_env_t *env)
{
savan_esb_publisher_mod_t *publishermodimpl = NULL;
publishermodimpl = AXIS2_MALLOC(env->allocator, sizeof(savan_esb_publisher_mod_t));
if (!publishermodimpl)
{
AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_FILTER_CREATION_FAILED, AXIS2_FAILURE);
return NULL;
}
memset ((void *) publishermodimpl, 0, sizeof(savan_esb_publisher_mod_t));
publishermodimpl->conf = NULL;
publishermodimpl->publishermod.ops = &savan_publisher_mod_ops;
return (savan_publisher_mod_t *) publishermodimpl;
}
AXIS2_EXTERN savan_publisher_mod_t * AXIS2_CALL
savan_publisher_mod_create_with_conf(
const axutil_env_t *env,
axis2_conf_t *conf)
{
savan_esb_publisher_mod_t *publishermodimpl = NULL;
publishermodimpl = AXIS2_MALLOC(env->allocator, sizeof(savan_esb_publisher_mod_t));
if (!publishermodimpl)
{
AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_FILTER_CREATION_FAILED, AXIS2_FAILURE);
return NULL;
}
memset ((void *) publishermodimpl, 0, sizeof(savan_esb_publisher_mod_t));
publishermodimpl->conf = conf;
publishermodimpl->publishermod.ops = &savan_publisher_mod_ops;
return (savan_publisher_mod_t *) publishermodimpl;
}
AXIS2_EXTERN void AXIS2_CALL
savan_esb_publisher_mod_free(
savan_publisher_mod_t *publishermod,
const axutil_env_t *env)
{
savan_esb_publisher_mod_t *publishermodimpl = NULL;
publishermodimpl = SAVAN_INTF_TO_IMPL(publishermod);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_esb_publisher_mod_free");
publishermodimpl->conf = NULL;
if(publishermodimpl)
{
AXIS2_FREE(env->allocator, publishermodimpl);
publishermodimpl = NULL;
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_esb_publisher_mod_free");
}
AXIS2_EXTERN void AXIS2_CALL
savan_esb_publisher_mod_publish(
savan_publisher_mod_t *publishermod,
const axutil_env_t *env,
void *esb_ctx,
savan_storage_mgr_t *storage_mgr)
{
savan_esb_publisher_mod_t *publishermodimpl = NULL;
axutil_array_list_t *subs_store = NULL;
int i = 0, size = 0;
axis2_char_t *filter = NULL;
publishermodimpl = SAVAN_INTF_TO_IMPL(publishermod);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_esb_publisher_mod_publish");
axutil_allocator_switch_to_global_pool(env->allocator);
if(storage_mgr)
{
subs_store = savan_storage_mgr_retrieve_all_subscribers(storage_mgr, env, filter);
}
if (!subs_store)
{
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[savan] Subscriber store is NULL");
}
size = axutil_array_list_size(subs_store, env);
for(i = 0; i < size; i++)
{
savan_subscriber_t *sub = NULL;
sub = (savan_subscriber_t *)axutil_array_list_get(subs_store, env, i);
if (sub)
{
axis2_char_t *id = savan_subscriber_get_id(sub, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", id);
/* Ideally publishing to each subscriber should happen within a thread for each
* subscriber. However until Axis2/C provide a good thread pool to handle
* such tasks I use this sequential publishing to subscribers.
*/
if(!savan_esb_publisher_mod_publish_to_subscriber(publishermod, env, esb_ctx, sub))
{
axis2_endpoint_ref_t *notifyto = savan_subscriber_get_notify_to(sub, env);
const axis2_char_t *address = NULL;
if(notifyto)
{
address = axis2_endpoint_ref_get_address(notifyto, env);
}
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"Publishing to the Data Sink:%s proviced by subscriber:%s Failed. Check "\
"whether the Data Sink url is correct", address, id);
}
}
}
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_esb_publisher_mod_publish");
}
static axis2_status_t
savan_esb_publisher_mod_publish_to_subscriber(
savan_publisher_mod_t *publishermod,
const axutil_env_t *env,
void *esb_ctx,
savan_subscriber_t *subscriber)
{
axis2_status_t status = AXIS2_SUCCESS;
const axis2_char_t *address = NULL;
axis2_endpoint_ref_t *notifyto = NULL;
esb_rt_epr_t *epr = NULL;
axiom_soap_envelope_t *envelope = NULL;
axiom_soap_body_t *body = NULL;
axiom_node_t *body_node = NULL;
axiom_node_t *payload = NULL;
axis2_msg_ctx_t *msg_ctx = NULL;
msg_ctx = ((esb_ctx_t *) esb_ctx)->in_in_msg_ctx;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_esb_publisher_mod_publish_to_subscriber");
notifyto = savan_subscriber_get_notify_to(subscriber, env);
if(notifyto)
{
address = axis2_endpoint_ref_get_address(notifyto, env);
if(address)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", address);
epr = esb_rt_epr_create(env);
epr->uri = axutil_strdup(env, address);
}
}
envelope = axis2_msg_ctx_get_soap_envelope(msg_ctx, env);
body = axiom_soap_envelope_get_body(envelope, env);
body_node = axiom_soap_body_get_base_node(body, env);
payload = axiom_node_get_first_element(body_node, env);
esb_send_on_out_only(env, epr, (esb_ctx_t *) esb_ctx);
axiom_node_detach(payload, env); /*insert this to prevent payload corruption in subsequent
"publish" calls with some payload.*/
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_esb_publisher_mod_publish_to_subscriber");
return status;
}