| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "ConnectionContext.h" |
| #include "DriverImpl.h" |
| #include "PnData.h" |
| #include "ReceiverContext.h" |
| #include "Sasl.h" |
| #include "SenderContext.h" |
| #include "SessionContext.h" |
| #include "Transaction.h" |
| #include "Transport.h" |
| #include "util.h" |
| #include "qpid/amqp/descriptors.h" |
| #include "qpid/amqp/Encoder.h" |
| #include "qpid/amqp/Descriptor.h" |
| #include "qpid/messaging/exceptions.h" |
| #include "qpid/messaging/AddressImpl.h" |
| #include "qpid/messaging/Duration.h" |
| #include "qpid/messaging/Message.h" |
| #include "qpid/messaging/MessageImpl.h" |
| #include "qpid/framing/Buffer.h" |
| #include "qpid/framing/ProtocolInitiation.h" |
| #include "qpid/framing/Uuid.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/sys/SecurityLayer.h" |
| #include "qpid/sys/SystemInfo.h" |
| #include "qpid/sys/Time.h" |
| #include "qpid/sys/Timer.h" |
| #include "qpid/sys/urlAdd.h" |
| #include "config.h" |
| #include <boost/lexical_cast.hpp> |
| #include <boost/bind.hpp> |
| #include <vector> |
| extern "C" { |
| #include <proton/engine.h> |
| } |
| |
| namespace qpid { |
| namespace messaging { |
| namespace amqp { |
| using types::Variant; |
| |
| namespace { |
| |
| void do_trace(pn_transport_t* transport, const char* message) |
| { |
| ConnectionContext* c = reinterpret_cast<ConnectionContext*>(pn_transport_get_context(transport)); |
| if (c) c->trace(message); |
| } |
| |
| void set_tracer(pn_transport_t* transport, void* context) |
| { |
| pn_transport_set_context(transport, context); |
| pn_transport_set_tracer(transport, &do_trace); |
| } |
| |
| #ifdef USE_PROTON_TRANSPORT_CONDITION |
| std::string get_error(pn_connection_t* connection, pn_transport_t* transport) |
| { |
| std::stringstream text; |
| pn_error_t* cerror = pn_connection_error(connection); |
| if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]"; |
| pn_condition_t* tcondition = pn_transport_condition(transport); |
| if (pn_condition_is_set(tcondition)) text << get_error_string(tcondition, "transport error", ": "); |
| return text.str(); |
| } |
| #else |
| std::string get_error(pn_connection_t* connection, pn_transport_t* transport) |
| { |
| std::stringstream text; |
| pn_error_t* cerror = pn_connection_error(connection); |
| if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]"; |
| pn_error_t* terror = pn_transport_error(transport); |
| if (terror) text << "transport error " << pn_error_text(terror) << " [" << terror << "]"; |
| return text.str(); |
| } |
| #endif |
| |
| class ConnectionTickerTask : public qpid::sys::TimerTask |
| { |
| qpid::sys::Timer& timer; |
| ConnectionContext& connection; |
| public: |
| ConnectionTickerTask(const qpid::sys::Duration& interval, qpid::sys::Timer& t, ConnectionContext& c) : |
| TimerTask(interval, "ConnectionTicker"), |
| timer(t), |
| connection(c) |
| {} |
| |
| void fire() { |
| QPID_LOG(debug, "ConnectionTickerTask fired"); |
| // Setup next firing |
| setupNextFire(); |
| timer.add(this); |
| |
| // Send Ticker |
| connection.activateOutput(); |
| } |
| }; |
| } |
| |
| void ConnectionContext::trace(const char* message) const |
| { |
| QPID_LOG_CAT(trace, protocol, "[" << identifier << "]: " << message); |
| } |
| |
| ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o) |
| : qpid::messaging::ConnectionOptions(o), |
| fullUrl(url, protocol.empty() ? qpid::Address::TCP : protocol), |
| engine(pn_transport()), |
| connection(pn_connection()), |
| //note: disabled read/write of header as now handled by engine |
| writeHeader(false), |
| readHeader(false), |
| haveOutput(false), |
| state(DISCONNECTED), |
| codecAdapter(*this), |
| notifyOnWrite(false) |
| { |
| // Concatenate all known URLs into a single URL, get rid of duplicate addresses. |
| sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ? |
| qpid::Address::TCP : protocol); |
| if (identifier.empty()) { |
| identifier = qpid::types::Uuid(true).str(); |
| } |
| configureConnection(); |
| } |
| |
| ConnectionContext::~ConnectionContext() |
| { |
| if (ticker) ticker->cancel(); |
| close(); |
| sessions.clear(); |
| pn_connection_free(connection); |
| pn_transport_free(engine); |
| } |
| |
| bool ConnectionContext::isOpen() const |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); |
| } |
| |
| void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| syncLH(ssn, l); |
| } |
| |
| void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&) { |
| while (!ssn->settled()) { |
| QPID_LOG(debug, "Waiting for sends to settle on sync()"); |
| wait(ssn);//wait until message has been confirmed |
| wakeupDriver(); |
| } |
| checkClosed(ssn); |
| } |
| |
| void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { |
| //explicitly release messages that have yet to be fetched |
| for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) { |
| drain_and_release_messages(ssn, i->second); |
| } |
| syncLH(ssn, l); |
| } |
| |
| if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { |
| pn_session_close(ssn->session); |
| } |
| sessions.erase(ssn->getName()); |
| |
| wakeupDriver(); |
| } |
| |
| void ConnectionContext::close() |
| { |
| sys::Monitor::ScopedLock l(lock); |
| if (state != CONNECTED) return; |
| if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { |
| for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { |
| syncLH(i->second, l); |
| if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { |
| pn_session_close(i->second->session); |
| } |
| } |
| pn_connection_close(connection); |
| wakeupDriver(); |
| //wait for close to be confirmed by peer? |
| while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) { |
| if (state == DISCONNECTED) { |
| QPID_LOG(warning, "Disconnected before close received from peer."); |
| break; |
| } |
| lock.wait(); |
| } |
| sessions.clear(); |
| } |
| if (state != DISCONNECTED) { |
| transport->close(); |
| while (state != DISCONNECTED) { |
| lock.wait(); |
| } |
| } |
| if (ticker) { |
| ticker->cancel(); |
| ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(); |
| } |
| } |
| |
| bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) |
| { |
| /** |
| * For fetch() on a receiver with zero capacity, need to reissue the |
| * credit on reconnect, so track the fetches in progress. |
| */ |
| qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching); |
| { |
| sys::Monitor::ScopedLock l(lock); |
| checkClosed(ssn, lnk); |
| if (!lnk->capacity) { |
| pn_link_flow(lnk->receiver, 1); |
| wakeupDriver(); |
| } |
| } |
| if (get(ssn, lnk, message, timeout)) { |
| return true; |
| } else { |
| { |
| sys::Monitor::ScopedLock l(lock); |
| pn_link_drain(lnk->receiver, 0); |
| wakeupDriver(); |
| while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) { |
| QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); |
| wait(ssn, lnk); |
| } |
| if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { |
| pn_link_flow(lnk->receiver, lnk->capacity); |
| } |
| } |
| if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| } |
| |
| qpid::sys::AbsTime convert(qpid::messaging::Duration timeout) |
| { |
| qpid::sys::AbsTime until; |
| uint64_t ms = timeout.getMilliseconds(); |
| if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { |
| return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC); |
| } else { |
| return qpid::sys::FAR_FUTURE; |
| } |
| } |
| |
| bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) |
| { |
| qpid::sys::AbsTime until(convert(timeout)); |
| while (true) { |
| sys::Monitor::ScopedLock l(lock); |
| checkClosed(ssn, lnk); |
| pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); |
| QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); |
| if (current && !pn_delivery_partial(current)) { |
| qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message); |
| boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current))); |
| encoded->setNestAnnotationsOption(nestAnnotations); |
| ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize()); |
| if (read < 0) throw qpid::messaging::MessagingException("Failed to read message"); |
| encoded->trim((size_t) read); |
| QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); |
| encoded->init(impl); |
| impl.setEncoded(encoded); |
| impl.setInternalId(ssn->record(current)); |
| if (lnk->capacity) { |
| pn_link_flow(lnk->receiver, 1); |
| if (lnk->wakeupToIssueCredit()) { |
| wakeupDriver(); |
| } else { |
| haveOutput = true; |
| } |
| } |
| // Automatically ack messages if we are in a transaction. |
| if (ssn->transaction) |
| acknowledgeLH(ssn, &message, false, l); |
| return true; |
| } else if (until > qpid::sys::now()) { |
| waitUntil(ssn, lnk, until); |
| } else { |
| return false; |
| } |
| } |
| return false; |
| } |
| |
| boost::shared_ptr<ReceiverContext> ConnectionContext::nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout) |
| { |
| qpid::sys::AbsTime until(convert(timeout)); |
| while (true) { |
| sys::Monitor::ScopedLock l(lock); |
| checkClosed(ssn); |
| boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); |
| if (r) { |
| return r; |
| } else if (until > qpid::sys::now()) { |
| waitUntil(ssn, until); |
| } else { |
| return boost::shared_ptr<ReceiverContext>(); |
| } |
| } |
| } |
| |
| void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { |
| sys::Monitor::ScopedLock l(lock); |
| acknowledgeLH(ssn, message, cumulative, l); |
| } |
| |
| void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&) |
| { |
| checkClosed(ssn); |
| if (message) { |
| ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); |
| } else { |
| ssn->acknowledge(); |
| } |
| wakeupDriver(); |
| } |
| |
| void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| checkClosed(ssn); |
| ssn->nack(MessageImplAccess::get(message).getInternalId(), reject); |
| wakeupDriver(); |
| } |
| |
| void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) { |
| lnk->close(); |
| } |
| wakeupDriver(); |
| while (pn_link_state(lnk->sender) & PN_REMOTE_ACTIVE) { |
| wait(ssn); |
| } |
| ssn->removeSender(lnk->getName()); |
| } |
| |
| void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) |
| { |
| pn_link_drain(lnk->receiver, 0); |
| wakeupDriver(); |
| //Not all implementations handle drain correctly, so limit the |
| //time spent waiting for it |
| qpid::sys::AbsTime until(qpid::sys::now(), qpid::sys::TIME_SEC*2); |
| while (pn_link_credit(lnk->receiver) > pn_link_queued(lnk->receiver) && until > qpid::sys::now()) { |
| QPID_LOG(debug, "Waiting for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); |
| waitUntil(ssn, lnk, until); |
| } |
| //release as yet unfetched messages: |
| for (pn_delivery_t* d = pn_link_current(lnk->receiver); d; d = pn_link_current(lnk->receiver)) { |
| pn_link_advance(lnk->receiver); |
| pn_delivery_update(d, PN_RELEASED); |
| pn_delivery_settle(d); |
| } |
| } |
| |
| void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| drain_and_release_messages(ssn, lnk); |
| if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { |
| lnk->close(); |
| } |
| wakeupDriver(); |
| while (pn_link_state(lnk->receiver) & PN_REMOTE_ACTIVE) { |
| wait(ssn); |
| } |
| ssn->removeReceiver(lnk->getName()); |
| } |
| |
| void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) |
| { |
| lnk->configure(); |
| attach(ssn, lnk->sender); |
| checkClosed(ssn, lnk); |
| lnk->verify(); |
| QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget()); |
| } |
| |
| void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) |
| { |
| lnk->configure(); |
| attach(ssn, lnk->receiver, lnk->capacity); |
| checkClosed(ssn, lnk); |
| lnk->verify(); |
| QPID_LOG(debug, "Attach succeeded from " << lnk->getSource()); |
| } |
| |
| void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, pn_link_t* link, int credit) |
| { |
| pn_link_open(link); |
| QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link)); |
| if (credit) pn_link_flow(link, credit); |
| wakeupDriver(); |
| while (pn_link_state(link) & PN_REMOTE_UNINIT) { |
| QPID_LOG(debug, "Waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link) << "..."); |
| wait(ssn); |
| } |
| } |
| |
| boost::shared_ptr<SenderContext> ConnectionContext::createSender(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| boost::shared_ptr<SenderContext> sender = session->createSender(address, setToOnSend); |
| try { |
| attach(session, sender); |
| return sender; |
| } catch (...) { |
| session->removeSender(sender->getName()); |
| throw; |
| } |
| |
| } |
| boost::shared_ptr<ReceiverContext> ConnectionContext::createReceiver(boost::shared_ptr<SessionContext> session, const qpid::messaging::Address& address) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| boost::shared_ptr<ReceiverContext> receiver = session->createReceiver(address); |
| try { |
| attach(session, receiver); |
| return receiver; |
| } catch (...) { |
| session->removeReceiver(receiver->getName()); |
| throw; |
| } |
| } |
| boost::shared_ptr<SenderContext> ConnectionContext::getSender(boost::shared_ptr<SessionContext> session, const std::string& name) const |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return session->getSender(name); |
| } |
| |
| boost::shared_ptr<ReceiverContext> ConnectionContext::getReceiver(boost::shared_ptr<SessionContext> session, const std::string& name) const |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return session->getReceiver(name); |
| } |
| |
| void ConnectionContext::send( |
| boost::shared_ptr<SessionContext> ssn, |
| boost::shared_ptr<SenderContext> snd, |
| const qpid::messaging::Message& message, |
| bool sync, |
| SenderContext::Delivery** delivery) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| sendLH(ssn, snd, message, sync, delivery, l); |
| } |
| |
| void ConnectionContext::sendLH( |
| boost::shared_ptr<SessionContext> ssn, |
| boost::shared_ptr<SenderContext> snd, |
| const qpid::messaging::Message& message, |
| bool sync, |
| SenderContext::Delivery** delivery, |
| sys::Monitor::ScopedLock&) |
| { |
| checkClosed(ssn); |
| while (pn_transport_pending(engine) > 65536) { |
| QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written..."); |
| notifyOnWrite = true; |
| wakeupDriver(); |
| wait(ssn, snd); |
| notifyOnWrite = false; |
| } |
| while (!snd->send(message, delivery)) { |
| QPID_LOG(debug, "Waiting for capacity..."); |
| if (pn_transport_pending(engine)) wakeupDriver(); |
| wait(ssn, snd);//wait for capacity |
| } |
| wakeupDriver(); |
| if (sync && *delivery) { |
| while (!(*delivery)->delivered()) { |
| QPID_LOG(debug, "Waiting for confirmation..."); |
| wait(ssn, snd);//wait until message has been confirmed |
| } |
| if ((*delivery)->rejected()) { |
| throw MessageRejected("Message was rejected by peer"); |
| } |
| |
| } |
| } |
| |
| void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| sender->setCapacity(capacity); |
| } |
| uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return sender->getCapacity(); |
| } |
| uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return sender->getUnsettled(); |
| } |
| |
| void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| receiver->setCapacity(capacity); |
| pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); |
| wakeupDriver(); |
| } |
| uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return receiver->getCapacity(); |
| } |
| uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return receiver->getAvailable(); |
| } |
| uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return receiver->getUnsettled(); |
| } |
| |
| void ConnectionContext::activateOutput() |
| { |
| sys::Monitor::ScopedLock l(lock); |
| if (state == CONNECTED) wakeupDriver(); |
| } |
| /** |
| * Expects lock to be held by caller |
| */ |
| void ConnectionContext::wakeupDriver() |
| { |
| switch (state) { |
| case CONNECTED: |
| haveOutput = true; |
| transport->activateOutput(); |
| QPID_LOG(debug, "wakeupDriver()"); |
| break; |
| case DISCONNECTED: |
| case CONNECTING: |
| QPID_LOG(error, "wakeupDriver() called while not connected"); |
| break; |
| } |
| } |
| |
| namespace { |
| pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; |
| pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED; |
| } |
| |
| void ConnectionContext::reset() |
| { |
| pn_connection_free(connection); |
| pn_transport_free(engine); |
| |
| engine = pn_transport(); |
| connection = pn_connection(); |
| configureConnection(); |
| |
| for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { |
| i->second->reset(connection); |
| } |
| } |
| |
| bool ConnectionContext::check() { |
| if (checkDisconnected()) { |
| if (ConnectionOptions::reconnect) { |
| QPID_LOG(notice, "Auto-reconnecting to " << fullUrl); |
| autoconnect(); |
| QPID_LOG(notice, "Auto-reconnected to " << currentUrl); |
| } else { |
| throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| bool ConnectionContext::checkDisconnected() { |
| if (state == DISCONNECTED) { |
| reset(); |
| } else { |
| if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { |
| std::string text = get_error_string(pn_connection_remote_condition(connection), "Connection closed by peer"); |
| pn_connection_close(connection); |
| throw qpid::messaging::ConnectionError(text); |
| } |
| } |
| return state == DISCONNECTED; |
| } |
| |
| void ConnectionContext::wait() |
| { |
| if (check()) return; // Reconnected, may need to re-test condition. |
| lock.wait(); |
| check(); |
| } |
| void ConnectionContext::waitUntil(qpid::sys::AbsTime until) |
| { |
| lock.wait(until); |
| check(); |
| } |
| void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn) |
| { |
| wait(); |
| checkClosed(ssn); |
| } |
| void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) |
| { |
| wait(); |
| checkClosed(ssn, lnk); |
| } |
| void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) |
| { |
| wait(); |
| checkClosed(ssn, lnk); |
| } |
| void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, qpid::sys::AbsTime until) |
| { |
| waitUntil(until); |
| checkClosed(ssn); |
| } |
| void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::sys::AbsTime until) |
| { |
| waitUntil(until); |
| checkClosed(ssn, lnk); |
| } |
| void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk, qpid::sys::AbsTime until) |
| { |
| waitUntil(until); |
| checkClosed(ssn, lnk); |
| } |
| void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) |
| { |
| check(); |
| ssn->error.raise(); |
| if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { |
| std::string text = get_error_string(pn_session_remote_condition(ssn->session), "Session ended by peer"); |
| pn_session_close(ssn->session); |
| throw qpid::messaging::SessionError(text); |
| } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) { |
| throw qpid::messaging::SessionClosed(); |
| } |
| } |
| |
| bool ConnectionContext::isClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) |
| { |
| try { |
| checkClosed(ssn, lnk->receiver); |
| return false; |
| } catch (const LinkError&) { |
| return true; |
| } |
| } |
| void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) |
| { |
| checkClosed(ssn, lnk->receiver); |
| } |
| void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) |
| { |
| checkClosed(ssn, lnk->sender); |
| } |
| void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk) |
| { |
| checkClosed(ssn); |
| if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { |
| pn_condition_t* error = pn_link_remote_condition(lnk); |
| std::string text = get_error_string(error, "Link detached by peer"); |
| pn_link_close(lnk); |
| std::string name = pn_condition_get_name(error); |
| if (name == qpid::amqp::error_conditions::NOT_FOUND) { |
| throw qpid::messaging::NotFound(text); |
| } else if (name == qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS) { |
| throw qpid::messaging::UnauthorizedAccess(text); |
| } else { |
| throw qpid::messaging::LinkError(text); |
| } |
| } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) { |
| throw qpid::messaging::LinkError("Link is not attached"); |
| } |
| } |
| |
| void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) |
| { |
| if (s->error) return; |
| pn_session_open(s->session); |
| wakeupDriver(); |
| while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { |
| wait(); |
| } |
| |
| for (SessionContext::SenderMap::iterator i = s->senders.begin(); i != s->senders.end(); ++i) { |
| QPID_LOG(debug, id << " reattaching sender " << i->first); |
| attach(s, i->second->sender); |
| i->second->verify(); |
| QPID_LOG(debug, id << " sender " << i->first << " reattached"); |
| i->second->resend(); |
| } |
| for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) { |
| QPID_LOG(debug, id << " reattaching receiver " << i->first); |
| if (i->second->capacity) { |
| attach(s, i->second->receiver, i->second->capacity); |
| } else { |
| attach(s, i->second->receiver, (uint32_t) i->second->fetching); |
| } |
| i->second->verify(); |
| QPID_LOG(debug, id << " receiver " << i->first << " reattached"); |
| } |
| wakeupDriver(); |
| } |
| |
| boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) |
| { |
| boost::shared_ptr<SessionContext> session; |
| std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; |
| { |
| sys::Monitor::ScopedLock l(lock); |
| SessionMap::const_iterator i = sessions.find(name); |
| if (i == sessions.end()) { |
| session = boost::shared_ptr<SessionContext>(new SessionContext(connection)); |
| session->setName(name); |
| pn_session_open(session->session); |
| wakeupDriver(); |
| sessions[name] = session; // Add it now so it will be restarted if we reconnect in wait() |
| while (pn_session_state(session->session) & PN_REMOTE_UNINIT) { |
| wait(); |
| } |
| } else { |
| throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); |
| } |
| |
| } |
| if (transactional) { // Outside of lock |
| startTxSession(session); |
| } |
| return session; |
| } |
| |
| boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const |
| { |
| SessionMap::const_iterator i = sessions.find(name); |
| if (i == sessions.end()) { |
| throw qpid::messaging::KeyError(std::string("No such session") + name); |
| } else { |
| return i->second; |
| } |
| } |
| |
| void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value) |
| { |
| set(name, value); |
| } |
| |
| std::string ConnectionContext::getAuthenticatedUsername() |
| { |
| return sasl.get() ? sasl->getAuthenticatedUsername() : std::string(); |
| } |
| |
| std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| QPID_LOG(trace, id << " decode(" << size << ")"); |
| if (readHeader) { |
| size_t decoded = readProtocolHeader(buffer, size); |
| if (decoded < size) { |
| decoded += decode(buffer + decoded, size - decoded); |
| } |
| return decoded; |
| } |
| |
| //TODO: Fix pn_engine_input() to take const buffer |
| ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size); |
| if (n > 0 || n == PN_EOS) { |
| // PN_EOS either means we received a Close (which also means we've |
| // consumed all the input), OR some Very Bad Thing happened and this |
| // connection is toast. |
| if (n == PN_EOS) |
| { |
| std::string error; |
| if (checkTransportError(error)) { |
| // "He's dead, Jim." |
| QPID_LOG_CAT(error, network, id << " connection failed: " << error); |
| transport->abort(); |
| return 0; |
| } else { |
| n = size; // assume all consumed |
| } |
| } |
| QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) |
| // QPID-6698: don't use wallclock here, use monotonic clock |
| int64_t now = qpid::sys::Duration(qpid::sys::ZERO, qpid::sys::AbsTime::now()); |
| pn_transport_tick(engine, now / int64_t(qpid::sys::TIME_MSEC)); |
| lock.notifyAll(); |
| return n; |
| } else if (n == PN_ERR) { |
| std::string error; |
| checkTransportError(error); |
| QPID_LOG_CAT(error, network, id << " connection error: " << error); |
| transport->abort(); |
| return 0; |
| } else { |
| return 0; |
| } |
| |
| } |
| std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| QPID_LOG(trace, id << " encode(" << size << ")"); |
| if (writeHeader) { |
| size_t encoded = writeProtocolHeader(buffer, size); |
| if (encoded < size) { |
| encoded += encode(buffer + encoded, size - encoded); |
| } |
| return encoded; |
| } |
| |
| ssize_t n = pn_transport_output(engine, buffer, size); |
| if (n > 0) { |
| QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) |
| haveOutput = true; |
| if (notifyOnWrite) lock.notifyAll(); |
| if (ticker) ticker->restart(); |
| return n; |
| } else if (n == PN_ERR) { |
| std::string error; |
| checkTransportError(error); |
| QPID_LOG_CAT(error, network, id << " connection error: " << error); |
| transport->abort(); |
| return 0; |
| } else if (n == PN_EOS) { |
| haveOutput = false; |
| // Normal close, or error? |
| std::string error; |
| if (checkTransportError(error)) { |
| QPID_LOG_CAT(error, network, id << " connection failed: " << error); |
| transport->abort(); |
| } |
| return 0; |
| } else { |
| haveOutput = false; |
| return 0; |
| } |
| } |
| bool ConnectionContext::canEncodePlain() |
| { |
| sys::Monitor::ScopedLock l(lock); |
| |
| // QPID-6698: don't use wallclock here, use monotonic clock |
| int64_t now = qpid::sys::Duration(qpid::sys::ZERO, qpid::sys::AbsTime::now()); |
| pn_transport_tick(engine, now / int64_t(qpid::sys::TIME_MSEC)); |
| return (haveOutput || pn_transport_pending(engine)) && state == CONNECTED; |
| } |
| void ConnectionContext::closed() |
| { |
| sys::Monitor::ScopedLock l(lock); |
| state = DISCONNECTED; |
| lock.notifyAll(); |
| } |
| void ConnectionContext::opened() |
| { |
| sys::Monitor::ScopedLock l(lock); |
| state = CONNECTED; |
| lock.notifyAll(); |
| } |
| bool ConnectionContext::isClosed() const |
| { |
| return !isOpen(); |
| } |
| namespace { |
| qpid::framing::ProtocolVersion AMQP_1_0_PLAIN(1,0,qpid::framing::ProtocolVersion::AMQP); |
| } |
| |
| std::string ConnectionContext::getError() |
| { |
| return get_error(connection, engine); |
| } |
| |
| framing::ProtocolVersion ConnectionContext::getVersion() const |
| { |
| return AMQP_1_0_PLAIN; |
| } |
| |
| std::size_t ConnectionContext::readProtocolHeader(const char* buffer, std::size_t size) |
| { |
| framing::ProtocolInitiation pi(getVersion()); |
| if (size >= pi.encodedSize()) { |
| readHeader = false; |
| qpid::framing::Buffer out(const_cast<char*>(buffer), size); |
| pi.decode(out); |
| QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi); |
| return pi.encodedSize(); |
| } else { |
| return 0; |
| } |
| } |
| std::size_t ConnectionContext::writeProtocolHeader(char* buffer, std::size_t size) |
| { |
| framing::ProtocolInitiation pi(getVersion()); |
| if (size >= pi.encodedSize()) { |
| QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi); |
| writeHeader = false; |
| qpid::framing::Buffer out(buffer, size); |
| pi.encode(out); |
| return pi.encodedSize(); |
| } else { |
| QPID_LOG_CAT(debug, protocol, id << " insufficient buffer for protocol header: " << size) |
| return 0; |
| } |
| } |
| bool ConnectionContext::useSasl() |
| { |
| return !(mechanism == "none" || mechanism == "NONE" || mechanism == "None"); |
| } |
| |
| qpid::sys::Codec& ConnectionContext::getCodec() |
| { |
| return *this; |
| } |
| |
| const qpid::messaging::ConnectionOptions* ConnectionContext::getOptions() |
| { |
| return this; |
| } |
| |
| std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| size_t decoded = 0; |
| try { |
| if (sasl.get() && !sasl->authenticated()) { |
| decoded = sasl->decode(buffer, size); |
| if (!sasl->authenticated()) return decoded; |
| } |
| if (decoded < size) { |
| if (sasl.get() && sasl->getSecurityLayer()) decoded += sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded); |
| else decoded += decodePlain(buffer+decoded, size-decoded); |
| } |
| } catch (const AuthenticationFailure&) { |
| transport->close(); |
| } |
| return decoded; |
| } |
| std::size_t ConnectionContext::encode(char* buffer, std::size_t size) |
| { |
| sys::Monitor::ScopedLock l(lock); |
| size_t encoded = 0; |
| try { |
| if (sasl.get() && sasl->canEncode()) { |
| encoded += sasl->encode(buffer, size); |
| if (!sasl->authenticated()) return encoded; |
| } |
| if (encoded < size) { |
| if (sasl.get() && sasl->getSecurityLayer()) encoded += sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded); |
| else encoded += encodePlain(buffer+encoded, size-encoded); |
| } |
| } catch (const AuthenticationFailure&) { |
| transport->close(); |
| } |
| return encoded; |
| } |
| bool ConnectionContext::canEncode() |
| { |
| sys::Monitor::ScopedLock l(lock); |
| if (sasl.get()) { |
| try { |
| if (sasl->canEncode()) return true; |
| else if (!sasl->authenticated()) return false; |
| else if (sasl->getSecurityLayer()) return sasl->getSecurityLayer()->canEncode(); |
| } catch (const AuthenticationFailure&) { |
| transport->close(); |
| return false; |
| } |
| } |
| return canEncodePlain(); |
| } |
| |
| namespace { |
| const std::string CLIENT_PROCESS_NAME("qpid.client_process"); |
| const std::string CLIENT_PID("qpid.client_pid"); |
| const std::string CLIENT_PPID("qpid.client_ppid"); |
| } |
| void ConnectionContext::setProperties() |
| { |
| PnData data(pn_connection_properties(connection)); |
| pn_data_put_map(data.data); |
| pn_data_enter(data.data); |
| data.putSymbol(CLIENT_PROCESS_NAME); |
| data.putSymbol(sys::SystemInfo::getProcessName()); |
| data.putSymbol(CLIENT_PID); |
| data.put(int32_t(sys::SystemInfo::getProcessId())); |
| data.putSymbol(CLIENT_PPID); |
| data.put(int32_t(sys::SystemInfo::getParentProcessId())); |
| for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) |
| { |
| data.putSymbol(i->first); |
| data.put(i->second); |
| } |
| pn_data_exit(data.data); |
| } |
| |
| const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings() |
| { |
| return transport ? transport->getSecuritySettings() : 0; |
| } |
| |
| void ConnectionContext::open() |
| { |
| sys::Monitor::ScopedLock l(lock); |
| if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); |
| if (!driver) driver = DriverImpl::getDefault(); |
| QPID_LOG(info, "Starting connection to " << fullUrl); |
| autoconnect(); |
| } |
| |
| |
| namespace { |
| double FOREVER(std::numeric_limits<double>::max()); |
| bool expired(const sys::AbsTime& start, double timeout) |
| { |
| if (timeout == 0) return true; |
| if (timeout == FOREVER) return false; |
| qpid::sys::Duration used(start, qpid::sys::now()); |
| qpid::sys::Duration allowed((int64_t)(timeout*qpid::sys::TIME_SEC)); |
| return allowed < used; |
| } |
| const std::string COLON(":"); |
| } |
| |
| void throwConnectFail(const Url& url, const std::string& msg) { |
| throw qpid::messaging::TransportFailure( |
| Msg() << "Connect failed to " << url << ": " << msg); |
| } |
| |
| void ConnectionContext::autoconnect() |
| { |
| qpid::sys::AbsTime started(qpid::sys::now()); |
| for (double i = minReconnectInterval; !tryConnectUrl(fullUrl); i = std::min(i*2, maxReconnectInterval)) { |
| if (!ConnectionOptions::reconnect) throwConnectFail(fullUrl, "Reconnect disabled"); |
| if (limit >= 0 && retries++ >= limit) throwConnectFail(fullUrl, "Exceeded retries"); |
| if (expired(started, timeout)) throwConnectFail(fullUrl, "Exceeded timeout"); |
| QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds to" |
| << fullUrl); |
| qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. |
| } |
| retries = 0; |
| } |
| |
| void ConnectionContext::reconnect(const Url& url) { |
| QPID_LOG(notice, "Reconnecting to " << url); |
| sys::Monitor::ScopedLock l(lock); |
| if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); |
| if (!driver) driver = DriverImpl::getDefault(); |
| reset(); |
| if (!tryConnectUrl(url)) throwConnectFail(url, "Failed to reconnect"); |
| QPID_LOG(notice, "Reconnected to " << currentUrl); |
| } |
| |
| void ConnectionContext::reconnect(const std::string& url) { reconnect(Url(url)); } |
| |
| void ConnectionContext::reconnect() { reconnect(fullUrl); } |
| |
| void ConnectionContext::waitNoReconnect() { |
| if (!checkDisconnected()) { |
| lock.wait(); |
| checkDisconnected(); |
| } |
| } |
| |
| // Try to connect to a URL, i.e. try to connect to each of its addresses in turn |
| // till one succeeds or they all fail. |
| // @return true if we connect successfully |
| bool ConnectionContext::tryConnectUrl(const Url& url) |
| { |
| if (url.getUser().size()) username = url.getUser(); |
| if (url.getPass().size()) password = url.getPass(); |
| |
| for (Url::const_iterator i = url.begin(); i != url.end(); ++i) { |
| QPID_LOG(info, "Connecting to " << *i); |
| if (tryConnectAddr(*i) && tryOpenAddr(*i)) { |
| QPID_LOG(info, "Connected to " << *i); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| // Try to open an AMQP protocol connection on an address, after we have already |
| // established a transport connect (see tryConnectAddr below) |
| // @return true if the AMQP connection is succesfully opened. |
| bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { |
| currentUrl = Url(addr); |
| if (sasl.get()) { |
| wakeupDriver(); |
| while (!sasl->authenticated() && state != DISCONNECTED) { |
| QPID_LOG(debug, id << " Waiting to be authenticated..."); |
| waitNoReconnect(); |
| } |
| if (state == DISCONNECTED) return false; |
| QPID_LOG(debug, id << " Authenticated"); |
| } |
| |
| QPID_LOG(debug, id << " Opening..."); |
| pn_connection_open(connection); |
| wakeupDriver(); //want to write |
| while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) && |
| state != DISCONNECTED) |
| waitNoReconnect(); |
| if (state == DISCONNECTED) return false; |
| if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { |
| throw qpid::messaging::ConnectionError("Failed to open connection"); |
| } |
| |
| // Connection open - check for idle timeout from the remote and start a |
| // periodic tick to monitor for idle connections |
| pn_timestamp_t remote = pn_transport_get_remote_idle_timeout(engine); |
| pn_timestamp_t local = pn_transport_get_idle_timeout(engine); |
| uint64_t shortest = ((remote && local) |
| ? std::min(remote, local) |
| : (remote) ? remote : local); |
| if (shortest) { |
| // send an idle frame at least twice before timeout |
| shortest = (shortest + 1)/2; |
| qpid::sys::Duration d(shortest * qpid::sys::TIME_MSEC); |
| ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(d, driver->getTimer(), *this)); |
| driver->getTimer().add(ticker); |
| QPID_LOG(debug, id << " AMQP 1.0 idle-timeout set:" |
| << " local=" << pn_transport_get_idle_timeout(engine) |
| << " remote=" << pn_transport_get_remote_idle_timeout(engine)); |
| } |
| |
| QPID_LOG(debug, id << " Opened"); |
| |
| return restartSessions(); |
| } |
| |
| std::string ConnectionContext::getUrl() const |
| { |
| sys::Monitor::ScopedLock l(lock); |
| return (state == CONNECTED) ? currentUrl.str() : std::string(); |
| } |
| |
| // Try to establish a transport connect to an individual address (typically a |
| // TCP host:port) |
| // @return true if we succeed in connecting. |
| bool ConnectionContext::tryConnectAddr(const qpid::Address& address) |
| { |
| transport = driver->getTransport(address.protocol, *this); |
| id = boost::lexical_cast<std::string>(address); |
| if (useSasl()) { |
| sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host)); |
| } |
| state = CONNECTING; |
| try { |
| QPID_LOG(debug, id << " Connecting ..."); |
| transport->connect(address.host, boost::lexical_cast<std::string>(address.port)); |
| bool waiting(true); |
| while (waiting) { |
| switch (state) { |
| case CONNECTED: |
| QPID_LOG(debug, id << " Connected"); |
| return true; |
| case CONNECTING: |
| lock.wait(); |
| break; |
| case DISCONNECTED: |
| waiting = false; |
| break; |
| } |
| } |
| } catch (const std::exception& e) { |
| QPID_LOG(info, id << " Error while connecting: " << e.what()); |
| state = DISCONNECTED; |
| } |
| transport = boost::shared_ptr<Transport>(); |
| return false; |
| } |
| |
| bool ConnectionContext::restartSessions() |
| { |
| try { |
| for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { |
| restartSession(i->second); |
| } |
| return true; |
| } catch (const qpid::TransportFailure& e) { |
| QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what()); |
| return false; |
| } |
| } |
| |
| void ConnectionContext::initSecurityLayer(qpid::sys::SecurityLayer& s) |
| { |
| s.init(&codecAdapter); |
| } |
| |
| ConnectionContext::CodecAdapter::CodecAdapter(ConnectionContext& c) : context(c) {} |
| std::size_t ConnectionContext::CodecAdapter::decode(const char* buffer, std::size_t size) |
| { |
| return context.decodePlain(buffer, size); |
| } |
| std::size_t ConnectionContext::CodecAdapter::encode(char* buffer, std::size_t size) |
| { |
| return context.encodePlain(buffer, size); |
| } |
| bool ConnectionContext::CodecAdapter::canEncode() |
| { |
| return context.canEncodePlain(); |
| } |
| |
| void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> session) { |
| try { |
| QPID_LOG(debug, id << " attaching transaction for " << session->getName()); |
| boost::shared_ptr<Transaction> tx(new Transaction(session->session)); |
| session->transaction = tx; |
| { |
| sys::Monitor::ScopedLock l(lock); |
| attach(session, boost::shared_ptr<SenderContext>(tx)); |
| } |
| tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, _4, _5), session); |
| } catch (const Exception& e) { |
| throw TransactionError(Msg() << "Cannot start transaction: " << e.what()); |
| } |
| } |
| |
| void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, bool fail) { |
| { |
| sys::Monitor::ScopedLock l(lock); |
| checkClosed(session); |
| if (!session->transaction) |
| throw TransactionError("No Transaction"); |
| Transaction::SendFunction sendFn = boost::bind( |
| &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, boost::ref(l)); |
| syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers have been received. |
| session->transaction->discharge(sendFn, session, fail); |
| session->transaction->declare(sendFn, session); |
| } |
| } |
| |
| void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) { |
| discharge(session, false); |
| } |
| |
| void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) { |
| discharge(session, true); |
| } |
| |
| |
| // setup the transport and connection objects: |
| void ConnectionContext::configureConnection() |
| { |
| pn_connection_set_container(connection, identifier.c_str()); |
| setProperties(); |
| if (heartbeat) { |
| // fail an idle connection at 2 x heartbeat (in msecs) |
| pn_transport_set_idle_timeout(engine, heartbeat*2*1000); |
| } |
| |
| bool enableTrace(false); |
| QPID_LOG_TEST_CAT(trace, protocol, enableTrace); |
| if (enableTrace) { |
| pn_transport_trace(engine, PN_TRACE_FRM); |
| set_tracer(engine, this); |
| } |
| |
| int err = pn_transport_bind(engine, connection); |
| if (err) |
| QPID_LOG(error, id << " Error binding connection and transport: " << err); |
| } |
| |
| |
| // check for failures of the transport: |
| bool ConnectionContext::checkTransportError(std::string& text) |
| { |
| std::stringstream info; |
| |
| #ifdef USE_PROTON_TRANSPORT_CONDITION |
| pn_condition_t* tcondition = pn_transport_condition(engine); |
| if (pn_condition_is_set(tcondition)) |
| info << get_error_string(tcondition, "transport error", ": "); |
| #else |
| pn_error_t* terror = pn_transport_error(engine); |
| if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]"; |
| #endif |
| |
| text = info.str(); |
| return !text.empty(); |
| } |
| }}} // namespace qpid::messaging::amqp |