blob: 7d9e7f357242ad20a434cabf1fadcea157dfb785 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
#include <avro/platform.h>
#include <stdlib.h>
#include <string.h>
#include "avro/allocation.h"
#include "avro/consumer.h"
#include "avro/data.h"
#include "avro/errors.h"
#include "avro/legacy.h"
#include "avro/schema.h"
#include "avro_private.h"
#include "st.h"
/*
 * Debug tracing for the resolver.  Define DEBUG_RESOLVER to a non-zero
 * value to have every debug() call print a line to stderr; otherwise
 * debug() expands to nothing.
 */
#if !defined(DEBUG_RESOLVER)
#define DEBUG_RESOLVER 0
#endif
#if DEBUG_RESOLVER
#include <stdio.h>
/*
 * Wrapped in do/while(0) so that `debug(...);` is a single statement
 * and stays safe inside unbraced if/else bodies.
 */
#define debug(...) \
	do { \
		fprintf(stderr, __VA_ARGS__); \
		fprintf(stderr, "\n"); \
	} while (0)
#else
#define debug(...) /* no debug output */
#endif
typedef struct avro_resolver_t avro_resolver_t;
/*
 * A consumer that resolves data written with one schema into datums of
 * a (possibly different) reader schema.
 */
struct avro_resolver_t {
/* The base consumer.  The callbacks in this file cast an
 * avro_consumer_t pointer straight to an avro_resolver_t pointer,
 * so this must remain the first member.  parent.schema holds the
 * writer schema. */
avro_consumer_t parent;
/* The reader schema for this resolver. */
avro_schema_t rschema;
/* An array of any child resolvers needed for the subschemas of
 * wschema.  A NULL entry means the corresponding writer record
 * field or union branch has no compatible reader counterpart. */
avro_consumer_t **child_resolvers;
/* If the reader and writer schemas are records, this field
 * contains a mapping from writer field indices to reader field
 * indices.  Entries for writer fields without a reader match keep
 * the zero they were allocated with. */
int *index_mapping;
/* The number of elements in the child_resolvers and
 * index_mapping arrays. */
size_t num_children;
/* If the reader schema is a union, but the writer schema is
 * not, this field indicates which branch of the reader union
 * should be selected.  A negative value means no branch
 * selection is performed. */
int reader_union_branch;
};
/**
 * Frees a resolver object and, recursively, every child resolver it
 * references, while ensuring that each resolver in a (possibly cyclic)
 * graph of resolvers is only freed once.
 *
 * @param consumer  the resolver to free (must be an avro_resolver_t)
 * @param freeing   set of resolvers whose destruction has already
 *                  started; resolvers found in it are skipped
 */
static void
avro_resolver_free_cycles(avro_consumer_t *consumer, st_table *freeing)
{
	avro_resolver_t *resolver = (avro_resolver_t *) consumer;

	/*
	 * First check if we've already started freeing this resolver.
	 */
	if (st_lookup(freeing, (st_data_t) resolver, NULL)) {
		return;
	}

	/*
	 * Otherwise add this resolver to the freeing set before touching
	 * its children, so that a cycle leading back here is cut short.
	 */
	st_insert(freeing, (st_data_t) resolver, (st_data_t) NULL);

	avro_schema_decref(resolver->parent.schema);
	avro_schema_decref(resolver->rschema);

	if (resolver->child_resolvers) {
		size_t i;
		for (i = 0; i < resolver->num_children; i++) {
			avro_consumer_t *child = resolver->child_resolvers[i];
			if (child) {
				avro_resolver_free_cycles(child, freeing);
			}
		}
		/*
		 * The array is allocated as avro_consumer_t pointers, so
		 * release it using the same element type.
		 */
		avro_free(resolver->child_resolvers,
			  sizeof(avro_consumer_t *) * resolver->num_children);
	}

	if (resolver->index_mapping) {
		avro_free(resolver->index_mapping,
			  sizeof(int) * resolver->num_children);
	}

	avro_freet(avro_resolver_t, resolver);
}
/**
 * The `free` callback installed on every resolver.  Tears down the
 * whole resolver graph reachable from @p consumer, using a temporary
 * table to remember which resolvers have already been visited.
 */
static void
avro_resolver_free(avro_consumer_t *consumer)
{
	st_table *visited = st_init_numtable();

	avro_resolver_free_cycles(consumer, visited);
	st_free_table(visited);
}
/**
 * Allocates a zeroed avro_resolver_t that holds its own references to
 * the writer and reader schemas and has no reader union branch
 * selected.  The caller must still install the value callbacks that
 * are appropriate for the writer schema.
 */
static avro_resolver_t *
avro_resolver_create(avro_schema_t wschema,
		     avro_schema_t rschema)
{
	avro_resolver_t *self = (avro_resolver_t *) avro_new(avro_resolver_t);

	memset(self, 0, sizeof(avro_resolver_t));
	self->parent.free = avro_resolver_free;
	self->parent.schema = avro_schema_incref(wschema);
	self->rschema = avro_schema_incref(rschema);
	/* A negative branch index means "use the destination as-is". */
	self->reader_union_branch = -1;
	return self;
}
/**
 * Returns the datum that a value should actually be stored into.  When
 * the resolver has a reader union branch selected, the union @p dest is
 * switched to that branch and the branch datum is returned; otherwise
 * @p dest itself is returned.
 */
static avro_datum_t
avro_resolver_get_real_dest(avro_resolver_t *resolver, avro_datum_t dest)
{
	int branch_index = resolver->reader_union_branch;

	if (branch_index >= 0) {
		debug("Retrieving union branch %d for %s value",
		      branch_index,
		      avro_schema_type_name(resolver->parent.schema));

		avro_datum_t selected = NULL;
		avro_union_set_discriminant(dest, branch_index, &selected);
		return selected;
	}

	/*
	 * No branch selected, so the destination is used unchanged.
	 */
	return dest;
}
/*
 * Follows link schemas until a non-link schema is reached.  The
 * argument is assigned to, so it must be a modifiable variable; on
 * completion it holds the final link target.
 */
