blob: 66c4af085ee55e6ff44eca771686112c5aa0a217 [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "transport/EtchPacketizer.h"
#include "support/EtchRuntime.h"
const capu::uint32_t& EtchPacketizer::HEADER_SIZE() {
static const capu::uint32_t headerSize(8);
return headerSize;
}
const capu::uint32_t& EtchPacketizer::SIG() {
static const capu::uint32_t sig(0xDEADBEEFu);
return sig;
}
const capu::uint32_t& EtchPacketizer::SIG_SIZE() {
static const capu::uint32_t sigSize(4);
return sigSize;
}
const capu::int32_t& EtchPacketizer::DEFAULT_MAX_PKT_SIZE(){
static const capu::int32_t pktSize(16384 - EtchPacketizer::HEADER_SIZE());
return pktSize;
}
const EtchString& EtchPacketizer::MAX_PKT_SIZE_TERM() {
static const EtchString pktSize("Packetizer.maxPktSize");
return pktSize;
}
EtchPacketizer::EtchPacketizer(EtchRuntime* runtime, EtchTransportData* transport, EtchString& uri)
: mRuntime(runtime), mTransport(transport) {
if (mTransport != NULL)
mTransport->setSession(this);
EtchURL url(uri.c_str());
EtchString value;
url.getTerms().get(EtchPacketizer::MAX_PKT_SIZE_TERM(), &value);
if (value.length() == 0)
mMaxPktSize = EtchPacketizer::DEFAULT_MAX_PKT_SIZE();
else {
mMaxPktSize = atoi(value.c_str());
if (mMaxPktSize <= 0)
mMaxPktSize = DEFAULT_MAX_PKT_SIZE();
}
mReadBuf = new EtchFlexBuffer();
mTempBuf = new EtchFlexBuffer();
}
EtchPacketizer::EtchPacketizer(EtchRuntime* runtime, EtchTransportData* transport, EtchURL* uri)
: mRuntime(runtime), mTransport(transport) {
EtchString value("");
if (mTransport != NULL)
transport->setSession(this);
uri->getTerms().get(EtchPacketizer::MAX_PKT_SIZE_TERM(), &value);
if (value.length() == 0)
mMaxPktSize = EtchPacketizer::DEFAULT_MAX_PKT_SIZE();
else {
mMaxPktSize = atoi(value.c_str());
if (mMaxPktSize <= 0)
mMaxPktSize = DEFAULT_MAX_PKT_SIZE();
}
mReadBuf = new EtchFlexBuffer();
mTempBuf = new EtchFlexBuffer();
}
EtchPacketizer::~EtchPacketizer() {
}
capu::int32_t EtchPacketizer::getHeaderSize() {
return EtchPacketizer::HEADER_SIZE();
}
status_t EtchPacketizer::transportControl(capu::SmartPointer<EtchObject> control, capu::SmartPointer<EtchObject> value) {
return mTransport->transportControl(control, value);
}
status_t EtchPacketizer::transportNotify(capu::SmartPointer<EtchObject> event) {
return mTransport->transportNotify(event);
}
status_t EtchPacketizer::transportQuery(capu::SmartPointer<EtchObject> query, capu::SmartPointer<EtchObject> *result) {
return mTransport->transportQuery(query, result);
}
EtchSessionPacket* EtchPacketizer::getSession() {
return mSession;
}
void EtchPacketizer::setSession(EtchSessionPacket* session) {
mSession = session;
}
status_t EtchPacketizer::sessionControl(capu::SmartPointer<EtchObject> control, capu::SmartPointer<EtchObject> value) {
return mSession->sessionControl(control, value);
}
status_t EtchPacketizer::sessionNotify(capu::SmartPointer<EtchObject> event) {
return mSession->sessionNotify(event);
}
status_t EtchPacketizer::sessionQuery(capu::SmartPointer<EtchObject> query, capu::SmartPointer<EtchObject> &result) {
return mSession->sessionQuery(query, result);
}
status_t EtchPacketizer::transportPacket(capu::SmartPointer<EtchWho> recipient, capu::SmartPointer<EtchFlexBuffer> buf) {
// assert index is at the start of the header.
capu::uint32_t dataSize = buf->getAvailableBytes();
if (dataSize < EtchPacketizer::HEADER_SIZE())
return ETCH_ERANGE;
capu::uint32_t pktSize = dataSize - EtchPacketizer::HEADER_SIZE();
if (pktSize > mMaxPktSize)
return ETCH_ERANGE;
capu::uint32_t index = buf->getIndex();
buf->putInt(EtchPacketizer::SIG());
buf->putInt(pktSize);
buf->setIndex(index);
ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(), "Header is constructed and raw data has been sent to Transport for transmission");
return mTransport->transportData(recipient, buf);
}
status_t EtchPacketizer::sessionData(capu::SmartPointer<EtchWho> sender, capu::SmartPointer<EtchFlexBuffer> buf) {
status_t result = ETCH_OK;
if(mReadBuf->getAvailableBytes() > 0)
{
//If there is already data in the buffer, amend the new buffer by setting the index to the end of the buffer
mReadBuf->setIndex(mReadBuf->getLength());
}
mReadBuf->put(*buf);
mReadBuf->setIndex(0);
while(bufferContainsPacket())
{
EtchFlexBufferPtr packetData = extractPacket();
result = mSession->sessionPacket(sender, packetData);
}
return result;
}
capu::bool_t EtchPacketizer::bufferContainsPacket()
{
if(!bufferContainsHeader())
{
return false;
}
capu::uint32_t packetSize = 0;
if(mReadBuf->getInteger(packetSize) != ETCH_OK || mReadBuf->getAvailableBytes() < packetSize)
{
mReadBuf->setIndex(0);
return false;
}
if (packetSize > mMaxPktSize) {
ETCH_LOG_ERROR(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(), "EtchPacketizer: PacketSize exceeded Maximum Packetsize. MaximumSize: " << mMaxPktSize << " PacketSize: " << packetSize);
mReadBuf->setIndex(mReadBuf->getIndex() + packetSize);
bufferClean();
}
mReadBuf->setIndex(0);
return true;
}
capu::bool_t EtchPacketizer::bufferContainsHeader()
{
if(mReadBuf->getAvailableBytes() < HEADER_SIZE())
return false;
capu::uint32_t sig = 0;
mReadBuf->getInteger(sig);
if(sig == SIG()) {
return true;
}
ETCH_LOG_ERROR(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(), "EtchPacketizer: Packet SIG is incorrect - discarding package and searching for next Valid SIG.");
capu::uint32_t length = mReadBuf->getLength() - sizeof(capu::int32_t);
capu::uint32_t index = mReadBuf->getIndex();
for(capu::uint32_t i = index; i < length; ++i) {
mReadBuf->getInteger(sig);
if(sig == SIG()) {
ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(), "EtchPacketizer: Found correct SIG in Buffer after having a damaged Buffer.");
mReadBuf->setIndex(i); //we found a correct Header
bufferClean();
mReadBuf->setIndex(SIG_SIZE());
return true;
}
mReadBuf->setIndex(i+1);
}
mReadBuf->setIndex(mReadBuf->getLength() - HEADER_SIZE()); //Save the last 8 Bytes
bufferClean();
return false;
}
void EtchPacketizer::bufferClean()
{
mTempBuf->put(*mReadBuf);
mTempBuf->setIndex(0);
mReadBuf->reset();
mReadBuf->put(*mTempBuf);
mReadBuf->setIndex(0);
mTempBuf->reset();
}
EtchFlexBufferPtr EtchPacketizer::extractPacket()
{
mReadBuf->setIndex(mReadBuf->getIndex() + 4);
capu::uint32_t packetSize = 0;
mReadBuf->getInteger(packetSize);
EtchFlexBufferPtr packetData = new EtchFlexBuffer();
packetData->put(*mReadBuf, packetSize);
mReadBuf->setIndex(mReadBuf->getIndex() + packetSize);
if(mReadBuf->getAvailableBytes() > 0)
{
mTempBuf->put(*mReadBuf);
mTempBuf->setIndex(0);
}
mReadBuf->reset();
mReadBuf->put(*mTempBuf);
mReadBuf->setIndex(0);
mTempBuf->reset();
packetData->setIndex(0);
ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(), "EtchPacketizer: extracted packet - passing " << packetSize << " Bytes to Messagizer.")
return packetData;
}
EtchTransportData* EtchPacketizer::getTransport() {
return mTransport;
}