blob: c46a9aea402375492c049b4009d2e910ce45113a [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function, unicode_literals
import signal, optparse
from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton import Array, Data, symbol, UNDESCRIBED
from cproton import pn_sasl_config_path
class AuthService(MessagingHandler):
def __init__(self, address):
super(AuthService, self).__init__()
self.address = address
self.permissions = {}
self.allow('admin', '*', ['send', 'recv'])
self.allow('guest', 'foo', ['send', 'recv'])
self.listener = None
def allow(self, user, address, permissions):
if not self.permissions.get(user):
self.permissions[user] = {}
self.permissions[user][address] = Array(UNDESCRIBED, Data.STRING, *permissions)
def on_start(self, event):
self.listener = event.container.listen(self.address)
def stop(self):
if self.listener:
self.listener.close()
def on_connection_opening(self, event):
if self.permissions.get(event.transport.user):
event.connection.properties = {
symbol('authenticated-identity'): "%s" % event.transport.user,
symbol('address-authz'): self.permissions[event.transport.user]
}
else:
event.connection.properties = {
symbol('authenticated-identity'): "%s" % event.transport.user,
symbol('address-authz'): {}
}
parser = optparse.OptionParser(usage="usage: %prog [options]",
description="test authentication and authorization service")
parser.add_option("-a", "--address", default="localhost:55671",
help="address to listen on (default %default)")
parser.add_option("-c", "--config", help="sasl config path")
opts, args = parser.parse_args()
print('starting')
if opts.config:
pn_sasl_config_path(None, opts.config)
print('set sasl config path to %s' % opts.config)
handler = AuthService(opts.address)
def sigterm_handler(_signo, _stack_frame):
#sys.exit(0)
handler.stop()
signal.signal(signal.SIGTERM, sigterm_handler)
Container(handler).run()