# blob: d05127db4bf802ca1066b71b4459a7fdb055996d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
require 'qmfengine'
require 'thread'
require 'socket'
require 'monitor'
module Qmf
# Pull all the TYPE_* constants into Qmf namespace. Maybe there's an easier way?
Qmfengine.constants.each do |c|
c = c.to_s
if c.index('TYPE_') == 0 or c.index('ACCESS_') == 0 or c.index('DIR_') == 0 or
c.index('CLASS_') == 0 or c.index('SEV_') == 0
const_set(c, Qmfengine.const_get(c))
end
end
module StringHelpers
def ensure_encoding(str)
enc = (Encoding.default_external.name || "UTF-8" rescue "UTF-8")
str.respond_to?(:force_encoding) ? str.force_encoding(enc) : str
end
end
class Util
include StringHelpers
# Convert an engine value object into the matching native Ruby value, chosen
# by the value's type code. Strings are passed through ensure_encoding,
# references are wrapped in ObjectId, and maps/lists are converted
# recursively. TYPE_OBJECT, TYPE_ARRAY and unknown type codes yield nil.
def qmf_to_native(val)
  kind = val.getType
  case kind
  when TYPE_UINT8, TYPE_UINT16, TYPE_UINT32
    val.asUint
  when TYPE_UINT64, TYPE_DELTATIME
    val.asUint64
  when TYPE_SSTR, TYPE_LSTR
    ensure_encoding(val.asString)
  when TYPE_ABSTIME, TYPE_INT64
    val.asInt64
  when TYPE_REF
    ObjectId.new(val.asObjectId)
  when TYPE_BOOL
    val.asBool
  when TYPE_FLOAT
    val.asFloat
  when TYPE_DOUBLE
    val.asDouble
  when TYPE_UUID
    val.asUuid
  when TYPE_INT8, TYPE_INT16, TYPE_INT32
    val.asInt
  when TYPE_MAP
    value_to_dict(val)
  when TYPE_LIST
    value_to_list(val)
  when TYPE_OBJECT, TYPE_ARRAY
    # No native conversion is implemented for these type codes.
    nil
  end
end
# Store a native Ruby value into an engine value object and return it.
#
# +target+ is either an existing Qmfengine::Value (filled in place, using its
# own type code) or a bare type code (a fresh Qmfengine::Value of that type
# is created). A nil/false +value+ is written as '' for string types and as
# 0 for TYPE_BOOL. TYPE_OBJECT, TYPE_ARRAY and unknown type codes leave the
# value object untouched.
def native_to_qmf(target, value)
  if target.instance_of?(Qmfengine::Value)
    val = target
    typecode = target.getType
  else
    typecode = target
    val = Qmfengine::Value.new(typecode)
  end

  case typecode
  when TYPE_UINT8, TYPE_UINT16, TYPE_UINT32
    val.setUint(value)
  when TYPE_UINT64, TYPE_DELTATIME
    val.setUint64(value)
  when TYPE_SSTR, TYPE_LSTR
    val.setString(value || '')
  when TYPE_ABSTIME, TYPE_INT64
    val.setInt64(value)
  when TYPE_REF
    val.setObjectId(value.impl)
  when TYPE_BOOL
    val.setBool(value || 0)
  when TYPE_FLOAT
    val.setFloat(value)
  when TYPE_DOUBLE
    val.setDouble(value)
  when TYPE_UUID
    val.setUuid(value)
  when TYPE_INT8, TYPE_INT16, TYPE_INT32
    val.setInt(value)
  when TYPE_MAP
    dict_to_value(val, value)
  when TYPE_LIST
    list_to_value(val, value)
  when TYPE_OBJECT, TYPE_ARRAY
    # No native conversion is implemented for these type codes.
  end
  val
end
# Choose the QMF type code used to encode a native Ruby value.
#
#   Integer        -> TYPE_UINT32 / TYPE_UINT64 when non-negative,
#                     TYPE_INT32 / TYPE_INT64 when negative
#   String         -> TYPE_SSTR when shorter than 256 characters, else TYPE_LSTR
#   Float          -> TYPE_DOUBLE
#   true/false/nil -> TYPE_BOOL
#   Hash           -> TYPE_MAP
#   Array          -> TYPE_LIST
#
# Raises ArgumentError for any other kind of value.
#
# Integers are matched through Integer rather than Fixnum/Bignum: those
# constants no longer exist on current Ruby versions, while Integer is their
# common superclass on old ones. The negative 32-bit cutoff is the real
# signed 32-bit minimum, so values below it are sent as TYPE_INT64.
def pick_qmf_type(value)
  case value
  when Integer
    if value >= 0
      value < 0x100000000 ? TYPE_UINT32 : TYPE_UINT64
    else
      value >= -0x80000000 ? TYPE_INT32 : TYPE_INT64
    end
  when String
    value.length < 256 ? TYPE_SSTR : TYPE_LSTR
  when Float
    TYPE_DOUBLE
  when true, false, nil
    TYPE_BOOL
  when Hash
    TYPE_MAP
  when Array
    TYPE_LIST
  else
    raise ArgumentError, "QMF type not known for native type #{value.class}"
  end
end
# Build a native Hash from an engine map value, converting every entry with
# qmf_to_native. +val+ is expected to be a Qmfengine::Value; raises
# ArgumentError when it does not report itself as a map.
def value_to_dict(val)
  raise ArgumentError, "value_to_dict must be given a map value" unless val.isMap
  result = {}
  val.keyCount.times do |idx|
    name = val.key(idx)
    result[name] = qmf_to_native(val.byKey(name))
  end
  result
end
# Copy a native Hash into the engine map value +val+. Each entry's QMF type
# is chosen by pick_qmf_type. Raises ArgumentError as soon as a key that is
# not exactly a String is encountered (entries before it are already
# inserted).
def dict_to_value(val, map)
  map.each_pair do |name, item|
    raise ArgumentError, "QMF map key must be a string" unless name.class == String
    val.insert(name, native_to_qmf(pick_qmf_type(item), item))
  end
end
# Build a native Array from an engine list value, converting every element
# with qmf_to_native. +val+ is expected to be a Qmfengine::Value; raises
# ArgumentError when it does not report itself as a list.
def value_to_list(val)
  # The message names this method and the list type (it previously repeated
  # the value_to_dict wording).
  raise ArgumentError, "value_to_list must be given a list value" unless val.isList
  items = []
  val.listItemCount.times do |idx|
    items << qmf_to_native(val.listItem(idx))
  end
  items
