Merge pull request #1 from Humbedooh/master

Pull in 0.6.1 changes from humbedooh/pypubsub
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9cbd9d6..1978038 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 # 0.6.1
 - Added FS-backed persistant backlog storage for persisting backlog through restarts
+- Addressed issues with aiohttp pipe writes not being coroutine-safe
 
 # 0.6.0
 - Reworked configuration structure
diff --git a/pypubsub.py b/pypubsub.py
index 4c2d475..b6e8941 100644
--- a/pypubsub.py
+++ b/pypubsub.py
@@ -244,7 +244,7 @@
             await asyncio.sleep(10)
 
     def read_backlog_storage(self):
-        if os.path.exists(self.config.backlog.storage):
+        if self.config.backlog.storage and os.path.exists(self.config.backlog.storage):
             try:
                 readlines = 0
                 with open(self.config.backlog.storage, 'r') as fp:
@@ -296,6 +296,7 @@
         self.connection = connection
         self.acl = {}
         self.server = server
+        self.lock = asyncio.Lock()
 
         # Set topics subscribed to
         self.topics = [x for x in request.path.split('/') if x]
@@ -351,7 +352,8 @@
         js = b"%s\n" % json.dumps({"stillalive": time.time()}).encode('utf-8')
         if self.old_school:
             js += b"\0"
-        await self.connection.write(js)
+        async with self.lock:
+            await self.connection.write(js)
 
 
 class Payload:
@@ -391,9 +393,11 @@
             if all(el in self.topics for el in sub.topics):
                 try:
                     if sub.old_school:
-                        await sub.connection.write(ojs)
+                        async with sub.lock:
+                            await sub.connection.write(ojs)
                     else:
-                        await sub.connection.write(js)
+                        async with sub.lock:
+                            await sub.connection.write(js)
                 except Exception:
                     bad_subs.append(sub)
         return bad_subs