| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| # Console API for Qpid Management Framework |
| |
| require 'socket' |
| require 'monitor' |
| require 'thread' |
| require 'uri' |
| require 'time' |
| |
| module Qpid::Qmf |
| |
| # To access the asynchronous operations, a class must be derived from |
| # Console with overrides of any combination of the available methods. |
| class Console |
| |
| # Invoked when a connection is established to a broker |
| def broker_connected(broker); end |
| |
| # Invoked when the connection to a broker is lost |
| def broker_disconnected(broker); end |
| |
| # Invoked when a QMF package is discovered |
| def new_package(name); end |
| |
| # Invoked when a new class is discovered. Session.getSchema can be |
| # used to obtain details about the class |
| def new_class(kind, klass_key); end |
| |
| # Invoked when a QMF agent is discovered |
| def new_agent(agent); end |
| |
| # Invoked when a QMF agent disconects |
| def del_agent(agent); end |
| |
| # Invoked when an object is updated |
| def object_props(broker, record); end |
| |
| # Invoked when an object is updated |
| def object_stats(broker, record); end |
| |
| # Invoked when an event is raised |
| def event(broker, event); end |
| |
| # Invoked when an agent heartbeat is received. |
| def heartbeat(agent, timestamp); end |
| |
| # Invoked when the connection sequence reaches the point where broker information is available. |
| def broker_info(broker); end |
| |
| # Invoked when a method response from an asynchronous method call is received. |
| def method_response(broker, seq, response); end |
| end |
| |
| class BrokerURL |
| |
| attr_reader :host, :port, :auth_name, :auth_pass |
| |
| def initialize(text) |
| uri = URI.parse(text) |
| |
| @host = uri.host |
| @port = uri.port ? uri.port : 5672 |
| @auth_name = uri.user |
| @auth_pass = uri.password |
| |
| return uri |
| end |
| |
| def name |
| "#{@host}:#{@port}" |
| end |
| |
| def match(host, port) |
| # FIXME: Unlcear what the Python code is actually checking for |
| # here, especially since HOST can resolve to multiple IP's |
| @port == port && |
| (host == @host || ipaddr(host, port) == ipaddr(@host, @port)) |
| end |
| |
| private |
| def ipaddr(host, port) |
| s = Socket::getaddrinfo(host, port, |
| Socket::AF_INET, Socket::SOCK_STREAM) |
| s[0][2] |
| end |
| end |
| |
| # An instance of the Session class represents a console session running |
| # against one or more QMF brokers. A single instance of Session is |
| # needed to interact with the management framework as a console. |
| class Session |
| CONTEXT_SYNC = 1 |
| CONTEXT_STARTUP = 2 |
| CONTEXT_MULTIGET = 3 |
| |
| DEFAULT_GET_WAIT_TIME = 60 |
| |
| include MonitorMixin |
| |
| attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages |
| |
| # Initialize a session. If the console argument is provided, the |
| # more advanced asynchronous features are available. If console is |
| # defaulted, the session will operate in a simpler, synchronous |
| # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments |
| # are meaningful only if 'console' is provided. They control |
| # whether object updates, events, and agent-heartbeats are |
| # subscribed to. If the console is not interested in receiving one |
| # or more of the above, setting the argument to False will reduce |
| # tha bandwidth used by the API. If manageConnections is set to |
| # True, the Session object will manage connections to the brokers. |
| # This means that if a broker is unreachable, it will retry until a |
| # connection can be established. If a connection is lost, the |
| # Session will attempt to reconnect. |
| # |
| # If manageConnections is set to False, the user is responsible for |
| # handing failures. In this case, an unreachable broker will cause |
| # addBroker to raise an exception. If userBindings is set to False |
| # (the default) and rcvObjects is True, the console will receive |
| # data for all object classes. If userBindings is set to True, the |
| # user must select which classes the console shall receive by |
| # invoking the bindPackage or bindClass methods. This allows the |
| # console to be configured to receive only information that is |
| # relavant to a particular application. If rcvObjects id False, |
| # userBindings has no meaning. |
| # |
| # Accept a hash of parameters, where keys can be :console, |
| # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections, |
| # and :user_bindings |
| def initialize(kwargs = {}) |
| super() |
| @console = kwargs[:console] || nil |
| @brokers = [] |
| @packages = {} |
| @seq_mgr = SequenceManager.new |
| @cv = new_cond |
| @sync_sequence_list = [] |
| @result = [] |
| @select = [] |
| @error = nil |
| @rcv_objects = kwargs[:rcv_objects] == nil ? true : kwargs[:rcv_objects] |
| @rcv_events = kwargs[:rcv_events] == nil ? true : kwargs[:rcv_events] |
| @rcv_heartbeats = kwargs[:rcv_heartbeats] == nil ? true : kwargs[:rcv_heartbeats] |
| @user_bindings = kwargs[:user_bindings] == nil ? false : kwargs[:user_bindings] |
| unless @console |
| @rcv_objects = false |
| @rcv_events = false |
| @rcv_heartbeats = false |
| end |
| @binding_key_list = binding_keys |
| @manage_connections = kwargs[:manage_connections] || false |
| |
| if @user_bindings && ! @rcv_objects |
| raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided" |
| end |
| |
| end |
| |
| def to_s |
| "QMF Console Session Manager (brokers: #{@brokers.size})" |
| end |
| |
| def managedConnections? |
| return @manage_connections |
| end |
| |
| # Connect to a Qpid broker. Returns an object of type Broker |
| # |
| # To supply a username for authentication, use the URL syntax: |
| # |
| # amqp://username@hostname:port |
| # |
| # If the broker needs a password for the client, an interactive prompt will be |
| # provided to the user. |
| # |
| # To supply a username and a password, use |
| # |
| # amqp://username:password@hostname:port |
| # |
| # The following keyword arguments may be used to control authentication: |
| # |
| # :mechanism - SASL mechanism (i.e. "PLAIN", "GSSAPI", "ANONYMOUS", etc. |
| # - defaults to unspecified (the system chooses for you) |
| # :service - SASL service name (i.e. the kerberos principal of the broker) |
| # - defaults to "qpidd" |
| # :min_ssf - Minimum Security Strength Factor for SASL security layers |
| # - defaults to 0 |
| # :max_ssf - Maximum Security Strength Factor for SASL security layers |
| # - defaults to 65535 |
| # |
| def add_broker(target = "amqp://localhost", kwargs = {}) |
| url = BrokerURL.new(target) |
| broker = Broker.new(self, url.host, url.port, url.auth_name, url.auth_pass, kwargs) |
| unless broker.connected? || @manage_connections |
| raise broker.error |
| end |
| |
| @brokers << broker |
| objects(:broker => broker, :class => "agent") unless @manage_connections |
| return broker |
| end |
| |
| # Disconnect from a broker. The 'broker' argument is the object |
| # returned from the addBroker call |
| def del_broker(broker) |
| broker.shutdown |
| @brokers.delete(broker) |
| end |
| |
| # Get the list of known classes within a QMF package |
| def classes(package_name) |
| list = [] |
| @brokers.each { |broker| broker.wait_for_stable } |
| if @packages.include?(package_name) |
| # FIXME What's the actual structure of @packages[package_name] |
| @packages[package_name].each do |key, schema_class| |
| list << schema_class.klass_key |
| end |
| end |
| return list |
| end |
| |
| # Get the schema for a QMF class |
| def schema(klass_key) |
| @brokers.each { |broker| broker.wait_for_stable } |
| if @packages.include?(klass_key.package) |
| @packages[klass_key.package][ [klass_key.klass_name, klass_key.hash] ] |
| end |
| end |
| |
| def bind_package(package_name) |
| unless @user_bindings && @rcv_objects |
| raise "userBindings option not set for Session" |
| end |
| @brokers.each do |broker| |
| args = { :exchange => "qpid.management", |
| :queue => broker.topic_name, |
| :binding_key => "console.obj.*.*.#{package_name}.#" } |
| broker.amqp_session.exchange_bind(args) |
| end |
| end |
| |
| def bind_class(package_name, class_name) |
| unless @user_bindings && @rcv_objects |
| raise "userBindings option not set for Session" |
| end |
| @brokers.each do |broker| |
| args = { :exchange => "qpid.management", |
| :queue => broker.topic_name, |
| :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" } |
| broker.amqp_session.exchange_bind(args) |
| end |
| end |
| |
| def bind_class_key(klass_key) |
| unless @user_bindings && @rcv_objects |
| raise "userBindings option not set for Session" |
| end |
| pname, cname, hash = klass_key.to_a() |
| @brokers.each do |broker| |
| args = { :exchange => "qpid.management", |
| :queue => broker.topic_name, |
| :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" } |
| broker.amqp_session.exchange_bind(args) |
| end |
| end |
| |
| # Get a list of currently known agents |
| def agents(broker=nil) |
| broker_list = [] |
| if broker.nil? |
| broker_list = @brokers.dup |
| else |
| broker_list << broker |
| end |
| broker_list.each { |b| b.wait_for_stable } |
| agent_list = [] |
| broker_list.each { |b| agent_list += b.agents } |
| return agent_list |
| end |
| |
| # Get a list of objects from QMF agents. |
| # All arguments are passed by name(keyword). |
| # |
| # The class for queried objects may be specified in one of the |
| # following ways: |
| # :schema => <schema> - supply a schema object returned from getSchema. |
| # :key => <key> - supply a klass_key from the list returned by getClasses. |
| # :class => <name> - supply a class name as a string. If the class name exists |
| # in multiple packages, a _package argument may also be supplied. |
| # :object_id = <id> - get the object referenced by the object-id |
| # |
| # If objects should be obtained from only one agent, use the following argument. |
| # Otherwise, the query will go to all agents. |
| # |
| # :agent = <agent> - supply an agent from the list returned by getAgents. |
| # |
| # If the get query is to be restricted to one broker (as opposed to |
| # all connected brokers), add the following argument: |
| # |
| # :broker = <broker> - supply a broker as returned by addBroker. |
| # |
| # The default timeout for this synchronous operation is 60 seconds. To change the timeout, |
| # use the following argument: |
| # |
| # :timeout = <time in seconds> |
| # |
| # If additional arguments are supplied, they are used as property |
| # selectors, as long as their keys are strings. For example, if |
| # the argument "name" => "test" is supplied, only objects whose |
| # "name" property is "test" will be returned in the result. |
| def objects(kwargs) |
| if kwargs.include?(:broker) |
| broker_list = [] |
| broker_list << kwargs[:broker] |
| else |
| broker_list = @brokers |
| end |
| broker_list.each { |broker| |
| broker.wait_for_stable |
| if kwargs[:package] != "org.apache.qpid.broker" or kwargs[:class] != "agent" |
| objects(:agent => broker.agent(1,0), :package => "org.apache.qpid.broker", :class => "agent") if broker.connected? |
| end |
| } |
| |
| agent_list = [] |
| if kwargs.include?(:agent) |
| agent = kwargs[:agent] |
| unless broker_list.include?(agent.broker) |
| raise ArgumentError, "Supplied agent is not accessible through the supplied broker" |
| end |
| agent_list << agent if agent.broker.connected? |
| else |
| if kwargs.include?(:object_id) |
| oid = kwargs[:object_id] |
| broker_list.each { |broker| |
| broker.agents.each { |agent| |
| if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank |
| agent_list << agent if agent.broker.connected? |
| end |
| } |
| } |
| else |
| broker_list.each { |broker| |
| agent_list += broker.agents if broker.connected? |
| } |
| end |
| end |
| |
| cname = nil |
| if kwargs.include?(:schema) |
| # FIXME: What kind of object is kwargs[:schema] |
| pname, cname, hash = kwargs[:schema].getKey().to_a |
| elsif kwargs.include?(:key) |
| pname, cname, hash = kwargs[:key].to_a |
| elsif kwargs.include?(:class) |
| pname, cname, hash = [kwargs[:package], kwargs[:class], nil] |
| end |
| if cname.nil? && ! kwargs.include?(:object_id) |
| raise ArgumentError, |
| "No class supplied, use :schema, :key, :class, or :object_id' argument" |
| end |
| |
| map = {} |
| @select = [] |
| if kwargs.include?(:object_id) |
| map["_objectid"] = kwargs[:object_id].to_s |
| else |
| map["_class"] = cname |
| map["_package"] = pname if pname |
| map["_hash"] = hash if hash |
| kwargs.each do |k,v| |
| @select << [k, v] if k.is_a?(String) |
| end |
| end |
| |
| @result = [] |
| agent_list.each do |agent| |
| broker = agent.broker |
| send_codec = Qpid::StringCodec.new(broker.conn.spec) |
| seq = nil |
| synchronize do |
| seq = @seq_mgr.reserve(CONTEXT_MULTIGET) |
| @sync_sequence_list << seq |
| end |
| broker.set_header(send_codec, ?G, seq) |
| send_codec.write_map(map) |
| bank_key = "%d.%d" % [broker.broker_bank, agent.agent_bank] |
| smsg = broker.message(send_codec.encoded, "agent.#{bank_key}") |
| broker.emit(smsg) |
| end |
| |
| timeout = false |
| if kwargs.include?(:timeout) |
| wait_time = kwargs[:timeout] |
| else |
| wait_time = DEFAULT_GET_WAIT_TIME |
| end |
| synchronize do |
| unless @cv.wait_for(wait_time) { @sync_sequence_list.empty? || @error } |
| @sync_sequence_list.each do |pending_seq| |
| @seq_mgr.release(pending_seq) |
| end |
| @sync_sequence_list = [] |
| timeout = true |
| end |
| end |
| |
| if @error |
| errorText = @error |
| @error = nil |
| raise errorText |
| end |
| |
| if @result.empty? && timeout |
| raise "No agent responded within timeout period" |
| end |
| @result |
| end |
| |
| # Return one and only one object or nil. |
| def object(kwargs) |
| objs = objects(kwargs) |
| return objs.length == 1 ? objs[0] : nil |
| end |
| |
| # Return the first of potentially many objects. |
| def first_object(kwargs) |
| objs = objects(kwargs) |
| return objs.length > 0 ? objs[0] : nil |
| end |
| |
| def set_event_filter(kwargs); end |
| |
| def handle_broker_connect(broker); end |
| |
| def handle_broker_resp(broker, codec, seq) |
| broker.broker_id = codec.read_uuid |
| @console.broker_info(broker) if @console |
| |
| # Send a package request |
| # (effectively inc and dec outstanding by not doing anything) |
| send_codec = Qpid::StringCodec.new(broker.conn.spec) |
| seq = @seq_mgr.reserve(CONTEXT_STARTUP) |
| broker.set_header(send_codec, ?P, seq) |
| smsg = broker.message(send_codec.encoded) |
| broker.emit(smsg) |
| end |
| |
| def handle_package_ind(broker, codec, seq) |
| pname = codec.read_str8 |
| new_package = false |
| synchronize do |
| new_package = ! @packages.include?(pname) |
| @packages[pname] = {} if new_package |
| end |
| @console.new_package(pname) if @console |
| |
| # Send a class request |
| broker.inc_outstanding |
| send_codec = Qpid::StringCodec.new(broker.conn.spec) |
| seq = @seq_mgr.reserve(CONTEXT_STARTUP) |
| broker.set_header(send_codec, ?Q, seq) |
| send_codec.write_str8(pname) |
| smsg = broker.message(send_codec.encoded) |
| broker.emit(smsg) |
| end |
| |
| def handle_command_complete(broker, codec, seq) |
| code = codec.read_uint32 |
| text = codec.read_str8 |
| context = @seq_mgr.release(seq) |
| if context == CONTEXT_STARTUP |
| broker.dec_outstanding |
| elsif context == CONTEXT_SYNC && seq == broker.sync_sequence |
| broker.sync_done |
| elsif context == CONTEXT_MULTIGET && @sync_sequence_list.include?(seq) |
| synchronize do |
| @sync_sequence_list.delete(seq) |
| @cv.signal if @sync_sequence_list.empty? |
| end |
| end |
| end |
| |
| def handle_class_ind(broker, codec, seq) |
| kind = codec.read_uint8 |
| classKey = ClassKey.new(codec) |
| unknown = false |
| |
| synchronize do |
| return unless @packages.include?(classKey.package) |
| unknown = true unless @packages[classKey.package].include?([classKey.klass_name, classKey.hash]) |
| end |
| |
| |
| if unknown |
| # Send a schema request for the unknown class |
| broker.inc_outstanding |
| send_codec = Qpid::StringCodec.new(broker.conn.spec) |
| seq = @seq_mgr.reserve(CONTEXT_STARTUP) |
| broker.set_header(send_codec, ?S, seq) |
| classKey.encode(send_codec) |
| smsg = broker.message(send_codec.encoded) |
| broker.emit(smsg) |
| end |
| end |
| |
| def handle_method_resp(broker, codec, seq) |
| code = codec.read_uint32 |
| text = codec.read_str16 |
| out_args = {} |
| pair = @seq_mgr.release(seq) |
| return unless pair |
| method, synchronous = pair |
| if code == 0 |
| method.arguments.each do |arg| |
| if arg.dir.index(?O) |
| out_args[arg.name] = decode_value(codec, arg.type) |
| end |
| end |
| end |
| result = MethodResult.new(code, text, out_args) |
| if synchronous: |
| broker.synchronize do |
| broker.sync_result = MethodResult.new(code, text, out_args) |
| broker.sync_done |
| end |
| else |
| @console.method_response(broker, seq, result) if @console |
| end |
| end |
| |
| def handle_heartbeat_ind(broker, codec, seq, msg) |
| if @console |
| broker_bank = 1 |
| agent_bank = 0 |
| dp = msg.get("delivery_properties") |
| if dp |
| key = dp["routing_key"] |
| key_elements = key.split(".") |
| if key_elements.length == 4 |
| broker_bank = key_elements[2].to_i |
| agent_bank = key_elements[3].to_i |
| end |
| end |
| agent = broker.agent(broker_bank, agent_bank) |
| timestamp = codec.read_uint64 |
| @console.heartbeat(agent, timestamp) if agent |
| end |
| end |
| |
| def handle_event_ind(broker, codec, seq) |
| if @console |
| event = Event.new(self, broker, codec) |
| @console.event(broker, event) |
| end |
| end |
| |
| def handle_schema_resp(broker, codec, seq) |
| kind = codec.read_uint8 |
| classKey = ClassKey.new(codec) |
| klass = SchemaClass.new(self, kind, classKey, codec) |
| synchronize { @packages[classKey.package][ [classKey.klass_name, classKey.hash] ] = klass } |
| |
| @seq_mgr.release(seq) |
| broker.dec_outstanding |
| @console.new_class(kind, classKey) if @console |
| end |
| |
| def handle_content_ind(broker, codec, seq, prop=false, stat=false) |
| klass_key = ClassKey.new(codec) |
| pname, cname, hash = klass_key.to_a() ; |
| |
| schema = nil |
| synchronize do |
| return unless @packages.include?(klass_key.package) |
| return unless @packages[klass_key.package].include?([klass_key.klass_name, klass_key.hash]) |
| schema = @packages[klass_key.package][ [klass_key.klass_name, klass_key.hash] ] |
| end |
| |
| |
| object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat) |
| if pname == "org.apache.qpid.broker" && cname == "agent" && prop |
| broker.update_agent(object) |
| end |
| |
| synchronize do |
| if @sync_sequence_list.include?(seq) |
| if object.timestamps()[2] == 0 && select_match(object) |
| @result << object |
| end |
| return |
| end |
| end |
| |
| @console.object_props(broker, object) if @console && @rcv_objects && prop |
| @console.object_stats(broker, object) if @console && @rcv_objects && stat |
| end |
| |
| def handle_broker_disconnect(broker); end |
| |
| def handle_error(error) |
| synchronize do |
| @error = error if @sync_sequence_list.length > 0 |
| @sync_sequence_list = [] |
| @cv.signal |
| end |
| end |
| |
| # Decode, from the codec, a value based on its typecode |
| def decode_value(codec, typecode) |
| case typecode |
| when 1: data = codec.read_uint8 # U8 |
| when 2: data = codec.read_uint16 # U16 |
| when 3: data = codec.read_uint32 # U32 |
| when 4: data = codec.read_uint64 # U64 |
| when 6: data = codec.read_str8 # SSTR |
| when 7: data = codec.read_str16 # LSTR |
| when 8: data = codec.read_int64 # ABSTIME |
| when 9: data = codec.read_uint64 # DELTATIME |
| when 10: data = ObjectId.new(codec) # REF |
| when 11: data = codec.read_uint8 != 0 # BOOL |
| when 12: data = codec.read_float # FLOAT |
| when 13: data = codec.read_double # DOUBLE |
| when 14: data = codec.read_uuid # UUID |
| when 15: data = codec.read_map # FTABLE |
| when 16: data = codec.read_int8 # S8 |
| when 17: data = codec.read_int16 # S16 |
| when 18: data = codec.read_int32 # S32 |
| when 19: data = codec.read_int64 # S64 |
| when 20: # Object |
| inner_type_code = codec.read_uint8() |
| if (inner_type_code == 20) |
| classKey = ClassKey.new(codec) |
| innerSchema = schema(classKey) |
| data = Object.new(self, @broker, innerSchema, codec, true, true, false) if innerSchema |
| else |
| data = decode_value(codec, inner_type_code) |
| end |
| when 21: |
| data = [] |
| rec_codec = Qpid::StringCodec.new(codec.spec, codec.read_vbin32()) |
| count = rec_codec.read_uint32() |
| while count > 0 do |
| type = rec_codec.read_uint8() |
| data << (decode_value(rec_codec,type)) |
| count -= 1 |
| end |
| when 22: |
| data = [] |
| rec_codec = Qpid::StringCodec.new(codec.spec, codec.read_vbin32()) |
| count = rec_codec.read_uint32() |
| type = rec_codec.read_uint8() |
| while count > 0 do |
| data << (decode_value(rec_codec,type)) |
| count -= 1 |
| end |
| else |
| raise ArgumentError, "Invalid type code: #{typecode} - #{typecode.inspect}" |
| end |
| return data |
| end |
| |
| ENCODINGS = { |
| String => 6, |
| Fixnum => 18, |
| Bignum => 19, |
| Float => 12, |
| Array => 21, |
| Hash => 15 |
| } |
| |
| def encoding(object) |
| klass = object.class |
| if ENCODINGS.has_key?(klass) |
| return ENCODINGS[klass] |
| end |
| for base in klass.__bases__ |
| result = encoding(base) |
| return result unless result.nil? |
| end |
| end |
| |
| # Encode, into the codec, a value based on its typecode |
| def encode_value(codec, value, typecode) |
| # FIXME: Python does a lot of magic type conversions |
| # We just assume that value has the right type; this is safer |
| # than coercing explicitly, since Array::pack will complain |
| # loudly about various type errors |
| case typecode |
| when 1: codec.write_uint8(value) # U8 |
| when 2: codec.write_uint16(value) # U16 |
| when 3: codec.write_uint32(value) # U32 |
| when 4: codec.write_uint64(value) # U64 |
| when 6: codec.write_str8(value) # SSTR |
| when 7: codec.write_str16(value) # LSTR |
| when 8: codec.write_int64(value) # ABSTIME |
| when 9: codec.write_uint64(value) # DELTATIME |
| when 10: value.encode(codec) # REF |
| when 11: codec.write_uint8(value ? 1 : 0) # BOOL |
| when 12: codec.write_float(value) # FLOAT |
| when 13: codec.write_double(value) # DOUBLE |
| when 14: codec.write_uuid(value) # UUID |
| when 15: codec.write_map(value) # FTABLE |
| when 16: codec.write_int8(value) # S8 |
| when 17: codec.write_int16(value) # S16 |
| when 18: codec.write_int32(value) # S32 |
| when 19: codec.write_int64(value) # S64 |
| when 20: value.encode(codec) |
| when 21: # List |
| send_codec = Qpid::StringCodec.new(codec.spec) |
| encode_value(send_codec, value.size, 3) |
| value.each do v |
| ltype = encoding(v) |
| encode_value(send_codec,ltype,1) |
| encode_value(send_codec,v,ltype) |
| end |
| codec.write_vbin32(send_codec.encoded) |
| when 22: # Array |
| send_codec = Qpid::StringCodec.new(codec.spec) |
| encode_value(send_codec, value.size, 3) |
| if value.size > 0 |
| ltype = encoding(value[0]) |
| encode_value(send_codec,ltype,1) |
| value.each do v |
| encode_value(send_codec,v,ltype) |
| end |
| end |
| codec.write_vbin32(send_codec.encoded) |
| else |
| raise ValueError, "Invalid type code: %d" % typecode |
| end |
| end |
| |
| def display_value(value, typecode) |
| case typecode |
| when 1: return value.to_s |
| when 2: return value.to_s |
| when 3: return value.to_s |
| when 4: return value.to_s |
| when 6: return value.to_s |
| when 7: return value.to_s |
| when 8: return strftime("%c", gmtime(value / 1000000000)) |
| when 9: return value.to_s |
| when 10: return value.to_s |
| when 11: return value ? 'T' : 'F' |
| when 12: return value.to_s |
| when 13: return value.to_s |
| when 14: return Qpid::UUID::format(value) |
| when 15: return value.to_s |
| when 16: return value.to_s |
| when 17: return value.to_s |
| when 18: return value.to_s |
| when 19: return value.to_s |
| when 20: return value.to_s |
| when 21: return value.to_s |
| when 22: return value.to_s |
| else |
| raise ValueError, "Invalid type code: %d" % typecode |
| end |
| end |
| |
| private |
| |
| def binding_keys |
| key_list = [] |
| key_list << "schema.#" |
| if @rcv_objects && @rcv_events && @rcv_heartbeats && |
| ! @user_bindings |
| key_list << "console.#" |
| else |
| if @rcv_objects && ! @user_bindings |
| key_list << "console.obj.#" |
| else |
| key_list << "console.obj.*.*.org.apache.qpid.broker.agent" |
| end |
| key_list << "console.event.#" if @rcv_events |
| key_list << "console.heartbeat.#" if @rcv_heartbeats |
| end |
| return key_list |
| end |
| |
| # Check the object against select to check for a match |
| def select_match(object) |
| select.each do |key, value| |
| object.properties.each do |prop, propval| |
| return false if key == prop.name && value != propval |
| end |
| end |
| return true |
| end |
| |
| end |
| |
| class Package |
| attr_reader :name |
| |
| def initialize(name) |
| @name = name |
| end |
| end |
| |
| # A ClassKey uniquely identifies a class from the schema. |
| class ClassKey |
| attr_reader :package, :klass_name, :hash |
| |
| def initialize(package="", klass_name="", hash=0) |
| if (package.kind_of?(Qpid::Codec)) |
| @package = package.read_str8() |
| @klass_name = package.read_str8() |
| @hash = package.read_bin128() |
| else |
| @package = package |
| @klass_name = klass_name |
| @hash = hash |
| end |
| end |
| |
| def encode(codec) |
| codec.write_str8(@package) |
| codec.write_str8(@klass_name) |
| codec.write_bin128(@hash) |
| end |
| |
| def to_a() |
| return [@package, @klass_name, @hash] |
| end |
| |
| def hash_string() |
| "%08x-%08x-%08x-%08x" % hash.unpack("NNNN") |
| end |
| |
| def to_s() |
| return "#{@package}:#{@klass_name}(#{hash_string()})" |
| end |
| end |
| |
| class SchemaClass |
| |
| CLASS_KIND_TABLE = 1 |
| CLASS_KIND_EVENT = 2 |
| |
| attr_reader :klass_key, :arguments, :super_klass_key |
| |
| def initialize(session, kind, key, codec) |
| @session = session |
| @kind = kind |
| @klass_key = key |
| @super_klass_key = nil |
| @properties = [] |
| @statistics = [] |
| @methods = [] |
| @arguments = [] |
| |
| has_supertype = 0 #codec.read_uint8 |
| if @kind == CLASS_KIND_TABLE |
| prop_count = codec.read_uint16 |
| stat_count = codec.read_uint16 |
| method_count = codec.read_uint16 |
| if has_supertype == 1 |
| @super_klass_key = ClassKey.new(codec) |
| end |
| prop_count.times { |idx| |
| @properties << SchemaProperty.new(codec) } |
| stat_count.times { |idx| |
| @statistics << SchemaStatistic.new(codec) } |
| method_count.times { |idx| |
| @methods<< SchemaMethod.new(codec) } |
| elsif @kind == CLASS_KIND_EVENT |
| arg_count = codec.read_uint16 |
| arg_count.times { |idx| |
| sa = SchemaArgument.new(codec, false) |
| @arguments << sa |
| } |
| end |
| end |
| |
| def is_table? |
| @kind == CLASS_KIND_TABLE |
| end |
| |
| def is_event? |
| @kind == CLASS_KIND_EVENT |
| end |
| |
| def properties(include_inherited = true) |
| returnValue = @properties |
| if !@super_klass_key.nil? && include_inherited |
| returnValue = @properties + @session.schema(@super_klass_key).properties |
| end |
| return returnValue |
| end |
| |
| def statistics(include_inherited = true) |
| returnValue = @statistics |
| if !@super_klass_key.nil? && include_inherited |
| returnValue = @statistics + @session.schema(@super_klass_key).statistics |
| end |
| return returnValue |
| end |
| |
| def methods(include_inherited = true) |
| returnValue = @methods |
| if !@super_klass_key.nil? && include_inherited |
| returnValue = @methods + @session.schema(@super_klass_key).methods |
| end |
| return returnValue |
| end |
| |
| def to_s |
| if @kind == CLASS_KIND_TABLE |
| kind_str = "Table" |
| elsif @kind == CLASS_KIND_EVENT |
| kind_str = "Event" |
| else |
| kind_str = "Unsupported" |
| end |
| "#{kind_str} Class: #{klass_key.to_s}" |
| end |
| end |
| |
| class SchemaProperty |
| |
| attr_reader :name, :type, :access, :index, :optional, |
| :unit, :min, :max, :maxlen, :desc, :refClass, :refPackage |
| |
| def initialize(codec) |
| map = codec.read_map |
| @name = map["name"] |
| @type = map["type"] |
| @access = map["access"] |
| @index = map["index"] != 0 |
| @optional = map["optional"] != 0 |
| @unit = map["unit"] |
| @min = map["min"] |
| @max = map["max"] |
| @maxlen = map["maxlen"] |
| @desc = map["desc"] |
| @refClass = map["refClass"] |
| @refPackage = map["refPackage"] |
| end |
| |
| def to_s |
| @name |
| end |
| end |
| |
| class SchemaStatistic |
| |
| attr_reader :name, :type, :unit, :desc, :refClass, :refPackage |
| |
| def initialize(codec) |
| map = codec.read_map |
| @name = map["name"] |
| @type = map["type"] |
| @unit = map["unit"] |
| @desc = map["desc"] |
| @refClass = map["refClass"] |
| @refPackage = map["refPackage"] |
| end |
| |
| def to_s |
| @name |
| end |
| end |
| |
| class SchemaMethod |
| |
| attr_reader :name, :desc, :arguments |
| |
| def initialize(codec) |
| map = codec.read_map |
| @name = map["name"] |
| arg_count = map["argCount"] |
| @desc = map["desc"] |
| @arguments = [] |
| arg_count.times { |idx| |
| @arguments << SchemaArgument.new(codec, true) |
| } |
| end |
| |
| def to_s |
| result = @name + "(" |
| first = true |
| result += @arguments.select { |arg| arg.dir.index(?I) }.join(", ") |
| result += ")" |
| return result |
| end |
| end |
| |
| class SchemaArgument |
| |
| attr_reader :name, :type, :dir, :unit, :min, :max, :maxlen |
| attr_reader :desc, :default, :refClass, :refPackage |
| |
| def initialize(codec, method_arg) |
| map = codec.read_map |
| @name = map["name"] |
| @type = map["type"] |
| @dir = map["dir"].upcase if method_arg |
| @unit = map["unit"] |
| @min = map["min"] |
| @max = map["max"] |
| @maxlen = map["maxlen"] |
| @desc = map["desc"] |
| @default = map["default"] |
| @refClass = map["refClass"] |
| @refPackage = map["refPackage"] |
| end |
| end |
| |
| # Object that represents QMF object identifiers |
| class ObjectId |
| |
| include Comparable |
| |
| attr_reader :first, :second |
| |
| def initialize(codec, first=0, second=0) |
| if codec |
| @first = codec.read_uint64 |
| @second = codec.read_uint64 |
| else |
| @first = first |
| @second = second |
| end |
| end |
| |
| def <=>(other) |
| return 1 unless other.is_a?(ObjectId) |
| return -1 if first < other.first |
| return 1 if first > other.first |
| return second <=> other.second |
| end |
| |
| def to_s |
| "%d-%d-%d-%d-%d" % [flags, sequence, broker_bank, agent_bank, object] |
| end |
| |
| def index |
| [first, second] |
| end |
| |
| def flags |
| (first & 0xF000000000000000) >> 60 |
| end |
| |
| def sequence |
| (first & 0x0FFF000000000000) >> 48 |
| end |
| |
| def broker_bank |
| (first & 0x0000FFFFF0000000) >> 28 |
| end |
| |
| def agent_bank |
| first & 0x000000000FFFFFFF |
| end |
| |
| def object |
| second |
| end |
| |
| def durable? |
| sequence == 0 |
| end |
| |
| def encode(codec) |
| codec.write_uint64(first) |
| codec.write_uint64(second) |
| end |
| end |
| |
| class Object |
| |
| DEFAULT_METHOD_WAIT_TIME = 60 |
| |
| attr_reader :object_id, :schema, :properties, :statistics, |
| :current_time, :create_time, :delete_time, :broker |
| |
| def initialize(session, broker, schema, codec, prop, stat, managed=true) |
| @session = session |
| @broker = broker |
| @schema = schema |
| if managed |
| @current_time = codec.read_uint64 |
| @create_time = codec.read_uint64 |
| @delete_time = codec.read_uint64 |
| @object_id = ObjectId.new(codec) |
| end |
| @properties = [] |
| @statistics = [] |
| if prop |
| missing = parse_presence_masks(codec, schema) |
| schema.properties.each do |property| |
| v = nil |
| unless missing.include?(property.name) |
| v = @session.decode_value(codec, property.type) |
| end |
| @properties << [property, v] |
| end |
| end |
| |
| if stat |
| schema.statistics.each do |statistic| |
| s = @session.decode_value(codec, statistic.type) |
| @statistics << [statistic, s] |
| end |
| end |
| end |
| |
| def klass_key |
| @schema.klass_key |
| end |
| |
| |
| def methods |
| @schema.methods |
| end |
| |
| # Return the current, creation, and deletion times for this object |
| def timestamps |
| return [@current_time, @create_time, @delete_time] |
| end |
| |
| # Return a string describing this object's primary key |
| def index |
| @properties.select { |property, value| |
| property.index |
| }.collect { |property,value| |
| @session.display_value(value, property.type) }.join(":") |
| end |
| |
| # Replace properties and/or statistics with a newly received update |
| def merge_update(newer) |
| unless object_id == newer.object_id |
| raise "Objects with different object-ids" |
| end |
| @properties = newer.properties unless newer.properties.empty? |
| @statistics = newer.statistics unless newer.statistics.empty? |
| end |
| |
| def update |
| obj = @session.object(:object_id => @object_id, :broker => @broker) |
| if obj |
| merge_update(obj) |
| else |
| raise "Underlying object no longer exists." |
| end |
| end |
| |
| def to_s |
| @schema.klass_key.to_s |
| end |
| |
| # This must be defined because ruby has this (deprecated) method built in. |
| def id |
| method_missing(:id) |
| end |
| |
| # Same here.. |
| def type |
| method_missing(:type) |
| end |
| |
| def name |
| method_missing(:name) |
| end |
| |
| def method_missing(name, *args) |
| name = name.to_s |
| |
| if method = @schema.methods.find { |method| name == method.name } |
| return invoke(method, name, args) |
| end |
| |
| @properties.each do |property, value| |
| return value if name == property.name |
| if name == "_#{property.name}_" && property.type == 10 |
| # Dereference references |
| deref = @session.objects(:object_id => value, :broker => @broker) |
| return nil unless deref.size == 1 |
| return deref[0] |
| end |
| end |
| @statistics.each do |statistic, value| |
| if name == statistic.name |
| return value |
| end |
| end |
| raise "Type Object has no attribute '#{name}'" |
| end |
| |
| def encode(codec) |
| codec.write_uint8(20) |
| @schema.klass_key.encode(codec) |
| |
| # emit presence masks for optional properties |
| mask = 0 |
| bit = 0 |
| schema.properties.each do |property| |
| if prop.optional |
| bit = 1 if bit == 0 |
| mask |= bit if value |
| bit = bit << 1 |
| if bit == 256 |
| bit = 0 |
| codec.write_uint8(mask) |
| mask = 0 |
| end |
| codec.write_uint8(mask) if bit != 0 |
| end |
| end |
| |
| # encode properties |
| @properties.each do |property, value| |
| @session.encode_value(codec, value, prop.type) if value |
| end |
| |
| # encode statistics |
| @statistics.each do |statistic, value| |
| @session.encode_value(codec, value, stat.type) |
| end |
| end |
| |
| private |
| |
| def send_method_request(method, name, args, synchronous = false, time_wait = nil) |
| @schema.methods.each do |schema_method| |
| if name == schema_method.name |
| send_codec = Qpid::StringCodec.new(@broker.conn.spec) |
| seq = @session.seq_mgr.reserve([schema_method, synchronous]) |
| @broker.set_header(send_codec, ?M, seq) |
| @object_id.encode(send_codec) |
| @schema.klass_key.encode(send_codec) |
| send_codec.write_str8(name) |
| |
| formals = method.arguments.select { |arg| arg.dir.index(?I) } |
| count = method.arguments.select { |arg| arg.dir.index(?I) }.size |
| unless formals.size == args.size |
| raise "Incorrect number of arguments: expected #{formals.size}, got #{args.size}" |
| end |
| |
| formals.zip(args).each do |formal, actual| |
| @session.encode_value(send_codec, actual, formal.type) |
| end |
| |
| ttl = time_wait ? time_wait * 1000 : nil |
| smsg = @broker.message(send_codec.encoded, |
| "agent.#{object_id.broker_bank}.#{object_id.agent_bank}", ttl=ttl) |
| @broker.sync_start if synchronous |
| @broker.emit(smsg) |
| |
| return seq |
| end |
| end |
| end |
| |
| def invoke(method, name, args) |
| kwargs = args[args.size - 1] |
| sync = true |
| timeout = DEFAULT_METHOD_WAIT_TIME |
| |
| if kwargs.class == Hash |
| if kwargs.include?(:timeout) |
| timeout = kwargs[:timeout] |
| end |
| |
| if kwargs.include?(:async) |
| sync = !kwargs[:async] |
| end |
| args.pop |
| end |
| |
| seq = send_method_request(method, name, args, synchronous = sync) |
| if seq |
| return seq unless sync |
| unless @broker.wait_for_sync_done(timeout) |
| @session.seq_mgr.release(seq) |
| raise "Timed out waiting for method to respond" |
| end |
| |
| if @broker.error |
| error_text = @broker.error |
| @broker.error = nil |
| raise error_text |
| end |
| |
| return @broker.sync_result |
| end |
| raise "Invalid Method (software defect) [#{name}]" |
| end |
| |
| def parse_presence_masks(codec, schema) |
| exclude_list = [] |
| bit = 0 |
| schema.properties.each do |property| |
| if property.optional |
| if bit == 0 |
| mask = codec.read_uint8 |
| bit = 1 |
| end |
| if (mask & bit) == 0 |
| exclude_list << property.name |
| end |
| bit *= 2 |
| bit = 0 if bit == 256 |
| end |
| end |
| return exclude_list |
| end |
| end |
| |
| class MethodResult |
| |
| attr_reader :status, :text, :out_args |
| |
| def initialize(status, text, out_args) |
| @status = status |
| @text = text |
| @out_args = out_args |
| end |
| |
| def method_missing(name) |
| name = name.to_s() |
| if @out_args.include?(name) |
| return @out_args[name] |
| else |
| raise "Unknown method result arg #{name}" |
| end |
| end |
| |
| def to_s |
| argsString = "" |
| padding = "" |
| out_args.each do |key,value| |
| argsString += padding |
| padding = " " if padding == "" |
| argsString += key.to_s |
| argsString += " => " |
| argsString += value.to_s() |
| end |
| "MethodResult(Msg: '#{text}' Status: #{status} Return: [#{argsString}])" |
| end |
| end |
| |
| class ManagedConnection |
| |
| DELAY_MIN = 1 |
| DELAY_MAX = 128 |
| DELAY_FACTOR = 2 |
| include MonitorMixin |
| |
| def initialize(broker) |
| super() |
| @broker = broker |
| @cv = new_cond |
| @is_cancelled = false |
| end |
| |
| # Main body of the running thread. |
| def start |
| @thread = Thread.new { |
| delay = DELAY_MIN |
| while true |
| begin |
| @broker.try_to_connect |
| synchronize do |
| while !@is_cancelled and @broker.connected? |
| @cv.wait |
| Thread.exit if @is_cancelled |
| delay = DELAY_MIN |
| end |
| end |
| |
| rescue |
| delay *= DELAY_FACTOR if delay < DELAY_MAX |
| end |
| |
| synchronize do |
| @cv.wait(delay) |
| Thread.exit if @is_cancelled |
| end |
| end |
| } |
| end |
| |
| # Tell this thread to stop running and return. |
| def stop |
| synchronize do |
| @is_cancelled = true |
| @cv.signal |
| end |
| end |
| |
| # Notify the thread that the connection was lost. |
| def disconnected |
| synchronize do |
| @cv.signal |
| end |
| end |
| |
| def join |
| @thread.join |
| end |
| end |
| |
| class Broker |
| |
| SYNC_TIME = 60 |
| @@next_seq = 1 |
| |
| include MonitorMixin |
| |
| attr_accessor :error |
| |
| attr_reader :amqp_session_id, :amqp_session, :conn, :broker_bank, :topic_name |
| |
| attr_accessor :broker_id, :sync_result |
| |
| def initialize(session, host, port, auth_name, auth_pass, kwargs) |
| super() |
| |
| # For debugging.. |
| Thread.abort_on_exception = true |
| |
| @session = session |
| @host = host |
| @port = port |
| @auth_name = auth_name |
| @auth_pass = auth_pass |
| @user_id = nil |
| @auth_mechanism = kwargs[:mechanism] |
| @auth_service = kwargs[:service] |
| @broker_bank = 1 |
| @topic_bound = false |
| @cv = new_cond |
| @error = nil |
| @broker_id = nil |
| @is_connected = false |
| @amqp_session_id = "%s.%d.%d" % [Socket.gethostname, Process::pid, @@next_seq] |
| @@next_seq += 1 |
| @conn = nil |
| if @session.managedConnections? |
| @thread = ManagedConnection.new(self) |
| @thread.start |
| else |
| @thread = nil |
| try_to_connect |
| end |
| end |
| |
| def connected? |
| @is_connected |
| end |
| |
| def agent(broker_bank, agent_bank) |
| bank_key = "%d.%d" % [broker_bank, agent_bank] |
| return @agents[bank_key] |
| end |
| |
| # Get the list of agents reachable via this broker |
| def agents |
| @agents.values |
| end |
| |
| def url |
| "#{@host}:#{@port}" |
| end |
| |
| def to_s |
| if connected? |
| "Broker connected at: #{url}" |
| else |
| "Disconnected Broker" |
| end |
| end |
| |
| def wait_for_sync_done(timeout=nil) |
| wait_time = timeout ? timeout : SYNC_TIME |
| synchronize do |
| return @cv.wait_for(wait_time) { ! @sync_in_flight || @error } |
| end |
| end |
| |
| def wait_for_stable |
| synchronize do |
| return unless connected? |
| return if @reqs_outstanding == 0 |
| @sync_in_flight = true |
| unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 } |
| raise "Timed out waiting for broker to synchronize" |
| end |
| end |
| end |
| |
| # Compose the header of a management message |
| def set_header(codec, opcode, seq=0) |
| codec.write_uint8(?A) |
| codec.write_uint8(?M) |
| codec.write_uint8(?2) |
| codec.write_uint8(opcode) |
| codec.write_uint32(seq) |
| end |
| |
| def message(body, routing_key="broker", ttl=nil) |
| dp = @amqp_session.delivery_properties |
| dp.routing_key = routing_key |
| dp.ttl = ttl if ttl |
| mp = @amqp_session.message_properties |
| mp.content_type = "x-application/qmf" |
| mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name) |
| #mp.user_id = @user_id if @user_id |
| return Qpid::Message.new(dp, mp, body) |
| end |
| |
| def emit(msg, dest="qpid.management") |
| @amqp_session.message_transfer(:destination => dest, |
| :message => msg) |
| end |
| |
| def inc_outstanding |
| synchronize { @reqs_outstanding += 1 } |
| end |
| |
| def dec_outstanding |
| synchronize do |
| @reqs_outstanding -= 1 |
| if @reqs_outstanding == 0 && ! @topic_bound |
| @topic_bound = true |
| @session.binding_key_list.each do |key| |
| args = { |
| :exchange => "qpid.management", |
| :queue => @topic_name, |
| :binding_key => key } |
| @amqp_session.exchange_bind(args) |
| end |
| end |
| if @reqs_outstanding == 0 && @sync_in_flight |
| sync_done |
| end |
| end |
| end |
| |
| def sync_start |
| synchronize { @sync_in_flight = true } |
| end |
| |
| def sync_done |
| synchronize do |
| @sync_in_flight = false |
| @cv.signal |
| end |
| end |
| |
| def update_agent(obj) |
| bank_key = "%d.%d" % [obj.brokerBank, obj.agentBank] |
| if obj.delete_time == 0 |
| unless @agents.include?(bank_key) |
| agent = Agent.new(self, obj.agentBank, obj.label) |
| @agents[bank_key] = agent |
| @session.console.new_agent(agent) if @session.console |
| end |
| else |
| agent = @agents.delete(bank_key) |
| @session.console.del_agent(agent) if agent && @session.console |
| end |
| end |
| |
| def shutdown |
| if @thread |
| @thread.stop |
| @thread.join |
| end |
| if connected? |
| @amqp_session.incoming("rdest").stop |
| if @session.console |
| @amqp_session.incoming("tdest").stop |
| end |
| @amqp_session.close |
| @is_connected = false |
| end |
| end |
| |
| def try_to_connect |
| @agents = {} |
| @agents["1.0"] = Agent.new(self, 0, "BrokerAgent") |
| @topic_bound = false |
| @sync_in_flight = false |
| @sync_request = 0 |
| @sync_result = nil |
| @reqs_outstanding = 1 |
| |
| # FIXME: Need sth for Qpid::Util::connect |
| |
| @conn = Qpid::Connection.new(TCPSocket.new(@host, @port), |
| :mechanism => @auth_mechanism, |
| :username => @auth_name, |
| :password => @auth_pass, |
| :host => @host, |
| :service => @auth_service) |
| @conn.start |
| @user_id = @conn.user_id |
| @reply_name = "reply-%s" % amqp_session_id |
| @amqp_session = @conn.session(@amqp_session_id) |
| @amqp_session.auto_sync = true |
| |
| @amqp_session.queue_declare(:queue => @reply_name, |
| :exclusive => true, |
| :auto_delete => true) |
| |
| @amqp_session.exchange_bind(:exchange => "amq.direct", |
| :queue => @reply_name, |
| :binding_key => @reply_name) |
| @amqp_session.message_subscribe(:queue => @reply_name, |
| :destination => "rdest", |
| :accept_mode => @amqp_session.message_accept_mode.none, |
| :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired) |
| q = @amqp_session.incoming("rdest") |
| q.exc_listen(& method(:exception_cb)) |
| q.listen(& method(:reply_cb)) |
| @amqp_session.message_set_flow_mode(:destination => "rdest", |
| :flow_mode => 1) |
| @amqp_session.message_flow(:destination => "rdest", |
| :unit => 0, |
| :value => 0xFFFFFFFF) |
| @amqp_session.message_flow(:destination => "rdest", |
| :unit => 1, |
| :value => 0xFFFFFFFF) |
| |
| @topic_name = "topic-#{@amqp_session_id}" |
| @amqp_session.queue_declare(:queue => @topic_name, |
| :exclusive => true, |
| :auto_delete => true) |
| @amqp_session.message_subscribe(:queue => @topic_name, |
| :destination => "tdest", |
| :accept_mode => @amqp_session.message_accept_mode.none, |
| :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired) |
| @amqp_session.incoming("tdest").listen(& method(:reply_cb)) |
| @amqp_session.message_set_flow_mode(:destination => "tdest", |
| :flow_mode => 1) |
| @amqp_session.message_flow(:destination => "tdest", |
| :unit => 0, |
| :value => 0xFFFFFFFF) |
| @amqp_session.message_flow(:destination => "tdest", |
| :unit => 1, |
| :value => 0xFFFFFFFF) |
| |
| @is_connected = true |
| @session.handle_broker_connect(self) |
| |
| codec = Qpid::StringCodec.new(@conn.spec) |
| set_header(codec, ?B) |
| msg = message(codec.encoded) |
| emit(msg) |
| end |
| |
| private |
| |
| # Check the header of a management message and extract the opcode and |
| # class |
| def check_header(codec) |
| begin |
| return [nil, nil] unless codec.read_uint8 == ?A |
| return [nil, nil] unless codec.read_uint8 == ?M |
| return [nil, nil] unless codec.read_uint8 == ?2 |
| opcode = codec.read_uint8 |
| seq = codec.read_uint32 |
| return [opcode, seq] |
| rescue |
| return [nil, nil] |
| end |
| end |
| |
| def reply_cb(msg) |
| codec = Qpid::StringCodec.new(@conn.spec, msg.body) |
| loop do |
| opcode, seq = check_header(codec) |
| return unless opcode |
| case opcode |
| when ?b: @session.handle_broker_resp(self, codec, seq) |
| when ?p: @session.handle_package_ind(self, codec, seq) |
| when ?z: @session.handle_command_complete(self, codec, seq) |
| when ?q: @session.handle_class_ind(self, codec, seq) |
| when ?m: @session.handle_method_resp(self, codec, seq) |
| when ?h: @session.handle_heartbeat_ind(self, codec, seq, msg) |
| when ?e: @session.handle_event_ind(self, codec, seq) |
| when ?s: @session.handle_schema_resp(self, codec, seq) |
| when ?c: @session.handle_content_ind(self, codec, seq, true, false) |
| when ?i: @session.handle_content_ind(self, codec, seq, false, true) |
| when ?g: @session.handle_content_ind(self, codec, seq, true, true) |
| else |
| raise "Unexpected opcode #{opcode.inspect}" |
| end |
| end |
| end |
| |
| def exception_cb(data) |
| @is_connected = false |
| @error = data |
| synchronize { @cv.signal if @sync_in_flight } |
| @session.handle_error(@error) |
| @session.handle_broker_disconnect(self) |
| @thread.disconnected if @thread |
| end |
| end |
| |
| class Agent |
| attr_reader :broker, :agent_bank, :label |
| |
| def initialize(broker, agent_bank, label) |
| @broker = broker |
| @agent_bank = agent_bank |
| @label = label |
| end |
| |
| def broker_bank |
| @broker.broker_bank |
| end |
| |
| def to_s |
| "Agent at bank %d.%d (%s)" % [@broker.broker_bank, @agent_bank, @label] |
| end |
| end |
| |
| class Event |
| |
| attr_reader :klass_key, :arguments, :timestamp, :name, :schema |
| |
| def initialize(session, broker, codec) |
| @session = session |
| @broker = broker |
| @klass_key = ClassKey.new(codec) |
| @timestamp = codec.read_int64 |
| @severity = codec.read_uint8 |
| @schema = nil |
| |
| pname, cname, hash = @klass_key.to_a() |
| session.packages.keys.each do |pname| |
| k = [cname, hash] |
| if session.packages[pname].include?(k) |
| @schema = session.packages[pname][k] |
| @arguments = {} |
| @schema.arguments.each do |arg| |
| v = session.decode_value(codec, arg.type) |
| @arguments[arg.name] = v |
| end |
| end |
| end |
| end |
| |
| def to_s |
| return "<uninterpretable>" unless @schema |
| t = Time.at(self.timestamp / 1000000000) |
| out = t.strftime("%c") |
| out += " " + sev_name + " " + @klass_key.package + ":" + @klass_key.klass_name |
| out += " broker=" + @broker.url |
| @schema.arguments.each do |arg| |
| out += " " + arg.name + "=" + @session.display_value(@arguments[arg.name], arg.type) |
| end |
| return out |
| end |
| |
| def sev_name |
| case @severity |
| when 0 : return "EMER " |
| when 1 : return "ALERT" |
| when 2 : return "CRIT " |
| when 3 : return "ERROR" |
| when 4 : return "WARN " |
| when 5 : return "NOTIC" |
| when 6 : return "INFO " |
| when 7 : return "DEBUG" |
| else |
| return "INV-%d" % @severity |
| end |
| end |
| |
| end |
| |
| # Manage sequence numbers for asynchronous method calls |
| class SequenceManager |
| include MonitorMixin |
| |
| def initialize |
| super() |
| @sequence = 0 |
| @pending = {} |
| end |
| |
| # Reserve a unique sequence number |
| def reserve (data) |
| synchronize do |
| result = @sequence |
| @sequence += 1 |
| @pending[result] = data |
| return result |
| end |
| end |
| |
| # Release a reserved sequence number |
| def release (seq) |
| synchronize { @pending.delete(seq) } |
| end |
| end |
| |
| class DebugConsole < Console |
| |
| def broker_connected(broker) |
| puts "brokerConnected #{broker}" |
| end |
| |
| def broker_disconnected(broker) |
| puts "brokerDisconnected #{broker}" |
| end |
| |
| def new_package(name) |
| puts "newPackage #{name}" |
| end |
| |
| def new_class(kind, klass_key) |
| puts "newClass #{kind} #{klass_key}" |
| end |
| |
| def new_agent(agent) |
| puts "new_agent #{agent}" |
| end |
| |
| def del_agent(agent) |
| puts "delAgent #{agent}" |
| end |
| |
| def object_props(broker, record) |
| puts "objectProps #{record}" |
| end |
| |
| def object_stats(broker, record) |
| puts "objectStats #{record}" |
| end |
| |
| def event(broker, event) |
| puts "event #{event}" |
| end |
| |
| def heartbeat(agent, timestamp) |
| puts "heartbeat #{agent}" |
| end |
| |
| def broker_info(broker) |
| puts "brokerInfo #{broker}" |
| end |
| end |
| |
| module XML |
| TYPES = { |
| 1 => "uint8", |
| 2 => "uint16", |
| 3 => "uint32", |
| 4 => "uint64", |
| 5 => "bool", |
| 6 => "short-stirng", |
| 7 => "long-string", |
| 8 => "abs-time", |
| 9 => "delta-time", |
| 10 => "reference", |
| 11 => "boolean", |
| 12 => "float", |
| 13 => "double", |
| 14 => "uuid", |
| 15 => "field-table", |
| 16 => "int8", |
| 17 => "int16", |
| 18 => "int32", |
| 19 => "int64", |
| 20 => "object", |
| 21 => "list", |
| 22 => "array" |
| } |
| |
| ACCESS_MODES = { |
| 1 => "RC", |
| 2 => "RW", |
| 3 => "RO" |
| } |
| |
| def common_attributes(item) |
| attr_string = "" |
| attr_string << " desc='#{item.desc}'" if item.desc |
| attr_string << " desc='#{item.desc}'" if item.desc |
| attr_string << " refPackage='#{item.refPackage}'" if item.refPackage |
| attr_string << " refClass='#{item.refClass}'" if item.refClass |
| attr_string << " unit='#{item.unit}'" if item.unit |
| attr_string << " min='#{item.min}'" if item.min |
| attr_string << " max='#{item.max}'" if item.max |
| attr_string << " maxlen='#{item.maxlen}'" if item.maxlen |
| return attr_string |
| end |
| |
| module_function :common_attributes |
| |
| def schema_xml(session, *packages) |
| schema = "<schemas>\n" |
| packages.each do |package| |
| schema << "\t<schema package='#{package}'>\n" |
| session.classes(package).each do |klass_key| |
| klass = session.schema(klass_key) |
| if klass.is_table? |
| if klass.super_klass_key |
| schema << "\t\t<class name='#{klass.klass_key.klass_name}' hash='#{klass.klass_key.hash_string}' extends='#{klass.super_klass_key.to_s}'>\n" |
| else |
| schema << "\t\t<class name='#{klass.klass_key.klass_name}' hash='#{klass.klass_key.hash_string}'>\n" |
| end |
| klass.properties(false).each do |property| |
| schema << "\t\t\t<property name='#{property.name}' type='#{TYPES[property.type]}' access='#{ACCESS_MODES[property.access]}' optional='#{property.optional ? "True" : "False"}'#{common_attributes(property)}/>\n" |
| end |
| klass.methods(false).each do |method| |
| schema << "\t\t\t<method name='#{method.name}'>\n" |
| method.arguments.each do |arg| |
| schema << "\t\t\t\t<arg name='#{arg.name}' dir='#{arg.dir}' type='#{TYPES[arg.type]}'#{common_attributes(arg)}/>\n" |
| end |
| schema << "\t\t\t</method>\n" |
| end |
| schema << "\t\t</class>\n" |
| else |
| schema << "\t\t<event name='#{klass.klass_key.klass_name}' hash='#{klass.klass_key.hash_string}'>\n" |
| klass.arguments.each do |arg| |
| schema << "\t\t\t<arg name='#{arg.name}'type='#{TYPES[arg.type]}'#{common_attributes(arg)}/>\n" |
| end |
| schema << "\t\t</event>\n" |
| end |
| end |
| schema << "\t</package>\n" |
| end |
| schema << "</schema>" |
| end |
| |
| module_function :schema_xml |
| end |
| |
| end |