#define skip_links(schema) \
while (is_avro_link(schema)) { \
schema = avro_schema_link_target(schema); \
}
/*-----------------------------------------------------------------------
* Memoized resolvers
*/
/*
 * Forward declaration: the array, map, record and union matchers below
 * call this function recursively to resolve their subschemas.
 */
static avro_consumer_t *
avro_resolver_new_memoized(avro_memoize_t *mem,
avro_schema_t wschema, avro_schema_t rschema);
/*-----------------------------------------------------------------------
* Reader unions
*/
/*
* For each Avro type, we have to check whether the reader schema on its
* own is compatible, and whether the reader is a union that contains a
* compatible type. The macros in this section help us perform both of
* these checks with less code.
*/
/**
 * A helper macro that handles the case where neither writer nor reader
 * are unions.  Uses @ref check_func to see if the two schemas are
 * compatible.
 *
 * The macro returns from the enclosing function: NULL if @ref
 * check_func reports an error, or the new resolver if the schemas
 * match.  If there is neither an error nor a match, control falls
 * through to the code following the macro.
 *
 * The error code is examined before the resolver pointer, because a
 * check function that fails part-way may already have freed the
 * resolver it stored into `self`; that pointer must never be returned.
 */
#define check_non_union(saved, wschema, rschema, check_func)		\
do {									\
	avro_resolver_t  *self = NULL;					\
	int  rc = check_func(saved, &self, wschema, rschema,		\
			     rschema);					\
	if (rc) {							\
		return NULL;						\
	}								\
									\
	if (self) {							\
		debug("Non-union schemas %s (writer) "			\
		      "and %s (reader) match",				\
		      avro_schema_type_name(wschema),			\
		      avro_schema_type_name(rschema));			\
									\
		self->reader_union_branch = -1;				\
		return &self->parent;					\
	}								\
} while (0)
/**
 * Helper macro that handles the case where the reader is a union, and
 * the writer is not.  Checks each branch of the reader union schema,
 * looking for the first branch that is compatible with the writer
 * schema.  The @ref check_func argument should be a function that can
 * check the compatibility of each branch schema.
 *
 * The macro returns from the enclosing function: NULL as soon as @ref
 * check_func reports an error, or the new resolver (with its
 * reader_union_branch set to the matching branch index) on the first
 * match.  If the reader is not a union, or no branch matches, control
 * falls through to the code following the macro.
 *
 * The error code is examined before the resolver pointer, because a
 * check function that fails part-way may already have freed the
 * resolver it stored into `self`; that pointer must never be returned.
 */
#define check_reader_union(saved, wschema, rschema, check_func)		\
do {									\
	if (!is_avro_union(rschema)) {					\
		break;							\
	}								\
									\
	debug("Checking reader union schema");				\
	size_t  num_branches = avro_schema_union_size(rschema);	\
	unsigned int  i;						\
									\
	for (i = 0; i < num_branches; i++) {				\
		avro_schema_t  branch_schema =				\
		    avro_schema_union_branch(rschema, i);		\
		skip_links(branch_schema);				\
		avro_resolver_t  *self = NULL;				\
		int  rc = check_func(saved, &self,			\
				     wschema, branch_schema,		\
				     rschema);				\
		if (rc) {						\
			return NULL;					\
		}							\
									\
		if (self) {						\
			debug("Reader union branch %d (%s) "		\
			      "and writer %s match",			\
			      i, avro_schema_type_name(branch_schema),	\
			      avro_schema_type_name(wschema));		\
			self->reader_union_branch = i;			\
			return &self->parent;				\
		} else {						\
			debug("Reader union branch %d (%s) "		\
			      "doesn't match",				\
			      i, avro_schema_type_name(branch_schema));	\
		}							\
	}								\
									\
	debug("No reader union branches match");			\
} while (0)
/**
 * A helper macro that wraps together check_non_union and
 * check_reader_union for a simple (non-union) writer schema type.
 * The check function is named by pasting @ref type_name onto "try_".
 *
 * This macro always returns from the enclosing function: either from
 * within one of the two check macros, or, when neither of them
 * returns, with NULL after recording a "Cannot store ..." error.
 */
