blob: 68e7fee912a9535099afba8fd6bc380a4688247b [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import optparse, os, sys, time
from uuid import uuid4
from qpid.messaging import *
from qpid.log import enable, DEBUG, WARN
from common import *
parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
description="reserve a machine")
parser.add_option("-b", "--broker", default="localhost",
help="connect to specified BROKER (default %default)")
parser.add_option("-a", "--address", default="reservations",
help="address for reservation requests")
parser.add_option("-r", "--release", action="store_true",
help="release any machines matching the pattern")
parser.add_option("-s", "--status", action="store_true",
help="list machine status")
parser.add_option("-d", "--discover", action="store_true",
help="use discovery instead of inventory")
parser.add_option("-o", "--owner", default=os.environ["USER"],
help="the holder of the reservation")
parser.add_option("-n", "--number", type=int, default=1,
help="the number of machines to reserve")
parser.add_option("-t", "--timeout", type=float, default=10,
help="timeout in seconds to wait for resources")
parser.add_option("-v", dest="verbose", action="store_true",
help="enable verbose logging")
opts, args = parser.parse_args()
if opts.verbose:
enable("qpid", DEBUG)
else:
enable("qpid", WARN)
if args:
patterns = args
else:
patterns = ["*"]
conn = Connection.establish(opts.broker)
if opts.release:
request_type = "release"
candidate_status = BUSY
candidate_owner = opts.owner
else:
request_type = "reserve"
candidate_status = FREE
candidate_owner = None
class Requester(Dispatcher):
def __init__(self):
self.agents = {}
self.requests = set()
self.outstanding = set()
def agent_status(self, id):
status, owner = self.agents[id]
if owner:
return "%s %s(%s)" % (id, status, owner)
else:
return "%s %s" % (id, status)
def correlation(self, cid):
self.requests.add(cid)
self.outstanding.add(cid)
def ignored(self, msg):
return msg.properties.get("type") not in ("status", "empty") or \
msg.correlation_id not in self.requests
def do_status(self, msg):
id, status, owner = get_status(msg)
self.agents[id] = (status, owner)
if opts.status:
print self.agent_status(id)
def do_empty(self, msg):
print "no matching resources"
def candidates(self, candidate_status, candidate_owner):
for id, (status, owner) in self.agents.items():
if status == candidate_status and owner == candidate_owner:
yield id
def dispatch(self, msg):
result = Dispatcher.dispatch(self, msg)
count = msg.properties.get("count")
sequence = msg.properties.get("sequence")
if count and sequence == count:
self.outstanding.discard(msg.correlation_id)
return result
try:
ssn = conn.session()
rcv = ssn.receiver(opts.address, capacity=10)
snd = ssn.sender(opts.address)
correlation_id = str(uuid4())
if opts.discover:
properties = {"type": "discover", "identity": patterns}
content = None
else:
properties = {"type": "query"}
content = {"identity": patterns}
snd.send(Message(reply_to = opts.address,
correlation_id = correlation_id,
properties = properties,
content = content))
req = Requester()
req.correlation(correlation_id)
start = time.time()
ellapsed = 0
requested = set()
discovering = opts.discover
while ellapsed <= opts.timeout and (discovering or req.outstanding):
try:
msg = rcv.fetch(opts.timeout - ellapsed)
ssn.acknowledge(msg)
except Empty:
continue
finally:
ellapsed = time.time() - start
req.dispatch(msg)
if not opts.status:
if len(requested) < opts.number:
for cid in req.candidates(candidate_status, candidate_owner):
if cid in requested: continue
req_msg = Message(reply_to = opts.address,
correlation_id = str(uuid4()),
properties = {"type": request_type,
"identity": [cid]},
content = {"owner": opts.owner})
if not requested:
print "requesting %s:" % request_type,
print cid,
sys.stdout.flush()
req.correlation(req_msg.correlation_id)
snd.send(req_msg)
requested.add(cid)
else:
discovering = False
if requested:
print
owners = {}
for id in requested:
st, ow = req.agents[id]
if not owners.has_key(ow):
owners[ow] = []
owners[ow].append(id)
keys = list(owners.keys())
keys.sort()
for k in keys:
owners[k].sort()
v = ", ".join(owners[k])
if k is None:
print "free: %s" % v
else:
print "owner %s: %s" % (k, v)
elif req.agents and not opts.status:
print "no available resources"
if req.outstanding:
print "request timed out"
except KeyboardInterrupt:
pass
finally:
conn.close()