blob: 4b478e8183513e6e6c45e6e2ca2d88dfdc7243e7 [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <axis2_engine.h>
#include <axis2_handler_desc.h>
#include <axutil_array_list.h>
#include <axis2_msg_ctx.h>
#include <axutil_property.h>
#include <axis2_conf_ctx.h>
#include <axiom_soap_body.h>
#include <sandesha2_storage_mgr.h>
#include <sandesha2_permanent_storage_mgr.h>
#include <sandesha2_permanent_seq_property_mgr.h>
#include <sandesha2_permanent_sender_mgr.h>
#include <sandesha2_seq_property_mgr.h>
#include <sandesha2_sender_mgr.h>
#include <sandesha2_msg_ctx.h>
#include <sandesha2_msg_processor.h>
#include <sandesha2_ack_msg_processor.h>
#include <sandesha2_ack_req_msg_processor.h>
#include <sandesha2_msg_init.h>
#include <sandesha2_msg_creator.h>
#include <sandesha2_seq.h>
#include <sandesha2_constants.h>
#include <sandesha2_utils.h>
#include <sandesha2_seq_ack.h>
#include <sandesha2_ack_requested.h>
#include <sandesha2_app_msg_processor.h>
#include <axutil_types.h>
/*
 * Invoke operation installed on the handler by sandesha2_in_handler_create().
 * Runs the Sandesha2 in-flow processing for one incoming message context.
 */
static axis2_status_t AXIS2_CALL
sandesha2_in_handler_invoke(
struct axis2_handler *handler,
const axutil_env_t *env,
struct axis2_msg_ctx *msg_ctx);
/*
 * Decides whether the message wrapped by rm_msg_ctx has to be dropped.
 * Returns AXIS2_TRUE when the message was dropped (the message context is
 * paused in that case) and AXIS2_FALSE otherwise.
 */
static axis2_bool_t AXIS2_CALL
sandesha2_in_handler_drop_if_duplicate(
struct axis2_handler *handler,
const axutil_env_t *env,
sandesha2_msg_ctx_t *rm_msg_ctx,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
sandesha2_sender_mgr_t *sender_mgr);
/*
 * Responds to a message that sandesha2_in_handler_drop_if_duplicate() dropped:
 * either re-sends the stored response with an acknowledgment added to it, or
 * sends an acknowledgment on its own.
 */
static axis2_status_t AXIS2_CALL
sandesha2_in_handler_process_dropped_msg(
struct axis2_handler *handler,
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
sandesha2_msg_ctx_t *rm_msg_ctx,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
sandesha2_sender_mgr_t *sender_mgr);
/**
 * Create the Sandesha2 in-flow handler.
 *
 * Allocates a generic Axis2 handler and installs sandesha2_in_handler_invoke()
 * as its invoke operation.
 *
 * @param env   environment passed on to the Axis2 handler API
 * @param qname not used by this function
 * @return the new handler, or NULL when axis2_handler_create() fails
 */
AXIS2_EXTERN axis2_handler_t* AXIS2_CALL
sandesha2_in_handler_create(
    const axutil_env_t *env,
    axutil_qname_t *qname)
{
    axis2_handler_t *in_handler = axis2_handler_create(env);

    if(in_handler)
    {
        /* Handler init is handled by conf loading, so only the base struct's
         * invoke op has to be set here. */
        axis2_handler_set_invoke(in_handler, env, sandesha2_in_handler_invoke);
    }

    return in_handler;
}
/**
 * Invoke operation of the Sandesha2 in-flow handler.
 *
 * Processing steps:
 *  - return immediately when the message context carries the
 *    SANDESHA2_APPLICATION_PROCESSING_DONE or SANDESHA2_REINJECTED_MESSAGE
 *    property with the value AXIS2_VALUE_TRUE;
 *  - unless SANDESHA2_ISOLATED_LAST_MSG is set to AXIS2_VALUE_TRUE, run the
 *    duplicate check and, when the message is dropped, respond to it and stop;
 *  - otherwise process a sequence acknowledgment header (if present) and then
 *    hand the message to the processor selected for its message type.
 *
 * @param handler handler being invoked; only forwarded to the helper functions
 * @param env     environment used for logging, error reporting and allocation
 * @param msg_ctx incoming message context, must not be NULL
 * @return AXIS2_FAILURE when msg_ctx or its configuration context is NULL,
 *         when the permanent storage database cannot be created, or when the
 *         RM message context cannot be initialized; AXIS2_SUCCESS otherwise
 *         (results of the individual message processors are not propagated)
 */
static axis2_status_t AXIS2_CALL
sandesha2_in_handler_invoke(
    struct axis2_handler *handler,
    const axutil_env_t *env,
    struct axis2_msg_ctx *msg_ctx)
{
    axutil_property_t *temp_prop = NULL;
    axis2_conf_ctx_t *conf_ctx = NULL;
    axis2_ctx_t *ctx = NULL;
    axis2_char_t *str_done = NULL;
    axis2_char_t *reinjected_msg = NULL;
    sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
    sandesha2_msg_processor_t *msg_processor = NULL;
    sandesha2_seq_ack_t *seq_ack = NULL;
    sandesha2_ack_requested_t *ack_requested = NULL;
    sandesha2_storage_mgr_t *storage_mgr = NULL;
    sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
    sandesha2_sender_mgr_t *sender_mgr = NULL;
    axis2_char_t *dbname = NULL;
    axis2_bool_t isolated_last_msg = AXIS2_FALSE;
    axis2_bool_t dropped = AXIS2_FALSE;
    axis2_char_t *value = NULL;
    axutil_property_t *property = NULL;

    AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Start:sandesha2_in_handler_invoke");

    conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
    if(!conf_ctx)
    {
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2]Configuration Context is NULL");
        AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_CONF_CTX_NULL, AXIS2_FAILURE);
        return AXIS2_FAILURE;
    }

    if(!sandesha2_permanent_storage_mgr_create_db(env, conf_ctx))
    {
        return AXIS2_FAILURE;
    }

    ctx = axis2_msg_ctx_get_base(msg_ctx, env);

    /* Nothing to do when application processing was already marked as done. */
    temp_prop = axis2_ctx_get_property(ctx, env, SANDESHA2_APPLICATION_PROCESSING_DONE);
    if(temp_prop)
    {
        str_done = (axis2_char_t *) axutil_property_get_value(temp_prop, env);
    }
    if(str_done && !axutil_strcmp(AXIS2_VALUE_TRUE, str_done))
    {
        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Application processing done");
        return AXIS2_SUCCESS;
    }

    /* Reinjected messages are not processed by sandesha2 inflow handlers. */
    temp_prop = axis2_ctx_get_property(ctx, env, SANDESHA2_REINJECTED_MESSAGE);
    if(temp_prop)
    {
        reinjected_msg = (axis2_char_t *) axutil_property_get_value(temp_prop, env);
    }
    if(reinjected_msg && !axutil_strcmp(AXIS2_VALUE_TRUE, reinjected_msg))
    {
        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Reinjected_msg. So return here");
        return AXIS2_SUCCESS;
    }

    /* Everything below reads from the RM message context, so a failed
     * initialization must stop processing here instead of being passed on. */
    rm_msg_ctx = sandesha2_msg_init_init_msg(env, msg_ctx);
    if(!rm_msg_ctx)
    {
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
            "[sandesha2] Could not initialize the RM message context");
        return AXIS2_FAILURE;
    }

    dbname = sandesha2_util_get_dbname(env, conf_ctx);
    storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
    seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
    sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);

    /* The duplicate check is skipped for a message flagged as an isolated
     * last message. */
    property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_ISOLATED_LAST_MSG);
    if(property)
    {
        value = axutil_property_get_value(property, env);
    }
    if(value && !axutil_strcmp(AXIS2_VALUE_TRUE, value))
    {
        isolated_last_msg = AXIS2_TRUE;
    }

    if(!isolated_last_msg)
    {
        dropped = sandesha2_in_handler_drop_if_duplicate(handler, env, rm_msg_ctx, storage_mgr,
            seq_prop_mgr, sender_mgr);
    }

    if(dropped)
    {
        sandesha2_in_handler_process_dropped_msg(handler, env, conf_ctx, rm_msg_ctx, storage_mgr,
            seq_prop_mgr, sender_mgr);
        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx dropped. So return here");
        goto cleanup;
    }

    /*
     * TODO Validate the message
     * sandesha2_msg_validator_validate(env, rm_msg_ctx);
     */

    /* Clear mustUnderstand on an AckRequested header and re-add the SOAP
     * envelope to the RM message context. */
    ack_requested = sandesha2_msg_ctx_get_ack_requested(rm_msg_ctx, env);
    if(ack_requested)
    {
        sandesha2_ack_requested_set_must_understand(ack_requested, env, AXIS2_FALSE);
        sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
    }

    /* A sequence acknowledgment header is handled before the message itself. */
    seq_ack = sandesha2_msg_ctx_get_seq_ack(rm_msg_ctx, env);
    if(seq_ack)
    {
        sandesha2_msg_processor_t *ack_proc = NULL;

        ack_proc = sandesha2_ack_msg_processor_create(env);
        sandesha2_msg_processor_process_in_msg(ack_proc, env, rm_msg_ctx);
        sandesha2_msg_processor_free(ack_proc, env);
    }

    /* Dispatch to the processor chosen for this message, if there is one. */
    msg_processor = sandesha2_msg_processor_create_msg_processor(env, rm_msg_ctx);
    if(msg_processor)
    {
        sandesha2_msg_processor_process_in_msg(msg_processor, env, rm_msg_ctx);
        sandesha2_msg_processor_free(msg_processor, env);
    }