#define check_simple_writer(saved, wschema, rschema, type_name) \
do { \
check_non_union(saved, wschema, rschema, try_##type_name); \
check_reader_union(saved, wschema, rschema, try_##type_name); \
debug("Writer %s doesn't match reader %s", \
avro_schema_type_name(wschema), \
avro_schema_type_name(rschema)); \
avro_set_error("Cannot store " #type_name " into %s", \
avro_schema_type_name(rschema)); \
return NULL; \
} while (0)
/*-----------------------------------------------------------------------
* primitives
*/
/**
 * Consumer callback for a writer boolean: stores @p value into the
 * destination datum passed as @p user_data (or into the reader union
 * branch selected for this resolver).
 */
static int
avro_resolver_boolean_value(avro_consumer_t *consumer, int value,
			    void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %s into %p", value? "TRUE": "FALSE", target);
	int status = avro_boolean_set(target, value);
	return status;
}
/**
 * Matches a writer boolean against @p rschema.  When the reader is
 * also a boolean, a resolver is created for (wschema, root_rschema),
 * memoized, and stored into @p resolver; otherwise @p resolver is left
 * untouched.  Always returns 0.
 */
static int
try_boolean(avro_memoize_t *mem, avro_resolver_t **resolver,
	    avro_schema_t wschema, avro_schema_t rschema,
	    avro_schema_t root_rschema)
{
	if (!is_avro_boolean(rschema)) {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.boolean_value = avro_resolver_boolean_value;
	return 0;
}
/**
 * Release callback handed to avro_givebytes_set for bytes values.
 */
static void
free_bytes(void *ptr, size_t sz)
{
	/*
	 * Bytes buffers coming from the binary encoder carry one extra
	 * trailing byte (a NUL terminator), so the allocation is one
	 * byte larger than the reported size.
	 */
	size_t allocated_size = sz + 1;

	avro_free(ptr, allocated_size);
}
/**
 * Consumer callback for a writer bytes value: hands the buffer over to
 * the destination datum, registering free_bytes as its release
 * function.
 */
static int
avro_resolver_bytes_value(avro_consumer_t *consumer,
			  const void *value, size_t value_len,
			  void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);
	const char *bytes = (const char *) value;

	debug("Storing %" PRIsz " bytes into %p", value_len, target);
	return avro_givebytes_set(target, bytes, value_len, free_bytes);
}
/**
 * Matches a writer bytes schema against @p rschema.  When the reader
 * is also bytes, a resolver is created for (wschema, root_rschema),
 * memoized, and stored into @p resolver; otherwise @p resolver is left
 * untouched.  Always returns 0.
 */
static int
try_bytes(avro_memoize_t *mem, avro_resolver_t **resolver,
	  avro_schema_t wschema, avro_schema_t rschema,
	  avro_schema_t root_rschema)
{
	if (!is_avro_bytes(rschema)) {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.bytes_value = avro_resolver_bytes_value;
	return 0;
}
/**
 * Consumer callback for a writer double: stores @p value into the
 * destination datum (or the selected reader union branch).
 */
static int
avro_resolver_double_value(avro_consumer_t *consumer, double value,
			   void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %le into %p", value, target);
	int status = avro_double_set(target, value);
	return status;
}
/**
 * Matches a writer double against @p rschema.  When the reader is also
 * a double, a resolver is created for (wschema, root_rschema),
 * memoized, and stored into @p resolver; otherwise @p resolver is left
 * untouched.  Always returns 0.
 */
static int
try_double(avro_memoize_t *mem, avro_resolver_t **resolver,
	   avro_schema_t wschema, avro_schema_t rschema,
	   avro_schema_t root_rschema)
{
	if (!is_avro_double(rschema)) {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.double_value = avro_resolver_double_value;
	return 0;
}
/**
 * Consumer callback for a writer float read as a float: stores
 * @p value into the destination datum (or the selected reader union
 * branch).
 */
static int
avro_resolver_float_value(avro_consumer_t *consumer, float value,
			  void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %e into %p", value, target);
	int status = avro_float_set(target, value);
	return status;
}
/**
 * Consumer callback for a writer float read as a double: stores
 * @p value into the destination as a double.
 */
static int
avro_resolver_float_double_value(avro_consumer_t *consumer, float value,
				 void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %e into %p (promoting float to double)", value, target);
	int status = avro_double_set(target, value);
	return status;
}
/**
 * Matches a writer float against @p rschema.  A float or double reader
 * is accepted (the latter with promotion); in that case a resolver is
 * created for (wschema, root_rschema), memoized, and stored into
 * @p resolver.  Otherwise @p resolver is left untouched.  Always
 * returns 0.
 */
static int
try_float(avro_memoize_t *mem, avro_resolver_t **resolver,
	  avro_schema_t wschema, avro_schema_t rschema,
	  avro_schema_t root_rschema)
{
	int (*store)(avro_consumer_t *, float, void *);

	/* Pick the callback that matches the reader's type, if any. */
	if (is_avro_float(rschema)) {
		store = avro_resolver_float_value;
	} else if (is_avro_double(rschema)) {
		store = avro_resolver_float_double_value;
	} else {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.float_value = store;
	return 0;
}
/**
 * Consumer callback for a writer int read as an int: stores @p value
 * into the destination datum (or the selected reader union branch).
 */
static int
avro_resolver_int_value(avro_consumer_t *consumer, int32_t value,
			void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %" PRId32 " into %p", value, target);
	int status = avro_int32_set(target, value);
	return status;
}
/**
 * Consumer callback for a writer int read as a long: stores @p value
 * into the destination as a 64-bit integer.
 */
static int
avro_resolver_int_long_value(avro_consumer_t *consumer, int32_t value,
			     void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %" PRId32 " into %p (promoting int to long)", value, target);
	int status = avro_int64_set(target, value);
	return status;
}
/**
 * Consumer callback for a writer int read as a double: stores
 * @p value into the destination as a double.
 */
static int
avro_resolver_int_double_value(avro_consumer_t *consumer, int32_t value,
			       void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %" PRId32 " into %p (promoting int to double)", value, target);
	int status = avro_double_set(target, value);
	return status;
}
/**
 * Consumer callback for a writer int read as a float: converts
 * @p value to float and stores it into the destination.
 */
static int
avro_resolver_int_float_value(avro_consumer_t *consumer, int32_t value,
			      void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %" PRId32 " into %p (promoting int to float)", value, target);
	const float promoted = (float) value;
	return avro_float_set(target, promoted);
}
/**
 * Matches a writer int against @p rschema.  An int, long, double or
 * float reader is accepted (the last three with promotion); in that
 * case a resolver is created for (wschema, root_rschema), memoized,
 * and stored into @p resolver.  Otherwise @p resolver is left
 * untouched.  Always returns 0.
 */
static int
try_int(avro_memoize_t *mem, avro_resolver_t **resolver,
	avro_schema_t wschema, avro_schema_t rschema,
	avro_schema_t root_rschema)
{
	int (*store)(avro_consumer_t *, int32_t, void *);

	/* Pick the callback that matches the reader's type, if any. */
	if (is_avro_int32(rschema)) {
		store = avro_resolver_int_value;
	} else if (is_avro_int64(rschema)) {
		store = avro_resolver_int_long_value;
	} else if (is_avro_double(rschema)) {
		store = avro_resolver_int_double_value;
	} else if (is_avro_float(rschema)) {
		store = avro_resolver_int_float_value;
	} else {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.int_value = store;
	return 0;
}
/**
 * Consumer callback for a writer long read as a long: stores @p value
 * into the destination datum (or the selected reader union branch).
 */
static int
avro_resolver_long_value(avro_consumer_t *consumer, int64_t value,
			 void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %" PRId64 " into %p", value, target);
	int status = avro_int64_set(target, value);
	return status;
}
/**
 * Consumer callback for a writer long read as a float: converts
 * @p value to float and stores it into the destination.
 */
static int
avro_resolver_long_float_value(avro_consumer_t *consumer, int64_t value,
			       void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %" PRId64 " into %p (promoting long to float)", value, target);
	const float promoted = (float) value;
	return avro_float_set(target, promoted);
}
/**
 * Consumer callback for a writer long read as a double: converts
 * @p value to double and stores it into the destination.
 */
static int
avro_resolver_long_double_value(avro_consumer_t *consumer, int64_t value,
				void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	debug("Storing %" PRId64 " into %p (promoting long to double)", value, target);
	const double promoted = (double) value;
	return avro_double_set(target, promoted);
}
/**
 * Matches a writer long against @p rschema.  A long, double or float
 * reader is accepted (the last two with promotion); in that case a
 * resolver is created for (wschema, root_rschema), memoized, and
 * stored into @p resolver.  Otherwise @p resolver is left untouched.
 * Always returns 0.
 */
static int
try_long(avro_memoize_t *mem, avro_resolver_t **resolver,
	 avro_schema_t wschema, avro_schema_t rschema,
	 avro_schema_t root_rschema)
{
	int (*store)(avro_consumer_t *, int64_t, void *);

	/* Pick the callback that matches the reader's type, if any. */
	if (is_avro_int64(rschema)) {
		store = avro_resolver_long_value;
	} else if (is_avro_double(rschema)) {
		store = avro_resolver_long_double_value;
	} else if (is_avro_float(rschema)) {
		store = avro_resolver_long_float_value;
	} else {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.long_value = store;
	return 0;
}
/**
 * Consumer callback for a writer null.  Nothing is stored, but the
 * real destination is still looked up, which selects the reader union
 * branch when this resolver has one.  Always returns 0.
 */
static int
avro_resolver_null_value(avro_consumer_t *consumer, void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	/* Only referenced by the debug output below. */
	AVRO_UNUSED(target);
	debug("Storing null into %p", target);
	return 0;
}
/**
 * Matches a writer null against @p rschema.  When the reader is also
 * null, a resolver is created for (wschema, root_rschema), memoized,
 * and stored into @p resolver; otherwise @p resolver is left
 * untouched.  Always returns 0.
 */
static int
try_null(avro_memoize_t *mem, avro_resolver_t **resolver,
	 avro_schema_t wschema, avro_schema_t rschema,
	 avro_schema_t root_rschema)
{
	if (!is_avro_null(rschema)) {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.null_value = avro_resolver_null_value;
	return 0;
}
/**
 * Consumer callback for a writer string: hands the string buffer over
 * to the destination datum, registering avro_alloc_free_func as its
 * release function.  The length argument is not used.
 */
static int
avro_resolver_string_value(avro_consumer_t *consumer,
			   const void *value, size_t value_len,
			   void *user_data)
{
	AVRO_UNUSED(value_len);
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);
	const char *text = (const char *) value;

	debug("Storing \"%s\" into %p", (const char *) value, target);
	return avro_givestring_set(target, text, avro_alloc_free_func);
}
/**
 * Matches a writer string against @p rschema.  When the reader is also
 * a string, a resolver is created for (wschema, root_rschema),
 * memoized, and stored into @p resolver; otherwise @p resolver is left
 * untouched.  Always returns 0.
 */
static int
try_string(avro_memoize_t *mem, avro_resolver_t **resolver,
	   avro_schema_t wschema, avro_schema_t rschema,
	   avro_schema_t root_rschema)
{
	if (!is_avro_string(rschema)) {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.string_value = avro_resolver_string_value;
	return 0;
}
/*-----------------------------------------------------------------------
* arrays
*/
/**
 * Consumer callback invoked at the start of each block of a writer
 * array.  On the first block the real destination is looked up (which
 * selects the reader union branch when this resolver has one); nothing
 * else is done.  Always returns 0.
 */
static int
avro_resolver_array_start_block(avro_consumer_t *consumer,
				int is_first_block,
				unsigned int block_count,
				void *user_data)
{
	AVRO_UNUSED(block_count);

	if (!is_first_block) {
		return 0;
	}

	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);
	/* Only referenced by the debug output below. */
	AVRO_UNUSED(target);
	debug("Starting array %p", target);
	return 0;
}
/**
 * Consumer callback invoked for each element of a writer array.
 * Creates a new datum for the reader's item schema, appends it to the
 * destination array, and hands back the item resolver together with
 * the new element as its user data.
 *
 * @return 0 on success; EINVAL if the element datum cannot be created;
 *         otherwise the error code reported by avro_array_append_datum.
 */
static int
avro_resolver_array_element(avro_consumer_t *consumer,
			    unsigned int index,
			    avro_consumer_t **element_consumer,
			    void **element_user_data,
			    void *user_data)
{
	AVRO_UNUSED(index);
	avro_resolver_t *resolver = (avro_resolver_t *) consumer;
	avro_datum_t ud_dest = (avro_datum_t) user_data;
	avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);
	debug("Adding element to array %p", dest);

	/*
	 * Allocate a new element datum and add it to the array.
	 */
	avro_schema_t array_schema = avro_datum_get_schema(dest);
	avro_schema_t item_schema = avro_schema_array_items(array_schema);
	avro_datum_t element = avro_datum_from_schema(item_schema);
	if (!element) {
		avro_set_error("Cannot create array element datum");
		return EINVAL;
	}

	int rc = avro_array_append_datum(dest, element);
	/*
	 * Drop our own reference.  After a successful append the array
	 * still references the element; after a failed append the
	 * element must not be handed out, so bail out below.
	 */
	avro_datum_decref(element);
	if (rc) {
		return rc;
	}

	/*
	 * Return the consumer that we allocated to process the array's
	 * children.
	 */
	*element_consumer = resolver->child_resolvers[0];
	*element_user_data = element;
	return 0;
}
/**
 * Matches a writer array against @p rschema.
 *
 * If the reader is not an array, @p resolver is left untouched and 0
 * is returned.  Otherwise the item schemas are resolved recursively;
 * on success the new array resolver (holding the item resolver as its
 * only child) is stored into @p resolver and 0 is returned.
 *
 * @return 0 on success or no match; EINVAL if the reader is an array
 *         whose items are incompatible, in which case @p resolver is
 *         reset to NULL.
 */
static int
try_array(avro_memoize_t *mem, avro_resolver_t **resolver,
	  avro_schema_t wschema, avro_schema_t rschema,
	  avro_schema_t root_rschema)
{
	/*
	 * First verify that the reader is an array.
	 */
	if (!is_avro_array(rschema)) {
		return 0;
	}

	/*
	 * Array schemas have to have compatible element schemas to be
	 * compatible themselves.  The resolver is created and memoized
	 * before the items are resolved, so that the recursive call can
	 * find it.
	 */
	*resolver = avro_resolver_create(wschema, root_rschema);
	avro_memoize_set(mem, wschema, root_rschema, *resolver);

	avro_schema_t witems = avro_schema_array_items(wschema);
	avro_schema_t ritems = avro_schema_array_items(rschema);
	avro_consumer_t *item_consumer =
	    avro_resolver_new_memoized(mem, witems, ritems);
	if (!item_consumer) {
		avro_memoize_delete(mem, wschema, root_rschema);
		avro_consumer_free(&(*resolver)->parent);
		/*
		 * Don't leave a dangling pointer behind: callers look at
		 * *resolver to decide whether a match was found.
		 */
		*resolver = NULL;
		avro_prefix_error("Array values aren't compatible: ");
		return EINVAL;
	}

	/*
	 * The two schemas are compatible, so finish setting up the
	 * resolver for the array.  Store the item schema's resolver
	 * into the child_resolvers field.
	 */
	(*resolver)->num_children = 1;
	(*resolver)->child_resolvers = (avro_consumer_t **) avro_calloc(1, sizeof(avro_consumer_t *));
	(*resolver)->child_resolvers[0] = item_consumer;
	(*resolver)->parent.array_start_block = avro_resolver_array_start_block;
	(*resolver)->parent.array_element = avro_resolver_array_element;
	return 0;
}
/*-----------------------------------------------------------------------
* enums
*/
/**
 * Consumer callback for a writer enum value.  Looks up the symbol name
 * for @p value in the writer schema and stores it, by name, into the
 * destination datum (or the selected reader union branch).
 */
static int
avro_resolver_enum_value(avro_consumer_t *consumer, int value,
			 void *user_data)
{
	avro_resolver_t *resolver = (avro_resolver_t *) consumer;
	avro_datum_t ud_dest = (avro_datum_t) user_data;
	avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);

	/*
	 * Writer and reader may list different symbols, so translate
	 * through the symbol name rather than the writer's index.
	 */
	const char *symbol_name = avro_schema_enum_get(resolver->parent.schema, value);
	debug("Storing symbol %s into %p", symbol_name, dest);
	return avro_enum_set_name(dest, symbol_name);
}
/**
 * Matches a writer enum against @p rschema.  Enum schemas are
 * compatible when they have the same name; their symbol lists are not
 * compared.  On a match a resolver is created for (wschema,
 * root_rschema), memoized, and stored into @p resolver; otherwise
 * @p resolver is left untouched.  Always returns 0.
 */
static int
try_enum(avro_memoize_t *mem, avro_resolver_t **resolver,
	 avro_schema_t wschema, avro_schema_t rschema,
	 avro_schema_t root_rschema)
{
	if (!is_avro_enum(rschema)) {
		return 0;
	}

	const char *writer_name = avro_schema_name(wschema);
	const char *reader_name = avro_schema_name(rschema);
	if (strcmp(writer_name, reader_name) != 0) {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.enum_value = avro_resolver_enum_value;
	return 0;
}
/*-----------------------------------------------------------------------
* fixed
*/
/**
 * Consumer callback for a writer fixed value: hands the buffer over to
 * the destination datum, registering avro_alloc_free_func as its
 * release function.
 */
static int
avro_resolver_fixed_value(avro_consumer_t *consumer,
			  const void *value, size_t value_len,
			  void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);
	const char *bytes = (const char *) value;

	debug("Storing (fixed) %" PRIsz " bytes into %p", value_len, target);
	return avro_givefixed_set(target, bytes, value_len, avro_alloc_free_func);
}
/**
 * Matches a writer fixed schema against @p rschema.  The two are
 * considered compatible only when avro_schema_equal reports them as
 * equal.  On a match a resolver is created for (wschema,
 * root_rschema), memoized, and stored into @p resolver; otherwise
 * @p resolver is left untouched.  Always returns 0.
 */
static int
try_fixed(avro_memoize_t *mem, avro_resolver_t **resolver,
	  avro_schema_t wschema, avro_schema_t rschema,
	  avro_schema_t root_rschema)
{
	if (!avro_schema_equal(wschema, rschema)) {
		return 0;
	}

	avro_resolver_t *created = avro_resolver_create(wschema, root_rschema);
	*resolver = created;
	avro_memoize_set(mem, wschema, root_rschema, created);
	created->parent.fixed_value = avro_resolver_fixed_value;
	return 0;
}
/*-----------------------------------------------------------------------
* maps
*/
/**
 * Consumer callback invoked at the start of each block of a writer
 * map.  On the first block the real destination is looked up (which
 * selects the reader union branch when this resolver has one); nothing
 * else is done.  Always returns 0.
 */
static int
avro_resolver_map_start_block(avro_consumer_t *consumer,
			      int is_first_block,
			      unsigned int block_count,
			      void *user_data)
{
	AVRO_UNUSED(block_count);

	if (!is_first_block) {
		return 0;
	}

	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);
	/* Only referenced by the debug output below. */
	AVRO_UNUSED(target);
	debug("Starting map %p", target);
	return 0;
}
/**
 * Consumer callback invoked for each entry of a writer map.  Creates a
 * new datum for the reader's value schema, stores it in the
 * destination map under @p key, and hands back the value resolver
 * together with the new datum as its user data.
 *
 * @return 0 on success; EINVAL if the value datum cannot be created;
 *         otherwise the error code reported by avro_map_set.
 */
static int
avro_resolver_map_element(avro_consumer_t *consumer,
			  unsigned int index,
			  const char *key,
			  avro_consumer_t **value_consumer,
			  void **value_user_data,
			  void *user_data)
{
	AVRO_UNUSED(index);
	avro_resolver_t *resolver = (avro_resolver_t *) consumer;
	avro_datum_t ud_dest = (avro_datum_t) user_data;
	avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);
	debug("Adding element to map %p", dest);

	/*
	 * Allocate a new element datum and add it to the map.
	 */
	avro_schema_t map_schema = avro_datum_get_schema(dest);
	avro_schema_t value_schema = avro_schema_map_values(map_schema);
	avro_datum_t value = avro_datum_from_schema(value_schema);
	if (!value) {
		avro_set_error("Cannot create map value datum");
		return EINVAL;
	}

	int rc = avro_map_set(dest, key, value);
	/*
	 * Drop our own reference.  After a successful insert the map
	 * still references the value; after a failed insert the value
	 * must not be handed out, so bail out below.
	 */
	avro_datum_decref(value);
	if (rc) {
		return rc;
	}

	/*
	 * Return the consumer that we allocated to process the map's
	 * children.
	 */
	*value_consumer = resolver->child_resolvers[0];
	*value_user_data = value;
	return 0;
}
/**
 * Matches a writer map against @p rschema.
 *
 * If the reader is not a map, @p resolver is left untouched and 0 is
 * returned.  Otherwise the value schemas are resolved recursively; on
 * success the new map resolver (holding the value resolver as its only
 * child) is stored into @p resolver and 0 is returned.
 *
 * @return 0 on success or no match; EINVAL if the reader is a map
 *         whose values are incompatible, in which case @p resolver is
 *         reset to NULL.
 */
static int
try_map(avro_memoize_t *mem, avro_resolver_t **resolver,
	avro_schema_t wschema, avro_schema_t rschema,
	avro_schema_t root_rschema)
{
	/*
	 * First verify that the reader is a map.
	 */
	if (!is_avro_map(rschema)) {
		return 0;
	}

	/*
	 * Map schemas have to have compatible value schemas to be
	 * compatible themselves.  The resolver is created and memoized
	 * before the values are resolved, so that the recursive call
	 * can find it.
	 */
	*resolver = avro_resolver_create(wschema, root_rschema);
	avro_memoize_set(mem, wschema, root_rschema, *resolver);

	avro_schema_t wvalues = avro_schema_map_values(wschema);
	avro_schema_t rvalues = avro_schema_map_values(rschema);
	avro_consumer_t *value_consumer =
	    avro_resolver_new_memoized(mem, wvalues, rvalues);
	if (!value_consumer) {
		avro_memoize_delete(mem, wschema, root_rschema);
		avro_consumer_free(&(*resolver)->parent);
		/*
		 * Don't leave a dangling pointer behind: callers look at
		 * *resolver to decide whether a match was found.
		 */
		*resolver = NULL;
		avro_prefix_error("Map values aren't compatible: ");
		return EINVAL;
	}

	/*
	 * The two schemas are compatible, so finish setting up the
	 * resolver for the map.  Store the value schema's resolver
	 * into the child_resolvers field.
	 */
	(*resolver)->num_children = 1;
	(*resolver)->child_resolvers = (avro_consumer_t **) avro_calloc(1, sizeof(avro_consumer_t *));
	(*resolver)->child_resolvers[0] = value_consumer;
	(*resolver)->parent.map_start_block = avro_resolver_map_start_block;
	(*resolver)->parent.map_element = avro_resolver_map_element;
	return 0;
}
/*-----------------------------------------------------------------------
* records
*/
/**
 * Consumer callback invoked when a writer record begins.  The real
 * destination is looked up (which selects the reader union branch when
 * this resolver has one); nothing else is done.  Always returns 0.
 *
 * TODO: Eventually, default values for the extra reader fields will
 * be filled in here.
 */
static int
avro_resolver_record_start(avro_consumer_t *consumer,
			   void *user_data)
{
	avro_datum_t target = avro_resolver_get_real_dest(
	    (avro_resolver_t *) consumer, (avro_datum_t) user_data);

	/* Only referenced by the debug output below. */
	AVRO_UNUSED(target);
	debug("Starting record at %p", target);
	return 0;
}
/**
 * Consumer callback invoked for each field of a writer record.
 *
 * If the writer field has no resolver (the reader has no such field),
 * the output parameters are left untouched and 0 is returned.
 * Otherwise the reader's field datum of the same name is looked up and
 * handed back together with the field's resolver.
 *
 * @param index  index of the field in the writer record schema
 * @return 0 on success or skip; otherwise the error code reported by
 *         avro_record_get.
 */
static int
avro_resolver_record_field(avro_consumer_t *consumer,
			   unsigned int index,
			   avro_consumer_t **field_consumer,
			   void **field_user_data,
			   void *user_data)
{
	avro_resolver_t *resolver = (avro_resolver_t *) consumer;
	avro_datum_t ud_dest = (avro_datum_t) user_data;
	avro_datum_t dest = avro_resolver_get_real_dest(resolver, ud_dest);
	const char *field_name =
	    avro_schema_record_field_name(consumer->schema, index);

	/*
	 * Grab the resolver for this field of the writer record.  If
	 * it's NULL, then this field doesn't exist in the reader
	 * record, and should be skipped.
	 */
	debug("Retrieving resolver for writer field %i (%s)",
	      index, field_name);
	if (!resolver->child_resolvers[index]) {
		debug("Reader doesn't have field %s, skipping", field_name);
		return 0;
	}

	/*
	 * TODO: Once we can retrieve record fields by index (quickly),
	 * use the index_mapping.
	 */
	avro_datum_t field = NULL;
	int rc = avro_record_get(dest, field_name, &field);
	if (rc) {
		/*
		 * Never hand a missing field datum to the child
		 * resolver; report the lookup failure instead.
		 */
		avro_prefix_error("Cannot retrieve reader field %s: ", field_name);
		return rc;
	}

	*field_consumer = resolver->child_resolvers[index];
	*field_user_data = field;
	return 0;
}
/**
 * Matches a writer record against @p rschema.
 *
 * If the reader is not a record, or has a different name, @p resolver
 * is left untouched and 0 is returned.  Otherwise every reader field
 * is resolved against the writer field of the same name.
 *
 * @return 0 on success or no match; EINVAL if a reader field is
 *         missing from the writer or a field pair is incompatible, in
 *         which case @p resolver is reset to NULL.
 */
static int
try_record(avro_memoize_t *mem, avro_resolver_t **resolver,
	   avro_schema_t wschema, avro_schema_t rschema,
	   avro_schema_t root_rschema)
{
	/*
	 * First verify that the reader is also a record, and has the
	 * same name as the writer.
	 */
	if (!is_avro_record(rschema)) {
		return 0;
	}

	const char *wname = avro_schema_name(wschema);
	const char *rname = avro_schema_name(rschema);
	if (strcmp(wname, rname)) {
		return 0;
	}

	/*
	 * Categorize the fields in the record schemas.  Fields that are
	 * only in the writer are ignored.  Fields that are only in the
	 * reader raise a schema mismatch error.  (TODO: Eventually,
	 * we'll handle default values here.)  Fields that are in both
	 * are resolved recursively.
	 *
	 * The child_resolvers array has one slot per writer field.  It
	 * is filled by looping over the reader fields; after the loop,
	 * any slot that is still NULL represents a field that is in the
	 * writer but not the reader, and will be skipped when
	 * processing the input.
	 */
	*resolver = avro_resolver_create(wschema, root_rschema);
	avro_memoize_set(mem, wschema, root_rschema, *resolver);

	size_t wfields = avro_schema_record_size(wschema);
	size_t rfields = avro_schema_record_size(rschema);
	debug("Checking writer record schema %s", wname);

	avro_consumer_t **child_resolvers =
	    (avro_consumer_t **) avro_calloc(wfields, sizeof(avro_consumer_t *));
	int *index_mapping = (int *) avro_calloc(wfields, sizeof(int));

	unsigned int ri;
	for (ri = 0; ri < rfields; ri++) {
		avro_schema_t rfield =
		    avro_schema_record_field_get_by_index(rschema, ri);
		const char *field_name =
		    avro_schema_record_field_name(rschema, ri);
		debug("Resolving reader record field %u (%s)", ri, field_name);

		/*
		 * See if this field is also in the writer schema.
		 */
		int wi = avro_schema_record_field_get_index(wschema, field_name);
		if (wi == -1) {
			/*
			 * This field isn't in the writer schema;
			 * that's an error!  TODO: Handle default
			 * values!
			 */
			debug("Field %s isn't in writer", field_name);
			avro_set_error("Reader field %s doesn't appear in writer",
				       field_name);
			goto error;
		}

		/*
		 * Try to recursively resolve the schemas for this
		 * field.  If they're not compatible, that's an error.
		 */
		avro_schema_t wfield =
		    avro_schema_record_field_get_by_index(wschema, wi);
		avro_consumer_t *field_resolver =
		    avro_resolver_new_memoized(mem, wfield, rfield);
		if (!field_resolver) {
			avro_prefix_error("Field %s isn't compatible: ", field_name);
			goto error;
		}

		/*
		 * Save the details for this field.
		 */
		debug("Found match for field %s (%u in reader, %d in writer)",
		      field_name, ri, wi);
		child_resolvers[wi] = field_resolver;
		index_mapping[wi] = ri;
	}

	/*
	 * We might not have found matches for all of the writer fields,
	 * but that's okay; any extras will be ignored.
	 */
	(*resolver)->num_children = wfields;
	(*resolver)->child_resolvers = child_resolvers;
	(*resolver)->index_mapping = index_mapping;
	(*resolver)->parent.record_start = avro_resolver_record_start;
	(*resolver)->parent.record_field = avro_resolver_record_field;
	return 0;

error:
	/*
	 * Clean up everything created so far.  The partially filled
	 * arrays are attached to the resolver first, so that a single
	 * avro_consumer_free call releases the resolver, its children
	 * and both arrays.  That call tracks which resolvers it has
	 * already visited, so a child resolver that is shared by
	 * several fields (or that refers back to this resolver) is
	 * released exactly once.
	 */
	avro_memoize_delete(mem, wschema, root_rschema);
	(*resolver)->num_children = wfields;
	(*resolver)->child_resolvers = child_resolvers;
	(*resolver)->index_mapping = index_mapping;
	avro_consumer_free(&(*resolver)->parent);
	/*
	 * Don't leave a dangling pointer behind: callers look at
	 * *resolver to decide whether a match was found.
	 */
	*resolver = NULL;
	return EINVAL;
}
/*-----------------------------------------------------------------------
* union
*/
/**
 * Consumer callback invoked when a writer union value is read.  Hands
 * back the resolver for the writer branch identified by
 * @p discriminant; the branch resolver receives the same user data as
 * the union itself.
 *
 * @return 0 on success; EINVAL if @p discriminant is not a valid
 *         branch index, or if that branch is incompatible with the
 *         reader schema.
 */
static int
avro_resolver_union_branch(avro_consumer_t *consumer,
			   unsigned int discriminant,
			   avro_consumer_t **branch_consumer,
			   void **branch_user_data,
			   void *user_data)
{
	avro_resolver_t *resolver = (avro_resolver_t *) consumer;

	/*
	 * Reject branch indices outside the child_resolvers array
	 * before indexing into it.
	 */
	if (discriminant >= resolver->num_children) {
		avro_set_error("Writer union branch %u is out of range",
			       discriminant);
		return EINVAL;
	}

	/*
	 * Grab the resolver for this branch of the writer union.  If
	 * it's NULL, then this branch is incompatible with the reader.
	 */
	debug("Retrieving resolver for writer branch %u", discriminant);
	if (!resolver->child_resolvers[discriminant]) {
		avro_set_error("Writer union branch %u is incompatible "
			       "with reader schema \"%s\"",
			       discriminant, avro_schema_type_name(resolver->rschema));
		return EINVAL;
	}

	/*
	 * Return the branch's resolver.
	 */
	*branch_consumer = resolver->child_resolvers[discriminant];
	*branch_user_data = user_data;
	return 0;
}
/**
 * Builds a resolver for a writer union.
 *
 * Each writer branch is resolved recursively against the complete
 * reader schema, whether or not the reader is itself a union.  The
 * result for every branch is kept in the child_resolvers array; a NULL
 * entry marks a branch that is incompatible with the reader.  Such a
 * branch is not an immediate error, since it may never appear in the
 * actual data.  Resolution only fails when *no* writer branch is
 * compatible.
 *
 * @return the new resolver, or NULL (with an error set) if no writer
 *         branch is compatible with the reader schema.
 */
static avro_consumer_t *
try_union(avro_memoize_t *mem, avro_schema_t wschema, avro_schema_t rschema)
{
	size_t num_branches = avro_schema_union_size(wschema);
	debug("Checking %" PRIsz "-branch writer union schema", num_branches);

	/*
	 * Create and memoize the union resolver up front, so that the
	 * recursive calls below can find it.
	 */
	avro_resolver_t *self = avro_resolver_create(wschema, rschema);
	avro_memoize_set(mem, wschema, rschema, self);

	avro_consumer_t **branches =
	    (avro_consumer_t **) avro_calloc(num_branches, sizeof(avro_consumer_t *));
	int matched = 0;
	unsigned int b;

	for (b = 0; b < num_branches; b++) {
		avro_schema_t writer_branch =
		    avro_schema_union_branch(wschema, b);
		debug("Resolving writer union branch %u (%s)",
		      b, avro_schema_type_name(writer_branch));

		/*
		 * A failure here is tolerated: the branch is simply
		 * recorded as NULL.
		 */
		avro_consumer_t *branch_resolver =
		    avro_resolver_new_memoized(mem, writer_branch, rschema);
		branches[b] = branch_resolver;

		if (branch_resolver != NULL) {
			debug("Found match for writer union branch %u", b);
			matched = 1;
		} else {
			debug("No match for writer union branch %u", b);
		}
	}

	/*
	 * One compatible branch is enough to call the resolution a
	 * success.
	 */
	if (matched) {
		self->num_children = num_branches;
		self->child_resolvers = branches;
		self->parent.union_branch = avro_resolver_union_branch;
		return &self->parent;
	}

	debug("No writer union branches match");
	avro_set_error("No branches in the writer are compatible "
		       "with reader schema %s",
		       avro_schema_type_name(rschema));

	/*
	 * Clean up any consumer we might have already created.
	 */
	avro_memoize_delete(mem, wschema, rschema);
	avro_consumer_free(&self->parent);
	for (b = 0; b < num_branches; b++) {
		if (branches[b]) {
			avro_consumer_free(branches[b]);
		}
	}
	avro_free(branches, num_branches * sizeof(avro_consumer_t *));
	return NULL;
}
/*-----------------------------------------------------------------------
* schema type dispatcher
*/
/*
 * Produces a consumer that resolves wschema (writer) against rschema
 * (reader).  If mem already holds a resolver for this pair of schemas,
 * that one is returned; otherwise the work is dispatched on the writer
 * schema's type.  NULL is returned when no resolver could be produced.
 */
static avro_consumer_t *
avro_resolver_new_memoized(avro_memoize_t *mem,
			   avro_schema_t wschema, avro_schema_t rschema)
{
	/*
	 * check_param is defined elsewhere; presumably it returns the
	 * first argument (NULL) when the condition fails — confirm
	 * against its definition.
	 */
	check_param(NULL, is_avro_schema(wschema), "writer schema");
	check_param(NULL, is_avro_schema(rschema), "reader schema");

	skip_links(wschema);
	skip_links(rschema);

	/*
	 * A schema pair that has been matched before is served straight
	 * from the memo table.
	 */
	avro_resolver_t  *cached = NULL;
	if (avro_memoize_get(mem, wschema, rschema, (void **) &cached)) {
		debug("Already resolved %s and %s",
		      avro_schema_type_name(wschema),
		      avro_schema_type_name(rschema));
		return &cached->parent;
	}

	/*
	 * Nothing memoized yet: dispatch on the writer's type.
	 * check_simple_writer is a macro defined elsewhere; presumably
	 * it returns from this function itself when it finds a match,
	 * so control reaching the break means "no resolver" — confirm
	 * against the macro definition.
	 */
	switch (avro_typeof(wschema)) {
	case AVRO_BOOLEAN:
		check_simple_writer(mem, wschema, rschema, boolean);
		break;

	case AVRO_BYTES:
		check_simple_writer(mem, wschema, rschema, bytes);
		break;

	case AVRO_DOUBLE:
		check_simple_writer(mem, wschema, rschema, double);
		break;

	case AVRO_FLOAT:
		check_simple_writer(mem, wschema, rschema, float);
		break;

	case AVRO_INT32:
		check_simple_writer(mem, wschema, rschema, int);
		break;

	case AVRO_INT64:
		check_simple_writer(mem, wschema, rschema, long);
		break;

	case AVRO_NULL:
		check_simple_writer(mem, wschema, rschema, null);
		break;

	case AVRO_STRING:
		check_simple_writer(mem, wschema, rschema, string);
		break;

	case AVRO_ARRAY:
		check_simple_writer(mem, wschema, rschema, array);
		break;

	case AVRO_ENUM:
		check_simple_writer(mem, wschema, rschema, enum);
		break;

	case AVRO_FIXED:
		check_simple_writer(mem, wschema, rschema, fixed);
		break;

	case AVRO_MAP:
		check_simple_writer(mem, wschema, rschema, map);
		break;

	case AVRO_RECORD:
		check_simple_writer(mem, wschema, rschema, record);
		break;

	case AVRO_UNION:
		/* Writer unions get their own branch-by-branch treatment. */
		return try_union(mem, wschema, rschema);

	default:
		avro_set_error("Unknown schema type");
		break;
	}

	/* Every path that falls out of the switch has failed to resolve. */
	return NULL;
}
/*
 * Public entry point: builds a consumer that resolves the writer schema
 * wschema against the reader schema rschema.  A memo table local to
 * this call is initialized, handed to avro_resolver_new_memoized, and
 * torn down before returning.  The return value is whatever
 * avro_resolver_new_memoized produced, which may be NULL.
 */
avro_consumer_t *
avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema)
{
	avro_memoize_t  memo;
	avro_consumer_t  *top_level;

	avro_memoize_init(&memo);
	top_level = avro_resolver_new_memoized(&memo, wschema, rschema);
	avro_memoize_done(&memo);

	return top_level;
}