Merge pull request #3 from Humbedooh/master
Merge 0.6.3 from upstream
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1978038..a7ebfdd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+
+# 0.6.3
+- Fixed an issue with payload delivery stalling due to client pipe timeouts
+
+# 0.6.2
+- Fixed a configuration issue with SQS storage
+
# 0.6.1
- Added FS-backed persistant backlog storage for persisting backlog through restarts
- Addressed issues with aiohttp pipe writes not being coroutine-safe
diff --git a/README.md b/README.md
index 21b88b0..fbd27e2 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@
* [Listening for events via cURL](#listening-for-events-via-curl)
* [Listening for events via Python](#listening-for-events-via-python)
* [Listening for events via node.js](#listening-for-events-via-nodejs)
+ * [Listening for events via Ruby](#listening-for-events-via-ruby)
* [Accessing older payloads via the backlog catalogue](#accessing-older-payloads-via-the-backlog-catalogue)
- [Access-Control-List and private events](#access-control-list-and-private-events)
* [Pushing a private event](#pushing-a-private-event)
@@ -128,6 +129,53 @@
pps.attach(process);
~~~
+### Listening for events via Ruby
+Likewise, using Ruby is a pretty straightforward case:
+
+~~~ruby
+require 'net/http'
+require 'json'
+require 'thread'
+
+pubsub_URL = 'http://localhost:2069/'
+
+def do_stuff_with(event)
+ print("Got a pubsub event!:\n")
+ print(event)
+end
+
+def listen(url)
+ ps_thread = Thread.new do
+ begin
+ uri = URI.parse(url)
+ Net::HTTP.start(uri.host, uri.port) do |http|
+ request = Net::HTTP::Get.new uri.request_uri
+ http.request request do |response|
+ body = ''
+ response.read_body do |chunk|
+ event = JSON.parse(chunk)
+ if event['stillalive'] # pingback
+ print("ping? PONG!\n")
+ else
+ do_stuff_with(event)
+ end
+ end
+ end
+ end
+ end
+ return ps_thread
+end
+
+begin
+ ps_thread = listen(pubsub_URL)
+ print("Pubsub thread started, waiting for results...")
+ while ps_thread.alive?
+ sleep 10
+ end
+ print("Pubsub thread died :(\n")
+end
+~~~
+
### Accessing older payloads via the backlog catalogue
If configured, via the `payload_backlog_size` setting in the main configuration, clients can
request payloads that were pushed before they subscribed, using an `X-Fetch-Since` request
diff --git a/pypubsub.py b/pypubsub.py
index b6e8941..f530b1c 100644
--- a/pypubsub.py
+++ b/pypubsub.py
@@ -33,7 +33,7 @@
import plugins.sqs
# Some consts
-PUBSUB_VERSION = '0.6.1'
+PUBSUB_VERSION = '0.6.3'
PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
PUBSUB_DEFAULT_PORT = 2069
PUBSUB_DEFAULT_IP = '0.0.0.0'
@@ -46,7 +46,7 @@
PUBSUB_NOT_ALLOWED = "You are not authorized to deliver payloads!\n"
PUBSUB_BAD_PAYLOAD = "Bad payload type. Payloads must be JSON dictionary objects, {..}!\n"
PUBSUB_PAYLOAD_TOO_LARGE = "Payload is too large for me to serve, please make it shorter.\n"
-
+PUBSUB_WRITE_TIMEOUT = 0.35 # If we can't deliver to a pipe within N seconds, drop it.
class Configuration:
def __init__(self, yml):
@@ -116,9 +116,12 @@
# Cull subscribers we couldn't deliver payload to.
for bad_sub in bad_subs:
print("Culling %r due to connection errors" % bad_sub)
- self.subscribers.remove(bad_sub)
+ try:
+ self.subscribers.remove(bad_sub)
+ except ValueError: # Already removed elsewhere
+ pass
self.pending_events = []
- await asyncio.sleep(0.5)
+ await asyncio.sleep(0.1)
async def handle_request(self, request):
"""Generic handler for all incoming HTTP requests"""
@@ -353,7 +356,7 @@
if self.old_school:
js += b"\0"
async with self.lock:
- await self.connection.write(js)
+ await asyncio.wait_for(self.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
class Payload:
@@ -394,10 +397,10 @@
try:
if sub.old_school:
async with sub.lock:
- await sub.connection.write(ojs)
+ await asyncio.wait_for(sub.connection.write(ojs), timeout=PUBSUB_WRITE_TIMEOUT)
else:
async with sub.lock:
- await sub.connection.write(js)
+ await asyncio.wait_for(sub.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
except Exception:
bad_subs.append(sub)
return bad_subs
diff --git a/requirements.txt b/requirements.txt
index e2e5bc3..c40df5d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,5 +4,5 @@
python-ldap>=3.0.0
PyYAML~=5.1.2
aiobotocore~=1.0.4
-botocore~=1.16.4
-aiofile~=1.5.2
\ No newline at end of file
+botocore
+aiofile~=1.5.2