blob: f15b019b4637971509d9970024236ead10537778 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "proton/io/connection_engine.hpp"
#include "proton/event_loop.hpp"
#include "proton/error.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/uuid.hpp"
#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "msg.hpp"
#include "proton_bits.hpp"
#include "proton_event.hpp"
#include <proton/connection.h>
#include <proton/transport.h>
#include <proton/event.h>
#include <algorithm>
#include <iosfwd>
namespace proton {
namespace io {
connection_engine::connection_engine() :
container_(0)
{
int err;
if ((err = pn_connection_engine_init(&c_engine_)))
throw proton::error(std::string("connection_engine init failed: ")+pn_code(err));
}
connection_engine::connection_engine(class container& cont, event_loop* loop) :
container_(&cont)
{
int err;
if ((err = pn_connection_engine_init(&c_engine_)))
throw proton::error(std::string("connection_engine init failed: ")+pn_code(err));
connection_context& ctx = connection_context::get(connection());
ctx.container = container_;
ctx.event_loop.reset(loop);
}
void connection_engine::configure(const connection_options& opts) {
proton::connection c = connection();
opts.apply_unbound(c);
opts.apply_bound(c);
handler_.reset(new messaging_adapter(*opts.handler()));
connection_context::get(connection()).collector = c_engine_.collector;
}
connection_engine::~connection_engine() {
pn_connection_engine_final(&c_engine_);
}
void connection_engine::connect(const connection_options& opts) {
connection_options all;
if (container_) {
all.container_id(container_->id());
all.update(container_->client_connection_options());
}
all.update(opts);
configure(all);
connection().open();
}
void connection_engine::accept(const connection_options& opts) {
connection_options all;
if (container_) {
all.container_id(container_->id());
all.update(container_->server_connection_options());
}
all.update(opts);
configure(all);
}
bool connection_engine::dispatch() {
pn_event_t* c_event;
while ((c_event = pn_connection_engine_dispatch(&c_engine_)) != NULL) {
proton_event cpp_event(c_event, container_);
try {
if (!!handler_) cpp_event.dispatch(*handler_);
} catch (const std::exception& e) {
disconnected(error_condition("exception", e.what()));
}
}
return !pn_connection_engine_finished(&c_engine_);
}
mutable_buffer connection_engine::read_buffer() {
pn_buf_t buffer = pn_connection_engine_read_buffer(&c_engine_);
return mutable_buffer(buffer.data, buffer.size);
}
void connection_engine::read_done(size_t n) {
return pn_connection_engine_read_done(&c_engine_, n);
}
void connection_engine::read_close() {
pn_connection_engine_read_close(&c_engine_);
}
const_buffer connection_engine::write_buffer() {
pn_cbuf_t buffer = pn_connection_engine_write_buffer(&c_engine_);
return const_buffer(buffer.data, buffer.size);
}
void connection_engine::write_done(size_t n) {
return pn_connection_engine_write_done(&c_engine_, n);
}
void connection_engine::write_close() {
pn_connection_engine_write_close(&c_engine_);
}
void connection_engine::disconnected(const proton::error_condition& err) {
pn_condition_t* condition = pn_connection_engine_condition(&c_engine_);
if (!pn_condition_is_set(condition)) // Don't overwrite existing condition
set_error_condition(err, condition);
pn_connection_engine_disconnected(&c_engine_);
}
proton::connection connection_engine::connection() const {
return make_wrapper(c_engine_.connection);
}
proton::transport connection_engine::transport() const {
return make_wrapper(c_engine_.transport);
}
proton::container* connection_engine::container() const {
return container_;
}
}}