blob: 313e675e1510fa6b2c63f27a5292423ef4061317 [file] [log] [blame]
#ifndef PROTON_IO_CONTAINER_IMPL_BASE_HPP
#define PROTON_IO_CONTAINER_IMPL_BASE_HPP
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "./link_namer.hpp"
#include "../container.hpp"
#include <exception>
#include <future>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
namespace proton {
namespace io {
/// **Experimental** - A base container implementation.
///
/// This is a thread-safe partial implementation of the
/// proton::container interface to reduce boilerplate code in
/// container implementations. Requires C++11.
///
/// You can ignore this class if you want to implement the functions
/// in a different way.
class container_impl_base : public container {
  public:
    // NOTE: inside this class the member functions named sender_options and
    // receiver_options hide the option types of the same name, which is why
    // those types are spelled with the elaborated `class ...` form below.

    /// @copydoc container::client_connection_options
    void client_connection_options(const connection_options & opts) {
        store(client_copts_, opts);
    }

    /// @copydoc container::client_connection_options
    connection_options client_connection_options() const {
        return load(client_copts_);
    }

    /// @copydoc container::server_connection_options
    void server_connection_options(const connection_options & opts) {
        store(server_copts_, opts);
    }

    /// @copydoc container::server_connection_options
    connection_options server_connection_options() const {
        return load(server_copts_);
    }

    /// @copydoc container::sender_options
    void sender_options(const class sender_options & opts) {
        store(sender_opts_, opts);
    }

    /// @copydoc container::sender_options
    class sender_options sender_options() const {
        return load(sender_opts_);
    }

    /// @copydoc container::receiver_options
    void receiver_options(const class receiver_options & opts) {
        store(receiver_opts_, opts);
    }

    /// @copydoc container::receiver_options
    class receiver_options receiver_options() const {
        return load(receiver_opts_);
    }

    /// @copydoc container::open_sender
    returned<sender> open_sender(
        const std::string &url, const class sender_options &opts, const connection_options &copts)
    {
        return open_link<sender, class sender_options>(url, opts, copts, &connection::open_sender);
    }

    /// @copydoc container::open_receiver
    returned<receiver> open_receiver(
        const std::string &url, const class receiver_options &opts, const connection_options &copts)
    {
        return open_link<receiver, class receiver_options>(url, opts, copts, &connection::open_receiver);
    }

  private:
    /// Connect to `url_str` and open a link of type T on the new connection.
    ///
    /// The link is opened by a task injected into the connection's event
    /// loop; the calling thread blocks until that task has delivered its
    /// result.
    ///
    /// @param url_str URL to connect to; its path component is passed to
    ///        `open_fn` as the link address.
    /// @param opts link options passed to `open_fn`.
    /// @param copts connection options passed to connect().
    /// @param open_fn connection member function that opens the link
    ///        (connection::open_sender or connection::open_receiver).
    /// @return the opened link, wrapped by make_thread_safe().
    /// @throw error if the event loop does not accept the task
    ///        (inject() returns false).
    /// @throw anything thrown while opening the link inside the event loop;
    ///        the exception is carried back through the promise and re-thrown
    ///        here by std::future::get().
    template<class T, class Opts>
    returned<T> open_link(
        const std::string &url_str, const Opts& opts, const connection_options& copts,
        T (connection::*open_fn)(const std::string&, const Opts&))
    {
        std::string addr = url(url_str).path();
        std::shared_ptr<thread_safe<connection> > ts_connection = connect(url_str, copts);

        // The promise is co-owned by the closure instead of being captured by
        // reference: the closure runs on another thread, and this keeps the
        // promise alive until that thread is completely done with it.
        std::shared_ptr<std::promise<returned<T> > > result_promise =
            std::make_shared<std::promise<returned<T> > >();

        // Take the future *before* handing the task to the event loop so that
        // get_future() can never run concurrently with set_value().
        std::future<returned<T> > result_future = result_promise->get_future();

        auto do_open = [ts_connection, addr, opts, open_fn, result_promise]() {
            try {
                connection c = ts_connection->unsafe();
                returned<T> s = make_thread_safe((c.*open_fn)(addr, opts));
                result_promise->set_value(s);
            } catch (...) {
                // Forward any failure to the thread blocked in get().
                result_promise->set_exception(std::current_exception());
            }
        };

        // If the task was not accepted it will never run, so waiting on the
        // future would block forever; report the failure instead.
        if (!ts_connection->event_loop()->inject(do_open))
            throw error(url_str+": connection closed");

        return result_future.get();
    }

    /// Guards the four option members below; taken by load() and store().
    mutable std::mutex lock_;

    /// Return a copy of `v` made while holding lock_.
    template <class T> T load(const T& v) const {
        std::lock_guard<std::mutex> g(lock_);
        return v;
    }

    /// Assign `x` to `v` while holding lock_.
    template <class T> void store(T& v, const T& x) {
        std::lock_guard<std::mutex> g(lock_);
        v = x;
    }

    connection_options client_copts_, server_copts_;
    class receiver_options receiver_opts_;
    class sender_options sender_opts_;
};
} // io
} // proton
#endif // PROTON_IO_CONTAINER_IMPL_BASE_HPP