end
# Append every element of a native Array to the engine list value +val+,
# choosing each element's QMF type with pick_qmf_type.
def list_to_value(val, list)
  list.each do |item|
    val.appendToList(native_to_qmf(pick_qmf_type(item), item))
  end
end
end
# Shared converter instance used by the wrapper classes in this file to
# translate between native Ruby values and Qmfengine values.
$util = Util.new
##==============================================================================
## CONNECTION
##==============================================================================
# Wrapper around Qmfengine::ConnectionSettings.
#
# Attributes can be read and written either with get_attr/set_attr or with
# plain method syntax (settings.foo / settings.foo = value), which is routed
# through method_missing.
class ConnectionSettings
  include StringHelpers
  attr_reader :impl

  # url:: optional URL handed to the engine's settings constructor; when
  #       omitted the engine's default constructor is used.
  def initialize(url = nil)
    @impl = url ? Qmfengine::ConnectionSettings.new(url) : Qmfengine::ConnectionSettings.new()
  end

  # Set attribute +key+ to +val+, which must be a String, a boolean or an
  # Integer (sent as TYPE_UINT32).
  #
  # Raises ArgumentError for any other value type and RuntimeError when the
  # engine rejects the attribute.
  def set_attr(key, val)
    case val
    when String
      v = Qmfengine::Value.new(TYPE_LSTR)
      v.setString(val)
    when true, false
      v = Qmfengine::Value.new(TYPE_BOOL)
      v.setBool(val)
    when Integer
      # Integer rather than Fixnum: Fixnum no longer exists on current Ruby.
      v = Qmfengine::Value.new(TYPE_UINT32)
      v.setUint(val)
    else
      raise ArgumentError, "Value for attribute '#{key}' has unsupported type: #{val.class}"
    end
    good = @impl.setAttr(key, v)
    raise "Invalid attribute '#{key}'" unless good
  end

  # Return attribute +key+ as a native String, unsigned integer or boolean.
  # Raises ArgumentError when the engine reports any other value type.
  def get_attr(key)
    _v = @impl.getAttr(key)
    return ensure_encoding(_v.asString()) if _v.isString()
    return _v.asUint() if _v.isUint()
    return _v.asBool() if _v.isBool()
    raise ArgumentError, "value for attribute '#{key}' has unsupported type: #{_v.getType()}"
  end

  # Route "name=" calls to set_attr and every other unknown call to get_attr.
  def method_missing(name_in, *args)
    name = name_in.to_s
    # name[-1, 1] yields a one-character String on every Ruby version;
    # comparing a single-index lookup with the character code 61 only worked
    # on Ruby 1.8.
    if name[-1, 1] == '='
      set_attr(name[0..-2], args[0])
      return
    end
    get_attr(name)
  end
end
# Base class for objects that receive connection and session callbacks from
# Connection. Every callback is a no-op returning nil; subclasses override
# the ones they care about.
class ConnectionHandler
  # Called when the connection becomes connected.
  def conn_event_connected
  end

  # Called when the connection is lost; +error+ is the engine's error text.
  def conn_event_disconnected(error)
  end

  # Called at the end of every pass of the connection's event loop.
  def conn_event_visit
  end

  # Called when a session is closed; +context+ is the session object.
  def sess_event_session_closed(context, error)
  end

  # Called when a message arrives on a session.
  def sess_event_recv(context, message)
  end
end
# Wrapper around Qmfengine::ResilientConnection that runs a background thread
# dispatching connection and session events to registered handlers.
#
# The engine is given one end of a socket pair as its notification
# descriptor; the thread blocks reading the other end and runs one pass of
# the event loop for every byte it receives.
class Connection
  include MonitorMixin
  attr_reader :impl

  # settings:: a ConnectionSettings wrapper (its +impl+ is passed to the engine).
  def initialize(settings)
    super()
    @impl = Qmfengine::ResilientConnection.new(settings.impl)
    @sockEngine, @sock = Socket::socketpair(Socket::PF_UNIX, Socket::SOCK_STREAM, 0)
    @impl.setNotifyFd(@sockEngine.fileno)
    @new_conn_handlers = []
    @conn_handlers_to_delete = []
    @conn_handlers = []
    @connected = nil
    # Started last so the thread sees fully initialized state.
    @thread = Thread.new { run }
  end

  # Truthy while the engine has reported the connection as up.
  def connected?
    @connected
  end

  # Ask the engine to notify (wake) the event thread.
  def kick
    @impl.notify
  end

  # Queue +handler+ for registration; the event thread picks it up on its
  # next pass.
  def add_conn_handler(handler)
    synchronize { @new_conn_handlers << handler }
    kick
  end

  # Queue +handler+ for removal; the event thread drops it on its next pass.
  def del_conn_handler(handler)
    synchronize { @conn_handlers_to_delete << handler }
    kick
  end

  # Event loop body executed by the background thread. Each pass applies the
  # pending handler additions/removals, drains the engine's event queue and
  # finally gives every handler a conn_event_visit call.
  def run()
    eventImpl = Qmfengine::ResilientConnectionEvent.new
    bt_count = 0
    while true
      @sock.read(1)

      # Swap the pending lists out under the lock, then work on them unlocked.
      new_handlers, del_handlers = synchronize do
        pending = [@new_conn_handlers, @conn_handlers_to_delete]
        @new_conn_handlers = []
        @conn_handlers_to_delete = []
        pending
      end

      new_handlers.each do |nh|
        @conn_handlers << nh
        # A handler added after the connection came up still gets its
        # "connected" callback.
        nh.conn_event_connected() if @connected
      end
      del_handlers.each { |dh| @conn_handlers.delete(dh) }

      while @impl.getEvent(eventImpl)
        begin
          case eventImpl.kind
          when Qmfengine::ResilientConnectionEvent::CONNECTED
            @connected = :true
            @conn_handlers.each { |h| h.conn_event_connected() }
          when Qmfengine::ResilientConnectionEvent::DISCONNECTED
            @connected = nil
            @conn_handlers.each { |h| h.conn_event_disconnected(eventImpl.errorText) }
          when Qmfengine::ResilientConnectionEvent::SESSION_CLOSED
            eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText)
          when Qmfengine::ResilientConnectionEvent::RECV
            eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message)
          end
        rescue StandardError => ex
          # A failing handler must not kill the event thread, but the failure
          # is reported instead of being dropped silently. Only the first two
          # failures print a backtrace to keep the output bounded.
          warn "Event Exception: #{ex}"
          if bt_count < 2
            warn ex.backtrace
            bt_count += 1
          end
        end
        @impl.popEvent
      end

      @conn_handlers.each { |h| h.conn_event_visit }
    end
  end
