blob: e7d6076cde42f002fddc554b1d0e23bea1955d0c [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sandesha2_app_msg_processor.h>
#include <sandesha2_ack_msg_processor.h>
#include <sandesha2_seq_ack.h>
#include <sandesha2_seq_mgr.h>
#include <sandesha2_seq.h>
#include <sandesha2_ack_requested.h>
#include <sandesha2_last_msg.h>
#include <sandesha2_create_seq.h>
#include <sandesha2_identifier.h>
#include <sandesha2_spec_specific_consts.h>
#include <sandesha2_seq_property_mgr.h>
#include <sandesha2_create_seq_mgr.h>
#include <sandesha2_next_msg_mgr.h>
#include <sandesha2_sender_mgr.h>
#include <sandesha2_invoker_mgr.h>
#include <sandesha2_permanent_seq_property_mgr.h>
#include <sandesha2_permanent_create_seq_mgr.h>
#include <sandesha2_permanent_next_msg_mgr.h>
#include <sandesha2_permanent_sender_mgr.h>
#include <sandesha2_permanent_invoker_mgr.h>
#include <sandesha2_seq_property_bean.h>
#include <sandesha2_storage_mgr.h>
#include <sandesha2_fault_mgr.h>
#include <sandesha2_constants.h>
#include <sandesha2_utils.h>
#include <sandesha2_msg_ctx.h>
#include <sandesha2_create_seq_bean.h>
#include <sandesha2_sender_bean.h>
#include <sandesha2_msg_init.h>
#include <sandesha2_ack_mgr.h>
#include <sandesha2_msg_creator.h>
#include <sandesha2_client_constants.h>
#include <sandesha2_terminate_mgr.h>
#include <sandesha2_msg_retrans_adjuster.h>
#include <axis2_const.h>
#include <axutil_types.h>
#include <axis2_msg_ctx.h>
#include <axutil_string.h>
#include <axis2_engine.h>
#include <axutil_uuid_gen.h>
#include <axis2_relates_to.h>
#include <axis2_core_utils.h>
#include <axiom_soap_const.h>
#include <axiom_soap_body.h>
#include <axis2_http_transport_utils.h>
#include <axis2_listener_manager.h>
#include <platforms/axutil_platform_auto_sense.h>
/**
* @brief Application Message Processor struct impl
* Sandesha2 App Msg Processor
*/
typedef struct sandesha2_app_msg_processor_impl sandesha2_app_msg_processor_impl_t;
struct sandesha2_app_msg_processor_impl
{
sandesha2_msg_processor_t msg_processor;
};
#define SANDESHA2_INTF_TO_IMPL(msg_proc) \
((sandesha2_app_msg_processor_impl_t *)(msg_proc))
typedef struct sandesha2_app_msg_processor_args sandesha2_app_msg_processor_args_t;
struct sandesha2_app_msg_processor_args
{
axutil_env_t *env;
axis2_conf_ctx_t *conf_ctx;
axis2_char_t *internal_sequence_id;
axis2_char_t *msg_id;
axis2_bool_t is_server_side;
int retrans_interval;
void *bean;
void *msg_ctx;
sandesha2_seq_t *rm_sequence;
};
static sandesha2_app_msg_processor_args_t *
sandesha2_app_msg_processor_args_create(
axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
axis2_char_t *internal_sequence_id,
axis2_char_t *msg_id,
const axis2_bool_t is_server_side,
int retrans_interval,
sandesha2_seq_t *rm_sequence);
static void
sandesha2_app_msg_processor_args_free(
sandesha2_app_msg_processor_args_t *args,
const axutil_env_t *env);
static void AXIS2_CALL
sandesha2_app_msg_processor_is_last_out_msg(
const axutil_env_t *env,
axis2_msg_ctx_t *msg_ctx,
axis2_char_t *rmd_sequence_id,
axis2_char_t *internal_sequence_id,
long msg_num,
sandesha2_seq_property_mgr_t *seq_prop_mgr);
static void * AXIS2_THREAD_FUNC
sandesha2_app_msg_processor_create_seq_msg_worker_function(
axutil_thread_t *thd,
void *data);
static axis2_status_t
sandesha2_app_msg_processor_start_create_seq_msg_resender(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
axis2_char_t *internal_sequence_id,
axis2_char_t *msg_id,
const axis2_bool_t is_server_side,
int retrans_interval);
static void * AXIS2_THREAD_FUNC
sandesha2_app_msg_processor_application_msg_worker_function(
axutil_thread_t *thd,
void *data);
static axis2_status_t
sandesha2_app_msg_processor_start_application_msg_resender(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
axis2_char_t *internal_sequence_id,
axis2_char_t *msg_id,
const axis2_bool_t is_server_side,
int retrans_interval,
axis2_msg_ctx_t *app_msg_ctx,
sandesha2_seq_t *rm_sequence);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_in_msg (
sandesha2_msg_processor_t *msg_processor,
const axutil_env_t *env,
sandesha2_msg_ctx_t *msg_ctx);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_out_msg(
sandesha2_msg_processor_t *msg_processor,
const axutil_env_t *env,
sandesha2_msg_ctx_t *msg_ctx);
static axis2_bool_t AXIS2_CALL
sandesha2_app_msg_processor_msg_num_is_in_list(
const axutil_env_t *env,
axis2_char_t *list,
long num);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_create_seq_response(
const axutil_env_t *env,
axis2_msg_ctx_t *create_seq_msg_ctx,
sandesha2_storage_mgr_t *storage_mgr);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_app_msg_response(
const axutil_env_t *env,
axis2_msg_ctx_t *app_msg_ctx);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_send_create_seq_msg(
const axutil_env_t *env,
sandesha2_msg_ctx_t *msg_ctx,
axis2_char_t *internal_seq_id,
axis2_char_t *acks_to,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
sandesha2_create_seq_mgr_t *create_seq_mgr,
sandesha2_sender_mgr_t *sender_mgr);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_send_app_msg(
const axutil_env_t *env,
sandesha2_msg_ctx_t *msg_ctx,
axis2_char_t *internal_seq_id,
long msg_num,
axis2_char_t *storage_key,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_create_seq_mgr_t *create_seq_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
sandesha2_sender_mgr_t *sender_mgr);
static axis2_status_t
sandesha2_app_msg_processor_resend(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
axis2_char_t *msg_id,
axis2_bool_t is_svr_side,
const axis2_char_t *internal_seq_id,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
sandesha2_create_seq_mgr_t *create_seq_mgr,
sandesha2_sender_mgr_t *sender_mgr,
axis2_msg_ctx_t *app_msg_ctx);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_set_next_msg_no(
const axutil_env_t *env,
axis2_char_t *internal_seq_id,
long msg_num,
sandesha2_seq_property_mgr_t *seq_prop_mgr);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_set_last_out_msg_no(
const axutil_env_t *env,
axis2_char_t *internal_seq_id,
long msg_num,
sandesha2_seq_property_mgr_t *seq_prop_mgr);
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_free (
sandesha2_msg_processor_t *element,
const axutil_env_t *env);
AXIS2_EXTERN sandesha2_msg_processor_t* AXIS2_CALL
sandesha2_app_msg_processor_create(
const axutil_env_t *env)
{
sandesha2_app_msg_processor_impl_t *msg_proc_impl = NULL;
msg_proc_impl = (sandesha2_app_msg_processor_impl_t *)AXIS2_MALLOC (env->allocator,
sizeof(sandesha2_app_msg_processor_impl_t));
if(!msg_proc_impl)
{
AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
return NULL;
}
msg_proc_impl->msg_processor.ops = AXIS2_MALLOC(env->allocator,
sizeof(sandesha2_msg_processor_ops_t));
if(!msg_proc_impl->msg_processor.ops)
{
sandesha2_app_msg_processor_free((sandesha2_msg_processor_t*)
msg_proc_impl, env);
AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
return NULL;
}
msg_proc_impl->msg_processor.ops->process_in_msg =
sandesha2_app_msg_processor_process_in_msg;
msg_proc_impl->msg_processor.ops->process_out_msg =
sandesha2_app_msg_processor_process_out_msg;
msg_proc_impl->msg_processor.ops->free = sandesha2_app_msg_processor_free;
return &(msg_proc_impl->msg_processor);
}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_free (
sandesha2_msg_processor_t *msg_processor,
const axutil_env_t *env)
{
sandesha2_app_msg_processor_impl_t *msg_proc_impl = NULL;
msg_proc_impl = SANDESHA2_INTF_TO_IMPL(msg_processor);
if(msg_processor->ops)
{
AXIS2_FREE(env->allocator, msg_processor->ops);
}
AXIS2_FREE(env->allocator, SANDESHA2_INTF_TO_IMPL(msg_processor));
return AXIS2_SUCCESS;
}
static sandesha2_app_msg_processor_args_t *
sandesha2_app_msg_processor_args_create(
axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
axis2_char_t *internal_sequence_id,
axis2_char_t *msg_id,
const axis2_bool_t is_server_side,
int retrans_interval,
sandesha2_seq_t *rm_sequence)
{
sandesha2_app_msg_processor_args_t *args = NULL;
args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_app_msg_processor_args_t));
if(!args)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Could not create arguments for the thread process");
AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
return NULL;
}
args->env = env;
args->conf_ctx = conf_ctx;
args->internal_sequence_id = axutil_strdup(env, internal_sequence_id);
args->msg_id = axutil_strdup(env, msg_id);
args->retrans_interval = retrans_interval;
args->is_server_side = is_server_side;
if(rm_sequence)
{
args->rm_sequence = sandesha2_seq_clone(env, rm_sequence);
}
return args;
}
static void
sandesha2_app_msg_processor_args_free(
sandesha2_app_msg_processor_args_t *args,
const axutil_env_t *env)
{
args->conf_ctx = NULL;
if(args->internal_sequence_id)
{
AXIS2_FREE(env->allocator, args->internal_sequence_id);
args->internal_sequence_id = NULL;
}
if(args->msg_id)
{
AXIS2_FREE(env->allocator, args->msg_id);
args->msg_id = NULL;
}
args->retrans_interval = -1;
args->is_server_side = AXIS2_FALSE;
}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_in_msg (
sandesha2_msg_processor_t *msg_processor,
const axutil_env_t *env,
sandesha2_msg_ctx_t *rm_msg_ctx)
{
axis2_msg_ctx_t *app_msg_ctx = NULL;
axis2_char_t *processed = NULL;
axis2_op_ctx_t *op_ctx = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
axutil_property_t *property = NULL;
sandesha2_storage_mgr_t *storage_mgr = NULL;
sandesha2_msg_ctx_t *fault_ctx = NULL;
sandesha2_seq_t *rm_sequence = NULL;
axis2_char_t *rmd_sequence_id = NULL;
sandesha2_seq_property_bean_t *msgs_bean = NULL;
long msg_no = 0;
long highest_in_msg_no = 0;
axis2_char_t *msgs_str = NULL;
axis2_char_t msg_num_str[32];
sandesha2_invoker_mgr_t *invoker_mgr = NULL;
sandesha2_sender_mgr_t *sender_mgr = NULL;
sandesha2_next_msg_mgr_t *next_msg_mgr = NULL;
sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
sandesha2_next_msg_bean_t *next_msg_bean = NULL;
axis2_bool_t in_order_invoke = AXIS2_FALSE;
sandesha2_invoker_bean_t *invoker_bean = NULL;
sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
axis2_char_t *highest_in_msg_no_str = NULL;
axis2_char_t *highest_in_msg_key_str = NULL;
axis2_bool_t msg_no_present_in_list = AXIS2_FALSE;
const axutil_string_t *str_soap_action = NULL;
const axis2_char_t *wsa_action = NULL;
const axis2_char_t *soap_action = NULL;
axis2_char_t *dbname = NULL;
axis2_svc_t *svc = NULL;
sandesha2_property_bean_t *property_bean = NULL;
AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2]Entry:sandesha2_app_msg_processor_process_in_msg");
app_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
if(!app_msg_ctx)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Message context is not set");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_NULL_MSG_CTX, AXIS2_FAILURE);
return AXIS2_FAILURE;
}
property = sandesha2_msg_ctx_get_property(rm_msg_ctx, env,
SANDESHA2_APPLICATION_PROCESSING_DONE);
if(property)
{
processed = axutil_property_get_value(property, env);
}
if(processed && !axutil_strcmp(processed, "true"))
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Message already processed. So returning here");
return AXIS2_SUCCESS;
}
op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
/*axis2_op_ctx_set_in_use(op_ctx, env, AXIS2_TRUE);*/
axis2_op_ctx_set_response_written(op_ctx, env, AXIS2_TRUE);
conf_ctx = axis2_msg_ctx_get_conf_ctx(app_msg_ctx, env);
dbname = sandesha2_util_get_dbname(env, conf_ctx);
storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
fault_ctx = sandesha2_fault_mgr_check_for_last_msg_num_exceeded(env, rm_msg_ctx, seq_prop_mgr);
if(fault_ctx)
{
axis2_engine_t *engine = axis2_engine_create(env, conf_ctx);
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]sandesha2_app_msg_processor_process_in_msg send Fault");
if(!axis2_engine_send_fault(engine, env, sandesha2_msg_ctx_get_msg_ctx(fault_ctx, env)))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]An error occured while sending the fault");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_SENDING_FAULT, AXIS2_FAILURE);
if(fault_ctx)
{
sandesha2_msg_ctx_free(fault_ctx, env);
}
if(engine)
{
axis2_engine_free(engine, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
if(fault_ctx)
{
sandesha2_msg_ctx_free(fault_ctx, env);
}
if(engine)
{
axis2_engine_free(engine, env);
}
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Pausing message context");
axis2_msg_ctx_set_paused(app_msg_ctx, env, AXIS2_TRUE);
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_SUCCESS;
}
seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
next_msg_mgr = sandesha2_permanent_next_msg_mgr_create(env, dbname);
invoker_mgr = sandesha2_permanent_invoker_mgr_create(env, dbname);
sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
rm_sequence = sandesha2_msg_ctx_get_sequence(rm_msg_ctx, env);
sandesha2_seq_set_must_understand(rm_sequence, env, AXIS2_FALSE);
rmd_sequence_id = sandesha2_identifier_get_identifier(sandesha2_seq_get_identifier(rm_sequence, env), env);
fault_ctx = sandesha2_fault_mgr_check_for_unknown_seq(env,rm_msg_ctx, rmd_sequence_id, seq_prop_mgr,
create_seq_mgr, next_msg_mgr);
if(fault_ctx)
{
axis2_engine_t *engine = axis2_engine_create(env, conf_ctx);
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]sandesha2_app_msg_processor_process_in_msg send Fault");
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(next_msg_mgr)
{
sandesha2_next_msg_mgr_free(next_msg_mgr, env);
}
if(invoker_mgr)
{
sandesha2_invoker_mgr_free(invoker_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
if(!axis2_engine_send_fault(engine, env, sandesha2_msg_ctx_get_msg_ctx(fault_ctx, env)))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]An error occured while sending the fault");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_SENDING_FAULT, AXIS2_FAILURE);
if(fault_ctx)
{
sandesha2_msg_ctx_free(fault_ctx, env);
}
if(engine)
{
axis2_engine_free(engine, env);
}
return AXIS2_FAILURE;
}
if(fault_ctx)
{
sandesha2_msg_ctx_free(fault_ctx, env);
}
if(engine)
{
axis2_engine_free(engine, env);
}
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Pausing message context");
axis2_msg_ctx_set_paused(app_msg_ctx, env, AXIS2_TRUE);
return AXIS2_SUCCESS;
}
sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
fault_ctx = sandesha2_fault_mgr_check_for_seq_closed(env, rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
if(fault_ctx)
{
axis2_engine_t *engine = axis2_engine_create(env, conf_ctx);
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]sandesha2_app_msg_processor_process_in_msg send Fault");
if(seq_prop_mgr)
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
if(create_seq_mgr)
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
if(sender_mgr)
sandesha2_sender_mgr_free(sender_mgr, env);
if(next_msg_mgr)
sandesha2_next_msg_mgr_free(next_msg_mgr, env);
if(invoker_mgr)
sandesha2_invoker_mgr_free(invoker_mgr, env);
if(storage_mgr)
sandesha2_storage_mgr_free(storage_mgr, env);
if(!axis2_engine_send_fault(engine, env, sandesha2_msg_ctx_get_msg_ctx(fault_ctx, env)))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]An error occured while sending the fault");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_SENDING_FAULT, AXIS2_FAILURE);
if(fault_ctx)
{
sandesha2_msg_ctx_free(fault_ctx, env);
}
if(engine)
{
axis2_engine_free(engine, env);
}
return AXIS2_FAILURE;
}
if(fault_ctx)
{
sandesha2_msg_ctx_free(fault_ctx, env);
}
if(engine)
{
axis2_engine_free(engine, env);
}
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Pausing message context");
axis2_msg_ctx_set_paused(app_msg_ctx, env, AXIS2_TRUE);
return AXIS2_SUCCESS;
}
sandesha2_seq_mgr_update_last_activated_time(env, rmd_sequence_id, seq_prop_mgr);
msg_no = sandesha2_msg_number_get_msg_num(sandesha2_seq_get_msg_num(rm_sequence, env), env);
if(0 == msg_no)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2]Invalid message number");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_INVALID_MSG_NUM, AXIS2_FAILURE);
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(next_msg_mgr)
{
sandesha2_next_msg_mgr_free(next_msg_mgr, env);
}
if(invoker_mgr)
{
sandesha2_invoker_mgr_free(invoker_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
highest_in_msg_no_str = sandesha2_utils_get_seq_property(env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_HIGHEST_IN_MSG_NUMBER, seq_prop_mgr);
highest_in_msg_key_str = sandesha2_utils_get_seq_property(env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_HIGHEST_IN_MSG_KEY, seq_prop_mgr);
if(!highest_in_msg_key_str)
{
highest_in_msg_key_str = axutil_uuid_gen(env);
}
if(highest_in_msg_no_str)
{
highest_in_msg_no = atol(highest_in_msg_no_str);
}
sprintf(msg_num_str, "%ld", msg_no);
if(msg_no > highest_in_msg_no)
{
sandesha2_seq_property_bean_t *highest_msg_no_bean = NULL;
sandesha2_seq_property_bean_t *highest_msg_key_bean = NULL;
sandesha2_seq_property_bean_t *highest_msg_id_bean = NULL;
const axis2_char_t *msg_id = NULL;
/*axiom_soap_envelope_t *response_envelope = NULL;*/
/*int soap_version = -1;*/
axutil_property_t *property = NULL;
axis2_char_t *client_seq_key = NULL;
highest_in_msg_no = msg_no;
msg_id = axis2_msg_ctx_get_msg_id(app_msg_ctx, env);
/* Store the highest in message number received so far */
highest_msg_no_bean = sandesha2_seq_property_bean_create_with_data(env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_HIGHEST_IN_MSG_NUMBER, msg_num_str);
highest_msg_key_bean = sandesha2_seq_property_bean_create_with_data(env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_HIGHEST_IN_MSG_KEY, highest_in_msg_key_str);
/* Store the id of the highest in message number message */
highest_msg_id_bean = sandesha2_seq_property_bean_create_with_data(env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_HIGHEST_IN_MSG_ID, (axis2_char_t *)msg_id);
/*sandesha2_storage_mgr_remove_msg_ctx(storage_mgr, env,
highest_in_msg_key_str, conf_ctx, -1);
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, highest_in_msg_key_str, app_msg_ctx,
AXIS2_TRUE);*/
property = axis2_msg_ctx_get_property(app_msg_ctx, env, SANDESHA2_CLIENT_SEQ_KEY);
if(property)
{
client_seq_key = axutil_property_get_value(property, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2]Client sequence key:%s found", client_seq_key);
}
else
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2]Client sequence key not found");
}
if(highest_in_msg_no_str)
{
sandesha2_seq_property_mgr_update(seq_prop_mgr, env,
highest_msg_no_bean);
sandesha2_seq_property_mgr_update(seq_prop_mgr, env,
highest_msg_key_bean);
sandesha2_seq_property_mgr_update(seq_prop_mgr, env,
highest_msg_id_bean);
}
else
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env,
highest_msg_no_bean);
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env,
highest_msg_key_bean);
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env,
highest_msg_id_bean);
}
if(highest_msg_no_bean)
sandesha2_seq_property_bean_free(highest_msg_no_bean, env);
if(highest_msg_key_bean)
sandesha2_seq_property_bean_free(highest_msg_key_bean, env);
if(highest_msg_id_bean)
sandesha2_seq_property_bean_free(highest_msg_id_bean, env);
}
if(highest_in_msg_no_str)
{
AXIS2_FREE(env->allocator, highest_in_msg_no_str);
}
if(highest_in_msg_key_str)
{
AXIS2_FREE(env->allocator, highest_in_msg_key_str);
}
msgs_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_SERVER_COMPLETED_MESSAGES);
if(msgs_bean)
{
axis2_char_t *temp_value = sandesha2_seq_property_bean_get_value(msgs_bean, env);
if(temp_value)
{
msgs_str = axutil_strdup(env, temp_value);
}
}
else
{
msgs_bean = sandesha2_seq_property_bean_create(env);
sandesha2_seq_property_bean_set_seq_id(msgs_bean, env, rmd_sequence_id);
sandesha2_seq_property_bean_set_name(msgs_bean, env,
SANDESHA2_SEQ_PROP_SERVER_COMPLETED_MESSAGES);
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, msgs_bean);
}
if(msgs_str)
{
msg_no_present_in_list = sandesha2_app_msg_processor_msg_num_is_in_list(env, msgs_str, msg_no);
}
if(msg_no_present_in_list && !axutil_strcmp(SANDESHA2_QOS_DEFAULT_INVOCATION_TYPE,
SANDESHA2_QOS_EXACTLY_ONCE))
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Pausing message context");
sandesha2_msg_ctx_set_paused(rm_msg_ctx, env, AXIS2_TRUE);
}
if(!msg_no_present_in_list)
{
if(msgs_str)
{
axis2_char_t *tmp_str = NULL;
tmp_str = axutil_strcat(env, msgs_str, ",", msg_num_str, NULL);
AXIS2_FREE(env->allocator, msgs_str);
msgs_str = tmp_str;
}
else
{
msgs_str = axutil_strdup(env, msg_num_str);
}
sandesha2_seq_property_bean_set_value(msgs_bean, env, msgs_str);
sandesha2_seq_property_mgr_update(seq_prop_mgr, env, msgs_bean);
}
sandesha2_seq_property_bean_free(msgs_bean, env);
next_msg_bean = sandesha2_next_msg_mgr_retrieve(next_msg_mgr, env, rmd_sequence_id);
if(!next_msg_bean)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]Sequence with seq_id:%s does not exist", rmd_sequence_id);
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_SEQ_NOT_EXIST, AXIS2_FAILURE);
if(seq_prop_mgr)
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
if(create_seq_mgr)
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
if(sender_mgr)
sandesha2_sender_mgr_free(sender_mgr, env);
if(next_msg_mgr)
sandesha2_next_msg_mgr_free(next_msg_mgr, env);
if(invoker_mgr)
sandesha2_invoker_mgr_free(invoker_mgr, env);
if(storage_mgr)
sandesha2_storage_mgr_free(storage_mgr, env);
if(msgs_str)
{
AXIS2_FREE(env->allocator, msgs_str);
}
return AXIS2_FAILURE;
}
sandesha2_next_msg_bean_free(next_msg_bean, env);
svc = axis2_msg_ctx_get_svc(app_msg_ctx, env);
if(!svc)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Service is NULL");
return AXIS2_FAILURE;
}
property_bean = sandesha2_utils_get_property_bean(env, svc);
if(!property_bean)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Property bean is NULL");
return AXIS2_FAILURE;
}
in_order_invoke = sandesha2_property_bean_is_in_order(property_bean, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "in_order_invoke:%d", in_order_invoke);
/* test code */
if(axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
sandesha2_last_msg_t *last_msg = sandesha2_seq_get_last_msg(rm_sequence, env);
axis2_char_t *msg_id = (axis2_char_t *)axis2_msg_ctx_get_msg_id(app_msg_ctx, env);
if(last_msg)
{
sandesha2_seq_property_bean_t *seq_prop_bean = NULL;
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Last message");
/* Store the id of the last RM 1.0 message */
seq_prop_bean = sandesha2_seq_property_bean_create_with_data(
env, rmd_sequence_id, SANDESHA2_SEQ_PROP_LAST_IN_MESSAGE_ID, msg_id);
if(seq_prop_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, seq_prop_bean);
sandesha2_seq_property_bean_free(seq_prop_bean, env);
}
}
}
/* end test code */
/*
* If this message matches the WSRM 1.0 pattern for an empty last message (e.g.
* the sender wanted to signal the last message, but didn't have an application
* message to send) then we do not need to send the message on to the application.
*/
str_soap_action = axis2_msg_ctx_get_soap_action(app_msg_ctx, env);
soap_action = axutil_string_get_buffer(str_soap_action, env);
wsa_action = axis2_msg_ctx_get_wsa_action(app_msg_ctx, env);
if(!axutil_strcmp(SANDESHA2_SPEC_2005_02_ACTION_LAST_MESSAGE, wsa_action) || 0 == axutil_strcmp(
SANDESHA2_SPEC_2005_02_SOAP_ACTION_LAST_MESSAGE, soap_action))
{
axis2_status_t status = AXIS2_FAILURE;
int mep = AXIS2_MEP_CONSTANT_IN_ONLY;
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Got WSRM 1.0 last message. Send ack and aborting");
/* In order to send the ack message we fake by setting in only mep */
sandesha2_app_msg_processor_send_ack_if_reqd(env, rm_msg_ctx, msgs_str, rmd_sequence_id,
storage_mgr, sender_mgr, seq_prop_mgr, mep);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Pausing message context");
sandesha2_msg_ctx_set_paused(rm_msg_ctx, env, AXIS2_TRUE);
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(next_msg_mgr)
{
sandesha2_next_msg_mgr_free(next_msg_mgr, env);
}
if(invoker_mgr)
{
sandesha2_invoker_mgr_free(invoker_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
if(msgs_str)
{
AXIS2_FREE(env->allocator, msgs_str);
}
return status;
}
if(axis2_msg_ctx_get_server_side(app_msg_ctx, env) && in_order_invoke)
{
sandesha2_seq_property_bean_t *incoming_seq_list_bean = NULL;
axutil_array_list_t *incoming_seq_list = NULL;
axis2_char_t *str_value = NULL;
axutil_property_t *property = NULL;
axis2_char_t *str_key = NULL;
incoming_seq_list_bean = sandesha2_seq_property_mgr_retrieve(
seq_prop_mgr, env, SANDESHA2_SEQ_PROP_ALL_SEQS,
SANDESHA2_SEQ_PROP_INCOMING_SEQ_LIST);
if(!incoming_seq_list_bean)
{
/**
* Our array to_string format is [ele1,ele2,ele3]
* here we don't have a list so [] should be passed
*/
incoming_seq_list_bean = sandesha2_seq_property_bean_create(env);
sandesha2_seq_property_bean_set_seq_id(incoming_seq_list_bean, env,
SANDESHA2_SEQ_PROP_ALL_SEQS);
sandesha2_seq_property_bean_set_name(incoming_seq_list_bean, env,
SANDESHA2_SEQ_PROP_INCOMING_SEQ_LIST);
sandesha2_seq_property_bean_set_value(incoming_seq_list_bean,
env, "[]");
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env,
incoming_seq_list_bean);
}
str_value = sandesha2_seq_property_bean_get_value(
incoming_seq_list_bean, env);
incoming_seq_list = sandesha2_utils_get_array_list_from_string(env,
str_value);
if(!incoming_seq_list)
{
axis2_status_t status = AXIS2_ERROR_GET_STATUS_CODE(env->error);
if(AXIS2_SUCCESS != status)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]Incoming sequence list empty");
if(seq_prop_mgr)
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
if(create_seq_mgr)
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
if(sender_mgr)
sandesha2_sender_mgr_free(sender_mgr, env);
if(next_msg_mgr)
sandesha2_next_msg_mgr_free(next_msg_mgr, env);
if(invoker_mgr)
sandesha2_invoker_mgr_free(invoker_mgr, env);
if(storage_mgr)
sandesha2_storage_mgr_free(storage_mgr, env);
if(msgs_str)
{
AXIS2_FREE(env->allocator, msgs_str);
}
return status;
}
}
/* Adding current seq to the incoming seq List */
if(!sandesha2_utils_array_list_contains(env,
incoming_seq_list, rmd_sequence_id))
{
axis2_char_t *str_seq_list = NULL;
axutil_array_list_add(incoming_seq_list, env, rmd_sequence_id);
str_seq_list = sandesha2_utils_array_list_to_string(env,
incoming_seq_list, SANDESHA2_ARRAY_LIST_STRING);
/* saving the property. */
sandesha2_seq_property_bean_set_value(incoming_seq_list_bean,
env, str_seq_list);
if(str_seq_list)
AXIS2_FREE(env->allocator, str_seq_list);
sandesha2_seq_property_mgr_update(seq_prop_mgr, env,
incoming_seq_list_bean);
}
/* save the message */
str_key = axutil_uuid_gen(env);
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, str_key, app_msg_ctx, AXIS2_TRUE);
invoker_bean = sandesha2_invoker_bean_create_with_data(env, str_key,
msg_no, rmd_sequence_id, AXIS2_FALSE);
if(str_key)
AXIS2_FREE(env->allocator, str_key);
sandesha2_invoker_mgr_insert(invoker_mgr, env, invoker_bean);
property = axutil_property_create_with_args(env, 0, 0, 0,
AXIS2_VALUE_TRUE);
/* To avoid performing application processing more than once. */
sandesha2_msg_ctx_set_property(rm_msg_ctx, env,
SANDESHA2_APPLICATION_PROCESSING_DONE, property);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Pausing message context");
sandesha2_msg_ctx_set_paused(rm_msg_ctx, env, AXIS2_TRUE);
/* Start the invoker if stopped */
/*sandesha2_utils_start_invoker_for_seq(env, conf_ctx, rmd_sequence_id);*/
}
if(!sandesha2_app_msg_processor_send_ack_if_reqd(env, rm_msg_ctx, msgs_str, rmd_sequence_id, storage_mgr,
sender_mgr, seq_prop_mgr, -1))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Sending acknowledgment failed");
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(next_msg_mgr)
{
sandesha2_next_msg_mgr_free(next_msg_mgr, env);
}
if(invoker_mgr)
{
sandesha2_invoker_mgr_free(invoker_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
if(msgs_str)
{
AXIS2_FREE(env->allocator, msgs_str);
}
return AXIS2_FAILURE;
}
if(msgs_str)
{
AXIS2_FREE(env->allocator, msgs_str);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(next_msg_mgr)
{
sandesha2_next_msg_mgr_free(next_msg_mgr, env);
}
if(invoker_mgr)
{
sandesha2_invoker_mgr_free(invoker_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2]Exit:sandesha2_app_msg_processor_process_in_msg");
return AXIS2_SUCCESS;
}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_out_msg(
sandesha2_msg_processor_t *msg_processor,
const axutil_env_t *env,
sandesha2_msg_ctx_t *rm_msg_ctx)
{
axis2_msg_ctx_t *msg_ctx = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
sandesha2_storage_mgr_t *storage_mgr = NULL;
sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
sandesha2_sender_mgr_t *sender_mgr = NULL;
axis2_bool_t is_svr_side = AXIS2_FALSE;
axis2_char_t *internal_sequence_id = NULL;
axis2_char_t *storage_key = NULL;
/*axis2_bool_t last_msg = AXIS2_FALSE;*/
axutil_property_t *property = NULL;
long msg_num_lng = -1;
long system_msg_num = -1;
long msg_number = -1;
axis2_char_t *dummy_msg_str = NULL;
axis2_bool_t dummy_msg = AXIS2_FALSE;
axis2_char_t *rmd_sequence_id = NULL;
/*axis2_bool_t seq_timed_out = AXIS2_FALSE;*/
sandesha2_seq_property_bean_t *res_highest_msg_bean = NULL;
axis2_char_t msg_number_str[32];
axis2_bool_t send_create_seq = AXIS2_FALSE;
sandesha2_seq_property_bean_t *spec_ver_bean = NULL;
axis2_char_t *spec_ver = NULL;
axiom_soap_envelope_t *soap_env = NULL;
axis2_endpoint_ref_t *to_epr = NULL;
sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
axis2_char_t *op_name = NULL;
axis2_char_t *to_addr = NULL;
axis2_op_ctx_t *op_ctx = NULL;
axis2_msg_ctx_t *req_msg_ctx = NULL;
/*axis2_relates_to_t *relates_to = NULL;*/
axis2_char_t *dbname = NULL;
sandesha2_seq_property_bean_t *seq_timeout_bean = NULL;
axis2_status_t status = AXIS2_FAILURE;
sandesha2_msg_ctx_t *req_rm_msg_ctx = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2]Entry:sandesha2_app_msg_processor_process_out_msg");
AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
op_ctx = axis2_msg_ctx_get_op_ctx(msg_ctx, env);
req_msg_ctx = axis2_op_ctx_get_msg_ctx(op_ctx, env, AXIS2_WSDL_MESSAGE_LABEL_IN);
/* TODO setting up fault callback */
dbname = sandesha2_util_get_dbname(env, conf_ctx);
storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
if(!seq_prop_mgr)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]seq_prop_mgr is NULL");
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
is_svr_side = axis2_msg_ctx_get_server_side(msg_ctx, env);
to_epr = axis2_msg_ctx_get_to(msg_ctx, env);
if((!to_epr || !axis2_endpoint_ref_get_address(to_epr, env) || 0 == axutil_strlen(
axis2_endpoint_ref_get_address(to_epr, env))) && !is_svr_side)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]To epr is not set - a requirement in sandesha client side");
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
if(!axis2_msg_ctx_get_msg_id(msg_ctx, env))
{
axis2_msg_ctx_set_message_id(msg_ctx, env, axutil_uuid_gen(env));
}
if(is_svr_side)
{
sandesha2_seq_t *req_seq = NULL;
long request_msg_no = -1;
/*const axis2_relates_to_t *relates_to = NULL;*/
/*axis2_char_t *relates_to_value = NULL;*/
/*axis2_char_t *last_req_id = NULL;*/
req_rm_msg_ctx = sandesha2_msg_init_init_msg(env, req_msg_ctx);
req_seq = sandesha2_msg_ctx_get_sequence(req_rm_msg_ctx, env);
if(!req_seq)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Sequence is NULL");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_SEQ_NOT_EXIST, AXIS2_FAILURE);
if(req_rm_msg_ctx)
{
sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
rmd_sequence_id = sandesha2_identifier_get_identifier(sandesha2_seq_get_identifier(req_seq,
env), env);
if(!rmd_sequence_id)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Sequence ID is NULL");
if(req_rm_msg_ctx)
{
sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
request_msg_no = sandesha2_msg_number_get_msg_num(sandesha2_seq_get_msg_num(req_seq, env), env);
internal_sequence_id = sandesha2_utils_get_internal_sequence_id(env, rmd_sequence_id);
}
else /* Client side */
{
axis2_char_t *to = NULL;
axis2_char_t *seq_key = NULL;
to = (axis2_char_t*)axis2_endpoint_ref_get_address(to_epr, env);
property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_CLIENT_SEQ_KEY);
if(property)
{
seq_key = axutil_property_get_value(property, env);
}
if(!seq_key)
{
seq_key = axutil_uuid_gen(env);
property = axutil_property_create_with_args(env, 0, 0, 0, seq_key);
axis2_msg_ctx_set_property(msg_ctx, env, SANDESHA2_CLIENT_SEQ_KEY, property);
}
internal_sequence_id = sandesha2_utils_get_client_internal_sequence_id(env, to,
seq_key);
}
seq_timeout_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQ_PROP_SEQ_TIMED_OUT);
if(seq_timeout_bean)
{
axis2_bool_t exit_system = AXIS2_FALSE;
axis2_char_t *str_timeout = sandesha2_seq_property_bean_get_value(seq_timeout_bean, env);
if(str_timeout && !axutil_strcmp(AXIS2_VALUE_TRUE, str_timeout))
{
axis2_char_t *temp_int_seq_id = sandesha2_seq_property_bean_get_seq_id(seq_timeout_bean, env);
axis2_char_t *temp_name = sandesha2_seq_property_bean_get_name(seq_timeout_bean, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Removing the sequence property named %s in the sequence %s", temp_name,
temp_int_seq_id);
sandesha2_seq_property_mgr_remove(seq_prop_mgr, env, temp_int_seq_id, temp_name);
if(req_rm_msg_ctx)
{
sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
}
if(internal_sequence_id)
{
AXIS2_FREE(env->allocator, internal_sequence_id);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
/* We should halt the system here. Otherwise application client keep on sending messages. */
exit_system = AXIS2_TRUE;
}
sandesha2_seq_property_bean_free(seq_timeout_bean, env);
if(exit_system)
{
exit(AXIS2_FAILURE);
}
}
property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_CLIENT_MESSAGE_NUMBER);
if(property)
{
msg_num_lng = *(long*)(axutil_property_get_value(property, env));
if(msg_num_lng <= 0)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Invalid message number");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_INVALID_MSG_NUM, AXIS2_FAILURE);
if(req_rm_msg_ctx)
{
sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
}
if(internal_sequence_id)
{
AXIS2_FREE(env->allocator, internal_sequence_id);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
}
system_msg_num = sandesha2_app_msg_processor_get_prev_msg_no(env, internal_sequence_id, seq_prop_mgr);
if(msg_num_lng > 0 && msg_num_lng <= system_msg_num)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Invalid Message Number");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_INVALID_MSG_NUM, AXIS2_FAILURE);
if(req_rm_msg_ctx)
{
sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
}
if(internal_sequence_id)
{
AXIS2_FREE(env->allocator, internal_sequence_id);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
if(msg_num_lng > 0)
{
msg_number = msg_num_lng;
}
else if(system_msg_num > 0)
{
msg_number = system_msg_num + 1;
}
else
{
msg_number = 1;
}
/* A dummy message is a one which will not be processed as a actual
* application message. The RM handlers will simply let these go.
*/
property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_CLIENT_DUMMY_MESSAGE);
if(property)
{
dummy_msg_str = axutil_property_get_value(property, env);
}
if(dummy_msg_str && 0 == axutil_strcmp(dummy_msg_str, AXIS2_VALUE_TRUE))
{
dummy_msg = AXIS2_TRUE;
}
if(!dummy_msg)
{
sandesha2_app_msg_processor_set_next_msg_no(env, internal_sequence_id, msg_number, seq_prop_mgr);
}
sprintf(msg_number_str, "%ld", msg_number);
res_highest_msg_bean = sandesha2_seq_property_bean_create_with_data(env, internal_sequence_id,
SANDESHA2_SEQ_PROP_HIGHEST_OUT_MSG_NUMBER, msg_number_str);
if(res_highest_msg_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, res_highest_msg_bean);
sandesha2_seq_property_bean_free(res_highest_msg_bean, env);
}
/*if(last_msg)
{
sandesha2_seq_property_bean_t *res_last_msg_key_bean = NULL;
res_last_msg_key_bean = sandesha2_seq_property_bean_create_with_data(env,
internal_sequence_id, SANDESHA2_SEQ_PROP_LAST_OUT_MESSAGE_NO, msg_number_str);
if(res_last_msg_key_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, res_last_msg_key_bean);
sandesha2_seq_property_bean_free(res_last_msg_key_bean, env);
}
}*/
if(is_svr_side)
{
sandesha2_seq_property_bean_t *rmd_to_bean = NULL;
rmd_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_TO_EPR);
if(rmd_to_bean)
{
axis2_char_t *rmd_to = NULL;
rmd_to = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rmd_to_bean, env));
property = axutil_property_create_with_args(env, 0, AXIS2_TRUE, 0, rmd_to);
axis2_msg_ctx_set_property(msg_ctx, env, SANDESHA2_SEQ_PROP_TO_EPR, property);
sandesha2_seq_property_bean_free(rmd_to_bean, env);
}
spec_ver_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_RM_SPEC_VERSION);
if(req_rm_msg_ctx)
{
sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
}
if(!spec_ver_bean)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Invalid spec version");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_INVALID_SPEC_VERSION,
AXIS2_FAILURE);
if(internal_sequence_id)
{
AXIS2_FREE(env->allocator, internal_sequence_id);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return AXIS2_FAILURE;
}
spec_ver = sandesha2_seq_property_bean_get_value(spec_ver_bean, env);
}
else
{
property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_CLIENT_RM_SPEC_VERSION);
if(property)
{
spec_ver = axutil_property_get_value(property, env);
}
}
if(!spec_ver)
{
spec_ver = sandesha2_spec_specific_consts_get_default_spec_version(env);
}
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "Spec version:%s", spec_ver);
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
if(1 == msg_number)
{
if(!rms_sequence_bean)
{
send_create_seq = AXIS2_TRUE;
}
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"Starting the rms sequence with rms rms internal sequence id %s",
internal_sequence_id);
sandesha2_seq_mgr_setup_new_rms_sequence(env, msg_ctx, internal_sequence_id, spec_ver,
seq_prop_mgr);
}
if(rms_sequence_bean)
{
sandesha2_seq_property_bean_free(rms_sequence_bean, env);
}
if(spec_ver_bean)
{
sandesha2_seq_property_bean_free(spec_ver_bean, env);
}
if(send_create_seq)
{
sandesha2_seq_property_bean_t *create_seq_added = NULL;
axis2_char_t *addr_ns_uri = NULL;
axis2_char_t *anon_uri = NULL;
create_seq_added = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQ_PROP_OUT_CREATE_SEQ_SENT);
addr_ns_uri = sandesha2_utils_get_seq_property(env, internal_sequence_id,
SANDESHA2_SEQ_PROP_ADDRESSING_NAMESPACE_VALUE, seq_prop_mgr);
anon_uri = sandesha2_spec_specific_consts_get_anon_uri(env, addr_ns_uri);
if(addr_ns_uri)
{
AXIS2_FREE(env->allocator, addr_ns_uri);
}
if(!create_seq_added)
{
axis2_char_t *acks_to = NULL;
sandesha2_seq_property_bean_t *reply_to_epr_bean = NULL;
create_seq_added = sandesha2_seq_property_bean_create_with_data(env,
internal_sequence_id, SANDESHA2_SEQ_PROP_OUT_CREATE_SEQ_SENT, AXIS2_VALUE_TRUE);
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, create_seq_added);
if(axis2_msg_ctx_get_svc_ctx(msg_ctx, env))
{
property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_CLIENT_ACKS_TO);
if(property)
{
acks_to = axutil_property_get_value(property, env);
}
}
if(is_svr_side)
{
axis2_endpoint_ref_t *acks_to_epr = NULL;
acks_to_epr = axis2_msg_ctx_get_to(req_msg_ctx, env);
acks_to = (axis2_char_t*)axis2_endpoint_ref_get_address(acks_to_epr, env);
}
else if(!acks_to)
{
acks_to = anon_uri;
}
if(!acks_to && is_svr_side)
{
reply_to_epr_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
rmd_sequence_id, SANDESHA2_SEQ_PROP_REPLY_TO_EPR);
if(reply_to_epr_bean)
{
acks_to = sandesha2_seq_property_bean_get_value(reply_to_epr_bean, env);
}
}
/**
* else if()
* TODO handle acks_to == anon_uri case
*/
status = sandesha2_app_msg_processor_send_create_seq_msg(env, rm_msg_ctx,
internal_sequence_id, acks_to, storage_mgr, seq_prop_mgr, create_seq_mgr,
sender_mgr);
if(reply_to_epr_bean)
{
sandesha2_seq_property_bean_free(reply_to_epr_bean, env);
}
if(AXIS2_SUCCESS != status)
{
/* Pause the message contex so that it won't be sent at transport sender */
axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Could not send create sequence message");
if(internal_sequence_id)
{
AXIS2_FREE(env->allocator, internal_sequence_id);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
return status;
}
}
sandesha2_seq_property_bean_free(create_seq_added, env);
}
soap_env = sandesha2_msg_ctx_get_soap_envelope(rm_msg_ctx, env);
if(!soap_env)
{
soap_env = axiom_soap_envelope_create_default_soap_envelope(env,
AXIOM_SOAP12);
sandesha2_msg_ctx_set_soap_envelope(rm_msg_ctx, env, soap_env);
}
if(!sandesha2_msg_ctx_get_msg_id(rm_msg_ctx, env))
{
axis2_char_t *msg_id = NULL;
msg_id = axutil_uuid_gen(env);
sandesha2_msg_ctx_set_msg_id(rm_msg_ctx, env, msg_id);
}
if(is_svr_side)
{
/* Let the request end with 202 if a ack has not been
* written in the incoming thread
*/
axis2_ctx_t *ctx = NULL;
axis2_char_t *written = NULL;
ctx = axis2_op_ctx_get_base(op_ctx, env);
property = axis2_ctx_get_property(ctx, env, SANDESHA2_ACK_WRITTEN);
if(property)
{
written = axutil_property_get_value(property, env);
}
if(!written || axutil_strcmp(written, AXIS2_VALUE_TRUE))
{
if (op_ctx)
{
axis2_op_ctx_set_response_written(op_ctx, env, AXIS2_TRUE);
}
}
}
op_name = axutil_qname_get_localpart(axis2_op_get_qname( axis2_op_ctx_get_op(
axis2_msg_ctx_get_op_ctx(msg_ctx, env), env), env), env);
if (to_epr)
{
to_addr = (axis2_char_t*)axis2_endpoint_ref_get_address(to_epr, env);
}
if(!axis2_msg_ctx_get_wsa_action(msg_ctx, env))
{
axis2_msg_ctx_set_wsa_action(msg_ctx, env, to_addr);
}
if(!axis2_msg_ctx_get_soap_action(msg_ctx, env))
{
axutil_string_t *soap_action = axutil_string_create(env, to_addr);
if(soap_action)
{
axis2_msg_ctx_set_soap_action(msg_ctx, env, soap_action);
axutil_string_free(soap_action, env);
}
}
if(!dummy_msg)
{
storage_key = axutil_uuid_gen(env);
status = sandesha2_app_msg_processor_send_app_msg(env, rm_msg_ctx, internal_sequence_id,
msg_number, storage_key, storage_mgr, create_seq_mgr, seq_prop_mgr, sender_mgr);
if(storage_key)
{
AXIS2_FREE(env->allocator, storage_key);
}
}
if(axis2_msg_ctx_get_server_side(msg_ctx, env))
{
axis2_core_utils_reset_out_msg_ctx(env, msg_ctx);
}
axis2_msg_ctx_set_paused(msg_ctx, env, AXIS2_TRUE);
if(internal_sequence_id)
{
AXIS2_FREE(env->allocator, internal_sequence_id);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_process_out_msg");
return status;
}
static axis2_bool_t AXIS2_CALL
sandesha2_app_msg_processor_msg_num_is_in_list(
const axutil_env_t *env,
axis2_char_t *str_list,
long num)
{
axutil_array_list_t *list = NULL;
axis2_char_t str_long[32];
axis2_bool_t ret = AXIS2_FALSE;
AXIS2_PARAM_CHECK(env->error, str_list, AXIS2_FALSE);
sprintf(str_long, "%ld", num);
list = sandesha2_utils_get_array_list_from_string(env, str_list);
if(list)
{
int i = 0, size = 0;
if(axutil_array_list_contains(list, env, str_long))
{
ret = AXIS2_TRUE;
}
size = axutil_array_list_size(list, env);
for(i = 0; i < size; i++)
{
axis2_char_t *str = axutil_array_list_get(list, env, i);
if(str)
{
AXIS2_FREE(env->allocator, str);
str = NULL;
}
}
axutil_array_list_free(list, env);
}
return ret;
}
axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_send_ack_if_reqd(
const axutil_env_t *env,
sandesha2_msg_ctx_t *rm_msg_ctx,
axis2_char_t *msg_str,
axis2_char_t *rmd_sequence_id,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_sender_mgr_t *sender_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
int mep)
{
const axis2_char_t *reply_to_addr = NULL;
sandesha2_seq_property_bean_t *acks_to_bean = NULL;
axis2_char_t *acks_to_str = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
sandesha2_ack_requested_t *ack_requested = NULL;
sandesha2_msg_ctx_t *ack_rm_msg_ctx = NULL;
axis2_msg_ctx_t *ack_msg_ctx = NULL;
axis2_msg_ctx_t *msg_ctx = NULL;
axis2_endpoint_ref_t *reply_to_epr = NULL;
long send_time = -1;
axis2_char_t *key = NULL;
axutil_property_t *property = NULL;
sandesha2_sender_bean_t *ack_bean = NULL;
sandesha2_seq_property_bean_t *rms_internal_sequence_bean = NULL;
axis2_char_t *internal_sequence_id = NULL;
axis2_bool_t sent = AXIS2_TRUE;
axis2_op_ctx_t *op_ctx = NULL;
axis2_op_t *op = NULL;
axis2_char_t *rm_version = NULL;
axis2_bool_t one_way = AXIS2_FALSE;
axis2_bool_t is_anonymous_reply_to = AXIS2_FALSE;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[Sandesha2] Entry:sandesha2_app_msg_processor_send_ack_if_reqd");
AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, msg_str, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_prop_mgr, AXIS2_FAILURE);
msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
acks_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_ACKS_TO_EPR);
if(acks_to_bean)
{
acks_to_str = axutil_strdup(env, sandesha2_seq_property_bean_get_value(acks_to_bean, env));
sandesha2_seq_property_bean_free(acks_to_bean, env);
}
else
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] acknowledgment bean is null");
return AXIS2_FAILURE;
}
reply_to_epr = axis2_msg_ctx_get_reply_to(msg_ctx, env);
if(reply_to_epr)
{
reply_to_addr = axis2_endpoint_ref_get_address(reply_to_epr, env);
}
op_ctx = axis2_msg_ctx_get_op_ctx(msg_ctx, env);
if(op_ctx && mep == -1)
{
op = axis2_op_ctx_get_op(op_ctx, env);
mep = axis2_op_get_axis_specific_mep_const(op, env);
}
one_way = AXIS2_MEP_CONSTANT_IN_ONLY == mep;
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "MEP:%d", mep);
rm_version = sandesha2_utils_get_rm_version(env, rmd_sequence_id, seq_prop_mgr);
if(!rm_version)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2]Unable to find RM spec version for seq_id %s", rmd_sequence_id);
if(acks_to_str)
{
AXIS2_FREE(env->allocator, acks_to_str);
}
return AXIS2_FAILURE;
}
is_anonymous_reply_to = !reply_to_addr || (reply_to_addr && sandesha2_utils_is_anon_uri(env,
reply_to_addr));
if(sandesha2_utils_is_anon_uri(env, acks_to_str) && is_anonymous_reply_to && !one_way)
{
/* This means acknowledgment address is anomymous. Flow comes to this block only in the
* server side. In other words this is replay model in application server side. In this case
* we do not send the acknowledgment message here. Instead we send it in the message out path.
* See sandesha2_app_msg_processor_send_app_msg() code.
*/
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] RM 1.0 replay model in application server side");
if(acks_to_str)
{
AXIS2_FREE(env->allocator, acks_to_str);
}
if(rm_version)
{
AXIS2_FREE(env->allocator, rm_version);
}
return AXIS2_SUCCESS;
}
if(acks_to_str)
{
AXIS2_FREE(env->allocator, acks_to_str);
}
if(rm_version)
{
AXIS2_FREE(env->allocator, rm_version);
}
conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
if(!conf_ctx)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] cont_ctx is NULL");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_CONF_CTX_NULL, AXIS2_FAILURE);
return AXIS2_FAILURE;
}
ack_requested = sandesha2_msg_ctx_get_ack_requested(rm_msg_ctx, env);
if(ack_requested)
{
sandesha2_ack_requested_set_must_understand(ack_requested, env, AXIS2_FALSE);
sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
}
ack_rm_msg_ctx = sandesha2_ack_mgr_generate_ack_msg(env, rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
ack_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(ack_rm_msg_ctx, env);
/* If it is not one way message we piggyback the acknowledgment messages on the application messages
* or terminate message. So here we store them in the storage so that when the application/terminate
* message sent it pick it up from the storage to piggyback. See app_msg_send() function.
*/
if(!one_way)
{
axis2_relates_to_t *relates_to = NULL;
const axis2_char_t *related_msg_id = NULL;
axis2_char_t *outgoing_seq_id = NULL;
sandesha2_seq_property_bean_t *relates_to_bean = NULL;
sandesha2_seq_property_bean_t *outgoing_seq_id_bean = NULL;
rms_internal_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
rmd_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_INTERNAL_SEQ_ID);
if(rms_internal_sequence_bean)
{
internal_sequence_id = sandesha2_seq_property_bean_get_value(rms_internal_sequence_bean, env);
}
key = axutil_uuid_gen(env);
ack_bean = sandesha2_sender_bean_create(env);
/* To find the outgoing sequence id we use the related message sent. We face this problem of
* finding the outgoing sequence id only in the application client side. As a solution when
* messages are sent from the application client side we store the
* SANDESHA2_SEQ_PROP_RELATED_MSG_ID property which can be used to retrieve the outgoing
* sequence id as follows.
*/
relates_to = axis2_msg_ctx_get_relates_to(msg_ctx, env);
if(relates_to)
{
related_msg_id = axis2_relates_to_get_value(relates_to, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "related_msg_id:%s", related_msg_id);
relates_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
related_msg_id, SANDESHA2_SEQ_PROP_RELATED_MSG_ID);
if(relates_to_bean)
{
outgoing_seq_id = sandesha2_seq_property_bean_get_value(relates_to_bean, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "outgoing_seq_id:%s", outgoing_seq_id);
sandesha2_sender_bean_set_seq_id(ack_bean, env, outgoing_seq_id);
sandesha2_seq_property_mgr_remove(seq_prop_mgr, env, (axis2_char_t *) related_msg_id,
SANDESHA2_SEQ_PROP_RELATED_MSG_ID);
}
}
if(!outgoing_seq_id)
{
axis2_char_t *outgoing_internal_seq_id = NULL;
axis2_char_t *outgoing_seq_id = NULL;
outgoing_internal_seq_id = sandesha2_utils_get_internal_sequence_id(env, rmd_sequence_id);
outgoing_seq_id_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
outgoing_internal_seq_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
if(outgoing_seq_id_bean)
{
outgoing_seq_id = sandesha2_seq_property_bean_get_value(outgoing_seq_id_bean, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "outgoing_seq_id:%s", outgoing_seq_id);
sandesha2_sender_bean_set_seq_id(ack_bean, env, outgoing_seq_id);
}
}
/* Store the sender bean for the acknowledgement message which can be used later to find and
* retrieve the acknowledgment message context from storage for piggybacking purposes.
*/
sandesha2_sender_bean_set_msg_ctx_ref_key(ack_bean, env, key);
send_time = sandesha2_utils_get_current_time_in_millis(env);
sandesha2_sender_bean_set_time_to_send(ack_bean, env, send_time);
sandesha2_sender_bean_set_msg_id(ack_bean, env, sandesha2_msg_ctx_get_msg_id(ack_rm_msg_ctx, env));
sandesha2_sender_bean_set_send(ack_bean, env, AXIS2_TRUE);
sandesha2_sender_bean_set_internal_seq_id(ack_bean, env, internal_sequence_id);
/*sandesha2_sender_bean_set_seq_id(ack_bean, env, rmd_sequence_id);*/
sandesha2_sender_bean_set_msg_type(ack_bean, env, SANDESHA2_MSG_TYPE_ACK);
sandesha2_sender_bean_set_resend(ack_bean, env, AXIS2_FALSE);
sandesha2_sender_mgr_insert(sender_mgr, env, ack_bean);
if(relates_to_bean)
{
sandesha2_seq_property_bean_free(relates_to_bean, env);
}
if(outgoing_seq_id_bean)
{
sandesha2_seq_property_bean_free(outgoing_seq_id_bean, env);
}
if(rms_internal_sequence_bean)
{
sandesha2_seq_property_bean_free(rms_internal_sequence_bean, env);
}
if(ack_bean)
{
sandesha2_sender_bean_free(ack_bean, env);
}
property = axutil_property_create_with_args(env, 0, AXIS2_TRUE, 0, key);
axis2_msg_ctx_set_property(ack_msg_ctx, env, SANDESHA2_MESSAGE_STORE_KEY, property);
}
/* If it is one way message in server side this is the only place we can send the acknowledgment.
* In all other cases we do not send the acknowledgment directly, but piggyback it on application
* messages or terminate sequence message.
*/
if(ack_rm_msg_ctx && one_way)
{
axis2_engine_t *engine = NULL;
engine = axis2_engine_create(env, conf_ctx);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Back channel is free");
sent = axis2_engine_send(engine, env, ack_msg_ctx);
if(engine)
{
axis2_engine_free(engine, env);
}
/* Reset the message context to avoid double freeing of transport out stream */
if(ack_msg_ctx)
{
axis2_core_utils_reset_out_msg_ctx(env, ack_msg_ctx);
}
}
/* Store the acknowledgement message context. */
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, key, ack_msg_ctx, AXIS2_FALSE);
if(ack_rm_msg_ctx)
{
sandesha2_msg_ctx_free(ack_rm_msg_ctx, env);
}
/* Since we have stored this in storage and when piggybacking it is only taken from storage
* we can free this now.
*/
if(ack_msg_ctx)
{
axis2_endpoint_ref_t *temp_epr = NULL;
temp_epr = axis2_msg_ctx_get_to(ack_msg_ctx, env);
if(temp_epr)
{
axis2_endpoint_ref_free(temp_epr, env);
}
axis2_core_utils_reset_out_msg_ctx(env, ack_msg_ctx);
axis2_msg_ctx_free(ack_msg_ctx, env);
}
if(!sent)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[Sandesha2] Engine Send failed");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_SENDING_ACK, AXIS2_FAILURE);
return AXIS2_FAILURE;
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[Sandesha2] Exit:sandesha2_app_msg_processor_send_ack_if_reqd");
return AXIS2_SUCCESS;
}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_send_create_seq_msg(
const axutil_env_t *env,
sandesha2_msg_ctx_t *rm_msg_ctx,
axis2_char_t *internal_sequence_id,
axis2_char_t *acks_to,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
sandesha2_create_seq_mgr_t *create_seq_mgr,
sandesha2_sender_mgr_t *sender_mgr)
{
axis2_msg_ctx_t *msg_ctx = NULL;
sandesha2_create_seq_t *create_seq_part = NULL;
sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
sandesha2_msg_ctx_t *create_seq_rm_msg_ctx = NULL;
sandesha2_seq_offer_t *seq_offer = NULL;
axis2_msg_ctx_t *create_seq_msg_ctx = NULL;
sandesha2_create_seq_bean_t *create_seq_bean = NULL;
axis2_char_t *addr_ns_uri = NULL;
axis2_char_t *anon_uri = NULL;
axis2_char_t *create_sequence_msg_store_key = NULL;
axis2_transport_out_desc_t *transport_out = NULL;
axis2_transport_sender_t *transport_sender = NULL;
AXIS2_TRANSPORT_ENUMS transport = -1;
axis2_engine_t *engine = NULL;
axis2_op_t *create_seq_op = NULL;
axis2_status_t status = AXIS2_FAILURE;
axis2_bool_t continue_sending = AXIS2_TRUE;
long retrans_interval = -1;
sandesha2_property_bean_t *property_bean = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_char_t *msg_id = NULL;
sandesha2_sender_bean_t *create_sequence_sender_bean = NULL;
long millisecs = 0;
sandesha2_seq_property_bean_t *reply_to_bean = NULL;
axis2_char_t *reply_to_addr = NULL;
axis2_char_t *rm_version = NULL;
axis2_bool_t is_svr_side = AXIS2_FALSE;
axis2_op_ctx_t *temp_op_ctx = NULL;
axis2_listener_manager_t *listener_manager = NULL;
axis2_svc_t *svc = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[Sandesha2]Entry:sandesha2_app_msg_processor_send_create_seq_msg");
AXIS2_PARAM_CHECK(env->error, rm_msg_ctx, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, internal_sequence_id, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, acks_to, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, storage_mgr, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, create_seq_mgr, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_prop_mgr, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, sender_mgr, AXIS2_FAILURE);
msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
create_seq_rm_msg_ctx = sandesha2_msg_creator_create_create_seq_msg(env, rm_msg_ctx, internal_sequence_id,
acks_to, seq_prop_mgr);
if(!create_seq_rm_msg_ctx)
{
return AXIS2_FAILURE;
}
svc = axis2_msg_ctx_get_svc(msg_ctx, env);
if(!svc)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Service is NULL");
return AXIS2_FAILURE;
}
property_bean = sandesha2_utils_get_property_bean(env, svc);
if(!property_bean)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Property bean is NULL");
return AXIS2_FAILURE;
}
retrans_interval = sandesha2_property_bean_get_retrans_interval(property_bean, env);
/* If this is a one way message and if use_separate_listener property is set to true we need to
* start a listener manager so that create sequence response could be listened at. Note that
* this mechanism need to be improved later as currently there is no way to stop the listner.
*/
temp_op_ctx = axis2_msg_ctx_get_op_ctx(msg_ctx, env);
if(temp_op_ctx)
{
const axis2_char_t *mep = NULL;
axis2_op_t *op = NULL;
op = axis2_op_ctx_get_op(temp_op_ctx, env);
mep = axis2_op_get_msg_exchange_pattern(op, env);
if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_ONLY) ||
!axutil_strcmp(mep, AXIS2_MEP_URI_ROBUST_OUT_ONLY))
{
axis2_char_t *use_separate_listener = NULL;
axutil_property_t *property = NULL;
property = axis2_msg_ctx_get_property(msg_ctx, env, AXIS2_USE_SEPARATE_LISTENER);
if(property)
{
use_separate_listener = axutil_property_get_value(property, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] use_separate_listener:%s",
use_separate_listener);
if(!axutil_strcmp(AXIS2_VALUE_TRUE, use_separate_listener))
{
axis2_transport_out_desc_t *transport_out_desc = NULL;
transport_out_desc = axis2_msg_ctx_get_transport_out_desc(msg_ctx, env);
if(transport_out_desc)
{
transport = axis2_transport_out_desc_get_enum(transport_out_desc, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] transport:%d", transport);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Starting listener manager");
listener_manager = axis2_listener_manager_create(env);
/* TODO Need to call axis2_listener_manager_stop and clean listener manager */
status = axis2_listener_manager_make_sure_started(listener_manager, env,
transport, conf_ctx);
if(AXIS2_SUCCESS != status)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Starting listener manager failed");
return AXIS2_FAILURE;
}
/* Following sleep is required to ensure the listner is ready to receive response.
* If it is missing, the response gets lost. */
AXIS2_USLEEP(1);
}
}
}
}
}
sandesha2_msg_ctx_set_flow(create_seq_rm_msg_ctx, env, SANDESHA2_MSG_CTX_OUT_FLOW);
create_seq_part = sandesha2_msg_ctx_get_create_seq(create_seq_rm_msg_ctx, env);
{
sandesha2_seq_property_bean_t *to_epr_bean = NULL;
axis2_endpoint_ref_t *to_epr = axis2_msg_ctx_get_to(msg_ctx, env);
if(to_epr)
{
axis2_char_t *to_str = (axis2_char_t *)axis2_endpoint_ref_get_address(to_epr, env);
to_epr_bean = sandesha2_seq_property_bean_create_with_data(env, internal_sequence_id,
SANDESHA2_SEQ_PROP_TO_EPR, to_str);
if(to_epr_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, to_epr_bean);
sandesha2_seq_property_bean_free(to_epr_bean, env);
}
}
}
seq_offer = sandesha2_create_seq_get_seq_offer(create_seq_part, env);
if(seq_offer)
{
axis2_char_t *seq_offer_id = NULL;
sandesha2_seq_property_bean_t *offer_seq_bean = NULL;
seq_offer_id = sandesha2_identifier_get_identifier(sandesha2_seq_offer_get_identifier(
seq_offer, env), env);
offer_seq_bean = sandesha2_seq_property_bean_create(env);
sandesha2_seq_property_bean_set_name(offer_seq_bean, env, SANDESHA2_SEQ_PROP_OFFERED_SEQ);
sandesha2_seq_property_bean_set_seq_id(offer_seq_bean, env, internal_sequence_id);
sandesha2_seq_property_bean_set_value(offer_seq_bean, env, seq_offer_id);
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, offer_seq_bean);
sandesha2_seq_property_bean_free(offer_seq_bean, env);
}
create_seq_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(create_seq_rm_msg_ctx, env);
if(!create_seq_msg_ctx)
{
if(create_seq_rm_msg_ctx)
{
sandesha2_msg_ctx_free(create_seq_rm_msg_ctx, env);
}
if(listener_manager)
{
axis2_listener_manager_stop(listener_manager, env, transport);
axis2_listener_manager_free(listener_manager, env);
}
return AXIS2_FAILURE;
}
axis2_msg_ctx_set_relates_to(create_seq_msg_ctx, env, NULL);
/* Create sequence message created here will be used by create sequence response message processor
* to retrieve message id
*/
create_sequence_msg_store_key = axutil_uuid_gen(env);
create_seq_bean = sandesha2_create_seq_bean_create_with_data(env, internal_sequence_id,
(axis2_char_t*)axis2_msg_ctx_get_wsa_message_id(create_seq_msg_ctx, env), NULL);
if(create_seq_bean)
{
sandesha2_create_seq_bean_set_ref_msg_store_key(create_seq_bean, env, create_sequence_msg_store_key);
sandesha2_create_seq_mgr_insert(create_seq_mgr, env, create_seq_bean);
sandesha2_create_seq_bean_free(create_seq_bean, env);
}
addr_ns_uri = sandesha2_utils_get_seq_property(env, internal_sequence_id,
SANDESHA2_SEQ_PROP_ADDRESSING_NAMESPACE_VALUE, seq_prop_mgr);
anon_uri = sandesha2_spec_specific_consts_get_anon_uri(env, addr_ns_uri);
if(addr_ns_uri)
{
AXIS2_FREE(env->allocator, addr_ns_uri);
}
if(!axis2_msg_ctx_get_reply_to(create_seq_msg_ctx, env))
{
axis2_endpoint_ref_t *cs_epr = NULL;
cs_epr = axis2_endpoint_ref_create(env, anon_uri);
axis2_msg_ctx_set_reply_to(create_seq_msg_ctx, env, cs_epr);
}
/* Create and store create sequence sender bean. This will be used later to find and retrieve
* create sequence message context stored in the storage.
*/
create_sequence_sender_bean = sandesha2_sender_bean_create(env);
sandesha2_sender_bean_set_msg_ctx_ref_key(create_sequence_sender_bean, env, create_sequence_msg_store_key);
millisecs = sandesha2_utils_get_current_time_in_millis(env);
sandesha2_sender_bean_set_time_to_send(create_sequence_sender_bean, env, millisecs);
msg_id = sandesha2_msg_ctx_get_msg_id(create_seq_rm_msg_ctx, env);
sandesha2_sender_bean_set_msg_id(create_sequence_sender_bean, env, msg_id);
sandesha2_sender_bean_set_internal_seq_id(create_sequence_sender_bean, env, internal_sequence_id);
sandesha2_sender_bean_set_send(create_sequence_sender_bean, env, AXIS2_TRUE);
sandesha2_sender_bean_set_msg_type(create_sequence_sender_bean, env, SANDESHA2_MSG_TYPE_CREATE_SEQ);
sandesha2_sender_mgr_insert(sender_mgr, env, create_sequence_sender_bean);
conf_ctx = axis2_msg_ctx_get_conf_ctx(create_seq_msg_ctx, env);
engine = axis2_engine_create(env, conf_ctx);
if(create_seq_rm_msg_ctx)
{
sandesha2_msg_ctx_free(create_seq_rm_msg_ctx, env);
}
create_seq_op = axis2_msg_ctx_get_op(create_seq_msg_ctx, env);
transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
reply_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQ_PROP_REPLY_TO_EPR);
if(reply_to_bean)
{
reply_to_addr = sandesha2_seq_property_bean_get_value(reply_to_bean, env);
}
rm_version = sandesha2_utils_get_rm_version(env, internal_sequence_id, seq_prop_mgr);
if(!rm_version)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Unable to find RM spec version for the rms internal_sequence_id %s",
internal_sequence_id);
if(listener_manager)
{
axis2_listener_manager_stop(listener_manager, env, transport);
axis2_listener_manager_free(listener_manager, env);
}
return AXIS2_FAILURE;
}
is_svr_side = axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env);
/* If client side and in case of one of the following
* 1. listener_manager is not NULL
* 2. reply_to_addr is NULL
* 3. reply_to_addr is anonymous
* go into the following loop.
*/
if(!is_svr_side && (listener_manager || !reply_to_addr || sandesha2_utils_is_anon_uri(env,
reply_to_addr)))
{
/* Store the create sequence message context in the storage */
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, create_sequence_msg_store_key,
create_seq_msg_ctx, AXIS2_TRUE);
AXIS2_FREE(env->allocator, create_sequence_msg_store_key);
if(axis2_engine_send(engine, env, create_seq_msg_ctx))
{
if(!axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env))
{
status = sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg_ctx,
storage_mgr);
}
}
else
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Engine Send failed");
}
if(engine)
{
axis2_engine_free(engine, env);
}
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
while(!rms_sequence_bean)
{
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, create_sequence_sender_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr, svc);
sandesha2_sender_mgr_update(sender_mgr, env, create_sequence_sender_bean);
if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Do not continue sending the create sequence message");
status = AXIS2_FAILURE;
break;
}
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Sleep before resending application message");
AXIS2_SLEEP(retrans_interval);
if(transport_sender)
{
/* This is neccessary to avoid a double free */
axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, create_seq_msg_ctx))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Transport sender invoke failed");
}
}
if(!axis2_msg_ctx_get_server_side(create_seq_msg_ctx, env))
{
status = sandesha2_app_msg_processor_process_create_seq_response(env, create_seq_msg_ctx,
storage_mgr);
if(AXIS2_SUCCESS != status)
{
break;
}
}
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
}
if(rms_sequence_bean)
{
sandesha2_seq_property_bean_free(rms_sequence_bean, env);
}
if(create_sequence_sender_bean)
{
sandesha2_sender_bean_free(create_sequence_sender_bean, env);
}
/* We have created this message context using sandesha2_utils_create_new_related_msg_ctx(). It is our
* reponsiblity to free if after use.
*/
if(create_seq_msg_ctx)
{
axis2_msg_ctx_free(create_seq_msg_ctx, env);
}
}
else /* Dual channel */
{
/* This is actually a trick that get the msg_ctx traversed through all the out phases.
* Once all the phases are passed it will get hit into the false sandesha2 transport
* sender which just reset the original transport sender back.
*/
axutil_property_t *property = NULL;
axis2_transport_out_desc_t *orig_transport_out = NULL;
axis2_transport_out_desc_t *sandesha2_transport_out = NULL;
orig_transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
property = axutil_property_create_with_args(env, 0, 0, 0, orig_transport_out);
axis2_msg_ctx_set_property(create_seq_msg_ctx, env, SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC,
property);
sandesha2_transport_out = sandesha2_utils_get_transport_out(env);
axis2_msg_ctx_set_transport_out_desc(create_seq_msg_ctx, env, sandesha2_transport_out);
if(!axis2_engine_send(engine, env, create_seq_msg_ctx))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Engine Send failed");
}
if(engine)
{
axis2_engine_free(engine, env);
}
/* Store the create sequence message context in the storage */
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, create_sequence_msg_store_key,
create_seq_msg_ctx, AXIS2_TRUE);
AXIS2_FREE(env->allocator, create_sequence_msg_store_key);
/*rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);*/
/* In dual channel create sequence message is sent in a separate thread. This thread will
* run until create sequence response message is received or timeout or re-sends
* exceed the maximum number of re-sends as specified in Policy.
*/
status = sandesha2_app_msg_processor_start_create_seq_msg_resender(env, conf_ctx,
internal_sequence_id, msg_id, is_svr_side, retrans_interval);
}
if(rm_version)
{
AXIS2_FREE(env->allocator, rm_version);
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[Sandesha2]Exit:sandesha2_app_msg_processor_send_create_seq_msg");
return status;
}
static axis2_status_t
sandesha2_app_msg_processor_start_create_seq_msg_resender(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
axis2_char_t *internal_sequence_id,
axis2_char_t *msg_id,
const axis2_bool_t is_server_side,
int retrans_interval)
{
axutil_thread_t *worker_thread = NULL;
sandesha2_app_msg_processor_args_t *args = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_start_create_seq_msg_resender");
axutil_allocator_switch_to_global_pool(env->allocator);
args = sandesha2_app_msg_processor_args_create((axutil_env_t *) env, conf_ctx, internal_sequence_id,
msg_id, is_server_side, retrans_interval, NULL);
args->env = axutil_init_thread_env(env);
worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
sandesha2_app_msg_processor_create_seq_msg_worker_function, (void*)args);
if(!worker_thread)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_create_seq_msg_resender");
axutil_allocator_switch_to_local_pool(env->allocator);
return AXIS2_FAILURE;
}
axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_start_create_seq_msg_resender");
return AXIS2_SUCCESS;
}
static void * AXIS2_THREAD_FUNC
sandesha2_app_msg_processor_create_seq_msg_worker_function(
axutil_thread_t *thd,
void *data)
{
sandesha2_app_msg_processor_args_t *args;
axutil_env_t *env = NULL;
sandesha2_storage_mgr_t *storage_mgr = NULL;
sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
sandesha2_sender_mgr_t *sender_mgr = NULL;
int retrans_interval = 0;
axis2_char_t *dbname = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_char_t *internal_sequence_id = NULL;
axis2_bool_t is_server_side = AXIS2_FALSE;
axis2_char_t *msg_id = NULL;
/* sandesha2_seq_property_bean_t *rms_sequence_bean = NULL; */
axis2_bool_t continue_sending = AXIS2_TRUE;
axis2_transport_out_desc_t *transport_out = NULL;
axis2_transport_sender_t *transport_sender = NULL;
axis2_msg_ctx_t *create_seq_msg_ctx = NULL;
sandesha2_sender_bean_t *find_sender_bean = NULL;
sandesha2_sender_bean_t *sender_bean = NULL;
axis2_svc_t *svc = NULL;
args = (sandesha2_app_msg_processor_args_t*) data;
env = args->env;
axutil_allocator_switch_to_global_pool(env->allocator);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_create_seq_msg_worker_function");
conf_ctx = args->conf_ctx;
msg_id = args->msg_id;
internal_sequence_id = args->internal_sequence_id;
is_server_side = args->is_server_side;
retrans_interval = args->retrans_interval;
dbname = sandesha2_util_get_dbname(env, conf_ctx);
storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
find_sender_bean = sandesha2_sender_bean_create(env);
sandesha2_sender_bean_set_msg_type(find_sender_bean, env, SANDESHA2_MSG_TYPE_CREATE_SEQ);
sandesha2_sender_bean_set_internal_seq_id(find_sender_bean, env, internal_sequence_id);
sandesha2_sender_bean_set_send(find_sender_bean, env, AXIS2_TRUE);
sender_bean = sandesha2_sender_mgr_find_unique(sender_mgr, env, find_sender_bean);
while(sender_bean)
{
axis2_char_t *key = NULL;
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Sender bean found");
key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
if(!create_seq_msg_ctx)
{
create_seq_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key,
conf_ctx, AXIS2_TRUE);
transport_out = axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env);
transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
svc = axis2_msg_ctx_get_svc(create_seq_msg_ctx, env);
if(!svc)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Service is NULL");
AXIS2_ERROR_SET(env->error, AXIS2_ERROR_SVC_OR_OP_NOT_FOUND, AXIS2_FAILURE);
break;
}
}
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_bean, conf_ctx,
storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr, svc);
sandesha2_sender_mgr_update(sender_mgr, env, sender_bean);
if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Do not continue sending the create sequence message");
break;
}
if(transport_sender)
{
/* This is neccessary to avoid a double free */
axis2_msg_ctx_set_property(create_seq_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, create_seq_msg_ctx))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Transport sender invoke failed");
}
}
sandesha2_sender_bean_free(sender_bean, env);
sender_bean = NULL;
sender_bean = sandesha2_sender_mgr_find_unique(sender_mgr, env, find_sender_bean);
if(sender_bean)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Sleep before resending application message");
AXIS2_SLEEP(retrans_interval);
}
}
if(find_sender_bean)
{
sandesha2_sender_bean_free(find_sender_bean, env);
}
if(create_seq_msg_ctx)
{
axis2_msg_ctx_free(create_seq_msg_ctx, env);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
sandesha2_app_msg_processor_args_free(args, env);
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_create_seq_msg_worker_function");
axutil_free_thread_env(env);
return NULL;
}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_create_seq_response(
const axutil_env_t *env,
axis2_msg_ctx_t *create_seq_msg_ctx,
sandesha2_storage_mgr_t *storage_mgr)
{
axis2_msg_ctx_t *response_msg_ctx = NULL;
axiom_soap_envelope_t *response_envelope = NULL;
axis2_char_t *soap_ns_uri = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_engine_t *engine = NULL;
axis2_status_t status = AXIS2_FAILURE;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_process_create_seq_response");
AXIS2_PARAM_CHECK(env->error, create_seq_msg_ctx, AXIS2_FAILURE);
conf_ctx = axis2_msg_ctx_get_conf_ctx(create_seq_msg_ctx, env);
soap_ns_uri = axis2_msg_ctx_get_is_soap_11(create_seq_msg_ctx, env) ?
AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI:
AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
response_envelope = axis2_msg_ctx_get_response_soap_envelope(create_seq_msg_ctx, env);
if(!response_envelope)
{
response_envelope = (axiom_soap_envelope_t *) axis2_http_transport_utils_create_soap_msg(env,
create_seq_msg_ctx, soap_ns_uri);
if(!response_envelope)
{
/* There is no response message context. */
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Response envelope not found");
return AXIS2_SUCCESS;
}
}
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Response envelope for CreateSequenceResponse message found");
response_msg_ctx = axis2_msg_ctx_create(env, conf_ctx,
axis2_msg_ctx_get_transport_in_desc(create_seq_msg_ctx, env),
axis2_msg_ctx_get_transport_out_desc(create_seq_msg_ctx, env));
axis2_msg_ctx_set_status_code (response_msg_ctx, env, axis2_msg_ctx_get_status_code (create_seq_msg_ctx, env));
/* Note that we set here as client side to indicate that we are in the application client side. */
axis2_msg_ctx_set_server_side(response_msg_ctx, env, AXIS2_FALSE);
axis2_msg_ctx_set_op_ctx(response_msg_ctx, env, axis2_msg_ctx_get_op_ctx(create_seq_msg_ctx, env));
axis2_msg_ctx_set_conf_ctx(response_msg_ctx, env, conf_ctx);
axis2_msg_ctx_set_svc_ctx(response_msg_ctx, env, axis2_msg_ctx_get_svc_ctx(create_seq_msg_ctx, env));
axis2_msg_ctx_set_svc_grp_ctx(response_msg_ctx, env, axis2_msg_ctx_get_svc_grp_ctx(create_seq_msg_ctx,
env));
axis2_msg_ctx_set_soap_envelope(response_msg_ctx, env, response_envelope);
engine = axis2_engine_create(env, conf_ctx);
if(engine)
{
if(sandesha2_util_is_fault_envelope(env, response_envelope))
{
status = axis2_engine_receive_fault(engine, env, response_msg_ctx);
}
else
{
/* Note that this engine flow does not end with an message receiver, because
* when hit sandesha2_create_seq_response_msg_processor_process_in_msg()
* function it pause message context at the end of the function.
*/
status = axis2_engine_receive(engine, env, response_msg_ctx);
}
axis2_engine_free(engine, env);
}
/* Note that as explained above this message context is not added to the operation context,
* therefore will not be freed when operation context's msg_ctx_map is freed. So we need to
* free the response message here.
*/
axis2_msg_ctx_free(response_msg_ctx, env);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_process_create_seq_response");
return status;
}
/*
* First
*/
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_send_app_msg(
const axutil_env_t *env,
sandesha2_msg_ctx_t *rm_msg_ctx,
axis2_char_t *internal_sequence_id,
long msg_num,
axis2_char_t *storage_key,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_create_seq_mgr_t *create_seq_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
sandesha2_sender_mgr_t *sender_mgr)
{
axis2_msg_ctx_t *app_msg_ctx = NULL;
sandesha2_seq_property_bean_t *to_bean = NULL;
sandesha2_seq_property_bean_t *reply_to_bean = NULL;
sandesha2_seq_property_bean_t *from_acks_to_bean = NULL;
axis2_endpoint_ref_t *to_epr = NULL;
axis2_endpoint_ref_t *reply_to_epr = NULL;
axis2_char_t *from_acks_to_addr = NULL;
axis2_char_t *to_addr = NULL;
axis2_char_t *reply_to_addr = NULL;
axis2_char_t *new_to_str = NULL;
sandesha2_seq_t *rm_sequence = NULL;
sandesha2_seq_t *req_seq = NULL;
axis2_char_t *rm_version = NULL;
axis2_char_t *rm_ns_val = NULL;
sandesha2_msg_number_t *msg_number = NULL;
axis2_msg_ctx_t *req_msg = NULL;
sandesha2_sender_bean_t *app_msg_sender_bean = NULL;
long millisecs = 0;
axis2_engine_t *engine = NULL;
axis2_char_t *msg_id = NULL;
axis2_bool_t last_msg = AXIS2_FALSE;
axis2_op_ctx_t *temp_op_ctx = NULL;
axis2_status_t status = AXIS2_SUCCESS;
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_bool_t is_svr_side = AXIS2_FALSE;
axis2_bool_t continue_sending = AXIS2_TRUE;
sandesha2_msg_ctx_t *req_rm_msg_ctx = NULL;
axis2_msg_ctx_t *req_msg_ctx = NULL;
axis2_op_ctx_t *op_ctx = NULL;
axis2_char_t *rmd_sequence_id = NULL;
long retrans_interval = 0;
axis2_conf_t *conf = NULL;
const axis2_char_t *mep = NULL;
axis2_relates_to_t *relates_to = NULL;
axis2_svc_t *svc = NULL;
sandesha2_property_bean_t *property_bean = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[Sandesha2] Entry:sandesha2_app_msg_processor_send_app_msg");
AXIS2_PARAM_CHECK(env->error, internal_sequence_id, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, storage_key, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, storage_mgr, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_prop_mgr, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, sender_mgr, AXIS2_FAILURE);
app_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
conf_ctx = axis2_msg_ctx_get_conf_ctx(app_msg_ctx, env);
svc = axis2_msg_ctx_get_svc(app_msg_ctx, env);
property_bean = sandesha2_utils_get_property_bean(env, svc);
if(!property_bean)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Property bean is NULL");
return AXIS2_FAILURE;
}
retrans_interval = sandesha2_property_bean_get_retrans_interval(property_bean, env);
relates_to = axis2_msg_ctx_get_relates_to(app_msg_ctx, env);
if(relates_to)
{
sandesha2_seq_property_bean_t *response_relates_to_bean = NULL;
const axis2_char_t *relates_to_value = axis2_relates_to_get_value(relates_to, env);
/* Store the related message id value of the out going applicatoin message. This value
* is used in the terminate sequence message processor at server side to find the
* highest outgoing message id related to the highestest incoming message id.
*/
response_relates_to_bean = sandesha2_seq_property_bean_create_with_data(env,
internal_sequence_id, SANDESHA2_SEQ_PROP_HIGHEST_OUT_RELATES_TO,
(axis2_char_t *) relates_to_value);
if(response_relates_to_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, response_relates_to_bean);
sandesha2_seq_property_bean_free(response_relates_to_bean, env);
}
}
/* Set the last out message number(This messages number). This is used in creating the terminate
* sequence message to include the last message number.
*/
sandesha2_app_msg_processor_set_last_out_msg_no(env, internal_sequence_id, msg_num, seq_prop_mgr);
to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQ_PROP_TO_EPR);
reply_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, internal_sequence_id,
SANDESHA2_SEQ_PROP_REPLY_TO_EPR);
if (to_bean)
{
to_addr = axutil_strdup(env, sandesha2_seq_property_bean_get_value(to_bean, env));
to_epr = axis2_endpoint_ref_create(env, to_addr);
sandesha2_seq_property_bean_free(to_bean, env);
}
if(reply_to_bean)
{
reply_to_addr = axutil_strdup(env, sandesha2_seq_property_bean_get_value(reply_to_bean, env));
reply_to_epr = axis2_endpoint_ref_create(env, reply_to_addr);
sandesha2_msg_ctx_set_reply_to(rm_msg_ctx, env, reply_to_epr);
sandesha2_seq_property_bean_free(reply_to_bean, env);
}
if(axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
axis2_endpoint_ref_t *reply_to = NULL;
req_msg = axis2_op_ctx_get_msg_ctx(axis2_msg_ctx_get_op_ctx(app_msg_ctx, env), env,
AXIS2_WSDL_MESSAGE_LABEL_IN);
if(req_msg)
{
reply_to = axis2_msg_ctx_get_reply_to(req_msg, env);
}
if(reply_to)
{
new_to_str = (axis2_char_t*)axis2_endpoint_ref_get_address(reply_to, env);
}
}
if(new_to_str)
{
axis2_endpoint_ref_t *temp_to_epr = NULL;
temp_to_epr = axis2_endpoint_ref_create(env, new_to_str);
if(to_epr)
{
axis2_endpoint_ref_free(to_epr, env);
}
}
else if (to_epr)
{
sandesha2_msg_ctx_set_to(rm_msg_ctx, env, to_epr);
}
rm_version = sandesha2_utils_get_rm_version(env, internal_sequence_id, seq_prop_mgr);
if(!rm_version)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Unable to find RM spec version for the rms internal_sequence_id %s",
internal_sequence_id);
if(to_addr)
{
AXIS2_FREE(env->allocator, to_addr);
}
if(reply_to_addr)
{
AXIS2_FREE(env->allocator, reply_to_addr);
}
return AXIS2_FAILURE;
}
rm_ns_val = sandesha2_spec_specific_consts_get_rm_ns_val(env, rm_version);
rm_sequence = sandesha2_seq_create(env, rm_ns_val);
msg_number = sandesha2_msg_number_create(env, rm_ns_val);
sandesha2_msg_number_set_msg_num(msg_number, env, msg_num);
sandesha2_seq_set_msg_num(rm_sequence, env, msg_number);
/* Setting the last message element in the sequence element if this is the last message */
if(axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
sandesha2_msg_ctx_t *req_rm_msg = NULL;
req_rm_msg = sandesha2_msg_init_init_msg(env, req_msg);
req_seq = sandesha2_msg_ctx_get_sequence(req_rm_msg, env);
if(!req_seq)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Sequence not found");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_NULL_SEQ, AXIS2_FAILURE);
if(rm_version)
{
AXIS2_FREE(env->allocator, rm_version);
}
if(req_rm_msg)
{
sandesha2_msg_ctx_free(req_rm_msg, env);
}
if(to_addr)
{
AXIS2_FREE(env->allocator, to_addr);
}
if(reply_to_addr)
{
AXIS2_FREE(env->allocator, reply_to_addr);
}
return AXIS2_FAILURE;
}
if(sandesha2_seq_get_last_msg(req_seq, env))
{
last_msg = AXIS2_TRUE;
sandesha2_seq_set_last_msg(rm_sequence, env, sandesha2_last_msg_create(env, rm_ns_val));
}
if(req_rm_msg)
{
sandesha2_msg_ctx_free(req_rm_msg, env);
}
}
else
{
axis2_op_ctx_t *op_ctx = NULL;
axutil_property_t *property = NULL;
op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
if(op_ctx)
{
property = axis2_msg_ctx_get_property(app_msg_ctx, env, SANDESHA2_CLIENT_LAST_MESSAGE);
if(property)
{
axis2_char_t *value = axutil_property_get_value(property, env);
if(value && !axutil_strcmp(value, AXIS2_VALUE_TRUE))
{
if(sandesha2_spec_specific_consts_is_last_msg_indicator_reqd(env, rm_version))
{
last_msg = AXIS2_TRUE;
sandesha2_seq_set_last_msg(rm_sequence, env, sandesha2_last_msg_create(env,
rm_ns_val));
}
}
}
}
}
op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
if(op_ctx)
{
req_msg_ctx = axis2_op_ctx_get_msg_ctx(op_ctx, env, AXIS2_WSDL_MESSAGE_LABEL_IN);
if(req_msg_ctx)
{
req_rm_msg_ctx = sandesha2_msg_init_init_msg(env, req_msg_ctx);
req_seq = sandesha2_msg_ctx_get_sequence(req_rm_msg_ctx, env);
}
}
if(req_seq)
{
rmd_sequence_id = sandesha2_identifier_get_identifier(sandesha2_seq_get_identifier(req_seq,
env), env);
}
if(rmd_sequence_id)
{
from_acks_to_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_ACKS_TO_EPR);
}
/* Decide if this is the RM 1.0 last message. If it is, store the message number which can be used
* in ack message processor and terminate sequence message processor to know if the RM1.0 last msg
* has arrived.
*/
sandesha2_app_msg_processor_is_last_out_msg(env, app_msg_ctx, rmd_sequence_id,
internal_sequence_id, msg_num, seq_prop_mgr);
if(from_acks_to_bean)
{
axis2_endpoint_ref_t *from_acks_to_epr = NULL;
from_acks_to_addr = axutil_strdup(env, sandesha2_seq_property_bean_get_value(from_acks_to_bean, env));
from_acks_to_epr = axis2_endpoint_ref_create(env, from_acks_to_addr);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "from_acks_to_address:%s", from_acks_to_addr);
if(from_acks_to_epr)
{
axis2_endpoint_ref_free(from_acks_to_epr, env);
}
sandesha2_seq_property_bean_free(from_acks_to_bean, env);
}
/* Store the sender bean for this applicatoin message. This sender bean is used to search and
* retrieve the application message from the storage later.
*/
app_msg_sender_bean = sandesha2_sender_bean_create(env);
sandesha2_sender_bean_set_internal_seq_id(app_msg_sender_bean, env, internal_sequence_id);
sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_sender_bean, env, storage_key);
millisecs = sandesha2_utils_get_current_time_in_millis(env);
sandesha2_sender_bean_set_time_to_send(app_msg_sender_bean, env, millisecs);
msg_id = sandesha2_msg_ctx_get_msg_id(rm_msg_ctx, env);
sandesha2_sender_bean_set_msg_id(app_msg_sender_bean, env, msg_id);
sandesha2_sender_bean_set_msg_no(app_msg_sender_bean, env, msg_num);
sandesha2_sender_bean_set_msg_type(app_msg_sender_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_sender_bean);
is_svr_side = axis2_msg_ctx_get_server_side(app_msg_ctx, env);
/*
* If server side and anonymous acknowledgment. In other words this is replay mode.
* Note that in this case to_addr is NULL. In duplex mode to_addr cannot be NULL. We send
* the response application message in the back channel.
*/
if(is_svr_side && sandesha2_utils_is_anon_uri(env, from_acks_to_addr) && (!to_addr ||
sandesha2_utils_is_anon_uri(env, to_addr)))
{
sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
axis2_char_t *rms_sequence_id = NULL;
sandesha2_identifier_t *identifier = NULL;
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
while(!rms_sequence_bean)
{
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Create sequence response has not yet arrived. So sleep");
AXIS2_SLEEP(1);
}
if(rms_sequence_bean)
{
rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean,
env));
sandesha2_seq_property_bean_free(rms_sequence_bean, env);
}
identifier = sandesha2_identifier_create(env, rm_ns_val);
sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
sandesha2_seq_set_identifier(rm_sequence, env, identifier);
/* Add the sequence element in to the envelope. */
sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, rm_sequence);
sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
/* TODO add_ack_requested */
/* Add the acknowledgment message into the envelope*/
sandesha2_msg_creator_add_ack_msg(env, rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
if(req_rm_msg_ctx)
{
sandesha2_msg_ctx_free(req_rm_msg_ctx, env);
}
engine = axis2_engine_create(env, conf_ctx);
status = axis2_engine_resume_send(engine, env, app_msg_ctx);
if(rm_version)
{
AXIS2_FREE(env->allocator, rm_version);
}
if(app_msg_sender_bean)
{
sandesha2_sender_bean_free(app_msg_sender_bean, env);
}
if(engine)
{
axis2_engine_free(engine, env);
}
if(to_addr)
{
AXIS2_FREE(env->allocator, to_addr);
}
if(reply_to_addr)
{
AXIS2_FREE(env->allocator, reply_to_addr);
}
if(from_acks_to_addr)
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
if(rms_sequence_id)
{
AXIS2_FREE(env->allocator, rms_sequence_id);
}
return status;
}
if(to_addr)
{
AXIS2_FREE(env->allocator, to_addr);
}
temp_op_ctx = axis2_msg_ctx_get_op_ctx(app_msg_ctx, env);
if(temp_op_ctx)
{
axis2_op_t *op = NULL;
op = axis2_op_ctx_get_op(temp_op_ctx, env);
mep = axis2_op_get_msg_exchange_pattern(op, env);
}
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, app_msg_sender_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr, svc);
if(app_msg_sender_bean)
{
sandesha2_sender_bean_free(app_msg_sender_bean, env);
}
if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Do not continue sending the message");
if(rm_version)
{
AXIS2_FREE(env->allocator, rm_version);
}
if(reply_to_addr)
{
AXIS2_FREE(env->allocator, reply_to_addr);
}
if(from_acks_to_addr)
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
return AXIS2_FAILURE;
}
axis2_msg_ctx_set_current_handler_index(app_msg_ctx, env,
axis2_msg_ctx_get_current_handler_index(app_msg_ctx, env) + 1);
conf = axis2_conf_ctx_get_conf(conf_ctx, env);
if(!is_svr_side && (!reply_to_addr || sandesha2_utils_is_anon_uri(env, reply_to_addr)))
{
/* Client side and oneway. We do not spawn new threads here but send the application
* message as the same thread as the application client thread. If the first send
* fails then we go into a loop and try resending until timeout or maximum number of times
* exceeded as specified in policy.
*/
axis2_transport_out_desc_t *transport_out = NULL;
axis2_transport_sender_t *transport_sender = NULL;
sandesha2_sender_bean_t *sender_bean = NULL;
sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
axis2_char_t *rms_sequence_id = NULL;
sandesha2_identifier_t *identifier = NULL;
sandesha2_seq_property_bean_t *relates_to_bean = NULL;
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
/* We will wait until the response for the create sequence message received. */
while(!rms_sequence_bean)
{
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Create sequence response has not yet arrived. So sleep");
AXIS2_SLEEP(1);
}
if(rms_sequence_bean)
{
rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean,
env));
sandesha2_seq_property_bean_free(rms_sequence_bean, env);
}
/* Store the outgoing sequence id using the message id of the applicatoin message. This is
* used in send_ack_if_reqd() function to determine the outgoing sequence id. Note that
* this is useful only in the application client side.
*/
relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id,
SANDESHA2_SEQ_PROP_RELATED_MSG_ID, rms_sequence_id);
if(relates_to_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, relates_to_bean);
sandesha2_seq_property_bean_free(relates_to_bean, env);
}
/* If mep is out-in we need to mark that this is replay mode. This is used in terminate
* manager.
*/
if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN))
{
sandesha2_seq_property_bean_t *replay_bean = NULL;
replay_bean = sandesha2_seq_property_bean_create_with_data(env, rms_sequence_id,
SANDESHA2_SEQ_PROP_REPLAY, NULL);
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, replay_bean);
if(replay_bean)
{
sandesha2_seq_property_bean_free(replay_bean, env);
}
}
/* Add the acknowledgement element into the soap envelope */
if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
{
sandesha2_ack_mgr_piggyback_acks_if_present(env, rms_sequence_id, rm_msg_ctx, storage_mgr,
seq_prop_mgr, sender_mgr);
}
identifier = sandesha2_identifier_create(env, rm_ns_val);
sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
sandesha2_seq_set_identifier(rm_sequence, env, identifier);
/* Add the sequence element into the soap envelope */
sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, rm_sequence);
sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
/* TODO add_ack_requested */
engine = axis2_engine_create(env, conf_ctx);
if(axis2_engine_resume_send(engine, env, app_msg_ctx))
{
if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
status = sandesha2_app_msg_processor_process_app_msg_response(env, app_msg_ctx);
}
}
else
{
AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] Engine resume send failed");
}
if(engine)
{
axis2_engine_free(engine, env);
}
/* If application client side and single channel, resend is done in the same
* thread as the application client.
*/
sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
if(!sender_bean)
{
/* There is no pending message to send. */
status = AXIS2_SUCCESS;
}
else
{
transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
if(transport_out)
{
transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
}
if(!transport_sender)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Transport sender could not be retrieved from transport_out");
status = AXIS2_FAILURE;
}
/* Loop until timeout or exceed specified number of resends */
while(AXIS2_TRUE && transport_sender)
{
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr, svc);
sandesha2_sender_mgr_update(sender_mgr, env, sender_bean);
if(sender_bean)
{
sandesha2_sender_bean_free(sender_bean, env);
}
if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Do not continue sending the application message");
break;
}
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Sleep before resending application message");
AXIS2_SLEEP(retrans_interval);
/* This is neccessary to avoid a double free */
axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Transport sender invoke failed in sending application message");
}
if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))
{
status = sandesha2_app_msg_processor_process_app_msg_response(env, app_msg_ctx);
if(AXIS2_SUCCESS != status)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Resend failed for message id %s in sequence %s", msg_id,
internal_sequence_id);
break;
}
}
sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
if(!sender_bean)
{
/* There is no pending message to send. So exit from the loop. */
break;
}
}
}
if(rm_version)
{
AXIS2_FREE(env->allocator, rm_version);
}
if(reply_to_addr)
{
AXIS2_FREE(env->allocator, reply_to_addr);
}
if(from_acks_to_addr)
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
if(rms_sequence_id)
{
AXIS2_FREE(env->allocator, rms_sequence_id);
}
return status;
}
else /* Sending in twoway. This could be in client or server. Sending always happen within a thread.*/
{
/* This is actually a trick that get the msg_ctx traversed through all the out phases.
* Once all the phases are passed it will get hit into the false sandesha2 transport
* sender which just reset the original transport sender back.
*/
axutil_property_t *property = NULL;
axis2_transport_out_desc_t *orig_transport_out = NULL;
axis2_transport_out_desc_t *sandesha2_transport_out = NULL;
orig_transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
property = axutil_property_create_with_args(env, 0, 0, 0, orig_transport_out);
axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_ORIGINAL_TRANSPORT_OUT_DESC,
property);
sandesha2_transport_out = sandesha2_utils_get_transport_out(env);
axis2_msg_ctx_set_transport_out_desc(app_msg_ctx, env, sandesha2_transport_out);
axis2_msg_ctx_increment_ref(app_msg_ctx, env);
engine = axis2_engine_create(env, conf_ctx);
if(!axis2_engine_resume_send(engine, env, app_msg_ctx))
{
AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] Engine resume send failed");
}
if(engine)
{
axis2_engine_free(engine, env);
}
/* Store the application message context. This ensures that message context is stored before
* trying to write it into the wire at transport. When the sender thread start it retrieve
* the message context from the storage and send it.
*/
sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, storage_key, app_msg_ctx, AXIS2_TRUE);
/* Start the application message sender. Here we spawn a thread and see whether acknowledgment
* has arrived through the sandesha2_sender_mgr_get_application_msg_to_send() function. If it
* has arrived exit from the thread. Otherwise retry until timeout or number of re-sends
* exceed the value specified in Policy.
*/
status = sandesha2_app_msg_processor_start_application_msg_resender(env, conf_ctx,
internal_sequence_id, msg_id, is_svr_side, retrans_interval, app_msg_ctx, rm_sequence);
}
if(rm_version)
{
AXIS2_FREE(env->allocator, rm_version);
}
if(reply_to_addr)
{
AXIS2_FREE(env->allocator, reply_to_addr);
}
if(from_acks_to_addr)
{
AXIS2_FREE(env->allocator, from_acks_to_addr);
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[Sandesha2] Exit:sandesha2_app_msg_processor_send_app_msg");
return status;
}
static axis2_status_t
sandesha2_app_msg_processor_start_application_msg_resender(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
axis2_char_t *internal_sequence_id,
axis2_char_t *msg_id,
const axis2_bool_t is_server_side,
int retrans_interval,
axis2_msg_ctx_t *app_msg_ctx,
sandesha2_seq_t *rm_sequence)
{
axutil_thread_t *worker_thread = NULL;
sandesha2_app_msg_processor_args_t *args = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_start_application_msg_resender");
axutil_allocator_switch_to_global_pool(env->allocator);
args = sandesha2_app_msg_processor_args_create((axutil_env_t *) env, conf_ctx, internal_sequence_id,
msg_id, is_server_side, retrans_interval, rm_sequence);
args->env = axutil_init_thread_env(env);
worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
sandesha2_app_msg_processor_application_msg_worker_function, (void*)args);
if(!worker_thread)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Thread creation failed for sandesha2_app_msg_processor_start_application_msg_resender");
axutil_allocator_switch_to_local_pool(env->allocator);
return AXIS2_FAILURE;
}
axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_start_application_msg_resender");
return AXIS2_SUCCESS;
}
static void * AXIS2_THREAD_FUNC
sandesha2_app_msg_processor_application_msg_worker_function(
axutil_thread_t *thd,
void *data)
{
sandesha2_app_msg_processor_args_t *args;
axutil_env_t *env = NULL;
sandesha2_storage_mgr_t *storage_mgr = NULL;
sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
sandesha2_create_seq_mgr_t *create_seq_mgr = NULL;
sandesha2_sender_mgr_t *sender_mgr = NULL;
int retrans_interval = 0;
axis2_char_t *dbname = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_char_t *internal_sequence_id = NULL;
axis2_bool_t is_server_side = AXIS2_FALSE;
sandesha2_sender_bean_t *sender_bean = NULL;
axis2_char_t *msg_id = NULL;
axis2_status_t status = AXIS2_FAILURE;
axis2_svc_t *svc = NULL;
axis2_char_t *key = NULL;
axis2_msg_ctx_t *app_msg_ctx = NULL;
sandesha2_seq_property_bean_t *rms_sequence_bean = NULL;
axis2_char_t *rms_sequence_id = NULL;
sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
sandesha2_identifier_t *identifier = NULL;
sandesha2_seq_property_bean_t *relates_to_bean = NULL;
axis2_char_t *rm_version = NULL;
axis2_char_t *rm_ns_val = NULL;
sandesha2_seq_t *rm_sequence = NULL;
args = (sandesha2_app_msg_processor_args_t*) data;
env = args->env;
axutil_allocator_switch_to_global_pool(env->allocator);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_application_msg_worker_function");
conf_ctx = args->conf_ctx;
rm_sequence = args->rm_sequence;
msg_id = args->msg_id;
internal_sequence_id = args->internal_sequence_id;
is_server_side = args->is_server_side;
retrans_interval = args->retrans_interval;
dbname = sandesha2_util_get_dbname(env, conf_ctx);
storage_mgr = sandesha2_utils_get_storage_mgr(env, dbname);
seq_prop_mgr = sandesha2_permanent_seq_property_mgr_create(env, dbname);
create_seq_mgr = sandesha2_permanent_create_seq_mgr_create(env, dbname);
sender_mgr = sandesha2_permanent_sender_mgr_create(env, dbname);
sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
if(!sender_bean)
{
/* There is no pending message to send. So exit from the thread. */
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] There is no pending message to send. So exit from the thread");
sandesha2_app_msg_processor_args_free(args, env);
axutil_allocator_switch_to_local_pool(env->allocator);
axutil_free_thread_env(env);
return NULL;
}
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx,
AXIS2_TRUE);
svc = axis2_msg_ctx_get_svc(app_msg_ctx, env);
/* Loop until create sequence response arrive */
while(!rms_sequence_bean)
{
axis2_bool_t continue_sending = AXIS2_TRUE;
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr, svc);
sandesha2_sender_mgr_update(sender_mgr, env, sender_bean);
if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Do not continue sending the application message");
if(sender_bean)
{
sandesha2_sender_bean_free(sender_bean, env);
}
if(app_msg_ctx)
{
axis2_msg_ctx_free(app_msg_ctx, env);
}
sandesha2_app_msg_processor_args_free(args, env);
axutil_allocator_switch_to_local_pool(env->allocator);
axutil_free_thread_env(env);
return NULL;
}
rms_sequence_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_sequence_id, SANDESHA2_SEQUENCE_PROPERTY_RMS_SEQ_ID);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Create sequence response has not yet arrived. So sleep");
AXIS2_SLEEP(1);
}
if(rms_sequence_bean)
{
rms_sequence_id = axutil_strdup(env, sandesha2_seq_property_bean_get_value(rms_sequence_bean,
env));
sandesha2_seq_property_bean_free(rms_sequence_bean, env);
}
/* Store the outgoing sequence id using the message id of the application message. This is
* used in send_ack_if_reqd() function to determine the outgoing sequence id. Note that
* this is useful only in the application client side.
*/
relates_to_bean = sandesha2_seq_property_bean_create_with_data(env, msg_id,
SANDESHA2_SEQ_PROP_RELATED_MSG_ID, rms_sequence_id);
if(relates_to_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, relates_to_bean);
sandesha2_seq_property_bean_free(relates_to_bean, env);
}
rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
rm_version = sandesha2_utils_get_rm_version(env, internal_sequence_id, seq_prop_mgr);
if(!rm_version)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Unable to find RM spec version for the rms internal_sequence_id %s",
internal_sequence_id);
sandesha2_app_msg_processor_args_free(args, env);
axutil_allocator_switch_to_local_pool(env->allocator);
axutil_free_thread_env(env);
return NULL;
}
rm_ns_val = sandesha2_spec_specific_consts_get_rm_ns_val(env, rm_version);
identifier = sandesha2_identifier_create(env, rm_ns_val);
sandesha2_identifier_set_identifier(identifier, env, rms_sequence_id);
sandesha2_seq_set_identifier(rm_sequence, env, identifier);
/* Add the sequence element into the soap envelope */
sandesha2_msg_ctx_set_sequence(rm_msg_ctx, env, rm_sequence);
sandesha2_msg_ctx_add_soap_envelope(rm_msg_ctx, env);
/* TODO add_ack_requested */
/* Add the acknowledgement element into soap envelope */
if(!sandesha2_util_is_ack_already_piggybacked(env, rm_msg_ctx))
{
sandesha2_ack_mgr_piggyback_acks_if_present(env, rms_sequence_id, rm_msg_ctx, storage_mgr,
seq_prop_mgr, sender_mgr);
}
sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
/* Resend the application message until timeout or exceed the maximum number of re-sends as
* specified by Policy.
*/
while(sender_bean)
{
/*key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, key, conf_ctx,
AXIS2_TRUE);
if(!app_msg_ctx)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] msg_ctx is not present in the store yet.");
// msg_ctx is still not stored so try again later.
if(sender_bean)
{
sandesha2_sender_bean_free(sender_bean, env);
}
break;
}*/
status = sandesha2_app_msg_processor_resend(env, conf_ctx, msg_id, is_server_side,
internal_sequence_id, storage_mgr, seq_prop_mgr, create_seq_mgr,
sender_mgr, app_msg_ctx);
if(AXIS2_SUCCESS != status)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Resend failed for message id %s in sequence %s", msg_id,
internal_sequence_id);
if(sender_bean)
{
sandesha2_sender_bean_free(sender_bean, env);
}
break;
}
if(sender_bean)
{
sandesha2_sender_bean_free(sender_bean, env);
}
sender_bean = sandesha2_sender_mgr_get_application_msg_to_send(sender_mgr, env,
internal_sequence_id, msg_id);
if(sender_bean)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Sleep before resending application message");
AXIS2_SLEEP(retrans_interval);
}
if(!sender_bean)
{
/* There is no pending message to send. So exit from the thread. */
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] There is no pending message to send. So exit from the thread");
break;
}
}
if(app_msg_ctx)
{
axis2_msg_ctx_free(app_msg_ctx, env);
}
if(rm_msg_ctx)
{
sandesha2_msg_ctx_free(rm_msg_ctx, env);
}
if(rms_sequence_id)
{
AXIS2_FREE(env->allocator, rms_sequence_id);
}
if(storage_mgr)
{
sandesha2_storage_mgr_free(storage_mgr, env);
}
if(create_seq_mgr)
{
sandesha2_create_seq_mgr_free(create_seq_mgr, env);
}
if(sender_mgr)
{
sandesha2_sender_mgr_free(sender_mgr, env);
}
if(seq_prop_mgr)
{
sandesha2_seq_property_mgr_free(seq_prop_mgr, env);
}
sandesha2_app_msg_processor_args_free(args, env);
axutil_allocator_switch_to_local_pool(env->allocator);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_application_msg_worker_function");
axutil_free_thread_env(env);
return NULL;
}
/* This function will be called in the duplex mode only from within the application message sender thread. */
static axis2_status_t
sandesha2_app_msg_processor_resend(
const axutil_env_t *env,
axis2_conf_ctx_t *conf_ctx,
axis2_char_t *msg_id,
axis2_bool_t is_svr_side,
const axis2_char_t *internal_sequence_id,
sandesha2_storage_mgr_t *storage_mgr,
sandesha2_seq_property_mgr_t *seq_prop_mgr,
sandesha2_create_seq_mgr_t *create_seq_mgr,
sandesha2_sender_mgr_t *sender_mgr,
axis2_msg_ctx_t *app_msg_ctx)
{
sandesha2_sender_bean_t *sender_worker_bean = NULL;
sandesha2_sender_bean_t *bean1 = NULL;
axis2_bool_t continue_sending = AXIS2_TRUE;
axis2_transport_out_desc_t *transport_out = NULL;
axis2_transport_sender_t *transport_sender = NULL;
axis2_bool_t successfully_sent = AXIS2_FALSE;
axis2_status_t status = AXIS2_SUCCESS;
axis2_bool_t resend = AXIS2_FALSE;
axis2_svc_t *svc = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Entry:sandesha2_app_msg_processor_resend");
sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
if(!sender_worker_bean)
{
AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, "[sandesha2] sender_worker_bean is NULL");
return AXIS2_FAILURE;
}
svc = axis2_msg_ctx_get_svc(app_msg_ctx, env);
continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_worker_bean,
conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr, svc);
sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
if(!continue_sending)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
"[sandesha2] Do not continue sending the application message");
if(sender_worker_bean)
{
sandesha2_sender_bean_free(sender_worker_bean, env);
}
return AXIS2_FAILURE;
}
transport_out = axis2_msg_ctx_get_transport_out_desc(app_msg_ctx, env);
if(transport_out)
{
transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
}
if(transport_sender)
{
/* This is neccessary to avoid a double free at http_sender.c */
axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
{
successfully_sent = AXIS2_TRUE;
}else
{
successfully_sent = AXIS2_FALSE;
}
}
msg_id = sandesha2_sender_bean_get_msg_id(sender_worker_bean, env);
bean1 = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
if(bean1)
{
resend = sandesha2_sender_bean_is_resend(sender_worker_bean, env);
if(resend)
{
int sent_count = -1;
sent_count = sandesha2_sender_bean_get_sent_count(sender_worker_bean, env);
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "sent_count:%d", sent_count);
sandesha2_sender_bean_set_sent_count(bean1, env, sent_count);
sandesha2_sender_bean_set_time_to_send(bean1, env,
sandesha2_sender_bean_get_time_to_send(sender_worker_bean, env));
sandesha2_sender_mgr_update(sender_mgr, env, bean1);
}
}
if(sender_worker_bean)
{
sandesha2_sender_bean_free(sender_worker_bean, env);
}
if(successfully_sent)
{
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Application message successfully sent");
}
if(bean1)
{
sandesha2_sender_bean_free(bean1, env);
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[sandesha2] Exit:sandesha2_app_msg_processor_resend");
return status;
}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_process_app_msg_response(
const axutil_env_t *env,
axis2_msg_ctx_t *msg_ctx)
{
axis2_msg_ctx_t *response_msg_ctx = NULL;
axiom_soap_envelope_t *response_envelope = NULL;
axis2_conf_ctx_t *conf_ctx = NULL;
axis2_engine_t *engine = NULL;
axis2_status_t status = AXIS2_FAILURE;
axutil_property_t *property = NULL;
axis2_op_ctx_t *op_ctx = NULL;
const axis2_char_t *mep = NULL;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_process_app_msg_response");
conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
response_envelope = axis2_msg_ctx_get_response_soap_envelope(msg_ctx, env);
if(!response_envelope)
{
axis2_char_t *soap_ns_uri = NULL;
soap_ns_uri = axis2_msg_ctx_get_is_soap_11(msg_ctx, env) ?
AXIOM_SOAP11_SOAP_ENVELOPE_NAMESPACE_URI:
AXIOM_SOAP12_SOAP_ENVELOPE_NAMESPACE_URI;
response_envelope = axis2_http_transport_utils_create_soap_msg(env, msg_ctx, soap_ns_uri);
if(!response_envelope)
{
/* There is no response message context. */
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Response envelope not found");
return AXIS2_SUCCESS;
}
}
/* create the response */
response_msg_ctx = axis2_msg_ctx_create(env, conf_ctx, axis2_msg_ctx_get_transport_in_desc(msg_ctx,
env), axis2_msg_ctx_get_transport_out_desc(msg_ctx, env));
if (!response_msg_ctx)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Could not create response message context");
return AXIS2_FAILURE;
}
/* Note that we set here as client side to indicate that we are in the application client side.
* This knowledge is importatnt within app_msg_processor_process_in_msg() function.*/
axis2_msg_ctx_set_server_side(response_msg_ctx, env, AXIS2_FALSE);
axis2_msg_ctx_set_op_ctx(response_msg_ctx, env, axis2_msg_ctx_get_op_ctx(msg_ctx, env));
axis2_msg_ctx_set_conf_ctx(response_msg_ctx, env, conf_ctx);
axis2_msg_ctx_set_svc_grp_ctx(response_msg_ctx, env, axis2_msg_ctx_get_svc_grp_ctx(msg_ctx, env));
axis2_msg_ctx_set_status_code (response_msg_ctx, env, axis2_msg_ctx_get_status_code (msg_ctx, env));
/* To avoid a second passing through incoming handlers at op_client */
property = axis2_msg_ctx_get_property(msg_ctx, env, AXIS2_HANDLER_ALREADY_VISITED);
if(property)
{
axutil_property_set_value(property, env, AXIS2_VALUE_TRUE);
}
else
{
property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_HANDLER_ALREADY_VISITED, property);
}
axis2_msg_ctx_set_soap_envelope(response_msg_ctx, env, response_envelope);
engine = axis2_engine_create(env, conf_ctx);
if (engine)
{
/* Note that this flow does not hit a message receiver because we have set the
* message context to be in client side. Consequently message context will not
* be added to the operation context(which is normally done at msg_recv_receive()
* function).
*/
status = axis2_engine_receive(engine, env, response_msg_ctx);
axis2_engine_free(engine, env);
}
op_ctx = axis2_msg_ctx_get_op_ctx(msg_ctx, env);
if(op_ctx)
{
axis2_op_t *op = NULL;
op = axis2_op_ctx_get_op(op_ctx, env);
mep = axis2_op_get_msg_exchange_pattern(op, env);
}
if(!axutil_strcmp(mep, AXIS2_MEP_URI_OUT_IN))
{
/* Note that as explained above this message context is not added to the operation context,
* therefore will not be freed when operation context's msg_ctx_map is freed. So we need to
* free the response message here. Note that we copied this response soap envelope from the
* outgoing message context from application client. This response envelope will be freed
* at operation client. So to avoid double freeing we increment its ref.
*/
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Increment the soap envelope ref counter");
axiom_soap_envelope_increment_ref(response_envelope, env);
}
axis2_msg_ctx_free(response_msg_ctx, env);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_process_app_msg_response");
return status;
}
long AXIS2_CALL
sandesha2_app_msg_processor_get_prev_msg_no(
const axutil_env_t *env,
axis2_char_t *internal_seq_id,
sandesha2_seq_property_mgr_t *seq_prop_mgr)
{
sandesha2_seq_property_bean_t *next_msg_no_bean = NULL;
long next_msg_no = -1;
AXIS2_PARAM_CHECK(env->error, internal_seq_id, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_prop_mgr, AXIS2_FAILURE);
next_msg_no_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr,
env, internal_seq_id, SANDESHA2_SEQ_PROP_NEXT_MESSAGE_NUMBER);
if(next_msg_no_bean)
{
axis2_char_t *str_value = NULL;
str_value = sandesha2_seq_property_bean_get_value(next_msg_no_bean, env);
if(str_value)
{
next_msg_no = atol(str_value);
}
sandesha2_seq_property_bean_free(next_msg_no_bean, env);
}
return next_msg_no;
}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_set_next_msg_no(
const axutil_env_t *env,
axis2_char_t *internal_seq_id,
long msg_num,
sandesha2_seq_property_mgr_t *seq_prop_mgr)
{
sandesha2_seq_property_bean_t *next_msg_no_bean = NULL;
axis2_bool_t update = AXIS2_TRUE;
axis2_char_t str_long[32];
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2]Entry:sandesha2_app_msg_processor_set_next_msg_no");
AXIS2_PARAM_CHECK(env->error, internal_seq_id, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_prop_mgr, AXIS2_FAILURE);
if(msg_num <= 0)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI,
"[sandesha2] Invalid Message Number");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_INVALID_MSG_NUM,
AXIS2_FAILURE);
return AXIS2_FAILURE;
}
next_msg_no_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_seq_id, SANDESHA2_SEQ_PROP_NEXT_MESSAGE_NUMBER);
if(!next_msg_no_bean)
{
update = AXIS2_FALSE;
next_msg_no_bean = sandesha2_seq_property_bean_create(env);
sandesha2_seq_property_bean_set_seq_id(next_msg_no_bean, env,
internal_seq_id);
sandesha2_seq_property_bean_set_name(next_msg_no_bean, env,
SANDESHA2_SEQ_PROP_NEXT_MESSAGE_NUMBER);
}
sprintf(str_long, "%ld", msg_num);
sandesha2_seq_property_bean_set_value(next_msg_no_bean, env, str_long);
if(update)
{
sandesha2_seq_property_mgr_update(seq_prop_mgr, env, next_msg_no_bean);
}
else
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, next_msg_no_bean);
}
if(next_msg_no_bean)
sandesha2_seq_property_bean_free(next_msg_no_bean, env);
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_set_next_msg_no");
return AXIS2_SUCCESS;
}
static axis2_status_t AXIS2_CALL
sandesha2_app_msg_processor_set_last_out_msg_no(
const axutil_env_t *env,
axis2_char_t *internal_seq_id,
long msg_num,
sandesha2_seq_property_mgr_t *seq_prop_mgr)
{
sandesha2_seq_property_bean_t *last_out_msg_no_bean = NULL;
axis2_bool_t update = AXIS2_TRUE;
axis2_char_t str_long[32];
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_set_last_out_msg_no");
AXIS2_PARAM_CHECK(env->error, internal_seq_id, AXIS2_FAILURE);
AXIS2_PARAM_CHECK(env->error, seq_prop_mgr, AXIS2_FAILURE);
if(msg_num <= 0)
{
AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] Invalid Message Number");
AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_INVALID_MSG_NUM, AXIS2_FAILURE);
return AXIS2_FAILURE;
}
last_out_msg_no_bean = sandesha2_seq_property_mgr_retrieve(seq_prop_mgr, env,
internal_seq_id, SANDESHA2_SEQ_PROP_LAST_OUT_MESSAGE_NUMBER);
if(!last_out_msg_no_bean)
{
update = AXIS2_FALSE;
last_out_msg_no_bean = sandesha2_seq_property_bean_create(env);
sandesha2_seq_property_bean_set_seq_id(last_out_msg_no_bean, env, internal_seq_id);
sandesha2_seq_property_bean_set_name(last_out_msg_no_bean, env,
SANDESHA2_SEQ_PROP_LAST_OUT_MESSAGE_NUMBER);
}
sprintf(str_long, "%ld", msg_num);
sandesha2_seq_property_bean_set_value(last_out_msg_no_bean, env, str_long);
if(update)
{
sandesha2_seq_property_mgr_update(seq_prop_mgr, env, last_out_msg_no_bean);
}
else
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, last_out_msg_no_bean);
}
if(last_out_msg_no_bean)
{
sandesha2_seq_property_bean_free(last_out_msg_no_bean, env);
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_set_last_out_msg_no");
return AXIS2_SUCCESS;
}
static void AXIS2_CALL
sandesha2_app_msg_processor_is_last_out_msg(
const axutil_env_t *env,
axis2_msg_ctx_t *msg_ctx,
axis2_char_t *rmd_sequence_id,
axis2_char_t *internal_sequence_id,
long msg_num,
sandesha2_seq_property_mgr_t *seq_prop_mgr)
{
axis2_bool_t is_svr_side = AXIS2_FALSE;
axis2_bool_t last_msg = AXIS2_FALSE;
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Entry:sandesha2_app_msg_processor_is_last_out_msg");
is_svr_side = axis2_msg_ctx_get_server_side(msg_ctx, env);
if(is_svr_side)
{
axis2_char_t *last_req_id = NULL;
axis2_char_t *relates_to_value = NULL;
const axis2_relates_to_t *relates_to = NULL;
/* Deciding whether this is the last message. We assume it is, if it
* relates to a message which arrived with the LastMessage flag on it.
*/
last_req_id = sandesha2_utils_get_seq_property(env, rmd_sequence_id,
SANDESHA2_SEQ_PROP_LAST_IN_MESSAGE_ID, seq_prop_mgr);
relates_to = axis2_msg_ctx_get_relates_to(msg_ctx, env);
relates_to_value = (axis2_char_t *)axis2_relates_to_get_value(relates_to, env);
if(relates_to && last_req_id && !axutil_strcmp(last_req_id, relates_to_value))
{
last_msg = AXIS2_TRUE;
}
if(last_req_id)
{
AXIS2_FREE(env->allocator, last_req_id);
}
}
else
{
axutil_property_t *property = NULL;
axis2_char_t *last_app_msg = NULL;
property = axis2_msg_ctx_get_property(msg_ctx, env, SANDESHA2_CLIENT_LAST_MESSAGE);
if(property)
{
last_app_msg = axutil_property_get_value(property, env);
}
if(last_app_msg && !axutil_strcmp(last_app_msg, AXIS2_VALUE_TRUE))
{
axis2_char_t *spec_ver = NULL;
spec_ver = sandesha2_utils_get_rm_version(env, internal_sequence_id, seq_prop_mgr);
if(!spec_ver)
{
axutil_property_t *spec_ver_prop = NULL;
spec_ver_prop = axis2_msg_ctx_get_property(msg_ctx, env,
SANDESHA2_CLIENT_RM_SPEC_VERSION);
spec_ver = axutil_strdup(env, axutil_property_get_value(spec_ver_prop, env));
}
if(sandesha2_spec_specific_consts_is_last_msg_indicator_reqd(env, spec_ver))
{
last_msg = AXIS2_TRUE;
}
if(spec_ver)
{
AXIS2_FREE(env->allocator, spec_ver);
spec_ver = NULL;
}
}
}
if(last_msg)
{
axis2_char_t msg_number_str[32];
sandesha2_seq_property_bean_t *res_last_msg_key_bean = NULL;
AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Last message true");
sprintf(msg_number_str, "%ld", msg_num);
/* Store the message number of the RM 1.0 last message */
res_last_msg_key_bean = sandesha2_seq_property_bean_create_with_data(env,
internal_sequence_id, SANDESHA2_SEQ_PROP_LAST_OUT_MESSAGE_NO, msg_number_str);
if(res_last_msg_key_bean)
{
sandesha2_seq_property_mgr_insert(seq_prop_mgr, env, res_last_msg_key_bean);
sandesha2_seq_property_bean_free(res_last_msg_key_bean, env);
}
}
AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI,
"[sandesha2] Exit:sandesha2_app_msg_processor_is_last_out_msg");
}