blob: 1036669ab219fb287be33ae5f35111e659a7cb7d [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* etch_msgizer.c
* messagizer accepts packets and translates them to messages,
* and it accepts messages and translates them to packets.
*/
#include "etch_messagizer.h"
#include "etch_tdformat.h"
#include "etch_thread.h"
#include "etch_objecttypes.h"
#include "etch_log.h"
static const char* LOG_CATEGORY = "etch_messagizer";
// extern types
extern apr_pool_t* g_etch_main_pool;
int destroy_messagizer(void*);
int etch_msgizer_transport_message(void*, void*, void*);
int etch_msgizer_session_packet (void*, void*, void*);
int etch_msgizer_session_control (void*, etch_event*, etch_object*);
int etch_msgizer_session_notify (void*, etch_event*);
etch_object* etch_msgizer_session_query (void*, etch_query*);
int etch_msgizer_transport_control(void*, etch_event*, etch_object*);
int etch_msgizer_transport_notify (void*, etch_event*);
etch_object* etch_msgizer_transport_query (void*, etch_query*);
/* - - - - - - - - - - - - - - -
* etch_messagizer
* - - - - - - - - - - - - - - -
*/
/**
* new_messagizer()
* etch_messagizer public constructor
* @param ixport transport interface owned by caller
* @param uri a URI string owned by caller.
* @param resx a resources map owned by caller.
*/
etch_messagizer* new_messagizer (i_transportpacket* ixp, wchar_t* uri, etch_resources* resx)
{
etch_url* url = new_url(uri);
etch_messagizer* messagizer = new_messagizer_a(ixp, url, resx);
etch_object_destroy(url);
return messagizer;
}
/**
* new_messagizer_a()
* etch_messagizer private constructor
* @param ipacket transport interface owned by caller
* @param uri a URI string owned by caller.
* @param resxmap a resources map owned by caller.
*/
etch_messagizer* new_messagizer_a (i_transportpacket* ixp, etch_url* url, etch_resources* resxmap)
{
etch_status_t status = ETCH_SUCCESS;
tagdata_format_factory* formatfactory = NULL;
etch_messagizer* messagizer = NULL;
i_transport* itransport = NULL;
etch_value_factory* vf = NULL;
etch_string* formatobj = NULL;
i_session* isession = NULL;
etch_mutex* mutex = NULL;
int result = -1;
do
{
formatobj = (etch_string*) etchurl_get_term (url, ETCH_RESXKEY_MSGIZER_FORMAT);
if (NULL == formatobj && NULL != resxmap)
formatobj = (etch_string*) etch_resources_get (resxmap, ETCH_RESXKEY_MSGIZER_FORMAT);
if (NULL == formatobj) {
ETCH_LOGW(LOG_CATEGORY, ETCH_LOG_ERROR, L"URI missing format specifier '%s'\n", ETCH_RESXKEY_MSGIZER_FORMAT);
break;
}
formatfactory = get_format_factory(formatobj->v.valw);
if (NULL == formatfactory) {
ETCH_LOGW(LOG_CATEGORY, ETCH_LOG_ERROR, L"no format '%s' is installed\n",
formatobj->v.valw);
break;
}
vf = resxmap? (etch_value_factory*) etch_resources_get(resxmap,
ETCH_RESXKEY_MSGIZER_VALUFACT): NULL;
if (NULL == vf)
{
ETCH_LOGW(LOG_CATEGORY, ETCH_LOG_ERROR, L"no value factory '%s' is installed\n", ETCH_RESXKEY_MSGIZER_VALUFACT);
break;
}
// TODO: pool
status = etch_mutex_create(&mutex, ETCH_MUTEX_NESTED, NULL);
if(status != ETCH_SUCCESS) {
// error
break;
}
/* - - - - - - - - - - - - - - -
* etch_messagizer
* - - - - - - - - - - - - - - -
*/
messagizer = (etch_messagizer*) new_object(sizeof(etch_messagizer), ETCHTYPEB_MESSAGIZER, CLASSID_MESSAGIZER);
((etch_object*)messagizer)->destroy = destroy_messagizer;
((etch_object*)messagizer)->clone = clone_null;
messagizer->msglock = mutex;
messagizer->msgbuf = new_flexbuffer(ETCH_DEFSIZE); /* 2K default */
/* set our transport to that of next lower layer (packetizer) */
messagizer->transport = ixp; /* i_transportpacket owned by caller */
/* - - - - - - - - - - - - - - -
* i_transportmessage
* - - - - - - - - - - - - - - -
*/
itransport = new_transport_interface_ex (messagizer,
(etch_transport_control) etch_msgizer_transport_control,
(etch_transport_notify) etch_msgizer_transport_notify,
(etch_transport_query) etch_msgizer_transport_query,
etch_msgizer_get_session,
etch_msgizer_set_session);
/* instantiate i_transportmessage interface which messagizer implements */
messagizer->transportmsg = new_transportmsg_interface(messagizer,
etch_msgizer_transport_message,
itransport); /* transportmsg now owns itransport */
/* copy i_transportmessage interface methods up to messagizer */
messagizer->transport_message = etch_msgizer_transport_message;
messagizer->transport_control = itransport->transport_control;
messagizer->transport_notify = itransport->transport_notify;
messagizer->transport_query = itransport->transport_query;
messagizer->get_session = itransport->get_session;
messagizer->set_session = itransport->set_session;
/* - - - - - - - - - - - - - - -
* i_sessionpacket
* - - - - - - - - - - - - - - -
*/
isession = new_session_interface(messagizer,
(etch_session_control) etch_msgizer_session_control,
(etch_session_notify) etch_msgizer_session_notify,
(etch_session_query) etch_msgizer_session_query);
/* instantiate i_sessionpacket interface which messagizer implements */
messagizer->sessionpkt = new_sessionpkt_interface(messagizer,
etch_msgizer_session_packet,
isession); /* transportmsg now owns isession */
/* copy session interface to parent */
messagizer->session_packet = etch_msgizer_session_packet;
messagizer->session_control = isession->session_control;
messagizer->session_notify = isession->session_notify;
messagizer->session_query = isession->session_query;
/* finally set session of next lower layer to our session */
/* fyi we must pass the implementor of transport as thisx, i.e. packetizer */
messagizer->transport->set_session (messagizer->transport->thisx, messagizer->sessionpkt);
/* - - - - - - - - - - - - - - -
* tagged data in out
* - - - - - - - - - - - - - - -
*/
messagizer->tdi = formatfactory->new_tagdata_input(vf);
if (NULL == messagizer->tdi) break;
messagizer->tdo = formatfactory->new_tagdata_output(vf);
if (NULL == messagizer->tdo) break;
result = 0;
} while(0);
if (-1 == result) {
etch_object_destroy(vf);
etch_object_destroy(messagizer);
messagizer = NULL;
}
etch_object_destroy(formatfactory);
return messagizer;
}
/**
* destroy_messagizer()
* destructor for etch_messagizer
*/
int destroy_messagizer (void* data)
{
etch_messagizer* thisx = (etch_messagizer*)data;
if (!is_etchobj_static_content(thisx))
{
etch_object_destroy(thisx->transportmsg);
etch_object_destroy(thisx->sessionpkt);
etch_object_destroy(thisx->tdi);
etch_object_destroy(thisx->tdo);
etch_object_destroy(thisx->msgbuf);
etch_object_destroy(thisx->msglock);
}
return destroy_objectex((etch_object*) thisx);
}
/**
* etch_msgizer_get_transport()
* @return a reference to the messagizer transport interface.
* this object is owned by whatever object created the messagizer.
*/
i_transportpacket* etch_msgizer_get_transport (etch_messagizer* thisx)
{
ETCH_ASSERT(is_etch_messagizer(thisx));
return thisx->transport;
}
/* - - - - - - - - - - - - - - -
* i_transportmessage
* - - - - - - - - - - - - - - -
*/
/**
* etch_msgizer_transport_message()
* i_transportmessage::transport_message override.
* serializes message and delivers its data to transport
* @param whoto recipient - caller retains this memory, can be null.
* @param message the message
* caller relinquishes this memory on success, retains on failure.
* @return 0 success, -1 error.
*/
int etch_msgizer_transport_message (void* data, void* whoData, void* messageData)
{
etch_who* whoto = (etch_who*)whoData;
etch_message* msg = (etch_message*)messageData;
etch_messagizer* thisx = (etch_messagizer*)data;
etch_status_t status = ETCH_SUCCESS;
int result = 0;
ETCH_ASSERT(is_etch_messagizer(thisx));
status = etch_mutex_lock(thisx->msglock);
ETCH_ASSERT(status == ETCH_SUCCESS);
do
{ const int headersize = thisx->transport->get_headersize (thisx->transport);
etch_flexbuf_skip (thisx->msgbuf, headersize, TRUE);
/* serialize the message to the buffer */
result = ((struct i_tagged_data_output*)
(((etch_object*)thisx->tdo)->vtab))
->write_message (thisx->tdo, msg, thisx->msgbuf);
if (-1 == result) {
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_ERROR,"serialization of message to buffer failed");
etch_flexbuf_reset(thisx->msgbuf);
break;
}
etch_flexbuf_set_index (thisx->msgbuf, 0);
/* deliver packet buffer to transport */ /* msgizer->itp->pktizer */
result = thisx->transport->transport_packet (thisx->transport->thisx, whoto, thisx->msgbuf);
if (0 == result) {
etch_object_destroy(msg);
}
} while(0);
status = etch_mutex_unlock(thisx->msglock);
ETCH_ASSERT(status == ETCH_SUCCESS);
return result;
}
/**
* etch_msgizer_get_session()
* @return a reference to the messagizer i_sessionmessage interface.
* this object is owned by whatever object set the messagizer session.
*/
i_session* etch_msgizer_get_session (void* data)
{
etch_messagizer* thisx = (etch_messagizer*)data;
ETCH_ASSERT(is_etch_messagizer(thisx));
return (i_session*)thisx->session;
}
/**
* etch_msgizer_set_session()
* @param session an i_sessionmessage reference. caller owns this object.
*/
void etch_msgizer_set_session (void* data, void* sessionData)
{
i_session* session = (i_session*)sessionData;
etch_messagizer* thisx = (etch_messagizer*)data;
ETCH_ASSERT(is_etch_messagizer(thisx));
ETCH_ASSERT(is_etch_sessionmsg(session));
thisx->session = (i_sessionmessage*)session;
}
/**
* etch_msgizer_transport_control()
* i_transportmessage::transport_control override.
* @param control event, caller relinquishes.
* @param value control value, caller relinquishes.
*/
int etch_msgizer_transport_control (void* data, etch_event* control, etch_object* value)
{
etch_messagizer* thisx = (etch_messagizer*)data;
ETCH_ASSERT(is_etch_messagizer(thisx));
/* mzr itp mzr itp packetizer */
return thisx->transport->transport_control (thisx->transport->thisx, control, value);
}
/**
* etch_msgizer_transport_notify()
* i_transportmessage::transport_notify override.
* @param evt, caller relinquishes.
*/
int etch_msgizer_transport_notify (void* data, etch_event* evt)
{
etch_messagizer* thisx = (etch_messagizer*)data;
ETCH_ASSERT(is_etch_messagizer(thisx));
return thisx->transport->transport_notify (thisx->transport->thisx, evt);
}
/**
* etch_msgizer_transport_query()
* i_transportmessage::transport_query override.
* @param query, caller relinquishes.
*/
etch_object* etch_msgizer_transport_query (void* data, etch_query* query)
{
etch_messagizer* thisx = (etch_messagizer*)data;
ETCH_ASSERT(is_etch_messagizer(thisx));
return thisx->transport->transport_query (thisx->transport->thisx, query);
}
/* - - - - - - - - - - - - - - -
* i_sessionpacket
* - - - - - - - - - - - - - - -
*/
/**
* etch_msgizer_session_packet()
* i_sessionpacket::session_packet override.
* delivers data to the session from the transport.
* @param from from who sent the packet.
* caller retains memory for this object, can be null.
* @param buffer the packet from the packet source.
* caller retains memory for this object.
* @return 0 success, -1 error (exception condition)
*/
int etch_msgizer_session_packet (void* data, void* whoData, void* bufferData)
{
etch_who* whofrom = (etch_who*)whoData;
etch_flexbuffer* fbuf = (etch_flexbuffer*)bufferData;
etch_messagizer* thisx = (etch_messagizer*)data;
etch_message* msg = NULL;
int is_message_handled = FALSE;
ETCH_ASSERT(is_etch_messagizer(thisx));
/* create an etch message from the packetized data */
msg = ((struct i_tagged_data_input*)((etch_object*)thisx->tdi)->vtab)->read_message (thisx->tdi, fbuf);
/* send the new message up via session.
* memory management rules are: if session_message() handles the message,
* it owns msg memory. otherwise (if not handled), msg memory is owned by
* the unwanted_message wrapper created here, which itself is owned by the
* session_notify() destination.
*/
is_message_handled = (0 == thisx->session->session_message (thisx->session->thisx, whofrom, msg));
/* if the message was not handled, e.g. the message is an exception returned
* from a one-way message and there was therefore no mailbox to receive it,
* forward the message to the session via mailbox manager.
*/
if (!is_message_handled) {
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "unable to post message to a mailbox\n");
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "deferring '%s' to session\n", message_aname(msg));
thisx->session->session_notify(thisx->session->thisx, (etch_event*)new_unwanted_message(whofrom, msg));
//etch_object_destroy(msg);
return -1;
}
/* regardless we have relinquished msg at this point. it is now either queued up
* in a mailbox, or forwarded in the unwanted message wrapper above. */
return 0;
}
/**
* etch_msgizer_session_control()
* i_sessionpacket::session_control override.
* @param control event, caller relinquishes.
* @param value control value, caller relinquishes.
*/
int etch_msgizer_session_control (void* data, etch_event* control, etch_object* value)
{
etch_messagizer* thisx = (etch_messagizer*)data;
ETCH_ASSERT(is_etch_messagizer(thisx)); /* ism mbmgr */
return thisx->session->session_control (thisx->session->thisx, control, value);
}
/**
* etch_msgizer_session_notify()
* i_sessionpacket::session_notify override.
* @param event, caller relinquishes.
*/
int etch_msgizer_session_notify (void* data, etch_event* evt)
{
etch_messagizer* thisx = (etch_messagizer*)data;
ETCH_ASSERT(is_etch_messagizer(thisx)); /* ism mbmgr */
return thisx->session->session_notify (thisx->session->thisx, evt);
}
/**
* etch_msgizer_session_query()
* i_sessionpacket::session_query override.
* @param query, caller relinquishes.
*/
etch_object* etch_msgizer_session_query (void* data, etch_query* query)
{
etch_messagizer* thisx = (etch_messagizer*)data;
ETCH_ASSERT(is_etch_messagizer(thisx)); /* ism mbmgr */
return thisx->session->session_query (thisx->session->thisx, query);
}