blob: 6e0cc6f0f2ebc3d49ebafe8a9b0bb2f2533bee50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
#include <avro/platform.h>
#include <stdlib.h>
#include <string.h>
#include "avro/allocation.h"
#include "avro/basics.h"
#include "avro/data.h"
#include "avro/io.h"
#include "avro/value.h"
#include "avro_private.h"
#include "encoding.h"
/*
* Forward declaration; this is basically the same as avro_value_read,
* but it doesn't reset dest first. (Since it will have already been
* reset in avro_value_read itself).
*/
static int
read_value(avro_reader_t reader, avro_value_t *dest);
static int
read_array_value(avro_reader_t reader, avro_value_t *dest)
{
int rval;
size_t i; /* index within the current block */
size_t index = 0; /* index within the entire array */
int64_t block_count;
int64_t block_size;
check_prefix(rval, avro_binary_encoding.
read_long(reader, &block_count),
"Cannot read array block count: ");
while (block_count != 0) {
if (block_count < 0) {
block_count = block_count * -1;
check_prefix(rval, avro_binary_encoding.
read_long(reader, &block_size),
"Cannot read array block size: ");
}
for (i = 0; i < (size_t) block_count; i++, index++) {
avro_value_t child;
check(rval, avro_value_append(dest, &child, NULL));
check(rval, read_value(reader, &child));
}
check_prefix(rval, avro_binary_encoding.
read_long(reader, &block_count),
"Cannot read array block count: ");
}
return 0;
}
static int
read_map_value(avro_reader_t reader, avro_value_t *dest)
{
int rval;
size_t i; /* index within the current block */
size_t index = 0; /* index within the entire array */
int64_t block_count;
int64_t block_size;
check_prefix(rval, avro_binary_encoding.read_long(reader, &block_count),
"Cannot read map block count: ");
while (block_count != 0) {
if (block_count < 0) {
block_count = block_count * -1;
check_prefix(rval, avro_binary_encoding.
read_long(reader, &block_size),
"Cannot read map block size: ");
}
for (i = 0; i < (size_t) block_count; i++, index++) {
char *key;
int64_t key_size;
avro_value_t child;
check_prefix(rval, avro_binary_encoding.
read_string(reader, &key, &key_size),
"Cannot read map key: ");
rval = avro_value_add(dest, key, &child, NULL, NULL);
if (rval) {
avro_free(key, key_size);
return rval;
}
rval = read_value(reader, &child);
if (rval) {
avro_free(key, key_size);
return rval;
}
avro_free(key, key_size);
}
check_prefix(rval, avro_binary_encoding.
read_long(reader, &block_count),
"Cannot read map block count: ");
}
return 0;
}
static int
read_record_value(avro_reader_t reader, avro_value_t *dest)
{
int rval;
size_t field_count;
size_t i;
avro_schema_t record_schema = avro_value_get_schema(dest);
check(rval, avro_value_get_size(dest, &field_count));
for (i = 0; i < field_count; i++) {
avro_value_t field;
check(rval, avro_value_get_by_index(dest, i, &field, NULL));
if (field.iface != NULL) {
check(rval, read_value(reader, &field));
} else {
avro_schema_t field_schema =
avro_schema_record_field_get_by_index(record_schema, i);
check(rval, avro_skip_data(reader, field_schema));
}
}
return 0;
}
static int
read_union_value(avro_reader_t reader, avro_value_t *dest)
{
int rval;
int64_t discriminant;
avro_schema_t union_schema;
int64_t branch_count;
avro_value_t branch;
check_prefix(rval, avro_binary_encoding.
read_long(reader, &discriminant),
"Cannot read union discriminant: ");
union_schema = avro_value_get_schema(dest);
branch_count = avro_schema_union_size(union_schema);
if (discriminant < 0 || discriminant >= branch_count) {
avro_set_error("Invalid union discriminant value: (%d)",
discriminant);
return 1;
}
check(rval, avro_value_set_branch(dest, discriminant, &branch));
check(rval, read_value(reader, &branch));
return 0;
}
/*
* A wrapped buffer implementation that takes control of a buffer
* allocated using avro_malloc.
*/
struct avro_wrapped_alloc {
const void *original;
size_t allocated_size;
};
static void
avro_wrapped_alloc_free(avro_wrapped_buffer_t *self)
{
struct avro_wrapped_alloc *alloc = (struct avro_wrapped_alloc *) self->user_data;
avro_free((void *) alloc->original, alloc->allocated_size);
avro_freet(struct avro_wrapped_alloc, alloc);
}
static int
avro_wrapped_alloc_new(avro_wrapped_buffer_t *dest,
const void *buf, size_t length)
{
struct avro_wrapped_alloc *alloc = (struct avro_wrapped_alloc *) avro_new(struct avro_wrapped_alloc);
if (alloc == NULL) {
return ENOMEM;
}
dest->buf = buf;
dest->size = length;
dest->user_data = alloc;
dest->free = avro_wrapped_alloc_free;
dest->copy = NULL;
dest->slice = NULL;
alloc->original = buf;
alloc->allocated_size = length;
return 0;
}
static int
read_value(avro_reader_t reader, avro_value_t *dest)
{
int rval;
switch (avro_value_get_type(dest)) {
case AVRO_BOOLEAN:
{
int8_t val;
check_prefix(rval, avro_binary_encoding.
read_boolean(reader, &val),
"Cannot read boolean value: ");
return avro_value_set_boolean(dest, val);
}
case AVRO_BYTES:
{
char *bytes;
int64_t len;
check_prefix(rval, avro_binary_encoding.
read_bytes(reader, &bytes, &len),
"Cannot read bytes value: ");
/*
* read_bytes allocates an extra byte to always
* ensure that the data is NUL terminated, but
* that byte isn't included in the length. We
* include that extra byte in the allocated
* size, but not in the length of the buffer.
*/
avro_wrapped_buffer_t buf;
check(rval, avro_wrapped_alloc_new(&buf, bytes, len+1));
buf.size--;
return avro_value_give_bytes(dest, &buf);
}
case AVRO_DOUBLE:
{
double val;
check_prefix(rval, avro_binary_encoding.
read_double(reader, &val),
"Cannot read double value: ");
return avro_value_set_double(dest, val);
}
case AVRO_FLOAT:
{
float val;
check_prefix(rval, avro_binary_encoding.
read_float(reader, &val),
"Cannot read float value: ");
return avro_value_set_float(dest, val);
}
case AVRO_INT32:
{
int32_t val;
check_prefix(rval, avro_binary_encoding.
read_int(reader, &val),
"Cannot read int value: ");
return avro_value_set_int(dest, val);
}
case AVRO_INT64:
{
int64_t val;
check_prefix(rval, avro_binary_encoding.
read_long(reader, &val),
"Cannot read long value: ");
return avro_value_set_long(dest, val);
}
case AVRO_NULL:
{
check_prefix(rval, avro_binary_encoding.
read_null(reader),
"Cannot read null value: ");
return avro_value_set_null(dest);
}
case AVRO_STRING:
{
char *str;
int64_t size;
/*
* read_string returns a size that includes the
* NUL terminator, and the free function will be
* called with a size that also includes the NUL
*/
check_prefix(rval, avro_binary_encoding.
read_string(reader, &str, &size),
"Cannot read string value: ");
avro_wrapped_buffer_t buf;
check(rval, avro_wrapped_alloc_new(&buf, str, size));
return avro_value_give_string_len(dest, &buf);
}
case AVRO_ARRAY:
return read_array_value(reader, dest);
case AVRO_ENUM:
{
int64_t val;
check_prefix(rval, avro_binary_encoding.
read_long(reader, &val),
"Cannot read enum value: ");
return avro_value_set_enum(dest, val);
}
case AVRO_FIXED:
{
avro_schema_t schema = avro_value_get_schema(dest);
char *bytes;
int64_t size = avro_schema_fixed_size(schema);
bytes = (char *) avro_malloc(size);
if (!bytes) {
avro_prefix_error("Cannot allocate new fixed value");
return ENOMEM;
}
rval = avro_read(reader, bytes, size);
if (rval) {
avro_prefix_error("Cannot read fixed value: ");
avro_free(bytes, size);
return rval;
}
avro_wrapped_buffer_t buf;
rval = avro_wrapped_alloc_new(&buf, bytes, size);
if (rval != 0) {
avro_free(bytes, size);
return rval;
}
return avro_value_give_fixed(dest, &buf);
}
case AVRO_MAP:
return read_map_value(reader, dest);
case AVRO_RECORD:
return read_record_value(reader, dest);
case AVRO_UNION:
return read_union_value(reader, dest);
default:
{
avro_set_error("Unknown schema type");
return EINVAL;
}
}
return 0;
}
int
avro_value_read(avro_reader_t reader, avro_value_t *dest)
{
int rval;
check(rval, avro_value_reset(dest));
return read_value(reader, dest);
}