Add async pubsub client.
This comes from infrastructure-qbot/pubsub.py #5d653be
diff --git a/asfpy/pubsub.py b/asfpy/pubsub.py
index 6b78cbf..e816c36 100644
--- a/asfpy/pubsub.py
+++ b/asfpy/pubsub.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -14,13 +14,150 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
""" PyPubSub listener class. """
+#
+# TYPICAL USAGE:
+#
+# async for payload in listen(PUBSUB_URL):
+#
+# This will produce a series of payloads, forever.
+#
+# NOTE: this listener is intended for pypubsub, which terminates
+# payloads with a newline. The old svnpubsub used NUL characters,
+# so this client will not work with that server.
+#
import requests
import requests.exceptions
import json
import time
import sys
+import asyncio
+import logging
+
+import aiohttp
+
+
+LOGGER = logging.getLogger(__name__)
+
+# Default inactivity timeout, in seconds, used by listen() when the
+# caller does not pass TIMEOUT. It is handed to aiohttp as the
+# socket-read timeout.
+#
+# The server sends keepalives every 5 seconds, so we should see
+# activity well within this timeout period (11s spans two keepalive
+# intervals, plus a second of slack).
+DEFAULT_INACTIVITY_TIMEOUT = 11
+### for debug: presumably chosen to be shorter than the keepalive
+### interval, to force timeouts and exercise the reconnect path.
+#DEFAULT_INACTIVITY_TIMEOUT = 4.5
+
+
+async def listen(pubsub_url, username=None, password=None, timeout=None):
+    """Yield payloads from a pypubsub server, reconnecting as needed.
+
+    This is an async generator; consume it with "async for". Only
+    real payloads are yielded: the None that marks EOF on a
+    connection is consumed here and triggers a reconnect.
+
+    PUBSUB_URL is the URL of the pubsub stream to GET.
+
+    If USERNAME is given, USERNAME and PASSWORD are sent as HTTP Basic
+    credentials; otherwise no authentication is used.
+
+    TIMEOUT is the socket-read inactivity timeout, in seconds. When
+    None, DEFAULT_INACTIVITY_TIMEOUT is used.
+
+    The connection errors named in the except clause below are logged
+    and retried with a backoff. Any other exception propagates to the
+    caller.
+    """
+
+    auth = aiohttp.BasicAuth(username, password) if username else None
+
+    if timeout is None:
+        timeout = DEFAULT_INACTIVITY_TIMEOUT
+    ct = aiohttp.ClientTimeout(sock_read=timeout)
+
+    async with aiohttp.ClientSession(auth=auth, timeout=ct) as session:
+
+        # Retry immediately, and then back it off.
+        delay = 0.0
+
+        ### tbd: look at event loop, to see if it has been halted
+        while True:
+            LOGGER.debug('Opening new connection...')
+            stream = _process_connection(session, pubsub_url)
+            try:
+                async for payload in stream:
+                    if payload is None:
+                        # The connection hit EOF. Do not hand the marker
+                        # to our consumer; drop out and reconnect.
+                        LOGGER.debug(f'Connection hit EOF'
+                                     f', reconnecting in {delay} seconds')
+                        break
+
+                    # We got a payload, so reset the DELAY.
+                    delay = 0.0
+
+                    yield payload
+
+            except (ConnectionRefusedError,
+                    aiohttp.ClientConnectorError,
+                    aiohttp.ServerTimeoutError,
+                    aiohttp.ClientPayloadError,
+                    ) as e:
+                LOGGER.error(f'Connection failed ({type(e).__name__}: {e})'
+                             f', reconnecting in {delay} seconds')
+            finally:
+                # Close the per-connection generator now, rather than
+                # whenever it gets garbage-collected, so the HTTP
+                # response is released before we reconnect (or when
+                # our own consumer stops iterating).
+                await stream.aclose()
+
+            # Whether the connection failed or simply ended, wait out
+            # the current DELAY so a server that keeps dropping us is
+            # not hammered with reconnects.
+            await asyncio.sleep(delay)
+
+            # Back off on the delay. Step it up from 0s, doubling each
+            # time, and top out at 30s retry. Steps: 0, 2, 6, 14, 30.
+            delay = min(30.0, (delay + 1.0) * 2)
+
+
+async def _process_connection(session, pubsub_url):
+    """Open one connection to PUBSUB_URL and yield its payloads.
+
+    Each newline-terminated chunk read from the response is decoded
+    as JSON and yielded. At EOF, None is yielded once and the
+    generator finishes.
+
+    A ValueError from readuntil() is re-raised as
+    aiohttp.ClientPayloadError, which listen() treats as a reason to
+    close the connection and reconnect.
+    """
+    # Connect to pubsub and listen for payloads.
+    async with session.get(pubsub_url) as conn:
+
+        #print('LIMITS:', conn.content.get_read_buffer_limits())
+
+        while True:
+
+            # The pubsub server defines stream payloads as:
+            #   ENCODED_JSON(payload)+"\n"
+            #
+            # Due to the encoding, bare newlines will not occur
+            # within the encoded part. Thus, we can read content
+            # until we find a newline.
+            #
+            # Note: this newline is in RAW, but the json loader
+            # ignores it.
+            try:
+                raw = await conn.content.readuntil(b'\n')
+            except ValueError as e:
+                # Presumably raised when a chunk outgrows the reader's
+                # buffer limit ("Chunk too big"); the stream position
+                # is no longer trustworthy, so give up on this
+                # connection.
+                LOGGER.error(f'Saw "{e}"; re-raising as ClientPayloadError to close/reconnect')
+                raise aiohttp.ClientPayloadError('re-raised from ValueError in readuntil()') from e
+
+            if not raw:
+                # We just hit EOF. Signal it, then stop: there is
+                # nothing left to read, and feeding the empty RAW to
+                # the json loader would raise.
+                yield None
+                return
+
+            yield json.loads(raw)
+
+
+def test_listening(pubsub_url='https://pubsub.apache.org:2070/'):
+    """Manual smoke test: print events arriving from PUBSUB_URL.
+
+    Enables DEBUG logging, then runs two coroutines side by side:
+    one prints a line for every payload that is not a keepalive, and
+    one prints running statistics every 70 seconds. Both loop
+    forever, so this only returns by way of an exception (such as
+    KeyboardInterrupt).
+
+    Uses the module-level "time" import.
+    """
+    logging.basicConfig(level=logging.DEBUG)
+
+    start = time.time()
+    count = 0
+
+    # We'll say "now" is when we believe the connection to be alive.
+    last_traffic = time.time()
+
+    async def report_stats():
+        while True:
+            # NOTE: do not set this lower than 60, or at startup,
+            # DURATION will be zero, creating a div-by-zero error.
+            await asyncio.sleep(70)
+
+            duration = int((time.time() - start) / 60)
+            alive_since = int(time.time() - last_traffic)
+            print(f'[{duration}m] {count} events.  {count/duration:.1f}/min'
+                  f'  traffic: {alive_since}s ago')
+
+    async def print_events():
+        nonlocal count, last_traffic
+
+        async for payload in listen(pubsub_url):
+            if payload is None:
+                # An EOF marker rather than an event: it is not
+                # traffic, and it has no keys to inspect.
+                continue
+
+            last_traffic = time.time()
+
+            if 'stillalive' not in payload:
+                print(f'PAYLOAD: [{payload.get("pubsub_path", "none")}]'
+                      f'  KEYS: {sorted(payload.keys())}')
+                count += 1
+
+    async def run_test():
+        await asyncio.gather(report_stats(), print_events())
+
+    asyncio.run(run_test())
class Listener:
@@ -104,8 +241,5 @@
if __name__ == '__main__':
- # For example:
- # $ python3 pubsub.py http://pubsub.apache.org:2069/git
- def print_payload(payload):
- print('RECEIVED:', payload)
- listen_forever(print_payload, sys.argv[1], debug=True, raw=True)
+ ### pass PUBSUB_URL ?
+ test_listening()