blob: 5e6483fc94ae8aab77a049c099bfcbec77c82787 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "proton/io/connection_engine.hpp"
#include "proton/event_loop.hpp"
#include "proton/error.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/uuid.hpp"
#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "msg.hpp"
#include "proton_bits.hpp"
#include "proton_event.hpp"
#include <proton/connection.h>
#include <proton/transport.h>
#include <proton/event.h>
#include <algorithm>
namespace proton {
namespace io {
void connection_engine::init() {
if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) {
this->~connection_engine(); // Dtor won't be called on throw from ctor.
throw proton::error(std::string("connection_engine allocation failed"));
}
}
connection_engine::connection_engine() : handler_(0), container_(0) { init(); }
connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
init();
connection_context& ctx = connection_context::get(connection());
ctx.container = container_;
ctx.event_loop.reset(loop);
}
connection_engine::~connection_engine() {
pn_connection_engine_destroy(&engine_);
}
void connection_engine::configure(const connection_options& opts, bool server) {
proton::connection c(connection());
opts.apply_unbound(c);
if (server) pn_transport_set_server(engine_.transport);
pn_connection_engine_bind(&engine_);
opts.apply_bound(c);
handler_ = opts.handler();
connection_context::get(connection()).collector = engine_.collector;
}
void connection_engine::connect(const connection_options& opts) {
connection_options all;
if (container_) {
all.container_id(container_->id());
all.update(container_->client_connection_options());
}
all.update(opts);
configure(all, false);
connection().open();
}
void connection_engine::accept(const connection_options& opts) {
connection_options all;
if (container_) {
all.container_id(container_->id());
all.update(container_->server_connection_options());
}
all.update(opts);
configure(all, true);
}
bool connection_engine::dispatch() {
pn_event_t* c_event;
while ((c_event = pn_connection_engine_event(&engine_)) != NULL) {
proton_event cpp_event(c_event, container_);
try {
if (handler_ != 0) {
messaging_adapter adapter(*handler_);
cpp_event.dispatch(adapter);
}
} catch (const std::exception& e) {
pn_condition_t *cond = pn_transport_condition(engine_.transport);
if (!pn_condition_is_set(cond)) {
pn_condition_format(cond, "exception", "%s", e.what());
}
}
pn_connection_engine_pop_event(&engine_);
}
return !pn_connection_engine_finished(&engine_);
}
mutable_buffer connection_engine::read_buffer() {
pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_);
return mutable_buffer(buffer.start, buffer.size);
}
void connection_engine::read_done(size_t n) {
return pn_connection_engine_read_done(&engine_, n);
}
void connection_engine::read_close() {
pn_connection_engine_read_close(&engine_);
}
const_buffer connection_engine::write_buffer() {
pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_);
return const_buffer(buffer.start, buffer.size);
}
void connection_engine::write_done(size_t n) {
return pn_connection_engine_write_done(&engine_, n);
}
void connection_engine::write_close() {
pn_connection_engine_write_close(&engine_);
}
void connection_engine::disconnected(const proton::error_condition& err) {
pn_condition_t* condition = pn_transport_condition(engine_.transport);
if (!pn_condition_is_set(condition)) {
set_error_condition(err, condition);
}
pn_connection_engine_close(&engine_);
}
proton::connection connection_engine::connection() const {
return make_wrapper(engine_.connection);
}
proton::transport connection_engine::transport() const {
return make_wrapper(engine_.transport);
}
proton::container* connection_engine::container() const {
return container_;
}
}}