| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| module Avro |
| module SchemaCompatibility |
# Full, recursive compatibility check: answers whether a datum that was
# written with writers_schema can be read back with readers_schema.
# The work is delegated to a freshly created Checker.
def self.can_read?(writers_schema, readers_schema)
  checker = Checker.new
  checker.can_read?(writers_schema, readers_schema)
end
| |
# Full, recursive compatibility check in both directions: answers whether a
# datum written with either of the two schemas can be read using the other.
# The work is delegated to a freshly created Checker.
def self.mutual_read?(writers_schema, readers_schema)
  checker = Checker.new
  checker.mutual_read?(writers_schema, readers_schema)
end
| |
# Basic (shallow) check that a datum written with writers_schema could be
# read using readers_schema. Only the types are compared, allowing for schema
# promotion, together with the full name (via the reader's match_fullname?)
# for named types. Maps and arrays are compared through their value / item
# schemas; record fields are not inspected here.
#
# Returns a truthy value when the schemas match, false otherwise.
def self.match_schemas(writers_schema, readers_schema)
  writer_type = writers_schema.type_sym
  reader_type = readers_schema.type_sym

  # A union on either side always passes this basic check.
  return true if writer_type == :union || reader_type == :union

  if writer_type == reader_type
    return true if Schema::PRIMITIVE_TYPES_SYM.include?(reader_type)

    case reader_type
    when :record, :error, :enum
      # Named types only need to agree on their full name.
      return readers_schema.match_fullname?(writers_schema.fullname)
    when :fixed
      # Fixed types must additionally agree on their size.
      return readers_schema.match_fullname?(writers_schema.fullname) &&
             writers_schema.size == readers_schema.size
    when :request
      return true
    when :map
      return match_schemas(writers_schema.values, readers_schema.values)
    when :array
      return match_schemas(writers_schema.items, readers_schema.items)
    end
  end

  # Schema promotion: the reader types each writer type may be promoted to.
  promotion_targets =
    case writer_type
    when :int then [:long, :float, :double]
    when :long then [:float, :double]
    when :float then [:double]
    when :string then [:bytes]
    when :bytes then [:string]
    else []
    end

  promotion_targets.include?(reader_type)
end
| |
# Performs the full, recursive compatibility check between a writer's schema
# and a reader's schema, descending into record fields, map values, array
# items and union branches.
class Checker
  # Reader types for which a successful Avro::SchemaCompatibility.match_schemas
  # is already a complete answer (as long as the writer is not a union).
  SIMPLE_CHECKS = Schema::PRIMITIVE_TYPES_SYM.dup.add(:fixed).freeze

  attr_reader :recursion_set
  private :recursion_set

  def initialize
    # Holds [writer object_id, reader object_id] pairs whose comparison is
    # currently in progress further up the call stack.
    @recursion_set = Set.new
  end

  # True when a datum written with writers_schema can be read using
  # readers_schema.
  def can_read?(writers_schema, readers_schema)
    full_match_schemas(writers_schema, readers_schema)
  end

  # True when each of the two schemas can read data written with the other.
  def mutual_read?(writers_schema, readers_schema)
    can_read?(writers_schema, readers_schema) && can_read?(readers_schema, writers_schema)
  end

  private

  # Compares one writer/reader pair while guarding against infinite recursion
  # on self-referencing schemas.
  #
  # A pair that is already being compared higher up the call stack is assumed
  # to match, which is what terminates the recursion. The pair is removed again
  # as soon as its comparison finishes, so a pair that has already been found
  # incompatible (for example a rejected union branch) is compared afresh when
  # it shows up again instead of being reported as compatible.
  def full_match_schemas(writers_schema, readers_schema)
    key = [writers_schema.object_id, readers_schema.object_id]
    # Set#add? returns nil when the key is already present.
    return true unless recursion_set.add?(key)

    begin
      match_schema_pair(writers_schema, readers_schema)
    ensure
      recursion_set.delete(key)
    end
  end

  # The actual comparison of one writer/reader pair.
  def match_schema_pair(writers_schema, readers_schema)
    return false unless Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema)

    reader_type = readers_schema.type_sym
    writer_is_union = writers_schema.type_sym == :union

    return true if !writer_is_union && SIMPLE_CHECKS.include?(reader_type)

    if writer_is_union && reader_type != :union
      # match_schemas accepts any pairing that involves a union, so a writer
      # union can arrive here with a non-union reader. It must not be treated
      # like the reader's own type: a union is described by its branch schemas,
      # not by values, items or symbols.
      # A writer union is never accepted for a record reader.
      return false if reader_type == :record

      return match_single_branch_union(writers_schema, readers_schema)
    end

    case reader_type
    when :record
      match_record_schemas(writers_schema, readers_schema)
    when :map
      full_match_schemas(writers_schema.values, readers_schema.values)
    when :array
      full_match_schemas(writers_schema.items, readers_schema.items)
    when :union
      match_union_schemas(writers_schema, readers_schema)
    when :enum
      # reader's symbols must contain all writer's symbols or reader has default
      (writers_schema.symbols - readers_schema.symbols).empty? || !readers_schema.default.nil?
    else
      false
    end
  end

  # A writer union is readable by a non-union reader only when the union has
  # exactly one branch and that branch is itself readable by the reader.
  def match_single_branch_union(writers_schema, readers_schema)
    writers_schema.schemas.size == 1 &&
      full_match_schemas(writers_schema.schemas.first, readers_schema)
  end

  # Compares a writer schema against a reader union.
  # Raises RuntimeError when readers_schema is not a union.
  def match_union_schemas(writers_schema, readers_schema)
    raise 'readers_schema must be a union' unless readers_schema.type_sym == :union

    case writers_schema.type_sym
    when :union
      # Every writer branch has to be readable by the reader union.
      writers_schema.schemas.all? { |writer_type| full_match_schemas(writer_type, readers_schema) }
    else
      # A single writer type needs at least one reader branch that can read it.
      readers_schema.schemas.any? { |reader_type| full_match_schemas(writers_schema, reader_type) }
    end
  end

  # Compares two record schemas field by field, driven by the reader's fields.
  def match_record_schemas(writers_schema, readers_schema)
    return false if writers_schema.type_sym == :union

    writer_fields_hash = writers_schema.fields_hash
    readers_schema.fields.each do |field|
      if writer_fields_hash.key?(field.name)
        return false unless full_match_schemas(writer_fields_hash[field.name].type, field.type)
      else
        # No writer field with the reader field's name: try the reader's aliases.
        names = writer_fields_hash.keys & field.alias_names
        if names.size > 1
          # More than one writer field matches the aliases: ambiguous.
          return false
        elsif names.size == 1
          return false unless full_match_schemas(writer_fields_hash[names.first].type, field.type)
        else
          # The writer has no counterpart at all, so the reader needs a default.
          return false unless field.default?
        end
      end
    end

    true
  end
end
| end |
| end |