// blob: 038fe4e9807d6636b4ea989a784bccd4471958e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "proton/io/connection_driver.hpp"
#include "proton/connection.hpp"
#include "proton/container.hpp"
#include "proton/error.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/transport.hpp"
#include "proton/uuid.hpp"
#include "proton/work_queue.hpp"
#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "msg.hpp"
#include "proton_bits.hpp"
#include <proton/connection.h>
#include <proton/transport.h>
#include <proton/event.h>
#include <algorithm>
namespace proton {
namespace io {
void connection_driver::init() {
if (pn_connection_driver_init(&driver_, pn_connection(), pn_transport()) != 0) {
this->~connection_driver(); // Dtor won't be called on throw from ctor.
throw proton::error(std::string("connection_driver allocation failed"));
}
}
// Construct a driver with a default-constructed container id and no handler,
// then initialize the underlying C driver via init().
connection_driver::connection_driver()
    : handler_(0)
{
    init();
}
// Construct a driver that remembers `id` as its container id and starts with
// no handler, then initialize the underlying C driver via init().
connection_driver::connection_driver(const std::string& id)
    : container_id_(id),
      handler_(0)
{
    init();
}
// Tear down the embedded C driver.
connection_driver::~connection_driver()
{
    pn_connection_driver_destroy(&driver_);
}
void connection_driver::configure(const connection_options& opts, bool server) {
proton::connection c(connection());
opts.apply_unbound(c);
if (server) {
pn_transport_set_server(driver_.transport);
opts.apply_unbound_server(driver_.transport);
} else {
opts.apply_unbound_client(driver_.transport);
}
pn_connection_driver_bind(&driver_);
handler_ = opts.handler();
}
void connection_driver::connect(const connection_options& opts) {
connection_options all;
all.container_id(container_id_);
all.update(opts);
configure(all, false);
connection().open();
}
void connection_driver::accept(const connection_options& opts) {
connection_options all;
all.container_id(container_id_);
all.update(opts);
configure(all, true);
}
// Report whether the driver's collector exists and peeking at it yields an
// event.
bool connection_driver::has_events() const {
    if (!driver_.collector) {
        return false;
    }
    return pn_collector_peek(driver_.collector) != 0;
}
bool connection_driver::dispatch() {
pn_event_t* c_event;
while ((c_event = pn_connection_driver_next_event(&driver_)) != NULL) {
try {
if (handler_ != 0) {
messaging_adapter::dispatch(*handler_, c_event);
}
} catch (const std::exception& e) {
pn_condition_t *cond = pn_transport_condition(driver_.transport);
if (!pn_condition_is_set(cond)) {
pn_condition_format(cond, "exception", "%s", e.what());
}
}
}
return !pn_connection_driver_finished(&driver_);
}
// Wrap the pn_rwbytes_t returned by pn_connection_driver_read_buffer() in a
// mutable_buffer (start pointer + size).
mutable_buffer connection_driver::read_buffer() {
    pn_rwbytes_t rw = pn_connection_driver_read_buffer(&driver_);
    return mutable_buffer(rw.start, rw.size);
}
// Forward `n` to pn_connection_driver_read_done().
// NOTE(review): presumably `n` is the number of bytes placed in read_buffer()
// - confirm against the C API.
void connection_driver::read_done(size_t n) {
    pn_connection_driver_read_done(&driver_, n);
}
// Close the read side of the driver via pn_connection_driver_read_close().
void connection_driver::read_close()
{
    pn_connection_driver_read_close(&driver_);
}
// Wrap the pn_bytes_t returned by pn_connection_driver_write_buffer() in a
// const_buffer (start pointer + size).
const_buffer connection_driver::write_buffer() {
    pn_bytes_t out = pn_connection_driver_write_buffer(&driver_);
    return const_buffer(out.start, out.size);
}
// Forward `n` to pn_connection_driver_write_done() and return the pn_bytes_t
// it yields as a const_buffer.
// NOTE(review): presumably `n` is the number of bytes written and the result
// is the remaining output - confirm against the C API.
const_buffer connection_driver::write_done(size_t n) {
    pn_bytes_t remaining = pn_connection_driver_write_done(&driver_, n);
    return const_buffer(remaining.start, remaining.size);
}
// Close the write side of the driver via pn_connection_driver_write_close().
void connection_driver::write_close()
{
    pn_connection_driver_write_close(&driver_);
}
// Pass `now` (in milliseconds) to pn_transport_tick() and return its result
// as a timestamp.
// NOTE(review): presumably the result is the next time tick() should be
// called - confirm against the C API.
timestamp connection_driver::tick(timestamp now) {
    timestamp next(pn_transport_tick(driver_.transport, now.milliseconds()));
    return next;
}
// Record `err` on the transport condition, unless a condition is already set,
// and then close the driver.
void connection_driver::disconnected(const proton::error_condition& err) {
    pn_condition_t* tcond = pn_transport_condition(driver_.transport);
    const bool already_set = pn_condition_is_set(tcond);
    if (!already_set) {
        set_error_condition(err, tcond);
    }
    pn_connection_driver_close(&driver_);
}
// Return a proton::connection wrapper around the driver's C connection.
proton::connection connection_driver::connection() const {
    proton::connection wrapped = make_wrapper(driver_.connection);
    return wrapped;
}
// Return a proton::transport wrapper around the driver's C transport.
proton::transport connection_driver::transport() const {
    proton::transport wrapped = make_wrapper(driver_.transport);
    return wrapped;
}
}}