Filtering abstraction is now completed
diff --git a/include/savan_filter_mod.h b/include/savan_filter_mod.h
index dc44980..28d5706 100644
--- a/include/savan_filter_mod.h
+++ b/include/savan_filter_mod.h
@@ -14,8 +14,8 @@
* limitations under the License.
*/
-#ifndef SAVAN_FILTER_H
-#define SAVAN_FILTER_H
+#ifndef SAVAN_FILTER_MOD_H
+#define SAVAN_FILTER_MOD_H
/**
* @file savan_filter_mod.h
@@ -109,4 +109,4 @@
}
#endif
-#endif /*SAVAN_FILTER_H*/
+#endif /*SAVAN_FILTER_MOD_H*/
diff --git a/include/savan_publishing_client.h b/include/savan_publishing_client.h
index d7c70d1..2ee8f70 100644
--- a/include/savan_publishing_client.h
+++ b/include/savan_publishing_client.h
@@ -30,14 +30,16 @@
#include <axis2_defines.h>
#include <axutil_env.h>
-#include <axis2_conf_ctx.h>
+#include <axis2_svc_client.h>
#include <axutil_hash.h>
+#include <savan_subscriber.h>
#ifdef __cplusplus
extern "C"
{
#endif
+ struct savan_filter_mod;
typedef struct savan_publishing_client_t savan_publishing_client_t;
/**
@@ -52,8 +54,27 @@
savan_publishing_client_t *client,
const axutil_env_t *env,
axiom_node_t *payload,
- axis2_char_t *topic_url);
+ axis2_char_t *filter);
+ /**
+ * Publishes the given msg to the client.
+ * @param client pointer to publishing client
+ * @param svc_client pointer to service client
+ * @param env pointer to environment struct
+ * @param subscriber pointer to subscriber
+ * @param filtermod pointer to filter module
+ * @param payload the content to be published
+ * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE
+ */
+ AXIS2_EXTERN axis2_status_t AXIS2_CALL
+ savan_publishing_client_publish_to_subscriber(
+ savan_publishing_client_t *client,
+ const axutil_env_t *env,
+ axis2_svc_client_t *svc_client,
+ savan_subscriber_t *subscriber,
+ struct savan_filter_mod *filtermod,
+ axiom_node_t *payload);
+
AXIS2_EXTERN savan_publishing_client_t * AXIS2_CALL
savan_publishing_client_create(
const axutil_env_t *env,
diff --git a/include/savan_subscriber.h b/include/savan_subscriber.h
index dadccb3..4b38f81 100644
--- a/include/savan_subscriber.h
+++ b/include/savan_subscriber.h
@@ -30,7 +30,6 @@
#include <axis2_defines.h>
#include <axutil_env.h>
-#include <axis2_conf_ctx.h>
#ifdef __cplusplus
extern "C"
@@ -201,20 +200,6 @@
const axutil_env_t *env);
/**
- * Publishes the given msg to the client.
- * @param subscriber pointer to subscriber
- * @param env pointer to environment struct
- * @param payload the content to be published
- * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE
- */
- AXIS2_EXTERN axis2_status_t AXIS2_CALL
- savan_subscriber_publish(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env,
- struct savan_filter_mod *filtermod,
- axiom_node_t *payload);
-
- /**
* Set whether the subscription is renewed or not.
* @param subscriber pointer to subscriber
* @param env pointer to environment struct
@@ -258,27 +243,6 @@
void *subscriber,
const axutil_env_t *env);
- /*AXIS2_EXTERN axis2_status_t AXIS2_CALL
- savan_subscriber_set_topic_name(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env,
- axis2_char_t *topic_name);
-
- AXIS2_EXTERN axis2_char_t *AXIS2_CALL
- savan_subscriber_get_topic_name(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env);*/
-
- /*AXIS2_EXTERN axis2_status_t AXIS2_CALL
- savan_subscriber_set_topic_url(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env,
- axis2_char_t *topic_url);
-
- AXIS2_EXTERN axis2_char_t *AXIS2_CALL
- savan_subscriber_get_topic_url(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env);*/
/** @} */
#ifdef __cplusplus
}
diff --git a/src/client/savan_publishing_client.c b/src/client/savan_publishing_client.c
index dd81c89..e7431dd 100644
--- a/src/client/savan_publishing_client.c
+++ b/src/client/savan_publishing_client.c
@@ -28,6 +28,7 @@
#include <savan_publishing_client.h>
#include <savan_subscriber.h>
#include <savan_util.h>
+#include <savan_error.h>
#include <savan_constants.h>
#include <savan_storage_mgr.h>
@@ -82,6 +83,10 @@
int i = 0, size = 0;
savan_storage_mgr_t *storage_mgr = NULL;
savan_filter_mod_t *filtermod = NULL;
+ axis2_char_t *path = NULL;
+ axis2_conf_ctx_t *client_conf_ctx = NULL;
+ axis2_svc_t *client_svc = NULL;
+ axis2_svc_client_t *svc_client = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_publishing_client_publish");
@@ -100,18 +105,40 @@
return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
}
+ path = AXIS2_GETENV("AXIS2C_HOME");
+ svc_client = axis2_svc_client_create(env, path);
+
+ if(!svc_client)
+ {
+ AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI,
+ "[savan]svc_client creation failed, unable to continue");
+ return AXIS2_SUCCESS;
+ }
+
+ client_conf_ctx = axis2_svc_client_get_conf_ctx(svc_client, env);
+ client_svc = axis2_svc_client_get_svc(svc_client, env);
+
size = axutil_array_list_size(subs_store, env);
for(i = 0; i < size; i++)
{
+ axis2_svc_client_t *svc_client = NULL;
savan_subscriber_t *sub = NULL;
+
sub = (savan_subscriber_t *)axutil_array_list_get(subs_store, env, i);
if (sub)
{
axis2_char_t *id = savan_subscriber_get_id(sub, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", id);
+ svc_client = axis2_svc_client_create_with_conf_ctx_and_svc(env, path, client_conf_ctx,
+ client_svc);
filtermod = savan_util_get_filter_module(env, client->conf);
- if(!savan_subscriber_publish(sub, env, filtermod, payload))
+ /* Ideally publishing to each subscriber should happen within a thread for each
+ * subscriber. However until Axis2/C provide a good thread pool to handle
+ * such tasks I use this sequential publishing to subscribers.
+ */
+ if(!savan_publishing_client_publish_to_subscriber(client, env, svc_client, sub,
+ filtermod, payload))
{
axis2_endpoint_ref_t *notifyto = savan_subscriber_get_notify_to(sub, env);
const axis2_char_t *address = NULL;
@@ -125,12 +152,100 @@
"Publishing to the Data Sink:%s proviced by subscriber:%s Failed. Check "\
"whether the Data Sink url is correct", address, id);
}
+ if(svc_client)
+ {
+ axis2_svc_client_free(svc_client, env);
+ }
}
}
+ if(svc_client)
+ {
+ axis2_svc_client_free(svc_client, env);
+ }
+
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_publishing_client_publish");
return AXIS2_SUCCESS;
}
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+savan_publishing_client_publish_to_subscriber(
+ savan_publishing_client_t *client,
+ const axutil_env_t *env,
+ axis2_svc_client_t *svc_client,
+ savan_subscriber_t *subscriber,
+ savan_filter_mod_t *filtermod,
+ axiom_node_t *payload)
+{
+ axis2_options_t *options = NULL;
+ axis2_status_t status = AXIS2_SUCCESS;
+ axis2_endpoint_ref_t *to = NULL;
+ const axis2_char_t *address = NULL;
+ axiom_node_t *filtered_payload = NULL;
+ axis2_endpoint_ref_t *notifyto = NULL;
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_publishing_client_publish_to_subscriber");
+
+ options = (axis2_options_t *) axis2_svc_client_get_options(svc_client, env);
+ if(!options)
+ {
+ options = axis2_options_create(env);
+ axis2_svc_client_set_options(svc_client, env, options);
+ }
+
+ notifyto = savan_subscriber_get_notify_to(subscriber, env);
+ if(notifyto)
+ {
+ address = axis2_endpoint_ref_get_address(notifyto, env);
+ if(address)
+ {
+ AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", address);
+ to = axis2_endpoint_ref_create(env, address);
+ axis2_options_set_to(options, env, to);
+ }
+ }
+ axis2_options_set_xml_parser_reset(options, env, AXIS2_FALSE);
+
+
+ /* If this is a filtering request and we cannot find any filter module to filter then error */
+ if(savan_subscriber_get_filter(subscriber, env) && !filtermod)
+ {
+ AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_FILTER_MODULE_COULD_NOT_BE_RETRIEVED, AXIS2_FAILURE);
+ return AXIS2_FAILURE;
+ }
+
+ /* If this is a filtering request and filter module is defined then filter the request.
+ */
+ if(filtermod && savan_subscriber_get_filter(subscriber, env))
+ {
+ /* Apply the filter, and check whether it evaluates to success */
+ filtered_payload = savan_filter_mod_apply(filtermod ,env, subscriber, payload);
+ if(!filtered_payload)
+ {
+ status = axutil_error_get_status_code(env->error);
+ if(AXIS2_SUCCESS != status)
+ {
+ return status;
+ }
+ }
+ }
+
+ if(filtered_payload)
+ {
+ axis2_svc_client_fire_and_forget(svc_client, env, filtered_payload);
+ }
+ else
+ {
+ axis2_svc_client_fire_and_forget(svc_client, env, payload);
+ }
+
+ axiom_node_detach(payload, env); /*insert this to prevent payload corruption in subsequent
+ "publish" calls with some payload.*/
+
+ AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_publishing_client_publish_to_subscriber");
+
+ return status;
+}
+
diff --git a/src/storage/sqlite/storage_mgr.c b/src/storage/sqlite/storage_mgr.c
index c1fe1fb..ce9823a 100644
--- a/src/storage/sqlite/storage_mgr.c
+++ b/src/storage/sqlite/storage_mgr.c
@@ -390,7 +390,6 @@
axis2_char_t *delivery_mode = NULL;
axis2_char_t *expires = NULL;
axis2_char_t *filter = NULL;
- axis2_char_t *topic_name = NULL;
int renewed = 0;
axis2_endpoint_ref_t *endto_epr = NULL;
axis2_endpoint_ref_t *notifyto_epr = NULL;
@@ -579,7 +578,6 @@
axis2_char_t *delivery_mode = NULL;
axis2_char_t *expires = NULL;
axis2_char_t *filter = NULL;
- axis2_char_t *topic_name = NULL;
int renewed = 0;
axis2_endpoint_ref_t *endto_epr = NULL;
axis2_endpoint_ref_t *notifyto_epr = NULL;
diff --git a/src/subscribers/savan_subscriber.c b/src/subscribers/savan_subscriber.c
index a5d37b8..d9361e5 100644
--- a/src/subscribers/savan_subscriber.c
+++ b/src/subscribers/savan_subscriber.c
@@ -16,9 +16,7 @@
#include <axiom.h>
#include <platforms/axutil_platform_auto_sense.h>
-
#include <axis2_endpoint_ref.h>
-#include <axis2_svc_client.h>
#include <savan_subscriber.h>
#include <savan_util.h>
@@ -32,8 +30,6 @@
axis2_char_t *delivery_mode;
axis2_char_t *expires;
axis2_char_t *filter;
- /*axis2_char_t *topic_name;
- axis2_char_t *topic_url;*/
axis2_bool_t renewed;
axis2_char_t *filter_dialect;
};
@@ -59,8 +55,6 @@
subscriber->expires = NULL;
subscriber->filter = NULL;
subscriber->filter_dialect = NULL;
- /*subscriber->topic_name = NULL;
- subscriber->topic_url = NULL;*/
subscriber->renewed = AXIS2_FALSE;
return subscriber;
@@ -100,16 +94,6 @@
AXIS2_FREE(env->allocator, subscriber->filter);
}
- /*if(subscriber->topic_name)
- {
- AXIS2_FREE(env->allocator, subscriber->topic_name);
- }
-
- if(subscriber->topic_url)
- {
- AXIS2_FREE(env->allocator, subscriber->topic_url);
- }*/
-
if(subscriber->filter_dialect)
{
AXIS2_FREE(env->allocator, subscriber->filter_dialect);
@@ -289,92 +273,6 @@
}
AXIS2_EXTERN axis2_status_t AXIS2_CALL
-savan_subscriber_publish(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env,
- savan_filter_mod_t *filtermod,
- axiom_node_t *payload)
-{
- axis2_svc_client_t *svc_client = NULL;
- axis2_char_t *path = NULL;
- axis2_options_t *options = NULL;
- axis2_status_t status = AXIS2_SUCCESS;
- axis2_endpoint_ref_t *to = NULL;
- const axis2_char_t *address = NULL;
- axiom_node_t *filtered_payload = NULL;
-
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Start:savan_subscriber_publish");
-
- path = AXIS2_GETENV("AXIS2C_HOME");
- svc_client = axis2_svc_client_create(env, path);
-
- if(!svc_client)
- {
- AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI,
- "[savan]svc_client creation failed, unable to continue");
- return AXIS2_SUCCESS;
- }
-
-
- /* Setup options */
- options = axis2_options_create(env);
- if(subscriber->notify_to)
- {
- address = axis2_endpoint_ref_get_address(subscriber->notify_to, env);
- if(address)
- {
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", address);
- to = axis2_endpoint_ref_create(env, address);
- axis2_options_set_to(options, env, to);
- }
- }
- axis2_options_set_xml_parser_reset(options, env, AXIS2_FALSE);
-
- /* Apply the filter, and check whether it evaluates to success */
-
- if(savan_subscriber_get_filter(subscriber, env) && !filtermod)
- {
- AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_FILTER_MODULE_COULD_NOT_BE_RETRIEVED, AXIS2_FAILURE);
- return AXIS2_FAILURE;
- }
-
- if(filtermod)
- {
- filtered_payload = savan_filter_mod_apply(filtermod ,env, subscriber, payload);
- if(!filtered_payload)
- {
- status = axutil_error_get_status_code(env->error);
- if(AXIS2_SUCCESS != status)
- {
- return status;
- }
- }
- }
-
- /* Set service client options */
- axis2_svc_client_set_options(svc_client, env, options);
- if(filtered_payload)
- {
- axis2_svc_client_fire_and_forget(svc_client, env, filtered_payload);
- }
- else
- {
- axis2_svc_client_fire_and_forget(svc_client, env, payload);
- }
-
- axiom_node_detach(payload, env); /*insert this to prevent payload corruption in subsequent
- "publish" calls with some payload.*/
- if(svc_client)
- {
- axis2_svc_client_free(svc_client, env);
- }
-
- AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] End:savan_subscriber_publish");
-
- return status;
-}
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
savan_subscriber_set_renew_status(
savan_subscriber_t *subscriber,
const axutil_env_t *env,
@@ -393,45 +291,3 @@
return subscriber->renewed;
}
-/*AXIS2_EXTERN axis2_status_t AXIS2_CALL
-savan_subscriber_set_topic_name(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env,
- axis2_char_t *topic_name)
-{
- subscriber->topic_name = axutil_strdup(env, topic_name);
-
- return AXIS2_SUCCESS;
-}
-
-AXIS2_EXTERN axis2_char_t *AXIS2_CALL
-savan_subscriber_get_topic_name(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env)
-{
- return subscriber->topic_name;
-}
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-savan_subscriber_set_topic_url(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env,
- axis2_char_t *topic_url)
-{
- subscriber->topic_url = axutil_strdup(env, topic_url);
- if(!subscriber->topic_name)
- {
- subscriber->topic_name = savan_util_get_topic_name_from_topic_url(env, topic_url);
- }
-
- return AXIS2_SUCCESS;
-}
-
-AXIS2_EXTERN axis2_char_t *AXIS2_CALL
-savan_subscriber_get_topic_url(
- savan_subscriber_t *subscriber,
- const axutil_env_t *env)
-{
- return subscriber->topic_url;
-}
-*/