blob: 519dd71c9c93405b39e441b077ee11a7f0e61922 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "DataReader.h"
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/Descriptor.h"
#include "qpid/log/Statement.h"
#include <string>
extern "C" {
#include <proton/engine.h>
}
namespace qpid {
namespace broker {
namespace amqp {
namespace {
qpid::amqp::CharSequence convert(pn_bytes_t in)
{
qpid::amqp::CharSequence out;
out.data = in.start;
out.size = in.size;
return out;
}
qpid::amqp::CharSequence convert(pn_uuid_t in)
{
qpid::amqp::CharSequence out;
out.data = in.bytes;
out.size = 16;
return out;
}
}
DataReader::DataReader(qpid::amqp::Reader& r) : reader(r) {}
void DataReader::read(pn_data_t* data)
{
/*
while (pn_data_next(data)) {
readOne(data);
}
*/
do {
readOne(data);
} while (pn_data_next(data));
}
void DataReader::readOne(pn_data_t* data)
{
qpid::amqp::Descriptor descriptor(0);
bool described = pn_data_is_described(data);
if (described) {
pn_data_enter(data);
pn_data_next(data);
if (pn_data_type(data) == PN_ULONG) {
descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(data));
} else if (pn_data_type(data) == PN_SYMBOL) {
descriptor = qpid::amqp::Descriptor(convert(pn_data_get_symbol(data)));
} else {
QPID_LOG(notice, "Ignoring descriptor of type " << pn_data_type(data));
}
pn_data_next(data);
}
switch (pn_data_type(data)) {
case PN_NULL:
reader.onNull(described ? &descriptor : 0);
break;
case PN_BOOL:
reader.onBoolean(pn_data_get_bool(data), described ? &descriptor : 0);
break;
case PN_UBYTE:
reader.onUByte(pn_data_get_ubyte(data), described ? &descriptor : 0);
break;
case PN_BYTE:
reader.onByte(pn_data_get_byte(data), described ? &descriptor : 0);
break;
case PN_USHORT:
reader.onUShort(pn_data_get_ushort(data), described ? &descriptor : 0);
break;
case PN_SHORT:
reader.onShort(pn_data_get_short(data), described ? &descriptor : 0);
break;
case PN_UINT:
reader.onUInt(pn_data_get_uint(data), described ? &descriptor : 0);
break;
case PN_INT:
reader.onInt(pn_data_get_int(data), described ? &descriptor : 0);
break;
case PN_CHAR:
pn_data_get_char(data);
break;
case PN_ULONG:
reader.onULong(pn_data_get_ulong(data), described ? &descriptor : 0);
break;
case PN_LONG:
reader.onLong(pn_data_get_long(data), described ? &descriptor : 0);
break;
case PN_TIMESTAMP:
reader.onTimestamp(pn_data_get_timestamp(data), described ? &descriptor : 0);
break;
case PN_FLOAT:
reader.onFloat(pn_data_get_float(data), described ? &descriptor : 0);
break;
case PN_DOUBLE:
reader.onDouble(pn_data_get_double(data), described ? &descriptor : 0);
break;
case PN_DECIMAL32:
pn_data_get_decimal32(data);
break;
case PN_DECIMAL64:
pn_data_get_decimal64(data);
break;
case PN_DECIMAL128:
pn_data_get_decimal128(data);
break;
case PN_UUID:
reader.onUuid(convert(pn_data_get_uuid(data)), described ? &descriptor : 0);
break;
case PN_BINARY:
reader.onBinary(convert(pn_data_get_binary(data)), described ? &descriptor : 0);
break;
case PN_STRING:
reader.onString(convert(pn_data_get_string(data)), described ? &descriptor : 0);
break;
case PN_SYMBOL:
reader.onSymbol(convert(pn_data_get_symbol(data)), described ? &descriptor : 0);
break;
case PN_DESCRIBED:
break;
case PN_ARRAY:
readArray(data, described ? &descriptor : 0);
break;
case PN_LIST:
readList(data, described ? &descriptor : 0);
break;
case PN_MAP:
readMap(data, described ? &descriptor : 0);
break;
}
if (described) pn_data_exit(data);
}
void DataReader::readArray(pn_data_t* /*data*/, const qpid::amqp::Descriptor* /*descriptor*/)
{
//not yet implemented
}
void DataReader::readList(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
{
size_t count = pn_data_get_list(data);
reader.onStartList(count, qpid::amqp::CharSequence(), descriptor);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
read(data);
}
pn_data_exit(data);
reader.onEndList(count, descriptor);
}
void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descriptor)
{
size_t count = pn_data_get_map(data);
reader.onStartMap(count, qpid::amqp::CharSequence(), descriptor);
pn_data_enter(data);
for (size_t i = 0; i < count && pn_data_next(data); ++i) {
read(data);
}
pn_data_exit(data);
reader.onEndMap(count, descriptor);
}
}}} // namespace qpid::broker::amqp