end
# A session created on a Connection. The session registers itself with the
# engine as the session context; +handler+ is the object that
# Connection#run calls back for session events.
class Session
  attr_reader :handle, :handler

  # conn::    Connection wrapper the session is created on
  # label::   session label passed to the engine
  # handler:: receiver of sess_event_* callbacks
  def initialize(conn, label, handler)
    @conn, @label, @handler = conn, label, handler
    @handle = Qmfengine::SessionHandle.new
    @conn.impl.createSession(label, self, @handle)
  end

  # Destroy the engine-side session.
  def destroy
    @conn.impl.destroySession(@handle)
  end
end
##==============================================================================
## OBJECTS and EVENTS
##==============================================================================
# A QMF event instance. Argument values can be read and written with
# get_attr/set_attr, with [] / []=, or with plain method syntax
# (event.severity, event.severity = 3), which is routed through
# method_missing.
class QmfEvent
  attr_reader :impl, :event_class

  # cls::    schema event class to instantiate, or nil when wrapping an
  #          existing engine event
  # kwargs:: :impl   - engine event to wrap (used when +cls+ is nil)
  #          :broker - broker the event is associated with
  def initialize(cls, kwargs={})
    @broker = kwargs[:broker] if kwargs.include?(:broker)
    @allow_sets = true
    if cls
      @event_class = cls
      @impl = Qmfengine::Event.new(@event_class.impl)
    elsif kwargs.include?(:impl)
      @impl = Qmfengine::Event.new(kwargs[:impl])
      @event_class = SchemaEventClass.new(nil, nil, nil, :impl => @impl.getClass)
    end
  end

  # Return [schema_argument, native_value] pairs for every argument of the
  # event class.
  def arguments
    @event_class.arguments.map { |arg| [arg, get_attr(arg.name)] }
  end

  # Return the native value of argument +name+.
  # Raises ArgumentError when the event has no such argument.
  def get_attr(name)
    $util.qmf_to_native(value(name))
  end

  # Store native value +v+ into argument +name+.
  # Raises ArgumentError when the event has no such argument.
  def set_attr(name, v)
    $util.native_to_qmf(value(name), v)
  end

  def [](name)
    get_attr(name)
  end

  def []=(name, value)
    set_attr(name, value)
  end

  # Treat calls named after a schema argument as accessors: "name" reads the
  # argument and "name=" writes it. Anything else goes to the default
  # handler, which raises NoMethodError.
  def method_missing(name_in, *args)
    name = name_in.to_s
    # name[-1, 1] yields a one-character String on every Ruby version;
    # comparing a single-index lookup with the character code 61 only worked
    # on Ruby 1.8.
    attr_set = (name[-1, 1] == '=')
    name = name[0..-2] if attr_set
    raise "Sets not permitted on this object" if attr_set && !@allow_sets

    if @event_class.arguments.any? { |arg| arg.name == name }
      return attr_set ? set_attr(name, args[0]) : get_attr(name)
    end

    super
  end

  # Keep respond_to? consistent with the accessors method_missing provides.
  def respond_to_missing?(name_in, include_private = false)
    name = name_in.to_s
    name = name[0..-2] if name[-1, 1] == '='
    known = @event_class && @event_class.arguments.any? { |arg| arg.name == name }
    known ? true : super
  end

  private

  # Fetch the engine value object for argument +name+.
  def value(name)
    val = @impl.getValue(name.to_s)
    if val.nil?
      key = @event_class.impl.getClassKey
      raise ArgumentError, "Attribute '#{name}' not defined for event #{key.getPackageName}:#{key.getClassName}"
    end
    val
  end
end
class QmfObject
include MonitorMixin
attr_reader :impl, :object_class
def initialize(cls, kwargs={})
super()
@cv = new_cond
@sync_count = 0
@sync_result = nil
@allow_sets = :false
@broker = kwargs[:broker] if kwargs.include?(:broker)
if cls
@object_class = cls
@impl = Qmfengine::Object.new(@object_class.impl)
elsif kwargs.include?(:impl)
@impl = Qmfengine::Object.new(kwargs[:impl])
@object_class = SchemaObjectClass.new(nil, nil, :impl => @impl.getClass)
end
end
def object_id
return ObjectId.new(@impl.getObjectId)
end
def properties
list = []
@object_class.properties.each do |prop|
list << [prop, get_attr(prop.name)]
end
return list
end
def statistics
list = []
@object_class.statistics.each do |stat|
list << [stat, get_attr(stat.name)]
end
return list
end
def get_attr(name)
val = value(name)
$util.qmf_to_native(val)
end
def set_attr(name, v)
val = value(name)
$util.native_to_qmf(val, v)
end
def [](name)
get_attr(name)
end
def []=(name, value)
set_attr(name, value)
end
def inc_attr(name, by=1)
set_attr(name, get_attr(name) + by)
end
def dec_attr(name, by=1)
set_attr(name, get_attr(name) - by)
end
def method_missing(name_in, *args)
#
# Convert the name to a string and determine if it represents an
# attribute assignment (i.e. "attr=")
#
name = name_in.to_s
attr_set = (name[name.length - 1] == 61)
name = name[0..name.length - 2] if attr_set
raise "Sets not permitted on this object" if attr_set && !@allow_sets
#
# If the name matches a property name, set or return the value of the property.
#
@object_class.properties.each do |prop|
if prop.name == name
if attr_set
return set_attr(name, args[0])
else
return get_attr(name)
end
end
end
#
# Do the same for statistics
#
@object_class.statistics.each do |stat|
if stat.name == name
if attr_set
return set_attr(name, args[0])
else
return get_attr(name)
end
end
end
#
# If we still haven't found a match for the name, check to see if
# it matches a method name. If so, marshall the arguments and invoke
# the method.
#
@object_class.methods.each do |method|
if method.name == name
raise "Sets not permitted on methods" if attr_set
timeout = 30
synchronize do
@sync_count = 1
@impl.invokeMethod(name, _marshall(method, args), self)
@broker.conn.kick if @broker
unless @cv.wait(timeout) { @sync_count == 0 }
raise "Timed out waiting for response"
end
end
return @sync_result
end
end
#
# This name means nothing to us, pass it up the line to the parent
# class's handler.
#
super.method_missing(name_in, args)
end
def _method_result(result)
synchronize do
@sync_result = result
@sync_count -= 1
@cv.signal
end
end
#
# Convert a Ruby array of arguments (positional) into a Value object of type "map".
#
private
def _marshall(schema, args)
map = Qmfengine::Value.new(TYPE_MAP)
schema.arguments.each do |arg|
if arg.direction == DIR_IN || arg.direction == DIR_IN_OUT
map.insert(arg.name, Qmfengine::Value.new(arg.typecode))
end
end
marshalled = Arguments.new(map)
idx = 0
schema.arguments.each do |arg|
if arg.direction == DIR_IN || arg.direction == DIR_IN_OUT
marshalled[arg.name] = args[idx] unless args[idx] == nil
idx += 1
end
end
return marshalled.map
end
private
def value(name)
val = @impl.getValue(name.to_s)
if val.nil?
raise ArgumentError, "Attribute '#{name}' not defined for class #{@object_class.impl.getClassKey.getPackageName}:#{@object_class.impl.getClassKey.getClassName}"
end
return val
end
end
# Agent-side managed object: a QmfObject on which attribute assignment
# through method syntax is enabled.
class AgentObject < QmfObject
  # Takes the same arguments as QmfObject#initialize.
  def initialize(cls, kwargs={})
    super(cls, kwargs)
    # Any truthy value enables sets; checked by QmfObject#method_missing.
    @allow_sets = :true
  end

  # Destroy the underlying engine object.
  def destroy
    @impl.destroy
  end

  # Assign the object's id from an ObjectId wrapper.
  def set_object_id(oid)
    @impl.setObjectId(oid.impl)
  end
