blob: 1d5b24e3c63329c79f0a762f111489b7afc6f615 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Avro
module SchemaCompatibility
# Perform a full, recursive check that a datum written using the writers_schema
# can be read using the readers_schema.
def self.can_read?(writers_schema, readers_schema)
Checker.new.can_read?(writers_schema, readers_schema)
end
# Perform a full, recursive check that a datum written using either the
# writers_schema or the readers_schema can be read using the other schema.
def self.mutual_read?(writers_schema, readers_schema)
Checker.new.mutual_read?(writers_schema, readers_schema)
end
# Perform a basic check that a datum written with the writers_schema could
# be read using the readers_schema. This check includes matching the types,
# including schema promotion, and matching the full name (including aliases) for named types.
def self.match_schemas(writers_schema, readers_schema)
w_type = writers_schema.type_sym
r_type = readers_schema.type_sym
# This conditional is begging for some OO love.
if w_type == :union || r_type == :union
return true
end
if w_type == r_type
return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)
case r_type
when :record
return readers_schema.match_fullname?(writers_schema.fullname)
when :error
return readers_schema.match_fullname?(writers_schema.fullname)
when :request
return true
when :fixed
return readers_schema.match_fullname?(writers_schema.fullname) &&
writers_schema.size == readers_schema.size
when :enum
return readers_schema.match_fullname?(writers_schema.fullname)
when :map
return match_schemas(writers_schema.values, readers_schema.values)
when :array
return match_schemas(writers_schema.items, readers_schema.items)
end
end
# Handle schema promotion
if w_type == :int && [:long, :float, :double].include?(r_type)
return true
elsif w_type == :long && [:float, :double].include?(r_type)
return true
elsif w_type == :float && r_type == :double
return true
elsif w_type == :string && r_type == :bytes
return true
elsif w_type == :bytes && r_type == :string
return true
end
return false
end
class Checker
SIMPLE_CHECKS = Schema::PRIMITIVE_TYPES_SYM.dup.add(:fixed).freeze
attr_reader :recursion_set
private :recursion_set
def initialize
@recursion_set = Set.new
end
def can_read?(writers_schema, readers_schema)
full_match_schemas(writers_schema, readers_schema)
end
def mutual_read?(writers_schema, readers_schema)
can_read?(writers_schema, readers_schema) && can_read?(readers_schema, writers_schema)
end
private
def full_match_schemas(writers_schema, readers_schema)
return true if recursion_in_progress?(writers_schema, readers_schema)
return false unless Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema)
if writers_schema.type_sym != :union && SIMPLE_CHECKS.include?(readers_schema.type_sym)
return true
end
case readers_schema.type_sym
when :record
match_record_schemas(writers_schema, readers_schema)
when :map
full_match_schemas(writers_schema.values, readers_schema.values)
when :array
full_match_schemas(writers_schema.items, readers_schema.items)
when :union
match_union_schemas(writers_schema, readers_schema)
when :enum
# reader's symbols must contain all writer's symbols or reader has default
(writers_schema.symbols - readers_schema.symbols).empty? || !readers_schema.default.nil?
else
if writers_schema.type_sym == :union && writers_schema.schemas.size == 1
full_match_schemas(writers_schema.schemas.first, readers_schema)
else
false
end
end
end
def match_union_schemas(writers_schema, readers_schema)
raise 'readers_schema must be a union' unless readers_schema.type_sym == :union
case writers_schema.type_sym
when :union
writers_schema.schemas.all? { |writer_type| full_match_schemas(writer_type, readers_schema) }
else
readers_schema.schemas.any? { |reader_type| full_match_schemas(writers_schema, reader_type) }
end
end
def match_record_schemas(writers_schema, readers_schema)
return false if writers_schema.type_sym == :union
writer_fields_hash = writers_schema.fields_hash
readers_schema.fields.each do |field|
if writer_fields_hash.key?(field.name)
return false unless full_match_schemas(writer_fields_hash[field.name].type, field.type)
else
names = writer_fields_hash.keys & field.alias_names
if names.size > 1
return false
elsif names.size == 1
return false unless full_match_schemas(writer_fields_hash[names.first].type, field.type)
else
return false unless field.default?
end
end
end
return true
end
def recursion_in_progress?(writers_schema, readers_schema)
key = [writers_schema.object_id, readers_schema.object_id]
if recursion_set.include?(key)
true
else
recursion_set.add(key)
false
end
end
end
end
end