blob: 58ad19129ccd1aaa5cf3e744b3224d2df2776e95 [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* etch_packetizer.c
* packetizes a stream data source. Reads and verifies a packet header consisting
* of a 32-bit signature and a 32-bit length. Verifies the signature; then, using the
* length extracted from the header, reads the packet data and forwards it to the
* packet handler. As a packet source, accepts a packet and prepends a packet header
* prior to delivering it to a data source.
*/
#include "etch_packetizer.h"
#include "etch_objecttypes.h"
#include "etch_url.h"
#include "etch_log.h"
/* category string passed to ETCH_LOG by this file */
static const char* LOG_CATEGORY = "etch_packetizer";
/* forward declarations of functions that are defined further down in this file:
 * the private constructor, and the session/transport control, notify and query
 * handlers which the constructor installs into the packetizer's interfaces.
 */
etch_packetizer* new_packetizer_b (i_transportdata*, const int maxpktsize);
int etch_pktizer_session_control (void*, etch_event*, etch_object*);
int etch_pktizer_session_notify (void*, etch_event*);
etch_object* etch_pktizer_session_query (void*, etch_query*);
int etch_pktizer_transport_control(void*, etch_event*, etch_object*);
int etch_pktizer_transport_notify (void*, etch_event*);
etch_object* etch_pktizer_transport_query (void*, etch_query*);
/* name of the URL term read by new_packetizer_a() to obtain the maximum packet size */
const wchar_t* ETCH_PKTIZER_MAX_PKT_SIZE_TERM = L"Packetizer.maxPktSize";
/* maximum packet size used when the URL term is absent or not positive */
const int ETCH_PKTIZER_DEFMAXPKTSIZE = 10240;
/* size in bytes of the packet header: one int signature followed by one int length */
const int ETCH_PKTIZER_HEADERSIZE = 8;
/* signature value written at the start of every outgoing packet header and
 * required at the start of every incoming one.
 * NOTE(review): 0xdeadbeef does not fit in a 32-bit signed int, so the stored
 * value relies on the implementation-defined unsigned-to-signed conversion;
 * comparisons in this file are against int values read from the buffer.
 */
const int ETCH_PKTIZER_SIG = 0xdeadbeef;
/* - - - - - - - - - - - - - - - - - - - - - -
* packetizer constructor/destructor
* - - - - - - - - - - - - - - - - - - - - - -
*/
/**
 * new_packetizer()
 * etch_packetizer public constructor. parses the URI string into a temporary
 * etch_url, delegates construction to new_packetizer_a(), and destroys the
 * temporary url before returning.
 * @param itd i_transportdata interface of the layer below; passed through.
 * @param uri URI string from which the temporary url object is built.
 * @param resxmap resources map; passed through to new_packetizer_a().
 * @return whatever new_packetizer_a() returned.
 */
etch_packetizer* new_packetizer (i_transportdata* itd, wchar_t* uri, etch_resources* resxmap)
{
    etch_packetizer* created = NULL;
    etch_url* parsed_url = new_url(uri);

    created = new_packetizer_a (itd, parsed_url, resxmap);

    /* the url object was only needed while constructing */
    etch_object_destroy(parsed_url);

    return created;
}
/**
 * new_packetizer_a()
 * etch_packetizer constructor 2. reads the maximum packet size term from the
 * url and hands it to new_packetizer_b(). when the term lookup reports -1 or
 * the value found is not positive, ETCH_PKTIZER_DEFMAXPKTSIZE is used instead.
 * @param itd i_transportdata interface of the layer below; passed through.
 * @param url parsed url which is queried for ETCH_PKTIZER_MAX_PKT_SIZE_TERM.
 * @param resxmap resources map; not referenced by this function.
 * @return whatever new_packetizer_b() returned.
 */
etch_packetizer* new_packetizer_a (i_transportdata* itd, etch_url* url, etch_resources* resxmap)
{
    int requested_size = 0;
    int lookup_rc = 0;

    lookup_rc = etchurl_get_integer_term (url, ETCH_PKTIZER_MAX_PKT_SIZE_TERM, &requested_size);

    if (lookup_rc == -1 || requested_size <= 0)
    {
        requested_size = ETCH_PKTIZER_DEFMAXPKTSIZE;
    }

    return new_packetizer_b (itd, requested_size);
}
/*
 * destroy_packetizer()
 * etch_packetizer destructor. unless the object is flagged as having static
 * content, destroys the sessiondata and transportpkt interface objects, the
 * data lock if there is one, and the save buffer; then destroys the
 * packetizer object itself via destroy_objectex().
 * @param data the etch_packetizer to destroy.
 * @return -1 if data is NULL, otherwise the result of destroy_objectex().
 */
int destroy_packetizer(void* data)
{
    etch_packetizer* pzr = (etch_packetizer*)data;
    etch_status_t lock_rc = ETCH_SUCCESS;

    if (pzr == NULL)
    {
        return -1;
    }

    if (is_etchobj_static_content(pzr))
    {   /* content is not ours to free, destroy only the shell */
        return destroy_objectex((etch_object*)pzr);
    }

    etch_object_destroy(pzr->sessiondata);
    etch_object_destroy(pzr->transportpkt);

    if (pzr->datalock)
    {
        lock_rc = etch_mutex_destroy(pzr->datalock);
        ETCH_ASSERT(lock_rc == ETCH_SUCCESS);
    }

    etch_object_destroy(pzr->savebuf);

    return destroy_objectex((etch_object*)pzr);
}
/**
 * etch_pktizer_transport_packet()
 * delivers a packet to the transport after filling in the packet header.
 * the header (signature, then body length) is written starting at the
 * buffer's current index, after which the index is restored so that the
 * transport sees the header followed by the body. the body length written
 * is the number of available bytes minus ETCH_PKTIZER_HEADERSIZE.
 * @param data the packetizer (asserted via is_etch_packetizer).
 * @param whoData etch_who* recipient; only passed through to the transport.
 * @param bufferData etch_flexbuffer* whose available bytes consist of room
 * for the header followed by the packet body.
 * @return -1 if fewer than ETCH_PKTIZER_HEADERSIZE bytes are available,
 * otherwise the result of the transport's transport_data().
 */
int etch_pktizer_transport_packet(void* data, void* whoData, void* bufferData)
{
    etch_packetizer* pzr = (etch_packetizer*)data;
    etch_who* whoto = (etch_who*)whoData;
    etch_flexbuffer* fbuf = (etch_flexbuffer*)bufferData;
    size_t header_pos = 0;
    int total_len = 0;

    ETCH_ASSERT(is_etch_packetizer(pzr));

    total_len = (int) etch_flexbuf_avail(fbuf);
    if (total_len < ETCH_PKTIZER_HEADERSIZE)
    {   /* not even room for a header */
        return -1;
    }

    /* write the header in place, then rewind to where it starts */
    header_pos = fbuf->index;
    etch_flexbuf_put_int(fbuf, ETCH_PKTIZER_SIG);
    etch_flexbuf_put_int(fbuf, total_len - ETCH_PKTIZER_HEADERSIZE);
    fbuf->index = header_pos;

    /* hand the completed packet to the next lower layer */
    return pzr->transport->transport_data(pzr->transport->thisx, whoto, fbuf);
}
/**
* etch_pktizer_session_data()
* i_sessiondata::session_data override.
* delivers data to the session from the transport. the input is treated as a
* byte stream: one call may carry part of a packet, exactly one packet, or
* several packets. the function loops until the input buffer has no bytes
* available, alternating between two states kept on the packetizer:
* - is_wantheader TRUE: waiting for ETCH_PKTIZER_HEADERSIZE header bytes;
* - is_wantheader FALSE: waiting for pzr->bodylength bytes of packet body.
* bytes which cannot be consumed yet are accumulated in pzr->savebuf.
* each completed packet body is passed to pzr->session->session_packet().
* @param data the packetizer (asserted via is_etch_packetizer).
* @param whoData etch_who* identifying who sent the data; it is only passed
* through to session_packet. the original note says caller retains this
* memory and that it can be null.
* @param bufferData etch_flexbuffer* holding the received bytes, read from its
* current index. the original note says the caller retains it.
* @return 0 once the input buffer has been fully consumed; -1 on a bad header
* or when no bytes could be moved from the input buffer to the save buffer.
*/
int etch_pktizer_session_data (void* data, void* whoData, void* bufferData)
{
etch_packetizer* pzr = (etch_packetizer*)data;
etch_who* whofrom = (etch_who*)whoData;
etch_flexbuffer* fbuf = (etch_flexbuffer*)bufferData;
/* for both header and body there are two main situations: the save buffer
* is empty and the item is entirely within the input buffer, in which case
* it is processed directly from the input buffer; or part of the item was
* saved by an earlier pass, in which case it is completed in the save buffer.
*/
size_t curlen, curndx, bytes_avail, fbuf_avail, packet_bodylen, bytes_put;
int bytes_needed;
etch_flexbuffer* savebuf = NULL;
ETCH_ASSERT(is_etch_packetizer(pzr));
savebuf = pzr->savebuf;
/* NOTE(review): pzr->datalock is not acquired anywhere in this function */
while(1) /* while(fbuf.avail() > 0 ... */
{ if (0 >= (fbuf_avail = etch_flexbuf_avail(fbuf))) break;
/* fbuf_avail is unsigned, so the test above is true only when it is zero */
/* total bytes in the pipeline: those saved earlier plus those just received */
bytes_avail = savebuf->datalen + fbuf_avail;
if (pzr->is_wantheader)
{
if (bytes_avail >= (unsigned) ETCH_PKTIZER_HEADERSIZE)
{ /* there are enough bytes in the pipeline for a header */
if (savebuf->datalen == 0)
{ /* save buffer is empty, entire header in fbuf */
packet_bodylen = etch_pktizer_process_header(pzr, fbuf, FALSE);
}
else /* save non empty, header split across save and buf, so ... */
{ /* move enough bytes from buf to save to complete a header */
bytes_needed = ETCH_PKTIZER_HEADERSIZE - (int) savebuf->datalen;
bytes_put = etch_flexbuf_put_from (savebuf, fbuf, bytes_needed);
if (0 == bytes_put) return -1;
/* rewind the save buffer so the header is read from its start;
* the TRUE argument asks process_header to reset the buffer afterwards */
etch_flexbuf_set_index(savebuf, 0);
packet_bodylen = etch_pktizer_process_header(pzr, savebuf, TRUE);
}
/* process_header returns int; its -1 was converted to the maximum size_t
* value on assignment, and the -1 below is converted the same way for the
* comparison, so the test still detects the error return.
*/
if (packet_bodylen == -1) return -1; /* bad header */
if (packet_bodylen == 0) continue; /* header with empty body */
/* remember the body length and switch to the want-body state */
pzr->bodylength = packet_bodylen;
pzr->is_wantheader = FALSE;
}
else /* want header, but too few bytes in the pipeline */
{ /* ... so save fbuf to the save buffer */
/* position the save buffer at the end of its data before appending */
etch_flexbuf_set_index (savebuf, savebuf->datalen);
/* a count of -1 presumably means "all available bytes", like
* ETCH_FLEXBUF_PUT_ALL used further down - TODO confirm.
* the result is not checked here.
*/
etch_flexbuf_put_from(savebuf, fbuf, -1);
}
}
else /* didn't need a header, so ... */
if (bytes_avail >= pzr->bodylength)
{
/* we need the body, and there are enough bytes in the pipeline.
* three scenarios: the body is entirely in savebuf, the body is
* split, or the body is entirely in fbuf. assert that the body
* is not entirely in save, or we'd have processed it last time.
*/
ETCH_ASSERT(savebuf->datalen < pzr->bodylength);
if (savebuf->datalen == 0)
{ /* saved buffer is empty, entire body is in fbuf */
/* temporarily shorten the input buffer so it ends at the end of this
* packet body; the original length and index are remembered so they
* can be restored after delivery.
*/
curlen = fbuf->datalen;
curndx = fbuf->index;
etch_flexbuf_set_length(fbuf, curndx + pzr->bodylength);
/* send the packet in the input buffer up the chain to the next
* higher layer of the transport stack. fyi packetizer.session.thisx
* is that layer object, ordinarily the messagizer.
* the value returned by session_packet is discarded.
*/
pzr->session->session_packet (pzr->session->thisx, whofrom, fbuf);
/* fyi the input buffer can contain a partial packet, multiple
* packets, or whatever, so we may not be done with it yet.
* restore the full length and position just past the delivered body. */
etch_flexbuf_set_length(fbuf, curlen);
etch_flexbuf_set_index (fbuf, curndx + pzr->bodylength);
pzr->is_wantheader = TRUE;
}
else
{ /* savebuf.datalen not zero, so body is split across the save buffer
* and the input buffer. move enough bytes from input buffer fbuf
* to complete the packet body.
*/
bytes_needed = (int) (pzr->bodylength - savebuf->datalen);
bytes_put = etch_flexbuf_put_from (savebuf, fbuf, bytes_needed);
/* bytes_put is unsigned, so this test is true only when it is zero */
if (bytes_put <= 0) return -1;
etch_flexbuf_set_index(savebuf, 0);
/* send the newly-isolated packet up the chain to the next higher
* layer of the transport stack. fyi packetizer.session.thisx
* is that layer object, ordinarily the messagizer.
* the value returned by session_packet is discarded.
*/
pzr->session->session_packet (pzr->session->thisx, whofrom, savebuf);
/* the save buffer has been delivered; empty it for the next packet */
etch_flexbuf_reset(savebuf);
pzr->is_wantheader = TRUE;
}
}
else /* need body but too few bytes in pipeline to complete it */
{ /* ... so save the input buffer to the save buffer */
/* presumably this consumes all of fbuf so that the next avail() check
* ends the loop - TODO confirm. bytes_put is not examined here.
*/
bytes_put = etch_flexbuf_put_from (savebuf, fbuf, ETCH_FLEXBUF_PUT_ALL);
}
}
/* buffer is now empty and we're done. the loop is only left through the
* break taken when fbuf_avail is zero, so this expression yields 0 here. */
return fbuf_avail == 0? 0: -1;
}
/**
* etch_pktizer_set_session()
* set packetizer session interface presumably to that of the next higher layer.
* @param session an i_sessionpacket reference. caller owns this object.
*/
void etch_pktizer_set_session (void* data, void* sessionData)
{
etch_packetizer* thisx = (etch_packetizer*)data;
i_session* newsession = (i_session*)sessionData;
ETCH_ASSERT(is_etch_packetizer(thisx));
ETCH_ASSERT(is_etch_sessionpacket(newsession));
thisx->session = (i_sessionpacket*)newsession;
}
/**
 * new_packetizer_b()
 * etch_packetizer private constructor. allocates the packetizer object,
 * builds the i_transportpacket and i_sessiondata interfaces it implements,
 * copies their method pointers up onto the packetizer, and finally installs
 * the packetizer's sessiondata interface as the session of the layer below.
 * @param itd the transport interface of the next lower layer of the stack,
 * that being the connection, e.g. etch_tcp_connection. not owned.
 * @param maxpktsize maximum number of bytes in a packet; a value which is not
 * positive is replaced by ETCH_PKTIZER_DEFMAXPKTSIZE.
 * @return the new packetizer, or NULL if the mutex could not be created
 * (only when built with ETCHPZR_HAS_MUTEX).
 */
etch_packetizer* new_packetizer_b (i_transportdata* itd, const int maxpktsize)
{
    etch_packetizer* pzr = NULL;
    i_transport* xport_if = NULL;
    i_session* session_if = NULL;
    etch_mutex* lock = NULL;
    int is_built = 0;

    ETCH_ASSERT(is_etch_transportdata(itd));

    do
    {
        #if(ETCHPZR_HAS_MUTEX)
        lock = new_mutex(etch_apr_mempool, ETCHMUTEX_NESTED);
        if (NULL == lock) break;
        #endif

        /* ---- the packetizer object itself ---- */
        pzr = (etch_packetizer*) new_object
            (sizeof(etch_packetizer), ETCHTYPEB_PACKETIZER, CLASSID_PACKETIZER);
        ((etch_object*)pzr)->destroy = destroy_packetizer;
        ((etch_object*)pzr)->clone = clone_null;

        pzr->datalock = lock;
        pzr->headersize = ETCH_PKTIZER_HEADERSIZE;
        pzr->is_wantheader = TRUE;
        if (maxpktsize > 0)
            pzr->maxpacketsize = maxpktsize;
        else
            pzr->maxpacketsize = ETCH_PKTIZER_DEFMAXPKTSIZE;
        pzr->savebuf = new_flexbuffer(ETCH_DEFSIZE); /* 2K default */

        /* our transport is the next lower layer (the connection); not owned */
        pzr->transport = itd;

        /* ---- i_transportpacket ---- */
        xport_if = new_transport_interface_ex (pzr,
            (etch_transport_control) etch_pktizer_transport_control,
            (etch_transport_notify) etch_pktizer_transport_notify,
            (etch_transport_query) etch_pktizer_transport_query,
            etch_pktizer_get_session,
            etch_pktizer_set_session);

        /* the i_transportpacket interface which the packetizer implements;
         * it takes ownership of xport_if */
        pzr->transportpkt = new_transportpkt_interface (pzr,
            etch_pktizer_transport_packet,
            xport_if);

        /* mirror the transport interface methods on the packetizer */
        pzr->transport_packet = etch_pktizer_transport_packet;
        pzr->transport_control = xport_if->transport_control;
        pzr->transport_notify = xport_if->transport_notify;
        pzr->transport_query = xport_if->transport_query;
        pzr->get_session = xport_if->get_session;
        pzr->set_session = xport_if->set_session;

        /* ---- i_sessiondata ---- */
        session_if = new_session_interface (pzr,
            (etch_session_control) etch_pktizer_session_control,
            (etch_session_notify) etch_pktizer_session_notify,
            (etch_session_query) etch_pktizer_session_query);

        /* the i_sessiondata interface which the packetizer implements;
         * it takes ownership of session_if */
        pzr->sessiondata = new_sessiondata_interface(pzr,
            etch_pktizer_session_data,
            session_if);

        /* mirror the session interface methods on the packetizer */
        pzr->session_data = etch_pktizer_session_data;
        pzr->session_control = session_if->session_control;
        pzr->session_notify = session_if->session_notify;
        pzr->session_query = session_if->session_query;

        /* finally make ourselves the session of the next lower layer. the
         * implementor of that transport (e.g. the tcp connection) must be
         * passed as its thisx */
        pzr->transport->set_session (pzr->transport->thisx, pzr->sessiondata);

        is_built = 1;

    } while(0);

    if (!is_built)
    {
        etch_object_destroy(pzr);
        pzr = NULL;
    }

    return pzr;
}
/* - - - - - - - - - - - - - - -
* i_transportpacket
* - - - - - - - - - - - - - - -
*/
/**
 * etch_pktizer_get_session()
 * @param data the packetizer (asserted via is_etch_packetizer).
 * @return the session interface currently stored on the packetizer, i.e. the
 * one recorded by etch_pktizer_set_session(), cast to i_session*. the
 * packetizer destructor does not destroy this object.
 */
i_session* etch_pktizer_get_session (void* data)
{
    etch_packetizer* pzr = (etch_packetizer*)data;

    ETCH_ASSERT(is_etch_packetizer(pzr));

    return (i_session*)pzr->session;
}
/**
 * etch_pktizer_transport_control()
 * i_transportpacket::transport_control override. forwards the control event
 * unchanged to the transport of the next lower layer.
 * @param data the packetizer (asserted via is_etch_packetizer).
 * @param control event, caller relinquishes.
 * @param value control value, caller relinquishes.
 * @return the result of the lower layer's transport_control().
 */
int etch_pktizer_transport_control (void* data, etch_event* control, etch_object* value)
{
    etch_packetizer* pzr = (etch_packetizer*)data;

    ETCH_ASSERT(is_etch_packetizer(pzr));

    /* the lower layer's implementor (e.g. the tcp connection) is its own thisx */
    return pzr->transport->transport_control
        (pzr->transport->thisx, control, value);
}
/**
 * etch_pktizer_transport_notify()
 * i_transportpacket::transport_notify override. forwards the event unchanged
 * to the transport of the next lower layer.
 * @param data the packetizer (asserted via is_etch_packetizer).
 * @param evt event, caller relinquishes.
 * @return the result of the lower layer's transport_notify().
 */
int etch_pktizer_transport_notify (void* data, etch_event* evt)
{
    etch_packetizer* pzr = (etch_packetizer*)data;

    ETCH_ASSERT(is_etch_packetizer(pzr));

    return pzr->transport->transport_notify
        (pzr->transport->thisx, evt);
}
/**
 * etch_pktizer_transport_query()
 * i_transportpacket::transport_query override. forwards the query unchanged
 * to the transport of the next lower layer.
 * @param data the packetizer (asserted via is_etch_packetizer).
 * @param query query, caller relinquishes.
 * @return the object returned by the lower layer's transport_query().
 */
etch_object* etch_pktizer_transport_query (void* data, etch_query* query)
{
    etch_packetizer* pzr = (etch_packetizer*)data;

    ETCH_ASSERT(is_etch_packetizer(pzr));

    return pzr->transport->transport_query
        (pzr->transport->thisx, query);
}
/* - - - - - - - - - - - - - - -
* i_sessiondata
* - - - - - - - - - - - - - - -
*/
/*
 * etch_pktizer_process_header()
 * reads a packet header (signature int, then length int) from the buffer
 * and validates it.
 * @param pzr the packetizer, consulted for its maxpacketsize.
 * @param fbuf buffer positioned at the start of the header.
 * @param is_reset if nonzero, the buffer is reset once both header fields
 * have been read. note the buffer is not reset when the signature check fails.
 * @return the body length from the header, or -1 if the signature could not
 * be read or does not match, the length could not be read, or the length is
 * negative or greater than the maximum packet size (the latter is logged).
 */
int etch_pktizer_process_header (etch_packetizer* pzr, etch_flexbuffer* fbuf, const int is_reset)
{
    int sig = 0;
    int bodylen = 0;
    int rc = 0;
    int limit = 0;

    /* first field: the signature */
    rc = etch_flexbuf_get_int(fbuf, &sig);
    if (rc == -1) return -1;
    if (sig != ETCH_PKTIZER_SIG) return -1;

    /* second field: the body length */
    rc = etch_flexbuf_get_int(fbuf, &bodylen);

    if (is_reset)
    {
        etch_flexbuf_reset(fbuf);
    }

    limit = (int) pzr->maxpacketsize;
    if (bodylen > limit)
    {
        ETCH_LOG(LOG_CATEGORY, ETCH_LOG_WARN, "Packetsize in header is too long, dropping header.");
        return -1;
    }

    if (rc == -1 || bodylen < 0)
    {
        return -1;
    }

    return bodylen;
}
/**
 * etch_pktizer_session_control()
 * i_sessiondata::session_control override. forwards the control event
 * unchanged to the session stored on the packetizer.
 * @param data the packetizer (asserted via is_etch_packetizer).
 * @param control event, caller relinquishes.
 * @param value control value, caller relinquishes.
 * @return the result of the stored session's session_control().
 */
int etch_pktizer_session_control (void* data, etch_event* control, etch_object* value)
{
    etch_packetizer* pzr = (etch_packetizer*)data;

    ETCH_ASSERT(is_etch_packetizer(pzr));

    return pzr->session->session_control
        (pzr->session->thisx, control, value);
}
/**
 * etch_pktizer_session_notify()
 * i_sessiondata::session_notify override. forwards the event unchanged to
 * the session stored on the packetizer.
 * @param data the packetizer (asserted via is_etch_packetizer).
 * @param evt event, caller relinquishes.
 * @return the result of the stored session's session_notify().
 */
int etch_pktizer_session_notify (void* data, etch_event* evt)
{
    etch_packetizer* pzr = (etch_packetizer*)data;

    ETCH_ASSERT(is_etch_packetizer(pzr));

    return pzr->session->session_notify
        (pzr->session->thisx, evt);
}
/**
 * etch_pktizer_session_query()
 * i_sessiondata::session_query override. forwards the query unchanged to
 * the session stored on the packetizer.
 * @param data the packetizer (asserted via is_etch_packetizer).
 * @param query query, caller relinquishes.
 * @return the object returned by the stored session's session_query().
 */
etch_object* etch_pktizer_session_query (void* data, etch_query* query)
{
    etch_packetizer* pzr = (etch_packetizer*)data;

    ETCH_ASSERT(is_etch_packetizer(pzr));

    return pzr->session->session_query
        (pzr->session->thisx, query);
}