blob: f28bb47a9f98e73438ff6b8a28c8d9c24e950732 [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* etch_transport.c
* common transport functionality
*/
#include "etch.h"
#include "etch_runtime.h"
#include "etch_plain_mailbox_manager.h"
#include "etch_svcobj_masks.h"
#include "etch_plain_mailbox.h"
#include "etch_transport.h"
#include "etch_packetizer.h"
#include "etch_messagizer.h"
#include "etch_tcp_server.h"
#include "etch_tcp_connection.h"
#include "etch_exception.h"
#include "etch_objecttypes.h"
#include "etch_log.h"
#include "etch_mem.h"
static const char* LOG_CATEGORY = "etch_transport";
// extern types
extern apr_pool_t* g_etch_main_pool;
typedef etch_plainmailboxmgr etch_mailbox_manager;
int tcpdelsvc_init (etch_tcp_delivery_service*);
int destroy_delivery_service_interface (void*);
int destroy_delivery_service_via_interface(void*);
int destroy_tcp_delivery_service(void*);
int tcpdelsvc_begincall(i_delivery_service*, etch_message*, i_mailbox**);
int tcpdelsvc_endcall (i_delivery_service*, i_mailbox*, etch_type*, etch_object**);
int tcpdelsvc_session_message (void*, etch_who*, etch_message*);
int tcpdelsvc_session_control (void*, etch_event*, etch_object*);
int tcpdelsvc_session_notify (void*, etch_event*);
etch_object* tcpdelsvc_session_query (void*, etch_query*);
i_session* tcpdelsvc_get_session(void*);
etch_object* tcpdelsvc_transport_query (void*, etch_query*);
int tcpdelsvc_transport_control(void*, etch_event*, etch_object*);
int tcpdelsvc_transport_notify (void*, etch_event*);
/* - - - - - - - - - - - - - - - - - - - - - -
* delivery service
* - - - - - - - - - - - - - - - - - - - - - -
*/
/*
* new_etch_transport()
* etch_delivery_service constructor.
* @remarks this is the transport factory, implemented via a switch.
* @param uri a uri string, caller relinquishes.
* @param params a transport parameter bundle, caller retains.
* @param conximpl optional pre-existing connection implementation, such as tcp_connection*.
* @return a delivery service interface. note that invoking the destructor on this interface
* destroys the delivery service implementation, as well as the interface.
*/
i_delivery_service* new_etch_transport (wchar_t* uri, etch_factory_params* params, void* conximpl)
{
etch_url* url = new_url(uri);
i_delivery_service* newds = new_etch_transport_a (url, params, conximpl);
etch_object_destroy(url);
return newds;
}
/*
* new_etch_transport_a()
* etch_delivery_service constructor.
* @remarks this is the transport factory, implemented via a switch.
* @param url an etch_url, caller retains.
* @param resources a resources map, caller retains.
* @return a delivery service interface. invoking the destructor on this interface
* destroys the delivery service implementation, as well as the interface.
*/
i_delivery_service* new_etch_transport_a (etch_url* url, etch_factory_params* params, void* conximpl)
{
i_delivery_service* newds = NULL;
if (is_url_scheme_udp(url))
{
/* not yet implemented */
}
#if(0)
else /* handlers for other url schemes follow here eventually */
if (is_url_scheme_foo(url))
{
/* ... */
}
#endif
else
{ /* url schemes http, tcp, default */
etch_tcp_delivery_service* tcpds = new_tcp_delivery_service (url, params, conximpl);
if (tcpds)
{ newds = tcpds->ids;
newds->thisx = tcpds;
}
}
return newds;
}
/*
* new_delivery_service()
*/
etch_object* new_delivery_service (const int objsize, const unsigned short class_id)
{
return new_object (objsize, ETCHTYPEB_DELIVERYSVC_IMPL, class_id);
}
/*
* new_tcp_delivery_service()
* etch_tcp_delivery_service constructor
* replaces java TcpTransportFactory.newTransport
* @param params server parameter bundle, caller retains.
* &param tcpx if present, the already accepted client connection.
* if present, caller retains.
*/
etch_tcp_delivery_service* new_tcp_delivery_service (etch_url* url,
etch_factory_params* params, etch_tcp_connection* tcpconx)
{
etch_resources* resources = NULL;
etch_packetizer* packetizer = NULL;
etch_messagizer* messagizer = NULL;
etch_mailbox_manager* mboxmgr = NULL;
etch_tcp_delivery_service* delsvc = NULL;
const int is_tcpconx_owned = tcpconx == NULL;
ETCH_ASSERT(params && params->in_resx);
resources = params->in_resx;
do
{ /* as each next higher layer of the delivery service is instantiated, it
* is passed passed a transport interface to the previously-instantiated
* layer. in each such case, note that the new layer does not own memory
* for the passed transport interface.
*/
if (NULL == tcpconx)
tcpconx = new_tcp_connection (url, params->in_resx, NULL);
ETCH_ASSERT(tcpconx);
if (0 != init_etch_tcpconx_interfaces (tcpconx)) break;
packetizer = new_packetizer_a (tcpconx->itd, url, resources);
if (NULL == packetizer) break;
messagizer = new_messagizer_a (packetizer->transportpkt, url, resources);
if (NULL == messagizer) break;
mboxmgr = new_plain_mailbox_manager (messagizer->transportmsg,
url->raw, resources, params->mblock);
if (NULL == mboxmgr) break;
delsvc = (etch_tcp_delivery_service*) new_delivery_service
(sizeof(etch_tcp_delivery_service), CLASSID_TCP_DELIVERYSVC);
((etch_object*)delsvc)->destroy = destroy_tcp_delivery_service;
/* set our transport to that of the next lower layer (mailbox manager) */
delsvc->transport = mboxmgr->transportmsg;
delsvc->transportx = mboxmgr->imanager; /* todo can we lose this ref? */
delsvc->mailboxmgr = mboxmgr;
delsvc->tcpconx = tcpconx;
delsvc->wait_up = tcpconx->cx.wait_up; /* connection up/down monitor */
delsvc->wait_down = tcpconx->cx.wait_down; /* connection up/down monitor */
delsvc->rwlock = params->mblock; /* not owned */
delsvc->packetizer = packetizer; /* todo can we lose these refs */
delsvc->messagizer = messagizer;
delsvc->resources = resources;
delsvc->is_tcpconx_owned = is_tcpconx_owned;
tcpdelsvc_init (delsvc); /* initialize the delivery service interface */
} while(0);
if (NULL == delsvc)
{
etch_object_destroy(tcpconx);
tcpconx = NULL;
etch_object_destroy(packetizer);
packetizer = NULL;
etch_object_destroy(messagizer);
messagizer = NULL;
etch_object_destroy(mboxmgr);
mboxmgr = NULL;
}
return delsvc;
}
/**
* tcpdelsvc_set_session()
* @param session the i_sessionmessage interface. caller retains ownership.
* this is generally called from the stub constructor.
*/
void tcpdelsvc_set_session (void* data, void* sessionData)
{
i_delivery_service* ids = (i_delivery_service*)data;
i_sessionmessage* session = (i_sessionmessage*)sessionData;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
ETCH_ASSERT(is_etch_sessionmsg(session));
/* set delivery service session to be the passed (stub's) session */
tcpds->session = tcpds->ids->ism = session;
}
/**
* tcpdelsvc_transport_message()
* @param whoto recipient - caller retains this memory, can be null.
* @param message the message
* caller relinquishes this memory on success, retains on failure.
* @return 0 success, -1 error.
*/
int tcpdelsvc_transport_message (void* data, void* whoData, void* messageData)
{
i_delivery_service* ids = (i_delivery_service*)data;
etch_who* whoto = (etch_who*)whoData;
etch_message* msg = (etch_message*)messageData;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
i_transportmessage* dstransport = tcpds->transport;
ETCH_ASSERT(is_etch_transportmsg(dstransport));
return dstransport->transport_message (dstransport->thisx, whoto, msg);
}
/**
* tcpdelsvc_init()
* initialize delivery service interface
*/
int tcpdelsvc_init (etch_tcp_delivery_service* delsvc)
{
i_session* isession = NULL;
i_transport* itransport = NULL;
i_delivery_service* ids = new_delivery_service_interface(NULL, NULL);
delsvc->ids = ids;
/* external transport and session */
ids->transport = delsvc->transport;
ids->session = delsvc->session;
ids->begin_call = delsvc->begin_call = (etch_delivsvc_begincall)tcpdelsvc_begincall;
ids->end_call = delsvc->end_call = (etch_delvisvc_endcall)tcpdelsvc_endcall;
/* - - - - - - - - - - - - - - -
* i_transportmessage (native)
* - - - - - - - - - - - - - - -
*/
itransport = new_transport_interface (ids,
tcpdelsvc_transport_control,
tcpdelsvc_transport_notify,
tcpdelsvc_transport_query);
delsvc->transportmsg = new_transportmsg_interface (ids,
tcpdelsvc_transport_message,
itransport); /* transportmsg now owns itransport */
delsvc->transportmsg->set_session = tcpdelsvc_set_session;
delsvc->transportmsg->get_session = tcpdelsvc_get_session;
/* copy native transport back to interface */
ids->itm = delsvc->transportmsg;
/* copy i_transportmsg interface methods up to this object */
delsvc->transport_message = delsvc->transportmsg->transport_message;
delsvc->transport_control = itransport->transport_control;
delsvc->transport_notify = itransport->transport_notify;
delsvc->transport_query = itransport->transport_query;
delsvc->set_session = itransport->set_session;
delsvc->get_session = itransport->get_session;
/* - - - - - - - - - - - - - - -
* i_sessionmessage (native)
* - - - - - - - - - - - - - - -
*/
isession = new_session_interface (ids,
tcpdelsvc_session_control,
tcpdelsvc_session_notify,
tcpdelsvc_session_query);
delsvc->sessionmsg = new_sessionmsg_interface (ids,
tcpdelsvc_session_message,
isession); /* sessionmsg now owns isession */
/* copy native session back to interface */
ids->ism = delsvc->sessionmsg;
/* copy i_sessionmessage interface methods up to this object */
delsvc->session_message = delsvc->sessionmsg->session_message;
delsvc->session_control = isession->session_control;
delsvc->session_notify = isession->session_notify;
delsvc->session_query = isession->session_query;
/* finally set session of next lower layer (messagizer) to our session */
delsvc->transport->set_session (delsvc->transport->thisx, delsvc->sessionmsg);
return 0;
}
/**
* new_delivery_service_interface()
* delivery service interface constructor
*/
i_delivery_service* new_delivery_service_interface
(i_sessionmessage* isessionmsg, i_transportmessage* itransportmsg)
{
i_delivery_service* ids = (i_delivery_service*) new_object(sizeof(i_delivery_service),
ETCHTYPEB_DELIVERYSVCINT, CLASSID_DELIVERYSVC);
/* this destructor destroys the parent implementation,
* which in turn destroys this interface object */
((etch_object*)ids)->destroy = destroy_delivery_service_via_interface;
ids->ism = isessionmsg;
ids->itm = itransportmsg;
return ids;
}
/**
* destroy_delivery_service_stub()
* invoked by i_deliveryservice destructor to destroy the stub via the session
* shared by the stub and delivery service, and finally destroy the session.
*/
int destroy_delivery_service_stub (void * data)
{
i_delivery_service* ids = (i_delivery_service*)data;
/* 1. the stub constructor created a session interface and set the
* delivery service's session to that session, and set the delivery
* service's is_session_owned true.
* 2. the stub and delivery service therefore share the session interface.
* 3. the delivery service implementation object (e.g. tcp delivery service)
* also references that session, but of course does not own it.
* 4. the session interface's thisx pointer is the stub object.
* 5. the delivery service owns the stub by contract.
* the delivery service implementation object's destructor invokes the
* delivery service interface destructor to destroy the session and stub.
* 6. the delivery service interface destructor destroys the stub by invoking
* this method, which finds the stub via the thisx of the shared session.
*/
xxxx_either_stub* stubobj = NULL;
if (!ids->session || !ids->is_session_owned) return -1;
stubobj = ids->session->thisx; /* stub is the session interface's thisx */
if (is_etch_stub(stubobj))
((etch_object*)stubobj)->destroy(stubobj);
else /* is there a use case for this */
{ assert(is_etch_sessionmsg(ids->session));
etch_object_destroy(ids->session);
}
ids->session = NULL;
return 0;
}
/**
* destroy_delivery_service_interface()
* i_delivery_service destructor 1.
*/
int destroy_delivery_service_interface(void* data)
{
i_delivery_service* ids = (i_delivery_service*)data;
if (NULL == ids) return -1;
if (!is_etchobj_static_content(ids))
{
/* stub owns the ism, ds owns the stub: destroy stub and session */
destroy_delivery_service_stub(ids);
/* stub does not own the itm: itm is that of the mailbox manager */
if (ids->itm && ids->is_transport_owned) {
etch_object_destroy(ids->itm);
ids->itm = NULL;
}
}
return destroy_objectex((etch_object*)ids);
}
/*
* destroy_delivery_service_via_interface()
* i_delivery_service destructor 2.
* this destructor will destroy the parent delivery service implementation, whose
* destructor will in turn destroy this interface object. this permits the object
* creating the delivery service to hold a reference to the interface only, and to
* destroy the implementation and interface by invoking the interface's destructor.
*/
int destroy_delivery_service_via_interface(void* data)
{
i_delivery_service* ids = (i_delivery_service*)data;
etch_object* deliveryservice_implobj = ids? ids->thisx: NULL;
if (deliveryservice_implobj) /* etch_tcp_delivery_service, e.g. */
deliveryservice_implobj->destroy (deliveryservice_implobj);
return 0;
}
/**
* destroy_tcp_delivery_service()
* etch_tcp_delivery_service destructor
*/
int destroy_tcp_delivery_service (void* data)
{
etch_tcp_delivery_service* thisx = (etch_tcp_delivery_service*)data;
const char* thistext = "delsvc dtor";
if (NULL == thisx) return -1;
if (!is_etchobj_static_content(thisx))
{
/* ensure any threads referencing mailboxes (see mailbox.message())
* have run to completion before we start tearing it down. */
etchmbox_get_readlockex (thisx->rwlock, thistext);
etchmbox_release_readlockex (thisx->rwlock, thistext);
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying packetizer ...\n");
etch_object_destroy(((etch_packetizer*)thisx->packetizer));
thisx->packetizer = NULL;
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying messagizer ...\n");
etch_object_destroy(((etch_messagizer*)thisx->messagizer));
thisx->messagizer = NULL;
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying mailbox manager ...\n");
etch_object_destroy(((etch_mailbox_manager*)thisx->mailboxmgr));
thisx->mailboxmgr = NULL;
/* on server side, listen thread destroys tcpconx on exit.
* on client side, tcpconx is destroyed here. */
if (thisx->is_tcpconx_owned){
etch_object_destroy(thisx->tcpconx);
thisx->tcpconx = NULL;
}
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying delivery interface ...\n");
destroy_delivery_service_interface(thisx->ids);
etch_object_destroy(thisx->sessionmsg);
thisx->sessionmsg = NULL;
etch_object_destroy(thisx->transportmsg);
thisx->transportmsg = NULL;
}
return destroy_objectex((etch_object*)thisx);
}
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* i_deliveryservice (i_sessionmessage, i_transportmessage)
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
/**
* get_etch_ds_impl()
* convenience method to verify i_delivery_service, and from it,
* get, verify, and return the delivery service implementation object.
*/
etch_tcp_delivery_service* get_etch_ds_impl (i_delivery_service* ids)
{
etch_tcp_delivery_service* tcpds = NULL;
ETCH_ASSERT(is_etch_ideliverysvc(ids));
tcpds = ids->thisx;
ETCH_ASSERT(is_etch_deliverysvc(tcpds));
return tcpds;
}
/**
* tcpdelsvc_begincall()
* i_deliveryservice :: begincall
* @param msg caller relinquishes on success, retains on failure
* @param out mailbox interface returned on success
* @return 0 success, or -1 failure. new mailbox return in out parameter.
*/
int tcpdelsvc_begincall (i_delivery_service* ids, etch_message* msg, i_mailbox** out)
{
int result = 0;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
i_transportmessage* dstransport = tcpds->transport;
ETCH_ASSERT(is_etch_transportmsg(dstransport));
/* transport is mailbox mgr pmboxmgr_transport_call(imbmgr) */
result = tcpds->transportx->transport_call (tcpds->transportx, NULL, msg, out);
return result;
}
/**
* tcpdelsvc_endcall()
* read the response message, close its mailbox and return the result object.
* @param mbox the current mailbox (interface), caller retains.
* @param response_type type of the response message, caller retains.
* @param out pointer to caller's location to receive the message response object.
* @remarks if some exception condition occurred, the result object will not be an
* object of the expected result type, but rather will be an etch_mailbox_element
* object containing an exception. therefore, the result object should be tested
* with is_exception(resultobj) prior to expecting it to be of the expected type.
* @return 0 success, -1 failure. a result object is returned via out parameter.
* the result object is the expected object type of the service function result,
* or if a response could not be read, the etch_mailbox_element object wrapping
* both the reply message object, and an exception object detailing the problem.
* for example, if the service message is etch_int32* add(etch_int32*, etch_int32*),
* the result object will be an etch_int32 unless an exception occurred.
*/
int tcpdelsvc_endcall (i_delivery_service* ids, i_mailbox* ibox, etch_type* response_type, etch_object** out)
{
int result = 0;
int timeout = 0;
etch_config_t* config = NULL;
int32 default_timeout = 0;
etch_object* result_obj = NULL;
etch_mailbox_element* mbe = NULL;
const char* thistext = "tcpdelsvc_endcall";
/* get the response message type's instance data */
etch_type_impl* typeinfo = response_type? (etch_type_impl*) response_type->impl: NULL;
ETCH_ASSERT(typeinfo && out);
etch_runtime_get_config(&config);
ETCH_ASSERT(config);
/* we do not default to wait forever in order that we can fail gracefully rather
* than hang the mailbox read. this behavior is essential for comprehensive testing.
* if a type specifies a timeout of zero, we do not interpret this as ETCH_INFWAIT,
* but rather we substitute a default timeout, which is configurable.
* if a message type actually requires an abnormally lengthy wait, it will specify
* some presumably large number of milliseconds as its timeout value. similarly,
* if we actually want to wait "forever" by default, we can configure a very large
* timeout value which is effectively "forever".
*/
etch_config_get_property_int(config, "etch.mailbox.timeout.read", &default_timeout);
//TODO: no timeout value
timeout = typeinfo->timeout > 0 ? typeinfo->timeout : default_timeout;
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_XDEBUG, "reading mailbox (wait %d) ...\n", timeout);
/* read reply message from mailbox, waiting for message to arrive if necessary */
result = ibox->read_withwait(ibox, timeout, &mbe);
if(mbe == NULL) {
result = 0;
*out = (etch_object*) new_etch_exception_from_errorcode(ETCH_ERROR);
}
/* mailbox read timed out or otherwise failed */
else if (is_etch_exception(mbe))
{ /* here we are returning an object of a type not expected by the service.
* this method's contract will specify that the app must test the returned
* object for an exception prior to "casting" it to the expected result
* object type. another possibility is that we could have default
* constructors for each response type.
*/
*out = (etch_object*) mbe;
}
/* find the result object expected in the reply message */
else if (NULL == (result_obj = message_get (mbe->msg, typeinfo->response_field)))
{
etch_object_destroy(mbe);
*out = NULL;
result = 0;
}
else
{ /* we found the reply message result object. return this result object,
* and destroy the message wrapper and the reply message along with it.
* note that we must be careful destroying the message, since the message
* result object, that we intend to return to the application, is part of
* the message and thus destroyed with the message unless steps are taken
* to protect it. we could clone the result and return the clone, but
* that would presuppose that the object is cloneable, so instead we will
* protect the object, destroy the message, and finally unprotect the object.
*/
set_etchobj_static_all(result_obj); /* protect result object */
etch_object_destroy(mbe); /* destroy message and wrapper */
clear_etchobj_static_all(result_obj); /* unprotect result object */
*out = result_obj; /* return result object */
// ETCH_LOG(LOG_CATEGORY, ETCH_LOG_XDEBUG, "mailbox result object type %d class %d\n",
// ((etch_object*)result_obj)->obj_type, ((etch_object*)result_obj)->class_id);
result = 0;
}
/* acquire mailbox read write lock to switch context back to the receive thread,
* which will continue at the return from the queue.put in mailbox.message().
* we have passed in a dedicated mutex in start_xxxx_client, intended to be used
* by client main thread and receive thread as a read/write lock. there should
* be a cleaner way to sync mailbox reads and writes from within the mailbox
* itself, we need to investigate. ideally we would sync all of mailbox.message()
* using its queue waiter's mutex; however at present all such waiter mutexes are
* non-recursive, and so we would need to do extensive re-testing if we were to
* change waiter mutexes to be nestable, assuming they would work at all that way.
* we should make this change asap however, since the way it is now, we don't wait
* for the mailbox write to do its fire_notify() before we read the mailbox.
* however the following lock at least ensures the fire_notify() happens prior to
* closing the mailbox, which is the overriding consideration.
*/
etchmbox_get_readlock (ibox->thisx, thistext);
etchmbox_release_readlock (ibox->thisx, thistext);
/* we're now done with the the mailbox so close it. */
ibox->close_read (ibox);
return result;
}
/* - - - - - - - - - - - - - - - - - - - - - - - - -
* i_deliveryservice :: i_sessionmessage (i_session)
* - - - - - - - - - - - - - - - - - - - - - - - - -
*/
/* this is the delivery service interface implementation of i_sessionmessage,
* distinct from the transport.session's implementation of i_sessionmessage
* which is implemented externally and set via set_session().
*/
/**
* tcpdelsvc_session_message()
* @param whofrom caller retains, can be null.
* @param msg caller relinquishes
* @return 0 (message handled), or -1 (error, closed, or timeout)
*/
int tcpdelsvc_session_message (void* data, etch_who* whofrom, etch_message* msg)
{
i_delivery_service* ids = (i_delivery_service*)data;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
i_sessionmessage* dssession = tcpds->session;
ETCH_ASSERT(is_etch_sessionmsg(dssession));
return dssession->session_message(dssession->thisx, whofrom, msg);
}
/**
* tcpdelsvc_session_control()
* delivery service interface implementation of i_session_message
* @param control event, caller relinquishes.
* @param value control value, caller relinquishes.
*/
int tcpdelsvc_session_control (void* data, etch_event* control, etch_object* value)
{
i_delivery_service* ids = (i_delivery_service*)data;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
i_sessionmessage* dssession = tcpds->session;
ETCH_ASSERT(is_etch_sessionmsg(dssession));
return dssession->session_control(dssession->thisx, control, value);
}
/**
* etch_tcpdelsvc_session_notify()
* @param evt event, caller relinquishes.
*/
int tcpdelsvc_session_notify (void* data, etch_event* evt)
{
i_delivery_service* ids = (i_delivery_service*)data;
int result = -1, evtype = evt? evt->value: 0;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
i_sessionmessage* dssession = tcpds->session;
ETCH_ASSERT(is_etch_sessionmsg(dssession));
switch(evtype)
{
case ETCHEVT_SESSION_UP:
etch_wait_set(tcpds->wait_up, evtype);
break;
case ETCHEVT_SESSION_DOWN:
etch_wait_set(tcpds->wait_down, evtype);
break;
}
result = dssession->session_notify (dssession->thisx, evt);
return result;
}
/**
* etch_tcpdelsvc_session_query()
* @param query, caller relinquishes.
*/
etch_object* tcpdelsvc_session_query (void* data, etch_query* query)
{
i_delivery_service* ids = (i_delivery_service*)data;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
i_sessionmessage* dssession = tcpds->session;
ETCH_ASSERT(is_etch_sessionmsg(dssession));
return dssession->session_query (dssession->thisx, query);
}
/* - - - - - - - - - - - - - - - - - - - - - - - - - - -
* i_deliveryservice :: i_transportmessage (i_transport)
* - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
/**
* tcpdelsvc_get_session()
* @return a reference to the delivery service i_sessionmessage interface.
* caller does not own this object.
*/
i_session* tcpdelsvc_get_session (void* data)
{
i_delivery_service* ids = (i_delivery_service*)data;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
return (i_session*)tcpds->session;
}
/**
* tcpdelsvc_transport_control()
* @param control, caller relinquishes.
* @param value control value, caller relinquishes.
* @remarks as it currently stands, the value object passed through these transport
* interfaces must be a cloneable object, either through being cloneable by default,
* such as the etch_primitive or etch_object derivatives (for example, etch_int32,
* etch_string, etch_date, etch_event, etch_object, and others); or by virtue of
* having custom clone() functions assigned to them.
*/
int tcpdelsvc_transport_control (void* data, etch_event* control, etch_object* valobj)
{
i_delivery_service* ids = (i_delivery_service*)data;
int result = 0;
etch_connection* cx = NULL;
i_transportmessage* dstransport = NULL;
etch_tcp_delivery_service* tcpds = NULL;
const int objclass = control? ((etch_object*)control)->class_id: 0;
const int timeoutms = control && (is_etch_int32(valobj))? ((etch_int32*)control)->value: 0;
ETCH_ASSERT(is_etch_ideliverysvc(ids) && objclass);
tcpds = get_etch_ds_impl(ids); /* delivery service implementation */
ETCH_ASSERT(is_etch_deliverysvc(tcpds));
dstransport = tcpds->transport; /* delivery service transport (mailbox mgr) */
ETCH_ASSERT(is_etch_transportmsg(dstransport));
cx = &tcpds->tcpconx->cx; /* underlying connection */
switch(objclass) /* forward the transport event */
{
case CLASSID_CONTROL_START_WAITUP:
/* point to the condition variable on the waiter. the need to do this
* is a semikludge; that is, having to set some state prior to calling
* waitup; however we can't have the function call preset the state
* because the absence of the state variable means "not waiting", and
* is tested for by the wait function. so we need to have a target for
* the "up" state before we do the waitup, since the connect will complete
* before we get around to asking for the waitup, and it needs to be able
* to mark state as up, thus we set that target below. previously the state
* variable cond_var was not set until the wait_up was invoked. in the
* current design the cond_var is nulled out after a wait, in order to
* reset wait state to not waiting, so we need to ensure it is populated
* in advance of any need to set a wait condition to some state, prior to
* actually waiting.
*/
//etchconx_init_waitstate (cx); /* see comment above */
((etch_object*)control)->class_id = CLASSID_CONTROL_START; /* modify event to not wait */
result = dstransport->transport_control (dstransport->thisx, control, valobj);
if (0 == result)
result = etchconx_wait_up (cx, timeoutms);
break;
case CLASSID_CONTROL_STOP_WAITDOWN:
//etchconx_init_waitstate (cx); /* see comment above */
((etch_object*)control)->class_id = CLASSID_CONTROL_STOP; /* modify event to not wait */
result = dstransport->transport_control (dstransport->thisx, control, valobj);
if (0 == result)
result = etchconx_wait_down (cx, timeoutms);
break;
case CLASSID_CONTROL_START:
case CLASSID_CONTROL_STOP:
default: /* event not of interest here so pass it on */
result = dstransport->transport_control (dstransport->thisx, control, valobj);
}
return result;
}
/**
* tcpdelsvc_transport_notify()
* @param evt, caller relinquishes.
*/
int tcpdelsvc_transport_notify (void* data, etch_event* evt)
{
i_delivery_service* ids = (i_delivery_service*)data;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
return tcpds->transport->transport_notify( tcpds->transport->thisx, evt);
}
/**
* tcpdelsvc_transport_query()
* i_transportmessage::transport_query override.
* @param query, caller relinquishes.
*/
etch_object* tcpdelsvc_transport_query (void* data, etch_query* query)
{
i_delivery_service* ids = (i_delivery_service*)data;
int result = 0;
etch_object* resultobj = NULL;
etch_connection* cx = NULL;
const int timeoutms = query? query->value: 0;
const int objclass = query? ((etch_object*)query)->class_id: 0;
etch_tcp_delivery_service* tcpds = get_etch_ds_impl(ids);
i_transportmessage* dstransport = tcpds->transport;
cx = &tcpds->tcpconx->cx;
switch(objclass)
{
case CLASSID_QUERY_WAITUP:
result = etchconx_wait_up(cx, timeoutms);
break;
case CLASSID_QUERY_WAITDOWN:
result = etchconx_wait_down(cx, timeoutms);
break;
default:
resultobj = dstransport->transport_query (dstransport->thisx, query);
query = NULL; /* argument was relinquished */
}
if (query)
etch_object_destroy(query);
return resultobj;
}
/* - - - - - - - - - - - - - - - - - - - - - -
* etch_resources
* - - - - - - - - - - - - - - - - - - - - - -
*/
/*
* etch_transport_resources_init()
* @return transport resources map initialized with default items and values
*/
etch_resources* etch_transport_resources_init(etch_resources* resxmap)
{
const int SESSIONPOOL_INITSIZE = 4;
etch_resources* resources = resxmap? resxmap: new_etch_resources(0);
/* these threadpools are session thread managers, i.e. they manage
* a pool of stub listener threads on either side. */
if (NULL == etch_resources_get (resources, ETCH_RESXKEY_POOLTYPE_QUEUED))
{ /* until we implement a queued pool, we use a free pool */
etch_threadpool* threadpool
= new_threadpool (ETCH_THREADPOOLTYPE_FREE, SESSIONPOOL_INITSIZE);
etch_resources_add (resources, (wchar_t*)ETCH_RESXKEY_POOLTYPE_QUEUED,
(etch_object*) threadpool);
}
if (NULL == etch_resources_get (resources, ETCH_RESXKEY_POOLTYPE_FREE))
{
etch_threadpool* threadpool
= new_threadpool (ETCH_THREADPOOLTYPE_FREE, SESSIONPOOL_INITSIZE);
etch_resources_add (resources, (wchar_t*)ETCH_RESXKEY_POOLTYPE_FREE,
(etch_object*)threadpool);
}
if (NULL == etch_resources_get (resources, ETCH_RESXKEY_MSGIZER_FORMAT))
{
etch_resources_add (resources, (wchar_t*)ETCH_RESXKEY_MSGIZER_FORMAT,
(etch_object*)new_stringw(ETCH_RESXVAL_XPORTFMT_BINARY));
}
return resources;
}
/*
* get_etch_transport_resources()
* @return transport resources map initialized with default items and values
*/
etch_resources* get_etch_transport_resources(etch_resources* resxmap)
{
etch_resources* outresx = NULL;
ETCH_ASSERT(resxmap == NULL || is_etch_hashtable(resxmap));
outresx = etch_transport_resources_init(resxmap);
ETCH_ASSERT(outresx);
return outresx;
}
/* - - - - - - - - - - - - - - - - - - - - - -
* server/client "factories"
* - - - - - - - - - - - - - - - - - - - - - -
*/
/*
* destroy_etch_clientsession()
* destructor for a server's client session's instance data.
*/
int destroy_etch_clientsession (void* data)
{
etch_session* thisx = (etch_session*)data;
if (NULL == thisx) return -1;
if (!is_etchobj_static_content(thisx))
{
etch_object_destroy(thisx->ds);
etch_object_destroy(thisx->server_stub);
etch_object_destroy(thisx->server);
etch_object_destroy(thisx->client);
/* note that we do not destroy the conximpl (accepted tcp connection)
* here, the receive thread destroys it instead */
/* remove this entry from active sessions list.
* note that thisx.thisx is the serverparams hosting the sessions list */
remove_etch_session (thisx->thisx, thisx->session_id);
}
return destroy_objectex((etch_object*)thisx);
}
/*
* new_etch_clientsession()
* constructor for session parameter bundle.
* this object wraps all of a server's per-session instance data.
*/
etch_session* new_etch_clientsession (void* host, etch_connection* cx)
{
etch_session* newobj = (etch_session*) new_object(sizeof(etch_session),
ETCHTYPEB_CLIENT_SESSION, CLASSID_CLIENT_SESSION);
((etch_object*)newobj)->destroy = destroy_etch_clientsession;
newobj->thisx = host; /* etch_serverparams* */
newobj->cx = cx;
newobj->session_id = cx->conxid; /* session key */
/* carrying the session instance data with the connection is not good design,
* however we need it in the receive loop listener threadproc which does not
* see transport.h. when we figure another way to get the session's data
* through to the listener thread, we should remove it from the connection,
* since session data is at a much higher level than connection of course.
*/
cx->session = (etch_object*) newobj;
return newobj;
}
/*
* get_etch_session()
* look up session instance data object matching specified session ID.
* @return index of entry, or -1 if not found.
* array entry is returned via out parameter if specified.
*/
int get_etch_session (etch_server_factory* sf, const int session_id, etch_session** out)
{
int ndx = 0, retndx = -1;
etch_iterator iterator;
etch_session *s = NULL;
ETCH_ASSERT(is_etch_factoryparams(sf));
etch_arraylist_getlock(sf->clientlist);
set_iterator(&iterator, sf->clientlist, &sf->clientlist->iterable);
while(iterator.has_next(&iterator))
{ if ((NULL != (s = iterator.current_value)) && (s->session_id == session_id))
{ if (out) *out = s;
retndx = ndx;
break;
}
ndx++;
iterator.next(&iterator);
}
etch_arraylist_rellock(sf->clientlist);
return retndx;
}
/*
* remove_etch_session()
* remove session instance data object matching specified session ID.
* does not destroy list item content but rather returns it for caller disposition.
* @return the removed object, or NULL if not found.
*/
etch_session* remove_etch_session (etch_server_factory* sf, const int session_id)
{
etch_session* foundsession = NULL;
int whichndx = 0;
etch_arraylist_getlock(sf->clientlist);
whichndx = get_etch_session (sf, session_id, &foundsession);
if (whichndx >= 0) {
etch_arraylist_remove (sf->clientlist, whichndx, FALSE);
}
etch_arraylist_rellock(sf->clientlist);
return foundsession;
}
/*
* destroy_etch_client_factory()
* client params destructor.
* destroys the ancillary objects attached to a remote server.
*/
int destroy_etch_client_factory (void* data)
{
etch_client_factory* thisx = (etch_client_factory*)data;
if (NULL == thisx) return -1;
if (!is_etchobj_static_content(thisx))
{
if (thisx->iclient) /* destroy i_xxxx_client */
{
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying client interface ...\n");
if(((etch_object*)thisx->iclient))
((etch_object*)thisx->iclient)->destroy(((etch_object*)thisx->iclient));
thisx->iclient = NULL;
}
if (thisx->stub) /* destroy xxxx_client_stub */
{
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying stub ...\n");
if(((etch_object*)thisx->stub))
((etch_object*)thisx->stub)->destroy(((etch_object*)thisx->stub));
thisx->stub = NULL;
}
/* destroy i_delivery_service */
if (thisx->dsvc)
{
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying delivery service ...\n");
if(thisx->dsvc)
etch_object_destroy(thisx->dsvc);
thisx->dsvc = NULL;
}
/* destroy etch_resources */
if (thisx->in_resx)
{
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying resources ...\n");
etch_object_destroy(thisx->in_resx);
thisx->in_resx = NULL;
}
etch_object_destroy(thisx->mainpool);
thisx->mainpool = NULL;
etch_mutex_destroy(thisx->rwlock);
thisx->rwlock = NULL;
}
return destroy_objectex(((etch_object*)thisx));
}
/*
* new_client_factory()
* constructor for client parameter bundle.
*/
etch_client_factory* new_client_factory (etch_object* session, i_session* isession, main_client_create_func main_client_create)
{
etch_status_t status = ETCH_SUCCESS;
const int MAINPOOL_INITIAL_SIZE = 4;
etch_client_factory* cf = (etch_client_factory*) new_object
(sizeof(etch_client_factory), ETCHTYPEB_FACTORYPARAMS, CLASSID_CLIENTFACTORY);
((etch_object*)cf)->destroy = destroy_etch_client_factory;
cf->new_client = main_client_create;
cf->mainpool = new_threadpool (ETCH_THREADPOOLTYPE_FREE, MAINPOOL_INITIAL_SIZE);
// TODO: pool
status = etch_mutex_create(&cf->rwlock, ETCH_MUTEX_UNNESTED, NULL);
ETCH_ASSERT(status == ETCH_SUCCESS);
return cf;
}
/*
* destroy_etch_server_factory()
* server params destructor.
*/
int destroy_etch_server_factory (void* data)
{
etch_server_factory* thisx = (etch_server_factory*)data;
if (NULL == thisx) return -1;
if (!is_etchobj_static_content(thisx))
{
etch_object_destroy(thisx->clientlist);
thisx->clientlist = NULL;
etch_object_destroy(thisx->mainpool);
thisx->mainpool = NULL;
etch_object_destroy(thisx->subpool);
thisx->subpool = NULL;
etch_mutex_destroy(thisx->mblock);
thisx->mblock = NULL;
}
return destroy_objectex((etch_object*)thisx);
}
/*
* new_server_factory()
* constructor for server parameter bundle.
* fyi invoked from this.new_etch_listener()
*/
etch_server_factory* new_server_factory (etch_object* session, i_session* isession,
helper_listener_create_func helper_listener_create, main_server_create_func main_server_create)
{
etch_status_t status = ETCH_SUCCESS;
const int MAINPOOL_INITIAL_SIZE = 4, SUBPOOL_INITIAL_SIZE = 4;
etch_server_factory* sf = (etch_server_factory*) new_object(sizeof(etch_server_factory), ETCHTYPEB_FACTORYPARAMS, CLASSID_SERVERFACTORY);
((etch_object*)sf)->destroy = destroy_etch_server_factory;
sf->clientlist = new_etch_arraylist_synchronized(ETCH_DEFSIZE, ETCH_DEFSIZE);
sf->clientlist->content_type = ETCHARRAYLIST_CONTENT_SIMPLE;
sf->clientlist->is_readonly = TRUE; /* array does not own its content */
sf->helper_new_listener = helper_listener_create;
sf->main_new_server = main_server_create;
sf->session = session;
sf->isession = isession;
sf->mainpool = new_threadpool (ETCH_THREADPOOLTYPE_FREE, MAINPOOL_INITIAL_SIZE);
sf->subpool = new_threadpool (ETCH_THREADPOOLTYPE_FREE, SUBPOOL_INITIAL_SIZE);
// TODO: pool
status = etch_mutex_create(&sf->mblock, ETCH_MUTEX_UNNESTED, NULL);
ETCH_ASSERT(status == ETCH_SUCCESS);
return sf;
}
/* - - - - - - - - - - - - - - - - - - - - - -
* transport listener
* - - - - - - - - - - - - - - - - - - - - - -
*/
/*
* tcpxfact_get_session()
* return session interface from the server factory bundle.
* validate and assert the i_sessionlistener object.
*/
i_session* tcpxfact_get_session (i_sessionlistener* lxr)
{
i_session* session = NULL;
etch_server_factory* factory = NULL;
ETCH_ASSERT(is_etch_sessionlxr(lxr));
factory = lxr->server_params;
session = factory? factory->isession: NULL;
return session;
}
/*
* tcpxfact_session_control()
* @param control event, caller relinquishes.
* @param value control value, caller relinquishes.
*/
int tcpxfact_session_control (void* data, etch_event* control, etch_object* value)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
int result = -1;
i_session* session = tcpxfact_get_session (thisx);
if (session && session->session_control)
result = session->session_control (session, control, value);
else
{
etch_object_destroy(control);
etch_object_destroy(value);
}
return result;
}
/*
* tcpxfact_session_notify()
* @param evt event, caller relinquishes.
*/
int tcpxfact_session_notify (void* data, etch_event* evt)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
int result = -1;
i_session* session = tcpxfact_get_session (thisx);
if (session && session->session_notify)
result = session->session_notify (session, evt);
else
etch_object_destroy(evt);
return result;
}
/*
* tcpxfact_session_query()
* @param query caller relinquishes
*/
etch_object* tcpxfact_session_query (void* data, etch_query* query)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
void* resultobj = NULL;
i_session* session = tcpxfact_get_session (thisx);
if (session && session->session_query)
resultobj = session->session_query (session, query);
else
etch_object_destroy(query);
return resultobj;
}
/*
* transport_thread_id()
* return thread_id for thread zero on the main pool of this listener.
*/
int transport_thread_id (i_sessionlistener* listener)
{
etch_server_factory* sf = NULL; etch_thread* thread0 = NULL;
ETCH_ASSERT(listener && is_etch_serverparams(listener->server_params));
sf = (etch_server_factory*) listener->server_params;
ETCH_ASSERT(sf->mainpool);
thread0 = threadpool_thread (sf->mainpool, 0);
return thread0? thread0->params.etch_thread_id: 0;
}
/*
* transport_session_count()
* return count of outstanding client sessions for this server.
*/
int transport_session_count (i_sessionlistener* listener)
{
etch_server_factory* sf = NULL;
if (NULL == listener || NULL == listener->server_params) return 0;
sf = listener->server_params;
if (NULL == sf || NULL == sf->clientlist) return 0;
return sf->clientlist->count;
}
/*
* tcpxfact_teardown_client_sessions()
* signal and wait for each session thread to exit, destroying each
* thread, connection and session. tearing down the session destroys its
* delivery service, remote client, and stub. this is intended to be invoked
* only at server shutdown, after the main (accept) thread has exited.
*/
int transport_teardown_client_sessions (i_sessionlistener* listener)
{
etch_iterator iterator;
etch_session* session = NULL;
etch_server_factory* sf = listener->server_params;
set_iterator (&iterator, sf->clientlist, &sf->clientlist->iterable);
while(iterator.has_next(&iterator)) /* for each extant client session ... */
{
if (NULL != (session = iterator.current_value))
{
if (is_etch_connection(session->cx))
{
session->cx->is_started = FALSE; /* mark connection stopped */
if (is_etch_thread(session->cx->thread)){
etch_join (session->cx->thread); /* BLOCK for thread exit */
}
}
//etch_object_destroy(session->conximpl);
//session->conximpl = NULL;
//etch_object_destroy(session); /* teardown this session */
}
iterator.next(&iterator);
}
return 0;
}
/*
* etch_listener_waitfor_exit()
* block until accept listener thread exits.
*/
int etch_listener_waitfor_exit (i_sessionlistener* thisx)
{
etch_server_factory* p = thisx->server_params;
const int result = threadpool_waitfor_all (p->mainpool, FALSE);
return result;
}
/*
* tcpxfact_session_accepted()
* override for transport factory session_accepted()
* signature is typedef int (*etch_session_accepted) (void* thisx, void* socket);
* parallels java TcpTransportFactory.newListener.newSessionListener.sessionAccepted
* @param thisx the i_sessionlistener quasi interface
* @param socket an open accept raw socket, wrapped by etch_socket. caller relinquishes.
* in practice this is an apr socket wrapped by etch_socket.
* @return 0 success, -1 failure.
*/
int tcpxfact_session_accepted (void* data, void* connectionData)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
etch_tcp_connection* tcpconx = (etch_tcp_connection*)connectionData;
int result = 0;
void* newstub = NULL;
etch_session* newsession = NULL;
etch_server_factory* params = NULL;
etch_connection* cx = &tcpconx->cx;
i_delivery_service* delivery_service = NULL;
const int session_id = cx->conxid;
ETCH_ASSERT(is_etch_sessionlxr(thisx));
ETCH_ASSERT(is_etch_tcpconnection(tcpconx));
params = (etch_server_factory*) thisx->server_params;
ETCH_ASSERT(params && params->helper_new_listener);
/* fyi java binding makes a copy of the generic resources here.
* we instead will not use resources for client-specific map entries,
* but rather will use the parameter bundle. TODO allocate client-specific
* segment of parameter bundle.
*/
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_INFO, "creating client session %d ...\n", session_id);
/* instantiate delivery service */
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "creating delivery service ...\n");
delivery_service = new_etch_transport_a (thisx->url, thisx->server_params, tcpconx);
if (NULL == delivery_service)
{ ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "could not create delivery service\n");
return -1;
}
ETCH_ASSERT(delivery_service->itm && delivery_service->itm->transport_control);
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "delivery service created\n");
newsession = new_etch_clientsession (params, cx);
newsession->mainlistener = thisx; /* session points back to accept listener */
newsession->ds = delivery_service;
newsession->conximpl = (etch_object*) tcpconx;
/* CALL BACK to helper.xxx_helper_listener_create to create this
* client's server side listener, server implementation, and stub.
*/
newstub = params->helper_new_listener(params, newsession);
ETCH_ASSERT(is_etch_stub(newstub));
etch_arraylist_add (params->clientlist, newsession);
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "client session %d created\n", session_id);
/* START this client's individual listener. since we have an accepted socket
* in hand, it is in effect already started.
*/
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_INFO, "starting client session %d ...\n", session_id);
result = delivery_service->itm->transport_control (delivery_service,
new_etch_event(CLASSID_CONTROL_START, 0), NULL);
return result;
}
/*
* destroy_etch_listener()
* etch_listener destructor.
*/
int destroy_etch_listener (void* data)
{
i_sessionlistener* thisx = (i_sessionlistener*)data;
if (NULL == thisx) return -1;
if (!is_etchobj_static_content(thisx)) {
etch_object_destroy(thisx->url);
thisx->url = NULL;
if (thisx->is_session_owned)
etch_free(thisx->isession);
if (thisx->is_transport_owned)
etch_free(thisx->itransport);
if (thisx->is_resources_owned)
{
ETCH_LOG(LOG_CATEGORY, ETCH_LOG_DEBUG, "destroying resources ...\n");
etch_object_destroy(thisx->resources);
thisx->resources = NULL;
}
if (thisx->thisx)
{ /* watch this spot: the i_sessionlistener and the etch_tcp_server
* have mutual references. we must ensure that if we are to
* destroy the etch_tcp_server via the i_sessionlistener, that the
* etch_tcp_server does not also destroy the i_sessionlistener. */
etch_tcp_server* srvobj = (etch_tcp_server*) thisx->thisx;
ETCH_ASSERT(is_etch_tcpserver(srvobj));
etch_object_destroy(srvobj);
}
if (thisx->server_params)
{
etch_server_factory* sf = thisx->server_params;
ETCH_ASSERT(is_etch_serverparams(sf));
etch_object_destroy(sf);
}
}
return destroy_objectex((etch_object*)thisx);
}
/*
* new_etch_listener()
* constructs a new transport listener used to construct server sessions.
* @param uri a uri string, caller relinquishes.
* @param resx a resources map, caller relinquishes. currently ALWAYS NULL.
* @param helper_new_server_funcptr pointer to the listener ctor in server helper.
* @param main_new_server_funcptr pointer to the server impl ctor in main.
* @param get_xxxx_resources_funcptr helper new service resources callback.
* @return an i_sessionlistener interface. caller owns it. note that java binding
* returns a transport interface, whereas c binding will instead extract the
* transport interface from i_sessionlistener.itransport.
*/
i_sessionlistener* new_etch_listener (wchar_t* uri, etch_resources* resx,
void* factory_thisx,
helper_listener_create_func helper_listener_create,
main_server_create_func main_server_create,
helper_resources_init_func helper_resources_init)
{
etch_tcp_server* tcp_server = NULL;
etch_server_factory* params = NULL;
etch_url* url = new_url(uri);
/* listener assumes the session interface of the server factory creator.
* this accomplishes the same thing as the session method implementations
* found in java TcpTransportFactory.newListener().
*/
i_session* isession = new_session_interface (NULL,
tcpxfact_session_control,
tcpxfact_session_notify,
tcpxfact_session_query);
/* create the listener interface, specifying the on_session_accepted
* callback to be invoked on each successful server accept in order
* to create a new server. relinquish isession to listener here. */
i_sessionlistener* listener = new_sessionlistener_interface (NULL,
tcpxfact_session_accepted, isession);
((etch_object*)listener)->destroy = destroy_etch_listener;
listener->wait_exit = etch_listener_waitfor_exit;
listener->url = url; /* relinquished */
/* create server "factory", which is in the c binding a parameter
* bundle which includes callbacks to the new server constructors */
params = new_server_factory ((etch_object*) listener, listener->isession, helper_listener_create, main_server_create);
params->thisx = factory_thisx;
/* instantiate generic resources and call back to specific helper for vf */
listener->is_resources_owned = TRUE;
listener->resources = get_etch_transport_resources (resx); /* resx null */
params->in_resx = listener->resources;
params->in_uri = uri;
helper_resources_init(params);
listener->server_params = params;
/* fyi params delivery service is set later, in svr->on_session_accepted(),
whose implementation is tcpxfact_session_accepted(), in this module */
/* create the tcp connection and acceptor SVR BREAK 001 */
tcp_server = new_tcp_server (url, params->mainpool, params->subpool, resx, listener);
if (NULL == tcp_server) {
etch_object_destroy(listener);
return NULL;
}
/* listener [main] expects that i_sessionlistener.thisx is the server,
* e.g. an etch_tcp_server* */
listener->thisx = tcp_server;
/* copy server object's session virtuals to this object */
/* see java TcpTransportFactory.newListener() for session impls */
listener->session = tcp_server->session;
listener->isession = tcp_server->isession;
listener->session_control = tcp_server->session_control;
listener->session_notify = tcp_server->session_notify;
listener->session_query = tcp_server->session_query;
/* set this listener object's transport to be the server connection's transport */
ETCH_ASSERT(tcp_server->itransport);
etch_free(listener->itransport); /* TODO don't instantiate in the first place */
listener->itransport = tcp_server->itransport;
listener->transport_control = tcp_server->transport_control;
listener->transport_notify = tcp_server->transport_notify;
listener->transport_query = tcp_server->transport_query;
listener->set_session = tcp_server->set_session;
listener->get_session = tcp_server->get_session;
listener->is_transport_owned = FALSE;
return listener; /* caller owns this object */
}