| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| module Avro |
| class SchemaValidator |
| ROOT_IDENTIFIER = '.'.freeze |
| PATH_SEPARATOR = '.'.freeze |
| INT_RANGE = Schema::INT_MIN_VALUE..Schema::INT_MAX_VALUE |
| LONG_RANGE = Schema::LONG_MIN_VALUE..Schema::LONG_MAX_VALUE |
| COMPLEX_TYPES = [:array, :error, :map, :record, :request].freeze |
| BOOLEAN_VALUES = [true, false].freeze |
| DEFAULT_VALIDATION_OPTIONS = { recursive: true, encoded: false, fail_on_extra_fields: false }.freeze |
| RECURSIVE_SIMPLE_VALIDATION_OPTIONS = { encoded: true }.freeze |
| RUBY_CLASS_TO_AVRO_TYPE = { |
| NilClass => 'null'.freeze, |
| String => 'string'.freeze, |
| Float => 'float'.freeze, |
| Hash => 'record'.freeze |
| }.freeze |
| |
| class Result |
| def <<(error) |
| errors << error |
| end |
| |
| def add_error(path, message) |
| self << "at #{path} #{message}" |
| end |
| |
| def failure? |
| defined?(@errors) && errors.any? |
| end |
| |
| def to_s |
| failure? ? errors.join("\n") : '' |
| end |
| |
| def errors |
| # Use less memory for success results by lazily creating the errors array |
| @errors ||= [] |
| end |
| end |
| |
| class ValidationError < StandardError |
| attr_reader :result |
| |
| def initialize(result = Result.new) |
| @result = result |
| super |
| end |
| |
| def to_s |
| result.to_s |
| end |
| end |
| |
| TypeMismatchError = Class.new(ValidationError) |
| |
| class << self |
| def validate!(expected_schema, logical_datum, options = DEFAULT_VALIDATION_OPTIONS) |
| result = Result.new |
| if options.fetch(:recursive, true) |
| validate_recursive(expected_schema, logical_datum, ROOT_IDENTIFIER, result, options) |
| else |
| validate_simple(expected_schema, logical_datum, ROOT_IDENTIFIER, result, options) |
| end |
| fail ValidationError, result if result.failure? |
| result |
| end |
| |
| private |
| |
| def validate_recursive(expected_schema, logical_datum, path, result, options) |
| datum = resolve_datum(expected_schema, logical_datum, options[:encoded]) |
| |
| validate_simple(expected_schema, datum, path, result, RECURSIVE_SIMPLE_VALIDATION_OPTIONS) |
| |
| case expected_schema.type_sym |
| when :array |
| validate_array(expected_schema, datum, path, result, options) |
| when :map |
| validate_map(expected_schema, datum, path, result, options) |
| when :union |
| validate_union(expected_schema, datum, path, result, options) |
| when :record, :error, :request |
| fail TypeMismatchError unless datum.is_a?(Hash) |
| expected_schema.fields.each do |field| |
| deeper_path = deeper_path_for_hash(field.name, path) |
| nested_value = datum.key?(field.name) ? datum[field.name] : datum[field.name.to_sym] |
| validate_recursive(field.type, nested_value, deeper_path, result, options) |
| end |
| if options[:fail_on_extra_fields] |
| datum_fields = datum.keys.map(&:to_s) |
| schema_fields = expected_schema.fields.map(&:name) |
| (datum_fields - schema_fields).each do |extra_field| |
| result.add_error(path, "extra field '#{extra_field}' - not in schema") |
| end |
| end |
| end |
| rescue TypeMismatchError |
| result.add_error(path, "expected type #{expected_schema.type_sym}, got #{actual_value_message(datum)}") |
| end |
| |
| def validate_simple(expected_schema, logical_datum, path, result, options) |
| datum = resolve_datum(expected_schema, logical_datum, options[:encoded]) |
| validate_type(expected_schema) |
| |
| case expected_schema.type_sym |
| when :null |
| fail TypeMismatchError unless datum.nil? |
| when :boolean |
| fail TypeMismatchError unless BOOLEAN_VALUES.include?(datum) |
| when :string, :bytes |
| fail TypeMismatchError unless datum.is_a?(String) |
| when :int |
| fail TypeMismatchError unless datum.is_a?(Integer) |
| result.add_error(path, "out of bound value #{datum}") unless INT_RANGE.cover?(datum) |
| when :long |
| fail TypeMismatchError unless datum.is_a?(Integer) |
| result.add_error(path, "out of bound value #{datum}") unless LONG_RANGE.cover?(datum) |
| when :float, :double |
| fail TypeMismatchError unless datum.is_a?(Float) || datum.is_a?(Integer) || datum.is_a?(BigDecimal) |
| when :fixed |
| if datum.is_a? String |
| result.add_error(path, fixed_string_message(expected_schema.size, datum)) unless datum.bytesize == expected_schema.size |
| else |
| result.add_error(path, "expected fixed with size #{expected_schema.size}, got #{actual_value_message(datum)}") |
| end |
| when :enum |
| result.add_error(path, enum_message(expected_schema.symbols, datum)) unless expected_schema.symbols.include?(datum) |
| end |
| rescue TypeMismatchError |
| result.add_error(path, "expected type #{expected_schema.type_sym}, got #{actual_value_message(datum)}") |
| end |
| |
| def resolve_datum(expected_schema, logical_datum, encoded) |
| if encoded |
| logical_datum |
| else |
| expected_schema.type_adapter.encode(logical_datum) rescue nil |
| end |
| end |
| |
| def validate_type(expected_schema) |
| unless Avro::Schema::VALID_TYPES_SYM.include?(expected_schema.type_sym) |
| fail "Unexpected schema type #{expected_schema.type_sym} #{expected_schema.inspect}" |
| end |
| end |
| |
| def fixed_string_message(size, datum) |
| "expected fixed with size #{size}, got \"#{datum}\" with size #{datum.bytesize}" |
| end |
| |
| def enum_message(symbols, datum) |
| "expected enum with values #{symbols}, got #{actual_value_message(datum)}" |
| end |
| |
| def validate_array(expected_schema, datum, path, result, options) |
| fail TypeMismatchError unless datum.is_a?(Array) |
| datum.each_with_index do |d, i| |
| validate_recursive(expected_schema.items, d, "#{path}[#{i}]", result, options) |
| end |
| end |
| |
| def validate_map(expected_schema, datum, path, result, options) |
| fail TypeMismatchError unless datum.is_a?(Hash) |
| datum.keys.each do |k| |
| result.add_error(path, "unexpected key type '#{ruby_to_avro_type(k.class)}' in map") unless k.is_a?(String) |
| end |
| datum.each do |k, v| |
| deeper_path = deeper_path_for_hash(k, path) |
| validate_recursive(expected_schema.values, v, deeper_path, result, options) |
| end |
| end |
| |
| def validate_union(expected_schema, datum, path, result, options) |
| if expected_schema.schemas.size == 1 |
| validate_recursive(expected_schema.schemas.first, datum, path, result, options) |
| return |
| end |
| failures = [] |
| compatible_type = first_compatible_type(datum, expected_schema, path, failures, options) |
| return unless compatible_type.nil? |
| |
| complex_type_failed = failures.detect { |r| COMPLEX_TYPES.include?(r[:type]) } |
| if complex_type_failed |
| complex_type_failed[:result].errors.each { |error| result << error } |
| else |
| types = expected_schema.schemas.map { |s| "'#{s.type_sym}'" }.join(', ') |
| result.add_error(path, "expected union of [#{types}], got #{actual_value_message(datum)}") |
| end |
| end |
| |
| def first_compatible_type(datum, expected_schema, path, failures, options = {}) |
| expected_schema.schemas.find do |schema| |
| # Avoid expensive validation if we're just validating a nil |
| next datum.nil? if schema.type_sym == :null |
| |
| result = Result.new |
| validate_recursive(schema, datum, path, result, options) |
| failures << { type: schema.type_sym, result: result } if result.failure? |
| !result.failure? |
| end |
| end |
| |
| def deeper_path_for_hash(sub_key, path) |
| deeper_path = "#{path}#{PATH_SEPARATOR}#{sub_key}" |
| deeper_path.squeeze!(PATH_SEPARATOR) |
| deeper_path |
| end |
| |
| def actual_value_message(value) |
| avro_type = if value.is_a?(Integer) |
| ruby_integer_to_avro_type(value) |
| else |
| ruby_to_avro_type(value.class) |
| end |
| if value.nil? |
| avro_type |
| else |
| "#{avro_type} with value #{value.inspect}" |
| end |
| end |
| |
| def ruby_to_avro_type(ruby_class) |
| RUBY_CLASS_TO_AVRO_TYPE.fetch(ruby_class, ruby_class) |
| end |
| |
| def ruby_integer_to_avro_type(value) |
| INT_RANGE.cover?(value) ? 'int'.freeze : 'long'.freeze |
| end |
| end |
| end |
| end |