blob: e816c36843edebd8e5100bf4583ee4131a1d8389 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" PyPubSub listener class. """
#
# TYPICAL USAGE:
#
# async for payload in listen(PUBSUB_URL):
#
# This will produce a series of payloads, forever.
#
# NOTE: this listener is intended for pypubsub, which terminates
# payloads with a newline. The old svnpubsub used NUL characters,
# so this client will not work with that server.
#
import requests
import requests.exceptions
import json
import time
import sys
import asyncio
import logging
import aiohttp
# Module-level logger for this library; configured by the application.
LOGGER = logging.getLogger(__name__)
# The server sends keepalives every 5 seconds, so we should see
# activity well within this timeout period.
DEFAULT_INACTIVITY_TIMEOUT = 11
### for debug:
#DEFAULT_INACTIVITY_TIMEOUT = 4.5
async def listen(pubsub_url, username=None, password=None, timeout=None):
    """Yield payloads from PUBSUB_URL forever, reconnecting as needed.

    USERNAME/PASSWORD, when given, enable HTTP Basic authentication.
    TIMEOUT is the socket-read inactivity timeout in seconds; it
    defaults to DEFAULT_INACTIVITY_TIMEOUT.
    """
    auth = aiohttp.BasicAuth(username, password) if username else None
    read_timeout = DEFAULT_INACTIVITY_TIMEOUT if timeout is None else timeout
    client_timeout = aiohttp.ClientTimeout(sock_read=read_timeout)
    async with aiohttp.ClientSession(auth=auth, timeout=client_timeout) as session:
        # Reconnect delay: retry immediately the first time, then back off.
        retry_delay = 0.0
        ### tbd: look at event loop, to see if it has been halted
        while True:
            LOGGER.debug('Opening new connection...')
            try:
                async for payload in _process_connection(session, pubsub_url):
                    if not payload:
                        pass ### tbd?: event loop killed or hit EOF
                    # Traffic arrived, so retry immediately on the next failure.
                    retry_delay = 0.0
                    yield payload
            except (ConnectionRefusedError,
                    aiohttp.ClientConnectorError,
                    aiohttp.ServerTimeoutError,
                    aiohttp.ClientPayloadError,
                    ) as e:
                LOGGER.error(f'Connection failed ({type(e).__name__}: {e})'
                             f', reconnecting in {retry_delay} seconds')
                await asyncio.sleep(retry_delay)
                # Back off on the delay. Step it up from 0s, doubling each
                # time, and top out at 30s retry. Steps: 0, 2, 6, 14, 30.
                retry_delay = min(30.0, (retry_delay + 1.0) * 2)
async def _process_connection(session, pubsub_url):
    """Connect to PUBSUB_URL and yield each decoded JSON payload.

    Yields None (once) if the stream hits EOF, then stops. Raises
    aiohttp.ClientPayloadError when a payload exceeds the stream's read
    buffer, so the caller will close this connection and reconnect.
    """
    async with session.get(pubsub_url) as conn:
        #print('LIMITS:', conn.content.get_read_buffer_limits())
        while True:
            # The pubsub server defines stream payloads as:
            #   ENCODED_JSON(payload)+"\n"
            #
            # Due to the encoding, bare newlines will not occur
            # within the encoded part. Thus, we can read content
            # until we find a newline.
            #
            # Note: this newline is in RAW, but the json loader
            # ignores it.
            try:
                # NOTE: readuntil() raises ValueError ("Chunk too big")
                # when a payload exceeds the read buffer limit. The
                # original code raised ValueError unconditionally here,
                # which aborted every read; the raise must come only
                # from readuntil() itself.
                raw = await conn.content.readuntil(b'\n')
            except ValueError as e:
                LOGGER.error(f'Saw "{e}"; re-raising as ClientPayloadError to close/reconnect')
                raise aiohttp.ClientPayloadError('re-raised from ValueError in readuntil()')
            if not raw:
                # We just hit EOF. Signal the caller, then stop: there is
                # nothing left to decode on this (closed) connection.
                yield None
                return
            yield json.loads(raw)
def test_listening():
    """Manual smoke test: subscribe to the ASF pubsub stream and report stats."""
    logging.basicConfig(level=logging.DEBUG)
    import time
    started = time.time()
    count = 0
    # We'll say "now" is when we believe the connection to be alive.
    last_traffic = time.time()

    async def report_stats():
        # Periodically print event counts and time since last traffic.
        while True:
            # NOTE: do not set this lower than 60, or at startup,
            # DURATION will be zero, creating a div-by-zero error.
            await asyncio.sleep(70)
            minutes = int((time.time() - started) / 60)
            quiet_for = int(time.time() - last_traffic)
            print(f'[{minutes}m] {count} events. {count/minutes:.1f}/min'
                  f' traffic: {quiet_for}s ago')

    async def print_events():
        nonlocal count, last_traffic
        async for payload in listen('https://pubsub.apache.org:2070/'):
            last_traffic = time.time()
            if 'stillalive' not in payload:
                print(f'PAYLOAD: [{payload.get("pubsub_path", "none")}]'
                      f' KEYS: {sorted(payload.keys())}')
                count += 1

    async def run_test():
        await asyncio.gather(report_stats(), print_events())

    asyncio.run(run_test())
