Merge pull request #6 from Humbedooh/master

Merge upstream into infra repo
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 666db83..0b81feb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 # 0.7.2
 - Addessed an issue with SQS not updating in real-time, only when backlog is requested.
+- Added a secure topics feature that restricts publishing to certain pubsub topics to users granted them in the ACL.
 
 # 0.7.1
 - Use asyncio queues for modifying the list of events pending publishing to avoid potential race conditions.
diff --git a/README.md b/README.md
index bcfd183..076d411 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,7 @@
   * [Pushing a private event](#pushing-a-private-event)
   * [Retrieving private events](#retrieving-private-events)
   * [LDAP-based ACL](#ldap-based-acl)
+  * [Securing certain topics](#securing-certain-topics)
 - [Working with Amazon SQS](#working-with-amazon-sqs)
 - [Persistent backlogs](#persistent-backlogs)
 - [License](#license)
@@ -263,6 +264,21 @@
 
 See `pypubsub.yaml` for an LDAP example.
 
+### Securing certain topics
+You can secure topics, meaning that only authenticated users who have been explicitly granted those
+topics may publish to them. To do so, edit the `secure_topics` list in the `clients` section of
+your configuration file, for instance:
+
+~~~yaml
+clients:
+  secure_topics:
+    - bread
+    - syrup
+~~~
+The above would restrict publishing to the topics `bread` and `syrup` to users who have explicitly been
+granted those topics. Users in the ACL file are granted topics via the `topics` directive in their user entry
+(next to `password` and `acl`); granting topics to LDAP groups is not currently supported. See the `pypubsub_acl.yaml` file for an example.
+
 ## Working with Amazon SQS
 PyPubSub supports AWS SQS for weaving in payloads from their server-less Simple Queue Services.
 Multiple queues can be supported and items pushed to SQS will seamlessly appear in the 
diff --git a/plugins/sqs.py b/plugins/sqs.py
index c34b9ab..693b435 100644
--- a/plugins/sqs.py
+++ b/plugins/sqs.py
@@ -18,6 +18,7 @@
 
 """ This is the SQS component of PyPubSub """
 
+import asyncio
 import aiobotocore
 import botocore.exceptions
 import json
@@ -32,61 +33,67 @@
     # Assume everything is configured in the client's .aws config
     session = aiobotocore.get_session()
     queue_name = config.get('queue', '???')
-    async with session.create_client('sqs',
-                                     aws_secret_access_key=config.get('secret'),
-                                     aws_access_key_id=config.get('key'),
-                                     region_name=config.get('region', 'default')
-                                     ) as client:
-        try:
-            response = await client.get_queue_url(QueueName=queue_name)
-        except botocore.exceptions.ClientError as err:
-            if err.response['Error']['Code'] == \
-                    'AWS.SimpleQueueService.NonExistentQueue':
-                print(f"SQS item {queue_name} does not exist, bailing!")
-                return
-            else:
-                raise
-        queue_url = response['QueueUrl']
-        print(f"Connected to SQS {queue_url}, reading stream...")
-        while True:
+    while True:
+        async with session.create_client('sqs',
+                                         aws_secret_access_key=config.get('secret'),
+                                         aws_access_key_id=config.get('key'),
+                                         region_name=config.get('region', 'default')
+                                         ) as client:
             try:
-                response = await client.receive_message(
-                    QueueUrl=queue_url,
-                    WaitTimeSeconds=3,
-                )
+                response = await client.get_queue_url(QueueName=queue_name)
+            except botocore.exceptions.ClientError as err:
+                if err.response['Error']['Code'] == \
+                        'AWS.SimpleQueueService.NonExistentQueue':
+                    print(f"SQS item {queue_name} does not exist, bailing!")
+                    return
+                else:
+                    raise
+            queue_url = response['QueueUrl']
+            print(f"Connected to SQS {queue_url}, reading stream...")
+            while True:
+                try:
+                    response = await client.receive_message(
+                        QueueUrl=queue_url,
+                        WaitTimeSeconds=3,
+                    )
 
-                if 'Messages' in response:
-                    for msg in response['Messages']:
-                        body = msg.get('Body', '{}')
-                        mid = msg.get('MessageId', '')
-                        try:
-                            # If we already logged this one, but couldn't delete - skip payload construction,
-                            # but do try to remove it again...
-                            if mid not in ITEMS_SEEN:
-                                js = json.loads(body)
-                                path = js.get('pubsub_path', '/')  # Default to catch-all pubsub topic
-                                payload = pypubsub.Payload(path, js)
-                                server.pending_events.put_nowait(payload)
-                                backlog_size = server.config.backlog.queue_size
-                                if backlog_size > 0:
-                                    server.backlog.append(payload)
-                        except ValueError as e:
-                            print(f"Could not parse payload from SQS: {e}")
-                        # Do we delete messages or keep them?
-                        if config.get('delete'):
+                    if 'Messages' in response:
+                        for msg in response['Messages']:
+                            body = msg.get('Body', '{}')
+                            mid = msg.get('MessageId', '')
                             try:
-                                await client.delete_message(
-                                    QueueUrl=queue_url,
-                                    ReceiptHandle=msg['ReceiptHandle']
-                                )
-                                if mid in ITEMS_SEEN:
-                                    ITEMS_SEEN.remove(mid) # Remove if found and now deleted
-                            except Exception as e:
+                                # If we already logged this one, but couldn't delete - skip payload construction,
+                                # but do try to remove it again...
                                 if mid not in ITEMS_SEEN:
-                                    print(f"Could not remove item from SQS, marking as potential later duplicate!")
-                                    print(e)
-                                    ITEMS_SEEN.append(mid)
-                        else:  # dedup nonetheless
-                            ITEMS_SEEN.append(mid)
-            except KeyboardInterrupt:
-                break
+                                    js = json.loads(body)
+                                    path = js.get('pubsub_path', '/')  # Default to catch-all pubsub topic
+                                    payload = pypubsub.Payload(path, js)
+                                    server.pending_events.put_nowait(payload)
+                                    backlog_size = server.config.backlog.queue_size
+                                    if backlog_size > 0:
+                                        server.backlog.append(payload)
+                            except ValueError as e:
+                                print(f"Could not parse payload from SQS: {e}")
+                            # Do we delete messages or keep them?
+                            if config.get('delete'):
+                                try:
+                                    await client.delete_message(
+                                        QueueUrl=queue_url,
+                                        ReceiptHandle=msg['ReceiptHandle']
+                                    )
+                                    if mid in ITEMS_SEEN:
+                                        ITEMS_SEEN.remove(mid) # Remove if found and now deleted
+                                except Exception as e:
+                                    if mid not in ITEMS_SEEN:
+                                        print(f"Could not remove item from SQS, marking as potential later duplicate!")
+                                        print(e)
+                                        ITEMS_SEEN.append(mid)
+                            else:  # dedup nonetheless
+                                ITEMS_SEEN.append(mid)
+                except KeyboardInterrupt:
+                    return
+                except botocore.exceptions.ClientError as e:
+                    print(f"Could not receive message(s) from SQS, bouncing connection:")
+                    print(e)
+                    break
+            await asyncio.sleep(10)  # Sleep for 10 before bouncing SQS connection so as to not retry too often.
diff --git a/pypubsub.py b/pypubsub.py
index 9f63562..a0c562d 100644
--- a/pypubsub.py
+++ b/pypubsub.py
@@ -67,6 +67,7 @@
     backlog: BacklogConfig
     payloaders: typing.List[netaddr.ip.IPNetwork]
     oldschoolers: typing.List[str]
+    secure_topics: typing.Optional[typing.List[str]]
 
     def __init__(self, yml: dict):
 
@@ -108,6 +109,9 @@
         # Binary backwards compatibility
         self.oldschoolers = yml['clients'].get('oldschoolers', [])
 
+        # Secure topics, if any
+        self.secure_topics = set(yml['clients'].get('secure_topics', []) or [])
+
 
 class Server:
     """Main server class, responsible for handling requests and publishing events """
@@ -158,6 +162,12 @@
             'X-Requests': str(self.server.requests_count),
         }
 
+        subscriber = Subscriber(self, request)
+        # Is there a basic auth in this request? If so, set up ACL
+        auth = request.headers.get('Authorization')
+        if auth:
+            await subscriber.parse_acl(auth)
+
         # Are we handling a publisher payload request? (PUT/POST)
         if request.method in ['PUT', 'POST']:
             ip = netaddr.IPAddress(request.remote)
@@ -166,6 +176,16 @@
                 if ip in network:
                     allowed = True
                     break
+            # Check for secure topics
+            payload_topics = set(request.path.split("/"))
+            if any(x in self.config.secure_topics for x in payload_topics):
+                allowed = False
+                # Figure out which secure topics we need permission for:
+                which_secure = [x for x in self.config.secure_topics if x in payload_topics]
+                # Is the user allowed to post to all of these secure topics?
+                if subscriber.secure_topics and all(x in subscriber.secure_topics for x in which_secure):
+                    allowed = True
+
             if not allowed:
                 resp = aiohttp.web.Response(headers=headers, status=403, text=PUBSUB_NOT_ALLOWED)
                 return resp
@@ -200,14 +220,9 @@
             # We do not support HTTP 1.0 here...
             if request.version.major == 1 and request.version.minor == 0:
                 return resp
-            subscriber = Subscriber(self, resp, request)
-
-            # Is there a basic auth in this request? If so, set up ACL
-            auth = request.headers.get('Authorization')
-            if auth:
-                subscriber.acl = await subscriber.parse_acl(auth)
 
             # Subscribe the user before we deal with the potential backlog request and pings
+            subscriber.connection = resp
             self.subscribers.append(subscriber)
             resp.content_type = PUBSUB_CONTENT_TYPE
             try:
@@ -322,11 +337,12 @@
     acl:    dict
     topics: typing.List[typing.List[str]]
 
-    def __init__(self, server: Server, connection: aiohttp.web.StreamResponse, request: aiohttp.web.BaseRequest):
-        self.connection = connection
+    def __init__(self, server: Server, request: aiohttp.web.BaseRequest):
+        self.connection: typing.Optional[aiohttp.web.StreamResponse] = None
         self.acl = {}
         self.server = server
         self.lock = asyncio.Lock()
+        self.secure_topics = []
 
         # Set topics subscribed to
         self.topics = []
@@ -357,7 +373,8 @@
                     for k, v in acl.items():
                         assert isinstance(v, list), f"ACL segment {k} for user {u} is not a list of topics!"
                     print(f"Client {u} successfully authenticated (and ACL is valid).")
-                    return acl
+                    self.acl = acl
+                    self.secure_topics = set(self.server.acl[u].get('topics', []) or [])
             elif self.server.config.ldap:
                 acl = {}
                 groups = await self.server.config.ldap.get_groups(u,p)
@@ -370,7 +387,8 @@
                             assert isinstance(topics,
                                               list), f"ACL segment {segment} for user {u} is not a list of topics!"
                             acl[segment] = topics
-                return acl
+                self.acl = acl
+
         except binascii.Error as e:
             pass  # Bad Basic Auth params, bail quietly
         except AssertionError as e:
@@ -378,7 +396,7 @@
             print(f"ACL configuration error: ACL scheme for {u} contains errors, setting ACL to nothing.")
         except Exception as e:
             print(f"Basic unknown exception occurred: {e}")
-        return {}
+        
 
     async def ping(self):
         """Generic ping-back to the client"""
diff --git a/pypubsub.yaml b/pypubsub.yaml
index c9eed69..854b77e 100644
--- a/pypubsub.yaml
+++ b/pypubsub.yaml
@@ -16,6 +16,9 @@
   # Oldschoolers denotes clients expecting binary events, such as svnwcsub
   oldschoolers:
     - svnwcsub
+  # Secure topics are a list of topics that only specifically authorized, authenticated users may publish to.
+  # Default is nil (no secure topics), meaning anyone within the allowed CIDRs may publish to any topic.
+  secure_topics: ~
 #  ldap:
 #    uri: ldaps://ldap.example.org
 #    user_dn: uid=%s,ou=people,dc=example,dc=org
diff --git a/pypubsub_acl.yaml b/pypubsub_acl.yaml
index 892823c..c25d668 100644
--- a/pypubsub_acl.yaml
+++ b/pypubsub_acl.yaml
@@ -1,14 +1,22 @@
 # ACL for authenticated users
 
+# In this example, user testuser with password foobar has access to two separate pubsub contexts.
 testuser:
   password: foobar
   acl:
-    # Allow access to events for a private git repository
-    foobar:
+    # Sample access to events for a private git repository called privaterepo.git.
+    # Topics within a segment are AND'ed together, narrowing down what the client has access to.
+    # Thus, the segment below grants access to git/privaterepo.git, but not to git/otherrepo.git.
+    # You can name these segments as you see fit.
+    title_of_my_first_access_segment:
       - git
       - privaterepo.git
     # Allow access to events for dev@example.org mailing list
-    myemails:
+    allow_email_access_for_foobar:
       - email
       - example.org
       - dev
+  # Explicitly allow this user to publish to topics 'foo' and 'bar', even if they are secured.
+  topics:
+    - foo
+    - bar
diff --git a/requirements.txt b/requirements.txt
index 8523d31..7e7654e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,7 +2,7 @@
 asyncio>=3.4.3
 netaddr>=0.7.19
 python-ldap>=3.0.0
-PyYAML~=5.1.2
+PyYAML~=5.4.1
 aiobotocore~=1.0.7
 botocore~=1.15.32
 aiofile~=1.5.2