blob: f10bfa7b2c823ed5f4b48bc1d1b93526b247de6a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/*
* Service Bus example.
*
* This is an example of using "Service Bus sessions" (not the same thing as an
* AMQP session) to selectively retrieve messages from a queue. The queue must
* be configured within Service Bus to support sessions. Service Bus uses the
* AMQP group_id message property to associate messages with a particular
* Service Bus session. It uses AMQP filters to specify which session is
* associated with a receiver.
*
* The mechanics for sending and receiving to other types of service bus queue
* are broadly the same, as long as the step using the
* receiver.source().filters() is omitted.
*
* Other Service Bus notes: There is no drain support, hence the need to to use
* timeouts in this example to detect the end of the message stream. There is
* no browse support when setting the AMQP link distribution mode to COPY.
* Service Bus claims to support browsing, but it is unclear how to manage that
* with an AMQP client. Maximum message sizes (for body and headers) vary
* between queue types and fee tier ranging from 64KB to 1MB. Due to the
* distributed nature of Service Bus, queues do not automatically preserve FIFO
* order of messages unless the user takes steps to force the message stream to
* a single partition of the queue or creates the queue with partitioning disabled.
*
* This example shows use of the simpler SAS (Shared Access Signature)
* authentication scheme where the credentials are supplied on the connection.
* Service Bus does not actually check these credentials when setting up the
* connection, it merely caches the SAS key and policy (AKA key name) for later
* access authorization when creating senders and receivers. There is a second
* authentication scheme that allows for multiple tokens and even updating them
* within a long-lived connection which uses special management request-response
* queues in Service Bus. The format of this exchange may be documented
* somewhere but is also available by working through the CbsAsyncExample.cs
* program in the Amqp.Net Lite project.
*
* The sample output for this program is:
sent message: message 0 in service bus session "red"
sent message: message 1 in service bus session "green"
sent message: message 2 in service bus session "blue"
sent message: message 3 in service bus session "red"
sent message: message 4 in service bus session "black"
sent message: message 5 in service bus session "blue"
sent message: message 6 in service bus session "yellow"
receiving messages with session identifier "green" from queue ses_q1
received message: message 1 in service bus session "green"
receiving messages with session identifier "red" from queue ses_q1
received message: message 0 in service bus session "red"
received message: message 3 in service bus session "red"
receiving messages with session identifier "blue" from queue ses_q1
received message: message 2 in service bus session "blue"
received message: message 5 in service bus session "blue"
receiving messages with session identifier "black" from queue ses_q1
received message: message 4 in service bus session "black"
receiving messages with session identifier "yellow" from queue ses_q1
received message: message 6 in service bus session "yellow"
Done. No more messages.
*
*/
#include "options.hpp"
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/ssl.hpp>
#include <proton/sender.hpp>
#include <proton/sender_options.hpp>
#include <proton/source_options.hpp>
#include <proton/tracker.hpp>
#include <proton/work_queue.hpp>
#include <iostream>
#include <sstream>
#include "fake_cpp11.hpp"
using proton::source_options;
using proton::connection_options;
using proton::sender_options;
using proton::receiver_options;
using proton::ssl_client_options;
void do_next_sequence();
namespace {
void check_arg(const std::string &value, const std::string &name) {
if (value.empty())
throw std::runtime_error("missing argument for \"" + name + "\"");
}
}
/// Connect to Service Bus queue and retrieve messages in a particular session.
class session_receiver : public proton::messaging_handler {
private:
const std::string &connection_url;
const std::string &entity;
connection_options coptions;
proton::value session_identifier; // AMQP null type by default, matches any Service Bus sequence identifier
int message_count;
bool closed;
proton::duration read_timeout;
proton::timestamp last_read;
proton::container *container;
proton::receiver receiver;
public:
session_receiver(const std::string &c, const std::string &e, const connection_options &co,
const char *sid) :
connection_url(c), entity(e), coptions(co),
message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) {
if (sid)
session_identifier = std::string(sid);
// session_identifier is now either empty/null or an AMQP string type.
// If null, Service Bus will pick the first available message and create
// a filter at its end with that message's session identifier.
// Technically, an AMQP string is not a valid filter-set value unless it
// is annotated as an AMQP described type, so this may change.
}
void run (proton::container &c) {
message_count = 0;
closed = false;
c.connect(connection_url, coptions.handler(*this));
container = &c;
}
void on_connection_open(proton::connection &connection) OVERRIDE {
proton::source::filter_map sb_filter_map;
proton::symbol key("com.microsoft:session-filter");
sb_filter_map.put(key, session_identifier);
receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map)));
// Start timeout processing here. If Service Bus has no pending
// messages, it may defer completing the receiver open until a message
// becomes available (e.g. to be able to set the actual session
// identifier if none was specified).
last_read = proton::timestamp::now();
// Call this->process_timeout after read_timeout.
connection.work_queue().schedule(read_timeout, [this]() { this->process_timeout(); });
}
void on_receiver_open(proton::receiver &r) OVERRIDE {
if (closed) return; // PROTON-1264
proton::value actual_session_id = r.source().filters().get("com.microsoft:session-filter");
std::cout << "receiving messages with session identifier \"" << actual_session_id
<< "\" from queue " << entity << std::endl;
last_read = proton::timestamp::now();
}
void on_message(proton::delivery &, proton::message &m) OVERRIDE {
message_count++;
std::cout << " received message: " << m.body() << std::endl;
last_read = proton::timestamp::now();
}
void process_timeout() {
proton::timestamp deadline = last_read + read_timeout;
proton::timestamp now = proton::timestamp::now();
if (now >= deadline) {
receiver.close();
closed = true;
receiver.connection().close();
if (message_count)
do_next_sequence();
else
std::cout << "Done. No more messages." << std::endl;
} else {
proton::duration next = deadline - now;
receiver.work_queue().schedule(next, [this]() { this->process_timeout(); });
}
}
};
/// Connect to Service Bus queue and send messages divided into different sessions.
class session_sender : public proton::messaging_handler {
private:
const std::string &connection_url;
const std::string &entity;
connection_options coptions;
int msg_count;
int total;
int accepts;
public:
session_sender(const std::string &c, const std::string &e, const connection_options &co) :
connection_url(c), entity(e), coptions(co),
msg_count(0), total(7), accepts(0) {}
void run(proton::container &c) {
c.open_sender(connection_url + "/" + entity, sender_options(), coptions.handler(*this));
}
void send_remaining_messages(proton::sender &s) {
std::string gid;
for (; msg_count < total && s.credit() > 0; msg_count++) {
switch (msg_count) {
case 0: gid = "red"; break;
case 1: gid = "green"; break;
case 2: gid = "blue"; break;
case 3: gid = "red"; break;
case 4: gid = "black"; break;
case 5: gid = "blue"; break;
case 6: gid = "yellow"; break;
}
std::ostringstream mbody;
mbody << "message " << msg_count << " in service bus session \"" << gid << "\"";
proton::message m(mbody.str());
m.group_id(gid); // Service Bus uses the group_id property to as the session identifier.
s.send(m);
std::cout << " sent message: " << m.body() << std::endl;
}
}
void on_sendable(proton::sender &s) OVERRIDE {
send_remaining_messages(s);
}
void on_tracker_accept(proton::tracker &t) OVERRIDE {
accepts++;
if (accepts == total) {
// upload complete
t.sender().close();
t.sender().connection().close();
do_next_sequence();
}
}
};
/// Orchestrate the sequential actions of sending and receiving session-based messages.
class sequence : public proton::messaging_handler {
private:
proton::container *container;
int sequence_no;
session_sender snd;
session_receiver rcv_red, rcv_green, rcv_null;
public:
static sequence *the_sequence;
sequence (const std::string &c, const std::string &e, const connection_options &co) :
container(0), sequence_no(0),
snd(c, e, co), rcv_red(c, e, co, "red"), rcv_green(c, e, co, "green"), rcv_null(c, e, co, NULL) {
the_sequence = this;
}
void on_container_start(proton::container &c) OVERRIDE {
container = &c;
next_sequence();
}
void next_sequence() {
switch (sequence_no++) {
// run these in order exactly once
case 0: snd.run(*container); break;
case 1: rcv_green.run(*container); break;
case 2: rcv_red.run(*container); break;
// Run this until the receiver decides there is no messages left to sequence through
default: rcv_null.run(*container); break;
}
}
};
sequence *sequence::the_sequence = NULL;
void do_next_sequence() { sequence::the_sequence->next_sequence(); }
int main(int argc, char **argv) {
std::string sb_namespace; // i.e. "foo.servicebus.windows.net"
std::string sb_key_name; // shared access key name for entity (AKA "Policy Name")
std::string sb_key; // shared access key
std::string sb_entity; // AKA the service bus queue. Must enable
// sessions on it for this example.
example::options opts(argc, argv);
opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE");
opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY");
opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key");
opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY");
try {
opts.parse();
check_arg(sb_namespace, "namespace");
check_arg(sb_key_name, "policy");
check_arg(sb_key, "key");
check_arg(sb_entity, "entity");
std::string connection_string("amqps://" + sb_namespace);
sequence seq(connection_string, sb_entity,
connection_options()
.user(sb_key_name)
.password(sb_key)
.sasl_allowed_mechs("PLAIN"));
proton::container(seq).run();
return 0;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}