# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'avro/logical_types'

module Avro
  class Schema
    # Sets of strings, for backwards compatibility. See below for sets of symbols,
    # for better performance.
    PRIMITIVE_TYPES = Set.new(%w[null boolean string bytes int long float double])
    NAMED_TYPES =     Set.new(%w[fixed enum record error])

    VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + Set.new(%w[array map union request])

    PRIMITIVE_TYPES_SYM = Set.new(PRIMITIVE_TYPES.map(&:to_sym))
    NAMED_TYPES_SYM     = Set.new(NAMED_TYPES.map(&:to_sym))
    VALID_TYPES_SYM     = Set.new(VALID_TYPES.map(&:to_sym))

    NAME_REGEX = /^([A-Za-z_][A-Za-z0-9_]*)(\.([A-Za-z_][A-Za-z0-9_]*))*$/

    INT_MIN_VALUE = -(1 << 31)
    INT_MAX_VALUE = (1 << 31) - 1
    LONG_MIN_VALUE = -(1 << 63)
    LONG_MAX_VALUE = (1 << 63) - 1

    def self.parse(json_string)
      real_parse(MultiJson.load(json_string), {})
    end

    # Build Avro Schema from data parsed out of JSON string.
    def self.real_parse(json_obj, names=nil, default_namespace=nil)
      if json_obj.is_a? Hash
        type = json_obj['type']
        logical_type = json_obj['logicalType']
        raise SchemaParseError, %Q(No "type" property: #{json_obj}) if type.nil?

        # Check that the type is valid before calling #to_sym, since symbols are never garbage
        # collected (important to avoid DoS if we're accepting schemas from untrusted clients)
        unless VALID_TYPES.include?(type)
          raise SchemaParseError, "Unknown type: #{type}"
        end

        type_sym = type.to_sym
        if PRIMITIVE_TYPES_SYM.include?(type_sym)
          case type_sym
          when :bytes
            precision = json_obj['precision']
            scale = json_obj['scale']
            return BytesSchema.new(type_sym, logical_type, precision, scale)
          else
            return PrimitiveSchema.new(type_sym, logical_type)
          end
        elsif NAMED_TYPES_SYM.include? type_sym
          name = json_obj['name']
          if !Avro.disable_schema_name_validation && name !~ NAME_REGEX
            raise SchemaParseError, "Name #{name} is invalid for type #{type}!"
          end
          namespace = json_obj.include?('namespace') ? json_obj['namespace'] : default_namespace
          aliases = json_obj['aliases']
          case type_sym
          when :fixed
            size = json_obj['size']
            return FixedSchema.new(name, namespace, size, names, logical_type, aliases)
          when :enum
            symbols = json_obj['symbols']
            doc     = json_obj['doc']
            default = json_obj['default']
            return EnumSchema.new(name, namespace, symbols, names, doc, default, aliases)
          when :record, :error
            fields = json_obj['fields']
            doc    = json_obj['doc']
            return RecordSchema.new(name, namespace, fields, names, type_sym, doc, aliases)
          else
            raise SchemaParseError.new("Unknown named type: #{type}")
          end

        else
          case type_sym
          when :array
            return ArraySchema.new(json_obj['items'], names, default_namespace)
          when :map
            return MapSchema.new(json_obj['values'], names, default_namespace)
          else
            raise SchemaParseError.new("Unknown Valid Type: #{type}")
          end
        end

      elsif json_obj.is_a? Array
        # JSON array (union)
        return UnionSchema.new(json_obj, names, default_namespace)
      elsif PRIMITIVE_TYPES.include? json_obj
        return PrimitiveSchema.new(json_obj)
      else
        raise UnknownSchemaError.new(json_obj)
      end
    end

    # Determine if a ruby datum is an instance of a schema
    def self.validate(expected_schema, logical_datum, options = { recursive: true, encoded: false })
      SchemaValidator.validate!(expected_schema, logical_datum, options)
      true
    rescue SchemaValidator::ValidationError
      false
    end

    def initialize(type, logical_type=nil)
      @type_sym = type.is_a?(Symbol) ? type : type.to_sym
      @logical_type = logical_type
    end

    attr_reader :type_sym
    attr_reader :logical_type

    # Returns the type as a string (rather than a symbol), for backwards compatibility.
    # Deprecated in favor of {#type_sym}.
    def type; @type_sym.to_s; end

    def type_adapter
      @type_adapter ||= LogicalTypes.type_adapter(type, logical_type) || LogicalTypes::Identity
    end

    # Returns the MD5 fingerprint of the schema as an Integer.
    def md5_fingerprint
      parsing_form = SchemaNormalization.to_parsing_form(self)
      Digest::MD5.hexdigest(parsing_form).to_i(16)
    end

    # Returns the SHA-256 fingerprint of the schema as an Integer.
    def sha256_fingerprint
      parsing_form = SchemaNormalization.to_parsing_form(self)
      Digest::SHA256.hexdigest(parsing_form).to_i(16)
    end

    CRC_EMPTY = 0xc15d213aa4d7a795

    # The java library caches this value after initialized, so this pattern
    # mimics that.
    @@fp_table = nil
    def initFPTable
      @@fp_table = Array.new(256)
      256.times do |i|
        fp = i
        8.times do
          fp = (fp >> 1) ^ ( CRC_EMPTY & -( fp & 1 ) )
        end
        @@fp_table[i] = fp
      end
    end

    def crc_64_avro_fingerprint
      parsing_form = Avro::SchemaNormalization.to_parsing_form(self)
      data_bytes = parsing_form.unpack("C*")

      initFPTable unless @@fp_table

      fp = CRC_EMPTY
      data_bytes.each do |b|
        fp = (fp >> 8) ^ @@fp_table[ (fp ^ b) & 0xff ]
      end
      fp
    end

    SINGLE_OBJECT_MAGIC_NUMBER = [0xC3, 0x01]
    def single_object_encoding_header
      [SINGLE_OBJECT_MAGIC_NUMBER, single_object_schema_fingerprint].flatten
    end
    def single_object_schema_fingerprint
      working = crc_64_avro_fingerprint
      bytes = Array.new(8)
      8.times do |i|
        bytes[i] = (working & 0xff)
        working = working >> 8
      end
      bytes
    end

    def read?(writers_schema)
      SchemaCompatibility.can_read?(writers_schema, self)
    end

    def be_read?(other_schema)
      other_schema.read?(self)
    end

    def mutual_read?(other_schema)
      SchemaCompatibility.mutual_read?(other_schema, self)
    end

    def ==(other, _seen=nil)
      other.is_a?(Schema) && type_sym == other.type_sym
    end

    def hash(_seen=nil)
      type_sym.hash
    end

    def subparse(json_obj, names=nil, namespace=nil)
      if json_obj.is_a?(String) && names
        fullname = Name.make_fullname(json_obj, namespace)
        return names[fullname] if names.include?(fullname)
      end

      begin
        Schema.real_parse(json_obj, names, namespace)
      rescue => e
        raise e if e.is_a? SchemaParseError
        raise SchemaParseError, "Sub-schema for #{self.class.name} not a valid Avro schema. Bad schema: #{json_obj}"
      end
    end

    def to_avro(_names=nil)
      props = {'type' => type}
      props['logicalType'] = logical_type if logical_type
      props
    end

    def to_s
      MultiJson.dump to_avro
    end

    def validate_aliases!
      unless aliases.nil? ||
        (aliases.is_a?(Array) && aliases.all? { |a| a.is_a?(String) })

        raise Avro::SchemaParseError,
              "Invalid aliases value #{aliases.inspect} for #{type} #{name}. Must be an array of strings."
      end
    end
    private :validate_aliases!

    class NamedSchema < Schema
      attr_reader :name, :namespace, :aliases

      def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil, aliases=nil)
        super(type, logical_type)
        @name, @namespace = Name.extract_namespace(name, namespace)
        @doc = doc
        @aliases = aliases
        validate_aliases! if aliases
        Name.add_name(names, self)
      end

      def to_avro(names=Set.new)
        if @name
          return fullname if names.include?(fullname)
          names << fullname
        end
        props = {'name' => @name}
        props.merge!('namespace' => @namespace) if @namespace
        props['namespace'] = @namespace if @namespace
        props['doc'] = @doc if @doc
        props['aliases'] = aliases if aliases && aliases.any?
        super.merge props
      end

      def fullname
        @fullname ||= Name.make_fullname(@name, @namespace)
      end

      def fullname_aliases
        @fullname_aliases ||= if aliases
                                aliases.map { |a| Name.make_fullname(a, namespace) }
                              else
                                []
                              end
      end

      def match_fullname?(name)
        name == fullname || fullname_aliases.include?(name)
      end
    end

    class RecordSchema < NamedSchema
      attr_reader :fields, :doc

      def self.make_field_objects(field_data, names, namespace=nil)
        field_objects, field_names, alias_names = [], Set.new, Set.new
        field_data.each do |field|
          if field.respond_to?(:[]) # TODO(jmhodges) wtffffff
            type = field['type']
            name = field['name']
            default = field.key?('default') ? field['default'] : :no_default
            order = field['order']
            doc = field['doc']
            aliases = field['aliases']
            new_field = Field.new(type, name, default, order, names, namespace, doc, aliases)
            # make sure field name has not been used yet
            if field_names.include?(new_field.name)
              raise SchemaParseError, "Field name #{new_field.name.inspect} is already in use"
            end
            field_names << new_field.name
            # make sure alias has not be been used yet
            if new_field.aliases && alias_names.intersect?(new_field.aliases.to_set)
              raise SchemaParseError, "Alias #{(alias_names & new_field.aliases).to_a} already in use"
            end
            alias_names.merge(new_field.aliases) if new_field.aliases
          else
            raise SchemaParseError, "Not a valid field: #{field}"
          end
          field_objects << new_field
        end
        field_objects
      end

      def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil, aliases=nil)
        if schema_type == :request || schema_type == 'request'
          @type_sym = schema_type.to_sym
          @namespace = namespace
          @name = nil
          @doc = nil
        else
          super(schema_type, name, namespace, names, doc, nil, aliases)
        end
        @fields = if fields
                    RecordSchema.make_field_objects(fields, names, self.namespace)
                  else
                    {}
                  end
      end

      def fields_hash
        @fields_hash ||= fields.inject({}){|hsh, field| hsh[field.name] = field; hsh }
      end

      def fields_by_alias
        @fields_by_alias ||= fields.each_with_object({}) do |field, hash|
          if field.aliases
            field.aliases.each do |a|
              hash[a] = field
            end
          end
        end
      end

      def to_avro(names=Set.new)
        hsh = super
        return hsh unless hsh.is_a?(Hash)
        hsh['fields'] = @fields.map {|f| f.to_avro(names) }
        if type_sym == :request
          hsh['fields']
        else
          hsh
        end
      end
    end

    class ArraySchema < Schema
      attr_reader :items

      def initialize(items, names=nil, default_namespace=nil)
        super(:array)
        @items = subparse(items, names, default_namespace)
      end

      def to_avro(names=Set.new)
        super.merge('items' => items.to_avro(names))
      end
    end

    class MapSchema < Schema
      attr_reader :values

      def initialize(values, names=nil, default_namespace=nil)
        super(:map)
        @values = subparse(values, names, default_namespace)
      end

      def to_avro(names=Set.new)
        super.merge('values' => values.to_avro(names))
      end
    end

    class UnionSchema < Schema
      attr_reader :schemas

      def initialize(schemas, names=nil, default_namespace=nil)
        super(:union)

        @schemas = schemas.each_with_object([]) do |schema, schema_objects|
          new_schema = subparse(schema, names, default_namespace)
          ns_type = new_schema.type_sym

          if VALID_TYPES_SYM.include?(ns_type) &&
              !NAMED_TYPES_SYM.include?(ns_type) &&
              schema_objects.any?{|o| o.type_sym == ns_type }
            raise SchemaParseError, "#{ns_type} is already in Union"
          elsif ns_type == :union
            raise SchemaParseError, "Unions cannot contain other unions"
          else
            schema_objects << new_schema
          end
        end
      end

      def to_avro(names=Set.new)
        schemas.map {|schema| schema.to_avro(names) }
      end
    end

    class EnumSchema < NamedSchema
      SYMBOL_REGEX = /^[A-Za-z_][A-Za-z0-9_]*$/

      attr_reader :symbols, :doc, :default

      def initialize(name, space, symbols, names=nil, doc=nil, default=nil, aliases=nil)
        if symbols.uniq.length < symbols.length
          fail_msg = "Duplicate symbol: #{symbols}"
          raise Avro::SchemaParseError, fail_msg
        end

        if !Avro.disable_enum_symbol_validation
          invalid_symbols = symbols.select { |symbol| symbol !~ SYMBOL_REGEX }

          if invalid_symbols.any?
            raise SchemaParseError,
              "Invalid symbols for #{name}: #{invalid_symbols.join(', ')} don't match #{SYMBOL_REGEX.inspect}"
          end
        end

        if default && !symbols.include?(default)
          raise Avro::SchemaParseError, "Default '#{default}' is not a valid symbol for enum #{name}"
        end

        super(:enum, name, space, names, doc, nil, aliases)
        @default = default
        @symbols = symbols
      end

      def to_avro(_names=Set.new)
        avro = super
        if avro.is_a?(Hash)
          avro['symbols'] = symbols
          avro['default'] = default if default
        end
        avro
      end
    end

    # Valid primitive types are in PRIMITIVE_TYPES.
    class PrimitiveSchema < Schema
      def initialize(type, logical_type=nil)
        if PRIMITIVE_TYPES_SYM.include?(type)
          super(type, logical_type)
        elsif PRIMITIVE_TYPES.include?(type)
          super(type.to_sym, logical_type)
        else
          raise AvroError.new("#{type} is not a valid primitive type.")
        end
      end

      def to_avro(names=nil)
        hsh = super
        hsh.size == 1 ? type : hsh
      end
    end

    class BytesSchema < PrimitiveSchema
      attr_reader :precision, :scale
      def initialize(type, logical_type=nil, precision=nil, scale=nil)
        super(type.to_sym, logical_type)
        @precision = precision
        @scale = scale
      end

      def to_avro(names=nil)
        avro = super
        return avro if avro.is_a?(String)

        avro['precision'] = precision if precision
        avro['scale'] = scale if scale
        avro
      end
    end

    class FixedSchema < NamedSchema
      attr_reader :size
      def initialize(name, space, size, names=nil, logical_type=nil, aliases=nil)
        # Ensure valid cto args
        unless size.is_a?(Integer)
          raise AvroError, 'Fixed Schema requires a valid integer for size property.'
        end
        super(:fixed, name, space, names, nil, logical_type, aliases)
        @size = size
      end

      def to_avro(names=Set.new)
        avro = super
        avro.is_a?(Hash) ? avro.merge('size' => size) : avro
      end
    end

    class Field < Schema
      attr_reader :type, :name, :default, :order, :doc, :aliases

      def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil, aliases=nil)
        @type = subparse(type, names, namespace)
        @name = name
        @default = default
        @order = order
        @doc = doc
        @aliases = aliases
        validate_aliases! if aliases
        validate_default! if default? && !Avro.disable_field_default_validation
      end

      def default?
        @default != :no_default
      end

      def to_avro(names=Set.new)
        {'name' => name, 'type' => type.to_avro(names)}.tap do |avro|
          avro['default'] = default if default?
          avro['order'] = order if order
          avro['doc'] = doc if doc
        end
      end

      def alias_names
        @alias_names ||= Array(aliases)
      end

      private

      def validate_default!
        type_for_default = if type.type_sym == :union
                             type.schemas.first
                           else
                             type
                           end

        Avro::SchemaValidator.validate!(type_for_default, default)
      rescue Avro::SchemaValidator::ValidationError => e
        raise Avro::SchemaParseError, "Error validating default for #{name}: #{e.message}"
      end
    end
  end

  class SchemaParseError < AvroError; end

  class UnknownSchemaError < SchemaParseError
    attr_reader :type_name

    def initialize(type)
      @type_name = type
      super("#{type.inspect} is not a schema we know about.")
    end
  end

  module Name
    def self.extract_namespace(name, namespace)
      parts = name.split('.')
      if parts.size > 1
        namespace, name = parts[0..-2].join('.'), parts.last
      end
      return name, namespace
    end

    # Add a new schema object to the names dictionary (in place).
    def self.add_name(names, new_schema)
      new_fullname = new_schema.fullname
      if Avro::Schema::VALID_TYPES.include?(new_fullname)
        raise SchemaParseError, "#{new_fullname} is a reserved type name."
      elsif names.nil?
        names = {}
      elsif names.has_key?(new_fullname)
        raise SchemaParseError, "The name \"#{new_fullname}\" is already in use."
      end

      names[new_fullname] = new_schema
      names
    end

    def self.make_fullname(name, namespace)
      if !name.include?('.') && !namespace.nil?
        namespace + '.' + name
      else
        name
      end
    end
  end
end
