blob: ff3c7db46e351d5522b695f6d25c01d78ef927fa [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import getopt
import sys
import locale
from qmf.console import Session
# ---------------------------------------------------------------------------
# Command-line option state.
#
# Each name below holds the default for one command-line option.  The option
# loop in the main program overwrites them, and the BrokerManager Add*
# methods read them as module globals when building declare requests.
# ---------------------------------------------------------------------------

# -b / --bindings: list bindings (the *ListRecurse methods) instead of the
# plain exchange/queue tables.
_recursive = False
# -a / --broker-addr: broker address handed to BrokerManager.SetBroker.
_host = "localhost"
# --durable: passed as durable= when declaring a queue or an exchange.
_durable = False
# --cluster-durable: adds the CLUSTER_DURABLE argument to a queue declare.
_clusterDurable = False
# --file-count / --file-size: journal settings; only sent with a queue
# declare when _durable is set.
_fileCount = 8
_fileSize = 24
# --max-queue-size / --max-queue-count / --policy-type: queue limit settings;
# each is sent only when it has a truthy value.
_maxQueueSize = None
_maxQueueCount = None
_policyType = None
# --last-value-queue: adds the LVQ argument to a queue declare.
_lvq = False
# --sequence / --ive: add the MSG_SEQUENCE / IVE arguments to an exchange
# declare.
_msgSequence = False
_ive = False

# Keys used in the "arguments" map of queue/exchange declare requests, and
# looked up again in the "arguments" of listed queues and exchanges.
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
MAX_QUEUE_SIZE = "qpid.max_size"
MAX_QUEUE_COUNT = "qpid.max_count"
POLICY_TYPE = "qpid.policy_type"
CLUSTER_DURABLE = "qpid.persist_last_node"
LVQ = "qpid.last_value_queue"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
def Usage():
    """Print the command-line help text to stdout and exit with status 1.

    This function never returns: it always ends by calling sys.exit(1).
    """
    # One entry per output line; an empty string produces a blank line.
    helpLines = (
        "Usage: qpid-config [OPTIONS]",
        " qpid-config [OPTIONS] exchanges [filter-string]",
        " qpid-config [OPTIONS] queues [filter-string]",
        " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]",
        " qpid-config [OPTIONS] del exchange <name>",
        " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]",
        " qpid-config [OPTIONS] del queue <name>",
        " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]",
        " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]",
        "",
        "Options:",
        " -b [ --bindings ] Show bindings in queue or exchange list",
        " -a [ --broker-addr ] Address (localhost) Address of qpidd broker",
        " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]",
        " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost",
        "",
        "Add Queue Options:",
        " --durable Queue is durable",
        " --cluster-durable Queue becomes durable if there is only one functioning cluster node",
        " --file-count N (8) Number of files in queue's persistence journal",
        " --file-size N (24) File size in pages (64Kib/page)",
        " --max-queue-size N Maximum in-memory queue size as bytes",
        " --max-queue-count N Maximum in-memory queue size as a number of messages",
        " --policy-type TYPE Action taken when queue limit is reached (reject, flow_to_disk, ring, ring_strict)",
        " --last-value-queue Enable LVQ behavior on the queue",
        "",
        "Add Exchange Options:",
        " --durable Exchange is durable",
        " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header",
        " with a value that increments for each message forwarded.",
        " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference",
        " to the last message forwarded and enqueuing that message to newly bound",
        " queues.",
        "",
    )
    for text in helpLines:
        # Single-argument print(...) emits the same bytes on Python 2 and 3.
        print(text)
    sys.exit(1)
class BrokerManager:
    """Wrapper around one QMF console session used to inspect and configure a broker.

    Listing methods query QMF objects through the broker agent found by
    SetBroker; the Add*/Del*/Bind/Unbind methods issue commands on the
    broker's AMQP session.  The option globals (_durable, _fileCount, ...)
    are read directly from module scope.
    """

    def __init__(self):
        self.brokerName = None
        # Every attribute is created here so that using the manager before
        # (or after a failed) SetBroker gives a clear error rather than an
        # AttributeError on the instance.
        self.url = None
        self.qmf = None
        self.broker = None
        self.brokerAgent = None

    def SetBroker(self, brokerUrl):
        """Open a QMF session to brokerUrl and remember the agent whose bank is 0."""
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl)
        for agent in self.qmf.getAgents():
            # No early exit: if several agents report bank 0 the last one wins.
            if agent.getAgentBank() == 0:
                self.brokerAgent = agent

    def Disconnect(self):
        """Remove the broker from the QMF session, if one was added."""
        if self.broker:
            self.qmf.delBroker(self.broker)

    def _getObjects(self, className):
        """Return all QMF objects of class className reported by the broker agent.

        Raises Exception when no broker agent is known.
        """
        if self.brokerAgent is None:
            raise Exception("No broker agent available (not connected to a broker?)")
        return self.qmf.getObjects(_class=className, _agent=self.brokerAgent)

    def _isEnabled(self, args, key):
        """Return True when the arguments map carries key with the value 1."""
        return key in args and args[key] == 1

    def Overview(self):
        """Print exchange counts per type and durable/non-durable queue counts."""
        exchanges = self._getObjects("exchange")
        queues = self._getObjects("queue")
        print("Total Exchanges: %d" % len(exchanges))
        typeCounts = {}
        for ex in exchanges:
            typeCounts[ex.type] = typeCounts.get(ex.type, 0) + 1
        for typ in typeCounts:
            print("%15s: %d" % (typ, typeCounts[typ]))
        print("")
        print(" Total Queues: %d" % len(queues))
        durableCount = len([q for q in queues if q.durable])
        print(" durable: %d" % durableCount)
        print(" non-durable: %d" % (len(queues) - durableCount))

    def ExchangeList(self, filter):
        """Print a table of the exchanges whose name contains filter.

        Each row shows the type, the name and the options that would
        recreate the exchange.
        """
        exchanges = self._getObjects("exchange")
        caption1 = "Type "
        caption2 = "Exchange Name"
        matched = [ex for ex in exchanges if self.match(ex.name, filter)]
        maxNameLen = len(caption2)
        for ex in matched:
            maxNameLen = max(maxNameLen, len(ex.name))
        print("%s%-*s Attributes" % (caption1, maxNameLen, caption2))
        # Floor division keeps the repeat count an int on every Python version.
        print("=====" * ((maxNameLen + len(caption1)) // 5 + 5))
        for ex in matched:
            # Fields are joined with one space, which is what consecutive
            # trailing-comma print statements would emit.
            row = ["%-10s%-*s " % (ex.type, maxNameLen, ex.name)]
            args = ex.arguments
            if ex.durable:
                row.append("--durable")
            if self._isEnabled(args, MSG_SEQUENCE):
                row.append("--sequence")
            if self._isEnabled(args, IVE):
                row.append("--ive")
            print(" ".join(row))

    def ExchangeListRecurse(self, filter):
        """Print each exchange whose name contains filter, followed by its bindings."""
        exchanges = self._getObjects("exchange")
        bindings = self._getObjects("binding")
        queues = self._getObjects("queue")
        for ex in exchanges:
            if not self.match(ex.name, filter):
                continue
            print("Exchange '%s' (%s)" % (ex.name, ex.type))
            exchangeId = ex.getObjectId()
            for bind in bindings:
                if bind.exchangeRef == exchangeId:
                    qname = "<unknown>"
                    queue = self.findById(queues, bind.queueRef)
                    if queue is not None:
                        qname = queue.name
                    print(" bind [%s] => %s" % (bind.bindingKey, qname))

    def QueueList(self, filter):
        """Print a table of the queues whose name contains filter.

        Each row shows the name and the queue's attributes, mostly in the
        form of the options that would recreate it.
        """
        queues = self._getObjects("queue")
        caption = "Queue Name"
        matched = [q for q in queues if self.match(q.name, filter)]
        maxNameLen = len(caption)
        for q in matched:
            maxNameLen = max(maxNameLen, len(q.name))
        print("%-*s Attributes" % (maxNameLen, caption))
        print("=====" * (maxNameLen // 5 + 5))
        for q in matched:
            row = ["%-*s " % (maxNameLen, q.name)]
            args = q.arguments
            if q.durable:
                row.append("--durable")
            if self._isEnabled(args, CLUSTER_DURABLE):
                row.append("--cluster-durable")
            if q.autoDelete:
                row.append("auto-del")
            if q.exclusive:
                row.append("excl")
            if FILESIZE in args:
                row.append("--file-size=%d" % args[FILESIZE])
            if FILECOUNT in args:
                row.append("--file-count=%d" % args[FILECOUNT])
            if MAX_QUEUE_SIZE in args:
                row.append("--max-queue-size=%d" % args[MAX_QUEUE_SIZE])
            if MAX_QUEUE_COUNT in args:
                row.append("--max-queue-count=%d" % args[MAX_QUEUE_COUNT])
            if POLICY_TYPE in args:
                row.append("--policy-type=%s" % args[POLICY_TYPE])
            if self._isEnabled(args, LVQ):
                row.append("--last-value-queue")
            print(" ".join(row))

    def QueueListRecurse(self, filter):
        """Print each queue whose name contains filter, followed by its bindings."""
        exchanges = self._getObjects("exchange")
        bindings = self._getObjects("binding")
        queues = self._getObjects("queue")
        for queue in queues:
            if not self.match(queue.name, filter):
                continue
            print("Queue '%s'" % queue.name)
            queueId = queue.getObjectId()
            for bind in bindings:
                if bind.queueRef == queueId:
                    ename = "<unknown>"
                    ex = self.findById(exchanges, bind.exchangeRef)
                    if ex is not None:
                        ename = ex.name
                        # An exchange with an empty name is shown as ''.
                        if ename == "":
                            ename = "''"
                    print(" bind [%s] => %s" % (bind.bindingKey, ename))

    def AddExchange(self, args):
        """Declare an exchange; args is [type, name].  Too few args prints usage and exits."""
        if len(args) < 2:
            Usage()
        etype, ename = args[0], args[1]
        declArgs = {}
        if _msgSequence:
            declArgs[MSG_SEQUENCE] = 1
        if _ive:
            declArgs[IVE] = 1
        self.broker.getAmqpSession().exchange_declare(
            exchange=ename, type=etype, durable=_durable, arguments=declArgs)

    def DelExchange(self, args):
        """Delete the exchange named args[0].  Missing name prints usage and exits."""
        if len(args) < 1:
            Usage()
        self.broker.getAmqpSession().exchange_delete(exchange=args[0])

    def AddQueue(self, args):
        """Declare the queue named args[0] using the module-level queue options.

        Missing name prints usage and exits.
        """
        if len(args) < 1:
            Usage()
        qname = args[0]
        declArgs = {}
        # Journal settings are only sent for durable queues.
        if _durable:
            declArgs[FILECOUNT] = _fileCount
            declArgs[FILESIZE] = _fileSize
        if _maxQueueSize:
            declArgs[MAX_QUEUE_SIZE] = _maxQueueSize
        if _maxQueueCount:
            declArgs[MAX_QUEUE_COUNT] = _maxQueueCount
        if _policyType:
            declArgs[POLICY_TYPE] = _policyType
        if _clusterDurable:
            declArgs[CLUSTER_DURABLE] = 1
        if _lvq:
            declArgs[LVQ] = 1
        self.broker.getAmqpSession().queue_declare(
            queue=qname, durable=_durable, arguments=declArgs)

    def DelQueue(self, args):
        """Delete the queue named args[0].  Missing name prints usage and exits."""
        if len(args) < 1:
            Usage()
        self.broker.getAmqpSession().queue_delete(queue=args[0])

    def _bindingArgs(self, args):
        """Split [exchange, queue, optional key] into (exchange, queue, key).

        The key defaults to "".  Fewer than two items prints usage and exits.
        """
        if len(args) < 2:
            Usage()
        key = ""
        if len(args) > 2:
            key = args[2]
        return args[0], args[1], key

    def Bind(self, args):
        """Bind a queue to an exchange; args is [exchange, queue, optional key]."""
        ename, qname, key = self._bindingArgs(args)
        self.broker.getAmqpSession().exchange_bind(
            queue=qname, exchange=ename, binding_key=key)

    def Unbind(self, args):
        """Remove a binding; args is [exchange, queue, optional key]."""
        ename, qname, key = self._bindingArgs(args)
        self.broker.getAmqpSession().exchange_unbind(
            queue=qname, exchange=ename, binding_key=key)

    def findById(self, items, id):
        """Return the first item whose getObjectId() equals id, or None."""
        for item in items:
            if item.getObjectId() == id:
                return item
        return None

    def match(self, name, filter):
        """Return True when filter is empty or occurs as a substring of name."""
        return filter == "" or name.find(filter) != -1
def YN(bool):
    """Return 'Y' for a truthy argument and 'N' for a falsy one."""
    # The parameter name shadows the builtin but is kept for callers.
    if not bool:
        return 'N'
    return 'Y'
##
## Main Program
##
# Parse the command line.  Only getopt's own error is treated as a usage
# problem, so KeyboardInterrupt/SystemExit are no longer swallowed here.
try:
    longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
                "file-size=", "max-queue-size=", "max-queue-count=", "policy-type=",
                "last-value-queue", "sequence", "ive")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "a:b", longOpts)
except getopt.GetoptError:
    Usage()

# Best effort: decode positional arguments using the locale's preferred
# encoding, falling back to the raw arguments if that is not possible.
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

# Transfer option values into the module-level settings read by BrokerManager.
for (optName, optValue) in optlist:
    try:
        if optName in ("-b", "--bindings"):
            _recursive = True
        elif optName in ("-a", "--broker-addr"):
            _host = optValue
        elif optName == "--durable":
            _durable = True
        elif optName == "--cluster-durable":
            _clusterDurable = True
        elif optName == "--file-count":
            _fileCount = int(optValue)
        elif optName == "--file-size":
            _fileSize = int(optValue)
        elif optName == "--max-queue-size":
            _maxQueueSize = int(optValue)
        elif optName == "--max-queue-count":
            _maxQueueCount = int(optValue)
        elif optName == "--policy-type":
            _policyType = optValue
        elif optName == "--last-value-queue":
            _lvq = True
        elif optName == "--sequence":
            _msgSequence = True
        elif optName == "--ive":
            _ive = True
    except ValueError:
        # A non-integer value for a numeric option is a usage error, not a
        # reason for a traceback.
        print("Failed: option %s requires an integer value" % optName)
        Usage()

nargs = len(cargs)
bm = BrokerManager()

try:
    bm.SetBroker(_host)
    if nargs == 0:
        bm.Overview()
    else:
        cmd = cargs[0]
        modifier = ""
        if nargs > 1:
            modifier = cargs[1]
        if cmd == "exchanges":
            if _recursive:
                bm.ExchangeListRecurse(modifier)
            else:
                bm.ExchangeList(modifier)
        elif cmd == "queues":
            if _recursive:
                bm.QueueListRecurse(modifier)
            else:
                bm.QueueList(modifier)
        elif cmd == "add":
            if modifier == "exchange":
                bm.AddExchange(cargs[2:])
            elif modifier == "queue":
                bm.AddQueue(cargs[2:])
            else:
                Usage()
        elif cmd == "del":
            if modifier == "exchange":
                bm.DelExchange(cargs[2:])
            elif modifier == "queue":
                bm.DelQueue(cargs[2:])
            else:
                Usage()
        elif cmd == "bind":
            bm.Bind(cargs[1:])
        elif cmd == "unbind":
            bm.Unbind(cargs[1:])
        else:
            Usage()
except KeyboardInterrupt:
    print("")
except Exception:
    # sys.exc_info() fetches the exception in a way that parses on every
    # Python version (neither "except X, e" nor "except X as e" does).
    e = sys.exc_info()[1]
    print("Failed: %s" % (e.args,))
    sys.exit(1)

bm.Disconnect()