end
# Console-side view of a managed object.
class ConsoleObject < QmfObject
  attr_reader :current_time, :create_time, :delete_time

  # Takes the same arguments as QmfObject#initialize.
  def initialize(cls, kwargs={})
    super(cls, kwargs)
  end

  # Re-query the broker's console for this object by id and merge the
  # result into this instance.
  #
  # Raises RuntimeError when the object has no broker or when the query
  # does not return exactly one object.
  def update
    raise "No linkage to broker" unless @broker
    fresh = @broker.console.objects(Query.new(:object_id => object_id))
    raise "Expected exactly one update for this object" if fresh.size != 1
    merge_update(fresh[0])
  end

  # Merge the engine state of +new_object+ into this object.
  def merge_update(new_object)
    @impl.merge(new_object.impl)
  end

  # Whether the engine reports this object as deleted.
  def deleted?
    @impl.isDeleted
  end

  # Not implemented; returns nil.
  def key
  end
end
# Wrapper around Qmfengine::ObjectId.
class ObjectId
  attr_reader :impl, :agent_key

  # impl:: engine object id to copy; when omitted a default-constructed
  #        engine id is created.
  def initialize(impl=nil)
    @impl = impl ? Qmfengine::ObjectId.new(impl) : Qmfengine::ObjectId.new
    # "<broker bank>.<agent bank>"
    @agent_key = [@impl.getBrokerBank, @impl.getAgentBank].join('.')
  end

  def object_num_high
    @impl.getObjectNumHi
  end

  def object_num_low
    @impl.getObjectNumLo
  end

  # Equality is delegated to the wrapped engine ids.
  def ==(other)
    @impl == other.impl
  end

  def to_s
    @impl.str
  end
end
# Hash-like view over an engine map value holding method arguments.
#
# Values are converted to native Ruby values once, at construction, and
# cached. Writes update both the cache and the underlying engine map.
# Arguments can also be read with method syntax (args.name).
class Arguments
  attr_reader :map

  # map:: engine map value to wrap
  def initialize(map)
    @map = map
    @by_hash = {}
    @map.keyCount.times do |idx|
      key = @map.key(idx)
      @by_hash[key] = $util.qmf_to_native(@map.byKey(key))
    end
  end

  # Cached native value for +key+ (nil when there is no such argument).
  def [] (key)
    return @by_hash[key]
  end

  # Store +value+ for +key+ in the cache and in the engine map.
  def []= (key, value)
    @by_hash[key] = value
    set(key, value)
  end

  # Yield each (name, native value) pair. Without a block an enumerator is
  # returned instead of raising.
  def each
    return @by_hash.each unless block_given?
    @by_hash.each { |k, v| yield(k, v) }
  end

  # Calls named after an argument return its cached value; anything else
  # goes to the default handler, which raises NoMethodError.
  def method_missing(name, *args)
    key = name.to_s
    return @by_hash[key] if @by_hash.include?(key)
    super
  end

  # Keep respond_to? consistent with the readers method_missing provides.
  def respond_to_missing?(name, include_private = false)
    @by_hash.include?(name.to_s) || super
  end

  # Write native +value+ into the engine value stored under +key+.
  def set(key, value)
    val = @map.byKey(key)
    $util.native_to_qmf(val, value)
  end
end
# Wrapper around the engine's response to a method call.
class MethodResponse
  include StringHelpers

  # impl:: engine method response to copy
  def initialize(impl)
    @impl = Qmfengine::MethodResponse.new(impl)
  end

  # Status reported by the engine for the call.
  def status
    @impl.getStatus
  end

  # Engine value describing the failure, as returned by the engine.
  def exception
    @impl.getException
  end

  # The exception value rendered as a string.
  def text
    ensure_encoding(exception.asString)
  end

  # Output arguments of the call, wrapped in a fresh Arguments object on
  # every call.
  def args
    Arguments.new(@impl.getArgs)
  end

  # Forward unknown calls to the output arguments, so resp.name and
  # resp["name"] read an output argument directly. Arguments are splatted
  # and the block is passed along so the forwarded call receives exactly
  # what the caller supplied.
  def method_missing(name, *extra_args, &block)
    args.__send__(name, *extra_args, &block)
  end
end
##==============================================================================
## QUERY
##==============================================================================
# Wrapper around Qmfengine::Query.
class Query
  attr_reader :impl

  # Build a query from exactly one of the following (checked in this order):
  #
  #   :impl      - existing engine query to copy
  #   :key       - class key
  #   :object_id - ObjectId wrapper
  #   :class     - class name, optionally narrowed with :package
  #                (the package defaults to '')
  #
  # Raises ArgumentError when none of them is given.
  def initialize(kwargs = {})
    if kwargs.include?(:impl)
      @impl = Qmfengine::Query.new(kwargs[:impl])
    elsif kwargs.include?(:key)
      @impl = Qmfengine::Query.new(kwargs[:key])
    elsif kwargs.include?(:object_id)
      @impl = Qmfengine::Query.new(kwargs[:object_id].impl)
    elsif kwargs.include?(:class)
      package = kwargs.include?(:package) ? kwargs[:package] : ''
      @impl = Qmfengine::Query.new(kwargs[:class], package)
    else
      raise ArgumentError, "Invalid arguments, use :key, :object_id or :class[,:package]"
    end
  end

  def package_name
    @impl.getPackage
  end

  def class_name
    @impl.getClass
  end

  # The queried object id wrapped in ObjectId, or nil when the query has
  # none (note: this replaces Ruby's built-in object_id for instances of
  # this class).
  def object_id
    objid = @impl.getObjectId
    return nil if objid.nil?
    ObjectId.new(objid)
  end
