| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| require 'qmfengine' |
| require 'thread' |
| require 'socket' |
| require 'monitor' |
| |
| module Qmf |
| |
| # Pull all the TYPE_* constants into Qmf namespace. Maybe there's an easier way? |
| Qmfengine.constants.each do |c| |
| c = c.to_s |
| if c.index('TYPE_') == 0 or c.index('ACCESS_') == 0 or c.index('DIR_') == 0 or |
| c.index('CLASS_') == 0 or c.index('SEV_') == 0 |
| const_set(c, Qmfengine.const_get(c)) |
| end |
| end |
| |
| module StringHelpers |
| def ensure_encoding(str) |
| enc = (Encoding.default_external.name || "UTF-8" rescue "UTF-8") |
| str.respond_to?(:force_encoding) ? str.force_encoding(enc) : str |
| end |
| end |
| |
| class Util |
| include StringHelpers |
| |
| def qmf_to_native(val) |
| case val.getType |
| when TYPE_UINT8, TYPE_UINT16, TYPE_UINT32 then val.asUint |
| when TYPE_UINT64 then val.asUint64 |
| when TYPE_SSTR, TYPE_LSTR then ensure_encoding(val.asString) |
| when TYPE_ABSTIME then val.asInt64 |
| when TYPE_DELTATIME then val.asUint64 |
| when TYPE_REF then ObjectId.new(val.asObjectId) |
| when TYPE_BOOL then val.asBool |
| when TYPE_FLOAT then val.asFloat |
| when TYPE_DOUBLE then val.asDouble |
| when TYPE_UUID then val.asUuid |
| when TYPE_INT8, TYPE_INT16, TYPE_INT32 then val.asInt |
| when TYPE_INT64 then val.asInt64 |
| when TYPE_MAP then value_to_dict(val) |
| when TYPE_LIST then value_to_list(val) |
| when TYPE_OBJECT |
| when TYPE_ARRAY |
| end |
| end |
| |
| def native_to_qmf(target, value) |
| if target.class == Qmfengine::Value |
| val = target |
| typecode = val.getType |
| else |
| typecode = target |
| val = Qmfengine::Value.new(typecode) |
| end |
| |
| case typecode |
| when TYPE_UINT8, TYPE_UINT16, TYPE_UINT32 then val.setUint(value) |
| when TYPE_UINT64 then val.setUint64(value) |
| when TYPE_SSTR, TYPE_LSTR then value ? val.setString(value) : val.setString('') |
| when TYPE_ABSTIME then val.setInt64(value) |
| when TYPE_DELTATIME then val.setUint64(value) |
| when TYPE_REF then val.setObjectId(value.impl) |
| when TYPE_BOOL then value ? val.setBool(value) : val.setBool(0) |
| when TYPE_FLOAT then val.setFloat(value) |
| when TYPE_DOUBLE then val.setDouble(value) |
| when TYPE_UUID then val.setUuid(value) |
| when TYPE_INT8, TYPE_INT16, TYPE_INT32 then val.setInt(value) |
| when TYPE_INT64 then val.setInt64(value) |
| when TYPE_MAP then dict_to_value(val, value) |
| when TYPE_LIST then list_to_value(val, value) |
| when TYPE_OBJECT |
| when TYPE_ARRAY |
| end |
| return val |
| end |
| |
| def pick_qmf_type(value) |
| if value.class == Fixnum |
| if value >= 0 |
| return TYPE_UINT32 if value < 0x100000000 |
| return TYPE_UINT64 |
| else |
| return TYPE_INT32 if value > -0xffffffff |
| return TYPE_INT64 |
| end |
| end |
| |
| if value.class == Bignum |
| return TYPE_UINT64 if value >= 0 |
| return TYPE_INT64 |
| end |
| |
| if value.class == String |
| return TYPE_SSTR if value.length < 256 |
| return TYPE_LSTR |
| end |
| |
| return TYPE_DOUBLE if value.class == Float |
| |
| return TYPE_BOOL if value.class == TrueClass |
| return TYPE_BOOL if value.class == FalseClass |
| return TYPE_BOOL if value.class == NilClass |
| |
| return TYPE_MAP if value.class == Hash |
| return TYPE_LIST if value.class == Array |
| |
| raise ArgumentError, "QMF type not known for native type #{value.class}" |
| end |
| |
| def value_to_dict(val) |
| # Assume val is of type Qmfengine::Value |
| raise ArgumentError, "value_to_dict must be given a map value" if !val.isMap |
| map = {} |
| for i in 0...val.keyCount |
| key = val.key(i) |
| map[key] = qmf_to_native(val.byKey(key)) |
| end |
| return map |
| end |
| |
| def dict_to_value(val, map) |
| map.each do |key, value| |
| raise ArgumentError, "QMF map key must be a string" if key.class != String |
| typecode = pick_qmf_type(value) |
| val.insert(key, native_to_qmf(typecode, value)) |
| end |
| end |
| |
| def value_to_list(val) |
| # Assume val is of type Qmfengine::Value |
| raise ArgumentError, "value_to_dict must be given a map value" if !val.isList |
| list = [] |
| for i in 0...val.listItemCount |
| list.push(qmf_to_native(val.listItem(i))) |
| end |
| return list |
| end |
| |
| def list_to_value(val, list) |
| list.each do |value| |
| typecode = pick_qmf_type(value) |
| val.appendToList(native_to_qmf(typecode, value)) |
| end |
| end |
| end |
| |
| $util = Util.new |
| |
| ##============================================================================== |
| ## CONNECTION |
| ##============================================================================== |
| |
| class ConnectionSettings |
| include StringHelpers |
| attr_reader :impl |
| |
| def initialize(url = nil) |
| if url |
| @impl = Qmfengine::ConnectionSettings.new(url) |
| else |
| @impl = Qmfengine::ConnectionSettings.new() |
| end |
| end |
| |
| def set_attr(key, val) |
| if val.class == String |
| v = Qmfengine::Value.new(TYPE_LSTR) |
| v.setString(val) |
| elsif val.class == TrueClass or val.class == FalseClass |
| v = Qmfengine::Value.new(TYPE_BOOL) |
| v.setBool(val) |
| elsif val.class == Fixnum |
| v = Qmfengine::Value.new(TYPE_UINT32) |
| v.setUint(val) |
| else |
| raise ArgumentError, "Value for attribute '#{key}' has unsupported type: #{val.class}" |
| end |
| |
| good = @impl.setAttr(key, v) |
| raise "Invalid attribute '#{key}'" unless good |
| end |
| |
| def get_attr(key) |
| _v = @impl.getAttr(key) |
| if _v.isString() |
| return ensure_encoding(_v.asString()) |
| elsif _v.isUint() |
| return _v.asUint() |
| elsif _v.isBool() |
| return _v.asBool() |
| else |
| raise Exception("Argument error: value for attribute '#{key}' has unsupported type: #{_v.getType()}") |
| end |
| end |
| |
| |
| def method_missing(name_in, *args) |
| name = name_in.to_s |
| if name[name.length - 1] == 61 |
| attr = name[0..name.length - 2] |
| set_attr(attr, args[0]) |
| return |
| else |
| return get_attr(name) |
| end |
| end |
| end |
| |
| class ConnectionHandler |
| def conn_event_connected(); end |
| def conn_event_disconnected(error); end |
| def conn_event_visit(); end |
| def sess_event_session_closed(context, error); end |
| def sess_event_recv(context, message); end |
| end |
| |
| class Connection |
| include MonitorMixin |
| |
| attr_reader :impl |
| |
| def initialize(settings) |
| super() |
| @impl = Qmfengine::ResilientConnection.new(settings.impl) |
| @sockEngine, @sock = Socket::socketpair(Socket::PF_UNIX, Socket::SOCK_STREAM, 0) |
| @impl.setNotifyFd(@sockEngine.fileno) |
| @new_conn_handlers = [] |
| @conn_handlers_to_delete = [] |
| @conn_handlers = [] |
| @connected = nil |
| |
| @thread = Thread.new do |
| run |
| end |
| end |
| |
| def connected? |
| @connected |
| end |
| |
| def kick |
| @impl.notify |
| end |
| |
| def add_conn_handler(handler) |
| synchronize do |
| @new_conn_handlers << handler |
| end |
| kick |
| end |
| |
| def del_conn_handler(handler) |
| synchronize do |
| @conn_handlers_to_delete << handler |
| end |
| kick |
| end |
| |
| def run() |
| eventImpl = Qmfengine::ResilientConnectionEvent.new |
| new_handlers = nil |
| del_handlers = nil |
| bt_count = 0 |
| |
| while :true |
| @sock.read(1) |
| |
| synchronize do |
| new_handlers = @new_conn_handlers |
| del_handlers = @conn_handlers_to_delete |
| @new_conn_handlers = [] |
| @conn_handlers_to_delete = [] |
| end |
| |
| new_handlers.each do |nh| |
| @conn_handlers << nh |
| nh.conn_event_connected() if @connected |
| end |
| new_handlers = nil |
| |
| del_handlers.each do |dh| |
| d = @conn_handlers.delete(dh) |
| end |
| del_handlers = nil |
| |
| valid = @impl.getEvent(eventImpl) |
| while valid |
| begin |
| case eventImpl.kind |
| when Qmfengine::ResilientConnectionEvent::CONNECTED |
| @connected = :true |
| @conn_handlers.each { |h| h.conn_event_connected() } |
| when Qmfengine::ResilientConnectionEvent::DISCONNECTED |
| @connected = nil |
| @conn_handlers.each { |h| h.conn_event_disconnected(eventImpl.errorText) } |
| when Qmfengine::ResilientConnectionEvent::SESSION_CLOSED |
| eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText) |
| when Qmfengine::ResilientConnectionEvent::RECV |
| eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message) |
| end |
| rescue Exception => ex |
| if bt_count < 2 |
| bt_count += 1 |
| end |
| end |
| @impl.popEvent |
| valid = @impl.getEvent(eventImpl) |
| end |
| @conn_handlers.each { |h| h.conn_event_visit } |
| end |
| end |
| end |
| |
| class Session |
| attr_reader :handle, :handler |
| |
| def initialize(conn, label, handler) |
| @conn = conn |
| @label = label |
| @handler = handler |
| @handle = Qmfengine::SessionHandle.new |
| result = @conn.impl.createSession(label, self, @handle) |
| end |
| |
| def destroy() |
| @conn.impl.destroySession(@handle) |
| end |
| end |
| |
| ##============================================================================== |
| ## OBJECTS and EVENTS |
| ##============================================================================== |
| |
| class QmfEvent |
| attr_reader :impl, :event_class |
| def initialize(cls, kwargs={}) |
| @broker = kwargs[:broker] if kwargs.include?(:broker) |
| @allow_sets = :true |
| |
| if cls |
| @event_class = cls |
| @impl = Qmfengine::Event.new(@event_class.impl) |
| elsif kwargs.include?(:impl) |
| @impl = Qmfengine::Event.new(kwargs[:impl]) |
| @event_class = SchemaEventClass.new(nil, nil, nil, :impl => @impl.getClass) |
| end |
| end |
| |
| def arguments |
| list = [] |
| @event_class.arguments.each do |arg| |
| list << [arg, get_attr(arg.name)] |
| end |
| return list |
| end |
| |
| def get_attr(name) |
| val = value(name) |
| $util.qmf_to_native(val) |
| end |
| |
| def set_attr(name, v) |
| val = value(name) |
| $util.native_to_qmf(val, v) |
| end |
| |
| def [](name) |
| get_attr(name) |
| end |
| |
| def []=(name, value) |
| set_attr(name, value) |
| end |
| |
| def method_missing(name_in, *args) |
| # |
| # Convert the name to a string and determine if it represents an |
| # attribute assignment (i.e. "attr=") |
| # |
| name = name_in.to_s |
| attr_set = (name[name.length - 1] == 61) |
| name = name[0..name.length - 2] if attr_set |
| raise "Sets not permitted on this object" if attr_set && !@allow_sets |
| |
| # |
| # If the name matches an argument name, set or return the value of the argument. |
| # |
| @event_class.arguments.each do |arg| |
| if arg.name == name |
| if attr_set |
| return set_attr(name, args[0]) |
| else |
| return get_attr(name) |
| end |
| end |
| end |
| |
| # |
| # This name means nothing to us, pass it up the line to the parent |
| # class's handler. |
| # |
| super.method_missing(name_in, args) |
| end |
| |
| private |
| def value(name) |
| val = @impl.getValue(name.to_s) |
| if val.nil? |
| raise ArgumentError, "Attribute '#{name}' not defined for event #{@event_class.impl.getClassKey.getPackageName}:#{@object_class.impl.getClassKey.getClassName}" |
| end |
| return val |
| end |
| end |
| |
| class QmfObject |
| include MonitorMixin |
| attr_reader :impl, :object_class |
| def initialize(cls, kwargs={}) |
| super() |
| @cv = new_cond |
| @sync_count = 0 |
| @sync_result = nil |
| @allow_sets = :false |
| @broker = kwargs[:broker] if kwargs.include?(:broker) |
| |
| if cls |
| @object_class = cls |
| @impl = Qmfengine::Object.new(@object_class.impl) |
| elsif kwargs.include?(:impl) |
| @impl = Qmfengine::Object.new(kwargs[:impl]) |
| @object_class = SchemaObjectClass.new(nil, nil, :impl => @impl.getClass) |
| end |
| end |
| |
| def object_id |
| return ObjectId.new(@impl.getObjectId) |
| end |
| |
| def properties |
| list = [] |
| @object_class.properties.each do |prop| |
| list << [prop, get_attr(prop.name)] |
| end |
| return list |
| end |
| |
| def statistics |
| list = [] |
| @object_class.statistics.each do |stat| |
| list << [stat, get_attr(stat.name)] |
| end |
| return list |
| end |
| |
| def get_attr(name) |
| val = value(name) |
| $util.qmf_to_native(val) |
| end |
| |
| def set_attr(name, v) |
| val = value(name) |
| $util.native_to_qmf(val, v) |
| end |
| |
| def [](name) |
| get_attr(name) |
| end |
| |
| def []=(name, value) |
| set_attr(name, value) |
| end |
| |
| def inc_attr(name, by=1) |
| set_attr(name, get_attr(name) + by) |
| end |
| |
| def dec_attr(name, by=1) |
| set_attr(name, get_attr(name) - by) |
| end |
| |
| def method_missing(name_in, *args) |
| # |
| # Convert the name to a string and determine if it represents an |
| # attribute assignment (i.e. "attr=") |
| # |
| name = name_in.to_s |
| attr_set = (name[name.length - 1] == 61) |
| name = name[0..name.length - 2] if attr_set |
| raise "Sets not permitted on this object" if attr_set && !@allow_sets |
| |
| # |
| # If the name matches a property name, set or return the value of the property. |
| # |
| @object_class.properties.each do |prop| |
| if prop.name == name |
| if attr_set |
| return set_attr(name, args[0]) |
| else |
| return get_attr(name) |
| end |
| end |
| end |
| |
| # |
| # Do the same for statistics |
| # |
| @object_class.statistics.each do |stat| |
| if stat.name == name |
| if attr_set |
| return set_attr(name, args[0]) |
| else |
| return get_attr(name) |
| end |
| end |
| end |
| |
| # |
| # If we still haven't found a match for the name, check to see if |
| # it matches a method name. If so, marshall the arguments and invoke |
| # the method. |
| # |
| @object_class.methods.each do |method| |
| if method.name == name |
| raise "Sets not permitted on methods" if attr_set |
| timeout = 30 |
| synchronize do |
| @sync_count = 1 |
| @impl.invokeMethod(name, _marshall(method, args), self) |
| @broker.conn.kick if @broker |
| unless @cv.wait(timeout) { @sync_count == 0 } |
| raise "Timed out waiting for response" |
| end |
| end |
| |
| return @sync_result |
| end |
| end |
| |
| # |
| # This name means nothing to us, pass it up the line to the parent |
| # class's handler. |
| # |
| super.method_missing(name_in, args) |
| end |
| |
| def _method_result(result) |
| synchronize do |
| @sync_result = result |
| @sync_count -= 1 |
| @cv.signal |
| end |
| end |
| |
| # |
| # Convert a Ruby array of arguments (positional) into a Value object of type "map". |
| # |
| private |
| def _marshall(schema, args) |
| map = Qmfengine::Value.new(TYPE_MAP) |
| schema.arguments.each do |arg| |
| if arg.direction == DIR_IN || arg.direction == DIR_IN_OUT |
| map.insert(arg.name, Qmfengine::Value.new(arg.typecode)) |
| end |
| end |
| |
| marshalled = Arguments.new(map) |
| idx = 0 |
| schema.arguments.each do |arg| |
| if arg.direction == DIR_IN || arg.direction == DIR_IN_OUT |
| marshalled[arg.name] = args[idx] unless args[idx] == nil |
| idx += 1 |
| end |
| end |
| |
| return marshalled.map |
| end |
| |
| private |
| def value(name) |
| val = @impl.getValue(name.to_s) |
| if val.nil? |
| raise ArgumentError, "Attribute '#{name}' not defined for class #{@object_class.impl.getClassKey.getPackageName}:#{@object_class.impl.getClassKey.getClassName}" |
| end |
| return val |
| end |
| end |
| |
| class AgentObject < QmfObject |
| def initialize(cls, kwargs={}) |
| super(cls, kwargs) |
| @allow_sets = :true |
| end |
| |
| def destroy |
| @impl.destroy |
| end |
| |
| def set_object_id(oid) |
| @impl.setObjectId(oid.impl) |
| end |
| end |
| |
| class ConsoleObject < QmfObject |
| attr_reader :current_time, :create_time, :delete_time |
| |
| def initialize(cls, kwargs={}) |
| super(cls, kwargs) |
| end |
| |
| def update() |
| raise "No linkage to broker" unless @broker |
| newer = @broker.console.objects(Query.new(:object_id => object_id)) |
| raise "Expected exactly one update for this object" unless newer.size == 1 |
| merge_update(newer[0]) |
| end |
| |
| def merge_update(new_object) |
| @impl.merge(new_object.impl) |
| end |
| |
| def deleted?() |
| @impl.isDeleted |
| end |
| |
| def key() |
| end |
| end |
| |
| class ObjectId |
| attr_reader :impl, :agent_key |
| def initialize(impl=nil) |
| if impl |
| @impl = Qmfengine::ObjectId.new(impl) |
| else |
| @impl = Qmfengine::ObjectId.new |
| end |
| @agent_key = "#{@impl.getBrokerBank}.#{@impl.getAgentBank}" |
| end |
| |
| def object_num_high |
| @impl.getObjectNumHi |
| end |
| |
| def object_num_low |
| @impl.getObjectNumLo |
| end |
| |
| def ==(other) |
| return @impl == other.impl |
| end |
| |
| def to_s |
| @impl.str |
| end |
| end |
| |
| class Arguments |
| attr_reader :map |
| def initialize(map) |
| @map = map |
| @by_hash = {} |
| key_count = @map.keyCount |
| a = 0 |
| while a < key_count |
| key = @map.key(a) |
| @by_hash[key] = $util.qmf_to_native(@map.byKey(key)) |
| a += 1 |
| end |
| end |
| |
| def [] (key) |
| return @by_hash[key] |
| end |
| |
| def []= (key, value) |
| @by_hash[key] = value |
| set(key, value) |
| end |
| |
| def each |
| @by_hash.each { |k, v| yield(k, v) } |
| end |
| |
| def method_missing(name, *args) |
| if @by_hash.include?(name.to_s) |
| return @by_hash[name.to_s] |
| end |
| |
| super.method_missing(name, args) |
| end |
| |
| def set(key, value) |
| val = @map.byKey(key) |
| $util.native_to_qmf(val, value) |
| end |
| end |
| |
| class MethodResponse |
| include StringHelpers |
| |
| def initialize(impl) |
| @impl = Qmfengine::MethodResponse.new(impl) |
| end |
| |
| def status |
| @impl.getStatus |
| end |
| |
| def exception |
| @impl.getException |
| end |
| |
| def text |
| ensure_encoding(exception.asString) |
| end |
| |
| def args |
| Arguments.new(@impl.getArgs) |
| end |
| |
| def method_missing(name, *extra_args) |
| args.__send__(name, extra_args) |
| end |
| end |
| |
| ##============================================================================== |
| ## QUERY |
| ##============================================================================== |
| |
| class Query |
| attr_reader :impl |
| def initialize(kwargs = {}) |
| if kwargs.include?(:impl) |
| @impl = Qmfengine::Query.new(kwargs[:impl]) |
| else |
| package = '' |
| if kwargs.include?(:key) |
| @impl = Qmfengine::Query.new(kwargs[:key]) |
| elsif kwargs.include?(:object_id) |
| @impl = Qmfengine::Query.new(kwargs[:object_id].impl) |
| else |
| package = kwargs[:package] if kwargs.include?(:package) |
| if kwargs.include?(:class) |
| @impl = Qmfengine::Query.new(kwargs[:class], package) |
| else |
| raise ArgumentError, "Invalid arguments, use :key, :object_id or :class[,:package]" |
| end |
| end |
| end |
| end |
| |
| def package_name |
| @impl.getPackage |
| end |
| |
| def class_name |
| @impl.getClass |
| end |
| |
| def object_id |
| objid = @impl.getObjectId |
| if objid.class == NilClass |
| return nil |
| end |
| return ObjectId.new(objid) |
| end |
| end |
| |
| ##============================================================================== |
| ## SCHEMA |
| ##============================================================================== |
| |
| class SchemaArgument |
| attr_reader :impl |
| def initialize(name, typecode, kwargs={}) |
| if kwargs.include?(:impl) |
| @impl = kwargs[:impl] |
| else |
| @impl = Qmfengine::SchemaArgument.new(name, typecode) |
| @impl.setDirection(kwargs[:dir]) if kwargs.include?(:dir) |
| @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit) |
| @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc) |
| end |
| end |
| |
| def name |
| @impl.getName |
| end |
| |
| def direction |
| @impl.getDirection |
| end |
| |
| def typecode |
| @impl.getType |
| end |
| |
| def to_s |
| name |
| end |
| end |
| |
| class SchemaMethod |
| attr_reader :impl, :arguments |
| def initialize(name, kwargs={}) |
| @arguments = [] |
| if kwargs.include?(:impl) |
| @impl = kwargs[:impl] |
| arg_count = @impl.getArgumentCount |
| for i in 0...arg_count |
| @arguments << SchemaArgument.new(nil, nil, :impl => @impl.getArgument(i)) |
| end |
| else |
| @impl = Qmfengine::SchemaMethod.new(name) |
| @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc) |
| end |
| end |
| |
| def add_argument(arg) |
| @arguments << arg |
| @impl.addArgument(arg.impl) |
| end |
| |
| def name |
| @impl.getName |
| end |
| |
| def to_s |
| name |
| end |
| end |
| |
| class SchemaProperty |
| attr_reader :impl |
| def initialize(name, typecode, kwargs={}) |
| if kwargs.include?(:impl) |
| @impl = kwargs[:impl] |
| else |
| @impl = Qmfengine::SchemaProperty.new(name, typecode) |
| @impl.setAccess(kwargs[:access]) if kwargs.include?(:access) |
| @impl.setIndex(kwargs[:index]) if kwargs.include?(:index) |
| @impl.setOptional(kwargs[:optional]) if kwargs.include?(:optional) |
| @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit) |
| @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc) |
| end |
| end |
| |
| def name |
| @impl.getName |
| end |
| |
| def to_s |
| name |
| end |
| end |
| |
| class SchemaStatistic |
| attr_reader :impl |
| def initialize(name, typecode, kwargs={}) |
| if kwargs.include?(:impl) |
| @impl = kwargs[:impl] |
| else |
| @impl = Qmfengine::SchemaStatistic.new(name, typecode) |
| @impl.setUnit(kwargs[:unit]) if kwargs.include?(:unit) |
| @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc) |
| end |
| end |
| |
| def name |
| @impl.getName |
| end |
| |
| def to_s |
| name |
| end |
| end |
| |
| class SchemaClassKey |
| include StringHelpers |
| attr_reader :impl |
| def initialize(i) |
| @impl = Qmfengine::SchemaClassKey.new(i) |
| end |
| |
| def package_name |
| @impl.getPackageName |
| end |
| |
| def class_name |
| @impl.getClassName |
| end |
| |
| def to_s |
| ensure_encoding(@impl.asString) |
| end |
| end |
| |
| class SchemaObjectClass |
| attr_reader :impl, :properties, :statistics, :methods |
| def initialize(package, name, kwargs={}) |
| @properties = [] |
| @statistics = [] |
| @methods = [] |
| if kwargs.include?(:impl) |
| @impl = kwargs[:impl] |
| |
| @impl.getPropertyCount.times do |i| |
| @properties << SchemaProperty.new(nil, nil, :impl => @impl.getProperty(i)) |
| end |
| |
| @impl.getStatisticCount.times do |i| |
| @statistics << SchemaStatistic.new(nil, nil, :impl => @impl.getStatistic(i)) |
| end |
| |
| @impl.getMethodCount.times do |i| |
| @methods << SchemaMethod.new(nil, :impl => @impl.getMethod(i)) |
| end |
| else |
| @impl = Qmfengine::SchemaObjectClass.new(package, name) |
| end |
| end |
| |
| def add_property(prop) |
| @properties << prop |
| @impl.addProperty(prop.impl) |
| end |
| |
| def add_statistic(stat) |
| @statistics << stat |
| @impl.addStatistic(stat.impl) |
| end |
| |
| def add_method(meth) |
| @methods << meth |
| @impl.addMethod(meth.impl) |
| end |
| |
| def class_key |
| SchemaClassKey.new(@impl.getClassKey) |
| end |
| |
| def package_name |
| @impl.getClassKey.getPackageName |
| end |
| |
| def class_name |
| @impl.getClassKey.getClassName |
| end |
| end |
| |
| class SchemaEventClass |
| attr_reader :impl, :arguments |
| def initialize(package, name, sev, kwargs={}) |
| @arguments = [] |
| if kwargs.include?(:impl) |
| @impl = kwargs[:impl] |
| @impl.getArgumentCount.times do |i| |
| @arguments << SchemaArgument.new(nil, nil, :impl => @impl.getArgument(i)) |
| end |
| else |
| @impl = Qmfengine::SchemaEventClass.new(package, name, sev) |
| @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc) |
| end |
| end |
| |
| def add_argument(arg) |
| @arguments << arg |
| @impl.addArgument(arg.impl) |
| end |
| |
| def name |
| @impl.getClassKey.getClassName |
| end |
| |
| def class_key |
| SchemaClassKey.new(@impl.getClassKey) |
| end |
| |
| def package_name |
| @impl.getClassKey.getPackageName |
| end |
| |
| def class_name |
| @impl.getClassKey.getClassName |
| end |
| end |
| |
| ##============================================================================== |
| ## CONSOLE |
| ##============================================================================== |
| |
| class ConsoleHandler |
| def agent_added(agent); end |
| def agent_deleted(agent); end |
| def new_package(package); end |
| def new_class(class_key); end |
| def object_update(object, hasProps, hasStats); end |
| def event_received(event); end |
| def agent_heartbeat(agent, timestamp); end |
| def method_response(resp); end |
| def broker_info(broker); end |
| end |
| |
| class Console |
| include MonitorMixin |
| attr_reader :impl |
| |
| def initialize(handler = nil, kwargs={}) |
| super() |
| @handler = handler |
| @impl = Qmfengine::Console.new |
| @event = Qmfengine::ConsoleEvent.new |
| @broker_list = [] |
| @cv = new_cond |
| @sync_count = nil |
| @sync_result = nil |
| @select = [] |
| @bt_count = 0 |
| @cb_cond = new_cond |
| @cb_thread = Thread.new do |
| run_cb_thread |
| end |
| end |
| |
| def add_connection(conn) |
| broker = Broker.new(self, conn) |
| synchronize { @broker_list << broker } |
| return broker |
| end |
| |
| def del_connection(broker) |
| broker.shutdown |
| synchronize { @broker_list.delete(broker) } |
| end |
| |
| def packages() |
| plist = [] |
| count = @impl.packageCount |
| for i in 0...count |
| plist << @impl.getPackageName(i) |
| end |
| return plist |
| end |
| |
| def classes(package, kind=CLASS_OBJECT) |
| clist = [] |
| count = @impl.classCount(package) |
| for i in 0...count |
| key = @impl.getClass(package, i) |
| class_kind = @impl.getClassKind(key) |
| if class_kind == kind |
| if kind == CLASS_OBJECT |
| clist << SchemaObjectClass.new(nil, nil, :impl => @impl.getObjectClass(key)) |
| elsif kind == CLASS_EVENT |
| clist << SchemaEventClass.new(nil, nil, nil, :impl => @impl.getEventClass(key)) |
| end |
| end |
| end |
| |
| return clist |
| end |
| |
| def bind_package(package) |
| @impl.bindPackage(package) |
| end |
| |
| def bind_class(kwargs = {}) |
| if kwargs.include?(:key) |
| @impl.bindClass(kwargs[:key]) |
| elsif kwargs.include?(:package) |
| package = kwargs[:package] |
| if kwargs.include?(:class) |
| @impl.bindClass(package, kwargs[:class]) |
| else |
| @impl.bindClass(package) |
| end |
| else |
| raise ArgumentError, "Invalid arguments, use :key or :package[,:class]" |
| end |
| end |
| |
| def bind_event(kwargs = {}) |
| if kwargs.include?(:key) |
| @impl.bindEvent(kwargs[:key]) |
| elsif kwargs.include?(:package) |
| package = kwargs[:package] |
| if kwargs.include?(:event) |
| @impl.bindEvent(package, kwargs[:event]) |
| else |
| @impl.bindEvent(package, "*") |
| end |
| else |
| raise ArgumentError, "Invalid arguments, use :key or :package[,:event]" |
| end |
| end |
| |
| def agents(broker = nil) |
| blist = [] |
| if broker |
| blist << broker |
| else |
| synchronize { blist = @broker_list } |
| end |
| |
| agents = [] |
| blist.each do |b| |
| count = b.impl.agentCount |
| for idx in 0...count |
| agents << AgentProxy.new(b.impl.getAgent(idx), b) |
| end |
| end |
| |
| return agents |
| end |
| |
| def objects(query, kwargs = {}) |
| timeout = 30 |
| agent = nil |
| kwargs.merge!(query) if query.class == Hash |
| |
| if kwargs.include?(:timeout) |
| timeout = kwargs[:timeout] |
| kwargs.delete(:timeout) |
| end |
| |
| if kwargs.include?(:agent) |
| agent = kwargs[:agent] |
| kwargs.delete(:agent) |
| end |
| |
| query = Query.new(kwargs) if query.class == Hash |
| |
| @select = [] |
| kwargs.each do |k,v| |
| @select << [k, v] if k.is_a?(String) |
| end |
| |
| synchronize do |
| @sync_count = 1 |
| @sync_result = [] |
| broker = nil |
| synchronize { broker = @broker_list[0] } |
| broker.send_query(query.impl, nil, agent) |
| unless @cv.wait(timeout) { @sync_count == 0 } |
| raise "Timed out waiting for response" |
| end |
| |
| return @sync_result |
| end |
| end |
| |
| # Return one and only one object or nil. |
| def object(query, kwargs = {}) |
| objs = objects(query, kwargs) |
| return objs.length == 1 ? objs[0] : nil |
| end |
| |
| # Return the first of potentially many objects. |
| def first_object(query, kwargs = {}) |
| objs = objects(query, kwargs) |
| return objs.length > 0 ? objs[0] : nil |
| end |
| |
| # Check the object against select to check for a match |
| def select_match(object) |
| @select.each do |key, value| |
| object.properties.each do |prop, propval| |
| if key == prop.name && value != propval |
| return nil |
| end |
| end |
| end |
| return :true |
| end |
| |
| def _get_result(list, context) |
| synchronize do |
| list.each do |item| |
| @sync_result << item if select_match(item) |
| end |
| @sync_count -= 1 |
| @cv.signal |
| end |
| end |
| |
| def start_sync(query) |
| end |
| |
| def touch_sync(sync) |
| end |
| |
| def end_sync(sync) |
| end |
| |
| def run_cb_thread |
| while :true |
| synchronize { @cb_cond.wait(1) } |
| begin |
| count = do_console_events |
| end until count == 0 |
| end |
| end |
| |
| def start_console_events |
| synchronize { @cb_cond.signal } |
| end |
| |
| def do_console_events |
| count = 0 |
| valid = @impl.getEvent(@event) |
| while valid |
| count += 1 |
| begin |
| case @event.kind |
| when Qmfengine::ConsoleEvent::AGENT_ADDED |
| @handler.agent_added(AgentProxy.new(@event.agent, nil)) if @handler |
| when Qmfengine::ConsoleEvent::AGENT_DELETED |
| @handler.agent_deleted(AgentProxy.new(@event.agent, nil)) if @handler |
| when Qmfengine::ConsoleEvent::NEW_PACKAGE |
| @handler.new_package(@event.name) if @handler |
| when Qmfengine::ConsoleEvent::NEW_CLASS |
| @handler.new_class(SchemaClassKey.new(@event.classKey)) if @handler |
| when Qmfengine::ConsoleEvent::OBJECT_UPDATE |
| @handler.object_update(ConsoleObject.new(nil, :impl => @event.object), @event.hasProps, @event.hasStats) if @handler |
| when Qmfengine::ConsoleEvent::EVENT_RECEIVED |
| @handler.event_received(QmfEvent.new(nil, :impl => @event.event)) if @handler |
| when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT |
| @handler.agent_heartbeat(AgentProxy.new(@event.agent, nil), @event.timestamp) if @handler |
| when Qmfengine::ConsoleEvent::METHOD_RESPONSE |
| end |
| rescue Exception => ex |
| if @bt_count < 2 |
| @bt_count += 1 |
| end |
| end |
| @impl.popEvent |
| valid = @impl.getEvent(@event) |
| end |
| return count |
| end |
| end |
| |
| class AgentProxy |
| attr_reader :impl, :broker, :label, :key |
| |
| def initialize(impl, broker) |
| @impl = Qmfengine::AgentProxy.new(impl) |
| @broker = broker |
| @label = @impl.getLabel |
| @key = "#{@impl.getBrokerBank}.#{@impl.getAgentBank}" |
| end |
| end |
| |
| class Broker < ConnectionHandler |
| include MonitorMixin |
| attr_reader :impl, :conn, :console, :broker_bank |
| |
| def initialize(console, conn) |
| super() |
| @broker_bank = 1 |
| @console = console |
| @conn = conn |
| @session = nil |
| @cv = new_cond |
| @stable = nil |
| @event = Qmfengine::BrokerEvent.new |
| @xmtMessage = Qmfengine::Message.new |
| @impl = Qmfengine::BrokerProxy.new(@console.impl) |
| @console.impl.addConnection(@impl, self) |
| @conn.add_conn_handler(self) |
| @operational = :true |
| end |
| |
| def shutdown() |
| @console.impl.delConnection(@impl) |
| @conn.del_conn_handler(self) |
| @operational = :false |
| end |
| |
| def wait_for_stable(timeout = nil) |
| synchronize do |
| return if @stable |
| if timeout |
| unless @cv.wait(timeout) { @stable } |
| raise "Timed out waiting for broker connection to become stable" |
| end |
| else |
| while not @stable |
| @cv.wait |
| end |
| end |
| end |
| end |
| |
| def send_query(query, ctx, agent) |
| agent_impl = agent.impl if agent |
| @impl.sendQuery(query, ctx, agent_impl) |
| @conn.kick |
| end |
| |
| def do_broker_events() |
| count = 0 |
| valid = @impl.getEvent(@event) |
| while valid |
| count += 1 |
| case @event.kind |
| when Qmfengine::BrokerEvent::BROKER_INFO |
| when Qmfengine::BrokerEvent::DECLARE_QUEUE |
| @conn.impl.declareQueue(@session.handle, @event.name) |
| when Qmfengine::BrokerEvent::DELETE_QUEUE |
| @conn.impl.deleteQueue(@session.handle, @event.name) |
| when Qmfengine::BrokerEvent::BIND |
| @conn.impl.bind(@session.handle, @event.exchange, @event.name, @event.bindingKey) |
| when Qmfengine::BrokerEvent::UNBIND |
| @conn.impl.unbind(@session.handle, @event.exchange, @event.name, @event.bindingKey) |
| when Qmfengine::BrokerEvent::SETUP_COMPLETE |
| @impl.startProtocol |
| when Qmfengine::BrokerEvent::STABLE |
| synchronize do |
| @stable = :true |
| @cv.signal |
| end |
| when Qmfengine::BrokerEvent::QUERY_COMPLETE |
| result = [] |
| for idx in 0...@event.queryResponse.getObjectCount |
| result << ConsoleObject.new(nil, :impl => @event.queryResponse.getObject(idx), :broker => self) |
| end |
| @console._get_result(result, @event.context) |
| when Qmfengine::BrokerEvent::METHOD_RESPONSE |
| obj = @event.context |
| obj._method_result(MethodResponse.new(@event.methodResponse)) |
| end |
| @impl.popEvent |
| valid = @impl.getEvent(@event) |
| end |
| return count |
| end |
| |
| def do_broker_messages() |
| count = 0 |
| valid = @impl.getXmtMessage(@xmtMessage) |
| while valid |
| count += 1 |
| @conn.impl.sendMessage(@session.handle, @xmtMessage) |
| @impl.popXmt |
| valid = @impl.getXmtMessage(@xmtMessage) |
| end |
| return count |
| end |
| |
| def do_events() |
| begin |
| @console.start_console_events |
| bcnt = do_broker_events |
| mcnt = do_broker_messages |
| end until bcnt == 0 and mcnt == 0 |
| end |
| |
| def conn_event_connected() |
| @session = Session.new(@conn, "qmfc-%s.%d" % [Socket.gethostname, Process::pid], self) |
| @impl.sessionOpened(@session.handle) |
| do_events |
| end |
| |
| def conn_event_disconnected(error) |
| |
| end |
| |
| def conn_event_visit |
| do_events |
| end |
| |
| def sess_event_session_closed(context, error) |
| @impl.sessionClosed() |
| end |
| |
| def sess_event_recv(context, message) |
| @impl.handleRcvMessage(message) |
| do_events |
| end |
| end |
| |
| ##============================================================================== |
| ## AGENT |
| ##============================================================================== |
| |
| class AgentHandler |
| def get_query(context, query, userId); end |
| def method_call(context, name, object_id, args, userId); end |
| end |
| |
| class Agent < ConnectionHandler |
| def initialize(handler, label="") |
| if label == "" |
| @agentLabel = "rb-%s.%d" % [Socket.gethostname, Process::pid] |
| else |
| @agentLabel = label |
| end |
| @conn = nil |
| @handler = handler |
| @impl = Qmfengine::Agent.new(@agentLabel) |
| @event = Qmfengine::AgentEvent.new |
| @xmtMessage = Qmfengine::Message.new |
| end |
| |
| def set_connection(conn) |
| @conn = conn |
| @conn.add_conn_handler(self) |
| end |
| |
| def register_class(cls) |
| @impl.registerClass(cls.impl) |
| end |
| |
| def alloc_object_id(low = 0, high = 0) |
| ObjectId.new(@impl.allocObjectId(low, high)) |
| end |
| |
| def raise_event(event) |
| @impl.raiseEvent(event.impl) |
| end |
| |
| def query_response(context, object) |
| @impl.queryResponse(context, object.impl) |
| end |
| |
| def query_complete(context) |
| @impl.queryComplete(context) |
| end |
| |
| def method_response(context, status, text, arguments) |
| @impl.methodResponse(context, status, text, arguments.map) |
| end |
| |
| def do_agent_events() |
| count = 0 |
| valid = @impl.getEvent(@event) |
| while valid |
| count += 1 |
| case @event.kind |
| when Qmfengine::AgentEvent::GET_QUERY |
| @handler.get_query(@event.sequence, Query.new(:impl => @event.query), @event.authUserId) |
| when Qmfengine::AgentEvent::START_SYNC |
| when Qmfengine::AgentEvent::END_SYNC |
| when Qmfengine::AgentEvent::METHOD_CALL |
| args = Arguments.new(@event.arguments) |
| @handler.method_call(@event.sequence, @event.name, ObjectId.new(@event.objectId), |
| args, @event.authUserId) |
| when Qmfengine::AgentEvent::DECLARE_QUEUE |
| @conn.impl.declareQueue(@session.handle, @event.name) |
| when Qmfengine::AgentEvent::DELETE_QUEUE |
| @conn.impl.deleteQueue(@session.handle, @event.name) |
| when Qmfengine::AgentEvent::BIND |
| @conn.impl.bind(@session.handle, @event.exchange, @event.name, @event.bindingKey) |
| when Qmfengine::AgentEvent::UNBIND |
| @conn.impl.unbind(@session.handle, @event.exchange, @event.name, @event.bindingKey) |
| when Qmfengine::AgentEvent::SETUP_COMPLETE |
| @impl.startProtocol() |
| end |
| @impl.popEvent |
| valid = @impl.getEvent(@event) |
| end |
| return count |
| end |
| |
| def do_agent_messages() |
| count = 0 |
| valid = @impl.getXmtMessage(@xmtMessage) |
| while valid |
| count += 1 |
| @conn.impl.sendMessage(@session.handle, @xmtMessage) |
| @impl.popXmt |
| valid = @impl.getXmtMessage(@xmtMessage) |
| end |
| return count |
| end |
| |
| def do_events() |
| begin |
| ecnt = do_agent_events |
| mcnt = do_agent_messages |
| end until ecnt == 0 and mcnt == 0 |
| end |
| |
| def conn_event_connected() |
| @session = Session.new(@conn, "qmfa-%s.%d" % [Socket.gethostname, Process::pid], self) |
| @impl.newSession |
| do_events |
| end |
| |
| def conn_event_disconnected(error) |
| |
| end |
| |
| def conn_event_visit |
| do_events |
| end |
| |
| def sess_event_session_closed(context, error) |
| |
| end |
| |
| def sess_event_recv(context, message) |
| @impl.handleRcvMessage(message) |
| do_events |
| end |
| end |
| end |