blob: 849691777e37aa6875d47fe10d1f0f6fa81f232e [file] [log] [blame]
<h1>scheduled_send.cpp</h1>
<div class="highlight"><pre><span></span><span class="cp">#include</span> <span class="cpf">&quot;options.hpp&quot;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&lt;proton/container.hpp&gt;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&lt;proton/connection.hpp&gt;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&lt;proton/message.hpp&gt;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&lt;proton/messaging_handler.hpp&gt;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&lt;proton/sender.hpp&gt;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&lt;proton/tracker.hpp&gt;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&lt;proton/work_queue.hpp&gt;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&lt;iostream&gt;</span><span class="cp"></span>
<span class="cp">#include</span> <span class="cpf">&quot;fake_cpp11.hpp&quot;</span><span class="cp"></span>
<span class="c1">// Send messages at a constant rate, one per interval, and cancel after a timeout.</span>
<span class="c1">//</span>
<span class="c1">// Callbacks given to container::schedule() run in the container context, so each one only posts a</span>
<span class="c1">// second lambda to the work_queue of the sender; tick() and cancel() then run in the connection</span>
<span class="c1">// context, which is where send and close must happen.</span>
<span class="k">class</span> <span class="nc">scheduled_sender</span> <span class="o">:</span> <span class="k">public</span> <span class="n">proton</span><span class="o">::</span><span class="n">messaging_handler</span> <span class="p">{</span>
  <span class="k">private</span><span class="o">:</span>
    <span class="n">std</span><span class="o">::</span><span class="n">string</span> <span class="n">url</span><span class="p">;</span> <span class="c1">// Address passed to open_sender().</span>
    <span class="n">proton</span><span class="o">::</span><span class="n">sender</span> <span class="n">sender</span><span class="p">;</span> <span class="c1">// Assigned in on_sender_open().</span>
    <span class="n">proton</span><span class="o">::</span><span class="n">duration</span> <span class="n">interval</span><span class="p">,</span> <span class="n">timeout</span><span class="p">;</span>
    <span class="n">proton</span><span class="o">::</span><span class="n">work_queue</span><span class="o">*</span> <span class="n">work_queue</span><span class="p">;</span> <span class="c1">// Work queue of the sender; 0 until on_sender_open().</span>
    <span class="kt">bool</span> <span class="n">ready</span><span class="p">,</span> <span class="n">canceled</span><span class="p">;</span>

    <span class="c1">// Convert a time in seconds to a proton::duration (milliseconds). The cast goes through the</span>
    <span class="c1">// numeric type of duration rather than int, so large values are not narrowed.</span>
    <span class="k">static</span> <span class="n">proton</span><span class="o">::</span><span class="n">duration</span> <span class="n">to_duration</span><span class="p">(</span><span class="kt">double</span> <span class="n">seconds</span><span class="p">)</span> <span class="p">{</span>
        <span class="k">return</span> <span class="n">proton</span><span class="o">::</span><span class="n">duration</span><span class="p">(</span><span class="k">static_cast</span><span class="o">&lt;</span><span class="n">proton</span><span class="o">::</span><span class="n">duration</span><span class="o">::</span><span class="n">numeric_type</span><span class="o">&gt;</span><span class="p">(</span>
            <span class="n">seconds</span> <span class="o">*</span> <span class="n">proton</span><span class="o">::</span><span class="n">duration</span><span class="o">::</span><span class="n">SECOND</span><span class="p">.</span><span class="n">milliseconds</span><span class="p">()));</span>
    <span class="p">}</span>

    <span class="c1">// Arrange for tick() to be added to the work_queue once interval has elapsed.</span>
    <span class="kt">void</span> <span class="n">schedule_tick</span><span class="p">()</span> <span class="p">{</span>
        <span class="n">sender</span><span class="p">.</span><span class="n">container</span><span class="p">().</span><span class="n">schedule</span><span class="p">(</span><span class="n">interval</span><span class="p">,</span> <span class="p">[</span><span class="k">this</span><span class="p">]()</span> <span class="p">{</span> <span class="k">this</span><span class="o">-&gt;</span><span class="n">work_queue</span><span class="o">-&gt;</span><span class="n">add</span><span class="p">(</span> <span class="p">[</span><span class="k">this</span><span class="p">]()</span> <span class="p">{</span> <span class="k">this</span><span class="o">-&gt;</span><span class="n">tick</span><span class="p">();</span> <span class="p">});</span> <span class="p">});</span>
    <span class="p">}</span>

  <span class="k">public</span><span class="o">:</span>
    <span class="c1">// s: address to open the sender on; d: send interval in seconds; t: timeout in seconds.</span>
    <span class="n">scheduled_sender</span><span class="p">(</span><span class="k">const</span> <span class="n">std</span><span class="o">::</span><span class="n">string</span> <span class="o">&amp;</span><span class="n">s</span><span class="p">,</span> <span class="kt">double</span> <span class="n">d</span><span class="p">,</span> <span class="kt">double</span> <span class="n">t</span><span class="p">)</span> <span class="o">:</span>
        <span class="n">url</span><span class="p">(</span><span class="n">s</span><span class="p">),</span>
        <span class="n">interval</span><span class="p">(</span><span class="n">to_duration</span><span class="p">(</span><span class="n">d</span><span class="p">)),</span> <span class="c1">// Send interval.</span>
        <span class="n">timeout</span><span class="p">(</span><span class="n">to_duration</span><span class="p">(</span><span class="n">t</span><span class="p">)),</span> <span class="c1">// Cancel after timeout.</span>
        <span class="n">work_queue</span><span class="p">(</span><span class="mi">0</span><span class="p">),</span>
        <span class="n">ready</span><span class="p">(</span><span class="nb">true</span><span class="p">),</span> <span class="c1">// Ready to send.</span>
        <span class="n">canceled</span><span class="p">(</span><span class="nb">false</span><span class="p">)</span> <span class="c1">// Canceled.</span>
    <span class="p">{}</span>

    <span class="c1">// Open a sender on url when the container starts.</span>
    <span class="kt">void</span> <span class="n">on_container_start</span><span class="p">(</span><span class="n">proton</span><span class="o">::</span><span class="n">container</span> <span class="o">&amp;</span><span class="n">c</span><span class="p">)</span> <span class="n">OVERRIDE</span> <span class="p">{</span>
        <span class="n">c</span><span class="p">.</span><span class="n">open_sender</span><span class="p">(</span><span class="n">url</span><span class="p">);</span>
    <span class="p">}</span>

    <span class="c1">// Remember the sender and its work_queue, then start the timeout and the ticks.</span>
    <span class="kt">void</span> <span class="n">on_sender_open</span><span class="p">(</span><span class="n">proton</span><span class="o">::</span><span class="n">sender</span> <span class="o">&amp;</span><span class="n">s</span><span class="p">)</span> <span class="n">OVERRIDE</span> <span class="p">{</span>
        <span class="n">sender</span> <span class="o">=</span> <span class="n">s</span><span class="p">;</span>
        <span class="n">work_queue</span> <span class="o">=</span> <span class="o">&amp;</span><span class="n">s</span><span class="p">.</span><span class="n">work_queue</span><span class="p">();</span>
        <span class="c1">// Call this-&gt;cancel after timeout.</span>
        <span class="n">s</span><span class="p">.</span><span class="n">container</span><span class="p">().</span><span class="n">schedule</span><span class="p">(</span><span class="n">timeout</span><span class="p">,</span> <span class="p">[</span><span class="k">this</span><span class="p">]()</span> <span class="p">{</span> <span class="k">this</span><span class="o">-&gt;</span><span class="n">work_queue</span><span class="o">-&gt;</span><span class="n">add</span><span class="p">(</span> <span class="p">[</span><span class="k">this</span><span class="p">]()</span> <span class="p">{</span> <span class="k">this</span><span class="o">-&gt;</span><span class="n">cancel</span><span class="p">();</span> <span class="p">});</span> <span class="p">});</span>
        <span class="c1">// Start regular ticks every interval.</span>
        <span class="n">schedule_tick</span><span class="p">();</span>
    <span class="p">}</span>

    <span class="c1">// Stop ticking and close the connection of the sender.</span>
    <span class="kt">void</span> <span class="n">cancel</span><span class="p">()</span> <span class="p">{</span>
        <span class="n">canceled</span> <span class="o">=</span> <span class="nb">true</span><span class="p">;</span>
        <span class="n">sender</span><span class="p">.</span><span class="n">connection</span><span class="p">().</span><span class="n">close</span><span class="p">();</span>
    <span class="p">}</span>

    <span class="c1">// One timer tick: schedule the next tick, then send now or as soon as credit arrives.</span>
    <span class="kt">void</span> <span class="n">tick</span><span class="p">()</span> <span class="p">{</span>
        <span class="c1">// A tick scheduled before cancel() can still arrive afterwards; do not re-schedule</span>
        <span class="c1">// and do not send once the connection has been asked to close.</span>
        <span class="k">if</span> <span class="p">(</span><span class="n">canceled</span><span class="p">)</span>
            <span class="k">return</span><span class="p">;</span>
        <span class="n">schedule_tick</span><span class="p">();</span>
        <span class="k">if</span> <span class="p">(</span><span class="n">sender</span><span class="p">.</span><span class="n">credit</span><span class="p">()</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">)</span> <span class="c1">// Only send if we have credit</span>
            <span class="n">send</span><span class="p">();</span>
        <span class="k">else</span>
            <span class="n">ready</span> <span class="o">=</span> <span class="nb">true</span><span class="p">;</span> <span class="c1">// Set the ready flag, send as soon as we get credit.</span>
    <span class="p">}</span>

    <span class="c1">// Send a pending message when the sender becomes sendable, unless we have been canceled.</span>
    <span class="kt">void</span> <span class="n">on_sendable</span><span class="p">(</span><span class="n">proton</span><span class="o">::</span><span class="n">sender</span> <span class="o">&amp;</span><span class="p">)</span> <span class="n">OVERRIDE</span> <span class="p">{</span>
        <span class="k">if</span> <span class="p">(</span><span class="n">ready</span> <span class="o">&amp;&amp;</span> <span class="o">!</span><span class="n">canceled</span><span class="p">)</span> <span class="c1">// We have been ticked since the last send.</span>
            <span class="n">send</span><span class="p">();</span>
    <span class="p">}</span>

    <span class="c1">// Log and send one message, then clear the ready flag until the next tick.</span>
    <span class="kt">void</span> <span class="n">send</span><span class="p">()</span> <span class="p">{</span>
        <span class="n">std</span><span class="o">::</span><span class="n">cout</span> <span class="o">&lt;&lt;</span> <span class="s">&quot;send&quot;</span> <span class="o">&lt;&lt;</span> <span class="n">std</span><span class="o">::</span><span class="n">endl</span><span class="p">;</span>
        <span class="n">sender</span><span class="p">.</span><span class="n">send</span><span class="p">(</span><span class="n">proton</span><span class="o">::</span><span class="n">message</span><span class="p">(</span><span class="s">&quot;ping&quot;</span><span class="p">));</span>
        <span class="n">ready</span> <span class="o">=</span> <span class="nb">false</span><span class="p">;</span>
    <span class="p">}</span>
<span class="p">};</span>
<span class="c1">// Parse the command line options, then run a container with a scheduled_sender handler.</span>
<span class="c1">// Returns 0 when run() returns normally, 1 on a bad option or any other std::exception.</span>
<span class="kt">int</span> <span class="nf">main</span><span class="p">(</span><span class="kt">int</span> <span class="n">argc</span><span class="p">,</span> <span class="kt">char</span> <span class="o">**</span><span class="n">argv</span><span class="p">)</span> <span class="p">{</span>
    <span class="c1">// Defaults, used when the matching option is not given.</span>
    <span class="n">std</span><span class="o">::</span><span class="n">string</span> <span class="n">target</span><span class="p">(</span><span class="s">&quot;127.0.0.1:5672/examples&quot;</span><span class="p">);</span>
    <span class="kt">double</span> <span class="n">interval_secs</span> <span class="o">=</span> <span class="mf">1.0</span><span class="p">;</span>
    <span class="kt">double</span> <span class="n">timeout_secs</span> <span class="o">=</span> <span class="mf">5.0</span><span class="p">;</span>

    <span class="n">example</span><span class="o">::</span><span class="n">options</span> <span class="n">cli</span><span class="p">(</span><span class="n">argc</span><span class="p">,</span> <span class="n">argv</span><span class="p">);</span>
    <span class="n">cli</span><span class="p">.</span><span class="n">add_value</span><span class="p">(</span><span class="n">target</span><span class="p">,</span> <span class="sc">&#39;a&#39;</span><span class="p">,</span> <span class="s">&quot;address&quot;</span><span class="p">,</span> <span class="s">&quot;connect and send to URL&quot;</span><span class="p">,</span> <span class="s">&quot;URL&quot;</span><span class="p">);</span>
    <span class="n">cli</span><span class="p">.</span><span class="n">add_value</span><span class="p">(</span><span class="n">interval_secs</span><span class="p">,</span> <span class="sc">&#39;i&#39;</span><span class="p">,</span> <span class="s">&quot;interval&quot;</span><span class="p">,</span> <span class="s">&quot;send a message every INTERVAL seconds&quot;</span><span class="p">,</span> <span class="s">&quot;INTERVAL&quot;</span><span class="p">);</span>
    <span class="n">cli</span><span class="p">.</span><span class="n">add_value</span><span class="p">(</span><span class="n">timeout_secs</span><span class="p">,</span> <span class="sc">&#39;t&#39;</span><span class="p">,</span> <span class="s">&quot;timeout&quot;</span><span class="p">,</span> <span class="s">&quot;stop after T seconds&quot;</span><span class="p">,</span> <span class="s">&quot;T&quot;</span><span class="p">);</span>

    <span class="kt">int</span> <span class="n">status</span> <span class="o">=</span> <span class="mi">1</span><span class="p">;</span> <span class="c1">// Stays 1 unless the container runs to completion.</span>
    <span class="k">try</span> <span class="p">{</span>
        <span class="n">cli</span><span class="p">.</span><span class="n">parse</span><span class="p">();</span>
        <span class="n">scheduled_sender</span> <span class="n">handler</span><span class="p">(</span><span class="n">target</span><span class="p">,</span> <span class="n">interval_secs</span><span class="p">,</span> <span class="n">timeout_secs</span><span class="p">);</span>
        <span class="n">proton</span><span class="o">::</span><span class="n">container</span> <span class="n">app</span><span class="p">(</span><span class="n">handler</span><span class="p">);</span>
        <span class="n">app</span><span class="p">.</span><span class="n">run</span><span class="p">();</span>
        <span class="n">status</span> <span class="o">=</span> <span class="mi">0</span><span class="p">;</span>
    <span class="p">}</span> <span class="k">catch</span> <span class="p">(</span><span class="k">const</span> <span class="n">example</span><span class="o">::</span><span class="n">bad_option</span><span class="o">&amp;</span> <span class="n">e</span><span class="p">)</span> <span class="p">{</span>
        <span class="c1">// Print the options followed by the reason the command line was rejected.</span>
        <span class="n">std</span><span class="o">::</span><span class="n">cout</span> <span class="o">&lt;&lt;</span> <span class="n">cli</span> <span class="o">&lt;&lt;</span> <span class="n">std</span><span class="o">::</span><span class="n">endl</span> <span class="o">&lt;&lt;</span> <span class="n">e</span><span class="p">.</span><span class="n">what</span><span class="p">()</span> <span class="o">&lt;&lt;</span> <span class="n">std</span><span class="o">::</span><span class="n">endl</span><span class="p">;</span>
    <span class="p">}</span> <span class="k">catch</span> <span class="p">(</span><span class="k">const</span> <span class="n">std</span><span class="o">::</span><span class="n">exception</span><span class="o">&amp;</span> <span class="n">e</span><span class="p">)</span> <span class="p">{</span>
        <span class="n">std</span><span class="o">::</span><span class="n">cerr</span> <span class="o">&lt;&lt;</span> <span class="n">e</span><span class="p">.</span><span class="n">what</span><span class="p">()</span> <span class="o">&lt;&lt;</span> <span class="n">std</span><span class="o">::</span><span class="n">endl</span><span class="p">;</span>
    <span class="p">}</span>
    <span class="k">return</span> <span class="n">status</span><span class="p">;</span>
<span class="p">}</span>
</pre></div>
<p><a href="scheduled_send.cpp">Download this file</a></p>