class Listener:
    """Generic pubsub listener: hands every payload to a caller-supplied function."""

    def __init__(self, url):
        # Pubsub endpoint to subscribe to.
        self.url = url
        # Placeholder for a live connection object (unused until attach()).
        self.connection = None

    def attach(self, func, **kwargs):
        """Block forever, invoking FUNC for each payload from self.url.

        Recognized keyword options: raw (bool), debug (bool),
        since (int epoch, -1 to disable), auth (requests auth object).
        """
        listen_forever(func,
                       self.url,
                       kwargs.get('auth', None),
                       kwargs.get('raw', False),
                       kwargs.get('since', -1),
                       kwargs.get('debug', False))
def listen_forever(func, url, auth=None, raw=False, since=-1, debug=False):
    """Listen on URL forever, calling FUNC for each payload.

    FUNC is called with each decoded payload. Unless RAW is true, a dict
    payload is first narrowed to its 'payload' member. AUTH is passed
    straight to requests.get(). SINCE, when not -1, is sent as an
    X-Fetch-Since header so the server can replay missed payloads
    (presumably an epoch timestamp — confirm against the server).
    DEBUG enables informational messages on stdout.

    This function never returns; it reconnects on any failure.
    """
    while True:
        if debug:
            message("[INFO] Subscribing to stream at %s\n", url, fp=sys.stdout)
        connection = None
        # NOTE(review): requests.Response truthiness reflects the HTTP
        # status (ok -> truthy), so this loop also retries on non-2xx/3xx
        # responses, not just on connection errors.
        while not connection:
            try:
                headers = {
                    'User-Agent': 'python/asfpy'
                }
                if since != -1:
                    # Ask the server to replay payloads we missed.
                    headers['X-Fetch-Since'] = str(since)
                connection = requests.get(url, headers=headers, auth=auth, timeout=30, stream=True)
                if debug:
                    message("[INFO] Subscribed, reading stream\n", fp=sys.stdout)
            except requests.exceptions.RequestException:
                message("[WARNING] Could not connect to pubsub service at %s,"
                        " retrying in 10s...\n", url, sleep=10)
                continue
            if not connection:
                # Got a response, but an unsuccessful status; pause and retry.
                if debug:
                    message("[WARNING] %s did not respond with a streamable connection,"
                            " reconnecting in 10 seconds\n", url, sleep=10)
        try:
            # Accumulate chunks until a payload terminator is seen.
            body = ""
            for chunk in connection.iter_content(chunk_size=None):
                body += chunk.decode('utf-8', errors='ignore')
                # pypubsub/gitpubsub payloads end in \n, svnpubsub payloads end in \0:
                if body[-1] in ["\n", "\x00"]:
                    try:
                        payload = json.loads(body.rstrip("\r\n\x00"))
                    except ValueError as detail:
                        if debug:
                            message("[WARNING] Bad JSON or something: %s\n", detail)
                        # No payload. Circle back around for another.
                        payload = None
                    if not raw and isinstance(payload, dict):
                        payload = payload.get('payload')
                    if payload:
                        # Since we have a valid payload, we do not want to repeat it.
                        # Thus, set `since` to -1 now, so as to not have an x-fetch-since
                        # header on the next retry in case this connection fails at some point.
                        since = -1
                        func(payload)
                    # Start accumulating the next payload.
                    body = ""
        except requests.exceptions.RequestException:
            if debug:
                message("[WARNING] Disconnected from %s, reconnecting\n", url, sleep=2)
            continue
        if debug:
            message("Connection to %s was closed, reconnecting in 10 seconds\n", url, sleep=10)
def message(fmt, *args, sleep=None, fp=sys.stderr):
    """Write FMT %-formatted with ARGS to FP, flush, then optionally pause.

    SLEEP, when truthy, is a number of seconds to sleep after writing
    (used by callers to pace their reconnect attempts).
    """
    text = fmt % args
    fp.write(text)
    fp.flush()
    if sleep:
        time.sleep(sleep)
if __name__ == '__main__':
    # Run the manual smoke test against the public ASF pubsub server.
    ### pass PUBSUB_URL ?
    test_listening()