end
##==============================================================================
## SCHEMA
##==============================================================================
# Schema description of one method or event argument.
class SchemaArgument
  attr_reader :impl

  # Either wrap an existing engine argument (:impl => ...) or create a new
  # one from +name+ and +typecode+, applying the optional :dir, :unit and
  # :desc settings.
  def initialize(name, typecode, kwargs={})
    if kwargs.include?(:impl)
      @impl = kwargs[:impl]
      return
    end
    @impl = Qmfengine::SchemaArgument.new(name, typecode)
    [[:dir, :setDirection], [:unit, :setUnit], [:desc, :setDesc]].each do |option, setter|
      @impl.__send__(setter, kwargs[option]) if kwargs.include?(option)
    end
  end

  def name
    @impl.getName
  end

  def direction
    @impl.getDirection
  end

  def typecode
    @impl.getType
  end

  def to_s
    name
  end
end
# Schema description of a method together with its arguments.
class SchemaMethod
  attr_reader :impl, :arguments

  # Either wrap an existing engine method (:impl => ...), in which case its
  # arguments are wrapped as SchemaArgument objects, or create a new method
  # called +name+ with an optional :desc.
  def initialize(name, kwargs={})
    if kwargs.include?(:impl)
      @impl = kwargs[:impl]
      @arguments = Array.new(@impl.getArgumentCount) do |idx|
        SchemaArgument.new(nil, nil, :impl => @impl.getArgument(idx))
      end
    else
      @arguments = []
      @impl = Qmfengine::SchemaMethod.new(name)
      @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
    end
  end

  # Append a SchemaArgument to the method.
  def add_argument(arg)
    @arguments << arg
    @impl.addArgument(arg.impl)
  end

  def name
    @impl.getName
  end

  def to_s
    name
  end
end
# Schema description of an object property.
class SchemaProperty
  attr_reader :impl

  # Either wrap an existing engine property (:impl => ...) or create a new
  # one from +name+ and +typecode+, applying the optional :access, :index,
  # :optional, :unit and :desc settings.
  def initialize(name, typecode, kwargs={})
    if kwargs.include?(:impl)
      @impl = kwargs[:impl]
      return
    end
    @impl = Qmfengine::SchemaProperty.new(name, typecode)
    [
      [:access,   :setAccess],
      [:index,    :setIndex],
      [:optional, :setOptional],
      [:unit,     :setUnit],
      [:desc,     :setDesc]
    ].each do |option, setter|
      @impl.__send__(setter, kwargs[option]) if kwargs.include?(option)
    end
  end

  def name
    @impl.getName
  end

  def to_s
    name
  end
end
# Schema description of an object statistic.
class SchemaStatistic
  attr_reader :impl

  # Either wrap an existing engine statistic (:impl => ...) or create a new
  # one from +name+ and +typecode+, applying the optional :unit and :desc
  # settings.
  def initialize(name, typecode, kwargs={})
    if kwargs.include?(:impl)
      @impl = kwargs[:impl]
      return
    end
    @impl = Qmfengine::SchemaStatistic.new(name, typecode)
    [[:unit, :setUnit], [:desc, :setDesc]].each do |option, setter|
      @impl.__send__(setter, kwargs[option]) if kwargs.include?(option)
    end
  end

  def name
    @impl.getName
  end

  def to_s
    name
  end
end
# Wrapper around the engine's schema class key (package name + class name).
class SchemaClassKey
  include StringHelpers
  attr_reader :impl

  # i:: engine class key to copy
  def initialize(i)
    @impl = Qmfengine::SchemaClassKey.new(i)
  end

  def package_name
    @impl.getPackageName
  end

  def class_name
    @impl.getClassName
  end

  # The engine's string rendering of the key, tagged with the default
  # external encoding.
  def to_s
    rendered = @impl.asString
    ensure_encoding(rendered)
  end
end
# Schema description of an object class: its properties, statistics and
# methods.
class SchemaObjectClass
  attr_reader :impl, :properties, :statistics, :methods

  # Either wrap an existing engine class (:impl => ...), in which case its
  # properties, statistics and methods are wrapped too, or create a new
  # empty class called +name+ in +package+.
  def initialize(package, name, kwargs={})
    if kwargs.include?(:impl)
      @impl = kwargs[:impl]
      @properties = Array.new(@impl.getPropertyCount) do |idx|
        SchemaProperty.new(nil, nil, :impl => @impl.getProperty(idx))
      end
      @statistics = Array.new(@impl.getStatisticCount) do |idx|
        SchemaStatistic.new(nil, nil, :impl => @impl.getStatistic(idx))
      end
      @methods = Array.new(@impl.getMethodCount) do |idx|
        SchemaMethod.new(nil, :impl => @impl.getMethod(idx))
      end
    else
      @properties = []
      @statistics = []
      @methods = []
      @impl = Qmfengine::SchemaObjectClass.new(package, name)
    end
  end

  # Append a SchemaProperty to the class.
  def add_property(prop)
    @properties << prop
    @impl.addProperty(prop.impl)
  end

  # Append a SchemaStatistic to the class.
  def add_statistic(stat)
    @statistics << stat
    @impl.addStatistic(stat.impl)
  end

  # Append a SchemaMethod to the class.
  def add_method(meth)
    @methods << meth
    @impl.addMethod(meth.impl)
  end

  # The class key wrapped in a SchemaClassKey.
  def class_key
    SchemaClassKey.new(@impl.getClassKey)
  end

  def package_name
    @impl.getClassKey.getPackageName
  end

  def class_name
    @impl.getClassKey.getClassName
  end
