| |
| <h1>broker.py</h1> |
| <div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">collections</span> |
| <span class="kn">import</span> <span class="nn">optparse</span> |
| <span class="kn">import</span> <span class="nn">uuid</span> |
| |
| <span class="kn">import</span> <span class="nn">unittest</span> |
| |
| <span class="kn">from</span> <span class="nn">proton</span> <span class="kn">import</span> <span class="n">Endpoint</span> |
| <span class="kn">from</span> <span class="nn">proton.handlers</span> <span class="kn">import</span> <span class="n">MessagingHandler</span> |
| <span class="kn">from</span> <span class="nn">proton.reactor</span> <span class="kn">import</span> <span class="n">Container</span> |
| |
| |
| <span class="k">class</span> <span class="nc">Queue</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dynamic</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dynamic</span> <span class="o">=</span> <span class="n">dynamic</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queue</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">deque</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">consumers</span> <span class="o">=</span> <span class="p">[]</span> |
| |
| <span class="k">def</span> <span class="nf">subscribe</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">consumer</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">consumers</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">consumer</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">unsubscribe</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">consumer</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> :return: True if the queue is to be deleted</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">consumer</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">consumers</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">consumers</span><span class="o">.</span><span class="n">remove</span><span class="p">(</span><span class="n">consumer</span><span class="p">)</span> |
| <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">consumers</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span> <span class="ow">and</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dynamic</span> <span class="ow">or</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">queue</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">publish</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">message</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queue</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dispatch</span><span class="p">()</span> |
| |
| <span class="k">def</span> <span class="nf">dispatch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">consumer</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">consumer</span><span class="p">:</span> |
| <span class="n">c</span> <span class="o">=</span> <span class="p">[</span><span class="n">consumer</span><span class="p">]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">c</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">consumers</span> |
| <span class="k">while</span> <span class="bp">self</span><span class="o">.</span><span class="n">_deliver_to</span><span class="p">(</span><span class="n">c</span><span class="p">):</span> |
| <span class="k">pass</span> |
| |
| <span class="k">def</span> <span class="nf">_deliver_to</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">consumers</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="n">consumers</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">c</span><span class="o">.</span><span class="n">credit</span><span class="p">:</span> |
| <span class="n">c</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">queue</span><span class="o">.</span><span class="n">popleft</span><span class="p">())</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="k">return</span> <span class="n">result</span> |
| <span class="k">except</span> <span class="ne">IndexError</span><span class="p">:</span> <span class="c1"># no more messages</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| |
| |
| <span class="k">class</span> <span class="nc">Broker</span><span class="p">(</span><span class="n">MessagingHandler</span><span class="p">):</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">url</span><span class="p">):</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">Broker</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">url</span> <span class="o">=</span> <span class="n">url</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queues</span> <span class="o">=</span> <span class="p">{}</span> |
| |
| <span class="k">def</span> <span class="nf">on_start</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">acceptor</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">container</span><span class="o">.</span><span class="n">listen</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">url</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_queue</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">address</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">address</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">queues</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queues</span><span class="p">[</span><span class="n">address</span><span class="p">]</span> <span class="o">=</span> <span class="n">Queue</span><span class="p">()</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">queues</span><span class="p">[</span><span class="n">address</span><span class="p">]</span> |
| |
| <span class="k">def</span> <span class="nf">on_link_opening</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">is_sender</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">dynamic</span><span class="p">:</span> |
| <span class="n">address</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">address</span> |
| <span class="n">q</span> <span class="o">=</span> <span class="n">Queue</span><span class="p">(</span><span class="kc">True</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queues</span><span class="p">[</span><span class="n">address</span><span class="p">]</span> <span class="o">=</span> <span class="n">q</span> |
| <span class="n">q</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">address</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_source</span><span class="o">.</span><span class="n">address</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_queue</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span><span class="p">)</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span><span class="o">.</span><span class="n">address</span><span class="p">:</span> |
| <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">target</span><span class="o">.</span><span class="n">address</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">remote_target</span><span class="o">.</span><span class="n">address</span> |
| |
| <span class="k">def</span> <span class="nf">_unsubscribe</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">link</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">queues</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">queues</span><span class="p">[</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span><span class="p">]</span><span class="o">.</span><span class="n">unsubscribe</span><span class="p">(</span><span class="n">link</span><span class="p">):</span> |
| <span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">queues</span><span class="p">[</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span><span class="p">]</span> |
| |
| <span class="k">def</span> <span class="nf">on_link_closing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">is_sender</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_unsubscribe</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_connection_closing</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">remove_stale_consumers</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_disconnected</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">remove_stale_consumers</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">connection</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">remove_stale_consumers</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">connection</span><span class="p">):</span> |
| <span class="n">l</span> <span class="o">=</span> <span class="n">connection</span><span class="o">.</span><span class="n">link_head</span><span class="p">(</span><span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_ACTIVE</span><span class="p">)</span> |
| <span class="k">while</span> <span class="n">l</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">l</span><span class="o">.</span><span class="n">is_sender</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_unsubscribe</span><span class="p">(</span><span class="n">l</span><span class="p">)</span> |
| <span class="n">l</span> <span class="o">=</span> <span class="n">l</span><span class="o">.</span><span class="n">next</span><span class="p">(</span><span class="n">Endpoint</span><span class="o">.</span><span class="n">REMOTE_ACTIVE</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_sendable</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_queue</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">source</span><span class="o">.</span><span class="n">address</span><span class="p">)</span><span class="o">.</span><span class="n">dispatch</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">on_message</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="n">address</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">link</span><span class="o">.</span><span class="n">target</span><span class="o">.</span><span class="n">address</span> |
| <span class="k">if</span> <span class="n">address</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">address</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="o">.</span><span class="n">address</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_queue</span><span class="p">(</span><span class="n">address</span><span class="p">)</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="n">event</span><span class="o">.</span><span class="n">message</span><span class="p">)</span> |
| |
| |
| <span class="k">def</span> <span class="nf">main</span><span class="p">():</span> |
| <span class="n">parser</span> <span class="o">=</span> <span class="n">optparse</span><span class="o">.</span><span class="n">OptionParser</span><span class="p">(</span><span class="n">usage</span><span class="o">=</span><span class="s2">"usage: %prog [options]"</span><span class="p">)</span> |
| <span class="n">parser</span><span class="o">.</span><span class="n">add_option</span><span class="p">(</span><span class="s2">"-a"</span><span class="p">,</span> <span class="s2">"--address"</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="s2">"localhost:5672"</span><span class="p">,</span> |
| <span class="n">help</span><span class="o">=</span><span class="s2">"address router listens on (default </span><span class="si">%d</span><span class="s2">efault)"</span><span class="p">)</span> |
| <span class="n">opts</span><span class="p">,</span> <span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">Container</span><span class="p">(</span><span class="n">Broker</span><span class="p">(</span><span class="n">opts</span><span class="o">.</span><span class="n">address</span><span class="p">))</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> |
| <span class="k">except</span> <span class="ne">KeyboardInterrupt</span><span class="p">:</span> |
| <span class="k">pass</span> |
| |
| |
| <span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'__main__'</span><span class="p">:</span> |
| <span class="n">main</span><span class="p">()</span> |
| </pre></div> |
| |
| <p><a href="broker.py">Download this file</a></p> |