blob: 83b70055a941c5c1ff91fc7858c9a7a009eb4658 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "../options.hpp"
#include "mt_container.hpp"
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/error_condition.hpp>
#include <proton/listen_handler.hpp>
#include <proton/listener.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender_options.hpp>
#include <proton/source_options.hpp>
#include <proton/target.hpp>
#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <atomic>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include "../fake_cpp11.hpp"
// Thread safe queue.
// Stores messages, notifies subscribed connections when there is data.
class queue {
public:
queue(const std::string& name) : name_(name) {}
std::string name() const { return name_; }
// Push a message onto the queue.
// If the queue was previously empty, notify subscribers it has messages.
// Called from receiver's connection.
void push(const proton::message &m) {
std::lock_guard<std::mutex> g(lock_);
messages_.push_back(m);
if (messages_.size() == 1) { // Non-empty, notify subscribers
for (auto cb : callbacks_)
cb(this);
callbacks_.clear();
}
}
// If the queue is not empty, pop a message into m and return true.
// Otherwise save callback to be called when there are messages and return false.
// Called from sender's connection.
bool pop(proton::message& m, std::function<void(queue*)> callback) {
std::lock_guard<std::mutex> g(lock_);
if (messages_.empty()) {
callbacks_.push_back(callback);
return false;
} else {
m = std::move(messages_.front());
messages_.pop_front();
return true;
}
}
private:
const std::string name_;
std::mutex lock_;
std::deque<proton::message> messages_;
std::vector<std::function<void(queue*)> > callbacks_;
};
/// Thread safe map of queues.
class queues {
public:
queues() : next_id_(0) {}
// Get or create the named queue.
queue* get(const std::string& name) {
std::lock_guard<std::mutex> g(lock_);
auto i = queues_.insert(queue_map::value_type(name, nullptr)).first;
if (!i->second)
i->second.reset(new queue(name));
return i->second.get();
}
// Create a dynamic queue with a unique name.
queue* dynamic() {
std::ostringstream os;
os << "_dynamic_" << next_id_++;
return get(os.str());
}
private:
typedef std::map<std::string, std::unique_ptr<queue> > queue_map;
std::mutex lock_;
queue_map queues_;
std::atomic<int> next_id_; // Use to generate unique queue IDs.
};
/// Broker connection handler. Things to note:
///
/// 1. Each handler manages a single connection.
///
/// 2. For a *single connection* calls to proton::handler functions and calls to
/// function objects passed to proton::event_loop::inject() are serialized,
/// i.e. never called concurrently. Handlers can have per-connection state
/// without needing locks.
///
/// 3. Handler/injected functions for *different connections* can be called
/// concurrently. Resources used by multiple connections (e.g. the queues in
/// this example) must be thread-safe.
///
/// 4. You can 'inject' work to be done sequentially using a connection's
/// proton::event_loop. In this example, we create a std::function callback
/// that we pass to queues, so they can notify us when they have messages.
///
class broker_connection_handler : public proton::messaging_handler {
public:
broker_connection_handler(queues& qs) : queues_(qs) {}
void on_connection_open(proton::connection& c) OVERRIDE {
// Create the has_messages callback for queue subscriptions.
//
// Make a std::shared_ptr to a thread_safe handle for our proton::connection.
// The connection's proton::event_loop will remain valid as a shared_ptr exists.
std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = make_shared_thread_safe(c);
// Make a lambda function to inject a call to this->has_messages() via the proton::event_loop.
// The function is bound to a shared_ptr so this is safe. If the connection has already closed
// proton::event_loop::inject() will drop the callback.
has_messages_callback_ = [this, ts_c](queue* q) mutable {
ts_c->event_loop()->inject(
std::bind(&broker_connection_handler::has_messages, this, q));
};
c.open(); // Accept the connection
}
// A sender sends messages from a queue to a subscriber.
void on_sender_open(proton::sender &sender) OVERRIDE {
queue *q = sender.source().dynamic() ?
queues_.dynamic() : queues_.get(sender.source().address());
sender.open(proton::sender_options().source((proton::source_options().address(q->name()))));
std::cout << "sending from " << q->name() << std::endl;
}
// We have credit to send a message.
void on_sendable(proton::sender &s) OVERRIDE {
queue* q = sender_queue(s);
if (!do_send(q, s)) // Queue is empty, save ourselves in the blocked set.
blocked_.insert(std::make_pair(q, s));
}
// A receiver receives messages from a publisher to a queue.
void on_receiver_open(proton::receiver &r) OVERRIDE {
std::string qname = r.target().address();
if (qname == "shutdown") {
std::cout << "broker shutting down" << std::endl;
// Sending to the special "shutdown" queue stops the broker.
r.connection().container().stop(
proton::error_condition("shutdown", "stop broker"));
} else {
std::cout << "receiving to " << qname << std::endl;
}
}
// A message is received.
void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
std::string qname = d.receiver().target().address();
queues_.get(qname)->push(m);
}
void on_session_close(proton::session &session) OVERRIDE {
// Erase all blocked senders that belong to session.
auto predicate = [session](const proton::sender& s) {
return s.session() == session;
};
erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
}
void on_sender_close(proton::sender &sender) OVERRIDE {
// Erase sender from the blocked set.
auto range = blocked_.equal_range(sender_queue(sender));
auto predicate = [sender](const proton::sender& s) { return s == sender; };
erase_sender_if(range.first, range.second, predicate);
}
void on_error(const proton::error_condition& e) OVERRIDE {
std::cerr << "error: " << e.what() << std::endl;
}
// The container calls on_transport_close() last.
void on_transport_close(proton::transport&) OVERRIDE {
delete this; // All done.
}
private:
typedef std::multimap<queue*, proton::sender> blocked_map;
// Get the queue associated with a sender.
queue* sender_queue(const proton::sender& s) {
return queues_.get(s.source().address()); // Thread safe.
}
// Only called if we have credit. Return true if we sent a message.
bool do_send(queue* q, proton::sender &s) {
proton::message m;
bool popped = q->pop(m, has_messages_callback_);
if (popped)
s.send(m);
/// if !popped the queue has saved the callback for later.
return popped;
}
// Called via the connection's proton::event_loop when q has messages.
// Try all the blocked senders.
void has_messages(queue* q) {
auto range = blocked_.equal_range(q);
for (auto i = range.first; i != range.second;) {
if (i->second.credit() <= 0 || do_send(q, i->second))
i = blocked_.erase(i); // No credit or send was successful, stop blocked.
else
++i; // have credit, didn't send, keep blocked
}
}
// Use to erase closed senders from blocked_ set.
template <class Predicate>
void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) {
for (auto i = begin; i != end; ) {
if (p(i->second))
i = blocked_.erase(i);
else
++i;
}
}
queues& queues_;
blocked_map blocked_;
std::function<void(queue*)> has_messages_callback_;
proton::connection connection_;
};
class broker {
public:
broker(const std::string addr) :
container_(make_mt_container("mt_broker")), listener_(queues_)
{
container_->listen(addr, listener_);
std::cout << "broker listening on " << addr << std::endl;
}
void run() {
std::vector<std::thread> threads(std::thread::hardware_concurrency()-1);
for (auto& t : threads)
t = std::thread(&proton::container::run, container_.get());
container_->run(); // Use this thread too.
for (auto& t : threads)
t.join();
}
private:
struct listener : public proton::listen_handler {
listener(queues& qs) : queues_(qs) {}
proton::connection_options on_accept() OVERRIDE{
return proton::connection_options().handler(*(new broker_connection_handler(queues_)));
}
void on_error(const std::string& s) OVERRIDE {
std::cerr << "listen error: " << s << std::endl;
throw std::runtime_error(s);
}
queues& queues_;
};
queues queues_;
std::unique_ptr<proton::container> container_;
listener listener_;
};
int main(int argc, char **argv) {
// Command line options
std::string address("0.0.0.0");
example::options opts(argc, argv);
opts.add_value(address, 'a', "address", "listen on URL", "URL");
try {
opts.parse();
broker(address).run();
return 0;
} catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << "broker shutdown: " << e.what() << std::endl;
}
return 1;
}