blob: f9b8e17883d28567e593184b6762640101b72c92 [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <savan_publisher.h>
#include <axutil_log.h>
#include <axutil_hash.h>
#include <axutil_property.h>
#include <axutil_types.h>
#include <axutil_file_handler.h>
#include <platforms/axutil_platform_auto_sense.h>
#include <axis2_svc_client.h>
#include <axis2_conf.h>
#include <axis2_options.h>
#include <axutil_array_list.h>
#include <savan_constants.h>
#include <savan_util.h>
#include <savan_subscriber.h>
#include <savan_subs_mgr.h>
#include <savan_error.h>
#include <axiom_soap.h>
#include <axiom_soap_const.h>
#include <axiom_soap_envelope.h>
#include <axiom_element.h>
#include <axiom_node.h>
/**
*
*/
/**
* @brief Savan Default Publisher Struct Impl
* Savan Default Publisher
*/
typedef struct savan_default_publisher
{
savan_publisher_t publishermod;
axis2_conf_t *conf;
} savan_default_publisher_t;
#define SAVAN_INTF_TO_IMPL(publishermod) ((savan_default_publisher_t *) publishermod)
AXIS2_EXTERN void AXIS2_CALL
savan_default_publisher_free(
savan_publisher_t *publishermod,
const axutil_env_t *env);
AXIS2_EXTERN void AXIS2_CALL
savan_default_publisher_publish(
savan_publisher_t *publishermod,
const axutil_env_t *env,
void *msg_ctx,
savan_subs_mgr_t *subs_mgr);
static axis2_status_t
savan_default_publisher_publish_to_subscriber(
savan_publisher_t *publishermod,
const axutil_env_t *env,
axis2_svc_client_t *svc_client,
savan_subscriber_t *subscriber,
savan_filter_mod_t *filtermod,
axiom_node_t *payload);
static const savan_publisher_ops_t savan_publisher_ops =
{
savan_default_publisher_free,
savan_default_publisher_publish
};
AXIS2_EXTERN savan_publisher_t * AXIS2_CALL
savan_publisher_create_with_conf(
const axutil_env_t *env,
axis2_conf_t *conf)
{
savan_default_publisher_t *publishermodimpl = NULL;
publishermodimpl = AXIS2_MALLOC(env->allocator, sizeof(savan_default_publisher_t));
if (!publishermodimpl)
{
AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_FILTER_CREATION_FAILED, AXIS2_FAILURE);
return NULL;
}
memset ((void *) publishermodimpl, 0, sizeof(savan_default_publisher_t));
publishermodimpl->conf = conf;
publishermodimpl->publishermod.ops = &savan_publisher_ops;
return (savan_publisher_t *) publishermodimpl;
}
AXIS2_EXTERN savan_publisher_t * AXIS2_CALL
savan_publisher_create(
const axutil_env_t *env)
{
return NULL;
}
AXIS2_EXTERN void AXIS2_CALL
savan_default_publisher_free(
savan_publisher_t *publishermod,
const axutil_env_t *env)
{
savan_default_publisher_t *publishermodimpl = NULL;
publishermodimpl = SAVAN_INTF_TO_IMPL(publishermod);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_default_publisher_free");
publishermodimpl->conf = NULL;
if(publishermodimpl)
{
AXIS2_FREE(env->allocator, publishermodimpl);
publishermodimpl = NULL;
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_default_publisher_free");
}
AXIS2_EXTERN void AXIS2_CALL
savan_default_publisher_publish(
savan_publisher_t *publishermod,
const axutil_env_t *env,
void *msg_ctx,
savan_subs_mgr_t *subs_mgr)
{
savan_default_publisher_t *publishermodimpl = NULL;
axutil_array_list_t *subs_store = NULL;
int i = 0, size = 0;
savan_filter_mod_t *filtermod = NULL;
const axis2_char_t *path = NULL;
axiom_node_t *payload = NULL;
axiom_soap_envelope_t *envelope = NULL;
axiom_soap_body_t *body = NULL;
axiom_node_t *body_node = NULL;
const axis2_char_t *filter = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
axutil_property_t *topic_property = NULL;
publishermodimpl = SAVAN_INTF_TO_IMPL(publishermod);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_default_publisher_publish");
topic_property = axis2_msg_ctx_get_property(msg_ctx, env, "topic");
if(topic_property)
{
filter = axutil_property_get_value(topic_property, env);
}
axutil_allocator_switch_to_global_pool(env->allocator);
if(subs_mgr)
{
subs_store = savan_subs_mgr_retrieve_all_subscribers(subs_mgr, env, filter);
}
if (!subs_store)
{
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[savan] Subscriber store is NULL");
}
conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
path = axis2_conf_ctx_get_root_dir(conf_ctx, env);
if(!path)
{
path = AXIS2_GETENV("AXIS2C_HOME");
}
envelope = axis2_msg_ctx_get_soap_envelope((axis2_msg_ctx_t *) msg_ctx, env);
if (!envelope)
{
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the soap envelop");
}
body = axiom_soap_envelope_get_body(envelope, env);
if (!body)
{
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the soap body");
}
body_node = axiom_soap_body_get_base_node(body, env);
payload = axiom_node_get_first_child(body_node, env);
size = axutil_array_list_size(subs_store, env);
for(i = 0; i < size; i++)
{
axis2_svc_client_t *svc_client = NULL;
savan_subscriber_t *sub = NULL;
sub = (savan_subscriber_t *)axutil_array_list_get(subs_store, env, i);
if (sub)
{
axis2_char_t *id = savan_subscriber_get_id(sub, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", id);
svc_client = axis2_svc_client_create(env, path);
filtermod = savan_util_get_filter_module(env, publishermodimpl->conf);
/* Ideally publishing to each subscriber should happen within a thread for each
* subscriber. However until Axis2/C provide a good thread pool to handle
* such tasks I use this sequential publishing to subscribers.
*/
if(!savan_default_publisher_publish_to_subscriber(publishermod, env, svc_client, sub,
filtermod, payload))
{
axis2_endpoint_ref_t *notifyto = savan_subscriber_get_notify_to(sub, env);
const axis2_char_t *address = NULL;
if(notifyto)
{
address = axis2_endpoint_ref_get_address(notifyto, env);
}
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"Publishing to the Data Sink:%s proviced by subscriber:%s Failed. Check "\
"whether the Data Sink url is correct", address, id);
}
if(svc_client)
{
axis2_svc_client_free(svc_client, env);
}
}
}
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_default_publisher_publish");
}
static axis2_status_t
savan_default_publisher_publish_to_subscriber(
savan_publisher_t *publishermod,
const axutil_env_t *env,
axis2_svc_client_t *svc_client,
savan_subscriber_t *subscriber,
savan_filter_mod_t *filtermod,
axiom_node_t *payload)
{
axis2_options_t *options = NULL;
axis2_status_t status = AXIS2_SUCCESS;
axis2_endpoint_ref_t *to = NULL;
const axis2_char_t *address = NULL;
axis2_bool_t filter_apply = AXIS2_TRUE;
axis2_endpoint_ref_t *notifyto = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_default_publisher_publish_to_subscriber");
options = (axis2_options_t *) axis2_svc_client_get_options(svc_client, env);
if(!options)
{
options = axis2_options_create(env);
axis2_svc_client_set_options(svc_client, env, options);
}
axis2_options_set_action(options, env, "http://ws.apache.org/ws/2007/05/eventing-extended/Publish");
axis2_svc_client_engage_module(svc_client, env, AXIS2_MODULE_ADDRESSING);
notifyto = savan_subscriber_get_notify_to(subscriber, env);
if(notifyto)
{
address = axis2_endpoint_ref_get_address(notifyto, env);
if(address)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", address);
to = axis2_endpoint_ref_create(env, address);
axis2_options_set_to(options, env, to);
}
}
axis2_options_set_xml_parser_reset(options, env, AXIS2_FALSE);
#ifdef SAVAN_FILTERING
/* If this is a filtering request and filter module is defined then filter the request.
*/
{
axis2_char_t *filter_dialect = NULL;
filter_dialiect = savan_subscriber_get_filter_dialect(subscriber, env);
if(!axutil_strcmp(filter_dialect, SYNAPSE_FILTER_DIALECT))
{
/* Do nothing */
}
else if(filtermod && savan_subscriber_get_filter(subscriber, env))
{
/* Apply the filter, and check whether it evaluates to success */
filter_apply = savan_filter_mod_apply(filtermod ,env, subscriber, payload);
if(!filter_apply)
{
status = axutil_error_get_status_code(env->error);
if(AXIS2_SUCCESS != status)
{
axiom_node_detach(payload, env);
return status;
}
}
}
else
{
AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_FILTER_MODULE_COULD_NOT_BE_RETRIEVED,
AXIS2_FAILURE);
return AXIS2_FAILURE;
}
}
#endif
if(filter_apply)
{
axis2_svc_client_fire_and_forget(svc_client, env, payload);
}
axiom_node_detach(payload, env); /*insert this to prevent payload corruption in subsequent
"publish" calls with some payload.*/
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_default_publisher_publish_to_subscriber");
return status;
}