cleanup:
    /* Single release point shared by the dropped and the normal path. */
    sandesha2_msg_ctx_free(rm_msg_ctx, env);
    if(seq_prop_mgr)
    {
        sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
    }
    if(sender_mgr)
    {
        sandesha2_sender_mgr_free(sender_mgr, env);
    }
    if(storage_mgr)
    {
        sandesha2_storage_mgr_free(storage_mgr, env);
    }

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Exit:sandesha2_in_handler_invoke");
    return AXIS2_SUCCESS;
}
/**
 * Decide whether an incoming message has to be dropped.
 *
 * Application messages: the message number is compared with the list stored
 * under SANDESHA2_SEQ_PROP_SERVER_COMPLETED_MESSAGES for the sequence; a match
 * marks the message as a duplicate. A message that is not a duplicate and has
 * an empty SOAP body gets the WS-Addressing action
 * SANDESHA2_SPEC_2005_02_SOAP_ACTION_LAST_MESSAGE and its number is appended
 * to the stored list.
 *
 * Messages of unknown type: the message is dropped when it carries a
 * RelatesTo header but no operation context is found, neither through the
 * configuration context (by the RelatesTo value) nor on the message context.
 *
 * When the message is dropped the RM message context is paused.
 *
 * @param handler      not used
 * @param env          environment used for logging and allocation
 * @param rm_msg_ctx   RM message context to inspect, must not be NULL
 * @param storage_mgr  storage manager, must not be NULL
 * @param seq_prop_mgr sequence property manager; when NULL the stored list of
 *                     completed messages is neither read nor written
 * @param sender_mgr   not used
 * @return AXIS2_TRUE when the message was dropped, AXIS2_FALSE otherwise
 *         (including when rm_msg_ctx or storage_mgr is NULL)
 */
