| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "ConnectionImpl.h" |
| #include "SessionImpl.h" |
| #include "qpid/messaging/exceptions.h" |
| #include "qpid/messaging/Session.h" |
| #include "qpid/messaging/PrivateImplRef.h" |
| #include "qpid/framing/Uuid.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/Url.h" |
| #include "qpid/amqp_0_10/Codecs.h" |
| #include <boost/intrusive_ptr.hpp> |
| #include <vector> |
| #include <sstream> |
| #include <limits> |
| |
| namespace qpid { |
| namespace client { |
| namespace amqp0_10 { |
| |
| using qpid::types::Variant; |
| using qpid::types::VAR_LIST; |
| using qpid::framing::Uuid; |
| |
| namespace { |
| |
| const std::string TCP("tcp"); |
| double FOREVER(std::numeric_limits<double>::max()); |
| |
| // Time values in seconds can be specified as integer or floating point values. |
| double timeValue(const Variant& value) { |
| if (types::isIntegerType(value.getType())) |
| return double(value.asInt64()); |
| return value.asDouble(); |
| } |
| |
| void merge(const std::string& value, std::vector<std::string>& list) { |
| if (std::find(list.begin(), list.end(), value) == list.end()) |
| list.push_back(value); |
| } |
| |
| void merge(const Variant::List& from, std::vector<std::string>& to) |
| { |
| for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) |
| merge(i->asString(), to); |
| } |
| |
| std::string asString(const std::vector<std::string>& v) { |
| std::stringstream os; |
| os << "["; |
| for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) { |
| if (i != v.begin()) os << ", "; |
| os << *i; |
| } |
| os << "]"; |
| return os.str(); |
| } |
| |
| bool expired(const sys::AbsTime& start, double timeout) |
| { |
| if (timeout == 0) return true; |
| if (timeout == FOREVER) return false; |
| sys::Duration used(start, sys::now()); |
| sys::Duration allowed((int64_t)(timeout*sys::TIME_SEC)); |
| return allowed < used; |
| } |
| |
| } // namespace |
| |
| ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : |
| replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), |
| minReconnectInterval(0.001), maxReconnectInterval(2), |
| retries(0), reconnectOnLimitExceeded(true) |
| { |
| setOptions(options); |
| urls.insert(urls.begin(), url); |
| QPID_LOG(debug, "Created connection " << url << " with " << options); |
| } |
| |
| void ConnectionImpl::setOptions(const Variant::Map& options) |
| { |
| for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { |
| setOption(i->first, i->second); |
| } |
| } |
| |
| void ConnectionImpl::setOption(const std::string& name, const Variant& value) |
| { |
| sys::Mutex::ScopedLock l(lock); |
| if (name == "reconnect") { |
| reconnect = value; |
| } else if (name == "reconnect-timeout" || name == "reconnect_timeout") { |
| timeout = timeValue(value); |
| } else if (name == "reconnect-limit" || name == "reconnect_limit") { |
| limit = value; |
| } else if (name == "reconnect-interval" || name == "reconnect_interval") { |
| maxReconnectInterval = minReconnectInterval = timeValue(value); |
| } else if (name == "reconnect-interval-min" || name == "reconnect_interval_min") { |
| minReconnectInterval = timeValue(value); |
| } else if (name == "reconnect-interval-max" || name == "reconnect_interval_max") { |
| maxReconnectInterval = timeValue(value); |
| } else if (name == "reconnect-urls-replace" || name == "reconnect_urls_replace") { |
| replaceUrls = value.asBool(); |
| } else if (name == "reconnect-urls" || name == "reconnect_urls") { |
| if (replaceUrls) urls.clear(); |
| if (value.getType() == VAR_LIST) { |
| merge(value.asList(), urls); |
| } else { |
| merge(value.asString(), urls); |
| } |
| } else if (name == "username") { |
| settings.username = value.asString(); |
| } else if (name == "password") { |
| settings.password = value.asString(); |
| } else if (name == "sasl-mechanism" || name == "sasl_mechanism" || |
| name == "sasl-mechanisms" || name == "sasl_mechanisms") { |
| settings.mechanism = value.asString(); |
| } else if (name == "sasl-service" || name == "sasl_service") { |
| settings.service = value.asString(); |
| } else if (name == "sasl-min-ssf" || name == "sasl_min_ssf") { |
| settings.minSsf = value; |
| } else if (name == "sasl-max-ssf" || name == "sasl_max_ssf") { |
| settings.maxSsf = value; |
| } else if (name == "heartbeat") { |
| settings.heartbeat = value; |
| } else if (name == "tcp-nodelay" || name == "tcp_nodelay") { |
| settings.tcpNoDelay = value; |
| } else if (name == "locale") { |
| settings.locale = value.asString(); |
| } else if (name == "max-channels" || name == "max_channels") { |
| settings.maxChannels = value; |
| } else if (name == "max-frame-size" || name == "max_frame_size") { |
| settings.maxFrameSize = value; |
| } else if (name == "bounds") { |
| settings.bounds = value; |
| } else if (name == "transport") { |
| settings.protocol = value.asString(); |
| } else if (name == "ssl-cert-name" || name == "ssl_cert_name") { |
| settings.sslCertName = value.asString(); |
| } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { |
| reconnectOnLimitExceeded = value; |
| } else if (name == "client-properties") { |
| amqp_0_10::translate(value.asMap(), settings.clientProperties); |
| } else { |
| throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); |
| } |
| } |
| |
| |
| void ConnectionImpl::close() |
| { |
| while(true) { |
| messaging::Session session; |
| { |
| qpid::sys::Mutex::ScopedLock l(lock); |
| if (sessions.empty()) break; |
| session = sessions.begin()->second; |
| } |
| session.close(); |
| } |
| detach(); |
| } |
| |
| void ConnectionImpl::detach() |
| { |
| qpid::sys::Mutex::ScopedLock l(lock); |
| connection.close(); |
| } |
| |
| bool ConnectionImpl::isOpen() const |
| { |
| qpid::sys::Mutex::ScopedLock l(lock); |
| return connection.isOpen(); |
| } |
| |
| boost::intrusive_ptr<SessionImpl> getImplPtr(qpid::messaging::Session& session) |
| { |
| return boost::dynamic_pointer_cast<SessionImpl>( |
| qpid::messaging::PrivateImplRef<qpid::messaging::Session>::get(session) |
| ); |
| } |
| |
| void ConnectionImpl::closed(SessionImpl& s) |
| { |
| qpid::sys::Mutex::ScopedLock l(lock); |
| for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { |
| if (getImplPtr(i->second).get() == &s) { |
| sessions.erase(i); |
| break; |
| } |
| } |
| } |
| |
| qpid::messaging::Session ConnectionImpl::getSession(const std::string& name) const |
| { |
| qpid::sys::Mutex::ScopedLock l(lock); |
| Sessions::const_iterator i = sessions.find(name); |
| if (i == sessions.end()) { |
| throw qpid::messaging::KeyError("No such session: " + name); |
| } else { |
| return i->second; |
| } |
| } |
| |
| qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const std::string& n) |
| { |
| std::string name = n.empty() ? Uuid(true).str() : n; |
| qpid::messaging::Session impl(new SessionImpl(*this, transactional)); |
| while (true) { |
| try { |
| getImplPtr(impl)->setSession(connection.newSession(name)); |
| qpid::sys::Mutex::ScopedLock l(lock); |
| sessions[name] = impl; |
| break; |
| } catch (const qpid::TransportFailure&) { |
| reopen(); |
| } catch (const qpid::SessionException& e) { |
| throw qpid::messaging::SessionError(e.what()); |
| } catch (const std::exception& e) { |
| throw qpid::messaging::MessagingException(e.what()); |
| } |
| } |
| return impl; |
| } |
| |
| void ConnectionImpl::open() |
| { |
| qpid::sys::AbsTime start = qpid::sys::now(); |
| qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); |
| try { |
| if (!connection.isOpen()) connect(start); |
| } |
| catch (const types::Exception&) { throw; } |
| catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); } |
| } |
| |
| void ConnectionImpl::reopen() |
| { |
| if (!reconnect) { |
| throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); |
| } |
| open(); |
| } |
| |
| |
| void ConnectionImpl::connect(const qpid::sys::AbsTime& started) |
| { |
| QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); |
| for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { |
| if (!reconnect) { |
| throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); |
| } |
| if (limit >= 0 && retries++ >= limit) { |
| throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit"); |
| } |
| if (expired(started, timeout)) { |
| throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); |
| } |
| QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls=" |
| << asString(urls)); |
| qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. |
| } |
| QPID_LOG(debug, "Connection successful, urls=" << asString(urls)); |
| retries = 0; |
| } |
| |
| void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) { |
| for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i) |
| merge(i->str(), urls); |
| QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls)); |
| } |
| |
| bool ConnectionImpl::tryConnect() |
| { |
| sys::Mutex::ScopedLock l(lock); |
| for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { |
| try { |
| QPID_LOG(info, "Trying to connect to " << *i << "..."); |
| Url url(*i, settings.protocol.size() ? settings.protocol : TCP); |
| if (url.getUser().size()) settings.username = url.getUser(); |
| if (url.getPass().size()) settings.password = url.getPass(); |
| connection.open(url, settings); |
| QPID_LOG(info, "Connected to " << *i); |
| mergeUrls(connection.getInitialBrokers(), l); |
| return resetSessions(l); |
| } catch (const qpid::TransportFailure& e) { |
| QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); |
| } |
| } |
| return false; |
| } |
| |
| bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& ) |
| { |
| try { |
| qpid::sys::Mutex::ScopedLock l(lock); |
| for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { |
| getImplPtr(i->second)->setSession(connection.newSession(i->first)); |
| } |
| return true; |
| } catch (const qpid::TransportFailure&) { |
| QPID_LOG(debug, "Connection failed while re-initialising sessions"); |
| return false; |
| } catch (const qpid::framing::ResourceLimitExceededException& e) { |
| if (reconnectOnLimitExceeded) { |
| QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what()); |
| detach(); |
| return false; |
| } else { |
| throw qpid::messaging::TargetCapacityExceeded(e.what()); |
| } |
| } |
| } |
| |
| bool ConnectionImpl::backoff() |
| { |
| if (reconnectOnLimitExceeded) { |
| detach(); |
| open(); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| std::string ConnectionImpl::getAuthenticatedUsername() |
| { |
| return connection.getNegotiatedSettings().username; |
| } |
| |
| }}} // namespace qpid::client::amqp0_10 |