Filtering abstraction is now completed

diff --git a/include/savan_filter_mod.h b/include/savan_filter_mod.h
index dc44980..28d5706 100644
--- a/include/savan_filter_mod.h
+++ b/include/savan_filter_mod.h
@@ -14,8 +14,8 @@
  * limitations under the License.
  */
  
-#ifndef SAVAN_FILTER_H
-#define SAVAN_FILTER_H
+#ifndef SAVAN_FILTER_MOD_H
+#define SAVAN_FILTER_MOD_H
 
 /**
   * @file savan_filter_mod.h
@@ -109,4 +109,4 @@
 }
 #endif
 
-#endif /*SAVAN_FILTER_H*/
+#endif /*SAVAN_FILTER_MOD_H*/
diff --git a/include/savan_publishing_client.h b/include/savan_publishing_client.h
index d7c70d1..2ee8f70 100644
--- a/include/savan_publishing_client.h
+++ b/include/savan_publishing_client.h
@@ -30,14 +30,16 @@
 
 #include <axis2_defines.h>
 #include <axutil_env.h>
-#include <axis2_conf_ctx.h>
+#include <axis2_svc_client.h>
 #include <axutil_hash.h>
+#include <savan_subscriber.h>
 
 #ifdef __cplusplus
 extern "C"
 {
 #endif
 
+    struct savan_filter_mod;
     typedef struct savan_publishing_client_t savan_publishing_client_t;
 
     /**
@@ -52,8 +54,27 @@
         savan_publishing_client_t *client,
         const axutil_env_t *env,
         axiom_node_t *payload,
-        axis2_char_t *topic_url);
+        axis2_char_t *filter);
     
+     /**
+     * Publishes the given msg to the client.
+     * @param client pointer to publishing client
+     * @param svc_client pointer to service client
+     * @param env pointer to environment struct
+     * @param subscriber pointer to subscriber
+     * @param filtermod pointer to filter module
+     * @param payload the content to be published
+     * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE 
+     */
+   AXIS2_EXTERN axis2_status_t AXIS2_CALL
+    savan_publishing_client_publish_to_subscriber(
+        savan_publishing_client_t *client,
+        const axutil_env_t *env,
+        axis2_svc_client_t *svc_client,
+        savan_subscriber_t *subscriber,
+        struct savan_filter_mod *filtermod,
+        axiom_node_t *payload);
+
     AXIS2_EXTERN savan_publishing_client_t * AXIS2_CALL
     savan_publishing_client_create(
         const axutil_env_t *env,
diff --git a/include/savan_subscriber.h b/include/savan_subscriber.h
index dadccb3..4b38f81 100644
--- a/include/savan_subscriber.h
+++ b/include/savan_subscriber.h
@@ -30,7 +30,6 @@
 
 #include <axis2_defines.h>
 #include <axutil_env.h>
-#include <axis2_conf_ctx.h>
 
 #ifdef __cplusplus
 extern "C"
@@ -201,20 +200,6 @@
         const axutil_env_t *env);
 
     /**
-     * Publishes the given msg to the client.
-     * @param subscriber pointer to subscriber
-     * @param env pointer to environment struct
-     * @param payload the content to be published
-     * @return AXIS2_SUCCESS on success, else AXIS2_FAILURE 
-     */
-    AXIS2_EXTERN axis2_status_t AXIS2_CALL
-    savan_subscriber_publish(
-        savan_subscriber_t *subscriber,
-        const axutil_env_t *env,
-        struct savan_filter_mod *filtermod,
-        axiom_node_t *payload);
-
-    /**
      * Set whether the subscription is renewed or not.
      * @param subscriber pointer to subscriber
      * @param env pointer to environment struct
@@ -258,27 +243,6 @@
         void *subscriber, 
         const axutil_env_t *env);
 
-    /*AXIS2_EXTERN axis2_status_t AXIS2_CALL
-        savan_subscriber_set_topic_name(
-        savan_subscriber_t *subscriber,
-        const axutil_env_t *env,
-        axis2_char_t *topic_name);
-
-    AXIS2_EXTERN axis2_char_t *AXIS2_CALL
-    savan_subscriber_get_topic_name(
-        savan_subscriber_t *subscriber,
-        const axutil_env_t *env);*/
-    
-    /*AXIS2_EXTERN axis2_status_t AXIS2_CALL
-    savan_subscriber_set_topic_url(
-        savan_subscriber_t *subscriber,
-        const axutil_env_t *env,
-        axis2_char_t *topic_url);
-
-    AXIS2_EXTERN axis2_char_t *AXIS2_CALL
-    savan_subscriber_get_topic_url(
-        savan_subscriber_t *subscriber,
-        const axutil_env_t *env);*/
 /** @} */
 #ifdef __cplusplus
 }
