Merge pull request #7 from Humbedooh/master

Merge upstream 0.7.3 into local master
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b81feb..0e258ab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,11 @@
+# 0.7.3
+- Added support for reloading the ACL configuration with SIGUSR2 (#2).
+- Added support for resuming a stream with a sequence cursor.
+
 # 0.7.2
-- Addessed an issue with SQS not updating in real-time, only when backlog is requested.
+- Addressed an issue with SQS not updating in real-time, only when backlog is requested.
 - Added secure topics feature for locking publishing of certain pubsub topics to the ACL. 
+- Minor stability improvements.
 
 # 0.7.1
 - Use asyncio queues for modifying the list of events pending publishing to avoid potential race conditions.
diff --git a/README.md b/README.md
index 076d411..331d71e 100644
--- a/README.md
+++ b/README.md
@@ -87,7 +87,8 @@
   "text": "Apples are delicious",
   "pubsub_topics": ["fruits", "apples"],
   "pubsub_path": "/fruits/apples",
-  "pubsub_timestamp": 1588293679.5432327
+  "pubsub_timestamp": 1588293679.5432327,
+  "pubsub_cursor": "f02b4908-755f-4455-a215-d1627f190110"
 }
 ~~~
 
@@ -207,6 +208,17 @@
 than the timestamp presented by the client requesting a backlog), they will be
 delivered to the client, assuming they are younger than the backlog maximum age requirement. 
 
+### Accessing older payloads with a sequence cursor
+As of `0.7.3`, payloads can also be replayed from a sequence cursor. Send the `pubsub_cursor` 
+value of the last event you received in the `X-Fetch-Since-Cursor` request header, and all 
+events pertaining to your desired topics that were published _after_ that event will be 
+played back:
+
+~~~shell
+curl -H 'X-Fetch-Since-Cursor: f02b4908-755f-4455-a215-d1627f190110' http://localhost:2069/
+~~~
+
+
 *It is worth noting here*, for pseudo security reasons, that if the backlog maximum is set 
 sufficiently low (or the age requirement is omitted), this feature could be used to deduce 
 whether or not private events have happened, as a client can request everything in the backlog 
diff --git a/pypubsub.py b/pypubsub.py
index a0c562d..49a5e8f 100644
--- a/pypubsub.py
+++ b/pypubsub.py
@@ -32,9 +32,11 @@
 import plugins.ldap
 import plugins.sqs
 import typing
+import signal
+import uuid
 
 # Some consts
-PUBSUB_VERSION = '0.7.2'
+PUBSUB_VERSION = '0.7.3'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_DEFAULT_PORT = 2069
 PUBSUB_DEFAULT_IP = '0.0.0.0'
@@ -130,11 +132,17 @@
         self.pending_events = asyncio.Queue()
         self.backlog = []
         self.last_ping = time.time()
+        self.acl_file = args.acl
         self.acl = {}
+        self.load_acl()
+
+    def load_acl(self):
+        """Loads ACL from file"""
         try:
-            self.acl = yaml.safe_load(open(args.acl))
+            self.acl = yaml.safe_load(open(self.acl_file))
+            print(f"Loaded ACL from {self.acl_file}")
         except FileNotFoundError:
-            print(f"ACL configuration file {args.acl} not found, private events will not be broadcast.")
+            print(f"ACL configuration file {self.acl_file} not found, private events will not be broadcast.")
 
     async def poll(self):
         """Polls for new stuff to publish, and if found, publishes to whomever wants it."""
@@ -230,8 +238,9 @@
                 await resp.prepare(request)
 
                 # Is the client requesting a backlog of items?
-                backlog = request.headers.get('X-Fetch-Since')
-                if backlog:
+                epoch_based_backlog = request.headers.get('X-Fetch-Since')
+                cursor_based_backlog = request.headers.get('X-Fetch-Since-Cursor')
+                if epoch_based_backlog:  # epoch-based backlog search
                     try:
                         backlog_ts = int(backlog)
                     except ValueError:  # Default to 0 if we can't parse the epoch
@@ -243,6 +252,15 @@
                     for item in self.backlog:
                         if item.timestamp >= backlog_ts:
                             await item.publish([subscriber])
+                
+                if cursor_based_backlog and len(cursor_based_backlog) == 36:  # UUID4 cursor-based backlog search
+                    # For each item, publish to client if it was published after this cursor
+                    is_after_cursor = False
+                    for item in self.backlog:
+                        if item.cursor == cursor_based_backlog:  # Found cursor, mark it!
+                            is_after_cursor = True
+                        elif is_after_cursor:  # This is after the cursor, stream it
+                            await item.publish([subscriber])
 
                 while True:
                     await subscriber.ping()
@@ -325,6 +343,11 @@
 
     def run(self):
         loop = asyncio.get_event_loop()
+        # add a signal handler for SIGUSR2 to reload the ACL from disk
+        try:
+            loop.add_signal_handler(signal.SIGUSR2, self.load_acl)
+        except ValueError:
+            pass
         try:
             loop.run_until_complete(self.server_loop(loop))
         except KeyboardInterrupt:
@@ -415,15 +438,18 @@
         self.timestamp = timestamp or time.time()
         self.topics = [x for x in path.split('/') if x]
         self.private = False
+        self.cursor = str(uuid.uuid4())  # Event cursor for playback - UUID4 style
 
         # Private payload?
         if self.topics and self.topics[0] == 'private':
             self.private = True
             del self.topics[0]  # Remove the private bit from topics now.
 
+        # Set standard pubsub meta data in the payload
         self.json['pubsub_timestamp'] = self.timestamp
         self.json['pubsub_topics'] = self.topics
         self.json['pubsub_path'] = path
+        self.json['pubsub_cursor'] = self.cursor
 
     async def publish(self, subscribers: typing.List[Subscriber]):
         """Publishes an object to all subscribers using those topics (or a sub-set thereof)"""
diff --git a/requirements.txt b/requirements.txt
index 7e7654e..adb54a0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,4 @@
 PyYAML~=5.4.1
 aiobotocore~=1.0.7
 botocore~=1.15.32
-aiofile~=1.5.2
+aiofile~=3.8.5