| <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> |
| <html xmlns="http://www.w3.org/1999/xhtml"> |
| <head> |
| <!-- Character encoding declaration. "text/xhtml" is not a registered media type; "text/html" is the value the XHTML 1.0 HTML-compatibility guidelines use for this declaration. --> |
| <meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/> |
| <meta http-equiv="X-UA-Compatible" content="IE=9"/> |
| <meta name="generator" content="Doxygen 1.8.13"/> |
| <meta name="viewport" content="width=device-width, initial-scale=1"/> |
| <title>Qpid Proton C++ API: service_bus.cpp</title> |
| <link href="tabs.css" rel="stylesheet" type="text/css"/> |
| <!-- Script order is kept as generated: the inline blocks below call initResizable and init_search, which are presumably defined by the script files loaded just before them. --> |
| <script type="text/javascript" src="jquery.js"></script> |
| <script type="text/javascript" src="dynsections.js"></script> |
| <link href="navtree.css" rel="stylesheet" type="text/css"/> |
| <script type="text/javascript" src="resize.js"></script> |
| <script type="text/javascript" src="navtreedata.js"></script> |
| <script type="text/javascript" src="navtree.js"></script> |
| <script type="text/javascript"> |
| $(document).ready(initResizable); |
| </script> |
| <link href="search/search.css" rel="stylesheet" type="text/css"/> |
| <script type="text/javascript" src="search/searchdata.js"></script> |
| <script type="text/javascript" src="search/search.js"></script> |
| <script type="text/javascript"> |
| $(document).ready(function() { init_search(); }); |
| </script> |
| <link href="doxygen.css" rel="stylesheet" type="text/css" /> |
| </head> |
| <body> |
| <div id="top"><!-- do not remove this div, it is closed by doxygen! --> |
| <div id="titlearea"> |
| <table cellspacing="0" cellpadding="0"> |
| <tbody> |
| <tr style="height: 56px;"> |
| <td id="projectalign" style="padding-left: 0.5em;"> |
| <div id="projectname">Qpid Proton C++ API |
|  <span id="projectnumber">0.21.0</span> |
| </div> |
| </td> |
| <td> |
| <!-- Search box. The inline handlers call methods on the global searchBox object, so the ids and handler names are kept exactly as generated. The title on the text field and the alt text on the close image give both controls an accessible name. --> |
| <div id="MSearchBox" class="MSearchBoxInactive"> |
| <span class="left"> |
| <img id="MSearchSelect" src="search/mag_sel.png" |
| onmouseover="return searchBox.OnSearchSelectShow()" |
| onmouseout="return searchBox.OnSearchSelectHide()" |
| alt=""/> |
| <input type="text" id="MSearchField" value="Search" accesskey="S" title="Search" |
| onfocus="searchBox.OnSearchFieldFocus(true)" |
| onblur="searchBox.OnSearchFieldFocus(false)" |
| onkeyup="searchBox.OnSearchFieldChange(event)"/> |
| </span><span class="right"> |
| <a id="MSearchClose" href="javascript:searchBox.CloseResultsWindow()"><img id="MSearchCloseImg" border="0" src="search/close.png" alt="Close search results"/></a> |
| </span> |
| </div> |
| </td> |
| </tr> |
| </tbody> |
| </table> |
| </div> |
| <!-- end header part --> |
| <!-- Generated by Doxygen 1.8.13 --> |
| <script type="text/javascript"> |
| // Global search controller; the inline event handlers in this page refer to it by the name "searchBox". |
| var searchBox = new SearchBox( |
|   "searchBox", |
|   "search", |
|   false, |
|   "Search" |
| ); |
| </script> |
| </div><!-- top --> |
| <!-- Side panel: holds the navigation tree containers and the splitbar handle element. --> |
| <div class="ui-resizable side-nav-resizable" id="side-nav"> |
| <div id="nav-tree"> |
| <div id="nav-tree-contents"> |
| <div class="sync" id="nav-sync"></div> |
| </div> |
| </div> |
| <div class="ui-resizable-handle" id="splitbar" style="-moz-user-select:none;"> |
| </div> |
| </div> |
| <script type="text/javascript"> |
| // When the DOM is ready, call initNavTree with this page's file name and an empty second argument. |
| $(document).ready(function () { |
|   initNavTree('service_bus_8cpp-example.html', ''); |
| }); |
| </script> |
| <div id="doc-content"> |
| <!-- Window showing the search filter options; its mouse and key events are passed to the searchBox object. --> |
| <div id="MSearchSelectWindow" onmouseover="return searchBox.OnSearchSelectShow()" onmouseout="return searchBox.OnSearchSelectHide()" onkeydown="return searchBox.OnSearchSelectKey(event)"> |
| </div> |
| |
| <!-- iframe showing the search results (closed by default); the title attribute names the frame for assistive technology --> |
| <div id="MSearchResultsWindow"> |
| <iframe src="javascript:void(0)" frameborder="0" |
| name="MSearchResults" id="MSearchResults" title="Search results"> |
| </iframe> |
| </div> |
| |
| <!-- Page header: displays the name of the example file. --> |
| <div class="header"> |
| <div class="headertitle"> |
| <div class="title">service_bus.cpp</div> |
| </div> |
| </div><!--header--> |
| <div class="contents"> |
| <p>A working example for accessing Service Bus session-enabled queues. Also provides some general notes on Service Bus usage.</p> |
| <div class="fragment"><div class="line"><span class="comment">/*</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * Licensed to the Apache Software Foundation (ASF) under one</span></div><div class="line"><span class="comment"> * or more contributor license agreements. See the NOTICE file</span></div><div class="line"><span class="comment"> * distributed with this work for additional information</span></div><div class="line"><span class="comment"> * regarding copyright ownership. The ASF licenses this file</span></div><div class="line"><span class="comment"> * to you under the Apache License, Version 2.0 (the</span></div><div class="line"><span class="comment"> * "License"); you may not use this file except in compliance</span></div><div class="line"><span class="comment"> * with the License. You may obtain a copy of the License at</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * http://www.apache.org/licenses/LICENSE-2.0</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * Unless required by applicable law or agreed to in writing,</span></div><div class="line"><span class="comment"> * software distributed under the License is distributed on an</span></div><div class="line"><span class="comment"> * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span></div><div class="line"><span class="comment"> * KIND, either express or implied. 
See the License for the</span></div><div class="line"><span class="comment"> * specific language governing permissions and limitations</span></div><div class="line"><span class="comment"> * under the License.</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> */</span></div><div class="line"></div><div class="line"><span class="comment">/*</span></div><div class="line"><span class="comment"> * Service Bus example.</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * This is an example of using "Service Bus sessions" (not the same thing as an</span></div><div class="line"><span class="comment"> * AMQP session) to selectively retrieve messages from a queue. The queue must</span></div><div class="line"><span class="comment"> * be configured within Service Bus to support sessions. Service Bus uses the</span></div><div class="line"><span class="comment"> * AMQP group_id message property to associate messages with a particular</span></div><div class="line"><span class="comment"> * Service Bus session. It uses AMQP filters to specify which session is</span></div><div class="line"><span class="comment"> * associated with a receiver.</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * The mechanics for sending and receiving to other types of service bus queue</span></div><div class="line"><span class="comment"> * are broadly the same, as long as the step using the</span></div><div class="line"><span class="comment"> * receiver.source().filters() is omitted.</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * Other Service Bus notes: There is no drain support, hence the need to to use</span></div><div class="line"><span class="comment"> * timeouts in this example to detect the end of the message stream. 
There is</span></div><div class="line"><span class="comment"> * no browse support when setting the AMQP link distribution mode to COPY.</span></div><div class="line"><span class="comment"> * Service Bus claims to support browsing, but it is unclear how to manage that</span></div><div class="line"><span class="comment"> * with an AMQP client. Maximum message sizes (for body and headers) vary</span></div><div class="line"><span class="comment"> * between queue types and fee tier ranging from 64KB to 1MB. Due to the</span></div><div class="line"><span class="comment"> * distributed nature of Service Bus, queues do not automatically preserve FIFO</span></div><div class="line"><span class="comment"> * order of messages unless the user takes steps to force the message stream to</span></div><div class="line"><span class="comment"> * a single partition of the queue or creates the queue with partitioning disabled.</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * This example shows use of the simpler SAS (Shared Access Signature)</span></div><div class="line"><span class="comment"> * authentication scheme where the credentials are supplied on the connection.</span></div><div class="line"><span class="comment"> * Service Bus does not actually check these credentials when setting up the</span></div><div class="line"><span class="comment"> * connection, it merely caches the SAS key and policy (AKA key name) for later</span></div><div class="line"><span class="comment"> * access authorization when creating senders and receivers. There is a second</span></div><div class="line"><span class="comment"> * authentication scheme that allows for multiple tokens and even updating them</span></div><div class="line"><span class="comment"> * within a long-lived connection which uses special management request-response</span></div><div class="line"><span class="comment"> * queues in Service Bus. 
The format of this exchange may be documented</span></div><div class="line"><span class="comment"> * somewhere but is also available by working through the CbsAsyncExample.cs</span></div><div class="line"><span class="comment"> * program in the Amqp.Net Lite project.</span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> * The sample output for this program is:</span></div><div class="line"><span class="comment"></span></div><div class="line"><span class="comment"> sent message: message 0 in service bus session "red"</span></div><div class="line"><span class="comment"> sent message: message 1 in service bus session "green"</span></div><div class="line"><span class="comment"> sent message: message 2 in service bus session "blue"</span></div><div class="line"><span class="comment"> sent message: message 3 in service bus session "red"</span></div><div class="line"><span class="comment"> sent message: message 4 in service bus session "black"</span></div><div class="line"><span class="comment"> sent message: message 5 in service bus session "blue"</span></div><div class="line"><span class="comment"> sent message: message 6 in service bus session "yellow"</span></div><div class="line"><span class="comment">receiving messages with session identifier "green" from queue ses_q1</span></div><div class="line"><span class="comment"> received message: message 1 in service bus session "green"</span></div><div class="line"><span class="comment">receiving messages with session identifier "red" from queue ses_q1</span></div><div class="line"><span class="comment"> received message: message 0 in service bus session "red"</span></div><div class="line"><span class="comment"> received message: message 3 in service bus session "red"</span></div><div class="line"><span class="comment">receiving messages with session identifier "blue" from queue ses_q1</span></div><div class="line"><span class="comment"> received message: message 2 in 
service bus session "blue"</span></div><div class="line"><span class="comment"> received message: message 5 in service bus session "blue"</span></div><div class="line"><span class="comment">receiving messages with session identifier "black" from queue ses_q1</span></div><div class="line"><span class="comment"> received message: message 4 in service bus session "black"</span></div><div class="line"><span class="comment">receiving messages with session identifier "yellow" from queue ses_q1</span></div><div class="line"><span class="comment"> received message: message 6 in service bus session "yellow"</span></div><div class="line"><span class="comment">Done. No more messages.</span></div><div class="line"><span class="comment"></span></div><div class="line"><span class="comment"> *</span></div><div class="line"><span class="comment"> */</span></div><div class="line"></div><div class="line"><span class="preprocessor">#include "options.hpp"</span></div><div class="line"></div><div class="line"><span class="preprocessor">#include <<a class="code" href="connection_8hpp.html">proton/connection.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="connection__options_8hpp.html">proton/connection_options.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="container_8hpp.html">proton/container.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="delivery_8hpp.html">proton/delivery.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="message_8hpp.html">proton/message.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="messaging__handler_8hpp.html">proton/messaging_handler.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="receiver__options_8hpp.html">proton/receiver_options.hpp</a>></span></div><div 
class="line"><span class="preprocessor">#include <<a class="code" href="sender_8hpp.html">proton/sender.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="sender__options_8hpp.html">proton/sender_options.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="source__options_8hpp.html">proton/source_options.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="tracker_8hpp.html">proton/tracker.hpp</a>></span></div><div class="line"><span class="preprocessor">#include <<a class="code" href="work__queue_8hpp.html">proton/work_queue.hpp</a>></span></div><div class="line"></div><div class="line"><span class="preprocessor">#include <iostream></span></div><div class="line"><span class="preprocessor">#include <sstream></span></div><div class="line"></div><div class="line"><span class="preprocessor">#include "fake_cpp11.hpp"</span></div><div class="line"></div><div class="line"><span class="keyword">using</span> <a name="_a0"></a><a class="code" href="classproton_1_1source__options.html">proton::source_options</a>;</div><div class="line"><span class="keyword">using</span> <a name="_a1"></a><a class="code" href="classproton_1_1connection__options.html">proton::connection_options</a>;</div><div class="line"><span class="keyword">using</span> <a name="_a2"></a><a class="code" href="classproton_1_1sender__options.html">proton::sender_options</a>;</div><div class="line"><span class="keyword">using</span> <a name="_a3"></a><a class="code" href="classproton_1_1receiver__options.html">proton::receiver_options</a>;</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> do_next_sequence();</div><div class="line"></div><div class="line"><span class="keyword">namespace </span>{</div><div class="line"><span class="keywordtype">void</span> check_arg(<span class="keyword">const</span> std::string &value, <span 
class="keyword">const</span> std::string &name) {</div><div class="line"> <span class="keywordflow">if</span> (value.empty())</div><div class="line"> <span class="keywordflow">throw</span> std::runtime_error(<span class="stringliteral">"missing argument for \""</span> + name + <span class="stringliteral">"\""</span>);</div><div class="line">}</div><div class="line">}</div><div class="line"></div><div class="line"><span class="keyword">class </span>session_receiver : <span class="keyword">public</span> <a name="_a4"></a><a class="code" href="classproton_1_1messaging__handler.html">proton::messaging_handler</a> {</div><div class="line"> <span class="keyword">private</span>:</div><div class="line"> <span class="keyword">const</span> std::string &connection_url;</div><div class="line"> <span class="keyword">const</span> std::string &entity;</div><div class="line"> <a name="_a5"></a><a class="code" href="classproton_1_1value.html">proton::value</a> session_identifier; <span class="comment">// AMQP null type by default, matches any Service Bus sequence identifier</span></div><div class="line"> <span class="keywordtype">int</span> message_count;</div><div class="line"> <span class="keywordtype">bool</span> closed;</div><div class="line"> <a name="_a6"></a><a class="code" href="classproton_1_1duration.html">proton::duration</a> read_timeout;</div><div class="line"> <a name="_a7"></a><a class="code" href="classproton_1_1timestamp.html">proton::timestamp</a> last_read;</div><div class="line"> <a name="_a8"></a><a class="code" href="classproton_1_1container.html">proton::container</a> *container;</div><div class="line"> <a name="_a9"></a><a class="code" href="classproton_1_1receiver.html">proton::receiver</a> receiver;</div><div class="line"></div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> session_receiver(<span class="keyword">const</span> std::string &c, <span class="keyword">const</span> std::string &e,</div><div class="line"> <span 
class="keyword">const</span> <span class="keywordtype">char</span> *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) {</div><div class="line"> <span class="keywordflow">if</span> (sid)</div><div class="line"> session_identifier = std::string(sid);</div><div class="line"> <span class="comment">// session_identifier is now either empty/null or an AMQP string type.</span></div><div class="line"> <span class="comment">// If null, Service Bus will pick the first available message and create</span></div><div class="line"> <span class="comment">// a filter at its end with that message's session identifier.</span></div><div class="line"> <span class="comment">// Technically, an AMQP string is not a valid filter-set value unless it</span></div><div class="line"> <span class="comment">// is annotated as an AMQP described type, so this may change.</span></div><div class="line"></div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> run (<a class="code" href="classproton_1_1container.html">proton::container</a> &c) {</div><div class="line"> message_count = 0;</div><div class="line"> closed = <span class="keyword">false</span>;</div><div class="line"> c.<a name="a10"></a><a class="code" href="classproton_1_1container.html#adbd9ed231804512a47cca3c81f00cdf1">connect</a>(connection_url, connection_options().handler(*<span class="keyword">this</span>));</div><div class="line"> container = &c;</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> <a name="a11"></a><a class="code" href="classproton_1_1messaging__handler.html#a41277abe0e33a3df2764b08dcc12d768">on_connection_open</a>(<a name="_a12"></a><a class="code" href="classproton_1_1connection.html">proton::connection</a> &connection) OVERRIDE {</div><div class="line"> <a name="_a13"></a><a class="code" 
href="classproton_1_1map.html">proton::source::filter_map</a> sb_filter_map;</div><div class="line"> <a name="_a14"></a><a class="code" href="classproton_1_1symbol.html">proton::symbol</a> key(<span class="stringliteral">"com.microsoft:session-filter"</span>);</div><div class="line"> sb_filter_map.<a name="a15"></a><a class="code" href="classproton_1_1map.html#ac310ae7d64b7ad8a70b200c07a55a736">put</a>(key, session_identifier);</div><div class="line"> receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map)));</div><div class="line"></div><div class="line"> <span class="comment">// Start timeout processing here. If Service Bus has no pending</span></div><div class="line"> <span class="comment">// messages, it may defer completing the receiver open until a message</span></div><div class="line"> <span class="comment">// becomes available (e.g. to be able to set the actual session</span></div><div class="line"> <span class="comment">// identifier if none was specified).</span></div><div class="line"> last_read = <a name="a16"></a><a class="code" href="classproton_1_1timestamp.html#a8a432817c74685a518a08ede48d1db34">proton::timestamp::now</a>();</div><div class="line"> <span class="comment">// Call this->process_timeout after read_timeout.</span></div><div class="line"> container-><a name="a17"></a><a class="code" href="classproton_1_1container.html#aa99ede2051ccdf5fe8257d893559ea26">schedule</a>(read_timeout, [<span class="keyword">this</span>]() { this->process_timeout(); });</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> <a name="a18"></a><a class="code" href="classproton_1_1messaging__handler.html#a77f7e38659ee43ccb764e417ad6dd401">on_receiver_open</a>(<a class="code" href="classproton_1_1receiver.html">proton::receiver</a> &r) OVERRIDE {</div><div class="line"> <span class="keywordflow">if</span> (closed) <span class="keywordflow">return</span>; 
<span class="comment">// PROTON-1264</span></div><div class="line"> <a class="code" href="classproton_1_1value.html">proton::value</a> actual_session_id = r.source().filters().<a name="a19"></a><a class="code" href="classproton_1_1value.html#a051c12c4c7efc82a5f268d4f64e15b54">get</a>(<span class="stringliteral">"com.microsoft:session-filter"</span>);</div><div class="line"> std::cout << <span class="stringliteral">"receiving messages with session identifier \""</span> << actual_session_id</div><div class="line"> << <span class="stringliteral">"\" from queue "</span> << entity << std::endl;</div><div class="line"> last_read = <a class="code" href="classproton_1_1timestamp.html#a8a432817c74685a518a08ede48d1db34">proton::timestamp::now</a>();</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> <a name="a20"></a><a class="code" href="classproton_1_1messaging__handler.html#a584c9daeadf4322801f58e054017fecb">on_message</a>(<a name="_a21"></a><a class="code" href="classproton_1_1delivery.html">proton::delivery</a> &, <a name="_a22"></a><a class="code" href="classproton_1_1message.html">proton::message</a> &m) OVERRIDE {</div><div class="line"> message_count++;</div><div class="line"> std::cout << <span class="stringliteral">" received message: "</span> << m.body() << std::endl;</div><div class="line"> last_read = <a class="code" href="classproton_1_1timestamp.html#a8a432817c74685a518a08ede48d1db34">proton::timestamp::now</a>();</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> process_timeout() {</div><div class="line"> <a class="code" href="classproton_1_1timestamp.html">proton::timestamp</a> deadline = last_read + read_timeout;</div><div class="line"> <a class="code" href="classproton_1_1timestamp.html">proton::timestamp</a> now = <a class="code" 
href="classproton_1_1timestamp.html#a8a432817c74685a518a08ede48d1db34">proton::timestamp::now</a>();</div><div class="line"> <span class="keywordflow">if</span> (now >= deadline) {</div><div class="line"> receiver.<a name="a23"></a><a class="code" href="classproton_1_1link.html#a5ae591df94fc66ccb85cbb6565368bca">close</a>();</div><div class="line"> closed = <span class="keyword">true</span>;</div><div class="line"> receiver.<a name="a24"></a><a class="code" href="classproton_1_1link.html#aff302bb6016f2ae29f01bb4e07389a52">connection</a>().<a name="a25"></a><a class="code" href="classproton_1_1connection.html#a5ae591df94fc66ccb85cbb6565368bca">close</a>();</div><div class="line"> <span class="keywordflow">if</span> (message_count)</div><div class="line"> do_next_sequence();</div><div class="line"> <span class="keywordflow">else</span></div><div class="line"> std::cout << <span class="stringliteral">"Done. No more messages."</span> << std::endl;</div><div class="line"> } <span class="keywordflow">else</span> {</div><div class="line"> <a class="code" href="classproton_1_1duration.html">proton::duration</a> next = deadline - now;</div><div class="line"> container-><a class="code" href="classproton_1_1container.html#aa99ede2051ccdf5fe8257d893559ea26">schedule</a>(next, [<span class="keyword">this</span>]() { this->process_timeout(); });</div><div class="line"> }</div><div class="line"> }</div><div class="line">};</div><div class="line"></div><div class="line"></div><div class="line"><span class="keyword">class </span>session_sender : <span class="keyword">public</span> <a class="code" href="classproton_1_1messaging__handler.html">proton::messaging_handler</a> {</div><div class="line"> <span class="keyword">private</span>:</div><div class="line"> <span class="keyword">const</span> std::string &connection_url;</div><div class="line"> <span class="keyword">const</span> std::string &entity;</div><div class="line"> <span class="keywordtype">int</span> msg_count;</div><div 
class="line"> <span class="keywordtype">int</span> total;</div><div class="line"> <span class="keywordtype">int</span> accepts;</div><div class="line"></div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> session_sender(<span class="keyword">const</span> std::string &c, <span class="keyword">const</span> std::string &e) : connection_url(c), entity(e),</div><div class="line"> msg_count(0), total(7), accepts(0) {}</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> run(<a class="code" href="classproton_1_1container.html">proton::container</a> &c) {</div><div class="line"> c.<a name="a26"></a><a class="code" href="classproton_1_1container.html#adfbfd13668611a525bb44328d7a3b1e8">open_sender</a>(connection_url + <span class="stringliteral">"/"</span> + entity, sender_options(), connection_options().handler(*<span class="keyword">this</span>));</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> send_remaining_messages(<a name="_a27"></a><a class="code" href="classproton_1_1sender.html">proton::sender</a> &s) {</div><div class="line"> std::string gid;</div><div class="line"> <span class="keywordflow">for</span> (; msg_count < total && s.<a name="a28"></a><a class="code" href="classproton_1_1link.html#afd27bd11ba72d7df51c44f71b15749eb">credit</a>() > 0; msg_count++) {</div><div class="line"> <span class="keywordflow">switch</span> (msg_count) {</div><div class="line"> <span class="keywordflow">case</span> 0: gid = <span class="stringliteral">"red"</span>; <span class="keywordflow">break</span>;</div><div class="line"> <span class="keywordflow">case</span> 1: gid = <span class="stringliteral">"green"</span>; <span class="keywordflow">break</span>;</div><div class="line"> <span class="keywordflow">case</span> 2: gid = <span class="stringliteral">"blue"</span>; <span class="keywordflow">break</span>;</div><div class="line"> <span 
class="keywordflow">case</span> 3: gid = <span class="stringliteral">"red"</span>; <span class="keywordflow">break</span>;</div><div class="line"> <span class="keywordflow">case</span> 4: gid = <span class="stringliteral">"black"</span>; <span class="keywordflow">break</span>;</div><div class="line"> <span class="keywordflow">case</span> 5: gid = <span class="stringliteral">"blue"</span>; <span class="keywordflow">break</span>;</div><div class="line"> <span class="keywordflow">case</span> 6: gid = <span class="stringliteral">"yellow"</span>; <span class="keywordflow">break</span>;</div><div class="line"> }</div><div class="line"></div><div class="line"> std::ostringstream mbody;</div><div class="line"> mbody << <span class="stringliteral">"message "</span> << msg_count << <span class="stringliteral">" in service bus session \""</span> << gid << <span class="stringliteral">"\""</span>;</div><div class="line"> <a class="code" href="classproton_1_1message.html">proton::message</a> m(mbody.str());</div><div class="line"> m.<a name="a29"></a><a class="code" href="classproton_1_1message.html#ac203a412f35c34fce9de3016a9d57fb4">group_id</a>(gid); <span class="comment">// Service Bus uses the group_id property to as the session identifier.</span></div><div class="line"> s.<a name="a30"></a><a class="code" href="classproton_1_1sender.html#a214eb30b24e6831d016a47b9dddda830">send</a>(m);</div><div class="line"> std::cout << <span class="stringliteral">" sent message: "</span> << m.body() << std::endl;</div><div class="line"> }</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> on_sendable(<a class="code" href="classproton_1_1sender.html">proton::sender</a> &s) OVERRIDE {</div><div class="line"> send_remaining_messages(s);</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> on_tracker_accept(<a name="_a31"></a><a class="code" 
href="classproton_1_1tracker.html">proton::tracker</a> &t) OVERRIDE {</div><div class="line"> accepts++;</div><div class="line"> <span class="keywordflow">if</span> (accepts == total) {</div><div class="line"> <span class="comment">// upload complete</span></div><div class="line"> t.sender().close();</div><div class="line"> t.sender().connection().close();</div><div class="line"> do_next_sequence();</div><div class="line"> }</div><div class="line"> }</div><div class="line">};</div><div class="line"></div><div class="line"></div><div class="line"><span class="keyword">class </span>sequence : <span class="keyword">public</span> <a class="code" href="classproton_1_1messaging__handler.html">proton::messaging_handler</a> {</div><div class="line"> <span class="keyword">private</span>:</div><div class="line"> <a class="code" href="classproton_1_1container.html">proton::container</a> *container;</div><div class="line"> <span class="keywordtype">int</span> sequence_no;</div><div class="line"> session_sender snd;</div><div class="line"> session_receiver rcv_red, rcv_green, rcv_null;</div><div class="line"></div><div class="line"> <span class="keyword">public</span>:</div><div class="line"> <span class="keyword">static</span> sequence *the_sequence;</div><div class="line"></div><div class="line"> sequence (<span class="keyword">const</span> std::string &c, <span class="keyword">const</span> std::string &e) :</div><div class="line"> container(0), sequence_no(0),</div><div class="line"> snd(c, e), rcv_red(c, e, <span class="stringliteral">"red"</span>), rcv_green(c, e, <span class="stringliteral">"green"</span>), rcv_null(c, e, NULL) {</div><div class="line"> the_sequence = <span class="keyword">this</span>;</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> on_container_start(<a class="code" href="classproton_1_1container.html">proton::container</a> &c) OVERRIDE {</div><div class="line"> container = &c;</div><div 
class="line"> next_sequence();</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordtype">void</span> next_sequence() {</div><div class="line"> <span class="keywordflow">switch</span> (sequence_no++) {</div><div class="line"> <span class="comment">// run these in order exactly once</span></div><div class="line"> <span class="keywordflow">case</span> 0: snd.run(*container); <span class="keywordflow">break</span>;</div><div class="line"> <span class="keywordflow">case</span> 1: rcv_green.run(*container); <span class="keywordflow">break</span>;</div><div class="line"> <span class="keywordflow">case</span> 2: rcv_red.run(*container); <span class="keywordflow">break</span>;</div><div class="line"> <span class="comment">// Run this until the receiver decides there is no messages left to sequence through</span></div><div class="line"> <span class="keywordflow">default</span>: rcv_null.run(*container); <span class="keywordflow">break</span>;</div><div class="line"> }</div><div class="line"> }</div><div class="line">};</div><div class="line"></div><div class="line">sequence *sequence::the_sequence = NULL;</div><div class="line"></div><div class="line"><span class="keywordtype">void</span> do_next_sequence() { sequence::the_sequence->next_sequence(); }</div><div class="line"></div><div class="line"></div><div class="line"><span class="keywordtype">int</span> main(<span class="keywordtype">int</span> argc, <span class="keywordtype">char</span> **argv) {</div><div class="line"> std::string sb_namespace; <span class="comment">// i.e. 
"foo.servicebus.windows.net"</span></div><div class="line"> <span class="comment">// Make sure the next two are urlencoded for Proton</span></div><div class="line"> std::string sb_key_name; <span class="comment">// shared access key name for entity (AKA "Policy Name")</span></div><div class="line"> std::string sb_key; <span class="comment">// shared access key</span></div><div class="line"> std::string sb_entity; <span class="comment">// AKA the service bus queue. Must enable</span></div><div class="line"> <span class="comment">// sessions on it for this example.</span></div><div class="line"></div><div class="line"> example::options opts(argc, argv);</div><div class="line"> opts.add_value(sb_namespace, <span class="charliteral">'n'</span>, <span class="stringliteral">"namespace"</span>, <span class="stringliteral">"Service Bus full namespace"</span>, <span class="stringliteral">"NAMESPACE"</span>);</div><div class="line"> opts.add_value(sb_key_name, <span class="charliteral">'p'</span>, <span class="stringliteral">"policy"</span>, <span class="stringliteral">"policy name that specifies access rights (key name)"</span>, <span class="stringliteral">"POLICY"</span>);</div><div class="line"> opts.add_value(sb_key, <span class="charliteral">'k'</span>, <span class="stringliteral">"key"</span>, <span class="stringliteral">"secret key for the policy"</span>, <span class="stringliteral">"key"</span>);</div><div class="line"> opts.add_value(sb_entity, <span class="charliteral">'e'</span>, <span class="stringliteral">"entity"</span>, <span class="stringliteral">"entity path (queue name)"</span>, <span class="stringliteral">"ENTITY"</span>);</div><div class="line"></div><div class="line"> <span class="keywordflow">try</span> {</div><div class="line"> opts.parse();</div><div class="line"> check_arg(sb_namespace, <span class="stringliteral">"namespace"</span>);</div><div class="line"> check_arg(sb_key_name, <span class="stringliteral">"policy"</span>);</div><div class="line"> 
check_arg(sb_key, <span class="stringliteral">"key"</span>);</div><div class="line"> check_arg(sb_entity, <span class="stringliteral">"entity"</span>);</div><div class="line"> std::string connection_string(<span class="stringliteral">"amqps://"</span> + sb_key_name + <span class="stringliteral">":"</span> + sb_key + <span class="stringliteral">"@"</span> + sb_namespace);</div><div class="line"></div><div class="line"> sequence seq(connection_string, sb_entity);</div><div class="line"> <a class="code" href="classproton_1_1container.html">proton::container</a>(seq).<a name="a32"></a><a class="code" href="classproton_1_1container.html#a13a43e6d814de94978c515cb084873b1">run</a>();</div><div class="line"> <span class="keywordflow">return</span> 0;</div><div class="line"> } <span class="keywordflow">catch</span> (<span class="keyword">const</span> std::exception&amp; e) {</div><div class="line"> std::cerr &lt;&lt; e.what() &lt;&lt; std::endl;</div><div class="line"> }</div><div class="line"></div><div class="line"> <span class="keywordflow">return</span> 1;</div><div class="line">}</div></div><!-- fragment --> </div><!-- contents --> |
| </div><!-- doc-content --> |
| <!-- start footer part --> |
| <div id="nav-path" class="navpath"><!-- id is needed for treeview function! --> |
| <ul> |
| <li class="footer">Generated by |
| <a href="http://www.doxygen.org/index.html"> |
| <img class="footer" src="doxygen.png" alt="doxygen"/></a> 1.8.13 </li> |
| </ul> |
| </div> |
| </body> |
| </html> |