diff --git a/src/client/savan_publishing_client.c b/src/client/savan_publishing_client.c
index dd81c89..e7431dd 100644
--- a/src/client/savan_publishing_client.c
+++ b/src/client/savan_publishing_client.c
@@ -28,6 +28,7 @@
 #include <savan_publishing_client.h>
 #include <savan_subscriber.h>
 #include <savan_util.h>
+#include <savan_error.h>
 #include <savan_constants.h>
 #include <savan_storage_mgr.h>
 
@@ -82,6 +83,10 @@
     int i = 0, size = 0;
     savan_storage_mgr_t *storage_mgr = NULL;
     savan_filter_mod_t *filtermod = NULL;
+    axis2_char_t *path = NULL;
+    axis2_conf_ctx_t *client_conf_ctx = NULL;
+    axis2_svc_t *client_svc = NULL;
+    axis2_svc_client_t *svc_client = NULL;
 
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_publishing_client_publish");
    
@@ -100,18 +105,40 @@
         return AXIS2_SUCCESS; /* returning FAILURE will break handler chain */
     }
 
+    path = AXIS2_GETENV("AXIS2C_HOME");
+    svc_client = axis2_svc_client_create(env, path);
+
+    if(!svc_client)
+    {
+        AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, 
+            "[savan]svc_client creation failed, unable to continue");
+        return AXIS2_SUCCESS;
+    }
+
+    client_conf_ctx = axis2_svc_client_get_conf_ctx(svc_client, env);
+    client_svc = axis2_svc_client_get_svc(svc_client, env);
+
     size = axutil_array_list_size(subs_store, env);
     for(i = 0; i < size; i++)
     {
+        axis2_svc_client_t *svc_client = NULL;
         savan_subscriber_t *sub = NULL;
+
         sub = (savan_subscriber_t *)axutil_array_list_get(subs_store, env, i);
         if (sub)
         {
             axis2_char_t *id = savan_subscriber_get_id(sub, env);
             AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", id);
 
+            svc_client = axis2_svc_client_create_with_conf_ctx_and_svc(env, path, client_conf_ctx, 
+                    client_svc);
             filtermod = savan_util_get_filter_module(env, client->conf);
-            if(!savan_subscriber_publish(sub, env, filtermod, payload))
+            /* Ideally publishing to each subscriber should happen within a thread for each 
+             * subscriber. However until Axis2/C provide a good thread pool to handle
+             * such tasks I use this sequential publishing to subscribers.
+             */
+            if(!savan_publishing_client_publish_to_subscriber(client, env, svc_client, sub, 
+                        filtermod, payload))
             {
                 axis2_endpoint_ref_t *notifyto = savan_subscriber_get_notify_to(sub, env);
                 const axis2_char_t *address = NULL;
@@ -125,12 +152,100 @@
                         "Publishing to the Data Sink:%s proviced by subscriber:%s Failed. Check "\
                         "whether the Data Sink url is correct", address, id);
             }
+            if(svc_client)
+            {
+                axis2_svc_client_free(svc_client, env);
+            }
         }
     }
 
