| |
| <!DOCTYPE html> |
| |
| <!-- Root element: lang declares the page language for assistive technology. --> |
| <html lang="en"> |
| <head> |
| <!-- Document metadata --> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>proton._utils — Qpid Proton Python API 0.32.0 documentation</title> |
| <!-- Stylesheets: theme first, then syntax-highlighting colours --> |
| <link rel="stylesheet" href="../../_static/sphinxdoc.css"> |
| <link rel="stylesheet" href="../../_static/pygments.css"> |
| <!-- Local scripts: loaded synchronously, so they execute in the order listed --> |
| <script id="documentation_options" data-url_root="../../" src="../../_static/documentation_options.js"></script> |
| <script src="../../_static/jquery.js"></script> |
| <script src="../../_static/underscore.js"></script> |
| <script src="../../_static/doctools.js"></script> |
| <script src="../../_static/language_data.js"></script> |
| <!-- MathJax from the CDN: async, so it does not block parsing --> |
| <script async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.7/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <!-- Relations to the general index and the search page --> |
| <link rel="index" title="Index" href="../../genindex.html"> |
| <link rel="search" title="Search" href="../../search.html"> |
| </head><body> |
| <!-- Breadcrumb bar: index shortcut on the right, then the path from the documentation root to this page. --> |
| <div class="related" role="navigation" aria-label="related navigation"> |
| <h3>Navigation</h3> |
| <ul> |
| <li class="right" style="margin-right: 10px"> |
| <!-- Start tag kept on a single line so no line break falls inside the tag. --> |
| <a href="../../genindex.html" title="General Index" accesskey="I">index</a></li> |
| <li class="nav-item nav-item-0"><a href="../../index.html">Qpid Proton Python API 0.32.0 documentation</a> »</li> |
| <li class="nav-item nav-item-1"><a href="../index.html" accesskey="U">Module code</a> »</li> |
| <!-- Empty href links back to this page; aria-current marks it as the current location. --> |
| <li class="nav-item nav-item-this"><a href="" aria-current="page">proton._utils</a></li> |
| </ul> |
| </div> |
| |
| <div class="document"> |
| <div class="documentwrapper"> |
| <div class="bodywrapper"> |
| <div class="body" role="main"> |
| |
| <h1>Source code for proton._utils</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="c1"># or more contributor license agreements. See the NOTICE file</span> |
| <span class="c1"># distributed with this work for additional information</span> |
| <span class="c1"># regarding copyright ownership. The ASF licenses this file</span> |
| <span class="c1"># to you under the Apache License, Version 2.0 (the</span> |
| <span class="c1"># "License"); you may not use this file except in compliance</span> |
| <span class="c1"># with the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing,</span> |
| <span class="c1"># software distributed under the License is distributed on an</span> |
| <span class="c1"># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span> |
| <span class="c1"># KIND, either express or implied. See the License for the</span> |
| <span class="c1"># specific language governing permissions and limitations</span> |
| <span class="c1"># under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">absolute_import</span> |
| |
| <span class="kn">import</span> <span class="nn">collections</span> |
| <span class="kn">import</span> <span class="nn">time</span> |
| <span class="kn">import</span> <span class="nn">threading</span> |
| |
| <span class="kn">from</span> <span class="nn">._exceptions</span> <span class="kn">import</span> <span class="n">ProtonException</span><span class="p">,</span> <span class="n">ConnectionException</span><span class="p">,</span> <span class="n">LinkException</span><span class="p">,</span> <span class="n">Timeout</span> |
| <span class="kn">from</span> <span class="nn">._delivery</span> <span class="kn">import</span> <span class="n">Delivery</span> |
| <span class="kn">from</span> <span class="nn">._endpoints</span> <span class="kn">import</span> <span class="n">Endpoint</span><span class="p">,</span> <span class="n">Link</span> |
| <span class="kn">from</span> <span class="nn">._events</span> <span class="kn">import</span> <span class="n">Handler</span> |
| <span class="kn">from</span> <span class="nn">._url</span> <span class="kn">import</span> <span class="n">Url</span> |
| |
| <span class="kn">from</span> <span class="nn">._reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| <span class="kn">from</span> <span class="nn">._handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span><span class="p">,</span> <span class="n">IncomingMessageHandler</span> |
| |
| |
| <span class="k">class</span> <span class="nc">BlockingLink</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">connection</span><span class="p">,</span> <span class="n">link</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span> <span class="o">=</span> <span class="n">connection</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">link</span> <span class="o">=</span> <span class="n">link</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">wait</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="ow">not</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">state</span> <span class="o">&</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_UNINIT</span><span class="p">),</span> |
| <span class="n">msg</span><span class="o">=</span><span class="s2">"Opening link </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_checkClosed</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">_waitForClose</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="mi">1</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">wait</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">state</span> <span class="o">&</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_CLOSED</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="o">=</span><span class="n">timeout</span><span class="p">,</span> |
| <span class="n">msg</span><span class="o">=</span><span class="s2">"Opening link </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">Timeout</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="k">pass</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_checkClosed</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">_checkClosed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">state</span> <span class="o">&</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_CLOSED</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">closing</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">LinkDetached</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Close the link.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">wait</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="ow">not</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">state</span> <span class="o">&</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_ACTIVE</span><span class="p">),</span> |
| <span class="n">msg</span><span class="o">=</span><span class="s2">"Closing link </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| |
| <span class="c1"># Access to other link attributes.</span> |
| <span class="k">def</span> <span class="fm">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">):</span> |
| <span class="k">return</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="SendException"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.SendException">[docs]</a><span class="k">class</span> <span class="nc">SendException</span><span class="p">(</span><span class="n">ProtonException</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Exception used to indicate an exceptional state/condition on a send request.</span> |
| |
| <span class="sd"> :param state: The delivery state which caused the exception.</span> |
| <span class="sd"> :type state: ``int``</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span></div> |
| |
| |
| <span class="k">def</span> <span class="nf">_is_settled</span><span class="p">(</span><span class="n">delivery</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">delivery</span><span class="o">.</span><span class="n">settled</span> <span class="ow">or</span> <span class="n">delivery</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">snd_settle_mode</span> <span class="o">==</span> <span class="n">Link</span><span class="o">.</span><span class="n">SND_SETTLED</span> |
| |
| |
| <div class="viewcode-block" id="BlockingSender"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingSender">[docs]</a><span class="k">class</span> <span class="nc">BlockingSender</span><span class="p">(</span><span class="n">BlockingLink</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A synchronous sender wrapper. This is typically created by calling</span> |
| <span class="sd"> :meth:`BlockingConnection.create_sender`.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">connection</span><span class="p">,</span> <span class="n">sender</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">BlockingSender</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">connection</span><span class="p">,</span> <span class="n">sender</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">target</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">target</span><span class="o">.</span><span class="n">address</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">target</span><span class="o">.</span><span class="n">address</span> <span class="o">!=</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span><span class="o">.</span><span class="n">address</span><span class="p">:</span> |
| <span class="c1"># this may be followed by a detach, which may contain an error condition, so wait a little...</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_waitForClose</span><span class="p">()</span> |
| <span class="c1"># ...but close ourselves if peer does not</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="k">raise</span> <span class="n">LinkException</span><span class="p">(</span><span class="s2">"Failed to open sender </span><span class="si">%s</span><span class="s2">, target does not match"</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="BlockingSender.send"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingSender.send">[docs]</a> <span class="k">def</span> <span class="nf">send</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">msg</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">error_states</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Blocking send which will return only when the send is complete</span> |
| <span class="sd"> and the message settled.</span> |
| |
| <span class="sd"> :param timeout: Timeout in seconds. If ``False``, the value of ``timeout`` used in the</span> |
| <span class="sd"> constructor of the :class:`BlockingConnection` object used in the constructor will be used.</span> |
| <span class="sd"> If ``None``, there is no timeout. Any other value is treated as a timeout in seconds.</span> |
| <span class="sd"> :type timeout: ``None``, ``False``, ``float``</span> |
| <span class="sd"> :param error_states: List of delivery flags which when present in Delivery object</span> |
| <span class="sd"> will cause a :class:`SendException` exception to be raised. If ``None``, these</span> |
| <span class="sd"> will default to a list containing :const:`proton.Delivery.REJECTED` and :const:`proton.Delivery.RELEASED`.</span> |
| <span class="sd"> :type error_states: ``list``</span> |
| <span class="sd"> :return: Delivery object for this message.</span> |
| <span class="sd"> :rtype: :class:`proton.Delivery`</span> |
| <span class="sd"> """</span> |
| <span class="n">delivery</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">wait</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="n">_is_settled</span><span class="p">(</span><span class="n">delivery</span><span class="p">),</span> <span class="n">msg</span><span class="o">=</span><span class="s2">"Sending on sender </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="o">=</span><span class="n">timeout</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">delivery</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">snd_settle_mode</span> <span class="o">!=</span> <span class="n">Link</span><span class="o">.</span><span class="n">SND_SETTLED</span><span class="p">:</span> |
| <span class="n">delivery</span><span class="o">.</span><span class="n">settle</span><span class="p">()</span> |
| <span class="n">bad</span> <span class="o">=</span> <span class="n">error_states</span> |
| <span class="k">if</span> <span class="n">bad</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">bad</span> <span class="o">=</span> <span class="p">[</span><span class="n">Delivery</span><span class="o">.</span><span class="n">REJECTED</span><span class="p">,</span> <span class="n">Delivery</span><span class="o">.</span><span class="n">RELEASED</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">delivery</span><span class="o">.</span><span class="n">remote_state</span> <span class="ow">in</span> <span class="n">bad</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">SendException</span><span class="p">(</span><span class="n">delivery</span><span class="o">.</span><span class="n">remote_state</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">delivery</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">Fetcher</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A message handler for blocking receivers.</span> |
| |
| <span class="sd"> :param connection:</span> |
| <span class="sd"> :type connection: :class:</span> |
| <span class="sd"> :param prefetch:</span> |
| <span class="sd"> :type prefetch:</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">connection</span><span class="p">,</span> <span class="n">prefetch</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Fetcher</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">prefetch</span><span class="o">=</span><span class="n">prefetch</span><span class="p">,</span> <span class="n">auto_accept</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span> <span class="o">=</span> <span class="n">connection</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">incoming</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">deque</span><span class="p">([])</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">unsettled</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">deque</span><span class="p">([])</span> |
| |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">incoming</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">delivery</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">yield_</span><span class="p">()</span> <span class="c1"># Wake up the wait() loop to handle the message.</span> |
| |
| <span class="k">def</span> <span class="nf">on_link_error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">state</span> <span class="o">&</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">LOCAL_ACTIVE</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">closing</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">LinkDetached</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_connection_error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">closing</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">ConnectionClosed</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="p">)</span> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">has_message</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> The number of messages that have been received and are waiting to be</span> |
| <span class="sd"> retrieved with :meth:`pop`.</span> |
| |
| <span class="sd"> :type: ``int``</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">incoming</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">pop</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Get the next available incoming message. If the message is unsettled, its</span> |
| <span class="sd"> delivery object is moved onto the unsettled queue, and can be settled with</span> |
| <span class="sd"> a call to :meth:`settle`.</span> |
| |
| <span class="sd"> :rtype: :class:`proton.Message`</span> |
| <span class="sd"> """</span> |
| <span class="n">message</span><span class="p">,</span> <span class="n">delivery</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">incoming</span><span class="o">.</span><span class="n">popleft</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">delivery</span><span class="o">.</span><span class="n">settled</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">unsettled</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">delivery</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">message</span> |
| |
| <span class="k">def</span> <span class="nf">settle</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Settle the next message previously taken with :meth:`pop`.</span> |
| |
| <span class="sd"> :param state:</span> |
| <span class="sd"> :type state:</span> |
| <span class="sd"> """</span> |
| <span class="n">delivery</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">unsettled</span><span class="o">.</span><span class="n">popleft</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">state</span><span class="p">:</span> |
| <span class="n">delivery</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">state</span><span class="p">)</span> |
| <span class="n">delivery</span><span class="o">.</span><span class="n">settle</span><span class="p">()</span> |
| |
| |
| <div class="viewcode-block" id="BlockingReceiver"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingReceiver">[docs]</a><span class="k">class</span> <span class="nc">BlockingReceiver</span><span class="p">(</span><span class="n">BlockingLink</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A synchronous receiver wrapper. This is typically created by calling</span> |
| <span class="sd"> :meth:`BlockingConnection.create_receiver`.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">connection</span><span class="p">,</span> <span class="n">receiver</span><span class="p">,</span> <span class="n">fetcher</span><span class="p">,</span> <span class="n">credit</span><span class="o">=</span><span class="mi">1</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">BlockingReceiver</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">connection</span><span class="p">,</span> <span class="n">receiver</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span> <span class="o">!=</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">address</span><span class="p">:</span> |
| <span class="c1"># this may be followed by a detach, which may contain an error condition, so wait a little...</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_waitForClose</span><span class="p">()</span> |
| <span class="c1"># ...but close ourselves if peer does not</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="k">raise</span> <span class="n">LinkException</span><span class="p">(</span><span class="s2">"Failed to open receiver </span><span class="si">%s</span><span class="s2">, source does not match"</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">credit</span><span class="p">:</span> <span class="n">receiver</span><span class="o">.</span><span class="n">flow</span><span class="p">(</span><span class="n">credit</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">fetcher</span> <span class="o">=</span> <span class="n">fetcher</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span> <span class="o">=</span> <span class="n">connection</span><span class="o">.</span><span class="n">container</span> |
| |
| <span class="k">def</span> <span class="fm">__del__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">fetcher</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="c1"># The next line causes a core dump if the Proton-C reactor finalizes</span> |
| <span class="c1"># first. The self.container reference prevents out of order reactor</span> |
| <span class="c1"># finalization. It may not be set if exception in BlockingLink.__init__</span> |
| <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s2">"container"</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">handler</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># implicit call to reactor</span> |
| |
| <div class="viewcode-block" id="BlockingReceiver.receive"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingReceiver.receive">[docs]</a> <span class="k">def</span> <span class="nf">receive</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Blocking receive call which will return only when a message is received or</span> |
| <span class="sd"> a timeout (if supplied) occurs.</span> |
| |
| <span class="sd"> :param timeout: Timeout in seconds. If ``False``, the ``timeout`` value passed to the</span> |
| <span class="sd"> constructor of the :class:`BlockingConnection` object this receiver was created with will be used.</span> |
| <span class="sd"> If ``None``, there is no timeout. Any other value is treated as a timeout in seconds.</span> |
| <span class="sd"> :type timeout: ``None``, ``False``, ``float``</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">fetcher</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">Exception</span><span class="p">(</span><span class="s2">"Can't call receive on this receiver as a handler was not provided"</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">credit</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">flow</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">wait</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">fetcher</span><span class="o">.</span><span class="n">has_message</span><span class="p">,</span> <span class="n">msg</span><span class="o">=</span><span class="s2">"Receiving on receiver </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> |
| <span class="n">timeout</span><span class="o">=</span><span class="n">timeout</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">fetcher</span><span class="o">.</span><span class="n">pop</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="BlockingReceiver.accept"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingReceiver.accept">[docs]</a> <span class="k">def</span> <span class="nf">accept</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Accept and settle the received message. The delivery is set to</span> |
| <span class="sd"> :const:`proton.Delivery.ACCEPTED`.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">settle</span><span class="p">(</span><span class="n">Delivery</span><span class="o">.</span><span class="n">ACCEPTED</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BlockingReceiver.reject"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingReceiver.reject">[docs]</a> <span class="k">def</span> <span class="nf">reject</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Reject the received message. The delivery is set to</span> |
| <span class="sd"> :const:`proton.Delivery.REJECTED`.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">settle</span><span class="p">(</span><span class="n">Delivery</span><span class="o">.</span><span class="n">REJECTED</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BlockingReceiver.release"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingReceiver.release">[docs]</a> <span class="k">def</span> <span class="nf">release</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">delivered</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Release the received message.</span> |
| |
| <span class="sd"> :param delivered: If ``True``, the message delivery is being set to</span> |
| <span class="sd"> :const:`proton.Delivery.MODIFIED`, i.e. being returned to the sender</span> |
| <span class="sd"> and annotated. If ``False``, the message is returned without</span> |
| <span class="sd"> annotations and the delivery set to :const:`proton.Delivery.RELEASED`.</span> |
| <span class="sd"> :type delivered: ``bool``</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">delivered</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">settle</span><span class="p">(</span><span class="n">Delivery</span><span class="o">.</span><span class="n">MODIFIED</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">settle</span><span class="p">(</span><span class="n">Delivery</span><span class="o">.</span><span class="n">RELEASED</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BlockingReceiver.settle"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingReceiver.settle">[docs]</a> <span class="k">def</span> <span class="nf">settle</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Settle any received messages.</span> |
| |
| <span class="sd"> :param state: Update the delivery of all unsettled messages with the</span> |
| <span class="sd"> supplied state, then settle them.</span> |
| <span class="sd"> :type state: ``None`` or a valid delivery state (see</span> |
| <span class="sd"> :class:`proton.Delivery`).</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">fetcher</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">Exception</span><span class="p">(</span><span class="s2">"Can't call accept/reject etc on this receiver as a handler was not provided"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">fetcher</span><span class="o">.</span><span class="n">settle</span><span class="p">(</span><span class="n">state</span><span class="p">)</span></div></div> |
| |
| |
| <div class="viewcode-block" id="LinkDetached"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.LinkDetached">[docs]</a><span class="k">class</span> <span class="nc">LinkDetached</span><span class="p">(</span><span class="n">LinkException</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> The exception raised when the remote peer unexpectedly closes a link in a blocking</span> |
| <span class="sd"> context, or an unexpected link error occurs.</span> |
| |
| <span class="sd"> :param link: The link which closed unexpectedly.</span> |
| <span class="sd"> :type link: :class:`proton.Link`</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">link</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">link</span> <span class="o">=</span> <span class="n">link</span> |
| <span class="k">if</span> <span class="n">link</span><span class="o">.</span><span class="n">is_sender</span><span class="p">:</span> |
| <span class="n">txt</span> <span class="o">=</span> <span class="s2">"sender </span><span class="si">%s</span><span class="s2"> to </span><span class="si">%s</span><span class="s2"> closed"</span> <span class="o">%</span> <span class="p">(</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="n">link</span><span class="o">.</span><span class="n">target</span><span class="o">.</span><span class="n">address</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">txt</span> <span class="o">=</span> <span class="s2">"receiver </span><span class="si">%s</span><span class="s2"> from </span><span class="si">%s</span><span class="s2"> closed"</span> <span class="o">%</span> <span class="p">(</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">link</span><span class="o">.</span><span class="n">remote_condition</span><span class="p">:</span> |
| <span class="n">txt</span> <span class="o">+=</span> <span class="s2">" due to: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">link</span><span class="o">.</span><span class="n">remote_condition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">condition</span> <span class="o">=</span> <span class="n">link</span><span class="o">.</span><span class="n">remote_condition</span><span class="o">.</span><span class="n">name</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">txt</span> <span class="o">+=</span> <span class="s2">" by peer"</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">condition</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">LinkDetached</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">txt</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="ConnectionClosed"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.ConnectionClosed">[docs]</a><span class="k">class</span> <span class="nc">ConnectionClosed</span><span class="p">(</span><span class="n">ConnectionException</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> The exception raised when the remote peer unexpectedly closes a connection in a blocking</span> |
| <span class="sd"> context, or an unexpected connection error occurs.</span> |
| |
| <span class="sd"> :param connection: The connection which closed unexpectedly.</span> |
| <span class="sd"> :type connection: :class:`proton.Connection`</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">connection</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span> <span class="o">=</span> <span class="n">connection</span> |
| <span class="n">txt</span> <span class="o">=</span> <span class="s2">"Connection </span><span class="si">%s</span><span class="s2"> closed"</span> <span class="o">%</span> <span class="n">connection</span><span class="o">.</span><span class="n">hostname</span> |
| <span class="k">if</span> <span class="n">connection</span><span class="o">.</span><span class="n">remote_condition</span><span class="p">:</span> |
| <span class="n">txt</span> <span class="o">+=</span> <span class="s2">" due to: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">connection</span><span class="o">.</span><span class="n">remote_condition</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">condition</span> <span class="o">=</span> <span class="n">connection</span><span class="o">.</span><span class="n">remote_condition</span><span class="o">.</span><span class="n">name</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">txt</span> <span class="o">+=</span> <span class="s2">" by peer"</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">condition</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">ConnectionClosed</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">txt</span><span class="p">)</span></div> |
| |
| |
| <div class="viewcode-block" id="BlockingConnection"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingConnection">[docs]</a><span class="k">class</span> <span class="nc">BlockingConnection</span><span class="p">(</span><span class="n">Handler</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> A synchronous style connection wrapper.</span> |
| |
| <span class="sd"> This object's implementation uses OS resources. To ensure they</span> |
| <span class="sd"> are released when the object is no longer in use, make sure that</span> |
| <span class="sd"> object operations are enclosed in a try block and that close() is</span> |
| <span class="sd"> always executed on exit.</span> |
| |
| <span class="sd"> :param url: Connection URL</span> |
| <span class="sd"> :type url: :class:`proton.Url` or ``str``</span> |
| <span class="sd"> :param timeout: Connection timeout in seconds. If ``None``, defaults to 60 seconds.</span> |
| <span class="sd"> :type timeout: ``None`` or float</span> |
| <span class="sd"> :param container: Container to process the events on the connection. If ``None``,</span> |
| <span class="sd"> a new :class:`proton.Container` will be created.</span> |
| <span class="sd"> :param ssl_domain: SSL configuration, passed through to the container's ``connect()`` call.</span> |
| <span class="sd"> :param heartbeat: A value in seconds indicating the desired frequency of</span> |
| <span class="sd"> heartbeats used to test the underlying socket is alive.</span> |
| <span class="sd"> :type heartbeat: ``float``</span> |
| <span class="sd"> :param kwargs: Container keyword arguments. See :class:`proton.reactor.Container`</span> |
| <span class="sd"> for a list of the valid kwargs.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">container</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">ssl_domain</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">heartbeat</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">disconnected</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">timeout</span> <span class="o">=</span> <span class="n">timeout</span> <span class="ow">or</span> <span class="mi">60</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span> <span class="o">=</span> <span class="n">container</span> <span class="ow">or</span> <span class="n">Container</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">timeout</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">timeout</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">start</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">Url</span><span class="p">(</span><span class="n">url</span><span class="p">)</span><span class="o">.</span><span class="n">defaults</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">conn</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">closing</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="n">failed</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">conn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="n">url</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">,</span> <span class="n">handler</span><span class="o">=</span><span class="bp">self</span><span class="p">,</span> <span class="n">ssl_domain</span><span class="o">=</span><span class="n">ssl_domain</span><span class="p">,</span> <span class="n">reconnect</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">heartbeat</span><span class="o">=</span><span class="n">heartbeat</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">wait</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="ow">not</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="o">.</span><span class="n">state</span> <span class="o">&</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_UNINIT</span><span class="p">),</span> |
| <span class="n">msg</span><span class="o">=</span><span class="s2">"Opening connection"</span><span class="p">)</span> |
| <span class="n">failed</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">failed</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="BlockingConnection.create_sender"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingConnection.create_sender">[docs]</a> <span class="k">def</span> <span class="nf">create_sender</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">address</span><span class="p">,</span> <span class="n">handler</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">options</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Create a blocking sender.</span> |
| |
| <span class="sd"> :param address: Address of target node.</span> |
| <span class="sd"> :type address: ``str``</span> |
| <span class="sd"> :param handler: Event handler for this sender.</span> |
| <span class="sd"> :type handler: Any child class of :class:`proton.Handler`</span> |
| <span class="sd"> :param name: Sender name.</span> |
| <span class="sd"> :type name: ``str``</span> |
| <span class="sd"> :param options: A single option, or a list of sender options</span> |
| <span class="sd"> :type options: :class:`SenderOption` or [SenderOption, SenderOption, ...]</span> |
| <span class="sd"> :return: New blocking sender instance.</span> |
| <span class="sd"> :rtype: :class:`BlockingSender`</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">BlockingSender</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_sender</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="p">,</span> <span class="n">address</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="n">name</span><span class="p">,</span> <span class="n">handler</span><span class="o">=</span><span class="n">handler</span><span class="p">,</span> |
| <span class="n">options</span><span class="o">=</span><span class="n">options</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="BlockingConnection.create_receiver"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingConnection.create_receiver">[docs]</a> <span class="k">def</span> <span class="nf">create_receiver</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">address</span><span class="p">,</span> <span class="n">credit</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">dynamic</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">handler</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">options</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Create a blocking receiver.</span> |
| |
| <span class="sd"> :param address: Address of source node.</span> |
| <span class="sd"> :type address: ``str``</span> |
| <span class="sd"> :param credit: Initial link flow credit. If not set, will default to 1.</span> |
| <span class="sd"> :type credit: ``int``</span> |
| <span class="sd"> :param dynamic: If ``True``, indicates dynamic creation of the receiver.</span> |
| <span class="sd"> :type dynamic: ``bool``</span> |
| <span class="sd"> :param handler: Event handler for this receiver.</span> |
| <span class="sd"> :type handler: Any child class of :class:`proton.Handler`</span> |
| <span class="sd"> :param name: Receiver name.</span> |
| <span class="sd"> :type name: ``str``</span> |
| <span class="sd"> :param options: A single option, or a list of receiver options</span> |
| <span class="sd"> :type options: :class:`ReceiverOption` or [ReceiverOption, ReceiverOption, ...]</span> |
| <span class="sd"> :return: New blocking receiver instance.</span> |
| <span class="sd"> :rtype: :class:`BlockingReceiver`</span> |
| <span class="sd"> """</span> |
| <span class="n">prefetch</span> <span class="o">=</span> <span class="n">credit</span> |
| <span class="k">if</span> <span class="n">handler</span><span class="p">:</span> |
| <span class="n">fetcher</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="n">prefetch</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">prefetch</span> <span class="o">=</span> <span class="mi">1</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">fetcher</span> <span class="o">=</span> <span class="n">Fetcher</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">credit</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">BlockingReceiver</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_receiver</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="p">,</span> <span class="n">address</span><span class="p">,</span> <span class="n">name</span><span class="o">=</span><span class="n">name</span><span class="p">,</span> <span class="n">dynamic</span><span class="o">=</span><span class="n">dynamic</span><span class="p">,</span> <span class="n">handler</span><span class="o">=</span><span class="n">handler</span> <span class="ow">or</span> <span class="n">fetcher</span><span class="p">,</span> |
| <span class="n">options</span><span class="o">=</span><span class="n">options</span><span class="p">),</span> <span class="n">fetcher</span><span class="p">,</span> <span class="n">credit</span><span class="o">=</span><span class="n">prefetch</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BlockingConnection.close"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingConnection.close">[docs]</a> <span class="k">def</span> <span class="nf">close</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Close the connection.</span> |
| <span class="sd"> """</span> |
| <span class="c1"># TODO: provide stronger interrupt protection on cleanup. See PEP 419</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">closing</span><span class="p">:</span> |
| <span class="k">return</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">closing</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">errors</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">wait</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="ow">not</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="o">.</span><span class="n">state</span> <span class="o">&</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_ACTIVE</span><span class="p">),</span> |
| <span class="n">msg</span><span class="o">=</span><span class="s2">"Closing connection"</span><span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="o">.</span><span class="n">free</span><span class="p">()</span> |
| <span class="c1"># Nothing left to block on. Allow reactor to clean up.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">conn</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">global_handler</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># break circular ref: container to cadapter.on_error</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">stop_events</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span> <span class="o">=</span> <span class="kc">None</span></div> |
| |
| <span class="k">def</span> <span class="nf">_is_closed</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="o">.</span><span class="n">state</span> <span class="o">&</span> <span class="p">(</span><span class="n">Endpoint</span><span class="o">.</span><span class="n">LOCAL_CLOSED</span> <span class="o">|</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_CLOSED</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="BlockingConnection.run"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingConnection.run">[docs]</a> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages).</span> |
| <span class="sd"> """</span> |
| <span class="k">while</span> <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">process</span><span class="p">():</span> <span class="k">pass</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">process</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="BlockingConnection.wait"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingConnection.wait">[docs]</a> <span class="k">def</span> <span class="nf">wait</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">condition</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">msg</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Process events until ``condition()`` returns ``True``.</span> |
| |
| <span class="sd"> :param condition: Condition which determines when the wait will end.</span> |
| <span class="sd"> :type condition: Function which returns ``bool``</span> |
| <span class="sd"> :param timeout: Timeout in seconds. If ``False``, the value of ``timeout`` used in the</span> |
| <span class="sd"> constructor of this object will be used. If ``None``, there is no timeout. Any other</span> |
| <span class="sd"> value is treated as a timeout in seconds.</span> |
| <span class="sd"> :type timeout: ``None``, ``False``, ``float``</span> |
| <span class="sd"> :param msg: Context message for :class:`proton.Timeout` exception</span> |
| <span class="sd"> :type msg: ``str``</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">timeout</span> <span class="ow">is</span> <span class="kc">False</span><span class="p">:</span> |
| <span class="n">timeout</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">timeout</span> |
| <span class="k">if</span> <span class="n">timeout</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">while</span> <span class="ow">not</span> <span class="n">condition</span><span class="p">()</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">disconnected</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">process</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">container_timeout</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">timeout</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">timeout</span> <span class="o">=</span> <span class="n">timeout</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">deadline</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">+</span> <span class="n">timeout</span> |
| <span class="k">while</span> <span class="ow">not</span> <span class="n">condition</span><span class="p">()</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">disconnected</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">process</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">deadline</span> <span class="o">&lt;</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">():</span> |
| <span class="n">txt</span> <span class="o">=</span> <span class="s2">"Connection </span><span class="si">%s</span><span class="s2"> timed out"</span> <span class="o">%</span> <span class="bp">self</span><span class="o">.</span><span class="n">url</span> |
| <span class="k">if</span> <span class="n">msg</span><span class="p">:</span> <span class="n">txt</span> <span class="o">+=</span> <span class="s2">": "</span> <span class="o">+</span> <span class="n">msg</span> |
| <span class="k">raise</span> <span class="n">Timeout</span><span class="p">(</span><span class="n">txt</span><span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">timeout</span> <span class="o">=</span> <span class="n">container_timeout</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">disconnected</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">_is_closed</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="o">.</span><span class="n">handler</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># break cyclical reference</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">disconnected</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">_is_closed</span><span class="p">():</span> |
| <span class="k">raise</span> <span class="n">ConnectionException</span><span class="p">(</span> |
| <span class="s2">"Connection </span><span class="si">%s</span><span class="s2"> disconnected: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">disconnected</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="BlockingConnection.on_link_remote_close"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingConnection.on_link_remote_close">[docs]</a> <span class="k">def</span> <span class="nf">on_link_remote_close</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Event callback for when the remote terminus closes.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">state</span> <span class="o">&amp;</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">LOCAL_ACTIVE</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">closing</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">LinkDetached</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BlockingConnection.on_connection_remote_close"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.BlockingConnection.on_connection_remote_close">[docs]</a> <span class="k">def</span> <span class="nf">on_connection_remote_close</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Event callback for when the link peer closes the connection.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">state</span> <span class="o">&amp;</span> <span class="n">Endpoint</span><span class="o">.</span><span class="n">LOCAL_ACTIVE</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">closing</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">ConnectionClosed</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="p">)</span></div> |
| |
| <span class="k">def</span> <span class="nf">on_transport_tail_closed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">on_transport_closed</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_transport_head_closed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">on_transport_closed</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_transport_closed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">disconnected</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">transport</span><span class="o">.</span><span class="n">condition</span> <span class="ow">or</span> <span class="s2">"unknown"</span></div> |
| |
| |
| <span class="k">class</span> <span class="nc">AtomicCount</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="n">step</span><span class="o">=</span><span class="mi">1</span><span class="p">):</span> |
| <span class="sd">"""Thread-safe atomic counter. Start at start, increment by step."""</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">count</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">step</span> <span class="o">=</span> <span class="n">start</span><span class="p">,</span> <span class="n">step</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">lock</span> <span class="o">=</span> <span class="n">threading</span><span class="o">.</span><span class="n">Lock</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">next</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Get the next value"""</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">lock</span><span class="o">.</span><span class="n">acquire</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">count</span> <span class="o">+=</span> <span class="bp">self</span><span class="o">.</span><span class="n">step</span><span class="p">;</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">count</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">lock</span><span class="o">.</span><span class="n">release</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">result</span> |
| |
| |
| <div class="viewcode-block" id="SyncRequestResponse"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.SyncRequestResponse">[docs]</a><span class="k">class</span> <span class="nc">SyncRequestResponse</span><span class="p">(</span><span class="n">IncomingMessageHandler</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Implementation of the synchronous request-response (aka RPC) pattern.</span> |
| <span class="sd"> A single instance can send many requests to the same or different</span> |
| <span class="sd"> addresses.</span> |
| |
| <span class="sd"> :param connection: Connection for requests and responses.</span> |
| <span class="sd"> :type connection: :class:`BlockingConnection`</span> |
| <span class="sd"> :param address: Address for all requests. If not specified, each request</span> |
| <span class="sd"> must have the address property set. Successive messages may have</span> |
| <span class="sd"> different addresses.</span> |
| <span class="sd"> :type address: ``str`` or ``None``</span> |
| <span class="sd"> """</span> |
| |
| <span class="n">correlation_id</span> <span class="o">=</span> <span class="n">AtomicCount</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">connection</span><span class="p">,</span> <span class="n">address</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">SyncRequestResponse</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span> <span class="o">=</span> <span class="n">connection</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">address</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sender</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">create_sender</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">address</span><span class="p">)</span> |
| <span class="c1"># dynamic=true generates a unique address dynamically for this receiver.</span> |
| <span class="c1"># credit=1 because we want to receive 1 response message initially.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">receiver</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">create_receiver</span><span class="p">(</span><span class="kc">None</span><span class="p">,</span> <span class="n">dynamic</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">credit</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span> <span class="n">handler</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">response</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <div class="viewcode-block" id="SyncRequestResponse.call"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.SyncRequestResponse.call">[docs]</a> <span class="k">def</span> <span class="nf">call</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">request</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Send a request message, wait for and return the response message.</span> |
| |
| <span class="sd"> :param request: Request message. If ``self.address`` is not set the</span> |
| <span class="sd"> request message address must be set and will be used.</span> |
| <span class="sd"> :type request: :class:`proton.Message`</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">address</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">request</span><span class="o">.</span><span class="n">address</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Request message has no address: </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="n">request</span><span class="p">)</span> |
| <span class="n">request</span><span class="o">.</span><span class="n">reply_to</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">reply_to</span> |
| <span class="n">request</span><span class="o">.</span><span class="n">correlation_id</span> <span class="o">=</span> <span class="n">correlation_id</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">correlation_id</span><span class="o">.</span><span class="n">next</span><span class="p">())</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">request</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">wakeup</span><span class="p">():</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">response</span> <span class="ow">and</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">response</span><span class="o">.</span><span class="n">correlation_id</span> <span class="o">==</span> <span class="n">correlation_id</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">wait</span><span class="p">(</span><span class="n">wakeup</span><span class="p">,</span> <span class="n">msg</span><span class="o">=</span><span class="s2">"Waiting for response"</span><span class="p">)</span> |
| <span class="n">response</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">response</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">response</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># Ready for next response.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">flow</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> <span class="c1"># Set up credit for the next response.</span> |
| <span class="k">return</span> <span class="n">response</span></div> |
| |
| <span class="nd">@property</span> |
| <span class="k">def</span> <span class="nf">reply_to</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> The dynamic address of our receiver.</span> |
| |
| <span class="sd"> :type: ``str``</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">address</span> |
| |
| <div class="viewcode-block" id="SyncRequestResponse.on_message"><a class="viewcode-back" href="../../proton.utils.html#proton.utils.SyncRequestResponse.on_message">[docs]</a> <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Called when we receive a message for our receiver.</span> |
| |
| <span class="sd"> :param event: The event which occurs when a message is received.</span> |
| <span class="sd"> :type event: :class:`proton.Event`</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">response</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">yield_</span><span class="p">()</span> <span class="c1"># Wake up the wait() loop to handle the message.</span></div></div> |
| </pre></div> |
| |
| <div class="clearer"></div> |
| </div> |
| </div> |
| </div> |
| <div class="sphinxsidebar" role="navigation" aria-label="main navigation"> |
| <div class="sphinxsidebarwrapper"> |
| <div id="searchbox" style="display: none" role="search"> |
| <h3 id="searchlabel">Quick search</h3> |
| <div class="searchformwrapper"> |
| <form class="search" action="../../search.html" method="get"> |
| <input type="text" name="q" aria-labelledby="searchlabel" /> |
| <input type="submit" value="Go" /> |
| </form> |
| </div> |
| </div> |
| <script>$('#searchbox').show(0);</script> |
| </div> |
| </div> |
| <div class="clearer"></div> |
| </div> |
| <div class="related" role="navigation" aria-label="related navigation"> |
| <h3>Navigation</h3> |
| <ul> |
| <li class="right" style="margin-right: 10px"> |
| <a href="../../genindex.html" title="General Index" |
| >index</a></li> |
| <li class="nav-item nav-item-0"><a href="../../index.html">Qpid Proton Python API 0.32.0 documentation</a> »</li> |
| <li class="nav-item nav-item-1"><a href="../index.html" >Module code</a> »</li> |
| <li class="nav-item nav-item-this"><a href="">proton._utils</a></li> |
| </ul> |
| </div> |
| <div class="footer" role="contentinfo"> |
| © Copyright 2019, Apache Qpid Contributors. |
| Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 3.2.1. |
| </div> |
| </body> |
| </html> |