static axis2_bool_t AXIS2_CALL
sandesha2_in_handler_drop_if_duplicate(
    struct axis2_handler *handler,
    const axutil_env_t *env,
    sandesha2_msg_ctx_t *rm_msg_ctx,
    sandesha2_storage_mgr_t *storage_mgr,
    sandesha2_seq_property_mgr_t *seq_prop_mgr,
    sandesha2_sender_mgr_t *sender_mgr)
{
    axis2_bool_t drop = AXIS2_FALSE;

    AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FALSE);
    AXIS2_PARAM_CHECK(env->error, storage_mgr, AXIS2_FALSE);

    if(SANDESHA2_MSG_TYPE_APPLICATION == sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env))
    {
        sandesha2_seq_t *sequence = NULL;
        long msg_no = -1;
        axis2_char_t *rmd_sequence_id = NULL;

        sequence = sandesha2_msg_ctx_get_sequence(rm_msg_ctx, env);
        if(sequence)
        {
            rmd_sequence_id = sandesha2_identifier_get_identifier(
                sandesha2_seq_get_identifier(sequence, env), env);
            msg_no = sandesha2_msg_number_get_msg_num(
                sandesha2_seq_get_msg_num(sequence, env), env);
        }

        if(rmd_sequence_id && 0 < msg_no)
        {
            sandesha2_seq_property_bean_t *rcvd_msgs_bean = NULL;

            if(seq_prop_mgr)
            {
                rcvd_msgs_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
                    rmd_sequence_id, SANDESHA2_SEQ_PROP_SERVER_COMPLETED_MESSAGES);
            }

            if(rcvd_msgs_bean)
            {
                axis2_char_t *rcvd_msgs_str = NULL;
                axutil_array_list_t *msg_no_list = NULL;
                int i = 0, size = 0;

                rcvd_msgs_str = sandesha2_seq_property_bean_get_value(rcvd_msgs_bean, env);
                AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "rcvd_msgs_str:%s", rcvd_msgs_str);
                msg_no_list = sandesha2_utils_get_array_list_from_string(env, rcvd_msgs_str);
                if(msg_no_list)
                {
                    size = axutil_array_list_size(msg_no_list, env);
                    /* The loop deliberately runs to the end even after a match:
                     * every entry of the list is released here. */
                    for(i = 0; i < size; i++)
                    {
                        axis2_char_t *temp = NULL;

                        temp = axutil_array_list_get(msg_no_list, env, i);
                        if(temp)
                        {
                            if(axutil_atol(temp) == msg_no)
                            {
                                drop = AXIS2_TRUE;
                            }
                            AXIS2_FREE(env->allocator, temp);
                        }
                    }
                    axutil_array_list_free(msg_no_list, env);
                }
            }

            if(!drop)
            {
                axiom_soap_envelope_t *soap_envelope = NULL;
                axiom_soap_body_t *soap_body = NULL;
                axiom_node_t *body_node = NULL;
                axiom_element_t *body_element = NULL;
                axiom_children_iterator_t *children_iterator = NULL;
                axis2_bool_t empty_body = AXIS2_FALSE;

                /* Walk envelope -> body -> node -> element step by step; a
                 * missing link means the body cannot be inspected and the
                 * message is then not treated as an empty-body message. */
                soap_envelope = sandesha2_msg_ctx_get_soap_envelope(rm_msg_ctx, env);
                if(soap_envelope)
                {
                    soap_body = axiom_soap_envelope_get_body(soap_envelope, env);
                }
                if(soap_body)
                {
                    body_node = axiom_soap_body_get_base_node(soap_body, env);
                }
                if(body_node)
                {
                    body_element = axiom_node_get_data_element(body_node, env);
                }
                if(body_element)
                {
                    children_iterator = axiom_element_get_children(body_element, env, body_node);
                }
                if(children_iterator && !axiom_children_iterator_has_next(children_iterator, env))
                {
                    empty_body = AXIS2_TRUE;
                }

                if(empty_body)
                {
                    AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
                        "[sandesha2] Empty body last msg recieved");
                    sandesha2_msg_ctx_set_wsa_action(rm_msg_ctx, env,
                        SANDESHA2_SPEC_2005_02_SOAP_ACTION_LAST_MESSAGE);

                    if(!rcvd_msgs_bean)
                    {
                        rcvd_msgs_bean = sandesha2_seq_property_bean_create_with_data(env,
                            rmd_sequence_id, SANDESHA2_SEQ_PROP_SERVER_COMPLETED_MESSAGES, "");
                        if(rcvd_msgs_bean && seq_prop_mgr)
                        {
                            sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, rcvd_msgs_bean);
                        }
                    }

                    if(rcvd_msgs_bean)
                    {
                        axis2_char_t *rcvd_msgs_str1 = NULL;
                        axis2_char_t *bean_value = NULL;
                        /* 32 bytes hold any decimal long plus the terminator. */
                        axis2_char_t msg_no_str[32];

                        /* Append this message number to the comma separated
                         * list kept in the bean. */
                        rcvd_msgs_str1 = sandesha2_seq_property_bean_get_value(rcvd_msgs_bean, env);
                        sprintf(msg_no_str, "%ld", msg_no);
                        if(rcvd_msgs_str1 && 0 < axutil_strlen(rcvd_msgs_str1))
                        {
                            bean_value = axutil_strcat(env, rcvd_msgs_str1, ",", msg_no_str, NULL);
                        }
                        else
                        {
                            bean_value = axutil_strdup(env, msg_no_str);
                        }

                        /* NOTE(review): bean_value is newly allocated and not
                         * released here. Whether set_value() copies it or takes
                         * ownership is not visible in this file - confirm, and
                         * free it afterwards if a copy is made. */
                        sandesha2_seq_property_bean_set_value(rcvd_msgs_bean, env, bean_value);
                        if(seq_prop_mgr)
                        {
                            sandesha2_seq_property_mgr_update(seq_prop_mgr, env, rcvd_msgs_bean);
                        }
                    }
                }
            }

            if(rcvd_msgs_bean)
            {
                sandesha2_seq_property_bean_free(rcvd_msgs_bean, env);
            }
        }
    }
    else if(SANDESHA2_MSG_TYPE_UNKNOWN == sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env))
    {
        axis2_relates_to_t *relates_to = NULL;
        axis2_conf_ctx_t *conf_ctx = NULL;

        relates_to = sandesha2_msg_ctx_get_relates_to(rm_msg_ctx, env);
        if(relates_to)
        {
            const axis2_char_t *relates_to_val = NULL;
            axis2_op_ctx_t *op_ctx = NULL;
            axis2_op_ctx_t *op_ctx1 = NULL;

            relates_to_val = axis2_relates_to_get_value(relates_to, env);
            conf_ctx = axis2_msg_ctx_get_conf_ctx(
                sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env), env);
            op_ctx = axis2_conf_ctx_get_op_ctx(conf_ctx, env, relates_to_val);
            op_ctx1 = axis2_msg_ctx_get_op_ctx(
                sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env), env);
            /* No operation context from either source: drop the message. */
            if(!op_ctx && !op_ctx1)
            {
                AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Dropping duplicate RM message");
                drop = AXIS2_TRUE;
            }
        }
    }

    if(drop)
    {
        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Pausing message context");
        sandesha2_msg_ctx_set_paused(rm_msg_ctx, env, AXIS2_TRUE);
        return AXIS2_TRUE;
    }

    return AXIS2_FALSE;
}
/**
 * Respond appropriately to a dropped message.
 *
 * In two-way messaging, if the response to the dropped application message has
 * not been acknowledged, the stored response message is taken from the
 * database, an acknowledgment for the dropped message is added to it, and it
 * is sent through the engine. Otherwise only the acknowledgment for the
 * dropped message is sent.
 *
 * Only application messages that carry a sequence identifier and have a
 * SANDESHA2_SEQ_PROP_SERVER_COMPLETED_MESSAGES entry are handled; for anything
 * else the function does nothing and reports success.
 *
 * @param handler      not used
 * @param env          environment used for logging and allocation
 * @param conf_ctx     configuration context used to load the stored message
 *                     and to create the engine
 * @param rm_msg_ctx   RM message context of the dropped message, must not be
 *                     NULL
 * @param storage_mgr  storage manager, must not be NULL
 * @param seq_prop_mgr sequence property manager
 * @param sender_mgr   sender manager
 * @return AXIS2_FAILURE when a required argument is NULL or the stored
 *         response could not be prepared; AXIS2_SUCCESS otherwise
 */
