Add async pubsub client.

This comes from infrastructure-qbot/pubsub.py #5d653be
diff --git a/asfpy/pubsub.py b/asfpy/pubsub.py
index 6b78cbf..e816c36 100644
--- a/asfpy/pubsub.py
+++ b/asfpy/pubsub.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -14,13 +14,150 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """ PyPubSub listener class. """
+#
+# TYPICAL USAGE:
+#
+#   async for payload in listen(PUBSUB_URL):
+#
+# This will produce a series of payloads, forever.
+#
+# NOTE: this listener is intended for pypubsub, which terminates
+#   payloads with a newline. The old svnpubsub used NUL characters,
+#   so this client will not work with that server.
+#
 
 import requests
 import requests.exceptions
 import json
 import time
 import sys
+import asyncio
+import logging
+
+import aiohttp
+
+
+LOGGER = logging.getLogger(__name__)
+
+# The server sends keepalives every 5 seconds, so we should see
+# activity well within this timeout period.
+DEFAULT_INACTIVITY_TIMEOUT = 11
+### for debug:
+#DEFAULT_INACTIVITY_TIMEOUT = 4.5
+
+
+async def listen(pubsub_url, username=None, password=None, timeout=None):
+    # Yield pubsub payloads forever, reconnecting with backoff on failure.
+
+    if username:
+        auth = aiohttp.BasicAuth(username, password)
+    else:
+        auth = None
+
+    if timeout is None:
+        timeout = DEFAULT_INACTIVITY_TIMEOUT
+    ct = aiohttp.ClientTimeout(sock_read=timeout)
+
+    async with aiohttp.ClientSession(auth=auth, timeout=ct) as session:
+
+        # Retry immediately, and then back it off.
+        delay = 0.0
+
+        while True:
+            LOGGER.debug('Opening new connection...')
+            try:
+                async for payload in _process_connection(session, pubsub_url):
+                    if not payload:
+                        break  # server hit EOF; reconnect immediately
+
+                    # We got a payload, so reset the DELAY.
+                    delay = 0.0
+
+                    yield payload
+
+            except (ConnectionRefusedError,
+                    aiohttp.ClientConnectorError,
+                    aiohttp.ServerTimeoutError,
+                    aiohttp.ClientPayloadError,
+                    ) as e:
+                LOGGER.error(f'Connection failed ({type(e).__name__}: {e})'
+                             f', reconnecting in {delay} seconds')
+                await asyncio.sleep(delay)
+
+                # Back off on the delay. Step it up from 0s, doubling each
+                # time, and top out at 30s retry. Steps: 0, 2, 6, 14, 30.
+                delay = min(30.0, (delay + 1.0) * 2)
+
+
+async def _process_connection(session, pubsub_url):
+    # Connect to pubsub and generate the stream of decoded payloads.
+    async with session.get(pubsub_url) as conn:
+
+        while True:
+
+            # The pubsub server defines stream payloads as:
+            #    ENCODED_JSON(payload)+"\n"
+            #
+            # Due to the encoding, bare newlines will not occur
+            # within the encoded part. Thus, we can read content
+            # until we find a newline.
+            #
+            # Note: this newline is in RAW, but the json loader
+            # ignores it.
+            try:
+                raw = await conn.content.readuntil(b'\n')
+            except ValueError as e:
+                # An overlong payload surfaces from readuntil() as
+                # ValueError ("Chunk too big"). Convert it to an error
+                # the caller's retry loop catches, forcing a reconnect.
+                LOGGER.error(f'Saw "{e}"; re-raising as ClientPayloadError to close/reconnect')
+                raise aiohttp.ClientPayloadError('re-raised from ValueError in readuntil()') from e
+
+            if not raw:
+                # We just hit EOF. Signal the caller, then stop.
+                yield None
+                return
+
+            yield json.loads(raw)
+
+
+def test_listening():
+    logging.basicConfig(level=logging.DEBUG)
+
+    # NOTE: "time" is already imported at module level.
+    start = time.time()
+    count = 0
+
+    # We'll say "now" is when we believe the connection to be alive.
+    last_traffic = time.time()
+
+    async def report_stats():
+        while True:
+            # NOTE: do not set this lower than 60, or at startup,
+            # DURATION will be zero, creating a div-by-zero error.
+            await asyncio.sleep(70)
+
+            duration = int((time.time() - start) / 60)
+            alive_since = int(time.time() - last_traffic)
+            print(f'[{duration}m] {count} events.  {count/duration:.1f}/min'
+                  f'  traffic: {alive_since}s ago')
+
+    async def print_events():
+        nonlocal last_traffic, count
+        async for payload in listen('https://pubsub.apache.org:2070/'):
+            # Any traffic, keepalives included, proves the link is alive.
+            last_traffic = time.time()
+
+            if 'stillalive' not in payload:
+                print(f'PAYLOAD: [{payload.get("pubsub_path", "none")}]'
+                      f'  KEYS: {sorted(payload.keys())}')
+                count += 1
+
+    async def run_test():
+        await asyncio.gather(report_stats(), print_events())
+
+    asyncio.run(run_test())
 
 
 class Listener:
@@ -104,8 +241,5 @@
 
 
 if __name__ == '__main__':
-    # For example:
-    # $ python3 pubsub.py http://pubsub.apache.org:2069/git
-    def print_payload(payload):
-        print('RECEIVED:', payload)
-    listen_forever(print_payload, sys.argv[1], debug=True, raw=True)
+    ### TODO: accept the pubsub URL via sys.argv[1]; it is hard-coded in test_listening()
+    test_listening()