Merge pull request #3 from Humbedooh/master

Merge 0.6.3 from upstream
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1978038..a7ebfdd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+
+# 0.6.3
+- Fixed an issue with payload delivery stalling due to client pipe timeouts
+
+# 0.6.2
+- Fixed a configuration issue with SQS storage
+
 # 0.6.1
 - Added FS-backed persistant backlog storage for persisting backlog through restarts
 - Addressed issues with aiohttp pipe writes not being coroutine-safe
diff --git a/README.md b/README.md
index 21b88b0..fbd27e2 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@
   * [Listening for events via cURL](#listening-for-events-via-curl)
   * [Listening for events via Python](#listening-for-events-via-python)
   * [Listening for events via node.js](#listening-for-events-via-nodejs)
+  * [Listening for events via Ruby](#listening-for-events-via-ruby)
   * [Accessing older payloads via the backlog catalogue](#accessing-older-payloads-via-the-backlog-catalogue)
 - [Access-Control-List and private events](#access-control-list-and-private-events)
   * [Pushing a private event](#pushing-a-private-event)
@@ -128,6 +129,53 @@
 pps.attach(process);
 ~~~
 
+### Listening for events via Ruby
+Likewise, using Ruby is a pretty straightforward case:
+
+~~~ruby
+require 'net/http'
+require 'json'
+require 'thread'
+
+pubsub_URL = 'http://localhost:2069/'
+
+def do_stuff_with(event)
+  print("Got a pubsub event!:\n")
+  print(event)
+end
+
+def listen(url)
+  ps_thread = Thread.new do
+    begin
+      uri = URI.parse(url)
+      Net::HTTP.start(uri.host, uri.port) do |http|
+        request = Net::HTTP::Get.new uri.request_uri
+        http.request request do |response|
+          response.read_body do |chunk|
+            event = JSON.parse(chunk)
+            if event['stillalive']  # pingback
+              print("ping? PONG!\n")
+            else
+              do_stuff_with(event)
+            end
+          end
+        end
+      end
+    end
+  end
+  return ps_thread
+end
+
+begin
+  ps_thread = listen(pubsub_URL)
+  print("Pubsub thread started, waiting for results...\n")
+  while ps_thread.alive?
+    sleep 10
+  end
+  print("Pubsub thread died :(\n")
+end
+~~~
+
 ### Accessing older payloads via the backlog catalogue
 If configured, via the `payload_backlog_size` setting in the main configuration, clients can 
 request payloads that were pushed before they subscribed, using an `X-Fetch-Since` request 
diff --git a/pypubsub.py b/pypubsub.py
index b6e8941..f530b1c 100644
--- a/pypubsub.py
+++ b/pypubsub.py
@@ -33,7 +33,7 @@
 import plugins.sqs
 
 # Some consts
-PUBSUB_VERSION = '0.6.1'
+PUBSUB_VERSION = '0.6.3'
 PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
 PUBSUB_DEFAULT_PORT = 2069
 PUBSUB_DEFAULT_IP = '0.0.0.0'
@@ -46,7 +46,7 @@
 PUBSUB_NOT_ALLOWED = "You are not authorized to deliver payloads!\n"
 PUBSUB_BAD_PAYLOAD = "Bad payload type. Payloads must be JSON dictionary objects, {..}!\n"
 PUBSUB_PAYLOAD_TOO_LARGE = "Payload is too large for me to serve, please make it shorter.\n"
-
+PUBSUB_WRITE_TIMEOUT = 0.35  # If we can't deliver to a pipe within N seconds, drop it.
 
 class Configuration:
     def __init__(self, yml):
@@ -116,9 +116,12 @@
                 # Cull subscribers we couldn't deliver payload to.
                 for bad_sub in bad_subs:
                     print("Culling %r due to connection errors" % bad_sub)
-                    self.subscribers.remove(bad_sub)
+                    try:
+                        self.subscribers.remove(bad_sub)
+                    except ValueError:  # Already removed elsewhere
+                        pass
             self.pending_events = []
-            await asyncio.sleep(0.5)
+            await asyncio.sleep(0.1)
 
     async def handle_request(self, request):
         """Generic handler for all incoming HTTP requests"""
@@ -353,7 +356,7 @@
         if self.old_school:
             js += b"\0"
         async with self.lock:
-            await self.connection.write(js)
+            await asyncio.wait_for(self.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
 
 
 class Payload:
@@ -394,10 +397,10 @@
                 try:
                     if sub.old_school:
                         async with sub.lock:
-                            await sub.connection.write(ojs)
+                            await asyncio.wait_for(sub.connection.write(ojs), timeout=PUBSUB_WRITE_TIMEOUT)
                     else:
                         async with sub.lock:
-                            await sub.connection.write(js)
+                            await asyncio.wait_for(sub.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
                 except Exception:
                     bad_subs.append(sub)
         return bad_subs
diff --git a/requirements.txt b/requirements.txt
index e2e5bc3..c40df5d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,5 +4,5 @@
 python-ldap>=3.0.0
 PyYAML~=5.1.2
 aiobotocore~=1.0.4
-botocore~=1.16.4
-aiofile~=1.5.2
\ No newline at end of file
+botocore
+aiofile~=1.5.2