blob: d56e57b85b99f51dab05a4760e1c4a9e215aa09c [file] [log] [blame]
<h1>broker.py</h1>
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">collections</span>
<span class="kn">import</span> <span class="nn">optparse</span>
<span class="kn">import</span> <span class="nn">uuid</span>
<span class="kn">import</span> <span class="nn">unittest</span>
<span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Endpoint</span>
<span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span>
<span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span>
class Queue(object):
    """FIFO of pending messages for one address, shared by its consumers.

    A consumer is any object exposing a ``credit`` attribute (tested for
    truthiness) and a ``send(message)`` method; those are the only two
    members this class touches.
    """

    def __init__(self, dynamic=False):
        """Create an empty queue with no consumers.

        :param dynamic: stored as ``self.dynamic``; when true,
            :meth:`unsubscribe` reports the queue as deletable as soon as
            the last consumer is gone, even if messages are still pending.
        """
        self.dynamic = dynamic
        self.queue = collections.deque()
        self.consumers = []

    def subscribe(self, consumer):
        """Append ``consumer`` to the list of recipients."""
        self.consumers.append(consumer)

    def unsubscribe(self, consumer):
        """Remove ``consumer`` if subscribed; unknown consumers are ignored.

        :return: True if the queue is to be deleted
        """
        if consumer in self.consumers:
            self.consumers.remove(consumer)
        # Deletable once nobody is listening and either the queue is dynamic
        # or there is nothing left in it.
        return len(self.consumers) == 0 and (self.dynamic or len(self.queue) == 0)

    def publish(self, message):
        """Enqueue ``message`` and immediately try to deliver pending messages."""
        self.queue.append(message)
        self.dispatch()

    def dispatch(self, consumer=None):
        """Deliver pending messages until a full pass delivers nothing.

        :param consumer: when given (and truthy), deliver only to that
            consumer; otherwise deliver to every subscribed consumer.
        """
        if consumer:
            targets = [consumer]
        else:
            targets = self.consumers
        # Each pass hands at most one message to each consumer with credit.
        while self._deliver_to(targets):
            pass

    def _deliver_to(self, consumers):
        """Run one delivery pass over ``consumers``.

        Every consumer whose ``credit`` is truthy receives the oldest pending
        message. The pass stops early once the queue is empty.

        Only the empty-queue condition ends the pass quietly; an exception
        raised by ``consumer.send`` (including ``IndexError``) propagates
        instead of being mistaken for "no more messages".

        :return: True if at least one message was sent during this pass.
        """
        delivered = False
        for consumer in consumers:
            if not self.queue:  # no more messages
                break
            if consumer.credit:
                consumer.send(self.queue.popleft())
                delivered = True
        return delivered
class Broker(MessagingHandler):
    """Handler that routes messages through one ``Queue`` per address.

    ``self.queues`` maps an address to its ``Queue``. The ``on_*`` methods
    follow the ``MessagingHandler`` callback naming; exactly when each one
    fires is defined by proton, not by this file (confirm against the proton
    documentation).
    """

    def __init__(self, url):
        """Remember the listen ``url`` and start with no queues."""
        super(Broker, self).__init__()
        self.url = url
        self.queues = {}

    def on_start(self, event):
        """Start listening on ``self.url`` and keep the returned acceptor."""
        self.acceptor = event.container.listen(self.url)

    def _queue(self, address):
        """Return the queue registered for ``address``, creating it if absent."""
        found = self.queues.get(address)
        if found is None:
            found = self.queues[address] = Queue()
        return found

    def on_link_opening(self, event):
        """Fill in the local address of an opening link and subscribe senders.

        Sender links become consumers of a queue: a freshly created dynamic
        queue under a random UUID address when the remote source asks for
        one, otherwise the queue named by the remote source address.
        Non-sender links only have their target address copied from the
        remote target.
        """
        link = event.link
        if not link.is_sender:
            if link.remote_target.address:
                link.target.address = link.remote_target.address
            return

        remote_source = link.remote_source
        if remote_source.dynamic:
            generated = str(uuid.uuid4())
            link.source.address = generated
            dynamic_queue = Queue(True)
            self.queues[generated] = dynamic_queue
            dynamic_queue.subscribe(link)
        elif remote_source.address:
            link.source.address = remote_source.address
            self._queue(link.source.address).subscribe(link)

    def _unsubscribe(self, link):
        """Detach ``link`` from its queue and drop the queue if it says so."""
        address = link.source.address
        if address not in self.queues:
            return
        if self.queues[address].unsubscribe(link):
            del self.queues[address]

    def on_link_closing(self, event):
        """Unsubscribe a closing sender link; other links are ignored."""
        link = event.link
        if link.is_sender:
            self._unsubscribe(link)

    def on_connection_closing(self, event):
        """Unsubscribe the sender links of the closing connection."""
        self.remove_stale_consumers(event.connection)

    def on_disconnected(self, event):
        """Unsubscribe the sender links of the disconnected connection."""
        self.remove_stale_consumers(event.connection)

    def remove_stale_consumers(self, connection):
        """Unsubscribe every REMOTE_ACTIVE sender link of ``connection``."""
        state = Endpoint.REMOTE_ACTIVE
        current = connection.link_head(state)
        while current:
            if current.is_sender:
                self._unsubscribe(current)
            current = current.next(state)

    def on_sendable(self, event):
        """Dispatch pending messages of the link's source queue to that link.

        NOTE(review): ``_queue`` creates the queue when it does not exist,
        so a link whose source address is None ends up with a queue keyed
        by None -- confirm this is intended.
        """
        link = event.link
        self._queue(link.source.address).dispatch(link)

    def on_message(self, event):
        """Publish the received message to the queue for its address.

        The link's target address wins; the message's own address is used
        only when the target address is None.

        NOTE(review): if both are None the message is stored in a queue
        keyed by None -- confirm this is intended.
        """
        message = event.message
        target_address = event.link.target.address
        address = message.address if target_address is None else target_address
        self._queue(address).publish(message)
def main():
    """Parse the command line and run a ``Broker`` until interrupted.

    The only option is ``-a``/``--address`` (default ``localhost:5672``),
    which is passed to ``Broker`` as its listen URL.
    """
    cli = optparse.OptionParser(usage="usage: %prog [options]")
    cli.add_option(
        "-a",
        "--address",
        default="localhost:5672",
        help="address router listens on (default %default)"
    )
    # Positional arguments are accepted by the parser but not used.
    options, _positional = cli.parse_args()

    try:
        container = Container(Broker(options.address))
        container.run()
    except KeyboardInterrupt:
        # An interrupt (e.g. Ctrl-C) ends the program quietly.
        pass
# Start the broker only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == '__main__':
    main()
</pre></div>
<p><a href="broker.py">Download this file</a></p>