end
# Schema description of an event class and its arguments.
class SchemaEventClass
  attr_reader :impl, :arguments

  # Either wrap an existing engine event class (:impl => ...), in which case
  # its arguments are wrapped as SchemaArgument objects, or create a new one
  # from +package+, +name+ and severity +sev+ with an optional :desc.
  def initialize(package, name, sev, kwargs={})
    if kwargs.include?(:impl)
      @impl = kwargs[:impl]
      @arguments = Array.new(@impl.getArgumentCount) do |idx|
        SchemaArgument.new(nil, nil, :impl => @impl.getArgument(idx))
      end
    else
      @arguments = []
      @impl = Qmfengine::SchemaEventClass.new(package, name, sev)
      @impl.setDesc(kwargs[:desc]) if kwargs.include?(:desc)
    end
  end

  # Append a SchemaArgument to the event class.
  def add_argument(arg)
    @arguments << arg
    @impl.addArgument(arg.impl)
  end

  # Same value as class_name.
  def name
    @impl.getClassKey.getClassName
  end

  # The class key wrapped in a SchemaClassKey.
  def class_key
    SchemaClassKey.new(@impl.getClassKey)
  end

  def package_name
    @impl.getClassKey.getPackageName
  end

  def class_name
    @impl.getClassKey.getClassName
  end
end
##==============================================================================
## CONSOLE
##==============================================================================
# Base class for console callbacks. Every callback is a no-op returning nil;
# subclasses override the ones they care about.
class ConsoleHandler
  def agent_added(agent)
  end

  def agent_deleted(agent)
  end

  def new_package(package)
  end

  def new_class(class_key)
  end

  def object_update(object, hasProps, hasStats)
  end

  def event_received(event)
  end

  def agent_heartbeat(agent, timestamp)
  end

  def method_response(resp)
  end

  def broker_info(broker)
  end
end
# QMF console: owns the engine console object, the list of brokers it is
# attached to, and a background thread that delivers console events to an
# optional ConsoleHandler.
class Console
  include MonitorMixin
  attr_reader :impl

  # handler:: object receiving ConsoleHandler callbacks, or nil for none
  # kwargs::  accepted but not used
  def initialize(handler = nil, kwargs={})
    super()
    @handler = handler
    @impl = Qmfengine::Console.new
    @event = Qmfengine::ConsoleEvent.new
    @broker_list = []
    @cv = new_cond
    @sync_count = nil
    @sync_result = nil
    @select = []
    @bt_count = 0
    @cb_cond = new_cond
    # Started last so the thread sees fully initialized state.
    @cb_thread = Thread.new { run_cb_thread }
  end

  # Attach the console to a Connection and return the new Broker.
  def add_connection(conn)
    broker = Broker.new(self, conn)
    synchronize { @broker_list << broker }
    return broker
  end

  # Shut +broker+ down and forget it.
  def del_connection(broker)
    broker.shutdown
    synchronize { @broker_list.delete(broker) }
  end

  # Names of all packages known to the engine console.
  def packages()
    Array.new(@impl.packageCount) { |idx| @impl.getPackageName(idx) }
  end

  # Schema classes of +package+ whose kind equals +kind+, wrapped as
  # SchemaObjectClass (CLASS_OBJECT) or SchemaEventClass (CLASS_EVENT).
  def classes(package, kind=CLASS_OBJECT)
    clist = []
    @impl.classCount(package).times do |idx|
      key = @impl.getClass(package, idx)
      next unless @impl.getClassKind(key) == kind
      if kind == CLASS_OBJECT
        clist << SchemaObjectClass.new(nil, nil, :impl => @impl.getObjectClass(key))
      elsif kind == CLASS_EVENT
        clist << SchemaEventClass.new(nil, nil, nil, :impl => @impl.getEventClass(key))
      end
    end
    clist
  end

  def bind_package(package)
    @impl.bindPackage(package)
  end

  # Bind by :key, or by :package with an optional :class.
  # Raises ArgumentError when neither :key nor :package is given.
  def bind_class(kwargs = {})
    if kwargs.include?(:key)
      @impl.bindClass(kwargs[:key])
    elsif kwargs.include?(:package)
      package = kwargs[:package]
      if kwargs.include?(:class)
        @impl.bindClass(package, kwargs[:class])
      else
        @impl.bindClass(package)
      end
    else
      raise ArgumentError, "Invalid arguments, use :key or :package[,:class]"
    end
  end

  # Bind by :key, or by :package with an optional :event ("*" when omitted).
  # Raises ArgumentError when neither :key nor :package is given.
  def bind_event(kwargs = {})
    if kwargs.include?(:key)
      @impl.bindEvent(kwargs[:key])
    elsif kwargs.include?(:package)
      package = kwargs[:package]
      if kwargs.include?(:event)
        @impl.bindEvent(package, kwargs[:event])
      else
        @impl.bindEvent(package, "*")
      end
    else
      raise ArgumentError, "Invalid arguments, use :key or :package[,:event]"
    end
  end

  # AgentProxy objects for every agent of +broker+, or of all attached
  # brokers when +broker+ is nil.
  def agents(broker = nil)
    # Work on a snapshot so the shared list is never iterated outside the lock.
    blist = broker ? [broker] : synchronize { @broker_list.dup }
    found = []
    blist.each do |b|
      b.impl.agentCount.times do |idx|
        found << AgentProxy.new(b.impl.getAgent(idx), b)
      end
    end
    found
  end

  # Run a query through the first attached broker and return the matching
  # objects.
  #
  # query::  a Query, or a Hash of Query options merged with +kwargs+
  # kwargs:: :timeout - seconds to wait for the response (default 30)
  #          :agent   - agent to direct the query to
  #          String keys are property filters: an object is dropped when it
  #          has a property of that name with a different value.
  #
  # Raises RuntimeError when no broker is attached or when no response
  # arrives within the timeout. The caller's hashes are not modified.
  def objects(query, kwargs = {})
    timeout = 30
    agent = nil
    kwargs = kwargs.dup
    kwargs.merge!(query) if query.is_a?(Hash)
    timeout = kwargs.delete(:timeout) if kwargs.include?(:timeout)
    agent = kwargs.delete(:agent) if kwargs.include?(:agent)
    query = Query.new(kwargs) if query.is_a?(Hash)
    selectors = []
    kwargs.each do |k, v|
      selectors << [k, v] if k.is_a?(String)
    end

    synchronize do
      @select = selectors
      @sync_count = 1
      @sync_result = []
      broker = @broker_list[0]
      raise "No broker connection available for query" unless broker
      broker.send_query(query.impl, nil, agent)
      # Re-check the predicate after every wake-up and track the deadline
      # here, rather than relying on the condition variable's return value
      # to report a timeout.
      deadline = Time.now + timeout
      while @sync_count != 0
        remaining = deadline - Time.now
        raise "Timed out waiting for response" if remaining <= 0
        @cv.wait(remaining)
      end
      return @sync_result
    end
  end

  # Return one and only one object or nil.
  def object(query, kwargs = {})
    objs = objects(query, kwargs)
    return objs.length == 1 ? objs[0] : nil
  end

  # Return the first of potentially many objects.
  def first_object(query, kwargs = {})
    objs = objects(query, kwargs)
    return objs.length > 0 ? objs[0] : nil
  end

  # Check the object against the current property filters; returns :true
  # on a match and nil otherwise.
  def select_match(object)
    @select.each do |key, value|
      object.properties.each do |prop, propval|
        return nil if key == prop.name && value != propval
      end
    end
    return :true
  end

  # Called with the objects of a completed query: keeps those passing
  # select_match and wakes the caller blocked in #objects.
  def _get_result(list, context)
    synchronize do
      list.each do |item|
        @sync_result << item if select_match(item)
      end
      @sync_count -= 1
      @cv.signal
    end
  end

  # Not implemented.
  def start_sync(query)
  end

  # Not implemented.
  def touch_sync(sync)
  end

  # Not implemented.
  def end_sync(sync)
  end

  # Body of the callback thread: wake on a signal (or after at most one
  # second) and deliver console events until a pass finds none.
  def run_cb_thread
    while true
      synchronize { @cb_cond.wait(1) }
      drained = false
      drained = (do_console_events == 0) until drained
    end
  end

  # Wake the callback thread.
  def start_console_events
    synchronize { @cb_cond.signal }
  end

  # Deliver all queued console events to the handler and return how many
  # were processed. Without a handler the events are just discarded.
  def do_console_events
    count = 0
    while @impl.getEvent(@event)
      count += 1
      begin
        case @event.kind
        when Qmfengine::ConsoleEvent::AGENT_ADDED
          @handler.agent_added(AgentProxy.new(@event.agent, nil)) if @handler
        when Qmfengine::ConsoleEvent::AGENT_DELETED
          @handler.agent_deleted(AgentProxy.new(@event.agent, nil)) if @handler
        when Qmfengine::ConsoleEvent::NEW_PACKAGE
          @handler.new_package(@event.name) if @handler
        when Qmfengine::ConsoleEvent::NEW_CLASS
          @handler.new_class(SchemaClassKey.new(@event.classKey)) if @handler
        when Qmfengine::ConsoleEvent::OBJECT_UPDATE
          @handler.object_update(ConsoleObject.new(nil, :impl => @event.object), @event.hasProps, @event.hasStats) if @handler
        when Qmfengine::ConsoleEvent::EVENT_RECEIVED
          @handler.event_received(QmfEvent.new(nil, :impl => @event.event)) if @handler
        when Qmfengine::ConsoleEvent::AGENT_HEARTBEAT
          @handler.agent_heartbeat(AgentProxy.new(@event.agent, nil), @event.timestamp) if @handler
        when Qmfengine::ConsoleEvent::METHOD_RESPONSE
          # Nothing is delivered for this kind here.
        end
      rescue StandardError => ex
        # A failing handler must not kill the callback thread, but the
        # failure is reported instead of being dropped silently. Only the
        # first two failures print a backtrace to keep the output bounded.
        warn "Exception caught in callback: #{ex}"
        if @bt_count < 2
          warn ex.backtrace
          @bt_count += 1
        end
      end
      @impl.popEvent
    end
    count
  end