+    if(svc_client)
+    {
+        axis2_svc_client_free(svc_client, env);
+    }
+
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_publishing_client_publish");
     
     return AXIS2_SUCCESS;
 }
 
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+savan_publishing_client_publish_to_subscriber(
+    savan_publishing_client_t *client,
+    const axutil_env_t *env,
+    axis2_svc_client_t *svc_client,
+    savan_subscriber_t *subscriber,
+    savan_filter_mod_t *filtermod,
+    axiom_node_t *payload)
+{
+    axis2_options_t *options = NULL;
+    axis2_status_t status = AXIS2_SUCCESS;
+    axis2_endpoint_ref_t *to = NULL;
+    const axis2_char_t *address = NULL;
+    axiom_node_t *filtered_payload = NULL;
+    axis2_endpoint_ref_t *notifyto = NULL;
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_publishing_client_publish_to_subscriber");
+
+    options = (axis2_options_t *) axis2_svc_client_get_options(svc_client, env);
+    if(!options)
+    {
+        options = axis2_options_create(env);
+        axis2_svc_client_set_options(svc_client, env, options);
+    }
+
+    notifyto = savan_subscriber_get_notify_to(subscriber, env);
+    if(notifyto)
+    {
+        address = axis2_endpoint_ref_get_address(notifyto, env);
+        if(address)
+        {
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", address);
+            to = axis2_endpoint_ref_create(env, address);
+            axis2_options_set_to(options, env, to);
+        }
+    }
+    axis2_options_set_xml_parser_reset(options, env, AXIS2_FALSE);
+
+
+    /* If this is a filtering request and we cannot find any filter module to filter then error */
+    if(savan_subscriber_get_filter(subscriber, env) && !filtermod)
+    {
+        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_FILTER_MODULE_COULD_NOT_BE_RETRIEVED, AXIS2_FAILURE);
+        return AXIS2_FAILURE;
+    }
+
+    /* If this is a filtering request and filter module is defined then filter the request.
+     */
+    if(filtermod && savan_subscriber_get_filter(subscriber, env))
+    {
+	    /* Apply the filter, and check whether it evaluates to success */
+        filtered_payload = savan_filter_mod_apply(filtermod ,env, subscriber, payload);
+        if(!filtered_payload)
+        {
+            status = axutil_error_get_status_code(env->error);
+            if(AXIS2_SUCCESS != status)
+            {
+                return status;
+            }
+        }
+    }
+
+    if(filtered_payload)
+    {
+        axis2_svc_client_fire_and_forget(svc_client, env, filtered_payload);
+    }
+    else
+    {
+        axis2_svc_client_fire_and_forget(svc_client, env, payload);
+    }
+
+    axiom_node_detach(payload, env); /*insert this to prevent payload corruption in subsequent 
+                                       "publish" calls with some payload.*/
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_publishing_client_publish_to_subscriber");
+
+    return status;
+}
+
 
diff --git a/src/storage/sqlite/storage_mgr.c b/src/storage/sqlite/storage_mgr.c
index c1fe1fb..ce9823a 100644
--- a/src/storage/sqlite/storage_mgr.c
+++ b/src/storage/sqlite/storage_mgr.c
@@ -390,7 +390,6 @@
     axis2_char_t *delivery_mode = NULL;
     axis2_char_t *expires = NULL;
     axis2_char_t *filter = NULL;
-    axis2_char_t *topic_name = NULL;
     int renewed = 0;
     axis2_endpoint_ref_t *endto_epr = NULL;
     axis2_endpoint_ref_t *notifyto_epr = NULL;
@@ -579,7 +578,6 @@
     axis2_char_t *delivery_mode = NULL;
     axis2_char_t *expires = NULL;
     axis2_char_t *filter = NULL;
-    axis2_char_t *topic_name = NULL;
     int renewed = 0;
     axis2_endpoint_ref_t *endto_epr = NULL;
     axis2_endpoint_ref_t *notifyto_epr = NULL;
diff --git a/src/subscribers/savan_subscriber.c b/src/subscribers/savan_subscriber.c
index a5d37b8..d9361e5 100644
--- a/src/subscribers/savan_subscriber.c
+++ b/src/subscribers/savan_subscriber.c
@@ -16,9 +16,7 @@
  
 #include <axiom.h>
 #include <platforms/axutil_platform_auto_sense.h>
-
 #include <axis2_endpoint_ref.h>
-#include <axis2_svc_client.h>
 
 #include <savan_subscriber.h>
 #include <savan_util.h>
@@ -32,8 +30,6 @@
     axis2_char_t *delivery_mode;
     axis2_char_t *expires;
     axis2_char_t *filter;
-    /*axis2_char_t *topic_name;
-    axis2_char_t *topic_url;*/
     axis2_bool_t renewed;
 	axis2_char_t *filter_dialect;
 };
@@ -59,8 +55,6 @@
     subscriber->expires = NULL;
     subscriber->filter = NULL;
     subscriber->filter_dialect = NULL;
-    /*subscriber->topic_name = NULL;
-    subscriber->topic_url = NULL;*/
     subscriber->renewed = AXIS2_FALSE;
         
     return subscriber;
@@ -100,16 +94,6 @@
         AXIS2_FREE(env->allocator, subscriber->filter);
     }
 
-    /*if(subscriber->topic_name)
-    {
-        AXIS2_FREE(env->allocator, subscriber->topic_name);
-    }
-    
-    if(subscriber->topic_url)
-    {
-        AXIS2_FREE(env->allocator, subscriber->topic_url);
-    }*/
-
     if(subscriber->filter_dialect)
     {
         AXIS2_FREE(env->allocator, subscriber->filter_dialect);
@@ -289,92 +273,6 @@
 }
 
 AXIS2_EXTERN axis2_status_t AXIS2_CALL
