| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include "ClientConnection.h" |
| |
| #include <openssl/x509.h> |
| #include <pulsar/Authentication.h> |
| #include <pulsar/MessageIdBuilder.h> |
| #include <pulsar/ServiceInfo.h> |
| |
| #include <chrono> |
| #include <fstream> |
| |
| #include "AsioDefines.h" |
| #include "ClientConnectionAdaptor.h" |
| #include "ClientImpl.h" |
| #include "Commands.h" |
| #include "ConnectionPool.h" |
| #include "ConsumerImpl.h" |
| #include "ExecutorService.h" |
| #include "Future.h" |
| #include "LogUtils.h" |
| #include "MockServer.h" |
| #include "OpSendMsg.h" |
| #include "ProducerImpl.h" |
| #include "PulsarApi.pb.h" |
| #include "ResultUtils.h" |
| #include "ServiceNameResolver.h" |
| #include "ServiceURI.h" |
| #include "Url.h" |
| #include "auth/AuthOauth2.h" |
| #include "auth/InitialAuthData.h" |
| #include "checksum/ChecksumProvider.h" |
| |
| #ifdef USE_ASIO |
| #include <asio/connect.hpp> |
| #include <asio/ssl/host_name_verification.hpp> |
| #else |
| #include <boost/asio/connect.hpp> |
| #include <boost/asio/ssl/host_name_verification.hpp> |
| #endif |
| |
| DECLARE_LOG_OBJECT() |
| |
| using namespace ASIO::ip; |
| |
| namespace pulsar { |
| |
| namespace { |
| static std::ostream& operator<<(std::ostream& os, const tcp::resolver::results_type& results) { |
| for (const auto& entry : results) { |
| const auto& ep = entry.endpoint(); |
| os << ep.address().to_string() << ":" << ep.port() << " "; |
| } |
| return os; |
| } |
| } // anonymous namespace |
| |
| using proto::BaseCommand; |
| |
| static const uint32_t DefaultBufferSize = 64 * 1024; |
| |
| static MessageId toMessageId(const proto::MessageIdData& messageIdData) { |
| return MessageIdBuilder::from(messageIdData).build(); |
| } |
| |
| // Convert error codes from protobuf to client API Result |
| static Result getResult(ServerError serverError, const std::string& message) { |
| switch (serverError) { |
| case UnknownError: |
| return ResultUnknownError; |
| |
| case MetadataError: |
| return ResultBrokerMetadataError; |
| |
| case ChecksumError: |
| return ResultChecksumError; |
| |
| case PersistenceError: |
| return ResultBrokerPersistenceError; |
| |
| case AuthenticationError: |
| return ResultAuthenticationError; |
| |
| case AuthorizationError: |
| return ResultAuthorizationError; |
| |
| case ConsumerBusy: |
| return ResultConsumerBusy; |
| |
| case ServiceNotReady: |
| return (message.find("the broker do not have test listener") == std::string::npos) |
| ? ResultRetryable |
| : ResultConnectError; |
| |
| case ProducerBlockedQuotaExceededError: |
| return ResultProducerBlockedQuotaExceededError; |
| |
| case ProducerBlockedQuotaExceededException: |
| return ResultProducerBlockedQuotaExceededException; |
| |
| case TopicNotFound: |
| return ResultTopicNotFound; |
| |
| case SubscriptionNotFound: |
| return ResultSubscriptionNotFound; |
| |
| case ConsumerNotFound: |
| return ResultConsumerNotFound; |
| |
| case UnsupportedVersionError: |
| return ResultUnsupportedVersionError; |
| |
| case TooManyRequests: |
| return ResultTooManyLookupRequestException; |
| |
| case TopicTerminatedError: |
| return ResultTopicTerminated; |
| |
| case ProducerBusy: |
| return ResultProducerBusy; |
| |
| case InvalidTopicName: |
| return ResultInvalidTopicName; |
| |
| case IncompatibleSchema: |
| return ResultIncompatibleSchema; |
| |
| case ConsumerAssignError: |
| return ResultConsumerAssignError; |
| |
| case TransactionCoordinatorNotFound: |
| return ResultTransactionCoordinatorNotFoundError; |
| |
| case InvalidTxnStatus: |
| return ResultInvalidTxnStatusError; |
| |
| case NotAllowedError: |
| return ResultNotAllowedError; |
| |
| case TransactionConflict: |
| return ResultTransactionConflict; |
| |
| case TransactionNotFound: |
| return ResultTransactionNotFound; |
| |
| case ProducerFenced: |
| return ResultProducerFenced; |
| } |
| // NOTE : Do not add default case in the switch above. In future if we get new cases for |
| // ServerError and miss them in the switch above we would like to get notified. Adding |
| // return here to make the compiler happy. |
| return ResultUnknownError; |
| } |
| |
| inline std::ostream& operator<<(std::ostream& os, proto::ServerError error) { |
| os << getResult(error, ""); |
| return os; |
| } |
| |
| static bool file_exists(const std::string& path) { |
| if (path.empty()) { |
| return false; |
| } |
| std::ifstream f(path); |
| return f.good(); |
| } |
| |
| std::atomic<int32_t> ClientConnection::maxMessageSize_{Commands::DefaultMaxMessageSize}; |
| |
| ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress, |
| const ServiceInfo& serviceInfo, const ExecutorServicePtr& executor, |
| const ClientConfiguration& clientConfiguration, |
| const std::string& clientVersion, ConnectionPool& pool, size_t poolIndex) |
| : operationsTimeout_(ClientImpl::getOperationTimeout(clientConfiguration)), |
| authentication_(serviceInfo.authentication()), |
| serverProtocolVersion_(proto::ProtocolVersion_MIN), |
| executor_(executor), |
| resolver_(executor_->createTcpResolver()), |
| socket_(executor_->createSocket()), |
| logicalAddress_(logicalAddress), |
| physicalAddress_(physicalAddress), |
| cnxStringPtr_(std::make_shared<std::string>("[<none> -> " + physicalAddress + "] ")), |
| incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), |
| connectTimeout_(std::chrono::milliseconds(clientConfiguration.getConnectionTimeout())), |
| connectTimer_(executor_->createDeadlineTimer()), |
| outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), |
| keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()), |
| consumerStatsRequestTimer_(executor_->createDeadlineTimer()), |
| maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), |
| clientVersion_(clientVersion), |
| pool_(pool), |
| poolIndex_(poolIndex) { |
| LOG_INFO(cnxString() << "Create ClientConnection, timeout=" |
| << clientConfiguration.getConnectionTimeout()); |
| if (!authentication_) { |
| LOG_ERROR("Invalid authentication plugin"); |
| throw ResultAuthenticationError; |
| return; |
| } |
| |
| if (auto oauth2Auth = std::dynamic_pointer_cast<AuthOauth2>(authentication_)) { |
| // Configure the TLS trust certs file for Oauth2 |
| auto authData = std::dynamic_pointer_cast<AuthenticationDataProvider>( |
| std::make_shared<InitialAuthData>(serviceInfo.tlsTrustCertsFilePath().value_or(""))); |
| oauth2Auth->getAuthData(authData); |
| } |
| |
| if (serviceInfo.useTls()) { |
| ASIO::ssl::context ctx(ASIO::ssl::context::sslv23_client); |
| ctx.set_options(ASIO::ssl::context::default_workarounds | ASIO::ssl::context::no_sslv2 | |
| ASIO::ssl::context::no_sslv3 | ASIO::ssl::context::no_tlsv1 | |
| ASIO::ssl::context::no_tlsv1_1); |
| Url serviceUrl; |
| Url proxyUrl; |
| Url::parse(physicalAddress, serviceUrl); |
| proxyServiceUrl_ = clientConfiguration.getProxyServiceUrl(); |
| proxyProtocol_ = clientConfiguration.getProxyProtocol(); |
| if (proxyProtocol_ == ClientConfiguration::SNI && !proxyServiceUrl_.empty()) { |
| Url::parse(proxyServiceUrl_, proxyUrl); |
| isSniProxy_ = true; |
| LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_); |
| } |
| if (clientConfiguration.isTlsAllowInsecureConnection()) { |
| ctx.set_verify_mode(ASIO::ssl::context::verify_none); |
| isTlsAllowInsecureConnection_ = true; |
| } else { |
| ctx.set_verify_mode(ASIO::ssl::context::verify_peer); |
| |
| if (serviceInfo.tlsTrustCertsFilePath()) { |
| const auto& trustCertFilePath = *serviceInfo.tlsTrustCertsFilePath(); |
| if (file_exists(trustCertFilePath)) { |
| ctx.load_verify_file(trustCertFilePath); |
| } else { |
| LOG_ERROR(trustCertFilePath << ": No such trustCertFile"); |
| throw ResultAuthenticationError; |
| } |
| } else { |
| ctx.set_default_verify_paths(); |
| LOG_INFO("Use " << X509_get_default_cert_file() << " as default CA path"); |
| } |
| } |
| |
| std::string tlsCertificates = clientConfiguration.getTlsCertificateFilePath(); |
| std::string tlsPrivateKey = clientConfiguration.getTlsPrivateKeyFilePath(); |
| |
| AuthenticationDataPtr authData; |
| if (authentication_->getAuthData(authData) == ResultOk && authData->hasDataForTls()) { |
| tlsCertificates = authData->getTlsCertificates(); |
| tlsPrivateKey = authData->getTlsPrivateKey(); |
| if (!file_exists(tlsCertificates)) { |
| LOG_ERROR(tlsCertificates << ": No such tlsCertificates"); |
| throw ResultAuthenticationError; |
| } |
| if (!file_exists(tlsCertificates)) { |
| LOG_ERROR(tlsCertificates << ": No such tlsCertificates"); |
| throw ResultAuthenticationError; |
| } |
| ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem); |
| ctx.use_certificate_chain_file(tlsCertificates); |
| } else { |
| if (file_exists(tlsPrivateKey) && file_exists(tlsCertificates)) { |
| ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem); |
| ctx.use_certificate_chain_file(tlsCertificates); |
| } |
| } |
| |
| tlsSocket_ = ExecutorService::createTlsSocket(socket_, ctx); |
| |
| if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) { |
| LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port()); |
| std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host(); |
| tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost)); |
| } |
| |
| LOG_DEBUG("TLS SNI Host: " << serviceUrl.host()); |
| if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) { |
| ASIO_ERROR ec{static_cast<int>(::ERR_get_error()), ASIO::error::get_ssl_category()}; |
| LOG_ERROR(ec.message() << ": Error while setting TLS SNI"); |
| return; |
| } |
| } |
| } |
| |
| ClientConnection::~ClientConnection() { |
| LOG_INFO(cnxString() << "Destroyed connection to " << logicalAddress_ << "-" << poolIndex_); |
| } |
| |
| void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) { |
| if (!cmdConnected.has_server_version()) { |
| LOG_ERROR(cnxString() << "Server version is not set"); |
| close(); |
| return; |
| } |
| |
| if (cmdConnected.has_max_message_size()) { |
| LOG_DEBUG("Connection has max message size setting: " << cmdConnected.max_message_size()); |
| maxMessageSize_.store(cmdConnected.max_message_size(), std::memory_order_release); |
| LOG_DEBUG("Current max message size is: " << maxMessageSize_); |
| } |
| |
| Lock lock(mutex_); |
| |
| if (isClosed()) { |
| LOG_INFO(cnxString() << "Connection already closed"); |
| return; |
| } |
| cancelTimer(*connectTimer_); |
| state_ = Ready; |
| serverProtocolVersion_ = cmdConnected.protocol_version(); |
| |
| if (serverProtocolVersion_ >= proto::v1) { |
| // Only send keep-alive probes if the broker supports it |
| keepAliveTimer_ = executor_->createDeadlineTimer(); |
| if (keepAliveTimer_) { |
| keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); |
| keepAliveTimer_->async_wait( |
| [this, self{shared_from_this()}](const ASIO_ERROR& err) { handleKeepAliveTimeout(err); }); |
| } |
| } |
| |
| lock.unlock(); |
| |
| connectPromise_.setValue(shared_from_this()); |
| |
| if (serverProtocolVersion_ >= proto::v8) { |
| startConsumerStatsTimer(std::vector<uint64_t>()); |
| } |
| } |
| |
| void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests) { |
| std::vector<Promise<Result, BrokerConsumerStatsImpl>> consumerStatsPromises; |
| Lock lock(mutex_); |
| |
| for (int i = 0; i < consumerStatsRequests.size(); i++) { |
| PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(consumerStatsRequests[i]); |
| if (it != pendingConsumerStatsMap_.end()) { |
| LOG_DEBUG(cnxString() << " removing request_id " << it->first |
| << " from the pendingConsumerStatsMap_"); |
| consumerStatsPromises.push_back(it->second); |
| pendingConsumerStatsMap_.erase(it); |
| } else { |
| LOG_DEBUG(cnxString() << "request_id " << it->first << " already fulfilled - not removing it"); |
| } |
| } |
| |
| consumerStatsRequests.clear(); |
| for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin(); |
| it != pendingConsumerStatsMap_.end(); ++it) { |
| consumerStatsRequests.push_back(it->first); |
| } |
| |
| // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero |
| // Check if we have a timer still before we set the request timer to pop again. |
| if (consumerStatsRequestTimer_) { |
| consumerStatsRequestTimer_->expires_after(operationsTimeout_); |
| consumerStatsRequestTimer_->async_wait( |
| [this, self{shared_from_this()}, consumerStatsRequests](const ASIO_ERROR& err) { |
| handleConsumerStatsTimeout(err, consumerStatsRequests); |
| }); |
| } |
| lock.unlock(); |
| // Complex logic since promises need to be fulfilled outside the lock |
| for (int i = 0; i < consumerStatsPromises.size(); i++) { |
| consumerStatsPromises[i].setFailed(ResultTimeout); |
| LOG_WARN(cnxString() << " Operation timedout, didn't get response from broker"); |
| } |
| } |
| |
| /// The number of unacknowledged probes to send before considering the connection dead and notifying the |
| /// application layer |
| typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPCNT> tcp_keep_alive_count; |
| |
| /// The interval between subsequential keepalive probes, regardless of what the connection has exchanged in |
| /// the meantime |
| typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPINTVL> tcp_keep_alive_interval; |
| |
| /// The interval between the last data packet sent (simple ACKs are not considered data) and the first |
| /// keepalive |
| /// probe; after the connection is marked to need keepalive, this counter is not used any further |
| #ifdef __APPLE__ |
| typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_keep_alive_idle; |
| #else |
| typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle; |
| #endif |
| |
| /* |
| * TCP Connect handler |
| * |
| * if async_connect without any error, connected_ would be set to true |
| * at this point the connection is deemed valid to be used by clients of this class |
| */ |
| void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endpoint& endpoint) { |
| if (!err) { |
| std::stringstream cnxStringStream; |
| try { |
| cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() |
| << "] "; |
| std::atomic_store(&cnxStringPtr_, std::make_shared<std::string>(cnxStringStream.str())); |
| } catch (const ASIO_SYSTEM_ERROR& e) { |
| LOG_ERROR("Failed to get endpoint: " << e.what()); |
| close(ResultRetryable); |
| return; |
| } |
| if (logicalAddress_ == physicalAddress_) { |
| LOG_INFO(cnxString() << "Connected to broker"); |
| } else { |
| LOG_INFO(cnxString() << "Connected to broker through proxy. Logical broker: " << logicalAddress_ |
| << ", proxy: " << proxyServiceUrl_ |
| << ", physical address:" << physicalAddress_); |
| } |
| |
| Lock lock(mutex_); |
| if (isClosed()) { |
| LOG_INFO(cnxString() << "Connection already closed"); |
| return; |
| } |
| state_ = TcpConnected; |
| lock.unlock(); |
| |
| ASIO_ERROR error; |
| socket_->set_option(tcp::no_delay(true), error); |
| if (error) { |
| LOG_WARN(cnxString() << "Socket failed to set tcp::no_delay: " << error.message()); |
| } |
| |
| socket_->set_option(tcp::socket::keep_alive(true), error); |
| if (error) { |
| LOG_WARN(cnxString() << "Socket failed to set tcp::socket::keep_alive: " << error.message()); |
| } |
| |
| // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this |
| // should never happen, given that we're sending our own keep-alive probes (within the TCP |
| // connection) every 30 seconds |
| socket_->set_option(tcp_keep_alive_idle(1 * 60), error); |
| if (error) { |
| LOG_DEBUG(cnxString() << "Socket failed to set tcp_keep_alive_idle: " << error.message()); |
| } |
| |
| // Send up to 10 probes before declaring the connection broken |
| socket_->set_option(tcp_keep_alive_count(10), error); |
| if (error) { |
| LOG_DEBUG(cnxString() << "Socket failed to set tcp_keep_alive_count: " << error.message()); |
| } |
| |
| // Interval between probes: 6 seconds |
| socket_->set_option(tcp_keep_alive_interval(6), error); |
| if (error) { |
| LOG_DEBUG(cnxString() << "Socket failed to set tcp_keep_alive_interval: " << error.message()); |
| } |
| |
| if (tlsSocket_) { |
| if (!isTlsAllowInsecureConnection_) { |
| ASIO_ERROR err; |
| Url service_url; |
| if (!Url::parse(physicalAddress_, service_url)) { |
| LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " " << err.message()); |
| close(); |
| return; |
| } |
| } |
| // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation |
| // fault might happen |
| tlsSocket_->async_handshake( |
| ASIO::ssl::stream<tcp::socket>::client, |
| [this, self{shared_from_this()}](const auto& err) { handleHandshake(err); }); |
| } else { |
| handleHandshake(ASIO_SUCCESS); |
| } |
| } else { |
| LOG_ERROR(cnxString() << "Failed to establish connection to " << endpoint << ": " << err.message()); |
| { |
| std::lock_guard lock{mutex_}; |
| if (isClosed()) { |
| return; |
| } |
| cancelTimer(*connectTimer_); |
| } |
| if (err == ASIO::error::operation_aborted) { |
| close(); |
| } else { |
| close(ResultRetryable); |
| } |
| } |
| } |
| |
| void ClientConnection::handleHandshake(const ASIO_ERROR& err) { |
| if (err) { |
| if (err.value() == ASIO::ssl::error::stream_truncated) { |
| LOG_WARN(cnxString() << "Handshake failed: " << err.message()); |
| close(ResultRetryable); |
| } else { |
| LOG_ERROR(cnxString() << "Handshake failed: " << err.message()); |
| close(); |
| } |
| return; |
| } |
| |
| bool connectingThroughProxy = logicalAddress_ != physicalAddress_; |
| Result result = ResultOk; |
| SharedBuffer buffer; |
| try { |
| buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy, |
| clientVersion_, result); |
| } catch (const std::exception& e) { |
| LOG_ERROR(cnxString() << "Failed to create Connect command: " << e.what()); |
| close(ResultAuthenticationError); |
| return; |
| } |
| if (result != ResultOk) { |
| LOG_ERROR(cnxString() << "Failed to establish connection: " << result); |
| close(result); |
| return; |
| } |
| // Send CONNECT command to broker |
| auto self = shared_from_this(); |
| asyncWrite(buffer.const_asio_buffer(), |
| customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { |
| handleSentPulsarConnect(err, buffer); |
| })); |
| } |
| |
| void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const SharedBuffer& buffer) { |
| if (isClosed()) { |
| return; |
| } |
| if (err) { |
| LOG_ERROR(cnxString() << "Failed to establish connection: " << err.message()); |
| close(); |
| return; |
| } |
| |
| // Schedule the reading of CONNECTED command from broker |
| readNextCommand(); |
| } |
| |
| void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const SharedBuffer& buffer) { |
| if (isClosed()) { |
| return; |
| } |
| if (err) { |
| LOG_WARN(cnxString() << "Failed to send auth response: " << err.message()); |
| close(); |
| return; |
| } |
| } |
| |
| /* |
| * Async method to establish TCP connection with broker |
| * |
| * tcpConnectCompletionHandler is notified when the result of this call is available. |
| * |
| */ |
| void ClientConnection::tcpConnectAsync() { |
| if (isClosed()) { |
| return; |
| } |
| |
| ASIO_ERROR err; |
| Url service_url; |
| std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_; |
| if (!Url::parse(hostUrl, service_url)) { |
| LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " " << err.message()); |
| close(); |
| return; |
| } |
| |
| if (service_url.protocol() != "pulsar" && service_url.protocol() != "pulsar+ssl") { |
| LOG_ERROR(cnxString() << "Invalid Url protocol '" << service_url.protocol() |
| << "'. Valid values are 'pulsar' and 'pulsar+ssl'"); |
| close(); |
| return; |
| } |
| |
| LOG_DEBUG(cnxString() << "Resolving " << service_url.host() << ":" << service_url.port()); |
| |
| resolver_->async_resolve( |
| service_url.host(), std::to_string(service_url.port()), |
| [this, self{shared_from_this()}](auto err, const auto& results) { handleResolve(err, results); }); |
| } |
| |
| void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::results_type& results) { |
| if (err) { |
| std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_; |
| LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); |
| close(); |
| return; |
| } |
| |
| if (!results.empty()) { |
| LOG_DEBUG(cnxString() << "Resolved " << results.size() << " endpoints"); |
| for (const auto& entry : results) { |
| const auto& ep = entry.endpoint(); |
| LOG_DEBUG(cnxString() << " " << ep.address().to_string() << ":" << ep.port()); |
| } |
| } |
| |
| std::lock_guard lock{mutex_}; |
| if (isClosed()) { |
| return; |
| } |
| |
| connectTimer_->expires_after(connectTimeout_); |
| connectTimer_->async_wait([this, results, self{shared_from_this()}](const auto& err) { |
| if (err) { |
| return; |
| } |
| Lock lock{mutex_}; |
| if (!isClosed() && state_ != Ready) { |
| LOG_ERROR(cnxString() << "Connection to " << results << " was not established in " |
| << connectTimeout_.count() << " ms"); |
| lock.unlock(); |
| close(); |
| } // else: the connection is closed or already established |
| }); |
| |
| ASIO::async_connect( |
| *socket_, results, |
| [this, self{shared_from_this()}](const ASIO_ERROR& err, const tcp::endpoint& endpoint) { |
| handleTcpConnected(err, endpoint); |
| }); |
| } |
| |
| void ClientConnection::readNextCommand() { |
| const static uint32_t minReadSize = sizeof(uint32_t); |
| auto self = shared_from_this(); |
| asyncReceive(incomingBuffer_.asio_buffer(), |
| customAllocReadHandler([this, self](const ASIO_ERROR& err, size_t bytesTransferred) { |
| handleRead(err, bytesTransferred, minReadSize); |
| })); |
| } |
| |
| void ClientConnection::handleRead(const ASIO_ERROR& err, size_t bytesTransferred, uint32_t minReadSize) { |
| if (isClosed()) { |
| return; |
| } |
| // Update buffer write idx with new data |
| incomingBuffer_.bytesWritten(bytesTransferred); |
| |
| if (err || bytesTransferred == 0) { |
| if (err == ASIO::error::operation_aborted) { |
| LOG_DEBUG(cnxString() << "Read operation was canceled: " << err.message()); |
| } else if (bytesTransferred == 0 || err == ASIO::error::eof) { |
| LOG_DEBUG(cnxString() << "Server closed the connection: " << err.message()); |
| } else { |
| LOG_ERROR(cnxString() << "Read operation failed: " << err.message()); |
| } |
| close(ResultDisconnected); |
| } else if (bytesTransferred < minReadSize) { |
| // Read the remaining part, use a slice of buffer to write on the next |
| // region |
| SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred); |
| auto self = shared_from_this(); |
| auto nextMinReadSize = minReadSize - bytesTransferred; |
| asyncReceive(buffer.asio_buffer(), |
| customAllocReadHandler( |
| [this, self, nextMinReadSize](const ASIO_ERROR& err, size_t bytesTransferred) { |
| handleRead(err, bytesTransferred, nextMinReadSize); |
| })); |
| } else { |
| processIncomingBuffer(); |
| } |
| } |
| |
| void ClientConnection::processIncomingBuffer() { |
| // Process all the available frames from the incoming buffer |
| while (incomingBuffer_.readableBytes() >= sizeof(uint32_t)) { |
| // Extract message frames from incoming buffer |
| // At this point we have at least 4 bytes in the buffer |
| uint32_t frameSize = incomingBuffer_.readUnsignedInt(); |
| |
| if (frameSize > incomingBuffer_.readableBytes()) { |
| // We don't have the entire frame yet |
| const uint32_t bytesToReceive = frameSize - incomingBuffer_.readableBytes(); |
| |
| // Rollback the reading of frameSize (when the frame will be complete, |
| // we'll read it again |
| incomingBuffer_.rollback(sizeof(uint32_t)); |
| |
| if (bytesToReceive > incomingBuffer_.writableBytes()) { |
| // Need to allocate a buffer big enough for the frame |
| uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize, frameSize + sizeof(uint32_t)); |
| incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize); |
| } |
| auto self = shared_from_this(); |
| asyncReceive(incomingBuffer_.asio_buffer(), |
| customAllocReadHandler( |
| [this, self, bytesToReceive](const ASIO_ERROR& err, size_t bytesTransferred) { |
| handleRead(err, bytesTransferred, bytesToReceive); |
| })); |
| return; |
| } |
| |
| // At this point, we have at least one complete frame available in the buffer |
| uint32_t cmdSize = incomingBuffer_.readUnsignedInt(); |
| proto::BaseCommand incomingCmd; |
| if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) { |
| LOG_ERROR(cnxString() << "Error parsing protocol buffer command"); |
| close(ResultDisconnected); |
| return; |
| } |
| |
| incomingBuffer_.consume(cmdSize); |
| |
| if (incomingCmd.type() == BaseCommand::MESSAGE) { |
| // Parse message metadata and extract payload |
| proto::MessageMetadata msgMetadata; |
| proto::BrokerEntryMetadata brokerEntryMetadata; |
| |
| // read checksum |
| uint32_t remainingBytes = frameSize - (cmdSize + 4); |
| |
| auto readerIndex = incomingBuffer_.readerIndex(); |
| if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) { |
| // broker entry metadata is present |
| uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt(); |
| if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) { |
| LOG_ERROR(cnxString() |
| << "[consumer id " << incomingCmd.message().consumer_id() |
| << ", message ledger id " << incomingCmd.message().message_id().ledgerid() |
| << ", entry id " << incomingCmd.message().message_id().entryid() |
| << "] Error parsing broker entry metadata"); |
| close(ResultDisconnected); |
| return; |
| } |
| incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize); |
| remainingBytes -= (2 + 4 + brokerEntryMetadataSize); |
| } else { |
| incomingBuffer_.setReaderIndex(readerIndex); |
| } |
| |
| bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd); |
| |
| uint32_t metadataSize = incomingBuffer_.readUnsignedInt(); |
| if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) { |
| LOG_ERROR(cnxString() |
| << "[consumer id " << incomingCmd.message().consumer_id() // |
| << ", message ledger id " << incomingCmd.message().message_id().ledgerid() // |
| << ", entry id " << incomingCmd.message().message_id().entryid() |
| << "] Error parsing message metadata"); |
| close(ResultDisconnected); |
| return; |
| } |
| |
| incomingBuffer_.consume(metadataSize); |
| remainingBytes -= (4 + metadataSize); |
| |
| uint32_t payloadSize = remainingBytes; |
| SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize); |
| incomingBuffer_.consume(payloadSize); |
| handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata, |
| payload); |
| } else { |
| handleIncomingCommand(incomingCmd); |
| } |
| } |
| if (incomingBuffer_.readableBytes() > 0) { |
| // We still have 1 to 3 bytes from the next frame |
| assert(incomingBuffer_.readableBytes() < sizeof(uint32_t)); |
| |
| // Restart with a new buffer and copy the few bytes at the beginning |
| incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize); |
| |
| // At least we need to read 4 bytes to have the complete frame size |
| uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes(); |
| |
| auto self = shared_from_this(); |
| asyncReceive( |
| incomingBuffer_.asio_buffer(), |
| customAllocReadHandler([this, self, minReadSize](const ASIO_ERROR& err, size_t bytesTransferred) { |
| handleRead(err, bytesTransferred, minReadSize); |
| })); |
| return; |
| } |
| |
| // We have read everything we had in the buffer |
| // Rollback the indexes to reuse the same buffer |
| incomingBuffer_.reset(); |
| |
| readNextCommand(); |
| } |
| |
| bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes, |
| proto::BaseCommand& incomingCmd) { |
| int readerIndex = incomingBuffer_.readerIndex(); |
| bool isChecksumValid = true; |
| |
| if (incomingBuffer_.readUnsignedShort() == Commands::magicCrc32c) { |
| uint32_t storedChecksum = incomingBuffer_.readUnsignedInt(); |
| remainingBytes -= (2 + 4) /* subtract size of checksum itself */; |
| |
| // compute metadata-payload checksum |
| int metadataPayloadSize = remainingBytes; |
| uint32_t computedChecksum = computeChecksum(0, incomingBuffer_.data(), metadataPayloadSize); |
| // verify checksum |
| isChecksumValid = (storedChecksum == computedChecksum); |
| |
| if (!isChecksumValid) { |
| LOG_ERROR("[consumer id " |
| << incomingCmd.message().consumer_id() // |
| << ", message ledger id " << incomingCmd.message().message_id().ledgerid() // |
| << ", entry id " << incomingCmd.message().message_id().entryid() // |
| << "stored-checksum" << storedChecksum << "computedChecksum" << computedChecksum // |
| << "] Checksum verification failed"); |
| } |
| } else { |
| incomingBuffer_.setReaderIndex(readerIndex); |
| } |
| return isChecksumValid; |
| } |
| |
| void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change) { |
| LOG_DEBUG(cnxString() << "Received notification about active consumer change, consumer_id: " |
| << change.consumer_id() << " isActive: " << change.is_active()); |
| Lock lock(mutex_); |
| ConsumersMap::iterator it = consumers_.find(change.consumer_id()); |
| if (it != consumers_.end()) { |
| ConsumerImplPtr consumer = it->second.lock(); |
| |
| if (consumer) { |
| lock.unlock(); |
| consumer->activeConsumerChanged(change.is_active()); |
| } else { |
| consumers_.erase(change.consumer_id()); |
| LOG_DEBUG(cnxString() << "Ignoring incoming message for already destroyed consumer " |
| << change.consumer_id()); |
| } |
| } else { |
| LOG_DEBUG(cnxString() << "Got invalid consumer Id in " << change.consumer_id() |
| << " -- isActive: " << change.is_active()); |
| } |
| } |
| |
| void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid, |
| proto::BrokerEntryMetadata& brokerEntryMetadata, |
| proto::MessageMetadata& msgMetadata, SharedBuffer& payload) { |
| LOG_DEBUG(cnxString() << "Received a message from the server for consumer: " << msg.consumer_id()); |
| |
| Lock lock(mutex_); |
| ConsumersMap::iterator it = consumers_.find(msg.consumer_id()); |
| if (it != consumers_.end()) { |
| ConsumerImplPtr consumer = it->second.lock(); |
| |
| if (consumer) { |
| // Unlock the mutex before notifying the consumer of the |
| // new received message |
| lock.unlock(); |
| consumer->messageReceived(shared_from_this(), msg, isChecksumValid, brokerEntryMetadata, |
| msgMetadata, payload); |
| } else { |
| consumers_.erase(msg.consumer_id()); |
| LOG_DEBUG(cnxString() << "Ignoring incoming message for already destroyed consumer " |
| << msg.consumer_id()); |
| } |
| } else { |
| LOG_DEBUG(cnxString() << "Got invalid consumer Id in " // |
| << msg.consumer_id() << " -- msg: " << msgMetadata.sequence_id()); |
| } |
| } |
| |
| void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { |
| LOG_DEBUG(cnxString() << "Handling incoming command: " << Commands::messageType(incomingCmd.type())); |
| |
| switch (state_.load()) { |
| case Pending: { |
| LOG_ERROR(cnxString() << "Connection is not ready yet"); |
| break; |
| } |
| |
| case TcpConnected: { |
| // Handle Pulsar Connected |
| if (incomingCmd.type() != BaseCommand::CONNECTED) { |
| // Wrong cmd |
| close(); |
| } else { |
| handlePulsarConnected(incomingCmd.connected()); |
| } |
| break; |
| } |
| |
| case Disconnected: { |
| LOG_ERROR(cnxString() << "Connection already disconnected"); |
| break; |
| } |
| |
| case Ready: { |
| // Since we are receiving data from the connection, we are assuming that for now the |
| // connection is still working well. |
| if (incomingCmd.type() != BaseCommand::PING) { |
| havePendingPingRequest_ = false; |
| } |
| |
| // Handle normal commands |
| switch (incomingCmd.type()) { |
| case BaseCommand::SEND_RECEIPT: |
| handleSendReceipt(incomingCmd.send_receipt()); |
| break; |
| |
| case BaseCommand::SEND_ERROR: |
| handleSendError(incomingCmd.send_error()); |
| break; |
| |
| case BaseCommand::SUCCESS: |
| handleSuccess(incomingCmd.success()); |
| break; |
| |
| case BaseCommand::PARTITIONED_METADATA_RESPONSE: |
| handlePartitionedMetadataResponse(incomingCmd.partitionmetadataresponse()); |
| break; |
| |
| case BaseCommand::CONSUMER_STATS_RESPONSE: |
| handleConsumerStatsResponse(incomingCmd.consumerstatsresponse()); |
| break; |
| |
| case BaseCommand::LOOKUP_RESPONSE: |
| handleLookupTopicRespose(incomingCmd.lookuptopicresponse()); |
| break; |
| |
| case BaseCommand::PRODUCER_SUCCESS: |
| handleProducerSuccess(incomingCmd.producer_success()); |
| break; |
| |
| case BaseCommand::ERROR: |
| handleError(incomingCmd.error()); |
| break; |
| |
| case BaseCommand::TOPIC_MIGRATED: |
| handleTopicMigrated(incomingCmd.topicmigrated()); |
| break; |
| |
| case BaseCommand::CLOSE_PRODUCER: |
| handleCloseProducer(incomingCmd.close_producer()); |
| break; |
| |
| case BaseCommand::CLOSE_CONSUMER: |
| handleCloseConsumer(incomingCmd.close_consumer()); |
| break; |
| |
| case BaseCommand::PING: |
| // Respond to ping request |
| LOG_DEBUG(cnxString() << "Replying to ping command"); |
| sendCommand(Commands::newPong()); |
| break; |
| |
| case BaseCommand::PONG: |
| LOG_DEBUG(cnxString() << "Received response to ping message"); |
| break; |
| |
| case BaseCommand::AUTH_CHALLENGE: |
| handleAuthChallenge(); |
| break; |
| |
| case BaseCommand::ACTIVE_CONSUMER_CHANGE: |
| handleActiveConsumerChange(incomingCmd.active_consumer_change()); |
| break; |
| |
| case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE: |
| handleGetLastMessageIdResponse(incomingCmd.getlastmessageidresponse()); |
| break; |
| |
| case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: |
| handleGetTopicOfNamespaceResponse(incomingCmd.gettopicsofnamespaceresponse()); |
| break; |
| |
| case BaseCommand::GET_SCHEMA_RESPONSE: |
| handleGetSchemaResponse(incomingCmd.getschemaresponse()); |
| break; |
| |
| case BaseCommand::ACK_RESPONSE: |
| handleAckResponse(incomingCmd.ackresponse()); |
| break; |
| |
| default: |
| LOG_WARN(cnxString() << "Received invalid message from server"); |
| close(ResultDisconnected); |
| break; |
| } |
| } |
| } |
| } |
| |
| Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint64_t consumerId, |
| uint64_t requestId) { |
| Lock lock(mutex_); |
| Promise<Result, BrokerConsumerStatsImpl> promise; |
| if (isClosed()) { |
| lock.unlock(); |
| LOG_ERROR(cnxString() << " Client is not connected to the broker"); |
| promise.setFailed(ResultNotConnected); |
| return promise.getFuture(); |
| } |
| pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise)); |
| lock.unlock(); |
| if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && |
| mockServer_->sendRequest("CONSUMER_STATS", requestId)) { |
| return promise.getFuture(); |
| } |
| sendCommand(Commands::newConsumerStats(consumerId, requestId)); |
| return promise.getFuture(); |
| } |
| |
| void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative, |
| const std::string& listenerName, uint64_t requestId, |
| const LookupDataResultPromisePtr& promise) { |
| newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP", |
| promise); |
| } |
| |
| void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId, |
| const LookupDataResultPromisePtr& promise) { |
| newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA", |
| promise); |
| } |
| |
| void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType, |
| const LookupDataResultPromisePtr& promise) { |
| Lock lock(mutex_); |
| if (isClosed()) { |
| lock.unlock(); |
| promise->setFailed(ResultNotConnected); |
| return; |
| } else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) { |
| lock.unlock(); |
| promise->setFailed(ResultTooManyLookupRequestException); |
| return; |
| } |
| |
| auto request = insertRequest( |
| pendingLookupRequests_, requestId, [weakSelf{weak_from_this()}, requestId, requestType]() { |
| if (auto self = weakSelf.lock()) { |
| LOG_WARN(self->cnxString() |
| << requestType << " request timeout to broker, req_id: " << requestId); |
| self->numOfPendingLookupRequest_--; |
| } |
| }); |
| request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) { |
| if (result == ResultOk) { |
| promise->setValue(lookupDataResult); |
| } else { |
| promise->setFailed(result); |
| } |
| }); |
| |
| numOfPendingLookupRequest_++; |
| lock.unlock(); |
| LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); |
| if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && |
| mockServer_->sendRequest(requestType, requestId)) { |
| return; |
| } |
| sendCommand(cmd); |
| } |
| |
| void ClientConnection::sendCommand(const SharedBuffer& cmd) { |
| Lock lock(mutex_); |
| |
| if (pendingWriteOperations_++ == 0) { |
| // Write immediately to socket |
| executor_->dispatch([this, cmd, self{shared_from_this()}] { sendCommandInternal(cmd); }); |
| } else { |
| // Queue to send later |
| pendingWriteBuffers_.push_back(cmd); |
| } |
| } |
| |
| void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) { |
| auto self = shared_from_this(); |
| asyncWrite(cmd.const_asio_buffer(), |
| customAllocWriteHandler([this, self, cmd](const ASIO_ERROR& err, size_t bytesTransferred) { |
| handleSend(err, cmd); |
| })); |
| } |
| |
| void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) { |
| Lock lock(mutex_); |
| if (pendingWriteOperations_++ > 0) { |
| pendingWriteBuffers_.emplace_back(args); |
| return; |
| } |
| auto self = shared_from_this(); |
| auto sendMessageInternal = [this, self, args] { |
| BaseCommand outgoingCmd; |
| auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); |
| // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the |
| // callback is called, an invalid buffer range might be passed to the underlying socket send. |
| asyncWrite(buffer, customAllocWriteHandler( |
| [this, self, buffer](const ASIO_ERROR& err, size_t bytesTransferred) { |
| handleSendPair(err); |
| })); |
| }; |
| executor_->dispatch(sendMessageInternal); |
| } |
| |
| void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) { |
| if (isClosed()) { |
| return; |
| } |
| if (err) { |
| LOG_WARN(cnxString() << "Could not send message on connection: " << err << " " << err.message()); |
| close(ResultDisconnected); |
| } else { |
| sendPendingCommands(); |
| } |
| } |
| |
| void ClientConnection::handleSendPair(const ASIO_ERROR& err) { |
| if (isClosed()) { |
| return; |
| } |
| if (err) { |
| LOG_WARN(cnxString() << "Could not send pair message on connection: " << err << " " << err.message()); |
| close(ResultDisconnected); |
| } else { |
| sendPendingCommands(); |
| } |
| } |
| |
| void ClientConnection::sendPendingCommands() { |
| Lock lock(mutex_); |
| |
| if (--pendingWriteOperations_ > 0) { |
| assert(!pendingWriteBuffers_.empty()); |
| auto any = pendingWriteBuffers_.front(); |
| pendingWriteBuffers_.pop_front(); |
| |
| auto self = shared_from_this(); |
| if (any.type() == typeid(SharedBuffer)) { |
| SharedBuffer buffer = std::any_cast<SharedBuffer>(any); |
| asyncWrite(buffer.const_asio_buffer(), |
| customAllocWriteHandler( |
| [this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); })); |
| } else { |
| assert(any.type() == typeid(std::shared_ptr<SendArguments>)); |
| |
| auto args = std::any_cast<std::shared_ptr<SendArguments>>(any); |
| BaseCommand outgoingCmd; |
| PairSharedBuffer buffer = |
| Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); |
| |
| // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the |
| // callback is called, an invalid buffer range might be passed to the underlying socket send. |
| asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { |
| handleSendPair(err); |
| })); |
| } |
| } else { |
| // No more pending writes |
| outgoingBuffer_.reset(); |
| } |
| } |
| |
| Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId, |
| const char* requestType) { |
| Lock lock(mutex_); |
| |
| if (isClosed()) { |
| lock.unlock(); |
| LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << requestId |
| << ") to a closed connection"); |
| Promise<Result, ResponseData> promise; |
| promise.setFailed(ResultNotConnected); |
| return promise.getFuture(); |
| } |
| |
| auto request = insertRequest( |
| pendingRequests_, requestId, |
| [cnxString{cnxString()}, physicalAddress{physicalAddress_}, requestId, requestType]() { |
| LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress |
| << ", req_id: " << requestId << ", request: " << requestType); |
| }); |
| lock.unlock(); |
| |
| LOG_DEBUG(cnxString() << "Inserted request " << requestType << " (req_id: " << requestId << ")"); |
| if (mockingRequests_.load(std::memory_order_acquire)) { |
| if (mockServer_ == nullptr) { |
| LOG_WARN(cnxString() << "Mock server is unexpectedly null when processing " << requestType); |
| sendCommand(cmd); |
| } else if (!mockServer_->sendRequest(requestType, requestId)) { |
| sendCommand(cmd); |
| } |
| } else { |
| sendCommand(cmd); |
| } |
| return request->getFuture(); |
| } |
| |
| void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) { |
| if (isClosed() || ec) { |
| return; |
| } |
| |
| if (havePendingPingRequest_) { |
| LOG_WARN(cnxString() << "Forcing connection to close after keep-alive timeout"); |
| close(ResultDisconnected); |
| } else { |
| // Send keep alive probe to peer |
| LOG_DEBUG(cnxString() << "Sending ping message"); |
| havePendingPingRequest_ = true; |
| sendCommand(Commands::newPing()); |
| |
| // If the close operation has already called the keepAliveTimer_.reset() then the use_count will |
| // be zero And we do not attempt to dereference the pointer. |
| Lock lock(mutex_); |
| if (keepAliveTimer_) { |
| keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); |
| keepAliveTimer_->async_wait( |
| [this, self{shared_from_this()}](const auto& err) { handleKeepAliveTimeout(err); }); |
| } |
| lock.unlock(); |
| } |
| } |
| |
| void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, |
| const std::vector<uint64_t>& consumerStatsRequests) { |
| if (ec) { |
| LOG_DEBUG(cnxString() << " Ignoring timer cancelled event, code[" << ec << "]"); |
| return; |
| } |
| startConsumerStatsTimer(consumerStatsRequests); |
| } |
| |
| const std::future<void>& ClientConnection::close(Result result, bool switchCluster) { |
| Lock lock(mutex_); |
| if (closeFuture_) { |
| connectPromise_.setFailed(result); |
| return *closeFuture_; |
| } |
| auto promise = std::make_shared<std::promise<void>>(); |
| closeFuture_ = promise->get_future(); |
| // The atomic update on state_ guarantees the previous modification on closeFuture_ is visible once the |
| // atomic read on state_ returns Disconnected `isClosed()`. |
| // However, it cannot prevent the race like: |
| // 1. thread 1: Check `isClosed()`, which returns false. |
| // 2. thread 2: call `close()`, now, `state_` becomes Disconnected, and `closeFuture_` is set. |
| // 3. thread 1: post the `async_write` to the `io_context`, |
| // 4. io thread: call `socket_->close()` |
| // 5. io thread: execute `async_write` on `socket_`, which has been closed |
| // However, even the race happens, it's still safe because all the socket operations happen in the same |
| // io thread, the `async_write` operation will simply fail with an error, no crash will happen. |
| state_ = Disconnected; |
| |
| // Move the internal fields to process them after `mutex_` was unlocked |
| auto consumers = std::move(consumers_); |
| auto producers = std::move(producers_); |
| auto pendingRequests = std::move(pendingRequests_); |
| auto pendingLookupRequests = std::move(pendingLookupRequests_); |
| auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_); |
| auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_); |
| auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_); |
| auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_); |
| |
| numOfPendingLookupRequest_ = 0; |
| |
| if (keepAliveTimer_) { |
| cancelTimer(*keepAliveTimer_); |
| keepAliveTimer_.reset(); |
| } |
| |
| if (consumerStatsRequestTimer_) { |
| cancelTimer(*consumerStatsRequestTimer_); |
| consumerStatsRequestTimer_.reset(); |
| } |
| |
| cancelTimer(*connectTimer_); |
| lock.unlock(); |
| int refCount = weak_from_this().use_count(); |
| if (result != ResultAlreadyClosed /* closed by the pool */ && !isResultRetryable(result)) { |
| LOG_ERROR(cnxString() << "Connection closed with " << result << " (refCnt: " << refCount << ")"); |
| } else { |
| LOG_INFO(cnxString() << "Connection disconnected (refCnt: " << refCount << ")"); |
| } |
| // Remove the connection from the pool before completing any promise |
| pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this); |
| |
| // Close the socket after removing itself from the pool so that other requests won't be able to acquire |
| // this connection after the socket is closed. |
| executor_->dispatch([this, promise, self{shared_from_this()}] { |
| // According to asio document, ip::tcp::socket and ssl::stream are unsafe as shared objects, so the |
| // methods must be called within the same implicit or explicit strand. |
| // The implementation of `ExecutorService` guarantees the internal `io_context::run()` is only called |
| // in one thread, so we can safely call the socket methods without posting to a strand instance. |
| ASIO_ERROR err; |
| socket_->shutdown(ASIO::socket_base::shutdown_both, err); |
| socket_->close(err); |
| if (err) { |
| LOG_WARN(cnxString() << "Failed to close socket: " << err.message()); |
| } |
| if (tlsSocket_) { |
| auto tlsSocket = tlsSocket_; |
| tlsSocket->async_shutdown([promise, self, tlsSocket](const auto&) { promise->set_value(); }); |
| } else { |
| promise->set_value(); |
| } |
| }); |
| |
| auto self = shared_from_this(); |
| for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) { |
| auto producer = it->second.lock(); |
| if (producer) { |
| producer->handleDisconnection(result, self); |
| } |
| } |
| |
| for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) { |
| auto consumer = it->second.lock(); |
| if (consumer) { |
| if (switchCluster) { |
| consumer->onClusterSwitching(); |
| } |
| consumer->handleDisconnection(result, self); |
| } |
| } |
| self.reset(); |
| |
| connectPromise_.setFailed(result); |
| |
| // Fail all pending requests after releasing the lock. |
| for (auto& kv : pendingRequests) { |
| kv.second->fail(result); |
| } |
| for (auto& kv : pendingLookupRequests) { |
| kv.second->fail(result); |
| } |
| for (auto& kv : pendingConsumerStatsMap) { |
| LOG_ERROR(cnxString() << " Closing Client Connection, please try again later"); |
| kv.second.setFailed(result); |
| } |
| for (auto& kv : pendingGetLastMessageIdRequests) { |
| kv.second->fail(result); |
| } |
| for (auto& kv : pendingGetNamespaceTopicsRequests) { |
| kv.second->fail(result); |
| } |
| for (auto& kv : pendingGetSchemaRequests) { |
| kv.second->fail(result); |
| } |
| return *closeFuture_; |
| } |
| |
| bool ClientConnection::isClosed() const { return state_ == Disconnected; } |
| |
| Future<Result, ClientConnectionWeakPtr> ClientConnection::getConnectFuture() { |
| return connectPromise_.getFuture(); |
| } |
| |
| void ClientConnection::registerProducer(int producerId, const ProducerImplPtr& producer) { |
| Lock lock(mutex_); |
| producers_.insert(std::make_pair(producerId, producer)); |
| } |
| |
| void ClientConnection::registerConsumer(int consumerId, const ConsumerImplPtr& consumer) { |
| Lock lock(mutex_); |
| consumers_.insert(std::make_pair(consumerId, consumer)); |
| } |
| |
| void ClientConnection::removeProducer(int producerId) { |
| Lock lock(mutex_); |
| producers_.erase(producerId); |
| } |
| |
| void ClientConnection::removeConsumer(int consumerId) { |
| Lock lock(mutex_); |
| consumers_.erase(consumerId); |
| } |
| |
| const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; } |
| |
| int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; } |
| |
| int32_t ClientConnection::getMaxMessageSize() { return maxMessageSize_.load(std::memory_order_acquire); } |
| |
| Commands::ChecksumType ClientConnection::getChecksumType() const { |
| return getServerProtocolVersion() >= proto::v6 ? Commands::Crc32c : Commands::None; |
| } |
| |
| Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(uint64_t consumerId, |
| uint64_t requestId) { |
| Lock lock(mutex_); |
| if (isClosed()) { |
| lock.unlock(); |
| LOG_ERROR(cnxString() << " Client is not connected to the broker"); |
| auto promise = std::make_shared<GetLastMessageIdResponsePromisePtr::element_type>(); |
| promise->setFailed(ResultNotConnected); |
| return promise->getFuture(); |
| } |
| |
| auto request = |
| insertRequest(pendingGetLastMessageIdRequests_, requestId, [cnxString = cnxString(), requestId]() { |
| LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId); |
| }); |
| lock.unlock(); |
| |
| if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && |
| mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) { |
| return request->getFuture(); |
| } |
| sendCommand(Commands::newGetLastMessageId(consumerId, requestId)); |
| return request->getFuture(); |
| } |
| |
| Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace( |
| const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) { |
| Lock lock(mutex_); |
| if (isClosed()) { |
| lock.unlock(); |
| LOG_ERROR(cnxString() << "Client is not connected to the broker"); |
| Promise<Result, NamespaceTopicsPtr> promise; |
| promise.setFailed(ResultNotConnected); |
| return promise.getFuture(); |
| } |
| |
| auto request = |
| insertRequest(pendingGetNamespaceTopicsRequests_, requestId, [cnxString = cnxString(), requestId]() { |
| LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId); |
| }); |
| lock.unlock(); |
| if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && |
| mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) { |
| return request->getFuture(); |
| } |
| sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId)); |
| return request->getFuture(); |
| } |
| |
| Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& topicName, |
| const std::string& version, uint64_t requestId) { |
| Lock lock(mutex_); |
| |
| if (isClosed()) { |
| lock.unlock(); |
| LOG_ERROR(cnxString() << "Client is not connected to the broker"); |
| Promise<Result, SchemaInfo> promise; |
| promise.setFailed(ResultNotConnected); |
| return promise.getFuture(); |
| } |
| |
| auto request = |
| insertRequest(pendingGetSchemaRequests_, requestId, [cnxString = cnxString(), requestId]() { |
| LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); |
| }); |
| lock.unlock(); |
| |
| if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && |
| mockServer_->sendRequest("GET_SCHEMA", requestId)) { |
| return request->getFuture(); |
| } |
| sendCommand(Commands::newGetSchema(topicName, version, requestId)); |
| return request->getFuture(); |
| } |
| |
| void ClientConnection::checkServerError(ServerError error, const std::string& message) { |
| pulsar::adaptor::checkServerError(*this, error, message); |
| } |
| |
| void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendReceipt) { |
| int producerId = sendReceipt.producer_id(); |
| uint64_t sequenceId = sendReceipt.sequence_id(); |
| const proto::MessageIdData& messageIdData = sendReceipt.message_id(); |
| auto messageId = toMessageId(messageIdData); |
| |
| LOG_DEBUG(cnxString() << "Got receipt for producer: " << producerId << " -- msg: " << sequenceId |
| << "-- message id: " << messageId); |
| |
| Lock lock(mutex_); |
| auto it = producers_.find(producerId); |
| if (it != producers_.end()) { |
| ProducerImplPtr producer = it->second.lock(); |
| lock.unlock(); |
| |
| if (producer) { |
| if (!producer->ackReceived(sequenceId, messageId)) { |
| // If the producer fails to process the ack, we need to close the connection |
| // to give it a chance to recover from there |
| close(ResultDisconnected); |
| } |
| } |
| } else { |
| LOG_ERROR(cnxString() << "Got invalid producer Id in SendReceipt: " // |
| << producerId << " -- msg: " << sequenceId); |
| } |
| } |
| |
| void ClientConnection::handleSendError(const proto::CommandSendError& error) { |
| LOG_WARN(cnxString() << "Received send error from server: " << error.message()); |
| if (ChecksumError == error.error()) { |
| long producerId = error.producer_id(); |
| long sequenceId = error.sequence_id(); |
| Lock lock(mutex_); |
| auto it = producers_.find(producerId); |
| if (it != producers_.end()) { |
| ProducerImplPtr producer = it->second.lock(); |
| lock.unlock(); |
| |
| if (producer) { |
| if (!producer->removeCorruptMessage(sequenceId)) { |
| // If the producer fails to remove corrupt msg, we need to close the |
| // connection to give it a chance to recover from there |
| close(ResultDisconnected); |
| } |
| } |
| } |
| } else { |
| close(ResultDisconnected); |
| } |
| } |
| |
| void ClientConnection::handleSuccess(const proto::CommandSuccess& success) { |
| LOG_DEBUG(cnxString() << "Received success response from server. req_id: " << success.request_id()); |
| |
| Lock lock(mutex_); |
| auto it = pendingRequests_.find(success.request_id()); |
| if (it != pendingRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingRequests_.erase(it); |
| lock.unlock(); |
| |
| request->complete({}); |
| } |
| } |
| |
| void ClientConnection::handlePartitionedMetadataResponse( |
| const proto::CommandPartitionedTopicMetadataResponse& partitionMetadataResponse) { |
| LOG_DEBUG(cnxString() << "Received partition-metadata response from server. req_id: " |
| << partitionMetadataResponse.request_id()); |
| |
| Lock lock(mutex_); |
| auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id()); |
| if (it != pendingLookupRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingLookupRequests_.erase(it); |
| numOfPendingLookupRequest_--; |
| lock.unlock(); |
| |
| if (!partitionMetadataResponse.has_response() || |
| (partitionMetadataResponse.response() == |
| proto::CommandPartitionedTopicMetadataResponse::Failed)) { |
| if (partitionMetadataResponse.has_error()) { |
| LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: " |
| << partitionMetadataResponse.request_id() |
| << " error: " << partitionMetadataResponse.error() |
| << " msg: " << partitionMetadataResponse.message()); |
| checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message()); |
| request->fail( |
| getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message())); |
| } else { |
| LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: " |
| << partitionMetadataResponse.request_id() << " with empty response: "); |
| request->fail(ResultConnectError); |
| } |
| } else { |
| LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>(); |
| lookupResultPtr->setPartitions(partitionMetadataResponse.partitions()); |
| request->complete(lookupResultPtr); |
| } |
| |
| } else { |
| LOG_WARN("Received unknown request id from server: " << partitionMetadataResponse.request_id()); |
| } |
| } |
| |
| void ClientConnection::handleConsumerStatsResponse( |
| const proto::CommandConsumerStatsResponse& consumerStatsResponse) { |
| LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received consumer stats " |
| "response from server. req_id: " |
| << consumerStatsResponse.request_id()); |
| Lock lock(mutex_); |
| auto it = pendingConsumerStatsMap_.find(consumerStatsResponse.request_id()); |
| if (it != pendingConsumerStatsMap_.end()) { |
| auto request = std::move(it->second); |
| pendingConsumerStatsMap_.erase(it); |
| lock.unlock(); |
| |
| if (consumerStatsResponse.has_error_code()) { |
| if (consumerStatsResponse.has_error_message()) { |
| LOG_ERROR(cnxString() |
| << " Failed to get consumer stats - " << consumerStatsResponse.error_message()); |
| } |
| request.setFailed( |
| getResult(consumerStatsResponse.error_code(), consumerStatsResponse.error_message())); |
| } else { |
| LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received consumer stats " |
| "response from server. req_id: " |
| << consumerStatsResponse.request_id() << " Stats: "); |
| BrokerConsumerStatsImpl brokerStats( |
| consumerStatsResponse.msgrateout(), consumerStatsResponse.msgthroughputout(), |
| consumerStatsResponse.msgrateredeliver(), consumerStatsResponse.consumername(), |
| consumerStatsResponse.availablepermits(), consumerStatsResponse.unackedmessages(), |
| consumerStatsResponse.blockedconsumeronunackedmsgs(), consumerStatsResponse.address(), |
| consumerStatsResponse.connectedsince(), consumerStatsResponse.type(), |
| consumerStatsResponse.msgrateexpired(), consumerStatsResponse.msgbacklog()); |
| request.setValue(brokerStats); |
| } |
| } else { |
| LOG_WARN("ConsumerStatsResponse command - Received unknown request id from server: " |
| << consumerStatsResponse.request_id()); |
| } |
| } |
| |
| void ClientConnection::handleLookupTopicRespose( |
| const proto::CommandLookupTopicResponse& lookupTopicResponse) { |
| Lock lock(mutex_); |
| auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id()); |
| if (it != pendingLookupRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingLookupRequests_.erase(it); |
| numOfPendingLookupRequest_--; |
| lock.unlock(); |
| |
| if (!lookupTopicResponse.has_response() || |
| (lookupTopicResponse.response() == proto::CommandLookupTopicResponse::Failed)) { |
| if (lookupTopicResponse.has_error()) { |
| LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id() |
| << " error: " << lookupTopicResponse.error() |
| << " msg: " << lookupTopicResponse.message()); |
| checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message()); |
| request->fail(getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); |
| } else { |
| LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id() |
| << " with empty response: "); |
| request->fail(ResultConnectError); |
| } |
| } else { |
| LOG_DEBUG(cnxString() << "Received lookup response from server. req_id: " |
| << lookupTopicResponse.request_id() // |
| << " -- broker-url: " << lookupTopicResponse.brokerserviceurl() |
| << " -- broker-tls-url: " // |
| << lookupTopicResponse.brokerserviceurltls() |
| << " authoritative: " << lookupTopicResponse.authoritative() // |
| << " redirect: " << lookupTopicResponse.response()); |
| LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>(); |
| |
| if (tlsSocket_) { |
| lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurltls()); |
| } else { |
| lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl()); |
| } |
| |
| lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls()); |
| lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative()); |
| lookupResultPtr->setRedirect(lookupTopicResponse.response() == |
| proto::CommandLookupTopicResponse::Redirect); |
| lookupResultPtr->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url()); |
| request->complete(lookupResultPtr); |
| } |
| |
| } else { |
| LOG_WARN("Received unknown request id from server: " << lookupTopicResponse.request_id()); |
| } |
| } |
| |
| void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess& producerSuccess) { |
| LOG_DEBUG(cnxString() << "Received success producer response from server. req_id: " |
| << producerSuccess.request_id() // |
| << " -- producer name: " << producerSuccess.producer_name()); |
| |
| Lock lock(mutex_); |
| auto it = pendingRequests_.find(producerSuccess.request_id()); |
| if (it != pendingRequests_.end()) { |
| auto request = it->second; |
| if (!producerSuccess.producer_ready()) { |
| LOG_INFO(cnxString() << " Producer " << producerSuccess.producer_name() |
| << " has been queued up at broker. req_id: " |
| << producerSuccess.request_id()); |
| request->disableTimeout(); |
| lock.unlock(); |
| } else { |
| pendingRequests_.erase(it); |
| lock.unlock(); |
| ResponseData data; |
| data.producerName = producerSuccess.producer_name(); |
| data.lastSequenceId = producerSuccess.last_sequence_id(); |
| if (producerSuccess.has_schema_version()) { |
| data.schemaVersion = producerSuccess.schema_version(); |
| } |
| if (producerSuccess.has_topic_epoch()) { |
| data.topicEpoch = std::make_optional(producerSuccess.topic_epoch()); |
| } else { |
| data.topicEpoch = std::nullopt; |
| } |
| request->complete(data); |
| } |
| } |
| } |
| |
| void ClientConnection::handleError(const proto::CommandError& error) { |
| Result result = getResult(error.error(), error.message()); |
| LOG_WARN(cnxString() << "Received error response from server: " << result |
| << (error.has_message() ? (" (" + error.message() + ")") : "") |
| << " -- req_id: " << error.request_id()); |
| |
| Lock lock(mutex_); |
| |
| auto it = pendingRequests_.find(error.request_id()); |
| if (it != pendingRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingRequests_.erase(it); |
| lock.unlock(); |
| |
| request->fail(result); |
| } else { |
| auto it = pendingGetLastMessageIdRequests_.find(error.request_id()); |
| if (it != pendingGetLastMessageIdRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingGetLastMessageIdRequests_.erase(it); |
| lock.unlock(); |
| |
| request->fail(result); |
| } else { |
| auto it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); |
| if (it != pendingGetNamespaceTopicsRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingGetNamespaceTopicsRequests_.erase(it); |
| lock.unlock(); |
| |
| request->fail(result); |
| } else { |
| lock.unlock(); |
| } |
| } |
| } |
| } |
| |
| std::string ClientConnection::getMigratedBrokerServiceUrl( |
| const proto::CommandTopicMigrated& commandTopicMigrated) { |
| if (tlsSocket_) { |
| if (commandTopicMigrated.has_brokerserviceurltls()) { |
| return commandTopicMigrated.brokerserviceurltls(); |
| } |
| } else if (commandTopicMigrated.has_brokerserviceurl()) { |
| return commandTopicMigrated.brokerserviceurl(); |
| } |
| return ""; |
| } |
| |
| void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) { |
| const long resourceId = commandTopicMigrated.resource_id(); |
| const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated); |
| |
| if (migratedBrokerServiceUrl.empty()) { |
| LOG_WARN("Failed to find the migrated broker url for resource:" |
| << resourceId |
| << (commandTopicMigrated.has_brokerserviceurl() |
| ? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl() |
| : "") |
| << (commandTopicMigrated.has_brokerserviceurltls() |
| ? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls() |
| : "")); |
| return; |
| } |
| |
| Lock lock(mutex_); |
| if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) { |
| auto it = producers_.find(resourceId); |
| if (it != producers_.end()) { |
| auto producer = it->second.lock(); |
| producer->setRedirectedClusterURI(migratedBrokerServiceUrl); |
| unsafeRemovePendingRequest(producer->firstRequestIdAfterConnect()); |
| LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl); |
| } else { |
| LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId); |
| } |
| } else { |
| auto it = consumers_.find(resourceId); |
| if (it != consumers_.end()) { |
| auto consumer = it->second.lock(); |
| consumer->setRedirectedClusterURI(migratedBrokerServiceUrl); |
| unsafeRemovePendingRequest(consumer->firstRequestIdAfterConnect()); |
| LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl); |
| } else { |
| LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId); |
| } |
| } |
| } |
| |
| optional<std::string> ClientConnection::getAssignedBrokerServiceUrl( |
| const proto::CommandCloseProducer& closeProducer) { |
| if (tlsSocket_) { |
| if (closeProducer.has_assignedbrokerserviceurltls()) { |
| return closeProducer.assignedbrokerserviceurltls(); |
| } |
| } else if (closeProducer.has_assignedbrokerserviceurl()) { |
| return closeProducer.assignedbrokerserviceurl(); |
| } |
| return {}; |
| } |
| |
| optional<std::string> ClientConnection::getAssignedBrokerServiceUrl( |
| const proto::CommandCloseConsumer& closeConsumer) { |
| if (tlsSocket_) { |
| if (closeConsumer.has_assignedbrokerserviceurltls()) { |
| return closeConsumer.assignedbrokerserviceurltls(); |
| } |
| } else if (closeConsumer.has_assignedbrokerserviceurl()) { |
| return closeConsumer.assignedbrokerserviceurl(); |
| } |
| return {}; |
| } |
| |
| void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) { |
| int producerId = closeProducer.producer_id(); |
| |
| LOG_DEBUG("Broker notification of Closed producer: " << producerId); |
| |
| Lock lock(mutex_); |
| auto it = producers_.find(producerId); |
| if (it != producers_.end()) { |
| ProducerImplPtr producer = it->second.lock(); |
| producers_.erase(it); |
| lock.unlock(); |
| |
| if (producer) { |
| auto assignedBrokerServiceUrl = getAssignedBrokerServiceUrl(closeProducer); |
| producer->disconnectProducer(assignedBrokerServiceUrl); |
| } |
| } else { |
| LOG_ERROR(cnxString() << "Got invalid producer Id in closeProducer command: " << producerId); |
| } |
| } |
| |
| void ClientConnection::handleCloseConsumer(const proto::CommandCloseConsumer& closeconsumer) { |
| int consumerId = closeconsumer.consumer_id(); |
| |
| LOG_DEBUG("Broker notification of Closed consumer: " << consumerId); |
| |
| Lock lock(mutex_); |
| auto it = consumers_.find(consumerId); |
| if (it != consumers_.end()) { |
| ConsumerImplPtr consumer = it->second.lock(); |
| consumers_.erase(it); |
| lock.unlock(); |
| |
| if (consumer) { |
| auto assignedBrokerServiceUrl = getAssignedBrokerServiceUrl(closeconsumer); |
| consumer->disconnectConsumer(assignedBrokerServiceUrl); |
| } |
| } else { |
| LOG_ERROR(cnxString() << "Got invalid consumer Id in closeConsumer command: " << consumerId); |
| } |
| } |
| |
| void ClientConnection::handleAuthChallenge() { |
| LOG_DEBUG(cnxString() << "Received auth challenge from broker"); |
| |
| Result result; |
| SharedBuffer buffer = Commands::newAuthResponse(authentication_, result); |
| if (result != ResultOk) { |
| LOG_ERROR(cnxString() << "Failed to send auth response: " << result); |
| close(result); |
| return; |
| } |
| auto self = shared_from_this(); |
| asyncWrite(buffer.const_asio_buffer(), |
| customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { |
| handleSentAuthResponse(err, buffer); |
| })); |
| } |
| |
| void ClientConnection::handleGetLastMessageIdResponse( |
| const proto::CommandGetLastMessageIdResponse& getLastMessageIdResponse) { |
| LOG_DEBUG(cnxString() << "Received getLastMessageIdResponse from server. req_id: " |
| << getLastMessageIdResponse.request_id()); |
| |
| Lock lock(mutex_); |
| auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id()); |
| |
| if (it != pendingGetLastMessageIdRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingGetLastMessageIdRequests_.erase(it); |
| lock.unlock(); |
| |
| if (getLastMessageIdResponse.has_consumer_mark_delete_position()) { |
| request->complete({toMessageId(getLastMessageIdResponse.last_message_id()), |
| toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); |
| } else { |
| request->complete({toMessageId(getLastMessageIdResponse.last_message_id())}); |
| } |
| } else { |
| lock.unlock(); |
| LOG_WARN("getLastMessageIdResponse command - Received unknown request id from server: " |
| << getLastMessageIdResponse.request_id()); |
| } |
| } |
| |
| void ClientConnection::handleGetTopicOfNamespaceResponse( |
| const proto::CommandGetTopicsOfNamespaceResponse& response) { |
| LOG_DEBUG(cnxString() << "Received GetTopicsOfNamespaceResponse from server. req_id: " |
| << response.request_id() << " topicsSize" << response.topics_size()); |
| |
| Lock lock(mutex_); |
| auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id()); |
| |
| if (it != pendingGetNamespaceTopicsRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingGetNamespaceTopicsRequests_.erase(it); |
| lock.unlock(); |
| |
| int numTopics = response.topics_size(); |
| std::set<std::string> topicSet; |
| // get all topics |
| for (int i = 0; i < numTopics; i++) { |
| // remove partition part |
| const std::string& topicName = response.topics(i); |
| int pos = topicName.find("-partition-"); |
| std::string filteredName = topicName.substr(0, pos); |
| |
| // filter duped topic name |
| if (topicSet.find(filteredName) == topicSet.end()) { |
| topicSet.insert(filteredName); |
| } |
| } |
| |
| NamespaceTopicsPtr topicsPtr = |
| std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end()); |
| |
| request->complete(topicsPtr); |
| } else { |
| lock.unlock(); |
| LOG_WARN( |
| "GetTopicsOfNamespaceResponse command - Received unknown request id from " |
| "server: " |
| << response.request_id()); |
| } |
| } |
| |
| void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResponse& response) { |
| LOG_DEBUG(cnxString() << "Received GetSchemaResponse from server. req_id: " << response.request_id()); |
| Lock lock(mutex_); |
| auto it = pendingGetSchemaRequests_.find(response.request_id()); |
| if (it != pendingGetSchemaRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingGetSchemaRequests_.erase(it); |
| lock.unlock(); |
| |
| if (response.has_error_code()) { |
| Result result = getResult(response.error_code(), response.error_message()); |
| if (response.error_code() != proto::TopicNotFound) { |
| LOG_WARN(cnxString() << "Received error GetSchemaResponse from server " << result |
| << (response.has_error_message() |
| ? (" (" + response.error_message() + ")") |
| : "") |
| << " -- req_id: " << response.request_id()); |
| } |
| request->fail(result); |
| return; |
| } |
| |
| const auto& schema = response.schema(); |
| const auto& properMap = schema.properties(); |
| StringMap properties; |
| for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) { |
| properties[kv->key()] = kv->value(); |
| } |
| SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "", schema.schema_data(), properties); |
| request->complete(schemaInfo); |
| } else { |
| lock.unlock(); |
| LOG_WARN( |
| "GetSchemaResponse command - Received unknown request id from " |
| "server: " |
| << response.request_id()); |
| } |
| } |
| |
| void ClientConnection::handleAckResponse(const proto::CommandAckResponse& response) { |
| LOG_DEBUG(cnxString() << "Received AckResponse from server. req_id: " << response.request_id()); |
| |
| Lock lock(mutex_); |
| auto it = pendingRequests_.find(response.request_id()); |
| if (it == pendingRequests_.cend()) { |
| lock.unlock(); |
| LOG_WARN("Cannot find the cached AckResponse whose req_id is " << response.request_id()); |
| return; |
| } |
| |
| auto request = std::move(it->second); |
| pendingRequests_.erase(it); |
| lock.unlock(); |
| |
| if (response.has_error()) { |
| request->fail(getResult(response.error(), "")); |
| } else { |
| request->complete({}); |
| } |
| } |
| |
| void ClientConnection::unsafeRemovePendingRequest(long requestId) { |
| auto it = pendingRequests_.find(requestId); |
| if (it != pendingRequests_.end()) { |
| auto request = std::move(it->second); |
| pendingRequests_.erase(it); |
| request->fail(ResultDisconnected); |
| } |
| } |
| |
| } // namespace pulsar |