end
# Console-side handle for a remote agent.
class AgentProxy
  attr_reader :impl, :broker, :label, :key

  # impl::   engine agent proxy to copy
  # broker:: Broker wrapper the agent was found on (may be nil)
  def initialize(impl, broker)
    @impl = Qmfengine::AgentProxy.new(impl)
    @broker = broker
    @label = @impl.getLabel
    # "<broker bank>.<agent bank>"
    @key = [@impl.getBrokerBank, @impl.getAgentBank].join('.')
  end
end
# Console-side link to one broker: registers an engine broker proxy with the
# console, listens on a Connection, and pumps engine events and outgoing
# messages whenever the connection reports activity.
class Broker < ConnectionHandler
  include MonitorMixin
  attr_reader :impl, :conn, :console, :broker_bank

  # console:: owning Console
  # conn::    Connection to attach to
  def initialize(console, conn)
    super()
    @broker_bank = 1
    @console = console
    @conn = conn
    @session = nil
    @cv = new_cond
    @stable = nil
    @event = Qmfengine::BrokerEvent.new
    @xmtMessage = Qmfengine::Message.new
    @impl = Qmfengine::BrokerProxy.new(@console.impl)
    @console.impl.addConnection(@impl, self)
    @conn.add_conn_handler(self)
    @operational = true
  end

  # Detach from the console and the connection.
  def shutdown()
    @console.impl.delConnection(@impl)
    @conn.del_conn_handler(self)
    # A real boolean: the symbol :false is truthy.
    @operational = false
  end

  # Block until the engine has reported the broker link as stable.
  #
  # timeout:: seconds to wait, or nil to wait indefinitely
  #
  # Raises RuntimeError when +timeout+ elapses first.
  def wait_for_stable(timeout = nil)
    synchronize do
      return if @stable
      if timeout
        # Re-check the predicate after every wake-up and track the deadline
        # here, rather than relying on the condition variable's return value
        # to report a timeout.
        deadline = Time.now + timeout
        until @stable
          remaining = deadline - Time.now
          raise "Timed out waiting for broker connection to become stable" if remaining <= 0
          @cv.wait(remaining)
        end
      else
        @cv.wait until @stable
      end
    end
  end

  # Hand a query to the engine (optionally directed at +agent+) and wake
  # the connection thread.
  def send_query(query, ctx, agent)
    agent_impl = agent.impl if agent
    @impl.sendQuery(query, ctx, agent_impl)
    @conn.kick
  end

  # Process all queued engine events for this broker; returns how many were
  # handled.
  def do_broker_events()
    count = 0
    while @impl.getEvent(@event)
      count += 1
      case @event.kind
      when Qmfengine::BrokerEvent::BROKER_INFO
        # Nothing to do for this kind.
      when Qmfengine::BrokerEvent::DECLARE_QUEUE
        @conn.impl.declareQueue(@session.handle, @event.name)
      when Qmfengine::BrokerEvent::DELETE_QUEUE
        @conn.impl.deleteQueue(@session.handle, @event.name)
      when Qmfengine::BrokerEvent::BIND
        @conn.impl.bind(@session.handle, @event.exchange, @event.name, @event.bindingKey)
      when Qmfengine::BrokerEvent::UNBIND
        @conn.impl.unbind(@session.handle, @event.exchange, @event.name, @event.bindingKey)
      when Qmfengine::BrokerEvent::SETUP_COMPLETE
        @impl.startProtocol
      when Qmfengine::BrokerEvent::STABLE
        synchronize do
          @stable = true
          @cv.signal
        end
      when Qmfengine::BrokerEvent::QUERY_COMPLETE
        response = @event.queryResponse
        result = []
        response.getObjectCount.times do |idx|
          result << ConsoleObject.new(nil, :impl => @event.queryResponse.getObject(idx), :broker => self)
        end
        @console._get_result(result, @event.context)
      when Qmfengine::BrokerEvent::METHOD_RESPONSE
        # The event context is the object whose method was invoked.
        obj = @event.context
        obj._method_result(MethodResponse.new(@event.methodResponse))
      end
      @impl.popEvent
    end
    count
  end

  # Send all queued outgoing messages on the session; returns how many were
  # sent.
  def do_broker_messages()
    count = 0
    while @impl.getXmtMessage(@xmtMessage)
      count += 1
      @conn.impl.sendMessage(@session.handle, @xmtMessage)
      @impl.popXmt
    end
    count
  end

  # Keep waking the console and pumping events and messages until a full
  # pass produces neither.
  def do_events()
    while true
      @console.start_console_events
      bcnt = do_broker_events
      mcnt = do_broker_messages
      break if bcnt == 0 and mcnt == 0
    end
  end

  # Connection came up: open a session named after host and pid.
  def conn_event_connected()
    @session = Session.new(@conn, "qmfc-%s.%d" % [Socket.gethostname, Process::pid], self)
    @impl.sessionOpened(@session.handle)
    do_events
  end

  def conn_event_disconnected(error)
  end

  def conn_event_visit
    do_events
  end

  def sess_event_session_closed(context, error)
    @impl.sessionClosed()
  end

  def sess_event_recv(context, message)
    @impl.handleRcvMessage(message)
    do_events
  end
