Merge pull request #6 from Humbedooh/master
Merge upstream into infra repo
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 666db83..0b81feb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
# 0.7.2
- Addressed an issue with SQS not updating in real-time, only when backlog is requested.
+- Added a secure topics feature that restricts publishing of certain pubsub topics to users allowed by the ACL.
# 0.7.1
- Use asyncio queues for modifying the list of events pending publishing to avoid potential race conditions.
diff --git a/README.md b/README.md
index bcfd183..076d411 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,7 @@
* [Pushing a private event](#pushing-a-private-event)
* [Retrieving private events](#retrieving-private-events)
* [LDAP-based ACL](#ldap-based-acl)
+ * [Securing certain topics](#securing-certain-topics)
- [Working with Amazon SQS](#working-with-amazon-sqs)
- [Persistent backlogs](#persistent-backlogs)
- [License](#license)
@@ -263,6 +264,21 @@
See `pypubsub.yaml` for an LDAP example.
+### Securing certain topics
+You can secure topics, meaning that only authenticated users who have been explicitly granted
+those topics in the ACL may publish to them. To do so, you will need to edit the `secure_topics`
+list in the `clients` section of your configuration file, for instance:
+
+~~~yaml
+clients:
+ secure_topics:
+ - bread
+ - syrup
+~~~
+The above would lock publishing to the topics `bread` and `syrup` for anyone not specifically allowed
+to use those topics in their ACL segment. Users or LDAP groups can be allowed topics via the
+`topics` directive in their ACL segment. See the `pypubsub_acl.yaml` file for an example.
+
## Working with Amazon SQS
PyPubSub supports AWS SQS for weaving in payloads from their server-less Simple Queue Services.
Multiple queues can be supported and items pushed to SQS will seamlessly appear in the
diff --git a/plugins/sqs.py b/plugins/sqs.py
index c34b9ab..693b435 100644
--- a/plugins/sqs.py
+++ b/plugins/sqs.py
@@ -18,6 +18,7 @@
""" This is the SQS component of PyPubSub """
+import asyncio
import aiobotocore
import botocore.exceptions
import json
@@ -32,61 +33,67 @@
# Assume everything is configured in the client's .aws config
session = aiobotocore.get_session()
queue_name = config.get('queue', '???')
- async with session.create_client('sqs',
- aws_secret_access_key=config.get('secret'),
- aws_access_key_id=config.get('key'),
- region_name=config.get('region', 'default')
- ) as client:
- try:
- response = await client.get_queue_url(QueueName=queue_name)
- except botocore.exceptions.ClientError as err:
- if err.response['Error']['Code'] == \
- 'AWS.SimpleQueueService.NonExistentQueue':
- print(f"SQS item {queue_name} does not exist, bailing!")
- return
- else:
- raise
- queue_url = response['QueueUrl']
- print(f"Connected to SQS {queue_url}, reading stream...")
- while True:
+ while True:
+ async with session.create_client('sqs',
+ aws_secret_access_key=config.get('secret'),
+ aws_access_key_id=config.get('key'),
+ region_name=config.get('region', 'default')
+ ) as client:
try:
- response = await client.receive_message(
- QueueUrl=queue_url,
- WaitTimeSeconds=3,
- )
+ response = await client.get_queue_url(QueueName=queue_name)
+ except botocore.exceptions.ClientError as err:
+ if err.response['Error']['Code'] == \
+ 'AWS.SimpleQueueService.NonExistentQueue':
+ print(f"SQS item {queue_name} does not exist, bailing!")
+ return
+ else:
+ raise
+ queue_url = response['QueueUrl']
+ print(f"Connected to SQS {queue_url}, reading stream...")
+ while True:
+ try:
+ response = await client.receive_message(
+ QueueUrl=queue_url,
+ WaitTimeSeconds=3,
+ )
- if 'Messages' in response:
- for msg in response['Messages']:
- body = msg.get('Body', '{}')
- mid = msg.get('MessageId', '')
- try:
- # If we already logged this one, but couldn't delete - skip payload construction,
- # but do try to remove it again...
- if mid not in ITEMS_SEEN:
- js = json.loads(body)
- path = js.get('pubsub_path', '/') # Default to catch-all pubsub topic
- payload = pypubsub.Payload(path, js)
- server.pending_events.put_nowait(payload)
- backlog_size = server.config.backlog.queue_size
- if backlog_size > 0:
- server.backlog.append(payload)
- except ValueError as e:
- print(f"Could not parse payload from SQS: {e}")
- # Do we delete messages or keep them?
- if config.get('delete'):
+ if 'Messages' in response:
+ for msg in response['Messages']:
+ body = msg.get('Body', '{}')
+ mid = msg.get('MessageId', '')
try:
- await client.delete_message(
- QueueUrl=queue_url,
- ReceiptHandle=msg['ReceiptHandle']
- )
- if mid in ITEMS_SEEN:
- ITEMS_SEEN.remove(mid) # Remove if found and now deleted
- except Exception as e:
+ # If we already logged this one, but couldn't delete - skip payload construction,
+ # but do try to remove it again...
if mid not in ITEMS_SEEN:
- print(f"Could not remove item from SQS, marking as potential later duplicate!")
- print(e)
- ITEMS_SEEN.append(mid)
- else: # dedup nonetheless
- ITEMS_SEEN.append(mid)
- except KeyboardInterrupt:
- break
+ js = json.loads(body)
+ path = js.get('pubsub_path', '/') # Default to catch-all pubsub topic
+ payload = pypubsub.Payload(path, js)
+ server.pending_events.put_nowait(payload)
+ backlog_size = server.config.backlog.queue_size
+ if backlog_size > 0:
+ server.backlog.append(payload)
+ except ValueError as e:
+ print(f"Could not parse payload from SQS: {e}")
+ # Do we delete messages or keep them?
+ if config.get('delete'):
+ try:
+ await client.delete_message(
+ QueueUrl=queue_url,
+ ReceiptHandle=msg['ReceiptHandle']
+ )
+ if mid in ITEMS_SEEN:
+ ITEMS_SEEN.remove(mid) # Remove if found and now deleted
+ except Exception as e:
+ if mid not in ITEMS_SEEN:
+ print(f"Could not remove item from SQS, marking as potential later duplicate!")
+ print(e)
+ ITEMS_SEEN.append(mid)
+ else: # dedup nonetheless
+ ITEMS_SEEN.append(mid)
+ except KeyboardInterrupt:
+ return
+ except botocore.exceptions.ClientError as e:
+ print(f"Could not receive message(s) from SQS, bouncing connection:")
+ print(e)
+ break
+ await asyncio.sleep(10) # Sleep for 10 before bouncing SQS connection so as to not retry too often.
diff --git a/pypubsub.py b/pypubsub.py
index 9f63562..a0c562d 100644
--- a/pypubsub.py
+++ b/pypubsub.py
@@ -67,6 +67,7 @@
backlog: BacklogConfig
payloaders: typing.List[netaddr.ip.IPNetwork]
oldschoolers: typing.List[str]
+ secure_topics: typing.Optional[typing.List[str]]
def __init__(self, yml: dict):
@@ -108,6 +109,9 @@
# Binary backwards compatibility
self.oldschoolers = yml['clients'].get('oldschoolers', [])
+ # Secure topics, if any
+ self.secure_topics = set(yml['clients'].get('secure_topics', []) or [])
+
class Server:
"""Main server class, responsible for handling requests and publishing events """
@@ -158,6 +162,12 @@
'X-Requests': str(self.server.requests_count),
}
+ subscriber = Subscriber(self, request)
+ # Is there a basic auth in this request? If so, set up ACL
+ auth = request.headers.get('Authorization')
+ if auth:
+ await subscriber.parse_acl(auth)
+
# Are we handling a publisher payload request? (PUT/POST)
if request.method in ['PUT', 'POST']:
ip = netaddr.IPAddress(request.remote)
@@ -166,6 +176,16 @@
if ip in network:
allowed = True
break
+ # Check for secure topics
+ payload_topics = set(request.path.split("/"))
+ if any(x in self.config.secure_topics for x in payload_topics):
+ allowed = False
+ # Figure out which secure topics we need permission for:
+ which_secure = [x for x in self.config.secure_topics if x in payload_topics]
+ # Is the user allowed to post to all of these secure topics?
+ if subscriber.secure_topics and all(x in subscriber.secure_topics for x in which_secure):
+ allowed = True
+
if not allowed:
resp = aiohttp.web.Response(headers=headers, status=403, text=PUBSUB_NOT_ALLOWED)
return resp
@@ -200,14 +220,9 @@
# We do not support HTTP 1.0 here...
if request.version.major == 1 and request.version.minor == 0:
return resp
- subscriber = Subscriber(self, resp, request)
-
- # Is there a basic auth in this request? If so, set up ACL
- auth = request.headers.get('Authorization')
- if auth:
- subscriber.acl = await subscriber.parse_acl(auth)
# Subscribe the user before we deal with the potential backlog request and pings
+ subscriber.connection = resp
self.subscribers.append(subscriber)
resp.content_type = PUBSUB_CONTENT_TYPE
try:
@@ -322,11 +337,12 @@
acl: dict
topics: typing.List[typing.List[str]]
- def __init__(self, server: Server, connection: aiohttp.web.StreamResponse, request: aiohttp.web.BaseRequest):
- self.connection = connection
+ def __init__(self, server: Server, request: aiohttp.web.BaseRequest):
+ self.connection: typing.Optional[aiohttp.web.StreamResponse] = None
self.acl = {}
self.server = server
self.lock = asyncio.Lock()
+ self.secure_topics = []
# Set topics subscribed to
self.topics = []
@@ -357,7 +373,8 @@
for k, v in acl.items():
assert isinstance(v, list), f"ACL segment {k} for user {u} is not a list of topics!"
print(f"Client {u} successfully authenticated (and ACL is valid).")
- return acl
+ self.acl = acl
+ self.secure_topics = set(self.server.acl[u].get('topics', []) or [])
elif self.server.config.ldap:
acl = {}
groups = await self.server.config.ldap.get_groups(u,p)
@@ -370,7 +387,8 @@
assert isinstance(topics,
list), f"ACL segment {segment} for user {u} is not a list of topics!"
acl[segment] = topics
- return acl
+ self.acl = acl
+
except binascii.Error as e:
pass # Bad Basic Auth params, bail quietly
except AssertionError as e:
@@ -378,7 +396,7 @@
print(f"ACL configuration error: ACL scheme for {u} contains errors, setting ACL to nothing.")
except Exception as e:
print(f"Basic unknown exception occurred: {e}")
- return {}
+
async def ping(self):
"""Generic ping-back to the client"""
diff --git a/pypubsub.yaml b/pypubsub.yaml
index c9eed69..854b77e 100644
--- a/pypubsub.yaml
+++ b/pypubsub.yaml
@@ -16,6 +16,9 @@
# Oldschoolers denotes clients expecting binary events, such as svnwcsub
oldschoolers:
- svnwcsub
+ # Secure topics are a list of topics that may only be broadcast by certain authenticated users.
+ # Default is nil, which means any topic may be published by anyone in the allowed CIDRs
+ secure_topics: ~
# ldap:
# uri: ldaps://ldap.example.org
# user_dn: uid=%s,ou=people,dc=example,dc=org
diff --git a/pypubsub_acl.yaml b/pypubsub_acl.yaml
index 892823c..c25d668 100644
--- a/pypubsub_acl.yaml
+++ b/pypubsub_acl.yaml
@@ -1,14 +1,22 @@
# ACL for authenticated users
+# In this example, the user testuser (password foobar) has access to two separate pubsub contexts
testuser:
password: foobar
acl:
- # Allow access to events for a private git repository
- foobar:
+ # Allow sample access to events for a private git repository called privaterepo.git
+ # Topics are AND'ed together and create a common denominator for what the client has access to.
+ # Thus, the below grants access to git/privaterepo.git, but not git/otherrepo.git
+ # You can name/title these segments as you see fit.
+ title_of_my_first_access_segment:
- git
- privaterepo.git
# Allow access to events for dev@example.org mailing list
- myemails:
+ allow_email_access_for_foobar:
- email
- example.org
- dev
+ # Explicitly allow this user to publish topics 'foo' and 'bar', even if they are secured.
+ topics:
+ - foo
+ - bar
diff --git a/requirements.txt b/requirements.txt
index 8523d31..7e7654e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,7 +2,7 @@
asyncio>=3.4.3
netaddr>=0.7.19
python-ldap>=3.0.0
-PyYAML~=5.1.2
+PyYAML~=5.4.1
aiobotocore~=1.0.7
botocore~=1.15.32
aiofile~=1.5.2