static axis2_status_t AXIS2_CALL
sandesha2_in_handler_process_dropped_msg(
    struct axis2_handler *handler,
    const axutil_env_t *env,
    axis2_conf_ctx_t *conf_ctx,
    sandesha2_msg_ctx_t *rm_msg_ctx,
    sandesha2_storage_mgr_t *storage_mgr,
    sandesha2_seq_property_mgr_t *seq_prop_mgr,
    sandesha2_sender_mgr_t *sender_mgr)
{
    sandesha2_seq_t *sequence = NULL;
    axis2_char_t *rmd_sequence_id = NULL;
    sandesha2_seq_property_bean_t *rcvd_msgs_bean = NULL;
    axis2_char_t *rcvd_msgs_str = NULL;
    sandesha2_sender_bean_t *find_sender_bean = NULL;
    sandesha2_sender_bean_t *sender_bean = NULL;
    axis2_char_t *internal_sequence_id = NULL;
    long msg_no = -1;
    axis2_status_t status = AXIS2_SUCCESS;

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
        "[sandesha2] Entry:sandesha2_in_handler_process_dropped_msg");
    AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
    AXIS2_PARAM_CHECK(env->error, storage_mgr, AXIS2_FAILURE);

    if(SANDESHA2_MSG_TYPE_APPLICATION != sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env))
    {
        goto done;
    }

    sequence = sandesha2_msg_ctx_get_sequence(rm_msg_ctx, env);
    if(sequence)
    {
        rmd_sequence_id = sandesha2_identifier_get_identifier(
            sandesha2_seq_get_identifier(sequence, env), env);
    }
    if(!rmd_sequence_id)
    {
        goto done;
    }

    /* Both managers are used unconditionally from here on. */
    if(!seq_prop_mgr || !sender_mgr)
    {
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
            "[sandesha2] Sequence property manager or sender manager is NULL");
        status = AXIS2_FAILURE;
        goto done;
    }

    rcvd_msgs_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, rmd_sequence_id,
        SANDESHA2_SEQ_PROP_SERVER_COMPLETED_MESSAGES);
    if(!rcvd_msgs_bean)
    {
        goto done;
    }

    /* rcvd_msgs_str is obtained from rcvd_msgs_bean, so the bean is only
     * released at the very end of the function. */
    rcvd_msgs_str = sandesha2_seq_property_bean_get_value(rcvd_msgs_bean, env);
    msg_no = sandesha2_msg_number_get_msg_num(sandesha2_seq_get_msg_num(sequence, env), env);

    /* NOTE(review): internal_sequence_id is not released here; whether
     * set_internal_seq_id() copies it or keeps the pointer is not visible in
     * this file - confirm before adding a free. */
    internal_sequence_id = sandesha2_utils_get_internal_sequence_id(env, rmd_sequence_id);

    /* Look up the stored response: same message number, the matching internal
     * sequence and the send flag set. */
    find_sender_bean = sandesha2_sender_bean_create(env);
    if(!find_sender_bean)
    {
        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
            "[sandesha2] Could not create the sender bean used for the lookup");
        status = AXIS2_FAILURE;
        goto done;
    }
    sandesha2_sender_bean_set_msg_no(find_sender_bean, env, msg_no);
    sandesha2_sender_bean_set_internal_seq_id(find_sender_bean, env, internal_sequence_id);
    sandesha2_sender_bean_set_send(find_sender_bean, env, AXIS2_TRUE);

    /* NOTE(review): sender_bean is not released here; ownership of the bean
     * returned by find_unique() is not visible in this file - confirm. */
    sender_bean = sandesha2_sender_mgr_find_unique(sender_mgr, env, find_sender_bean);
    if(sender_bean)
    {
        axis2_char_t *storage_key = NULL;
        axis2_msg_ctx_t *app_msg_ctx = NULL;
        axis2_msg_ctx_t *in_msg_ctx = NULL;

        storage_key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
        app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, storage_key,
            conf_ctx, AXIS2_TRUE);
        in_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);

        if(app_msg_ctx && in_msg_ctx)
        {
            sandesha2_msg_ctx_t *app_rm_msg_ctx = NULL;
            axis2_engine_t *engine = NULL;
            axis2_op_ctx_t *op_ctx = NULL;

            /* The stored response goes out over the transport of the message
             * that was just dropped. */
            axis2_msg_ctx_set_transport_out_stream(app_msg_ctx, env,
                axis2_msg_ctx_get_transport_out_stream(in_msg_ctx, env));
            axis2_msg_ctx_set_out_transport_info(app_msg_ctx, env,
                axis2_msg_ctx_get_out_transport_info(in_msg_ctx, env));

            op_ctx = axis2_msg_ctx_get_op_ctx(in_msg_ctx, env);
            if(op_ctx)
            {
                axis2_op_ctx_set_response_written(op_ctx, env, AXIS2_TRUE);
            }

            app_rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
            if(app_rm_msg_ctx)
            {
                sandesha2_msg_creator_add_ack_msg(env, app_rm_msg_ctx, rmd_sequence_id,
                    seq_prop_mgr);
                engine = axis2_engine_create(env, conf_ctx);
                if(engine)
                {
                    axis2_engine_send(engine, env, app_msg_ctx);
                    axis2_engine_free(engine, env);
                }
                else
                {
                    status = AXIS2_FAILURE;
                }
                sandesha2_msg_ctx_free(app_rm_msg_ctx, env);
            }
            else
            {
                AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
                    "[sandesha2] Could not initialize the RM message context of the stored "
                    "response");
                status = AXIS2_FAILURE;
            }
        }
        else
        {
            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
                "[sandesha2] Stored response message context could not be retrieved");
            status = AXIS2_FAILURE;
        }
    }
    else
    {
        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
            "[sandesha2] Applicatoin message already received. So current application "
            "message dropped. Sending an ack message");
        sandesha2_app_msg_processor_send_ack_if_reqd(env, rm_msg_ctx, rcvd_msgs_str,
            rmd_sequence_id, storage_mgr, sender_mgr, seq_prop_mgr, -1);
    }

done:
    if(find_sender_bean)
    {
        sandesha2_sender_bean_free(find_sender_bean, env);
    }
    if(rcvd_msgs_bean)
    {
        sandesha2_seq_property_bean_free(rcvd_msgs_bean, env);
    }

    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
        "[sandesha2] Exit:sandesha2_in_handler_process_dropped_msg");
    return status;
}