Merge pull request #1 from Humbedooh/master
Pull in 0.6.1 changes from humbedooh/pypubsub
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9cbd9d6..1978038 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
# 0.6.1
- Added FS-backed persistent backlog storage for persisting backlog through restarts
+- Addressed issues with aiohttp pipe writes not being coroutine-safe
# 0.6.0
- Reworked configuration structure
diff --git a/pypubsub.py b/pypubsub.py
index 4c2d475..b6e8941 100644
--- a/pypubsub.py
+++ b/pypubsub.py
@@ -244,7 +244,7 @@
await asyncio.sleep(10)
def read_backlog_storage(self):
- if os.path.exists(self.config.backlog.storage):
+ if self.config.backlog.storage and os.path.exists(self.config.backlog.storage):
try:
readlines = 0
with open(self.config.backlog.storage, 'r') as fp:
@@ -296,6 +296,7 @@
self.connection = connection
self.acl = {}
self.server = server
+ self.lock = asyncio.Lock()
# Set topics subscribed to
self.topics = [x for x in request.path.split('/') if x]
@@ -351,7 +352,8 @@
js = b"%s\n" % json.dumps({"stillalive": time.time()}).encode('utf-8')
if self.old_school:
js += b"\0"
- await self.connection.write(js)
+ async with self.lock:
+ await self.connection.write(js)
class Payload:
@@ -391,9 +393,11 @@
if all(el in self.topics for el in sub.topics):
try:
if sub.old_school:
- await sub.connection.write(ojs)
+ async with sub.lock:
+ await sub.connection.write(ojs)
else:
- await sub.connection.write(js)
+ async with sub.lock:
+ await sub.connection.write(js)
except Exception:
bad_subs.append(sub)
return bad_subs