-savan_subscriber_publish(
-    savan_subscriber_t *subscriber,
-    const axutil_env_t *env,
-    savan_filter_mod_t *filtermod,
-    axiom_node_t *payload)
-{
-    axis2_svc_client_t *svc_client = NULL;
-    axis2_char_t *path = NULL;
-    axis2_options_t *options = NULL;
-    axis2_status_t status = AXIS2_SUCCESS;
-    axis2_endpoint_ref_t *to = NULL;
-    const axis2_char_t *address = NULL;
-    axiom_node_t *filtered_payload = NULL;
-
-    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Start:savan_subscriber_publish");
-	
-    path = AXIS2_GETENV("AXIS2C_HOME");
-    svc_client = axis2_svc_client_create(env, path);
-
-    if(!svc_client)
-    {
-        AXIS2_LOG_ERROR (env->log, AXIS2_LOG_SI, 
-            "[savan]svc_client creation failed, unable to continue");
-        return AXIS2_SUCCESS;
-    }
-
-
-    /* Setup options */
-    options = axis2_options_create(env);
-    if(subscriber->notify_to)
-    {
-        address = axis2_endpoint_ref_get_address(subscriber->notify_to, env);
-        if(address)
-        {
-            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Publishing to:%s", address);
-            to = axis2_endpoint_ref_create(env, address);
-            axis2_options_set_to(options, env, to);
-        }
-    }
-    axis2_options_set_xml_parser_reset(options, env, AXIS2_FALSE);
-
-	/* Apply the filter, and check whether it evaluates to success */
-
-    if(savan_subscriber_get_filter(subscriber, env) && !filtermod)
-    {
-        AXIS2_HANDLE_ERROR(env, SAVAN_ERROR_FILTER_MODULE_COULD_NOT_BE_RETRIEVED, AXIS2_FAILURE);
-        return AXIS2_FAILURE;
-    }
-
-    if(filtermod)
-    {
-        filtered_payload = savan_filter_mod_apply(filtermod ,env, subscriber, payload);
-        if(!filtered_payload)
-        {
-            status = axutil_error_get_status_code(env->error);
-            if(AXIS2_SUCCESS != status)
-            {
-                return status;
-            }
-        }
-    }
-
-    /* Set service client options */
-    axis2_svc_client_set_options(svc_client, env, options);
-    if(filtered_payload)
-    {
-        axis2_svc_client_fire_and_forget(svc_client, env, filtered_payload);
-    }
-    else
-    {
-        axis2_svc_client_fire_and_forget(svc_client, env, payload);
-    }
-
-    axiom_node_detach(payload, env); /*insert this to prevent payload corruption in subsequent 
-                                       "publish" calls with some payload.*/
-    if(svc_client)
-    {
-        axis2_svc_client_free(svc_client, env);
-    }
-
-    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] End:savan_subscriber_publish");
-
-    return status;
-}
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
 savan_subscriber_set_renew_status(
     savan_subscriber_t *subscriber,
     const axutil_env_t *env,
@@ -393,45 +291,3 @@
     return subscriber->renewed;
 }
 
-/*AXIS2_EXTERN axis2_status_t AXIS2_CALL
-savan_subscriber_set_topic_name(
-    savan_subscriber_t *subscriber,
-    const axutil_env_t *env,
-    axis2_char_t *topic_name)
-{
-    subscriber->topic_name = axutil_strdup(env, topic_name);
-
-    return AXIS2_SUCCESS;
-}
-
-AXIS2_EXTERN axis2_char_t *AXIS2_CALL
-savan_subscriber_get_topic_name(
-    savan_subscriber_t *subscriber,
-    const axutil_env_t *env)
-{
-    return subscriber->topic_name;
-}
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-savan_subscriber_set_topic_url(
-    savan_subscriber_t *subscriber,
-    const axutil_env_t *env,
-    axis2_char_t *topic_url)
-{
-    subscriber->topic_url = axutil_strdup(env, topic_url);
-    if(!subscriber->topic_name)
-    {
-        subscriber->topic_name = savan_util_get_topic_name_from_topic_url(env, topic_url);
-    }
-
-    return AXIS2_SUCCESS;
-}
-
-AXIS2_EXTERN axis2_char_t *AXIS2_CALL
-savan_subscriber_get_topic_url(
-    savan_subscriber_t *subscriber,
-    const axutil_env_t *env)
-{
-    return subscriber->topic_url;
-}
-*/