| |
| <!DOCTYPE html> |
| |
| <html> |
| <head> |
| <meta charset="utf-8" /> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0" /> |
| <title>Tutorial — Qpid Proton Python API 0.32.0 documentation</title> |
| <link rel="stylesheet" href="_static/sphinxdoc.css" type="text/css" /> |
| <link rel="stylesheet" href="_static/pygments.css" type="text/css" /> |
| <script id="documentation_options" data-url_root="./" src="_static/documentation_options.js"></script> |
| <script src="_static/jquery.js"></script> |
| <script src="_static/underscore.js"></script> |
| <script src="_static/doctools.js"></script> |
| <script src="_static/language_data.js"></script> |
| <script async="async" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.7/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <link rel="index" title="Index" href="genindex.html" /> |
| <link rel="search" title="Search" href="search.html" /> |
| <link rel="prev" title="AMQP Types" href="types.html" /> |
| </head><body> |
| <div class="related" role="navigation" aria-label="related navigation"> |
| <h3>Navigation</h3> |
| <ul> |
| <li class="right" style="margin-right: 10px"> |
| <a href="genindex.html" title="General Index" |
| accesskey="I">index</a></li> |
| <li class="right" > |
| <a href="types.html" title="AMQP Types" |
| accesskey="P">previous</a> |</li> |
| <li class="nav-item nav-item-0"><a href="index.html">Qpid Proton Python API 0.32.0 documentation</a> »</li> |
| <li class="nav-item nav-item-this"><a href="">Tutorial</a></li> |
| </ul> |
| </div> |
| |
| <div class="document"> |
| <div class="documentwrapper"> |
| <div class="bodywrapper"> |
| <div class="body" role="main"> |
| |
| <div class="section" id="tutorial"> |
| <span id="id1"></span><h1>Tutorial<a class="headerlink" href="#tutorial" title="Permalink to this headline">¶</a></h1> |
| <div class="section" id="hello-world"> |
| <h2>Hello World!<a class="headerlink" href="#hello-world" title="Permalink to this headline">¶</a></h2> |
| <p>Tradition dictates that we start with hello world! However rather than |
| simply striving for the shortest program possible, we’ll aim for a |
| more illustrative example while still restricting the functionality to |
| sending and receiving a single message.</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span><span class="p">,</span> <span class="n">unicode_literals</span> |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Message</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| <span class="k">class</span> <span class="nc">HelloWorld</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">server</span><span class="p">,</span> <span class="n">address</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">HelloWorld</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">server</span> <span class="o">=</span> <span class="n">server</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">address</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="n">conn</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">server</span><span class="p">)</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_receiver</span><span class="p">(</span><span class="n">conn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">address</span><span class="p">)</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_sender</span><span class="p">(</span><span class="n">conn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">address</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_sendable</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">Message</span><span class="p">(</span><span class="n">body</span><span class="o">=</span><span class="s2">"Hello World!"</span><span class="p">))</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">body</span><span class="p">)</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="n">Container</span><span class="p">(</span><span class="n">HelloWorld</span><span class="p">(</span><span class="s2">"localhost:5672"</span><span class="p">,</span> <span class="s2">"examples"</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| </pre></div> |
| </td></tr></table></div> |
| <p>You can see the import of <a class="reference internal" href="proton.reactor.html#proton.reactor.Container" title="proton.reactor.Container"><code class="xref py py-class docutils literal notranslate"><span class="pre">Container</span></code></a> from <code class="docutils literal notranslate"><span class="pre">proton.reactor</span></code> on the |
| fourth line. This is a class that makes programming with proton a |
| little easier for the common cases. It includes within it an event |
| loop, and programs written using this utility are generally structured |
| to react to various events. This reactive style is particularly suited |
| to messaging applications.</p> |
| <p>To be notified of a particular event, you define a class with the |
| appropriately named method on it. That method is then called by the |
| event loop when the event occurs.</p> |
| <p>We define a class here, <code class="docutils literal notranslate"><span class="pre">HelloWorld</span></code>, which handles the key events of |
| interest in sending and receiving a message.</p> |
| <p>The <code class="docutils literal notranslate"><span class="pre">on_start()</span></code> method is called when the event loop first |
| starts. We handle that by establishing our connection (line 13), a |
| sender over which to send the message (line 15) and a receiver over |
| which to receive it back again (line 14).</p> |
| <p>The <code class="docutils literal notranslate"><span class="pre">on_sendable()</span></code> method is called when a message can be transferred |
| over the associated sender link to the remote peer. We send out our |
| <code class="docutils literal notranslate"><span class="pre">Hello</span> <span class="pre">World!</span></code> message (line 18), then close the sender (line 19) as |
| we only want to send one message. The closing of the sender will |
| prevent further calls to <code class="docutils literal notranslate"><span class="pre">on_sendable()</span></code>.</p> |
| <p>The <code class="docutils literal notranslate"><span class="pre">on_message()</span></code> method is called when a message is |
| received. Within that we simply print the body of the message (line |
| 22) and then close the connection (line 23).</p> |
| <p>Now that we have defined the logic for handling these events, we |
| create an instance of a <a class="reference internal" href="proton.reactor.html#proton.reactor.Container" title="proton.reactor.Container"><code class="xref py py-class docutils literal notranslate"><span class="pre">Container</span></code></a>, pass it |
| our handler and then enter the event loop by calling |
| <a class="reference internal" href="proton.reactor.html#proton.reactor.Container.run" title="proton.reactor.Container.run"><code class="xref py py-meth docutils literal notranslate"><span class="pre">run()</span></code></a>. At this point, control |
| passes to the container instance, which will make the appropriate |
| callbacks to any defined handlers.</p> |
| <p>To run the example, you will need to have a broker (or similar) |
| accepting connections on that url either with a queue (or topic) |
| matching the given address or else configured to create such a queue |
| (or topic) dynamically. There is a simple broker.py script included |
| alongside the examples that can be used for this purpose if |
| desired. (It is also written using the API described here, and as such |
| gives an example of a slightly more involved application).</p> |
| </div> |
| <div class="section" id="hello-world-direct"> |
| <h2>Hello World, Direct!<a class="headerlink" href="#hello-world-direct" title="Permalink to this headline">¶</a></h2> |
| <p>Though often used in conjunction with a broker, AMQP does not |
| <em>require</em> this. It also allows senders and receivers to communicate |
| directly if desired.</p> |
| <p>Let’s modify our example to demonstrate this.</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span><span class="p">,</span> <span class="n">unicode_literals</span> |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Message</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| <span class="k">class</span> <span class="nc">HelloWorld</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">HelloWorld</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="hll"> <span class="bp">self</span><span class="o">.</span><span class="n">acceptor</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">listen</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| </span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_sender</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_sendable</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">Message</span><span class="p">(</span><span class="n">body</span><span class="o">=</span><span class="s2">"Hello World!"</span><span class="p">))</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">body</span><span class="p">)</span> |
| |
| <span class="hll"> <span class="k">def</span> <span class="nf">on_accepted</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| </span><span class="hll"> <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| </span> |
| <span class="hll"> <span class="k">def</span> <span class="nf">on_connection_closed</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| </span><span class="hll"> <span class="bp">self</span><span class="o">.</span><span class="n">acceptor</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| </span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">HelloWorld</span><span class="p">(</span><span class="s2">"localhost:8888/examples"</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| </pre></div> |
| </td></tr></table></div> |
| <p>The first difference, on line 12, is that rather than creating a |
| receiver on the same connection as our sender, we listen for incoming |
| connections by invoking the |
| <a class="reference internal" href="proton.reactor.html#proton.reactor.Container.listen" title="proton.reactor.Container.listen"><code class="xref py py-meth docutils literal notranslate"><span class="pre">listen()</span></code></a> method on the |
| container.</p> |
| <p>As we then only need to initiate one link, the sender, we can do that |
| by passing in a url rather than an existing connection, and the |
| connection will also be automatically established for us.</p> |
| <p>We send the message in response to the <code class="docutils literal notranslate"><span class="pre">on_sendable()</span></code> callback and |
| print the message out in response to the <code class="docutils literal notranslate"><span class="pre">on_message()</span></code> callback |
| exactly as before.</p> |
| <p>However we also handle two new events. We now close the connection |
| from the sender’s side once the message has been accepted (line |
| 23). The acceptance of the message is an indication of successful |
| transfer to the peer. We are notified of that event through the |
| <code class="docutils literal notranslate"><span class="pre">on_accepted()</span></code> callback. Then, once the connection has been closed, |
| of which we are notified through the <code class="docutils literal notranslate"><span class="pre">on_connection_closed()</span></code> callback, we stop |
| accepting incoming connections (line 26) at which point there is no |
| work to be done and the event loop exits, and the run() method will |
| return.</p> |
| <p>So now we have our example working without a broker involved!</p> |
| </div> |
| <div class="section" id="asynchronous-send-and-receive"> |
| <h2>Asynchronous Send and Receive<a class="headerlink" href="#asynchronous-send-and-receive" title="Permalink to this headline">¶</a></h2> |
| <p>Of course, these <code class="docutils literal notranslate"><span class="pre">HelloWorld!</span></code> examples are very artificial, |
| communicating as they do over a network connection but within the same |
| process. A more realistic example involves communication between |
| separate processes (which could indeed be running on completely |
| separate machines).</p> |
| <p>Let’s separate the sender from the receiver, and let’s transfer more than |
| a single message between them.</p> |
| <p>We’ll start with a simple sender.</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28 |
| 29 |
| 30 |
| 31 |
| 32 |
| 33 |
| 34 |
| 35 |
| 36 |
| 37 |
| 38 |
| 39 |
| 40 |
| 41 |
| 42 |
| 43</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span><span class="p">,</span> <span class="n">unicode_literals</span> |
| <span class="kn">import</span> <span class="nn">optparse</span> |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Message</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| <span class="k">class</span> <span class="nc">Send</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">messages</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Send</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sent</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">confirmed</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">total</span> <span class="o">=</span> <span class="n">messages</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_sender</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_sendable</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">while</span> <span class="n">event</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">credit</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">sent</span> <span class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">total</span><span class="p">:</span> |
| <span class="n">msg</span> <span class="o">=</span> <span class="n">Message</span><span class="p">(</span><span class="nb">id</span><span class="o">=</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sent</span><span class="o">+</span><span class="mi">1</span><span class="p">),</span> <span class="n">body</span><span class="o">=</span><span class="p">{</span><span class="s1">'sequence'</span><span class="p">:(</span><span class="bp">self</span><span class="o">.</span><span class="n">sent</span><span class="o">+</span><span class="mi">1</span><span class="p">)})</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sent</span> <span class="o">+=</span> <span class="mi">1</span> |
| |
| <span class="k">def</span> <span class="nf">on_accepted</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">confirmed</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">confirmed</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">total</span><span class="p">:</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"all messages confirmed"</span><span class="p">)</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">on_disconnected</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sent</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">confirmed</span> |
| |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">optparse</span><span class="o">.</span><span class="n">OptionParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="s2">"usage: %prog [options]"</span><span class="p">,</span> |
| <span class="n">description</span><span class="o">=</span><span class="s2">"Send messages to the supplied address."</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-a"</span><span class="p">,</span> <span class="s2">"--address"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost:5672/examples"</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"address to which messages are sent (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-m"</span><span class="p">,</span> <span class="s2">"--messages"</span><span class="p">,</span> <span class="nb">type</span><span class="o">=</span><span class="s2">"int"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">100</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"number of messages to send (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">opts</span><span class="p">,</span> <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Send</span><span class="p">(</span><span class="n">opts</span><span class="o">.</span><span class="n">address</span><span class="p">,</span> <span class="n">opts</span><span class="o">.</span><span class="n">messages</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> <span class="k">pass</span> |
| </pre></div> |
| </td></tr></table></div> |
| <p>As with the previous example, we define the application logic in a |
| class that handles various events. As before, we use the |
| <code class="docutils literal notranslate"><span class="pre">on_start()</span></code> event to establish our sender link over which we will |
| transfer messages and the <code class="docutils literal notranslate"><span class="pre">on_sendable()</span></code> event to know when we can |
| transfer our messages.</p> |
| <p>Because we are transferring more than one message, we need to keep |
| track of how many we have sent. We’ll use a <code class="docutils literal notranslate"><span class="pre">sent</span></code> member variable |
| for that. The <code class="docutils literal notranslate"><span class="pre">total</span></code> member variable will hold the number of |
| messages we want to send.</p> |
| <p>AMQP defines a credit-based flow control mechanism. Flow control |
| allows the receiver to control how many messages it is prepared to |
| receive at a given time and thus prevents any component being |
| overwhelmed by the number of messages it is sent.</p> |
| <p>In the <code class="docutils literal notranslate"><span class="pre">on_sendable()</span></code> callback, we check that our sender has credit |
| before sending messages. We also check that we haven’t already sent |
| the required number of messages.</p> |
| <p>The <code class="docutils literal notranslate"><span class="pre">send()</span></code> call on line 21 is of course asynchronous. When it |
| returns, the message has not yet actually been transferred across the |
| network to the receiver. By handling the <code class="docutils literal notranslate"><span class="pre">on_accepted()</span></code> event, we |
| can get notified when the receiver has received and accepted the |
| message. In our example we use this event to track the confirmation of |
| the messages we have sent. We only close the connection and exit when |
| the receiver has received all the messages we wanted to send.</p> |
| <p>If we are disconnected after a message is sent and before it has been |
| confirmed by the receiver, it is said to be <code class="docutils literal notranslate"><span class="pre">in</span> <span class="pre">doubt</span></code>. We don’t |
| know whether or not it was received. In this example, we will handle |
| that by resending any in-doubt messages. This is known as an |
| ‘at-least-once’ guarantee, since each message should eventually be |
| received at least once, though a given message may be received more |
| than once (i.e. duplicates are possible). In the <code class="docutils literal notranslate"><span class="pre">on_disconnected()</span></code> |
| callback, we reset the sent count to reflect only those that have been |
| confirmed. The library will automatically try to reconnect for us, and |
| when our sender is sendable again, we can restart from the point we |
| know the receiver got to.</p> |
| <p>Now let’s look at the corresponding receiver:</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28 |
| 29 |
| 30 |
| 31 |
| 32 |
| 33 |
| 34 |
| 35 |
| 36 |
| 37 |
| 38 |
| 39</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span> |
| <span class="kn">import</span> <span class="nn">optparse</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| <span class="k">class</span> <span class="nc">Recv</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">count</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Recv</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">expected</span> <span class="o">=</span> <span class="n">count</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_receiver</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">id</span> <span class="ow">and</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">id</span> <span class="o"><</span> <span class="bp">self</span><span class="o">.</span><span class="n">received</span><span class="p">:</span> |
| <span class="c1"># ignore duplicate message</span> |
| <span class="k">return</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">expected</span> <span class="o">==</span> <span class="mi">0</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o"><</span> <span class="bp">self</span><span class="o">.</span><span class="n">expected</span><span class="p">:</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">body</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">expected</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">optparse</span><span class="o">.</span><span class="n">OptionParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="s2">"usage: %prog [options]"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-a"</span><span class="p">,</span> <span class="s2">"--address"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost:5672/examples"</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"address from which messages are received (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-m"</span><span class="p">,</span> <span class="s2">"--messages"</span><span class="p">,</span> <span class="nb">type</span><span class="o">=</span><span class="s2">"int"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">100</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"number of messages to receive; 0 receives indefinitely (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">opts</span><span class="p">,</span> <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Recv</span><span class="p">(</span><span class="n">opts</span><span class="o">.</span><span class="n">address</span><span class="p">,</span> <span class="n">opts</span><span class="o">.</span><span class="n">messages</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> <span class="k">pass</span> |
| |
| |
| |
| </pre></div> |
| </td></tr></table></div> |
| <p>Here we handle the <code class="docutils literal notranslate"><span class="pre">on_start()</span></code> event by creating our receiver, much like |
| we did for the sender. We also handle the <code class="docutils literal notranslate"><span class="pre">on_message()</span></code> event for |
| received messages and print the message out as in the <code class="docutils literal notranslate"><span class="pre">Hello</span> <span class="pre">World!</span></code> |
| examples. However, we add some logic to allow the receiver to wait for |
| a given number of messages, then close the connection and exit. We |
| also add some logic to check for and ignore duplicates, using a simple |
| sequential id scheme.</p> |
| <p>Again, though sending between these two examples requires some sort of |
| intermediary process (e.g. a broker), AMQP allows us to send messages |
| directly between two processes, without such an intermediary, if we so wish. In that |
| case, one of the processes needs to accept incoming socket connections. |
| Let’s create a modified version of the receiving example that does this:</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28 |
| 29 |
| 30 |
| 31 |
| 32 |
| 33 |
| 34 |
| 35 |
| 36 |
| 37 |
| 38 |
| 39 |
| 40</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span> |
| <span class="kn">import</span> <span class="nn">optparse</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| <span class="k">class</span> <span class="nc">Recv</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">count</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Recv</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">expected</span> <span class="o">=</span> <span class="n">count</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="hll"> <span class="bp">self</span><span class="o">.</span><span class="n">acceptor</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">listen</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| </span> |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">id</span> <span class="ow">and</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">id</span> <span class="o"><</span> <span class="bp">self</span><span class="o">.</span><span class="n">received</span><span class="p">:</span> |
| <span class="c1"># ignore duplicate message</span> |
| <span class="k">return</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">expected</span> <span class="o">==</span> <span class="mi">0</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o"><</span> <span class="bp">self</span><span class="o">.</span><span class="n">expected</span><span class="p">:</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">body</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">expected</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="hll"> <span class="bp">self</span><span class="o">.</span><span class="n">acceptor</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| </span> |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">optparse</span><span class="o">.</span><span class="n">OptionParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="s2">"usage: %prog [options]"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-a"</span><span class="p">,</span> <span class="s2">"--address"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost:5672/examples"</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"address from which messages are received (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-m"</span><span class="p">,</span> <span class="s2">"--messages"</span><span class="p">,</span> <span class="nb">type</span><span class="o">=</span><span class="s2">"int"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">100</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"number of messages to receive; 0 receives indefinitely (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">opts</span><span class="p">,</span> <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Recv</span><span class="p">(</span><span class="n">opts</span><span class="o">.</span><span class="n">address</span><span class="p">,</span> <span class="n">opts</span><span class="o">.</span><span class="n">messages</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> <span class="k">pass</span> |
| |
| |
| |
| </pre></div> |
| </td></tr></table></div> |
| <p>There are only two differences here. On line 14, instead of initiating |
| a link (and implicitly a connection), we listen for incoming |
| connections. On line 26, when we have received all the expected |
| messages, we then stop listening for incoming connections by closing |
| the listener object.</p> |
| <p>You can use the original send example now to send to this receiver |
| directly. (Note: you will need to stop any broker that is listening on |
| port 5672, or else change the port used by specifying a different |
| address to each example via the -a command line switch.)</p> |
| <p>We could also modify the original sender to allow the original |
| receiver to connect to it. Again, that just requires two modifications:</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28 |
| 29 |
| 30 |
| 31 |
| 32 |
| 33 |
| 34 |
| 35 |
| 36 |
| 37 |
| 38 |
| 39 |
| 40 |
| 41 |
| 42 |
| 43 |
| 44</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span><span class="p">,</span> <span class="n">unicode_literals</span> |
| <span class="kn">import</span> <span class="nn">optparse</span> |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Message</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| <span class="k">class</span> <span class="nc">Send</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">messages</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Send</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sent</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">confirmed</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">total</span> <span class="o">=</span> <span class="n">messages</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="hll"> <span class="bp">self</span><span class="o">.</span><span class="n">acceptor</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">listen</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| </span> |
| <span class="k">def</span> <span class="nf">on_sendable</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">while</span> <span class="n">event</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">credit</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">sent</span> <span class="o"><</span> <span class="bp">self</span><span class="o">.</span><span class="n">total</span><span class="p">:</span> |
| <span class="n">msg</span> <span class="o">=</span> <span class="n">Message</span><span class="p">(</span><span class="nb">id</span><span class="o">=</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sent</span><span class="o">+</span><span class="mi">1</span><span class="p">),</span> <span class="n">body</span><span class="o">=</span><span class="p">{</span><span class="s1">'sequence'</span><span class="p">:(</span><span class="bp">self</span><span class="o">.</span><span class="n">sent</span><span class="o">+</span><span class="mi">1</span><span class="p">)})</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sent</span> <span class="o">+=</span> <span class="mi">1</span> |
| |
| <span class="k">def</span> <span class="nf">on_accepted</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">confirmed</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">confirmed</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">total</span><span class="p">:</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"all messages confirmed"</span><span class="p">)</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="hll"> <span class="bp">self</span><span class="o">.</span><span class="n">acceptor</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| </span> |
| <span class="k">def</span> <span class="nf">on_disconnected</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sent</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">confirmed</span> |
| |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">optparse</span><span class="o">.</span><span class="n">OptionParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="s2">"usage: %prog [options]"</span><span class="p">,</span> |
| <span class="n">description</span><span class="o">=</span><span class="s2">"Send messages to the supplied address."</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-a"</span><span class="p">,</span> <span class="s2">"--address"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost:5672/examples"</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"address to which messages are sent (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-m"</span><span class="p">,</span> <span class="s2">"--messages"</span><span class="p">,</span> <span class="nb">type</span><span class="o">=</span><span class="s2">"int"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">100</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"number of messages to send (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">opts</span><span class="p">,</span> <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Send</span><span class="p">(</span><span class="n">opts</span><span class="o">.</span><span class="n">address</span><span class="p">,</span> <span class="n">opts</span><span class="o">.</span><span class="n">messages</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> <span class="k">pass</span> |
| </pre></div> |
| </td></tr></table></div> |
| <p>As with the modified receiver, instead of initiating establishment of |
| a link, we listen for incoming connections on line 16 and then on line |
| 29, when we have received confirmation of all the messages we sent, we |
| can close the listener in order to exit. The symmetry in the |
| underlying AMQP that enables this is quite unique and elegant, and in |
| reflecting this the proton API provides a flexible toolkit for |
| implementing all sorts of interesting intermediaries (the broker.py |
| script provided as a simple broker for testing purposes provides an |
| example of this).</p> |
| <p>To try this modified sender, run the original receiver against it.</p> |
| </div> |
| <div class="section" id="request-response"> |
| <h2>Request/Response<a class="headerlink" href="#request-response" title="Permalink to this headline">¶</a></h2> |
| <p>A common pattern is to send a request message and expect a response |
| message in return. AMQP has special support for this pattern. Let’s |
| have a look at a simple example. We’ll start with the ‘server’, |
| i.e. the program that will process the request and send the |
| response. Note that we are still using a broker in this example.</p> |
| <p>Our server will provide a very simple service: it will respond with |
| the body of the request converted to uppercase.</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28 |
| 29 |
| 30 |
| 31 |
| 32 |
| 33 |
| 34 |
| 35 |
| 36 |
| 37</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span> |
| <span class="kn">import</span> <span class="nn">optparse</span> |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Message</span><span class="p">,</span> <span class="n">Url</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| <span class="k">class</span> <span class="nc">Server</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">address</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Server</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">address</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"Listening on"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">conn</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">receiver</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_receiver</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">address</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">server</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_sender</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">conn</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"Received"</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">server</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">Message</span><span class="p">(</span><span class="n">address</span><span class="o">=</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">reply_to</span><span class="p">,</span> <span class="n">body</span><span class="o">=</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">body</span><span class="o">.</span><span class="n">upper</span><span class="p">(),</span> |
| <span class="n">correlation_id</span><span class="o">=</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">correlation_id</span><span class="p">))</span> |
| |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">optparse</span><span class="o">.</span><span class="n">OptionParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="s2">"usage: %prog [options]"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-a"</span><span class="p">,</span> <span class="s2">"--address"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost:5672/examples"</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"address from which messages are received (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">opts</span><span class="p">,</span> <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
| |
| <span class="n">url</span> <span class="o">=</span> <span class="n">Url</span><span class="p">(</span><span class="n">opts</span><span class="o">.</span><span class="n">address</span><span class="p">)</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Server</span><span class="p">(</span><span class="n">url</span><span class="p">,</span> <span class="n">url</span><span class="o">.</span><span class="n">path</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> <span class="k">pass</span> |
| |
| |
| |
| </pre></div> |
| </td></tr></table></div> |
| <p>The code here is not too different from the simple receiver |
| example. When we receive a request, however, we look at the |
| <a class="reference internal" href="proton.html#proton.Message.reply_to" title="proton.Message.reply_to"><code class="xref py py-attr docutils literal notranslate"><span class="pre">reply_to</span></code></a> address on the |
| <a class="reference internal" href="proton.html#proton.Message" title="proton.Message"><code class="xref py py-class docutils literal notranslate"><span class="pre">Message</span></code></a> and use it as the address of the |
| response message. Every response is sent over a single sender, created in |
| <code class="docutils literal notranslate"><span class="pre">on_start()</span></code> with no target address of its own, so the same sender |
| serves all requests whatever their reply_to.</p> |
| <p>Now let’s create a simple client to test this service out.</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28 |
| 29 |
| 30 |
| 31 |
| 32 |
| 33 |
| 34 |
| 35 |
| 36 |
| 37 |
| 38 |
| 39 |
| 40 |
| 41 |
| 42 |
| 43 |
| 44 |
| 45</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span><span class="p">,</span> <span class="n">unicode_literals</span> |
| <span class="kn">import</span> <span class="nn">optparse</span> |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Message</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span><span class="p">,</span> <span class="n">DynamicNodeProperties</span> |
| |
| <span class="k">class</span> <span class="nc">Client</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">requests</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Client</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">requests</span> <span class="o">=</span> <span class="n">requests</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sender</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_sender</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">receiver</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_receiver</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">connection</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="n">dynamic</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">next_request</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">address</span><span class="p">:</span> |
| <span class="n">req</span> <span class="o">=</span> <span class="n">Message</span><span class="p">(</span><span class="n">reply_to</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">address</span><span class="p">,</span> <span class="n">body</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">requests</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sender</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">req</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_link_opened</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">receiver</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">receiver</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">next_request</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"</span><span class="si">%s</span><span class="s2"> => </span><span class="si">%s</span><span class="s2">"</span> <span class="o">%</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">requests</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="mi">0</span><span class="p">),</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">body</span><span class="p">))</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">requests</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">next_request</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="n">REQUESTS</span><span class="o">=</span> <span class="p">[</span><span class="s2">"Twas brillig, and the slithy toves"</span><span class="p">,</span> |
| <span class="s2">"Did gire and gymble in the wabe."</span><span class="p">,</span> |
| <span class="s2">"All mimsy were the borogroves,"</span><span class="p">,</span> |
| <span class="s2">"And the mome raths outgrabe."</span><span class="p">]</span> |
| |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">optparse</span><span class="o">.</span><span class="n">OptionParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="s2">"usage: %prog [options]"</span><span class="p">,</span> |
| <span class="n">description</span><span class="o">=</span><span class="s2">"Send requests to the supplied address and print responses."</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-a"</span><span class="p">,</span> <span class="s2">"--address"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost:5672/examples"</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"address to which messages are sent (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">opts</span><span class="p">,</span> <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
| |
| <span class="n">Container</span><span class="p">(</span><span class="n">Client</span><span class="p">(</span><span class="n">opts</span><span class="o">.</span><span class="n">address</span><span class="p">,</span> <span class="n">args</span> <span class="ow">or</span> <span class="n">REQUESTS</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| |
| </pre></div> |
| </td></tr></table></div> |
| <p>As well as sending requests, we need to be able to get back the |
| responses. We create a receiver for that (see line 15), but we don’t |
| specify an address; instead we set the dynamic option, which tells the broker |
| to create a temporary address over which we can receive our responses.</p> |
| <p>We need to use the address allocated by the broker as the reply_to |
| address of our requests, so we can’t send them until the broker has |
| confirmed our receiving link has been set up (at which point we will |
| have our allocated address). To do that, we add an |
| <code class="docutils literal notranslate"><span class="pre">on_link_opened()</span></code> method to our handler class, and if the link |
| associated with the event is the receiver, we use that as the trigger to |
| send our first request.</p> |
| <p>Again, we could avoid having any intermediary process here if we |
| wished. The following code implements a server to which the client |
| above could connect directly without any need for a broker or similar.</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28 |
| 29 |
| 30 |
| 31 |
| 32 |
| 33 |
| 34 |
| 35 |
| 36 |
| 37 |
| 38 |
| 39 |
| 40 |
| 41 |
| 42 |
| 43 |
| 44 |
| 45 |
| 46</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span> |
| |
| <span class="kn">import</span> <span class="nn">uuid</span> |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Message</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| <span class="k">class</span> <span class="nc">Server</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Server</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">senders</span> <span class="o">=</span> <span class="p">{}</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"Listening on"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">container</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">acceptor</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">listen</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_link_opening</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">is_sender</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_source</span> <span class="ow">and</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">dynamic</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">senders</span><span class="p">[</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span><span class="p">]</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span> |
| <span class="k">elif</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span> <span class="ow">and</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span><span class="o">.</span><span class="n">address</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">target</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span><span class="o">.</span><span class="n">address</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">senders</span><span class="p">[</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span><span class="o">.</span><span class="n">address</span><span class="p">]</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span> |
| <span class="k">elif</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_source</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">address</span> |
| <span class="k">elif</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">target</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span><span class="o">.</span><span class="n">address</span> |
| |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"Received"</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="p">)</span> |
| <span class="n">sender</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">senders</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">reply_to</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">sender</span><span class="p">:</span> |
| <span class="nb">print</span><span class="p">(</span><span class="s2">"No link for reply"</span><span class="p">)</span> |
| <span class="k">return</span> |
| <span class="n">sender</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="n">Message</span><span class="p">(</span><span class="n">address</span><span class="o">=</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">reply_to</span><span class="p">,</span> <span class="n">body</span><span class="o">=</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">body</span><span class="o">.</span><span class="n">upper</span><span class="p">(),</span> |
| <span class="n">correlation_id</span><span class="o">=</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">correlation_id</span><span class="p">))</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Server</span><span class="p">(</span><span class="s2">"0.0.0.0:8888"</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> <span class="k">pass</span> |
| |
| |
| |
| </pre></div> |
| </td></tr></table></div> |
| <p>Though this requires some more extensive changes than the simple |
| sending and receiving examples, the essence of the program is still |
| the same. Here though, rather than the server establishing a link for |
| the response, it relies on the link that the client established, since |
| that now comes in directly to the server process.</p> |
| <div class="section" id="miscellaneous"> |
| <h3>Miscellaneous<a class="headerlink" href="#miscellaneous" title="Permalink to this headline">¶</a></h3> |
| <p>Many brokers offer the ability to consume messages based on a |
| ‘selector’ that defines which messages are of interest based on |
| particular values of the headers. The following example shows how that |
| can be achieved:</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23 |
| 24 |
| 25 |
| 26 |
| 27 |
| 28 |
| 29 |
| 30 |
| 31 |
| 32 |
| 33 |
| 34 |
| 35 |
| 36 |
| 37</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span><span class="p">,</span> <span class="n">unicode_literals</span> |
| <span class="kn">import</span> <span class="nn">optparse</span> |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Url</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span><span class="p">,</span> <span class="n">Selector</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| |
| <span class="k">class</span> <span class="nc">Recv</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">,</span> <span class="n">count</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Recv</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">Url</span><span class="p">(</span><span class="n">url</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">expected</span> <span class="o">=</span> <span class="n">count</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">=</span> <span class="mi">0</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="n">conn</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| <span class="hll"> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_receiver</span><span class="p">(</span><span class="n">conn</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="o">.</span><span class="n">path</span><span class="p">,</span> <span class="n">options</span><span class="o">=</span><span class="n">Selector</span><span class="p">(</span><span class="s2">"colour = 'green'"</span><span class="p">))</span> |
| </span> |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">body</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">+=</span> <span class="mi">1</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">received</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">expected</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">optparse</span><span class="o">.</span><span class="n">OptionParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="s2">"usage: %prog [options]"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-a"</span><span class="p">,</span> <span class="s2">"--address"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost:5672/examples"</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"address from which messages are received (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-m"</span><span class="p">,</span> <span class="s2">"--messages"</span><span class="p">,</span> <span class="nb">type</span><span class="o">=</span><span class="s2">"int"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"number of messages to receive; 0 receives indefinitely (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">opts</span><span class="p">,</span> <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Recv</span><span class="p">(</span><span class="n">opts</span><span class="o">.</span><span class="n">address</span><span class="p">,</span> <span class="n">opts</span><span class="o">.</span><span class="n">messages</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> <span class="k">pass</span> |
| |
| |
| |
| </pre></div> |
| </td></tr></table></div> |
| <p>When creating the receiver, we specify a Selector object as an |
| option. The options argument can take a single object or a |
| list. Another option that is sometimes of interest when using a broker |
| is the ability to ‘browse’ the messages on a queue, rather than |
| consuming them. This is done in AMQP by specifying a distribution mode |
| of ‘copy’ (instead of ‘move’ which is the expected default for |
| queues). An example of that is shown next:</p> |
| <div class="highlight-default notranslate"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre> 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
| 20 |
| 21 |
| 22 |
| 23</pre></div></td><td class="code"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">__future__</span> <span class="kn">import</span> <span class="n">print_function</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span><span class="p">,</span> <span class="n">Copy</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| |
| <span class="k">class</span> <span class="nc">Recv</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Recv</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="n">conn</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="s2">"localhost:5672"</span><span class="p">)</span> |
| <span class="hll"> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">create_receiver</span><span class="p">(</span><span class="n">conn</span><span class="p">,</span> <span class="s2">"examples"</span><span class="p">,</span> <span class="n">options</span><span class="o">=</span><span class="n">Copy</span><span class="p">())</span> |
| </span> |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="nb">print</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">queued</span> <span class="o">==</span> <span class="mi">0</span> <span class="ow">and</span> <span class="n">event</span><span class="o">.</span><span class="n">receiver</span><span class="o">.</span><span class="n">drained</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Recv</span><span class="p">())</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> <span class="k">pass</span> |
| |
| |
| |
| </pre></div> |
| </td></tr></table></div> |
| </div> |
| </div> |
| </div> |
| |
| |
| <div class="clearer"></div> |
| </div> |
| </div> |
| </div> |
| <div class="sphinxsidebar" role="navigation" aria-label="main navigation"> |
| <div class="sphinxsidebarwrapper"> |
| <h3><a href="index.html">Table of Contents</a></h3> |
| <ul> |
| <li><a class="reference internal" href="#">Tutorial</a><ul> |
| <li><a class="reference internal" href="#hello-world">Hello World!</a></li> |
| <li><a class="reference internal" href="#hello-world-direct">Hello World, Direct!</a></li> |
| <li><a class="reference internal" href="#asynchronous-send-and-receive">Asynchronous Send and Receive</a></li> |
| <li><a class="reference internal" href="#request-response">Request/Response</a><ul> |
| <li><a class="reference internal" href="#miscellaneous">Miscellaneous</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| </ul> |
| |
| <h4>Previous topic</h4> |
| <p class="topless"><a href="types.html" |
| title="previous chapter">AMQP Types</a></p> |
| <div role="note" aria-label="source link"> |
| <h3>This Page</h3> |
| <ul class="this-page-menu"> |
| <li><a href="_sources/tutorial.rst.txt" |
| rel="nofollow">Show Source</a></li> |
| </ul> |
| </div> |
| <div id="searchbox" style="display: none" role="search"> |
| <h3 id="searchlabel">Quick search</h3> |
| <div class="searchformwrapper"> |
| <form class="search" action="search.html" method="get"> |
| <input type="text" name="q" aria-labelledby="searchlabel" /> |
| <input type="submit" value="Go" /> |
| </form> |
| </div> |
| </div> |
| <script>$('#searchbox').show(0);</script> |
| </div> |
| </div> |
| <div class="clearer"></div> |
| </div> |
| <div class="related" role="navigation" aria-label="related navigation"> |
| <h3>Navigation</h3> |
| <ul> |
| <li class="right" style="margin-right: 10px"> |
| <a href="genindex.html" title="General Index" |
| >index</a></li> |
| <li class="right" > |
| <a href="types.html" title="AMQP Types" |
| >previous</a> |</li> |
| <li class="nav-item nav-item-0"><a href="index.html">Qpid Proton Python API 0.32.0 documentation</a> »</li> |
| <li class="nav-item nav-item-this"><a href="">Tutorial</a></li> |
| </ul> |
| </div> |
| <div class="footer" role="contentinfo"> |
| © Copyright 2019, Apache Qpid Contributors. |
| Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 3.2.1. |
| </div> |
| </body> |
| </html> |