QPID-6567: [Python Client 0-8..0-91] ensure client won't send messages after/before sending flow-ok on suspending/resuming respectively. work by Lorenz Quack <quack.lorenz@gmail.com>
diff --git a/qpid/client.py b/qpid/client.py
index 3bbc097..5fedaa2 100644
--- a/qpid/client.py
+++ b/qpid/client.py
@@ -208,12 +208,12 @@
     ch.closed(msg)
 
   def channel_flow(self, ch, msg):
-    # On resuming we want to minimize the possibility of sending a message before flow-ok has been sent.
+    # On resuming we don't want to send a message before flow-ok has been sent.
     # Therefore, we send flow-ok before we set the flow_control flag.
     if msg.active:
         msg.flow_ok()
     ch.set_flow_control(not msg.active)
-    # On pausing we want to minimize the possibility of sending a message after flow-ok has been sent.
+    # On suspending we don't want to send a message after flow-ok has been sent.
     # Therefore, we send flow-ok after we set the flow_control flag.
     if not msg.active:
         msg.flow_ok()
diff --git a/qpid/peer.py b/qpid/peer.py
index 02986dc..7b1faff 100644
--- a/qpid/peer.py
+++ b/qpid/peer.py
@@ -232,7 +232,7 @@
     self.synchronous = True
 
     self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60)
-    self._flow_control_wc = threading.Condition()
+    self._flow_control_wait_condition = threading.Condition()
     self._flow_control = False
 
   def closed(self, reason):
@@ -347,8 +347,12 @@
       self.futures[cmd_id] = future
 
     if frame.method.klass.name == "basic" and frame.method.name == "publish":
+      self._flow_control_wait_condition.acquire()
       self.check_flow_control()
-    self.write(frame, content)
+      self.write(frame, content)
+      self._flow_control_wait_condition.release()
+    else:
+      self.write(frame, content)
 
     try:
       # here we depend on all nowait fields being named nowait
@@ -392,21 +396,19 @@
 
   # part of flow control for AMQP 0-8, 0-9, and 0-9-1
   def set_flow_control(self, value):
-    self._flow_control_wc.acquire()
+    self._flow_control_wait_condition.acquire()
     self._flow_control = value
     if value == False:
-      self._flow_control_wc.notify()
-    self._flow_control_wc.release()
+      self._flow_control_wait_condition.notify()
+    self._flow_control_wait_condition.release()
 
   # part of flow control for AMQP 0-8, 0-9, and 0-9-1
   def check_flow_control(self):
-    self._flow_control_wc.acquire()
     if self._flow_control:
-      self._flow_control_wc.wait(self._flow_control_wait_failure)
+      self._flow_control_wait_condition.wait(self._flow_control_wait_failure)
     if self._flow_control:
-      self._flow_control_wc.release()
+      self._flow_control_wait_condition.release()
       raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control")
-    self._flow_control_wc.release()
 
   def __getattr__(self, name):
     type = self.spec.method(name)