# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Avro
  # In-memory form of an Avro protocol declaration: a protocol name, an
  # optional namespace, an optional list of named type schemas and an optional
  # map of message name => Message.
  #
  # Instances are normally built from JSON text via Protocol.parse.
  class Protocol
    # Schema kinds that may appear in a protocol's top-level "types" list.
    VALID_TYPE_SCHEMA_TYPES = Set.new(%w[enum record error fixed]).freeze
    VALID_TYPE_SCHEMA_TYPES_SYM = Set.new(VALID_TYPE_SCHEMA_TYPES.map(&:to_sym)).freeze

    # Raised whenever a protocol declaration is structurally invalid.
    class ProtocolParseError < Avro::AvroError; end

    # name      - protocol name (String)
    # namespace - namespace String, or nil when the declaration omitted it
    # types     - Array of parsed type schemas, or nil when omitted
    # messages  - Hash of message name => Message, or nil when omitted
    # md5       - raw (binary) MD5 digest of the protocol's JSON form (#to_s)
    attr_reader :name, :namespace, :types, :messages, :md5

    # Builds a Protocol from its JSON text.
    #
    # protocol_string - String holding the JSON protocol declaration.
    #
    # Returns a Protocol.
    # Raises ProtocolParseError when the top-level JSON value is not an object
    # or when any property fails the checks in #initialize.
    def self.parse(protocol_string)
      json_data = MultiJson.load(protocol_string)

      unless json_data.is_a?(Hash)
        raise ProtocolParseError, "Not a JSON object: #{json_data}"
      end

      Protocol.new(
        json_data['protocol'],
        json_data['namespace'],
        json_data['types'],
        json_data['messages']
      )
    end

    # name      - required, non-empty String.
    # namespace - optional String used as the default namespace for the
    #             protocol's types and messages.
    # types     - optional Array of JSON type definitions.
    # messages  - optional Hash of message name => JSON message body.
    #
    # namespace, types and messages may each be nil: the signature defaults
    # them to nil and #to_avro already skips them when absent, so validation
    # only rejects values that are present but of the wrong kind.
    #
    # Raises ProtocolParseError on any invalid argument.
    def initialize(name, namespace=nil, types=nil, messages=nil)
      # Ensure valid ctor args
      if !name || name == ''
        raise ProtocolParseError, 'Protocols must have a non-empty name.'
      elsif !name.is_a?(String)
        raise ProtocolParseError, 'The name property must be a string.'
      elsif !namespace.nil? && !namespace.is_a?(String)
        raise ProtocolParseError, 'The namespace property must be a string.'
      elsif !types.nil? && !types.is_a?(Array)
        raise ProtocolParseError, 'The types property must be a list.'
      elsif !messages.nil? && !messages.is_a?(Hash)
        raise ProtocolParseError, 'The messages property must be a JSON object.'
      end

      @name = name
      @namespace = namespace

      # One registry is shared by the type and message parsers so that
      # messages can refer to the protocol's named types. Types are parsed
      # first for that reason.
      type_names = {}
      @types = types ? parse_types(types, type_names) : nil
      @messages = messages ? parse_messages(messages, type_names) : nil

      # Must come last: the digest is computed over the fully built protocol.
      @md5 = Digest::MD5.digest(to_s)
    end

    # Returns the protocol serialized as a JSON String.
    def to_s
      MultiJson.dump to_avro
    end

    # Two protocols are equal when their Avro (JSON-ready) forms are equal.
    # Anything that is not a Protocol compares unequal instead of raising.
    def ==(other)
      other.is_a?(Protocol) && to_avro == other.to_avro
    end

    private

    # Parses each entry of the "types" list into a schema object.
    #
    # types      - Array of JSON type definitions.
    # type_names - registry handed to Schema.real_parse for every entry.
    #
    # Returns an Array of schema objects in declaration order.
    # Raises ProtocolParseError if an entry is not an enum, record, fixed or
    # error schema.
    def parse_types(types, type_names)
      types.map do |type|
        # FIXME adding type.name to type_names is not defined in the
        # spec. Possible bug in the python impl and the spec.
        type_object = Schema.real_parse(type, type_names, namespace)
        unless VALID_TYPE_SCHEMA_TYPES_SYM.include?(type_object.type_sym)
          raise ProtocolParseError, "Type #{type} not an enum, record, fixed or error."
        end
        type_object
      end
    end

    # Parses the "messages" object into Message instances.
    #
    # messages - Hash of message name => JSON message body.
    # names    - registry of named types shared with #parse_types.
    #
    # Returns a Hash of message name => Message, in declaration order.
    # Raises ProtocolParseError if a message body is not a JSON object.
    def parse_messages(messages, names)
      # Hash keys are unique by construction, so a repeated message name can
      # never be observed here and needs no separate check.
      messages.each_with_object({}) do |(message_name, body), message_objects|
        unless body.is_a?(Hash)
          raise ProtocolParseError, "Message name \"#{message_name}\" has non-object body #{body.inspect}"
        end

        message_objects[message_name] = Message.new(
          message_name,
          body['request'],
          body['response'],
          body['errors'],
          names,
          namespace
        )
      end
    end

    protected

    # Builds the JSON-ready Hash for this protocol.
    #
    # names - collection threaded through every nested to_avro call, types
    #         first and then messages (presumably so a named type is written
    #         out in full only once; confirm against Schema#to_avro).
    #
    # Protected rather than private because #== calls it on another Protocol.
    def to_avro(names=Set.new)
      hsh = {'protocol' => name}
      hsh['namespace'] = namespace if namespace
      hsh['types'] = types.map { |t| t.to_avro(names) } if types

      if messages
        hsh['messages'] = messages.each_with_object({}) do |(key, message), acc|
          acc[key] = message.to_avro(names)
        end
      end

      hsh
    end

    # A single protocol message: its request parameters, its response schema
    # and, optionally, the errors it declares.
    class Message
      attr_reader :name, :request, :response, :errors, :default_namespace

      # name              - message name.
      # request           - Array of JSON parameter definitions.
      # response          - JSON response schema, or the name of a known type.
      # errors            - optional Array of JSON error schemas.
      # names             - registry of named types used to resolve references.
      # default_namespace - namespace applied to unqualified type names.
      #
      # Raises ProtocolParseError if request (or a given errors) is not an
      # Array.
      def initialize(name, request, response, errors=nil, names=nil, default_namespace=nil)
        @name = name
        @default_namespace = default_namespace
        @request = parse_request(request, names)
        @response = parse_response(response, names)
        @errors = errors ? parse_errors(errors, names) : nil
      end

      # Returns the JSON-ready Hash for this message. The 'errors' key is
      # present only when the message declares errors.
      def to_avro(names=Set.new)
        hash = {
          'request' => request.to_avro(names),
          'response' => response.to_avro(names)
        }
        hash['errors'] = errors.to_avro(names) if errors
        hash
      end

      # Returns the message serialized as a JSON String, using the same JSON
      # backend as Protocol#to_s.
      def to_s
        MultiJson.dump to_avro
      end

      # Wraps the request parameter list in a nameless RecordSchema tagged
      # :request.
      #
      # Raises ProtocolParseError if request is not an Array.
      def parse_request(request, names)
        unless request.is_a?(Array)
          raise ProtocolParseError, "Request property not an Array: #{request.inspect}"
        end
        Schema::RecordSchema.new(nil, default_namespace, request, names, :request)
      end

      # Resolves the response schema. A String naming a type already present
      # in names returns that registered schema; anything else is handed to
      # Schema.real_parse.
      def parse_response(response, names)
        if response.is_a?(String) && names
          fullname = Name.make_fullname(response, default_namespace)
          return names[fullname] if names.include?(fullname)
        end

        Schema.real_parse(response, names, default_namespace)
      end

      # Parses the declared errors by handing the whole Array to
      # Schema.real_parse.
      #
      # Raises ProtocolParseError if errors is not an Array.
      def parse_errors(errors, names)
        unless errors.is_a?(Array)
          raise ProtocolParseError, "Errors property not an Array: #{errors}"
        end
        Schema.real_parse(errors, names, default_namespace)
      end
    end
  end
end