end
##==============================================================================
## AGENT
##==============================================================================
# Base class for agent callbacks. Both callbacks are no-ops returning nil;
# subclasses override them.
class AgentHandler
  # Called when a query arrives for the agent.
  def get_query(context, query, userId)
  end

  # Called when a method call arrives for one of the agent's objects.
  def method_call(context, name, object_id, args, userId)
  end
end
# QMF agent: wraps the engine agent, listens on a Connection, and forwards
# incoming queries and method calls to an AgentHandler.
class Agent < ConnectionHandler
  # handler:: object receiving AgentHandler callbacks
  # label::   agent label; when empty, "rb-<hostname>.<pid>" is used
  def initialize(handler, label="")
    @agentLabel = label == "" ? "rb-%s.%d" % [Socket.gethostname, Process::pid] : label
    @conn = nil
    @handler = handler
    @impl = Qmfengine::Agent.new(@agentLabel)
    @event = Qmfengine::AgentEvent.new
    @xmtMessage = Qmfengine::Message.new
  end

  # Attach the agent to +conn+ and register for its events.
  def set_connection(conn)
    @conn = conn
    @conn.add_conn_handler(self)
  end

  # Register a schema class with the engine.
  def register_class(cls)
    @impl.registerClass(cls.impl)
  end

  # Allocate an object id from the engine and wrap it in ObjectId.
  def alloc_object_id(low = 0, high = 0)
    ObjectId.new(@impl.allocObjectId(low, high))
  end

  def raise_event(event)
    @impl.raiseEvent(event.impl)
  end

  # Send one object as part of the answer to the query identified by
  # +context+.
  def query_response(context, object)
    @impl.queryResponse(context, object.impl)
  end

  # Mark the query identified by +context+ as fully answered.
  def query_complete(context)
    @impl.queryComplete(context)
  end

  # Answer the method call identified by +context+; +arguments+ is an
  # Arguments wrapper whose engine map is sent.
  def method_response(context, status, text, arguments)
    @impl.methodResponse(context, status, text, arguments.map)
  end

  # Process all queued engine events; returns how many were handled.
  def do_agent_events()
    handled = 0
    while @impl.getEvent(@event)
      handled += 1
      case @event.kind
      when Qmfengine::AgentEvent::GET_QUERY
        @handler.get_query(@event.sequence, Query.new(:impl => @event.query), @event.authUserId)
      when Qmfengine::AgentEvent::START_SYNC
        # Nothing to do for this kind.
      when Qmfengine::AgentEvent::END_SYNC
        # Nothing to do for this kind.
      when Qmfengine::AgentEvent::METHOD_CALL
        args = Arguments.new(@event.arguments)
        @handler.method_call(@event.sequence, @event.name, ObjectId.new(@event.objectId),
                             args, @event.authUserId)
      when Qmfengine::AgentEvent::DECLARE_QUEUE
        @conn.impl.declareQueue(@session.handle, @event.name)
      when Qmfengine::AgentEvent::DELETE_QUEUE
        @conn.impl.deleteQueue(@session.handle, @event.name)
      when Qmfengine::AgentEvent::BIND
        @conn.impl.bind(@session.handle, @event.exchange, @event.name, @event.bindingKey)
      when Qmfengine::AgentEvent::UNBIND
        @conn.impl.unbind(@session.handle, @event.exchange, @event.name, @event.bindingKey)
      when Qmfengine::AgentEvent::SETUP_COMPLETE
        @impl.startProtocol()
      end
      @impl.popEvent
    end
    handled
  end

  # Send all queued outgoing messages on the session; returns how many were
  # sent.
  def do_agent_messages()
    sent = 0
    while @impl.getXmtMessage(@xmtMessage)
      sent += 1
      @conn.impl.sendMessage(@session.handle, @xmtMessage)
      @impl.popXmt
    end
    sent
  end

  # Keep pumping events and messages until a full pass produces neither.
  def do_events()
    while true
      ecnt = do_agent_events
      mcnt = do_agent_messages
      break if ecnt == 0 and mcnt == 0
    end
  end

  # Connection came up: open a session named after host and pid.
  def conn_event_connected()
    @session = Session.new(@conn, "qmfa-%s.%d" % [Socket.gethostname, Process::pid], self)
    @impl.newSession
    do_events
  end

  def conn_event_disconnected(error)
  end

  def conn_event_visit
    do_events
  end

  def sess_event_session_closed(context, error)
  end

  def sess_event_recv(context, message)
    @impl.handleRcvMessage(message)
    do_events
  end
end
end